summaryrefslogtreecommitdiff
path: root/src/master.c
blob: 6dc764a19799a0205392dc041f71fdbb539d1d83 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#include "master.h"

#include "pthread.h"

#include "nanomsg/nn.h"
#include "nanomsg/survey.h"

#include "json-c/json.h"

#include <stdio.h>
#include <string.h>

#include "sleep.h"

/* Stop request for the master thread: cleared by master_init, raised by
 * master_terminate and by master_func itself when nanomsg reports ETERM.
 * NOTE(review): plain int written and read from different threads without
 * synchronisation - confirm this is acceptable for the target toolchain. */
static int master_must_terminate;

/* nanomsg socket descriptor, assigned from nn_socket() in master_func */
static int master_sock;

/* thread handle filled in by master_init and joined by master_free */
static pthread_t master_thread;

static void *master_func( void *thread_data )
{
	char *control = (char *)thread_data;
	
	master_sock = nn_socket( AF_SP, NN_SURVEYOR );
	
	int deadline = 5000;
	(void)nn_setsockopt( master_sock, NN_SURVEYOR, NN_SURVEYOR_DEADLINE, &deadline, sizeof( deadline ) );
	
	(void)nn_bind( master_sock, control );

	printf( "master connected to %s\n", control );

NEXT_DISCOVER:	
	sleep( 1 );

	json_object *obj = json_object_new_object( );
	json_object *op = json_object_new_string( "discover" );
	json_object_object_add( obj, "op", op );
	const char *msg = json_object_to_json_string( obj );
	int msg_size = strlen( msg ) + 1;
	printf( "master send: %s\n", msg );
	int bytes = nn_send( master_sock, msg, msg_size, 0 );
	if( bytes < 0 ) {
		if( errno == ETERM ) {
			goto MASTER_ENDS;
		} else {
			fprintf( stderr, "ERROR: nn_send returned %d: %s (%d)\n",
				bytes, nn_strerror( errno ), errno );
		}
	}
	if( bytes != msg_size ) {
		fprintf( stderr, "ERROR: truncated message!" );
	}
	json_object_put( obj );

	while( !master_must_terminate ) {
		
		printf( "master idle: %d\n", master_must_terminate );
		
		sleep( 1 );
		
		char *answer = NULL;
		bytes = nn_recv( master_sock, &answer, NN_MSG, 0 );
		if( master_must_terminate ) continue;
		if( bytes >= 0 ) {
			printf( "master received: %s\n", answer );
			nn_freemsg( answer );
		}
		if( bytes < 0 ) {
			/* documentation says we should get a ETIMEDOUT, but in
			 * fact we get EFSM here, change if fixed in nanomsg */
			if( errno == EFSM /* ETIMEDOUT */ ) {
				goto NEXT_DISCOVER;
			} else if( errno == EAGAIN || errno == EINTR ) {
				continue;
			} else if( errno == ETERM ) {
				master_must_terminate = 1;
				continue;
			} else {
				fprintf( stderr, "ERROR: nn_recv returned %d: %s (%d)\n",
					bytes, nn_strerror( errno ), errno );
			}
		}
	}

MASTER_ENDS:
  
	(void)nn_shutdown( master_sock, 0 );

	puts( "master disconnected" );
	
	return NULL;
}

/*
 * Start the master thread.
 *
 * control is handed to master_func as its thread argument, which uses it as
 * the address to bind the surveyor socket to. The pointer itself is passed,
 * not a copy, so the string must stay valid while the thread uses it.
 *
 * Returns 0 on success, 1 when the thread attributes could not be
 * initialised or the thread could not be created.
 */
int master_init( const char *control )
{
	pthread_attr_t attr;

	master_must_terminate = 0;

	if( pthread_attr_init( &attr ) != 0 ) {
		return 1;
	}

	int res = pthread_create( &master_thread, &attr, master_func, (void *)control );

	/* the attribute object is only needed for pthread_create; release it
	 * on the success and on the failure path alike */
	(void)pthread_attr_destroy( &attr );

	return res != 0 ? 1 : 0;
}

/*
 * Ask the master thread to stop.
 *
 * Raises master_must_terminate, which the loops in master_func check, and
 * then calls nn_term() so that nanomsg calls made by the thread fail with
 * ETERM (handled in master_func as a request to end) rather than keep waiting.
 * This function does not wait for the thread; master_free joins it.
 */
void master_terminate( void )
{
	/* set the flag first so the thread sees it once nn_term wakes it up */
	master_must_terminate = 1;

	nn_term( );
}

/*
 * Wait for the master thread started by master_init to finish.
 *
 * The thread's return value is not needed (master_func always returns
 * NULL), so it is not collected.
 *
 * Returns 0 when the thread was joined, 1 when pthread_join failed.
 */
int master_free( void )
{
	return pthread_join( master_thread, NULL ) != 0 ? 1 : 0;
}