summaryrefslogtreecommitdiff
path: root/src/master.c
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2014-11-13 21:50:54 +0100
committerAndreas Baumann <mail@andreasbaumann.cc>2014-11-13 21:50:54 +0100
commit7342b11a3c12f48b44597232723304150e868328 (patch)
tree23ff7738cc5c8a679ef73912fc368b170e4eab43 /src/master.c
parent5d92b737f031acd99e7e2011c679fe03d15930ec (diff)
downloadbiruda-7342b11a3c12f48b44597232723304150e868328.tar.gz
biruda-7342b11a3c12f48b44597232723304150e868328.tar.bz2
showing status and keeping track of coordinators
Diffstat (limited to 'src/master.c')
-rw-r--r--src/master.c66
1 files changed, 65 insertions, 1 deletions
diff --git a/src/master.c b/src/master.c
index 02037e5..f2d9835 100644
--- a/src/master.c
+++ b/src/master.c
@@ -14,6 +14,52 @@ static pthread_t master_thread;
static int master_sock;
static int master_must_terminate = 0;
+coordinator_t coordinator[MAX_COORDINATORS];
+
+static void register_coordinator( json_object *obj )
+{
+ json_object *host_obj;
+ json_object_object_get_ex( obj, "host", &host_obj );
+ const char *host = json_object_get_string( host_obj );
+
+ json_object *os_obj;
+ json_object_object_get_ex( obj, "os", &os_obj );
+ const char *os = json_object_get_string( os_obj );
+
+ json_object *arch_obj;
+ json_object_object_get_ex( obj, "arch", &arch_obj );
+ const char *arch = json_object_get_string( arch_obj );
+
+ json_object *cpus_obj;
+ json_object_object_get_ex( obj, "cpus", &cpus_obj );
+ unsigned int cpus = json_object_get_int( cpus_obj );
+
+ int pos;
+ for( pos = 0; pos < MAX_COORDINATORS; pos++ ) {
+ if( coordinator[pos].used &&
+ strcmp( coordinator[pos].host, host ) == 0 ) {
+ fprintf( stderr, "master: already registered coordinator from host '%s' (%d)\n", host, pos );
+ return;
+ }
+ }
+
+ for( pos = 0; pos < MAX_COORDINATORS && coordinator[pos].used; pos++ );
+
+ if( pos == MAX_COORDINATORS ) {
+ fprintf( stderr, "Can't accept more coordinators, limit reached!\n" );
+ return;
+ }
+ coordinator_t *coord = &coordinator[pos];
+
+ printf( "master: registering coordinator from host '%s' (%d)\n", host, pos );
+
+ coord->host = strdup( host );
+ coord->os = strdup( os );
+ coord->arch = strdup( arch );
+ coord->cpus = cpus;
+ coord->used = true;
+}
+
static void *master_func( void *thread_data )
{
char *control = (char *)thread_data;
@@ -62,7 +108,23 @@ NEXT_DISCOVER:
bytes = nn_recv( master_sock, &answer, NN_MSG, 0 );
if( master_must_terminate ) continue;
if( bytes >= 0 ) {
- printf( "master received: %s\n", answer );
+ json_object *recv_obj = json_tokener_parse( answer );
+ const char *recv_obj_str = json_object_to_json_string( recv_obj );
+ printf( "master received: %s\n", recv_obj_str );
+ json_object *op_obj;
+ json_object_object_get_ex( recv_obj, "op", &op_obj );
+ const char *op = json_object_get_string( op_obj );
+
+ if( strcmp( op, "register" ) == 0 ) {
+ json_object *role_obj;
+ json_object_object_get_ex( recv_obj, "role", &role_obj );
+ const char *role = json_object_get_string( role_obj );
+ if( strcmp( role, "coordinator" ) == 0 ) {
+ register_coordinator( recv_obj );
+ }
+ }
+
+ json_object_put( recv_obj );
nn_freemsg( answer );
}
if( bytes < 0 ) {
@@ -96,6 +158,8 @@ int master_init( const char *control )
master_must_terminate = 0;
+ memset( coordinator, 0, sizeof( coordinator_t * ) * MAX_COORDINATORS );
+
res = pthread_attr_init( &attr );
if( res != 0 ) {
return 1;