From c15c0924d1e81a5f37755d7971e9cfa0934d6a0c Mon Sep 17 00:00:00 2001 From: Chris Rienzo Date: Mon, 24 Jun 2013 20:50:37 -0400 Subject: [PATCH] mod_rayo: added message delivery threads --- src/mod/event_handlers/mod_rayo/mod_rayo.c | 248 +++++++++++------- src/mod/event_handlers/mod_rayo/mod_rayo.h | 18 +- .../event_handlers/mod_rayo/rayo_components.c | 4 +- .../mod_rayo/rayo_input_component.c | 2 +- .../mod_rayo/rayo_prompt_component.c | 12 +- 5 files changed, 171 insertions(+), 113 deletions(-) diff --git a/src/mod/event_handlers/mod_rayo/mod_rayo.c b/src/mod/event_handlers/mod_rayo/mod_rayo.c index cd94e1e0fb..93f36b9e12 100644 --- a/src/mod/event_handlers/mod_rayo/mod_rayo.c +++ b/src/mod/event_handlers/mod_rayo/mod_rayo.c @@ -199,6 +199,14 @@ static struct { struct rayo_client *console; /** XMPP context */ struct xmpp_stream_context *xmpp_context; + /** number of message threads */ + int num_message_threads; + /** message delivery queue */ + switch_queue_t *msg_queue; + /** shutdown flag */ + int shutdown; + /** prevents context shutdown until all threads are finished */ + switch_thread_rwlock_t *shutdown_rwlock; } globals; /** @@ -213,12 +221,12 @@ struct dial_gateway { int strip; }; -static void rayo_call_send(struct rayo_actor *call, struct rayo_message *msg, const char *file, int line); -static void rayo_server_send(struct rayo_actor *server, struct rayo_message *msg, const char *file, int line); -static void rayo_mixer_send(struct rayo_actor *mixer, struct rayo_message *msg, const char *file, int line); -static void rayo_component_send(struct rayo_actor *component, struct rayo_message *msg, const char *file, int line); -static void rayo_client_send(struct rayo_actor *client, struct rayo_message *msg, const char *file, int line); -static void rayo_console_client_send(struct rayo_actor *client, struct rayo_message *msg, const char *file, int line); +static void rayo_call_send(struct rayo_actor *call, struct rayo_message *msg); +static void rayo_server_send(struct rayo_actor *server, struct rayo_message *msg); +static void rayo_mixer_send(struct rayo_actor *mixer, struct rayo_message *msg); +static void rayo_component_send(struct rayo_actor *component, struct rayo_message *msg); +static void rayo_client_send(struct rayo_actor *client, struct rayo_message *msg); +static void rayo_console_client_send(struct rayo_actor *client, struct rayo_message *msg); static void on_client_presence(struct rayo_client *rclient, iks *node); @@ -546,29 +554,6 @@ rayo_actor_xmpp_handler rayo_actor_event_handler_find(struct rayo_actor *actor, return NULL; } -/** - * Create a new xml message for delivery to an actor. - * @param from actor creating this message - * @param xml to create message from. This value will be freed upon message delivery. - * @param dup true if duplicate payload - * @param reply true if a reply (error or result) - * @return the message - */ -struct rayo_message *rayo_message_create(struct rayo_actor *from, iks *xml, int dup, int reply) -{ - struct rayo_message *msg = malloc(sizeof(*msg)); - if (dup) { - msg->payload = iks_copy(xml); - } else { - msg->payload = xml; - } - msg->is_reply = reply; - msg->from_jid = strdup(RAYO_JID(from)); - msg->from_type = strdup(zstr(from->type) ? "" : from->type); - msg->from_subtype = strdup(zstr(from->subtype) ? "" : from->subtype); - return msg; -} - /** * Clean up a message * @param msg to destroy @@ -579,9 +564,11 @@ void rayo_message_destroy(struct rayo_message *msg) if (msg->payload) { iks_delete(msg->payload); } + switch_safe_free(msg->to_jid); switch_safe_free(msg->from_jid); switch_safe_free(msg->from_type); switch_safe_free(msg->from_subtype); + switch_safe_free(msg->file); free(msg); } } @@ -597,37 +584,91 @@ iks *rayo_message_remove_payload(struct rayo_message *msg) } /** - * Deliver message to actor + * Thread that delivers internal XMPP messages + * @param thread this thread + * @param obj unused + * @return NULL */ -static void rayo_actor_deliver(struct rayo_actor *actor, struct rayo_message *msg, const char *file, int line) +static void *SWITCH_THREAD_FUNC deliver_message_thread(switch_thread_t *thread, void *obj) { - iks *payload = msg->payload; - char *msg_str = iks_string(iks_stack(payload), payload); + struct rayo_message *msg = NULL; + switch_thread_rwlock_rdlock(globals.shutdown_rwlock); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "New message delivery thread\n"); + while (!globals.shutdown) { + if (switch_queue_pop(globals.msg_queue, (void *)&msg) == SWITCH_STATUS_SUCCESS) { + struct rayo_actor *actor = RAYO_LOCATE(msg->to_jid); + if (actor) { + switch_mutex_lock(actor->mutex); + actor->send_fn(actor, msg); + switch_mutex_unlock(actor->mutex); + RAYO_UNLOCK(actor); + rayo_message_destroy(msg); + } + } + } - switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, "", line, "", SWITCH_LOG_DEBUG, "%s, %s\n", msg->from_jid, msg_str); - switch_mutex_lock(actor->mutex); - actor->send_fn(actor, msg, file, line); - switch_mutex_unlock(actor->mutex); + /* clean up remaining messages */ + while(switch_queue_trypop(globals.msg_queue, (void *)&msg) == SWITCH_STATUS_SUCCESS) { + rayo_message_destroy(msg); + } - rayo_message_destroy(msg); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Message delivery thread finished\n"); + switch_thread_rwlock_unlock(globals.shutdown_rwlock); + return NULL; +} + +/** + * Create a new message thread + * @param pool to use + */ +static void start_deliver_message_thread(switch_memory_pool_t *pool) +{ + switch_thread_t *thread; + switch_threadattr_t *thd_attr = NULL; + switch_threadattr_create(&thd_attr, pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&thread, thd_attr, deliver_message_thread, NULL, pool); +} + +/** + * Stop all message threads + */ +static void stop_deliver_message_threads(void) +{ + globals.shutdown = 1; + switch_queue_interrupt_all(globals.msg_queue); + switch_thread_rwlock_wrlock(globals.shutdown_rwlock); } /** * Send message to actor addressed by JID + * @param from actor sending the message + * @param to destination JID + * @param payload the message payload to deliver + * @param dup true if payload is to be copied + * @param reply true if a reply + * @param file file name + * @param line line number */ -void rayo_actor_send(const char *jid, struct rayo_message *msg, const char *file, int line) +void rayo_message_send(struct rayo_actor *from, const char *to, iks *payload, int dup, int reply, const char *file, int line) { - struct rayo_actor *actor = RAYO_LOCATE(jid); - if (actor) { - /* TODO queue to thread pool */ - rayo_actor_deliver(actor, msg, file, line); - RAYO_UNLOCK(actor); + struct rayo_message *msg = malloc(sizeof(*msg)); + if (dup) { + msg->payload = iks_copy(payload); } else { - switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, "", line, "", SWITCH_LOG_DEBUG, "%s, failed to locate %s.\n", msg->from_jid, jid); - if (!msg->is_reply) { - /* don't reply to replies to prevent infinite loops */ - RAYO_SEND(msg->from_jid, RAYO_REPLY_CREATE(globals.server, iks_new_error(msg->payload, STANZA_ERROR_ITEM_NOT_FOUND))); - } + msg->payload = payload; + } + msg->is_reply = reply; + msg->to_jid = strdup(zstr(to) ? "" : to); + msg->from_jid = strdup(RAYO_JID(from)); + msg->from_type = strdup(zstr(from->type) ? "" : from->type); + msg->from_subtype = strdup(zstr(from->subtype) ? "" : from->subtype); + msg->file = strdup(file); + msg->line = line; + + if (switch_queue_trypush(globals.msg_queue, msg) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "failed to queue message!\n"); rayo_message_destroy(msg); } } @@ -694,15 +735,11 @@ void rayo_actor_destroy(struct rayo_actor *actor, const char *file, int line) } actor->destroy = 1; if (actor->ref_count <= 0) { - struct rayo_message *msg; switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, "", line, "", SWITCH_LOG_DEBUG, "Destroying %s\n", RAYO_JID(actor)); if (actor->cleanup_fn) { actor->cleanup_fn(actor); } switch_core_hash_delete(globals.destroy_actors, RAYO_JID(actor)); - while (switch_queue_trypop(actor->msg_queue, (void *)&msg) == SWITCH_STATUS_SUCCESS) { - rayo_message_destroy(msg); - } switch_core_destroy_memory_pool(&pool); } else { switch_core_hash_insert(globals.destroy_actors, RAYO_JID(actor), actor); @@ -834,14 +871,14 @@ static void rayo_call_cleanup(struct rayo_actor *actor) switch_assert(client_jid); iks_insert_attrib(revent, "to", client_jid); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rayo_call_get_uuid(call)), SWITCH_LOG_DEBUG, "Sending to offered client %s\n", client_jid); - RAYO_SEND(client_jid, RAYO_MESSAGE_CREATE_DUP(actor, revent)); + RAYO_SEND_MESSAGE_DUP(actor, client_jid, revent); no_offered_clients = 0; } if (no_offered_clients) { /* send to DCP only */ switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rayo_call_get_uuid(call)), SWITCH_LOG_DEBUG, "Sending to DCP %s\n", rayo_call_get_dcp_jid(call)); - RAYO_SEND(rayo_call_get_dcp_jid(call), RAYO_MESSAGE_CREATE_DUP(actor, revent)); + RAYO_SEND_MESSAGE_DUP(actor, rayo_call_get_dcp_jid(call), revent); } iks_delete(revent); @@ -886,9 +923,9 @@ static struct rayo_mixer *rayo_mixer_locate(const char *mixer_name, const char * /** * Default message handler - drops messages */ -void rayo_actor_send_ignore(struct rayo_actor *to, struct rayo_message *msg, const char *file, int line) +void rayo_actor_send_ignore(struct rayo_actor *to, struct rayo_message *msg) { - switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, "", line, "", SWITCH_LOG_WARNING, "%s, dropping unexpected message to %s.\n", msg->from_jid, RAYO_JID(to)); + switch_log_printf(SWITCH_CHANNEL_ID_LOG, msg->file, "", msg->line, "", SWITCH_LOG_WARNING, "%s, dropping unexpected message to %s.\n", msg->from_jid, RAYO_JID(to)); } #define RAYO_ACTOR_INIT(actor, pool, type, subtype, id, jid, cleanup, send) rayo_actor_init(actor, pool, type, subtype, id, jid, cleanup, send, __FILE__, __LINE__) @@ -939,7 +976,6 @@ static struct rayo_actor *rayo_actor_init(struct rayo_actor *actor, switch_memor } else { actor->send_fn = send; } - switch_queue_create(&actor->msg_queue, 20, pool); /* add to hash of actors, so commands can route to call */ switch_mutex_lock(globals.actors_mutex); @@ -1067,7 +1103,7 @@ struct rayo_component *_rayo_component_init(struct rayo_component *component, sw /** * Send XMPP message to client */ -void rayo_client_send(struct rayo_actor *client, struct rayo_message *msg, const char *file, int line) +void rayo_client_send(struct rayo_actor *client, struct rayo_message *msg) { xmpp_stream_context_send(globals.xmpp_context, RAYO_CLIENT(client)->route, msg->payload); } @@ -1146,7 +1182,7 @@ static struct rayo_client *rayo_client_create(const char *jid, const char *route /** * Send XMPP message to peer server */ -void rayo_peer_server_send(struct rayo_actor *server, struct rayo_message *msg, const char *file, int line) +void rayo_peer_server_send(struct rayo_actor *server, struct rayo_message *msg) { xmpp_stream_context_send(globals.xmpp_context, RAYO_JID(server), msg->payload); } @@ -1307,7 +1343,7 @@ static iks *rayo_component_command_ok(struct rayo_component *component, struct r /** * Handle server message */ -void rayo_server_send(struct rayo_actor *server, struct rayo_message *msg, const char *file, int line) +void rayo_server_send(struct rayo_actor *server, struct rayo_message *msg) { iks *response = NULL; rayo_actor_xmpp_handler handler = NULL; @@ -1329,7 +1365,7 @@ void rayo_server_send(struct rayo_actor *server, struct rayo_message *msg, const handler = rayo_actor_command_handler_find(server, msg); if (!handler) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, no handler function for command to %s\n", msg->from_jid, RAYO_JID(server)); - RAYO_SEND(msg->from_jid, RAYO_REPLY_CREATE(server, iks_new_error(iq, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED))); + RAYO_SEND_REPLY(server, msg->from_jid, iks_new_error(iq, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED)); return; } @@ -1339,14 +1375,14 @@ void rayo_server_send(struct rayo_actor *server, struct rayo_message *msg, const } if (response) { - RAYO_SEND(msg->from_jid, RAYO_REPLY_CREATE(server, response)); + RAYO_SEND_REPLY(server, msg->from_jid, response); } } /** * Handle call message */ -void rayo_call_send(struct rayo_actor *call, struct rayo_message *msg, const char *file, int line) +void rayo_call_send(struct rayo_actor *call, struct rayo_message *msg) { rayo_actor_xmpp_handler handler = NULL; iks *iq = msg->payload; @@ -1357,7 +1393,7 @@ void rayo_call_send(struct rayo_actor *call, struct rayo_message *msg, const cha handler = rayo_actor_command_handler_find(call, msg); if (!handler) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, no handler function for command\n", RAYO_JID(call)); - RAYO_SEND(msg->from_jid, RAYO_REPLY_CREATE(call, iks_new_error(iq, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED))); + RAYO_SEND_REPLY(call, msg->from_jid, iks_new_error(iq, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED)); return; } @@ -1365,7 +1401,7 @@ void rayo_call_send(struct rayo_actor *call, struct rayo_message *msg, const cha session = switch_core_session_locate(rayo_call_get_uuid(RAYO_CALL(call))); if (!session) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, session not found\n", RAYO_JID(call)); - RAYO_SEND(msg->from_jid, RAYO_REPLY_CREATE(call, iks_new_error(iq, STANZA_ERROR_SERVICE_UNAVAILABLE))); + RAYO_SEND_REPLY(call, msg->from_jid, iks_new_error(iq, STANZA_ERROR_SERVICE_UNAVAILABLE)); return; } @@ -1379,14 +1415,14 @@ void rayo_call_send(struct rayo_actor *call, struct rayo_message *msg, const cha switch_core_session_rwunlock(session); if (response) { - RAYO_SEND(msg->from_jid, RAYO_REPLY_CREATE(call, response)); + RAYO_SEND_REPLY(call, msg->from_jid, response); } } /** * Handle mixer message */ -void rayo_mixer_send(struct rayo_actor *mixer, struct rayo_message *msg, const char *file, int line) +void rayo_mixer_send(struct rayo_actor *mixer, struct rayo_message *msg) { rayo_actor_xmpp_handler handler = NULL; iks *iq = msg->payload; @@ -1396,21 +1432,21 @@ void rayo_mixer_send(struct rayo_actor *mixer, struct rayo_message *msg, const c handler = rayo_actor_command_handler_find(mixer, msg); if (!handler) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, no handler function for command\n", RAYO_JID(mixer)); - RAYO_SEND(msg->from_jid, RAYO_REPLY_CREATE(mixer, iks_new_error(iq, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED))); + RAYO_SEND_REPLY(mixer, msg->from_jid, iks_new_error(iq, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED)); return; } /* execute the command */ response = handler(mixer, msg, NULL); if (response) { - RAYO_SEND(msg->from_jid, RAYO_REPLY_CREATE(mixer, response)); + RAYO_SEND_REPLY(mixer, msg->from_jid, response); } } /** * Handle mixer message */ -void rayo_component_send(struct rayo_actor *component, struct rayo_message *msg, const char *file, int line) +void rayo_component_send(struct rayo_actor *component, struct rayo_message *msg) { rayo_actor_xmpp_handler handler = NULL; iks *xml_msg = msg->payload; @@ -1421,7 +1457,7 @@ void rayo_component_send(struct rayo_actor *component, struct rayo_message *msg, handler = rayo_actor_command_handler_find(component, msg); if (!handler) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, no component handler function for command\n", RAYO_JID(component)); - RAYO_SEND(msg->from_jid, RAYO_REPLY_CREATE(component, iks_new_error(xml_msg, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED))); + RAYO_SEND_REPLY(component, msg->from_jid, iks_new_error(xml_msg, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED)); return; } @@ -1433,7 +1469,7 @@ void rayo_component_send(struct rayo_actor *component, struct rayo_message *msg, } if (response) { - RAYO_SEND(msg->from_jid, RAYO_REPLY_CREATE(component, response)); + RAYO_SEND_REPLY(component, msg->from_jid, response); return; } } else if (!strcmp("presence", iks_name(xml_msg))) { @@ -1448,7 +1484,7 @@ void rayo_component_send(struct rayo_actor *component, struct rayo_message *msg, switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, forwarding event\n", RAYO_JID(component)); response = handler(component, msg, NULL); if (response) { - RAYO_SEND(msg->from_jid, RAYO_REPLY_CREATE(component, response)); + RAYO_SEND_REPLY(component, msg->from_jid, response); } } } @@ -1970,7 +2006,7 @@ done: /* response when error */ if (response) { /* send response to client */ - RAYO_SEND(iks_find_attrib(response, "to"), RAYO_REPLY_CREATE(call, response)); + RAYO_SEND_REPLY(call, iks_find_attrib(response, "to"), response); /* destroy call */ if (call) { @@ -2016,7 +2052,7 @@ static iks *on_rayo_dial(struct rayo_actor *server, struct rayo_message *msg, vo struct dial_thread_data *dtdata = NULL; switch_core_new_memory_pool(&pool); dtdata = switch_core_alloc(pool, sizeof(*dtdata)); - dtdata->pool = pool; + dtdata->pool = pool; dtdata->node = iks_copy(node); iks_insert_attrib(dtdata->node, "from", msg->from_jid); /* save DCP jid in case it isn't specified */ @@ -2107,7 +2143,7 @@ static void on_client_message(struct rayo_client *rclient, iks *message) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, recv message, availability = %s\n", RAYO_JID(rclient), presence_status_to_string(rclient->availability)); - RAYO_SEND(to, RAYO_MESSAGE_CREATE_DUP(rclient, message)); + RAYO_SEND_MESSAGE_DUP(rclient, to, message); } /** @@ -2203,9 +2239,9 @@ static void rayo_client_command_recv(struct rayo_client *rclient, iks *iq) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, recv iq, availability = %s\n", RAYO_JID(rclient), presence_status_to_string(rclient->availability)); if (command) { - RAYO_SEND(to, RAYO_MESSAGE_CREATE_DUP(rclient, iq)); + RAYO_SEND_MESSAGE_DUP(rclient, to, iq); } else { - RAYO_SEND(RAYO_JID(rclient), RAYO_REPLY_CREATE(globals.server, iks_new_error_detailed(iq, STANZA_ERROR_BAD_REQUEST, "empty IQ request"))); + RAYO_SEND_REPLY(globals.server, RAYO_JID(rclient), iks_new_error_detailed(iq, STANZA_ERROR_BAD_REQUEST, "empty IQ request")); } } @@ -2225,7 +2261,7 @@ static void broadcast_mixer_event(struct rayo_mixer *mixer, iks *rayo_event) subscriber = (struct rayo_mixer_subscriber *)val; switch_assert(subscriber); iks_insert_attrib(rayo_event, "to", subscriber->jid); - RAYO_SEND(subscriber->jid, RAYO_MESSAGE_CREATE_DUP(mixer, rayo_event)); + RAYO_SEND_MESSAGE_DUP(mixer, subscriber->jid, rayo_event); } } @@ -2264,7 +2300,7 @@ static void on_mixer_delete_member_event(struct rayo_mixer *mixer, switch_event_ delete_member_event = iks_new_presence("unjoined", RAYO_NS, member->jid, member->dcp_jid); x = iks_find(delete_member_event, "unjoined"); iks_insert_attrib(x, "mixer-name", rayo_mixer_get_name(mixer)); - RAYO_SEND(member->dcp_jid, RAYO_MESSAGE_CREATE(mixer, delete_member_event)); + RAYO_SEND_MESSAGE(mixer, member->dcp_jid, delete_member_event); /* broadcast member unjoined event to subscribers */ delete_member_event = iks_new_presence("unjoined", RAYO_NS, RAYO_JID(mixer), ""); @@ -2338,7 +2374,7 @@ static void on_mixer_add_member_event(struct rayo_mixer *mixer, switch_event_t * add_member_event = iks_new_presence("joined", RAYO_NS, RAYO_JID(call), call->dcp_jid); x = iks_find(add_member_event, "joined"); iks_insert_attrib(x, "mixer-name", rayo_mixer_get_name(mixer)); - RAYO_SEND(call->dcp_jid, RAYO_MESSAGE_CREATE(call, add_member_event)); + RAYO_SEND_MESSAGE(call, call->dcp_jid, add_member_event); RAYO_UNLOCK(call); } @@ -2414,7 +2450,7 @@ static void on_call_originate_event(struct rayo_client *rclient, switch_event_t #else iks_insert_attrib_printf(ref, "uri", "xmpp:%s", RAYO_JID(call)); #endif - RAYO_SEND(RAYO_JID(rclient), RAYO_MESSAGE_CREATE(call, response)); + RAYO_SEND_MESSAGE(call, RAYO_JID(rclient), response); call->dial_id = NULL; } RAYO_UNLOCK(call); @@ -2455,7 +2491,7 @@ static void on_call_answer_event(struct rayo_client *rclient, switch_event_t *ev iks *revent = iks_new_presence("answered", RAYO_NS, switch_event_get_header(event, "variable_rayo_call_jid"), switch_event_get_header(event, "variable_rayo_dcp_jid")); - RAYO_SEND(RAYO_JID(rclient), RAYO_MESSAGE_CREATE(call, revent)); + RAYO_SEND_MESSAGE(call, RAYO_JID(rclient), revent); RAYO_UNLOCK(call); } } @@ -2472,7 +2508,7 @@ static void on_call_ringing_event(struct rayo_client *rclient, switch_event_t *e iks *revent = iks_new_presence("ringing", RAYO_NS, switch_event_get_header(event, "variable_rayo_call_jid"), switch_event_get_header(event, "variable_rayo_dcp_jid")); - RAYO_SEND(RAYO_JID(rclient), RAYO_MESSAGE_CREATE(call, revent)); + RAYO_SEND_MESSAGE(call, RAYO_JID(rclient), revent); RAYO_UNLOCK(call); } } @@ -2499,7 +2535,7 @@ static void on_call_bridge_event(struct rayo_client *rclient, switch_event_t *ev call->joined = 1; - RAYO_SEND(RAYO_JID(rclient), RAYO_MESSAGE_CREATE(call, revent)); + RAYO_SEND_MESSAGE(call, RAYO_JID(rclient), revent); /* send B-leg event */ b_call = RAYO_CALL_LOCATE(b_uuid); @@ -2510,7 +2546,7 @@ static void on_call_bridge_event(struct rayo_client *rclient, switch_event_t *ev b_call->joined = 1; - RAYO_SEND(rayo_call_get_dcp_jid(b_call), RAYO_MESSAGE_CREATE(b_call, revent)); + RAYO_SEND_MESSAGE(b_call, rayo_call_get_dcp_jid(b_call), revent); RAYO_UNLOCK(b_call); } RAYO_UNLOCK(call); @@ -2536,7 +2572,7 @@ static void on_call_unbridge_event(struct rayo_client *rclient, switch_event_t * switch_event_get_header(event, "variable_rayo_dcp_jid")); iks *joined = iks_find(revent, "unjoined"); iks_insert_attrib(joined, "call-uri", b_uuid); - RAYO_SEND(RAYO_JID(rclient), RAYO_MESSAGE_CREATE(call, revent)); + RAYO_SEND_MESSAGE(call, RAYO_JID(rclient), revent); call->joined = 0; @@ -2546,7 +2582,7 @@ static void on_call_unbridge_event(struct rayo_client *rclient, switch_event_t * revent = iks_new_presence("unjoined", RAYO_NS, RAYO_JID(b_call), rayo_call_get_dcp_jid(b_call)); joined = iks_find(revent, "unjoined"); iks_insert_attrib(joined, "call-uri", a_uuid); - RAYO_SEND(rayo_call_get_dcp_jid(b_call), RAYO_MESSAGE_CREATE(b_call, revent)); + RAYO_SEND_MESSAGE(b_call, rayo_call_get_dcp_jid(b_call), revent); b_call->joined = 0; RAYO_UNLOCK(b_call); @@ -2759,7 +2795,7 @@ SWITCH_STANDARD_APP(rayo_app) ok = 1; switch_core_hash_insert(call->pcps, RAYO_JID(rclient), "1"); iks_insert_attrib(offer, "to", RAYO_JID(rclient)); - RAYO_SEND(RAYO_JID(rclient), RAYO_MESSAGE_CREATE_DUP(call, offer)); + RAYO_SEND_MESSAGE_DUP(call, RAYO_JID(rclient), offer); } } iks_delete(offer); @@ -2833,7 +2869,7 @@ static void on_xmpp_stream_ready(struct xmpp_stream *stream) iks_insert_attrib(presence, "to", xmpp_stream_get_jid(stream)); x = iks_insert(presence, "show"); iks_insert_cdata(x, "chat", 4); - RAYO_SEND(xmpp_stream_get_jid(stream), RAYO_MESSAGE_CREATE(globals.server, presence)); + RAYO_SEND_MESSAGE(globals.server, xmpp_stream_get_jid(stream), presence); } } else { /* client belongs to stream */ @@ -2920,6 +2956,7 @@ static switch_status_t do_config(switch_memory_pool_t *pool, const char *config_ /* set defaults */ globals.max_idle_ms = 30000; globals.mixer_conf_profile = "sla"; + globals.num_message_threads = 8; /* get params */ { @@ -2941,6 +2978,13 @@ static switch_status_t do_config(switch_memory_pool_t *pool, const char *config_ if (!zstr(val)) { globals.mixer_conf_profile = switch_core_strdup(pool, val); } + } else if (!strcasecmp(var, "message-threads")) { + if (switch_is_number(val)) { + int num_message_threads = atoi(val); + if (num_message_threads > 0) { + globals.num_message_threads = num_message_threads; + } + } } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unsupported param: %s\n", var); } @@ -3125,7 +3169,7 @@ static int dump_api(const char *cmd, switch_stream_handle_t *stream) /** * Process response to console command_api */ -void rayo_console_client_send(struct rayo_actor *actor, struct rayo_message *msg, const char *file, int line) +void rayo_console_client_send(struct rayo_actor *actor, struct rayo_message *msg) { iks *response = msg->payload; @@ -3236,7 +3280,7 @@ static void send_console_message(struct rayo_client *client, const char *to, con x = iks_insert(message, "body"); iks_insert_cdata(x, message_str, strlen(message_str)); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "\nSEND: to %s, %s\n", to, iks_string(iks_stack(message), message)); - RAYO_SEND(to, RAYO_MESSAGE_CREATE(client, message)); + RAYO_SEND_MESSAGE(client, to, message); } /** @@ -3277,7 +3321,7 @@ static void send_console_presence(struct rayo_client *client, const char *to, in x = iks_insert(presence, "show"); iks_insert_cdata(x, is_online ? "chat" : "dnd", 0); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "\nSEND: to %s, %s\n", to, iks_string(iks_stack(presence), presence)); - RAYO_SEND(to, RAYO_MESSAGE_CREATE(client, presence)); + RAYO_SEND_MESSAGE(client, to, presence); } /** @@ -3454,6 +3498,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rayo_load) switch_mutex_init(&globals.actors_mutex, SWITCH_MUTEX_NESTED, pool); switch_core_hash_init(&globals.dial_gateways, pool); switch_core_hash_init(&globals.cmd_aliases, pool); + switch_thread_rwlock_create(&globals.shutdown_rwlock, pool); + switch_queue_create(&globals.msg_queue, 25000, pool); /* server commands */ rayo_actor_command_handler_add(RAT_SERVER, "", "get:"IKS_NS_XMPP_PING":ping", on_iq_xmpp_ping); @@ -3492,6 +3538,14 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rayo_load) return SWITCH_STATUS_TERM; } + /* start up message threads */ + { + int i; + for (i = 0; i < globals.num_message_threads; i++) { + start_deliver_message_thread(pool); + } + } + /* create admin client */ globals.console = rayo_console_client_create(); @@ -3589,10 +3643,14 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_rayo_shutdown) switch_console_del_complete_func("::rayo::list_actors"); switch_console_set_complete("del rayo"); - /* wait for threads to finish */ + /* stop XMPP streams */ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for XMPP threads to stop\n"); xmpp_stream_context_destroy(globals.xmpp_context); + /* stop message threads */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for message threads to stop\n"); + stop_deliver_message_threads(); + rayo_components_shutdown(); /* cleanup module */ diff --git a/src/mod/event_handlers/mod_rayo/mod_rayo.h b/src/mod/event_handlers/mod_rayo/mod_rayo.h index 8c171408d3..d9174b9fc9 100644 --- a/src/mod/event_handlers/mod_rayo/mod_rayo.h +++ b/src/mod/event_handlers/mod_rayo/mod_rayo.h @@ -64,14 +64,17 @@ struct rayo_component; */ struct rayo_message { iks *payload; + char *to_jid; char *from_jid; char *from_type; char *from_subtype; int is_reply; + char *file; + int line; }; typedef void (* rayo_actor_cleanup_fn)(struct rayo_actor *); -typedef void (* rayo_actor_send_fn)(struct rayo_actor *, struct rayo_message *, const char *file, int line); +typedef void (* rayo_actor_send_fn)(struct rayo_actor *, struct rayo_message *); /** * A rayo actor - this is an entity that can be controlled by a rayo client @@ -126,19 +129,17 @@ struct rayo_component { #define RAYO_CALL(x) ((struct rayo_call *)x) #define RAYO_MIXER(x) ((struct rayo_mixer *)x) -extern struct rayo_message *rayo_message_create(struct rayo_actor *from, iks *xml, int dup, int reply); +extern void rayo_message_send(struct rayo_actor *from, const char *to, iks *payload, int dup, int reply, const char *file, int line); extern void rayo_message_destroy(struct rayo_message *msg); extern iks *rayo_message_remove_payload(struct rayo_message *msg); - -#define RAYO_MESSAGE_CREATE(from, msg) rayo_message_create(RAYO_ACTOR(from), msg, 0, 0) -#define RAYO_MESSAGE_CREATE_DUP(from, msg) rayo_message_create(RAYO_ACTOR(from), msg, 1, 0) -#define RAYO_REPLY_CREATE(from, msg) rayo_message_create(RAYO_ACTOR(from), msg, 0, 1) -#define RAYO_REPLY_CREATE_DUP(from, msg) rayo_message_create(RAYO_ACTOR(from), msg, 1, 1) +#define RAYO_SEND_MESSAGE(from, to, payload) rayo_message_send(RAYO_ACTOR(from), to, payload, 0, 0, __FILE__, __LINE__) +#define RAYO_SEND_MESSAGE_DUP(from, to, payload) rayo_message_send(RAYO_ACTOR(from), to, payload, 1, 0, __FILE__, __LINE__) +#define RAYO_SEND_REPLY(from, to, payload) rayo_message_send(RAYO_ACTOR(from), to, payload, 0, 1, __FILE__, __LINE__) +#define RAYO_SEND_REPLY_DUP(from, to, payload) rayo_message_send(RAYO_ACTOR(from), to, payload, 1, 1, __FILE__, __LINE__) extern struct rayo_actor *rayo_actor_locate(const char *jid, const char *file, int line); extern struct rayo_actor *rayo_actor_locate_by_id(const char *id, const char *file, int line); extern int rayo_actor_seq_next(struct rayo_actor *actor); -extern void rayo_actor_send(const char *jid, struct rayo_message *msg, const char *file, int line); extern void rayo_actor_rdlock(struct rayo_actor *actor, const char *file, int line); extern void rayo_actor_unlock(struct rayo_actor *actor, const char *file, int line); extern void rayo_actor_destroy(struct rayo_actor *actor, const char *file, int line); @@ -154,7 +155,6 @@ extern void rayo_actor_destroy(struct rayo_actor *actor, const char *file, int l #define RAYO_UNLOCK(x) rayo_actor_unlock(RAYO_ACTOR(x), __FILE__, __LINE__) #define RAYO_DESTROY(x) rayo_actor_destroy(RAYO_ACTOR(x), __FILE__, __LINE__) #define RAYO_SEQ_NEXT(x) rayo_actor_seq_next(RAYO_ACTOR(x)) -#define RAYO_SEND(to, msg) rayo_actor_send(to, msg, __FILE__, __LINE__) extern const char *rayo_call_get_dcp_jid(struct rayo_call *call); diff --git a/src/mod/event_handlers/mod_rayo/rayo_components.c b/src/mod/event_handlers/mod_rayo/rayo_components.c index 05154358c2..ea50cf067d 100644 --- a/src/mod/event_handlers/mod_rayo/rayo_components.c +++ b/src/mod/event_handlers/mod_rayo/rayo_components.c @@ -63,7 +63,7 @@ void rayo_component_send_start(struct rayo_component *component, iks *iq) #else iks_insert_attrib_printf(ref, "uri", "xmpp:%s", RAYO_JID(component)); #endif - RAYO_SEND(iks_find_attrib(response, "to"), RAYO_REPLY_CREATE(component, response)); + RAYO_SEND_REPLY(component, iks_find_attrib(response, "to"), response); } /** @@ -116,7 +116,7 @@ iks *rayo_component_create_complete_event(struct rayo_component *component, cons */ void rayo_component_send_complete_event(struct rayo_component *component, iks *response) { - RAYO_SEND(iks_find_attrib(response, "to"), RAYO_REPLY_CREATE(component, response)); + RAYO_SEND_REPLY(component, iks_find_attrib(response, "to"), response); RAYO_UNLOCK(component); RAYO_DESTROY(component); } diff --git a/src/mod/event_handlers/mod_rayo/rayo_input_component.c b/src/mod/event_handlers/mod_rayo/rayo_input_component.c index 2d5a4947e2..d9aca853f7 100644 --- a/src/mod/event_handlers/mod_rayo/rayo_input_component.c +++ b/src/mod/event_handlers/mod_rayo/rayo_input_component.c @@ -187,7 +187,7 @@ static void send_barge_event(struct rayo_component *component) iks_insert_attrib(event, "to", component->client_jid); x = iks_insert(event, "start-of-input"); iks_insert_attrib(x, "xmlns", RAYO_INPUT_NS); - RAYO_SEND(component->client_jid, RAYO_REPLY_CREATE(component, event)); + RAYO_SEND_REPLY(component, component->client_jid, event); } /** diff --git a/src/mod/event_handlers/mod_rayo/rayo_prompt_component.c b/src/mod/event_handlers/mod_rayo/rayo_prompt_component.c index ce0805eb29..474251788e 100644 --- a/src/mod/event_handlers/mod_rayo/rayo_prompt_component.c +++ b/src/mod/event_handlers/mod_rayo/rayo_prompt_component.c @@ -99,7 +99,7 @@ static void rayo_component_send_stop(struct rayo_actor *from, const char *to) iks_insert_attrib_printf(stop, "id", "mod_rayo-%d", RAYO_SEQ_NEXT(from)); x = iks_insert(stop, "stop"); iks_insert_attrib(x, "xmlns", RAYO_EXT_NS); - RAYO_SEND(to, RAYO_MESSAGE_CREATE(from, stop)); + RAYO_SEND_MESSAGE(from, to, stop); } /** @@ -118,7 +118,7 @@ static void start_input(struct prompt_component *prompt, int start_timers, int b iks_insert_attrib(input, "start-timers", start_timers ? "true" : "false"); iks_insert_attrib(input, "barge-event", barge_event ? "true" : "false"); iks_insert_node(iq, input); - RAYO_SEND(RAYO_JID(RAYO_COMPONENT(prompt)->parent), RAYO_MESSAGE_CREATE(prompt, iq)); + RAYO_SEND_MESSAGE(prompt, RAYO_JID(RAYO_COMPONENT(prompt)->parent), iq); } /** @@ -134,7 +134,7 @@ static void start_input_timers(struct prompt_component *prompt) iks_insert_attrib_printf(iq, "id", "mod_rayo-%d", RAYO_SEQ_NEXT(prompt)); x = iks_insert(iq, "start-timers"); iks_insert_attrib(x, "xmlns", RAYO_INPUT_NS); - RAYO_SEND(prompt->input_jid, RAYO_MESSAGE_CREATE(prompt, iq)); + RAYO_SEND_MESSAGE(prompt, prompt->input_jid, iq); } /** @@ -317,7 +317,7 @@ static iks *prompt_component_handle_output_error(struct rayo_actor *prompt, stru iks_insert_attrib(iq, "from", RAYO_JID(RAYO_COMPONENT(prompt)->parent)); iks_insert_attrib(iq, "to", RAYO_COMPONENT(prompt)->client_jid); iks_insert_node(iq, iks_copy_within(error, iks_stack(iq))); - RAYO_SEND(RAYO_COMPONENT(prompt)->client_jid, RAYO_REPLY_CREATE(prompt, iq)); + RAYO_SEND_REPLY(prompt, RAYO_COMPONENT(prompt)->client_jid, iq); /* done */ RAYO_UNLOCK(prompt); @@ -516,7 +516,7 @@ static iks *start_call_prompt_component(struct rayo_actor *call, struct rayo_mes iks_insert_attrib(cmd, "type", "set"); output = iks_copy_within(output, iks_stack(cmd)); iks_insert_node(cmd, output); - RAYO_SEND(RAYO_JID(call), RAYO_MESSAGE_CREATE(prompt_component, cmd)); + RAYO_SEND_MESSAGE(prompt_component, RAYO_JID(call), cmd); return NULL; } @@ -583,7 +583,7 @@ static iks *forward_output_component_request(struct rayo_actor *prompt, struct r /* forward request to output component */ iks_insert_attrib(iq, "from", RAYO_JID(prompt)); iks_insert_attrib(iq, "to", RAYO_JID(PROMPT_COMPONENT(prompt)->output_jid)); - RAYO_SEND(PROMPT_COMPONENT(prompt)->output_jid, RAYO_MESSAGE_CREATE_DUP(prompt, iq)); + RAYO_SEND_MESSAGE_DUP(prompt, PROMPT_COMPONENT(prompt)->output_jid, iq); return NULL; } case PCS_START_INPUT_TIMERS: