#!/dmsp/reference/bin/ruby # # builtin libs # require 'optparse' require 'yaml' require 'thread' require 'tempfile' # # ara's utility lib # require 'alib' # # # raa - http://raa.ruby-lang.org # require 'fileutils' require 'arrayfields' require 'sqlite' require 'posixlock' require 'lockfile' # # simple sleep cycle class # class SleepCycle < Array #{{{ attr :min attr :max attr :range attr :inc def initialize min, max, inc #{{{ @min, @max, @inc = Float(min), Float(max), Float(inc) @range = @max - @min raise RangeError, "max < min" if @max < @min raise RangeError, "inc > range" if @inc > @range s = @min push(s) and s += @inc while(s <= @max) self[-1] = @max if self[-1] < @max reset #}}} end def next #{{{ ret = self[@idx] @idx = ((@idx + 1) % self.size) ret #}}} end def reset #{{{ @idx = 0 #}}} end #}}} end # # examine this class carefully for holdovers from QDB # class DB #{{{ include ALib include ALib::Util include ALib::Logging FIELDS = #{{{ %w( path dev ino mode nlink uid gid rdev size blksize blocks atime mtime ctime ) #}}} PRAGMAS = #{{{ <<-sql PRAGMA default_synchronous = FULL; sql #}}} SCHEMA = #{{{ <<-sql create table paths ( #{ FIELDS.join ",\n " }, primary key (path) ); create table attributes ( key, value, primary key (key) ); sql #}}} INDEXES = #{{{ FIELDS.inject('') do |s,f| s << "create index paths_#{ f }_idx on paths(#{ f });\n" end #}}} class << self #{{{ include ALib include ALib::Util def fields #{{{ FIELDS #}}} end def integrity_check dbpath #{{{ ret = false tuple = nil begin db = SQLite::Database.new dbpath, 0 opened = true db.use_array = true tuple = db.execute 'PRAGMA integrity_check;' ret = (tuple and tuple.first and (tuple.first["integrity_check"] =~ /^\s*ok\s*$/io)) ensure db.close if opened db = nil end ret #}}} end def t2h tuple #{{{ h = {} FIELDS.each_with_index{|f,i| h[f] = tuple[i]} h #}}} end def h2t h #{{{ t = tuple FIELDS.each{|f| t[f] = h[f]} t #}}} end def tuple #{{{ t = Array.new FIELDS.size t.fields = FIELDS t #}}} end def q tuple #{{{ tuple.map do |f| if f "'" << Util.escape(f,"'","'") << "'" else 'NULL' end end #}}} end # # hack to integreate auto conf file creation # def create path, opts = {} #{{{ db = new path, opts FileUtils::touch db.lockfile conf = File::join(db.dirname, 'dirwatch.conf') ConfigFile::gen_template(conf) unless File::exist? conf create_schema db.schema db.transaction do db.execute PRAGMAS db.execute SCHEMA db.execute INDEXES end db #}}} end def create_schema path #{{{ tmp = "#{ path }.tmp" open(tmp,'w') do |f| f.puts PRAGMAS f.puts SCHEMA f.puts INDEXES end FileUtils::mv tmp, path #}}} end #}}} end attr :path attr :opts attr :dirname attr :schema attr :fields attr :mutex attr :in_transaction attr :lockfile attr :sql_debug, true def initialize path, opts = {} #{{{ @path = path @opts = opts @logger = getopt('logger', @opts) || Logger::new(STDERR) @sql_debug = getopt('sql_debug', @opts) || ENV['SQLDEBUG'] || false @transaction_retries = getopt('transaction_retries', @opts) || 16 @schema = "#{ @path }.schema" @dirname = File.dirname(path).gsub(%r|/+\s*$|o,'') @basename = File.basename(path) @waiting_w = File.join(@dirname, "#{ hostname }.#{ $$ }.waiting.w") @waiting_r = File.join(@dirname, "#{ hostname }.#{ $$ }.waiting.r") @lock_w = File.join(@dirname, "#{ hostname }.#{ $$ }.lock.w") @lock_r = File.join(@dirname, "#{ hostname }.#{ $$ }.lock.r") #@lockfile = Lockfile.new File.join(@dirname, 'lock') @lockfile = File.join(@dirname, 'lock') @lockf = Lockfile.new("#{ @path }.lock") @fields = FIELDS @in_transaction = false @db = nil @lockd_recover = "#{ @dirname }.lockd_recover" @lockd_recover_lockf = Lockfile.new "#{ @lockd_recover }.lock" @lockd_recovered = false @aquire_lock_sc = SleepCycle.new 0.42, 42, 0.42 @transaction_retries_sc = SleepCycle.new 0.42, 42, 0.42 #}}} end def ro_transaction &block #{{{ transaction read_only = true, &block #}}} end def transaction read_only = false #{{{ raise 'nested transaction' if @in_transaction ret = nil begin @in_transaction = true lockd_recover_wrap do transaction_wrap(read_only) do aquire_lock(read_only) do #sillyclean(read_only) do connect do execute 'begin' unless read_only ret = yield execute 'commit' unless read_only end #end end end end ensure @in_transaction = false end ret #}}} end def lockd_recover_wrap #{{{ ret = nil try_again = false begin begin @lockd_recovered = false old_mtime = begin Util::uncache @lockd_recover File.stat(@lockd_recover).mtime rescue Time.now end ret = yield ensure new_mtime = begin Util::uncache @lockd_recover File.stat(@lockd_recover).mtime rescue old_mtime end if new_mtime and old_mtime and new_mtime > old_mtime and not @lockd_recovered try_again = true end end rescue if try_again warn{ "a remote lockd recovery has invalidated this transaction!" } warn{ "retrying..."} sleep 120 retry else raise end end ret #}}} end def transaction_wrap read_only = false #{{{ ret = nil if read_only ret = yield else errors = [] @transaction_retries_sc.reset begin ret = yield rescue => e #rescue SQLite::DatabaseException, SQLite::SQLException, SystemCallError => e if errors.size >= @transaction_retries error{ "MAXIMUM TRANSACTION RETRIES SURPASSED" } #errors[0...-1].each{|err| error{ err }} raise else warn{ e } if(errors.empty? or not Util::erreq(errors[-1], e)) errors << e warn{ "retry <#{ errors.size }>..." } end sleep @transaction_retries_sc.next retry end end ret #}}} end def sillyclean read_only = false, dir = @dirname #{{{ ret = nil if read_only ret = yield else glob = File.join dir,'.nfs*' orgsilly = Dir[glob] ret = yield newsilly = Dir[glob] silly = newsilly - orgsilly silly.each{|path| FileUtils::rm_rf path} end ret #}}} end def aquire_lock read_only = false #{{{ ret = nil @aquire_lock_sc.reset waiting, ltype, lfile = if read_only [@waiting_r, File::LOCK_SH | File::LOCK_NB, @lock_r] else [@waiting_w, File::LOCK_EX | File::LOCK_NB, @lock_w] end ltype_s = (ltype == File::LOCK_EX ? 'write' : 'read') ltype ||= File::LOCK_NB aquired = false until aquired begin debug{ "aquiring lock" } #@lockf.lock unless read_only open(@lockfile, 'a+') do |lf| locked = false refresher = nil sc = nil begin FileUtils.touch waiting # poll 42.times do locked = lf.posixlock(ltype | File::LOCK_NB) break if locked sleep rand end if locked aquired = true refresher = Thread.new do loop do FileUtils.touch @lockfile sleep 10 end end FileUtils.rm_f waiting rescue nil FileUtils.touch lfile debug{ "aquired lock" } ret = yield debug{ "released lock" } else aquired = false stat = File.stat @lockfile mtime = stat.mtime stale = mtime < (Time.now - (60 * 30)) lockd_recover if stale sc = @aquire_lock_sc.next debug{ "failed to aquire lock - sleep(#{ sc })" } sleep sc end ensure if locked unlocked = false begin 42.times do unlocked = lf.posixlock(File::LOCK_UN | File::LOCK_NB) break if unlocked sleep rand end ensure lf.posixlock File::LOCK_UN unless unlocked end end refresher.kill if refresher FileUtils.rm_f waiting rescue nil FileUtils.rm_f lfile rescue nil end end ensure #@lockf.unlock rescue nil unless read_only end end ret #}}} end def connect #{{{ ret = nil opened = nil begin raise 'db has no schema' unless test ?e, @schema $db = @db = SQLite::Database.new(@path, 0) opened = true @db.use_array = true ret = yield @db ensure @db.close if opened $db = @db = nil end ret #}}} end def execute sql, &block #{{{ raise 'not in transaction' unless @in_transaction if @sql_debug logger << "SQL:\n#{ sql }\n" end #ret = retry_if_locked{ @db.execute sql, &block } ret = @db.execute sql, &block if @sql_debug and ret and ret.first logger << "RESULT:\n#{ ret.first.inspect }\n...\n" end ret #}}} end def uncache #{{{ Util::uncache @dirname rescue nil Util::uncache @path rescue nil Util::uncache @lockfile rescue nil Util::uncache @lockd_recover rescue nil #}}} end def retry_if_locked #{{{ ret = nil begin ret = yield rescue SQLite::BusyException warn{ "database locked - waiting(1.0) and retrying" } sleep 1.0 retry end ret #}}} end def vacuum #{{{ raise 'nested transaction' if @in_transaction begin @in_transaction = true connect{ execute 'vacuum' } ensure @in_transaction = false end self #}}} end def lockd_recover #{{{ warn{ "attempting lockd recovery" } time = Time.now ret = nil @lockd_recover_lockf.lock do uncache mtime = File.stat(@lockd_recover).mtime rescue time if mtime > time warn{ "skipping lockd recovery (another node has already recovered)" } ret = true else moved = false begin FileUtils.touch @lockd_recover @lockd_recovered = false begin report = <<-msg hostname : #{ Util::hostname } pid : #{ Process.pid } time : #{ Time.now } q : path : #{ @dirname } stat : #{ File.stat(@dirname).inspect } db : path : #{ @path } stat : #{ File.stat(@path).inspect } lockfile : path : #{ @lockfile } stat : #{ File.stat(@lockfile).inspect } msg info{ "LOCKD RECOVERY REPORT" } logger << report cmd = "mail -s LOCKD_RECOVERY ara.t.howard@noaa.gov <" } klass.integrity_check(path) #}}} end #}}} end # # unify File::Stat and DB::tuple # class Stat < Array #{{{ include ALib::Util FIELDS = #{{{ %w( dev ino mode nlink uid gid rdev size blksize blocks atime mtime ctime ) #}}} IFIELDS, TFIELDS = FIELDS.partition{|f| f !~ /time/} FIELDS.each do |f| #{{{ eval <<-code def #{ f }; self['#{ f }']; end def #{ f }= f; self['#{ f }'] = f; end code #}}} end def initialize arg #{{{ self.fields = FIELDS case arg when File::Stat FIELDS.each do |f| field = arg.send f raise "<#{ arg.inspect }> missing field <#{ f }>" unless field self[f] = field end else FIELDS.each do |f| field = arg[f] raise "<#{ arg.inspect }> missing field <#{ f }>" unless field self[f] = field end IFIELDS.each do |f| self[f] = Integer self[f] end TFIELDS.each do |f| field = self[f] unless Time === field self[f] = stamptime field end end end #}}} end def inspect #{{{ buf = "#<#{ self.class } " << 'dev=0x%x, ' % self['dev'] << 'ino=%d, ' % self['ino'] << 'mode=%6.6o, ' % self['mode'] << 'nlink=%d, ' % self['nlink'] << 'uid=%o, ' % self['uid'] << 'gid=%o, ' % self['gid'] << 'rdev=0x%x, ' % self['rdev'] << 'size=%d, ' % self['size'] << 'blksize=%d, ' % self['blksize'] << 'blocks=%d, ' % self['blocks'] << 'atime=%s, ' % timestamp(self['atime']) << #'atime=%s, ' % self['atime'] << 'mtime=%s, ' % timestamp(self['mtime']) << #'mtime=%s, ' % self['mtime'] << 'ctime=%s' % timestamp(self['ctime']) << #'ctime=%s, ' % self['ctime'] << ">" #}}} end def dup #{{{ self.class::new self #}}} end alias clone dup def tuple #{{{ t = self.dup %w( atime mtime ctime ).each do |f| time = t[f] t[f] = timestamp(time) if Time === time end t #}}} end #}}} end # # main program class # class Main #{{{ include ALib include ALib::Util include ALib::Logging VERSION = '0.0.2' AUTHOR = 'ara.t.howard@noaa.gov' PROGNAM = prognam CONFIG_DEFAULT_PATH = "#{ PROGNAM }.conf" CONFIG_SEARCH_PATH = %w( .dirwatch/ . ) USAGE = #{{{ <<-usage NAME #{ PROGNAM } v#{ VERSION } SYNOPSIS #{ PROGNAM } [options]+ [directory] [mode] DESCRIPTTION #{ PROGNAM } maintain a database that mirrors the prior state of a directory. using this database it is able to determine when files have been created, modified, deleted, or were pre-existing and to trigger commands on those conditions. the mode must be one of create or c => initialize database watch or w => maintain database and take actions list or l => dump database to stdout if directory and mode are omitted the defaults of '.' and 'watch' are used. #{ PROGNAM } creates/maintains a database which will be stored in 'directory/.dirwatch/db' using the config specified on the command line or the one found in 'directory/.dirwatch/dirwatch.conf' if it exists #{ PROGNAM } uses the config's enumeration of the actions to be taken when files are discovered to have been created, modified, updated, deleted or existing to determine which commands to run. the action section of the config contains sections for each of actions created, modified, updated, deleted, and existing of the form - command: echo @file pattern: .*\.txt where the token '@file' or '$file' will be replaced with the currently processed file and the command run iff it matches pattern. pattern is not required, in which case the command will always be run for that action. in order to assist programs in knowing whether they are running due a created, modified, deleted, or existing action the environment variable 'DIRWATCH_ACTION' will be set in the child process to contain the type of action. additionally the environment var 'DIRWATCH_PATH' will be set to contain the full path to the entry in question. this sounds more complicated than it is and this is compounded by the fact that i can't seem to explain it : better to just try running ~ > #{ PROGNAM } -t to view a sample config, which should be rather self explanatory. ENVIRONMENT SQLDEBUG=true -> cause sql debugging info to be logged CONFIG default path -> #{ CONFIG_DEFAULT_PATH } search path -> #{ CONFIG_SEARCH_PATH.join ' ' } DIAGNOSTICS success -> $? == 0 failure -> $? != 0 AUTHOR #{ AUTHOR } BUGS > 1 usage #}}} OPTSPEC = #{{{ [ [ '--pattern=pattern', '-p', 'watch only files matching pattern (not shell glob!)' ], [ '--files_only', '-f', 'ignore everything but files - (default directories and files)' ], [ '--flat', '-F', 'do not recurse into subdirectories - (default recurse)' ], [ '--help', '-h', 'this message' ], [ '--verbosity=verbostiy', '-v', '0|fatal < 1|error < 2|warn < 3|info < 4|debug - (default info)' ], [ '--log=path','-l', 'set log file - (default stderr)' ], [ '--log_age=log_age', 'daily | weekly | monthly - what age will cause log rolling (default nil)' ], [ '--log_size=log_size', 'size in bytes - what size will cause log rolling (default nil)' ], [ '--config=path', '-c', 'valid path - specify config file (default nil)' ], [ '--template=[path]', 'valid path - generate a template config file in path (default stdout)' ], ] #}}} EXAMPLES = #{{{ <<-examples EXAMPLES 0) initialize a directory for watching ~ > #{ PROGNAM } dir create 1) create a config (edit afterwards) ~ > #{ PROGNAM } -t > conf 2) watch a dir taking actions described in config ~ > #{ PROGNAM } dir watch -c conf 3) watch a dir taking actions described in config, but not recursing into any subdirectories ~ > #{ PROGNAM } dir watch -c conf --flat 4) watch a dir taking actions described in config, ignoring all but regular files (ignoring directories, device files, etc.) ~ > #{ PROGNAM } dir watch -c conf --files_only 5) watch a dir taking actions described in config, ignoring all entries (directory or file) except those that match pattern ~ > #{ PROGNAM } dir watch -c conf --pattern='\.(OIS|OIF)$' note that the config allows a pattern to be specified for each command to be run; the command line switch, however, is applied before any command specific pattern filters. 6) run using all defaults in the current directory ~ > #{ PROGNAM } 7) dump contents of database in yaml format ~ > #{ PROGNAM } dir list examples #}}} EXIT_SUCCESS = 0 EXIT_FAILURE = 1 attr :logger attr :argv attr :env attr :options attr :op attr :logdev attr :verbosity attr :config def initialize argv = ARGV, env = ENV #{{{ begin @logger = Logger::new STDERR @argv = mcp(argv.to_a) @env = mcp(env.to_hash) parse_options if @options['help'] usage STDOUT exit EXIT_SUCCESS end if @options.has_key? 'template' gen_template @options['template'] exit EXIT_SUCCESS end if @argv.include? 'help' usage STDOUT exit EXIT_SUCCESS end parse_argv default_config = File::join @dir, '.dirwatch/dirwatch.conf' if File::file?(default_config) and not @options['config'] init_config 'config' => default_config else init_config end %w( log log_age log_size verbosity pattern flat files_only ).each do |key| if @config[key] and not @options[key] @options[key] = @config[key] end end init_logging if @config.path =~ %r/^\s*default\s*$/io warn { "you are using the default config!" } end debug { "config <#{ @config.path }>" } #debug { "config\n#{ @config.to_hash.to_yaml }\n" } exit run rescue => e unless SystemExit === e begin fatal{ logger.debug? ? e : emsg(e) } exit EXIT_FAILURE rescue end raise else exit e.status end end #}}} end def usage port = STDERR #{{{ if defined? USAGE port << USAGE << "\n" end if defined? OPTSPEC port << 'OPTIONS' << "\n" OPTSPEC.each do |os| a, b, c = os long, short, desc = nil [a,b,c].each do |word| next unless word word.strip! case word when %r/^--[^-]/o long = word when %r/^-[^-]/o short = word else desc = word end end spec = ((long and short) ? [long, short] : [long]) if spec port << columnize(spec.join(', '), 80, 2) port << "\n" end if desc port << columnize(desc, 80, 8) port << "\n" end end port << "\n" end if defined? EXAMPLES port << EXAMPLES << "\n" end port #}}} end def parse_options #{{{ @op = OptionParser.new @options = {} OPTSPEC.each do |spec| k = spec.first.gsub(%r/(?:--)|(?:=.*$)|(?:\s+)/o,'') @op.def_option(*spec){|v| @options[k] = v} end #begin op.parse! @argv #rescue OptionParser::InvalidOption => e # preverve unknown options #e.recover(argv) #end @options #}}} end # # hack here for safe mutithread/process logging # require 'thread' require 'posixlock' def init_logging opts = (@options || {}) #{{{ log = getopt 'log', opts log_age = getopt 'log_age', opts log_size = getopt 'log_size', opts verbosity = getopt 'verbosity', opts log ||= STDERR log_age = Integer(log_age) if log_age log_size = Integer(log_size) if log_size $logger = @logger = Logger::new(log, log_age, log_size) # # small hack to fix Logger sync bug # class << @logger; attr :logdev unless @logger.respond_to?(:logdev); end @logdev = @logger.logdev.dev @logdev.sync = true # # small hack to add locking when log is a file # if File === @logdev class << @logdev alias __write__ write def write(*a,&b) @logmutex ||= Mutex::new @logmutex.synchronize do begin posixlock File::LOCK_EX __write__(*a,&b) ensure posixlock File::LOCK_UN rescue nil end end end end end level = nil verbosity ||= 'info' verbosity = case verbosity when String case verbosity when %r/^\s*(?:4|d|debug)\s*$/ level = 'Logging::DEBUG' 4 when %r/^\s*(?:3|i|info)\s*$/ level = 'Logging::INFO' 3 when %r/^\s*(?:2|w|warn)\s*$/ level = 'Logging::WARN' 2 when %r/^\s*(?:1|e|error)\s*$/ level = 'Logging::ERROR' 1 when %r/^\s*(?:0|f|fatal)\s*$/ level = 'Logging::FATAL' 0 else raise "illegal verbosity setting <#{ verbosity }>" end when Fixnum verbosity else raise "illegal verbosity argument <#{ verbosity } (#{ verbosity.class }>" end @logger.level = 2 - ((verbosity % 5) - 2) debug {"logging level <#{ level }>"} @logger #}}} end def init_config opts = (@options || {}) #{{{ @config = if opts['config'] ConfigFile::new(opts['config']) else ConfigFile::any CONFIG_DEFAULT_PATH, CONFIG_SEARCH_PATH end #debug { "config.path <#{ @config.path }>" } #debug { "config\n#{ @config.to_hash.to_yaml }\n" } @config #}}} end def gen_template template #{{{ ConfigFile::gen_template(template) self #}}} end def parse_argv #{{{ @dir = @argv.shift || '.' @mode = @argv.shift || 'watch' #}}} end def run #{{{ debug{ "dir <#{ @dir }>" } debug{ "mode <#{ @mode }>" } case @mode when %r/^\s*c(?:reate)?/io create when %r/^\s*w(?:atch)?/io watch when %r/^\s*l(?:ist)?/io list else raise "unknown mode <#{ @mode }>" end return EXIT_SUCCESS #}}} end def create #{{{ @dbdir = File::join @dir, '.dirwatch' raise "<#{ @dbdir }> exists!" if test ?e, @dbdir debug{ "dbdir <#{ @dbdir }>" } FileUtils::mkdir_p @dbdir @dbpath = File::join @dbdir, 'db' @db = DB::create @dbpath, 'logger' => @logger #}}} end def watch #{{{ # # setup dbdir # @dbdir = File::join @dir, '.dirwatch' unless test ?e, @dbdir fatal{ "<#{ @dbdir }> does not exist" } exit EXIT_FAILURE end debug{ "dbdir <#{ @dbdir }>" } # # setup dbpath # @dbpath = File::join @dbdir, 'db' raise "<#{ @dbpath }> does not exists" unless test ?e, @dbpath debug{ "dbpath <#{ @dbpath }>" } # # setp db # @db = DB::new @dbpath, 'logger' => @logger # # modifiers # flat = @options['flat'] debug{ "flat <#{ flat ? 'true' : 'false' }>" } files_only = @options['files_only'] debug{ "files_only <#{ files_only ? 'true' : 'false' }>" } pattern = @options['pattern'] if pattern begin pattern = Regexp::new pattern debug{ "pattern <#{ pattern.source }>" } rescue raise "bad pattern <#{ pattern.source }>" end else debug{ "pattern <>" } end # # init # @runners = {} @actions = @config['actions'] || {} created = deleted = modified = existing = updated = nil exception = nil sql = '' opaths = {} npaths = {} glob = @options['flat'] ? "#{ @dir }/*" : "#{ @dir }/**/*" # # ignore pretty much all but kill -9 # %w( INT TERM HUP ).each{|sig| trap sig, 'SIG_IGN'} # # update db # @db.transaction do # # slurp db in # tuples = @db.execute 'select * from paths' begin # # generate list of old paths # tuples.each do |tuple| stat = Stat::new tuple path = tuple.shift opaths[path] = stat end # # generate list of new paths # paths = Dir[glob] paths.each do |path| next if path =~ /dirwatch/ next if files_only and not File::file?(path) next if pattern and not pattern.match(path) path = realpath path begin npaths[path] = Stat::new(File::stat(path)) rescue Errno::ESTALE, Errno::ENOENT, Errno::EACCES => e warn{ emsg e } next end end # # generate state # oldpaths = opaths.keys.sort newpaths = npaths.keys.sort created = newpaths - oldpaths deleted = oldpaths - newpaths common = oldpaths & newpaths modified = [] existing = [] common.each do |path| ostat = opaths[path] nstat = npaths[path] if nstat.mtime > ostat.mtime modified << path else existing << path end end updated = created + modified # # handle created # debug{ '** PROCESSING CREATED ENTRIES **' } created.each do |path| run_commands path, 'created' tuple = [path, npaths[path].tuple].flatten values = DB::q(tuple).join(',') statement = "insert into paths values(#{ values });\n" sql << statement end run_filters created, 'created' # # handle modified # debug{ '** PROCESSING MODIFIED ENTRIES **' } modified.each do |path| run_commands path, 'modified' tuple = [path, npaths[path].tuple].flatten values = DB::q(tuple).join(',') statement = "delete from paths where path='#{ path }';\n" sql << statement statement = "insert into paths values(#{ values });\n" sql << statement end run_filters modified, 'modified' # # handle updated # debug{ '** PROCESSING UPDATED ENTRIES **' } updated.each do |path| run_commands path, 'updated' tuple = [path, npaths[path].tuple].flatten values = DB::q(tuple).join(',') statement = "delete from paths where path='#{ path }';\n" sql << statement statement = "insert into paths values(#{ values });\n" sql << statement end run_filters updated, 'updated' # # handle deleted # debug{ '** PROCESSING DELETED ENTRIES **' } deleted.each do |path| run_commands path, 'deleted' statement = "delete from paths where path='#{ path }';\n" sql << statement end run_filters deleted, 'deleted' # # handle existing # debug{ '** PROCESSING EXISTING ENTRIES **' } existing.each do |path| run_commands path, 'existing' end run_filters existing, 'existing' rescue => e exception = e end # # commit updates # @db.execute sql unless sql.empty? end # # reap runners # threads = @runners.keys threads.map do |t| t.join cmd = @runners[t] value = t.value unless value error{ "the runner for <#{ cmd }> has exploded!"} end end # # finally, if an exception was thrown raise it here # if exception fatal{ exception } exit EXIT_FAILURE end #}}} end def list #{{{ # # setup dbdir # @dbdir = File::join @dir, '.dirwatch' raise "<#{ @dbdir }> does not exist" unless test ?e, @dbdir debug{ "dbdir <#{ @dbdir }>" } # # setup dbpath # @dbpath = File::join @dbdir, 'db' raise "<#{ @dbpath }> does not exists" unless test ?e, @dbpath debug{ "dbpath <#{ @dbpath }>" } # # setp db # @db = DB::new @dbpath, 'logger' => @logger # # list db # tuples = @db.ro_transaction{ @db.execute 'select * from paths' } puts '---' tuples.each do |tuple| puts " -" DB::FIELDS.each{|f| puts " #{ f }: #{ tuple.shift }"} end #}}} end def run_commands path, action #{{{ conf = @actions[action] return if conf.nil? return if conf.empty? commands = conf['commands'] return if commands.nil? return if commands.empty? debug{ "checking commands for <#{ path }>" } cmds = [] commands.each do |entry| command = entry['command'] debug{ "candidate command <#{ command }>" } pattern = entry['pattern'] if pattern begin pattern = Regexp::new pattern debug{ "pattern <#{ pattern.source }>" } if pattern.match(path) debug{ "match" } else debug{ "nomatch" } next end rescue warn{ "bad pattern <#{ pattern }>" } next end else debug{ "pattern <>" } end cmd = command.gsub %r/\$file|@file/o, path cmds << cmd end @tid ||= 0 cmds.each do |cmd| tid = Float "#{ $$ }.#{ @tid }" @tid += 1 thread = Thread::new(cmd, tid) do this = "ACTION.#{ action.upcase }.#{ tid }" info{ "#{ this } - cmd <#{ cmd }>" } env = { 'DIRWATCH_ACTION' => action, 'DIRWATCH_PATH' => path, 'DIRWATCH_PID' => $$, } status = spawn cmd, env info{ "#{ this } - exit_status <#{ status }>"} end @runners[thread] = cmd end #}}} end def run_filters paths, action #{{{ conf = @actions[action] return if conf.nil? return if conf.empty? filters = conf['filters'] return if filters.nil? return if filters.empty? cmds = [] filters.each do |entry| cmd = entry['command'] debug{ "candidate filter <#{ cmd }>" } fpaths = nil pattern = entry['pattern'] if pattern begin pattern = Regexp::new pattern debug{ "pattern <#{ pattern.source }>" } fpaths = paths.grep pattern debug{ "matches <#{ fpaths.size }>" } rescue => e warn{ "bad pattern <#{ pattern }>" } warn{ Util::emsg e } next end else debug{ "pattern <>" } fpaths = paths end next if fpaths.empty? or fpaths.nil? debug{"fpaths <#{ fpaths }>"} cmds << [cmd, fpaths] end @tid ||= 0 cmds.each do |cmd, fpaths| tid = Float "#{ $$ }.#{ @tid }" @tid += 1 thread = Thread::new(cmd, tid) do this = "ACTION.#{ action.upcase }.#{ tid }" tf = Tempfile::new tid fpaths.each{|fp| tf.puts fp} tf.close info{ "#{ this } - cmd <#{ cmd }> < #{ tf.path }" } env = { 'DIRWATCH_ACTION' => action, 'DIRWATCH_PID' => $$, } status = spawn "#{ cmd } < #{ tf.path }", env info{ "#{ this } - exit_status <#{ status }>"} tf.close! end @runners[thread] = cmd end #}}} end # # TODO - is ?$ threadsafe????? # def spawn cmd, env = {} #{{{ commands = [] commands << 'bash --login <<__bash_commands__' env.map{|k, v| commands << "export #{ k }='#{ v }'"} commands << cmd commands << '__bash_commands__' #debug{ "spawning <#{ command.inspect }>" } system commands.join("\n") $?.exitstatus #}}} end #}}} end # # run main program unless included as lib # Main::new if $0 == __FILE__ # # the default config is stored here # __END__ # # sample dirwatch config - identation and format are important - you almost # certainly NOT want to use this as your config # # # here are all possible actions enumerated # actions: # # we list only one command for created files which just prints the filename # created : commands : - command: echo "detected created file <@file>" # # we list two commands for modified files the first command simply prints the # file the second lists the file, but only if it is a .txt file # modified : commands : - command: echo "detected modified file <@file>" - command: ls -l @file pattern: ^.*\.txt$ # # we list only one command for updated files which just prints the filename # note that the updated action fires for BOTH created and modified files so, if # you use this action, you probably don't want the same action in either created # or modified sections. here we have an example of a filter, if a filter is # specifed the list of ALL updated (created, etc.) files will be given to that # command on STDIN. this is useful if the list of files will be very large, or # if some atomic action should be taken with all files at once. patterns can # also be applied to filters, as the below shows (though the pattern matches ALL # files ;-) ) # updated : commands : - command: echo "detected updated file <@file>" filters : - command : while read x;do echo "(filter) updated <\$x>";done pattern : ^.*$ # # here we see how a shell script might access the environment vars # DIRWATCH_ACTION and DIRWATCH_PATH - which will be set for all child processes # run. this is very useful for the 'delete' action since, of course, the file # will not exist anymore when the command is run. the @file substitution still # works as well # deleted : commands : - command: echo "action <$DIRWATCH_ACTION> path <$DIRWATCH_PATH>" # # ~ is shorthand for do nothing. you could do something with existing files, # such as remove ones older than some age in order to implement a tmpwatch kind # of functionality # existing : ~