#!/usr/bin/env ruby
#
# === the rq program
#
# the rq program is the single command line interface by which all queue
# operations are effected.  it always takes, as its first argument, the name of
# the queue to be operated on.  the second argument is always the mode of
# operation.  the action taken and meaning of subsequent arguments depends
# directly on the mode of operation.  for example the command
#
#   rq queue create
#
# has the mode _create_ and will create the queue _queue_.  similarly the
# command
#
#   rq queue submit my_job.sh huge_input_file.dat
#
# runs in _submit_ mode and will submit a job to _queue_.
#
# run
#
#   rq --help
#
# or see README
#
# for the detailed instructions for each of the operation modes
#

require 'rq-2.2.0'

module RQ
  #
  # the Main class is responsible for parsing command line parameters and
  # switches, doing some validation, initializing logging, and, ultimately,
  # delegating the bulk of the work to a MainHelper based on the _mode_ given.
  # the relationship between Main and MainHelper is a tight one by design - the
  # primary purpose of it being to prevent the Main class from becoming 10000
  # lines long.  the delegators used include:
  #
  # * Creator
  # * Submitter
  # * Lister
  # * StatusLister
  # * Deleter
  # * Updater
  # * Querier
  # * Executor
  # * Configurator
  # * Snapshotter
  # * Locker
  # * Backer
  # * Rotater
  # * Feeder
  #
  class Main
    #--{{{
    include Util
    include Logging
    include Usage

    # an enumeration of option specifications used to parse command line.
    # each entry is handed verbatim to OptionParser#def_option by
    # parse_options; the key under which a value is stored in @options is the
    # first element with the leading '--' and any '=...' suffix removed.
    OPTSPEC =
    #--{{{
      [
        [
          '--priority=priority', '-p',
          'modes : set the job(s) priority - lowest(0) .. highest(n) - (default 0)'
        ],
        [
          '--tag=tag', '-t',
          'modes : set the job(s) user data tag'
        ],
        [
          '--runner=runner',
          'modes : set the job(s) required runner(s)'
        ],
        [
          '--restartable',
          'modes : set the job(s) to be restartable on node reboot'
        ],
        [
          '--infile=infile',
          'modes : infile'
        ],
        [
          '--quiet', '-q',
          'modes : do not echo submitted jobs, fail silently if another process is already feeding'
        ],
        [
          '--daemon', '-D',
          'modes : spawn a daemon'
        ],
        [
          '--max_feed=max_feed',
          'modes : the maximum number of concurrent jobs run'
        ],
        # [
        #   '--name=name',
        #   'set the feeder name - (default q path)'
        # ],
        [
          '--retries=retries',
          'modes : specify transaction retries'
        ],
        [
          '--min_sleep=min_sleep',
          'modes : specify min sleep'
        ],
        [
          '--max_sleep=max_sleep',
          'modes : specify max sleep'
        ],
        [
          '--snapshot', '-s',
          'operate on snapshot of queue'
        ],
        [
          '--verbosity=verbostiy', '-v',
          '0|fatal < 1|error < 2|warn < 3|info < 4|debug - (default info)'
        ],
        [
          '--log=path','-l',
          'set log file - (default stderr)'
        ],
        [
          '--log_age=log_age',
          'daily | weekly | monthly - what age will cause log rolling (default nil)'
        ],
        [
          '--log_size=log_size',
          'size in bytes - what size will cause log rolling (default nil)'
        ],
        # [
        #   '--config=path',
        #   'valid path - specify config file (default nil)'
        # ],
        # [
        #   '--template=[path]',
        #   'valid path - generate a template config file in path (default stdout)'
        # ],
        [
          '--help', '-h',
          'this message'
        ],
        [
          '--version',
          'show version number'
        ],
      ]
    #--}}}

    # the default config file searched for has this basename
    CONFIG_DEFAULT_PATH = 'rq.conf'

    # config files are searched for using this list of locations
    CONFIG_SEARCH_PATH = %w( . ~ /dmsp/reference/etc /usr/local/etc /usr/etc /etc )

    # the queue can be specified in the environment
    Q = ENV['RQ_Q'] || ENV['RQ_QUEUE']

    attr :logger
    attr :argv
    attr :env
    attr :cmd
    attr :options
    attr :qpath
    attr :mode
    attr :q
    attr :daemon

    # given a command line and environment run the rq program.
    #
    # argv and env are copied (Util::mcp) before use.  this method never
    # returns normally: every path ends in Kernel#exit, either with the
    # Integer returned by #run, or with EXIT_SUCCESS / EXIT_FAILURE depending
    # on the truthiness of a non-Integer result.  any StandardError raised
    # along the way is logged with logerr and turned into EXIT_FAILURE.
    def initialize argv = ARGV, env = ENV
      #--{{{
      begin
        @logger = Logger::new STDERR
        @argv = Util::mcp(argv.to_a)
        @env = Util::mcp(env.to_hash)
        @cmd = ([$0] + @argv).join(' ')

        parse_options

        if @options.has_key?('help') || @argv.include?('help')
          usage('port' => STDOUT, 'long' => true)
          exit EXIT_SUCCESS
        end

        # the index is looked up unconditionally so that it is never left
        # unset when the 'template' option key is present without a value
        template_idx = @argv.index('template')
        if @options.has_key?('template') || template_idx
          template = @options['template']
          template ||= @argv[template_idx + 1] if template_idx
          gen_template(template)
          exit EXIT_SUCCESS
        end

        if @options.has_key?('version')
          puts RQ::VERSION
          exit EXIT_SUCCESS
        end

        parse_argv

        exit_status = run

        case exit_status
          when Integer
            exit exit_status
          else
            exit(exit_status ? EXIT_SUCCESS : EXIT_FAILURE)
        end
      rescue => e
        # a bare rescue only traps StandardError; the SystemExit raised by the
        # exit calls above is not one and therefore propagates untouched
        logerr e
        exit EXIT_FAILURE
      end
      #--}}}
    end

    # extract command line args - the queue path (unless given via the
    # environment, see Q) followed by the mode
    def parse_argv
      #--{{{
      @qpath = Q || @argv.shift
      @mode = @argv.shift
      #--}}}
    end

    # select a MainHelper based on mode and delegate to it.
    #
    # single/double letter shortcuts are expanded to their long mode name
    # first.  a missing or blank mode prints the short usage on STDERR and
    # exits with EXIT_FAILURE; an unknown mode raises.  returns whatever the
    # selected mode method returns.
    def run
      #--{{{
      @qpath = Util::realpath @qpath

      if @mode.nil? or @mode.strip.empty?
        usage 'port' => STDERR, 'long' => false
        exit EXIT_FAILURE
      end

      shortcuts = {
        'c'  => 'create',
        's'  => 'submit',
        'l'  => 'list',
        'ls' => 'list',
        't'  => 'status',
        'd'  => 'delete',
        'rm' => 'delete',
        'u'  => 'update',
        'q'  => 'query',
        'e'  => 'execute',
        'C'  => 'configure',
        'S'  => 'snapshot',
        'L'  => 'lock',
        'b'  => 'backup',
        'r'  => 'rotate',
        'h'  => 'help',
        'f'  => 'feed',
      }

      longmode = shortcuts[@mode]
      @mode = longmode if longmode

      begin
        case @mode
          when 'create'
            create
          when 'submit'
            submit
          when 'list'
            list
          when 'status'
            status
          when 'delete'
            delete
          when 'update'
            update
          when 'query'
            query
          when 'execute'
            execute
          when 'configure'
            configure
          when 'snapshot'
            snapshot
          when 'lock'
            lock
          when 'backup'
            backup
          when 'rotate'
            rotate
          when 'help'
            usage 'port' => STDOUT, 'long' => true
            exit EXIT_SUCCESS
          when 'feed'
            feed
          when 'start'
            start
          when 'shutdown'
            shutdown
          when 'stop'
            stop
          when 'feeder'
            feeder
          else
            raise "invalid mode <#{ @mode }>"
        end
      rescue Errno::EPIPE
        # a broken pipe is only re-raised when STDOUT is a terminal; otherwise
        # it is swallowed and this method returns nil
        raise if STDOUT.tty?
      end
      #--}}}
    end

    # delegated to a Creator
    def create
      #--{{{
      init_logging
      creator = Creator::new self
      creator.create
      #--}}}
    end

    # delegated to a Submitter
    def submit
      #--{{{
      init_logging
      submitter = Submitter::new self
      submitter.submit
      #--}}}
    end

    # delegated to a Lister
    def list
      #--{{{
      init_logging
      lister = Lister::new self
      lister.list
      #--}}}
    end

    # delegated to a StatusLister
    def status
      #--{{{
      init_logging
      statuslister = StatusLister::new self
      statuslister.statuslist
      #--}}}
    end

    # delegated to a Deleter
    def delete
      #--{{{
      init_logging
      deleter = Deleter::new self
      deleter.delete
      #--}}}
    end

    # delegated to an Updater
    def update
      #--{{{
      init_logging
      updater = Updater::new self
      updater.update
      #--}}}
    end

    # delegated to a Querier
    def query
      #--{{{
      init_logging
      querier = Querier::new self
      querier.query
      #--}}}
    end

    # delegated to an Executor
    def execute
      #--{{{
      init_logging
      executor = Executor::new self
      executor.execute
      #--}}}
    end

    # delegated to a Configurator
    def configure
      #--{{{
      init_logging
      configurator = Configurator::new self
      configurator.configure
      #--}}}
    end

    # delegated to a Snapshotter
    def snapshot
      #--{{{
      init_logging
      snapshotter = Snapshotter::new self
      snapshotter.snapshot
      #--}}}
    end

    # delegated to a Locker
    def lock
      #--{{{
      init_logging
      locker = Locker::new self
      locker.lock
      #--}}}
    end

    # delegated to a Backer
    def backup
      #--{{{
      init_logging
      backer = Backer::new self
      backer.backup
      #--}}}
    end

    # delegated to a Rotater
    def rotate
      #--{{{
      init_logging
      rotater = Rotater::new self
      rotater.rotate
      #--}}}
    end

    # delegated to a Feeder - note that, unlike the other delegators, this one
    # does not call init_logging itself
    def feed
      #--{{{
      feeder = Feeder::new self
      feeder.feed
      #--}}}
    end

    # start a feeder unless one already exists (see #exists).  forces the
    # 'daemon' option on and, when no 'log' option was given, defaults the log
    # to <basename of qpath>.log under ENV['HOME'].  returns nil when a feeder
    # is already present.
    def start
      #--{{{
      unless exists
        @options['daemon'] = true
        @options['log'] ||= File::join(ENV['HOME'], File::basename(@qpath) + ".log")
        feeder = Feeder::new self
        feeder.feed
      end
      #--}}}
    end

    # send TERM to the feeder, if one exists
    def shutdown
      #--{{{
      exists and signal_feeder 'TERM'
      #--}}}
    end

    # send KILL to the feeder, if one exists
    def stop
      #--{{{
      exists and signal_feeder 'KILL'
      #--}}}
    end

    # print the result of #exists (the feeder pid, or nothing) on stdout
    def feeder
      #--{{{
      puts "feeder <#{ exists }>"
      #--}}}
    end

    # probe for a feeder by sending it signal 0.  returns the feeder pid when
    # the signal could be delivered, a false value otherwise.
    def exists
      #--{{{
      begin
        signal_feeder 0
      rescue Errno::ESRCH
        false
      end
      #--}}}
    end

    # send +sig+ to the process whose pid is stored in the pidfile named by
    # Feeder#gen_pidfilepath.
    #
    # returns the pid when the signal was sent, nil when the pidfile could not
    # be read or parsed, or when Process::kill raised a StandardError.
    def signal_feeder sig
      #--{{{
      feeder = Feeder::new self
      pidfilepath = feeder.gen_pidfilepath

      # best effort: a missing or malformed pidfile simply means "no pid"
      pid = (Integer(IO::read(pidfilepath)) rescue nil)
      return nil unless pid

      begin
        Process::kill(sig, pid)
        pid
      rescue
        nil
      end
      #--}}}
    end

    # uses OPTSPEC to parse command line switches.  recognized switches are
    # removed from @argv and stored in @options keyed by their long name
    # without the leading '--'.  returns @options.
    def parse_options
      #--{{{
      @op = OptionParser.new
      @options = {}
      OPTSPEC.each do |spec|
        # '--priority=priority' => 'priority'
        k = spec.first.gsub(%r/(?:--)|(?:=.*$)|(?:\s+)/o,'')
        @op.def_option(*spec){|v| @options[k] = v}
      end
      @op.parse! @argv
      @options
      #--}}}
    end

    # initialize logging object - all classes then use this object.
    #
    # reads the 'log', 'log_age', 'log_size' and 'verbosity' options, installs
    # the resulting logger in both @logger and the global $logger, and returns
    # it.  aborts the program on an unrecognized verbosity.
    def init_logging
      #--{{{
      log, log_age, log_size, verbosity =
        @options.values_at 'log', 'log_age', 'log_size', 'verbosity'

      # best effort: anything atoi cannot convert falls back to nil
      log_age = (atoi(log_age) rescue nil)
      log_size = (atoi(log_size) rescue nil)

      $logger = @logger = Logger::new(log || STDERR, log_age, log_size)

      #
      # hack to fix Logger sync bug
      #
      # the respond_to? test is made here, against the logger itself, rather
      # than inside instance_eval where @logger would resolve to an instance
      # variable of the logger's class (always nil)
      #
      logger = @logger
      unless logger.respond_to?(:logdev)
        logger.class.instance_eval{ attr :logdev }
      end
      @logdev = @logger.logdev.dev
      @logdev.sync = true

      verbosity ||= 'info'
      verbosity =
        case verbosity
          when /^\s*(?:4|d|debug)\s*$/io
            4
          when /^\s*(?:3|i|info)\s*$/io
            3
          when /^\s*(?:2|w|warn)\s*$/io
            2
          when /^\s*(?:1|e|error)\s*$/io
            1
          when /^\s*(?:0|f|fatal)\s*$/io
            0
          else
            abort "illegal verbosity setting <#{ verbosity }>"
        end

      # rq verbosity runs 0 (fatal) .. 4 (debug); the level handed to the
      # logger is its mirror image - 4|debug => 0 ... 0|fatal => 4
      @logger.level = 4 - verbosity

      @logger
      #--}}}
    end

    # initialize configuration file - not currently utilized
    def init_config
      #--{{{
      @config =
        if @options['config']
          ConfigFile::new(@options['config'])
        else
          ConfigFile::any CONFIG_DEFAULT_PATH, CONFIG_SEARCH_PATH
        end
      debug { "config.path <#{ @config.path }>" }
      @config
      #--}}}
    end

    # generate a template/sample config file which can then be edited.
    # returns self.
    def gen_template template
      #--{{{
      ConfigFile::gen_template(template)
      self
      #--}}}
    end
    #--}}}
  end
end

#
# run main program unless included as a library (testing purposes)
#
RQ::Main::new(ARGV, ENV) if $0 == __FILE__