Explorar o código

Add "STOP_FLAG_NEEDS_LOCK" define

The ctx->stop_flag is a boolean flag used to stop the server. It is written
once in "mg_stop" from 0 to 1. All CivetWeb threads check this flag in various
functions in a loop.
The flag is small enough to be read and written atomically, and it is declared
volatile (C volatile, not C++ volatile), so the compiler must not assume the
value will not change in another thread.
Still, tools like thread sanitizer raise a warning for this kind of exit
notification ... there could be an compiler+architecture combination that
might not understand this pattern. I did not encounter any such combination
in more than 20 years, but ... in theory ... such a combination might exist.
Thus, the STOP_FLAG_NEEDS_LOCK preprocessor define has been added.

See also #861
bel2125 %!s(int64=5) %!d(string=hai) anos
pai
achega
486ca49c49
Modificáronse 3 ficheiros con 81 adicións e 32 borrados
  1. 3 0
      examples/embedded_cpp/embedded_cpp.cpp
  2. 77 31
      src/civetweb.c
  3. 1 1
      src/timer.inl

+ 3 - 0
examples/embedded_cpp/embedded_cpp.cpp

@@ -380,6 +380,8 @@ class WebSocketHandler : public CivetWebSocketHandler {
 int
 main(int argc, char *argv[])
 {
+	mg_init_library(0);
+	
 	const char *options[] = {
 	    "document_root", DOCUMENT_ROOT, "listening_ports", PORT, 0};
     
@@ -438,6 +440,7 @@ main(int argc, char *argv[])
 	}
 
 	printf("Bye!\n");
+	mg_exit_library();
 
 	return 0;
 }

+ 77 - 31
src/civetweb.c

@@ -2744,6 +2744,44 @@ struct mg_domain_context {
 };
 
 
+/* Stop flag can be "volatile" or require a lock */
+typedef int volatile stop_flag_t;
+
+#ifdef STOP_FLAG_NEEDS_LOCK
+static int
+STOP_FLAG_IS_ZERO(stop_flag_t *f)
+{
+	int r;
+	mg_global_lock();
+	r = ((*f) == 0);
+	mg_global_unlock();
+	return r;
+}
+
+static int
+STOP_FLAG_IS_TWO(stop_flag_t *f)
+{
+	int r;
+	mg_global_lock();
+	r = ((*f) == 2);
+	mg_global_unlock();
+	return r;
+}
+
+static void
+STOP_FLAG_ASSIGN(stop_flag_t *f, int v)
+{
+	mg_global_lock();
+	(*f) = v;
+	mg_global_unlock();
+}
+#else /* STOP_FLAG_NEEDS_LOCK */
+#define STOP_FLAG_IS_ZERO(f) ((*(f)) == 0)
+#define STOP_FLAG_IS_TWO(f) ((*(f)) == 2)
+#define STOP_FLAG_ASSIGN(f, v) ((*(f)) = (v))
+#endif /* STOP_FLAG_NEEDS_LOCK */
+
+
 struct mg_context {
 
 	/* Part 1 - Physical context:
@@ -2772,7 +2810,7 @@ struct mg_context {
 #endif
 
 	/* Thread related */
-	volatile int stop_flag;       /* Should we stop event loop */
+	stop_flag_t stop_flag;        /* Should we stop event loop */
 	pthread_mutex_t thread_mutex; /* Protects (max|num)_threads */
 
 	pthread_t masterthreadid; /* The master thread ID */
@@ -6338,7 +6376,7 @@ static int
 mg_poll(struct mg_pollfd *pfd,
         unsigned int n,
         int milliseconds,
-        volatile int *stop_server)
+        stop_flag_t *stop_flag)
 {
 	/* Call poll, but only for a maximum time of a few seconds.
 	 * This will allow to stop the server after some seconds, instead
@@ -6348,7 +6386,7 @@ mg_poll(struct mg_pollfd *pfd,
 	do {
 		int result;
 
-		if (*stop_server) {
+		if (!STOP_FLAG_IS_ZERO(&*stop_flag)) {
 			/* Shut down signal */
 			return -2;
 		}
@@ -6469,7 +6507,7 @@ push_inner(struct mg_context *ctx,
 			}
 		}
 
-		if (ctx->stop_flag) {
+		if (!STOP_FLAG_IS_ZERO(&ctx->stop_flag)) {
 			return -2;
 		}
 
@@ -6505,7 +6543,7 @@ push_inner(struct mg_context *ctx,
 			pfd[0].fd = sock;
 			pfd[0].events = POLLOUT;
 			pollres = mg_poll(pfd, 1, (int)(ms_wait), &(ctx->stop_flag));
-			if (ctx->stop_flag) {
+			if (!STOP_FLAG_IS_ZERO(&ctx->stop_flag)) {
 				return -2;
 			}
 			if (pollres > 0) {
@@ -6548,7 +6586,7 @@ push_all(struct mg_context *ctx,
 		timeout = atoi(ctx->dd.config[REQUEST_TIMEOUT]) / 1000.0;
 	}
 
-	while ((len > 0) && (ctx->stop_flag == 0)) {
+	while ((len > 0) && STOP_FLAG_IS_ZERO(&ctx->stop_flag)) {
 		n = push_inner(ctx, fp, sock, ssl, buf + nwritten, len, timeout);
 		if (n < 0) {
 			if (nwritten == 0) {
@@ -6652,7 +6690,7 @@ pull_inner(FILE *fp,
 		                  1,
 		                  (int)(timeout * 1000.0),
 		                  &(conn->phys_ctx->stop_flag));
-		if (conn->phys_ctx->stop_flag) {
+		if (!STOP_FLAG_IS_ZERO(&conn->phys_ctx->stop_flag)) {
 			return -2;
 		}
 		if (pollres > 0) {
@@ -6691,7 +6729,7 @@ pull_inner(FILE *fp,
 		                  1,
 		                  (int)(timeout * 1000.0),
 		                  &(conn->phys_ctx->stop_flag));
-		if (conn->phys_ctx->stop_flag) {
+		if (!STOP_FLAG_IS_ZERO(&conn->phys_ctx->stop_flag)) {
 			return -2;
 		}
 		if (pollres > 0) {
@@ -6710,7 +6748,7 @@ pull_inner(FILE *fp,
 		}
 	}
 
-	if (conn->phys_ctx->stop_flag) {
+	if (!STOP_FLAG_IS_ZERO(&conn->phys_ctx->stop_flag)) {
 		return -2;
 	}
 
@@ -6782,7 +6820,7 @@ pull_all(FILE *fp, struct mg_connection *conn, char *buf, int len)
 		timeout_ns = (uint64_t)(timeout * 1.0E9);
 	}
 
-	while ((len > 0) && (conn->phys_ctx->stop_flag == 0)) {
+	while ((len > 0) && STOP_FLAG_IS_ZERO(&conn->phys_ctx->stop_flag)) {
 		n = pull_inner(fp, conn, buf + nread, len, timeout);
 		if (n == -2) {
 			if (nread == 0) {
@@ -7021,7 +7059,8 @@ mg_write(struct mg_connection *conn, const void *buf, size_t len)
 		    == allowed) {
 			buf = (const char *)buf + total;
 			conn->last_throttle_bytes += total;
-			while ((total < (int)len) && (conn->phys_ctx->stop_flag == 0)) {
+			while ((total < (int)len)
+			       && STOP_FLAG_IS_ZERO(&conn->phys_ctx->stop_flag)) {
 				allowed = (conn->throttle > ((int)len - total))
 				              ? (int)len - total
 				              : conn->throttle;
@@ -9410,7 +9449,8 @@ connect_socket(struct mg_context *ctx /* may be NULL */,
 		struct mg_pollfd pfd[1];
 		int pollres;
 		int ms_wait = 10000; /* 10 second timeout */
-		int nonstop = 0;
+		stop_flag_t nonstop;
+		STOP_FLAG_ASSIGN(&nonstop, 0);
 
 		/* For a non-blocking socket, the connect sequence is:
 		 * 1) call connect (will not block)
@@ -11025,7 +11065,7 @@ read_message(FILE *fp,
 
 	while (request_len == 0) {
 		/* Full request not yet received */
-		if (conn->phys_ctx->stop_flag != 0) {
+		if (!STOP_FLAG_IS_ZERO(&conn->phys_ctx->stop_flag)) {
 			/* Server is to be stopped. */
 			return -1;
 		}
@@ -12686,7 +12726,8 @@ read_websocket(struct mg_connection *conn,
 
 	/* Loop continuously, reading messages from the socket, invoking the
 	 * callback, and waiting repeatedly until an error occurs. */
-	while (!conn->phys_ctx->stop_flag && !conn->must_close) {
+	while (STOP_FLAG_IS_ZERO(&conn->phys_ctx->stop_flag)
+	       && (!conn->must_close)) {
 		header_len = 0;
 		DEBUG_ASSERT(conn->data_len >= conn->request_len);
 		if ((body_len = (size_t)(conn->data_len - conn->request_len)) >= 2) {
@@ -12889,7 +12930,8 @@ read_websocket(struct mg_connection *conn,
 				/* Reset open PING count */
 				ping_count = 0;
 			} else {
-				if (!conn->phys_ctx->stop_flag && !conn->must_close) {
+				if (STOP_FLAG_IS_ZERO(&conn->phys_ctx->stop_flag)
+				    && (!conn->must_close)) {
 					if (ping_count > MG_MAX_UNANSWERED_PING) {
 						/* Stop sending PING */
 						DEBUG_TRACE("Too many (%i) unanswered ping from %s:%u "
@@ -15653,7 +15695,7 @@ static int
 sslize(struct mg_connection *conn,
        SSL_CTX *s,
        int (*func)(SSL *),
-       volatile int *stop_server,
+       stop_flag_t *stop_flag,
        const struct mg_client_options *client_options)
 {
 	int ret, err;
@@ -15719,7 +15761,7 @@ sslize(struct mg_connection *conn,
 			    || (err == SSL_ERROR_WANT_ACCEPT)
 			    || (err == SSL_ERROR_WANT_READ) || (err == SSL_ERROR_WANT_WRITE)
 			    || (err == SSL_ERROR_WANT_X509_LOOKUP)) {
-				if (*stop_server) {
+				if (!STOP_FLAG_IS_ZERO(&*stop_flag)) {
 					/* Don't wait if the server is going to be stopped. */
 					break;
 				}
@@ -15737,7 +15779,7 @@ sslize(struct mg_connection *conn,
 					              || (err == SSL_ERROR_WANT_WRITE))
 					                 ? POLLOUT
 					                 : POLLIN;
-					pollres = mg_poll(&pfd, 1, 50, stop_server);
+					pollres = mg_poll(&pfd, 1, 50, stop_flag);
 					if (pollres < 0) {
 						/* Break if error occured (-1)
 						 * or server shutdown (-2) */
@@ -17118,7 +17160,7 @@ mg_close_connection(struct mg_connection *conn)
 		unsigned int i;
 
 		/* client context: loops must end */
-		conn->phys_ctx->stop_flag = 1;
+		STOP_FLAG_ASSIGN(&conn->phys_ctx->stop_flag, 1);
 		conn->must_close = 1;
 
 		/* We need to get the client thread out of the select/recv call
@@ -18106,7 +18148,7 @@ websocket_client_thread(void *data)
 
 	/* The websocket_client context has only this thread. If it runs out,
 	   set the stop_flag to 2 (= "stopped"). */
-	cdata->conn->phys_ctx->stop_flag = 2;
+	STOP_FLAG_ASSIGN(&cdata->conn->phys_ctx->stop_flag, 2);
 
 	if (cdata->conn->phys_ctx->callbacks.exit_thread) {
 		cdata->conn->phys_ctx->callbacks.exit_thread(cdata->conn->phys_ctx,
@@ -18553,8 +18595,9 @@ process_new_connection(struct mg_connection *conn)
 		 * Therefore, memorize should_keep_alive() result now for later
 		 * use in loop exit condition. */
 		/* Enable it only if this request is completely discardable. */
-		keep_alive = (conn->phys_ctx->stop_flag == 0) && should_keep_alive(conn)
-		             && (conn->content_len >= 0) && (conn->request_len > 0)
+		keep_alive = STOP_FLAG_IS_ZERO(&conn->phys_ctx->stop_flag)
+		             && should_keep_alive(conn) && (conn->content_len >= 0)
+		             && (conn->request_len > 0)
 		             && ((conn->is_chunked == 4)
 		                 || (!conn->is_chunked
 		                     && ((conn->consumed_content == conn->content_len)
@@ -18679,7 +18722,8 @@ consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
 	DEBUG_TRACE("%s", "going idle");
 
 	/* If the queue is empty, wait. We're idle at this point. */
-	while ((ctx->sq_head == ctx->sq_tail) && (ctx->stop_flag == 0)) {
+	while ((ctx->sq_head == ctx->sq_tail)
+	       && (STOP_FLAG_IS_ZERO(&ctx->stop_flag))) {
 		pthread_cond_wait(&ctx->sq_full, &ctx->thread_mutex);
 	}
 
@@ -18701,7 +18745,7 @@ consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
 	(void)pthread_cond_signal(&ctx->sq_empty);
 	(void)pthread_mutex_unlock(&ctx->thread_mutex);
 
-	return !ctx->stop_flag;
+	return STOP_FLAG_IS_ZERO(&ctx->stop_flag);
 }
 
 
@@ -18716,7 +18760,8 @@ produce_socket(struct mg_context *ctx, const struct socket *sp)
 	queue_filled = ctx->sq_head - ctx->sq_tail;
 
 	/* If the queue is full, wait */
-	while ((ctx->stop_flag == 0) && (queue_filled >= ctx->sq_size)) {
+	while (STOP_FLAG_IS_ZERO(&ctx->stop_flag)
+	       && (queue_filled >= ctx->sq_size)) {
 		ctx->sq_blocked = 1; /* Status information: All threads bussy */
 #if defined(USE_SERVER_STATS)
 		if (queue_filled > ctx->sq_max_fill) {
@@ -19108,7 +19153,7 @@ master_thread_run(struct mg_context *ctx)
 
 	/* Start the server */
 	pfd = ctx->listening_socket_fds;
-	while (ctx->stop_flag == 0) {
+	while (STOP_FLAG_IS_ZERO(&ctx->stop_flag)) {
 		for (i = 0; i < ctx->num_listening_sockets; i++) {
 			pfd[i].fd = ctx->listening_sockets[i].sock;
 			pfd[i].events = POLLIN;
@@ -19121,7 +19166,8 @@ master_thread_run(struct mg_context *ctx)
 				 * (POLLRDNORM | POLLRDBAND)
 				 * Therefore, we're checking pfd[i].revents & POLLIN, not
 				 * pfd[i].revents == POLLIN. */
-				if ((ctx->stop_flag == 0) && (pfd[i].revents & POLLIN)) {
+				if (STOP_FLAG_IS_ZERO(&ctx->stop_flag)
+				    && (pfd[i].revents & POLLIN)) {
 					accept_new_connection(&ctx->listening_sockets[i], ctx);
 				}
 			}
@@ -19184,7 +19230,7 @@ master_thread_run(struct mg_context *ctx)
 	/* Signal mg_stop() that we're done.
 	 * WARNING: This must be the very last thing this
 	 * thread does, as ctx becomes invalid after this line. */
-	ctx->stop_flag = 2;
+	STOP_FLAG_ASSIGN(&ctx->stop_flag, 2);
 }
 
 
@@ -19325,10 +19371,10 @@ mg_stop(struct mg_context *ctx)
 	ctx->masterthreadid = 0;
 
 	/* Set stop flag, so all threads know they have to exit. */
-	ctx->stop_flag = 1;
+	STOP_FLAG_ASSIGN(&ctx->stop_flag, 1);
 
 	/* Wait until everything has stopped. */
-	while (ctx->stop_flag != 2) {
+	while (!STOP_FLAG_IS_TWO(&ctx->stop_flag)) {
 		(void)mg_sleep(10);
 	}
 
@@ -20070,7 +20116,7 @@ mg_start_domain2(struct mg_context *ctx,
 		return -1;
 	}
 
-	if (ctx->stop_flag != 0) {
+	if (!STOP_FLAG_IS_ZERO(&ctx->stop_flag)) {
 		if ((error != NULL) && (error->text_buffer_size > 0)) {
 			mg_snprintf(NULL,
 			            NULL, /* No truncation check for error buffers */

+ 1 - 1
src/timer.inl

@@ -151,7 +151,7 @@ timer_thread_run(void *thread_func_param)
 
 	d = timer_getcurrenttime(ctx);
 
-	while (ctx->stop_flag == 0) {
+	while (STOP_FLAG_IS_ZERO(&ctx->stop_flag)) {
 		pthread_mutex_lock(&ctx->timers->mutex);
 		if ((ctx->timers->timer_count > 0)
 		    && (d >= ctx->timers->timers[0].time)) {