Bladeren bron

Rewrite websocket for Lua (Step 15 of ?)

bel 11 jaren geleden
bovenliggende
commit
e7f26f92e0
4 gewijzigde bestanden met toevoegingen van 125 en 28 verwijderingen
  1. 5 7
      src/civetweb.c
  2. 30 5
      src/mod_lua.inl
  3. 83 15
      src/timer.inl
  4. 7 1
      test/websocket.lua

+ 5 - 7
src/civetweb.c

@@ -796,9 +796,7 @@ struct mg_context {
 #endif
 
 #ifdef USE_TIMERS
-    pthread_t timerthreadid;        /* Time thread ID */
-    pthread_mutex_t timer_mutex;    /* Protects timer lists */
-    struct timer_list *timers;      /* List of timers */
+    struct timers * timers;
 #endif
 };
 
@@ -4838,6 +4836,10 @@ void mg_unlock(struct mg_connection* conn)
     (void) pthread_mutex_unlock(&conn->mutex);
 }
 
+#if defined(USE_TIMERS)
+#include "timer.inl"
+#endif /* USE_TIMERS */
+
 #ifdef USE_LUA
 #include "mod_lua.inl"
 #endif /* USE_LUA */
@@ -6744,10 +6746,6 @@ static void *master_thread(void *thread_func_param)
 }
 #endif /* _WIN32 */
 
-#if defined(USE_TIMERS)
-#include "timer.inl"
-#endif /* USE_TIMERS */
-
 static void free_context(struct mg_context *ctx)
 {
     int i;

+ 30 - 5
src/mod_lua.inl

@@ -732,14 +732,31 @@ static int lwebsock_write(lua_State *L)
     return 0;
 }
 
+struct laction_arg {
+    lua_State *state;
+    pthread_mutex_t *pmutex;
+    char txt[1];
+};
+
+static void lua_action(struct laction_arg *arg)
+{
+    (void)pthread_mutex_lock(arg->pmutex);
+    luaL_dostring(arg->state, arg->txt);
+    (void)pthread_mutex_unlock(arg->pmutex);
+    mg_free(arg);
+}
+
 static int lwebsocket_set_timer(lua_State *L, int is_periodic)
 {
 #ifdef USE_TIMERS
     int num_args = lua_gettop(L);
-    struct lua_websock_data *ws;    
-    int type1,type2, ok = 0;    
+    struct lua_websock_data *ws;
+    int type1,type2, ok = 0;
     double timediff;
     struct mg_context *ctx;
+    struct laction_arg *arg;
+    const char *txt;
+    size_t txt_len;
 
     lua_pushlightuserdata(L, (void *)&lua_regkey_ctx);
     lua_gettable(L, LUA_REGISTRYINDEX);
@@ -755,10 +772,17 @@ static int lwebsocket_set_timer(lua_State *L, int is_periodic)
 
     type1 = lua_type(L, 1);
     type2 = lua_type(L, 2);
-    
+
     if (type1==LUA_TSTRING && type2==LUA_TNUMBER && num_args==2) {
         timediff = (double)lua_tonumber(L, 2);
-        ok = (0==timer_add(ctx, timediff, is_periodic, lua_tostring(L, 1)));
+        txt = lua_tostring(L, 1);
+        txt_len = strlen(txt);
+        arg = mg_malloc(sizeof(struct laction_arg) + txt_len);
+        arg->state = L;
+        arg->pmutex = &(ws->ws_mutex);
+        memcpy(arg->txt, txt, txt_len);
+        arg->txt[txt_len] = 0;
+        ok = (0==timer_add(ctx, timediff, is_periodic, 1, lua_action, (void*)arg));
     } else if (type1==LUA_TFUNCTION && type2==LUA_TNUMBER)  {
         /* TODO: not implemented yet */
         return luaL_error(L, "invalid arguments for set_timer/interval() call");
@@ -1272,6 +1296,7 @@ static void lua_websocket_close(struct mg_connection * conn, void * ws_arg)
             ws->conn[i] = ws->conn[ws->references];
         }
     }
+/*
     if (ws->references==0) {
         (void)pthread_mutex_lock(&conn->ctx->nonce_mutex);
         (void)pthread_mutex_unlock(&ws->ws_mutex);
@@ -1288,7 +1313,7 @@ static void lua_websocket_close(struct mg_connection * conn, void * ws_arg)
         mg_free(ws->script);
         *shared_websock_list = (*shared_websock_list)->next;
         mg_free(ws);
-    } else {
+    } else */ {
         (void)pthread_mutex_unlock(&ws->ws_mutex);
     }
 }

+ 83 - 15
src/timer.inl

@@ -1,27 +1,93 @@
 
+#if !defined(MAX_TIMERS)
+#define MAX_TIMERS MAX_WORKER_THREADS
+#endif
+
+typedef void (*taction)(void *arg);
+
+struct timer {
+    double time;
+    double period;
+    taction action;
+    void * arg;
+};
+
 struct timers {
-    pthread_t timerthreadid;        /* Time thread ID */
-    pthread_mutex_t timer_mutex;    /* Protects timer lists */
-    struct timer_list *timers;      /* List of timers */
+    pthread_t threadid;               /* Timer thread ID */
+    pthread_mutex_t mutex;            /* Protects timer lists */
+    struct timer timers[MAX_TIMERS];  /* List of timers */
+    unsigned timer_count;             /* Current size of timer list */
 };
 
 static void timer_thread_run(void *thread_func_param)
 {
     struct mg_context *ctx = (struct mg_context *) thread_func_param;
+    struct timespec now;
+    double d;
+    unsigned u;
+    struct timer t;
+
     while (ctx->stop_flag == 0) {
-        pthread_mutex_lock(&ctx->timer_mutex);
-        /* TODO: something useful */
-        pthread_mutex_unlock(&ctx->timer_mutex);
-        mg_sleep(1);
+#if defined(HAVE_CLOCK_NANOSLEEP) /* Linux with librt */
+        while (clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &request, &request)==EINTR) {/*nop*/;}
+#else
+        clock_gettime(CLOCK_MONOTONIC, &now);
+        d = (double)now.tv_sec + (double)now.tv_nsec * 1.0E-9;
+        for (;;) {
+            pthread_mutex_lock(&ctx->timers->mutex);
+            if (ctx->timers->timer_count > 0 && d >= ctx->timers->timers[0].time) {
+                t = ctx->timers->timers[0];
+                for (u=1; u<ctx->timers->timer_count; u++) {
+                    ctx->timers->timers[u-1] = ctx->timers->timers[u];
+                }
+                ctx->timers->timer_count--;
+                pthread_mutex_unlock(&ctx->timers->mutex);
+                t.action(t.arg);
+                continue;
+            } else {
+                pthread_mutex_unlock(&ctx->timers->mutex);
+            }
+            mg_sleep(1);
+            clock_gettime(CLOCK_MONOTONIC, &now);
+            d = (double)now.tv_sec + (double)now.tv_nsec * 1.0E-9;
+        }
+#endif
     }
 }
 
-static int timer_add(struct mg_context * ctx, double rel_time, int is_periodic, const char * action)
+static int timer_add(struct mg_context * ctx, double time, int is_periodic, int is_relative, taction action, void * arg)
 {
-    pthread_mutex_lock(&ctx->timer_mutex);
-    /* TODO: something useful */
-    pthread_mutex_unlock(&ctx->timer_mutex);
-    return 0;
+    double t = time;
+    unsigned u, v;
+    int error = 0;
+    struct timespec now;
+
+    if (is_relative) {
+        clock_gettime(CLOCK_MONOTONIC, &now);
+        t += now.tv_sec;
+        t += now.tv_nsec * 1.0E-9;
+    }
+
+    pthread_mutex_lock(&ctx->timers->mutex);
+    if (ctx->timers->timer_count == MAX_TIMERS) {
+        error = 1;
+    } else {
+        for (u=0; u<ctx->timers->timer_count; u++) {
+            if (ctx->timers->timers[u].time < time) {
+                for (v=ctx->timers->timer_count; v>u; v--) {
+                    ctx->timers->timers[v] = ctx->timers->timers[v-1];
+                }
+                break;
+            }
+        }
+        ctx->timers->timers[u].time = t;
+        ctx->timers->timers[u].period = is_periodic ? time : 0.0;
+        ctx->timers->timers[u].action = action;
+        ctx->timers->timers[u].arg = arg;
+        ctx->timers->timer_count++;
+    }
+    pthread_mutex_unlock(&ctx->timers->mutex);
+    return error;
 }
 
 #ifdef _WIN32
@@ -40,15 +106,17 @@ static void *timer_thread(void *thread_func_param)
 
 static int timers_init(struct mg_context * ctx)
 {
-    (void) pthread_mutex_init(&ctx->timer_mutex, NULL);
+    ctx->timers = (struct timers*) mg_calloc(sizeof(struct timers), 1);
+    (void) pthread_mutex_init(&ctx->timers->mutex, NULL);
 
     /* Start timer thread */
-    mg_start_thread_with_id(timer_thread, ctx, &ctx->timerthreadid);
+    mg_start_thread_with_id(timer_thread, ctx, &ctx->timers->threadid);
 
     return 0;
 }
 
 static void timers_exit(struct mg_context * ctx)
 {
-    (void) pthread_mutex_destroy(&ctx->timer_mutex);
+    (void) pthread_mutex_destroy(&ctx->timers->mutex);
+    mg_free(ctx->timers);
 }

+ 7 - 1
test/websocket.lua

@@ -64,6 +64,7 @@ function ready(tab)
   mg.write(tab.client, 1, "-->h 180");
   mg.write(tab.client, "-->m 180");
   senddata()
+  mg.set_timeout("timer()", 1)
   return true
 end
 
@@ -82,7 +83,6 @@ function close(tab)
 end
 
 function senddata()
-    trace("senddata")
     local date = os.date('*t');
     local hand = (date.hour%12)*60+date.min;
 
@@ -99,3 +99,9 @@ function senddata()
     end
 end
 
+function timer()
+    trace("timer")
+    senddata()
+    mg.set_timeout("timer()", 1)
+end
+