mysql: Implement transaction handling

This commit is contained in:
Tobias Brunner 2013-09-05 16:46:24 +02:00
parent 947b76cda8
commit f3cb889c9b
1 changed files with 119 additions and 7 deletions

View File

@ -50,6 +50,11 @@ struct private_mysql_database_t {
*/
linked_list_t *pool;
/**
* thread-specific transaction, as transaction_t
*/
thread_value_t *transaction;
/**
* mutex to lock pool
*/
@ -99,6 +104,28 @@ struct conn_t {
bool in_use;
};
/**
* database transaction
*/
typedef struct {
/**
* Reference to the specific connection we started the transaction on
*/
conn_t *conn;
/**
* Refcounter if transaction() is called multiple times
*/
refcount_t refs;
/**
* TRUE if transaction was rolled back
*/
bool rollback;
} transaction_t;
/**
* Release a mysql connection
*/
@ -109,6 +136,16 @@ static void conn_release(private_mysql_database_t *this, conn_t *conn)
this->mutex->unlock(this->mutex);
}
/**
* Destroy a transaction and release the connection
*/
static void transaction_destroy(private_mysql_database_t *this,
transaction_t *trans)
{
conn_release(this, trans->conn);
free(trans);
}
/**
* thread specific initialization flag
*/
@ -161,13 +198,24 @@ static void conn_destroy(conn_t *this)
/**
* Acquire/Reuse a mysql connection
*/
static conn_t *conn_get(private_mysql_database_t *this)
static conn_t *conn_get(private_mysql_database_t *this, transaction_t **trans)
{
conn_t *current, *found = NULL;
enumerator_t *enumerator;
transaction_t *transaction;
thread_initialize();
transaction = this->transaction->get(this->transaction);
if (transaction)
{
if (trans)
{
*trans = transaction;
}
return transaction->conn;
}
while (TRUE)
{
this->mutex->lock(this->mutex);
@ -490,7 +538,7 @@ METHOD(database_t, query, enumerator_t*,
mysql_enumerator_t *enumerator = NULL;
conn_t *conn;
conn = conn_get(this);
conn = conn_get(this, NULL);
if (!conn)
{
return NULL;
@ -582,7 +630,7 @@ METHOD(database_t, execute, int,
conn_t *conn;
int affected = -1;
conn = conn_get(this);
conn = conn_get(this, NULL);
if (!conn)
{
return -1;
@ -606,19 +654,81 @@ METHOD(database_t, execute, int,
METHOD(database_t, transaction, bool,
private_mysql_database_t *this)
{
return FALSE;
transaction_t *trans = NULL;
conn_t *conn;
conn = conn_get(this, &trans);
if (!conn)
{
return FALSE;
}
else if (trans)
{
ref_get(&trans->refs);
return TRUE;
}
/* these statements are not supported in prepared statements that are used
* by the execute() method */
if (mysql_query(conn->mysql, "START TRANSACTION") != 0)
{
DBG1(DBG_LIB, "starting transaction failed: %s",
mysql_error(conn->mysql));
conn_release(this, conn);
return FALSE;
}
INIT(trans,
.conn = conn,
.refs = 1,
);
this->transaction->set(this->transaction, trans);
return TRUE;
}
/**
* Finalize a transaction depending on the reference count and if it should be
* rolled back.
*/
static bool finalize_transaction(private_mysql_database_t *this,
bool rollback)
{
transaction_t *trans;
char *command = "COMMIT";
bool success;
trans = this->transaction->get(this->transaction);
if (!trans)
{
DBG1(DBG_LIB, "no database transaction found");
return FALSE;
}
/* set flag, can't be unset */
trans->rollback |= rollback;
if (ref_put(&trans->refs))
{
if (trans->rollback)
{
command = "ROLLBACK";
}
success = mysql_query(trans->conn->mysql, command) == 0;
this->transaction->set(this->transaction, NULL);
transaction_destroy(this, trans);
return success;
}
return TRUE;
}
METHOD(database_t, commit, bool,
private_mysql_database_t *this)
{
return FALSE;
return finalize_transaction(this, FALSE);
}
METHOD(database_t, rollback, bool,
private_mysql_database_t *this)
{
return FALSE;
return finalize_transaction(this, TRUE);
}
METHOD(database_t, get_driver,db_driver_t,
@ -630,6 +740,7 @@ METHOD(database_t, get_driver,db_driver_t,
METHOD(database_t, destroy, void,
private_mysql_database_t *this)
{
this->transaction->destroy(this->transaction);
this->pool->destroy_function(this->pool, (void*)conn_destroy);
this->mutex->destroy(this->mutex);
free(this->host);
@ -721,9 +832,10 @@ mysql_database_t *mysql_database_create(char *uri)
}
this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
this->pool = linked_list_create();
this->transaction = thread_value_create(NULL);
/* check connectivity */
conn = conn_get(this);
conn = conn_get(this, NULL);
if (!conn)
{
destroy(this);