Project

General

Profile

kqsbtest.c

tautolog, 05/25/2018 09:49 PM

 
1
#include <sys/types.h>
2
#include <sys/event.h>
3
#include <sys/time.h>
4

    
5
#include <unistd.h>
6

    
7
#include <stdio.h>
8

    
9
#include <errno.h>
10

    
11
#include <sys/socket.h>
12
#include <sys/un.h>
13
#include <netinet/in.h>
14
#include <arpa/inet.h>
15

    
16
#include <string.h>
17

    
18
#define EVFILT_RECV (-11)
19
#define EVFILT_SEND (-12)
20

    
21
struct usr_pipe {
22
        struct kevent qev;   /* queue kevent */
23
};
24

    
25
enum USR_SOCKET_TYPE {
26
        USR_SOCKET_UNKNOWN  = 0,
27
        USR_SOCKET_ACCEPTED,
28
        USR_SOCKET_BOUND,
29
        USR_SOCKET_LISTENER,
30
};
31

    
32
struct usr_socket {
33
        struct usr_pipe recv;
34
        struct usr_pipe send;
35

    
36
        int managed;
37
        int fd;
38
        enum USR_SOCKET_TYPE type;
39
        char *path;
40
        struct sockaddr addr;
41
        socklen_t       addrlen;
42
};
43
#define USR_SOCKET_MAY_RECV(us) ((us).recv.qev.data)
44
#define USR_SOCKET_MAY_SEND(us) ((us).send.qev.data)
45

    
46
#define USR_SOCKET_TABLE_SIZE 1024
47
struct usr_socket_table {
48
        struct usr_socket socket_table[USR_SOCKET_TABLE_SIZE];
49
        int kq;
50
};
51
#define USR_SOCKET_FOR(ust, fd) ((ust).socket_table[fd])
52

    
53
int usr_socket_table_init(struct usr_socket_table *ust_ptr) {
54
        ust_ptr->kq = kqueue();
55
        memset(&ust_ptr->socket_table, 0, sizeof (ust_ptr->socket_table));
56
};
57

    
58
int usr_socket_table_register_listener(struct usr_socket_table *ust_ptr, int fd) {
59
        int rc;
60
        struct kevent kev_pair_in[1];
61
        struct kevent kev_pair_out[1];
62
        struct timespec zero_time;
63

    
64
        EV_SET(&kev_pair_in[0], fd, EVFILT_READ, EV_ADD|EV_RECEIPT, 0, 0, NULL);
65
        zero_time.tv_sec  = 0;
66
        zero_time.tv_nsec = 0;
67
        rc = kevent(ust_ptr->kq, kev_pair_in, 1, kev_pair_out, 1, &zero_time);
68
        if (rc == -1) {
69
                perror("kevent");
70
                return -1;
71
        }
72

    
73
        printf("registered listener %i\n", fd);
74

    
75
        return 0;
76
}
77

    
78
int usr_socket_table_register_accepted(struct usr_socket_table *ust_ptr, int fd) {
79
        int rc;
80
        struct kevent kev_pair_in[2];
81
        struct kevent kev_pair_out[2];
82
        struct timespec zero_time;
83

    
84
        EV_SET(&kev_pair_in[0], fd, EVFILT_RECV, EV_ADD|EV_RECEIPT, 0, 0, NULL);
85
        EV_SET(&kev_pair_in[1], fd, EVFILT_SEND, EV_ADD|EV_RECEIPT, 0, 0, NULL);
86
        zero_time.tv_sec  = 0;
87
        zero_time.tv_nsec = 0;
88
        rc = kevent(ust_ptr->kq, kev_pair_in, 2, kev_pair_out, 2, &zero_time);
89
        if (rc == -1) {
90
                perror("kevent");
91
                return -1;
92
        }
93

    
94
        printf("registered accepted %i\n", fd);
95

    
96
        return 0;
97
}
98

    
99
int usr_socket_table_opened(struct usr_socket_table *ust_ptr, int fd, enum USR_SOCKET_TYPE type, const struct sockaddr *name, socklen_t namelen) {
100
        int rc;
101
        struct usr_socket us;
102

    
103
        if (fd >= USR_SOCKET_TABLE_SIZE) {
104
                errno = ENOBUFS;
105
                return -1;
106
        }
107

    
108
        switch (type) {
109
                case USR_SOCKET_ACCEPTED:
110
                        memset(&us, 0, sizeof (us));
111
                        us.fd = fd;
112
                        us.managed = 1;
113
                        us.type = type;
114
                        memcpy(&USR_SOCKET_FOR(*ust_ptr, fd), &us, sizeof (us));
115

    
116
                        memcpy(&USR_SOCKET_FOR(*ust_ptr, fd).addr, name, namelen);
117

    
118
                        rc = usr_socket_table_register_accepted(ust_ptr, fd);
119
                        if (rc == -1) {
120
                                perror("usr_socket_table_register_accepted");
121
                                return -1;
122
                        }
123
                        break;
124
                case USR_SOCKET_BOUND:
125
                        memset(&us, 0, sizeof (us));
126
                        us.fd = fd;
127
                        us.managed = 1;
128
                        us.type = type;
129
                        memcpy(&USR_SOCKET_FOR(*ust_ptr, fd), &us, sizeof (us));
130

    
131
                        memcpy(&us.addr, name, namelen);
132

    
133
                        /* skip registration */
134
                        break;
135
                case USR_SOCKET_LISTENER: 
136
                        /* already bound with address; only update to new type */
137
                        USR_SOCKET_FOR(*ust_ptr, fd).type = type;
138

    
139
                        rc = usr_socket_table_register_listener(ust_ptr, fd);
140
                        if (rc == -1) {
141
                                perror("usr_socket_table_register_listener");
142
                                return -1;
143
                        }
144
                        break;
145
                default:
146
                        errno = ENOTSUP;
147
                        break;
148
        }
149

    
150
        return fd;
151
}
152

    
153
int usr_socket_table_connect(struct usr_socket_table *ust_ptr, int domain, int type, int protocol, const struct sockaddr *name, socklen_t namelen) {
154
        int rc;
155
        int fd;
156

    
157
        rc = socket(domain, type, protocol);
158
        if (rc == -1) {
159
                perror("socket");
160
                return -1;
161
        }
162

    
163
        fd = rc;
164

    
165
        rc = connect(fd, name, namelen);
166
        if (rc == -1) {
167
                perror("connect");
168
                return -1;
169
        }
170

    
171
        return usr_socket_table_opened(ust_ptr, fd, USR_SOCKET_ACCEPTED, name, namelen);
172
}
173

    
174
int usr_socket_table_bind(struct usr_socket_table *ust_ptr, int domain, int type, int protocol, const struct sockaddr *name, socklen_t namelen) {
175
        int rc;
176
        int fd;
177

    
178
        rc = socket(domain, type, protocol);
179
        if (rc == -1) {
180
                perror("socket");
181
                return -1;
182
        }
183

    
184
        fd = rc;
185

    
186
        rc = bind(fd, name, namelen);
187
        if (rc == -1) {
188
                perror("bind");
189
                return -1;
190
        }
191

    
192
        return usr_socket_table_opened(ust_ptr, fd, USR_SOCKET_BOUND, name, namelen);
193
}
194

    
195
int usr_socket_table_listen(struct usr_socket_table *ust_ptr, int fd, int backlog) {
196
        int rc;
197

    
198
        rc = listen(fd, backlog);
199
        if (rc == -1) {
200
                perror("listen");
201
                return -1;
202
        }
203

    
204
        return usr_socket_table_opened(ust_ptr, fd, USR_SOCKET_LISTENER, NULL, 0);
205
}
206

    
207
int usr_socket_table_accept(struct usr_socket_table *ust_ptr, int fd) {
208
        int rc;
209
        struct sockaddr addr;
210
        socklen_t       addrlen;
211
        int accepted_fd;
212

    
213
        rc = accept(fd, &addr, &addrlen);
214
        if (rc == -1) {
215
                perror("accept");
216
                return 1;
217
        }
218

    
219
        accepted_fd = rc;
220

    
221
        return usr_socket_table_opened(ust_ptr, accepted_fd, USR_SOCKET_ACCEPTED, &addr, addrlen);
222
}
223

    
224
/* returns the next active file descriptor */
225
int usr_socket_table_poll(struct usr_socket_table *ust_ptr) {
226
        int rc;
227
        struct kevent kev;
228
        struct usr_socket *us_ptr;
229

    
230
        rc = kevent(ust_ptr->kq, NULL, 0, &kev, 1, NULL);
231
        if (rc == -1) {
232
                perror("kevent");
233
                return -1;
234
        }
235

    
236
        printf("kqueue() = %i, ident = %i, filter = %i, flags = %i, fflags = %i, data = %i\n", rc, (int) kev.ident, kev.filter, kev.flags, kev.fflags, (int) kev.data);
237

    
238
        us_ptr = &USR_SOCKET_FOR(*ust_ptr, kev.ident);
239

    
240
        switch (kev.filter) {
241
                case EVFILT_READ:
242
                case EVFILT_RECV:
243
                        if (us_ptr->managed) {
244
                                memcpy(&USR_SOCKET_FOR(*ust_ptr, kev.ident).recv.qev, &kev, sizeof (kev));
245
                        }
246
                        break;
247
                case EVFILT_WRITE:
248
                case EVFILT_SEND:
249
                        if (us_ptr->managed) {
250
                                memcpy(&USR_SOCKET_FOR(*ust_ptr, kev.ident).send.qev, &kev, sizeof (kev));
251
                        }
252
                        break;
253
                default:
254
                        break;
255
        }
256

    
257
        return kev.ident;
258
}
259

    
260
int usr_socket_recv(struct usr_socket *us_ptr, void *buf, size_t len, int flags) {
261
        int rc;
262

    
263
        rc = recv(us_ptr->fd, buf, len, flags);
264
        if (rc == -1) {
265
                perror("recv");
266
                return -1;
267
        }
268

    
269
        us_ptr->recv.qev.data -= rc;
270

    
271
        printf("received %i from %i with %i remaining\n", rc, us_ptr->fd, us_ptr->recv.qev.data);
272

    
273
        return rc;
274
}
275

    
276
int usr_socket_send(struct usr_socket *us_ptr, void *msgbuf, size_t len, int flags) {
277
        int rc;
278

    
279
        rc = send(us_ptr->fd, msgbuf, len, flags);
280
        if (rc == -1) {
281
                perror("send");
282
                return -1;
283
        }
284

    
285
        us_ptr->send.qev.data -= rc;
286

    
287
        printf("sent %i to %i with %i remaining\n", rc, us_ptr->fd, us_ptr->send.qev.data);
288

    
289
        return rc;
290
}
291

    
292
int main() {
293
        int rc;
294
        int s;
295
        struct usr_socket_table ust;
296
        char   echo_buf[10];
297
        size_t echo_buflen;
298
        struct usr_socket *us_ptr;
299
        struct sockaddr_un addr_un;
300
        struct sockaddr_in addr_in;
301

    
302
        rc = usr_socket_table_init(&ust);
303
        if (rc == -1) {
304
                perror("usr_socket_table_init");
305
                return 1;
306
        }
307

    
308
        /* Unix Domain Socket */
309

    
310
        /*
311
        memset(&addr_un, 0, sizeof (addr_un));
312
        strncpy(addr_un.sun_path, "test1.sock", sizeof (addr_un.sun_path));
313
        addr_un.sun_len = strnlen(addr_un.sun_path, sizeof (addr_un.sun_path));
314
        addr_un.sun_family = SOCK_STREAM;
315

316
        rc = usr_socket_table_connect(&ust, AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK, 0, (struct sockaddr *) &addr_un, sizeof (addr_un));
317
        if (rc == -1) {
318
                perror("usr_socket_table_connect");
319
                return 1;
320
        }
321

322
        s = rc;
323

324
        printf("connected socket %i to \"test1.sock\"\n", s);
325
        */
326

    
327
        /* Socket Server */
328

    
329
        memset(&addr_in, 0, sizeof (addr_in));
330
        addr_in.sin_family = AF_INET;
331
        addr_in.sin_port = htons(8081);
332
        addr_in.sin_addr.s_addr = inet_addr("0.0.0.0");
333

    
334
        rc = usr_socket_table_bind(&ust, AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0, (struct sockaddr *) &addr_in, sizeof (addr_in));
335
        if (rc == -1) {
336
                perror("usr_socket_table_bind");
337
                return 1;
338
        }
339

    
340
        s = rc;
341

    
342
        printf("bound socket %i to 0.0.0.0:8081\n", s);
343

    
344
        rc = usr_socket_table_listen(&ust, s, 32);
345
        if (rc == -1) {
346
                perror("usr_socket_table_listen");
347
                return 1;
348
        }
349

    
350
        printf("listening\n");
351

    
352
        /* Event Loop */
353

    
354
        while ((rc = usr_socket_table_poll(&ust)) != -1) {
355
                s = rc;
356

    
357
                printf("State change for socket %i\n", s);
358

    
359
                us_ptr = &USR_SOCKET_FOR(ust, s);
360
                if ( ! us_ptr->managed) {
361
                        printf("skipping unmanaged event for fd %i\n", s);
362
                        continue;
363
                }
364

    
365
                printf("event for socket of type %i\n", us_ptr->type);
366

    
367
                switch (us_ptr->type) {
368
                        case USR_SOCKET_LISTENER:
369
                                printf("accepting from %i\n", us_ptr->fd);
370
                                rc = usr_socket_table_accept(&ust, us_ptr->fd);
371
                                if (rc == -1) {
372
                                        perror("usr_socket_table_accept");
373
                                }
374
                                break;
375
                        case USR_SOCKET_ACCEPTED:
376
                                while (USR_SOCKET_MAY_RECV(*us_ptr) > 0) {
377
                                        rc = usr_socket_recv(us_ptr, echo_buf, sizeof (echo_buf), 0);
378
                                        if (rc == -1) {
379
                                                perror("usr_socket_recv");
380
                                                return 1;
381
                                        }
382
                                        echo_buflen = rc;
383

    
384
                                        if (echo_buflen > 0) {
385
                                                if (USR_SOCKET_MAY_SEND(*us_ptr) < echo_buflen) {
386
                                                        errno = ENOBUFS;
387
                                                        perror("echo_buflen");
388
                                                        return 1;
389
                                                }
390
                                                rc = usr_socket_send(us_ptr, echo_buf, echo_buflen, 0);
391
                                                if (rc == -1) {
392
                                                        perror("usr_socket_send");
393
                                                        return 1;
394
                                                }
395
                                        }
396
                                }
397
                                break;
398
                }
399
        }
400
        if (rc == -1) {
401
                perror("usr_socket_table_poll");
402
                return 1;
403
        }
404

    
405
        return 0;
406
}