Live sync is now handled in its own thread. Therefore it now creates and handles its own sockets.

This commit is contained in:
erdgeist 2008-10-06 02:05:53 +00:00
parent 17724dde29
commit 465cc2ecdf
2 changed files with 89 additions and 61 deletions

View File

@ -7,9 +7,11 @@
#include <sys/types.h>
#include <sys/uio.h>
#include <string.h>
#include <pthread.h>
/* Libowfat */
#include "socket.h"
#include "ndelay.h"
/* Opentracker */
#include "trackerlogic.h"
@ -17,10 +19,23 @@
#include "ot_accesslist.h"
char groupip_1[4] = { LIVESYNC_MCASTDOMAIN_1 };
char groupip_1[4] = { 224,0,23,42 };
#define LIVESYNC_BUFFINSIZE (256*256)
#define LIVESYNC_BUFFWATER (sizeof(ot_peer)+sizeof(ot_hash))
/* Forward declaration */
static void * livesync_worker( void * args );
/* For outgoing packets */
int64 g_livesync_socket = -1;
static int64 g_livesync_socket_in = -1;
/* For incoming packets */
static int64 g_livesync_socket_out = -1;
static uint8_t livesync_inbuffer[LIVESYNC_BUFFINSIZE];
static uint8_t livesync_outbuffer_start[ LIVESYNC_BUFFSIZE ];
@ -28,34 +43,49 @@ static uint8_t *livesync_outbuffer_pos;
static uint8_t *livesync_outbuffer_highwater = livesync_outbuffer_start + LIVESYNC_BUFFSIZE - LIVESYNC_BUFFWATER;
static ot_time livesync_lastpacket_time;
static pthread_t thread_id;
void livesync_init( ) {
if( g_livesync_socket == -1 )
if( g_livesync_socket_in == -1 )
exerr( "No socket address for live sync specified." );
livesync_outbuffer_pos = livesync_outbuffer_start;
memmove( livesync_outbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
livesync_outbuffer_pos += sizeof( g_tracker_id );
livesync_lastpacket_time = g_now;
pthread_create( &thread_id, NULL, livesync_worker, NULL );
void livesync_deinit() {
pthread_cancel( thread_id );
void livesync_bind_mcast( char *ip, uint16_t port) {
char tmpip[4] = {0,0,0,0};
if( g_livesync_socket != -1 )
exerr("Livesync listen ip specified twice.");
if( socket_mcjoin4( ot_try_bind(tmpip, port, FLAG_MCA ), groupip_1, ip ) )
exerr("Cant join mcast group.");
g_livesync_socket = ot_try_bind( ip, port, FLAG_UDP );
socket_mcttl4(g_livesync_socket, 1);
socket_mcloop4(g_livesync_socket, 0);
if( g_livesync_socket_in != -1 )
exerr("Error: Livesync listen ip specified twice.");
if( ( g_livesync_socket_in = socket_udp4( )) < 0)
exerr("Error: Cant create live sync incoming socket." );
if( socket_bind4_reuse( g_livesync_socket_in, tmpip, port ) == -1 )
exerr("Error: Cant bind live sync incoming socket." );
if( socket_mcjoin4( g_livesync_socket_in, groupip_1, ip ) )
exerr("Error: Cant make live sync incoming socket join mcast group.");
if( ( g_livesync_socket_out = socket_udp4()) < 0)
exerr("Error: Cant create live sync outgoing socket." );
if( socket_bind4_reuse( g_livesync_socket_out, ip, port ) == -1 )
exerr("Error: Cant bind live sync outgoing socket." );
socket_mcttl4(g_livesync_socket_out, 1);
socket_mcloop4(g_livesync_socket_out, 0);
static void livesync_issuepacket( ) {
socket_send4(g_livesync_socket, (char*)livesync_outbuffer_start, livesync_outbuffer_pos - livesync_outbuffer_start,
socket_send4(g_livesync_socket_out, (char*)livesync_outbuffer_start, livesync_outbuffer_pos - livesync_outbuffer_start,
groupip_1, LIVESYNC_PORT);
livesync_outbuffer_pos = livesync_outbuffer_start + sizeof( g_tracker_id );
livesync_lastpacket_time = g_now;
@ -81,41 +111,51 @@ void livesync_ticker( ) {
/* Handle an incoming live sync packet */
void handle_livesync( int64 serversocket ) {
static void * livesync_worker( void * args ) {
uint8_t in_ip[4]; uint16_t in_port;
ssize_t datalen = socket_recv4(serversocket, (char*)livesync_inbuffer, LIVESYNC_BUFFINSIZE, (char*)in_ip, &in_port);
int off = 4;
ssize_t datalen;
int off;
if( datalen < (ssize_t)(sizeof( g_tracker_id ) + sizeof( ot_hash ) + sizeof( ot_peer ) ) ) {
// TODO: log invalid sync packet
args = args;
while( 1 ) {
datalen = socket_recv4(g_livesync_socket_in, (char*)livesync_inbuffer, LIVESYNC_BUFFINSIZE, (char*)in_ip, &in_port);
off = 4;
if( datalen <= 0 )
if( datalen < (ssize_t)(sizeof( g_tracker_id ) + sizeof( ot_hash ) + sizeof( ot_peer ) ) ) {
// TODO: log invalid sync packet
if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC)) {
// TODO: log invalid sync packet
if( !memcmp( livesync_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
// TODO: log packet coming from ourselves
// Now basic sanity checks have been done on the live sync packet
// We might add more testing and logging.
while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) {
ot_peer *peer = (ot_peer*)(livesync_inbuffer + off + sizeof(ot_hash));
ot_hash *hash = (ot_hash*)(livesync_inbuffer + off);
remove_peer_from_torrent(hash, peer, NULL, FLAG_MCA);
add_peer_to_torrent( hash, peer WANT_SYNC_PARAM(1));
off += sizeof( ot_hash ) + sizeof( ot_peer );
if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC)) {
// TODO: log invalid sync packet
if( !memcmp( livesync_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
// TODO: log packet coming from ourselves
// Now basic sanity checks have been done on the live sync packet
// We might add more testing and logging.
while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) {
ot_peer *peer = (ot_peer*)(livesync_inbuffer + off + sizeof(ot_hash));
ot_hash *hash = (ot_hash*)(livesync_inbuffer + off);
remove_peer_from_torrent(hash, peer, NULL, FLAG_MCA);
add_peer_to_torrent( hash, peer WANT_SYNC_PARAM(1));
off += sizeof( ot_hash ) + sizeof( ot_peer );
/* Never returns. */
return NULL;

View File

@ -10,14 +10,14 @@
#include "trackerlogic.h"
Syncing is done as udp packets in the multicast domain 224.23.42.N port 9696
Syncing is done as udp packets in the multicast domain 224.0.42.N port 9696
Each tracker should join the multicast group and send its live sync packets
to that group, using a ttl of 1
Format of a live sync packet is straight forward and depends on N:
For N == 1: (simple tracker2tracker sync)
For N == 23: (simple tracker2tracker sync)
0x0000 0x04 id of tracker instance
[ 0x0004 0x14 info_hash
0x0018 0x04 peer's ipv4 address
@ -25,7 +25,7 @@
0x0020 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 )
For N == 2: (aggregator syncs)
For N == 24: (aggregator syncs)
0x0000 0x04 id of tracker instance
[ 0x0004 0x14 info_hash
0x0018 0x01 number of peers
@ -41,18 +41,6 @@
#define LIVESYNC_PORT 9696
#define LIVESYNC_MCASTDOMAIN_1 224,23,42,1
#define LIVESYNC_MCASTDOMAIN_2 224,23,42,2
extern char groupip_1[4];
extern char groupip_2[4];
extern int64 g_livesync_socket;
#define LIVESYNC_BUFFINSIZE (256*256)
#define LIVESYNC_BUFFWATER (sizeof(ot_peer)+sizeof(ot_hash))
void livesync_init();
void livesync_deinit();