summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2014-09-14 09:24:10 +0200
committerAndreas Baumann <mail@andreasbaumann.cc>2014-09-14 09:24:10 +0200
commit12b41ea402aef5b932eb0ba2c123ecd94b48f44f (patch)
tree7397930967105dd55cb10f05e61dcc1cb7a8fe04
parent5257e805a9499229af234180a9d68104a563cf80 (diff)
downloadbiruda-12b41ea402aef5b932eb0ba2c123ecd94b48f44f.tar.gz
biruda-12b41ea402aef5b932eb0ba2c123ecd94b48f44f.tar.bz2
got a primitive discovery test running
-rw-r--r--src/biruda.c18
-rw-r--r--src/biruda.conf8
-rw-r--r--src/coordinator.c97
-rw-r--r--src/coordinator.h3
-rw-r--r--src/master.c31
5 files changed, 144 insertions, 13 deletions
diff --git a/src/biruda.c b/src/biruda.c
index 6e8a932..f013712 100644
--- a/src/biruda.c
+++ b/src/biruda.c
@@ -131,6 +131,14 @@ static int create_master( cfg_t *cfg )
return master_init( control );
}
+static int create_coordinator( cfg_t *cfg )
+{
+ cfg_t *master_cfg = cfg_getnsec( cfg, "coordinator", 0 );
+ char *control = cfg_getstr( master_cfg, "control" );
+
+ return coordinator_init( control );
+}
+
int main( int argc, char *argv[] )
{
struct gengetopt_args_info args_info;
@@ -159,6 +167,13 @@ int main( int argc, char *argv[] )
cmdline_parser_free( &args_info );
return( ( test_config( &args_info ) == 0 ) ? EXIT_SUCCESS : EXIT_FAILURE );
}
+
+ if( create_coordinator( cfg ) != 0 ) {
+ fprintf( stderr, "FATAL: Unable to create coordinator thread!\n" );
+ cfg_free( cfg );
+ cmdline_parser_free( &args_info );
+ exit( EXIT_FAILURE );
+ }
if( create_master( cfg ) != 0 ) {
fprintf( stderr, "FATAL: Unable to create master thread!\n" );
@@ -166,9 +181,10 @@ int main( int argc, char *argv[] )
cmdline_parser_free( &args_info );
exit( EXIT_FAILURE );
}
-
+
sleep( 4 );
+ coordinator_free( );
master_free( );
cfg_free( cfg );
diff --git a/src/biruda.conf b/src/biruda.conf
index b5ec9d2..6b22e7d 100644
--- a/src/biruda.conf
+++ b/src/biruda.conf
@@ -3,21 +3,29 @@
master
{
control = "ipc:///tmp/biruda.ipc"
+# control = "inproc://control"
+# control = "tcp://*:5555"
}
coordinator
{
control = "ipc:///tmp/biruda.ipc"
+# control = "inproc://control"
+# control = "tcp://localhost:5555"
}
worker worker1
{
control = "ipc:///tmp/biruda.ipc"
+# control = "inproc://control"
+# control = "tcp://localhost:5555"
}
worker worker2
{
control = "ipc:///tmp/biruda.ipc"
+# control = "inproc://control"
+# control = "tcp://localhost:5555"
}
webserver
diff --git a/src/coordinator.c b/src/coordinator.c
index 3b3d87c..ebdfdfc 100644
--- a/src/coordinator.c
+++ b/src/coordinator.c
@@ -1 +1,98 @@
#include "coordinator.h"
+
+#include <pthread.h>
+
+#include "nanomsg/nn.h"
+#include "nanomsg/survey.h"
+
+#include "json-c/json.h"
+
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+
+static pthread_t coordinator_thread;
+static int coordinator_sock;
+static int coordinator_terminate = 0;
+
+static void *coordinator_func( void *thread_data )
+{
+ char *control = (char *)thread_data;
+
+ coordinator_sock = nn_socket( AF_SP, NN_RESPONDENT );
+
+ (void)nn_connect( coordinator_sock, control );
+
+ printf( "coordinator connected to %s\n", control );
+
+ sleep( 1 );
+
+ while( !coordinator_terminate ) {
+ printf( "coordinator idle: %d\n", coordinator_terminate );
+
+ sleep( 1 );
+
+ char *answer = NULL;
+ int bytes = nn_recv( coordinator_sock, &answer, NN_MSG, 0 );
+ if( bytes == ETIMEDOUT ) continue;
+ if( bytes >= 0 ) {
+ printf( "coordinator received: %s\n", answer );
+ nn_freemsg( answer );
+
+ json_object *obj = json_object_new_object( );
+ json_object *op = json_object_new_string( "register" );
+ json_object_object_add( obj, "op", op );
+ json_object *role = json_object_new_string( "coordinator" );
+ json_object_object_add( obj, "role", role );
+ const char *msg = json_object_to_json_string( obj );
+ int msg_size = strlen( msg ) + 1;
+ printf( "coordinator send: %s\n", msg );
+ bytes = nn_send( coordinator_sock, msg, msg_size, 0 );
+ if( bytes != msg_size ) {
+ fprintf( stderr, "ERROR: truncated message!" );
+ }
+ json_object_put( obj );
+
+ }
+ }
+
+ (void)nn_shutdown( coordinator_sock, 0 );
+
+ return NULL;
+}
+
+int coordinator_init( const char *control )
+{
+ pthread_attr_t attr;
+ int res;
+
+ coordinator_terminate = 0;
+
+ res = pthread_attr_init( &attr );
+ if( res != 0 ) {
+ return 1;
+ }
+
+ res = pthread_create( &coordinator_thread, &attr, coordinator_func, (void *)control );
+ if( res != 0 ) {
+ return 1;
+ }
+
+ return 0;
+}
+
+int coordinator_free( )
+{
+ void *result;
+ int res;
+
+ coordinator_terminate = 1;
+
+ res = pthread_join( coordinator_thread, &result );
+ if( res != 0 ) {
+ return 1;
+ }
+
+ return 0;
+
+}
diff --git a/src/coordinator.h b/src/coordinator.h
index a52fa6d..70661de 100644
--- a/src/coordinator.h
+++ b/src/coordinator.h
@@ -1,4 +1,7 @@
#ifndef _BIRUDA_COORDINGATOR_HEADER_INCLUDED
#define _BIRUDA_COORDINGATOR_HEADER_INCLUDED
+int coordinator_init( const char *control );
+int coordinator_free( );
+
#endif
diff --git a/src/master.c b/src/master.c
index 3deba9e..80ce1a5 100644
--- a/src/master.c
+++ b/src/master.c
@@ -23,18 +23,25 @@ static void *master_func( void *thread_data )
(void)nn_bind( master_sock, control );
+ printf( "master connected to %s\n", control );
+
+ sleep( 1 );
+
+ json_object *obj = json_object_new_object( );
+ json_object *op = json_object_new_string( "discover" );
+ json_object_object_add( obj, "op", op );
+ const char *msg = json_object_to_json_string( obj );
+ int msg_size = strlen( msg ) + 1;
+ printf( "master send: %s\n", msg );
+ int bytes = nn_send( master_sock, msg, msg_size, 0 );
+ if( bytes != msg_size ) {
+ fprintf( stderr, "ERROR: truncated message!" );
+ }
+ json_object_put( obj );
+
while( !master_terminate ) {
- json_object *obj = json_object_new_object( );
- json_object *op = json_object_new_string( "discover" );
- json_object_object_add( obj, "op", op );
- const char *msg = json_object_to_json_string( obj );
- int msg_size = strlen( msg ) + 1;
- printf( "master send: %s\n", msg );
- int bytes = nn_send( master_sock, msg, msg_size, 0 );
- if( bytes != msg_size ) {
- fprintf( stderr, "ERROR: truncated message!" );
- }
- json_object_put( obj );
+
+ printf( "master idle: %d\n", master_terminate );
sleep( 1 );
@@ -42,7 +49,7 @@ static void *master_func( void *thread_data )
bytes = nn_recv( master_sock, &answer, NN_MSG, 0 );
if( bytes == ETIMEDOUT ) continue;
if( bytes >= 0 ) {
- printf( "master recevied: %s\n", answer );
+ printf( "master received: %s\n", answer );
nn_freemsg( answer );
}
}