diff --git a/src/mod/applications/mod_fifo/mod_fifo.c b/src/mod/applications/mod_fifo/mod_fifo.c index e5771b5a11..d4e572030a 100644 --- a/src/mod/applications/mod_fifo/mod_fifo.c +++ b/src/mod/applications/mod_fifo/mod_fifo.c @@ -1149,10 +1149,10 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void if (node) { - switch_mutex_lock(node->mutex); - //node->busy = switch_epoch_time_now(NULL) + 600; + switch_thread_rwlock_wrlock(node->rwlock); + node->busy = 0; node->ring_consumer_count = 1; - switch_mutex_unlock(node->mutex); + switch_thread_rwlock_unlock(node->rwlock); } else { goto end; } @@ -1319,7 +1319,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, " "outbound_fail_count=outbound_fail_count+1, " "outbound_fail_total_count = outbound_fail_total_count+1, " - "next_avail=%ld + lag where uuid='%q' and ring_count > 0", + "next_avail=%ld + lag + 1 where uuid='%q' and ring_count > 0", (long) switch_epoch_time_now(NULL), h->uuid); fifo_execute_sql(sql, globals.sql_mutex); switch_safe_free(sql); @@ -1389,6 +1389,14 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void cbh->ready = 1; + if (node) { + switch_thread_rwlock_wrlock(node->rwlock); + node->ring_consumer_count = 0; + node->busy = 0; + switch_thread_rwlock_unlock(node->rwlock); + } + + for (i = 0; i < cbh->rowcount; i++) { struct call_helper *h = cbh->rows[i]; del_consumer_outbound_call(h->uuid); @@ -1404,14 +1412,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void if (pop_dup) { switch_event_destroy(&pop_dup); } - - if (node) { - switch_mutex_lock(node->mutex); - node->ring_consumer_count = 0; - //node->busy = switch_epoch_time_now(NULL) + connected; - switch_mutex_unlock(node->mutex); - } - + switch_core_destroy_memory_pool(&cbh->pool); return NULL; @@ -1439,10 +1440,10 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) switch_mutex_unlock(globals.mutex); if (node) { - switch_mutex_lock(node->mutex); + switch_thread_rwlock_wrlock(node->rwlock); node->ring_consumer_count++; - //node->busy = switch_epoch_time_now(NULL) + 600; - switch_mutex_unlock(node->mutex); + node->busy = 0; + switch_thread_rwlock_unlock(node->rwlock); } switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS); @@ -1485,7 +1486,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) if (status != SWITCH_STATUS_SUCCESS) { sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, " - "outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag where uuid='%q' and use_count > 0", + "outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag + 1 where uuid='%q' and use_count > 0", (long) switch_epoch_time_now(NULL), h->uuid); fifo_execute_sql(sql, globals.sql_mutex); switch_safe_free(sql); @@ -1539,12 +1540,12 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) switch_event_destroy(&ovars); if (node) { - switch_mutex_lock(node->mutex); + switch_thread_rwlock_wrlock(node->rwlock); if (node->ring_consumer_count-- < 0) { node->ring_consumer_count = 0; } - //node->busy = switch_epoch_time_now(NULL) + connected; - switch_mutex_unlock(node->mutex); + node->busy = 0; + switch_thread_rwlock_unlock(node->rwlock); } switch_core_destroy_memory_pool(&h->pool); @@ -1652,15 +1653,10 @@ static void find_consumers(fifo_node_t *node) fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh); if (cbh->rowcount) { - int sanity = 40; - switch_threadattr_create(&thd_attr, cbh->pool); switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_thread_create(&thread, thd_attr, ringall_thread_run, cbh, cbh->pool); - while(--sanity > 0 && !cbh->ready) { - switch_yield(100000); - } } } @@ -1689,8 +1685,7 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { switch_hash_this(hi, &var, NULL, &val); if ((node = (fifo_node_t *) val)) { - switch_mutex_lock(node->mutex); - if (node->has_outbound && node->ready) {// && switch_epoch_time_now(NULL) > node->busy) { + if (node->has_outbound && node->ready && !node->busy) { ppl_waiting = node_consumer_wait_count(node); consumer_total = node->consumer_count; idle_consumers = node_idle_consumers(node); @@ -1704,7 +1699,6 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o switch_yield(1000000); } } - switch_mutex_unlock(node->mutex); } } switch_mutex_unlock(globals.mutex); @@ -1922,8 +1916,9 @@ static void dec_use_count(switch_channel_t *channel) if ((outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid"))) { del_bridge_call(outbound_id); - sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag where use_count > 0 and uuid='%q'", + sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag + 1 where use_count > 0 and uuid='%q'", now, now, outbound_id); + fifo_execute_sql(sql, globals.sql_mutex); switch_safe_free(sql); } @@ -1963,9 +1958,15 @@ SWITCH_STANDARD_APP(fifo_track_call_function) return; } + if (switch_true(switch_channel_get_variable(channel, "fifo_track_call"))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s trying to double-track call!\n", switch_channel_get_name(channel)); + return; + } + add_bridge_call(data); switch_channel_set_variable(channel, "fifo_outbound_uuid", data); + switch_channel_set_variable(channel, "fifo_track_call", "true"); if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_OUTBOUND) { col1 = "manual_calls_in_count"; @@ -1978,6 +1979,7 @@ SWITCH_STANDARD_APP(fifo_track_call_function) sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,outbound_fail_count=0,use_count=use_count+1,%s=%s+1,%s=%s+1 where uuid='%q'", (long) switch_epoch_time_now(NULL), col1, col1, col2, col2, data); fifo_execute_sql(sql, globals.sql_mutex); + switch_safe_free(sql); @@ -2218,7 +2220,7 @@ SWITCH_STANDARD_APP(fifo_function) switch_channel_answer(channel); - switch_mutex_lock(node->mutex); + switch_thread_rwlock_wrlock(node->rwlock); node->caller_count++; if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) { @@ -2257,7 +2259,7 @@ SWITCH_STANDARD_APP(fifo_function) switch_channel_set_variable(channel, "fifo_priority", tmp); } - switch_mutex_unlock(node->mutex); + switch_thread_rwlock_unlock(node->rwlock); ts = switch_micro_time_now(); switch_time_exp_lt(&tm, ts); @@ -2342,6 +2344,8 @@ SWITCH_STANDARD_APP(fifo_function) switch_channel_clear_app_flag(channel, CF_APP_TAGGED); abort: + + fifo_caller_del(switch_core_session_get_uuid(session)); if (!aborted && switch_channel_ready(channel)) { switch_channel_set_state(channel, CS_HIBERNATE); @@ -2361,10 +2365,10 @@ SWITCH_STANDARD_APP(fifo_function) } switch_mutex_lock(globals.mutex); - switch_mutex_lock(node->mutex); + switch_thread_rwlock_wrlock(node->rwlock); node_remove_uuid(node, uuid); node->caller_count--; - switch_mutex_unlock(node->mutex); + switch_thread_rwlock_unlock(node->rwlock); send_presence(node); check_cancel(node); switch_mutex_unlock(globals.mutex); @@ -2588,15 +2592,10 @@ SWITCH_STANDARD_APP(fifo_function) } } - if (pop) { - fifo_caller_del(switch_str_nil(switch_event_get_header(pop, "unique-id"))); - } - - if (pop && !node_consumer_wait_count(node)) { - switch_mutex_lock(node->mutex); + switch_thread_rwlock_wrlock(node->rwlock); node->start_waiting = 0; - switch_mutex_unlock(node->mutex); + switch_thread_rwlock_unlock(node->rwlock); } } @@ -2710,9 +2709,9 @@ SWITCH_STANDARD_APP(fifo_function) const char *arg = switch_channel_get_variable(other_channel, "current_application_data"); switch_caller_extension_t *extension = NULL; - switch_mutex_lock(node->mutex); + switch_thread_rwlock_wrlock(node->rwlock); node->caller_count--; - switch_mutex_unlock(node->mutex); + switch_thread_rwlock_unlock(node->rwlock); send_presence(node); check_cancel(node); @@ -2786,6 +2785,8 @@ SWITCH_STANDARD_APP(fifo_function) sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,use_count=use_count+1,outbound_fail_count=0 where uuid='%s'", switch_epoch_time_now(NULL), outbound_id); + + fifo_execute_sql(sql, globals.sql_mutex); switch_safe_free(sql); } @@ -2817,7 +2818,7 @@ SWITCH_STANDARD_APP(fifo_function) sql = switch_mprintf("update fifo_outbound set stop_time=%ld, use_count=use_count-1, " "outbound_call_total_count=outbound_call_total_count+1, " - "outbound_call_count=outbound_call_count+1, next_avail=%ld + lag where uuid='%s' and use_count > 0", + "outbound_call_count=outbound_call_count+1, next_avail=%ld + lag + 1 where uuid='%s' and use_count > 0", now, now, outbound_id); fifo_execute_sql(sql, globals.sql_mutex); @@ -2876,9 +2877,9 @@ SWITCH_STANDARD_APP(fifo_function) switch_channel_set_variable(other_channel, "fifo_status", "DONE"); switch_channel_set_variable(other_channel, "fifo_timestamp", date); - switch_mutex_lock(node->mutex); + switch_thread_rwlock_wrlock(node->rwlock); node->caller_count--; - switch_mutex_unlock(node->mutex); + switch_thread_rwlock_unlock(node->rwlock); send_presence(node); check_cancel(node); switch_core_session_rwunlock(other_session); @@ -3523,9 +3524,9 @@ SWITCH_STANDARD_API(fifo_api_function) switch_hash_this(hi, &var, NULL, &val); node = (fifo_node_t *) val; len = node_consumer_wait_count(node); - switch_mutex_lock(node->mutex); + switch_thread_rwlock_wrlock(node->rwlock); stream->write_function(stream, "%s:%d:%d:%d\n", (char *) var, node->consumer_count, node->caller_count, len); - switch_mutex_unlock(node->mutex); + switch_thread_rwlock_unlock(node->rwlock); x++; } @@ -3534,9 +3535,9 @@ SWITCH_STANDARD_API(fifo_api_function) } } else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) { len = node_consumer_wait_count(node); - switch_mutex_lock(node->mutex); + switch_thread_rwlock_wrlock(node->rwlock); stream->write_function(stream, "%s:%d:%d:%d\n", argv[1], node->consumer_count, node->caller_count, len); - switch_mutex_unlock(node->mutex); + switch_thread_rwlock_unlock(node->rwlock); } else { stream->write_function(stream, "none\n"); } @@ -3546,9 +3547,9 @@ SWITCH_STANDARD_API(fifo_api_function) switch_hash_this(hi, &var, NULL, &val); node = (fifo_node_t *) val; len = node_consumer_wait_count(node); - switch_mutex_lock(node->mutex); + switch_thread_rwlock_wrlock(node->rwlock); stream->write_function(stream, "%s:%d\n", (char *) var, node->has_outbound); - switch_mutex_unlock(node->mutex); + switch_thread_rwlock_unlock(node->rwlock); x++; } @@ -3557,9 +3558,9 @@ SWITCH_STANDARD_API(fifo_api_function) } } else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) { len = node_consumer_wait_count(node); - switch_mutex_lock(node->mutex); + switch_thread_rwlock_wrlock(node->rwlock); stream->write_function(stream, "%s:%d\n", argv[1], node->has_outbound); - switch_mutex_unlock(node->mutex); + switch_thread_rwlock_unlock(node->rwlock); } else { stream->write_function(stream, "none\n"); }