Forráskód Böngészése

Merge pull request #322 from xtne6f/fix-thread

Fix thread handling
bel2125 9 éve
szülő
commit
03b3428fb9
1 módosított fájl, 80 hozzáadás és 68 törlés
  1. 80 68
      src/civetweb.c

+ 80 - 68
src/civetweb.c

@@ -328,8 +328,7 @@ typedef DWORD pthread_key_t;
 typedef HANDLE pthread_t;
 typedef struct {
 	CRITICAL_SECTION threadIdSec;
-	int waitingthreadcount;       /* The number of threads queued. */
-	pthread_t *waitingthreadhdls; /* The thread handles. */
+	struct mg_workerTLS *waiting_thread; /* The chain of threads */
 } pthread_cond_t;
 
 #ifndef __clockid_t_defined
@@ -1409,13 +1408,10 @@ struct mg_context {
 	int context_type;              /* 1 = server context, 2 = client context */
 
 	struct socket *listening_sockets;
-	in_port_t *listening_ports;
+	struct pollfd *listening_socket_fds;
 	unsigned int num_listening_sockets;
 
-	volatile int
-	    running_worker_threads; /* Number of currently running worker threads */
 	pthread_mutex_t thread_mutex; /* Protects (max|num)_threads */
-	pthread_cond_t thread_cond; /* Condvar for tracking workers terminations */
 
 	struct socket queue[MGSQLEN]; /* Accepted sockets */
 	volatile int sq_head;         /* Head of the socket queue */
@@ -1499,6 +1495,7 @@ struct mg_workerTLS {
 	unsigned long thread_idx;
 #if defined(_WIN32) && !defined(__SYMBIAN32__)
 	HANDLE pthread_cond_helper_mutex;
+	struct mg_workerTLS *next_waiting_thread;
 #endif
 };
 
@@ -1947,7 +1944,13 @@ mg_get_ports(const struct mg_context *ctx, size_t size, int *ports, int *ssl)
 	}
 	for (i = 0; i < size && i < ctx->num_listening_sockets; i++) {
 		ssl[i] = ctx->listening_sockets[i].is_ssl;
-		ports[i] = ctx->listening_ports[i];
+		ports[i] =
+#if defined(USE_IPV6)
+			(ctx->listening_sockets[i].lsa.sa.sa_family == AF_INET6)
+				? ntohs(ctx->listening_sockets[i].lsa.sin6.sin6_port)
+				:
+#endif
+			ntohs(ctx->listening_sockets[i].lsa.sin.sin_port);
 	}
 	return i;
 }
@@ -1967,13 +1970,19 @@ mg_get_server_ports(const struct mg_context *ctx,
 	if (!ctx) {
 		return -1;
 	}
-	if (!ctx->listening_sockets || !ctx->listening_ports) {
+	if (!ctx->listening_sockets) {
 		return -1;
 	}
 
 	for (i = 0; (i < size) && (i < (int)ctx->num_listening_sockets); i++) {
 
-		ports[cnt].port = ctx->listening_ports[i];
+		ports[cnt].port =
+#if defined(USE_IPV6)
+			(ctx->listening_sockets[i].lsa.sa.sa_family == AF_INET6)
+				? ntohs(ctx->listening_sockets[i].lsa.sin6.sin6_port)
+				:
+#endif
+			ntohs(ctx->listening_sockets[i].lsa.sin.sin_port);
 		ports[cnt].is_ssl = ctx->listening_sockets[i].is_ssl;
 		ports[cnt].is_redirect = ctx->listening_sockets[i].ssl_redir;
 
@@ -2863,10 +2872,8 @@ pthread_cond_init(pthread_cond_t *cv, const void *unused)
 {
 	(void)unused;
 	InitializeCriticalSection(&cv->threadIdSec);
-	cv->waitingthreadcount = 0;
-	cv->waitingthreadhdls =
-	    (pthread_t *)mg_calloc(MAX_WORKER_THREADS, sizeof(pthread_t));
-	return (cv->waitingthreadhdls != NULL) ? 0 : -1;
+	cv->waiting_thread = NULL;
+	return 0;
 }
 
 
@@ -2875,7 +2882,7 @@ pthread_cond_timedwait(pthread_cond_t *cv,
                        pthread_mutex_t *mutex,
                        const struct timespec *abstime)
 {
-	struct mg_workerTLS *tls =
+	struct mg_workerTLS **ptls, *tls =
 	    (struct mg_workerTLS *)pthread_getspecific(sTlsKey);
 	int ok;
 	struct timespec tsnow;
@@ -2883,10 +2890,11 @@ pthread_cond_timedwait(pthread_cond_t *cv,
 	DWORD mswaitrel;
 
 	EnterCriticalSection(&cv->threadIdSec);
-	assert(cv->waitingthreadcount < MAX_WORKER_THREADS);
-	cv->waitingthreadhdls[cv->waitingthreadcount] =
-	    tls->pthread_cond_helper_mutex;
-	cv->waitingthreadcount++;
+	/* Add this thread to cv's waiting list */
+	ptls = &cv->waiting_thread;
+	for (; *ptls != NULL; ptls = &(*ptls)->next_waiting_thread);
+	tls->next_waiting_thread = NULL;
+	*ptls = tls;
 	LeaveCriticalSection(&cv->threadIdSec);
 
 	if (abstime) {
@@ -2906,6 +2914,23 @@ pthread_cond_timedwait(pthread_cond_t *cv,
 	pthread_mutex_unlock(mutex);
 	ok = (WAIT_OBJECT_0
 	      == WaitForSingleObject(tls->pthread_cond_helper_mutex, mswaitrel));
+	if (!ok) {
+		ok = 1;
+		EnterCriticalSection(&cv->threadIdSec);
+		ptls = &cv->waiting_thread;
+		for (; *ptls != NULL; ptls = &(*ptls)->next_waiting_thread) {
+			if (*ptls == tls) {
+				*ptls = tls->next_waiting_thread;
+				ok = 0;
+				break;
+			}
+		}
+		LeaveCriticalSection(&cv->threadIdSec);
+		if (ok) {
+			WaitForSingleObject(tls->pthread_cond_helper_mutex, INFINITE);
+		}
+	}
+	/* This thread has been removed from cv's waiting list */
 	pthread_mutex_lock(mutex);
 
 	return ok ? 0 : -1;
@@ -2922,20 +2947,15 @@ pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mutex)
 static int
 pthread_cond_signal(pthread_cond_t *cv)
 {
-	int i;
 	HANDLE wkup = NULL;
 	BOOL ok = FALSE;
 
 	EnterCriticalSection(&cv->threadIdSec);
-	if (cv->waitingthreadcount) {
-		wkup = cv->waitingthreadhdls[0];
-		ok = SetEvent(wkup);
-
-		for (i = 1; i < cv->waitingthreadcount; i++) {
-			cv->waitingthreadhdls[i - 1] = cv->waitingthreadhdls[i];
-		}
-		cv->waitingthreadcount--;
+	if (cv->waiting_thread) {
+		wkup = cv->waiting_thread->pthread_cond_helper_mutex;
+		cv->waiting_thread = cv->waiting_thread->next_waiting_thread;
 
+		ok = SetEvent(wkup);
 		assert(ok);
 	}
 	LeaveCriticalSection(&cv->threadIdSec);
@@ -2948,7 +2968,7 @@ static int
 pthread_cond_broadcast(pthread_cond_t *cv)
 {
 	EnterCriticalSection(&cv->threadIdSec);
-	while (cv->waitingthreadcount) {
+	while (cv->waiting_thread) {
 		pthread_cond_signal(cv);
 	}
 	LeaveCriticalSection(&cv->threadIdSec);
@@ -2961,9 +2981,7 @@ static int
 pthread_cond_destroy(pthread_cond_t *cv)
 {
 	EnterCriticalSection(&cv->threadIdSec);
-	assert(cv->waitingthreadcount == 0);
-	mg_free(cv->waitingthreadhdls);
-	cv->waitingthreadhdls = 0;
+	assert(cv->waiting_thread == NULL);
 	LeaveCriticalSection(&cv->threadIdSec);
 	DeleteCriticalSection(&cv->threadIdSec);
 
@@ -9551,6 +9569,13 @@ redirect_to_https_port(struct mg_connection *conn, int ssl_index)
 		mg_printf(conn,
 		          "HTTP/1.1 302 Found\r\nLocation: https://%s:%d%s%s%s\r\n\r\n",
 		          host,
+#if defined(USE_IPV6)
+		          (conn->ctx->listening_sockets[ssl_index].lsa.sa.sa_family
+		              == AF_INET6) ?
+		          (int)ntohs(
+		              conn->ctx->listening_sockets[ssl_index].lsa.sin6.sin6_port
+		              ) :
+#endif
 		          (int)ntohs(
 		              conn->ctx->listening_sockets[ssl_index].lsa.sin.sin_port),
 		          conn->request_info.local_uri,
@@ -10392,8 +10417,8 @@ close_all_listening_sockets(struct mg_context *ctx)
 	}
 	mg_free(ctx->listening_sockets);
 	ctx->listening_sockets = NULL;
-	mg_free(ctx->listening_ports);
-	ctx->listening_ports = NULL;
+	mg_free(ctx->listening_socket_fds);
+	ctx->listening_socket_fds = NULL;
 }
 
 
@@ -10467,7 +10492,7 @@ set_ports_option(struct mg_context *ctx)
 	struct vec vec;
 	struct socket so, *ptr;
 
-	in_port_t *portPtr;
+	struct pollfd *pfd;
 	union usa usa;
 	socklen_t len;
 
@@ -10613,7 +10638,8 @@ set_ports_option(struct mg_context *ctx)
 			continue;
 		}
 
-		if (getsockname(so.sock, &(usa.sa), &len) != 0) {
+		if (getsockname(so.sock, &(usa.sa), &len) != 0
+		    || usa.sa.sa_family != so.lsa.sa.sa_family) {
 
 			int err = (int)ERRNO;
 			mg_cry(fc(ctx),
@@ -10627,6 +10653,16 @@ set_ports_option(struct mg_context *ctx)
 			continue;
 		}
 
+		/* Update lsa port in case of random free ports */
+#if defined(USE_IPV6)
+		if (so.lsa.sa.sa_family == AF_INET6) {
+			so.lsa.sin6.sin6_port = usa.sin6.sin6_port;
+		} else
+#endif
+		{
+			so.lsa.sin.sin_port = usa.sin.sin_port;
+		}
+
 		if ((ptr = (struct socket *)
 		         mg_realloc(ctx->listening_sockets,
 		                    (ctx->num_listening_sockets + 1)
@@ -10638,10 +10674,10 @@ set_ports_option(struct mg_context *ctx)
 			continue;
 		}
 
-		if ((portPtr =
-		         (in_port_t *)mg_realloc(ctx->listening_ports,
-		                                 (ctx->num_listening_sockets + 1)
-		                                     * sizeof(ctx->listening_ports[0])))
+		if ((pfd = (struct pollfd *)
+		         mg_realloc(ctx->listening_socket_fds,
+		                    (ctx->num_listening_sockets + 1)
+		                        * sizeof(ctx->listening_socket_fds[0])))
 		    == NULL) {
 
 			mg_cry(fc(ctx), "%s", "Out of memory");
@@ -10654,9 +10690,7 @@ set_ports_option(struct mg_context *ctx)
 		set_close_on_exec(so.sock, fc(ctx));
 		ctx->listening_sockets = ptr;
 		ctx->listening_sockets[ctx->num_listening_sockets] = so;
-		ctx->listening_ports = portPtr;
-		ctx->listening_ports[ctx->num_listening_sockets] =
-		    ntohs(usa.sin.sin_port);
+		ctx->listening_socket_fds = pfd;
 		ctx->num_listening_sockets++;
 		portsOk++;
 	}
@@ -12585,13 +12619,6 @@ worker_thread_run(void *thread_func_param)
 		}
 	}
 
-	/* Signal master that we're done with connection and exiting */
-	(void)pthread_mutex_lock(&ctx->thread_mutex);
-	ctx->running_worker_threads--;
-	(void)pthread_cond_signal(&ctx->thread_cond);
-	/* assert(ctx->running_worker_threads >= 0); */
-	(void)pthread_mutex_unlock(&ctx->thread_mutex);
-
 	pthread_setspecific(sTlsKey, NULL);
 #if defined(_WIN32) && !defined(__SYMBIAN32__)
 	CloseHandle(tls.pthread_cond_helper_mutex);
@@ -12786,10 +12813,9 @@ master_thread_run(void *thread_func_param)
 	/* Server starts *now* */
 	ctx->start_time = time(NULL);
 
-	/* Allocate memory for the listening sockets, and start the server */
-	pfd =
-	    (struct pollfd *)mg_calloc(ctx->num_listening_sockets, sizeof(pfd[0]));
-	while (pfd != NULL && ctx->stop_flag == 0) {
+	/* Start the server */
+	pfd = ctx->listening_socket_fds;
+	while (ctx->stop_flag == 0) {
 		for (i = 0; i < ctx->num_listening_sockets; i++) {
 			pfd[i].fd = ctx->listening_sockets[i].sock;
 			pfd[i].events = POLLIN;
@@ -12808,20 +12834,14 @@ master_thread_run(void *thread_func_param)
 			}
 		}
 	}
-	mg_free(pfd);
 	DEBUG_TRACE("%s", "stopping workers");
 
 	/* Stop signal received: somebody called mg_stop. Quit. */
 	close_all_listening_sockets(ctx);
 
 	/* Wakeup workers that are waiting for connections to handle. */
-	pthread_cond_broadcast(&ctx->sq_full);
-
-	/* Wait until all threads finish */
 	(void)pthread_mutex_lock(&ctx->thread_mutex);
-	while (ctx->running_worker_threads > 0) {
-		(void)pthread_cond_wait(&ctx->thread_cond, &ctx->thread_mutex);
-	}
+	pthread_cond_broadcast(&ctx->sq_full);
 	(void)pthread_mutex_unlock(&ctx->thread_mutex);
 
 	/* Join all worker threads to avoid leaking threads. */
@@ -12886,7 +12906,6 @@ free_context(struct mg_context *ctx)
 	 * condvars
 	 */
 	(void)pthread_mutex_destroy(&ctx->thread_mutex);
-	(void)pthread_cond_destroy(&ctx->thread_cond);
 	(void)pthread_cond_destroy(&ctx->sq_empty);
 	(void)pthread_cond_destroy(&ctx->sq_full);
 
@@ -13090,7 +13109,6 @@ mg_start(const struct mg_callbacks *callbacks,
 #endif
 
 	ok = 0 == pthread_mutex_init(&ctx->thread_mutex, &pthread_mutex_attr);
-	ok &= 0 == pthread_cond_init(&ctx->thread_cond, NULL);
 	ok &= 0 == pthread_cond_init(&ctx->sq_empty, NULL);
 	ok &= 0 == pthread_cond_init(&ctx->sq_full, NULL);
 	ok &= 0 == pthread_mutex_init(&ctx->nonce_mutex, &pthread_mutex_attr);
@@ -13218,15 +13236,9 @@ mg_start(const struct mg_callbacks *callbacks,
 
 	/* Start worker threads */
 	for (i = 0; i < ctx->cfg_worker_threads; i++) {
-		(void)pthread_mutex_lock(&ctx->thread_mutex);
-		ctx->running_worker_threads++;
-		(void)pthread_mutex_unlock(&ctx->thread_mutex);
 		if (mg_start_thread_with_id(worker_thread,
 		                            ctx,
 		                            &ctx->workerthreadids[i]) != 0) {
-			(void)pthread_mutex_lock(&ctx->thread_mutex);
-			ctx->running_worker_threads--;
-			(void)pthread_mutex_unlock(&ctx->thread_mutex);
 			if (i > 0) {
 				mg_cry(fc(ctx),
 				       "Cannot start worker thread %i: error %ld",