Merge pull request #40715 from ClickHouse/compress-marks

Merging #37693: Compress marks and primary key
This commit is contained in:
Alexey Milovidov 2022-09-22 07:30:34 +03:00 committed by GitHub
commit 2a75e025f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 565 additions and 206 deletions

View File

@ -344,7 +344,7 @@ struct PartRangesReadInfo
sum_marks_in_parts[i] = parts[i].getMarksCount();
sum_marks += sum_marks_in_parts[i];
if (parts[i].data_part->index_granularity_info.is_adaptive)
if (parts[i].data_part->index_granularity_info.mark_type.adaptive)
++adaptive_parts;
}

View File

@ -67,6 +67,7 @@ namespace ErrorCodes
extern const int NO_SUCH_COLUMN_IN_TABLE;
}
void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const PartMetadataManagerPtr & manager)
{
auto metadata_snapshot = data.getInMemoryMetadataPtr();
@ -319,6 +320,7 @@ IMergeTreeDataPart::IMergeTreeDataPart(
minmax_idx = std::make_shared<MinMaxIndex>();
initializeIndexGranularityInfo();
initializePartMetadataManager();
}
@ -345,6 +347,7 @@ IMergeTreeDataPart::IMergeTreeDataPart(
minmax_idx = std::make_shared<MinMaxIndex>();
initializeIndexGranularityInfo();
initializePartMetadataManager();
}
@ -738,7 +741,7 @@ void IMergeTreeDataPart::loadIndex()
loaded_index[i]->reserve(index_granularity.getMarksCount());
}
String index_name = "primary.idx";
String index_name = "primary" + getIndexExtensionFromFilesystem(data_part_storage).value();
String index_path = fs::path(data_part_storage->getRelativePath()) / index_name;
auto index_file = metadata_manager->read(index_name);
size_t marks_count = index_granularity.getMarksCount();
@ -777,7 +780,10 @@ void IMergeTreeDataPart::appendFilesOfIndex(Strings & files) const
return;
if (metadata_snapshot->hasPrimaryKey())
files.push_back("primary.idx");
{
String index_name = "primary" + getIndexExtensionFromFilesystem(data_part_storage).value();
files.push_back(index_name);
}
}
NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
@ -1428,6 +1434,15 @@ void IMergeTreeDataPart::initializePartMetadataManager()
#endif
}
void IMergeTreeDataPart::initializeIndexGranularityInfo()
{
auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(data_part_storage);
if (mrk_ext)
index_granularity_info = MergeTreeIndexGranularityInfo(storage, MarkType{*mrk_ext});
else
index_granularity_info = MergeTreeIndexGranularityInfo(storage, part_type);
}
void IMergeTreeDataPart::remove() const
{
assert(assertHasValidVersionMetadata());
@ -1536,8 +1551,11 @@ void IMergeTreeDataPart::checkConsistencyBase() const
const auto & partition_key = metadata_snapshot->getPartitionKey();
if (!checksums.empty())
{
if (!pk.column_names.empty() && !checksums.files.contains("primary.idx"))
throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);
if (!pk.column_names.empty()
&& (!checksums.files.contains("primary" + getIndexExtension(false))
&& !checksums.files.contains("primary" + getIndexExtension(true))))
throw Exception("No checksum for " + toString("primary" + getIndexExtension(false)) + " or " + toString("primary" + getIndexExtension(true)),
ErrorCodes::NO_FILE_IN_DATA_PART);
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
@ -1575,7 +1593,10 @@ void IMergeTreeDataPart::checkConsistencyBase() const
/// Check that the primary key index is not empty.
if (!pk.column_names.empty())
check_file_not_empty("primary.idx");
{
String index_name = "primary" + getIndexExtensionFromFilesystem(data_part_storage).value();
check_file_not_empty(index_name);
}
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
@ -1628,7 +1649,7 @@ void IMergeTreeDataPart::calculateSecondaryIndicesSizesOnDisk()
auto index_name_escaped = escapeForFileName(index_name);
auto index_file_name = index_name_escaped + index_ptr->getSerializedFileExtension();
auto index_marks_file_name = index_name_escaped + index_granularity_info.marks_file_extension;
auto index_marks_file_name = index_name_escaped + getMarksFileExtension();
/// If part does not contain index
auto bin_checksum = checksums.files.find(index_file_name);
@ -1788,4 +1809,24 @@ bool isInMemoryPart(const MergeTreeDataPartPtr & data_part)
return (data_part && data_part->getType() == MergeTreeDataPartType::InMemory);
}
std::optional<std::string> getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage)
{
if (data_part_storage->exists())
{
for (auto it = data_part_storage->iterate(); it->isValid(); it->next())
{
const auto & extension = fs::path(it->name()).extension();
if (extension == getIndexExtension(false)
|| extension == getIndexExtension(true))
return extension;
}
}
return {".idx"};
}
bool isCompressedFromIndexExtension(const String & index_extension)
{
return index_extension == getIndexExtension(true);
}
}

View File

@ -23,6 +23,7 @@
#include <shared_mutex>
namespace zkutil
{
class ZooKeeper;
@ -158,7 +159,7 @@ public:
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
void appendFilesOfColumnsChecksumsIndexes(Strings & files, bool include_projection = false) const;
String getMarksFileExtension() const { return index_granularity_info.marks_file_extension; }
String getMarksFileExtension() const { return index_granularity_info.mark_type.getFileExtension(); }
/// Generate the new name for this part according to `new_part_info` and min/max dates from the old name.
/// This is useful when you want to change e.g. block numbers or the mutation version of the part.
@ -500,6 +501,7 @@ protected:
void initializePartMetadataManager();
void initializeIndexGranularityInfo();
private:
/// In compact parts order of columns is necessary
@ -582,5 +584,8 @@ using MergeTreeMutableDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;
bool isCompactPart(const MergeTreeDataPartPtr & data_part);
bool isWidePart(const MergeTreeDataPartPtr & data_part);
bool isInMemoryPart(const MergeTreeDataPartPtr & data_part);
inline String getIndexExtension(bool is_compressed_primary_key) { return is_compressed_primary_key ? ".cidx" : ".idx"; }
std::optional<String> getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage);
bool isCompressedFromIndexExtension(const String & index_extension);
}

View File

@ -106,6 +106,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
if (remove_it != columns.end())
columns.erase(remove_it);
}
return remove_files;
}

View File

@ -3,9 +3,19 @@
#include <Disks/IVolume.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace DB
{
IPartMetadataManager::IPartMetadataManager(const IMergeTreeDataPart * part_) : part(part_)
{
}
bool IPartMetadataManager::isCompressedFromFileName(const String & file_name)
{
std::string extension = fs::path(file_name).extension();
return (MarkType::isMarkFileExtension(extension) && MarkType(extension).compressed)
|| isCompressedFromIndexExtension(extension);
}
}

View File

@ -9,6 +9,7 @@ namespace DB
class IMergeTreeDataPart;
class ReadBuffer;
class SeekableReadBuffer;
class IDisk;
@ -29,8 +30,8 @@ public:
virtual ~IPartMetadataManager() = default;
/// Read metadata content and return SeekableReadBuffer object.
virtual std::unique_ptr<SeekableReadBuffer> read(const String & file_name) const = 0;
/// Read metadata content and return ReadBuffer object.
virtual std::unique_ptr<ReadBuffer> read(const String & file_name) const = 0;
/// Return true if metadata exists in part.
virtual bool exists(const String & file_name) const = 0;
@ -50,6 +51,9 @@ public:
/// Check all metadatas in part.
virtual std::unordered_map<String, uint128> check() const = 0;
/// Determine whether to compress by file extension
static bool isCompressedFromFileName(const String & file_name);
protected:
const IMergeTreeDataPart * part;
};

View File

@ -157,6 +157,7 @@ namespace ErrorCodes
extern const int ZERO_COPY_REPLICATION_ERROR;
}
static void checkSampleExpression(const StorageInMemoryMetadata & metadata, bool allow_sampling_expression_not_in_primary_key, bool check_sample_column_is_correct)
{
if (metadata.sampling_key.column_names.empty())
@ -1088,7 +1089,7 @@ void MergeTreeData::loadDataPartsFromDisk(
suspicious_broken_parts_bytes += *size_of_part;
return;
}
if (!part->index_granularity_info.is_adaptive)
if (!part->index_granularity_info.mark_type.adaptive)
has_non_adaptive_parts.store(true, std::memory_order_relaxed);
else
has_adaptive_parts.store(true, std::memory_order_relaxed);
@ -2726,18 +2727,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
throw Exception("Unknown type of part " + data_part_storage->getRelativePath(), ErrorCodes::UNKNOWN_PART_TYPE);
}
static MergeTreeDataPartType getPartTypeFromMarkExtension(const String & mrk_ext)
{
if (mrk_ext == getNonAdaptiveMrkExtension())
return MergeTreeDataPartType::Wide;
if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide))
return MergeTreeDataPartType::Wide;
if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact))
return MergeTreeDataPartType::Compact;
throw Exception("Can't determine part type, because of unknown mark extension " + mrk_ext, ErrorCodes::UNKNOWN_PART_TYPE);
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
const String & name, const DataPartStoragePtr & data_part_storage, const IMergeTreeDataPart * parent_part) const
{
@ -2752,7 +2741,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(data_part_storage);
if (mrk_ext)
type = getPartTypeFromMarkExtension(*mrk_ext);
{
type = MarkType(*mrk_ext).part_type;
}
else
{
/// Didn't find any mark file, suppose that part is empty.
@ -6462,9 +6453,9 @@ bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const
if (!settings->enable_mixed_granularity_parts || settings->index_granularity_bytes == 0)
{
if (!canUseAdaptiveGranularity() && src_part->index_granularity_info.is_adaptive)
if (!canUseAdaptiveGranularity() && src_part->index_granularity_info.mark_type.adaptive)
return false;
if (canUseAdaptiveGranularity() && !src_part->index_granularity_info.is_adaptive)
if (canUseAdaptiveGranularity() && !src_part->index_granularity_info.mark_type.adaptive)
return false;
}
return true;

View File

@ -75,7 +75,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
return std::make_unique<MergeTreeDataPartWriterCompact>(
shared_from_this(), std::move(data_part_storage_builder), ordered_columns_list, metadata_snapshot,
indices_to_recalc, index_granularity_info.marks_file_extension,
indices_to_recalc, getMarksFileExtension(),
default_codec_, writer_settings, computed_index_granularity);
}
@ -89,7 +89,7 @@ void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*eac
total_size.data_uncompressed += bin_checksum->second.uncompressed_size;
}
auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension);
auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + getMarksFileExtension());
if (mrk_checksum != checksums.files.end())
total_size.marks += mrk_checksum->second.file_size;
}
@ -98,7 +98,7 @@ void MergeTreeDataPartCompact::loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, const MergeTreeIndexGranularityInfo & index_granularity_info_,
size_t columns_count, const DataPartStoragePtr & data_part_storage_)
{
if (!index_granularity_info_.is_adaptive)
if (!index_granularity_info_.mark_type.adaptive)
throw Exception("MergeTreeDataPartCompact cannot be created with non-adaptive granulary.", ErrorCodes::NOT_IMPLEMENTED);
auto marks_file_path = index_granularity_info_.getMarksFilePath("data");
@ -140,7 +140,7 @@ bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) co
return false;
auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION);
auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension);
auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + getMarksFileExtension());
return (bin_checksum != checksums.files.end() && mrk_checksum != checksums.files.end());
}
@ -148,7 +148,7 @@ bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) co
void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata) const
{
checkConsistencyBase();
String mrk_file_name = DATA_FILE_NAME + index_granularity_info.marks_file_extension;
String mrk_file_name = DATA_FILE_NAME + getMarksFileExtension();
if (!checksums.empty())
{

View File

@ -12,7 +12,6 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_READ_ALL_DATA;
extern const int NO_FILE_IN_DATA_PART;
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
extern const int LOGICAL_ERROR;
@ -68,7 +67,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter(
return std::make_unique<MergeTreeDataPartWriterWide>(
shared_from_this(), data_part_storage_builder,
columns_list, metadata_snapshot, indices_to_recalc,
index_granularity_info.marks_file_extension,
getMarksFileExtension(),
default_codec_, writer_settings, computed_index_granularity);
}
@ -96,7 +95,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
size.data_uncompressed += bin_checksum->second.uncompressed_size;
}
auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension);
auto mrk_checksum = checksums.files.find(file_name + getMarksFileExtension());
if (mrk_checksum != checksums.files.end())
size.marks += mrk_checksum->second.file_size;
});
@ -119,26 +118,41 @@ void MergeTreeDataPartWide::loadIndexGranularityImpl(
size_t marks_file_size = data_part_storage_->getFileSize(marks_file_path);
if (!index_granularity_info_.is_adaptive)
if (!index_granularity_info_.mark_type.adaptive && !index_granularity_info_.mark_type.compressed)
{
/// The most easy way - no need to read the file, everything is known from its size.
size_t marks_count = marks_file_size / index_granularity_info_.getMarkSizeInBytes();
index_granularity_.resizeWithFixedGranularity(marks_count, index_granularity_info_.fixed_index_granularity); /// all the same
}
else
{
auto buffer = data_part_storage_->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
while (!buffer->eof())
auto marks_file = data_part_storage_->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
std::unique_ptr<ReadBuffer> marks_reader;
if (!index_granularity_info_.mark_type.compressed)
marks_reader = std::move(marks_file);
else
marks_reader = std::make_unique<CompressedReadBufferFromFile>(std::move(marks_file));
size_t marks_count = 0;
while (!marks_reader->eof())
{
buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block
MarkInCompressedFile mark;
size_t granularity;
readIntBinary(granularity, *buffer);
index_granularity_.appendMark(granularity);
readBinary(mark.offset_in_compressed_file, *marks_reader);
readBinary(mark.offset_in_decompressed_block, *marks_reader);
++marks_count;
if (index_granularity_info_.mark_type.adaptive)
{
readIntBinary(granularity, *marks_reader);
index_granularity_.appendMark(granularity);
}
}
if (index_granularity_.getMarksCount() * index_granularity_info_.getMarkSizeInBytes() != marks_file_size)
throw Exception(
ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all marks from file {}",
std::string(fs::path(data_part_storage_->getFullPath()) / marks_file_path));
if (!index_granularity_info_.mark_type.adaptive)
index_granularity_.resizeWithFixedGranularity(marks_count, index_granularity_info_.fixed_index_granularity); /// all the same
}
index_granularity_.setInitialized();
@ -152,6 +166,7 @@ void MergeTreeDataPartWide::loadIndexGranularity()
loadIndexGranularityImpl(index_granularity, index_granularity_info, data_part_storage, getFileNameForColumn(columns.front()));
}
bool MergeTreeDataPartWide::isStoredOnRemoteDisk() const
{
return data_part_storage->isStoredOnRemoteDisk();
@ -170,7 +185,7 @@ MergeTreeDataPartWide::~MergeTreeDataPartWide()
void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
{
checkConsistencyBase();
//String path = getRelativePath();
std::string marks_file_extension = index_granularity_info.mark_type.getFileExtension();
if (!checksums.empty())
{
@ -181,7 +196,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
String file_name = ISerialization::getFileNameForStream(name_type, substream_path);
String mrk_file_name = file_name + index_granularity_info.marks_file_extension;
String mrk_file_name = file_name + marks_file_extension;
String bin_file_name = file_name + DATA_FILE_EXTENSION;
if (!checksums.files.contains(mrk_file_name))
@ -207,7 +222,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
{
getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
auto file_path = ISerialization::getFileNameForStream(name_type, substream_path) + index_granularity_info.marks_file_extension;
auto file_path = ISerialization::getFileNameForStream(name_type, substream_path) + marks_file_extension;
/// Missing file is Ok for case when new column was added.
if (data_part_storage->exists(file_path))
@ -235,10 +250,11 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
bool MergeTreeDataPartWide::hasColumnFiles(const NameAndTypePair & column) const
{
auto check_stream_exists = [this](const String & stream_name)
std::string marks_file_extension = index_granularity_info.mark_type.getFileExtension();
auto check_stream_exists = [this, &marks_file_extension](const String & stream_name)
{
auto bin_checksum = checksums.files.find(stream_name + DATA_FILE_EXTENSION);
auto mrk_checksum = checksums.files.find(stream_name + index_granularity_info.marks_file_extension);
auto mrk_checksum = checksums.files.find(stream_name + marks_file_extension);
return bin_checksum != checksums.files.end() && mrk_checksum != checksums.files.end();
};

View File

@ -27,12 +27,24 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
settings.max_compress_block_size,
settings_.query_write_settings))
, plain_hashing(*plain_file)
, marks_file(data_part_storage_builder->writeFile(
{
marks_file = data_part_storage_builder->writeFile(
MergeTreeDataPartCompact::DATA_FILE_NAME + marks_file_extension_,
4096,
settings_.query_write_settings))
, marks(*marks_file)
{
settings_.query_write_settings);
marks_file_hashing = std::make_unique<HashingWriteBuffer>(*marks_file);
if (data_part_->index_granularity_info.mark_type.compressed)
{
marks_compressor = std::make_unique<CompressedWriteBuffer>(
*marks_file_hashing,
settings_.getMarksCompressionCodec(),
settings_.marks_compress_block_size);
marks_source_hashing = std::make_unique<HashingWriteBuffer>(*marks_compressor);
}
const auto & storage_columns = metadata_snapshot->getColumns();
for (const auto & column : columns_list)
addStreams(column, storage_columns.getCodecDescOrDefault(column.name, default_codec));
@ -172,6 +184,8 @@ void MergeTreeDataPartWriterCompact::writeDataBlockPrimaryIndexAndSkipIndices(co
void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const Granules & granules)
{
WriteBuffer & marks_out = marks_source_hashing ? *marks_source_hashing : *marks_file_hashing;
for (const auto & granule : granules)
{
data_written = true;
@ -203,8 +217,8 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
};
writeIntBinary(plain_hashing.count(), marks);
writeIntBinary(static_cast<UInt64>(0), marks);
writeIntBinary(plain_hashing.count(), marks_out);
writeIntBinary(static_cast<UInt64>(0), marks_out);
writeColumnSingleGranule(
block.getByName(name_and_type->name), data_part->getSerialization(name_and_type->name),
@ -214,7 +228,7 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
prev_stream->hashing_buf.next(); //-V522
}
writeIntBinary(granule.rows_to_write, marks);
writeIntBinary(granule.rows_to_write, marks_out);
}
}
@ -239,18 +253,26 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(IMergeTreeDataPart::Check
assert(stream->hashing_buf.offset() == 0);
#endif
WriteBuffer & marks_out = marks_source_hashing ? *marks_source_hashing : *marks_file_hashing;
if (with_final_mark && data_written)
{
for (size_t i = 0; i < columns_list.size(); ++i)
{
writeIntBinary(plain_hashing.count(), marks);
writeIntBinary(static_cast<UInt64>(0), marks);
writeIntBinary(plain_hashing.count(), marks_out);
writeIntBinary(static_cast<UInt64>(0), marks_out);
}
writeIntBinary(static_cast<UInt64>(0), marks);
writeIntBinary(static_cast<UInt64>(0), marks_out);
}
plain_file->next();
marks.next();
if (marks_source_hashing)
marks_source_hashing->next();
if (marks_compressor)
marks_compressor->next();
marks_file_hashing->next();
addToChecksums(checksums);
plain_file->preFinalize();
@ -261,6 +283,7 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(bool sync)
{
plain_file->finalize();
marks_file->finalize();
if (sync)
{
plain_file->sync();
@ -332,8 +355,15 @@ void MergeTreeDataPartWriterCompact::addToChecksums(MergeTreeDataPartChecksums &
checksums.files[data_file_name].file_size = plain_hashing.count();
checksums.files[data_file_name].file_hash = plain_hashing.getHash();
checksums.files[marks_file_name].file_size = marks.count();
checksums.files[marks_file_name].file_hash = marks.getHash();
if (marks_compressor)
{
checksums.files[marks_file_name].is_compressed = true;
checksums.files[marks_file_name].uncompressed_size = marks_source_hashing->count();
checksums.files[marks_file_name].uncompressed_hash = marks_source_hashing->getHash();
}
checksums.files[marks_file_name].file_size = marks_file_hashing->count();
checksums.files[marks_file_name].file_hash = marks_file_hashing->getHash();
}
void MergeTreeDataPartWriterCompact::ColumnsBuffer::add(MutableColumns && columns)

View File

@ -1,6 +1,8 @@
#pragma once
#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h>
namespace DB
{
@ -84,9 +86,16 @@ private:
/// Stream for each column's substreams path (look at addStreams).
std::unordered_map<String, CompressedStreamPtr> compressed_streams;
/// marks -> marks_file
/// If marks are uncompressed, the data is written to 'marks_file_hashing' for hash calculation and then to the 'marks_file'.
std::unique_ptr<WriteBufferFromFileBase> marks_file;
HashingWriteBuffer marks;
std::unique_ptr<HashingWriteBuffer> marks_file_hashing;
/// If marks are compressed, the data is written to 'marks_source_hashing' for hash calculation,
/// then to 'marks_compressor' for compression,
/// then to 'marks_file_hashing' for calculation of hash of compressed data,
/// then finally to 'marks_file'.
std::unique_ptr<CompressedWriteBuffer> marks_compressor;
std::unique_ptr<HashingWriteBuffer> marks_source_hashing;
};
}

View File

@ -13,10 +13,17 @@ namespace ErrorCodes
void MergeTreeDataPartWriterOnDisk::Stream::preFinalize()
{
compressed.next();
/// 'compressed_buf' doesn't call next() on underlying buffer ('plain_hashing'). We should do it manually.
compressed_hashing.next();
compressor.next();
plain_hashing.next();
marks.next();
if (compress_marks)
{
marks_compressed_hashing.next();
marks_compressor.next();
}
marks_hashing.next();
plain_file->preFinalize();
marks_file->preFinalize();
@ -48,15 +55,21 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
const std::string & marks_file_extension_,
const CompressionCodecPtr & compression_codec_,
size_t max_compress_block_size_,
const CompressionCodecPtr & marks_compression_codec_,
size_t marks_compress_block_size_,
const WriteSettings & query_write_settings) :
escaped_column_name(escaped_column_name_),
data_file_extension{data_file_extension_},
marks_file_extension{marks_file_extension_},
plain_file(data_part_storage_builder->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
plain_hashing(*plain_file),
compressed_buf(plain_hashing, compression_codec_, max_compress_block_size_),
compressed(compressed_buf),
marks_file(data_part_storage_builder->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)), marks(*marks_file)
compressor(plain_hashing, compression_codec_, max_compress_block_size_),
compressed_hashing(compressor),
marks_file(data_part_storage_builder->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)),
marks_hashing(*marks_file),
marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_),
marks_compressed_hashing(marks_compressor),
compress_marks(MarkType(marks_file_extension).compressed)
{
}
@ -65,13 +78,20 @@ void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPa
String name = escaped_column_name;
checksums.files[name + data_file_extension].is_compressed = true;
checksums.files[name + data_file_extension].uncompressed_size = compressed.count();
checksums.files[name + data_file_extension].uncompressed_hash = compressed.getHash();
checksums.files[name + data_file_extension].uncompressed_size = compressed_hashing.count();
checksums.files[name + data_file_extension].uncompressed_hash = compressed_hashing.getHash();
checksums.files[name + data_file_extension].file_size = plain_hashing.count();
checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();
checksums.files[name + marks_file_extension].file_size = marks.count();
checksums.files[name + marks_file_extension].file_hash = marks.getHash();
if (compress_marks)
{
checksums.files[name + marks_file_extension].is_compressed = true;
checksums.files[name + marks_file_extension].uncompressed_size = marks_compressed_hashing.count();
checksums.files[name + marks_file_extension].uncompressed_hash = marks_compressed_hashing.getHash();
}
checksums.files[name + marks_file_extension].file_size = marks_hashing.count();
checksums.files[name + marks_file_extension].file_hash = marks_hashing.getHash();
}
@ -91,6 +111,7 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
, marks_file_extension(marks_file_extension_)
, default_codec(default_codec_)
, compute_granularity(index_granularity.empty())
, compress_primary_key(settings.compress_primary_key)
{
if (settings.blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
@ -156,13 +177,27 @@ void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
{
if (metadata_snapshot->hasPrimaryKey())
{
index_file_stream = data_part_storage_builder->writeFile("primary.idx", DBMS_DEFAULT_BUFFER_SIZE, settings.query_write_settings);
index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
String index_name = "primary" + getIndexExtension(compress_primary_key);
index_file_stream = data_part_storage_builder->writeFile(index_name, DBMS_DEFAULT_BUFFER_SIZE, settings.query_write_settings);
index_file_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
if (compress_primary_key)
{
ParserCodec codec_parser;
auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.primary_key_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
CompressionCodecPtr primary_key_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr);
index_compressor_stream = std::make_unique<CompressedWriteBuffer>(*index_file_hashing_stream, primary_key_compression_codec, settings.primary_key_compress_block_size);
index_source_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_compressor_stream);
}
}
}
void MergeTreeDataPartWriterOnDisk::initSkipIndices()
{
ParserCodec codec_parser;
auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.marks_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
CompressionCodecPtr marks_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr);
for (const auto & index_helper : skip_indices)
{
String stream_name = index_helper->getFileName();
@ -172,7 +207,9 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
data_part_storage_builder,
stream_name, index_helper->getSerializedFileExtension(),
stream_name, marks_file_extension,
default_codec, settings.max_compress_block_size, settings.query_write_settings));
default_codec, settings.max_compress_block_size,
marks_compression_codec, settings.marks_compress_block_size,
settings.query_write_settings));
skip_indices_aggregators.push_back(index_helper->createIndexAggregator());
skip_index_accumulated_marks.push_back(0);
}
@ -208,7 +245,8 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc
{
const auto & primary_column = primary_index_block.getByPosition(j);
index_columns[j]->insertFrom(*primary_column.column, granule.start_row);
primary_column.type->getDefaultSerialization()->serializeBinary(*primary_column.column, granule.start_row, *index_stream);
primary_column.type->getDefaultSerialization()->serializeBinary(
*primary_column.column, granule.start_row, compress_primary_key ? *index_source_hashing_stream : *index_file_hashing_stream);
}
}
}
@ -226,11 +264,13 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
{
const auto index_helper = skip_indices[i];
auto & stream = *skip_indices_streams[i];
WriteBuffer & marks_out = stream.compress_marks ? stream.marks_compressed_hashing : stream.marks_hashing;
for (const auto & granule : granules_to_write)
{
if (skip_index_accumulated_marks[i] == index_helper->index.granularity)
{
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed_hashing);
skip_index_accumulated_marks[i] = 0;
}
@ -238,15 +278,16 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
{
skip_indices_aggregators[i] = index_helper->createIndexAggregator();
if (stream.compressed.offset() >= settings.min_compress_block_size)
stream.compressed.next();
if (stream.compressed_hashing.offset() >= settings.min_compress_block_size)
stream.compressed_hashing.next();
writeIntBinary(stream.plain_hashing.count(), marks_out);
writeIntBinary(stream.compressed_hashing.offset(), marks_out);
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
/// Actually this numbers is redundant, but we have to store them
/// to be compatible with normal .mrk2 file format
/// to be compatible with the normal .mrk2 file format
if (settings.can_use_adaptive_granularity)
writeIntBinary(1UL, stream.marks);
writeIntBinary(1UL, marks_out);
}
size_t pos = granule.start_row;
@ -263,7 +304,7 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
if (write_final_mark && compute_granularity)
index_granularity.appendMark(0);
if (index_stream)
if (index_file_hashing_stream)
{
if (write_final_mark)
{
@ -272,26 +313,44 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
const auto & column = *last_block_index_columns[j];
size_t last_row_number = column.size() - 1;
index_columns[j]->insertFrom(column, last_row_number);
index_types[j]->getDefaultSerialization()->serializeBinary(column, last_row_number, *index_stream);
index_types[j]->getDefaultSerialization()->serializeBinary(
column, last_row_number, compress_primary_key ? *index_source_hashing_stream : *index_file_hashing_stream);
}
last_block_index_columns.clear();
}
index_stream->next();
checksums.files["primary.idx"].file_size = index_stream->count();
checksums.files["primary.idx"].file_hash = index_stream->getHash();
if (compress_primary_key)
index_source_hashing_stream->next();
index_file_hashing_stream->next();
String index_name = "primary" + getIndexExtension(compress_primary_key);
if (compress_primary_key)
{
checksums.files[index_name].is_compressed = true;
checksums.files[index_name].uncompressed_size = index_source_hashing_stream->count();
checksums.files[index_name].uncompressed_hash = index_source_hashing_stream->getHash();
}
checksums.files[index_name].file_size = index_file_hashing_stream->count();
checksums.files[index_name].file_hash = index_file_hashing_stream->getHash();
index_file_stream->preFinalize();
}
}
void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(bool sync)
{
if (index_stream)
if (index_file_hashing_stream)
{
index_file_stream->finalize();
if (sync)
index_file_stream->sync();
index_stream = nullptr;
if (compress_primary_key)
{
index_source_hashing_stream = nullptr;
index_compressor_stream = nullptr;
}
index_file_hashing_stream = nullptr;
}
}
@ -301,7 +360,7 @@ void MergeTreeDataPartWriterOnDisk::fillSkipIndicesChecksums(MergeTreeData::Data
{
auto & stream = *skip_indices_streams[i];
if (!skip_indices_aggregators[i]->empty())
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed_hashing);
}
for (auto & stream : skip_indices_streams)

View File

@ -8,7 +8,8 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Disks/IDisk.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/parseQuery.h>
namespace DB
{
@ -56,21 +57,26 @@ public:
const std::string & marks_file_extension_,
const CompressionCodecPtr & compression_codec_,
size_t max_compress_block_size_,
const CompressionCodecPtr & marks_compression_codec_,
size_t marks_compress_block_size_,
const WriteSettings & query_write_settings);
String escaped_column_name;
std::string data_file_extension;
std::string marks_file_extension;
/// compressed -> compressed_buf -> plain_hashing -> plain_file
/// compressed_hashing -> compressor -> plain_hashing -> plain_file
std::unique_ptr<WriteBufferFromFileBase> plain_file;
HashingWriteBuffer plain_hashing;
CompressedWriteBuffer compressed_buf;
HashingWriteBuffer compressed;
CompressedWriteBuffer compressor;
HashingWriteBuffer compressed_hashing;
/// marks -> marks_file
/// marks_compressed_hashing -> marks_compressor -> marks_hashing -> marks_file
std::unique_ptr<WriteBufferFromFileBase> marks_file;
HashingWriteBuffer marks;
HashingWriteBuffer marks_hashing;
CompressedWriteBuffer marks_compressor;
HashingWriteBuffer marks_compressed_hashing;
bool compress_marks;
bool is_prefinalized = false;
@ -139,7 +145,11 @@ protected:
std::vector<size_t> skip_index_accumulated_marks;
std::unique_ptr<WriteBufferFromFileBase> index_file_stream;
std::unique_ptr<HashingWriteBuffer> index_stream;
std::unique_ptr<HashingWriteBuffer> index_file_hashing_stream;
std::unique_ptr<CompressedWriteBuffer> index_compressor_stream;
std::unique_ptr<HashingWriteBuffer> index_source_hashing_stream;
bool compress_primary_key;
DataTypes index_types;
/// Index columns from the last block
/// It's written to index file in the `writeSuffixAndFinalizePart` method

View File

@ -30,6 +30,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
Granules result;
size_t current_row = 0;
/// When our last mark is not finished yet and we have to write rows into it
if (rows_written_in_last_mark > 0)
{
@ -43,7 +44,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
.is_complete = (rows_left_in_block >= rows_left_in_last_mark),
});
current_row += result.back().rows_to_write;
current_mark++;
++current_mark;
}
/// Calculating normal granules for block
@ -61,7 +62,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
.is_complete = (rows_left_in_block >= expected_rows_in_mark),
});
current_row += result.back().rows_to_write;
current_mark++;
++current_mark;
}
return result;
@ -110,6 +111,10 @@ void MergeTreeDataPartWriterWide::addStreams(
else /// otherwise return only generic codecs and don't use info about the` data_type
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, nullptr, default_codec, true);
ParserCodec codec_parser;
auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.marks_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
CompressionCodecPtr marks_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr);
column_streams[stream_name] = std::make_unique<Stream>(
stream_name,
data_part_storage_builder,
@ -117,6 +122,8 @@ void MergeTreeDataPartWriterWide::addStreams(
stream_name, marks_file_extension,
compression_codec,
settings.max_compress_block_size,
marks_compression_codec,
settings.marks_compress_block_size,
settings.query_write_settings);
};
@ -138,7 +145,7 @@ ISerialization::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGett
if (is_offsets && offset_columns.contains(stream_name))
return nullptr;
return &column_streams.at(stream_name)->compressed;
return &column_streams.at(stream_name)->compressed_hashing;
};
}
@ -265,10 +272,12 @@ void MergeTreeDataPartWriterWide::writeSingleMark(
void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stream_with_mark, size_t rows_in_mark)
{
Stream & stream = *column_streams[stream_with_mark.stream_name];
writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, stream.marks);
writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, stream.marks);
WriteBuffer & marks_out = stream.compress_marks ? stream.marks_compressed_hashing : stream.marks_hashing;
writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, marks_out);
writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, marks_out);
if (settings.can_use_adaptive_granularity)
writeIntBinary(rows_in_mark, stream.marks);
writeIntBinary(rows_in_mark, marks_out);
}
StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
@ -289,13 +298,13 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
Stream & stream = *column_streams[stream_name];
/// There could already be enough data to compress into the new block.
if (stream.compressed.offset() >= settings.min_compress_block_size)
stream.compressed.next();
if (stream.compressed_hashing.offset() >= settings.min_compress_block_size)
stream.compressed_hashing.next();
StreamNameAndMark stream_with_mark;
stream_with_mark.stream_name = stream_name;
stream_with_mark.mark.offset_in_compressed_file = stream.plain_hashing.count();
stream_with_mark.mark.offset_in_decompressed_block = stream.compressed.offset();
stream_with_mark.mark.offset_in_decompressed_block = stream.compressed_hashing.offset();
result.push_back(stream_with_mark);
});
@ -325,7 +334,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
if (is_offsets && offset_columns.contains(stream_name))
return;
column_streams[stream_name]->compressed.nextIfAtEnd();
column_streams[stream_name]->compressed_hashing.nextIfAtEnd();
});
}
@ -418,7 +427,13 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
if (!data_part_storage->exists(mrk_path))
return;
auto mrk_in = data_part_storage->readFile(mrk_path, {}, std::nullopt, std::nullopt);
auto mrk_file_in = data_part_storage->readFile(mrk_path, {}, std::nullopt, std::nullopt);
std::unique_ptr<ReadBuffer> mrk_in;
if (data_part->index_granularity_info.mark_type.compressed)
mrk_in = std::make_unique<CompressedReadBufferFromFile>(std::move(mrk_file_in));
else
mrk_in = std::move(mrk_file_in);
DB::CompressedReadBufferFromFile bin_in(data_part_storage->readFile(bin_path, {}, std::nullopt, std::nullopt));
bool must_be_last = false;
UInt64 offset_in_compressed_file = 0;

View File

@ -3,6 +3,8 @@
#include <Core/Settings.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <IO/WriteSettings.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/parseQuery.h>
namespace DB
@ -43,6 +45,11 @@ struct MergeTreeWriterSettings
, max_compress_block_size(
storage_settings->max_compress_block_size ? storage_settings->max_compress_block_size
: global_settings.max_compress_block_size)
, marks_compression_codec(storage_settings->marks_compression_codec)
, marks_compress_block_size(storage_settings->marks_compress_block_size)
, compress_primary_key(storage_settings->compress_primary_key)
, primary_key_compression_codec(storage_settings->primary_key_compression_codec)
, primary_key_compress_block_size(storage_settings->primary_key_compress_block_size)
, can_use_adaptive_granularity(can_use_adaptive_granularity_)
, rewrite_primary_key(rewrite_primary_key_)
, blocks_are_granules_size(blocks_are_granules_size_)
@ -50,8 +57,23 @@ struct MergeTreeWriterSettings
{
}
CompressionCodecPtr getMarksCompressionCodec() const
{
ParserCodec codec_parser;
auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(marks_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
return CompressionCodecFactory::instance().get(ast, nullptr);
}
size_t min_compress_block_size;
size_t max_compress_block_size;
String marks_compression_codec;
size_t marks_compress_block_size;
bool compress_primary_key;
String primary_key_compression_codec;
size_t primary_key_compress_block_size;
bool can_use_adaptive_granularity;
bool rewrite_primary_key;
bool blocks_are_granules_size;

View File

@ -9,71 +9,124 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_PART_TYPE;
extern const int INCORRECT_FILE_NAME;
}
MarkType::MarkType(std::string_view extension)
{
if (extension.starts_with('.'))
extension = extension.substr(1);
if (extension.starts_with('c'))
{
compressed = true;
extension = extension.substr(1);
}
if (!extension.starts_with("mrk"))
throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Mark file extension does not start with .mrk or .cmrk: {}", extension);
extension = extension.substr(strlen("mrk"));
if (extension.empty())
{
adaptive = false;
part_type = MergeTreeDataPartType::Wide;
}
else if (extension == "2")
{
adaptive = true;
part_type = MergeTreeDataPartType::Wide;
}
else if (extension == "3")
{
adaptive = true;
part_type = MergeTreeDataPartType::Compact;
}
else
throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Unknown mark file extension: '{}'", extension);
}
MarkType::MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_)
: adaptive(adaptive_), compressed(compressed_), part_type(part_type_)
{
if (!adaptive && part_type != MergeTreeDataPartType::Wide)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity");
if (part_type == MergeTreeDataPartType::Unknown)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type");
}
bool MarkType::isMarkFileExtension(std::string_view extension)
{
return extension.find("mrk") != std::string_view::npos;
}
std::string MarkType::getFileExtension() const
{
std::string res = compressed ? ".cmrk" : ".mrk";
if (!adaptive)
{
if (part_type != MergeTreeDataPartType::Wide)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity");
return res;
}
switch (part_type)
{
case MergeTreeDataPartType::Wide:
return res + "2";
case MergeTreeDataPartType::Compact:
return res + "3";
case MergeTreeDataPartType::InMemory:
return "";
case MergeTreeDataPartType::Unknown:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type");
}
}
std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage)
{
if (data_part_storage->exists())
{
for (auto it = data_part_storage->iterate(); it->isValid(); it->next())
{
const auto & ext = fs::path(it->name()).extension();
if (ext == getNonAdaptiveMrkExtension()
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide)
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact))
return ext;
}
}
if (it->isFile())
if (std::string ext = fs::path(it->name()).extension(); MarkType::isMarkFileExtension(ext))
return ext;
return {};
}
MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_)
: type(type_)
: MergeTreeIndexGranularityInfo(storage, {storage.canUseAdaptiveGranularity(), storage.getSettings()->compress_marks, type_.getValue()})
{
const auto storage_settings = storage.getSettings();
fixed_index_granularity = storage_settings->index_granularity;
}
/// Granularity is fixed
if (!storage.canUseAdaptiveGranularity())
{
if (type != MergeTreeDataPartType::Wide)
throw Exception("Only Wide parts can be used with non-adaptive granularity.", ErrorCodes::NOT_IMPLEMENTED);
setNonAdaptive();
}
else
setAdaptive(storage_settings->index_granularity_bytes);
MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MarkType mark_type_)
: mark_type(mark_type_)
{
fixed_index_granularity = storage.getSettings()->index_granularity;
}
void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage)
{
auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage);
if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension())
setNonAdaptive();
}
void MergeTreeIndexGranularityInfo::setAdaptive(size_t index_granularity_bytes_)
{
is_adaptive = true;
marks_file_extension = getAdaptiveMrkExtension(type);
index_granularity_bytes = index_granularity_bytes_;
}
void MergeTreeIndexGranularityInfo::setNonAdaptive()
{
is_adaptive = false;
marks_file_extension = getNonAdaptiveMrkExtension();
index_granularity_bytes = 0;
if (mrk_ext && !MarkType(*mrk_ext).adaptive)
{
mark_type.adaptive = false;
index_granularity_bytes = 0;
}
}
size_t MergeTreeIndexGranularityInfo::getMarkSizeInBytes(size_t columns_num) const
{
if (type == MergeTreeDataPartType::Wide)
return is_adaptive ? getAdaptiveMrkSizeWide() : getNonAdaptiveMrkSizeWide();
else if (type == MergeTreeDataPartType::Compact)
if (mark_type.part_type == MergeTreeDataPartType::Wide)
return mark_type.adaptive ? getAdaptiveMrkSizeWide() : getNonAdaptiveMrkSizeWide();
else if (mark_type.part_type == MergeTreeDataPartType::Compact)
return getAdaptiveMrkSizeCompact(columns_num);
else if (type == MergeTreeDataPartType::InMemory)
else if (mark_type.part_type == MergeTreeDataPartType::InMemory)
return 0;
else
throw Exception("Unknown part type", ErrorCodes::UNKNOWN_PART_TYPE);
@ -85,16 +138,4 @@ size_t getAdaptiveMrkSizeCompact(size_t columns_num)
return sizeof(UInt64) * (columns_num * 2 + 1);
}
std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type)
{
if (part_type == MergeTreeDataPartType::Wide)
return ".mrk2";
else if (part_type == MergeTreeDataPartType::Compact)
return ".mrk3";
else if (part_type == MergeTreeDataPartType::InMemory)
return "";
else
throw Exception("Unknown part type", ErrorCodes::UNKNOWN_PART_TYPE);
}
}

View File

@ -11,15 +11,30 @@ namespace DB
class MergeTreeData;
/** Various types of mark files are stored in files with various extensions:
* .mrk, .mrk2, .mrk3, .cmrk, .cmrk2, .cmrk3.
* This helper allows to obtain mark type from file extension and vice versa.
*/
struct MarkType
{
MarkType(std::string_view extension);
MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_);
static bool isMarkFileExtension(std::string_view extension);
std::string getFileExtension() const;
bool adaptive = false;
bool compressed = false;
MergeTreeDataPartType::Value part_type = MergeTreeDataPartType::Unknown;
};
/// Meta information about index granularity
struct MergeTreeIndexGranularityInfo
{
public:
/// Marks file extension '.mrk' or '.mrk2'
String marks_file_extension;
/// Is stride in rows between marks non fixed?
bool is_adaptive = false;
MarkType mark_type;
/// Fixed size in rows of one granule if index_granularity_bytes is zero
size_t fixed_index_granularity = 0;
@ -29,29 +44,24 @@ public:
MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_);
MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MarkType mark_type_);
MergeTreeIndexGranularityInfo(MergeTreeDataPartType type_, bool is_adaptive_, size_t index_granularity_, size_t index_granularity_bytes_);
void changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage);
String getMarksFilePath(const String & path_prefix) const
{
return path_prefix + marks_file_extension;
return path_prefix + mark_type.getFileExtension();
}
size_t getMarkSizeInBytes(size_t columns_num = 1) const;
static std::optional<std::string> getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage);
private:
MergeTreeDataPartType type;
void setAdaptive(size_t index_granularity_bytes_);
void setNonAdaptive();
};
constexpr inline auto getNonAdaptiveMrkExtension() { return ".mrk"; }
constexpr inline auto getNonAdaptiveMrkSizeWide() { return sizeof(UInt64) * 2; }
constexpr inline auto getAdaptiveMrkSizeWide() { return sizeof(UInt64) * 3; }
inline size_t getAdaptiveMrkSizeCompact(size_t columns_num);
std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type);
}

View File

@ -2,6 +2,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>
#include <Common/CurrentMetrics.h>
@ -59,6 +60,7 @@ MergeTreeMarksLoader::~MergeTreeMarksLoader()
}
}
const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, size_t column_index)
{
if (!marks)
@ -87,6 +89,7 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz
return (*marks)[row_index * columns_in_mark + column_index];
}
MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
{
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
@ -94,42 +97,49 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
size_t file_size = data_part_storage->getFileSize(mrk_path);
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
size_t expected_file_size = mark_size * marks_count;
size_t expected_uncompressed_size = mark_size * marks_count;
if (expected_file_size != file_size)
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);
if (!index_granularity_info.mark_type.compressed && expected_uncompressed_size != file_size)
throw Exception(
ErrorCodes::CORRUPTED_DATA,
"Bad size of marks file '{}': {}, must be: {}",
std::string(fs::path(data_part_storage->getFullPath()) / mrk_path),
std::to_string(file_size), std::to_string(expected_file_size));
std::to_string(file_size), std::to_string(expected_uncompressed_size));
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);
auto buffer = data_part_storage->readFile(mrk_path, read_settings.adjustBufferSize(file_size), file_size, std::nullopt);
std::unique_ptr<ReadBuffer> reader;
if (!index_granularity_info.mark_type.compressed)
reader = std::move(buffer);
else
reader = std::make_unique<CompressedReadBufferFromFile>(std::move(buffer));
if (!index_granularity_info.is_adaptive)
if (!index_granularity_info.mark_type.adaptive)
{
/// Read directly to marks.
auto buffer = data_part_storage->readFile(mrk_path, read_settings.adjustBufferSize(file_size), file_size, std::nullopt);
buffer->readStrict(reinterpret_cast<char *>(res->data()), file_size);
reader->readStrict(reinterpret_cast<char *>(res->data()), expected_uncompressed_size);
if (!buffer->eof())
if (!reader->eof())
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
"Cannot read all marks from file {}, is eof: {}, buffer size: {}, file size: {}",
mrk_path, buffer->eof(), buffer->buffer().size(), file_size);
mrk_path, reader->eof(), reader->buffer().size(), file_size);
}
else
{
auto buffer = data_part_storage->readFile(mrk_path, read_settings.adjustBufferSize(file_size), file_size, std::nullopt);
size_t i = 0;
while (!buffer->eof())
size_t granularity;
while (!reader->eof())
{
res->read(*buffer, i * columns_in_mark, columns_in_mark);
buffer->seek(sizeof(size_t), SEEK_CUR);
res->read(*reader, i * columns_in_mark, columns_in_mark);
readIntBinary(granularity, *reader);
++i;
}
if (i * mark_size != file_size)
if (i * mark_size != expected_uncompressed_size)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all marks from file {}", mrk_path);
}
res->protect();
return res;
}

View File

@ -1,9 +1,11 @@
#pragma once
#include <Storages/MergeTree/IDataPartStorage.h>
#include <Storages/MarkCache.h>
#include <IO/ReadSettings.h>
#include <Common/ThreadPool.h>
namespace DB
{

View File

@ -145,6 +145,14 @@ struct Settings;
M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for Zero-copy table-independet info.", 0) \
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
\
/** Compress marks and primary key. */ \
M(Bool, compress_marks, false, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \
M(Bool, compress_primary_key, false, "Primary key support compression, reduce primary key file size and speed up network transmission.", 0) \
M(String, marks_compression_codec, "ZSTD(3)", "Compression encoding used by marks, marks are small enough and cached, so the default compression is ZSTD(3).", 0) \
M(String, primary_key_compression_codec, "ZSTD(3)", "Compression encoding used by primary, primary key is small enough and cached, so the default compression is ZSTD(3).", 0) \
M(UInt64, marks_compress_block_size, 65536, "Mark compress block size, the actual size of the block to compress.", 0) \
M(UInt64, primary_key_compress_block_size, 65536, "Primary compress block size, the actual size of the block to compress.", 0) \
\
/** Obsolete settings. Kept for backward compatibility only. */ \
M(UInt64, min_relative_delay_to_yield_leadership, 120, "Obsolete setting, does nothing.", 0) \
M(UInt64, check_delay_period, 60, "Obsolete setting, does nothing.", 0) \
@ -153,7 +161,7 @@ struct Settings;
M(UInt64, replicated_max_parallel_sends_for_table, 0, "Obsolete setting, does nothing.", 0) \
M(UInt64, replicated_max_parallel_fetches, 0, "Obsolete setting, does nothing.", 0) \
M(UInt64, replicated_max_parallel_fetches_for_table, 0, "Obsolete setting, does nothing.", 0) \
M(Bool, write_final_mark, true, "Obsolete setting, does nothing.", 0)
M(Bool, write_final_mark, true, "Obsolete setting, does nothing.", 0) \
/// Settings that should not change after the creation of a table.
/// NOLINTNEXTLINE
#define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \

View File

@ -33,7 +33,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
storage.getContext()->getSettings(),
write_settings,
storage.getSettings(),
data_part->index_granularity_info.is_adaptive,
data_part->index_granularity_info.mark_type.adaptive,
/* rewrite_primary_key = */ true,
blocks_are_granules_size);

View File

@ -30,8 +30,8 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
global_settings,
data_part->storage.getContext()->getWriteSettings(),
storage_settings,
index_granularity_info ? index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity(),
/* rewrite_primary_key = */false);
index_granularity_info ? index_granularity_info->mark_type.adaptive : data_part->storage.canUseAdaptiveGranularity(),
/* rewrite_primary_key = */ false);
writer = data_part->getWriter(
data_part_storage_builder,

View File

@ -1570,8 +1570,7 @@ bool MutateTask::prepare()
ctx->new_data_part->partition.assign(ctx->source_part->partition);
/// Don't change granularity type while mutating subset of columns
ctx->mrk_extension = ctx->source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension(ctx->new_data_part->getType())
: getNonAdaptiveMrkExtension();
ctx->mrk_extension = ctx->source_part->index_granularity_info.mark_type.getFileExtension();
const auto data_settings = ctx->data->getSettings();
ctx->need_sync = needSyncPart(ctx->source_part->rows_count, ctx->source_part->getBytesOnDisk(), *data_settings);

View File

@ -1,6 +1,7 @@
#include "PartMetadataManagerOrdinary.h"
#include <IO/ReadBufferFromFileBase.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Disks/IDisk.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
@ -18,9 +19,14 @@ PartMetadataManagerOrdinary::PartMetadataManagerOrdinary(const IMergeTreeDataPar
}
std::unique_ptr<SeekableReadBuffer> PartMetadataManagerOrdinary::read(const String & file_name) const
std::unique_ptr<ReadBuffer> PartMetadataManagerOrdinary::read(const String & file_name) const
{
return openForReading(part->data_part_storage, file_name);
auto res = openForReading(part->data_part_storage, file_name);
if (isCompressedFromFileName(file_name))
return std::make_unique<CompressedReadBufferFromFile>(std::move(res));
return res;
}
bool PartMetadataManagerOrdinary::exists(const String & file_name) const

View File

@ -12,7 +12,7 @@ public:
~PartMetadataManagerOrdinary() override = default;
std::unique_ptr<SeekableReadBuffer> read(const String & file_name) const override;
std::unique_ptr<ReadBuffer> read(const String & file_name) const override;
bool exists(const String & file_name) const override;

View File

@ -5,6 +5,7 @@
#include <Common/ErrorCodes.h>
#include <IO/HashingReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace ProfileEvents
@ -38,7 +39,7 @@ String PartMetadataManagerWithCache::getFilePathFromKey(const String & key) cons
return key.substr(part->data_part_storage->getDiskName().size() + 1);
}
std::unique_ptr<SeekableReadBuffer> PartMetadataManagerWithCache::read(const String & file_name) const
std::unique_ptr<ReadBuffer> PartMetadataManagerWithCache::read(const String & file_name) const
{
String file_path = fs::path(part->data_part_storage->getRelativePath()) / file_name;
String key = getKeyFromFilePath(file_path);
@ -48,7 +49,13 @@ std::unique_ptr<SeekableReadBuffer> PartMetadataManagerWithCache::read(const Str
{
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheMiss);
auto in = part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt);
readStringUntilEOF(value, *in);
std::unique_ptr<ReadBuffer> reader;
if (!isCompressedFromFileName(file_name))
reader = std::move(in);
else
reader = std::make_unique<CompressedReadBufferFromFile>(std::move(in));
readStringUntilEOF(value, *reader);
cache->put(key, value);
}
else

View File

@ -19,7 +19,7 @@ public:
~PartMetadataManagerWithCache() override = default;
/// First read the metadata from RocksDB cache, then from disk.
std::unique_ptr<SeekableReadBuffer> read(const String & file_name) const override;
std::unique_ptr<ReadBuffer> read(const String & file_name) const override;
/// First judge existence of the metadata in RocksDB cache, then in disk.
bool exists(const String & file_name) const override;
@ -48,7 +48,6 @@ private:
/// Get cache keys and checksums of corresponding metadata in a part(including projection parts)
void getKeysAndCheckSums(Strings & keys, std::vector<uint128> & checksums) const;
MergeTreeMetadataCachePtr cache;
};

View File

@ -261,7 +261,7 @@ void StorageSystemPartsColumns::processNextStorage(
size.data_uncompressed += bin_checksum->second.uncompressed_size;
}
auto mrk_checksum = part->checksums.files.find(file_name + part->index_granularity_info.marks_file_extension);
auto mrk_checksum = part->checksums.files.find(file_name + part->index_granularity_info.mark_type.getFileExtension());
if (mrk_checksum != part->checksums.files.end())
size.marks += mrk_checksum->second.file_size;

View File

@ -0,0 +1,14 @@
1000 10000
1000 10000
test_02381 2000000 16112790 11904 16100886
test_02381_compress 2000000 16099626 1658 16097968
10000 100000
10000 100000
10000 100000
10000 100000
test_02381 4000000 28098334 2946 28095388
test_02381_compress 4000000 28125412 23616 28101796
1 Hello
2 World
1 Hello
2 World

View File

@ -0,0 +1,50 @@
drop table if exists test_02381;
create table test_02381(a UInt64, b UInt64) ENGINE = MergeTree order by (a, b);
insert into test_02381 select number, number * 10 from system.numbers limit 1000000;
drop table if exists test_02381_compress;
create table test_02381_compress(a UInt64, b UInt64) ENGINE = MergeTree order by (a, b)
SETTINGS compress_marks=true, compress_primary_key=true, marks_compression_codec='ZSTD(3)', primary_key_compression_codec='ZSTD(3)', marks_compress_block_size=65536, primary_key_compress_block_size=65536;
insert into test_02381_compress select number, number * 10 from system.numbers limit 1000000;
select * from test_02381_compress where a = 1000 limit 1;
optimize table test_02381_compress final;
select * from test_02381_compress where a = 1000 limit 1;
-- Compare the size of marks on disk
select table, sum(rows), sum(bytes_on_disk) sum_bytes, sum(marks_bytes) sum_marks_bytes, (sum_bytes - sum_marks_bytes) exclude_marks from system.parts_columns where active and database = currentDatabase() and table like 'test_02381%' group by table order by table;
-- Switch to compressed and uncompressed
-- Test wide part
alter table test_02381 modify setting compress_marks=true, compress_primary_key=true;
insert into test_02381 select number, number * 10 from system.numbers limit 1000000;
alter table test_02381_compress modify setting compress_marks=false, compress_primary_key=false;
insert into test_02381_compress select number, number * 10 from system.numbers limit 1000000;
select * from test_02381_compress where a = 10000 limit 1;
optimize table test_02381_compress final;
select * from test_02381_compress where a = 10000 limit 1;
select * from test_02381 where a = 10000 limit 1;
optimize table test_02381 final;
select * from test_02381 where a = 10000 limit 1;
select table, sum(rows), sum(bytes_on_disk) sum_bytes, sum(marks_bytes) sum_marks_bytes, (sum_bytes - sum_marks_bytes) exclude_marks from system.parts_columns where active and database = currentDatabase() and table like 'test_02381%' group by table order by table;
drop table if exists test_02381;
drop table if exists test_02381_compress;
-- Test compact part
drop table if exists test_02381_compact;
create table test_02381_compact (a UInt64, b String) ENGINE = MergeTree order by (a, b);
insert into test_02381_compact values (1, 'Hello');
alter table test_02381_compact modify setting compress_marks = true, compress_primary_key = true;
insert into test_02381_compact values (2, 'World');
select * from test_02381_compact order by a;
optimize table test_02381_compact final;
select * from test_02381_compact order by a;
drop table if exists test_02381_compact;