From 2e8ece4fbaeb060b9443387c4ad940d51afac0a9 Mon Sep 17 00:00:00 2001 From: Andrew Thompson <andrew@hijacked.us> Date: Fri, 27 Aug 2010 21:16:55 -0400 Subject: [PATCH] Patch (with changes) from Micah Warren to add 'setevent' which is a mostly-atomic nixevent ALL + a event subscription --- .../mod_erlang_event/handle_msg.c | 130 ++++++++++++++++++ 1 file changed, 130 insertions(+) diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index 128c5fca52..1a8b64fdb4 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -454,6 +454,132 @@ static switch_status_t handle_msg_session_nixevent(listener_t *listener, erlang_ return SWITCH_STATUS_SUCCESS; } +// Nix's all events, then sets up a listener for the given ones. +// meant to ensure that no events are missed during this common operation. +static switch_status_t handle_msg_setevent(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf) +{ + char atom[MAXATOMLEN]; + + if(arity == 1) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } else { + uint8_t event_list[SWITCH_EVENT_ALL + 1]; + switch_hash_t *event_hash; + uint32_t x = 0; + int custom = 0; + switch_event_types_t type; + int i = 0; + + /* clear any previous event registrations */ + for( x = 0; x <= SWITCH_EVENT_ALL; x++){ + event_list[x] = 0; + } + + /* create new hash */ + switch_core_hash_init(&event_hash, listener->pool); + + if(!switch_test_flag(listener, LFLAG_EVENTS)) { + switch_set_flag_locked(listener, LFLAG_EVENTS); + } + + for(i = 1; i < arity; i++){ + if(!ei_decode_atom(buf->buff, &buf->index, atom)){ + + if(custom){ + switch_core_hash_insert(event_hash, atom, MARKER); + } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) { + if (type == SWITCH_EVENT_ALL) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + break; + } + if (type <= SWITCH_EVENT_ALL) { + event_list[type] = 1; + } + if (type == SWITCH_EVENT_CUSTOM) { + custom++; + } + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom); + } + } + /* update the event subscriptions with the new ones */ + memcpy(listener->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1)); + /* wipe the old hash, and point the pointer at the new one */ + switch_core_hash_destroy(&listener->event_hash); + listener->event_hash = event_hash; + + /* TODO - we should flush any non-matching events from the queue */ + ei_x_encode_atom(rbuf, "ok"); + } + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t handle_msg_session_setevent(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf) +{ + char atom[MAXATOMLEN]; + + if (arity == 1){ + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } else { + session_elem_t *session; + if ((session = find_session_elem_by_pid(listener, &msg->from))) { + uint8_t event_list[SWITCH_EVENT_ALL + 1]; + switch_hash_t *event_hash; + int custom = 0; + int i = 0; + switch_event_types_t type; + uint32_t x = 0; + + /* clear any previous event registrations */ + for (x = 0; x <= SWITCH_EVENT_ALL; x++){ + event_list[x] = 0; + } + + /* create new hash */ + switch_core_hash_init(&event_hash, session->pool); + + for (i = 1; i < arity; i++){ + if (!ei_decode_atom(buf->buff, &buf->index, atom)) { + if (custom) { + switch_core_hash_insert(event_hash, atom, MARKER); + } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) { + if (type == SWITCH_EVENT_ALL) { + ei_x_encode_tuple_header(rbuf, 1); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + break; + } + if (type <= SWITCH_EVENT_ALL) { + event_list[type] = 1; + } + if (type == SWITCH_EVENT_CUSTOM) { + custom++; + } + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s for session %s\n", atom, session->uuid_str); + } + } + /* update the event subscriptions with the new ones */ + memcpy(session->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1)); + /* wipe the old hash, and point the pointer at the new one */ + switch_core_hash_destroy(&session->event_hash); + session->event_hash = event_hash; + /* TODO - we should flush any non-matching events from the queue */ + ei_x_encode_atom(rbuf, "ok"); + } else { /* no session for this pid */ + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "notlistening"); + } + } + return SWITCH_STATUS_SUCCESS; +} static switch_status_t handle_msg_api(listener_t *listener, erlang_msg * msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf) { @@ -772,6 +898,10 @@ static switch_status_t handle_msg_tuple(listener_t *listener, erlang_msg * msg, ret = handle_msg_handlecall(listener, msg, arity, buf, rbuf); } else if (!strncmp(tupletag, "rex", MAXATOMLEN)) { ret = handle_msg_rpcresponse(listener, msg, arity, buf, rbuf); + } else if (!strncmp(tupletag, "setevent", MAXATOMLEN)) { + ret = handle_msg_setevent(listener, msg, arity, buf, rbuf); + } else if (!strncmp(tupletag, "session_setevent", MAXATOMLEN)) { + ret = handle_msg_session_setevent(listener, msg, arity, buf, rbuf); } else { ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_atom(rbuf, "error");