RQ::JobQueue (Class)

In: lib/rq-3.0.0/jobqueue.rb
Parent: Object
MainHelper StatusLister Snapshotter ReSubmitter Feeder Deleter Toucher Relayer Executor Submitter Locker IOViewer Backer Cron Configurator Lister Rotater Creator Recoverer Updater Querier ::Hash ConfigFile DRbUndumped JobRunner Main QDB JobQueue JobRunnerDaemon Array SleepCycle Job ArrayFields ::OrderedHash OrderedAutoHash LogMethods Refresher ResourceManager Resource lib/rq-3.0.0/refresher.rb lib/rq-3.0.0/snapshotter.rb lib/rq-3.0.0/deleter.rb lib/rq-3.0.0/feeder.rb lib/rq-3.0.0/configurator.rb lib/rq-3.0.0/cron.rb lib/rq-3.0.0/jobqueue.rb lib/rq-3.0.0/rotater.rb lib/rq-3.0.0/backer.rb lib/rq-3.0.0/toucher.rb lib/rq-3.0.0/qdb.rb lib/rq-3.0.0/configfile.rb lib/rq-3.0.0/mainhelper.rb lib/rq-3.0.0/lister.rb bin/rq.rb lib/rq-3.0.0/statuslister.rb lib/rq-3.0.0/updater.rb lib/rq-3.0.0/jobrunner.rb lib/rq-3.0.0/job.rb lib/rq-3.0.0/creator.rb lib/rq-3.0.0/sleepcycle.rb lib/rq-3.0.0/executor.rb lib/rq-3.0.0/resubmitter.rb lib/rq-3.0.0/orderedautohash.rb lib/rq-3.0.0/resourcemanager.rb lib/rq-3.0.0/resource.rb lib/rq-3.0.0/jobrunnerdaemon.rb lib/rq-3.0.0/recoverer.rb lib/rq-3.0.0/querier.rb lib/rq-3.0.0/ioviewer.rb lib/rq-3.0.0/locker.rb lib/rq-3.0.0/submitter.rb lib/rq-3.0.0/relayer.rb Usage Util LogClassMethods LoggerExt LogMethods Logging RQ Module: RQ

The JobQueue class is responsible for high-level access to the job queue.

Constants

MAX_JID = 2 ** 20

External Aliases

qdb -> db

Attributes

bin  [R] 
opts  [R] 
path  [R] 
qdb  [R] 
stderr  [R] 
stdin  [R] 
stdout  [R] 

Classes and Modules

Class RQ::JobQueue::Error

Included Modules

Logging Util

Public Class methods

[Source]

    # File lib/rq-3.0.0/jobqueue.rb, line 28
28:         def create path, opts = {}
29: #--{{{
30:           FileUtils::rm_rf path
31:           FileUtils::mkdir_p path
32:           db = File::join path, 'db'
33:           qdb = QDB.create db, opts
34:           opts['qdb'] = qdb
35:           q = new path, opts
36:           FileUtils::mkdir_p q.bin
37:           FileUtils::mkdir_p q.stdin 
38:           FileUtils::mkdir_p q.stdout
39:           FileUtils::mkdir_p q.stderr
40:           q
41: #--}}}
42:         end

[Source]

    # File lib/rq-3.0.0/jobqueue.rb, line 55
55:       def initialize path, opts = {}
56: #--{{{
57:         @path = path # do NOT expand this or it'll be fubar from misc nfs mounts!!
58:         @bin = File::join @path, 'bin' 
59:         @stdin = File::join @path, 'stdin' 
60:         @stdout = File::join @path, 'stdout' 
61:         @stderr = File::join @path, 'stderr' 
62:         @opts = opts
63:         raise "q <#{ @path }> does not exist" unless test ?e, @path
64:         raise "q <#{ @path }> is not a directory" unless test ?d, @path
65:         @basename = File::basename(@path)
66:         @dirname = File::dirname(@path)
67:         @logger = getopt('logger', opts) || Logger::new(STDERR)
68:         @qdb = getopt('qdb', opts) || QDB::new(File::join(@path, 'db'), 'logger' => @logger)
69:         @in_transaction = false
70:         @in_ro_transaction = false
71: #--}}}
72:       end

Public Instance methods

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 882
882:       def []= key, value
883: #--{{{
884:         sql = "select count(*) from attributes where key='#{ key }';"
885:         tuples = @qdb.execute sql
886:         tuple = tuples.first
887:         count = Integer tuple['count(*)']
888:         case count
889:           when 0
890:             sql = "insert into attributes values('#{ key }','#{ value }');"
891:             @qdb.execute sql
892:           when 1
893:             sql = "update attributes set key='#{ key }', value='#{ value }' where key='#{ key }';"
894:             @qdb.execute sql
895:           else
896:             raise "key <#{ key }> has become corrupt!"
897:         end
898: #--}}}
899:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 810
810:       def abort_transaction(*a,&b)
811: #--{{{
812:         @qdb.abort_transaction(*a,&b)
813: #--}}}
814:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 900
900:       def attributes 
901: #--{{{
902:         h = {}
903:         tuples = @qdb.execute "select * from attributes;"
904:         tuples.map!{|t| h[t['key']] = t['value']}
905:         h
906: #--}}}
907:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 505
505:       def delete(*args, &block)
506: #--{{{
507:         whats, optargs = args.partition{|arg| not Hash === arg}
508: 
509:         opts = {}
510:         optargs.each{|oa| opts.update oa}
511: 
512:         force = Util::getopt 'force', opts
513: 
514:         delete_sql, select_sql = '', ''
515: 
516:         whats << 'all' if whats.empty?
517: 
518:         whats.each do |what|
519:           case "#{ what }"
520:             when %/^\s*\d+\s*$/io # number
521:               delete_sql << "delete from jobs where jid=#{ what } and state!='running';\n"
522:               select_sql << "select * from jobs where jid=#{ what } and state!='running';\n"
523:             when %/^\s*p/io # pending
524:               delete_sql << "delete from jobs where state='pending';\n"
525:               select_sql << "select * from jobs where state='pending';\n"
526:             when %/^\s*h/io # holding
527:               delete_sql << "delete from jobs where state='holding';\n"
528:               select_sql << "select * from jobs where state='holding';\n"
529:             when %/^\s*r/io # running
530:               delete_sql << "delete from jobs where state='running';\n" if force
531:               select_sql << "select * from jobs where state='running';\n" if force
532:             when %/^\s*f/io # finished
533:               delete_sql << "delete from jobs where state='finished';\n"
534:               select_sql << "select * from jobs where state='finished';\n"
535:             when %/^\s*d/io # dead
536:               delete_sql << "delete from jobs where state='dead';\n"
537:               select_sql << "select * from jobs where state='dead';\n"
538:             when %/^\s*a/io # all
539:               delete_sql << "delete from jobs where state!='running';\n"
540:               select_sql << "select * from jobs where state!='running';\n"
541:             else
542:               raise ArgumentError, "cannot delete <#{ what.inspect }>"
543:           end
544:         end
545: 
546:         scrub = lambda do |jid|
547:           [standard_in_4(jid), standard_out_4(jid), standard_err_4(jid)].each do |path| 
548:             FileUtils::rm_f path
549:           end
550:         end
551: 
552:         tuples = []
553: 
554:         metablock = 
555:           if block
556:             lambda do |tuple|
557:               jid = tuple['jid']
558:               block[tuple]
559:               scrub[jid]
560:             end
561:           else
562:             lambda do |tuple|
563:               jid = tuple['jid']
564:               scrub[jid]
565:               tuples << tuple
566:             end
567:           end
568: 
569: # TODO - make file deletion transactional too
570: 
571:         transaction do
572:           execute(select_sql, &metablock)
573:           execute(delete_sql){}
574:         end
575: 
576:         delete_sql = nil
577:         select_sql = nil
578: 
579:         block ? nil : tuples
580: #--}}}
581:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 790
790:       def execute(*args, &block)
791: #--{{{
792:         @qdb.execute(*args, &block)
793: #--}}}
794:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 727
727:       def getdeadjobs(started, &block)
728: #--{{{
729:         ret = nil
730:         sql = "select * from jobs\nwhere\nstate = 'running' and\nrunner='\#{ Util::hostname }' and\nstarted<='\#{ started }'\n"
731:         if block
732:           execute(sql, &block)
733:         else
734:           ret = execute(sql)
735:         end
736:         ret
737: #--}}}
738:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 679
679:       def getjob
680: #--{{{
681:         sql = "select * from jobs\nwhere\n(state='pending' or (state='dead' and (not restartable isnull))) and\n(runner like '%\#{ Util::host }%' or runner isnull)\norder by priority desc, submitted asc, jid asc\nlimit 1;\n"
682:         tuples = execute sql
683:         job = tuples.first
684:         job
685: #--}}}
686:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 795
795:       def integrity_check(*args, &block)
796: #--{{{
797:         @qdb.integrity_check(*args, &block)
798: #--}}}
799:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 746
746:       def jobisdead job
747: #--{{{
748:         jid = job['jid']
749:         if jid
750:           sql = "update jobs set state='dead' where jid='#{ jid }'"
751:           execute(sql){}
752:         end
753:         job
754: #--}}}
755:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 712
712:       def jobisdone job
713: #--{{{
714:         sql = "update jobs\nset\nstate = '\#{ job['state'] }',\nexit_status = '\#{ job['exit_status'] }',\nfinished = '\#{ job['finished'] }',\nelapsed = '\#{ job['elapsed'] }'\nwhere jid = \#{ job['jid'] };\n"
715:         execute sql
716: #--}}}
717:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 695
695:       def jobisrunning job 
696: #--{{{
697:         sql = "update jobs\nset\npid='\#{ job['pid'] }',\nstate='\#{ job['state'] }',\nstarted='\#{ job['started'] }',\nrunner='\#{ job['runner'] }',\nstdout='\#{ job['stdout'] }',\nstderr='\#{ job['stderr'] }'\nwhere jid=\#{ job['jid'] };\n"
698:         execute sql
699: #--}}}
700:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 245
245:       def list(*whats, &block)
246: #--{{{
247:         ret = nil
248: 
249:         whats.replace(%( pending running finished dead )) if 
250:           whats.empty? or whats.include?('all')
251:     
252:         whats.map! do |what|
253:           case what
254:             when %/^\s*p/io
255:               'pending'
256:             when %/^\s*h/io
257:               'holding'
258:             when %/^\s*r/io
259:               'running'
260:             when %/^\s*f/io
261:               'finished'
262:             when %/^\s*d/io
263:               'dead'
264:             else
265:               what
266:           end
267:         end
268: 
269:         where_clauses = [] 
270: 
271:         whats.each do |what|
272:           case what
273:             when Numeric
274:               where_clauses << "jid=#{ what }\n"
275:             else
276:               what = "#{ what }"
277:               if what.to_s =~ %/^\s*\d+\s*$/o
278:                 where_clauses << "jid=#{ QDB::q what }\n"
279:               else
280:                 where_clauses << "state=#{ QDB::q what }\n"
281:               end
282:           end
283:         end
284: 
285:         where_clause = where_clauses.join(" or \n")
286: 
287:         sql = "select * from jobs\nwhere \#{ where_clause }\n"
288: 
289:         if block
290:           ro_transaction{ execute(sql, &block) }
291:         else
292:           ret = ro_transaction{ execute(sql) }
293:         end
294: 
295:         ret
296: #--}}}
297:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 805
805:       def lock(*args, &block)
806: #--{{{
807:         @qdb.lock(*args, &block)
808: #--}}}
809:       end

TODO: use mtime to optimize the checks done by the feeder?

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 877
877:       def mtime
878: #--{{{
879:         File::stat(@path).mtime
880: #--}}}
881:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 475
475:       def query(where_clause = nil, &block)
476: #--{{{
477:         ret = nil
478: 
479:         sql = 
480:           if where_clause
481: 
482:           #
483:           # turn =~ into like clauses 
484:           #
485:             #where_clause.gsub!(/(=~\s*([^\s')(=]+))/om){q = $2.gsub(%r/'+|\s+/o,''); "like '%#{ q }%'"}
486:           #
487:           # quote everything on the rhs of an '=' sign - helps with shell problems...
488:           #
489:             #where_clause.gsub!(/(==?\s*([^\s')(=]+))/om){q = $2.gsub(%r/'+|\s+/o,''); "='#{ q }'"}
490: 
491:             "select * from jobs where #{ where_clause };"
492:           else
493:             "select * from jobs;"
494:           end
495: 
496:         if block
497:           ro_transaction{ execute(sql, &block) }
498:         else
499:           ret = ro_transaction{ execute(sql) }
500:         end
501: 
502:         ret
503: #--}}}
504:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 800
800:       def recover!(*args, &block)
801: #--{{{
802:         @qdb.recover!(*args, &block)
803: #--}}}
804:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 157
157:       def resubmit(*jobs, &block)
158: #--{{{
159:         now = Util::timestamp Time::now
160: 
161:         transaction do
162:           jobs.each do |job|
163:             jid = Integer job['jid']
164:             command = job['command']
165:             stdin = job['stdin']
166: 
167:             raise "no jid for job <#{ job.inspect }>" unless jid 
168:             raise "no command for job <#{ job.inspect }>" unless command 
169: 
170:             tmp_stdin(stdin) do |ts|
171:               tuple = QDB::tuple
172: 
173:               tuple['jid']         = jid
174:               tuple['command']     = command
175:               tuple['priority']    = job['priority'] || 0
176:               tuple['tag']         = job['tag']
177:               tuple['runner']      = job['runner']
178:               tuple['restartable'] = job['restartable']
179:               tuple['state']       = 'pending'
180:               tuple['submitted']   = now
181:               tuple['submitter']   = Util::hostname
182:               tuple['stdin']       = stdin4 jid
183:               tuple['stdout']      = nil
184:               tuple['stderr']      = nil
185: 
186:               kvs = tuple.fields[1..-1].map{|f| "#{ f }=#{ QDB::q(tuple[ f ]) }"}
187:               sql = "update jobs set #{ kvs.join ',' } where jid=#{ jid };\n"
188: 
189:               execute(sql){}
190: 
191:               FileUtils::rm_rf standard_in_4(jid)
192:               FileUtils::rm_rf standard_out_4(jid)
193:               FileUtils::rm_rf standard_err_4(jid)
194:               FileUtils::cp ts.path, standard_in_4(jid) if ts
195: 
196:               if block
197:                 sql = "select * from jobs where jid = '#{ jid }'"
198:                 execute(sql, &block)
199:               end
200:             end # tmp_stdin
201:           end # jobs.each
202:         end # transaction
203:     
204:         self
205: #--}}}
206:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 774
774:       def ro_transaction(*args)
775: #--{{{
776:         ret = nil
777:         if @in_ro_transaction || @in_transaction
778:           ret = yield
779:         else
780:           begin
781:             @in_ro_transaction = true
782:             @qdb.ro_transaction(*args){ ret = yield }
783:           ensure
784:             @in_ro_transaction = false 
785:           end
786:         end
787:         ret
788: #--}}}
789:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 815
815:       def rollback_transaction(*a,&b)
816: #--{{{
817:         @qdb.rollback_transaction(*a,&b)
818: #--}}}
819:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 821
821:       def snapshot qtmp = "#{ @basename }.snapshot", retries = nil 
822: #--{{{
823:         qtmp ||= "#{ @basename }.snapshot"
824:         debug{ "snapshot <#{ @path }> -> <#{ qtmp }>" }
825:         retries = Integer(retries || 16)
826:         debug{ "retries <#{ retries }>" }
827: 
828:         qss = nil
829:         loopno = 0
830: 
831:         take_snapshot = lambda do
832:           FileUtils::rm_rf qtmp
833:           FileUtils::mkdir_p qtmp
834:           %(db db.schema lock).each do |base|
835:             src, dest = File::join(@path, base), File::join(qtmp, base)
836:             debug{ "cp <#{ src }> -> <#{ dest }>" }
837:             FileUtils::cp(src, dest)
838:           end
839:           ss = klass::new qtmp, @opts
840:           if ss.integrity_check
841:             ss
842:           else
843:             begin; recover! unless integrity_check; rescue; nil; end
844:             ss.recover!
845:           end
846:         end
847: 
848:         loop do
849:           break if loopno >= retries
850:           if((ss = take_snapshot.call))
851:             debug{ "snapshot <#{ qtmp }> created" }
852:             qss = ss
853:             break
854:           else
855:             debug{ "failure <#{ loopno + 1}> of <#{ retries }> attempts to create snapshot <#{ qtmp }> - retrying..." }
856:           end
857:           loopno += 1
858:         end
859: 
860:         unless qss
861:           debug{ "locking <#{ @path }> as last resort" }
862:           @qdb.write_lock do
863:             if((ss = take_snapshot.call))
864:               debug{ "snapshot <#{ qtmp }> created" }
865:               qss = ss
866:             else
867:               raise "failed <#{ loopno }> times to create snapshot <#{ qtmp }>"
868:             end
869:           end
870:         end
871: 
872:         qss
873: #--}}}
874:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 98
 98:       def standard_err_4 jid
 99: #--{{{
100:         File::expand_path(File::join(path, stderr4(jid)))
101: #--}}}
102:       end

[Source]

    # File lib/rq-3.0.0/jobqueue.rb, line 78
78:       def standard_in_4 jid
79: #--{{{
80:         File::expand_path(File::join(path, stdin4(jid)))
81: #--}}}
82:       end

[Source]

    # File lib/rq-3.0.0/jobqueue.rb, line 88
88:       def standard_out_4 jid
89: #--{{{
90:         File::expand_path(File::join(path, stdout4(jid)))
91: #--}}}
92:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 302
302:       def status options = {}
303: #--{{{
304:         stats = OrderedAutoHash::new
305: 
306:         now = Time::now
307: 
308:         hms = lambda do |t|
309:           elapsed =
310:             begin
311:               Float t
312:             rescue
313:               now - Util::stamptime(t, 'local' => true)
314:             end
315:           sh, sm, ss = Util::hms elapsed.to_f
316:           s = "#{ '%2.2d' % sh }h#{ '%2.2d' % sm }m#{ '%05.2f' % ss }s" 
317:         end
318: 
319:         exit_code_map = 
320:           options[:exit_code_map] || options['exit_code_map'] || {}
321: 
322:         ro_transaction do
323:         #
324:         # jobs stats
325:         #
326:           total = 0
327:           %( pending holding running finished dead ).each do |state|
328:             sql = "select count(*) from jobs\nwhere\nstate='\#{ state }'\n"
329:             tuples = execute sql 
330:             tuple = tuples.first
331:             count = (tuple ? Integer(tuple.first || 0) : 0)
332:             stats['jobs'][state] = count
333:             total += count
334:           end
335:           stats['jobs']['total'] = total
336:         #
337:         # temporal stats 
338:         #
339:           metrics = OrderedAutoHash::new
340:           metrics['pending']  = 'submitted'
341:           metrics['holding']  = 'submitted'
342:           metrics['running']  = 'started'
343:           metrics['finished'] = 'elapsed'
344:           metrics['dead']     = 'elapsed'
345: 
346:           metrics.each do |state, metric|
347:             sql = 
348:               unless metric == 'elapsed'
349:                 "select min(\#{ metric }) as max, max(\#{ metric }) as min\nfrom jobs where state='\#{ state }'\n"
350:               else
351:                 "select min(\#{ metric }) as min, max(\#{ metric }) as max\nfrom jobs where state='\#{ state }'\n"
352:               end
353:             tuple = execute(sql).first
354:             next unless tuple
355: 
356:             %( min max ).each do |time|
357:               oh = nil
358:               t = tuple[time]
359:               if t
360:                 sql = "select jid from jobs where \#{ metric }='\#{ t }' and state='\#{ state }'\n"
361:                 which = execute(sql).first
362:                 jid = (which and which['jid']).to_i
363:                 if jid
364:                   oh = OrderedAutoHash::new
365:                   oh[jid] = hms[t]
366:                   oh.yaml_inline = true
367:                 end
368:                 stats['temporal'][state][time] = oh 
369:               end
370:             end
371:             #stats['temporal'][state] ||= nil
372:           end
373:           stats['temporal'] ||= nil
374:         #
375:         # generate performance stats
376:         #
377:           sql = "select avg(elapsed) from jobs\nwhere\nstate='finished'\n"
378:           tuples = execute sql 
379:           tuple = tuples.first
380:           avg = (tuple ? Float(tuple.first || 0) : 0)
381:           stats['performance']['avg_time_per_job'] = hms[avg] 
382: 
383:           list = []
384:           0.step(5){|i| list << (2 ** i)}
385:           list << 24
386:           list.sort!
387: 
388:           list = 1, 12, 24
389: 
390:           list.each do |n|
391:             ago = now - (n * 3600)
392:             ago = Util::timestamp ago
393:             sql = "select count(*) from jobs\nwhere\nstate = 'finished' and\nfinished  > '\#{ ago }'\n"
394:             tuples = execute sql 
395:             tuple = tuples.first
396:             count = (tuple ? Integer(tuple.first || 0) : 0)
397:             #stats['performance']["n_jobs_in_last_#{ n }_hrs"] = count
398:             stats['performance']["n_jobs_in_last_hrs"][n] = count
399:           end
400: 
401:         #
402:         # generate exit_status stats
403:         #
404:           #stats['exit_status'] = {}
405:           sql = "select count(*) from jobs\nwhere\nstate='finished' and\nexit_status=0\n"
406:           tuples = execute sql 
407:           tuple = tuples.first
408:           successes = (tuple ? Integer(tuple.first || 0) : 0)
409:           stats['exit_status']['successes'] = successes
410: 
411:           sql = "select count(*) from jobs\nwhere\n(state='finished' and\nexit_status!=0) or\nstate='dead'\n"
412:           tuples = execute sql 
413:           tuple = tuples.first
414:           failures = (tuple ? Integer(tuple.first || 0) : 0)
415:           stats['exit_status']['failures'] = failures
416: 
417:           exit_code_map.each do |which, codes|
418:             exit_status_clause = codes.map{|code| "exit_status=#{ code }"}.join(' or ')
419:             sql = "select count(*) from jobs\nwhere\n(state='finished' and (\#{ exit_status_clause }))\n"
420:             tuples = execute sql 
421:             tuple = tuples.first
422:             n = (tuple ? Integer(tuple.first || 0) : 0)
423:             stats['exit_status'][which] = n
424:           end
425:         end
426: 
427:         stats
428: #--}}}
429:       end

[Source]

    # File lib/rq-3.0.0/jobqueue.rb, line 93
93:       def stderr4 jid
94: #--{{{
95:         "stderr/#{ jid }"
96: #--}}}
97:       end

[Source]

    # File lib/rq-3.0.0/jobqueue.rb, line 73
73:       def stdin4 jid
74: #--{{{
75:         "stdin/#{ jid }"
76: #--}}}
77:       end

[Source]

    # File lib/rq-3.0.0/jobqueue.rb, line 83
83:       def stdout4 jid
84: #--{{{
85:         "stdout/#{ jid }"
86: #--}}}
87:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 103
103:       def submit(*jobs, &block)
104: #--{{{
105:         now = Util::timestamp Time::now
106:     
107:         transaction do
108:           sql = "select max(jid) from jobs"
109:           tuple = execute(sql).first
110:           jid = tuple.first || 0
111:           jid = Integer(jid) + 1
112: 
113:           jobs.each do |job|
114:             command = job['command']
115:             stdin = job['stdin']
116: 
117:             raise "no command for job <#{ job.inspect }>" unless command 
118: 
119:             tmp_stdin(stdin) do |ts|
120:               tuple = QDB::tuple
121: 
122:               tuple['command']     = command 
123:               tuple['priority']    = job['priority'] || 0
124:               tuple['tag']         = job['tag']
125:               tuple['runner']      = job['runner']
126:               tuple['restartable'] = job['restartable']
127:               tuple['state']       = 'pending'
128:               tuple['submitted']   = now
129:               tuple['submitter']   = Util::hostname
130:               tuple['stdin']       = stdin4 jid
131:               tuple['stdout']      = nil 
132:               tuple['stderr']      = nil 
133: 
134:               values = QDB::q tuple
135: 
136:               sql = "insert into jobs values (#{ values.join ',' });\n"
137:               execute(sql){}
138: 
139:               FileUtils::rm_rf standard_in_4(jid)
140:               FileUtils::rm_rf standard_out_4(jid)
141:               FileUtils::rm_rf standard_err_4(jid)
142:               FileUtils::cp ts.path, standard_in_4(jid) if ts
143: 
144:               if block
145:                 sql = "select * from jobs where jid = '#{ jid }'"
146:                 execute(sql, &block)
147:               end
148:             end
149: 
150:             jid += 1
151:           end
152:         end
153:     
154:         self
155: #--}}}
156:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 207
207:       def tmp_stdin stdin = nil
208: #--{{{
209:         stdin = nil if stdin.to_s.empty?
210:         stdin = STDIN if stdin == '-'
211: 
212:         was_opened = false
213: 
214:         begin
215:           unless stdin.respond_to?('read') or stdin.nil?
216:             stdin = stdin.to_s
217:             # relative to queue
218:             if stdin =~ %|^@?stdin/\d+$|
219:               stdin.gsub! %|^@|, ''
220:               stdin = File::join(path, stdin)
221:             end
222:             stdin = File.expand_path stdin
223:             stdin = open stdin
224:             was_opened = true
225:           end
226: 
227:           tmp = Tempfile::new "#{ Process::pid }_#{ rand }"
228:           while((buf = stdin.read(8192))); tmp.write buf; end if stdin
229:           tmp.close
230: 
231:           if block_given?
232:             begin
233:               yield tmp
234:             ensure
235:               tmp.close!
236:             end
237:           else
238:             return tmp
239:           end
240:         ensure
241:           stdin.close if was_opened rescue nil
242:         end
243: #--}}}
244:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 757
757:       def transaction(*args)
758: #--{{{
759:         raise "cannot upgrade ro_transaction" if @in_ro_transaction
760:         ret = nil
761:         if @in_transaction
762:           ret = yield
763:         else
764:           begin
765:             @in_transaction = true
766:             @qdb.transaction(*args){ ret = yield }
767:           ensure
768:             @in_transaction = false 
769:           end
770:         end
771:         ret
772: #--}}}
773:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 587
587:       def update(kvs, *jids, &block)
588: #--{{{
589:         ret = nil
590:       #
591:       # yank out stdin - which we allow as a key
592:       #
593:         stdin = kvs.delete 'stdin'
594:       #
595:       # validate/munge state value iff present
596:       #
597:         if((state = kvs['state']))
598:           case state
599:             when %/^p/io
600:               kvs['state'] = 'pending'
601:             when %/^h/io
602:               kvs['state'] = 'holding'
603:             else
604:               raise "update of <state> = <#{ state }> not allowed (try pending or holding)"
605:           end
606:         end
607:       #
608:       # validate kvs pairs
609:       #
610:         allowed = %( priority command tag runner restartable )
611:         kvs.each do |key, val|
612:           raise "update of <#{ key }> = <#{ val }> not allowed" unless
613:             (allowed.include?(key)) or (key == 'state' and %( pending holding ).include?(val))
614:         end
615:       #
616:       # ensure there are acutally some jobs to update
617:       #
618:         raise "no jobs to update" if jids.empty?
619:       #
620:       # generates sql to update jids with kvs and sql to show updated tuples
621:       #
622:         build_sql = 
623:           lambda do |kvs, jids|
624:             if(jids.delete('pending'))
625:               execute("select jid from jobs where state='pending'") do |tuple| 
626:                 jids << tuple['jid']
627:               end
628:             end
629: 
630:             if(jids.delete('holding'))
631:               execute("select jid from jobs where state='holding'") do |tuple| 
632:                 jids << tuple['jid']
633:               end
634:             end
635: 
636:             rollback_transaction "no jobs to update" if jids.empty?
637: 
638:             update_clause = kvs.map{|k,v| v ? "#{ k }='#{ v }'" : "#{ k }=NULL" }.join(",\n")
639:             where_clause = jids.map{|jid| "jid=#{ jid }"}.join(" or\n")
640:             update_sql = 
641:               "update jobs\n" <<
642:               "set\n#{ update_clause }\n" <<
643:               "where\n(state='pending' or state='holding') and\n(#{ where_clause })"
644:             select_sql = "select * from jobs where (state='pending' or state='holding') and\n(#{ where_clause })"
645: 
646:             if kvs.empty?
647:               [ nil, select_sql ]
648:             else
649:               [ update_sql, select_sql ]
650:             end
651:           end
652:         #
653:         # setup stdin
654:         #
655:         tmp_stdin(stdin) do |ts|
656:           clobber_stdin = lambda{|job| FileUtils::cp ts.path, standard_in_4(job['jid']) if ts}
657: 
658:           tuples = []
659: 
660:           metablock = 
661:             if block
662:               lambda{|job| clobber_stdin[job]; block[job]}
663:             else
664:               lambda{|job| clobber_stdin[job]; tuples << job}
665:             end
666: 
667:           transaction do 
668:             update_sql, select_sql = build_sql[kvs, jids]
669:             break unless select_sql
670:             execute(update_sql){} if update_sql
671:             execute(select_sql, &metablock)
672:           end
673: 
674:           block ? nil : tuples
675:         end
676: #--}}}
677:       end

[Source]

     # File lib/rq-3.0.0/jobqueue.rb, line 582
582:       def vacuum
583: #--{{{
584:         @qdb.vacuum
585: #--}}}
586:       end

[Validate]