M include/jeffpc/scgisvc.h +4 -3
@@ 1,5 1,5 @@
/*
- * Copyright (c) 2017-2018 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
+ * Copyright (c) 2017-2018,2024 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ 80,7 80,8 @@ struct scgi {
} scgi_stats;
};
-extern int scgisvc(const char *host, uint16_t port, int nthreads,
- const struct scgiops *ops, void *private);
+extern struct socksvc *scgisvc(const char *host, uint16_t port, int nthreads,
+ const struct scgiops *ops, void *private);
+extern void scgisvc_stop(struct socksvc *svc);
#endif
M include/jeffpc/socksvc.h +8 -4
@@ 1,5 1,5 @@
/*
- * Copyright (c) 2015-2017 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
+ * Copyright (c) 2015-2017,2024 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ 31,8 31,12 @@ struct socksvc_stats {
uint64_t dequeued_time; /* time when this conn started being processed */
};
-extern int socksvc(const char *host, uint16_t port, int nthreads,
- void (*func)(int fd, struct socksvc_stats *, void *),
- void *private);
+struct socksvc;
+
+extern struct socksvc *socksvc(const char *host, uint16_t port, int nthreads,
+ void (*func)(int fd, struct socksvc_stats *,
+ void *),
+ void *private);
+extern void socksvc_stop(struct socksvc *state);
#endif
M mapfile-vers +1 -0
@@ 273,6 273,7 @@ JEFFPC_0.10 {
# socksvc
socksvc;
+ socksvc_stop;
# str / sym
_strsym_alloc;
M scgisvc.c +28 -9
@@ 1,5 1,5 @@
/*
- * Copyright (c) 2014-2020 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
+ * Copyright (c) 2014-2020,2024 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ 420,16 420,35 @@ out:
scgi_free(req, false);
}
-int scgisvc(const char *host, uint16_t port, int nthreads,
- const struct scgiops *ops, void *private)
+struct socksvc *scgisvc(const char *host, uint16_t port, int nthreads,
+ const struct scgiops *ops, void *private)
{
- struct scgiargs args = {
- .ops = ops,
- .private = private,
- };
+ struct socksvc *svc;
+ struct scgiargs *args;
if (!ops || !ops->process)
- return -EINVAL;
+ return ERR_PTR(-EINVAL);
+
+ args = malloc(sizeof(struct scgiargs));
+ if (!args)
+ return ERR_PTR(-ENOMEM);
+
+ args->ops = ops;
+ args->private = private;
+
+ svc = socksvc(host, port, nthreads, scgi_conn, args);
+ if (IS_ERR(svc))
+ free(args);
- return socksvc(host, port, nthreads, scgi_conn, &args);
+ return svc;
}
+
+void scgisvc_stop(struct socksvc *svc)
+{
+ if (!svc)
+ return;
+
+ /* FIXME: this leaks the heap-allocated args */
+
+ socksvc_stop(svc);
+}
M socksvc.c +60 -43
@@ 38,6 38,7 @@
#include <jeffpc/mem.h>
#include <jeffpc/sock.h>
#include <jeffpc/socksvc.h>
+#include <jeffpc/thread.h>
/*
* This part of the communication library is a bit different. It is meant
@@ 51,22 52,24 @@
#define MAX_SOCK_FDS 8
#define CONN_BACKLOG 64
-struct state {
+struct socksvc {
void *private;
struct taskq *taskq;
+ pthread_t thread;
void (*func)(int, struct socksvc_stats *, void *);
int fds[MAX_SOCK_FDS];
int nfds;
+
+ atomic_t server_shutdown;
};
struct cb {
struct socksvc_stats stats;
- struct state *state;
+ struct socksvc *state;
int fd;
};
static struct mem_cache *socksvc_cache;
-static atomic_t server_shutdown;
static void __attribute__((constructor)) init_socksvc_subsys(void)
{
@@ 75,13 78,6 @@ static void __attribute__((constructor))
ASSERT(!IS_ERR(socksvc_cache));
}
-static void sigterm_handler(int signum, siginfo_t *info, void *unused)
-{
- cmn_err(CE_INFO, "SIGTERM received");
-
- atomic_set(&server_shutdown, 1);
-}
-
static void handle_signals(void)
{
struct sigaction action;
@@ 96,17 92,9 @@ static void handle_signals(void)
if (ret)
cmn_err(CE_INFO, "Failed to ignore SIGPIPE: %s",
strerror(errno));
-
- action.sa_sigaction = sigterm_handler;
- action.sa_flags = SA_SIGINFO;
-
- ret = sigaction(SIGTERM, &action, NULL);
- if (ret)
- cmn_err(CE_INFO, "Failed to set SIGTERM handler: %s",
- strerror(errno));
}
-static int bind_sock(struct state *state, int family, struct sockaddr *addr,
+static int bind_sock(struct socksvc *state, int family, struct sockaddr *addr,
int addrlen)
{
const int on = 1;
@@ 141,7 129,7 @@ err:
return ret;
}
-static int start_listening(struct state *state, const char *host,
+static int start_listening(struct socksvc *state, const char *host,
uint16_t port)
{
struct addrinfo hints, *res, *p;
@@ 185,7 173,7 @@ static int start_listening(struct state
return 0;
}
-static void stop_listening(struct state *state)
+static void stop_listening(struct socksvc *state)
{
int i;
@@ 205,8 193,9 @@ static void wrap_taskq_callback(void *ar
mem_cache_free(socksvc_cache, cb);
}
-static void accept_conns(struct state *state)
+static void *accept_conns(void *private)
{
+ struct socksvc *state = private;
fd_set set;
int maxfd;
int ret;
@@ 230,7 219,7 @@ static void accept_conns(struct state *s
select_time = gettime();
- if (atomic_read(&server_shutdown))
+ if (atomic_read(&state->server_shutdown))
break;
if ((ret < 0) && (errno != EINTR))
@@ 278,46 267,74 @@ static void accept_conns(struct state *s
ret--;
}
}
+
+ return NULL;
}
-int socksvc(const char *host, uint16_t port, int nthreads,
- void (*func)(int fd, struct socksvc_stats *, void *), void *private)
+struct socksvc *socksvc(const char *host, uint16_t port, int nthreads,
+ void (*func)(int fd, struct socksvc_stats *, void *),
+ void *private)
{
char name[128];
- struct state state;
+ struct socksvc *state;
int ret;
- memset(&state, 0, sizeof(state));
-
- state.func = func;
- state.private = private;
+ state = zalloc(sizeof(struct socksvc));
+ if (!state)
+ return ERR_PTR(-ENOMEM);
- snprintf(name, sizeof(name), "socksvc: %s:%d", host ? host : "<any>",
- port);
+ state->func = func;
+ state->private = private;
+ atomic_set(&state->server_shutdown, false);
- state.taskq = taskq_create_fixed(name, nthreads);
- if (IS_ERR(state.taskq)) {
- ret = PTR_ERR(state.taskq);
+ state->taskq = taskq_create_fixed(name, nthreads);
+ if (IS_ERR(state->taskq)) {
+ ret = PTR_ERR(state->taskq);
goto err;
}
handle_signals();
- ret = start_listening(&state, host, port);
+ ret = start_listening(state, host, port);
if (ret)
goto err_taskq;
- accept_conns(&state);
-
- stop_listening(&state);
+ ret = xthr_create(&state->thread, accept_conns, state);
+ if (ret)
+ goto err_listen;
- taskq_wait(state.taskq);
+ return 0;
- ret = 0;
+err_listen:
+ stop_listening(state);
err_taskq:
- taskq_destroy(state.taskq);
+ taskq_destroy(state->taskq);
err:
- return ret;
+ free(state);
+
+ return ERR_PTR(ret);
}
+
+void socksvc_stop(struct socksvc *state)
+{
+ int ret;
+
+ if (!state)
+ return;
+
+ atomic_set(&state->server_shutdown, true);
+
+ ret = xthr_join(state->thread, NULL);
+ if (ret)
+ cmn_err(CE_ERROR, "%s: failed to join thread: %s\n", __func__,
+ xstrerror(ret));
+
+ stop_listening(state);
+
+ taskq_wait(state->taskq);
+ taskq_destroy(state->taskq);
+
+ free(state);
+}