# HG changeset patch # User Michael Granger # Date 1563829601 25200 # Mon Jul 22 14:06:41 2019 -0700 # Node ID f51c605123ecea444cc5fa391cb1224dcb883e09 # Parent 6ef7226dc8264487604e35173bb647dd34b9019b Add more work on the worker, Hg hook, etc. diff --git a/Gemfile b/Gemfile deleted file mode 100644 --- a/Gemfile +++ /dev/null @@ -1,2 +0,0 @@ -source "https://rubygems.org/" -gemspec diff --git a/Gemfile.devel b/Gemfile.devel deleted file mode 100644 --- a/Gemfile.devel +++ /dev/null @@ -1,7 +0,0 @@ -source "https://rubygems.org/" -gemspec - -gem 'hglib', path: '../hglib' -gem 'cztop-reactor', path: '../CZTop-Reactor' -gem 'pry' - diff --git a/Rakefile b/Rakefile --- a/Rakefile +++ b/Rakefile @@ -33,7 +33,7 @@ spec.dependency 'loggability', '~> 0.11' spec.dependency 'configurability', '~> 3.2' - spec.dependency 'hglib', '~> 0.1' + spec.dependency 'hglib', '~> 0.2' spec.dependency 'git', '~> 1.3' spec.dependency 'gli', '~> 2.18' spec.dependency 'tty-prompt', '~> 0.18' diff --git a/data/assemblage/hooks/mercurial-hook.rb b/data/assemblage/hooks/mercurial-hook.rb --- a/data/assemblage/hooks/mercurial-hook.rb +++ b/data/assemblage/hooks/mercurial-hook.rb @@ -1,85 +1,51 @@ -#!/usr/bin/env ruby +#!/usr/bin/env rvm 2.6.3@ci do ruby + +BEGIN { + $stderr.puts( RUBY_VERSION ) + $stderr.puts "Setting CWD to %p" % [ __dir__ ] + Dir.chdir( __dir__ ) +} require 'bundler/setup' require 'pathname' require 'hglib' -require 'assemblage' -require 'assemblage/cli' +require 'assemblage/client' BASEDIR = Pathname( __FILE__ ).dirname CONFIG = BASEDIR + 'config.yml' +EVENT_MAP = { + 'changegroup' => 'commit', + 'commit' => 'commit', + 'incoming' => 'commit', + 'txnclose-bookmark' => 'bookmark', + 'txnclose-phase' => 'phase', + 'pushkey' => 'pushkey', + 'tag' => 'tag' +} Assemblage.load_config( CONFIG ) +Loggability[ Assemblage ].level = :debug client = Assemblage::Client.new_by_name( 'server' ) hgenv = ENV. - filter {|k,v| k.starts_with?('HG_') }. - each_with_object( {} ) do |hash, (name, val)| + filter {|k,v| k.start_with?('HG_') }. + each_with_object( {} ) do |(name, val), hash| hash[ name.downcase.to_sym ] = val end -$stderr.puts "Publishing a %p event." % [ hgenv[:hg_hooktype] ] -case hgenv[ :hg_hooktype ] - -# Run after a changegroup has been added via push, pull or unbundle. The ID of -# the first new changeset is in "$HG_NODE" and last is in "$HG_NODE_LAST". The -# URL from which changes came is in "$HG_URL". -when 'changegroup' - client.send_message( :event, 'commit', hgenv[:hg_last_node], **hgenv ) +repo_name = ARGV.shift || File.basename( hgenv[:hg_url] ) -# Run after a changeset has been created in the local repository. The ID of -# the newly created changeset is in "$HG_NODE". Parent changeset IDs are in -# "$HG_PARENT1" and "$HG_PARENT2". -when 'commit' - client.send_message( :event, 'commit', hgenv[:hg_node], **hgenv ) - -# Run after a changeset has been pulled, pushed, or unbundled into the local -# repository. The ID of the newly arrived changeset is in "$HG_NODE". The URL -# that was source of the changes is in "$HG_URL". -when 'incoming' - client.send_message( :event, 'commit', hgenv[:hg_node], **hgenv ) +if (event_name = EVENT_MAP[ hgenv[:hg_hooktype] ]) + $stderr.puts "Publishing a %p event." % [ hgenv[:hg_hooktype] ] -# Run after any bookmark change has been committed. At this point, the -# transaction can no longer be rolled back. The hook will run after the lock is -# released. The name of the bookmark will be available in "$HG_BOOKMARK", the -# new bookmark location will be available in "$HG_NODE" while the previous -# location will be available in "$HG_OLDNODE". In case of a bookmark creation -# "$HG_OLDNODE" will be empty. In case of deletion "$HG_NODE" will be empty. In -# addition, the reason for the transaction opening will be in "$HG_TXNNAME", and -# a unique identifier for the transaction will be in "HG_TXNID". -when 'txnclose-bookmark' - client.send_message( :event, 'bookmark', hgenv[:hg_node], hgenv[:hg_bookmark], **hgenv ) - -# Run after any phase change has been committed. At this point, the transaction -# can no longer be rolled back. The hook will run after the lock is released. -# The affected node is available in "$HG_NODE", the phase in "$HG_PHASE" while -# the previous "$HG_OLDPHASE". In case of new node, "$HG_OLDPHASE" will be -# empty. In addition, the reason for the transaction opening will be in -# "$HG_TXNNAME", and a unique identifier for the transaction will be in -# "HG_TXNID". The hook is also run for newly added revisions. In this case the -# "$HG_OLDPHASE" entry will be empty. -when 'txnclose-phase' - client.send_message( :event, 'phase', hgenv[:hg_node], hgenv[:hg_oldphase], - hgenv[:hg_phase], **hgenv ) - -# Run after a pushkey (like a bookmark) is added to the repository. The key -# namespace is in "$HG_NAMESPACE", the key is in "$HG_KEY", the old value (if -# any) is in "$HG_OLD", and the new value is in "$HG_NEW".; -when 'pushkey' - client.send_message( :event, 'pushkey', hgenv[:hg_namespace], hgenv[:hg_key], - hgenv[:hg_new], **hgenv ) - -# Run after a tag is created. The ID of the tagged changeset is in "$HG_NODE". -# The name of tag is in "$HG_TAG". The tag is local if "$HG_LOCAL=1", or in the -# repository if "$HG_LOCAL=0". -when 'tag' - if hgenv[:hg_local] == '0' - client.send_message( :event, 'tag', hgenv[:hg_node], hgenv[:hg_tag], **hgenv ) + client.send_message( :event, [event_name, repo_name, hgenv] ) + client.start do |type, data, **header| + $stderr.puts "%p response: %p" % [ type, data ] + if type == :disconnect || type == :error + client.stop + end end +else + $stderr.puts "No event mapping for the %s hook." % [ hgenv[:hg_hooktype] ] end -client.start do |type, data, **header| - $stderr.puts "%s response: %p" % [ type, data ] -end - - diff --git a/lib/assemblage/assembly_builder.rb b/lib/assemblage/assembly_builder.rb --- a/lib/assemblage/assembly_builder.rb +++ b/lib/assemblage/assembly_builder.rb @@ -106,10 +106,9 @@ ### Builder stage: Clone the assembly's repo. def clone_repo( result ) - repo_url = self.repository[ :url ] or raise "No repository URL for this assembly?!" + repo_url = self.repository[ :url ] or raise "No repository URL for this assembly?!" repo_type = self.repository[ :type ] or raise "No repository type set for this assembly." - repo_name = self.repository[ :connection_name ] || self.name - work_dir = Dir.mktmpdir( [self.name, "assemblies"], self.class.work_dir ) + work_dir = Dir.mktmpdir( [self.name, "assemblies"], self.class.work_dir ) vcs = Assemblage::VCSStrategy.get_subclass( repo_type ) vcs.clone( repo_url, work_dir ) diff --git a/lib/assemblage/auth.rb b/lib/assemblage/auth.rb --- a/lib/assemblage/auth.rb +++ b/lib/assemblage/auth.rb @@ -52,16 +52,6 @@ ## # :singleton-method: - # Configurable: If +true+, allow new clients to register themselves by - # connecting with a valid CURVE certificate. If this is set to +false+, - # only connections which present one of the certs from #cert_store_dir are - # allowed. - setting :allow_registration, default: true do |value| - value ? true : false - end - - ## - # :singleton-method: # Configurable: an array of allowed IP addresses. If this is not set, any connection # which presents valid CURVE authentication credentials is allowed. setting :allowed_ips, default: [] do |value| @@ -261,18 +251,21 @@ singleton_method_alias :connection_type_for_cert, :connection_type_for - ### Return a configured CZTop::CertStore pointing to the configured #data_dir - def self::authenticator + ### Return a configured CZTop::CertStore pointing to the configured #data_dir. If + ### +allow_registration+ is +true+, the authenticator will be configured to allow + ### connections with unregistered certs instead of restricting them to ones that + ### have already been registered. + def self::authenticator( allow_registration: false ) return @authenticator ||= begin self.log.info "Creating CURVE authenticator." auth = CZTop::Authenticator.new - if (certs_dir = self.remote_certs_path) && ! self.allow_registration + if (certs_dir = self.remote_certs_path) && ! allow_registration self.log.info "Using remote certs dir %s for auth." % [ certs_dir ] certs_dir.mkpath auth.curve( certs_dir.to_s ) else - self.log.warn "Using ALLOW_ANY remote curve auth." + self.log.warn "Using ALLOW_ANY remote curve auth (registration mode)." auth.curve end diff --git a/lib/assemblage/client.rb b/lib/assemblage/client.rb --- a/lib/assemblage/client.rb +++ b/lib/assemblage/client.rb @@ -237,8 +237,7 @@ ### Send a message to the server. - def send_message( verb, *data, **header ) - data.flatten!( 1 ) + def send_message( verb, data, **header ) self.input_queue << Assemblage::Protocol.encode( verb.to_sym, data, **header ) self.register_for_writing if self.reactor.registered?( self.socket ) end diff --git a/lib/assemblage/command/client.rb b/lib/assemblage/command/client.rb --- a/lib/assemblage/command/client.rb +++ b/lib/assemblage/command/client.rb @@ -37,7 +37,7 @@ client.handle_error_message( data, **header ) client.stop end - client.send_message( :register_worker, name, cert.public_key ) + client.send_message( :register_worker, [name, cert.public_key] ) client.start end end diff --git a/lib/assemblage/command/server.rb b/lib/assemblage/command/server.rb --- a/lib/assemblage/command/server.rb +++ b/lib/assemblage/command/server.rb @@ -200,8 +200,13 @@ END_DESC server.arg :DIRECTORY, :optional server.command :start do |start| + start.switch [:r, :registration], + desc: "Run in registration mode.", + long_desc: "Run in a mode that allows new clients to be registered instead " \ + "of normal authentication." + start.action do |globals, options, args| - Assemblage::Server.run( args.shift ) + Assemblage::Server.run( args.shift, register_mode: options[:r] ) end end diff --git a/lib/assemblage/command/worker.rb b/lib/assemblage/command/worker.rb --- a/lib/assemblage/command/worker.rb +++ b/lib/assemblage/command/worker.rb @@ -46,9 +46,9 @@ create.flag [:N, :name], type: String, must_match: Assemblage::Auth::CLIENT_NAME_PATTERN - create.desc "Specify one or more tags that indicate what assemblies " + + create.desc "Specify one or more traits that indicate what assemblies " + "the worker should accept" - create.flag [:t, :tags], type: Array + create.flag [:t, :traits], type: Array create.action do |globals, options, args| directory, server_url, server_key = *args @@ -67,10 +67,10 @@ server_url = URI( server_url ) name = options.name || prompt_for_name( directory ) - tags = options.tags || prompt_for_tags() + traits = options.traits || prompt_for_tags() prompt.say "Creating a worker run directory in %s..." % [ directory ] - Assemblage::Worker.setup_run_directory( directory, name, tags ) + Assemblage::Worker.setup_run_directory( directory, name, traits ) prompt.say "Generating a cert..." Assemblage::Worker.generate_cert( name ) or @@ -162,7 +162,7 @@ end - ### Ask the user for one or more tags for a new worker, returning them as an + ### Ask the user for one or more traits for a new worker, returning them as an ### Array of Strings. def prompt_for_tags prompt.say( "Tags (one per line):" ) @@ -173,13 +173,13 @@ modify: %i[remove] } - tags = [] + traits = [] while input = prompt.ask( '>>', options ) break if input == '' - tags << input + traits << input end - return tags + return traits end end # module Assemblage::CLI::AddServer diff --git a/lib/assemblage/protocol.rb b/lib/assemblage/protocol.rb --- a/lib/assemblage/protocol.rb +++ b/lib/assemblage/protocol.rb @@ -60,6 +60,7 @@ ### Encode a message of the specified +type+ and return it as a CZTop::Message. def encode( command, data, **header ) + self.log.debug "Encode %p message with data: %p, header: %p" % [ command, data, header ] header = DEFAULT_HEADER.merge( symbolify_keys(header) ) Assemblage::Protocol.check_message_header( header ) diff --git a/lib/assemblage/publisher.rb b/lib/assemblage/publisher.rb --- a/lib/assemblage/publisher.rb +++ b/lib/assemblage/publisher.rb @@ -99,7 +99,7 @@ client.handle_error_message( data, **header ) client.stop end - client.send_message( :register_publisher, name, cert.public_key ) + client.send_message( :register_publisher, [name, cert.public_key] ) client.start end diff --git a/lib/assemblage/server.rb b/lib/assemblage/server.rb --- a/lib/assemblage/server.rb +++ b/lib/assemblage/server.rb @@ -20,7 +20,7 @@ # # This gathers events from repositories and dispatches them to workers via one # or more "assemblies". An assembly is the combination of a repository and one -# or more tags that describe pre-requisites for building a particular product. +# or more traits that describe pre-requisites for building a particular product. class Assemblage::Server extend Loggability, Configurability, @@ -182,16 +182,18 @@ # ### Create a new Assemblage::Server. - def initialize( endpoint: nil ) + def initialize( endpoint: nil, register_mode: false ) @endpoint = endpoint || Assemblage::Server.endpoint + @register_mode = register_mode @reactor = CZTop::Reactor.new - @authenticator = Assemblage::Auth.authenticator + @authenticator = Assemblage::Auth.authenticator( allow_registration: @register_mode ) @socket = nil @connections = {} @output_queue = [] @start_time = nil + @worker_status = {} end @@ -204,6 +206,10 @@ attr_reader :endpoint ## + # Whether or not the server is running in registration mode + attr_predicate :register_mode + + ## # The CZTop::Reactor that handles asynchronous IO, timed events, and signals. attr_reader :reactor @@ -228,6 +234,10 @@ # The time the server started up. attr_reader :start_time + ## + # Last status reports of connected workers as a Hash keyed by worker name + attr_reader :worker_status + ### Run the server. def run @@ -324,12 +334,14 @@ ### Handle a connection sending a status report. def handle_status_report_command( connection, report, * ) self.log.debug "Connection %s sent a status report: %p" % [ connection, report ] + self.worker_status[ connection.name ] = report end ### Handle a connection asking for any assemblies pending for it. def handle_fetch_assemblies_command( connection, criteria ) self.log.debug "Connection %s fetched assemblies: %p" % [ connection, criteria ] + msg = connection.response( :) end diff --git a/lib/assemblage/worker.rb b/lib/assemblage/worker.rb --- a/lib/assemblage/worker.rb +++ b/lib/assemblage/worker.rb @@ -4,6 +4,7 @@ require 'fiber' require 'state_machines' require 'socket' +require 'time' require 'cztop/reactor' require 'cztop/reactor/signal_handling' require 'cztop/reactor/socket_monitoring' @@ -31,8 +32,8 @@ # A Regexp for matching valid worker names VALID_NAME = /\A[a-z](\w+)(-\w+)*\z/ - # A Regexp for matching valid tags - VALID_TAG = /\A\p{Alnum}+(-\p{Alnum}+)*:\s*\p{Print}+\z/ + # A Regexp for matching valid traits + VALID_TRAIT = /\A\p{Alnum}+(-\p{Alnum}+)*:\s*\p{Print}+\z/ # The list of valid actions for a worker HANDLED_MESSAGE_TYPES = %i[hello goodbye new_assembly] @@ -40,6 +41,9 @@ # The time between checks for new assemblies to work on ASSEMBLY_TIMER_INTERVAL = 5 # seconds + # The number of seconds between status reports sent to the server + STATUS_REPORT_INTERVAL = 10 + # Loggability API -- log to the Assemblage logger. log_to :assemblage @@ -64,10 +68,10 @@ ## - # The tags the worker uses to advertise the capabilities of its local + # The traits the worker uses to advertise the capabilities of its local # environment. - setting :tags, default: [] do |tags| - Array( tags ) + setting :traits, default: [] do |traits| + Array( traits ) end end @@ -134,11 +138,11 @@ ### Set up the Worker's directory as a worker run directory. Raises an ### exception if the directory already exists and is not empty. - def self::setup_run_directory( directory='.', name=DEFAULT_NAME, tags=[] ) + def self::setup_run_directory( directory='.', name=DEFAULT_NAME, traits=[] ) directory = Pathname( directory || '.' ) raise "Directory not empty" if directory.exist? && !directory.empty? - tags.each do |tag| - raise "Invalid tag %p" % [ tag ] unless tag.match( VALID_TAG ) + traits.each do |trait| + raise "Invalid trait %p" % [ trait ] unless trait.match( VALID_TRAIT ) end self.log.debug "Attempting to set up %s as a run directory." % [ directory ] @@ -148,7 +152,7 @@ config = Assemblage.config || Configurability.default_config config.assemblage.auth.cert_store_dir ||= (directory + 'certs').to_s config.assemblage.worker.name = name - config.assemblage.worker.tags = tags + config.assemblage.worker.traits = traits Loggability.with_level( :fatal ) do config.install @@ -188,7 +192,7 @@ client.handle_error_message( data, **header ) client.stop end - client.send_message( :register_worker, name, cert.public_key ) + client.send_message( :register_worker, [name, cert.public_key] ) client.start end @@ -205,9 +209,9 @@ # ### Create a nwe Assemblage::Worker. - def initialize( name: nil, server: nil, tags: nil ) + def initialize( name: nil, server: nil, traits: nil ) @name = name || Assemblage::Worker.name or raise "No worker name specified." - @tags = Array( tags || Assemblage::Worker.tags ) + @traits = Array( traits || Assemblage::Worker.traits ) @reactor = CZTop::Reactor.new @start_time = nil @@ -263,12 +267,18 @@ endpoint = server_cert['endpoint'] or raise "Server cert doesn't have an endpoint set." - @client = Assemblage::Client.new( url.to_s, cert, server_cert ) + @client = Assemblage::Client.new( endpoint.to_s, local_cert, server_cert ) @client.on_message( :error ) do |data, **header| client.handle_error_message( data, **header ) client.stop end + @client.on_message( :new_assembly ) do |data, **header| + self.on_new_assembly_message( *data ) + end + + self.start_assembly_timer + self.start_status_report_timer @start_time = Time.now @client.start @@ -287,6 +297,7 @@ def stop self.log.info "Stopping the assembly worker." self.stop_status_report_timer + self.stop_assembly_timer self.client.stop end @@ -303,9 +314,9 @@ end - ### Return a Hash of the criteria the worker should use when - ### requesting/subscribing to jobs from the server. - def job_criteria + ### Return a Hash of the traits the worker should use when fetching assemblies + ### from the server. + def job_traits platform = Gem::Platform.local return { os: platform.os, @@ -319,8 +330,8 @@ ### Populate the worker's assembly builders with any assemblies that were queued ### for workers matching this one. def fetch_pending_assemblies - criteria = self.job_criteria - self.send_message( :fetch_assemblies, criteria ) + traits = self.job_traits + self.send_message( :fetch_assemblies, traits ) end @@ -387,11 +398,12 @@ ### Queue a status report message for the worker. - def send_status_report + def send_status_report( * ) report = { version: Assemblage::VERSION, status: self.status, - uptime: self.uptime + uptime: self.uptime, + time: Time.now.rfc2822 } self.client.send_message( :status_report, report ) @@ -400,7 +412,7 @@ ### Queue a result message for the worker. def send_result( assembly_id, result ) - self.client.send_message( :post_result, assembly_id, result ) + self.client.send_message( :post_result, [assembly_id, result] ) end