summaryrefslogtreecommitdiff
path: root/src/worker.c
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2014-12-05 15:18:48 +0100
committerAndreas Baumann <mail@andreasbaumann.cc>2014-12-05 15:18:48 +0100
commit67ba87a159fb55196092e59c041af6d69ce07e75 (patch)
tree10a37a3338647dce6c83a474b2ed4eb10612c91c /src/worker.c
parent7dae7024079fd2c3bec178cd92f47ac993119176 (diff)
downloadbiruda-67ba87a159fb55196092e59c041af6d69ce07e75.tar.gz
biruda-67ba87a159fb55196092e59c041af6d69ce07e75.tar.bz2
added output message for worker output sent to master, master prints only debug message for now
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c62
1 files changed, 58 insertions, 4 deletions
diff --git a/src/worker.c b/src/worker.c
index 03be16c..e635589 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -1,6 +1,7 @@
#include "worker.h"
#include "nanomsg/nn.h"
+#include "nanomsg/pipeline.h"
#include "port.h"
@@ -14,6 +15,8 @@
#include <signal.h>
+#include "json-c/json.h"
+
const char *worker_exection_mode_str( worker_execution_mode_t mode )
{
switch( mode ) {
@@ -64,7 +67,7 @@ typedef struct {
worker_t *worker;
pthread_t thread;
GMainLoop *main_loop;
-
+ int sock;
} direct_glib_execution_worker_data_t;
static void watch_child( GPid pid, gint status, gpointer *data )
@@ -78,12 +81,40 @@ static void watch_child( GPid pid, gint status, gpointer *data )
g_main_loop_quit( wed->main_loop );
}
+static char *create_worker_answer( const char *name, const char *s, size_t len, bool is_stdout )
+{
+ json_object *obj = json_object_new_object( );
+ json_object *op = json_object_new_string( "output" );
+ json_object_object_add( obj, "op", op );
+
+ /* we are a worker */
+ json_object *role = json_object_new_string( "worker" );
+ json_object_object_add( obj, "role", role );
+
+ json_object *name_obj = json_object_new_string( name );
+ json_object_object_add( obj, "worker", name_obj );
+
+ json_object *msg_obj = json_object_new_string_len( s, len );
+ json_object_object_add( obj, "msg", msg_obj );
+
+ json_object *is_stdout_obj = json_object_new_boolean( is_stdout );
+ json_object_object_add( obj, "stdout", is_stdout_obj );
+
+ /* produce message as string, caller must free it */
+ const char *msg = json_object_to_json_string( obj );
+ char *res = strdup( msg );
+
+ json_object_put( obj );
+
+ return res;
+}
+
static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpointer *data )
{
direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data;
worker_t *worker = wed->worker;
- bool isStdout = ( wed->out_channel == channel );
+ bool is_stdout = ( wed->out_channel == channel );
gchar *buf;
gsize len;
@@ -94,10 +125,23 @@ static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpoin
}
g_io_channel_read_line( channel, &buf, &len, NULL, NULL );
+
+ char *msg = create_worker_answer( worker->name, buf, len, is_stdout );
+ size_t msg_size = strlen( msg ) + 1;
- fprintf( stderr, "worker %s %s: %*s\n", worker->name,
- isStdout ? "stdout" : "stderr", len, buf );
+ printf( "worker send: %s\n", msg );
+ int bytes = nn_send( wed->sock, msg, msg_size, 0 );
+ if( bytes < 0 ) {
+ fprintf( stderr, "ERROR: worker '%s' nn_send returned %d: %s (%d)\n",
+ worker->name, bytes, nn_strerror( errno ), errno );
+ }
+ if( bytes != msg_size ) {
+ fprintf( stderr, "ERROR: worker '%s' truncated message!",
+ worker->name );
+ }
+
+ free( msg );
g_free( buf );
return TRUE;
@@ -117,6 +161,12 @@ static void *worker_func( void *thread_data )
g_strfreev( args );
return NULL;
}
+
+ wed->sock = nn_socket( AF_SP, NN_PUSH );
+
+ (void)nn_connect( wed->sock, worker->data );
+
+ printf( "worker connected to data channel '%s'\n", worker->data );
error = NULL;
gboolean ret = g_spawn_async_with_pipes( NULL,
@@ -157,6 +207,10 @@ static void *worker_func( void *thread_data )
worker->state = WORKER_STATE_STOPPED;
+ (void)nn_shutdown( wed->sock, 0 );
+
+ puts( "worker disconnected" );
+
free( worker->execution_data );
worker->execution_data = NULL;