summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2014-11-16 17:10:59 +0100
committerAndreas Baumann <mail@andreasbaumann.cc>2014-11-16 17:10:59 +0100
commit3ecaed99580f378fbcb2598b2570c65d0f3ebc99 (patch)
tree3b972c7da2a3143727caf901b5ed4fa1f9099521
parent842308eb541ecf78391addc0b6778f8f1daab1e4 (diff)
downloadbiruda-3ecaed99580f378fbcb2598b2570c65d0f3ebc99.tar.gz
biruda-3ecaed99580f378fbcb2598b2570c65d0f3ebc99.tar.bz2
..
-rw-r--r--PROTOCOL6
-rw-r--r--src/biruda.c14
-rw-r--r--src/coordinator.c60
-rw-r--r--src/coordinator.h21
-rw-r--r--src/master.c4
-rw-r--r--src/master.h13
-rw-r--r--src/webserver.c5
-rw-r--r--src/worker.c1
-rw-r--r--src/worker.h10
9 files changed, 117 insertions, 17 deletions
diff --git a/PROTOCOL b/PROTOCOL
index f3d48ff..35ed369 100644
--- a/PROTOCOL
+++ b/PROTOCOL
@@ -39,3 +39,9 @@ On receiving a 'register' operation the master must handle accordingly,
usually adding the coordinator as known and alive and provide new
platforms and architectures to run workers on. Also currently scheduled
jobs must be examined.
+
+The coordinators also send all known workers to the master as a list:
+
+{ "workers": { "name": "worker1", "mode": "direct", "command": "build.sh" }
+ { "name": "worker2", "mode": "direct", "command": "build.sh" }
+}
diff --git a/src/biruda.c b/src/biruda.c
index 1ec8d5a..362ca50 100644
--- a/src/biruda.c
+++ b/src/biruda.c
@@ -202,8 +202,20 @@ static int create_coordinator( cfg_t *cfg )
{
cfg_t *master_cfg = cfg_getnsec( cfg, "coordinator", 0 );
char *control = cfg_getstr( master_cfg, "control" );
+
+ int ret = coordinator_init( control );
+
+ unsigned int nof_workers = cfg_size( cfg, "worker" );
+ if( nof_workers > 0 ) {
+ for( unsigned int i = 0; i < nof_workers; i++ ) {
+ cfg_t *worker_cfg = cfg_getnsec( cfg, "worker", i );
+ coordinator_add_worker( cfg_title( worker_cfg ),
+ (worker_execution_mode_t)cfg_getint( worker_cfg, "execution" ),
+ cfg_getstr( worker_cfg, "command" ) );
+ }
+ }
- return coordinator_init( control );
+ return ret;
}
#ifndef _WIN32
diff --git a/src/coordinator.c b/src/coordinator.c
index b88313e..cb462b9 100644
--- a/src/coordinator.c
+++ b/src/coordinator.c
@@ -22,7 +22,10 @@
static pthread_t coordinator_thread;
static int coordinator_sock;
static int coordinator_must_terminate = 0;
-
+
+int nof_workers = 0;
+worker_t worker[MAX_WORKERS];
+
static char *create_discover_answer( )
{
json_object *obj = json_object_new_object( );
@@ -66,7 +69,27 @@ static char *create_discover_answer( )
system_arch( machine_arch, sizeof( machine_arch ) );
json_object *arch = json_object_new_string( machine_arch );
json_object_object_add( obj, "arch", arch );
+
+ json_object *worker_array_obj = json_object_new_array( );
+ for( int i = 0; i < nof_workers; i++ ) {
+ worker_t *w = &worker[i];
+ json_object *worker_obj = json_object_new_object( );
+
+ json_object *name_obj = json_object_new_string( w->name );
+ json_object_object_add( worker_obj, "name", name_obj );
+
+ json_object *mode_obj = json_object_new_string( worker_exection_mode_str( w->mode ) );
+ json_object_object_add( worker_obj, "mode", mode_obj );
+ if( w->mode != WORKER_EXECUTION_DISABLED ) {
+ json_object *command_obj = json_object_new_string( w->command );
+ json_object_object_add( worker_obj, "command", command_obj );
+ }
+
+ json_object_array_add( worker_array_obj, worker_obj );
+ }
+ json_object_object_add( obj, "workers", worker_array_obj );
+
/* produce message as string, caller must free it */
const char *msg = json_object_to_json_string( obj );
char *res = strdup( msg );
@@ -151,6 +174,7 @@ int coordinator_init( const char *control )
int res;
coordinator_must_terminate = 0;
+ nof_workers = 0;
res = pthread_attr_init( &attr );
if( res != 0 ) {
@@ -172,6 +196,16 @@ void coordinator_terminate( )
nn_term( );
}
+static void free_workers( )
+{
+ for( int i = 0; i < nof_workers; i++ ) {
+ worker_t *w = &worker[i];
+
+ free( w->name );
+ free( w->command );
+ }
+}
+
int coordinator_free( )
{
void *result;
@@ -181,6 +215,30 @@ int coordinator_free( )
if( res != 0 ) {
return 1;
}
+
+ free_workers( );
+
+ return 0;
+}
+int coordinator_add_worker( const char *name, worker_execution_mode_t mode, const char *command )
+{
+ if( nof_workers >= MAX_WORKERS ) {
+ fprintf( stderr, "Can't define more workers, limit reached!\n" );
+ return -1;
+ }
+
+ worker_t *w = &worker[nof_workers];
+ w->name = strdup( name );
+ w->mode = mode;
+ if( command != NULL ) {
+ w->command = strdup( command );
+ } else {
+ w->mode = WORKER_EXECUTION_DISABLED;
+ }
+
+ nof_workers++;
+
return 0;
}
+
diff --git a/src/coordinator.h b/src/coordinator.h
index 63b8826..d67a3b6 100644
--- a/src/coordinator.h
+++ b/src/coordinator.h
@@ -1,8 +1,29 @@
#ifndef _BIRUDA_COORDINGATOR_HEADER_INCLUDED
#define _BIRUDA_COORDINGATOR_HEADER_INCLUDED
+#include "worker.h"
+
+#include <time.h>
+
+#include "port.h"
+
+#define MAX_WORKERS 128
+
+typedef struct {
+ char *host;
+ char *os;
+ char *arch;
+ unsigned int cpus;
+ bool used;
+ bool alive;
+ time_t lastAlive;
+ worker_t worker[MAX_WORKERS];
+ int nof_workers;
+} coordinator_t;
+
int coordinator_init( const char *control );
void coordinator_terminate( );
int coordinator_free( );
+int coordinator_add_worker( const char *name, worker_execution_mode_t mode, const char *command );
#endif
diff --git a/src/master.c b/src/master.c
index 84fd062..7933c0b 100644
--- a/src/master.c
+++ b/src/master.c
@@ -79,6 +79,10 @@ static void register_coordinator( json_object *obj )
coord->alive = true;
time( &coord->lastAlive );
coord->used = true;
+
+ json_object *worker_array;
+ json_object_object_get_ex( obj, "workers", &worker_array );
+ coord->nof_workers = json_object_array_length( worker_array );
}
static void *master_func( void *thread_data )
diff --git a/src/master.h b/src/master.h
index 9f67282..06461d9 100644
--- a/src/master.h
+++ b/src/master.h
@@ -2,8 +2,7 @@
#define _BIRUDA_MASTER_HEADER_INCLUDED
#include "port.h"
-
-#include <time.h>
+#include "coordinator.h"
int master_init( const char *control );
void master_terminate( int terminate_nano_msg );
@@ -13,16 +12,6 @@ int master_free( );
#define MAX_COORDINATOR_AGE 10
-typedef struct {
- char *host;
- char *os;
- char *arch;
- unsigned int cpus;
- bool used;
- bool alive;
- time_t lastAlive;
-} coordinator_t;
-
extern coordinator_t coordinator[MAX_COORDINATORS];
#endif
diff --git a/src/webserver.c b/src/webserver.c
index ccf030c..908c2a7 100644
--- a/src/webserver.c
+++ b/src/webserver.c
@@ -28,10 +28,11 @@ static int handle_request( void *cls, struct MHD_Connection *connection,
coordinator_t *c = &coordinator[pos];
if( c->used ) {
char part[256];
- snprintf( part, 256, "%s %s %s %d %s %lld (%d)\n",
+ snprintf( part, 256, "%s %s %s %d %s %lld (%d) %d\n",
c->host, c->os, c->arch, c->cpus,
( c->alive ? "alive" : "dead" ),
- (long long)c->lastAlive, pos );
+ (long long)c->lastAlive, pos,
+ c->nof_workers );
strncat( biruda_msg, part, 2048 );
}
}
diff --git a/src/worker.c b/src/worker.c
index 54e3982..63a53eb 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -7,6 +7,7 @@
const char *worker_exection_mode_str( worker_execution_mode_t mode )
{
switch( mode ) {
+ case WORKER_EXECUTION_DISABLED: return "disabled";
case WORKER_EXECUTION_DIRECT: return "direct";
default: return "<unknown>";
}
diff --git a/src/worker.h b/src/worker.h
index 151aa77..29c6e37 100644
--- a/src/worker.h
+++ b/src/worker.h
@@ -2,10 +2,18 @@
#define _BIRUDA_WORKER_HEADER_INCLUDED
typedef enum {
- WORKER_EXECUTION_DIRECT
+ WORKER_EXECUTION_DISABLED = 0,
+ WORKER_EXECUTION_DIRECT = 1
} worker_execution_mode_t;
+typedef struct {
+ char *name;
+ worker_execution_mode_t mode;
+ char *command;
+} worker_t;
+
const char *worker_exection_mode_str( worker_execution_mode_t mode );
+worker_execution_mode_t worker_execution_mode_from_int( unsigned int v );
int worker_init( const char *control );
void worker_terminate( );