mirror of https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 01:51:59 +00:00
A lot of bugs
This commit is contained in:
parent 0bfb7b18ce
commit b544b62e44
@@ -197,57 +197,6 @@ MergeTreeData::MergeTreeData(
                 "MergeTree data format version on disk doesn't support custom partitioning",
                 ErrorCodes::METADATA_MISMATCH);
     }
-    index_granularity_info.init(settings, format_version, full_path);
 }
-
-
-
-std::optional<std::string> MergeTreeData::IndexGranularityInfo::getMrkExtensionFromFS(const std::string & path_to_table) const
-{
-    if (Poco::File(path_to_table).exists())
-    {
-        Poco::DirectoryIterator end;
-        for (Poco::DirectoryIterator table_it(path_to_table); table_it != end; ++table_it)
-        {
-            if (startsWith(table_it.name(), "tmp"))
-                continue;
-
-            if (Poco::File(table_it.path()).isDirectory())
-            {
-                for (Poco::DirectoryIterator part_it(table_it.path()); part_it != end; ++part_it)
-                {
-                    const auto & ext = part_it.path().getExtension();
-                    if (ext == "mrk" || ext == "mrk2")
-                        return ext;
-                }
-            }
-        }
-    }
-    return {};
-}
-
-void MergeTreeData::IndexGranularityInfo::init(
-    const MergeTreeSettings & storage_settings,
-    const MergeTreeDataFormatVersion & format,
-    const std::string & path_to_table)
-{
-    fixed_index_granularity = storage_settings.index_granularity;
-    auto mrk_ext = getMrkExtensionFromFS(path_to_table);
-    /// Granularity is fixed
-    if (storage_settings.index_granularity_bytes == 0 || (mrk_ext && *mrk_ext == "mrk") || format < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
-    {
-        is_adaptive = false;
-        mark_size_in_bytes = sizeof(UInt64) * 2;
-        marks_file_extension = ".mrk";
-        index_granularity_bytes = 0;
-    }
-    else
-    {
-        is_adaptive = true;
-        mark_size_in_bytes = sizeof(UInt64) * 3;
-        marks_file_extension = ".mrk2";
-        index_granularity_bytes = storage_settings.index_granularity_bytes;
-    }
-}


@@ -1317,6 +1266,11 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
     out_expression = nullptr;
     out_rename_map = {};
     out_force_update_metadata = false;
+    String part_mrk_file_extension;
+    if (part)
+        part_mrk_file_extension = part->index_granularity_info.marks_file_extension;
+    else
+        part_mrk_file_extension = settings.index_granularity_bytes == 0 ? getNonAdaptiveMrkExtension() : getAdaptiveMrkExtension();

     using NameToType = std::map<String, const IDataType *>;
     NameToType new_types;
@@ -1337,7 +1291,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
         if (!new_indices_set.count(index.name))
         {
             out_rename_map["skp_idx_" + index.name + ".idx"] = "";
-            out_rename_map["skp_idx_" + index.name + index_granularity_info.marks_file_extension] = "";
+            out_rename_map["skp_idx_" + index.name + part_mrk_file_extension] = "";
         }
     }

@@ -1366,7 +1320,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
             if (--stream_counts[file_name] == 0)
             {
                 out_rename_map[file_name + ".bin"] = "";
-                out_rename_map[file_name + index_granularity_info.marks_file_extension] = "";
+                out_rename_map[file_name + part_mrk_file_extension] = "";
             }
         }, {});
     }
@@ -1441,7 +1395,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
                 String temporary_file_name = IDataType::getFileNameForStream(temporary_column_name, substream_path);

                 out_rename_map[temporary_file_name + ".bin"] = original_file_name + ".bin";
-                out_rename_map[temporary_file_name + index_granularity_info.marks_file_extension] = original_file_name + index_granularity_info.marks_file_extension;
+                out_rename_map[temporary_file_name + part_mrk_file_extension] = original_file_name + part_mrk_file_extension;
             }, {});
         }

@@ -285,39 +285,6 @@ public:
         String getModeName() const;
     };

-    /// Meta information about index granularity
-    struct IndexGranularityInfo
-    {
-    public:
-        /// Marks file extension '.mrk' or '.mrk2'
-        String marks_file_extension;
-
-        /// Size of one mark in file two or three size_t numbers
-        UInt8 mark_size_in_bytes;
-
-        /// Is stride in rows between marks non fixed?
-        bool is_adaptive;
-
-        /// Fixed size in rows of one granule if index_granularity_bytes is zero
-        size_t fixed_index_granularity;
-
-        /// Approximate bytes size of one granule
-        size_t index_granularity_bytes;
-
-        void init(
-            const MergeTreeSettings & storage_settings,
-            const MergeTreeDataFormatVersion & format,
-            const std::string & path_to_table);
-
-        String getMarksFilePath(const String & column_path) const
-        {
-            return column_path + marks_file_extension;
-        }
-    private:
-        std::optional<std::string> getMrkExtensionFromFS(const std::string & path_to_table) const;
-    };
-
-
     /// Attach the table corresponding to the directory in full_path (must end with /), with the given columns.
     /// Correctness of names and paths is not checked.
     ///
@@ -610,7 +577,6 @@ public:
     MergeTreeDataFormatVersion format_version;

     Context global_context;
-    IndexGranularityInfo index_granularity_info;

     /// Merging params - what additional actions to perform during merge.
     const MergingParams merging_params;

@@ -977,13 +977,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
     }

     NameSet files_to_skip = {"checksums.txt", "columns.txt"};
+    auto mrk_extension = data.settings.index_granularity_bytes ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension();
     for (const auto & entry : in_header)
     {
         IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
         {
             String stream_name = IDataType::getFileNameForStream(entry.name, substream_path);
             files_to_skip.insert(stream_name + ".bin");
-            files_to_skip.insert(stream_name + data.index_granularity_info.marks_file_extension);
+            files_to_skip.insert(stream_name + mrk_extension);
         };

         IDataType::SubstreamPath stream_path;

@@ -138,6 +138,7 @@ MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & na
     : storage(storage_)
     , name(name_)
     , info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
+    , index_granularity_info(storage.settings, storage.format_version)
 {
 }

@@ -145,6 +146,7 @@ MergeTreeDataPart::MergeTreeDataPart(const MergeTreeData & storage_, const Strin
     : storage(storage_)
     , name(name_)
     , info(info_)
+    , index_granularity_info(storage.settings, storage.format_version)
 {
 }

@@ -172,7 +174,7 @@ MergeTreeDataPart::ColumnSize MergeTreeDataPart::getColumnSizeImpl(
             size.data_uncompressed += bin_checksum->second.uncompressed_size;
         }

-        auto mrk_checksum = checksums.files.find(file_name + storage.index_granularity_info.marks_file_extension);
+        auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension);
         if (mrk_checksum != checksums.files.end())
             size.marks += mrk_checksum->second.file_size;
     }, {});
@@ -529,22 +531,25 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu

 void MergeTreeDataPart::loadIndexGranularity()
 {
+
+    String full_path = getFullPath();
+    index_granularity_info.changeGranularityIfRequired(getFullPath());
+
     if (columns.empty())
         throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
-    const auto & granularity_info = storage.index_granularity_info;

     /// We can use any column, it doesn't matter
-    std::string marks_file_path = granularity_info.getMarksFilePath(getFullPath() + escapeForFileName(columns.front().name));
+    std::string marks_file_path = index_granularity_info.getMarksFilePath(getFullPath() + escapeForFileName(columns.front().name));
     if (!Poco::File(marks_file_path).exists())
         throw Exception("Marks file '" + marks_file_path + "' doesn't exist", ErrorCodes::NO_FILE_IN_DATA_PART);

     size_t marks_file_size = Poco::File(marks_file_path).getSize();

     /// old version of marks with static index granularity
-    if (!granularity_info.is_adaptive)
+    if (!index_granularity_info.is_adaptive)
     {
-        size_t marks_count = marks_file_size / granularity_info.mark_size_in_bytes;
-        index_granularity.resizeWithFixedGranularity(marks_count, granularity_info.fixed_index_granularity); /// all the same
+        size_t marks_count = marks_file_size / index_granularity_info.mark_size_in_bytes;
+        index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same
     }
     else
     {
@@ -556,7 +561,7 @@ void MergeTreeDataPart::loadIndexGranularity()
             readIntBinary(granularity, buffer);
             index_granularity.appendMark(granularity);
         }
-        if (index_granularity.getMarksCount() * granularity_info.mark_size_in_bytes != marks_file_size)
+        if (index_granularity.getMarksCount() * index_granularity_info.mark_size_in_bytes != marks_file_size)
             throw Exception("Cannot read all marks from file " + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA);
     }
     index_granularity.setInitialized();
@@ -802,7 +807,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
             name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
             {
                 String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
-                String mrk_file_name = file_name + storage.index_granularity_info.marks_file_extension;
+                String mrk_file_name = file_name + index_granularity_info.marks_file_extension;
                 String bin_file_name = file_name + ".bin";
                 if (!checksums.files.count(mrk_file_name))
                     throw Exception("No " + mrk_file_name + " file checksum for column " + name_type.name + " in part " + path,
@@ -866,7 +871,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
         {
             name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
             {
-                Poco::File file(IDataType::getFileNameForStream(name_type.name, substream_path) + storage.index_granularity_info.marks_file_extension);
+                Poco::File file(IDataType::getFileNameForStream(name_type.name, substream_path) + index_granularity_info.marks_file_extension);

                 /// Missing file is Ok for case when new column was added.
                 if (file.exists())
@@ -897,7 +902,7 @@ bool MergeTreeDataPart::hasColumnFiles(const String & column_name, const IDataTy
         String file_name = IDataType::getFileNameForStream(column_name, substream_path);

         auto bin_checksum = checksums.files.find(file_name + ".bin");
-        auto mrk_checksum = checksums.files.find(file_name + storage.index_granularity_info.marks_file_extension);
+        auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension);

         if (bin_checksum == checksums.files.end() || mrk_checksum == checksums.files.end())
             res = false;

@@ -5,6 +5,7 @@
 #include <Core/Types.h>
 #include <Core/NamesAndTypes.h>
 #include <Storages/MergeTree/MergeTreeIndexGranularity.h>
+#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
 #include <Storages/MergeTree/MergeTreeIndices.h>
 #include <Storages/MergeTree/MergeTreePartInfo.h>
 #include <Storages/MergeTree/MergeTreePartition.h>
@@ -246,6 +247,8 @@ struct MergeTreeDataPart
      */
     mutable std::mutex alter_mutex;

+    MergeTreeIndexGranularityInfo index_granularity_info;
+
     ~MergeTreeDataPart();

     /// Calculate the total size of the entire directory with all the files

@@ -623,17 +623,16 @@ namespace
 size_t roundRowsOrBytesToMarks(
     size_t rows_setting,
     size_t bytes_setting,
-    const MergeTreeData::IndexGranularityInfo & granularity_info)
+    size_t rows_granularity,
+    size_t bytes_granularity)
 {
-    if (!granularity_info.is_adaptive)
+    if (bytes_granularity == 0)
     {
-        size_t fixed_index_granularity = granularity_info.fixed_index_granularity;
-        return (rows_setting + fixed_index_granularity - 1) / fixed_index_granularity;
+        return (rows_setting + rows_granularity - 1) / rows_granularity;
     }
     else
     {
-        size_t index_granularity_bytes = granularity_info.index_granularity_bytes;
-        return (bytes_setting + index_granularity_bytes - 1) / index_granularity_bytes;
+        return (bytes_setting + bytes_granularity - 1) / bytes_granularity;
     }
 }

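The refactored helper above now takes plain numbers instead of an IndexGranularityInfo reference, so its behaviour is easy to check in isolation. A minimal standalone sketch with made-up setting values (the real callers pass fields of MergeTreeSettings and the per-query Settings):

    #include <cstddef>
    #include <iostream>

    /// Same signature as the refactored function above.
    static size_t roundRowsOrBytesToMarks(
        size_t rows_setting, size_t bytes_setting,
        size_t rows_granularity, size_t bytes_granularity)
    {
        if (bytes_granularity == 0)  /// non-adaptive: every mark covers a fixed number of rows
            return (rows_setting + rows_granularity - 1) / rows_granularity;
        return (bytes_setting + bytes_granularity - 1) / bytes_granularity;
    }

    int main()
    {
        /// Hypothetical values: 8192 rows per mark, ~10 MiB per granule.
        std::cout << roundRowsOrBytesToMarks(163840, 0, 8192, 0) << '\n';                          /// 20 marks (rows path)
        std::cout << roundRowsOrBytesToMarks(0, 24 * 1024 * 1024, 8192, 10 * 1024 * 1024) << '\n'; /// 3 marks (bytes path)
    }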
@@ -650,20 +649,13 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
     const Names & virt_columns,
     const Settings & settings) const
 {
-    const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
-        settings.merge_tree_max_rows_to_use_cache,
-        settings.merge_tree_max_bytes_to_use_cache,
-        data.index_granularity_info);
-
-    const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
-        settings.merge_tree_min_rows_for_concurrent_read,
-        settings.merge_tree_min_bytes_for_concurrent_read,
-        data.index_granularity_info);

     /// Count marks for each part.
     std::vector<size_t> sum_marks_in_parts(parts.size());
     size_t sum_marks = 0;
     size_t total_rows = 0;
+
+    size_t adaptive_parts = 0;
     for (size_t i = 0; i < parts.size(); ++i)
     {
         total_rows += parts[i].getRowsCount();
@@ -674,8 +666,26 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
             sum_marks_in_parts[i] += range.end - range.begin;

         sum_marks += sum_marks_in_parts[i];
+        if (parts[i].data_part->index_granularity_info.is_adaptive)
+            adaptive_parts++;
     }

+    size_t index_granularity_bytes = 0;
+    if (adaptive_parts > parts.size() / 2)
+        index_granularity_bytes = data.settings.index_granularity_bytes;
+
+    const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
+        settings.merge_tree_max_rows_to_use_cache,
+        settings.merge_tree_max_bytes_to_use_cache,
+        data.settings.index_granularity,
+        index_granularity_bytes);
+
+    const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
+        settings.merge_tree_min_rows_for_concurrent_read,
+        settings.merge_tree_min_bytes_for_concurrent_read,
+        data.settings.index_granularity,
+        index_granularity_bytes);
+
     if (sum_marks > max_marks_to_use_cache)
         use_uncompressed_cache = false;

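For intuition, a self-contained sketch of the majority vote introduced above (PartSketch and chooseGranularityBytes are illustrative stand-ins, not real ClickHouse types): the bytes-based granularity is used only when more than half of the selected parts carry adaptive marks; otherwise it stays 0 and roundRowsOrBytesToMarks falls back to the rows path.

    #include <cstddef>
    #include <iostream>
    #include <vector>

    /// Illustrative stand-in for the per-part adaptive flag read above.
    struct PartSketch { bool adaptive; };

    /// Mirrors the decision in spreadMarkRangesAmongStreams: a majority of
    /// adaptive parts enables the bytes-based granularity, otherwise 0.
    static size_t chooseGranularityBytes(const std::vector<PartSketch> & parts, size_t settings_bytes)
    {
        size_t adaptive_parts = 0;
        for (const auto & part : parts)
            if (part.adaptive)
                ++adaptive_parts;
        return adaptive_parts > parts.size() / 2 ? settings_bytes : 0;
    }

    int main()
    {
        std::vector<PartSketch> parts{{true}, {true}, {false}};
        std::cout << chooseGranularityBytes(parts, 10 * 1024 * 1024) << '\n'; /// 10485760: two of three parts are adaptive
        parts.push_back({false});
        std::cout << chooseGranularityBytes(parts, 10 * 1024 * 1024) << '\n'; /// 0: only half are adaptive
    }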
@@ -798,15 +808,27 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
     const Names & virt_columns,
     const Settings & settings) const
 {
+
+    size_t sum_marks = 0;
+    size_t adaptive_parts = 0;
+    for (size_t i = 0; i < parts.size(); ++i)
+    {
+        for (size_t j = 0; j < parts[i].ranges.size(); ++j)
+            sum_marks += parts[i].ranges[j].end - parts[i].ranges[j].begin;
+
+        if (parts[i].data_part->index_granularity_info.is_adaptive)
+            adaptive_parts++;
+    }
+
+    size_t index_granularity_bytes = 0;
+    if (adaptive_parts >= parts.size() / 2)
+        index_granularity_bytes = data.settings.index_granularity_bytes;
+
     const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
         settings.merge_tree_max_rows_to_use_cache,
         settings.merge_tree_max_bytes_to_use_cache,
-        data.index_granularity_info);
-
-    size_t sum_marks = 0;
-    for (size_t i = 0; i < parts.size(); ++i)
-        for (size_t j = 0; j < parts[i].ranges.size(); ++j)
-            sum_marks += parts[i].ranges[j].end - parts[i].ranges[j].begin;
+        data.settings.index_granularity,
+        index_granularity_bytes);

     if (sum_marks > max_marks_to_use_cache)
         use_uncompressed_cache = false;
@@ -828,6 +850,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
         to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.sorting_key_expr));
     }

+
     Names sort_columns = data.sorting_key_columns;
     SortDescription sort_description;
     size_t sort_columns_size = sort_columns.size();
@@ -921,7 +944,8 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
     size_t min_marks_for_seek = roundRowsOrBytesToMarks(
         settings.merge_tree_min_rows_for_seek,
         settings.merge_tree_min_bytes_for_seek,
-        data.index_granularity_info);
+        part->index_granularity_info.fixed_index_granularity,
+        part->index_granularity_info.index_granularity_bytes);

     /** There will always be disjoint suspicious segments on the stack, the leftmost one at the top (back).
       * At each step, take the left segment and check if it fits.
@@ -1006,7 +1030,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
     const size_t min_marks_for_seek = roundRowsOrBytesToMarks(
         settings.merge_tree_min_rows_for_seek,
         settings.merge_tree_min_bytes_for_seek,
-        data.index_granularity_info);
+        part->index_granularity_info.fixed_index_granularity,
+        part->index_granularity_info.index_granularity_bytes);

     size_t granules_dropped = 0;

@@ -0,0 +1,60 @@
+#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
+#include <Poco/Path.h>
+#include <Poco/File.h>
+#include <Poco/DirectoryIterator.h>
+#include <iostream>
+
+namespace DB
+{
+std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(const std::string & path_to_part) const
+{
+    if (Poco::File(path_to_part).exists())
+    {
+        Poco::DirectoryIterator end;
+        for (Poco::DirectoryIterator part_it(path_to_part); part_it != end; ++part_it)
+        {
+            const auto & ext = "." + part_it.path().getExtension();
+            if (ext == getNonAdaptiveMrkExtension() || ext == getAdaptiveMrkExtension())
+                return ext;
+        }
+    }
+    return {};
+}
+
+MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(
+    const MergeTreeSettings & storage_settings,
+    const MergeTreeDataFormatVersion & format)
+{
+    fixed_index_granularity = storage_settings.index_granularity;
+    /// Granularity is fixed
+    if (storage_settings.index_granularity_bytes == 0 || format < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
+        setNonAdaptive();
+    else
+        setAdaptive(storage_settings.index_granularity_bytes);
+}
+
+
+void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const std::string & path_to_part)
+{
+    auto mrk_ext = getMrkExtensionFromFS(path_to_part);
+    if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension())
+        setNonAdaptive();
+}
+
+void MergeTreeIndexGranularityInfo::setAdaptive(size_t index_granularity_bytes_)
+{
+    is_adaptive = true;
+    mark_size_in_bytes = getAdaptiveMrkSize();
+    marks_file_extension = getAdaptiveMrkExtension();
+    index_granularity_bytes = index_granularity_bytes_;
+}
+
+void MergeTreeIndexGranularityInfo::setNonAdaptive()
+{
+    is_adaptive = false;
+    mark_size_in_bytes = getNonAdaptiveMrkSize();
+    marks_file_extension = getNonAdaptiveMrkExtension();
+    index_granularity_bytes = 0;
+}
+
+}
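A compilable sketch of how the new class is intended to behave, with MergeTreeSettings and the Poco directory scan stubbed out (GranularityInfoSketch is illustrative, not the real API): the constructor picks adaptive or non-adaptive marks from the storage settings, and changeGranularityIfRequired downgrades to the old fixed format when only '.mrk' files are found on disk.

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <optional>
    #include <string>

    struct GranularityInfoSketch
    {
        std::string marks_file_extension;
        size_t mark_size_in_bytes = 0;
        bool is_adaptive = false;
        size_t fixed_index_granularity = 0;
        size_t index_granularity_bytes = 0;

        /// Stand-in for the (MergeTreeSettings, format version) constructor above.
        GranularityInfoSketch(size_t settings_index_granularity, size_t settings_index_granularity_bytes)
        {
            fixed_index_granularity = settings_index_granularity;
            if (settings_index_granularity_bytes == 0)
                setNonAdaptive();
            else
                setAdaptive(settings_index_granularity_bytes);
        }

        /// A part written by an older server only has ".mrk" files: downgrade.
        void changeGranularityIfRequired(const std::optional<std::string> & ext_found_on_disk)
        {
            if (ext_found_on_disk && *ext_found_on_disk == ".mrk")
                setNonAdaptive();
        }

        void setAdaptive(size_t bytes)
        {
            is_adaptive = true;
            mark_size_in_bytes = sizeof(uint64_t) * 3;  /// offset in file, offset in block, rows in granule
            marks_file_extension = ".mrk2";
            index_granularity_bytes = bytes;
        }

        void setNonAdaptive()
        {
            is_adaptive = false;
            mark_size_in_bytes = sizeof(uint64_t) * 2;  /// offset in file, offset in block
            marks_file_extension = ".mrk";
            index_granularity_bytes = 0;
        }
    };

    int main()
    {
        GranularityInfoSketch info(8192, 10 * 1024 * 1024);     /// hypothetical storage settings
        info.changeGranularityIfRequired(std::string(".mrk"));  /// an old-format part was found on disk
        std::cout << info.marks_file_extension << ' ' << info.mark_size_in_bytes << '\n';  /// ".mrk 16"
    }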
dbms/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h (new file, 51 lines)
@@ -0,0 +1,51 @@
+#pragma once
+
+#include <Storages/MergeTree/MergeTreeSettings.h>
+#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
+#include <optional>
+#include <Core/Types.h>
+
+namespace DB
+{
+/// Meta information about index granularity
+struct MergeTreeIndexGranularityInfo
+{
+public:
+    /// Marks file extension '.mrk' or '.mrk2'
+    String marks_file_extension;
+
+    /// Size of one mark in file two or three size_t numbers
+    UInt8 mark_size_in_bytes;
+
+    /// Is stride in rows between marks non fixed?
+    bool is_adaptive;
+
+    /// Fixed size in rows of one granule if index_granularity_bytes is zero
+    size_t fixed_index_granularity;
+
+    /// Approximate bytes size of one granule
+    size_t index_granularity_bytes;
+
+    MergeTreeIndexGranularityInfo(
+        const MergeTreeSettings & storage_settings,
+        const MergeTreeDataFormatVersion & format);
+
+    void changeGranularityIfRequired(const std::string & path_to_part);
+
+    String getMarksFilePath(const String & column_path) const
+    {
+        return column_path + marks_file_extension;
+    }
+private:
+
+    void setAdaptive(size_t index_granularity_bytes_);
+    void setNonAdaptive();
+    std::optional<std::string> getMrkExtensionFromFS(const std::string & path_to_table) const;
+};
+
+constexpr inline auto getNonAdaptiveMrkExtension() { return ".mrk"; }
+constexpr inline auto getAdaptiveMrkExtension() { return ".mrk2"; }
+constexpr inline auto getNonAdaptiveMrkSize() { return sizeof(UInt64) * 2; }
+constexpr inline auto getAdaptiveMrkSize() { return sizeof(UInt64) * 3; }
+
+}
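The four helpers at the end of the header fix the two on-disk mark layouts. A short sketch of the arithmetic loadIndexGranularity relies on (file sizes invented): a '.mrk' entry is two UInt64 values, a '.mrk2' entry is three, so the mark count is simply the marks file size divided by the mark size.

    #include <cstddef>
    #include <cstdint>
    #include <iostream>

    constexpr size_t getNonAdaptiveMrkSize() { return sizeof(uint64_t) * 2; }  /// offset in compressed file + offset in decompressed block
    constexpr size_t getAdaptiveMrkSize()    { return sizeof(uint64_t) * 3; }  /// ... plus the number of rows in the granule

    int main()
    {
        /// Hypothetical sizes of one column's marks file on disk.
        size_t mrk_file_size  = 320;   /// fixed-granularity format
        size_t mrk2_file_size = 480;   /// adaptive format

        std::cout << mrk_file_size  / getNonAdaptiveMrkSize() << '\n';  /// 20 marks
        std::cout << mrk2_file_size / getAdaptiveMrkSize()    << '\n';  /// 20 marks
    }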
@@ -10,7 +10,7 @@ MergeTreeIndexReader::MergeTreeIndexReader(
         part->getFullPath() + index->getFileName(), ".idx", marks_count,
         all_mark_ranges, nullptr, false, nullptr,
         part->getFileSizeOrZero(index->getFileName() + ".idx"), 0, DBMS_DEFAULT_BUFFER_SIZE,
-        &part->storage.index_granularity_info,
+        &part->index_granularity_info,
         ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE)
 {
     stream.seekToStart();

@@ -173,7 +173,7 @@ void MergeTreeReader::addStreams(const String & name, const IDataType & type,
             all_mark_ranges, mark_cache, save_marks_in_cache,
             uncompressed_cache, data_part->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
             aio_threshold, max_read_buffer_size,
-            &storage.index_granularity_info,
+            &data_part->index_granularity_info,
             profile_callback, clock_type));
     };

@@ -20,7 +20,7 @@ MergeTreeReaderStream::MergeTreeReaderStream(
     MarkCache * mark_cache_, bool save_marks_in_cache_,
     UncompressedCache * uncompressed_cache,
     size_t file_size, size_t aio_threshold, size_t max_read_buffer_size,
-    const GranularityInfo * index_granularity_info_,
+    const MergeTreeIndexGranularityInfo * index_granularity_info_,
     const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
     : path_prefix(path_prefix_), data_file_extension(data_file_extension_), marks_count(marks_count_)
     , mark_cache(mark_cache_), save_marks_in_cache(save_marks_in_cache_)

@@ -2,6 +2,7 @@
 #include <Storages/MergeTree/MarkRange.h>
 #include <Storages/MergeTree/MergeTreeData.h>
 #include <Storages/MergeTree/MergeTreeRangeReader.h>
+#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
 #include <Compression/CachedCompressedReadBuffer.h>
 #include <Compression/CompressedReadBufferFromFile.h>

@@ -13,14 +14,13 @@ namespace DB
 class MergeTreeReaderStream
 {
 public:
-    using GranularityInfo = MergeTreeData::IndexGranularityInfo;
     MergeTreeReaderStream(
         const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
         const MarkRanges & all_mark_ranges,
         MarkCache * mark_cache, bool save_marks_in_cache,
         UncompressedCache * uncompressed_cache,
         size_t file_size, size_t aio_threshold, size_t max_read_buffer_size,
-        const GranularityInfo * index_granularity_info_,
+        const MergeTreeIndexGranularityInfo * index_granularity_info_,
         const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);

     void seekToMark(size_t index);
@@ -44,7 +44,7 @@ private:
     bool save_marks_in_cache;
     MarkCache::MappedPtr marks;

-    const GranularityInfo * index_granularity_info;
+    const MergeTreeIndexGranularityInfo * index_granularity_info;

     std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
     std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;

@@ -29,9 +29,9 @@ MergeTreeThreadSelectBlockInputStream::MergeTreeThreadSelectBlockInputStream(
     /// round min_marks_to_read up to nearest multiple of block_size expressed in marks
     /// If granularity is adaptive it doesn't make sense
     /// Maybe it will make sence to add settings `max_block_size_bytes`
-    if (max_block_size_rows && !storage.index_granularity_info.is_adaptive)
+    if (max_block_size_rows && storage.settings.index_granularity_bytes == 0)
     {
-        size_t fixed_index_granularity = storage.index_granularity_info.fixed_index_granularity;
+        size_t fixed_index_granularity = storage.settings.index_granularity;
         min_marks_to_read = (min_marks_to_read_ * fixed_index_granularity + max_block_size_rows - 1)
             / max_block_size_rows * max_block_size_rows / fixed_index_granularity;
     }
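A worked run of the rounding above with invented numbers (the real values come from storage.settings and the query settings): min_marks_to_read is rounded up so that the rows it covers form a whole number of max_block_size_rows blocks.

    #include <cstddef>
    #include <iostream>

    int main()
    {
        /// Hypothetical values for the non-adaptive case handled above.
        size_t fixed_index_granularity = 8192;   /// storage.settings.index_granularity
        size_t max_block_size_rows = 65536;
        size_t min_marks_to_read_ = 20;

        /// Same expression as in the constructor: 20 marks == 163840 rows,
        /// rounded up to 3 blocks of 65536 rows, converted back to marks.
        size_t min_marks_to_read = (min_marks_to_read_ * fixed_index_granularity + max_block_size_rows - 1)
            / max_block_size_rows * max_block_size_rows / fixed_index_granularity;

        std::cout << min_marks_to_read << '\n';  /// 24 marks == 196608 rows
    }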
@@ -1,4 +1,5 @@
 #include <Storages/MergeTree/MergedBlockOutputStream.h>
+#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
 #include <IO/createWriteBufferFromFileBase.h>
 #include <Common/escapeForFileName.h>
 #include <DataTypes/NestedUtils.h>
@@ -40,8 +41,8 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
     , min_compress_block_size(min_compress_block_size_)
     , max_compress_block_size(max_compress_block_size_)
     , aio_threshold(aio_threshold_)
-    , marks_file_extension(storage.index_granularity_info.marks_file_extension)
-    , mark_size_in_bytes(storage.index_granularity_info.mark_size_in_bytes)
+    , marks_file_extension(storage.settings.index_granularity_bytes ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension())
+    , mark_size_in_bytes(storage.settings.index_granularity_bytes ? getAdaptiveMrkSize() : getNonAdaptiveMrkSize())
     , blocks_are_granules_size(blocks_are_granules_size_)
     , index_granularity(index_granularity_)
     , compute_granularity(index_granularity.empty())
@@ -148,8 +149,8 @@ void IMergedBlockOutputStream::fillIndexGranularity(const Block & block)
 {
     fillIndexGranularityImpl(
         block,
-        storage.index_granularity_info.index_granularity_bytes,
-        storage.index_granularity_info.fixed_index_granularity,
+        storage.settings.index_granularity_bytes,
+        storage.settings.index_granularity,
         blocks_are_granules_size,
         index_offset,
         index_granularity);
@@ -190,7 +191,7 @@ size_t IMergedBlockOutputStream::writeSingleGranule(

         writeIntBinary(stream.plain_hashing.count(), stream.marks);
         writeIntBinary(stream.compressed.offset(), stream.marks);
-        if (storage.index_granularity_info.is_adaptive)
+        if (storage.settings.index_granularity_bytes != 0)
             writeIntBinary(number_of_rows, stream.marks);
     }, serialize_settings.path);
 }
@@ -705,7 +706,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
             writeIntBinary(stream.compressed.offset(), stream.marks);
             /// Actually this numbers is redundant, but we have to store them
             /// to be compatible with normal .mrk2 file format
-            if (storage.index_granularity_info.is_adaptive)
+            if (storage.settings.index_granularity_bytes != 0)
                 writeIntBinary(1UL, stream.marks);
         }
     }
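For reference, a stubbed sketch of what one adaptive mark written above looks like on disk (plain little-endian writes stand in for writeIntBinary, and the offsets are invented): offset of the compressed block in the .bin file, offset inside the decompressed block, and a row count — the field writeSingleGranule fills with number_of_rows and writeImpl fills with a constant 1 for .mrk2 compatibility.

    #include <cstdint>
    #include <cstring>
    #include <fstream>
    #include <iostream>

    /// Stand-in for writeIntBinary: append one UInt64 to a binary stream.
    static void writeUInt64(std::ofstream & out, uint64_t value)
    {
        char buf[sizeof(value)];
        std::memcpy(buf, &value, sizeof(value));  /// assumes a little-endian host; good enough for a sketch
        out.write(buf, sizeof(buf));
    }

    int main()
    {
        std::ofstream marks("column.mrk2", std::ios::binary);

        /// One adaptive mark: two made-up offsets plus the granule row count.
        writeUInt64(marks, 131072);  /// offset of the compressed block in the .bin file
        writeUInt64(marks, 0);       /// offset of the granule inside the decompressed block
        writeUInt64(marks, 8192);    /// rows in the granule (writeImpl stores 1 here)

        std::cout << "wrote one 24-byte .mrk2 entry\n";
    }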
@@ -28,7 +28,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
         date_column = data.minmax_idx_columns[data.minmax_idx_date_column_pos];

     sampling_expression = formattedAST(data.sample_by_ast);
-    index_granularity = data.index_granularity_info.fixed_index_granularity;
+    index_granularity = data.settings.index_granularity;
     merging_params_mode = static_cast<int>(data.merging_params.mode);
     sign_column = data.merging_params.sign_column;

@@ -47,7 +47,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr

     ttl_table = formattedAST(data.ttl_table_ast);
     skip_indices = data.getIndices().toString();
-    index_granularity_bytes = data.index_granularity_info.index_granularity_bytes;
+    index_granularity_bytes = data.settings.index_granularity_bytes;
 }

 void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const
@@ -115,9 +115,6 @@ void ReplicatedMergeTreeTableMetadata::read(ReadBuffer & in)

     if (checkString("granularity bytes: ", in))
         in >> index_granularity_bytes >> "\n";
     else
         index_granularity_bytes = 0;

 }

 ReplicatedMergeTreeTableMetadata ReplicatedMergeTreeTableMetadata::parse(const String & s)

@@ -433,7 +433,7 @@ MergeTreeData::DataPart::Checksums checkDataPart(
     return checkDataPart(
         data_part->getFullPath(),
         data_part->index_granularity,
-        data_part->storage.index_granularity_info.marks_file_extension,
+        data_part->index_granularity_info.marks_file_extension,
         require_checksums,
         primary_key_data_types,
         indices,