summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2014-12-05 20:30:27 +0100
committerAndreas Baumann <mail@andreasbaumann.cc>2014-12-05 20:30:27 +0100
commitc81701d1bf5f20932ba77b3b1e1893a51e182f73 (patch)
tree6d8658f8ea40f114ff759c26077e2093e74ac817 /src
parent67ba87a159fb55196092e59c041af6d69ce07e75 (diff)
downloadbiruda-c81701d1bf5f20932ba77b3b1e1893a51e182f73.tar.gz
biruda-c81701d1bf5f20932ba77b3b1e1893a51e182f73.tar.bz2
first working version of worker spool files in the master
Diffstat (limited to 'src')
-rw-r--r--src/biruda.c4
-rw-r--r--src/biruda.conf2
-rw-r--r--src/master.c81
-rw-r--r--src/master.h2
-rw-r--r--src/port.h7
-rw-r--r--src/worker.h6
6 files changed, 93 insertions, 9 deletions
diff --git a/src/biruda.c b/src/biruda.c
index 3fae736..98733fe 100644
--- a/src/biruda.c
+++ b/src/biruda.c
@@ -143,6 +143,7 @@ static int read_config( const char *filename, cfg_t **cfg )
cfg_opt_t opts_master[] = {
CFG_STR( "control", 0, CFGF_NODEFAULT ),
CFG_STR( "data", 0, CFGF_NODEFAULT ),
+ CFG_STR( "spool_dir", 0, CFGF_NODEFAULT ),
CFG_END( )
};
@@ -269,8 +270,9 @@ static int create_master( cfg_t *cfg )
cfg_t *master_cfg = cfg_getnsec( cfg, "master", 0 );
char *control = cfg_getstr( master_cfg, "control" );
char *data = cfg_getstr( master_cfg, "data" );
+ char *spool_dir = cfg_getstr( master_cfg, "spool_dir" );
- return master_init( control, data );
+ return master_init( control, data, spool_dir );
}
static int create_coordinator( cfg_t *cfg )
diff --git a/src/biruda.conf b/src/biruda.conf
index 9269bab..d7fae19 100644
--- a/src/biruda.conf
+++ b/src/biruda.conf
@@ -9,6 +9,8 @@ master
# data = "ipc:///tmp/biruda_data.ipc"
# data = "inproc://data"
data = "tcp://*:5556"
+
+ spool_dir = "."
}
coordinator
diff --git a/src/master.c b/src/master.c
index d1f002d..0c59db1 100644
--- a/src/master.c
+++ b/src/master.c
@@ -79,6 +79,10 @@ static void update_workers( coordinator_t *coord, json_object *obj )
// TODO: check for deleted workers
}
+typedef struct {
+ FILE *spool_file;
+} execution_worker_data_t;
+
static int register_coordinator( json_object *obj )
{
json_object *host_obj;
@@ -162,6 +166,10 @@ static int register_coordinator( json_object *obj )
} else {
w->mode = WORKER_EXECUTION_DISABLED;
}
+
+ w->execution_data = malloc( sizeof( execution_worker_data_t ) );
+ execution_worker_data_t *wed = (execution_worker_data_t *)w->execution_data;
+ wed->spool_file = NULL;
json_object *state_obj;
json_object_object_get_ex( worker_obj, "state", &state_obj );
@@ -283,10 +291,65 @@ static char *get_next_request( )
typedef struct {
const char *control;
const char *data;
+ const char *spool_dir;
} master_thread_data_t;
static master_thread_data_t master_thread_data;
+static worker_t *worker_by_name( const char *name )
+{
+ for( int c = 0; c < MAX_COORDINATORS; c++ ) {
+ if( coordinator[c].used ) {
+ for( int i = 0; i < coordinator[c].nof_workers; i++ ) {
+ worker_t *w = &coordinator[c].worker[i];
+ if( strcmp( name, w->name ) == 0 ) {
+ return w;
+ }
+ }
+ }
+ }
+
+ return NULL;
+}
+
+static int master_output_init( const char *spool_dir, const char *name )
+{
+ char filename[1024];
+ worker_t *worker = worker_by_name( name );
+
+ snprintf( filename, sizeof( filename ), "%s%c%s.output",
+ spool_dir, PORT_DIR_SEPARATOR, worker->name );
+
+ execution_worker_data_t *wed = (execution_worker_data_t *)worker->execution_data;
+ wed->spool_file = fopen( filename, "a+" );
+
+ return 0;
+}
+
+static int master_output_free( const char *name )
+{
+ worker_t *worker = worker_by_name( name );
+ execution_worker_data_t *wed = (execution_worker_data_t *)worker->execution_data;
+
+ if( wed->spool_file != NULL ) {
+ fclose( wed->spool_file );
+ wed->spool_file = NULL;
+ }
+
+ return 0;
+}
+
+static int master_output_write( const char *name, const char *msg )
+{
+ worker_t *worker = worker_by_name( name );
+ execution_worker_data_t *wed = (execution_worker_data_t *)worker->execution_data;
+
+ fputs( msg, wed->spool_file );
+ fflush( wed->spool_file );
+
+ return 0;
+}
+
static void *master_func( void *thread_data )
{
master_thread_data_t *tdata = (master_thread_data_t *)thread_data;
@@ -391,6 +454,16 @@ NEXT_COMMAND:
continue;
}
}
+ } else if( strcmp( op, "started" ) == 0 ) {
+ json_object *worker_obj;
+ json_object_object_get_ex( recv_obj, "worker", &worker_obj );
+ const char *worker = json_object_get_string( worker_obj );
+ master_output_init( tdata->spool_dir, worker );
+ } else if( strcmp( op, "stopped" ) == 0 ) {
+ json_object *worker_obj;
+ json_object_object_get_ex( recv_obj, "worker", &worker_obj );
+ const char *worker = json_object_get_string( worker_obj );
+ master_output_free( worker );
} else if( strcmp( op, "output" ) == 0 ) {
json_object *worker_obj;
json_object_object_get_ex( recv_obj, "worker", &worker_obj );
@@ -398,9 +471,7 @@ NEXT_COMMAND:
json_object *msg_obj;
json_object_object_get_ex( recv_obj, "msg", &msg_obj );
const char *msg = json_object_get_string( msg_obj );
-
- printf( "Got message from worker '%s': %s\n", worker, msg );
-
+ master_output_write( worker, msg );
} else {
fprintf( stderr, "WARNING: master received unkown message: %s\n", answer );
}
@@ -433,7 +504,7 @@ NEXT_COMMAND:
return NULL;
}
-int master_init( const char *control, const char *data )
+int master_init( const char *control, const char *data, const char *spool_dir )
{
pthread_attr_t attr;
int res;
@@ -449,6 +520,7 @@ int master_init( const char *control, const char *data )
master_thread_data.control = control;
master_thread_data.data = data;
+ master_thread_data.spool_dir = spool_dir;
res = pthread_create( &master_thread, &attr, master_func, (void *)&master_thread_data );
if( res != 0 ) {
@@ -495,3 +567,4 @@ int master_stop_worker( const char *name )
return res;
}
+
diff --git a/src/master.h b/src/master.h
index a19dd8d..616a763 100644
--- a/src/master.h
+++ b/src/master.h
@@ -4,7 +4,7 @@
#include "port.h"
#include "coordinator.h"
-int master_init( const char *control, const char *data );
+int master_init( const char *control, const char *data, const char *spool_dir );
void master_terminate( int terminate_nano_msg );
int master_free( );
diff --git a/src/port.h b/src/port.h
index 3d98280..5fefa88 100644
--- a/src/port.h
+++ b/src/port.h
@@ -30,4 +30,11 @@ typedef unsigned char _Bool;
#include <stdbool.h>
#endif
+// directory separator
+#ifdef _WIN32
+#define PORT_DIR_SEPARATOR '\\'
+#else
+#define PORT_DIR_SEPARATOR '/'
+#endif
+
#endif
diff --git a/src/worker.h b/src/worker.h
index a5f0ce9..0e243af 100644
--- a/src/worker.h
+++ b/src/worker.h
@@ -1,6 +1,8 @@
#ifndef _BIRUDA_WORKER_HEADER_INCLUDED
#define _BIRUDA_WORKER_HEADER_INCLUDED
+#include <stdio.h>
+
typedef enum {
WORKER_EXECUTION_DISABLED = 0,
WORKER_EXECUTION_DIRECT = 1
@@ -11,14 +13,12 @@ typedef enum {
WORKER_STATE_RUNNING = 1
} worker_state_t;
-typedef void *worker_execution_data_t;
-
typedef struct {
char *name;
worker_execution_mode_t mode;
char *command;
worker_state_t state;
- worker_execution_data_t execution_data;
+ void *execution_data;
const char *control;
const char *data;
} worker_t;