mirror of
git://erdgeist.org/opentracker
synced 2025-03-01 04:21:30 +08:00
Introducing compression for fullscrapes and tpbs stats
This commit is contained in:
parent
4bf12406fb
commit
b1c8723609
@ -43,6 +43,7 @@
|
|||||||
|
|
||||||
/* Globals */
|
/* Globals */
|
||||||
static const size_t SUCCESS_HTTP_HEADER_LENGTH = 80;
|
static const size_t SUCCESS_HTTP_HEADER_LENGTH = 80;
|
||||||
|
static const size_t SUCCESS_HTTP_HEADER_LENGHT_CONTENT_ENCODING = 32;
|
||||||
static const size_t SUCCESS_HTTP_SIZE_OFF = 17;
|
static const size_t SUCCESS_HTTP_SIZE_OFF = 17;
|
||||||
static uint32_t g_adminip_addresses[OT_ADMINIP_MAX];
|
static uint32_t g_adminip_addresses[OT_ADMINIP_MAX];
|
||||||
static unsigned int g_adminip_count = 0;
|
static unsigned int g_adminip_count = 0;
|
||||||
@ -73,15 +74,14 @@ static size_t ot_sockets_count = 0;
|
|||||||
|
|
||||||
#ifdef _DEBUG_HTTPERROR
|
#ifdef _DEBUG_HTTPERROR
|
||||||
static char debug_request[8192];
|
static char debug_request[8192];
|
||||||
#define _DEBUG_HTTPERROR_PARAM( param ) , param
|
|
||||||
#else
|
|
||||||
#define _DEBUG_HTTPERROR_PARAM( param )
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
STRUCT_HTTP_FLAG_ARRAY_USED = 1,
|
STRUCT_HTTP_FLAG_ARRAY_USED = 1,
|
||||||
STRUCT_HTTP_FLAG_IOB_USED = 2,
|
STRUCT_HTTP_FLAG_IOB_USED = 2,
|
||||||
STRUCT_HTTP_FLAG_WAITINGFORTASK = 4
|
STRUCT_HTTP_FLAG_WAITINGFORTASK = 4,
|
||||||
|
STRUCT_HTTP_FLAG_GZIP = 8,
|
||||||
|
STRUCT_HTTP_FLAG_BZIP2 = 16
|
||||||
} STRUCT_HTTP_FLAG;
|
} STRUCT_HTTP_FLAG;
|
||||||
|
|
||||||
struct http_data {
|
struct http_data {
|
||||||
@ -100,7 +100,7 @@ static int ot_ip_compare( const void *a, const void *b ) { return memcmp( a,b,4
|
|||||||
int main( int argc, char **argv );
|
int main( int argc, char **argv );
|
||||||
|
|
||||||
static void httperror( const int64 s, const char *title, const char *message );
|
static void httperror( const int64 s, const char *title, const char *message );
|
||||||
static void httpresponse( const int64 s, char *data _DEBUG_HTTPERROR_PARAM(size_t l ) );
|
static void httpresponse( const int64 s, char *data, size_t l );
|
||||||
|
|
||||||
static void sendiovecdata( const int64 s, int iovec_entries, struct iovec *iovector );
|
static void sendiovecdata( const int64 s, int iovec_entries, struct iovec *iovector );
|
||||||
static void senddata( const int64 s, char *buffer, const size_t size );
|
static void senddata( const int64 s, char *buffer, const size_t size );
|
||||||
@ -162,7 +162,7 @@ static void sendiovecdata( const int64 s, int iovec_entries, struct iovec *iovec
|
|||||||
iovec_free( &iovec_entries, &iovector );
|
iovec_free( &iovec_entries, &iovector );
|
||||||
HTTPERROR_500;
|
HTTPERROR_500;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If this socket collected request in a buffer,
|
/* If this socket collected request in a buffer,
|
||||||
free it now */
|
free it now */
|
||||||
if( h->flag & STRUCT_HTTP_FLAG_ARRAY_USED ) {
|
if( h->flag & STRUCT_HTTP_FLAG_ARRAY_USED ) {
|
||||||
@ -173,20 +173,24 @@ static void sendiovecdata( const int64 s, int iovec_entries, struct iovec *iovec
|
|||||||
/* If we came here, wait for the answer is over */
|
/* If we came here, wait for the answer is over */
|
||||||
h->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK;
|
h->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK;
|
||||||
|
|
||||||
/* Our answers never are 0 bytes. Return an error. */
|
/* Our answers never are 0 vectors. Return an error. */
|
||||||
if( !iovec_entries || !iovector[0].iov_len ) {
|
if( !iovec_entries ) {
|
||||||
iovec_free( &iovec_entries, &iovector );
|
|
||||||
HTTPERROR_500;
|
HTTPERROR_500;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Prepare space for http header */
|
/* Prepare space for http header */
|
||||||
header = malloc( SUCCESS_HTTP_HEADER_LENGTH );
|
header = malloc( SUCCESS_HTTP_HEADER_LENGTH + SUCCESS_HTTP_HEADER_LENGHT_CONTENT_ENCODING );
|
||||||
if( !header ) {
|
if( !header ) {
|
||||||
iovec_free( &iovec_entries, &iovector );
|
iovec_free( &iovec_entries, &iovector );
|
||||||
HTTPERROR_500;
|
HTTPERROR_500;
|
||||||
}
|
}
|
||||||
|
|
||||||
header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Length: %zd\r\n\r\n", size );
|
if( h->flag & STRUCT_HTTP_FLAG_GZIP )
|
||||||
|
header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Encoding: gzip\r\nContent-Length: %zd\r\n\r\n", size );
|
||||||
|
else if( h->flag & STRUCT_HTTP_FLAG_BZIP2 )
|
||||||
|
header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Encoding: bzip2\r\nContent-Length: %zd\r\n\r\n", size );
|
||||||
|
else
|
||||||
|
header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Length: %zd\r\n\r\n", size );
|
||||||
|
|
||||||
iob_reset( &h->batch );
|
iob_reset( &h->batch );
|
||||||
iob_addbuf_free( &h->batch, header, header_size );
|
iob_addbuf_free( &h->batch, header, header_size );
|
||||||
@ -241,9 +245,9 @@ static void senddata( const int64 s, char *buffer, size_t size ) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void httpresponse( const int64 s, char *data _DEBUG_HTTPERROR_PARAM( size_t l ) ) {
|
static void httpresponse( const int64 s, char *data, size_t l ) {
|
||||||
struct http_data* h = io_getcookie( s );
|
struct http_data* h = io_getcookie( s );
|
||||||
char *c;
|
char *c, *d=data;
|
||||||
ot_peer peer;
|
ot_peer peer;
|
||||||
ot_torrent *torrent;
|
ot_torrent *torrent;
|
||||||
ot_hash *hash = NULL;
|
ot_hash *hash = NULL;
|
||||||
@ -253,6 +257,9 @@ static void httpresponse( const int64 s, char *data _DEBUG_HTTPERROR_PARAM( size
|
|||||||
ssize_t len;
|
ssize_t len;
|
||||||
size_t reply_size = 0, reply_off;
|
size_t reply_size = 0, reply_off;
|
||||||
|
|
||||||
|
/* Touch l and d in case it is unused */
|
||||||
|
l = l; d = d;
|
||||||
|
|
||||||
#ifdef _DEBUG_HTTPERROR
|
#ifdef _DEBUG_HTTPERROR
|
||||||
if( l >= sizeof( debug_request ) )
|
if( l >= sizeof( debug_request ) )
|
||||||
l = sizeof( debug_request) - 1;
|
l = sizeof( debug_request) - 1;
|
||||||
@ -379,6 +386,12 @@ LOG_TO_STDERR( "sync: %d.%d.%d.%d\n", h->ip[0], h->ip[1], h->ip[2], h->ip[3] );
|
|||||||
}
|
}
|
||||||
|
|
||||||
if( mode == TASK_STATS_TPB ) {
|
if( mode == TASK_STATS_TPB ) {
|
||||||
|
#ifdef WANT_COMPRESSION_GZIP
|
||||||
|
if( strnstr( d, "gzip", l ) ) {
|
||||||
|
h->flag |= STRUCT_HTTP_FLAG_GZIP;
|
||||||
|
format |= TASK_FLAG_GZIP;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
/* Pass this task to the worker thread */
|
/* Pass this task to the worker thread */
|
||||||
h->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK;
|
h->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK;
|
||||||
fullscrape_deliver( s, format );
|
fullscrape_deliver( s, format );
|
||||||
@ -403,9 +416,17 @@ LOG_TO_STDERR( "[%08d] scrp: %d.%d.%d.%d - FULL SCRAPE\n", (unsigned int)(g_now
|
|||||||
#ifdef _DEBUG_HTTPERROR
|
#ifdef _DEBUG_HTTPERROR
|
||||||
write( 2, debug_request, l );
|
write( 2, debug_request, l );
|
||||||
#endif
|
#endif
|
||||||
|
format = 0;
|
||||||
|
#ifdef WANT_COMPRESSION_GZIP
|
||||||
|
if( strnstr( d, "gzip", l ) ) {
|
||||||
|
h->flag |= STRUCT_HTTP_FLAG_GZIP;
|
||||||
|
format = TASK_FLAG_GZIP;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Pass this task to the worker thread */
|
/* Pass this task to the worker thread */
|
||||||
h->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK;
|
h->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK;
|
||||||
fullscrape_deliver( s, TASK_FULLSCRAPE );
|
fullscrape_deliver( s, TASK_FULLSCRAPE | format );
|
||||||
io_dontwantread( s );
|
io_dontwantread( s );
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -655,7 +676,7 @@ static void handle_read( const int64 clientsocket ) {
|
|||||||
/* If we get the whole request in one packet, handle it without copying */
|
/* If we get the whole request in one packet, handle it without copying */
|
||||||
if( !array_start( &h->request ) ) {
|
if( !array_start( &h->request ) ) {
|
||||||
if( memchr( static_inbuf, '\n', l ) )
|
if( memchr( static_inbuf, '\n', l ) )
|
||||||
return httpresponse( clientsocket, static_inbuf _DEBUG_HTTPERROR_PARAM( l ) );
|
return httpresponse( clientsocket, static_inbuf, l );
|
||||||
h->flag |= STRUCT_HTTP_FLAG_ARRAY_USED;
|
h->flag |= STRUCT_HTTP_FLAG_ARRAY_USED;
|
||||||
return array_catb( &h->request, static_inbuf, l );
|
return array_catb( &h->request, static_inbuf, l );
|
||||||
}
|
}
|
||||||
@ -670,7 +691,7 @@ static void handle_read( const int64 clientsocket ) {
|
|||||||
return httperror( clientsocket, "500 request too long", "You sent too much headers");
|
return httperror( clientsocket, "500 request too long", "You sent too much headers");
|
||||||
|
|
||||||
if( memchr( array_start( &h->request ), '\n', array_bytes( &h->request ) ) )
|
if( memchr( array_start( &h->request ), '\n', array_bytes( &h->request ) ) )
|
||||||
return httpresponse( clientsocket, array_start( &h->request ) _DEBUG_HTTPERROR_PARAM( array_bytes( &h->request ) ) );
|
return httpresponse( clientsocket, array_start( &h->request ), array_bytes( &h->request ) );
|
||||||
}
|
}
|
||||||
|
|
||||||
static void handle_write( const int64 clientsocket ) {
|
static void handle_write( const int64 clientsocket ) {
|
||||||
@ -722,9 +743,9 @@ static void handle_timeouted( void ) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void server_mainloop( ) {
|
static void server_mainloop( ) {
|
||||||
time_t next_timeout_check = g_now + OT_CLIENT_TIMEOUT_CHECKINTERVAL;
|
time_t next_timeout_check = g_now + OT_CLIENT_TIMEOUT_CHECKINTERVAL;
|
||||||
struct iovec *iovector;
|
struct iovec *iovector;
|
||||||
int iovec_entries;
|
int iovec_entries;
|
||||||
|
|
||||||
for( ; ; ) {
|
for( ; ; ) {
|
||||||
int64 i;
|
int64 i;
|
||||||
|
@ -7,8 +7,12 @@
|
|||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <arpa/inet.h>
|
#include <arpa/inet.h>
|
||||||
|
#ifdef WANT_COMPRESSION_GZIP
|
||||||
|
#include <zlib.h>
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Libowfat */
|
/* Libowfat */
|
||||||
|
#include "byte.h"
|
||||||
#include "textcode.h"
|
#include "textcode.h"
|
||||||
|
|
||||||
/* Opentracker */
|
/* Opentracker */
|
||||||
@ -24,7 +28,7 @@
|
|||||||
#define OT_SCRAPE_CHUNK_SIZE (512*1024)
|
#define OT_SCRAPE_CHUNK_SIZE (512*1024)
|
||||||
|
|
||||||
/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
|
/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
|
||||||
#define OT_FULLSCRAPE_MAXENTRYLEN 100
|
#define OT_FULLSCRAPE_MAXENTRYLEN 256
|
||||||
|
|
||||||
/* Forward declaration */
|
/* Forward declaration */
|
||||||
static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode );
|
static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode );
|
||||||
@ -66,8 +70,12 @@ void fullscrape_deliver( int64 socket, ot_tasktype tasktype ) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) {
|
static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) {
|
||||||
int bucket;
|
int bucket;
|
||||||
char *r, *re;
|
char *r, *re;
|
||||||
|
#ifdef WANT_COMPRESSION_GZIP
|
||||||
|
char compress_buffer[OT_FULLSCRAPE_MAXENTRYLEN];
|
||||||
|
z_stream strm;
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Setup return vector... */
|
/* Setup return vector... */
|
||||||
*iovec_entries = 0;
|
*iovec_entries = 0;
|
||||||
@ -79,8 +87,21 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
|
|||||||
This works as a low watermark */
|
This works as a low watermark */
|
||||||
re = r + OT_SCRAPE_CHUNK_SIZE;
|
re = r + OT_SCRAPE_CHUNK_SIZE;
|
||||||
|
|
||||||
|
#ifdef WANT_COMPRESSION_GZIP
|
||||||
|
if( mode & TASK_FLAG_GZIP ) {
|
||||||
|
byte_zero( &strm, sizeof(strm) );
|
||||||
|
strm.next_in = (ot_byte*)r;
|
||||||
|
if( deflateInit2(&strm,9,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK )
|
||||||
|
fprintf( stderr, "not ok.\n" );
|
||||||
|
|
||||||
|
strm.next_out = (unsigned char*)r;
|
||||||
|
strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
|
||||||
|
r = compress_buffer;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Reply dictionary only needed for bencoded fullscrape */
|
/* Reply dictionary only needed for bencoded fullscrape */
|
||||||
if( mode == TASK_FULLSCRAPE ) {
|
if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) {
|
||||||
memmove( r, "d5:filesd", 9 );
|
memmove( r, "d5:filesd", 9 );
|
||||||
r += 9;
|
r += 9;
|
||||||
}
|
}
|
||||||
@ -97,7 +118,7 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
|
|||||||
ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
|
ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
|
||||||
ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash;
|
ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash;
|
||||||
|
|
||||||
switch( mode ) {
|
switch( mode & TASK_TASK_MASK ) {
|
||||||
case TASK_FULLSCRAPE:
|
case TASK_FULLSCRAPE:
|
||||||
default:
|
default:
|
||||||
/* push hash as bencoded string */
|
/* push hash as bencoded string */
|
||||||
@ -122,6 +143,16 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef WANT_COMPRESSION_GZIP
|
||||||
|
if( mode & TASK_FLAG_GZIP ) {
|
||||||
|
strm.next_in = (ot_byte*)compress_buffer;
|
||||||
|
strm.avail_in = r - compress_buffer;
|
||||||
|
if( deflate( &strm, Z_NO_FLUSH ) != Z_OK )
|
||||||
|
fprintf( stderr, "Not ok.\n" );
|
||||||
|
r = (char*)strm.next_out;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/* If we reached our low watermark in buffer... */
|
/* If we reached our low watermark in buffer... */
|
||||||
if( re - r <= OT_FULLSCRAPE_MAXENTRYLEN ) {
|
if( re - r <= OT_FULLSCRAPE_MAXENTRYLEN ) {
|
||||||
|
|
||||||
@ -134,6 +165,10 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
|
|||||||
/* If this fails: free buffers */
|
/* If this fails: free buffers */
|
||||||
iovec_free( iovec_entries, iovector );
|
iovec_free( iovec_entries, iovector );
|
||||||
|
|
||||||
|
#ifdef WANT_COMPRESSION_GZIP
|
||||||
|
deflateEnd(&strm);
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Release lock on current bucket and return */
|
/* Release lock on current bucket and return */
|
||||||
mutex_bucket_unlock( bucket );
|
mutex_bucket_unlock( bucket );
|
||||||
return;
|
return;
|
||||||
@ -141,7 +176,19 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
|
|||||||
|
|
||||||
/* Adjust new end of output buffer */
|
/* Adjust new end of output buffer */
|
||||||
re = r + OT_SCRAPE_CHUNK_SIZE;
|
re = r + OT_SCRAPE_CHUNK_SIZE;
|
||||||
|
|
||||||
|
#ifdef WANT_COMPRESSION_GZIP
|
||||||
|
if( mode & TASK_FLAG_GZIP ) {
|
||||||
|
strm.next_out = (ot_byte*)r;
|
||||||
|
strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
#ifdef WANT_COMPRESSION_GZIP
|
||||||
|
if( mode & TASK_FLAG_GZIP ) {
|
||||||
|
r = compress_buffer;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
/* All torrents done: release lock on currenct bucket */
|
/* All torrents done: release lock on currenct bucket */
|
||||||
@ -149,10 +196,21 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Close bencoded scrape dictionary if necessary */
|
/* Close bencoded scrape dictionary if necessary */
|
||||||
if( mode == TASK_FULLSCRAPE ) {
|
if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) {
|
||||||
*r++='e'; *r++='e';
|
*r++='e'; *r++='e';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef WANT_COMPRESSION_GZIP
|
||||||
|
if( mode & TASK_FLAG_GZIP ) {
|
||||||
|
strm.next_in = (ot_byte*) compress_buffer;
|
||||||
|
strm.avail_in = r - compress_buffer;
|
||||||
|
if( deflate( &strm, Z_FINISH ) != Z_STREAM_END )
|
||||||
|
fprintf( stderr, "Not ok.\n" );
|
||||||
|
r = (char*)strm.next_out;
|
||||||
|
deflateEnd(&strm);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Release unused memory in current output buffer */
|
/* Release unused memory in current output buffer */
|
||||||
iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) );
|
iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) );
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user