diff options
author | Andreas Baumann <abaumann@yahoo.com> | 2012-04-18 20:49:27 +0200 |
---|---|---|
committer | Andreas Baumann <abaumann@yahoo.com> | 2012-04-18 20:49:27 +0200 |
commit | 33e10221f515899116faa30f0d79d34b5f0bfff6 (patch) | |
tree | aca87b4870111c70a7b55b72c9abb827998627fc /pgsql.c | |
parent | 6e8858e71000ef3c72b7ba1d696f8ea6f39f7202 (diff) | |
download | pgfuse-33e10221f515899116faa30f0d79d34b5f0bfff6.tar.gz pgfuse-33e10221f515899116faa30f0d79d34b5f0bfff6.tar.bz2 |
got a first working version without a maxsize limit
Diffstat (limited to 'pgsql.c')
-rw-r--r-- | pgsql.c | 281 |
1 files changed, 256 insertions, 25 deletions
@@ -26,6 +26,8 @@ #include <stdint.h> /* for uint64_t */ #include <endian.h> /* for be64toh (GNU/BSD-ism, but easy to port if necessary) */ +#include "config.h" /* compiled in defaults */ + /* --- helper functions --- */ /* January 1, 2000, 00:00:00 UTC (in Unix epoch seconds) */ @@ -49,6 +51,41 @@ static struct timespec convert_from_timestamp( uint64_t raw ) return ts; } +/* block information for read/write/truncate */ +typedef struct PgDataInfo { + unsigned int from_block; + off_t from_offset; + size_t from_len; + unsigned int to_block; + size_t to_len; +} PgDataInfo; + +static PgDataInfo compute_block_info( off_t offset, size_t len ) +{ + PgDataInfo info; + int nof_blocks; + + info.from_block = offset / STANDARD_BLOCK_SIZE; + info.from_offset = offset % STANDARD_BLOCK_SIZE; + + nof_blocks = ( info.from_offset + len ) / STANDARD_BLOCK_SIZE; + if( nof_blocks == 0 ) { + info.from_len = len; + } else { + info.from_len = STANDARD_BLOCK_SIZE; + } + + info.to_block = info.from_block + nof_blocks; + info.to_len = ( info.from_offset + len ) % STANDARD_BLOCK_SIZE; + + if( info.to_len == 0 ) { + info.to_block--; + info.to_len = STANDARD_BLOCK_SIZE; + } + + return info; +} + /* --- postgresql implementation --- */ int psql_get_meta( PGconn *conn, const char *path, PgMeta *meta ) @@ -178,18 +215,25 @@ int psql_create_file( PGconn *conn, const int parent_id, const char *path, const return 0; } -int psql_read_buf( PGconn *conn, const int id, const char *path, char **buf, const size_t len ) +int psql_read_buf( PGconn *conn, const int id, const char *path, char *buf, const off_t offset, const size_t len, int verbose ) { + PgDataInfo info = compute_block_info( offset, len ); int param1 = htonl( id ); - const char *values[1] = { (char *)¶m1 }; - int lengths[2] = { sizeof( param1 ) }; - int binary[2] = { 1, 1 }; + const char *values[3] = { (const char *)¶m1, (const char *)&info.from_block, (const char *)&info.to_block }; + int lengths[3] = { sizeof( param1 ), sizeof( info.from_block ), sizeof( info.to_block ) }; + int binary[3] = { 1, 1, 1 }; PGresult *res; - int i_data; - int read; - - res = PQexecParams( conn, "SELECT data FROM DATA WHERE id=$1::int4", - 1, NULL, values, lengths, binary, 1 ); + char zero_block[STANDARD_BLOCK_SIZE]; + int block_no; + char *iptr; + char *data; + size_t copied; + int db_block_no; + int idx; + char *dst; + + res = PQexecParams( conn, "SELECT block_no, data FROM data WHERE dir_id=$1::int4 AND block_no>=$2::int4 AND block_no<=$3::int4 ORDER BY block_no ASC", + 3, NULL, values, lengths, binary, 1 ); if( PQresultStatus( res ) != PGRES_TUPLES_OK ) { syslog( LOG_ERR, "Error in psql_read_buf for path '%s'", path ); @@ -197,19 +241,61 @@ int psql_read_buf( PGconn *conn, const int id, const char *path, char **buf, con return -EIO; } - if( PQntuples( res ) != 1 ) { - syslog( LOG_ERR, "Expecting exactly one data entry for path '%s' in psql_read_buf, data inconsistent!", path ); - PQclear( res ); - return -EIO; + memset( zero_block, 0, STANDARD_BLOCK_SIZE ); + + dst = buf; + copied = 0; + for( block_no = info.from_block, idx = 0; block_no <= info.to_block; block_no++ ) { + + /* handle sparse files */ + if( idx < PQntuples( res ) ) { + iptr = PQgetvalue( res, idx, 0 ); + db_block_no = ntohl( *( (uint32_t *)iptr ) ); + + if( block_no < db_block_no ) { + data = zero_block; + } else { + data = PQgetvalue( res, idx, 1 ); + } + } else { + data = zero_block; + } + + if( verbose ) { + syslog( LOG_DEBUG, "File '%s', reading block '%d', copied: '%d', DB block: '%d'", + path, block_no, copied, db_block_no ); + } + + /* first block */ + if( block_no == info.from_block ) { + + memcpy( dst, data + info.from_offset, info.from_len ); + + dst += info.from_len; + copied += info.from_len; + + /* last block */ + } else if( block_no == info.to_block ) { + + memcpy( dst, data, info.to_len ); + copied += info.to_len; + + /* intermediary blocks, are copied completly */ + } else { + + memcpy( dst, data, STANDARD_BLOCK_SIZE ); + dst += STANDARD_BLOCK_SIZE; + copied += STANDARD_BLOCK_SIZE; + } + + if( block_no == db_block_no ) { + idx++; + } } - - i_data = PQfnumber( res, "data" ); - read = PQgetlength( res, 0, i_data ); - memcpy( *buf, PQgetvalue( res, 0, i_data ), read ); PQclear( res ); - return read; + return copied; } int psql_readdir( PGconn *conn, const int parent_id, void *buf, fuse_fill_dir_t filler ) @@ -334,7 +420,8 @@ int psql_delete_file( PGconn *conn, const int id, const char *path ) 1, NULL, values, lengths, binary, 1 ); if( PQresultStatus( res ) != PGRES_COMMAND_OK ) { - syslog( LOG_ERR, "Error in psql_delete_dir for path '%s': %s", path, PQerrorMessage( conn ) ); + syslog( LOG_ERR, "Error in psql_delete_dir for path '%s': %s", + path, PQerrorMessage( conn ) ); PQclear( res ); return -EIO; } @@ -344,24 +431,168 @@ int psql_delete_file( PGconn *conn, const int id, const char *path ) return 0; } -int psql_write_buf( PGconn *conn, const int id, const char *path, const char *buf, const size_t len ) +static int psql_write_block( PGconn *conn, const int id, const char *path, const char *buf, const unsigned int block_no, const off_t offset, const size_t len, int verbose ) { int param1 = htonl( id ); - const char *values[2] = { (char *)¶m1, buf }; - int lengths[2] = { sizeof( param1 ), len }; - int binary[2] = { 1, 1 }; + int param2 = htonl( block_no ); + const char *values[3] = { (const char *)¶m1, (const char *)¶m2, buf + offset }; + int lengths[3] = { sizeof( param1 ), sizeof( param2 ), len }; + int binary[3] = { 1, 1, 1 }; PGresult *res; + char sql[256]; + + /* could actually be an assertion, as this can never happen */ + if( offset + len > STANDARD_BLOCK_SIZE ) { + syslog( LOG_ERR, "Got a too big block write for file '%s': %u + %u > %d!", + path, (unsigned int)offset, (unsigned int)len, STANDARD_BLOCK_SIZE ); + return -EIO; + } + +update_again: + + /* write a complete block, old data in the database doesn't bother us */ + if( offset == 0 && len == STANDARD_BLOCK_SIZE ) { + + strcpy( sql, "UPDATE data set data = $3::bytea WHERE dir_id=$1::int4 AND block_no=$2::int4" ); + + /* small in the middle write, keep data on both sides */ + } else if( offset > 0 && offset + len < STANDARD_BLOCK_SIZE ) { + + sprintf( sql, "UPDATE data set data = substring( data from %d for %d ) || $3::bytea || substring( data from %d for %d ) WHERE dir_id=$1::int4 AND block_no=$2::int4", + 0, (unsigned int)offset, + (unsigned int)offset + len, STANDARD_BLOCK_SIZE - ( (unsigned int)offset + len ) ); + + /* keep data on the right */ + } else if( offset == 0 && len < STANDARD_BLOCK_SIZE ) { + + sprintf( sql, "UPDATE data set data = $3::bytea || substring( data from %d for %d ) WHERE dir_id=$1::int4 AND block_no=$2::int4", + len, STANDARD_BLOCK_SIZE - len ); + + /* keep data on the left */ + } else if( offset > 0 && offset + len == STANDARD_BLOCK_SIZE ) { + + sprintf( sql, "UPDATE data set data = substring( data from %d for %d ) || $3::bytea WHERE dir_id=$1::int4 AND block_no=$2::int4", + 0, (unsigned int)offset ); + + /* we should never get here */ + } else { + syslog( LOG_ERR, "Unhandled write case for file '%s' in block '%d': offset: %u, len: %u, blocksize: %u", + path, block_no, (unsigned int)offset, (unsigned int)len, STANDARD_BLOCK_SIZE ); + return -EIO; + } + + if( verbose ) { + syslog( LOG_DEBUG, "%s, block: %d, offset: %d, len: %d => %s\n", + path, block_no, (unsigned int)offset, len, sql ); + } + + res = PQexecParams( conn, sql, 3, NULL, values, lengths, binary, 1 ); + + if( PQresultStatus( res ) != PGRES_COMMAND_OK ) { + syslog( LOG_ERR, "Error in psql_write_block(%d,%u,%u) for file '%s' (%s): %s", + block_no, (unsigned int)offset, (unsigned int)len, path, + sql, PQerrorMessage( conn ) ); + PQclear( res ); + return -EIO; + } + + /* ok, one block updated */ + if( atoi( PQcmdTuples( res ) ) == 1 ) { + PQclear( res ); + return len; + } + + /* funny problems */ + if( atoi( PQcmdTuples( res ) ) != 0 ) { + syslog( LOG_ERR, "Unable to update block '%d' of file '%s'! Data consistency problems!", + block_no, path ); + PQclear( res ); + return -EIO; + } - res = PQexecParams( conn, "UPDATE data SET data=$2::bytea WHERE id=$1::int4", + PQclear( res ); + + /* the block didn't exist, so create one */ + res = PQexecParams( conn, "INSERT INTO data( dir_id, block_no ) VALUES" + " ( $1::int4, $2::int4 )", 2, NULL, values, lengths, binary, 1 ); if( PQresultStatus( res ) != PGRES_COMMAND_OK ) { - syslog( LOG_ERR, "Error in psql_write_buf for file '%s': %s", path, PQerrorMessage( conn ) ); + syslog( LOG_ERR, "Error in psql_write_block(%d,%u,%u) for file '%s' allocating new block '%d': %s", + block_no, (unsigned int)offset, (unsigned int)len, path, block_no, PQerrorMessage( conn ) ); + PQclear( res ); + return -EIO; + } + + if( atoi( PQcmdTuples( res ) ) != 1 ) { + syslog( LOG_ERR, "Unable to add new block '%d' of file '%s'! Data consistency problems!", + block_no, path ); PQclear( res ); return -EIO; } PQclear( res ); + goto update_again; +} + +int psql_write_buf( PGconn *conn, const int id, const char *path, const char *buf, const off_t offset, const size_t len, int verbose ) +{ + PgDataInfo info; + int res; + unsigned int block_no; + + if( len == 0 ) return 0; + + info = compute_block_info( offset, len ); + + /* first (partial) block */ + res = psql_write_block( conn, id, path, buf, info.from_block, info.from_offset, info.from_len, verbose ); + if( res < 0 ) { + return res; + } + if( res != info.from_len ) { + syslog( LOG_ERR, "Partial write in file '%s' in first block '%d' (%u instead of %u octets)", + path, info.from_block, res, info.from_len ); + return -EIO; + } + + /* special case of one block */ + if( info.from_block == info.to_block ) { + return res; + } + + buf += info.from_len; + + /* all full blocks */ + for( block_no = info.from_block + 1; block_no < info.to_block; block_no++ ) { + res = psql_write_block( conn, id, path, buf, block_no, 0, STANDARD_BLOCK_SIZE, verbose ); + if( res < 0 ) { + return res; + } + if( res != STANDARD_BLOCK_SIZE ) { + syslog( LOG_ERR, "Partial write in file '%s' in block '%d' (%u instead of %u octets)", + path, block_no, res, STANDARD_BLOCK_SIZE ); + return -EIO; + } + buf += STANDARD_BLOCK_SIZE; + } + + /* last partial block */ + res = psql_write_block( conn, id, path, buf, info.to_block, 0, info.to_len, verbose ); + if( res < 0 ) { + return res; + } + if( res != info.to_len ) { + syslog( LOG_ERR, "Partial write in file '%s' in last block '%d' (%u instead of %u octets)", + path, block_no, res, info.to_len ); + return -EIO; + } + return len; } + +int psql_truncate( PGconn *conn, const int id, const char *path, const off_t offset ) +{ + return 0; +} |