diff options
author | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-07 18:15:27 +0100 |
---|---|---|
committer | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-07 18:15:27 +0100 |
commit | 645bd551dcfb609ce4b57a205bbf1c3e129d4ef3 (patch) | |
tree | 9e89afea2e74628bb0350d2bfefb2903957badb1 /src/worker.c | |
parent | 5f1f7c844741a8c55705e8460fca54024c7d5102 (diff) | |
download | biruda-645bd551dcfb609ce4b57a205bbf1c3e129d4ef3.tar.gz biruda-645bd551dcfb609ce4b57a205bbf1c3e129d4ef3.tar.bz2 |
added started, stopped messages for worker
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 128 |
1 files changed, 104 insertions, 24 deletions
diff --git a/src/worker.c b/src/worker.c index d831031..3bebd60 100644 --- a/src/worker.c +++ b/src/worker.c @@ -70,18 +70,7 @@ typedef struct { int sock; } direct_glib_execution_worker_data_t; -static void watch_child( GPid pid, gint status, gpointer *data ) -{ - direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data; - - printf( "Worker child with PID %d terminated.\n", wed->pid ); - - g_spawn_close_pid( pid ); - - g_main_loop_quit( wed->main_loop ); -} - -static char *create_worker_answer( const char *name, const char *s, size_t len, bool is_stdout ) +static char *create_worker_output_answer( const char *name, const char *s, size_t len, bool is_stdout ) { json_object *obj = json_object_new_object( ); json_object *op = json_object_new_string( "output" ); @@ -114,24 +103,70 @@ static char *create_worker_answer( const char *name, const char *s, size_t len, return res; } -static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpointer *data ) +static char *create_worker_termination_answer( const char *name, GPid pid ) { - direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data; - worker_t *worker = wed->worker; + json_object *obj = json_object_new_object( ); + json_object *op = json_object_new_string( "terminated" ); + json_object_object_add( obj, "op", op ); - bool is_stdout = ( wed->out_channel == channel ); + /* we are a worker */ + json_object *role = json_object_new_string( "worker" ); + json_object_object_add( obj, "role", role ); - gchar *buf; - gsize len; + json_object *name_obj = json_object_new_string( name ); + json_object_object_add( obj, "worker", name_obj ); - if( ( condition & G_IO_HUP ) == G_IO_HUP ) { - g_io_channel_unref( channel ); - return FALSE; - } + json_object *pid_obj = json_object_new_int( pid ); + json_object_object_add( obj, "pid", pid_obj ); + + time_t ts; + time( &ts ); + json_object *timestamp_obj = json_object_new_int( ts ); + json_object_object_add( obj, "timestamp", timestamp_obj ); + + /* produce message as string, caller must free it */ + const char *msg = json_object_to_json_string( obj ); + char *res = strdup( msg ); - g_io_channel_read_line( channel, &buf, &len, NULL, NULL ); + json_object_put( obj ); + + return res; +} + +static char *create_worker_created_answer( const char *name, GPid pid ) +{ + json_object *obj = json_object_new_object( ); + json_object *op = json_object_new_string( "created" ); + json_object_object_add( obj, "op", op ); + + /* we are a worker */ + json_object *role = json_object_new_string( "worker" ); + json_object_object_add( obj, "role", role ); + + json_object *name_obj = json_object_new_string( name ); + json_object_object_add( obj, "worker", name_obj ); + + json_object *pid_obj = json_object_new_int( pid ); + json_object_object_add( obj, "pid", pid_obj ); + + time_t ts; + time( &ts ); + json_object *timestamp_obj = json_object_new_int( ts ); + json_object_object_add( obj, "timestamp", timestamp_obj ); + + /* produce message as string, caller must free it */ + const char *msg = json_object_to_json_string( obj ); + char *res = strdup( msg ); + + json_object_put( obj ); + + return res; +} + +static int worker_send_message( worker_t *worker, const char *msg ) +{ + direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)worker->execution_data; - char *msg = create_worker_answer( worker->name, buf, len, is_stdout ); size_t msg_size = strlen( msg ) + 1; printf( "worker send: %s\n", msg ); @@ -140,13 +175,54 @@ static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpoin if( bytes < 0 ) { fprintf( stderr, "ERROR: worker '%s' nn_send returned %d: %s (%d)\n", worker->name, bytes, nn_strerror( errno ), errno ); + return -1; } if( bytes != msg_size ) { fprintf( stderr, "ERROR: worker '%s' truncated message!", worker->name ); + return -1; + } + + return 0; +} + +static void watch_child( GPid pid, gint status, gpointer *data ) +{ + direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data; + worker_t *worker = wed->worker; + + printf( "Worker child with PID %d terminated.\n", wed->pid ); + + char *msg = create_worker_termination_answer( worker->name, wed->pid ); + worker_send_message( worker, msg ); + free( msg ); + + g_spawn_close_pid( pid ); + + g_main_loop_quit( wed->main_loop ); +} + +static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpointer *data ) +{ + direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data; + worker_t *worker = wed->worker; + + bool is_stdout = ( wed->out_channel == channel ); + + gchar *buf; + gsize len; + + if( ( condition & G_IO_HUP ) == G_IO_HUP ) { + g_io_channel_unref( channel ); + return FALSE; } + g_io_channel_read_line( channel, &buf, &len, NULL, NULL ); + + char *msg = create_worker_output_answer( worker->name, buf, len, is_stdout ); + worker_send_message( worker, msg ); free( msg ); + g_free( buf ); return TRUE; @@ -185,6 +261,10 @@ static void *worker_func( void *thread_data ) printf( "Worker child with PID %d created: %s.\n", wed->pid, worker->command ); + + char *msg = create_worker_created_answer( worker->name, wed->pid ); + worker_send_message( worker, msg ); + free( msg ); GMainContext *context = g_main_context_default( ); wed->main_loop = g_main_loop_new( context, TRUE ); |