Преглед на файлове

Merge pull request #908 from jdetaeye/websocket_compression

first prototype for websocket compression
bel2125 преди 4 години
родител
ревизия
8e1f656553
променени са 2 файла, в които са добавени 328 реда и са изтрити 26 реда
  1. 177 23
      src/civetweb.c
  2. 151 3
      src/mod_zlib.inl

+ 177 - 23
src/civetweb.c

@@ -441,6 +441,11 @@ _civet_safe_clock_gettime(int clk_id, struct timespec *t)
 	(((err) == EAGAIN) || ((err) == EWOULDBLOCK) || ((err) == EINTR))
 	(((err) == EAGAIN) || ((err) == EWOULDBLOCK) || ((err) == EINTR))
 #endif
 #endif
 
 
+#if defined(USE_ZLIB)
+#include "zconf.h"
+#include "zlib.h"
+#endif
+
 
 
 /********************************************************************/
 /********************************************************************/
 /* CivetWeb configuration defines */
 /* CivetWeb configuration defines */
@@ -3027,6 +3032,18 @@ struct mg_connection {
 #if defined(USE_WEBSOCKET)
 #if defined(USE_WEBSOCKET)
 	int in_websocket_handling; /* 1 if in read_websocket */
 	int in_websocket_handling; /* 1 if in read_websocket */
 #endif
 #endif
+#if defined(USE_ZLIB) && defined(USE_WEBSOCKET)                                \
+    && defined(MG_EXPERIMENTAL_INTERFACES)
+	/* Parameters for websocket data compression according to rfc7692 */
+	int websocket_deflate_server_max_windows_bits;
+	int websocket_deflate_client_max_windows_bits;
+	int websocket_deflate_server_no_context_takeover;
+	int websocket_deflate_client_no_context_takeover;
+	int websocket_deflate_initialized;
+	int websocket_deflate_flush;
+	z_stream websocket_deflate_state;
+	z_stream websocket_inflate_state;
+#endif
 	int handled_requests; /* Number of requests handled by this connection
 	int handled_requests; /* Number of requests handled by this connection
 	                       */
 	                       */
 	int buf_size;         /* Buffer size */
 	int buf_size;         /* Buffer size */
@@ -12550,6 +12567,12 @@ send_websocket_handshake(struct mg_connection *conn, const char *websock_key)
 	          "Connection: Upgrade\r\n"
 	          "Connection: Upgrade\r\n"
 	          "Sec-WebSocket-Accept: %s\r\n",
 	          "Sec-WebSocket-Accept: %s\r\n",
 	          b64_sha);
 	          b64_sha);
+
+#if defined(USE_ZLIB) && defined(MG_EXPERIMENTAL_INTERFACES)
+	// Send negotiated compression extension parameters
+	websocket_deflate_response(conn);
+#endif
+
 	if (conn->request_info.acceptedWebSocketSubprotocol) {
 	if (conn->request_info.acceptedWebSocketSubprotocol) {
 		mg_printf(conn,
 		mg_printf(conn,
 		          "Sec-WebSocket-Protocol: %s\r\n\r\n",
 		          "Sec-WebSocket-Protocol: %s\r\n\r\n",
@@ -12788,13 +12811,93 @@ read_websocket(struct mg_connection *conn,
 			} else {
 			} else {
 				/* Exit the loop if callback signals to exit (server side),
 				/* Exit the loop if callback signals to exit (server side),
 				 * or "connection close" opcode received (client side). */
 				 * or "connection close" opcode received (client side). */
-				if ((ws_data_handler != NULL)
-				    && !ws_data_handler(conn,
-				                        mop,
-				                        (char *)data,
-				                        (size_t)data_len,
-				                        callback_data)) {
-					exit_by_callback = 1;
+				if (ws_data_handler != NULL) {
+#if defined(USE_ZLIB) && defined(MG_EXPERIMENTAL_INTERFACES)
+					if (mop & 0x40) {
+						/* Inflate the data received if bit RSV1 is set. */
+						if (!conn->websocket_deflate_initialized) {
+							if (websocket_deflate_initialize(conn, 1) != Z_OK)
+								exit_by_callback = 1;
+						}
+						if (!exit_by_callback) {
+							size_t inflate_buf_size_old = 0;
+							size_t inflate_buf_size =
+							    data_len
+							    * 4; // Initial guess of the inflated message
+							         // size. We double the memory when needed.
+							Bytef *inflated;
+							Bytef *new_mem;
+							conn->websocket_inflate_state.avail_in =
+							    (uInt)(data_len + 4);
+							conn->websocket_inflate_state.next_in = data;
+							// Add trailing 0x00 0x00 0xff 0xff bytes
+							data[data_len] = '\x00';
+							data[data_len + 1] = '\x00';
+							data[data_len + 2] = '\xff';
+							data[data_len + 3] = '\xff';
+							do {
+								if (inflate_buf_size_old == 0) {
+									new_mem = mg_calloc(inflate_buf_size,
+									                    sizeof(Bytef));
+								} else {
+									inflate_buf_size *= 2;
+									new_mem =
+									    mg_realloc(inflated, inflate_buf_size);
+								}
+								if (new_mem == NULL) {
+									mg_cry_internal(
+									    conn,
+									    "Out of memory: Cannot allocate "
+									    "inflate buffer of %i bytes",
+									    inflate_buf_size);
+									exit_by_callback = 1;
+									break;
+								}
+								inflated = new_mem;
+								conn->websocket_inflate_state.avail_out =
+								    (uInt)(inflate_buf_size
+								           - inflate_buf_size_old);
+								conn->websocket_inflate_state.next_out =
+								    inflated + inflate_buf_size_old;
+								int ret =
+								    inflate(&conn->websocket_inflate_state,
+								            Z_SYNC_FLUSH);
+								if (ret == Z_NEED_DICT || ret == Z_DATA_ERROR
+								    || ret == Z_MEM_ERROR || ret < 0) {
+									mg_cry_internal(
+									    conn,
+									    "ZLIB inflate error: %i %s",
+									    ret,
+									    (conn->websocket_inflate_state.msg
+									         ? conn->websocket_inflate_state.msg
+									         : "<no error message>"));
+									exit_by_callback = 1;
+									break;
+								}
+								inflate_buf_size_old = inflate_buf_size;
+
+							} while (conn->websocket_inflate_state.avail_out
+							         == 0);
+							inflate_buf_size -=
+							    conn->websocket_inflate_state.avail_out;
+							if (!ws_data_handler(conn,
+							                     mop,
+							                     (char *)inflated,
+							                     inflate_buf_size,
+							                     callback_data)) {
+								exit_by_callback = 1;
+							}
+							mg_free(inflated);
+						}
+					} else
+#endif
+					    if (!ws_data_handler(conn,
+					                         mop,
+					                         (char *)data,
+					                         (size_t)data_len,
+					                         callback_data)) {
+						exit_by_callback = 1;
+					}
 				}
 				}
 			}
 			}
 
 
@@ -12899,7 +13002,53 @@ mg_websocket_write_exec(struct mg_connection *conn,
 #pragma GCC diagnostic ignored "-Wconversion"
 #pragma GCC diagnostic ignored "-Wconversion"
 #endif
 #endif
 
 
-	header[0] = 0x80u | (unsigned char)((unsigned)opcode & 0xf);
+	/* Note that POSIX/Winsock's send() is threadsafe
+	 * http://stackoverflow.com/questions/1981372/are-parallel-calls-to-send-recv-on-the-same-socket-valid
+	 * but mongoose's mg_printf/mg_write is not (because of the loop in
+	 * push(), although that is only a problem if the packet is large or
+	 * outgoing buffer is full). */
+
+	/* TODO: Check if this lock should be moved to user land.
+	 * Currently the server sets this lock for websockets, but
+	 * not for any other connection. It must be set for every
+	 * conn read/written by more than one thread, no matter if
+	 * it is a websocket or regular connection. */
+	(void)mg_lock_connection(conn);
+
+#if defined(USE_ZLIB) && defined(MG_EXPERIMENTAL_INTERFACES)
+	size_t deflated_size;
+	Bytef *deflated;
+	// Deflate websocket messages over 100kb
+	int use_deflate = dataLen > 100 * 1024 && conn->accept_gzip;
+
+	if (use_deflate) {
+		if (!conn->websocket_deflate_initialized) {
+			if (websocket_deflate_initialize(conn, 1) != Z_OK)
+				return 0;
+		}
+
+		// Deflating the message
+		header[0] = 0xC0u | (unsigned char)((unsigned)opcode & 0xf);
+		conn->websocket_deflate_state.avail_in = (uInt)dataLen;
+		conn->websocket_deflate_state.next_in = (unsigned char *)data;
+		deflated_size = compressBound((uLong)dataLen);
+		deflated = mg_calloc(deflated_size, sizeof(Bytef));
+		if (deflated == NULL) {
+			mg_cry_internal(
+			    conn,
+			    "Out of memory: Cannot allocate deflate buffer of %i bytes",
+			    deflated_size);
+			mg_unlock_connection(conn);
+			return -1;
+		}
+		conn->websocket_deflate_state.avail_out = (uInt)deflated_size;
+		conn->websocket_deflate_state.next_out = deflated;
+		deflate(&conn->websocket_deflate_state, conn->websocket_deflate_flush);
+		dataLen = deflated_size - conn->websocket_deflate_state.avail_out
+		          - 4; // Strip trailing 0x00 0x00 0xff 0xff bytes
+	} else
+#endif
+		header[0] = 0x80u | (unsigned char)((unsigned)opcode & 0xf);
 
 
 #if defined(GCC_DIAGNOSTIC)
 #if defined(GCC_DIAGNOSTIC)
 #pragma GCC diagnostic pop
 #pragma GCC diagnostic pop
@@ -12933,26 +13082,19 @@ mg_websocket_write_exec(struct mg_connection *conn,
 		headerLen += 4;
 		headerLen += 4;
 	}
 	}
 
 
-	/* Note that POSIX/Winsock's send() is threadsafe
-	 * http://stackoverflow.com/questions/1981372/are-parallel-calls-to-send-recv-on-the-same-socket-valid
-	 * but mongoose's mg_printf/mg_write is not (because of the loop in
-	 * push(), although that is only a problem if the packet is large or
-	 * outgoing buffer is full). */
-
-	/* TODO: Check if this lock should be moved to user land.
-	 * Currently the server sets this lock for websockets, but
-	 * not for any other connection. It must be set for every
-	 * conn read/written by more than one thread, no matter if
-	 * it is a websocket or regular connection. */
-	(void)mg_lock_connection(conn);
-
 	retval = mg_write(conn, header, headerLen);
 	retval = mg_write(conn, header, headerLen);
 	if (retval != (int)headerLen) {
 	if (retval != (int)headerLen) {
 		/* Did not send complete header */
 		/* Did not send complete header */
 		retval = -1;
 		retval = -1;
 	} else {
 	} else {
 		if (dataLen > 0) {
 		if (dataLen > 0) {
-			retval = mg_write(conn, data, dataLen);
+#if defined(USE_ZLIB) && defined(MG_EXPERIMENTAL_INTERFACES)
+			if (use_deflate) {
+				retval = mg_write(conn, deflated, dataLen);
+				mg_free(deflated);
+			} else
+#endif
+				retval = mg_write(conn, data, dataLen);
 		}
 		}
 		/* if dataLen == 0, the header length (2) is returned */
 		/* if dataLen == 0, the header length (2) is returned */
 	}
 	}
@@ -13169,6 +13311,10 @@ handle_websocket_request(struct mg_connection *conn,
 			}
 			}
 		}
 		}
 
 
+#if defined(USE_ZLIB) && defined(MG_EXPERIMENTAL_INTERFACES)
+		websocket_deflate_negotiate(conn);
+#endif
+
 		if ((ws_connect_handler != NULL)
 		if ((ws_connect_handler != NULL)
 		    && (ws_connect_handler(conn, cbData) != 0)) {
 		    && (ws_connect_handler(conn, cbData) != 0)) {
 			/* C callback has returned non-zero, do not proceed with
 			/* C callback has returned non-zero, do not proceed with
@@ -13239,7 +13385,15 @@ handle_websocket_request(struct mg_connection *conn,
 #endif
 #endif
 	}
 	}
 
 
-	/* Step 8: Call the close handler */
+#if defined(USE_ZLIB) && defined(MG_EXPERIMENTAL_INTERFACES)
+	/* Step 8: Close the deflate & inflate buffers */
+	if (conn->websocket_deflate_initialized) {
+		deflateEnd(&conn->websocket_deflate_state);
+		inflateEnd(&conn->websocket_inflate_state);
+	}
+#endif
+
+	/* Step 9: Call the close handler */
 	if (ws_close_handler) {
 	if (ws_close_handler) {
 		ws_close_handler(conn, cbData);
 		ws_close_handler(conn, cbData);
 	}
 	}

+ 151 - 3
src/mod_zlib.inl

@@ -3,9 +3,6 @@
 #error "This file must only be included, if USE_ZLIB is set"
 #error "This file must only be included, if USE_ZLIB is set"
 #endif
 #endif
 
 
-#include "zconf.h"
-#include "zlib.h"
-
 #if !defined(MEM_LEVEL)
 #if !defined(MEM_LEVEL)
 #define MEM_LEVEL (8)
 #define MEM_LEVEL (8)
 #endif
 #endif
@@ -127,3 +124,154 @@ send_compressed_data(struct mg_connection *conn, struct mg_file *filep)
 	/* Send "end of chunked data" marker */
 	/* Send "end of chunked data" marker */
 	mg_write(conn, "0\r\n\r\n", 5);
 	mg_write(conn, "0\r\n\r\n", 5);
 }
 }
+
+#if defined(USE_WEBSOCKET) && defined(MG_EXPERIMENTAL_INTERFACES)
+int
+websocket_deflate_initialize(struct mg_connection *conn, int server)
+{
+	int zret =
+	    deflateInit2(&conn->websocket_deflate_state,
+	                 Z_BEST_COMPRESSION,
+	                 Z_DEFLATED,
+	                 server
+	                     ? -1 * conn->websocket_deflate_server_max_windows_bits
+	                     : -1 * conn->websocket_deflate_client_max_windows_bits,
+	                 MEM_LEVEL,
+	                 Z_DEFAULT_STRATEGY);
+	if (zret != Z_OK) {
+		mg_cry_internal(conn,
+		                "Websocket deflate init failed (%i): %s",
+		                zret,
+		                (conn->websocket_deflate_state.msg
+		                     ? conn->websocket_deflate_state.msg
+		                     : "<no error message>"));
+		deflateEnd(&conn->websocket_deflate_state);
+		return zret;
+	}
+
+	zret = inflateInit2(
+	    &conn->websocket_inflate_state,
+	    server ? -1 * conn->websocket_deflate_client_max_windows_bits
+	           : -1 * conn->websocket_deflate_server_max_windows_bits);
+	if (zret != Z_OK) {
+		mg_cry_internal(conn,
+		                "Websocket inflate init failed (%i): %s",
+		                zret,
+		                (conn->websocket_inflate_state.msg
+		                     ? conn->websocket_inflate_state.msg
+		                     : "<no error message>"));
+		inflateEnd(&conn->websocket_inflate_state);
+		return zret;
+	}
+	if ((conn->websocket_deflate_server_no_context_takeover && server)
+	    || (conn->websocket_deflate_client_no_context_takeover && !server))
+		conn->websocket_deflate_flush = Z_FULL_FLUSH;
+	else
+		conn->websocket_deflate_flush = Z_SYNC_FLUSH;
+
+	conn->websocket_deflate_initialized = 1;
+	return Z_OK;
+}
+
+void
+websocket_deflate_negotiate(struct mg_connection *conn)
+{
+	const char *extensions = mg_get_header(conn, "Sec-WebSocket-Extensions");
+	int val;
+	if (extensions && !strncmp(extensions, "permessage-deflate", 18)) {
+		conn->accept_gzip = 1;
+		conn->websocket_deflate_client_max_windows_bits = 15;
+		conn->websocket_deflate_server_max_windows_bits = 15;
+		conn->websocket_deflate_server_no_context_takeover = 0;
+		conn->websocket_deflate_client_no_context_takeover = 0;
+		extensions += 18;
+		while (*extensions != '\0') {
+			if (*extensions == ';' || *extensions == ' ')
+				++extensions;
+			else if (!strncmp(extensions, "server_no_context_takeover", 26)) {
+				extensions += 26;
+				conn->websocket_deflate_server_no_context_takeover = 1;
+			} else if (!strncmp(extensions, "client_no_context_takeover", 26)) {
+				extensions += 26;
+				conn->websocket_deflate_client_no_context_takeover = 1;
+			} else if (!strncmp(extensions, "server-max-window-bits", 22)) {
+				extensions += 22;
+				if (*extensions == '=') {
+					++extensions;
+					if (*extensions == '"')
+						++extensions;
+					val = 0;
+					while (*extensions >= '0' && *extensions <= '9') {
+						val = val * 10 + (*extensions - '0');
+						++extensions;
+					}
+					if (val < 9 || val > 15) {
+						// The permessage-deflate spec specifies that a
+						// value of 8 is also allowed, but zlib doesn't accept
+						// that.
+						mg_cry_internal(conn,
+						                "server-max-window-bits must be "
+						                "between 9 and 15. Got %i",
+						                val);
+					} else
+						conn->websocket_deflate_server_max_windows_bits = val;
+					if (*extensions == '"')
+						++extensions;
+				}
+			} else if (!strncmp(extensions, "client-max-window-bits", 22)) {
+				extensions += 22;
+				if (*extensions == '=') {
+					++extensions;
+					if (*extensions == '"')
+						++extensions;
+					val = 0;
+					while (*extensions >= '0' && *extensions <= '9') {
+						val = val * 10 + (*extensions - '0');
+						++extensions;
+					}
+					if (val < 9 || val > 15)
+						// The permessage-deflate spec specifies that a
+						// value of 8 is also allowed, but zlib doesn't
+						// accept that.
+						mg_cry_internal(conn,
+						                "client-max-window-bits must be "
+						                "between 9 and 15. Got %i",
+						                val);
+					else
+						conn->websocket_deflate_client_max_windows_bits = val;
+					if (*extensions == '"')
+						++extensions;
+				}
+			} else {
+				mg_cry_internal(conn,
+				                "Unknown parameter %s for permessage-deflate",
+				                extensions);
+				break;
+			}
+		}
+	} else {
+		conn->accept_gzip = 0;
+	}
+	conn->websocket_deflate_initialized = 0;
+}
+
+void
+websocket_deflate_response(struct mg_connection *conn)
+{
+	if (conn->accept_gzip) {
+		mg_printf(conn,
+		          "Sec-WebSocket-Extensions: permessage-deflate; "
+		          "server_max_window_bits=%i; "
+		          "client_max_window_bits=%i"
+		          "%s%s\r\n",
+		          conn->websocket_deflate_server_max_windows_bits,
+		          conn->websocket_deflate_client_max_windows_bits,
+		          conn->websocket_deflate_client_no_context_takeover
+		              ? "; client_no_context_takeover"
+		              : "",
+		          conn->websocket_deflate_server_no_context_takeover
+		              ? "; server_no_context_takeover"
+		              : "");
+	};
+}
+#endif