Revert "Revert "Add a setting to use more memory for zstd decompression""

This commit is contained in:
Alexey Milovidov 2022-06-18 15:55:35 +03:00 committed by GitHub
parent dd75e9cba5
commit 73709b0488
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 46 additions and 16 deletions

View File

@ -183,6 +183,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
\
M(Int64, network_zstd_compression_level, 1, "Allows you to select the level of ZSTD compression.", 0) \
\
M(Int64, zstd_window_log_max, 0, "Allows you to select the max window log of ZSTD (it will not be used for MergeTree family)", 0) \
\
M(UInt64, priority, 0, "Priority of the query. 1 - the highest, higher value - lower priority; 0 - do not use priorities.", 0) \
M(Int64, os_thread_priority, 0, "If non zero - set corresponding 'nice' value for query processing threads. Can be used to adjust query priority for OS scheduler.", 0) \
\

View File

@ -99,7 +99,7 @@ CompressionMethod chooseCompressionMethod(const std::string & path, const std::s
}
static std::unique_ptr<CompressedReadBufferWrapper> createCompressedWrapper(
std::unique_ptr<ReadBuffer> nested, CompressionMethod method, size_t buf_size, char * existing_memory, size_t alignment)
std::unique_ptr<ReadBuffer> nested, CompressionMethod method, size_t buf_size, char * existing_memory, size_t alignment, int zstd_window_log_max)
{
if (method == CompressionMethod::Gzip || method == CompressionMethod::Zlib)
return std::make_unique<ZlibInflatingReadBuffer>(std::move(nested), method, buf_size, existing_memory, alignment);
@ -110,7 +110,7 @@ static std::unique_ptr<CompressedReadBufferWrapper> createCompressedWrapper(
if (method == CompressionMethod::Xz)
return std::make_unique<LZMAInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
if (method == CompressionMethod::Zstd)
return std::make_unique<ZstdInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
return std::make_unique<ZstdInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment, zstd_window_log_max);
if (method == CompressionMethod::Lz4)
return std::make_unique<Lz4InflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
#if USE_BZIP2
@ -126,14 +126,13 @@ static std::unique_ptr<CompressedReadBufferWrapper> createCompressedWrapper(
}
std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
std::unique_ptr<ReadBuffer> nested, CompressionMethod method, size_t buf_size, char * existing_memory, size_t alignment)
std::unique_ptr<ReadBuffer> nested, CompressionMethod method, int zstd_window_log_max, size_t buf_size, char * existing_memory, size_t alignment)
{
if (method == CompressionMethod::None)
return nested;
return createCompressedWrapper(std::move(nested), method, buf_size, existing_memory, alignment);
return createCompressedWrapper(std::move(nested), method, buf_size, existing_memory, alignment, zstd_window_log_max);
}
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested, CompressionMethod method, int level, size_t buf_size, char * existing_memory, size_t alignment)
{

View File

@ -5,7 +5,6 @@
#include <Core/Defines.h>
namespace DB
{
class ReadBuffer;
@ -50,10 +49,12 @@ CompressionMethod chooseCompressionMethod(const std::string & path, const std::s
std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
std::unique_ptr<ReadBuffer> nested,
CompressionMethod method,
int zstd_window_log_max = 0,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested,
CompressionMethod method,

View File

@ -8,7 +8,7 @@ namespace ErrorCodes
extern const int ZSTD_DECODER_FAILED;
}
ZstdInflatingReadBuffer::ZstdInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment)
ZstdInflatingReadBuffer::ZstdInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment, int zstd_window_log_max)
: CompressedReadBufferWrapper(std::move(in_), buf_size, existing_memory, alignment)
{
dctx = ZSTD_createDCtx();
@ -19,6 +19,12 @@ ZstdInflatingReadBuffer::ZstdInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_
{
throw Exception(ErrorCodes::ZSTD_DECODER_FAILED, "zstd_stream_decoder init failed: zstd version: {}", ZSTD_VERSION_STRING);
}
size_t ret = ZSTD_DCtx_setParameter(dctx, ZSTD_d_windowLogMax, zstd_window_log_max);
if (ZSTD_isError(ret))
{
throw Exception(ErrorCodes::ZSTD_DECODER_FAILED, "zstd_stream_decoder init failed: {}", ZSTD_getErrorName(ret));
}
}
ZstdInflatingReadBuffer::~ZstdInflatingReadBuffer()

View File

@ -20,7 +20,8 @@ public:
std::unique_ptr<ReadBuffer> in_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
int zstd_window_log_max = 0);
~ZstdInflatingReadBuffer() override;

View File

@ -649,7 +649,7 @@ void HTTPHandler::processQuery(
/// Request body can be compressed using algorithm specified in the Content-Encoding header.
String http_request_compression_method_str = request.get("Content-Encoding", "");
auto in_post = wrapReadBufferWithCompressionMethod(
wrapReadBufferReference(request.getStream()), chooseCompressionMethod({}, http_request_compression_method_str));
wrapReadBufferReference(request.getStream()), chooseCompressionMethod({}, http_request_compression_method_str), context->getSettingsRef().zstd_window_log_max);
/// The data can also be compressed using incompatible internal algorithm. This is indicated by
/// 'decompress' query parameter.

View File

@ -197,8 +197,9 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
if (it == paths.end())
return nullptr;
auto compression = chooseCompressionMethod(*it, compression_method);
auto zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromHDFS>(uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef()), compression);
std::make_unique<ReadBufferFromHDFS>(uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef()), compression, zstd_window_log_max);
};
return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx);
}
@ -327,7 +328,8 @@ bool HDFSSource::initialize()
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_path);
auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression);
const auto zstd_window_log_max = getContext()->getSettingsRef().zstd_window_log_max;
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression, zstd_window_log_max);
auto input_format = getContext()->getInputFormat(storage->format_name, *read_buf, block_for_format, max_block_size);

View File

@ -208,7 +208,8 @@ std::unique_ptr<ReadBuffer> createReadBuffer(
in.setProgressCallback(context);
}
return wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method);
auto zstd_window_log_max = context->getSettingsRef().zstd_window_log_max;
return wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method, zstd_window_log_max);
}
}

View File

@ -434,7 +434,8 @@ bool StorageS3Source::initialize()
file_path = fs::path(bucket) / current_key;
read_buf = wrapReadBufferWithCompressionMethod(createS3ReadBuffer(current_key), chooseCompressionMethod(current_key, compression_hint));
auto zstd_window_log_max = getContext()->getSettingsRef().zstd_window_log_max;
read_buf = wrapReadBufferWithCompressionMethod(createS3ReadBuffer(current_key), chooseCompressionMethod(current_key, compression_hint), zstd_window_log_max);
auto input_format = getContext()->getInputFormat(format, *read_buf, sample_block, max_block_size, format_settings);
QueryPipelineBuilder builder;
@ -1170,10 +1171,12 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
read_keys_in_distributed_processing->push_back(key);
first = false;
const auto zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(
s3_configuration.client, s3_configuration.uri.bucket, key, s3_configuration.uri.version_id, s3_configuration.rw_settings.max_single_read_retries, ctx->getReadSettings()),
chooseCompressionMethod(key, compression_method));
chooseCompressionMethod(key, compression_method),
zstd_window_log_max);
};
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx);

View File

@ -350,7 +350,8 @@ namespace
std::move(read_buffer_factory),
threadPoolCallbackRunner(IOThreadPool::get()),
download_threads),
chooseCompressionMethod(request_uri.getPath(), compression_method));
chooseCompressionMethod(request_uri.getPath(), compression_method),
settings.zstd_window_log_max);
}
}
catch (const Poco::Exception & e)
@ -381,7 +382,8 @@ namespace
delay_initialization,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error),
chooseCompressionMethod(request_uri.getPath(), compression_method));
chooseCompressionMethod(request_uri.getPath(), compression_method),
settings.zstd_window_log_max);
}
catch (...)
{

View File

@ -0,0 +1,2 @@
1
40

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Tags: no-parallel
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# reuse the test data in 01946_test_zstd_decompression_with_escape_sequence_at_the_end_of_buffer.sh
$CLICKHOUSE_LOCAL --query "SELECT count() FROM file('$CUR_DIR/data_zstd/test_01946.zstd', JSONEachRow, 'foo String') SETTINGS zstd_window_log_max=20" 2>&1 | grep -c \
"Code: 561. DB::Exception: Zstd stream encoding failed: error 'Frame requires too much memory for decoding'; zstd version: 1.5.0: While executing File. (ZSTD_DECODER_FAILED)"
$CLICKHOUSE_LOCAL --query "SELECT count() FROM file('$CUR_DIR/data_zstd/test_01946.zstd', JSONEachRow, 'foo String') SETTINGS zstd_window_log_max=21"