summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2014-12-05 15:18:48 +0100
committerAndreas Baumann <mail@andreasbaumann.cc>2014-12-05 15:18:48 +0100
commit67ba87a159fb55196092e59c041af6d69ce07e75 (patch)
tree10a37a3338647dce6c83a474b2ed4eb10612c91c
parent7dae7024079fd2c3bec178cd92f47ac993119176 (diff)
downloadbiruda-67ba87a159fb55196092e59c041af6d69ce07e75.tar.gz
biruda-67ba87a159fb55196092e59c041af6d69ce07e75.tar.bz2
added output message for worker output sent to master, master prints only debug message for now
-rw-r--r--src/biruda.c7
-rw-r--r--src/coordinator.c10
-rw-r--r--src/coordinator.h3
-rw-r--r--src/master.c82
-rw-r--r--src/master.h2
-rw-r--r--src/worker.c62
-rw-r--r--src/worker.h2
7 files changed, 144 insertions, 24 deletions
diff --git a/src/biruda.c b/src/biruda.c
index 834b72f..3fae736 100644
--- a/src/biruda.c
+++ b/src/biruda.c
@@ -268,8 +268,9 @@ static int create_master( cfg_t *cfg )
{
cfg_t *master_cfg = cfg_getnsec( cfg, "master", 0 );
char *control = cfg_getstr( master_cfg, "control" );
+ char *data = cfg_getstr( master_cfg, "data" );
- return master_init( control );
+ return master_init( control, data );
}
static int create_coordinator( cfg_t *cfg )
@@ -285,7 +286,9 @@ static int create_coordinator( cfg_t *cfg )
cfg_t *worker_cfg = cfg_getnsec( cfg, "worker", i );
coordinator_add_worker( cfg_title( worker_cfg ),
(worker_execution_mode_t)cfg_getint( worker_cfg, "execution" ),
- cfg_getstr( worker_cfg, "command" ) );
+ cfg_getstr( worker_cfg, "command" ),
+ cfg_getstr( worker_cfg, "control" ),
+ cfg_getstr( worker_cfg, "data" ) );
}
}
diff --git a/src/coordinator.c b/src/coordinator.c
index 1366d88..b1412e0 100644
--- a/src/coordinator.c
+++ b/src/coordinator.c
@@ -235,10 +235,6 @@ static void *coordinator_func( void *thread_data )
printf( "coordinator idle: %d\n", coordinator_must_terminate );
char *answer = NULL;
- // TODO: nn_recv should be called with NN_DONTWAIT,
- // but only if the main loop of gmail says no events are
- // pending. We have to handle exit clients, out and error
- // pipes here..
int bytes = nn_recv( coordinator_sock, &answer, NN_MSG, 0 );
if( coordinator_must_terminate ) continue;
if( bytes >= 0 ) {
@@ -410,7 +406,8 @@ int coordinator_free( )
return 0;
}
-int coordinator_add_worker( const char *name, worker_execution_mode_t mode, const char *command )
+int coordinator_add_worker( const char *name, worker_execution_mode_t mode, const char *command,
+ const char *control, const char *data )
{
if( nof_workers >= MAX_WORKERS ) {
fprintf( stderr, "Can't define more workers, limit reached!\n" );
@@ -428,6 +425,9 @@ int coordinator_add_worker( const char *name, worker_execution_mode_t mode, cons
w->state = WORKER_STATE_STOPPED;
w->execution_data = NULL;
+ w->control = control;
+ w->data = data;
+
nof_workers++;
return 0;
diff --git a/src/coordinator.h b/src/coordinator.h
index d67a3b6..a2a743c 100644
--- a/src/coordinator.h
+++ b/src/coordinator.h
@@ -24,6 +24,7 @@ typedef struct {
int coordinator_init( const char *control );
void coordinator_terminate( );
int coordinator_free( );
-int coordinator_add_worker( const char *name, worker_execution_mode_t mode, const char *command );
+int coordinator_add_worker( const char *name, worker_execution_mode_t mode, const char *command,
+ const char *control, const char *data );
#endif
diff --git a/src/master.c b/src/master.c
index 9a35a73..d1f002d 100644
--- a/src/master.c
+++ b/src/master.c
@@ -4,6 +4,7 @@
#include "nanomsg/nn.h"
#include "nanomsg/survey.h"
+#include "nanomsg/pipeline.h"
#include "json-c/json.h"
@@ -11,7 +12,8 @@
#include <string.h>
static pthread_t master_thread;
-static int master_sock;
+static int control_sock;
+static int data_sock;
static int master_must_terminate = 0;
coordinator_t coordinator[MAX_COORDINATORS];
@@ -278,34 +280,52 @@ static char *get_next_request( )
}
}
+typedef struct {
+ const char *control;
+ const char *data;
+} master_thread_data_t;
+
+static master_thread_data_t master_thread_data;
+
static void *master_func( void *thread_data )
{
- char *control = (char *)thread_data;
+ master_thread_data_t *tdata = (master_thread_data_t *)thread_data;
+ const char *control = tdata->control;
+ const char *data = tdata->data;
pthread_mutex_init( &queued_command_mutex, NULL );
- if( ( master_sock = nn_socket( AF_SP, NN_SURVEYOR ) ) < 0 ) {
+ if( ( control_sock = nn_socket( AF_SP, NN_SURVEYOR ) ) < 0 ) {
fprintf( stderr, "master, nn_socket( AF_SP, NN_SURVEYOR ) error: %s (%d)\n", strerror( errno ), errno );
abort( );
}
int deadline = 5000;
- if( nn_setsockopt( master_sock, NN_SURVEYOR, NN_SURVEYOR_DEADLINE, &deadline, sizeof( deadline ) ) < 0 ) {
+ if( nn_setsockopt( control_sock, NN_SURVEYOR, NN_SURVEYOR_DEADLINE, &deadline, sizeof( deadline ) ) < 0 ) {
fprintf( stderr, "master, nn_setsockopt error: %s (%d)\n", strerror( errno ), errno );
abort( );
}
- (void)nn_bind( master_sock, control );
+ (void)nn_bind( control_sock, control );
+
+ printf( "master connected to control channel '%s'\n", control );
+
+ if( ( data_sock = nn_socket( AF_SP, NN_PULL ) ) < 0 ) {
+ fprintf( stderr, "master, nn_socket( AF_SP, NN_PULL ) error: %s (%d)\n", strerror( errno ), errno );
+ abort( );
+ }
- printf( "master connected to %s\n", control );
+ (void)nn_bind( data_sock, data );
+ printf( "master connected to data channel '%s'\n", data );
+
NEXT_COMMAND:
;
char *msg = get_next_request( );
int msg_size = strlen( msg ) + 1;
printf( "master send: %s\n", msg );
- int bytes = nn_send( master_sock, msg, msg_size /* NN_MSG */, 0 );
+ int bytes = nn_send( control_sock, msg, msg_size /* NN_MSG */, 0 );
if( bytes < 0 ) {
if( errno == ETERM ) {
master_must_terminate = 1;
@@ -325,8 +345,32 @@ NEXT_COMMAND:
recompute_coordinator_states( );
+ struct nn_pollfd pollfd[2];
+ pollfd[0].fd = control_sock;
+ pollfd[0].events = NN_POLLIN;
+ pollfd[1].fd = data_sock;
+ pollfd[1].events = NN_POLLIN;
+ int rc = nn_poll( pollfd, 2, 5000 );
+ if( rc == 0 ) {
+ // timeout, next iteration
+ goto NEXT_COMMAND;
+ } else if( rc < 0 ) {
+ fprintf( stderr, "master, nn_poll error: %s (%d)\n", strerror( errno ), errno );
+ abort( );
+ }
+
+ int sock;
+
+ if( pollfd[1].revents & NN_POLLIN ) {
+ sock = data_sock;
+ }
+
+ if( pollfd[0].revents & NN_POLLIN ) {
+ sock = control_sock;
+ }
+
char *answer = NULL;
- bytes = nn_recv( master_sock, &answer, NN_MSG, 0 );
+ bytes = nn_recv( sock, &answer, NN_MSG, 0 );
if( master_must_terminate ) continue;
if( bytes >= 0 ) {
json_object *recv_obj = json_tokener_parse( answer );
@@ -347,6 +391,18 @@ NEXT_COMMAND:
continue;
}
}
+ } else if( strcmp( op, "output" ) == 0 ) {
+ json_object *worker_obj;
+ json_object_object_get_ex( recv_obj, "worker", &worker_obj );
+ const char *worker = json_object_get_string( worker_obj );
+ json_object *msg_obj;
+ json_object_object_get_ex( recv_obj, "msg", &msg_obj );
+ const char *msg = json_object_get_string( msg_obj );
+
+ printf( "Got message from worker '%s': %s\n", worker, msg );
+
+ } else {
+ fprintf( stderr, "WARNING: master received unkown message: %s\n", answer );
}
json_object_put( recv_obj );
@@ -369,14 +425,15 @@ NEXT_COMMAND:
}
}
- (void)nn_shutdown( master_sock, 0 );
+ (void)nn_shutdown( data_sock, 0 );
+ (void)nn_shutdown( control_sock, 0 );
puts( "master disconnected" );
return NULL;
}
-int master_init( const char *control )
+int master_init( const char *control, const char *data )
{
pthread_attr_t attr;
int res;
@@ -390,7 +447,10 @@ int master_init( const char *control )
return 1;
}
- res = pthread_create( &master_thread, &attr, master_func, (void *)control );
+ master_thread_data.control = control;
+ master_thread_data.data = data;
+
+ res = pthread_create( &master_thread, &attr, master_func, (void *)&master_thread_data );
if( res != 0 ) {
return 1;
}
diff --git a/src/master.h b/src/master.h
index 60d1061..a19dd8d 100644
--- a/src/master.h
+++ b/src/master.h
@@ -4,7 +4,7 @@
#include "port.h"
#include "coordinator.h"
-int master_init( const char *control );
+int master_init( const char *control, const char *data );
void master_terminate( int terminate_nano_msg );
int master_free( );
diff --git a/src/worker.c b/src/worker.c
index 03be16c..e635589 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -1,6 +1,7 @@
#include "worker.h"
#include "nanomsg/nn.h"
+#include "nanomsg/pipeline.h"
#include "port.h"
@@ -14,6 +15,8 @@
#include <signal.h>
+#include "json-c/json.h"
+
const char *worker_exection_mode_str( worker_execution_mode_t mode )
{
switch( mode ) {
@@ -64,7 +67,7 @@ typedef struct {
worker_t *worker;
pthread_t thread;
GMainLoop *main_loop;
-
+ int sock;
} direct_glib_execution_worker_data_t;
static void watch_child( GPid pid, gint status, gpointer *data )
@@ -78,12 +81,40 @@ static void watch_child( GPid pid, gint status, gpointer *data )
g_main_loop_quit( wed->main_loop );
}
+static char *create_worker_answer( const char *name, const char *s, size_t len, bool is_stdout )
+{
+ json_object *obj = json_object_new_object( );
+ json_object *op = json_object_new_string( "output" );
+ json_object_object_add( obj, "op", op );
+
+ /* we are a worker */
+ json_object *role = json_object_new_string( "worker" );
+ json_object_object_add( obj, "role", role );
+
+ json_object *name_obj = json_object_new_string( name );
+ json_object_object_add( obj, "worker", name_obj );
+
+ json_object *msg_obj = json_object_new_string_len( s, len );
+ json_object_object_add( obj, "msg", msg_obj );
+
+ json_object *is_stdout_obj = json_object_new_boolean( is_stdout );
+ json_object_object_add( obj, "stdout", is_stdout_obj );
+
+ /* produce message as string, caller must free it */
+ const char *msg = json_object_to_json_string( obj );
+ char *res = strdup( msg );
+
+ json_object_put( obj );
+
+ return res;
+}
+
static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpointer *data )
{
direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data;
worker_t *worker = wed->worker;
- bool isStdout = ( wed->out_channel == channel );
+ bool is_stdout = ( wed->out_channel == channel );
gchar *buf;
gsize len;
@@ -94,10 +125,23 @@ static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpoin
}
g_io_channel_read_line( channel, &buf, &len, NULL, NULL );
+
+ char *msg = create_worker_answer( worker->name, buf, len, is_stdout );
+ size_t msg_size = strlen( msg ) + 1;
- fprintf( stderr, "worker %s %s: %*s\n", worker->name,
- isStdout ? "stdout" : "stderr", len, buf );
+ printf( "worker send: %s\n", msg );
+ int bytes = nn_send( wed->sock, msg, msg_size, 0 );
+ if( bytes < 0 ) {
+ fprintf( stderr, "ERROR: worker '%s' nn_send returned %d: %s (%d)\n",
+ worker->name, bytes, nn_strerror( errno ), errno );
+ }
+ if( bytes != msg_size ) {
+ fprintf( stderr, "ERROR: worker '%s' truncated message!",
+ worker->name );
+ }
+
+ free( msg );
g_free( buf );
return TRUE;
@@ -117,6 +161,12 @@ static void *worker_func( void *thread_data )
g_strfreev( args );
return NULL;
}
+
+ wed->sock = nn_socket( AF_SP, NN_PUSH );
+
+ (void)nn_connect( wed->sock, worker->data );
+
+ printf( "worker connected to data channel '%s'\n", worker->data );
error = NULL;
gboolean ret = g_spawn_async_with_pipes( NULL,
@@ -157,6 +207,10 @@ static void *worker_func( void *thread_data )
worker->state = WORKER_STATE_STOPPED;
+ (void)nn_shutdown( wed->sock, 0 );
+
+ puts( "worker disconnected" );
+
free( worker->execution_data );
worker->execution_data = NULL;
diff --git a/src/worker.h b/src/worker.h
index b147e16..a5f0ce9 100644
--- a/src/worker.h
+++ b/src/worker.h
@@ -19,6 +19,8 @@ typedef struct {
char *command;
worker_state_t state;
worker_execution_data_t execution_data;
+ const char *control;
+ const char *data;
} worker_t;
const char *worker_exection_mode_str( worker_execution_mode_t mode );