#!/usr/bin/env ruby #!/dmsp/reference/bin/ruby # # make sure ruby version is good enough # require 'rbconfig' #{{{ major, minor, teeny = %w(MAJOR MINOR TEENY).map{|k| Config::CONFIG[k].to_i} unless major > 1 or major == 1 and minor >= 8 STDERR.puts("=" * 79) STDERR.puts "<#{ $0 }> requires a ruby version > 1.8.0" STDERR.puts STDERR.puts "you are currenlty running ruby version <#{ major }.#{ minor }.#{ teeny }>" STDERR.puts STDERR.puts "possible problems which could cause this are:" STDERR.puts " - improper PATH environment variable setting" STDERR.puts " - ruby > 1.8.0 has not been installed" STDERR.puts("=" * 79) exit 1 end #}}} # # builtin # require 'optparse' require 'logger' require 'yaml' require 'pp' require 'socket' require 'pathname' require 'tempfile' require 'fileutils' require 'tmpdir' # # raa - http://raa.ruby-lang.org # require 'arrayfields' require 'sqlite' require 'posixlock' require 'lockfile' # # TODO # # - break out into libraries # - api docs # - user docs # - installer script # - examples # - shell selection for jobs # - out/err redirection for jobs # - full boolean resource monitoring requests # - housekeeping - make backups of queue every so often, integrity_checks # - rq relay mode (submit and track exit_status in local db) # - message queue to send commands to remote nodes # - track feeders in queue # # TODO # # # logging support # require 'logger' module Logging #{{{ # # a module that adds an accessor to Logging objects in ored to fix a bug where # not all logging devices are put into sync mode, resulting in improper log # rolling. this is a hack. # module LoggerExt #{{{ attr :logdev #}}} end # module LoggerExt # # implementations of the methods shared by both classes and objects of classes # which include Logging # module LogMethods #{{{ def logger #{{{ if defined?(@logger) and @logger @logger else if Class === self @logger = self.default_logger else @logger = self::class::logger end raise "@logger is undefined!" unless defined?(@logger) and @logger @logger end #}}} end def logger= log #{{{ @logger = log @logger.extend LoggerExt @logger.logdev.dev.sync = true @logger #}}} end def debug(*args, &block); logger.debug(*args, &block); end def info(*args, &block); logger.info(*args, &block) ; end def warn(*args, &block); logger.warn(*args, &block) ; end def error(*args, &block); logger.error(*args, &block); end def fatal(*args, &block); logger.fatal(*args, &block); end def log_err e #{{{ if logger.debug? error{ errmsg e } else error{ emsg e } end #}}} end def emsg e #{{{ "<#{ e.class }> - <#{ e.message }>" #}}} end def btrace e #{{{ e.backtrace.join("\n") #}}} end def errmsg e #{{{ emsg(e) << "\n" << btrace(e) #}}} end #}}} end # module LogMethods EOL = "\n" DIV0 = ("." * 79) << EOL DIV1 = ("-" * 79) << EOL DIV2 = ("=" * 79) << EOL DIV3 = ("#" * 79) << EOL SEC0 = ("." * 16) << EOL SEC1 = ("-" * 16) << EOL SEC2 = ("=" * 16) << EOL SEC3 = ("#" * 16) << EOL class << self #{{{ def append_features c #{{{ ret = super c.extend LogMethods class << c def default_logger #{{{ if defined?(@default_logger) and @default_logger @default_logger else self.default_logger = Logger.new STDOUT @default_logger.debug{ "<#{ self }> using default logger"} @default_logger end #}}} end def default_logger= log #{{{ @default_logger = log @default_logger.extend LoggerExt @default_logger.logdev.dev.sync = true @default_logger #}}} end end ret #}}} end #}}} end include LogMethods #}}} end # module Logging # # random utilities # require 'pathname' require 'socket' require 'tmpdir' module Util #{{{ class << self def export sym #{{{ sym = "#{ sym }".intern module_function sym public sym #}}} end def append_features c #{{{ super c.extend Util #}}} end end def mcp obj #{{{ Marshal.load(Marshal.dump(obj)) #}}} end export 'mcp' def klass #{{{ self.class #}}} end export 'klass' def realpath path #{{{ path = File.expand_path "#{ path }" begin Pathname.new(path).realpath.to_s rescue Errno::ENOENT, Errno::ENOTDIR path end #}}} end export 'realpath' def hashify(*hashes) #{{{ hashes.inject(accum={}){|accum,hash| accum.update hash} #}}} end export 'hashify' def getopt opt, hash #{{{ opt_s = "#{ opt }" hash[opt] || hash[opt_s] || hash[opt_s.intern] #}}} end export 'getopt' def alive? pid #{{{ pid = Integer("#{ pid }") begin Process.kill 0, pid true rescue Errno::ESRCH false end #}}} end export 'alive?' def maim(pid, opts = {}) #{{{ sigs = getopt('signals', opts) || %w(SIGTERM SIGQUIT SIGKILL) suspend = getopt('suspend', opts) || 4 pid = Integer("#{ pid }") sigs.each do |sig| begin Process.kill(sig, pid) rescue Errno::ESRCH return nil end sleep 0.2 unless alive?(pid) break else sleep suspend end end not alive?(pid) #}}} end export 'maim' def timestamp time = Time.now #{{{ usec = "#{ time.usec }" usec << ('0' * (6 - usec.size)) if usec.size < 6 time.strftime('%Y-%m-%d %H:%M:%S.') << usec #}}} end export 'timestamp' def stamptime string, local = true #{{{ string = "#{ string }" pat = %r/^\s*(\d\d\d\d)-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d).(\d\d\d\d\d\d)\s*$/o match = pat.match string raise ArgumentError, "<#{ string.inspect }>" unless match yyyy,mm,dd,h,m,s,u = match.to_a[1..-1].map{|m| m.to_i} if local Time.local yyyy,mm,dd,h,m,s,u else Time.gm yyyy,mm,dd,h,m,s,u end #}}} end export 'stamptime' def escape! s, char, esc #{{{ re = %r/([#{0x5c.chr << esc}]*)#{char}/ s.gsub!(re) do (($1.size % 2 == 0) ? ($1 << esc) : $1) + char end #}}} end export 'escape!' def escape s, char, esc #{{{ ss = "#{ s }" escape! ss, char, esc ss #}}} end export 'escape' def fork(*args, &block) #{{{ begin verbose = $VERBOSE $VERBOSE = nil Process.fork(*args, &block) ensure $VERBOSE = verbose end #}}} end export 'fork' def exec(*args, &block) #{{{ begin verbose = $VERBOSE $VERBOSE = nil Kernel.exec(*args, &block) ensure $VERBOSE = verbose end #}}} end export 'exec' def hostname #{{{ @__hostname__ ||= Socket::gethostname #}}} end export 'hostname' def host #{{{ @__host__ ||= Socket::gethostname.gsub(%r/\..*$/o,'') #}}} end export 'host' def emsg e #{{{ "<#{ e.class }> - <#{ e.message }>" #}}} end export 'emsg' def btrace e #{{{ (e.backtrace or []).join("\n") #}}} end export 'btrace' def errmsg e #{{{ emsg(e) << "\n" << btrace(e) #}}} end export 'errmsg' def erreq a, b #{{{ a.class == b.class and a.message == b.message and a.backtrace == b.backtrace #}}} end export 'erreq' def tmpnam dir = Dir.tmpdir, seed = File.basename($0) #{{{ pid = Process.pid path = "%s_%s_%s_%s_%d" % [Util::hostname, seed, pid, Util::timestamp.gsub(/\s+/o,'_'), rand(101010)] File.join(dir, path) #}}} end export 'tmpnam' def uncache file #{{{ refresh = nil begin is_a_file = File === file path = (is_a_file ? file.path : file.to_s) stat = (is_a_file ? file.stat : File.stat(file.to_s)) refresh = tmpnam(File.dirname(path)) File.link path, refresh rescue File.symlink path, refresh File.chmod stat.mode, path File.utime stat.atime, stat.mtime, path ensure begin File.unlink refresh if refresh rescue Errno::ENOENT end end #}}} end export 'uncache' #}}} end # class Util # # the sleepcycle class provides timeouts for slightly better than average polling # class SleepCycle < Array #{{{ attr :min attr :max attr :range attr :inc def initialize min, max, inc #{{{ @min, @max, @inc = Float(min), Float(max), Float(inc) @range = @max - @min raise RangeError, "max < min" if @max < @min raise RangeError, "inc > range" if @inc > @range s = @min push(s) and s += @inc while(s <= @max) self[-1] = @max if self[-1] < @max reset #}}} end def next #{{{ ret = self[@idx] @idx = ((@idx + 1) % self.size) ret #}}} end def reset #{{{ @idx = 0 #}}} end #}}} end # # queue database class # TODO - defaults for timeouts should be set only for feeding # TODO - should not retry on SQLException # class QDB #{{{ include Util include Logging FIELDS = #{{{ %w( jid priority state submitted started finished elapsed submitter runner pid exit_status command ) #}}} PRAGMAS = #{{{ <<-sql PRAGMA default_synchronous = FULL; sql #}}} SCHEMA = #{{{ <<-sql create table jobs ( jid integer primary key, #{ FIELDS[1..-1].join ",\n " } ); create table attributes ( key, value, primary key (key) ); sql #}}} INDEXES = #{{{ FIELDS.inject('') do |s,f| s << "create index jobs_#{ f }_idx on jobs(#{ f });\n" end #}}} class << self #{{{ def integrity_check dbpath #{{{ ret = false tuple = nil begin db = SQLite::Database.new dbpath, 0 opened = true db.use_array = true tuple = db.execute 'PRAGMA integrity_check;' ret = (tuple and tuple.first and (tuple.first["integrity_check"] =~ /^\s*ok\s*$/io)) ensure db.close if opened db = nil end ret #}}} end def t2h tuple #{{{ h = {} FIELDS.each_with_index{|f,i| h[f] = tuple[i]} h #}}} end def h2t h #{{{ t = tuple FIELDS.each{|f| t[f] = h[f]} t #}}} end def tuple #{{{ t = Array.new FIELDS.size t.fields = FIELDS t #}}} end def q tuple #{{{ tuple.map do |f| if f "'" << Util.escape(f,"'","'") << "'" else 'NULL' end end #}}} end def create path, opts = {} #{{{ qdb = new path, opts FileUtils.touch qdb.lockfile create_schema qdb.schema qdb.transaction do qdb.execute PRAGMAS qdb.execute SCHEMA qdb.execute INDEXES end qdb #}}} end def create_schema path #{{{ tmp = "#{ path }.tmp" open(tmp,'w') do |f| f.puts PRAGMAS f.puts SCHEMA f.puts INDEXES end FileUtils.mv tmp, path #}}} end #}}} end attr :path attr :opts attr :dirname attr :schema attr :fields attr :mutex attr :in_transaction attr :lockfile attr :sql_debug, true def initialize path, opts = {} #{{{ @path = path @opts = opts @logger = getopt('logger', @opts) || Logger.new(STDERR) @sql_debug = getopt('sql_debug', @opts) || ENV['RQ_SQL_DEBUG'] || false @transaction_retries = getopt('transaction_retries', @opts) || 16 @schema = "#{ @path }.schema" @dirname = File.dirname(path).gsub(%r|/+\s*$|o,'') @basename = File.basename(path) @waiting_w = File.join(@dirname, "#{ hostname }.#{ $$ }.waiting.w") @waiting_r = File.join(@dirname, "#{ hostname }.#{ $$ }.waiting.r") @lock_w = File.join(@dirname, "#{ hostname }.#{ $$ }.lock.w") @lock_r = File.join(@dirname, "#{ hostname }.#{ $$ }.lock.r") #@lockfile = Lockfile.new File.join(@dirname, 'lock') @lockfile = File.join(@dirname, 'lock') @lockf = Lockfile.new("#{ @path }.lock") @fields = FIELDS @in_transaction = false @db = nil @lockd_recover = "#{ @dirname }.lockd_recover" @lockd_recover_lockf = Lockfile.new "#{ @lockd_recover }.lock" @lockd_recovered = false @aquire_lock_sc = SleepCycle.new 0.42, 42, 0.42 @transaction_retries_sc = SleepCycle.new 0.42, 42, 0.42 #}}} end # # --- new methods # def ro_transaction &block #{{{ transaction read_only = true, &block #}}} end def transaction read_only = false #{{{ raise 'nested transaction' if @in_transaction ret = nil begin @in_transaction = true lockd_recover_wrap do transaction_wrap do aquire_lock(read_only) do sillyclean do connect do execute 'begin' unless read_only ret = yield execute 'commit' unless read_only end end end end end ensure @in_transaction = false end ret #}}} end def lockd_recover_wrap #{{{ ret = nil try_again = false begin begin @lockd_recovered = false old_mtime = begin Util::uncache @lockd_recover File.stat(@lockd_recover).mtime rescue Time.now end ret = yield ensure new_mtime = begin Util::uncache @lockd_recover File.stat(@lockd_recover).mtime rescue old_mtime end if new_mtime and old_mtime and new_mtime > old_mtime and not @lockd_recovered try_again = true end end rescue if try_again warn{ "a remote lockd recovery has invalidated this transaction!" } warn{ "retrying..."} sleep 120 retry else raise end end ret #}}} end # # retry only write operations? # def transaction_wrap(errors = [], &block) #{{{ @transaction_retries_sc.reset begin block.call errors rescue => e #rescue SQLite::DatabaseException, SQLite::SQLException, SystemCallError => e if errors.size >= @transaction_retries error{ "MAXIMUM TRANSACTION RETRIES SURPASSED" } errors.each{|err| error{ err }} raise else warn{ e } if(errors.empty? or not Util.erreq(errors[-1], e)) errors << e warn{ "retry <#{ errors.size }>..." } end sleep @transaction_retries_sc.next retry end #}}} end def sillyclean dir = @dirname #{{{ glob = File.join dir,'.nfs*' orgsilly = Dir[glob] yield newsilly = Dir[glob] silly = newsilly - orgsilly silly.each do |path| debug{ "rm_rf sillyname <#{ path }>" } FileUtils::rm_rf path end #}}} end # # TODO - consider LOCK_UN and it's blocking effect... # TODO - back-off if many others are waiting # def aquire_lock read_only = false #{{{ ret = nil @aquire_lock_sc.reset waiting, ltype, lfile = if read_only [@waiting_r, File::LOCK_SH | File::LOCK_NB, @lock_r] else [@waiting_w, File::LOCK_EX | File::LOCK_NB, @lock_w] end ltype_s = (ltype == File::LOCK_EX ? 'write' : 'read') ltype ||= File::LOCK_NB aquired = false until aquired begin debug{ "aquiring lock" } #@lockf.lock unless read_only # # TODO - refactor this... # #line = Dir[File.join(@dirname, '*waiting*')] #line.each do |other| #debug{ "sleeping on account of <#{ other }>" } #sleep 4 #end open(@lockfile, 'a+') do |lf| locked = false refresher = nil sc = nil begin FileUtils.touch waiting # # poll # 42.times do locked = lf.posixlock(ltype | File::LOCK_NB) break if locked sleep rand end if locked aquired = true refresher = Thread.new do loop do FileUtils.touch @lockfile sleep 10 end end FileUtils.rm_f waiting rescue nil FileUtils.touch lfile debug{ "aquired lock" } ret = yield debug{ "released lock" } else aquired = false stat = File.stat @lockfile mtime = stat.mtime stale = mtime < (Time.now - 600) lockd_recover if stale sc = @aquire_lock_sc.next debug{ "failed to aquire lock - sleep(#{ sc })" } sleep sc end ensure if locked unlocked = false begin 42.times do unlocked = lf.posixlock(File::LOCK_UN | File::LOCK_NB) break if unlocked sleep rand end ensure lf.posixlock File::LOCK_UN unless unlocked end end refresher.kill if refresher FileUtils.rm_f waiting rescue nil FileUtils.rm_f lfile rescue nil end end ensure #@lockf.unlock rescue nil unless read_only end end ret #}}} end def connect #{{{ ret = nil opened = nil begin raise 'db has no schema' unless test ?e, @schema @db = SQLite::Database.new @path, 0 opened = true @db.use_array = true ret = yield @db ensure @db.close if opened @db = nil end ret #}}} end def execute sql, &block #{{{ raise 'not in transaction' unless @in_transaction debug{ "executing\n#{ sql.gsub %r/^\s*/o, '' }" } if @sql_debug #ret = retry_if_locked{ @db.execute sql, &block } ret = @db.execute sql, &block if @sql_debug and ret and ret.first debug{ "res\n#{ res.first.inspect }\n..." } end ret #}}} end def uncache #{{{ Util::uncache @dirname rescue nil Util::uncache @path rescue nil Util::uncache @lockfile rescue nil Util::uncache @lockd_recover rescue nil #}}} end # # TODO - add sleep cycle # def retry_if_locked #{{{ ret = nil begin ret = yield rescue SQLite::BusyException warn{ "database locked - waiting(1.0) and retrying" } sleep 1.0 retry end ret #}}} end def vacuum #{{{ raise 'nested transaction' if @in_transaction begin @in_transaction = true connect{ execute 'vacuum' } ensure @in_transaction = false end self #}}} end # # TODO - check db ok after - itegrity check? # def lockd_recover #{{{ warn{ "attempting lockd recovery" } time = Time.now ret = nil @lockd_recover_lockf.lock do uncache mtime = File.stat(@lockd_recover).mtime rescue time if mtime > time warn{ "skipping lockd recovery (already recovered)" } ret = true else moved = false begin FileUtils.touch @lockd_recover @lockd_recovered = false begin report = <<-msg hostname : #{ Util::hostname } pid : #{ Process.pid } time : #{ Time.now } q : path : #{ @dirname } stat : #{ File.stat(@dirname).inspect } db : path : #{ @path } stat : #{ File.stat(@path).inspect } lockfile : path : #{ @lockfile } stat : #{ File.stat(@lockfile).inspect } msg logger << report cmd = "mail -s LOCKD_RECOVERY ara.t.howard@noaa.gov < does not exist" unless test ?e, @path raise "q <#{ @path }> is not a directory" unless test ?d, @path @basename = File.basename(@path) @dirname = File.dirname(@path) @logger = getopt('logger', opts) || Logger.new(STDERR) @qdb = getopt('qdb', opts) || QDB.new(File.join(@path, 'db'), 'logger' => @logger) @empty = false #}}} end def submit(priority, *commands) #{{{ now = Util.timestamp Time.now tuples = nil sql = '' commands.each do |command| tuple = QDB.tuple tuple['priority'] = priority tuple['command'] = command tuple['state'] = 'pending' tuple['submitted'] = now tuple['submitter'] = hostname values = QDB.q tuple sql << "insert into jobs values (#{ values.join ',' });\n" end tuples = nil transaction do execute(sql){} if block_given? sql = "select * from jobs where submitted = '#{ now }'" tuples = execute sql end end tuples.each{|t| yield t} if tuples tuples = nil self #}}} end def list(*whats) #{{{ whats.replace(['pending', 'running', 'finished', 'dead']) if whats.empty? or whats.include?('all') whats.map! do |what| case what when %r/^\s*p/io 'pending' when %r/^\s*r/io 'running' when %r/^\s*f/io 'finished' when %r/^\s*d/io 'dead' else what end end where_clause = QDB.q(whats).map{|f| "state = #{ f }\n"}.join(' or ') sql = <<-sql select * from jobs where #{ where_clause } sql puts '---' fields = @qdb.fields tuples = ro_transaction{ execute sql } tuples.each do |tuple| puts '-' fields.each{|f| puts " #{ f }: #{ tuple[f] }" } end tuples = nil self #}}} end def status(*whats) #{{{ whats.replace(['pending', 'running', 'finished', 'dead']) if whats.empty? or whats.include?('all') whats.map! do |what| case what when %r/^\s*p/io 'pending' when %r/^\s*r/io 'running' when %r/^\s*f/io 'finished' when %r/^\s*d/io 'dead' else what end end puts '---' ro_transaction do whats.each do |what| sql = <<-sql select count(*) from jobs where state = '#{ what }'; sql tuples = execute sql tuple = tuples.first count = (tuple ? tuple.first : 0) puts "#{ what } : #{ count }" end end self #}}} end def query where_clause = nil #{{{ # # quote everything on the rhs of an '=' sign - helps with shell problems... # where_clause.gsub!(/(=\s*([^\s')(=]+))/om){q = $2.gsub(%r/'+|\s+/o,''); "='#{ q }'"} sql = if where_clause "select * from jobs where #{ where_clause };" else "select * from jobs;" end puts '---' fields = @qdb.fields tuples = ro_transaction{ execute sql } tuples.each do |tuple| puts '-' fields.each{|f| puts " #{ f }: #{ tuple[f] }"} end tuples = nil self #}}} end def delete(*jids) #{{{ what = jids.first || 'all' case what when String sql = case what when %r/^\s*p/io "delete from jobs where state = 'pending';" when %r/^\s*r/io "delete from jobs where state = 'running';" when %r/^\s*f/io "delete from jobs where state = 'finished';" when %r/^\s*d/io "delete from jobs where state = 'dead';" when %r/^\s*a/io "delete from jobs;" else raise ArgumentError, "cannot delete <#{ what }>" end else sql = '' jids.each do |jid| jid = Integer jid sql << "delete from jobs where jid = #{ jid };" end end transaction{ execute sql } sql = nil @qdb.vacuum self #}}} end def transaction(*args, &block) #{{{ @qdb.transaction(*args, &block) #}}} end def ro_transaction &block #{{{ @qdb.ro_transaction &block #}}} end def execute(*args, &block) #{{{ @qdb.execute(*args, &block) #}}} end def integrity_check(*args, &block) #{{{ @qdb.integrity_check(*args, &block) #}}} end def snapshot qtmp = "#{ @basename }.snapshot", retries = 16 #{{{ debug{ "snapshot <#{ @path }> -> <#{ qtmp }>" } debug{ "retries <#{ retries }>" } qss = nil loopno = 0 loop do raise "surpassed retries <#{ retries }>" if loopno >= retries FileUtils.rm_rf qtmp FileUtils.mkdir_p qtmp %w(db db.schema lock).each do |base| src, dest = File.join(@path, base), File.join(qtmp, base) debug{ "cp <#{ src }> -> <#{ dest }>" } FileUtils.cp(src, dest) end qss = klass.new qtmp, @opts if qss.integrity_check debug{ "successfully created <#{ qtmp }>" } break else debug{ "failed to create <#{ qtmp }> - retrying" } end loopno += 1 end qss #}}} end def getjob #{{{ sql = <<-sql select * from jobs where state = 'pending' order by priority desc, submitted asc limit 2; sql tuples = execute sql job = tuples.first @empty = (tuples.size == 1) job #}}} end def jobisrunning job #{{{ sql = <<-sql update jobs set pid = '#{ job['pid'] }', state = '#{ job['state'] }', started = '#{ job['started'] }', runner = '#{ job['runner'] }' where jid = #{ job['jid'] }; sql #transaction{ execute sql } execute sql #}}} end def jobisdone job #{{{ sql = <<-sql update jobs set state = '#{ job['state'] }', exit_status = '#{ job['exit_status'] }', finished = '#{ job['finished'] }', elapsed = '#{ job['elapsed'] }' where jid = #{ job['jid'] }; sql #transaction{ execute sql } execute sql #}}} end def mtime #{{{ File.stat(@path).mtime #}}} end def []= key, value #{{{ sql = "select count(*) from attributes where key='#{ key }';" tuples = @qdb.execute sql tuple = tuples.first count = Integer tuple['count(*)'] case count when 0 sql = "insert into attributes values('#{ key }','#{ value }');" @qdb.execute sql when 1 sql = "update attributes set key='#{ key }', value='#{ value }' where key='#{ key }';" @qdb.execute sql else raise "key <#{ key }> has become corrupt!" end #}}} end def attributes #{{{ h = {} tuples = @qdb.execute "select * from attributes;" tuples.map!{|t| h[t['key']] = t['value']} h #}}} end #}}} end # # main program # class Main #{{{ VERSION = '0.1.3' PROGNAM = File.basename(File.expand_path($0)) DEFAULT_CONFIG_PATH = "#{ PROGNAM }.conf" CONFIG_SEARCH_PATH = %w(. ~ /usr/local/etc /usr/etc /etc) EXIT_SUCCESS = 0 EXIT_FAILURE = 1 attr :argv attr :op attr :logger attr :logdev attr :config attr :mode attr :q attr :qpath attr :hostname def initialize argv = cp(ARGV) #{{{ @argv = argv @hostname = Util.hostname parse_opts parse_argv #}}} end def run #{{{ #gen_template and exit EXIT_SUCCESS if @opt_template usage and exit EXIT_SUCCESS if @opt_help #init_logging #init_config #debug{ "qpath <#{ @qpath }>" } #debug{ "mode <#{ @mode }>" } @qpath = Util.realpath @qpath unless @mode == 'feed' or @mode == 'f' init_logging debug{ "qpath <#{ @qpath }>" } debug{ "mode <#{ @mode }>" } else @logger = Logger.new STDERR end begin case @mode when 'create', 'c' create when 'list', 'l' list when 'status', 't' status when 'submit', 's' submit when 'delete', 'd' delete when 'feed', 'f' feed when 'query', 'q' query when 'conf', 'o' conf when 'snapshot', 'p' snapshot when 'help', 'h' usage and exit else raise "unknown mode <#{ @mode || 'no mode' }>" end rescue Errno::EPIPE end #}}} end def snapshot #{{{ set_q qtmp = @argv.shift raise "<#{ qtmp }> exists" if qtmp and test(?e,qtmp) qss = @q.snapshot qtmp #}}} end def set_q create = false #{{{ @q = nil if create @q = Queue.create @qpath, 'logger' => @logger else raise "q <#{ @qpath }> does not exist" unless test ?e, @qpath @q = Queue.new @qpath, 'logger' => @logger if opt_snapshot ss = "#{ $0 }_#{ Process.pid }_#{ Thread.current.id.abs }_#{ rand(Time.now.to_i) }".gsub(%r|/|o,'_') qtmp = File.join(Dir.tmpdir, ss) @q = @q.snapshot qtmp at_exit{ FileUtils.rm_rf qtmp } end end #}}} end def create #{{{ set_q true debug{ "created <#{ @q.path }>" } #}}} end def list #{{{ set_q @q.list(*@argv) #}}} end def status #{{{ set_q @q.status(*@argv) #}}} end def query #{{{ set_q where_clause = @argv.join ' ' if where_clause.empty? debug{ "reading where_clause from STDIN" } while((buf = STDIN.gets)) buf.strip! buf.gsub! %r/#.*$/o, '' next if buf.empty? where_clause << "#{ buf } " end end @q.query where_clause #}}} end def conf #{{{ set_q unless @argv.empty? kv_pat = %r/^\s*([^\s]+)\s*=+\s*([^\s]+)\s*$/o @argv.each do |arg| match = kv_pat.match arg if match k, v = match[1], match[2] @q[k] = v end end end attributes = @q.attributes y attributes #}}} end def submit #{{{ set_q commands = (@argv.empty? ? [] : [@argv.join(' ')]) joblist = nil if opt_infile or commands.empty? stdin = (opt_infile == '-') || commands.empty? begin f = (stdin ? STDIN : open(opt_infile)) while((line = f.gets)) if f.lineno == 1 and line =~ %r/^---\s*$/o joblist = YAML::load(f.read) joblist.each{|job| commands << job["command"]} if joblist else line.gsub!(%r/(?:^\s+)|(?:\s+$)|(?:#.*$)/o, '') next if line.empty? commands << line debug{ "parsed command <#{ line.inspect }>" } end end ensure f.close unless stdin end end priority = opt_priority || 0 debug{ " priority <#{ priority }>" } if opt_quiet @q.submit(priority, *commands) else puts '---' fields = QDB::FIELDS @q.submit(priority, *commands) do |tuple| puts '-' fields.each{|f| puts " #{ f }: #{ tuple[f] }"} end end commands = nil self #}}} end def delete #{{{ set_q jids = @argv if jids.empty? pat = %r/^\s*jid\s*:\s*(\d+)\s*$/io while((line = STDIN.gets)) match = pat.match line next unless match begin jids << Integer(match[1]) rescue raise "syntax error in <#{ line.inspect }>" end end end if opt_quiet @q.delete(*jids) else @q.delete(*jids) jids.each{|jid| puts "- #{ jid }"} end #}}} end def feed #{{{ STDOUT.sync = true @started = Util.timestamp @min_sleep = Integer(opt_min_sleep || 42) @max_sleep = Integer(opt_max_sleep || 240) @max_feed = Integer(opt_feed || 2) @children = {} trap('SIGHUP') do $signaled = $sighup = true warn{ "signal " } end trap('SIGTERM') do $signaled = $sigterm = true warn{ "signal " } end trap('SIGINT') do $signaled = $sigint = true warn{ "signal " } end daemon do set_q gen_pidfile init_logging info{ "#{ PROGNAM } STARTED" } info{ "pidfile <#{ @pidfile.path }>" } info{ "qpath <#{ @qpath }>" } info{ "mode <#{ @mode }>" } fill_morgue loop do if $signaled reap_jobs(reap_only = true) until nothing_running? if $sighup cmd = ([$0] + ARGV).join(' ') info{ "#{ PROGNAM } RESTARTING" } @pidfile.posixlock(File::LOCK_UN) Util.exec cmd else info{ "#{ PROGNAM } STOPPED" } exit EXIT_SUCCESS end end throttle(@min_sleep) do debug{ "feed:starting jobs" } start_jobs unless busy? last_time = Time.now debug{ "feed:n_running <#{ @children.size }>" } if nothing_running? relax else reap_jobs end end end end #}}} end def fill_morgue #{{{ debug{ "filling morgue" } sql = <<-sql select * from jobs where state = 'running' and runner = '#{ hostname }' and started <= '#{ @started }' sql db = @q.qdb transaction do tuples = db.execute sql tuples.each do |tuple| jid = tuple['jid'] if jid sql = "update jobs set state='dead' where jid='#{ jid }'" db.execute sql info{ "burried job <#{ jid }>" } end end end debug{ "filled morgue" } #}}} end def throttle rate = 42 #{{{ if rate if defined? @last_throttle_time and @last_throttle_time elapsed = Time.now - @last_throttle_time timeout = rate - elapsed if timeout > 0 timeout = timeout + rand(rate * 0.10) debug{ "throttle rate of <#{ rate }> exceeded - sleeping <#{ timeout }>" } sleep timeout end end @last_throttle_time = Time.now end yield #}}} end def start_jobs #{{{ n_started = 0 transaction do until busy? break unless((job = @q.getjob)) start_job job n_started += 1 end end debug{ "start_jobs:n_started <#{ n_started }>" } n_started #}}} end def start_job job #{{{ jid = job['jid'] command = job['command'] r,w = IO.pipe cid = Util::fork do w.close STDIN.reopen r exec ['bash', "rq_job__#{ jid }__"], '--login' end r.close w.puts command w.close if cid job['pid'] = cid job['state'] = 'running' job['started'] = Util.timestamp(Time.now) job['runner'] = hostname @children[cid] = job @q.jobisrunning job info{ "jid <#{ job['jid'] }> started as pid <#{ job['pid'] }>" } info{ "command <#{ job['command'] }>" } else warn{ "jid <#{ job['jid'] }> failed to start" } end cid #}}} end def nothing_running? #{{{ @children.size == 0 #}}} end def reap_jobs reap_only = false, blocking = true #{{{ reaped = [] if blocking cid, status = Process.waitpid2 -1, Process::WUNTRACED else cid, status = Process.waitpid2 -1, Process::WNOHANG | Process::WUNTRACED end if cid and status job = @children[cid] finish_job job, status transaction do loopno = 0 loop do @q.jobisdone job @children.delete cid reaped << cid start_jobs unless reap_only or $signaled if @children.size == 0 or loopno > 42 break else sleep 0.1 cid, status = Process.waitpid2 -1, Process::WNOHANG | Process::WUNTRACED break unless cid and status job = @children[cid] finish_job job, status end loopno += 1 end end end debug{ "reap_jobs:n_reaped <#{ reaped.size }>" } reaped #}}} end def finish_job job, status #{{{ job['state'] = 'finished' job['exit_status'] = status.exitstatus rescue nil job['finished'] = Util.timestamp(Time.now) job['elapsed'] = Util.stamptime(job['finished']) - Util.stamptime(job['started']) info{ "jid <#{ job['jid'] }> exit_status <#{ job['exit_status'] }>" } #}}} end def transaction #{{{ ret = nil if @in_transaction ret = yield else begin @in_transaction = true @q.transaction{ ret = yield } ensure @in_transaction = false end end ret #}}} end def busy? #{{{ @children.size >= @max_feed #}}} end def relax #{{{ seconds = rand(@max_sleep - @min_sleep + 1) + @min_sleep debug{ "sleep <#{ seconds }>" } sleep seconds #}}} end def daemon #{{{ if opt_daemon fork do Process.setsid #Process.setpgrp #trap 'SIGCLD', 'IGNORE' fork do Dir.chdir(Util.realpath('~')) #File.umask 0 open('/dev/null','r+') do |f| STDIN.reopen f STDOUT.reopen f STDERR.reopen f end daemon_wrap{ yield } exit EXIT_SUCCESS end exit! end exit! else daemon_wrap{ yield } exit EXIT_SUCCESS end #}}} end def daemon_wrap #{{{ begin yield rescue Exception => e unless SystemExit === e or opt_quiet fatal{ e } rescue nil end exit EXIT_FAILURE end #}}} end def gen_pidfile #{{{ name = gen_feeder_name(opt_name || @q.path) @pidfile = begin open name, File::CREAT | File::EXCL | File::RDWR rescue open name, File::RDWR end #unless @pidfile and @pidfile.flock(File::LOCK_EX | File::LOCK_NB) unless @pidfile and @pidfile.posixlock(File::LOCK_EX | File::LOCK_NB) pid = IO.read(name) rescue nil pid ||= 'unknown' raise "process <#{ pid }> is already feeding from this queue" else @pidfile.rewind @pidfile.sync = true @pidfile.print Process.pid @pidfile.truncate @pidfile.pos at_exit{ FileUtils.rm_f name rescue nil } end #}}} end def gen_feeder_name path #{{{ path = Util.realpath(path).gsub(%r|/|o, '_') File.join(Util.realpath('~'), ".#{ path }.feeder") #}}} end def timestamp t #{{{ time = Time.now usec = time.usec.to_s usec << '0' while usec.size < 6 time.strftime('%Y-%m-%d %H:%M:%S.') << usec #}}} end def parse_opts #{{{ @op = OptionParser.new @op.banner = '' define_options #begin @op.parse! argv #rescue OptionParser::InvalidOption => e # preverve unknown options #e.recover(argv) #rescue Exception => e #STDERR.puts PROGNAM #STDERR.puts @op #exit 2 #end #}}} end def parse_argv #{{{ @qpath = ENV['RQ_Q'] || @argv.shift @mode = @argv.shift #}}} end def usage io = STDOUT #{{{ io << USAGE io << "\n" io << @op io << "\n" io << EXAMPLES if defined? EXAMPLES self #}}} end def define_options #{{{ options = [ %w(--feed=appetite -f), %w(--priority=priority -p), %w(--name), %w(--daemon -d), %w(--quiet -q), %w(--select -e), %w(--snapshot -s), %w(--infile=infile -i), %w(--max_sleep=seconds -M), %w(--min_sleep=seconds -m), %w(--log=path -l), %w(--verbosity=0-4|debug|info|warn|error|fatal -v), %w(--log_age=log_age), %w(--log_size=log_size), %w(--config=path -c), %w(--template=template), %w(--help -h), ] options.each do |option| opt = option.first.gsub(%r/(?:--)|(?:=.*$)/o,'').strip get, set = opt_attr opt @op.def_option(*option){|v| self.send(set, (v or true))} end #}}} end %w(debug info warn error fatal).each do |m| eval "def #{ m }(*args,&block);@logger.#{ m }(*args,&block);end" end def gen_template #{{{ Config::gen_template(opt_template) self #}}} end def init_logging #{{{ if @opt_log_age @opt_log_age = @opt_log_age.to_i if @opt_log_age =~ /\d/ end if @opt_log_size @opt_log_size = @opt_log_size.to_i if @opt_log_size =~ /\d/ end $logger = @logger = Logger.new(@opt_log || STDERR, @opt_log_age, @opt_log_size) # # hack to fix Logger sync bug # class << @logger; attr :logdev; end @logdev = @logger.logdev.dev @logdev.sync = true level = nil @opt_verbosity ||= 'info' @opt_verbosity = case @opt_verbosity when /^\s*(?:4|d|debug)\s*$/io level = 'Logging::DEBUG' 4 when /^\s*(?:3|i|info)\s*$/io level = 'Logging::INFO' 3 when /^\s*(?:2|w|warn)\s*$/io level = 'Logging::WARN' 2 when /^\s*(?:1|e|error)\s*$/io level = 'Logging::ERROR' 1 when /^\s*(?:0|f|fatal)\s*$/io level = 'Logging::FATAL' 0 else abort "illegal verbosity setting <#{ @opt_verbosity }>" end @logger.level = 2 - ((@opt_verbosity % 5) - 2) debug {"logging level <#{ level }>"} #}}} end def init_config #{{{ begin $config = @config = nil if @opt_config $config = @config = Config.new(@opt_config) else CONFIG_SEARCH_PATH.each do |dir| path = File.join dir, DEFAULT_CONFIG_PATH if test ?e, path $config = @config = Config.new(path) break end end end $config = @config = Config.new('default') unless @config rescue => e fatal{ e } exit EXIT_FAILURE end debug { "config.path <#{ @config.path }>" } debug { "config <\n#{ PP::pp @config, ''}\n>" } #}}} end def opt_attr opt #{{{ get = "opt_#{ opt }" set = "#{ get }=" code = <<-code class << self def #{ get }; defined?(@#{ get }) ? @#{ get } : nil; end def #{ set } value; @#{ get } = value; end end code instance_eval code [get, set] #}}} end def cp obj #{{{ Marshal.load(Marshal.dump(obj)) #}}} end def pretty obj #{{{ PP::pp obj, '' #}}} end class Config < Hash #{{{ class << self def gen_template(path) #{{{ @template ||= DATA.read case path when String open(path, 'w'){|f| f.write @template} else STDOUT.write @template end self #}}} end def load_default #{{{ @loaded ||= YAML::load(munge(DATA.read)) || {} #}}} end def munge buf #{{{ buf.gsub(%r/\t/o,' ') #}}} end end attr :path def initialize path #{{{ @path = nil yaml = nil if path.nil? or path and path =~ /^\s*default/io yaml = self.class.load_default @path = 'DEFAULT_CONFIG' else path yaml = YAML::load(self.class.munge(open(path).read)) @path = path end self.update yaml #}}} end #}}} end USAGE = #{{{ <<-usage NAME #{ PROGNAM } v#{ VERSION } SYNOPSIS #{ PROGNAM } [queue] mode [mode_args]* [options]* DESCRIPTION #{ PROGNAM } is an __experimental__ tool used to manage nfs mounted work queues. multiple instances of #{ PROGNAM } running from multiples hosts can work from these queues to distribute processing load to 'n' nodes - bringing many dozens of otherwise powerful cpus to their knees with a single blow. clearly this software should be kept out of the hands of radicals, SETI enthusiasts, and one mr. jeff safran. #{ PROGNAM } operates in one of the modes create, submit, feed, list, delete, query, snapshot, or help. depending on the mode of operation and the options used the meaning of mode_args may change, sometime wildly and unpredictably (i jest, of course). MODES abbreviations for many modes exist c => create s => submit f => feed l => list d => delete q => query p => snapshot h => help create, c : creates a queue. the queue MUST be located on an nfs mounted file system visible from all nodes intended to run jobs from it. examples : 0) to create a queue ~ > #{ PROGNAM } q create or simply ~ > #{ PROGNAM } q c list, l : show combinations of pending, running, dead, or finished jobs. for this command mode_args must be one of pending, running, dead, finished, or all. the default is all. mode_args may be abbreviated to uniqueness, therefore the following shortcuts apply : p => pending r => running f => finished d => dead a => all examples : 0) show everything in q ~ > #{ PROGNAM } q list all or ~ > #{ PROGNAM } q l all or ~ > export RQ_Q=q ~ > #{ PROGNAM } l 0) show q's pending jobs ~ > #{ PROGNAM } q list pending 1) show q's running jobs ~ > #{ PROGNAM } q list running 2) show q's finished jobs ~ > #{ PROGNAM } q list finshed submit, s : submit jobs to a queue to be proccesed by any feeding node. any mode_args are taken as the command to run. note that mode_args are subject to shell expansion - if you don't understand what this means do not use this feature. when running in submit mode a file may by specified as a list of commands to run using the '--infile, -i' option. this file is taken to be a newline separated list of commands to submit, blank lines and comments (#) are allowed. if submitting a large number of jobs the input file method is MUCH more efficient. if no commands are specified on the command line #{ PROGNAM } automaticallys reads them from STDIN. yaml formatted files are also allowed as input (http://www.yaml.org/) - note that output of nearly all #{ PROGNAM } commands is valid yaml and may, therefore, be piped as input into the submit command. the '--priority, -p' option can be used here to determine the priority of jobs. priorities may be any number (0, 10]; therefore 9 is the maximum priority. submitting a high priority job will NOT supplant currently running low priority jobs, but higher priority jobs will always migrate above lower priority jobs in the queue in order that they be run sooner. note that constant submission of high priority jobs may create a starvation situation whereby low priority jobs are never allowed to run. avoiding this situation is the responsibility of the user. examples : 0) submit the job ls to run on some feeding host ~ > #{ PROGNAM } q s ls 1) submit the job ls to run on some feeding host, at priority 9 ~ > #{ PROGNAM } -p9 q s ls 2) submit 42000 jobs (quietly) to run from a command file. ~ > wc -l cmdfile 42000 ~ > #{ PROGNAM } q s -q < cmdfile 3) submit 42 jobs to run at priority 9 from a command file. ~ > wc -l cmdfile 42 ~ > #{ PROGNAM } -p9 q s < cmdfile 4) re-submit all finished jobs ~ > #{ PROGNAM } q l f | #{ PROGNAM } q s feed, f : take jobs from the queue and run them on behalf of the submitter. jobs are taken from the queue in an 'oldest highest priority' order. feeders can be run from any number of nodes allowing you to harness the CPU power of many nodes simoultaneously in order to more effectively clobber your network. the most useful method of feeding from a queue is to do so in daemon mode so that if the process loses it's controling terminal and will not exit when you exit your terminal session. use the '--daemon, -d' option to accomplish this. by default only one feeding process per host per queue is allowed to run at any given moment. because of this it is acceptable to start a feeder at some regular interval from a cron entry since, if a feeder is alreay running, the process will simply exit and otherwise a new feeder will be started. in this way you may keep feeder processing running even acroess machine reboots. examples : 0) feed from a queue verbosely for debugging purposes, using a minimum and maximum polling time of 2 and 4 respectively ~ > #{ PROGNAM } q feed -v4 -m2 -M4 1) feed from a queue in daemon mode logging into /home/ahoward/rq.log ~ > #{ PROGNAM } q feed -d -l/home/ahoward/rq.log 2) use something like this sample crontab entry to keep a feeder running forever (it attempts to (re)start every fifteen minutes) # # your crontab file # */15 * * * * /full/path/to/bin/rq /full/path/to/nfs/mounted/q feed -d -l/home/username/cfq.log -q >>/home/cfadmin/cfq.oe 2>&1 log rolling while running in daemon mode is automatic. delete, d : delete combinations of pending, running, finished, dead, or specific jobs. the delete mode is capable of parsing the output of list mode, making it possible to create filters to delete jobs meeting very specific conditions. mode_args are the same as for 'list', including 'running'. note that it is possible to 'delete' a running job, but there is no way to actually STOP it mid execution since the node doing the deleteing has no way to communicate this information to the (possibly) remote execution host. therefore you should use the 'delete running' feature with care and only for housekeeping purposes or to prevent future jobs from being scheduled. examples : 0) delete all pending, running, and finished jobs from a queue ~ > #{ PROGNAM } q d all 1) delete all pending jobs from a queue ~ > #{ PROGNAM } q d p 2) delete all finished jobs from a queue ~ > #{ PROGNAM } q d f 3) delete jobs via hand crafted filter program ~ > #{ PROGNAM } q list | filter_prog | #{ PROGNAM } q d query, q : query exposes the database more directly the user, evaluating the where clause specified on the command line (or from STDIN). this feature can be used to make a fine grained slection of jobs for reporting or as input into the delete command. you must have a basic understanding of SQL syntax to use this feature, but it is fairly intuitive in this capacity. examples: 0) show all jobs submitted within a specific 10 minute range ~ > #{ PROGNAM } q query "started >= '2004-06-29 22:51:00' and started < '2004-06-29 22:51:10'" 1) shell quoting can be tricky here so input on STDIN is also allowed ~ > cat contraints started >= '2004-06-29 22:51:00' and started < '2004-06-29 22:51:10' ~ > #{ PROGNAM } q query < contraints or (same thing) ~ > cat contraints | #{ PROGNAM } q query 2) this query output may then be used to delete specific jobs ~ > cat contraints | #{ PROGNAM } q query | #{ PROGNAM } q d 3) show all jobs which are either finished or dead ~ > #{ PROGNAM } q q state=finished or state=dead snapshot, p : snapshot provides a means of taking a snapshot of the q when many queries are going to be run against it, perhaps to figure out a command, so that your commands will not compete with the feeders for the queue's lock. examples: 0) take a snapshot using default snapshot naming, which is made via the basename of the q plus '.snapshot' ~ > #{ PROGNAM } /path/to/nfs/q snapshot ~ > #{ PROGNAM } ./q.snapshot status ~ > #{ PROGNAM } ./q.snapshot list running ~ > #{ PROGNAM } ./q.snapshot list running | grep `hostname` note that there is also a snapshot option - this option is not the same as the snapshot command. the option can be applied to any command and then that command will be run on a snapshot of the database before removing the snapshot. this is really only useful if one were to need to run a command against a heavily loaded queue. eg. 0) get the status of a heavily loaded queue ~ > #{ PROGNAM } q t --snapshot NOTES - realize that your job is going to be running on a remote host and this has implication. paths, for example, should be absolute, not relative. specifically the submitted job must be visible from all hosts currently feeding from a q. - jobs are currently run under the bash shell using the --login option. therefore any settings in your .bashrc will apply. - you need to consider __CAREFULLY__ what the ramifications of having multiple instances of your program all running at the same time will be. it is beyond the scope of #{ PROGNAM } to ensure multiple instances of a program will not overwrite each others output files, for instance. coordination of programs is left entirely to the user. - the list of finished jobs will grow without bound unless you sometimes delete some (all) of them. the reason for this is that #{ PROGNAM } cannot know when the user has collected the exit_status, etc. from a job and so keeps this information in the queue until instructed to delete it. ENVIRONMENT RQ_Q: full path to queue the queue argument to all commands may be omitted if, and only if, the environment variable 'RQ_Q' contains the full path to the q. eg. ~ > export RQ_Q=/full/path/to/my/q this feature can save a considerable amount of typing for those weak of wrist DIAGNOSTICS success => $? == 0 failure => $? != 0 AUTHOR ara.t.howard@noaa.gov BUGS 1 < bugno && bugno <= 42 OPTIONS usage #}}} EXAMPLES = #{{{ <<-examples examples #}}} #}}} end if $0 == __FILE__ Main.new.run end __END__ # # default config # key : value