949 lines
28 KiB
C
949 lines
28 KiB
C
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
/**
|
|
* @brief CPR layer for interprocess communication
|
|
*
|
|
* The name of this file may be overly broad, rather this file deals
|
|
* with IPC via message queues. A user may create, destroy and
|
|
* associate a thread with a message queue. Once established, messages
|
|
* can be delivered and retrieved.
|
|
*
|
|
* The send/get APIs attempt to reliably deliver messages even when
|
|
* under stress. Two mechanisms have been added to deal with a full
|
|
* message queue. First, the message queue size may be extended to
|
|
* allow more messages to be handled than supported by an OS.
|
|
* Second, if the queue is indeed full a sleep-and-retry
|
|
* method is used to force a context-switch to allow for other threads
|
|
* to run in hope of clearing some messages off of the queue. The
|
|
* latter method is always-on by default. The former method must be
|
|
* enabled by extending the message queue by some size greater than
|
|
* zero (0).
|
|
*
|
|
* @defgroup IPC The Inter Process Communication module
|
|
* @ingroup CPR
|
|
* @brief The module related to IPC abstraction for the pSIPCC
|
|
* @addtogroup MsgQIPCAPIs The Message Queue IPC APIs
|
|
* @ingroup IPC
|
|
* @brief APIs expected by pSIPCC for using message queues
|
|
*
|
|
* @{
|
|
*
|
|
*
|
|
*/
|
|
#include "cpr.h"
|
|
#include "cpr_stdlib.h"
|
|
#include <cpr_stdio.h>
|
|
#include <errno.h>
|
|
#include <sys/msg.h>
|
|
#include <sys/ipc.h>
|
|
#include "plat_api.h"
|
|
#include "CSFLog.h"
|
|
|
|
static const char *logTag = "cpr_linux_ipc";
|
|
|
|
#define STATIC static
|
|
|
|
/* @def The Message Queue depth */
|
|
#define OS_MSGTQL 31
|
|
|
|
/*
|
|
* Internal CPR API
|
|
*/
|
|
extern pthread_t cprGetThreadId(cprThread_t thread);
|
|
|
|
/**
|
|
* @struct cpr_msgq_node_s
|
|
* Extended internal message queue node
|
|
*
|
|
* A double-linked list holding the necessary message information
|
|
*/
|
|
typedef struct cpr_msgq_node_s
|
|
{
|
|
struct cpr_msgq_node_s *next;
|
|
struct cpr_msgq_node_s *prev;
|
|
void *msg;
|
|
void *pUserData;
|
|
} cpr_msgq_node_t;
|
|
|
|
/**
|
|
* @struct cpr_msg_queue_s
|
|
* Msg queue information needed to hide OS differences in implementation.
|
|
* To use msg queues, the application code may pass in a name to the
|
|
* create function for msg queues. CPR does not use this field, it is
|
|
* solely for the convenience of the application and to aid in debugging.
|
|
*
|
|
* Note: Statistics are not protected by a mutex; therefore, there exists
|
|
* the possibility that the results may not be accurate.
|
|
*
|
|
* Note:if the depth supplied by OS is insufficient,a message queue owner may
|
|
* increase the message queue depth via cprCreateMessageQueue's depth
|
|
* parameter where the value can range from MSGTQL to CPR_MAX_MSG_Q_DEPTH.
|
|
*/
|
|
typedef struct cpr_msg_queue_s
|
|
{
|
|
struct cpr_msg_queue_s *next;
|
|
const char *name;
|
|
pthread_t thread;
|
|
int32_t queueId;
|
|
uint16_t maxCount;
|
|
uint16_t currentCount;
|
|
uint32_t totalCount;
|
|
uint32_t sendErrors;
|
|
uint32_t reTries;
|
|
uint32_t highAttempts;
|
|
uint32_t selfQErrors;
|
|
uint16_t extendedQDepth;
|
|
uint16_t maxExtendedQDepth;
|
|
pthread_mutex_t mutex; /* lock for managing extended queue */
|
|
cpr_msgq_node_t *head; /* extended queue head (newest element) */
|
|
cpr_msgq_node_t *tail; /* extended queue tail (oldest element) */
|
|
} cpr_msg_queue_t;
|
|
|
|
/**
|
|
* @enum cpr_msgq_post_result_e
|
|
* A enumeration used to report the result of posting a message to
|
|
* a message queue
|
|
*/
|
|
typedef enum
|
|
{
|
|
CPR_MSGQ_POST_SUCCESS,
|
|
CPR_MSGQ_POST_FAILED,
|
|
CPR_MSGQ_POST_PENDING
|
|
} cpr_msgq_post_result_e;
|
|
|
|
|
|
/*
|
|
* Head of list of message queues
|
|
*/
|
|
static cpr_msg_queue_t *msgQueueList = NULL;
|
|
|
|
/*
|
|
* Mutex to manage message queue list
|
|
*/
|
|
pthread_mutex_t msgQueueListMutex;
|
|
|
|
/*
|
|
* String to represent message queue name when it is not provided
|
|
*/
|
|
static const char unnamed_string[] = "unnamed";
|
|
|
|
|
|
/*
|
|
* CPR_MAX_MSG_Q_DEPTH
|
|
*
|
|
* The maximum queue depth supported by the CPR layer. This value
|
|
* is arbitrary though the purpose is to limit the memory usage
|
|
* by CPR and avoid (nearly) unbounded situations.
|
|
*
|
|
* Note: This value should be greater than MSGTQL which is currently
|
|
* defined as 31
|
|
*/
|
|
#define CPR_MAX_MSG_Q_DEPTH 256
|
|
|
|
/*
|
|
* CPR_SND_TIMEOUT_WAIT_INTERVAL
|
|
*
|
|
* The interval of time to wait in milliseconds between attempts to
|
|
* send a message to the message queue
|
|
*
|
|
* Note: 20 ms. to avoid less than a tick wake up since on most
|
|
* OSes 10ms is one 1 tick
|
|
* this should really be OS_TICK_MS * 2 or OS_TICK_MS + X
|
|
*/
|
|
#define CPR_SND_TIMEOUT_WAIT_INTERVAL 20
|
|
|
|
/*
|
|
* CPR_ATTEMPTS_TO_SEND
|
|
*
|
|
* The number of attempts made to send a message when the message
|
|
* would otherwise be blocked. Note in this condition the thread
|
|
* will sleep the timeout interval to allow the msg queue to be
|
|
* drained.
|
|
*
|
|
* Note: 25 attempts for upto .5 seconds at the interval of
|
|
* CPR_SND_TIMEOUT_WAIT_INTERVAL worst case.
|
|
*/
|
|
#define CPR_ATTEMPTS_TO_SEND 25
|
|
|
|
/*
|
|
* Also, important to note that the total timeout interval must be
|
|
* greater than the SIP's select call timeout value which is 25msec.
|
|
* This is necessary to cover the case where the SIP message queue
|
|
* is full and the select timeout occurs.
|
|
*
|
|
* Total timeout interval = CPR_SND_TIMEOUT_WAIT_INTERVAL *
|
|
* CPR_ATTEMPTS_TO_SEND;
|
|
*/
|
|
|
|
|
|
/*
|
|
* Prototype declarations
|
|
*/
|
|
static cpr_msgq_post_result_e
|
|
cprPostMessage(cpr_msg_queue_t *msgq, void *msg, void **ppUserData);
|
|
static void
|
|
cprPegSendMessageStats(cpr_msg_queue_t *msgq, uint16_t numAttempts);
|
|
static cpr_msgq_post_result_e
|
|
cprPostExtendedQMsg(cpr_msg_queue_t *msgq, void *msg, void **ppUserData);
|
|
static void
|
|
cprMoveMsgToQueue(cpr_msg_queue_t *msgq);
|
|
|
|
/*
|
|
* Functions
|
|
*/
|
|
|
|
/**
|
|
* Creates a message queue
|
|
*
|
|
* @brief The cprCreateMessageQueue function is called to allow the OS to
|
|
* perform whatever work is needed to create a message queue.
|
|
|
|
* If the name is present, CPR should assign this name to the message queue to assist in
|
|
* debugging. The message queue depth is the second input parameter and is for
|
|
* setting the desired queue depth. This parameter may not be supported by all OS.
|
|
* Its primary intention is to set queue depth beyond the default queue depth
|
|
* limitation.
|
|
* On any OS where there is no limit on the message queue depth or
|
|
* its queue depth is sufficiently large then this parameter is ignored on that
|
|
* OS.
|
|
*
|
|
* @param[in] name - name of the message queue (optional)
|
|
* @param[in] depth - the message queue depth, optional field which should
|
|
* default if set to zero(0)
|
|
*
|
|
* @return Msg queue handle or NULL if init failed, errno should be provided
|
|
*
|
|
* @note the actual message queue depth will be bounded by the
|
|
* standard system message queue depth and CPR_MAX_MSG_Q_DEPTH.
|
|
* If 'depth' is outside of the bounds, the value will be
|
|
* reset automatically.
|
|
*/
|
|
cprMsgQueue_t
|
|
cprCreateMessageQueue (const char *name, uint16_t depth)
|
|
{
|
|
static const char fname[] = "cprCreateMessageQueue";
|
|
cpr_msg_queue_t *msgq;
|
|
key_t key;
|
|
static int key_id = 100; /* arbitrary starting number */
|
|
struct msqid_ds buf;
|
|
|
|
msgq =(cpr_msg_queue_t *)cpr_calloc(1, sizeof(cpr_msg_queue_t));
|
|
if (msgq == NULL) {
|
|
CPR_ERROR("%s: Malloc failed: %s\n", fname,
|
|
name ? name : unnamed_string);
|
|
errno = ENOMEM;
|
|
return NULL;
|
|
}
|
|
|
|
msgq->name = name ? name : unnamed_string;
|
|
|
|
/*
|
|
* Find a unique key
|
|
*/
|
|
key = ftok("/proc/self", key_id++);
|
|
CSFLogDebug(logTag, "key = %x\n", key);
|
|
|
|
if (key == -1) {
|
|
CPR_ERROR("%s: Key generation failed: %d\n", fname, errno);
|
|
cpr_free(msgq);
|
|
return NULL;
|
|
}
|
|
|
|
/*
|
|
* Set creation flag so that OS will create the message queue
|
|
*/
|
|
msgq->queueId = msgget(key, (IPC_EXCL | IPC_CREAT | 0666));
|
|
if (msgq->queueId == -1) {
|
|
if (errno == EEXIST) {
|
|
CSFLogDebug(logTag, "Q exists so first remove it and then create again\n");
|
|
/* Remove message queue */
|
|
msgq->queueId = msgget(key, (IPC_CREAT | 0666));
|
|
if (msgctl(msgq->queueId, IPC_RMID, &buf) == -1) {
|
|
|
|
CPR_ERROR("%s: Destruction failed: %s: %d\n", fname,
|
|
msgq->name, errno);
|
|
|
|
return NULL;
|
|
}
|
|
msgq->queueId = msgget(key, (IPC_CREAT | 0666));
|
|
}
|
|
} else {
|
|
CSFLogDebug(logTag, "there was no preexisting q..\n");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (msgq->queueId == -1) {
|
|
CPR_ERROR("%s: Creation failed: %s: %d\n", fname, name, errno);
|
|
if (errno == EEXIST) {
|
|
|
|
}
|
|
|
|
cpr_free(msgq);
|
|
return NULL;
|
|
}
|
|
CSFLogDebug(logTag, "create message q with id=%x\n", msgq->queueId);
|
|
|
|
/* flush the q before ?? */
|
|
|
|
/*
|
|
* Create mutex for extended (overflow) queue
|
|
*/
|
|
if (pthread_mutex_init(&msgq->mutex, NULL) != 0) {
|
|
CPR_ERROR("%s: Failed to create msg queue (%s) mutex: %d\n",
|
|
fname, name, errno);
|
|
(void) msgctl(msgq->queueId, IPC_RMID, &buf);
|
|
cpr_free(msgq);
|
|
return NULL;
|
|
}
|
|
|
|
/*
|
|
* Set the extended message queue depth (within bounds)
|
|
*/
|
|
if (depth > CPR_MAX_MSG_Q_DEPTH) {
|
|
CPR_INFO("%s: Depth too large (%d) reset to %d\n", fname, depth,
|
|
CPR_MAX_MSG_Q_DEPTH);
|
|
depth = CPR_MAX_MSG_Q_DEPTH;
|
|
}
|
|
|
|
if (depth < OS_MSGTQL) {
|
|
if (depth) {
|
|
CPR_INFO("%s: Depth too small (%d) reset to %d\n", fname, depth, OS_MSGTQL);
|
|
}
|
|
depth = OS_MSGTQL;
|
|
}
|
|
msgq->maxExtendedQDepth = depth - OS_MSGTQL;
|
|
|
|
/*
|
|
* Add message queue to list for statistics reporting
|
|
*/
|
|
pthread_mutex_lock(&msgQueueListMutex);
|
|
msgq->next = msgQueueList;
|
|
msgQueueList = msgq;
|
|
pthread_mutex_unlock(&msgQueueListMutex);
|
|
|
|
return msgq;
|
|
}
|
|
|
|
|
|
/**
|
|
* cprDestroyMessageQueue
|
|
* @brief Removes all messages from the queue and then destroy the message queue
|
|
*
|
|
* The cprDestroyMessageQueue function is called to destroy a message queue. The
|
|
* function drains any messages from the queue and the frees the
|
|
* message queue. Any messages on the queue are to be deleted, and not sent to the intended
|
|
* recipient. It is the application's responsibility to ensure that no threads are
|
|
* blocked on a message queue when it is destroyed.
|
|
*
|
|
* @param[in] msgQueue - message queue to destroy
|
|
*
|
|
* @return CPR_SUCCESS or CPR_FAILURE, errno should be provided in this case
|
|
*/
|
|
cprRC_t
|
|
cprDestroyMessageQueue (cprMsgQueue_t msgQueue)
|
|
{
|
|
static const char fname[] = "cprDestroyMessageQueue";
|
|
cpr_msg_queue_t *msgq;
|
|
void *msg;
|
|
struct msqid_ds buf;
|
|
CSFLogDebug(logTag, "Destroy message Q called..\n");
|
|
|
|
|
|
msgq = (cpr_msg_queue_t *) msgQueue;
|
|
if (msgq == NULL) {
|
|
/* Bad application! */
|
|
CPR_ERROR("%s: Invalid input\n", fname);
|
|
errno = EINVAL;
|
|
return CPR_FAILURE;
|
|
}
|
|
|
|
/* Drain message queue */
|
|
msg = cprGetMessage(msgQueue, FALSE, NULL);
|
|
while (msg != NULL) {
|
|
cpr_free(msg);
|
|
msg = cprGetMessage(msgQueue, FALSE, NULL);
|
|
}
|
|
|
|
/* Remove message queue from list */
|
|
pthread_mutex_lock(&msgQueueListMutex);
|
|
if (msgq == msgQueueList) {
|
|
msgQueueList = msgq->next;
|
|
} else {
|
|
cpr_msg_queue_t *msgql = msgQueueList;
|
|
|
|
while ((msgql->next != NULL) && (msgql->next != msgq)) {
|
|
msgql = msgql->next;
|
|
}
|
|
if (msgql->next == msgq) {
|
|
msgql->next = msgq->next;
|
|
}
|
|
}
|
|
pthread_mutex_unlock(&msgQueueListMutex);
|
|
|
|
/* Remove message queue */
|
|
if (msgctl(msgq->queueId, IPC_RMID, &buf) == -1) {
|
|
CPR_ERROR("%s: Destruction failed: %s: %d\n", fname,
|
|
msgq->name, errno);
|
|
return CPR_FAILURE;
|
|
}
|
|
|
|
/* Remove message queue mutex */
|
|
if (pthread_mutex_destroy(&msgq->mutex) != 0) {
|
|
CPR_ERROR("%s: Failed to destroy msg queue (%s) mutex: %d\n",
|
|
fname, msgq->name, errno);
|
|
}
|
|
|
|
cpr_free(msgq);
|
|
return CPR_SUCCESS;
|
|
}
|
|
|
|
|
|
/**
|
|
* cprSetMessageQueueThread
|
|
* @brief Associate a thread with the message queue
|
|
*
|
|
* This method is used by pSIPCC to associate a thread and a message queue.
|
|
* @param[in] msgQueue - msg queue to set
|
|
* @param[in] thread - CPR thread to associate with queue
|
|
*
|
|
* @return CPR_SUCCESS or CPR_FAILURE
|
|
*
|
|
* @note Nothing is done to prevent overwriting the thread ID
|
|
* when the value has already been set.
|
|
*/
|
|
cprRC_t
|
|
cprSetMessageQueueThread (cprMsgQueue_t msgQueue, cprThread_t thread)
|
|
{
|
|
static const char fname[] = "cprSetMessageQueueThread";
|
|
cpr_msg_queue_t *msgq;
|
|
|
|
if ((!msgQueue) || (!thread)) {
|
|
CPR_ERROR("%s: Invalid input\n", fname);
|
|
return CPR_FAILURE;
|
|
}
|
|
|
|
msgq = (cpr_msg_queue_t *) msgQueue;
|
|
if (msgq->thread != 0) {
|
|
CPR_ERROR("%s: over-writing previously msgq thread name for %s",
|
|
fname, msgq->name);
|
|
}
|
|
|
|
msgq->thread = cprGetThreadId(thread);
|
|
return CPR_SUCCESS;
|
|
}
|
|
|
|
/**
|
|
* cprGetMessage
|
|
* @brief Retrieve a message from a particular message queue
|
|
*
|
|
* The cprGetMessage function retrieves the first message from the message queue
|
|
* specified and returns a void pointer to that message.
|
|
*
|
|
* @param[in] msgQueue - msg queue from which to retrieve the message. This
|
|
* is the handle returned from cprCreateMessageQueue.
|
|
* @param[in] waitForever - boolean to either wait forever (TRUE) or not
|
|
* wait at all (FALSE) if the msg queue is empty.
|
|
* @param[out] ppUserData - pointer to a pointer to user defined data. This
|
|
* will be NULL if no user data was present.
|
|
*
|
|
* @return Retrieved message buffer or NULL if failure occurred or
|
|
* the waitForever flag was set to false and no messages were
|
|
* on the queue.
|
|
*
|
|
* @note If ppUserData is defined, the value will be initialized to NULL
|
|
*/
|
|
void *
|
|
cprGetMessage (cprMsgQueue_t msgQueue, boolean waitForever, void **ppUserData)
|
|
{
|
|
static const char fname[] = "cprGetMessage";
|
|
struct msgbuffer rcvBuffer = { 0 };
|
|
struct msgbuffer *rcvMsg = &rcvBuffer;
|
|
void *buffer;
|
|
int msgrcvflags;
|
|
cpr_msg_queue_t *msgq;
|
|
|
|
/* Initialize ppUserData */
|
|
if (ppUserData) {
|
|
*ppUserData = NULL;
|
|
}
|
|
|
|
msgq = (cpr_msg_queue_t *) msgQueue;
|
|
if (msgq == NULL) {
|
|
/* Bad application! */
|
|
CPR_ERROR("%s: Invalid input\n", fname);
|
|
errno = EINVAL;
|
|
return NULL;
|
|
}
|
|
|
|
/*
|
|
* If waitForever is set, block on the message queue
|
|
* until a message is received.
|
|
*/
|
|
if (waitForever) {
|
|
msgrcvflags = 0;
|
|
} else {
|
|
msgrcvflags = IPC_NOWAIT;
|
|
}
|
|
|
|
if (msgrcv(msgq->queueId, rcvMsg,
|
|
sizeof(struct msgbuffer) - offsetof(struct msgbuffer, msgPtr),
|
|
0, msgrcvflags) == -1) {
|
|
if (!waitForever && errno == ENOMSG) {
|
|
CPR_INFO("%s: no message on queue %s (non-blocking receive "
|
|
" operation), returning\n", fname, msgq->name);
|
|
} else {
|
|
CPR_ERROR("%s: msgrcv for queue %s failed: %d\n",
|
|
fname, msgq->name, errno);
|
|
}
|
|
return NULL;
|
|
}
|
|
CPR_INFO("%s: msgrcv success for queue %s \n",fname, msgq->name);
|
|
|
|
(void) pthread_mutex_lock(&msgq->mutex);
|
|
/* Update statistics */
|
|
msgq->currentCount--;
|
|
(void) pthread_mutex_unlock(&msgq->mutex);
|
|
|
|
/*
|
|
* Pull out the data
|
|
*/
|
|
if (ppUserData) {
|
|
*ppUserData = rcvMsg->usrPtr;
|
|
}
|
|
buffer = rcvMsg->msgPtr;
|
|
|
|
/*
|
|
* If there are messages on the extended queue, attempt to
|
|
* push a message back onto the real system queue
|
|
*/
|
|
if (msgq->extendedQDepth) {
|
|
cprMoveMsgToQueue(msgq);
|
|
}
|
|
|
|
return buffer;
|
|
}
|
|
|
|
|
|
/**
|
|
* cprSendMessage
|
|
* @brief Place a message on a particular queue. Note that caller may
|
|
* block (see comments below)
|
|
*
|
|
* @param[in] msgQueue - msg queue on which to place the message
|
|
* @param[in] msg - pointer to the msg to place on the queue
|
|
* @param[in] ppUserData - pointer to a pointer to user defined data
|
|
*
|
|
* @return CPR_SUCCESS or CPR_FAILURE, errno should be provided
|
|
*
|
|
* @note 1. Messages queues are set to be non-blocking, those cases
|
|
* where the system call fails with a would-block error code
|
|
* (EAGAIN) the function will attempt other mechanisms described
|
|
* below.
|
|
* @note 2. If enabled with an extended message queue, either via a
|
|
* call to cprCreateMessageQueue with depth value or a call to
|
|
* cprSetExtendMessageQueueDepth() (when unit testing), the message
|
|
* will be added to the extended message queue and the call will
|
|
* return successfully. When room becomes available on the
|
|
* system's message queue, those messages will be added.
|
|
* @note 3. If the message queue becomes full and no space is availabe
|
|
* on the extended message queue, then the function will attempt
|
|
* to resend the message up to CPR_ATTEMPTS_TO_SEND and the
|
|
* calling thread will *BLOCK* CPR_SND_TIMEOUT_WAIT_INTERVAL
|
|
* milliseconds after each failed attempt. If unsuccessful
|
|
* after all attempts then EGAIN error code is returned.
|
|
* @note 4. This applies to all CPR threads, including the timer thread.
|
|
* So it is possible that the timer thread would be forced to
|
|
* sleep which would have the effect of delaying all active
|
|
* timers. The work to fix this rare situation is not considered
|
|
* worth the effort to fix....so just leaving as is.
|
|
*/
|
|
cprRC_t
|
|
cprSendMessage (cprMsgQueue_t msgQueue, void *msg, void **ppUserData)
|
|
{
|
|
static const char fname[] = "cprSendMessage";
|
|
static const char error_str[] = "%s: Msg not sent to %s queue: %s\n";
|
|
cpr_msgq_post_result_e rc;
|
|
cpr_msg_queue_t *msgq;
|
|
int16_t attemptsToSend = CPR_ATTEMPTS_TO_SEND;
|
|
uint16_t numAttempts = 0;
|
|
|
|
/* Bad application? */
|
|
if (msgQueue == NULL) {
|
|
CPR_ERROR(error_str, fname, "undefined", "invalid input");
|
|
errno = EINVAL;
|
|
return CPR_FAILURE;
|
|
}
|
|
|
|
msgq = (cpr_msg_queue_t *) msgQueue;
|
|
|
|
/*
|
|
* Attempt to send message
|
|
*/
|
|
do {
|
|
(void) pthread_mutex_lock(&msgq->mutex);
|
|
|
|
/*
|
|
* If in a queue overflow condition, post message to the
|
|
* extended queue; otherwise, post to normal message queue
|
|
*/
|
|
if (msgq->extendedQDepth) {
|
|
/*
|
|
* Check if extended queue is full, if not then
|
|
* attempt to add the message.
|
|
*/
|
|
if (msgq->extendedQDepth < msgq->maxExtendedQDepth) {
|
|
rc = cprPostExtendedQMsg(msgq, msg, ppUserData);
|
|
// do under lock to avoid races
|
|
if (rc == CPR_MSGQ_POST_SUCCESS) {
|
|
cprPegSendMessageStats(msgq, numAttempts);
|
|
} else {
|
|
msgq->sendErrors++;
|
|
}
|
|
(void) pthread_mutex_unlock(&msgq->mutex);
|
|
|
|
if (rc == CPR_MSGQ_POST_SUCCESS) {
|
|
return CPR_SUCCESS;
|
|
}
|
|
else
|
|
{
|
|
CPR_ERROR(error_str, fname, msgq->name, "no memory");
|
|
return CPR_FAILURE;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Even the extended message queue is full, so
|
|
* release the message queue mutex and use the
|
|
* re-try procedure.
|
|
*/
|
|
(void) pthread_mutex_unlock(&msgq->mutex);
|
|
|
|
/*
|
|
* If attempting to post to the calling thread's
|
|
* own message queue, the re-try procedure will
|
|
* not work. No options left...fail with an error.
|
|
*/
|
|
if (pthread_self() == msgq->thread) {
|
|
msgq->selfQErrors++;
|
|
msgq->sendErrors++;
|
|
CPR_ERROR(error_str, fname, msgq->name, "FULL");
|
|
return CPR_FAILURE;
|
|
}
|
|
} else {
|
|
/*
|
|
* Normal posting of message
|
|
*/
|
|
rc = cprPostMessage(msgq, msg, ppUserData);
|
|
|
|
/*
|
|
* Before releasing the mutex, check if the
|
|
* return code is 'pending' which means the
|
|
* system message queue is full
|
|
*/
|
|
if (rc == CPR_MSGQ_POST_PENDING) {
|
|
/*
|
|
* If the message queue has enabled the extended queue
|
|
* support, then attempt to add to the extended queue.
|
|
*/
|
|
if (msgq->maxExtendedQDepth) {
|
|
rc = cprPostExtendedQMsg(msgq, msg, ppUserData);
|
|
}
|
|
}
|
|
|
|
(void) pthread_mutex_unlock(&msgq->mutex);
|
|
|
|
if (rc == CPR_MSGQ_POST_SUCCESS) {
|
|
cprPegSendMessageStats(msgq, numAttempts);
|
|
return CPR_SUCCESS;
|
|
} else if (rc == CPR_MSGQ_POST_FAILED) {
|
|
CPR_ERROR("%s: Msg not sent to %s queue: %d\n",
|
|
fname, msgq->name, errno);
|
|
msgq->sendErrors++;
|
|
/*
|
|
* If posting to calling thread's own queue,
|
|
* then peg the self queue error.
|
|
*/
|
|
if (pthread_self() == msgq->thread) {
|
|
msgq->selfQErrors++;
|
|
}
|
|
|
|
return CPR_FAILURE;
|
|
}
|
|
/*
|
|
* Else pending due to a full message queue
|
|
* and the extended queue has not been enabled,
|
|
* so just use the re-try attempts.
|
|
*/
|
|
}
|
|
|
|
/*
|
|
* Did not succeed in sending the message, so continue
|
|
* to attempt up to the CPR_ATTEMPTS_TO_SEND.
|
|
*/
|
|
attemptsToSend--;
|
|
if (attemptsToSend > 0) {
|
|
/*
|
|
* Force a context-switch of the thread attempting to
|
|
* send the message, in order to help the case where
|
|
* the msg queue is full and the owning thread may get
|
|
* a a chance be scheduled so it can drain it (Note:
|
|
* no guarantees, more of a "last-ditch effort" to
|
|
* recover...especially when temporarily over-whelmed).
|
|
*/
|
|
cprSleep(CPR_SND_TIMEOUT_WAIT_INTERVAL);
|
|
msgq->reTries++;
|
|
numAttempts++;
|
|
}
|
|
} while (attemptsToSend > 0);
|
|
|
|
CPR_ERROR(error_str, fname, msgq->name, "FULL");
|
|
msgq->sendErrors++;
|
|
return CPR_FAILURE;
|
|
}
|
|
|
|
/**
|
|
* @}
|
|
* @addtogroup MsgQIPCHelper Internal Helper functions for MsgQ
|
|
* @ingroup IPC
|
|
* @brief Helper functions used by CPR to implement the Message Queue IPC APIs
|
|
* @{
|
|
*/
|
|
|
|
/**
|
|
* cprPegSendMessageStats
|
|
* @brief Peg the statistics for successfully posting a message
|
|
*
|
|
* @param[in] msgq - message queue
|
|
* @param[in] numAttempts - number of attempts to post message to message queue
|
|
*
|
|
* @return none
|
|
*
|
|
* @pre (msgq != NULL)
|
|
*/
|
|
static void
|
|
cprPegSendMessageStats (cpr_msg_queue_t *msgq, uint16_t numAttempts)
|
|
{
|
|
/*
|
|
* Collect statistics
|
|
*/
|
|
msgq->totalCount++;
|
|
if (msgq->currentCount > msgq->maxCount) {
|
|
msgq->maxCount = msgq->currentCount;
|
|
}
|
|
|
|
if (numAttempts > msgq->highAttempts) {
|
|
msgq->highAttempts = numAttempts;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* cprPostMessage
|
|
* @brief Post message to system message queue
|
|
*
|
|
* @param[in] msgq - message queue
|
|
* @param[in] msg - message to post
|
|
* @param[in] ppUserData - ptr to ptr to option user data
|
|
*
|
|
* @return the post result which is CPR_MSGQ_POST_SUCCESS,
|
|
* CPR_MSGQ_POST_FAILURE or CPR_MSGQ_POST_PENDING
|
|
*
|
|
* @pre (msgq != NULL)
|
|
* @pre (msg != NULL)
|
|
*/
|
|
static cpr_msgq_post_result_e
|
|
cprPostMessage (cpr_msg_queue_t *msgq, void *msg, void **ppUserData)
|
|
{
|
|
struct msgbuffer mbuf;
|
|
|
|
/*
|
|
* Put msg user wants to send into a CNU msg buffer
|
|
* Copy the address of the msg buffer into the mtext
|
|
* portion of the message.
|
|
*/
|
|
mbuf.mtype = CPR_IPC_MSG;
|
|
mbuf.msgPtr = msg;
|
|
|
|
if (ppUserData != NULL) {
|
|
mbuf.usrPtr = *ppUserData;
|
|
} else {
|
|
mbuf.usrPtr = NULL;
|
|
}
|
|
|
|
/*
|
|
* Send message buffer
|
|
*/
|
|
if (msgsnd(msgq->queueId, &mbuf,
|
|
sizeof(struct msgbuffer) - offsetof(struct msgbuffer, msgPtr),
|
|
IPC_NOWAIT) != -1) {
|
|
msgq->currentCount++;
|
|
return CPR_MSGQ_POST_SUCCESS;
|
|
}
|
|
|
|
/*
|
|
* If msgsnd system call would block, handle separately;
|
|
* otherwise a real system error.
|
|
*/
|
|
if (errno == EAGAIN) {
|
|
return CPR_MSGQ_POST_PENDING;
|
|
}
|
|
|
|
return CPR_MSGQ_POST_FAILED;
|
|
}
|
|
|
|
/**
|
|
* cprPostExtendedQMsg
|
|
* @brief Post message to internal extended message queue
|
|
*
|
|
* @param[in] msgq - message queue
|
|
* @param[in] msg - message to post
|
|
* @param[in] ppUserData - ptr to ptr to option user data
|
|
*
|
|
* @return the post result which is CPR_MSGQ_POST_SUCCESS or
|
|
* CPR_MSGQ_POST_FAILURE if no memory available
|
|
*
|
|
* @pre (msgq != NULL)
|
|
* @pre (msg != NULL)
|
|
* @pre (msgq->mutex has been locked)
|
|
* @pre (msgq->extendedQDepth < msgq->maxExtendedQDepth)
|
|
*
|
|
* @todo Could use cpr_chunk_malloc to pre-allocate all of the nodes
|
|
* but that does have the consequence of allocating memory that
|
|
* may not be necessary
|
|
*/
|
|
static cpr_msgq_post_result_e
|
|
cprPostExtendedQMsg (cpr_msg_queue_t *msgq, void *msg, void **ppUserData)
|
|
{
|
|
cpr_msgq_node_t *node;
|
|
|
|
/*
|
|
* Allocate new message queue node
|
|
*/
|
|
node = cpr_malloc(sizeof(*node));
|
|
if (!node) {
|
|
errno = ENOMEM;
|
|
return CPR_MSGQ_POST_FAILED;
|
|
}
|
|
|
|
/*
|
|
* Fill in data
|
|
*/
|
|
node->msg = msg;
|
|
if (ppUserData != NULL) {
|
|
node->pUserData = *ppUserData;
|
|
} else {
|
|
node->pUserData = NULL;
|
|
}
|
|
|
|
/*
|
|
* Push onto list
|
|
*/
|
|
node->prev = NULL;
|
|
node->next = msgq->head;
|
|
msgq->head = node;
|
|
|
|
if (node->next) {
|
|
node->next->prev = node;
|
|
}
|
|
|
|
if (msgq->tail == NULL) {
|
|
msgq->tail = node;
|
|
}
|
|
msgq->extendedQDepth++;
|
|
msgq->currentCount++;
|
|
|
|
return CPR_MSGQ_POST_SUCCESS;
|
|
}
|
|
|
|
|
|
/**
|
|
* cprMoveMsgToQueue
|
|
* @brief Move message from extended internal queue to system message queue
|
|
*
|
|
* @param[in] msgq - the message queue
|
|
*
|
|
* @return none
|
|
*
|
|
* @pre (msgq != NULL)
|
|
* @pre (msgq->extendedQDepth > 0)
|
|
*/
|
|
static void
|
|
cprMoveMsgToQueue (cpr_msg_queue_t *msgq)
|
|
{
|
|
static const char *fname = "cprMoveMsgToQueue";
|
|
cpr_msgq_post_result_e rc;
|
|
cpr_msgq_node_t *node;
|
|
|
|
(void) pthread_mutex_lock(&msgq->mutex);
|
|
|
|
if (!msgq->tail) {
|
|
/* the linked list is bad...ignore it */
|
|
CPR_ERROR("%s: MsgQ (%s) list is corrupt", fname, msgq->name);
|
|
(void) pthread_mutex_unlock(&msgq->mutex);
|
|
return;
|
|
}
|
|
|
|
node = msgq->tail;
|
|
|
|
rc = cprPostMessage(msgq, node->msg, &node->pUserData);
|
|
if (rc == CPR_MSGQ_POST_SUCCESS) {
|
|
/*
|
|
* Remove node from extended list
|
|
*/
|
|
msgq->tail = node->prev;
|
|
if (msgq->tail) {
|
|
msgq->tail->next = NULL;
|
|
}
|
|
if (msgq->head == node) {
|
|
msgq->head = NULL;
|
|
}
|
|
msgq->extendedQDepth--;
|
|
/*
|
|
* Fix increase in the current count which was incremented
|
|
* in cprPostMessage but not really an addition.
|
|
*/
|
|
msgq->currentCount--;
|
|
}
|
|
|
|
(void) pthread_mutex_unlock(&msgq->mutex);
|
|
|
|
if (rc == CPR_MSGQ_POST_SUCCESS) {
|
|
cpr_free(node);
|
|
} else {
|
|
CPR_ERROR("%s: Failed to repost msg on %s queue: %d\n",
|
|
fname, msgq->name, errno);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @}
|
|
*
|
|
* @addtogroup MsgQIPCAPIs The Message Queue IPC APIs
|
|
* @{
|
|
*/
|
|
|
|
/**
|
|
* cprGetDepth
|
|
*
|
|
* @brief get depth of a message queue
|
|
*
|
|
* The pSIPCC uses this API to look at the depth of a message queue for internal
|
|
* routing and throttling decision
|
|
*
|
|
* @param[in] msgQueue - message queue
|
|
*
|
|
* @return depth of msgQueue
|
|
*
|
|
* @pre (msgQueue != NULL)
|
|
*/
|
|
uint16_t cprGetDepth (cprMsgQueue_t msgQueue)
|
|
{
|
|
cpr_msg_queue_t *msgq;
|
|
msgq = (cpr_msg_queue_t *) msgQueue;
|
|
return msgq->currentCount;
|
|
}
|
|
|