#include "coordinator.h"
#include "worker.h"

#include "pthread.h"

#include "nanomsg/nn.h"
#include "nanomsg/survey.h"

#include "json-c/json.h"

/* NOTE(review): the names of the system headers were lost in the copy of this
 * file that was reviewed; this list is reconstructed from what the code below
 * uses (errno, printf/fprintf/puts, malloc/free, strdup/strlen/strcmp/memcpy,
 * bool) -- confirm against version control. */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef _WIN32
#include <stdbool.h>
#endif

#include "system.h"
#include "port.h"

static pthread_t coordinator_thread;
static int coordinator_sock = -1;

/* Set by coordinator_terminate() (or when nanomsg reports ETERM) and polled by
 * the coordinator thread. coordinator_terminate() also calls nn_term(), which
 * makes a blocked nn_recv() return with ETERM.
 * NOTE(review): this flag is shared between threads without synchronisation;
 * consider an atomic flag if all supported toolchains provide <stdatomic.h>. */
static int coordinator_must_terminate = 0;

int nof_workers = 0;
worker_t worker[MAX_WORKERS];

/* Create a JSON object holding the fields every coordinator answer carries:
 * "op" (the given operation), "role" (always "coordinator") and "host" (the
 * local hostname, used by the peer for identification). */
static json_object *create_answer_skeleton( const char *op )
{
	json_object *obj = json_object_new_object( );

	json_object_object_add( obj, "op", json_object_new_string( op ) );

	/* we are a coordinator */
	json_object_object_add( obj, "role", json_object_new_string( "coordinator" ) );

	/* return hostname for unique identification (maybe better
	 * or additionally the MAC of the first interface or hostid?) */
	char hostname[100];
	system_hostname( hostname, sizeof( hostname ) );
	json_object_object_add( obj, "host", json_object_new_string( hostname ) );

	return obj;
}

/* Serialise obj into a newly allocated string and release obj.
 * The caller must free() the result; returns NULL if the copy fails. */
static char *answer_to_string( json_object *obj )
{
	const char *msg = json_object_to_json_string( obj );
	char *res = ( msg != NULL ) ? strdup( msg ) : NULL;
	json_object_put( obj );
	return res;
}

/* Build the answer to a "start"/"stop" request: the common fields with
 * op set to `op`, plus "worker" (the requested name) and "found" (whether
 * a worker with that name is configured). Caller frees the result. */
static char *create_worker_answer( const char *op, const char *name, bool worker_found )
{
	json_object *obj = create_answer_skeleton( op );

	json_object_object_add( obj, "worker", json_object_new_string( name ) );
	json_object_object_add( obj, "found", json_object_new_boolean( worker_found ) );

	return answer_to_string( obj );
}

/* Build the "register" answer to a "discover" request: the common fields plus
 * a description of this machine and of all configured workers.
 * Caller frees the result. */
static char *create_register_answer( void )
{
	json_object *obj = create_answer_skeleton( "register" );

	/* number of CPUs, important so we don't overload the
	 * server (TODO: should maybe be configurable how many
	 * CPUs we want to use in absolute, percentage use or
	 * leave free */
	unsigned int nofCpus = system_available_cpus( );
	json_object_object_add( obj, "cpus", json_object_new_int( nofCpus ) );

	/* physically installed memory in kBytes (as reported by system_phys_memory) */
	unsigned int nofPhysMem = system_phys_memory( );
	json_object_object_add( obj, "physical_memory", json_object_new_int( nofPhysMem ) );

	/* Operating system running on the coordinator, this is
	 * important so we know what we can run on the system
	 * directly */
	char os_name[100];
	system_os( os_name, sizeof( os_name ) );
	json_object_object_add( obj, "os", json_object_new_string( os_name ) );

	/* system architecture, important whether we can run chrooted
	 * environments as workers directly without a virtualizer */
	char machine_arch[100];
	system_arch( machine_arch, sizeof( machine_arch ) );
	json_object_object_add( obj, "arch", json_object_new_string( machine_arch ) );

	json_object *worker_array_obj = json_object_new_array( );
	for( int i = 0; i < nof_workers; i++ ) {
		const worker_t *w = &worker[i];
		json_object *worker_obj = json_object_new_object( );

		json_object_object_add( worker_obj, "name", json_object_new_string( w->name ) );
		json_object_object_add( worker_obj, "mode",
			json_object_new_string( worker_exection_mode_str( w->mode ) ) );
		/* disabled workers have no command (see coordinator_add_worker) */
		if( w->mode != WORKER_EXECUTION_DISABLED && w->command != NULL ) {
			json_object_object_add( worker_obj, "command", json_object_new_string( w->command ) );
		}
		json_object_object_add( worker_obj, "state",
			json_object_new_string( worker_state_str( w->state ) ) );

		json_object_array_add( worker_array_obj, worker_obj );
	}
	json_object_object_add( obj, "workers", worker_array_obj );

	return answer_to_string( obj );
}

/* Return the configured worker called `name`, or NULL if there is none. */
static worker_t *coordinator_find_worker( const char *name )
{
	for( int i = 0; i < nof_workers; i++ ) {
		if( strcmp( name, worker[i].name ) == 0 ) {
			return &worker[i];
		}
	}
	return NULL;
}

/* Start the worker called `name` via worker_init().
 * *found is set to whether such a worker exists. Always returns 0. */
static int coordinator_start_worker( const char *name, bool *found )
{
	worker_t *w = coordinator_find_worker( name );

	*found = ( w != NULL );
	if( w == NULL ) {
		return 0;
	}

	printf( "STARTING WORKER '%s'\n", w->name );
	worker_init( w );

	return 0;
}

/* Stop the worker called `name` via worker_terminate().
 * *found is set to whether such a worker exists. Always returns 0. */
static int coordinator_stop_worker( const char *name, bool *found )
{
	worker_t *w = coordinator_find_worker( name );

	*found = ( w != NULL );
	if( w == NULL ) {
		return 0;
	}

	printf( "STOPPING WORKER '%s'\n", w->name );
	worker_terminate( w );

	return 0;
}

/* Parse a received nanomsg buffer as JSON. The buffer is not guaranteed to
 * be NUL-terminated, so it is copied into a terminated string first.
 * Returns NULL if the text is not valid JSON or memory runs out. */
static json_object *coordinator_parse_request( const char *buf, size_t len )
{
	char *text = malloc( len + 1 );
	if( text == NULL ) {
		return NULL;
	}
	memcpy( text, buf, len );
	text[len] = '\0';

	json_object *obj = json_tokener_parse( text );
	free( text );

	return obj;
}

/* Execute one parsed request and build its answer.
 * Returns a string the caller must free(), or NULL when no answer is sent
 * (missing/unknown "op", or the answer could not be built). */
static char *coordinator_handle_request( json_object *recv_obj )
{
	json_object *op_obj = NULL;
	if( !json_object_object_get_ex( recv_obj, "op", &op_obj ) ) {
		fprintf( stderr, "ERROR: coordinator received a message without 'op'\n" );
		return NULL;
	}
	const char *op = json_object_get_string( op_obj );
	if( op == NULL ) {
		fprintf( stderr, "ERROR: coordinator received a message with an empty 'op'\n" );
		return NULL;
	}

	if( strcmp( op, "discover" ) == 0 ) {
		return create_register_answer( );
	}

	bool is_start = ( strcmp( op, "start" ) == 0 );
	bool is_stop = ( strcmp( op, "stop" ) == 0 );
	if( !is_start && !is_stop ) {
		/* unknown operations are ignored, as before */
		return NULL;
	}

	/* a missing or null "worker" field matches no worker, so the peer gets
	 * found=false instead of the coordinator dereferencing NULL */
	json_object *worker_obj = NULL;
	const char *name = NULL;
	if( json_object_object_get_ex( recv_obj, "worker", &worker_obj ) ) {
		name = json_object_get_string( worker_obj );
	}
	if( name == NULL ) {
		name = "";
	}

	bool found = false;
	if( is_start ) {
		(void)coordinator_start_worker( name, &found );
		return create_worker_answer( "started", name, found );
	}
	(void)coordinator_stop_worker( name, &found );
	return create_worker_answer( "stopped", name, found );
}

/* Send msg (including its terminating NUL) on the coordinator socket.
 * Returns 0 on success (a short send is only reported), 1 when nanomsg is
 * terminating (ETERM) and -1 on any other error. */
static int coordinator_send_answer( const char *msg )
{
	size_t msg_size = strlen( msg ) + 1;

	printf( "coordinator send: %s\n", msg );
	int bytes = nn_send( coordinator_sock, msg, msg_size, 0 );
	if( bytes < 0 ) {
		int err = errno;
		if( err == ETERM ) {
			return 1;
		}
		fprintf( stderr, "ERROR: nn_send returned %d: %s (%d)\n",
			bytes, nn_strerror( err ), err );
		return -1;
	}
	if( (size_t)bytes != msg_size ) {
		fprintf( stderr, "ERROR: truncated message!\n" );
	}

	return 0;
}

/* Coordinator thread: connect a NN_RESPONDENT socket to the control address
 * passed as thread_data and answer requests until termination is requested. */
static void *coordinator_func( void *thread_data )
{
	const char *control = (const char *)thread_data;

	coordinator_sock = nn_socket( AF_SP, NN_RESPONDENT );
	if( coordinator_sock < 0 ) {
		int err = errno;
		fprintf( stderr, "ERROR: nn_socket returned %d: %s (%d)\n",
			coordinator_sock, nn_strerror( err ), err );
		return NULL;
	}

	/* keep the endpoint id: nn_shutdown() needs it, not 0 */
	int endpoint = nn_connect( coordinator_sock, control );
	if( endpoint < 0 ) {
		int err = errno;
		fprintf( stderr, "ERROR: nn_connect to %s returned %d: %s (%d)\n",
			control, endpoint, nn_strerror( err ), err );
		(void)nn_close( coordinator_sock );
		return NULL;
	}
	printf( "coordinator connected to %s\n", control );

	while( !coordinator_must_terminate ) {
		printf( "coordinator idle: %d\n", coordinator_must_terminate );

		char *answer = NULL;
		int bytes = nn_recv( coordinator_sock, &answer, NN_MSG, 0 );

		if( coordinator_must_terminate ) {
			if( bytes >= 0 ) {
				nn_freemsg( answer );
			}
			continue;
		}

		if( bytes < 0 ) {
			int err = errno;
			if( err == EAGAIN || err == EINTR ) {
				continue;
			}
			if( err == ETERM ) {
				coordinator_must_terminate = 1;
				continue;
			}
			fprintf( stderr, "ERROR: nn_recv returned %d: %s (%d)\n",
				bytes, nn_strerror( err ), err );
			continue;
		}

		json_object *recv_obj = coordinator_parse_request( answer, (size_t)bytes );
		nn_freemsg( answer );
		if( recv_obj == NULL ) {
			fprintf( stderr, "ERROR: coordinator received an unparsable message\n" );
			continue;
		}
		printf( "coordinator received: %s\n", json_object_to_json_string( recv_obj ) );

		/* the answer copies everything it needs from recv_obj */
		char *msg = coordinator_handle_request( recv_obj );
		json_object_put( recv_obj );

		if( msg != NULL ) {
			if( coordinator_send_answer( msg ) > 0 ) {
				coordinator_must_terminate = 1;
			}
			free( msg );
		}
	}

	(void)nn_shutdown( coordinator_sock, endpoint );
	(void)nn_close( coordinator_sock );
	puts( "coordinator disconnected" );

	return NULL;
}

/* Reset the worker list and start the coordinator thread, which connects to
 * `control`. The string must stay valid while the thread runs (it is not
 * copied). Returns 0 on success, 1 if the thread could not be created. */
int coordinator_init( const char *control )
{
	pthread_attr_t attr;

	coordinator_must_terminate = 0;
	nof_workers = 0;

	if( pthread_attr_init( &attr ) != 0 ) {
		return 1;
	}

	int res = pthread_create( &coordinator_thread, &attr, coordinator_func, (void *)control );
	(void)pthread_attr_destroy( &attr );

	return ( res != 0 ) ? 1 : 0;
}

/* Ask the coordinator thread to stop; nn_term() unblocks a pending nn_recv(). */
void coordinator_terminate( void )
{
	coordinator_must_terminate = 1;
	nn_term( );
}

/* Release all memory owned by the configured workers and empty the list. */
static void free_workers( void )
{
	for( int i = 0; i < nof_workers; i++ ) {
		worker_t *w = &worker[i];

		free( w->name );
		w->name = NULL;
		free( w->command );
		w->command = NULL;
		free( w->execution_data );
		w->execution_data = NULL;
	}
	nof_workers = 0;
}

/* Wait for the coordinator thread to finish and free the worker list.
 * Returns 0 on success, 1 if joining the thread failed. */
int coordinator_free( void )
{
	void *result;

	if( pthread_join( coordinator_thread, &result ) != 0 ) {
		return 1;
	}

	free_workers( );

	return 0;
}

/* Register a worker. `name` and `command` are copied; `control` and `data`
 * are stored as-is and must outlive the worker. A NULL command forces the
 * worker into WORKER_EXECUTION_DISABLED mode.
 * Returns 0 on success, -1 if the table is full or memory runs out. */
int coordinator_add_worker( const char *name, worker_execution_mode_t mode, const char *command, const char *control, const char *data )
{
	if( nof_workers >= MAX_WORKERS ) {
		fprintf( stderr, "Can't define more workers, limit reached!\n" );
		return -1;
	}

	worker_t *w = &worker[nof_workers];

	w->name = strdup( name );
	if( w->name == NULL ) {
		fprintf( stderr, "ERROR: out of memory while adding worker '%s'\n", name );
		return -1;
	}

	w->mode = mode;
	/* always initialise: the slot may hold a stale pointer from an earlier run */
	w->command = NULL;
	if( command != NULL ) {
		w->command = strdup( command );
		if( w->command == NULL ) {
			fprintf( stderr, "ERROR: out of memory while adding worker '%s'\n", name );
			free( w->name );
			w->name = NULL;
			return -1;
		}
	} else {
		w->mode = WORKER_EXECUTION_DISABLED;
	}

	w->state = WORKER_STATE_STOPPED;
	w->execution_data = NULL;
	w->control = control;
	w->data = data;

	nof_workers++;

	return 0;
}