| In: |
lib/rq-2.3.1/qdb.rb
|
| Parent: | Object |
The QDB class is the low-level access point to the actual SQLite database. Its primary function is to serialize access to the queue db via the locking protocol.
| FIELDS | = | #--{{{ %w( jid priority state submitted started finished elapsed submitter runner pid exit_status tag restartable command ) |
| PRAGMAS | = | #--{{{ <<-sql PRAGMA default_synchronous = FULL; sql |
| SCHEMA | = | #--{{{ <<-sql create table jobs ( jid integer primary key, #{ FIELDS[1..-1].join ",\n " } ); create table attributes ( key, value, primary key (key) ); sql |
| DEFAULT_LOGGER | = | Logger::new(STDERR) |
| DEFAULT_SQL_DEBUG | = | false |
| DEFAULT_TRANSACTION_RETRIES | = | 4 |
| DEFAULT_AQUIRE_LOCK_SC | = | SleepCycle::new(2, 16, 2) |
| DEFAULT_TRANSACTION_RETRIES_SC | = | SleepCycle::new(8, 24, 8) |
| DEFAULT_ATTEMPT_LOCKD_RECOVERY | = | true |
| DEFAULT_LOCKD_RECOVER_WAIT | = | 1800 |
| DEFAULT_AQUIRE_LOCK_LOCKFILE_STALE_AGE | = | 1800 |
| DEFAULT_AQUIRE_LOCK_REFRESH_RATE | = | 8 |
| aquire_lock_lockfile_stale_age | [RW] | |
| aquire_lock_lockfile_stale_age | [RW] | |
| aquire_lock_refresh_rate | [RW] | |
| aquire_lock_refresh_rate | [RW] | |
| aquire_lock_sc | [RW] | |
| aquire_lock_sc | [RW] | |
| attempt_lockd_recovery | [RW] | |
| attempt_lockd_recovery | [RW] | |
| dirname | [R] | |
| fields | [R] | |
| lockd_recover_wait | [RW] | |
| lockd_recover_wait | [RW] | |
| lockfile | [R] | |
| mutex | [R] | |
| opts | [R] | |
| path | [R] | |
| schema | [R] | |
| sql_debug | [RW] | |
| sql_debug | [RW] | |
| transaction_retries | [RW] | |
| transaction_retries | [RW] | |
| transaction_retries_sc | [RW] | |
| transaction_retries_sc | [RW] |
# File lib/rq-2.3.1/qdb.rb, line 139
# Creates a brand-new queue database at +path+ and returns the QDB handle.
#
# Order matters: the lockfile and the on-disk schema file are written first,
# because #connect refuses to open a db whose "<path>.schema" file is missing.
# The pragmas and the table definitions are then applied inside one transaction.
def create path, opts = {}
#--{{{
  db = new(path, opts)
  FileUtils::touch(db.lockfile)
  create_schema(db.schema)
  db.transaction do
    [PRAGMAS, SCHEMA].each { |sql| db.execute(sql) }
  end
  db
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 151
# Writes the schema (PRAGMAS followed by SCHEMA, one per line) to +path+.
#
# The text is first written to "<path>.tmp" and then moved into place, so a
# reader never sees a half-written schema file.
#
# File::open is used instead of Kernel#open because Kernel#open treats a path
# starting with "|" as a command to spawn. File::open always opens a file.
def create_schema path
#--{{{
  tmp = "#{ path }.tmp"
  File::open(tmp, 'w') do |f|
    f.puts PRAGMAS
    f.puts SCHEMA
  end
  FileUtils::mv tmp, path
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 114
# Converts a hash keyed by field name into a tuple (see #tuple). Every name in
# FIELDS is copied across; names missing from +h+ come out as nil and keys not
# in FIELDS are ignored.
def h2t h
#--{{{
  FIELDS.inject(tuple) do |row, field|
    row[field] = h[field]
    row
  end
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 90
# Runs "PRAGMA integrity_check;" against the sqlite file at +dbpath+.
#
# Returns the (truthy) match position when the first result row's
# "integrity_check" column reads "ok", and nil/false otherwise. Errors raised
# while opening or querying the database propagate. The connection is always
# closed if it was opened.
def integrity_check dbpath
#--{{{
  db = nil
  begin
    db = SQLite::Database::new dbpath, 0
    db.use_array = true
    rows = db.execute 'PRAGMA integrity_check;'
    first = rows && rows.first
    first && (first["integrity_check"] =~ /^\s*ok\s*$/io)
  ensure
    db.close if db
  end
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 181
# Builds a handle on the queue database stored at +path+. Nothing is opened
# here. The method only resolves configuration and derives the auxiliary file
# names used by the locking and lockd-recovery protocol.
#
# path - path of the sqlite database file.
# opts - per-instance overrides. Each setting is looked up by its string name
#        via Util::getopt, then on the class-level accessor of the same name,
#        then (for sql_debug) in ENV['RQ_SQL_DEBUG'], and finally in the
#        matching DEFAULT_* constant.
def initialize path, opts = {}
#--{{{
  @path = path
  @opts = opts

  # The first non-nil candidate wins. A plain `a || b || default` chain would
  # also skip an explicit `false`, which made it impossible to disable
  # 'attempt_lockd_recovery' (default true) via opts or the class.
  # NOTE(review): assumes Util::getopt returns nil for an absent key - confirm.
  setting = lambda do |key, *fallbacks|
    [Util::getopt(key, @opts), klass.send(key), *fallbacks].find{|value| not value.nil?}
  end

  @logger                         = setting.call('logger', DEFAULT_LOGGER)
  # any non-nil ENV value (even "0") enables sql debugging when nothing else is set
  @sql_debug                      = setting.call('sql_debug', ENV['RQ_SQL_DEBUG'], DEFAULT_SQL_DEBUG)
  @transaction_retries            = setting.call('transaction_retries', DEFAULT_TRANSACTION_RETRIES)
  @aquire_lock_sc                 = setting.call('aquire_lock_sc', DEFAULT_AQUIRE_LOCK_SC)
  @transaction_retries_sc         = setting.call('transaction_retries_sc', DEFAULT_TRANSACTION_RETRIES_SC)
  @attempt_lockd_recovery         = setting.call('attempt_lockd_recovery', DEFAULT_ATTEMPT_LOCKD_RECOVERY)
  @lockd_recover_wait             = setting.call('lockd_recover_wait', DEFAULT_LOCKD_RECOVER_WAIT)
  @aquire_lock_lockfile_stale_age = setting.call('aquire_lock_lockfile_stale_age', DEFAULT_AQUIRE_LOCK_LOCKFILE_STALE_AGE)
  @aquire_lock_refresh_rate       = setting.call('aquire_lock_refresh_rate', DEFAULT_AQUIRE_LOCK_REFRESH_RATE)

  # #connect refuses to open the db unless this schema file exists
  @schema = "#{ @path }.schema"
  # NOTE(review): %|...| is a String, not a Regexp, so this gsub only removes
  # the literal text "/+ *$". File::dirname never leaves trailing slashes
  # (except for "/"), so in practice this is a no-op.
  @dirname = File::dirname(path).gsub(%|/+\s*$|,'')
  @basename = File::basename(path)
  # per host+pid marker files, touched/removed by aquire_lock
  @waiting_w = File::join(@dirname, "#{ Util::hostname }.#{ $$ }.waiting.w")
  @waiting_r = File::join(@dirname, "#{ Util::hostname }.#{ $$ }.waiting.r")
  @lock_w = File::join(@dirname, "#{ Util::hostname }.#{ $$ }.lock.w")
  @lock_r = File::join(@dirname, "#{ Util::hostname }.#{ $$ }.lock.r")
  # the file aquire_lock takes its posix lock on
  @lockfile = File::join(@dirname, 'lock')
  @lockf = Lockfile::new("#{ @path }.lock")
  @fields = FIELDS
  @in_transaction = false
  @in_ro_transaction = false
  @db = nil

  # lives next to (not inside) the queue directory: lockd_recover moves the
  # whole directory aside, and its mtime is how lockd_recover_wrap detects
  # that a recovery happened
  @lockd_recover = "#{ @dirname }.lockd_recover"
  @lockd_recover_lockf = Lockfile::new "#{ @lockd_recover }.lock"
  @lockd_recovered = false
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 128
# Quotes a value, or a possibly nested array of values, for use as SQL
# literals. Each truthy value becomes a single-quoted string with embedded
# quotes escaped via Util.escape; nil and false become the bare word NULL.
# Always returns an Array, one entry per flattened input element.
def q tuple
#--{{{
  quoted = []
  [ tuple ].flatten.each do |value|
    quoted << (value ? "'" << Util.escape(value, "'", "'") << "'" : 'NULL')
  end
  quoted
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 107
# Converts a positional tuple (one element per entry of FIELDS, in order)
# into a hash keyed by field name. Positions missing from +tuple+ map to nil.
def t2h tuple
#--{{{
  row = {}
  FIELDS.size.times { |idx| row[FIELDS[idx]] = tuple[idx] }
  row
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 121
# Returns a fresh all-nil Array with one slot per entry of FIELDS, with
# FIELDS attached via #fields= so slots can also be addressed by field name
# (assumes the arrayfields extension provides Array#fields= - confirm).
def tuple
#--{{{
  row = [nil] * FIELDS.size
  row.fields = FIELDS
  row
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 406
# Aborts the current transaction by raising AbortedTransactionError. Any
# arguments are forwarded to raise (message and, optionally, backtrace).
# transaction_wrap re-raises this error without retrying.
def abort_transaction(*reason)
#--{{{
  raise(AbortedTransactionError, *reason)
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 433
# Takes the queue's posix lock on @lockfile, yields, and releases the lock.
# Returns the block's value.
#
# opts - 'read_only' => true requests a shared lock; otherwise the lock is
#        exclusive.
#
# Each round polls the lock up to 42 times with random sub-second pauses. If
# the lock is still busy, the round checks whether the lockfile mtime is older
# than @aquire_lock_lockfile_stale_age (triggering lockd_recover when enabled),
# sleeps for the next @aquire_lock_sc interval, and starts another round. The
# method loops until the lock is obtained.
def aquire_lock opts = {}
#--{{{
  ro = Util::getopt 'read_only', opts
  ret = nil

  @aquire_lock_sc.reset

  # readers share the lock, writers take it exclusively; both are requested
  # non-blocking so the rounds below can poll and back off
  waiting, ltype, lfile =
    if ro
      [@waiting_r, File::LOCK_SH | File::LOCK_NB, @lock_r]
    else
      [@waiting_w, File::LOCK_EX | File::LOCK_NB, @lock_w]
    end

  aquired = false

  until aquired
    debug{ "aquiring lock" }

    # File::open rather than Kernel#open: the latter would spawn a command
    # for a path beginning with "|"
    File::open(@lockfile, 'a+') do |lf|
      locked = false
      refresher = nil

      begin
        # marker showing this host/pid is waiting for the lock
        FileUtils::touch waiting

        42.times do
          locked = lf.posixlock(ltype | File::LOCK_NB)
          break if locked
          sleep rand
        end

        if locked
          aquired = true
          # presumably keeps @lockfile's mtime fresh while held, so the stale
          # check below does not fire on a live holder (Refresher not visible)
          refresher = Refresher::new @lockfile, @aquire_lock_refresh_rate
          debug{ "refresher pid <#{ refresher.pid }> refresh_rate <#{ @aquire_lock_refresh_rate }>" }
          FileUtils::rm_f waiting rescue nil
          FileUtils::touch lfile rescue nil
          debug{ "aquired lock" }
          ret = yield
          debug{ "released lock" }
        else
          mtime = File::stat(@lockfile).mtime
          stale = mtime < (Time::now - @aquire_lock_lockfile_stale_age)
          if stale
            warn{ "detected stale lockfile of mtime <#{ mtime }>" }
            lockd_recover if @attempt_lockd_recovery
          end
          sc = @aquire_lock_sc.next
          debug{ "failed to aquire lock - sleep(#{ sc })" }
          sleep sc
        end

      ensure
        if locked
          unlocked = false
          begin
            # try a few non-blocking unlocks before falling back to a blocking one
            42.times do
              unlocked = lf.posixlock(File::LOCK_UN | File::LOCK_NB)
              break if unlocked
              sleep rand
            end
          ensure
            lf.posixlock File::LOCK_UN unless unlocked
          end
        end
        refresher.kill if refresher
        FileUtils::rm_f waiting rescue nil
        FileUtils::rm_f lfile rescue nil
      end
    end
  end
  ret
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 520
# Opens the sqlite database at @path (in array-row mode), yields it, and
# always disconnects afterwards. Returns the block's value.
#
# Raises RuntimeError 'db has no schema' when the "<path>.schema" file is
# missing. While connected, the handle is also published in $db and @db;
# both are reset to nil on the way out.
def connect
#--{{{
  connected = nil
  begin
    raise 'db has no schema' unless test(?e, @schema)
    debug{"connecting to db <#{ @path }>..."}
    @db = SQLite::Database::new(@path, 0)
    $db = @db
    debug{"connected."}
    connected = true
    @db.use_array = true
    yield @db
  ensure
    @db.close if connected
    $db = @db = nil
    debug{"disconnected from db <#{ @path }>"}
  end
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 540
# Executes +sql+ on the open connection and returns the driver's result.
# Only valid inside #transaction; otherwise raises 'not in transaction'.
# When @sql_debug is set, the statement and the first result row are
# written to the logger.
def execute sql, &block
#--{{{
  raise 'not in transaction' unless @in_transaction
  logger << "SQL:\n#{ sql }\n" if @sql_debug
  # retry_if_locked is deliberately not applied here; transaction_wrap retries
  result = @db.execute(sql, &block)
  if @sql_debug and Array === result and result.first
    logger << "RESULT:\n#{ result.first.inspect }\n...\n"
  end
  result
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 674
# Instance-level wrapper around the class-level integrity_check. It logs
# which file is being checked (defaulting to this queue's db at @path) and
# returns the class method's result unchanged.
def integrity_check path = @path
#--{{{
  target = path
  debug { "running integrity_check on <#{ target }>" }
  klass.integrity_check target
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 680
# Runs the block while holding the queue lock (see aquire_lock), guarded by
# lockd_recover_wrap. Returns the value of the last run of the block.
def lock opts = {}
#--{{{
  result = nil
  lockd_recover_wrap { aquire_lock(opts) { result = yield } }
  result
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 582
# Attempts to recover a wedged lock daemon by moving the whole queue
# directory aside, re-creating the db file and lockfile by copy, verifying
# the db, and moving the directory back.
#
# Returns nil when @attempt_lockd_recovery is off. Returns true when another
# node already recovered after this call started (@lockd_recover mtime newer
# than our start time). Otherwise returns @lockd_recovered: true if the copied
# db passed integrity_check, false if not. The whole procedure runs under
# @lockd_recover_lockf, and it sleeps @lockd_recover_wait seconds before
# touching any files.
def lockd_recover
#--{{{
  return nil unless @attempt_lockd_recovery
  warn{ "attempting lockd recovery" }
  time = Time::now
  ret = nil

  @lockd_recover_lockf.lock do
    # NOTE(review): Util::uncache semantics not visible here; presumably
    # refreshes any cached (e.g. NFS attribute) view of each path
    [@dirname, @path, @lockfile, @lockd_recover].each{|f| Util::uncache f rescue nil}
    mtime = File::stat(@lockd_recover).mtime rescue time

    if mtime > time
      warn{ "skipping lockd recovery (another node has already recovered)" }
      ret = true
    else
      moved = false
      tmp = "#{ @dirname }.tmp"
      begin
        # bumping this mtime tells lockd_recover_wrap elsewhere that a recovery happened
        FileUtils::touch @lockd_recover
        @lockd_recovered = false

        # best effort: failing to build or mail the report must not stop recovery
        begin
          report =
            "hostname : #{ Util::hostname }\n" \
            "pid : #{ Process.pid }\n" \
            "time : #{ Time::now }\n" \
            "q :\n" \
            "path : #{ @dirname }\n" \
            "stat : #{ File::stat(@dirname).inspect }\n" \
            "db :\n" \
            "path : #{ @path }\n" \
            "stat : #{ File::stat(@path).inspect }\n" \
            "lockfile :\n" \
            "path : #{ @lockfile }\n" \
            "stat : #{ File::stat(@lockfile).inspect }\n"
          info{ "LOCKD RECOVERY REPORT" }
          logger << report
          # NOTE(review): the recipient address is hard-coded.
          # The quoted heredoc delimiter ('eof') stops the shell from expanding
          # $, ` and \ that may appear in paths or stat output.
          cmd = "mail -s LOCKD_RECOVERY ara.t.howard@noaa.gov <<'eof'\n#{ report }\neof"
          Util::system cmd
        rescue
          nil
        end

        warn{ "sleeping #{ @lockd_recover_wait }s before continuing..." }
        sleep @lockd_recover_wait

        FileUtils::rm_rf tmp
        FileUtils::mv @dirname, tmp
        moved = true

        # replace the db and lockfile with fresh copies of themselves (new inodes)
        rfiles = [@path, @lockfile].map{|f| File::join(tmp,File::basename(f))}
        rfiles.each do |f|
          ftmp = "#{ f }.tmp"
          FileUtils::rm_rf ftmp
          FileUtils::cp f, ftmp
          FileUtils::rm f
          FileUtils::mv ftmp, f
        end

        dbtmp = File::join(tmp,File::basename(@path))

        if integrity_check(dbtmp)
          FileUtils::mv tmp, @dirname
          FileUtils::cp @lockd_recover_lockf.path, @lockd_recover
          @lockd_recovered = true
          [@dirname, @path, @lockfile, @lockd_recover].each{|f| Util::uncache f rescue nil}
          warn{ "lockd recovery complete" }
        else
          FileUtils::mv tmp, @dirname
          @lockd_recovered = false
          error{ "lockd recovery failed" }
        end

        ret = @lockd_recovered
      ensure
        # on any failure after the move, put the queue directory back
        if moved and not @lockd_recovered and tmp and test(?d, tmp)
          FileUtils::mv tmp, @dirname
        end
      end
    end
  end
  ret
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 323
# Runs the block and retries it (after a 120s pause) when it failed because
# another node performed a lockd recovery while it ran. A remote recovery is
# detected as an increase of @lockd_recover's mtime across the attempt that
# this process did not cause itself (@lockd_recovered still false).
#
# Errors raised by an attempt during which no such recovery was observed are
# re-raised. Returns the block's value.
#
# opts - unused; accepted for signature compatibility with the other wrappers.
def lockd_recover_wrap opts = {}
#--{{{
  ret = nil
  begin
    # Reset on every attempt. Only a recovery seen during *this* attempt
    # justifies another retry. Without the reset, one remote recovery would
    # make every later, unrelated failure retry indefinitely.
    try_again = false
    begin
      @lockd_recovered = false
      old_mtime =
        begin
          Util::uncache @lockd_recover rescue nil
          File::stat(@lockd_recover).mtime
        rescue
          Time::now
        end
      ret = yield
    ensure
      new_mtime =
        begin
          Util::uncache @lockd_recover rescue nil
          File::stat(@lockd_recover).mtime
        rescue
          old_mtime
        end

      if new_mtime and old_mtime and new_mtime > old_mtime and not @lockd_recovered
        try_again = true
      end
    end
  rescue
    if try_again
      warn{ "a remote lockd recovery has invalidated this transaction!" }
      warn{ "retrying..."}
      sleep 120
      retry
    else
      raise
    end
  end
  ret
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 693
# Runs the block while holding the queue lock in shared (read-only) mode and
# returns whatever #lock returns.
#
# opts - options forwarded to #lock, with 'read_only' forced to true. A
#        merged copy is passed on, so the caller's hash is not modified.
def read_lock(opts = {}, &block)
#--{{{
  lock(opts.merge('read_only' => true), &block)
#--}}}
end
TODO - add a sleep cycle if this method ends up getting used
# File lib/rq-2.3.1/qdb.rb, line 557
# Runs the block, retrying whenever it raises SQLite::BusyException (the
# database is locked). Returns the block's value.
#
# max_retries - maximum number of retries before the BusyException is
#               re-raised; nil (the default) retries forever, as before.
# wait        - seconds to sleep between attempts (default 1.0).
#
# Currently not used by #execute (the call there is commented out).
def retry_if_locked max_retries = nil, wait = 1.0
#--{{{
  attempts = 0
  begin
    yield
  rescue SQLite::BusyException
    raise if max_retries and attempts >= max_retries
    attempts += 1
    warn{ "database locked - waiting(#{ wait }) and retrying" }
    sleep wait
    retry
  end
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 285
# Runs the block in a read-only transaction and returns what #transaction
# returns.
#
# opts - options forwarded to #transaction, with 'read_only' forced to true.
#        A merged copy is passed on, so the caller's hash is not modified.
#
# NOTE: qdb.rb defines ro_transaction twice (source lines 252 and 285); Ruby
# keeps the later definition.
def ro_transaction(opts = {}, &block)
#--{{{
  transaction(opts.merge('read_only' => true), &block)
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 252
# Runs the block in a read-only transaction and returns what #transaction
# returns.
#
# opts - options forwarded to #transaction, with 'read_only' forced to true.
#        A merged copy is passed on, so the caller's hash is not modified.
#
# NOTE: qdb.rb defines ro_transaction twice (source lines 252 and 285); Ruby
# keeps the later definition, so this one is shadowed.
def ro_transaction(opts = {}, &block)
#--{{{
  transaction(opts.merge('read_only' => true), &block)
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 411
# Rolls back the current transaction by raising RollbackTransactionError.
# Any arguments are forwarded to raise (message and, optionally,
# backtrace). transaction_wrap re-raises this error without retrying.
def rollback_transaction(*reason)
#--{{{
  raise(RollbackTransactionError, *reason)
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 416
# Runs the block and, unless opts['read_only'] is set, afterwards deletes any
# ".nfs*" entries in the queue directory that appeared while the block ran
# (presumably NFS "silly rename" leftovers). Entries that already existed
# are left alone. Returns the block's value. No cleanup happens if the
# block raises.
def sillyclean opts = {}
#--{{{
  return yield if Util::getopt('read_only', opts)

  pattern = File::join @dirname, '.nfs*'
  before = Dir[pattern]
  result = yield
  (Dir[pattern] - before).each { |silly| FileUtils::rm_rf silly }
  result
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 291
# Runs the block inside a database transaction and returns its value.
#
# The outermost call stacks the protection layers in this order:
# lockd_recover_wrap, transaction_wrap (retries), aquire_lock (posix lock),
# connect. Around the block it issues 'begin'/'commit' unless
# opts['read_only'] is set. A nested call on the same object simply yields
# inside the already-open transaction.
#
# Progress messages now go through the logger (debug) instead of being
# printed unconditionally to STDERR on every transaction.
#
# NOTE(review): a nested call does not check its own 'read_only' flag. A
# read-write block nested in a read-only transaction therefore runs under
# the shared lock without 'begin'/'commit'.
def transaction opts = {}
#--{{{
  ro = Util::getopt 'read_only', opts
  ret = nil
  if @in_transaction
    debug{ 'continuing transaction...' }
    ret = yield
  else
    begin
      debug{ 'starting transaction...' }
      @in_transaction = true
      lockd_recover_wrap(opts) do
        transaction_wrap(opts) do
          aquire_lock(opts) do
            connect do
              execute 'begin' unless ro
              ret = yield
              execute 'commit' unless ro
            end
          end
        end
      end
    ensure
      @in_transaction = false
    end
  end
  ret
#--}}}
end
# File lib/rq-2.3.1/qdb.rb, line 258
# Runs the block inside a database transaction and returns its value.
# Nesting is not allowed: raises 'nested transaction' if one is already open.
#
# Layers, outermost first: lockd_recover_wrap, transaction_wrap, aquire_lock,
# connect. 'begin'/'commit' are issued around the block unless
# opts['read_only'] is set. @in_transaction is cleared however the block
# exits.
#
# NOTE: qdb.rb also defines #transaction at source line 291; Ruby keeps the
# later definition, so this one is shadowed.
def transaction opts = {}
#--{{{
  raise 'nested transaction' if @in_transaction
  read_only = Util::getopt 'read_only', opts
  result = nil
  @in_transaction = true
  begin
    lockd_recover_wrap(opts) do
      transaction_wrap(opts) do
        aquire_lock(opts) do
          connect do
            execute 'begin' unless read_only
            result = yield
            execute 'commit' unless read_only
          end
        end
      end
    end
  ensure
    @in_transaction = false
  end
  result
#--}}}
end
TODO - perhaps we should not retry on SQLException? Yet errors seem to map to this exception even when the SQL is fine, so the safest (and most annoying) course is simply to retry.
# File lib/rq-2.3.1/qdb.rb, line 369
# Runs the block and, for read-write work, retries it on failure. Returns the
# block's value.
#
# With opts['read_only'] set, the block is simply yielded. Otherwise:
# - AbortedTransactionError and RollbackTransactionError propagate at once.
# - Other StandardErrors are retried, sleeping @transaction_retries_sc.next
#   seconds between attempts, up to @transaction_retries retries. A value of
#   0 disables retrying.
# - An error is only logged when it differs (per Util::erreq) from the
#   previous one.
def transaction_wrap opts = {}
#--{{{
  return yield if Util::getopt('read_only', opts)

  failures = []
  @transaction_retries_sc.reset
  begin
    yield
  rescue AbortedTransactionError, RollbackTransactionError
    raise
  rescue => e
    raise if @transaction_retries == 0
    if failures.size >= @transaction_retries
      error{ "MAXIMUM TRANSACTION RETRIES SURPASSED" }
      raise
    end
    warn{ e } if (failures.empty? or not Util::erreq(failures[-1], e))
    failures << e
    warn{ "retry <#{ failures.size }>..." }
    sleep @transaction_retries_sc.next
    retry
  end
#--}}}
end