Better interface

This commit is contained in:
alesapin 2020-08-28 12:07:20 +03:00
parent 32db38b4d2
commit 77faf9587f
14 changed files with 184 additions and 66 deletions

View File

@ -106,6 +106,15 @@ void CompressionCodecMultiple::doDecompressData(const char * source, UInt32 sour
memcpy(dest, compressed_buf.data(), decompressed_size);
}
std::vector<uint8_t> CompressionCodecMultiple::getCodecsBytesFromData(const char * source)
{
std::vector<uint8_t> result;
uint8_t compression_methods_size = source[0];
for (size_t i = 0; i < compression_methods_size; ++i)
result.push_back(source[1 + i]);
return result;
}
bool CompressionCodecMultiple::isCompression() const
{
for (const auto & codec : codecs)

View File

@ -17,6 +17,8 @@ public:
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
static std::vector<uint8_t> getCodecsBytesFromData(const char * source);
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;

View File

@ -0,0 +1,43 @@
#include <Compression/getCompressionCodecForFile.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Compression/CompressionCodecMultiple.h>
#include <Common/PODArray.h>
#include <common/logger_useful.h>
#include <Common/UInt128.h>
namespace DB
{
using Checksum = CityHash_v1_0_2::uint128;
CompressionCodecPtr getCompressionCodecForFile(const DiskPtr & disk, const String & relative_path)
{
auto read_buffer = disk->readFile(relative_path);
read_buffer->ignore(sizeof(Checksum));
UInt8 header_size = ICompressionCodec::getHeaderSize();
PODArray<char> compressed_buffer;
compressed_buffer.resize(header_size);
read_buffer->readStrict(compressed_buffer.data(), header_size);
uint8_t method = ICompressionCodec::readMethod(compressed_buffer.data());
if (method == static_cast<uint8_t>(CompressionMethodByte::Multiple))
{
compressed_buffer.resize(1);
read_buffer->readStrict(compressed_buffer.data(), 1);
compressed_buffer.resize(1 + compressed_buffer[0]);
read_buffer->readStrict(compressed_buffer.data() + 1, compressed_buffer[0]);
auto codecs_bytes = CompressionCodecMultiple::getCodecsBytesFromData(compressed_buffer.data());
Codecs codecs;
for (auto byte : codecs_bytes)
codecs.push_back(CompressionCodecFactory::instance().get(byte));
return std::make_shared<CompressionCodecMultiple>(codecs);
}
return CompressionCodecFactory::instance().get(method);
}
}

View File

@ -0,0 +1,15 @@
#pragma once
#include <Compression/ICompressionCodec.h>
#include <Disks/IDisk.h>
namespace DB
{
/// Return compression codec with default parameters for file compressed in
/// clickhouse fashion (with checksums, headers for each block, etc). This
/// method should be used as fallback when we cannot deduce compression codec
/// from metadata.
CompressionCodecPtr getCompressionCodecForFile(const DiskPtr & disk, const String & relative_path);
}

View File

@ -29,6 +29,7 @@ SRCS(
CompressionFactory.cpp
ICompressionCodec.cpp
LZ4_decompress_faster.cpp
getCompressionCodecForFile.cpp
)

View File

@ -404,6 +404,16 @@ std::optional<ColumnDefault> ColumnsDescription::getDefault(const String & colum
}
bool ColumnsDescription::hasCompressionCodec(const String & column_name) const
{
const auto it = columns.get<1>().find(column_name);
if (it == columns.get<1>().end() || !it->codec)
return false;
return true;
}
CompressionCodecPtr ColumnsDescription::getCodecOrDefault(const String & column_name, CompressionCodecPtr default_codec) const
{
const auto it = columns.get<1>().find(column_name);

View File

@ -111,6 +111,8 @@ public:
bool hasDefault(const String & column_name) const;
std::optional<ColumnDefault> getDefault(const String & column_name) const;
/// Does column has non default specified compression codec
bool hasCompressionCodec(const String & column_name) const;
CompressionCodecPtr getCodecOrDefault(const String & column_name, CompressionCodecPtr default_codec) const;
CompressionCodecPtr getCodecOrDefault(const String & column_name) const;

View File

@ -316,7 +316,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
new_data_part->minmax_idx.update(block, data.minmax_idx_columns);
new_data_part->partition.create(metadata_snapshot, block, 0);
MergedBlockOutputStream part_out(new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, nullptr);
MergedBlockOutputStream part_out(new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));
part_out.writePrefix();
part_out.write(block);
part_out.writeSuffixAndFinalizePart(new_data_part);
@ -401,8 +401,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
new_data_part->is_temp = true;
new_data_part->modification_time = time(nullptr);
new_data_part->loadColumnsChecksumsIndexes(true, false);
if (!new_data_part->loadDefaultCompressionCodec())
new_data_part->detectAndSetDefaultCompressionCodec(data.getTotalActiveSizeInBytes());
new_data_part->checksums.checkEqual(checksums, false);
return new_data_part;

View File

@ -13,6 +13,8 @@
#include <Common/escapeForFileName.h>
#include <common/JSON.h>
#include <common/logger_useful.h>
#include <Compression/getCompressionCodecForFile.h>
#include <Parsers/queryToString.h>
namespace DB
{
@ -416,6 +418,8 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
loadTTLInfos();
if (check_consistency)
checkConsistency(require_columns_checksums);
loadDefaultCompressionCodec();
}
void IMergeTreeDataPart::loadIndexGranularity()
@ -482,56 +486,73 @@ NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
return result;
}
bool IMergeTreeDataPart::loadDefaultCompressionCodec()
void IMergeTreeDataPart::loadDefaultCompressionCodec()
{
/// In memory parts doesn't have any compression
if (!isStoredOnDisk())
{
default_codec = CompressionCodecFactory::instance().get("NONE", {});
return true;
return;
}
String path = getFullRelativePath() + DEFAULT_COMPRESSION_CODEC_FILE_NAME;
if (!volume->getDisk()->exists(path))
return false;
auto file_buf = openForReading(volume->getDisk(), path);
String codec_line;
readEscapedStringUntilEOL(codec_line, *file_buf);
ReadBufferFromString buf(codec_line);
if (!checkString("CODEC", buf))
{
LOG_WARNING(storage.log, "Cannot parse default codec for part {} from file {}, content '{}'. Default compression codec will be deduced automatically, from compression section in config.xml.", name, path, codec_line);
return false;
default_codec = detectDefaultCompressionCodec();
}
else
{
try
{
ParserCodec codec_parser;
auto codec_ast = parseQuery(codec_parser, codec_line.data() + buf.getPosition(), codec_line.data() + codec_line.length(), "codec parser", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
default_codec = CompressionCodecFactory::instance().get(codec_ast, {});
return true;
}
catch (const DB::Exception & ex)
{
LOG_WARNING(storage.log, "Cannot parse default codec for part {} from file {}, content '{}', error '{}'. Default compression codec will be deduced automatically, from compression section in config.xml.", name, path, codec_line, ex.what());
return false;
auto file_buf = openForReading(volume->getDisk(), path);
String codec_line;
readEscapedStringUntilEOL(codec_line, *file_buf);
ReadBufferFromString buf(codec_line);
if (!checkString("CODEC", buf))
{
LOG_WARNING(storage.log, "Cannot parse default codec for part {} from file {}, content '{}'. Default compression codec will be deduced automatically, from data on disk", name, path, codec_line);
default_codec = detectDefaultCompressionCodec();
}
try
{
ParserCodec codec_parser;
auto codec_ast = parseQuery(codec_parser, codec_line.data() + buf.getPosition(), codec_line.data() + codec_line.length(), "codec parser", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
default_codec = CompressionCodecFactory::instance().get(codec_ast, {});
}
catch (const DB::Exception & ex)
{
LOG_WARNING(storage.log, "Cannot parse default codec for part {} from file {}, content '{}', error '{}'. Default compression codec will be deduced automatically, from data on disk.", name, path, codec_line, ex.what());
default_codec = detectDefaultCompressionCodec();
}
}
}
void IMergeTreeDataPart::detectAndSetDefaultCompressionCodec(size_t total_storage_size)
CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
{
/// In memory parts doesn't have any compression
if (isStoredOnDisk())
default_codec
= storage.global_context.chooseCompressionCodec(getBytesOnDisk(),
static_cast<double>(getBytesOnDisk()) / total_storage_size);
else
default_codec = CompressionCodecFactory::instance().get("NONE", {});
if (!isStoredOnDisk())
return CompressionCodecFactory::instance().get("NONE", {});
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
const auto & storage_columns = metadata_snapshot->getColumns();
CompressionCodecPtr result = nullptr;
for (const auto & part_column : columns)
{
/// It was compressed with default codec
if (!storage_columns.hasCompressionCodec(part_column.name))
{
result = getCompressionCodecForFile(volume->getDisk(), getFullRelativePath() + getFileNameForColumn(part_column) + ".bin");
break;
}
}
if (!result)
result = CompressionCodecFactory::instance().getDefaultCodec();
return result;
}
void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()

View File

@ -332,19 +332,11 @@ public:
String getRelativePathForPrefix(const String & prefix) const;
/// Detect default codec for part based on compression section from
/// config.xml (require total size of all parts in table)
void detectAndSetDefaultCompressionCodec(size_t total_storage_size);
/// Return set of metadat file names without checksums. For example,
/// columns.txt or checksums.txt itself.
NameSet getFileNamesWithoutChecksums() const;
/// Load default compression codec from file default_compression_codec.txt
/// return false if load was not successful (for example file doesn't
/// exists)
bool loadDefaultCompressionCodec();
/// File with compression codec name which was used to compress part columns
/// by default. Some columns may have their own compression codecs, but
/// default will be stored in this file.
@ -403,6 +395,15 @@ private:
void loadTTLInfos();
void loadPartitionAndMinMaxIndex();
/// Load default compression codec from file default_compression_codec.txt
/// if it not exists tries to deduce codec from compressed column without
/// any specifial compression.
void loadDefaultCompressionCodec();
/// Found column without specific compression and return codec
/// for this column with default parameters.
CompressionCodecPtr detectDefaultCompressionCodec() const;
};
using MergeTreeDataPartState = IMergeTreeDataPart::State;

View File

@ -751,12 +751,10 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
DataPartsVector broken_parts_to_remove;
DataPartsVector broken_parts_to_detach;
MutableDataPartsVector parts_without_default_compression;
size_t suspicious_broken_parts = 0;
std::atomic<bool> has_adaptive_parts = false;
std::atomic<bool> has_non_adaptive_parts = false;
std::atomic<size_t> total_active_parts_size = 0;
ThreadPool pool(num_threads);
@ -789,13 +787,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
try
{
part->loadColumnsChecksumsIndexes(require_part_metadata, true);
/// If it was not successful we will try to get default
/// compression from config.xml
if (!part->loadDefaultCompressionCodec())
{
std::lock_guard loading_lock(mutex);
parts_without_default_compression.push_back(part);
}
}
catch (const Exception & e)
{
@ -878,7 +869,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
std::lock_guard loading_lock(mutex);
if (!data_parts_indexes.insert(part).second)
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
total_active_parts_size += part->getBytesOnDisk();
});
}
@ -897,11 +887,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
}
/// Deduce codec based on part size and total parts size with rules from
/// config.xml
for (auto & part : parts_without_default_compression)
part->detectAndSetDefaultCompressionCodec(total_active_parts_size);
if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts)
throw Exception("Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled", ErrorCodes::LOGICAL_ERROR);
@ -2433,14 +2418,12 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
}
static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part, size_t total_active_parts_size)
static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part)
{
auto disk = part->volume->getDisk();
String full_part_path = part->getFullRelativePath();
part->loadColumnsChecksumsIndexes(false, true);
if (!part->loadDefaultCompressionCodec())
part->detectAndSetDefaultCompressionCodec(total_active_parts_size);
part->modification_time = disk->getLastModified(full_part_path).epochTime();
}
@ -2915,13 +2898,12 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
MutableDataPartsVector loaded_parts;
loaded_parts.reserve(renamed_parts.old_and_new_names.size());
size_t total_active_parts_size = getTotalActiveSizeInBytes();
for (const auto & part_names : renamed_parts.old_and_new_names)
{
LOG_DEBUG(log, "Checking part {}", part_names.second);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_names.first, name_to_disk[part_names.first]);
MutableDataPartPtr part = createPart(part_names.first, single_disk_volume, source_dir + part_names.second);
loadPartAndFixMetadataImpl(part, total_active_parts_size);
loadPartAndFixMetadataImpl(part);
loaded_parts.push_back(part);
}
@ -3291,7 +3273,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
dst_data_part->is_temp = true;
dst_data_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
dst_data_part->default_codec = src_part->default_codec;
dst_data_part->modification_time = disk->getLastModified(dst_part_path).epochTime();
return dst_data_part;
}

View File

@ -122,7 +122,8 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk());
new_part->index_granularity = writer->getIndexGranularity();
new_part->calculateColumnsSizesOnDisk();
new_part->default_codec = default_codec;
if (default_codec != nullptr)
new_part->default_codec = default_codec;
}
void MergedBlockOutputStream::finalizePartOnDisk(
@ -165,10 +166,16 @@ void MergedBlockOutputStream::finalizePartOnDisk(
part_columns.writeText(*out);
}
if (default_codec != nullptr)
{
auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
DB::writeText(queryToString(default_codec->getFullCodecDesc()), *out);
}
else
{
throw Exception("Compression codec have to be specified for part on disk, empty for" + new_part->name
+ ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
}
{
/// Write file with checksums.

View File

@ -8,6 +8,7 @@ cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/default_compression.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/default_compression.xml'], with_zookeeper=True)
node3 = cluster.add_instance('node3', main_configs=['configs/default_compression.xml'], image='yandex/clickhouse-server:20.3.16', stay_alive=True, with_installed_binary=True)
@pytest.fixture(scope="module")
@ -172,3 +173,28 @@ def test_default_codec_multiple(start_cluster):
assert node1.query("SELECT COUNT() FROM compression_table_multiple") == "3\n"
assert node2.query("SELECT COUNT() FROM compression_table_multiple") == "3\n"
def test_default_codec_version_update(start_cluster):
node3.query("""
CREATE TABLE compression_table (
key UInt64 CODEC(LZ4HC(7)),
data1 String
) ENGINE = MergeTree ORDER BY tuple() PARTITION BY key;
""")
node3.query("INSERT INTO compression_table VALUES (1, 'x')")
node3.query("INSERT INTO compression_table VALUES (2, '{}')".format(get_random_string(2048)))
node3.query("INSERT INTO compression_table VALUES (3, '{}')".format(get_random_string(22048)))
node3.restart_with_latest_version()
assert node3.query("SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_1_1_0'") == "ZSTD(1)\n"
assert node3.query("SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_2_2_0'") == "ZSTD(1)\n"
assert node3.query("SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_3_3_0'") == "ZSTD(1)\n"
node3.query("OPTIMIZE TABLE compression_table FINAL")
assert node3.query("SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_1_1_1'") == "ZSTD(10)\n"
assert node3.query("SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_2_2_1'") == "LZ4HC(5)\n"
assert node3.query("SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_3_3_1'") == "LZ4\n"

View File

@ -84,6 +84,8 @@ def partition_complex_assert_checksums():
"77d5af402ada101574f4da114f242e02\tshadow/1/data/test/partition/19700201_1_1_0/columns.txt\n" \
"88cdc31ded355e7572d68d8cde525d3a\tshadow/1/data/test/partition/19700201_1_1_0/p.bin\n" \
"9e688c58a5487b8eaf69c9e1005ad0bf\tshadow/1/data/test/partition/19700102_2_2_0/primary.idx\n" \
"c0904274faa8f3f06f35666cc9c5bd2f\tshadow/1/data/test/partition/19700102_2_2_0/default_compression_codec.txt\n" \
"c0904274faa8f3f06f35666cc9c5bd2f\tshadow/1/data/test/partition/19700201_1_1_0/default_compression_codec.txt\n" \
"c4ca4238a0b923820dcc509a6f75849b\tshadow/1/data/test/partition/19700102_2_2_0/count.txt\n" \
"c4ca4238a0b923820dcc509a6f75849b\tshadow/1/data/test/partition/19700201_1_1_0/count.txt\n" \
"cfcb770c3ecd0990dcceb1bde129e6c6\tshadow/1/data/test/partition/19700102_2_2_0/p.bin\n" \