From 4d5540671764ae7c9bc50d66501cc5ecfa5a60ab Mon Sep 17 00:00:00 2001 From: Josh Perry Date: Fri, 25 Mar 2011 18:28:53 -0600 Subject: [PATCH 1/4] Intitial mod_event_zmq code --- src/mod/event_handlers/mod_event_zmq/Makefile | 27 ++++ .../mod_event_zmq/mod_event_zmq.cpp | 124 ++++++++++++++++++ 2 files changed, 151 insertions(+) create mode 100644 src/mod/event_handlers/mod_event_zmq/Makefile create mode 100644 src/mod/event_handlers/mod_event_zmq/mod_event_zmq.cpp diff --git a/src/mod/event_handlers/mod_event_zmq/Makefile b/src/mod/event_handlers/mod_event_zmq/Makefile new file mode 100644 index 0000000000..46d03f4bea --- /dev/null +++ b/src/mod/event_handlers/mod_event_zmq/Makefile @@ -0,0 +1,27 @@ +BASE=../../../.. + +ZMQ=zeromq-2.1.3 + +ZMQ_BASEURL=http://download.zeromq.org + +ZMQ_DIR=$(switch_srcdir)/libs/$(ZMQ) +ZMQ_BUILDDIR=$(switch_builddir)/libs/$(ZMQ) +LOCAL_CFLAGS=-I$(ZMQ_DIR)/include +ZMQ_LA=$(ZMQ_BUILDDIR)/src/libzmq.la +LOCAL_LIBADD=$(ZMQ_LA) + +include $(BASE)/build/modmake.rules + +$(ZMQ_DIR): + $(GETLIB) $(ZMQ_BASEURL) $(ZMQ).tar.gz + cd $(ZMQ_DIR) && ./autogen.sh + +$(ZMQ_BUILDDIR)/Makefile: $(ZMQ_DIR) + mkdir -p $(ZMQ_DIR) + cd $(ZMQ_BUILDDIR) && $(DEFAULT_VARS) $(ZMQ_DIR)/configure $(DEFAULT_ARGS) --srcdir=$(ZMQ_DIR) + $(TOUCH_TARGET) + +$(ZMQ_LA): $(ZMQ_BUILDDIR)/Makefile + cd $(ZMQ_BUILDDIR) && $(MAKE) + $(TOUCH_TARGET) + diff --git a/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.cpp b/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.cpp new file mode 100644 index 0000000000..7c99345712 --- /dev/null +++ b/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.cpp @@ -0,0 +1,124 @@ +#include +#include +#include +#include +#include +#include + +namespace mod_event_zmq { + +SWITCH_MODULE_LOAD_FUNCTION(load); +SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown); +SWITCH_MODULE_RUNTIME_FUNCTION(runtime); + +extern "C" { +SWITCH_MODULE_DEFINITION(mod_event_zmq, load, shutdown, runtime); +}; + +class ZmqStringMessage : public zmq::message_t { +public: + ZmqStringMessage(const std::string &msg) { + + } +}; + +// Handles publishing events out to clients +class ZmqEventPublisher { +public: + ZmqEventPublisher() : + context(1), + event_publisher(context, ZMQ_PUB) + { + event_publisher.bind("tcp://*.5556"); + } + + void PublishEvent(const switch_event_t *event) { + char* pjson; + switch_event_serialize_json(const_cast(event), &pjson); + std::auto_ptr json(pjson); + + ZmqStringMessage msg(json.get()); + event_publisher.send(msg); + } + +private: + zmq::context_t context; + zmq::socket_t event_publisher; +}; + +// Handles global inititalization and teardown of the module +class ZmqModule { +public: + ZmqModule(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) { + // Subscribe to all switch events of any subclass + // Store a pointer to ourself in the user data + if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, (void*)this, &_node) + != SWITCH_STATUS_SUCCESS) { + throw std::runtime_error("Couldn't bind to switch events."); + } + + // Create our module interface registration + *module_interface = switch_loadable_module_create_module_interface(pool, modname); + } + + void Listen() { + _publisher.reset(new ZmqEventPublisher()); + } + + ~ZmqModule() { + // Unsubscribe from the switch events + switch_event_unbind(&_node); + } + +private: + // Dispatches events to the publisher + static void event_handler(switch_event_t *event) { + try { + ZmqModule *module = (ZmqModule*)event->bind_user_data; + if(module->_publisher.get()) + module->_publisher->PublishEvent(event); + } catch(std::exception ex) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error publishing event via 0MQ: %s\n", ex.what()); + } + } + + switch_event_node_t *_node; + std::auto_ptr _publisher; +}; + +//*****************************// +// GLOBALS // +//*****************************// +std::auto_ptr module; + + +//*****************************// +// Module interface funtions // +//*****************************// +SWITCH_MODULE_LOAD_FUNCTION(load) { + try { + module.reset(new ZmqModule(module_interface, pool)); + return SWITCH_STATUS_SUCCESS; + } catch(const std::exception &ex) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error loading 0MQ module: %s\n", ex.what()); + return SWITCH_STATUS_GENERR; + } + +} + +SWITCH_MODULE_RUNTIME_FUNCTION(runtime) { + try { + // Begin listening for clients + module->Listen(); + } catch(...) { } + + // Tell the switch to stop calling this runtime loop + return SWITCH_STATUS_FALSE; +} + +SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown) { + // Free the module object + module.reset(); +} + +} From e83af319602b616c8cee5efc89386be3d5af61b5 Mon Sep 17 00:00:00 2001 From: Josh Perry Date: Sat, 26 Mar 2011 12:34:22 -0600 Subject: [PATCH 2/4] Updated message creation --- .../mod_event_zmq/mod_event_zmq.cpp | 62 +++++++++++++------ 1 file changed, 44 insertions(+), 18 deletions(-) diff --git a/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.cpp b/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.cpp index 7c99345712..c8975ebe5d 100644 --- a/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.cpp +++ b/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.cpp @@ -1,6 +1,5 @@ #include #include -#include #include #include #include @@ -15,13 +14,6 @@ extern "C" { SWITCH_MODULE_DEFINITION(mod_event_zmq, load, shutdown, runtime); }; -class ZmqStringMessage : public zmq::message_t { -public: - ZmqStringMessage(const std::string &msg) { - - } -}; - // Handles publishing events out to clients class ZmqEventPublisher { public: @@ -29,19 +21,28 @@ public: context(1), event_publisher(context, ZMQ_PUB) { - event_publisher.bind("tcp://*.5556"); + event_publisher.bind("tcp://*:5556"); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Listening for clients\n"); } void PublishEvent(const switch_event_t *event) { + // Serialize the event into a JSON string char* pjson; switch_event_serialize_json(const_cast(event), &pjson); - std::auto_ptr json(pjson); - ZmqStringMessage msg(json.get()); + // Use the JSON string as the message body + zmq::message_t msg(pjson, strlen(pjson), free_message_data, NULL); + + // Send the message event_publisher.send(msg); } private: + static void free_message_data(void *data, void *hint) { + free (data); + } + zmq::context_t context; zmq::socket_t event_publisher; }; @@ -49,25 +50,39 @@ private: // Handles global inititalization and teardown of the module class ZmqModule { public: - ZmqModule(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) { + ZmqModule(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) : + _running(false) { // Subscribe to all switch events of any subclass // Store a pointer to ourself in the user data if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, (void*)this, &_node) != SWITCH_STATUS_SUCCESS) { throw std::runtime_error("Couldn't bind to switch events."); } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Subscribed to events\n"); // Create our module interface registration *module_interface = switch_loadable_module_create_module_interface(pool, modname); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Module loaded\n"); } void Listen() { + if(_running) + return; + _publisher.reset(new ZmqEventPublisher()); + _running = true; + + while(_running) { + switch_yield(100000); + } } ~ZmqModule() { // Unsubscribe from the switch events + _running = false; switch_event_unbind(&_node); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Module shut down\n"); } private: @@ -79,11 +94,14 @@ private: module->_publisher->PublishEvent(event); } catch(std::exception ex) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error publishing event via 0MQ: %s\n", ex.what()); + } catch(...) { // Exceptions must not propogate to C caller + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unknown error publishing event via 0MQ\n"); } } switch_event_node_t *_node; std::auto_ptr _publisher; + bool _running; }; //*****************************// @@ -99,8 +117,8 @@ SWITCH_MODULE_LOAD_FUNCTION(load) { try { module.reset(new ZmqModule(module_interface, pool)); return SWITCH_STATUS_SUCCESS; - } catch(const std::exception &ex) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error loading 0MQ module: %s\n", ex.what()); + } catch(...) { // Exceptions must not propogate to C caller + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error loading 0MQ module\n"); return SWITCH_STATUS_GENERR; } @@ -110,15 +128,23 @@ SWITCH_MODULE_RUNTIME_FUNCTION(runtime) { try { // Begin listening for clients module->Listen(); - } catch(...) { } + } catch(std::exception &ex) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error listening for clients: %s\n", ex.what()); + } catch(...) { // Exceptions must not propogate to C caller + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown error listening for clients\n"); + } // Tell the switch to stop calling this runtime loop - return SWITCH_STATUS_FALSE; + return SWITCH_STATUS_TERM; } SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown) { - // Free the module object - module.reset(); + try { + // Free the module object + module.reset(); + } catch(...) { // Exceptions must not propogate to C caller + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error shutting down module\n"); + } } } From 0629b31c87e967fab08e45272c3b2b8172de8b45 Mon Sep 17 00:00:00 2001 From: Josh Perry Date: Sat, 26 Mar 2011 12:46:00 -0600 Subject: [PATCH 3/4] Added module to modules.conf source file --- build/modules.conf.in | 1 + 1 file changed, 1 insertion(+) diff --git a/build/modules.conf.in b/build/modules.conf.in index 287bd9a540..1147ad4863 100644 --- a/build/modules.conf.in +++ b/build/modules.conf.in @@ -76,6 +76,7 @@ endpoints/mod_loopback #asr_tts/mod_tts_commandline #event_handlers/mod_event_multicast event_handlers/mod_event_socket +#event_handlers/mod_event_zmq event_handlers/mod_cdr_csv event_handlers/mod_cdr_sqlite #event_handlers/mod_cdr_pg_csv From 6ebc52d3556ab5f4465b2934fb5dd4c0a0b7a59b Mon Sep 17 00:00:00 2001 From: Josh Perry Date: Sat, 26 Mar 2011 12:47:47 -0600 Subject: [PATCH 4/4] Added module to modules.conf.xml --- conf/autoload_configs/modules.conf.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/conf/autoload_configs/modules.conf.xml b/conf/autoload_configs/modules.conf.xml index 9a2b1db482..8553a6dba1 100644 --- a/conf/autoload_configs/modules.conf.xml +++ b/conf/autoload_configs/modules.conf.xml @@ -22,6 +22,7 @@ +