Include zstd compression support.

Based on a patch by Sebastian A. Siewior. Fixes bug #14338.
This commit is contained in:
Wayne Davison
2020-05-25 13:31:30 -07:00
parent abef92c037
commit 4aaadc2f29
3 changed files with 258 additions and 1 deletions

View File

@@ -90,6 +90,10 @@ struct name_num_obj valid_compressions = {
"compress", NULL, NULL, 0, 0, {
{ CPRES_ZLIBX, "zlibx", NULL },
{ CPRES_ZLIB, "zlib", NULL },
#ifdef SUPPORT_ZSTD
/* TODO decide where in the default preference order this should go. */
{ CPRES_ZSTD, "zstd", NULL },
#endif
{ CPRES_NONE, "none", NULL },
{ 0, NULL, NULL }
}

View File

@@ -381,7 +381,7 @@ AC_CHECK_HEADERS(sys/fcntl.h sys/select.h fcntl.h sys/time.h sys/unistd.h \
netdb.h malloc.h float.h limits.h iconv.h libcharset.h langinfo.h \
sys/acl.h acl/libacl.h attr/xattr.h sys/xattr.h sys/extattr.h \
popt.h popt/popt.h linux/falloc.h netinet/in_systm.h netinet/ip.h \
zlib.h xxhash.h openssl/md4.h openssl/md5.h)
zlib.h xxhash.h openssl/md4.h openssl/md5.h zstd.h)
AC_HEADER_MAJOR_FIXED
AC_MSG_CHECKING([whether to enable use of openssl crypto library])
@@ -408,6 +408,18 @@ else
AC_MSG_RESULT(no)
fi
# Decide whether zstd compression support gets compiled in.
# SUPPORT_ZSTD is defined only when all of the following hold:
#   - the user did not pass --disable-zstd,
#   - the zstd.h header was found by the earlier header check,
#   - ZSTD_minCLevel can be linked, possibly after adding -lzstd.
AC_MSG_CHECKING([whether to enable zstd compression])
AC_ARG_ENABLE([zstd],
AC_HELP_STRING([--disable-zstd], [disable zstd compression]))
AH_TEMPLATE([SUPPORT_ZSTD],
[Undefine if you do not want zstd compression. By default this is defined.])
if test x"$enable_zstd" != x"no" && test x"$ac_cv_header_zstd_h" = x"yes"; then
# NOTE(review): the yes result is printed before the link test below runs,
# so it can be reported even when the library is missing and SUPPORT_ZSTD
# ends up undefined. Confirm whether that is intended.
AC_MSG_RESULT(yes)
AC_SEARCH_LIBS(ZSTD_minCLevel, zstd, [AC_DEFINE(SUPPORT_ZSTD)])
else
AC_MSG_RESULT(no)
fi
AC_CACHE_CHECK([if makedev takes 3 args],rsync_cv_MAKEDEV_TAKES_3_ARGS,[
AC_RUN_IFELSE([AC_LANG_SOURCE([[
#include <sys/types.h>

241
token.c
View File

@@ -22,6 +22,9 @@
#include "rsync.h"
#include "itypes.h"
#include <zlib.h>
#ifdef SUPPORT_ZSTD
#include <zstd.h>
#endif
extern int do_compression;
extern int protocol_version;
@@ -58,6 +61,14 @@ void init_compression_level(void)
if (do_compression_level == Z_DEFAULT_COMPRESSION)
do_compression_level = def_level;
break;
#ifdef SUPPORT_ZSTD
case CPRES_ZSTD:
min_level = ZSTD_minCLevel();
max_level = ZSTD_maxCLevel();
def_level = 3;
off_level = CLVL_NOT_SPECIFIED;
break;
#endif
default: /* paranoia to prevent missing case values */
exit_cleanup(RERR_UNSUPPORTED);
}
@@ -648,6 +659,228 @@ static void see_deflate_token(char *buf, int32 len)
} while (len || rx_strm.avail_out == 0);
}
#ifdef SUPPORT_ZSTD
/* zstd streaming descriptors and the compression context.  The in/out
 * descriptors are also used by recv_zstd_token() in this file. */
static ZSTD_inBuffer zstd_in_buff;
static ZSTD_outBuffer zstd_out_buff;
static ZSTD_CCtx *zstd_cctx;

/**
 * Send a token, preceded by @p nb bytes of zstd-compressed literal data,
 * to file descriptor @p f.
 *
 * Compressed data goes out in chunks of at most MAX_DATA_COUNT bytes, each
 * prefixed by a two-byte header: DEFLATED_DATA plus the high bits of the
 * chunk length, then the low byte of the length.  Consecutive tokens are
 * collapsed into runs and emitted in the TOKEN_REL / TOKEN_LONG /
 * TOKENRUN_REL / TOKENRUN_LONG encodings.
 *
 * @param f      descriptor to write to
 * @param token  token to send; -1 marks end of file (END_FLAG is written),
 *               -2 suppresses the flush so that buffered output may be
 *               carried over to the next call
 * @param buf    mapped file that the literal data is read from
 * @param offset offset of the literal data within @p buf
 * @param nb     number of literal bytes to compress and send
 *
 * Any zstd or allocation failure is fatal: the function reports the error
 * and calls exit_cleanup() / out_of_memory().
 */
static void send_zstd_token(int f, int32 token, struct map_struct *buf,
			    OFF_T offset, int32 nb)
{
	static int comp_init_done, flush_pending;
	ZSTD_EndDirective flush = ZSTD_e_continue;
	size_t zr = 0;
	int32 n, r;

	/* One-time setup of the compression context and the output buffer. */
	if (!comp_init_done) {
		zstd_cctx = ZSTD_createCCtx();
		if (!zstd_cctx) {
			rprintf(FERROR, "compression init failed\n");
			exit_cleanup(RERR_PROTOCOL);
		}

		/* Two extra bytes in front of the payload hold the chunk header. */
		obuf = new_array(char, MAX_DATA_COUNT + 2);
		if (!obuf)
			out_of_memory("send_zstd_token");

		zr = ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_compressionLevel,
					    do_compression_level);
		if (ZSTD_isError(zr)) {
			rprintf(FERROR, "ZSTD_CCtx_setParameter failed: %s\n",
				ZSTD_getErrorName(zr));
			exit_cleanup(RERR_PROTOCOL);
		}

		zstd_out_buff.dst = obuf + 2;
		comp_init_done = 1;
	}

	if (last_token == -1) {
		/* The previous token sent was the end-of-file marker (or nothing
		 * has been sent yet): start the run bookkeeping afresh. */
		last_run_end = 0;
		run_start = token;
		flush_pending = 0;
	} else if (last_token == -2) {
		run_start = token;
	} else if (nb != 0 || token != last_token + 1
		   || token >= run_start + 65536) {
		/* output previous run */
		r = run_start - last_run_end;
		n = last_token - run_start;
		if (r >= 0 && r <= 63) {
			write_byte(f, (n == 0 ? TOKEN_REL : TOKENRUN_REL) + r);
		} else {
			write_byte(f, (n == 0 ? TOKEN_LONG : TOKENRUN_LONG));
			write_int(f, run_start);
		}
		if (n != 0) {
			/* run length, low byte first */
			write_byte(f, n);
			write_byte(f, n >> 8);
		}
		last_run_end = last_token;
		run_start = token;
	}

	last_token = token;

	if (nb || flush_pending) {
		zstd_in_buff.src = map_ptr(buf, offset, nb);
		zstd_in_buff.size = nb;
		zstd_in_buff.pos = 0;

		/* Any token other than -2 forces a flush, so that the receiver
		 * gets all of the literal data before it sees the token. */
		if (token != -2)
			flush = ZSTD_e_flush;

		do {
			/* A size of 0 marks the output buffer as sent (or never
			 * used): rearm it for a fresh chunk. */
			if (zstd_out_buff.size == 0) {
				zstd_out_buff.size = MAX_DATA_COUNT;
				zstd_out_buff.pos = 0;
			}

			/* Keep the full size_t result: it is either an error code
			 * or the amount of data still held inside zstd. */
			zr = ZSTD_compressStream2(zstd_cctx, &zstd_out_buff,
						  &zstd_in_buff, flush);
			if (ZSTD_isError(zr)) {
				rprintf(FERROR, "ZSTD_compressStream returned %s\n",
					ZSTD_getErrorName(zr));
				exit_cleanup(RERR_STREAMIO);
			}

			/*
			 * Nothing is sent if the buffer isn't full so avoid smaller
			 * transfers. When a flush was requested we send whatever
			 * has been produced so that the remote side can make
			 * progress.
			 */
			if (zstd_out_buff.pos == zstd_out_buff.size
			 || flush == ZSTD_e_flush) {
				n = zstd_out_buff.pos;

				obuf[0] = DEFLATED_DATA + (n >> 8);
				obuf[1] = n;
				write_buf(f, obuf, n + 2);

				zstd_out_buff.size = 0;
			}
			/*
			 * Loop while the input buffer isn't fully consumed or the
			 * internal state isn't fully flushed.
			 */
		} while (zstd_in_buff.pos < zstd_in_buff.size || zr > 0);

		/* Unflushed output may remain only when no flush was requested. */
		flush_pending = token == -2;
	}

	if (token == -1) {
		/* end of file - clean up */
		write_byte(f, END_FLAG);
	}
}
/* Decompression context, created on the first call to recv_zstd_token(). */
static ZSTD_DCtx *zstd_dctx;

/**
 * Receive the next item of a zstd-compressed token stream from @p f.
 *
 * @param f     descriptor to read from
 * @param data  set to point at the decompressed bytes when literal data is
 *              returned; the buffer is reused by later calls
 *
 * @return > 0: that many bytes of literal data are available via @p data;
 *         0: the END_FLAG marker was read;
 *         < 0: a token match, encoded as -1 - token.
 *
 * Any zstd or allocation failure is fatal: the function reports the error
 * and calls exit_cleanup() / out_of_memory().
 */
static int32 recv_zstd_token(int f, char **data)
{
	static int decomp_init_done;
	static int out_buffer_size;
	int32 n, flag;
	size_t zr;

	/* One-time setup of the decompression context and both buffers. */
	if (!decomp_init_done) {
		zstd_dctx = ZSTD_createDCtx();
		if (!zstd_dctx) {
			rprintf(FERROR, "ZSTD_createDCtx failed\n");
			exit_cleanup(RERR_PROTOCOL);
		}

		/* Output buffer fits two decompressed blocks */
		out_buffer_size = ZSTD_DStreamOutSize() * 2;
		cbuf = new_array(char, MAX_DATA_COUNT);
		dbuf = new_array(char, out_buffer_size);
		if (!cbuf || !dbuf)
			out_of_memory("recv_zstd_token");
		zstd_in_buff.src = cbuf;
		zstd_out_buff.dst = dbuf;

		decomp_init_done = 1;
	}

	do {
		switch (recv_state) {
		case r_init:
			recv_state = r_idle;
			rx_token = 0;
			break;

		case r_idle:
			flag = read_byte(f);
			if ((flag & 0xC0) == DEFLATED_DATA) {
				/* Compressed chunk: the length is the low 6 bits of
				 * the flag byte followed by one more byte. */
				n = ((flag & 0x3f) << 8) + read_byte(f);
				read_buf(f, cbuf, n);

				zstd_in_buff.size = n;
				zstd_in_buff.pos = 0;

				recv_state = r_inflating;

			} else if (flag == END_FLAG) {
				/* that's all folks */
				recv_state = r_init;
				return 0;

			} else {
				/* here we have a token of some kind */
				if (flag & TOKEN_REL) {
					rx_token += flag & 0x3f;
					flag >>= 6;
				} else
					rx_token = read_int(f);
				if (flag & 1) {
					/* A run follows: 16-bit length, low byte first. */
					rx_run = read_byte(f);
					rx_run += read_byte(f) << 8;
					recv_state = r_running;
				}
				return -1 - rx_token;
			}
			break;

		case r_inflating:
			zstd_out_buff.size = out_buffer_size;
			zstd_out_buff.pos = 0;

			/* Keep the full size_t result so that an error code is not
			 * truncated before it is tested. */
			zr = ZSTD_decompressStream(zstd_dctx, &zstd_out_buff,
						   &zstd_in_buff);
			n = zstd_out_buff.pos;
			if (ZSTD_isError(zr)) {
				rprintf(FERROR, "ZSTD decomp returned %s (%d bytes)\n",
					ZSTD_getErrorName(zr), (int)n);
				exit_cleanup(RERR_STREAMIO);
			}

			/*
			 * If the input buffer is fully consumed and the output
			 * buffer is not full then next step is to read more
			 * data.
			 */
			if (zstd_in_buff.size == zstd_in_buff.pos
			 && n < out_buffer_size)
				recv_state = r_idle;

			if (n != 0) {
				*data = dbuf;
				return n;
			}
			break;

		case r_running:
			++rx_token;
			if (--rx_run == 0)
				recv_state = r_idle;
			return -1 - rx_token;

		case r_inflated:
			/* This function never enters this state; looping on it
			 * would spin forever, so fail loudly instead. */
			rprintf(FERROR, "recv_zstd_token: unexpected receive state\n");
			exit_cleanup(RERR_PROTOCOL);
			break;
		}
	} while (1);
}
#endif
/**
* Transmit a verbatim buffer of length @p n followed by a token.
* If token == -1 then we have reached EOF
@@ -658,6 +891,10 @@ void send_token(int f, int32 token, struct map_struct *buf, OFF_T offset,
{
if (!do_compression)
simple_send_token(f, token, buf, offset, n);
#ifdef SUPPORT_ZSTD
else if (do_compression == CPRES_ZSTD)
send_zstd_token(f, token, buf, offset, n);
#endif
else
send_deflated_token(f, token, buf, offset, n, toklen);
}
@@ -674,6 +911,10 @@ int32 recv_token(int f, char **data)
if (!do_compression)
tok = simple_recv_token(f,data);
#ifdef SUPPORT_ZSTD
else if (do_compression == CPRES_ZSTD)
tok = recv_zstd_token(f, data);
#endif
else /* CPRES_ZLIB & CPRES_ZLIBX */
tok = recv_deflated_token(f, data);
return tok;