summaryrefslogtreecommitdiff
path: root/src/coordinator.c
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2014-11-29 20:33:32 +0100
committerAndreas Baumann <mail@andreasbaumann.cc>2014-11-29 20:33:32 +0100
commit57aadcf79d1bb62793875e46030ee27dc42f5237 (patch)
treee021244af4bae4145c3731017bed690829307714 /src/coordinator.c
parent398ff5ab52dc4f9000461bc9ddec1d6cf50458be (diff)
downloadbiruda-57aadcf79d1bb62793875e46030ee27dc42f5237.tar.gz
biruda-57aadcf79d1bb62793875e46030ee27dc42f5237.tar.bz2
implemented coordinator part of starting a worker
Diffstat (limited to 'src/coordinator.c')
-rw-r--r--src/coordinator.c85
1 files changed, 85 insertions, 0 deletions
diff --git a/src/coordinator.c b/src/coordinator.c
index 303b31f..8c34997 100644
--- a/src/coordinator.c
+++ b/src/coordinator.c
@@ -16,6 +16,8 @@
#include "system.h"
+#include "port.h"
+
static pthread_t coordinator_thread;
static int coordinator_sock;
static int coordinator_must_terminate = 0;
@@ -23,6 +25,41 @@ static int coordinator_must_terminate = 0;
int nof_workers = 0;
worker_t worker[MAX_WORKERS];
+// TODO: also here: tons of copy paste!
+
+static char *create_worker_started_answer( const char *name, bool worker_found )
+{
+ json_object *obj = json_object_new_object( );
+ json_object *op = json_object_new_string( "started" );
+ json_object_object_add( obj, "op", op );
+
+ /* we are a coordinator */
+ json_object *role = json_object_new_string( "coordinator" );
+ json_object_object_add( obj, "role", role );
+
+ /* return hostname for unique identification (maybe better
+ * or additionally the MAC of the first interface or hostid?)
+ */
+ char hostname[100];
+ gethostname( hostname, sizeof( hostname ) );
+ json_object *host = json_object_new_string( hostname );
+ json_object_object_add( obj, "host", host );
+
+ json_object *worker = json_object_new_string( name );
+ json_object_object_add( obj, "worker", worker );
+
+ json_object *found = json_object_new_boolean( worker_found );
+ json_object_object_add( obj, "found", found );
+
+ /* produce message as string, caller must free it */
+ const char *msg = json_object_to_json_string( obj );
+ char *res = strdup( msg );
+
+ json_object_put( obj );
+
+ return res;
+}
+
static char *create_register_answer( )
{
json_object *obj = json_object_new_object( );
@@ -96,6 +133,25 @@ static char *create_register_answer( )
return res;
}
+static int coordinator_start_worker( const char *name, bool *found )
+{
+ for( int i = 0; i < nof_workers; i++ ) {
+ worker_t *w = &worker[i];
+ if( strcmp( name, w->name ) == 0 ) {
+ *found = true;
+ break;
+ }
+ }
+
+ if( !( *found ) ) {
+ return 0;
+ }
+
+ printf( "STARTING WORKER '%s'\n", name );
+
+ return 0;
+}
+
static void *coordinator_func( void *thread_data )
{
char *control = (char *)thread_data;
@@ -140,6 +196,35 @@ static void *coordinator_func( void *thread_data )
}
free( msg );
+ } else if( strcmp( op, "start" ) == 0 ) {
+
+ json_object *worker_obj;
+ json_object_object_get_ex( recv_obj, "worker", &worker_obj );
+ const char *worker = json_object_get_string( worker_obj );
+
+ bool found;
+ int res = coordinator_start_worker( worker, &found );
+
+ char *msg = create_worker_started_answer( worker, found );
+ size_t msg_size = strlen( msg ) + 1;
+
+ // TODO: tons of copy-paste code here, think!
+ printf( "coordinator send: %s\n", msg );
+ bytes = nn_send( coordinator_sock, msg, msg_size, 0 );
+ if( bytes < 0 ) {
+ if( errno == ETERM ) {
+ coordinator_must_terminate = 1;
+ continue;
+ } else {
+ fprintf( stderr, "ERROR: nn_send returned %d: %s (%d)\n",
+ bytes, nn_strerror( errno ), errno );
+ }
+ }
+ if( bytes != msg_size ) {
+ fprintf( stderr, "ERROR: truncated message!" );
+ }
+
+ free( msg );
}
json_object_put( recv_obj );