diff options
author | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-05 15:18:48 +0100 |
---|---|---|
committer | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-05 15:18:48 +0100 |
commit | 67ba87a159fb55196092e59c041af6d69ce07e75 (patch) | |
tree | 10a37a3338647dce6c83a474b2ed4eb10612c91c /src/master.c | |
parent | 7dae7024079fd2c3bec178cd92f47ac993119176 (diff) | |
download | biruda-67ba87a159fb55196092e59c041af6d69ce07e75.tar.gz biruda-67ba87a159fb55196092e59c041af6d69ce07e75.tar.bz2 |
added output message for worker output sent to master, master prints only debug message for now
Diffstat (limited to 'src/master.c')
-rw-r--r-- | src/master.c | 82 |
1 files changed, 71 insertions, 11 deletions
diff --git a/src/master.c b/src/master.c index 9a35a73..d1f002d 100644 --- a/src/master.c +++ b/src/master.c @@ -4,6 +4,7 @@ #include "nanomsg/nn.h" #include "nanomsg/survey.h" +#include "nanomsg/pipeline.h" #include "json-c/json.h" @@ -11,7 +12,8 @@ #include <string.h> static pthread_t master_thread; -static int master_sock; +static int control_sock; +static int data_sock; static int master_must_terminate = 0; coordinator_t coordinator[MAX_COORDINATORS]; @@ -278,34 +280,52 @@ static char *get_next_request( ) } } +typedef struct { + const char *control; + const char *data; +} master_thread_data_t; + +static master_thread_data_t master_thread_data; + static void *master_func( void *thread_data ) { - char *control = (char *)thread_data; + master_thread_data_t *tdata = (master_thread_data_t *)thread_data; + const char *control = tdata->control; + const char *data = tdata->data; pthread_mutex_init( &queued_command_mutex, NULL ); - if( ( master_sock = nn_socket( AF_SP, NN_SURVEYOR ) ) < 0 ) { + if( ( control_sock = nn_socket( AF_SP, NN_SURVEYOR ) ) < 0 ) { fprintf( stderr, "master, nn_socket( AF_SP, NN_SURVEYOR ) error: %s (%d)\n", strerror( errno ), errno ); abort( ); } int deadline = 5000; - if( nn_setsockopt( master_sock, NN_SURVEYOR, NN_SURVEYOR_DEADLINE, &deadline, sizeof( deadline ) ) < 0 ) { + if( nn_setsockopt( control_sock, NN_SURVEYOR, NN_SURVEYOR_DEADLINE, &deadline, sizeof( deadline ) ) < 0 ) { fprintf( stderr, "master, nn_setsockopt error: %s (%d)\n", strerror( errno ), errno ); abort( ); } - (void)nn_bind( master_sock, control ); + (void)nn_bind( control_sock, control ); + + printf( "master connected to control channel '%s'\n", control ); + + if( ( data_sock = nn_socket( AF_SP, NN_PULL ) ) < 0 ) { + fprintf( stderr, "master, nn_socket( AF_SP, NN_PULL ) error: %s (%d)\n", strerror( errno ), errno ); + abort( ); + } - printf( "master connected to %s\n", control ); + (void)nn_bind( data_sock, data ); + printf( "master connected to data channel '%s'\n", data ); + NEXT_COMMAND: ; char *msg = get_next_request( ); int msg_size = strlen( msg ) + 1; printf( "master send: %s\n", msg ); - int bytes = nn_send( master_sock, msg, msg_size /* NN_MSG */, 0 ); + int bytes = nn_send( control_sock, msg, msg_size /* NN_MSG */, 0 ); if( bytes < 0 ) { if( errno == ETERM ) { master_must_terminate = 1; @@ -325,8 +345,32 @@ NEXT_COMMAND: recompute_coordinator_states( ); + struct nn_pollfd pollfd[2]; + pollfd[0].fd = control_sock; + pollfd[0].events = NN_POLLIN; + pollfd[1].fd = data_sock; + pollfd[1].events = NN_POLLIN; + int rc = nn_poll( pollfd, 2, 5000 ); + if( rc == 0 ) { + // timeout, next iteration + goto NEXT_COMMAND; + } else if( rc < 0 ) { + fprintf( stderr, "master, nn_poll error: %s (%d)\n", strerror( errno ), errno ); + abort( ); + } + + int sock; + + if( pollfd[1].revents & NN_POLLIN ) { + sock = data_sock; + } + + if( pollfd[0].revents & NN_POLLIN ) { + sock = control_sock; + } + char *answer = NULL; - bytes = nn_recv( master_sock, &answer, NN_MSG, 0 ); + bytes = nn_recv( sock, &answer, NN_MSG, 0 ); if( master_must_terminate ) continue; if( bytes >= 0 ) { json_object *recv_obj = json_tokener_parse( answer ); @@ -347,6 +391,18 @@ NEXT_COMMAND: continue; } } + } else if( strcmp( op, "output" ) == 0 ) { + json_object *worker_obj; + json_object_object_get_ex( recv_obj, "worker", &worker_obj ); + const char *worker = json_object_get_string( worker_obj ); + json_object *msg_obj; + json_object_object_get_ex( recv_obj, "msg", &msg_obj ); + const char *msg = json_object_get_string( msg_obj ); + + printf( "Got message from worker '%s': %s\n", worker, msg ); + + } else { + fprintf( stderr, "WARNING: master received unkown message: %s\n", answer ); } json_object_put( recv_obj ); @@ -369,14 +425,15 @@ NEXT_COMMAND: } } - (void)nn_shutdown( master_sock, 0 ); + (void)nn_shutdown( data_sock, 0 ); + (void)nn_shutdown( control_sock, 0 ); puts( "master disconnected" ); return NULL; } -int master_init( const char *control ) +int master_init( const char *control, const char *data ) { pthread_attr_t attr; int res; @@ -390,7 +447,10 @@ int master_init( const char *control ) return 1; } - res = pthread_create( &master_thread, &attr, master_func, (void *)control ); + master_thread_data.control = control; + master_thread_data.data = data; + + res = pthread_create( &master_thread, &attr, master_func, (void *)&master_thread_data ); if( res != 0 ) { return 1; } |