summaryrefslogtreecommitdiff
path: root/src/master.c
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2014-11-29 16:01:33 +0100
committerAndreas Baumann <mail@andreasbaumann.cc>2014-11-29 16:01:33 +0100
commit1135bfb7aa76ea4ec5cff756cc6f0f98332e9828 (patch)
treee0645f5ca35bcafef13685a7af2994629386e93b /src/master.c
parenta784c8f795f60604a2cd6f9633a213e891e4d7df (diff)
downloadbiruda-1135bfb7aa76ea4ec5cff756cc6f0f98332e9828.tar.gz
biruda-1135bfb7aa76ea4ec5cff756cc6f0f98332e9828.tar.bz2
sending start command to coordinator, some command queueing and a memory mess
Diffstat (limited to 'src/master.c')
-rw-r--r--src/master.c116
1 files changed, 104 insertions, 12 deletions
diff --git a/src/master.c b/src/master.c
index 73d1d76..2761667 100644
--- a/src/master.c
+++ b/src/master.c
@@ -119,9 +119,101 @@ static int register_coordinator( json_object *obj )
return 0;
}
+static char *alloc_message( const char *msg )
+{
+ char *buf = strdup( msg );
+
+ // crashes in survey mode!
+ //~ size_t len = strlen( msg );
+ //~ char *buf = (char *)nn_allocmsg( len + 1, 0 );
+ //~ strncpy( buf, msg, len );
+
+ return buf;
+}
+
+static char *create_discover_request( )
+{
+ json_object *obj = json_object_new_object( );
+ json_object *op = json_object_new_string( "discover" );
+ json_object_object_add( obj, "op", op );
+ json_object *role = json_object_new_string( "master" );
+ json_object_object_add( obj, "role", role );
+
+ /* produce message as string, caller must free it */
+ const char *msg = json_object_to_json_string( obj );
+ char *res = alloc_message( msg );
+
+ json_object_put( obj );
+
+ return res;
+}
+
+static char *create_start_request( const char *name )
+{
+ json_object *obj = json_object_new_object( );
+ json_object *op = json_object_new_string( "start" );
+ json_object_object_add( obj, "op", op );
+ json_object *role = json_object_new_string( "master" );
+ json_object_object_add( obj, "role", role );
+ json_object *worker = json_object_new_string( name );
+ json_object_object_add( obj, "worker", worker );
+
+ /* produce message as string, caller must free it */
+ const char *msg = json_object_to_json_string( obj );
+ char *res = alloc_message( msg );
+
+ json_object_put( obj );
+
+ return res;
+}
+
+#define MAX_NOF_COMMANDS 12
+
+static int nof_queued_commands = 0;
+
+char *queued_command[MAX_NOF_COMMANDS];
+
+static pthread_mutex_t queued_command_mutex;
+
+static int enqueue_request( char *cmd )
+{
+ pthread_mutex_lock( &queued_command_mutex );
+
+ if( nof_queued_commands >= MAX_NOF_COMMANDS ) {
+ fprintf( stderr, "ERROR: internal command queue overflow!\n" );
+ pthread_mutex_unlock( &queued_command_mutex );
+ return -1;
+ }
+
+ queued_command[nof_queued_commands] = cmd;
+ nof_queued_commands++;
+
+ pthread_mutex_unlock( &queued_command_mutex );
+
+ return 0;
+}
+
+static char *get_next_request( )
+{
+ pthread_mutex_lock( &queued_command_mutex );
+ char *cmd;
+ if( nof_queued_commands > 0 ) {
+ cmd = queued_command[0];
+ --nof_queued_commands;
+ memmove( &queued_command[0], &queued_command[1], nof_queued_commands * sizeof( char * ) );
+ pthread_mutex_unlock( &queued_command_mutex );
+ return cmd;
+ } else {
+ pthread_mutex_unlock( &queued_command_mutex );
+ return create_discover_request( );
+ }
+}
+
static void *master_func( void *thread_data )
{
char *control = (char *)thread_data;
+
+ pthread_mutex_init( &queued_command_mutex, NULL );
if( ( master_sock = nn_socket( AF_SP, NN_SURVEYOR ) ) < 0 ) {
fprintf( stderr, "master, nn_socket( AF_SP, NN_SURVEYOR ) error: %s (%d)\n", strerror( errno ), errno );
@@ -138,17 +230,13 @@ static void *master_func( void *thread_data )
printf( "master connected to %s\n", control );
-NEXT_DISCOVER:
+NEXT_COMMAND:
;
- json_object *obj = json_object_new_object( );
- json_object *op = json_object_new_string( "discover" );
- json_object_object_add( obj, "op", op );
- json_object *role = json_object_new_string( "master" );
- json_object_object_add( obj, "role", role );
- const char *msg = json_object_to_json_string( obj );
+ char *msg = get_next_request( );
int msg_size = strlen( msg ) + 1;
+
printf( "master send: %s\n", msg );
- int bytes = nn_send( master_sock, msg, msg_size, 0 );
+ int bytes = nn_send( master_sock, msg, msg_size /* NN_MSG */, 0 );
if( bytes < 0 ) {
if( errno == ETERM ) {
master_must_terminate = 1;
@@ -159,7 +247,8 @@ NEXT_DISCOVER:
} else if( bytes != msg_size ) {
fprintf( stderr, "ERROR: truncated message!" );
}
- json_object_put( obj );
+ // breaks in coordinator, so the memory is passed between threads?
+ free( msg ); msg = NULL;
while( !master_must_terminate ) {
@@ -184,7 +273,7 @@ NEXT_DISCOVER:
const char *role = json_object_get_string( role_obj );
if( strcmp( role, "coordinator" ) == 0 ) {
if( register_coordinator( recv_obj ) < 0 ) {
- fprintf( stderr, "Termiating master because out of resources!\n" );
+ fprintf( stderr, "Terminating master because out of resources!\n" );
master_must_terminate = 1;
continue;
}
@@ -198,7 +287,7 @@ NEXT_DISCOVER:
/* documentation says we should get a ETIMEDOUT, but in
* fact we get EFSM here, change if fixed in nanomsg */
if( errno == EFSM /* ETIMEDOUT */ ) {
- goto NEXT_DISCOVER;
+ goto NEXT_COMMAND;
} else if( errno == EAGAIN || errno == EINTR ) {
continue;
} else if( errno == ETERM ) {
@@ -264,5 +353,8 @@ int master_free( )
int master_start_worker( const char *name )
{
- return 0;
+ char *msg = create_start_request( name );
+ int res = enqueue_request( msg );
+
+ return res;
}