summaryrefslogtreecommitdiff
path: root/src/master.c
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2014-12-05 20:30:27 +0100
committerAndreas Baumann <mail@andreasbaumann.cc>2014-12-05 20:30:27 +0100
commitc81701d1bf5f20932ba77b3b1e1893a51e182f73 (patch)
tree6d8658f8ea40f114ff759c26077e2093e74ac817 /src/master.c
parent67ba87a159fb55196092e59c041af6d69ce07e75 (diff)
downloadbiruda-c81701d1bf5f20932ba77b3b1e1893a51e182f73.tar.gz
biruda-c81701d1bf5f20932ba77b3b1e1893a51e182f73.tar.bz2
first working version of worker spool files in the master
Diffstat (limited to 'src/master.c')
-rw-r--r--src/master.c81
1 files changed, 77 insertions, 4 deletions
diff --git a/src/master.c b/src/master.c
index d1f002d..0c59db1 100644
--- a/src/master.c
+++ b/src/master.c
@@ -79,6 +79,10 @@ static void update_workers( coordinator_t *coord, json_object *obj )
// TODO: check for deleted workers
}
+typedef struct {
+ FILE *spool_file;
+} execution_worker_data_t;
+
static int register_coordinator( json_object *obj )
{
json_object *host_obj;
@@ -162,6 +166,10 @@ static int register_coordinator( json_object *obj )
} else {
w->mode = WORKER_EXECUTION_DISABLED;
}
+
+ w->execution_data = malloc( sizeof( execution_worker_data_t ) );
+ execution_worker_data_t *wed = (execution_worker_data_t *)w->execution_data;
+ wed->spool_file = NULL;
json_object *state_obj;
json_object_object_get_ex( worker_obj, "state", &state_obj );
@@ -283,10 +291,65 @@ static char *get_next_request( )
typedef struct {
const char *control;
const char *data;
+ const char *spool_dir;
} master_thread_data_t;
static master_thread_data_t master_thread_data;
+static worker_t *worker_by_name( const char *name )
+{
+ for( int c = 0; c < MAX_COORDINATORS; c++ ) {
+ if( coordinator[c].used ) {
+ for( int i = 0; i < coordinator[c].nof_workers; i++ ) {
+ worker_t *w = &coordinator[c].worker[i];
+ if( strcmp( name, w->name ) == 0 ) {
+ return w;
+ }
+ }
+ }
+ }
+
+ return NULL;
+}
+
+static int master_output_init( const char *spool_dir, const char *name )
+{
+ char filename[1024];
+ worker_t *worker = worker_by_name( name );
+
+ snprintf( filename, sizeof( filename ), "%s%c%s.output",
+ spool_dir, PORT_DIR_SEPARATOR, worker->name );
+
+ execution_worker_data_t *wed = (execution_worker_data_t *)worker->execution_data;
+ wed->spool_file = fopen( filename, "a+" );
+
+ return 0;
+}
+
+static int master_output_free( const char *name )
+{
+ worker_t *worker = worker_by_name( name );
+ execution_worker_data_t *wed = (execution_worker_data_t *)worker->execution_data;
+
+ if( wed->spool_file != NULL ) {
+ fclose( wed->spool_file );
+ wed->spool_file = NULL;
+ }
+
+ return 0;
+}
+
+static int master_output_write( const char *name, const char *msg )
+{
+ worker_t *worker = worker_by_name( name );
+ execution_worker_data_t *wed = (execution_worker_data_t *)worker->execution_data;
+
+ fputs( msg, wed->spool_file );
+ fflush( wed->spool_file );
+
+ return 0;
+}
+
static void *master_func( void *thread_data )
{
master_thread_data_t *tdata = (master_thread_data_t *)thread_data;
@@ -391,6 +454,16 @@ NEXT_COMMAND:
continue;
}
}
+ } else if( strcmp( op, "started" ) == 0 ) {
+ json_object *worker_obj;
+ json_object_object_get_ex( recv_obj, "worker", &worker_obj );
+ const char *worker = json_object_get_string( worker_obj );
+ master_output_init( tdata->spool_dir, worker );
+ } else if( strcmp( op, "stopped" ) == 0 ) {
+ json_object *worker_obj;
+ json_object_object_get_ex( recv_obj, "worker", &worker_obj );
+ const char *worker = json_object_get_string( worker_obj );
+ master_output_free( worker );
} else if( strcmp( op, "output" ) == 0 ) {
json_object *worker_obj;
json_object_object_get_ex( recv_obj, "worker", &worker_obj );
@@ -398,9 +471,7 @@ NEXT_COMMAND:
json_object *msg_obj;
json_object_object_get_ex( recv_obj, "msg", &msg_obj );
const char *msg = json_object_get_string( msg_obj );
-
- printf( "Got message from worker '%s': %s\n", worker, msg );
-
+ master_output_write( worker, msg );
} else {
fprintf( stderr, "WARNING: master received unkown message: %s\n", answer );
}
@@ -433,7 +504,7 @@ NEXT_COMMAND:
return NULL;
}
-int master_init( const char *control, const char *data )
+int master_init( const char *control, const char *data, const char *spool_dir )
{
pthread_attr_t attr;
int res;
@@ -449,6 +520,7 @@ int master_init( const char *control, const char *data )
master_thread_data.control = control;
master_thread_data.data = data;
+ master_thread_data.spool_dir = spool_dir;
res = pthread_create( &master_thread, &attr, master_func, (void *)&master_thread_data );
if( res != 0 ) {
@@ -495,3 +567,4 @@ int master_stop_worker( const char *name )
return res;
}
+