diff options
author | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-05 15:18:48 +0100 |
---|---|---|
committer | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-05 15:18:48 +0100 |
commit | 67ba87a159fb55196092e59c041af6d69ce07e75 (patch) | |
tree | 10a37a3338647dce6c83a474b2ed4eb10612c91c | |
parent | 7dae7024079fd2c3bec178cd92f47ac993119176 (diff) | |
download | biruda-67ba87a159fb55196092e59c041af6d69ce07e75.tar.gz biruda-67ba87a159fb55196092e59c041af6d69ce07e75.tar.bz2 |
added output message for worker output sent to master, master prints only debug message for now
-rw-r--r-- | src/biruda.c | 7 | ||||
-rw-r--r-- | src/coordinator.c | 10 | ||||
-rw-r--r-- | src/coordinator.h | 3 | ||||
-rw-r--r-- | src/master.c | 82 | ||||
-rw-r--r-- | src/master.h | 2 | ||||
-rw-r--r-- | src/worker.c | 62 | ||||
-rw-r--r-- | src/worker.h | 2 |
7 files changed, 144 insertions, 24 deletions
diff --git a/src/biruda.c b/src/biruda.c index 834b72f..3fae736 100644 --- a/src/biruda.c +++ b/src/biruda.c @@ -268,8 +268,9 @@ static int create_master( cfg_t *cfg ) { cfg_t *master_cfg = cfg_getnsec( cfg, "master", 0 ); char *control = cfg_getstr( master_cfg, "control" ); + char *data = cfg_getstr( master_cfg, "data" ); - return master_init( control ); + return master_init( control, data ); } static int create_coordinator( cfg_t *cfg ) @@ -285,7 +286,9 @@ static int create_coordinator( cfg_t *cfg ) cfg_t *worker_cfg = cfg_getnsec( cfg, "worker", i ); coordinator_add_worker( cfg_title( worker_cfg ), (worker_execution_mode_t)cfg_getint( worker_cfg, "execution" ), - cfg_getstr( worker_cfg, "command" ) ); + cfg_getstr( worker_cfg, "command" ), + cfg_getstr( worker_cfg, "control" ), + cfg_getstr( worker_cfg, "data" ) ); } } diff --git a/src/coordinator.c b/src/coordinator.c index 1366d88..b1412e0 100644 --- a/src/coordinator.c +++ b/src/coordinator.c @@ -235,10 +235,6 @@ static void *coordinator_func( void *thread_data ) printf( "coordinator idle: %d\n", coordinator_must_terminate ); char *answer = NULL; - // TODO: nn_recv should be called with NN_DONTWAIT, - // but only if the main loop of gmail says no events are - // pending. We have to handle exit clients, out and error - // pipes here.. int bytes = nn_recv( coordinator_sock, &answer, NN_MSG, 0 ); if( coordinator_must_terminate ) continue; if( bytes >= 0 ) { @@ -410,7 +406,8 @@ int coordinator_free( ) return 0; } -int coordinator_add_worker( const char *name, worker_execution_mode_t mode, const char *command ) +int coordinator_add_worker( const char *name, worker_execution_mode_t mode, const char *command, + const char *control, const char *data ) { if( nof_workers >= MAX_WORKERS ) { fprintf( stderr, "Can't define more workers, limit reached!\n" ); @@ -428,6 +425,9 @@ int coordinator_add_worker( const char *name, worker_execution_mode_t mode, cons w->state = WORKER_STATE_STOPPED; w->execution_data = NULL; + w->control = control; + w->data = data; + nof_workers++; return 0; diff --git a/src/coordinator.h b/src/coordinator.h index d67a3b6..a2a743c 100644 --- a/src/coordinator.h +++ b/src/coordinator.h @@ -24,6 +24,7 @@ typedef struct { int coordinator_init( const char *control ); void coordinator_terminate( ); int coordinator_free( ); -int coordinator_add_worker( const char *name, worker_execution_mode_t mode, const char *command ); +int coordinator_add_worker( const char *name, worker_execution_mode_t mode, const char *command, + const char *control, const char *data ); #endif diff --git a/src/master.c b/src/master.c index 9a35a73..d1f002d 100644 --- a/src/master.c +++ b/src/master.c @@ -4,6 +4,7 @@ #include "nanomsg/nn.h" #include "nanomsg/survey.h" +#include "nanomsg/pipeline.h" #include "json-c/json.h" @@ -11,7 +12,8 @@ #include <string.h> static pthread_t master_thread; -static int master_sock; +static int control_sock; +static int data_sock; static int master_must_terminate = 0; coordinator_t coordinator[MAX_COORDINATORS]; @@ -278,34 +280,52 @@ static char *get_next_request( ) } } +typedef struct { + const char *control; + const char *data; +} master_thread_data_t; + +static master_thread_data_t master_thread_data; + static void *master_func( void *thread_data ) { - char *control = (char *)thread_data; + master_thread_data_t *tdata = (master_thread_data_t *)thread_data; + const char *control = tdata->control; + const char *data = tdata->data; pthread_mutex_init( &queued_command_mutex, NULL ); - if( ( master_sock = nn_socket( AF_SP, NN_SURVEYOR ) ) < 0 ) { + if( ( control_sock = nn_socket( AF_SP, NN_SURVEYOR ) ) < 0 ) { fprintf( stderr, "master, nn_socket( AF_SP, NN_SURVEYOR ) error: %s (%d)\n", strerror( errno ), errno ); abort( ); } int deadline = 5000; - if( nn_setsockopt( master_sock, NN_SURVEYOR, NN_SURVEYOR_DEADLINE, &deadline, sizeof( deadline ) ) < 0 ) { + if( nn_setsockopt( control_sock, NN_SURVEYOR, NN_SURVEYOR_DEADLINE, &deadline, sizeof( deadline ) ) < 0 ) { fprintf( stderr, "master, nn_setsockopt error: %s (%d)\n", strerror( errno ), errno ); abort( ); } - (void)nn_bind( master_sock, control ); + (void)nn_bind( control_sock, control ); + + printf( "master connected to control channel '%s'\n", control ); + + if( ( data_sock = nn_socket( AF_SP, NN_PULL ) ) < 0 ) { + fprintf( stderr, "master, nn_socket( AF_SP, NN_PULL ) error: %s (%d)\n", strerror( errno ), errno ); + abort( ); + } - printf( "master connected to %s\n", control ); + (void)nn_bind( data_sock, data ); + printf( "master connected to data channel '%s'\n", data ); + NEXT_COMMAND: ; char *msg = get_next_request( ); int msg_size = strlen( msg ) + 1; printf( "master send: %s\n", msg ); - int bytes = nn_send( master_sock, msg, msg_size /* NN_MSG */, 0 ); + int bytes = nn_send( control_sock, msg, msg_size /* NN_MSG */, 0 ); if( bytes < 0 ) { if( errno == ETERM ) { master_must_terminate = 1; @@ -325,8 +345,32 @@ NEXT_COMMAND: recompute_coordinator_states( ); + struct nn_pollfd pollfd[2]; + pollfd[0].fd = control_sock; + pollfd[0].events = NN_POLLIN; + pollfd[1].fd = data_sock; + pollfd[1].events = NN_POLLIN; + int rc = nn_poll( pollfd, 2, 5000 ); + if( rc == 0 ) { + // timeout, next iteration + goto NEXT_COMMAND; + } else if( rc < 0 ) { + fprintf( stderr, "master, nn_poll error: %s (%d)\n", strerror( errno ), errno ); + abort( ); + } + + int sock; + + if( pollfd[1].revents & NN_POLLIN ) { + sock = data_sock; + } + + if( pollfd[0].revents & NN_POLLIN ) { + sock = control_sock; + } + char *answer = NULL; - bytes = nn_recv( master_sock, &answer, NN_MSG, 0 ); + bytes = nn_recv( sock, &answer, NN_MSG, 0 ); if( master_must_terminate ) continue; if( bytes >= 0 ) { json_object *recv_obj = json_tokener_parse( answer ); @@ -347,6 +391,18 @@ NEXT_COMMAND: continue; } } + } else if( strcmp( op, "output" ) == 0 ) { + json_object *worker_obj; + json_object_object_get_ex( recv_obj, "worker", &worker_obj ); + const char *worker = json_object_get_string( worker_obj ); + json_object *msg_obj; + json_object_object_get_ex( recv_obj, "msg", &msg_obj ); + const char *msg = json_object_get_string( msg_obj ); + + printf( "Got message from worker '%s': %s\n", worker, msg ); + + } else { + fprintf( stderr, "WARNING: master received unkown message: %s\n", answer ); } json_object_put( recv_obj ); @@ -369,14 +425,15 @@ NEXT_COMMAND: } } - (void)nn_shutdown( master_sock, 0 ); + (void)nn_shutdown( data_sock, 0 ); + (void)nn_shutdown( control_sock, 0 ); puts( "master disconnected" ); return NULL; } -int master_init( const char *control ) +int master_init( const char *control, const char *data ) { pthread_attr_t attr; int res; @@ -390,7 +447,10 @@ int master_init( const char *control ) return 1; } - res = pthread_create( &master_thread, &attr, master_func, (void *)control ); + master_thread_data.control = control; + master_thread_data.data = data; + + res = pthread_create( &master_thread, &attr, master_func, (void *)&master_thread_data ); if( res != 0 ) { return 1; } diff --git a/src/master.h b/src/master.h index 60d1061..a19dd8d 100644 --- a/src/master.h +++ b/src/master.h @@ -4,7 +4,7 @@ #include "port.h" #include "coordinator.h" -int master_init( const char *control ); +int master_init( const char *control, const char *data ); void master_terminate( int terminate_nano_msg ); int master_free( ); diff --git a/src/worker.c b/src/worker.c index 03be16c..e635589 100644 --- a/src/worker.c +++ b/src/worker.c @@ -1,6 +1,7 @@ #include "worker.h" #include "nanomsg/nn.h" +#include "nanomsg/pipeline.h" #include "port.h" @@ -14,6 +15,8 @@ #include <signal.h> +#include "json-c/json.h" + const char *worker_exection_mode_str( worker_execution_mode_t mode ) { switch( mode ) { @@ -64,7 +67,7 @@ typedef struct { worker_t *worker; pthread_t thread; GMainLoop *main_loop; - + int sock; } direct_glib_execution_worker_data_t; static void watch_child( GPid pid, gint status, gpointer *data ) @@ -78,12 +81,40 @@ static void watch_child( GPid pid, gint status, gpointer *data ) g_main_loop_quit( wed->main_loop ); } +static char *create_worker_answer( const char *name, const char *s, size_t len, bool is_stdout ) +{ + json_object *obj = json_object_new_object( ); + json_object *op = json_object_new_string( "output" ); + json_object_object_add( obj, "op", op ); + + /* we are a worker */ + json_object *role = json_object_new_string( "worker" ); + json_object_object_add( obj, "role", role ); + + json_object *name_obj = json_object_new_string( name ); + json_object_object_add( obj, "worker", name_obj ); + + json_object *msg_obj = json_object_new_string_len( s, len ); + json_object_object_add( obj, "msg", msg_obj ); + + json_object *is_stdout_obj = json_object_new_boolean( is_stdout ); + json_object_object_add( obj, "stdout", is_stdout_obj ); + + /* produce message as string, caller must free it */ + const char *msg = json_object_to_json_string( obj ); + char *res = strdup( msg ); + + json_object_put( obj ); + + return res; +} + static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpointer *data ) { direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data; worker_t *worker = wed->worker; - bool isStdout = ( wed->out_channel == channel ); + bool is_stdout = ( wed->out_channel == channel ); gchar *buf; gsize len; @@ -94,10 +125,23 @@ static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpoin } g_io_channel_read_line( channel, &buf, &len, NULL, NULL ); + + char *msg = create_worker_answer( worker->name, buf, len, is_stdout ); + size_t msg_size = strlen( msg ) + 1; - fprintf( stderr, "worker %s %s: %*s\n", worker->name, - isStdout ? "stdout" : "stderr", len, buf ); + printf( "worker send: %s\n", msg ); + int bytes = nn_send( wed->sock, msg, msg_size, 0 ); + if( bytes < 0 ) { + fprintf( stderr, "ERROR: worker '%s' nn_send returned %d: %s (%d)\n", + worker->name, bytes, nn_strerror( errno ), errno ); + } + if( bytes != msg_size ) { + fprintf( stderr, "ERROR: worker '%s' truncated message!", + worker->name ); + } + + free( msg ); g_free( buf ); return TRUE; @@ -117,6 +161,12 @@ static void *worker_func( void *thread_data ) g_strfreev( args ); return NULL; } + + wed->sock = nn_socket( AF_SP, NN_PUSH ); + + (void)nn_connect( wed->sock, worker->data ); + + printf( "worker connected to data channel '%s'\n", worker->data ); error = NULL; gboolean ret = g_spawn_async_with_pipes( NULL, @@ -157,6 +207,10 @@ static void *worker_func( void *thread_data ) worker->state = WORKER_STATE_STOPPED; + (void)nn_shutdown( wed->sock, 0 ); + + puts( "worker disconnected" ); + free( worker->execution_data ); worker->execution_data = NULL; diff --git a/src/worker.h b/src/worker.h index b147e16..a5f0ce9 100644 --- a/src/worker.h +++ b/src/worker.h @@ -19,6 +19,8 @@ typedef struct { char *command; worker_state_t state; worker_execution_data_t execution_data; + const char *control; + const char *data; } worker_t; const char *worker_exection_mode_str( worker_execution_mode_t mode ); |