diff options
author | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-05 15:18:48 +0100 |
---|---|---|
committer | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-05 15:18:48 +0100 |
commit | 67ba87a159fb55196092e59c041af6d69ce07e75 (patch) | |
tree | 10a37a3338647dce6c83a474b2ed4eb10612c91c /src/worker.c | |
parent | 7dae7024079fd2c3bec178cd92f47ac993119176 (diff) | |
download | biruda-67ba87a159fb55196092e59c041af6d69ce07e75.tar.gz biruda-67ba87a159fb55196092e59c041af6d69ce07e75.tar.bz2 |
added output message for worker output sent to master, master prints only debug message for now
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 62 |
1 files changed, 58 insertions, 4 deletions
diff --git a/src/worker.c b/src/worker.c index 03be16c..e635589 100644 --- a/src/worker.c +++ b/src/worker.c @@ -1,6 +1,7 @@ #include "worker.h" #include "nanomsg/nn.h" +#include "nanomsg/pipeline.h" #include "port.h" @@ -14,6 +15,8 @@ #include <signal.h> +#include "json-c/json.h" + const char *worker_exection_mode_str( worker_execution_mode_t mode ) { switch( mode ) { @@ -64,7 +67,7 @@ typedef struct { worker_t *worker; pthread_t thread; GMainLoop *main_loop; - + int sock; } direct_glib_execution_worker_data_t; static void watch_child( GPid pid, gint status, gpointer *data ) @@ -78,12 +81,40 @@ static void watch_child( GPid pid, gint status, gpointer *data ) g_main_loop_quit( wed->main_loop ); } +static char *create_worker_answer( const char *name, const char *s, size_t len, bool is_stdout ) +{ + json_object *obj = json_object_new_object( ); + json_object *op = json_object_new_string( "output" ); + json_object_object_add( obj, "op", op ); + + /* we are a worker */ + json_object *role = json_object_new_string( "worker" ); + json_object_object_add( obj, "role", role ); + + json_object *name_obj = json_object_new_string( name ); + json_object_object_add( obj, "worker", name_obj ); + + json_object *msg_obj = json_object_new_string_len( s, len ); + json_object_object_add( obj, "msg", msg_obj ); + + json_object *is_stdout_obj = json_object_new_boolean( is_stdout ); + json_object_object_add( obj, "stdout", is_stdout_obj ); + + /* produce message as string, caller must free it */ + const char *msg = json_object_to_json_string( obj ); + char *res = strdup( msg ); + + json_object_put( obj ); + + return res; +} + static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpointer *data ) { direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data; worker_t *worker = wed->worker; - bool isStdout = ( wed->out_channel == channel ); + bool is_stdout = ( wed->out_channel == channel ); gchar *buf; gsize len; @@ -94,10 +125,23 @@ static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpoin } g_io_channel_read_line( channel, &buf, &len, NULL, NULL ); + + char *msg = create_worker_answer( worker->name, buf, len, is_stdout ); + size_t msg_size = strlen( msg ) + 1; - fprintf( stderr, "worker %s %s: %*s\n", worker->name, - isStdout ? "stdout" : "stderr", len, buf ); + printf( "worker send: %s\n", msg ); + int bytes = nn_send( wed->sock, msg, msg_size, 0 ); + if( bytes < 0 ) { + fprintf( stderr, "ERROR: worker '%s' nn_send returned %d: %s (%d)\n", + worker->name, bytes, nn_strerror( errno ), errno ); + } + if( bytes != msg_size ) { + fprintf( stderr, "ERROR: worker '%s' truncated message!", + worker->name ); + } + + free( msg ); g_free( buf ); return TRUE; @@ -117,6 +161,12 @@ static void *worker_func( void *thread_data ) g_strfreev( args ); return NULL; } + + wed->sock = nn_socket( AF_SP, NN_PUSH ); + + (void)nn_connect( wed->sock, worker->data ); + + printf( "worker connected to data channel '%s'\n", worker->data ); error = NULL; gboolean ret = g_spawn_async_with_pipes( NULL, @@ -157,6 +207,10 @@ static void *worker_func( void *thread_data ) worker->state = WORKER_STATE_STOPPED; + (void)nn_shutdown( wed->sock, 0 ); + + puts( "worker disconnected" ); + free( worker->execution_data ); worker->execution_data = NULL; |