From 967e3ce019a5ed4c5691f57feb68f74b212acf13 Mon Sep 17 00:00:00 2001 From: leitner Date: Thu, 28 Aug 2014 19:03:57 +0000 Subject: [PATCH] revert edge triggering epoll; it had reliability and fairness issues and was also not actually faster --- io/io_canread.c | 6 +-- io/io_canwrite.c | 4 +- io/io_dontwantread.c | 10 ++++ io/io_dontwantwrite.c | 10 ++++ io/io_eagain.c | 2 +- io/io_eagain_read.c | 2 +- io/io_eagain_write.c | 2 +- io/io_fd.c | 6 +-- io/io_tryread.c | 2 +- io/io_trywrite.c | 2 +- io/io_waituntil2.c | 104 +++++++++++++++++++++++++++++++++--------- io/io_wantwrite.c | 36 +++++++-------- 12 files changed, 133 insertions(+), 53 deletions(-) diff --git a/io/io_canread.c b/io/io_canread.c index a26d985..0e23f51 100644 --- a/io/io_canread.c +++ b/io/io_canread.c @@ -12,7 +12,7 @@ void io_wantread_really(int64 d, io_entry* e); int64 io_canread() { io_entry* e; if (first_readable==-1) -#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL) +#if defined(HAVE_SIGIO) { if (alt_firstread>=0 && (e=iarray_get(&io_fds,alt_firstread)) && e->canread) { debug_printf(("io_canread: normal read queue is empty, swapping in alt read queue (starting with %ld)\n",alt_firstread)); @@ -44,11 +44,11 @@ int64 io_canread() { e->canread #endif ) { -#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL) +#if defined(HAVE_SIGIO) e->next_read=alt_firstread; alt_firstread=r; debug_printf(("io_canread: enqueue %ld in alt read queue (next is %ld)\n",alt_firstread,e->next_read)); - if (io_waitmode!=_SIGIO && io_waitmode!=EPOLL) + if (io_waitmode!=_SIGIO) #endif e->canread=0; if (!e->kernelwantread) diff --git a/io/io_canwrite.c b/io/io_canwrite.c index 1f9c465..21e28ba 100644 --- a/io/io_canwrite.c +++ b/io/io_canwrite.c @@ -8,7 +8,7 @@ void io_wantwrite_really(int64 d, io_entry* e); int64 io_canwrite() { io_entry* e; if (first_writeable==-1) -#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL) +#if defined(HAVE_SIGIO) { if (alt_firstwrite>=0 && (e=iarray_get(&io_fds,alt_firstwrite)) && e->canwrite) { debug_printf(("io_canwrite: normal write queue is empty, swapping in alt write queue (starting with %ld)\n",alt_firstwrite)); @@ -35,7 +35,7 @@ int64 io_canwrite() { e->canwrite #endif ) { -#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL) +#if defined(HAVE_SIGIO) e->next_write=alt_firstwrite; alt_firstwrite=r; debug_printf(("io_canwrite: enqueue %ld in alt write queue (next is %ld)\n",alt_firstwrite,e->next_write)); diff --git a/io/io_dontwantread.c b/io/io_dontwantread.c index bb16515..cb9d3d1 100644 --- a/io/io_dontwantread.c +++ b/io/io_dontwantread.c @@ -30,6 +30,16 @@ void io_dontwantread_really(int64 d, io_entry* e) { assert(e->kernelwantread); newfd=!e->kernelwantwrite; io_wanted_fds-=newfd; +#ifdef HAVE_EPOLL + if (io_waitmode==EPOLL) { + struct epoll_event x; + byte_zero(&x,sizeof(x)); // to shut up valgrind + x.events=0; + if (e->kernelwantwrite) x.events|=EPOLLOUT; + x.data.fd=d; + epoll_ctl(io_master,e->kernelwantwrite?EPOLL_CTL_MOD:EPOLL_CTL_DEL,d,&x); + } +#endif #ifdef HAVE_KQUEUE if (io_waitmode==KQUEUE) { struct kevent kev; diff --git a/io/io_dontwantwrite.c b/io/io_dontwantwrite.c index 37008d6..fbf0fe5 100644 --- a/io/io_dontwantwrite.c +++ b/io/io_dontwantwrite.c @@ -36,6 +36,16 @@ void io_dontwantwrite_really(int64 d,io_entry* e) { assert(e->kernelwantwrite); newfd=!e->kernelwantread; io_wanted_fds-=newfd; +#ifdef HAVE_EPOLL + if (io_waitmode==EPOLL) { + struct epoll_event x; + byte_zero(&x,sizeof(x)); // to shut up valgrind + x.events=0; + if (e->wantread) x.events|=EPOLLIN; + x.data.fd=d; + epoll_ctl(io_master,e->kernelwantread?EPOLL_CTL_MOD:EPOLL_CTL_DEL,d,&x); + } +#endif #ifdef HAVE_KQUEUE if (io_waitmode==KQUEUE) { struct kevent kev; diff --git a/io/io_eagain.c b/io/io_eagain.c index d28ee2d..659f9fa 100644 --- a/io/io_eagain.c +++ b/io/io_eagain.c @@ -5,7 +5,7 @@ void io_eagain(int64 d) { if (e) { if (e->wantread) e->canread=0; if (e->wantwrite) e->canwrite=0; -#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL) +#if defined(HAVE_SIGIO) if (d==alt_firstread) { debug_printf(("io_eagain: dequeueing %lld from alt read queue (next is %ld)\n",d,e->next_read)); alt_firstread=e->next_read; diff --git a/io/io_eagain_read.c b/io/io_eagain_read.c index 71f89a4..14640a8 100644 --- a/io/io_eagain_read.c +++ b/io/io_eagain_read.c @@ -4,7 +4,7 @@ void io_eagain_read(int64 d) { io_entry* e=iarray_get(&io_fds,d); if (e) { e->canread=0; -#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL) +#if defined(HAVE_SIGIO) if (d==alt_firstread) { debug_printf(("io_eagain: dequeueing %lld from alt read queue (next is %ld)\n",d,e->next_read)); alt_firstread=e->next_read; diff --git a/io/io_eagain_write.c b/io/io_eagain_write.c index f8c1bae..276f401 100644 --- a/io/io_eagain_write.c +++ b/io/io_eagain_write.c @@ -4,7 +4,7 @@ void io_eagain_write(int64 d) { io_entry* e=iarray_get(&io_fds,d); if (e) { e->canwrite=0; -#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL) +#if defined(HAVE_SIGIO) if (d==alt_firstwrite) { debug_printf(("io_eagain: dequeueing %lld from alt write queue (next is %ld)\n",d,e->next_write)); alt_firstwrite=e->next_write; diff --git a/io/io_fd.c b/io/io_fd.c index ab83f8c..1b9538f 100644 --- a/io/io_fd.c +++ b/io/io_fd.c @@ -47,7 +47,7 @@ int io_master; int io_signum; sigset_t io_ss; #endif -#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL) +#if defined(HAVE_SIGIO) long alt_firstread; long alt_firstwrite; #endif @@ -98,10 +98,8 @@ static io_entry* io_fd_internal(int64 d) { if (io_master!=-1) io_waitmode=DEVPOLL; } #endif -#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL) - alt_firstread=alt_firstwrite=-1; -#endif #if defined(HAVE_SIGIO) + alt_firstread=alt_firstwrite=-1; if (io_waitmode==UNDECIDED) { io_signum=SIGRTMIN+1; if (sigemptyset(&io_ss)==0 && diff --git a/io/io_tryread.c b/io/io_tryread.c index 7f3fb21..ae9fcd5 100644 --- a/io/io_tryread.c +++ b/io/io_tryread.c @@ -112,7 +112,7 @@ int64 io_tryread(int64 d,char* buf,int64 len) { } if (r!=len) { e->canread=0; -#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL) +#if defined(HAVE_SIGIO) if (d==alt_firstread) { debug_printf(("io_tryread: dequeueing %ld from alt read queue (next is %ld)\n",d,e->next_read)); alt_firstread=e->next_read; diff --git a/io/io_trywrite.c b/io/io_trywrite.c index 33091be..1f09d1a 100644 --- a/io/io_trywrite.c +++ b/io/io_trywrite.c @@ -106,7 +106,7 @@ int64 io_trywrite(int64 d,const char* buf,int64 len) { } if (r!=len) { e->canwrite=0; -#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL) +#if defined(HAVE_SIGIO) if (d==alt_firstwrite) { debug_printf(("io_trywrite: dequeueing %ld from alt write queue (next is %ld)\n",d,e->next_write)); alt_firstwrite=e->next_write; diff --git a/io/io_waituntil2.c b/io/io_waituntil2.c index ac4ff6a..4de711c 100644 --- a/io/io_waituntil2.c +++ b/io/io_waituntil2.c @@ -124,44 +124,106 @@ int64 io_waituntil2(int64 milliseconds) { if (io_waitmode==EPOLL) { int n; struct epoll_event y[100]; - io_entry* e; - if (alt_firstread>=0 && (e=iarray_get(&io_fds,alt_firstread)) && e->canread) { -// write(1,"r",1); - return 1; - } - if (alt_firstwrite>=0 && (e=iarray_get(&io_fds,alt_firstwrite)) && e->canwrite) { -// write(1,"w",1); - return 1; - } -// write(1,".",1); if ((n=epoll_wait(io_master,y,100,milliseconds))==-1) return -1; for (i=0; ikernelwantread) curevents |= EPOLLIN; + if (e->kernelwantwrite) curevents |= EPOLLOUT; + +#ifdef DEBUG + if ((y[i].events&(EPOLLIN|EPOLLPRI|EPOLLRDNORM|EPOLLRDBAND)) && !e->kernelwantread) + printf("got unexpected read event on fd #%d\n",y[i].data.fd); + if ((y[i].events&EPOLLOUT) && !e->kernelwantwrite) + printf("got unexpected write event on fd #%d\n",y[i].data.fd); +#endif + if (y[i].events&(POLLERR|POLLHUP)) { /* error; signal whatever app is looking for */ if (e->wantread) y[i].events|=POLLIN; if (e->wantwrite) y[i].events|=POLLOUT; } - if (y[i].events&POLLIN && !e->canread) { - debug_printf(("io_waituntil2: enqueueing %ld in normal read queue before %ld\n",info.si_fd,first_readable)); - e->canread=1; - e->next_read=first_readable; - first_readable=y[i].data.fd; + + newevents=0; + if (!e->canread || e->wantread) { + newevents|=EPOLLIN; + e->kernelwantread=1; + } else + e->kernelwantread=0; + if (!e->canwrite || e->wantwrite) { + newevents|=EPOLLOUT; + e->kernelwantwrite=1; + } else + e->kernelwantwrite=0; + + /* if we think we can not read, but the kernel tells us that we + * can, put this fd in the relevant data structures */ + if (!e->canread && (y[i].events&(EPOLLIN|EPOLLPRI|EPOLLRDNORM|EPOLLRDBAND))) { + if (e->canread) { + newevents &= ~EPOLLIN; + } else { + e->canread=1; + if (e->wantread) { + e->next_read=first_readable; + first_readable=y[i].data.fd; + } + } } - if (y[i].events&POLLOUT && !e->canwrite) { - debug_printf(("io_waituntil2: enqueueing %ld in normal write queue before %ld\n",info.si_fd,first_writeable)); - e->canwrite=1; - e->next_write=first_writeable; - first_writeable=y[i].data.fd; + + /* if the kernel says the fd is writable, ... */ + if (y[i].events&EPOLLOUT) { + /* Usually, if the kernel says a descriptor is writable, we + * note it and do not tell the kernel not to tell us again. + * The idea is that once we notify the caller that the fd is + * writable, and the caller handles the event, the caller will + * just ask to be notified of future write events again. We + * are trying to save the superfluous epoll_ctl syscalls. + * If e->canwrite is set, then this gamble did not work out. + * We told the caller, yet after the caller is done we still + * got another write event. Clearly the user is implementing + * some kind of throttling and we can tell the kernel to leave + * us alone for now. */ + if (e->canwrite) { + newevents &= ~EPOLLOUT; + e->kernelwantwrite=0; + } else { + /* If !e->wantwrite: The laziness optimization in + * io_dontwantwrite hit. We did not tell the kernel that we + * are no longer interested in writing to save the syscall. + * Now we know we could write if we wanted; remember that + * and then go on. */ + e->canwrite=1; + if (e->wantwrite) { + e->next_write=first_writeable; + first_writeable=y[i].data.fd; + } + } + } + + if (newevents != curevents) { +#if 0 + printf("canread %d, wantread %d, kernelwantread %d, canwrite %d, wantwrite %d, kernelwantwrite %d\n", + e->canread,e->wantread,e->kernelwantread,e->canwrite,e->wantwrite,e->kernelwantwrite); + printf("newevents: read %d write %d\n",!!(newevents&EPOLLIN),!!(newevents&EPOLLOUT)); +#endif + y[i].events=newevents; + if (newevents) { + epoll_ctl(io_master,EPOLL_CTL_MOD,y[i].data.fd,y+i); + } else { + epoll_ctl(io_master,EPOLL_CTL_DEL,y[i].data.fd,y+i); + --io_wanted_fds; + } } } else { +#if 0 #ifdef __dietlibc__ char buf[FMT_ULONG]; buf[fmt_ulong(buf,y[i].data.fd)]=0; __write2("got epoll event on invalid fd "); __write2(buf); __write2("!\n"); +#endif #endif epoll_ctl(io_master,EPOLL_CTL_DEL,y[i].data.fd,y+i); } diff --git a/io/io_wantwrite.c b/io/io_wantwrite.c index 3bf2bff..99dcd22 100644 --- a/io/io_wantwrite.c +++ b/io/io_wantwrite.c @@ -40,6 +40,16 @@ void io_wantwrite_really(int64 d, io_entry* e) { assert(!e->kernelwantwrite); /* we should not be here if we already told the kernel we want to write */ newfd=(!e->kernelwantread); io_wanted_fds+=newfd; +#ifdef HAVE_EPOLL + if (io_waitmode==EPOLL) { + struct epoll_event x; + byte_zero(&x,sizeof(x)); // to shut up valgrind + x.events=EPOLLOUT; + if (e->kernelwantread) x.events|=EPOLLIN; + x.data.fd=d; + epoll_ctl(io_master,e->kernelwantread?EPOLL_CTL_MOD:EPOLL_CTL_ADD,d,&x); + } +#endif #ifdef HAVE_KQUEUE if (io_waitmode==KQUEUE) { struct kevent kev; @@ -58,25 +68,15 @@ void io_wantwrite_really(int64 d, io_entry* e) { write(io_master,&x,sizeof(x)); } #endif -#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL) - if (io_waitmode==_SIGIO || io_waitmode==EPOLL) { +#ifdef HAVE_SIGIO + if (io_waitmode==_SIGIO) { struct pollfd p; - if (io_waitmode==EPOLL && !e->epolladded) { - struct epoll_event x; - byte_zero(&x,sizeof(x)); // shut up valgrind - x.events=EPOLLIN|EPOLLOUT|EPOLLET; - x.data.fd=d; - epoll_ctl(io_master,EPOLL_CTL_ADD,d,&x); - e->epolladded=1; - } - if (e->canwrite==0) { - p.fd=d; - p.events=POLLOUT; - switch (poll(&p,1,0)) { - case 1: e->canwrite=1; break; -// case 0: e->canwrite=0; break; - case -1: return; - } + p.fd=d; + p.events=POLLOUT; + switch (poll(&p,1,0)) { + case 1: e->canwrite=1; break; + case 0: e->canwrite=0; break; + case -1: return; } if (e->canwrite) { debug_printf(("io_wantwrite: enqueueing %lld in normal write queue before %ld\n",d,first_readable));