Merge pull request #17144 from fibersel/issue-16791

introduce zstd compression (for data import/export)
This commit is contained in:
alexey-milovidov 2020-12-02 22:41:07 +03:00 committed by GitHub
commit 85f4045160
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 360 additions and 3 deletions

View File

@ -8,6 +8,7 @@ RUN apt-get update -y \
apt-get install --yes --no-install-recommends \
brotli \
expect \
zstd \
lsof \
ncdu \
netcat-openbsd \

View File

@ -8,6 +8,7 @@ RUN apt-get --allow-unauthenticated update -y \
apt-get --allow-unauthenticated install --yes --no-install-recommends \
alien \
brotli \
zstd \
cmake \
devscripts \
expect \

View File

@ -24,6 +24,7 @@ RUN apt-get update -y \
tree \
moreutils \
brotli \
zstd \
gdb \
lsof \
unixodbc \

View File

@ -329,6 +329,8 @@ dbms_target_include_directories(SYSTEM BEFORE PUBLIC ${MINISELECT_INCLUDE_DIR})
if (ZSTD_LIBRARY)
dbms_target_link_libraries(PRIVATE ${ZSTD_LIBRARY})
target_link_libraries (clickhouse_common_io PUBLIC ${ZSTD_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${ZSTD_INCLUDE_DIR})
if (NOT USE_INTERNAL_ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
dbms_target_include_directories(SYSTEM BEFORE PRIVATE ${ZSTD_INCLUDE_DIR})
endif ()

View File

@ -526,6 +526,8 @@
M(557, UNKNOWN_UNION) \
M(558, EXPECTED_ALL_OR_DISTINCT) \
M(559, INVALID_GRPC_QUERY_INFO) \
M(560, ZSTD_ENCODER_FAILED) \
M(561, ZSTD_DECODER_FAILED) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -8,6 +8,8 @@
#include <IO/WriteBuffer.h>
#include <IO/ZlibDeflatingWriteBuffer.h>
#include <IO/ZlibInflatingReadBuffer.h>
#include <IO/ZstdDeflatingWriteBuffer.h>
#include <IO/ZstdInflatingReadBuffer.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
@ -34,6 +36,8 @@ std::string toContentEncodingName(CompressionMethod method)
return "br";
case CompressionMethod::Xz:
return "xz";
case CompressionMethod::Zstd:
return "zstd";
case CompressionMethod::None:
return "";
}
@ -61,11 +65,13 @@ CompressionMethod chooseCompressionMethod(const std::string & path, const std::s
return CompressionMethod::Brotli;
if (*method_str == "LZMA" || *method_str == "xz")
return CompressionMethod::Xz;
if (*method_str == "zstd" || *method_str == "zst")
return CompressionMethod::Zstd;
if (hint.empty() || hint == "auto" || hint == "none")
return CompressionMethod::None;
throw Exception(
"Unknown compression method " + hint + ". Only 'auto', 'none', 'gzip', 'br', 'xz' are supported as compression methods",
"Unknown compression method " + hint + ". Only 'auto', 'none', 'gzip', 'br', 'xz', 'zstd' are supported as compression methods",
ErrorCodes::NOT_IMPLEMENTED);
}
@ -81,6 +87,8 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
#endif
if (method == CompressionMethod::Xz)
return std::make_unique<LZMAInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
if (method == CompressionMethod::Zstd)
return std::make_unique<ZstdInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
if (method == CompressionMethod::None)
return nested;
@ -102,6 +110,9 @@ std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
if (method == CompressionMethod::Xz)
return std::make_unique<LZMADeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
if (method == CompressionMethod::Zstd)
return std::make_unique<ZstdDeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
if (method == CompressionMethod::None)
return nested;

View File

@ -28,6 +28,9 @@ enum class CompressionMethod
/// LZMA2-based content compression
/// This option corresponds to HTTP Content-Encoding: xz
Xz,
/// Zstd compressor
/// This option corresponds to HTTP Content-Encoding: zstd
Zstd,
Brotli
};

View File

@ -0,0 +1,95 @@
#include <IO/ZstdDeflatingWriteBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ZSTD_ENCODER_FAILED;
}
ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment), out(std::move(out_))
{
cctx = ZSTD_createCCtx();
if (cctx == nullptr)
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder init failed: zstd version: {}", ZSTD_VERSION_STRING);
size_t ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, compression_level);
if (ZSTD_isError(ret))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder option setting failed: error code: {}; zstd version: {}", ret, ZSTD_VERSION_STRING);
ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);
if (ZSTD_isError(ret))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder option setting failed: error code: {}; zstd version: {}", ret, ZSTD_VERSION_STRING);
input = {nullptr, 0, 0};
output = {nullptr, 0, 0};
}
ZstdDeflatingWriteBuffer::~ZstdDeflatingWriteBuffer()
{
try
{
finish();
ZSTD_freeCCtx(cctx);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void ZstdDeflatingWriteBuffer::nextImpl()
{
if (!offset())
return;
ZSTD_EndDirective mode = ZSTD_e_flush;
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
bool finished = false;
do
{
out->nextIfAtEnd();
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
output.size = out->buffer().size();
output.pos = out->offset();
ZSTD_compressStream2(cctx, &output, &input, mode);
out->position() = out->buffer().begin() + output.pos;
finished = (input.pos == input.size);
} while (!finished);
}
void ZstdDeflatingWriteBuffer::finish()
{
if (flushed)
return;
next();
out->nextIfAtEnd();
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
output.size = out->buffer().size();
output.pos = out->offset();
size_t remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
if (ZSTD_isError(remaining))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder end failed: zstd version: {}", ZSTD_VERSION_STRING);
out->position() = out->buffer().begin() + output.pos;
flushed = true;
}
}

View File

@ -0,0 +1,40 @@
#pragma once
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionMethod.h>
#include <IO/WriteBuffer.h>
#include <zstd.h>
namespace DB
{
/// Performs compression using zstd library and writes compressed data to out_ WriteBuffer.
class ZstdDeflatingWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
public:
ZstdDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
/// Flush all pending data and write zstd footer to the underlying buffer.
/// After the first call to this function, subsequent calls will have no effect and
/// an attempt to write to this buffer will result in exception.
void finish();
~ZstdDeflatingWriteBuffer() override;
private:
void nextImpl() override;
std::unique_ptr<WriteBuffer> out;
ZSTD_CCtx * cctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;
bool flushed = false;
};
}

View File

@ -0,0 +1,63 @@
#include <IO/ZstdInflatingReadBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ZSTD_DECODER_FAILED;
}
ZstdInflatingReadBuffer::ZstdInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment), in(std::move(in_))
{
dctx = ZSTD_createDCtx();
input = {nullptr, 0, 0};
output = {nullptr, 0, 0};
if (dctx == nullptr)
{
throw Exception(ErrorCodes::ZSTD_DECODER_FAILED, "zstd_stream_decoder init failed: zstd version: {}", ZSTD_VERSION_STRING);
}
}
ZstdInflatingReadBuffer::~ZstdInflatingReadBuffer()
{
ZSTD_freeDCtx(dctx);
}
bool ZstdInflatingReadBuffer::nextImpl()
{
if (eof)
return false;
if (input.pos >= input.size)
{
in->nextIfAtEnd();
input.src = reinterpret_cast<unsigned char *>(in->position());
input.pos = 0;
input.size = in->buffer().end() - in->position();
}
output.dst = reinterpret_cast<unsigned char *>(internal_buffer.begin());
output.size = internal_buffer.size();
output.pos = 0;
size_t ret = ZSTD_decompressStream(dctx, &output, &input);
if (ZSTD_isError(ret))
throw Exception(
ErrorCodes::ZSTD_DECODER_FAILED, "Zstd stream decoding failed: error code: {}; zstd version: {}", ret, ZSTD_VERSION_STRING);
in->position() = in->buffer().begin() + input.pos;
working_buffer.resize(output.pos);
if (in->eof())
{
eof = true;
return working_buffer.size() != 0;
}
return true;
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h>
#include <zstd.h>
namespace DB
{
namespace ErrorCodes
{
}
class ZstdInflatingReadBuffer : public BufferWithOwnMemory<ReadBuffer>
{
public:
ZstdInflatingReadBuffer(
std::unique_ptr<ReadBuffer> in_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
~ZstdInflatingReadBuffer() override;
private:
bool nextImpl() override;
std::unique_ptr<ReadBuffer> in;
ZSTD_DCtx * dctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;
bool eof = false;
};
}

View File

@ -82,3 +82,6 @@ target_link_libraries (zlib_ng_bug PRIVATE ${ZLIB_LIBRARIES})
add_executable (ryu_test ryu_test.cpp)
target_link_libraries (ryu_test PRIVATE ryu)
add_executable (zstd_buffers zstd_buffers.cpp)
target_link_libraries (zstd_buffers PRIVATE clickhouse_common_io)

View File

@ -0,0 +1,66 @@
#include <iomanip>
#include <iostream>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/ZstdDeflatingWriteBuffer.h>
#include <IO/ZstdInflatingReadBuffer.h>
#include <Common/Stopwatch.h>
int main(int, char **)
try
{
std::cout << std::fixed << std::setprecision(2);
size_t n = 10000000;
Stopwatch stopwatch;
{
auto buf
= std::make_unique<DB::WriteBufferFromFile>("test_zstd_buffers.zst", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
DB::ZstdDeflatingWriteBuffer zstd_buf(std::move(buf), /*compression level*/ 3);
stopwatch.restart();
for (size_t i = 0; i < n; ++i)
{
DB::writeIntText(i, zstd_buf);
DB::writeChar('\t', zstd_buf);
}
zstd_buf.finish();
stopwatch.stop();
std::cout << "Writing done. Elapsed: " << stopwatch.elapsedSeconds() << " s."
<< ", " << (zstd_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s" << std::endl;
}
{
auto buf = std::make_unique<DB::ReadBufferFromFile>("test_zstd_buffers.zst");
DB::ZstdInflatingReadBuffer zstd_buf(std::move(buf));
stopwatch.restart();
for (size_t i = 0; i < n; ++i)
{
size_t x;
DB::readIntText(x, zstd_buf);
zstd_buf.ignore();
if (x != i)
throw DB::Exception("Failed!, read: " + std::to_string(x) + ", expected: " + std::to_string(i), 0);
}
stopwatch.stop();
std::cout << "Reading done. Elapsed: " << stopwatch.elapsedSeconds() << " s."
<< ", " << (zstd_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s" << std::endl;
}
return 0;
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
}

View File

@ -58,6 +58,8 @@ SRCS(
WriteHelpers.cpp
ZlibDeflatingWriteBuffer.cpp
ZlibInflatingReadBuffer.cpp
ZstdDeflatingWriteBuffer.cpp
ZstdInflatingReadBuffer.cpp
copyData.cpp
createReadBufferFromFileBase.cpp
createWriteBufferFromFileBase.cpp

View File

@ -353,6 +353,8 @@ void HTTPHandler::processQuery(
http_response_compression_method = CompressionMethod::Zlib;
else if (std::string::npos != http_response_compression_methods.find("xz"))
http_response_compression_method = CompressionMethod::Xz;
else if (std::string::npos != http_response_compression_methods.find("zstd"))
http_response_compression_method = CompressionMethod::Zstd;
}
bool client_supports_http_compression = http_response_compression_method != CompressionMethod::None;

View File

@ -68,15 +68,28 @@
7
8
9
0
1
2
3
4
5
6
7
8
9
< Content-Encoding: gzip
< Content-Encoding: deflate
< Content-Encoding: gzip
< Content-Encoding: br
< Content-Encoding: xz
< Content-Encoding: zstd
1
1
1
1
1
Hello, world
Hello, world
Hello, world
Hello, world

View File

@ -10,6 +10,7 @@ ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: zip, eflate' -d 'SELECT number FROM system.numbers LIMIT 10';
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: br' -d 'SELECT number FROM system.numbers LIMIT 10' | brotli -d;
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: xz' -d 'SELECT number FROM system.numbers LIMIT 10' | xz -d;
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: zstd' -d 'SELECT number FROM system.numbers LIMIT 10' | zstd -d;
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
@ -18,16 +19,19 @@ ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: zip, eflate' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: br' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: xz' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: zstd' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
echo "SELECT 1" | ${CLICKHOUSE_CURL} -sS --data-binary @- "${CLICKHOUSE_URL}";
echo "SELECT 1" | gzip -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: gzip' "${CLICKHOUSE_URL}";
echo "SELECT 1" | brotli | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: br' "${CLICKHOUSE_URL}";
echo "SELECT 1" | xz -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: xz' "${CLICKHOUSE_URL}";
echo "SELECT 1" | zstd -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: zstd' "${CLICKHOUSE_URL}";
echo "'Hello, world'" | ${CLICKHOUSE_CURL} -sS --data-binary @- "${CLICKHOUSE_URL}&query=SELECT";
echo "'Hello, world'" | gzip -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: gzip' "${CLICKHOUSE_URL}&query=SELECT";
echo "'Hello, world'" | brotli | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: br' "${CLICKHOUSE_URL}&query=SELECT";
echo "'Hello, world'" | xz -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: xz' "${CLICKHOUSE_URL}&query=SELECT";
echo "'Hello, world'" | zstd -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: zstd' "${CLICKHOUSE_URL}&query=SELECT";
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 0' | wc -c;

View File

@ -1,7 +1,9 @@
1000000 999999
1000000 999999
1000000 999999
3000000 999999
1000000 999999
4000000 999999
1 255
1 255
1 255
1 255

View File

@ -23,9 +23,18 @@ SELECT count(), max(x) FROM file;
DROP TABLE file;
SELECT count(), max(x) FROM file('data{1,2,3}.tsv.{gz,br,xz}', TSV, 'x UInt64');
CREATE TABLE file (x UInt64) ENGINE = File(TSV, 'data4.tsv.zst');
TRUNCATE TABLE file;
INSERT INTO file SELECT * FROM numbers(1000000);
SELECT count(), max(x) FROM file;
DROP TABLE file;
SELECT count(), max(x) FROM file('data{1,2,3,4}.tsv.{gz,br,xz,zst}', TSV, 'x UInt64');
-- check that they are compressed
SELECT count() < 1000000, max(x) FROM file('data1.tsv.br', RowBinary, 'x UInt8', 'none');
SELECT count() < 3000000, max(x) FROM file('data2.tsv.gz', RowBinary, 'x UInt8', 'none');
SELECT count() < 1000000, max(x) FROM file('data3.tsv.xz', RowBinary, 'x UInt8', 'none');
SELECT count() < 1000000, max(x) FROM file('data4.tsv.zst', RowBinary, 'x UInt8', 'none');