diff options
author | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-05 20:30:27 +0100 |
---|---|---|
committer | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-05 20:30:27 +0100 |
commit | c81701d1bf5f20932ba77b3b1e1893a51e182f73 (patch) | |
tree | 6d8658f8ea40f114ff759c26077e2093e74ac817 /src/master.c | |
parent | 67ba87a159fb55196092e59c041af6d69ce07e75 (diff) | |
download | biruda-c81701d1bf5f20932ba77b3b1e1893a51e182f73.tar.gz biruda-c81701d1bf5f20932ba77b3b1e1893a51e182f73.tar.bz2 |
first working version of worker spool files in the master
Diffstat (limited to 'src/master.c')
-rw-r--r-- | src/master.c | 81 |
1 files changed, 77 insertions, 4 deletions
diff --git a/src/master.c b/src/master.c index d1f002d..0c59db1 100644 --- a/src/master.c +++ b/src/master.c @@ -79,6 +79,10 @@ static void update_workers( coordinator_t *coord, json_object *obj ) // TODO: check for deleted workers } +typedef struct { + FILE *spool_file; +} execution_worker_data_t; + static int register_coordinator( json_object *obj ) { json_object *host_obj; @@ -162,6 +166,10 @@ static int register_coordinator( json_object *obj ) } else { w->mode = WORKER_EXECUTION_DISABLED; } + + w->execution_data = malloc( sizeof( execution_worker_data_t ) ); + execution_worker_data_t *wed = (execution_worker_data_t *)w->execution_data; + wed->spool_file = NULL; json_object *state_obj; json_object_object_get_ex( worker_obj, "state", &state_obj ); @@ -283,10 +291,65 @@ static char *get_next_request( ) typedef struct { const char *control; const char *data; + const char *spool_dir; } master_thread_data_t; static master_thread_data_t master_thread_data; +static worker_t *worker_by_name( const char *name ) +{ + for( int c = 0; c < MAX_COORDINATORS; c++ ) { + if( coordinator[c].used ) { + for( int i = 0; i < coordinator[c].nof_workers; i++ ) { + worker_t *w = &coordinator[c].worker[i]; + if( strcmp( name, w->name ) == 0 ) { + return w; + } + } + } + } + + return NULL; +} + +static int master_output_init( const char *spool_dir, const char *name ) +{ + char filename[1024]; + worker_t *worker = worker_by_name( name ); + + snprintf( filename, sizeof( filename ), "%s%c%s.output", + spool_dir, PORT_DIR_SEPARATOR, worker->name ); + + execution_worker_data_t *wed = (execution_worker_data_t *)worker->execution_data; + wed->spool_file = fopen( filename, "a+" ); + + return 0; +} + +static int master_output_free( const char *name ) +{ + worker_t *worker = worker_by_name( name ); + execution_worker_data_t *wed = (execution_worker_data_t *)worker->execution_data; + + if( wed->spool_file != NULL ) { + fclose( wed->spool_file ); + wed->spool_file = NULL; + } + + return 0; +} + +static int master_output_write( const char *name, const char *msg ) +{ + worker_t *worker = worker_by_name( name ); + execution_worker_data_t *wed = (execution_worker_data_t *)worker->execution_data; + + fputs( msg, wed->spool_file ); + fflush( wed->spool_file ); + + return 0; +} + static void *master_func( void *thread_data ) { master_thread_data_t *tdata = (master_thread_data_t *)thread_data; @@ -391,6 +454,16 @@ NEXT_COMMAND: continue; } } + } else if( strcmp( op, "started" ) == 0 ) { + json_object *worker_obj; + json_object_object_get_ex( recv_obj, "worker", &worker_obj ); + const char *worker = json_object_get_string( worker_obj ); + master_output_init( tdata->spool_dir, worker ); + } else if( strcmp( op, "stopped" ) == 0 ) { + json_object *worker_obj; + json_object_object_get_ex( recv_obj, "worker", &worker_obj ); + const char *worker = json_object_get_string( worker_obj ); + master_output_free( worker ); } else if( strcmp( op, "output" ) == 0 ) { json_object *worker_obj; json_object_object_get_ex( recv_obj, "worker", &worker_obj ); @@ -398,9 +471,7 @@ NEXT_COMMAND: json_object *msg_obj; json_object_object_get_ex( recv_obj, "msg", &msg_obj ); const char *msg = json_object_get_string( msg_obj ); - - printf( "Got message from worker '%s': %s\n", worker, msg ); - + master_output_write( worker, msg ); } else { fprintf( stderr, "WARNING: master received unkown message: %s\n", answer ); } @@ -433,7 +504,7 @@ NEXT_COMMAND: return NULL; } -int master_init( const char *control, const char *data ) +int master_init( const char *control, const char *data, const char *spool_dir ) { pthread_attr_t attr; int res; @@ -449,6 +520,7 @@ int master_init( const char *control, const char *data ) master_thread_data.control = control; master_thread_data.data = data; + master_thread_data.spool_dir = spool_dir; res = pthread_create( &master_thread, &attr, master_func, (void *)&master_thread_data ); if( res != 0 ) { @@ -495,3 +567,4 @@ int master_stop_worker( const char *name ) return res; } + |