@ -9,59 +9,109 @@
# include <string.h>
# include <pthread.h>
# include <unistd.h>
# include <stdlib.h>
/* Libowfat */
# include "socket.h"
# include "ndelay.h"
# include "byte.h"
/* Opentracker */
# include "trackerlogic.h"
# include "ot_livesync.h"
# include "ot_accesslist.h"
# include "ot_stats.h"
# include "ot_mutex.h"
char groupip_1 [ 4 ] = { 224 , 0 , 23 , 42 } ;
char groupip_1 [ 4 ] = { 224 , 0 , 23 , 5 } ;
# define LIVESYNC_BUFFINSIZE (256*256)
# define LIVESYNC_BUFFWATER (sizeof(ot_peer)+sizeof(ot_hash))
# define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
# define LIVESYNC_OUTGOING_WATERMARK_SCRAPE (sizeof(ot_hash)+sizeof(uint64_t)+sizeof(uint32_t))
# define LIVESYNC_FIRST_BEACON_DELAY (30*60) /* seconds */
# define LIVESYNC_BEACON_INTERVAL 60 /* seconds */
# endif /* WANT_SYNC_SCRAPE */
# define LIVESYNC_MAXDELAY 15 /* seconds */
# endif
} ;
/* Forward declaration */
static void * livesync_worker ( void * args ) ;
/* For outgoing packets */
static int64 g_livesync_socket_in = - 1 ;
static int64 g _socket_in = - 1 ;
/* For incoming packets */
static int64 g_livesync_socket_out = - 1 ;
static int64 g_socket_out = - 1 ;
static uint8_t g_inbuffer [ LIVESYNC_INCOMING_BUFFSIZE ] ;
static uint8_t g_peerbuffer_start [ LIVESYNC_OUTGOING_BUFFSIZE_PEERS ] ;
static uint8_t * g_peerbuffer_pos ;
static uint8_t * g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ;
static ot_time g_next_packet_time ;
static uint8_t livesync_inbuffer [ LIVESYNC_BUFFINSIZE ] ;
static uint8_t livesync_outbuffer_start [ LIVESYNC_BUFFSIZE ] ;
static uint8_t * livesync_outbuffer_pos ;
static uint8_t * livesync_outbuffer_highwater = livesync_outbuffer_start + LIVESYNC_BUFFSIZE - LIVESYNC_BUFFWATER ;
static ot_time livesync_lastpacket_time ;
/* Live sync scrape buffers, states and timers */
static ot_time g_next_beacon_time ;
static ot_time g_next_inquire_time ;
static uint8_t g_scrapebuffer_start [ LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE ] ;
static uint8_t * g_scrapebuffer_pos ;
static uint8_t * g_scrapebuffer_highwater = g_scrapebuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE - LIVESYNC_OUTGOING_WATERMARK_SCRAPE ;
static size_t g_inquire_remote_count ;
static uint32_t g_inquire_remote_host ;
static int g_inquire_inprogress ;
static int g_inquire_bucket ;
# endif /* WANT_SYNC_SCRAPE */
static pthread_t thread_id ;
void livesync_init ( ) {
if ( g_livesync_socket_in = = - 1 )
if ( g_ socket_in = = - 1 )
exerr ( " No socket address for live sync specified. " ) ;
livesync_outbuffer_pos = livesync_outbuffer_start ;
memmove ( livesync_outbuffer_pos , & g_tracker_id , sizeof ( g_tracker_id ) ) ;
livesync_outbuffer_pos + = sizeof ( g_tracker_id ) ;
livesync_lastpacket_time = g_now_seconds ;
/* Prepare outgoing peers buffer */
g_peerbuffer_pos = g_peerbuffer_start ;
memmove ( g_peerbuffer_pos , & g_tracker_id , sizeof ( g_tracker_id ) ) ;
uint32_pack_big ( ( char * ) g_peerbuffer_pos + sizeof ( g_tracker_id ) , OT_SYNC_PEER ) ;
g_peerbuffer_pos + = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
/* Prepare outgoing scrape buffer */
g_scrapebuffer_pos = g_scrapebuffer_start ;
memmove ( g_scrapebuffer_pos , & g_tracker_id , sizeof ( g_tracker_id ) ) ;
uint32_pack_big ( ( char * ) g_scrapebuffer_pos + sizeof ( g_tracker_id ) , OT_SYNC_SCRAPE_TELL ) ;
g_scrapebuffer_pos + = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
/* Wind up timers for inquires */
g_next_beacon_time = g_now_seconds + LIVESYNC_FIRST_BEACON_DELAY ;
# endif /* WANT_SYNC_SCRAPE */
g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY ;
pthread_create ( & thread_id , NULL , livesync_worker , NULL ) ;
void livesync_deinit ( ) {
if ( g_livesync_socket_in ! = - 1 )
close ( g_livesync_socket_in ) ;
if ( g_livesync_socket_out ! = - 1 )
close ( g_livesync_socket_out ) ;
if ( g_ socket_in ! = - 1 )
close ( g_ socket_in ) ;
if ( g_ socket_out ! = - 1 )
close ( g_ socket_out ) ;
pthread_cancel ( thread_id ) ;
@ -69,104 +119,292 @@ void livesync_deinit() {
void livesync_bind_mcast ( char * ip , uint16_t port ) {
char tmpip [ 4 ] = { 0 , 0 , 0 , 0 } ;
if ( g_ livesync_ socket_in ! = - 1 )
if ( g_ socket_in ! = - 1 )
exerr ( " Error: Livesync listen ip specified twice. " ) ;
if ( ( g_ livesync_ socket_in = socket_udp4 ( ) ) < 0 )
if ( ( g_ socket_in = socket_udp4 ( ) ) < 0 )
exerr ( " Error: Cant create live sync incoming socket. " ) ;
ndelay_off ( g_ livesync_ socket_in) ;
ndelay_off ( g_ socket_in) ;
if ( socket_bind4_reuse ( g_ livesync_ socket_in, tmpip , port ) = = - 1 )
if ( socket_bind4_reuse ( g_ socket_in, tmpip , port ) = = - 1 )
exerr ( " Error: Cant bind live sync incoming socket. " ) ;
if ( socket_mcjoin4 ( g_ livesync_ socket_in, groupip_1 , ip ) )
if ( socket_mcjoin4 ( g_ socket_in, groupip_1 , ip ) )
exerr ( " Error: Cant make live sync incoming socket join mcast group. " ) ;
if ( ( g_ livesync_ socket_out = socket_udp4 ( ) ) < 0 )
if ( ( g_ socket_out = socket_udp4 ( ) ) < 0 )
exerr ( " Error: Cant create live sync outgoing socket. " ) ;
if ( socket_bind4_reuse ( g_ livesync_ socket_out, ip , port ) = = - 1 )
if ( socket_bind4_reuse ( g_ socket_out, ip , port ) = = - 1 )
exerr ( " Error: Cant bind live sync outgoing socket. " ) ;
socket_mcttl4 ( g_ livesync_ socket_out, 1 ) ;
socket_mcloop4 ( g_ livesync_ socket_out, 0 ) ;
socket_mcttl4 ( g_ socket_out, 1 ) ;
socket_mcloop4 ( g_ socket_out, 0 ) ;
static void livesync_issue packet ( ) {
socket_send4 ( g_ livesync_ socket_out, ( char * ) livesync_outbuffer_start, livesync_outbuffer_pos - livesync_out buffer_start,
static void livesync_issue _peersync ( ) {
socket_send4 ( g_ socket_out, ( char * ) g_peerbuffer_start, g_peerbuffer_pos - g_peer buffer_start,
groupip_1 , LIVESYNC_PORT ) ;
livesync_outbuffer_pos = livesync_out buffer_start + sizeof ( g_tracker_id ) ;
livesync_lastpacket_time = g_now_seconds ;
g_peerbuffer_pos = g_peer buffer_start + sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY ;
/* Inform live sync about whats going on. */
void livesync_tell ( ot_hash * const info_hash , const ot_peer * const peer ) {
int i ;
for ( i = 0 ; i < 20 ; i + = 4 ) WRITE32 ( livesync_outbuffer_pos , i , READ32 ( info_hash , i ) ) ;
WRITE32 ( livesync_outbuffer_pos , 20 , READ32 ( peer , 0 ) ) ;
WRITE32 ( livesync_outbuffer_pos , 24 , READ32 ( peer , 4 ) ) ;
livesync_outbuffer_pos + = 28 ;
if ( livesync_outbuffer_pos > = livesync_outbuffer_highwater )
livesync_issuepacket ( ) ;
static void livesync_handle_peersync ( ssize_t datalen ) {
int off = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
/* Now basic sanity checks have been done on the live sync packet
We might add more testing and logging . */
while ( off + ( ssize_t ) sizeof ( ot_hash ) + ( ssize_t ) sizeof ( ot_peer ) < = datalen ) {
ot_peer * peer = ( ot_peer * ) ( g_inbuffer + off + sizeof ( ot_hash ) ) ;
ot_hash * hash = ( ot_hash * ) ( g_inbuffer + off ) ;
if ( ! g_opentracker_running ) return ;
remove_peer_from_torrent ( hash , peer , NULL , FLAG_MCA ) ;
add_peer_to_torrent ( hash , peer , FLAG_MCA ) ;
off + = sizeof ( ot_hash ) + sizeof ( ot_peer ) ;
stats_issue_event ( EVENT_SYNC , 0 , datalen / ( ( ssize_t ) sizeof ( ot_hash ) + ( ssize_t ) sizeof ( ot_peer ) ) ) ;
void livesync_issue_beacon ( ) {
size_t torrent_count = mutex_get_torrent_count ( ) ;
uint8_t beacon [ sizeof ( g_tracker_id ) + sizeof ( uint32_t ) + sizeof ( uint64_t ) ] ;
memmove ( beacon , & g_tracker_id , sizeof ( g_tracker_id ) ) ;
uint32_pack_big ( ( char * ) beacon + sizeof ( g_tracker_id ) , OT_SYNC_SCRAPE_BEACON ) ;
uint32_pack_big ( ( char * ) beacon + sizeof ( g_tracker_id ) + sizeof ( uint32_t ) , ( uint32_t ) ( ( uint64_t ) ( torrent_count ) > > 32 ) ) ;
uint32_pack_big ( ( char * ) beacon + sizeof ( g_tracker_id ) + 2 * sizeof ( uint32_t ) , ( uint32_t ) torrent_count ) ;
socket_send4 ( g_socket_out , ( char * ) beacon , sizeof ( beacon ) , groupip_1 , LIVESYNC_PORT ) ;
void livesync_handle_beacon ( ssize_t datalen ) {
size_t torrent_count_local , torrent_count_remote ;
if ( datalen ! = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) + sizeof ( uint64_t ) )
return ;
torrent_count_local = mutex_get_torrent_count ( ) ;
torrent_count_remote = ( size_t ) ( ( ( uint64_t ) uint32_read_big ( ( char * ) g_inbuffer + sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ) ) < < 32 ) ;
torrent_count_remote | = ( size_t ) uint32_read_big ( ( char * ) g_inbuffer + sizeof ( g_tracker_id ) + 2 * sizeof ( uint32_t ) ) ;
/* Empty tracker is useless */
if ( ! torrent_count_remote ) return ;
if ( ( ( double ) torrent_count_local ) / ( ( double ) torrent_count_remote ) < LIVESYNC_INQUIRE_THRESH ) {
if ( ! g_next_inquire_time ) {
g_next_inquire_time = g_now_seconds + 2 * LIVESYNC_BEACON_INTERVAL ;
g_inquire_remote_count = 0 ;
if ( torrent_count_remote > g_inquire_remote_count ) {
g_inquire_remote_count = torrent_count_remote ;
memmove ( & g_inquire_remote_host , g_inbuffer , sizeof ( g_tracker_id ) ) ;
void livesync_issue_inquire ( ) {
uint8_t inquire [ sizeof ( g_tracker_id ) + sizeof ( uint32_t ) + sizeof ( g_tracker_id ) ] ;
memmove ( inquire , & g_tracker_id , sizeof ( g_tracker_id ) ) ;
uint32_pack_big ( ( char * ) inquire + sizeof ( g_tracker_id ) , OT_SYNC_SCRAPE_INQUIRE ) ;
memmove ( inquire + sizeof ( g_tracker_id ) + sizeof ( uint32_t ) , & g_inquire_remote_host , sizeof ( g_tracker_id ) ) ;
socket_send4 ( g_socket_out , ( char * ) inquire , sizeof ( inquire ) , groupip_1 , LIVESYNC_PORT ) ;
void livesync_handle_inquire ( ssize_t datalen ) {
if ( datalen ! = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) + sizeof ( g_tracker_id ) )
return ;
/* If it isn't us, they're inquiring, ignore inquiry */
if ( memcmp ( & g_tracker_id , g_inbuffer , sizeof ( g_tracker_id ) ) )
return ;
/* Start scrape tell on next ticker */
if ( ! g_inquire_inprogress ) {
g_inquire_inprogress = 1 ;
g_inquire_bucket = 0 ;
void livesync_issue_tell ( ) {
while ( packets_to_send > 0 & & g_inquire_bucket < OT_BUCKET_COUNT ) {
ot_vector * torrents_list = mutex_bucket_lock ( g_inquire_bucket ) ;
unsigned int j ;
for ( j = 0 ; j < torrents_list - > size ; + + j ) {
ot_torrent * torrent = ( ot_torrent * ) ( torrents_list - > data ) + j ;
memmove ( g_scrapebuffer_pos , torrent - > hash , sizeof ( ot_hash ) ) ;
g_scrapebuffer_pos + = sizeof ( ot_hash ) ;
uint32_pack_big ( ( char * ) g_scrapebuffer_pos , ( uint32_t ) ( g_now_minutes - torrent - > peer_list - > base ) ) ;
uint32_pack_big ( ( char * ) g_scrapebuffer_pos + 4 , ( uint32_t ) ( ( uint64_t ) ( torrent - > peer_list - > down_count ) > > 32 ) ) ;
uint32_pack_big ( ( char * ) g_scrapebuffer_pos + 8 , ( uint32_t ) torrent - > peer_list - > down_count ) ;
g_scrapebuffer_pos + = 12 ;
if ( g_scrapebuffer_pos > = g_scrapebuffer_highwater ) {
socket_send4 ( g_socket_out , ( char * ) g_scrapebuffer_start , g_scrapebuffer_pos - g_scrapebuffer_start , groupip_1 , LIVESYNC_PORT ) ;
g_scrapebuffer_pos = g_scrapebuffer_start + sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
- - packets_to_send ;
mutex_bucket_unlock ( g_inquire_bucket + + , 0 ) ;
if ( ! g_opentracker_running )
return ;
if ( g_inquire_bucket = = OT_BUCKET_COUNT ) {
socket_send4 ( g_socket_out , ( char * ) g_scrapebuffer_start , g_scrapebuffer_pos - g_scrapebuffer_start , groupip_1 , LIVESYNC_PORT ) ;
g_inquire_inprogress = 0 ;
void livesync_handle_tell ( ssize_t datalen ) {
int off = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
/* Some instance is in progress of telling. Our inquiry was successful.
Don ' t ask again until we see next beacon . */
g_next_inquire_time = 0 ;
/* Don't cause any new inquiries during another tracker's tell */
if ( g_next_beacon_time - g_now_seconds < LIVESYNC_BEACON_INTERVAL )
g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL ;
while ( off + sizeof ( ot_hash ) + 12 < = ( size_t ) datalen ) {
ot_hash * hash = ( ot_hash * ) ( g_inbuffer + off ) ;
ot_vector * torrents_list = mutex_bucket_lock_by_hash ( hash ) ;
size_t down_count_remote ;
int exactmatch ;
ot_torrent * torrent = vector_find_or_insert ( torrents_list , hash , sizeof ( ot_hash ) , OT_HASH_COMPARE_SIZE , & exactmatch ) ;
if ( ! torrent ) {
mutex_bucket_unlock_by_hash ( hash , 0 ) ;
continue ;
if ( ! exactmatch ) {
/* Create a new torrent entry, then */
int i ; for ( i = 0 ; i < 20 ; i + = 4 ) WRITE32 ( & torrent - > hash , i , READ32 ( hash , i ) ) ;
if ( ! ( torrent - > peer_list = malloc ( sizeof ( ot_peerlist ) ) ) ) {
vector_remove_torrent ( torrents_list , torrent ) ;
mutex_bucket_unlock_by_hash ( hash , 0 ) ;
continue ;
byte_zero ( torrent - > peer_list , sizeof ( ot_peerlist ) ) ;
torrent - > peer_list - > base = g_now_minutes - uint32_read_big ( ( char * ) g_inbuffer + off + sizeof ( ot_hash ) ) ;
down_count_remote = ( size_t ) ( ( ( uint64_t ) uint32_read_big ( ( char * ) g_inbuffer + off + sizeof ( ot_hash ) + sizeof ( uint32_t ) ) ) < < 32 ) ;
down_count_remote | = ( size_t ) uint32_read_big ( ( char * ) g_inbuffer + off + sizeof ( ot_hash ) + 2 * sizeof ( uint32_t ) ) ;
if ( down_count_remote > torrent - > peer_list - > down_count )
torrent - > peer_list - > down_count = down_count_remote ;
/* else
We might think of sending a tell packet , if we have a much larger downloaded count
mutex_bucket_unlock ( g_inquire_bucket + + , exactmatch ? 0 : 1 ) ;
if ( ! g_opentracker_running )
return ;
off + = sizeof ( ot_hash ) + 12 ;
# endif /* WANT_SYNC_SCRAPE */
/* Tickle the live sync module from time to time, so no events get
stuck when there ' s not enough traffic to fill udp packets fast
enough */
void livesync_ticker ( ) {
if ( ( g_now_seconds - livesync_lastpacket_time > LIVESYNC_MAXDELAY ) & &
( livesync_outbuffer_pos > livesync_outbuffer_start + sizeof ( g_tracker_id ) ) )
livesync_issuepacket ( ) ;
/* livesync_issue_peersync sets g_next_packet_time */
if ( g_now_seconds > g_next_packet_time & &
g_peerbuffer_pos > g_peerbuffer_start + sizeof ( g_tracker_id ) )
livesync_issue_peersync ( ) ;
/* Send first beacon after running at least LIVESYNC_FIRST_BEACON_DELAY
seconds and not more often than every LIVESYNC_BEACON_INTERVAL seconds */
if ( g_now_seconds > g_next_beacon_time ) {
livesync_issue_beacon ( ) ;
g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL ;
/* If we're interested in an inquiry and waited long enough to see all
tracker ' s beacons , go ahead and inquire */
if ( g_next_inquire_time & & g_now_seconds > g_next_inquire_time ) {
livesync_issue_inquire ( ) ;
/* If packet gets lost, ask again after LIVESYNC_BEACON_INTERVAL */
g_next_inquire_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL ;
/* If we're in process of telling, let's tell. */
if ( g_inquire_inprogress )
livesync_issue_tell ( ) ;
# endif /* WANT_SYNC_SCRAPE */
/* Inform live sync about whats going on. */
void livesync_tell ( ot_hash * const info_hash , const ot_peer * const peer ) {
unsigned int i ;
for ( i = 0 ; i < sizeof ( ot_hash ) / 4 ; i + = 4 ) WRITE32 ( g_peerbuffer_pos , i , READ32 ( info_hash , i ) ) ;
WRITE32 ( g_peerbuffer_pos , sizeof ( ot_hash ) , READ32 ( peer , 0 ) ) ;
WRITE32 ( g_peerbuffer_pos , sizeof ( ot_hash ) + 4 , READ32 ( peer , 4 ) ) ;
g_peerbuffer_pos + = sizeof ( ot_hash ) + 8 ;
if ( g_peerbuffer_pos > = g_peerbuffer_highwater )
livesync_issue_peersync ( ) ;
static void * livesync_worker ( void * args ) {
uint8_t in_ip [ 4 ] ; uint16_t in_port ;
ssize_t datalen ;
int off ;
args = args ;
( void ) args ;
while ( 1 ) {
datalen = socket_recv4 ( g_livesync_socket_in , ( char * ) livesync_inbuffer , LIVESYNC_BUFFINSIZE , ( char * ) in_ip , & in_port ) ;
off = 4 ;
datalen = socket_recv4 ( g_socket_in , ( char * ) g_inbuffer , LIVESYNC_INCOMING_BUFFSIZE , ( char * ) in_ip , & in_port ) ;
if ( datalen < = 0 )
/* Expect at least tracker id and packet type */
if ( datalen < = ( ssize_t ) ( sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ) )
continue ;
if ( datalen < ( ssize_t ) ( sizeof ( g_tracker_id ) + sizeof ( ot_hash ) + sizeof ( ot_peer ) ) ) {
/* TODO: log invalid sync packet */
if ( ! accesslist_isblessed ( ( char * ) in_ip , OT_PERMISSION_MAY_LIVESYNC ) )
continue ;
if ( ! accesslist_isblessed ( ( char * ) in_ip , OT_PERMISSION_MAY_LIVESYNC ) ) {
/* TODO: log invalid sync packet */
continue ;
if ( ! memcmp ( livesync_inbuffer , & g_tracker_id , sizeof ( g_tracker_id ) ) ) {
if ( ! memcmp ( g_inbuffer , & g_tracker_id , sizeof ( g_tracker_id ) ) ) {
/* TODO: log packet coming from ourselves */
continue ;
/* Now basic sanity checks have been done on the live sync packet
We might add more testing and logging . */
while ( off + ( ssize_t ) sizeof ( ot_hash ) + ( ssize_t ) sizeof ( ot_peer ) < = datalen ) {
ot_peer * peer = ( ot_peer * ) ( livesync_inbuffer + off + sizeof ( ot_hash ) ) ;
ot_hash * hash = ( ot_hash * ) ( livesync_inbuffer + off ) ;
if ( ! g_opentracker_running )
return NULL ;
remove_peer_from_torrent ( hash , peer , NULL , FLAG_MCA ) ;
add_peer_to_torrent ( hash , peer WANT_SYNC_PARAM ( 1 ) ) ;
off + = sizeof ( ot_hash ) + sizeof ( ot_peer ) ;
switch ( uint32_read_big ( ( char * ) g_inbuffer ) ) {
livesync_handle_peersync ( datalen ) ;
break ;
livesync_handle_beacon ( datalen ) ;
break ;
livesync_handle_inquire ( datalen ) ;
break ;
livesync_handle_tell ( datalen ) ;
break ;
# endif /* WANT_SYNC_SCRAPE */
default :
break ;
stats_issue_event ( EVENT_SYNC , 0 , datalen / ( ( ssize_t ) sizeof ( ot_hash ) + ( ssize_t ) sizeof ( ot_peer ) ) ) ;
/* Handle outstanding requests */
livesync_ticker ( ) ;
/* Never returns. */
return NULL ;