mirror of
				git://erdgeist.org/opentracker
				synced 2025-11-03 19:33:23 +08:00 
			
		
		
		
	Reaching completion soon
This commit is contained in:
		
							parent
							
								
									21b5baf0c1
								
							
						
					
					
						commit
						5168a3314c
					
				
							
								
								
									
										470
									
								
								proxy.c
									
									
									
									
									
								
							
							
						
						
									
										470
									
								
								proxy.c
									
									
									
									
									
								
							@ -4,6 +4,7 @@
 | 
			
		||||
   $Id$ */
 | 
			
		||||
 | 
			
		||||
/* System */
 | 
			
		||||
#include <stdint.h>
 | 
			
		||||
#include <stdlib.h>
 | 
			
		||||
#include <string.h>
 | 
			
		||||
#include <arpa/inet.h>
 | 
			
		||||
@ -14,6 +15,7 @@
 | 
			
		||||
#include <stdio.h>
 | 
			
		||||
#include <pwd.h>
 | 
			
		||||
#include <ctype.h>
 | 
			
		||||
#include <pthread.h>
 | 
			
		||||
 | 
			
		||||
/* Libowfat */
 | 
			
		||||
#include "socket.h"
 | 
			
		||||
@ -26,30 +28,59 @@
 | 
			
		||||
 | 
			
		||||
/* Opentracker */
 | 
			
		||||
#include "trackerlogic.h"
 | 
			
		||||
#include "ot_vector.h"
 | 
			
		||||
#include "ot_mutex.h"
 | 
			
		||||
#include "ot_livesync.h"
 | 
			
		||||
#include "ot_stats.h"
 | 
			
		||||
 | 
			
		||||
ot_ip6   g_serverip; 
 | 
			
		||||
uint16_t g_serverport = 9009;
 | 
			
		||||
uint32_t g_tracker_id;
 | 
			
		||||
char groupip_1[4] = { 224,0,23,5 };
 | 
			
		||||
char     groupip_1[4] = { 224,0,23,5 };
 | 
			
		||||
int      g_self_pipe[2];
 | 
			
		||||
 | 
			
		||||
/* If you have more than 10 peers, don't use this proxy
 | 
			
		||||
   Use 20 slots for 10 peers to have room for 10 incoming connection slots
 | 
			
		||||
 */
 | 
			
		||||
#define MAX_PEERS 20
 | 
			
		||||
 | 
			
		||||
#define LIVESYNC_INCOMING_BUFFSIZE          (256*256)
 | 
			
		||||
#define STREAMSYNC_OUTGOING_BUFFSIZE        (256*256)
 | 
			
		||||
 | 
			
		||||
#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS     1480
 | 
			
		||||
#define LIVESYNC_OUTGOING_WATERMARK_PEERS   (sizeof(ot_peer)+sizeof(ot_hash))
 | 
			
		||||
 | 
			
		||||
enum { OT_SYNC_PEER };
 | 
			
		||||
/* The amount of time a complete sync cycle should take */
 | 
			
		||||
#define OT_SYNC_INTERVAL_MINUTES             2
 | 
			
		||||
 | 
			
		||||
/* For outgoing packets */
 | 
			
		||||
static int64    g_socket_in = -1;
 | 
			
		||||
/* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */
 | 
			
		||||
#define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) )
 | 
			
		||||
 | 
			
		||||
enum { OT_SYNC_PEER };
 | 
			
		||||
enum { FLAG_SERVERSOCKET = 1 };
 | 
			
		||||
 | 
			
		||||
/* For incoming packets */
 | 
			
		||||
static int64    g_socket_out = -1;
 | 
			
		||||
static int64    g_socket_in = -1;
 | 
			
		||||
static uint8_t  g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE];
 | 
			
		||||
 | 
			
		||||
/* For outgoing packets */
 | 
			
		||||
static int64    g_socket_out = -1;
 | 
			
		||||
//static uint8_t  g_outbuffer[STREAMSYNC_OUTGOING_BUFFSIZE];
 | 
			
		||||
 | 
			
		||||
static void * livesync_worker( void * args );
 | 
			
		||||
static void * streamsync_worker( void * args );
 | 
			
		||||
 | 
			
		||||
void exerr( char * message ) {
 | 
			
		||||
  fprintf( stderr, "%s\n", message );
 | 
			
		||||
  exit( 111 );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) {
 | 
			
		||||
  (void) event;
 | 
			
		||||
  (void) proto;
 | 
			
		||||
  (void) event_data;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
 | 
			
		||||
  char tmpip[4] = {0,0,0,0};
 | 
			
		||||
  char *v4ip;
 | 
			
		||||
@ -80,16 +111,6 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
 | 
			
		||||
  socket_mcloop4(g_socket_out, 1);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static ot_vector all_torrents[OT_BUCKET_COUNT];
 | 
			
		||||
ot_vector *mutex_bucket_lock_by_hash( ot_hash hash ) {
 | 
			
		||||
  return all_torrents + ( uint32_read_big( (char*)hash ) >> OT_BUCKET_COUNT_SHIFT );
 | 
			
		||||
}
 | 
			
		||||
ot_vector *mutex_bucket_lock( int bucket ) {
 | 
			
		||||
  return all_torrents + bucket;
 | 
			
		||||
}
 | 
			
		||||
#define mutex_bucket_unlock_by_hash(A,B)
 | 
			
		||||
#define mutex_bucket_unlock(A)
 | 
			
		||||
 | 
			
		||||
size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) {
 | 
			
		||||
  int         exactmatch;
 | 
			
		||||
  ot_torrent *torrent;
 | 
			
		||||
@ -106,6 +127,7 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) {
 | 
			
		||||
 | 
			
		||||
    if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) {
 | 
			
		||||
      vector_remove_torrent( torrents_list, torrent );
 | 
			
		||||
      mutex_bucket_unlock_by_hash( hash, 0 );
 | 
			
		||||
      return -1;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -114,8 +136,10 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) {
 | 
			
		||||
 | 
			
		||||
  /* Check for peer in torrent */
 | 
			
		||||
  peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch );
 | 
			
		||||
  if( !peer_dest ) return -1;
 | 
			
		||||
 | 
			
		||||
  if( !peer_dest ) {
 | 
			
		||||
    mutex_bucket_unlock_by_hash( hash, 0 );
 | 
			
		||||
    return -1;
 | 
			
		||||
  }
 | 
			
		||||
  /* Tell peer that it's fresh */
 | 
			
		||||
  OT_PEERTIME( peer ) = 0;
 | 
			
		||||
 | 
			
		||||
@ -126,6 +150,7 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) {
 | 
			
		||||
      torrent->peer_list->seed_count++;
 | 
			
		||||
  }
 | 
			
		||||
  memcpy( peer_dest, peer, sizeof(ot_peer) );
 | 
			
		||||
  mutex_bucket_unlock_by_hash( hash, 0 );
 | 
			
		||||
  return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -143,6 +168,7 @@ size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer ) {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  mutex_bucket_unlock_by_hash( hash, 0 );
 | 
			
		||||
  return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -182,16 +208,243 @@ int usage( char *self ) {
 | 
			
		||||
  return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static uint32_t peer_counts[1024];
 | 
			
		||||
#ifdef WANT_SCROOOOOOOLL
 | 
			
		||||
static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=d+40;while(d<e){*d++=m[*s>>4];*d++=m[*s++&15];}*d=0;return t;}
 | 
			
		||||
#endif
 | 
			
		||||
enum {
 | 
			
		||||
  FLAG_OUTGOING      = 0x80,
 | 
			
		||||
 | 
			
		||||
  FLAG_DISCONNECTED  = 0x00,
 | 
			
		||||
  FLAG_CONNECTING    = 0x01,
 | 
			
		||||
  FLAG_WAITTRACKERID = 0x02,
 | 
			
		||||
  FLAG_CONNECTED     = 0x03,
 | 
			
		||||
 | 
			
		||||
  FLAG_MASK          = 0x07
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
#define PROXYPEER_NEEDSCONNECT(flag)      ((flag)==FLAG_OUTGOING)
 | 
			
		||||
#define PROXYPEER_SETDISCONNECTED(flag)   (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED)
 | 
			
		||||
#define PROXYPEER_SETCONNECTING(flag)     (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING)
 | 
			
		||||
#define PROXYPEER_SETWAITTRACKERID(flag)  (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID)
 | 
			
		||||
#define PROXYPEER_SETCONNECTED(flag)      (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED)
 | 
			
		||||
 | 
			
		||||
typedef struct {
 | 
			
		||||
  int      state;          /* Whether we want to connect, how far our handshake is, etc. */
 | 
			
		||||
  ot_ip6   ip;             /* The peer to connect to */
 | 
			
		||||
  uint16_t port;           /* The peers port */
 | 
			
		||||
  uint8_t *indata;         /* Any data not processed yet */
 | 
			
		||||
  size_t   indata_length;  /* Length of unprocessed data */
 | 
			
		||||
  uint32_t tracker_id;     /* How the other end greeted */
 | 
			
		||||
  int64    fd;             /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
 | 
			
		||||
  io_batch outdata;        /* The iobatch containing our sync data */
 | 
			
		||||
} proxy_peer;
 | 
			
		||||
 | 
			
		||||
/* Number of connections to peers
 | 
			
		||||
   * If a peer's IP is set, we try to reconnect, when the connection drops
 | 
			
		||||
   * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it
 | 
			
		||||
   * Multiple connections to/from the same ip are okay, if tracker_id doesn't match
 | 
			
		||||
   * Reconnect attempts occur only twice a minute
 | 
			
		||||
*/
 | 
			
		||||
static int        g_connection_count;
 | 
			
		||||
static ot_time    g_connection_reconn;
 | 
			
		||||
static proxy_peer g_connections[MAX_PEERS];
 | 
			
		||||
 | 
			
		||||
static void handle_reconnects( void ) {
 | 
			
		||||
  int i;
 | 
			
		||||
  for( i=0; i<g_connection_count; ++i )
 | 
			
		||||
    if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) {
 | 
			
		||||
      int64 newfd = socket_tcp6( );
 | 
			
		||||
      if( newfd < 0 ) continue; /* No socket for you */
 | 
			
		||||
      io_fd(newfd);
 | 
			
		||||
      if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) {
 | 
			
		||||
        io_close( newfd );
 | 
			
		||||
        continue;
 | 
			
		||||
      }
 | 
			
		||||
      if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 &&
 | 
			
		||||
          errno != EINPROGRESS && errno != EWOULDBLOCK ) {
 | 
			
		||||
        close(newfd);
 | 
			
		||||
        continue;
 | 
			
		||||
      }
 | 
			
		||||
      io_wantwrite(newfd); /* So we will be informed when it is connected */
 | 
			
		||||
      io_setcookie(newfd,g_connections+i);
 | 
			
		||||
 | 
			
		||||
      /* Prepare connection info block */
 | 
			
		||||
      free( g_connections[i].indata );
 | 
			
		||||
      g_connections[i].indata        = 0;
 | 
			
		||||
      g_connections[i].indata_length = 0;
 | 
			
		||||
      g_connections[i].fd            = newfd;
 | 
			
		||||
      g_connections[i].tracker_id    = 0;
 | 
			
		||||
      iob_reset( &g_connections[i].outdata );
 | 
			
		||||
      PROXYPEER_SETCONNECTING( g_connections[i].state );
 | 
			
		||||
    }
 | 
			
		||||
  g_connection_reconn = time(NULL) + 30;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/* Handle incoming connection requests, check against whitelist */
 | 
			
		||||
static void handle_accept( int64 serversocket ) {
 | 
			
		||||
  int64 newfd;
 | 
			
		||||
  ot_ip6 ip;
 | 
			
		||||
  uint16 port;
 | 
			
		||||
 | 
			
		||||
  while( ( newfd = socket_accept6( serversocket, ip, &port, NULL ) ) != -1 ) {
 | 
			
		||||
 | 
			
		||||
    /* XXX some access control */
 | 
			
		||||
 | 
			
		||||
    /* Put fd into a non-blocking mode */
 | 
			
		||||
    io_nonblock( newfd );
 | 
			
		||||
 | 
			
		||||
    if( !io_fd( newfd ) )
 | 
			
		||||
      io_close( newfd );
 | 
			
		||||
    else {
 | 
			
		||||
      /* Find a new home for our incoming connection */
 | 
			
		||||
      int i;
 | 
			
		||||
      for( i=0; i<MAX_PEERS; ++i )
 | 
			
		||||
        if( g_connections[i].state == FLAG_DISCONNECTED )
 | 
			
		||||
          break;
 | 
			
		||||
      if( i == MAX_PEERS ) {
 | 
			
		||||
        fprintf( stderr, "No room for incoming connection." );
 | 
			
		||||
        close( newfd );
 | 
			
		||||
        continue;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      /* Prepare connection info block */
 | 
			
		||||
      free( g_connections[i].indata );
 | 
			
		||||
      g_connections[i].indata        = 0;
 | 
			
		||||
      g_connections[i].indata_length = 0;
 | 
			
		||||
      g_connections[i].port          = port;
 | 
			
		||||
      g_connections[i].fd            = newfd;
 | 
			
		||||
      g_connections[i].tracker_id    = 0;
 | 
			
		||||
      iob_reset( &g_connections[i].outdata );
 | 
			
		||||
      g_connections[i].tracker_id    = 0;
 | 
			
		||||
 | 
			
		||||
      PROXYPEER_SETCONNECTING( g_connections[i].state );
 | 
			
		||||
 | 
			
		||||
      io_setcookie( newfd, g_connections + i );
 | 
			
		||||
 | 
			
		||||
      /* We expect the connecting side to begin with its tracker_id */
 | 
			
		||||
      io_wantread( newfd );
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/* New sync data on the stream */
 | 
			
		||||
static void handle_read( int64 peersocket ) {
 | 
			
		||||
  uint32_t tracker_id;
 | 
			
		||||
  proxy_peer *peer = io_getcookie( peersocket );
 | 
			
		||||
  if( !peer ) {
 | 
			
		||||
    /* Can't happen ;) */
 | 
			
		||||
    close( peersocket );
 | 
			
		||||
    return;
 | 
			
		||||
  }
 | 
			
		||||
  switch( peer->state & FLAG_MASK ) {
 | 
			
		||||
  case FLAG_DISCONNECTED: break; /* Shouldnt happen */
 | 
			
		||||
  case FLAG_CONNECTING:
 | 
			
		||||
  case FLAG_WAITTRACKERID:
 | 
			
		||||
    /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) */
 | 
			
		||||
    if( io_tryread( peersocket, &tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) )
 | 
			
		||||
      goto close_socket;
 | 
			
		||||
 | 
			
		||||
    /* See, if we already have a connection to that peer */
 | 
			
		||||
    for( i=0; i<MAX_PEERS; ++i )
 | 
			
		||||
      if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED && 
 | 
			
		||||
            g_connections[i].tracker_id == tracker_id )
 | 
			
		||||
        goto close_socket;
 | 
			
		||||
 | 
			
		||||
    /* Also no need for soliloquy */
 | 
			
		||||
    if( tracker_id == g_tracker_id )
 | 
			
		||||
      goto close_socket;
 | 
			
		||||
 | 
			
		||||
    /* The new connection is good, send our tracker_id on incoming connections */
 | 
			
		||||
    if( peer->state == FLAG_CONNECTING )
 | 
			
		||||
      io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) );
 | 
			
		||||
 | 
			
		||||
    peer->tracker_id = tracker_id;
 | 
			
		||||
    PROXYPEER_SETCONNECTED( peer->state );
 | 
			
		||||
 | 
			
		||||
    break;
 | 
			
		||||
close_socket:
 | 
			
		||||
    io_close( peersocket );
 | 
			
		||||
    PROXYPEER_SETDISCONNECTED( peer->state );
 | 
			
		||||
    break;
 | 
			
		||||
  case FLAG_CONNECTED:
 | 
			
		||||
    
 | 
			
		||||
    break;
 | 
			
		||||
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/* Can write new sync data to the stream */
 | 
			
		||||
static void handle_write( int64 peersocket ) {
 | 
			
		||||
  proxy_peer *peer = io_getcookie( peersocket );
 | 
			
		||||
  if( !peer ) { 
 | 
			
		||||
    /* Can't happen ;) */
 | 
			
		||||
    close( peersocket );
 | 
			
		||||
    return;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  switch( peer->state & FLAG_MASK ) {
 | 
			
		||||
  case FLAG_DISCONNECTED: break; /* Shouldnt happen */
 | 
			
		||||
  case FLAG_CONNECTING:
 | 
			
		||||
    io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) );
 | 
			
		||||
    PROXYPEER_SETWAITTRACKERID( peer->state );
 | 
			
		||||
    io_dontwantwrite( peersocket );
 | 
			
		||||
    io_wantread( peersocket );
 | 
			
		||||
    break;
 | 
			
		||||
  case FLAG_CONNECTED:
 | 
			
		||||
    switch( iob_send( peersocket, &peer->outdata ) ) {
 | 
			
		||||
    case 0: /* all data sent */
 | 
			
		||||
      io_dontwantwrite( peersocket );
 | 
			
		||||
      break;
 | 
			
		||||
    case -3: /* an error occured */
 | 
			
		||||
      io_close( peersocket );
 | 
			
		||||
      PROXYPEER_SETDISCONNECTED( peer->state );
 | 
			
		||||
      iob_reset( &peer->outdata );
 | 
			
		||||
      free( peer->indata );
 | 
			
		||||
    default: /* Normal operation or eagain */
 | 
			
		||||
      break;
 | 
			
		||||
    }
 | 
			
		||||
    break;
 | 
			
		||||
  default:
 | 
			
		||||
    break;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void server_mainloop() {
 | 
			
		||||
  int64 sock;
 | 
			
		||||
  tai6464 now;
 | 
			
		||||
 | 
			
		||||
  while(1) {
 | 
			
		||||
    /* See, if we need to connect to anyone */
 | 
			
		||||
    if( time(NULL) > g_connection_reconn )
 | 
			
		||||
      handle_reconnects( );
 | 
			
		||||
 | 
			
		||||
    /* Wait for io events until next approx reconn check time */
 | 
			
		||||
    taia_now( &now );
 | 
			
		||||
    taia_addsec( &now, &now, 30 );
 | 
			
		||||
    io_waituntil( now );
 | 
			
		||||
 | 
			
		||||
    /* Loop over readable sockets */
 | 
			
		||||
    while( ( sock = io_canread( ) ) != -1 ) {
 | 
			
		||||
      const void *cookie = io_getcookie( sock );
 | 
			
		||||
      if( (uintptr_t)cookie == FLAG_SERVERSOCKET )
 | 
			
		||||
        handle_accept( sock );
 | 
			
		||||
      else
 | 
			
		||||
        handle_read( sock );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /* Loop over writable sockets */
 | 
			
		||||
    while( ( sock = io_canwrite( ) ) != -1 )
 | 
			
		||||
      handle_write( sock );
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int main( int argc, char **argv ) {
 | 
			
		||||
  static pthread_t sync_in_thread_id;
 | 
			
		||||
  static pthread_t sync_out_thread_id;
 | 
			
		||||
  ot_ip6 serverip;
 | 
			
		||||
  uint16_t tmpport;
 | 
			
		||||
  int scanon = 1, bound = 0;
 | 
			
		||||
  time_t next_dump = time(NULL)+1;
 | 
			
		||||
 | 
			
		||||
  srandom( time(NULL) );
 | 
			
		||||
  g_tracker_id = random();
 | 
			
		||||
@ -199,7 +452,7 @@ int main( int argc, char **argv ) {
 | 
			
		||||
  while( scanon ) {
 | 
			
		||||
    switch( getopt( argc, argv, ":i:p:vh" ) ) {
 | 
			
		||||
    case -1: scanon = 0; break;
 | 
			
		||||
    case 'i': 
 | 
			
		||||
    case 'S': 
 | 
			
		||||
      if( !scan_ip6( optarg, serverip )) { usage( argv[0] ); exit( 1 ); }
 | 
			
		||||
      break;
 | 
			
		||||
    case 'p':
 | 
			
		||||
@ -211,7 +464,144 @@ int main( int argc, char **argv ) {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  if( !bound ) exerr( "No port bound." );
 | 
			
		||||
  pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL );
 | 
			
		||||
  pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL );
 | 
			
		||||
 | 
			
		||||
  server_mainloop();
 | 
			
		||||
  return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void * streamsync_worker( void * args ) {
 | 
			
		||||
  (void)args;
 | 
			
		||||
  while( 1 ) {
 | 
			
		||||
    int bucket;
 | 
			
		||||
    /* For each bucket... */
 | 
			
		||||
    for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
 | 
			
		||||
      /* Get exclusive access to that bucket */
 | 
			
		||||
      ot_vector *torrents_list = mutex_bucket_lock( bucket );
 | 
			
		||||
      size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0;
 | 
			
		||||
      size_t mem, mem_a = 0, mem_b = 0;
 | 
			
		||||
      uint8_t *ptr, *ptr_a, *ptr_b, *ptr_c;
 | 
			
		||||
 | 
			
		||||
      /* For each torrent in this bucket.. */
 | 
			
		||||
      for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
 | 
			
		||||
        /* Address torrents members */
 | 
			
		||||
        ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
 | 
			
		||||
        switch( peer_list->peer_count ) {
 | 
			
		||||
          case 2: count_two++; break;
 | 
			
		||||
          case 1: count_one++; break;
 | 
			
		||||
          case 0: break;
 | 
			
		||||
          default:
 | 
			
		||||
            count_peers += peer_list->peer_count;
 | 
			
		||||
            count_def   += 1 + ( peer_list->peer_count >> 8 );
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */
 | 
			
		||||
      mem = 3 * ( 4 + 1 + 1 + 2 ) + ( count_one + count_two ) * 19 + count_def * 20 +
 | 
			
		||||
            ( count_one + 2 * count_two + count_peers ) * 7;
 | 
			
		||||
 | 
			
		||||
      ptr = ptr_a = ptr_b = ptr_c = malloc( mem );
 | 
			
		||||
      if( !ptr ) goto unlock_continue;
 | 
			
		||||
 | 
			
		||||
      if( count_one > 8 ) {
 | 
			
		||||
        mem_a = 4 + 1 + 1 + 2 + count_one * ( 19 + 7 );
 | 
			
		||||
        ptr_b += mem_a; ptr_c += mem_a;
 | 
			
		||||
        memcpy( ptr_a, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */
 | 
			
		||||
        ptr_a[4] = 1;                                       /* Offset 4: packet type 1 */
 | 
			
		||||
        ptr_a[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS;   /* Offset 5: the shared prefix */
 | 
			
		||||
        ptr_a[6] = count_one >> 8;
 | 
			
		||||
        ptr_a[7] = count_one & 255;
 | 
			
		||||
        ptr_a += 8;
 | 
			
		||||
      } else {
 | 
			
		||||
        count_def   += count_one;
 | 
			
		||||
        count_peers += count_one;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if( count_two > 8 ) {
 | 
			
		||||
        mem_b = 4 + 1 + 1 + 2 + count_two * ( 19 + 14 );
 | 
			
		||||
        ptr_c += mem_b;
 | 
			
		||||
        memcpy( ptr_b, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */
 | 
			
		||||
        ptr_b[4] = 2;                                       /* Offset 4: packet type 2 */
 | 
			
		||||
        ptr_b[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS;   /* Offset 5: the shared prefix */
 | 
			
		||||
        ptr_b[6] = count_two >> 8;
 | 
			
		||||
        ptr_b[7] = count_two & 255;
 | 
			
		||||
        ptr_b += 8;
 | 
			
		||||
      } else {
 | 
			
		||||
        count_def   += count_two;
 | 
			
		||||
        count_peers += 2 * count_two;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if( count_def ) {
 | 
			
		||||
        memcpy( ptr_c, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */
 | 
			
		||||
        ptr_c[4] = 0;                                       /* Offset 4: packet type 0 */
 | 
			
		||||
        ptr_c[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS;   /* Offset 5: the shared prefix */
 | 
			
		||||
        ptr_c[6] = count_def >> 8;
 | 
			
		||||
        ptr_c[7] = count_def & 255;
 | 
			
		||||
        ptr_c += 8;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      /* For each torrent in this bucket.. */
 | 
			
		||||
      for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
 | 
			
		||||
        /* Address torrents members */
 | 
			
		||||
        ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset;
 | 
			
		||||
        ot_peerlist *peer_list = torrent->peer_list;
 | 
			
		||||
        ot_peer *peers = (ot_peer*)(peer_list->peers.data);
 | 
			
		||||
        uint8_t **dst;
 | 
			
		||||
        int multi = 0;
 | 
			
		||||
        switch( peer_list->peer_count ) {
 | 
			
		||||
          case 0:  continue;
 | 
			
		||||
          case 1:  dst = mem_a ? &ptr_a : &ptr_c; break;
 | 
			
		||||
          case 2:  dst = mem_b ? &ptr_b : &ptr_c; break;
 | 
			
		||||
          default: dst = &ptr_c; multi = 1; break;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        do {
 | 
			
		||||
          size_t i, pc = peer_list->peer_count;
 | 
			
		||||
          if( pc > 255 ) pc = 255;
 | 
			
		||||
          memcpy( *dst, torrent->hash + 1, sizeof( ot_hash ) - 1);
 | 
			
		||||
          *dst += sizeof( ot_hash ) - 1;
 | 
			
		||||
          if( multi ) *(*dst)++ = pc;
 | 
			
		||||
          for( i=0; i < pc; ++i ) {
 | 
			
		||||
            memcpy( *dst, peers++, OT_IP_SIZE + 3 );
 | 
			
		||||
            *dst += OT_IP_SIZE + 3;
 | 
			
		||||
          }
 | 
			
		||||
          peer_list->peer_count -= pc; 
 | 
			
		||||
        } while( peer_list->peer_count );
 | 
			
		||||
        free_peerlist(peer_list);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      free( torrents_list->data );
 | 
			
		||||
      memset( torrents_list, 0, sizeof(*torrents_list ) );
 | 
			
		||||
unlock_continue:
 | 
			
		||||
      mutex_bucket_unlock( bucket, 0 );
 | 
			
		||||
 | 
			
		||||
      if( ptr ) {
 | 
			
		||||
        int i;
 | 
			
		||||
 | 
			
		||||
        if( ptr_b > ptr_c ) ptr_c = ptr_b;
 | 
			
		||||
        if( ptr_a > ptr_c ) ptr_c = ptr_a;
 | 
			
		||||
        mem = ptr_c - ptr;
 | 
			
		||||
 | 
			
		||||
        for( i=0; i<g_connection_count; ++i ) {
 | 
			
		||||
          if( g_connections[i].fd != -1 ) {
 | 
			
		||||
            void *tmp = malloc( mem );
 | 
			
		||||
            if( tmp )
 | 
			
		||||
              if( !iob_addbuf_free( &g_connections[i].outdata, tmp, mem ) )
 | 
			
		||||
                free( tmp );
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        free( ptr );
 | 
			
		||||
      }
 | 
			
		||||
      usleep( OT_SYNC_SLEEP );
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void * livesync_worker( void * args ) {
 | 
			
		||||
  (void)args;
 | 
			
		||||
  while( 1 ) {
 | 
			
		||||
    ot_ip6 in_ip; uint16_t in_port;
 | 
			
		||||
    size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port);
 | 
			
		||||
@ -223,7 +613,6 @@ int main( int argc, char **argv ) {
 | 
			
		||||
      /* drop packet coming from ourselves */
 | 
			
		||||
      continue;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) {
 | 
			
		||||
    case OT_SYNC_PEER:
 | 
			
		||||
      livesync_handle_peersync( datalen );
 | 
			
		||||
@ -232,37 +621,6 @@ int main( int argc, char **argv ) {
 | 
			
		||||
      fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) );
 | 
			
		||||
      break;
 | 
			
		||||
    }
 | 
			
		||||
    if( time(NULL) > next_dump ) {
 | 
			
		||||
      int bucket, i;
 | 
			
		||||
      /* For each bucket... */
 | 
			
		||||
      for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
 | 
			
		||||
        /* Get exclusive access to that bucket */
 | 
			
		||||
        ot_vector *torrents_list = mutex_bucket_lock( bucket );
 | 
			
		||||
        size_t tor_offset;
 | 
			
		||||
 | 
			
		||||
        /* For each torrent in this bucket.. */
 | 
			
		||||
        for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
 | 
			
		||||
          /* Address torrents members */
 | 
			
		||||
          ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
 | 
			
		||||
#ifdef WANT_SCROOOOOOOLL
 | 
			
		||||
          ot_hash     *hash      =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash;
 | 
			
		||||
          char hash_out[41];
 | 
			
		||||
          to_hex(hash_out,*hash);
 | 
			
		||||
          printf( "%s %08zd\n", hash_out, peer_list->peer_count );
 | 
			
		||||
#endif
 | 
			
		||||
          if(peer_list->peer_count<1024) peer_counts[peer_list->peer_count]++; else peer_counts[1023]++;
 | 
			
		||||
          free_peerlist(peer_list);
 | 
			
		||||
        }
 | 
			
		||||
        free( torrents_list->data );
 | 
			
		||||
        memset( torrents_list, 0, sizeof(*torrents_list ) );
 | 
			
		||||
      }
 | 
			
		||||
      for( i=1023; i>=0; --i )
 | 
			
		||||
        if( peer_counts[i] ) {
 | 
			
		||||
          printf( "%d:%d ", i, peer_counts[i] );
 | 
			
		||||
          peer_counts[i] = 0;
 | 
			
		||||
        }
 | 
			
		||||
      printf( "\n" );
 | 
			
		||||
      next_dump = time(NULL) + 1;
 | 
			
		||||
    } 
 | 
			
		||||
  }
 | 
			
		||||
  return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user