From 4f1b09a3ca9e6ac1d88fc2c002bf9facbd32ddc0 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Fri, 25 Jul 2014 00:23:08 +0500 Subject: [PATCH] add switch_socket_waitlist for waiting on multiple sockets and abstract it to poll and select so windows can use it too --- src/include/switch_types.h | 7 + src/include/switch_utils.h | 2 + src/mod/endpoints/mod_verto/mod_verto.c | 105 +++++++------ src/switch_utils.c | 193 ++++++++++++++++++++++++ 4 files changed, 261 insertions(+), 46 deletions(-) diff --git a/src/include/switch_types.h b/src/include/switch_types.h index 8b2fd99b9f..55f1bd1614 100644 --- a/src/include/switch_types.h +++ b/src/include/switch_types.h @@ -2398,6 +2398,13 @@ typedef enum { SWITCH_POLL_INVALID = (1 << 7) } switch_poll_t; +typedef struct switch_waitlist_s { + switch_os_socket_t sock; + uint32_t events; + uint32_t revents; +} switch_waitlist_t; + + SWITCH_END_EXTERN_C #endif /* For Emacs: diff --git a/src/include/switch_utils.h b/src/include/switch_utils.h index fc6c06861b..f1903c8c29 100644 --- a/src/include/switch_utils.h +++ b/src/include/switch_utils.h @@ -1067,6 +1067,8 @@ SWITCH_DECLARE(unsigned long) switch_atoul(const char *nptr); */ SWITCH_DECLARE(char *) switch_strerror_r(int errnum, char *buf, switch_size_t buflen); SWITCH_DECLARE(int) switch_wait_sock(switch_os_socket_t sock, uint32_t ms, switch_poll_t flags); +SWITCH_DECLARE(int) switch_wait_socklist(switch_waitlist_t *waitlist, uint32_t len, uint32_t ms); + SWITCH_END_EXTERN_C #endif /* For Emacs: diff --git a/src/mod/endpoints/mod_verto/mod_verto.c b/src/mod/endpoints/mod_verto/mod_verto.c index b1568dcfa9..4530505430 100644 --- a/src/mod/endpoints/mod_verto/mod_verto.c +++ b/src/mod/endpoints/mod_verto/mod_verto.c @@ -3255,11 +3255,66 @@ static void handle_mcast_sub(verto_profile_t *profile) } +static int profile_one_loop(verto_profile_t *profile) +{ + switch_waitlist_t pfds[MAX_BIND+4]; + int res, x = 0; + int i = 0; + int max = 2; + + memset(&pfds[0], 0, sizeof(pfds[0]) * MAX_BIND+2); + + for (i = 0; i < profile->i; i++) { + pfds[i].sock = profile->server_socket[i]; + pfds[i].events = SWITCH_POLL_READ|SWITCH_POLL_ERROR; + } + + if (profile->mcast_ip) { + pfds[i].sock = profile->mcast_sub.sock; + pfds[i++].events = SWITCH_POLL_READ|SWITCH_POLL_ERROR; + } + + max = i; + + if ((res = switch_wait_socklist(pfds, max, 1000)) < 0) { + if (errno != EINTR) { + die("POLL FAILED\n"); + } + } + + if (res == 0) { + return 0; + } + + for (x = 0; x < max; x++) { + if (pfds[x].revents & SWITCH_POLL_ERROR) { + die("POLL ERROR\n"); + } + + if (pfds[x].revents & SWITCH_POLL_HUP) { + die("POLL HUP\n"); + } + + if (pfds[x].revents & SWITCH_POLL_READ) { + if (pfds[x].sock == profile->mcast_sub.sock) { + handle_mcast_sub(profile); + } else { + start_jsock(profile, pfds[x].sock); + } + } + } + + return res; + + error: + return -1; +} + + static int runtime(verto_profile_t *profile) { - int max = 2; int i; - + for (i = 0; i < profile->i; i++) { if ((profile->server_socket[i] = prepare_socket(profile->ip[i].local_ip_addr, profile->ip[i].local_port)) < 0) { die("Client Socket Error!\n"); @@ -3281,50 +3336,8 @@ static int runtime(verto_profile_t *profile) while(profile->running) { - struct pollfd pfds[MAX_BIND+4]; - int res, x = 0; - int i = 0; - - memset(&pfds[0], 0, sizeof(pfds[0]) * MAX_BIND+2); - - for (i = 0; i < profile->i; i++) { - pfds[i].fd = profile->server_socket[i]; - pfds[i].events = POLLIN|POLLERR; - } - - if (profile->mcast_ip) { - pfds[i].fd = profile->mcast_sub.sock; - pfds[i++].events = POLLIN|POLLERR; - } - - max = i; - - if ((res = poll(pfds, max, 1000)) < 0) { - if (errno != EINTR) { - die("POLL FAILED\n"); - } - } - - if (res == 0) { - continue; - } - - for (x = 0; x < max; x++) { - if (pfds[x].revents & POLLERR) { - die("POLL ERROR\n"); - } - - if (pfds[x].revents & POLLHUP) { - die("POLL HUP\n"); - } - - if (pfds[x].revents & POLLIN) { - if (pfds[x].fd == profile->mcast_sub.sock) { - handle_mcast_sub(profile); - } else { - start_jsock(profile, pfds[x].fd); - } - } + if (profile_one_loop(profile) < 0) { + goto error; } } diff --git a/src/switch_utils.c b/src/switch_utils.c index a45187b2cc..6e7dfeacc3 100644 --- a/src/switch_utils.c +++ b/src/switch_utils.c @@ -2570,6 +2570,94 @@ SWITCH_DECLARE(int) switch_wait_sock(switch_os_socket_t sock, uint32_t ms, switc } +SWITCH_DECLARE(int) switch_wait_socklist(switch_waitlist_t *waitlist, uint32_t len, uint32_t ms) +{ + struct pollfd *pfds; + int s = 0, r = 0, i; + + pfds = calloc(len, sizeof(struct pollfd)); + + for (i = 0; i < len; i++) { + if (waitlist[i].sock == SWITCH_SOCK_INVALID) { + break; + } + + pfds[i].fd = waitlist[i].sock; + + if ((waitlist[i].events & SWITCH_POLL_READ)) { + pfds[i].events |= POLLIN; + } + + if ((waitlist[i].events & SWITCH_POLL_WRITE)) { + pfds[i].events |= POLLOUT; + } + + if ((waitlist[i].events & SWITCH_POLL_ERROR)) { + pfds[i].events |= POLLERR; + } + + if ((waitlist[i].events & SWITCH_POLL_HUP)) { + pfds[i].events |= POLLHUP; + } + + if ((waitlist[i].events & SWITCH_POLL_RDNORM)) { + pfds[i].events |= POLLRDNORM; + } + + if ((waitlist[i].events & SWITCH_POLL_RDBAND)) { + pfds[i].events |= POLLRDBAND; + } + + if ((waitlist[i].events & SWITCH_POLL_PRI)) { + pfds[i].events |= POLLPRI; + } + } + + s = poll(pfds, len, ms); + + if (s < 0) { + r = s; + } else if (s > 0) { + for (i = 0; i < len; i++) { + if ((pfds[i].revents & POLLIN)) { + r |= SWITCH_POLL_READ; + waitlist[i].revents |= SWITCH_POLL_READ; + } + if ((pfds[i].revents & POLLOUT)) { + r |= SWITCH_POLL_WRITE; + waitlist[i].revents |= SWITCH_POLL_WRITE; + } + if ((pfds[i].revents & POLLERR)) { + r |= SWITCH_POLL_ERROR; + waitlist[i].revents |= SWITCH_POLL_ERROR; + } + if ((pfds[i].revents & POLLHUP)) { + r |= SWITCH_POLL_HUP; + waitlist[i].revents |= SWITCH_POLL_HUP; + } + if ((pfds[i].revents & POLLRDNORM)) { + r |= SWITCH_POLL_RDNORM; + waitlist[i].revents |= SWITCH_POLL_RDNORM; + } + if ((pfds[i].revents & POLLRDBAND)) { + r |= SWITCH_POLL_RDBAND; + waitlist[i].revents |= SWITCH_POLL_RDBAND; + } + if ((pfds[i].revents & POLLPRI)) { + r |= SWITCH_POLL_PRI; + waitlist[i].revents |= SWITCH_POLL_PRI; + } + if ((pfds[i].revents & POLLNVAL)) { + r |= SWITCH_POLL_INVALID; + waitlist[i].revents |= SWITCH_POLL_INVALID; + } + } + } + + return r; + +} + #else /* use select instead of poll */ SWITCH_DECLARE(int) switch_wait_sock(switch_os_socket_t sock, uint32_t ms, switch_poll_t flags) @@ -2660,6 +2748,111 @@ SWITCH_DECLARE(int) switch_wait_sock(switch_os_socket_t sock, uint32_t ms, switc return r; +} + +SWITCH_DECLARE(int) switch_wait_socklist(switch_waitlist_t *waitlist, uint32_t len, uint32_t ms) +{ + int s = 0, r = 0; + fd_set *rfds; + fd_set *wfds; + fd_set *efds; + struct timeval tv; + int i; + switch_os_socket_t max_fd = 0; + int flags = 0; + + rfds = malloc(sizeof(fd_set)); + wfds = malloc(sizeof(fd_set)); + efds = malloc(sizeof(fd_set)); + + FD_ZERO(rfds); + FD_ZERO(wfds); + FD_ZERO(efds); + + for (i = 0; i < len; i++) { + if (waitlist[i].sock == SWITCH_SOCK_INVALID) { + break; + } + + if (waitlist[i].sock > max_fd) { + max_fd = waitlist[i].sock; + } + +#ifndef WIN32 + /* Wouldn't you rather know?? */ + assert(waitlist[i].sock <= FD_SETSIZE); +#endif + flags |= waitlist[i].events; + + if ((waitlist[i].events & SWITCH_POLL_READ)) { + +#ifdef WIN32 +#pragma warning( push ) +#pragma warning( disable : 4127 ) + FD_SET(waitlist[i].sock, rfds); +#pragma warning( pop ) +#else + FD_SET(waitlist[i].sock, rfds); +#endif + } + + if ((waitlist[i].events & SWITCH_POLL_WRITE)) { + +#ifdef WIN32 +#pragma warning( push ) +#pragma warning( disable : 4127 ) + FD_SET(waitlist[i].sock, wfds); +#pragma warning( pop ) +#else + FD_SET(waitlist[i].sock, wfds); +#endif + } + + if ((waitlist[i].events & SWITCH_POLL_ERROR)) { + +#ifdef WIN32 +#pragma warning( push ) +#pragma warning( disable : 4127 ) + FD_SET(waitlist[i].sock, efds); +#pragma warning( pop ) +#else + FD_SET(waitlist[i].sock, efds); +#endif + } + } + + tv.tv_sec = ms / 1000; + tv.tv_usec = (ms % 1000) * ms; + + s = select(max_fd + 1, (flags & SWITCH_POLL_READ) ? rfds : NULL, (flags & SWITCH_POLL_WRITE) ? wfds : NULL, (flags & SWITCH_POLL_ERROR) ? efds : NULL, &tv); + + if (s < 0) { + r = s; + } else if (s > 0) { + for (i = 0; i < len; i++) { + if ((waitlist[i].events & SWITCH_POLL_READ) && FD_ISSET(waitlist[i].sock, rfds)) { + r |= SWITCH_POLL_READ; + waitlist[i].revents |= SWITCH_POLL_READ; + } + + if ((waitlist[i].events & SWITCH_POLL_WRITE) && FD_ISSET(waitlist[i].sock, wfds)) { + r |= SWITCH_POLL_WRITE; + waitlist[i].revents |= SWITCH_POLL_WRITE; + } + + if ((waitlist[i].events & SWITCH_POLL_ERROR) && FD_ISSET(waitlist[i].sock, efds)) { + r |= SWITCH_POLL_ERROR; + waitlist[i].revents |= SWITCH_POLL_ERROR; + } + } + } + + free(rfds); + free(wfds); + free(efds); + + return r; + } #endif