From ce595ae0cc301d4967260c8a9342f6bd59f4f758 Mon Sep 17 00:00:00 2001 From: leitner Date: Tue, 27 Mar 2018 02:23:00 +0000 Subject: [PATCH] add experimental iom API for multithreaded I/O multiplexing (in io.h) --- CHANGES | 1 + io.h | 45 ++++++++++++++++++ io/iom_abort.3 | 19 ++++++++ io/iom_abort.c | 10 ++++ io/iom_add.3 | 33 +++++++++++++ io/iom_add.c | 28 +++++++++++ io/iom_init.3 | 30 ++++++++++++ io/iom_init.c | 38 +++++++++++++++ io/iom_wait.3 | 38 +++++++++++++++ io/iom_wait.c | 124 +++++++++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 366 insertions(+) create mode 100644 io/iom_abort.3 create mode 100644 io/iom_abort.c create mode 100644 io/iom_add.3 create mode 100644 io/iom_add.c create mode 100644 io/iom_init.3 create mode 100644 io/iom_init.c create mode 100644 io/iom_wait.3 create mode 100644 io/iom_wait.c diff --git a/CHANGES b/CHANGES index 552282e..a03e0fe 100644 --- a/CHANGES +++ b/CHANGES @@ -4,6 +4,7 @@ fix fmt_ip6 (Erwin Hoffmann) add MSG_ZEROCOPY support (only used for buffers >8k) use write in buffer_put for a slight perf improvement + add experimental iom API for multithreaded I/O multiplexing (in io.h) 0.31: special case buffer_get_token with token length 1 through memccpy (almost 4x speedup) diff --git a/io.h b/io.h index 3752e1b..5284245 100644 --- a/io.h +++ b/io.h @@ -144,6 +144,51 @@ int64 io_mmapwritefile(int64 out,int64 in,uint64 off,uint64 bytes,io_write_callb * aid in debugging the state machine if a descriptor loops or so */ unsigned int io_debugstring(int64 s,char* buf,unsigned int bufsize); +#ifdef __dietlibc__ +#include +#else +#include +#include +#endif + +enum { SLOTS=128 }; +typedef struct iomux { + int ctx; + int working; /* used to synchronize who is filling the queue */ + unsigned int h,l; /* high, low */ + struct { + int fd, events; + } q[SLOTS]; +#ifdef __dietlibc__ + mtx_t mtx; + cnd_t sem; +#else + sem_t sem; +#endif +} iomux_t; + + +/* Init master context */ +int iom_init(iomux_t* c); + +/* Add socket to iomux */ +enum { + IOM_READ=1, + IOM_WRITE=2, + IOM_ERROR=4 +}; +/* return -1 if error, or | of IOM_READ, IOM_WRITE or IOM_ERROR */ +int iom_add(iomux_t* c,int64 s,unsigned int events); + +/* Blocking wait for single event, timeout in milliseconds */ +/* return -1 if error, 0 if ok; s set to fd, revents set to known events on that fd */ +/* when done with the fd, call iom_add on it again! */ +/* This can be called by multiple threads in parallel */ +int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout); + +/* Call this to terminate all threads waiting in iom_wait */ +int iom_abort(iomux_t* c); + #ifdef __cplusplus } #endif diff --git a/io/iom_abort.3 b/io/iom_abort.3 new file mode 100644 index 0000000..0336f6b --- /dev/null +++ b/io/iom_abort.3 @@ -0,0 +1,19 @@ +.TH iom_abort 3 +.SH NAME +iom_abort \- abort all pending iom_wait calls +.SH SYNTAX +.B #include + +int \fBiom_abort\fP(iomux_t* c); +.SH DESCRIPTION +\fIiom_abort\fR will cause all currently running instances of +\fIiom_wait\fR to return immediately with return value -2. + +.SH "LINKING" +You may have to add \fI-lpthread\fR to the command line in the linking +step. + +.SH "RETURN VALUE" +iom_abort returns 0 on success and -1 on error, setting errno. +.SH "SEE ALSO" +iom_init, iom_add, iom_wait diff --git a/io/iom_abort.c b/io/iom_abort.c new file mode 100644 index 0000000..89d1406 --- /dev/null +++ b/io/iom_abort.c @@ -0,0 +1,10 @@ +#include "io_internal.h" + +int iom_abort(iomux_t* c) { + c->working=-2; +#ifdef __dietlibc__ + return cnd_broadcast(&c->sem); +#else + return sem_post(&c->sem); +#endif +} diff --git a/io/iom_add.3 b/io/iom_add.3 new file mode 100644 index 0000000..760a7d5 --- /dev/null +++ b/io/iom_add.3 @@ -0,0 +1,33 @@ +.TH iom_add 3 +.SH NAME +iom_add \- add event to I/O multiplexer +.SH SYNTAX +.B #include + +int \fBiom_add\fP(iomux_t* c, int64 fd, unsigned int events); +.SH DESCRIPTION +iom_add adds an event you are interested in to an I/O multiplexer. + +\fIfd\fR is the file descriptor (usually a socket) you are interested +in, and \fIevents\fR is the operation you want to do. It can be IOM_READ +or IOM_WRITE. + +If that operation becomes possible on that descriptor, and some thread +is calling \fIiom_wait\fR at the time, it will return and tell you the +fd and the event. + +Note that the event registration is removed from the iomux_t context if +it occurs. You will have to call \fIiom_wait\fR again after you handled +the event, if you are still interested in it. + +Closing a file descriptor with registered events will discard the event +registration. + +.SH "LINKING" +You may have to add \fI-lpthread\fR to the command line in the linking +step. + +.SH "RETURN VALUE" +iom_add returns 0 on success and -1 on error, setting errno. +.SH "SEE ALSO" +iom_init, iom_wait, iom_abort diff --git a/io/iom_add.c b/io/iom_add.c new file mode 100644 index 0000000..1d6d06f --- /dev/null +++ b/io/iom_add.c @@ -0,0 +1,28 @@ +#include "io_internal.h" +#ifdef HAVE_EPOLL +#include +#endif +#ifdef HAVE_KQUEUE +#include +#include +#include +#endif + +int iom_add(iomux_t* c,int64 s,unsigned int events) { +#ifdef HAVE_EPOLL + struct epoll_event e = { .events=EPOLLONESHOT, .data.fd=s }; + if (events & IOM_READ) e.events|=EPOLLIN; + if (events & IOM_WRITE) e.events|=EPOLLOUT; + return epoll_ctl(c->ctx, EPOLL_CTL_ADD, s, &e); +#elif defined(HAVE_KQUEUE) + struct kevent kev; + struct timespec ts = { 0 }; + EV_SET(&kev, s, + (events & IOM_READ ? EVFILT_READ : 0) + + (events & IOM_WRITE ? EVFILT_WRITE : 0), + EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, (void*)s); + return kevent(c->ctx, &kev, 1, 0, 0, &ts); +#else +#warning "only epoll and kqueue supported for now" +#endif +} diff --git a/io/iom_init.3 b/io/iom_init.3 new file mode 100644 index 0000000..7d6f9d3 --- /dev/null +++ b/io/iom_init.3 @@ -0,0 +1,30 @@ +.TH iom_init 3 +.SH NAME +iom_init \- create new I/O multiplexer +.SH SYNTAX +.B #include + +int \fBiom_init\fP(iomux_t* c); +.SH DESCRIPTION +iom_init initializes an I/O multiplexer. + +An I/O multiplexer is a context that can be used to do I/O multiplexing +with support for multiple threads. Add events to a multiplexer using +\fIiom_add\fR, and then get the next available event with +\fIiom_wait\fR. If you are done and want to signal all the threads +something, set a volatile global variable to tell the threads to stop +and then fall \fIiom_abort\fR to tell all pending iom_wait operations in +all threads to return immediately. + +After \fIiom_init\fR is done, \fIiom_add\fR and \fIiom_wait\fR can be +called from different threads on the same context, and they will +synchronize internally. + +.SH "LINKING" +You may have to add \fI-lpthread\fR to the command line in the linking +step. + +.SH "RETURN VALUE" +iom_init returns 0 on success and -1 on error, setting errno. +.SH "SEE ALSO" +iom_add, iom_wait, iom_abort diff --git a/io/iom_init.c b/io/iom_init.c new file mode 100644 index 0000000..769af8b --- /dev/null +++ b/io/iom_init.c @@ -0,0 +1,38 @@ +#include "io_internal.h" +#ifdef HAVE_EPOLL +#include +#endif +#ifdef HAVE_KQUEUE +#include +#include +#include +#endif + +int iom_init(iomux_t* c) { +#ifdef HAVE_EPOLL + c->ctx = epoll_create1(EPOLL_CLOEXEC); +#elif defined(HAVE_KQUEUE) + if ((c->ctx = kqueue()) != -1) { + if (fcntl(c->ctx,F_SETFD,FD_CLOEXEC) == -1) { + close(c->ctx); + c->ctx=-1; + } + } +#else +#warning "only epoll and kqueue supported for now" +#endif + unsigned int i; + c->working=0; + c->h=c->l=0; /* no elements in queue */ + for (i=0; iq[i].fd=-1; + c->q[i].events=0; + } +#ifdef __dietlibc__ + mtx_init(&c->mtx, mtx_timed); + cnd_init(&c->sem); +#else + sem_init(&c->sem, 0, 1); +#endif + return (c->ctx!=-1); +} diff --git a/io/iom_wait.3 b/io/iom_wait.3 new file mode 100644 index 0000000..e332114 --- /dev/null +++ b/io/iom_wait.3 @@ -0,0 +1,38 @@ +.TH iom_wait 3 +.SH NAME +iom_wait \- wait for event from I/O multiplexer +.SH SYNTAX +.B #include + +int \fBiom_wait\fP(iomux_t* c, + int64* fd, unsigned int* events, + unsigned long timeout); +.SH DESCRIPTION +iom_wait will wait for events registered to the I/O multiplexer with +\fIiom_add\fR. It will wait \fItimeout\fR milliseconds. + +If during that time any of the registered events occur, \fIiom_wait\fR +will set \fIfd\fR to the file descriptor the event happened on, and +\fIevents\fR to the sum of IOM_READ, IOM_WRITE and IOM_ERROR, depending +on what event actually happened, and return 1. + +If nothing happens during that time, it will return 0 and leave \fIfd\fR +and \fIevents\fR alone. + +Note that the event registration is removed from the iomux_t context if +it occurs. You will have to call \fIiom_wait\fR again after you handled +the event, if you are still interested in it. + +Closing a file descriptor with registered events will discard the event +registration. + +.SH "LINKING" +You may have to add \fI-lpthread\fR to the command line in the linking +step. + +.SH "RETURN VALUE" +iom_wait returns 1 on success, 0 if there was a timeout, and -1 on +error, setting errno. If \fIiom_abort\fR was called on the I/O +multiplexer context, it will return -2. +.SH "SEE ALSO" +iom_init, iom_add, iom_abort diff --git a/io/iom_wait.c b/io/iom_wait.c new file mode 100644 index 0000000..e9187b0 --- /dev/null +++ b/io/iom_wait.c @@ -0,0 +1,124 @@ +#include "io_internal.h" +#ifdef HAVE_EPOLL +#include +#endif +#ifdef HAVE_KQUEUE +#include +#include +#include +#endif +#include + +int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout) { + for (;;) { + /* If we have an event in the queue, use that one */ + int r; + if (c->working==-2) return -2; /* iomux was aborted */ + for (;;) { + unsigned int f=c->l; + if (f == c->h) + break; /* no elements in queue */ + int n=(f+1)%SLOTS; + if (__sync_bool_compare_and_swap(&c->l,f,n)) { + /* we got one, and its index is in f */ + *s=c->q[f].fd; + *revents=c->q[f].events; + } + /* collided with another thread, try again */ + } + /* The queue was empty. If someone else is already calling + * epoll_wait/kevent, then use the semaphore */ + if (__sync_bool_compare_and_swap(&c->working,0,1)) { + /* we have the job to fill the struct. */ + int freeslots = (c->h - c->l); + if (!freeslots) freeslots=SLOTS; + +#ifdef HAVE_EPOLL + struct epoll_event ee[SLOTS]; + int i; + r=epoll_wait(c->ctx, ee, freeslots, timeout); + if (r<=0) { + /* we ran into a timeout, so let someone else take over */ + c->working=0; +#ifdef __dietlibc__ + cnd_broadcast(&c->sem); +#else + sem_post(&c->sem); +#endif + return r; + } + for (i=0; iq[c->h].fd=ee[i].data.fd; + c->q[c->h].events=e; + c->h = (c->h + 1) % SLOTS; + } + } +#elif defined(HAVE_KQUEUE) + struct kevent kev[SLOTS]; + struct timespec ts = { .tv_sec=timeout/1000, .tv_nsec=(timeout%1000)*1000000 }; + int r=kevent(c->ctx, 0, 0, &kev, freeslots, &ts); + if (r<=0) { + /* we ran into a timeout, so let someone else take over */ + c->working=0; +#ifdef __dietlibc__ + cnd_broadcast(&c->sem); +#else + sem_post(&c->sem); +#endif + return r; + } + for (i=0; iq[c->h].fd=kev[i].ident; + c->q[c->h].events=e; + c->h = (c->h + 1) % SLOTS; + } + } +#else +#warning "only epoll and kqueue supported for now" +#endif + /* We need to signal the other threads. + Either there are other events left, or we need one of them to + wake up and call epoll_wait/kevent next, because we aren't + doing it anymore */ + c->working=0; +#ifdef __dietlibc__ + cnd_signal(&c->sem); +#else + sem_post(&c->sem); +#endif + return 1; + } else { + /* somebody else has the job to fill the queue */ + struct timespec ts; + ts.tv_sec = timeout / 1000; + ts.tv_nsec = (timeout % 1000) * 1000000; +#ifdef __dietlibc__ + r=cnd_timedwait(&c->sem,&c->mtx,&ts); +#else + r=sem_timedwait(&c->sem,&ts); +#endif + if (r==-1) { + if (errno==ETIMEDOUT) return 0; + return -1; + } + /* fall through into next loop iteration */ + } + } +}