summaryrefslogtreecommitdiff
path: root/src/worker.c
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2014-12-07 18:15:27 +0100
committerAndreas Baumann <mail@andreasbaumann.cc>2014-12-07 18:15:27 +0100
commit645bd551dcfb609ce4b57a205bbf1c3e129d4ef3 (patch)
tree9e89afea2e74628bb0350d2bfefb2903957badb1 /src/worker.c
parent5f1f7c844741a8c55705e8460fca54024c7d5102 (diff)
downloadbiruda-645bd551dcfb609ce4b57a205bbf1c3e129d4ef3.tar.gz
biruda-645bd551dcfb609ce4b57a205bbf1c3e129d4ef3.tar.bz2
added started, stopped messages for worker
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c128
1 files changed, 104 insertions, 24 deletions
diff --git a/src/worker.c b/src/worker.c
index d831031..3bebd60 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -70,18 +70,7 @@ typedef struct {
int sock;
} direct_glib_execution_worker_data_t;
-static void watch_child( GPid pid, gint status, gpointer *data )
-{
- direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data;
-
- printf( "Worker child with PID %d terminated.\n", wed->pid );
-
- g_spawn_close_pid( pid );
-
- g_main_loop_quit( wed->main_loop );
-}
-
-static char *create_worker_answer( const char *name, const char *s, size_t len, bool is_stdout )
+static char *create_worker_output_answer( const char *name, const char *s, size_t len, bool is_stdout )
{
json_object *obj = json_object_new_object( );
json_object *op = json_object_new_string( "output" );
@@ -114,24 +103,70 @@ static char *create_worker_answer( const char *name, const char *s, size_t len,
return res;
}
-static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpointer *data )
+static char *create_worker_termination_answer( const char *name, GPid pid )
{
- direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data;
- worker_t *worker = wed->worker;
+ json_object *obj = json_object_new_object( );
+ json_object *op = json_object_new_string( "terminated" );
+ json_object_object_add( obj, "op", op );
- bool is_stdout = ( wed->out_channel == channel );
+ /* we are a worker */
+ json_object *role = json_object_new_string( "worker" );
+ json_object_object_add( obj, "role", role );
- gchar *buf;
- gsize len;
+ json_object *name_obj = json_object_new_string( name );
+ json_object_object_add( obj, "worker", name_obj );
- if( ( condition & G_IO_HUP ) == G_IO_HUP ) {
- g_io_channel_unref( channel );
- return FALSE;
- }
+ json_object *pid_obj = json_object_new_int( pid );
+ json_object_object_add( obj, "pid", pid_obj );
+
+ time_t ts;
+ time( &ts );
+ json_object *timestamp_obj = json_object_new_int( ts );
+ json_object_object_add( obj, "timestamp", timestamp_obj );
+
+ /* produce message as string, caller must free it */
+ const char *msg = json_object_to_json_string( obj );
+ char *res = strdup( msg );
- g_io_channel_read_line( channel, &buf, &len, NULL, NULL );
+ json_object_put( obj );
+
+ return res;
+}
+
+static char *create_worker_created_answer( const char *name, GPid pid )
+{
+ json_object *obj = json_object_new_object( );
+ json_object *op = json_object_new_string( "created" );
+ json_object_object_add( obj, "op", op );
+
+ /* we are a worker */
+ json_object *role = json_object_new_string( "worker" );
+ json_object_object_add( obj, "role", role );
+
+ json_object *name_obj = json_object_new_string( name );
+ json_object_object_add( obj, "worker", name_obj );
+
+ json_object *pid_obj = json_object_new_int( pid );
+ json_object_object_add( obj, "pid", pid_obj );
+
+ time_t ts;
+ time( &ts );
+ json_object *timestamp_obj = json_object_new_int( ts );
+ json_object_object_add( obj, "timestamp", timestamp_obj );
+
+ /* produce message as string, caller must free it */
+ const char *msg = json_object_to_json_string( obj );
+ char *res = strdup( msg );
+
+ json_object_put( obj );
+
+ return res;
+}
+
+static int worker_send_message( worker_t *worker, const char *msg )
+{
+ direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)worker->execution_data;
- char *msg = create_worker_answer( worker->name, buf, len, is_stdout );
size_t msg_size = strlen( msg ) + 1;
printf( "worker send: %s\n", msg );
@@ -140,13 +175,54 @@ static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpoin
if( bytes < 0 ) {
fprintf( stderr, "ERROR: worker '%s' nn_send returned %d: %s (%d)\n",
worker->name, bytes, nn_strerror( errno ), errno );
+ return -1;
}
if( bytes != msg_size ) {
fprintf( stderr, "ERROR: worker '%s' truncated message!",
worker->name );
+ return -1;
+ }
+
+ return 0;
+}
+
+static void watch_child( GPid pid, gint status, gpointer *data )
+{
+ direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data;
+ worker_t *worker = wed->worker;
+
+ printf( "Worker child with PID %d terminated.\n", wed->pid );
+
+ char *msg = create_worker_termination_answer( worker->name, wed->pid );
+ worker_send_message( worker, msg );
+ free( msg );
+
+ g_spawn_close_pid( pid );
+
+ g_main_loop_quit( wed->main_loop );
+}
+
+static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpointer *data )
+{
+ direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data;
+ worker_t *worker = wed->worker;
+
+ bool is_stdout = ( wed->out_channel == channel );
+
+ gchar *buf;
+ gsize len;
+
+ if( ( condition & G_IO_HUP ) == G_IO_HUP ) {
+ g_io_channel_unref( channel );
+ return FALSE;
}
+ g_io_channel_read_line( channel, &buf, &len, NULL, NULL );
+
+ char *msg = create_worker_output_answer( worker->name, buf, len, is_stdout );
+ worker_send_message( worker, msg );
free( msg );
+
g_free( buf );
return TRUE;
@@ -185,6 +261,10 @@ static void *worker_func( void *thread_data )
printf( "Worker child with PID %d created: %s.\n",
wed->pid, worker->command );
+
+ char *msg = create_worker_created_answer( worker->name, wed->pid );
+ worker_send_message( worker, msg );
+ free( msg );
GMainContext *context = g_main_context_default( );
wed->main_loop = g_main_loop_new( context, TRUE );