libosmonetify SMPP

Use libosmo-netif instead of socket interface in OsmoMSC's SMPP.

Closes: OS#5568
Change-Id: Idc2e60af1010783e555e61b114ae61f55a89d890
This commit is contained in:
Max 2022-10-27 17:31:00 +03:00
parent 11c8116dd7
commit 53cd9e06b8
5 changed files with 132 additions and 152 deletions

View File

@ -1,7 +1,16 @@
#pragma once
#include <inttypes.h>
#include <smpp34_structs.h>
#include <osmocom/msc/gsm_data.h>
#define SMPP_MAX_MSG_SIZE (MAX_TLV_SIZE + \
MAX_DAD_SIZE + \
SERVICE_TYPE_LENGTH + \
ADDRESS_LENGTH + \
TIME_LENGTH + \
SHORT_MESSAGE_LENGTH)
/* Length limits according to SMPP 3.4 spec including NUL-byte: */
#define SMPP_SYS_ID_LEN 15
#define SMPP_PASSWD_LEN 8
@ -19,6 +28,7 @@ struct esme {
enum esme_read_state read_state;
uint32_t read_len;
uint32_t read_idx;
struct osmo_stream_srv *srv; /* represents the TCP connection we accept()ed for this ESME */
struct msgb *read_msg;
uint8_t smpp_version;

View File

@ -87,7 +87,7 @@ void smpp_cmd_err(struct osmo_smpp_cmd *cmd, uint32_t status);
void smpp_cmd_flush_pending(struct smpp_esme *esme);
struct smsc {
struct osmo_fd listen_ofd;
struct osmo_stream_srv_link *link;
struct llist_head esme_list;
struct llist_head acl_list;
struct llist_head route_list;

View File

@ -846,7 +846,7 @@ int smpp_msc_start(struct gsm_network *net)
* default SMPP port, see smpp_smsc_bind()). */
rc = smpp_smsc_start(g_smsc, g_smsc->bind_addr, g_smsc->listen_port);
if (rc < 0)
return rc;
return rc;
rc = osmo_signal_register_handler(SS_SMS, smpp_sms_cb, g_smsc);
if (rc < 0)

View File

@ -38,6 +38,7 @@
#include <osmocom/core/logging.h>
#include <osmocom/core/write_queue.h>
#include <osmocom/core/talloc.h>
#include <osmocom/netif/stream.h>
#include <osmocom/gsm/protocol/gsm_04_11.h>
#include <osmocom/msc/debug.h>
@ -159,10 +160,6 @@ void smpp_acl_delete(struct osmo_smpp_acl *acl)
/* kill any active ESMEs */
if (acl->esme) {
struct esme *esme = acl->esme->esme;
osmo_fd_unregister(&esme->wqueue.bfd);
close(esme->wqueue.bfd.fd);
esme->wqueue.bfd.fd = -1;
smpp_esme_put(acl->esme);
acl->esme = NULL;
}
@ -238,15 +235,14 @@ void smpp_esme_get(struct smpp_esme *esme)
static void esme_destroy(struct smpp_esme *esme)
{
osmo_wqueue_clear(&esme->esme->wqueue);
if (esme->esme->wqueue.bfd.fd >= 0) {
osmo_fd_unregister(&esme->esme->wqueue.bfd);
close(esme->esme->wqueue.bfd.fd);
}
if (esme->use < 0)
return;
smpp_cmd_flush_pending(esme);
llist_del(&esme->list);
if (esme->acl)
esme->acl->esme = NULL;
osmo_stream_srv_destroy(esme->esme->srv);
talloc_free(esme);
}
@ -333,7 +329,7 @@ int pack_and_send(struct esme *esme, uint32_t type, void *ptr)
/* the socket was closed. Avoid allocating + enqueueing msgb, see
* https://osmocom.org/issues/3278 */
if (esme->wqueue.bfd.fd == -1)
if (!esme->srv)
return -EIO;
msg = msgb_alloc(4096, "SMPP_Tx");
@ -348,11 +344,8 @@ int pack_and_send(struct esme *esme, uint32_t type, void *ptr)
}
msgb_put(msg, rlen);
if (osmo_wqueue_enqueue(&esme->wqueue, msg) != 0) {
LOGPESME(esme, LOGL_ERROR, "Write queue full. Dropping message\n");
msgb_free(msg);
return -EAGAIN;
}
osmo_stream_srv_send(esme->srv, msg);
return 0;
}
@ -761,115 +754,107 @@ static int smpp_pdu_rx(struct smpp_esme *esme, struct msgb *msg __uses)
return rc;
}
/* This macro should be called after a call to read() in the read_cb of an
* osmo_fd to properly check for errors.
* rc is the return value of read, err_label is the label to jump to in case of
* an error. The code there should handle closing the connection.
* FIXME: This code should go in libosmocore utils.h so it can be used by other
* projects as well.
* */
#define OSMO_FD_CHECK_READ(rc, err_label) \
if (rc < 0) { \
/* EINTR is a non-fatal error, just try again */ \
if (errno == EINTR) \
return 0; \
goto err_label; \
} else if (rc == 0) { \
goto err_label; \
/* call-back when per-ESME TCP socket has some data to be read */
int esme_read_callback(struct esme *esme, struct osmo_stream_srv *conn)
{
uint32_t smpp_size = esme->read_msg ? esme->read_msg->cb[0] : 0;
int rc;
if (!esme->read_msg) {
esme->read_msg = msgb_alloc(SMPP_MAX_MSG_SIZE, "SMPP Rx");
if (esme->read_msg == NULL) {
LOGPESME(esme, LOGL_ERROR, "unable to allocate %u bytes for message buffer\n", SMPP_MAX_MSG_SIZE);
return -ENOMEM;
}
esme->read_msg->cb[0] = 0;
}
/* !\brief call-back when per-ESME TCP socket has some data to be read */
static int esme_link_read_cb(struct osmo_fd *ofd)
{
struct smpp_esme *e = ofd->data;
struct esme *esme = e->esme;
uint32_t len;
uint8_t *lenptr = (uint8_t *) &len;
uint8_t *cur;
struct msgb *msg;
ssize_t rdlen, rc;
rc = osmo_stream_srv_recv(conn, esme->read_msg); /* N. B: the data will be appended to previously received (if any) */
if (rc <= 0) {
LOGPESME(esme, LOGL_ERROR, "unable to receive message: %s\n", strerror(-rc));
msgb_reset(esme->read_msg);
return -ENOMSG;
}
switch (esme->read_state) {
case READ_ST_IN_LEN:
rdlen = sizeof(uint32_t) - esme->read_idx;
rc = read(ofd->fd, lenptr + esme->read_idx, rdlen);
if (rc < 0)
LOGPESME(esme, LOGL_ERROR, "read returned %zd (%s)\n", rc, strerror(errno));
OSMO_FD_CHECK_READ(rc, dead_socket);
/* we've received rc bytes of smth */
esme->read_idx += rc;
if (esme->read_msg->cb[0] == 0) { /* we're expecting SMPP_LENGTH */
if (msgb_length(esme->read_msg) < 4)
return 0; /* incomplete read - need more data to obtain SMPP_LENGTH */
if (esme->read_idx >= sizeof(uint32_t)) {
esme->read_len = ntohl(len);
if (esme->read_len < 8 || esme->read_len > UINT16_MAX) {
LOGPESME(esme, LOGL_ERROR, "length invalid %u\n", esme->read_len);
goto dead_socket;
}
msg = msgb_alloc(esme->read_len, "SMPP Rx");
if (!msg)
return -ENOMEM;
esme->read_msg = msg;
cur = msgb_put(msg, sizeof(uint32_t));
memcpy(cur, lenptr, sizeof(uint32_t));
esme->read_state = READ_ST_IN_MSG;
esme->read_idx = sizeof(uint32_t);
}
break;
case READ_ST_IN_MSG:
msg = esme->read_msg;
rdlen = esme->read_len - esme->read_idx;
rc = read(ofd->fd, msg->tail, OSMO_MIN(rdlen, msgb_tailroom(msg)));
if (rc < 0)
LOGPESME(esme, LOGL_ERROR, "read returned %zd (%s)\n",
rc, strerror(errno));
OSMO_FD_CHECK_READ(rc, dead_socket);
esme->read_idx += rc;
msgb_put(msg, rc);
if (esme->read_idx >= esme->read_len) {
rc = smpp_pdu_rx(e, esme->read_msg);
smpp_size = osmo_load32be(msgb_data(esme->read_msg));
if (smpp_size < 8 || smpp_size > UINT16_MAX) {
LOGPESME(esme, LOGL_ERROR, "SMPP length %u invalid: not in [%u; %u]\n", smpp_size, 8, UINT16_MAX);
msgb_free(esme->read_msg);
esme->read_msg = NULL;
esme->read_idx = 0;
esme->read_len = 0;
esme->read_state = READ_ST_IN_LEN;
return -EBADF;
}
/* SMPP_LENGTH is correct, let's keep and use it */
esme->read_msg->cb[0] = smpp_size;
}
/* we have already received SMPP_LENGTH, we're expecting data */
/* check that we can receive expected amount of data */
if (esme->read_msg->data_len < smpp_size) { /* we have to resize the message */
struct msgb *new_msg = msgb_copy_resize(esme->read_msg, smpp_size, "SMPP Rx");
if (new_msg == NULL) {
LOGPESME(esme, LOGL_ERROR, "unable to reallocate %u bytes for message buffer\n", smpp_size);
return -ENOMEM;
}
new_msg->cb[0] = esme->read_msg->cb[0];
msgb_free(esme->read_msg);
esme->read_msg = new_msg;
return 0; /* incomplete read - now we're ready to receive data for entire SMPP packet */
}
/* check if we've already got all we've expected */
if (msgb_length(esme->read_msg) != smpp_size) {
/* check that the protocol message size matches the size of received message buffer */
return 0; /* incomplete read - we need more data to obtain complete SMPP packet */
}
/* we have both the length and expected amount of data */
return 1;
}
/* !\brief call-back when per-ESME TCP socket has some data to be read */
static int esme_link_read_cb(struct osmo_stream_srv *conn)
{
struct smpp_esme *e = osmo_stream_srv_get_data(conn);
struct esme *esme = e->esme;
int rc = esme_read_callback(esme, conn);
switch (rc) {
case 1:
rc = smpp_pdu_rx(e, esme->read_msg);
msgb_reset(esme->read_msg);
break;
case -ENOMEM:
case -ENOMSG:
case -EBADF:
if (e->acl)
e->acl->esme = NULL;
smpp_esme_put(e);
break;
default:
return rc;
}
return 0;
dead_socket:
msgb_free(esme->read_msg);
osmo_fd_unregister(&esme->wqueue.bfd);
close(esme->wqueue.bfd.fd);
esme->wqueue.bfd.fd = -1;
if (e->acl)
e->acl->esme = NULL;
smpp_esme_put(e);
return -EBADF;
}
/* call-back of write queue once it wishes to write a message to the socket */
static int esme_link_write_cb(struct osmo_fd *ofd, struct msgb *msg)
/* call-back when new connection is closed on ESME */
static int esme_link_close_cb(struct osmo_stream_srv *conn)
{
struct smpp_esme *esme = ofd->data;
int rc;
struct smpp_esme *esme = osmo_stream_srv_get_data(conn);
rc = write(ofd->fd, msgb_data(msg), msgb_length(msg));
if (rc == 0) {
osmo_fd_unregister(&esme->esme->wqueue.bfd);
close(esme->esme->wqueue.bfd.fd);
esme->esme->wqueue.bfd.fd = -1;
if (esme->acl)
esme->acl->esme = NULL;
smpp_esme_put(esme);
} else if (rc < msgb_length(msg)) {
LOGPESME(esme->esme, LOGL_ERROR, "Short write\n");
return -1;
}
LOGPESME(esme->esme, LOGL_NOTICE, "Connection lost\n");
smpp_esme_put(esme);
return 0;
}
@ -882,15 +867,14 @@ struct esme *esme_alloc(void *ctx)
e->own_seq_nr = rand();
esme_inc_seq_nr(e);
osmo_wqueue_init(&e->wqueue, 10);
return e;
}
/* callback for already-accepted new TCP socket */
static int link_accept_cb(struct smsc *smsc, int fd,
struct sockaddr_storage *s, socklen_t s_len)
static int link_accept_cb(struct osmo_stream_srv_link *link, int fd)
{
struct smsc *smsc = osmo_stream_srv_link_get_data(link);
struct smpp_esme *esme = talloc_zero(smsc, struct smpp_esme);
if (!esme) {
close(fd);
@ -906,38 +890,20 @@ static int link_accept_cb(struct smsc *smsc, int fd,
INIT_LLIST_HEAD(&esme->smpp_cmd_list);
smpp_esme_get(esme);
esme->smsc = smsc;
osmo_fd_setup(&esme->esme->wqueue.bfd, fd, OSMO_FD_READ, osmo_wqueue_bfd_cb, esme, 0);
if (osmo_fd_register(&esme->esme->wqueue.bfd) != 0) {
close(fd);
esme->esme->srv = osmo_stream_srv_create(esme, link, fd, esme_link_read_cb, esme_link_close_cb, esme);
if (!esme->esme->srv) {
talloc_free(esme);
return -EIO;
}
esme->esme->wqueue.read_cb = esme_link_read_cb;
esme->esme->wqueue.write_cb = esme_link_write_cb;
LOGPESME(esme->esme, LOGL_NOTICE, "accepted SMPP client: %s\n", osmo_sock_get_name2(fd));
llist_add_tail(&esme->list, &smsc->esme_list);
return 0;
}
/* callback of listening TCP socket */
static int smsc_fd_cb(struct osmo_fd *ofd, unsigned int what)
{
int rc;
struct sockaddr_storage sa;
socklen_t sa_len = sizeof(sa);
rc = accept(ofd->fd, (struct sockaddr *)&sa, &sa_len);
if (rc < 0) {
LOGP(DSMPP, LOGL_ERROR, "Accept returns %d (%s)\n",
rc, strerror(errno));
return rc;
}
return link_accept_cb(ofd->data, rc, &sa, sa_len);
}
/*! \brief allocate and initialize an smsc struct from talloc context ctx. */
struct smsc *smpp_smsc_alloc_init(void *ctx)
{
@ -947,8 +913,14 @@ struct smsc *smpp_smsc_alloc_init(void *ctx)
INIT_LLIST_HEAD(&smsc->acl_list);
INIT_LLIST_HEAD(&smsc->route_list);
smsc->listen_ofd.data = smsc;
smsc->listen_ofd.cb = smsc_fd_cb;
smsc->link = osmo_stream_srv_link_create(smsc);
if (!smsc->link)
return NULL;
osmo_stream_srv_link_set_data(smsc->link, smsc);
osmo_stream_srv_link_set_accept_cb(smsc->link, link_accept_cb);
osmo_stream_srv_link_set_proto(smsc->link, IPPROTO_TCP);
osmo_stream_srv_link_set_nodelay(smsc->link, true);
return smsc;
}
@ -974,14 +946,16 @@ int smpp_smsc_start(struct smsc *smsc, const char *bind_addr, uint16_t port)
{
int rc;
LOGP(DSMPP, LOGL_NOTICE, "SMPP at %s %d\n",
bind_addr ? bind_addr : "0.0.0.0", port ? port : SMPP_PORT);
osmo_stream_srv_link_set_addr(smsc->link, bind_addr ? bind_addr : "0.0.0.0");
osmo_stream_srv_link_set_port(smsc->link, port ? port : SMPP_PORT);
rc = osmo_sock_init_ofd(&smsc->listen_ofd, AF_UNSPEC, SOCK_STREAM,
IPPROTO_TCP, bind_addr, port ? port : SMPP_PORT,
OSMO_SOCK_F_BIND);
if (rc < 0)
rc = osmo_stream_srv_link_open(smsc->link);
if (rc < 0) {
LOGP(DSMPP, LOGL_ERROR, "SMPP socket cannot be opened: %s\n", strerror(-rc));
return rc;
}
LOGP(DSMPP, LOGL_NOTICE, "SMPP at %s\n", osmo_stream_srv_link_get_sockname(smsc->link));
/* store new address and port */
rc = smpp_smsc_conf(smsc, bind_addr, port ? port : SMPP_PORT);
@ -1008,9 +982,5 @@ int smpp_smsc_restart(struct smsc *smsc, const char *bind_addr, uint16_t port)
/*! /brief Close SMPP connection. */
void smpp_smsc_stop(struct smsc *smsc)
{
if (smsc->listen_ofd.fd > 0) {
close(smsc->listen_ofd.fd);
smsc->listen_ofd.fd = 0;
osmo_fd_unregister(&smsc->listen_ofd);
}
osmo_stream_srv_link_close(smsc->link);
}

View File

@ -27,7 +27,7 @@
#include <osmocom/vty/command.h>
#include <osmocom/vty/buffer.h>
#include <osmocom/vty/vty.h>
#include <osmocom/netif/stream.h>
#include <osmocom/core/linuxlist.h>
#include <osmocom/core/utils.h>
#include <osmocom/core/socket.h>
@ -80,7 +80,7 @@ static int smpp_local_tcp(struct vty *vty,
const char *bind_addr, uint16_t port)
{
struct smsc *smsc = smsc_from_vty(vty);
int is_running = smsc->listen_ofd.fd > 0;
bool is_running = smsc->link;
int same_bind_addr;
int rc;
@ -187,8 +187,8 @@ static int config_write_smpp(struct vty *vty)
vty_out(vty, " local-tcp-ip %s %u%s", smsc->bind_addr,
smsc->listen_port, VTY_NEWLINE);
else
vty_out(vty, " local-tcp-port %u%s", smsc->listen_port,
VTY_NEWLINE);
vty_out(vty, " local-tcp-port %u%s", smsc->listen_port ? smsc->listen_port : SMPP_PORT, VTY_NEWLINE);
if (strlen(smsc->system_id) > 0)
vty_out(vty, " system-id %s%s", smsc->system_id, VTY_NEWLINE);
vty_out(vty, " policy %s%s",
@ -527,7 +527,7 @@ static void dump_one_esme(struct vty *vty, struct smpp_esme *esme)
vty_out(vty, "ESME System ID: %s, Password: %s, SMPP Version %02x%s",
esme->esme->system_id, esme->acl ? esme->acl->passwd : "",
esme->smpp_version, VTY_NEWLINE);
vty_out(vty, " Connection %s%s", osmo_sock_get_name(tall_vty_ctx, esme->esme->wqueue.bfd.fd), VTY_NEWLINE);
vty_out(vty, " Connection %s%s", osmo_stream_srv_link_get_sockname(esme->smsc->link), VTY_NEWLINE);
if (esme->smsc->def_route == esme->acl)
vty_out(vty, " Is current default route%s", VTY_NEWLINE);
}