diff options
author | Andreas Baumann <mail@andreasbaumann.cc> | 2014-09-14 09:24:10 +0200 |
---|---|---|
committer | Andreas Baumann <mail@andreasbaumann.cc> | 2014-09-14 09:24:10 +0200 |
commit | 12b41ea402aef5b932eb0ba2c123ecd94b48f44f (patch) | |
tree | 7397930967105dd55cb10f05e61dcc1cb7a8fe04 | |
parent | 5257e805a9499229af234180a9d68104a563cf80 (diff) | |
download | biruda-12b41ea402aef5b932eb0ba2c123ecd94b48f44f.tar.gz biruda-12b41ea402aef5b932eb0ba2c123ecd94b48f44f.tar.bz2 |
got a primitive discovery test running
-rw-r--r-- | src/biruda.c | 18 | ||||
-rw-r--r-- | src/biruda.conf | 8 | ||||
-rw-r--r-- | src/coordinator.c | 97 | ||||
-rw-r--r-- | src/coordinator.h | 3 | ||||
-rw-r--r-- | src/master.c | 31 |
5 files changed, 144 insertions, 13 deletions
diff --git a/src/biruda.c b/src/biruda.c index 6e8a932..f013712 100644 --- a/src/biruda.c +++ b/src/biruda.c @@ -131,6 +131,14 @@ static int create_master( cfg_t *cfg ) return master_init( control ); } +static int create_coordinator( cfg_t *cfg ) +{ + cfg_t *master_cfg = cfg_getnsec( cfg, "coordinator", 0 ); + char *control = cfg_getstr( master_cfg, "control" ); + + return coordinator_init( control ); +} + int main( int argc, char *argv[] ) { struct gengetopt_args_info args_info; @@ -159,6 +167,13 @@ int main( int argc, char *argv[] ) cmdline_parser_free( &args_info ); return( ( test_config( &args_info ) == 0 ) ? EXIT_SUCCESS : EXIT_FAILURE ); } + + if( create_coordinator( cfg ) != 0 ) { + fprintf( stderr, "FATAL: Unable to create coordinator thread!\n" ); + cfg_free( cfg ); + cmdline_parser_free( &args_info ); + exit( EXIT_FAILURE ); + } if( create_master( cfg ) != 0 ) { fprintf( stderr, "FATAL: Unable to create master thread!\n" ); @@ -166,9 +181,10 @@ int main( int argc, char *argv[] ) cmdline_parser_free( &args_info ); exit( EXIT_FAILURE ); } - + sleep( 4 ); + coordinator_free( ); master_free( ); cfg_free( cfg ); diff --git a/src/biruda.conf b/src/biruda.conf index b5ec9d2..6b22e7d 100644 --- a/src/biruda.conf +++ b/src/biruda.conf @@ -3,21 +3,29 @@ master { control = "ipc:///tmp/biruda.ipc" +# control = "inproc://control" +# control = "tcp://*:5555" } coordinator { control = "ipc:///tmp/biruda.ipc" +# control = "inproc://control" +# control = "tcp://localhost:5555" } worker worker1 { control = "ipc:///tmp/biruda.ipc" +# control = "inproc://control" +# control = "tcp://localhost:5555" } worker worker2 { control = "ipc:///tmp/biruda.ipc" +# control = "inproc://control" +# control = "tcp://localhost:5555" } webserver diff --git a/src/coordinator.c b/src/coordinator.c index 3b3d87c..ebdfdfc 100644 --- a/src/coordinator.c +++ b/src/coordinator.c @@ -1 +1,98 @@ #include "coordinator.h" + +#include <pthread.h> + +#include "nanomsg/nn.h" +#include "nanomsg/survey.h" + +#include "json-c/json.h" + +#include <stdio.h> +#include <unistd.h> +#include <string.h> + +static pthread_t coordinator_thread; +static int coordinator_sock; +static int coordinator_terminate = 0; + +static void *coordinator_func( void *thread_data ) +{ + char *control = (char *)thread_data; + + coordinator_sock = nn_socket( AF_SP, NN_RESPONDENT ); + + (void)nn_connect( coordinator_sock, control ); + + printf( "coordinator connected to %s\n", control ); + + sleep( 1 ); + + while( !coordinator_terminate ) { + printf( "coordinator idle: %d\n", coordinator_terminate ); + + sleep( 1 ); + + char *answer = NULL; + int bytes = nn_recv( coordinator_sock, &answer, NN_MSG, 0 ); + if( bytes == ETIMEDOUT ) continue; + if( bytes >= 0 ) { + printf( "coordinator received: %s\n", answer ); + nn_freemsg( answer ); + + json_object *obj = json_object_new_object( ); + json_object *op = json_object_new_string( "register" ); + json_object_object_add( obj, "op", op ); + json_object *role = json_object_new_string( "coordinator" ); + json_object_object_add( obj, "role", role ); + const char *msg = json_object_to_json_string( obj ); + int msg_size = strlen( msg ) + 1; + printf( "coordinator send: %s\n", msg ); + bytes = nn_send( coordinator_sock, msg, msg_size, 0 ); + if( bytes != msg_size ) { + fprintf( stderr, "ERROR: truncated message!" ); + } + json_object_put( obj ); + + } + } + + (void)nn_shutdown( coordinator_sock, 0 ); + + return NULL; +} + +int coordinator_init( const char *control ) +{ + pthread_attr_t attr; + int res; + + coordinator_terminate = 0; + + res = pthread_attr_init( &attr ); + if( res != 0 ) { + return 1; + } + + res = pthread_create( &coordinator_thread, &attr, coordinator_func, (void *)control ); + if( res != 0 ) { + return 1; + } + + return 0; +} + +int coordinator_free( ) +{ + void *result; + int res; + + coordinator_terminate = 1; + + res = pthread_join( coordinator_thread, &result ); + if( res != 0 ) { + return 1; + } + + return 0; + +} diff --git a/src/coordinator.h b/src/coordinator.h index a52fa6d..70661de 100644 --- a/src/coordinator.h +++ b/src/coordinator.h @@ -1,4 +1,7 @@ #ifndef _BIRUDA_COORDINGATOR_HEADER_INCLUDED #define _BIRUDA_COORDINGATOR_HEADER_INCLUDED +int coordinator_init( const char *control ); +int coordinator_free( ); + #endif diff --git a/src/master.c b/src/master.c index 3deba9e..80ce1a5 100644 --- a/src/master.c +++ b/src/master.c @@ -23,18 +23,25 @@ static void *master_func( void *thread_data ) (void)nn_bind( master_sock, control ); + printf( "master connected to %s\n", control ); + + sleep( 1 ); + + json_object *obj = json_object_new_object( ); + json_object *op = json_object_new_string( "discover" ); + json_object_object_add( obj, "op", op ); + const char *msg = json_object_to_json_string( obj ); + int msg_size = strlen( msg ) + 1; + printf( "master send: %s\n", msg ); + int bytes = nn_send( master_sock, msg, msg_size, 0 ); + if( bytes != msg_size ) { + fprintf( stderr, "ERROR: truncated message!" ); + } + json_object_put( obj ); + while( !master_terminate ) { - json_object *obj = json_object_new_object( ); - json_object *op = json_object_new_string( "discover" ); - json_object_object_add( obj, "op", op ); - const char *msg = json_object_to_json_string( obj ); - int msg_size = strlen( msg ) + 1; - printf( "master send: %s\n", msg ); - int bytes = nn_send( master_sock, msg, msg_size, 0 ); - if( bytes != msg_size ) { - fprintf( stderr, "ERROR: truncated message!" ); - } - json_object_put( obj ); + + printf( "master idle: %d\n", master_terminate ); sleep( 1 ); @@ -42,7 +49,7 @@ static void *master_func( void *thread_data ) bytes = nn_recv( master_sock, &answer, NN_MSG, 0 ); if( bytes == ETIMEDOUT ) continue; if( bytes >= 0 ) { - printf( "master recevied: %s\n", answer ); + printf( "master received: %s\n", answer ); nn_freemsg( answer ); } } |