RQ::QDB (Class)

In: lib/rq-2.3.1/qdb.rb
Parent: Object
MainHelper StatusLister Snapshotter ReSubmitter Feeder Deleter Relayer Executor Submitter Locker Backer Configurator Lister Rotater Creator Updater Querier ::Hash ConfigFile DRbUndumped JobRunner Main QDB JobQueue JobRunnerDaemon Array SleepCycle Job ArrayFields ::OrderedHash OrderedAutoHash LogMethods Refresher lib/rq-2.3.1/rotater.rb lib/rq-2.3.1/sleepcycle.rb lib/rq-2.3.1/statuslister.rb lib/rq-2.3.1/updater.rb lib/rq-2.3.1/relayer.rb lib/rq-2.3.1/creator.rb lib/rq-2.3.1/lister.rb lib/rq-2.3.1/snapshotter.rb bin/rq.rb lib/rq-2.3.1/resubmitter.rb lib/rq-2.3.1/orderedautohash.rb lib/rq-2.3.1/locker.rb lib/rq-2.3.1/querier.rb lib/rq-2.3.1/submitter.rb lib/rq-2.3.1/configfile.rb lib/rq-2.3.1/executor.rb lib/rq-2.3.1/refresher.rb lib/rq-2.3.1/job.rb lib/rq-2.3.1/mainhelper.rb lib/rq-2.3.1/qdb.rb lib/rq-2.3.1/feeder.rb lib/rq-2.3.1/configurator.rb lib/rq-2.3.1/deleter.rb lib/rq-2.3.1/jobqueue.rb lib/rq-2.3.1/jobrunnerdaemon.rb lib/rq-2.3.1/backer.rb lib/rq-2.3.1/jobrunner.rb Usage Util LogClassMethods LoggerExt LogMethods Logging RQ Module: RQ

the QDB class is the low level access point to the actual sqlite database. the primary function if performs is to serialize access to the queue db via the locking protocol

Constants

FIELDS = #--{{{ %w( jid priority state submitted started finished elapsed submitter runner pid exit_status tag restartable command )
PRAGMAS = #--{{{ <<-sql PRAGMA default_synchronous = FULL; sql
SCHEMA = #--{{{ <<-sql create table jobs ( jid integer primary key, #{ FIELDS[1..-1].join ",\n " } ); create table attributes ( key, value, primary key (key) ); sql
DEFAULT_LOGGER = Logger::new(STDERR)
DEFAULT_SQL_DEBUG = false
DEFAULT_TRANSACTION_RETRIES = 4
DEFAULT_AQUIRE_LOCK_SC = SleepCycle::new(2, 16, 2)
DEFAULT_TRANSACTION_RETRIES_SC = SleepCycle::new(8, 24, 8)
DEFAULT_ATTEMPT_LOCKD_RECOVERY = true
DEFAULT_LOCKD_RECOVER_WAIT = 1800
DEFAULT_AQUIRE_LOCK_LOCKFILE_STALE_AGE = 1800
DEFAULT_AQUIRE_LOCK_REFRESH_RATE = 8

Attributes

aquire_lock_lockfile_stale_age  [RW] 
aquire_lock_lockfile_stale_age  [RW] 
aquire_lock_refresh_rate  [RW] 
aquire_lock_refresh_rate  [RW] 
aquire_lock_sc  [RW] 
aquire_lock_sc  [RW] 
attempt_lockd_recovery  [RW] 
attempt_lockd_recovery  [RW] 
dirname  [R] 
fields  [R] 
lockd_recover_wait  [RW] 
lockd_recover_wait  [RW] 
lockfile  [R] 
mutex  [R] 
opts  [R] 
path  [R] 
schema  [R] 
sql_debug  [RW] 
sql_debug  [RW] 
transaction_retries  [RW] 
transaction_retries  [RW] 
transaction_retries_sc  [RW] 
transaction_retries_sc  [RW] 

Included Modules

Util Logging

Public Class methods

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 139
139:         def create path, opts = {}
140: #--{{{
141:           qdb = new path, opts
142:           FileUtils::touch qdb.lockfile
143:           create_schema qdb.schema
144:           qdb.transaction do 
145:             qdb.execute PRAGMAS
146:             qdb.execute SCHEMA
147:           end
148:           qdb
149: #--}}}
150:         end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 151
151:         def create_schema path
152: #--{{{
153:           tmp = "#{ path }.tmp"
154:           open(tmp,'w') do |f| 
155:             f.puts PRAGMAS 
156:             f.puts SCHEMA
157:           end
158:           FileUtils::mv tmp, path
159: #--}}}
160:         end

[Source]

    # File lib/rq-2.3.1/qdb.rb, line 85
85:         def fields
86: #--{{{
87:           FIELDS
88: #--}}}
89:         end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 114
114:         def h2t h
115: #--{{{
116:           t = tuple
117:           FIELDS.each{|f| t[f] = h[f]}
118:           t
119: #--}}}
120:         end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 90
 90:         def integrity_check dbpath
 91: #--{{{
 92:           ret = false 
 93:           tuple = nil
 94:           begin
 95:             db = SQLite::Database::new dbpath, 0
 96:             opened = true
 97:             db.use_array = true
 98:             tuple = db.execute 'PRAGMA integrity_check;'
 99:             ret = (tuple and tuple.first and (tuple.first["integrity_check"] =~ /^\s*ok\s*$/io))
100:           ensure
101:             db.close if opened
102:             db = nil
103:           end
104:           ret
105: #--}}}
106:         end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 181
181:       def initialize path, opts = {}
182: #--{{{
183:         @path = path
184:         @opts = opts
185: 
186:         @logger = 
187:           Util::getopt('logger', @opts) || 
188:           klass.logger || 
189:           DEFAULT_LOGGER
190: 
191:         @sql_debug = 
192:           Util::getopt('sql_debug', @opts) || 
193:           klass.sql_debug || 
194:           ENV['RQ_SQL_DEBUG'] || 
195:           DEFAULT_SQL_DEBUG
196: 
197:         @transaction_retries = 
198:           Util::getopt('transaction_retries', @opts) || 
199:           klass.transaction_retries ||
200:           DEFAULT_TRANSACTION_RETRIES
201: 
202:         @aquire_lock_sc =
203:           Util::getopt('aquire_lock_sc', @opts) ||
204:           klass.aquire_lock_sc ||
205:           DEFAULT_AQUIRE_LOCK_SC
206: 
207:         @transaction_retries_sc = 
208:           Util::getopt('transaction_retries_sc', @opts) ||
209:           klass.transaction_retries_sc ||
210:           DEFAULT_TRANSACTION_RETRIES_SC
211: 
212:         @attempt_lockd_recovery = 
213:           Util::getopt('attempt_lockd_recovery', @opts) ||
214:           klass.attempt_lockd_recovery ||
215:           DEFAULT_ATTEMPT_LOCKD_RECOVERY
216: 
217:         @lockd_recover_wait = 
218:           Util::getopt('lockd_recover_wait', @opts) ||
219:           klass.lockd_recover_wait ||
220:           DEFAULT_LOCKD_RECOVER_WAIT
221: 
222:         @aquire_lock_lockfile_stale_age = 
223:           Util::getopt('aquire_lock_lockfile_stale_age', @opts) ||
224:           klass.aquire_lock_lockfile_stale_age ||
225:           DEFAULT_AQUIRE_LOCK_LOCKFILE_STALE_AGE
226: 
227:         @aquire_lock_refresh_rate = 
228:           Util::getopt('aquire_lock_refresh_rate', @opts) ||
229:           klass.aquire_lock_refresh_rate ||
230:           DEFAULT_AQUIRE_LOCK_REFRESH_RATE
231: 
232: 
233:         @schema = "#{ @path }.schema"
234:         @dirname = File::dirname(path).gsub(%|/+\s*$|,'')
235:         @basename = File::basename(path)
236:         @waiting_w = File::join(@dirname, "#{ Util::hostname }.#{ $$ }.waiting.w") 
237:         @waiting_r = File::join(@dirname, "#{ Util::hostname }.#{ $$ }.waiting.r") 
238:         @lock_w = File::join(@dirname, "#{ Util::hostname }.#{ $$ }.lock.w") 
239:         @lock_r = File::join(@dirname, "#{ Util::hostname }.#{ $$ }.lock.r") 
240:         @lockfile = File::join(@dirname, 'lock') 
241:         @lockf = Lockfile::new("#{ @path }.lock") 
242:         @fields = FIELDS
243:         @in_transaction = false
244:         @in_ro_transaction = false
245:         @db = nil
246: 
247:         @lockd_recover = "#{ @dirname }.lockd_recover"
248:         @lockd_recover_lockf = Lockfile::new "#{ @lockd_recover }.lock"
249:         @lockd_recovered = false
250: #--}}}
251:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 128
128:         def q tuple
129: #--{{{
130:           [ tuple ].flatten.map do |f| 
131:             if f
132:               "'" << Util.escape(f,"'","'") << "'"
133:             else
134:               'NULL'
135:             end
136:           end
137: #--}}}
138:         end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 107
107:         def t2h tuple
108: #--{{{
109:           h = {}
110:           FIELDS.each_with_index{|f,i| h[f] = tuple[i]}
111:           h
112: #--}}}
113:         end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 121
121:         def tuple
122: #--{{{
123:           t = Array::new FIELDS.size
124:           t.fields = FIELDS
125:           t
126: #--}}}
127:         end

Public Instance methods

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 406
406:       def abort_transaction(*a)
407: #--{{{
408:         raise AbortedTransactionError, *a
409: #--}}}
410:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 433
433:       def aquire_lock opts = {} 
434: #--{{{
435:         ro = Util::getopt 'read_only', opts
436:         ret = nil
437: 
438:         @aquire_lock_sc.reset
439:     
440:         waiting, ltype, lfile =
441:           if ro
442:             [@waiting_r, File::LOCK_SH | File::LOCK_NB, @lock_r]
443:           else
444:             [@waiting_w, File::LOCK_EX | File::LOCK_NB, @lock_w]
445:           end
446:     
447:         ltype_s = (ltype == File::LOCK_EX ? 'write' : 'read')
448:         ltype ||= File::LOCK_NB
449: 
450:         aquired = false
451: 
452:         until aquired
453:           begin
454:             debug{ "aquiring lock" }
455:             #@lockf.lock unless ro
456:       
457:             open(@lockfile, 'a+') do |lf|
458: 
459:               locked = false
460:               refresher = nil
461:               sc = nil
462: 
463:               begin
464:                 FileUtils::touch waiting
465:                 # poll
466:                 42.times do
467:                   locked = lf.posixlock(ltype | File::LOCK_NB)
468:                   break if locked
469:                   sleep rand
470:                 end
471: 
472:                 if locked
473:                   aquired = true
474:                   refresher = Refresher::new @lockfile, @aquire_lock_refresh_rate
475:                   debug{ "refresher pid <#{ refresher.pid }> refresh_rate <#{ @aquire_lock_refresh_rate }>" }
476:                   FileUtils::rm_f waiting rescue nil
477:                   FileUtils::touch lfile rescue nil
478:                   debug{ "aquired lock" }
479:                   ret = yield
480:                   debug{ "released lock" }
481:                 else
482:                   aquired = false 
483:                   stat = File::stat @lockfile
484:                   mtime = stat.mtime
485:                   stale = mtime < (Time::now - @aquire_lock_lockfile_stale_age)
486:                   if stale
487:                     warn{ "detected stale lockfile of mtime <#{ mtime }>" }
488:                     lockd_recover if @attempt_lockd_recovery
489:                   end
490:                   sc = @aquire_lock_sc.next
491:                   debug{ "failed to aquire lock - sleep(#{ sc })" }
492:                   sleep sc 
493:                 end
494: 
495:               ensure
496:                 if locked
497:                   unlocked = false
498:                   begin
499:                     42.times do
500:                       unlocked = lf.posixlock(File::LOCK_UN | File::LOCK_NB)
501:                       break if unlocked
502:                       sleep rand
503:                     end
504:                   ensure
505:                     lf.posixlock File::LOCK_UN unless unlocked
506:                   end
507:                 end
508:                 refresher.kill if refresher
509:                 FileUtils::rm_f waiting rescue nil
510:                 FileUtils::rm_f lfile rescue nil 
511:               end
512:             end
513:           ensure
514:             #@lockf.unlock rescue nil unless read_only
515:           end
516:         end
517:         ret
518: #--}}}
519:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 520
520:       def connect
521: #--{{{
522:         ret = nil
523:         opened = nil
524:         begin
525:           raise 'db has no schema' unless test ?e, @schema
526:           debug{"connecting to db <#{ @path }>..."}
527:           $db = @db = SQLite::Database::new(@path, 0)
528:           debug{"connected."}
529:           opened = true
530:           @db.use_array = true
531:           ret = yield @db
532:         ensure
533:           @db.close if opened
534:           $db = @db = nil
535:           debug{"disconnected from db <#{ @path }>"}
536:         end
537:         ret
538: #--}}}
539:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 540
540:       def execute sql, &block
541: #--{{{
542:         raise 'not in transaction' unless @in_transaction
543:         if @sql_debug
544:           logger << "SQL:\n#{ sql }\n"
545:         end
546:         #ret = retry_if_locked{ @db.execute sql, &block }
547:         ret = @db.execute sql, &block
548:         if @sql_debug and ret and Array === ret and ret.first
549:           logger << "RESULT:\n#{ ret.first.inspect }\n...\n"
550:         end
551:         ret
552: #--}}}
553:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 674
674:       def integrity_check path = @path
675: #--{{{
676:         debug{ "running integrity_check on <#{ path }>" }
677:         klass.integrity_check(path)  
678: #--}}}
679:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 680
680:       def lock opts = {} 
681: #--{{{
682:         ret = nil
683:         lockd_recover_wrap do
684:           aquire_lock(opts) do
685:             ret = yield 
686:           end
687:         end
688:         ret
689: #--}}}
690:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 582
582:       def lockd_recover
583: #--{{{
584:         return nil unless @attempt_lockd_recovery
585:         warn{ "attempting lockd recovery" }
586:         time = Time::now
587:         ret = nil
588: 
589:         @lockd_recover_lockf.lock do
590:           Util::uncache @dirname rescue nil
591:           Util::uncache @path rescue nil
592:           Util::uncache @lockfile rescue nil
593:           Util::uncache @lockd_recover rescue nil
594:           mtime = File::stat(@lockd_recover).mtime rescue time
595: 
596:           if mtime > time 
597:             warn{ "skipping lockd recovery (another node has already recovered)" }
598:             ret = true
599:           else
600:             moved = false
601:             begin
602:               FileUtils::touch @lockd_recover 
603:               @lockd_recovered = false 
604: 
605:               begin
606:                 report = "hostname : \#{ Util::hostname }\npid      : \#{ Process.pid }\ntime     : \#{ Time::now }\nq        :\npath : \#{ @dirname }\nstat : \#{ File::stat(@dirname).inspect }\ndb       :\npath : \#{ @path }\nstat : \#{ File::stat(@path).inspect }\nlockfile :\npath : \#{ @lockfile }\nstat : \#{ File::stat(@lockfile).inspect }\n"
607:                 info{ "LOCKD RECOVERY REPORT" }
608:                 logger << report
609:                 cmd = "mail -s LOCKD_RECOVERY ara.t.howard@noaa.gov <<eof\n#{ report }\neof"
610:                 Util::system cmd
611:               rescue
612:                 nil
613:               end
614: 
615:               warn{ "sleeping #{ @lockd_recover_wait }s before continuing..." }
616:               sleep @lockd_recover_wait 
617: 
618:               tmp = "#{ @dirname }.tmp"
619:               FileUtils::rm_rf tmp
620:               FileUtils::mv @dirname, tmp
621:               moved = true
622: 
623:               rfiles = [@path, @lockfile].map{|f| File::join(tmp,File::basename(f))}
624:               rfiles.each do |f|
625:                 ftmp = "#{ f }.tmp"
626:                 FileUtils::rm_rf ftmp
627:                 FileUtils::cp f, ftmp 
628:                 FileUtils::rm f 
629:                 FileUtils::mv ftmp, f 
630:               end
631: 
632:               dbtmp = File::join(tmp,File::basename(@path))
633: 
634:               if integrity_check(dbtmp)
635:                 FileUtils::mv tmp, @dirname
636:                 FileUtils::cp @lockd_recover_lockf.path, @lockd_recover 
637:                 @lockd_recovered = true 
638:                 Util::uncache @dirname rescue nil
639:                 Util::uncache @path rescue nil
640:                 Util::uncache @lockfile rescue nil
641:                 Util::uncache @lockd_recover rescue nil
642:                 warn{ "lockd recovery complete" }
643:               else
644:                 FileUtils::mv tmp, @dirname
645:                 @lockd_recovered = false 
646:                 error{ "lockd recovery failed" }
647:               end
648: 
649:               ret = @lockd_recovered 
650:             ensure
651:               if moved and not @lockd_recovered and tmp and test(?d, tmp)
652:                 FileUtils::mv tmp, @dirname
653:               end
654:             end
655:           end
656:         end
657:         ret
658: #--}}}
659:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 323
323:       def lockd_recover_wrap opts = {}
324: #--{{{
325:         ret = nil
326:         try_again = false
327:         begin
328:           begin
329:             @lockd_recovered = false
330:             old_mtime = 
331:               begin
332:                 Util::uncache @lockd_recover rescue nil
333:                 File::stat(@lockd_recover).mtime
334:               rescue
335:                 Time::now 
336:               end
337:             ret = yield
338:           ensure
339:             new_mtime =
340:               begin
341:                 Util::uncache @lockd_recover rescue nil
342:                 File::stat(@lockd_recover).mtime
343:               rescue
344:                 old_mtime
345:               end
346: 
347:             if new_mtime and old_mtime and new_mtime > old_mtime and not @lockd_recovered
348:               try_again = true
349:             end
350:           end
351:         rescue
352:           if try_again
353:             warn{ "a remote lockd recovery has invalidated this transaction!" }
354:             warn{ "retrying..."}
355:             sleep 120
356:             retry
357:           else
358:             raise
359:           end
360:         end
361:         ret
362: #--}}}
363:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 693
693:       def read_lock(opts = {}, &block)
694: #--{{{
695:         opts['read_only'] = true
696:         lock opts, &block
697: #--}}}
698:       end

TODO - add sleep cycle if this ends up getting used

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 557
557:       def retry_if_locked
558: #--{{{
559:         ret = nil
560:         begin
561:           ret = yield 
562:         rescue SQLite::BusyException
563:           warn{ "database locked - waiting(1.0) and retrying" }
564:           sleep 1.0 
565:           retry
566:         end
567:         ret
568: #--}}}
569:       end
rlock(opts = {}, &block)

Alias for read_lock

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 285
285:       def ro_transaction(opts = {}, &block)
286: #--{{{
287:         opts['read_only'] = true
288:         transaction(opts, &block)
289: #--}}}
290:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 252
252:       def ro_transaction(opts = {}, &block)
253: #--{{{
254:         opts['read_only'] = true
255:         transaction(opts, &block)
256: #--}}}
257:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 411
411:       def rollback_transaction(*a)
412: #--{{{
413:         raise RollbackTransactionError, *a
414: #--}}}
415:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 416
416:       def sillyclean opts = {} 
417: #--{{{
418:         ro = Util::getopt 'read_only', opts
419:         ret = nil
420:         if ro
421:           ret = yield
422:         else
423:           glob = File::join @dirname,'.nfs*'
424:           orgsilly = Dir[glob]
425:           ret = yield
426:           newsilly = Dir[glob]
427:           silly = newsilly - orgsilly 
428:           silly.each{|path| FileUtils::rm_rf path}
429:         end
430:         ret
431: #--}}}
432:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 291
291:       def transaction opts = {} 
292: #--{{{
293:         ro = Util::getopt 'read_only', opts 
294:         ret = nil
295:         if @in_transaction
296:       STDERR.puts 'continuing transaction...'
297:           ret = yield
298:         else
299:           begin 
300:       STDERR.puts 'starting transaction...'
301:             @in_transaction = true
302:             lockd_recover_wrap(opts) do
303:               transaction_wrap(opts) do
304:                 aquire_lock(opts) do
305:                   #sillyclean(opts) do
306:                     connect do
307:                       execute 'begin' unless ro
308:                       ret = yield 
309:                       execute 'commit' unless ro
310:                     end
311:                   #end
312:                 end
313:               end
314:             end
315:           ensure
316:             @in_transaction = false
317:           end
318:         end
319:         ret
320: #--}}}
321:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 258
258:       def transaction opts = {} 
259: #--{{{
260:         raise 'nested transaction' if @in_transaction
261:         ro = Util::getopt 'read_only', opts 
262:         ret = nil
263:         begin 
264:           @in_transaction = true
265:           lockd_recover_wrap(opts) do
266:             transaction_wrap(opts) do
267:               aquire_lock(opts) do
268:                 #sillyclean(opts) do
269:                   connect do
270:                     execute 'begin' unless ro
271:                     ret = yield 
272:                     execute 'commit' unless ro
273:                   end
274:                 #end
275:               end
276:             end
277:           end
278:         ensure
279:           @in_transaction = false
280:         end
281:         ret
282: #--}}}
283:       end

TODO - perhaps should not retry on SQLException?? yet errors seem to map to this exception even when the sql is fine… safest (and most anoying) is to simply retry.

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 369
369:       def transaction_wrap opts = {} 
370: #--{{{
371:         ro = Util::getopt 'read_only', opts
372:         ret = nil
373:         if ro
374:           ret = yield 
375:         else
376:           errors = []
377:           @transaction_retries_sc.reset
378:           begin
379:             ret = yield 
380:           rescue => e
381:           #rescue SQLite::DatabaseException, SQLite::SQLException, SystemCallError => e
382:             case e
383:               when AbortedTransactionError 
384:                 raise
385:               when RollbackTransactionError 
386:                 raise
387:               else
388:                 if @transaction_retries == 0
389:                   raise
390:                 elsif errors.size >= @transaction_retries
391:                   error{ "MAXIMUM TRANSACTION RETRIES SURPASSED" }
392:                   raise
393:                 else
394:                   warn{ e } if(errors.empty? or not Util::erreq(errors[-1], e))
395:                   errors << e
396:                   warn{ "retry <#{ errors.size }>..." }
397:                 end
398:                 sleep @transaction_retries_sc.next
399:                 retry
400:               end
401:           end
402:         end
403:         ret
404: #--}}}
405:       end

[Source]

     # File lib/rq-2.3.1/qdb.rb, line 570
570:       def vacuum
571: #--{{{
572:         raise 'nested transaction' if @in_transaction
573:         begin 
574:           @in_transaction = true
575:           connect{ execute 'vacuum' }
576:         ensure
577:           @in_transaction = false
578:         end
579:         self
580: #--}}}
581:       end
wlockopts = {}

Alias for write_lock

write_lockopts = {}

Alias for lock

[Validate]