/*
 * Direct worker execution.
 *
 * A worker's command line is expanded, spawned as a child process through
 * GLib and supervised from a dedicated thread. Output lines of the child and
 * its life-cycle events ("created", "terminated") are encoded as JSON and
 * pushed over a nanomsg NN_PUSH socket connected to worker->data.
 */

#include "worker.h"

#include "nanomsg/nn.h"
#include "nanomsg/pipeline.h"

#include "port.h"

#include "pthread.h"

/* NOTE(review): the names of the system headers were missing from the copy of
 * this file that was reviewed; the list below was rebuilt from the symbols the
 * code uses - please confirm against the repository version. strcasecmp, bool,
 * PRIgid and (on Windows) TerminateProcess are assumed to be provided by the
 * project headers above, as before. */
#include <glib.h>
#include <stdio.h>
#include <stdlib.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <string.h>
#include <errno.h>
#include <ctype.h>
#include <stdint.h>
#include <time.h>

#include "json-c/json.h"

#include "system.h"

/* Returns the textual name of an execution mode, "" for unknown values.
 * (The misspelt function name is part of the public interface.) */
const char *worker_exection_mode_str( worker_execution_mode_t mode )
{
	switch( mode ) {
		case WORKER_EXECUTION_DISABLED:
			return "disabled";

		case WORKER_EXECUTION_DIRECT:
			return "direct";

		default:
			return "";
	}
}

/* Returns the textual name of a worker state, "" for unknown values. */
const char *worker_state_str( worker_state_t state )
{
	switch( state ) {
		case WORKER_STATE_STOPPED:
			return "stopped";

		case WORKER_STATE_RUNNING:
			return "running";

		default:
			return "";
	}
}

/* Parses an execution mode (case-insensitive). Unknown or NULL input is
 * reported on stderr and mapped to WORKER_EXECUTION_DISABLED. */
worker_execution_mode_t worker_execution_mode_from_str( const char *s )
{
	if( s != NULL ) {
		if( strcasecmp( s, "disabled" ) == 0 ) {
			return WORKER_EXECUTION_DISABLED;
		}
		if( strcasecmp( s, "direct" ) == 0 ) {
			return WORKER_EXECUTION_DIRECT;
		}
	}

	fprintf( stderr, "Warning: unknown worker execution mode '%s'!\n",
		( s != NULL ) ? s : "(null)" );
	return WORKER_EXECUTION_DISABLED;
}

/* Parses a worker state (case-insensitive). Unknown or NULL input is
 * reported on stderr and mapped to WORKER_STATE_STOPPED. */
worker_state_t worker_state_from_str( const char *s )
{
	if( s != NULL ) {
		if( strcasecmp( s, "stopped" ) == 0 ) {
			return WORKER_STATE_STOPPED;
		}
		if( strcasecmp( s, "running" ) == 0 ) {
			return WORKER_STATE_RUNNING;
		}
	}

	fprintf( stderr, "Warning: unknown worker state '%s' (assuming 'stopped')!\n",
		( s != NULL ) ? s : "(null)" );
	return WORKER_STATE_STOPPED;
}

/* Per-worker bookkeeping of the supervising thread. Allocated by
 * worker_init(), released by worker_func() when the thread ends. */
typedef struct {
	GPid pid;                 /* child process */
	gint out, err;            /* pipe ends for the child's stdout/stderr */
	GIOChannel *out_channel;  /* NULL once the stdout watch has ended */
	GIOChannel *err_channel;  /* NULL once the stderr watch has ended */
	worker_t *worker;
	GMainLoop *main_loop;
	int sock;                 /* nanomsg socket, -1 while not open */
	int endpoint;             /* endpoint id from nn_connect, -1 if none */
} direct_glib_execution_worker_data_t;

/* Starts a message object with the fields common to all worker messages:
 * "op", "role" (always "worker") and "worker" (the worker's name). */
static json_object *new_worker_message( const char *op, const char *name )
{
	json_object *obj = json_object_new_object( );

	json_object_object_add( obj, "op", json_object_new_string( op ) );
	json_object_object_add( obj, "role", json_object_new_string( "worker" ) );
	json_object_object_add( obj, "worker", json_object_new_string( name ) );

	return obj;
}

/* Adds the child's PID, formatted with PRIgid, as the string field "pid". */
static void add_pid_to_message( json_object *obj, GPid pid )
{
	char pid_str[64];

	snprintf( pid_str, sizeof( pid_str ), PRIgid, pid );
	json_object_object_add( obj, "pid", json_object_new_string( pid_str ) );
}

/* Stamps the message with the current time, serializes it and releases the
 * object. Returns a malloc'ed string the caller must free, or NULL if the
 * string could not be duplicated. */
static char *finish_worker_message( json_object *obj )
{
	/* 64-bit so the timestamp is not truncated to int32 */
	json_object_object_add( obj, "timestamp",
		json_object_new_int64( (int64_t)time( NULL ) ) );

	const char *msg = json_object_to_json_string( obj );
	char *res = ( msg != NULL ) ? strdup( msg ) : NULL;

	json_object_put( obj );

	return res;
}

/* Builds the "output" message for one chunk of child output (s, len) read
 * from stdout or stderr. Caller frees the result. */
static char *create_worker_output_answer( const char *name, const char *s, size_t len, bool is_stdout )
{
	json_object *obj = new_worker_message( "output", name );

	json_object_object_add( obj, "msg", json_object_new_string_len( s, (int)len ) );
	json_object_object_add( obj, "stream",
		json_object_new_string( is_stdout ? "stdout" : "stderr" ) );

	return finish_worker_message( obj );
}

/* Builds the "terminated" message carrying PID, raw exit status and a
 * human readable exit message. Caller frees the result. */
static char *create_worker_termination_answer( const char *name, GPid pid, gint status, const char *exit_message )
{
	json_object *obj = new_worker_message( "terminated", name );

	add_pid_to_message( obj, pid );
	json_object_object_add( obj, "exit_status", json_object_new_int( status ) );
	json_object_object_add( obj, "exit_message", json_object_new_string( exit_message ) );

	return finish_worker_message( obj );
}

/* Builds the "created" message carrying the PID of the new child.
 * Caller frees the result. */
static char *create_worker_created_answer( const char *name, GPid pid )
{
	json_object *obj = new_worker_message( "created", name );

	add_pid_to_message( obj, pid );

	return finish_worker_message( obj );
}

/* Sends msg, including its terminating NUL, over the worker's nanomsg
 * socket. Returns 0 on success, -1 on failure (reported on stderr). */
static int worker_send_message( worker_t *worker, const char *msg )
{
	direct_glib_execution_worker_data_t *wed =
		(direct_glib_execution_worker_data_t *)worker->execution_data;

	if( msg == NULL ) {
		/* the create_worker_*_answer functions return NULL when strdup fails */
		fprintf( stderr, "ERROR: worker '%s' has no message to send (out of memory?)\n",
			worker->name );
		return -1;
	}

	size_t msg_size = strlen( msg ) + 1;

	printf( "worker send: %s\n", msg );

	int bytes = nn_send( wed->sock, msg, msg_size, 0 );
	if( bytes < 0 ) {
		int err = errno;
		fprintf( stderr, "ERROR: worker '%s' nn_send returned %d: %s (%d)\n",
			worker->name, bytes, nn_strerror( err ), err );
		return -1;
	}

	if( (size_t)bytes != msg_size ) {
		fprintf( stderr, "ERROR: worker '%s' truncated message!\n", worker->name );
		return -1;
	}

	return 0;
}

/* GChildWatchFunc: called when the child terminates. Reports the
 * termination and stops the worker's main loop. */
static void watch_child( GPid pid, gint status, gpointer data )
{
	direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data;
	worker_t *worker = wed->worker;
	const char *exit_message = "";
	GError *error = NULL;

#if( GLIB_CHECK_VERSION( 2, 34, 0 ) )
	/* g_spawn_check_exit_status returns FALSE and fills in the error
	 * when the child did not terminate successfully */
	if( !g_spawn_check_exit_status( status, &error ) && error != NULL ) {
		exit_message = error->message;
	}
#else
	// TODO: do it in the old platform dependent way
#endif

	printf( "Worker child with PID " PRIgid " terminated with status %d: %s.\n",
		wed->pid, status, exit_message );

	char *msg = create_worker_termination_answer( worker->name, wed->pid, status, exit_message );
	worker_send_message( worker, msg );
	free( msg );

	/* exit_message may point into the error, so release it last */
	g_clear_error( &error );

	g_spawn_close_pid( pid );

	g_main_loop_quit( wed->main_loop );
}

/* GIOFunc: called when the child's stdout or stderr pipe is readable or has
 * been hung up. Forwards one line per call; returns FALSE (which removes the
 * watch) once the stream has ended or failed. */
static gboolean watch_output( GIOChannel *channel, GIOCondition condition, gpointer data )
{
	direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)data;
	worker_t *worker = wed->worker;
	bool is_stdout = ( wed->out_channel == channel );

	/* Read first: G_IO_IN and G_IO_HUP can be reported together and the
	 * data still pending in the pipe must not be dropped. */
	if( ( condition & G_IO_IN ) != 0 ) {
		gchar *buf = NULL;
		gsize len = 0;
		GError *error = NULL;

		GIOStatus status = g_io_channel_read_line( channel, &buf, &len, NULL, &error );

		if( status == G_IO_STATUS_NORMAL && buf != NULL ) {
			char *msg = create_worker_output_answer( worker->name, buf, len, is_stdout );
			worker_send_message( worker, msg );
			free( msg );
		}
		g_free( buf );

		if( error != NULL ) {
			fprintf( stderr, "ERROR: worker '%s' failed to read %s of child: %s\n",
				worker->name, is_stdout ? "stdout" : "stderr", error->message );
			g_error_free( error );
		}

		if( status == G_IO_STATUS_NORMAL || status == G_IO_STATUS_AGAIN ) {
			return TRUE;
		}
		/* G_IO_STATUS_EOF or G_IO_STATUS_ERROR: stop watching */
	} else if( ( condition & G_IO_HUP ) == 0 ) {
		/* neither readable nor hung up, keep waiting */
		return TRUE;
	}

	/* mark the channel as finished so worker_func does not release it again */
	if( is_stdout ) {
		wed->out_channel = NULL;
	} else {
		wed->err_channel = NULL;
	}
	g_io_channel_unref( channel );

	return FALSE;
}

/* Minimal growable, always NUL-terminated string. */
typedef struct {
	char *data;
	size_t len;  /* used bytes, without the terminating NUL */
	size_t cap;  /* allocated bytes, always >= len + 1 */
} string_buffer_t;

/* Appends n bytes of s, growing the buffer when necessary. Returns false
 * (leaving the buffer untouched) when memory cannot be obtained. */
static bool string_buffer_append( string_buffer_t *b, const char *s, size_t n )
{
	if( n >= b->cap - b->len ) {
		size_t new_cap = b->cap;
		while( n >= new_cap - b->len ) {
			if( new_cap > SIZE_MAX / 2 ) {
				return false;
			}
			new_cap *= 2;
		}
		char *tmp = realloc( b->data, new_cap );
		if( tmp == NULL ) {
			return false;
		}
		b->data = tmp;
		b->cap = new_cap;
	}

	memcpy( b->data + b->len, s, n );
	b->len += n;
	b->data[b->len] = '\0';

	return true;
}

/* True if the name_len bytes at name spell exactly the literal. */
static bool variable_name_is( const char *name, size_t name_len, const char *literal )
{
	return strlen( literal ) == name_len && memcmp( name, literal, name_len ) == 0;
}

/* Stores the value of the system variable called name in value. Returns
 * false if the variable is not known. */
static bool lookup_system_variable( const char *name, size_t name_len, char *value, size_t value_size )
{
	/* defined content even if a system_* call leaves the buffer untouched */
	value[0] = '\0';

	if( variable_name_is( name, name_len, "ARCH" ) ) {
		system_arch( value, value_size );
	} else if( variable_name_is( name, name_len, "OS" ) ) {
		system_os( value, value_size );
	} else if( variable_name_is( name, name_len, "HOST" ) ) {
		system_hostname( value, value_size );
	} else if( variable_name_is( name, name_len, "CPUS" ) ) {
		unsigned int nofCpus = system_available_cpus( );
		snprintf( value, value_size, "%u", nofCpus );
	} else if( variable_name_is( name, name_len, "PHYSICAL_MEMORY" ) ) {
		unsigned int physMem = system_phys_memory( );
		snprintf( value, value_size, "%u", physMem );
	} else {
		return false;
	}

	return true;
}

/* Replaces every ${NAME} reference in s by the value of the system variable
 * NAME (ARCH, OS, HOST, CPUS, PHYSICAL_MEMORY). A name starts with a letter
 * and continues with letters, digits or '_'.
 *
 * Returns a malloc'ed string the caller must free, or NULL after reporting
 * the problem on stderr (syntax error, unknown variable, out of memory). */
static char *expand_system_variables( const char *s )
{
	string_buffer_t out;

	out.len = 0;
	out.cap = strlen( s ) + 1;
	out.data = malloc( out.cap );
	if( out.data == NULL ) {
		fprintf( stderr, "ERROR: out of memory while substituting variables in '%s'\n", s );
		return NULL;
	}
	out.data[0] = '\0';

	const char *p = s;
	while( *p != '\0' ) {
		if( *p != '$' ) {
			/* literal text: copy everything up to the next '$' */
			size_t run = strcspn( p, "$" );
			if( !string_buffer_append( &out, p, run ) ) {
				goto out_of_memory;
			}
			p += run;
			continue;
		}

		const char *brace = p + 1;
		if( *brace == '\0' ) {
			goto unterminated;
		}
		if( *brace != '{' ) {
			fprintf( stderr, "ERROR: Expected '{' after '$' at position %d in '%s'\n",
				(int)( brace - s ), s );
			goto fail;
		}

		const char *var = brace + 1;
		if( *var == '}' ) {
			fprintf( stderr, "ERROR: Empty variable declaration '${}' at position %d in '%s'\n",
				(int)( var - s ), s );
			goto fail;
		}

		const char *end = var;
		if( isalpha( (unsigned char)*end ) ) {
			end++;
			while( isalnum( (unsigned char)*end ) || *end == '_' ) {
				end++;
			}
		}
		if( *end == '\0' ) {
			goto unterminated;
		}
		if( *end != '}' ) {
			fprintf( stderr, "ERROR: Illegal character '%c' at position %d in variable in '%s'\n",
				*end, (int)( end - s ), s );
			goto fail;
		}

		char value[100];
		if( !lookup_system_variable( var, (size_t)( end - var ), value, sizeof( value ) ) ) {
			fprintf( stderr, "ERROR: Unknown variable '${%.*s}' at position %d in '%s'\n",
				(int)( end - var ), var, (int)( end - s ), s );
			goto fail;
		}
		if( !string_buffer_append( &out, value, strlen( value ) ) ) {
			goto out_of_memory;
		}

		p = end + 1;
	}

	return out.data;

unterminated:
	fprintf( stderr, "ERROR: unterminated variable reference in '%s'\n", s );
	goto fail;

out_of_memory:
	fprintf( stderr, "ERROR: out of memory while substituting variables in '%s'\n", s );

fail:
	free( out.data );
	return NULL;
}

/* Thread entry point of a worker: spawns the child, announces it, then runs
 * a GLib main loop until watch_child() reports the termination. Owns and
 * finally frees the execution data passed in thread_data.
 *
 * NOTE(review): all sources are attached to the default GMainContext, so
 * several workers running at the same time share one context across
 * threads, and worker_terminate() may read the execution data while this
 * thread frees it. Neither is addressed here. */
static void *worker_func( void *thread_data )
{
	direct_glib_execution_worker_data_t *wed = (direct_glib_execution_worker_data_t *)thread_data;
	worker_t *worker = wed->worker;
	char *command = NULL;
	gchar **args = NULL;
	gint nof_args = 0;
	GError *error = NULL;

	command = expand_system_variables( worker->command );
	if( command == NULL ) {
		fprintf( stderr, "ERROR: Failed to substitute variables in command of worker '%s', '%s'\n",
			worker->name, worker->command );
		goto cleanup;
	}

	if( !g_shell_parse_argv( command, &nof_args, &args, &error ) ) {
		fprintf( stderr, "ERROR: Failed to parse command of worker '%s', '%s': %s\n",
			worker->name, command, ( error != NULL ) ? error->message : "unknown error" );
		goto cleanup;
	}

	wed->sock = nn_socket( AF_SP, NN_PUSH );
	if( wed->sock < 0 ) {
		int err = errno;
		fprintf( stderr, "ERROR: worker '%s' could not create data socket: %s (%d)\n",
			worker->name, nn_strerror( err ), err );
		goto cleanup;
	}

	wed->endpoint = nn_connect( wed->sock, worker->data );
	if( wed->endpoint < 0 ) {
		int err = errno;
		fprintf( stderr, "ERROR: worker '%s' could not connect to data channel '%s': %s (%d)\n",
			worker->name, worker->data, nn_strerror( err ), err );
		goto cleanup;
	}

	printf( "worker connected to data channel '%s'\n", worker->data );

	gboolean ret = g_spawn_async_with_pipes( NULL,
		args,
		NULL,
		G_SPAWN_DO_NOT_REAP_CHILD,
		NULL,
		NULL,
		&wed->pid,
		NULL,
		&wed->out,
		&wed->err,
		&error );
	if( !ret ) {
		fprintf( stderr, "ERROR: Starting worker failed: %s\n",
			( error != NULL ) ? error->message : "unknown error" );
		goto cleanup;
	}

	printf( "Worker child with PID " PRIgid " created: %s.\n", wed->pid, command );

	char *msg = create_worker_created_answer( worker->name, wed->pid );
	worker_send_message( worker, msg );
	free( msg );

	GMainContext *context = g_main_context_default( );
	wed->main_loop = g_main_loop_new( context, TRUE );

	g_child_watch_add( wed->pid, watch_child, wed );

#ifdef G_OS_WIN32
	wed->out_channel = g_io_channel_win32_new_fd( wed->out );
	wed->err_channel = g_io_channel_win32_new_fd( wed->err );
#else
	wed->out_channel = g_io_channel_unix_new( wed->out );
	wed->err_channel = g_io_channel_unix_new( wed->err );
#endif
	/* let the final unref of a channel close its pipe descriptor */
	g_io_channel_set_close_on_unref( wed->out_channel, TRUE );
	g_io_channel_set_close_on_unref( wed->err_channel, TRUE );

	GIOCondition cond = (GIOCondition)( G_IO_IN | G_IO_HUP );
	guint out_watch = g_io_add_watch( wed->out_channel, cond, watch_output, wed );
	guint err_watch = g_io_add_watch( wed->err_channel, cond, watch_output, wed );

	worker->state = WORKER_STATE_RUNNING;

	printf( "worker %s: entering glib worker main loop..\n", worker->name );
	g_main_loop_run( wed->main_loop );
	printf( "worker %s: leaving glib worker main loop..\n", worker->name );

	worker->state = WORKER_STATE_STOPPED;

	/* A watch whose channel is still set has not ended by itself; remove
	 * it so it cannot fire later with the execution data already freed. */
	if( wed->out_channel != NULL ) {
		g_source_remove( out_watch );
		g_io_channel_unref( wed->out_channel );
		wed->out_channel = NULL;
	}
	if( wed->err_channel != NULL ) {
		g_source_remove( err_watch );
		g_io_channel_unref( wed->err_channel );
		wed->err_channel = NULL;
	}

	g_main_loop_unref( wed->main_loop );
	wed->main_loop = NULL;

cleanup:
	g_clear_error( &error );

	if( wed->sock >= 0 ) {
		if( wed->endpoint >= 0 ) {
			/* nn_shutdown expects the endpoint id returned by nn_connect */
			(void)nn_shutdown( wed->sock, wed->endpoint );
		}
		(void)nn_close( wed->sock );
		puts( "worker disconnected" );
	}

	g_strfreev( args );
	free( command );

	worker->execution_data = NULL;
	free( wed );

	return NULL;
}

/* Starts the worker in a detached supervising thread.
 *
 * Returns 0 if the thread was started, or if there was nothing to do because
 * the worker is already running or disabled; 1 if the thread could not be
 * set up. */
int worker_init( worker_t *worker )
{
	switch( worker->state ) {
		case WORKER_STATE_RUNNING:
			fprintf( stderr, "worker %s is already running\n", worker->name );
			return 0;

		case WORKER_STATE_STOPPED:
		default:
			// execute, skip
			break;
	}

	switch( worker->mode ) {
		case WORKER_EXECUTION_DISABLED:
			fprintf( stderr, "worker %s is disabled, won't execute\n", worker->name );
			return 0;

		case WORKER_EXECUTION_DIRECT:
		default:
			// execute, skip
			break;
	}

	direct_glib_execution_worker_data_t *wed = calloc( 1, sizeof( *wed ) );
	if( wed == NULL ) {
		fprintf( stderr, "ERROR: out of memory while starting worker %s\n", worker->name );
		return 1;
	}
	wed->worker = worker;
	wed->sock = -1;
	wed->endpoint = -1;
	worker->execution_data = wed;

	pthread_attr_t attr;
	int res = pthread_attr_init( &attr );
	if( res == 0 ) {
		/* Created detached and with a local thread id: the thread frees
		 * wed on its own, so wed must not be touched after the thread
		 * has been started. */
		pthread_t thread;
		res = pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED );
		if( res == 0 ) {
			res = pthread_create( &thread, &attr, worker_func, wed );
		}
		pthread_attr_destroy( &attr );
	}

	if( res != 0 ) {
		/* the thread was never started, so wed is still owned here */
		worker->execution_data = NULL;
		free( wed );
		return 1;
	}

	return 0;
}

/* Asks a running worker's child process to terminate (SIGTERM, or
 * TerminateProcess on Windows). Does not wait for the child to exit. */
void worker_terminate( worker_t *worker )
{
	switch( worker->state ) {
		case WORKER_STATE_STOPPED:
			fprintf( stderr, "worker %s is already stopped\n", worker->name );
			return;

		case WORKER_STATE_RUNNING:
		default:
			break;
	}

	direct_glib_execution_worker_data_t *wed =
		(direct_glib_execution_worker_data_t *)worker->execution_data;
	if( wed == NULL ) {
		fprintf( stderr, "worker %s has no running child to terminate\n", worker->name );
		return;
	}

#ifndef _WIN32
	if( kill( wed->pid, SIGTERM ) < 0 ) {
		fprintf( stderr, "ERROR: worker %s (PID " PRIgid ") could not be killed: %s\n",
			worker->name, wed->pid, strerror( errno ) );
	}
#else
	if( !TerminateProcess( wed->pid, 0 ) ) {
		fprintf( stderr, "ERROR: worker %s (PID " PRIgid ") could not be killed\n",
			worker->name, wed->pid );
	}
#endif
}

/* Currently releases nothing (the supervising thread frees its own data).
 * Always returns 0. */
int worker_free( worker_t *worker )
{
	(void)worker;

	return 0;
}