summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2014-12-04 19:35:19 +0100
committerAndreas Baumann <mail@andreasbaumann.cc>2014-12-04 19:35:19 +0100
commitaad5ac1625e6830bd63b57f646479883f06f7d88 (patch)
tree95561271f1981b214e7dec675e24cd107ee289f4 /src
parent720079063a4e97794dada308faac5834c0c2b067 (diff)
downloadbiruda-aad5ac1625e6830bd63b57f646479883f06f7d88.tar.gz
biruda-aad5ac1625e6830bd63b57f646479883f06f7d88.tar.bz2
added stopping of worker
Diffstat (limited to 'src')
-rw-r--r--src/coordinator.c89
-rw-r--r--src/master.c27
-rw-r--r--src/master.h1
-rw-r--r--src/webserver.c28
-rw-r--r--src/worker.c9
5 files changed, 153 insertions, 1 deletions
diff --git a/src/coordinator.c b/src/coordinator.c
index 30afb62..1366d88 100644
--- a/src/coordinator.c
+++ b/src/coordinator.c
@@ -61,6 +61,39 @@ static char *create_worker_started_answer( const char *name, bool worker_found )
return res;
}
+static char *create_worker_stopped_answer( const char *name, bool worker_found )
+{
+ json_object *obj = json_object_new_object( );
+ json_object *op = json_object_new_string( "stopped" );
+ json_object_object_add( obj, "op", op );
+
+ /* we are a coordinator */
+ json_object *role = json_object_new_string( "coordinator" );
+ json_object_object_add( obj, "role", role );
+
+ /* return hostname for unique identification (maybe better
+ * or additionally the MAC of the first interface or hostid?)
+ */
+ char hostname[100];
+ gethostname( hostname, sizeof( hostname ) );
+ json_object *host = json_object_new_string( hostname );
+ json_object_object_add( obj, "host", host );
+
+ json_object *worker = json_object_new_string( name );
+ json_object_object_add( obj, "worker", worker );
+
+ json_object *found = json_object_new_boolean( worker_found );
+ json_object_object_add( obj, "found", found );
+
+ /* produce message as string, caller must free it */
+ const char *msg = json_object_to_json_string( obj );
+ char *res = strdup( msg );
+
+ json_object_put( obj );
+
+ return res;
+}
+
static char *create_register_answer( )
{
json_object *obj = json_object_new_object( );
@@ -162,6 +195,32 @@ static int coordinator_start_worker( const char *name, bool *found )
return 0;
}
+static int coordinator_stop_worker( const char *name, bool *found )
+{
+ worker_t *w = NULL;
+ worker_t *wfound = NULL;
+ *found = false;
+ for( int i = 0; i < nof_workers; i++ ) {
+ w = &worker[i];
+ if( strcmp( name, w->name ) == 0 ) {
+ *found = true;
+ wfound = w;
+ break;
+ }
+ }
+
+ if( !( *found ) ) {
+ return 0;
+ }
+
+ printf( "STOPPING WORKER '%s'\n", wfound->name );
+
+ worker_terminate( wfound );
+
+ return 0;
+}
+
+
static void *coordinator_func( void *thread_data )
{
char *control = (char *)thread_data;
@@ -240,6 +299,36 @@ static void *coordinator_func( void *thread_data )
}
free( msg );
+ } else if( strcmp( op, "stop" ) == 0 ) {
+
+ json_object *worker_obj;
+ json_object_object_get_ex( recv_obj, "worker", &worker_obj );
+ const char *worker = json_object_get_string( worker_obj );
+
+ bool found;
+ /*int res = */
+ (void)coordinator_stop_worker( worker, &found );
+
+ char *msg = create_worker_stopped_answer( worker, found );
+ size_t msg_size = strlen( msg ) + 1;
+
+ // TODO: tons of copy-paste code here, think!
+ printf( "coordinator send: %s\n", msg );
+ bytes = nn_send( coordinator_sock, msg, msg_size, 0 );
+ if( bytes < 0 ) {
+ if( errno == ETERM ) {
+ coordinator_must_terminate = 1;
+ continue;
+ } else {
+ fprintf( stderr, "ERROR: nn_send returned %d: %s (%d)\n",
+ bytes, nn_strerror( errno ), errno );
+ }
+ }
+ if( bytes != msg_size ) {
+ fprintf( stderr, "ERROR: truncated message!" );
+ }
+
+ free( msg );
}
json_object_put( recv_obj );
diff --git a/src/master.c b/src/master.c
index d7bc64c..9a35a73 100644
--- a/src/master.c
+++ b/src/master.c
@@ -217,6 +217,25 @@ static char *create_start_request( const char *name )
return res;
}
+static char *create_stop_request( const char *name )
+{
+ json_object *obj = json_object_new_object( );
+ json_object *op = json_object_new_string( "stop" );
+ json_object_object_add( obj, "op", op );
+ json_object *role = json_object_new_string( "master" );
+ json_object_object_add( obj, "role", role );
+ json_object *worker = json_object_new_string( name );
+ json_object_object_add( obj, "worker", worker );
+
+ /* produce message as string, caller must free it */
+ const char *msg = json_object_to_json_string( obj );
+ char *res = alloc_message( msg );
+
+ json_object_put( obj );
+
+ return res;
+}
+
#define MAX_NOF_COMMANDS 12
static int nof_queued_commands = 0;
@@ -408,3 +427,11 @@ int master_start_worker( const char *name )
return res;
}
+
+int master_stop_worker( const char *name )
+{
+ char *msg = create_stop_request( name );
+ int res = enqueue_request( msg );
+
+ return res;
+}
diff --git a/src/master.h b/src/master.h
index cdbb621..60d1061 100644
--- a/src/master.h
+++ b/src/master.h
@@ -9,6 +9,7 @@ void master_terminate( int terminate_nano_msg );
int master_free( );
int master_start_worker( const char *name );
+int master_stop_worker( const char *name );
#define MAX_COORDINATORS 128
diff --git a/src/webserver.c b/src/webserver.c
index 01dcd51..312105a 100644
--- a/src/webserver.c
+++ b/src/webserver.c
@@ -65,7 +65,11 @@ static int handle_request( void *cls, struct MHD_Connection *connection,
printf( "Got POST operation '%s'\n", op );
- if( op != NULL && strcmp( op, "start" ) == 0 ) {
+ if( op == NULL ) {
+ return MHD_NO;
+ }
+
+ if( strcmp( op, "start" ) == 0 ) {
const char *name = MHD_lookup_connection_value( connection, MHD_GET_ARGUMENT_KIND, "name" );
@@ -86,6 +90,28 @@ static int handle_request( void *cls, struct MHD_Connection *connection,
ret = MHD_queue_response( connection, ( res == 0 ) ? MHD_HTTP_OK : MHD_HTTP_INTERNAL_SERVER_ERROR, response );
MHD_destroy_response( response );
}
+ } else if( strcmp( op, "stop" ) == 0 ) {
+
+ const char *name = MHD_lookup_connection_value( connection, MHD_GET_ARGUMENT_KIND, "name" );
+
+ if( name != NULL ) {
+
+ printf( "Got POST parameter for stopping a worker with name '%s'\n", name );
+
+ int res = master_stop_worker( name );
+
+ if( res < 0 ) {
+ snprintf( biruda_msg, sizeof( biruda_msg ), "Queueing stop request message failed\n" );
+ } else {
+ snprintf( biruda_msg, sizeof( biruda_msg ), "Queued stop worker request\n" );
+ }
+
+ response = MHD_create_response_from_buffer( strlen( biruda_msg ),
+ (void *)biruda_msg, MHD_RESPMEM_MUST_COPY );
+ ret = MHD_queue_response( connection, ( res == 0 ) ? MHD_HTTP_OK : MHD_HTTP_INTERNAL_SERVER_ERROR, response );
+ MHD_destroy_response( response );
+ }
+
} else {
return MHD_NO;
}
diff --git a/src/worker.c b/src/worker.c
index c347ee8..1bd5eb8 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -211,6 +211,15 @@ void worker_terminate( worker_t *worker )
{
direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)worker->execution_data;
+ switch( worker->state ) {
+ case WORKER_STATE_RUNNING:
+ break;
+
+ case WORKER_STATE_STOPPED:
+ fprintf( stderr, "worker %s is already stopped\n", worker->name );
+ return;
+ }
+
#ifndef _WIN32
(void)kill( wed->pid, SIGTERM );
#else