mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 09:02:00 +00:00
Merge pull request #38196 from ClickHouse/revert-38194-revert-37015-zstd_window_log_max
Revert "Revert "Add a setting to use more memory for zstd decompression""
This commit is contained in:
commit
7700c26076
@ -183,6 +183,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
|
||||
\
|
||||
M(Int64, network_zstd_compression_level, 1, "Allows you to select the level of ZSTD compression.", 0) \
|
||||
\
|
||||
M(Int64, zstd_window_log_max, 0, "Allows you to select the max window log of ZSTD (it will not be used for MergeTree family)", 0) \
|
||||
\
|
||||
M(UInt64, priority, 0, "Priority of the query. 1 - the highest, higher value - lower priority; 0 - do not use priorities.", 0) \
|
||||
M(Int64, os_thread_priority, 0, "If non zero - set corresponding 'nice' value for query processing threads. Can be used to adjust query priority for OS scheduler.", 0) \
|
||||
\
|
||||
|
@ -99,7 +99,7 @@ CompressionMethod chooseCompressionMethod(const std::string & path, const std::s
|
||||
}
|
||||
|
||||
static std::unique_ptr<CompressedReadBufferWrapper> createCompressedWrapper(
|
||||
std::unique_ptr<ReadBuffer> nested, CompressionMethod method, size_t buf_size, char * existing_memory, size_t alignment)
|
||||
std::unique_ptr<ReadBuffer> nested, CompressionMethod method, size_t buf_size, char * existing_memory, size_t alignment, int zstd_window_log_max)
|
||||
{
|
||||
if (method == CompressionMethod::Gzip || method == CompressionMethod::Zlib)
|
||||
return std::make_unique<ZlibInflatingReadBuffer>(std::move(nested), method, buf_size, existing_memory, alignment);
|
||||
@ -110,7 +110,7 @@ static std::unique_ptr<CompressedReadBufferWrapper> createCompressedWrapper(
|
||||
if (method == CompressionMethod::Xz)
|
||||
return std::make_unique<LZMAInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
|
||||
if (method == CompressionMethod::Zstd)
|
||||
return std::make_unique<ZstdInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
|
||||
return std::make_unique<ZstdInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment, zstd_window_log_max);
|
||||
if (method == CompressionMethod::Lz4)
|
||||
return std::make_unique<Lz4InflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
|
||||
#if USE_BZIP2
|
||||
@ -126,14 +126,13 @@ static std::unique_ptr<CompressedReadBufferWrapper> createCompressedWrapper(
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
|
||||
std::unique_ptr<ReadBuffer> nested, CompressionMethod method, size_t buf_size, char * existing_memory, size_t alignment)
|
||||
std::unique_ptr<ReadBuffer> nested, CompressionMethod method, int zstd_window_log_max, size_t buf_size, char * existing_memory, size_t alignment)
|
||||
{
|
||||
if (method == CompressionMethod::None)
|
||||
return nested;
|
||||
return createCompressedWrapper(std::move(nested), method, buf_size, existing_memory, alignment);
|
||||
return createCompressedWrapper(std::move(nested), method, buf_size, existing_memory, alignment, zstd_window_log_max);
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
|
||||
std::unique_ptr<WriteBuffer> nested, CompressionMethod method, int level, size_t buf_size, char * existing_memory, size_t alignment)
|
||||
{
|
||||
|
@ -5,7 +5,6 @@
|
||||
|
||||
#include <Core/Defines.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class ReadBuffer;
|
||||
@ -50,10 +49,12 @@ CompressionMethod chooseCompressionMethod(const std::string & path, const std::s
|
||||
std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
|
||||
std::unique_ptr<ReadBuffer> nested,
|
||||
CompressionMethod method,
|
||||
int zstd_window_log_max = 0,
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
char * existing_memory = nullptr,
|
||||
size_t alignment = 0);
|
||||
|
||||
|
||||
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
|
||||
std::unique_ptr<WriteBuffer> nested,
|
||||
CompressionMethod method,
|
||||
|
@ -8,7 +8,7 @@ namespace ErrorCodes
|
||||
extern const int ZSTD_DECODER_FAILED;
|
||||
}
|
||||
|
||||
ZstdInflatingReadBuffer::ZstdInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment)
|
||||
ZstdInflatingReadBuffer::ZstdInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment, int zstd_window_log_max)
|
||||
: CompressedReadBufferWrapper(std::move(in_), buf_size, existing_memory, alignment)
|
||||
{
|
||||
dctx = ZSTD_createDCtx();
|
||||
@ -19,6 +19,12 @@ ZstdInflatingReadBuffer::ZstdInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_
|
||||
{
|
||||
throw Exception(ErrorCodes::ZSTD_DECODER_FAILED, "zstd_stream_decoder init failed: zstd version: {}", ZSTD_VERSION_STRING);
|
||||
}
|
||||
|
||||
size_t ret = ZSTD_DCtx_setParameter(dctx, ZSTD_d_windowLogMax, zstd_window_log_max);
|
||||
if (ZSTD_isError(ret))
|
||||
{
|
||||
throw Exception(ErrorCodes::ZSTD_DECODER_FAILED, "zstd_stream_decoder init failed: {}", ZSTD_getErrorName(ret));
|
||||
}
|
||||
}
|
||||
|
||||
ZstdInflatingReadBuffer::~ZstdInflatingReadBuffer()
|
||||
|
@ -20,7 +20,8 @@ public:
|
||||
std::unique_ptr<ReadBuffer> in_,
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
char * existing_memory = nullptr,
|
||||
size_t alignment = 0);
|
||||
size_t alignment = 0,
|
||||
int zstd_window_log_max = 0);
|
||||
|
||||
~ZstdInflatingReadBuffer() override;
|
||||
|
||||
|
@ -649,7 +649,7 @@ void HTTPHandler::processQuery(
|
||||
/// Request body can be compressed using algorithm specified in the Content-Encoding header.
|
||||
String http_request_compression_method_str = request.get("Content-Encoding", "");
|
||||
auto in_post = wrapReadBufferWithCompressionMethod(
|
||||
wrapReadBufferReference(request.getStream()), chooseCompressionMethod({}, http_request_compression_method_str));
|
||||
wrapReadBufferReference(request.getStream()), chooseCompressionMethod({}, http_request_compression_method_str), context->getSettingsRef().zstd_window_log_max);
|
||||
|
||||
/// The data can also be compressed using incompatible internal algorithm. This is indicated by
|
||||
/// 'decompress' query parameter.
|
||||
|
@ -197,8 +197,9 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
|
||||
if (it == paths.end())
|
||||
return nullptr;
|
||||
auto compression = chooseCompressionMethod(*it, compression_method);
|
||||
auto zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
|
||||
return wrapReadBufferWithCompressionMethod(
|
||||
std::make_unique<ReadBufferFromHDFS>(uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef()), compression);
|
||||
std::make_unique<ReadBufferFromHDFS>(uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef()), compression, zstd_window_log_max);
|
||||
};
|
||||
return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx);
|
||||
}
|
||||
@ -327,7 +328,8 @@ bool HDFSSource::initialize()
|
||||
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_path);
|
||||
|
||||
auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
|
||||
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression);
|
||||
const auto zstd_window_log_max = getContext()->getSettingsRef().zstd_window_log_max;
|
||||
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression, zstd_window_log_max);
|
||||
|
||||
auto input_format = getContext()->getInputFormat(storage->format_name, *read_buf, block_for_format, max_block_size);
|
||||
|
||||
|
@ -208,7 +208,8 @@ std::unique_ptr<ReadBuffer> createReadBuffer(
|
||||
in.setProgressCallback(context);
|
||||
}
|
||||
|
||||
return wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method);
|
||||
auto zstd_window_log_max = context->getSettingsRef().zstd_window_log_max;
|
||||
return wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method, zstd_window_log_max);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -434,7 +434,8 @@ bool StorageS3Source::initialize()
|
||||
|
||||
file_path = fs::path(bucket) / current_key;
|
||||
|
||||
read_buf = wrapReadBufferWithCompressionMethod(createS3ReadBuffer(current_key), chooseCompressionMethod(current_key, compression_hint));
|
||||
auto zstd_window_log_max = getContext()->getSettingsRef().zstd_window_log_max;
|
||||
read_buf = wrapReadBufferWithCompressionMethod(createS3ReadBuffer(current_key), chooseCompressionMethod(current_key, compression_hint), zstd_window_log_max);
|
||||
|
||||
auto input_format = getContext()->getInputFormat(format, *read_buf, sample_block, max_block_size, format_settings);
|
||||
QueryPipelineBuilder builder;
|
||||
@ -1170,10 +1171,12 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
|
||||
read_keys_in_distributed_processing->push_back(key);
|
||||
|
||||
first = false;
|
||||
const auto zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
|
||||
return wrapReadBufferWithCompressionMethod(
|
||||
std::make_unique<ReadBufferFromS3>(
|
||||
s3_configuration.client, s3_configuration.uri.bucket, key, s3_configuration.uri.version_id, s3_configuration.rw_settings.max_single_read_retries, ctx->getReadSettings()),
|
||||
chooseCompressionMethod(key, compression_method));
|
||||
chooseCompressionMethod(key, compression_method),
|
||||
zstd_window_log_max);
|
||||
};
|
||||
|
||||
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx);
|
||||
|
@ -350,7 +350,8 @@ namespace
|
||||
std::move(read_buffer_factory),
|
||||
threadPoolCallbackRunner(IOThreadPool::get()),
|
||||
download_threads),
|
||||
chooseCompressionMethod(request_uri.getPath(), compression_method));
|
||||
chooseCompressionMethod(request_uri.getPath(), compression_method),
|
||||
settings.zstd_window_log_max);
|
||||
}
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
@ -381,7 +382,8 @@ namespace
|
||||
delay_initialization,
|
||||
/* use_external_buffer */ false,
|
||||
/* skip_url_not_found_error */ skip_url_not_found_error),
|
||||
chooseCompressionMethod(request_uri.getPath(), compression_method));
|
||||
chooseCompressionMethod(request_uri.getPath(), compression_method),
|
||||
settings.zstd_window_log_max);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
|
@ -0,0 +1,2 @@
|
||||
1
|
||||
40
|
10
tests/queries/0_stateless/02293_test_zstd_window_log_max.sh
Executable file
10
tests/queries/0_stateless/02293_test_zstd_window_log_max.sh
Executable file
@ -0,0 +1,10 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-parallel
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
# reuse the test data in 01946_test_zstd_decompression_with_escape_sequence_at_the_end_of_buffer.sh
|
||||
$CLICKHOUSE_LOCAL --query "SELECT count() FROM file('$CUR_DIR/data_zstd/test_01946.zstd', JSONEachRow, 'foo String') SETTINGS zstd_window_log_max = 20" 2>&1 | grep -c "ZSTD_DECODER_FAILED"
|
||||
$CLICKHOUSE_LOCAL --query "SELECT count() FROM file('$CUR_DIR/data_zstd/test_01946.zstd', JSONEachRow, 'foo String') SETTINGS zstd_window_log_max = 21"
|
Loading…
Reference in New Issue
Block a user