#!/dmsp/reference/bin/ruby
#
# make sure ruby version is good enough
#
require 'rbconfig'
#{{{
#
# RbConfig is the modern name (the bare Config alias was removed in ruby 2.2).
# the explicit ||/&& grouping matters: with `or`/`and` (equal precedence, left
# associative) the old test read `(major > 1 or major == 1) and minor >= 8`,
# which rejected every ruby 2.x with minor < 8.
#
rbconfig = defined?(RbConfig) ? RbConfig::CONFIG : Config::CONFIG
major, minor, teeny = %w(MAJOR MINOR TEENY).map{|k| rbconfig[k].to_i}
unless major > 1 || (major == 1 && minor >= 8)
  STDERR.puts("=" * 79)
  STDERR.puts "<#{ $0 }> requires a ruby version > 1.8.0"
  STDERR.puts
  STDERR.puts "you are currenlty running ruby version <#{ major }.#{ minor }.#{ teeny }>"
  STDERR.puts
  STDERR.puts "possible problems which could cause this are:"
  STDERR.puts " - improper PATH environment variable setting"
  STDERR.puts " - ruby > 1.8.0 has not been installed"
  STDERR.puts("=" * 79)
  exit 1
end
#}}}
#
# builtin
#
require 'optparse'
require 'logger'
require 'yaml'
require 'pp'
require 'socket'
require 'pathname'
require 'tempfile'
require 'fileutils'
#
# raa - http://raa.ruby-lang.org
#
require 'arrayfields'
require 'sqlite'
require 'posixlock'
require 'lockfile'

# TODO - shell selection for jobs
# TODO - out/err redirection for jobs
# TODO - resource monitoring requests
# TODO - housekeeping - make backups of queue every so often!

#
# logging support
#
# including Logging gives both the class and its instances a #logger plus the
# usual debug/info/warn/error/fatal shortcuts.  instances fall back to the
# class logger, and classes fall back to a default STDOUT logger.
#
module Logging
#{{{
  #
  # a module that adds an accessor to Logging objects in order to fix a bug
  # where not all logging devices are put into sync mode, resulting in improper
  # log rolling.  this is a hack.
  #
  module LoggerExt
    attr :logdev
  end # module LoggerExt

  #
  # implementations of the methods shared by both classes and objects of
  # classes which include Logging
  #
  module LogMethods
    # the object's logger: its own if set, else the class default logger (for
    # classes) or the class's logger (for instances).
    # raises if nothing could be found.
    def logger
      if defined?(@logger) and @logger
        @logger
      else
        if Class === self
          @logger = self.default_logger
        else
          @logger = self::class::logger
        end
        raise "@logger is undefined!" unless defined?(@logger) and @logger
        @logger
      end
    end

    # install +log+ and force its device into sync mode (see LoggerExt)
    def logger= log
      @logger = log
      @logger.extend LoggerExt
      @logger.logdev.dev.sync = true
      @logger
    end

    def debug(*args, &block); logger.debug(*args, &block); end
    def info(*args, &block);  logger.info(*args, &block) ; end
    def warn(*args, &block);  logger.warn(*args, &block) ; end
    def error(*args, &block); logger.error(*args, &block); end
    def fatal(*args, &block); logger.fatal(*args, &block); end

    # log an exception: with backtrace at debug level, message only otherwise
    def log_err e
      if logger.debug?
        error{ errmsg e }
      else
        error{ emsg e }
      end
    end

    def emsg e
      "<#{ e.class }> - <#{ e.message }>"
    end

    # exceptions that were never raised have a nil backtrace
    def btrace e
      (e.backtrace or []).join("\n")
    end

    def errmsg e
      emsg(e) << "\n" << btrace(e)
    end
  end # module LogMethods

  EOL = "\n"
  DIV0 = ("." * 79) << EOL
  DIV1 = ("-" * 79) << EOL
  DIV2 = ("=" * 79) << EOL
  DIV3 = ("#" * 79) << EOL
  SEC0 = ("." * 16) << EOL
  SEC1 = ("-" * 16) << EOL
  SEC2 = ("=" * 16) << EOL
  SEC3 = ("#" * 16) << EOL

  class << self
    # when included, also extend the including class with LogMethods and give
    # it a lazily created default_logger (STDOUT) plus a setter for it
    def append_features c
      ret = super
      c.extend LogMethods
      class << c
        def default_logger
          if defined?(@default_logger) and @default_logger
            @default_logger
          else
            self.default_logger = Logger.new STDOUT
            @default_logger.debug{ "<#{ self }> using default logger"}
            @default_logger
          end
        end
        def default_logger= log
          @default_logger = log
          @default_logger.extend LoggerExt
          @default_logger.logdev.dev.sync = true
          @default_logger
        end
      end
      ret
    end
  end

  include LogMethods
#}}}
end # module Logging

#
# random utilities
#
# every method is exported both as a module function (Util.foo) and as an
# instance method for classes that include Util
#
require 'pathname'
module Util
#{{{
  class << self
    # make +sym+ callable as Util.sym while keeping the instance method public
    def export sym
      sym = "#{ sym }".intern
      module_function sym
      public sym
    end
    def append_features c
      super
      c.extend Util
    end
  end

  # deep copy via Marshal
  def mcp obj
    Marshal.load(Marshal.dump(obj))
  end
  export 'mcp'

  def klass
    self.class
  end
  export 'klass'

  # absolute, symlink-resolved path; falls back to the expanded path when the
  # file does not exist
  def realpath path
    path = File.expand_path "#{ path }"
    begin
      Pathname.new(path).realpath.to_s
    rescue Errno::ENOENT
      path
    end
  end
  export 'realpath'

  # merge hashes left to right into a new hash
  def hashify(*hashes)
    hashes.inject({}){|accum, hash| accum.update hash}
  end
  export 'hashify'

  # look up +opt+ in +hash+ as given, as a string, or as a symbol
  def getopt opt, hash
    opt_s = "#{ opt }"
    hash[opt] || hash[opt_s] || hash[opt_s.intern]
  end
  export 'getopt'

  # true if a process with +pid+ exists
  def alive? pid
    pid = Integer("#{ pid }")
    begin
      Process.kill 0, pid
      true
    rescue Errno::ESRCH
      false
    rescue Errno::EPERM
      # the process exists; we simply are not allowed to signal it
      true
    end
  end
  export 'alive?'

  # send each signal in turn (default TERM, QUIT, KILL), sleeping 'suspend'
  # seconds between them, until +pid+ dies; returns true if it is dead
  def maim(pid, opts = {})
    sigs = getopt('signals', opts) || %w(SIGTERM SIGQUIT SIGKILL)
    suspend = getopt('suspend', opts) || 4
    pid = Integer("#{ pid }")
    sigs.each do |sig|
      puts "sending <#{ sig }>"
      Process.kill(sig, pid)
      Process.waitpid(pid, Process::WNOHANG | Process::WUNTRACED) rescue nil
      if alive?(pid)
        sleep suspend
      else
        break
      end
    end
    not alive?(pid)
  end
  export 'maim'

  # 'YYYY-MM-DD HH:MM:SS.uuuuuu'.  usec must be zero-padded on the LEFT:
  # 5 usec is '.000005' (the old right-padding turned it into half a second
  # and corrupted every computed 'elapsed').
  def timestamp time = Time.now
    time.strftime('%Y-%m-%d %H:%M:%S.') << ('%06d' % time.usec)
  end
  export 'timestamp'

  # inverse of timestamp; raises ArgumentError on malformed input
  def stamptime string, local = true
    string = "#{ string }"
    pat = %r/^\s*(\d\d\d\d)-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)\.(\d\d\d\d\d\d)\s*$/o
    match = pat.match string
    raise ArgumentError, "<#{ string.inspect }>" unless match
    yyyy,mm,dd,h,m,s,u = match.to_a[1..-1].map{|x| x.to_i}
    if local
      Time.local yyyy,mm,dd,h,m,s,u
    else
      Time.gm yyyy,mm,dd,h,m,s,u
    end
  end
  export 'stamptime'

  # escape every occurrence of +char+ in +s+ (in place) with +esc+, leaving
  # occurrences that are already preceded by an odd run of escapes alone
  def escape! s, char, esc
    re = %r/([#{0x5c.chr << esc}]*)#{char}/
    s.gsub!(re) do
      (($1.size % 2 == 0) ? ($1 << esc) : $1) + char
    end
  end
  export 'escape!'

  def escape s, char, esc
    ss = "#{ s }"
    escape! ss, char, esc
    ss
  end
  export 'escape'

  # Process.fork with $VERBOSE silenced
  def fork(*args, &block)
    begin
      verbose = $VERBOSE
      $VERBOSE = nil
      Process.fork(*args, &block)
    ensure
      $VERBOSE = verbose
    end
  end
  export 'fork'

  def hostname
    @__hostname__ ||= Socket::gethostname
  end
  export 'hostname'

  # hostname without the domain part
  def host
    @__host__ ||= Socket::gethostname.gsub(%r/\..*$/o,'')
  end
  export 'host'

  def emsg e
    "<#{ e.class }> - <#{ e.message }>"
  end
  export 'emsg'

  def btrace e
    (e.backtrace or []).join("\n")
  end
  export 'btrace'

  def errmsg e
    emsg(e) << "\n" << btrace(e)
  end
  export 'errmsg'
#}}}
end # class Util

#
# queue database class
#
# wraps an sqlite database living on nfs.  all sql goes through #execute, which
# is only legal inside #transaction / #ro_transaction; those take a Lockfile
# (writers) plus a posix lock on the shared 'lock' file before opening the db.
#
class QDB
#{{{
  include Util
  include Logging

  FIELDS = %w(
    jid priority state submitted started finished elapsed
    submitter runner pid exit_status command
  )

  PRAGMAS = <<-sql
    PRAGMA default_synchronous = FULL;
  sql

  SCHEMA = <<-sql
    create table jobs(
      jid integer primary key,
      #{ FIELDS[1..-1].join ",\n" }
    );
    create table attributes(
      key,
      value,
      primary key (key)
    );
  sql

  INDEXES = FIELDS.inject('') do |s,f|
    s << "create index jobs_#{ f }_idx on jobs(#{ f });\n"
  end

  class << self
    # tuple (array with fields) -> hash keyed by field name
    def t2h tuple
      h = {}
      FIELDS.each_with_index{|f,i| h[f] = tuple[i]}
      h
    end

    # hash keyed by field name -> tuple
    def h2t h
      t = tuple
      FIELDS.each{|f| t[f] = h[f]}
      t
    end

    # empty tuple with one slot per FIELD, addressable by field name
    def tuple
      t = Array.new FIELDS.size
      t.fields = FIELDS
      t
    end

    # render each value as an sql literal: nil/false become NULL, everything
    # else a single-quoted string with embedded quotes doubled.  plain doubling
    # is the only escaping sqlite understands; the previous Util.escape based
    # quoting left an existing '' alone, so a command like `sed -i '' f` was
    # silently stored as `sed -i ' f`.
    def q tuple
      tuple.map do |f|
        if f
          "'" << "#{ f }".gsub("'", "''") << "'"
        else
          'NULL'
        end
      end
    end

    # create the database file, lock file, schema and indexes
    def create path, opts = {}
      qdb = new path, opts
      FileUtils.touch qdb.lockfile
      qdb.transaction do
        qdb.execute PRAGMAS
        qdb.execute SCHEMA
        qdb.execute INDEXES
      end
      qdb
    end
  end

  attr :path
  attr :dirname
  attr :fields
  attr :mutex
  attr :in_transaction
  attr :lockfile

  def initialize path, opts = {}
    @path = path
    @dirname = File.dirname path
    # per host/pid marker files advertising who is waiting for / holding locks
    @waiting_w = File.join(@dirname, "#{ hostname }.#{ $$ }.waiting.w")
    @waiting_r = File.join(@dirname, "#{ hostname }.#{ $$ }.waiting.r")
    @lock_w = File.join(@dirname, "#{ hostname }.#{ $$ }.lock.w")
    @lock_r = File.join(@dirname, "#{ hostname }.#{ $$ }.lock.r")
    #@lockfile = Lockfile.new File.join(@dirname, 'lock')
    @lockfile = File.join(@dirname, 'lock')
    @lockf = Lockfile.new("#{ @path }.lock")
    @logger = getopt('logger', opts) || Logger.new(STDERR)
    @fields = FIELDS
    @in_transaction = false
    @db = nil
  end

  # open the database under the appropriate lock, yield it, always close it
  def connect read_only = false
    ret = nil
    opened = nil
    aquire_lock(read_only) do
      begin
        @db = SQLite::Database.new @path, 0
        opened = true
        #@db.use_arrayfields = true
        @db.use_array = true
        ret = yield @db
      ensure
        @db.close if opened
        @db = nil
      end
    end
    ret
  end

  # run the block holding a shared (read_only) or exclusive posix lock on the
  # lock file; writers additionally hold the Lockfile.  returns the block value.
  def aquire_lock read_only = false
    ret = nil
    waiting, ltype, lfile =
      if read_only
        [@waiting_r, File::LOCK_SH, @lock_r]
      else
        [@waiting_w, File::LOCK_EX, @lock_w]
      end
    debug{ "aquiring lock" }
    # take the Lockfile before the begin/ensure: if locking itself fails there
    # is nothing to unlock, and Lockfile#unlock would mask the real error
    @lockf.lock unless read_only
    begin
      open(@lockfile, 'a+') do |lf|
        locked = false
        begin
          FileUtils.touch waiting
          locked = lf.posixlock ltype
          FileUtils.rm_f waiting rescue nil
          debug{ "aquired lock" }
          FileUtils.touch lfile
          ret = yield
        ensure
          lf.posixlock File::LOCK_UN if locked rescue nil
          FileUtils.rm_f waiting rescue nil
          FileUtils.rm_f lfile rescue nil
        end
      end
    ensure
      @lockf.unlock unless read_only
    end
    debug{ "released lock" }
    ret
  end

  # run the block inside begin/commit, rolling back on error.
  # raises 'nested transaction' if one is already open.
  def transaction read_only = false
    raise 'nested transaction' if @in_transaction
    ret = nil
    begin
      @in_transaction = true
      connect(read_only) do
        execute 'begin;'
        begin
          ret = yield
          execute 'commit;'
        rescue
          execute 'rollback;'
          raise
        end
      end
    ensure
      @in_transaction = false
    end
    ret
  end

  def ro_transaction &block
    transaction(true, &block)
  end

  # execute raw sql; only legal inside a transaction
  def execute sql, &block
    raise 'not in transaction' unless @in_transaction
    debug{ "executing \n#{ sql.gsub %r/^\s*/o, '' }" } if @logger.debug?
    retry_if_locked{ @db.execute sql, &block }
  end

  # vacuum cannot run inside begin/commit, so it gets its own connection
  def vacuum
    raise 'nested transaction' if @in_transaction
    begin
      @in_transaction = true
      connect{ execute 'vacuum' }
    ensure
      @in_transaction = false
    end
    self
  end

  # retry the block once a second for as long as sqlite reports 'locked'
  def retry_if_locked
    ret = nil
    begin
      ret = yield
    rescue => e
      if e.message =~ %r/locked/o
        debug{ "database locked - waiting(1.0) and retrying" }
        sleep 1.0
        retry
      else
        raise
      end
    end
    ret
  end
#}}}
end

#
# queue class
#
# a directory holding a QDB ('db').  low level job methods (getjob, jobisrunning,
# jobisdone, []=, attributes) expect the caller to already be inside a
# transaction; the reporting methods open their own.
#
# NOTE(review): `class Queue` reopens ruby's built-in Queue (Thread::Queue)
# rather than defining a new class; renaming it would change this script's
# interface, so it is left as is.
#
class Queue
#{{{
  include Logging
  include Util

  class Error < StandardError; end

  MAX_JID = 2 ** 20
  STATES = %w(pending running finished dead)

  class << self
    # (re)create an empty queue directory at +path+ (destroying any old one)
    def create path, opts = {}
      FileUtils.rm_rf path
      FileUtils.mkdir_p path
      db = File.join path, 'db'
      qdb = QDB.create db, opts
      opts['qdb'] = qdb
      q = new path, opts
      q
    end
  end

  attr :path
  attr :qdb
  attr :empty
  alias empty? empty

  def initialize path, opts = {}
    @path = path
    raise "q <#{ @path }> does not exist" unless test ?e, @path
    raise "q <#{ @path }> is not a directory" unless test ?d, @path
    @logger = getopt('logger', opts) || Logger.new(STDERR)
    @qdb = getopt('qdb', opts) || QDB.new(File.join(@path, 'db'), 'logger' => @logger)
    @empty = false
  end

  # insert one pending job per command; when a block is given it is yielded
  # each newly inserted tuple
  def submit(priority, *commands)
    return self if commands.empty?
    now = Util.timestamp Time.now
    sql = ''
    commands.each do |command|
      tuple = QDB.tuple
      tuple['priority'] = priority
      tuple['command'] = command
      tuple['state'] = 'pending'
      tuple['submitted'] = now
      tuple['submitter'] = hostname
      values = QDB.q tuple
      sql << "insert into jobs values (#{ values.join ',' });\n"
    end
    tuples = nil
    transaction do
      execute(sql){}
      if block_given?
        sql = "select * from jobs where submitted = '#{ now }'"
        tuples = execute sql
      end
    end
    tuples.each{|t| yield t} if tuples
    tuples = nil
    self
  end

  # print, as yaml, every job in any of the given states (default: all)
  def list(*whats)
    whats = normalize_states whats
    where_clause = QDB.q(whats).map{|f| "state = #{ f }\n"}.join(' or ')
    sql = <<-sql
      select * from jobs
      where #{ where_clause }
    sql
    puts '---'
    fields = @qdb.fields
    tuples = ro_transaction{ execute sql }
    tuples.each do |tuple|
      puts '-'
      fields.each{|f| puts " #{ f }: #{ tuple[f] }" }
    end
    tuples = nil
    self
  end

  # print a job count for each of the given states (default: all)
  def status(*whats)
    whats = normalize_states whats
    puts '---'
    ro_transaction do
      whats.each do |what|
        sql = <<-sql
          select count(*) from jobs where state = #{ QDB.q([what]).first };
        sql
        tuples = execute sql
        tuple = tuples.first
        count = (tuple ? tuple.first : 0)
        puts "#{ what } : #{ count }"
      end
    end
    self
  end

  # print, as yaml, every job matching the sql +where_clause+; a nil or
  # blank clause selects every job
  def query where_clause = nil
    where_clause = nil if where_clause and "#{ where_clause }".strip.empty?
    if where_clause
      #
      # quote everything on the rhs of an '=' sign - helps with shell problems...
      #
      where_clause = where_clause.gsub(/(=\s*([^\s')(=]+))/om){q = $2.gsub(%r/'+|\s+/o,''); "='#{ q }'"}
    end
    sql =
      if where_clause
        "select * from jobs where #{ where_clause };"
      else
        "select * from jobs;"
      end
    puts '---'
    fields = @qdb.fields
    tuples = ro_transaction{ execute sql }
    tuples.each do |tuple|
      puts '-'
      fields.each{|f| puts " #{ f }: #{ tuple[f] }"}
    end
    tuples = nil
    self
  end

  # delete by state name ('pending', 'r', 'all', ...) or by jids.  jids may be
  # Integers or numeric Strings (as they arrive from the command line); the
  # latter used to be rejected with "cannot delete".
  def delete(*jids)
    what = jids.first || 'all'
    sql =
      if String === what and what !~ %r/^\s*\d+\s*$/o
        case what
          when %r/^\s*p/io then "delete from jobs where state = 'pending';"
          when %r/^\s*r/io then "delete from jobs where state = 'running';"
          when %r/^\s*f/io then "delete from jobs where state = 'finished';"
          when %r/^\s*d/io then "delete from jobs where state = 'dead';"
          when %r/^\s*a/io then "delete from jobs;"
          else raise ArgumentError, "cannot delete <#{ what }>"
        end
      else
        jids.map{|jid| "delete from jobs where jid = #{ Integer(jid) };"}.join
      end
    transaction{ execute sql }
    sql = nil
    @qdb.vacuum
    self
  end

  def transaction(*args, &block)
    @qdb.transaction(*args, &block)
  end

  def ro_transaction &block
    @qdb.ro_transaction(&block)
  end

  def execute(*args, &block)
    @qdb.execute(*args, &block)
  end

  # oldest highest priority pending job (or nil); sets #empty when it is the
  # last one.  caller must hold a transaction.
  def getjob
    sql = <<-sql
      select * from jobs
      where state = 'pending'
      order by priority desc, submitted asc
      limit 2;
    sql
    tuples = execute sql
    job = tuples.first
    @empty = (tuples.size == 1)
    job
  end

  # persist pid/state/started/runner of +job+.  caller must hold a transaction.
  def jobisrunning job
    sql = <<-sql
      update jobs
      set
        pid = '#{ job['pid'] }',
        state = '#{ job['state'] }',
        started = '#{ job['started'] }',
        runner = '#{ job['runner'] }'
      where jid = #{ job['jid'] };
    sql
    execute sql
  end

  # persist state/exit_status/finished/elapsed of +job+.  caller must hold a
  # transaction.
  def jobisdone job
    sql = <<-sql
      update jobs
      set
        state = '#{ job['state'] }',
        exit_status = '#{ job['exit_status'] }',
        finished = '#{ job['finished'] }',
        elapsed = '#{ job['elapsed'] }'
      where jid = #{ job['jid'] };
    sql
    execute sql
  end

  def mtime
    File.stat(@path).mtime
  end

  # set queue attribute +key+ to +value+.  caller must hold a transaction.
  def []= key, value
    k, v = QDB.q [key, value]
    tuples = @qdb.execute "select count(*) from attributes where key=#{ k };"
    tuple = tuples.first
    count = Integer(tuple ? tuple.first : 0)
    case count
      when 0
        @qdb.execute "insert into attributes values(#{ k },#{ v });"
      when 1
        @qdb.execute "update attributes set key=#{ k }, value=#{ v } where key=#{ k };"
      else
        raise "key <#{ key }> has become corrupt!"
    end
  end

  # all queue attributes as a hash.  caller must hold a transaction.
  def attributes
    h = {}
    tuples = @qdb.execute "select * from attributes;"
    tuples.each{|t| h[t['key']] = t['value']}
    h
  end

  # expand abbreviated state names ('p', 'run', ...) to full names; no
  # arguments, 'all', or any abbreviation of it ('a') selects every state
  def normalize_states whats
    whats = whats.map{|what| "#{ what }"}
    return STATES.dup if whats.empty? or whats.any?{|what| what =~ %r/^\s*a/io}
    whats.map do |what|
      case what
        when %r/^\s*p/io then 'pending'
        when %r/^\s*r/io then 'running'
        when %r/^\s*f/io then 'finished'
        when %r/^\s*d/io then 'dead'
        else what
      end
    end
  end
  private :normalize_states
#}}}
end

#
# main program
#
class Main
#{{{
  VERSION = '0.1.2'
  PROGNAM = File.basename(File.expand_path($0))
  DEFAULT_CONFIG_PATH = "#{ PROGNAM }.conf"
  CONFIG_SEARCH_PATH = %w(. ~ /usr/local/etc /usr/etc /etc)
  EXIT_SUCCESS = 0
  EXIT_FAILURE = 1

  attr :argv
  attr :op
  attr :logger
  attr :logdev
  attr :config
  attr :mode
  attr :q
  attr :qpath
  attr :hostname

  def initialize argv = cp(ARGV)
    @argv = argv
    @hostname = Util.hostname
    parse_opts
    parse_argv
  end

  # dispatch on the (possibly abbreviated) mode
  def run
    #gen_template and exit EXIT_SUCCESS if @opt_template
    usage and exit EXIT_SUCCESS if @opt_help
    init_logging
    #init_config
    debug{ "qpath <#{ @qpath }>" }
    debug{ "mode <#{ @mode }>" }
    @qpath = Util.realpath @qpath
    begin
      case @mode
        when 'create', 'c'
          create
        when 'list', 'l'
          list
        when 'status', 't'
          status
        when 'submit', 's'
          submit
        when 'delete', 'd'
          delete
        when 'feed', 'f'
          feed
        when 'query', 'q'
          query
        when 'conf', 'o'
          conf
        when 'help', 'h'
          usage and exit
        else
          raise "unknown mode <#{ @mode || 'no mode' }>"
      end
    rescue Errno::EPIPE
      # output pipe closed early (e.g. `| head`) - nothing more to say
    end
  end

  def set_q create = false
    @q =
      if create
        Queue.create @qpath, 'logger' => @logger
      else
        raise "q <#{ @qpath }> does not exist" unless test ?e, @qpath
        Queue.new @qpath, 'logger' => @logger
      end
  end

  def create
    set_q true
    debug{ "created <#{ @q.path }>" }
  end

  def list
    set_q
    @q.list(*@argv)
  end

  def status
    set_q
    @q.status(*@argv)
  end

  # where clause from argv, or (if none) from STDIN with #-comments stripped
  def query
    set_q
    where_clause = @argv.join ' '
    if where_clause.empty?
      debug{ "reading where_clause from STDIN" }
      while((buf = STDIN.gets))
        buf.strip!
        buf.gsub! %r/#.*$/o, ''
        next if buf.empty?
        where_clause << "#{ buf } "
      end
    end
    @q.query where_clause
  end

  # set any key=value pairs given in argv, then print all attributes as yaml.
  # Queue#[]= and #attributes run raw sql, which QDB only permits inside a
  # transaction - without these wrappers conf mode always failed.
  def conf
    set_q
    unless @argv.empty?
      kv_pat = %r/^\s*([^\s=]+)\s*=+\s*([^\s]+)\s*$/o
      @q.transaction do
        @argv.each do |arg|
          match = kv_pat.match arg
          @q[match[1]] = match[2] if match
        end
      end
    end
    attributes = @q.ro_transaction{ @q.attributes }
    # Kernel#y only exists inside irb on modern rubies
    puts attributes.to_yaml
  end

  # submit the argv command, or commands read from --infile / STDIN (one per
  # line, or the yaml output of another mode)
  def submit
    set_q
    commands = (@argv.empty? ? [] : [@argv.join(' ')])
    if opt_infile or commands.empty?
      # read STDIN only when no infile (or '-') was given; previously
      # `-i file` with no command-line args silently read STDIN instead
      stdin = (opt_infile.nil? or opt_infile == '-')
      f = nil
      begin
        f = (stdin ? STDIN : open(opt_infile))
        while((line = f.gets))
          if f.lineno == 1 and line =~ %r/^---\s*$/o
            joblist = YAML::load(f.read)
            joblist.each{|job| commands << job["command"]} if joblist
          else
            line.gsub!(%r/(?:^\s+)|(?:\s+$)|(?:#.*$)/o, '')
            next if line.empty?
            commands << line
            debug{ "parsed command <#{ line.inspect }>" }
          end
        end
      ensure
        # f is nil when open itself failed; don't mask that error
        f.close if f and not stdin
      end
    end
    priority = opt_priority || 0
    debug{ " priority <#{ priority }>" }
    if opt_quiet
      @q.submit(priority, *commands)
    else
      puts '---'
      fields = QDB::FIELDS
      @q.submit(priority, *commands) do |tuple|
        puts '-'
        fields.each{|f| puts " #{ f }: #{ tuple[f] }"}
      end
    end
    commands = nil
    self
  end

  # delete by argv (state names or jids) or by 'jid: N' lines on STDIN
  def delete
    set_q
    jids = @argv
    if jids.empty?
      pat = %r/^\s*jid\s*:\s*(\d+)\s*$/io
      while((line = STDIN.gets))
        match = pat.match line
        next unless match
        begin
          jids << Integer(match[1])
        rescue
          raise "syntax error in <#{ line.inspect }>"
        end
      end
    end
    @q.delete(*jids)
    jids.each{|jid| puts "- #{ jid }"} unless opt_quiet
  end

  # run jobs from the queue forever, at most @max_feed at a time
  def feed
    @started = Util.timestamp
    @min_sleep = Integer(opt_min_sleep || 42)
    @max_sleep = Integer(opt_max_sleep || 240)
    @max_feed = Integer(opt_feed || 2)
    @children = {}
    daemon do
      set_q
      gen_pidfile
      info{ "#{ PROGNAM } STARTED" }
      debug{ "info <#{ @pidfile.path }>" }
      fill_morgue
      loop do
        throttle(@min_sleep) do
          debug{ "feed:starting jobs" }
          start_jobs unless busy?
          debug{ "feed:n_running <#{ @children.size }>" }
          if nothing_running
            relax
          else
            reap_jobs
          end
        end
      end
    end
  end

  # mark jobs this host started before this feeder came up as dead - their
  # feeder is gone so nobody will ever record their exit status
  def fill_morgue
    debug{ "filling morgue" }
    sql = <<-sql
      select * from jobs
      where
        state = 'running' and
        runner = '#{ hostname }' and
        started <= '#{ @started }'
    sql
    db = @q.qdb
    transaction do
      tuples = db.execute sql
      tuples.each do |tuple|
        jid = tuple['jid']
        db.execute "update jobs set state='dead' where jid='#{ jid }'"
        info{ "burried job <#{ jid }>" }
      end
    end
    debug{ "filled morgue" }
  end

  # yield, but no more often than every +rate+ seconds (plus up to 10% jitter)
  def throttle rate = 42
    if rate
      if defined? @last_throttle_time and @last_throttle_time
        elapsed = Time.now - @last_throttle_time
        timeout = rate - elapsed
        if timeout > 0
          timeout = timeout + rand(rate * 0.10)
          debug{ "throttle rate of <#{ rate }> exceeded - sleeping <#{ timeout }>" }
          sleep timeout
        end
      end
      @last_throttle_time = Time.now
    end
    yield
  end

  # start pending jobs until busy or the queue is drained; returns the count
  def start_jobs
    n_started = 0
    transaction do
      until busy?
        break unless((job = @q.getjob))
        start_job job
        n_started += 1
      end
    end
    debug{ "start_jobs:n_started <#{ n_started }>" }
    n_started
  end

  # fork a login bash (argv0 'rq_job__JID__') that reads the command from a
  # pipe on STDIN, then record the job as running
  def start_job job
    jid = job['jid']
    command = job['command']
    r, w = IO.pipe
    cid = Util::fork do
      w.close
      STDIN.reopen r
      exec ['bash', "rq_job__#{ jid }__"], '--login'
    end
    r.close
    w.puts command
    w.close
    if cid
      job['pid'] = cid
      job['state'] = 'running'
      job['started'] = Util.timestamp(Time.now)
      job['runner'] = hostname
      @children[cid] = job
      @q.jobisrunning job
      info{ "jid <#{ job['jid'] }> started as pid <#{ job['pid'] }>" }
      info{ "command <#{ job['command'] }>" }
    else
      warn{ "jid <#{ job['jid'] }> failed to start" }
    end
    cid
  end

  def nothing_running
    @children.size == 0
  end

  # wait for a child (blocking unless +block+ is false), record it and any
  # other already-exited children as done, refilling slots as they free up
  def reap_jobs block = true
    reaped = []
    flags = (block ? Process::WUNTRACED : Process::WNOHANG | Process::WUNTRACED)
    cid, status = Process.waitpid2(-1, flags)
    if cid and status
      job = @children[cid]
      finish_job job, status
      transaction do
        loop do
          @q.jobisdone job
          @children.delete cid
          reaped << cid
          start_jobs
          sleep 0.1
          break if @children.size == 0
          cid, status = Process.waitpid2(-1, Process::WNOHANG | Process::WUNTRACED)
          break unless cid and status
          job = @children[cid]
          finish_job job, status
        end
      end
    end
    debug{ "reap_jobs:n_reaped <#{ reaped.size }>" }
    reaped
  end

  def finish_job job, status
    job['state'] = 'finished'
    job['exit_status'] = status.exitstatus rescue nil
    job['finished'] = Util.timestamp(Time.now)
    job['elapsed'] = Util.stamptime(job['finished']) - Util.stamptime(job['started'])
    info{ "jid <#{ job['jid'] }> exit_status <#{ job['exit_status'] }>" }
  end

  # re-entrant wrapper: nested calls just yield inside the outer transaction
  def transaction
    ret = nil
    if @in_transaction
      ret = yield
    else
      begin
        @in_transaction = true
        @q.transaction{ ret = yield }
      ensure
        @in_transaction = false
      end
    end
    ret
  end

  def busy?
    @children.size >= @max_feed
  end

  # sleep a random time in [@min_sleep, @max_sleep]
  def relax
    seconds = rand(@max_sleep - @min_sleep + 1) + @min_sleep
    debug{ "sleep <#{ seconds }>" }
    sleep seconds
  end

  # with --daemon, double-fork/setsid, detach stdio and run the block in the
  # grandchild; otherwise just run the block
  def daemon
    if opt_daemon
      fork do
        Process.setsid
        #Process.setpgrp
        trap 'SIGHUP', 'IGNORE'
        #trap 'SIGCLD', 'IGNORE'
        fork do
          Dir.chdir(Util.realpath('~'))
          #File.umask 0
          open('/dev/null','r+') do |f|
            STDIN.reopen f
            STDOUT.reopen f
            STDERR.reopen f
          end
          begin
            yield
          rescue SystemExit
            raise
          rescue Exception => e
            # STDERR is /dev/null by now, so record the failure in the log too
            # (the old code called the non-existent Exception#msg)
            fatal{ Util.errmsg e } rescue nil
            STDERR.puts "#{ e.message } - (#{ e.class })"
            exit EXIT_FAILURE
          end
          exit EXIT_SUCCESS
        end
        exit!
      end
      exit!
    else
      yield
    end
  end

  # exclusively flock a per-queue pidfile in ~ so only one feeder per host
  # per queue runs; raises if another process holds it
  def gen_pidfile
    name = gen_feeder_name(opt_name || @q.path)
    @pidfile =
      begin
        open name, File::CREAT | File::EXCL | File::RDWR
      rescue
        open name, File::RDWR
      end
    if @pidfile and @pidfile.flock(File::LOCK_EX | File::LOCK_NB)
      @pidfile.rewind
      @pidfile.sync = true
      @pidfile.print Process.pid
      @pidfile.truncate @pidfile.pos
      at_exit{ FileUtils.rm_f name rescue nil }
    else
      pid = IO.read(name) rescue nil
      pid ||= 'unknown'
      raise "process <#{ pid }> is already feeding from this queue"
    end
  end

  def gen_feeder_name path
    path = Util.realpath(path).gsub(%r|/|o, '_')
    File.join(Util.realpath('~'), ".#{ path }.feeder")
  end

  # formatted timestamp of +t+ (previously the argument was ignored)
  def timestamp t = Time.now
    Util.timestamp t
  end

  def parse_opts
    @op = OptionParser.new
    @op.banner = ''
    define_options
    @op.parse! argv
  end

  # queue path comes from RQ_Q if set, else the first argument
  def parse_argv
    @qpath = ENV['RQ_Q'] || @argv.shift
    @mode = @argv.shift
  end

  def usage io = STDOUT
    io << USAGE
    io << "\n"
    io << @op
    io << "\n"
    io << EXAMPLES if defined? EXAMPLES
    self
  end

  # every option gets an opt_NAME reader/writer; flags without a value store true
  def define_options
    options = [
      %w(--feed=appetite -f),
      %w(--priority=priority -p),
      %w(--name),
      %w(--daemon -d),
      %w(--quiet -q),
      %w(--select -e),
      %w(--infile=infile -i),
      %w(--max_sleep=seconds -M),
      %w(--min_sleep=seconds -m),
      %w(--log=path -l),
      %w(--verbosity=0-4|debug|info|warn|error|fatal -v),
      %w(--log_age=log_age),
      %w(--log_size=log_size),
      %w(--config=path -c),
      %w(--template=template),
      %w(--help -h),
    ]
    options.each do |option|
      opt = option.first.gsub(%r/(?:--)|(?:=.*$)/o,'').strip
      _get, set = opt_attr opt
      @op.def_option(*option){|v| self.send(set, (v or true))}
    end
  end

  %w(debug info warn error fatal).each do |m|
    eval "def #{ m }(*args,&block);@logger.#{ m }(*args,&block);end"
  end

  def gen_template
    Config::gen_template(opt_template)
    self
  end

  # build the logger from --log/--log_age/--log_size/--verbosity
  def init_logging
    if @opt_log_age
      @opt_log_age = @opt_log_age.to_i if @opt_log_age =~ /\d/
    end
    if @opt_log_size
      @opt_log_size = @opt_log_size.to_i if @opt_log_size =~ /\d/
    end
    $logger = @logger = Logger.new(@opt_log || STDERR, @opt_log_age, @opt_log_size)
    #
    # hack to fix Logger sync bug
    #
    class << @logger; attr :logdev; end
    @logdev = @logger.logdev.dev
    @logdev.sync = true
    level = nil
    @opt_verbosity ||= 'info'
    @opt_verbosity =
      case @opt_verbosity
        when /^\s*(?:4|d|debug)\s*$/io
          level = 'Logging::DEBUG'
          4
        when /^\s*(?:3|i|info)\s*$/io
          level = 'Logging::INFO'
          3
        when /^\s*(?:2|w|warn)\s*$/io
          level = 'Logging::WARN'
          2
        when /^\s*(?:1|e|error)\s*$/io
          level = 'Logging::ERROR'
          1
        when /^\s*(?:0|f|fatal)\s*$/io
          level = 'Logging::FATAL'
          0
        else
          abort "illegal verbosity setting <#{ @opt_verbosity }>"
      end
    # verbosity 4..0 maps onto Logger::DEBUG(0)..Logger::FATAL(4)
    @logger.level = 2 - ((@opt_verbosity % 5) - 2)
    debug {"logging level <#{ level }>"}
  end

  def init_config
    begin
      $config = @config = nil
      if @opt_config
        $config = @config = Config.new(@opt_config)
      else
        CONFIG_SEARCH_PATH.each do |dir|
          path = File.join dir, DEFAULT_CONFIG_PATH
          if test ?e, path
            $config = @config = Config.new(path)
            break
          end
        end
      end
      $config = @config = Config.new('default') unless @config
    rescue => e
      fatal{ e }
      exit EXIT_FAILURE
    end
    debug { "config.path <#{ @config.path }>" }
    debug { "config <\n#{ PP::pp @config, ''}\n>" }
  end

  # define singleton opt_NAME / opt_NAME= accessors; returns their names
  def opt_attr opt
    get = "opt_#{ opt }"
    set = "#{ get }="
    code = <<-code
      class << self
        def #{ get }; defined?(@#{ get }) ? @#{ get } : nil; end
        def #{ set } value; @#{ get } = value; end
      end
    code
    instance_eval code
    [get, set]
  end

  def cp obj
    Marshal.load(Marshal.dump(obj))
  end

  def pretty obj
    PP::pp obj, ''
  end

  #
  # yaml configuration (currently unused by run); 'default' or nil loads the
  # data after __END__
  #
  class Config < Hash
    class << self
      def gen_template(path)
        @template ||= DATA.read
        case path
          when String
            open(path, 'w'){|f| f.write @template}
          else
            STDOUT.write @template
        end
        self
      end
      def load_default
        @loaded ||= YAML::load(munge(DATA.read)) || {}
      end
      def munge buf
        buf.gsub(%r/\t/o,' ')
      end
    end

    attr :path

    def initialize path
      @path = nil
      yaml = nil
      # the old `path.nil? or path and path =~ ...` parsed as
      # `(path.nil? or path) and ...`, so a nil path fell through to open(nil)
      if path.nil? or path =~ /^\s*default/io
        yaml = self.class.load_default
        @path = 'DEFAULT_CONFIG'
      else
        yaml = YAML::load(self.class.munge(open(path){|f| f.read}))
        @path = path
      end
      self.update(yaml || {})
    end
  end

  USAGE = <<-usage
  NAME
    #{ PROGNAM } v#{ VERSION }

  SYNOPSIS
    #{ PROGNAM } [queue] mode [mode_args]* [options]*

  DESCRIPTION
    #{ PROGNAM } is an __experimental__ tool used to manage nfs mounted work
    queues. multiple instances of #{ PROGNAM } on multiples hosts can work from
    these queues to distribute processing load to 'n' nodes - bringing many
    dozens of otherwise powerful cpus to their knees with a single blow.
    clearly this software should be kept out of the hands of radicals, SETI
    enthusiasts, and one mr. jeff safran.

    #{ PROGNAM } operates in one of the modes create, submit, feed, list,
    delete, query, or help. depending on the mode of operation and the options
    used the meaning of mode_args may change, sometime wildly and
    unpredictably (i jest, of course).

  MODES

    modes may be abbreviated to uniqueness, therefore the following shortcuts
    apply :

      c => create
      s => submit
      f => feed
      l => list
      d => delete
      q => query
      h => help

    create, c :

      creates a queue. the queue MUST be located on an nfs mounted file system
      visible from all nodes intended to run jobs from it.

      examples :

        0) to create a queue
            ~ > #{ PROGNAM } q create
          or simply
            ~ > #{ PROGNAM } q c

    list, l :

      show combinations of pending, running, dead, or finished jobs. for this
      command mode_args must be one of pending, running, dead, finished, or
      all. the default is all.

      mode_args may be abbreviated to uniqueness, therefore the following
      shortcuts apply :

        p => pending
        r => running
        f => finished
        d => dead
        a => all

      examples :

        0) show everything in q
            ~ > #{ PROGNAM } q list all
          or
            ~ > #{ PROGNAM } q l all
          or
            ~ > export RQ_Q=q
            ~ > #{ PROGNAM } l

        0) show q's pending jobs
            ~ > #{ PROGNAM } q list pending

        1) show q's running jobs
            ~ > #{ PROGNAM } q list running

        2) show q's finished jobs
            ~ > #{ PROGNAM } q list finshed

    submit, s :

      submit jobs to a queue to be proccesed by any feeding node. any mode_args
      are taken as the command to run. note that mode_args are subject to shell
      expansion - if you don't understand what this means do not use this
      feature.

      when running in submit mode a file may by specified as a list of commands
      to run using the '--infile, -i' option. this file is taken to be a
      newline separated list of commands to submit, blank lines and comments
      (#) are allowed. if submitting a large number of jobs the input file
      method is MUCH more efficient. if no commands are specified on the
      command line #{ PROGNAM } automaticallys reads them from STDIN. yaml
      formatted files are also allowed as input (http://www.yaml.org/) - note
      that output of nearly all #{ PROGNAM } commands is valid yaml and may,
      therefore, be piped as input into the submit command.

      the '--priority, -p' option can be used here to determine the priority of
      jobs. priorities may be any number (0, 10]; therefore 9 is the maximum
      priority. submitting a high priority job will NOT supplant currently
      running low priority jobs, but higher priority jobs will always migrate
      above lower priority jobs in the queue in order that they be run sooner.
      note that constant submission of high priority jobs may create a
      starvation situation whereby low priority jobs are never allowed to run.
      avoiding this situation is the responsibility of the user.

      examples :

        0) submit the job ls to run on some feeding host

          ~ > #{ PROGNAM } q s ls

        1) submit the job ls to run on some feeding host, at priority 9

          ~ > #{ PROGNAM } -p9 q s ls

        2) submit 42000 jobs (quietly) to run from a command file.

          ~ > wc -l cmdfile
          42000
          ~ > #{ PROGNAM } q s -q < cmdfile

        3) submit 42 jobs to run at priority 9 from a command file.

          ~ > wc -l cmdfile
          42
          ~ > #{ PROGNAM } -p9 q s < cmdfile

        4) re-submit all finished jobs

          ~ > #{ PROGNAM } q l f | #{ PROGNAM } q s

    feed, f :

      take jobs from the queue and run them on behalf of the submitter. jobs
      are taken from the queue in an 'oldest highest priority' order.

      feeders can be run from any number of nodes allowing you to harness the
      CPU power of many nodes simoultaneously in order to more effectively
      clobber your network.

      the most useful method of feeding from a queue is to do so in daemon mode
      so that if the process loses it's controling terminal and will not exit
      when you exit your terminal session. use the '--daemon, -d' option to
      accomplish this. by default only one feeding process per host per queue
      is allowed to run at any given moment. because of this it is acceptable
      to start a feeder at some regular interval from a cron entry since, if a
      feeder is alreay running, the process will simply exit and otherwise a
      new feeder will be started. in this way you may keep feeder processing
      running even acroess machine reboots.

      examples :

        0) feed from a queue verbosely for debugging purposes, using a minimum
           and maximum polling time of 2 and 4 respectively

          ~ > #{ PROGNAM } q feed -v4 -m2 -M4

        1) feed from a queue in daemon mode logging into /home/ahoward/rq.log

          ~ > #{ PROGNAM } q feed -d -l/home/ahoward/rq.log

        2) use something like this sample crontab entry to keep a feeder running
           forever (it attempts to (re)start every fifteen minutes)

          #
          # your crontab file
          #

          */15 * * * * /full/path/to/bin/rq /full/path/to/nfs/mounted/q f -d -l/home/user/rq.log

          log rolling while running in daemon mode is automatic.

    delete, d :

      delete combinations of pending, running, finished, dead, or specific
      jobs. the delete mode is capable of parsing the output of list mode,
      making it possible to create filters to delete jobs meeting very specific
      conditions.

      mode_args are the same as for 'list', including 'running'. note that it
      is possible to 'delete' a running job, but there is no way to actually
      STOP it mid execution since the node doing the deleteing has no way to
      communicate this information to the (possibly) remote execution host.
      therefore you should use the 'delete running' feature with care and only
      for housekeeping purposes or to prevent future jobs from being scheduled.

      examples :

        0) delete all pending, running, and finished jobs from a queue

          ~ > #{ PROGNAM } q d all

        1) delete all pending jobs from a queue

          ~ > #{ PROGNAM } q d p

        2) delete all finished jobs from a queue

          ~ > #{ PROGNAM } q d f

        3) delete jobs via hand crafted filter program

          ~ > #{ PROGNAM } q list | filter_prog | #{ PROGNAM } q d

    query, q :

      query exposes the database more directly the user, evaluating the where
      clause specified on the command line (or from STDIN). this feature can be
      used to make a fine grained slection of jobs for reporting or as input
      into the delete command. you must have a basic understanding of SQL
      syntax to use this feature, but it is fairly intuitive in this capacity.

      examples:

        0) show all jobs submitted within a specific 10 minute range

          ~ > #{ PROGNAM } q query "started >= '2004-06-29 22:51:00' and started < '2004-06-29 22:51:10'"

        1) shell quoting can be tricky here so input on STDIN is also allowed

          ~ > cat contraints
          started >= '2004-06-29 22:51:00' and
          started < '2004-06-29 22:51:10'

          ~ > #{ PROGNAM } q query < contraints
            or (same thing)
          ~ > cat contraints | #{ PROGNAM } q query

        2) this query output may then be used to delete specific jobs

          ~ > cat contraints | #{ PROGNAM } q query | #{ PROGNAM } q d

        3) show all jobs which are either finished or dead

          ~ > #{ PROGNAM } q q state=finished or state=dead

  NOTES
    - realize that your job is going to be running on a remote host and this
      has implication. paths, for example, should be absolute, not relative.
      specifically the submitted job must be visible from all hosts currently
      feeding from a q.

    - you need to consider __CAREFULLY__ what the ramifications of having
      multiple instances of your program all running at the same time will be.
      it is beyond the scope of #{ PROGNAM } to ensure multiple instances of a
      program will not overwrite each others output files, for instance.
      coordination of programs is left entirely to the user.

    - the list of finished jobs will grow without bound unless you sometimes
      delete some (all) of them. the reason for this is that #{ PROGNAM }
      cannot know when the user has collected the exit_status, etc. from a job
      and so keeps this information in the queue until instructed to delete it.

    - if you are using the crontab feature to maintain an immortal feeder on a
      host then that feeder will be running in the environment provided by
      cron. this is NOT the same environment found in a login shell and you may
      be suprised at the range of commands which do not function. if you want
      submitted jobs to behave as closely as possibly to their behaviour when
      typed interactively you'll need to wrap each job in a shell script that
      looks like the following:

        #/bin/bash --login
        commmands_for_your_job

      and submit that script

  ENVIRONMENT
    RQ_Q: full path to queue

    the queue argument to all commands may be omitted if, and only if, the
    environment variable 'RQ_Q' contains the full path to the q. eg.

      ~ > export RQ_Q=/full/path/to/my/q

    this feature can save a considerable amount of typing for those weak of
    wrist

  DIAGNOSTICS
    success => $? == 0
    failure => $? != 0

  AUTHOR
    ara.t.howard@noaa.gov

  BUGS
    1 < bugno && bugno <= 42

  OPTIONS
  usage

  EXAMPLES = <<-examples
  examples
#}}}
end

if $0 == __FILE__
  Main.new.run
end

__END__
#
# default config
#
key : value