reengineer alt queue handling to prevent race
This commit is contained in:
parent
f9570b38a1
commit
b73a699dab
@ -11,6 +11,21 @@ void io_wantread_really(int64 d, io_entry* e);
|
|||||||
|
|
||||||
int64 io_canread() {
|
int64 io_canread() {
|
||||||
io_entry* e;
|
io_entry* e;
|
||||||
|
|
||||||
|
#if defined(HAVE_SIGIO)
|
||||||
|
/* We promise that the user can call io_canread() and read, and
|
||||||
|
* the user uses io_tryread or calls io_eagain_read to signal if
|
||||||
|
* there is no more data to read. That means if the user does not
|
||||||
|
* call io_eagain_read, we need to know which fd it was so we can
|
||||||
|
* keep it in the alternative queue. */
|
||||||
|
if (alt_curread!=-1) {
|
||||||
|
e=iarray_get(&io_fds,alt_curread);
|
||||||
|
e->next_read=alt_firstread;
|
||||||
|
alt_firstread=alt_curread;
|
||||||
|
alt_curread=-1;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (first_readable==-1)
|
if (first_readable==-1)
|
||||||
#if defined(HAVE_SIGIO)
|
#if defined(HAVE_SIGIO)
|
||||||
{
|
{
|
||||||
@ -54,9 +69,15 @@ int64 io_canread() {
|
|||||||
#endif
|
#endif
|
||||||
) {
|
) {
|
||||||
#if defined(HAVE_SIGIO)
|
#if defined(HAVE_SIGIO)
|
||||||
|
#if 0
|
||||||
|
/* this code violates an invariant that the other code has, namely
|
||||||
|
* that e->next_read is -1 once the fd is dequeued. */
|
||||||
e->next_read=alt_firstread;
|
e->next_read=alt_firstread;
|
||||||
alt_firstread=r;
|
alt_firstread=r;
|
||||||
debug_printf(("io_canread: enqueue %ld in alt read queue (next is %ld)\n",alt_firstread,e->next_read));
|
debug_printf(("io_canread: enqueue %ld in alt read queue (next is %ld)\n",alt_firstread,e->next_read));
|
||||||
|
#else
|
||||||
|
alt_curread=r;
|
||||||
|
#endif
|
||||||
if (io_waitmode!=_SIGIO)
|
if (io_waitmode!=_SIGIO)
|
||||||
#endif
|
#endif
|
||||||
e->canread=0;
|
e->canread=0;
|
||||||
|
@ -7,6 +7,21 @@ void io_wantwrite_really(int64 d, io_entry* e);
|
|||||||
|
|
||||||
int64 io_canwrite() {
|
int64 io_canwrite() {
|
||||||
io_entry* e;
|
io_entry* e;
|
||||||
|
|
||||||
|
#if defined(HAVE_SIGIO)
|
||||||
|
/* We promise that the user can call io_canread() and read, and
|
||||||
|
* the user uses io_tryread or calls io_eagain_read to signal if
|
||||||
|
* there is no more data to read. That means if the user does not
|
||||||
|
* call io_eagain_read, we need to know which fd it was so we can
|
||||||
|
* keep it in the alternative queue. */
|
||||||
|
if (alt_curwrite!=-1) {
|
||||||
|
e=iarray_get(&io_fds,alt_curwrite);
|
||||||
|
e->next_write=alt_firstwrite;
|
||||||
|
alt_firstwrite=alt_curwrite;
|
||||||
|
alt_curwrite=-1;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (first_writeable==-1)
|
if (first_writeable==-1)
|
||||||
#if defined(HAVE_SIGIO)
|
#if defined(HAVE_SIGIO)
|
||||||
{
|
{
|
||||||
@ -46,9 +61,15 @@ int64 io_canwrite() {
|
|||||||
#endif
|
#endif
|
||||||
) {
|
) {
|
||||||
#if defined(HAVE_SIGIO)
|
#if defined(HAVE_SIGIO)
|
||||||
|
#if 0
|
||||||
|
/* this code violates an invariant that the other code has, namely
|
||||||
|
* that e->next_read is -1 once the fd is dequeued. */
|
||||||
e->next_write=alt_firstwrite;
|
e->next_write=alt_firstwrite;
|
||||||
alt_firstwrite=r;
|
alt_firstwrite=r;
|
||||||
debug_printf(("io_canwrite: enqueue %ld in alt write queue (next is %ld)\n",alt_firstwrite,e->next_write));
|
debug_printf(("io_canwrite: enqueue %ld in alt write queue (next is %ld)\n",alt_firstwrite,e->next_write));
|
||||||
|
#else
|
||||||
|
alt_curwrite=r;
|
||||||
|
#endif
|
||||||
if (io_waitmode!=_SIGIO)
|
if (io_waitmode!=_SIGIO)
|
||||||
#endif
|
#endif
|
||||||
e->canwrite=0;
|
e->canwrite=0;
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
#else
|
#else
|
||||||
#include <sys/mman.h>
|
#include <sys/mman.h>
|
||||||
#endif
|
#endif
|
||||||
|
#include <stdio.h>
|
||||||
#include "io_internal.h"
|
#include "io_internal.h"
|
||||||
|
|
||||||
extern void io_dontwantread_really(int64 d,io_entry* e);
|
extern void io_dontwantread_really(int64 d,io_entry* e);
|
||||||
@ -33,10 +34,12 @@ void io_close(int64 d) {
|
|||||||
* happen for everybody.
|
* happen for everybody.
|
||||||
* So we don't actually close the fd now, but we will mark it as
|
* So we don't actually close the fd now, but we will mark it as
|
||||||
* closed. */
|
* closed. */
|
||||||
|
// fprintf(stderr,"io_close(%d) DEFERRED!\n",d);
|
||||||
e->closed=1;
|
e->closed=1;
|
||||||
return;
|
return;
|
||||||
} else
|
} else
|
||||||
e->closed=0;
|
e->closed=0;
|
||||||
}
|
}
|
||||||
|
// fprintf(stderr,"io_close(%d)\n",d);
|
||||||
close(d);
|
close(d);
|
||||||
}
|
}
|
||||||
|
@ -6,15 +6,23 @@ void io_eagain(int64 d) {
|
|||||||
if (e->wantread) e->canread=0;
|
if (e->wantread) e->canread=0;
|
||||||
if (e->wantwrite) e->canwrite=0;
|
if (e->wantwrite) e->canwrite=0;
|
||||||
#if defined(HAVE_SIGIO)
|
#if defined(HAVE_SIGIO)
|
||||||
if (d==alt_firstread) {
|
if (d==alt_curread) {
|
||||||
|
#if 0
|
||||||
debug_printf(("io_eagain: dequeueing %lld from alt read queue (next is %ld)\n",d,e->next_read));
|
debug_printf(("io_eagain: dequeueing %lld from alt read queue (next is %ld)\n",d,e->next_read));
|
||||||
alt_firstread=e->next_read;
|
alt_firstread=e->next_read;
|
||||||
e->next_read=-1;
|
e->next_read=-1;
|
||||||
|
#else
|
||||||
|
alt_curread=-1;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
if (d==alt_firstwrite) {
|
if (d==alt_firstwrite) {
|
||||||
|
#if 0
|
||||||
debug_printf(("io_eagain: dequeueing %lld from alt write queue (next is %ld)\n",d,e->next_write));
|
debug_printf(("io_eagain: dequeueing %lld from alt write queue (next is %ld)\n",d,e->next_write));
|
||||||
alt_firstwrite=e->next_write;
|
alt_firstwrite=e->next_write;
|
||||||
e->next_write=-1;
|
e->next_write=-1;
|
||||||
|
#else
|
||||||
|
alt_curwrite=-1;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
@ -5,10 +5,14 @@ void io_eagain_read(int64 d) {
|
|||||||
if (e) {
|
if (e) {
|
||||||
e->canread=0;
|
e->canread=0;
|
||||||
#if defined(HAVE_SIGIO)
|
#if defined(HAVE_SIGIO)
|
||||||
if (d==alt_firstread) {
|
if (d==alt_curread) {
|
||||||
|
#if 0
|
||||||
debug_printf(("io_eagain: dequeueing %lld from alt read queue (next is %ld)\n",d,e->next_read));
|
debug_printf(("io_eagain: dequeueing %lld from alt read queue (next is %ld)\n",d,e->next_read));
|
||||||
alt_firstread=e->next_read;
|
alt_firstread=e->next_read;
|
||||||
e->next_read=-1;
|
e->next_read=-1;
|
||||||
|
#else
|
||||||
|
alt_curread=-1;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
@ -5,10 +5,14 @@ void io_eagain_write(int64 d) {
|
|||||||
if (e) {
|
if (e) {
|
||||||
e->canwrite=0;
|
e->canwrite=0;
|
||||||
#if defined(HAVE_SIGIO)
|
#if defined(HAVE_SIGIO)
|
||||||
if (d==alt_firstwrite) {
|
if (d==alt_curwrite) {
|
||||||
|
#if 0
|
||||||
debug_printf(("io_eagain: dequeueing %lld from alt write queue (next is %ld)\n",d,e->next_write));
|
debug_printf(("io_eagain: dequeueing %lld from alt write queue (next is %ld)\n",d,e->next_write));
|
||||||
alt_firstwrite=e->next_write;
|
alt_firstwrite=e->next_write;
|
||||||
e->next_write=-1;
|
e->next_write=-1;
|
||||||
|
#else
|
||||||
|
alt_curwrite=-1;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
@ -48,8 +48,8 @@ int io_signum;
|
|||||||
sigset_t io_ss;
|
sigset_t io_ss;
|
||||||
#endif
|
#endif
|
||||||
#if defined(HAVE_SIGIO)
|
#if defined(HAVE_SIGIO)
|
||||||
long alt_firstread;
|
long alt_firstread, alt_firstwrite;
|
||||||
long alt_firstwrite;
|
long alt_curread, alt_curwrite;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* put d on internal data structure, return 1 on success, 0 on error */
|
/* put d on internal data structure, return 1 on success, 0 on error */
|
||||||
@ -105,7 +105,7 @@ static io_entry* io_fd_internal(int64 d,int flags) {
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
#if defined(HAVE_SIGIO)
|
#if defined(HAVE_SIGIO)
|
||||||
alt_firstread=alt_firstwrite=-1;
|
alt_firstread=alt_firstwrite=alt_curread=alt_curwrite=-1;
|
||||||
if (io_waitmode==UNDECIDED) {
|
if (io_waitmode==UNDECIDED) {
|
||||||
io_signum=SIGRTMIN+1;
|
io_signum=SIGRTMIN+1;
|
||||||
if (sigemptyset(&io_ss)==0 &&
|
if (sigemptyset(&io_ss)==0 &&
|
||||||
|
@ -113,11 +113,17 @@ int64 io_tryread(int64 d,char* buf,int64 len) {
|
|||||||
if (r!=len) {
|
if (r!=len) {
|
||||||
e->canread=0;
|
e->canread=0;
|
||||||
#if defined(HAVE_SIGIO)
|
#if defined(HAVE_SIGIO)
|
||||||
if (d==alt_firstread) {
|
#if 0
|
||||||
debug_printf(("io_tryread: dequeueing %ld from alt read queue (next is %ld)\n",d,e->next_read));
|
debug_printf(("io_tryread: dequeueing %ld from alt read queue (next is %ld)\n",d,alt_firstread));
|
||||||
alt_firstread=e->next_read;
|
alt_firstread=e->next_read;
|
||||||
e->next_read=-1;
|
e->next_read=-1;
|
||||||
}
|
#else
|
||||||
|
if (d==alt_curread) alt_curread=-1;
|
||||||
|
} else {
|
||||||
|
debug_printf(("io_tryread: enqueueing %ld into alt read queue (next is %ld)\n",d,alt_firstread));
|
||||||
|
e->next_read=alt_firstread;
|
||||||
|
alt_firstread=d;
|
||||||
|
#endif
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
return r;
|
return r;
|
||||||
|
@ -108,9 +108,17 @@ int64 io_trywrite(int64 d,const char* buf,int64 len) {
|
|||||||
e->canwrite=0;
|
e->canwrite=0;
|
||||||
#if defined(HAVE_SIGIO)
|
#if defined(HAVE_SIGIO)
|
||||||
if (d==alt_firstwrite) {
|
if (d==alt_firstwrite) {
|
||||||
|
#if 0
|
||||||
debug_printf(("io_trywrite: dequeueing %ld from alt write queue (next is %ld)\n",d,e->next_write));
|
debug_printf(("io_trywrite: dequeueing %ld from alt write queue (next is %ld)\n",d,e->next_write));
|
||||||
alt_firstwrite=e->next_write;
|
alt_firstwrite=e->next_write;
|
||||||
e->next_write=-1;
|
e->next_write=-1;
|
||||||
|
#else
|
||||||
|
if (d==alt_curwrite) alt_curwrite=-1;
|
||||||
|
} else {
|
||||||
|
debug_printf(("io_trywrite: enqueueing %ld into alt write queue (next is %ld)\n",d,alt_firstwrite));
|
||||||
|
e->next_write=alt_firstwrite;
|
||||||
|
alt_firstwrite=d;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
@ -110,8 +110,8 @@ my_extern int io_master;
|
|||||||
my_extern int io_signum;
|
my_extern int io_signum;
|
||||||
my_extern sigset_t io_ss;
|
my_extern sigset_t io_ss;
|
||||||
|
|
||||||
my_extern long alt_firstread;
|
my_extern long alt_firstread, alt_firstwrite;
|
||||||
my_extern long alt_firstwrite;
|
my_extern long alt_curread, alt_curwrite;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int64 io_waituntil2(int64 milliseconds);
|
int64 io_waituntil2(int64 milliseconds);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user