Преглед на файлове

Merge pull request #1172 from jfriesne/jaf-demand-create-worker-threads

Option to demand-create worker threads rather than always creating all of them inside mg_start()
bel2125 преди 2 години
родител
ревизия
042c508a06
променени са 6 файла, в които са добавени 118 реда и са изтрити 42 реда
  1. 8 3
      docs/Embedding.md
  2. 13 4
      docs/UserManual.md
  3. 1 0
      resources/civetweb.conf
  4. 94 35
      src/civetweb.c
  5. 1 0
      test/page3.ssjs
  6. 1 0
      unittest/private.c

+ 8 - 3
docs/Embedding.md

@@ -213,10 +213,15 @@ about web server instance:
 
 When `mg_start()` returns, all initialization is guaranteed to be complete
 (e.g. listening ports are opened, SSL is initialized, etc). `mg_start()` starts
-some threads: a master thread, that accepts new connections, and several
-worker threads, that process accepted connections. The number of worker threads
-is configurable via `num_threads` configuration option. That number puts a
+some threads: a master thread, that accepts new connections, and optionally some
+worker threads, that process accepted connections. The maximum number of worker
+threads is configurable via `num_threads` configuration option. That number puts a
 limit on number of simultaneous requests that can be handled by CivetWeb.
+
+The number of worker threads to be pre-spawned at startup is specified via the
+'prespawn_threads' configuration option; worker threads that are not pre-spawned
+will instead be demand-created the first time they are needed.
+
 If you embed CivetWeb into a program that uses SSL outside CivetWeb as well,
 you may need to initialize SSL before calling `mg_start()`, and set the pre-
 processor define `SSL_ALREADY_INITIALIZED`. This is not required if SSL is

+ 13 - 4
docs/UserManual.md

@@ -558,8 +558,8 @@ The configuration value is approximate, the real limit might be a few bytes off.
 The minimum is 1024 (1 kB).
 
 ### num\_threads `50`
-Number of worker threads. CivetWeb handles each incoming connection in a
-separate thread. Therefore, the value of this option is effectively the number
+Maximum number of worker threads allowed. CivetWeb handles each incoming connection
+in a separate thread. Therefore, the value of this option is effectively the number
 of concurrent HTTP connections CivetWeb can handle.
 
 If there are more simultaneous requests (connection attempts), they are queued.
@@ -572,6 +572,15 @@ In case the clients are web browsers, it is recommended to use `num_threads` of
 at least 5, since browsers often establish multiple connections to load a single
 web page, including all linked documents (CSS, JavaScript, images, ...).
 
+### prespawn\_threads '0'
+Number of worker threads that should be pre-spawned by mg_start().  Defaults to
+0, meaning no worker threads will be pre-spawned at startup; rather, worker threads
+will be spawned when a new connection comes in and there aren't currently any
+idle worker threads available to handle it (if we haven't already reached the
+maximum worker-thread count as specified by num_threads).  If this value is
+specified less than zero, or greater than the value of num_threads, it will be
+treated as if it was specified to be equal to the value of num_threads.
+
 ### listen\_backlog `200`
 Maximum number of connections waiting to be accepted by the server operating system.
 Internally, this parameter is passed to the "listen" socket/system call.
@@ -876,8 +885,8 @@ All port, socket, process and thread specific parameters are per server:
 `enable_http2`, `enable_keep_alive`, `enable_websocket_ping_pong`,
 `keep_alive_timeout_ms`, `linger_timeout_ms`, `listen_backlog`,
 `listening_ports`, `lua_background_script`, `lua_background_script_params`,
-`max_request_size`, `num_threads`, `request_timeout_ms`, `run_as_user`,
-`tcp_nodelay`, `throttle`, `websocket_timeout_ms` + all options from `main.c`.
+`max_request_size`, `num_threads`, 'prespawn_threads', `request_timeout_ms`,
+`run_as_user`, `tcp_nodelay`, `throttle`, `websocket_timeout_ms` + all options from `main.c`.
 
 All other options can be set per domain. In particular
 `authentication_domain`, `document_root` and (for HTTPS) `ssl_certificate`

+ 1 - 0
resources/civetweb.conf

@@ -26,6 +26,7 @@ listening_ports 8080
 # extra_mime_types 
 # ssl_certificate 
 # num_threads 50
+# prespawn_threads 0
 # run_as_user 
 # url_rewrite_patterns 
 # hide_files_patterns 

+ 94 - 35
src/civetweb.c

@@ -1934,6 +1934,7 @@ enum {
 	/* Once for each server */
 	LISTENING_PORTS,
 	NUM_THREADS,
+	PRESPAWN_THREADS,
 	RUN_AS_USER,
 	CONFIG_TCP_NODELAY, /* Prepended CONFIG_ to avoid conflict with the
 	                     * socket option typedef TCP_NODELAY. */
@@ -2081,6 +2082,7 @@ static const struct mg_option config_options[] = {
     /* Once for each server */
     {"listening_ports", MG_CONFIG_TYPE_STRING_LIST, "8080"},
     {"num_threads", MG_CONFIG_TYPE_NUMBER, "50"},
+    {"prespawn_threads", MG_CONFIG_TYPE_NUMBER, "0"},
     {"run_as_user", MG_CONFIG_TYPE_STRING, NULL},
     {"tcp_nodelay", MG_CONFIG_TYPE_NUMBER, "0"},
     {"max_request_size", MG_CONFIG_TYPE_NUMBER, "16384"},
@@ -2394,7 +2396,14 @@ struct mg_context {
 
 	pthread_t masterthreadid; /* The master thread ID */
 	unsigned int
-	    cfg_worker_threads;      /* The number of configured worker threads. */
+	    cfg_max_worker_threads;  /* How many worker-threads we are allowed to create, total */
+
+	unsigned int
+	    spawned_worker_threads;  /* How many worker-threads currently exist (modified by master thread) */
+	unsigned int
+	    idle_worker_thread_count; /* How many worker-threads are currently sitting around with nothing to do */
+	                              /* Access to this value MUST be synchronized by thread_mutex */
+
 	pthread_t *worker_threadids; /* The worker thread IDs */
 	unsigned long starter_thread_idx; /* thread index which called mg_start */
 
@@ -18089,7 +18098,7 @@ mg_close_connection(struct mg_connection *conn)
 		 * timeouts, we will just wait a few seconds in mg_join_thread. */
 
 		/* join worker thread */
-		for (i = 0; i < conn->phys_ctx->cfg_worker_threads; i++) {
+		for (i = 0; i < conn->phys_ctx->spawned_worker_threads; i++) {
 			mg_join_thread(conn->phys_ctx->worker_threadids[i]);
 		}
 	}
@@ -19299,7 +19308,8 @@ mg_connect_websocket_client_impl(const struct mg_client_options *client_options,
 	/* Now upgrade to ws/wss client context */
 	conn->phys_ctx->user_data = user_data;
 	conn->phys_ctx->context_type = CONTEXT_WS_CLIENT;
-	conn->phys_ctx->cfg_worker_threads = 1; /* one worker thread */
+	conn->phys_ctx->cfg_max_worker_threads = 1; /* one worker thread */
+	conn->phys_ctx->spawned_worker_threads = 1; /* one worker thread */
 
 	/* Start a thread to read the websocket client connection
 	 * This thread will automatically stop when mg_disconnect is
@@ -19308,7 +19318,7 @@ mg_connect_websocket_client_impl(const struct mg_client_options *client_options,
 	                            thread_data,
 	                            conn->phys_ctx->worker_threadids)
 	    != 0) {
-		conn->phys_ctx->cfg_worker_threads = 0;
+		conn->phys_ctx->spawned_worker_threads = 0;
 		mg_free(thread_data);
 		mg_close_connection(conn);
 		conn = NULL;
@@ -19679,6 +19689,7 @@ process_new_connection(struct mg_connection *conn)
 #endif
 }
 
+static int mg_start_worker_thread(struct mg_context *ctx, int only_if_no_idle_threads);  /* forward declaration */
 
 #if defined(ALTERNATIVE_QUEUE)
 
@@ -19687,8 +19698,10 @@ produce_socket(struct mg_context *ctx, const struct socket *sp)
 {
 	unsigned int i;
 
+	(void)mg_start_worker_thread(ctx, 1);  /* will start a worker-thread only if there aren't currently any idle worker-threads */
+
 	while (!ctx->stop_flag) {
-		for (i = 0; i < ctx->cfg_worker_threads; i++) {
+		for (i = 0; i < ctx->spawned_worker_threads; i++) {
 			/* find a free worker slot and signal it */
 			if (ctx->client_socks[i].in_use == 2) {
 				(void)pthread_mutex_lock(&ctx->thread_mutex);
@@ -19713,10 +19726,13 @@ produce_socket(struct mg_context *ctx, const struct socket *sp)
 
 
 static int
-consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
+consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index, int counter_was_preincremented)
 {
 	DEBUG_TRACE("%s", "going idle");
 	(void)pthread_mutex_lock(&ctx->thread_mutex);
+	if (counter_was_preincremented == 0) {  /* first call only: the master-thread pre-incremented this before he spawned us */
+		ctx->idle_worker_thread_count++;
+	}
 	ctx->client_socks[thread_index].in_use = 2;
 	(void)pthread_mutex_unlock(&ctx->thread_mutex);
 
@@ -19733,6 +19749,7 @@ consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
 		}
 		return 0;
 	}
+	ctx->idle_worker_thread_count--;
 	(void)pthread_mutex_unlock(&ctx->thread_mutex);
 	if (sp->in_use == 1) {
 		DEBUG_TRACE("grabbed socket %d, going busy", sp->sock);
@@ -19747,12 +19764,15 @@ consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
 
 /* Worker threads take accepted socket from the queue */
 static int
-consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
+consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index, int counter_was_preincremented)
 {
 	(void)thread_index;
 
-	(void)pthread_mutex_lock(&ctx->thread_mutex);
 	DEBUG_TRACE("%s", "going idle");
+	(void)pthread_mutex_lock(&ctx->thread_mutex);
+	if (counter_was_preincremented == 0) {  /* first call only: the master-thread pre-incremented this before he spawned us */
+		ctx->idle_worker_thread_count++;
+	}
 
 	/* If the queue is empty, wait. We're idle at this point. */
 	while ((ctx->sq_head == ctx->sq_tail)
@@ -19776,6 +19796,8 @@ consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
 	}
 
 	(void)pthread_cond_signal(&ctx->sq_empty);
+
+	ctx->idle_worker_thread_count--;
 	(void)pthread_mutex_unlock(&ctx->thread_mutex);
 
 	return STOP_FLAG_IS_ZERO(&ctx->stop_flag);
@@ -19822,6 +19844,8 @@ produce_socket(struct mg_context *ctx, const struct socket *sp)
 
 	(void)pthread_cond_signal(&ctx->sq_full);
 	(void)pthread_mutex_unlock(&ctx->thread_mutex);
+
+	(void)mg_start_worker_thread(ctx, 1);  /* will start a worker-thread only if there aren't currently any idle worker-threads */
 }
 #endif /* ALTERNATIVE_QUEUE */
 
@@ -19832,6 +19856,7 @@ worker_thread_run(struct mg_connection *conn)
 	struct mg_context *ctx = conn->phys_ctx;
 	int thread_index;
 	struct mg_workerTLS tls;
+	int first_call_to_consume_socket = 1;
 
 	mg_set_thread_name("worker");
 
@@ -19857,7 +19882,7 @@ worker_thread_run(struct mg_connection *conn)
 	/* Connection structure has been pre-allocated */
 	thread_index = (int)(conn - ctx->worker_connections);
 	if ((thread_index < 0)
-	    || ((unsigned)thread_index >= (unsigned)ctx->cfg_worker_threads)) {
+	    || ((unsigned)thread_index >= (unsigned)ctx->cfg_max_worker_threads)) {
 		mg_cry_ctx_internal(ctx,
 		                    "Internal error: Invalid worker index %i",
 		                    thread_index);
@@ -19898,7 +19923,8 @@ worker_thread_run(struct mg_connection *conn)
 	/* Call consume_socket() even when ctx->stop_flag > 0, to let it
 	 * signal sq_empty condvar to wake up the master waiting in
 	 * produce_socket() */
-	while (consume_socket(ctx, &conn->client, thread_index)) {
+	while (consume_socket(ctx, &conn->client, thread_index, first_call_to_consume_socket)) {
+		first_call_to_consume_socket = 0;
 
 		/* New connections must start with new protocol negotiation */
 		tls.alpn_proto = NULL;
@@ -20289,7 +20315,7 @@ master_thread_run(struct mg_context *ctx)
 
 	/* Wakeup workers that are waiting for connections to handle. */
 #if defined(ALTERNATIVE_QUEUE)
-	for (i = 0; i < ctx->cfg_worker_threads; i++) {
+	for (i = 0; i < ctx->spawned_worker_threads; i++) {
 		event_signal(ctx->client_wait_events[i]);
 	}
 #else
@@ -20299,7 +20325,7 @@ master_thread_run(struct mg_context *ctx)
 #endif
 
 	/* Join all worker threads to avoid leaking threads. */
-	workerthreadcount = ctx->cfg_worker_threads;
+	workerthreadcount = ctx->spawned_worker_threads;
 	for (i = 0; i < workerthreadcount; i++) {
 		if (ctx->worker_threadids[i] != 0) {
 			mg_join_thread(ctx->worker_threadids[i]);
@@ -20403,7 +20429,7 @@ free_context(struct mg_context *ctx)
 #if defined(ALTERNATIVE_QUEUE)
 	mg_free(ctx->client_socks);
 	if (ctx->client_wait_events != NULL) {
-		for (i = 0; (unsigned)i < ctx->cfg_worker_threads; i++) {
+		for (i = 0; (unsigned)i < ctx->spawned_worker_threads; i++) {
 			event_destroy(ctx->client_wait_events[i]);
 		}
 		mg_free(ctx->client_wait_events);
@@ -20674,12 +20700,45 @@ mg_socketpair(int *sockA, int *sockB)
 #endif
 }
 
+static int mg_start_worker_thread(struct mg_context *ctx, int only_if_no_idle_threads) {
+	const unsigned int i = ctx->spawned_worker_threads;
+	if (i >= ctx->cfg_max_worker_threads) {
+		return -1;  /* Oops, we hit our worker-thread limit!  No more worker threads, ever! */
+	}
+
+	(void)pthread_mutex_lock(&ctx->thread_mutex);
+#if defined(ALTERNATIVE_QUEUE)
+	if ((only_if_no_idle_threads)&&(ctx->idle_worker_thread_count > 0)) {
+#else
+	if ((only_if_no_idle_threads)&&(ctx->idle_worker_thread_count > (unsigned)(ctx->sq_head-ctx->sq_tail))) {
+#endif
+		(void)pthread_mutex_unlock(&ctx->thread_mutex);
+		return -2;  /* There are idle threads available, so no need to spawn a new worker thread now */
+	}
+	ctx->idle_worker_thread_count++;  /* we do this here to avoid a race condition while the thread is starting up */
+	(void)pthread_mutex_unlock(&ctx->thread_mutex);
+
+	ctx->worker_connections[i].phys_ctx = ctx;
+	int ret = mg_start_thread_with_id(worker_thread,
+	                            &ctx->worker_connections[i],
+	                            &ctx->worker_threadids[i]);
+	if (ret == 0) {
+		ctx->spawned_worker_threads++;  /* note that we've filled another slot in the table */
+		DEBUG_TRACE("Started worker_thread #%i", ctx->spawned_worker_threads);
+	} else {
+		(void)pthread_mutex_lock(&ctx->thread_mutex);
+		ctx->idle_worker_thread_count--;  /* whoops, roll-back on error */
+		(void)pthread_mutex_unlock(&ctx->thread_mutex);
+	}
+	return ret;
+}
+
 CIVETWEB_API struct mg_context *
 mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 {
 	struct mg_context *ctx;
 	const char *name, *value, *default_value;
-	int idx, ok, workerthreadcount;
+	int idx, ok, prespawnthreadcount, workerthreadcount;
 	unsigned int i;
 	int itmp;
 	void (*exit_callback)(const struct mg_context *ctx) = 0;
@@ -20931,7 +20990,12 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 #endif
 
 	/* Worker thread count option */
-	workerthreadcount = atoi(ctx->dd.config[NUM_THREADS]);
+	workerthreadcount   = atoi(ctx->dd.config[NUM_THREADS]);
+	prespawnthreadcount = atoi(ctx->dd.config[PRESPAWN_THREADS]);
+
+	if ((prespawnthreadcount < 0)||(prespawnthreadcount > workerthreadcount)) {
+		prespawnthreadcount = workerthreadcount;  /* can't prespawn more than all of them! */
+	}
 
 	if ((workerthreadcount > MAX_WORKER_THREADS) || (workerthreadcount <= 0)) {
 		if (workerthreadcount <= 0) {
@@ -21196,8 +21260,8 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 		return NULL;
 	}
 
-	ctx->cfg_worker_threads = ((unsigned int)(workerthreadcount));
-	ctx->worker_threadids = (pthread_t *)mg_calloc_ctx(ctx->cfg_worker_threads,
+	ctx->cfg_max_worker_threads = ((unsigned int)(workerthreadcount));
+	ctx->worker_threadids = (pthread_t *)mg_calloc_ctx(ctx->cfg_max_worker_threads,
 	                                                   sizeof(pthread_t),
 	                                                   ctx);
 
@@ -21208,7 +21272,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 		if (error != NULL) {
 			error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
 			error->code_sub =
-			    (unsigned)ctx->cfg_worker_threads * (unsigned)sizeof(pthread_t);
+			    (unsigned)ctx->cfg_max_worker_threads * (unsigned)sizeof(pthread_t);
 			mg_snprintf(NULL,
 			            NULL, /* No truncation check for error buffers */
 			            error->text,
@@ -21222,7 +21286,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 		return NULL;
 	}
 	ctx->worker_connections =
-	    (struct mg_connection *)mg_calloc_ctx(ctx->cfg_worker_threads,
+	    (struct mg_connection *)mg_calloc_ctx(ctx->cfg_max_worker_threads,
 	                                          sizeof(struct mg_connection),
 	                                          ctx);
 	if (ctx->worker_connections == NULL) {
@@ -21232,7 +21296,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 
 		if (error != NULL) {
 			error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
-			error->code_sub = (unsigned)ctx->cfg_worker_threads
+			error->code_sub = (unsigned)ctx->cfg_max_worker_threads
 			                  * (unsigned)sizeof(struct mg_connection);
 			mg_snprintf(NULL,
 			            NULL, /* No truncation check for error buffers */
@@ -21249,7 +21313,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 
 #if defined(ALTERNATIVE_QUEUE)
 	ctx->client_wait_events =
-	    (void **)mg_calloc_ctx(ctx->cfg_worker_threads,
+	    (void **)mg_calloc_ctx(ctx->cfg_max_worker_threads,
 	                           sizeof(ctx->client_wait_events[0]),
 	                           ctx);
 	if (ctx->client_wait_events == NULL) {
@@ -21259,7 +21323,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 
 		if (error != NULL) {
 			error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
-			error->code_sub = (unsigned)ctx->cfg_worker_threads
+			error->code_sub = (unsigned)ctx->cfg_max_worker_threads
 			                  * (unsigned)sizeof(ctx->client_wait_events[0]);
 			mg_snprintf(NULL,
 			            NULL, /* No truncation check for error buffers */
@@ -21275,7 +21339,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 	}
 
 	ctx->client_socks =
-	    (struct socket *)mg_calloc_ctx(ctx->cfg_worker_threads,
+	    (struct socket *)mg_calloc_ctx(ctx->cfg_max_worker_threads,
 	                                   sizeof(ctx->client_socks[0]),
 	                                   ctx);
 	if (ctx->client_socks == NULL) {
@@ -21286,7 +21350,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 
 		if (error != NULL) {
 			error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
-			error->code_sub = (unsigned)ctx->cfg_worker_threads
+			error->code_sub = (unsigned)ctx->cfg_max_worker_threads
 			                  * (unsigned)sizeof(ctx->client_socks[0]);
 			mg_snprintf(NULL,
 			            NULL, /* No truncation check for error buffers */
@@ -21301,7 +21365,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 		return NULL;
 	}
 
-	for (i = 0; (unsigned)i < ctx->cfg_worker_threads; i++) {
+	for (i = 0; (unsigned)i < ctx->cfg_max_worker_threads; i++) {
 		ctx->client_wait_events[i] = event_create();
 		if (ctx->client_wait_events[i] == 0) {
 			const char *err_msg = "Error creating worker event %i";
@@ -21365,23 +21429,18 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
 	ctx->context_type = CONTEXT_SERVER; /* server context */
 
 	/* Start worker threads */
-	for (i = 0; i < ctx->cfg_worker_threads; i++) {
+	for (i = 0; (int)i < prespawnthreadcount; i++) {
 		/* worker_thread sets up the other fields */
-		ctx->worker_connections[i].phys_ctx = ctx;
-		if (mg_start_thread_with_id(worker_thread,
-		                            &ctx->worker_connections[i],
-		                            &ctx->worker_threadids[i])
-		    != 0) {
-
+		if (mg_start_worker_thread(ctx, 0) != 0) {
 			long error_no = (long)ERRNO;
 
 			/* thread was not created */
-			if (i > 0) {
+			if (ctx->spawned_worker_threads > 0) {
 				/* If the second, third, ... thread cannot be created, set a
 				 * warning, but keep running. */
 				mg_cry_ctx_internal(ctx,
 				                    "Cannot start worker thread %i: error %ld",
-				                    i + 1,
+				                    ctx->spawned_worker_threads + 1,
 				                    error_no);
 
 				/* If the server initialization should stop here, all
@@ -22311,7 +22370,7 @@ mg_get_connection_info(const struct mg_context *ctx,
 		return 0;
 	}
 
-	if ((unsigned)idx >= ctx->cfg_worker_threads) {
+	if ((unsigned)idx >= ctx->cfg_max_worker_threads) {
 		/* Out of range */
 		return 0;
 	}

+ 1 - 0
test/page3.ssjs

@@ -22,6 +22,7 @@ opts = [
 "fallback_document_root",
 "ssl_certificate",
 "num_threads",
+"prespawn_threads",
 "run_as_user",
 "url_rewrite_patterns",
 "hide_files_patterns",

+ 1 - 0
unittest/private.c

@@ -1627,6 +1627,7 @@ START_TEST(test_config_options)
 	ck_assert_str_eq("ssl_certificate_chain",
 	                 config_options[SSL_CERTIFICATE_CHAIN].name);
 	ck_assert_str_eq("num_threads", config_options[NUM_THREADS].name);
+	ck_assert_str_eq("prespawn_threads", config_options[PRESPAWN_THREADS].name);
 	ck_assert_str_eq("run_as_user", config_options[RUN_AS_USER].name);
 	ck_assert_str_eq("url_rewrite_patterns",
 	                 config_options[URL_REWRITE_PATTERN].name);