@@ 21,6 21,7 @@
*/
#include <jeffpc/timer.h>
+#include <jeffpc/taskq.h>
#include <jeffpc/sock.h>
#include <hlog/qso.h>
@@ 30,6 31,7 @@
#include <hlog-rpc/announce-pub.h>
#include <hlog-rpc/announce-sub.h>
#include <hlog-rpc/rig-mode.h>
+#include <hlog-rpc/rpc-client.h>
#include "../common/config.h"
@@ 56,9 58,115 @@ static struct rig_info {
static struct lock rig_lock;
static LOCK_CLASS(rig_lc);
+struct fetch_args {
+ struct xuuid uuid;
+ union xsockaddr addr;
+ char hostname[sizeof(((struct instance_info *) NULL)->hostname)];
+};
+
+static struct taskq *fetch_tq;
static struct periodic info_periodic;
/*
+ * QSO
+ */
+static void print_fetch_error(const char *hostname,
+ const union xsockaddr *addr,
+ const struct xuuid *uuid,
+ int ret)
+
+{
+ char str[XUUID_PRINTABLE_STRING_LENGTH];
+ char addr_str[XSOCKADDR_STRLEN];
+
+ xuuid_unparse(uuid, str);
+
+ if (!xsockaddr_ntop(&addr->sa, addr_str, sizeof(addr_str)))
+ strcpy(addr_str, "???");
+
+ if (!hostname)
+ hostname = "???";
+
+ msg_print(MSG_QSO, CE_WARN, "Failed to retrieve announced QSO "
+ "from %s (%s): %s\n %s", addr_str, hostname,
+ xstrerror(ret), str);
+}
+
+static void fetch_qso(void *_args)
+{
+ struct fetch_args *args = _args;
+ struct qso *qso;
+ int ret;
+
+ qso = rpc_call_get_hlq(&args->addr, &args->uuid);
+ if (IS_ERR(qso)) {
+ ret = PTR_ERR(qso);
+ goto err;
+ }
+
+ /* TODO: add in sync related metadata */
+
+ /* TODO: call script_event_qso_done? somehow need to update dup structures */
+
+ ret = save_qso_and_update_index(qso, ANNOUNCE_QSO_SRC_ANNOUNCE);
+
+ qso_putref(qso);
+
+err:
+ if (ret)
+ print_fetch_error(args->hostname, &args->addr, &args->uuid,
+ ret);
+
+ free(args);
+}
+
+static void handle_qso_notification(enum announce_qso_msg_type type,
+ const struct instance_info *info,
+ const struct xuuid *uuid)
+{
+ const union xsockaddr *announce_addr = &info->addrs[0].addr;
+ struct fetch_args *args;
+ int ret;
+
+ if (info->self)
+ return; /* we already have our own QSOs */
+
+ ASSERT3U(info->num_addrs, >=, 1);
+
+ if (!info->rpc_port) {
+ ret = -EDESTADDRREQ;
+ args = NULL;
+ goto err;
+ }
+
+ args = malloc(sizeof(struct fetch_args));
+ if (!args) {
+ ret = -ENOMEM;
+ goto err;
+ }
+
+ args->uuid = *uuid;
+ args->addr = *announce_addr;
+ STATIC_ASSERT(sizeof(args->hostname) >= sizeof(info->hostname));
+ memcpy(args->hostname, info->hostname, sizeof(info->hostname));
+
+ /* stash the rpc port in the xsockaddr */
+ ret = xsockaddr_set_inet_port(&args->addr, info->rpc_port);
+ if (ret)
+ goto err;
+
+ ret = taskq_dispatch(fetch_tq, fetch_qso, args);
+
+err:
+ if (ret) {
+ print_fetch_error(info->hostname, args ? &args->addr : NULL,
+ uuid, ret);
+
+ free(args);
+ }
+}
+
+/*
* location
*/
static void handle_location_notification(enum announce_loc_msg_type type,
@@ 269,15 377,21 @@ int contest_core_init(struct contest_par
params->template = template;
+ fetch_tq = taskq_create_fixed("qso-fetch", -1);
+ if (IS_ERR(fetch_tq)) {
+ ret = PTR_ERR(fetch_tq);
+ goto err_free;
+ }
+
/*
* Note: the announcement callbacks make use of the template
*/
ret = announce_connect(net_update_row,
- NULL,
+ handle_qso_notification,
handle_location_notification,
handle_rig_notification);
if (ret)
- goto err_free;
+ goto err_taskq;
/*
* Note: the periodic timer makes uses of the template!
@@ 288,6 402,9 @@ int contest_core_init(struct contest_par
return 0;
+err_taskq:
+ taskq_destroy(fetch_tq);
+
err_free:
qso_putref(template);
@@ 305,6 422,9 @@ void contest_core_cleanup(void)
/* disconnect from announcements - a little racy, but good enough */
announce_connect(NULL, NULL, NULL, NULL);
+ taskq_wait(fetch_tq);
+ taskq_destroy(fetch_tq);
+
qso_putref(contest_params->template);
MXDESTROY(&rig_lock);