socksvc & scgisvc: make them async

Instead of socksvc being a blocking function call, make it return a "handle"
that can be used later to stop the thread and close sockets.

Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
5 files changed, 101 insertions(+), 59 deletions(-)

M include/jeffpc/scgisvc.h
M include/jeffpc/socksvc.h
M mapfile-vers
M scgisvc.c
M socksvc.c
M include/jeffpc/scgisvc.h +4 -3
@@ 1,5 1,5 @@ 
 /*
- * Copyright (c) 2017-2018 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
+ * Copyright (c) 2017-2018,2024 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal

          
@@ 80,7 80,8 @@ struct scgi {
 	} scgi_stats;
 };
 
-extern int scgisvc(const char *host, uint16_t port, int nthreads,
-		   const struct scgiops *ops, void *private);
+extern struct socksvc *scgisvc(const char *host, uint16_t port, int nthreads,
+			       const struct scgiops *ops, void *private);
+extern void scgisvc_stop(struct socksvc *svc);
 
 #endif

          
M include/jeffpc/socksvc.h +8 -4
@@ 1,5 1,5 @@ 
 /*
- * Copyright (c) 2015-2017 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
+ * Copyright (c) 2015-2017,2024 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal

          
@@ 31,8 31,12 @@ struct socksvc_stats {
 	uint64_t dequeued_time;	/* time when this conn started being processed */
 };
 
-extern int socksvc(const char *host, uint16_t port, int nthreads,
-		   void (*func)(int fd, struct socksvc_stats *, void *),
-		   void *private);
+struct socksvc;
+
+extern struct socksvc *socksvc(const char *host, uint16_t port, int nthreads,
+			       void (*func)(int fd, struct socksvc_stats *,
+					    void *),
+			       void *private);
+extern void socksvc_stop(struct socksvc *state);
 
 #endif

          
M mapfile-vers +1 -0
@@ 273,6 273,7 @@ JEFFPC_0.10 {
 
 		# socksvc
 		socksvc;
+		socksvc_stop;
 
 		# str / sym
 		_strsym_alloc;

          
M scgisvc.c +28 -9
@@ 1,5 1,5 @@ 
 /*
- * Copyright (c) 2014-2020 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
+ * Copyright (c) 2014-2020,2024 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal

          
@@ 420,16 420,35 @@ out:
 	scgi_free(req, false);
 }
 
-int scgisvc(const char *host, uint16_t port, int nthreads,
-	    const struct scgiops *ops, void *private)
+struct socksvc *scgisvc(const char *host, uint16_t port, int nthreads,
+			const struct scgiops *ops, void *private)
 {
-	struct scgiargs args = {
-		.ops = ops,
-		.private = private,
-	};
+	struct socksvc *svc;
+	struct scgiargs *args;
 
 	if (!ops || !ops->process)
-		return -EINVAL;
+		return ERR_PTR(-EINVAL);
+
+	args = malloc(sizeof(struct scgiargs));
+	if (!args)
+		return ERR_PTR(-ENOMEM);
+
+	args->ops = ops;
+	args->private = private;
+
+	svc = socksvc(host, port, nthreads, scgi_conn, args);
+	if (IS_ERR(svc))
+		free(args);
 
-	return socksvc(host, port, nthreads, scgi_conn, &args);
+	return svc;
 }
+
+void scgisvc_stop(struct socksvc *svc)
+{
+	if (!svc)
+		return;
+
+	/* FIXME: this leaks the heap-allocated args */
+
+	socksvc_stop(svc);
+}

          
M socksvc.c +60 -43
@@ 38,6 38,7 @@ 
 #include <jeffpc/mem.h>
 #include <jeffpc/sock.h>
 #include <jeffpc/socksvc.h>
+#include <jeffpc/thread.h>
 
 /*
  * This part of the communication library is a bit different.  It is meant

          
@@ 51,22 52,24 @@ 
 #define MAX_SOCK_FDS		8
 #define CONN_BACKLOG		64
 
-struct state {
+struct socksvc {
 	void *private;
 	struct taskq *taskq;
+	pthread_t thread;
 	void (*func)(int, struct socksvc_stats *, void *);
 	int fds[MAX_SOCK_FDS];
 	int nfds;
+
+	atomic_t server_shutdown;
 };
 
 struct cb {
 	struct socksvc_stats stats;
-	struct state *state;
+	struct socksvc *state;
 	int fd;
 };
 
 static struct mem_cache *socksvc_cache;
-static atomic_t server_shutdown;
 
 static void __attribute__((constructor)) init_socksvc_subsys(void)
 {

          
@@ 75,13 78,6 @@ static void __attribute__((constructor))
 	ASSERT(!IS_ERR(socksvc_cache));
 }
 
-static void sigterm_handler(int signum, siginfo_t *info, void *unused)
-{
-	cmn_err(CE_INFO, "SIGTERM received");
-
-	atomic_set(&server_shutdown, 1);
-}
-
 static void handle_signals(void)
 {
 	struct sigaction action;

          
@@ 96,17 92,9 @@ static void handle_signals(void)
 	if (ret)
 		cmn_err(CE_INFO, "Failed to ignore SIGPIPE: %s",
 			strerror(errno));
-
-	action.sa_sigaction = sigterm_handler;
-	action.sa_flags = SA_SIGINFO;
-
-	ret = sigaction(SIGTERM, &action, NULL);
-	if (ret)
-		cmn_err(CE_INFO, "Failed to set SIGTERM handler: %s",
-			strerror(errno));
 }
 
-static int bind_sock(struct state *state, int family, struct sockaddr *addr,
+static int bind_sock(struct socksvc *state, int family, struct sockaddr *addr,
 		     int addrlen)
 {
 	const int on = 1;

          
@@ 141,7 129,7 @@ err:
 	return ret;
 }
 
-static int start_listening(struct state *state, const char *host,
+static int start_listening(struct socksvc *state, const char *host,
 			   uint16_t port)
 {
 	struct addrinfo hints, *res, *p;

          
@@ 185,7 173,7 @@ static int start_listening(struct state 
 	return 0;
 }
 
-static void stop_listening(struct state *state)
+static void stop_listening(struct socksvc *state)
 {
 	int i;
 

          
@@ 205,8 193,9 @@ static void wrap_taskq_callback(void *ar
 	mem_cache_free(socksvc_cache, cb);
 }
 
-static void accept_conns(struct state *state)
+static void *accept_conns(void *private)
 {
+	struct socksvc *state = private;
 	fd_set set;
 	int maxfd;
 	int ret;

          
@@ 230,7 219,7 @@ static void accept_conns(struct state *s
 
 		select_time = gettime();
 
-		if (atomic_read(&server_shutdown))
+		if (atomic_read(&state->server_shutdown))
 			break;
 
 		if ((ret < 0) && (errno != EINTR))

          
@@ 278,46 267,74 @@ static void accept_conns(struct state *s
 			ret--;
 		}
 	}
+
+	return NULL;
 }
 
-int socksvc(const char *host, uint16_t port, int nthreads,
-	    void (*func)(int fd, struct socksvc_stats *, void *), void *private)
+struct socksvc *socksvc(const char *host, uint16_t port, int nthreads,
+			void (*func)(int fd, struct socksvc_stats *, void *),
+			void *private)
 {
 	char name[128];
-	struct state state;
+	struct socksvc *state;
 	int ret;
 
-	memset(&state, 0, sizeof(state));
-
-	state.func = func;
-	state.private = private;
+	state = zalloc(sizeof(struct socksvc));
+	if (!state)
+		return ERR_PTR(-ENOMEM);
 
-	snprintf(name, sizeof(name), "socksvc: %s:%d", host ? host : "<any>",
-		 port);
+	state->func = func;
+	state->private = private;
+	atomic_set(&state->server_shutdown, false);
 
-	state.taskq = taskq_create_fixed(name, nthreads);
-	if (IS_ERR(state.taskq)) {
-		ret = PTR_ERR(state.taskq);
+	state->taskq = taskq_create_fixed(name, nthreads);
+	if (IS_ERR(state->taskq)) {
+		ret = PTR_ERR(state->taskq);
 		goto err;
 	}
 
 	handle_signals();
 
-	ret = start_listening(&state, host, port);
+	ret = start_listening(state, host, port);
 	if (ret)
 		goto err_taskq;
 
-	accept_conns(&state);
-
-	stop_listening(&state);
+	ret = xthr_create(&state->thread, accept_conns, state);
+	if (ret)
+		goto err_listen;
 
-	taskq_wait(state.taskq);
+	return 0;
 
-	ret = 0;
+err_listen:
+	stop_listening(state);
 
 err_taskq:
-	taskq_destroy(state.taskq);
+	taskq_destroy(state->taskq);
 
 err:
-	return ret;
+	free(state);
+
+	return ERR_PTR(ret);
 }
+
+void socksvc_stop(struct socksvc *state)
+{
+	int ret;
+
+	if (!state)
+		return;
+
+	atomic_set(&state->server_shutdown, true);
+
+	ret = xthr_join(state->thread, NULL);
+	if (ret)
+		cmn_err(CE_ERROR, "%s: failed to join thread: %s\n", __func__,
+			xstrerror(ret));
+
+	stop_listening(state);
+
+	taskq_wait(state->taskq);
+	taskq_destroy(state->taskq);
+
+	free(state);
+}