diff options
author | Andreas Baumann <abaumann@yahoo.com> | 2012-05-06 12:51:36 +0200 |
---|---|---|
committer | Andreas Baumann <abaumann@yahoo.com> | 2012-05-06 12:51:36 +0200 |
commit | bc26616b1af435f7c7509ff3f5b70bc3991015b2 (patch) | |
tree | 11fd917530d860b3e59bc5d2075729a90f464a72 | |
parent | b80d19d023149d81fc80ff873d96cf14617db06d (diff) | |
download | pgfuse-bc26616b1af435f7c7509ff3f5b70bc3991015b2.tar.gz pgfuse-bc26616b1af435f7c7509ff3f5b70bc3991015b2.tar.bz2 |
added mount option blocksize, can be set when creating the filsystem
(first write), schema.sql doesn't contain block size information anymore
-rw-r--r-- | config.h | 3 | ||||
-rw-r--r-- | inc.mak | 4 | ||||
-rw-r--r-- | pgfuse.c | 50 | ||||
-rw-r--r-- | pgsql.c | 114 | ||||
-rw-r--r-- | pgsql.h | 8 | ||||
-rw-r--r-- | schema.sql | 9 | ||||
-rw-r--r-- | tests/Makefile | 4 | ||||
-rw-r--r-- | tests/clean.sql | 1 |
8 files changed, 123 insertions, 70 deletions
@@ -1,3 +1,4 @@ + /* Copyright (C) 2012 Andreas Baumann <abaumann@yahoo.com> @@ -24,7 +25,7 @@ /* standard block size, that's the split size for the byta column in data */ -#define STANDARD_BLOCK_SIZE 4096 +#define DEFAULT_BLOCK_SIZE 4096 /* maximum length of a filename , rather arbitrary choice */ @@ -1,9 +1,9 @@ CC=gcc # for debugging -#CFLAGS = -Wall -Werror -g -O0 -pthread +CFLAGS = -Wall -Werror -g -O0 -pthread # for releasing -CFLAGS = -Wall -O2 +#CFLAGS = -Wall -O2 # redhat has libpq-fe.h and fuse.h in /usr/include, ok @@ -52,6 +52,7 @@ typedef struct PgFuseData { PgConnPool pool; /* the database pool to operate on (multi-thread only) */ int read_only; /* whether the mount point is read-only */ int multi_threaded; /* whether we run multi-threaded */ + size_t block_size; /* block size to use for storage of data in bytea fields */ } PgFuseData; /* --- timestamp helpers --- */ @@ -184,8 +185,8 @@ static int pgfuse_fgetattr( const char *path, struct stat *stbuf, struct fuse_fi stbuf->st_blocks = 0; stbuf->st_mode = meta.mode; stbuf->st_size = meta.size; - stbuf->st_blksize = STANDARD_BLOCK_SIZE; - stbuf->st_blocks = ( meta.size + STANDARD_BLOCK_SIZE - 1 ) / STANDARD_BLOCK_SIZE; + stbuf->st_blksize = data->block_size; + stbuf->st_blocks = ( meta.size + data->block_size - 1 ) / data->block_size; /* TODO: set correctly from table */ stbuf->st_nlink = 1; stbuf->st_uid = meta.uid; @@ -230,8 +231,8 @@ static int pgfuse_getattr( const char *path, struct stat *stbuf ) stbuf->st_blocks = 0; stbuf->st_mode = meta.mode; stbuf->st_size = meta.size; - stbuf->st_blksize = STANDARD_BLOCK_SIZE; - stbuf->st_blocks = ( meta.size + STANDARD_BLOCK_SIZE - 1 ) / STANDARD_BLOCK_SIZE; + stbuf->st_blksize = data->block_size; + stbuf->st_blocks = ( meta.size + data->block_size - 1 ) / data->block_size; /* TODO: set correctly from table */ stbuf->st_nlink = 1; stbuf->st_uid = meta.uid; @@ -769,7 +770,7 @@ static int pgfuse_write( const char *path, const char *buf, size_t size, meta.size = offset + size; } - res = psql_write_buf( conn, fi->fh, path, buf, offset, size, data->verbose ); + res = psql_write_buf( conn, data->block_size, fi->fh, path, buf, offset, size, data->verbose ); if( res < 0 ) { PSQL_ROLLBACK( conn ); RELEASE( conn ); return res; @@ -813,7 +814,7 @@ static int pgfuse_read( const char *path, char *buf, size_t size, return -EBADF; } - res = psql_read_buf( conn, fi->fh, path, buf, offset, size, data->verbose ); + res = psql_read_buf( conn, data->block_size, fi->fh, path, buf, offset, size, data->verbose ); if( res < 0 ) { PSQL_ROLLBACK( conn ); RELEASE( conn ); return res; @@ -861,7 +862,7 @@ static int pgfuse_truncate( const char* path, off_t offset ) return -EROFS; } - res = psql_truncate( conn, id, path, offset ); + res = psql_truncate( conn, data->block_size, id, path, offset ); if( res < 0 ) { PSQL_ROLLBACK( conn ); RELEASE( conn ); return res; @@ -912,7 +913,7 @@ static int pgfuse_ftruncate( const char *path, off_t offset, struct fuse_file_in return -EROFS; } - res = psql_truncate( conn, fi->fh, path, offset ); + res = psql_truncate( conn, data->block_size, fi->fh, path, offset ); if( res < 0 ) { PSQL_ROLLBACK( conn ); RELEASE( conn ); return res; @@ -944,8 +945,8 @@ static int pgfuse_statfs( const char *path, struct statvfs *buf ) memset( buf, 0, sizeof( struct statvfs ) ); - buf->f_bsize = STANDARD_BLOCK_SIZE; - buf->f_frsize = STANDARD_BLOCK_SIZE; + buf->f_bsize = data->block_size; + buf->f_frsize = data->block_size; /* Note: it's hard to tell how much space is left in the database * and how big it is */ @@ -1129,7 +1130,7 @@ static int pgfuse_symlink( const char *from, const char *to ) return id; } - res = psql_write_buf( conn, id, to, from, 0, strlen( from ), data->verbose ); + res = psql_write_buf( conn, data->block_size, id, to, from, 0, strlen( from ), data->verbose ); if( res < 0 ) { free( copy_to ); PSQL_ROLLBACK( conn ); RELEASE( conn ); @@ -1278,7 +1279,7 @@ static int pgfuse_readlink( const char *path, char *buf, size_t size ) return -ENOMEM; } - res = psql_read_buf( conn, id, path, buf, 0, meta.size, data->verbose ); + res = psql_read_buf( conn, data->block_size, id, path, buf, 0, meta.size, data->verbose ); if( res < 0 ) { PSQL_ROLLBACK( conn ); RELEASE( conn ); return res; @@ -1382,6 +1383,7 @@ typedef struct PgFuseOptions { char *mountpoint; /* where we mount the virtual filesystem */ int read_only; /* whether to mount read-only */ int multi_threaded; /* whether we run multi-threaded */ + size_t block_size; /* block size to use to store data in BYTEA fields */ } PgFuseOptions; #define PGFUSE_OPT( t, p, v ) { t, offsetof( PgFuseOptions, p ), v } @@ -1394,6 +1396,7 @@ enum { static struct fuse_opt pgfuse_opts[] = { PGFUSE_OPT( "ro", read_only, 1 ), + PGFUSE_OPT( "blocksize=%d", block_size, DEFAULT_BLOCK_SIZE ), FUSE_OPT_KEY( "-h", KEY_HELP ), FUSE_OPT_KEY( "--help", KEY_HELP ), FUSE_OPT_KEY( "-v", KEY_VERBOSE ), @@ -1469,6 +1472,7 @@ static void print_usage( char* progname ) "\n" "PgFuse options:\n" " ro mount filesystem read-only, do not change data in database\n" + " blocksize=<bytes> block size to use for storage of data\n" "\n", progname ); @@ -1487,6 +1491,7 @@ int main( int argc, char *argv[] ) memset( &pgfuse, 0, sizeof( pgfuse ) ); pgfuse.multi_threaded = 1; + pgfuse.block_size = DEFAULT_BLOCK_SIZE; if( fuse_opt_parse( &args, &pgfuse, pgfuse_opts, pgfuse_opt_proc ) == -1 ) { if( pgfuse.print_help ) { @@ -1528,6 +1533,7 @@ int main( int argc, char *argv[] ) * standard for PostgreSQL 8.4 or newer). Otherwise bail out * currently.. */ + value = PQparameterStatus( conn, "integer_datetimes" ); if( value == NULL ) { fprintf( stderr, "PQ param integer_datetimes not available?\n" @@ -1542,10 +1548,23 @@ int main( int argc, char *argv[] ) PQfinish( conn ); return 1; } + + openlog( basename( argv[0] ), LOG_PID, LOG_USER ); - PQfinish( conn ); + /* Compare blocksize given as parameter and blocksize in database */ + res = psql_get_block_size( conn, pgfuse.block_size ); + if( res < 0 ) { + PQfinish( conn ); + return 1; + } + if( res != pgfuse.block_size ) { + fprintf( stderr, "Blocksize parameter mismatch (is '%zu', in database we have '%zu') taking the later one!\n", + pgfuse.block_size, (size_t)res ); + PQfinish( conn ); + return 1; + } - openlog( basename( argv[0] ), LOG_PID, LOG_USER ); + PQfinish( conn ); memset( &userdata, 0, sizeof( PgFuseData ) ); userdata.conninfo = pgfuse.conninfo; @@ -1553,8 +1572,11 @@ int main( int argc, char *argv[] ) userdata.verbose = pgfuse.verbose; userdata.read_only = pgfuse.read_only; userdata.multi_threaded = pgfuse.multi_threaded; + userdata.block_size = pgfuse.block_size; res = fuse_main( args.argc, args.argv, &pgfuse_oper, &userdata ); + closelog( ); + exit( res ); } @@ -62,27 +62,27 @@ typedef struct PgDataInfo { size_t to_len; } PgDataInfo; -static PgDataInfo compute_block_info( off_t offset, size_t len ) +static PgDataInfo compute_block_info( size_t block_size, off_t offset, size_t len ) { PgDataInfo info; int nof_blocks; - info.from_block = offset / STANDARD_BLOCK_SIZE; - info.from_offset = offset % STANDARD_BLOCK_SIZE; + info.from_block = offset / block_size; + info.from_offset = offset % block_size; - nof_blocks = ( info.from_offset + len ) / STANDARD_BLOCK_SIZE; + nof_blocks = ( info.from_offset + len ) / block_size; if( nof_blocks == 0 ) { info.from_len = len; } else { - info.from_len = STANDARD_BLOCK_SIZE - info.from_offset; + info.from_len = block_size - info.from_offset; } info.to_block = info.from_block + nof_blocks; - info.to_len = ( info.from_offset + len ) % STANDARD_BLOCK_SIZE; + info.to_len = ( info.from_offset + len ) % block_size; if( info.to_len == 0 ) { info.to_block--; - info.to_len = STANDARD_BLOCK_SIZE; + info.to_len = block_size; } return info; @@ -142,7 +142,7 @@ int64_t psql_path_to_id( PGconn *conn, const char *path ) idx = PQfnumber( res, "mode" ); data = PQgetvalue( res, 0, idx ); - mode = htonl( *( (uint32_t *)data ) ); + mode = ntohl( *( (uint32_t *)data ) ); PQclear( res ); @@ -301,7 +301,7 @@ int psql_create_file( PGconn *conn, const int64_t parent_id, const char *path, c return 0; } -int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, const off_t offset, const size_t len, int verbose ) +int psql_read_buf( PGconn *conn, const size_t block_size, const int64_t id, const char *path, char *buf, const off_t offset, const size_t len, int verbose ) { PgDataInfo info; int64_t param1; @@ -311,7 +311,7 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, int lengths[3] = { sizeof( param1 ), sizeof( param2 ), sizeof( param3 ) }; int binary[3] = { 1, 1, 1 }; PGresult *res; - char zero_block[STANDARD_BLOCK_SIZE]; + char *zero_block; int64_t block_no; char *iptr; char *data; @@ -322,7 +322,7 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, PgMeta meta; size_t size; int64_t tmp; - + tmp = psql_read_meta( conn, id, path, &meta ); if( tmp < 0 ) { return tmp; @@ -337,7 +337,7 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, size = meta.size - offset; } - info = compute_block_info( offset, size ); + info = compute_block_info( block_size, offset, size ); param1 = htobe64( id ); param2 = htobe64( info.from_block ); @@ -352,8 +352,12 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, return -EIO; } - memset( zero_block, 0, STANDARD_BLOCK_SIZE ); - + zero_block = (char *)calloc( 1, block_size ); + if( zero_block == NULL ) { + PQclear( res ); + return -ENOMEM; + } + dst = buf; copied = 0; for( block_no = info.from_block, idx = 0; block_no <= info.to_block; block_no++ ) { @@ -390,9 +394,9 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, /* intermediary blocks, are copied completly */ } else { - memcpy( dst, data, STANDARD_BLOCK_SIZE ); - dst += STANDARD_BLOCK_SIZE; - copied += STANDARD_BLOCK_SIZE; + memcpy( dst, data, block_size ); + dst += block_size; + copied += block_size; } if( verbose ) { @@ -403,6 +407,8 @@ int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, PQclear( res ); + free( zero_block ); + if( copied != size ) { syslog( LOG_ERR, "File '%s', reading block '%"PRIi64"', copied '%zu' bytes but expecting '%zu'!", path, block_no, copied, size ); @@ -552,7 +558,7 @@ int psql_delete_file( PGconn *conn, const int64_t id, const char *path ) return 0; } -static int psql_write_block( PGconn *conn, const int64_t id, const char *path, const char *buf, const int64_t block_no, const off_t offset, const size_t len, int verbose ) +static int psql_write_block( PGconn *conn, const size_t block_size, const int64_t id, const char *path, const char *buf, const int64_t block_no, const off_t offset, const size_t len, int verbose ) { int64_t param1 = htobe64( id ); int64_t param2 = htobe64( block_no ); @@ -563,42 +569,42 @@ static int psql_write_block( PGconn *conn, const int64_t id, const char *path, c char sql[256]; /* could actually be an assertion, as this can never happen */ - if( offset + len > STANDARD_BLOCK_SIZE ) { + if( offset + len > block_size ) { syslog( LOG_ERR, "Got a too big block write for file '%s', block '%20"PRIi64"': %20jd + %20zu > %d!", - path, block_no, offset, len, STANDARD_BLOCK_SIZE ); + path, block_no, offset, len, block_size ); return -EIO; } update_again: /* write a complete block, old data in the database doesn't bother us */ - if( offset == 0 && len == STANDARD_BLOCK_SIZE ) { + if( offset == 0 && len == block_size ) { strcpy( sql, "UPDATE data set data = $3::bytea WHERE dir_id=$1::bigint AND block_no=$2::bigint" ); /* keep data on the right */ - } else if( offset == 0 && len < STANDARD_BLOCK_SIZE ) { + } else if( offset == 0 && len < block_size ) { sprintf( sql, "UPDATE data set data = $3::bytea || substring( data from %zu for %zu ) WHERE dir_id=$1::bigint AND block_no=$2::bigint", - len + 1, (size_t)STANDARD_BLOCK_SIZE - len ); + len + 1, block_size - len ); /* keep data on the left */ - } else if( offset > 0 && offset + len == STANDARD_BLOCK_SIZE ) { + } else if( offset > 0 && offset + len == block_size ) { sprintf( sql, "UPDATE data set data = substring( data from %d for %jd ) || $3::bytea WHERE dir_id=$1::bigint AND block_no=$2::bigint", 1, offset ); /* small in the middle write, keep data on both sides */ - } else if( offset > 0 && offset + len < STANDARD_BLOCK_SIZE ) { + } else if( offset > 0 && offset + len < block_size ) { sprintf( sql, "UPDATE data set data = substring( data from %d for %jd ) || $3::bytea || substring( data from %jd for %jd ) WHERE dir_id=$1::bigint AND block_no=$2::bigint", 1, offset, - offset + len + 1, STANDARD_BLOCK_SIZE - ( offset + len ) ); + offset + len + 1, block_size - ( offset + len ) ); /* we should never get here */ } else { syslog( LOG_ERR, "Unhandled write case for file '%s' in block '%"PRIi64"': offset: %jd, len: %zu, blocksize: %u", - path, block_no, offset, len, STANDARD_BLOCK_SIZE ); + path, block_no, offset, len, block_size ); return -EIO; } @@ -634,9 +640,9 @@ update_again: PQclear( res ); /* the block didn't exist, so create one */ - res = PQexecParams( conn, "INSERT INTO data( dir_id, block_no ) VALUES" - " ( $1::bigint, $2::bigint )", - 2, NULL, values, lengths, binary, 1 ); + sprintf( sql, "INSERT INTO data( dir_id, block_no, data ) VALUES" + " ( $1::bigint, $2::bigint, repeat(E'\\\\000',%zu)::bytea )", block_size ); + res = PQexecParams( conn, sql, 2, NULL, values, lengths, binary, 1 ); if( PQresultStatus( res ) != PGRES_COMMAND_OK ) { syslog( LOG_ERR, "Error in psql_write_block(%"PRIi64",%jd,%zu) for file '%s' allocating new block '%"PRIi64"': %s", @@ -657,7 +663,7 @@ update_again: goto update_again; } -int psql_write_buf( PGconn *conn, const int64_t id, const char *path, const char *buf, const off_t offset, const size_t len, int verbose ) +int psql_write_buf( PGconn *conn, const size_t block_size, const int64_t id, const char *path, const char *buf, const off_t offset, const size_t len, int verbose ) { PgDataInfo info; int res; @@ -665,10 +671,10 @@ int psql_write_buf( PGconn *conn, const int64_t id, const char *path, const char if( len == 0 ) return 0; - info = compute_block_info( offset, len ); + info = compute_block_info( block_size, offset, len ); /* first (partial) block */ - res = psql_write_block( conn, id, path, buf, info.from_block, info.from_offset, info.from_len, verbose ); + res = psql_write_block( conn, block_size, id, path, buf, info.from_block, info.from_offset, info.from_len, verbose ); if( res < 0 ) { return res; } @@ -687,20 +693,20 @@ int psql_write_buf( PGconn *conn, const int64_t id, const char *path, const char /* all full blocks */ for( block_no = info.from_block + 1; block_no < info.to_block; block_no++ ) { - res = psql_write_block( conn, id, path, buf, block_no, 0, STANDARD_BLOCK_SIZE, verbose ); + res = psql_write_block( conn, block_size, id, path, buf, block_no, 0, block_size, verbose ); if( res < 0 ) { return res; } - if( res != STANDARD_BLOCK_SIZE ) { + if( res != block_size ) { syslog( LOG_ERR, "Partial write in file '%s' in block '%"PRIi64"' (%u instead of %u octets)", - path, block_no, res, STANDARD_BLOCK_SIZE ); + path, block_no, res, block_size ); return -EIO; } - buf += STANDARD_BLOCK_SIZE; + buf += block_size; } /* last partial block */ - res = psql_write_block( conn, id, path, buf, info.to_block, 0, info.to_len, verbose ); + res = psql_write_block( conn, block_size, id, path, buf, info.to_block, 0, info.to_len, verbose ); if( res < 0 ) { return res; } @@ -713,7 +719,7 @@ int psql_write_buf( PGconn *conn, const int64_t id, const char *path, const char return len; } -int psql_truncate( PGconn *conn, const int64_t id, const char *path, const off_t offset ) +int psql_truncate( PGconn *conn, const size_t block_size, const int64_t id, const char *path, const off_t offset ) { PgDataInfo info; int64_t res; @@ -730,7 +736,7 @@ int psql_truncate( PGconn *conn, const int64_t id, const char *path, const off_t return res; } - info = compute_block_info( 0, offset ); + info = compute_block_info( block_size, 0, offset ); param1 = htobe64( id ); param2 = htobe64( info.to_block ); @@ -862,3 +868,31 @@ int psql_rename( PGconn *conn, const int64_t from_id, const int64_t from_parent_ return 0; } + +size_t psql_get_block_size( PGconn *conn, const size_t block_size ) +{ + PGresult *res; + char *data; + size_t db_block_size; + + res = PQexec( conn, "SELECT distinct octet_length(data) FROM data" ); + if( PQresultStatus( res ) != PGRES_TUPLES_OK ) { + syslog( LOG_ERR, "Error in psql_get_block_size: %s", PQerrorMessage( conn ) ); + PQclear( res ); + return -EIO; + } + + /* empty, this is ok, any blocksize acceptable after initialization */ + if( PQntuples( res ) == 0 ) { + PQclear( res ); + return block_size; + } + + data = PQgetvalue( res, 0, 0 ); + db_block_size = atoi( data ); + + PQclear( res ); + + return db_block_size; +} + @@ -80,7 +80,7 @@ int psql_write_meta( PGconn *conn, const int64_t id, const char *path, PgMeta me int psql_create_file( PGconn *conn, const int64_t parent_id, const char *path, const char *new_file, PgMeta meta ); -int psql_read_buf( PGconn *conn, const int64_t id, const char *path, char *buf, const off_t offset, const size_t len, int verbose ); +int psql_read_buf( PGconn *conn, const size_t block_size, const int64_t id, const char *path, char *buf, const off_t offset, const size_t len, int verbose ); int psql_readdir( PGconn *conn, const int64_t parent_id, void *buf, fuse_fill_dir_t filler ); @@ -90,10 +90,12 @@ int psql_delete_dir( PGconn *conn, const int64_t id, const char *path ); int psql_delete_file( PGconn *conn, const int64_t id, const char *path ); -int psql_write_buf( PGconn *conn, const int64_t id, const char *path, const char *buf, const off_t offset, const size_t len, int verbose ); +int psql_write_buf( PGconn *conn, const size_t block_size, const int64_t id, const char *path, const char *buf, const off_t offset, const size_t len, int verbose ); -int psql_truncate( PGconn *conn, const int64_t id, const char *path, const off_t offset ); +int psql_truncate( PGconn *conn, const size_t block_size, const int64_t id, const char *path, const off_t offset ); int psql_rename( PGconn *conn, const int64_t from_id, const int64_t from_parent_id, const int64_t to_parent_id, const char *rename_to, const char *from, const char *to ); +size_t psql_get_block_size( PGconn *conn, const size_t block_size ); + #endif @@ -18,7 +18,7 @@ CREATE TABLE dir ( CREATE TABLE data ( dir_id BIGINT, block_no BIGINT NOT NULL DEFAULT 0, - data BYTEA NOT NULL DEFAULT repeat(E'\\000',4096)::bytea, + data BYTEA, PRIMARY KEY( dir_id, block_no ), FOREIGN KEY( dir_id ) REFERENCES dir( id ) ); @@ -35,13 +35,6 @@ CREATE INDEX dir_parent_id_idx ON dir( parent_id ); -- TODO: should be created by the program after checking the OS -- it is running on (for full POSIX compatibility) --- make sure 'dir' entries always get a first block in the 'data' --- table -CREATE OR REPLACE RULE "dir_insert" AS ON - INSERT TO dir WHERE NEW.mode & 16384 = 0 - DO ALSO INSERT INTO data( dir_id ) - VALUES ( currval( 'dir_id_seq' ) ); - -- garbage collect deleted file entries, delete all blocks in 'data' CREATE OR REPLACE RULE "dir_remove" AS ON DELETE TO dir WHERE OLD.mode & 16384 = 0 diff --git a/tests/Makefile b/tests/Makefile index 8b156e6..b8dd4e3 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -2,13 +2,15 @@ include ../inc.mak PG_CONNINFO = "" +BLOCKSIZE = 4096 + CFLAGS += -I.. test: testfsync testpgsql testtypes testbigfile psql < clean.sql psql < ../schema.sql test -d mnt || mkdir mnt - ../pgfuse -s -v "$(PG_CONNINFO)" mnt + ../pgfuse -o blocksize=$(BLOCKSIZE) -s -v "$(PG_CONNINFO)" mnt mount | grep pgfuse # expect success for making directories -mkdir mnt/dir diff --git a/tests/clean.sql b/tests/clean.sql index 6d94089..8048f01 100644 --- a/tests/clean.sql +++ b/tests/clean.sql @@ -1,4 +1,3 @@ -DROP RULE dir_insert ON dir; DROP RULE dir_remove ON dir; DROP TABLE data; DROP TABLE dir; |