Checkpoint after reworking registration, connection management
R .assemblies/00_quality_check =>  +0 -4
@@ 1,4 0,0 @@ 
-#!/usr/bin/env bash
-
-rake quality_check
-

          
R .assemblies/01_test =>  +0 -3
@@ 1,3 0,0 @@ 
-#!/usr/bin/env bash
-
-rake test

          
R .assemblies/02_docs =>  +0 -3
@@ 1,3 0,0 @@ 
-#!/usr/bin/env bash
-
-rake docs

          
A => .assembly/on_commit-00_quality_check +4 -0
@@ 0,0 1,4 @@ 
+#!/usr/bin/env bash
+
+rake quality_check
+

          
A => .assembly/on_commit-01_test +3 -0
@@ 0,0 1,3 @@ 
+#!/usr/bin/env bash
+
+rake test

          
A => .assembly/on_tag-00_check_signature +3 -0
@@ 0,0 1,3 @@ 
+#!/usr/bin/env bash
+
+rake sigcheck $EVENT_ID

          
A => .assembly/on_tag-01_test +3 -0
@@ 0,0 1,3 @@ 
+#!/usr/bin/env bash
+
+rake release

          
A => .assembly/on_tag-02_release +3 -0
@@ 0,0 1,3 @@ 
+#!/usr/bin/env bash
+
+rake release

          
A => .assembly/on_tag-03_publish_docs +3 -0
@@ 0,0 1,3 @@ 
+#!/usr/bin/env bash
+
+rake publish_docs

          
M .gems +1 -0
@@ 7,6 7,7 @@ hoe-deveiate
 loggability
 msgpack
 pluggability
+rb-readline
 rdoc-generator-fivefish
 rspec-wait
 sequel

          
M README.md +31 -104
@@ 17,33 17,46 @@ docs
 
 ## Description
 
+`assemblage`, **noun**:
+
+1. a collection of parts, as of machinery, put together to form a whole.
+2. the process of joining persons or things to form a whole.
+
 Assemblage is a continuous integration library. It's intended to provide you
 with a minimal toolkit for distributing and performing automated tasks when
 **commits** arrive at one or more version control **repositories**. It makes as
 few assumptions as possible as to what those tasks might be.
 
-A task in Assemblage is called an Assembly. Assemblage has three primary parts for manipulating Assemblies: the **Assembly Server**, **Assembly
-Workers**, and **Repositories**.
+The product or output from such a task in Assemblage is called an Assembly.
+Assemblage has three primary parts for manipulating Assemblies: the **Assembly
+Server**, **Assembly Workers**, and **Publishers**. **Assembly Workers** listen for events set to an **Assembly Server** by a **Publisher**, and run one or more **Assembly Scripts** to assemble their products.
 
 <dl>
   <dt>Assembly Server</dt>
   <dd>Aggregates and distributes events from <em>repositories</em> to
-  <em>workers</em> via one or more "assemblies".</dd>
+  <em>workers</em> to make one or more "assemblies".</dd>
 
   <dt>Assembly Workers</dt>
-  <dd>Listens for <em>commits</em> published by the <em>assembly server</em>,
-  checks out a <em>repository</em>, and runs an assembly script in that
-  repository.</dd>
+  <dd>Listens for <em>events</em> published by the <em>assembly server</em>,
+  checks out a <em>repository</em>, and runs one or more assembly scripts in
+  that repository.</dd>
 
-  <dt>Repository</dt>
-  <dd>A distributed version control repository. Assemblage currently supports
-  Mercurial and Git.</dd>
+  <dt>Publisher</dt>
+  <dd>A client that sends events to the server from distributed version control
+  repositories. Assemblage currently supports Mercurial and Git.</dd>
 
-  <dt>Commit</dt>
-  <dd>A single commit in a repository.</dd>
+  <dt>Event</dt>
+  <dd>Notification of some change in a repository, e.g., a
+    <code>commit</code>.</dd>
 
   <dt>Assembly</dt>
-  <dd>A script that builds something, checked into the repository source in the `.assemblies` directory (by default).</dd>
+  <dd>A product or collection of products built as a reaction to one or more
+    <em>events</em>.</dd>
+
+  <dt>Assembly Script</dt>
+  <dd>A script that is executed when an <em>event</em> occurs in a repository,
+  checked into the repository source in the `.assembly` directory (by
+  default).</dd>
 </dl>
 
 

          
@@ 60,100 73,14 @@ Workers**, and **Repositories**.
 This example uses three different hosts for the three parts, but you can, of
 course, run all of this on a single host.
 
-You'll first need a server to manage your assemblies:
-
-    example $ sudo gem install assemblage
-    example $ assemblage server create /usr/local/assemblage
-    Creating a server run directory in /usr/local/assemblage...
-    Generating a server key...
-    Creating the assemblies database...
-    done.
-
-    You can start the assembly server like so:
-      assemblage server start /usr/local/assemblage
-    
-    Server public key is:
-      &}T0.[{MZSJC]roN-{]x2QCkG+dXki!6j!.1JU1u
-
-    example $ assemblage server start /usr/local/assemblage
-    Starting assembly server at:
-      tcp://example.com:7872
-
-Now (possibly on a different host) you can create a new worker installation.
-Workers have a name and a list of tags that describe its capabilities, e.g.,
-the OS it's running on, installed software, etc. Our example is running on
-FreeBSD 11, and has Ruby 2.4, Ruby 2.5, Python 2.7, ZeroMQ, and the PostgreSQL
-client libraries available. We'll use a pretty simple tag convention but you
-can make it as simple or complex as you want.
-
-    user@example-client $ sudo gem install assemblage
-    user@example-client $ mkdir -p /usr/local/assemblage
-    user@example-client $ cd /usr/local/assemblage
-    user@example-client $ assemblage worker create \
-      -t freebsd,freebsd11,ruby,ruby24,ruby25,python,python27,zeromq,libpq worker1
-    Creating a new assembly worker run directory in
-      /usr/local/assemblage/worker1...
-    Set up with worker name: example-client-worker1
-    Client public key is:
-      PJL=qK@SHy3#re-w@W)4C]Aj+aD}toGf*Y*SOOZ4
-    done.
-
-Now we need to register the worker with the server. On the server host:
-
-    user@example $ assemblage worker add example-client-worker1 \
-      "PJL=qK@SHy3#re-w@W)4C]Aj+aD}toGf*Y*SOOZ4"
-    Approving connections from example-client-worker1...
-    done.
+...
 
-Now go back to the worker and tell it that it should talk to the new server we
-just set up:
 
-    user@example-client $ cd /usr/local/assemblage/worker1
-    user@example-client $ assemblage server add \
-      tcp://example.com:7872 "&}T0.[{MZSJC]roN-{]x2QCkG+dXki!6j!.1JU1u"
-    Talking to tcp://example.com:7872...
-    Testing registration... success.
-    done.
-
-Now you can start the worker, which will listen for jobs it can work on.
-
-    user@example-client $ cd /usr/local/assemblage/worker1
-    user@example-client $ assemblage worker start
-    Starting assembly worker `worker1`...
-    Connecting to assembly servers...
-       example... done.
-    Waiting for jobs...
-
-Now we need our repositories to notify the assembly server when events occur.
-We'll hook up a Mercurial repo for a Ruby library so that it runs unit tests
-whenever there's a new commit. First we'll install assemblage and the Mercurial
-client library on the repo server and create a publisher for repo operations:
-
-    user@example-repo $ sudo gem install assemblage hglib
-    user@example-repo $ mkdir /usr/local/hg/repos/.assemblage
-    user@example-repo $ cd /usr/local/hg/repos/.assemblage
-    user@example-repo $ assemblage publisher setup
-    Setting up a new assemblage publisher config directory in
-      /usr/local/hg/repos/.assemblage...
-    Publisher public key is:
-      bq9VheQbLtcu]LGK4I&xzK3^UW0Iyak/6<YS=^$w
-    done.
-
-Now we'll need to register the publisher on the server like we did before for
-the worker:
-
-    user@example $ assemblage publisher add \
-      'bq9VheQbLtcu]LGK4I&xzK3^UW0Iyak/6<YS=^$w'
-    Accepting publisher events from pub-example-com-0a44fe...
-    done.
-
-    user@example $ assemblage repo add --type=hg project1 \
-      http://repo.example.com/project1
-
-We'll add a hook to the repository's .hg/hgrc that looks like:
+Now copy the Mercurial hook to the repository and hook it into the .hg/hgrc:
 
     [hooks]
-    incoming.assemblage = /usr/local/bin/assemblage commit project1 $HG_NODE
+    incoming.assemblage = assemblage-hook.rb
+    incoming.assemblage = assemblage-hook.rb
 
 And finally, we'll combine all the parts into an assembly called
 `project1-freebsd-tests` that will run on a worker with the `freebsd`, `ruby`,

          
@@ 167,9 94,9 @@ Now when commits arrive at our repo, it 
 
 - get a notification of the commit
 - clone the repository checked out to that commit
-- look for an assembly script called `commit` in a directory called `.assemblies/` (by default)
+- look for an assembly script called `commit` in a directory called `.assembly/` (by default)
 - if it finds one, it will run the script from the cloned repo
-- it will then send back any files contained in the `.assemblies/` subdirectory with the SHA of the commit (if it exists) along with the exit code of the script.
+- it will then send back any files contained in the `.assembly/` subdirectory with the SHA of the commit (if it exists) along with the exit code of the script.
 
 
 

          
M Rakefile +6 -3
@@ 33,12 33,15 @@ hoespec = Hoe.spec 'assemblage' do |spec
 
 	spec.dependency 'loggability', '~> 0.11'
 	spec.dependency 'configurability', '~> 3.2'
-	spec.dependency 'hglib', '~> 0'
+	spec.dependency 'hglib', '~> 0.1'
 	spec.dependency 'git', '~> 1.3'
 	spec.dependency 'gli', '~> 2.18'
-	spec.dependency 'tty', '~> 0.7'
+	spec.dependency 'tty-prompt', '~> 0.18'
+	spec.dependency 'tty-table', '~> 0.10'
 	spec.dependency 'sequel', '~> 5.6'
 	spec.dependency 'msgpack', '~> 1.2'
+	spec.dependency 'rb-readline', '~> 0.5'
+	spec.dependency 'rbnacl', '~> 5.0'
 	spec.dependency 'state_machines', '~> 0.5'
 	spec.dependency 'cztop-reactor', '~> 0.7'
 	spec.dependency 'pluggability', '~> 0.6'

          
@@ 99,7 102,7 @@ task GEMSPEC do |task|
 	spec.files.delete( '.gemtest' )
 	spec.signing_key = nil
 	spec.cert_chain = ['certs/ged.pem']
-	spec.version = "#{spec.version.bump}.pre#{Time.now.strftime("%Y%m%d%H%M%S")}"
+	spec.version = "#{spec.version.bump}.0.pre#{Time.now.strftime("%Y%m%d%H%M%S")}"
 	File.open( task.name, 'w' ) do |fh|
 		fh.write( spec.to_ruby )
 	end

          
A => To-Do.md +10 -0
@@ 0,0 1,10 @@ 
+# To Do List
+
+## Before First Release
+
+* Move endpoints for servers for publishers and workers to the metadata of the public certs that they save instead of their config files. This will make it easier to support multiple servers for clients/publishers later.
+
+## After First Release
+
+* Add the ability to talk to multiple servers to client/publishers. This will require making Client take an optional `reactor` argument so it can participate with an outside reactor. Then clients can create their own reactor and one client per server and multiplex across them.
+

          
M assemblage.gemspec +20 -14
@@ 1,16 1,16 @@ 
 # -*- encoding: utf-8 -*-
-# stub: assemblage 0.1.pre20180823182117 ruby lib
+# stub: assemblage 0.1.pre20190508145718 ruby lib
 
 Gem::Specification.new do |s|
   s.name = "assemblage".freeze
-  s.version = "0.1.pre20180823182117"
+  s.version = "0.1.pre20190508145718"
 
   s.required_rubygems_version = Gem::Requirement.new("> 1.3.1".freeze) if s.respond_to? :required_rubygems_version=
   s.require_paths = ["lib".freeze]
   s.authors = ["Michael Granger".freeze]
   s.cert_chain = ["certs/ged.pem".freeze]
-  s.date = "2018-08-24"
-  s.description = "Assemblage is a continuous integration library. It's intended to provide you\nwith a minimal toolkit for distributing and performing automated tasks\nfor one or more version control repositories. It makes as few assumptions as\npossible as to what those tasks might be.\n\nA task in Assemblage is called an Assembly. Assemblage has three primary parts for manipulating Assemblies: the **Assembly Server**, **Assembly\nWorkers**, and **Repositories**.\n\n<dl>\n  <dt>Assembly Server</dt>\n  <dd>Aggregates and distributes events from <em>repositories</em> to\n  <em>workers</em> via one or more \"assemblies\".</dd>\n\n  <dt>Assembly Workers</dt>\n  <dd>Listens for events published by the <em>assembly server</em>, checks out\n  a <em>repository</em>, and runs an assembly script in that repository.</dd>\n\n  <dt>Repository</dt>\n  <dd>A distributed version control repository. Assemblage currently supports\n  Mercurial and Git.</dd>\n</dl>".freeze
+  s.date = "2019-05-08"
+  s.description = "Assemblage is a continuous integration library. It's intended to provide you\nwith a minimal toolkit for distributing and performing automated tasks\nfor one or more version control repositories. It makes as few assumptions as\npossible as to what those tasks might be.\n\nA task in Assemblage is called an Assembly. Assemblage has three primary parts for manipulating Assemblies: the **Assembly Server**, **Assembly\nWorkers**, and **Repositories**.\n\n<dl>\n  <dt>Assembly Server</dt>\n  <dd>Aggregates and distributes events from <em>repositories</em> to\n  <em>workers</em> via one or more \"assemblies\".</dd>\n\n  <dt>Assembly Workers</dt>\n  <dd>Listens for <em>commits</em> published by the <em>assembly server</em>,\n  checks out a <em>repository</em>, and runs an assembly script in that\n  repository.</dd>\n\n  <dt>Repository</dt>\n  <dd>A distributed version control repository. Assemblage currently supports\n  Mercurial and Git.</dd>\n\n  <dt>Commit</dt>\n  <dd>A single commit in a repository.</dd>\n\n  <dt>Assembly</dt>\n  <dd>A script that builds something, checked into the repository source in the `.assembly` directory (by default).</dd>\n</dl>".freeze
   s.email = ["ged@FaerieMUD.org".freeze]
   s.executables = ["assemblage".freeze]
   s.extra_rdoc_files = ["History.md".freeze, "LICENSE.txt".freeze, "Manifest.txt".freeze, "README.md".freeze, "History.md".freeze, "Protocol.md".freeze, "README.md".freeze]

          
@@ 19,7 19,7 @@ Gem::Specification.new do |s|
   s.licenses = ["BSD-3-Clause".freeze]
   s.rdoc_options = ["--main".freeze, "README.md".freeze]
   s.required_ruby_version = Gem::Requirement.new(">= 2.3.4".freeze)
-  s.rubygems_version = "2.7.6".freeze
+  s.rubygems_version = "3.0.3".freeze
   s.summary = "Assemblage is a continuous integration library".freeze
 
   if s.respond_to? :specification_version then

          
@@ 28,12 28,14 @@ Gem::Specification.new do |s|
     if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then
       s.add_runtime_dependency(%q<loggability>.freeze, ["~> 0.11"])
       s.add_runtime_dependency(%q<configurability>.freeze, ["~> 3.2"])
-      s.add_runtime_dependency(%q<hglib>.freeze, ["~> 0"])
+      s.add_runtime_dependency(%q<hglib>.freeze, ["~> 0.1"])
       s.add_runtime_dependency(%q<git>.freeze, ["~> 1.3"])
-      s.add_runtime_dependency(%q<gli>.freeze, ["~> 2.17"])
-      s.add_runtime_dependency(%q<tty>.freeze, ["~> 0.7"])
+      s.add_runtime_dependency(%q<gli>.freeze, ["~> 2.18"])
+      s.add_runtime_dependency(%q<tty-prompt>.freeze, ["~> 0.18"])
+      s.add_runtime_dependency(%q<tty-table>.freeze, ["~> 0.10"])
       s.add_runtime_dependency(%q<sequel>.freeze, ["~> 5.6"])
       s.add_runtime_dependency(%q<msgpack>.freeze, ["~> 1.2"])
+      s.add_runtime_dependency(%q<rb-readline>.freeze, ["~> 0.5"])
       s.add_runtime_dependency(%q<state_machines>.freeze, ["~> 0.5"])
       s.add_runtime_dependency(%q<cztop-reactor>.freeze, ["~> 0.7"])
       s.add_runtime_dependency(%q<pluggability>.freeze, ["~> 0.6"])

          
@@ 49,12 51,14 @@ Gem::Specification.new do |s|
     else
       s.add_dependency(%q<loggability>.freeze, ["~> 0.11"])
       s.add_dependency(%q<configurability>.freeze, ["~> 3.2"])
-      s.add_dependency(%q<hglib>.freeze, ["~> 0"])
+      s.add_dependency(%q<hglib>.freeze, ["~> 0.1"])
       s.add_dependency(%q<git>.freeze, ["~> 1.3"])
-      s.add_dependency(%q<gli>.freeze, ["~> 2.17"])
-      s.add_dependency(%q<tty>.freeze, ["~> 0.7"])
+      s.add_dependency(%q<gli>.freeze, ["~> 2.18"])
+      s.add_dependency(%q<tty-prompt>.freeze, ["~> 0.18"])
+      s.add_dependency(%q<tty-table>.freeze, ["~> 0.10"])
       s.add_dependency(%q<sequel>.freeze, ["~> 5.6"])
       s.add_dependency(%q<msgpack>.freeze, ["~> 1.2"])
+      s.add_dependency(%q<rb-readline>.freeze, ["~> 0.5"])
       s.add_dependency(%q<state_machines>.freeze, ["~> 0.5"])
       s.add_dependency(%q<cztop-reactor>.freeze, ["~> 0.7"])
       s.add_dependency(%q<pluggability>.freeze, ["~> 0.6"])

          
@@ 71,12 75,14 @@ Gem::Specification.new do |s|
   else
     s.add_dependency(%q<loggability>.freeze, ["~> 0.11"])
     s.add_dependency(%q<configurability>.freeze, ["~> 3.2"])
-    s.add_dependency(%q<hglib>.freeze, ["~> 0"])
+    s.add_dependency(%q<hglib>.freeze, ["~> 0.1"])
     s.add_dependency(%q<git>.freeze, ["~> 1.3"])
-    s.add_dependency(%q<gli>.freeze, ["~> 2.17"])
-    s.add_dependency(%q<tty>.freeze, ["~> 0.7"])
+    s.add_dependency(%q<gli>.freeze, ["~> 2.18"])
+    s.add_dependency(%q<tty-prompt>.freeze, ["~> 0.18"])
+    s.add_dependency(%q<tty-table>.freeze, ["~> 0.10"])
     s.add_dependency(%q<sequel>.freeze, ["~> 5.6"])
     s.add_dependency(%q<msgpack>.freeze, ["~> 1.2"])
+    s.add_dependency(%q<rb-readline>.freeze, ["~> 0.5"])
     s.add_dependency(%q<state_machines>.freeze, ["~> 0.5"])
     s.add_dependency(%q<cztop-reactor>.freeze, ["~> 0.7"])
     s.add_dependency(%q<pluggability>.freeze, ["~> 0.6"])

          
M data/assemblage/migrations/20180314_initial.rb +51 -9
@@ 3,45 3,87 @@ 
 
 Sequel.migration do
 	up do
+
 		create_table( :repositories ) do
 			primary_key :id
 
 			String :name, null: false, unique: true
 			String :type, null: false
 			String :url, null: false, unique: true
+
 			Time :created_at, default: Sequel.function(:now)
 			Time :updated_at
+			Time :removed_at
 		end
 
-		create_table( :commits ) do
+
+		create_table( :connections ) do
 			primary_key :id
 
-			String :revision, null: false
+			String :type, null: false
+			String :name, null: false
+
 			Time :created_at, default: Sequel.function(:now)
 			Time :updated_at
+			Time :removed_at
+
+			unique [:type, :name]
+			constraint( :valid_connection_type, type: %w[worker publisher] )
+		end
+
+
+		create_table( :events ) do
+			primary_key :id
+
+			String :type, null: false
+			String :identifier, null: false
+
+			Time :created_at, default: Sequel.function(:now)
+			Time :updated_at
+			Time :removed_at
 
 			foreign_key :repository_id, :repositories, null: false,
 				on_delete: :cascade
-			unique [:revision, :repository_id]
+			foreign_key :via_connection_id, :connections, null: false
+
+			unique [:type, :identifier, :repository_id]
 		end
 
+
+		create_table( :assemblies ) do
+			primary_key :id
+
+			String :name
+
+			Time :created_at, default: Sequel.function(:now)
+			Time :updated_at
+			Time :finished_at
+
+			foreign_key :via_connection_id, :connections, null: false
+			foreign_key :triggering_event_id, :events, null: false,
+				on_delete: :cascade
+		end
+
+
 		create_table( :assembly_results ) do
 			primary_key :id
 
-			String :assembly_name, null: false
+			File :data
+
 			Time :created_at, default: Sequel.function(:now)
 			Time :updated_at
-			Time :finished_at
-			String :worker_name, null: false, unique: true
 
-			foreign_key :commit_id, :commits, null: false,
-				on_delete: :cascade
+			foreign_key :assembly_id, :assemblies, null: false
 		end
+
 	end
 
+
 	down do
 		drop_table( :assembly_results, cascade: true )
-		drop_table( :commits, cascade: true )
+		drop_table( :assemblies, cascade: true )
+		drop_table( :events, cascade: true )
+		drop_table( :connections, cascade: true )
 		drop_table( :repositories, cascade: true )
 	end
 end

          
M lib/assemblage/assembly_builder.rb +1 -2
@@ 104,12 104,11 @@ class Assemblage::AssemblyBuilder
 	end
 
 
-
 	### Builder stage: Clone the assembly's repo.
 	def clone_repo( result )
 		repo_url = self.repository[ :url ] or raise "No repository URL for this assembly?!"
 		repo_type = self.repository[ :type ] or raise "No repository type set for this assembly."
-		repo_name = self.repository[ :client_name ] || self.name
+		repo_name = self.repository[ :connection_name ] || self.name
 		work_dir = Dir.mktmpdir( [self.name, "assemblies"], self.class.work_dir )
 
 		vcs = Assemblage::VCSStrategy.get_subclass( repo_type )

          
M lib/assemblage/assembly_result.rb +2 -2
@@ 20,8 20,8 @@ class Assemblage::AssemblyResult < Assem
 
 
 	##
-	# The client that generated these results
-	many_to_one :client, class: 'Assemblage::Client'
+	# The connection that generated these results
+	many_to_one :connection, class: 'Assemblage::Connection'
 
 
 end # class Assemblage::AssemblyResult

          
M lib/assemblage/auth.rb +108 -39
@@ 9,6 9,11 @@ require 'assemblage' unless defined?( As
 require 'assemblage/mixins'
 
 
+# Module that contains all the authentication-management functions for ZAUTH
+# authentication.
+#
+# Refs:
+# - http://czmq.zeromq.org/manual:zauth
 module Assemblage::Auth
 	extend Configurability,
 	       Loggability,

          
@@ 16,15 21,18 @@ module Assemblage::Auth
 
 	# The name of the metadata field that stores the server/worker/repo name
 	# associated with a remote key
-	CLIENT_NAME_KEY = 'client_name'
+	CONNECTION_NAME_KEY = 'name'
 
-	# Regexp for testing client names for validity
+	# The name of the metadata field that stores the type of connection in the cert
+	CONNECTION_TYPE_KEY = 'type'
+
+	# Regexp for testing connection names for validity
 	CLIENT_NAME_PATTERN = /\A[a-z][\w\-]+\z/i
 
-	# The minimum number of characters for a client name
+	# The minimum number of characters for a connection name
 	NAME_MIN_LENGTH = 3
 
-	# The amximum number of characters for a client name
+	# The amximum number of characters for a connection name
 	NAME_MAX_LENGTH = 35
 
 

          
@@ 42,6 50,24 @@ module Assemblage::Auth
 			dir ? Pathname( dir ) : nil
 		end
 
+		##
+		# :singleton-method:
+		# Configurable: If +true+, allow new clients to register themselves by
+		# connecting with a valid CURVE certificate. If this is set to +false+,
+		# only connections which present one of the certs from #cert_store_dir are
+		# allowed.
+		setting :allow_registration, default: true do |value|
+			value ? true : false
+		end
+
+		##
+		# :singleton-method:
+		# Configurable: an array of allowed IP addresses. If this is not set, any connection
+		# which presents valid CURVE authentication credentials is allowed.
+		setting :allowed_ips, default: [] do |value|
+			Array( value || [] )
+		end
+
 	end
 
 

          
@@ 60,7 86,7 @@ module Assemblage::Auth
 	end
 
 
-	### Return a Pathname for the directory that contains client certs if a
+	### Return a Pathname for the directory that contains connection certs if a
 	### cert_store_dir is configured. Returns nil if it is not.
 	def self::remote_certs_path
 		certdir = self.cert_store_dir or return nil

          
@@ 99,12 125,14 @@ module Assemblage::Auth
 
 	### Generate a new local cert and save it. Raises an exception if there is no
 	### cert_store_dir configured or if there is already a local cert in it.
-	def self::generate_local_cert
+	def self::generate_local_cert( name, type=:worker, **metadata )
+		self.log.debug "Generating a local %s cert for %s" % [ type, name ]
 		cert_file = self.local_cert_path or raise "No local cert dir configured."
-		raise "Server cert already exists at %s" % [ cert_file ] if cert_file.exist?
+		raise "Local cert already exists at %s" % [ cert_file ] if cert_file.exist?
 
 		cert_file.dirname.mkpath
-		cert = self.make_local_cert
+		cert = self.make_local_cert( name, type, **metadata )
+		self.log.debug "Saving cert to: %s." % [ cert_file ]
 		cert.save( cert_file )
 
 		return cert

          
@@ 112,18 140,32 @@ module Assemblage::Auth
 
 
 	### Generate a certificate and return it as a CZTop::Certificate.
-	def self::make_local_cert
+	def self::make_local_cert( name, type, **metadata )
+		self.log.debug "Generating a new local %s cert." % [ type ]
+
+		metadata = metadata.merge(
+			CONNECTION_NAME_KEY.to_sym => name.to_s,
+			CONNECTION_TYPE_KEY.to_sym => type.to_s
+		).transform_keys( &:to_s )
+
 		cert = CZTop::Certificate.new
-		cert[ 'name' ] = 'Assembly Local Cert'
+
+		metadata.each do |key, val|
+			self.log.debug "  setting cert %s to: %p" % [ key, val.to_s ]
+			cert[ key.to_s ] = val.to_s
+		end
+
 		return cert
 	end
 
 
 	### Return the CZTop::Certificate (public key only) for the specified
-	### +client_name+ if it exists. Returns +nil+ if it doesn't.
-	def self::remote_cert( client_name )
+	### +connection_name+ if it exists. Returns +nil+ if it doesn't.
+	def self::remote_cert( connection_name )
 		dir = self.remote_certs_path or return nil
-		certpath = dir + client_name
+		certpath = dir + connection_name
+
+		self.log.debug "Trying to load %s" % [ certpath ]
 		return nil unless certpath.exist?
 		return CZTop::Certificate.load( certpath )
 	end

          
@@ 136,30 178,32 @@ module Assemblage::Auth
 	end
 
 
-	### Returns +true+ if a remote cert has been saved for the specified +client_name+.
-	def self::has_remote_cert?( client_name )
-		raise ArgumentError, "invalid client_name %p" % [ client_name ] unless
-			self.valid_client_name?( client_name )
+	### Returns +true+ if a remote cert has been saved for the specified +connection_name+.
+	def self::has_remote_cert?( connection_name )
+		raise ArgumentError, "invalid connection_name %p" % [ connection_name ] unless
+			self.valid_connection_name?( connection_name )
 
-		certfile = self.remote_certs_path + client_name
+		certfile = self.remote_certs_path + connection_name
 
 		return certfile.exist?
 	end
 
 
-	### Make a remote cert for the given +client_name+, save it in the certs dir if one
+	### Make a remote cert for the given +connection_name+, save it in the certs dir if one
 	### is configured, and return it as a CZTop::Certificate.
-	def self::save_remote_cert( client_name, public_key )
-		raise ArgumentError, "invalid client name %p" % [ client_name ] unless
-			self.valid_client_name?( client_name )
+	def self::save_remote_cert( connection_name, public_key, type: :worker, **metadata )
+		raise ArgumentError, "invalid connection name %p" % [ connection_name ] unless
+			self.valid_connection_name?( connection_name )
 
-		cert = CZTop::Certificate.new_from( public_key )
-		cert[ CLIENT_NAME_KEY ] = client_name
-		self.log.debug "Cert %p set to: %p" % [ CLIENT_NAME_KEY, cert[CLIENT_NAME_KEY] ]
+		metadata = metadata.merge(
+			CONNECTION_NAME_KEY.to_sym => connection_name.to_s,
+			CONNECTION_TYPE_KEY.to_sym => type.to_s
+		)
+		cert = self.cert_with_metadata( public_key, nil, **metadata )
 
 		if self.cert_store_dir
 			self.remote_certs_path.mkpath
-			cert.save( self.remote_certs_path + client_name )
+			cert.save( self.remote_certs_path + connection_name )
 		else
 			self.log.warn "Not saving remote cert to disk: no cert store!"
 		end

          
@@ 168,31 212,53 @@ module Assemblage::Auth
 	end
 
 
-	### Remove an existing remote cert for the given +client_name+ if it exists.
-	def self::remove_remote_cert( client_name )
-		raise ArgumentError, "invalid client_name %p" % [ client_name ] unless
-			self.valid_client_name?( client_name )
+	### Create a new cert given the specified +public_key+ and an optional
+	### +secret_key+ and custom +metadata+.
+	def self::cert_with_metadata( public_key, secret_key=nil, **metadata )
+		cert = CZTop::Certificate.new_from( public_key, secret_key )
+
+		metadata.transform_keys( &:to_s ).each do |key, val|
+			self.log.debug "  setting cert %s to: %p" % [ key, val.to_s ]
+			cert[ key.to_s ] = val.to_s
+		end
+
+		return cert
+	end
+
+
+	### Remove an existing remote cert for the given +connection_name+ if it exists.
+	def self::remove_remote_cert( connection_name )
+		raise ArgumentError, "invalid connection_name %p" % [ connection_name ] unless
+			self.valid_connection_name?( connection_name )
 
 		if self.remote_certs_path
-			public_cert = self.remote_certs_path + client_name
+			public_cert = self.remote_certs_path + connection_name
 			public_cert.unlink
 		end
 	end
 
 
-	### Returns +true+ if the specified +name+ is valid for a client's name.
-	def self::valid_client_name?( name )
+	### Returns +true+ if the specified +name+ is valid for a connection's name.
+	def self::valid_connection_name?( name )
 		return CLIENT_NAME_PATTERN.match?( name ) &&
 			( NAME_MIN_LENGTH .. NAME_MAX_LENGTH ).cover?( name.length )
 	end
 
 
-	### Given a remote +cert+ (a CZTop::Certificate), return the associated client
+	### Given a remote +cert+ (a CZTop::Certificate), return the associated connection
 	### name.
-	def self::client_name_for( cert )
-		return cert[ CLIENT_NAME_KEY ]
+	def self::connection_name_for( cert )
+		return cert[ CONNECTION_NAME_KEY ]
 	end
-	singleton_method_alias :client_name_for_cert, :client_name_for
+	singleton_method_alias :connection_name_for_cert, :connection_name_for
+
+
+	### Given a remote +cert+ (a CZTop::Certificate), return the associated connection
+	### type.
+	def self::connection_type_for( cert )
+		return cert[ CONNECTION_TYPE_KEY ]
+	end
+	singleton_method_alias :connection_type_for_cert, :connection_type_for
 
 
 	### Return a configured CZTop::CertStore pointing to the configured #data_dir

          
@@ 201,15 267,17 @@ module Assemblage::Auth
 			self.log.info "Creating CURVE authenticator."
 			auth = CZTop::Authenticator.new
 
-			if certs_dir = self.remote_certs_path
+			if certs_dir = self.remote_certs_path && ! self.allow_registration
 				self.log.info "Using remote certs dir %s for auth." % [ certs_dir ]
 				certs_dir.mkpath
 				auth.curve( certs_dir.to_s )
 			else
 				self.log.warn "Using ALLOW_ANY remote curve auth."
-				auth.curve( CZTop::Authenticator::ALLOW_ANY )
+				auth.curve
 			end
 
+			auth.allow( *self.allowed_ips )
+
 			auth
 		end
 	end

          
@@ 225,6 293,7 @@ module Assemblage::Auth
 	### Reset memoized objects in the class (mostly for testing).
 	def self::reset
 		if @authenticator
+			self.log.info "Terminating authenticator agent"
 			@authenticator.actor.terminate
 			@authenticator = nil
 		end

          
M lib/assemblage/cli.rb +5 -4
@@ 97,15 97,16 @@ module Assemblage::CLI
 
 	# Write the error to the log on exceptions.
 	on_error do |exception|
+
 		case exception
 		when OptionParser::ParseError, GLI::CustomExit
-			self.log.debug( exception )
+			msg = exception.full_message(highlight: false, order: :bottom)
+			self.log.debug( msg )
 		else
-			self.log.error( exception )
+			msg = exception.full_message(highlight: true, order: :bottom)
+			self.log.error( msg )
 		end
 
-		exception.backtrace.each {|frame| self.log.debug(frame) }
-
 		true
 	end
 

          
M lib/assemblage/client.rb +382 -35
@@ 1,25 1,145 @@ 
 # -*- ruby -*-
 # frozen_string_literal: true
 
+require 'msgpack'
 require 'cztop'
+require 'cztop/monkeypatches'
+require 'cztop/reactor'
+require 'cztop/reactor/signal_handling'
+require 'cztop/reactor/socket_monitoring'
+require 'state_machines'
 
 require 'assemblage' unless defined?( Assemblage )
+require 'assemblage/mixins'
 require 'assemblage/protocol'
 
 
+# A reference player client
 class Assemblage::Client
-	extend Loggability
+	extend Loggability,
+	       Configurability,
+	       Assemblage::MethodUtilities
+
+	include CZTop::Reactor::SignalHandling
+
 
-	# Loggability API -- set up a logger for clients
-	log_to :assemblage
+	# The list of signals to handle
+	# The signals the gameworld daemon responds to
+	HANDLED_SIGNALS = [
+		:INT, :TERM
+		# :TODO: :QUIT, :WINCH, :USR2, :TTIN, :TTOU, :USR1,
+	] & Signal.list.keys.map( &:to_sym )
+
+
+	# Loggability API -- log to Assemblage's logger
+	log_as :assemblage_client
+
 
 
-	### Overridden to add a few non-column instance variables.
-	def initialize( * )
-		super
+	state_machine( :state, initial: :unstarted ) do
+		state :unstarted,
+			:connecting,
+			:connected,
+			:authenticated,
+			:disconnected,
+			:closed,
+			:reconnecting,
+			:connect_delayed,
+			:failed
+
+		# ZMQ Monitor Events:
+		# CONNECTED
+		# CONNECT_DELAYED
+		# CONNECT_RETRIED
+		# LISTENING
+		# BIND_FAILED
+		# ACCEPTED
+		# ACCEPT_FAILED
+		# CLOSED
+		# CLOSE_FAILED
+		# DISCONNECTED
+		# MONITOR_STOPPED
+		# HANDSHAKE_SUCCEEDED
+		# HANDSHAKE_FAILED_NO_DETAIL
+		# HANDSHAKE_FAILED_PROTOCOL
+		# HANDSHAKE_FAILED_AUTH
+
+		event :on_connected do |*|
+			transition any => :connected
+		end
+
+		event :on_handshake_succeeded do |*|
+			transition :connected => :authenticated
+		end
+
+		event :on_handshake_failed_no_detail do |*|
+			transition :connected => :failed
+		end
+
+		event :on_handshake_failed_protocol do |*|
+			transition :connected => :failed
+		end
+
+		event :on_handshake_failed_auth do |*|
+			transition :connected => :failed
+		end
+
+		event :on_disconnected do |*|
+			transition any => :disconnected
+		end
 
-		@routing_id = nil
-		@server = nil
+		event :on_closed do |*|
+			transition any => :closed
+		end
+
+		event :on_close_failed do |*|
+			transition any => :failed
+		end
+
+		event :on_connect_retried do |*|
+			transition any => :reconnecting
+		end
+
+		event :on_connect_delayed do |*|
+			transition any => :connect_delayed
+		end
+
+
+		after_transition :connected => :authenticated, do: :start_processing_messages
+		after_transition :authenticated => :disconnected, do: :stop_processing_messages
+		after_transition any => [:connected, :authenticated, :disconnected, :failed],
+			do: :log_transition
+		after_transition any => :failed, do: :stop
+
+		after_failure do: :log_transition_failure
+
+	end
+
+	# Include this after declaring the state machine so its monitor methods overlay
+	# the state machine's.
+	include CZTop::Reactor::SocketMonitoring
+
+
+	### Create a client that will communicate with the server at the specified
+	### +endpoint+. It will used the given +client_cert+ and +server_cert+ for ZAUTH
+	### authentication, and if +msg_handler+ is provided, it will be used as the
+	### default message handler.
+	def initialize( endpoint, client_cert, server_cert, &msg_handler )
+		super() if defined?( super )
+
+		@endpoint    = endpoint
+		@client_cert = client_cert
+		@server_cert = server_cert
+
+		@reactor     = CZTop::Reactor.new
+		@socket      = CZTop::Socket::CLIENT.new
+		@socket.options.linger = 0
+
+		@thread      = nil
+		@input_queue = []
+		@running     = false
+
+		@message_handlers = Hash.new( msg_handler )
 	end
 
 

          
@@ 28,47 148,274 @@ class Assemblage::Client
 	######
 
 	##
-	# The routing ID associated with the client's socket.
-	attr_reader :routing_id
+	# The CZTop::Reactor that manages async IO
+	attr_reader :reactor
+
+	##
+	# The CLIENT socket used to communicate with the server
+	attr_accessor :socket
+
+	##
+	# The Hash of message callbacks keyed by message name as a Symbol
+	attr_reader :message_handlers
+
+	##
+	# The client's IO thread.
+	attr_reader :thread
+
+	##
+	# The queue of messages waiting to be sent to the server
+	attr_reader :input_queue
+
+	##
+	# The ZMQ endpoint to connect to
+	attr_reader :endpoint
+
+	##
+	# The CURVE cert (public + secret key) to use for authentication when connecting
+	attr_reader :client_cert
 
 	##
-	# The server the client is connected to.
-	attr_reader :server
+	# The CURVE cert (public key only) to use for server verification when connecting
+	attr_reader :server_cert
+
+	##
+	# Whether or not the client is running.
+	attr_predicate :running
+
+
+	### Start the client
+	def start
+		self.log.info "Starting up..."
+		self.register_default_handlers
+
+		self.with_signal_handler( self.reactor, *HANDLED_SIGNALS ) do
+			self.with_socket_monitor( self.reactor, self.socket ) do
+				self.connect
+				@running = true
+				self.reactor.start_polling( ignore_interrupts: true )
+			end
+		end
+	ensure
+		self.log.info "Shutting down..."
+		@running = false
+		self.log.debug "  disconnecting..."
+		self.disconnect
+	end
+
+
+	### Stop the client
+	def stop
+		self.log.info "Client stopping..."
+		self.reactor.stop_polling
+	end
 
 
-	### Connection callback -- called when the +server+ handles the first message.
-	def on_connected( server, routing_id )
-		self.log.info "%s [%s]: Connected with routing ID %p" %
-			[ self.type, self.name, routing_id ]
+	### Register a +handler+ Proc for messages from the server with the specified
+	### +verb+.
+	def on_message( verb, &handler )
+		self.message_handlers[ verb ] = handler
+	end
+
+
+	### Send a message to the server.
+	def send_message( verb, *data, **header )
+		self.input_queue << Assemblage::Protocol.encode( verb.to_sym, data, **header )
+		self.register_for_writing if self.reactor.registered?( self.socket )
+	end
+
+
+	### Return a human-readable representation of the Client, suitable for debugging.
+	def inspect
+		return "#<%p:%#0x %s (%s)>" % [
+			self.class,
+			self.object_id * 2,
+			self.state,
+			self.thread.status,
+		]
+	end
+
+
+	#########
+	protected
+	#########
+
+	### Establish the connection to the server. Raises if the connection fails.
+	def connect
+		sock = self.socket
+
+		sock.CURVE_client!( self.client_cert, self.server_cert )
 
-		@server = server
-		@routing_id = routing_id
+		sock.options.sndtimeo          = 2000
+		sock.options.heartbeat_ivl     = 100
+		sock.options.heartbeat_timeout = 300
+		sock.options.heartbeat_ttl     = 500
+
+		self.reactor.register( sock, :read, :write, &self.method(:on_socket_event) )
+		sock.connect( self.endpoint.to_s )
+	end
+
+
+	### Disconnect from the server.
+	def disconnect
+		# :TODO: Don't be rude to the server!
+		self.log.debug "  disconnecting from %p..." % [ self.endpoint ]
+		self.socket.options.linger = 0
+		self.socket.options.reconnect_ivl = 0
+		self.socket.disconnect( self.endpoint.to_s )
+	rescue => err
+		self.log.debug "%p while disconnecting: %s" % [ err.class, err.message ]
+	end
+
+
+	### Register handlers for the basic messages if there aren't already handlers
+	### for them.
+	def register_default_handlers
+		self.message_handlers[ :disconnect ] ||= self.method( :handle_disconnect_message )
+		self.message_handlers[ :error ] ||= self.method( :handle_error_message )
+		self.message_handlers[ :info ] ||= self.method( :handle_info_message )
+	end
+
+
+	### Callback for when the client becomes authenticated.
+	def start_processing_messages
+		self.log.info "Authenticated. Starting message processing."
 	end
 
 
-	### Connection callback -- called just before the server instructs the client to
-	### disconnect.
-	def on_disconnected
-		self.log.info "%s [%s]: Disconnecting."
-		@routing_id = nil
-		@server = nil
+	### Callback for when the client gets disconnected.
+	def stop_processing_messages
+		self.log.warn "Disconnected. Stopping message processing."
+		self.disconnect
+	end
+
+
+	### Start watching the socket for writability if it's not already.
+	def register_for_writing
+		self.reactor.enable_events( self.socket, :write ) unless
+			self.reactor.event_enabled?( self.socket, :write )
+	end
+
+
+	### Stop watching the socket for writability if it was registered for writes.
+	def unregister_for_writing
+		self.reactor.disable_events( self.socket, :write ) if
+			self.reactor.event_enabled?( self.socket, :write )
+	end
+
+
+	### Log every status transition
+	def log_transition( transition )
+		self.log.debug "%s → %s" % [ transition.from, transition.to ]
+	end
+
+
+	### Log every status transition failure
+	def log_transition_failure( transition )
+		self.log.debug "%s: Failed to transition from %s" %
+			[ transition.event, transition.from ]
+	end
+
+
+	### Handle a message from the server of the given +type+ with a payload of
+	### +data+.
+	def handle_message( type, data, header )
+		if (( handler = self.message_handlers[ type.to_sym ] ))
+			self.log.debug "Calling handler for %p: %p" % [ type, handler ]
+			handler.call( type, data, **header )
+		else
+			self.log.warn "Ignoring unhandled %p server message: %p %p" % [ type, data, header ]
+		end
+	end
+
+
+	### Handle a disconnect control message from the server by dropping the connection.
+	def handle_disconnect_message( * )
+		self.log.warn "Got disconnect from server. Stopping the client."
+		self.stop
+	end
+
+
+	### Handle an error control message from the server.
+	def handle_error_message( data, header )
+		data = data.join( "\n" ) if data.respond_to?( :join )
+		self.log.error "Got an error response from server: %s" % [ data ]
+	end
+
+
+	### Handle an informational message from the server.
+	def handle_info_message( data, header )
+		data = data.join( "\n" ) if data.respond_to?( :join )
+		self.log.info "Got an info message from the server: %s" % [ data ]
 	end
 
 
-	### Return a CZTop::Message to send to the client with the specified +type+ and
-	### +data+.
-	def make_response( type, data, **headers )
-		return unless self.routing_id
-		msg = Assemblage::Protocol.encode( type, data, headers )
-		msg.routing_id = self.routing_id
-		return msg
+	### Handle a readable/writable event on the CLIENT socket.
+	def on_socket_event( event )
+		if event.readable?
+			message = event.socket.receive
+			type, data, header = Assemblage::Protocol.decode( message )
+
+			self.handle_message( type, data, header )
+
+		elsif event.writable?
+			if (( data = self.input_queue.shift ))
+				event.socket << data
+			end
+			self.unregister_for_writing if self.input_queue.empty?
+
+		else
+			self.log.warn "Got event which is neither readable nor writable (%p)." % [ event ]
+		end
+	end
+
+
+	### Add handling for HANDSHAKE_FAILED_AUTH monitor events
+	def on_handshake_failed_auth( * )
+		self.log.error "Connection failed: authentication failure."
+		super
+	end
+
+
+	### Add handling for HANDSHAKE_FAILED_PROTOCOL monitor events
+	def on_handshake_failed_protocol( * )
+		self.log.error "Connection failed: protocol error."
+		super
 	end
 
 
-	### Return a CZTop::Message to send to the client that indicates an +error+ occurred.
-	def make_error_response( error )
-		return self.make_response( :error, error.message, success: false )
+	### Add handling for HANDSHAKE_FAILED_NO_DETAIL monitor events
+	def on_handshake_failed_no_detail( * )
+		self.log.error "Connection failed."
+		super
 	end
 
+
+	#
+	# :section: Signal Handling
+	# These methods set up some behavior for starting, restarting, and stopping
+	# the client when a signal is received.
+	#
+
+	### Handle signals.
+	def handle_signal( sig )
+		self.log.debug "Handling signal %s" % [ sig ]
+		case sig
+		when :INT, :TERM
+			self.on_termination_signal( sig )
+
+		else
+			self.log.warn "Unhandled signal %s" % [ sig ]
+		end
+
+	end
+
+
+	### Shut the client down on SIGTERM or SIGINT.
+	def on_termination_signal( signo )
+		self.log.warn "Terminated (%p)" % [ signo ]
+		self.stop
+	end
+
+
 end # class Assemblage::Client
-

          
A => lib/assemblage/command/client.rb +46 -0
@@ 0,0 1,46 @@ 
+# -*- ruby -*-
+# frozen_string_literal: true
+
+require 'assemblage/cli' unless defined?( Assemblage::CLI )
+require 'assemblage/client'
+
+
+# Assemblage Client shell
+module Assemblage::CLI::ClientCommand
+	extend Assemblage::CLI::Subcommand
+
+	desc "Start a client shell connected "
+
+	arg :SERVER_URL
+	arg :NAME
+	arg :DIRECTORY, :optional
+
+	command :client do |client|
+
+		client.action do |globals, options, args|
+			url = args.shift or help_now!( "No server URL specified.", 64 )
+			name = args.shift or help_now!( "No name specified.", 64 )
+			directory = Pathname( args.shift || '.' ).expand_path
+
+			self.log.debug "Creating a client..."
+			cert = Assemblage::Auth.local_cert
+			server_cert ||= Assemblage::Auth.remote_cert( 'server' )
+
+			client = Assemblage::Client.new( url.to_s, cert, server_cert ) do |type, data, header|
+				prompt.say ""
+			end
+			client.on_message( :worker_registered ) do |data, **headers|
+				self.log.debug "Client successfully registered."
+				client.stop
+			end
+			client.on_message( :error ) do |data, **header|
+				client.handle_error_message( data, **header )
+				client.stop
+			end
+			client.send_message( :register_worker, name, cert.public_key )
+			client.start
+		end
+	end
+
+end # module Assemblage::CLI::ClientCommand
+

          
M lib/assemblage/command/publisher.rb +99 -35
@@ 10,16 10,15 @@ module Assemblage::CLI::PublisherCommand
 	extend Assemblage::CLI::Subcommand
 
 
-	SETUP_ADVICE = %{
-		Now you can tell a server to accept repository events published from this host
-		like so:
-		  assemblage server add-publisher %{name} '%{public_key}' <server_directory>
+	CREATE_ADVICE = %{
+		Now you can approve this publisher for the server like so:
+		  assemblage server approve-publisher %{name} <server directory>
 
-		You can use this publisher to publish events from a Mercurial repository
-		by adding a hook:
+		Once it is approved, you can use this publisher to publish events
+		from a Mercurial repository by adding a hook:
 
 		[hooks]
-		incoming.myrepo = assemblage -c %{directory}/config.yml publish commit %{name} $HG_NODE
+		incoming.myrepo = assemblage %{directory}/config.yml publish commit %{name} $HG_NODE
 
 	}.gsub( /^\t+/, '' )
 

          
@@ 27,31 26,53 @@ module Assemblage::CLI::PublisherCommand
 	desc "Publisher commands"
 	command :publisher do |publisher|
 
-		publisher.desc 'Set up a new repo event publisher run directory'
-		publisher.long_desc <<-END_DESC
-		Set up a new run directory for hooks that run in one or more repositories.
-		This is how the commit (or other) hook authenticates to the server when
-		it runs.
+		publisher.desc 'Set up a new assembly publisher'
+		publisher.long_desc <<~END_DESC
+		Set up a new assembly publisher in the given DIRECTORY for the server at
+		SERVER_URL. If the DIRECTORY is not specified, the current directory will
+		be used. If the target directory is not empty, this command will abort.
 		END_DESC
-		publisher.arg :NAME
 		publisher.arg :DIRECTORY
-		publisher.command :setup do |setup|
+		publisher.arg :SERVER_URL
+		publisher.arg :SERVER_KEY
+		publisher.command :create do |create|
+
+			create.desc "Specify a name that will identify the publisher on any " +
+				"servers it registers with"
+			create.flag [:N, :name], type: String,
+				must_match: Assemblage::Auth::CLIENT_NAME_PATTERN
+
+			create.action do |globals, options, args|
+				directory, server_url, server_key = *args
 
-			setup.action do |globals, options, args|
-				name = args.shift or help_now!( "No name specified.", 64 )
-				directory = Pathname( args.shift || '.' ).expand_path
-				name = options.name
+				# Infer a default publisher dir of '.' if the first arg is a URL
+				if directory =~ %r<\A\p{Alpha}+://>
+					server_key = server_url
+					server_url = directory
+					directory = '.'
+				end
+
+				help_now!( "No server URL specified!" ) unless server_url
+				help_now!( "No server key specified!" ) unless server_key
+
+				directory = Pathname( directory ).expand_path
+				server_url = URI( server_url )
+
+				name = options.name || prompt_for_name( directory )
 
 				prompt.say "Creating a publisher run directory in %s..." % [ directory ]
-				Assemblage::Publisher.setup_run_directory( directory )
+				Assemblage::Publisher.setup_run_directory( directory, name )
 
-				prompt.say "Generating a publisher key..."
-				Assemblage::Publisher.generate_cert
+				prompt.say "Generating a cert..."
+				Assemblage::Publisher.generate_cert( name ) or
+					raise "Failed to generate a new publisher cert!"
 
-				msg = SETUP_ADVICE % {
-					public_key: Assemblage::Publisher.public_key,
-					directory: directory,
-					name: name
+				prompt.say "Registering with %s..." % [ server_url ]
+				Assemblage::Publisher.register_with_server( name, server_url, server_key )
+
+				msg = CREATE_ADVICE % {
+					name: name,
+					directory: directory
 				}
 				prompt.say( msg )
 			end

          
@@ 68,36 89,79 @@ module Assemblage::CLI::PublisherCommand
 				directory = Pathname( args.shift || '.' ).expand_path
 
 				Assemblage.use_run_directory( directory )
-				prompt.say "Worker's public key:"
+				prompt.say "Publisher's public key:"
 				puts Assemblage::Publisher.public_key
 			end
 		end
 
 
-		publisher.desc "Add a server key to a publisher config"
-		publisher.long_desc <<-END_DESC
-		Add the specified PUBLIC_KEY for a server named NAME to the server config
-		in SERVER_DIRECTORY.
+		publisher.desc "Add a new server to a publisher"
+		publisher.long_desc <<~END_DESC
+		Add a cert and configuration for a new server to a publisher run directory.
 		END_DESC
-		publisher.arg :ENDPOINT
+		publisher.arg :URL
 		publisher.arg :PUBLIC_KEY
-		publisher.arg :DIRECTORY, :optional
+		publisher.arg :PUBLISHER_DIRECTORY, :optional
 		publisher.command 'add-server' do |add|
 
 			add.action do |globals, options, args|
-				endpoint = args.shift or help_now!( "Missing the endpoint." )
+				url = args.shift or help_now!( "Missing the server url." )
 				public_key = args.shift or help_now!( "Missing the server public key." )
 
 				Assemblage.use_run_directory( args.shift )
 
-				prompt.say "Adding server key for connecting to %s..." % [ endpoint ]
-				Assemblage::Publisher.add_server( endpoint, public_key )
+				prompt.say "Registering with %s..." % [ server_url ]
+				Assemblage::Publisher.register_with_server( name, server_url, server_key )
+
 				prompt.say "done."
 			end
+		end
 
+
+		publisher.desc "Publish an event from a repo."
+		publisher.long_desc <<~END_DESC
+		Publish the specified EVENT 
+		END_DESC
+		publisher.arg :URL
+		publisher.arg :PUBLIC_KEY
+		publisher.arg :PUBLISHER_DIRECTORY, :optional
+		publisher.command 'add-server' do |add|
+
+			add.action do |globals, options, args|
+				url = args.shift or help_now!( "Missing the server url." )
+				public_key = args.shift or help_now!( "Missing the server public key." )
+
+				Assemblage.use_run_directory( args.shift )
+
+				prompt.say "Registering with %s..." % [ server_url ]
+				Assemblage::Publisher.register_with_server( name, server_url, server_key )
+
+				prompt.say "done."
+			end
 		end
 
 	end
 
+
+	###############
+	module_function
+	###############
+
+	### Ask the user for the name of the publisher, defaulting to one derived from
+	### the hostname and the specified +directory+.
+	def prompt_for_name( directory )
+		default = "%s-%s" % [
+			Socket.gethostname.downcase.gsub('.', '-'),
+			directory.basename
+		]
+
+		return prompt.ask( "Publisher name: " ) do |config|
+			config.required
+			config.default( default )
+			config.modify( :remove, :down )
+			config.validate( Assemblage::Publisher::VALID_NAME )
+		end
+	end
+
 end # module Assemblage::CLI::PublisherCommand
 

          
M lib/assemblage/command/server.rb +19 -19
@@ 111,46 111,46 @@ module Assemblage::CLI::ServerCommand
 		end
 
 
-		server.desc "Add a new worker to a server"
+		server.desc "Approve a worker"
 		server.long_desc <<-END_DESC
-		Add a cert and configuration for a new worker to a server run directory.
+		Approve connections from the worker associated with the given NAME.
 		END_DESC
 		server.arg :NAME
-		server.arg :PUBLIC_KEY
-		server.arg :SERVER_DIRECTORY, :optional
-		server.command 'add-worker' do |add|
+		server.arg :DIRECTORY, :optional
+		server.command 'approve-worker' do |add|
 
 			add.action do |globals, options, args|
 				name = args.shift or help_now!( "Missing the worker name." )
-				public_key = args.shift or help_now!( "Missing the worker public key." )
 
 				Assemblage.use_run_directory( args.shift )
 
-				prompt.say "Approving connections from %s..." % [ name ]
-				Assemblage::Server.add_worker( name, public_key )
-				prompt.say "done."
+				if Assemblage::Server.connections_approved?( name )
+					exit_now! "Connections from %s are already approved." % [ name ]
+				else
+					prompt.say "Approving connections from %s..." % [ name ]
+					Assemblage::Server.approve_connections( name, :worker )
+					prompt.say "done."
+				end
 			end
 
 		end
 
 
-		server.desc "Add a new publisher to a server"
+		server.desc "Approve a publisher"
 		server.long_desc <<-END_DESC
-		Add a cert and configuration for a new publisher to a server run directory.
+		Approve connections from the publisher associated with the given NAME.
 		END_DESC
 		server.arg :NAME
-		server.arg :PUBLIC_KEY
-		server.arg :SERVER_DIRECTORY, :optional
-		server.command 'add-publisher' do |add|
+		server.arg :DIRECTORY, :optional
+		server.command 'approve-publisher' do |add|
 
 			add.action do |globals, options, args|
 				name = args.shift or help_now!( "Missing the publisher name." )
-				public_key = args.shift or help_now!( "Missing the publisher public key." )
 
 				Assemblage.use_run_directory( args.shift )
 
 				prompt.say "Approving connections from %s..." % [ name ]
-				Assemblage::Server.add_publisher( name, public_key )
+				Assemblage::Server.approve_connections( name, :publisher )
 				prompt.say "done."
 			end
 

          
@@ 176,17 176,17 @@ module Assemblage::CLI::ServerCommand
 				name = args.shift or help_now!( "Missing the repo name." )
 				type = args.shift or help_now!( "Missing the repo type." )
 				url = args.shift or help_now!( "Missing the repo URL." )
-				directory = args.shift || '.'
+
+				Assemblage.use_run_directory( args.shift )
 
 				unless Assemblage::VCSStrategy.available.include?( type )
 					help_now! "Invalid repo type %p; supported types are: %s" %
 						[ type, Assemblage::VCSStrategy.available.join(', ') ]
 				end
 
-				Assemblage.use_run_directory( args.shift )
 
 				prompt.say "Approving connections from %s..." % [ name ]
-				Assemblage::Server.add_repo( name, public_key )
+				Assemblage::Server.add_repo( name, type, url )
 				prompt.say "done."
 			end
 

          
M lib/assemblage/command/worker.rb +101 -35
@@ 4,16 4,23 @@ 
 require 'socket'
 require 'assemblage/cli' unless defined?( Assemblage::CLI )
 require 'assemblage/db_object'
+require 'assemblage/client'
+require 'assemblage/worker'
+
+
+# assemblage create worker worker1/ tcp://127.0.0.1:28118 "e*Mq%oyv0gcBt%dL(R?pUhJ+KqWCCdRvZPssjtH@"
+# assemblage create worker . tcp://127.0.0.1:28118 "e*Mq%oyv0gcBt%dL(R?pUhJ+KqWCCdRvZPssjtH@"
+# assemblage create worker tcp://127.0.0.1:28118 "e*Mq%oyv0gcBt%dL(R?pUhJ+KqWCCdRvZPssjtH@"
 
 # Aseemblage worker commands
 module Assemblage::CLI::WorkerCommand
 	extend Assemblage::CLI::Subcommand
 
 	CREATE_ADVICE = %{
-		Now you can register this worker with a server like so:
-		  assemblage worker add %{name} '%{public_key}' <server directory>
+		Now you can approve this worker for the server like so:
+		  assemblage server approve-worker %{name} <server directory>
 
-		Once it is registered, you can start the assembly worker like so:
+		Once it is approved, you can start the assembly worker like so:
 		  assemblage worker start %{directory}
 
 	}.gsub( /^\t+/, '' )

          
@@ 24,12 31,14 @@ module Assemblage::CLI::WorkerCommand
 	command :worker do |worker|
 
 		worker.desc 'Set up a new assembly worker'
-		worker.long_desc <<-END_DESC
-		Set up a new assembly worker in the given DIRECTORY. If the DIRECTORY is
-		not specified, the current directory will be used. If the target directory
-		is not empty, this command will abort.
+		worker.long_desc <<~END_DESC
+		Set up a new assembly worker in the given DIRECTORY for the server at SERVER_URL. If the
+		DIRECTORY is not specified, the current directory will be used. If the target
+		directory is not empty, this command will abort.
 		END_DESC
 		worker.arg :DIRECTORY
+		worker.arg :SERVER_URL
+		worker.arg :SERVER_KEY
 		worker.command :create do |create|
 
 			create.desc "Specify a name that will identify the worker on any " +

          
@@ 42,21 51,36 @@ module Assemblage::CLI::WorkerCommand
 			create.flag [:t, :tags], type: Array
 
 			create.action do |globals, options, args|
-				directory = Pathname( args.shift || '.' ).expand_path
+				directory, server_url, server_key = *args
+
+				# Infer a default worker dir of '.' if the first arg is a URL
+				if directory =~ %r<\A\p{Alpha}+://>
+					server_key = server_url
+					server_url = directory
+					directory = '.'
+				end
 
-				name = options.name || "%s-%s" %
-					[ Socket.gethostname.downcase.gsub('.', '-'), directory.basename ]
-				tags = options.tags
+				help_now!( "No server URL specified!" ) unless server_url
+				help_now!( "No server key specified!" ) unless server_key
+
+				directory = Pathname( directory ).expand_path
+				server_url = URI( server_url )
+
+				name = options.name || prompt_for_name( directory )
+				tags = options.tags || prompt_for_tags()
 
 				prompt.say "Creating a worker run directory in %s..." % [ directory ]
 				Assemblage::Worker.setup_run_directory( directory, name, tags )
 
-				prompt.say "Generating a worker key..."
-				Assemblage::Worker.generate_cert
+				prompt.say "Generating a cert..."
+				Assemblage::Worker.generate_cert( name ) or
+					raise "Failed to generate a new worker cert!"
+
+				prompt.say "Registering with %s..." % [ server_url ]
+				Assemblage::Worker.register_with_server( name, server_url, server_key )
 
 				msg = CREATE_ADVICE % {
 					name: name,
-					public_key: Assemblage::Worker.public_key,
 					directory: directory
 				}
 				prompt.say( msg )

          
@@ 65,7 89,7 @@ module Assemblage::CLI::WorkerCommand
 
 
 		worker.desc "Output the worker's public key"
-		worker.long_desc <<-END_DESC
+		worker.long_desc <<~END_DESC
 		Output the worker's public key to STDOUT.
 		END_DESC
 		worker.arg :DIRECTORY, :optional

          
@@ 80,29 104,31 @@ module Assemblage::CLI::WorkerCommand
 		end
 
 
-		worker.desc "Add a new server to a worker"
-		worker.long_desc <<-END_DESC
-		Add a cert and configuration for a new server to a worker run directory.
-		END_DESC
-		worker.arg :URL
-		worker.arg :PUBLIC_KEY
-		worker.arg :WORKER_DIRECTORY, :optional
-		worker.command 'add-server' do |add|
-
-			add.action do |globals, options, args|
-				url = args.shift or help_now!( "Missing the server url." )
-				public_key = args.shift or help_now!( "Missing the server public key." )
-
-				Assemblage.use_run_directory( args.shift )
-				Assemblage::Worker.add_server( url, public_key )
-
-				prompt.say "done."
-			end
-		end
+		# worker.desc "Add a new server to a worker"
+		# worker.long_desc <<~END_DESC
+		# Add a cert and configuration for a new server to a worker run directory.
+		# END_DESC
+		# worker.arg :URL
+		# worker.arg :PUBLIC_KEY
+		# worker.arg :WORKER_DIRECTORY, :optional
+		# worker.command 'add-server' do |add|
+		#
+		# 	add.action do |globals, options, args|
+		# 		url = args.shift or help_now!( "Missing the server url." )
+		# 		public_key = args.shift or help_now!( "Missing the server public key." )
+		#
+		# 		Assemblage.use_run_directory( args.shift )
+		#
+		# 		prompt.say "Registering with %s..." % [ server_url ]
+		# 		Assemblage::Worker.register_with_server( name, server_url, server_key )
+		#
+		# 		prompt.say "done."
+		# 	end
+		# end
 
 
 		worker.desc 'Start an assembly worker'
-		worker.long_desc <<-END_DESC
+		worker.long_desc <<~END_DESC
 		Start an Assemblage worker in the specified DIRECTORY. If not specified, the
 		DIRECTORY will default to the current working directory.
 		END_DESC

          
@@ 116,5 142,45 @@ module Assemblage::CLI::WorkerCommand
 
 	end
 
+
+
+	###############
+	module_function
+	###############
+
+	### Ask the user for the name of the worker, defaulting to one derived from
+	### the hostname and the specified +directory+.
+	def prompt_for_name( directory )
+		default = Assemblage::Worker::DEFAULT_NAME
+
+		return prompt.ask( "Worker name: " ) do |config|
+			config.required
+			config.default( default )
+			config.modify( :remove, :down )
+			config.validate( Assemblage::Worker::VALID_NAME )
+		end
+	end
+
+
+	### Ask the user for one or more tags for a new worker, returning them as an
+	### Array of Strings.
+	def prompt_for_tags
+		prompt.say( "Tags (one per line):" )
+		prompt.say( "Examples: os:freebsd-10, ruby:2.6, cpus:16" )
+
+		options = {
+			validate: Assemblage::Worker::VALID_TAG,
+			modify: %i[remove]
+		}
+
+		tags = []
+		while input = prompt.ask( '>>', options )
+			break if input == ''
+			tags << input
+		end
+
+		return tags
+	end
+
 end # module Assemblage::CLI::AddServer
 

          
A => lib/assemblage/connection.rb +74 -0
@@ 0,0 1,74 @@ 
+# -*- ruby -*-
+# frozen_string_literal: true
+
+require 'cztop'
+
+require 'assemblage' unless defined?( Assemblage )
+require 'assemblage/db_object'
+require 'assemblage/protocol'
+
+
+class Assemblage::Connection < Assemblage::DbObject( Sequel[:connections] )
+
+	# Maintain the timestamp fields automatically
+	plugin :timestamps
+
+
+	##
+	# The routing ID associated with the socket using this connection.
+	attr_accessor :routing_id
+
+
+	### Connection callback -- set up internal state when the connection is used.
+	def on_connected( server, routing_id )
+		self.routing_id = routing_id
+	end
+
+
+	### Callback from the server when a socket associated with this Connection is
+	### dropped.
+	def on_disconnected
+		self.log.debug "Connection for routing ID %p terminated." % [ self.routing_id ]
+	end
+
+
+	### Returns +true+ if the Connection is for an authenticated, approved client.
+	def authenticated?
+		return self.pk && self.name && ! self.modified?
+	end
+
+
+	### Build a response message destined for the receiving connection with the
+	### specified +command+, +data+, and +header+ and return it.
+	def response( command, data=[], **header )
+		routing_id = self.routing_id or
+			raise "Cannot respond to un-connected connection."
+
+		msg = Assemblage::Protocol.encode( command, data, **header )
+		msg.routing_id = routing_id
+
+		return msg
+	end
+
+
+	### Create an error response that will be sent to the receiving connection.
+	def error_response( error )
+		msg = Assemblage::Protocol.error_response( error )
+		msg.routing_id = self.routing_id
+		return msg
+	end
+
+
+	### Return a human-readable representation of the object suitable for debugging.
+	def inspect
+		return "#<%p:%#x %s routing_id: %p%s>" % [
+			self.class,
+			self.object_id * 2,
+			self.name || '(anonymous)',
+			self.routing_id || 'not connected',
+			self.frozen? ? ' *' : '',
+		]
+	end
+
+end # class Assemblage::Connection
+

          
M lib/assemblage/db_object.rb +25 -27
@@ 1,7 1,6 @@ 
 # -*- ruby -*-
-#encoding: utf-8
+# frozen_string_literal: true
 
-require 'tsort'
 require 'sequel'
 
 require 'assemblage' unless defined?( Assemblage )

          
@@ 22,8 21,7 @@ end
 
 
 class Assemblage::DbObject
-	extend TSort,
-	       Loggability,
+	extend Loggability,
 	       Assemblage::MethodUtilities,
 	       Configurability
 

          
@@ 107,17 105,15 @@ class Assemblage::DbObject
 	end
 
 
-	### Configurability interface -- Configure the Sequel connection
-	def self::configure( config=nil )
-		super
-
-		if self.uri
-			Loggability[ Assemblage::DbObject ].info "Connecting to %s" % [ self.uri ]
-			self.db = Sequel.connect( self.uri, self.options )
-			Assemblage::DbObject.setup_database
-			Assemblage::DbObject.require_models
+	### Returns +true+ if the database for the model classes exist.
+	def self::database_is_current?
+		return Loggability.with_level( :fatal ) do
+			Sequel::Migrator.is_current?( self.db, self.migrations_dir.to_s )
 		end
-
+	rescue Sequel::Migrator::Error => err
+		self.log.debug "Got a %p while checking to see if the database was current: %s" %
+			[ err.class, err.message ]
+		return false
 	end
 
 

          
@@ 131,18 127,6 @@ class Assemblage::DbObject
 	end
 
 
-	### Returns +true+ if the database for the model classes exist.
-	def self::database_is_current?
-		return Loggability.with_level( :fatal ) do
-			Sequel::Migrator.is_current?( self.db, self.migrations_dir.to_s )
-		end
-	rescue Sequel::Migrator::Error => err
-		self.log.debug "Got a %p while checking to see if the database was current: %s" %
-			[ err.class, err.message ]
-		return false
-	end
-
-
 	### Tear down the configured metastore database.
 	def self::teardown_database
 		self.log.info "Tearing down database schema..."

          
@@ 167,7 151,21 @@ class Assemblage::DbObject
 	end
 
 
-	Assemblage::DbObject.register_model( 'assemblage/client' )
+	### Configurability interface -- Configure the Sequel connection
+	def self::configure( config=nil )
+		super
+
+		if self.uri
+			Loggability[ Assemblage::DbObject ].info "Connecting to %s" % [ self.uri ]
+			self.db = Sequel.connect( self.uri, self.options )
+			Assemblage::DbObject.setup_database
+			Assemblage::DbObject.require_models
+		end
+
+	end
+
+
+	Assemblage::DbObject.register_model( 'assemblage/connection' )
 
 end # class Assemblage::DbObject
 

          
M lib/assemblage/protocol.rb +30 -18
@@ 3,12 3,14 @@ 
 
 require 'e2mmap'
 require 'msgpack'
+require 'loggability'
 require 'assemblage' unless defined?( Assemblage )
 
 
 # A container for functions that manipulate events in the Assemblage hub protocol
 module Assemblage::Protocol
-	extend Exception2MessageMapper,
+	extend Loggability,
+	       Exception2MessageMapper,
 	       Assemblage::DataUtilities
 
 

          
@@ 24,6 26,9 @@ module Assemblage::Protocol
 	}
 
 
+	# Use the Assemblage logger
+	log_to :assemblage
+
 	# Exceptions raised while decoding
 	def_exception :Error, "protocol error", ArgumentError
 

          
@@ 31,8 36,8 @@ module Assemblage::Protocol
 	### Check the specified message +header+ for sanity, and raise an
 	### Assemblage::Protocol::Error if there is a problem with it.
 	def self::check_message_header( header )
+		self.log.debug "Checking header: %p" % [ header ] if $VERBOSE
 		self.check_message_version( header )
-		self.check_message_type( header )
 	end
 
 

          
@@ 49,27 54,16 @@ module Assemblage::Protocol
 	end
 
 
-	### Check the `type` specified in the given message +header+, raising an
-	### Assemblage::Protocol::Error if there is a problem with it.
-	def self::check_message_type( header )
-		type = header[:type] or
-			raise Assemblage::Protocol::Error, "malformed message header: no type"
-		unless VALID_TYPE_PATTERN.match?( type )
-			raise Assemblage::Protocol::Error, "malformed message type: %p" % [ type ]
-		end
-	end
-
-
 	###############
 	module_function
 	###############
 
 	### Encode a message of the specified +type+ and return it as a CZTop::Message.
-	def encode( type, data, **header )
-		header = DEFAULT_HEADER.merge( symbolify_keys(header) ).merge( type: type )
+	def encode( command, data, **header )
+		header = DEFAULT_HEADER.merge( symbolify_keys(header) )
 		Assemblage::Protocol.check_message_header( header )
 
-		raw_message = [ header, data ]
+		raw_message = [ command, data, header ]
 		encoded = MessagePack.pack( raw_message )
 
 		return CZTop::Message.new( encoded )

          
@@ 82,16 76,23 @@ module Assemblage::Protocol
 		encoded = message[ 0 ]
 
 		raw_message = MessagePack.unpack( encoded )
-		header, data = *raw_message
+		self.log.debug "  raw message: %p" % [ raw_message ] if $VERBOSE
+		type, data, header = *raw_message
 		header = symbolify_keys( header )
 
 		Assemblage::Protocol.check_message_header( header )
 
-		type = header.delete( :type )
 		return type.to_sym, data, header
 	end
 
 
+	### Construct an error response message.
+	def error_response( error )
+		error = error.message if error.respond_to?( :message )
+		return Assemblage::Protocol.encode( :error, error.to_s )
+	end
+
+
 	### Construct a HELLO message from the specified +sender+.
 	def hello_from( sender )
 		type = sender.class.name.sub( /.*::/, '' ).downcase

          
@@ 108,4 109,15 @@ module Assemblage::Protocol
 	end
 
 
+	### Construct a STATUS message from the specified 
+	def status
+		return Assemblage::Protocol.encode( :status )
+	end
+
+
+	### Construct a response to the STATUS command.
+	def status_response( data )
+		return Assemblage::Protocol.encode( :status_response, data )
+	end
+
 end # module Assemblage::Protocol

          
M lib/assemblage/publisher.rb +56 -19
@@ 4,29 4,49 @@ 
 require 'loggability'
 
 require 'assemblage' unless defined?( Assemblage )
+require 'assemblage/client'
 
 
-# A client that pushes events for one or more repositories to the server.
+# A connection that pushes events for one or more repositories to the server.
 class Assemblage::Publisher
 	extend Loggability,
 		Configurability
 
+	# The name given to publishers by default
+	DEFAULT_NAME = "#{Socket.gethostname.gsub('.', '-').downcase}-publisher"
+
+	# A Regexp for matching valid publisher names
+	VALID_NAME = /\A[a-z](\w+)(-\w+)*\z/
+
+
 	# Loggability API -- log to the assemblage logger
 	log_to :assemblage
 
 
+	### Returns +true+ if the specified +name+ is valid for a publisher's name.
+	def self::valid_publisher_name?( name )
+		return Assemblage::Auth.valid_connection_name?( name )
+	end
+
+
+	# Configurability API -- declare some settings for Publishers.
 	configurability( 'assemblage.publisher' ) do
 
 		##
-		# The ZeroMQ endpoint URL of the server to publish to
-		setting :server_endpoint
+		# The name the publisher uses to identify itself
+		setting :name, default: DEFAULT_NAME do |name|
+			raise ArgumentError, "invalid publisher name %p" % [ name ] unless
+				Assemblage::Publisher.valid_publisher_name?( name )
+			name
+		end
 
 	end
 
 
-	### Generate a client CZTop::Certificate for the publisher.
-	def self::generate_cert
-		Assemblage::Auth.generate_local_cert unless Assemblage::Auth.has_local_cert?
+	### Generate a connection CZTop::Certificate for the publisher.
+	def self::generate_cert( name )
+		Assemblage::Auth.generate_local_cert( name, :publisher ) unless
+			Assemblage::Auth.has_local_cert?
 	end
 
 

          
@@ 36,20 56,9 @@ class Assemblage::Publisher
 	end
 
 
-	### Add a server with the specified +endpoint+ and +public_key+ to the current run
-	### directory.
-	def self::add_server( endpoint, public_key )
-		self.log.info "Adding server at %p with public key: %s" % [ endpoint, public_key ]
-		Assemblage::Auth.save_remote_cert( 'server', public_key )
-
-		Assemblage.config.assemblage.publisher.server_endpoint = endpoint
-		Assemblage.config.write
-	end
-
-
 	### Set up the +directory+ as a publisher run directory. Raises an
 	### exception if the directory already exists and is not empty.
-	def self::setup_run_directory( directory='.' )
+	def self::setup_run_directory( directory='.', name=DEFAULT_NAME )
 		directory = Pathname( directory || '.' )
 		raise "Directory not empty" if directory.exist? && !directory.empty?
 

          
@@ 59,11 68,39 @@ class Assemblage::Publisher
 
 		config = Assemblage.config || Configurability.default_config
 		config.assemblage.auth.cert_store_dir ||= (directory + 'certs').to_s
+		config.assemblage.publisher.name = name
 
 		Loggability.with_level( :fatal ) do
 			config.install
 		end
-		config.write( directory + Assemblage::DEFAULT_CONFIG_FILE )
+		config_path = directory + Assemblage::DEFAULT_CONFIG_FILE
+		config.write( config_path )
+		config.path = config_path
+	end
+
+
+	### Register the client as +name+ with the server at the specified +url+, using
+	### the given +public_key+ to authenticate.
+	def self::register_with_server( name, url, server_key )
+		server_key = server_key.public_key if server_key.respond_to?( :public_key )
+
+		# :TODO: Change this when/if publishers support posting to multiple servers.
+		server_cert = Assemblage::Auth.
+			save_remote_cert( 'server', server_key, type: :server, endpoint: url )
+
+		self.log.debug "Talking to the server to register"
+		cert = Assemblage::Auth.local_cert
+		client = Assemblage::Client.new( url.to_s, cert, server_cert )
+		client.on_message( :publisher_registered ) do |data, **headers|
+			self.log.debug "Client successfully registered."
+			client.stop
+		end
+		client.on_message( :error ) do |data, **header|
+			client.handle_error_message( data, **header )
+			client.stop
+		end
+		client.send_message( :register_publisher, name, cert.public_key )
+		client.start
 	end
 
 end # class Assemblage::Publisher

          
M lib/assemblage/repository.rb +0 -1
@@ 6,7 6,6 @@ require 'assemblage/db_object'
 
 
 class Assemblage::Repository < Assemblage::DbObject( :repositories )
-	extend Configurability
 
 
 	# Maintain the timestamp fields automatically

          
M lib/assemblage/server.rb +183 -99
@@ 29,17 29,23 @@ class Assemblage::Server
 		CZTop::Reactor::SocketMonitoring
 
 	# The list of signals the server responds to
-	HANDLED_SIGNALS = %i[TERM HUP INT] & Signal.list.keys.map( &:to_sym )
+	HANDLED_SIGNALS = %i[TERM HUP INT INFO ] & Signal.list.keys.map( &:to_sym )
 
 	# The ZAP authentication domain the server will use
 	ZAP_DOMAIN = 'assemblage'
 
-	# The list of valid commands for clients
-	VALID_COMMANDS = %i[
+	# The list of valid commands for authenticated connections
+	AUTHED_COMMANDS = %i[
 		status
 		status_report
 	]
 
+	# The list of valid commands for unauthenticated connections
+	UNAUTHED_COMMANDS = %i[
+		register_worker
+		register_publisher
+	]
+
 
 	# Log to the Assemblage logger
 	log_to :assemblage

          
@@ 60,7 66,7 @@ class Assemblage::Server
 
 		##
 		# :singleton-method:
-		# The default ZMQ endpoint to listen on for connections from clients and repos
+		# Configurable: The ZMQ endpoint for connections from workers and repos
 		setting :endpoint, default: 'tcp://127.0.0.1:*'
 
 	end

          
@@ 96,7 102,8 @@ class Assemblage::Server
 
 	### Generate a new server cert/keypair for authentication.
 	def self::generate_cert
-		Assemblage::Auth.generate_local_cert unless Assemblage::Auth.has_local_cert?
+		Assemblage::Auth.generate_local_cert( 'server', :server ) unless
+			Assemblage::Auth.has_local_cert?
 	end
 
 

          
@@ 112,19 119,32 @@ class Assemblage::Server
 	end
 
 
-	### Add a worker with the specified +name+ and +public_key+ to the current run
-	### directory.
-	def self::add_worker( name, public_key )
-		self.log.info "Adding worker %p with public key: %s" % [ name, public_key ]
-		Assemblage::Auth.save_remote_cert( name, public_key )
+	### Approve connections from the client with the specified +name+ and
+	### +public_key+ to the current run directory.
+	def self::approve_connections( name, type=:worker )
+		registered_type = self.connection_type( name )
+		raise "Cert for %p is not a %s cert." % [ name, type ] unless
+			registered_type == type.to_s
+
+		self.log.info "Approving %s %p" % [ type, name ]
+		conn = Assemblage::Connection.create( type: type.to_s, name: name )
+		self.log.debug "  created connection %d" % [ conn.pk ]
 	end
 
 
-	### Add a publisher with the specified +name+ and +public_key+ to the current run
-	### directory.
-	def self::add_publisher( name, public_key )
-		self.log.info "Adding publisher %p with public key: %s" % [ name, public_key ]
-		Assemblage::Auth.save_remote_cert( name, public_key )
+	### If connections from the client with the specified +name+ is registered,
+	### return the associated connection type.
+	def self::connection_type( name )
+		cert = Assemblage::Auth.remote_cert( name ) or
+			raise "No cert for client %p" % [ name ]
+		return cert[ Assemblage::Auth::CONNECTION_TYPE_KEY ]
+	end
+
+
+	### Returns +true+ if connections from the client with the specified +name+
+	### have been approved.
+	def self::connections_approved?( name )
+		return self.connection_type( name ) ? true : false
 	end
 
 

          
@@ 141,13 161,15 @@ class Assemblage::Server
 
 	### Create a new Assemblage::Server.
 	def initialize( endpoint: nil )
-		@endpoint     = endpoint || Assemblage::Server.endpoint
+		@endpoint      = endpoint || Assemblage::Server.endpoint
+
+		@reactor       = CZTop::Reactor.new
+		@authenticator = Assemblage::Auth.authenticator
 
-		@reactor      = CZTop::Reactor.new
-		@socket       = nil
-		@clients      = {}
-		@output_queue = []
-		@start_time   = nil
+		@socket        = nil
+		@connections   = {}
+		@output_queue  = []
+		@start_time    = nil
 	end
 
 

          
@@ 164,12 186,17 @@ class Assemblage::Server
 	attr_reader :reactor
 
 	##
+	# The server's ZAUTH authenticator agent
+	# Ref: http://czmq.zeromq.org/manual:zauth
+	attr_reader :authenticator
+
+	##
 	# The CZTop::Socket::SERVER the server uses for communication with repos and workers.
 	attr_reader :socket
 
 	##
-	# The Hash of connected Assemblage::Clients, keyed by their routing ID
-	attr_reader :clients
+	# The Hash of connected Assemblage::Connections, keyed by their routing ID
+	attr_reader :connections
 
 	##
 	# The queue of outgoing messages which are waiting for the socket to become writable.

          
@@ 185,7 212,7 @@ class Assemblage::Server
 		Assemblage::Auth.check_environment
 		self.log.info "Starting assembly server."
 
-		Assemblage::Auth.authenticator.verbose! if $DEBUG
+		self.authenticator.verbose! if $VERBOSE
 
 		@socket = self.create_server_socket
 		self.reactor.register( @socket, :read, &self.method(:on_socket_event) )

          
@@ 193,12 220,14 @@ class Assemblage::Server
 		self.log.debug "Starting event loop."
 		self.with_signal_handler( self.reactor, *HANDLED_SIGNALS ) do
 			self.with_socket_monitor( self.reactor, @socket ) do
-				@start_time = Time.now
+				@start_time = self.localtime
 				self.reactor.start_polling( ignore_interrupts: true )
 			end
 		end
 		@start_time = nil
 		self.log.debug "Exited event loop."
+
+		self.show_thread_status
 	end
 
 

          
@@ 206,6 235,7 @@ class Assemblage::Server
 	def stop
 		self.log.info "Stopping the assembly server."
 		self.reactor.stop_polling
+		Assemblage::Auth.reset
 	end
 
 

          
@@ 228,10 258,20 @@ class Assemblage::Server
 	end
 
 
+	### Return a floating-point Time suitable for measuring durations.
+	def localtime
+		if Process.respond_to?( :clock_gettime ) && Process.const_defined?( :CLOCK_MONOTONIC )
+			return Process.clock_gettime( Process::CLOCK_MONOTONIC )
+		else
+			return Time.now.to_f
+		end
+	end
+
+
 	### Return the number of seconds the server has been running if it is. Returns
 	### +nil+ otherwise.
 	def uptime
-		return (Time.now - self.start_time)
+		return (self.localtime - self.start_time)
 	end
 
 

          
@@ 239,22 279,46 @@ class Assemblage::Server
 	# Command methods
 	#
 
-	### Handle a `status` command for the specified +client+.
-	def handle_status_command( client, * )
+	### Handle a `status` command for the specified +connection+.
+	def handle_status_command( connection, * )
 		status = { version: Assemblage::VERSION, uptime: self.uptime }
-		self.queue_output_message( client.make_response(:status, status) )
+		self.queue_output_message( Assemblage::Protocol.status_response(status) )
+	end
+
+
+	### Handle a connection sending a status report.
+	def handle_status_report_command( connection, report, * )
+		self.log.debug "Connection %s sent a status report: %p" % [ connection, report ]
+	end
+
+
+	### Handle a connection asking for any assemblies pending for it.
+	def handle_fetch_assemblies_command( connection, criteria )
+		self.log.debug "Connection %s fetched assemblies: %p" % [ connection, criteria ]
 	end
 
 
-	### Handle a client sending a status report.
-	def handle_status_report_command( client, report, * )
-		self.log.debug "Client %s sent a status report: %p" % [ report ]
+	### Handle a connection asking to register a new worker.
+	def handle_register_worker_command( connection, data, header )
+		name, public_key = *data
+
+		self.log.info "Registering a new worker %p with key: %p" % [ name, public_key ]
+		Assemblage::Auth.save_remote_cert( name, public_key, type: :worker )
+
+		msg = connection.response( :worker_registered, [name] )
+		self.queue_output_message( msg )
 	end
 
 
-	### Handle a client asking for any assemblies pending for it.
-	def handle_fetch_assemblies_command( client, criteria )
-		
+	### Handle a connection asking to register a new publisher.
+	def handle_register_publisher_command( connection, data, header )
+		name, public_key = *data
+
+		self.log.info "Registering a new publisher %p with key: %p" % [ name, public_key ]
+		Assemblage::Auth.save_remote_cert( name, public_key, type: :publisher )
+
+		msg = connection.response( :publisher_registered, [name] )
+		self.queue_output_message( msg )
 	end
 
 

          
@@ 280,40 344,43 @@ class Assemblage::Server
 	end
 
 
-	### Handle incoming read events from connected/authed clients.
+	### Handle incoming read events from connected/authed connections.
 	def on_socket_event( event )
 		self.log.debug "Got socket event: %p" % [ event ]
 		if event.readable?
-			self.handle_client_input( event )
+			self.handle_connection_input( event )
 		elsif event.writable?
-			self.handle_client_output( event )
+			self.handle_connection_output( event )
 		else
 			raise "Socket event was neither readable nor writable!? (%s)" % [ event ]
 		end
 	end
 
 
-	### Read a message from a user and route it to their Client.
-	def handle_client_input( event )
+	### Read a message from a user and route it to their Connection.
+	def handle_connection_input( event )
 		raise "Server is shutting down" unless self.running?
 
-		self.log.debug "Handling client input."
-		message = event.socket.receive
-		frame = message.frames.first
+		self.log.debug "Handling connection input."
+		raw_message = event.socket.receive
+		frame       = raw_message.frames.first
+		command     = Assemblage::Protocol.decode( raw_message )
 
-		if (( client = self.client_for_sender(frame) ))
-			self.log.debug "Got message from %s" % [ frame.meta(Assemblage::Auth::CLIENT_NAME_KEY) ]
-			command = Assemblage::Protocol.decode( message )
-			self.handle_command( client, *command )
+		if (( conn = self.connection_for_sender(frame) ))
+			self.log.debug "Got message from %s" % [ conn ]
+			self.handle_command( conn, *command )
 		else
-			self.log.error "Read event from an unknown origin: routing_id: %p, client_name: %p" %
-				[ frame.routing_id, frame.meta(Assemblage::Auth::CLIENT_NAME_KEY) ]
+			self.log.info "No connection for input from routing_id: %p" %
+				[ frame.routing_id ]
+			msg = Assemblage::Protocol.encode( :disonnect )
+			msg.routing_id = frame.routing_id
+			self.queue_output_message( msg )
 		end
 	end
 
 
-	### Dequeue a message from the output queue and route it to the appropriate client.
-	def handle_client_output( event )
+	### Dequeue a message from the output queue and route it to the appropriate connection.
+	def handle_connection_output( event )
 		message = self.output_queue.shift
 		message.send_to( event.socket )
 	rescue IO::EAGAINWaitWritable => err

          
@@ 327,21 394,6 @@ class Assemblage::Server
 	end
 
 
-	### Handle the specified +command+ triple for +client+. The +command+ should be an Array
-	### of a command type (a Symbol), any data associated with the command, and a header Hash.
-	def handle_command( client, command, data, header )
-		raise "Invalid command %p" % [ command ] unless VALID_COMMANDS.include?( command )
-
-		method_name = "handle_%s_command" % [ command ]
-		callable = self.method( method_name )
-		callable.call( client, data, header )
-
-	rescue => err
-		self.log.error "%p while handling command: %s" % [ err.class, err.message ]
-		self.queue_output_message( client.make_error_response(err) )
-	end
-
-
 	### Queue the given +message+ for output on the server socket.
 	def queue_output_message( message )
 		self.output_queue << message

          
@@ 365,60 417,90 @@ class Assemblage::Server
 	end
 
 
-	### Find the client associated with the specified message +frame+ (a
+	### Find the connection associated with the specified message +frame+ (a
 	### CZTop::Frame), creating a new one if this is the first message from them.
-	def client_for_sender( frame )
+	def connection_for_sender( frame )
 		rid = frame.routing_id
-		return self.clients.fetch( rid ) do
-			require 'pry'; binding.pry
-			clientname = frame.meta( Assemblage::Auth::CLIENT_NAME_KEY )
+		return self.connections.fetch( rid ) do
+			connection_name = frame.meta( Assemblage::Auth::CONNECTION_NAME_KEY )
 
-			if clientname
-				self.log.debug "Looking up client %p" % [ clientname ]
-				self.connect_client( clientname, rid )
+			connection = nil
+			if connection_name
+				self.log.debug "Looking up connection %p" % [ connection_name ]
+				connection = Assemblage::Connection[ name: connection_name ] or
+					self.log.info "Ignoring frame from unapproved connection %p" %
+						[ connection_name ]
 			else
-				self.log.error "No client name associated with %p" % [ frame ]
-				nil
+				self.log.info "Making anonymous connection for routing ID %p" % [ rid ]
+				connection = Assemblage::Connection.new
 			end
+
+			if connection
+				connection.on_connected( self, rid )
+				connection.freeze
+
+				self.connections[ rid ] = connection
+			end
+
+			connection
 		end
 	end
 
 
-	### Look up the client associated with the specified +clientname+, connect it to
-	### the manager via the given +routing_id+, and return it.
-	def connect_client( clientname, routing_id )
-		self.log.debug "Looking up client: %s" % [ clientname ]
+	### Handle the specified +command+ triple for +connection+. The +command+ should be an Array
+	### of a command type (a Symbol), any data associated with the command, and a header Hash.
+	def handle_command( connection, command, data, header )
+		if connection.authenticated?
+			raise "Invalid command %p" % [ command ] unless
+				AUTHED_COMMANDS.include?( command )
+		else
+			raise "Invalid (unauthed) command %p" % [ command ] unless
+				UNAUTHED_COMMANDS.include?( command )
+		end
 
-		client = Assemblage::Client[ name: clientname ] or return nil
-		client.on_connected( self, routing_id )
-		self.clients[ routing_id ] = client
+		method_name = "handle_%s_command" % [ command ]
+		callable = self.method( method_name )
+		callable.call( connection, data, header )
 
-		return client
+	rescue => err
+		self.log.error "%p while handling command from %p: %s" %
+			[ err.class, connection, err.message ]
+		self.log.debug "  " + err.full_message( order: :bottom )
+
+		msg = connection.error_response( err )
+		self.queue_output_message( msg )
 	end
 
 
-	### Disconnect the specified +client+.
-	def disconnect_client( client )
-		self.log.info "Dropping connection for client '%s'" % [ client.name ]
-		routing_id = client.routing_id
+	### Disconnect the specified +connection+.
+	def disconnect_connection( connection )
+		self.log.info "Dropping connection for connection '%s'" % [ connection.name ]
+		routing_id = connection.routing_id
 
-		client.on_disconnected
+		connection.on_disconnected
 
-		message = Assemblage::Protocol.encode( :control, [:disconnect] )
-		message.routing_id = routing_id
+		message = connection.response( :disconnect )
 		self.queue_output_message( message )
 
-		self.log.debug "  deleting %p from: %p" % [ routing_id, self.clients ]
-		self.clients.delete( routing_id )
-		self.log.debug "  disconnected (%d client/s remain)." % [ self.clients.length ]
+		self.log.debug "  deleting %p from: %p" % [ routing_id, self.connections ]
+		self.connections.delete( routing_id )
+		self.log.debug "  disconnected (%d connection/s remain)." % [ self.connections.length ]
 	end
 
 
-	### Disconnect all currently-connected clients.
-	def disconnect_all_clients
-		self.log.info "Disconnecting %d client/s." % [ self.clients.length ]
-		self.clients.values.each do |client|
-			self.disconnect_client( client )
+	### Disconnect all currently-connected connections.
+	def disconnect_all_connections
+		self.log.info "Disconnecting %d connection/s." % [ self.connections.length ]
+		self.connections.values.each do |connection|
+			self.disconnect_connection( connection )
+		end
+	end
+
+
+	### Dump the status of all threads to the tty (SIGUSR1 handler)
+	def show_thread_status
+		Thread.list.each do |thr|
+			p thr
 		end
 	end
 

          
@@ 429,6 511,8 @@ class Assemblage::Server
 		case signal
 		when :INT, :TERM, :HUP
 			self.stop
+		when :INFO
+			self.show_thread_status
 		else
 			super
 		end

          
A => lib/assemblage/user.rb +23 -0
@@ 0,0 1,23 @@ 
+# -*- ruby -*-
+# frozen_string_literal: true
+
+require 'rbnacl'
+
+require 'assemblage' unless defined?( Assemblage )
+require 'assemblage/db_object'
+
+
+class Assemblage::User < Assemblage::DbObject( :users )
+
+
+	# Maintain the timestamp fields automatically
+	plugin :timestamps
+
+	#
+	# Associations
+	#
+
+end # class Assemblage::User
+
+
+

          
M lib/assemblage/worker.rb +106 -134
@@ 26,7 26,13 @@ class Assemblage::Worker
 	HANDLED_SIGNALS = %i[TERM HUP INT] & Signal.list.keys.map( &:to_sym )
 
 	# The name given to workers by default
-	DEFAULT_WORKER_NAME = "#{Socket.gethostname.gsub('.', '-').downcase}-worker1"
+	DEFAULT_NAME = "#{Socket.gethostname.gsub('.', '-').downcase}-worker"
+
+	# A Regexp for matching valid worker names
+	VALID_NAME = /\A[a-z](\w+)(-\w+)*\z/
+
+	# A Regexp for matching valid tags
+	VALID_TAG = /\A\p{Alnum}+(-\p{Alnum}+)*:\s*\p{Print}+\z/
 
 	# The list of valid actions for a worker
 	HANDLED_MESSAGE_TYPES = %i[hello goodbye new_assembly]

          
@@ 41,7 47,7 @@ class Assemblage::Worker
 
 	### Returns +true+ if the specified +name+ is valid for a worker's name.
 	def self::valid_worker_name?( name )
-		return Assemblage::Auth.valid_client_name?( name )
+		return Assemblage::Auth.valid_connection_name?( name )
 	end
 
 

          
@@ 50,7 56,7 @@ class Assemblage::Worker
 
 		##
 		# The name the worker uses to identify itself
-		setting :name, default: DEFAULT_WORKER_NAME do |name|
+		setting :name, default: DEFAULT_NAME do |name|
 			raise ArgumentError, "invalid worker name %p" % [ name ] unless
 				Assemblage::Worker.valid_worker_name?( name )
 			name

          
@@ 64,11 70,6 @@ class Assemblage::Worker
 			Array( tags )
 		end
 
-
-		##
-		# The assembly server the worker will listen to for work.
-		setting :server
-
 	end
 
 

          
@@ 112,9 113,10 @@ class Assemblage::Worker
 	end
 
 
-	### Generate a client CZTop::Certificate for the worker.
-	def self::generate_cert
-		Assemblage::Auth.generate_local_cert unless Assemblage::Auth.has_local_cert?
+	### Generate a connection CZTop::Certificate for the worker.
+	def self::generate_cert( name )
+		Assemblage::Auth.generate_local_cert( name, :worker ) unless
+			Assemblage::Auth.has_local_cert?
 	end
 
 

          
@@ 124,11 126,20 @@ class Assemblage::Worker
 	end
 
 
+	### Return the worker's private key as a Z85-encoded ASCII string.
+	def self::private_key
+		return Assemblage::Auth.local_cert.private_key
+	end
+
+
 	### Set up the Worker's directory as a worker run directory. Raises an
 	### exception if the directory already exists and is not empty.
-	def self::setup_run_directory( directory='.', name=DEFAULT_WORKER_NAME, tags=[] )
+	def self::setup_run_directory( directory='.', name=DEFAULT_NAME, tags=[] )
 		directory = Pathname( directory || '.' )
 		raise "Directory not empty" if directory.exist? && !directory.empty?
+		tags.each do |tag|
+			raise "Invalid tag %p" % [ tag ] unless tag.match( VALID_TAG )
+		end
 
 		self.log.debug "Attempting to set up %s as a run directory." % [ directory ]
 		directory.mkpath

          
@@ 142,25 153,43 @@ class Assemblage::Worker
 		Loggability.with_level( :fatal ) do
 			config.install
 		end
-		config.write( directory + Assemblage::DEFAULT_CONFIG_FILE )
+		config_path = directory + Assemblage::DEFAULT_CONFIG_FILE
+		config.write( config_path )
+		config.path = config_path
 	end
 
 
-	### Add a new server at the given +url+ to the current run directory.
-	def self::add_server( url, public_key )
+	### Register the client with the server at the specified +url+, using the given
+	### +public_key+ to authenticate.
+	def self::register_with_server( name, url, server_key )
+		server_key = server_key.public_key if server_key.respond_to?( :public_key )
+
 		config = Assemblage.config
-
 		if config&.path
 			self.log.debug "Writing server config to %s" % [ config.path ]
-			config.assemblage.worker.server = url
+			config.assemblage.worker.server = url.to_s
 			config.write
 		else
 			self.log.warn "Couldn't write server URL to the config: not loaded from a file!"
 		end
 
 		# :TODO: Change this when/if workers support listening to multiple servers.
-		name = 'server'
-		Assemblage::Auth.save_remote_cert( name, public_key )
+		server_cert = Assemblage::Auth.
+			save_remote_cert( 'server', server_key, type: :server, endpoint: url )
+
+		self.log.debug "Talking to the server to register"
+		cert = Assemblage::Auth.local_cert
+		client = Assemblage::Client.new( url.to_s, cert, server_cert )
+		client.on_message( :worker_registered ) do |data, **headers|
+			self.log.debug "Client successfully registered."
+			client.stop
+		end
+		client.on_message( :error ) do |data, **header|
+			client.handle_error_message( data, **header )
+			client.stop
+		end
+		client.send_message( :register_worker, name, cert.public_key )
+		client.start
 	end
 
 

          
@@ 178,15 207,16 @@ class Assemblage::Worker
 	### Create a nwe Assemblage::Worker.
 	def initialize( name: nil, server: nil, tags: nil )
 		@name       = name || Assemblage::Worker.name or raise "No worker name specified."
-		@server     = server || Assemblage::Worker.server or raise "No server specified."
 		@tags       = Array( tags || Assemblage::Worker.tags )
 		@reactor    = CZTop::Reactor.new
-		@socket     = nil
 		@start_time = nil
 
-		@assembly_builders   = []
-		@send_queue          = []
-		@status_report_timer = nil
+		@client               = nil
+
+		@assembly_builders    = []
+		@send_queue           = []
+		@status_report_timer  = nil
+		@fetch_assembly_timer = nil
 	end
 
 

          
@@ 196,24 226,20 @@ class Assemblage::Worker
 
 
 	##
-	# The client name associated with the worker
+	# The connection name associated with the worker
 	attr_reader :name
 
 	##
-	# The URL for the server the worker will accept assemblies from
-	attr_reader :server
-
-	##
 	# The CZTop::Reactor that handles asynchronous IO, timed events, and signals.
 	attr_reader :reactor
 
 	##
-	# The CZTop::Socket::SERVER the server uses for communication with repos and workers.
-	attr_reader :socket
+	# The Time the worker started
+	attr_accessor :start_time
 
 	##
-	# The Time the worker started
-	attr_accessor :start_time
+	# The Assemblage::Client the worker uses to communicate with its server.
+	attr_reader :client
 
 	##
 	# The Assemblage::AssemblyBuilders that have been created for pending

          
@@ 230,32 256,44 @@ class Assemblage::Worker
 		Assemblage::Auth.check_environment
 		self.log.info "Starting assembly worker '%s'." % [ self.name ]
 
-		@socket = self.create_client_socket
+		local_cert = Assemblage::Auth.local_cert or
+			raise "No local auth cert configured."
+		server_cert = Assemblage::Auth.remote_cert( 'server' ) or
+			raise "No server cert configured."
+		endpoint = server_cert['endpoint'] or
+			raise "Server cert doesn't have an endpoint set."
+
+		@client = Assemblage::Client.new( url.to_s, cert, server_cert )
 
-		self.log.debug "Starting event loop."
-		self.with_signal_handler( self.reactor, *HANDLED_SIGNALS ) do
-			self.with_socket_monitor( self.reactor, @socket ) do
-				@start_time = Time.now
-				self.fetch_pending_assemblies
-				self.start_assembly_timer
-				self.reactor.start_polling( ignore_interrupts: true )
-			end
+		@client.on_message( :error ) do |data, **header|
+			client.handle_error_message( data, **header )
+			client.stop
 		end
-		@start_time = nil
-		self.log.debug "Exited event loop."
+
+		@start_time = Time.now
+		@client.start
+	end
+
+
+	### Return the client's reactor.
+	### :TODO: This will eventually be the reactor that belongs to the worker that
+	### is delegated to multiple clients.
+	def reactor
+		return self.client.reactor
 	end
 
 
 	### Stop the worker.
 	def stop
 		self.log.info "Stopping the assembly worker."
-		self.reactor.stop_polling
+		self.stop_status_report_timer
+		self.client.stop
 	end
 
 
 	### Returns +true+ if the server is running.
 	def running?
-		return @start_time && ! self.reactor.empty?
+		return self.client.running?
 	end
 
 

          
@@ 265,37 303,41 @@ class Assemblage::Worker
 	end
 
 
-	### Return the CLIENT socket that's used to connect to the assembly server.
-	def create_client_socket
-		self.log.debug "Creating a CLIENT socket bound to: %s" % [ self.server ]
-		sock = CZTop::Socket::CLIENT.new
-
-		client_cert = Assemblage::Auth.local_cert
-		server_cert = Assemblage::Auth.remote_cert( 'server' )
-
-		self.log.debug "Connecting with %p and %p" % [ client_cert, server_cert ]
-		sock.CURVE_client!( client_cert, server_cert )
-		sock.connect( self.server )
-
-		return sock
+	### Return a Hash of the criteria the worker should use when
+	### requesting/subscribing to jobs frmo the server.
+	def job_criteria
+		platform = Gem::Platform.local
+		return {
+			os: platform.os,
+			cpu: platform.cpu,
+			os_version: platform.version,
+			ruby_version: RUBY_VERSION,
+		}
 	end
 
 
 	### Populate the worker's assembly builders with any assemblies that were queued
 	### for workers matching this one.
 	def fetch_pending_assemblies
-		
+		criteria = self.job_criteria
+		self.send_message( :fetch_assemblies, criteria )
 	end
 
 
 	### Periodically check for new assemblies to work on. If there are some, start
 	### working on them.
 	def start_assembly_timer
-		self.reactor.
+		@fetch_assembly_timer = self.reactor.
 			add_periodic_timer( ASSEMBLY_TIMER_INTERVAL, &self.method(:work_on_assemblies) )
 	end
 
 
+	### Stop the timer that checks for new assemblies to work on.
+	def stop_assembly_timer
+		self.reactor.remove_timer( @fetch_assembly_timer ) if @fetch_assembly_timer
+	end
+
+
 	### Return the AssemblyBuilder that is currently working, if any.
 	def current_assembly_builder
 		return self.assembly_builders.first

          
@@ 334,7 376,7 @@ class Assemblage::Worker
 
 	### Stop the timer that periodically reports on the worker's status.
 	def stop_status_report_timer
-		self.reactor.remove_timer( @status_report_timer )
+		self.reactor.remove_timer( @status_report_timer ) if @status_report_timer
 	end
 
 

          
@@ 352,83 394,13 @@ class Assemblage::Worker
 			uptime: self.uptime
 		}
 
-		message = Assemblage::Protocol.encode( :status_report, report )
-
-		self.send_message( message )
+		self.client.send_message( :status_report, report )
 	end
 
 
 	### Queue a result message for the worker.
 	def send_result( assembly_id, result )
-		message = Assemblage::Protocol.encode( :result, assembly_id, result )
-		self.send_message( message )
-	end
-
-
-	### Queue up the specified +message+ for sending to the server.
-	def send_message( message )
-		self.send_queue << message
-		self.reactor.enable_events( self.socket, :write ) unless
-			self.reactor.event_enabled?( self.socket, :write )
-	end
-
-
-	### Handle an event on the CLIENT socket.
-	def on_socket_event( event )
-		if event.readable?
-			self.handle_readable_io_event( event )
-		elsif event.writable?
-			self.handle_writable_io_event( event )
-		else
-			raise "Socket event was neither readable nor writable!? (%p)" % [ event ]
-		end
-	end
-
-
-	### Handle a readable event on a socket.
-	def handle_readable_io_event( event )
-		self.log.debug "Got socket read event: %p" % [ event ]
-		msg = event.socket.receive
-		type, data, header = Assemblage::Protocol.decode( msg )
-
-		unless HANDLED_MESSAGE_TYPES.include?( type )
-			self.log.error "Got unhandled message type %p" % [ type ]
-			raise "Invalid action %p!" % [ type ]
-		end
-
-		method_name = "on_%s_message" % [ type ]
-		handler = self.method( method_name )
-		handler.call( data, header )
-	end
-
-
-	### Handle the socket becoming writable by sending the next queued message to the hub and
-	### unregistered it from writable events if that empties the queue.
-	def handle_writable_io_event( event )
-		if message = self.send_queue.shift
-			message.send_to( self.socket )
-		else
-			self.reactor.disable_events( self.socket, :write )
-		end
-	end
-
-
-	### Handle a +signal+ trapped by the reactor.
-	def handle_signal( signal )
-		self.log.info "Handling %p signal." % [ signal ]
-		case signal
-		when :INT, :TERM, :HUP
-			self.stop
-		else
-			super
-		end
-	end
-
-
-	### Handle a `hello` message from the server.
-	def on_hello_message( info, * )
-		self.log.info "Connected. Waiting for an assembly to build."
-		super
+		self.client.send_message( :post_result, assembly_id, result )
 	end
 
 

          
A => lib/cztop/monkeypatches.rb +105 -0
@@ 0,0 1,105 @@ 
+# -*- ruby -*-
+# frozen_string_literal: true
+
+require 'cztop'
+
+
+# Monkeypatches that add some more stuff to CZTop classes.
+module CZTop::MonkeyPatches
+
+
+	# Mixin: adds more accessors for zsock options.
+	module MoreZsockOptions
+		include CZMQ::FFI
+
+		# Aliases for differently-cased options
+		def self::included( mod )
+			super
+			mod.instance_eval do
+				alias_method :plain_server, :PLAIN_server?
+				alias_method :plain_server=, :PLAIN_server=
+				alias_method :plain_username, :PLAIN_username
+				alias_method :plain_username=, :PLAIN_username=
+				alias_method :plain_password, :PLAIN_password
+				alias_method :plain_password=, :PLAIN_password=
+				alias_method :curve_server, :CURVE_server?
+				alias_method :curve_server=, :CURVE_server=
+				alias_method :curve_publickey, :CURVE_publickey
+				alias_method :curve_secretkey, :CURVE_secretkey
+				alias_method :curve_serverkey, :CURVE_serverkey
+				alias_method :curve_serverkey=, :CURVE_serverkey=
+			end
+		end
+
+
+		def gssapi_server; Zsock.gssapi_server(self.zocket); end
+		def gssapi_server=( value ); Zsock.set_gssapi_server(self.zocket, value); end
+		def gssapi_plaintext; Zsock.gssapi_plaintext(self.zocket); end
+		def gssapi_plaintext=( value ); Zsock.set_gssapi_plaintext(self.zocket, value); end
+		def gssapi_principal; Zsock.gssapi_principal(self.zocket); end
+		def gssapi_principal=( value ); Zsock.set_gssapi_principal(self.zocket, value); end
+		def gssapi_service_principal; Zsock.gssapi_service_principal(self.zocket); end
+		def gssapi_service_principal=( value ); Zsock.set_gssapi_service_principal(self.zocket, value); end
+		def immediate; Zsock.immediate(self.zocket); end
+		def immediate=( value ); Zsock.set_immediate(self.zocket, value); end
+		def type; Zsock.type(self.zocket); end
+		def type=( value ); Zsock.set_type(self.zocket, value); end
+		def affinity; Zsock.affinity(self.zocket); end
+		def affinity=( value ); Zsock.set_affinity(self.zocket, value); end
+		def rate; Zsock.rate(self.zocket); end
+		def rate=( value ); Zsock.set_rate(self.zocket, value); end
+		def recovery_ivl; Zsock.recovery_ivl(self.zocket); end
+		def recovery_ivl=( value ); Zsock.set_recovery_ivl(self.zocket, value); end
+		def sndbuf; Zsock.sndbuf(self.zocket); end
+		def sndbuf=( value ); Zsock.set_sndbuf(self.zocket, value); end
+		def rcvbuf; Zsock.rcvbuf(self.zocket); end
+		def rcvbuf=( value ); Zsock.set_rcvbuf(self.zocket, value); end
+		def reconnect_ivl; Zsock.reconnect_ivl(self.zocket); end
+		def reconnect_ivl=( value ); Zsock.set_reconnect_ivl(self.zocket, value); end
+		def reconnect_ivl_max; Zsock.reconnect_ivl_max(self.zocket); end
+		def reconnect_ivl_max=( value ); Zsock.set_reconnect_ivl_max(self.zocket, value); end
+		def backlog; Zsock.backlog(self.zocket); end
+		def backlog=( value ); Zsock.set_backlog(self.zocket, value); end
+		def maxmsgsize; Zsock.maxmsgsize(self.zocket); end
+		def maxmsgsize=( value ); Zsock.set_maxmsgsize(self.zocket, value); end
+		def multicast_hops; Zsock.multicast_hops(self.zocket); end
+		def multicast_hops=( value ); Zsock.set_multicast_hops(self.zocket, value); end
+		def tcp_keepalive; Zsock.tcp_keepalive(self.zocket); end
+		def tcp_keepalive=( value ); Zsock.set_tcp_keepalive(self.zocket, value); end
+		def tcp_keepalive_idle; Zsock.tcp_keepalive_idle(self.zocket); end
+		def tcp_keepalive_idle=( value ); Zsock.set_tcp_keepalive_idle(self.zocket, value); end
+		def tcp_keepalive_cnt; Zsock.tcp_keepalive_cnt(self.zocket); end
+		def tcp_keepalive_cnt=( value ); Zsock.set_tcp_keepalive_cnt(self.zocket, value); end
+		def tcp_keepalive_intvl; Zsock.tcp_keepalive_intvl(self.zocket); end
+		def tcp_keepalive_intvl=( value ); Zsock.set_tcp_keepalive_intvl(self.zocket, value); end
+		def rcvmore; Zsock.rcvmore(self.zocket); end
+		def rcvmore=( value ); Zsock.set_rcvmore(self.zocket, value); end
+		def last_endpoint; Zsock.last_endpoint(self.zocket); end
+		def last_endpoint=( value ); Zsock.set_last_endpoint(self.zocket, value); end
+		def ipv6; Zsock.ipv6(self.zocket); end
+		def ipv6=( value ); Zsock.set_ipv6(self.zocket, value); end
+		def ipv4only; Zsock.ipv4only(self.zocket); end
+		def ipv4only=( value ); Zsock.set_ipv4only(self.zocket, value); end
+		def tcp_accept_filter; Zsock.tcp_accept_filter(self.zocket); end
+		def tcp_accept_filter=( value ); Zsock.set_tcp_accept_filter(self.zocket, value); end
+
+	end # module MoreZsockOptions
+	CZTop::ZsockOptions::OptionsAccessor.include( MoreZsockOptions )
+
+
+	# Mixin: adds metadata accessor for zframes.
+	module MoreZframeMethods
+		include CZMQ::FFI
+
+		### Return metadata +property+ from the Frame.
+		def meta( property )
+			return ffi_delegate.meta( property.to_s )
+		end
+
+	end # module MoreZframeMethods
+	CZTop::Frame.include( MoreZframeMethods )
+
+
+end # module CZTop::MonkeyPatches
+
+

          
M spec/assemblage/auth_spec.rb +20 -6
@@ 26,7 26,7 @@ describe Assemblage::Auth do
 
 
 		it "can generate a new local cert" do
-			cert = described_class.generate_local_cert
+			cert = described_class.generate_local_cert( 'local' )
 			expect( cert ).to be_a( CZTop::Certificate )
 			expect( described_class.cert_store_dir + 'local' ).to exist
 			expect( described_class.cert_store_dir + 'local_secret' ).to exist

          
@@ 34,9 34,9 @@ describe Assemblage::Auth do
 
 
 		it "won't clobber an existing local cert" do
-			described_class.generate_local_cert
+			described_class.generate_local_cert( 'local' )
 			expect {
-				described_class.generate_local_cert
+				described_class.generate_local_cert( 'local' )
 			}.to raise_error( /already/i )
 		end
 

          
@@ 44,7 44,7 @@ describe Assemblage::Auth do
 		it "raises a sensible error when attempting to generate a cert without a cert_store_dir" do
 			described_class.cert_store_dir = nil
 			expect {
-				described_class.generate_local_cert
+				described_class.generate_local_cert( 'testing' )
 			}.to raise_error( /no local cert dir/i )
 		end
 

          
@@ 76,7 76,21 @@ describe Assemblage::Auth do
 			expect( cert ).to be_a( CZTop::Certificate )
 			expect( cert.public_key ).to_not be_empty
 			expect( cert.secret_key ).to be_nil
-			expect( described_class.client_name_for(cert) ).to eq( 'funkervogt' )
+			expect( described_class.connection_name_for(cert) ).to eq( 'funkervogt' )
+		end
+
+
+		it "can save remote certs with custom metadata" do
+			thirdparty_cert = CZTop::Certificate.new
+			pubkey = thirdparty_cert.public_key
+
+			cert = described_class.save_remote_cert( 'funkervogt', pubkey, endpoint: 'inproc:/foo' )
+
+			expect( cert ).to be_a( CZTop::Certificate )
+			expect( cert.public_key ).to_not be_empty
+			expect( cert.secret_key ).to be_nil
+			expect( described_class.connection_name_for(cert) ).to eq( 'funkervogt' )
+			expect( cert['endpoint'] ).to eq( 'inproc:/foo' )
 		end
 
 

          
@@ 89,7 103,7 @@ describe Assemblage::Auth do
 
 			expect( cert ).to be_a( CZTop::Certificate )
 			expect( cert.public_key ).to eq( pubkey )
-			expect( described_class.client_name_for(cert) ).to eq( 'adelios' )
+			expect( described_class.connection_name_for(cert) ).to eq( 'adelios' )
 		end
 
 	end