Merge branch 'zecke/sms-queue'

This commit is contained in:
Holger Hans Peter Freyther 2010-12-26 09:42:19 +01:00
commit c407ba5ed2
21 changed files with 756 additions and 58 deletions

View File

@ -0,0 +1,66 @@
"I create output for some simple SQL statements for the HLR db"
Eval [
"Create tables if they don't exist"
Transcript show: 'CREATE TABLE SMS (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created TIMESTAMP NOT NULL,
sent TIMESTAMP,
sender_id INTEGER NOT NULL,
receiver_id INTEGER NOT NULL,
deliver_attempts INTEGER NOT NULL DEFAULT 0,
valid_until TIMESTAMP,
reply_path_req INTEGER NOT NULL,
status_rep_req INTEGER NOT NULL,
protocol_id INTEGER NOT NULL,
data_coding_scheme INTEGER NOT NULL,
ud_hdr_ind INTEGER NOT NULL,
dest_addr TEXT,
user_data BLOB,
header BLOB,
text TEXT);'; nl;
show: 'CREATE TABLE Subscriber (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created TIMESTAMP NOT NULL,
updated TIMESTAMP NOT NULL,
imsi NUMERIC UNIQUE NOT NULL,
name TEXT,
extension TEXT UNIQUE,
authorized INTEGER NOT NULL DEFAULT 0,
tmsi TEXT UNIQUE,
lac INTEGER NOT NULL DEFAULT 0);'; nl.
"Create some dummy subscribers"
num_sub := 1000.
num_sms := 30.
lac := 1.
Transcript show: 'BEGIN;'; nl.
1 to: num_sub do: [:each |
Transcript show: 'INSERT INTO Subscriber
(imsi, created, updated, authorized, lac, extension)
VALUES
(%1, datetime(''now''), datetime(''now''), 1, %2, %3);' %
{(274090000000000 + each). lac. each}; nl.
].
1 to: num_sms do: [:sms |
1 to: num_sub do: [:sub |
Transcript show: 'INSERT INTO SMS
(created, sender_id, receiver_id, valid_until,
reply_path_req, status_rep_req, protocol_id,
data_coding_scheme, ud_hdr_ind, dest_addr,
text) VALUES
(datetime(''now''), 1, %1, ''2222-2-2'',
0, 0, 0,
0, 0, ''123456'',
''abc'');' % {sub}; nl.
]
].
Transcript show: 'COMMIT;'; nl.
]

View File

@ -0,0 +1,10 @@
"Query for one SMS"
Eval [
1 to: 100 do: [:each |
Transcript show: 'SELECT SMS.* FROM SMS
JOIN Subscriber ON SMS.receiver_id = Subscriber.id
WHERE SMS.id >= 1 AND SMS.sent IS NULL AND Subscriber.lac > 0
ORDER BY SMS.id LIMIT 1;'; nl.
].
]

View File

@ -0,0 +1,5 @@
probe process("/usr/lib/libsqlite3.so.0.8.6").function("sqlite3_get_table")
{
a = user_string($zSql);
printf("sqlite3_get_table called '%s'\n", a);
}

View File

@ -11,7 +11,7 @@ noinst_HEADERS = abis_nm.h abis_rsl.h db.h gsm_04_08.h gsm_data.h \
gb_proxy.h gprs_sgsn.h gsm_04_08_gprs.h sgsn.h \
gprs_ns_frgre.h auth.h osmo_msc.h bsc_msc.h bsc_nat.h \
osmo_bsc_rf.h osmo_bsc.h network_listen.h bsc_nat_sccp.h \
osmo_msc_data.h osmo_bsc_grace.h
osmo_msc_data.h osmo_bsc_grace.h sms_queue.h
openbsc_HEADERS = gsm_04_08.h meas_rep.h bsc_api.h
openbscdir = $(includedir)/openbsc

View File

@ -63,8 +63,9 @@ int db_sync_lastauthtuple_for_subscr(struct gsm_auth_tuple *atuple,
/* SMS store-and-forward */
int db_sms_store(struct gsm_sms *sms);
struct gsm_sms *db_sms_get(struct gsm_network *net, unsigned long long id);
struct gsm_sms *db_sms_get_unsent(struct gsm_network *net, unsigned long long min_id);
struct gsm_sms *db_sms_get_unsent_by_subscr(struct gsm_network *net, unsigned long long min_subscr_id);
struct gsm_sms *db_sms_get_unsent_by_subscr(struct gsm_network *net, unsigned long long min_subscr_id, unsigned int failed);
struct gsm_sms *db_sms_get_unsent_for_subscr(struct gsm_subscriber *subscr);
int db_sms_mark_sent(struct gsm_sms *sms);
int db_sms_inc_deliver_attempts(struct gsm_sms *sms);

View File

@ -33,5 +33,7 @@ void sms_free(struct gsm_sms *sms);
void _gsm411_sms_trans_free(struct gsm_trans *trans);
int gsm411_send_sms_subscr(struct gsm_subscriber *subscr,
struct gsm_sms *sms);
int gsm411_send_sms(struct gsm_subscriber_connection *conn,
struct gsm_sms *sms);
void gsm411_sapi_n_reject(struct gsm_subscriber_connection *conn);
#endif

View File

@ -5,6 +5,7 @@
struct osmo_msc_data;
struct osmo_bsc_sccp_con;
struct gsm_sms_queue;
enum gsm_phys_chan_config {
GSM_PCHAN_NONE,
@ -83,6 +84,7 @@ enum gsm_paging_event {
GSM_PAGING_SUCCEEDED,
GSM_PAGING_EXPIRED,
GSM_PAGING_OOM,
GSM_PAGING_BUSY,
};
enum bts_gprs_mode {
@ -733,6 +735,7 @@ struct gsm_network {
/* subscriber related features */
int keep_subscr;
struct gsm_sms_queue *sms_queue;
};
#define SMS_HDR_SIZE 128

View File

@ -46,6 +46,9 @@ struct gsm_paging_request {
/* Timer 3113: how long do we try to page? */
struct timer_list T3113;
/* How often did we ask the BTS to page? */
int attempts;
/* callback to be called in case paging completes */
gsm_cbfn *cbfn;
void *cbfn_param;

View File

@ -61,6 +61,7 @@ enum signal_sms {
S_SMS_DELIVERED, /* A SMS has been successfully delivered to a MS */
S_SMS_SMMA, /* A MS tells us it has more space available */
S_SMS_MEM_EXCEEDED, /* A MS tells us it has no more space available */
S_SMS_UNKNOWN_ERROR, /* A MS tells us it has an error */
};
/* SS_ABISIP signals */
@ -140,6 +141,8 @@ struct paging_signal_data {
struct gsm_subscriber *subscr;
struct gsm_bts *bts;
int paging_result;
/* NULL in case the paging didn't work */
struct gsm_subscriber_connection *conn;
};
@ -170,6 +173,15 @@ struct rf_signal_data {
struct gsm_network *net;
};
struct sms_signal_data {
/* The transaction where this occured */
struct gsm_trans *trans;
/* Can be NULL for SMMA */
struct gsm_sms *sms;
/* int paging result. Only the ones with > 0 */
int paging_result;
};
enum signal_ns {
S_NS_RESET,
S_NS_BLOCK,

View File

@ -0,0 +1,17 @@
#ifndef SMS_QUEUE_H
#define SMS_QUEUE_H
struct gsm_network;
struct gsm_sms_queue;
struct vty;
int sms_queue_start(struct gsm_network *, int in_flight);
int sms_queue_trigger(struct gsm_sms_queue *);
/* vty helper functions */
int sms_queue_stats(struct gsm_sms_queue *, struct vty* vty);
int sms_queue_set_max_pending(struct gsm_sms_queue *, int max);
int sms_queue_set_max_failure(struct gsm_sms_queue *, int fail);
int sms_queue_clear(struct gsm_sms_queue *);
#endif

View File

@ -35,7 +35,7 @@ libvty_a_SOURCES = common_vty.c
libmgcp_a_SOURCES = mgcp/mgcp_protocol.c mgcp/mgcp_network.c mgcp/mgcp_vty.c
bsc_hack_SOURCES = bsc_hack.c bsc_init.c bsc_vty.c vty_interface_layer3.c
bsc_hack_SOURCES = bsc_hack.c bsc_init.c bsc_vty.c vty_interface_layer3.c sms_queue.c
bsc_hack_LDADD = libmsc.a libbsc.a libvty.a libmsc.a \
-ldl -ldbi $(LIBCRYPT) $(LIBOSMOVTY_LIBS)

View File

@ -274,6 +274,10 @@ int main(int argc, char **argv)
signal(SIGUSR2, &signal_handler);
signal(SIGPIPE, SIG_IGN);
/* start the SMS queue */
if (sms_queue_start(bsc_gsmnet, 20) != 0)
return -1;
if (daemonize) {
rc = osmo_daemonize();
if (rc < 0) {

View File

@ -1063,6 +1063,28 @@ static struct gsm_sms *sms_from_result(struct gsm_network *net, dbi_result resul
return sms;
}
struct gsm_sms *db_sms_get(struct gsm_network *net, unsigned long long id)
{
dbi_result result;
struct gsm_sms *sms;
result = dbi_conn_queryf(conn,
"SELECT * FROM SMS WHERE SMS.id = %llu", id);
if (!result)
return NULL;
if (!dbi_result_next_row(result)) {
dbi_result_free(result);
return NULL;
}
sms = sms_from_result(net, result);
dbi_result_free(result);
return sms;
}
/* retrieve the next unsent SMS with ID >= min_id */
struct gsm_sms *db_sms_get_unsent(struct gsm_network *net, unsigned long long min_id)
{
@ -1092,7 +1114,9 @@ struct gsm_sms *db_sms_get_unsent(struct gsm_network *net, unsigned long long mi
return sms;
}
struct gsm_sms *db_sms_get_unsent_by_subscr(struct gsm_network *net, unsigned long long min_subscr_id)
struct gsm_sms *db_sms_get_unsent_by_subscr(struct gsm_network *net,
unsigned long long min_subscr_id,
unsigned int failed)
{
dbi_result result;
struct gsm_sms *sms;
@ -1102,9 +1126,9 @@ struct gsm_sms *db_sms_get_unsent_by_subscr(struct gsm_network *net, unsigned lo
"FROM SMS JOIN Subscriber ON "
"SMS.receiver_id = Subscriber.id "
"WHERE SMS.receiver_id >= %llu AND SMS.sent IS NULL "
"AND Subscriber.lac > 0 "
"AND Subscriber.lac > 0 AND SMS.deliver_attempts < %u "
"ORDER BY SMS.receiver_id, SMS.id LIMIT 1",
min_subscr_id);
min_subscr_id, failed);
if (!result)
return NULL;

View File

@ -1409,6 +1409,7 @@ static int setup_trig_pag_evt(unsigned int hooknum, unsigned int event,
gsm48_cc_tx_setup(transt, &transt->cc.msg);
break;
case GSM_PAGING_EXPIRED:
case GSM_PAGING_BUSY:
DEBUGP(DCC, "Paging subscr %s expired!\n",
subscr->extension);
/* Temporarily out of order */

View File

@ -101,7 +101,6 @@ static const struct value_string rp_cause_strs[] = {
{ 0, NULL }
};
static int gsm411_send_sms(struct gsm_subscriber_connection *conn, struct gsm_sms *sms);
struct gsm_sms *sms_alloc(void)
{
@ -119,6 +118,18 @@ void sms_free(struct gsm_sms *sms)
talloc_free(sms);
}
static void send_signal(int sig_no,
struct gsm_trans *trans,
struct gsm_sms *sms,
int paging_result)
{
struct sms_signal_data sig;
sig.trans = trans;
sig.sms = sms;
sig.paging_result = paging_result;
dispatch_signal(SS_SMS, sig_no, &sig);
}
/*
* This should be called whenever all SMS to a given subscriber
* on a given connection has been sent. This will inform the higher
@ -422,7 +433,7 @@ static int gsm340_rx_sms_submit(struct msgb *msg, struct gsm_sms *gsms)
return GSM411_RP_CAUSE_MO_NET_OUT_OF_ORDER;
}
/* dispatch a signal to tell higher level about it */
dispatch_signal(SS_SMS, S_SMS_SUBMITTED, gsms);
send_signal(S_SMS_SUBMITTED, NULL, gsms, 0);
return 0;
}
@ -614,7 +625,8 @@ static int gsm340_rx_tpdu(struct gsm_subscriber_connection *conn, struct msgb *m
gsms->validity_minutes = gsm340_validity_period(sms_vpf, sms_vp);
dispatch_signal(SS_SMS, 0, gsms);
/* FIXME: This looks very wrong */
send_signal(0, NULL, gsms, 0);
/* determine gsms->receiver based on dialled number */
gsms->receiver = subscr_get_by_extension(conn->bts->network, gsms->dest_addr);
@ -754,7 +766,7 @@ static int gsm411_rx_rp_ack(struct msgb *msg, struct gsm_trans *trans,
/* mark this SMS as sent in database */
db_sms_mark_sent(sms);
dispatch_signal(SS_SMS, S_SMS_DELIVERED, sms);
send_signal(S_SMS_DELIVERED, trans, sms, 0);
sms_free(sms);
trans->sms.sms = NULL;
@ -807,12 +819,14 @@ static int gsm411_rx_rp_error(struct msgb *msg, struct gsm_trans *trans,
if (cause == GSM411_RP_CAUSE_MT_MEM_EXCEEDED) {
/* MS has not enough memory to store the message. We need
* to store this in our database and wati for a SMMA message */
* to store this in our database and wait for a SMMA message */
/* FIXME */
dispatch_signal(SS_SMS, S_SMS_MEM_EXCEEDED, trans->subscr);
send_signal(S_SMS_MEM_EXCEEDED, trans, sms, 0);
counter_inc(net->stats.sms.rp_err_mem);
} else
} else {
send_signal(S_SMS_UNKNOWN_ERROR, trans, sms, 0);
counter_inc(net->stats.sms.rp_err_other);
}
sms_free(sms);
trans->sms.sms = NULL;
@ -832,7 +846,7 @@ static int gsm411_rx_rp_smma(struct msgb *msg, struct gsm_trans *trans,
/* MS tells us that it has memory for more SMS, we need
* to check if we have any pending messages for it and then
* transfer those */
dispatch_signal(SS_SMS, S_SMS_SMMA, trans->subscr);
send_signal(S_SMS_SMMA, trans, NULL, 0);
/* check for more messages for this subscriber */
sms = db_sms_get_unsent_for_subscr(trans->subscr);
@ -1032,7 +1046,7 @@ int gsm0411_rcv_sms(struct gsm_subscriber_connection *conn,
/* Take a SMS in gsm_sms structure and send it through an already
* existing lchan. We also assume that the caller ensured this lchan already
* has a SAPI3 RLL connection! */
static int gsm411_send_sms(struct gsm_subscriber_connection *conn, struct gsm_sms *sms)
int gsm411_send_sms(struct gsm_subscriber_connection *conn, struct gsm_sms *sms)
{
struct msgb *msg = gsm411_msgb_alloc();
struct gsm_trans *trans;
@ -1044,6 +1058,7 @@ static int gsm411_send_sms(struct gsm_subscriber_connection *conn, struct gsm_sm
transaction_id = trans_assign_trans_id(conn->subscr, GSM48_PDISC_SMS, 0);
if (transaction_id == -1) {
LOGP(DSMS, LOGL_ERROR, "No available transaction ids\n");
send_signal(S_SMS_UNKNOWN_ERROR, NULL, sms, 0);
sms_free(sms);
return -EBUSY;
}
@ -1055,6 +1070,7 @@ static int gsm411_send_sms(struct gsm_subscriber_connection *conn, struct gsm_sm
transaction_id, new_callref++);
if (!trans) {
LOGP(DSMS, LOGL_ERROR, "No memory for trans\n");
send_signal(S_SMS_UNKNOWN_ERROR, NULL, sms, 0);
sms_free(sms);
/* FIXME: send some error message */
return -ENOMEM;
@ -1088,6 +1104,7 @@ static int gsm411_send_sms(struct gsm_subscriber_connection *conn, struct gsm_sm
/* generate the 03.40 TPDU */
rc = gsm340_gen_tpdu(msg, sms);
if (rc < 0) {
send_signal(S_SMS_UNKNOWN_ERROR, trans, sms, 0);
trans_free(trans);
sms_free(sms);
msgb_free(msg);
@ -1126,6 +1143,8 @@ static int paging_cb_send_sms(unsigned int hooknum, unsigned int event,
break;
case GSM_PAGING_EXPIRED:
case GSM_PAGING_OOM:
case GSM_PAGING_BUSY:
send_signal(S_SMS_UNKNOWN_ERROR, NULL, sms, event);
sms_free(sms);
rc = -ETIMEDOUT;
break;
@ -1154,36 +1173,11 @@ int gsm411_send_sms_subscr(struct gsm_subscriber *subscr,
return 0;
}
static int subscr_sig_cb(unsigned int subsys, unsigned int signal,
void *handler_data, void *signal_data)
{
struct gsm_subscriber *subscr;
struct gsm_subscriber_connection *conn;
struct gsm_sms *sms;
switch (signal) {
case S_SUBSCR_ATTACHED:
/* A subscriber has attached. Check if there are
* any pending SMS for him to be delivered */
subscr = signal_data;
conn = connection_for_subscr(subscr);
if (!conn)
break;
sms = db_sms_get_unsent_for_subscr(subscr);
if (!sms)
break;
gsm411_send_sms(conn, sms);
break;
default:
break;
}
return 0;
}
void _gsm411_sms_trans_free(struct gsm_trans *trans)
{
if (trans->sms.sms) {
LOGP(DSMS, LOGL_ERROR, "Transaction contains SMS.\n");
send_signal(S_SMS_UNKNOWN_ERROR, trans, trans->sms.sms, 0);
sms_free(trans->sms.sms);
trans->sms.sms = NULL;
}
@ -1203,6 +1197,7 @@ void gsm411_sapi_n_reject(struct gsm_subscriber_connection *conn)
continue;
}
send_signal(S_SMS_UNKNOWN_ERROR, trans, sms, 0);
sms_free(sms);
trans->sms.sms = NULL;
trans_free(trans);
@ -1211,7 +1206,3 @@ void gsm411_sapi_n_reject(struct gsm_subscriber_connection *conn)
gsm411_release_conn(conn);
}
static __attribute__((constructor)) void on_dso_load_sms(void)
{
register_signal_handler(SS_SUBSCR, subscr_sig_cb, NULL);
}

View File

@ -84,6 +84,7 @@ static int subscr_paging_dispatch(unsigned int hooknum, unsigned int event,
sig_data.subscr = subscr;
sig_data.bts = conn ? conn->bts : NULL;
sig_data.conn = conn;
sig_data.paging_result = event;
dispatch_signal(
SS_PAGING,
event == GSM_PAGING_SUCCEEDED ?
@ -169,7 +170,7 @@ static void subscr_send_paging_request(struct gsm_subscriber *subscr)
/* paging failed, quit now */
if (rc <= 0) {
subscr_paging_cb(GSM_HOOK_RR_PAGING, GSM_PAGING_EXPIRED,
subscr_paging_cb(GSM_HOOK_RR_PAGING, GSM_PAGING_BUSY,
NULL, NULL, subscr);
}
}
@ -293,6 +294,8 @@ struct gsm_subscriber *subscr_get_by_id(struct gsm_network *net,
int subscr_update(struct gsm_subscriber *s, struct gsm_bts *bts, int reason)
{
int rc;
/* FIXME: Migrate pending requests from one BSC to another */
switch (reason) {
case GSM_SUBSCRIBER_UPDATE_ATTACHED:
@ -301,6 +304,8 @@ int subscr_update(struct gsm_subscriber *s, struct gsm_bts *bts, int reason)
s->lac = bts->location_area_code;
LOGP(DMM, LOGL_INFO, "Subscriber %s ATTACHED LAC=%u\n",
subscr_name(s), s->lac);
rc = db_sync_subscriber(s);
db_subscriber_update(s);
dispatch_signal(SS_SUBSCR, S_SUBSCR_ATTACHED, s);
break;
case GSM_SUBSCRIBER_UPDATE_DETACHED:
@ -308,14 +313,19 @@ int subscr_update(struct gsm_subscriber *s, struct gsm_bts *bts, int reason)
if (bts->location_area_code == s->lac)
s->lac = GSM_LAC_RESERVED_DETACHED;
LOGP(DMM, LOGL_INFO, "Subscriber %s DETACHED\n", subscr_name(s));
rc = db_sync_subscriber(s);
db_subscriber_update(s);
dispatch_signal(SS_SUBSCR, S_SUBSCR_DETACHED, s);
break;
default:
fprintf(stderr, "subscr_update with unknown reason: %d\n",
reason);
rc = db_sync_subscriber(s);
db_subscriber_update(s);
break;
};
return db_sync_subscriber(s);
return rc;
}
void subscr_update_from_db(struct gsm_subscriber *sub)

View File

@ -209,6 +209,7 @@ static void paging_handle_pending_requests(struct gsm_bts_paging_state *paging_b
/* handle the paging request now */
page_ms(request);
paging_bts->available_slots--;
request->attempts++;
/* take the current and add it to the back */
llist_del(&request->entry);
@ -253,6 +254,7 @@ static void paging_T3113_expired(void *data)
struct gsm_paging_request *req = (struct gsm_paging_request *)data;
void *cbfn_param;
gsm_cbfn *cbfn;
int msg;
LOGP(DPAG, LOGL_INFO, "T3113 expired for request %p (%s)\n",
req, req->subscr->imsi);
@ -261,10 +263,15 @@ static void paging_T3113_expired(void *data)
counter_inc(req->bts->network->stats.paging.expired);
cbfn_param = req->cbfn_param;
cbfn = req->cbfn;
/* did we ever manage to page the subscriber */
msg = req->attempts > 0 ? GSM_PAGING_EXPIRED : GSM_PAGING_BUSY;
/* destroy it now. Do not access req afterwards */
paging_remove_request(&req->bts->paging, req);
if (cbfn)
cbfn(GSM_HOOK_RR_PAGING, GSM_PAGING_EXPIRED, NULL, NULL,
cbfn(GSM_HOOK_RR_PAGING, msg, NULL, NULL,
cbfn_param);
}

View File

@ -60,6 +60,7 @@ static int paging_cb_silent(unsigned int hooknum, unsigned int event,
dispatch_signal(SS_SCALL, S_SCALL_SUCCESS, &sigdata);
break;
case GSM_PAGING_EXPIRED:
case GSM_PAGING_BUSY:
DEBUGP(DSMS, "expired\n");
dispatch_signal(SS_SCALL, S_SCALL_EXPIRED, &sigdata);
break;

478
openbsc/src/sms_queue.c Normal file
View File

@ -0,0 +1,478 @@
/* SMS queue to continously attempt to deliver SMS */
/*
* (C) 2010 by Holger Hans Peter Freyther <zecke@selfish.org>
* All Rights Reserved
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
*/
/**
* The difficulty of such a queue is to send a lot of SMS without
* overloading the paging subsystem and the database and other users
* of the MSC. To make the best use we would need to know the number
* of pending paging requests, then throttle the number of SMS we
* want to send and such.
* We will start with a very simple SMS Queue and then try to speed
* things up by collecting data from other parts of the system.
*/
#include <openbsc/sms_queue.h>
#include <openbsc/chan_alloc.h>
#include <openbsc/db.h>
#include <openbsc/debug.h>
#include <openbsc/gsm_data.h>
#include <openbsc/gsm_04_11.h>
#include <openbsc/gsm_subscriber.h>
#include <openbsc/signal.h>
#include <osmocore/talloc.h>
#include <osmocom/vty/vty.h>
/*
* One pending SMS that we wait for.
*/
struct gsm_sms_pending {
struct llist_head entry;
struct gsm_subscriber *subscr;
unsigned long long sms_id;
int failed_attempts;
int resend;
};
struct gsm_sms_queue {
struct timer_list resend_pending;
struct timer_list push_queue;
struct gsm_network *network;
int max_fail;
int max_pending;
int pending;
struct llist_head pending_sms;
unsigned long long last_subscr_id;
};
static int sms_subscr_cb(unsigned int, unsigned int, void *, void *);
static int sms_sms_cb(unsigned int, unsigned int, void *, void *);
static struct gsm_sms_pending *sms_find_pending(struct gsm_sms_queue *smsq,
struct gsm_sms *sms)
{
struct gsm_sms_pending *pending;
llist_for_each_entry(pending, &smsq->pending_sms, entry) {
if (pending->sms_id == sms->id)
return pending;
}
return NULL;
}
static int sms_is_in_pending(struct gsm_sms_queue *smsq, struct gsm_sms *sms)
{
return sms_find_pending(smsq, sms) != NULL;
}
static int sms_subscriber_is_pending(struct gsm_sms_queue *smsq,
struct gsm_subscriber *subscr)
{
struct gsm_sms_pending *pending;
llist_for_each_entry(pending, &smsq->pending_sms, entry) {
if (pending->subscr == subscr)
return 1;
}
return 0;
}
static struct gsm_sms_pending *sms_pending_from(struct gsm_sms_queue *smsq,
struct gsm_sms *sms)
{
struct gsm_sms_pending *pending;
pending = talloc_zero(smsq, struct gsm_sms_pending);
if (!pending)
return NULL;
pending->subscr = subscr_get(sms->receiver);
pending->sms_id = sms->id;
return pending;
}
static void sms_pending_free(struct gsm_sms_pending *pending)
{
subscr_put(pending->subscr);
llist_del(&pending->entry);
talloc_free(pending);
}
static void sms_pending_resend(struct gsm_sms_pending *pending)
{
struct gsm_sms_queue *smsq;
LOGP(DSMS, LOGL_DEBUG,
"Scheduling resend of SMS %llu.\n", pending->sms_id);
pending->resend = 1;
smsq = pending->subscr->net->sms_queue;
if (bsc_timer_pending(&smsq->resend_pending))
return;
bsc_schedule_timer(&smsq->resend_pending, 1, 0);
}
static void sms_pending_failed(struct gsm_sms_pending *pending, int paging_error)
{
struct gsm_sms_queue *smsq;
LOGP(DSMS, LOGL_NOTICE, "Sending SMS %llu failed %d times.\n",
pending->sms_id, pending->failed_attempts);
smsq = pending->subscr->net->sms_queue;
if (++pending->failed_attempts < smsq->max_fail)
return sms_pending_resend(pending);
if (paging_error) {
LOGP(DSMS, LOGL_NOTICE,
"Subscriber %llu is not reachable. Setting LAC=0.\n", pending->subscr->id);
pending->subscr->lac = GSM_LAC_RESERVED_DETACHED;
db_sync_subscriber(pending->subscr);
/* Workaround a failing sync */
db_subscriber_update(pending->subscr);
}
sms_pending_free(pending);
smsq->pending -= 1;
sms_queue_trigger(smsq);
}
/*
* Resend all SMS that are scheduled for a resend. This is done to
* avoid an immediate failure.
*/
static void sms_resend_pending(void *_data)
{
struct gsm_sms_pending *pending, *tmp;
struct gsm_sms_queue *smsq = _data;
llist_for_each_entry_safe(pending, tmp, &smsq->pending_sms, entry) {
struct gsm_sms *sms;
if (!pending->resend)
continue;
sms = db_sms_get(smsq->network, pending->sms_id);
/* the sms is gone? Move to the next */
if (!sms) {
sms_pending_free(pending);
smsq->pending -= 1;
sms_queue_trigger(smsq);
} else {
pending->resend = 0;
gsm411_send_sms_subscr(sms->receiver, sms);
}
}
}
static struct gsm_sms *take_next_sms(struct gsm_sms_queue *smsq)
{
struct gsm_sms *sms;
sms = db_sms_get_unsent_by_subscr(smsq->network, smsq->last_subscr_id, 10);
if (sms) {
smsq->last_subscr_id = sms->receiver->id + 1;
return sms;
}
/* need to wrap around */
smsq->last_subscr_id = 0;
sms = db_sms_get_unsent_by_subscr(smsq->network,
smsq->last_subscr_id, 10);
if (sms)
smsq->last_subscr_id = sms->receiver->id + 1;
return sms;
}
/**
* I will submit up to max_pending - pending SMS to the
* subsystem.
*/
static void sms_submit_pending(void *_data)
{
struct gsm_sms_queue *smsq = _data;
int attempts = smsq->max_pending - smsq->pending;
int initialized = 0;
unsigned long long first_sub = 0;
int attempted = 0, rounds = 0;
LOGP(DSMS, LOGL_NOTICE, "Attempting to send %d SMS\n", attempts);
do {
struct gsm_sms_pending *pending;
struct gsm_sms *sms;
sms = take_next_sms(smsq);
if (!sms)
break;
rounds += 1;
/*
* This code needs to detect a loop. It assumes that no SMS
* will vanish during the time this is executed. We will remember
* the id of the first GSM subscriber we see and then will
* compare this. The Database code should make sure that we will
* see all other subscribers first before seeing this one again.
*
* It is always scary to have an infinite loop like this.
*/
if (!initialized) {
first_sub = sms->receiver->id;
initialized = 1;
} else if (first_sub == sms->receiver->id) {
sms_free(sms);
break;
}
/* no need to send a pending sms */
if (sms_is_in_pending(smsq, sms)) {
LOGP(DSMS, LOGL_DEBUG,
"SMSqueue with pending sms: %llu\n. Skipping", sms->id);
sms_free(sms);
continue;
}
/* no need to send a SMS with the same receiver */
if (sms_subscriber_is_pending(smsq, sms->receiver)) {
LOGP(DSMS, LOGL_DEBUG,
"SMSqueue with pending sub: %llu. Skipping\n", sms->receiver->id);
sms_free(sms);
continue;
}
pending = sms_pending_from(smsq, sms);
if (!pending) {
LOGP(DSMS, LOGL_ERROR,
"Failed to create pending SMS entry.\n");
sms_free(sms);
continue;
}
attempted += 1;
smsq->pending += 1;
llist_add(&pending->entry, &smsq->pending_sms);
gsm411_send_sms_subscr(sms->receiver, sms);
} while (attempted < attempts && rounds < 1000);
LOGP(DSMS, LOGL_DEBUG, "SMSqueue added %d messages in %d rounds\n", attempted, rounds);
}
/*
* Kick off the queue again.
*/
int sms_queue_trigger(struct gsm_sms_queue *smsq)
{
if (bsc_timer_pending(&smsq->push_queue))
return 0;
bsc_schedule_timer(&smsq->push_queue, 1, 0);
return 0;
}
int sms_queue_start(struct gsm_network *network, int max_pending)
{
struct gsm_sms_queue *sms = talloc_zero(network, struct gsm_sms_queue);
if (!sms) {
LOGP(DMSC, LOGL_ERROR, "Failed to create the SMS queue.\n");
return -1;
}
register_signal_handler(SS_SUBSCR, sms_subscr_cb, network);
register_signal_handler(SS_SMS, sms_sms_cb, network);
network->sms_queue = sms;
INIT_LLIST_HEAD(&sms->pending_sms);
sms->max_fail = 1;
sms->network = network;
sms->max_pending = max_pending;
sms->push_queue.data = sms;
sms->push_queue.cb = sms_submit_pending;
sms->resend_pending.data = sms;
sms->resend_pending.cb = sms_resend_pending;
sms_submit_pending(sms);
return 0;
}
static int sub_ready_for_sm(struct gsm_subscriber *subscr)
{
struct gsm_subscriber_connection *conn;
struct gsm_sms *sms;
/* A subscriber has attached. Check if there are
* any pending SMS for him to be delivered */
conn = connection_for_subscr(subscr);
if (!conn)
return -1;
sms = db_sms_get_unsent_for_subscr(subscr);
if (!sms)
return -1;
gsm411_send_sms(conn, sms);
return 0;
}
static int sms_subscr_cb(unsigned int subsys, unsigned int signal,
void *handler_data, void *signal_data)
{
struct gsm_subscriber *subscr = signal_data;
if (signal != S_SUBSCR_ATTACHED)
return 0;
/* this is readyForSM */
return sub_ready_for_sm(subscr);
}
static int sms_sms_cb(unsigned int subsys, unsigned int signal,
void *handler_data, void *signal_data)
{
struct gsm_network *network = handler_data;
struct sms_signal_data *sig_sms = signal_data;
struct gsm_sms_pending *pending;
/* We got a new SMS and maybe should launch the queue again. */
if (signal == S_SMS_SUBMITTED || signal == S_SMS_SMMA) {
sms_queue_trigger(network->sms_queue);
return 0;
}
if (!sig_sms->sms)
return -1;
/*
* Find the entry of our queue. The SMS subsystem will submit
* sms that are not in our control as we just have a channel
* open anyway.
*/
pending = sms_find_pending(network->sms_queue, sig_sms->sms);
if (!pending)
return 0;
switch (signal) {
case S_SMS_DELIVERED:
/*
* Create place for a new SMS but keep the pending data
* so we will not attempt to send the SMS for this subscriber
* as we still have an open channel and will attempt to submit
* SMS to it anyway.
*/
network->sms_queue->pending -= 1;
sms_submit_pending(network->sms_queue);
sms_pending_free(pending);
break;
case S_SMS_MEM_EXCEEDED:
network->sms_queue->pending -= 1;
sms_pending_free(pending);
sms_queue_trigger(network->sms_queue);
break;
case S_SMS_UNKNOWN_ERROR:
/*
* There can be many reasons for this failure. E.g. the paging
* timed out, the subscriber was not paged at all, or there was
* a protocol error. The current strategy is to try sending the
* next SMS for busy/oom and to retransmit when we have paged.
*
* When the paging expires three times we will disable the
* subscriber. If we have some kind of other transmit error we
* should flag the SMS as bad.
*/
switch (sig_sms->paging_result) {
case 0:
/* BAD SMS? */
db_sms_inc_deliver_attempts(sig_sms->sms);
sms_pending_failed(pending, 0);
break;
case GSM_PAGING_EXPIRED:
sms_pending_failed(pending, 1);
break;
case GSM_PAGING_OOM:
case GSM_PAGING_BUSY:
network->sms_queue->pending -= 1;
sms_pending_free(pending);
sms_queue_trigger(network->sms_queue);
break;
default:
LOGP(DSMS, LOGL_ERROR, "Unhandled result: %d\n",
sig_sms->paging_result);
}
break;
default:
LOGP(DSMS, LOGL_ERROR, "Unhandled result: %d\n",
sig_sms->paging_result);
}
return 0;
}
/* VTY helper functions */
int sms_queue_stats(struct gsm_sms_queue *smsq, struct vty *vty)
{
struct gsm_sms_pending *pending;
vty_out(vty, "SMSqueue with max_pending: %d pending: %d%s",
smsq->max_pending, smsq->pending, VTY_NEWLINE);
llist_for_each_entry(pending, &smsq->pending_sms, entry)
vty_out(vty, " SMS Pending for Subscriber: %llu%s",
pending->subscr->id, VTY_NEWLINE);
return 0;
}
int sms_queue_set_max_pending(struct gsm_sms_queue *smsq, int max_pending)
{
LOGP(DSMS, LOGL_NOTICE, "SMSqueue old max: %d new: %d\n",
smsq->max_pending, max_pending);
smsq->max_pending = max_pending;
return 0;
}
int sms_queue_set_max_failure(struct gsm_sms_queue *smsq, int max_fail)
{
LOGP(DSMS, LOGL_NOTICE, "SMSqueue max failure old: %d new: %d\n",
smsq->max_fail, max_fail);
smsq->max_fail = max_fail;
return 0;
}
int sms_queue_clear(struct gsm_sms_queue *smsq)
{
struct gsm_sms_pending *pending, *tmp;
llist_for_each_entry_safe(pending, tmp, &smsq->pending_sms, entry) {
LOGP(DSMS, LOGL_NOTICE,
"SMSqueue clearing for sub %llu\n", pending->subscr->id);
sms_pending_free(pending);
}
return 0;
}

View File

@ -117,7 +117,8 @@ unauth:
static int token_sms_cb(unsigned int subsys, unsigned int signal,
void *handler_data, void *signal_data)
{
struct gsm_sms *sms = signal_data;
struct sms_signal_data *sig = signal_data;
struct gsm_sms *sms = sig->sms;;
struct gsm_subscriber_connection *conn;
u_int8_t auth_rand[16];

View File

@ -20,6 +20,7 @@
*/
#include <stdlib.h>
#include <limits.h>
#include <unistd.h>
#include <sys/types.h>
@ -45,6 +46,7 @@
#include <openbsc/vty.h>
#include <openbsc/gsm_04_80.h>
#include <openbsc/chan_alloc.h>
#include <openbsc/sms_queue.h>
extern struct gsm_network *gsmnet_from_vty(struct vty *v);
@ -61,6 +63,8 @@ static void subscr_dump_full_vty(struct vty *vty, struct gsm_subscriber *subscr)
if (subscr->extension)
vty_out(vty, " Extension: %s%s", subscr->extension,
VTY_NEWLINE);
vty_out(vty, " LAC: %d/0x%x%s",
subscr->lac, subscr->lac, VTY_NEWLINE);
if (subscr->imsi)
vty_out(vty, " IMSI: %s%s", subscr->imsi, VTY_NEWLINE);
if (subscr->tmsi != GSM_RESERVED_TMSI)
@ -123,7 +127,7 @@ DEFUN(sms_send_pend,
int id = 0;
while (1) {
sms = db_sms_get_unsent_by_subscr(gsmnet, id);
sms = db_sms_get_unsent_by_subscr(gsmnet, id, UINT_MAX);
if (!sms)
break;
@ -167,16 +171,15 @@ static int _send_sms_str(struct gsm_subscriber *receiver, char *str,
sms = sms_from_text(receiver, str);
sms->protocol_id = tp_pid;
if(!receiver->lac){
/* subscriber currently not attached, store in database */
if (db_sms_store(sms) != 0) {
LOGP(DSMS, LOGL_ERROR, "Failed to store SMS in Database\n");
return CMD_WARNING;
}
} else {
gsm411_send_sms_subscr(receiver, sms);
/* store in database for the queue */
if (db_sms_store(sms) != 0) {
LOGP(DSMS, LOGL_ERROR, "Failed to store SMS in Database\n");
sms_free(sms);
return CMD_WARNING;
}
sms_free(sms);
sms_queue_trigger(receiver->net->sms_queue);
return CMD_SUCCESS;
}
@ -630,6 +633,60 @@ DEFUN(show_stats,
return CMD_SUCCESS;
}
DEFUN(show_smsqueue,
show_smsqueue_cmd,
"show sms-queue",
SHOW_STR "Display SMSqueue statistics\n")
{
struct gsm_network *net = gsmnet_from_vty(vty);
sms_queue_stats(net->sms_queue, vty);
return CMD_SUCCESS;
}
DEFUN(smsqueue_trigger,
smsqueue_trigger_cmd,
"sms-queue trigger",
"SMS Queue\n" "Trigger sending messages\n")
{
struct gsm_network *net = gsmnet_from_vty(vty);
sms_queue_trigger(net->sms_queue);
return CMD_SUCCESS;
}
DEFUN(smsqueue_max,
smsqueue_max_cmd,
"sms-queue max-pending <1-500>",
"SMS Queue\n" "SMS to attempt to deliver at the same time\n")
{
struct gsm_network *net = gsmnet_from_vty(vty);
sms_queue_set_max_pending(net->sms_queue, atoi(argv[0]));
return CMD_SUCCESS;
}
DEFUN(smsqueue_clear,
smsqueue_clear_cmd,
"sms-queue clear",
"SMS Queue\n" "Clear the queue of pending SMS\n")
{
struct gsm_network *net = gsmnet_from_vty(vty);
sms_queue_clear(net->sms_queue);
return CMD_SUCCESS;
}
DEFUN(smsqueue_fail,
smsqueue_fail_cmd,
"sms-queue max-failure <1-500>",
"SMS Queue\n" "Set maximum amount of failures\n")
{
struct gsm_network *net = gsmnet_from_vty(vty);
sms_queue_set_max_failure(net->sms_queue, atoi(argv[0]));
return CMD_SUCCESS;
}
int bsc_vty_init_extra(void)
{
@ -647,12 +704,17 @@ int bsc_vty_init_extra(void)
install_element_ve(&subscriber_ussd_notify_cmd);
install_element_ve(&subscriber_update_cmd);
install_element_ve(&show_stats_cmd);
install_element_ve(&show_smsqueue_cmd);
install_element(ENABLE_NODE, &ena_subscr_name_cmd);
install_element(ENABLE_NODE, &ena_subscr_extension_cmd);
install_element(ENABLE_NODE, &ena_subscr_authorized_cmd);
install_element(ENABLE_NODE, &ena_subscr_a3a8_cmd);
install_element(ENABLE_NODE, &subscriber_purge_cmd);
install_element(ENABLE_NODE, &smsqueue_trigger_cmd);
install_element(ENABLE_NODE, &smsqueue_max_cmd);
install_element(ENABLE_NODE, &smsqueue_clear_cmd);
install_element(ENABLE_NODE, &smsqueue_fail_cmd);
return 0;
}