require 'drb/drb'
require 'fileutils'
require 'tmpdir'
require 'tempfile'
require 'fcntl'

#
# the Slave class encapsulates the work of setting up a drb server in another
# process running on localhost.  the slave process is attached to its parent
# via a Heartbeat which is designed such that the slave cannot out-live its
# parent and become a zombie, even if the parent dies an early death, such as
# by 'kill -9'.  the concept and purpose of the Slave class is to be able to
# setup any server object in another process so easily that using a
# multi-process, drb/ipc, based design is as easy, or easier, than a
# multi-threaded one.  eg
#
#   class Server
#     def add_two n
#       n + 2
#     end
#   end
#  
#   slave = Slave.new Server.new
#   server = slave.object
#  
#   p server.add_two(40) #=> 42
#
  class Slave
#--{{{
    # version of this library
    VERSION = '0.0.1'
  #
  # config - every default below can be overridden through the environment
  #
    # number of candidate socket paths tried by a child before it gives up
    # (SLAVE_SOCKET_CREATION_ATTEMPTS)
    DEFAULT_SOCKET_CREATION_ATTEMPTS =
      Integer(ENV.fetch('SLAVE_SOCKET_CREATION_ATTEMPTS', 42))

    # pulse rate handed to the Heartbeat, always a Float (SLAVE_PULSE_RATE)
    DEFAULT_PULSE_RATE =
      Float(ENV.fetch('SLAVE_PULSE_RATE', 8))

    # true as soon as SLAVE_DEBUG is present in the environment, whatever its
    # value
    DEFAULT_DEBUG =
      ENV.key?('SLAVE_DEBUG')

    # class-level settings start out at the defaults above
    @socket_creation_attempts, @pulse_rate, @debug =
      DEFAULT_SOCKET_CREATION_ATTEMPTS, DEFAULT_PULSE_RATE, DEFAULT_DEBUG
  #
  # class methods
  #
    class << self
#--{{{
      # defines how many attempts will be made to create a temporary unix
      # domain socket
      attr_accessor :socket_creation_attempts

      # defines the pulse rate handed to the Heartbeat object
      attr_accessor :pulse_rate

      # if this is true and you are running from a terminal, information is
      # printed on STDERR
      attr_accessor :debug

      # look up a value in an option hash, falling back to class defaults.
      #
      # 'key' is looked up in 'opts' as given, as a String and as a Symbol;
      # the first form present wins, even when its value is nil or false.
      # when 'opts' holds none of them the public class-level reader named
      # 'key' is consulted.  returns nil when no such reader exists.
      def getval key, opts = {}
#--{{{
        [key, key.to_s, key.to_s.intern].each do |k|
          return opts[k] if opts.has_key?(k)
        end
      # respond_to? only sees public methods, so a key such as 'exit' or
      # 'fork' can no longer reach a private Kernel method through send
        reader = key.to_s
        respond_to?(reader) ? send(reader) : nil
#--}}}
      end
      # just fork without silly warnings: $VERBOSE is switched off around
      # Process::fork and restored afterwards.  returns whatever Process::fork
      # returns.
      def fork &block
#--{{{
        verbose = $VERBOSE
        begin
          $VERBOSE = nil
          Process::fork(&block)
        ensure
          $VERBOSE = verbose
        end
#--}}}
      end
#--}}}
    end

    # read-only views of the state assigned in initialize
    attr_reader :object,      # DRbObject built in the parent from the slave uri
                :obj,         # the object handed to initialize
                :psname,      # process name given to the child
                :pid,         # pid of the child process
                :ppid,        # parent pid, only assigned inside the child
                :uri,         # drbunix uri of the slave's socket
                :pulse_rate,  # pulse rate handed to the Heartbeat
                :socket,      # path of the unix domain socket
                :debug,       # whether trace output is enabled
                :status       # child exit status once collected, nil before

  #
  # forks a child process that serves 'obj' over a drbunix socket and, in the
  # parent, builds a DRbObject proxy for it (available as #object).
  #
  # 'obj' can be any object and 'opts' may contain the keys
  # 'socket_creation_attempts', 'pulse_rate', 'psname', or 'debug'.  when a
  # block is given it is called in the child with 'obj' once the drb service
  # is running.
  #
  # raises a RuntimeError in the parent when the child did not report a socket
  # that exists on disk.
  #
    def initialize obj = nil, opts = {}, &block
#--{{{
      @obj = obj

      @socket_creation_attempts = getval('socket_creation_attempts', opts)
      @pulse_rate = getval('pulse_rate', opts)
      @debug = getval('debug', opts)
      @psname = getval('psname', opts) || gen_psname(@obj)

      trace{ "socket_creation_attempts <#{ @socket_creation_attempts }>" }
      trace{ "pulse_rate <#{ @pulse_rate }>" }
      trace{ "psname <#{ @psname }>" }

      @shutdown = false
      @waiter = @status = nil

    # both the heartbeat and the pipe are created before forking so that each
    # process ends up holding its own end of them
      @heartbeat = Heartbeat::new @pulse_rate, @debug
      @r, @w = IO::pipe
    #
    # child
    #
      unless((@pid = Slave::fork))
        e = nil
        begin
          $0 = @psname 
          @pid = Process::pid
          @ppid = Process::ppid

          @r.close
          @socket = nil
          @uri = nil

          tmpdir = Dir::tmpdir
          basename = File::basename @psname

        # try numbered socket paths until one of them is free
          @socket_creation_attempts.times do |attempt|
            begin
              s = File::join(tmpdir, "#{ basename }_#{ attempt }")
              u = "drbunix://#{ s }"
              DRb::start_service u, obj 
              @socket = s
              @uri = u
              trace{ "child - socket <#{ @socket }>" }
              trace{ "child - uri <#{ @uri }>" }
              break
            rescue Errno::EADDRINUSE
              nil
            end
          end

          if @socket and @uri
            @heartbeat.start
          # the socket path is the only thing reported back to the parent
            @w.write @socket 
            @w.close
          # SIGUSR2 is what Slave#shutdown sends: stop serving, clean up, die
            trap('SIGUSR2') do
              # @heartbeat.stop rescue nil
              DRb::thread.kill rescue nil
              FileUtils::rm_f @socket rescue nil
              exit!
            end
            block[obj] if block
            DRb::thread.join
          else
          # no socket could be created: closing the pipe without writing
          # leaves the parent with an empty socket name
            @w.close
          end
      # Exception rather than StandardError so that the status of a SystemExit
      # can be passed on below
        rescue Exception => e
          trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
        ensure
        # the child never returns from initialize.  note that a child which
        # got here without an exception also exits with status 1.
          status = e.respond_to?('status') ? e.status : 1
          exit!(status)
        end
    #
    # parent 
    #
      else
        #Process::detach @pid
        detach
        @w.close
      # blocks until the child has written the socket path or closed its end
        @socket = @r.read
        @r.close

        trace{ "parent - socket <#{ @socket }>" }

        if @socket and File::exist? @socket
          at_exit{ FileUtils::rm_f @socket }
          @uri = "drbunix://#{ socket }"
          trace{ "parent - uri <#{ @uri }>" }
          @heartbeat.start
        #
        # starting drb on localhost avoids dns lookups!
        #
          DRb::start_service('druby://localhost:0', nil) unless DRb::thread
          @object = DRbObject::new nil, @uri
        else
          raise "failed to find slave socket <#{ @socket }>"
        end
      end
#--}}}
    end
  #
  # starts a thread that waits for the child and stores its exit status in
  # @status.  the thread is kept in @waiter and is also the return value.
  #
    def detach
#--{{{
      @waiter = Thread.new do
        _child, exit_status = Process::waitpid2(@pid)
        @status = exit_status
      end
#--}}}
    end
  #
  # wait for the slave to finish: blocks on the thread started by detach and
  # returns that thread's value.  wait2 is an alias for the same method.
  #
    def wait
#--{{{
      reaper = @waiter
      reaper.value
#--}}}
    end
    alias wait2 wait
  #
  # stops the heartbeat thread and kills the child process: the child is sent
  # SIGUSR2 and then SIGTERM, and the socket file is removed.  failures while
  # stopping the heartbeat or signalling are ignored.  returns true.
  #
  # raises RuntimeError ("already shutdown") when called a second time.
  #
    def shutdown
#--{{{
      raise "already shutdown" if @shutdown

      begin
        @heartbeat.stop
      rescue
        nil
      end

      %w[SIGUSR2 SIGTERM].each do |signal|
        begin
          Process::kill(signal, @pid)
        rescue
          nil
        end
      end

      FileUtils::rm_f @socket
      @shutdown = true
#--}}}
    end
  #
  # generate a default name to appear in ps/top: the class name of 'obj'
  # followed by '_slave_of_' and the pid of the calling process, downcased,
  # with every run of whitespace replaced by a single underscore
  #
    def gen_psname obj
#--{{{
    # \s+ rather than \s*: the latter also matches the empty string at every
    # position and would put an underscore between all characters
      "#{ obj.class }_slave_of_#{ Process::pid }".downcase.gsub(%r/\s+/, '_')
#--}}}
    end
  #
  # see docs for Slave.getval - 'opts' is handed on so that per-instance
  # options take precedence over the class-level defaults
  #
    def getval key, opts = {}
#--{{{
      self.class.getval key, opts
#--}}}
    end
  #
  # debugging output - ENV['SLAVE_DEBUG']=1 to enable.  the block is only
  # evaluated, and its result written to STDERR, when @debug is set and
  # STDERR is a terminal.  always returns nil.
  #
    def trace
#--{{{
      return unless @debug
      return unless STDERR.tty?
      STDERR.puts(yield)
#--}}}
    end

  #
  # the Heartbeat class is essentially a wrapper over an IPC channel (a pipe
  # created before the fork) that ties the life of the child to the life of
  # its parent.  the parent keeps the write end of the pipe and the child
  # blocks reading the other end; as soon as that read returns - the parent
  # stopped the heartbeat or its end of the pipe went away - the child exits.
  # in this way it is ensured that Slave objects cannot continue to live
  # without their parent being alive.
  #
    class Heartbeat
#--{{{
    # must be called before forking.  'pulse_rate' is coerced with Float and
    # stored (the pipe based implementation does not otherwise use it),
    # 'debug' enables trace output.  the pid recorded here is what lets start
    # tell the creating process from a forked child.
      def initialize pulse_rate = 4.2, debug = false
#--{{{
        @pulse_rate = Float pulse_rate
        @debug = debug
        @r, @w = IO::pipe
        @pid = Process::pid
        @ppid = nil
        @thread = nil
        @whoami = nil
        @beating = nil
        @pipe = nil
#--}}}
      end
    # picks this process' end of the pipe, closes the other one and starts
    # the matching thread.  returns true.
      def start 
#--{{{
        if Process::pid == @pid
        # still the process that created the heartbeat: keep the write end
          @r.close
          @pipe = @w
          @pipe.fcntl Fcntl::F_SETFD, Fcntl::FD_CLOEXEC
          parent_start
        else
        # forked child: keep the read end
          @w.close
          @pipe = @r
          child_start
        end
        @beating = true 
#--}}}
      end
    # parent side: a thread that just sleeps; the pipe is closed when the
    # thread ends
      def parent_start
#--{{{
        @whoami = 'parent' 
        @thread =
          Thread::new(Thread::current) do |cur|
            begin
              sleep
            rescue => e
              cur.raise e
            ensure
              @pipe.close rescue nil
            end
          end
#--}}}
      end
    # child side: a thread that blocks reading the pipe and terminates the
    # whole process with exit! once the read returns
      def child_start
#--{{{
        @whoami = 'child' 
        @pid = Process::pid
        @ppid = Process::ppid
        @thread =
          Thread::new(Thread::current) do |cur|
            begin
              trace{ "child reading..." }
              @pipe.read
              trace{ "child read." }
              trace{ "child exiting." }
              exit!
            rescue => e
              cur.raise e
            ensure
              @pipe.close rescue nil
            end
          end
#--}}}
      end
    # kills the heartbeat thread and closes this process' end of the pipe.
    # returns false.  raises RuntimeError ("not beating") unless start has
    # been called.
      def stop 
#--{{{
        raise "not beating" unless @beating
        @thread.kill
        @pipe.close rescue nil
        @beating = false
#--}}}
      end
    # debugging output: the block's result is written to STDERR when debug
    # is set and STDERR is a terminal
      def trace
#--{{{
        STDERR.puts(yield) if @debug and STDERR.tty?
#--}}}
      end
#--}}}
    end # class Heartbeat
#--}}}
  end # class Slave 
