contest: fetch & save QSOs from announcements

Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
1 files changed, 122 insertions(+), 2 deletions(-)

M hlog/contest/core.c
M hlog/contest/core.c +122 -2
@@ 21,6 21,7 @@ 
  */
 
 #include <jeffpc/timer.h>
+#include <jeffpc/taskq.h>
 #include <jeffpc/sock.h>
 
 #include <hlog/qso.h>

          
@@ 30,6 31,7 @@ 
 #include <hlog-rpc/announce-pub.h>
 #include <hlog-rpc/announce-sub.h>
 #include <hlog-rpc/rig-mode.h>
+#include <hlog-rpc/rpc-client.h>
 
 #include "../common/config.h"
 

          
@@ 56,9 58,115 @@ static struct rig_info {
 static struct lock rig_lock;
 static LOCK_CLASS(rig_lc);
 
+struct fetch_args {
+	struct xuuid uuid;
+	union xsockaddr addr;
+	char hostname[sizeof(((struct instance_info *) NULL)->hostname)];
+};
+
+static struct taskq *fetch_tq;
 static struct periodic info_periodic;
 
 /*
+ * QSO
+ */
+static void print_fetch_error(const char *hostname,
+			      const union xsockaddr *addr,
+			      const struct xuuid *uuid,
+			      int ret)
+
+{
+	char str[XUUID_PRINTABLE_STRING_LENGTH];
+	char addr_str[XSOCKADDR_STRLEN];
+
+	xuuid_unparse(uuid, str);
+
+	if (!xsockaddr_ntop(&addr->sa, addr_str, sizeof(addr_str)))
+		strcpy(addr_str, "???");
+
+	if (!hostname)
+		hostname = "???";
+
+	msg_print(MSG_QSO, CE_WARN, "Failed to retrieve announced QSO "
+		  "from %s (%s): %s\n  %s", addr_str, hostname,
+		  xstrerror(ret), str);
+}
+
+static void fetch_qso(void *_args)
+{
+	struct fetch_args *args = _args;
+	struct qso *qso;
+	int ret;
+
+	qso = rpc_call_get_hlq(&args->addr, &args->uuid);
+	if (IS_ERR(qso)) {
+		ret = PTR_ERR(qso);
+		goto err;
+	}
+
+	/* TODO: add in sync related metadata */
+
+	/* TODO: call script_event_qso_done? somehow need to update dup structures */
+
+	ret = save_qso_and_update_index(qso, ANNOUNCE_QSO_SRC_ANNOUNCE);
+
+	qso_putref(qso);
+
+err:
+	if (ret)
+		print_fetch_error(args->hostname, &args->addr, &args->uuid,
+				  ret);
+
+	free(args);
+}
+
+static void handle_qso_notification(enum announce_qso_msg_type type,
+				    const struct instance_info *info,
+				    const struct xuuid *uuid)
+{
+	const union xsockaddr *announce_addr = &info->addrs[0].addr;
+	struct fetch_args *args;
+	int ret;
+
+	if (info->self)
+		return; /* we already have our own QSOs */
+
+	ASSERT3U(info->num_addrs, >=, 1);
+
+	if (!info->rpc_port) {
+		ret = -EDESTADDRREQ;
+		args = NULL;
+		goto err;
+	}
+
+	args = malloc(sizeof(struct fetch_args));
+	if (!args) {
+		ret = -ENOMEM;
+		goto err;
+	}
+
+	args->uuid = *uuid;
+	args->addr = *announce_addr;
+	STATIC_ASSERT(sizeof(args->hostname) >= sizeof(info->hostname));
+	memcpy(args->hostname, info->hostname, sizeof(info->hostname));
+
+	/* stash the rpc port in the xsockaddr */
+	ret = xsockaddr_set_inet_port(&args->addr, info->rpc_port);
+	if (ret)
+		goto err;
+
+	ret = taskq_dispatch(fetch_tq, fetch_qso, args);
+
+err:
+	if (ret) {
+		print_fetch_error(info->hostname, args ? &args->addr : NULL,
+				  uuid, ret);
+
+		free(args);
+	}
+}
+
+/*
  * location
  */
 static void handle_location_notification(enum announce_loc_msg_type type,

          
@@ 269,15 377,21 @@ int contest_core_init(struct contest_par
 
 	params->template = template;
 
+	fetch_tq = taskq_create_fixed("qso-fetch", -1);
+	if (IS_ERR(fetch_tq)) {
+		ret = PTR_ERR(fetch_tq);
+		goto err_free;
+	}
+
 	/*
 	 * Note: the announcement callbacks make use of the template
 	 */
 	ret = announce_connect(net_update_row,
-			       NULL,
+			       handle_qso_notification,
 			       handle_location_notification,
 			       handle_rig_notification);
 	if (ret)
-		goto err_free;
+		goto err_taskq;
 
 	/*
 	 * Note: the periodic timer makes uses of the template!

          
@@ 288,6 402,9 @@ int contest_core_init(struct contest_par
 
 	return 0;
 
+err_taskq:
+	taskq_destroy(fetch_tq);
+
 err_free:
 	qso_putref(template);
 

          
@@ 305,6 422,9 @@ void contest_core_cleanup(void)
 	/* disconnect from announcements - a little racy, but good enough */
 	announce_connect(NULL, NULL, NULL, NULL);
 
+	taskq_wait(fetch_tq);
+	taskq_destroy(fetch_tq);
+
 	qso_putref(contest_params->template);
 
 	MXDESTROY(&rig_lock);