add Andrew's patch from FS-3432 as a starting point with todo markers

This commit is contained in:
Tamas Cseke 2012-05-24 09:37:41 +02:00
parent fbcb862265
commit 8f4c5bc492
3 changed files with 187 additions and 71 deletions

View File

@ -286,6 +286,7 @@ static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_bu
switch_set_flag_locked(listener, LFLAG_EVENTS);
}
/* TODO - listener write lock */
for (i = 1; i < arity; i++) {
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
@ -335,6 +336,7 @@ static switch_status_t handle_msg_session_event(listener_t *listener, erlang_msg
for (i = 1; i < arity; i++) {
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
/* TODO session write locking */
if (custom) {
switch_core_hash_insert(session->event_hash, atom, MARKER);
} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
@ -380,10 +382,12 @@ static switch_status_t handle_msg_nixevent(listener_t *listener, int arity, ei_x
int i = 0;
switch_event_types_t type;
/* TODO listener write lock */
for (i = 1; i < arity; i++) {
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
if (custom) {
switch_core_hash_delete(listener->event_hash, atom);
} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
uint32_t x = 0;
@ -426,6 +430,7 @@ static switch_status_t handle_msg_session_nixevent(listener_t *listener, erlang_
int i = 0;
switch_event_types_t type;
/* TODO session write lock */
for (i = 1; i < arity; i++) {
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
@ -480,6 +485,8 @@ static switch_status_t handle_msg_setevent(listener_t *listener, erlang_msg *msg
switch_event_types_t type;
int i = 0;
/* TODO listener write lock */
/* clear any previous event registrations */
for( x = 0; x <= SWITCH_EVENT_ALL; x++){
event_list[x] = 0;
@ -517,6 +524,7 @@ static switch_status_t handle_msg_setevent(listener_t *listener, erlang_msg *msg
/* update the event subscriptions with the new ones */
memcpy(listener->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1));
/* wipe the old hash, and point the pointer at the new one */
/* TODO make thread safe */
switch_core_hash_destroy(&listener->event_hash);
listener->event_hash = event_hash;
@ -544,12 +552,14 @@ static switch_status_t handle_msg_session_setevent(listener_t *listener, erlang_
switch_event_types_t type;
uint32_t x = 0;
/* TODO session write lock */
/* clear any previous event registrations */
for (x = 0; x <= SWITCH_EVENT_ALL; x++){
event_list[x] = 0;
}
/* create new hash */
/* TODO make thread safe*/
switch_core_hash_init(&event_hash, session->pool);
for (i = 1; i < arity; i++){
@ -576,6 +586,7 @@ static switch_status_t handle_msg_session_setevent(listener_t *listener, erlang_
/* update the event subscriptions with the new ones */
memcpy(session->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1));
/* wipe the old hash, and point the pointer at the new one */
/* TODO make thread safe*/
switch_core_hash_destroy(&session->event_hash);
session->event_hash = event_hash;
/* TODO - we should flush any non-matching events from the queue */
@ -601,13 +612,13 @@ static switch_status_t handle_msg_api(listener_t *listener, erlang_msg * msg, in
fail = SWITCH_TRUE;
}
ei_get_type(buf->buff, &buf->index, &type, &size);
ei_get_type(buf->buff, &buf->index, &type, &size);
if ((size > (sizeof(api_cmd) - 1)) || ei_decode_atom(buf->buff, &buf->index, api_cmd)) {
fail = SWITCH_TRUE;
}
ei_get_type(buf->buff, &buf->index, &type, &size);
ei_get_type(buf->buff, &buf->index, &type, &size);
arg = malloc(size + 1);
if (ei_decode_string(buf->buff, &buf->index, arg)) {
@ -706,29 +717,29 @@ static switch_status_t handle_msg_sendevent(listener_t *listener, int arity, ei_
while (!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) {
i++;
ei_get_type(buf->buff, &buf->index, &type, &size);
ei_get_type(buf->buff, &buf->index, &type, &size);
if ((size > (sizeof(key) - 1)) || ei_decode_string(buf->buff, &buf->index, key)) {
fail = SWITCH_TRUE;
break;
}
ei_get_type(buf->buff, &buf->index, &type, &size);
value = malloc(size + 1);
ei_get_type(buf->buff, &buf->index, &type, &size);
value = malloc(size + 1);
if (ei_decode_string(buf->buff, &buf->index, value)) {
fail = SWITCH_TRUE;
break;
}
if (!fail && !strcmp(key, "body")) {
switch_safe_free(event->body);
event->body = value;
} else if (!fail) {
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM | SWITCH_STACK_NODUP, key, value);
}
/* Do not free malloc here! The above commands utilize the raw allocated memory and skip any copying/duplication. Faster. */
if (!fail && !strcmp(key, "body")) {
switch_safe_free(event->body);
event->body = value;
} else if (!fail) {
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM | SWITCH_STACK_NODUP, key, value);
}
/* Do not free malloc here! The above commands utilize the raw allocated memory and skip any copying/duplication. Faster. */
}
if (headerlength != i || fail) {
@ -763,22 +774,22 @@ static switch_status_t handle_msg_sendmsg(listener_t *listener, int arity, ei_x_
char key[1024];
char *value;
int type;
int size;
int type;
int size;
int i = 0;
switch_bool_t fail = SWITCH_FALSE;
while (!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) {
i++;
ei_get_type(buf->buff, &buf->index, &type, &size);
ei_get_type(buf->buff, &buf->index, &type, &size);
if ((size > (sizeof(key) - 1)) || ei_decode_string(buf->buff, &buf->index, key)) {
fail = SWITCH_TRUE;
break;
}
ei_get_type(buf->buff, &buf->index, &type, &size);
value = malloc(size + 1);
ei_get_type(buf->buff, &buf->index, &type, &size);
value = malloc(size + 1);
if (ei_decode_string(buf->buff, &buf->index, value)) {
fail = SWITCH_TRUE;
@ -786,7 +797,7 @@ static switch_status_t handle_msg_sendmsg(listener_t *listener, int arity, ei_x_
}
if (!fail) {
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM | SWITCH_STACK_NODUP, key, value);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM | SWITCH_STACK_NODUP, key, value);
}
}
@ -1024,6 +1035,7 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, e
listener->event_list[x] = 0;
}
/* wipe the hash */
/* TODO make thread safe*/
switch_core_hash_destroy(&listener->event_hash);
switch_core_hash_init(&listener->event_hash, listener->pool);
ei_x_encode_atom(rbuf, "ok");
@ -1044,6 +1056,7 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, e
session->event_list[x] = 0;
}
/* wipe the hash */
/* TODO make thread safe*/
switch_core_hash_destroy(&session->event_hash);
switch_core_hash_init(&session->event_hash, session->pool);
ei_x_encode_atom(rbuf, "ok");
@ -1240,6 +1253,7 @@ int handle_msg(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buf
buf->index = 0;
ei_decode_version(buf->buff, &buf->index, &version);
ei_get_type(buf->buff, &buf->index, &type, &size);
switch (type) {
case ERL_SMALL_TUPLE_EXT:
case ERL_LARGE_TUPLE_EXT:
@ -1288,11 +1302,8 @@ int handle_msg(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buf
#ifdef EI_DEBUG
ei_x_print_msg(rbuf, &msg->from, 1);
#endif
return SWITCH_STATUS_SUCCESS != ret;
if (SWITCH_STATUS_SUCCESS == ret)
return 0;
else /* SWITCH_STATUS_TERM */
return 1;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Empty reply, supressing\n");
return 0;

View File

@ -59,6 +59,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l
switch_thread_rwlock_rdlock(globals.listener_rwlock);
for (l = listen_list.listeners; l; l = l->next) {
/* TODO listener read lock */
if (switch_test_flag(l, LFLAG_LOG) && l->level >= node->level) {
switch_log_node_t *dnode = switch_log_node_dup(node);
@ -131,9 +132,10 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t
return;
}
switch_thread_rwlock_rdlock(listener->session_rwlock);
s = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid);
switch_thread_rwlock_unlock(listener->session_rwlock);
/* TODO - we don't need to hold the lock, we need to lock the session */
if (s) {
int send = 0;
@ -162,6 +164,7 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t
switch_event_name(event->event_id), s->uuid_str);
}
}
switch_thread_rwlock_unlock(listener->session_rwlock);
}
static void event_handler(switch_event_t *event)
@ -175,9 +178,10 @@ static void event_handler(switch_event_t *event)
return;
}
switch_thread_rwlock_rdlock(globals.listener_rwlock);
lp = listen_list.listeners;
switch_thread_rwlock_rdlock(globals.listener_rwlock);
while (lp) {
uint8_t send = 0;
@ -188,6 +192,8 @@ static void event_handler(switch_event_t *event)
one of them should receive the event as well
*/
/* TODO need read locking */
send_event_to_attached_sessions(l, event);
if (!switch_test_flag(l, LFLAG_EVENTS)) {
@ -249,21 +255,21 @@ static void close_socket(int *sock)
}
static void add_listener(listener_t *listener)
{
/*static void add_listener(listener_t *listener)*/
/*{*/
/* add me to the listeners so I get events */
switch_thread_rwlock_wrlock(globals.listener_rwlock);
listener->next = listen_list.listeners;
listen_list.listeners = listener;
switch_thread_rwlock_unlock(globals.listener_rwlock);
}
/*switch_thread_rwlock_wrlock(globals.listener_rwlock);*/
/*listener->next = listen_list.listeners;*/
/*listen_list.listeners = listener;*/
/*switch_thread_rwlock_unlock(globals.listener_rwlock);*/
/*}*/
/* TODO lock */
static void remove_listener(listener_t *listener)
{
listener_t *l, *last = NULL;
switch_thread_rwlock_wrlock(globals.listener_rwlock);
for (l = listen_list.listeners; l; l = l->next) {
if (l == listener) {
if (last) {
@ -274,7 +280,6 @@ static void remove_listener(listener_t *listener)
}
last = l;
}
switch_thread_rwlock_unlock(globals.listener_rwlock);
}
/* Search for a listener already talking to the specified node */
@ -285,6 +290,7 @@ static listener_t *find_listener(char *nodename)
switch_thread_rwlock_rdlock(globals.listener_rwlock);
for (l = listen_list.listeners; l; l = l->next) {
if (!strncmp(nodename, l->peer_nodename, MAXNODELEN)) {
/* TODO listener rwlock */
break;
}
}
@ -301,11 +307,19 @@ static void add_session_elem_to_listener(listener_t *listener, session_elem_t *s
}
/* TODO lock */
static void remove_session_elem_from_listener(listener_t *listener, session_elem_t *session_element)
{
switch_core_hash_delete(listener->sessions, session_element->uuid_str);
}
static void remove_session_elem_from_listener_locked(listener_t *listener, session_elem_t *session_element)
{
switch_thread_rwlock_wrlock(listener->session_rwlock);
remove_session_elem_from_listener(listener, session_element);
switch_thread_rwlock_unlock(listener->session_rwlock);
}
static void destroy_session_elem(session_elem_t *session_element)
{
switch_core_session_t *session;
@ -315,16 +329,10 @@ static void destroy_session_elem(session_elem_t *session_element)
switch_core_session_rwunlock(session);
}
switch_core_destroy_memory_pool(&session_element->pool);
session_element = NULL;
/*switch_safe_free(s); */
}
static void remove_session_elem_from_listener_locked(listener_t *listener, session_elem_t *session_element)
{
switch_thread_rwlock_wrlock(listener->session_rwlock);
remove_session_elem_from_listener(listener, session_element);
switch_thread_rwlock_unlock(listener->session_rwlock);
}
session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid)
{
@ -362,6 +370,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
switch_xml_t xml = NULL;
ei_x_buff *rep;
ei_x_buff buf;
ei_x_new_with_version(&buf);
switch_uuid_get(&uuid);
@ -403,6 +412,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
/* Create a new fetch object. */
p = malloc(sizeof(*p));
switch_thread_cond_create(&p->ready_or_found, module_pool);
/* TODO module pool */
switch_mutex_init(&p->mutex, SWITCH_MUTEX_UNNESTED, module_pool);
p->state = reply_not_ready;
p->reply = NULL;
@ -430,8 +440,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
/* Tell the threads to be ready, and wait five seconds for a reply. */
switch_mutex_lock(p->mutex);
//p->state = reply_waiting;
switch_thread_cond_timedwait(p->ready_or_found,
p->mutex, 5000000);
switch_thread_cond_timedwait(p->ready_or_found, p->mutex, 5000000);
if (!p->reply) {
p->state = reply_timeout;
switch_mutex_unlock(p->mutex);
@ -516,6 +525,7 @@ static switch_status_t notify_new_session(listener_t *listener, session_elem_t *
switch_caller_profile_event_set_data(switch_channel_get_caller_profile(channel), "Channel", call_event);
switch_channel_event_set_data(channel, call_event);
switch_core_session_rwunlock(session);
/* TODO reply? sure? */
switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "Content-Type", "command/reply");
switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "Reply-Text", "+OK\n");
@ -551,6 +561,7 @@ static switch_status_t check_attached_sessions(listener_t *listener)
/* event used to track sessions to remove */
switch_event_t *event = NULL;
switch_event_header_t *header = NULL;
switch_event_create_subclass(&event, SWITCH_EVENT_CLONE, NULL);
switch_assert(event);
/* check up on all the attached sessions -
@ -558,6 +569,8 @@ static switch_status_t check_attached_sessions(listener_t *listener)
if they have pending events in their queues then send them
if the session has finished then clean it up
*/
/* TODO try to minimize critical section */
switch_thread_rwlock_rdlock(listener->session_rwlock);
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
switch_hash_this(iter, &key, NULL, &value);
@ -643,6 +656,8 @@ static switch_status_t check_attached_sessions(listener_t *listener)
/* release the read lock and get a write lock */
switch_thread_rwlock_wrlock(listener->session_rwlock);
/* do the deferred remove */
/* TODO refactor find_session_elem_by_uuid*/
for (header = event->headers; header; header = header->next) {
if ((sp = (session_elem_t*)switch_core_hash_find(listener->sessions, header->value))) {
remove_session_elem_from_listener(listener, sp);
@ -762,6 +777,7 @@ static void handle_exit(listener_t *listener, erlang_pid * pid)
switch_core_session_rwunlock(session);
}
/* TODO - if a spawned process that was handling an outbound call fails.. what do we do with the call? */
/* TODO hangup and let the state handler set the complete flag and destroy as usual*/
}
remove_session_elem_from_listener_locked(listener, s);
destroy_session_elem(s);
@ -772,6 +788,7 @@ static void handle_exit(listener_t *listener, erlang_pid * pid)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Log handler process for node %s exited\n", pid->node);
/*purge the log queue */
/* TODO don't we want to clear flag first? */
while (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) {
switch_log_node_t *dnode = (switch_log_node_t *) pop;
switch_log_node_free(&dnode);
@ -787,6 +804,7 @@ static void handle_exit(listener_t *listener, erlang_pid * pid)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Event handler process for node %s exited\n", pid->node);
/*purge the event queue */
/* TODO don't we want to clear flag first? */
while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
switch_event_t *pevent = (switch_event_t *) pop;
switch_event_destroy(&pevent);
@ -795,10 +813,13 @@ static void handle_exit(listener_t *listener, erlang_pid * pid)
if (switch_test_flag(listener, LFLAG_EVENTS)) {
uint8_t x = 0;
switch_clear_flag_locked(listener, LFLAG_EVENTS);
for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
listener->event_list[x] = 0;
}
/* wipe the hash */
/* XXX this needs to be locked */
/* TODO switch_core_hash_delete_multi_locked */
switch_core_hash_destroy(&listener->event_hash);
switch_core_hash_init(&listener->event_hash, listener->pool);
}
@ -865,7 +886,16 @@ static void listener_main_loop(listener_t *listener)
case ERL_EXIT:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_exit from %s <%d.%d.%d>\n", msg.from.node, msg.from.creation, msg.from.num,
msg.from.serial);
handle_exit(listener, &msg.from);
switch_thread_rwlock_rdlock(globals.listener_rwlock);
if (listener) {
/* get the listener lock */
switch_thread_rwlock_wrlock(listener->rwlock);
/* wipe event hash */
handle_exit(listener, &msg.from);
switch_thread_rwlock_unlock(listener->rwlock);
}
switch_thread_rwlock_unlock(globals.listener_rwlock);
break;
default:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "unexpected msg type %d\n", (int) (msg.msgtype));
@ -874,7 +904,7 @@ static void listener_main_loop(listener_t *listener)
break;
case ERL_ERROR:
if (erl_errno != ETIMEDOUT && erl_errno != EAGAIN) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_error\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "erl_error: status=%d, erl_errno=%d errno=%d\n", status, erl_errno, errno);
}
break;
default:
@ -892,6 +922,11 @@ static void listener_main_loop(listener_t *listener)
return;
}
}
if (prefs.done) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "shutting down listener\n");
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "listener exit: status=%d, erl_errno=%d errno=%d\n", status, erl_errno, errno);
}
}
static switch_bool_t check_inbound_acl(listener_t *listener)
@ -941,7 +976,7 @@ static switch_bool_t check_inbound_acl(listener_t *listener)
static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
{
listener_t *listener = (listener_t *) obj;
session_elem_t *s;
session_elem_t *s = NULL;
const void *key;
void *value;
switch_hash_index_t *iter;
@ -959,24 +994,29 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open from %s\n", listener->remote_ip); /*, listener->remote_port); */
}
add_listener(listener);
/*add_listener(listener);*/
listener_main_loop(listener);
}
/* clean up */
remove_listener(listener);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n");
listener->dead = 1; /* mark it as dead */
/* TODO - release write lock */
switch_thread_rwlock_wrlock(globals.listener_rwlock);
remove_listener(listener);
switch_thread_rwlock_unlock(globals.listener_rwlock);
switch_thread_rwlock_wrlock(listener->rwlock);
if (listener->sockfd) {
close_socket(&listener->sockfd);
}
switch_thread_rwlock_unlock(listener->rwlock);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Closed\n");
/* TODO make listener destroy function and move there */
switch_core_hash_destroy(&listener->event_hash);
/* remove any bindings for this connection */
@ -987,9 +1027,12 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
switch_hash_this(iter, &key, NULL, &value);
s = (session_elem_t*)value;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Orphaning call %s\n", s->uuid_str);
remove_session_elem_from_listener(listener, s);
destroy_session_elem(s);
}
switch_thread_rwlock_unlock(listener->session_rwlock);
switch_thread_rwlock_unlock(listener->rwlock);
if (listener->pool) {
switch_memory_pool_t *pool = listener->pool;
@ -1156,30 +1199,31 @@ static int config(void)
return 0;
}
static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd)
{
switch_memory_pool_t *listener_pool = NULL;
switch_memory_pool_t *pool = NULL;
listener_t *listener = NULL;
if (switch_core_new_memory_pool(&listener_pool) != SWITCH_STATUS_SUCCESS) {
if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
return NULL;
}
if (!(listener = switch_core_alloc(listener_pool, sizeof(*listener)))) {
if (!(listener = switch_core_alloc(pool, sizeof(*listener)))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
switch_core_destroy_memory_pool(&pool);
return NULL;
}
memset(listener, 0, sizeof(*listener));
switch_thread_rwlock_create(&listener->rwlock, listener_pool);
switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener_pool);
switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, listener_pool);
switch_thread_rwlock_create(&listener->rwlock, pool);
switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, pool);
switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, pool);
/* TODO remove */
listener->dead = 0; /* born alive */
listener->sockfd = clientfd;
listener->pool = listener_pool;
listener_pool = NULL;
listener->pool = pool;
listener->ec = switch_core_alloc(listener->pool, sizeof(ei_cnode));
memcpy(listener->ec, ec, sizeof(ei_cnode));
listener->level = SWITCH_LOG_DEBUG;
@ -1189,15 +1233,49 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd)
switch_core_hash_init(&listener->event_hash, listener->pool);
switch_core_hash_init(&listener->sessions, listener->pool);
/* TODO listener rdlock */
listener->next = listen_list.listeners;
listen_list.listeners = listener;
return listener;
}
static listener_t *new_outbound_listener(char *node)
/*TODO we don't need bottleneck*/
static listener_t *new_listener_locked(struct ei_cnode_s *ec, int clientfd)
{
listener_t *res;
switch_thread_rwlock_wrlock(globals.listener_rwlock);
res = new_listener(ec, clientfd);
switch_thread_rwlock_unlock(globals.listener_rwlock);
return res;
}
/* TODO new session??? */
static listener_t *new_outbound_listener(char *node, switch_bool_t *new_session)
{
listener_t *listener = NULL;
struct ei_cnode_s ec;
int clientfd;
/* TODO find listener func */
switch_thread_rwlock_wrlock(globals.listener_rwlock);
for (listener = listen_list.listeners; listener; listener = listener->next) {
if (!strncmp(node, listener->peer_nodename, MAXNODELEN)) {
break;
}
}
if (listener && listener->dead) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "found dead listener for %s\n", node);
remove_listener(listener); /* remove the dead listener and continue adding one */
} else if (listener) {
switch_thread_rwlock_unlock(globals.listener_rwlock);
*new_session = SWITCH_FALSE;
return listener;
}
if (SWITCH_STATUS_SUCCESS == initialise_ei(&ec)) {
#ifdef WIN32
WSASetLastError(0);
@ -1206,11 +1284,17 @@ static listener_t *new_outbound_listener(char *node)
#endif
if ((clientfd = ei_connect(&ec, node)) < 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error connecting to node %s (erl_errno=%d, errno=%d)!\n", node, erl_errno, errno);
switch_thread_rwlock_unlock(globals.listener_rwlock);
return NULL;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "new listener for %s\n", node);
listener = new_listener(&ec, clientfd);
listener->peer_nodename = switch_core_strdup(listener->pool, node);
}
switch_thread_rwlock_unlock(globals.listener_rwlock);
*new_session = SWITCH_TRUE;
return listener;
}
@ -1226,6 +1310,7 @@ static switch_status_t state_handler(switch_core_session_t *session)
if (state == CS_DESTROY) {
/* indicate that once all the events in the event queue are done
* we can throw this away */
/* TODO locked? */
switch_set_flag(session_element, LFLAG_SESSION_COMPLETE);
}
} else {
@ -1298,6 +1383,7 @@ session_elem_t *attach_call_to_pid(listener_t *listener, erlang_pid * pid, switc
memcpy(&session_element->process.pid, pid, sizeof(erlang_pid));
/* attach the session to the listener */
add_session_elem_to_listener(listener, session_element);
/* TODO link before added to listener? */
ei_link(listener, ei_self(listener->ec), pid);
return session_element;
@ -1361,8 +1447,7 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul
*/
}
switch_thread_cond_timedwait(p->ready_or_found,
p->mutex, 5000000);
switch_thread_cond_timedwait(p->ready_or_found, p->mutex, 5000000);
if (!p->pid) {
p->state = reply_timeout;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid %s %s\n", hash, session_element->uuid_str);
@ -1460,13 +1545,23 @@ SWITCH_STANDARD_APP(erlang_outbound_function)
/* if there is no listener, then create one */
if (!listener) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for session\n");
new_session = SWITCH_TRUE;
listener = new_outbound_listener(node);
listener = new_outbound_listener(node, &new_session);
/* XXX new_session isn't accurate now */
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for session\n");
/* TODO don't we need to connect ? */
}
if (listener) {
/* TODO it's too late */
switch_thread_rwlock_rdlock(globals.listener_rwlock);
if (listener && !listener->dead) {
/* prevent the listener_run thread from destroying the listener out from under us */
/* get the listener lock */
switch_thread_rwlock_rdlock(listener->rwlock);
/* release the global listener lock, since the listener can't be freed without the listener lock */
switch_thread_rwlock_unlock(globals.listener_rwlock);
if (new_session == SWITCH_TRUE) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching new listener\n");
launch_listener_thread(listener);
@ -1480,11 +1575,18 @@ SWITCH_STANDARD_APP(erlang_outbound_function)
session_element = attach_call_to_registered_process(listener, reg_name, session);
}
/* should be safe now */
switch_thread_rwlock_unlock(listener->rwlock);
if (session_element) {
switch_ivr_park(session, NULL);
}
} else {
switch_thread_rwlock_unlock(globals.listener_rwlock);
}
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(uuid), SWITCH_LOG_DEBUG, "exit erlang_outbound_function\n");
}
@ -1498,6 +1600,7 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function)
char *mydata;
ei_x_buff buf;
listener_t *listener;
switch_bool_t new_session;
ei_x_new_with_version(&buf);
@ -1523,12 +1626,12 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function)
listener = find_listener(node);
if (!listener) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for sendmsg %s\n", node);
listener = new_outbound_listener(node);
listener = new_outbound_listener(node, &new_session);
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for sendmsg to %s\n", node);
}
if (listener) {
if (listener && !listener->dead) {
ei_reg_send(listener->ec, listener->sockfd, reg_name, buf.buff, buf.index);
}
}
@ -1598,11 +1701,11 @@ SWITCH_STANDARD_API(erlang_cmd)
stream->write_function(stream, "Outbound session for %s in state %s\n", sp->uuid_str,
switch_channel_state_name(sp->channel_state));
}
switch_thread_rwlock_unlock(l->session_rwlock);
if (empty) {
stream->write_function(stream, "No active sessions for %s\n", argv[1]);
}
switch_thread_rwlock_unlock(l->session_rwlock);
break;
}
}
@ -1824,7 +1927,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
continue;
}
listener = new_listener(&ec, clientfd);
listener = new_listener_locked(&ec, clientfd);
if (listener) {
/* store the IP and node name we are talking with */
switch_inet_ntop(AF_INET, conn.ipadr, listener->remote_ip, sizeof(listener->remote_ip));

View File

@ -39,7 +39,8 @@ typedef enum {
} session_flag_t;
typedef enum {
ERLANG_PID = 0,
NONE = 0,
ERLANG_PID,
ERLANG_REG_PROCESS
} process_type;
@ -113,6 +114,7 @@ struct listener {
#else
int sockfd;
#endif
uint8_t dead;
struct ei_cnode_s *ec;
struct erlang_process log_process;
struct erlang_process event_process;