/* * sqlite3xx - sqlite3 C++ layer, following the ideas of libpqxx * Copyright (C) 2009 Andreas Baumann * * This copyrighted material is made available to anyone wishing to use, * modify, copy, or redistribute it subject to the terms and conditions of * the GNU Lesser General Public License, as published by the Free Software * Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with this distribution; if not, write to: * Free Software Foundation, Inc. * 51 Franklin Street, Fifth Floor * Boston, MA 02110-1301 USA * */ #include #include #include "sqlite3xx/sqlite3xx" /* test needs 'unlink' from 'unistd.h' */ #if !defined _WIN32 #include #include #endif /* !defined _WIN32 */ #include "threads.h" using namespace sqlite3xx; using namespace std; const int NOF_PRODUCERS = 10; const int NOF_CONSUMERS = 10; const int NOF_PRODUCER_TRANSACTIONS = 10; const int NOF_PRODUCER_OPS = 10; const int NOF_CONSUMER_TRANSACTIONS = 10; const int NOF_CONSUMER_OPS = 10; #define UNUSED( x ) if( 0 && (x) ) { } static MUTEX_TYPE cout_mutex; static bool verbose = false; static bool tracing = false; static THREAD_FUNC_DECL produce( void *thread_data ) { uintptr_t no = (uintptr_t)thread_data; try { connection c( "test9.db" ); if( tracing ) c.trace( true ); PRODUCER_PREPARE_AGAIN: try { c.prepare( "ins", "insert into x values( ? )" )( "integer", prepare::treat_direct ); } catch( const database_locked& e ) { goto PRODUCER_PREPARE_AGAIN; } for( int i = 0; i < NOF_PRODUCER_TRANSACTIONS; i++ ) { work t( c, "ins" ); if( verbose ) { MUTEX_LOCK( cout_mutex ); cout << "producer " << no << " transaction " << i << " started" << endl; MUTEX_UNLOCK( cout_mutex ); } for( int j = 0; j < NOF_PRODUCER_OPS; j++ ) { PRODUCER_INS_AGAIN: try { result r = t.prepared( "ins" )( (int)no * 100000 + i * 1000 + j ).exec( ); assert( r.affected_rows( ) == 1 ); t.commit( ); } catch( const database_locked& e ) { goto PRODUCER_INS_AGAIN; } } if( verbose ) { MUTEX_LOCK( cout_mutex ); cout << "producer " << no << " transaction " << i << " terminated" << endl; MUTEX_UNLOCK( cout_mutex ); } } } catch( const sql_error& e ) { cerr << "producer error, " << e.what( ) << ": " << e.query( ) << endl; } THREAD_FUNC_RETURN; } static THREAD_FUNC_DECL consume( void *thread_data ) { uintptr_t no = (uintptr_t)thread_data; try { connection c( "test9.db" ); CONSUMER_PREPARE_AGAIN: try { c.prepare( "sel", "select * from x" ); } catch( const database_locked &e ) { goto CONSUMER_PREPARE_AGAIN; } if( tracing ) c.trace( true ); for( int i = 0; i < NOF_CONSUMER_TRANSACTIONS; i++ ) { work t( c, "sel" ); if( verbose ) { MUTEX_LOCK( cout_mutex ); cout << "consumer " << no << " transaction " << i << " started" << endl; MUTEX_UNLOCK( cout_mutex ); } for( int j = 0; j < NOF_CONSUMER_OPS; j++ ) { CONSUMER_SELECT_AGAIN: try { result r = t.prepared( "sel" ).exec( ); int nof_rows = 1; for( result::const_iterator it = r.begin( ); it < r.end( ); it++, nof_rows++ ) { int x; it["x"].to( x ); } if( verbose ) { MUTEX_LOCK( cout_mutex ); cout << "consumer " << no << " transaction " << i << " got " << nof_rows << " rows." << endl; MUTEX_UNLOCK( cout_mutex ); } t.commit( ); } catch( const database_locked& e ) { goto CONSUMER_SELECT_AGAIN; } } if( verbose ) { MUTEX_LOCK( cout_mutex ); cout << "consumer " << no << " transaction " << i << " terminated" << endl; MUTEX_UNLOCK( cout_mutex ); } } } catch( const sql_error& e ) { cerr << "consumer error, " << e.what( ) << ": " << e.query( ) << endl; } THREAD_FUNC_RETURN; } int main( ) { (void)unlink( "test9.db" ); try { cout << "creating DB.." << endl; connection c( "test9.db" ); if( tracing ) c.trace( true ); cout << "connection object is " << c << endl; cout << "create table.." << endl; c.exec( "create table x( x integer )" ); // now have some threads creating data and some others // consuming it. stupid, but should nicely stress the thing THREAD_TYPE prod[NOF_PRODUCERS]; THREAD_TYPE cons[NOF_CONSUMERS]; MUTEX_SETUP( cout_mutex ); for( intptr_t i = 0; i < NOF_PRODUCERS ; i++ ) { THREAD_CREATE( &prod[i], produce, (void *)i ); } for( intptr_t i = 0; i < NOF_CONSUMERS; i++ ) { THREAD_CREATE( &cons[i], consume, (void *)i ); } for( intptr_t i = 0; i < NOF_PRODUCERS; i++ ) { THREAD_JOIN( prod[i] ); } for( int i = 0; i < NOF_CONSUMERS; i++ ) { THREAD_JOIN( cons[i] ); } MUTEX_CLEANUP( cout_mutex ); result r = c.exec( "select count(*) from x" ); int count; r[0][0].to( count ); assert( count == NOF_PRODUCERS * NOF_PRODUCER_TRANSACTIONS * NOF_PRODUCER_OPS ); } catch( const sql_error& e ) { cerr << e.what( ) << ": " << e.query( ) << endl; } }