diff options
author | Andreas Baumann <abaumann@yahoo.com> | 2012-05-06 12:51:36 +0200 |
---|---|---|
committer | Andreas Baumann <abaumann@yahoo.com> | 2012-05-06 12:51:36 +0200 |
commit | bc26616b1af435f7c7509ff3f5b70bc3991015b2 (patch) | |
tree | 11fd917530d860b3e59bc5d2075729a90f464a72 /pgsql.c | |
parent | b80d19d023149d81fc80ff873d96cf14617db06d (diff) | |
download | pgfuse-bc26616b1af435f7c7509ff3f5b70bc3991015b2.tar.gz pgfuse-bc26616b1af435f7c7509ff3f5b70bc3991015b2.tar.bz2 |
added mount option blocksize, can be set when creating the filsystem
(first write), schema.sql doesn't contain block size information anymore
Diffstat (limited to 'pgsql.c')
-rw-r--r-- | pgsql.c | 114 |
1 files changed, 74 insertions, 40 deletions
@@ -62,27 +62,27 @@ typedef struct PgDataInfo { size_t to_len; } PgDataInfo; -static PgDataInfo compute_block_info( off_t offset, size_t len ) +static PgDataInfo compute_block_info( size_t block_size, off_t offset, size_t len ) { PgDataInfo info; int nof_blocks; - info.from_block = offset / STANDARD_BLOCK_SIZE; - info.from_offset = offset % STANDARD_BLOCK_SIZE; + info.from_block = offset / block_size; + info.from_offset = offset % block_size; - nof_blocks = ( info.from_offset + len ) / STANDARD_BLOCK_SIZE; + nof_blocks = ( info.from_offset + len ) / block_size; if( nof_blocks == 0 ) { info.from_len = len; } else { - info.from_len = STANDARD_BLOCK_SIZE - info.from_offset; + info.from_len = block_size - info.from_offset; } info.to_block = info.from_block + nof_blocks; - info.to_len = ( info.from_offset + len ) % STANDARD_BLOCK_SIZE; + info.to_len = ( info.from_offset + len ) % block_size; if( info.to_len == 0 ) { info.to_block--; - info.to_len = STANDARD_BLOCK_SIZE; + info.to_len = block_size; } return info; @@ -142,7 +142,7 @@ int64_t psql_path_to_id( PGconn *conn, const char *path ) idx = PQfnumber( res, "mode" ); data = PQgetvalue( res, 0, idx ); - mode = htonl( *( (uint32_t *)data ) ); + mode = ntohl( *( (uint32_t *)data ) ); PQclear( res ); @@ -301,7 +301,7 @@ int psql_create_file( PGconn *conn, const int64_t parent_id, const char *path, c return 0; } -int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, const off_t offset, const size_t len, int verbose ) +int psql_read_buf( PGconn *conn, const size_t block_size, const int64_t id, const char *path, char *buf, const off_t offset, const size_t len, int verbose ) { PgDataInfo info; int64_t param1; @@ -311,7 +311,7 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, int lengths[3] = { sizeof( param1 ), sizeof( param2 ), sizeof( param3 ) }; int binary[3] = { 1, 1, 1 }; PGresult *res; - char zero_block[STANDARD_BLOCK_SIZE]; + char *zero_block; int64_t block_no; char *iptr; char *data; @@ -322,7 +322,7 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, PgMeta meta; size_t size; int64_t tmp; - + tmp = psql_read_meta( conn, id, path, &meta ); if( tmp < 0 ) { return tmp; @@ -337,7 +337,7 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, size = meta.size - offset; } - info = compute_block_info( offset, size ); + info = compute_block_info( block_size, offset, size ); param1 = htobe64( id ); param2 = htobe64( info.from_block ); @@ -352,8 +352,12 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, return -EIO; } - memset( zero_block, 0, STANDARD_BLOCK_SIZE ); - + zero_block = (char *)calloc( 1, block_size ); + if( zero_block == NULL ) { + PQclear( res ); + return -ENOMEM; + } + dst = buf; copied = 0; for( block_no = info.from_block, idx = 0; block_no <= info.to_block; block_no++ ) { @@ -390,9 +394,9 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, /* intermediary blocks, are copied completly */ } else { - memcpy( dst, data, STANDARD_BLOCK_SIZE ); - dst += STANDARD_BLOCK_SIZE; - copied += STANDARD_BLOCK_SIZE; + memcpy( dst, data, block_size ); + dst += block_size; + copied += block_size; } if( verbose ) { @@ -403,6 +407,8 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, PQclear( res ); + free( zero_block ); + if( copied != size ) { syslog( LOG_ERR, "File '%s', reading block '%"PRIi64"', copied '%zu' bytes but expecting '%zu'!", path, block_no, copied, size ); @@ -552,7 +558,7 @@ int psql_delete_file( PGconn *conn, const int64_t id, const char *path ) return 0; } -static int psql_write_block( PGconn *conn, const int64_t id, const char *path, const char *buf, const int64_t block_no, const off_t offset, const size_t len, int verbose ) +static int psql_write_block( PGconn *conn, const size_t block_size, const int64_t id, const char *path, const char *buf, const int64_t block_no, const off_t offset, const size_t len, int verbose ) { int64_t param1 = htobe64( id ); int64_t param2 = htobe64( block_no ); @@ -563,42 +569,42 @@ static int psql_write_block( PGconn *conn, const int64_t id, const char *path, c char sql[256]; /* could actually be an assertion, as this can never happen */ - if( offset + len > STANDARD_BLOCK_SIZE ) { + if( offset + len > block_size ) { syslog( LOG_ERR, "Got a too big block write for file '%s', block '%20"PRIi64"': %20jd + %20zu > %d!", - path, block_no, offset, len, STANDARD_BLOCK_SIZE ); + path, block_no, offset, len, block_size ); return -EIO; } update_again: /* write a complete block, old data in the database doesn't bother us */ - if( offset == 0 && len == STANDARD_BLOCK_SIZE ) { + if( offset == 0 && len == block_size ) { strcpy( sql, "UPDATE data set data = $3::bytea WHERE dir_id=$1::bigint AND block_no=$2::bigint" ); /* keep data on the right */ - } else if( offset == 0 && len < STANDARD_BLOCK_SIZE ) { + } else if( offset == 0 && len < block_size ) { sprintf( sql, "UPDATE data set data = $3::bytea || substring( data from %zu for %zu ) WHERE dir_id=$1::bigint AND block_no=$2::bigint", - len + 1, (size_t)STANDARD_BLOCK_SIZE - len ); + len + 1, block_size - len ); /* keep data on the left */ - } else if( offset > 0 && offset + len == STANDARD_BLOCK_SIZE ) { + } else if( offset > 0 && offset + len == block_size ) { sprintf( sql, "UPDATE data set data = substring( data from %d for %jd ) || $3::bytea WHERE dir_id=$1::bigint AND block_no=$2::bigint", 1, offset ); /* small in the middle write, keep data on both sides */ - } else if( offset > 0 && offset + len < STANDARD_BLOCK_SIZE ) { + } else if( offset > 0 && offset + len < block_size ) { sprintf( sql, "UPDATE data set data = substring( data from %d for %jd ) || $3::bytea || substring( data from %jd for %jd ) WHERE dir_id=$1::bigint AND block_no=$2::bigint", 1, offset, - offset + len + 1, STANDARD_BLOCK_SIZE - ( offset + len ) ); + offset + len + 1, block_size - ( offset + len ) ); /* we should never get here */ } else { syslog( LOG_ERR, "Unhandled write case for file '%s' in block '%"PRIi64"': offset: %jd, len: %zu, blocksize: %u", - path, block_no, offset, len, STANDARD_BLOCK_SIZE ); + path, block_no, offset, len, block_size ); return -EIO; } @@ -634,9 +640,9 @@ update_again: PQclear( res ); /* the block didn't exist, so create one */ - res = PQexecParams( conn, "INSERT INTO data( dir_id, block_no ) VALUES" - " ( $1::bigint, $2::bigint )", - 2, NULL, values, lengths, binary, 1 ); + sprintf( sql, "INSERT INTO data( dir_id, block_no, data ) VALUES" + " ( $1::bigint, $2::bigint, repeat(E'\\\\000',%zu)::bytea )", block_size ); + res = PQexecParams( conn, sql, 2, NULL, values, lengths, binary, 1 ); if( PQresultStatus( res ) != PGRES_COMMAND_OK ) { syslog( LOG_ERR, "Error in psql_write_block(%"PRIi64",%jd,%zu) for file '%s' allocating new block '%"PRIi64"': %s", @@ -657,7 +663,7 @@ update_again: goto update_again; } -int psql_write_buf( PGconn *conn, const int64_t id, const char *path, const char *buf, const off_t offset, const size_t len, int verbose ) +int psql_write_buf( PGconn *conn, const size_t block_size, const int64_t id, const char *path, const char *buf, const off_t offset, const size_t len, int verbose ) { PgDataInfo info; int res; @@ -665,10 +671,10 @@ int psql_write_buf( PGconn *conn, const int64_t id, const char *path, const char if( len == 0 ) return 0; - info = compute_block_info( offset, len ); + info = compute_block_info( block_size, offset, len ); /* first (partial) block */ - res = psql_write_block( conn, id, path, buf, info.from_block, info.from_offset, info.from_len, verbose ); + res = psql_write_block( conn, block_size, id, path, buf, info.from_block, info.from_offset, info.from_len, verbose ); if( res < 0 ) { return res; } @@ -687,20 +693,20 @@ int psql_write_buf( PGconn *conn, const int64_t id, const char *path, const char /* all full blocks */ for( block_no = info.from_block + 1; block_no < info.to_block; block_no++ ) { - res = psql_write_block( conn, id, path, buf, block_no, 0, STANDARD_BLOCK_SIZE, verbose ); + res = psql_write_block( conn, block_size, id, path, buf, block_no, 0, block_size, verbose ); if( res < 0 ) { return res; } - if( res != STANDARD_BLOCK_SIZE ) { + if( res != block_size ) { syslog( LOG_ERR, "Partial write in file '%s' in block '%"PRIi64"' (%u instead of %u octets)", - path, block_no, res, STANDARD_BLOCK_SIZE ); + path, block_no, res, block_size ); return -EIO; } - buf += STANDARD_BLOCK_SIZE; + buf += block_size; } /* last partial block */ - res = psql_write_block( conn, id, path, buf, info.to_block, 0, info.to_len, verbose ); + res = psql_write_block( conn, block_size, id, path, buf, info.to_block, 0, info.to_len, verbose ); if( res < 0 ) { return res; } @@ -713,7 +719,7 @@ int psql_write_buf( PGconn *conn, const int64_t id, const char *path, const char return len; } -int psql_truncate( PGconn *conn, const int64_t id, const char *path, const off_t offset ) +int psql_truncate( PGconn *conn, const size_t block_size, const int64_t id, const char *path, const off_t offset ) { PgDataInfo info; int64_t res; @@ -730,7 +736,7 @@ int psql_truncate( PGconn *conn, const int64_t id, const char *path, const off_t return res; } - info = compute_block_info( 0, offset ); + info = compute_block_info( block_size, 0, offset ); param1 = htobe64( id ); param2 = htobe64( info.to_block ); @@ -862,3 +868,31 @@ int psql_rename( PGconn *conn, const int64_t from_id, const int64_t from_parent_ return 0; } + +size_t psql_get_block_size( PGconn *conn, const size_t block_size ) +{ + PGresult *res; + char *data; + size_t db_block_size; + + res = PQexec( conn, "SELECT distinct octet_length(data) FROM data" ); + if( PQresultStatus( res ) != PGRES_TUPLES_OK ) { + syslog( LOG_ERR, "Error in psql_get_block_size: %s", PQerrorMessage( conn ) ); + PQclear( res ); + return -EIO; + } + + /* empty, this is ok, any blocksize acceptable after initialization */ + if( PQntuples( res ) == 0 ) { + PQclear( res ); + return block_size; + } + + data = PQgetvalue( res, 0, 0 ); + db_block_size = atoi( data ); + + PQclear( res ); + + return db_block_size; +} + |