summaryrefslogtreecommitdiff
path: root/src/master.c
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2014-12-05 15:18:48 +0100
committerAndreas Baumann <mail@andreasbaumann.cc>2014-12-05 15:18:48 +0100
commit67ba87a159fb55196092e59c041af6d69ce07e75 (patch)
tree10a37a3338647dce6c83a474b2ed4eb10612c91c /src/master.c
parent7dae7024079fd2c3bec178cd92f47ac993119176 (diff)
downloadbiruda-67ba87a159fb55196092e59c041af6d69ce07e75.tar.gz
biruda-67ba87a159fb55196092e59c041af6d69ce07e75.tar.bz2
added output message for worker output sent to master, master prints only debug message for now
Diffstat (limited to 'src/master.c')
-rw-r--r--src/master.c82
1 files changed, 71 insertions, 11 deletions
diff --git a/src/master.c b/src/master.c
index 9a35a73..d1f002d 100644
--- a/src/master.c
+++ b/src/master.c
@@ -4,6 +4,7 @@
#include "nanomsg/nn.h"
#include "nanomsg/survey.h"
+#include "nanomsg/pipeline.h"
#include "json-c/json.h"
@@ -11,7 +12,8 @@
#include <string.h>
static pthread_t master_thread;
-static int master_sock;
+static int control_sock;
+static int data_sock;
static int master_must_terminate = 0;
coordinator_t coordinator[MAX_COORDINATORS];
@@ -278,34 +280,52 @@ static char *get_next_request( )
}
}
+typedef struct {
+ const char *control;
+ const char *data;
+} master_thread_data_t;
+
+static master_thread_data_t master_thread_data;
+
static void *master_func( void *thread_data )
{
- char *control = (char *)thread_data;
+ master_thread_data_t *tdata = (master_thread_data_t *)thread_data;
+ const char *control = tdata->control;
+ const char *data = tdata->data;
pthread_mutex_init( &queued_command_mutex, NULL );
- if( ( master_sock = nn_socket( AF_SP, NN_SURVEYOR ) ) < 0 ) {
+ if( ( control_sock = nn_socket( AF_SP, NN_SURVEYOR ) ) < 0 ) {
fprintf( stderr, "master, nn_socket( AF_SP, NN_SURVEYOR ) error: %s (%d)\n", strerror( errno ), errno );
abort( );
}
int deadline = 5000;
- if( nn_setsockopt( master_sock, NN_SURVEYOR, NN_SURVEYOR_DEADLINE, &deadline, sizeof( deadline ) ) < 0 ) {
+ if( nn_setsockopt( control_sock, NN_SURVEYOR, NN_SURVEYOR_DEADLINE, &deadline, sizeof( deadline ) ) < 0 ) {
fprintf( stderr, "master, nn_setsockopt error: %s (%d)\n", strerror( errno ), errno );
abort( );
}
- (void)nn_bind( master_sock, control );
+ (void)nn_bind( control_sock, control );
+
+ printf( "master connected to control channel '%s'\n", control );
+
+ if( ( data_sock = nn_socket( AF_SP, NN_PULL ) ) < 0 ) {
+ fprintf( stderr, "master, nn_socket( AF_SP, NN_PULL ) error: %s (%d)\n", strerror( errno ), errno );
+ abort( );
+ }
- printf( "master connected to %s\n", control );
+ (void)nn_bind( data_sock, data );
+ printf( "master connected to data channel '%s'\n", data );
+
NEXT_COMMAND:
;
char *msg = get_next_request( );
int msg_size = strlen( msg ) + 1;
printf( "master send: %s\n", msg );
- int bytes = nn_send( master_sock, msg, msg_size /* NN_MSG */, 0 );
+ int bytes = nn_send( control_sock, msg, msg_size /* NN_MSG */, 0 );
if( bytes < 0 ) {
if( errno == ETERM ) {
master_must_terminate = 1;
@@ -325,8 +345,32 @@ NEXT_COMMAND:
recompute_coordinator_states( );
+ struct nn_pollfd pollfd[2];
+ pollfd[0].fd = control_sock;
+ pollfd[0].events = NN_POLLIN;
+ pollfd[1].fd = data_sock;
+ pollfd[1].events = NN_POLLIN;
+ int rc = nn_poll( pollfd, 2, 5000 );
+ if( rc == 0 ) {
+ // timeout, next iteration
+ goto NEXT_COMMAND;
+ } else if( rc < 0 ) {
+ fprintf( stderr, "master, nn_poll error: %s (%d)\n", strerror( errno ), errno );
+ abort( );
+ }
+
+ int sock;
+
+ if( pollfd[1].revents & NN_POLLIN ) {
+ sock = data_sock;
+ }
+
+ if( pollfd[0].revents & NN_POLLIN ) {
+ sock = control_sock;
+ }
+
char *answer = NULL;
- bytes = nn_recv( master_sock, &answer, NN_MSG, 0 );
+ bytes = nn_recv( sock, &answer, NN_MSG, 0 );
if( master_must_terminate ) continue;
if( bytes >= 0 ) {
json_object *recv_obj = json_tokener_parse( answer );
@@ -347,6 +391,18 @@ NEXT_COMMAND:
continue;
}
}
+ } else if( strcmp( op, "output" ) == 0 ) {
+ json_object *worker_obj;
+ json_object_object_get_ex( recv_obj, "worker", &worker_obj );
+ const char *worker = json_object_get_string( worker_obj );
+ json_object *msg_obj;
+ json_object_object_get_ex( recv_obj, "msg", &msg_obj );
+ const char *msg = json_object_get_string( msg_obj );
+
+ printf( "Got message from worker '%s': %s\n", worker, msg );
+
+ } else {
+ fprintf( stderr, "WARNING: master received unkown message: %s\n", answer );
}
json_object_put( recv_obj );
@@ -369,14 +425,15 @@ NEXT_COMMAND:
}
}
- (void)nn_shutdown( master_sock, 0 );
+ (void)nn_shutdown( data_sock, 0 );
+ (void)nn_shutdown( control_sock, 0 );
puts( "master disconnected" );
return NULL;
}
-int master_init( const char *control )
+int master_init( const char *control, const char *data )
{
pthread_attr_t attr;
int res;
@@ -390,7 +447,10 @@ int master_init( const char *control )
return 1;
}
- res = pthread_create( &master_thread, &attr, master_func, (void *)control );
+ master_thread_data.control = control;
+ master_thread_data.data = data;
+
+ res = pthread_create( &master_thread, &attr, master_func, (void *)&master_thread_data );
if( res != 0 ) {
return 1;
}