summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2014-12-07 18:15:27 +0100
committerAndreas Baumann <mail@andreasbaumann.cc>2014-12-07 18:15:27 +0100
commit645bd551dcfb609ce4b57a205bbf1c3e129d4ef3 (patch)
tree9e89afea2e74628bb0350d2bfefb2903957badb1 /src
parent5f1f7c844741a8c55705e8460fca54024c7d5102 (diff)
downloadbiruda-645bd551dcfb609ce4b57a205bbf1c3e129d4ef3.tar.gz
biruda-645bd551dcfb609ce4b57a205bbf1c3e129d4ef3.tar.bz2
added started, stopped messages for worker
Diffstat (limited to 'src')
-rw-r--r--src/master.c72
-rw-r--r--src/worker.c128
2 files changed, 176 insertions, 24 deletions
diff --git a/src/master.c b/src/master.c
index 387fe5f..a3d98b1 100644
--- a/src/master.c
+++ b/src/master.c
@@ -382,6 +382,74 @@ static int master_output_write( const char *spool_dir, json_object *obj )
return 0;
}
+static int master_output_write_started( const char *spool_dir, json_object *obj )
+{
+ json_object *worker_obj;
+ json_object_object_get_ex( obj, "worker", &worker_obj );
+ const char *name = json_object_get_string( worker_obj );
+
+ worker_t *w = worker_by_name( name );
+
+ if( w == NULL ) return 0;
+
+ execution_worker_data_t *wed = (execution_worker_data_t *)w->execution_data;
+
+ if( wed->spool_file == NULL ) {
+ master_output_init( spool_dir, name );
+ if( wed->spool_file == NULL ) return 0;
+ }
+
+ json_object *pid_obj;
+ json_object_object_get_ex( obj, "pid", &pid_obj );
+ int pid = json_object_get_int( pid_obj );
+
+ json_object *ts_obj;
+ json_object_object_get_ex( obj, "timestamp", &ts_obj );
+ time_t ts = json_object_get_int( ts_obj );
+
+ char line[1024];
+ snprintf( line, sizeof( line ), "%lu %s --- STARTED %d ---\n", ts, name, pid );
+
+ fputs( line, wed->spool_file );
+ fflush( wed->spool_file );
+
+ return 0;
+}
+
+static int master_output_write_terminated( const char *spool_dir, json_object *obj )
+{
+ json_object *worker_obj;
+ json_object_object_get_ex( obj, "worker", &worker_obj );
+ const char *name = json_object_get_string( worker_obj );
+
+ worker_t *w = worker_by_name( name );
+
+ if( w == NULL ) return 0;
+
+ execution_worker_data_t *wed = (execution_worker_data_t *)w->execution_data;
+
+ if( wed->spool_file == NULL ) {
+ master_output_init( spool_dir, name );
+ if( wed->spool_file == NULL ) return 0;
+ }
+
+ json_object *pid_obj;
+ json_object_object_get_ex( obj, "pid", &pid_obj );
+ int pid = json_object_get_int( pid_obj );
+
+ json_object *ts_obj;
+ json_object_object_get_ex( obj, "timestamp", &ts_obj );
+ time_t ts = json_object_get_int( ts_obj );
+
+ char line[1024];
+ snprintf( line, sizeof( line ), "%lu %s --- STOPPED %d ---\n", ts, name, pid );
+
+ fputs( line, wed->spool_file );
+ fflush( wed->spool_file );
+
+ return 0;
+}
+
static void *master_func( void *thread_data )
{
master_thread_data_t *tdata = (master_thread_data_t *)thread_data;
@@ -496,6 +564,10 @@ NEXT_COMMAND:
json_object_object_get_ex( recv_obj, "worker", &worker_obj );
const char *worker = json_object_get_string( worker_obj );
master_output_free( worker );
+ } else if( strcmp( op, "created" ) == 0 ) {
+ master_output_write_started( tdata->spool_dir, recv_obj );
+ } else if( strcmp( op, "terminated" ) == 0 ) {
+ master_output_write_terminated( tdata->spool_dir, recv_obj );
} else if( strcmp( op, "output" ) == 0 ) {
master_output_write( tdata->spool_dir, recv_obj );
} else {
diff --git a/src/worker.c b/src/worker.c
index d831031..3bebd60 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -70,18 +70,7 @@ typedef struct {
int sock;
} direct_glib_execution_worker_data_t;
-static void watch_child( GPid pid, gint status, gpointer *data )
-{
- direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data;
-
- printf( "Worker child with PID %d terminated.\n", wed->pid );
-
- g_spawn_close_pid( pid );
-
- g_main_loop_quit( wed->main_loop );
-}
-
-static char *create_worker_answer( const char *name, const char *s, size_t len, bool is_stdout )
+static char *create_worker_output_answer( const char *name, const char *s, size_t len, bool is_stdout )
{
json_object *obj = json_object_new_object( );
json_object *op = json_object_new_string( "output" );
@@ -114,24 +103,70 @@ static char *create_worker_answer( const char *name, const char *s, size_t len,
return res;
}
-static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpointer *data )
+static char *create_worker_termination_answer( const char *name, GPid pid )
{
- direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data;
- worker_t *worker = wed->worker;
+ json_object *obj = json_object_new_object( );
+ json_object *op = json_object_new_string( "terminated" );
+ json_object_object_add( obj, "op", op );
- bool is_stdout = ( wed->out_channel == channel );
+ /* we are a worker */
+ json_object *role = json_object_new_string( "worker" );
+ json_object_object_add( obj, "role", role );
- gchar *buf;
- gsize len;
+ json_object *name_obj = json_object_new_string( name );
+ json_object_object_add( obj, "worker", name_obj );
- if( ( condition & G_IO_HUP ) == G_IO_HUP ) {
- g_io_channel_unref( channel );
- return FALSE;
- }
+ json_object *pid_obj = json_object_new_int( pid );
+ json_object_object_add( obj, "pid", pid_obj );
+
+ time_t ts;
+ time( &ts );
+ json_object *timestamp_obj = json_object_new_int( ts );
+ json_object_object_add( obj, "timestamp", timestamp_obj );
+
+ /* produce message as string, caller must free it */
+ const char *msg = json_object_to_json_string( obj );
+ char *res = strdup( msg );
- g_io_channel_read_line( channel, &buf, &len, NULL, NULL );
+ json_object_put( obj );
+
+ return res;
+}
+
+static char *create_worker_created_answer( const char *name, GPid pid )
+{
+ json_object *obj = json_object_new_object( );
+ json_object *op = json_object_new_string( "created" );
+ json_object_object_add( obj, "op", op );
+
+ /* we are a worker */
+ json_object *role = json_object_new_string( "worker" );
+ json_object_object_add( obj, "role", role );
+
+ json_object *name_obj = json_object_new_string( name );
+ json_object_object_add( obj, "worker", name_obj );
+
+ json_object *pid_obj = json_object_new_int( pid );
+ json_object_object_add( obj, "pid", pid_obj );
+
+ time_t ts;
+ time( &ts );
+ json_object *timestamp_obj = json_object_new_int( ts );
+ json_object_object_add( obj, "timestamp", timestamp_obj );
+
+ /* produce message as string, caller must free it */
+ const char *msg = json_object_to_json_string( obj );
+ char *res = strdup( msg );
+
+ json_object_put( obj );
+
+ return res;
+}
+
+static int worker_send_message( worker_t *worker, const char *msg )
+{
+ direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)worker->execution_data;
- char *msg = create_worker_answer( worker->name, buf, len, is_stdout );
size_t msg_size = strlen( msg ) + 1;
printf( "worker send: %s\n", msg );
@@ -140,13 +175,54 @@ static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpoin
if( bytes < 0 ) {
fprintf( stderr, "ERROR: worker '%s' nn_send returned %d: %s (%d)\n",
worker->name, bytes, nn_strerror( errno ), errno );
+ return -1;
}
if( bytes != msg_size ) {
fprintf( stderr, "ERROR: worker '%s' truncated message!",
worker->name );
+ return -1;
+ }
+
+ return 0;
+}
+
+static void watch_child( GPid pid, gint status, gpointer *data )
+{
+ direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data;
+ worker_t *worker = wed->worker;
+
+ printf( "Worker child with PID %d terminated.\n", wed->pid );
+
+ char *msg = create_worker_termination_answer( worker->name, wed->pid );
+ worker_send_message( worker, msg );
+ free( msg );
+
+ g_spawn_close_pid( pid );
+
+ g_main_loop_quit( wed->main_loop );
+}
+
+static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpointer *data )
+{
+ direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data;
+ worker_t *worker = wed->worker;
+
+ bool is_stdout = ( wed->out_channel == channel );
+
+ gchar *buf;
+ gsize len;
+
+ if( ( condition & G_IO_HUP ) == G_IO_HUP ) {
+ g_io_channel_unref( channel );
+ return FALSE;
}
+ g_io_channel_read_line( channel, &buf, &len, NULL, NULL );
+
+ char *msg = create_worker_output_answer( worker->name, buf, len, is_stdout );
+ worker_send_message( worker, msg );
free( msg );
+
g_free( buf );
return TRUE;
@@ -185,6 +261,10 @@ static void *worker_func( void *thread_data )
printf( "Worker child with PID %d created: %s.\n",
wed->pid, worker->command );
+
+ char *msg = create_worker_created_answer( worker->name, wed->pid );
+ worker_send_message( worker, msg );
+ free( msg );
GMainContext *context = g_main_context_default( );
wed->main_loop = g_main_loop_new( context, TRUE );