diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c b/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c index 0108bdaa79..1bb344f270 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c @@ -254,6 +254,8 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg) } } else if (!strncmp(var, "exchange", 8)) { exchange = switch_core_strdup(profile->pool, "TAP.Events"); + } else if (!strncmp(var, "exchange_type", 13)) { + exchange_type = switch_core_strdup(profile->pool, "topic"); } else if (!strncmp(var, "format_fields", 13)) { int size = 0; if ((size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) { @@ -311,6 +313,18 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] was unable to connect to any connection\n", profile->name); } + amqp_exchange_declare(profile->conn_active->state, 1, + amqp_cstring_bytes(profile->exchange), + amqp_cstring_bytes(profile->exchange_type), + 0, + 1, + amqp_empty_table); + + if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] failed to create exchange\n", profile->name); + goto err; + } + /* Create a bounded FIFO queue for sending messages */ if (switch_queue_create(&(profile->send_queue), profile->send_queue_size, profile->pool) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot create send queue of size %d!\n", profile->send_queue_size); @@ -426,7 +440,7 @@ void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void durable, amqp_empty_table); - if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { + if (!mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Amqp reconnect successful- connected\n"); continue; }