write a small socket server with io_wait integration and add and debug

io framework enough to support the test program
master
leitner 22 years ago
parent 8cd9323a1b
commit 325a6176ca

@ -75,6 +75,6 @@ void* array_allocate(array* x,uint64 membersize,int64 pos) {
x->allocated=wanted; x->allocated=wanted;
byte_zero(x->p+x->initialized,x->allocated-x->initialized); byte_zero(x->p+x->initialized,x->allocated-x->initialized);
} }
x->initialized=pos*membersize; x->initialized=(pos+1)*membersize;
return x->p+pos*membersize; return x->p+pos*membersize;
} }

@ -50,6 +50,9 @@ int64 io_canread();
/* return next descriptor from io_wait that can be written to */ /* return next descriptor from io_wait that can be written to */
int64 io_canwrite(); int64 io_canwrite();
/* return next descriptor with expired timeout */
int64 io_timeouted();
/* put d on internal data structure, return 1 on success, 0 on error */ /* put d on internal data structure, return 1 on success, 0 on error */
int io_fd(int64 d); int io_fd(int64 d);

@ -0,0 +1,9 @@
#include <unistd.h>
#include <sys/time.h>
#include <poll.h>
#include <errno.h>
#include "io_internal.h"
void io_check() {
io_waituntil2(0);
}

@ -4,5 +4,9 @@
void io_close(int64 d) { void io_close(int64 d) {
close(d); close(d);
io_entry* e=array_get(&io_fds,sizeof(io_entry),d); io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
if (e) e->inuse=0; if (e) {
e->inuse=0;
io_dontwantread(d);
io_dontwantwrite(d);
}
} }

@ -12,5 +12,6 @@ int io_fd(int64 d) {
if (!(e=array_allocate(&io_fds,sizeof(io_entry),d))) return 0; if (!(e=array_allocate(&io_fds,sizeof(io_entry),d))) return 0;
e->inuse=1; e->inuse=1;
if (r&O_NDELAY) e->nonblock=1; if (r&O_NDELAY) e->nonblock=1;
e->next_read=e->next_write=-1;
return 1; return 1;
} }

@ -16,7 +16,10 @@ int64 io_tryread(int64 d,char* buf,int64 len) {
p.events=POLLIN; p.events=POLLIN;
switch (poll(&p,1,0)) { switch (poll(&p,1,0)) {
case -1: return -3; case -1: return -3;
case 0: errno=EAGAIN; return -1; case 0: errno=EAGAIN;
e->canread=0;
e->next_read=-1;
return -1;
} }
new.it_interval.tv_usec=0; new.it_interval.tv_usec=0;
new.it_interval.tv_sec=0; new.it_interval.tv_sec=0;
@ -37,5 +40,9 @@ int64 io_tryread(int64 d,char* buf,int64 len) {
if (errno!=EAGAIN) if (errno!=EAGAIN)
r=-3; r=-3;
} }
if (r==-1 || r==0) {
e->canread=0;
e->next_read=-1;
}
return r; return r;
} }

@ -16,7 +16,10 @@ int64 io_trywrite(int64 d,const char* buf,int64 len) {
p.events=POLLOUT; p.events=POLLOUT;
switch (poll(&p,1,0)) { switch (poll(&p,1,0)) {
case -1: return -3; case -1: return -3;
case 0: errno=EAGAIN; return -1; case 0: errno=EAGAIN;
e->canwrite=0;
e->next_write=-1;
return -1;
} }
new.it_interval.tv_usec=0; new.it_interval.tv_usec=0;
new.it_interval.tv_sec=0; new.it_interval.tv_sec=0;
@ -32,8 +35,14 @@ int64 io_trywrite(int64 d,const char* buf,int64 len) {
new.it_value.tv_sec=0; new.it_value.tv_sec=0;
setitimer(ITIMER_REAL,&new,&old); setitimer(ITIMER_REAL,&new,&old);
} }
if (r==-1) if (r==-1) {
if (errno==EINTR) errno=EAGAIN;
if (errno!=EAGAIN) if (errno!=EAGAIN)
r=-3; r=-3;
}
if (r==-1 || r==0) {
e->canwrite=0;
e->next_write=-1;
}
return r; return r;
} }

@ -29,10 +29,18 @@ again:
if (errno==EINTR) goto again; if (errno==EINTR) goto again;
return -1; return -1;
} }
for (i=0; i<r; ++i) { for (i=r-1; i>=0; --i) {
io_entry* e=array_get(&io_fds,sizeof(io_entry),p->fd); io_entry* e=array_get(&io_fds,sizeof(io_entry),p->fd);
if (p->revents&POLLIN) e->canread=1; if (p->revents&POLLIN) {
if (p->revents&POLLOUT) e->canwrite=1; e->canread=1;
e->next_read=first_readable;
first_readable=p->fd;
}
if (p->revents&POLLOUT) {
e->canwrite=1;
e->next_write=first_writeable;
first_writeable=p->fd;
}
p++; p++;
} }
return i; return i;

@ -9,10 +9,15 @@ typedef struct {
unsigned int nonblock:1; unsigned int nonblock:1;
unsigned int inuse:1; unsigned int inuse:1;
tai6464 timeout; tai6464 timeout;
long next_read;
long next_write;
} io_entry; } io_entry;
array io_fds; array io_fds;
uint64 io_wanted_fds; uint64 io_wanted_fds;
array io_pollfds; array io_pollfds;
unsigned long first_readable;
unsigned long first_writeable;
int64 io_waituntil2(int64 milliseconds); int64 io_waituntil2(int64 milliseconds);

@ -0,0 +1,95 @@
#include "socket.h"
#include "io.h"
#include "buffer.h"
#include "ip6.h"
#include <errno.h>
main() {
int s=socket_tcp6();
uint32 scope_id;
char ip[16];
uint16 port;
if (socket_bind6_reuse(s,V6any,1234,0)==-1) return 111;
if (socket_listen(s,16)==-1) return 111;
io_nonblock(s);
if (!io_fd(s)) return 111;
io_wantread(s);
buffer_puts(buffer_2,"listening on port 1234 (fd #");
buffer_putulong(buffer_2,s);
buffer_putsflush(buffer_2,")\n");
for (;;) {
int64 i;
io_wait();
buffer_putsflush(buffer_2,"io_wait() returned!\n");
while ((i=io_canread())!=-1) {
if (i==s) {
int n;
while ((n=socket_accept6(s,ip,&port,&scope_id))!=-1) {
char buf[IP6_FMT];
buffer_puts(buffer_2,"accepted new connection from ");
buffer_put(buffer_2,buf,fmt_ip6(buf,ip));
buffer_puts(buffer_2,":");
buffer_putulong(buffer_2,port);
buffer_puts(buffer_2," (fd ");
buffer_putulong(buffer_2,n);
buffer_puts(buffer_2,")");
if (io_fd(n)) {
io_wantread(n);
} else {
buffer_puts(buffer_2,", but io_fd failed.");
io_close(n);
}
buffer_putnlflush(buffer_2);
}
if (errno!=EAGAIN) {
buffer_puts(buffer_2,"socket_accept6: ");
buffer_puterror(buffer_2);
buffer_putnlflush(buffer_2);
}
} else {
char buf[1024];
int l=io_tryread(i,buf,sizeof buf);
if (l==-1) {
buffer_puts(buffer_2,"io_tryread(");
buffer_putulong(buffer_2,i);
buffer_puts(buffer_2,"): ");
buffer_puterror(buffer_2);
buffer_putnlflush(buffer_2);
io_close(i);
} else if (l==0) {
buffer_puts(buffer_2,"eof on fd #");
buffer_putulong(buffer_2,i);
buffer_putnlflush(buffer_2);
io_close(i);
} else {
int r;
switch (r=io_trywrite(i,buf,l)) {
case -1:
buffer_puts(buffer_2,"io_tryread(");
buffer_putulong(buffer_2,i);
buffer_puts(buffer_2,"): ");
buffer_puterror(buffer_2);
buffer_putnlflush(buffer_2);
io_close(i);
break;
case 0:
buffer_puts(buffer_2,"write eof on fd #");
buffer_putulong(buffer_2,i);
buffer_putnlflush(buffer_2);
io_close(i);
default:
if (r!=l) {
buffer_puts(buffer_2,"short write on fd #");
buffer_putulong(buffer_2,i);
buffer_puts(buffer_2,": wrote ");
buffer_putulong(buffer_2,r);
buffer_puts(buffer_2,", wanted to write ");
buffer_putulong(buffer_2,l);
buffer_putsflush(buffer_2,").\n");
}
}
}
}
}
}
}
Loading…
Cancel
Save