Merge pull request #61456 from kitaisreal/temporary-data-use-temporary-files-codec-setting

Temporary data use temporary_files_codec setting
This commit is contained in:
vdimir 2024-03-20 10:54:17 +01:00 committed by GitHub
commit 9a43f7941b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 85 additions and 38 deletions

View File

@ -7,6 +7,8 @@
#include <Poco/String.h>
#include <IO/ReadBuffer.h>
#include <Parsers/queryToString.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Compression/CompressionCodecMultiple.h>
#include <Compression/CompressionCodecNone.h>
#include <IO/WriteHelpers.h>
@ -44,6 +46,12 @@ CompressionCodecPtr CompressionCodecFactory::get(const String & family_name, std
}
}
CompressionCodecPtr CompressionCodecFactory::get(const String & compression_codec) const
{
ParserCodec codec_parser;
auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
return CompressionCodecFactory::instance().get(ast, nullptr);
}
CompressionCodecPtr CompressionCodecFactory::get(
const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default, bool only_generic) const

View File

@ -68,6 +68,9 @@ public:
/// For backward compatibility with config settings
CompressionCodecPtr get(const String & family_name, std::optional<int> level) const;
/// Get codec by name with optional params. Example: LZ4, ZSTD(3)
CompressionCodecPtr get(const String & compression_codec) const;
/// Register codec with parameters and column type
void registerCompressionCodecWithType(const String & family_name, std::optional<uint8_t> byte_code, CreatorWithType creator);
/// Register codec with parameters

View File

@ -451,7 +451,7 @@ class IColumn;
\
M(Bool, compatibility_ignore_collation_in_create_table, true, "Compatibility ignore collation in create table", 0) \
\
M(String, temporary_files_codec, "LZ4", "Set compression codec for temporary files (sort and join on disk). I.e. LZ4, NONE.", 0) \
M(String, temporary_files_codec, "LZ4", "Set compression codec for temporary files produced by (JOINs, external GROUP BY, external ORDER BY). I.e. LZ4, NONE.", 0) \
\
M(UInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.", 0) \
M(UInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.", 0) \

View File

@ -1073,7 +1073,9 @@ void Context::setTemporaryStoragePath(const String & path, size_t max_size)
setupTmpPath(shared->log, disk->getPath());
}
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, max_size);
TemporaryDataOnDiskSettings temporary_data_on_disk_settings;
temporary_data_on_disk_settings.max_size_on_disk = max_size;
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(std::move(volume), std::move(temporary_data_on_disk_settings));
}
void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_size)
@ -1093,7 +1095,7 @@ void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_s
VolumePtr volume = tmp_policy->getVolume(0);
if (volume->getDisks().empty())
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "No disks volume for temporary files");
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "No disks volume for temporary files");
for (const auto & disk : volume->getDisks())
{
@ -1119,7 +1121,9 @@ void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_s
if (shared->root_temp_data_on_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set");
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, max_size);
TemporaryDataOnDiskSettings temporary_data_on_disk_settings;
temporary_data_on_disk_settings.max_size_on_disk = max_size;
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(std::move(volume), std::move(temporary_data_on_disk_settings));
}
void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t max_size)
@ -1140,7 +1144,10 @@ void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t
shared->tmp_path = file_cache->getBasePath();
VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path, shared->getConfigRefWithLock(lock));
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, file_cache.get(), max_size);
TemporaryDataOnDiskSettings temporary_data_on_disk_settings;
temporary_data_on_disk_settings.max_size_on_disk = max_size;
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(std::move(volume), file_cache.get(), std::move(temporary_data_on_disk_settings));
}
void Context::setFlagsPath(const String & path)

View File

@ -208,8 +208,13 @@ ProcessList::insert(const String & query_, const IAST * ast, ContextMutablePtr q
thread_group->memory_tracker.setParent(&user_process_list.user_memory_tracker);
if (user_process_list.user_temp_data_on_disk)
{
TemporaryDataOnDiskSettings temporary_data_on_disk_settings
{
.max_size_on_disk = settings.max_temporary_data_on_disk_size_for_query,
.compression_codec = settings.temporary_files_codec
};
query_context->setTempDataOnDisk(std::make_shared<TemporaryDataOnDiskScope>(
user_process_list.user_temp_data_on_disk, settings.max_temporary_data_on_disk_size_for_query));
user_process_list.user_temp_data_on_disk, std::move(temporary_data_on_disk_settings)));
}
/// Set query-level memory trackers
@ -682,8 +687,15 @@ ProcessListForUser::ProcessListForUser(ContextPtr global_context, ProcessList *
if (global_context)
{
size_t size_limit = global_context->getSettingsRef().max_temporary_data_on_disk_size_for_user;
user_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(global_context->getSharedTempDataOnDisk(), size_limit);
const auto & settings = global_context->getSettingsRef();
TemporaryDataOnDiskSettings temporary_data_on_disk_settings
{
.max_size_on_disk = settings.max_temporary_data_on_disk_size_for_user,
.compression_codec = settings.temporary_files_codec
};
user_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(global_context->getSharedTempDataOnDisk(),
std::move(temporary_data_on_disk_settings));
}
}

View File

@ -44,20 +44,20 @@ void TemporaryDataOnDiskScope::deltaAllocAndCheck(ssize_t compressed_delta, ssiz
}
size_t new_consumprion = stat.compressed_size + compressed_delta;
if (compressed_delta > 0 && limit && new_consumprion > limit)
if (compressed_delta > 0 && settings.max_size_on_disk && new_consumprion > settings.max_size_on_disk)
throw Exception(ErrorCodes::TOO_MANY_ROWS_OR_BYTES,
"Limit for temporary files size exceeded (would consume {} / {} bytes)", new_consumprion, limit);
"Limit for temporary files size exceeded (would consume {} / {} bytes)", new_consumprion, settings.max_size_on_disk);
stat.compressed_size += compressed_delta;
stat.uncompressed_size += uncompressed_delta;
}
TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_)
: TemporaryDataOnDiskScope(std::move(parent_), /* limit_ = */ 0)
: TemporaryDataOnDiskScope(parent_, parent_->getSettings())
{}
TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Metric metric_scope)
: TemporaryDataOnDiskScope(std::move(parent_), /* limit_ = */ 0)
: TemporaryDataOnDiskScope(parent_, parent_->getSettings())
, current_metric_scope(metric_scope)
{}
@ -153,11 +153,19 @@ bool TemporaryDataOnDisk::empty() const
return streams.empty();
}
static inline CompressionCodecPtr getCodec(const TemporaryDataOnDiskSettings & settings)
{
if (settings.compression_codec.empty())
return CompressionCodecFactory::instance().get("NONE");
return CompressionCodecFactory::instance().get(settings.compression_codec);
}
struct TemporaryFileStream::OutputWriter
{
OutputWriter(std::unique_ptr<WriteBuffer> out_buf_, const Block & header_)
OutputWriter(std::unique_ptr<WriteBuffer> out_buf_, const Block & header_, const TemporaryDataOnDiskSettings & settings)
: out_buf(std::move(out_buf_))
, out_compressed_buf(*out_buf)
, out_compressed_buf(*out_buf, getCodec(settings))
, out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_)
{
}
@ -248,7 +256,7 @@ TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const
: parent(parent_)
, header(header_)
, file(std::move(file_))
, out_writer(std::make_unique<OutputWriter>(std::make_unique<WriteBufferFromFile>(file->getAbsolutePath()), header))
, out_writer(std::make_unique<OutputWriter>(std::make_unique<WriteBufferFromFile>(file->getAbsolutePath()), header, parent->settings))
{
LOG_TEST(getLogger("TemporaryFileStream"), "Writing to temporary file {}", file->getAbsolutePath());
}
@ -263,7 +271,7 @@ TemporaryFileStream::TemporaryFileStream(FileSegmentsHolderPtr segments_, const
auto out_buf = std::make_unique<WriteBufferToFileSegment>(&segment_holder->front());
LOG_TEST(getLogger("TemporaryFileStream"), "Writing to temporary file {}", out_buf->getFileName());
out_writer = std::make_unique<OutputWriter>(std::move(out_buf), header);
out_writer = std::make_unique<OutputWriter>(std::move(out_buf), header, parent_->settings);
}
size_t TemporaryFileStream::write(const Block & block)

View File

@ -28,6 +28,15 @@ using TemporaryFileStreamPtr = std::unique_ptr<TemporaryFileStream>;
class FileCache;
struct TemporaryDataOnDiskSettings
{
/// Max size on disk, if 0 there will be no limit
size_t max_size_on_disk = 0;
/// Compression codec for temporary data, if empty no compression will be used. LZ4 by default
String compression_codec = "LZ4";
};
/*
* Used to account amount of temporary data written to disk.
* If limit is set, throws exception if limit is exceeded.
@ -43,22 +52,30 @@ public:
std::atomic<size_t> uncompressed_size;
};
explicit TemporaryDataOnDiskScope(VolumePtr volume_, size_t limit_)
: volume(std::move(volume_)), limit(limit_)
explicit TemporaryDataOnDiskScope(VolumePtr volume_, TemporaryDataOnDiskSettings settings_)
: volume(std::move(volume_))
, settings(std::move(settings_))
{}
explicit TemporaryDataOnDiskScope(VolumePtr volume_, FileCache * file_cache_, size_t limit_)
: volume(std::move(volume_)), file_cache(file_cache_), limit(limit_)
explicit TemporaryDataOnDiskScope(VolumePtr volume_, FileCache * file_cache_, TemporaryDataOnDiskSettings settings_)
: volume(std::move(volume_))
, file_cache(file_cache_)
, settings(std::move(settings_))
{}
explicit TemporaryDataOnDiskScope(TemporaryDataOnDiskScopePtr parent_, size_t limit_)
: parent(std::move(parent_)), volume(parent->volume), file_cache(parent->file_cache), limit(limit_)
explicit TemporaryDataOnDiskScope(TemporaryDataOnDiskScopePtr parent_, TemporaryDataOnDiskSettings settings_)
: parent(std::move(parent_))
, volume(parent->volume)
, file_cache(parent->file_cache)
, settings(std::move(settings_))
{}
/// TODO: remove
/// Refactor all code that uses volume directly to use TemporaryDataOnDisk.
VolumePtr getVolume() const { return volume; }
const TemporaryDataOnDiskSettings & getSettings() const { return settings; }
protected:
void deltaAllocAndCheck(ssize_t compressed_delta, ssize_t uncompressed_delta);
@ -68,14 +85,14 @@ protected:
FileCache * file_cache = nullptr;
StatAtomic stat;
size_t limit = 0;
const TemporaryDataOnDiskSettings settings;
};
/*
* Holds the set of temporary files.
* New file stream is created with `createStream`.
* Streams are owned by this object and will be deleted when it is deleted.
* It's a leaf node in temorarty data scope tree.
* It's a leaf node in temporary data scope tree.
*/
class TemporaryDataOnDisk : private TemporaryDataOnDiskScope
{

View File

@ -947,7 +947,7 @@ TEST_F(FileCacheTest, temporaryData)
file_cache.initialize();
const auto user = FileCache::getCommonUser();
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(nullptr, &file_cache, 0);
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(nullptr, &file_cache, TemporaryDataOnDiskSettings{});
auto some_data_holder = file_cache.getOrSet(file_cache.createKeyForPath("some_data"), 0, 5_KiB, 5_KiB, CreateFileSegmentSettings{}, 0, user);
@ -1130,7 +1130,7 @@ TEST_F(FileCacheTest, TemporaryDataReadBufferSize)
DB::FileCache file_cache("cache", settings);
file_cache.initialize();
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(/*volume=*/nullptr, &file_cache, /*limit=*/0);
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(/*volume=*/nullptr, &file_cache, /*settings=*/TemporaryDataOnDiskSettings{});
auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);
@ -1152,7 +1152,7 @@ TEST_F(FileCacheTest, TemporaryDataReadBufferSize)
disk = createDisk("temporary_data_read_buffer_size_test_dir");
VolumePtr volume = std::make_shared<SingleDiskVolume>("volume", disk);
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(/*volume=*/volume, /*cache=*/nullptr, /*limit=*/0);
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(/*volume=*/volume, /*cache=*/nullptr, /*settings=*/TemporaryDataOnDiskSettings{});
auto tmp_data = std::make_unique<TemporaryDataOnDisk>(tmp_data_scope);

View File

@ -9,13 +9,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static CompressionCodecPtr getMarksCompressionCodec(const String & marks_compression_codec)
{
ParserCodec codec_parser;
auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(marks_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
return CompressionCodecFactory::instance().get(ast, nullptr);
}
MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
const MergeTreeMutableDataPartPtr & data_part_,
const NamesAndTypesList & columns_list_,
@ -46,7 +39,7 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
{
marks_compressor = std::make_unique<CompressedWriteBuffer>(
*marks_file_hashing,
getMarksCompressionCodec(settings_.marks_compression_codec),
CompressionCodecFactory::instance().get(settings_.marks_compression_codec),
settings_.marks_compress_block_size);
marks_source_hashing = std::make_unique<HashingWriteBuffer>(*marks_compressor);

View File

@ -242,9 +242,7 @@ void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
if (compress_primary_key)
{
ParserCodec codec_parser;
auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.primary_key_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
CompressionCodecPtr primary_key_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr);
CompressionCodecPtr primary_key_compression_codec = CompressionCodecFactory::instance().get(settings.primary_key_compression_codec);
index_compressor_stream = std::make_unique<CompressedWriteBuffer>(*index_file_hashing_stream, primary_key_compression_codec, settings.primary_key_compress_block_size);
index_source_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_compressor_stream);
}

View File

@ -67,6 +67,7 @@ def test_cache_evicted_by_temporary_data(start_cluster):
settings={
"max_bytes_before_external_group_by": "4M",
"max_bytes_before_external_sort": "4M",
"temporary_files_codec": "ZSTD",
},
)
assert fnmatch.fnmatch(