Mirror of https://github.com/ClickHouse/ClickHouse.git
Merge branch 'compress-marks' into compress_marks_and_primary_key

commit 0bf76fe642
@@ -341,7 +341,7 @@ struct PartRangesReadInfo
             sum_marks_in_parts[i] = parts[i].getMarksCount();
             sum_marks += sum_marks_in_parts[i];
 
-            if (parts[i].data_part->index_granularity_info.is_adaptive)
+            if (parts[i].data_part->index_granularity_info.mark_type.adaptive)
                 ++adaptive_parts;
         }
 
@@ -67,6 +67,7 @@ namespace ErrorCodes
     extern const int NO_SUCH_COLUMN_IN_TABLE;
 }
 
+
 void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const PartMetadataManagerPtr & manager)
 {
     auto metadata_snapshot = data.getInMemoryMetadataPtr();
@@ -1648,7 +1649,7 @@ void IMergeTreeDataPart::calculateSecondaryIndicesSizesOnDisk()
         auto index_name_escaped = escapeForFileName(index_name);
 
         auto index_file_name = index_name_escaped + index_ptr->getSerializedFileExtension();
-        auto index_marks_file_name = index_name_escaped + index_granularity_info.marks_file_extension;
+        auto index_marks_file_name = index_name_escaped + getMarksFileExtension();
 
         /// If part does not contain index
         auto bin_checksum = checksums.files.find(index_file_name);
@@ -1828,12 +1829,4 @@ bool isCompressedFromIndexExtension(const String & index_extension)
     return index_extension == getIndexExtension(true);
 }
 
-bool isCompressedFromMrkExtension(const String & mrk_extension)
-{
-    return mrk_extension == getNonAdaptiveMrkExtension(true)
-        || mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true)
-        || mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true)
-        || mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::InMemory, true);
-}
-
 }
@@ -23,6 +23,7 @@
 
 #include <shared_mutex>
 
+
 namespace zkutil
 {
     class ZooKeeper;
@@ -158,7 +159,7 @@ public:
     void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
     void appendFilesOfColumnsChecksumsIndexes(Strings & files, bool include_projection = false) const;
 
-    String getMarksFileExtension() const { return index_granularity_info.marks_file_extension; }
+    String getMarksFileExtension() const { return index_granularity_info.mark_type.getFileExtension(); }
 
     /// Generate the new name for this part according to `new_part_info` and min/max dates from the old name.
     /// This is useful when you want to change e.g. block numbers or the mutation version of the part.
@@ -585,6 +586,5 @@ bool isInMemoryPart(const MergeTreeDataPartPtr & data_part);
 inline String getIndexExtension(bool is_compressed_primary_key) { return is_compressed_primary_key ? ".cidx" : ".idx"; }
 std::optional<String> getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage);
 bool isCompressedFromIndexExtension(const String & index_extension);
-bool isCompressedFromMrkExtension(const String & mrk_extension);
 
 }
@@ -106,6 +106,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
         if (remove_it != columns.end())
             columns.erase(remove_it);
     }
 
     return remove_files;
 }
+
@@ -13,8 +13,9 @@ IPartMetadataManager::IPartMetadataManager(const IMergeTreeDataPart * part_) : p
 
 bool IPartMetadataManager::isCompressedFromFileName(const String & file_name)
 {
-    const auto & extension = fs::path(file_name).extension();
-    return isCompressedFromMrkExtension(extension) || isCompressedFromIndexExtension(extension);
+    std::string extension = fs::path(file_name).extension();
+    return (MarkType::isMarkFileExtension(extension) && MarkType(extension).compressed)
+        || isCompressedFromIndexExtension(extension);
 }
 
 }
@@ -154,6 +154,7 @@ namespace ErrorCodes
     extern const int CANNOT_RESTORE_TABLE;
 }
 
+
 static void checkSampleExpression(const StorageInMemoryMetadata & metadata, bool allow_sampling_expression_not_in_primary_key, bool check_sample_column_is_correct)
 {
     if (metadata.sampling_key.column_names.empty())
@@ -1085,7 +1086,7 @@ void MergeTreeData::loadDataPartsFromDisk(
             suspicious_broken_parts_bytes += *size_of_part;
             return;
         }
-        if (!part->index_granularity_info.is_adaptive)
+        if (!part->index_granularity_info.mark_type.adaptive)
             has_non_adaptive_parts.store(true, std::memory_order_relaxed);
         else
             has_adaptive_parts.store(true, std::memory_order_relaxed);
@@ -2676,21 +2677,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
         throw Exception("Unknown type of part " + data_part_storage->getRelativePath(), ErrorCodes::UNKNOWN_PART_TYPE);
 }
 
-static MergeTreeDataPartType getPartTypeFromMarkExtension(const String & mrk_ext)
-{
-    if (mrk_ext == getNonAdaptiveMrkExtension(true)
-        || mrk_ext == getNonAdaptiveMrkExtension(false))
-        return MergeTreeDataPartType::Wide;
-    if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true)
-        || mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, false))
-        return MergeTreeDataPartType::Wide;
-    if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true)
-        || mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, false))
-        return MergeTreeDataPartType::Compact;
-
-    throw Exception("Can't determine part type, because of unknown mark extension " + mrk_ext, ErrorCodes::UNKNOWN_PART_TYPE);
-}
-
 MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
     const String & name, const DataPartStoragePtr & data_part_storage, const IMergeTreeDataPart * parent_part) const
 {
@@ -2705,7 +2691,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
     auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(data_part_storage);
 
     if (mrk_ext)
-        type = getPartTypeFromMarkExtension(*mrk_ext);
+    {
+        type = MarkType(*mrk_ext).part_type;
+    }
     else
     {
         /// Didn't find any mark file, suppose that part is empty.
@@ -6361,9 +6349,9 @@ bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const
 
     if (!settings->enable_mixed_granularity_parts || settings->index_granularity_bytes == 0)
    {
-        if (!canUseAdaptiveGranularity() && src_part->index_granularity_info.is_adaptive)
+        if (!canUseAdaptiveGranularity() && src_part->index_granularity_info.mark_type.adaptive)
            return false;
-        if (canUseAdaptiveGranularity() && !src_part->index_granularity_info.is_adaptive)
+        if (canUseAdaptiveGranularity() && !src_part->index_granularity_info.mark_type.adaptive)
            return false;
    }
    return true;
@@ -71,7 +71,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
 
     return std::make_unique<MergeTreeDataPartWriterCompact>(
         shared_from_this(), std::move(data_part_storage_builder), ordered_columns_list, metadata_snapshot,
-        indices_to_recalc, index_granularity_info.marks_file_extension,
+        indices_to_recalc, getMarksFileExtension(),
         default_codec_, writer_settings, computed_index_granularity);
 }
 
@@ -85,19 +85,17 @@ void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*eac
         total_size.data_uncompressed += bin_checksum->second.uncompressed_size;
     }
 
-    auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension);
+    auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + getMarksFileExtension());
     if (mrk_checksum != checksums.files.end())
         total_size.marks += mrk_checksum->second.file_size;
 }
 
 void MergeTreeDataPartCompact::loadIndexGranularity()
 {
-    //String full_path = getRelativePath();
-
     if (columns.empty())
         throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
 
-    if (!index_granularity_info.is_adaptive)
+    if (!index_granularity_info.mark_type.adaptive)
         throw Exception("MergeTreeDataPartCompact cannot be created with non-adaptive granulary.", ErrorCodes::NOT_IMPLEMENTED);
 
     auto marks_file_path = index_granularity_info.getMarksFilePath("data");
@@ -131,7 +129,7 @@ bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) co
         return false;
 
     auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION);
-    auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension);
+    auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + getMarksFileExtension());
 
     return (bin_checksum != checksums.files.end() && mrk_checksum != checksums.files.end());
 }
@@ -139,7 +137,7 @@ bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) co
 void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata) const
 {
     checkConsistencyBase();
-    String mrk_file_name = DATA_FILE_NAME + index_granularity_info.marks_file_extension;
+    String mrk_file_name = DATA_FILE_NAME + getMarksFileExtension();
 
     if (!checksums.empty())
     {
@@ -67,7 +67,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter(
     return std::make_unique<MergeTreeDataPartWriterWide>(
         shared_from_this(), data_part_storage_builder,
         columns_list, metadata_snapshot, indices_to_recalc,
-        index_granularity_info.marks_file_extension,
+        getMarksFileExtension(),
         default_codec_, writer_settings, computed_index_granularity);
 }
 
@@ -95,7 +95,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
             size.data_uncompressed += bin_checksum->second.uncompressed_size;
         }
 
-        auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension);
+        auto mrk_checksum = checksums.files.find(file_name + getMarksFileExtension());
         if (mrk_checksum != checksums.files.end())
             size.marks += mrk_checksum->second.file_size;
     });
@@ -103,11 +103,11 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
     return size;
 }
 
 
 void MergeTreeDataPartWide::loadIndexGranularity()
 {
     index_granularity_info.changeGranularityIfRequired(data_part_storage);
 
-
     if (columns.empty())
         throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
+
@@ -119,55 +119,48 @@ void MergeTreeDataPartWide::loadIndexGranularity()
             std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path));
 
     size_t marks_file_size = data_part_storage->getFileSize(marks_file_path);
-    if (!index_granularity_info.compress_marks)
-    {
-        if (!index_granularity_info.is_adaptive)
-        {
-            size_t marks_count = marks_file_size / index_granularity_info.getMarkSizeInBytes();
-            index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same
-        }
-        else
-        {
-            auto buffer = data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
-            while (!buffer->eof())
-            {
-                buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block
-                size_t granularity;
-                readIntBinary(granularity, *buffer);
-                index_granularity.appendMark(granularity);
-            }
-
-            if (index_granularity.getMarksCount() * index_granularity_info.getMarkSizeInBytes() != marks_file_size)
-                throw Exception("Cannot read all marks from file " + data_part_storage->getFullPath() + "/" + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA);
-        }
+    if (!index_granularity_info.mark_type.adaptive && !index_granularity_info.mark_type.compressed)
+    {
+        /// The most easy way - no need to read the file, everything is known from its size.
+        size_t marks_count = marks_file_size / index_granularity_info.getMarkSizeInBytes();
+        index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same
     }
     else
     {
-        CompressedReadBufferFromFile buffer(
-            data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt));
+        auto marks_file = data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
+
+        std::unique_ptr<ReadBuffer> marks_reader;
+        if (!index_granularity_info.mark_type.compressed)
+            marks_reader = std::move(marks_file);
+        else
+            marks_reader = std::make_unique<CompressedReadBufferFromFile>(std::move(marks_file));
 
-        MarksInCompressedFile mark(1);
         size_t marks_count = 0;
-        while (!buffer.eof())
+        while (!marks_reader->eof())
         {
-            buffer.readStrict(reinterpret_cast<char *>(mark.data()), sizeof(size_t) * 2); /// skip offset_in_compressed file and offset_in_decompressed_block
+            MarkInCompressedFile mark;
+            size_t granularity;
+
+            readBinary(mark.offset_in_compressed_file, *marks_reader);
+            readBinary(mark.offset_in_decompressed_block, *marks_reader);
             ++marks_count;
 
-            if (index_granularity_info.is_adaptive)
+            if (index_granularity_info.mark_type.adaptive)
            {
-                size_t granularity;
-                readIntBinary(granularity, buffer);
+                readIntBinary(granularity, *marks_reader);
                 index_granularity.appendMark(granularity);
            }
         }
 
-        if (!index_granularity_info.is_adaptive)
-            index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity);
+        if (!index_granularity_info.mark_type.adaptive)
+            index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same
     }
 
     index_granularity.setInitialized();
 }
 
 
 bool MergeTreeDataPartWide::isStoredOnRemoteDisk() const
 {
     return data_part_storage->isStoredOnRemoteDisk();
@@ -186,7 +179,7 @@ MergeTreeDataPartWide::~MergeTreeDataPartWide()
 void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
 {
     checkConsistencyBase();
-    //String path = getRelativePath();
+    std::string marks_file_extension = index_granularity_info.mark_type.getFileExtension();
 
     if (!checksums.empty())
     {
@@ -197,7 +190,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
             getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
             {
                 String file_name = ISerialization::getFileNameForStream(name_type, substream_path);
-                String mrk_file_name = file_name + index_granularity_info.marks_file_extension;
+                String mrk_file_name = file_name + marks_file_extension;
                 String bin_file_name = file_name + DATA_FILE_EXTENSION;
 
                 if (!checksums.files.contains(mrk_file_name))
@@ -223,7 +216,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
         {
             getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
             {
-                auto file_path = ISerialization::getFileNameForStream(name_type, substream_path) + index_granularity_info.marks_file_extension;
+                auto file_path = ISerialization::getFileNameForStream(name_type, substream_path) + marks_file_extension;
 
                 /// Missing file is Ok for case when new column was added.
                 if (data_part_storage->exists(file_path))
@@ -251,10 +244,11 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
 
 bool MergeTreeDataPartWide::hasColumnFiles(const NameAndTypePair & column) const
 {
-    auto check_stream_exists = [this](const String & stream_name)
+    std::string marks_file_extension = index_granularity_info.mark_type.getFileExtension();
+    auto check_stream_exists = [this, &marks_file_extension](const String & stream_name)
     {
         auto bin_checksum = checksums.files.find(stream_name + DATA_FILE_EXTENSION);
-        auto mrk_checksum = checksums.files.find(stream_name + index_granularity_info.marks_file_extension);
+        auto mrk_checksum = checksums.files.find(stream_name + marks_file_extension);
 
         return bin_checksum != checksums.files.end() && mrk_checksum != checksums.files.end();
     };
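For orientation, here is a minimal standalone sketch (not ClickHouse code) of the on-disk mark layout that the rewritten loadIndexGranularity() above reads: in an uncompressed adaptive mark file (.mrk2) each mark is three native-endian UInt64 values — offset in the compressed data file, offset in the decompressed block, and the number of rows in the granule. The command-line interface and the plain std::ifstream reader are illustrative assumptions; a compressed .cmrk2 file would first need to be decompressed, as the diff does with CompressedReadBufferFromFile.

// Standalone sketch: dump an uncompressed .mrk2 file, assuming the per-mark layout
// visible in the diff above (two offsets followed by the granule row count).
#include <cstdint>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

struct Mark
{
    uint64_t offset_in_compressed_file = 0;
    uint64_t offset_in_decompressed_block = 0;
    uint64_t rows_in_granule = 0;          // present only in adaptive formats (.mrk2 / .mrk3)
};

std::vector<Mark> read_mrk2(const std::string & path)
{
    std::vector<Mark> marks;
    std::ifstream in(path, std::ios::binary);
    Mark m;
    while (in.read(reinterpret_cast<char *>(&m), sizeof(m)))   // 24 bytes per mark
        marks.push_back(m);
    return marks;
}

int main(int argc, char ** argv)
{
    if (argc != 2)
    {
        std::cerr << "usage: dump_mrk2 <file.mrk2>\n";
        return 1;
    }
    for (const auto & m : read_mrk2(argv[1]))
        std::cout << m.offset_in_compressed_file << '\t'
                  << m.offset_in_decompressed_block << '\t'
                  << m.rows_in_granule << '\n';
    return 0;
}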
@@ -27,15 +27,24 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
         settings.max_compress_block_size,
         settings_.query_write_settings))
     , plain_hashing(*plain_file)
-    , marks_file(data_part_storage_builder->writeFile(
+{
+    marks_file = data_part_storage_builder->writeFile(
         MergeTreeDataPartCompact::DATA_FILE_NAME + marks_file_extension_,
         4096,
-        settings_.query_write_settings))
-    , marks_hashing(*marks_file)
-    , marks_compressed_buf(marks_hashing, settings_.getMarksCompressionCodec(), settings_.marks_compress_block_size)
-    , marks_compressed(marks_compressed_buf)
-    , is_compress_marks(isCompressedFromMrkExtension(marks_file_extension))
+        settings_.query_write_settings);
+
+    marks_file_hashing = std::make_unique<HashingWriteBuffer>(*marks_file);
+
+    if (MarkType(marks_file_extension).compressed)
     {
+        marks_compressor = std::make_unique<CompressedWriteBuffer>(
+            *marks_file_hashing,
+            settings_.getMarksCompressionCodec(),
+            settings_.marks_compress_block_size);
+
+        marks_source_hashing = std::make_unique<HashingWriteBuffer>(*marks_compressor);
+    }
+
     const auto & storage_columns = metadata_snapshot->getColumns();
     for (const auto & column : columns_list)
         addStreams(column, storage_columns.getCodecDescOrDefault(column.name, default_codec));
@@ -176,6 +185,8 @@ void MergeTreeDataPartWriterCompact::writeDataBlockPrimaryIndexAndSkipIndices(co
 
 void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const Granules & granules)
 {
+    WriteBuffer & marks_out = marks_source_hashing ? *marks_source_hashing : *marks_file_hashing;
+
     for (const auto & granule : granules)
     {
         data_written = true;
@@ -207,8 +218,8 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
         };
 
 
-        writeIntBinary(plain_hashing.count(), is_compress_marks ? marks_compressed : marks_hashing);
-        writeIntBinary(static_cast<UInt64>(0), is_compress_marks ? marks_compressed : marks_hashing);
+        writeIntBinary(plain_hashing.count(), marks_out);
+        writeIntBinary(static_cast<UInt64>(0), marks_out);
 
         writeColumnSingleGranule(
             block.getByName(name_and_type->name), data_part->getSerialization(name_and_type->name),
@@ -218,7 +229,7 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
             prev_stream->hashing_buf.next(); //-V522
         }
 
-        writeIntBinary(granule.rows_to_write, is_compress_marks ? marks_compressed : marks_hashing);
+        writeIntBinary(granule.rows_to_write, marks_out);
     }
 }
 
@@ -243,21 +254,26 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(IMergeTreeDataPart::Check
         assert(stream->hashing_buf.offset() == 0);
 #endif
 
+    WriteBuffer & marks_out = marks_source_hashing ? *marks_source_hashing : *marks_file_hashing;
+
     if (with_final_mark && data_written)
     {
         for (size_t i = 0; i < columns_list.size(); ++i)
         {
-            writeIntBinary(plain_hashing.count(), is_compress_marks ? marks_compressed : marks_hashing);
-            writeIntBinary(static_cast<UInt64>(0), is_compress_marks ? marks_compressed : marks_hashing);
+            writeIntBinary(plain_hashing.count(), marks_out);
+            writeIntBinary(static_cast<UInt64>(0), marks_out);
         }
-        writeIntBinary(static_cast<UInt64>(0), is_compress_marks ? marks_compressed : marks_hashing);
+        writeIntBinary(static_cast<UInt64>(0), marks_out);
     }
 
     plain_file->next();
-    if (is_compress_marks)
-        marks_compressed.next();
 
-    marks_hashing.next();
+    if (marks_source_hashing)
+        marks_source_hashing->next();
+    if (marks_compressor)
+        marks_compressor->next();
+
+    marks_file_hashing->next();
     addToChecksums(checksums);
 
     plain_file->preFinalize();
@@ -268,6 +284,7 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(bool sync)
 {
     plain_file->finalize();
     marks_file->finalize();
+
     if (sync)
     {
         plain_file->sync();
@@ -339,14 +356,15 @@ void MergeTreeDataPartWriterCompact::addToChecksums(MergeTreeDataPartChecksums &
     checksums.files[data_file_name].file_size = plain_hashing.count();
     checksums.files[data_file_name].file_hash = plain_hashing.getHash();
 
-    if (is_compress_marks)
+    if (marks_compressor)
     {
         checksums.files[marks_file_name].is_compressed = true;
-        checksums.files[marks_file_name].uncompressed_size = marks_compressed.count();
-        checksums.files[marks_file_name].uncompressed_hash = marks_compressed.getHash();
+        checksums.files[marks_file_name].uncompressed_size = marks_source_hashing->count();
+        checksums.files[marks_file_name].uncompressed_hash = marks_source_hashing->getHash();
     }
-    checksums.files[marks_file_name].file_size = marks_hashing.count();
-    checksums.files[marks_file_name].file_hash = marks_hashing.getHash();
+
+    checksums.files[marks_file_name].file_size = marks_file_hashing->count();
+    checksums.files[marks_file_name].file_hash = marks_file_hashing->getHash();
 }
 
 void MergeTreeDataPartWriterCompact::ColumnsBuffer::add(MutableColumns && columns)
@@ -1,6 +1,8 @@
 #pragma once
 
 #include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h>
 
+
 namespace DB
 {
+
@@ -84,12 +86,16 @@ private:
     /// Stream for each column's substreams path (look at addStreams).
     std::unordered_map<String, CompressedStreamPtr> compressed_streams;
 
-    /// marks -> marks_file -> marks_compressed_buf -> marks_compressed
+    /// If marks are uncompressed, the data is written to 'marks_file_hashing' for hash calculation and then to the 'marks_file'.
     std::unique_ptr<WriteBufferFromFileBase> marks_file;
-    HashingWriteBuffer marks_hashing;
-    CompressedWriteBuffer marks_compressed_buf;
-    HashingWriteBuffer marks_compressed;
-    bool is_compress_marks;
+    std::unique_ptr<HashingWriteBuffer> marks_file_hashing;
+
+    /// If marks are compressed, the data is written to 'marks_source_hashing' for hash calculation,
+    /// then to 'marks_compressor' for compression,
+    /// then to 'marks_file_hashing' for calculation of hash of compressed data,
+    /// then finally to 'marks_file'.
+    std::unique_ptr<CompressedWriteBuffer> marks_compressor;
+    std::unique_ptr<HashingWriteBuffer> marks_source_hashing;
 };
 
 }
@@ -13,12 +13,15 @@ namespace ErrorCodes
 
 void MergeTreeDataPartWriterOnDisk::Stream::preFinalize()
 {
-    compressed.next();
-    /// 'compressed_buf' doesn't call next() on underlying buffer ('plain_hashing'). We should do it manually.
+    compressed_hashing.next();
+    compressor.next();
     plain_hashing.next();
 
-    if (is_compress_marks)
-        marks_compressed.next();
+    if (compress_marks)
+    {
+        marks_compressed_hashing.next();
+        marks_compressor.next();
+    }
 
     marks_hashing.next();
 
@@ -60,13 +63,13 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
     marks_file_extension{marks_file_extension_},
     plain_file(data_part_storage_builder->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
     plain_hashing(*plain_file),
-    compressed_buf(plain_hashing, compression_codec_, max_compress_block_size_),
-    compressed(compressed_buf),
+    compressor(plain_hashing, compression_codec_, max_compress_block_size_),
+    compressed_hashing(compressor),
     marks_file(data_part_storage_builder->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)),
     marks_hashing(*marks_file),
-    marks_compressed_buf(marks_hashing, marks_compression_codec_, marks_compress_block_size_),
-    marks_compressed(marks_compressed_buf),
-    is_compress_marks(isCompressedFromMrkExtension(marks_file_extension))
+    marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_),
+    marks_compressed_hashing(marks_compressor),
+    compress_marks(MarkType(marks_file_extension).compressed)
 {
 }
 
@@ -75,17 +78,18 @@ void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPa
     String name = escaped_column_name;
 
     checksums.files[name + data_file_extension].is_compressed = true;
-    checksums.files[name + data_file_extension].uncompressed_size = compressed.count();
-    checksums.files[name + data_file_extension].uncompressed_hash = compressed.getHash();
+    checksums.files[name + data_file_extension].uncompressed_size = compressed_hashing.count();
+    checksums.files[name + data_file_extension].uncompressed_hash = compressed_hashing.getHash();
     checksums.files[name + data_file_extension].file_size = plain_hashing.count();
     checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();
 
-    if (is_compress_marks)
+    if (compress_marks)
     {
         checksums.files[name + marks_file_extension].is_compressed = true;
-        checksums.files[name + marks_file_extension].uncompressed_size = marks_compressed.count();
-        checksums.files[name + marks_file_extension].uncompressed_hash = marks_compressed.getHash();
+        checksums.files[name + marks_file_extension].uncompressed_size = marks_compressed_hashing.count();
+        checksums.files[name + marks_file_extension].uncompressed_hash = marks_compressed_hashing.getHash();
     }
 
     checksums.files[name + marks_file_extension].file_size = marks_hashing.count();
     checksums.files[name + marks_file_extension].file_hash = marks_hashing.getHash();
 }
@@ -175,15 +179,15 @@ void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
     {
         String index_name = "primary" + getIndexExtension(compress_primary_key);
         index_file_stream = data_part_storage_builder->writeFile(index_name, DBMS_DEFAULT_BUFFER_SIZE, settings.query_write_settings);
-        index_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
+        index_file_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
 
         if (compress_primary_key)
         {
             ParserCodec codec_parser;
             auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.primary_key_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
             CompressionCodecPtr primary_key_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr);
-            index_compressed_buf = std::make_unique<CompressedWriteBuffer>(*index_hashing_stream, primary_key_compression_codec, settings.primary_key_compress_block_size);
-            index_compressed_stream = std::make_unique<HashingWriteBuffer>(*index_compressed_buf);
+            index_compressor_stream = std::make_unique<CompressedWriteBuffer>(*index_file_hashing_stream, primary_key_compression_codec, settings.primary_key_compress_block_size);
+            index_source_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_compressor_stream);
         }
     }
 }
@@ -242,7 +246,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc
             const auto & primary_column = primary_index_block.getByPosition(j);
             index_columns[j]->insertFrom(*primary_column.column, granule.start_row);
             primary_column.type->getDefaultSerialization()->serializeBinary(
-                *primary_column.column, granule.start_row, compress_primary_key ? *index_compressed_stream : *index_hashing_stream);
+                *primary_column.column, granule.start_row, compress_primary_key ? *index_source_hashing_stream : *index_file_hashing_stream);
         }
     }
 }
@@ -260,11 +264,13 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
     {
         const auto index_helper = skip_indices[i];
         auto & stream = *skip_indices_streams[i];
+        WriteBuffer & marks_out = stream.compress_marks ? stream.marks_compressed_hashing : stream.marks_hashing;
+
         for (const auto & granule : granules_to_write)
         {
             if (skip_index_accumulated_marks[i] == index_helper->index.granularity)
             {
-                skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
+                skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed_hashing);
                 skip_index_accumulated_marks[i] = 0;
             }
 
@@ -272,15 +278,16 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
             {
                 skip_indices_aggregators[i] = index_helper->createIndexAggregator();
 
-                if (stream.compressed.offset() >= settings.min_compress_block_size)
-                    stream.compressed.next();
+                if (stream.compressed_hashing.offset() >= settings.min_compress_block_size)
+                    stream.compressed_hashing.next();
 
-                writeIntBinary(stream.plain_hashing.count(), stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing);
-                writeIntBinary(stream.compressed.offset(), stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing);
+                writeIntBinary(stream.plain_hashing.count(), marks_out);
+                writeIntBinary(stream.compressed_hashing.offset(), marks_out);
+
                 /// Actually this numbers is redundant, but we have to store them
-                /// to be compatible with normal .mrk2 file format
+                /// to be compatible with the normal .mrk2 file format
                 if (settings.can_use_adaptive_granularity)
-                    writeIntBinary(1UL, stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing);
+                    writeIntBinary(1UL, marks_out);
             }
 
             size_t pos = granule.start_row;
@@ -297,7 +304,7 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
     if (write_final_mark && compute_granularity)
         index_granularity.appendMark(0);
 
-    if (index_hashing_stream)
+    if (index_file_hashing_stream)
     {
         if (write_final_mark)
         {
@@ -307,32 +314,32 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
                 size_t last_row_number = column.size() - 1;
                 index_columns[j]->insertFrom(column, last_row_number);
                 index_types[j]->getDefaultSerialization()->serializeBinary(
-                    column, last_row_number, compress_primary_key ? *index_compressed_stream : *index_hashing_stream);
+                    column, last_row_number, compress_primary_key ? *index_source_hashing_stream : *index_file_hashing_stream);
             }
             last_block_index_columns.clear();
         }
 
         if (compress_primary_key)
-            index_compressed_stream->next();
+            index_source_hashing_stream->next();
 
-        index_hashing_stream->next();
+        index_file_hashing_stream->next();
 
         String index_name = "primary" + getIndexExtension(compress_primary_key);
         if (compress_primary_key)
         {
             checksums.files[index_name].is_compressed = true;
-            checksums.files[index_name].uncompressed_size = index_compressed_stream->count();
-            checksums.files[index_name].uncompressed_hash = index_compressed_stream->getHash();
+            checksums.files[index_name].uncompressed_size = index_source_hashing_stream->count();
+            checksums.files[index_name].uncompressed_hash = index_source_hashing_stream->getHash();
         }
-        checksums.files[index_name].file_size = index_hashing_stream->count();
-        checksums.files[index_name].file_hash = index_hashing_stream->getHash();
+        checksums.files[index_name].file_size = index_file_hashing_stream->count();
+        checksums.files[index_name].file_hash = index_file_hashing_stream->getHash();
         index_file_stream->preFinalize();
     }
 }
 
 void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(bool sync)
 {
-    if (index_hashing_stream)
+    if (index_file_hashing_stream)
     {
         index_file_stream->finalize();
         if (sync)
@@ -340,10 +347,10 @@ void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(bool sync)
 
         if (compress_primary_key)
         {
-            index_compressed_stream = nullptr;
-            index_compressed_buf = nullptr;
+            index_source_hashing_stream = nullptr;
+            index_compressor_stream = nullptr;
         }
-        index_hashing_stream = nullptr;
+        index_file_hashing_stream = nullptr;
     }
 }
 
@@ -353,7 +360,7 @@ void MergeTreeDataPartWriterOnDisk::fillSkipIndicesChecksums(MergeTreeData::Data
     {
         auto & stream = *skip_indices_streams[i];
         if (!skip_indices_aggregators[i]->empty())
-            skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
+            skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed_hashing);
     }
 
     for (auto & stream : skip_indices_streams)
@@ -65,18 +65,18 @@ public:
         std::string data_file_extension;
         std::string marks_file_extension;
 
-        /// compressed -> compressed_buf -> plain_hashing -> plain_file
+        /// compressed_hashing -> compressor -> plain_hashing -> plain_file
         std::unique_ptr<WriteBufferFromFileBase> plain_file;
         HashingWriteBuffer plain_hashing;
-        CompressedWriteBuffer compressed_buf;
-        HashingWriteBuffer compressed;
+        CompressedWriteBuffer compressor;
+        HashingWriteBuffer compressed_hashing;
 
-        /// marks -> marks_file -> marks_compressed_buf -> marks_compressed
+        /// marks_compressed_hashing -> marks_compressor -> marks_hashing -> marks_file
         std::unique_ptr<WriteBufferFromFileBase> marks_file;
         HashingWriteBuffer marks_hashing;
-        CompressedWriteBuffer marks_compressed_buf;
-        HashingWriteBuffer marks_compressed;
-        bool is_compress_marks;
+        CompressedWriteBuffer marks_compressor;
+        HashingWriteBuffer marks_compressed_hashing;
+        bool compress_marks;
 
         bool is_prefinalized = false;
 
@@ -145,9 +145,9 @@ protected:
     std::vector<size_t> skip_index_accumulated_marks;
 
     std::unique_ptr<WriteBufferFromFileBase> index_file_stream;
-    std::unique_ptr<HashingWriteBuffer> index_hashing_stream;
-    std::unique_ptr<CompressedWriteBuffer> index_compressed_buf;
-    std::unique_ptr<HashingWriteBuffer> index_compressed_stream;
+    std::unique_ptr<HashingWriteBuffer> index_file_hashing_stream;
+    std::unique_ptr<CompressedWriteBuffer> index_compressor_stream;
+    std::unique_ptr<HashingWriteBuffer> index_source_hashing_stream;
     bool compress_primary_key;
 
     DataTypes index_types;
@@ -30,6 +30,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
 
     Granules result;
     size_t current_row = 0;
+
     /// When our last mark is not finished yet and we have to write rows into it
     if (rows_written_in_last_mark > 0)
     {
@@ -43,7 +44,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
             .is_complete = (rows_left_in_block >= rows_left_in_last_mark),
         });
         current_row += result.back().rows_to_write;
-        current_mark++;
+        ++current_mark;
     }
 
     /// Calculating normal granules for block
@@ -61,7 +62,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
             .is_complete = (rows_left_in_block >= expected_rows_in_mark),
         });
         current_row += result.back().rows_to_write;
-        current_mark++;
+        ++current_mark;
     }
 
     return result;
@@ -144,7 +145,7 @@ ISerialization::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGett
         if (is_offsets && offset_columns.contains(stream_name))
             return nullptr;
 
-        return &column_streams.at(stream_name)->compressed;
+        return &column_streams.at(stream_name)->compressed_hashing;
     };
 }
 
@@ -272,10 +273,12 @@ void MergeTreeDataPartWriterWide::writeSingleMark(
 void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stream_with_mark, size_t rows_in_mark)
 {
     Stream & stream = *column_streams[stream_with_mark.stream_name];
-    writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing);
-    writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing);
+    WriteBuffer & marks_out = stream.compress_marks ? stream.marks_compressed_hashing : stream.marks_hashing;
+
+    writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, marks_out);
+    writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, marks_out);
     if (settings.can_use_adaptive_granularity)
-        writeIntBinary(rows_in_mark, stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing);
+        writeIntBinary(rows_in_mark, marks_out);
 }
 
 StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
@@ -297,13 +300,13 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
         Stream & stream = *column_streams[stream_name];
 
         /// There could already be enough data to compress into the new block.
-        if (stream.compressed.offset() >= settings.min_compress_block_size)
-            stream.compressed.next();
+        if (stream.compressed_hashing.offset() >= settings.min_compress_block_size)
+            stream.compressed_hashing.next();
 
         StreamNameAndMark stream_with_mark;
         stream_with_mark.stream_name = stream_name;
         stream_with_mark.mark.offset_in_compressed_file = stream.plain_hashing.count();
-        stream_with_mark.mark.offset_in_decompressed_block = stream.compressed.offset();
+        stream_with_mark.mark.offset_in_decompressed_block = stream.compressed_hashing.offset();
 
         result.push_back(stream_with_mark);
     }, path);
@@ -333,7 +336,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
         if (is_offsets && offset_columns.contains(stream_name))
             return;
 
-        column_streams[stream_name]->compressed.nextIfAtEnd();
+        column_streams[stream_name]->compressed_hashing.nextIfAtEnd();
     }, serialize_settings.path);
 }
 
@@ -428,7 +431,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
 
     auto mrk_file_in = data_part_storage->readFile(mrk_path, {}, std::nullopt, std::nullopt);
     std::unique_ptr<ReadBuffer> mrk_in;
-    if (data_part->index_granularity_info.compress_marks)
+    if (data_part->index_granularity_info.mark_type.compressed)
         mrk_in = std::make_unique<CompressedReadBufferFromFile>(std::move(mrk_file_in));
     else
         mrk_in = std::move(mrk_file_in);
@ -9,75 +9,120 @@ namespace DB
|
|||||||
|
|
||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
|
extern const int LOGICAL_ERROR;
|
||||||
extern const int NOT_IMPLEMENTED;
|
extern const int NOT_IMPLEMENTED;
|
||||||
extern const int UNKNOWN_PART_TYPE;
|
extern const int UNKNOWN_PART_TYPE;
|
||||||
|
extern const int INCORRECT_FILE_NAME;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
MarkType::MarkType(std::string_view extension)
|
||||||
|
{
|
||||||
|
if (extension.starts_with('.'))
|
||||||
|
extension = extension.substr(1);
|
||||||
|
|
||||||
|
if (extension.starts_with('c'))
|
||||||
|
{
|
||||||
|
compressed = true;
|
||||||
|
extension = extension.substr(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!extension.starts_with("mrk"))
|
||||||
|
throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Mark file extension does not start with .mrk or .cmrk: {}", extension);
|
||||||
|
|
||||||
|
extension = extension.substr(strlen("mrk"));
|
||||||
|
|
||||||
|
if (extension.empty())
|
||||||
|
{
|
||||||
|
adaptive = false;
|
||||||
|
part_type = MergeTreeDataPartType::Wide;
|
||||||
|
}
|
||||||
|
else if (extension == "2")
|
||||||
|
{
|
||||||
|
adaptive = true;
|
||||||
|
part_type = MergeTreeDataPartType::Wide;
|
||||||
|
}
|
||||||
|
else if (extension == "3")
|
||||||
|
{
|
||||||
|
adaptive = true;
|
||||||
|
part_type = MergeTreeDataPartType::Compact;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Unknown mark file extension: '{}'", extension);
|
||||||
|
}
|
||||||
|
|
||||||
|
MarkType::MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_)
|
||||||
|
: adaptive(adaptive_), compressed(compressed_), part_type(part_type_)
|
||||||
|
{
|
||||||
|
if (!adaptive && part_type != MergeTreeDataPartType::Wide)
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity");
|
||||||
|
if (part_type == MergeTreeDataPartType::Unknown)
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type");
|
||||||
|
}
|
||||||
|
|
||||||
|
bool MarkType::isMarkFileExtension(std::string_view extension)
|
||||||
|
{
|
||||||
|
return extension.find("mrk") != std::string_view::npos;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string MarkType::getFileExtension() const
|
||||||
|
{
|
||||||
|
std::string res = compressed ? ".cmrk" : ".mrk";
|
||||||
|
|
||||||
|
if (!adaptive)
|
||||||
|
{
|
||||||
|
if (part_type != MergeTreeDataPartType::Wide)
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity");
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (part_type)
|
||||||
|
{
|
||||||
|
case MergeTreeDataPartType::Wide:
|
||||||
|
return res + "2";
|
||||||
|
case MergeTreeDataPartType::Compact:
|
||||||
|
return res + "3";
|
||||||
|
case MergeTreeDataPartType::InMemory:
|
||||||
|
return "";
|
||||||
|
case MergeTreeDataPartType::Unknown:
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
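For orientation, the extension grammar that the new MarkType(std::string_view) constructor encodes is: an optional leading dot, an optional 'c' meaning compressed marks, the literal "mrk", and an optional "2"/"3" suffix selecting adaptive Wide or Compact parts. Below is a minimal standalone sketch of that parsing order; the PartType enum and parseMarkExtension helper are illustrative stand-ins, not the ClickHouse types.

#include <cassert>
#include <cstring>
#include <stdexcept>
#include <string_view>

// Hypothetical stand-in for MergeTreeDataPartType, for illustration only.
enum class PartType { Wide, Compact };

struct ParsedMark { bool adaptive = true; bool compressed = false; PartType part_type = PartType::Wide; };

// Mirrors the parsing order used in MarkType(std::string_view extension):
// strip '.', strip 'c' (compressed), require "mrk", then map the suffix.
ParsedMark parseMarkExtension(std::string_view ext)
{
    ParsedMark res;
    if (ext.starts_with('.'))
        ext = ext.substr(1);
    if (ext.starts_with('c'))
    {
        res.compressed = true;
        ext = ext.substr(1);
    }
    if (!ext.starts_with("mrk"))
        throw std::runtime_error("not a mark file extension");
    ext = ext.substr(std::strlen("mrk"));
    if (ext.empty())     { res.adaptive = false; res.part_type = PartType::Wide; }
    else if (ext == "2") { res.adaptive = true;  res.part_type = PartType::Wide; }
    else if (ext == "3") { res.adaptive = true;  res.part_type = PartType::Compact; }
    else
        throw std::runtime_error("unknown mark file extension");
    return res;
}

int main()
{
    auto m = parseMarkExtension(".cmrk2");
    assert(m.compressed && m.adaptive && m.part_type == PartType::Wide);
    auto n = parseMarkExtension(".mrk");
    assert(!n.compressed && !n.adaptive);
}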
 std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage)
 {
     if (data_part_storage->exists())
-    {
         for (auto it = data_part_storage->iterate(); it->isValid(); it->next())
-        {
-            const auto & ext = fs::path(it->name()).extension();
-            if (ext == getNonAdaptiveMrkExtension(false)
-                || ext == getNonAdaptiveMrkExtension(true)
-                || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, false)
-                || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true)
-                || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, false)
-                || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true))
-                return ext;
-        }
-    }
+            if (it->isFile())
+                if (std::string ext = fs::path(it->name()).extension(); MarkType::isMarkFileExtension(ext))
+                    return ext;
     return {};
 }

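getMarksExtensionFromFilesystem now simply returns the extension of the first mark file it finds in the part directory instead of matching against an enumerated list. A simplified standalone sketch of the same scan, using std::filesystem in place of the DataPartStorage abstraction (the names below are illustrative only):

#include <filesystem>
#include <iostream>
#include <optional>
#include <string>
#include <string_view>

namespace fs = std::filesystem;

// Same detection rule as MarkType::isMarkFileExtension: any extension containing "mrk".
static bool looksLikeMarkExtension(std::string_view ext)
{
    return ext.find("mrk") != std::string_view::npos;
}

// Returns the extension of the first mark file found in `dir`, if any.
std::optional<std::string> marksExtensionFromDirectory(const fs::path & dir)
{
    if (fs::exists(dir))
        for (const auto & entry : fs::directory_iterator(dir))
            if (entry.is_regular_file())
                if (std::string ext = entry.path().extension().string(); looksLikeMarkExtension(ext))
                    return ext;
    return {};
}

int main(int argc, char ** argv)
{
    fs::path dir = argc > 1 ? argv[1] : ".";
    if (auto ext = marksExtensionFromDirectory(dir))
        std::cout << "found mark files with extension " << *ext << '\n';
    else
        std::cout << "no mark files found\n";
}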
 MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_)
-    : type(type_)
+    : mark_type(storage.canUseAdaptiveGranularity(), storage.getSettings()->compress_marks, type_.getValue())
 {
-    const auto storage_settings = storage.getSettings();
-    fixed_index_granularity = storage_settings->index_granularity;
-    compress_marks = storage_settings->compress_marks;
-
-    /// Granularity is fixed
-    if (!storage.canUseAdaptiveGranularity())
-    {
-        if (type != MergeTreeDataPartType::Wide)
-            throw Exception("Only Wide parts can be used with non-adaptive granularity.", ErrorCodes::NOT_IMPLEMENTED);
-        setNonAdaptive();
-    }
-    else
-        setAdaptive(storage_settings->index_granularity_bytes);
+    fixed_index_granularity = storage.getSettings()->index_granularity;
 }

 void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage)
 {
     auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage);
-    if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension(compress_marks))
-        setNonAdaptive();
-}
-
-void MergeTreeIndexGranularityInfo::setAdaptive(size_t index_granularity_bytes_)
-{
-    is_adaptive = true;
-    marks_file_extension = getAdaptiveMrkExtension(type, compress_marks);
-    index_granularity_bytes = index_granularity_bytes_;
-}
-
-void MergeTreeIndexGranularityInfo::setNonAdaptive()
-{
-    is_adaptive = false;
-    marks_file_extension = getNonAdaptiveMrkExtension(compress_marks);
-    index_granularity_bytes = 0;
+    if (mrk_ext && !MarkType(*mrk_ext).adaptive)
+    {
+        mark_type.adaptive = false;
+        index_granularity_bytes = 0;
+    }
 }

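changeGranularityIfRequired keeps an existing part consistent with what is already on disk: if a mark file there parses as non-adaptive, the info object is downgraded in place (adaptive turned off, index_granularity_bytes reset to 0), regardless of the storage-level setting. A minimal sketch of that decision, with a simplified GranularityInfo stand-in and an example starting value for index_granularity_bytes (not a claim about ClickHouse defaults):

#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <string_view>

// Simplified granularity state; stands in for MergeTreeIndexGranularityInfo.
struct GranularityInfo
{
    bool adaptive = true;
    size_t index_granularity_bytes = 10 * 1024 * 1024;  // example value
};

// Non-adaptive mark files carry no numeric suffix: ".mrk" or ".cmrk".
static bool isNonAdaptiveMarkExtension(std::string_view ext)
{
    return ext == ".mrk" || ext == ".cmrk";
}

// Mirrors the downgrade rule: an existing non-adaptive part wins over
// the storage-level setting, so granularity is switched off in place.
void changeGranularityIfRequired(GranularityInfo & info, std::optional<std::string> on_disk_ext)
{
    if (on_disk_ext && isNonAdaptiveMarkExtension(*on_disk_ext))
    {
        info.adaptive = false;
        info.index_granularity_bytes = 0;
    }
}

int main()
{
    GranularityInfo info;
    changeGranularityIfRequired(info, std::string(".cmrk"));
    assert(!info.adaptive && info.index_granularity_bytes == 0);

    GranularityInfo other;
    changeGranularityIfRequired(other, std::string(".cmrk2"));  // adaptive Wide part: unchanged
    assert(other.adaptive);
}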
 size_t MergeTreeIndexGranularityInfo::getMarkSizeInBytes(size_t columns_num) const
 {
-    if (type == MergeTreeDataPartType::Wide)
-        return is_adaptive ? getAdaptiveMrkSizeWide() : getNonAdaptiveMrkSizeWide();
-    else if (type == MergeTreeDataPartType::Compact)
+    if (mark_type.part_type == MergeTreeDataPartType::Wide)
+        return mark_type.adaptive ? getAdaptiveMrkSizeWide() : getNonAdaptiveMrkSizeWide();
+    else if (mark_type.part_type == MergeTreeDataPartType::Compact)
         return getAdaptiveMrkSizeCompact(columns_num);
-    else if (type == MergeTreeDataPartType::InMemory)
+    else if (mark_type.part_type == MergeTreeDataPartType::InMemory)
         return 0;
     else
         throw Exception("Unknown part type", ErrorCodes::UNKNOWN_PART_TYPE);
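The size helpers referenced here fix the per-mark byte counts: a non-adaptive Wide mark holds two UInt64 offsets, an adaptive Wide mark adds a third UInt64 for the number of rows in the granule, and a Compact mark stores two offsets per column plus one row count. A quick standalone check of that arithmetic, copied from the formulas in the diff:

#include <cassert>
#include <cstddef>
#include <cstdint>

// Formulas taken from the diff; sizeof(UInt64) is 8 bytes.
constexpr size_t nonAdaptiveMrkSizeWide() { return sizeof(uint64_t) * 2; }  // 16 bytes
constexpr size_t adaptiveMrkSizeWide()    { return sizeof(uint64_t) * 3; }  // 24 bytes
constexpr size_t adaptiveMrkSizeCompact(size_t columns_num)
{
    return sizeof(uint64_t) * (columns_num * 2 + 1);  // 2 offsets per column + 1 row count
}

int main()
{
    static_assert(nonAdaptiveMrkSizeWide() == 16);
    static_assert(adaptiveMrkSizeWide() == 24);
    assert(adaptiveMrkSizeCompact(1) == 24);    // single-column Compact mark
    assert(adaptiveMrkSizeCompact(10) == 168);  // 10 columns: (10 * 2 + 1) * 8
}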
@@ -89,16 +134,4 @@ size_t getAdaptiveMrkSizeCompact(size_t columns_num)
     return sizeof(UInt64) * (columns_num * 2 + 1);
 }

-std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type, bool compress_marks)
-{
-    if (part_type == MergeTreeDataPartType::Wide)
-        return compress_marks ? ".cmrk2" : ".mrk2";
-    else if (part_type == MergeTreeDataPartType::Compact)
-        return compress_marks ? ".cmrk3" : ".mrk3";
-    else if (part_type == MergeTreeDataPartType::InMemory)
-        return "";
-    else
-        throw Exception("Unknown part type", ErrorCodes::UNKNOWN_PART_TYPE);
-}
-
 }
@@ -11,15 +11,30 @@ namespace DB

 class MergeTreeData;


+/** Various types of mark files are stored in files with various extensions:
+  * .mrk, .mrk2, .mrk3, .cmrk, .cmrk2, .cmrk3.
+  * This helper allows to obtain mark type from file extension and vise versa.
+  */
+struct MarkType
+{
+    MarkType(std::string_view extension);
+    MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_);
+
+    static bool isMarkFileExtension(std::string_view extension);
+    std::string getFileExtension() const;
+
+    bool adaptive = false;
+    bool compressed = false;
+    MergeTreeDataPartType::Value part_type = MergeTreeDataPartType::Unknown;
+};
+
+
 /// Meta information about index granularity
 struct MergeTreeIndexGranularityInfo
 {
 public:
-    /// Marks file extension '.mrk' or '.mrk2'
-    String marks_file_extension;
-
-    /// Is stride in rows between marks non fixed?
-    bool is_adaptive = false;
+    MarkType mark_type;

     /// Fixed size in rows of one granule if index_granularity_bytes is zero
     size_t fixed_index_granularity = 0;
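The reverse direction, MarkType::getFileExtension(), maps the three fields back to one of the six extensions listed in the comment above. A standalone sketch of that mapping (PartType is again a hypothetical stand-in for MergeTreeDataPartType::Value):

#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for MergeTreeDataPartType::Value, for illustration only.
enum class PartType { Wide, Compact, InMemory };

// Mirrors MarkType::getFileExtension(): "c" prefix for compressed marks,
// no suffix for non-adaptive Wide, "2" for adaptive Wide, "3" for Compact.
std::string markFileExtension(bool adaptive, bool compressed, PartType part_type)
{
    std::string res = compressed ? ".cmrk" : ".mrk";
    if (!adaptive)
    {
        if (part_type != PartType::Wide)
            throw std::logic_error("non-Wide data part type with non-adaptive granularity");
        return res;
    }
    switch (part_type)
    {
        case PartType::Wide:     return res + "2";
        case PartType::Compact:  return res + "3";
        case PartType::InMemory: return "";  // in-memory parts write no mark files
    }
    throw std::logic_error("unknown data part type");
}

int main()
{
    assert(markFileExtension(false, false, PartType::Wide)    == ".mrk");
    assert(markFileExtension(true,  false, PartType::Wide)    == ".mrk2");
    assert(markFileExtension(true,  true,  PartType::Compact) == ".cmrk3");
}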
@@ -27,41 +42,29 @@ public:
     /// Approximate bytes size of one granule
     size_t index_granularity_bytes = 0;

-    /// Whether to compress marks
-    bool compress_marks;
-
     MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_);

     void changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage);

     String getMarksFilePath(const String & path_prefix) const
     {
-        return path_prefix + marks_file_extension;
+        return path_prefix + mark_type.getFileExtension();
     }

+    /// TODO: This is non-passable (method overload), remove before merge.
     String getMarksFilePath(const DataPartStoragePtr & data_part_storage, const String & path_prefix) const
     {
         auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage);
-        if (mrk_ext)
-            return path_prefix + *mrk_ext;
-
-        return path_prefix + marks_file_extension;
+        return path_prefix + mrk_ext.value_or(mark_type.getFileExtension());
     }
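The second getMarksFilePath overload now collapses the old if/fallback pair into std::optional::value_or: prefer the extension actually found on disk, otherwise use the one implied by mark_type. A tiny standalone illustration of the pattern; buildMarksPath is a hypothetical helper, not the ClickHouse method:

#include <cassert>
#include <optional>
#include <string>

// Hypothetical helper mirroring the overload above: prefer the extension found
// on disk, otherwise fall back to the default implied by the part's mark type.
std::string buildMarksPath(const std::string & path_prefix,
                           std::optional<std::string> on_disk_extension,
                           const std::string & default_extension)
{
    return path_prefix + on_disk_extension.value_or(default_extension);
}

int main()
{
    // A part written before marks compression keeps its ".mrk2" files.
    assert(buildMarksPath("data", std::string(".mrk2"), ".cmrk2") == "data.mrk2");
    // A freshly written part has no files yet, so the default applies.
    assert(buildMarksPath("data", std::nullopt, ".cmrk2") == "data.cmrk2");
}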

     size_t getMarkSizeInBytes(size_t columns_num = 1) const;

     static std::optional<std::string> getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage);
-
-private:
-    MergeTreeDataPartType type;
-    void setAdaptive(size_t index_granularity_bytes_);
-    void setNonAdaptive();
 };

-constexpr inline auto getNonAdaptiveMrkExtension(bool compress_marks) { return compress_marks ? ".cmrk" : ".mrk"; }
 constexpr inline auto getNonAdaptiveMrkSizeWide() { return sizeof(UInt64) * 2; }
 constexpr inline auto getAdaptiveMrkSizeWide() { return sizeof(UInt64) * 3; }
 inline size_t getAdaptiveMrkSizeCompact(size_t columns_num);
-std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type, bool compress_marks);

 }
@@ -29,7 +29,6 @@ MergeTreeMarksLoader::MergeTreeMarksLoader(
     : data_part_storage(std::move(data_part_storage_))
     , mark_cache(mark_cache_)
     , mrk_path(mrk_path_)
-    , is_compress_marks(isCompressedFromMrkExtension(fs::path(mrk_path_).extension()))
     , marks_count(marks_count_)
     , index_granularity_info(index_granularity_info_)
     , save_marks_in_cache(save_marks_in_cache_)
@@ -62,9 +61,12 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
     size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
     size_t expected_uncompressed_size = mark_size * marks_count;

+    std::string file_extension = fs::path(mrk_path).extension();
+    bool compressed_marks = MarkType(file_extension).compressed;
+
     auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);

-    if (!is_compress_marks && expected_uncompressed_size != file_size)
+    if (!compressed_marks && expected_uncompressed_size != file_size)
         throw Exception(
             ErrorCodes::CORRUPTED_DATA,
             "Bad size of marks file '{}': {}, must be: {}",
@@ -73,12 +75,12 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()

     auto buffer = data_part_storage->readFile(mrk_path, read_settings.adjustBufferSize(file_size), file_size, std::nullopt);
     std::unique_ptr<ReadBuffer> reader;
-    if (!is_compress_marks)
+    if (!compressed_marks)
         reader = std::move(buffer);
     else
         reader = std::make_unique<CompressedReadBufferFromFile>(std::move(buffer));

-    if (!index_granularity_info.is_adaptive)
+    if (!index_granularity_info.mark_type.adaptive)
     {
         /// Read directly to marks.
         reader->readStrict(reinterpret_cast<char *>(res->data()), expected_uncompressed_size);
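With is_compress_marks gone, the loader derives compression from the file extension at load time, and the strict size check (marks_count * mark_size must equal the file size) applies only to uncompressed mark files, since a compressed file's on-disk size says nothing about the decoded size. A standalone sketch of that validation; the function name and the simplified "cmrk" detection are illustrative only:

#include <cstddef>
#include <iostream>
#include <stdexcept>
#include <string>
#include <string_view>

// Illustrative check mirroring loadMarksImpl: for uncompressed mark files the
// byte size must be exactly marks_count * mark_size; compressed files are
// only validated after decompression.
void checkMarksFileSize(std::string_view extension, size_t marks_count, size_t mark_size, size_t file_size)
{
    bool compressed_marks = extension.find("cmrk") != std::string_view::npos;
    size_t expected_uncompressed_size = mark_size * marks_count;
    if (!compressed_marks && expected_uncompressed_size != file_size)
        throw std::runtime_error("bad size of marks file: expected "
            + std::to_string(expected_uncompressed_size) + ", got " + std::to_string(file_size));
}

int main()
{
    // 1000 adaptive Wide marks, 24 bytes each: the .mrk2 file must be 24000 bytes.
    checkMarksFileSize(".mrk2", 1000, 24, 24000);
    // For a .cmrk2 file the on-disk size is whatever compression produced; no check here.
    checkMarksFileSize(".cmrk2", 1000, 24, 7321);
    std::cout << "ok\n";
}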
@@ -31,7 +31,6 @@ private:
     DataPartStoragePtr data_part_storage;
     MarkCache * mark_cache = nullptr;
     String mrk_path;
-    bool is_compress_marks;
     size_t marks_count;
     const MergeTreeIndexGranularityInfo & index_granularity_info;
     bool save_marks_in_cache = false;
@@ -33,7 +33,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
         storage.getContext()->getSettings(),
         write_settings,
         storage.getSettings(),
-        data_part->index_granularity_info.is_adaptive,
+        data_part->index_granularity_info.mark_type.adaptive,
         /* rewrite_primary_key = */ true,
         blocks_are_granules_size);

@@ -30,8 +30,8 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
         global_settings,
         data_part->storage.getContext()->getWriteSettings(),
         storage_settings,
-        index_granularity_info ? index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity(),
-        /* rewrite_primary_key = */false);
+        index_granularity_info ? index_granularity_info->mark_type.adaptive : data_part->storage.canUseAdaptiveGranularity(),
+        /* rewrite_primary_key = */ false);

     writer = data_part->getWriter(
         data_part_storage_builder,
@@ -1570,7 +1570,7 @@ bool MutateTask::prepare()
     ctx->new_data_part->partition.assign(ctx->source_part->partition);

     /// Don't change granularity type while mutating subset of columns
-    ctx->mrk_extension = ctx->source_part->index_granularity_info.marks_file_extension;
+    ctx->mrk_extension = ctx->source_part->index_granularity_info.mark_type.getFileExtension();

     const auto data_settings = ctx->data->getSettings();
     ctx->need_sync = needSyncPart(ctx->source_part->rows_count, ctx->source_part->getBytesOnDisk(), *data_settings);
@@ -261,7 +261,7 @@ void StorageSystemPartsColumns::processNextStorage(
             size.data_uncompressed += bin_checksum->second.uncompressed_size;
         }

-        auto mrk_checksum = part->checksums.files.find(file_name + part->index_granularity_info.marks_file_extension);
+        auto mrk_checksum = part->checksums.files.find(file_name + part->index_granularity_info.mark_type.getFileExtension());
         if (mrk_checksum != part->checksums.files.end())
             size.marks += mrk_checksum->second.file_size;
