Project

General

Profile

Submit #3135 » kqsbtest.c

tautolog, 05/25/2018 09:49 PM

 
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

#include <unistd.h>

#include <stdio.h>

#include <errno.h>

#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <string.h>

#define EVFILT_RECV (-11)
#define EVFILT_SEND (-12)

struct usr_pipe {
struct kevent qev; /* queue kevent */
};

enum USR_SOCKET_TYPE {
USR_SOCKET_UNKNOWN = 0,
USR_SOCKET_ACCEPTED,
USR_SOCKET_BOUND,
USR_SOCKET_LISTENER,
};

struct usr_socket {
struct usr_pipe recv;
struct usr_pipe send;

int managed;
int fd;
enum USR_SOCKET_TYPE type;
char *path;
struct sockaddr addr;
socklen_t addrlen;
};
#define USR_SOCKET_MAY_RECV(us) ((us).recv.qev.data)
#define USR_SOCKET_MAY_SEND(us) ((us).send.qev.data)

#define USR_SOCKET_TABLE_SIZE 1024
struct usr_socket_table {
struct usr_socket socket_table[USR_SOCKET_TABLE_SIZE];
int kq;
};
#define USR_SOCKET_FOR(ust, fd) ((ust).socket_table[fd])

int usr_socket_table_init(struct usr_socket_table *ust_ptr) {
ust_ptr->kq = kqueue();
memset(&ust_ptr->socket_table, 0, sizeof (ust_ptr->socket_table));
};

int usr_socket_table_register_listener(struct usr_socket_table *ust_ptr, int fd) {
int rc;
struct kevent kev_pair_in[1];
struct kevent kev_pair_out[1];
struct timespec zero_time;

EV_SET(&kev_pair_in[0], fd, EVFILT_READ, EV_ADD|EV_RECEIPT, 0, 0, NULL);
zero_time.tv_sec = 0;
zero_time.tv_nsec = 0;
rc = kevent(ust_ptr->kq, kev_pair_in, 1, kev_pair_out, 1, &zero_time);
if (rc == -1) {
perror("kevent");
return -1;
}

printf("registered listener %i\n", fd);

return 0;
}

int usr_socket_table_register_accepted(struct usr_socket_table *ust_ptr, int fd) {
int rc;
struct kevent kev_pair_in[2];
struct kevent kev_pair_out[2];
struct timespec zero_time;

EV_SET(&kev_pair_in[0], fd, EVFILT_RECV, EV_ADD|EV_RECEIPT, 0, 0, NULL);
EV_SET(&kev_pair_in[1], fd, EVFILT_SEND, EV_ADD|EV_RECEIPT, 0, 0, NULL);
zero_time.tv_sec = 0;
zero_time.tv_nsec = 0;
rc = kevent(ust_ptr->kq, kev_pair_in, 2, kev_pair_out, 2, &zero_time);
if (rc == -1) {
perror("kevent");
return -1;
}

printf("registered accepted %i\n", fd);

return 0;
}

int usr_socket_table_opened(struct usr_socket_table *ust_ptr, int fd, enum USR_SOCKET_TYPE type, const struct sockaddr *name, socklen_t namelen) {
int rc;
struct usr_socket us;

if (fd >= USR_SOCKET_TABLE_SIZE) {
errno = ENOBUFS;
return -1;
}

switch (type) {
case USR_SOCKET_ACCEPTED:
memset(&us, 0, sizeof (us));
us.fd = fd;
us.managed = 1;
us.type = type;
memcpy(&USR_SOCKET_FOR(*ust_ptr, fd), &us, sizeof (us));

memcpy(&USR_SOCKET_FOR(*ust_ptr, fd).addr, name, namelen);

rc = usr_socket_table_register_accepted(ust_ptr, fd);
if (rc == -1) {
perror("usr_socket_table_register_accepted");
return -1;
}
break;
case USR_SOCKET_BOUND:
memset(&us, 0, sizeof (us));
us.fd = fd;
us.managed = 1;
us.type = type;
memcpy(&USR_SOCKET_FOR(*ust_ptr, fd), &us, sizeof (us));

memcpy(&us.addr, name, namelen);

/* skip registration */
break;
case USR_SOCKET_LISTENER:
/* already bound with address; only update to new type */
USR_SOCKET_FOR(*ust_ptr, fd).type = type;

rc = usr_socket_table_register_listener(ust_ptr, fd);
if (rc == -1) {
perror("usr_socket_table_register_listener");
return -1;
}
break;
default:
errno = ENOTSUP;
break;
}

return fd;
}

int usr_socket_table_connect(struct usr_socket_table *ust_ptr, int domain, int type, int protocol, const struct sockaddr *name, socklen_t namelen) {
int rc;
int fd;

rc = socket(domain, type, protocol);
if (rc == -1) {
perror("socket");
return -1;
}

fd = rc;

rc = connect(fd, name, namelen);
if (rc == -1) {
perror("connect");
return -1;
}

return usr_socket_table_opened(ust_ptr, fd, USR_SOCKET_ACCEPTED, name, namelen);
}

int usr_socket_table_bind(struct usr_socket_table *ust_ptr, int domain, int type, int protocol, const struct sockaddr *name, socklen_t namelen) {
int rc;
int fd;

rc = socket(domain, type, protocol);
if (rc == -1) {
perror("socket");
return -1;
}

fd = rc;

rc = bind(fd, name, namelen);
if (rc == -1) {
perror("bind");
return -1;
}

return usr_socket_table_opened(ust_ptr, fd, USR_SOCKET_BOUND, name, namelen);
}

int usr_socket_table_listen(struct usr_socket_table *ust_ptr, int fd, int backlog) {
int rc;

rc = listen(fd, backlog);
if (rc == -1) {
perror("listen");
return -1;
}

return usr_socket_table_opened(ust_ptr, fd, USR_SOCKET_LISTENER, NULL, 0);
}

int usr_socket_table_accept(struct usr_socket_table *ust_ptr, int fd) {
int rc;
struct sockaddr addr;
socklen_t addrlen;
int accepted_fd;

rc = accept(fd, &addr, &addrlen);
if (rc == -1) {
perror("accept");
return 1;
}

accepted_fd = rc;

return usr_socket_table_opened(ust_ptr, accepted_fd, USR_SOCKET_ACCEPTED, &addr, addrlen);
}

/* returns the next active file descriptor */
int usr_socket_table_poll(struct usr_socket_table *ust_ptr) {
int rc;
struct kevent kev;
struct usr_socket *us_ptr;

rc = kevent(ust_ptr->kq, NULL, 0, &kev, 1, NULL);
if (rc == -1) {
perror("kevent");
return -1;
}

printf("kqueue() = %i, ident = %i, filter = %i, flags = %i, fflags = %i, data = %i\n", rc, (int) kev.ident, kev.filter, kev.flags, kev.fflags, (int) kev.data);

us_ptr = &USR_SOCKET_FOR(*ust_ptr, kev.ident);

switch (kev.filter) {
case EVFILT_READ:
case EVFILT_RECV:
if (us_ptr->managed) {
memcpy(&USR_SOCKET_FOR(*ust_ptr, kev.ident).recv.qev, &kev, sizeof (kev));
}
break;
case EVFILT_WRITE:
case EVFILT_SEND:
if (us_ptr->managed) {
memcpy(&USR_SOCKET_FOR(*ust_ptr, kev.ident).send.qev, &kev, sizeof (kev));
}
break;
default:
break;
}

return kev.ident;
}

int usr_socket_recv(struct usr_socket *us_ptr, void *buf, size_t len, int flags) {
int rc;

rc = recv(us_ptr->fd, buf, len, flags);
if (rc == -1) {
perror("recv");
return -1;
}

us_ptr->recv.qev.data -= rc;

printf("received %i from %i with %i remaining\n", rc, us_ptr->fd, us_ptr->recv.qev.data);

return rc;
}

int usr_socket_send(struct usr_socket *us_ptr, void *msgbuf, size_t len, int flags) {
int rc;

rc = send(us_ptr->fd, msgbuf, len, flags);
if (rc == -1) {
perror("send");
return -1;
}

us_ptr->send.qev.data -= rc;

printf("sent %i to %i with %i remaining\n", rc, us_ptr->fd, us_ptr->send.qev.data);

return rc;
}

int main() {
int rc;
int s;
struct usr_socket_table ust;
char echo_buf[10];
size_t echo_buflen;
struct usr_socket *us_ptr;
struct sockaddr_un addr_un;
struct sockaddr_in addr_in;

rc = usr_socket_table_init(&ust);
if (rc == -1) {
perror("usr_socket_table_init");
return 1;
}

/* Unix Domain Socket */

/*
memset(&addr_un, 0, sizeof (addr_un));
strncpy(addr_un.sun_path, "test1.sock", sizeof (addr_un.sun_path));
addr_un.sun_len = strnlen(addr_un.sun_path, sizeof (addr_un.sun_path));
addr_un.sun_family = SOCK_STREAM;

rc = usr_socket_table_connect(&ust, AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK, 0, (struct sockaddr *) &addr_un, sizeof (addr_un));
if (rc == -1) {
perror("usr_socket_table_connect");
return 1;
}

s = rc;

printf("connected socket %i to \"test1.sock\"\n", s);
*/

/* Socket Server */

memset(&addr_in, 0, sizeof (addr_in));
addr_in.sin_family = AF_INET;
addr_in.sin_port = htons(8081);
addr_in.sin_addr.s_addr = inet_addr("0.0.0.0");

rc = usr_socket_table_bind(&ust, AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0, (struct sockaddr *) &addr_in, sizeof (addr_in));
if (rc == -1) {
perror("usr_socket_table_bind");
return 1;
}

s = rc;

printf("bound socket %i to 0.0.0.0:8081\n", s);

rc = usr_socket_table_listen(&ust, s, 32);
if (rc == -1) {
perror("usr_socket_table_listen");
return 1;
}

printf("listening\n");

/* Event Loop */

while ((rc = usr_socket_table_poll(&ust)) != -1) {
s = rc;

printf("State change for socket %i\n", s);

us_ptr = &USR_SOCKET_FOR(ust, s);
if ( ! us_ptr->managed) {
printf("skipping unmanaged event for fd %i\n", s);
continue;
}

printf("event for socket of type %i\n", us_ptr->type);

switch (us_ptr->type) {
case USR_SOCKET_LISTENER:
printf("accepting from %i\n", us_ptr->fd);
rc = usr_socket_table_accept(&ust, us_ptr->fd);
if (rc == -1) {
perror("usr_socket_table_accept");
}
break;
case USR_SOCKET_ACCEPTED:
while (USR_SOCKET_MAY_RECV(*us_ptr) > 0) {
rc = usr_socket_recv(us_ptr, echo_buf, sizeof (echo_buf), 0);
if (rc == -1) {
perror("usr_socket_recv");
return 1;
}
echo_buflen = rc;

if (echo_buflen > 0) {
if (USR_SOCKET_MAY_SEND(*us_ptr) < echo_buflen) {
errno = ENOBUFS;
perror("echo_buflen");
return 1;
}
rc = usr_socket_send(us_ptr, echo_buf, echo_buflen, 0);
if (rc == -1) {
perror("usr_socket_send");
return 1;
}
}
}
break;
}
}
if (rc == -1) {
perror("usr_socket_table_poll");
return 1;
}

return 0;
}
(2-2/2)