summaryrefslogtreecommitdiff
path: root/pgsql.c
diff options
context:
space:
mode:
authorAndreas Baumann <abaumann@yahoo.com>2012-05-06 12:51:36 +0200
committerAndreas Baumann <abaumann@yahoo.com>2012-05-06 12:51:36 +0200
commitbc26616b1af435f7c7509ff3f5b70bc3991015b2 (patch)
tree11fd917530d860b3e59bc5d2075729a90f464a72 /pgsql.c
parentb80d19d023149d81fc80ff873d96cf14617db06d (diff)
downloadpgfuse-bc26616b1af435f7c7509ff3f5b70bc3991015b2.tar.gz
pgfuse-bc26616b1af435f7c7509ff3f5b70bc3991015b2.tar.bz2
added mount option blocksize, can be set when creating the filsystem
(first write), schema.sql doesn't contain block size information anymore
Diffstat (limited to 'pgsql.c')
-rw-r--r--pgsql.c114
1 files changed, 74 insertions, 40 deletions
diff --git a/pgsql.c b/pgsql.c
index 753f9de..7ed2d12 100644
--- a/pgsql.c
+++ b/pgsql.c
@@ -62,27 +62,27 @@ typedef struct PgDataInfo {
size_t to_len;
} PgDataInfo;
-static PgDataInfo compute_block_info( off_t offset, size_t len )
+static PgDataInfo compute_block_info( size_t block_size, off_t offset, size_t len )
{
PgDataInfo info;
int nof_blocks;
- info.from_block = offset / STANDARD_BLOCK_SIZE;
- info.from_offset = offset % STANDARD_BLOCK_SIZE;
+ info.from_block = offset / block_size;
+ info.from_offset = offset % block_size;
- nof_blocks = ( info.from_offset + len ) / STANDARD_BLOCK_SIZE;
+ nof_blocks = ( info.from_offset + len ) / block_size;
if( nof_blocks == 0 ) {
info.from_len = len;
} else {
- info.from_len = STANDARD_BLOCK_SIZE - info.from_offset;
+ info.from_len = block_size - info.from_offset;
}
info.to_block = info.from_block + nof_blocks;
- info.to_len = ( info.from_offset + len ) % STANDARD_BLOCK_SIZE;
+ info.to_len = ( info.from_offset + len ) % block_size;
if( info.to_len == 0 ) {
info.to_block--;
- info.to_len = STANDARD_BLOCK_SIZE;
+ info.to_len = block_size;
}
return info;
@@ -142,7 +142,7 @@ int64_t psql_path_to_id( PGconn *conn, const char *path )
idx = PQfnumber( res, "mode" );
data = PQgetvalue( res, 0, idx );
- mode = htonl( *( (uint32_t *)data ) );
+ mode = ntohl( *( (uint32_t *)data ) );
PQclear( res );
@@ -301,7 +301,7 @@ int psql_create_file( PGconn *conn, const int64_t parent_id, const char *path, c
return 0;
}
-int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, const off_t offset, const size_t len, int verbose )
+int psql_read_buf( PGconn *conn, const size_t block_size, const int64_t id, const char *path, char *buf, const off_t offset, const size_t len, int verbose )
{
PgDataInfo info;
int64_t param1;
@@ -311,7 +311,7 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf,
int lengths[3] = { sizeof( param1 ), sizeof( param2 ), sizeof( param3 ) };
int binary[3] = { 1, 1, 1 };
PGresult *res;
- char zero_block[STANDARD_BLOCK_SIZE];
+ char *zero_block;
int64_t block_no;
char *iptr;
char *data;
@@ -322,7 +322,7 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf,
PgMeta meta;
size_t size;
int64_t tmp;
-
+
tmp = psql_read_meta( conn, id, path, &meta );
if( tmp < 0 ) {
return tmp;
@@ -337,7 +337,7 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf,
size = meta.size - offset;
}
- info = compute_block_info( offset, size );
+ info = compute_block_info( block_size, offset, size );
param1 = htobe64( id );
param2 = htobe64( info.from_block );
@@ -352,8 +352,12 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf,
return -EIO;
}
- memset( zero_block, 0, STANDARD_BLOCK_SIZE );
-
+ zero_block = (char *)calloc( 1, block_size );
+ if( zero_block == NULL ) {
+ PQclear( res );
+ return -ENOMEM;
+ }
+
dst = buf;
copied = 0;
for( block_no = info.from_block, idx = 0; block_no <= info.to_block; block_no++ ) {
@@ -390,9 +394,9 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf,
/* intermediary blocks, are copied completly */
} else {
- memcpy( dst, data, STANDARD_BLOCK_SIZE );
- dst += STANDARD_BLOCK_SIZE;
- copied += STANDARD_BLOCK_SIZE;
+ memcpy( dst, data, block_size );
+ dst += block_size;
+ copied += block_size;
}
if( verbose ) {
@@ -403,6 +407,8 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf,
PQclear( res );
+ free( zero_block );
+
if( copied != size ) {
syslog( LOG_ERR, "File '%s', reading block '%"PRIi64"', copied '%zu' bytes but expecting '%zu'!",
path, block_no, copied, size );
@@ -552,7 +558,7 @@ int psql_delete_file( PGconn *conn, const int64_t id, const char *path )
return 0;
}
-static int psql_write_block( PGconn *conn, const int64_t id, const char *path, const char *buf, const int64_t block_no, const off_t offset, const size_t len, int verbose )
+static int psql_write_block( PGconn *conn, const size_t block_size, const int64_t id, const char *path, const char *buf, const int64_t block_no, const off_t offset, const size_t len, int verbose )
{
int64_t param1 = htobe64( id );
int64_t param2 = htobe64( block_no );
@@ -563,42 +569,42 @@ static int psql_write_block( PGconn *conn, const int64_t id, const char *path, c
char sql[256];
/* could actually be an assertion, as this can never happen */
- if( offset + len > STANDARD_BLOCK_SIZE ) {
+ if( offset + len > block_size ) {
syslog( LOG_ERR, "Got a too big block write for file '%s', block '%20"PRIi64"': %20jd + %20zu > %d!",
- path, block_no, offset, len, STANDARD_BLOCK_SIZE );
+ path, block_no, offset, len, block_size );
return -EIO;
}
update_again:
/* write a complete block, old data in the database doesn't bother us */
- if( offset == 0 && len == STANDARD_BLOCK_SIZE ) {
+ if( offset == 0 && len == block_size ) {
strcpy( sql, "UPDATE data set data = $3::bytea WHERE dir_id=$1::bigint AND block_no=$2::bigint" );
/* keep data on the right */
- } else if( offset == 0 && len < STANDARD_BLOCK_SIZE ) {
+ } else if( offset == 0 && len < block_size ) {
sprintf( sql, "UPDATE data set data = $3::bytea || substring( data from %zu for %zu ) WHERE dir_id=$1::bigint AND block_no=$2::bigint",
- len + 1, (size_t)STANDARD_BLOCK_SIZE - len );
+ len + 1, block_size - len );
/* keep data on the left */
- } else if( offset > 0 && offset + len == STANDARD_BLOCK_SIZE ) {
+ } else if( offset > 0 && offset + len == block_size ) {
sprintf( sql, "UPDATE data set data = substring( data from %d for %jd ) || $3::bytea WHERE dir_id=$1::bigint AND block_no=$2::bigint",
1, offset );
/* small in the middle write, keep data on both sides */
- } else if( offset > 0 && offset + len < STANDARD_BLOCK_SIZE ) {
+ } else if( offset > 0 && offset + len < block_size ) {
sprintf( sql, "UPDATE data set data = substring( data from %d for %jd ) || $3::bytea || substring( data from %jd for %jd ) WHERE dir_id=$1::bigint AND block_no=$2::bigint",
1, offset,
- offset + len + 1, STANDARD_BLOCK_SIZE - ( offset + len ) );
+ offset + len + 1, block_size - ( offset + len ) );
/* we should never get here */
} else {
syslog( LOG_ERR, "Unhandled write case for file '%s' in block '%"PRIi64"': offset: %jd, len: %zu, blocksize: %u",
- path, block_no, offset, len, STANDARD_BLOCK_SIZE );
+ path, block_no, offset, len, block_size );
return -EIO;
}
@@ -634,9 +640,9 @@ update_again:
PQclear( res );
/* the block didn't exist, so create one */
- res = PQexecParams( conn, "INSERT INTO data( dir_id, block_no ) VALUES"
- " ( $1::bigint, $2::bigint )",
- 2, NULL, values, lengths, binary, 1 );
+ sprintf( sql, "INSERT INTO data( dir_id, block_no, data ) VALUES"
+ " ( $1::bigint, $2::bigint, repeat(E'\\\\000',%zu)::bytea )", block_size );
+ res = PQexecParams( conn, sql, 2, NULL, values, lengths, binary, 1 );
if( PQresultStatus( res ) != PGRES_COMMAND_OK ) {
syslog( LOG_ERR, "Error in psql_write_block(%"PRIi64",%jd,%zu) for file '%s' allocating new block '%"PRIi64"': %s",
@@ -657,7 +663,7 @@ update_again:
goto update_again;
}
-int psql_write_buf( PGconn *conn, const int64_t id, const char *path, const char *buf, const off_t offset, const size_t len, int verbose )
+int psql_write_buf( PGconn *conn, const size_t block_size, const int64_t id, const char *path, const char *buf, const off_t offset, const size_t len, int verbose )
{
PgDataInfo info;
int res;
@@ -665,10 +671,10 @@ int psql_write_buf( PGconn *conn, const int64_t id, const char *path, const char
if( len == 0 ) return 0;
- info = compute_block_info( offset, len );
+ info = compute_block_info( block_size, offset, len );
/* first (partial) block */
- res = psql_write_block( conn, id, path, buf, info.from_block, info.from_offset, info.from_len, verbose );
+ res = psql_write_block( conn, block_size, id, path, buf, info.from_block, info.from_offset, info.from_len, verbose );
if( res < 0 ) {
return res;
}
@@ -687,20 +693,20 @@ int psql_write_buf( PGconn *conn, const int64_t id, const char *path, const char
/* all full blocks */
for( block_no = info.from_block + 1; block_no < info.to_block; block_no++ ) {
- res = psql_write_block( conn, id, path, buf, block_no, 0, STANDARD_BLOCK_SIZE, verbose );
+ res = psql_write_block( conn, block_size, id, path, buf, block_no, 0, block_size, verbose );
if( res < 0 ) {
return res;
}
- if( res != STANDARD_BLOCK_SIZE ) {
+ if( res != block_size ) {
syslog( LOG_ERR, "Partial write in file '%s' in block '%"PRIi64"' (%u instead of %u octets)",
- path, block_no, res, STANDARD_BLOCK_SIZE );
+ path, block_no, res, block_size );
return -EIO;
}
- buf += STANDARD_BLOCK_SIZE;
+ buf += block_size;
}
/* last partial block */
- res = psql_write_block( conn, id, path, buf, info.to_block, 0, info.to_len, verbose );
+ res = psql_write_block( conn, block_size, id, path, buf, info.to_block, 0, info.to_len, verbose );
if( res < 0 ) {
return res;
}
@@ -713,7 +719,7 @@ int psql_write_buf( PGconn *conn, const int64_t id, const char *path, const char
return len;
}
-int psql_truncate( PGconn *conn, const int64_t id, const char *path, const off_t offset )
+int psql_truncate( PGconn *conn, const size_t block_size, const int64_t id, const char *path, const off_t offset )
{
PgDataInfo info;
int64_t res;
@@ -730,7 +736,7 @@ int psql_truncate( PGconn *conn, const int64_t id, const char *path, const off_t
return res;
}
- info = compute_block_info( 0, offset );
+ info = compute_block_info( block_size, 0, offset );
param1 = htobe64( id );
param2 = htobe64( info.to_block );
@@ -862,3 +868,31 @@ int psql_rename( PGconn *conn, const int64_t from_id, const int64_t from_parent_
return 0;
}
+
+size_t psql_get_block_size( PGconn *conn, const size_t block_size )
+{
+ PGresult *res;
+ char *data;
+ size_t db_block_size;
+
+ res = PQexec( conn, "SELECT distinct octet_length(data) FROM data" );
+ if( PQresultStatus( res ) != PGRES_TUPLES_OK ) {
+ syslog( LOG_ERR, "Error in psql_get_block_size: %s", PQerrorMessage( conn ) );
+ PQclear( res );
+ return -EIO;
+ }
+
+ /* empty, this is ok, any blocksize acceptable after initialization */
+ if( PQntuples( res ) == 0 ) {
+ PQclear( res );
+ return block_size;
+ }
+
+ data = PQgetvalue( res, 0, 0 );
+ db_block_size = atoi( data );
+
+ PQclear( res );
+
+ return db_block_size;
+}
+