diff --git a/src/mod/xml_int/mod_xml_rpc/Makefile b/src/mod/xml_int/mod_xml_rpc/Makefile index cfc0276204..f0585dc324 100644 --- a/src/mod/xml_int/mod_xml_rpc/Makefile +++ b/src/mod/xml_int/mod_xml_rpc/Makefile @@ -60,10 +60,12 @@ $(XMLRPC_DIR)/src/xmlrpc_server_abyss.o\ $(XMLRPC_DIR)/src/xmlrpc_server_cgi.o\ $(XMLRPC_DIR)/src/xmlrpc_string.o\ $(XMLRPC_DIR)/src/xmlrpc_struct.o\ -$(XMLRPC_DIR)/lib/expat/xmltok/xmltok.o +$(XMLRPC_DIR)/lib/expat/xmltok/xmltok.o\ +ws.o LOCAL_CFLAGS = -w -I$(XMLRPC_DIR)/lib/expat/xmlparse -I$(XMLRPC_DIR)/lib/expat/xmltok -I$(XMLRPC_DIR) -I$(XMLRPC_DIR)/include LOCAL_CFLAGS+= -I$(XMLRPC_DIR)/lib/abyss/src -I$(XMLRPC_DIR)/lib/util/include -D_THREAD -D__EXTENSIONS__ +LOCAL_CFLAGS+= -I. -I../../../../libs/sofia-sip/libsofia-sip-ua/su include $(BASE)/build/modmake.rules diff --git a/src/mod/xml_int/mod_xml_rpc/mod_xml_rpc.c b/src/mod/xml_int/mod_xml_rpc/mod_xml_rpc.c index d25009f8fb..9b791b6c53 100644 --- a/src/mod/xml_int/mod_xml_rpc/mod_xml_rpc.c +++ b/src/mod/xml_int/mod_xml_rpc/mod_xml_rpc.c @@ -26,6 +26,7 @@ * Anthony Minessale II * John Wehle * Garmt Boekholt + * Seven Du * * mod_xml_rpc.c -- XML RPC * @@ -69,6 +70,7 @@ #include <../lib/abyss/src/token.h> #include <../lib/abyss/src/http.h> #include <../lib/abyss/src/session.h> +#include "ws.h" SWITCH_MODULE_LOAD_FUNCTION(mod_xml_rpc_load); SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_xml_rpc_shutdown); @@ -87,6 +89,7 @@ static struct { switch_bool_t virtual_host; TServer abyssServer; xmlrpc_registry *registryP; + switch_bool_t enable_websocket; } globals; SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_global_realm, globals.realm); @@ -126,6 +129,8 @@ static switch_status_t do_config(void) default_domain = val; } else if (!strcasecmp(var, "virtual-host")) { globals.virtual_host = switch_true(val); + } else if (!strcasecmp(var, "enable-websocket")) { + globals.enable_websocket = switch_true(val); } } } @@ -541,11 +546,160 @@ static abyss_bool http_directory_auth(TSession *r, char *domain_name) return rval; } +void stop_hook_event_handler(switch_event_t *event) { + char *json; + wsh_t *wsh = (TSession *)event->bind_user_data; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "got websocket::stophook, closing\n"); + wsh->down++; +} + +void event_handler(switch_event_t *event) { + char *json; + wsh_t *wsh = (TSession *)event->bind_user_data; + switch_event_serialize_json(event, &json); + ws_write_frame(wsh, WSOC_TEXT, json, strlen(json)); + free(json); +} + +#define MAX_EVENT_BIND_SLOTS SWITCH_EVENT_ALL + +abyss_bool websocket_hook(TSession *r) +{ + wsh_t wsh; + int ret; + int i; + ws_opcode_t opcode; + uint8_t *data; + switch_event_node_t *nodes[MAX_EVENT_BIND_SLOTS]; + int node_count = 0; + char *p; + char *key = TableFind(&r->requestHeaderFields, "sec-websocket-key"); + char *version = TableFind(&r->requestHeaderFields, "sec-websocket-version"); + char *proto = TableFind(&r->requestHeaderFields, "sec-websocket-protocol"); + char *upgrade = TableFind(&r->requestHeaderFields, "connection"); + + if (!key || !version || !proto || !upgrade) return FALSE; + if (strncasecmp(upgrade, "Upgrade", 7) || strncasecmp(proto, "websocket", 9)) return FALSE; + + for (i = 0; i < r->requestHeaderFields.size; ++i) { + TTableItem * const fieldP = &r->requestHeaderFields.item[i]; + const char * const fieldValue = fieldP->value; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "headers %s: %s\n", fieldP->name, fieldValue); + } + + ret = ws_init(&wsh, r, NULL, 0); + if (ret != 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "websocket error %d\n", ret); + return FALSE; + } + + while(!wsh.down && !wsh.handshake) { + ret = ws_handshake_kvp(&wsh, key, version, proto); + if (ret < 0) wsh.down = 1; + } + + if (ret != 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "handshake error %d\n", ret); + return FALSE; + } + + if (switch_event_bind_removable("websocket", SWITCH_EVENT_CUSTOM, "websocket::stophook", stop_hook_event_handler, &wsh, &nodes[node_count++]) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't bind!\n"); + node_count--; + } + + while (!wsh.down) { + int bytes = ws_read_frame(&wsh, &opcode, &data); + + if (bytes < 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%d %s\n", opcode, (char *)data); + switch_yield(1000); + continue; + } + + switch (opcode) { + case WSOC_CLOSE: + ws_close(&wsh, 1000); + break; + case WSOC_CONTINUATION: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "continue\n"); + continue; + case WSOC_TEXT: + p = data; + if (!p) continue; + if (!strncasecmp(data, "event ", 6)) { + switch_event_types_t type; + char *subclass; + + if (node_count == MAX_EVENT_BIND_SLOTS - 1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "cannot subscribe more than %d events\n", node_count); + continue; + } + p += 6; + if (p = strchr(p, ' ')) p++; + if (!strncasecmp(p, "json ", 5)) { + p += 5; + } else if (!strncasecmp(p, "xml ", 4)) { + p += 4; + } else if (!strncasecmp(p, "plain ", 6)) { + p += 6; + } + if (!*p) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing event type in [%s]\n", data); + break; + } else { + } + if (subclass = strchr(p, ' ')) { + *subclass++ = '\0'; + if (!*subclass) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "missing subclass\n"); + continue; + } + } else { + subclass = SWITCH_EVENT_SUBCLASS_ANY; + } + + if (switch_name_event(p, &type) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown event %s\n", p); + continue; + } + + if (switch_event_bind_removable("websocket", type, subclass, event_handler, &wsh, &nodes[node_count++]) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't bind!\n"); + node_count--; + continue; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Bind %s\n", data); + } + + } + break; + default: + break; + } + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "wsh.down = %d, node_count = %d\n", wsh.down, node_count); + + switch_yield(2000); + while (--node_count >= 0) switch_event_unbind(&nodes[node_count]); + + return FALSE; +} + abyss_bool auth_hook(TSession * r) { char *domain_name, *e; abyss_bool ret = FALSE; + if (globals.enable_websocket && !strncmp(r->requestInfo.uri, "/socket", 7)) { + // Chrome has no Authorization support yet + // https://code.google.com/p/chromium/issues/detail?id=123862 + return websocket_hook(r); + } + if (!strncmp(r->requestInfo.uri, "/domains/", 9)) { domain_name = strdup(r->requestInfo.uri + 9); switch_assert(domain_name); @@ -1059,7 +1213,8 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_xml_rpc_runtime) ServerAddHandler(&globals.abyssServer, auth_hook); ServerSetKeepaliveTimeout(&globals.abyssServer, 5); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Starting HTTP Port %d, DocRoot [%s]\n", globals.port, SWITCH_GLOBAL_dirs.htdocs_dir); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Starting HTTP Port %d, DocRoot [%s]%s\n", + globals.port, SWITCH_GLOBAL_dirs.htdocs_dir, globals.enable_websocket ? " with websocket." : ""); ServerRun(&globals.abyssServer); switch_yield(1000000); @@ -1069,10 +1224,28 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_xml_rpc_runtime) return SWITCH_STATUS_TERM; } +void stop_all_websockets() +{ + switch_event_t *event; + if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, "websocket::stophook") != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG,SWITCH_LOG_ERROR, "Failed to create event!\n"); + } + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "stop", "now"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "stopping all websockets ...\n"); + if (switch_event_fire(&event) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG,SWITCH_LOG_ERROR, "Failed to fire the event!\n"); + switch_event_destroy(&event); + return false; + } +} + /* upon module unload */ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_xml_rpc_shutdown) { + /* Cann't find a way to stop the websockets, use this for a workaround before finding the real one that works */ + stop_all_websockets(); + /* this makes the worker thread (ServerRun) stop */ ServerTerminate(&globals.abyssServer); diff --git a/src/mod/xml_int/mod_xml_rpc/ws.c b/src/mod/xml_int/mod_xml_rpc/ws.c index 35fb4c0f22..1ef76e3cc9 100644 --- a/src/mod/xml_int/mod_xml_rpc/ws.c +++ b/src/mod/xml_int/mod_xml_rpc/ws.c @@ -218,11 +218,8 @@ static void sha1_digest(unsigned char *digest, char *in) #endif -int ws_handshake(wsh_t *wsh) +int ws_handshake_kvp(wsh_t *wsh, char *key, char *version, char *proto) { - char key[256] = ""; - char version[5] = ""; - char proto[256] = ""; char uri[256] = ""; char input[256] = ""; unsigned char output[SHA1_HASH_SIZE] = ""; @@ -231,44 +228,14 @@ int ws_handshake(wsh_t *wsh) issize_t bytes; char *p, *e = 0; - if (wsh->sock == ws_sock_invalid) { + if (!wsh->tsession) { return -3; } - while((bytes = ws_raw_read(wsh, wsh->buffer + wsh->datalen, wsh->buflen - wsh->datalen)) > 0) { - wsh->datalen += bytes; - if (strstr(wsh->buffer, "\r\n\r\n") || strstr(wsh->buffer, "\n\n")) { - break; - } - } - - if (bytes > sizeof(wsh->buffer)) { + if (!*key || !*version || !*proto) { goto err; } - *(wsh->buffer+bytes) = '\0'; - - if (strncasecmp(wsh->buffer, "GET ", 4)) { - goto err; - } - - p = wsh->buffer + 4; - - e = strchr(p, ' '); - if (!e) { - goto err; - } - - strncpy(uri, p, e-p); - - cheezy_get_var(wsh->buffer, "Sec-WebSocket-Key", key, sizeof(key)); - cheezy_get_var(wsh->buffer, "Sec-WebSocket-Version", version, sizeof(version)); - cheezy_get_var(wsh->buffer, "Sec-WebSocket-Protocol", proto, sizeof(proto)); - - if (!*key) { - goto err; - } - snprintf(input, sizeof(input), "%s%s", key, WEBSOCKET_GUID); sha1_digest(output, input); b64encode((unsigned char *)output, SHA1_HASH_SIZE, (unsigned char *)b64, sizeof(b64)); @@ -282,7 +249,6 @@ int ws_handshake(wsh_t *wsh) b64, proto); - ws_raw_write(wsh, respond, strlen(respond)); wsh->handshake = 1; @@ -308,7 +274,9 @@ issize_t ws_raw_read(wsh_t *wsh, void *data, size_t bytes) { issize_t r; int x = 0; + TConn *conn = wsh->tsession->connP; +#if 0 if (wsh->ssl) { do { r = SSL_read(wsh->ssl, data, bytes); @@ -321,21 +289,50 @@ issize_t ws_raw_read(wsh_t *wsh, void *data, size_t bytes) return r; } - - do { - r = recv(wsh->sock, data, bytes, 0); -#ifndef _MSC_VER - if (x++) usleep(10000); -#else - if (x++) Sleep(10); #endif - } while (r == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK || - errno == 35 || errno == 730035 || errno == 2 || errno == 60) && x < 100); - - if (x >= 100) { - r = -1; + + if (!wsh->handshake) { + r = wsh->tsession->connP->buffersize; + memcpy(data, conn->buffer.b, r); + printf("%s\n", conn->buffer.t); + ConnReadInit(conn); + return r; + } else { + const char *readError = NULL; + + // printf(" pos=%d size=%d need=%d\n", conn->bufferpos, conn->buffersize, bytes); + + r = conn->buffersize - conn->bufferpos; + + if (r < 0) { + printf("348 Read Error %d!\n", r); + return 0; + } else if (r == 0) { + ConnRead(conn, 2, NULL, NULL, &readError); + + if (readError) { + // printf("354 Read Error %s\n", readError); + xmlrpc_strfree(readError); + return 0; + } + + r = conn->buffersize - conn->bufferpos; + } + + if (r <= bytes) { + memcpy(data, conn->buffer.b + conn->bufferpos, r); + // ConnReadInit(conn); + conn->bufferpos = conn->buffersize; + ConnReadInit(conn); + return r; + } else { + memcpy(data, conn->buffer.b + conn->bufferpos, bytes); + conn->bufferpos += bytes; + return bytes; + } + } - + return r; } @@ -351,9 +348,11 @@ issize_t ws_raw_write(wsh_t *wsh, void *data, size_t bytes) return r; } - do { - r = send(wsh->sock, data, bytes, 0); - } while (r == -1 && (errno == EAGAIN || errno == EINTR)); + if (ConnWrite(wsh->tsession->connP, data, bytes)) { + return bytes; + } else { + return 0; + } //if (r<0) { //printf("wRITE FAIL: %s\n", strerror(errno)); @@ -408,11 +407,10 @@ static int restore_socket(ws_socket_t sock) #endif - -int ws_init(wsh_t *wsh, ws_socket_t sock, SSL_CTX *ssl_ctx, int close_sock) +int ws_init(wsh_t *wsh, ws_tsession_t *tsession, SSL_CTX *ssl_ctx, int close_sock) { memset(wsh, 0, sizeof(*wsh)); - wsh->sock = sock; + wsh->tsession = tsession; if (!ssl_ctx) { ssl_ctx = globals.ssl_ctx; @@ -425,7 +423,7 @@ int ws_init(wsh_t *wsh, ws_socket_t sock, SSL_CTX *ssl_ctx, int close_sock) wsh->buflen = sizeof(wsh->buffer); wsh->secure = ssl_ctx ? 1 : 0; - setup_socket(sock); + // setup_socket(sock); if (wsh->secure) { int code; @@ -466,6 +464,7 @@ int ws_init(wsh_t *wsh, ws_socket_t sock, SSL_CTX *ssl_ctx, int close_sock) } +/* while (!wsh->down && !wsh->handshake) { int r = ws_handshake(wsh); @@ -474,6 +473,7 @@ int ws_init(wsh_t *wsh, ws_socket_t sock, SSL_CTX *ssl_ctx, int close_sock) return -1; } } +*/ if (wsh->down) { return -1; @@ -560,8 +560,10 @@ issize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data) } if ((wsh->datalen = ws_raw_read(wsh, wsh->buffer, 14)) < need) { - if ((wsh->datalen += ws_raw_read(wsh, wsh->buffer + wsh->datalen, 14 - wsh->datalen)) < need) { - /* too small - protocol err */ + while (!wsh->down && (wsh->datalen += ws_raw_read(wsh, wsh->buffer + wsh->datalen, 14 - wsh->datalen)) < need) ; + + if (0 && (wsh->datalen += ws_raw_read(wsh, wsh->buffer + wsh->datalen, 14 - wsh->datalen)) < need) { + /* too small - protocol err */ return ws_close(wsh, WS_PROTO_ERR); } } diff --git a/src/mod/xml_int/mod_xml_rpc/ws.h b/src/mod/xml_int/mod_xml_rpc/ws.h index 81368158b3..06fd3b2594 100644 --- a/src/mod/xml_int/mod_xml_rpc/ws.h +++ b/src/mod/xml_int/mod_xml_rpc/ws.h @@ -25,7 +25,10 @@ //#include "sha1.h" #include #include +#include <../lib/abyss/src/session.h> +#include <../lib/abyss/src/conn.h> +typedef TSession ws_tsession_t; struct globals_s { const SSL_METHOD *ssl_method; @@ -34,7 +37,7 @@ struct globals_s { char key[512]; }; -extern struct globals_s globals; +// extern struct globals_s globals; typedef int ws_socket_t; #define ws_sock_invalid -1 @@ -71,6 +74,7 @@ typedef struct wsh_s { uint8_t down; int secure; uint8_t close_sock; + ws_tsession_t *tsession; } wsh_t; issize_t ws_send_buf(wsh_t *wsh, ws_opcode_t oc); @@ -81,11 +85,12 @@ issize_t ws_raw_read(wsh_t *wsh, void *data, size_t bytes); issize_t ws_raw_write(wsh_t *wsh, void *data, size_t bytes); issize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data); issize_t ws_write_frame(wsh_t *wsh, ws_opcode_t oc, void *data, size_t bytes); -int ws_init(wsh_t *wsh, ws_socket_t sock, SSL_CTX *ssl_ctx, int close_sock); +int ws_init(wsh_t *wsh, ws_tsession_t *tsession, SSL_CTX *ssl_ctx, int close_sock); issize_t ws_close(wsh_t *wsh, int16_t reason); void ws_destroy(wsh_t *wsh); void init_ssl(void); void deinit_ssl(void); +int ws_handshake_kvp(wsh_t *wsh, char *key, char *version, char *proto); #ifndef _MSC_VER