#include "coordinator.h" #include "pthread.h" #include "nanomsg/nn.h" #include "nanomsg/survey.h" #include "json-c/json.h" #include #include #ifdef _WIN32 #define WIN32_MEAN_AND_LEAN #include #else #include #endif #include "sleep.h" #include "system.h" static pthread_t coordinator_thread; static int coordinator_sock; static int coordinator_must_terminate = 0; static void *coordinator_func( void *thread_data ) { char *control = (char *)thread_data; coordinator_sock = nn_socket( AF_SP, NN_RESPONDENT ); (void)nn_connect( coordinator_sock, control ); printf( "coordinator connected to %s\n", control ); sleep( 1 ); while( !coordinator_must_terminate ) { printf( "coordinator idle: %d\n", coordinator_must_terminate ); sleep( 1 ); char *answer = NULL; int bytes = nn_recv( coordinator_sock, &answer, NN_MSG, 0 ); if( coordinator_must_terminate ) continue; if( bytes >= 0 ) { printf( "coordinator received: %s\n", answer ); nn_freemsg( answer ); json_object *obj = json_object_new_object( ); json_object *op = json_object_new_string( "register" ); json_object_object_add( obj, "op", op ); json_object *role = json_object_new_string( "coordinator" ); json_object_object_add( obj, "role", role ); char hostname[100]; gethostname( hostname, 100 ); json_object *host = json_object_new_string( hostname ); json_object_object_add( obj, "host", host ); unsigned int nofCpus = available_cpus( ); json_object *cpus = json_object_new_int( nofCpus ); json_object_object_add( obj, "cpus", cpus ); const char *msg = json_object_to_json_string( obj ); int msg_size = strlen( msg ) + 1; printf( "coordinator send: %s\n", msg ); bytes = nn_send( coordinator_sock, msg, msg_size, 0 ); if( bytes < 0 ) { if( errno == ETERM ) { coordinator_must_terminate = 1; continue; } else { fprintf( stderr, "ERROR: nn_send returned %d: %s (%d)\n", bytes, nn_strerror( errno ), errno ); } } if( bytes != msg_size ) { fprintf( stderr, "ERROR: truncated message!" ); } json_object_put( obj ); } if( bytes < 0 ) { if( errno == EAGAIN || errno == EINTR ) { continue; } else if( errno == ETERM ) { coordinator_must_terminate = 1; continue; } else { fprintf( stderr, "ERROR: nn_recv returned %d: %s (%d)\n", bytes, nn_strerror( errno ), errno ); } } } (void)nn_shutdown( coordinator_sock, 0 ); puts( "coordinator disconnected" ); return NULL; } int coordinator_init( const char *control ) { pthread_attr_t attr; int res; coordinator_must_terminate = 0; res = pthread_attr_init( &attr ); if( res != 0 ) { return 1; } res = pthread_create( &coordinator_thread, &attr, coordinator_func, (void *)control ); if( res != 0 ) { return 1; } return 0; } void coordinator_terminate( ) { coordinator_must_terminate = 1; nn_term( ); } int coordinator_free( ) { void *result; int res; res = pthread_join( coordinator_thread, &result ); if( res != 0 ) { return 1; } return 0; }