stream_cli: Add res param to read_cb2

Notify user about read errors, similar to what is supported in the
earlier ofd cb backend of osmo_stream_cli:
https://osmocom.org/issues/6405#note-15

Related: OS#6405
Fixes: 5fec34a9f2
Fixes: 0245cf5e07
Change-Id: I395c75ff1e9904757ce1d767a9ac2f779593c4c8
This commit is contained in:
Pau Espin 2024-04-16 19:11:58 +02:00
parent 211c15d543
commit 1957db8ecd
5 changed files with 58 additions and 16 deletions

View File

@ -102,13 +102,19 @@ static int connect_cb(struct osmo_stream_cli *conn)
return 0;
}
static int read_cb(struct osmo_stream_cli *conn, struct msgb *msg)
static int read_cb(struct osmo_stream_cli *conn, int res, struct msgb *msg)
{
int num;
struct msg_sent *cur, *tmp, *found = NULL;
LOGP(DIPATEST, LOGL_DEBUG, "received message from stream (payload len=%d)\n", msgb_length(msg));
if (res <= 0) {
LOGP(DIPATEST, LOGL_ERROR, "Event with no data! %d\n", res);
msgb_free(msg);
return 0;
}
if (osmo_ipa_process_msg(msg) < 0) {
LOGP(DIPATEST, LOGL_ERROR, "bad IPA message\n");
msgb_free(msg);

View File

@ -54,10 +54,17 @@ static int disconnect_cb(struct osmo_stream_cli *conn)
return 0;
}
static int read_cb(struct osmo_stream_cli *conn, struct msgb *msg)
static int read_cb(struct osmo_stream_cli *conn, int res, struct msgb *msg)
{
LOGP(DSTREAMTEST, LOGL_NOTICE, "receiving message from stream... ");
if (res < 0) {
LOGPC(DSTREAMTEST, LOGL_ERROR, "cannot receive message (res = %d)\n", res);
msgb_free(msg);
return 0;
}
LOGPC(DSTREAMTEST, LOGL_NOTICE, "got %d bytes: %s\n", msg->len, msgb_hexdump(msg));
msgb_free(msg);

View File

@ -160,6 +160,13 @@ void osmo_stream_srv_clear_tx_queue(struct osmo_stream_srv *conn);
/*! \brief Osmocom Stream Client: Single client connection */
struct osmo_stream_cli;
/*! Completion call-back function when something was read from from the stream client socket.
* \param[in] cli Stream Client that got receive event.
* \param[in] res return value of the read()/recvmsg()/... call, or -errno in case of error.
* \param[in] msg message buffer containing the read data. Ownership is transferred to the
* call-back, and it must make sure to msgb_free() it eventually! */
typedef int (*osmo_stream_cli_read_cb2_t)(struct osmo_stream_cli *cli, int res, struct msgb *msg);
void osmo_stream_cli_set_name(struct osmo_stream_cli *cli, const char *name);
const char *osmo_stream_cli_get_name(const struct osmo_stream_cli *cli);
void osmo_stream_cli_set_nodelay(struct osmo_stream_cli *cli, bool nodelay);
@ -182,7 +189,7 @@ struct osmo_io_fd *osmo_stream_cli_get_iofd(const struct osmo_stream_cli *cli);
void osmo_stream_cli_set_connect_cb(struct osmo_stream_cli *cli, int (*connect_cb)(struct osmo_stream_cli *cli));
void osmo_stream_cli_set_disconnect_cb(struct osmo_stream_cli *cli, int (*disconnect_cb)(struct osmo_stream_cli *cli));
void osmo_stream_cli_set_read_cb(struct osmo_stream_cli *cli, int (*read_cb)(struct osmo_stream_cli *cli));
void osmo_stream_cli_set_read_cb2(struct osmo_stream_cli *cli, int (*read_cb)(struct osmo_stream_cli *cli, struct msgb *msg));
void osmo_stream_cli_set_read_cb2(struct osmo_stream_cli *cli, osmo_stream_cli_read_cb2_t read_cb);
void osmo_stream_cli_set_segmentation_cb(struct osmo_stream_cli *cli, int (*segmentation_cb)(struct msgb *msg));
void osmo_stream_cli_reconnect(struct osmo_stream_cli *cli);
bool osmo_stream_cli_is_connected(struct osmo_stream_cli *cli);

View File

@ -106,7 +106,7 @@ struct osmo_stream_cli {
int (*connect_cb)(struct osmo_stream_cli *cli);
int (*disconnect_cb)(struct osmo_stream_cli *cli);
int (*read_cb)(struct osmo_stream_cli *cli);
int (*iofd_read_cb)(struct osmo_stream_cli *cli, struct msgb *msg);
osmo_stream_cli_read_cb2_t iofd_read_cb;
int (*write_cb)(struct osmo_stream_cli *cli);
int (*segmentation_cb)(struct msgb *msg);
void *data;
@ -449,14 +449,22 @@ static void stream_cli_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msg
stream_cli_handle_connecting(cli, res);
break;
case STREAM_CLI_STATE_CONNECTED:
if (res <= 0) {
LOGSCLI(cli, LOGL_NOTICE, "received result %d in response to read\n", res);
switch (res) {
case -EPIPE:
case -ECONNRESET:
LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", res);
osmo_stream_cli_reconnect(cli);
msgb_free(msg);
break;
case 0:
LOGSCLI(cli, LOGL_NOTICE, "connection closed with srv\n");
osmo_stream_cli_reconnect(cli);
break;
default:
break;
}
/* Notify user of new data or error: */
if (cli->iofd_read_cb)
cli->iofd_read_cb(cli, msg);
cli->iofd_read_cb(cli, res, msg);
else
msgb_free(msg);
break;
@ -504,15 +512,22 @@ static void stream_cli_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, struct
stream_cli_handle_connecting(cli, res);
break;
case STREAM_CLI_STATE_CONNECTED:
if (res <= 0) {
LOGSCLI(cli, LOGL_NOTICE, "received result %d in response to recvmsg\n", res);
switch (res) {
case -EPIPE:
case -ECONNRESET:
LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", res);
osmo_stream_cli_reconnect(cli);
msgb_free(msg);
break;
case 0:
LOGSCLI(cli, LOGL_NOTICE, "connection closed with srv\n");
osmo_stream_cli_reconnect(cli);
break;
default:
break;
}
/* Forward message to read callback, also if the connection failed. */
/* Notify user of new data or error: */
if (cli->iofd_read_cb)
cli->iofd_read_cb(cli, msg);
cli->iofd_read_cb(cli, res, msg);
else
msgb_free(msg);
break;
@ -818,7 +833,7 @@ osmo_stream_cli_set_read_cb(struct osmo_stream_cli *cli,
* \param[in] read_cb Call-back function to be called when data was read from the socket */
void
osmo_stream_cli_set_read_cb2(struct osmo_stream_cli *cli,
int (*read_cb)(struct osmo_stream_cli *cli, struct msgb *msg))
osmo_stream_cli_read_cb2_t read_cb)
{
OSMO_ASSERT(cli->mode != OSMO_STREAM_MODE_OSMO_FD);
cli->mode = OSMO_STREAM_MODE_OSMO_IO;

View File

@ -502,12 +502,19 @@ static void send_last_third(void *osmo_stream_cli_arg)
static struct osmo_timer_list fragmented_send_tl_cli;
static int test_segm_ipa_stream_srv_cli_read_cb(struct osmo_stream_cli *osc, struct msgb *msg)
static int test_segm_ipa_stream_srv_cli_read_cb(struct osmo_stream_cli *osc, int res, struct msgb *msg)
{
unsigned char *data;
struct ipa_head *h = (struct ipa_head *) msg->l1h;
uint8_t ipac_msg_type = *msg->data;
struct msgb *reply;
if (res < 0) {
fprintf(stderr, "cannot receive message (res = %d)\n", res);
msgb_free(msg);
return -ENOMSG;
}
LOGCLI(osc, "Received message from stream (payload len = %" PRIu16 ")\n", msgb_length(msg));
if (ipac_msg_type < 0 || 5 < ipac_msg_type) {
fprintf(stderr, "Received unexpected IPAC message type %"PRIu8"\n", ipac_msg_type);
@ -772,7 +779,7 @@ static int test_segm_ipa_stream_cli_srv_accept_cb(struct osmo_stream_srv_link *s
static bool test_segm_ipa_stream_cli_all_msgs_processed = false;
static int test_segm_ipa_stream_cli_cli_read_cb(struct osmo_stream_cli *osc, struct msgb *msg)
static int test_segm_ipa_stream_cli_cli_read_cb(struct osmo_stream_cli *osc, int res, struct msgb *msg)
{
static unsigned msgnum_cli = 0;
unsigned char *data;