| In: |
lib/rq-3.0.0/qdb.rb
|
| Parent: | Object |
The QDB class is the low-level access point to the actual sqlite database. The primary function it performs is to serialize access to the queue db via the locking protocol.
| FIELDS | = | #--{{{ %w( jid priority state submitted started finished elapsed submitter runner stdin stdout stderr pid exit_status tag restartable command ) |
| PRAGMAS | = | #--{{{ <<-sql PRAGMA default_synchronous = FULL; sql |
| SCHEMA | = | #--{{{ <<-sql create table jobs ( jid integer primary key, #{ FIELDS[1..-1].join ",\n " } ); create table attributes ( key, value, primary key (key) ); sql |
| DEFAULT_LOGGER | = | Logger::new(STDERR) |
| DEFAULT_SQL_DEBUG | = | false |
| DEFAULT_TRANSACTION_RETRIES | = | 4 |
| DEFAULT_AQUIRE_LOCK_SC | = | SleepCycle::new(2, 16, 2) |
| DEFAULT_TRANSACTION_RETRIES_SC | = | SleepCycle::new(8, 24, 8) |
| DEFAULT_ATTEMPT_LOCKD_RECOVERY | = | true |
| DEFAULT_LOCKD_RECOVER_WAIT | = | 3600 |
| DEFAULT_AQUIRE_LOCK_LOCKFILE_STALE_AGE | = | 21600 |
| DEFAULT_AQUIRE_LOCK_REFRESH_RATE | = | 30 |
| aquire_lock_lockfile_stale_age | [RW] | |
| aquire_lock_lockfile_stale_age | [RW] | |
| aquire_lock_refresh_rate | [RW] | |
| aquire_lock_refresh_rate | [RW] | |
| aquire_lock_sc | [RW] | |
| aquire_lock_sc | [RW] | |
| attempt_lockd_recovery | [RW] | |
| attempt_lockd_recovery | [RW] | |
| dirname | [R] | |
| fields | [R] | |
| lockd_recover_wait | [RW] | |
| lockd_recover_wait | [RW] | |
| lockfile | [R] | |
| mutex | [R] | |
| opts | [R] | |
| path | [R] | |
| schema | [R] | |
| sql_debug | [RW] | |
| sql_debug | [RW] | |
| transaction_retries | [RW] | |
| transaction_retries | [RW] | |
| transaction_retries_sc | [RW] | |
| transaction_retries_sc | [RW] |
# File lib/rq-3.0.0/qdb.rb, line 145
# Build a new queue database at +path+ and hand back its QDB handle.
#
# A QDB is instantiated via +new+, its lockfile is touched, the schema file
# is written with create_schema, and PRAGMAS followed by SCHEMA are executed
# inside a single transaction on the new handle.
#
# path - location of the database file
# opts - options hash forwarded to +new+
#
# Returns the freshly created QDB instance.
def create(path, opts = {})
  #--{{{
  fresh = new(path, opts)
  FileUtils::touch(fresh.lockfile)
  create_schema(fresh.schema)
  fresh.transaction do
    fresh.execute(PRAGMAS)
    fresh.execute(SCHEMA)
  end
  fresh
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 157
# Write the schema description (PRAGMAS then SCHEMA) to +path+.
#
# The text is first written to "<path>.tmp" and then moved onto +path+, so
# +path+ only ever appears once the content has been fully written.
#
# Returns whatever FileUtils::mv returns.
def create_schema(path)
  #--{{{
  staging = "#{ path }.tmp"
  open(staging, 'w') do |io|
    io.puts(PRAGMAS)
    io.puts(SCHEMA)
  end
  FileUtils::mv(staging, path)
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 120
# Convert a hash keyed by field name into a tuple.
#
# A fresh tuple is obtained from +tuple+ and, for every name in FIELDS, the
# value stored under that name in +h+ is copied into the tuple.
#
# Returns the populated tuple.
def h2t(h)
  #--{{{
  FIELDS.inject(tuple) do |row, name|
    row[name] = h[name]
    row
  end
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 91
# Run sqlite's 'PRAGMA integrity_check;' against the database at +dbpath+.
#
# The database is opened (first with a mode argument of 0, falling back to
# the single-argument form if that raises) and always closed again when the
# open succeeded.
#
# Returns a truthy value (the regexp match position) when the first result
# row reports "ok", otherwise nil/false.
def integrity_check(dbpath)
  #--{{{
  verdict = false
  is_open = nil
  begin
    db =
      begin
        SQLite::Database::new(dbpath, 0)
      rescue
        SQLite::Database::new(dbpath)
      end
    is_open = true
    db.use_array = true rescue nil
    rows = db.execute('PRAGMA integrity_check;')
    head = rows && rows.first
    verdict = (head and (head["integrity_check"] =~ /^\s*ok\s*$/io))
  ensure
    db.close if is_open
    db = nil
  end
  verdict
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 187
# Set up a handle on the queue database stored at +path+.
#
# Every tunable is resolved in the same order: the value found in +opts+
# (via Util::getopt), then the class-level setting on +klass+, then the
# matching DEFAULT_* constant. +sql_debug+ additionally consults the
# RQ_SQL_DEBUG environment variable before its default.
#
# path - location of the sqlite database file
# opts - hash of overrides ('logger', 'sql_debug', 'transaction_retries',
#        'aquire_lock_sc', 'transaction_retries_sc',
#        'attempt_lockd_recovery', 'lockd_recover_wait',
#        'aquire_lock_lockfile_stale_age', 'aquire_lock_refresh_rate')
#
# No database connection is opened here; @db starts out nil.
def initialize path, opts = {}
  #--{{{
  @path = path
  @opts = opts

  @logger =
    Util::getopt('logger', @opts) ||
    klass.logger ||
    DEFAULT_LOGGER

  @sql_debug =
    Util::getopt('sql_debug', @opts) ||
    klass.sql_debug ||
    ENV['RQ_SQL_DEBUG'] ||
    DEFAULT_SQL_DEBUG

  @transaction_retries =
    Util::getopt('transaction_retries', @opts) ||
    klass.transaction_retries ||
    DEFAULT_TRANSACTION_RETRIES

  @aquire_lock_sc =
    Util::getopt('aquire_lock_sc', @opts) ||
    klass.aquire_lock_sc ||
    DEFAULT_AQUIRE_LOCK_SC

  @transaction_retries_sc =
    Util::getopt('transaction_retries_sc', @opts) ||
    klass.transaction_retries_sc ||
    DEFAULT_TRANSACTION_RETRIES_SC

  # This flag defaults to true, so chaining with `||` would turn an explicit
  # `false` back into true and make recovery impossible to disable. Take the
  # first setting that is not nil instead.
  # NOTE(review): assumes Util::getopt and klass.attempt_lockd_recovery
  # return nil when nothing was configured - confirm.
  @attempt_lockd_recovery =
    [
      Util::getopt('attempt_lockd_recovery', @opts),
      klass.attempt_lockd_recovery
    ].find{|setting| not setting.nil?}
  if @attempt_lockd_recovery.nil?
    @attempt_lockd_recovery = DEFAULT_ATTEMPT_LOCKD_RECOVERY
  end

  @lockd_recover_wait =
    Util::getopt('lockd_recover_wait', @opts) ||
    klass.lockd_recover_wait ||
    DEFAULT_LOCKD_RECOVER_WAIT

  @aquire_lock_lockfile_stale_age =
    Util::getopt('aquire_lock_lockfile_stale_age', @opts) ||
    klass.aquire_lock_lockfile_stale_age ||
    DEFAULT_AQUIRE_LOCK_LOCKFILE_STALE_AGE

  @aquire_lock_refresh_rate =
    Util::getopt('aquire_lock_refresh_rate', @opts) ||
    klass.aquire_lock_refresh_rate ||
    DEFAULT_AQUIRE_LOCK_REFRESH_RATE

  @schema = "#{ @path }.schema"
  # NOTE(review): %|...| is a string literal, not a regexp, so this gsub
  # looks for that literal text rather than stripping trailing slashes -
  # left untouched here; confirm intent before changing.
  @dirname = File::dirname(path).gsub(%|/+\s*$|,'')
  @basename = File::basename(path)

  # per-process marker files, named "<hostname>.<pid>.<suffix>"
  @waiting_w = File::join(@dirname, "#{ Util::hostname }.#{ $$ }.waiting.w")
  @waiting_r = File::join(@dirname, "#{ Util::hostname }.#{ $$ }.waiting.r")
  @lock_w = File::join(@dirname, "#{ Util::hostname }.#{ $$ }.lock.w")
  @lock_r = File::join(@dirname, "#{ Util::hostname }.#{ $$ }.lock.r")
  @lockfile = File::join(@dirname, 'lock')
  @lockf = Lockfile::new("#{ @path }.lock")
  @fields = FIELDS
  @in_transaction = false
  @in_ro_transaction = false
  @db = nil

  # state used by lockd recovery
  @lockd_recover = "#{ @dirname }.lockd_recover"
  @lockd_recover_lockf = Lockfile::new "#{ @lockd_recover }.lock"
  @lockd_recovered = false
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 134
# Quote values for inclusion in an sql statement.
#
# +tuple+ may be a single value or a (nested) array; it is flattened first.
# Values that are nil/false or whose string form is empty become the bare
# word NULL; everything else is passed through Util.escape and wrapped in
# single quotes.
#
# Returns an array of sql fragments, one per value.
def q(tuple)
  #--{{{
  [ tuple ].flatten.map do |field|
    blank = !field || field.to_s.empty?
    if blank
      'NULL'
    else
      "'" << Util.escape(field,"'","'") << "'"
    end
  end
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 113
# Convert a positional tuple into a hash keyed by field name.
#
# The i-th entry of FIELDS becomes the key for the i-th element of +tuple+.
#
# Returns the new hash.
def t2h(tuple)
  #--{{{
  row = {}
  FIELDS.size.times do |idx|
    row[FIELDS[idx]] = tuple[idx]
  end
  row
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 127
# Build an empty tuple: an array with one nil slot per entry in FIELDS and
# its +fields+ attribute set to FIELDS.
#
# Returns the new array.
def tuple
  #--{{{
  blank = Array::new(FIELDS.size)
  blank.fields = FIELDS
  blank
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 412
# Raise AbortedTransactionError, forwarding any arguments (such as a
# message) to +raise+.
def abort_transaction(*a)
  #--{{{
  raise(AbortedTransactionError, *a)
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 439
# Take a posix lock on @lockfile, yield to the caller's block while it is
# held, and release it afterwards.
#
# opts - hash; a truthy 'read_only' entry (looked up via Util::getopt)
#        requests a shared lock, otherwise an exclusive lock is requested.
#
# The method loops until the lock has been obtained once. Returns the value
# of the yielded block.
439: def aquire_lock opts = {}
440: #--{{{
441: ro = Util::getopt 'read_only', opts
442: ret = nil
443:
# restart the back-off cycle used between failed attempts
444: @aquire_lock_sc.reset
445:
# choose the "waiting" marker file, the lock type and the "lock held" marker
# file according to the requested mode
446: waiting, ltype, lfile =
447: if ro
448: [@waiting_r, File::LOCK_SH | File::LOCK_NB, @lock_r]
449: else
450: [@waiting_w, File::LOCK_EX | File::LOCK_NB, @lock_w]
451: end
452:
# NOTE(review): ltype_s is not read anywhere in this method, and ltype is
# always assigned above so the ||= below has no effect.
453: ltype_s = (ltype == File::LOCK_EX ? 'write' : 'read')
454: ltype ||= File::LOCK_NB
455:
456: aquired = false
457:
458: until aquired
459: begin
460: debug{ "aquiring lock" }
461: #@lockf.lock unless ro
462:
463: open(@lockfile, 'a+') do |lf|
464:
465: locked = false
466: refresher = nil
467: sc = nil
468:
469: begin
# advertise that this process is waiting for the lock
470: FileUtils::touch waiting
471: # poll
# up to 42 non-blocking attempts, sleeping a random fraction of a second
# between them
472: 42.times do
473: locked = lf.posixlock(ltype | File::LOCK_NB)
474: break if locked
475: sleep rand
476: end
477:
478: if locked
479: aquired = true
# a Refresher is created for the lockfile with the configured refresh rate
# and killed again in the ensure clause below
480: refresher = Refresher::new @lockfile, @aquire_lock_refresh_rate
481: debug{ "refresher pid <#{ refresher.pid }> refresh_rate <#{ @aquire_lock_refresh_rate }>" }
# swap the "waiting" marker for the "lock held" marker (best effort)
482: FileUtils::rm_f waiting rescue nil
483: FileUtils::touch lfile rescue nil
484: debug{ "aquired lock" }
485: ret = yield
486: debug{ "released lock" }
487: else
488: aquired = false
# could not lock: check whether the lockfile's mtime is older than the
# configured stale age
489: stat = File::stat @lockfile
490: mtime = stat.mtime
491: stale = mtime < (Time::now - @aquire_lock_lockfile_stale_age)
492: if stale
# re-check after Util::uncache before acting on the stale verdict
493: Util::uncache @lockfile rescue nil
494: stat = File::stat @lockfile
495: mtime = stat.mtime
496: stale = mtime < (Time::now - @aquire_lock_lockfile_stale_age)
497: if stale
498: warn{ "detected stale lockfile of mtime <#{ mtime }>" }
499: lockd_recover if @attempt_lockd_recovery
500: end
501: end
# back off for the next interval of the sleep cycle, then loop again
502: sc = @aquire_lock_sc.next
503: debug{ "failed to aquire lock - sleep(#{ sc })" }
504: sleep sc
505: end
506:
507: ensure
# always release the lock if it was taken: try non-blocking unlocks first and
# fall back to a plain unlock if none of them succeeded
508: if locked
509: unlocked = false
510: begin
511: 42.times do
512: unlocked = lf.posixlock(File::LOCK_UN | File::LOCK_NB)
513: break if unlocked
514: sleep rand
515: end
516: ensure
517: lf.posixlock File::LOCK_UN unless unlocked
518: end
519: end
# stop the refresher and remove both marker files (best effort)
520: refresher.kill if refresher
521: FileUtils::rm_f waiting rescue nil
522: FileUtils::rm_f lfile rescue nil
523: end
524: end
525: ensure
526: #@lockf.unlock rescue nil unless read_only
527: end
528: end
529: ret
530: #--}}}
531: end
# File lib/rq-3.0.0/qdb.rb, line 532
# Open the sqlite database at @path, yield the handle, and close it again.
#
# Raises 'db has no schema' when the file named by @schema does not exist.
# The handle is stored in both @db and the global $db for the duration of
# the block; both are reset to nil afterwards, and the database is closed
# whenever it was successfully opened.
#
# Returns the value of the yielded block.
def connect
  #--{{{
  result = nil
  is_open = nil
  begin
    raise 'db has no schema' unless test(?e, @schema)
    debug{"connecting to db <#{ @path }>..."}
    $db = @db =
      begin
        SQLite::Database::new(@path, 0)
      rescue
        # fall back to the single-argument constructor
        SQLite::Database::new(@path)
      end
    debug{"connected."}
    is_open = true
    @db.use_array = true rescue nil
    result = yield(@db)
  ensure
    @db.close if is_open
    $db = @db = nil
    debug{"disconnected from db <#{ @path }>"}
  end
  result
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 557
# Run +sql+ on the current database handle (@db).
#
# Raises 'not in transaction' unless @in_transaction is set. When @sql_debug
# is on, the statement and the first row of an Array result are appended to
# the logger. An optional block is forwarded to @db.execute.
#
# Returns whatever @db.execute returns.
def execute(sql, &block)
  #--{{{
  raise 'not in transaction' unless @in_transaction
  logger << "SQL:\n#{ sql }\n" if @sql_debug
  #ret = retry_if_locked{ @db.execute sql, &block }
  rows = @db.execute(sql, &block)
  if @sql_debug and Array === rows and rows.first
    logger << "RESULT:\n#{ rows.first.inspect }\n...\n"
  end
  rows
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 705
# Instance-level convenience wrapper: logs a debug message and delegates to
# the class-level integrity_check.
#
# path - database file to check; defaults to this handle's @path
#
# Returns the result of klass.integrity_check.
def integrity_check(path = @path)
  #--{{{
  debug do
    "running integrity_check on <#{ path }>"
  end
  klass.integrity_check(path)
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 711
# Run the given block while holding the queue lock.
#
# The block runs inside aquire_lock(opts), which in turn runs inside
# lockd_recover_wrap.
#
# Returns the value of the block.
def lock(opts = {})
  #--{{{
  result = nil
  lockd_recover_wrap{ aquire_lock(opts){ result = yield } }
  result
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 613
# Attempt to recover from a stale lock by moving the queue directory aside,
# re-creating the db and lock files as fresh copies, and moving the directory
# back.
#
# Does nothing and returns nil unless @attempt_lockd_recovery is set. The
# whole procedure runs while holding @lockd_recover_lockf.
#
# Returns true when the @lockd_recover marker was modified after this call
# started (taken to mean another node already recovered), otherwise the final
# value of @lockd_recovered.
613: def lockd_recover
614: #--{{{
615: return nil unless @attempt_lockd_recovery
616: warn{ "attempting lockd recovery" }
617: time = Time::now
618: ret = nil
619:
620: @lockd_recover_lockf.lock do
621: Util::uncache @dirname rescue nil
622: Util::uncache @path rescue nil
623: Util::uncache @lockfile rescue nil
624: Util::uncache @lockd_recover rescue nil
# mtime of the recovery marker; falls back to our own start time when the
# stat fails
625: mtime = File::stat(@lockd_recover).mtime rescue time
626:
627: if mtime > time
628: warn{ "skipping lockd recovery (another node has already recovered)" }
629: ret = true
630: else
631: moved = false
632: begin
# update the recovery marker's mtime before doing anything else
633: FileUtils::touch @lockd_recover
634: @lockd_recovered = false
635:
# build, log and mail a report; any failure in here is swallowed
636: begin
# NOTE(review): as listed, every interpolation in this string is escaped
# (\#{ ... }) and would be emitted literally rather than evaluated - confirm
# against the real source (this may have been a heredoc).
637: report = "hostname : \#{ Util::hostname }\npid : \#{ Process.pid }\ntime : \#{ Time::now }\nq :\npath : \#{ @dirname }\nstat : \#{ File::stat(@dirname).inspect }\ndb :\npath : \#{ @path }\nstat : \#{ File::stat(@path).inspect }\nlockfile :\npath : \#{ @lockfile }\nstat : \#{ File::stat(@lockfile).inspect }\n"
638: info{ "LOCKD RECOVERY REPORT" }
639: logger << report
# NOTE(review): the recipient address is hard-coded and the report is pasted
# into a shell here-document.
640: cmd = "mail -s LOCKD_RECOVERY ara.t.howard@noaa.gov <<eof\n#{ report }\neof"
641: Util::system cmd
642: rescue
643: nil
644: end
645:
646: warn{ "sleeping #{ @lockd_recover_wait }s before continuing..." }
647: sleep @lockd_recover_wait
648:
# move the whole queue directory aside
649: tmp = "#{ @dirname }.tmp"
650: FileUtils::rm_rf tmp
651: FileUtils::mv @dirname, tmp
652: moved = true
653:
# replace the db file and the lockfile with fresh copies of themselves
# (copy to <file>.tmp, remove the original, move the copy into place)
654: rfiles = [@path, @lockfile].map{|f| File::join(tmp,File::basename(f))}
655: rfiles.each do |f|
656: ftmp = "#{ f }.tmp"
657: FileUtils::rm_rf ftmp
658: FileUtils::cp f, ftmp
659: FileUtils::rm f
660: FileUtils::mv ftmp, f
661: end
662:
663: dbtmp = File::join(tmp,File::basename(@path))
664:
# the directory is moved back in both branches; only a passing integrity
# check marks the recovery as successful
665: if integrity_check(dbtmp)
666: FileUtils::mv tmp, @dirname
667: FileUtils::cp @lockd_recover_lockf.path, @lockd_recover
668: @lockd_recovered = true
669: Util::uncache @dirname rescue nil
670: Util::uncache @path rescue nil
671: Util::uncache @lockfile rescue nil
672: Util::uncache @lockd_recover rescue nil
673: warn{ "lockd recovery complete" }
674: else
675: FileUtils::mv tmp, @dirname
676: @lockd_recovered = false
677: error{ "lockd recovery failed" }
678: end
679:
680: ret = @lockd_recovered
681: ensure
# safety net: if the directory was moved aside, recovery did not succeed and
# the temporary directory still exists, move it back
682: if moved and not @lockd_recovered and tmp and test(?d, tmp)
683: FileUtils::mv tmp, @dirname
684: end
685: end
686: end
687: end
688: ret
689: #--}}}
690: end
# File lib/rq-3.0.0/qdb.rb, line 329
# Run the given block and retry it when it fails because some other process
# performed a lockd recovery in the meantime.
#
# The mtime of the @lockd_recover marker is sampled before and after the
# block. If the block raises a StandardError, the marker became newer during
# the attempt, and this process did not do the recovery itself
# (@lockd_recovered is false), the block is retried after a 120 second
# pause. Any other error is re-raised.
#
# opts - accepted for interface compatibility; not used here
#
# Returns the value of the block.
def lockd_recover_wrap opts = {}
  #--{{{
  ret = nil
  try_again = false
  begin
    begin
      # Reset on every attempt. Without this a single remote recovery left
      # the flag set forever, so any later, unrelated failure would be
      # retried endlessly instead of being raised.
      try_again = false
      @lockd_recovered = false
      old_mtime =
        begin
          Util::uncache @lockd_recover rescue nil
          File::stat(@lockd_recover).mtime
        rescue
          # no marker yet: use "now" as the baseline
          Time::now
        end
      ret = yield
    ensure
      new_mtime =
        begin
          Util::uncache @lockd_recover rescue nil
          File::stat(@lockd_recover).mtime
        rescue
          old_mtime
        end

      if new_mtime and old_mtime and new_mtime > old_mtime and not @lockd_recovered
        try_again = true
      end
    end
  rescue
    if try_again
      warn{ "a remote lockd recovery has invalidated this transaction!" }
      warn{ "retrying..."}
      sleep 120
      retry
    else
      raise
    end
  end
  ret
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 724
# Run the given block while holding the queue lock in read-only mode.
#
# opts - options hash; it is copied with 'read_only' => true added, so the
#        caller's hash is left untouched (the original wrote into it, which
#        leaked the flag back to the caller and failed on frozen hashes)
#
# Returns whatever +lock+ returns.
def read_lock(opts = {}, &block)
  #--{{{
  lock opts.merge('read_only' => true), &block
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 599
# Try to bring the database back into a usable state.
#
# Raises 'nested transaction' when a transaction is already in progress.
# Otherwise marks a transaction as active, runs 'vacuum' over a fresh
# connection, shells out to the sqlite command line tool (limited to 60
# seconds) to list the tables, and finally clears the transaction flag.
#
# Returns the result of integrity_check.
def recover!
  #--{{{
  raise 'nested transaction' if @in_transaction
  @in_transaction = true
  begin
    connect do
      execute('vacuum')
    end
    require 'timeout'
    Timeout::timeout(60) do
      system("sqlite #{ @path } .tables >/dev/null 2>&1")
    end
  ensure
    @in_transaction = false
  end
  integrity_check
  #--}}}
end
TODO - add sleep cycle if this ends up getting used
# File lib/rq-3.0.0/qdb.rb, line 574
# Run the given block, retrying for as long as it raises
# SQLite::BusyException. A warning is logged and the method sleeps one
# second before each retry; there is no upper bound on the number of
# retries.
#
# Returns the value of the block.
def retry_if_locked
  #--{{{
  yield
rescue SQLite::BusyException
  warn{ "database locked - waiting(1.0) and retrying" }
  sleep(1.0)
  retry
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 291
# Run the given block inside a read-only transaction.
#
# opts - options hash; it is copied with 'read_only' => true added, so the
#        caller's hash is left untouched (the original wrote into it, which
#        leaked the flag back to the caller and failed on frozen hashes)
#
# Returns whatever +transaction+ returns.
def ro_transaction(opts = {}, &block)
  #--{{{
  transaction(opts.merge('read_only' => true), &block)
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 258
# Run the given block inside a read-only transaction.
#
# opts - options hash; it is copied with 'read_only' => true added, so the
#        caller's hash is left untouched (the original wrote into it, which
#        leaked the flag back to the caller and failed on frozen hashes)
#
# Returns whatever +transaction+ returns.
def ro_transaction(opts = {}, &block)
  #--{{{
  transaction(opts.merge('read_only' => true), &block)
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 417
# Raise RollbackTransactionError, forwarding any arguments (such as a
# message) to +raise+.
def rollback_transaction(*a)
  #--{{{
  raise(RollbackTransactionError, *a)
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 422
# Run the given block and afterwards delete any '.nfs*' entries in @dirname
# that appeared while it ran.
#
# opts - hash; with a truthy 'read_only' entry the block is simply yielded
#        to and no cleanup takes place
#
# Returns the value of the block.
def sillyclean(opts = {})
  #--{{{
  return yield if Util::getopt('read_only', opts)

  pattern = File::join(@dirname, '.nfs*')
  before = Dir[pattern]
  result = yield
  # only remove entries that were not there before the block ran
  (Dir[pattern] - before).each do |leftover|
    FileUtils::rm_rf(leftover)
  end
  result
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 264
# Run the given block inside a database transaction.
#
# Raises 'nested transaction' if one is already active. The block runs
# inside, from the outside in: lockd_recover_wrap, transaction_wrap,
# aquire_lock and connect. Unless 'read_only' is set in +opts+, the block is
# bracketed by 'begin' and 'commit' statements. The in-transaction flag is
# always cleared afterwards.
#
# Returns the value of the block.
def transaction(opts = {})
  #--{{{
  raise 'nested transaction' if @in_transaction
  read_only = Util::getopt('read_only', opts)
  result = nil
  begin
    @in_transaction = true
    lockd_recover_wrap(opts) do
      transaction_wrap(opts) do
        aquire_lock(opts) do
          #sillyclean(opts) do
          connect do
            execute('begin') unless read_only
            result = yield
            execute('commit') unless read_only
          end
          #end
        end
      end
    end
  ensure
    @in_transaction = false
  end
  result
  #--}}}
end
# File lib/rq-3.0.0/qdb.rb, line 297
# Run the given block inside a database transaction, joining the current
# transaction when one is already active.
#
# When @in_transaction is set, a notice is written to STDERR and the block
# is simply yielded to. Otherwise the block runs inside, from the outside
# in: lockd_recover_wrap, transaction_wrap, aquire_lock and connect, and -
# unless 'read_only' is set in +opts+ - is bracketed by 'begin' and
# 'commit'. The in-transaction flag is cleared when the outermost call
# finishes.
#
# Returns the value of the block.
def transaction(opts = {})
  #--{{{
  read_only = Util::getopt('read_only', opts)

  if @in_transaction
    STDERR.puts 'continuing transaction...'
    return yield
  end

  result = nil
  begin
    STDERR.puts 'starting transaction...'
    @in_transaction = true
    lockd_recover_wrap(opts) do
      transaction_wrap(opts) do
        aquire_lock(opts) do
          #sillyclean(opts) do
          connect do
            execute('begin') unless read_only
            result = yield
            execute('commit') unless read_only
          end
          #end
        end
      end
    end
  ensure
    @in_transaction = false
  end
  result
  #--}}}
end
TODO - perhaps we should not retry on SQLException? Yet errors seem to map to this exception even when the sql is fine… the safest (and most annoying) approach is to simply retry.
# File lib/rq-3.0.0/qdb.rb, line 375
# Run the given block, retrying it when it fails.
#
# With a truthy 'read_only' entry in +opts+ the block is yielded to once
# with no retry handling. Otherwise any StandardError other than
# AbortedTransactionError or RollbackTransactionError triggers a retry after
# sleeping for the next interval of @transaction_retries_sc, until
# @transaction_retries failures have been collected; the error is then
# re-raised. A @transaction_retries of 0 disables retrying.
#
# Returns the value of the block.
def transaction_wrap(opts = {})
  #--{{{
  return yield if Util::getopt('read_only', opts)

  failures = []
  @transaction_retries_sc.reset
  begin
    yield
  rescue => e
    #rescue SQLite::DatabaseException, SQLite::SQLException, SystemCallError => e
    case e
    when AbortedTransactionError, RollbackTransactionError
      # deliberate aborts/rollbacks are never retried
      raise
    end

    raise if @transaction_retries == 0

    if failures.size >= @transaction_retries
      error{ "MAXIMUM TRANSACTION RETRIES SURPASSED" }
      raise
    end

    # only log the error when it differs from the previous one
    warn{ e } if(failures.empty? or not Util::erreq(failures[-1], e))
    failures << e
    warn{ "retry <#{ failures.size }>..." }

    sleep @transaction_retries_sc.next
    retry
  end
  #--}}}
end