From e175800a8cc239dec3ef006dca6c4441da57dba0 Mon Sep 17 00:00:00 2001
From: leitner
Date: Tue, 10 Apr 2012 21:15:51 +0000
Subject: [PATCH] add io_fd_canwrite (like io_fd but assume the fd is writable)
 save a few syscalls here and there

---
 CAS.h                 |  31 +++++----
 CHANGES               |   2 +
 io.h                  |   2 +-
 io/io_canread.c       |   4 ++
 io/io_canwrite.c      |   4 ++
 io/io_close.c         |   7 +-
 io/io_dontwantread.c  |  29 ++++++--
 io/io_dontwantwrite.c |  33 +++++++--
 io/io_fd.c            |  17 ++++-
 io/io_fd_canwrite.3   |  22 ++++++
 io/io_waituntil2.c    | 155 +++++++++++++++++++++++++++++++++++++-----
 io/io_wantread.c      |  31 +++++++-
 io/io_wantwrite.c     |  40 +++++++++--
 test/cdbget2.c        |   2 +-
 test/uudecode.c       |   6 +-
 15 files changed, 322 insertions(+), 63 deletions(-)
 create mode 100644 io/io_fd_canwrite.3

diff --git a/CAS.h b/CAS.h
index b4a1c07..1308050 100644
--- a/CAS.h
+++ b/CAS.h
@@ -30,19 +30,6 @@ static inline int compare_and_swap(volatile size_t* x,size_t oldval,size_t newva
 #endif
 }
 
-/* *x += val; */
-static inline void atomic_add(size_t* x,size_t val) {
-#ifdef USE_BUILTINS
-  __sync_add_and_fetch(x,val);
-#elif defined(__i386__)
-  asm volatile ("lock; addl %1, %0" : "+m" (*x) : "ir" (val) );
-#elif defined(__x86_64__)
-  asm volatile ("lock; addq %1, %0" : "+m" (*x) : "ir" (val) );
-#else
-#error architecture not supported and gcc too old, edit CAS.h
-#endif
-}
-
 /* return *x += val; */
 static inline size_t atomic_add_return(size_t* x,size_t val) {
 #ifdef USE_BUILTINS
@@ -56,7 +43,23 @@ static inline size_t atomic_add_return(size_t* x,size_t val) {
   asm volatile ("lock; xaddq %1, %0" : "+m" (*x), "+r" (val) :: "memory" );
   return i + val;
 #else
-#error architecture not supported and gcc too old, edit CAS.h
+  size_t y;
+  for (y=*x; compare_and_swap(x,y,y+val)==0; y=*x) ;
+  return y+val;
+#endif
+}
+
+/* *x += val; */
+static inline void atomic_add(size_t* x,size_t val) {
+#ifdef USE_BUILTINS
+  __sync_add_and_fetch(x,val);
+#elif defined(__i386__)
+  asm volatile ("lock; addl %1, %0" : "+m" (*x) : "ir" (val) );
+#elif defined(__x86_64__)
+  asm volatile ("lock; addq %1, %0" : "+m" (*x) : "ir" (val) );
+#else
+  atomic_add_return(x,val);
 #endif
 }

diff --git a/CHANGES b/CHANGES
index bf598e4..fcf5753 100644
--- a/CHANGES
+++ b/CHANGES
@@ -6,6 +6,8 @@
   fix io_receivefd so the incoming buffer is only 1 byte; io_passfd
   sends only one byte, so we might receive (and discard) other data if
   we try to read more (Stefan Bühler, from the lighttpd project)
+  add io_fd_canwrite (like io_fd but assume the fd is writable)
+  save a few syscalls here and there
 
 0.28:
   add uint64 pack and unpack routines

diff --git a/io.h b/io.h
index 6c4bf6c..0a20a37 100644
--- a/io.h
+++ b/io.h
@@ -77,7 +77,7 @@ int64 io_timeouted();
 
 /* put d on internal data structure, return 1 on success, 0 on error */
 int io_fd(int64 d);	/* use this for sockets before you called connect() or accept() */
-int io_fd_connected(int64 d);	/* use this for connected sockets (assumes socket is writable) */
+int io_fd_canwrite(int64 d);	/* use this for connected sockets (assumes socket is writable) */
 
 void io_setcookie(int64 d,void* cookie);
 void* io_getcookie(int64 d);

diff --git a/io/io_canread.c b/io/io_canread.c
index a17a3fe..8a02ed0 100644
--- a/io/io_canread.c
+++ b/io/io_canread.c
@@ -7,6 +7,8 @@
 #include
 #endif
 
+void io_wantread_really(int64 d, io_entry* e);
+
 int64 io_canread() {
   io_entry* e;
   if (first_readable==-1)
@@ -49,6 +51,8 @@ int64 io_canread() {
     if (io_waitmode!=_SIGIO)
 #endif
       e->canread=0;
+    if (!e->kernelwantread)
+      io_wantread_really(r,e);
     return r;
   }
 }
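A note on the CAS.h hunk: the new #else branch replaces the old #error with a
portable fallback that emulates the atomic add with a compare-and-swap retry
loop. A minimal standalone sketch of the same pattern (using GCC's
__sync_bool_compare_and_swap builtin in place of the library's
compare_and_swap wrapper; the function name is illustrative, not part of the
patch):

    #include <stddef.h>
    #include <stdio.h>

    /* emulate "return *x += val" with a CAS retry loop */
    static size_t add_return_cas(size_t* x, size_t val) {
      size_t old;
      do {
        old = *x;                 /* snapshot the current value */
      } while (!__sync_bool_compare_and_swap(x, old, old + val));
      return old + val;           /* the value after our update */
    }

    int main(void) {
      size_t counter = 40;
      printf("%zu\n", add_return_cas(&counter, 2));   /* prints 42 */
      return 0;
    }

If another thread modifies *x between the snapshot and the CAS, the CAS fails
and the loop retries with a fresh snapshot; that is why the fallback needs
compare_and_swap's return value.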
diff --git a/io/io_canwrite.c b/io/io_canwrite.c
index 1acbc3c..beecade 100644
--- a/io/io_canwrite.c
+++ b/io/io_canwrite.c
@@ -3,6 +3,8 @@
 #include
 #include "io_internal.h"
 
+void io_wantwrite_really(int64 d, io_entry* e);
+
 int64 io_canwrite() {
   io_entry* e;
   if (first_writeable==-1)
@@ -40,6 +42,8 @@ int64 io_canwrite() {
     if (io_waitmode!=_SIGIO)
 #endif
       e->canwrite=0;
+    if (!e->kernelwantwrite)
+      io_wantwrite_really(r,e);
     return r;
   }
 }

diff --git a/io/io_close.c b/io/io_close.c
index c98da24..b5b6d04 100644
--- a/io/io_close.c
+++ b/io/io_close.c
@@ -7,13 +7,16 @@
 #endif
 #include "io_internal.h"
 
+extern void io_dontwantread_really(int64 d,io_entry* e);
+extern void io_dontwantwrite_really(int64 d,io_entry* e);
+
 void io_close(int64 d) {
   io_entry* e;
   if ((e=array_get(&io_fds,sizeof(io_entry),d))) {
     e->inuse=0;
     e->cookie=0;
-    io_dontwantread(d);
-    io_dontwantwrite(d);
+    if (e->kernelwantread) io_dontwantread_really(d,e);
+    if (e->kernelwantwrite) io_dontwantwrite_really(d,e);
     if (e->mmapped) {
 #ifdef __MINGW32__
       UnmapViewOfFile(e->mmapped);

diff --git a/io/io_dontwantread.c b/io/io_dontwantread.c
index c0ffae8..51e1898 100644
--- a/io/io_dontwantread.c
+++ b/io/io_dontwantread.c
@@ -18,20 +18,25 @@
 #include
 #endif
 
-void io_dontwantread(int64 d) {
+#ifdef DEBUG
+#include <assert.h>
+#else
+#define assert(x)
+#endif
+
+void io_dontwantread_really(int64 d, io_entry* e) {
   int newfd;
-  io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
-  if (!e || !e->wantread) return;
-  newfd=(e->wantread && !e->wantwrite);
+  assert(e->kernelwantread);
+  newfd=!e->kernelwantwrite;
   io_wanted_fds-=newfd;
 #ifdef HAVE_EPOLL
   if (io_waitmode==EPOLL) {
     struct epoll_event x;
     byte_zero(&x,sizeof(x));	// to shut up valgrind
     x.events=0;
-    if (e->wantwrite) x.events|=EPOLLOUT;
+    if (e->kernelwantwrite) x.events|=EPOLLOUT;
     x.data.fd=d;
-    epoll_ctl(io_master,newfd?EPOLL_CTL_DEL:EPOLL_CTL_MOD,d,&x);
+    epoll_ctl(io_master,e->kernelwantwrite?EPOLL_CTL_MOD:EPOLL_CTL_DEL,d,&x);
   }
 #endif
 #ifdef HAVE_KQUEUE
@@ -48,10 +53,20 @@
     struct pollfd x;
     x.fd=d;
     x.events=0;
-    if (e->wantwrite) x.events|=POLLOUT;
+    if (e->kernelwantwrite) x.events|=POLLOUT;
     if (!x.events) x.events=POLLREMOVE;
     write(io_master,&x,sizeof(x));
   }
 #endif
   e->wantread=0;
+  e->kernelwantread=0;
+}
+
+void io_dontwantread(int64 d) {
+  io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+  if (e) {
+    if (e->canread)
+      io_dontwantread_really(d,e);
+    e->wantread=0;
+  }
+}
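The split into io_dontwantread/io_dontwantread_really is the core of the
syscall saving: the public function now usually only clears the wantread flag
and defers the epoll_ctl until the kernel actually bothers us again. A toy
model of the idea (flag names mirror io_entry; epoll_ctl is replaced by a
counter so the saving is visible; purely illustrative, not patch code):

    #include <stdio.h>

    /* per-fd state: what the app wants vs. what the kernel was told */
    struct toy_entry { int wantread, kernelwantread, canread; };

    static int syscalls;   /* stand-in for epoll_ctl invocations */

    static void dontwantread_really(struct toy_entry* e) {
      ++syscalls;                    /* the EPOLL_CTL_MOD/DEL we try to avoid */
      e->kernelwantread = 0;
    }

    static void dontwantread(struct toy_entry* e) {
      if (e->canread)                /* readability already signalled: deregister now */
        dontwantread_really(e);
      e->wantread = 0;               /* else just note it; reconcile on the next event */
    }

    int main(void) {
      struct toy_entry e = { 1, 1, 0 };
      dontwantread(&e);              /* lazy path: no syscall issued */
      printf("syscalls=%d kernelwantread=%d\n", syscalls, e.kernelwantread);
      return 0;
    }

The stale kernel registration is harmless: when an event arrives for a
descriptor whose want flag is clear, the reconciliation code in
io_waituntil2.c fixes up the kernel state at that point, batching what used to
be one syscall per interest change.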
diff --git a/io/io_dontwantwrite.c b/io/io_dontwantwrite.c
index 1531976..abbbbbe 100644
--- a/io/io_dontwantwrite.c
+++ b/io/io_dontwantwrite.c
@@ -18,11 +18,22 @@
 #include
 #endif
 
-void io_dontwantwrite(int64 d) {
+#ifdef DEBUG
+#include <assert.h>
+#else
+#define assert(x)
+#endif
+
+/* IDEA: if someone calls io_dontwantwrite, do not do the syscall to
+ * tell the kernel about it.  Only when a write event comes in and the
+ * user has told us he does not want them, THEN tell the kernel we are
+ * not interested.  In the typical protocol case of "write request, read
+ * reply", this should save a lot of syscalls. */
+
+void io_dontwantwrite_really(int64 d,io_entry* e) {
   int newfd;
-  io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
-  if (!e || !e->wantwrite) return;
-  newfd=(!e->wantread && e->wantwrite);
+  assert(e->kernelwantwrite);
+  newfd=!e->kernelwantread;
   io_wanted_fds-=newfd;
 #ifdef HAVE_EPOLL
   if (io_waitmode==EPOLL) {
@@ -31,7 +42,7 @@
     x.events=0;
     if (e->wantread) x.events|=EPOLLIN;
     x.data.fd=d;
-    epoll_ctl(io_master,newfd?EPOLL_CTL_DEL:EPOLL_CTL_MOD,d,&x);
+    epoll_ctl(io_master,e->kernelwantread?EPOLL_CTL_MOD:EPOLL_CTL_DEL,d,&x);
   }
 #endif
 #ifdef HAVE_KQUEUE
@@ -48,10 +59,20 @@
     struct pollfd x;
     x.fd=d;
     x.events=0;
-    if (e->wantread) x.events|=POLLIN;
+    if (e->kernelwantread) x.events|=POLLIN;
     if (!x.events) x.events=POLLREMOVE;
     write(io_master,&x,sizeof(x));
   }
 #endif
   e->wantwrite=0;
+  e->kernelwantwrite=0;
+}
+
+void io_dontwantwrite(int64 d) {
+  io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+  if (e) {
+    if (e->canwrite)
+      io_dontwantwrite_really(d,e);
+    e->wantwrite=0;
+  }
+}

diff --git a/io/io_fd.c b/io/io_fd.c
index d87072f..5d6c73d 100644
--- a/io/io_fd.c
+++ b/io/io_fd.c
@@ -45,7 +45,7 @@ long alt_firstwrite;
 #endif
 
 /* put d on internal data structure, return 1 on success, 0 on error */
-int io_fd(int64 d) {
+static io_entry* io_fd_internal(int64 d) {
   io_entry* e;
 #ifndef __MINGW32__
   long r;
@@ -53,7 +53,7 @@
     return 0;	/* file descriptor not open */
 #endif
   if (!(e=array_allocate(&io_fds,sizeof(io_entry),d))) return 0;
-  if (e->inuse) return 1;
+  if (e->inuse) return e;
   byte_zero(e,sizeof(io_entry));
   e->inuse=1;
 #ifdef __MINGW32__
@@ -124,5 +124,16 @@
     fprintf(stderr," OK!\n");
   }
 #endif
-  return 1;
+  return e;
+}
+
+int io_fd(int64 d) {
+  io_entry* e=io_fd_internal(d);
+  return !!e;
+}
+
+int io_fd_canwrite(int64 d) {
+  io_entry* e=io_fd_internal(d);
+  if (e) e->canwrite=1;
+  return !!e;
+}

diff --git a/io/io_fd_canwrite.3 b/io/io_fd_canwrite.3
new file mode 100644
index 0000000..9b0a3c1
--- /dev/null
+++ b/io/io_fd_canwrite.3
@@ -0,0 +1,22 @@
+.TH io_fd_canwrite 3
+.SH NAME
+io_fd_canwrite \- prepare descriptor for io_wait
+.SH SYNTAX
+.B #include <io.h>
+
+int \fBio_fd\fP(int64 fd);
+int \fBio_fd_canwrite\fP(int64 fd);
+.SH DESCRIPTION
+io_fd_canwrite is just like io_fd, except that it assumes the
+descriptor is writable, which may save a syscall or two.  This
+assumption holds in most cases, because the kernel buffers writes.
+The noteworthy exception, where you need io_fd instead, is an
+unconnected socket: after a non-blocking connect() you wait for
+writability to learn when the connect went through.
+
+It is OK to call this function on a descriptor that io_fd() has already
+been called on.
+.SH "RETURN VALUE"
+io_fd_canwrite returns 1 on success, 0 on error.
+.SH "SEE ALSO"
+io_wait(3), io_wantread(3), io_canread(3), io_eagain(3), io_nonblock(3), io_fd(3)
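Typical use of the two registration calls, sketched (assuming the usual
libowfat helpers from io.h; error handling elided):

    #include "io.h"

    /* After accept() the socket is connected and its send buffer is
     * empty, so assuming writability skips one epoll round trip: */
    void register_accepted(int64 s) {
      io_nonblock(s);
      io_fd_canwrite(s);   /* like io_fd(s), but canwrite is preset */
      io_wantread(s);      /* wait for the client's request */
    }

    /* After a non-blocking connect() we must NOT assume writability;
     * the write event is what tells us the connect went through: */
    void register_connecting(int64 s) {
      io_nonblock(s);
      io_fd(s);
      io_wantwrite(s);     /* io_canwrite() reports s once connected */
    }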
diff --git a/io/io_waituntil2.c b/io/io_waituntil2.c
index e07d122..eb357ac 100644
--- a/io/io_waituntil2.c
+++ b/io/io_waituntil2.c
@@ -32,6 +32,83 @@
 #include
 #endif
 
+#ifndef EPOLLRDNORM
+#define EPOLLRDNORM 0
+#endif
+#ifndef EPOLLRDBAND
+#define EPOLLRDBAND 0
+#endif
+
+#if 0
+static void handleevent(int fd,int readable,int writable,int error) {
+  io_entry* e=array_get(&io_fds,sizeof(io_entry),fd);
+  if (e) {
+    int curevents=0,newevents;
+    if (e->kernelwantread) curevents |= EPOLLIN;
+    if (e->kernelwantwrite) curevents |= EPOLLOUT;
+
+#ifdef DEBUG
+    if (readable && !e->kernelwantread)
+      printf("got unexpected read event on fd #%d\n",fd);
+    if (writable && !e->kernelwantwrite)
+      printf("got unexpected write event on fd #%d\n",fd);
+#endif
+
+    if (error) {
+      /* signal whatever app is looking for */
+      if (e->wantread) readable=1;
+      if (e->wantwrite) writable=1;
+    }
+
+    if (readable && !e->canread) {
+      e->canread=1;
+      if (e->wantread) {
+	e->next_read=first_readable;
+	first_readable=fd;
+      }
+    }
+    if (writable && !e->canwrite) {
+      e->canwrite=1;
+      if (e->wantwrite) {
+	e->next_write=first_writeable;
+	first_writeable=fd;
+      }
+    }
+
+    /* TODO: how do we tell the caller what to do?  An integer used
+     * as a bit field? */
+
+    newevents=0;
+    if (!e->canread || e->wantread) {
+      newevents|=EPOLLIN;
+      e->kernelwantread=1;
+    } else
+      e->kernelwantread=0;
+    if (!e->canwrite || e->wantwrite) {
+      newevents|=EPOLLOUT;
+      e->kernelwantwrite=1;
+    } else
+      e->kernelwantwrite=0;
+    if (newevents != curevents) {
+      struct epoll_event x;
+      byte_zero(&x,sizeof(x));
+      x.events=newevents;
+      x.data.fd=fd;
+      if (newevents) {
+	epoll_ctl(io_master,EPOLL_CTL_MOD,fd,&x);
+      } else {
+	epoll_ctl(io_master,EPOLL_CTL_DEL,fd,&x);
+	--io_wanted_fds;
+      }
+    }
+  } else {
+    struct epoll_event x;
+    byte_zero(&x,sizeof(x));
+    x.data.fd=fd;
+    epoll_ctl(io_master,EPOLL_CTL_DEL,fd,&x);
+  }
+}
+#endif
+
 int64 io_waituntil2(int64 milliseconds) {
 #ifndef __MINGW32__
   struct pollfd* p;
 #endif
@@ -46,24 +123,75 @@ int64 io_waituntil2(int64 milliseconds) {
   for (i=n-1; i>=0; --i) {
     io_entry* e=array_get(&io_fds,sizeof(io_entry),y[i].data.fd);
     if (e) {
+      int curevents=0,newevents;
+      if (e->kernelwantread) curevents |= EPOLLIN;
+      if (e->kernelwantwrite) curevents |= EPOLLOUT;
+
+#ifdef DEBUG
+      if ((y[i].events&(EPOLLIN|EPOLLPRI|EPOLLRDNORM|EPOLLRDBAND)) && !e->kernelwantread)
+	printf("got unexpected read event on fd #%d\n",y[i].data.fd);
+      if ((y[i].events&EPOLLOUT) && !e->kernelwantwrite)
+	printf("got unexpected write event on fd #%d\n",y[i].data.fd);
+#endif
+
       if (y[i].events&(EPOLLERR|EPOLLHUP)) {
 	/* error; signal whatever app is looking for */
 	if (e->wantread) y[i].events|=EPOLLIN;
 	if (e->wantwrite) y[i].events|=EPOLLOUT;
       }
-#ifdef EPOLLRDNORM
       if (!e->canread && (y[i].events&(EPOLLIN|EPOLLPRI|EPOLLRDNORM|EPOLLRDBAND))) {
-#else
-      if (!e->canread && (y[i].events&(EPOLLIN|EPOLLPRI))) {
-#endif
 	e->canread=1;
-	e->next_read=first_readable;
-	first_readable=y[i].data.fd;
+	if (e->wantread) {
+	  e->next_read=first_readable;
+	  first_readable=y[i].data.fd;
+	}
       }
       if (!e->canwrite && (y[i].events&EPOLLOUT)) {
+	/* If !e->wantwrite: the laziness optimization in
+	 * io_dontwantwrite hit.  We did not tell the kernel that we
+	 * are no longer interested in writing, to save the syscall.
+	 * Now we know we could write if we wanted; remember that
+	 * and then go on. */
 	e->canwrite=1;
-	e->next_write=first_writeable;
-	first_writeable=y[i].data.fd;
+	if (e->wantwrite) {
+	  e->next_write=first_writeable;
+	  first_writeable=y[i].data.fd;
+	}
       }
+
+      newevents=0;
+      if (!e->canread || e->wantread) {
+	newevents|=EPOLLIN;
+	e->kernelwantread=1;
+      } else
+	e->kernelwantread=0;
+      if (!e->canwrite || e->wantwrite) {
+	newevents|=EPOLLOUT;
+	e->kernelwantwrite=1;
+      } else
+	e->kernelwantwrite=0;
+      if (newevents != curevents) {
+#if 0
+	printf("canread %d, wantread %d, kernelwantread %d, canwrite %d, wantwrite %d, kernelwantwrite %d\n",
+	       e->canread,e->wantread,e->kernelwantread,e->canwrite,e->wantwrite,e->kernelwantwrite);
+	printf("newevents: read %d write %d\n",!!(newevents&EPOLLIN),!!(newevents&EPOLLOUT));
+#endif
+	y[i].events=newevents;
+	if (newevents) {
+	  epoll_ctl(io_master,EPOLL_CTL_MOD,y[i].data.fd,y+i);
+	} else {
+	  epoll_ctl(io_master,EPOLL_CTL_DEL,y[i].data.fd,y+i);
+	  --io_wanted_fds;
+	}
+      }
     } else {
       epoll_ctl(io_master,EPOLL_CTL_DEL,y[i].data.fd,y+i);
@@ -81,11 +209,6 @@ int64 io_waituntil2(int64 milliseconds) {
   if ((n=kevent(io_master,0,0,y,100,milliseconds!=-1?&ts:0))==-1) return -1;
   for (i=n-1; i>=0; --i) {
     io_entry* e=array_get(&io_fds,sizeof(io_entry),y[--n].ident);
-#ifdef DEBUG
-    if (!e) {
-      e=e;
-    }
-#endif
     if (e) {
       if (y[n].flags&EV_ERROR) {
 	/* error; signal whatever app is looking for */
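The heart of the new epoll loop is the final reconciliation step: for each
direction, keep the kernel subscription unless we already know the answer
(can* set) and the application does not currently care (want* clear).
Distilled into a pure function (a sketch that mirrors the logic above; not
patch code):

    #include <stdio.h>
    #include <sys/epoll.h>

    struct flags { int canread, wantread, canwrite, wantwrite; };

    /* subscribe to a direction unless its result is known and unwanted */
    static int reconcile(const struct flags* f) {
      int ev = 0;
      if (!f->canread  || f->wantread)  ev |= EPOLLIN;
      if (!f->canwrite || f->wantwrite) ev |= EPOLLOUT;
      return ev;   /* 0 means EPOLL_CTL_DEL, otherwise EPOLL_CTL_MOD */
    }

    int main(void) {
      /* "write request, read reply": waiting for the reply, and the
       * earlier write event left canwrite set although wantwrite is 0 */
      struct flags f = { 0, 1, 1, 0 };
      int ev = reconcile(&f);
      printf("EPOLLIN=%d EPOLLOUT=%d\n", !!(ev&EPOLLIN), !!(ev&EPOLLOUT));
      return 0;   /* prints EPOLLIN=1 EPOLLOUT=0 */
    }

Comparing the result against curevents means epoll_ctl is only invoked when
the mask actually changes, which is what turns many per-request MOD/DEL pairs
into zero syscalls.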
diff --git a/io/io_wantread.c b/io/io_wantread.c
index b6032b0..3fb9965 100644
--- a/io/io_wantread.c
+++ b/io/io_wantread.c
@@ -24,20 +24,25 @@
 #include
 #endif
 
-void io_wantread(int64 d) {
+#ifdef DEBUG
+#include <assert.h>
+#else
+#define assert(x)
+#endif
+
+void io_wantread_really(int64 d,io_entry* e) {
   int newfd;
-  io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
-  if (!e || e->wantread) return;
-  newfd=(!e->wantread && !e->wantwrite);
+  assert(!e->kernelwantread);
+  newfd=!e->kernelwantwrite;
   io_wanted_fds+=newfd;
 #ifdef HAVE_EPOLL
   if (io_waitmode==EPOLL) {
     struct epoll_event x;
     byte_zero(&x,sizeof(x));	// to shut up valgrind
     x.events=EPOLLIN;
-    if (e->wantwrite) x.events|=EPOLLOUT;
+    if (e->kernelwantwrite) x.events|=EPOLLOUT;
     x.data.fd=d;
-    epoll_ctl(io_master,newfd?EPOLL_CTL_ADD:EPOLL_CTL_MOD,d,&x);
+    epoll_ctl(io_master,e->kernelwantwrite?EPOLL_CTL_MOD:EPOLL_CTL_ADD,d,&x);
   }
 #endif
 #ifdef HAVE_KQUEUE
@@ -104,4 +109,18 @@ queueread:
   }
 #endif
   e->wantread=1;
+  e->kernelwantread=1;
+}
+
+void io_wantread(int64 d) {
+  io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+  if (!e || e->wantread) return;
+  if (e->canread) {
+    e->next_read=first_readable;
+    first_readable=d;
+    e->wantread=1;
+    return;
+  }
+  /* the harder case: do as before */
+  if (!e->kernelwantread) io_wantread_really(d,e); else e->wantread=1;
+}
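With canread or canwrite frequently left set by the lazy path, io_wantread and
io_wantwrite usually reduce to putting the descriptor straight onto the ready
list, with no syscall at all. An application's event loop does not change; for
reference, the usual libowfat pattern that benefits (a sketch; buffer handling
and error paths elided):

    #include "io.h"

    void serve(void) {
      for (;;) {
        int64 fd;
        io_wait();                       /* one epoll_wait/kevent */
        while ((fd = io_canread()) != -1) {
          char buf[1024];
          int64 n = io_tryread(fd, buf, sizeof(buf));
          if (n == -1) continue;         /* EAGAIN: try again later */
          if (n <= 0) { io_close(fd); continue; }   /* EOF or error */
          io_dontwantread(fd);           /* now lazy: usually no syscall */
          io_wantwrite(fd);              /* usually satisfied from canwrite */
        }
        while ((fd = io_canwrite()) != -1) {
          /* ... io_trywrite() the pending reply here ... */
          io_dontwantwrite(fd);          /* lazy again */
          io_wantread(fd);
        }
      }
    }

In the "write request, read reply" round trip this replaces up to four
epoll_ctl calls with zero once the descriptor's writability is known.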
diff --git a/io/io_wantwrite.c b/io/io_wantwrite.c
index afcfd70..10d97ab 100644
--- a/io/io_wantwrite.c
+++ b/io/io_wantwrite.c
@@ -21,20 +21,33 @@
 #include
 #endif
 
-void io_wantwrite(int64 d) {
+#ifdef DEBUG
+#include <assert.h>
+#else
+#define assert(x)
+#endif
+
+/* IDEA: if someone calls io_dontwantwrite, do not do the syscall to
+ * tell the kernel about it.  Only when a write event comes in and the
+ * user has told us he does not want them, THEN tell the kernel we are
+ * not interested.  In the typical protocol case of "write request, read
+ * reply", this should save a lot of syscalls.
+ * Now, if someone calls io_wantwrite, we might be in the situation
+ * that canwrite is already set.  In that case, just enqueue the fd. */
+
+void io_wantwrite_really(int64 d,io_entry* e) {
   int newfd;
-  io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
-  if (!e || e->wantwrite) return;
-  newfd=(!e->wantread && !e->wantwrite);
+  assert(!e->kernelwantwrite);	/* we should not be here if we already told the kernel we want to write */
+  newfd=(!e->kernelwantread);
   io_wanted_fds+=newfd;
 #ifdef HAVE_EPOLL
   if (io_waitmode==EPOLL) {
     struct epoll_event x;
     byte_zero(&x,sizeof(x));	// to shut up valgrind
     x.events=EPOLLOUT;
-    if (e->wantread) x.events|=EPOLLIN;
+    if (e->kernelwantread) x.events|=EPOLLIN;
     x.data.fd=d;
-    epoll_ctl(io_master,newfd?EPOLL_CTL_ADD:EPOLL_CTL_MOD,d,&x);
+    epoll_ctl(io_master,e->kernelwantread?EPOLL_CTL_MOD:EPOLL_CTL_ADD,d,&x);
   }
 #endif
 #ifdef HAVE_KQUEUE
@@ -82,4 +95,19 @@
   }
 #endif
   e->wantwrite=1;
+  e->kernelwantwrite=1;
+}
+
+void io_wantwrite(int64 d) {
+  io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+  if (!e) return;
+  if (e->wantwrite && e->kernelwantwrite) return;
+  if (e->canwrite) {
+    e->next_write=first_writeable;
+    first_writeable=d;
+    e->wantwrite=1;
+    return;
+  }
+  /* the harder case: do as before */
+  if (!e->kernelwantwrite) io_wantwrite_really(d,e); else e->wantwrite=1;
+}

diff --git a/test/cdbget2.c b/test/cdbget2.c
index b69fc8f..0931797 100644
--- a/test/cdbget2.c
+++ b/test/cdbget2.c
@@ -6,7 +6,7 @@
 #include
 
 #ifndef O_BINARY
-#define O_BINARY
+#define O_BINARY 0
 #endif
 
 int main(int argc,char* argv[]) {

diff --git a/test/uudecode.c b/test/uudecode.c
index 24a289a..84862be 100644
--- a/test/uudecode.c
+++ b/test/uudecode.c
@@ -303,7 +303,8 @@ invalidpart:
 	continue;
       } else if (str_start(line,"=yend")) {
 	/* first try to decode it normally and see if the CRC matches */
-	unsigned long i,wantedcrc;
+	unsigned long i,wantedcrc,gotcrc;
 	stralloc out;
 	char* tmp=strstr(line," pcrc32=");
+	gotcrc=0;
@@ -311,12 +312,14 @@ invalidpart:
 	if (tmp) {
 	  if (!scan_xlong(tmp+8,&wantedcrc)) goto invalidpart;
 	  wantedcrc &= 0xfffffffful;
+	  gotcrc=1;
 	} else if (part==1) {
 	  tmp=strstr(line," crc32=");
 	  if (!tmp) goto invalidpart;
 	  if (!scan_xlong(tmp+7,&wantedcrc)) goto invalidpart;
 	  wantedcrc &= 0xfffffffful;
+	  gotcrc=1;
 	  endoffset=totalsize;
 	} else goto invalidpart;
 	stralloc_init(&out);
@@ -345,6 +348,7 @@ writeerror:
 	  i+=x+1;
 	  out.len+=scanned;
 	}
 	i=crc32(0,out.s,out.len);
+	if (!gotcrc) wantedcrc=i;
 	if (out.len == endoffset-offset && i == wantedcrc) {
 	  if (buffer_put(&fileout,out.s,out.len)) goto writeerror;
 	  ++reconstructed;
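Two small notes on the test-program fixes: the bare #define O_BINARY expanded
to nothing, so an expression like open(name,O_RDONLY|O_BINARY) no longer
compiled on platforms that lack O_BINARY; defining it as 0 makes it a harmless
no-op bit. The uudecode.c change guards the CRC comparison for =yend lines
that carry no pcrc32=/crc32= attribute at all. The guard pattern in isolation
(a sketch using zlib's crc32 for illustration; the actual test program uses
its own crc32):

    #include <stdio.h>
    #include <zlib.h>   /* crc32(); illustration only, link with -lz */

    int main(void) {
      const unsigned char part[] = "some decoded data";
      unsigned long wantedcrc = 0;
      int gotcrc = 0;               /* set only if a CRC was parsed from the header */

      unsigned long i = crc32(0, part, sizeof(part)-1);
      if (!gotcrc) wantedcrc = i;   /* no CRC present: accept the part as-is */
      puts(i == wantedcrc ? "part ok" : "crc mismatch");
      return 0;
    }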