| In: |
lib/rq-2.3.1/jobrunnerdaemon.rb
|
| Parent: | Object |
as stated in the description of the JobRunner class, the JobRunnerDaemon is a helper daemon that runs as a drb object. it’s primary responsibilty is simply for enable forks to occur in a a different address space that the one doing the sqlite transaction. in addition to forking to create child processes in which to run jobs, the JobRunnerDaemon daemon also provides facilities to wait for these children
| pid | [RW] | |
| runners | [R] | |
| uri | [RW] |
# File lib/rq-2.3.1/jobrunnerdaemon.rb, line 29
29: def daemon(*a,&b)
30: #--{{{
31: jrd = new(*a, &b)
32:
33: r, w = IO::pipe
34:
35: unless((pid = fork)) # child
36: $0 = "#{ self }".gsub(%/[^a-zA-Z]+/,'_').downcase
37: begin
38: r.close
39: n = 0
40: uri = nil
41: socket = nil
42:
43: 42.times do
44: begin
45: s = "%s/%s_%s_%s_%s" %
46: [Dir::tmpdir, File::basename($0), Process::ppid, n, rand(42)]
47: u = "drbunix://#{ s }"
48: DRb::start_service u, jrd
49: socket = s
50: uri = u
51: break
52: rescue Errno::EADDRINUSE
53: n += 1
54: end
55: end
56:
57: if socket and uri
58: w.write socket
59: w.close
60: pid = Process::pid
61: ppid = Process::ppid
62: cur = Thread::current
63: Thread::new(pid, ppid, cur) do |pid, ppid, cur|
64: loop do
65: begin
66: Process::kill 0, ppid
67: sleep 42
68: rescue
69: cur.raise "parent <#{ ppid }> died unexpectedly"
70: end
71: end
72: end
73: DRb::thread.join
74: else
75: w.close
76: end
77: ensure
78: exit!
79: end
80: else # parent
81: w.close
82: socket = r.read
83: r.close
84:
85: if socket and File::exist?(socket)
86: at_exit{ FileUtils::rm_f socket }
87: uri = "drbunix://#{ socket }"
88: #
89: # starting this on localhost avoids dns lookups!
90: #
91: DRb::start_service 'druby://localhost:0', nil
92: jrd = DRbObject::new nil, uri
93: jrd.pid = pid
94: jrd.uri = uri
95: else
96: raise "failed to start job runner daemon"
97: end
98: end
99:
100: return jrd
101: #--}}}
102: end
# File lib/rq-2.3.1/jobrunnerdaemon.rb, line 108
108: def initialize
109: #--{{{
110: @runners = {}
111: @uri = nil
112: @pid = Process::pid
113: #--}}}
114: end
# File lib/rq-2.3.1/jobrunnerdaemon.rb, line 181
181: def install_signal_handlers
182: #--{{{
183: %(TERM INT HUP).each{|sig| trap sig, 'SIG_IGN'}
184: #--}}}
185: end
# File lib/rq-2.3.1/jobrunnerdaemon.rb, line 115
115: def runner job
116: #--{{{
117: r = nil
118: retried = false
119: begin
120: r = JobRunner::new job
121: rescue Errno::ENOMEM, Errno::EAGAIN
122: GC::start
123: unless retried
124: retried = true
125: retry
126: else
127: raise
128: end
129: end
130: @runners[r.pid] = r
131: r
132: #--}}}
133: end
# File lib/rq-2.3.1/jobrunnerdaemon.rb, line 164
164: def shutdown
165: #--{{{
166: @death =
167: Thread::new do
168: begin
169: while not @runners.empty?
170: pid = Process::wait
171: @runners.delete pid
172: end
173: ensure
174: #sleep 4.2
175: DRb::thread.kill
176: Thread::main exit!
177: end
178: end
179: #--}}}
180: end
# File lib/rq-2.3.1/jobrunnerdaemon.rb, line 134
134: def wait
135: #--{{{
136: pid = Process::wait
137: @runners.delete pid
138: pid
139: #--}}}
140: end
# File lib/rq-2.3.1/jobrunnerdaemon.rb, line 141
141: def wait2
142: #--{{{
143: pid, status = Process::wait2
144: @runners.delete pid
145: [pid, status]
146: #--}}}
147: end
# File lib/rq-2.3.1/jobrunnerdaemon.rb, line 148
148: def waitpid pid = -1, flags = 0
149: #--{{{
150: pid = pid.pid if pid.respond_to? 'pid'
151: pid = Process::waitpid pid, flags
152: @runners.delete pid
153: pid
154: #--}}}
155: end