use a packet buffer for ESL

This commit is contained in:
Anthony Minessale 2010-12-16 11:33:38 -06:00
parent c44b7a7465
commit 2081bf97b9
5 changed files with 141 additions and 75 deletions

View File

@ -276,7 +276,8 @@ bin_PROGRAMS = freeswitch fs_cli fs_ivrd tone2wav fs_encode
## ##
## fs_cli () ## fs_cli ()
## ##
fs_cli_SOURCES = libs/esl/src/esl.c libs/esl/src/esl_config.c libs/esl/src/esl_event.c libs/esl/src/esl_threadmutex.c libs/esl/fs_cli.c libs/esl/src/esl_json.c fs_cli_SOURCES = libs/esl/src/esl.c libs/esl/src/esl_config.c libs/esl/src/esl_event.c \
libs/esl/src/esl_threadmutex.c libs/esl/fs_cli.c libs/esl/src/esl_json.c libs/esl/src/esl_buffer.c
fs_cli_CFLAGS = $(AM_CFLAGS) -I$(switch_srcdir)/libs/esl/src/include fs_cli_CFLAGS = $(AM_CFLAGS) -I$(switch_srcdir)/libs/esl/src/include
fs_cli_LDFLAGS = $(AM_LDFLAGS) -lpthread $(ESL_LDFLAGS) -lm fs_cli_LDFLAGS = $(AM_LDFLAGS) -lpthread $(ESL_LDFLAGS) -lm
@ -304,7 +305,8 @@ tone2wav_LDADD = libfreeswitch.la
## ##
## fs_ivrd () ## fs_ivrd ()
## ##
fs_ivrd_SOURCES = libs/esl/src/esl.c libs/esl/src/esl_config.c libs/esl/src/esl_event.c libs/esl/src/esl_threadmutex.c libs/esl/ivrd.c libs/esl/src/esl_json.c fs_ivrd_SOURCES = libs/esl/src/esl.c libs/esl/src/esl_config.c libs/esl/src/esl_event.c \
libs/esl/src/esl_threadmutex.c libs/esl/ivrd.c libs/esl/src/esl_json.c libs/esl/src/esl_buffer.c
fs_ivrd_CFLAGS = $(AM_CFLAGS) -I$(switch_srcdir)/libs/esl/src/include fs_ivrd_CFLAGS = $(AM_CFLAGS) -I$(switch_srcdir)/libs/esl/src/include
fs_ivrd_LDFLAGS = $(AM_LDFLAGS) -lpthread $(ESL_LDFLAGS) -lm fs_ivrd_LDFLAGS = $(AM_LDFLAGS) -lpthread $(ESL_LDFLAGS) -lm

View File

@ -9,9 +9,9 @@ CXXFLAGS=$(BASE_FLAGS) -Wall -Werror -Wno-unused-variable
MYLIB=libesl.a MYLIB=libesl.a
LIBS=-lncurses -lpthread -lesl -lm LIBS=-lncurses -lpthread -lesl -lm
LDFLAGS=-L. LDFLAGS=-L.
OBJS=src/esl.o src/esl_event.o src/esl_threadmutex.o src/esl_config.o src/esl_json.o OBJS=src/esl.o src/esl_event.o src/esl_threadmutex.o src/esl_config.o src/esl_json.o src/esl_buffer.o
SRC=src/esl.c src/esl_json.c src/esl_event.c src/esl_threadmutex.c src/esl_config.c src/esl_oop.cpp src/esl_json.c SRC=src/esl.c src/esl_json.c src/esl_event.c src/esl_threadmutex.c src/esl_config.c src/esl_oop.cpp src/esl_json.c src/esl_buffer.c
HEADERS=src/include/esl_config.h src/include/esl_event.h src/include/esl.h src/include/esl_threadmutex.h src/include/esl_oop.h src/include/esl_json.h HEADERS=src/include/esl_config.h src/include/esl_event.h src/include/esl.h src/include/esl_threadmutex.h src/include/esl_oop.h src/include/esl_json.h src/include/esl_buffer.h
SOLINK=-shared -Xlinker -x SOLINK=-shared -Xlinker -x
# comment the next line to disable c++ (no swig mods for you then) # comment the next line to disable c++ (no swig mods for you then)
OBJS += src/esl_oop.o OBJS += src/esl_oop.o

View File

@ -428,6 +428,10 @@ ESL_DECLARE(esl_status_t) esl_attach_handle(esl_handle_t *handle, esl_socket_t s
esl_mutex_create(&handle->mutex); esl_mutex_create(&handle->mutex);
} }
if (!handle->packet_buf) {
esl_buffer_create(&handle->packet_buf, BUF_CHUNK, BUF_START, 0);
}
handle->connected = 1; handle->connected = 1;
esl_send_recv(handle, "connect\n\n"); esl_send_recv(handle, "connect\n\n");
@ -632,6 +636,10 @@ ESL_DECLARE(esl_status_t) esl_connect_timeout(esl_handle_t *handle, const char *
if (!handle->mutex) { if (!handle->mutex) {
esl_mutex_create(&handle->mutex); esl_mutex_create(&handle->mutex);
} }
if (!handle->packet_buf) {
esl_buffer_create(&handle->packet_buf, BUF_CHUNK, BUF_START, 0);
}
handle->sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); handle->sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
@ -805,6 +813,11 @@ ESL_DECLARE(esl_status_t) esl_disconnect(esl_handle_t *handle)
esl_mutex_destroy(&mutex); esl_mutex_destroy(&mutex);
} }
if (handle->packet_buf) {
esl_buffer_destroy(&handle->packet_buf);
}
return status; return status;
} }
@ -825,7 +838,7 @@ ESL_DECLARE(esl_status_t) esl_recv_event_timed(esl_handle_t *handle, uint32_t ms
if (check_q) { if (check_q) {
esl_mutex_lock(handle->mutex); esl_mutex_lock(handle->mutex);
if (handle->race_event) { if (handle->race_event || esl_buffer_packet_count(handle->packet_buf)) {
esl_mutex_unlock(handle->mutex); esl_mutex_unlock(handle->mutex);
return esl_recv_event(handle, check_q, save_event); return esl_recv_event(handle, check_q, save_event);
} }
@ -894,12 +907,15 @@ ESL_DECLARE(esl_status_t) esl_recv_event_timed(esl_handle_t *handle, uint32_t ms
} }
static esl_ssize_t handle_recv(esl_handle_t *handle, void *data, esl_size_t datalen)
{
return recv(handle->sock, data, datalen, 0);
}
ESL_DECLARE(esl_status_t) esl_recv_event(esl_handle_t *handle, int check_q, esl_event_t **save_event) ESL_DECLARE(esl_status_t) esl_recv_event(esl_handle_t *handle, int check_q, esl_event_t **save_event)
{ {
char *c; char *c;
esl_ssize_t rrval; esl_ssize_t rrval;
int crc = 0;
esl_event_t *revent = NULL; esl_event_t *revent = NULL;
char *beg; char *beg;
char *hname, *hval; char *hname, *hval;
@ -907,7 +923,6 @@ ESL_DECLARE(esl_status_t) esl_recv_event(esl_handle_t *handle, int check_q, esl_
char *cl; char *cl;
esl_ssize_t len; esl_ssize_t len;
int zc = 0; int zc = 0;
int bread = 0;
if (!handle || !handle->connected || handle->sock == ESL_SOCK_INVALID) { if (!handle || !handle->connected || handle->sock == ESL_SOCK_INVALID) {
return ESL_FAIL; return ESL_FAIL;
@ -916,9 +931,7 @@ ESL_DECLARE(esl_status_t) esl_recv_event(esl_handle_t *handle, int check_q, esl_
esl_mutex_lock(handle->mutex); esl_mutex_lock(handle->mutex);
if (!handle->connected || handle->sock == ESL_SOCK_INVALID) { if (!handle->connected || handle->sock == ESL_SOCK_INVALID) {
handle->connected = 0; goto fail;
esl_mutex_unlock(handle->mutex);
return ESL_FAIL;
} }
esl_event_safe_destroy(&handle->last_event); esl_event_safe_destroy(&handle->last_event);
@ -932,76 +945,62 @@ ESL_DECLARE(esl_status_t) esl_recv_event(esl_handle_t *handle, int check_q, esl_
goto parse_event; goto parse_event;
} }
memset(handle->header_buf, 0, sizeof(handle->header_buf));
while(!revent && handle->connected) {
esl_size_t len;
if ((len = esl_buffer_read_packet(handle->packet_buf, handle->socket_buf, sizeof(handle->socket_buf)))) {
char *data = (char *) handle->socket_buf;
char *p, *e;
esl_event_create(&revent, ESL_EVENT_CLONE);
revent->event_id = ESL_EVENT_SOCKET_DATA;
esl_event_add_header_string(revent, ESL_STACK_BOTTOM, "Event-Name", "SOCKET_DATA");
hname = p = data;
while(p) {
hname = p;
p = NULL;
c = handle->header_buf; if ((hval = strchr(hname, ':'))) {
beg = c; *hval++ = '\0';
while(*hval == ' ' || *hval == '\t') hval++;
while(handle->connected) { if ((e = strchr(hval, '\n'))) {
if (bread + 2 >= sizeof(handle->header_buf)) { *e++ = '\0';
esl_log(ESL_LOG_CRIT, "OUT OF BUFFER SPACE!\n"); while(*e == '\n' || *e == '\r') e++;
handle->connected = 0;
esl_mutex_unlock(handle->mutex); if (hname && hval) {
return ESL_DISCONNECTED; esl_url_decode(hval);
esl_log(ESL_LOG_DEBUG, "RECV HEADER [%s] = [%s]\n", hname, hval);
esl_event_add_header_string(revent, ESL_STACK_BOTTOM, hname, hval);
}
p = e;
}
}
}
break;
} }
rrval = recv(handle->sock, c, 1, 0); rrval = handle_recv(handle, handle->socket_buf, sizeof(handle->socket_buf));
if (rrval == 0) { if (rrval == 0) {
if (++zc >= 100) { if (++zc >= 100) {
handle->connected = 0; goto fail;
esl_mutex_unlock(handle->mutex);
return ESL_DISCONNECTED;
} }
continue;
} else if (rrval < 0) { } else if (rrval < 0) {
strerror_r(handle->errnum, handle->err, sizeof(handle->err)); strerror_r(handle->errnum, handle->err, sizeof(handle->err));
goto fail; goto fail;
} else {
zc = 0;
if (*c == '\n') {
*(c+1) = '\0';
if (++crc == 2) {
break;
}
if (!revent) {
esl_event_create(&revent, ESL_EVENT_CLONE);
revent->event_id = ESL_EVENT_SOCKET_DATA;
esl_event_add_header_string(revent, ESL_STACK_BOTTOM, "Event-Name", "SOCKET_DATA");
}
hname = beg;
hval = col = NULL;
if (hname && (col = strchr(hname, ':'))) {
hval = col + 1;
*col = '\0';
while(*hval == ' ') hval++;
}
*c = '\0';
if (hname && hval) {
esl_url_decode(hval);
esl_log(ESL_LOG_DEBUG, "RECV HEADER [%s] = [%s]\n", hname, hval);
esl_event_add_header_string(revent, ESL_STACK_BOTTOM, hname, hval);
}
c = beg;
bread = 0;
continue;
} else {
crc = 0;
}
c++;
} }
}
zc = 0;
esl_buffer_write(handle->packet_buf, handle->socket_buf, rrval);
}
if (!revent) { if (!revent) {
goto fail; goto fail;
} }
@ -1016,12 +1015,28 @@ ESL_DECLARE(esl_status_t) esl_recv_event(esl_handle_t *handle, int check_q, esl_
*(body + len) = '\0'; *(body + len) = '\0';
do { do {
esl_ssize_t r; esl_ssize_t r,s = esl_buffer_inuse(handle->packet_buf);
if ((r = recv(handle->sock, body + sofar, len - sofar, 0)) < 0) {
strerror_r(handle->errnum, handle->err, sizeof(handle->err)); if (s >= len) {
goto fail; sofar = esl_buffer_read(handle->packet_buf, body, len);
} else {
r = handle_recv(handle, handle->socket_buf, sizeof(handle->socket_buf));
if (r < 0) {
strerror_r(handle->errnum, handle->err, sizeof(handle->err));
goto fail;
} else if (r == 0) {
if (++zc >= 100) {
goto fail;
}
continue;
}
zc = 0;
esl_buffer_write(handle->packet_buf, handle->socket_buf, r);
} }
sofar += r;
} while (sofar < len); } while (sofar < len);
revent->body = body; revent->body = body;
@ -1123,6 +1138,8 @@ ESL_DECLARE(esl_status_t) esl_recv_event(esl_handle_t *handle, int check_q, esl_
fail: fail:
esl_mutex_unlock(handle->mutex);
handle->connected = 0; handle->connected = 0;
return ESL_FAIL; return ESL_FAIL;

View File

@ -251,6 +251,7 @@ typedef int esl_filehandle_t;
#include "esl_json.h" #include "esl_json.h"
typedef int16_t esl_port_t; typedef int16_t esl_port_t;
typedef size_t esl_size_t;
typedef enum { typedef enum {
ESL_SUCCESS, ESL_SUCCESS,
@ -259,7 +260,11 @@ typedef enum {
ESL_DISCONNECTED ESL_DISCONNECTED
} esl_status_t; } esl_status_t;
#define BUF_CHUNK 65536 * 50
#define BUF_START 65536 * 100
#include <esl_threadmutex.h> #include <esl_threadmutex.h>
#include <esl_buffer.h>
/*! \brief A handle that will hold the socket information and /*! \brief A handle that will hold the socket information and
different events received. */ different events received. */
@ -273,7 +278,8 @@ typedef struct {
/*! The error number reported by the OS */ /*! The error number reported by the OS */
int errnum; int errnum;
/*! The inner contents received by the socket. Used only internally. */ /*! The inner contents received by the socket. Used only internally. */
char header_buf[4196]; esl_buffer_t *packet_buf;
char socket_buf[65536];
/*! Last command reply */ /*! Last command reply */
char last_reply[1024]; char last_reply[1024];
/*! Las command reply when called with esl_send_recv */ /*! Las command reply when called with esl_send_recv */

View File

@ -6,6 +6,47 @@
int main(void) int main(void)
{ {
esl_handle_t handle = {{0}}; esl_handle_t handle = {{0}};
esl_buffer_t *buffer;
char doh[65536];
esl_buffer_create(&buffer, 32 * 1024, 32 * 1024, 0);
snprintf(doh, sizeof(doh), "TEST 1 FOO BAR 1234\n");
esl_buffer_write(buffer, doh, strlen(doh));
esl_buffer_write(buffer, doh, strlen(doh));
esl_buffer_write(buffer, doh, strlen(doh));
snprintf(doh, sizeof(doh), "TEST 1 END\n\n");
esl_buffer_write(buffer, doh, strlen(doh));
snprintf(doh, sizeof(doh), "TEST 2 BAR FOO 4321\n");
esl_buffer_write(buffer, doh, strlen(doh));
esl_buffer_write(buffer, doh, strlen(doh));
esl_buffer_write(buffer, doh, strlen(doh));
snprintf(doh, sizeof(doh), "TEST 2 END\n\n");
esl_buffer_write(buffer, doh, strlen(doh));
snprintf(doh, sizeof(doh), "TEST 2 BAR FOO 4321\n");
esl_buffer_write(buffer, doh, strlen(doh));
esl_buffer_write(buffer, doh, strlen(doh));
esl_buffer_write(buffer, doh, strlen(doh));
snprintf(doh, sizeof(doh), "TEST 2 END\n\n");
esl_buffer_write(buffer, doh, strlen(doh));
printf("COUNT %ld\n", esl_buffer_packet_count(buffer));
memset(doh, 0, sizeof(doh));
esl_buffer_read_packet(buffer, doh, sizeof(doh));
printf("TEST: [%s]\n", doh);
memset(doh, 0, sizeof(doh));
esl_buffer_read_packet(buffer, doh, sizeof(doh));
printf("TEST2: [%s]\n", doh);
return 0;
esl_connect(&handle, "localhost", 8021, NULL, "ClueCon"); esl_connect(&handle, "localhost", 8021, NULL, "ClueCon");