summaryrefslogtreecommitdiff
path: root/pgsql.c
diff options
context:
space:
mode:
authorAndreas Baumann <abaumann@yahoo.com>2012-04-18 20:49:27 +0200
committerAndreas Baumann <abaumann@yahoo.com>2012-04-18 20:49:27 +0200
commit33e10221f515899116faa30f0d79d34b5f0bfff6 (patch)
treeaca87b4870111c70a7b55b72c9abb827998627fc /pgsql.c
parent6e8858e71000ef3c72b7ba1d696f8ea6f39f7202 (diff)
downloadpgfuse-33e10221f515899116faa30f0d79d34b5f0bfff6.tar.gz
pgfuse-33e10221f515899116faa30f0d79d34b5f0bfff6.tar.bz2
got a first working version without a maxsize limit
Diffstat (limited to 'pgsql.c')
-rw-r--r--pgsql.c281
1 files changed, 256 insertions, 25 deletions
diff --git a/pgsql.c b/pgsql.c
index 44a09cd..9f7cc71 100644
--- a/pgsql.c
+++ b/pgsql.c
@@ -26,6 +26,8 @@
#include <stdint.h> /* for uint64_t */
#include <endian.h> /* for be64toh (GNU/BSD-ism, but easy to port if necessary) */
+#include "config.h" /* compiled in defaults */
+
/* --- helper functions --- */
/* January 1, 2000, 00:00:00 UTC (in Unix epoch seconds) */
@@ -49,6 +51,41 @@ static struct timespec convert_from_timestamp( uint64_t raw )
return ts;
}
+/* block information for read/write/truncate */
+typedef struct PgDataInfo {
+ unsigned int from_block;
+ off_t from_offset;
+ size_t from_len;
+ unsigned int to_block;
+ size_t to_len;
+} PgDataInfo;
+
+static PgDataInfo compute_block_info( off_t offset, size_t len )
+{
+ PgDataInfo info;
+ int nof_blocks;
+
+ info.from_block = offset / STANDARD_BLOCK_SIZE;
+ info.from_offset = offset % STANDARD_BLOCK_SIZE;
+
+ nof_blocks = ( info.from_offset + len ) / STANDARD_BLOCK_SIZE;
+ if( nof_blocks == 0 ) {
+ info.from_len = len;
+ } else {
+ info.from_len = STANDARD_BLOCK_SIZE;
+ }
+
+ info.to_block = info.from_block + nof_blocks;
+ info.to_len = ( info.from_offset + len ) % STANDARD_BLOCK_SIZE;
+
+ if( info.to_len == 0 ) {
+ info.to_block--;
+ info.to_len = STANDARD_BLOCK_SIZE;
+ }
+
+ return info;
+}
+
/* --- postgresql implementation --- */
int psql_get_meta( PGconn *conn, const char *path, PgMeta *meta )
@@ -178,18 +215,25 @@ int psql_create_file( PGconn *conn, const int parent_id, const char *path, const
return 0;
}
-int psql_read_buf( PGconn *conn, const int id, const char *path, char **buf, const size_t len )
+int psql_read_buf( PGconn *conn, const int id, const char *path, char *buf, const off_t offset, const size_t len, int verbose )
{
+ PgDataInfo info = compute_block_info( offset, len );
int param1 = htonl( id );
- const char *values[1] = { (char *)&param1 };
- int lengths[2] = { sizeof( param1 ) };
- int binary[2] = { 1, 1 };
+ const char *values[3] = { (const char *)&param1, (const char *)&info.from_block, (const char *)&info.to_block };
+ int lengths[3] = { sizeof( param1 ), sizeof( info.from_block ), sizeof( info.to_block ) };
+ int binary[3] = { 1, 1, 1 };
PGresult *res;
- int i_data;
- int read;
-
- res = PQexecParams( conn, "SELECT data FROM DATA WHERE id=$1::int4",
- 1, NULL, values, lengths, binary, 1 );
+ char zero_block[STANDARD_BLOCK_SIZE];
+ int block_no;
+ char *iptr;
+ char *data;
+ size_t copied;
+ int db_block_no;
+ int idx;
+ char *dst;
+
+ res = PQexecParams( conn, "SELECT block_no, data FROM data WHERE dir_id=$1::int4 AND block_no>=$2::int4 AND block_no<=$3::int4 ORDER BY block_no ASC",
+ 3, NULL, values, lengths, binary, 1 );
if( PQresultStatus( res ) != PGRES_TUPLES_OK ) {
syslog( LOG_ERR, "Error in psql_read_buf for path '%s'", path );
@@ -197,19 +241,61 @@ int psql_read_buf( PGconn *conn, const int id, const char *path, char **buf, con
return -EIO;
}
- if( PQntuples( res ) != 1 ) {
- syslog( LOG_ERR, "Expecting exactly one data entry for path '%s' in psql_read_buf, data inconsistent!", path );
- PQclear( res );
- return -EIO;
+ memset( zero_block, 0, STANDARD_BLOCK_SIZE );
+
+ dst = buf;
+ copied = 0;
+ for( block_no = info.from_block, idx = 0; block_no <= info.to_block; block_no++ ) {
+
+ /* handle sparse files */
+ if( idx < PQntuples( res ) ) {
+ iptr = PQgetvalue( res, idx, 0 );
+ db_block_no = ntohl( *( (uint32_t *)iptr ) );
+
+ if( block_no < db_block_no ) {
+ data = zero_block;
+ } else {
+ data = PQgetvalue( res, idx, 1 );
+ }
+ } else {
+ data = zero_block;
+ }
+
+ if( verbose ) {
+ syslog( LOG_DEBUG, "File '%s', reading block '%d', copied: '%d', DB block: '%d'",
+ path, block_no, copied, db_block_no );
+ }
+
+ /* first block */
+ if( block_no == info.from_block ) {
+
+ memcpy( dst, data + info.from_offset, info.from_len );
+
+ dst += info.from_len;
+ copied += info.from_len;
+
+ /* last block */
+ } else if( block_no == info.to_block ) {
+
+ memcpy( dst, data, info.to_len );
+ copied += info.to_len;
+
+ /* intermediary blocks, are copied completly */
+ } else {
+
+ memcpy( dst, data, STANDARD_BLOCK_SIZE );
+ dst += STANDARD_BLOCK_SIZE;
+ copied += STANDARD_BLOCK_SIZE;
+ }
+
+ if( block_no == db_block_no ) {
+ idx++;
+ }
}
-
- i_data = PQfnumber( res, "data" );
- read = PQgetlength( res, 0, i_data );
- memcpy( *buf, PQgetvalue( res, 0, i_data ), read );
PQclear( res );
- return read;
+ return copied;
}
int psql_readdir( PGconn *conn, const int parent_id, void *buf, fuse_fill_dir_t filler )
@@ -334,7 +420,8 @@ int psql_delete_file( PGconn *conn, const int id, const char *path )
1, NULL, values, lengths, binary, 1 );
if( PQresultStatus( res ) != PGRES_COMMAND_OK ) {
- syslog( LOG_ERR, "Error in psql_delete_dir for path '%s': %s", path, PQerrorMessage( conn ) );
+ syslog( LOG_ERR, "Error in psql_delete_dir for path '%s': %s",
+ path, PQerrorMessage( conn ) );
PQclear( res );
return -EIO;
}
@@ -344,24 +431,168 @@ int psql_delete_file( PGconn *conn, const int id, const char *path )
return 0;
}
-int psql_write_buf( PGconn *conn, const int id, const char *path, const char *buf, const size_t len )
+static int psql_write_block( PGconn *conn, const int id, const char *path, const char *buf, const unsigned int block_no, const off_t offset, const size_t len, int verbose )
{
int param1 = htonl( id );
- const char *values[2] = { (char *)&param1, buf };
- int lengths[2] = { sizeof( param1 ), len };
- int binary[2] = { 1, 1 };
+ int param2 = htonl( block_no );
+ const char *values[3] = { (const char *)&param1, (const char *)&param2, buf + offset };
+ int lengths[3] = { sizeof( param1 ), sizeof( param2 ), len };
+ int binary[3] = { 1, 1, 1 };
PGresult *res;
+ char sql[256];
+
+ /* could actually be an assertion, as this can never happen */
+ if( offset + len > STANDARD_BLOCK_SIZE ) {
+ syslog( LOG_ERR, "Got a too big block write for file '%s': %u + %u > %d!",
+ path, (unsigned int)offset, (unsigned int)len, STANDARD_BLOCK_SIZE );
+ return -EIO;
+ }
+
+update_again:
+
+ /* write a complete block, old data in the database doesn't bother us */
+ if( offset == 0 && len == STANDARD_BLOCK_SIZE ) {
+
+ strcpy( sql, "UPDATE data set data = $3::bytea WHERE dir_id=$1::int4 AND block_no=$2::int4" );
+
+ /* small in the middle write, keep data on both sides */
+ } else if( offset > 0 && offset + len < STANDARD_BLOCK_SIZE ) {
+
+ sprintf( sql, "UPDATE data set data = substring( data from %d for %d ) || $3::bytea || substring( data from %d for %d ) WHERE dir_id=$1::int4 AND block_no=$2::int4",
+ 0, (unsigned int)offset,
+ (unsigned int)offset + len, STANDARD_BLOCK_SIZE - ( (unsigned int)offset + len ) );
+
+ /* keep data on the right */
+ } else if( offset == 0 && len < STANDARD_BLOCK_SIZE ) {
+
+ sprintf( sql, "UPDATE data set data = $3::bytea || substring( data from %d for %d ) WHERE dir_id=$1::int4 AND block_no=$2::int4",
+ len, STANDARD_BLOCK_SIZE - len );
+
+ /* keep data on the left */
+ } else if( offset > 0 && offset + len == STANDARD_BLOCK_SIZE ) {
+
+ sprintf( sql, "UPDATE data set data = substring( data from %d for %d ) || $3::bytea WHERE dir_id=$1::int4 AND block_no=$2::int4",
+ 0, (unsigned int)offset );
+
+ /* we should never get here */
+ } else {
+ syslog( LOG_ERR, "Unhandled write case for file '%s' in block '%d': offset: %u, len: %u, blocksize: %u",
+ path, block_no, (unsigned int)offset, (unsigned int)len, STANDARD_BLOCK_SIZE );
+ return -EIO;
+ }
+
+ if( verbose ) {
+ syslog( LOG_DEBUG, "%s, block: %d, offset: %d, len: %d => %s\n",
+ path, block_no, (unsigned int)offset, len, sql );
+ }
+
+ res = PQexecParams( conn, sql, 3, NULL, values, lengths, binary, 1 );
+
+ if( PQresultStatus( res ) != PGRES_COMMAND_OK ) {
+ syslog( LOG_ERR, "Error in psql_write_block(%d,%u,%u) for file '%s' (%s): %s",
+ block_no, (unsigned int)offset, (unsigned int)len, path,
+ sql, PQerrorMessage( conn ) );
+ PQclear( res );
+ return -EIO;
+ }
+
+ /* ok, one block updated */
+ if( atoi( PQcmdTuples( res ) ) == 1 ) {
+ PQclear( res );
+ return len;
+ }
+
+ /* funny problems */
+ if( atoi( PQcmdTuples( res ) ) != 0 ) {
+ syslog( LOG_ERR, "Unable to update block '%d' of file '%s'! Data consistency problems!",
+ block_no, path );
+ PQclear( res );
+ return -EIO;
+ }
- res = PQexecParams( conn, "UPDATE data SET data=$2::bytea WHERE id=$1::int4",
+ PQclear( res );
+
+ /* the block didn't exist, so create one */
+ res = PQexecParams( conn, "INSERT INTO data( dir_id, block_no ) VALUES"
+ " ( $1::int4, $2::int4 )",
2, NULL, values, lengths, binary, 1 );
if( PQresultStatus( res ) != PGRES_COMMAND_OK ) {
- syslog( LOG_ERR, "Error in psql_write_buf for file '%s': %s", path, PQerrorMessage( conn ) );
+ syslog( LOG_ERR, "Error in psql_write_block(%d,%u,%u) for file '%s' allocating new block '%d': %s",
+ block_no, (unsigned int)offset, (unsigned int)len, path, block_no, PQerrorMessage( conn ) );
+ PQclear( res );
+ return -EIO;
+ }
+
+ if( atoi( PQcmdTuples( res ) ) != 1 ) {
+ syslog( LOG_ERR, "Unable to add new block '%d' of file '%s'! Data consistency problems!",
+ block_no, path );
PQclear( res );
return -EIO;
}
PQclear( res );
+ goto update_again;
+}
+
+int psql_write_buf( PGconn *conn, const int id, const char *path, const char *buf, const off_t offset, const size_t len, int verbose )
+{
+ PgDataInfo info;
+ int res;
+ unsigned int block_no;
+
+ if( len == 0 ) return 0;
+
+ info = compute_block_info( offset, len );
+
+ /* first (partial) block */
+ res = psql_write_block( conn, id, path, buf, info.from_block, info.from_offset, info.from_len, verbose );
+ if( res < 0 ) {
+ return res;
+ }
+ if( res != info.from_len ) {
+ syslog( LOG_ERR, "Partial write in file '%s' in first block '%d' (%u instead of %u octets)",
+ path, info.from_block, res, info.from_len );
+ return -EIO;
+ }
+
+ /* special case of one block */
+ if( info.from_block == info.to_block ) {
+ return res;
+ }
+
+ buf += info.from_len;
+
+ /* all full blocks */
+ for( block_no = info.from_block + 1; block_no < info.to_block; block_no++ ) {
+ res = psql_write_block( conn, id, path, buf, block_no, 0, STANDARD_BLOCK_SIZE, verbose );
+ if( res < 0 ) {
+ return res;
+ }
+ if( res != STANDARD_BLOCK_SIZE ) {
+ syslog( LOG_ERR, "Partial write in file '%s' in block '%d' (%u instead of %u octets)",
+ path, block_no, res, STANDARD_BLOCK_SIZE );
+ return -EIO;
+ }
+ buf += STANDARD_BLOCK_SIZE;
+ }
+
+ /* last partial block */
+ res = psql_write_block( conn, id, path, buf, info.to_block, 0, info.to_len, verbose );
+ if( res < 0 ) {
+ return res;
+ }
+ if( res != info.to_len ) {
+ syslog( LOG_ERR, "Partial write in file '%s' in last block '%d' (%u instead of %u octets)",
+ path, block_no, res, info.to_len );
+ return -EIO;
+ }
+
return len;
}
+
+int psql_truncate( PGconn *conn, const int id, const char *path, const off_t offset )
+{
+ return 0;
+}