|
@@ -5255,6 +5255,152 @@ static void read_websocket(struct mg_connection *conn)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+static void read_client_websocket(struct mg_connection *conn)
|
|
|
|
+{
|
|
|
|
+ /* Pointer to the beginning of the portion of the incoming websocket
|
|
|
|
+ message queue.
|
|
|
|
+ The original websocket upgrade request is never removed, so the queue
|
|
|
|
+ begins after it. */
|
|
|
|
+ unsigned char *buf = (unsigned char *) conn->buf + conn->request_len;
|
|
|
|
+ int n, error;
|
|
|
|
+
|
|
|
|
+ /* body_len is the length of the entire queue in bytes
|
|
|
|
+ len is the length of the current message
|
|
|
|
+ data_len is the length of the current message's data payload
|
|
|
|
+ header_len is the length of the current message's header */
|
|
|
|
+ size_t i, len, mask_len, data_len, header_len, body_len;
|
|
|
|
+
|
|
|
|
+ /* "The masking key is a 32-bit value chosen at random by the client."
|
|
|
|
+ http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-17#section-5 */
|
|
|
|
+ unsigned char mask[4];
|
|
|
|
+
|
|
|
|
+ /* data points to the place where the message is stored when passed to the
|
|
|
|
+ websocket_data callback. This is either mem on the stack, or a
|
|
|
|
+ dynamically allocated buffer if it is too large. */
|
|
|
|
+ char mem[4096];
|
|
|
|
+ char *data = mem;
|
|
|
|
+ unsigned char mop; /* mask flag and opcode */
|
|
|
|
+
|
|
|
|
+ /* Loop continuously, reading messages from the socket, invoking the
|
|
|
|
+ callback, and waiting repeatedly until an error occurs. */
|
|
|
|
+ //assert(conn->content_len == 0);
|
|
|
|
+ for (;;) {
|
|
|
|
+ header_len = 0;
|
|
|
|
+ assert(conn->data_len >= conn->request_len);
|
|
|
|
+ if ((body_len = conn->data_len - conn->request_len) >= 2) {
|
|
|
|
+ len = buf[1] & 127;
|
|
|
|
+ mask_len = buf[1] & 128 ? 4 : 0;
|
|
|
|
+ if (len < 126 && body_len >= mask_len) {
|
|
|
|
+ data_len = len;
|
|
|
|
+ header_len = 2 + mask_len;
|
|
|
|
+ } else if (len == 126 && body_len >= 4 + mask_len) {
|
|
|
|
+ header_len = 4 + mask_len;
|
|
|
|
+ data_len = ((((int) buf[2]) << 8) + buf[3]);
|
|
|
|
+ } else if (body_len >= 10 + mask_len) {
|
|
|
|
+ header_len = 10 + mask_len;
|
|
|
|
+ data_len = (((uint64_t) ntohl(* (uint32_t *) &buf[2])) << 32) +
|
|
|
|
+ ntohl(* (uint32_t *) &buf[6]);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (header_len > 0 && body_len >= header_len) {
|
|
|
|
+ /* Allocate space to hold websocket payload */
|
|
|
|
+ data = mem;
|
|
|
|
+ if (data_len > sizeof(mem)) {
|
|
|
|
+ data = (char *)mg_malloc(data_len);
|
|
|
|
+ if (data == NULL) {
|
|
|
|
+ /* Allocation failed, exit the loop and then close the
|
|
|
|
+ connection */
|
|
|
|
+ mg_cry(conn, "websocket out of memory; closing connection");
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /* Copy the mask before we shift the queue and destroy it */
|
|
|
|
+ if (mask_len > 0) {
|
|
|
|
+ memcpy(mask, buf + header_len - mask_len, sizeof(mask));
|
|
|
|
+ } else {
|
|
|
|
+ memset(mask, 0, sizeof(mask));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /* Read frame payload from the first message in the queue into
|
|
|
|
+ data and advance the queue by moving the memory in place. */
|
|
|
|
+ assert(body_len >= header_len);
|
|
|
|
+ if (data_len + header_len > body_len) {
|
|
|
|
+ mop = buf[0]; /* current mask and opcode */
|
|
|
|
+ /* Overflow case */
|
|
|
|
+ len = body_len - header_len;
|
|
|
|
+ memcpy(data, buf + header_len, len);
|
|
|
|
+ error = 0;
|
|
|
|
+ while (len < data_len) {
|
|
|
|
+ int n = pull(NULL, conn, data + len, (int)(data_len - len));
|
|
|
|
+ if (n <= 0) {
|
|
|
|
+ error = 1;
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ len += n;
|
|
|
|
+ }
|
|
|
|
+ if (error) {
|
|
|
|
+ mg_cry(conn, "Websocket pull failed; closing connection");
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ conn->data_len = conn->request_len;
|
|
|
|
+ } else {
|
|
|
|
+ mop = buf[0]; /* current mask and opcode, overwritten by memmove() */
|
|
|
|
+ /* Length of the message being read at the front of the
|
|
|
|
+ queue */
|
|
|
|
+ len = data_len + header_len;
|
|
|
|
+
|
|
|
|
+ /* Copy the data payload into the data pointer for the
|
|
|
|
+ callback */
|
|
|
|
+ memcpy(data, buf + header_len, data_len);
|
|
|
|
+
|
|
|
|
+ /* Move the queue forward len bytes */
|
|
|
|
+ memmove(buf, buf + len, body_len - len);
|
|
|
|
+
|
|
|
|
+ /* Mark the queue as advanced */
|
|
|
|
+ conn->data_len -= (int)len;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /* Apply mask if necessary */
|
|
|
|
+ if (mask_len > 0) {
|
|
|
|
+ for (i = 0; i < data_len; ++i) {
|
|
|
|
+ data[i] ^= mask[i & 3];
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /* Exit the loop if callback signalled to exit,
|
|
|
|
+ or "connection close" opcode received. */
|
|
|
|
+ if ((conn->ctx->callbacks.websocket_data != NULL &&
|
|
|
|
+#ifdef USE_LUA
|
|
|
|
+ (conn->lua_websocket_state == NULL) &&
|
|
|
|
+#endif
|
|
|
|
+ !conn->ctx->callbacks.websocket_data(conn, mop, data, data_len)) ||
|
|
|
|
+#ifdef USE_LUA
|
|
|
|
+ (conn->lua_websocket_state &&
|
|
|
|
+ !lua_websocket_data(conn, conn->lua_websocket_state, mop, data, data_len)) ||
|
|
|
|
+#endif
|
|
|
|
+ (mop & 0xf) == WEBSOCKET_OPCODE_CONNECTION_CLOSE) { /* Opcode == 8, connection close */
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (data != mem) {
|
|
|
|
+ mg_free(data);
|
|
|
|
+ }
|
|
|
|
+ /* Not breaking the loop, process next websocket frame. */
|
|
|
|
+ } else {
|
|
|
|
+ /* Read from the socket into the next available location in the
|
|
|
|
+ message queue. */
|
|
|
|
+ if ((n = pull(NULL, conn, conn->buf + conn->data_len,
|
|
|
|
+ conn->buf_size - conn->data_len)) <= 0) {
|
|
|
|
+ /* Error, no bytes read */
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ conn->data_len += n;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
int mg_websocket_write(struct mg_connection* conn, int opcode, const char* data, size_t dataLen)
|
|
int mg_websocket_write(struct mg_connection* conn, int opcode, const char* data, size_t dataLen)
|
|
{
|
|
{
|
|
unsigned char header[10];
|
|
unsigned char header[10];
|
|
@@ -6434,6 +6580,58 @@ struct mg_connection *mg_download(const char *host, int port, int use_ssl,
|
|
return conn;
|
|
return conn;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+static void* websocket_client_thread(void *data)
|
|
|
|
+{
|
|
|
|
+ struct mg_connection* conn = (struct mg_connection*)data;
|
|
|
|
+ read_client_websocket(conn);
|
|
|
|
+
|
|
|
|
+ DEBUG_TRACE("Websocket client thread exited\n");
|
|
|
|
+
|
|
|
|
+ return NULL;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+struct mg_connection *mg_client_websocket_connect(const char *host, int port, int use_ssl,
|
|
|
|
+ char *error_buffer, size_t error_buffer_size,
|
|
|
|
+ const char *path, const char *origin, websocket_data_func data_func)
|
|
|
|
+{
|
|
|
|
+ static const char *magic = "x3JJHMbDL1EzLkh9GBhXDw==";
|
|
|
|
+
|
|
|
|
+ //Establish the client connection and request upgrade
|
|
|
|
+ struct mg_connection* conn = mg_download(host, port, use_ssl,
|
|
|
|
+ error_buffer, error_buffer_size,
|
|
|
|
+ "GET %s HTTP/1.1\r\n"
|
|
|
|
+ "Host: %s\r\n"
|
|
|
|
+ "Upgrade: websocket\r\n"
|
|
|
|
+ "Connection: Upgrade\r\n"
|
|
|
|
+ "Sec-WebSocket-Key: %s\r\n"
|
|
|
|
+ "Sec-WebSocket-Version: 13\r\n"
|
|
|
|
+ "Origin: %s\r\n"
|
|
|
|
+ "\r\n", path, host, magic, origin);
|
|
|
|
+
|
|
|
|
+ //Connection object will be null if something goes wrong
|
|
|
|
+ if(conn == NULL)
|
|
|
|
+ {
|
|
|
|
+ DEBUG_TRACE("Websocket client connect error: %s\r\n", error_buffer);
|
|
|
|
+ return conn;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //For client connections, mg_context is fake. Set the callback for websocket
|
|
|
|
+ //data manually here so that read_client_websocket will automatically call it
|
|
|
|
+ conn->ctx->callbacks.websocket_data = data_func;
|
|
|
|
+
|
|
|
|
+ //Start a thread to read the websocket client connection
|
|
|
|
+ //This thread will automatically stop when mg_disconnect is
|
|
|
|
+ //called on the client connection
|
|
|
|
+ if(mg_start_thread(websocket_client_thread, (void*)conn) != 0)
|
|
|
|
+ {
|
|
|
|
+ mg_free((void*)conn);
|
|
|
|
+ conn = NULL;
|
|
|
|
+ DEBUG_TRACE("Websocket client connect thread could not be started\r\n");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return conn;
|
|
|
|
+}
|
|
|
|
+
|
|
static void process_new_connection(struct mg_connection *conn)
|
|
static void process_new_connection(struct mg_connection *conn)
|
|
{
|
|
{
|
|
struct mg_request_info *ri = &conn->request_info;
|
|
struct mg_request_info *ri = &conn->request_info;
|