Selaa lähdekoodia

Rewrite websocket for Lua (Step 12 of ?)

bel 11 vuotta sitten
vanhempi
commit
9ec115fff2
2 muutettua tiedostoa jossa 88 lisäystä ja 26 poistoa
  1. 76 25
      src/civetweb.c
  2. 12 1
      src/mod_lua.inl

+ 76 - 25
src/civetweb.c

@@ -773,9 +773,9 @@ struct mg_context {
     volatile int sq_tail;           /* Tail of the socket queue */
     volatile int sq_tail;           /* Tail of the socket queue */
     pthread_cond_t sq_full;         /* Signaled when socket is produced */
     pthread_cond_t sq_full;         /* Signaled when socket is produced */
     pthread_cond_t sq_empty;        /* Signaled when socket is consumed */
     pthread_cond_t sq_empty;        /* Signaled when socket is consumed */
-    pthread_t masterthreadid;       /* The master thread ID. */
+    pthread_t masterthreadid;       /* The master thread ID */
     int workerthreadcount;          /* The amount of worker threads. */
     int workerthreadcount;          /* The amount of worker threads. */
-    pthread_t *workerthreadids;     /* The worker thread IDs. */
+    pthread_t *workerthreadids;     /* The worker thread IDs */
 
 
     unsigned long start_time;       /* Server start time, used for authentication */
     unsigned long start_time;       /* Server start time, used for authentication */
     pthread_mutex_t nonce_mutex;    /* Protects nonce_count */
     pthread_mutex_t nonce_mutex;    /* Protects nonce_count */
@@ -789,35 +789,35 @@ struct mg_context {
 #if defined(USE_LUA) && defined(USE_WEBSOCKET)
 #if defined(USE_LUA) && defined(USE_WEBSOCKET)
     /* linked list of shared lua websockets */
     /* linked list of shared lua websockets */
     struct mg_shared_lua_websocket_list *shared_lua_websockets;
     struct mg_shared_lua_websocket_list *shared_lua_websockets;
+    pthread_t timerthreadid;        /* Time thread ID */
+    pthread_mutex_t timer_mutex;    /* Protects timer lists */
 #endif
 #endif
 };
 };
 
 
 struct mg_connection {
 struct mg_connection {
     struct mg_request_info request_info;
     struct mg_request_info request_info;
     struct mg_context *ctx;
     struct mg_context *ctx;
-    SSL *ssl;                   /* SSL descriptor */
-    SSL_CTX *client_ssl_ctx;    /* SSL context for client connections */
-    struct socket client;       /* Connected client */
-    time_t birth_time;          /* Time when request was received */
-    int64_t num_bytes_sent;     /* Total bytes sent to client */
-    int64_t content_len;        /* Content-Length header value */
-    int64_t consumed_content;   /* How many bytes of content have been read */
-    char *buf;                  /* Buffer for received data */
-    char *path_info;            /* PATH_INFO part of the URL */
-    int must_close;             /* 1 if connection must be closed */
-    int in_error_handler;       /* 1 if in handler for user defined error pages */
-    int buf_size;               /* Buffer size */
-    int request_len;            /* Size of the request + headers in a buffer */
-    int data_len;               /* Total size of data in a buffer */
-    int status_code;            /* HTTP reply status code, e.g. 200 */
-    int throttle;               /* Throttling, bytes/sec. <= 0 means no
-                                   throttle */
-    time_t last_throttle_time;  /* Last time throttled data was sent */
-    int64_t last_throttle_bytes;/* Bytes sent this second */
-    pthread_mutex_t mutex;      /* Used by mg_lock/mg_unlock to ensure atomic
-                                   transmissions for websockets */
+    SSL *ssl;                       /* SSL descriptor */
+    SSL_CTX *client_ssl_ctx;        /* SSL context for client connections */
+    struct socket client;           /* Connected client */
+    time_t birth_time;              /* Time when request was received */
+    int64_t num_bytes_sent;         /* Total bytes sent to client */
+    int64_t content_len;            /* Content-Length header value */
+    int64_t consumed_content;       /* How many bytes of content have been read */
+    char *buf;                      /* Buffer for received data */
+    char *path_info;                /* PATH_INFO part of the URL */
+    int must_close;                 /* 1 if connection must be closed */
+    int in_error_handler;           /* 1 if in handler for user defined error pages */
+    int buf_size;                   /* Buffer size */
+    int request_len;                /* Size of the request + headers in a buffer */
+    int data_len;                   /* Total size of data in a buffer */
+    int status_code;                /* HTTP reply status code, e.g. 200 */
+    int throttle;                   /* Throttling, bytes/sec. <= 0 means no throttle */
+    time_t last_throttle_time;      /* Last time throttled data was sent */
+    int64_t last_throttle_bytes;    /* Bytes sent this second */
+    pthread_mutex_t mutex;          /* Used by mg_lock/mg_unlock to ensure atomic transmissions for websockets */
 #if defined(USE_LUA) && defined(USE_WEBSOCKET)
 #if defined(USE_LUA) && defined(USE_WEBSOCKET)
-    void * lua_websocket_state; /* Lua_State for a websocket connection */
+    void * lua_websocket_state;     /* Lua_State for a websocket connection */
 #endif
 #endif
 };
 };
 
 
@@ -1419,7 +1419,18 @@ static int pthread_mutex_destroy(pthread_mutex_t *mutex)
 
 
 static int pthread_mutex_lock(pthread_mutex_t *mutex)
 static int pthread_mutex_lock(pthread_mutex_t *mutex)
 {
 {
-    return WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0? 0 : -1;
+    return WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0 ? 0 : -1;
+}
+
+static int pthread_mutex_trylock(pthread_mutex_t *mutex)
+{
+    switch (WaitForSingleObject(*mutex, 0)) {
+        case WAIT_OBJECT_0:
+            return 0;
+        case WAIT_TIMEOUT:
+            return -2; /* EBUSY */
+    }
+    return -1;
 }
 }
 
 
 static int pthread_mutex_unlock(pthread_mutex_t *mutex)
 static int pthread_mutex_unlock(pthread_mutex_t *mutex)
@@ -6728,6 +6739,33 @@ static void *master_thread(void *thread_func_param)
 }
 }
 #endif /* _WIN32 */
 #endif /* _WIN32 */
 
 
+#if defined(USE_LUA) && defined(USE_WEBSOCKET)
+void timer_thread_run(void *thread_func_param)
+{
+    struct mg_context *ctx = (struct mg_context *) thread_func_param;
+    while (ctx->stop_flag == 0) {
+        pthread_mutex_lock(&ctx->timer_mutex);
+        /* TODO: something useful */
+        pthread_mutex_unlock(&ctx->timer_mutex);
+        mg_sleep(1);
+    }
+}
+
+#ifdef _WIN32
+static unsigned __stdcall timer_thread(void *thread_func_param)
+{
+    timer_thread_run(thread_func_param);
+    return 0;
+}
+#else
+static void *timer_thread(void *thread_func_param)
+{
+    timer_thread_run(thread_func_param);
+    return NULL;
+}
+#endif /* _WIN32 */
+#endif /* USE_LUA && USE_WEBSOCKET */
+
 static void free_context(struct mg_context *ctx)
 static void free_context(struct mg_context *ctx)
 {
 {
     int i;
     int i;
@@ -6745,6 +6783,10 @@ static void free_context(struct mg_context *ctx)
     /* Destroy other context global data structures mutex */
     /* Destroy other context global data structures mutex */
     (void) pthread_mutex_destroy(&ctx->nonce_mutex);
     (void) pthread_mutex_destroy(&ctx->nonce_mutex);
 
 
+#if defined(USE_LUA) && defined(USE_WEBSOCKET)
+    (void) pthread_mutex_destroy(&ctx->timer_mutex);
+#endif
+
     /* Deallocate config parameters */
     /* Deallocate config parameters */
     for (i = 0; i < NUM_OPTIONS; i++) {
     for (i = 0; i < NUM_OPTIONS; i++) {
         if (ctx->config[i] != NULL)
         if (ctx->config[i] != NULL)
@@ -6937,6 +6979,10 @@ struct mg_context *mg_start(const struct mg_callbacks *callbacks,
 
 
     (void) pthread_mutex_init(&ctx->nonce_mutex, NULL);
     (void) pthread_mutex_init(&ctx->nonce_mutex, NULL);
 
 
+#if defined(USE_LUA) && defined(USE_WEBSOCKET)
+    (void) pthread_mutex_init(&ctx->timer_mutex, NULL);
+#endif
+
     workerthreadcount = atoi(ctx->config[NUM_THREADS]);
     workerthreadcount = atoi(ctx->config[NUM_THREADS]);
 
 
     if (workerthreadcount > MAX_WORKER_THREADS) {
     if (workerthreadcount > MAX_WORKER_THREADS) {
@@ -6955,6 +7001,11 @@ struct mg_context *mg_start(const struct mg_callbacks *callbacks,
         }
         }
     }
     }
 
 
+#if defined(USE_LUA) && defined(USE_WEBSOCKET)
+    /* Start timer thread */
+    mg_start_thread_with_id(timer_thread, ctx, &ctx->timerthreadid);
+#endif
+
     /* Start master (listening) thread */
     /* Start master (listening) thread */
     mg_start_thread_with_id(master_thread, ctx, &ctx->masterthreadid);
     mg_start_thread_with_id(master_thread, ctx, &ctx->masterthreadid);
 
 

+ 12 - 1
src/mod_lua.inl

@@ -741,6 +741,11 @@ static int lwebsocket_set_timer(lua_State *L, int is_periodic)
     int type;
     int type;
     struct timespec ts_now;
     struct timespec ts_now;
     double now;
     double now;
+    struct mg_context *ctx;
+
+    lua_pushlightuserdata(L, (void *)&lua_regkey_ctx);
+    lua_gettable(L, LUA_REGISTRYINDEX);
+    ctx = (struct mg_context *)lua_touserdata(L, -1);
 
 
     lua_pushlightuserdata(L, (void *)&lua_regkey_connlist);
     lua_pushlightuserdata(L, (void *)&lua_regkey_connlist);
     lua_gettable(L, LUA_REGISTRYINDEX);
     lua_gettable(L, LUA_REGISTRYINDEX);
@@ -762,10 +767,16 @@ static int lwebsocket_set_timer(lua_State *L, int is_periodic)
     clock_gettime(CLOCK_MONOTONIC, &ts_now);
     clock_gettime(CLOCK_MONOTONIC, &ts_now);
     now = (double)ts_now.tv_sec + ((double)ts_now.tv_nsec * 1.0E6);
     now = (double)ts_now.tv_sec + ((double)ts_now.tv_nsec * 1.0E6);
 
 
+    pthread_mutex_lock(&ctx->timer_mutex);
     /* TODO: next timer call: now + timediff */
     /* TODO: next timer call: now + timediff */
+    pthread_mutex_unlock(&ctx->timer_mutex);
 
 
-#endif
+    lua_pushboolean(L, 1);
+    return 1;
+
+#else
     return 0;
     return 0;
+#endif
 }
 }
 
 
 /* mg.set_timeout for websockets */
 /* mg.set_timeout for websockets */