| In: |
lib/rq-2.3.1/relayer.rb
|
| Parent: | MainHelper |
| DEFAULT_MIN_SLEEP | = | 42 |
| DEFAULT_MAX_SLEEP | = | 240 |
| DEFAULT_RELAY | = | 16 |
| max_sleep | [RW] | |
| min_sleep | [RW] | |
| relay | [RW] |
# File lib/rq-2.3.1/relayer.rb, line 98
98: def daemon
99: #--{{{
100: if @options['daemon']
101: fork do
102: Process::setsid
103: fork do
104: Dir::chdir(Util.realpath('~'))
105: File::umask 0
106: open('/dev/null','r+') do |f|
107: STDIN.reopen f
108: STDOUT.reopen f
109: STDERR.reopen f
110: end
111: @daemon = true
112: yield
113: exit EXIT_SUCCESS
114: end
115: exit!
116: end
117: exit!
118: else
119: @daemon = false
120: yield
121: exit EXIT_SUCCESS
122: end
123: #--}}}
124: end
# File lib/rq-2.3.1/relayer.rb, line 125
125: def gen_pidfile name = nil
126: #--{{{
127: name ||= gen_relayer_name(@options['name'] || @qpath)
128: @pidfile =
129: begin
130: open name, File::CREAT | File::EXCL | File::RDWR
131: rescue
132: open name, File::RDWR
133: end
134: unless @pidfile and @pidfile.posixlock(File::LOCK_EX | File::LOCK_NB)
135: pid = IO::read(name) rescue nil
136: pid ||= 'unknown'
137: if @options['quiet']
138: exit EXIT_FAILURE
139: else
140: raise "process <#{ pid }> is already relaying from this queue"
141: end
142: else
143: @pidfile.rewind
144: @pidfile.sync = true
145: @pidfile.print Process::pid
146: @pidfile.truncate @pidfile.pos
147: at_exit{ FileUtils::rm_f name rescue nil }
148: end
149: #--}}}
150: end
# File lib/rq-2.3.1/relayer.rb, line 151
151: def gen_relayer_name path
152: #--{{{
153: path = Util::realpath(path).gsub(%|/|o, '_')
154: File::join(Util::realpath('~'), ".#{ path }.relayer")
155: #--}}}
156: end
# File lib/rq-2.3.1/relayer.rb, line 183
183: def handle_signal
184: #--{{{
185: if $rq_sigterm or $rq_sigint
186: reap_jobs(reap_only = true) until nothing_running?
187: info{ "** STOPPING **" }
188: @jrd.shutdown rescue nil
189: @pidfile.posixlock File::LOCK_UN
190: exit EXIT_SUCCESS
191: end
192:
193: if $rq_sighup
194: reap_jobs(reap_only = true) until nothing_running?
195: info{ "** RESTARTING **" }
196: @jrd.shutdown rescue nil
197: Util::uncache __FILE__
198: @pidfile.posixlock File::LOCK_UN
199: Util::exec @cmd
200: end
201: #--}}}
202: end
# File lib/rq-2.3.1/relayer.rb, line 157
157: def install_signal_handlers
158: #--{{{
159: if @daemon
160: $rq_signaled = false
161: $rq_sighup = false
162: $rq_sigterm = false
163: $rq_sigint = false
164: trap('SIGHUP') do
165: $rq_signaled = $rq_sighup = 'SIGHUP'
166: warn{ "signal <SIGHUP>" }
167: warn{ "finishing running jobs before handling signal" }
168: end
169: trap('SIGTERM') do
170: $rq_signaled = $rq_sigterm = 'SIGTERM'
171: warn{ "signal <SIGTERM>" }
172: warn{ "finishing running jobs before handling signal" }
173: end
174: trap('SIGINT') do
175: $rq_signaled = $rq_sigint = 'SIGINT'
176: warn{ "signal <SIGINT>" }
177: warn{ "finishing running jobs before handling signal" }
178: end
179: @jrd.install_signal_handlers
180: end
181: #--}}}
182: end
# File lib/rq-2.3.1/relayer.rb, line 256
256: def reap
257: #--{{{
258: debug{ "reaping finished/dead jobs" }
259:
260: sql = "select jid from jobs where or state='running'\n"
261: tuples = hdb.execute sql
262: hjids = tuples.map{|t| t['jid']}
263:
264: unless jids.empty
265: where_clauses = hjids.map{|hjid| "jid=#{ hjid }" }
266: where_clause = where_clauses.join ' or '
267: sql = "select jid from jobs where state='finished' or state='dead' and (\#{ where_clause })\n"
268: end
269:
270: debug{ "reaped finished/dead jobs" }
271: self
272: #--}}}
273: end
TODO - this will need to map jids here to jids there
# File lib/rq-2.3.1/relayer.rb, line 251
251: def reap_and_sow
252: #--{{{
253: transaction{ reap and sow }
254: #--}}}
255: end
# File lib/rq-2.3.1/relayer.rb, line 240
240: def relax
241: #--{{{
242: seconds = rand(@max_sleep - @min_sleep + 1) + @min_sleep
243: debug{ "relaxing <#{ seconds }>" }
244: sleep seconds
245: #--}}}
246: end
# File lib/rq-2.3.1/relayer.rb, line 27
27: def relay
28: #--{{{
29: daemon do
30: gen_pidfile
31: @main.init_logging
32: @logger = @main.logger
33:
34: set_q
35:
36: #
37: # munge @q/@qpath to set there
38: #
39: @here = @q
40: @qpath = realpath @main.argv.shift
41: set_q
42: @there = @q
43: @q = @here
44: @hdb = @here.qdb
45: @tdb = @there.qdb
46:
47: @pid = Process::pid
48: @cmd = @main.cmd
49: @started = Util::timestamp
50: @min_sleep = Integer(@options['min_sleep'] || defval('min_sleep'))
51: @max_sleep = Integer(@options['max_sleep'] || defval('max_sleep'))
52: @relay = Integer(@options['number'] || defval('relay'))
53:
54: @transactions = {}
55:
56:
57: install_signal_handlers
58:
59: info{ "** STARTED **" }
60: info{ "version <#{ RQ::VERSION }>" }
61: info{ "cmd <#{ @cmd }>" }
62: info{ "pid <#{ @pid }>" }
63: info{ "pidfile <#{ @pidfile.path }>" }
64: info{ "here <#{ @here.path }>" }
65: info{ "there <#{ @there.path }>" }
66:
67: debug{ "mode <#{ @mode }>" }
68: debug{ "min_sleep <#{ @min_sleep }>" }
69: debug{ "max_sleep <#{ @max_sleep }>" }
70: debug{ "relay <#{ @relay }>" }
71:
72: exit
73:
74: loop do
75: handle_signal if $rq_signaled
76: throttle(@min_sleep) do
77: reap_and_sow
78: relax
79: end
80: end
81: =begin
82: loop do
83: handle_signal if $rq_signaled
84: throttle(@min_sleep) do
85: start_jobs unless busy?
86: if nothing_running?
87: relax
88: else
89: reap_jobs
90: end
91: end
92: end
93: =end
94:
95: end
96: #--}}}
97: end
# File lib/rq-2.3.1/relayer.rb, line 203
203: def throttle rate = @min_sleep
204: #--{{{
205: if Numeric === rate and rate > 0
206: if defined? @last_throttle_time and @last_throttle_time
207: elapsed = Time.now - @last_throttle_time
208: timeout = rate - elapsed
209: if timeout > 0
210: timeout = timeout + rand(rate * 0.10)
211: debug{ "throttle rate of <#{ rate }> exceeded - sleeping <#{ timeout }>" }
212: sleep timeout
213: end
214: end
215: @last_throttle_time = Time.now
216: end
217: yield
218: #--}}}
219: end
# File lib/rq-2.3.1/relayer.rb, line 220
220: def transaction
221: #--{{{
222: ret = nil
223: if @in_transaction
224: ret = yield
225: else
226: begin
227: @in_transaction = true
228: @here.transaction do
229: @there.transaction do
230: ret = yield
231: end
232: end
233: ensure
234: @in_transaction = false
235: end
236: end
237: ret
238: #--}}}
239: end