diff options
-rw-r--r-- | DEVELOPERS | 6 | ||||
-rw-r--r-- | pgfuse.c | 190 | ||||
-rw-r--r-- | pool.c | 23 | ||||
-rw-r--r-- | pool.h | 6 |
4 files changed, 119 insertions, 106 deletions
@@ -112,3 +112,9 @@ commands of the shell). bonnie is a good stress and performance tester. Don't despair because of poor performance, that's normal. :-) +Several shells in parallel doing: + +while(true);do + mkdir mnt/bla + rmdir mnt/bla +done @@ -50,7 +50,7 @@ typedef struct PgFuseData { int multi_threaded; /* whether we run multi-threaded */ } PgFuseData; -/* --- helper functions --- */ +/* --- timestamp helpers --- */ static struct timespec now( void ) { @@ -72,16 +72,43 @@ static struct timespec now( void ) return s; } +/* --- pool helpers --- */ + +static PGconn *psql_acquire( PgFuseData *data ) +{ + if( !data->multi_threaded ) { + return data->conn; + } + + return psql_pool_acquire( &data->pool ); +} + +static int psql_release( PgFuseData *data, PGconn *conn ) +{ + if( !data->multi_threaded ) return 0; + + return psql_pool_release( &data->pool, conn ); +} + +#define ACQUIRE( C ) \ + C = psql_acquire( data ); \ + if( C == NULL ) return -EIO; + +#define RELEASE( C ) \ + if( psql_release( data, C ) < 0 ) return -EIO; + +#define THREAD_ID (unsigned int)pthread_self( ) + /* --- implementation of FUSE hooks --- */ static void *pgfuse_init( struct fuse_conn_info *conn ) { PgFuseData *data = (PgFuseData *)fuse_get_context( )->private_data; - syslog( LOG_INFO, "Mounting file system on '%s' ('%s', %s), thread #%d", + syslog( LOG_INFO, "Mounting file system on '%s' ('%s', %s), thread #%u", data->mountpoint, data->conninfo, data->read_only ? "read-only" : "read-write", - fuse_get_context( )->uid ); + THREAD_ID ); /* in single-threaded case we just need one shared PostgreSQL connection */ if( !data->multi_threaded ) { @@ -108,8 +135,8 @@ static void pgfuse_destroy( void *userdata ) { PgFuseData *data = (PgFuseData *)userdata; - syslog( LOG_INFO, "Unmounting file system on '%s' (%s), thread #%d", - data->mountpoint, data->conninfo, fuse_get_context( )->uid ); + syslog( LOG_INFO, "Unmounting file system on '%s' (%s), thread #%u", + data->mountpoint, data->conninfo, THREAD_ID ); if( !data->multi_threaded ) { PQfinish( data->conn ); @@ -118,29 +145,6 @@ static void pgfuse_destroy( void *userdata ) } } -static PGconn *psql_acquire( PgFuseData *data ) -{ - if( !data->multi_threaded ) { - return data->conn; - } - - return psql_pool_acquire( &data->pool, fuse_get_context( )->pid ); -} - -static void psql_release( PgFuseData *data ) -{ - if( !data->multi_threaded ) return; - - (void)psql_pool_release( &data->pool, fuse_get_context( )->pid ); -} - -#define ACQUIRE( C ) \ - C = psql_acquire( data ); \ - if( C == NULL ) return -EIO; - -#define RELEASE( C ) \ - (void)psql_release( data ) - static int pgfuse_fgetattr( const char *path, struct stat *stbuf, struct fuse_file_info *fi ) { PgFuseData *data = (PgFuseData *)fuse_get_context( )->private_data; @@ -149,8 +153,8 @@ static int pgfuse_fgetattr( const char *path, struct stat *stbuf, struct fuse_fi PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "FgetAttrs '%s' on '%s', thread #%d", - path, data->mountpoint, fuse_get_context( )->uid ); + syslog( LOG_INFO, "FgetAttrs '%s' on '%s', thread #%u", + path, data->mountpoint, THREAD_ID ); } ACQUIRE( conn ); @@ -165,9 +169,9 @@ static int pgfuse_fgetattr( const char *path, struct stat *stbuf, struct fuse_fi } if( data->verbose ) { - syslog( LOG_DEBUG, "Id for %s '%s' is %d, thread #%d", + syslog( LOG_DEBUG, "Id for %s '%s' is %d, thread #%u", S_ISDIR( meta.mode ) ? "dir" : "file", path, id, - fuse_get_context( )->uid ); + THREAD_ID ); } stbuf->st_ino = id; @@ -182,7 +186,7 @@ static int pgfuse_fgetattr( const char *path, struct stat *stbuf, struct fuse_fi stbuf->st_uid = meta.uid; stbuf->st_gid = meta.gid; - PSQL_COMMIT( conn ); + PSQL_COMMIT( conn ); RELEASE( conn ); return 0; } @@ -195,8 +199,8 @@ static int pgfuse_getattr( const char *path, struct stat *stbuf ) PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "GetAttrs '%s' on '%s', thread #%d", - path, data->mountpoint, fuse_get_context( )->uid ); + syslog( LOG_INFO, "GetAttrs '%s' on '%s', thread #%u", + path, data->mountpoint, THREAD_ID ); } ACQUIRE( conn ); @@ -211,9 +215,9 @@ static int pgfuse_getattr( const char *path, struct stat *stbuf ) } if( data->verbose ) { - syslog( LOG_DEBUG, "Id for %s '%s' is %d, thread #%d", + syslog( LOG_DEBUG, "Id for %s '%s' is %d, thread #%u", S_ISDIR( meta.mode ) ? "dir" : "file", path, id, - fuse_get_context( )->uid ); + THREAD_ID ); } stbuf->st_ino = id; @@ -241,8 +245,8 @@ static int pgfuse_access( const char *path, int mode ) PgFuseData *data = (PgFuseData *)fuse_get_context( )->private_data; if( data->verbose ) { - syslog( LOG_INFO, "Access on '%s' and mode '%o, thread #%d", - path, (unsigned int)mode, fuse_get_context( )->uid ); + syslog( LOG_INFO, "Access on '%s' and mode '%o, thread #%u", + path, (unsigned int)mode, THREAD_ID ); } /* TODO: check access, but not now. grant always access */ @@ -285,8 +289,8 @@ static int pgfuse_create( const char *path, mode_t mode, struct fuse_file_info * if( data->verbose ) { char *s = flags_to_string( fi->flags ); - syslog( LOG_INFO, "Create '%s' in mode '%o' on '%s' with flags '%s', thread #%d", - path, mode, data->mountpoint, s, fuse_get_context( )->uid ); + syslog( LOG_INFO, "Create '%s' in mode '%o' on '%s' with flags '%s', thread #%u", + path, mode, data->mountpoint, s, THREAD_ID ); if( *s != '<' ) free( s ); } @@ -306,8 +310,8 @@ static int pgfuse_create( const char *path, mode_t mode, struct fuse_file_info * if( id >= 0 ) { if( data->verbose ) { - syslog( LOG_DEBUG, "Id for dir '%s' is %d, thread #%d", - path, id, fuse_get_context( )->uid ); + syslog( LOG_DEBUG, "Id for dir '%s' is %d, thread #%u", + path, id, THREAD_ID ); } if( S_ISDIR(meta.mode ) ) { @@ -339,8 +343,8 @@ static int pgfuse_create( const char *path, mode_t mode, struct fuse_file_info * } if( data->verbose ) { - syslog( LOG_DEBUG, "Parent_id for new file '%s' in dir '%s' is %d, thread #%d", - path, parent_path, parent_id, fuse_get_context( )->uid ); + syslog( LOG_DEBUG, "Parent_id for new file '%s' in dir '%s' is %d, thread #%u", + path, parent_path, parent_id, THREAD_ID ); } free( copy_path ); @@ -379,8 +383,8 @@ static int pgfuse_create( const char *path, mode_t mode, struct fuse_file_info * } if( data->verbose ) { - syslog( LOG_DEBUG, "Id for new file '%s' is %d, thread #%d", - path, id, fuse_get_context( )->uid ); + syslog( LOG_DEBUG, "Id for new file '%s' is %d, thread #%u", + path, id, THREAD_ID ); } fi->fh = id; @@ -403,8 +407,8 @@ static int pgfuse_open( const char *path, struct fuse_file_info *fi ) if( data->verbose ) { char *s = flags_to_string( fi->flags ); - syslog( LOG_INFO, "Open '%s' on '%s' with flags '%s', thread #%d", - path, data->mountpoint, s, fuse_get_context( )->uid ); + syslog( LOG_INFO, "Open '%s' on '%s' with flags '%s', thread #%u", + path, data->mountpoint, s, THREAD_ID ); if( *s != '<' ) free( s ); } @@ -424,8 +428,8 @@ static int pgfuse_open( const char *path, struct fuse_file_info *fi ) } if( data->verbose ) { - syslog( LOG_DEBUG, "Id for file '%s' to open is %d, thread #%d", - path, id, fuse_get_context( )->uid ); + syslog( LOG_DEBUG, "Id for file '%s' to open is %d, thread #%u", + path, id, THREAD_ID ); } if( S_ISDIR( meta.mode ) ) { @@ -471,8 +475,8 @@ static int pgfuse_readdir( const char *path, void *buf, fuse_fill_dir_t filler, PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "Readdir '%s' on '%s', thread #%d", - path, data->mountpoint, fuse_get_context( )->uid ); + syslog( LOG_INFO, "Readdir '%s' on '%s', thread #%u", + path, data->mountpoint, THREAD_ID ); } ACQUIRE( conn ); @@ -522,9 +526,9 @@ static int pgfuse_mkdir( const char *path, mode_t mode ) PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "Mkdir '%s' in mode '%o' on '%s', thread #%d", + syslog( LOG_INFO, "Mkdir '%s' in mode '%o' on '%s', thread #%u", path, (unsigned int)mode, data->mountpoint, - fuse_get_context( )->uid ); + THREAD_ID ); } ACQUIRE( conn ); @@ -555,8 +559,8 @@ static int pgfuse_mkdir( const char *path, mode_t mode ) } if( data->verbose ) { - syslog( LOG_DEBUG, "Parent_id for new dir '%s' is %d, thread #%d", - path, parent_id, fuse_get_context( )->uid ); + syslog( LOG_DEBUG, "Parent_id for new dir '%s' is %d, thread #%u", + path, parent_id, THREAD_ID ); } free( copy_path ); @@ -602,8 +606,8 @@ static int pgfuse_rmdir( const char *path ) PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "Rmdir '%s' on '%s', thread #%d", - path, data->mountpoint, fuse_get_context( )->uid ); + syslog( LOG_INFO, "Rmdir '%s' on '%s', thread #%u", + path, data->mountpoint, THREAD_ID ); } ACQUIRE( conn ); @@ -620,8 +624,8 @@ static int pgfuse_rmdir( const char *path ) } if( data->verbose ) { - syslog( LOG_DEBUG, "Id of dir '%s' to be removed is %d, thread #%d", - path, id, fuse_get_context( )->uid ); + syslog( LOG_DEBUG, "Id of dir '%s' to be removed is %d, thread #%u", + path, id, THREAD_ID ); } if( data->read_only ) { @@ -649,8 +653,8 @@ static int pgfuse_unlink( const char *path ) PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "Remove file '%s' on '%s', thread #%d", - path, data->mountpoint, fuse_get_context( )->uid ); + syslog( LOG_INFO, "Remove file '%s' on '%s', thread #%u", + path, data->mountpoint, THREAD_ID ); } ACQUIRE( conn ); @@ -667,8 +671,8 @@ static int pgfuse_unlink( const char *path ) } if( data->verbose ) { - syslog( LOG_DEBUG, "Id of file '%s' to be removed is %d, thread #%d", - path, id, fuse_get_context( )->uid ); + syslog( LOG_DEBUG, "Id of file '%s' to be removed is %d, thread #%u", + path, id, THREAD_ID ); } if( data->read_only ) { @@ -705,9 +709,9 @@ static int pgfuse_fsync( const char *path, int isdatasync, struct fuse_file_info PgFuseData *data = (PgFuseData *)fuse_get_context( )->private_data; if( data->verbose ) { - syslog( LOG_INFO, "%s on file '%s' on '%s', thread #%d", + syslog( LOG_INFO, "%s on file '%s' on '%s', thread #%u", isdatasync ? "FDataSync" : "FSync", path, data->mountpoint, - fuse_get_context( )->uid ); + THREAD_ID ); } if( data->read_only ) { @@ -734,8 +738,8 @@ static int pgfuse_release( const char *path, struct fuse_file_info *fi ) PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "Releasing '%s' on '%s', thread #%d", - path, data->mountpoint, fuse_get_context( )->uid ); + syslog( LOG_INFO, "Releasing '%s' on '%s', thread #%u", + path, data->mountpoint, THREAD_ID ); } ACQUIRE( conn ); @@ -779,9 +783,9 @@ static int pgfuse_write( const char *path, const char *buf, size_t size, PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "Write to '%s' from offset %d, size %d on '%s', thread #%d", + syslog( LOG_INFO, "Write to '%s' from offset %d, size %d on '%s', thread #%u", path, (unsigned int)offset, (unsigned int)size, data->mountpoint, - fuse_get_context( )->uid ); + THREAD_ID ); } ACQUIRE( conn ); @@ -838,9 +842,9 @@ static int pgfuse_read( const char *path, char *buf, size_t size, PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "Read to '%s' from offset %d, size %d on '%s', thread #%d", + syslog( LOG_INFO, "Read to '%s' from offset %d, size %d on '%s', thread #%u", path, (unsigned int)offset, (unsigned int)size, data->mountpoint, - fuse_get_context( )->uid ); + THREAD_ID ); } ACQUIRE( conn ); @@ -871,8 +875,8 @@ static int pgfuse_truncate( const char* path, off_t offset ) PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "Truncate of '%s' to size '%d' on '%s', thread #%d", - path, (unsigned int)offset, data->mountpoint, fuse_get_context( )->uid ); + syslog( LOG_INFO, "Truncate of '%s' to size '%d' on '%s', thread #%u", + path, (unsigned int)offset, data->mountpoint, THREAD_ID ); } ACQUIRE( conn ); @@ -890,8 +894,8 @@ static int pgfuse_truncate( const char* path, off_t offset ) } if( data->verbose ) { - syslog( LOG_DEBUG, "Id of file '%s' to be truncated is %d, thread #%d", - path, id, fuse_get_context( )->uid ); + syslog( LOG_DEBUG, "Id of file '%s' to be truncated is %d, thread #%u", + path, id, THREAD_ID ); } if( data->read_only ) { @@ -925,9 +929,9 @@ static int pgfuse_ftruncate( const char *path, off_t offset, struct fuse_file_in PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "Truncate of '%s' to size '%d' on '%s', thread #%d", + syslog( LOG_INFO, "Truncate of '%s' to size '%d' on '%s', thread #%u", path, (unsigned int)offset, data->mountpoint, - fuse_get_context( )->uid ); + THREAD_ID ); } ACQUIRE( conn ); @@ -973,8 +977,8 @@ static int pgfuse_statfs( const char *path, struct statvfs *buf ) PgFuseData *data = (PgFuseData *)fuse_get_context( )->private_data; if( data->verbose ) { - syslog( LOG_INFO, "Statfs called on '%s', thread #%d", - data->mountpoint, fuse_get_context( )->uid ); + syslog( LOG_INFO, "Statfs called on '%s', thread #%u", + data->mountpoint, THREAD_ID ); } /* Note: f_frsize, f_favail, f_fsid and f_flag are currently ignored by FUSE */ @@ -1009,9 +1013,9 @@ static int pgfuse_chmod( const char *path, mode_t mode ) PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "Chmod on '%s' to mode '%o' on '%s', thread #%d", + syslog( LOG_INFO, "Chmod on '%s' to mode '%o' on '%s', thread #%u", path, (unsigned int)mode, data->mountpoint, - fuse_get_context( )->uid ); + THREAD_ID ); } ACQUIRE( conn ); @@ -1045,9 +1049,9 @@ static int pgfuse_chown( const char *path, uid_t uid, gid_t gid ) PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "Chown on '%s' to uid '%d' and gid '%d' on '%s', thread #%d", + syslog( LOG_INFO, "Chown on '%s' to uid '%d' and gid '%d' on '%s', thread #%u", path, (unsigned int)uid, (unsigned int)gid, data->mountpoint, - fuse_get_context( )->uid ); + THREAD_ID ); } ACQUIRE( conn ); @@ -1086,8 +1090,8 @@ static int pgfuse_symlink( const char *from, const char *to ) PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "Symlink from '%s' to '%s' on '%s', thread #%d", - from, to, data->mountpoint, fuse_get_context( )->uid ); + syslog( LOG_INFO, "Symlink from '%s' to '%s' on '%s', thread #%u", + from, to, data->mountpoint, THREAD_ID ); } ACQUIRE( conn ); @@ -1118,8 +1122,8 @@ static int pgfuse_symlink( const char *from, const char *to ) } if( data->verbose ) { - syslog( LOG_DEBUG, "Parent_id for symlink '%s' is %d, thread #%d", - to, parent_id, fuse_get_context( )->uid ); + syslog( LOG_DEBUG, "Parent_id for symlink '%s' is %d, thread #%u", + to, parent_id, THREAD_ID ); } free( copy_to ); @@ -1184,8 +1188,8 @@ static int pgfuse_readlink( const char *path, char *buf, size_t size ) PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "Dereferencing symlink '%s' on '%s', thread #%d", - path, data->mountpoint, fuse_get_context( )->uid ); + syslog( LOG_INFO, "Dereferencing symlink '%s' on '%s', thread #%u", + path, data->mountpoint, THREAD_ID ); } ACQUIRE( conn ); @@ -1228,9 +1232,9 @@ static int pgfuse_utimens( const char *path, const struct timespec tv[2] ) PGconn *conn; if( data->verbose ) { - syslog( LOG_INFO, "Utimens on '%s' to access time '%d' and modification time '%d' on '%s', thread #%d", + syslog( LOG_INFO, "Utimens on '%s' to access time '%d' and modification time '%d' on '%s', thread #%u", path, (unsigned int)tv[0].tv_sec, (unsigned int)tv[1].tv_sec, data->mountpoint, - fuse_get_context( )->uid ); + THREAD_ID ); } ACQUIRE( conn ); @@ -36,7 +36,7 @@ int psql_pool_init( PgConnPool *pool, const char *conninfo, const size_t max_con return -ENOMEM; } - pool->avail = (pid_t *)malloc( sizeof( pid_t ) * max_connections ); + pool->avail = (pthread_t *)malloc( sizeof( pthread_t ) * max_connections ); if( pool->avail == NULL ) { free( pool->conns ); return -ENOMEM; @@ -102,8 +102,8 @@ int psql_pool_destroy( PgConnPool *pool ) if( pool->avail[i] == AVAILABLE ) { PQfinish( pool->conns[i] ); } else if( pool->avail[i] > 0 ) { - syslog( LOG_ERR, "Destroying pool connection to thread '%d' which is still in use", - pool->avail[i] ); + syslog( LOG_ERR, "Destroying pool connection to thread '%u' which is still in use", + (unsigned int)pool->avail[i] ); PQfinish( pool->conns[i] ); } } @@ -116,7 +116,7 @@ int psql_pool_destroy( PgConnPool *pool ) return ( res1 < 0 ) ? res1 : res2; } -PGconn *psql_pool_acquire( PgConnPool *pool , pid_t pid ) +PGconn *psql_pool_acquire( PgConnPool *pool ) { int res; size_t i; @@ -124,7 +124,8 @@ PGconn *psql_pool_acquire( PgConnPool *pool , pid_t pid ) for( ;; ) { res = pthread_mutex_lock( &pool->lock ); if( res < 0 ) { - syslog( LOG_ERR, "Locking mutex failed for thread '%d': %d", pid, res ); + syslog( LOG_ERR, "Locking mutex failed for thread '%u': %d", + (unsigned int)pthread_self( ), res ); return NULL; } @@ -132,7 +133,7 @@ PGconn *psql_pool_acquire( PgConnPool *pool , pid_t pid ) for( i = 0; i < pool->size; i++ ) { if( pool->avail[i] == AVAILABLE ) { if( PQstatus( pool->conns[i] ) == CONNECTION_OK ) { - pool->avail[i] = pid; + pool->avail[i] = pthread_self( ); (void)pthread_mutex_unlock( &pool->lock ); return pool->conns[i]; } else { @@ -144,7 +145,9 @@ PGconn *psql_pool_acquire( PgConnPool *pool , pid_t pid ) /* wait on conditional till a free connection is signalled */ res = pthread_cond_wait( &pool->cond, &pool->lock ); if( res < 0 ) { - syslog( LOG_ERR, "Error waiting for free condition in thread '%d': %d", pid, res ); + syslog( LOG_ERR, "Error waiting for free condition in thread '%u': %d", + (unsigned int)pthread_self( ), res ); + (void)pthread_mutex_unlock( &pool->lock ); return NULL; } @@ -155,7 +158,7 @@ PGconn *psql_pool_acquire( PgConnPool *pool , pid_t pid ) return NULL; } -int psql_pool_release( PgConnPool *pool, pid_t pid ) +int psql_pool_release( PgConnPool *pool, PGconn *conn ) { int res; int i; @@ -163,8 +166,8 @@ int psql_pool_release( PgConnPool *pool, pid_t pid ) res = pthread_mutex_lock( &pool->lock ); if( res < 0 ) return res; - for( i = pool->size-1; i >= 0; i-- ) { - if( pool->avail[i] == pid ) { + for( i = 1; i < pool->size; i++ ) { + if( pool->conns[i] == conn ) { break; } } @@ -27,7 +27,7 @@ typedef struct PgConnPool { PGconn **conns; /* array of connections */ size_t size; /* max number of connections */ - pid_t *avail; /* slots of allocated/available connections per thread */ + pthread_t *avail; /* slots of allocated/available connections per thread */ pthread_mutex_t lock; /* monitor lock */ pthread_cond_t cond; /* condition signalling a free connection */ } PgConnPool; @@ -36,8 +36,8 @@ int psql_pool_init( PgConnPool *pool, const char *conninfo, const size_t max_con int psql_pool_destroy( PgConnPool *pool ); -PGconn *psql_pool_acquire( PgConnPool *pool, pid_t pid ); +PGconn *psql_pool_acquire( PgConnPool *pool ); -int psql_pool_release( PgConnPool *pool, pid_t pid ); +int psql_pool_release( PgConnPool *pool, PGconn *conn ); #endif |