diff options
author | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-04 19:35:19 +0100 |
---|---|---|
committer | Andreas Baumann <mail@andreasbaumann.cc> | 2014-12-04 19:35:19 +0100 |
commit | aad5ac1625e6830bd63b57f646479883f06f7d88 (patch) | |
tree | 95561271f1981b214e7dec675e24cd107ee289f4 /src | |
parent | 720079063a4e97794dada308faac5834c0c2b067 (diff) | |
download | biruda-aad5ac1625e6830bd63b57f646479883f06f7d88.tar.gz biruda-aad5ac1625e6830bd63b57f646479883f06f7d88.tar.bz2 |
added stopping of worker
Diffstat (limited to 'src')
-rw-r--r-- | src/coordinator.c | 89 | ||||
-rw-r--r-- | src/master.c | 27 | ||||
-rw-r--r-- | src/master.h | 1 | ||||
-rw-r--r-- | src/webserver.c | 28 | ||||
-rw-r--r-- | src/worker.c | 9 |
5 files changed, 153 insertions, 1 deletions
diff --git a/src/coordinator.c b/src/coordinator.c index 30afb62..1366d88 100644 --- a/src/coordinator.c +++ b/src/coordinator.c @@ -61,6 +61,39 @@ static char *create_worker_started_answer( const char *name, bool worker_found ) return res; } +static char *create_worker_stopped_answer( const char *name, bool worker_found ) +{ + json_object *obj = json_object_new_object( ); + json_object *op = json_object_new_string( "stopped" ); + json_object_object_add( obj, "op", op ); + + /* we are a coordinator */ + json_object *role = json_object_new_string( "coordinator" ); + json_object_object_add( obj, "role", role ); + + /* return hostname for unique identification (maybe better + * or additionally the MAC of the first interface or hostid?) + */ + char hostname[100]; + gethostname( hostname, sizeof( hostname ) ); + json_object *host = json_object_new_string( hostname ); + json_object_object_add( obj, "host", host ); + + json_object *worker = json_object_new_string( name ); + json_object_object_add( obj, "worker", worker ); + + json_object *found = json_object_new_boolean( worker_found ); + json_object_object_add( obj, "found", found ); + + /* produce message as string, caller must free it */ + const char *msg = json_object_to_json_string( obj ); + char *res = strdup( msg ); + + json_object_put( obj ); + + return res; +} + static char *create_register_answer( ) { json_object *obj = json_object_new_object( ); @@ -162,6 +195,32 @@ static int coordinator_start_worker( const char *name, bool *found ) return 0; } +static int coordinator_stop_worker( const char *name, bool *found ) +{ + worker_t *w = NULL; + worker_t *wfound = NULL; + *found = false; + for( int i = 0; i < nof_workers; i++ ) { + w = &worker[i]; + if( strcmp( name, w->name ) == 0 ) { + *found = true; + wfound = w; + break; + } + } + + if( !( *found ) ) { + return 0; + } + + printf( "STOPPING WORKER '%s'\n", wfound->name ); + + worker_terminate( wfound ); + + return 0; +} + + static void *coordinator_func( void *thread_data ) { char *control = (char *)thread_data; @@ -240,6 +299,36 @@ static void *coordinator_func( void *thread_data ) } free( msg ); + } else if( strcmp( op, "stop" ) == 0 ) { + + json_object *worker_obj; + json_object_object_get_ex( recv_obj, "worker", &worker_obj ); + const char *worker = json_object_get_string( worker_obj ); + + bool found; + /*int res = */ + (void)coordinator_stop_worker( worker, &found ); + + char *msg = create_worker_stopped_answer( worker, found ); + size_t msg_size = strlen( msg ) + 1; + + // TODO: tons of copy-paste code here, think! + printf( "coordinator send: %s\n", msg ); + bytes = nn_send( coordinator_sock, msg, msg_size, 0 ); + if( bytes < 0 ) { + if( errno == ETERM ) { + coordinator_must_terminate = 1; + continue; + } else { + fprintf( stderr, "ERROR: nn_send returned %d: %s (%d)\n", + bytes, nn_strerror( errno ), errno ); + } + } + if( bytes != msg_size ) { + fprintf( stderr, "ERROR: truncated message!" ); + } + + free( msg ); } json_object_put( recv_obj ); diff --git a/src/master.c b/src/master.c index d7bc64c..9a35a73 100644 --- a/src/master.c +++ b/src/master.c @@ -217,6 +217,25 @@ static char *create_start_request( const char *name ) return res; } +static char *create_stop_request( const char *name ) +{ + json_object *obj = json_object_new_object( ); + json_object *op = json_object_new_string( "stop" ); + json_object_object_add( obj, "op", op ); + json_object *role = json_object_new_string( "master" ); + json_object_object_add( obj, "role", role ); + json_object *worker = json_object_new_string( name ); + json_object_object_add( obj, "worker", worker ); + + /* produce message as string, caller must free it */ + const char *msg = json_object_to_json_string( obj ); + char *res = alloc_message( msg ); + + json_object_put( obj ); + + return res; +} + #define MAX_NOF_COMMANDS 12 static int nof_queued_commands = 0; @@ -408,3 +427,11 @@ int master_start_worker( const char *name ) return res; } + +int master_stop_worker( const char *name ) +{ + char *msg = create_stop_request( name ); + int res = enqueue_request( msg ); + + return res; +} diff --git a/src/master.h b/src/master.h index cdbb621..60d1061 100644 --- a/src/master.h +++ b/src/master.h @@ -9,6 +9,7 @@ void master_terminate( int terminate_nano_msg ); int master_free( ); int master_start_worker( const char *name ); +int master_stop_worker( const char *name ); #define MAX_COORDINATORS 128 diff --git a/src/webserver.c b/src/webserver.c index 01dcd51..312105a 100644 --- a/src/webserver.c +++ b/src/webserver.c @@ -65,7 +65,11 @@ static int handle_request( void *cls, struct MHD_Connection *connection, printf( "Got POST operation '%s'\n", op ); - if( op != NULL && strcmp( op, "start" ) == 0 ) { + if( op == NULL ) { + return MHD_NO; + } + + if( strcmp( op, "start" ) == 0 ) { const char *name = MHD_lookup_connection_value( connection, MHD_GET_ARGUMENT_KIND, "name" ); @@ -86,6 +90,28 @@ static int handle_request( void *cls, struct MHD_Connection *connection, ret = MHD_queue_response( connection, ( res == 0 ) ? MHD_HTTP_OK : MHD_HTTP_INTERNAL_SERVER_ERROR, response ); MHD_destroy_response( response ); } + } else if( strcmp( op, "stop" ) == 0 ) { + + const char *name = MHD_lookup_connection_value( connection, MHD_GET_ARGUMENT_KIND, "name" ); + + if( name != NULL ) { + + printf( "Got POST parameter for stopping a worker with name '%s'\n", name ); + + int res = master_stop_worker( name ); + + if( res < 0 ) { + snprintf( biruda_msg, sizeof( biruda_msg ), "Queueing stop request message failed\n" ); + } else { + snprintf( biruda_msg, sizeof( biruda_msg ), "Queued stop worker request\n" ); + } + + response = MHD_create_response_from_buffer( strlen( biruda_msg ), + (void *)biruda_msg, MHD_RESPMEM_MUST_COPY ); + ret = MHD_queue_response( connection, ( res == 0 ) ? MHD_HTTP_OK : MHD_HTTP_INTERNAL_SERVER_ERROR, response ); + MHD_destroy_response( response ); + } + } else { return MHD_NO; } diff --git a/src/worker.c b/src/worker.c index c347ee8..1bd5eb8 100644 --- a/src/worker.c +++ b/src/worker.c @@ -211,6 +211,15 @@ void worker_terminate( worker_t *worker ) { direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)worker->execution_data; + switch( worker->state ) { + case WORKER_STATE_RUNNING: + break; + + case WORKER_STATE_STOPPED: + fprintf( stderr, "worker %s is already stopped\n", worker->name ); + return; + } + #ifndef _WIN32 (void)kill( wed->pid, SIGTERM ); #else |