diff options
author | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-07 18:15:27 +0100 |
---|---|---|
committer | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-07 18:15:27 +0100 |
commit | 645bd551dcfb609ce4b57a205bbf1c3e129d4ef3 (patch) | |
tree | 9e89afea2e74628bb0350d2bfefb2903957badb1 | |
parent | 5f1f7c844741a8c55705e8460fca54024c7d5102 (diff) | |
download | biruda-645bd551dcfb609ce4b57a205bbf1c3e129d4ef3.tar.gz biruda-645bd551dcfb609ce4b57a205bbf1c3e129d4ef3.tar.bz2 |
added started, stopped messages for worker
-rw-r--r-- | TODOS | 3 | ||||
-rw-r--r-- | src/master.c | 72 | ||||
-rw-r--r-- | src/worker.c | 128 |
3 files changed, 176 insertions, 27 deletions
@@ -1,6 +1,3 @@ -- surveyor -> bus, maybe also Windows blocking problem disappears - - every member of the net is allowed to receive data and to send - answers or to request or inform about something. - http_lib: - POST - handle answer body too diff --git a/src/master.c b/src/master.c index 387fe5f..a3d98b1 100644 --- a/src/master.c +++ b/src/master.c @@ -382,6 +382,74 @@ static int master_output_write( const char *spool_dir, json_object *obj ) return 0; } +static int master_output_write_started( const char *spool_dir, json_object *obj ) +{ + json_object *worker_obj; + json_object_object_get_ex( obj, "worker", &worker_obj ); + const char *name = json_object_get_string( worker_obj ); + + worker_t *w = worker_by_name( name ); + + if( w == NULL ) return 0; + + execution_worker_data_t *wed = (execution_worker_data_t *)w->execution_data; + + if( wed->spool_file == NULL ) { + master_output_init( spool_dir, name ); + if( wed->spool_file == NULL ) return 0; + } + + json_object *pid_obj; + json_object_object_get_ex( obj, "pid", &pid_obj ); + int pid = json_object_get_int( pid_obj ); + + json_object *ts_obj; + json_object_object_get_ex( obj, "timestamp", &ts_obj ); + time_t ts = json_object_get_int( ts_obj ); + + char line[1024]; + snprintf( line, sizeof( line ), "%lu %s --- STARTED %d ---\n", ts, name, pid ); + + fputs( line, wed->spool_file ); + fflush( wed->spool_file ); + + return 0; +} + +static int master_output_write_terminated( const char *spool_dir, json_object *obj ) +{ + json_object *worker_obj; + json_object_object_get_ex( obj, "worker", &worker_obj ); + const char *name = json_object_get_string( worker_obj ); + + worker_t *w = worker_by_name( name ); + + if( w == NULL ) return 0; + + execution_worker_data_t *wed = (execution_worker_data_t *)w->execution_data; + + if( wed->spool_file == NULL ) { + master_output_init( spool_dir, name ); + if( wed->spool_file == NULL ) return 0; + } + + json_object *pid_obj; + json_object_object_get_ex( obj, "pid", &pid_obj ); + int pid = json_object_get_int( pid_obj ); + + json_object *ts_obj; + json_object_object_get_ex( obj, "timestamp", &ts_obj ); + time_t ts = json_object_get_int( ts_obj ); + + char line[1024]; + snprintf( line, sizeof( line ), "%lu %s --- STOPPED %d ---\n", ts, name, pid ); + + fputs( line, wed->spool_file ); + fflush( wed->spool_file ); + + return 0; +} + static void *master_func( void *thread_data ) { master_thread_data_t *tdata = (master_thread_data_t *)thread_data; @@ -496,6 +564,10 @@ NEXT_COMMAND: json_object_object_get_ex( recv_obj, "worker", &worker_obj ); const char *worker = json_object_get_string( worker_obj ); master_output_free( worker ); + } else if( strcmp( op, "created" ) == 0 ) { + master_output_write_started( tdata->spool_dir, recv_obj ); + } else if( strcmp( op, "terminated" ) == 0 ) { + master_output_write_terminated( tdata->spool_dir, recv_obj ); } else if( strcmp( op, "output" ) == 0 ) { master_output_write( tdata->spool_dir, recv_obj ); } else { diff --git a/src/worker.c b/src/worker.c index d831031..3bebd60 100644 --- a/src/worker.c +++ b/src/worker.c @@ -70,18 +70,7 @@ typedef struct { int sock; } direct_glib_execution_worker_data_t; -static void watch_child( GPid pid, gint status, gpointer *data ) -{ - direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data; - - printf( "Worker child with PID %d terminated.\n", wed->pid ); - - g_spawn_close_pid( pid ); - - g_main_loop_quit( wed->main_loop ); -} - -static char *create_worker_answer( const char *name, const char *s, size_t len, bool is_stdout ) +static char *create_worker_output_answer( const char *name, const char *s, size_t len, bool is_stdout ) { json_object *obj = json_object_new_object( ); json_object *op = json_object_new_string( "output" ); @@ -114,24 +103,70 @@ static char *create_worker_answer( const char *name, const char *s, size_t len, return res; } -static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpointer *data ) +static char *create_worker_termination_answer( const char *name, GPid pid ) { - direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data; - worker_t *worker = wed->worker; + json_object *obj = json_object_new_object( ); + json_object *op = json_object_new_string( "terminated" ); + json_object_object_add( obj, "op", op ); - bool is_stdout = ( wed->out_channel == channel ); + /* we are a worker */ + json_object *role = json_object_new_string( "worker" ); + json_object_object_add( obj, "role", role ); - gchar *buf; - gsize len; + json_object *name_obj = json_object_new_string( name ); + json_object_object_add( obj, "worker", name_obj ); - if( ( condition & G_IO_HUP ) == G_IO_HUP ) { - g_io_channel_unref( channel ); - return FALSE; - } + json_object *pid_obj = json_object_new_int( pid ); + json_object_object_add( obj, "pid", pid_obj ); + + time_t ts; + time( &ts ); + json_object *timestamp_obj = json_object_new_int( ts ); + json_object_object_add( obj, "timestamp", timestamp_obj ); + + /* produce message as string, caller must free it */ + const char *msg = json_object_to_json_string( obj ); + char *res = strdup( msg ); - g_io_channel_read_line( channel, &buf, &len, NULL, NULL ); + json_object_put( obj ); + + return res; +} + +static char *create_worker_created_answer( const char *name, GPid pid ) +{ + json_object *obj = json_object_new_object( ); + json_object *op = json_object_new_string( "created" ); + json_object_object_add( obj, "op", op ); + + /* we are a worker */ + json_object *role = json_object_new_string( "worker" ); + json_object_object_add( obj, "role", role ); + + json_object *name_obj = json_object_new_string( name ); + json_object_object_add( obj, "worker", name_obj ); + + json_object *pid_obj = json_object_new_int( pid ); + json_object_object_add( obj, "pid", pid_obj ); + + time_t ts; + time( &ts ); + json_object *timestamp_obj = json_object_new_int( ts ); + json_object_object_add( obj, "timestamp", timestamp_obj ); + + /* produce message as string, caller must free it */ + const char *msg = json_object_to_json_string( obj ); + char *res = strdup( msg ); + + json_object_put( obj ); + + return res; +} + +static int worker_send_message( worker_t *worker, const char *msg ) +{ + direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)worker->execution_data; - char *msg = create_worker_answer( worker->name, buf, len, is_stdout ); size_t msg_size = strlen( msg ) + 1; printf( "worker send: %s\n", msg ); @@ -140,13 +175,54 @@ static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpoin if( bytes < 0 ) { fprintf( stderr, "ERROR: worker '%s' nn_send returned %d: %s (%d)\n", worker->name, bytes, nn_strerror( errno ), errno ); + return -1; } if( bytes != msg_size ) { fprintf( stderr, "ERROR: worker '%s' truncated message!", worker->name ); + return -1; + } + + return 0; +} + +static void watch_child( GPid pid, gint status, gpointer *data ) +{ + direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data; + worker_t *worker = wed->worker; + + printf( "Worker child with PID %d terminated.\n", wed->pid ); + + char *msg = create_worker_termination_answer( worker->name, wed->pid ); + worker_send_message( worker, msg ); + free( msg ); + + g_spawn_close_pid( pid ); + + g_main_loop_quit( wed->main_loop ); +} + +static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpointer *data ) +{ + direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data; + worker_t *worker = wed->worker; + + bool is_stdout = ( wed->out_channel == channel ); + + gchar *buf; + gsize len; + + if( ( condition & G_IO_HUP ) == G_IO_HUP ) { + g_io_channel_unref( channel ); + return FALSE; } + g_io_channel_read_line( channel, &buf, &len, NULL, NULL ); + + char *msg = create_worker_output_answer( worker->name, buf, len, is_stdout ); + worker_send_message( worker, msg ); free( msg ); + g_free( buf ); return TRUE; @@ -185,6 +261,10 @@ static void *worker_func( void *thread_data ) printf( "Worker child with PID %d created: %s.\n", wed->pid, worker->command ); + + char *msg = create_worker_created_answer( worker->name, wed->pid ); + worker_send_message( worker, msg ); + free( msg ); GMainContext *context = g_main_context_default( ); wed->main_loop = g_main_loop_new( context, TRUE ); |