#include "master.h"

#include "pthread.h"

#include "nanomsg/nn.h"
#include "nanomsg/survey.h"
#include "nanomsg/pipeline.h"

#include "json-c/json.h"

#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#ifndef _WIN32
#include <unistd.h>
#endif

/* Survey deadline of the control socket and poll timeout, in milliseconds. */
enum {
	SURVEY_DEADLINE_MS = 5000,
	POLL_TIMEOUT_MS = 5000
};

/* Number of spool lines returned by master_output_tail(). */
enum { TAIL_LINES = 25 };

static pthread_t master_thread;
static int control_sock = -1;
static int data_sock = -1;
/* endpoint IDs returned by nn_bind(); nn_shutdown() needs them */
static int control_eid = -1;
static int data_eid = -1;
static int master_must_terminate = 0;

coordinator_t coordinator[MAX_COORDINATORS];

/* Per-worker spool bookkeeping, stored in worker_t.execution_data. */
typedef struct {
	FILE *spool_file;	/* spool file opened in "a+" mode, or NULL */
	off_t read_pos;		/* offset up to which the spool has been handed out
				   by master_output_outstanding_messages() */
} execution_worker_data_t;

/* Return member 'key' of obj, or NULL if obj is NULL or has no such member. */
static json_object *json_get_field( json_object *obj, const char *key )
{
	json_object *field = NULL;
	if( obj == NULL || !json_object_object_get_ex( obj, key, &field ) ) {
		return NULL;
	}
	return field;
}

/* Return the string value of member 'key' of obj, or NULL if it is missing. */
static const char *json_get_string_field( json_object *obj, const char *key )
{
	return json_object_get_string( json_get_field( obj, key ) );
}

/* Substitute "" for NULL so optional strings can safely be printed with %s. */
static const char *nonnull_str( const char *s )
{
	return s != NULL ? s : "";
}

/* Mark every registered coordinator whose last sign of life is older than
 * MAX_COORDINATOR_AGE seconds as not alive (prints a debug line per
 * coordinator to stderr). */
static void recompute_coordinator_states( void )
{
	time_t now;
	time( &now );

	for( int pos = 0; pos < MAX_COORDINATORS; pos++ ) {
		coordinator_t *c = &coordinator[pos];
		if( !c->used ) {
			continue;
		}
		double age = difftime( now, c->lastAlive );
		fprintf( stderr, "%s %lld %lld %g\n", c->host, (long long)c->lastAlive, (long long)now, age );
		if( age > MAX_COORDINATOR_AGE ) {
			c->alive = false;
		}
	}
}

/* Return the worker of coord called 'name', or NULL if there is none. */
static worker_t *find_worker( coordinator_t *coord, const char *name )
{
	for( int i = 0; i < coord->nof_workers; i++ ) {
		worker_t *w = &coord->worker[i];
		if( w->name != NULL && strcmp( w->name, name ) == 0 ) {
			return w;
		}
	}
	return NULL;
}

/* Apply the "mode", "command" and "state" members of a worker description
 * to w.  A worker without a command is forced to WORKER_EXECUTION_DISABLED
 * (its previous command, if any, is kept). */
static void apply_worker_settings( worker_t *w, json_object *worker_obj )
{
	const char *mode = json_get_string_field( worker_obj, "mode" );
	const char *command = json_get_string_field( worker_obj, "command" );
	const char *state = json_get_string_field( worker_obj, "state" );

	w->mode = worker_execution_mode_from_str( mode );
	if( command != NULL ) {
		char *copy = strdup( command );
		if( copy != NULL ) {
			/* replacing the command on every update used to leak the old one */
			free( (void *)w->command );
			w->command = copy;
		}
	} else {
		w->mode = WORKER_EXECUTION_DISABLED;
	}
	w->state = worker_state_from_str( state );
}

/* Initialise the empty worker slot w from a worker description.
 * Returns 0 on success, -1 if the description has no "name" or memory
 * runs out (w is left untouched in that case). */
static int init_worker( worker_t *w, json_object *worker_obj )
{
	const char *name = json_get_string_field( worker_obj, "name" );
	if( name == NULL ) {
		return -1;
	}

	char *name_copy = strdup( name );
	execution_worker_data_t *wed = malloc( sizeof( *wed ) );
	if( name_copy == NULL || wed == NULL ) {
		free( name_copy );
		free( wed );
		return -1;
	}
	wed->spool_file = NULL;
	wed->read_pos = 0;

	w->name = name_copy;
	w->command = NULL;
	w->execution_data = wed;
	apply_worker_settings( w, worker_obj );
	return 0;
}

/* Refresh the workers of an already registered coordinator from the
 * "workers" array of its latest register message: known workers (matched
 * by name) get mode/command/state updated, unknown ones are added while
 * the per-coordinator limit allows it. */
static void update_workers( coordinator_t *coord, json_object *obj )
{
	json_object *worker_array = json_get_field( obj, "workers" );
	if( worker_array == NULL || !json_object_is_type( worker_array, json_type_array ) ) {
		return;
	}

	int n = (int)json_object_array_length( worker_array );
	for( int i = 0; i < n; i++ ) {
		json_object *worker_obj = json_object_array_get_idx( worker_array, i );
		const char *name = json_get_string_field( worker_obj, "name" );
		if( name == NULL ) {
			continue;
		}

		worker_t *w = find_worker( coord, name );
		if( w != NULL ) {
			apply_worker_settings( w, worker_obj );
		} else if( coord->nof_workers + 1 < MAX_WORKERS ) {
			/* same limit as enforced by register_coordinator() */
			if( init_worker( &coord->worker[coord->nof_workers], worker_obj ) == 0 ) {
				coord->nof_workers++;
			}
		} else {
			fprintf( stderr, "Can't define more workers per coordinator, limit reached!\n" );
		}
	}
	// TODO: check for deleted workers
}

/* Handle a "register" message from a coordinator.  A coordinator whose host
 * is already known is refreshed (alive, timestamp, workers); otherwise it
 * is stored in a free slot together with its workers.
 * Returns 0 on success or when a malformed message is ignored, -1 when the
 * coordinator or worker limit is reached or memory runs out. */
static int register_coordinator( json_object *obj )
{
	const char *host = json_get_string_field( obj, "host" );
	if( host == NULL ) {
		fprintf( stderr, "WARNING: ignoring coordinator registration without a host\n" );
		return 0;
	}

	for( int pos = 0; pos < MAX_COORDINATORS; pos++ ) {
		coordinator_t *coord = &coordinator[pos];
		if( coord->used && strcmp( coord->host, host ) == 0 ) {
			time( &coord->lastAlive );
			coord->alive = true;
			update_workers( coord, obj );
			fprintf( stderr, "master: already registered coordinator from host '%s' (%d)\n", host, pos );
			return 0;
		}
	}

	int pos = 0;
	while( pos < MAX_COORDINATORS && coordinator[pos].used ) {
		pos++;
	}
	if( pos == MAX_COORDINATORS ) {
		fprintf( stderr, "Can't accept more coordinators, limit reached!\n" );
		return -1;
	}

	/* validate the worker count before the slot becomes visible as used */
	json_object *worker_array = json_get_field( obj, "workers" );
	int nof_workers = 0;
	if( worker_array != NULL && json_object_is_type( worker_array, json_type_array ) ) {
		nof_workers = (int)json_object_array_length( worker_array );
	}
	if( nof_workers >= MAX_WORKERS ) {
		fprintf( stderr, "Can't define more workers per coordinator, limit reached!\n" );
		return -1;
	}

	char *host_copy = strdup( host );
	char *os_copy = strdup( nonnull_str( json_get_string_field( obj, "os" ) ) );
	char *arch_copy = strdup( nonnull_str( json_get_string_field( obj, "arch" ) ) );
	if( host_copy == NULL || os_copy == NULL || arch_copy == NULL ) {
		free( host_copy );
		free( os_copy );
		free( arch_copy );
		fprintf( stderr, "ERROR: out of memory while registering coordinator from host '%s'\n", host );
		return -1;
	}

	coordinator_t *coord = &coordinator[pos];
	printf( "master: registering coordinator from host '%s' (%d)\n", host, pos );
	coord->host = host_copy;
	coord->os = os_copy;
	coord->arch = arch_copy;
	coord->cpus = (unsigned int)json_object_get_int( json_get_field( obj, "cpus" ) );
	coord->alive = true;
	time( &coord->lastAlive );

	coord->nof_workers = 0;
	for( int i = 0; i < nof_workers; i++ ) {
		json_object *worker_obj = json_object_array_get_idx( worker_array, i );
		if( init_worker( &coord->worker[coord->nof_workers], worker_obj ) == 0 ) {
			coord->nof_workers++;
		} else {
			fprintf( stderr, "WARNING: ignoring invalid worker %d of coordinator '%s'\n", i, host );
		}
	}

	coord->used = true;
	return 0;
}

/* Copy msg into a heap buffer released with free(); NULL on failure.
 * Plain strdup() is used because allocating with nn_allocmsg() crashed in
 * survey mode. */
static char *alloc_message( const char *msg )
{
	return msg != NULL ? strdup( msg ) : NULL;
}

/* Build the JSON request {"op": op, "role": "master"[, "worker": worker]}.
 * The "worker" member is only added when worker is not NULL.
 * Returns a heap string the caller must free(), or NULL on failure. */
static char *create_request( const char *op, const char *worker )
{
	json_object *obj = json_object_new_object( );
	if( obj == NULL ) {
		return NULL;
	}
	json_object_object_add( obj, "op", json_object_new_string( op ) );
	json_object_object_add( obj, "role", json_object_new_string( "master" ) );
	if( worker != NULL ) {
		json_object_object_add( obj, "worker", json_object_new_string( worker ) );
	}

	char *res = alloc_message( json_object_to_json_string( obj ) );
	json_object_put( obj );
	return res;
}

#define MAX_NOF_COMMANDS 12

static int nof_queued_commands = 0;
char *queued_command[MAX_NOF_COMMANDS];
/* Guards nof_queued_commands/queued_command.  Initialised in master_init()
 * before the master thread starts, so callers can enqueue right away. */
static pthread_mutex_t queued_command_mutex;

/* Append cmd (a heap string) to the command queue, which takes ownership on
 * success.  Returns 0, or -1 if cmd is NULL or the queue is full; on failure
 * ownership stays with the caller. */
static int enqueue_request( char *cmd )
{
	if( cmd == NULL ) {
		return -1;
	}

	pthread_mutex_lock( &queued_command_mutex );
	if( nof_queued_commands >= MAX_NOF_COMMANDS ) {
		pthread_mutex_unlock( &queued_command_mutex );
		fprintf( stderr, "ERROR: internal command queue overflow!\n" );
		return -1;
	}
	queued_command[nof_queued_commands] = cmd;
	nof_queued_commands++;
	pthread_mutex_unlock( &queued_command_mutex );
	return 0;
}

/* Pop the oldest queued command, or build a "discover" request when the
 * queue is empty.  The caller frees the result; NULL on allocation failure. */
static char *get_next_request( void )
{
	char *cmd = NULL;

	pthread_mutex_lock( &queued_command_mutex );
	if( nof_queued_commands > 0 ) {
		cmd = queued_command[0];
		--nof_queued_commands;
		memmove( &queued_command[0], &queued_command[1],
			(size_t)nof_queued_commands * sizeof( queued_command[0] ) );
	}
	pthread_mutex_unlock( &queued_command_mutex );

	return cmd != NULL ? cmd : create_request( "discover", NULL );
}

typedef struct {
	const char *control;
	const char *data;
	const char *spool_dir;
} master_thread_data_t;

static master_thread_data_t master_thread_data;

/* Search all registered coordinators for a worker called 'name'. */
static worker_t *worker_by_name( const char *name )
{
	if( name == NULL ) {
		return NULL;
	}
	for( int c = 0; c < MAX_COORDINATORS; c++ ) {
		if( !coordinator[c].used ) {
			continue;
		}
		worker_t *w = find_worker( &coordinator[c], name );
		if( w != NULL ) {
			return w;
		}
	}
	return NULL;
}

/* Spool bookkeeping of worker 'name', or NULL if the worker is unknown. */
static execution_worker_data_t *spool_data_by_name( const char *name )
{
	worker_t *w = worker_by_name( name );
	if( w == NULL ) {
		return NULL;
	}
	return (execution_worker_data_t *)w->execution_data;
}

/* Position f at its end and return that offset, or -1 on error.  Leaving
 * the stream at a freshly set position also satisfies the C rule that input
 * and output on an update stream must be separated by a positioning call. */
static long spool_seek_end( FILE *f )
{
	if( fseek( f, 0, SEEK_END ) != 0 ) {
		return -1;
	}
	return ftell( f );
}

/* Append src at *cur without writing at or beyond end; the result always
 * stays NUL-terminated and *cur never moves past end - 1. */
static void append_bounded( char **cur, char *end, const char *src )
{
	if( *cur >= end ) {
		return;
	}
	size_t room = (size_t)( end - *cur );
	int n = snprintf( *cur, room, "%s", src );
	if( n < 0 ) {
		return;
	}
	*cur += ( (size_t)n < room ) ? (size_t)n : room - 1;
}

/* (Re)create the spool file '<spool_dir><sep><name>.output' of worker
 * 'name', closing and truncating a previously open one.
 * Returns 0 on success or for unknown workers, -1 on errors. */
static int master_output_init( const char *spool_dir, const char *name )
{
	execution_worker_data_t *wed = spool_data_by_name( name );
	if( wed == NULL ) {
		return 0;
	}

	char filename[1024];
	int n = snprintf( filename, sizeof( filename ), "%s%c%s.output", spool_dir, PORT_DIR_SEPARATOR, name );
	if( n < 0 || (size_t)n >= sizeof( filename ) ) {
		fprintf( stderr, "ERROR: output file name of worker '%s' is too long\n", name );
		return -1;
	}

	if( wed->spool_file != NULL ) {
		fprintf( stderr, "WARNING: truncating old output file of worker '%s'\n", name );
		fclose( wed->spool_file );
		wed->spool_file = NULL;
	}

	remove( filename );
	wed->spool_file = fopen( filename, "a+" );
	wed->read_pos = 0;
	if( wed->spool_file == NULL ) {
		int err = errno;
		fprintf( stderr, "ERROR: can't open output file '%s': %s (%d)\n", filename, strerror( err ), err );
		return -1;
	}
	return 0;
}

/* Close the spool file of worker 'name' once all of it has been handed out
 * by master_output_outstanding_messages(); otherwise keep it open. */
static int master_output_free( const char *name )
{
	execution_worker_data_t *wed = spool_data_by_name( name );
	if( wed == NULL || wed->spool_file == NULL ) {
		return 0;
	}

	long end_pos = spool_seek_end( wed->spool_file );
	if( end_pos >= 0 && wed->read_pos >= (off_t)end_pos ) {
		fclose( wed->spool_file );
		wed->spool_file = NULL;
	}
	return 0;
}

/* Return the spool file of worker 'name' positioned for appending, opening
 * it first if necessary.  NULL for unknown workers or when it can't be opened. */
static FILE *spool_for_writing( const char *spool_dir, const char *name )
{
	execution_worker_data_t *wed = spool_data_by_name( name );
	if( wed == NULL ) {
		return NULL;
	}
	if( wed->spool_file == NULL ) {
		master_output_init( spool_dir, name );
		if( wed->spool_file == NULL ) {
			return NULL;
		}
	}
	/* a read may have happened last: reposition before writing */
	spool_seek_end( wed->spool_file );
	return wed->spool_file;
}

/* Read the numeric "timestamp" member of obj (0 when missing). */
static long long json_get_timestamp( json_object *obj )
{
	return (long long)json_object_get_int64( json_get_field( obj, "timestamp" ) );
}

/* Append an "output" message ("<ts> <worker> <stream> <msg>") to the spool.
 * The message text is written verbatim (it is expected to carry its own
 * newline). */
static int master_output_write( const char *spool_dir, const char *name, json_object *obj )
{
	FILE *f = spool_for_writing( spool_dir, name );
	if( f == NULL ) {
		return 0;
	}

	const char *msg = json_get_string_field( obj, "msg" );
	const char *stream = json_get_string_field( obj, "stream" );

	fprintf( f, "%lld %s %s %s", json_get_timestamp( obj ), name, nonnull_str( stream ), nonnull_str( msg ) );
	fflush( f );
	return 0;
}

/* Append a "--- STARTED <pid> ---" line for worker 'name' to its spool. */
static int master_output_write_started( const char *spool_dir, const char *name, json_object *obj )
{
	FILE *f = spool_for_writing( spool_dir, name );
	if( f == NULL ) {
		return 0;
	}

	const char *pid_str = json_get_string_field( obj, "pid" );

	fprintf( f, "%lld %s --- STARTED %s ---\n", json_get_timestamp( obj ), name, nonnull_str( pid_str ) );
	fflush( f );
	return 0;
}

/* Append a "--- STOPPED <pid> (<exit status>, <exit message>)---" line for
 * worker 'name' to its spool. */
static int master_output_write_terminated( const char *spool_dir, const char *name, json_object *obj )
{
	FILE *f = spool_for_writing( spool_dir, name );
	if( f == NULL ) {
		return 0;
	}

	const char *pid_str = json_get_string_field( obj, "pid" );
	int exit_code = json_object_get_int( json_get_field( obj, "exit_status" ) );
	const char *exit_message = json_get_string_field( obj, "exit_message" );

	fprintf( f, "%lld %s --- STOPPED %s (%d, %s)---\n", json_get_timestamp( obj ), name,
		nonnull_str( pid_str ), exit_code, nonnull_str( exit_message ) );
	fflush( f );
	return 0;
}

/* Copy the spool output of worker 'name' not yet handed out into s (at most
 * len bytes including the terminating NUL) and mark it as handed out.  When
 * the pending output doesn't fit, its oldest part is dropped and the text
 * starts with "..\n".  s is set to "" for unknown workers or no spool. */
void master_output_outstanding_messages( const char *name, char *s, size_t len )
{
	static const char skipped_marker[] = "..\n";
	const size_t marker_len = sizeof( skipped_marker ) - 1;

	if( s == NULL || len == 0 ) {
		return;
	}
	*s = '\0';

	execution_worker_data_t *wed = spool_data_by_name( name );
	if( wed == NULL || wed->spool_file == NULL ) {
		return;
	}

	FILE *f = wed->spool_file;
	long end_pos = spool_seek_end( f );
	if( end_pos < 0 ) {
		return;
	}

	size_t room = len - 1;	/* one byte is reserved for the NUL */
	size_t used = 0;
	off_t pending = (off_t)end_pos - wed->read_pos;
	if( pending > 0 && (unsigned long long)pending > (unsigned long long)room ) {
		if( room > marker_len ) {
			memcpy( s, skipped_marker, marker_len );
			used = marker_len;
		}
		wed->read_pos = (off_t)end_pos - (off_t)( room - used );
	}

	if( fseek( f, (long)wed->read_pos, SEEK_SET ) == 0 ) {
		size_t got = fread( s + used, 1, room - used, f );
		used += got;
		wed->read_pos += (off_t)got;
	}
	s[used] = '\0';

	/* leave the stream ready for the next append */
	spool_seek_end( f );
}

/* Copy the last TAIL_LINES lines of the spool of worker 'name' into s (at
 * most len bytes including the terminating NUL; longer output is cut).
 * Lines longer than 1023 bytes count as several lines.  Does not change
 * what master_output_outstanding_messages() will return. */
void master_output_tail( const char *name, char *s, size_t len )
{
	if( s == NULL || len == 0 ) {
		return;
	}
	*s = '\0';

	execution_worker_data_t *wed = spool_data_by_name( name );
	if( wed == NULL || wed->spool_file == NULL ) {
		return;
	}

	FILE *f = wed->spool_file;
	long end_pos = spool_seek_end( f );
	if( end_pos < 0 || fseek( f, 0, SEEK_SET ) != 0 ) {
		return;
	}

	/* ring buffer of the most recent lines; 'next' is the oldest slot */
	char *ring[TAIL_LINES] = { NULL };
	int next = 0;
	char line[1024];
	long pos = 0;
	while( pos < end_pos && fgets( line, sizeof( line ), f ) != NULL ) {
		free( ring[next] );
		ring[next] = strdup( line );
		next = ( next + 1 ) % TAIL_LINES;
		pos += (long)strlen( line );
	}

	char *cur = s;
	char *end = s + len;
	for( int i = 0; i < TAIL_LINES; i++ ) {
		char *l = ring[( next + i ) % TAIL_LINES];
		if( l != NULL ) {
			append_bounded( &cur, end, l );
			free( l );
		}
	}

	/* leave the stream ready for the next append */
	spool_seek_end( f );
}

/* Create and bind the surveyor (control) and pull (data) sockets.
 * Aborts if a socket can't be created or configured; bind failures are
 * reported and leave the corresponding endpoint ID at -1. */
static void master_open_sockets( const char *control, const char *data )
{
	if( ( control_sock = nn_socket( AF_SP, NN_SURVEYOR ) ) < 0 ) {
		fprintf( stderr, "master, nn_socket( AF_SP, NN_SURVEYOR ) error: %s (%d)\n", strerror( errno ), errno );
		abort( );
	}

	int deadline = SURVEY_DEADLINE_MS;
	if( nn_setsockopt( control_sock, NN_SURVEYOR, NN_SURVEYOR_DEADLINE, &deadline, sizeof( deadline ) ) < 0 ) {
		fprintf( stderr, "master, nn_setsockopt error: %s (%d)\n", strerror( errno ), errno );
		abort( );
	}

	control_eid = nn_bind( control_sock, control );
	if( control_eid < 0 ) {
		int err = errno;
		fprintf( stderr, "master, nn_bind( '%s' ) error: %s (%d)\n", control, nn_strerror( err ), err );
	} else {
		printf( "master connected to control channel '%s'\n", control );
	}

	if( ( data_sock = nn_socket( AF_SP, NN_PULL ) ) < 0 ) {
		fprintf( stderr, "master, nn_socket( AF_SP, NN_PULL ) error: %s (%d)\n", strerror( errno ), errno );
		abort( );
	}

	data_eid = nn_bind( data_sock, data );
	if( data_eid < 0 ) {
		int err = errno;
		fprintf( stderr, "master, nn_bind( '%s' ) error: %s (%d)\n", data, nn_strerror( err ), err );
	} else {
		printf( "master connected to data channel '%s'\n", data );
	}
}

/* Remove the bound endpoints and close both sockets. */
static void master_close_sockets( void )
{
	if( data_eid >= 0 ) {
		(void)nn_shutdown( data_sock, data_eid );
	}
	if( control_eid >= 0 ) {
		(void)nn_shutdown( control_sock, control_eid );
	}
	if( data_sock >= 0 ) {
		(void)nn_close( data_sock );
	}
	if( control_sock >= 0 ) {
		(void)nn_close( control_sock );
	}
	data_eid = control_eid = -1;
	data_sock = control_sock = -1;
}

/* Send the next queued command (or a "discover" survey) on the control
 * socket.  Sets master_must_terminate when nanomsg is being terminated. */
static void master_send_next_request( void )
{
	char *msg = get_next_request( );
	if( msg == NULL ) {
		fprintf( stderr, "ERROR: can't create request message\n" );
		return;
	}

	int msg_size = (int)strlen( msg ) + 1;
	printf( "master send: %s\n", msg );
	int bytes = nn_send( control_sock, msg, (size_t)msg_size, 0 );
	if( bytes < 0 ) {
		int err = errno;
		if( err == ETERM ) {
			master_must_terminate = 1;
		} else {
			fprintf( stderr, "ERROR: nn_send returned %d: %s (%d)\n", bytes, nn_strerror( err ), err );
		}
	} else if( bytes != msg_size ) {
		fprintf( stderr, "ERROR: truncated message!\n" );
	}

	/* with an explicit length nn_send() copies the data, msg stays ours */
	free( msg );
}

/* Parse one message received on either socket and dispatch it on its "op"
 * member.  buf holds exactly len bytes as delivered by nn_recv(). */
static void master_handle_message( const master_thread_data_t *tdata, const char *buf, size_t len )
{
	/* nn_recv() delivers exactly the bytes the peer sent, which need not
	 * include a terminating NUL: parse a terminated copy */
	char *text = malloc( len + 1 );
	if( text == NULL ) {
		fprintf( stderr, "ERROR: out of memory while receiving a message\n" );
		return;
	}
	if( len > 0 ) {
		memcpy( text, buf, len );
	}
	text[len] = '\0';

	json_object *recv_obj = json_tokener_parse( text );
	if( recv_obj == NULL ) {
		fprintf( stderr, "WARNING: master received unparsable message: %s\n", text );
		free( text );
		return;
	}
	printf( "master received: %s\n", json_object_to_json_string( recv_obj ) );

	const char *op = json_get_string_field( recv_obj, "op" );
	const char *worker = json_get_string_field( recv_obj, "worker" );

	if( op == NULL ) {
		fprintf( stderr, "WARNING: master received unkown message: %s\n", text );
	} else if( strcmp( op, "register" ) == 0 ) {
		const char *role = json_get_string_field( recv_obj, "role" );
		if( role != NULL && strcmp( role, "coordinator" ) == 0 ) {
			if( register_coordinator( recv_obj ) < 0 ) {
				fprintf( stderr, "Terminating master because out of resources!\n" );
				master_must_terminate = 1;
			}
		}
	} else if( strcmp( op, "started" ) == 0 ) {
		// TODO: send confirmation to cli
	} else if( strcmp( op, "stopped" ) == 0 ) {
		// TODO: send confirmation to cli
	} else if( strcmp( op, "created" ) == 0 ) {
		master_output_init( tdata->spool_dir, worker );
		master_output_write_started( tdata->spool_dir, worker, recv_obj );
	} else if( strcmp( op, "terminated" ) == 0 ) {
		master_output_write_terminated( tdata->spool_dir, worker, recv_obj );
		master_output_free( worker );
	} else if( strcmp( op, "output" ) == 0 ) {
		master_output_write( tdata->spool_dir, worker, recv_obj );
	} else {
		fprintf( stderr, "WARNING: master received unkown message: %s\n", text );
	}

	json_object_put( recv_obj );
	free( text );
}

/* Poll both sockets and dispatch incoming messages until it is time to send
 * the next request (poll timeout or end of the survey) or the master has to
 * terminate. */
static void master_receive_answers( const master_thread_data_t *tdata )
{
	while( !master_must_terminate ) {
		printf( "master idle: %d\n", master_must_terminate );
		recompute_coordinator_states( );

		struct nn_pollfd pollfd[2];
		pollfd[0].fd = control_sock;
		pollfd[0].events = NN_POLLIN;
		pollfd[0].revents = 0;
		pollfd[1].fd = data_sock;
		pollfd[1].events = NN_POLLIN;
		pollfd[1].revents = 0;

		int rc = nn_poll( pollfd, 2, POLL_TIMEOUT_MS );
		if( rc == 0 ) {
			return;	/* timeout: send the next request */
		}
		if( rc < 0 ) {
			int err = errno;
			if( err == ETERM ) {
				/* nn_term() was called by master_terminate() */
				master_must_terminate = 1;
				return;
			}
			if( err == EINTR ) {
				continue;
			}
			fprintf( stderr, "master, nn_poll error: %s (%d)\n", strerror( err ), err );
			abort( );
		}

		/* the control socket takes precedence when both are readable */
		int sock;
		if( pollfd[0].revents & NN_POLLIN ) {
			sock = control_sock;
		} else if( pollfd[1].revents & NN_POLLIN ) {
			sock = data_sock;
		} else {
			continue;
		}

		char *answer = NULL;
		int bytes = nn_recv( sock, &answer, NN_MSG, 0 );
		if( bytes >= 0 ) {
			if( !master_must_terminate ) {
				master_handle_message( tdata, answer, (size_t)bytes );
			}
			nn_freemsg( answer );
			continue;
		}

		int err = errno;
		/* documentation says we should get a ETIMEDOUT, but in
		 * fact we get EFSM here, change if fixed in nanomsg */
		if( err == EFSM /* ETIMEDOUT */ ) {
			return;
		} else if( err == EAGAIN || err == EINTR ) {
			continue;
		} else if( err == ETERM ) {
			master_must_terminate = 1;
			return;
		} else {
			fprintf( stderr, "ERROR: nn_recv returned %d: %s (%d)\n", bytes, nn_strerror( err ), err );
		}
	}
}

/* Master thread: alternate between sending a request to the coordinators
 * and collecting their answers until master_must_terminate is set. */
static void *master_func( void *thread_data )
{
	master_thread_data_t *tdata = (master_thread_data_t *)thread_data;

	master_open_sockets( tdata->control, tdata->data );

	while( !master_must_terminate ) {
		master_send_next_request( );
		master_receive_answers( tdata );
	}

	master_close_sockets( );
	puts( "master disconnected" );
	return NULL;
}

/* Reset the coordinator table and start the master thread.  The strings
 * must stay valid while the thread runs.  Returns 0 on success, 1 on error. */
int master_init( const char *control, const char *data, const char *spool_dir )
{
	pthread_attr_t attr;
	int res;

	master_must_terminate = 0;
	memset( coordinator, 0, sizeof( coordinator ) );

	/* before the thread exists, so master_start_worker() can't race it */
	if( pthread_mutex_init( &queued_command_mutex, NULL ) != 0 ) {
		return 1;
	}

	res = pthread_attr_init( &attr );
	if( res != 0 ) {
		return 1;
	}

	master_thread_data.control = control;
	master_thread_data.data = data;
	master_thread_data.spool_dir = spool_dir;

	res = pthread_create( &master_thread, &attr, master_func, (void *)&master_thread_data );
	pthread_attr_destroy( &attr );
	if( res != 0 ) {
		return 1;
	}

	return 0;
}

/* Ask the master thread to stop; with terminate_nanomsg set, nn_term() also
 * unblocks any nanomsg call the thread is waiting in. */
void master_terminate( int terminate_nanomsg )
{
	master_must_terminate = 1;
	if( terminate_nanomsg ) {
		nn_term( );
	}
}

/* Wait for the master thread to finish.  Returns 0 on success, 1 on error. */
int master_free( void )
{
	void *result;
	int res;

	res = pthread_join( master_thread, &result );
	if( res != 0 ) {
		return 1;
	}

	return 0;
}

/* Queue an {"op": op, "worker": name} request for the master thread.
 * Returns 0 on success, -1 if name is NULL, memory runs out or the queue
 * is full. */
static int enqueue_worker_request( const char *op, const char *name )
{
	if( name == NULL ) {
		return -1;
	}
	char *msg = create_request( op, name );
	if( msg == NULL ) {
		return -1;
	}
	if( enqueue_request( msg ) < 0 ) {
		free( msg );
		return -1;
	}
	return 0;
}

/* Request that worker 'name' be started (0 on success, -1 on error). */
int master_start_worker( const char *name )
{
	return enqueue_worker_request( "start", name );
}

/* Request that worker 'name' be stopped (0 on success, -1 on error). */
int master_stop_worker( const char *name )
{
	return enqueue_worker_request( "stop", name );
}