Merge pull request #58539 from canhld94/file_custom_compress_level

Allow explicitly setting the compression level in the output format
pufit 2024-01-09 13:43:38 -05:00 committed by GitHub
commit 6cf55b82f4
16 changed files with 99 additions and 26 deletions


@@ -4773,6 +4773,24 @@ Type: Int64
Default: 0
## output_format_compression_level
Default compression level if query output is compressed. The setting is applied when a `SELECT` query has `INTO OUTFILE` or when writing to the table functions `file`, `url`, `hdfs`, `s3`, or `azureBlobStorage`.
Possible values: from `1` to `22`
Default: `3`
## output_format_compression_zstd_window_log
Can be used when the output compression method is `zstd`. If greater than `0`, this setting explicitly sets the compression window size (a power of `2`) and enables the long-range mode for zstd compression. This can help achieve a better compression ratio.
Possible values: non-negative numbers. Note that if the value is too small or too large, `libzstd` will reject it and an exception is thrown. Typical values are from `20` (window size = `1MB`) to `30` (window size = `1GB`).
Default: `0`
## rewrite_count_distinct_if_with_count_distinct_implementation
Allows you to rewrite `countDistinctIf` with the [count_distinct_implementation](#count_distinct_implementation) setting.
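The two `output_format_compression_*` settings documented above map directly onto codec parameters. As a rough standalone illustration (not part of this diff; it assumes the libzstd and zlib development headers and is built with `-lzstd -lz`), the documented `1`..`22` level range matches zstd's own ceiling while other codecs top out lower, and the window log is simply a power-of-two exponent:

```cpp
// Illustration only: where the documented ranges come from in the underlying libraries.
#include <zstd.h>
#include <zlib.h>
#include <cstdio>

int main()
{
    // output_format_compression_level is handed to whichever codec was chosen,
    // so the meaningful range depends on the method.
    std::printf("zstd max level: %d\n", ZSTD_maxCLevel());        // typically 22
    std::printf("zlib/gzip max level: %d\n", Z_BEST_COMPRESSION); // 9

    // output_format_compression_zstd_window_log is an exponent:
    // the zstd match window is (1 << window_log) bytes.
    for (int window_log : {20, 27, 30})
        std::printf("window_log %2d -> %4llu MiB window\n",
                    window_log, (1ULL << window_log) / (1024 * 1024));
    return 0;
}
```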


@@ -204,6 +204,8 @@ class IColumn;
M(Bool, input_format_parallel_parsing, true, "Enable parallel parsing for some data formats.", 0) \
M(UInt64, min_chunk_bytes_for_parallel_parsing, (10 * 1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.", 0) \
M(Bool, output_format_parallel_formatting, true, "Enable parallel formatting for some data formats.", 0) \
M(UInt64, output_format_compression_level, 3, "Default compression level if query output is compressed. The setting is applied when `SELECT` query has `INTO OUTFILE` or when inserting to table function `file`, `url`, `hdfs`, `s3`, and `azureBlobStorage`.", 0) \
M(UInt64, output_format_compression_zstd_window_log, 0, "Can be used when the output compression method is `zstd`. If greater than `0`, this setting explicitly sets compression window size (power of `2`) and enables a long-range mode for zstd compression.", 0) \
\
M(UInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.", 0) \
M(UInt64, merge_tree_min_bytes_for_concurrent_read, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized.", 0) \


@@ -170,7 +170,7 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
}
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested, CompressionMethod method, int level, size_t buf_size, char * existing_memory, size_t alignment)
std::unique_ptr<WriteBuffer> nested, CompressionMethod method, int level, int zstd_window_log, size_t buf_size, char * existing_memory, size_t alignment)
{
if (method == DB::CompressionMethod::Gzip || method == CompressionMethod::Zlib)
return std::make_unique<ZlibDeflatingWriteBuffer>(std::move(nested), method, level, buf_size, existing_memory, alignment);
@@ -183,7 +183,7 @@ std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
return std::make_unique<LZMADeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
if (method == CompressionMethod::Zstd)
return std::make_unique<ZstdDeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<ZstdDeflatingWriteBuffer>(std::move(nested), level, zstd_window_log, buf_size, existing_memory, alignment);
if (method == CompressionMethod::Lz4)
return std::make_unique<Lz4DeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);


@@ -66,6 +66,7 @@ std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested,
CompressionMethod method,
int level,
int zstd_window_log = 0,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
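Because the new `zstd_window_log` argument defaults to `0`, existing call sites that only pass a level keep compiling and keep the old behaviour (no long-range mode). A minimal caller sketch, assuming ClickHouse's own headers; `makeZstdOutput` is a hypothetical helper, not something this PR adds:

```cpp
#include <IO/CompressionMethod.h>
#include <IO/WriteBufferFromFile.h>
#include <memory>
#include <string>

using namespace DB;

/// Hypothetical helper: wraps a file in a zstd-compressing buffer,
/// optionally with an enlarged match window.
std::unique_ptr<WriteBuffer> makeZstdOutput(const std::string & path, int window_log = 0)
{
    auto file = std::make_unique<WriteBufferFromFile>(path);
    if (window_log == 0)
        /// Old-style call: zstd_window_log defaults to 0, long-range mode stays off.
        return wrapWriteBufferWithCompressionMethod(std::move(file), CompressionMethod::Zstd, /* level */ 3);
    /// New-style call: explicit level and window log, which is what executeQuery
    /// and the storage sinks further below now pass from the two new settings.
    return wrapWriteBufferWithCompressionMethod(
        std::move(file), CompressionMethod::Zstd, /* level */ 10, /* zstd_window_log */ window_log);
}
```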


@@ -1,30 +1,51 @@
#include <IO/ZstdDeflatingWriteBuffer.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ZSTD_ENCODER_FAILED;
extern const int ILLEGAL_CODEC_PARAMETER;
}
static void setZstdParameter(ZSTD_CCtx * cctx, ZSTD_cParameter param, int value)
{
auto ret = ZSTD_CCtx_setParameter(cctx, param, value);
if (ZSTD_isError(ret))
throw Exception(
ErrorCodes::ZSTD_ENCODER_FAILED,
"zstd stream encoder option setting failed: error code: {}; zstd version: {}",
ret,
ZSTD_VERSION_STRING);
}
ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
std::unique_ptr<WriteBuffer> out_, int compression_level, int window_log, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
{
cctx = ZSTD_createCCtx();
if (cctx == nullptr)
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder init failed: zstd version: {}", ZSTD_VERSION_STRING);
size_t ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, compression_level);
if (ZSTD_isError(ret))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED,
"zstd stream encoder option setting failed: error code: {}; zstd version: {}",
ret, ZSTD_VERSION_STRING);
ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);
if (ZSTD_isError(ret))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED,
"zstd stream encoder option setting failed: error code: {}; zstd version: {}",
ret, ZSTD_VERSION_STRING);
setZstdParameter(cctx, ZSTD_c_compressionLevel, compression_level);
if (window_log > 0)
{
ZSTD_bounds window_log_bounds = ZSTD_cParam_getBounds(ZSTD_c_windowLog);
if (ZSTD_isError(window_log_bounds.error))
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "ZSTD windowLog parameter is not supported {}",
std::string(ZSTD_getErrorName(window_log_bounds.error)));
if (window_log > window_log_bounds.upperBound || window_log < window_log_bounds.lowerBound)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER,
"ZSTD codec can't have window log more than {} and lower than {}, given {}",
toString(window_log_bounds.upperBound),
toString(window_log_bounds.lowerBound), toString(window_log));
setZstdParameter(cctx, ZSTD_c_enableLongDistanceMatching, 1);
setZstdParameter(cctx, ZSTD_c_windowLog, window_log);
}
setZstdParameter(cctx, ZSTD_c_checksumFlag, 1);
input = {nullptr, 0, 0};
output = {nullptr, 0, 0};
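The bounds check above asks libzstd for the accepted `windowLog` range instead of hard-coding it. A tiny standalone probe (assumes only the libzstd headers; the exact range is a property of the linked zstd build, commonly `10..31` on 64-bit) performs the same query as the constructor:

```cpp
#include <zstd.h>
#include <cstdio>

int main()
{
    // Same bounds query the new constructor performs before accepting window_log.
    ZSTD_bounds bounds = ZSTD_cParam_getBounds(ZSTD_c_windowLog);
    if (ZSTD_isError(bounds.error))
    {
        std::printf("windowLog parameter not supported: %s\n", ZSTD_getErrorName(bounds.error));
        return 1;
    }
    std::printf("accepted windowLog range: %d..%d\n", bounds.lowerBound, bounds.upperBound);
    return 0;
}
```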


@@ -17,6 +17,7 @@ public:
ZstdDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
int compression_level,
int window_log = 0,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);


@@ -1434,11 +1434,12 @@ void executeQuery(
const auto & compression_method_node = ast_query_with_output->compression->as<ASTLiteral &>();
compression_method = compression_method_node.value.safeGet<std::string>();
}
const auto & settings = context->getSettingsRef();
compressed_buffer = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromFile>(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT),
chooseCompressionMethod(out_file, compression_method),
/* compression level = */ 3
/* compression level = */ static_cast<int>(settings.output_format_compression_level),
/* zstd_window_log = */ static_cast<int>(settings.output_format_compression_zstd_window_log)
);
}


@@ -448,6 +448,7 @@ PODArray<char> & compress(PODArray<char> & source, PODArray<char> & scratch, Com
std::move(dest_buf),
method,
/*level*/ 3,
/*zstd_window_log*/ 0,
source.size(),
/*existing_memory*/ source.data());
chassert(compressed_buf->position() == source.data());


@@ -112,6 +112,7 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
std::make_unique<WriteBufferFromOStream>(*response_body_ostr),
compress ? compression_method : CompressionMethod::None,
compression_level,
0,
working_buffer.size(),
working_buffer.begin());
else


@@ -724,13 +724,13 @@ public:
const CompressionMethod compression_method)
: SinkToStorage(sample_block)
{
const auto & settings = context->getSettingsRef();
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHDFS>(
uri,
context->getGlobalContext()->getConfigRef(),
context->getSettingsRef().hdfs_replication,
context->getWriteSettings()),
compression_method, 3);
uri, context->getGlobalContext()->getConfigRef(), context->getSettingsRef().hdfs_replication, context->getWriteSettings()),
compression_method,
static_cast<int>(settings.output_format_compression_level),
static_cast<int>(settings.output_format_compression_zstd_window_log));
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context);
}


@@ -535,7 +535,12 @@ public:
, format_settings(format_settings_)
{
StoredObject object(blob_path);
write_buf = wrapWriteBufferWithCompressionMethod(object_storage->writeObject(object, WriteMode::Rewrite), compression_method, 3);
const auto & settings = context->getSettingsRef();
write_buf = wrapWriteBufferWithCompressionMethod(
object_storage->writeObject(object, WriteMode::Rewrite),
compression_method,
static_cast<int>(settings.output_format_compression_level),
static_cast<int>(settings.output_format_compression_zstd_window_log));
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, format_settings);
}


@@ -1575,8 +1575,12 @@ public:
/// In case of formats with prefixes if file is not empty we have already written prefix.
bool do_not_write_prefix = naked_buffer->size();
write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3);
const auto & settings = context->getSettingsRef();
write_buf = wrapWriteBufferWithCompressionMethod(
std::move(naked_buffer),
compression_method,
static_cast<int>(settings.output_format_compression_level),
static_cast<int>(settings.output_format_compression_zstd_window_log));
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format_name,
*write_buf, metadata_snapshot->getSampleBlock(), context, format_settings);


@@ -858,6 +858,7 @@ public:
blob_log->query_id = context->getCurrentQueryId();
}
const auto & settings = context->getSettingsRef();
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(
configuration_.client,
@@ -870,7 +871,8 @@ public:
threadPoolCallbackRunner<void>(getIOThreadPool().get(), "S3ParallelWrite"),
context->getWriteSettings()),
compression_method,
3);
static_cast<int>(settings.output_format_compression_level),
static_cast<int>(settings.output_format_compression_zstd_window_log));
writer
= FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, format_settings);
}


@@ -541,11 +541,12 @@ StorageURLSink::StorageURLSink(
Poco::URI(uri), http_method, content_type, content_encoding, headers, timeouts, DBMS_DEFAULT_BUFFER_SIZE, proxy_config
);
const auto & settings = context->getSettingsRef();
write_buf = wrapWriteBufferWithCompressionMethod(
std::move(write_buffer),
compression_method,
3
);
static_cast<int>(settings.output_format_compression_level),
static_cast<int>(settings.output_format_compression_zstd_window_log));
writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block, context, format_settings);
}


@@ -0,0 +1,2 @@
1
1000000


@@ -0,0 +1,13 @@
#!/usr/bin/env bash
# Tags: replica
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
file_with_random_postfix=test_02961_`date +%s%6N`.csv
${CLICKHOUSE_CLIENT} --query "INSERT INTO FUNCTION file('${file_with_random_postfix}', 'CSV', 'x UInt64', 'zstd') SELECT number FROM numbers(1000000) SETTINGS output_format_compression_level = 10, output_format_compression_zstd_window_log = 30, engine_file_truncate_on_insert = 1;"
# Simple check that output_format_compression_zstd_window_log = 30 works
${CLICKHOUSE_CLIENT} --query "SELECT count() FROM file('${file_with_random_postfix}', 'CSV', 'x UInt64', 'zstd') SETTINGS zstd_window_log_max = 29;" 2>&1 | head -n 1 | grep -c "ZSTD_DECODER_FAILED"
${CLICKHOUSE_CLIENT} --query "SELECT count() FROM file('${file_with_random_postfix}', 'CSV', 'x UInt64', 'zstd') SETTINGS zstd_window_log_max = 30;"