Parcourir la source

Using poll() instead of select() to prevent FD_SETSIZE overflows in programs that open a large number of descriptors

Sergey Lyubka il y a 12 ans
Parent
commit
5646caa572
1 fichiers modifiés avec 76 ajouts et 68 suppressions
  1. 76 68
      mongoose.c

+ 76 - 68
mongoose.c

@@ -176,13 +176,23 @@ typedef struct DIR {
   struct dirent  result;
 } DIR;
 
+#ifndef HAVE_POLL  // Must be the same macro that guards the poll() shim below
+struct pollfd {    // Stand-in for the system structure when poll() is absent
+  int fd;          // Descriptor to watch
+  short events;    // Events the caller asks for
+  short revents;   // Events reported back by poll()
+};
+#define POLLIN 1   // "Readable" is the only event the shim reports
+#endif
+
+
 // Mark required libraries
 #pragma comment(lib, "Ws2_32.lib")
 
 #else    // UNIX  specific
 #include <sys/wait.h>
 #include <sys/socket.h>
-#include <sys/select.h>
+#include <sys/poll.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <sys/time.h>
@@ -435,7 +445,6 @@ struct file {
 // Describes listening socket, or socket which was accept()-ed by the master
 // thread and queued for future handling by the worker thread.
 struct socket {
-  struct socket *next;  // Linkage
   SOCKET sock;          // Listening socket
   union usa lsa;        // Local socket address
   union usa rsa;        // Remote socket address
@@ -490,6 +499,7 @@ struct mg_context {
   void *user_data;              // User-defined data
 
   struct socket *listening_sockets;
+  int num_listening_sockets;
 
   volatile int num_threads;  // Number of threads
   pthread_mutex_t mutex;     // Protects (max|num)_threads
@@ -1231,6 +1241,33 @@ static struct dirent *readdir(DIR *dir) {
   return result;
 }
 
+#ifndef HAVE_POLL
+// Minimal poll() replacement built on top of select().
+// Every descriptor in pfd[0..n-1] is placed in the read set regardless of
+// its `events` field; on return `revents` is POLLIN for each descriptor that
+// select() flagged as readable and 0 for the rest.
+// `milliseconds` is the maximum time to wait.
+// Returns select()'s result: the number of ready descriptors, 0 on timeout,
+// or a negative value on error.
+static int poll(struct pollfd *pfd, int n, int milliseconds) {
+  struct timeval tv;
+  fd_set set;
+  int i, result;
+
+  // With nothing to watch, select() on empty sets fails on Windows with an
+  // "Invalid parameter" error instead of sleeping. Wait out the timeout
+  // here so that callers looping on poll() do not spin.
+  if (n <= 0) {
+    if (milliseconds > 0) {
+      mg_sleep(milliseconds);
+    }
+    return 0;
+  }
+
+  tv.tv_sec = milliseconds / 1000;
+  tv.tv_usec = (milliseconds % 1000) * 1000;
+  FD_ZERO(&set);
+
+  for (i = 0; i < n; i++) {
+    FD_SET((SOCKET) pfd[i].fd, &set);
+    pfd[i].revents = 0;
+  }
+
+  // Winsock ignores the first argument of select(), hence the 0.
+  if ((result = select(0, &set, NULL, NULL, &tv)) > 0) {
+    for (i = 0; i < n; i++) {
+      if (FD_ISSET((SOCKET) pfd[i].fd, &set)) {
+        pfd[i].revents = POLLIN;
+      }
+    }
+  }
+
+  return result;
+}
+#endif // HAVE_POLL
+
 #define set_close_on_exec(x) // No FD_CLOEXEC on Windows
 
 int mg_start_thread(mg_thread_func_t f, void *p) {
@@ -1470,17 +1507,14 @@ static int64_t push(FILE *fp, SOCKET sock, SSL *ssl, const char *buf,
 // reading, must give up and close the connection and exit serving thread.
 static int wait_until_socket_is_readable(struct mg_connection *conn) {
   int result;
-  struct timeval tv;
-  fd_set set;
+  struct pollfd pfd;
 
   do {
-    tv.tv_sec = 0;
-    tv.tv_usec = 300 * 1000;
-    FD_ZERO(&set);
-    FD_SET(conn->client.sock, &set);
-    result = select(conn->client.sock + 1, &set, NULL, NULL, &tv);
-    if(result == 0 && conn->ssl != NULL) {
-        result = SSL_pending(conn->ssl);
+    pfd.fd = conn->client.sock;
+    pfd.events = POLLIN;
+    result = poll(&pfd, 1, 200);
+    if (result == 0 && conn->ssl != NULL) {
+      result = SSL_pending(conn->ssl);
     }
   } while ((result == 0 || (result < 0 && ERRNO == EINTR)) &&
            conn->ctx->stop_flag == 0);
@@ -1609,7 +1643,8 @@ int mg_printf(struct mg_connection *conn, const char *fmt, ...) {
     // vsnprintf() error, give up
     len = -1;
     cry(conn, "%s(%s, ...): vsnprintf() error", __func__, fmt);
-  } else if (len > (int) sizeof(mem) && (buf = (char *) malloc(len + 1)) != NULL) {
+  } else if (len > (int) sizeof(mem) &&
+             (buf = (char *) malloc(len + 1)) != NULL) {
     // Local buffer is not large enough, allocate big buffer on heap
     va_start(ap, fmt);
     vsnprintf(buf, len + 1, fmt, ap);
@@ -4272,12 +4307,11 @@ static void handle_request(struct mg_connection *conn) {
 }
 
 static void close_all_listening_sockets(struct mg_context *ctx) {
+  // Closes every listening socket recorded in ctx and releases the array
+  // that holds them. The pointer and the count are reset afterwards so that
+  // a repeated call does nothing instead of freeing the array twice.
-  struct socket *sp, *tmp;
-  for (sp = ctx->listening_sockets; sp != NULL; sp = tmp) {
-    tmp = sp->next;
-    (void) closesocket(sp->sock);
-    free(sp);
+  int i;
+  for (i = 0; i < ctx->num_listening_sockets; i++) {
+    closesocket(ctx->listening_sockets[i].sock);
   }
+  free(ctx->listening_sockets);
+  ctx->listening_sockets = NULL;
+  ctx->num_listening_sockets = 0;
 }
 
 // Valid listening port specification is: [ip_address:]port[s]
@@ -4316,9 +4350,8 @@ static int parse_port_string(const struct vec *vec, struct socket *so) {
 static int set_ports_option(struct mg_context *ctx) {
   const char *list = ctx->config[LISTENING_PORTS];
   int on = 1, success = 1;
-  SOCKET sock;
   struct vec vec;
-  struct socket so, *listener;
+  struct socket so;
 
   while (success && (list = next_option(list, &vec, NULL)) != NULL) {
     if (!parse_port_string(&vec, &so)) {
@@ -4329,11 +4362,11 @@ static int set_ports_option(struct mg_context *ctx) {
                (ctx->ssl_ctx == NULL || ctx->config[SSL_CERTIFICATE] == NULL)) {
       cry(fc(ctx), "Cannot add SSL socket, is -ssl_certificate option set?");
       success = 0;
-    } else if ((sock = socket(so.lsa.sa.sa_family, SOCK_STREAM, 6)) ==
+    } else if ((so.sock = socket(so.lsa.sa.sa_family, SOCK_STREAM, 6)) ==
                INVALID_SOCKET ||
                // On Windows, SO_REUSEADDR is recommended only for
                // broadcast UDP sockets
-               setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &on,
+               setsockopt(so.sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &on,
                           sizeof(on)) != 0 ||
                // Set TCP keep-alive. This is needed because if HTTP-level
                // keep-alive is enabled, and client resets the connection,
@@ -4342,27 +4375,22 @@ static int set_ports_option(struct mg_context *ctx) {
                // handshake will figure out that the client is down and
                // will close the server end.
                // Thanks to Igor Klopov who suggested the patch.
-               setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &on,
+               setsockopt(so.sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &on,
                           sizeof(on)) != 0 ||
-               bind(sock, &so.lsa.sa, sizeof(so.lsa)) != 0 ||
-               listen(sock, SOMAXCONN) != 0) {
-      closesocket(sock);
+               bind(so.sock, &so.lsa.sa, sizeof(so.lsa)) != 0 ||
+               listen(so.sock, SOMAXCONN) != 0) {
+      closesocket(so.sock);
       cry(fc(ctx), "%s: cannot bind to %.*s: %s", __func__,
           (int) vec.len, vec.ptr, strerror(ERRNO));
       success = 0;
-    } else if ((listener = (struct socket *)
-                calloc(1, sizeof(*listener))) == NULL) {
-      // NOTE(lsm): order is important: call cry before closesocket(),
-      // cause closesocket() alters the errno.
-      cry(fc(ctx), "%s: %s", __func__, strerror(ERRNO));
-      closesocket(sock);
-      success = 0;
     } else {
-      *listener = so;
-      listener->sock = sock;
-      set_close_on_exec(listener->sock);
-      listener->next = ctx->listening_sockets;
-      ctx->listening_sockets = listener;
+      set_close_on_exec(so.sock);
+      // TODO: handle realloc failure
+      ctx->listening_sockets = realloc(ctx->listening_sockets,
+                                       (ctx->num_listening_sockets + 1) *
+                                       sizeof(ctx->listening_sockets[0]));
+      ctx->listening_sockets[ctx->num_listening_sockets] = so;
+      ctx->num_listening_sockets++;
     }
   }
 
@@ -4443,13 +4471,6 @@ static int check_acl(struct mg_context *ctx, uint32_t remote_ip) {
   return allowed == '+';
 }
 
-static void add_to_set(SOCKET fd, fd_set *set, int *max_fd) {
-  FD_SET(fd, set);
-  if (fd > (SOCKET) *max_fd) {
-    *max_fd = (int) fd;
-  }
-}
-
 #if !defined(_WIN32)
 static int set_uid_option(struct mg_context *ctx) {
   struct passwd *pw;
@@ -4982,10 +5003,8 @@ static void accept_new_connection(const struct socket *listener,
 
 static void *master_thread(void *thread_func_param) {
   struct mg_context *ctx = thread_func_param;
-  fd_set read_set;
-  struct timeval tv;
-  struct socket *sp;
-  int max_fd;
+  struct pollfd *pfd;
+  int i;
 
   // Increase priority of the master thread
 #if defined(_WIN32)
@@ -4998,33 +5017,22 @@ static void *master_thread(void *thread_func_param) {
   pthread_setschedparam(pthread_self(), SCHED_RR, &sched_param);
 #endif
 
+  pfd = calloc(ctx->num_listening_sockets, sizeof(pfd[0]));
   while (ctx->stop_flag == 0) {
-    FD_ZERO(&read_set);
-    max_fd = -1;
-
-    // Add listening sockets to the read set
-    for (sp = ctx->listening_sockets; sp != NULL; sp = sp->next) {
-      add_to_set(sp->sock, &read_set, &max_fd);
+    for (i = 0; i < ctx->num_listening_sockets; i++) {
+      pfd[i].fd = ctx->listening_sockets[i].sock;
+      pfd[i].events = POLLIN;
     }
 
-    tv.tv_sec = 0;
-    tv.tv_usec = 200 * 1000;
-
-    if (select(max_fd + 1, &read_set, NULL, NULL, &tv) < 0) {
-#ifdef _WIN32
-      // On windows, if read_set and write_set are empty,
-      // select() returns "Invalid parameter" error
-      // (at least on my Windows XP Pro). So in this case, we sleep here.
-      mg_sleep(1000);
-#endif // _WIN32
-    } else {
-      for (sp = ctx->listening_sockets; sp != NULL; sp = sp->next) {
-        if (ctx->stop_flag == 0 && FD_ISSET(sp->sock, &read_set)) {
-          accept_new_connection(sp, ctx);
+    if (poll(pfd, ctx->num_listening_sockets, 200) > 0) {
+      for (i = 0; i < ctx->num_listening_sockets; i++) {
+        if (ctx->stop_flag == 0 && pfd[i].revents == POLLIN) {
+          accept_new_connection(&ctx->listening_sockets[i], ctx);
         }
       }
     }
   }
+  free(pfd);
   DEBUG_TRACE(("stopping workers"));
 
   // Stop signal received: somebody called mg_stop. Quit.