diff --git a/apps/app_externalivr.c b/apps/app_externalivr.c index 3a3644923..1dd1fc43e 100644 --- a/apps/app_externalivr.c +++ b/apps/app_externalivr.c @@ -457,9 +457,7 @@ static int app_exec(struct ast_channel *chan, const char *data) ivr_desc.local_address.sin_family = AF_INET; ivr_desc.local_address.sin_port = htons(port); memcpy(&ivr_desc.local_address.sin_addr.s_addr, hp.hp.h_addr, hp.hp.h_length); - ser = ast_tcptls_client_start(&ivr_desc); - - if (!ser) { + if (!(ser = ast_tcptls_client_create(&ivr_desc)) || !(ser = ast_tcptls_client_start(ser))) { goto exit; } res = eivr_comm(chan, u, ser->fd, ser->fd, -1, pipe_delim_args, flags); diff --git a/channels/chan_sip.c b/channels/chan_sip.c index e15b5fb06..50756fae0 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -2111,13 +2111,26 @@ struct sip_registry { char lastmsg[256]; /*!< Last Message sent/received */ }; +enum sip_tcptls_alert { + /*! \brief There is new data to be sent out */ + TCPTLS_ALERT_DATA, + /*! \brief A request to stop the tcp_handler thread */ + TCPTLS_ALERT_STOP, +}; + +struct tcptls_packet { + AST_LIST_ENTRY(tcptls_packet) entry; + struct ast_str *data; + size_t len; +}; /*! \brief Definition of a thread that handles a socket */ struct sip_threadinfo { int stop; + int alert_pipe[2]; /*! Used to alert tcptls thread when packet is ready to be written */ pthread_t threadid; struct ast_tcptls_session_instance *tcptls_session; enum sip_transport type; /*!< We keep a copy of the type here so we can display it in the connection list */ - AST_LIST_ENTRY(sip_threadinfo) list; + AST_LIST_HEAD_NOLOCK(, tcptls_packet) packet_q; }; /*! \brief Definition of an MWI subscription to another server */ @@ -2151,8 +2164,8 @@ static int hash_dialog_size = 563; static int hash_user_size = 563; #endif -/*! \brief The thread list of TCP threads */ -static AST_LIST_HEAD_STATIC(threadl, sip_threadinfo); +/*! \brief The table of TCP threads */ +static struct ao2_container *threadt; /*! \brief The peer list: Users, Peers and Friends */ static struct ao2_container *peers; @@ -2244,6 +2257,21 @@ static int peer_ipcmp_cb(void *obj, void *arg, int flags) return peer->addr.sin_port == peer2->addr.sin_port ? (CMP_MATCH | CMP_STOP) : 0; } + +static int threadt_hash_cb(const void *obj, const int flags) +{ + const struct sip_threadinfo *th = obj; + + return (int) th->tcptls_session->remote_address.sin_addr.s_addr; +} + +static int threadt_cmp_cb(void *obj, void *arg, int flags) +{ + struct sip_threadinfo *th = obj, *th2 = arg; + + return (th->tcptls_session == th2->tcptls_session) ? CMP_MATCH | CMP_STOP : 0; +} + /*! * \note The only member of the dialog used here callid string */ @@ -2890,6 +2918,130 @@ static struct ast_variable *copy_vars(struct ast_variable *src) return res; } +static void tcptls_packet_destructor(void *obj) +{ + struct tcptls_packet *packet = obj; + + ast_free(packet->data); +} + +static void sip_tcptls_client_args_destructor(void *obj) +{ + struct ast_tcptls_session_args *args = obj; + if (args->tls_cfg) { + ast_free(args->tls_cfg->certfile); + ast_free(args->tls_cfg->pvtfile); + ast_free(args->tls_cfg->cipher); + ast_free(args->tls_cfg->cafile); + ast_free(args->tls_cfg->capath); + } + ast_free(args->tls_cfg); + ast_free((char *) args->name); +} + +static void sip_threadinfo_destructor(void *obj) +{ + struct sip_threadinfo *th = obj; + struct tcptls_packet *packet; + if (th->alert_pipe[1] > -1) { + close(th->alert_pipe[0]); + } + if (th->alert_pipe[1] > -1) { + close(th->alert_pipe[1]); + } + th->alert_pipe[0] = th->alert_pipe[1] = -1; + + while ((packet = AST_LIST_REMOVE_HEAD(&th->packet_q, entry))) { + ao2_t_ref(packet, -1, "thread destruction, removing packet from frame queue"); + } + + if (th->tcptls_session) { + ao2_t_ref(th->tcptls_session, -1, "remove tcptls_session for sip_threadinfo object"); + } +} + +/*! \brief creates a sip_threadinfo object and links it into the threadt table. */ +static struct sip_threadinfo *sip_threadinfo_create(struct ast_tcptls_session_instance *tcptls_session, int transport) +{ + struct sip_threadinfo *th; + + if (!tcptls_session || !(th = ao2_alloc(sizeof(*th), sip_threadinfo_destructor))) { + return NULL; + } + + th->alert_pipe[0] = th->alert_pipe[1] = -1; + + if (pipe(th->alert_pipe) == -1) { + ao2_t_ref(th, -1, "Failed to open alert pipe on sip_threadinfo"); + ast_log(LOG_ERROR, "Could not create sip alert pipe in tcptls thread, error %s\n", strerror(errno)); + return NULL; + } + ao2_t_ref(tcptls_session, +1, "tcptls_session ref for sip_threadinfo object"); + th->tcptls_session = tcptls_session; + th->type = transport ? transport : (tcptls_session->ssl ? SIP_TRANSPORT_TLS: SIP_TRANSPORT_TCP); + ao2_t_link(threadt, th, "Adding new tcptls helper thread"); + ao2_t_ref(th, -1, "Decrementing threadinfo ref from alloc, only table ref remains"); + return th; +} + +/*! \brief used to indicate to a tcptls thread that data is ready to be written */ +static int sip_tcptls_write(struct ast_tcptls_session_instance *tcptls_session, const void *buf, size_t len) +{ + int res = len; + struct sip_threadinfo *th = NULL; + struct tcptls_packet *packet = NULL; + struct sip_threadinfo tmp = { + .tcptls_session = tcptls_session, + }; + enum sip_tcptls_alert alert = TCPTLS_ALERT_DATA; + + if (!tcptls_session) { + return XMIT_ERROR; + } + + ast_mutex_lock(&tcptls_session->lock); + + if ((tcptls_session->fd == -1) || + !(th = ao2_t_find(threadt, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread")) || + !(packet = ao2_alloc(sizeof(*packet), tcptls_packet_destructor)) || + !(packet->data = ast_str_create(len))) { + goto tcptls_write_setup_error; + } + + /* goto tcptls_write_error should _NOT_ be used beyond this point */ + ast_str_set(&packet->data, 0, "%s", (char *) buf); + packet->len = len; + + /* alert tcptls thread handler that there is a packet to be sent. + * must lock the thread info object to guarantee control of the + * packet queue */ + ao2_lock(th); + if (write(th->alert_pipe[1], &alert, sizeof(alert)) == -1) { + ast_log(LOG_ERROR, "write() to alert pipe failed: %s\n", strerror(errno)); + ao2_t_ref(packet, -1, "could not write to alert pipe, remove packet"); + packet = NULL; + res = XMIT_ERROR; + } else { /* it is safe to queue the frame after issuing the alert when we hold the threadinfo lock */ + AST_LIST_INSERT_TAIL(&th->packet_q, packet, entry); + } + ao2_unlock(th); + + ast_mutex_unlock(&tcptls_session->lock); + ao2_t_ref(th, -1, "In sip_tcptls_write, unref threadinfo object after finding it"); + return res; + +tcptls_write_setup_error: + if (th) { + ao2_t_ref(th, -1, "In sip_tcptls_write, unref threadinfo obj, could not create packet"); + } + if (packet) { + ao2_t_ref(packet, -1, "could not allocate packet's data"); + } + ast_mutex_unlock(&tcptls_session->lock); + + return XMIT_ERROR; +} + /*! \brief SIP TCP connection handler */ static void *sip_tcp_worker_fn(void *data) { @@ -2905,26 +3057,45 @@ static void *_sip_tcp_helper_thread(struct sip_pvt *pvt, struct ast_tcptls_sessi { int res, cl; struct sip_request req = { 0, } , reqcpy = { 0, }; - struct sip_threadinfo *me; + struct sip_threadinfo *me = NULL; char buf[1024] = ""; + struct pollfd fds[2] = { { 0 }, { 0 }, }; + struct ast_tcptls_session_args *ca = NULL; - me = ast_calloc(1, sizeof(*me)); + /* If this is a server session, then the connection has already been setup, + * simply create the threadinfo object so we can access this thread for writing. + * + * if this is a client connection more work must be done. + * 1. We own the parent session args for a client connection. This pointer needs + * to be held on to so we can decrement it's ref count on thread destruction. + * 2. The threadinfo object was created before this thread was launched, however + * it must be found within the threadt table. + * 3. Last, the tcptls_session must be started. + */ + if (!tcptls_session->client) { + if (!(me = sip_threadinfo_create(tcptls_session, tcptls_session->ssl ? SIP_TRANSPORT_TLS : SIP_TRANSPORT_TCP))) { + goto cleanup; + } + ao2_t_ref(me, +1, "Adding threadinfo ref for tcp_helper_thread"); + } else { + struct sip_threadinfo tmp = { + .tcptls_session = tcptls_session, + }; - if (!me) - goto cleanup2; + if ((!(ca = tcptls_session->parent)) || + (!(me = ao2_t_find(threadt, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread"))) || + (!(tcptls_session = ast_tcptls_client_start(tcptls_session)))) { + goto cleanup; + } + } me->threadid = pthread_self(); - me->tcptls_session = tcptls_session; - if (tcptls_session->ssl) - me->type = SIP_TRANSPORT_TLS; - else - me->type = SIP_TRANSPORT_TCP; - ast_debug(2, "Starting thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP"); - AST_LIST_LOCK(&threadl); - AST_LIST_INSERT_TAIL(&threadl, me, list); - AST_LIST_UNLOCK(&threadl); + /* set up pollfd to watch for reads on both the socket and the alert_pipe */ + fds[0].fd = tcptls_session->fd; + fds[1].fd = me->alert_pipe[0]; + fds[0].events = fds[1].events = POLLIN | POLLPRI; if (!(req.data = ast_str_create(SIP_MIN_PACKET))) goto cleanup; @@ -2934,81 +3105,120 @@ static void *_sip_tcp_helper_thread(struct sip_pvt *pvt, struct ast_tcptls_sessi for (;;) { struct ast_str *str_save; - str_save = req.data; - memset(&req, 0, sizeof(req)); - req.data = str_save; - ast_str_reset(req.data); - - str_save = reqcpy.data; - memset(&reqcpy, 0, sizeof(reqcpy)); - reqcpy.data = str_save; - ast_str_reset(reqcpy.data); - - memset(buf, 0, sizeof(buf)); - - if (tcptls_session->ssl) { - set_socket_transport(&req.socket, SIP_TRANSPORT_TLS); - req.socket.port = htons(ourport_tls); - } else { - set_socket_transport(&req.socket, SIP_TRANSPORT_TCP); - req.socket.port = htons(ourport_tcp); - } - req.socket.fd = tcptls_session->fd; - res = ast_wait_for_input(tcptls_session->fd, -1); + res = ast_poll(fds, 2, -1); /* polls for both socket and alert_pipe */ if (res < 0) { ast_debug(2, "SIP %s server :: ast_wait_for_input returned %d\n", tcptls_session->ssl ? "SSL": "TCP", res); goto cleanup; } - /* Read in headers one line at a time */ - while (req.len < 4 || strncmp(REQ_OFFSET_TO_STR(&req, len - 4), "\r\n\r\n", 4)) { - ast_mutex_lock(&tcptls_session->lock); - if (!fgets(buf, sizeof(buf), tcptls_session->f)) { - ast_mutex_unlock(&tcptls_session->lock); - goto cleanup; + /* handle the socket event, check for both reads from the socket fd, + * and writes from alert_pipe fd */ + if (fds[0].revents) { /* there is data on the socket to be read */ + + fds[0].revents = 0; + + /* clear request structure */ + str_save = req.data; + memset(&req, 0, sizeof(req)); + req.data = str_save; + ast_str_reset(req.data); + + str_save = reqcpy.data; + memset(&reqcpy, 0, sizeof(reqcpy)); + reqcpy.data = str_save; + ast_str_reset(reqcpy.data); + + memset(buf, 0, sizeof(buf)); + + if (tcptls_session->ssl) { + set_socket_transport(&req.socket, SIP_TRANSPORT_TLS); + req.socket.port = htons(ourport_tls); + } else { + set_socket_transport(&req.socket, SIP_TRANSPORT_TCP); + req.socket.port = htons(ourport_tcp); } - ast_mutex_unlock(&tcptls_session->lock); - if (me->stop) - goto cleanup; - ast_str_append(&req.data, 0, "%s", buf); - req.len = req.data->used; - } - copy_request(&reqcpy, &req); - parse_request(&reqcpy); - /* In order to know how much to read, we need the content-length header */ - if (sscanf(get_header(&reqcpy, "Content-Length"), "%30d", &cl)) { - while (cl > 0) { - size_t bytes_read; + req.socket.fd = tcptls_session->fd; + + /* Read in headers one line at a time */ + while (req.len < 4 || strncmp(REQ_OFFSET_TO_STR(&req, len - 4), "\r\n\r\n", 4)) { ast_mutex_lock(&tcptls_session->lock); - if (!(bytes_read = fread(buf, 1, MIN(sizeof(buf) - 1, cl), tcptls_session->f))) { + if (!fgets(buf, sizeof(buf), tcptls_session->f)) { ast_mutex_unlock(&tcptls_session->lock); goto cleanup; } - buf[bytes_read] = '\0'; ast_mutex_unlock(&tcptls_session->lock); if (me->stop) - goto cleanup; - cl -= strlen(buf); + goto cleanup; ast_str_append(&req.data, 0, "%s", buf); req.len = req.data->used; } - } - /*! \todo XXX If there's no Content-Length or if the content-length and what - we receive is not the same - we should generate an error */ + copy_request(&reqcpy, &req); + parse_request(&reqcpy); + /* In order to know how much to read, we need the content-length header */ + if (sscanf(get_header(&reqcpy, "Content-Length"), "%30d", &cl)) { + while (cl > 0) { + size_t bytes_read; + ast_mutex_lock(&tcptls_session->lock); + if (!(bytes_read = fread(buf, 1, MIN(sizeof(buf) - 1, cl), tcptls_session->f))) { + ast_mutex_unlock(&tcptls_session->lock); + goto cleanup; + } + buf[bytes_read] = '\0'; + ast_mutex_unlock(&tcptls_session->lock); + if (me->stop) + goto cleanup; + cl -= strlen(buf); + ast_str_append(&req.data, 0, "%s", buf); + req.len = req.data->used; + } + } + /*! \todo XXX If there's no Content-Length or if the content-length and what + we receive is not the same - we should generate an error */ - req.socket.tcptls_session = tcptls_session; - handle_request_do(&req, &tcptls_session->remote_address); + req.socket.tcptls_session = tcptls_session; + handle_request_do(&req, &tcptls_session->remote_address); + } + + if (fds[1].revents) { /* alert_pipe indicates there is data in the send queue to be sent */ + enum sip_tcptls_alert alert; + struct tcptls_packet *packet; + + fds[1].revents = 0; + + if (read(me->alert_pipe[0], &alert, sizeof(alert)) == -1) { + ast_log(LOG_ERROR, "read() failed: %s\n", strerror(errno)); + continue; + } + + switch (alert) { + case TCPTLS_ALERT_STOP: + goto cleanup; + case TCPTLS_ALERT_DATA: + ao2_lock(me); + if (!(packet = AST_LIST_REMOVE_HEAD(&me->packet_q, entry))) { + ast_log(LOG_WARNING, "TCPTLS thread alert_pipe indicated packet should be sent, but frame_q is empty"); + } else if (ast_tcptls_server_write(tcptls_session, ast_str_buffer(packet->data), packet->len) == -1) { + ast_log(LOG_WARNING, "Failure to write to tcp/tls socket\n"); + } + + if (packet) { + ao2_t_ref(packet, -1, "tcptls packet sent, this is no longer needed"); + } + ao2_unlock(me); + break; + default: + ast_log(LOG_ERROR, "Unknown tcptls thread alert '%d'\n", alert); + } + } } + ast_debug(2, "Shutting down thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP"); + cleanup: - AST_LIST_LOCK(&threadl); - AST_LIST_REMOVE(&threadl, me, list); - AST_LIST_UNLOCK(&threadl); - ast_free(me); -cleanup2: - fclose(tcptls_session->f); - tcptls_session->f = NULL; - tcptls_session->fd = -1; + if (me) { + ao2_t_unlink(threadt, me, "Removing tcptls helper thread, thread is closing"); + ao2_t_ref(me, -1, "Removing tcp_helper_threads threadinfo ref"); + } if (reqcpy.data) { ast_free(reqcpy.data); } @@ -3018,12 +3228,27 @@ cleanup2: req.data = NULL; } - ast_debug(2, "Shutting down thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP"); - + /* if client, we own the parent session arguments and must decrement ref */ + if (ca) { + ao2_t_ref(ca, -1, "closing tcptls thread, getting rid of client tcptls_session arguments"); + } - ao2_ref(tcptls_session, -1); - tcptls_session = NULL; + if (tcptls_session) { + ast_mutex_lock(&tcptls_session->lock); + if (tcptls_session->f) { + fclose(tcptls_session->f); + tcptls_session->f = NULL; + } + if (tcptls_session->fd != -1) { + close(tcptls_session->fd); + tcptls_session->fd = -1; + } + tcptls_session->parent = NULL; + ast_mutex_unlock(&tcptls_session->lock); + ao2_ref(tcptls_session, -1); + tcptls_session = NULL; + } return NULL; } @@ -3480,26 +3705,15 @@ static int __sip_xmit(struct sip_pvt *p, struct ast_str *data, int len) if (sip_prepare_socket(p) < 0) return XMIT_ERROR; - if (p->socket.tcptls_session) - ast_mutex_lock(&p->socket.tcptls_session->lock); - - if (p->socket.type & SIP_TRANSPORT_UDP) { + if (p->socket.type == SIP_TRANSPORT_UDP) { res = sendto(p->socket.fd, data->str, len, 0, (const struct sockaddr *)dst, sizeof(struct sockaddr_in)); } else if (p->socket.tcptls_session) { - if (p->socket.tcptls_session->f) { - res = ast_tcptls_server_write(p->socket.tcptls_session, data->str, len); - } else { - ast_debug(2, "No p->socket.tcptls_session->f len=%d\n", len); - return XMIT_ERROR; - } + res = sip_tcptls_write(p->socket.tcptls_session, data->str, len); } else { ast_debug(2, "Socket type is TCP but no tcptls_session is present to write to\n"); return XMIT_ERROR; } - if (p->socket.tcptls_session) - ast_mutex_unlock(&p->socket.tcptls_session->lock); - if (res == -1) { switch (errno) { case EBADF: /* Bad file descriptor - seems like this is generated when the host exist, but doesn't accept the UDP packet */ @@ -12233,6 +12447,11 @@ static int expire_register(const void *data) destroy_association(peer); /* remove registration data from storage */ set_socket_transport(&peer->socket, peer->default_outbound_transport); + if (peer->socket.tcptls_session) { + ao2_ref(peer->socket.tcptls_session, -1); + peer->socket.tcptls_session = NULL; + } + manager_event(EVENT_FLAG_SYSTEM, "PeerStatus", "ChannelType: SIP\r\nPeer: SIP/%s\r\nPeerStatus: Unregistered\r\nCause: Expired\r\n", peer->name); register_peer_exten(peer, FALSE); /* Remove regexten */ ast_devstate_changed(AST_DEVICE_UNKNOWN, "SIP/%s", peer->name); @@ -14732,6 +14951,7 @@ static const char *cli_yesno(int x) static char *sip_show_tcp(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) { struct sip_threadinfo *th; + struct ao2_iterator i; #define FORMAT2 "%-30.30s %3.6s %9.9s %6.6s\n" #define FORMAT "%-30.30s %-6d %-9.9s %-6.6s\n" @@ -14751,15 +14971,16 @@ static char *sip_show_tcp(struct ast_cli_entry *e, int cmd, struct ast_cli_args return CLI_SHOWUSAGE; ast_cli(a->fd, FORMAT2, "Host", "Port", "Transport", "Type"); - AST_LIST_LOCK(&threadl); - AST_LIST_TRAVERSE(&threadl, th, list) { + + i = ao2_iterator_init(threadt, 0); + while ((th = ao2_t_iterator_next(&i, "iterate through tcp threads for 'sip show tcp'"))) { ast_cli(a->fd, FORMAT, ast_inet_ntoa(th->tcptls_session->remote_address.sin_addr), ntohs(th->tcptls_session->remote_address.sin_port), get_transport(th->type), (th->tcptls_session->client ? "Client" : "Server")); - + ao2_t_ref(th, -1, "decrement ref from iterator"); } - AST_LIST_UNLOCK(&threadl); + return CLI_SUCCESS; #undef FORMAT #undef FORMAT2 @@ -22678,6 +22899,18 @@ static int sip_standard_port(enum sip_transport type, int port) return port == STANDARD_SIP_PORT; } +static int threadinfo_locate_cb(void *obj, void *arg, int flags) +{ + struct sip_threadinfo *th = obj; + struct sockaddr_in *s = arg; + + if (!inaddrcmp(&th->tcptls_session->remote_address, s)) { + return CMP_MATCH | CMP_STOP; + } + + return 0; +} + /*! * \brief Find thread for TCP/TLS session (based on IP/Port * @@ -22688,16 +22921,10 @@ static struct ast_tcptls_session_instance *sip_tcp_locate(struct sockaddr_in *s) struct sip_threadinfo *th; struct ast_tcptls_session_instance *tcptls_instance = NULL; - AST_LIST_LOCK(&threadl); - AST_LIST_TRAVERSE(&threadl, th, list) { - if ((s->sin_family == th->tcptls_session->remote_address.sin_family) && - (s->sin_addr.s_addr == th->tcptls_session->remote_address.sin_addr.s_addr) && - (s->sin_port == th->tcptls_session->remote_address.sin_port)) { - tcptls_instance = (ao2_ref(th->tcptls_session, +1), th->tcptls_session); - break; - } + if ((th = ao2_callback(threadt, 0, threadinfo_locate_cb, s))) { + tcptls_instance = (ao2_ref(th->tcptls_session, +1), th->tcptls_session); + ao2_t_ref(th, -1, "decrement ref from callback"); } - AST_LIST_UNLOCK(&threadl); return tcptls_instance; } @@ -22707,14 +22934,23 @@ static int sip_prepare_socket(struct sip_pvt *p) { struct sip_socket *s = &p->socket; static const char name[] = "SIP socket"; + struct sip_threadinfo *th; struct ast_tcptls_session_instance *tcptls_session; - struct ast_tcptls_session_args ca = { + struct ast_tcptls_session_args tmp_ca = { .name = name, .accept_fd = -1, }; + struct ast_tcptls_session_args *ca; - if (s->fd != -1) - return s->fd; /* This socket is already active */ + /* check to see if a socket is already active */ + if ((s->fd != -1) && (s->type == SIP_TRANSPORT_UDP)) { + return s->fd; + } + if ((s->type & (SIP_TRANSPORT_TCP | SIP_TRANSPORT_TLS)) && + (s->tcptls_session) && + (s->tcptls_session->fd != -1)) { + return s->tcptls_session->fd; + } /*! \todo Check this... This might be wrong, depending on the proxy configuration If proxy is in "force" mode its correct. @@ -22723,14 +22959,23 @@ static int sip_prepare_socket(struct sip_pvt *p) s->type = p->outboundproxy->transport; } - if (s->type & SIP_TRANSPORT_UDP) { + if (s->type == SIP_TRANSPORT_UDP) { s->fd = sipsock; return s->fd; } - ca.remote_address = *(sip_real_dst(p)); + /* At this point we are dealing with a TCP/TLS connection + * 1. We need to check to see if a connectin thread exists + * for this address, if so use that. + * 2. If a thread does not exist for this address, but the tcptls_session + * exists on the socket, the connection was closed. + * 3. If no tcptls_session thread exists for the address, and no tcptls_session + * already exists on the socket, create a new one and launch a new thread. + */ - if ((tcptls_session = sip_tcp_locate(&ca.remote_address))) { /* Check if we have a thread handling a socket connected to this IP/port */ + /* 1. check for existing threads */ + tmp_ca.remote_address = *(sip_real_dst(p)); + if ((tcptls_session = sip_tcp_locate(&tmp_ca.remote_address))) { s->fd = tcptls_session->fd; if (s->tcptls_session) { ao2_ref(s->tcptls_session, -1); @@ -22738,46 +22983,82 @@ static int sip_prepare_socket(struct sip_pvt *p) } s->tcptls_session = tcptls_session; return s->fd; + /* 2. Thread not found, if tcptls_session already exists, it once had a thread and is now terminated */ + } else if (s->tcptls_session) { + return s->fd; /* XXX whether reconnection is ever necessary here needs to be investigated further */ } - if (s->tcptls_session && s->tcptls_session->parent->tls_cfg) { - ca.tls_cfg = s->tcptls_session->parent->tls_cfg; - } else { - if (s->type & SIP_TRANSPORT_TLS) { - ca.tls_cfg = ast_calloc(1, sizeof(*ca.tls_cfg)); - if (!ca.tls_cfg) - return -1; - memcpy(ca.tls_cfg, &default_tls_cfg, sizeof(*ca.tls_cfg)); - if (!ast_strlen_zero(p->tohost)) - ast_copy_string(ca.hostname, p->tohost, sizeof(ca.hostname)); + /* 3. Create a new TCP/TLS client connection */ + /* create new session arguments for the client connection */ + if (!(ca = ao2_alloc(sizeof(*ca), sip_tcptls_client_args_destructor)) || + !(ca->name = ast_strdup(name))) { + goto create_tcptls_session_fail; + } + ca->accept_fd = -1; + ca->remote_address = *(sip_real_dst(p)); + /* if type is TLS, we need to create a tls cfg for this session arg */ + if (s->type == SIP_TRANSPORT_TLS) { + if (!(ca->tls_cfg = ast_calloc(1, sizeof(*ca->tls_cfg)))) { + goto create_tcptls_session_fail; + } + memcpy(ca->tls_cfg, &default_tls_cfg, sizeof(*ca->tls_cfg)); + + if (!(ca->tls_cfg->certfile = ast_strdup(default_tls_cfg.certfile)) || + !(ca->tls_cfg->pvtfile = ast_strdup(default_tls_cfg.pvtfile)) || + !(ca->tls_cfg->cipher = ast_strdup(default_tls_cfg.cipher)) || + !(ca->tls_cfg->cafile = ast_strdup(default_tls_cfg.cafile)) || + !(ca->tls_cfg->capath = ast_strdup(default_tls_cfg.capath))) { + + goto create_tcptls_session_fail; + } + + /* this host is used as the common name in ssl/tls */ + if (!ast_strlen_zero(p->tohost)) { + ast_copy_string(ca->hostname, p->tohost, sizeof(ca->hostname)); } } - - if (s->tcptls_session) { - /* the pvt socket already has a server instance ... */ - } else { - s->tcptls_session = ast_tcptls_client_start(&ca); /* Start a client connection to this address */ + + /* Create a client connection for address, this does not start the connection, just sets it up. */ + if (!(s->tcptls_session = ast_tcptls_client_create(ca))) { + goto create_tcptls_session_fail; } - if (!s->tcptls_session) { - if (ca.tls_cfg) - ast_free(ca.tls_cfg); - return -1; + s->fd = s->tcptls_session->fd; + + /* client connections need to have the sip_threadinfo object created before + * the thread is detached. This ensures the alert_pipe is up before it will + * be used. Note that this function links the new threadinfo object into the + * threadt container. */ + if (!(th = sip_threadinfo_create(s->tcptls_session, s->type))) { + goto create_tcptls_session_fail; } - s->fd = ca.accept_fd; - - /* Give the new thread a reference */ + /* Give the new thread a reference to the tcptls_session */ ao2_ref(s->tcptls_session, +1); - if (ast_pthread_create_background(&ca.master, NULL, sip_tcp_worker_fn, s->tcptls_session)) { - ast_debug(1, "Unable to launch '%s'.", ca.name); - ao2_ref(s->tcptls_session, -1); - close(ca.accept_fd); - s->fd = ca.accept_fd = -1; + if (ast_pthread_create_background(&ca->master, NULL, sip_tcp_worker_fn, s->tcptls_session)) { + ast_debug(1, "Unable to launch '%s'.", ca->name); + ao2_ref(s->tcptls_session, -1); /* take away the thread ref we just gave it */ + goto create_tcptls_session_fail; } return s->fd; + +create_tcptls_session_fail: + if (ca) { + ao2_t_ref(ca, -1, "failed to create client, getting rid of client tcptls_session arguments"); + } + if (s->tcptls_session) { + close(tcptls_session->fd); + s->fd = tcptls_session->fd = -1; + ao2_ref(s->tcptls_session, -1); + s->tcptls_session = NULL; + } + if (th) { + ao2_t_unlink(threadt, th, "Removing tcptls thread info object, thread failed to open"); + } + + return -1; } /*! @@ -26362,6 +26643,7 @@ static int load_module(void) peers = ao2_t_container_alloc(hash_peer_size, peer_hash_cb, peer_cmp_cb, "allocate peers"); peers_by_ip = ao2_t_container_alloc(hash_peer_size, peer_iphash_cb, peer_ipcmp_cb, "allocate peers_by_ip"); dialogs = ao2_t_container_alloc(hash_dialog_size, dialog_hash_cb, dialog_cmp_cb, "allocate dialogs"); + threadt = ao2_t_container_alloc(hash_dialog_size, threadt_hash_cb, threadt_cmp_cb, "allocate threadt table"); ASTOBJ_CONTAINER_INIT(®l); /* Registry object list -- not searched for anything */ ASTOBJ_CONTAINER_INIT(&submwil); /* MWI subscription object list */ @@ -26492,17 +26774,15 @@ static int unload_module(void) ast_tcptls_server_stop(&sip_tls_desc); /* Kill all existing TCP/TLS threads */ - AST_LIST_LOCK(&threadl); - AST_LIST_TRAVERSE_SAFE_BEGIN(&threadl, th, list) { + i = ao2_iterator_init(threadt, 0); + while ((th = ao2_t_iterator_next(&i, "iterate through tcp threads for 'sip show tcp'"))) { pthread_t thread = th->threadid; th->stop = 1; - AST_LIST_UNLOCK(&threadl); pthread_kill(thread, SIGURG); pthread_join(thread, NULL); - AST_LIST_LOCK(&threadl); + ao2_t_ref(th, -1, "decrement ref from iterator"); } - AST_LIST_TRAVERSE_SAFE_END; - AST_LIST_UNLOCK(&threadl); + ao2_iterator_destroy(&i); /* Hangup all dialogs if they have an owner */ i = ao2_iterator_init(dialogs, 0); @@ -26555,6 +26835,7 @@ static int unload_module(void) ao2_t_ref(peers, -1, "unref the peers table"); ao2_t_ref(peers_by_ip, -1, "unref the peers_by_ip table"); ao2_t_ref(dialogs, -1, "unref the dialogs table"); + ao2_t_ref(threadt, -1, "unref the thread table"); clear_sip_domains(); close(sipsock); diff --git a/include/asterisk/tcptls.h b/include/asterisk/tcptls.h index b6cc9a31b..ad0438583 100644 --- a/include/asterisk/tcptls.h +++ b/include/asterisk/tcptls.h @@ -156,12 +156,14 @@ struct ast_tcptls_session_instance { #define LEN_T size_t #endif -/*! - * \brief A generic client routine for a TCP client - * and starts a thread for handling accept() - * \version 1.6.1 changed desc parameter to be of ast_tcptls_session_args type - */ -struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_args *desc); +/*! + * \brief attempts to connect and start tcptls session, on error the tcptls_session's + * ref count is decremented, fd and file are closed, and NULL is returned. + */ +struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_instance *tcptls_session); + +/* \brief Creates a client connection's ast_tcptls_session_instance. */ +struct ast_tcptls_session_instance *ast_tcptls_client_create(struct ast_tcptls_session_args *desc); void *ast_tcptls_server_root(void *); diff --git a/main/tcptls.c b/main/tcptls.c index e92209b3c..05e59f600 100644 --- a/main/tcptls.c +++ b/main/tcptls.c @@ -125,7 +125,7 @@ static void session_instance_destructor(void *obj) * * \note must decrement ref count before returning NULL on error */ -static void *handle_tls_connection(void *data) +static void *handle_tcptls_connection(void *data) { struct ast_tcptls_session_instance *tcptls_session = data; #ifdef DO_SSL @@ -197,6 +197,7 @@ static void *handle_tls_connection(void *data) ast_log(LOG_ERROR, "Certificate common name did not match (%s)\n", tcptls_session->parent->hostname); if (peer) X509_free(peer); + close(tcptls_session->fd); fclose(tcptls_session->f); ao2_ref(tcptls_session, -1); return NULL; @@ -266,7 +267,7 @@ void *ast_tcptls_server_root(void *data) tcptls_session->client = 0; /* This thread is now the only place that controls the single ref to tcptls_session */ - if (ast_pthread_create_detached_background(&launched, NULL, handle_tls_connection, tcptls_session)) { + if (ast_pthread_create_detached_background(&launched, NULL, handle_tcptls_connection, tcptls_session)) { ast_log(LOG_WARNING, "Unable to launch helper thread: %s\n", strerror(errno)); close(tcptls_session->fd); ao2_ref(tcptls_session, -1); @@ -357,9 +358,45 @@ int ast_ssl_setup(struct ast_tls_config *cfg) return __ssl_setup(cfg, 0); } -struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_args *desc) +struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_instance *tcptls_session) { + struct ast_tcptls_session_args *desc; int flags; + + if (!(desc = tcptls_session->parent)) { + goto client_start_error; + } + + if (connect(desc->accept_fd, (const struct sockaddr *) &desc->remote_address, sizeof(desc->remote_address))) { + ast_log(LOG_ERROR, "Unable to connect %s to %s:%d: %s\n", + desc->name, + ast_inet_ntoa(desc->remote_address.sin_addr), ntohs(desc->remote_address.sin_port), + strerror(errno)); + goto client_start_error; + } + + flags = fcntl(desc->accept_fd, F_GETFL); + fcntl(desc->accept_fd, F_SETFL, flags & ~O_NONBLOCK); + + if (desc->tls_cfg) { + desc->tls_cfg->enabled = 1; + __ssl_setup(desc->tls_cfg, 1); + } + + return handle_tcptls_connection(tcptls_session); + +client_start_error: + close(desc->accept_fd); + desc->accept_fd = -1; + if (tcptls_session) { + ao2_ref(tcptls_session, -1); + } + return NULL; + +} + +struct ast_tcptls_session_instance *ast_tcptls_client_create(struct ast_tcptls_session_args *desc) +{ int x = 1; struct ast_tcptls_session_instance *tcptls_session = NULL; @@ -394,39 +431,16 @@ struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_se } } - if (connect(desc->accept_fd, (const struct sockaddr *) &desc->remote_address, sizeof(desc->remote_address))) { - ast_log(LOG_ERROR, "Unable to connect %s to %s:%d: %s\n", - desc->name, - ast_inet_ntoa(desc->remote_address.sin_addr), ntohs(desc->remote_address.sin_port), - strerror(errno)); - goto error; - } - if (!(tcptls_session = ao2_alloc(sizeof(*tcptls_session), session_instance_destructor))) goto error; ast_mutex_init(&tcptls_session->lock); - - flags = fcntl(desc->accept_fd, F_GETFL); - fcntl(desc->accept_fd, F_SETFL, flags & ~O_NONBLOCK); - + tcptls_session->client = 1; tcptls_session->fd = desc->accept_fd; tcptls_session->parent = desc; tcptls_session->parent->worker_fn = NULL; memcpy(&tcptls_session->remote_address, &desc->remote_address, sizeof(tcptls_session->remote_address)); - tcptls_session->client = 1; - - if (desc->tls_cfg) { - desc->tls_cfg->enabled = 1; - __ssl_setup(desc->tls_cfg, 1); - } - - /* handle_tls_connection controls the single ref to tcptls_session. If - * tcptls_session returns NULL then the session has been destroyed */ - if (!(tcptls_session = handle_tls_connection(tcptls_session))) - goto error; - return tcptls_session; error: