Add more work on the worker, Hg hook, etc.
R Gemfile =>  +0 -2
@@ 1,2 0,0 @@ 
-source "https://rubygems.org/"
-gemspec

          
R Gemfile.devel =>  +0 -7
@@ 1,7 0,0 @@ 
-source "https://rubygems.org/"
-gemspec
-
-gem 'hglib', path: '../hglib'
-gem 'cztop-reactor', path: '../CZTop-Reactor'
-gem 'pry'
-

          
M Rakefile +1 -1
@@ 33,7 33,7 @@ hoespec = Hoe.spec 'assemblage' do |spec
 
 	spec.dependency 'loggability', '~> 0.11'
 	spec.dependency 'configurability', '~> 3.2'
-	spec.dependency 'hglib', '~> 0.1'
+	spec.dependency 'hglib', '~> 0.2'
 	spec.dependency 'git', '~> 1.3'
 	spec.dependency 'gli', '~> 2.18'
 	spec.dependency 'tty-prompt', '~> 0.18'

          
M data/assemblage/hooks/mercurial-hook.rb +31 -65
@@ 1,85 1,51 @@ 
-#!/usr/bin/env ruby
+#!/usr/bin/env rvm 2.6.3@ci do ruby
+
+BEGIN {
+	$stderr.puts( RUBY_VERSION )
+	$stderr.puts "Setting CWD to %p" % [ __dir__ ]
+	Dir.chdir( __dir__ )
+}
 
 require 'bundler/setup'
 require 'pathname'
 require 'hglib'
-require 'assemblage'
-require 'assemblage/cli'
+require 'assemblage/client'
 
 BASEDIR = Pathname( __FILE__ ).dirname
 CONFIG = BASEDIR + 'config.yml'
+EVENT_MAP = {
+	'changegroup'       => 'commit',
+	'commit'            => 'commit',
+	'incoming'          => 'commit',
+	'txnclose-bookmark' => 'bookmark',
+	'txnclose-phase'    => 'phase',
+	'pushkey'           => 'pushkey',
+	'tag'               => 'tag'
+}
 
 Assemblage.load_config( CONFIG )
+Loggability[ Assemblage ].level = :debug
 
 client = Assemblage::Client.new_by_name( 'server' )
 hgenv = ENV.
-	filter {|k,v| k.starts_with?('HG_') }.
-	each_with_object( {} ) do |hash, (name, val)|
+	filter {|k,v| k.start_with?('HG_') }.
+	each_with_object( {} ) do |(name, val), hash|
 		hash[ name.downcase.to_sym ] = val
 	end
 
-$stderr.puts "Publishing a %p event." % [ hgenv[:hg_hooktype] ]
-case hgenv[ :hg_hooktype ]
-
-# Run after a changegroup has been added via push, pull or unbundle. The ID of
-# the first new changeset is in "$HG_NODE" and last is in "$HG_NODE_LAST". The
-# URL from which changes came is in "$HG_URL".
-when 'changegroup'
-	client.send_message( :event, 'commit', hgenv[:hg_last_node], **hgenv )
+repo_name = ARGV.shift || File.basename( hgenv[:hg_url] )
 
-# Run after a changeset has been created in the local repository. The ID of
-# the newly created changeset is in "$HG_NODE". Parent changeset IDs are in
-# "$HG_PARENT1" and "$HG_PARENT2".
-when 'commit'
-	client.send_message( :event, 'commit', hgenv[:hg_node], **hgenv )
-
-# Run after a changeset has been pulled, pushed, or unbundled into the local
-# repository. The ID of the newly arrived changeset is in "$HG_NODE". The URL
-# that was source of the changes is in "$HG_URL".
-when 'incoming'
-	client.send_message( :event, 'commit', hgenv[:hg_node], **hgenv )
+if (event_name = EVENT_MAP[ hgenv[:hg_hooktype] ])
+	$stderr.puts "Publishing a %p event." % [ hgenv[:hg_hooktype] ]
 
-# Run after any bookmark change has been committed. At this point, the
-# transaction can no longer be rolled back. The hook will run after the lock is
-# released. The name of the bookmark will be available in "$HG_BOOKMARK", the
-# new bookmark location will be available in "$HG_NODE" while the previous
-# location will be available in "$HG_OLDNODE". In case of a bookmark creation
-# "$HG_OLDNODE" will be empty. In case of deletion "$HG_NODE" will be empty. In
-# addition, the reason for the transaction opening will be in "$HG_TXNNAME", and
-# a unique identifier for the transaction will be in "HG_TXNID".
-when 'txnclose-bookmark'
-	client.send_message( :event, 'bookmark', hgenv[:hg_node], hgenv[:hg_bookmark], **hgenv )
-
-# Run after any phase change has been committed. At this point, the transaction
-# can no longer be rolled back. The hook will run after the lock is released.
-# The affected node is available in "$HG_NODE", the phase in "$HG_PHASE" while
-# the previous "$HG_OLDPHASE". In case of new node, "$HG_OLDPHASE" will be
-# empty. In addition, the reason for the transaction opening will be in
-# "$HG_TXNNAME", and a unique identifier for the transaction will be in
-# "HG_TXNID". The hook is also run for newly added revisions. In this case the
-# "$HG_OLDPHASE" entry will be empty.
-when 'txnclose-phase'
-	client.send_message( :event, 'phase', hgenv[:hg_node], hgenv[:hg_oldphase],
-		hgenv[:hg_phase], **hgenv )
-
-# Run after a pushkey (like a bookmark) is added to the repository. The key
-# namespace is in "$HG_NAMESPACE", the key is in "$HG_KEY", the old value (if
-# any) is in "$HG_OLD", and the new value is in "$HG_NEW".;
-when 'pushkey'
-	client.send_message( :event, 'pushkey', hgenv[:hg_namespace], hgenv[:hg_key],
-		hgenv[:hg_new], **hgenv )
-
-# Run after a tag is created. The ID of the tagged changeset is in "$HG_NODE".
-# The name of tag is in "$HG_TAG". The tag is local if "$HG_LOCAL=1", or in the
-# repository if "$HG_LOCAL=0".
-when 'tag'
-	if hgenv[:hg_local] == '0'
-		client.send_message( :event, 'tag', hgenv[:hg_node], hgenv[:hg_tag], **hgenv )
+	client.send_message( :event, [event_name, repo_name, hgenv] )
+	client.start do |type, data, **header|
+		$stderr.puts "%p response: %p" % [ type, data ]
+		if type == :disconnect || type == :error
+			client.stop
+		end
 	end
+else
+	$stderr.puts "No event mapping for the %s hook." % [ hgenv[:hg_hooktype] ]
 end
 
-client.start do |type, data, **header|
-	$stderr.puts "%s response: %p" % [ type, data ]
-end
-
-

          
M lib/assemblage/assembly_builder.rb +2 -3
@@ 106,10 106,9 @@ class Assemblage::AssemblyBuilder
 
 	### Builder stage: Clone the assembly's repo.
 	def clone_repo( result )
-		repo_url = self.repository[ :url ] or raise "No repository URL for this assembly?!"
+		repo_url  = self.repository[ :url ] or raise "No repository URL for this assembly?!"
 		repo_type = self.repository[ :type ] or raise "No repository type set for this assembly."
-		repo_name = self.repository[ :connection_name ] || self.name
-		work_dir = Dir.mktmpdir( [self.name, "assemblies"], self.class.work_dir )
+		work_dir  = Dir.mktmpdir( [self.name, "assemblies"], self.class.work_dir )
 
 		vcs = Assemblage::VCSStrategy.get_subclass( repo_type )
 		vcs.clone( repo_url, work_dir )

          
M lib/assemblage/auth.rb +7 -14
@@ 52,16 52,6 @@ module Assemblage::Auth
 
 		##
 		# :singleton-method:
-		# Configurable: If +true+, allow new clients to register themselves by
-		# connecting with a valid CURVE certificate. If this is set to +false+,
-		# only connections which present one of the certs from #cert_store_dir are
-		# allowed.
-		setting :allow_registration, default: true do |value|
-			value ? true : false
-		end
-
-		##
-		# :singleton-method:
 		# Configurable: an array of allowed IP addresses. If this is not set, any connection
 		# which presents valid CURVE authentication credentials is allowed.
 		setting :allowed_ips, default: [] do |value|

          
@@ 261,18 251,21 @@ module Assemblage::Auth
 	singleton_method_alias :connection_type_for_cert, :connection_type_for
 
 
-	### Return a configured CZTop::CertStore pointing to the configured #data_dir
-	def self::authenticator
+	### Return a configured CZTop::CertStore pointing to the configured #data_dir. If
+	### +allow_registration+ is +true+, the authenticator will be configured to allow
+	### connections with unregistered certs instead of restricting them to ones that
+	### have already been registered.
+	def self::authenticator( allow_registration: false )
 		return @authenticator ||= begin
 			self.log.info "Creating CURVE authenticator."
 			auth = CZTop::Authenticator.new
 
-			if (certs_dir = self.remote_certs_path) && ! self.allow_registration
+			if (certs_dir = self.remote_certs_path) && ! allow_registration
 				self.log.info "Using remote certs dir %s for auth." % [ certs_dir ]
 				certs_dir.mkpath
 				auth.curve( certs_dir.to_s )
 			else
-				self.log.warn "Using ALLOW_ANY remote curve auth."
+				self.log.warn "Using ALLOW_ANY remote curve auth (registration mode)."
 				auth.curve
 			end
 

          
M lib/assemblage/client.rb +1 -2
@@ 237,8 237,7 @@ class Assemblage::Client
 
 
 	### Send a message to the server.
-	def send_message( verb, *data, **header )
-		data.flatten!( 1 )
+	def send_message( verb, data, **header )
 		self.input_queue << Assemblage::Protocol.encode( verb.to_sym, data, **header )
 		self.register_for_writing if self.reactor.registered?( self.socket )
 	end

          
M lib/assemblage/command/client.rb +1 -1
@@ 37,7 37,7 @@ module Assemblage::CLI::ClientCommand
 				client.handle_error_message( data, **header )
 				client.stop
 			end
-			client.send_message( :register_worker, name, cert.public_key )
+			client.send_message( :register_worker, [name, cert.public_key] )
 			client.start
 		end
 	end

          
M lib/assemblage/command/server.rb +6 -1
@@ 200,8 200,13 @@ module Assemblage::CLI::ServerCommand
 		END_DESC
 		server.arg :DIRECTORY, :optional
 		server.command :start do |start|
+			start.switch [:r, :registration],
+				desc: "Run in registration mode.",
+				long_desc: "Run in a mode that allows new clients to be registered instead " \
+					"of normal authentication."
+
 			start.action do |globals, options, args|
-				Assemblage::Server.run( args.shift )
+				Assemblage::Server.run( args.shift, register_mode: options[:r] )
 			end
 		end
 

          
M lib/assemblage/command/worker.rb +8 -8
@@ 46,9 46,9 @@ module Assemblage::CLI::WorkerCommand
 			create.flag [:N, :name], type: String,
 				must_match: Assemblage::Auth::CLIENT_NAME_PATTERN
 
-			create.desc "Specify one or more tags that indicate what assemblies " +
+			create.desc "Specify one or more traits that indicate what assemblies " +
 				"the worker should accept"
-			create.flag [:t, :tags], type: Array
+			create.flag [:t, :traits], type: Array
 
 			create.action do |globals, options, args|
 				directory, server_url, server_key = *args

          
@@ 67,10 67,10 @@ module Assemblage::CLI::WorkerCommand
 				server_url = URI( server_url )
 
 				name = options.name || prompt_for_name( directory )
-				tags = options.tags || prompt_for_tags()
+				traits = options.traits || prompt_for_tags()
 
 				prompt.say "Creating a worker run directory in %s..." % [ directory ]
-				Assemblage::Worker.setup_run_directory( directory, name, tags )
+				Assemblage::Worker.setup_run_directory( directory, name, traits )
 
 				prompt.say "Generating a cert..."
 				Assemblage::Worker.generate_cert( name ) or

          
@@ 162,7 162,7 @@ module Assemblage::CLI::WorkerCommand
 	end
 
 
-	### Ask the user for one or more tags for a new worker, returning them as an
+	### Ask the user for one or more traits for a new worker, returning them as an
 	### Array of Strings.
 	def prompt_for_tags
 		prompt.say( "Tags (one per line):" )

          
@@ 173,13 173,13 @@ module Assemblage::CLI::WorkerCommand
 			modify: %i[remove]
 		}
 
-		tags = []
+		traits = []
 		while input = prompt.ask( '>>', options )
 			break if input == ''
-			tags << input
+			traits << input
 		end
 
-		return tags
+		return traits
 	end
 
 end # module Assemblage::CLI::AddServer

          
M lib/assemblage/protocol.rb +1 -0
@@ 60,6 60,7 @@ module Assemblage::Protocol
 
 	### Encode a message of the specified +type+ and return it as a CZTop::Message.
 	def encode( command, data, **header )
+		self.log.debug "Encode %p message with data: %p, header: %p" % [ command, data, header ]
 		header = DEFAULT_HEADER.merge( symbolify_keys(header) )
 		Assemblage::Protocol.check_message_header( header )
 

          
M lib/assemblage/publisher.rb +1 -1
@@ 99,7 99,7 @@ class Assemblage::Publisher
 			client.handle_error_message( data, **header )
 			client.stop
 		end
-		client.send_message( :register_publisher, name, cert.public_key )
+		client.send_message( :register_publisher, [name, cert.public_key] )
 		client.start
 	end
 

          
M lib/assemblage/server.rb +15 -3
@@ 20,7 20,7 @@ using Assemblage::Refinements
 #
 # This gathers events from repositories and dispatches them to workers via one
 # or more "assemblies". An assembly is the combination of a repository and one
-# or more tags that describe pre-requisites for building a particular product.
+# or more traits that describe pre-requisites for building a particular product.
 class Assemblage::Server
 	extend Loggability,
 	       Configurability,

          
@@ 182,16 182,18 @@ class Assemblage::Server
 	#
 
 	### Create a new Assemblage::Server.
-	def initialize( endpoint: nil )
+	def initialize( endpoint: nil, register_mode: false )
 		@endpoint      = endpoint || Assemblage::Server.endpoint
+		@register_mode = register_mode
 
 		@reactor       = CZTop::Reactor.new
-		@authenticator = Assemblage::Auth.authenticator
+		@authenticator = Assemblage::Auth.authenticator( allow_registration: @register_mode )
 
 		@socket        = nil
 		@connections   = {}
 		@output_queue  = []
 		@start_time    = nil
+		@worker_status = {}
 	end
 
 

          
@@ 204,6 206,10 @@ class Assemblage::Server
 	attr_reader :endpoint
 
 	##
+	# Whether or not the server is running in registration mode
+	attr_predicate :register_mode
+
+	##
 	# The CZTop::Reactor that handles asynchronous IO, timed events, and signals.
 	attr_reader :reactor
 

          
@@ 228,6 234,10 @@ class Assemblage::Server
 	# The time the server started up.
 	attr_reader :start_time
 
+	##
+	# Last status reports of connected workers as a Hash keyed by worker name
+	attr_reader :worker_status
+
 
 	### Run the server.
 	def run

          
@@ 324,12 334,14 @@ class Assemblage::Server
 	### Handle a connection sending a status report.
 	def handle_status_report_command( connection, report, * )
 		self.log.debug "Connection %s sent a status report: %p" % [ connection, report ]
+		self.worker_status[ connection.name ] = report
 	end
 
 
 	### Handle a connection asking for any assemblies pending for it.
 	def handle_fetch_assemblies_command( connection, criteria )
 		self.log.debug "Connection %s fetched assemblies: %p" % [ connection, criteria ]
+		msg = connection.response( :)
 	end
 
 

          
M lib/assemblage/worker.rb +33 -21
@@ 4,6 4,7 @@ 
 require 'fiber'
 require 'state_machines'
 require 'socket'
+require 'time'
 require 'cztop/reactor'
 require 'cztop/reactor/signal_handling'
 require 'cztop/reactor/socket_monitoring'

          
@@ 31,8 32,8 @@ class Assemblage::Worker
 	# A Regexp for matching valid worker names
 	VALID_NAME = /\A[a-z](\w+)(-\w+)*\z/
 
-	# A Regexp for matching valid tags
-	VALID_TAG = /\A\p{Alnum}+(-\p{Alnum}+)*:\s*\p{Print}+\z/
+	# A Regexp for matching valid traits
+	VALID_TRAIT = /\A\p{Alnum}+(-\p{Alnum}+)*:\s*\p{Print}+\z/
 
 	# The list of valid actions for a worker
 	HANDLED_MESSAGE_TYPES = %i[hello goodbye new_assembly]

          
@@ 40,6 41,9 @@ class Assemblage::Worker
 	# The time between checks for new assemblies to work on
 	ASSEMBLY_TIMER_INTERVAL = 5 # seconds
 
+	# The number of seconds between status reports sent to the server
+	STATUS_REPORT_INTERVAL = 10
+
 
 	# Loggability API -- log to the Assemblage logger.
 	log_to :assemblage

          
@@ 64,10 68,10 @@ class Assemblage::Worker
 
 
 		##
-		# The tags the worker uses to advertise the capabilities of its local
+		# The traits the worker uses to advertise the capabilities of its local
 		# environment.
-		setting :tags, default: [] do |tags|
-			Array( tags )
+		setting :traits, default: [] do |traits|
+			Array( traits )
 		end
 
 	end

          
@@ 134,11 138,11 @@ class Assemblage::Worker
 
 	### Set up the Worker's directory as a worker run directory. Raises an
 	### exception if the directory already exists and is not empty.
-	def self::setup_run_directory( directory='.', name=DEFAULT_NAME, tags=[] )
+	def self::setup_run_directory( directory='.', name=DEFAULT_NAME, traits=[] )
 		directory = Pathname( directory || '.' )
 		raise "Directory not empty" if directory.exist? && !directory.empty?
-		tags.each do |tag|
-			raise "Invalid tag %p" % [ tag ] unless tag.match( VALID_TAG )
+		traits.each do |trait|
+			raise "Invalid trait %p" % [ trait ] unless trait.match( VALID_TRAIT )
 		end
 
 		self.log.debug "Attempting to set up %s as a run directory." % [ directory ]

          
@@ 148,7 152,7 @@ class Assemblage::Worker
 		config = Assemblage.config || Configurability.default_config
 		config.assemblage.auth.cert_store_dir ||= (directory + 'certs').to_s
 		config.assemblage.worker.name = name
-		config.assemblage.worker.tags = tags
+		config.assemblage.worker.traits = traits
 
 		Loggability.with_level( :fatal ) do
 			config.install

          
@@ 188,7 192,7 @@ class Assemblage::Worker
 			client.handle_error_message( data, **header )
 			client.stop
 		end
-		client.send_message( :register_worker, name, cert.public_key )
+		client.send_message( :register_worker, [name, cert.public_key] )
 		client.start
 	end
 

          
@@ 205,9 209,9 @@ class Assemblage::Worker
 	#
 
 	### Create a nwe Assemblage::Worker.
-	def initialize( name: nil, server: nil, tags: nil )
+	def initialize( name: nil, server: nil, traits: nil )
 		@name       = name || Assemblage::Worker.name or raise "No worker name specified."
-		@tags       = Array( tags || Assemblage::Worker.tags )
+		@traits       = Array( traits || Assemblage::Worker.traits )
 		@reactor    = CZTop::Reactor.new
 		@start_time = nil
 

          
@@ 263,12 267,18 @@ class Assemblage::Worker
 		endpoint = server_cert['endpoint'] or
 			raise "Server cert doesn't have an endpoint set."
 
-		@client = Assemblage::Client.new( url.to_s, cert, server_cert )
+		@client = Assemblage::Client.new( endpoint.to_s, local_cert, server_cert )
 
 		@client.on_message( :error ) do |data, **header|
 			client.handle_error_message( data, **header )
 			client.stop
 		end
+		@client.on_message( :new_assembly ) do |data, **header|
+			self.on_new_assembly_message( *data )
+		end
+
+		self.start_assembly_timer
+		self.start_status_report_timer
 
 		@start_time = Time.now
 		@client.start

          
@@ 287,6 297,7 @@ class Assemblage::Worker
 	def stop
 		self.log.info "Stopping the assembly worker."
 		self.stop_status_report_timer
+		self.stop_assembly_timer
 		self.client.stop
 	end
 

          
@@ 303,9 314,9 @@ class Assemblage::Worker
 	end
 
 
-	### Return a Hash of the criteria the worker should use when
-	### requesting/subscribing to jobs from the server.
-	def job_criteria
+	### Return a Hash of the traits the worker should use when fetching assemblies
+	### from the server.
+	def job_traits
 		platform = Gem::Platform.local
 		return {
 			os: platform.os,

          
@@ 319,8 330,8 @@ class Assemblage::Worker
 	### Populate the worker's assembly builders with any assemblies that were queued
 	### for workers matching this one.
 	def fetch_pending_assemblies
-		criteria = self.job_criteria
-		self.send_message( :fetch_assemblies, criteria )
+		traits = self.job_traits
+		self.send_message( :fetch_assemblies, traits )
 	end
 
 

          
@@ 387,11 398,12 @@ class Assemblage::Worker
 
 
 	### Queue a status report message for the worker.
-	def send_status_report
+	def send_status_report( * )
 		report = {
 			version: Assemblage::VERSION,
 			status: self.status,
-			uptime: self.uptime
+			uptime: self.uptime,
+			time: Time.now.rfc2822
 		}
 
 		self.client.send_message( :status_report, report )

          
@@ 400,7 412,7 @@ class Assemblage::Worker
 
 	### Queue a result message for the worker.
 	def send_result( assembly_id, result )
-		self.client.send_message( :post_result, assembly_id, result )
+		self.client.send_message( :post_result, [assembly_id, result] )
 	end