Add support for zstd

master
Dirk Engling 8 months ago
parent 160ba08074
commit 33bd2c9094

@ -27,6 +27,11 @@ STRIP?=strip
#FEATURES+=-DWANT_IP_FROM_QUERY_STRING #FEATURES+=-DWANT_IP_FROM_QUERY_STRING
FEATURES+=-DWANT_COMPRESSION_GZIP FEATURES+=-DWANT_COMPRESSION_GZIP
FEATURES+=-DWANT_COMPRESSION_GZIP_ALWAYS FEATURES+=-DWANT_COMPRESSION_GZIP_ALWAYS
#FEATURES+=-DWANT_COMPRESSION_ZSTD
#FEATURES+=-DWANT_COMPRESSION_ZSTD_ALWAYS
#LDFLAGS+=-lzstd
#FEATURES+=-DWANT_LOG_NETWORKS #FEATURES+=-DWANT_LOG_NETWORKS
#FEATURES+=-DWANT_RESTRICT_STATS #FEATURES+=-DWANT_RESTRICT_STATS
#FEATURES+=-DWANT_IP_FROM_PROXY #FEATURES+=-DWANT_IP_FROM_PROXY

@ -14,6 +14,10 @@
#ifdef WANT_COMPRESSION_GZIP #ifdef WANT_COMPRESSION_GZIP
#include <zlib.h> #include <zlib.h>
#endif #endif
#ifdef WANT_COMPRESSION_ZSTD
#include <zstd.h>
#endif
/* Libowfat */ /* Libowfat */
#include "byte.h" #include "byte.h"
@ -40,6 +44,9 @@ static void fullscrape_make(int taskid, ot_tasktype mode);
#ifdef WANT_COMPRESSION_GZIP #ifdef WANT_COMPRESSION_GZIP
static void fullscrape_make_gzip(int taskid, ot_tasktype mode); static void fullscrape_make_gzip(int taskid, ot_tasktype mode);
#endif #endif
#ifdef WANT_COMPRESSION_ZSTD
static void fullscrape_make_zstd(int taskid, ot_tasktype mode);
#endif
/* Converter function from memory to human readable hex strings /* Converter function from memory to human readable hex strings
XXX - Duplicated from ot_stats. Needs fix. */ XXX - Duplicated from ot_stats. Needs fix. */
@ -64,6 +71,11 @@ static void *fullscrape_worker(void *args) {
while (g_opentracker_running) { while (g_opentracker_running) {
ot_tasktype tasktype = TASK_FULLSCRAPE; ot_tasktype tasktype = TASK_FULLSCRAPE;
ot_taskid taskid = mutex_workqueue_poptask(&tasktype); ot_taskid taskid = mutex_workqueue_poptask(&tasktype);
#ifdef WANT_COMPRESSION_ZSTD
if (tasktype & TASK_FLAG_ZSTD)
fullscrape_make_zstd(taskid, tasktype);
else
#endif
#ifdef WANT_COMPRESSION_GZIP #ifdef WANT_COMPRESSION_GZIP
if (tasktype & TASK_FLAG_GZIP) if (tasktype & TASK_FLAG_GZIP)
fullscrape_make_gzip(taskid, tasktype); fullscrape_make_gzip(taskid, tasktype);
@ -205,7 +217,6 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
struct iovec iovector = {NULL, 0}; struct iovec iovector = {NULL, 0};
int zres; int zres;
z_stream strm; z_stream strm;
fprintf(stderr, "GZIP path\n");
/* Setup return vector... */ /* Setup return vector... */
iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
if (!iovector.iov_base) if (!iovector.iov_base)
@ -267,9 +278,11 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
mutex_bucket_unlock(bucket, 0); mutex_bucket_unlock(bucket, 0);
/* Parent thread died? */ /* Parent thread died? */
if (!g_opentracker_running) if (!g_opentracker_running) {
deflateEnd(&strm);
return; return;
} }
}
if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
strm.next_in = (uint8_t *)"ee"; strm.next_in = (uint8_t *)"ee";
@ -282,7 +295,8 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
if (mutex_workqueue_pushchunked(taskid, &iovector)) { if (mutex_workqueue_pushchunked(taskid, &iovector)) {
free(iovector.iov_base); free(iovector.iov_base);
return mutex_bucket_unlock(bucket, 0); deflateEnd(&strm);
return;
} }
/* Check if there's a last batch of data in the zlib buffer */ /* Check if there's a last batch of data in the zlib buffer */
@ -293,7 +307,7 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
if (!iovector.iov_base) { if (!iovector.iov_base) {
fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
deflateEnd(&strm); deflateEnd(&strm);
return mutex_bucket_unlock(bucket, 0); return;
} }
strm.next_out = iovector.iov_base; strm.next_out = iovector.iov_base;
strm.avail_out = OT_SCRAPE_CHUNK_SIZE; strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
@ -311,5 +325,133 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
/* WANT_COMPRESSION_GZIP */ /* WANT_COMPRESSION_GZIP */
#endif #endif
#ifdef WANT_COMPRESSION_ZSTD
static void fullscrape_make_zstd(int taskid, ot_tasktype mode) {
int bucket;
char *r;
struct iovec iovector = {NULL, 0};
ZSTD_CCtx *zstream = ZSTD_createCCtx();
ZSTD_inBuffer inbuf;
ZSTD_outBuffer outbuf;
size_t more_bytes;
if (!zstream)
return;
/* Setup return vector... */
iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
if (!iovector.iov_base) {
ZSTD_freeCCtx(zstream);
return;
}
/* Working with a compression level 6 is half as fast as level 3, but
seems to be the last reasonable bump that's worth extra cpu */
ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6);
outbuf.dst = iovector.iov_base;
outbuf.size = OT_SCRAPE_CHUNK_SIZE;
outbuf.pos = 0;
if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
inbuf.src = (const void *)"d5:filesd";
inbuf.size = strlen("d5:filesd");
inbuf.pos = 0;
ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
}
/* For each bucket... */
for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
/* Get exclusive access to that bucket */
ot_vector *torrents_list = mutex_bucket_lock(bucket);
ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
size_t i;
/* For each torrent in this bucket.. */
for (i = 0; i < torrents_list->size; ++i) {
char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash);
inbuf.src = compress_buffer;
inbuf.size = r - compress_buffer;
inbuf.pos = 0;
ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
/* Check if there still is enough buffer left */
while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) {
iovector.iov_len = outbuf.size;
if (mutex_workqueue_pushchunked(taskid, &iovector)) {
free(iovector.iov_base);
ZSTD_freeCCtx(zstream);
return mutex_bucket_unlock(bucket, 0);
}
/* Allocate a fresh output buffer */
iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
if (!iovector.iov_base) {
fprintf(stderr, "Out of memory trying to claim ouput buffer\n");
ZSTD_freeCCtx(zstream);
return mutex_bucket_unlock(bucket, 0);
}
outbuf.dst = iovector.iov_base;
outbuf.size = OT_SCRAPE_CHUNK_SIZE;
outbuf.pos = 0;
ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
}
}
/* All torrents done: release lock on current bucket */
mutex_bucket_unlock(bucket, 0);
/* Parent thread died? */
if (!g_opentracker_running)
return;
}
if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
inbuf.src = (const void *)"ee";
inbuf.size = strlen("ee");
inbuf.pos = 0;
}
more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
iovector.iov_len = outbuf.pos;
if (mutex_workqueue_pushchunked(taskid, &iovector)) {
free(iovector.iov_base);
ZSTD_freeCCtx(zstream);
return;
}
/* Check if there's a last batch of data in the zlib buffer */
if (more_bytes) {
/* Allocate a fresh output buffer */
iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
if (!iovector.iov_base) {
fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
ZSTD_freeCCtx(zstream);
return;
}
outbuf.dst = iovector.iov_base;
outbuf.size = OT_SCRAPE_CHUNK_SIZE;
outbuf.pos = 0;
ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
/* Only pass the new buffer if there actually was some data left in the buffer */
iovector.iov_len = outbuf.pos;
if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector))
free(iovector.iov_base);
}
ZSTD_freeCCtx(zstream);
}
/* WANT_COMPRESSION_ZSTD */
#endif
/* WANT_FULLSCRAPE */ /* WANT_FULLSCRAPE */
#endif #endif

@ -159,7 +159,9 @@ ssize_t http_sendiovecdata(const int64 sock, struct ot_workstruct *ws, int iovec
if (iovec_entries) { if (iovec_entries) {
if (cookie->flag & STRUCT_HTTP_FLAG_GZIP) if (cookie->flag & STRUCT_HTTP_FLAG_ZSTD)
encoding = "Content-Encoding: zstd\r\n";
else if (cookie->flag & STRUCT_HTTP_FLAG_GZIP)
encoding = "Content-Encoding: gzip\r\n"; encoding = "Content-Encoding: gzip\r\n";
else if (cookie->flag & STRUCT_HTTP_FLAG_BZIP2) else if (cookie->flag & STRUCT_HTTP_FLAG_BZIP2)
encoding = "Content-Encoding: bzip2\r\n"; encoding = "Content-Encoding: bzip2\r\n";
@ -369,18 +371,33 @@ static ssize_t http_handle_fullscrape(const int64 sock, struct ot_workstruct *ws
} }
#endif #endif
#ifdef WANT_COMPRESSION_GZIP
#if defined(WANT_COMPRESSION_GZIP) || defined(WANT_COMPRESSION_ZSTD)
ws->request[ws->request_size - 1] = 0; ws->request[ws->request_size - 1] = 0;
#ifndef WANT_COMPRESSION_GZIP_ALWAYS #ifdef WANT_COMPRESSION_GZIP
if (strstr(ws->request, "gzip")) { if (strstr(ws->request, "gzip")) {
cookie->flag |= STRUCT_HTTP_FLAG_GZIP;
format |= TASK_FLAG_GZIP;
}
#endif
#ifdef WANT_COMPRESSION_ZSTD
if (strstr(ws->request, "zstd")) {
cookie->flag |= STRUCT_HTTP_FLAG_ZSTD;
format |= TASK_FLAG_ZSTD;
}
#endif
#if defined(WANT_COMPRESSION_ZSTD) && defined(WANT_COMPRESSION_ZSTD_ALWAYS)
cookie->flag |= STRUCT_HTTP_FLAG_ZSTD;
format |= TASK_FLAG_ZSTD;
#endif #endif
#if defined(WANT_COMPRESSION_GZIP) && defined(WANT_COMPRESSION_GZIP_ALWAYS)
cookie->flag |= STRUCT_HTTP_FLAG_GZIP; cookie->flag |= STRUCT_HTTP_FLAG_GZIP;
format = TASK_FLAG_GZIP; format |= TASK_FLAG_GZIP;
stats_issue_event(EVENT_FULLSCRAPE_REQUEST_GZIP, 0, (uintptr_t)cookie->ip);
#ifndef WANT_COMPRESSION_GZIP_ALWAYS
} else
#endif #endif
#endif #endif
stats_issue_event(EVENT_FULLSCRAPE_REQUEST, 0, (uintptr_t)cookie->ip); stats_issue_event(EVENT_FULLSCRAPE_REQUEST, 0, (uintptr_t)cookie->ip);
#ifdef _DEBUG_HTTPERROR #ifdef _DEBUG_HTTPERROR

@ -10,8 +10,9 @@ typedef enum {
STRUCT_HTTP_FLAG_WAITINGFORTASK = 1, STRUCT_HTTP_FLAG_WAITINGFORTASK = 1,
STRUCT_HTTP_FLAG_GZIP = 2, STRUCT_HTTP_FLAG_GZIP = 2,
STRUCT_HTTP_FLAG_BZIP2 = 4, STRUCT_HTTP_FLAG_BZIP2 = 4,
STRUCT_HTTP_FLAG_CHUNKED = 8, STRUCT_HTTP_FLAG_ZSTD = 8,
STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 16 STRUCT_HTTP_FLAG_CHUNKED = 16,
STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 32
} STRUCT_HTTP_FLAG; } STRUCT_HTTP_FLAG;
struct http_data { struct http_data {

@ -59,7 +59,8 @@ typedef enum {
TASK_FLAG_GZIP = 0x1000, TASK_FLAG_GZIP = 0x1000,
TASK_FLAG_BZIP2 = 0x2000, TASK_FLAG_BZIP2 = 0x2000,
TASK_FLAG_CHUNKED = 0x4000, TASK_FLAG_ZSTD = 0x4000,
TASK_FLAG_CHUNKED = 0x8000,
TASK_TASK_MASK = 0x0fff, TASK_TASK_MASK = 0x0fff,
TASK_CLASS_MASK = 0x0f00, TASK_CLASS_MASK = 0x0f00,

@ -19,6 +19,7 @@ typedef enum {
EVENT_SCRAPE, EVENT_SCRAPE,
EVENT_FULLSCRAPE_REQUEST, EVENT_FULLSCRAPE_REQUEST,
EVENT_FULLSCRAPE_REQUEST_GZIP, EVENT_FULLSCRAPE_REQUEST_GZIP,
EVENT_FULLSCRAPE_REQUEST_ZSTD,
EVENT_FULLSCRAPE, /* TCP only */ EVENT_FULLSCRAPE, /* TCP only */
EVENT_FAILED, EVENT_FAILED,
EVENT_BUCKET_LOCKED, EVENT_BUCKET_LOCKED,

Loading…
Cancel
Save