Project

General

Profile

Submit #3135 » kqsbtest.c

tautolog, 05/25/2018 09:49 PM

 
1
#include <sys/types.h>
2
#include <sys/event.h>
3
#include <sys/time.h>
4

    
5
#include <unistd.h>
6

    
7
#include <stdio.h>
8

    
9
#include <errno.h>
10

    
11
#include <sys/socket.h>
12
#include <sys/un.h>
13
#include <netinet/in.h>
14
#include <arpa/inet.h>
15

    
16
#include <string.h>
17

    
18
#define EVFILT_RECV (-11)
19
#define EVFILT_SEND (-12)
20

    
21
struct usr_pipe {
22
	struct kevent qev;   /* queue kevent */
23
};
24

    
25
enum USR_SOCKET_TYPE {
26
	USR_SOCKET_UNKNOWN  = 0,
27
	USR_SOCKET_ACCEPTED,
28
	USR_SOCKET_BOUND,
29
	USR_SOCKET_LISTENER,
30
};
31

    
32
struct usr_socket {
33
	struct usr_pipe recv;
34
	struct usr_pipe send;
35

    
36
	int managed;
37
	int fd;
38
	enum USR_SOCKET_TYPE type;
39
	char *path;
40
	struct sockaddr addr;
41
	socklen_t       addrlen;
42
};
43
#define USR_SOCKET_MAY_RECV(us) ((us).recv.qev.data)
44
#define USR_SOCKET_MAY_SEND(us) ((us).send.qev.data)
45

    
46
#define USR_SOCKET_TABLE_SIZE 1024
47
struct usr_socket_table {
48
	struct usr_socket socket_table[USR_SOCKET_TABLE_SIZE];
49
	int kq;
50
};
51
#define USR_SOCKET_FOR(ust, fd) ((ust).socket_table[fd])
52

    
53
int usr_socket_table_init(struct usr_socket_table *ust_ptr) {
54
	ust_ptr->kq = kqueue();
55
	memset(&ust_ptr->socket_table, 0, sizeof (ust_ptr->socket_table));
56
};
57

    
58
int usr_socket_table_register_listener(struct usr_socket_table *ust_ptr, int fd) {
59
	int rc;
60
	struct kevent kev_pair_in[1];
61
	struct kevent kev_pair_out[1];
62
	struct timespec zero_time;
63

    
64
	EV_SET(&kev_pair_in[0], fd, EVFILT_READ, EV_ADD|EV_RECEIPT, 0, 0, NULL);
65
	zero_time.tv_sec  = 0;
66
	zero_time.tv_nsec = 0;
67
	rc = kevent(ust_ptr->kq, kev_pair_in, 1, kev_pair_out, 1, &zero_time);
68
	if (rc == -1) {
69
		perror("kevent");
70
		return -1;
71
	}
72

    
73
	printf("registered listener %i\n", fd);
74

    
75
	return 0;
76
}
77

    
78
int usr_socket_table_register_accepted(struct usr_socket_table *ust_ptr, int fd) {
79
	int rc;
80
	struct kevent kev_pair_in[2];
81
	struct kevent kev_pair_out[2];
82
	struct timespec zero_time;
83

    
84
	EV_SET(&kev_pair_in[0], fd, EVFILT_RECV, EV_ADD|EV_RECEIPT, 0, 0, NULL);
85
	EV_SET(&kev_pair_in[1], fd, EVFILT_SEND, EV_ADD|EV_RECEIPT, 0, 0, NULL);
86
	zero_time.tv_sec  = 0;
87
	zero_time.tv_nsec = 0;
88
	rc = kevent(ust_ptr->kq, kev_pair_in, 2, kev_pair_out, 2, &zero_time);
89
	if (rc == -1) {
90
		perror("kevent");
91
		return -1;
92
	}
93

    
94
	printf("registered accepted %i\n", fd);
95

    
96
	return 0;
97
}
98

    
99
int usr_socket_table_opened(struct usr_socket_table *ust_ptr, int fd, enum USR_SOCKET_TYPE type, const struct sockaddr *name, socklen_t namelen) {
100
	int rc;
101
	struct usr_socket us;
102

    
103
	if (fd >= USR_SOCKET_TABLE_SIZE) {
104
		errno = ENOBUFS;
105
		return -1;
106
	}
107

    
108
	switch (type) {
109
		case USR_SOCKET_ACCEPTED:
110
			memset(&us, 0, sizeof (us));
111
			us.fd = fd;
112
			us.managed = 1;
113
			us.type = type;
114
			memcpy(&USR_SOCKET_FOR(*ust_ptr, fd), &us, sizeof (us));
115

    
116
			memcpy(&USR_SOCKET_FOR(*ust_ptr, fd).addr, name, namelen);
117

    
118
			rc = usr_socket_table_register_accepted(ust_ptr, fd);
119
			if (rc == -1) {
120
				perror("usr_socket_table_register_accepted");
121
				return -1;
122
			}
123
			break;
124
		case USR_SOCKET_BOUND:
125
			memset(&us, 0, sizeof (us));
126
			us.fd = fd;
127
			us.managed = 1;
128
			us.type = type;
129
			memcpy(&USR_SOCKET_FOR(*ust_ptr, fd), &us, sizeof (us));
130

    
131
			memcpy(&us.addr, name, namelen);
132

    
133
			/* skip registration */
134
			break;
135
		case USR_SOCKET_LISTENER: 
136
			/* already bound with address; only update to new type */
137
			USR_SOCKET_FOR(*ust_ptr, fd).type = type;
138

    
139
			rc = usr_socket_table_register_listener(ust_ptr, fd);
140
			if (rc == -1) {
141
				perror("usr_socket_table_register_listener");
142
				return -1;
143
			}
144
			break;
145
		default:
146
			errno = ENOTSUP;
147
			break;
148
	}
149

    
150
	return fd;
151
}
152

    
153
int usr_socket_table_connect(struct usr_socket_table *ust_ptr, int domain, int type, int protocol, const struct sockaddr *name, socklen_t namelen) {
154
	int rc;
155
	int fd;
156

    
157
	rc = socket(domain, type, protocol);
158
	if (rc == -1) {
159
		perror("socket");
160
		return -1;
161
	}
162

    
163
	fd = rc;
164

    
165
	rc = connect(fd, name, namelen);
166
	if (rc == -1) {
167
		perror("connect");
168
		return -1;
169
	}
170

    
171
	return usr_socket_table_opened(ust_ptr, fd, USR_SOCKET_ACCEPTED, name, namelen);
172
}
173

    
174
int usr_socket_table_bind(struct usr_socket_table *ust_ptr, int domain, int type, int protocol, const struct sockaddr *name, socklen_t namelen) {
175
	int rc;
176
	int fd;
177

    
178
	rc = socket(domain, type, protocol);
179
	if (rc == -1) {
180
		perror("socket");
181
		return -1;
182
	}
183

    
184
	fd = rc;
185

    
186
	rc = bind(fd, name, namelen);
187
	if (rc == -1) {
188
		perror("bind");
189
		return -1;
190
	}
191

    
192
	return usr_socket_table_opened(ust_ptr, fd, USR_SOCKET_BOUND, name, namelen);
193
}
194

    
195
int usr_socket_table_listen(struct usr_socket_table *ust_ptr, int fd, int backlog) {
196
	int rc;
197

    
198
	rc = listen(fd, backlog);
199
	if (rc == -1) {
200
		perror("listen");
201
		return -1;
202
	}
203

    
204
	return usr_socket_table_opened(ust_ptr, fd, USR_SOCKET_LISTENER, NULL, 0);
205
}
206

    
207
int usr_socket_table_accept(struct usr_socket_table *ust_ptr, int fd) {
208
	int rc;
209
	struct sockaddr addr;
210
	socklen_t       addrlen;
211
	int accepted_fd;
212

    
213
	rc = accept(fd, &addr, &addrlen);
214
	if (rc == -1) {
215
		perror("accept");
216
		return 1;
217
	}
218

    
219
	accepted_fd = rc;
220

    
221
	return usr_socket_table_opened(ust_ptr, accepted_fd, USR_SOCKET_ACCEPTED, &addr, addrlen);
222
}
223

    
224
/* returns the next active file descriptor */
225
int usr_socket_table_poll(struct usr_socket_table *ust_ptr) {
226
	int rc;
227
	struct kevent kev;
228
	struct usr_socket *us_ptr;
229

    
230
	rc = kevent(ust_ptr->kq, NULL, 0, &kev, 1, NULL);
231
	if (rc == -1) {
232
		perror("kevent");
233
		return -1;
234
	}
235

    
236
	printf("kqueue() = %i, ident = %i, filter = %i, flags = %i, fflags = %i, data = %i\n", rc, (int) kev.ident, kev.filter, kev.flags, kev.fflags, (int) kev.data);
237

    
238
	us_ptr = &USR_SOCKET_FOR(*ust_ptr, kev.ident);
239

    
240
	switch (kev.filter) {
241
		case EVFILT_READ:
242
		case EVFILT_RECV:
243
			if (us_ptr->managed) {
244
				memcpy(&USR_SOCKET_FOR(*ust_ptr, kev.ident).recv.qev, &kev, sizeof (kev));
245
			}
246
			break;
247
		case EVFILT_WRITE:
248
		case EVFILT_SEND:
249
			if (us_ptr->managed) {
250
				memcpy(&USR_SOCKET_FOR(*ust_ptr, kev.ident).send.qev, &kev, sizeof (kev));
251
			}
252
			break;
253
		default:
254
			break;
255
	}
256

    
257
	return kev.ident;
258
}
259

    
260
int usr_socket_recv(struct usr_socket *us_ptr, void *buf, size_t len, int flags) {
261
	int rc;
262

    
263
	rc = recv(us_ptr->fd, buf, len, flags);
264
	if (rc == -1) {
265
		perror("recv");
266
		return -1;
267
	}
268

    
269
	us_ptr->recv.qev.data -= rc;
270

    
271
	printf("received %i from %i with %i remaining\n", rc, us_ptr->fd, us_ptr->recv.qev.data);
272

    
273
	return rc;
274
}
275

    
276
int usr_socket_send(struct usr_socket *us_ptr, void *msgbuf, size_t len, int flags) {
277
	int rc;
278

    
279
	rc = send(us_ptr->fd, msgbuf, len, flags);
280
	if (rc == -1) {
281
		perror("send");
282
		return -1;
283
	}
284

    
285
	us_ptr->send.qev.data -= rc;
286

    
287
	printf("sent %i to %i with %i remaining\n", rc, us_ptr->fd, us_ptr->send.qev.data);
288

    
289
	return rc;
290
}
291

    
292
int main() {
293
	int rc;
294
	int s;
295
	struct usr_socket_table ust;
296
	char   echo_buf[10];
297
	size_t echo_buflen;
298
	struct usr_socket *us_ptr;
299
	struct sockaddr_un addr_un;
300
	struct sockaddr_in addr_in;
301

    
302
	rc = usr_socket_table_init(&ust);
303
	if (rc == -1) {
304
		perror("usr_socket_table_init");
305
		return 1;
306
	}
307

    
308
	/* Unix Domain Socket */
309

    
310
	/*
311
	memset(&addr_un, 0, sizeof (addr_un));
312
	strncpy(addr_un.sun_path, "test1.sock", sizeof (addr_un.sun_path));
313
	addr_un.sun_len = strnlen(addr_un.sun_path, sizeof (addr_un.sun_path));
314
	addr_un.sun_family = SOCK_STREAM;
315

    
316
	rc = usr_socket_table_connect(&ust, AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK, 0, (struct sockaddr *) &addr_un, sizeof (addr_un));
317
	if (rc == -1) {
318
		perror("usr_socket_table_connect");
319
		return 1;
320
	}
321

    
322
	s = rc;
323

    
324
	printf("connected socket %i to \"test1.sock\"\n", s);
325
	*/
326

    
327
	/* Socket Server */
328

    
329
	memset(&addr_in, 0, sizeof (addr_in));
330
	addr_in.sin_family = AF_INET;
331
	addr_in.sin_port = htons(8081);
332
	addr_in.sin_addr.s_addr = inet_addr("0.0.0.0");
333

    
334
	rc = usr_socket_table_bind(&ust, AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0, (struct sockaddr *) &addr_in, sizeof (addr_in));
335
	if (rc == -1) {
336
		perror("usr_socket_table_bind");
337
		return 1;
338
	}
339

    
340
	s = rc;
341

    
342
	printf("bound socket %i to 0.0.0.0:8081\n", s);
343

    
344
	rc = usr_socket_table_listen(&ust, s, 32);
345
	if (rc == -1) {
346
		perror("usr_socket_table_listen");
347
		return 1;
348
	}
349

    
350
	printf("listening\n");
351

    
352
	/* Event Loop */
353

    
354
	while ((rc = usr_socket_table_poll(&ust)) != -1) {
355
		s = rc;
356

    
357
		printf("State change for socket %i\n", s);
358

    
359
		us_ptr = &USR_SOCKET_FOR(ust, s);
360
		if ( ! us_ptr->managed) {
361
			printf("skipping unmanaged event for fd %i\n", s);
362
			continue;
363
		}
364

    
365
		printf("event for socket of type %i\n", us_ptr->type);
366

    
367
		switch (us_ptr->type) {
368
			case USR_SOCKET_LISTENER:
369
				printf("accepting from %i\n", us_ptr->fd);
370
				rc = usr_socket_table_accept(&ust, us_ptr->fd);
371
				if (rc == -1) {
372
					perror("usr_socket_table_accept");
373
				}
374
				break;
375
			case USR_SOCKET_ACCEPTED:
376
				while (USR_SOCKET_MAY_RECV(*us_ptr) > 0) {
377
					rc = usr_socket_recv(us_ptr, echo_buf, sizeof (echo_buf), 0);
378
					if (rc == -1) {
379
						perror("usr_socket_recv");
380
						return 1;
381
					}
382
					echo_buflen = rc;
383

    
384
					if (echo_buflen > 0) {
385
						if (USR_SOCKET_MAY_SEND(*us_ptr) < echo_buflen) {
386
							errno = ENOBUFS;
387
							perror("echo_buflen");
388
							return 1;
389
						}
390
						rc = usr_socket_send(us_ptr, echo_buf, echo_buflen, 0);
391
						if (rc == -1) {
392
							perror("usr_socket_send");
393
							return 1;
394
						}
395
					}
396
				}
397
				break;
398
		}
399
	}
400
	if (rc == -1) {
401
		perror("usr_socket_table_poll");
402
		return 1;
403
	}
404

    
405
	return 0;
406
}
(2-2/2)