Cleanup now moved to its own thread, too

dynamic-accesslists
erdgeist 17 years ago
parent 3528752c7b
commit 49ba269b9d

@ -40,12 +40,14 @@
#include "ot_iovec.h" #include "ot_iovec.h"
#include "ot_accesslist.h" #include "ot_accesslist.h"
#include "ot_mutex.h" #include "ot_mutex.h"
#include "ot_clean.h"
/* Globals */ /* Globals */
static const size_t SUCCESS_HTTP_HEADER_LENGTH = 80; static const size_t SUCCESS_HTTP_HEADER_LENGTH = 80;
static const size_t SUCCESS_HTTP_SIZE_OFF = 17; static const size_t SUCCESS_HTTP_SIZE_OFF = 17;
static uint32_t g_adminip_addresses[OT_ADMINIP_MAX]; static uint32_t g_adminip_addresses[OT_ADMINIP_MAX];
static unsigned int g_adminip_count = 0; static unsigned int g_adminip_count = 0;
static time_t ot_last_clean_time;
time_t ot_start_time; time_t ot_start_time;
time_t g_now; time_t g_now;
@ -623,6 +625,7 @@ ANNOUNCE_WORKAROUND:
static void signal_handler( int s ) { static void signal_handler( int s ) {
if( s == SIGINT ) { if( s == SIGINT ) {
signal( SIGINT, SIG_IGN); signal( SIGINT, SIG_IGN);
trackerlogic_deinit(); trackerlogic_deinit();
exit( 0 ); exit( 0 );
} else if( s == SIGALRM ) { } else if( s == SIGALRM ) {
@ -783,9 +786,12 @@ static void server_mainloop( ) {
} }
/* See if we need to move our pools */ /* See if we need to move our pools */
if( g_now != ot_last_clean_time ) {
ot_last_clean_time = g_now;
clean_all_torrents(); clean_all_torrents();
} }
} }
}
static void ot_try_bind( char ip[4], uint16 port, int is_tcp ) { static void ot_try_bind( char ip[4], uint16 port, int is_tcp ) {
int64 s = is_tcp ? socket_tcp4( ) : socket_udp4(); int64 s = is_tcp ? socket_tcp4( ) : socket_udp4();
@ -872,9 +878,7 @@ int main( int argc, char **argv ) {
if( trackerlogic_init( serverdir ) == -1 ) if( trackerlogic_init( serverdir ) == -1 )
panic( "Logic not started" ); panic( "Logic not started" );
fullscrape_init( ); g_now = ot_start_time = ot_last_clean_time = time( NULL );
g_now = ot_start_time = time( NULL );
alarm(5); alarm(5);
server_mainloop( ); server_mainloop( );

@ -4,6 +4,7 @@
/* System */ /* System */
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <pthread.h>
/* Libowfat */ /* Libowfat */
#include "byte.h" #include "byte.h"
@ -12,9 +13,6 @@
#include "trackerlogic.h" #include "trackerlogic.h"
#include "ot_mutex.h" #include "ot_mutex.h"
/* To remember, when we last cleaned up */
static ot_time all_torrents_clean[OT_BUCKET_COUNT];
/* Clean a single torrent /* Clean a single torrent
return 1 if torrent timed out return 1 if torrent timed out
*/ */
@ -83,37 +81,46 @@ int clean_single_torrent( ot_torrent *torrent ) {
return 0; return 0;
} }
/* Clean up all peers in current bucket, remove timedout pools and static void clean_make() {
torrents */ int bucket;
void clean_all_torrents( void ) {
ot_vector *torrents_list;
size_t i;
static int bucket;
ot_time time_now = NOW;
/* Search for an uncleaned bucked */
while( ( all_torrents_clean[bucket] == time_now ) && ( ++bucket < OT_BUCKET_COUNT ) );
if( bucket >= OT_BUCKET_COUNT ) {
bucket = 0; return;
}
all_torrents_clean[bucket] = time_now; for( bucket = OT_BUCKET_COUNT - 1; bucket >= 0; --bucket ) {
ot_vector *torrents_list = mutex_bucket_lock( bucket );
size_t toffs;
torrents_list = mutex_bucket_lock( bucket ); for( toffs=0; toffs<torrents_list->size; ++toffs ) {
for( i=0; i<torrents_list->size; ++i ) { ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + toffs;
ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + i;
if( clean_single_torrent( torrent ) ) { if( clean_single_torrent( torrent ) ) {
vector_remove_torrent( torrents_list, torrent ); vector_remove_torrent( torrents_list, torrent );
--i; continue; --toffs; continue;
} }
} }
mutex_bucket_unlock( bucket ); mutex_bucket_unlock( bucket );
} }
}
/* Clean up all peers in current bucket, remove timedout pools and
torrents */
static void * clean_worker( void * args ) {
args = args;
while( 1 ) {
ot_tasktype tasktype = TASK_CLEAN;
ot_taskid taskid = mutex_workqueue_poptask( &tasktype );
clean_make( );
mutex_workqueue_pushsuccess( taskid );
}
return NULL;
}
void clean_all_torrents( ) {
mutex_workqueue_pushtask( 0, TASK_CLEAN );
}
static pthread_t thread_id;
void clean_init( void ) { void clean_init( void ) {
byte_zero( all_torrents_clean, sizeof( all_torrents_clean ) ); pthread_create( &thread_id, NULL, clean_worker, NULL );
} }
void clean_deinit( void ) { void clean_deinit( void ) {
byte_zero( all_torrents_clean, sizeof( all_torrents_clean ) ); pthread_cancel( thread_id );
} }

@ -218,6 +218,30 @@ ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype ) {
return taskid; return taskid;
} }
void mutex_workqueue_pushsuccess( ot_taskid taskid ) {
struct ot_task ** task;
/* Want exclusive access to tasklist */
MTX_DBG( "pushsuccess locks.\n" );
pthread_mutex_lock( &tasklist_mutex );
MTX_DBG( "pushsuccess locked.\n" );
task = &tasklist;
while( *task && ( (*task)->taskid != taskid ) )
*task = (*task)->next;
if( *task && ( (*task)->taskid == taskid ) ) {
struct ot_task *ptask = *task;
*task = (*task)->next;
free( ptask );
}
/* Release lock */
MTX_DBG( "pushsuccess unlocks.\n" );
pthread_mutex_unlock( &tasklist_mutex );
MTX_DBG( "pushsuccess unlocked.\n" );
}
int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovec ) { int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovec ) {
struct ot_task * task; struct ot_task * task;
/* Want exclusive access to tasklist */ /* Want exclusive access to tasklist */

@ -32,9 +32,11 @@ typedef enum {
TASK_FULLSCRAPE_TPB_ASCII = 0x0202, TASK_FULLSCRAPE_TPB_ASCII = 0x0202,
TASK_FULLSCRAPE_TPB_URLENCODED = 0x0203, TASK_FULLSCRAPE_TPB_URLENCODED = 0x0203,
TASK_SYNC = 0x0300, TASK_CLEAN = 0x0300,
TASK_DMEM = 0x0400, TASK_SYNC = 0x0400,
TASK_DMEM = 0x0500,
TASK_DONE = 0x0f00, TASK_DONE = 0x0f00,
TASK_MASK = 0xff00 TASK_MASK = 0xff00
@ -44,6 +46,7 @@ typedef unsigned long ot_taskid;
int mutex_workqueue_pushtask( int64 socket, ot_tasktype tasktype ); int mutex_workqueue_pushtask( int64 socket, ot_tasktype tasktype );
void mutex_workqueue_canceltask( int64 socket ); void mutex_workqueue_canceltask( int64 socket );
void mutex_workqueue_pushsuccess( ot_taskid taskid );
ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype ); ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype );
int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovector ); int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovector );
int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovector ); int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovector );

@ -22,6 +22,7 @@
#include "ot_stats.h" #include "ot_stats.h"
#include "ot_clean.h" #include "ot_clean.h"
#include "ot_accesslist.h" #include "ot_accesslist.h"
#include "ot_fullscrape.h"
void free_peerlist( ot_peerlist *peer_list ) { void free_peerlist( ot_peerlist *peer_list ) {
size_t i; size_t i;
@ -321,8 +322,10 @@ int trackerlogic_init( const char * const serverdir ) {
srandom( time(NULL) ); srandom( time(NULL) );
clean_init( ); /* Initialise background worker threads */
mutex_init( ); mutex_init( );
clean_init( );
fullscrape_init( );
return 0; return 0;
} }
@ -343,6 +346,9 @@ void trackerlogic_deinit( void ) {
} }
mutex_bucket_unlock( bucket ); mutex_bucket_unlock( bucket );
} }
mutex_deinit( );
/* Deinitialise background worker threads */
fullscrape_init( );
clean_deinit( ); clean_deinit( );
mutex_deinit( );
} }

Loading…
Cancel
Save