ported interfaces to new threading functions (incomplete)

This commit is contained in:
Martin Willi 2007-06-11 14:24:32 +00:00
parent 5e564f2bff
commit 424e0c467e
2 changed files with 133 additions and 147 deletions

View File

@ -30,6 +30,7 @@
#include <library.h>
#include <daemon.h>
#include <processing/jobs/callback_job.h>
#define NM_DBUS_SERVICE_STRONG "org.freedesktop.NetworkManager.strongswan"
@ -64,9 +65,9 @@ struct private_dbus_interface_t {
NMVPNState state;
/**
* dispatcher thread for DBUS messages
* job accepting stroke messages
*/
pthread_t thread;
callback_job_t *job;
/**
* name of the currently active connection
@ -392,14 +393,13 @@ static DBusHandlerResult signal_handler(DBusConnection *con, DBusMessage *msg,
/**
* dispatcher function processed by a seperate thread
*/
static void dispatch(private_dbus_interface_t *this)
static job_requeue_t dispatch(private_dbus_interface_t *this)
{
charon->drop_capabilities(charon, TRUE);
while (dbus_connection_read_write_dispatch(this->conn, -1))
if (dbus_connection_read_write_dispatch(this->conn, -1))
{
/* nothing */
return JOB_REQUEUE_DIRECT;
}
return JOB_REQUEUE_NONE;
}
/**
@ -407,8 +407,7 @@ static void dispatch(private_dbus_interface_t *this)
*/
static void destroy(private_dbus_interface_t *this)
{
pthread_cancel(this->thread);
pthread_join(this->thread, NULL);
this->job->cancel(this->job);
dbus_connection_close(this->conn);
dbus_error_free(&this->err);
dbus_shutdown();
@ -469,10 +468,8 @@ interface_t *interface_create()
this->state = NM_VPN_STATE_INIT;
set_state(this, NM_VPN_STATE_STOPPED);
if (pthread_create(&this->thread, NULL, (void*(*)(void*))dispatch, this) != 0)
{
charon->kill(charon, "unable to create stroke thread");
}
this->job = callback_job_create((callback_job_cb_t)dispatch, this, NULL, NULL);
charon->processor->queue_job(charon->processor, (job_t*)this->job);
return &this->public.interface;
}

View File

@ -37,7 +37,7 @@
#include <library.h>
#include <daemon.h>
#include <processing/jobs/callback_job.h>
static struct sockaddr_un socket_addr = { AF_UNIX, "/var/run/charon.xml"};
@ -60,15 +60,16 @@ struct private_xml_interface_t {
int socket;
/**
* thread receiving messages
* job accepting stroke messages
*/
pthread_t thread;
callback_job_t *job;
};
static void get(private_xml_interface_t *this,
xmlTextReaderPtr reader, xmlTextWriterPtr writer)
/**
* process a getRequest message
*/
static void process_get(xmlTextReaderPtr reader, xmlTextWriterPtr writer)
{
if (/* <GetResponse> */
xmlTextWriterStartElement(writer, "GetResponse") < 0 ||
/* <Status Code="200"><Message/></Status> */
@ -85,128 +86,124 @@ static void get(private_xml_interface_t *this,
{
DBG1(DBG_CFG, "error writing XML document (GetResponse)");
}
/*
DBG1(DBG_CFG, "%d %d %s %d %d %s",
xmlTextReaderDepth(reader),
,
xmlTextReaderConstName(reader),
xmlTextReaderIsEmptyElement(reader),
xmlTextReaderHasValue(reader),
xmlTextReaderConstValue(reader));
*/
}
static void receive(private_xml_interface_t *this)
/**
* read from a opened connection and process it
*/
static job_requeue_t process(int *fdp)
{
charon->drop_capabilities(charon, TRUE);
int oldstate, fd = *fdp;
char buffer[4096];
size_t len;
xmlTextReaderPtr reader;
xmlTextWriterPtr writer;
/* disable cancellation by default */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
while (TRUE)
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
len = read(fd, buffer, sizeof(buffer));
pthread_setcancelstate(oldstate, NULL);
if (len <= 0)
{
struct sockaddr_un strokeaddr;
int strokeaddrlen = sizeof(strokeaddr);
int oldstate;
int fd;
char buffer[4096];
size_t len;
/* wait for connections, but allow thread to terminate */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
fd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen);
pthread_setcancelstate(oldstate, NULL);
if (fd < 0)
{
DBG1(DBG_CFG, "accepting SMP XML socket failed: %s", strerror(errno));
continue;
}
DBG2(DBG_CFG, "SMP XML connection opened");
while (TRUE)
{
xmlTextReaderPtr reader;
xmlTextWriterPtr writer;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
len = read(fd, buffer, sizeof(buffer));
pthread_setcancelstate(oldstate, NULL);
if (len <= 0)
{
close(fd);
DBG2(DBG_CFG, "SMP XML connection closed");
break;
}
reader = xmlReaderForMemory(buffer, len, NULL, NULL, 0);
if (reader == NULL)
{
DBG1(DBG_CFG, "opening SMP XML reader failed");
continue;
}
writer = xmlNewTextWriter(xmlOutputBufferCreateFd(fd, NULL));
if (writer == NULL)
{
xmlFreeTextReader(reader);
DBG1(DBG_CFG, "opening SMP XML writer failed");
continue;
}
/* create the standard message parts */
if (xmlTextWriterStartDocument(writer, NULL, NULL, NULL) < 0 ||
/* <SMPMessage xmlns="http://www.strongswan.org/smp/1.0"> */
xmlTextWriterStartElement(writer, "SMPMessage") < 0 ||
xmlTextWriterWriteAttribute(writer, "xmlns",
"http://www.strongswan.org/smp/1.0") < 0 ||
/* <Body> */
xmlTextWriterStartElement(writer, "Body") < 0)
{
xmlFreeTextReader(reader);
xmlFreeTextWriter(writer);
DBG1(DBG_CFG, "creating SMP XML message failed");
continue;
}
while (TRUE)
{
switch (xmlTextReaderRead(reader))
{
case 1:
{
if (xmlTextReaderNodeType(reader) ==
XML_READER_TYPE_ELEMENT)
{
if (streq(xmlTextReaderConstName(reader), "GetRequest"))
{
get(this, reader, writer);
break;
}
}
continue;
}
case 0:
/* end of XML */
break;
default:
DBG1(DBG_CFG, "parsing SMP XML message failed");
break;
}
xmlFreeTextReader(reader);
break;
}
/* write </Body></SMPMessage> and close document */
if (xmlTextWriterEndDocument(writer) < 0)
{
DBG1(DBG_CFG, "completing SMP XML message failed");
}
xmlFreeTextWriter(writer);
/* write a newline to indicate end of xml */
write(fd, "\n", 1);
}
close(fd);
DBG2(DBG_CFG, "SMP XML connection closed");
return JOB_REQUEUE_NONE;
}
reader = xmlReaderForMemory(buffer, len, NULL, NULL, 0);
if (reader == NULL)
{
DBG1(DBG_CFG, "opening SMP XML reader failed");
return JOB_REQUEUE_FAIR;;
}
writer = xmlNewTextWriter(xmlOutputBufferCreateFd(fd, NULL));
if (writer == NULL)
{
xmlFreeTextReader(reader);
DBG1(DBG_CFG, "opening SMP XML writer failed");
return JOB_REQUEUE_FAIR;;
}
/* create the standard message parts */
if (xmlTextWriterStartDocument(writer, NULL, NULL, NULL) < 0 ||
/* <SMPMessage xmlns="http://www.strongswan.org/smp/1.0"> */
xmlTextWriterStartElement(writer, "SMPMessage") < 0 ||
xmlTextWriterWriteAttribute(writer, "xmlns",
"http://www.strongswan.org/smp/1.0") < 0 ||
/* <Body> */
xmlTextWriterStartElement(writer, "Body") < 0)
{
xmlFreeTextReader(reader);
xmlFreeTextWriter(writer);
DBG1(DBG_CFG, "creating SMP XML message failed");
return JOB_REQUEUE_FAIR;;
}
while (TRUE)
{
switch (xmlTextReaderRead(reader))
{
case 1:
{
if (xmlTextReaderNodeType(reader) == XML_READER_TYPE_ELEMENT)
{
if (streq(xmlTextReaderConstName(reader), "GetRequest"))
{
process_get(reader, writer);
break;
}
}
continue;
}
case 0:
/* end of XML */
break;
default:
DBG1(DBG_CFG, "parsing SMP XML message failed");
break;
}
xmlFreeTextReader(reader);
break;
}
/* write </Body></SMPMessage> and close document */
if (xmlTextWriterEndDocument(writer) < 0)
{
DBG1(DBG_CFG, "completing SMP XML message failed");
}
xmlFreeTextWriter(writer);
/* write a newline to indicate end of xml */
write(fd, "\n", 1);
return JOB_REQUEUE_FAIR;;
}
/**
* accept from XML socket and create jobs to process connections
*/
static job_requeue_t dispatch(private_xml_interface_t *this)
{
struct sockaddr_un strokeaddr;
int oldstate, fd, *fdp, strokeaddrlen = sizeof(strokeaddr);
callback_job_t *job;
/* wait for connections, but allow thread to terminate */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
fd = accept(this->socket, (struct sockaddr *)&strokeaddr, &strokeaddrlen);
pthread_setcancelstate(oldstate, NULL);
if (fd < 0)
{
DBG1(DBG_CFG, "accepting SMP XML socket failed: %s", strerror(errno));
sleep(1);
return JOB_REQUEUE_FAIR;;
}
fdp = malloc_thing(int);
*fdp = fd;
job = callback_job_create((callback_job_cb_t)process, fdp, free, this->job);
charon->processor->queue_job(charon->processor, (job_t*)job);
return JOB_REQUEUE_DIRECT;
}
/**
@ -214,9 +211,7 @@ static void receive(private_xml_interface_t *this)
*/
static void destroy(private_xml_interface_t *this)
{
pthread_cancel(this->thread);
pthread_join(this->thread, NULL);
close(this->socket);
this->job->cancel(this->job);
unlink(socket_addr.sun_path);
free(this);
}
@ -257,15 +252,9 @@ interface_t *interface_create()
free(this);
return NULL;
}
if (pthread_create(&this->thread, NULL, (void*(*)(void*))receive, this) != 0)
{
DBG1(DBG_CFG, "could not create XML socket thread: %s", strerror(errno));
close(this->socket);
unlink(socket_addr.sun_path);
free(this);
return NULL;
}
this->job = callback_job_create((callback_job_cb_t)dispatch, this, NULL, NULL);
charon->processor->queue_job(charon->processor, (job_t*)this->job);
return &this->public.interface;
}