فهرست منبع

Merge pull request #661 from xtne6f/pr-altq

 Fix alternative queue
bel2125 6 سال پیش
والد
کامیت
041ceca18c
1فایلهای تغییر یافته به همراه48 افزوده شده و 24 حذف شده
  1. 48 24
      src/civetweb.c

+ 48 - 24
src/civetweb.c

@@ -2296,7 +2296,7 @@ struct socket {
 	unsigned char is_ssl;    /* Is port SSL-ed */
 	unsigned char is_ssl;    /* Is port SSL-ed */
 	unsigned char ssl_redir; /* Is port supposed to redirect everything to SSL
 	unsigned char ssl_redir; /* Is port supposed to redirect everything to SSL
 	                          * port */
 	                          * port */
-	unsigned char in_use;    /* Is valid */
+	unsigned char in_use;    /* 0: invalid, 1: valid, 2: free */
 };
 };
 
 
 
 
@@ -2898,6 +2898,7 @@ event_destroy(void *eventhdl)
 struct posix_event {
 struct posix_event {
 	pthread_mutex_t mutex;
 	pthread_mutex_t mutex;
 	pthread_cond_t cond;
 	pthread_cond_t cond;
+	int signaled;
 };
 };
 
 
 
 
@@ -2920,6 +2921,7 @@ event_create(void)
 		mg_free(ret);
 		mg_free(ret);
 		return 0;
 		return 0;
 	}
 	}
+	ret->signaled = 0;
 	return (void *)ret;
 	return (void *)ret;
 }
 }
 
 
@@ -2929,7 +2931,10 @@ event_wait(void *eventhdl)
 {
 {
 	struct posix_event *ev = (struct posix_event *)eventhdl;
 	struct posix_event *ev = (struct posix_event *)eventhdl;
 	pthread_mutex_lock(&(ev->mutex));
 	pthread_mutex_lock(&(ev->mutex));
-	pthread_cond_wait(&(ev->cond), &(ev->mutex));
+	while (!ev->signaled) {
+		pthread_cond_wait(&(ev->cond), &(ev->mutex));
+	}
+	ev->signaled = 0;
 	pthread_mutex_unlock(&(ev->mutex));
 	pthread_mutex_unlock(&(ev->mutex));
 	return 1;
 	return 1;
 }
 }
@@ -2941,6 +2946,7 @@ event_signal(void *eventhdl)
 	struct posix_event *ev = (struct posix_event *)eventhdl;
 	struct posix_event *ev = (struct posix_event *)eventhdl;
 	pthread_mutex_lock(&(ev->mutex));
 	pthread_mutex_lock(&(ev->mutex));
 	pthread_cond_signal(&(ev->cond));
 	pthread_cond_signal(&(ev->cond));
+	ev->signaled = 1;
 	pthread_mutex_unlock(&(ev->mutex));
 	pthread_mutex_unlock(&(ev->mutex));
 	return 1;
 	return 1;
 }
 }
@@ -13435,7 +13441,7 @@ mg_set_websocket_handler_with_subprotocols(
 void
 void
 mg_set_auth_handler(struct mg_context *ctx,
 mg_set_auth_handler(struct mg_context *ctx,
                     const char *uri,
                     const char *uri,
-                    mg_request_handler handler,
+                    mg_authorization_handler handler,
                     void *cbdata)
                     void *cbdata)
 {
 {
 	mg_set_handler_type(ctx,
 	mg_set_handler_type(ctx,
@@ -17621,16 +17627,25 @@ produce_socket(struct mg_context *ctx, const struct socket *sp)
 	while (!ctx->stop_flag) {
 	while (!ctx->stop_flag) {
 		for (i = 0; i < ctx->cfg_worker_threads; i++) {
 		for (i = 0; i < ctx->cfg_worker_threads; i++) {
 			/* find a free worker slot and signal it */
 			/* find a free worker slot and signal it */
-			if (ctx->client_socks[i].in_use == 0) {
-				ctx->client_socks[i] = *sp;
-				ctx->client_socks[i].in_use = 1;
-				event_signal(ctx->client_wait_events[i]);
-				return;
+			if (ctx->client_socks[i].in_use == 2) {
+				(void)pthread_mutex_lock(&ctx->thread_mutex);
+				if ((ctx->client_socks[i].in_use == 2) && !ctx->stop_flag) {
+					ctx->client_socks[i] = *sp;
+					ctx->client_socks[i].in_use = 1;
+					/* socket has been moved to the consumer */
+					(void)pthread_mutex_unlock(&ctx->thread_mutex);
+					(void)event_signal(ctx->client_wait_events[i]);
+					return;
+				}
+				(void)pthread_mutex_unlock(&ctx->thread_mutex);
 			}
 			}
 		}
 		}
 		/* queue is full */
 		/* queue is full */
 		mg_sleep(1);
 		mg_sleep(1);
 	}
 	}
+	/* must consume */
+	set_blocking_mode(sp->sock);
+	closesocket(sp->sock);
 }
 }
 
 
 
 
@@ -17638,12 +17653,31 @@ static int
 consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
 consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
 {
 {
 	DEBUG_TRACE("%s", "going idle");
 	DEBUG_TRACE("%s", "going idle");
-	ctx->client_socks[thread_index].in_use = 0;
+	(void)pthread_mutex_lock(&ctx->thread_mutex);
+	ctx->client_socks[thread_index].in_use = 2;
+	(void)pthread_mutex_unlock(&ctx->thread_mutex);
+
 	event_wait(ctx->client_wait_events[thread_index]);
 	event_wait(ctx->client_wait_events[thread_index]);
-	*sp = ctx->client_socks[thread_index];
-	DEBUG_TRACE("grabbed socket %d, going busy", sp ? sp->sock : -1);
 
 
-	return !ctx->stop_flag;
+	(void)pthread_mutex_lock(&ctx->thread_mutex);
+	*sp = ctx->client_socks[thread_index];
+	if (ctx->stop_flag) {
+		(void)pthread_mutex_unlock(&ctx->thread_mutex);
+		if (sp->in_use == 1) {
+			/* must consume */
+			set_blocking_mode(sp->sock);
+			closesocket(sp->sock);
+		}
+		return 0;
+	}
+	(void)pthread_mutex_unlock(&ctx->thread_mutex);
+	if (sp->in_use == 1) {
+		DEBUG_TRACE("grabbed socket %d, going busy", sp->sock);
+		return 1;
+	}
+	/* must not reach here */
+	DEBUG_ASSERT(0);
+	return 0;
 }
 }
 
 
 #else /* ALTERNATIVE_QUEUE */
 #else /* ALTERNATIVE_QUEUE */
@@ -17791,15 +17825,10 @@ worker_thread_run(struct worker_thread_args *thread_args)
 	conn->conn_state = 1; /* not consumed */
 	conn->conn_state = 1; /* not consumed */
 #endif
 #endif
 
 
-#if defined(ALTERNATIVE_QUEUE)
-	while ((ctx->stop_flag == 0)
-	       && consume_socket(ctx, &conn->client, conn->thread_index)) {
-#else
 	/* Call consume_socket() even when ctx->stop_flag > 0, to let it
 	/* Call consume_socket() even when ctx->stop_flag > 0, to let it
 	 * signal sq_empty condvar to wake up the master waiting in
 	 * signal sq_empty condvar to wake up the master waiting in
 	 * produce_socket() */
 	 * produce_socket() */
 	while (consume_socket(ctx, &conn->client, conn->thread_index)) {
 	while (consume_socket(ctx, &conn->client, conn->thread_index)) {
-#endif
 
 
 		conn->conn_birth_time = time(NULL);
 		conn->conn_birth_time = time(NULL);
 
 
@@ -18083,20 +18112,15 @@ master_thread_run(void *thread_func_param)
 	close_all_listening_sockets(ctx);
 	close_all_listening_sockets(ctx);
 
 
 	/* Wakeup workers that are waiting for connections to handle. */
 	/* Wakeup workers that are waiting for connections to handle. */
-	(void)pthread_mutex_lock(&ctx->thread_mutex);
 #if defined(ALTERNATIVE_QUEUE)
 #if defined(ALTERNATIVE_QUEUE)
 	for (i = 0; i < ctx->cfg_worker_threads; i++) {
 	for (i = 0; i < ctx->cfg_worker_threads; i++) {
 		event_signal(ctx->client_wait_events[i]);
 		event_signal(ctx->client_wait_events[i]);
-
-		/* Since we know all sockets, we can shutdown the connections. */
-		if (ctx->client_socks[i].in_use) {
-			shutdown(ctx->client_socks[i].sock, SHUTDOWN_BOTH);
-		}
 	}
 	}
 #else
 #else
+	(void)pthread_mutex_lock(&ctx->thread_mutex);
 	pthread_cond_broadcast(&ctx->sq_full);
 	pthread_cond_broadcast(&ctx->sq_full);
-#endif
 	(void)pthread_mutex_unlock(&ctx->thread_mutex);
 	(void)pthread_mutex_unlock(&ctx->thread_mutex);
+#endif
 
 
 	/* Join all worker threads to avoid leaking threads. */
 	/* Join all worker threads to avoid leaking threads. */
 	workerthreadcount = ctx->cfg_worker_threads;
 	workerthreadcount = ctx->cfg_worker_threads;