revert edge triggering epoll; it had reliability and fairness issues and was also not actually faster

master
leitner, 10 years ago
parent 8b3eb0be7c
commit 967e3ce019
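
Background for the change below: with edge-triggered epoll (the EPOLLET flag), epoll_wait reports a descriptor only when its readiness changes, so the application must drain it to EAGAIN or risk never hearing about it again; level-triggered epoll simply re-reports readiness on every call. A minimal sketch of the difference at registration time (the helper name and its arguments are illustrative, not from libowfat):

#include <string.h>
#include <sys/epoll.h>

/* illustrative helper, not part of libowfat */
static int watch_readable(int epfd,int fd,int edge_triggered) {
  struct epoll_event ev;
  memset(&ev,0,sizeof(ev));   /* zero the struct, as the diff does with byte_zero */
  ev.events=EPOLLIN|(edge_triggered?EPOLLET:0);
  ev.data.fd=fd;
  /* without EPOLLET (level-triggered), epoll_wait keeps flagging the fd
   * while unread data remains; with EPOLLET it fires only on new
   * arrivals, so a partial read can silently starve the descriptor */
  return epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);
}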

@@ -12,7 +12,7 @@ void io_wantread_really(int64 d, io_entry* e);
 int64 io_canread() {
   io_entry* e;
   if (first_readable==-1)
-#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL)
+#if defined(HAVE_SIGIO)
   {
     if (alt_firstread>=0 && (e=iarray_get(&io_fds,alt_firstread)) && e->canread) {
       debug_printf(("io_canread: normal read queue is empty, swapping in alt read queue (starting with %ld)\n",alt_firstread));
@@ -44,11 +44,11 @@ int64 io_canread() {
       e->canread
 #endif
      ) {
-#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL)
+#if defined(HAVE_SIGIO)
     e->next_read=alt_firstread;
     alt_firstread=r;
     debug_printf(("io_canread: enqueue %ld in alt read queue (next is %ld)\n",alt_firstread,e->next_read));
-    if (io_waitmode!=_SIGIO && io_waitmode!=EPOLL)
+    if (io_waitmode!=_SIGIO)
 #endif
     e->canread=0;
     if (!e->kernelwantread)
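
A note on the alt queues touched in these hunks: alt_firstread/alt_firstwrite are intrusive singly linked lists threaded through the io_entry slots by descriptor number; next_read holds the fd of the next queued entry and -1 terminates the list. A pared-down sketch of that structure (simplified types, not the library's actual declarations):

/* hypothetical, simplified version of the alt read queue */
typedef struct { int canread; long next_read; } entry;

static long alt_firstread=-1;        /* -1 means the queue is empty */

static void alt_enqueue(entry* tab,long fd) {
  tab[fd].next_read=alt_firstread;   /* push on front, as the diff does */
  alt_firstread=fd;
}

static long alt_dequeue(entry* tab) {
  long fd=alt_firstread;
  if (fd>=0) alt_firstread=tab[fd].next_read;
  return fd;                         /* -1 if the queue was empty */
}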

@@ -8,7 +8,7 @@ void io_wantwrite_really(int64 d, io_entry* e);
 int64 io_canwrite() {
   io_entry* e;
   if (first_writeable==-1)
-#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL)
+#if defined(HAVE_SIGIO)
   {
     if (alt_firstwrite>=0 && (e=iarray_get(&io_fds,alt_firstwrite)) && e->canwrite) {
       debug_printf(("io_canwrite: normal write queue is empty, swapping in alt write queue (starting with %ld)\n",alt_firstwrite));
@@ -35,7 +35,7 @@ int64 io_canwrite() {
       e->canwrite
 #endif
      ) {
-#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL)
+#if defined(HAVE_SIGIO)
     e->next_write=alt_firstwrite;
     alt_firstwrite=r;
     debug_printf(("io_canwrite: enqueue %ld in alt write queue (next is %ld)\n",alt_firstwrite,e->next_write));

@@ -30,6 +30,16 @@ void io_dontwantread_really(int64 d, io_entry* e) {
   assert(e->kernelwantread);
   newfd=!e->kernelwantwrite;
   io_wanted_fds-=newfd;
+#ifdef HAVE_EPOLL
+  if (io_waitmode==EPOLL) {
+    struct epoll_event x;
+    byte_zero(&x,sizeof(x));  // to shut up valgrind
+    x.events=0;
+    if (e->kernelwantwrite) x.events|=EPOLLOUT;
+    x.data.fd=d;
+    epoll_ctl(io_master,e->kernelwantwrite?EPOLL_CTL_MOD:EPOLL_CTL_DEL,d,&x);
+  }
+#endif
 #ifdef HAVE_KQUEUE
   if (io_waitmode==KQUEUE) {
     struct kevent kev;

@@ -36,6 +36,16 @@ void io_dontwantwrite_really(int64 d,io_entry* e) {
   assert(e->kernelwantwrite);
   newfd=!e->kernelwantread;
   io_wanted_fds-=newfd;
+#ifdef HAVE_EPOLL
+  if (io_waitmode==EPOLL) {
+    struct epoll_event x;
+    byte_zero(&x,sizeof(x));  // to shut up valgrind
+    x.events=0;
+    if (e->wantread) x.events|=EPOLLIN;
+    x.data.fd=d;
+    epoll_ctl(io_master,e->kernelwantread?EPOLL_CTL_MOD:EPOLL_CTL_DEL,d,&x);
+  }
+#endif
 #ifdef HAVE_KQUEUE
   if (io_waitmode==KQUEUE) {
     struct kevent kev;

@@ -5,7 +5,7 @@ void io_eagain(int64 d) {
   if (e) {
     if (e->wantread) e->canread=0;
     if (e->wantwrite) e->canwrite=0;
-#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL)
+#if defined(HAVE_SIGIO)
     if (d==alt_firstread) {
       debug_printf(("io_eagain: dequeueing %lld from alt read queue (next is %ld)\n",d,e->next_read));
       alt_firstread=e->next_read;

@@ -4,7 +4,7 @@ void io_eagain_read(int64 d) {
   io_entry* e=iarray_get(&io_fds,d);
   if (e) {
     e->canread=0;
-#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL)
+#if defined(HAVE_SIGIO)
     if (d==alt_firstread) {
       debug_printf(("io_eagain: dequeueing %lld from alt read queue (next is %ld)\n",d,e->next_read));
       alt_firstread=e->next_read;

@@ -4,7 +4,7 @@ void io_eagain_write(int64 d) {
   io_entry* e=iarray_get(&io_fds,d);
   if (e) {
     e->canwrite=0;
-#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL)
+#if defined(HAVE_SIGIO)
     if (d==alt_firstwrite) {
       debug_printf(("io_eagain: dequeueing %lld from alt write queue (next is %ld)\n",d,e->next_write));
       alt_firstwrite=e->next_write;

@@ -47,7 +47,7 @@ int io_master;
 int io_signum;
 sigset_t io_ss;
 #endif
-#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL)
+#if defined(HAVE_SIGIO)
 long alt_firstread;
 long alt_firstwrite;
 #endif
@@ -98,10 +98,8 @@ static io_entry* io_fd_internal(int64 d) {
     if (io_master!=-1) io_waitmode=DEVPOLL;
   }
 #endif
-#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL)
-  alt_firstread=alt_firstwrite=-1;
-#endif
 #if defined(HAVE_SIGIO)
+  alt_firstread=alt_firstwrite=-1;
   if (io_waitmode==UNDECIDED) {
     io_signum=SIGRTMIN+1;
     if (sigemptyset(&io_ss)==0 &&

@@ -112,7 +112,7 @@ int64 io_tryread(int64 d,char* buf,int64 len) {
   }
   if (r!=len) {
     e->canread=0;
-#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL)
+#if defined(HAVE_SIGIO)
     if (d==alt_firstread) {
       debug_printf(("io_tryread: dequeueing %ld from alt read queue (next is %ld)\n",d,e->next_read));
       alt_firstread=e->next_read;

@@ -106,7 +106,7 @@ int64 io_trywrite(int64 d,const char* buf,int64 len) {
   }
   if (r!=len) {
     e->canwrite=0;
-#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL)
+#if defined(HAVE_SIGIO)
     if (d==alt_firstwrite) {
       debug_printf(("io_trywrite: dequeueing %ld from alt write queue (next is %ld)\n",d,e->next_write));
       alt_firstwrite=e->next_write;

@@ -124,44 +124,106 @@ int64 io_waituntil2(int64 milliseconds) {
   if (io_waitmode==EPOLL) {
     int n;
     struct epoll_event y[100];
-    io_entry* e;
-    if (alt_firstread>=0 && (e=iarray_get(&io_fds,alt_firstread)) && e->canread) {
-//      write(1,"r",1);
-      return 1;
-    }
-    if (alt_firstwrite>=0 && (e=iarray_get(&io_fds,alt_firstwrite)) && e->canwrite) {
-//      write(1,"w",1);
-      return 1;
-    }
-//    write(1,".",1);
     if ((n=epoll_wait(io_master,y,100,milliseconds))==-1) return -1;
     for (i=0; i<n; ++i) {
-      e=iarray_get(&io_fds,y[i].data.fd);
+      io_entry* e=iarray_get(&io_fds,y[i].data.fd);
       if (e) {
+        int curevents=0,newevents;
+        if (e->kernelwantread) curevents |= EPOLLIN;
+        if (e->kernelwantwrite) curevents |= EPOLLOUT;
+#ifdef DEBUG
+        if ((y[i].events&(EPOLLIN|EPOLLPRI|EPOLLRDNORM|EPOLLRDBAND)) && !e->kernelwantread)
+          printf("got unexpected read event on fd #%d\n",y[i].data.fd);
+        if ((y[i].events&EPOLLOUT) && !e->kernelwantwrite)
+          printf("got unexpected write event on fd #%d\n",y[i].data.fd);
+#endif
         if (y[i].events&(POLLERR|POLLHUP)) {
           /* error; signal whatever app is looking for */
           if (e->wantread) y[i].events|=POLLIN;
          if (e->wantwrite) y[i].events|=POLLOUT;
         }
-        if (y[i].events&POLLIN && !e->canread) {
-          debug_printf(("io_waituntil2: enqueueing %ld in normal read queue before %ld\n",info.si_fd,first_readable));
-          e->canread=1;
-          e->next_read=first_readable;
-          first_readable=y[i].data.fd;
-        }
-        if (y[i].events&POLLOUT && !e->canwrite) {
-          debug_printf(("io_waituntil2: enqueueing %ld in normal write queue before %ld\n",info.si_fd,first_writeable));
-          e->canwrite=1;
-          e->next_write=first_writeable;
-          first_writeable=y[i].data.fd;
-        }
+        newevents=0;
+        if (!e->canread || e->wantread) {
+          newevents|=EPOLLIN;
+          e->kernelwantread=1;
+        } else
+          e->kernelwantread=0;
+        if (!e->canwrite || e->wantwrite) {
+          newevents|=EPOLLOUT;
+          e->kernelwantwrite=1;
+        } else
+          e->kernelwantwrite=0;
+        /* if we think we can not read, but the kernel tells us that we
+         * can, put this fd in the relevant data structures */
+        if (!e->canread && (y[i].events&(EPOLLIN|EPOLLPRI|EPOLLRDNORM|EPOLLRDBAND))) {
+          if (e->canread) {
+            newevents &= ~EPOLLIN;
+          } else {
+            e->canread=1;
+            if (e->wantread) {
+              e->next_read=first_readable;
+              first_readable=y[i].data.fd;
+            }
+          }
+        }
+        /* if the kernel says the fd is writable, ... */
+        if (y[i].events&EPOLLOUT) {
+          /* Usually, if the kernel says a descriptor is writable, we
+           * note it and do not tell the kernel not to tell us again.
+           * The idea is that once we notify the caller that the fd is
+           * writable, and the caller handles the event, the caller will
+           * just ask to be notified of future write events again. We
+           * are trying to save the superfluous epoll_ctl syscalls.
+           * If e->canwrite is set, then this gamble did not work out.
+           * We told the caller, yet after the caller is done we still
+           * got another write event. Clearly the user is implementing
+           * some kind of throttling and we can tell the kernel to leave
+           * us alone for now. */
+          if (e->canwrite) {
+            newevents &= ~EPOLLOUT;
+            e->kernelwantwrite=0;
+          } else {
+            /* If !e->wantwrite: The laziness optimization in
+             * io_dontwantwrite hit. We did not tell the kernel that we
+             * are no longer interested in writing to save the syscall.
+             * Now we know we could write if we wanted; remember that
+             * and then go on. */
+            e->canwrite=1;
+            if (e->wantwrite) {
+              e->next_write=first_writeable;
+              first_writeable=y[i].data.fd;
+            }
+          }
+        }
+        if (newevents != curevents) {
+#if 0
+          printf("canread %d, wantread %d, kernelwantread %d, canwrite %d, wantwrite %d, kernelwantwrite %d\n",
+                 e->canread,e->wantread,e->kernelwantread,e->canwrite,e->wantwrite,e->kernelwantwrite);
+          printf("newevents: read %d write %d\n",!!(newevents&EPOLLIN),!!(newevents&EPOLLOUT));
+#endif
+          y[i].events=newevents;
+          if (newevents) {
+            epoll_ctl(io_master,EPOLL_CTL_MOD,y[i].data.fd,y+i);
+          } else {
+            epoll_ctl(io_master,EPOLL_CTL_DEL,y[i].data.fd,y+i);
+            --io_wanted_fds;
+          }
+        }
       } else {
+#if 0
 #ifdef __dietlibc__
         char buf[FMT_ULONG];
         buf[fmt_ulong(buf,y[i].data.fd)]=0;
         __write2("got epoll event on invalid fd ");
         __write2(buf);
         __write2("!\n");
 #endif
+#endif
         epoll_ctl(io_master,EPOLL_CTL_DEL,y[i].data.fd,y+i);
       }
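
The long comments in this hunk explain the gamble one case at a time; the core of the new level-triggered bookkeeping reduces to recomputing the interest mask per event and only issuing epoll_ctl when it changed. A sketch of just that decision (the struct is a simplified stand-in, not libowfat's io_entry):

#include <sys/epoll.h>

/* hypothetical flags mirroring the io_entry fields used above */
struct flags { int canread,wantread,canwrite,wantwrite; };

static int next_events(const struct flags* e) {
  int ev=0;
  /* keep a direction armed while the caller still wants it, or while
   * readiness has not been observed yet (canX unset) */
  if (!e->canread  || e->wantread)  ev|=EPOLLIN;
  if (!e->canwrite || e->wantwrite) ev|=EPOLLOUT;
  return ev;   /* caller compares with the old mask and issues
                * EPOLL_CTL_MOD (or _DEL if zero) only on change */
}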

@@ -40,6 +40,16 @@ void io_wantwrite_really(int64 d, io_entry* e) {
   assert(!e->kernelwantwrite);  /* we should not be here if we already told the kernel we want to write */
   newfd=(!e->kernelwantread);
   io_wanted_fds+=newfd;
+#ifdef HAVE_EPOLL
+  if (io_waitmode==EPOLL) {
+    struct epoll_event x;
+    byte_zero(&x,sizeof(x));  // to shut up valgrind
+    x.events=EPOLLOUT;
+    if (e->kernelwantread) x.events|=EPOLLIN;
+    x.data.fd=d;
+    epoll_ctl(io_master,e->kernelwantread?EPOLL_CTL_MOD:EPOLL_CTL_ADD,d,&x);
+  }
+#endif
 #ifdef HAVE_KQUEUE
   if (io_waitmode==KQUEUE) {
     struct kevent kev;
@@ -58,25 +68,15 @@ void io_wantwrite_really(int64 d, io_entry* e) {
     write(io_master,&x,sizeof(x));
   }
 #endif
-#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL)
-  if (io_waitmode==_SIGIO || io_waitmode==EPOLL) {
+#ifdef HAVE_SIGIO
+  if (io_waitmode==_SIGIO) {
     struct pollfd p;
-    if (io_waitmode==EPOLL && !e->epolladded) {
-      struct epoll_event x;
-      byte_zero(&x,sizeof(x));  // shut up valgrind
-      x.events=EPOLLIN|EPOLLOUT|EPOLLET;
-      x.data.fd=d;
-      epoll_ctl(io_master,EPOLL_CTL_ADD,d,&x);
-      e->epolladded=1;
-    }
-    if (e->canwrite==0) {
-      p.fd=d;
-      p.events=POLLOUT;
-      switch (poll(&p,1,0)) {
-        case 1: e->canwrite=1; break;
-//      case 0: e->canwrite=0; break;
-        case -1: return;
-      }
-    }
+    p.fd=d;
+    p.events=POLLOUT;
+    switch (poll(&p,1,0)) {
+      case 1: e->canwrite=1; break;
+      case 0: e->canwrite=0; break;
+      case -1: return;
+    }
     if (e->canwrite) {
       debug_printf(("io_wantwrite: enqueueing %lld in normal write queue before %ld\n",d,first_readable));
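
The restored _SIGIO branch above probes current writability with a zero-timeout poll() instead of trusting stale state. That probe, isolated into a self-contained helper (the function name is ours, not libowfat's):

#include <poll.h>

/* hypothetical helper mirroring the restored probe: 1 if fd is writable
 * right now, 0 if not, -1 on error; never blocks because the timeout is 0 */
static int writable_now(int fd) {
  struct pollfd p;
  p.fd=fd;
  p.events=POLLOUT;
  switch (poll(&p,1,0)) {
    case 1:  return 1;
    case 0:  return 0;
    default: return -1;
  }
}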
