Merge pull request #71786 from CurtizJ/const-adaptive-granularity

Allow to use constant adaptive index granularity for whole part
This commit is contained in:
alesapin 2024-11-16 19:32:57 +00:00 committed by GitHub
commit 208fd6efe1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
49 changed files with 1071 additions and 440 deletions

View File

@ -647,7 +647,7 @@ std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
range.begin = exact_ranges[i].end;
ordinary_reading_marks -= exact_ranges[i].end - exact_ranges[i].begin;
exact_count += part_with_ranges.data_part->index_granularity.getRowsCountInRange(exact_ranges[i]);
exact_count += part_with_ranges.data_part->index_granularity->getRowsCountInRange(exact_ranges[i]);
++i;
}

View File

@ -201,7 +201,7 @@ public:
size_t getMarkRows(size_t part_idx, size_t mark) const
{
return parts[part_idx].data_part->index_granularity.getMarkRows(mark);
return parts[part_idx].data_part->index_granularity->getMarkRows(mark);
}
private:
const RangesInDataParts & parts;
@ -444,7 +444,7 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts,
parts_ranges.push_back(
{index_access.getValue(part_index, range.begin), range, part_index, PartsRangesIterator::EventType::RangeStart});
const bool value_is_defined_at_end_mark = range.end < index_granularity.getMarksCount();
const bool value_is_defined_at_end_mark = range.end < index_granularity->getMarksCount();
if (!value_is_defined_at_end_mark)
continue;
@ -667,7 +667,7 @@ std::pair<std::vector<RangesInDataParts>, std::vector<Values>> splitIntersecting
PartRangeIndex parts_range_start_index(parts_range_start);
parts_ranges_queue.push({std::move(parts_range_start), std::move(parts_range_start_index)});
const bool value_is_defined_at_end_mark = range.end < index_granularity.getMarksCount();
const bool value_is_defined_at_end_mark = range.end < index_granularity->getMarksCount();
if (!value_is_defined_at_end_mark)
continue;

View File

@ -667,7 +667,7 @@ Pipe ReadFromMergeTree::readInOrder(
part_with_ranges.ranges.size(),
read_type == ReadType::InReverseOrder ? " reverse " : " ",
part_with_ranges.data_part->name, total_rows,
part_with_ranges.data_part->index_granularity.getMarkStartingRow(part_with_ranges.ranges.front().begin));
part_with_ranges.data_part->index_granularity->getMarkStartingRow(part_with_ranges.ranges.front().begin));
MergeTreeSelectAlgorithmPtr algorithm;
if (read_type == ReadType::InReverseOrder)
@ -1759,7 +1759,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
return std::make_shared<AnalysisResult>(std::move(result));
for (const auto & part : parts)
total_marks_pk += part->index_granularity.getMarksCountWithoutFinal();
total_marks_pk += part->index_granularity->getMarksCountWithoutFinal();
parts_before_pk = parts.size();
auto reader_settings = getMergeTreeReaderSettings(context_, query_info_);

View File

@ -29,6 +29,7 @@
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MergeTree/Backup.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityAdaptive.h>
#include <base/JSON.h>
#include <boost/algorithm/string/join.hpp>
#include <Common/CurrentMetrics.h>
@ -626,11 +627,12 @@ UInt64 IMergeTreeDataPart::getIndexSizeInAllocatedBytes() const
UInt64 IMergeTreeDataPart::getIndexGranularityBytes() const
{
return index_granularity.getBytesSize();
return index_granularity->getBytesSize();
}
UInt64 IMergeTreeDataPart::getIndexGranularityAllocatedBytes() const
{
return index_granularity.getBytesAllocated();
return index_granularity->getBytesAllocated();
}
void IMergeTreeDataPart::assertState(const std::initializer_list<MergeTreeDataPartState> & affordable_states) const
@ -661,7 +663,7 @@ void IMergeTreeDataPart::assertOnDisk() const
UInt64 IMergeTreeDataPart::getMarksCount() const
{
return index_granularity.getMarksCount();
return index_granularity->getMarksCount();
}
UInt64 IMergeTreeDataPart::getExistingBytesOnDisk() const
@ -746,7 +748,6 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
loadChecksums(require_columns_checksums);
loadIndexGranularity();
index_granularity.shrinkToFitInMemory();
if (!(*storage.getSettings())[MergeTreeSetting::primary_key_lazy_load])
getIndex();
@ -942,13 +943,13 @@ void IMergeTreeDataPart::loadIndex() const
for (size_t i = 0; i < key_size; ++i)
{
loaded_index[i] = primary_key.data_types[i]->createColumn();
loaded_index[i]->reserve(index_granularity.getMarksCount());
loaded_index[i]->reserve(index_granularity->getMarksCount());
}
String index_name = "primary" + getIndexExtensionFromFilesystem(getDataPartStorage());
String index_path = fs::path(getDataPartStorage().getRelativePath()) / index_name;
auto index_file = metadata_manager->read(index_name);
size_t marks_count = index_granularity.getMarksCount();
size_t marks_count = index_granularity->getMarksCount();
Serializations key_serializations(key_size);
for (size_t j = 0; j < key_size; ++j)
@ -1363,7 +1364,7 @@ void IMergeTreeDataPart::loadRowsCount()
assertEOF(*buf);
};
if (index_granularity.empty())
if (index_granularity->empty())
{
rows_count = 0;
}
@ -1398,9 +1399,9 @@ void IMergeTreeDataPart::loadRowsCount()
backQuote(column.name), rows_in_column, name, rows_count);
}
size_t last_possibly_incomplete_mark_rows = index_granularity.getLastNonFinalMarkRows();
size_t last_possibly_incomplete_mark_rows = index_granularity->getLastNonFinalMarkRows();
/// All these rows have to be written in the column
size_t index_granularity_without_last_mark = index_granularity.getTotalRows() - last_possibly_incomplete_mark_rows;
size_t index_granularity_without_last_mark = index_granularity->getTotalRows() - last_possibly_incomplete_mark_rows;
/// The column must have at least as many rows as the index granularity without the last, possibly incomplete, mark
if (rows_in_column < index_granularity_without_last_mark)
{
@ -1410,7 +1411,7 @@ void IMergeTreeDataPart::loadRowsCount()
"and size of single value, "
"but index granularity in part {} without last mark has {} rows, which "
"is more than in column",
backQuote(column.name), rows_in_column, name, index_granularity.getTotalRows());
backQuote(column.name), rows_in_column, name, index_granularity->getTotalRows());
}
/// In the last mark we have actually written no more rows than are stored in the last mark of the index granularity
@ -1458,8 +1459,8 @@ void IMergeTreeDataPart::loadRowsCount()
column.name, column_size, sizeof_field);
}
size_t last_mark_index_granularity = index_granularity.getLastNonFinalMarkRows();
size_t rows_approx = index_granularity.getTotalRows();
size_t last_mark_index_granularity = index_granularity->getLastNonFinalMarkRows();
size_t rows_approx = index_granularity->getTotalRows();
if (!(rows_count <= rows_approx && rows_approx < rows_count + last_mark_index_granularity))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected size of column {}: "
"{} rows, expected {}+-{} rows according to the index",
@ -1522,7 +1523,7 @@ UInt64 IMergeTreeDataPart::readExistingRowsCount()
while (current_row < rows_count)
{
size_t rows_to_read = index_granularity.getMarkRows(current_mark);
size_t rows_to_read = index_granularity->getMarkRows(current_mark);
continue_reading = (current_mark != 0);
Columns result;
@ -1970,6 +1971,9 @@ void IMergeTreeDataPart::initializeIndexGranularityInfo()
index_granularity_info = MergeTreeIndexGranularityInfo(storage, *mrk_type);
else
index_granularity_info = MergeTreeIndexGranularityInfo(storage, part_type);
/// It may be converted to constant index granularity after loading it.
index_granularity = std::make_unique<MergeTreeIndexGranularityAdaptive>();
}
void IMergeTreeDataPart::remove()
@ -2243,9 +2247,9 @@ void IMergeTreeDataPart::checkConsistency(bool require_part_metadata) const
"part_state: [{}]",
columns.toString(),
index_granularity_info.getMarkSizeInBytes(columns.size()),
index_granularity.getMarksCount(),
index_granularity->getMarksCount(),
index_granularity_info.describe(),
index_granularity.describe(),
index_granularity->describe(),
part_state);
e.addMessage(debug_info);

View File

@ -321,7 +321,7 @@ public:
/// Amount of rows between marks
/// As index always loaded into memory
MergeTreeIndexGranularity index_granularity;
MergeTreeIndexGranularityPtr index_granularity;
/// Index that for each part stores min and max values of a set of columns. This allows quickly excluding
/// parts based on conditions on these columns imposed by a query.

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Columns/ColumnSparse.h>
namespace DB
@ -11,7 +12,6 @@ namespace ErrorCodes
extern const int NO_SUCH_COLUMN_IN_TABLE;
}
Block getIndexBlockAndPermute(const Block & block, const Names & names, const IColumn::Permutation * permutation)
{
Block result;
@ -57,7 +57,7 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
const StorageMetadataPtr & metadata_snapshot_,
const VirtualsDescriptionPtr & virtual_columns_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
MergeTreeIndexGranularityPtr index_granularity_)
: data_part_name(data_part_name_)
, serializations(serializations_)
, index_granularity_info(index_granularity_info_)
@ -68,7 +68,7 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
, settings(settings_)
, with_final_mark(settings.can_use_adaptive_granularity)
, data_part_storage(data_part_storage_)
, index_granularity(index_granularity_)
, index_granularity(std::move(index_granularity_))
{
}
@ -145,7 +145,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartCompactWriter(
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity);
MergeTreeIndexGranularityPtr computed_index_granularity);
MergeTreeDataPartWriterPtr createMergeTreeDataPartWideWriter(
const String & data_part_name_,
@ -162,8 +162,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWideWriter(
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity);
MergeTreeIndexGranularityPtr computed_index_granularity);
MergeTreeDataPartWriterPtr createMergeTreeDataPartWriter(
MergeTreeDataPartType part_type,
@ -182,12 +181,26 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWriter(
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity)
MergeTreeIndexGranularityPtr computed_index_granularity)
{
if (part_type == MergeTreeDataPartType::Compact)
return createMergeTreeDataPartCompactWriter(data_part_name_, logger_name_, serializations_, data_part_storage_,
index_granularity_info_, storage_settings_, columns_list, column_positions, metadata_snapshot, virtual_columns, indices_to_recalc, stats_to_recalc_,
marks_file_extension_, default_codec_, writer_settings, computed_index_granularity);
return createMergeTreeDataPartCompactWriter(
data_part_name_,
logger_name_,
serializations_,
data_part_storage_,
index_granularity_info_,
storage_settings_,
columns_list,
column_positions,
metadata_snapshot,
virtual_columns,
indices_to_recalc,
stats_to_recalc_,
marks_file_extension_,
default_codec_,
writer_settings,
std::move(computed_index_granularity));
if (part_type == MergeTreeDataPartType::Wide)
return createMergeTreeDataPartWideWriter(
data_part_name_,
@ -204,7 +217,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWriter(
marks_file_extension_,
default_codec_,
writer_settings,
computed_index_granularity);
std::move(computed_index_granularity));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown part type: {}", part_type.toString());
}

View File

@ -36,7 +36,7 @@ public:
const StorageMetadataPtr & metadata_snapshot_,
const VirtualsDescriptionPtr & virtual_columns_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_ = {});
MergeTreeIndexGranularityPtr index_granularity_);
virtual ~IMergeTreeDataPartWriter();
@ -52,7 +52,7 @@ public:
PlainMarksByName releaseCachedMarks();
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
MergeTreeIndexGranularityPtr getIndexGranularity() const { return index_granularity; }
virtual Block getColumnsSample() const = 0;
@ -76,7 +76,7 @@ protected:
MutableDataPartStoragePtr data_part_storage;
MutableColumns index_columns;
MergeTreeIndexGranularity index_granularity;
MergeTreeIndexGranularityPtr index_granularity;
/// Marks that will be saved to cache on finish.
PlainMarksByName cached_marks;
};
@ -101,6 +101,6 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWriter(
const String & marks_file_extension,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity);
MergeTreeIndexGranularityPtr computed_index_granularity);
}

View File

@ -29,7 +29,7 @@ public:
virtual void write(const Block & block) = 0;
const MergeTreeIndexGranularity & getIndexGranularity() const
MergeTreeIndexGranularityPtr getIndexGranularity() const
{
return writer->getIndexGranularity();
}

View File

@ -51,7 +51,7 @@ public:
const MergeTreeIndexGranularityInfo & getIndexGranularityInfo() const override { return data_part->index_granularity_info; }
const MergeTreeIndexGranularity & getIndexGranularity() const override { return data_part->index_granularity; }
const MergeTreeIndexGranularity & getIndexGranularity() const override { return *data_part->index_granularity; }
const SerializationInfoByName & getSerializationInfos() const override { return data_part->getSerializationInfos(); }

View File

@ -52,7 +52,7 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta
total_size_bytes_compressed += source_part->getBytesOnDisk();
total_size_bytes_uncompressed += source_part->getTotalColumnsSize().data_uncompressed;
total_size_marks += source_part->getMarksCount();
total_rows_count += source_part->index_granularity.getTotalRows();
total_rows_count += source_part->index_granularity->getTotalRows();
}
if (!future_part->parts.empty())

View File

@ -8,6 +8,7 @@
#include <Common/logger_useful.h>
#include <Core/Settings.h>
#include <Common/ProfileEvents.h>
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Compression/CompressedWriteBuffer.h>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/Serializations/SerializationInfo.h>
@ -72,6 +73,7 @@ namespace CurrentMetrics
namespace DB
{
namespace Setting
{
extern const SettingsBool compile_sort_description;
@ -99,6 +101,7 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsUInt64 vertical_merge_algorithm_min_rows_to_activate;
extern const MergeTreeSettingsBool vertical_merge_remote_filesystem_prefetch;
extern const MergeTreeSettingsBool prewarm_mark_cache;
extern const MergeTreeSettingsBool use_const_adaptive_granularity;
}
namespace ErrorCodes
@ -412,10 +415,11 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
};
auto mutations_snapshot = global_ctx->data->getMutationsSnapshot(params);
auto storage_settings = global_ctx->data->getSettings();
SerializationInfo::Settings info_settings =
{
.ratio_of_defaults_for_sparse = (*global_ctx->data->getSettings())[MergeTreeSetting::ratio_of_defaults_for_sparse_serialization],
.ratio_of_defaults_for_sparse = (*storage_settings)[MergeTreeSetting::ratio_of_defaults_for_sparse_serialization],
.choose_kind = true,
};
@ -464,6 +468,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
ctx->sum_input_rows_upper_bound = global_ctx->merge_list_element_ptr->total_rows_count;
ctx->sum_compressed_bytes_upper_bound = global_ctx->merge_list_element_ptr->total_size_bytes_compressed;
ctx->sum_uncompressed_bytes_upper_bound = global_ctx->merge_list_element_ptr->total_size_bytes_uncompressed;
global_ctx->chosen_merge_algorithm = chooseMergeAlgorithm();
global_ctx->merge_list_element_ptr->merge_algorithm.store(global_ctx->chosen_merge_algorithm, std::memory_order_relaxed);
@ -507,8 +512,14 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
throw Exception(ErrorCodes::LOGICAL_ERROR, "Merge algorithm must be chosen");
}
/// If merge is vertical we cannot calculate it
ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical);
bool use_adaptive_granularity = global_ctx->new_data_part->index_granularity_info.mark_type.adaptive;
bool use_const_adaptive_granularity = (*storage_settings)[MergeTreeSetting::use_const_adaptive_granularity];
/// If merge is vertical we cannot calculate it.
/// If granularity is constant we don't need to calculate it.
ctx->blocks_are_granules_size = use_adaptive_granularity
&& !use_const_adaptive_granularity
&& global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical;
/// Merged stream will be created and available as merged_stream variable
createMergedStream();
@ -550,7 +561,14 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
}
}
bool save_marks_in_cache = (*global_ctx->data->getSettings())[MergeTreeSetting::prewarm_mark_cache] && global_ctx->context->getMarkCache();
auto index_granularity_ptr = createMergeTreeIndexGranularity(
ctx->sum_input_rows_upper_bound,
ctx->sum_uncompressed_bytes_upper_bound,
*storage_settings,
global_ctx->new_data_part->index_granularity_info,
ctx->blocks_are_granules_size);
bool save_marks_in_cache = (*storage_settings)[MergeTreeSetting::prewarm_mark_cache] && global_ctx->context->getMarkCache();
global_ctx->to = std::make_shared<MergedBlockOutputStream>(
global_ctx->new_data_part,
@ -559,6 +577,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
MergeTreeIndexFactory::instance().getMany(global_ctx->merging_skip_indexes),
getStatisticsForColumns(global_ctx->merging_columns, global_ctx->metadata_snapshot),
ctx->compression_codec,
std::move(index_granularity_ptr),
global_ctx->txn ? global_ctx->txn->tid : Tx::PrehistoricTID,
/*reset_columns=*/ true,
save_marks_in_cache,
@ -1100,12 +1119,12 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
global_ctx->new_data_part,
global_ctx->metadata_snapshot,
columns_list,
ctx->compression_codec,
column_pipepline.indexes_to_recalc,
getStatisticsForColumns(columns_list, global_ctx->metadata_snapshot),
ctx->compression_codec,
global_ctx->to->getIndexGranularity(),
&global_ctx->written_offset_columns,
save_marks_in_cache,
global_ctx->to->getIndexGranularity());
save_marks_in_cache);
ctx->column_elems_written = 0;
}

View File

@ -243,7 +243,6 @@ private:
bool need_remove_expired_values{false};
bool force_ttl{false};
CompressionCodecPtr compression_codec{nullptr};
size_t sum_input_rows_upper_bound{0};
std::shared_ptr<RowsSourcesTemporaryFile> rows_sources_temporary_file;
std::optional<ColumnSizeEstimator> column_sizes{};
@ -261,7 +260,9 @@ private:
std::function<bool()> is_cancelled{};
/// Local variables for this stage
size_t sum_input_rows_upper_bound{0};
size_t sum_compressed_bytes_upper_bound{0};
size_t sum_uncompressed_bytes_upper_bound{0};
bool blocks_are_granules_size{false};
LoggerPtr log{getLogger("MergeTask::PrepareStage")};

View File

@ -83,6 +83,7 @@
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityAdaptive.h>
#include <boost/range/algorithm_ext/erase.hpp>
#include <boost/algorithm/string/join.hpp>
@ -7237,7 +7238,7 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
/// It's extremely rare that some parts have final marks while others don't. To make it
/// straightforward, disable minmax_count projection when `max(pk)' encounters any part with
/// no final mark.
if (need_primary_key_max_column && !part->index_granularity.hasFinalMark())
if (need_primary_key_max_column && !part->index_granularity->hasFinalMark())
return {};
real_parts.push_back(part);
@ -8960,10 +8961,15 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::createE
auto compression_codec = getContext()->chooseCompressionCodec(0, 0);
const auto & index_factory = MergeTreeIndexFactory::instance();
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns,
MergedBlockOutputStream out(
new_data_part,
metadata_snapshot,
columns,
index_factory.getMany(metadata_snapshot->getSecondaryIndices()),
ColumnsStatistics{},
compression_codec, txn ? txn->tid : Tx::PrehistoricTID);
compression_codec,
std::make_shared<MergeTreeIndexGranularityAdaptive>(),
txn ? txn->tid : Tx::PrehistoricTID);
bool sync_on_insert = (*settings)[MergeTreeSetting::fsync_after_insert];

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/MergeTreeReaderCompactSingleBuffer.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
namespace DB
@ -15,6 +16,11 @@ namespace ErrorCodes
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
}
namespace MergeTreeSetting
{
extern MergeTreeSettingsBool enable_index_granularity_compression;
}
MergeTreeDataPartCompact::MergeTreeDataPartCompact(
const MergeTreeData & storage_,
const String & name_,
@ -62,7 +68,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartCompactWriter(
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity)
MergeTreeIndexGranularityPtr computed_index_granularity)
{
NamesAndTypesList ordered_columns_list;
std::copy_if(columns_list.begin(), columns_list.end(), std::back_inserter(ordered_columns_list),
@ -76,7 +82,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartCompactWriter(
data_part_name_, logger_name_, serializations_, data_part_storage_,
index_granularity_info_, storage_settings_, ordered_columns_list, metadata_snapshot, virtual_columns,
indices_to_recalc, stats_to_recalc_, marks_file_extension_,
default_codec_, writer_settings, computed_index_granularity);
default_codec_, writer_settings, std::move(computed_index_granularity));
}
@ -95,8 +101,11 @@ void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*eac
}
void MergeTreeDataPartCompact::loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, const MergeTreeIndexGranularityInfo & index_granularity_info_,
size_t columns_count, const IDataPartStorage & data_part_storage_)
MergeTreeIndexGranularityPtr & index_granularity_ptr,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
size_t columns_count,
const IDataPartStorage & data_part_storage_,
const MergeTreeSettings & storage_settings)
{
if (!index_granularity_info_.mark_type.adaptive)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MergeTreeDataPartCompact cannot be created with non-adaptive granularity.");
@ -122,10 +131,14 @@ void MergeTreeDataPartCompact::loadIndexGranularityImpl(
marks_reader->ignore(columns_count * sizeof(MarkInCompressedFile));
size_t granularity;
readBinaryLittleEndian(granularity, *marks_reader);
index_granularity_.appendMark(granularity);
index_granularity_ptr->appendMark(granularity);
}
index_granularity_.setInitialized();
if (storage_settings[MergeTreeSetting::enable_index_granularity_compression])
{
if (auto new_granularity_ptr = index_granularity_ptr->optimize())
index_granularity_ptr = std::move(new_granularity_ptr);
}
}
void MergeTreeDataPartCompact::loadIndexGranularity()
@ -133,7 +146,7 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
if (columns.empty())
throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART, "No columns in part {}", name);
loadIndexGranularityImpl(index_granularity, index_granularity_info, columns.size(), getDataPartStorage());
loadIndexGranularityImpl(index_granularity, index_granularity_info, columns.size(), getDataPartStorage(), *storage.getSettings());
}
void MergeTreeDataPartCompact::loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const
@ -152,7 +165,7 @@ void MergeTreeDataPartCompact::loadMarksToCache(const Names & column_names, Mark
info_for_read,
mark_cache,
index_granularity_info.getMarksFilePath(DATA_FILE_NAME),
index_granularity.getMarksCount(),
index_granularity->getMarksCount(),
index_granularity_info,
/*save_marks_in_cache=*/ true,
read_settings,
@ -227,7 +240,7 @@ void MergeTreeDataPartCompact::doCheckConsistency(bool require_part_metadata) co
getDataPartStorage().getRelativePath(),
std::string(fs::path(getDataPartStorage().getFullPath()) / mrk_file_name));
UInt64 expected_file_size = index_granularity_info.getMarkSizeInBytes(columns.size()) * index_granularity.getMarksCount();
UInt64 expected_file_size = index_granularity_info.getMarkSizeInBytes(columns.size()) * index_granularity->getMarksCount();
if (expected_file_size != file_size)
throw Exception(
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,

View File

@ -60,8 +60,11 @@ public:
protected:
static void loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, const MergeTreeIndexGranularityInfo & index_granularity_info_,
size_t columns_count, const IDataPartStorage & data_part_storage_);
MergeTreeIndexGranularityPtr & index_granularity_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
size_t columns_count,
const IDataPartStorage & data_part_storage_,
const MergeTreeSettings & storage_settings);
void doCheckConsistency(bool require_part_metadata) const override;

View File

@ -3,6 +3,8 @@
#include <Storages/MergeTree/MergeTreeDataPartWriterWide.h>
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityConstant.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <DataTypes/NestedUtils.h>
#include <Core/NamesAndTypes.h>
@ -17,6 +19,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace MergeTreeSetting
{
extern MergeTreeSettingsBool enable_index_granularity_compression;
}
MergeTreeDataPartWide::MergeTreeDataPartWide(
const MergeTreeData & storage_,
const String & name_,
@ -68,14 +75,14 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWideWriter(
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity)
MergeTreeIndexGranularityPtr computed_index_granularity)
{
return std::make_unique<MergeTreeDataPartWriterWide>(
data_part_name_, logger_name_, serializations_, data_part_storage_,
index_granularity_info_, storage_settings_, columns_list,
metadata_snapshot, virtual_columns, indices_to_recalc, stats_to_recalc_,
marks_file_extension_,
default_codec_, writer_settings, computed_index_granularity);
default_codec_, writer_settings, std::move(computed_index_granularity));
}
@ -114,8 +121,11 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
}
void MergeTreeDataPartWide::loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, MergeTreeIndexGranularityInfo & index_granularity_info_,
const IDataPartStorage & data_part_storage_, const std::string & any_column_file_name)
MergeTreeIndexGranularityPtr & index_granularity_ptr,
MergeTreeIndexGranularityInfo & index_granularity_info_,
const IDataPartStorage & data_part_storage_,
const std::string & any_column_file_name,
const MergeTreeSettings & storage_settings)
{
index_granularity_info_.changeGranularityIfRequired(data_part_storage_);
@ -127,12 +137,13 @@ void MergeTreeDataPartWide::loadIndexGranularityImpl(
std::string(fs::path(data_part_storage_.getFullPath()) / marks_file_path));
size_t marks_file_size = data_part_storage_.getFileSize(marks_file_path);
size_t fixed_granularity = index_granularity_info_.fixed_index_granularity;
if (!index_granularity_info_.mark_type.adaptive && !index_granularity_info_.mark_type.compressed)
{
/// The easiest way - no need to read the file, everything is known from its size.
size_t marks_count = marks_file_size / index_granularity_info_.getMarkSizeInBytes();
index_granularity_.resizeWithFixedGranularity(marks_count, index_granularity_info_.fixed_index_granularity); /// all the same
index_granularity_ptr = std::make_shared<MergeTreeIndexGranularityConstant>(fixed_granularity, fixed_granularity, marks_count, false);
}
else
{
@ -145,6 +156,7 @@ void MergeTreeDataPartWide::loadIndexGranularityImpl(
marks_reader = std::make_unique<CompressedReadBufferFromFile>(std::move(marks_file));
size_t marks_count = 0;
while (!marks_reader->eof())
{
MarkInCompressedFile mark;
@ -157,15 +169,20 @@ void MergeTreeDataPartWide::loadIndexGranularityImpl(
if (index_granularity_info_.mark_type.adaptive)
{
readBinaryLittleEndian(granularity, *marks_reader);
index_granularity_.appendMark(granularity);
index_granularity_ptr->appendMark(granularity);
}
}
if (!index_granularity_info_.mark_type.adaptive)
index_granularity_.resizeWithFixedGranularity(marks_count, index_granularity_info_.fixed_index_granularity); /// all the same
{
index_granularity_ptr = std::make_shared<MergeTreeIndexGranularityConstant>(fixed_granularity, fixed_granularity, marks_count, false);
}
else if (storage_settings[MergeTreeSetting::enable_index_granularity_compression])
{
if (auto new_granularity_ptr = index_granularity_ptr->optimize())
index_granularity_ptr = std::move(new_granularity_ptr);
}
}
index_granularity_.setInitialized();
}
void MergeTreeDataPartWide::loadIndexGranularity()
@ -179,7 +196,7 @@ void MergeTreeDataPartWide::loadIndexGranularity()
"There are no files for column {} in part {}",
columns.front().name, getDataPartStorage().getFullPath());
loadIndexGranularityImpl(index_granularity, index_granularity_info, getDataPartStorage(), *any_column_filename);
loadIndexGranularityImpl(index_granularity, index_granularity_info, getDataPartStorage(), *any_column_filename, *storage.getSettings());
}
void MergeTreeDataPartWide::loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const
@ -209,7 +226,7 @@ void MergeTreeDataPartWide::loadMarksToCache(const Names & column_names, MarkCac
info_for_read,
mark_cache,
index_granularity_info.getMarksFilePath(*stream_name),
index_granularity.getMarksCount(),
index_granularity->getMarksCount(),
index_granularity_info,
/*save_marks_in_cache=*/ true,
read_settings,

View File

@ -55,8 +55,11 @@ public:
protected:
static void loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, MergeTreeIndexGranularityInfo & index_granularity_info_,
const IDataPartStorage & data_part_storage_, const std::string & any_column_file_name);
MergeTreeIndexGranularityPtr & index_granularity_ptr,
MergeTreeIndexGranularityInfo & index_granularity_info_,
const IDataPartStorage & data_part_storage_,
const std::string & any_column_file_name,
const MergeTreeSettings & storage_settings);
void doCheckConsistency(bool require_part_metadata) const override;

View File

@ -25,13 +25,13 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
MergeTreeIndexGranularityPtr index_granularity_)
: MergeTreeDataPartWriterOnDisk(
data_part_name_, logger_name_, serializations_,
data_part_storage_, index_granularity_info_, storage_settings_,
columns_list_, metadata_snapshot_, virtual_columns_,
indices_to_recalc_, stats_to_recalc, marks_file_extension_,
default_codec_, settings_, index_granularity_)
default_codec_, settings_, std::move(index_granularity_))
, plain_file(getDataPartStorage().writeFile(
MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION,
settings.max_compress_block_size,
@ -189,13 +189,13 @@ void MergeTreeDataPartWriterCompact::write(const Block & block, const IColumn::P
header = result_block.cloneEmpty();
columns_buffer.add(result_block.mutateColumns());
size_t current_mark_rows = index_granularity.getMarkRows(getCurrentMark());
size_t current_mark_rows = index_granularity->getMarkRows(getCurrentMark());
size_t rows_in_buffer = columns_buffer.size();
if (rows_in_buffer >= current_mark_rows)
{
Block flushed_block = header.cloneWithColumns(columns_buffer.releaseColumns());
auto granules_to_write = getGranulesToWrite(index_granularity, flushed_block.rows(), getCurrentMark(), /* last_block = */ false);
auto granules_to_write = getGranulesToWrite(*index_granularity, flushed_block.rows(), getCurrentMark(), /* last_block = */ false);
writeDataBlockPrimaryIndexAndSkipIndices(flushed_block, granules_to_write);
setCurrentMark(getCurrentMark() + granules_to_write.size());
calculateAndSerializeStatistics(flushed_block);
@ -274,12 +274,11 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(MergeTreeDataPartChecksum
if (columns_buffer.size() != 0)
{
auto block = header.cloneWithColumns(columns_buffer.releaseColumns());
auto granules_to_write = getGranulesToWrite(index_granularity, block.rows(), getCurrentMark(), /* last_block = */ true);
auto granules_to_write = getGranulesToWrite(*index_granularity, block.rows(), getCurrentMark(), /*last_block=*/ true);
if (!granules_to_write.back().is_complete)
{
/// Correct last mark as it should contain exact amount of rows.
index_granularity.popMark();
index_granularity.appendMark(granules_to_write.back().rows_to_write);
index_granularity->adjustLastMark(granules_to_write.back().rows_to_write);
}
writeDataBlockPrimaryIndexAndSkipIndices(block, granules_to_write);
}
@ -375,11 +374,11 @@ static void fillIndexGranularityImpl(
void MergeTreeDataPartWriterCompact::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block)
{
size_t index_offset = 0;
if (index_granularity.getMarksCount() > getCurrentMark())
index_offset = index_granularity.getMarkRows(getCurrentMark()) - columns_buffer.size();
if (index_granularity->getMarksCount() > getCurrentMark())
index_offset = index_granularity->getMarkRows(getCurrentMark()) - columns_buffer.size();
fillIndexGranularityImpl(
index_granularity,
*index_granularity,
index_offset,
index_granularity_for_block,
rows_in_block);

View File

@ -25,7 +25,7 @@ public:
const String & marks_file_extension,
const CompressionCodecPtr & default_codec,
const MergeTreeWriterSettings & settings,
const MergeTreeIndexGranularity & index_granularity);
MergeTreeIndexGranularityPtr index_granularity_);
void write(const Block & block, const IColumn::Permutation * permutation) override;

View File

@ -162,20 +162,20 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
MergeTreeIndexGranularityPtr index_granularity_)
: IMergeTreeDataPartWriter(
data_part_name_, serializations_, data_part_storage_, index_granularity_info_,
storage_settings_, columns_list_, metadata_snapshot_, virtual_columns_, settings_, index_granularity_)
storage_settings_, columns_list_, metadata_snapshot_, virtual_columns_, settings_, std::move(index_granularity_))
, skip_indices(indices_to_recalc_)
, stats(stats_to_recalc_)
, marks_file_extension(marks_file_extension_)
, default_codec(default_codec_)
, compute_granularity(index_granularity.empty())
, compute_granularity(index_granularity->empty())
, compress_primary_key(settings.compress_primary_key)
, execution_stats(skip_indices.size(), stats.size())
, log(getLogger(logger_name_ + " (DataPartWriter)"))
{
if (settings.blocks_are_granules_size && !index_granularity.empty())
if (settings.blocks_are_granules_size && !index_granularity->empty())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Can't take information about index granularity from blocks, when non empty index_granularity array specified");
@ -189,59 +189,11 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
initStatistics();
}
/// Implementation is split into static functions for ability
/// of making unit tests without creation instance of IMergeTreeDataPartWriter,
/// which requires a lot of dependencies and access to filesystem.
///
/// Chooses how many rows one granule should hold for `block`.
///  - Without adaptive granularity the fixed row count is used as is.
///  - With `blocks_are_granules` the whole block is one granule.
///  - Otherwise the row count is derived from the average row size so that a granule
///    takes about `index_granularity_bytes`, capped by `fixed_index_granularity_rows`.
/// The result is never zero.
static size_t computeIndexGranularityImpl(
    const Block & block,
    size_t index_granularity_bytes,
    size_t fixed_index_granularity_rows,
    bool blocks_are_granules,
    bool can_use_adaptive_index_granularity)
{
    size_t rows_in_block = block.rows();
    size_t index_granularity_for_block;

    if (!can_use_adaptive_index_granularity)
    {
        index_granularity_for_block = fixed_index_granularity_rows;
    }
    else if (blocks_are_granules)
    {
        index_granularity_for_block = rows_in_block;
    }
    else if (rows_in_block == 0 || index_granularity_bytes == 0)
    {
        /// There is no row size to estimate from (empty block) or no byte budget to divide by.
        /// Both cases would divide by zero below, so fall back to the fixed granularity.
        index_granularity_for_block = fixed_index_granularity_rows;
    }
    else
    {
        size_t block_size_in_memory = block.bytes();
        if (block_size_in_memory >= index_granularity_bytes)
        {
            size_t granules_in_block = block_size_in_memory / index_granularity_bytes;
            index_granularity_for_block = rows_in_block / granules_in_block;
        }
        else
        {
            size_t size_of_row_in_bytes = std::max<size_t>(block_size_in_memory / rows_in_block, 1);
            index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes;
        }
    }

    /// We should be less or equal than fixed index granularity.
    /// But if block size is a granule size then do not adjust it.
    /// Granularity greater than fixed granularity might come from compact part.
    if (!blocks_are_granules)
        index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);

    /// Very rare case when index granularity bytes less than single row.
    if (index_granularity_for_block == 0)
        index_granularity_for_block = 1;

    return index_granularity_for_block;
}
size_t MergeTreeDataPartWriterOnDisk::computeIndexGranularity(const Block & block) const
{
return computeIndexGranularityImpl(
block,
return DB::computeIndexGranularity(
block.rows(),
block.bytes(),
(*storage_settings)[MergeTreeSetting::index_granularity_bytes],
(*storage_settings)[MergeTreeSetting::index_granularity],
settings.blocks_are_granules_size,
@ -433,7 +385,7 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
{
bool write_final_mark = (with_final_mark && data_written);
if (write_final_mark && compute_granularity)
index_granularity.appendMark(0);
index_granularity->appendMark(0);
if (index_file_hashing_stream)
{

View File

@ -116,7 +116,7 @@ public:
const String & marks_file_extension,
const CompressionCodecPtr & default_codec,
const MergeTreeWriterSettings & settings,
const MergeTreeIndexGranularity & index_granularity);
MergeTreeIndexGranularityPtr index_granularity_);
void setWrittenOffsetColumns(WrittenOffsetColumns * written_offset_columns_)
{

View File

@ -99,13 +99,13 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
MergeTreeIndexGranularityPtr index_granularity_)
: MergeTreeDataPartWriterOnDisk(
data_part_name_, logger_name_, serializations_,
data_part_storage_, index_granularity_info_, storage_settings_,
columns_list_, metadata_snapshot_, virtual_columns_,
indices_to_recalc_, stats_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_)
default_codec_, settings_, std::move(index_granularity_))
{
if (settings.save_marks_in_cache)
{
@ -238,8 +238,8 @@ void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_wri
if (settings.can_use_adaptive_granularity && settings.blocks_are_granules_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incomplete granules are not allowed while blocks are granules size. "
"Mark number {} (rows {}), rows written in last mark {}, rows to write in last mark from block {} (from row {}), "
"total marks currently {}", last_granule.mark_number, index_granularity.getMarkRows(last_granule.mark_number),
rows_written_in_last_mark, last_granule.rows_to_write, last_granule.start_row, index_granularity.getMarksCount());
"total marks currently {}", last_granule.mark_number, index_granularity->getMarkRows(last_granule.mark_number),
rows_written_in_last_mark, last_granule.rows_to_write, last_granule.start_row, index_granularity->getMarksCount());
/// Shift forward except last granule
setCurrentMark(getCurrentMark() + granules_written.size() - 1);
@ -273,10 +273,15 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm
/// but not in case of vertical part of vertical merge)
if (compute_granularity)
{
size_t index_granularity_for_block = computeIndexGranularity(block_to_write);
size_t index_granularity_for_block;
if (auto constant_granularity = index_granularity->getConstantGranularity())
index_granularity_for_block = *constant_granularity;
else
index_granularity_for_block = computeIndexGranularity(block_to_write);
if (rows_written_in_last_mark > 0)
{
size_t rows_left_in_last_mark = index_granularity.getMarkRows(getCurrentMark()) - rows_written_in_last_mark;
size_t rows_left_in_last_mark = index_granularity->getMarkRows(getCurrentMark()) - rows_written_in_last_mark;
/// Previous granularity was much bigger than our new block's
/// granularity let's adjust it, because we want add new
/// heavy-weight blocks into small old granule.
@ -294,7 +299,7 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm
fillIndexGranularity(index_granularity_for_block, block_to_write.rows());
}
auto granules_to_write = getGranulesToWrite(index_granularity, block_to_write.rows(), getCurrentMark(), rows_written_in_last_mark);
auto granules_to_write = getGranulesToWrite(*index_granularity, block_to_write.rows(), getCurrentMark(), rows_written_in_last_mark);
auto offset_columns = written_offset_columns ? *written_offset_columns : WrittenOffsetColumns{};
Block primary_key_block;
@ -482,7 +487,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
throw Exception(ErrorCodes::LOGICAL_ERROR,
"We have to add new mark for column, but already have non written mark. "
"Current mark {}, total marks {}, offset {}",
getCurrentMark(), index_granularity.getMarksCount(), rows_written_in_last_mark);
getCurrentMark(), index_granularity->getMarksCount(), rows_written_in_last_mark);
last_non_written_marks[name] = getCurrentMarksForColumn(name_and_type, column.getPtr(), offset_columns);
}
@ -502,7 +507,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
throw Exception(ErrorCodes::LOGICAL_ERROR, "No mark was saved for incomplete granule for column {}", backQuoteIfNeed(name));
for (const auto & mark : marks_it->second)
flushMarkToFile(mark, index_granularity.getMarkRows(granule.mark_number));
flushMarkToFile(mark, index_granularity->getMarkRows(granule.mark_number));
last_non_written_marks.erase(marks_it);
}
}
@ -549,10 +554,10 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
for (mark_num = 0; !mrk_in->eof(); ++mark_num)
{
if (mark_num > index_granularity.getMarksCount())
if (mark_num > index_granularity->getMarksCount())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Incorrect number of marks in memory {}, on disk (at least) {}",
index_granularity.getMarksCount(), mark_num + 1);
index_granularity->getMarksCount(), mark_num + 1);
readBinaryLittleEndian(offset_in_compressed_file, *mrk_in);
readBinaryLittleEndian(offset_in_decompressed_block, *mrk_in);
@ -583,10 +588,10 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Still have {} rows in bin stream, last mark #{}"
" index granularity size {}, last rows {}",
column->size(), mark_num, index_granularity.getMarksCount(), index_granularity_rows);
column->size(), mark_num, index_granularity->getMarksCount(), index_granularity_rows);
}
if (index_granularity_rows != index_granularity.getMarkRows(mark_num))
if (index_granularity_rows != index_granularity->getMarkRows(mark_num))
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@ -594,8 +599,8 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
" (compressed offset {}, decompressed offset {}), in-memory {}, on disk {}, total marks {}",
getDataPartStorage().getFullPath(),
mark_num, offset_in_compressed_file, offset_in_decompressed_block,
index_granularity.getMarkRows(mark_num), index_granularity_rows,
index_granularity.getMarksCount());
index_granularity->getMarkRows(mark_num), index_granularity_rows,
index_granularity->getMarksCount());
}
auto column = type->createColumn();
@ -630,7 +635,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for mark #{} (compressed offset {}, decompressed offset {}), "
"actually in bin file {}, in mrk file {}, total marks {}",
mark_num, offset_in_compressed_file, offset_in_decompressed_block, column->size(),
index_granularity.getMarkRows(mark_num), index_granularity.getMarksCount());
index_granularity->getMarkRows(mark_num), index_granularity->getMarksCount());
}
}
@ -638,7 +643,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Still have something in marks stream, last mark #{}"
" index granularity size {}, last rows {}",
mark_num, index_granularity.getMarksCount(), index_granularity_rows);
mark_num, index_granularity->getMarksCount(), index_granularity_rows);
if (!bin_in.eof())
{
auto column = type->createColumn();
@ -648,7 +653,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Still have {} rows in bin stream, last mark #{}"
" index granularity size {}, last rows {}",
column->size(), mark_num, index_granularity.getMarksCount(), index_granularity_rows);
column->size(), mark_num, index_granularity->getMarksCount(), index_granularity_rows);
}
}
@ -665,8 +670,8 @@ void MergeTreeDataPartWriterWide::fillDataChecksums(MergeTreeDataPartChecksums &
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Incomplete granule is not allowed while blocks are granules size even for last granule. "
"Mark number {} (rows {}), rows written for last mark {}, total marks {}",
getCurrentMark(), index_granularity.getMarkRows(getCurrentMark()),
rows_written_in_last_mark, index_granularity.getMarksCount());
getCurrentMark(), index_granularity->getMarkRows(getCurrentMark()),
rows_written_in_last_mark, index_granularity->getMarksCount());
adjustLastMarkIfNeedAndFlushToDisk(rows_written_in_last_mark);
}
@ -785,16 +790,16 @@ static void fillIndexGranularityImpl(
void MergeTreeDataPartWriterWide::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block)
{
if (getCurrentMark() < index_granularity.getMarksCount() && getCurrentMark() != index_granularity.getMarksCount() - 1)
if (getCurrentMark() < index_granularity->getMarksCount() && getCurrentMark() != index_granularity->getMarksCount() - 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to add marks, while current mark {}, but total marks {}",
getCurrentMark(), index_granularity.getMarksCount());
getCurrentMark(), index_granularity->getMarksCount());
size_t index_offset = 0;
if (rows_written_in_last_mark != 0)
index_offset = index_granularity.getLastMarkRows() - rows_written_in_last_mark;
index_offset = index_granularity->getLastMarkRows() - rows_written_in_last_mark;
fillIndexGranularityImpl(
index_granularity,
*index_granularity,
index_offset,
index_granularity_for_block,
rows_in_block);
@ -813,27 +818,26 @@ void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk(size_t new_
/// other columns
if (compute_granularity && settings.can_use_adaptive_granularity)
{
if (getCurrentMark() != index_granularity.getMarksCount() - 1)
if (getCurrentMark() != index_granularity->getMarksCount() - 1)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Non last mark {} (with {} rows) having rows offset {}, total marks {}",
getCurrentMark(), index_granularity.getMarkRows(getCurrentMark()),
rows_written_in_last_mark, index_granularity.getMarksCount());
getCurrentMark(), index_granularity->getMarkRows(getCurrentMark()),
rows_written_in_last_mark, index_granularity->getMarksCount());
index_granularity.popMark();
index_granularity.appendMark(new_rows_in_last_mark);
index_granularity->adjustLastMark(new_rows_in_last_mark);
}
/// Last mark should be filled, otherwise it's a bug
if (last_non_written_marks.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No saved marks for last mark {} having rows offset {}, total marks {}",
getCurrentMark(), rows_written_in_last_mark, index_granularity.getMarksCount());
getCurrentMark(), rows_written_in_last_mark, index_granularity->getMarksCount());
if (rows_written_in_last_mark == new_rows_in_last_mark)
{
for (const auto & [name, marks] : last_non_written_marks)
{
for (const auto & mark : marks)
flushMarkToFile(mark, index_granularity.getMarkRows(getCurrentMark()));
flushMarkToFile(mark, index_granularity->getMarkRows(getCurrentMark()));
}
last_non_written_marks.clear();

View File

@ -35,7 +35,7 @@ public:
const String & marks_file_extension,
const CompressionCodecPtr & default_codec,
const MergeTreeWriterSettings & settings,
const MergeTreeIndexGranularity & index_granularity);
MergeTreeIndexGranularityPtr index_granularity_);
void write(const Block & block, const IColumn::Permutation * permutation) override;

View File

@ -129,7 +129,7 @@ size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
{
MarkRanges part_ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, {}, &exact_ranges, settings, log);
for (const auto & range : part_ranges)
rows_count += part->index_granularity.getRowsCountInRange(range);
rows_count += part->index_granularity->getRowsCountInRange(range);
}
UNUSED(exact_ranges);
@ -688,7 +688,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
auto & part = parts[part_index];
RangesInDataPart ranges(part, part_index);
size_t total_marks_count = part->index_granularity.getMarksCountWithoutFinal();
size_t total_marks_count = part->index_granularity->getMarksCountWithoutFinal();
if (metadata_snapshot->hasPrimaryKey() || part_offset_condition)
{
@ -1044,11 +1044,11 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
{
MarkRanges res;
size_t marks_count = part->index_granularity.getMarksCount();
size_t marks_count = part->index_granularity->getMarksCount();
if (marks_count == 0)
return res;
bool has_final_mark = part->index_granularity.hasFinalMark();
bool has_final_mark = part->index_granularity->hasFinalMark();
bool key_condition_useful = !key_condition.alwaysUnknownOrTrue();
bool part_offset_condition_useful = part_offset_condition && !part_offset_condition->alwaysUnknownOrTrue();
@ -1160,16 +1160,16 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
auto check_part_offset_condition = [&]()
{
auto begin = part->index_granularity.getMarkStartingRow(range.begin);
auto end = part->index_granularity.getMarkStartingRow(range.end) - 1;
auto begin = part->index_granularity->getMarkStartingRow(range.begin);
auto end = part->index_granularity->getMarkStartingRow(range.end) - 1;
if (begin > end)
{
/// Empty mark (final mark)
return BoolMask(false, true);
}
part_offset_left[0] = part->index_granularity.getMarkStartingRow(range.begin);
part_offset_right[0] = part->index_granularity.getMarkStartingRow(range.end) - 1;
part_offset_left[0] = part->index_granularity->getMarkStartingRow(range.begin);
part_offset_right[0] = part->index_granularity->getMarkStartingRow(range.end) - 1;
part_offset_left[1] = part->name;
part_offset_right[1] = part->name;
@ -1381,9 +1381,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
part->index_granularity_info.fixed_index_granularity,
part->index_granularity_info.index_granularity_bytes);
size_t marks_count = part->getMarksCount();
size_t final_mark = part->index_granularity.hasFinalMark();
size_t index_marks_count = (marks_count - final_mark + index_granularity - 1) / index_granularity;
size_t marks_count = part->index_granularity->getMarksCountWithoutFinal();
size_t index_marks_count = (marks_count + index_granularity - 1) / index_granularity;
MarkRanges index_ranges;
for (const auto & range : ranges)
@ -1431,8 +1430,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
for (auto row : rows)
{
const MergeTreeIndexGranularity & merge_tree_index_granularity = part->index_granularity;
size_t num_marks = merge_tree_index_granularity.countMarksForRows(index_mark * index_granularity, row);
size_t num_marks = part->index_granularity->countMarksForRows(index_mark * index_granularity, row);
MarkRange data_range(
std::max(ranges[i].begin, (index_mark * index_granularity) + num_marks),
@ -1505,9 +1503,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingMergedIndex(
part->index_granularity_info.fixed_index_granularity,
part->index_granularity_info.index_granularity_bytes);
size_t marks_count = part->getMarksCount();
size_t final_mark = part->index_granularity.hasFinalMark();
size_t index_marks_count = (marks_count - final_mark + index_granularity - 1) / index_granularity;
size_t marks_count = part->index_granularity->getMarksCountWithoutFinal();
size_t index_marks_count = (marks_count + index_granularity - 1) / index_granularity;
std::vector<std::unique_ptr<MergeTreeIndexReader>> readers;
for (const auto & index_helper : indices)
@ -1607,9 +1604,7 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
continue;
}
size_t num_granules = part->getMarksCount();
if (num_granules && part->index_granularity.hasFinalMark())
--num_granules;
size_t num_granules = part->index_granularity->getMarksCountWithoutFinal();
counters.num_initial_selected_parts += 1;
counters.num_initial_selected_granules += num_granules;
@ -1676,9 +1671,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
if (part->uuid != UUIDHelpers::Nil && ignored_part_uuids->has(part->uuid))
continue;
size_t num_granules = part->getMarksCount();
if (num_granules && part->index_granularity.hasFinalMark())
--num_granules;
size_t num_granules = part->index_granularity->getMarksCountWithoutFinal();
counters.num_initial_selected_parts += 1;
counters.num_initial_selected_granules += num_granules;

View File

@ -687,6 +687,13 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
bool save_marks_in_cache = (*data_settings)[MergeTreeSetting::prewarm_mark_cache] && data.getContext()->getMarkCache();
auto index_granularity_ptr = createMergeTreeIndexGranularity(
block.rows(),
block.bytes(),
*data.getSettings(),
new_data_part->index_granularity_info,
/*blocks_are_granules=*/ false);
auto out = std::make_unique<MergedBlockOutputStream>(
new_data_part,
metadata_snapshot,
@ -694,6 +701,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
indices,
statistics,
compression_codec,
std::move(index_granularity_ptr),
context->getCurrentTransaction() ? context->getCurrentTransaction()->tid : Tx::PrehistoricTID,
/*reset_columns=*/ false,
save_marks_in_cache,
@ -834,6 +842,13 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
bool save_marks_in_cache = (*data.getSettings())[MergeTreeSetting::prewarm_mark_cache] && data.getContext()->getMarkCache();
auto index_granularity_ptr = createMergeTreeIndexGranularity(
block.rows(),
block.bytes(),
*data.getSettings(),
new_data_part->index_granularity_info,
/*blocks_are_granules=*/ false);
auto out = std::make_unique<MergedBlockOutputStream>(
new_data_part,
metadata_snapshot,
@ -842,6 +857,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
/// TODO(hanfei): It should be helpful to write statistics for projection result.
ColumnsStatistics{},
compression_codec,
std::move(index_granularity_ptr),
Tx::PrehistoricTID,
/*reset_columns=*/ false,
save_marks_in_cache,

View File

@ -1,77 +1,23 @@
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Common/Exception.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityAdaptive.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityConstant.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
MergeTreeIndexGranularity::MergeTreeIndexGranularity(const std::vector<size_t> & marks_rows_partial_sums_)
: marks_rows_partial_sums(marks_rows_partial_sums_)
namespace MergeTreeSetting
{
}
/// Rows after mark to next mark
size_t MergeTreeIndexGranularity::getMarkRows(size_t mark_index) const
{
if (mark_index >= getMarksCount())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get non existing mark {}, while size is {}", mark_index, getMarksCount());
if (mark_index == 0)
return marks_rows_partial_sums[0];
return marks_rows_partial_sums[mark_index] - marks_rows_partial_sums[mark_index - 1];
}
/// Number of rows that precede mark `mark_index` (zero for the first mark).
/// No bounds check is performed on `mark_index`.
size_t MergeTreeIndexGranularity::getMarkStartingRow(size_t mark_index) const
{
    /// The cumulative total stored for the previous mark is exactly the starting row of this one.
    return mark_index == 0 ? 0 : marks_rows_partial_sums[mark_index - 1];
}
/// Total number of marks: one cumulative total is stored per mark.
size_t MergeTreeIndexGranularity::getMarksCount() const
{
    const size_t marks_count = marks_rows_partial_sums.size();
    return marks_count;
}
/// Total number of rows covered by all marks; zero when there are no marks.
size_t MergeTreeIndexGranularity::getTotalRows() const
{
    /// The last cumulative total is the sum over every mark.
    return marks_rows_partial_sums.empty() ? 0 : marks_rows_partial_sums.back();
}
/// Adds a new mark holding `rows_count` rows after the existing ones.
void MergeTreeIndexGranularity::appendMark(size_t rows_count)
{
    /// New cumulative total = rows accumulated so far + rows of the new mark.
    const size_t rows_so_far = marks_rows_partial_sums.empty() ? 0 : marks_rows_partial_sums.back();
    marks_rows_partial_sums.push_back(rows_so_far + rows_count);
}
/// Grows the last mark by `rows_count` rows; creates the first mark if there is none yet.
void MergeTreeIndexGranularity::addRowsToLastMark(size_t rows_count)
{
    if (marks_rows_partial_sums.empty())
    {
        marks_rows_partial_sums.push_back(rows_count);
        return;
    }

    marks_rows_partial_sums.back() += rows_count;
}
/// Removes the last mark; does nothing when there are no marks.
void MergeTreeIndexGranularity::popMark()
{
    if (marks_rows_partial_sums.empty())
        return;

    marks_rows_partial_sums.pop_back();
}
size_t MergeTreeIndexGranularity::getRowsCountInRange(size_t begin, size_t end) const
{
size_t subtrahend = 0;
if (begin != 0)
subtrahend = marks_rows_partial_sums[begin - 1];
return marks_rows_partial_sums[end - 1] - subtrahend;
extern const MergeTreeSettingsUInt64 index_granularity;
extern const MergeTreeSettingsUInt64 index_granularity_bytes;
extern const MergeTreeSettingsBool use_const_adaptive_granularity;
}
size_t MergeTreeIndexGranularity::getRowsCountInRange(const MarkRange & range) const
@ -87,55 +33,118 @@ size_t MergeTreeIndexGranularity::getRowsCountInRanges(const MarkRanges & ranges
return total;
}
size_t MergeTreeIndexGranularity::countMarksForRows(size_t from_mark, size_t number_of_rows) const
size_t MergeTreeIndexGranularity::getMarksCountWithoutFinal() const
{
size_t rows_before_mark = getMarkStartingRow(from_mark);
size_t last_row_pos = rows_before_mark + number_of_rows;
auto it = std::upper_bound(marks_rows_partial_sums.begin(), marks_rows_partial_sums.end(), last_row_pos);
size_t to_mark = it - marks_rows_partial_sums.begin();
return to_mark - from_mark;
size_t total = getMarksCount();
if (total == 0)
return total;
return total - hasFinalMark();
}
size_t MergeTreeIndexGranularity::countRowsForRows(size_t from_mark, size_t number_of_rows, size_t offset_in_rows) const
/// Index of the first row of mark `mark_index`, computed as the number of rows
/// in the marks range [0, mark_index).
size_t MergeTreeIndexGranularity::getMarkStartingRow(size_t mark_index) const
{
    const size_t first_mark = 0;
    return getRowsCountInRange(first_mark, mark_index);
}
void MergeTreeIndexGranularity::resizeWithFixedGranularity(size_t size, size_t fixed_granularity)
/// Number of rows in the last mark.
/// NOTE(review): with no marks the index below wraps around (unsigned subtraction);
/// how getMarkRows() reacts to that is up to the implementation - confirm callers never hit it.
size_t MergeTreeIndexGranularity::getLastMarkRows() const
{
    const size_t last_mark_index = getMarksCount() - 1;
    return getMarkRows(last_mark_index);
}
size_t prev = 0;
for (size_t i = 0; i < size; ++i)
size_t MergeTreeIndexGranularity::getLastNonFinalMarkRows() const
{
size_t last_mark_rows = getMarkRows(getMarksCount() - 1);
if (last_mark_rows != 0)
return last_mark_rows;
return getMarkRows(getMarksCount() - 2);
}
/// Grows the last mark by `rows_count` rows, or appends the first mark if there are none.
/// Throws LOGICAL_ERROR when the granularity already has a final mark.
void MergeTreeIndexGranularity::addRowsToLastMark(size_t rows_count)
{
    if (hasFinalMark())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add rows to final mark");

    if (empty())
    {
        appendMark(rows_count);
        return;
    }

    adjustLastMark(getLastMarkRows() + rows_count);
}
std::string MergeTreeIndexGranularity::describe() const
/// Chooses how many rows one granule should hold for a block of `rows` rows
/// occupying `bytes_uncompressed` bytes.
///  - Without adaptive granularity `fixed_index_granularity_rows` is used as is.
///  - With `blocks_are_granules` the whole block is one granule (not capped by the fixed value).
///  - Otherwise the row count is derived from the average row size so that a granule
///    takes about `index_granularity_bytes`, capped by `fixed_index_granularity_rows`.
/// An empty block or a zero byte budget gives nothing to divide by, so the fixed granularity
/// is used for them. The result is never zero.
size_t computeIndexGranularity(
    size_t rows,
    size_t bytes_uncompressed,
    size_t index_granularity_bytes,
    size_t fixed_index_granularity_rows,
    bool blocks_are_granules,
    bool can_use_adaptive_index_granularity)
{
    size_t index_granularity_for_block;

    if (!can_use_adaptive_index_granularity)
    {
        index_granularity_for_block = fixed_index_granularity_rows;
    }
    else if (blocks_are_granules)
    {
        index_granularity_for_block = rows;
    }
    else if (rows == 0 || index_granularity_bytes == 0)
    {
        /// Both divisions below would be by zero; there is no row size to estimate from.
        index_granularity_for_block = fixed_index_granularity_rows;
    }
    else if (bytes_uncompressed >= index_granularity_bytes)
    {
        size_t granules_in_block = bytes_uncompressed / index_granularity_bytes;
        index_granularity_for_block = rows / granules_in_block;
    }
    else
    {
        /// Explicit template argument keeps this portable where size_t is not unsigned long.
        size_t size_of_row_in_bytes = std::max<size_t>(bytes_uncompressed / rows, 1);
        index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes;
    }

    /// We should be less or equal than fixed index granularity.
    /// But if block size is a granule size then do not adjust it.
    /// Granularity greater than fixed granularity might come from compact part.
    if (!blocks_are_granules)
        index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);

    /// Very rare case when index granularity bytes less than single row.
    if (index_granularity_for_block == 0)
        index_granularity_for_block = 1;

    return index_granularity_for_block;
}
void MergeTreeIndexGranularity::shrinkToFitInMemory()
/// Creates the index granularity object for a part that is about to be written.
/// Returns an (initially empty) adaptive granularity when the part is empty, blocks are granules,
/// the part is compact, or adaptive marks are used without `use_const_adaptive_granularity`.
/// Otherwise returns a constant granularity computed from `rows` and `bytes_uncompressed`.
MergeTreeIndexGranularityPtr createMergeTreeIndexGranularity(
    size_t rows,
    size_t bytes_uncompressed,
    const MergeTreeSettings & settings,
    const MergeTreeIndexGranularityInfo & info,
    bool blocks_are_granules)
{
    const bool adaptive_marks = info.mark_type.adaptive;
    const bool const_adaptive_enabled = settings[MergeTreeSetting::use_const_adaptive_granularity];
    const bool compact_part = info.mark_type.part_type == MergeTreeDataPartType::Compact;

    /// Compact parts cannot work without adaptive granularity.
    /// If part is empty create adaptive granularity because constant granularity doesn't support this corner case.
    const bool constant_not_applicable = rows == 0 || blocks_are_granules || compact_part;
    const bool per_mark_adaptive_requested = adaptive_marks && !const_adaptive_enabled;

    if (constant_not_applicable || per_mark_adaptive_requested)
        return std::make_shared<MergeTreeIndexGranularityAdaptive>();

    return std::make_shared<MergeTreeIndexGranularityConstant>(computeIndexGranularity(
        rows,
        bytes_uncompressed,
        settings[MergeTreeSetting::index_granularity_bytes],
        settings[MergeTreeSetting::index_granularity],
        blocks_are_granules,
        adaptive_marks));
}
uint64_t MergeTreeIndexGranularity::getBytesSize() const
{
return marks_rows_partial_sums.size() * sizeof(size_t);
}
uint64_t MergeTreeIndexGranularity::getBytesAllocated() const
{
return marks_rows_partial_sums.capacity() * sizeof(size_t);
}
}

View File

@ -1,35 +1,28 @@
#pragma once
#include <vector>
#include <optional>
#include <Storages/MergeTree/MarkRange.h>
namespace DB
{
/// Class contains information about index granularity in rows of IMergeTreeDataPart
/// Inside it contains vector of partial sums of rows after mark:
/// |-----|---|----|----|
/// | 5 | 8 | 12 | 16 |
/// If user doesn't specify setting index_granularity_bytes for MergeTree* table
/// all values in inner vector would have constant stride (default 8192).
/// Class that contains information about index granularity in rows of IMergeTreeDataPart
class MergeTreeIndexGranularity
{
private:
std::vector<size_t> marks_rows_partial_sums;
bool initialized = false;
public:
MergeTreeIndexGranularity() = default;
explicit MergeTreeIndexGranularity(const std::vector<size_t> & marks_rows_partial_sums_);
virtual ~MergeTreeIndexGranularity() = default;
/// Returns granularity if it is constant for whole part (except last granule).
virtual std::optional<size_t> getConstantGranularity() const = 0;
/// Return count of rows between marks
virtual size_t getRowsCountInRange(size_t begin, size_t end) const = 0;
/// Return count of rows between marks
size_t getRowsCountInRange(const MarkRange & range) const;
/// Return count of rows between marks
size_t getRowsCountInRange(size_t begin, size_t end) const;
/// Return sum of rows between all ranges
size_t getRowsCountInRanges(const MarkRanges & ranges) const;
/// Return number of marks, starting from `from_mark`, that contain `number_of_rows`
size_t countMarksForRows(size_t from_mark, size_t number_of_rows) const;
virtual size_t countMarksForRows(size_t from_mark, size_t number_of_rows) const = 0;
/// Return number of rows, starting from `from_mark`, that contains amount of `number_of_rows`
/// and possible some offset_in_rows from `from_mark`
@ -37,74 +30,65 @@ public:
/// |-----|---------------------------|----|----|
/// ^------------------------^-----------^
//// from_mark offset_in_rows number_of_rows
size_t countRowsForRows(size_t from_mark, size_t number_of_rows, size_t offset_in_rows) const;
virtual size_t countRowsForRows(size_t from_mark, size_t number_of_rows, size_t offset_in_rows) const = 0;
/// Total marks
size_t getMarksCount() const;
virtual size_t getMarksCount() const = 0;
/// Total rows
size_t getTotalRows() const;
virtual size_t getTotalRows() const = 0;
/// Total number marks without final mark if it exists
size_t getMarksCountWithoutFinal() const { return getMarksCount() - hasFinalMark(); }
size_t getMarksCountWithoutFinal() const;
/// Rows after mark to next mark
size_t getMarkRows(size_t mark_index) const;
virtual size_t getMarkRows(size_t mark_index) const = 0;
/// Return amount of rows before mark
size_t getMarkStartingRow(size_t mark_index) const;
/// Amount of rows after last mark
size_t getLastMarkRows() const
{
size_t last = marks_rows_partial_sums.size() - 1;
return getMarkRows(last);
}
size_t getLastMarkRows() const;
size_t getLastNonFinalMarkRows() const
{
size_t last_mark_rows = getLastMarkRows();
if (last_mark_rows != 0)
return last_mark_rows;
return getMarkRows(marks_rows_partial_sums.size() - 2);
}
/// Amount of rows after last non-final mark
size_t getLastNonFinalMarkRows() const;
bool hasFinalMark() const
{
return getLastMarkRows() == 0;
}
virtual bool hasFinalMark() const = 0;
bool empty() const { return getMarksCount() == 0; }
bool empty() const
{
return marks_rows_partial_sums.empty();
}
/// Add new mark with rows_count.
virtual void appendMark(size_t rows_count) = 0;
bool isInitialized() const
{
return initialized;
}
void setInitialized()
{
initialized = true;
}
/// Add new mark with rows_count
void appendMark(size_t rows_count);
/// Extends last mark by rows_count.
/// Sets last mark equal to rows_count.
virtual void adjustLastMark(size_t rows_count) = 0;
void addRowsToLastMark(size_t rows_count);
/// Drops last mark if any exists.
void popMark();
virtual uint64_t getBytesSize() const = 0;
virtual uint64_t getBytesAllocated() const = 0;
/// Add `size` of marks with `fixed_granularity` rows
void resizeWithFixedGranularity(size_t size, size_t fixed_granularity);
std::string describe() const;
void shrinkToFitInMemory();
uint64_t getBytesSize() const;
uint64_t getBytesAllocated() const;
/// Possibly optimizes values in memory (for example, to constant value).
/// Returns new optimized index granularity structure or nullptr if optimization is not applicable.
virtual std::shared_ptr<MergeTreeIndexGranularity> optimize() = 0;
virtual std::string describe() const = 0;
};
using MergeTreeIndexGranularityPtr = std::shared_ptr<MergeTreeIndexGranularity>;
size_t computeIndexGranularity(
size_t rows,
size_t bytes_uncompressed,
size_t index_granularity_bytes,
size_t fixed_index_granularity_rows,
bool blocks_are_granules,
bool can_use_adaptive_index_granularity);
struct MergeTreeSettings;
struct MergeTreeIndexGranularityInfo;
MergeTreeIndexGranularityPtr createMergeTreeIndexGranularity(
size_t rows,
size_t bytes_uncompressed,
const MergeTreeSettings & settings,
const MergeTreeIndexGranularityInfo & info,
bool blocks_are_granules);
}

View File

@ -0,0 +1,152 @@
#include <Storages/MergeTree/MergeTreeIndexGranularityAdaptive.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityConstant.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
MergeTreeIndexGranularityAdaptive::MergeTreeIndexGranularityAdaptive(const std::vector<size_t> & marks_rows_partial_sums_)
: marks_rows_partial_sums(marks_rows_partial_sums_)
{
}
/// Number of rows that belong to mark `mark_index`, i.e. the difference
/// between its partial sum and the partial sum of the previous mark.
/// Throws LOGICAL_ERROR if the mark does not exist.
size_t MergeTreeIndexGranularityAdaptive::getMarkRows(size_t mark_index) const
{
    const size_t total_marks = getMarksCount();
    if (mark_index >= total_marks)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get non existing mark {}, while size is {}", mark_index, total_marks);

    /// The first mark has no predecessor, so nothing is subtracted for it.
    const size_t rows_before = (mark_index == 0) ? 0 : marks_rows_partial_sums[mark_index - 1];
    return marks_rows_partial_sums[mark_index] - rows_before;
}
/// The final mark is a trailing mark with zero rows; an empty granularity has none.
bool MergeTreeIndexGranularityAdaptive::hasFinalMark() const
{
    return !marks_rows_partial_sums.empty() && getLastMarkRows() == 0;
}
/// Total number of marks: one partial sum is stored per mark.
size_t MergeTreeIndexGranularityAdaptive::getMarksCount() const
{
    const size_t stored_marks = marks_rows_partial_sums.size();
    return stored_marks;
}
/// Total number of rows: the last partial sum, or zero when there are no marks.
size_t MergeTreeIndexGranularityAdaptive::getTotalRows() const
{
    return marks_rows_partial_sums.empty() ? 0 : marks_rows_partial_sums.back();
}
/// Adds a new mark with `rows_count` rows by pushing the next partial sum.
/// Throws LOGICAL_ERROR if the final (zero-row) mark has already been written.
void MergeTreeIndexGranularityAdaptive::appendMark(size_t rows_count)
{
    if (hasFinalMark())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot append mark after final");

    /// The very first mark starts from zero accumulated rows.
    const size_t rows_so_far = marks_rows_partial_sums.empty() ? 0 : marks_rows_partial_sums.back();
    marks_rows_partial_sums.push_back(rows_so_far + rows_count);
}
/// Makes the last mark contain exactly `rows_count` rows.
/// If there are no marks yet, a first mark with `rows_count` rows is created.
/// Throws LOGICAL_ERROR if the last mark is the final (zero-row) mark.
void MergeTreeIndexGranularityAdaptive::adjustLastMark(size_t rows_count)
{
    if (hasFinalMark())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot adjust final mark");

    /// Drop the current last mark (if any) and re-append it with the new number of rows.
    if (!marks_rows_partial_sums.empty())
        marks_rows_partial_sums.pop_back();

    appendMark(rows_count);
}
/// Number of rows in marks [begin, end), computed as a difference of two partial sums.
/// Throws LOGICAL_ERROR if `end` is greater than the number of marks.
size_t MergeTreeIndexGranularityAdaptive::getRowsCountInRange(size_t begin, size_t end) const
{
    const size_t total_marks = getMarksCount();
    if (end > total_marks)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get marks in range [{}; {}), while size is {}", begin, end, total_marks);

    if (end == 0)
        return 0;

    /// Rows accumulated before `begin`; nothing precedes the first mark.
    const size_t rows_before_begin = (begin == 0) ? 0 : marks_rows_partial_sums[begin - 1];
    return marks_rows_partial_sums[end - 1] - rows_before_begin;
}
/// Returns the distance in marks from `from_mark` to the first mark whose
/// partial sum is strictly greater than (rows before `from_mark` + `number_of_rows`).
size_t MergeTreeIndexGranularityAdaptive::countMarksForRows(size_t from_mark, size_t number_of_rows) const
{
    const size_t target_row = getMarkStartingRow(from_mark) + number_of_rows;

    const auto sums_begin = marks_rows_partial_sums.begin();
    const auto first_beyond = std::upper_bound(sums_begin, marks_rows_partial_sums.end(), target_row);

    const size_t to_mark = first_beyond - sums_begin;
    return to_mark - from_mark;
}
/// Returns the number of rows in marks [from_mark, to_mark) minus `offset_in_rows`,
/// where `to_mark` is the index of the first mark whose partial sum is strictly greater than
/// (rows before `from_mark` + `offset_in_rows` + `number_of_rows`); `to_mark` is taken as at least 1.
size_t MergeTreeIndexGranularityAdaptive::countRowsForRows(size_t from_mark, size_t number_of_rows, size_t offset_in_rows) const
{
    size_t rows_before_mark = getMarkStartingRow(from_mark);
    size_t last_row_pos = rows_before_mark + offset_in_rows + number_of_rows;

    auto it = std::upper_bound(marks_rows_partial_sums.begin(), marks_rows_partial_sums.end(), last_row_pos);
    size_t to_mark = it - marks_rows_partial_sums.begin();

    /// Explicit template argument instead of `1UL`: the literal has type `unsigned long`,
    /// which matches `size_t` only on some platforms and otherwise breaks argument deduction.
    return getRowsCountInRange(from_mark, std::max<size_t>(1, to_mark)) - offset_in_rows;
}
/// Size in bytes of the partial sums that are currently stored (used elements only).
uint64_t MergeTreeIndexGranularityAdaptive::getBytesSize() const
{
    const auto stored_sums = marks_rows_partial_sums.size();
    return sizeof(size_t) * stored_sums;
}
/// Size in bytes of the storage reserved for the partial sums (capacity, not size).
uint64_t MergeTreeIndexGranularityAdaptive::getBytesAllocated() const
{
    const auto reserved_sums = marks_rows_partial_sums.capacity();
    return sizeof(size_t) * reserved_sums;
}
/// Tries to replace this structure with a constant granularity one.
/// That is possible when all non-final marks except the last one have the same number of rows.
/// Returns the new structure, or nullptr if there are no non-final marks or the marks differ
/// (in the latter case the memory of the partial sums is shrunk to fit).
std::shared_ptr<MergeTreeIndexGranularity> MergeTreeIndexGranularityAdaptive::optimize()
{
    const size_t data_marks = getMarksCountWithoutFinal();
    if (data_marks == 0)
        return nullptr;

    const size_t expected_rows = getMarkRows(0);

    /// The last non-final mark is allowed to differ, so it is not checked here.
    size_t mark = 1;
    while (mark + 1 < data_marks)
    {
        if (getMarkRows(mark) != expected_rows)
        {
            /// We cannot optimize to constant but at least optimize memory usage.
            marks_rows_partial_sums.shrink_to_fit();
            return nullptr;
        }
        ++mark;
    }

    const size_t last_mark_rows = getMarkRows(data_marks - 1);
    return std::make_shared<MergeTreeIndexGranularityConstant>(expected_rows, last_mark_rows, data_marks, hasFinalMark());
}
/// Human-readable representation: the kind of granularity followed by
/// the comma-separated list of all stored partial sums.
std::string MergeTreeIndexGranularityAdaptive::describe() const
{
return fmt::format("Adaptive(marks_rows_partial_sums: [{}])", fmt::join(marks_rows_partial_sums, ", "));
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
namespace DB
{
/// Index granularity where every mark may have its own number of rows.
/// The rows are stored as a vector of partial sums of rows after each mark:
/// |-----|---|----|----|
/// |  5  | 8 | 12 | 16 |
class MergeTreeIndexGranularityAdaptive final : public MergeTreeIndexGranularity
{
private:
    /// marks_rows_partial_sums[i] is the total number of rows in marks [0, i].
    std::vector<size_t> marks_rows_partial_sums;

public:
    MergeTreeIndexGranularityAdaptive() = default;
    explicit MergeTreeIndexGranularityAdaptive(const std::vector<size_t> & marks_rows_partial_sums_);

    /// Granularity of this kind is never reported as constant.
    std::optional<size_t> getConstantGranularity() const override { return {}; }

    /// Information about marks and rows.
    size_t getMarksCount() const override;
    size_t getTotalRows() const override;
    size_t getMarkRows(size_t mark_index) const override;
    bool hasFinalMark() const override;

    /// Counting rows and marks in ranges.
    size_t getRowsCountInRange(size_t begin, size_t end) const override;
    size_t countMarksForRows(size_t from_mark, size_t number_of_rows) const override;
    size_t countRowsForRows(size_t from_mark, size_t number_of_rows, size_t offset_in_rows) const override;

    /// Modification of marks.
    void appendMark(size_t rows_count) override;
    void adjustLastMark(size_t rows_count) override;

    /// Memory usage of the stored partial sums.
    uint64_t getBytesSize() const override;
    uint64_t getBytesAllocated() const override;

    std::shared_ptr<MergeTreeIndexGranularity> optimize() override;
    std::string describe() const override;
};
}

View File

@ -0,0 +1,143 @@
#include <Storages/MergeTree/MergeTreeIndexGranularityConstant.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
MergeTreeIndexGranularityConstant::MergeTreeIndexGranularityConstant(size_t constant_granularity_)
: constant_granularity(constant_granularity_)
, last_mark_granularity(constant_granularity_)
{
}
MergeTreeIndexGranularityConstant::MergeTreeIndexGranularityConstant(size_t constant_granularity_, size_t last_mark_granularity_, size_t num_marks_without_final_, bool has_final_mark_)
: constant_granularity(constant_granularity_)
, last_mark_granularity(last_mark_granularity_)
, num_marks_without_final(num_marks_without_final_)
, has_final_mark(has_final_mark_)
{
}
/// Number of rows that belong to mark `mark_index`:
/// zero for the final mark, `last_mark_granularity` for the last non-final mark
/// and `constant_granularity` for all other marks.
/// Throws LOGICAL_ERROR if the mark does not exist.
size_t MergeTreeIndexGranularityConstant::getMarkRows(size_t mark_index) const
{
    const size_t total_marks = getMarksCount();
    if (mark_index >= total_marks)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get non existing mark {}, while size is {}", mark_index, total_marks);

    const size_t marks_up_to_index = mark_index + 1;

    /// Final mark.
    if (marks_up_to_index > num_marks_without_final)
        return 0;

    return marks_up_to_index == num_marks_without_final ? last_mark_granularity : constant_granularity;
}
/// Total number of marks: all non-final marks plus the final mark if it exists.
size_t MergeTreeIndexGranularityConstant::getMarksCount() const
{
    return has_final_mark ? num_marks_without_final + 1 : num_marks_without_final;
}
/// Total number of rows: all marks except the last non-final one have
/// `constant_granularity` rows, and the last one has `last_mark_granularity` rows.
size_t MergeTreeIndexGranularityConstant::getTotalRows() const
{
    if (num_marks_without_final == 0)
        return 0;

    const size_t regular_marks = num_marks_without_final - 1;
    return regular_marks * constant_granularity + last_mark_granularity;
}
/// Adds a new mark. A mark with zero rows is remembered as the final mark;
/// any other mark must have exactly `constant_granularity` rows.
/// Throws LOGICAL_ERROR if the final mark already exists or if `rows_count` has another value.
void MergeTreeIndexGranularityConstant::appendMark(size_t rows_count)
{
    if (has_final_mark)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot append mark after final");

    if (rows_count == 0)
    {
        has_final_mark = true;
        return;
    }

    if (rows_count != constant_granularity)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot append mark with {} rows. Granularity is constant ({})", rows_count, constant_granularity);

    ++num_marks_without_final;
}
/// Makes the last non-final mark contain exactly `rows_count` rows.
/// If there are no marks yet, the first mark is created.
/// Throws LOGICAL_ERROR if the final mark already exists.
void MergeTreeIndexGranularityConstant::adjustLastMark(size_t rows_count)
{
    if (has_final_mark)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot adjust final mark");

    if (num_marks_without_final == 0)
        num_marks_without_final = 1;

    last_mark_granularity = rows_count;
}
/// Number of rows in marks [begin, end).
/// Marks before the last non-final one contribute `constant_granularity` rows each,
/// the last non-final mark contributes `last_mark_granularity` rows and the final mark contributes nothing.
/// Throws LOGICAL_ERROR if `end` is greater than the number of marks.
size_t MergeTreeIndexGranularityConstant::getRowsCountInRange(size_t begin, size_t end) const
{
    if (end > getMarksCount())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get marks in range [{}; {}), while size is {}", begin, end, getMarksCount());

    /// Empty range (this also covers end == 0).
    if (begin >= end)
        return 0;

    /// The range starts at the final mark (or there are no non-final marks at all), so it has no rows.
    /// Without this check the unsigned subtractions below would wrap around.
    if (begin >= num_marks_without_final)
        return 0;

    size_t total_rows = 0;
    if (end >= num_marks_without_final)
    {
        /// The range covers the last non-final mark: count it separately
        /// and leave only marks with constant granularity in [begin, end).
        total_rows += last_mark_granularity;
        end = num_marks_without_final - 1;
    }

    /// Here begin <= end, because begin < num_marks_without_final.
    total_rows += constant_granularity * (end - begin);
    return total_rows;
}
/// Maps a row position to a mark index:
/// the total number of marks if the row is at or beyond the total number of rows,
/// the index of the last non-final mark if the row belongs to it,
/// otherwise the index of the mark with constant granularity that contains the row.
size_t MergeTreeIndexGranularityConstant::getMarkUpperBoundForRow(size_t row_index) const
{
    if (row_index >= getTotalRows())
        return getMarksCount();

    /// At least one non-final mark exists here, because the total number of rows is not zero.
    const size_t last_data_mark = num_marks_without_final - 1;
    const size_t rows_in_regular_marks = last_data_mark * constant_granularity;

    return row_index >= rows_in_regular_marks ? last_data_mark : row_index / constant_granularity;
}
/// Returns the distance in marks from `from_mark` to the mark that
/// getMarkUpperBoundForRow returns for (rows before `from_mark` + `number_of_rows`).
size_t MergeTreeIndexGranularityConstant::countMarksForRows(size_t from_mark, size_t number_of_rows) const
{
    const size_t target_row = getMarkStartingRow(from_mark) + number_of_rows;
    const size_t to_mark = getMarkUpperBoundForRow(target_row);
    return to_mark - from_mark;
}
/// Returns the number of rows in marks [from_mark, to_mark) minus `offset_in_rows`,
/// where `to_mark` is the mark that getMarkUpperBoundForRow returns for
/// (rows before `from_mark` + `offset_in_rows` + `number_of_rows`); `to_mark` is taken as at least 1.
size_t MergeTreeIndexGranularityConstant::countRowsForRows(size_t from_mark, size_t number_of_rows, size_t offset_in_rows) const
{
    size_t rows_before_mark = getMarkStartingRow(from_mark);
    size_t last_row_pos = rows_before_mark + offset_in_rows + number_of_rows;
    size_t to_mark = getMarkUpperBoundForRow(last_row_pos);

    /// Explicit template argument instead of `1UL`: the literal has type `unsigned long`,
    /// which matches `size_t` only on some platforms and otherwise breaks argument deduction.
    return getRowsCountInRange(from_mark, std::max<size_t>(1, to_mark)) - offset_in_rows;
}
/// Human-readable representation: the kind of granularity followed by the values of all four fields.
std::string MergeTreeIndexGranularityConstant::describe() const
{
return fmt::format(
"Constant(constant_granularity: {}, last_mark_granularity: {}, num_marks_without_final: {}, has_final_mark: {})",
constant_granularity, last_mark_granularity, num_marks_without_final, has_final_mark);
}
}

View File

@ -0,0 +1,47 @@
#pragma once
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
namespace DB
{
/// Class that stores constant index granularity for whole part, except
/// last non-zero granule and final granule which always has zero rows.
class MergeTreeIndexGranularityConstant final : public MergeTreeIndexGranularity
{
private:
    /// Number of rows in every non-final mark except the last one.
    /// Initialized here because the defaulted default constructor would otherwise
    /// leave it indeterminate, and getConstantGranularity() reads it.
    size_t constant_granularity = 0;

    /// Number of rows in the last non-final mark. Initialized for the same reason.
    size_t last_mark_granularity = 0;

    /// Number of marks, not counting the final (zero-row) mark.
    size_t num_marks_without_final = 0;

    /// Whether the final (zero-row) mark has been added.
    bool has_final_mark = false;

    /// Maps a row position to a mark index (the total number of marks if the row is beyond the last row).
    size_t getMarkUpperBoundForRow(size_t row_index) const;

public:
    MergeTreeIndexGranularityConstant() = default;

    /// Empty granularity where every mark is expected to have `constant_granularity_` rows.
    explicit MergeTreeIndexGranularityConstant(size_t constant_granularity_);

    /// Fully specified granularity.
    MergeTreeIndexGranularityConstant(size_t constant_granularity_, size_t last_mark_granularity_, size_t num_marks_without_final_, bool has_final_mark_);

    std::optional<size_t> getConstantGranularity() const override { return constant_granularity; }

    size_t getRowsCountInRange(size_t begin, size_t end) const override;
    size_t countMarksForRows(size_t from_mark, size_t number_of_rows) const override;
    size_t countRowsForRows(size_t from_mark, size_t number_of_rows, size_t offset_in_rows) const override;

    size_t getMarksCount() const override;
    size_t getTotalRows() const override;
    size_t getMarkRows(size_t mark_index) const override;
    bool hasFinalMark() const override { return has_final_mark; }

    /// Adds a mark: zero rows means the final mark, otherwise rows_count must be equal to constant_granularity.
    void appendMark(size_t rows_count) override;

    /// Sets the number of rows in the last non-final mark.
    void adjustLastMark(size_t rows_count) override;

    /// Only the fields are counted: three size_t values and one bool.
    uint64_t getBytesSize() const override { return sizeof(size_t) * 3 + sizeof(bool); }
    uint64_t getBytesAllocated() const override { return getBytesSize(); }

    /// Already in the most compact form, so there is nothing to optimize.
    std::shared_ptr<MergeTreeIndexGranularity> optimize() override { return nullptr; }

    std::string describe() const override;
};
}

View File

@ -4,12 +4,12 @@
#include <base/types.h>
#include <Storages/MergeTree/MergeTreeDataPartType.h>
#include <Disks/IDisk.h>
#include <Storages/MergeTree/IDataPartStorage.h>
namespace DB
{
class MergeTreeData;
class IDataPartStorage;
/** Various types of mark files are stored in files with various extensions:

View File

@ -151,7 +151,7 @@ UInt64 MergeTreeReadTask::estimateNumRows() const
return rows_to_read;
const auto & index_granularity = info->data_part->index_granularity;
return index_granularity.countRowsForRows(range_readers.main.currentMark(), rows_to_read, range_readers.main.numReadRowsInCurrentGranule());
return index_granularity->countRowsForRows(range_readers.main.currentMark(), rows_to_read, range_readers.main.numReadRowsInCurrentGranule());
}
MergeTreeReadTask::BlockAndProgress MergeTreeReadTask::read()

View File

@ -230,7 +230,7 @@ try
if (!isCancelled() && current_row < data_part->rows_count)
{
size_t rows_to_read = data_part->index_granularity.getMarkRows(current_mark);
size_t rows_to_read = data_part->index_granularity->getMarkRows(current_mark);
bool continue_reading = (current_mark != 0);
const auto & sample = reader->getColumns();

View File

@ -187,6 +187,8 @@ namespace ErrorCodes
DECLARE(UInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).", 0) \
DECLARE(UInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).", 0) \
DECLARE(UInt64, min_index_granularity_bytes, 1024, "Minimum amount of bytes in single granule.", 1024) \
DECLARE(Bool, use_const_adaptive_granularity, false, "Always use constant granularity for whole part. It allows to compress in memory values of index granularity. It can be useful in extremely large workloads with thin tables.", 0) \
DECLARE(Bool, enable_index_granularity_compression, true, "Compress in memory values of index granularity if it is possible", 0) \
DECLARE(Int64, merge_with_ttl_timeout, 3600 * 4, "Minimal time in seconds, when merge with delete TTL can be repeated.", 0) \
DECLARE(Int64, merge_with_recompression_ttl_timeout, 3600 * 4, "Minimal time in seconds, when merge with recompression TTL can be repeated.", 0) \
DECLARE(Bool, ttl_only_drop_parts, false, "Only drop altogether the expired parts and not partially prune them.", 0) \

View File

@ -1,9 +1,9 @@
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <IO/HashingWriteBuffer.h>
#include <Interpreters/Context.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Parsers/queryToString.h>
#include <Common/logger_useful.h>
#include <Core/Settings.h>
@ -15,6 +15,10 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace MergeTreeSetting
{
extern const MergeTreeSettingsBool enable_index_granularity_compression;
}
MergedBlockOutputStream::MergedBlockOutputStream(
const MergeTreeMutableDataPartPtr & data_part,
@ -23,12 +27,12 @@ MergedBlockOutputStream::MergedBlockOutputStream(
const MergeTreeIndices & skip_indices,
const ColumnsStatistics & statistics,
CompressionCodecPtr default_codec_,
MergeTreeIndexGranularityPtr index_granularity_ptr,
TransactionID tid,
bool reset_columns_,
bool save_marks_in_cache,
bool blocks_are_granules_size,
const WriteSettings & write_settings_,
const MergeTreeIndexGranularity & computed_index_granularity)
const WriteSettings & write_settings_)
: IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, columns_list_, reset_columns_)
, columns_list(columns_list_)
, default_codec(default_codec_)
@ -53,11 +57,22 @@ MergedBlockOutputStream::MergedBlockOutputStream(
data_part->storeVersionMetadata();
writer = createMergeTreeDataPartWriter(data_part->getType(),
data_part->name, data_part->storage.getLogName(), data_part->getSerializations(),
data_part_storage, data_part->index_granularity_info,
data_part->name,
data_part->storage.getLogName(),
data_part->getSerializations(),
data_part_storage,
data_part->index_granularity_info,
storage_settings,
columns_list, data_part->getColumnPositions(), metadata_snapshot, data_part->storage.getVirtualsPtr(),
skip_indices, statistics, data_part->getMarksFileExtension(), default_codec, writer_settings, computed_index_granularity);
columns_list,
data_part->getColumnPositions(),
metadata_snapshot,
data_part->storage.getVirtualsPtr(),
skip_indices,
statistics,
data_part->getMarksFileExtension(),
default_codec,
writer_settings,
std::move(index_granularity_ptr));
}
/// If data is pre-sorted.
@ -207,10 +222,14 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk());
new_part->setBytesUncompressedOnDisk(checksums.getTotalSizeUncompressedOnDisk());
new_part->index_granularity = writer->getIndexGranularity();
/// Just in case
new_part->index_granularity.shrinkToFitInMemory();
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(writer->getColumnsSample());
if ((*new_part->storage.getSettings())[MergeTreeSetting::enable_index_granularity_compression])
{
if (auto new_index_granularity = new_part->index_granularity->optimize())
new_part->index_granularity = std::move(new_index_granularity);
}
/// In mutation, existing_rows_count is already calculated in PartMergerWriter
/// In merge situation, lightweight deleted rows was physically deleted, existing_rows_count equals rows_count
if (!new_part->existing_rows_count.has_value())

View File

@ -22,12 +22,12 @@ public:
const MergeTreeIndices & skip_indices,
const ColumnsStatistics & statistics,
CompressionCodecPtr default_codec_,
MergeTreeIndexGranularityPtr index_granularity_ptr,
TransactionID tid,
bool reset_columns_ = false,
bool save_marks_in_cache = false,
bool blocks_are_granules_size = false,
const WriteSettings & write_settings = {},
const MergeTreeIndexGranularity & computed_index_granularity = {});
const WriteSettings & write_settings = {});
Block getHeader() const { return metadata_snapshot->getSampleBlock(); }

View File

@ -15,25 +15,25 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
const MergeTreeMutableDataPartPtr & data_part,
const StorageMetadataPtr & metadata_snapshot_,
const NamesAndTypesList & columns_list_,
CompressionCodecPtr default_codec,
const MergeTreeIndices & indices_to_recalc,
const ColumnsStatistics & stats_to_recalc_,
WrittenOffsetColumns * offset_columns_,
bool save_marks_in_cache,
const MergeTreeIndexGranularity & index_granularity,
const MergeTreeIndexGranularityInfo * index_granularity_info)
const ColumnsStatistics & stats_to_recalc,
CompressionCodecPtr default_codec,
MergeTreeIndexGranularityPtr index_granularity_ptr,
WrittenOffsetColumns * offset_columns,
bool save_marks_in_cache)
: IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, columns_list_, /*reset_columns=*/ true)
{
const auto & global_settings = data_part->storage.getContext()->getSettingsRef();
/// Granularity is never recomputed while writing only columns.
MergeTreeWriterSettings writer_settings(
global_settings,
data_part->storage.getContext()->getWriteSettings(),
storage_settings,
index_granularity_info ? index_granularity_info->mark_type.adaptive : data_part->storage.canUseAdaptiveGranularity(),
/* rewrite_primary_key = */ false,
data_part->index_granularity_info.mark_type.adaptive,
/*rewrite_primary_key=*/ false,
save_marks_in_cache,
/* blocks_are_granules_size = */ false);
/*blocks_are_granules_size=*/ false);
writer = createMergeTreeDataPartWriter(
data_part->getType(),
@ -45,17 +45,17 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
metadata_snapshot_,
data_part->storage.getVirtualsPtr(),
indices_to_recalc,
stats_to_recalc_,
stats_to_recalc,
data_part->getMarksFileExtension(),
default_codec,
writer_settings,
index_granularity);
std::move(index_granularity_ptr));
auto * writer_on_disk = dynamic_cast<MergeTreeDataPartWriterOnDisk *>(writer.get());
if (!writer_on_disk)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MergedColumnOnlyOutputStream supports only parts stored on disk");
writer_on_disk->setWrittenOffsetColumns(offset_columns_);
writer_on_disk->setWrittenOffsetColumns(offset_columns);
}
void MergedColumnOnlyOutputStream::write(const Block & block)

View File

@ -18,13 +18,12 @@ public:
const MergeTreeMutableDataPartPtr & data_part,
const StorageMetadataPtr & metadata_snapshot_,
const NamesAndTypesList & columns_list_,
CompressionCodecPtr default_codec_,
const MergeTreeIndices & indices_to_recalc_,
const ColumnsStatistics & stats_to_recalc_,
WrittenOffsetColumns * offset_columns_ = nullptr,
bool save_marks_in_cache = false,
const MergeTreeIndexGranularity & index_granularity = {},
const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr);
const MergeTreeIndices & indices_to_recalc,
const ColumnsStatistics & stats_to_recalc,
CompressionCodecPtr default_codec,
MergeTreeIndexGranularityPtr index_granularity_ptr,
WrittenOffsetColumns * offset_columns = nullptr,
bool save_marks_in_cache = false);
void write(const Block & block) override;

View File

@ -74,6 +74,7 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsFloat ratio_of_defaults_for_sparse_serialization;
extern const MergeTreeSettingsBool replace_long_file_name_to_hash;
extern const MergeTreeSettingsBool ttl_only_drop_parts;
extern const MergeTreeSettingsBool enable_index_granularity_compression;
}
namespace ErrorCodes
@ -984,12 +985,16 @@ void finalizeMutatedPart(
new_data_part->rows_count = source_part->rows_count;
new_data_part->index_granularity = source_part->index_granularity;
/// Just in case
new_data_part->index_granularity.shrinkToFitInMemory();
new_data_part->setIndex(*source_part->getIndex());
new_data_part->minmax_idx = source_part->minmax_idx;
new_data_part->modification_time = time(nullptr);
if ((*new_data_part->storage.getSettings())[MergeTreeSetting::enable_index_granularity_compression])
{
if (auto new_index_granularity = new_data_part->index_granularity->optimize())
new_data_part->index_granularity = std::move(new_index_granularity);
}
/// Load rest projections which are hardlinked
bool noop;
new_data_part->loadProjections(false, false, noop, true /* if_not_loaded */);
@ -1599,7 +1604,6 @@ private:
ctx->minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
MergeTreeIndexGranularity computed_granularity;
bool has_delete = false;
for (auto & command_for_interpreter : ctx->for_interpreter)
@ -1612,9 +1616,21 @@ private:
}
}
MergeTreeIndexGranularityPtr index_granularity_ptr;
/// Reuse source part granularity if mutation does not change number of rows
if (!has_delete && ctx->execute_ttl_type == ExecuteTTLType::NONE)
computed_granularity = ctx->source_part->index_granularity;
{
index_granularity_ptr = ctx->source_part->index_granularity;
}
else
{
index_granularity_ptr = createMergeTreeIndexGranularity(
ctx->new_data_part->rows_count,
ctx->new_data_part->getBytesUncompressedOnDisk(),
*ctx->data->getSettings(),
ctx->new_data_part->index_granularity_info,
/*blocks_are_granules=*/ false);
}
ctx->out = std::make_shared<MergedBlockOutputStream>(
ctx->new_data_part,
@ -1623,12 +1639,12 @@ private:
skip_indices,
stats_to_rewrite,
ctx->compression_codec,
std::move(index_granularity_ptr),
ctx->txn ? ctx->txn->tid : Tx::PrehistoricTID,
/*reset_columns=*/ true,
/*save_marks_in_cache=*/ false,
/*blocks_are_granules_size=*/ false,
ctx->context->getWriteSettings(),
computed_granularity);
ctx->context->getWriteSettings());
ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
ctx->mutating_pipeline.setProgressCallback(ctx->progress_callback);
@ -1850,14 +1866,10 @@ private:
ctx->new_data_part,
ctx->metadata_snapshot,
ctx->updated_header.getNamesAndTypesList(),
ctx->compression_codec,
std::vector<MergeTreeIndexPtr>(ctx->indices_to_recalc.begin(), ctx->indices_to_recalc.end()),
ColumnsStatistics(ctx->stats_to_recalc.begin(), ctx->stats_to_recalc.end()),
nullptr,
/*save_marks_in_cache=*/ false,
ctx->source_part->index_granularity,
&ctx->source_part->index_granularity_info
);
ctx->compression_codec,
ctx->source_part->index_granularity);
ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
ctx->mutating_pipeline.setProgressCallback(ctx->progress_callback);

View File

@ -99,7 +99,7 @@ size_t RangesInDataPart::getMarksCount() const
size_t RangesInDataPart::getRowsCount() const
{
return data_part->index_granularity.getRowsCountInRanges(ranges);
return data_part->index_granularity->getRowsCountInRanges(ranges);
}

View File

@ -63,7 +63,7 @@ protected:
marks_loader = createMarksLoader(part, MergeTreeDataPartCompact::DATA_FILE_NAME, part->getColumns().size());
size_t num_columns = header.columns();
size_t num_rows = index_granularity.getMarksCount();
size_t num_rows = index_granularity->getMarksCount();
const auto & part_name_column = StorageMergeTreeIndex::part_name_column;
const auto & mark_number_column = StorageMergeTreeIndex::mark_number_column;
@ -115,7 +115,7 @@ protected:
data.resize(num_rows);
for (size_t i = 0; i < num_rows; ++i)
data[i] = index_granularity.getMarkRows(i);
data[i] = index_granularity->getMarkRows(i);
result_columns[pos] = std::move(column);
}
@ -159,7 +159,7 @@ private:
{
size_t col_idx = 0;
bool has_marks_in_part = false;
size_t num_rows = part->index_granularity.getMarksCount();
size_t num_rows = part->index_granularity->getMarksCount();
if (isWidePart(part))
{

View File

@ -1,12 +1,15 @@
#include <gtest/gtest.h>
#include <Core/Block.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypesNumber.h>
// I know that inclusion of .cpp is not good at all
#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp> // NOLINT
#include <Storages/MergeTree/MergeTreeDataPartWriterWide.cpp> // NOLINT
#include <Storages/MergeTree/MergeTreeIndexGranularityAdaptive.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityConstant.h>
using namespace DB;
static Block getBlockWithSize(size_t required_size_in_bytes, size_t size_of_row_in_bytes)
{
@ -25,16 +28,16 @@ TEST(AdaptiveIndexGranularity, FillGranularityToyTests)
auto block1 = getBlockWithSize(80, 8);
EXPECT_EQ(block1.bytes(), 80);
{ /// Granularity bytes are not set. Take default index_granularity.
MergeTreeIndexGranularity index_granularity;
auto granularity = computeIndexGranularityImpl(block1, 0, 100, false, false);
MergeTreeIndexGranularityAdaptive index_granularity;
auto granularity = computeIndexGranularity(block1.rows(), block1.bytes(), 0, 100, false, false);
fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows());
EXPECT_EQ(index_granularity.getMarksCount(), 1);
EXPECT_EQ(index_granularity.getMarkRows(0), 100);
}
{ /// Granule size is less than block size. Block contains multiple granules.
MergeTreeIndexGranularity index_granularity;
auto granularity = computeIndexGranularityImpl(block1, 16, 100, false, true);
MergeTreeIndexGranularityAdaptive index_granularity;
auto granularity = computeIndexGranularity(block1.rows(), block1.bytes(), 16, 100, false, true);
fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows());
EXPECT_EQ(index_granularity.getMarksCount(), 5); /// First granule with 8 rows, and second with 1 row
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
@ -43,8 +46,8 @@ TEST(AdaptiveIndexGranularity, FillGranularityToyTests)
{ /// Granule size is more than block size. Whole block (and maybe more) can be placed in single granule.
MergeTreeIndexGranularity index_granularity;
auto granularity = computeIndexGranularityImpl(block1, 512, 100, false, true);
MergeTreeIndexGranularityAdaptive index_granularity;
auto granularity = computeIndexGranularity(block1.rows(), block1.bytes(), 512, 100, false, true);
fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows());
EXPECT_EQ(index_granularity.getMarksCount(), 1);
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
@ -53,8 +56,8 @@ TEST(AdaptiveIndexGranularity, FillGranularityToyTests)
{ /// Blocks with granule size
MergeTreeIndexGranularity index_granularity;
auto granularity = computeIndexGranularityImpl(block1, 1, 100, true, true);
MergeTreeIndexGranularityAdaptive index_granularity;
auto granularity = computeIndexGranularity(block1.rows(), block1.bytes(), 1, 100, true, true);
fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows());
EXPECT_EQ(index_granularity.getMarksCount(), 1);
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
@ -62,8 +65,8 @@ TEST(AdaptiveIndexGranularity, FillGranularityToyTests)
}
{ /// Shift in index offset
MergeTreeIndexGranularity index_granularity;
auto granularity = computeIndexGranularityImpl(block1, 16, 100, false, true);
MergeTreeIndexGranularityAdaptive index_granularity;
auto granularity = computeIndexGranularity(block1.rows(), block1.bytes(), 16, 100, false, true);
fillIndexGranularityImpl(index_granularity, 6, granularity, block1.rows());
EXPECT_EQ(index_granularity.getMarksCount(), 2);
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
@ -78,10 +81,10 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks)
auto block1 = getBlockWithSize(65536, 8);
auto block2 = getBlockWithSize(65536, 8);
auto block3 = getBlockWithSize(65536, 8);
MergeTreeIndexGranularity index_granularity;
MergeTreeIndexGranularityAdaptive index_granularity;
for (const auto & block : {block1, block2, block3})
{
auto granularity = computeIndexGranularityImpl(block, 1024, 8192, false, true);
auto granularity = computeIndexGranularity(block.rows(), block.bytes(), 1024, 8192, false, true);
fillIndexGranularityImpl(index_granularity, 0, granularity, block.rows());
}
@ -94,10 +97,10 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks)
auto block2 = getBlockWithSize(32768, 32);
auto block3 = getBlockWithSize(2048, 32);
EXPECT_EQ(block1.rows() + block2.rows() + block3.rows(), 3136);
MergeTreeIndexGranularity index_granularity;
MergeTreeIndexGranularityAdaptive index_granularity;
for (const auto & block : {block1, block2, block3})
{
auto granularity = computeIndexGranularityImpl(block, 1024, 8192, false, true);
auto granularity = computeIndexGranularity(block.rows(), block.bytes(), 1024, 8192, false, true);
fillIndexGranularityImpl(index_granularity, 0, granularity, block.rows());
}
@ -113,11 +116,11 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks)
EXPECT_EQ(block1.rows() + block2.rows() + block3.rows(), (2048 + 4096 + 8192) / 32);
MergeTreeIndexGranularity index_granularity;
MergeTreeIndexGranularityAdaptive index_granularity;
size_t index_offset = 0;
for (const auto & block : {block1, block2, block3})
{
auto granularity = computeIndexGranularityImpl(block, 16384, 8192, false, true);
auto granularity = computeIndexGranularity(block.rows(), block.bytes(), 16384, 8192, false, true);
fillIndexGranularityImpl(index_granularity, index_offset, granularity, block.rows());
index_offset = index_granularity.getLastMarkRows() - block.rows();
}
@ -128,10 +131,10 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks)
}
TEST(AdaptiveIndexGranularity, TestIndexGranularityClass)
TEST(AdaptiveIndexGranularity, TestIndexGranularityAdaptive)
{
{
MergeTreeIndexGranularity index_granularity;
MergeTreeIndexGranularityAdaptive index_granularity;
size_t sum_rows = 0;
size_t sum_marks = 0;
for (size_t i = 10; i <= 100; i+=10)
@ -148,11 +151,70 @@ TEST(AdaptiveIndexGranularity, TestIndexGranularityClass)
EXPECT_EQ(index_granularity.getMarkStartingRow(2), 30);
EXPECT_EQ(index_granularity.getMarkStartingRow(3), 60);
EXPECT_EQ(index_granularity.getRowsCountInRange({0, 10}), sum_rows);
EXPECT_EQ(index_granularity.getRowsCountInRange({0, 1}), 10);
EXPECT_EQ(index_granularity.getRowsCountInRange({2, 5}), 30 + 40 + 50);
EXPECT_EQ(index_granularity.getRowsCountInRange(0, 10), sum_rows);
EXPECT_EQ(index_granularity.getRowsCountInRange(0, 1), 10);
EXPECT_EQ(index_granularity.getRowsCountInRange(2, 5), 30 + 40 + 50);
EXPECT_EQ(index_granularity.getRowsCountInRanges({{2, 5}, {0, 1}, {0, 10}}), 10 + 30 + 40 + 50 + sum_rows);
}
}
/// Runs one and the same scenario against the constant and the adaptive implementation
/// through the common MergeTreeIndexGranularity interface, so both must produce
/// identical answers for every accessor checked below.
TEST(AdaptiveIndexGranularity, TestIndexGranularityConstant)
{
    /// Scenario: append 10 marks of `granularity_rows` rows each, shrink the last of them
    /// to half that size, then append one more mark with zero rows.
    /// NOTE: the literal expectations (30, 90, 60, 45, ...) are written for
    /// granularity_rows == 10, which is the value both callers below pass.
    auto test = [](MergeTreeIndexGranularity & index_granularity, size_t granularity_rows)
    {
        size_t sum_marks = 10;
        size_t sum_rows = granularity_rows * sum_marks;

        for (size_t i = 0; i < sum_marks; ++i)
            index_granularity.appendMark(granularity_rows);

        /// The last regular mark becomes half-sized; keep the expected total in sync.
        const size_t new_granularity_rows = granularity_rows / 2;
        index_granularity.adjustLastMark(new_granularity_rows);
        sum_rows -= (granularity_rows - new_granularity_rows);

        /// Zero-row mark appended last; hasFinalMark() is expected to report it.
        index_granularity.appendMark(0);
        ++sum_marks;

        EXPECT_EQ(index_granularity.getMarksCount(), sum_marks);
        EXPECT_EQ(index_granularity.getMarksCountWithoutFinal(), sum_marks - 1);
        EXPECT_TRUE(index_granularity.hasFinalMark());
        EXPECT_EQ(index_granularity.getTotalRows(), sum_rows);
        EXPECT_EQ(index_granularity.getLastMarkRows(), 0);
        EXPECT_EQ(index_granularity.getLastNonFinalMarkRows(), new_granularity_rows);

        EXPECT_EQ(index_granularity.getMarkStartingRow(0), 0);
        EXPECT_EQ(index_granularity.getMarkStartingRow(3), 30);
        EXPECT_EQ(index_granularity.getMarkStartingRow(9), 90);
        /// Marks 10 and 11 are expected to start at the total row count.
        EXPECT_EQ(index_granularity.getMarkStartingRow(10), sum_rows);
        EXPECT_EQ(index_granularity.getMarkStartingRow(11), sum_rows);

        EXPECT_EQ(index_granularity.getRowsCountInRange(0, 10), sum_rows);
        EXPECT_EQ(index_granularity.getRowsCountInRange(0, 11), sum_rows);
        EXPECT_EQ(index_granularity.getRowsCountInRange(0, 1), 10);
        EXPECT_EQ(index_granularity.getRowsCountInRange(2, 5), 30);
        EXPECT_EQ(index_granularity.getRowsCountInRange(3, 9), 60);
        /// Ranges reaching mark 10 or 11 include the half-sized mark, hence 45 rather than 50.
        EXPECT_EQ(index_granularity.getRowsCountInRange(5, 10), 45);
        EXPECT_EQ(index_granularity.getRowsCountInRange(5, 11), 45);

        EXPECT_EQ(index_granularity.countMarksForRows(0, 35), 3);
        EXPECT_EQ(index_granularity.countMarksForRows(5, 29), 2);
        EXPECT_EQ(index_granularity.countMarksForRows(0, 89), 8);
        EXPECT_EQ(index_granularity.countMarksForRows(0, 90), 9);
        EXPECT_EQ(index_granularity.countMarksForRows(0, 92), 9);
        EXPECT_EQ(index_granularity.countMarksForRows(0, 95), sum_marks);
        EXPECT_EQ(index_granularity.countMarksForRows(0, 99), sum_marks);
    };

    const size_t granularity_rows = 10;

    {
        MergeTreeIndexGranularityConstant index_granularity(granularity_rows);
        test(index_granularity, granularity_rows);
    }

    {
        MergeTreeIndexGranularityAdaptive index_granularity;
        test(index_granularity, granularity_rows);
    }
}

View File

@ -4,6 +4,7 @@
// I know that inclusion of .cpp is not good at all
#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp> // NOLINT
#include <Storages/MergeTree/MergeTreeIndexGranularityAdaptive.h>
using namespace DB;
@ -13,7 +14,7 @@ TEST(IndexGranularityCompactParts, FillGranularitySequenceOfBlocks)
size_t rows = 8;
size_t granularity = 32;
MergeTreeIndexGranularity index_granularity;
MergeTreeIndexGranularityAdaptive index_granularity;
size_t index_offset = 0;
size_t rows_written = 0;
for (size_t i = 0; i < 3; ++i)
@ -34,7 +35,7 @@ TEST(IndexGranularityCompactParts, FillGranularitySequenceOfBlocks)
size_t rows2 = 8;
size_t granularity = 32;
MergeTreeIndexGranularity index_granularity;
MergeTreeIndexGranularityAdaptive index_granularity;
size_t index_offset = 0;
fillIndexGranularityImpl(index_granularity, index_offset, granularity, rows1);
@ -51,7 +52,7 @@ TEST(IndexGranularityCompactParts, FillGranularitySequenceOfBlocks)
size_t rows2 = 25;
size_t granularity = 32;
MergeTreeIndexGranularity index_granularity;
MergeTreeIndexGranularityAdaptive index_granularity;
size_t index_offset = 0;
fillIndexGranularityImpl(index_granularity, index_offset, granularity, rows1);
@ -68,7 +69,7 @@ TEST(IndexGranularityCompactParts, FillGranularitySequenceOfBlocks)
size_t rows = 40;
size_t granularity = 32;
MergeTreeIndexGranularity index_granularity;
MergeTreeIndexGranularityAdaptive index_granularity;
size_t index_offset = 0;
for (size_t i = 0; i < 3; ++i)

View File

@ -981,6 +981,8 @@ class MergeTreeSettingsRandomizer:
"cache_populated_by_fetch": lambda: random.randint(0, 1),
"concurrent_part_removal_threshold": threshold_generator(0.2, 0.3, 0, 100),
"old_parts_lifetime": threshold_generator(0.2, 0.3, 10, 8 * 60),
"use_const_adaptive_granularity": lambda: random.randint(0, 1),
"enable_index_granularity_compression": lambda: random.randint(0, 1),
}
@staticmethod

View File

@ -0,0 +1,54 @@
adaptive non-const, before merge
all_1_1_0 0 10 0
all_1_1_0 1 5 10
all_1_1_0 2 0 14
all_2_2_0 0 2 15
all_2_2_0 1 2 17
all_2_2_0 2 2 19
all_2_2_0 3 2 21
all_2_2_0 4 2 23
all_2_2_0 5 2 25
all_2_2_0 6 2 27
all_2_2_0 7 1 29
all_2_2_0 8 0 29
all_1_1_0 25
all_2_2_0 25
adaptive non-const, after merge
all_1_2_1 0 10 0
all_1_2_1 1 5 10
all_1_2_1 2 2 15
all_1_2_1 3 2 17
all_1_2_1 4 2 19
all_1_2_1 5 2 21
all_1_2_1 6 2 23
all_1_2_1 7 2 25
all_1_2_1 8 2 27
all_1_2_1 9 1 29
all_1_2_1 10 0 29
all_1_2_1 88
adaptive const, before merge
all_1_1_0 0 10 0
all_1_1_0 1 5 10
all_1_1_0 2 0 14
all_2_2_0 0 2 15
all_2_2_0 1 2 17
all_2_2_0 2 2 19
all_2_2_0 3 2 21
all_2_2_0 4 2 23
all_2_2_0 5 2 25
all_2_2_0 6 2 27
all_2_2_0 7 1 29
all_2_2_0 8 0 29
all_1_1_0 25
all_2_2_0 25
adaptive const, after merge
all_1_2_1 0 4 0
all_1_2_1 1 4 4
all_1_2_1 2 4 8
all_1_2_1 3 4 12
all_1_2_1 4 4 16
all_1_2_1 5 4 20
all_1_2_1 6 4 24
all_1_2_1 7 2 28
all_1_2_1 8 0 29
all_1_2_1 25

View File

@ -0,0 +1,53 @@
-- Compares the marks of a MergeTree table before and after a merge in two scenarios
-- that differ only in use_const_adaptive_granularity (0 in the first, 1 in the second).
DROP TABLE IF EXISTS t_index_granularity;
-- Scenario 1: use_const_adaptive_granularity = 0.
CREATE TABLE t_index_granularity (id UInt64, s String)
ENGINE = MergeTree ORDER BY id
SETTINGS min_bytes_for_wide_part = 0,
index_granularity = 10,
index_granularity_bytes = 4096,
merge_max_block_size = 10,
merge_max_block_size_bytes = 4096,
enable_index_granularity_compression = 1,
use_const_adaptive_granularity = 0,
enable_vertical_merge_algorithm = 0;
-- Two separate inserts: 15 rows (ids 0..14) with a one-character string,
-- then 15 rows (ids 15..29) with a 2048-character string.
INSERT INTO t_index_granularity SELECT number, 'a' FROM numbers(15);
INSERT INTO t_index_granularity SELECT number, repeat('a', 2048) FROM numbers(15, 15);
SELECT 'adaptive non-const, before merge';
-- Dump the table through mergeTreeIndex, then the in-memory size of the
-- index granularity of every active part.
SELECT * FROM mergeTreeIndex(currentDatabase(), t_index_granularity) ORDER BY ALL;
SELECT name, index_granularity_bytes_in_memory FROM system.parts WHERE database = currentDatabase() AND table = 't_index_granularity' AND active;
OPTIMIZE TABLE t_index_granularity FINAL;
SELECT 'adaptive non-const, after merge';
-- Same two dumps, now taken after OPTIMIZE ... FINAL.
SELECT * FROM mergeTreeIndex(currentDatabase(), t_index_granularity) ORDER BY ALL;
SELECT name, index_granularity_bytes_in_memory FROM system.parts WHERE database = currentDatabase() AND table = 't_index_granularity' AND active;
DROP TABLE t_index_granularity;
-- Scenario 2: identical table definition and data, but use_const_adaptive_granularity = 1.
CREATE TABLE t_index_granularity (id UInt64, s String)
ENGINE = MergeTree ORDER BY id
SETTINGS min_bytes_for_wide_part = 0,
index_granularity = 10,
index_granularity_bytes = 4096,
merge_max_block_size = 10,
merge_max_block_size_bytes = 4096,
enable_index_granularity_compression = 1,
use_const_adaptive_granularity = 1,
enable_vertical_merge_algorithm = 0;
INSERT INTO t_index_granularity SELECT number, 'a' FROM numbers(15);
INSERT INTO t_index_granularity SELECT number, repeat('a', 2048) FROM numbers(15, 15);
SELECT 'adaptive const, before merge';
SELECT * FROM mergeTreeIndex(currentDatabase(), t_index_granularity) ORDER BY ALL;
SELECT name, index_granularity_bytes_in_memory FROM system.parts WHERE database = currentDatabase() AND table = 't_index_granularity' AND active;
OPTIMIZE TABLE t_index_granularity FINAL;
SELECT 'adaptive const, after merge';
SELECT * FROM mergeTreeIndex(currentDatabase(), t_index_granularity) ORDER BY ALL;
SELECT name, index_granularity_bytes_in_memory FROM system.parts WHERE database = currentDatabase() AND table = 't_index_granularity' AND active;
DROP TABLE t_index_granularity;

View File

@ -8,8 +8,14 @@ CREATE TABLE t (
ENGINE MergeTree()
ORDER by key SETTINGS index_granularity = 10, index_granularity_bytes = '1024K';
ALTER TABLE t MODIFY SETTING enable_index_granularity_compression = 0;
INSERT INTO t SELECT number, toString(number) FROM numbers(100);
SELECT index_granularity_bytes_in_memory, index_granularity_bytes_in_memory_allocated FROM system.parts where table = 't' and database = currentDatabase();
ALTER TABLE t MODIFY SETTING enable_index_granularity_compression = 1;
INSERT INTO t SELECT number, toString(number) FROM numbers(100);
SELECT index_granularity_bytes_in_memory, index_granularity_bytes_in_memory_allocated FROM system.parts where table = 't' and database = currentDatabase() ORDER BY name;
DROP TABLE IF EXISTS t;