dbms: CompressionSettingsSelector. [#METR-21516]

This commit is contained in:
Vladimir Chebotarev 2017-10-13 04:02:16 +03:00 committed by alexey-milovidov
parent 4e313f3732
commit 1e0241e262
15 changed files with 101 additions and 70 deletions

View File

@ -59,7 +59,7 @@ void CompressedWriteBuffer::nextImpl()
&compressed_buffer[header_size],
uncompressed_size,
LZ4_COMPRESSBOUND(uncompressed_size),
0);
compression_settings.level);
UInt32 compressed_size_32 = compressed_size;
UInt32 uncompressed_size_32 = uncompressed_size;
@ -83,7 +83,7 @@ void CompressedWriteBuffer::nextImpl()
compressed_buffer.size() - header_size,
working_buffer.begin(),
uncompressed_size,
compression_settings.zstd_level);
compression_settings.level);
if (ZSTD_isError(res))
throw Exception("Cannot compress block with ZSTD: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_COMPRESS);

View File

@ -10,20 +10,43 @@ CompressionSettings::CompressionSettings()
{
}
CompressionSettings::CompressionSettings(CompressionMethod method):
method(method)
{
}
CompressionSettings::CompressionSettings(CompressionMethod method, int zstd_level):
CompressionSettings::CompressionSettings(CompressionMethod method, int level):
method(method),
zstd_level(zstd_level)
level(level)
{
}
CompressionSettings::CompressionSettings(const Settings & settings):
CompressionSettings(settings.network_compression_method, settings.network_zstd_compression_level)
CompressionSettings::CompressionSettings(CompressionMethod method):
CompressionSettings(method, getDefaultLevel(method))
{
}
CompressionSettings::CompressionSettings(const Settings & settings)
{
method = settings.network_compression_method;
switch (method)
{
case CompressionMethod::ZSTD:
level = settings.network_zstd_compression_level;
break;
default:
level = getDefaultLevel(method);
}
}
int CompressionSettings::getDefaultLevel(CompressionMethod method)
{
switch (method)
{
case CompressionMethod::LZ4:
return -1;
case CompressionMethod::LZ4HC:
return 0;
case CompressionMethod::ZSTD:
return 1;
default:
return -1;
}
}
}

View File

@ -11,12 +11,14 @@ class Settings;
struct CompressionSettings
{
CompressionMethod method = CompressionMethod::LZ4;
int zstd_level = 1;
int level;
CompressionSettings();
CompressionSettings(CompressionMethod method);
CompressionSettings(CompressionMethod method, int zstd_level);
CompressionSettings(CompressionMethod method, int level);
CompressionSettings(const Settings & settings);
static int getDefaultLevel(CompressionMethod method);
};
}

View File

@ -16,13 +16,14 @@
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <DataStreams/FormatFactory.h>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Storages/MergeTree/ReshardingWorker.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/CompressionMethodSelector.h>
#include <Storages/CompressionSettingsSelector.h>
#include <Interpreters/Settings.h>
#include <Interpreters/Users.h>
#include <Interpreters/Quota.h>
@ -42,7 +43,6 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Databases/IDatabase.h>
#include <Common/ConfigProcessor.h>
#include <Common/ZooKeeper/ZooKeeper.h>
@ -126,8 +126,8 @@ struct ContextShared
Macros macros; /// Substitutions extracted from config.
std::unique_ptr<Compiler> compiler; /// Used for dynamic compilation of queries' parts if it necessary.
std::shared_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression method, depending on the size of the part.
mutable std::unique_ptr<CompressionMethodSelector> compression_method_selector;
/// Rules for selecting the compression settings, depending on the size of the part.
mutable std::unique_ptr<CompressionSettingsSelector> compression_settings_selector;
std::unique_ptr<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
@ -1384,22 +1384,22 @@ PartLog * Context::getPartLog(const String & database, const String & table)
}
CompressionMethod Context::chooseCompressionMethod(size_t part_size, double part_size_ratio) const
CompressionSettings Context::chooseCompressionSettings(size_t part_size, double part_size_ratio) const
{
auto lock = getLock();
if (!shared->compression_method_selector)
if (!shared->compression_settings_selector)
{
constexpr auto config_name = "compression";
auto & config = getConfigRef();
if (config.has(config_name))
shared->compression_method_selector = std::make_unique<CompressionMethodSelector>(config, "compression");
shared->compression_settings_selector = std::make_unique<CompressionSettingsSelector>(config, "compression");
else
shared->compression_method_selector = std::make_unique<CompressionMethodSelector>();
shared->compression_settings_selector = std::make_unique<CompressionSettingsSelector>();
}
return shared->compression_method_selector->choose(part_size, part_size_ratio);
return shared->compression_settings_selector->choose(part_size, part_size_ratio);
}

View File

@ -11,7 +11,7 @@
#include <Core/NamesAndTypes.h>
#include <Interpreters/Settings.h>
#include <Interpreters/ClientInfo.h>
#include <IO/CompressedStream.h>
#include <IO/CompressionSettings.h>
namespace Poco
@ -328,8 +328,8 @@ public:
void setMaxTableSizeToDrop(size_t max_size);
void checkTableCanBeDropped(const String & database, const String & table, size_t table_size);
/// Lets you select the compression method according to the conditions described in the configuration file.
CompressionMethod chooseCompressionMethod(size_t part_size, double part_size_ratio) const;
/// Lets you select the compression settings according to the conditions described in the configuration file.
CompressionSettings chooseCompressionSettings(size_t part_size, double part_size_ratio) const;
/// Get the server uptime in seconds.
time_t getUptimeSeconds() const;

View File

@ -61,6 +61,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
("block-size,b", boost::program_options::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
("hc", "use LZ4HC instead of LZ4")
("zstd", "use ZSTD instead of LZ4")
("level", "compression level")
("none", "use no compression instead of LZ4")
("stat", "print block statistics of compressed data")
;
@ -93,6 +94,8 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
else if (use_none)
method = DB::CompressionMethod::NONE;
DB::CompressionSettings settings(method, options.count("level") > 0 ? options["level"].as<int>() : DB::CompressionSettings::getDefaultLevel(method));
DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO);
DB::WriteBufferFromFileDescriptor wb(STDOUT_FILENO);
@ -110,7 +113,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
else
{
/// Compression
DB::CompressedWriteBuffer to(wb, method, block_size);
DB::CompressedWriteBuffer to(wb, settings, block_size);
DB::copyData(rb, to);
}
}

View File

@ -15,7 +15,7 @@ namespace ErrorCodes
}
/** Allows you to select the compression method for the conditions specified in the configuration file.
/** Allows you to select the compression settings for the conditions specified in the configuration file.
* The config looks like this
<compression>
@ -29,6 +29,7 @@ namespace ErrorCodes
<! - Which compression method to choose. ->
<method>zstd</method>
<level>2</level>
</case>
<case>
@ -36,23 +37,23 @@ namespace ErrorCodes
</case>
</compression>
*/
class CompressionMethodSelector
class CompressionSettingsSelector
{
private:
struct Element
{
size_t min_part_size = 0;
double min_part_size_ratio = 0;
CompressionMethod method = CompressionMethod::LZ4;
CompressionSettings settings = CompressionSettings(CompressionMethod::LZ4);
void setMethod(const std::string & name)
static CompressionMethod compressionMethodFromString(const std::string & name)
{
if (name == "lz4")
method = CompressionMethod::LZ4;
return CompressionMethod::LZ4;
else if (name == "zstd")
method = CompressionMethod::ZSTD;
return CompressionMethod::ZSTD;
else if (name == "none")
method = CompressionMethod::NONE;
return CompressionMethod::NONE;
else
throw Exception("Unknown compression method " + name, ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
}
@ -62,7 +63,9 @@ private:
min_part_size = config.getUInt64(config_prefix + ".min_part_size", 0);
min_part_size_ratio = config.getDouble(config_prefix + ".min_part_size_ratio", 0);
setMethod(config.getString(config_prefix + ".method"));
CompressionMethod method = compressionMethodFromString(config.getString(config_prefix + ".method"));
int level = config.getInt64(config_prefix + ".level", CompressionSettings::getDefaultLevel(method));
settings = CompressionSettings(method, level);
}
bool check(size_t part_size, double part_size_ratio) const
@ -75,9 +78,9 @@ private:
std::vector<Element> elements;
public:
CompressionMethodSelector() {} /// Always returns the default method.
CompressionSettingsSelector() {} /// Always returns the default method.
CompressionMethodSelector(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
CompressionSettingsSelector(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
@ -91,13 +94,13 @@ public:
}
}
CompressionMethod choose(size_t part_size, double part_size_ratio) const
CompressionSettings choose(size_t part_size, double part_size_ratio) const
{
CompressionMethod res = CompressionMethod::LZ4;
CompressionSettings res = CompressionSettings(CompressionMethod::LZ4);
for (const auto & element : elements)
if (element.check(part_size, part_size_ratio))
res = element.method;
res = element.settings;
return res;
}

View File

@ -1123,7 +1123,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
*this, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, expression->getRequiredColumns(), ranges,
false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
auto compression_method = this->context.chooseCompressionMethod(
auto compression_settings = this->context.chooseCompressionSettings(
part->size_in_bytes,
static_cast<double>(part->size_in_bytes) / this->getTotalActiveSizeInBytes());
ExpressionBlockInputStream in(part_in, expression);
@ -1135,7 +1135,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
* temporary column name ('converting_column_name') created in 'createConvertExpression' method
* will have old name of shared offsets for arrays.
*/
MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, compression_method, true /* skip_offsets */);
MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, compression_settings, true /* skip_offsets */);
in.readPrefix();
out.writePrefix();

View File

@ -619,12 +619,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
if (deduplicate && merged_stream->isGroupedOutput())
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, Limits(), 0 /*limit_hint*/, Names());
auto compression_method = data.context.chooseCompressionMethod(
auto compression_settings = data.context.chooseCompressionSettings(
merge_entry->total_size_bytes_compressed,
static_cast<double> (merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
MergedBlockOutputStream to{
data, new_part_tmp_path, merging_columns, compression_method, merged_column_to_size, aio_threshold};
data, new_part_tmp_path, merging_columns, compression_settings, merged_column_to_size, aio_threshold};
merged_stream->readPrefix();
to.writePrefix();
@ -702,7 +702,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
rows_sources_read_buf.seek(0, 0);
ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);
MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, false, compression_method, offset_written);
MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, false, compression_settings, offset_written);
size_t column_elems_written = 0;
column_to.writePrefix();
@ -915,7 +915,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
/// A very rough estimate for the compressed data size of each sharded partition.
/// Actually it all depends on the properties of the expression for sharding.
UInt64 per_shard_size_bytes_compressed = merge_entry->total_size_bytes_compressed / static_cast<double>(job.paths.size());
auto compression_method = data.context.chooseCompressionMethod(
auto compression_settings = data.context.chooseCompressionSettings(
per_shard_size_bytes_compressed,
static_cast<double>(per_shard_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
@ -949,7 +949,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
MergedBlockOutputStreamPtr output_stream;
output_stream = std::make_unique<MergedBlockOutputStream>(
data, new_part_tmp_path, column_names_and_types, compression_method, merged_column_to_size, aio_threshold);
data, new_part_tmp_path, column_names_and_types, compression_settings, merged_column_to_size, aio_threshold);
per_shard_data_parts.emplace(shard_no, std::move(data_part));
per_shard_output.emplace(shard_no, std::move(output_stream));

View File

@ -196,10 +196,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_method = data.context.chooseCompressionMethod(0, 0);
auto compression_settings = data.context.chooseCompressionSettings(0, 0);
NamesAndTypesList columns = data.getColumnsList().filter(block.getColumnsList().getNames());
MergedBlockOutputStream out(data, new_data_part->getFullPath(), columns, compression_method);
MergedBlockOutputStream out(data, new_data_part->getFullPath(), columns, compression_settings);
out.writePrefix();
out.writeWithPermutation(block, perm_ptr);

View File

@ -31,13 +31,13 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
MergeTreeData & storage_,
size_t min_compress_block_size_,
size_t max_compress_block_size_,
CompressionMethod compression_method_,
CompressionSettings compression_settings_,
size_t aio_threshold_)
: storage(storage_),
min_compress_block_size(min_compress_block_size_),
max_compress_block_size(max_compress_block_size_),
aio_threshold(aio_threshold_),
compression_method(compression_method_)
compression_settings(compression_settings_)
{
}
@ -69,7 +69,7 @@ void IMergedBlockOutputStream::addStream(
path + escaped_column_name, NULL_MAP_EXTENSION,
path + escaped_column_name, NULL_MARKS_FILE_EXTENSION,
max_compress_block_size,
compression_method,
compression_settings,
estimated_size,
aio_threshold);
@ -91,7 +91,7 @@ void IMergedBlockOutputStream::addStream(
path + escaped_size_name, DATA_FILE_EXTENSION,
path + escaped_size_name, MARKS_FILE_EXTENSION,
max_compress_block_size,
compression_method,
compression_settings,
estimated_size,
aio_threshold);
}
@ -105,7 +105,7 @@ void IMergedBlockOutputStream::addStream(
path + escaped_column_name, DATA_FILE_EXTENSION,
path + escaped_column_name, MARKS_FILE_EXTENSION,
max_compress_block_size,
compression_method,
compression_settings,
estimated_size,
aio_threshold);
}
@ -269,14 +269,14 @@ IMergedBlockOutputStream::ColumnStream::ColumnStream(
const std::string & marks_path,
const std::string & marks_file_extension_,
size_t max_compress_block_size,
CompressionMethod compression_method,
CompressionSettings compression_settings,
size_t estimated_size,
size_t aio_threshold) :
escaped_column_name(escaped_column_name_),
data_file_extension{data_file_extension_},
marks_file_extension{marks_file_extension_},
plain_file(createWriteBufferFromFileBase(data_path + data_file_extension, estimated_size, aio_threshold, max_compress_block_size)),
plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_method), compressed(compressed_buf),
plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_settings), compressed(compressed_buf),
marks_file(marks_path + marks_file_extension, 4096, O_TRUNC | O_CREAT | O_WRONLY), marks(marks_file)
{
}
@ -315,10 +315,10 @@ MergedBlockOutputStream::MergedBlockOutputStream(
MergeTreeData & storage_,
String part_path_,
const NamesAndTypesList & columns_list_,
CompressionMethod compression_method)
CompressionSettings compression_settings)
: IMergedBlockOutputStream(
storage_, storage_.context.getSettings().min_compress_block_size,
storage_.context.getSettings().max_compress_block_size, compression_method,
storage_.context.getSettings().max_compress_block_size, compression_settings,
storage_.context.getSettings().min_bytes_to_use_direct_io),
columns_list(columns_list_), part_path(part_path_)
{
@ -331,12 +331,12 @@ MergedBlockOutputStream::MergedBlockOutputStream(
MergeTreeData & storage_,
String part_path_,
const NamesAndTypesList & columns_list_,
CompressionMethod compression_method,
CompressionSettings compression_settings,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
size_t aio_threshold_)
: IMergedBlockOutputStream(
storage_, storage_.context.getSettings().min_compress_block_size,
storage_.context.getSettings().max_compress_block_size, compression_method,
storage_.context.getSettings().max_compress_block_size, compression_settings,
aio_threshold_),
columns_list(columns_list_), part_path(part_path_)
{
@ -556,10 +556,10 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
/// Implementation of MergedColumnOnlyOutputStream.
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
MergeTreeData & storage_, String part_path_, bool sync_, CompressionMethod compression_method, bool skip_offsets_)
MergeTreeData & storage_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_)
: IMergedBlockOutputStream(
storage_, storage_.context.getSettings().min_compress_block_size,
storage_.context.getSettings().max_compress_block_size, compression_method,
storage_.context.getSettings().max_compress_block_size, compression_settings,
storage_.context.getSettings().min_bytes_to_use_direct_io),
part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_)
{

View File

@ -20,7 +20,7 @@ public:
MergeTreeData & storage_,
size_t min_compress_block_size_,
size_t max_compress_block_size_,
CompressionMethod compression_method_,
CompressionSettings compression_settings_,
size_t aio_threshold_);
protected:
@ -35,7 +35,7 @@ protected:
const std::string & marks_path,
const std::string & marks_file_extension_,
size_t max_compress_block_size,
CompressionMethod compression_method,
CompressionSettings compression_settings,
size_t estimated_size,
size_t aio_threshold);
@ -81,7 +81,7 @@ protected:
size_t aio_threshold;
CompressionMethod compression_method;
CompressionSettings compression_settings;
private:
/// Internal version of writeData.
@ -103,13 +103,13 @@ public:
MergeTreeData & storage_,
String part_path_,
const NamesAndTypesList & columns_list_,
CompressionMethod compression_method);
CompressionSettings compression_settings);
MergedBlockOutputStream(
MergeTreeData & storage_,
String part_path_,
const NamesAndTypesList & columns_list_,
CompressionMethod compression_method,
CompressionSettings compression_settings,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
size_t aio_threshold_);
@ -158,7 +158,7 @@ class MergedColumnOnlyOutputStream : public IMergedBlockOutputStream
{
public:
MergedColumnOnlyOutputStream(
MergeTreeData & storage_, String part_path_, bool sync_, CompressionMethod compression_method, bool skip_offsets_);
MergeTreeData & storage_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_);
void write(const Block & block) override;
void writeSuffix() override;

View File

@ -168,7 +168,7 @@ private:
{
Stream(const std::string & data_path, size_t max_compress_block_size) :
plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
compressed(plain, CompressionMethod::LZ4, max_compress_block_size)
compressed(plain, CompressionSettings(CompressionMethod::LZ4), max_compress_block_size)
{
plain_offset = Poco::File(data_path).getSize();
}

View File

@ -117,7 +117,7 @@ public:
explicit StripeLogBlockOutputStream(StorageStripeLog & storage_)
: storage(storage_), lock(storage.rwlock),
data_out_compressed(storage.full_path() + "data.bin", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT),
data_out(data_out_compressed, CompressionMethod::LZ4, storage.max_compress_block_size),
data_out(data_out_compressed, CompressionSettings(CompressionMethod::LZ4), storage.max_compress_block_size),
index_out_compressed(storage.full_path() + "index.mrk", INDEX_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT),
index_out(index_out_compressed),
block_out(data_out, 0, &index_out, Poco::File(storage.full_path() + "data.bin").getSize())

View File

@ -126,7 +126,7 @@ private:
{
Stream(const std::string & data_path, size_t max_compress_block_size) :
plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
compressed(plain, CompressionMethod::LZ4, max_compress_block_size)
compressed(plain, CompressionSettings(CompressionMethod::LZ4), max_compress_block_size)
{
}