| In: |
lib/rq-3.0.0/jobqueue.rb
|
| Parent: | Object |
The JobQueue class is responsible for high-level access to the job queue.
| MAX_JID | = | 2 ** 20 |
| bin | [R] | |
| opts | [R] | |
| path | [R] | |
| qdb | [R] | |
| stderr | [R] | |
| stdin | [R] | |
| stdout | [R] |
# File lib/rq-3.0.0/jobqueue.rb, line 28
# Builds a brand-new queue rooted at +path+ and returns it.
#
# Anything already present at +path+ is removed first (rm_rf), the directory
# is recreated, a database is created at <path>/db through QDB.create, and
# the bin/stdin/stdout/stderr subdirectories are made.
#
# path - directory that will hold the queue.
# opts - option hash forwarded to QDB.create and to +new+. The freshly
#        created database is stored under opts['qdb'], so a hash supplied by
#        the caller is modified.
#
# Returns the new queue object.
#
# NOTE(review): +new+ is called without a receiver, so this is presumably a
# class-level method - confirm against the full file.
28: def create path, opts = {}
29: #--{{{
30: FileUtils::rm_rf path
31: FileUtils::mkdir_p path
32: db = File::join path, 'db'
33: qdb = QDB.create db, opts
34: opts['qdb'] = qdb
35: q = new path, opts
36: FileUtils::mkdir_p q.bin
37: FileUtils::mkdir_p q.stdin
38: FileUtils::mkdir_p q.stdout
39: FileUtils::mkdir_p q.stderr
40: q
41: #--}}}
42: end
# File lib/rq-3.0.0/jobqueue.rb, line 55
# Opens an existing queue directory.
#
# path - queue directory; stored exactly as given (deliberately not
#        expanded, see the inline comment about nfs mounts).
# opts - option hash; 'logger' and 'qdb' are looked up in it via getopt.
#        When absent, the logger defaults to a Logger writing to STDERR and
#        the database to QDB::new on <path>/db using that logger.
#
# Raises RuntimeError when +path+ does not exist or is not a directory.
# Both transaction flags start out false.
55: def initialize path, opts = {}
56: #--{{{
57: @path = path # do NOT expand this or it'll be fubar from misc nfs mounts!!
58: @bin = File::join @path, 'bin'
59: @stdin = File::join @path, 'stdin'
60: @stdout = File::join @path, 'stdout'
61: @stderr = File::join @path, 'stderr'
62: @opts = opts
63: raise "q <#{ @path }> does not exist" unless test ?e, @path
64: raise "q <#{ @path }> is not a directory" unless test ?d, @path
65: @basename = File::basename(@path)
66: @dirname = File::dirname(@path)
67: @logger = getopt('logger', opts) || Logger::new(STDERR)
68: @qdb = getopt('qdb', opts) || QDB::new(File::join(@path, 'db'), 'logger' => @logger)
69: @in_transaction = false
70: @in_ro_transaction = false
71: #--}}}
72: end
# File lib/rq-3.0.0/jobqueue.rb, line 882
# Stores +value+ under +key+ in the attributes table.
#
# Counts the rows already holding +key+: inserts a new row when there are
# none, updates the existing row when there is exactly one, and raises a
# RuntimeError when more than one row is found.
#
# NOTE(review): +key+ and +value+ are interpolated straight into the SQL
# text; a quote character in either would break (or alter) the statement.
# NOTE(review): the count and the following write are separate statements
# and no transaction is opened here - confirm callers wrap this if needed.
882: def []= key, value
883: #--{{{
884: sql = "select count(*) from attributes where key='#{ key }';"
885: tuples = @qdb.execute sql
886: tuple = tuples.first
887: count = Integer tuple['count(*)']
888: case count
889: when 0
890: sql = "insert into attributes values('#{ key }','#{ value }');"
891: @qdb.execute sql
892: when 1
893: sql = "update attributes set key='#{ key }', value='#{ value }' where key='#{ key }';"
894: @qdb.execute sql
895: else
896: raise "key <#{ key }> has become corrupt!"
897: end
898: #--}}}
899: end
# File lib/rq-3.0.0/jobqueue.rb, line 810
# Forwards all arguments and the block to @qdb.abort_transaction and returns
# whatever it returns.
810: def abort_transaction(*a,&b)
811: #--{{{
812: @qdb.abort_transaction(*a,&b)
813: #--}}}
814: end
# File lib/rq-3.0.0/jobqueue.rb, line 900
# Reads every row of the attributes table and returns a Hash mapping each
# row's 'key' column to its 'value' column.
#
# NOTE(review): map! is used only for its side effect of filling +h+; the
# mutated +tuples+ array is discarded.
900: def attributes
901: #--{{{
902: h = {}
903: tuples = @qdb.execute "select * from attributes;"
904: tuples.map!{|t| h[t['key']] = t['value']}
905: h
906: #--}}}
907: end
# File lib/rq-3.0.0/jobqueue.rb, line 505
# Deletes jobs from the queue and removes their stdin/stdout/stderr files.
#
# args  - any mix of selectors and option hashes. Hash arguments are merged
#         into one option hash; everything else is a selector. With no
#         selector, 'all' is assumed.
#         Selectors are matched on their string form:
#           digits            - that jid, unless the job is running
#           p... / h... / f... / d...
#                             - every pending / holding / finished / dead job
#           r...              - every running job, but only when the 'force'
#                               option is set (otherwise nothing is added)
#           a...              - every job that is not running
# block - when given, each selected tuple is passed to it before the job's
#         files are removed.
#
# Returns nil when a block was given, otherwise an Array of the selected
# tuples. Raises ArgumentError for a selector matching none of the above.
#
# NOTE(review): the patterns appear in this listing as %/.../io; presumably
# they are %r/.../io regexps in the real source - confirm.
505: def delete(*args, &block)
506: #--{{{
507: whats, optargs = args.partition{|arg| not Hash === arg}
508:
509: opts = {}
510: optargs.each{|oa| opts.update oa}
511:
512: force = Util::getopt 'force', opts
513:
514: delete_sql, select_sql = '', ''
515:
516: whats << 'all' if whats.empty?
517:
# accumulate one delete statement and one matching select per selector
518: whats.each do |what|
519: case "#{ what }"
520: when %/^\s*\d+\s*$/io # number
521: delete_sql << "delete from jobs where jid=#{ what } and state!='running';\n"
522: select_sql << "select * from jobs where jid=#{ what } and state!='running';\n"
523: when %/^\s*p/io # pending
524: delete_sql << "delete from jobs where state='pending';\n"
525: select_sql << "select * from jobs where state='pending';\n"
526: when %/^\s*h/io # holding
527: delete_sql << "delete from jobs where state='holding';\n"
528: select_sql << "select * from jobs where state='holding';\n"
529: when %/^\s*r/io # running
530: delete_sql << "delete from jobs where state='running';\n" if force
531: select_sql << "select * from jobs where state='running';\n" if force
532: when %/^\s*f/io # finished
533: delete_sql << "delete from jobs where state='finished';\n"
534: select_sql << "select * from jobs where state='finished';\n"
535: when %/^\s*d/io # dead
536: delete_sql << "delete from jobs where state='dead';\n"
537: select_sql << "select * from jobs where state='dead';\n"
538: when %/^\s*a/io # all
539: delete_sql << "delete from jobs where state!='running';\n"
540: select_sql << "select * from jobs where state!='running';\n"
541: else
542: raise ArgumentError, "cannot delete <#{ what.inspect }>"
543: end
544: end
545:
# removes the three per-job io files for a jid (rm_f ignores missing files)
546: scrub = lambda do |jid|
547: [standard_in_4(jid), standard_out_4(jid), standard_err_4(jid)].each do |path|
548: FileUtils::rm_f path
549: end
550: end
551:
552: tuples = []
553:
# per-tuple handler: hand the tuple to the caller's block, or collect it
554: metablock =
555: if block
556: lambda do |tuple|
557: jid = tuple['jid']
558: block[tuple]
559: scrub[jid]
560: end
561: else
562: lambda do |tuple|
563: jid = tuple['jid']
564: scrub[jid]
565: tuples << tuple
566: end
567: end
568:
569: # TODO - make file deletion transactional too
570:
# the select runs first so the doomed rows are seen before they are deleted
571: transaction do
572: execute(select_sql, &metablock)
573: execute(delete_sql){}
574: end
575:
576: delete_sql = nil
577: select_sql = nil
578:
579: block ? nil : tuples
580: #--}}}
581: end
# File lib/rq-3.0.0/jobqueue.rb, line 790
# Forwards all arguments and the block to @qdb.execute and returns whatever
# it returns.
790: def execute(*args, &block)
791: #--{{{
792: @qdb.execute(*args, &block)
793: #--}}}
794: end
# File lib/rq-3.0.0/jobqueue.rb, line 727
# Selects jobs still marked 'running' whose runner is this host and whose
# 'started' value is <= +started+.
#
# started - upper bound compared against the jobs' 'started' column.
# block   - when given, it is passed on to execute and nil is returned;
#           otherwise the result of execute is returned.
#
# NOTE(review): the interpolations appear escaped (\#{ ... }) in this
# listing; presumably an artifact of a heredoc in the real source - confirm.
727: def getdeadjobs(started, &block)
728: #--{{{
729: ret = nil
730: sql = "select * from jobs\nwhere\nstate = 'running' and\nrunner='\#{ Util::hostname }' and\nstarted<='\#{ started }'\n"
731: if block
732: execute(sql, &block)
733: else
734: ret = execute(sql)
735: end
736: ret
737: #--}}}
738: end
# File lib/rq-3.0.0/jobqueue.rb, line 679
# Picks the next job to run and returns its tuple (the first row of the
# result).
#
# Candidates are jobs that are 'pending', or 'dead' with a non-null
# 'restartable' column, and whose 'runner' is null or contains this host.
# They are ordered by priority (highest first), then submission time, then
# jid, and only the first row is requested.
#
# NOTE(review): this uses Util::host while getdeadjobs uses Util::hostname -
# confirm the two are meant to differ.
# NOTE(review): the interpolation appears escaped (\#{ ... }) in this
# listing; presumably an artifact of a heredoc in the real source - confirm.
679: def getjob
680: #--{{{
681: sql = "select * from jobs\nwhere\n(state='pending' or (state='dead' and (not restartable isnull))) and\n(runner like '%\#{ Util::host }%' or runner isnull)\norder by priority desc, submitted asc, jid asc\nlimit 1;\n"
682: tuples = execute sql
683: job = tuples.first
684: job
685: #--}}}
686: end
# File lib/rq-3.0.0/jobqueue.rb, line 795
# Forwards all arguments and the block to @qdb.integrity_check and returns
# whatever it returns.
795: def integrity_check(*args, &block)
796: #--{{{
797: @qdb.integrity_check(*args, &block)
798: #--}}}
799: end
# File lib/rq-3.0.0/jobqueue.rb, line 746
# Marks +job+ as dead in the jobs table.
#
# job - hash-like object; its 'jid' entry identifies the row. When 'jid' is
#       nil or false nothing is written.
#
# Returns +job+ unchanged.
746: def jobisdead job
747: #--{{{
748: jid = job['jid']
749: if jid
750: sql = "update jobs set state='dead' where jid='#{ jid }'"
751: execute(sql){}
752: end
753: job
754: #--}}}
755: end
# File lib/rq-3.0.0/jobqueue.rb, line 712
# Records a job's completion: writes the 'state', 'exit_status', 'finished'
# and 'elapsed' entries of +job+ to the row whose jid is job['jid'].
#
# Returns the result of execute.
#
# NOTE(review): the interpolations appear escaped (\#{ ... }) in this
# listing; presumably an artifact of a heredoc in the real source - confirm.
712: def jobisdone job
713: #--{{{
714: sql = "update jobs\nset\nstate = '\#{ job['state'] }',\nexit_status = '\#{ job['exit_status'] }',\nfinished = '\#{ job['finished'] }',\nelapsed = '\#{ job['elapsed'] }'\nwhere jid = \#{ job['jid'] };\n"
715: execute sql
716: #--}}}
717: end
# File lib/rq-3.0.0/jobqueue.rb, line 695
# Records that a job has started: writes the 'pid', 'state', 'started',
# 'runner', 'stdout' and 'stderr' entries of +job+ to the row whose jid is
# job['jid'].
#
# Returns the result of execute.
#
# NOTE(review): the interpolations appear escaped (\#{ ... }) in this
# listing; presumably an artifact of a heredoc in the real source - confirm.
695: def jobisrunning job
696: #--{{{
697: sql = "update jobs\nset\npid='\#{ job['pid'] }',\nstate='\#{ job['state'] }',\nstarted='\#{ job['started'] }',\nrunner='\#{ job['runner'] }',\nstdout='\#{ job['stdout'] }',\nstderr='\#{ job['stderr'] }'\nwhere jid=\#{ job['jid'] };\n"
698: execute sql
699: #--}}}
700: end
# File lib/rq-3.0.0/jobqueue.rb, line 245
# Lists jobs selected by state name and/or jid.
#
# whats - selectors. When empty, or when 'all' is among them, the list is
#         replaced by pending, running, finished and dead. Selectors
#         beginning with p/h/r/f/d are expanded to the full state name;
#         Numeric values and all-digit strings select by jid; anything else
#         is compared against the 'state' column. The resulting conditions
#         are OR-ed together.
# block - when given, it is passed on to execute and nil is returned;
#         otherwise the result of execute is returned.
#
# The query runs inside ro_transaction.
#
# NOTE(review): the default expansion does not include 'holding' - confirm
# that is intended.
# NOTE(review): %( ... ) and %/.../ appear here without a type letter;
# presumably %w( ... ) and %r/.../ in the real source - confirm. The escaped
# \#{ where_clause } is presumably a heredoc artifact as well.
245: def list(*whats, &block)
246: #--{{{
247: ret = nil
248:
249: whats.replace(%( pending running finished dead )) if
250: whats.empty? or whats.include?('all')
251:
# expand abbreviated state names; anything unrecognised passes through as is
252: whats.map! do |what|
253: case what
254: when %/^\s*p/io
255: 'pending'
256: when %/^\s*h/io
257: 'holding'
258: when %/^\s*r/io
259: 'running'
260: when %/^\s*f/io
261: 'finished'
262: when %/^\s*d/io
263: 'dead'
264: else
265: what
266: end
267: end
268:
269: where_clauses = []
270:
# numbers (or numeric strings) select by jid, everything else by state
271: whats.each do |what|
272: case what
273: when Numeric
274: where_clauses << "jid=#{ what }\n"
275: else
276: what = "#{ what }"
277: if what.to_s =~ %/^\s*\d+\s*$/o
278: where_clauses << "jid=#{ QDB::q what }\n"
279: else
280: where_clauses << "state=#{ QDB::q what }\n"
281: end
282: end
283: end
284:
285: where_clause = where_clauses.join(" or \n")
286:
287: sql = "select * from jobs\nwhere \#{ where_clause }\n"
288:
289: if block
290: ro_transaction{ execute(sql, &block) }
291: else
292: ret = ro_transaction{ execute(sql) }
293: end
294:
295: ret
296: #--}}}
297: end
# File lib/rq-3.0.0/jobqueue.rb, line 805
# Forwards all arguments and the block to @qdb.lock and returns whatever it
# returns.
805: def lock(*args, &block)
806: #--{{{
807: @qdb.lock(*args, &block)
808: #--}}}
809: end
TODO - use mtime to optimize checks by the feeder?
# File lib/rq-3.0.0/jobqueue.rb, line 877
# Returns the modification time of the queue directory (@path), as reported
# by File::stat.
877: def mtime
878: #--{{{
879: File::stat(@path).mtime
880: #--}}}
881: end
# File lib/rq-3.0.0/jobqueue.rb, line 475
# Runs a select over the jobs table with a caller-supplied where clause.
#
# where_clause - SQL condition text placed after "where"; when nil, every
#                job is selected.
# block        - when given, it is passed on to execute and nil is returned;
#                otherwise the result of execute is returned.
#
# The query runs inside ro_transaction.
#
# NOTE(review): +where_clause+ is spliced into the statement verbatim (the
# rewriting steps below are commented out), so callers must supply valid,
# trusted SQL.
475: def query(where_clause = nil, &block)
476: #--{{{
477: ret = nil
478:
479: sql =
480: if where_clause
481:
482: #
483: # turn =~ into like clauses
484: #
485: #where_clause.gsub!(/(=~\s*([^\s')(=]+))/om){q = $2.gsub(%r/'+|\s+/o,''); "like '%#{ q }%'"}
486: #
487: # quote everything on the rhs of an '=' sign - helps with shell problems...
488: #
489: #where_clause.gsub!(/(==?\s*([^\s')(=]+))/om){q = $2.gsub(%r/'+|\s+/o,''); "='#{ q }'"}
490:
491: "select * from jobs where #{ where_clause };"
492: else
493: "select * from jobs;"
494: end
495:
496: if block
497: ro_transaction{ execute(sql, &block) }
498: else
499: ret = ro_transaction{ execute(sql) }
500: end
501:
502: ret
503: #--}}}
504: end
# File lib/rq-3.0.0/jobqueue.rb, line 800
# Forwards all arguments and the block to @qdb.recover! and returns whatever
# it returns.
800: def recover!(*args, &block)
801: #--{{{
802: @qdb.recover!(*args, &block)
803: #--}}}
804: end
# File lib/rq-3.0.0/jobqueue.rb, line 157
# Resets existing jobs to 'pending', keeping their jids.
#
# jobs  - hash-like objects; each must carry 'jid' and 'command', and may
#         carry 'stdin', 'priority', 'tag', 'runner' and 'restartable'.
# block - when given, the updated row of each job is selected again and
#         passed to it.
#
# For each job the row is rewritten (state 'pending', new submission time,
# this host as submitter, stdout/stderr cleared), the job's old
# stdin/stdout/stderr files are removed and the captured stdin is copied
# into place. All jobs are handled inside one transaction.
#
# Returns self. Raises RuntimeError when a job has no command.
#
# NOTE(review): Integer(job['jid']) raises on a nil jid before the
# "no jid" guard below can fire, so that guard looks unreachable - confirm.
157: def resubmit(*jobs, &block)
158: #--{{{
159: now = Util::timestamp Time::now
160:
161: transaction do
162: jobs.each do |job|
163: jid = Integer job['jid']
164: command = job['command']
165: stdin = job['stdin']
166:
167: raise "no jid for job <#{ job.inspect }>" unless jid
168: raise "no command for job <#{ job.inspect }>" unless command
169:
170: tmp_stdin(stdin) do |ts|
171: tuple = QDB::tuple
172:
173: tuple['jid'] = jid
174: tuple['command'] = command
175: tuple['priority'] = job['priority'] || 0
176: tuple['tag'] = job['tag']
177: tuple['runner'] = job['runner']
178: tuple['restartable'] = job['restartable']
179: tuple['state'] = 'pending'
180: tuple['submitted'] = now
181: tuple['submitter'] = Util::hostname
182: tuple['stdin'] = stdin4 jid
183: tuple['stdout'] = nil
184: tuple['stderr'] = nil
185:
# every field except the first is rewritten; presumably the first is jid
186: kvs = tuple.fields[1..-1].map{|f| "#{ f }=#{ QDB::q(tuple[ f ]) }"}
187: sql = "update jobs set #{ kvs.join ',' } where jid=#{ jid };\n"
188:
189: execute(sql){}
190:
191: FileUtils::rm_rf standard_in_4(jid)
192: FileUtils::rm_rf standard_out_4(jid)
193: FileUtils::rm_rf standard_err_4(jid)
194: FileUtils::cp ts.path, standard_in_4(jid) if ts
195:
196: if block
197: sql = "select * from jobs where jid = '#{ jid }'"
198: execute(sql, &block)
199: end
200: end # tmp_stdin
201: end # jobs.each
202: end # transaction
203:
204: self
205: #--}}}
206: end
# File lib/rq-3.0.0/jobqueue.rb, line 774
# Runs the given block inside a read-only transaction and returns the
# block's value.
#
# When a transaction (read-only or read-write) is already open on this
# object, the block is simply yielded to. Otherwise @in_ro_transaction is
# set, the block runs under @qdb.ro_transaction (which receives +args+), and
# the flag is cleared again in an ensure clause.
774: def ro_transaction(*args)
775: #--{{{
776: ret = nil
777: if @in_ro_transaction || @in_transaction
778: ret = yield
779: else
780: begin
781: @in_ro_transaction = true
782: @qdb.ro_transaction(*args){ ret = yield }
783: ensure
784: @in_ro_transaction = false
785: end
786: end
787: ret
788: #--}}}
789: end
# File lib/rq-3.0.0/jobqueue.rb, line 815
# Forwards all arguments and the block to @qdb.rollback_transaction and
# returns whatever it returns.
815: def rollback_transaction(*a,&b)
816: #--{{{
817: @qdb.rollback_transaction(*a,&b)
818: #--}}}
819: end
# File lib/rq-3.0.0/jobqueue.rb, line 821
# Copies the queue's database files into another directory and returns a
# queue object opened on that copy.
#
# qtmp    - destination directory; defaults to "<basename>.snapshot" (also
#           used when nil is passed). It is removed and recreated on every
#           attempt.
# retries - number of unlocked attempts; nil means 16.
#
# Each attempt copies 'db', 'db.schema' and 'lock' from the queue directory
# and opens the copy with klass::new. A copy that passes integrity_check is
# accepted; otherwise the result of the copy's recover! decides. When every
# unlocked attempt fails, one more attempt is made while holding
# @qdb.write_lock, and a RuntimeError is raised if that fails too.
#
# NOTE(review): the copy is opened with @opts; if @opts carries a 'qdb'
# entry (create stores one), initialize would reuse that database instead of
# opening the one under qtmp - confirm.
# NOTE(review): %(db db.schema lock) appears without a type letter;
# presumably %w(...) in the real source - confirm.
821: def snapshot qtmp = "#{ @basename }.snapshot", retries = nil
822: #--{{{
823: qtmp ||= "#{ @basename }.snapshot"
824: debug{ "snapshot <#{ @path }> -> <#{ qtmp }>" }
825: retries = Integer(retries || 16)
826: debug{ "retries <#{ retries }>" }
827:
828: qss = nil
829: loopno = 0
830:
# one copy attempt; evaluates to the snapshot queue, or a falsy value
831: take_snapshot = lambda do
832: FileUtils::rm_rf qtmp
833: FileUtils::mkdir_p qtmp
834: %(db db.schema lock).each do |base|
835: src, dest = File::join(@path, base), File::join(qtmp, base)
836: debug{ "cp <#{ src }> -> <#{ dest }>" }
837: FileUtils::cp(src, dest)
838: end
839: ss = klass::new qtmp, @opts
840: if ss.integrity_check
841: ss
842: else
# best effort: repair the source queue too if it is itself damaged;
# any error from that is deliberately swallowed
843: begin; recover! unless integrity_check; rescue; nil; end
844: ss.recover!
845: end
846: end
847:
848: loop do
849: break if loopno >= retries
850: if((ss = take_snapshot.call))
851: debug{ "snapshot <#{ qtmp }> created" }
852: qss = ss
853: break
854: else
855: debug{ "failure <#{ loopno + 1}> of <#{ retries }> attempts to create snapshot <#{ qtmp }> - retrying..." }
856: end
857: loopno += 1
858: end
859:
860: unless qss
861: debug{ "locking <#{ @path }> as last resort" }
862: @qdb.write_lock do
863: if((ss = take_snapshot.call))
864: debug{ "snapshot <#{ qtmp }> created" }
865: qss = ss
866: else
867: raise "failed <#{ loopno }> times to create snapshot <#{ qtmp }>"
868: end
869: end
870: end
871:
872: qss
873: #--}}}
874: end
# File lib/rq-3.0.0/jobqueue.rb, line 98
# Returns the expanded (absolute) path of the stderr file for job +jid+,
# i.e. stderr4(jid) joined onto the queue path.
98: def standard_err_4 jid
99: #--{{{
100: File::expand_path(File::join(path, stderr4(jid)))
101: #--}}}
102: end
# File lib/rq-3.0.0/jobqueue.rb, line 78
# Returns the expanded (absolute) path of the stdin file for job +jid+,
# i.e. stdin4(jid) joined onto the queue path.
78: def standard_in_4 jid
79: #--{{{
80: File::expand_path(File::join(path, stdin4(jid)))
81: #--}}}
82: end
# File lib/rq-3.0.0/jobqueue.rb, line 88
# Returns the expanded (absolute) path of the stdout file for job +jid+,
# i.e. stdout4(jid) joined onto the queue path.
88: def standard_out_4 jid
89: #--{{{
90: File::expand_path(File::join(path, stdout4(jid)))
91: #--}}}
92: end
# File lib/rq-3.0.0/jobqueue.rb, line 302
# Gathers summary statistics about the queue and returns them as an
# OrderedAutoHash with these top-level keys:
#
#   'jobs'        - count of jobs per state plus 'total'
#   'temporal'    - per state, the 'min' and 'max' of a time metric, each as
#                   { jid => formatted duration }
#   'performance' - 'avg_time_per_job' and counts of jobs finished in the
#                   last 1, 12 and 24 hours
#   'exit_status' - 'successes', 'failures', and one count per entry of the
#                   exit code map
#
# options - hash; :exit_code_map (or 'exit_code_map') maps a label to a list
#           of exit codes to be counted under that label.
#
# All queries run inside a single ro_transaction.
#
# NOTE(review): %( ... ) appears without a type letter and the SQL strings
# show escaped \#{ ... }; presumably %w( ... ) and heredocs in the real
# source - confirm.
302: def status options = {}
303: #--{{{
304: stats = OrderedAutoHash::new
305:
306: now = Time::now
307:
# formats a duration as "HHhMMmSS.SSs"; +t+ is either a number of seconds
# or, failing Float conversion, a timestamp measured against +now+
308: hms = lambda do |t|
309: elapsed =
310: begin
311: Float t
312: rescue
313: now - Util::stamptime(t, 'local' => true)
314: end
315: sh, sm, ss = Util::hms elapsed.to_f
316: s = "#{ '%2.2d' % sh }h#{ '%2.2d' % sm }m#{ '%05.2f' % ss }s"
317: end
318:
319: exit_code_map =
320: options[:exit_code_map] || options['exit_code_map'] || {}
321:
322: ro_transaction do
323: #
324: # jobs stats
325: #
326: total = 0
327: %( pending holding running finished dead ).each do |state|
328: sql = "select count(*) from jobs\nwhere\nstate='\#{ state }'\n"
329: tuples = execute sql
330: tuple = tuples.first
331: count = (tuple ? Integer(tuple.first || 0) : 0)
332: stats['jobs'][state] = count
333: total += count
334: end
335: stats['jobs']['total'] = total
336: #
337: # temporal stats
338: #
339: metrics = OrderedAutoHash::new
340: metrics['pending'] = 'submitted'
341: metrics['holding'] = 'submitted'
342: metrics['running'] = 'started'
343: metrics['finished'] = 'elapsed'
344: metrics['dead'] = 'elapsed'
345:
346: metrics.each do |state, metric|
# for timestamp metrics min/max are swapped on purpose, presumably because
# the earliest timestamp corresponds to the longest elapsed time
347: sql =
348: unless metric == 'elapsed'
349: "select min(\#{ metric }) as max, max(\#{ metric }) as min\nfrom jobs where state='\#{ state }'\n"
350: else
351: "select min(\#{ metric }) as min, max(\#{ metric }) as max\nfrom jobs where state='\#{ state }'\n"
352: end
353: tuple = execute(sql).first
354: next unless tuple
355:
356: %( min max ).each do |time|
357: oh = nil
358: t = tuple[time]
359: if t
360: sql = "select jid from jobs where \#{ metric }='\#{ t }' and state='\#{ state }'\n"
361: which = execute(sql).first
# NOTE(review): to_i always yields an Integer, so the "if jid" test below
# is always true (a missing row gives jid 0) - confirm intent
362: jid = (which and which['jid']).to_i
363: if jid
364: oh = OrderedAutoHash::new
365: oh[jid] = hms[t]
366: oh.yaml_inline = true
367: end
368: stats['temporal'][state][time] = oh
369: end
370: end
371: #stats['temporal'][state] ||= nil
372: end
373: stats['temporal'] ||= nil
374: #
375: # generate performance stats
376: #
377: sql = "select avg(elapsed) from jobs\nwhere\nstate='finished'\n"
378: tuples = execute sql
379: tuple = tuples.first
380: avg = (tuple ? Float(tuple.first || 0) : 0)
381: stats['performance']['avg_time_per_job'] = hms[avg]
382:
# NOTE(review): the list built on the next four lines is discarded by the
# reassignment to 1, 12, 24 that follows
383: list = []
384: 0.step(5){|i| list << (2 ** i)}
385: list << 24
386: list.sort!
387:
388: list = 1, 12, 24
389:
390: list.each do |n|
391: ago = now - (n * 3600)
392: ago = Util::timestamp ago
393: sql = "select count(*) from jobs\nwhere\nstate = 'finished' and\nfinished > '\#{ ago }'\n"
394: tuples = execute sql
395: tuple = tuples.first
396: count = (tuple ? Integer(tuple.first || 0) : 0)
397: #stats['performance']["n_jobs_in_last_#{ n }_hrs"] = count
398: stats['performance']["n_jobs_in_last_hrs"][n] = count
399: end
400:
401: #
402: # generate exit_status stats
403: #
404: #stats['exit_status'] = {}
405: sql = "select count(*) from jobs\nwhere\nstate='finished' and\nexit_status=0\n"
406: tuples = execute sql
407: tuple = tuples.first
408: successes = (tuple ? Integer(tuple.first || 0) : 0)
409: stats['exit_status']['successes'] = successes
410:
# failures are finished jobs with a non-zero exit status plus all dead jobs
411: sql = "select count(*) from jobs\nwhere\n(state='finished' and\nexit_status!=0) or\nstate='dead'\n"
412: tuples = execute sql
413: tuple = tuples.first
414: failures = (tuple ? Integer(tuple.first || 0) : 0)
415: stats['exit_status']['failures'] = failures
416:
417: exit_code_map.each do |which, codes|
418: exit_status_clause = codes.map{|code| "exit_status=#{ code }"}.join(' or ')
419: sql = "select count(*) from jobs\nwhere\n(state='finished' and (\#{ exit_status_clause }))\n"
420: tuples = execute sql
421: tuple = tuples.first
422: n = (tuple ? Integer(tuple.first || 0) : 0)
423: stats['exit_status'][which] = n
424: end
425: end
426:
427: stats
428: #--}}}
429: end
# File lib/rq-3.0.0/jobqueue.rb, line 93
# Returns the relative path "stderr/<jid>" used for a job's stderr file.
93: def stderr4 jid
94: #--{{{
95: "stderr/#{ jid }"
96: #--}}}
97: end
# File lib/rq-3.0.0/jobqueue.rb, line 73
# Returns the relative path "stdin/<jid>" used for a job's stdin file.
73: def stdin4 jid
74: #--{{{
75: "stdin/#{ jid }"
76: #--}}}
77: end
# File lib/rq-3.0.0/jobqueue.rb, line 83
# Returns the relative path "stdout/<jid>" used for a job's stdout file.
83: def stdout4 jid
84: #--{{{
85: "stdout/#{ jid }"
86: #--}}}
87: end
# File lib/rq-3.0.0/jobqueue.rb, line 103
# Inserts new jobs into the queue in state 'pending'.
#
# jobs  - hash-like objects; each must carry 'command' and may carry
#         'stdin', 'priority', 'tag', 'runner' and 'restartable'.
# block - when given, the row for each new job is selected and passed to it.
#
# Inside one transaction the next jid is computed as max(jid) + 1 (1 for an
# empty table) and incremented per job. For each job a tuple is inserted,
# any stale stdin/stdout/stderr files for that jid are removed, and the
# captured stdin is copied into place.
#
# Returns self. Raises RuntimeError when a job has no command.
#
# NOTE(review): the tuple's own jid is never set here; the code presumably
# relies on the database assigning the same jid that was computed locally -
# confirm against the schema.
103: def submit(*jobs, &block)
104: #--{{{
105: now = Util::timestamp Time::now
106:
107: transaction do
108: sql = "select max(jid) from jobs"
109: tuple = execute(sql).first
110: jid = tuple.first || 0
111: jid = Integer(jid) + 1
112:
113: jobs.each do |job|
114: command = job['command']
115: stdin = job['stdin']
116:
117: raise "no command for job <#{ job.inspect }>" unless command
118:
119: tmp_stdin(stdin) do |ts|
120: tuple = QDB::tuple
121:
122: tuple['command'] = command
123: tuple['priority'] = job['priority'] || 0
124: tuple['tag'] = job['tag']
125: tuple['runner'] = job['runner']
126: tuple['restartable'] = job['restartable']
127: tuple['state'] = 'pending'
128: tuple['submitted'] = now
129: tuple['submitter'] = Util::hostname
130: tuple['stdin'] = stdin4 jid
131: tuple['stdout'] = nil
132: tuple['stderr'] = nil
133:
134: values = QDB::q tuple
135:
136: sql = "insert into jobs values (#{ values.join ',' });\n"
137: execute(sql){}
138:
139: FileUtils::rm_rf standard_in_4(jid)
140: FileUtils::rm_rf standard_out_4(jid)
141: FileUtils::rm_rf standard_err_4(jid)
142: FileUtils::cp ts.path, standard_in_4(jid) if ts
143:
144: if block
145: sql = "select * from jobs where jid = '#{ jid }'"
146: execute(sql, &block)
147: end
148: end
149:
150: jid += 1
151: end
152: end
153:
154: self
155: #--}}}
156: end
# File lib/rq-3.0.0/jobqueue.rb, line 207
# Copies a job's stdin source into a Tempfile.
#
# stdin - nil or an empty string (nothing to copy, the tempfile stays
#         empty), '-' (read from STDIN), any object responding to 'read', or
#         a path. A path of the form "stdin/<digits>", optionally prefixed
#         with '@', is taken relative to the queue directory; every path is
#         expanded and opened.
#
# With a block, the closed tempfile is yielded, removed afterwards with
# close!, and the block's value is returned. Without a block the closed
# tempfile is returned and removal is left to the caller.
#
# A file opened here is closed again in the ensure clause; errors from that
# close are ignored.
#
# NOTE(review): %|...| appears without a type letter; presumably %r|...| in
# the real source - confirm.
207: def tmp_stdin stdin = nil
208: #--{{{
209: stdin = nil if stdin.to_s.empty?
210: stdin = STDIN if stdin == '-'
211:
212: was_opened = false
213:
214: begin
215: unless stdin.respond_to?('read') or stdin.nil?
216: stdin = stdin.to_s
217: # relative to queue
218: if stdin =~ %|^@?stdin/\d+$|
219: stdin.gsub! %|^@|, ''
220: stdin = File::join(path, stdin)
221: end
222: stdin = File.expand_path stdin
223: stdin = open stdin
224: was_opened = true
225: end
226:
227: tmp = Tempfile::new "#{ Process::pid }_#{ rand }"
# copy in 8192-byte chunks; skipped entirely when there is no stdin
228: while((buf = stdin.read(8192))); tmp.write buf; end if stdin
229: tmp.close
230:
231: if block_given?
232: begin
233: yield tmp
234: ensure
235: tmp.close!
236: end
237: else
238: return tmp
239: end
240: ensure
241: stdin.close if was_opened rescue nil
242: end
243: #--}}}
244: end
# File lib/rq-3.0.0/jobqueue.rb, line 757
# Runs the given block inside a read-write transaction and returns the
# block's value.
#
# Raises RuntimeError when called while a read-only transaction is open on
# this object. When a read-write transaction is already open, the block is
# simply yielded to. Otherwise @in_transaction is set, the block runs under
# @qdb.transaction (which receives +args+), and the flag is cleared again in
# an ensure clause.
757: def transaction(*args)
758: #--{{{
759: raise "cannot upgrade ro_transaction" if @in_ro_transaction
760: ret = nil
761: if @in_transaction
762: ret = yield
763: else
764: begin
765: @in_transaction = true
766: @qdb.transaction(*args){ ret = yield }
767: ensure
768: @in_transaction = false
769: end
770: end
771: ret
772: #--}}}
773: end
# File lib/rq-3.0.0/jobqueue.rb, line 587
# Updates fields (and optionally stdin) of pending or holding jobs.
#
# kvs   - hash of column => new value. 'stdin' is removed from it and
#         handled separately, and a 'state' value is normalised in place, so
#         the caller's hash is modified. Permitted keys are priority,
#         command, tag, runner, restartable, and state (pending or holding
#         only). A nil/false value sets the column to NULL.
# jids  - job ids; the strings 'pending' and 'holding' stand for every job
#         currently in that state.
# block - when given, each affected job is passed to it.
#
# Only rows whose state is 'pending' or 'holding' are touched. When stdin
# was supplied, each affected job's stdin file is overwritten with it.
#
# Returns nil when a block was given, otherwise an Array of the affected
# tuples. Raises RuntimeError for a disallowed key or state, or when no jids
# are given; rollback_transaction is invoked when the expanded jid list is
# empty.
#
# NOTE(review): %( ... ) and %/.../ appear without a type letter; presumably
# %w( ... ) and %r/.../ in the real source - confirm.
# NOTE(review): +ret+ is assigned but never used.
587: def update(kvs, *jids, &block)
588: #--{{{
589: ret = nil
590: #
591: # yank out stdin - which we allow as a key
592: #
593: stdin = kvs.delete 'stdin'
594: #
595: # validate/munge state value iff present
596: #
597: if((state = kvs['state']))
598: case state
599: when %/^p/io
600: kvs['state'] = 'pending'
601: when %/^h/io
602: kvs['state'] = 'holding'
603: else
604: raise "update of <state> = <#{ state }> not allowed (try pending or holding)"
605: end
606: end
607: #
608: # validate kvs pairs
609: #
610: allowed = %( priority command tag runner restartable )
611: kvs.each do |key, val|
612: raise "update of <#{ key }> = <#{ val }> not allowed" unless
613: (allowed.include?(key)) or (key == 'state' and %( pending holding ).include?(val))
614: end
615: #
616: # ensure there are acutally some jobs to update
617: #
618: raise "no jobs to update" if jids.empty?
619: #
620: # generates sql to update jids with kvs and sql to show updated tuples
621: #
622: build_sql =
623: lambda do |kvs, jids|
624: if(jids.delete('pending'))
625: execute("select jid from jobs where state='pending'") do |tuple|
626: jids << tuple['jid']
627: end
628: end
629:
630: if(jids.delete('holding'))
631: execute("select jid from jobs where state='holding'") do |tuple|
632: jids << tuple['jid']
633: end
634: end
635:
636: rollback_transaction "no jobs to update" if jids.empty?
637:
638: update_clause = kvs.map{|k,v| v ? "#{ k }='#{ v }'" : "#{ k }=NULL" }.join(",\n")
639: where_clause = jids.map{|jid| "jid=#{ jid }"}.join(" or\n")
640: update_sql =
641: "update jobs\n" <<
642: "set\n#{ update_clause }\n" <<
643: "where\n(state='pending' or state='holding') and\n(#{ where_clause })"
644: select_sql = "select * from jobs where (state='pending' or state='holding') and\n(#{ where_clause })"
645:
# with nothing left in kvs (e.g. only stdin was given) no update is issued
646: if kvs.empty?
647: [ nil, select_sql ]
648: else
649: [ update_sql, select_sql ]
650: end
651: end
652: #
653: # setup stdin
654: #
655: tmp_stdin(stdin) do |ts|
656: clobber_stdin = lambda{|job| FileUtils::cp ts.path, standard_in_4(job['jid']) if ts}
657:
658: tuples = []
659:
660: metablock =
661: if block
662: lambda{|job| clobber_stdin[job]; block[job]}
663: else
664: lambda{|job| clobber_stdin[job]; tuples << job}
665: end
666:
667: transaction do
668: update_sql, select_sql = build_sql[kvs, jids]
669: break unless select_sql
670: execute(update_sql){} if update_sql
671: execute(select_sql, &metablock)
672: end
673:
# this is the value of the tmp_stdin block, and so of the whole method
674: block ? nil : tuples
675: end
676: #--}}}
677: end