diff options
author | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-05 20:30:27 +0100 |
---|---|---|
committer | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-05 20:30:27 +0100 |
commit | c81701d1bf5f20932ba77b3b1e1893a51e182f73 (patch) | |
tree | 6d8658f8ea40f114ff759c26077e2093e74ac817 | |
parent | 67ba87a159fb55196092e59c041af6d69ce07e75 (diff) | |
download | biruda-c81701d1bf5f20932ba77b3b1e1893a51e182f73.tar.gz biruda-c81701d1bf5f20932ba77b3b1e1893a51e182f73.tar.bz2 |
first working version of worker spool files in the master
-rw-r--r-- | src/biruda.c | 4 | ||||
-rw-r--r-- | src/biruda.conf | 2 | ||||
-rw-r--r-- | src/master.c | 81 | ||||
-rw-r--r-- | src/master.h | 2 | ||||
-rw-r--r-- | src/port.h | 7 | ||||
-rw-r--r-- | src/worker.h | 6 |
6 files changed, 93 insertions, 9 deletions
diff --git a/src/biruda.c b/src/biruda.c index 3fae736..98733fe 100644 --- a/src/biruda.c +++ b/src/biruda.c @@ -143,6 +143,7 @@ static int read_config( const char *filename, cfg_t **cfg ) cfg_opt_t opts_master[] = { CFG_STR( "control", 0, CFGF_NODEFAULT ), CFG_STR( "data", 0, CFGF_NODEFAULT ), + CFG_STR( "spool_dir", 0, CFGF_NODEFAULT ), CFG_END( ) }; @@ -269,8 +270,9 @@ static int create_master( cfg_t *cfg ) cfg_t *master_cfg = cfg_getnsec( cfg, "master", 0 ); char *control = cfg_getstr( master_cfg, "control" ); char *data = cfg_getstr( master_cfg, "data" ); + char *spool_dir = cfg_getstr( master_cfg, "spool_dir" ); - return master_init( control, data ); + return master_init( control, data, spool_dir ); } static int create_coordinator( cfg_t *cfg ) diff --git a/src/biruda.conf b/src/biruda.conf index 9269bab..d7fae19 100644 --- a/src/biruda.conf +++ b/src/biruda.conf @@ -9,6 +9,8 @@ master # data = "ipc:///tmp/biruda_data.ipc" # data = "inproc://data" data = "tcp://*:5556" + + spool_dir = "." } coordinator diff --git a/src/master.c b/src/master.c index d1f002d..0c59db1 100644 --- a/src/master.c +++ b/src/master.c @@ -79,6 +79,10 @@ static void update_workers( coordinator_t *coord, json_object *obj ) // TODO: check for deleted workers } +typedef struct { + FILE *spool_file; +} execution_worker_data_t; + static int register_coordinator( json_object *obj ) { json_object *host_obj; @@ -162,6 +166,10 @@ static int register_coordinator( json_object *obj ) } else { w->mode = WORKER_EXECUTION_DISABLED; } + + w->execution_data = malloc( sizeof( execution_worker_data_t ) ); + execution_worker_data_t *wed = (execution_worker_data_t *)w->execution_data; + wed->spool_file = NULL; json_object *state_obj; json_object_object_get_ex( worker_obj, "state", &state_obj ); @@ -283,10 +291,65 @@ static char *get_next_request( ) typedef struct { const char *control; const char *data; + const char *spool_dir; } master_thread_data_t; static master_thread_data_t master_thread_data; +static worker_t *worker_by_name( const char *name ) +{ + for( int c = 0; c < MAX_COORDINATORS; c++ ) { + if( coordinator[c].used ) { + for( int i = 0; i < coordinator[c].nof_workers; i++ ) { + worker_t *w = &coordinator[c].worker[i]; + if( strcmp( name, w->name ) == 0 ) { + return w; + } + } + } + } + + return NULL; +} + +static int master_output_init( const char *spool_dir, const char *name ) +{ + char filename[1024]; + worker_t *worker = worker_by_name( name ); + + snprintf( filename, sizeof( filename ), "%s%c%s.output", + spool_dir, PORT_DIR_SEPARATOR, worker->name ); + + execution_worker_data_t *wed = (execution_worker_data_t *)worker->execution_data; + wed->spool_file = fopen( filename, "a+" ); + + return 0; +} + +static int master_output_free( const char *name ) +{ + worker_t *worker = worker_by_name( name ); + execution_worker_data_t *wed = (execution_worker_data_t *)worker->execution_data; + + if( wed->spool_file != NULL ) { + fclose( wed->spool_file ); + wed->spool_file = NULL; + } + + return 0; +} + +static int master_output_write( const char *name, const char *msg ) +{ + worker_t *worker = worker_by_name( name ); + execution_worker_data_t *wed = (execution_worker_data_t *)worker->execution_data; + + fputs( msg, wed->spool_file ); + fflush( wed->spool_file ); + + return 0; +} + static void *master_func( void *thread_data ) { master_thread_data_t *tdata = (master_thread_data_t *)thread_data; @@ -391,6 +454,16 @@ NEXT_COMMAND: continue; } } + } else if( strcmp( op, "started" ) == 0 ) { + json_object *worker_obj; + json_object_object_get_ex( recv_obj, "worker", &worker_obj ); + const char *worker = json_object_get_string( worker_obj ); + master_output_init( tdata->spool_dir, worker ); + } else if( strcmp( op, "stopped" ) == 0 ) { + json_object *worker_obj; + json_object_object_get_ex( recv_obj, "worker", &worker_obj ); + const char *worker = json_object_get_string( worker_obj ); + master_output_free( worker ); } else if( strcmp( op, "output" ) == 0 ) { json_object *worker_obj; json_object_object_get_ex( recv_obj, "worker", &worker_obj ); @@ -398,9 +471,7 @@ NEXT_COMMAND: json_object *msg_obj; json_object_object_get_ex( recv_obj, "msg", &msg_obj ); const char *msg = json_object_get_string( msg_obj ); - - printf( "Got message from worker '%s': %s\n", worker, msg ); - + master_output_write( worker, msg ); } else { fprintf( stderr, "WARNING: master received unkown message: %s\n", answer ); } @@ -433,7 +504,7 @@ NEXT_COMMAND: return NULL; } -int master_init( const char *control, const char *data ) +int master_init( const char *control, const char *data, const char *spool_dir ) { pthread_attr_t attr; int res; @@ -449,6 +520,7 @@ int master_init( const char *control, const char *data ) master_thread_data.control = control; master_thread_data.data = data; + master_thread_data.spool_dir = spool_dir; res = pthread_create( &master_thread, &attr, master_func, (void *)&master_thread_data ); if( res != 0 ) { @@ -495,3 +567,4 @@ int master_stop_worker( const char *name ) return res; } + diff --git a/src/master.h b/src/master.h index a19dd8d..616a763 100644 --- a/src/master.h +++ b/src/master.h @@ -4,7 +4,7 @@ #include "port.h" #include "coordinator.h" -int master_init( const char *control, const char *data ); +int master_init( const char *control, const char *data, const char *spool_dir ); void master_terminate( int terminate_nano_msg ); int master_free( ); @@ -30,4 +30,11 @@ typedef unsigned char _Bool; #include <stdbool.h> #endif +// directory separator +#ifdef _WIN32 +#define PORT_DIR_SEPARATOR '\\' +#else +#define PORT_DIR_SEPARATOR '/' +#endif + #endif diff --git a/src/worker.h b/src/worker.h index a5f0ce9..0e243af 100644 --- a/src/worker.h +++ b/src/worker.h @@ -1,6 +1,8 @@ #ifndef _BIRUDA_WORKER_HEADER_INCLUDED #define _BIRUDA_WORKER_HEADER_INCLUDED +#include <stdio.h> + typedef enum { WORKER_EXECUTION_DISABLED = 0, WORKER_EXECUTION_DIRECT = 1 @@ -11,14 +13,12 @@ typedef enum { WORKER_STATE_RUNNING = 1 } worker_state_t; -typedef void *worker_execution_data_t; - typedef struct { char *name; worker_execution_mode_t mode; char *command; worker_state_t state; - worker_execution_data_t execution_data; + void *execution_data; const char *control; const char *data; } worker_t; |