Procházet zdrojové kódy

Rewrite websockets for Lua (Step 1 of ?)

bel před 11 roky
rodič
revize
5522ad9555
4 změnil soubory, kde provedl 202 přidání a 254 odebrání
  1. 54 47
      src/civetweb.c
  2. 118 192
      src/mod_lua.inl
  3. 25 13
      test/websocket.lua
  4. 5 2
      test/websocket.xhtml

+ 54 - 47
src/civetweb.c

@@ -764,30 +764,31 @@ struct mg_context {
     in_port_t *listening_ports;
     int num_listening_sockets;
 
-    volatile int num_threads;  /* Number of threads */
-    pthread_mutex_t mutex;     /* Protects (max|num)_threads */
-    pthread_cond_t  cond;      /* Condvar for tracking workers terminations */
+    volatile int num_threads;       /* Number of threads */
+    pthread_mutex_t thread_mutex;   /* Protects (max|num)_threads */
+    pthread_cond_t thread_cond;     /* Condvar for tracking workers terminations */
 
     struct socket queue[MGSQLEN];   /* Accepted sockets */
-    volatile int sq_head;      /* Head of the socket queue */
-    volatile int sq_tail;      /* Tail of the socket queue */
-    pthread_cond_t sq_full;    /* Signaled when socket is produced */
-    pthread_cond_t sq_empty;   /* Signaled when socket is consumed */
-    pthread_t masterthreadid;  /* The master thread ID. */
-    int workerthreadcount;     /* The amount of worker threads. */
-    pthread_t *workerthreadids;/* The worker thread IDs. */
+    volatile int sq_head;           /* Head of the socket queue */
+    volatile int sq_tail;           /* Tail of the socket queue */
+    pthread_cond_t sq_full;         /* Signaled when socket is produced */
+    pthread_cond_t sq_empty;        /* Signaled when socket is consumed */
+    pthread_t masterthreadid;       /* The master thread ID. */
+    int workerthreadcount;          /* The amount of worker threads. */
+    pthread_t *workerthreadids;     /* The worker thread IDs. */
 
-    unsigned long start_time;  /* Server start time, used for authentication */
-    unsigned long nonce_count; /* Used nonces, used for authentication */
+    unsigned long start_time;       /* Server start time, used for authentication */
+    pthread_mutex_t nonce_mutex;    /* Protects nonce_count */
+    unsigned long nonce_count;      /* Used nonces, used for authentication */
 
-    char *systemName;          /* What operating system is running */
+    char *systemName;               /* What operating system is running */
 
     /* linked list of uri handlers */
     struct mg_request_handler_info *request_handlers;
 
 #if defined(USE_LUA) && defined(USE_WEBSOCKET)
     /* linked list of shared lua websockets */
-    struct mg_shared_lua_websocket *shared_lua_websockets;
+    struct mg_shared_lua_websocket_list *shared_lua_websockets;
 #endif
 };
 
@@ -3251,10 +3252,10 @@ static void send_authorization_request(struct mg_connection *conn)
     time_t curtime = time(NULL);
     unsigned long nonce = (unsigned long)(conn->ctx->start_time);
 
-    (void)pthread_mutex_lock(&conn->ctx->mutex);
+    (void)pthread_mutex_lock(&conn->ctx->nonce_mutex);
     nonce += conn->ctx->nonce_count;
     ++conn->ctx->nonce_count;
-    (void)pthread_mutex_unlock(&conn->ctx->mutex);
+    (void)pthread_mutex_unlock(&conn->ctx->nonce_mutex);
 
     nonce ^= (unsigned long)(conn->ctx);
     conn->status_code = 401;
@@ -5169,7 +5170,7 @@ static void read_websocket(struct mg_connection *conn)
                  !conn->ctx->callbacks.websocket_data(conn, mop, data, data_len)) ||
 #ifdef USE_LUA
                 (conn->lua_websocket_state &&
-                 !lua_websocket_data(conn, mop, data, data_len)) ||
+                 !lua_websocket_data(conn->lua_websocket_state, mop, data, data_len)) ||
 #endif
                 (buf[0] & 0xf) == WEBSOCKET_OPCODE_CONNECTION_CLOSE) {  /* Opcode == 8, connection close */
                 break;
@@ -5236,7 +5237,7 @@ static void handle_websocket_request(struct mg_connection *conn, const char *pat
 {
     const char *version = mg_get_header(conn, "Sec-WebSocket-Version");
 #ifdef USE_LUA
-    int lua_websock, shared_lua_websock = 0;
+    int lua_websock = 0;
     /* TODO: A websocket script may be shared between several clients, allowing them to communicate
              directly instead of writing to a data base and polling the data base. */
 #endif
@@ -5249,17 +5250,17 @@ static void handle_websocket_request(struct mg_connection *conn, const char *pat
         /* The C callback is called before Lua and may prevent Lua from handling the websocket. */
     } else {
 #ifdef USE_LUA
-        lua_websock = conn->ctx->config[LUA_WEBSOCKET_EXTENSIONS] ?
-                          match_prefix(conn->ctx->config[LUA_WEBSOCKET_EXTENSIONS],
+        if (conn->ctx->config[LUA_WEBSOCKET_EXTENSIONS]) {
+            lua_websock = match_prefix(conn->ctx->config[LUA_WEBSOCKET_EXTENSIONS],
                                        (int)strlen(conn->ctx->config[LUA_WEBSOCKET_EXTENSIONS]),
-                                       path) : 0;
+                                       path);
+        }
 
-        if (lua_websock || shared_lua_websock) {
-            /* TODO */ shared_lua_websock = 0;
-            conn->lua_websocket_state = lua_websocket_new(path, conn, !!shared_lua_websock);
+        if (lua_websock) {
+            conn->lua_websocket_state = lua_websocket_new(path, conn);
             if (conn->lua_websocket_state) {
                 send_websocket_handshake(conn);
-                if (lua_websocket_ready(conn)) {
+                if (lua_websocket_ready(conn->lua_websocket_state)) {
                     read_websocket(conn);
                 }
             }
@@ -6211,7 +6212,8 @@ static void close_connection(struct mg_connection *conn)
 {
 #if defined(USE_LUA) && defined(USE_WEBSOCKET)
     if (conn->lua_websocket_state) {
-        lua_websocket_close(conn);
+        lua_websocket_close(conn->lua_websocket_state);
+        conn->lua_websocket_state = NULL;
     }
 #endif
 
@@ -6431,12 +6433,12 @@ static void process_new_connection(struct mg_connection *conn)
 /* Worker threads take accepted socket from the queue */
 static int consume_socket(struct mg_context *ctx, struct socket *sp)
 {
-    (void) pthread_mutex_lock(&ctx->mutex);
+    (void) pthread_mutex_lock(&ctx->thread_mutex);
     DEBUG_TRACE("going idle");
 
     /* If the queue is empty, wait. We're idle at this point. */
     while (ctx->sq_head == ctx->sq_tail && ctx->stop_flag == 0) {
-        pthread_cond_wait(&ctx->sq_full, &ctx->mutex);
+        pthread_cond_wait(&ctx->sq_full, &ctx->thread_mutex);
     }
 
     /* If we're stopping, sq_head may be equal to sq_tail. */
@@ -6454,7 +6456,7 @@ static int consume_socket(struct mg_context *ctx, struct socket *sp)
     }
 
     (void) pthread_cond_signal(&ctx->sq_empty);
-    (void) pthread_mutex_unlock(&ctx->mutex);
+    (void) pthread_mutex_unlock(&ctx->thread_mutex);
 
     return !ctx->stop_flag;
 }
@@ -6512,11 +6514,11 @@ static void *worker_thread_run(void *thread_func_param)
     }
 
     /* Signal master that we're done with connection and exiting */
-    (void) pthread_mutex_lock(&ctx->mutex);
+    (void) pthread_mutex_lock(&ctx->thread_mutex);
     ctx->num_threads--;
-    (void) pthread_cond_signal(&ctx->cond);
+    (void) pthread_cond_signal(&ctx->thread_cond);
     assert(ctx->num_threads >= 0);
-    (void) pthread_mutex_unlock(&ctx->mutex);
+    (void) pthread_mutex_unlock(&ctx->thread_mutex);
 
     pthread_setspecific(sTlsKey, NULL);
 #if defined(_WIN32) && !defined(__SYMBIAN32__)
@@ -6547,12 +6549,12 @@ static void *worker_thread(void *thread_func_param)
 /* Master thread adds accepted socket to a queue */
 static void produce_socket(struct mg_context *ctx, const struct socket *sp)
 {
-    (void) pthread_mutex_lock(&ctx->mutex);
+    (void) pthread_mutex_lock(&ctx->thread_mutex);
 
     /* If the queue is full, wait */
     while (ctx->stop_flag == 0 &&
            ctx->sq_head - ctx->sq_tail >= (int) ARRAY_SIZE(ctx->queue)) {
-        (void) pthread_cond_wait(&ctx->sq_empty, &ctx->mutex);
+        (void) pthread_cond_wait(&ctx->sq_empty, &ctx->thread_mutex);
     }
 
     if (ctx->sq_head - ctx->sq_tail < (int) ARRAY_SIZE(ctx->queue)) {
@@ -6563,7 +6565,7 @@ static void produce_socket(struct mg_context *ctx, const struct socket *sp)
     }
 
     (void) pthread_cond_signal(&ctx->sq_full);
-    (void) pthread_mutex_unlock(&ctx->mutex);
+    (void) pthread_mutex_unlock(&ctx->thread_mutex);
 }
 
 static int set_sock_timeout(SOCKET sock, int milliseconds)
@@ -6685,11 +6687,11 @@ static void master_thread_run(void *thread_func_param)
     pthread_cond_broadcast(&ctx->sq_full);
 
     /* Wait until all threads finish */
-    (void) pthread_mutex_lock(&ctx->mutex);
+    (void) pthread_mutex_lock(&ctx->thread_mutex);
     while (ctx->num_threads > 0) {
-        (void) pthread_cond_wait(&ctx->cond, &ctx->mutex);
+        (void) pthread_cond_wait(&ctx->thread_cond, &ctx->thread_mutex);
     }
-    (void) pthread_mutex_unlock(&ctx->mutex);
+    (void) pthread_mutex_unlock(&ctx->thread_mutex);
 
     /* Join all worker threads to avoid leaking threads. */
     workerthreadcount = ctx->workerthreadcount;
@@ -6737,12 +6739,15 @@ static void free_context(struct mg_context *ctx)
     if (ctx == NULL)
         return;
 
-    /* All threads exited, no sync is needed. Destroy mutex and condvars */
-    (void) pthread_mutex_destroy(&ctx->mutex);
-    (void) pthread_cond_destroy(&ctx->cond);
+    /* All threads exited, no sync is needed. Destroy thread mutex and condvars */
+    (void) pthread_mutex_destroy(&ctx->thread_mutex);
+    (void) pthread_cond_destroy(&ctx->thread_cond);
     (void) pthread_cond_destroy(&ctx->sq_empty);
     (void) pthread_cond_destroy(&ctx->sq_full);
 
+    /* Destroy other context global data structures mutex */
+    (void) pthread_mutex_destroy(&ctx->nonce_mutex);
+
     /* Deallocate config parameters */
     for (i = 0; i < NUM_OPTIONS; i++) {
         if (ctx->config[i] != NULL)
@@ -6928,11 +6933,13 @@ struct mg_context *mg_start(const struct mg_callbacks *callbacks,
     (void) signal(SIGPIPE, SIG_IGN);
 #endif /* !_WIN32 && !__SYMBIAN32__ */
 
-    (void) pthread_mutex_init(&ctx->mutex, NULL);
-    (void) pthread_cond_init(&ctx->cond, NULL);
+    (void) pthread_mutex_init(&ctx->thread_mutex, NULL);
+    (void) pthread_cond_init(&ctx->thread_cond, NULL);
     (void) pthread_cond_init(&ctx->sq_empty, NULL);
     (void) pthread_cond_init(&ctx->sq_full, NULL);
 
+    (void) pthread_mutex_init(&ctx->nonce_mutex, NULL);
+
     workerthreadcount = atoi(ctx->config[NUM_THREADS]);
 
     if (workerthreadcount > MAX_WORKER_THREADS) {
@@ -6956,14 +6963,14 @@ struct mg_context *mg_start(const struct mg_callbacks *callbacks,
 
     /* Start worker threads */
     for (i = 0; i < workerthreadcount; i++) {
-        (void) pthread_mutex_lock(&ctx->mutex);
+        (void) pthread_mutex_lock(&ctx->thread_mutex);
         ctx->num_threads++;
-        (void) pthread_mutex_unlock(&ctx->mutex);
+        (void) pthread_mutex_unlock(&ctx->thread_mutex);
         if (mg_start_thread_with_id(worker_thread, ctx,
                                     &ctx->workerthreadids[i]) != 0) {
-            (void) pthread_mutex_lock(&ctx->mutex);
+            (void) pthread_mutex_lock(&ctx->thread_mutex);
             ctx->num_threads--;
-            (void) pthread_mutex_unlock(&ctx->mutex);
+            (void) pthread_mutex_unlock(&ctx->thread_mutex);
             mg_cry(fc(ctx), "Cannot start worker thread: %ld", (long) ERRNO);
         }
     }

+ 118 - 192
src/mod_lua.inl

@@ -800,6 +800,10 @@ static void prepare_lua_environment(struct mg_connection *conn, lua_State *L, co
     if ((preload_file != NULL) && (*preload_file != 0)) {
         IGNORE_UNUSED_RESULT(luaL_dofile(L, preload_file));
     }
+
+    if (conn->ctx->callbacks.init_lua != NULL) {
+        conn->ctx->callbacks.init_lua(conn, L);
+    }
 }
 
 static int lua_error_handler(lua_State *L)
@@ -907,9 +911,6 @@ struct file *filep, struct lua_State *ls)
         /* We're not sending HTTP headers here, Lua page must do it. */
         if (ls == NULL) {
             prepare_lua_environment(conn, L, path, LUA_ENV_TYPE_LUA_SERVER_PAGE);
-            if (conn->ctx->callbacks.init_lua != NULL) {
-                conn->ctx->callbacks.init_lua(conn, L);
-            }
         }
         error = lsp(conn, path, filep->membuf == NULL ? p : filep->membuf,
             filep->size, L);
@@ -923,17 +924,16 @@ struct file *filep, struct lua_State *ls)
 
 #ifdef USE_WEBSOCKET
 struct lua_websock_data {
-    lua_State *main;
-    lua_State *thread;
+    lua_State *state;
     char * script;
-    unsigned shared;
+    unsigned references;
     struct mg_connection *conn;
-    pthread_mutex_t mutex;
+    pthread_mutex_t ws_mutex;
 };
 
-struct mg_shared_lua_websocket {
-    struct lua_websock_data *sock;
-    struct mg_shared_lua_websocket *next;
+struct mg_shared_lua_websocket_list {
+    struct lua_websock_data ws;
+    struct mg_shared_lua_websocket_list *next;
 };
 
 static void websock_cry(struct mg_connection *conn, int err, lua_State * L, const char * ws_operation, const char * lua_operation)
@@ -963,215 +963,141 @@ static void websock_cry(struct mg_connection *conn, int err, lua_State * L, cons
     }
 }
 
-static void * lua_websocket_new(const char * script, struct mg_connection *conn, int is_shared)
+static void * lua_websocket_new(const char * script, struct mg_connection *conn)
 {
-    struct lua_websock_data *lws_data;
-    struct mg_shared_lua_websocket **shared_websock_list = &(conn->ctx->shared_lua_websockets);
-    int ok = 0;
-    int found = 0;
-    int err, nargs;
+    struct mg_shared_lua_websocket_list **shared_websock_list = &(conn->ctx->shared_lua_websockets);
+    int err, ok = 0;
 
     assert(conn->lua_websocket_state == NULL);
 
-    /*
-    lock list (mg_context global)
-    check if in list
-    yes: inc rec counter
-    no: create state, add to list
-    lock list element
-    unlock list (mg_context global)
-    call add
-    unlock list element
-    */
-
-    if (is_shared) {
-        (void)pthread_mutex_lock(&conn->ctx->mutex);
-        while (*shared_websock_list) {
-            if (!strcmp((*shared_websock_list)->sock->script, script)) {
-                lws_data = (*shared_websock_list)->sock;
-                lws_data->shared++;
-                found = 1;
-            }
-            shared_websock_list = &((*shared_websock_list)->next);
+    /* lock list (mg_context global) */
+    (void)pthread_mutex_lock(&conn->ctx->nonce_mutex);
+    while (*shared_websock_list) {
+        /* check if ws already in list */
+        if (0==strcmp(script,(*shared_websock_list)->ws.script)) {
+            break;
         }
-        (void)pthread_mutex_unlock(&conn->ctx->mutex);
-    }
-
-    if (!found) {
-        lws_data = (struct lua_websock_data *) mg_malloc(sizeof(*lws_data));
+        shared_websock_list = &((*shared_websock_list)->next);
     }
-
-    if (lws_data) {
-        if (!found) {
-            lws_data->shared = is_shared;
-            lws_data->conn = conn;
-            lws_data->script = mg_strdup(script);
-            lws_data->main = lua_newstate(lua_allocator, NULL);
-            if (is_shared) {
-                (void)pthread_mutex_lock(&conn->ctx->mutex);
-                shared_websock_list = &(conn->ctx->shared_lua_websockets);
-                while (*shared_websock_list) {
-                    shared_websock_list = &((*shared_websock_list)->next);
-                }
-                *shared_websock_list = (struct mg_shared_lua_websocket *)mg_malloc(sizeof(struct mg_shared_lua_websocket));
-                if (*shared_websock_list) {
-                    (*shared_websock_list)->sock = lws_data;
-                    (*shared_websock_list)->next = 0;
-                }
-                (void)pthread_mutex_unlock(&conn->ctx->mutex);
-            }
+    if (*shared_websock_list == NULL) {
+        /* add ws to list */
+        *shared_websock_list = mg_calloc(sizeof(struct mg_shared_lua_websocket_list), 1);
+        if (*shared_websock_list == NULL) {
+            (void)pthread_mutex_unlock(&conn->ctx->nonce_mutex);
+            mg_cry(conn, "Cannot create shared websocket struct, OOM");
+            return NULL;
         }
-
-        if (lws_data->main) {
-            prepare_lua_environment(conn, lws_data->main, script, LUA_ENV_TYPE_LUA_WEBSOCKET);
-            if (conn->ctx->callbacks.init_lua != NULL) {
-                conn->ctx->callbacks.init_lua(conn, lws_data->main);
-            }
-            lws_data->thread = lua_newthread(lws_data->main);
-            err = luaL_loadfile(lws_data->thread, script);
-            if (err==LUA_OK) {
-                /* Activate the Lua script. */
-                err = lua_resume(lws_data->thread, NULL, 0);
-                if (err!=LUA_YIELD) {
-                    websock_cry(conn, err, lws_data->thread, __func__, "lua_resume");
-                } else {
-                    nargs = lua_gettop(lws_data->thread);
-                    ok = (nargs==1) && lua_isboolean(lws_data->thread, 1) && lua_toboolean(lws_data->thread, 1);
-                }
-            } else {
-                websock_cry(conn, err, lws_data->thread, __func__, "lua_loadfile");
-            }
-
-        } else {
-            mg_cry(conn, "%s: luaL_newstate failed", __func__);
+        /* init ws list element */
+        (*shared_websock_list)->ws.conn = conn;
+        (*shared_websock_list)->ws.script = mg_strdup(script); /* TODO: handle OOM */
+        pthread_mutex_init(&((*shared_websock_list)->ws.ws_mutex), NULL);
+        (*shared_websock_list)->ws.state = lua_newstate(lua_allocator, NULL);
+        (*shared_websock_list)->ws.references = 1;
+        (void)pthread_mutex_lock(&((*shared_websock_list)->ws.ws_mutex));
+        prepare_lua_environment(conn, (*shared_websock_list)->ws.state, script, LUA_ENV_TYPE_LUA_WEBSOCKET);
+        err = luaL_loadfile((*shared_websock_list)->ws.state, script);
+        if (err != 0) {
+            mg_cry(conn, "Lua websocket: Error %i loading %s: %s", err, script,
+                lua_tostring((*shared_websock_list)->ws.state, -1));
         }
-
-        if (!ok) {
-            if (lws_data->main) lua_close(lws_data->main);
-            mg_free(lws_data->script);
-            mg_free(lws_data);
-            lws_data=0;
+        err = lua_pcall((*shared_websock_list)->ws.state, 0, 0, 0);
+        if (err != 0) {
+            mg_cry(conn, "Lua websocket: Error %i initializing %s: %s", err, script,
+                lua_tostring((*shared_websock_list)->ws.state, -1));
         }
     } else {
-        mg_cry(conn, "%s: out of memory", __func__);
+        /* inc ref count */
+        (void)pthread_mutex_lock(&((*shared_websock_list)->ws.ws_mutex));
+        ((*shared_websock_list)->ws.references)++;
     }
+    (void)pthread_mutex_unlock(&conn->ctx->nonce_mutex);
+
+    /* call add */
+    lua_getglobal((*shared_websock_list)->ws.state, "open");
+    err = lua_pcall((*shared_websock_list)->ws.state, 0, 1, 0);
+    if (err != 0) {
+        mg_cry(conn, "Lua websocket: Error %i calling open handler of %s: %s", err, script,
+            lua_tostring((*shared_websock_list)->ws.state, -1));
+    } else {
+        if (lua_isboolean((*shared_websock_list)->ws.state, -1)) {
+            ok = lua_toboolean((*shared_websock_list)->ws.state, -1);
+        }
+        lua_pop((*shared_websock_list)->ws.state, 1);
+    }
+    if (!ok) {
+        /* TODO */
+    }
+
+    (void)pthread_mutex_unlock(&((*shared_websock_list)->ws.ws_mutex));
 
-    return lws_data;
+    return (void*)&((*shared_websock_list)->ws);
 }
 
-static int lua_websocket_data(struct mg_connection *conn, int bits, char *data, size_t data_len)
+static int lua_websocket_data(void *ws_arg, int bits, char *data, size_t data_len)
 {
-    struct lua_websock_data *lws_data = (struct lua_websock_data *)(conn->lua_websocket_state);
-    int err, nargs, ok=0, retry;
-    lua_Number delay;
-
-    assert(lws_data != NULL);
-    assert(lws_data->main != NULL);
-    assert(lws_data->thread != NULL);
-
-    /*
-    lock list element
-    call data
-    unlock list element
-    */
-
-    do {
-        retry=0;
-
-        /* Push the data to Lua, then resume the Lua state. */
-        /* The data will be available to Lua as the result of the coroutine.yield function. */
-        lua_pushboolean(lws_data->thread, 1);
-        if (bits >= 0) {
-            lua_pushinteger(lws_data->thread, bits);
-            if (data) {
-                lua_pushlstring(lws_data->thread, data, data_len);
-                err = lua_resume(lws_data->thread, NULL, 3);
-            } else {
-                err = lua_resume(lws_data->thread, NULL, 2);
-            }
-        } else {
-            err = lua_resume(lws_data->thread, NULL, 1);
-        }
-
-        /* Check if Lua returned by a call to the coroutine.yield function. */
-        if (err!=LUA_YIELD) {
-            websock_cry(conn, err, lws_data->thread, __func__, "lua_resume");
-        } else {
-            nargs = lua_gettop(lws_data->thread);
-            ok = (nargs>=1) && lua_isboolean(lws_data->thread, 1) && lua_toboolean(lws_data->thread, 1);
-            delay = (nargs>=2) && lua_isnumber(lws_data->thread, 2) ? lua_tonumber(lws_data->thread, 2) : -1.0;
-            if (ok && delay>0) {
-                fd_set rfds;
-                struct timeval tv;
-
-                FD_ZERO(&rfds);
-                FD_SET(conn->client.sock, &rfds);
-
-                tv.tv_sec = (unsigned long)delay;
-                tv.tv_usec = (unsigned long)(((double)delay - (double)((unsigned long)delay))*1000000.0);
-                retry = (0==select(conn->client.sock+1, &rfds, NULL, NULL, &tv));
-            }
+    struct lua_websock_data *ws = (struct lua_websock_data *)(ws_arg);
+    int err, ok = 0;
+
+    assert(ws != NULL);
+    assert(ws->state != NULL);
+
+    (void)pthread_mutex_lock(&ws->ws_mutex);
+    lua_getglobal(ws->state, "data");
+    lua_pushnumber(ws->state, bits);
+    lua_pushlstring(ws->state, data, data_len);
+    err = lua_pcall(ws->state, 2, 1, 0);
+    if (err != 0) {
+        mg_cry(ws->conn, "Lua websocket: Error %i calling data handler of %s", err, ws->script);
+    } else {
+        if (lua_isboolean(ws->state, -1)) {
+            ok = lua_toboolean(ws->state, -1);
         }
-    } while (retry);
+        lua_pop(ws->state, 1);
+    }
+    (void)pthread_mutex_unlock(&ws->ws_mutex);
 
     return ok;
 }
 
-static int lua_websocket_ready(struct mg_connection *conn)
+static int lua_websocket_ready(void * ws_arg)
 {
-    return lua_websocket_data(conn, -1, NULL, 0);
+    return lua_websocket_data(ws_arg, -1, NULL, 0);
 }
 
-static void lua_websocket_close(struct mg_connection *conn)
+static void lua_websocket_close(void * ws_arg)
 {
-    struct lua_websock_data *lws_data = (struct lua_websock_data *)(conn->lua_websocket_state);
-    struct mg_shared_lua_websocket **shared_websock_list;
-    int err;
-
-    assert(lws_data != NULL);
-    assert(lws_data->main != NULL);
-    assert(lws_data->thread != NULL);
-
-    /*
-    lock list element
-    lock list (mg_context global)
-    call remove    
-    dec ref counter
-    if ref counter == 0 close state and remove from list
-    unlock list element
-    unlock list (mg_context global)
-    */
-
-
-    lua_pushboolean(lws_data->thread, 0);
-    err = lua_resume(lws_data->thread, NULL, 1);
-
-    if (lws_data->shared) {
-        (void)pthread_mutex_lock(&conn->ctx->mutex);
-        lws_data->shared--;
-        if (lws_data->shared==0) {
-        /*
-            shared_websock_list = &(conn->ctx->shared_lua_websockets);
-            while (*shared_websock_list) {
-                if ((*shared_websock_list)->sock == lws_data) {
-                    *shared_websock_list = (*shared_websock_list)->next;
-                } else {
-                    shared_websock_list = &((*shared_websock_list)->next);
-                }
-            }
+    struct lua_websock_data *ws = (struct lua_websock_data *)(ws_arg);
+    struct mg_shared_lua_websocket_list **shared_websock_list = &(ws->conn->ctx->shared_lua_websockets);
+    int err = 0;
+
+    assert(ws != NULL);
+    assert(ws->state != NULL);
+
+    (void)pthread_mutex_lock(&ws->ws_mutex);
+    lua_getglobal(ws->state, "close");
+    err = lua_pcall(ws->state, 0, 0, 0);
+    if (err != 0) {
+        mg_cry(ws->conn, "Lua websocket: Error %i calling close handler of %s", err, ws->script);
+    }
+    ws->references--;
+    if (ws->references==0) {
+        (void)pthread_mutex_lock(&ws->conn->ctx->nonce_mutex);
+        (void)pthread_mutex_unlock(&ws->ws_mutex);
 
-            lua_close(lws_data->main);
-            mg_free(lws_data->script);
-            lws_data->script=0;
-            mg_free(lws_data);
-         */
+        while (*shared_websock_list) {
+            if (0==strcmp(ws->script,(*shared_websock_list)->ws.script)) {
+                break;
+            }
+            shared_websock_list = &((*shared_websock_list)->next);
         }
-        (void)pthread_mutex_unlock(&conn->ctx->mutex);
+        assert(*shared_websock_list != NULL);
+        (void)pthread_mutex_unlock(&ws->conn->ctx->nonce_mutex);
+        lua_close(ws->state);
+        mg_free(ws->script);
+        *shared_websock_list = (*shared_websock_list)->next;
+        mg_free(ws);
     } else {
-        lua_close(lws_data->main);
-        mg_free(lws_data->script);
-        mg_free(lws_data);
+        (void)pthread_mutex_unlock(&ws->ws_mutex);
     }
-    conn->lua_websocket_state = NULL;
 }
 #endif

+ 25 - 13
test/websocket.lua

@@ -1,3 +1,8 @@
+function trace(text)
+    local f = io.open("R:\\websocket.trace", "a")
+    f:write(os.date() .. " - " .. text .. "\n")
+    f:close()
+end
 
 function iswebsocket()
   return pcall(function()
@@ -6,36 +11,44 @@ function iswebsocket()
 end
 
 if not iswebsocket() then
+  trace("no websocket")
   mg.write("HTTP/1.0 403 Forbidden\r\n")
   mg.write("Connection: close\r\n")
   mg.write("\r\n")
+  mg.write("forbidden")
   return
 end
 
 
+-- Callback to reject a connection
+function open()
+  trace("open")
+  return true
+end
+
 -- Callback for "Websocket ready"
 function ready()
+  trace("ready")
   mg.write("text", "Websocket ready")
+  senddata()
+  return true
 end
 
 -- Callback for "Websocket received data"
 function data(bits, content)
+    trace("data(" .. bits .. "): " .. content)
+    senddata()
+    return true
 end
 
 -- Callback for "Websocket is closing"
 function close()
+    trace("close")
+    mg.write("text", "end")
 end
 
-
-coroutine.yield(true); -- first yield returns (true) or (false) to accept or reject the connection
-
-ready()
-
-local lasthand = ""
-
-repeat
-    local cont, bits, content = coroutine.yield(true, 1.0)
-
+function senddata()
+    trace("senddata")
     local date = os.date('*t');
     local hand = (date.hour%12)*60+date.min;
 
@@ -50,7 +63,6 @@ repeat
     if bits and content then
         data(bits, content)
     end
-until not cont;
+end
 
-mg.write("text", "end")
-close()
+trace("defined")

+ 5 - 2
test/websocket.xhtml

@@ -1,4 +1,4 @@
-<!DOCTYPE HTML>
+<!DOCTYPE HTML>
 <html xmlns="http://www.w3.org/1999/xhtml">
 <head>
   <meta charset="UTF-8"></meta>
@@ -23,6 +23,7 @@
     function webSockKeepAlive() {
       if (keepAlive) {
         connection.send('client still alive');
+        console.log('send keep alive')
         setTimeout("webSockKeepAlive()", 10000);
       }
     }
@@ -49,7 +50,7 @@
       connection.onmessage = function (e) {
         var lCmd = e.data.substring(0,3);
         if (lCmd == "-->") {
-          //console.log(e.data);
+          console.log(e.data);
           var lDirection = Number(e.data.substring(5));
           if (e.data[3] == 'h') {
             hand_hour.setAttribute("transform", "rotate(" + lDirection + " 800 600)");
@@ -61,6 +62,8 @@
           websock_text_field.textContent = e.data;
         }
       };
+
+      console.log("load");
     }
 
   ]]></script>