diff options
author | Andreas Baumann <mail@andreasbaumann.cc> | 2014-11-29 16:01:33 +0100 |
---|---|---|
committer | Andreas Baumann <mail@andreasbaumann.cc> | 2014-11-29 16:01:33 +0100 |
commit | 1135bfb7aa76ea4ec5cff756cc6f0f98332e9828 (patch) | |
tree | e0645f5ca35bcafef13685a7af2994629386e93b /src/master.c | |
parent | a784c8f795f60604a2cd6f9633a213e891e4d7df (diff) | |
download | biruda-1135bfb7aa76ea4ec5cff756cc6f0f98332e9828.tar.gz biruda-1135bfb7aa76ea4ec5cff756cc6f0f98332e9828.tar.bz2 |
sending start command to coordinator, some command queueing and a memory mess
Diffstat (limited to 'src/master.c')
-rw-r--r-- | src/master.c | 116 |
1 files changed, 104 insertions, 12 deletions
diff --git a/src/master.c b/src/master.c index 73d1d76..2761667 100644 --- a/src/master.c +++ b/src/master.c @@ -119,9 +119,101 @@ static int register_coordinator( json_object *obj ) return 0; } +static char *alloc_message( const char *msg ) +{ + char *buf = strdup( msg ); + + // crashes in survey mode! + //~ size_t len = strlen( msg ); + //~ char *buf = (char *)nn_allocmsg( len + 1, 0 ); + //~ strncpy( buf, msg, len ); + + return buf; +} + +static char *create_discover_request( ) +{ + json_object *obj = json_object_new_object( ); + json_object *op = json_object_new_string( "discover" ); + json_object_object_add( obj, "op", op ); + json_object *role = json_object_new_string( "master" ); + json_object_object_add( obj, "role", role ); + + /* produce message as string, caller must free it */ + const char *msg = json_object_to_json_string( obj ); + char *res = alloc_message( msg ); + + json_object_put( obj ); + + return res; +} + +static char *create_start_request( const char *name ) +{ + json_object *obj = json_object_new_object( ); + json_object *op = json_object_new_string( "start" ); + json_object_object_add( obj, "op", op ); + json_object *role = json_object_new_string( "master" ); + json_object_object_add( obj, "role", role ); + json_object *worker = json_object_new_string( name ); + json_object_object_add( obj, "worker", worker ); + + /* produce message as string, caller must free it */ + const char *msg = json_object_to_json_string( obj ); + char *res = alloc_message( msg ); + + json_object_put( obj ); + + return res; +} + +#define MAX_NOF_COMMANDS 12 + +static int nof_queued_commands = 0; + +char *queued_command[MAX_NOF_COMMANDS]; + +static pthread_mutex_t queued_command_mutex; + +static int enqueue_request( char *cmd ) +{ + pthread_mutex_lock( &queued_command_mutex ); + + if( nof_queued_commands >= MAX_NOF_COMMANDS ) { + fprintf( stderr, "ERROR: internal command queue overflow!\n" ); + pthread_mutex_unlock( &queued_command_mutex ); + return -1; + } + + queued_command[nof_queued_commands] = cmd; + nof_queued_commands++; + + pthread_mutex_unlock( &queued_command_mutex ); + + return 0; +} + +static char *get_next_request( ) +{ + pthread_mutex_lock( &queued_command_mutex ); + char *cmd; + if( nof_queued_commands > 0 ) { + cmd = queued_command[0]; + --nof_queued_commands; + memmove( &queued_command[0], &queued_command[1], nof_queued_commands * sizeof( char * ) ); + pthread_mutex_unlock( &queued_command_mutex ); + return cmd; + } else { + pthread_mutex_unlock( &queued_command_mutex ); + return create_discover_request( ); + } +} + static void *master_func( void *thread_data ) { char *control = (char *)thread_data; + + pthread_mutex_init( &queued_command_mutex, NULL ); if( ( master_sock = nn_socket( AF_SP, NN_SURVEYOR ) ) < 0 ) { fprintf( stderr, "master, nn_socket( AF_SP, NN_SURVEYOR ) error: %s (%d)\n", strerror( errno ), errno ); @@ -138,17 +230,13 @@ static void *master_func( void *thread_data ) printf( "master connected to %s\n", control ); -NEXT_DISCOVER: +NEXT_COMMAND: ; - json_object *obj = json_object_new_object( ); - json_object *op = json_object_new_string( "discover" ); - json_object_object_add( obj, "op", op ); - json_object *role = json_object_new_string( "master" ); - json_object_object_add( obj, "role", role ); - const char *msg = json_object_to_json_string( obj ); + char *msg = get_next_request( ); int msg_size = strlen( msg ) + 1; + printf( "master send: %s\n", msg ); - int bytes = nn_send( master_sock, msg, msg_size, 0 ); + int bytes = nn_send( master_sock, msg, msg_size /* NN_MSG */, 0 ); if( bytes < 0 ) { if( errno == ETERM ) { master_must_terminate = 1; @@ -159,7 +247,8 @@ NEXT_DISCOVER: } else if( bytes != msg_size ) { fprintf( stderr, "ERROR: truncated message!" ); } - json_object_put( obj ); + // breaks in coordinator, so the memory is passed between threads? + free( msg ); msg = NULL; while( !master_must_terminate ) { @@ -184,7 +273,7 @@ NEXT_DISCOVER: const char *role = json_object_get_string( role_obj ); if( strcmp( role, "coordinator" ) == 0 ) { if( register_coordinator( recv_obj ) < 0 ) { - fprintf( stderr, "Termiating master because out of resources!\n" ); + fprintf( stderr, "Terminating master because out of resources!\n" ); master_must_terminate = 1; continue; } @@ -198,7 +287,7 @@ NEXT_DISCOVER: /* documentation says we should get a ETIMEDOUT, but in * fact we get EFSM here, change if fixed in nanomsg */ if( errno == EFSM /* ETIMEDOUT */ ) { - goto NEXT_DISCOVER; + goto NEXT_COMMAND; } else if( errno == EAGAIN || errno == EINTR ) { continue; } else if( errno == ETERM ) { @@ -264,5 +353,8 @@ int master_free( ) int master_start_worker( const char *name ) { - return 0; + char *msg = create_start_request( name ); + int res = enqueue_request( msg ); + + return res; } |