/* * This file is part of the Sofia-SIP package * * Copyright (C) 2005 Nokia Corporation. * * Contact: Pekka Pessi * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * as published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA * */ /**@ingroup su_wait * @CFILE su_root.c * OS-independent synchronization interface. * @internal * * @author Pekka Pessi * * @date Created: Tue Sep 14 15:51:04 1999 ppessi */ #include "config.h" #include #include #include #include #include #include "sofia-sip/su.h" #if SU_HAVE_PTHREADS #include #endif struct su_root_s; #define SU_ROOT_MAGIC_T struct su_root_magic_s #define SU_WAKEUP_ARG_T struct su_wakeup_arg_s #define SU_TIMER_ARG_T struct su_timer_arg_s #include "su_port.h" #include "sofia-sip/su_alloc.h" /**@ingroup su_wait * * @page su_root_t Tasks and root objects * * A task is the basic execution unit for the Sofia event-driven programming * model. According to the model, the program can ask that the event loop * invokes a callback function when a certain event occurs. Such events * include @ref su_root_register "I/O activity", @ref su_timer_t "timers" or * a @ref su_msg_t "message" from other task. The event loop is run with * function su_root_run() or su_root_step(). * * Root object gives access to the task control. The root object represents * the task to the code running within task. Through the root, the task code * can access its context object (magic) and thread-synchronization features * like wait objects, timers, and messages. * * When a message is sent between tasks, a task reference #su_task_r is used * to reprent the task address. Reference counting is used to make sure that * the task references stay valid. * * The public API contains following functions: * - su_root_create() [Do not call from cloned task] * - su_root_destroy() [Do not call from cloned task] * - su_root_magic() * - su_root_register() * - su_root_deregister() * - su_root_unregister() * - su_root_threading() * - su_root_run() [Do not call from cloned task] * - su_root_break() [Do not call from cloned task] * - su_root_step() [Do not call from cloned task] * - su_root_task() * * New tasks can be created via su_clone_start() function. */ /**@ingroup su_wait * * @page su_root_register Registering Wait Objects * * When application expects I/O events, it can create a wait object and * register it, a callback function and a context pointer to the #su_root_t * object using the su_root_register() function. Whenever the wait object * receives an event, the registered @link ::su_wakeup_f callback function * @endlink is invoked. * * When successful, the su_root_register() returns an small non-negative * integer representing the registration. The registration can be * manipulated with su_root_eventmask() function, for instance, when sending * through a socket block, the application can add SU_WAIT_OUT event to the * mask. * * The registration can be removed using su_root_deregister() function. */ /**@ingroup su_wait * * Contains hint of number of sockets supported by su_root_t */ int su_root_size_hint = 64; /* ========================================================================= * Tasks */ su_task_r const su_task_null = SU_TASK_R_INIT; #define SU_TASK_ZAP(t, f) \ while (t->sut_port) { \ su_port_decref(t->sut_port, #f); t->sut_port = NULL; break; } #define SU_TASK_ZAPP(t, f) \ do { if (t->sut_port) { \ su_port_decref(t->sut_port, #f); t->sut_port = NULL; } \ t->sut_root = NULL; } while(0) /** * Initialize a task handle with su_task_null. * * @param task task handle * * @return A reference to the initialized task handle. */ _su_task_r su_task_init(su_task_r task) { assert(task); memset(task, 0, sizeof(task)); return task; } /** * Destroy a task handle * * @param task task handle */ void su_task_deinit(su_task_r task) { assert(task); SU_TASK_ZAP(task, su_task_deinit); task->sut_root = NULL; } /** * Create a new task handle. * * @param task task reference * @param root pointer to root object * @param port pointer to port object * * @return New task handle. */ _su_task_r su_task_new(su_task_r task, su_root_t *root, su_port_t *port) { assert(task); task->sut_root = root; if ((task->sut_port = port)) { su_port_incref(port, "su_task_new"); } return task; } /** * Duplicates a task handle. * * @param dst destination task reference * @param src source task reference */ void su_task_copy(su_task_r dst, su_task_r const src) { su_port_t *port; assert(src); assert(dst); SU_TASK_ZAP(dst, su_task_copy); port = src->sut_port; if (port) { su_port_incref(port, "su_task_copy"); } dst[0] = src[0]; } #define SU_TASK_COPY(d, s, by) (void)((d)[0]=(s)[0], \ (s)->sut_port?(void)su_port_incref(s->sut_port, #by):(void)0) /** * Moves a task handle. * * @param dst destination task reference * @param src source task reference */ void su_task_move(su_task_r dst, su_task_r src) { SU_TASK_ZAP(dst, su_task_move); dst[0] = src[0]; src->sut_port = 0; src->sut_root = 0; } /** * Compare two tasks with each other. * * @param a First task * @param b Second task * * @retval negative number, if a < b * @retval positive number, if a > b * @retval 0, if a == b. */ int su_task_cmp(su_task_r const a, su_task_r const b) { intptr_t retval = (char *)a->sut_port - (char *)b->sut_port; if (retval == 0) retval = (char *)a->sut_root - (char *)b->sut_root; if (sizeof(retval) != sizeof(int)) { if (retval < 0) retval = -1; else if (retval > 0) retval = 1; } return (int)retval; } /** * Tests if a task is running. * * @param task task handle * * @retval true (nonzero) if task is not stopped, * @retval zero if it is null or stopped. */ int su_task_is_running(su_task_r const task) { return task && task->sut_port && task->sut_root; } /** @internal * Attach a root object to the task handle. * * @param self task handle * @param root pointer to the root object * * @retval 0 if successful, * @retval -1 otherwise. */ int su_task_attach(su_task_r self, su_root_t *root) { if (self->sut_port) { self->sut_root = root; return 0; } else return -1; } /** * Get root pointer attached to a task handle. * * @param self task handle * * @return * A pointer to root object attached to the task handle, or NULL if no root * object has been attached. */ su_root_t *su_task_root(su_task_r const self) { if (self->sut_port) return self->sut_root; else return NULL; } #if 0 /** @internal * Detach a root pointer from task handle. * @bug Not used anymore. */ int su_task_detach(su_task_r self) { self->sut_root = NULL; return 0; } #endif /** * Return the timer list associated with given task. * * @param task task handle * * @return A timer list of the task. If there are no timers, it returns * NULL. */ su_timer_queue_t *su_task_timers(su_task_r const task) { return task ? su_port_timers(task->sut_port) : NULL; } /** Execute the @a function by @a task thread. * * @retval 0 if successful * @retval -1 upon an error */ int su_task_execute(su_task_r const task, int (*function)(void *), void *arg, int *return_value) { if (function == NULL) return (errno = EFAULT), -1; if (!su_port_own_thread(task->sut_port)) { return su_port_execute(task, function, arg, return_value); } else { int value = function(arg); if (return_value) *return_value = value; return 0; } } _su_task_r su_task_new(su_task_r task, su_root_t *root, su_port_t *port); int su_task_attach(su_task_r self, su_root_t *root); int su_task_detach(su_task_r self); int su_timer_reset_all(su_timer_t **t0, su_task_r); /* Note that is *not* necessary same as su_root_t, * as su_root_t can be extended */ #define sur_port sur_task->sut_port #define sur_root sur_task->sut_root #define SU_ROOT_OWN_THREAD(r) (su_port_own_thread(r->sur_port)) /** Create a reactor object. * * Allocate and initialize the instance of su_root_t. * * @param magic pointer to user data * * @return A pointer to allocated su_root_t instance, NULL on error. */ su_root_t *su_root_create(su_root_magic_t *magic) { return su_root_create_with_port(magic, su_port_create()); } /**@internal * * Create a reactor object using given message port. * * Allocate and initialize the instance of su_root_t. Note that this * function always uses a reference to su_port_t, even when creating the * root fails. * * @param magic pointer to user data * @param port pointer to a message port * * @return A pointer to allocated su_root_t instance, NULL on error. */ su_root_t *su_root_create_with_port(su_root_magic_t *magic, su_port_t *port) { su_root_t *self; if (!port) return NULL; self = su_salloc(su_port_home(port), sizeof(struct su_root_s)); if (self) { self->sur_magic = magic; #if SU_HAVE_PTHREADS self->sur_threading = SU_HAVE_PTHREADS; #else self->sur_threading = 0; #endif /* This one creates a new reference to port */ su_task_new(self->sur_task, self, port); /* ... so we zap the old one below */ } su_port_decref(port, "su_root_create_with_port"); return self; } /** Destroy a root object. * * Stop and free an instance of su_root_t * * @param self pointer to a root object. */ void su_root_destroy(su_root_t *self) { su_port_t *port; int unregistered, reset; if (!self) return; assert(SU_ROOT_OWN_THREAD(self)); self->sur_deiniting = 1; if (self->sur_deinit) { su_root_deinit_f deinit = self->sur_deinit; su_root_magic_t *magic = self->sur_magic; self->sur_deinit = NULL; deinit(self, magic); } port = self->sur_port; assert(port); unregistered = su_port_unregister_all(port, self); reset = su_timer_reset_all(su_task_timers(self->sur_task), self->sur_task); if (unregistered || reset) SU_DEBUG_1(("su_root_destroy: " "%u registered waits, %u timers\n", unregistered, reset)); SU_TASK_ZAP(self->sur_parent, su_root_destroy); su_free(su_port_home(port), self); su_port_decref(port, "su_root_destroy"); } /** Get instance name. * * @param self pointer to a root object * * @return Instance name (e.g., "epoll", "devpoll", "select"). * * @NEW_1_12_6 */ char const *su_root_name(su_root_t *self) { if (self && self->sur_task->sut_port) return su_port_name(self->sur_task->sut_port); else return NULL; } /** Set the context pointer. * * Set the context pointer (magic) of a root object. * * @param self pointer to a root object * @param magic pointer to user data * * @retval 0 when successful, * @retval -1 upon error. */ int su_root_set_magic(su_root_t *self, su_root_magic_t *magic) { assert(SU_ROOT_OWN_THREAD(self)); if (self) { self->sur_magic = magic; } return 0; } /** Set threading option. * * Controls whether su_clone_start() creates a new thread. * * @param self pointer to a root object * @param enable if true, enable threading, if false, disable threading * * @return True if threading is enabled. */ int su_root_threading(su_root_t *self, int enable) { if (self) { assert(SU_ROOT_OWN_THREAD(self)); #if SU_HAVE_PTHREADS self->sur_threading = enable = enable != 0; return enable; #endif } return 0; } /** Get context pointer. * * The function su_root_magic() returns the user context pointer that was * given input to su_root_create() or su_root_set_magic(). * * @param self pointer to a root object * * @return A pointer to user data */ su_root_magic_t *su_root_magic(su_root_t *self) { return self ? self->sur_magic : NULL; } /** Get a GSource */ struct _GSource *su_root_gsource(su_root_t *self) { return self ? su_port_gsource(self->sur_port) : NULL; } /** Register a su_wait_t object. * * The function su_root_register() registers a su_wait_t object. The wait * object, a callback function and a argument are stored to the root * object. The callback function is called, when the wait object is * signaled. * * Please note if identical wait objects are inserted, only first one is * ever signalled. * * @param self pointer to root object * @param wait pointer to wait object * @param callback callback function pointer * @param arg argument given to callback function when it is invoked * @param priority relative priority of the wait object * (0 is normal, 1 important, 2 realtime) * * @return Nonzero index of the wait object, or -1 upon an error. */ int su_root_register(su_root_t *self, su_wait_t *wait, su_wakeup_f callback, su_wakeup_arg_t *arg, int priority) { assert(self && self->sur_port); if (!self || !self->sur_port) return -1; return su_port_register(self->sur_port, self, wait, callback, arg, priority); } /** Unregister a su_wait_t object. * * The function su_root_unregister() unregisters a su_wait_t object. The * wait object, a callback function and a argument are removed from the * root object. * * @param self pointer to root object * @param wait pointer to wait object * @param callback callback function pointer (may be NULL) * @param arg argument given to callback function when it is invoked * (may be NULL) * * @return Nonzero index of the wait object, or -1 upon an error. */ int su_root_unregister(su_root_t *self, su_wait_t *wait, su_wakeup_f callback, /* XXX - ignored */ su_wakeup_arg_t *arg) { assert(self && self->sur_port); if (!self || !self->sur_port) return -1; return su_port_unregister(self->sur_port, self, wait, callback, arg); } /** Remove a su_wait_t registration. * * The function su_root_deregister() deregisters a su_wait_t object. The * wait object, a callback function and a argument are removed from the * root object. The wait object is destroyed. * * @param self pointer to root object * @param i registration index * * @return Index of the wait object, or -1 upon an error. */ int su_root_deregister(su_root_t *self, int i) { if (i == 0 || i == -1) return -1; assert(self && self->sur_port); if (!self || !self->sur_port) return -1; return su_port_deregister(self->sur_port, i); } /** Set mask for a registered event. * * The function su_root_eventmask() sets the mask describing events that can * signal the registered callback. * * @param self pointer to root object * @param index registration index * @param socket socket * @param events new event mask * * @retval 0 when successful, * @retval -1 upon an error. */ int su_root_eventmask(su_root_t *self, int index, int socket, int events) { assert(self && self->sur_port); if (!self || !self->sur_port) return -1; return su_port_eventmask(self->sur_port, index, socket, events); } /** Set multishot mode. * * The function su_root_multishot() enables, disables or queries the * multishot mode for the root. The multishot mode determines how the events * are scheduled by root. If multishot mode is enabled, root serves all the * sockets that have received network events. If it is disables, only first * socket event is served. * * @param self pointer to root object * @param multishot multishot mode (0 => disables, 1 => enables, -1 => query) * * @retval 0 multishot mode is disabled * @retval 1 multishot mode is enabled * @retval -1 an error occurred */ int su_root_multishot(su_root_t *self, int multishot) { if (self && self->sur_port) { return su_port_multishot(self->sur_port, multishot); } else { return (errno = EINVAL), -1; } } /** Run event and message loop. * * The function su_root_run() runs the root main loop. The root loop waits * for wait objects and the timers associated with the root object. When any * wait object is signaled or timer is expired, it invokes the callbacks, * and returns waiting. * * This function returns when su_root_break() is called from a callback. * * @param self pointer to root object * */ void su_root_run(su_root_t *self) { assert(self && self->sur_port); if (self && self->sur_port) su_port_run(self->sur_port); } /** Terminate event loop. * * The function su_root_break() is used to terminate execution of * su_root_run(). It can be called from a callback function. * * @param self pointer to root object */ void su_root_break(su_root_t *self) { assert(self && self->sur_port); if (self && self->sur_port) su_port_break(self->sur_port); } /** Process events, timers and messages. * * The function su_root_step() waits for wait objects and the timers * associated with the root object. When any wait object is signaled or * timer is expired, it invokes the callbacks. * * This function returns when a callback has been invoked or tout * milliseconds is elapsed. * * @param self pointer to root object * @param tout timeout in milliseconds * * @return Milliseconds to the next invocation of timer, or SU_WAIT_FOREVER * if there are no active timers. */ su_duration_t su_root_step(su_root_t *self, su_duration_t tout) { assert(self && self->sur_port); return su_port_step(self->sur_port, tout); } /**Run event and message loop for given duration. * * The function su_root_sleep() runs event loop for @a duration milliseconds. * The event loop waits for wait objects and the timers associated with the * @a root object. When any wait object is signaled, timer is expired, or * message is received, it invokes the callbacks and returns waiting. * * @param self pointer to root object * @param duration milliseconds to run event loop */ su_duration_t su_root_sleep(su_root_t *self, su_duration_t duration) { su_duration_t retval, accrued = 0; su_time_t started = su_now(); assert(self && self->sur_port); do { retval = su_port_step(self->sur_port, duration - accrued); accrued = su_duration(su_now(), started); } while (accrued < duration); return retval; } /** Check wait events in callbacks that take lots of time * * This function does a 0 timeout poll() and runs wait objects * * @param self pointer to root object */ int su_root_yield(su_root_t *self) { if (self && self->sur_task[0].sut_port) { su_virtual_port_t *port = (su_virtual_port_t *)self->sur_task[0].sut_port; /* Make sure we have su_port_wait_events extension */ if (port->sup_vtable->su_vtable_size >= offsetof(su_port_vtable_t, su_port_wait_events) && port->sup_vtable->su_port_wait_events) return port->sup_vtable-> su_port_wait_events(self->sur_task[0].sut_port, 0); /* Make sure we have su_port_yield extension */ if (port->sup_vtable->su_vtable_size >= offsetof(su_port_vtable_t, su_port_yield) && port->sup_vtable->su_port_yield) return port->sup_vtable-> su_port_yield(self->sur_task[0].sut_port); } errno = EINVAL; return -1; } /** Get task reference. * * The function su_root_task() is used to retrieve the task reference * (PId) related with the root object. * * @param self a pointer to a root object * * @return The function su_root_task() returns a reference to the task * object. */ _su_task_r su_root_task(su_root_t const *self) { if (self) return self->sur_task; else return su_task_null; } /** Get parent task reference. * * The function su_root_parent() is used to retrieve the task reference * (PId) of the parent task. * * @param self a pointer to a root object * * @return The function su_root_parent() returns a reference to the parent * task object. */ _su_task_r su_root_parent(su_root_t const *self) { if (self) return self->sur_parent; else return su_task_null; } /** Add a pre-poll callback. */ int su_root_add_prepoll(su_root_t *root, su_prepoll_f *callback, su_prepoll_magic_t *magic) { if (root == NULL || root->sur_port == NULL) return -1; return su_port_add_prepoll(root->sur_port, root, callback, magic); } /** Remove a pre-poll callback */ int su_root_remove_prepoll(su_root_t *root) { if (root == NULL || root->sur_port == NULL) return -1; return su_port_remove_prepoll(root->sur_port, root); } /* ========================================================================= * Messages */ /** * Allocates a message of given size. * * The function @c su_msg_create() allocates a message with given data size. * If successful, it moves the new message handle to the @c rmsg. * * @param rmsg handle to the new message (may be uninitialized prior calling) * @param to the recipient task * @param from the sender task * @param wakeup function that is called when message is delivered * @param size size of the message data * * @retval 0 if successful, * @retval -1 if message allocation fails. */ int su_msg_create(su_msg_r rmsg, su_task_r const to, su_task_r const from, su_msg_f wakeup, isize_t size) { su_msg_t *msg; msg = su_zalloc(NULL, sizeof(*msg) + size); if (msg) { msg->sum_size = sizeof(*msg) + size; SU_TASK_COPY(msg->sum_to, to, su_msg_create); SU_TASK_COPY(msg->sum_from, from, su_msg_create); msg->sum_func = wakeup; *rmsg = msg; return 0; } *rmsg = NULL; return -1; } /** Add a delivery report function to a message. * * The delivery report funcgtion gets called by the sending task after the * message was delivered and the message function was executed. (The * su_root_t message delivery loop calls su_msg_delivery_report() * */ int su_msg_report(su_msg_r msg, su_msg_f report) { if (msg && msg[0] && msg[0]->sum_report == NULL) { msg[0]->sum_report = report; return 0; } return -1; } /** * Allocates a reply message of given size. * * @param reply handle to the new message (may be uninitialized prior calling) * @param msg the incoming message * @param wakeup function that is called when message is delivered * @param size size of the message data * * @retval 0 if successful, * @retval -1 otherwise. */ int su_msg_reply(su_msg_r reply, su_msg_cr msg, su_msg_f wakeup, isize_t size) { su_msg_r msg0; assert(msg != reply); *msg0 = *(su_msg_t **) msg; *reply = NULL; return su_msg_create(reply, su_msg_from(msg0), su_msg_to(msg0), wakeup, size); } /** Send a delivery report. * * If the sender has attached a delivery report function to message with * su_msg_report(), the message is returned to the message queue of the * sending task. The sending task calls the delivery report function when it * has received the message. */ void su_msg_delivery_report(su_msg_r msg) { su_task_r swap; if (!msg || !msg[0]) return; if (!msg[0]->sum_report) { su_msg_destroy(msg); return; } *swap = *msg[0]->sum_from; *msg[0]->sum_from = *msg[0]->sum_to; *msg[0]->sum_to = *swap; msg[0]->sum_func = msg[0]->sum_report; msg[0]->sum_report = NULL; su_msg_send(msg); } /** Save a message. */ void su_msg_save(su_msg_r save, su_msg_r msg) { if (save) { if (msg) save[0] = msg[0]; else save[0] = NULL; } if (msg) msg[0] = NULL; } /** * Destroys an unsent message. * * @param rmsg message handle. */ void su_msg_destroy(su_msg_r rmsg) { assert(rmsg); if (rmsg[0]) { /* su_port_t *port = rmsg[0]->sum_to->sut_port; */ /* su_port_incref(port, "su_msg_destroy"); */ SU_TASK_ZAP(rmsg[0]->sum_to, su_msg_destroy); SU_TASK_ZAP(rmsg[0]->sum_from, su_msg_destroy); su_free(NULL /* port->sup_home */, rmsg[0]); /* SU_PORT_UNLOCK(port, su_msg_destroy); */ /* su_port_decref(port, "su_msg_destroy"); */ } rmsg[0] = NULL; } /** Gets a pointer to the message data area. * * The function @c su_msg_data() returns a pointer to the message data * area. If @c rmsg contains a @c NULL handle, or message size is 0, @c NULL * pointer is returned. * * @param rmsg message handle * * @return A pointer to the message data area is returned. */ su_msg_arg_t *su_msg_data(su_msg_cr rmsg) { if (rmsg[0] && rmsg[0]->sum_size > sizeof(su_msg_t)) return rmsg[0]->sum_data; else return NULL; } /** Get size of message data area. */ isize_t su_msg_size(su_msg_cr rmsg) { return rmsg[0] ? rmsg[0]->sum_size - sizeof(su_msg_t) : 0; } /** Get sending task. * * Returns the task handle belonging to the sender of the message. * * If the message handle contains NULL the function @c su_msg_from * returns NULL. * * @param msg message handle * * @return The task handle of the sender is returned. */ _su_task_r su_msg_from(su_msg_cr msg) { return msg[0] ? msg[0]->sum_from : NULL; } /** Get destination task. * * The function @c su_msg_from returns the task handle belonging to the * recipient of the message. * * If the message handle contains NULL the function @c su_msg_to * returns NULL. * * @param msg message handle * * @return The task handle of the recipient is returned. */ _su_task_r su_msg_to(su_msg_cr msg) { return msg[0] ? msg[0]->sum_to : NULL; } /** Remove references to 'from' and 'to' tasks from a message. * * @param msg message handle */ void su_msg_remove_refs(su_msg_cr msg) { if (msg[0]) { su_task_deinit(msg[0]->sum_to); su_task_deinit(msg[0]->sum_from); } } /**Send a message. * * The function @c su_msg_send() sends the message. The message is added to * the recipients message queue, and recipient is waken up. The caller may * not alter the message or the data associated with it after the message * has been sent. * * @param rmsg message handle * * @retval 0 if signal was sent successfully or handle was @c NULL, * @retval -1 otherwise. */ int su_msg_send(su_msg_r rmsg) { assert(rmsg); if (rmsg[0]) { su_msg_t *msg = rmsg[0]; assert(msg->sum_to->sut_port); return su_port_send(msg->sum_to->sut_port, rmsg); } return 0; }