From 1e0241e262a23f80abc0db55cae61fc172b0f692 Mon Sep 17 00:00:00 2001
From: Vladimir Chebotarev
Date: Fri, 13 Oct 2017 04:02:16 +0300
Subject: [PATCH] dbms: CompressionSettingsSelector. [#METR-21516]

---
 dbms/src/IO/CompressedWriteBuffer.cpp         |  4 +-
 dbms/src/IO/CompressionSettings.cpp           | 41 +++++++++++++++----
 dbms/src/IO/CompressionSettings.h             |  6 ++-
 dbms/src/Interpreters/Context.cpp             | 18 ++++----
 dbms/src/Interpreters/Context.h               |  6 +--
 dbms/src/Server/Compressor.cpp                |  5 ++-
 ...lector.h => CompressionSettingsSelector.h} | 29 +++++++------
 dbms/src/Storages/MergeTree/MergeTreeData.cpp |  4 +-
 .../MergeTree/MergeTreeDataMerger.cpp         | 10 ++---
 .../MergeTree/MergeTreeDataWriter.cpp         |  4 +-
 .../MergeTree/MergedBlockOutputStream.cpp     | 26 ++++++------
 .../MergeTree/MergedBlockOutputStream.h       | 12 +++---
 dbms/src/Storages/StorageLog.cpp              |  2 +-
 dbms/src/Storages/StorageStripeLog.cpp        |  2 +-
 dbms/src/Storages/StorageTinyLog.cpp          |  2 +-
 15 files changed, 101 insertions(+), 70 deletions(-)
 rename dbms/src/Storages/{CompressionMethodSelector.h => CompressionSettingsSelector.h} (68%)

diff --git a/dbms/src/IO/CompressedWriteBuffer.cpp b/dbms/src/IO/CompressedWriteBuffer.cpp
index 71a0e5d2bcf..b9a121b466b 100644
--- a/dbms/src/IO/CompressedWriteBuffer.cpp
+++ b/dbms/src/IO/CompressedWriteBuffer.cpp
@@ -59,7 +59,7 @@ void CompressedWriteBuffer::nextImpl()
                 &compressed_buffer[header_size],
                 uncompressed_size,
                 LZ4_COMPRESSBOUND(uncompressed_size),
-                0);
+                compression_settings.level);
 
             UInt32 compressed_size_32 = compressed_size;
             UInt32 uncompressed_size_32 = uncompressed_size;
@@ -83,7 +83,7 @@ void CompressedWriteBuffer::nextImpl()
                 compressed_buffer.size() - header_size,
                 working_buffer.begin(),
                 uncompressed_size,
-                compression_settings.zstd_level);
+                compression_settings.level);
 
             if (ZSTD_isError(res))
                 throw Exception("Cannot compress block with ZSTD: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_COMPRESS);
diff --git a/dbms/src/IO/CompressionSettings.cpp b/dbms/src/IO/CompressionSettings.cpp
index 946833590b7..7b74e59fd3b 100644
--- a/dbms/src/IO/CompressionSettings.cpp
+++ b/dbms/src/IO/CompressionSettings.cpp
@@ -10,20 +10,43 @@ CompressionSettings::CompressionSettings()
 {
 }
 
-CompressionSettings::CompressionSettings(CompressionMethod method):
-    method(method)
-{
-}
-
-CompressionSettings::CompressionSettings(CompressionMethod method, int zstd_level):
+CompressionSettings::CompressionSettings(CompressionMethod method, int level):
     method(method),
-    zstd_level(zstd_level)
+    level(level)
 {
 }
 
-CompressionSettings::CompressionSettings(const Settings & settings):
-    CompressionSettings(settings.network_compression_method, settings.network_zstd_compression_level)
+CompressionSettings::CompressionSettings(CompressionMethod method):
+    CompressionSettings(method, getDefaultLevel(method))
 {
 }
 
+CompressionSettings::CompressionSettings(const Settings & settings)
+{
+    method = settings.network_compression_method;
+    switch (method)
+    {
+        case CompressionMethod::ZSTD:
+            level = settings.network_zstd_compression_level;
+            break;
+        default:
+            level = getDefaultLevel(method);
+    }
+}
+
+int CompressionSettings::getDefaultLevel(CompressionMethod method)
+{
+    switch (method)
+    {
+        case CompressionMethod::LZ4:
+            return -1;
+        case CompressionMethod::LZ4HC:
+            return 0;
+        case CompressionMethod::ZSTD:
+            return 1;
+        default:
+            return -1;
+    }
+}
+
 }
diff --git a/dbms/src/IO/CompressionSettings.h b/dbms/src/IO/CompressionSettings.h
index 2eb6ee909eb..d0e04039bc2 100644
--- a/dbms/src/IO/CompressionSettings.h
+++ b/dbms/src/IO/CompressionSettings.h
@@ -11,12 +11,14 @@ class Settings;
 
 struct CompressionSettings
 {
     CompressionMethod method = CompressionMethod::LZ4;
-    int zstd_level = 1;
+    int level;
 
     CompressionSettings();
     CompressionSettings(CompressionMethod method);
-    CompressionSettings(CompressionMethod method, int zstd_level);
+    CompressionSettings(CompressionMethod method, int level);
     CompressionSettings(const Settings & settings);
+
+    static int getDefaultLevel(CompressionMethod method);
 };
 
 }
diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp
index 1e0023a1b3e..c09eddac8f3 100644
--- a/dbms/src/Interpreters/Context.cpp
+++ b/dbms/src/Interpreters/Context.cpp
@@ -16,13 +16,14 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
@@ -42,7 +43,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 
@@ -126,8 +126,8 @@ struct ContextShared
     Macros macros;                                  /// Substitutions extracted from config.
     std::unique_ptr compiler;                       /// Used for dynamic compilation of queries' parts if it necessary.
     std::shared_ptr ddl_worker;                     /// Process ddl commands from zk.
-    /// Rules for selecting the compression method, depending on the size of the part.
-    mutable std::unique_ptr compression_method_selector;
+    /// Rules for selecting the compression settings, depending on the size of the part.
+    mutable std::unique_ptr compression_settings_selector;
     std::unique_ptr merge_tree_settings;            /// Settings of MergeTree* engines.
     size_t max_table_size_to_drop = 50000000000lu;  /// Protects MergeTree tables from accidental DROP (50GB by default)
 
@@ -1384,22 +1384,22 @@ PartLog * Context::getPartLog(const String & database, const String & table)
 }
 
 
-CompressionMethod Context::chooseCompressionMethod(size_t part_size, double part_size_ratio) const
+CompressionSettings Context::chooseCompressionSettings(size_t part_size, double part_size_ratio) const
 {
     auto lock = getLock();
 
-    if (!shared->compression_method_selector)
+    if (!shared->compression_settings_selector)
     {
         constexpr auto config_name = "compression";
        auto & config = getConfigRef();
 
         if (config.has(config_name))
-            shared->compression_method_selector = std::make_unique(config, "compression");
+            shared->compression_settings_selector = std::make_unique(config, "compression");
         else
-            shared->compression_method_selector = std::make_unique();
+            shared->compression_settings_selector = std::make_unique();
     }
 
-    return shared->compression_method_selector->choose(part_size, part_size_ratio);
+    return shared->compression_settings_selector->choose(part_size, part_size_ratio);
 }
diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h
index 8795c5a9799..47c20be8140 100644
--- a/dbms/src/Interpreters/Context.h
+++ b/dbms/src/Interpreters/Context.h
@@ -11,7 +11,7 @@
 #include
 #include
 #include
-#include
+#include
 
 
 namespace Poco
@@ -328,8 +328,8 @@ public:
     void setMaxTableSizeToDrop(size_t max_size);
     void checkTableCanBeDropped(const String & database, const String & table, size_t table_size);
 
-    /// Lets you select the compression method according to the conditions described in the configuration file.
-    CompressionMethod chooseCompressionMethod(size_t part_size, double part_size_ratio) const;
+    /// Lets you select the compression settings according to the conditions described in the configuration file.
+    CompressionSettings chooseCompressionSettings(size_t part_size, double part_size_ratio) const;
 
     /// Get the server uptime in seconds.
     time_t getUptimeSeconds() const;
diff --git a/dbms/src/Server/Compressor.cpp b/dbms/src/Server/Compressor.cpp
index 67dca9113f0..4a412d987b4 100644
--- a/dbms/src/Server/Compressor.cpp
+++ b/dbms/src/Server/Compressor.cpp
@@ -61,6 +61,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
         ("block-size,b", boost::program_options::value()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
         ("hc", "use LZ4HC instead of LZ4")
         ("zstd", "use ZSTD instead of LZ4")
+        ("level", "compression level")
         ("none", "use no compression instead of LZ4")
         ("stat", "print block statistics of compressed data")
     ;
@@ -93,6 +94,8 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
         else if (use_none)
             method = DB::CompressionMethod::NONE;
 
+        DB::CompressionSettings settings(method, options.count("level") > 0 ? options["level"].as() : DB::CompressionSettings::getDefaultLevel(method));
+
         DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO);
         DB::WriteBufferFromFileDescriptor wb(STDOUT_FILENO);
 
@@ -110,7 +113,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
         else
         {
             /// Compression
-            DB::CompressedWriteBuffer to(wb, method, block_size);
+            DB::CompressedWriteBuffer to(wb, settings, block_size);
             DB::copyData(rb, to);
         }
     }
diff --git a/dbms/src/Storages/CompressionMethodSelector.h b/dbms/src/Storages/CompressionSettingsSelector.h
similarity index 68%
rename from dbms/src/Storages/CompressionMethodSelector.h
rename to dbms/src/Storages/CompressionSettingsSelector.h
index c762bae1513..dc4c0c92db1 100644
--- a/dbms/src/Storages/CompressionMethodSelector.h
+++ b/dbms/src/Storages/CompressionSettingsSelector.h
@@ -15,7 +15,7 @@ namespace ErrorCodes
 }
 
 
-/** Allows you to select the compression method for the conditions specified in the configuration file.
+/** Allows you to select the compression settings for the conditions specified in the configuration file.
   * The config looks like this
@@ -29,6 +29,7 @@ namespace ErrorCodes
             zstd
+            2
 
@@ -36,23 +37,23 @@
   */
 
-class CompressionMethodSelector
+class CompressionSettingsSelector
 {
 private:
     struct Element
     {
         size_t min_part_size = 0;
         double min_part_size_ratio = 0;
-        CompressionMethod method = CompressionMethod::LZ4;
+        CompressionSettings settings = CompressionSettings(CompressionMethod::LZ4);
 
-        void setMethod(const std::string & name)
+        static CompressionMethod compressionMethodFromString(const std::string & name)
         {
             if (name == "lz4")
-                method = CompressionMethod::LZ4;
+                return CompressionMethod::LZ4;
             else if (name == "zstd")
-                method = CompressionMethod::ZSTD;
+                return CompressionMethod::ZSTD;
             else if (name == "none")
-                method = CompressionMethod::NONE;
+                return CompressionMethod::NONE;
             else
                 throw Exception("Unknown compression method " + name, ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
         }
@@ -62,7 +63,9 @@ private:
         min_part_size = config.getUInt64(config_prefix + ".min_part_size", 0);
         min_part_size_ratio = config.getDouble(config_prefix + ".min_part_size_ratio", 0);
 
-        setMethod(config.getString(config_prefix + ".method"));
+        CompressionMethod method = compressionMethodFromString(config.getString(config_prefix + ".method"));
+        int level = config.getInt64(config_prefix + ".level", CompressionSettings::getDefaultLevel(method));
+        settings = CompressionSettings(method, level);
     }
 
     bool check(size_t part_size, double part_size_ratio) const
@@ -75,9 +78,9 @@ private:
     std::vector elements;
 
 public:
-    CompressionMethodSelector() {}    /// Always returns the default method.
+    CompressionSettingsSelector() {}    /// Always returns the default method.
 
-    CompressionMethodSelector(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
+    CompressionSettingsSelector(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
     {
         Poco::Util::AbstractConfiguration::Keys keys;
         config.keys(config_prefix, keys);
@@ -91,13 +94,13 @@ public:
         }
     }
 
-    CompressionMethod choose(size_t part_size, double part_size_ratio) const
+    CompressionSettings choose(size_t part_size, double part_size_ratio) const
     {
-        CompressionMethod res = CompressionMethod::LZ4;
+        CompressionSettings res = CompressionSettings(CompressionMethod::LZ4);
 
         for (const auto & element : elements)
             if (element.check(part_size, part_size_ratio))
-                res = element.method;
+                res = element.settings;
 
         return res;
     }
diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
index 030bf57fbf7..a8d57f95d84 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
@@ -1123,7 +1123,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
         *this, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, expression->getRequiredColumns(), ranges,
         false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
 
-    auto compression_method = this->context.chooseCompressionMethod(
+    auto compression_settings = this->context.chooseCompressionSettings(
         part->size_in_bytes,
         static_cast(part->size_in_bytes) / this->getTotalActiveSizeInBytes());
     ExpressionBlockInputStream in(part_in, expression);
@@ -1135,7 +1135,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
      * temporary column name ('converting_column_name') created in 'createConvertExpression' method
      * will have old name of shared offsets for arrays.
      */
-    MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, compression_method, true /* skip_offsets */);
+    MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, compression_settings, true /* skip_offsets */);
 
     in.readPrefix();
     out.writePrefix();
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
index f4d46d658f2..ad23021335a 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
@@ -619,12 +619,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
     if (deduplicate && merged_stream->isGroupedOutput())
         merged_stream = std::make_shared(merged_stream, Limits(), 0 /*limit_hint*/, Names());
 
-    auto compression_method = data.context.chooseCompressionMethod(
+    auto compression_settings = data.context.chooseCompressionSettings(
         merge_entry->total_size_bytes_compressed,
         static_cast (merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
 
     MergedBlockOutputStream to{
-        data, new_part_tmp_path, merging_columns, compression_method, merged_column_to_size, aio_threshold};
+        data, new_part_tmp_path, merging_columns, compression_settings, merged_column_to_size, aio_threshold};
 
     merged_stream->readPrefix();
     to.writePrefix();
@@ -702,7 +702,7 @@
         rows_sources_read_buf.seek(0, 0);
         ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);
 
-        MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, false, compression_method, offset_written);
+        MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, false, compression_settings, offset_written);
         size_t column_elems_written = 0;
 
         column_to.writePrefix();
@@ -915,7 +915,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
     /// A very rough estimate for the compressed data size of each sharded partition.
     /// Actually it all depends on the properties of the expression for sharding.
     UInt64 per_shard_size_bytes_compressed = merge_entry->total_size_bytes_compressed / static_cast(job.paths.size());
-    auto compression_method = data.context.chooseCompressionMethod(
+    auto compression_settings = data.context.chooseCompressionSettings(
         per_shard_size_bytes_compressed,
         static_cast(per_shard_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
 
@@ -949,7 +949,7 @@
         MergedBlockOutputStreamPtr output_stream;
         output_stream = std::make_unique(
-            data, new_part_tmp_path, column_names_and_types, compression_method, merged_column_to_size, aio_threshold);
+            data, new_part_tmp_path, column_names_and_types, compression_settings, merged_column_to_size, aio_threshold);
 
         per_shard_data_parts.emplace(shard_no, std::move(data_part));
         per_shard_output.emplace(shard_no, std::move(output_stream));
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp
index bcaee5f9d43..a1791e68790 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp
@@ -196,10 +196,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
     /// This effectively chooses minimal compression method:
    /// either default lz4 or compression method with zero thresholds on absolute and relative part size.
-    auto compression_method = data.context.chooseCompressionMethod(0, 0);
+    auto compression_settings = data.context.chooseCompressionSettings(0, 0);
 
     NamesAndTypesList columns = data.getColumnsList().filter(block.getColumnsList().getNames());
-    MergedBlockOutputStream out(data, new_data_part->getFullPath(), columns, compression_method);
+    MergedBlockOutputStream out(data, new_data_part->getFullPath(), columns, compression_settings);
 
     out.writePrefix();
     out.writeWithPermutation(block, perm_ptr);
diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp
index a90f7b2bc17..c0e74d4851d 100644
--- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp
+++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp
@@ -31,13 +31,13 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
     MergeTreeData & storage_,
     size_t min_compress_block_size_,
     size_t max_compress_block_size_,
-    CompressionMethod compression_method_,
+    CompressionSettings compression_settings_,
     size_t aio_threshold_)
     : storage(storage_),
     min_compress_block_size(min_compress_block_size_),
     max_compress_block_size(max_compress_block_size_),
     aio_threshold(aio_threshold_),
-    compression_method(compression_method_)
+    compression_settings(compression_settings_)
 {
 }
@@ -69,7 +69,7 @@ void IMergedBlockOutputStream::addStream(
             path + escaped_column_name, NULL_MAP_EXTENSION,
             path + escaped_column_name, NULL_MARKS_FILE_EXTENSION,
             max_compress_block_size,
-            compression_method,
+            compression_settings,
             estimated_size,
             aio_threshold);
 
@@ -91,7 +91,7 @@ void IMergedBlockOutputStream::addStream(
             path + escaped_size_name, DATA_FILE_EXTENSION,
             path + escaped_size_name, MARKS_FILE_EXTENSION,
             max_compress_block_size,
-            compression_method,
+            compression_settings,
             estimated_size,
             aio_threshold);
     }
@@ -105,7 +105,7 @@ void IMergedBlockOutputStream::addStream(
             path + escaped_column_name, DATA_FILE_EXTENSION,
             path + escaped_column_name, MARKS_FILE_EXTENSION,
             max_compress_block_size,
-            compression_method,
+            compression_settings,
             estimated_size,
             aio_threshold);
     }
@@ -269,14 +269,14 @@ IMergedBlockOutputStream::ColumnStream::ColumnStream(
     const std::string & marks_path,
     const std::string & marks_file_extension_,
     size_t max_compress_block_size,
-    CompressionMethod compression_method,
+    CompressionSettings compression_settings,
     size_t estimated_size,
     size_t aio_threshold) :
     escaped_column_name(escaped_column_name_),
     data_file_extension{data_file_extension_},
     marks_file_extension{marks_file_extension_},
     plain_file(createWriteBufferFromFileBase(data_path + data_file_extension, estimated_size, aio_threshold, max_compress_block_size)),
-    plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_method), compressed(compressed_buf),
+    plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_settings), compressed(compressed_buf),
     marks_file(marks_path + marks_file_extension, 4096, O_TRUNC | O_CREAT | O_WRONLY), marks(marks_file)
 {
 }
@@ -315,10 +315,10 @@ MergedBlockOutputStream::MergedBlockOutputStream(
     MergeTreeData & storage_,
     String part_path_,
     const NamesAndTypesList & columns_list_,
-    CompressionMethod compression_method)
+    CompressionSettings compression_settings)
     : IMergedBlockOutputStream(
         storage_, storage_.context.getSettings().min_compress_block_size,
-        storage_.context.getSettings().max_compress_block_size, compression_method,
+        storage_.context.getSettings().max_compress_block_size, compression_settings,
         storage_.context.getSettings().min_bytes_to_use_direct_io),
     columns_list(columns_list_), part_path(part_path_)
 {
@@ -331,12 +331,12 @@ MergedBlockOutputStream::MergedBlockOutputStream(
     MergeTreeData & storage_,
     String part_path_,
     const NamesAndTypesList & columns_list_,
-    CompressionMethod compression_method,
+    CompressionSettings compression_settings,
     const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
     size_t aio_threshold_)
     : IMergedBlockOutputStream(
         storage_, storage_.context.getSettings().min_compress_block_size,
-        storage_.context.getSettings().max_compress_block_size, compression_method,
+        storage_.context.getSettings().max_compress_block_size, compression_settings,
         aio_threshold_), columns_list(columns_list_), part_path(part_path_)
 {
@@ -556,10 +556,10 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
 /// Implementation of MergedColumnOnlyOutputStream.
 
 MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
-    MergeTreeData & storage_, String part_path_, bool sync_, CompressionMethod compression_method, bool skip_offsets_)
+    MergeTreeData & storage_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_)
     : IMergedBlockOutputStream(
         storage_, storage_.context.getSettings().min_compress_block_size,
-        storage_.context.getSettings().max_compress_block_size, compression_method,
+        storage_.context.getSettings().max_compress_block_size, compression_settings,
         storage_.context.getSettings().min_bytes_to_use_direct_io),
     part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_)
 {
diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h
index e95c6042b4f..869570dfacd 100644
--- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h
+++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h
@@ -20,7 +20,7 @@ public:
         MergeTreeData & storage_,
         size_t min_compress_block_size_,
         size_t max_compress_block_size_,
-        CompressionMethod compression_method_,
+        CompressionSettings compression_settings_,
         size_t aio_threshold_);
 
 protected:
@@ -35,7 +35,7 @@ protected:
             const std::string & marks_path,
             const std::string & marks_file_extension_,
             size_t max_compress_block_size,
-            CompressionMethod compression_method,
+            CompressionSettings compression_settings,
             size_t estimated_size,
             size_t aio_threshold);
 
@@ -81,7 +81,7 @@ protected:
 
     size_t aio_threshold;
 
-    CompressionMethod compression_method;
+    CompressionSettings compression_settings;
 
 private:
     /// Internal version of writeData.
@@ -103,13 +103,13 @@ public:
         MergeTreeData & storage_,
         String part_path_,
         const NamesAndTypesList & columns_list_,
-        CompressionMethod compression_method);
+        CompressionSettings compression_settings);
 
     MergedBlockOutputStream(
         MergeTreeData & storage_,
         String part_path_,
         const NamesAndTypesList & columns_list_,
-        CompressionMethod compression_method,
+        CompressionSettings compression_settings,
         const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
         size_t aio_threshold_);
 
@@ -158,7 +158,7 @@ class MergedColumnOnlyOutputStream : public IMergedBlockOutputStream
 {
 public:
     MergedColumnOnlyOutputStream(
-        MergeTreeData & storage_, String part_path_, bool sync_, CompressionMethod compression_method, bool skip_offsets_);
+        MergeTreeData & storage_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_);
 
     void write(const Block & block) override;
     void writeSuffix() override;
diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp
index 9361121844f..93af0b9a8c4 100644
--- a/dbms/src/Storages/StorageLog.cpp
+++ b/dbms/src/Storages/StorageLog.cpp
@@ -168,7 +168,7 @@ private:
     {
         Stream(const std::string & data_path, size_t max_compress_block_size) :
             plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
-            compressed(plain, CompressionMethod::LZ4, max_compress_block_size)
+            compressed(plain, CompressionSettings(CompressionMethod::LZ4), max_compress_block_size)
         {
             plain_offset = Poco::File(data_path).getSize();
         }
diff --git a/dbms/src/Storages/StorageStripeLog.cpp b/dbms/src/Storages/StorageStripeLog.cpp
index 351daa0e999..3442eed7966 100644
--- a/dbms/src/Storages/StorageStripeLog.cpp
+++ b/dbms/src/Storages/StorageStripeLog.cpp
@@ -117,7 +117,7 @@ public:
     explicit StripeLogBlockOutputStream(StorageStripeLog & storage_)
         : storage(storage_), lock(storage.rwlock),
         data_out_compressed(storage.full_path() + "data.bin", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT),
-        data_out(data_out_compressed, CompressionMethod::LZ4, storage.max_compress_block_size),
+        data_out(data_out_compressed, CompressionSettings(CompressionMethod::LZ4), storage.max_compress_block_size),
         index_out_compressed(storage.full_path() + "index.mrk", INDEX_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT),
         index_out(index_out_compressed),
         block_out(data_out, 0, &index_out, Poco::File(storage.full_path() + "data.bin").getSize())
diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp
index f0ce7d48281..a7422918b0a 100644
--- a/dbms/src/Storages/StorageTinyLog.cpp
+++ b/dbms/src/Storages/StorageTinyLog.cpp
@@ -126,7 +126,7 @@ private:
     {
         Stream(const std::string & data_path, size_t max_compress_block_size) :
             plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
-            compressed(plain, CompressionMethod::LZ4, max_compress_block_size)
+            compressed(plain, CompressionSettings(CompressionMethod::LZ4), max_compress_block_size)
         {
         }
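
Usage sketch (not from the patch itself): the snippet below mirrors the Compressor.cpp change above and shows how the reworked API fits together, with CompressionSettings bundling the method and its level and CompressedWriteBuffer taking the whole settings object instead of a bare CompressionMethod. The CompressionSettings constructors, getDefaultLevel() (1 for ZSTD, 0 for LZ4HC, -1 for LZ4) and the CompressedWriteBuffer signature are taken from the diff; the include paths, the main() wrapper and the 1 MiB block size are assumptions.

#include <unistd.h>

#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/CompressionSettings.h>
#include <IO/copyData.h>

int main()
{
    /// Explicit method + level; CompressionSettings(method) alone would fall back to
    /// CompressionSettings::getDefaultLevel(method), i.e. 1 for ZSTD, -1 for LZ4.
    DB::CompressionSettings settings(DB::CompressionMethod::ZSTD, 5);

    DB::ReadBufferFromFileDescriptor in(STDIN_FILENO);
    DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO);

    /// 1 MiB uncompressed block size (assumed value), analogous to --block-size in clickhouse-compressor.
    DB::CompressedWriteBuffer compressed(out, settings, 1048576);

    DB::copyData(in, compressed);
    compressed.next();    /// Flush the last block; the destructor would also do this.

    return 0;
}

The same pair of knobs drives the per-part selection: CompressionSettingsSelector reads min_part_size, min_part_size_ratio, method and the new optional level from each case under the compression section of the server config, and Context::chooseCompressionSettings() returns the settings of the last case whose conditions match.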