Make it slightly more sane

This commit is contained in:
Alexey Milovidov 2022-09-05 07:26:58 +02:00
parent b4eec0e6f4
commit d7127e4b2d
15 changed files with 162 additions and 210 deletions

View File

@ -341,7 +341,7 @@ struct PartRangesReadInfo
sum_marks_in_parts[i] = parts[i].getMarksCount(); sum_marks_in_parts[i] = parts[i].getMarksCount();
sum_marks += sum_marks_in_parts[i]; sum_marks += sum_marks_in_parts[i];
if (parts[i].data_part->index_granularity_info.is_adaptive) if (parts[i].data_part->index_granularity_info.mark_type.adaptive)
++adaptive_parts; ++adaptive_parts;
} }

View File

@ -65,7 +65,6 @@ namespace ErrorCodes
extern const int BAD_TTL_FILE; extern const int BAD_TTL_FILE;
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int NO_SUCH_COLUMN_IN_TABLE; extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int INCORRECT_FILE_NAME;
} }
@ -1629,7 +1628,7 @@ void IMergeTreeDataPart::calculateSecondaryIndicesSizesOnDisk()
auto index_name_escaped = escapeForFileName(index_name); auto index_name_escaped = escapeForFileName(index_name);
auto index_file_name = index_name_escaped + index_ptr->getSerializedFileExtension(); auto index_file_name = index_name_escaped + index_ptr->getSerializedFileExtension();
auto index_marks_file_name = index_name_escaped + index_granularity_info.marks_file_extension; auto index_marks_file_name = index_name_escaped + getMarksFileExtension();
/// If part does not contain index /// If part does not contain index
auto bin_checksum = checksums.files.find(index_file_name); auto bin_checksum = checksums.files.find(index_file_name);
@ -1809,74 +1808,4 @@ bool isCompressedFromIndexExtension(const String & index_extension)
return index_extension == getIndexExtension(true); return index_extension == getIndexExtension(true);
} }
/// Parses a mark file extension (given without a leading dot) into a mark type.
/// Recognized values: "mrk", "mrk2", "mrk3" and the same with a 'c' prefix, which marks the file as compressed.
/// Throws INCORRECT_FILE_NAME for anything else.
MarkType::MarkType(std::string_view extension)
{
    static constexpr std::string_view marks_prefix = "mrk";

    /// `extension` is consumed piece by piece below; keep the untouched value so that
    /// error messages show what the caller actually passed, not the leftover tail.
    const std::string_view original_extension = extension;

    if (extension.starts_with('c'))
    {
        compressed = true;
        extension.remove_prefix(1);
    }

    if (!extension.starts_with(marks_prefix))
        throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Mark file extension does not start with .mrk or .cmrk: '{}'", original_extension);

    extension.remove_prefix(marks_prefix.size());

    if (extension.empty())
    {
        /// Plain "mrk": fixed granularity, Wide part.
        adaptive = false;
        part_type = MergeTreeDataPartType::Wide;
    }
    else if (extension == "2")
    {
        adaptive = true;
        part_type = MergeTreeDataPartType::Wide;
    }
    else if (extension == "3")
    {
        adaptive = true;
        part_type = MergeTreeDataPartType::Compact;
    }
    else
        throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Unknown mark file extension: '{}'", original_extension);
}
/// Builds a mark type from explicit properties and validates the combination:
/// non-adaptive granularity is accepted only for Wide parts, and the part type must be known.
MarkType::MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_)
    : adaptive(adaptive_)
    , compressed(compressed_)
    , part_type(part_type_)
{
    const bool is_wide = (part_type == MergeTreeDataPartType::Wide);
    if (!(adaptive || is_wide))
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity");

    const bool is_unknown = (part_type == MergeTreeDataPartType::Unknown);
    if (is_unknown)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type");
}
/// Returns true when the given string contains "mrk" at any position.
bool MarkType::isMarkFileExtension(std::string_view extension)
{
    constexpr std::string_view marker = "mrk";
    const auto position = extension.find(marker);
    return position != std::string_view::npos;
}
std::string MarkType::getFileExtension()
{
std::string res = compressed ? "cmrk" : "mrk";
if (!adaptive)
{
if (part_type != MergeTreeDataPartType::Wide)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity");
return res;
}
switch (part_type)
{
case MergeTreeDataPartType::Wide:
return res + "2";
case MergeTreeDataPartType::Compact:
return res + "3";
case MergeTreeDataPartType::InMemory:
return "";
case MergeTreeDataPartType::Unknown:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type");
}
}
} }

View File

@ -160,7 +160,7 @@ public:
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency); void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
void appendFilesOfColumnsChecksumsIndexes(Strings & files, bool include_projection = false) const; void appendFilesOfColumnsChecksumsIndexes(Strings & files, bool include_projection = false) const;
String getMarksFileExtension() const { return index_granularity_info.marks_file_extension; } String getMarksFileExtension() const { return index_granularity_info.mark_type.getFileExtension(); }
/// Generate the new name for this part according to `new_part_info` and min/max dates from the old name. /// Generate the new name for this part according to `new_part_info` and min/max dates from the old name.
/// This is useful when you want to change e.g. block numbers or the mutation version of the part. /// This is useful when you want to change e.g. block numbers or the mutation version of the part.
@ -588,22 +588,4 @@ inline String getIndexExtension(bool is_compressed_primary_key) { return is_comp
std::optional<String> getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage); std::optional<String> getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage);
bool isCompressedFromIndexExtension(const String & index_extension); bool isCompressedFromIndexExtension(const String & index_extension);
/** Various types of mark files are stored in files with various extensions:
* .mrk, .mrk2, .mrk3, .cmrk, .cmrk2, .cmrk3.
* This helper allows obtaining the mark type from a file extension and vice versa.
*/
struct MarkType
{
/// Parses the mark type from a file extension; throws INCORRECT_FILE_NAME if it is not recognized.
MarkType(std::string_view extension);
/// Builds the mark type from explicit properties; throws LOGICAL_ERROR for a non-adaptive
/// non-Wide combination or for an unknown part type.
MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_);
/// Returns true if the given string contains "mrk".
static bool isMarkFileExtension(std::string_view extension);
/// Returns the file extension that corresponds to this mark type.
std::string getFileExtension();
/// Adaptive granularity (extensions ending in "2" or "3") as opposed to fixed granularity.
bool adaptive = false;
/// Compressed marks (extensions starting with 'c').
bool compressed = false;
/// Type of the data part the marks belong to.
MergeTreeDataPartType::Value part_type = MergeTreeDataPartType::Unknown;
};
} }

View File

@ -106,6 +106,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
if (remove_it != columns.end()) if (remove_it != columns.end())
columns.erase(remove_it); columns.erase(remove_it);
} }
return remove_files; return remove_files;
} }

View File

@ -1072,7 +1072,7 @@ void MergeTreeData::loadDataPartsFromDisk(
suspicious_broken_parts_bytes += size_of_part; suspicious_broken_parts_bytes += size_of_part;
return; return;
} }
if (!part->index_granularity_info.is_adaptive) if (!part->index_granularity_info.mark_type.adaptive)
has_non_adaptive_parts.store(true, std::memory_order_relaxed); has_non_adaptive_parts.store(true, std::memory_order_relaxed);
else else
has_adaptive_parts.store(true, std::memory_order_relaxed); has_adaptive_parts.store(true, std::memory_order_relaxed);
@ -6315,9 +6315,9 @@ bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const
if (!settings->enable_mixed_granularity_parts || settings->index_granularity_bytes == 0) if (!settings->enable_mixed_granularity_parts || settings->index_granularity_bytes == 0)
{ {
if (!canUseAdaptiveGranularity() && src_part->index_granularity_info.is_adaptive) if (!canUseAdaptiveGranularity() && src_part->index_granularity_info.mark_type.adaptive)
return false; return false;
if (canUseAdaptiveGranularity() && !src_part->index_granularity_info.is_adaptive) if (canUseAdaptiveGranularity() && !src_part->index_granularity_info.mark_type.adaptive)
return false; return false;
} }
return true; return true;

View File

@ -71,7 +71,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
return std::make_unique<MergeTreeDataPartWriterCompact>( return std::make_unique<MergeTreeDataPartWriterCompact>(
shared_from_this(), std::move(data_part_storage_builder), ordered_columns_list, metadata_snapshot, shared_from_this(), std::move(data_part_storage_builder), ordered_columns_list, metadata_snapshot,
indices_to_recalc, index_granularity_info.marks_file_extension, indices_to_recalc, getMarksFileExtension(),
default_codec_, writer_settings, computed_index_granularity); default_codec_, writer_settings, computed_index_granularity);
} }
@ -85,19 +85,17 @@ void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*eac
total_size.data_uncompressed += bin_checksum->second.uncompressed_size; total_size.data_uncompressed += bin_checksum->second.uncompressed_size;
} }
auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension); auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + getMarksFileExtension());
if (mrk_checksum != checksums.files.end()) if (mrk_checksum != checksums.files.end())
total_size.marks += mrk_checksum->second.file_size; total_size.marks += mrk_checksum->second.file_size;
} }
void MergeTreeDataPartCompact::loadIndexGranularity() void MergeTreeDataPartCompact::loadIndexGranularity()
{ {
//String full_path = getRelativePath();
if (columns.empty()) if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
if (!index_granularity_info.is_adaptive) if (!index_granularity_info.mark_type.adaptive)
throw Exception("MergeTreeDataPartCompact cannot be created with non-adaptive granulary.", ErrorCodes::NOT_IMPLEMENTED); throw Exception("MergeTreeDataPartCompact cannot be created with non-adaptive granulary.", ErrorCodes::NOT_IMPLEMENTED);
auto marks_file_path = index_granularity_info.getMarksFilePath("data"); auto marks_file_path = index_granularity_info.getMarksFilePath("data");
@ -131,7 +129,7 @@ bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) co
return false; return false;
auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION); auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION);
auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension); auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + getMarksFileExtension());
return (bin_checksum != checksums.files.end() && mrk_checksum != checksums.files.end()); return (bin_checksum != checksums.files.end() && mrk_checksum != checksums.files.end());
} }
@ -139,7 +137,7 @@ bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) co
void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata) const void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata) const
{ {
checkConsistencyBase(); checkConsistencyBase();
String mrk_file_name = DATA_FILE_NAME + index_granularity_info.marks_file_extension; String mrk_file_name = DATA_FILE_NAME + getMarksFileExtension();
if (!checksums.empty()) if (!checksums.empty())
{ {

View File

@ -67,7 +67,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter(
return std::make_unique<MergeTreeDataPartWriterWide>( return std::make_unique<MergeTreeDataPartWriterWide>(
shared_from_this(), data_part_storage_builder, shared_from_this(), data_part_storage_builder,
columns_list, metadata_snapshot, indices_to_recalc, columns_list, metadata_snapshot, indices_to_recalc,
index_granularity_info.marks_file_extension, getMarksFileExtension(),
default_codec_, writer_settings, computed_index_granularity); default_codec_, writer_settings, computed_index_granularity);
} }
@ -95,7 +95,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
size.data_uncompressed += bin_checksum->second.uncompressed_size; size.data_uncompressed += bin_checksum->second.uncompressed_size;
} }
auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension); auto mrk_checksum = checksums.files.find(file_name + getMarksFileExtension());
if (mrk_checksum != checksums.files.end()) if (mrk_checksum != checksums.files.end())
size.marks += mrk_checksum->second.file_size; size.marks += mrk_checksum->second.file_size;
}); });
@ -103,11 +103,11 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
return size; return size;
} }
void MergeTreeDataPartWide::loadIndexGranularity() void MergeTreeDataPartWide::loadIndexGranularity()
{ {
index_granularity_info.changeGranularityIfRequired(data_part_storage); index_granularity_info.changeGranularityIfRequired(data_part_storage);
if (columns.empty()) if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
@ -119,55 +119,48 @@ void MergeTreeDataPartWide::loadIndexGranularity()
std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path)); std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path));
size_t marks_file_size = data_part_storage->getFileSize(marks_file_path); size_t marks_file_size = data_part_storage->getFileSize(marks_file_path);
if (!index_granularity_info.compress_marks)
{
if (!index_granularity_info.is_adaptive)
{
size_t marks_count = marks_file_size / index_granularity_info.getMarkSizeInBytes();
index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same
}
else
{
auto buffer = data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
while (!buffer->eof())
{
buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block
size_t granularity;
readIntBinary(granularity, *buffer);
index_granularity.appendMark(granularity);
}
if (index_granularity.getMarksCount() * index_granularity_info.getMarkSizeInBytes() != marks_file_size) if (!index_granularity_info.mark_type.adaptive && !index_granularity_info.mark_type.compressed)
throw Exception("Cannot read all marks from file " + data_part_storage->getFullPath() + "/" + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA); {
} /// The most easy way - no need to read the file, everything is known from its size.
size_t marks_count = marks_file_size / index_granularity_info.getMarkSizeInBytes();
index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same
} }
else else
{ {
CompressedReadBufferFromFile buffer( auto marks_file = data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt));
std::unique_ptr<ReadBuffer> marks_reader;
if (!index_granularity_info.mark_type.compressed)
marks_reader = std::move(marks_file);
else
marks_reader = std::make_unique<CompressedReadBufferFromFile>(std::move(marks_file));
MarksInCompressedFile mark(1);
size_t marks_count = 0; size_t marks_count = 0;
while (!buffer.eof()) while (!marks_reader->eof())
{ {
buffer.readStrict(reinterpret_cast<char *>(mark.data()), sizeof(size_t) * 2); /// skip offset_in_compressed file and offset_in_decompressed_block MarkInCompressedFile mark;
size_t granularity;
readBinary(mark.offset_in_compressed_file, *marks_reader);
readBinary(mark.offset_in_decompressed_block, *marks_reader);
++marks_count; ++marks_count;
if (index_granularity_info.is_adaptive) if (index_granularity_info.mark_type.adaptive)
{ {
size_t granularity; readIntBinary(granularity, *marks_reader);
readIntBinary(granularity, buffer);
index_granularity.appendMark(granularity); index_granularity.appendMark(granularity);
} }
} }
if (!index_granularity_info.is_adaptive) if (!index_granularity_info.mark_type.adaptive)
index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same
} }
index_granularity.setInitialized(); index_granularity.setInitialized();
} }
bool MergeTreeDataPartWide::isStoredOnRemoteDisk() const bool MergeTreeDataPartWide::isStoredOnRemoteDisk() const
{ {
return data_part_storage->isStoredOnRemoteDisk(); return data_part_storage->isStoredOnRemoteDisk();
@ -186,7 +179,7 @@ MergeTreeDataPartWide::~MergeTreeDataPartWide()
void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
{ {
checkConsistencyBase(); checkConsistencyBase();
//String path = getRelativePath(); std::string marks_file_extension = index_granularity_info.mark_type.getFileExtension();
if (!checksums.empty()) if (!checksums.empty())
{ {
@ -197,7 +190,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{ {
String file_name = ISerialization::getFileNameForStream(name_type, substream_path); String file_name = ISerialization::getFileNameForStream(name_type, substream_path);
String mrk_file_name = file_name + index_granularity_info.marks_file_extension; String mrk_file_name = file_name + marks_file_extension;
String bin_file_name = file_name + DATA_FILE_EXTENSION; String bin_file_name = file_name + DATA_FILE_EXTENSION;
if (!checksums.files.contains(mrk_file_name)) if (!checksums.files.contains(mrk_file_name))
@ -223,7 +216,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
{ {
getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{ {
auto file_path = ISerialization::getFileNameForStream(name_type, substream_path) + index_granularity_info.marks_file_extension; auto file_path = ISerialization::getFileNameForStream(name_type, substream_path) + marks_file_extension;
/// Missing file is Ok for case when new column was added. /// Missing file is Ok for case when new column was added.
if (data_part_storage->exists(file_path)) if (data_part_storage->exists(file_path))
@ -251,10 +244,11 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
bool MergeTreeDataPartWide::hasColumnFiles(const NameAndTypePair & column) const bool MergeTreeDataPartWide::hasColumnFiles(const NameAndTypePair & column) const
{ {
auto check_stream_exists = [this](const String & stream_name) std::string marks_file_extension = index_granularity_info.mark_type.getFileExtension();
auto check_stream_exists = [this, &marks_file_extension](const String & stream_name)
{ {
auto bin_checksum = checksums.files.find(stream_name + DATA_FILE_EXTENSION); auto bin_checksum = checksums.files.find(stream_name + DATA_FILE_EXTENSION);
auto mrk_checksum = checksums.files.find(stream_name + index_granularity_info.marks_file_extension); auto mrk_checksum = checksums.files.find(stream_name + marks_file_extension);
return bin_checksum != checksums.files.end() && mrk_checksum != checksums.files.end(); return bin_checksum != checksums.files.end() && mrk_checksum != checksums.files.end();
}; };

View File

@ -431,7 +431,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
auto mrk_file_in = data_part_storage->readFile(mrk_path, {}, std::nullopt, std::nullopt); auto mrk_file_in = data_part_storage->readFile(mrk_path, {}, std::nullopt, std::nullopt);
std::unique_ptr<ReadBuffer> mrk_in; std::unique_ptr<ReadBuffer> mrk_in;
if (data_part->index_granularity_info.compress_marks) if (data_part->index_granularity_info.mark_type.compressed)
mrk_in = std::make_unique<CompressedReadBufferFromFile>(std::move(mrk_file_in)); mrk_in = std::make_unique<CompressedReadBufferFromFile>(std::move(mrk_file_in));
else else
mrk_in = std::move(mrk_file_in); mrk_in = std::move(mrk_file_in);

View File

@ -9,10 +9,87 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int UNKNOWN_PART_TYPE; extern const int UNKNOWN_PART_TYPE;
extern const int INCORRECT_FILE_NAME;
} }
/// Parses a mark file extension into a mark type. A single leading dot is optional.
/// Recognized values: "mrk", "mrk2", "mrk3" and the same with a 'c' prefix, which marks the file as compressed.
/// Throws INCORRECT_FILE_NAME for anything else.
MarkType::MarkType(std::string_view extension)
{
    static constexpr std::string_view marks_prefix = "mrk";

    /// `extension` is consumed piece by piece below; keep the untouched value so that
    /// error messages show what the caller actually passed, not the leftover tail.
    const std::string_view original_extension = extension;

    if (extension.starts_with('.'))
        extension.remove_prefix(1);

    if (extension.starts_with('c'))
    {
        compressed = true;
        extension.remove_prefix(1);
    }

    if (!extension.starts_with(marks_prefix))
        throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Mark file extension does not start with .mrk or .cmrk: '{}'", original_extension);

    extension.remove_prefix(marks_prefix.size());

    if (extension.empty())
    {
        /// Plain ".mrk": fixed granularity, Wide part.
        adaptive = false;
        part_type = MergeTreeDataPartType::Wide;
    }
    else if (extension == "2")
    {
        adaptive = true;
        part_type = MergeTreeDataPartType::Wide;
    }
    else if (extension == "3")
    {
        adaptive = true;
        part_type = MergeTreeDataPartType::Compact;
    }
    else
        throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Unknown mark file extension: '{}'", original_extension);
}
/// Builds a mark type from explicit properties and validates the combination:
/// non-adaptive granularity is accepted only for Wide parts, and the part type must be known.
MarkType::MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_)
    : adaptive(adaptive_)
    , compressed(compressed_)
    , part_type(part_type_)
{
    const bool is_wide = (part_type == MergeTreeDataPartType::Wide);
    if (!(adaptive || is_wide))
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity");

    const bool is_unknown = (part_type == MergeTreeDataPartType::Unknown);
    if (is_unknown)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type");
}
/// Returns true when the given string contains "mrk" at any position.
bool MarkType::isMarkFileExtension(std::string_view extension)
{
    constexpr std::string_view marker = "mrk";
    const auto position = extension.find(marker);
    return position != std::string_view::npos;
}
std::string MarkType::getFileExtension() const
{
std::string res = compressed ? ".cmrk" : ".mrk";
if (!adaptive)
{
if (part_type != MergeTreeDataPartType::Wide)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity");
return res;
}
switch (part_type)
{
case MergeTreeDataPartType::Wide:
return res + "2";
case MergeTreeDataPartType::Compact:
return res + "3";
case MergeTreeDataPartType::InMemory:
return "";
case MergeTreeDataPartType::Unknown:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type");
}
}
std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage) std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage)
{ {
if (data_part_storage->exists()) if (data_part_storage->exists())
@ -24,51 +101,28 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromF
} }
MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_) MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_)
: type(type_) : mark_type(storage.canUseAdaptiveGranularity(), storage.getSettings()->compress_marks, type_.getValue())
{ {
const auto storage_settings = storage.getSettings(); fixed_index_granularity = storage.getSettings()->index_granularity;
fixed_index_granularity = storage_settings->index_granularity;
compress_marks = storage_settings->compress_marks;
/// Granularity is fixed
if (!storage.canUseAdaptiveGranularity())
{
if (type != MergeTreeDataPartType::Wide)
throw Exception("Only Wide parts can be used with non-adaptive granularity.", ErrorCodes::NOT_IMPLEMENTED);
setNonAdaptive();
}
else
setAdaptive(storage_settings->index_granularity_bytes);
} }
void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage) void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage)
{ {
auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage); auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage);
if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension(compress_marks)) if (mrk_ext && !MarkType(*mrk_ext).adaptive)
setNonAdaptive(); {
} mark_type.adaptive = false;
index_granularity_bytes = 0;
void MergeTreeIndexGranularityInfo::setAdaptive(size_t index_granularity_bytes_) }
{
is_adaptive = true;
marks_file_extension = getAdaptiveMrkExtension(type, compress_marks);
index_granularity_bytes = index_granularity_bytes_;
}
void MergeTreeIndexGranularityInfo::setNonAdaptive()
{
is_adaptive = false;
marks_file_extension = getNonAdaptiveMrkExtension(compress_marks);
index_granularity_bytes = 0;
} }
size_t MergeTreeIndexGranularityInfo::getMarkSizeInBytes(size_t columns_num) const size_t MergeTreeIndexGranularityInfo::getMarkSizeInBytes(size_t columns_num) const
{ {
if (type == MergeTreeDataPartType::Wide) if (mark_type.part_type == MergeTreeDataPartType::Wide)
return is_adaptive ? getAdaptiveMrkSizeWide() : getNonAdaptiveMrkSizeWide(); return mark_type.adaptive ? getAdaptiveMrkSizeWide() : getNonAdaptiveMrkSizeWide();
else if (type == MergeTreeDataPartType::Compact) else if (mark_type.part_type == MergeTreeDataPartType::Compact)
return getAdaptiveMrkSizeCompact(columns_num); return getAdaptiveMrkSizeCompact(columns_num);
else if (type == MergeTreeDataPartType::InMemory) else if (mark_type.part_type == MergeTreeDataPartType::InMemory)
return 0; return 0;
else else
throw Exception("Unknown part type", ErrorCodes::UNKNOWN_PART_TYPE); throw Exception("Unknown part type", ErrorCodes::UNKNOWN_PART_TYPE);
@ -80,16 +134,4 @@ size_t getAdaptiveMrkSizeCompact(size_t columns_num)
return sizeof(UInt64) * (columns_num * 2 + 1); return sizeof(UInt64) * (columns_num * 2 + 1);
} }
/// Returns the mark file extension used with adaptive granularity for the given part type:
/// ".mrk2"/".cmrk2" for Wide, ".mrk3"/".cmrk3" for Compact, an empty string for InMemory.
/// Throws UNKNOWN_PART_TYPE for any other part type.
std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type, bool compress_marks)
{
    if (part_type == MergeTreeDataPartType::Wide)
    {
        if (compress_marks)
            return ".cmrk2";
        return ".mrk2";
    }

    if (part_type == MergeTreeDataPartType::Compact)
    {
        if (compress_marks)
            return ".cmrk3";
        return ".mrk3";
    }

    if (part_type == MergeTreeDataPartType::InMemory)
        return std::string{};

    throw Exception("Unknown part type", ErrorCodes::UNKNOWN_PART_TYPE);
}
} }

View File

@ -11,15 +11,30 @@ namespace DB
class MergeTreeData; class MergeTreeData;
/** Various types of mark files are stored in files with various extensions:
* .mrk, .mrk2, .mrk3, .cmrk, .cmrk2, .cmrk3.
* This helper allows obtaining the mark type from a file extension and vice versa.
*/
struct MarkType
{
/// Parses the mark type from a file extension; throws INCORRECT_FILE_NAME if it is not recognized.
MarkType(std::string_view extension);
/// Builds the mark type from explicit properties; throws LOGICAL_ERROR for a non-adaptive
/// non-Wide combination or for an unknown part type.
MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_);
/// Returns true if the given string contains "mrk".
static bool isMarkFileExtension(std::string_view extension);
/// Returns the file extension that corresponds to this mark type.
std::string getFileExtension() const;
/// Adaptive granularity (extensions ending in "2" or "3") as opposed to fixed granularity.
bool adaptive = false;
/// Compressed marks (extensions starting with 'c').
bool compressed = false;
/// Type of the data part the marks belong to.
MergeTreeDataPartType::Value part_type = MergeTreeDataPartType::Unknown;
};
/// Meta information about index granularity /// Meta information about index granularity
struct MergeTreeIndexGranularityInfo struct MergeTreeIndexGranularityInfo
{ {
public: public:
/// Marks file extension '.mrk' or '.mrk2' MarkType mark_type;
String marks_file_extension;
/// Is stride in rows between marks non fixed?
bool is_adaptive = false;
/// Fixed size in rows of one granule if index_granularity_bytes is zero /// Fixed size in rows of one granule if index_granularity_bytes is zero
size_t fixed_index_granularity = 0; size_t fixed_index_granularity = 0;
@ -27,38 +42,29 @@ public:
/// Approximate bytes size of one granule /// Approximate bytes size of one granule
size_t index_granularity_bytes = 0; size_t index_granularity_bytes = 0;
/// Whether to compress marks
bool compress_marks;
MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_); MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_);
void changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage); void changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage);
String getMarksFilePath(const String & path_prefix) const String getMarksFilePath(const String & path_prefix) const
{ {
return path_prefix + marks_file_extension; return path_prefix + mark_type.getFileExtension();
} }
/// TODO: This is non-passable (method overload), remove before merge.
String getMarksFilePath(const DataPartStoragePtr & data_part_storage, const String & path_prefix) const String getMarksFilePath(const DataPartStoragePtr & data_part_storage, const String & path_prefix) const
{ {
auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage); auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage);
return path_prefix + mrk_ext.value_or(marks_file_extension); return path_prefix + mrk_ext.value_or(mark_type.getFileExtension());
} }
size_t getMarkSizeInBytes(size_t columns_num = 1) const; size_t getMarkSizeInBytes(size_t columns_num = 1) const;
static std::optional<std::string> getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage); static std::optional<std::string> getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage);
private:
MergeTreeDataPartType type;
void setAdaptive(size_t index_granularity_bytes_);
void setNonAdaptive();
}; };
constexpr inline auto getNonAdaptiveMrkExtension(bool compress_marks) { return compress_marks ? ".cmrk" : ".mrk"; }
constexpr inline auto getNonAdaptiveMrkSizeWide() { return sizeof(UInt64) * 2; } constexpr inline auto getNonAdaptiveMrkSizeWide() { return sizeof(UInt64) * 2; }
constexpr inline auto getAdaptiveMrkSizeWide() { return sizeof(UInt64) * 3; } constexpr inline auto getAdaptiveMrkSizeWide() { return sizeof(UInt64) * 3; }
inline size_t getAdaptiveMrkSizeCompact(size_t columns_num); inline size_t getAdaptiveMrkSizeCompact(size_t columns_num);
std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type, bool compress_marks);
} }

View File

@ -80,7 +80,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
else else
reader = std::make_unique<CompressedReadBufferFromFile>(std::move(buffer)); reader = std::make_unique<CompressedReadBufferFromFile>(std::move(buffer));
if (!index_granularity_info.is_adaptive) if (!index_granularity_info.mark_type.adaptive)
{ {
/// Read directly to marks. /// Read directly to marks.
reader->readStrict(reinterpret_cast<char *>(res->data()), expected_uncompressed_size); reader->readStrict(reinterpret_cast<char *>(res->data()), expected_uncompressed_size);

View File

@ -33,7 +33,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
storage.getContext()->getSettings(), storage.getContext()->getSettings(),
write_settings, write_settings,
storage.getSettings(), storage.getSettings(),
data_part->index_granularity_info.is_adaptive, data_part->index_granularity_info.mark_type.adaptive,
/* rewrite_primary_key = */ true, /* rewrite_primary_key = */ true,
blocks_are_granules_size); blocks_are_granules_size);

View File

@ -30,8 +30,8 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
global_settings, global_settings,
data_part->storage.getContext()->getWriteSettings(), data_part->storage.getContext()->getWriteSettings(),
storage_settings, storage_settings,
index_granularity_info ? index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity(), index_granularity_info ? index_granularity_info->mark_type.adaptive : data_part->storage.canUseAdaptiveGranularity(),
/* rewrite_primary_key = */false); /* rewrite_primary_key = */ false);
writer = data_part->getWriter( writer = data_part->getWriter(
data_part_storage_builder, data_part_storage_builder,

View File

@ -1570,7 +1570,7 @@ bool MutateTask::prepare()
ctx->new_data_part->partition.assign(ctx->source_part->partition); ctx->new_data_part->partition.assign(ctx->source_part->partition);
/// Don't change granularity type while mutating subset of columns /// Don't change granularity type while mutating subset of columns
ctx->mrk_extension = ctx->source_part->index_granularity_info.marks_file_extension; ctx->mrk_extension = ctx->source_part->index_granularity_info.mark_type.getFileExtension();
const auto data_settings = ctx->data->getSettings(); const auto data_settings = ctx->data->getSettings();
ctx->need_sync = needSyncPart(ctx->source_part->rows_count, ctx->source_part->getBytesOnDisk(), *data_settings); ctx->need_sync = needSyncPart(ctx->source_part->rows_count, ctx->source_part->getBytesOnDisk(), *data_settings);

View File

@ -261,7 +261,7 @@ void StorageSystemPartsColumns::processNextStorage(
size.data_uncompressed += bin_checksum->second.uncompressed_size; size.data_uncompressed += bin_checksum->second.uncompressed_size;
} }
auto mrk_checksum = part->checksums.files.find(file_name + part->index_granularity_info.marks_file_extension); auto mrk_checksum = part->checksums.files.find(file_name + part->index_granularity_info.mark_type.getFileExtension());
if (mrk_checksum != part->checksums.files.end()) if (mrk_checksum != part->checksums.files.end())
size.marks += mrk_checksum->second.file_size; size.marks += mrk_checksum->second.file_size;