diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp index dee16bfcb1a..de8c42e0a1c 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp @@ -647,7 +647,7 @@ std::optional optimizeUseAggregateProjections(QueryPlan::Node & node, Qu range.begin = exact_ranges[i].end; ordinary_reading_marks -= exact_ranges[i].end - exact_ranges[i].begin; - exact_count += part_with_ranges.data_part->index_granularity.getRowsCountInRange(exact_ranges[i]); + exact_count += part_with_ranges.data_part->index_granularity->getRowsCountInRange(exact_ranges[i]); ++i; } diff --git a/src/Processors/QueryPlan/PartsSplitter.cpp b/src/Processors/QueryPlan/PartsSplitter.cpp index 57fd41e2a32..2b860f45219 100644 --- a/src/Processors/QueryPlan/PartsSplitter.cpp +++ b/src/Processors/QueryPlan/PartsSplitter.cpp @@ -201,7 +201,7 @@ public: size_t getMarkRows(size_t part_idx, size_t mark) const { - return parts[part_idx].data_part->index_granularity.getMarkRows(mark); + return parts[part_idx].data_part->index_granularity->getMarkRows(mark); } private: const RangesInDataParts & parts; @@ -444,7 +444,7 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts, parts_ranges.push_back( {index_access.getValue(part_index, range.begin), range, part_index, PartsRangesIterator::EventType::RangeStart}); - const bool value_is_defined_at_end_mark = range.end < index_granularity.getMarksCount(); + const bool value_is_defined_at_end_mark = range.end < index_granularity->getMarksCount(); if (!value_is_defined_at_end_mark) continue; @@ -667,7 +667,7 @@ std::pair, std::vector> splitIntersecting PartRangeIndex parts_range_start_index(parts_range_start); parts_ranges_queue.push({std::move(parts_range_start), std::move(parts_range_start_index)}); - const bool value_is_defined_at_end_mark = range.end < index_granularity.getMarksCount(); + const bool value_is_defined_at_end_mark = range.end < index_granularity->getMarksCount(); if (!value_is_defined_at_end_mark) continue; diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 626e43898e4..6899dc7f5d6 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -667,7 +667,7 @@ Pipe ReadFromMergeTree::readInOrder( part_with_ranges.ranges.size(), read_type == ReadType::InReverseOrder ? " reverse " : " ", part_with_ranges.data_part->name, total_rows, - part_with_ranges.data_part->index_granularity.getMarkStartingRow(part_with_ranges.ranges.front().begin)); + part_with_ranges.data_part->index_granularity->getMarkStartingRow(part_with_ranges.ranges.front().begin)); MergeTreeSelectAlgorithmPtr algorithm; if (read_type == ReadType::InReverseOrder) @@ -1759,7 +1759,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead( return std::make_shared(std::move(result)); for (const auto & part : parts) - total_marks_pk += part->index_granularity.getMarksCountWithoutFinal(); + total_marks_pk += part->index_granularity->getMarksCountWithoutFinal(); parts_before_pk = parts.size(); auto reader_settings = getMergeTreeReaderSettings(context_, query_info_); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 89bfc152e50..24ffe20d039 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -626,11 +627,12 @@ UInt64 IMergeTreeDataPart::getIndexSizeInAllocatedBytes() const UInt64 IMergeTreeDataPart::getIndexGranularityBytes() const { - return index_granularity.getBytesSize(); + return index_granularity->getBytesSize(); } + UInt64 IMergeTreeDataPart::getIndexGranularityAllocatedBytes() const { - return index_granularity.getBytesAllocated(); + return index_granularity->getBytesAllocated(); } void IMergeTreeDataPart::assertState(const std::initializer_list & affordable_states) const @@ -661,7 +663,7 @@ void IMergeTreeDataPart::assertOnDisk() const UInt64 IMergeTreeDataPart::getMarksCount() const { - return index_granularity.getMarksCount(); + return index_granularity->getMarksCount(); } UInt64 IMergeTreeDataPart::getExistingBytesOnDisk() const @@ -746,7 +748,6 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks loadChecksums(require_columns_checksums); loadIndexGranularity(); - index_granularity.shrinkToFitInMemory(); if (!(*storage.getSettings())[MergeTreeSetting::primary_key_lazy_load]) getIndex(); @@ -942,13 +943,13 @@ void IMergeTreeDataPart::loadIndex() const for (size_t i = 0; i < key_size; ++i) { loaded_index[i] = primary_key.data_types[i]->createColumn(); - loaded_index[i]->reserve(index_granularity.getMarksCount()); + loaded_index[i]->reserve(index_granularity->getMarksCount()); } String index_name = "primary" + getIndexExtensionFromFilesystem(getDataPartStorage()); String index_path = fs::path(getDataPartStorage().getRelativePath()) / index_name; auto index_file = metadata_manager->read(index_name); - size_t marks_count = index_granularity.getMarksCount(); + size_t marks_count = index_granularity->getMarksCount(); Serializations key_serializations(key_size); for (size_t j = 0; j < key_size; ++j) @@ -1363,7 +1364,7 @@ void IMergeTreeDataPart::loadRowsCount() assertEOF(*buf); }; - if (index_granularity.empty()) + if (index_granularity->empty()) { rows_count = 0; } @@ -1398,9 +1399,9 @@ void IMergeTreeDataPart::loadRowsCount() backQuote(column.name), rows_in_column, name, rows_count); } - size_t last_possibly_incomplete_mark_rows = index_granularity.getLastNonFinalMarkRows(); + size_t last_possibly_incomplete_mark_rows = index_granularity->getLastNonFinalMarkRows(); /// All this rows have to be written in column - size_t index_granularity_without_last_mark = index_granularity.getTotalRows() - last_possibly_incomplete_mark_rows; + size_t index_granularity_without_last_mark = index_granularity->getTotalRows() - last_possibly_incomplete_mark_rows; /// We have more rows in column than in index granularity without last possibly incomplete mark if (rows_in_column < index_granularity_without_last_mark) { @@ -1410,7 +1411,7 @@ void IMergeTreeDataPart::loadRowsCount() "and size of single value, " "but index granularity in part {} without last mark has {} rows, which " "is more than in column", - backQuote(column.name), rows_in_column, name, index_granularity.getTotalRows()); + backQuote(column.name), rows_in_column, name, index_granularity->getTotalRows()); } /// In last mark we actually written less or equal rows than stored in last mark of index granularity @@ -1458,8 +1459,8 @@ void IMergeTreeDataPart::loadRowsCount() column.name, column_size, sizeof_field); } - size_t last_mark_index_granularity = index_granularity.getLastNonFinalMarkRows(); - size_t rows_approx = index_granularity.getTotalRows(); + size_t last_mark_index_granularity = index_granularity->getLastNonFinalMarkRows(); + size_t rows_approx = index_granularity->getTotalRows(); if (!(rows_count <= rows_approx && rows_approx < rows_count + last_mark_index_granularity)) throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected size of column {}: " "{} rows, expected {}+-{} rows according to the index", @@ -1522,7 +1523,7 @@ UInt64 IMergeTreeDataPart::readExistingRowsCount() while (current_row < rows_count) { - size_t rows_to_read = index_granularity.getMarkRows(current_mark); + size_t rows_to_read = index_granularity->getMarkRows(current_mark); continue_reading = (current_mark != 0); Columns result; @@ -1970,6 +1971,9 @@ void IMergeTreeDataPart::initializeIndexGranularityInfo() index_granularity_info = MergeTreeIndexGranularityInfo(storage, *mrk_type); else index_granularity_info = MergeTreeIndexGranularityInfo(storage, part_type); + + /// It may be converted to constant index granularity after loading it. + index_granularity = std::make_unique(); } void IMergeTreeDataPart::remove() @@ -2243,9 +2247,9 @@ void IMergeTreeDataPart::checkConsistency(bool require_part_metadata) const "part_state: [{}]", columns.toString(), index_granularity_info.getMarkSizeInBytes(columns.size()), - index_granularity.getMarksCount(), + index_granularity->getMarksCount(), index_granularity_info.describe(), - index_granularity.describe(), + index_granularity->describe(), part_state); e.addMessage(debug_info); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 24625edf154..77af1e90c43 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -321,7 +321,7 @@ public: /// Amount of rows between marks /// As index always loaded into memory - MergeTreeIndexGranularity index_granularity; + MergeTreeIndexGranularityPtr index_granularity; /// Index that for each part stores min and max values of a set of columns. This allows quickly excluding /// parts based on conditions on these columns imposed by a query. diff --git a/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp b/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp index dbfdbbdea88..e0070dc2349 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp @@ -1,5 +1,6 @@ #include #include +#include #include namespace DB @@ -11,7 +12,6 @@ namespace ErrorCodes extern const int NO_SUCH_COLUMN_IN_TABLE; } - Block getIndexBlockAndPermute(const Block & block, const Names & names, const IColumn::Permutation * permutation) { Block result; @@ -57,7 +57,7 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter( const StorageMetadataPtr & metadata_snapshot_, const VirtualsDescriptionPtr & virtual_columns_, const MergeTreeWriterSettings & settings_, - const MergeTreeIndexGranularity & index_granularity_) + MergeTreeIndexGranularityPtr index_granularity_) : data_part_name(data_part_name_) , serializations(serializations_) , index_granularity_info(index_granularity_info_) @@ -68,7 +68,7 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter( , settings(settings_) , with_final_mark(settings.can_use_adaptive_granularity) , data_part_storage(data_part_storage_) - , index_granularity(index_granularity_) + , index_granularity(std::move(index_granularity_)) { } @@ -145,7 +145,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartCompactWriter( const String & marks_file_extension_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & writer_settings, - const MergeTreeIndexGranularity & computed_index_granularity); + MergeTreeIndexGranularityPtr computed_index_granularity); MergeTreeDataPartWriterPtr createMergeTreeDataPartWideWriter( const String & data_part_name_, @@ -162,8 +162,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWideWriter( const String & marks_file_extension_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & writer_settings, - const MergeTreeIndexGranularity & computed_index_granularity); - + MergeTreeIndexGranularityPtr computed_index_granularity); MergeTreeDataPartWriterPtr createMergeTreeDataPartWriter( MergeTreeDataPartType part_type, @@ -182,12 +181,26 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWriter( const String & marks_file_extension_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & writer_settings, - const MergeTreeIndexGranularity & computed_index_granularity) + MergeTreeIndexGranularityPtr computed_index_granularity) { if (part_type == MergeTreeDataPartType::Compact) - return createMergeTreeDataPartCompactWriter(data_part_name_, logger_name_, serializations_, data_part_storage_, - index_granularity_info_, storage_settings_, columns_list, column_positions, metadata_snapshot, virtual_columns, indices_to_recalc, stats_to_recalc_, - marks_file_extension_, default_codec_, writer_settings, computed_index_granularity); + return createMergeTreeDataPartCompactWriter( + data_part_name_, + logger_name_, + serializations_, + data_part_storage_, + index_granularity_info_, + storage_settings_, + columns_list, + column_positions, + metadata_snapshot, + virtual_columns, + indices_to_recalc, + stats_to_recalc_, + marks_file_extension_, + default_codec_, + writer_settings, + std::move(computed_index_granularity)); if (part_type == MergeTreeDataPartType::Wide) return createMergeTreeDataPartWideWriter( data_part_name_, @@ -204,7 +217,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWriter( marks_file_extension_, default_codec_, writer_settings, - computed_index_granularity); + std::move(computed_index_granularity)); throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown part type: {}", part_type.toString()); } diff --git a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h index 8923f6a59ca..545ced98e6e 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h +++ b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h @@ -36,7 +36,7 @@ public: const StorageMetadataPtr & metadata_snapshot_, const VirtualsDescriptionPtr & virtual_columns_, const MergeTreeWriterSettings & settings_, - const MergeTreeIndexGranularity & index_granularity_ = {}); + MergeTreeIndexGranularityPtr index_granularity_); virtual ~IMergeTreeDataPartWriter(); @@ -52,7 +52,7 @@ public: PlainMarksByName releaseCachedMarks(); - const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; } + MergeTreeIndexGranularityPtr getIndexGranularity() const { return index_granularity; } virtual Block getColumnsSample() const = 0; @@ -76,7 +76,7 @@ protected: MutableDataPartStoragePtr data_part_storage; MutableColumns index_columns; - MergeTreeIndexGranularity index_granularity; + MergeTreeIndexGranularityPtr index_granularity; /// Marks that will be saved to cache on finish. PlainMarksByName cached_marks; }; @@ -101,6 +101,6 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWriter( const String & marks_file_extension, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & writer_settings, - const MergeTreeIndexGranularity & computed_index_granularity); + MergeTreeIndexGranularityPtr computed_index_granularity); } diff --git a/src/Storages/MergeTree/IMergedBlockOutputStream.h b/src/Storages/MergeTree/IMergedBlockOutputStream.h index 7dd6d720170..cdaf599b5ea 100644 --- a/src/Storages/MergeTree/IMergedBlockOutputStream.h +++ b/src/Storages/MergeTree/IMergedBlockOutputStream.h @@ -29,7 +29,7 @@ public: virtual void write(const Block & block) = 0; - const MergeTreeIndexGranularity & getIndexGranularity() const + MergeTreeIndexGranularityPtr getIndexGranularity() const { return writer->getIndexGranularity(); } diff --git a/src/Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h b/src/Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h index aff1cf0edb0..050b80b5979 100644 --- a/src/Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h +++ b/src/Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h @@ -51,7 +51,7 @@ public: const MergeTreeIndexGranularityInfo & getIndexGranularityInfo() const override { return data_part->index_granularity_info; } - const MergeTreeIndexGranularity & getIndexGranularity() const override { return data_part->index_granularity; } + const MergeTreeIndexGranularity & getIndexGranularity() const override { return *data_part->index_granularity; } const SerializationInfoByName & getSerializationInfos() const override { return data_part->getSerializationInfos(); } diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index b80d7fccc91..e88472b9dda 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -52,7 +52,7 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta total_size_bytes_compressed += source_part->getBytesOnDisk(); total_size_bytes_uncompressed += source_part->getTotalColumnsSize().data_uncompressed; total_size_marks += source_part->getMarksCount(); - total_rows_count += source_part->index_granularity.getTotalRows(); + total_rows_count += source_part->index_granularity->getTotalRows(); } if (!future_part->parts.empty()) diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index ba218dc1b7d..9a594eef56b 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -72,6 +73,7 @@ namespace CurrentMetrics namespace DB { + namespace Setting { extern const SettingsBool compile_sort_description; @@ -99,6 +101,7 @@ namespace MergeTreeSetting extern const MergeTreeSettingsUInt64 vertical_merge_algorithm_min_rows_to_activate; extern const MergeTreeSettingsBool vertical_merge_remote_filesystem_prefetch; extern const MergeTreeSettingsBool prewarm_mark_cache; + extern const MergeTreeSettingsBool use_const_adaptive_granularity; } namespace ErrorCodes @@ -412,10 +415,11 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const }; auto mutations_snapshot = global_ctx->data->getMutationsSnapshot(params); + auto storage_settings = global_ctx->data->getSettings(); SerializationInfo::Settings info_settings = { - .ratio_of_defaults_for_sparse = (*global_ctx->data->getSettings())[MergeTreeSetting::ratio_of_defaults_for_sparse_serialization], + .ratio_of_defaults_for_sparse = (*storage_settings)[MergeTreeSetting::ratio_of_defaults_for_sparse_serialization], .choose_kind = true, }; @@ -464,6 +468,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const ctx->sum_input_rows_upper_bound = global_ctx->merge_list_element_ptr->total_rows_count; ctx->sum_compressed_bytes_upper_bound = global_ctx->merge_list_element_ptr->total_size_bytes_compressed; + ctx->sum_uncompressed_bytes_upper_bound = global_ctx->merge_list_element_ptr->total_size_bytes_uncompressed; global_ctx->chosen_merge_algorithm = chooseMergeAlgorithm(); global_ctx->merge_list_element_ptr->merge_algorithm.store(global_ctx->chosen_merge_algorithm, std::memory_order_relaxed); @@ -507,8 +512,14 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const throw Exception(ErrorCodes::LOGICAL_ERROR, "Merge algorithm must be chosen"); } - /// If merge is vertical we cannot calculate it - ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical); + bool use_adaptive_granularity = global_ctx->new_data_part->index_granularity_info.mark_type.adaptive; + bool use_const_adaptive_granularity = (*storage_settings)[MergeTreeSetting::use_const_adaptive_granularity]; + + /// If merge is vertical we cannot calculate it. + /// If granularity is constant we don't need to calculate it. + ctx->blocks_are_granules_size = use_adaptive_granularity + && !use_const_adaptive_granularity + && global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical; /// Merged stream will be created and available as merged_stream variable createMergedStream(); @@ -550,7 +561,14 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const } } - bool save_marks_in_cache = (*global_ctx->data->getSettings())[MergeTreeSetting::prewarm_mark_cache] && global_ctx->context->getMarkCache(); + auto index_granularity_ptr = createMergeTreeIndexGranularity( + ctx->sum_input_rows_upper_bound, + ctx->sum_uncompressed_bytes_upper_bound, + *storage_settings, + global_ctx->new_data_part->index_granularity_info, + ctx->blocks_are_granules_size); + + bool save_marks_in_cache = (*storage_settings)[MergeTreeSetting::prewarm_mark_cache] && global_ctx->context->getMarkCache(); global_ctx->to = std::make_shared( global_ctx->new_data_part, @@ -559,6 +577,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const MergeTreeIndexFactory::instance().getMany(global_ctx->merging_skip_indexes), getStatisticsForColumns(global_ctx->merging_columns, global_ctx->metadata_snapshot), ctx->compression_codec, + std::move(index_granularity_ptr), global_ctx->txn ? global_ctx->txn->tid : Tx::PrehistoricTID, /*reset_columns=*/ true, save_marks_in_cache, @@ -1100,12 +1119,12 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const global_ctx->new_data_part, global_ctx->metadata_snapshot, columns_list, - ctx->compression_codec, column_pipepline.indexes_to_recalc, getStatisticsForColumns(columns_list, global_ctx->metadata_snapshot), + ctx->compression_codec, + global_ctx->to->getIndexGranularity(), &global_ctx->written_offset_columns, - save_marks_in_cache, - global_ctx->to->getIndexGranularity()); + save_marks_in_cache); ctx->column_elems_written = 0; } diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index 53792165987..cff4e5e763c 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -243,7 +243,6 @@ private: bool need_remove_expired_values{false}; bool force_ttl{false}; CompressionCodecPtr compression_codec{nullptr}; - size_t sum_input_rows_upper_bound{0}; std::shared_ptr rows_sources_temporary_file; std::optional column_sizes{}; @@ -261,7 +260,9 @@ private: std::function is_cancelled{}; /// Local variables for this stage + size_t sum_input_rows_upper_bound{0}; size_t sum_compressed_bytes_upper_bound{0}; + size_t sum_uncompressed_bytes_upper_bound{0}; bool blocks_are_granules_size{false}; LoggerPtr log{getLogger("MergeTask::PrepareStage")}; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index b2f35d0a309..d745b428061 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -83,6 +83,7 @@ #include #include #include +#include #include #include @@ -7237,7 +7238,7 @@ Block MergeTreeData::getMinMaxCountProjectionBlock( /// It's extremely rare that some parts have final marks while others don't. To make it /// straightforward, disable minmax_count projection when `max(pk)' encounters any part with /// no final mark. - if (need_primary_key_max_column && !part->index_granularity.hasFinalMark()) + if (need_primary_key_max_column && !part->index_granularity->hasFinalMark()) return {}; real_parts.push_back(part); @@ -8960,10 +8961,15 @@ std::pair MergeTreeData::createE auto compression_codec = getContext()->chooseCompressionCodec(0, 0); const auto & index_factory = MergeTreeIndexFactory::instance(); - MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, + MergedBlockOutputStream out( + new_data_part, + metadata_snapshot, + columns, index_factory.getMany(metadata_snapshot->getSecondaryIndices()), ColumnsStatistics{}, - compression_codec, txn ? txn->tid : Tx::PrehistoricTID); + compression_codec, + std::make_shared(), + txn ? txn->tid : Tx::PrehistoricTID); bool sync_on_insert = (*settings)[MergeTreeSetting::fsync_after_insert]; diff --git a/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp index 8856f467b90..401184eeb36 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB @@ -15,6 +16,11 @@ namespace ErrorCodes extern const int BAD_SIZE_OF_FILE_IN_DATA_PART; } +namespace MergeTreeSetting +{ + extern MergeTreeSettingsBool enable_index_granularity_compression; +} + MergeTreeDataPartCompact::MergeTreeDataPartCompact( const MergeTreeData & storage_, const String & name_, @@ -62,7 +68,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartCompactWriter( const String & marks_file_extension_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & writer_settings, - const MergeTreeIndexGranularity & computed_index_granularity) + MergeTreeIndexGranularityPtr computed_index_granularity) { NamesAndTypesList ordered_columns_list; std::copy_if(columns_list.begin(), columns_list.end(), std::back_inserter(ordered_columns_list), @@ -76,7 +82,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartCompactWriter( data_part_name_, logger_name_, serializations_, data_part_storage_, index_granularity_info_, storage_settings_, ordered_columns_list, metadata_snapshot, virtual_columns, indices_to_recalc, stats_to_recalc_, marks_file_extension_, - default_codec_, writer_settings, computed_index_granularity); + default_codec_, writer_settings, std::move(computed_index_granularity)); } @@ -95,8 +101,11 @@ void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*eac } void MergeTreeDataPartCompact::loadIndexGranularityImpl( - MergeTreeIndexGranularity & index_granularity_, const MergeTreeIndexGranularityInfo & index_granularity_info_, - size_t columns_count, const IDataPartStorage & data_part_storage_) + MergeTreeIndexGranularityPtr & index_granularity_ptr, + const MergeTreeIndexGranularityInfo & index_granularity_info_, + size_t columns_count, + const IDataPartStorage & data_part_storage_, + const MergeTreeSettings & storage_settings) { if (!index_granularity_info_.mark_type.adaptive) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MergeTreeDataPartCompact cannot be created with non-adaptive granularity."); @@ -122,10 +131,14 @@ void MergeTreeDataPartCompact::loadIndexGranularityImpl( marks_reader->ignore(columns_count * sizeof(MarkInCompressedFile)); size_t granularity; readBinaryLittleEndian(granularity, *marks_reader); - index_granularity_.appendMark(granularity); + index_granularity_ptr->appendMark(granularity); } - index_granularity_.setInitialized(); + if (storage_settings[MergeTreeSetting::enable_index_granularity_compression]) + { + if (auto new_granularity_ptr = index_granularity_ptr->optimize()) + index_granularity_ptr = std::move(new_granularity_ptr); + } } void MergeTreeDataPartCompact::loadIndexGranularity() @@ -133,7 +146,7 @@ void MergeTreeDataPartCompact::loadIndexGranularity() if (columns.empty()) throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART, "No columns in part {}", name); - loadIndexGranularityImpl(index_granularity, index_granularity_info, columns.size(), getDataPartStorage()); + loadIndexGranularityImpl(index_granularity, index_granularity_info, columns.size(), getDataPartStorage(), *storage.getSettings()); } void MergeTreeDataPartCompact::loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const @@ -152,7 +165,7 @@ void MergeTreeDataPartCompact::loadMarksToCache(const Names & column_names, Mark info_for_read, mark_cache, index_granularity_info.getMarksFilePath(DATA_FILE_NAME), - index_granularity.getMarksCount(), + index_granularity->getMarksCount(), index_granularity_info, /*save_marks_in_cache=*/ true, read_settings, @@ -227,7 +240,7 @@ void MergeTreeDataPartCompact::doCheckConsistency(bool require_part_metadata) co getDataPartStorage().getRelativePath(), std::string(fs::path(getDataPartStorage().getFullPath()) / mrk_file_name)); - UInt64 expected_file_size = index_granularity_info.getMarkSizeInBytes(columns.size()) * index_granularity.getMarksCount(); + UInt64 expected_file_size = index_granularity_info.getMarkSizeInBytes(columns.size()) * index_granularity->getMarksCount(); if (expected_file_size != file_size) throw Exception( ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART, diff --git a/src/Storages/MergeTree/MergeTreeDataPartCompact.h b/src/Storages/MergeTree/MergeTreeDataPartCompact.h index c394de0d7c1..d9128fc5e0c 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartCompact.h +++ b/src/Storages/MergeTree/MergeTreeDataPartCompact.h @@ -60,8 +60,11 @@ public: protected: static void loadIndexGranularityImpl( - MergeTreeIndexGranularity & index_granularity_, const MergeTreeIndexGranularityInfo & index_granularity_info_, - size_t columns_count, const IDataPartStorage & data_part_storage_); + MergeTreeIndexGranularityPtr & index_granularity_, + const MergeTreeIndexGranularityInfo & index_granularity_info_, + size_t columns_count, + const IDataPartStorage & data_part_storage_, + const MergeTreeSettings & storage_settings); void doCheckConsistency(bool require_part_metadata) const override; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index 39f96ba06ad..2448f52c679 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include #include @@ -17,6 +19,11 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +namespace MergeTreeSetting +{ + extern MergeTreeSettingsBool enable_index_granularity_compression; +} + MergeTreeDataPartWide::MergeTreeDataPartWide( const MergeTreeData & storage_, const String & name_, @@ -68,14 +75,14 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWideWriter( const String & marks_file_extension_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & writer_settings, - const MergeTreeIndexGranularity & computed_index_granularity) + MergeTreeIndexGranularityPtr computed_index_granularity) { return std::make_unique( data_part_name_, logger_name_, serializations_, data_part_storage_, index_granularity_info_, storage_settings_, columns_list, metadata_snapshot, virtual_columns, indices_to_recalc, stats_to_recalc_, marks_file_extension_, - default_codec_, writer_settings, computed_index_granularity); + default_codec_, writer_settings, std::move(computed_index_granularity)); } @@ -114,8 +121,11 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl( } void MergeTreeDataPartWide::loadIndexGranularityImpl( - MergeTreeIndexGranularity & index_granularity_, MergeTreeIndexGranularityInfo & index_granularity_info_, - const IDataPartStorage & data_part_storage_, const std::string & any_column_file_name) + MergeTreeIndexGranularityPtr & index_granularity_ptr, + MergeTreeIndexGranularityInfo & index_granularity_info_, + const IDataPartStorage & data_part_storage_, + const std::string & any_column_file_name, + const MergeTreeSettings & storage_settings) { index_granularity_info_.changeGranularityIfRequired(data_part_storage_); @@ -127,12 +137,13 @@ void MergeTreeDataPartWide::loadIndexGranularityImpl( std::string(fs::path(data_part_storage_.getFullPath()) / marks_file_path)); size_t marks_file_size = data_part_storage_.getFileSize(marks_file_path); + size_t fixed_granularity = index_granularity_info_.fixed_index_granularity; if (!index_granularity_info_.mark_type.adaptive && !index_granularity_info_.mark_type.compressed) { /// The most easy way - no need to read the file, everything is known from its size. size_t marks_count = marks_file_size / index_granularity_info_.getMarkSizeInBytes(); - index_granularity_.resizeWithFixedGranularity(marks_count, index_granularity_info_.fixed_index_granularity); /// all the same + index_granularity_ptr = std::make_shared(fixed_granularity, fixed_granularity, marks_count, false); } else { @@ -145,6 +156,7 @@ void MergeTreeDataPartWide::loadIndexGranularityImpl( marks_reader = std::make_unique(std::move(marks_file)); size_t marks_count = 0; + while (!marks_reader->eof()) { MarkInCompressedFile mark; @@ -157,15 +169,20 @@ void MergeTreeDataPartWide::loadIndexGranularityImpl( if (index_granularity_info_.mark_type.adaptive) { readBinaryLittleEndian(granularity, *marks_reader); - index_granularity_.appendMark(granularity); + index_granularity_ptr->appendMark(granularity); } } if (!index_granularity_info_.mark_type.adaptive) - index_granularity_.resizeWithFixedGranularity(marks_count, index_granularity_info_.fixed_index_granularity); /// all the same + { + index_granularity_ptr = std::make_shared(fixed_granularity, fixed_granularity, marks_count, false); + } + else if (storage_settings[MergeTreeSetting::enable_index_granularity_compression]) + { + if (auto new_granularity_ptr = index_granularity_ptr->optimize()) + index_granularity_ptr = std::move(new_granularity_ptr); + } } - - index_granularity_.setInitialized(); } void MergeTreeDataPartWide::loadIndexGranularity() @@ -179,7 +196,7 @@ void MergeTreeDataPartWide::loadIndexGranularity() "There are no files for column {} in part {}", columns.front().name, getDataPartStorage().getFullPath()); - loadIndexGranularityImpl(index_granularity, index_granularity_info, getDataPartStorage(), *any_column_filename); + loadIndexGranularityImpl(index_granularity, index_granularity_info, getDataPartStorage(), *any_column_filename, *storage.getSettings()); } void MergeTreeDataPartWide::loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const @@ -209,7 +226,7 @@ void MergeTreeDataPartWide::loadMarksToCache(const Names & column_names, MarkCac info_for_read, mark_cache, index_granularity_info.getMarksFilePath(*stream_name), - index_granularity.getMarksCount(), + index_granularity->getMarksCount(), index_granularity_info, /*save_marks_in_cache=*/ true, read_settings, diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.h b/src/Storages/MergeTree/MergeTreeDataPartWide.h index a6d4897ed87..2af47b7b943 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.h @@ -55,8 +55,11 @@ public: protected: static void loadIndexGranularityImpl( - MergeTreeIndexGranularity & index_granularity_, MergeTreeIndexGranularityInfo & index_granularity_info_, - const IDataPartStorage & data_part_storage_, const std::string & any_column_file_name); + MergeTreeIndexGranularityPtr & index_granularity_ptr, + MergeTreeIndexGranularityInfo & index_granularity_info_, + const IDataPartStorage & data_part_storage_, + const std::string & any_column_file_name, + const MergeTreeSettings & storage_settings); void doCheckConsistency(bool require_part_metadata) const override; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index c8d11ced683..759a2d4b6a5 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -25,13 +25,13 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact( const String & marks_file_extension_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & settings_, - const MergeTreeIndexGranularity & index_granularity_) + MergeTreeIndexGranularityPtr index_granularity_) : MergeTreeDataPartWriterOnDisk( data_part_name_, logger_name_, serializations_, data_part_storage_, index_granularity_info_, storage_settings_, columns_list_, metadata_snapshot_, virtual_columns_, indices_to_recalc_, stats_to_recalc, marks_file_extension_, - default_codec_, settings_, index_granularity_) + default_codec_, settings_, std::move(index_granularity_)) , plain_file(getDataPartStorage().writeFile( MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION, settings.max_compress_block_size, @@ -189,13 +189,13 @@ void MergeTreeDataPartWriterCompact::write(const Block & block, const IColumn::P header = result_block.cloneEmpty(); columns_buffer.add(result_block.mutateColumns()); - size_t current_mark_rows = index_granularity.getMarkRows(getCurrentMark()); + size_t current_mark_rows = index_granularity->getMarkRows(getCurrentMark()); size_t rows_in_buffer = columns_buffer.size(); if (rows_in_buffer >= current_mark_rows) { Block flushed_block = header.cloneWithColumns(columns_buffer.releaseColumns()); - auto granules_to_write = getGranulesToWrite(index_granularity, flushed_block.rows(), getCurrentMark(), /* last_block = */ false); + auto granules_to_write = getGranulesToWrite(*index_granularity, flushed_block.rows(), getCurrentMark(), /* last_block = */ false); writeDataBlockPrimaryIndexAndSkipIndices(flushed_block, granules_to_write); setCurrentMark(getCurrentMark() + granules_to_write.size()); calculateAndSerializeStatistics(flushed_block); @@ -274,12 +274,11 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(MergeTreeDataPartChecksum if (columns_buffer.size() != 0) { auto block = header.cloneWithColumns(columns_buffer.releaseColumns()); - auto granules_to_write = getGranulesToWrite(index_granularity, block.rows(), getCurrentMark(), /* last_block = */ true); + auto granules_to_write = getGranulesToWrite(*index_granularity, block.rows(), getCurrentMark(), /*last_block=*/ true); if (!granules_to_write.back().is_complete) { /// Correct last mark as it should contain exact amount of rows. - index_granularity.popMark(); - index_granularity.appendMark(granules_to_write.back().rows_to_write); + index_granularity->adjustLastMark(granules_to_write.back().rows_to_write); } writeDataBlockPrimaryIndexAndSkipIndices(block, granules_to_write); } @@ -375,11 +374,11 @@ static void fillIndexGranularityImpl( void MergeTreeDataPartWriterCompact::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) { size_t index_offset = 0; - if (index_granularity.getMarksCount() > getCurrentMark()) - index_offset = index_granularity.getMarkRows(getCurrentMark()) - columns_buffer.size(); + if (index_granularity->getMarksCount() > getCurrentMark()) + index_offset = index_granularity->getMarkRows(getCurrentMark()) - columns_buffer.size(); fillIndexGranularityImpl( - index_granularity, + *index_granularity, index_offset, index_granularity_for_block, rows_in_block); diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h index b3e2e78491d..db2a362a1df 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h @@ -25,7 +25,7 @@ public: const String & marks_file_extension, const CompressionCodecPtr & default_codec, const MergeTreeWriterSettings & settings, - const MergeTreeIndexGranularity & index_granularity); + MergeTreeIndexGranularityPtr index_granularity_); void write(const Block & block, const IColumn::Permutation * permutation) override; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index c483d47fed7..1f913623fc9 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -162,20 +162,20 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk( const String & marks_file_extension_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & settings_, - const MergeTreeIndexGranularity & index_granularity_) + MergeTreeIndexGranularityPtr index_granularity_) : IMergeTreeDataPartWriter( data_part_name_, serializations_, data_part_storage_, index_granularity_info_, - storage_settings_, columns_list_, metadata_snapshot_, virtual_columns_, settings_, index_granularity_) + storage_settings_, columns_list_, metadata_snapshot_, virtual_columns_, settings_, std::move(index_granularity_)) , skip_indices(indices_to_recalc_) , stats(stats_to_recalc_) , marks_file_extension(marks_file_extension_) , default_codec(default_codec_) - , compute_granularity(index_granularity.empty()) + , compute_granularity(index_granularity->empty()) , compress_primary_key(settings.compress_primary_key) , execution_stats(skip_indices.size(), stats.size()) , log(getLogger(logger_name_ + " (DataPartWriter)")) { - if (settings.blocks_are_granules_size && !index_granularity.empty()) + if (settings.blocks_are_granules_size && !index_granularity->empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't take information about index granularity from blocks, when non empty index_granularity array specified"); @@ -189,63 +189,15 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk( initStatistics(); } -// Implementation is split into static functions for ability -/// of making unit tests without creation instance of IMergeTreeDataPartWriter, -/// which requires a lot of dependencies and access to filesystem. -static size_t computeIndexGranularityImpl( - const Block & block, - size_t index_granularity_bytes, - size_t fixed_index_granularity_rows, - bool blocks_are_granules, - bool can_use_adaptive_index_granularity) -{ - size_t rows_in_block = block.rows(); - size_t index_granularity_for_block; - - if (!can_use_adaptive_index_granularity) - { - index_granularity_for_block = fixed_index_granularity_rows; - } - else - { - size_t block_size_in_memory = block.bytes(); - if (blocks_are_granules) - { - index_granularity_for_block = rows_in_block; - } - else if (block_size_in_memory >= index_granularity_bytes) - { - size_t granules_in_block = block_size_in_memory / index_granularity_bytes; - index_granularity_for_block = rows_in_block / granules_in_block; - } - else - { - size_t size_of_row_in_bytes = std::max(block_size_in_memory / rows_in_block, 1UL); - index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes; - } - } - - /// We should be less or equal than fixed index granularity. - /// But if block size is a granule size then do not adjust it. - /// Granularity greater than fixed granularity might come from compact part. - if (!blocks_are_granules) - index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block); - - /// Very rare case when index granularity bytes less than single row. - if (index_granularity_for_block == 0) - index_granularity_for_block = 1; - - return index_granularity_for_block; -} - size_t MergeTreeDataPartWriterOnDisk::computeIndexGranularity(const Block & block) const { - return computeIndexGranularityImpl( - block, - (*storage_settings)[MergeTreeSetting::index_granularity_bytes], - (*storage_settings)[MergeTreeSetting::index_granularity], - settings.blocks_are_granules_size, - settings.can_use_adaptive_granularity); + return DB::computeIndexGranularity( + block.rows(), + block.bytes(), + (*storage_settings)[MergeTreeSetting::index_granularity_bytes], + (*storage_settings)[MergeTreeSetting::index_granularity], + settings.blocks_are_granules_size, + settings.can_use_adaptive_granularity); } void MergeTreeDataPartWriterOnDisk::initPrimaryIndex() @@ -433,7 +385,7 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat { bool write_final_mark = (with_final_mark && data_written); if (write_final_mark && compute_granularity) - index_granularity.appendMark(0); + index_granularity->appendMark(0); if (index_file_hashing_stream) { diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h index b22d58ba51e..b937ab79ebc 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h @@ -116,7 +116,7 @@ public: const String & marks_file_extension, const CompressionCodecPtr & default_codec, const MergeTreeWriterSettings & settings, - const MergeTreeIndexGranularity & index_granularity); + MergeTreeIndexGranularityPtr index_granularity_); void setWrittenOffsetColumns(WrittenOffsetColumns * written_offset_columns_) { diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 7c9724b1b75..11e83cba036 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -99,13 +99,13 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide( const String & marks_file_extension_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & settings_, - const MergeTreeIndexGranularity & index_granularity_) + MergeTreeIndexGranularityPtr index_granularity_) : MergeTreeDataPartWriterOnDisk( data_part_name_, logger_name_, serializations_, data_part_storage_, index_granularity_info_, storage_settings_, columns_list_, metadata_snapshot_, virtual_columns_, indices_to_recalc_, stats_to_recalc_, marks_file_extension_, - default_codec_, settings_, index_granularity_) + default_codec_, settings_, std::move(index_granularity_)) { if (settings.save_marks_in_cache) { @@ -238,8 +238,8 @@ void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_wri if (settings.can_use_adaptive_granularity && settings.blocks_are_granules_size) throw Exception(ErrorCodes::LOGICAL_ERROR, "Incomplete granules are not allowed while blocks are granules size. " "Mark number {} (rows {}), rows written in last mark {}, rows to write in last mark from block {} (from row {}), " - "total marks currently {}", last_granule.mark_number, index_granularity.getMarkRows(last_granule.mark_number), - rows_written_in_last_mark, last_granule.rows_to_write, last_granule.start_row, index_granularity.getMarksCount()); + "total marks currently {}", last_granule.mark_number, index_granularity->getMarkRows(last_granule.mark_number), + rows_written_in_last_mark, last_granule.rows_to_write, last_granule.start_row, index_granularity->getMarksCount()); /// Shift forward except last granule setCurrentMark(getCurrentMark() + granules_written.size() - 1); @@ -273,10 +273,15 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm /// but not in case of vertical part of vertical merge) if (compute_granularity) { - size_t index_granularity_for_block = computeIndexGranularity(block_to_write); + size_t index_granularity_for_block; + if (auto constant_granularity = index_granularity->getConstantGranularity()) + index_granularity_for_block = *constant_granularity; + else + index_granularity_for_block = computeIndexGranularity(block_to_write); + if (rows_written_in_last_mark > 0) { - size_t rows_left_in_last_mark = index_granularity.getMarkRows(getCurrentMark()) - rows_written_in_last_mark; + size_t rows_left_in_last_mark = index_granularity->getMarkRows(getCurrentMark()) - rows_written_in_last_mark; /// Previous granularity was much bigger than our new block's /// granularity let's adjust it, because we want add new /// heavy-weight blocks into small old granule. @@ -294,7 +299,7 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm fillIndexGranularity(index_granularity_for_block, block_to_write.rows()); } - auto granules_to_write = getGranulesToWrite(index_granularity, block_to_write.rows(), getCurrentMark(), rows_written_in_last_mark); + auto granules_to_write = getGranulesToWrite(*index_granularity, block_to_write.rows(), getCurrentMark(), rows_written_in_last_mark); auto offset_columns = written_offset_columns ? *written_offset_columns : WrittenOffsetColumns{}; Block primary_key_block; @@ -482,7 +487,7 @@ void MergeTreeDataPartWriterWide::writeColumn( throw Exception(ErrorCodes::LOGICAL_ERROR, "We have to add new mark for column, but already have non written mark. " "Current mark {}, total marks {}, offset {}", - getCurrentMark(), index_granularity.getMarksCount(), rows_written_in_last_mark); + getCurrentMark(), index_granularity->getMarksCount(), rows_written_in_last_mark); last_non_written_marks[name] = getCurrentMarksForColumn(name_and_type, column.getPtr(), offset_columns); } @@ -502,7 +507,7 @@ void MergeTreeDataPartWriterWide::writeColumn( throw Exception(ErrorCodes::LOGICAL_ERROR, "No mark was saved for incomplete granule for column {}", backQuoteIfNeed(name)); for (const auto & mark : marks_it->second) - flushMarkToFile(mark, index_granularity.getMarkRows(granule.mark_number)); + flushMarkToFile(mark, index_granularity->getMarkRows(granule.mark_number)); last_non_written_marks.erase(marks_it); } } @@ -549,10 +554,10 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai for (mark_num = 0; !mrk_in->eof(); ++mark_num) { - if (mark_num > index_granularity.getMarksCount()) + if (mark_num > index_granularity->getMarksCount()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect number of marks in memory {}, on disk (at least) {}", - index_granularity.getMarksCount(), mark_num + 1); + index_granularity->getMarksCount(), mark_num + 1); readBinaryLittleEndian(offset_in_compressed_file, *mrk_in); readBinaryLittleEndian(offset_in_decompressed_block, *mrk_in); @@ -583,10 +588,10 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai throw Exception(ErrorCodes::LOGICAL_ERROR, "Still have {} rows in bin stream, last mark #{}" " index granularity size {}, last rows {}", - column->size(), mark_num, index_granularity.getMarksCount(), index_granularity_rows); + column->size(), mark_num, index_granularity->getMarksCount(), index_granularity_rows); } - if (index_granularity_rows != index_granularity.getMarkRows(mark_num)) + if (index_granularity_rows != index_granularity->getMarkRows(mark_num)) { throw Exception( ErrorCodes::LOGICAL_ERROR, @@ -594,8 +599,8 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai " (compressed offset {}, decompressed offset {}), in-memory {}, on disk {}, total marks {}", getDataPartStorage().getFullPath(), mark_num, offset_in_compressed_file, offset_in_decompressed_block, - index_granularity.getMarkRows(mark_num), index_granularity_rows, - index_granularity.getMarksCount()); + index_granularity->getMarkRows(mark_num), index_granularity_rows, + index_granularity->getMarksCount()); } auto column = type->createColumn(); @@ -630,7 +635,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for mark #{} (compressed offset {}, decompressed offset {}), " "actually in bin file {}, in mrk file {}, total marks {}", mark_num, offset_in_compressed_file, offset_in_decompressed_block, column->size(), - index_granularity.getMarkRows(mark_num), index_granularity.getMarksCount()); + index_granularity->getMarkRows(mark_num), index_granularity->getMarksCount()); } } @@ -638,7 +643,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai throw Exception(ErrorCodes::LOGICAL_ERROR, "Still have something in marks stream, last mark #{}" " index granularity size {}, last rows {}", - mark_num, index_granularity.getMarksCount(), index_granularity_rows); + mark_num, index_granularity->getMarksCount(), index_granularity_rows); if (!bin_in.eof()) { auto column = type->createColumn(); @@ -648,7 +653,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai throw Exception(ErrorCodes::LOGICAL_ERROR, "Still have {} rows in bin stream, last mark #{}" " index granularity size {}, last rows {}", - column->size(), mark_num, index_granularity.getMarksCount(), index_granularity_rows); + column->size(), mark_num, index_granularity->getMarksCount(), index_granularity_rows); } } @@ -665,8 +670,8 @@ void MergeTreeDataPartWriterWide::fillDataChecksums(MergeTreeDataPartChecksums & throw Exception(ErrorCodes::LOGICAL_ERROR, "Incomplete granule is not allowed while blocks are granules size even for last granule. " "Mark number {} (rows {}), rows written for last mark {}, total marks {}", - getCurrentMark(), index_granularity.getMarkRows(getCurrentMark()), - rows_written_in_last_mark, index_granularity.getMarksCount()); + getCurrentMark(), index_granularity->getMarkRows(getCurrentMark()), + rows_written_in_last_mark, index_granularity->getMarksCount()); adjustLastMarkIfNeedAndFlushToDisk(rows_written_in_last_mark); } @@ -785,16 +790,16 @@ static void fillIndexGranularityImpl( void MergeTreeDataPartWriterWide::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) { - if (getCurrentMark() < index_granularity.getMarksCount() && getCurrentMark() != index_granularity.getMarksCount() - 1) + if (getCurrentMark() < index_granularity->getMarksCount() && getCurrentMark() != index_granularity->getMarksCount() - 1) throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to add marks, while current mark {}, but total marks {}", - getCurrentMark(), index_granularity.getMarksCount()); + getCurrentMark(), index_granularity->getMarksCount()); size_t index_offset = 0; if (rows_written_in_last_mark != 0) - index_offset = index_granularity.getLastMarkRows() - rows_written_in_last_mark; + index_offset = index_granularity->getLastMarkRows() - rows_written_in_last_mark; fillIndexGranularityImpl( - index_granularity, + *index_granularity, index_offset, index_granularity_for_block, rows_in_block); @@ -813,27 +818,26 @@ void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk(size_t new_ /// other columns if (compute_granularity && settings.can_use_adaptive_granularity) { - if (getCurrentMark() != index_granularity.getMarksCount() - 1) + if (getCurrentMark() != index_granularity->getMarksCount() - 1) throw Exception(ErrorCodes::LOGICAL_ERROR, "Non last mark {} (with {} rows) having rows offset {}, total marks {}", - getCurrentMark(), index_granularity.getMarkRows(getCurrentMark()), - rows_written_in_last_mark, index_granularity.getMarksCount()); + getCurrentMark(), index_granularity->getMarkRows(getCurrentMark()), + rows_written_in_last_mark, index_granularity->getMarksCount()); - index_granularity.popMark(); - index_granularity.appendMark(new_rows_in_last_mark); + index_granularity->adjustLastMark(new_rows_in_last_mark); } /// Last mark should be filled, otherwise it's a bug if (last_non_written_marks.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "No saved marks for last mark {} having rows offset {}, total marks {}", - getCurrentMark(), rows_written_in_last_mark, index_granularity.getMarksCount()); + getCurrentMark(), rows_written_in_last_mark, index_granularity->getMarksCount()); if (rows_written_in_last_mark == new_rows_in_last_mark) { for (const auto & [name, marks] : last_non_written_marks) { for (const auto & mark : marks) - flushMarkToFile(mark, index_granularity.getMarkRows(getCurrentMark())); + flushMarkToFile(mark, index_granularity->getMarkRows(getCurrentMark())); } last_non_written_marks.clear(); diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index 19304b28c6c..409c7c7fb52 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -35,7 +35,7 @@ public: const String & marks_file_extension, const CompressionCodecPtr & default_codec, const MergeTreeWriterSettings & settings, - const MergeTreeIndexGranularity & index_granularity); + MergeTreeIndexGranularityPtr index_granularity_); void write(const Block & block, const IColumn::Permutation * permutation) override; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index c4ca545ca90..52ea6db787d 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -129,7 +129,7 @@ size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead( { MarkRanges part_ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, {}, &exact_ranges, settings, log); for (const auto & range : part_ranges) - rows_count += part->index_granularity.getRowsCountInRange(range); + rows_count += part->index_granularity->getRowsCountInRange(range); } UNUSED(exact_ranges); @@ -688,7 +688,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd auto & part = parts[part_index]; RangesInDataPart ranges(part, part_index); - size_t total_marks_count = part->index_granularity.getMarksCountWithoutFinal(); + size_t total_marks_count = part->index_granularity->getMarksCountWithoutFinal(); if (metadata_snapshot->hasPrimaryKey() || part_offset_condition) { @@ -1044,11 +1044,11 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange( { MarkRanges res; - size_t marks_count = part->index_granularity.getMarksCount(); + size_t marks_count = part->index_granularity->getMarksCount(); if (marks_count == 0) return res; - bool has_final_mark = part->index_granularity.hasFinalMark(); + bool has_final_mark = part->index_granularity->hasFinalMark(); bool key_condition_useful = !key_condition.alwaysUnknownOrTrue(); bool part_offset_condition_useful = part_offset_condition && !part_offset_condition->alwaysUnknownOrTrue(); @@ -1160,16 +1160,16 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange( auto check_part_offset_condition = [&]() { - auto begin = part->index_granularity.getMarkStartingRow(range.begin); - auto end = part->index_granularity.getMarkStartingRow(range.end) - 1; + auto begin = part->index_granularity->getMarkStartingRow(range.begin); + auto end = part->index_granularity->getMarkStartingRow(range.end) - 1; if (begin > end) { /// Empty mark (final mark) return BoolMask(false, true); } - part_offset_left[0] = part->index_granularity.getMarkStartingRow(range.begin); - part_offset_right[0] = part->index_granularity.getMarkStartingRow(range.end) - 1; + part_offset_left[0] = part->index_granularity->getMarkStartingRow(range.begin); + part_offset_right[0] = part->index_granularity->getMarkStartingRow(range.end) - 1; part_offset_left[1] = part->name; part_offset_right[1] = part->name; @@ -1381,9 +1381,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex( part->index_granularity_info.fixed_index_granularity, part->index_granularity_info.index_granularity_bytes); - size_t marks_count = part->getMarksCount(); - size_t final_mark = part->index_granularity.hasFinalMark(); - size_t index_marks_count = (marks_count - final_mark + index_granularity - 1) / index_granularity; + size_t marks_count = part->index_granularity->getMarksCountWithoutFinal(); + size_t index_marks_count = (marks_count + index_granularity - 1) / index_granularity; MarkRanges index_ranges; for (const auto & range : ranges) @@ -1431,8 +1430,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex( for (auto row : rows) { - const MergeTreeIndexGranularity & merge_tree_index_granularity = part->index_granularity; - size_t num_marks = merge_tree_index_granularity.countMarksForRows(index_mark * index_granularity, row); + size_t num_marks = part->index_granularity->countMarksForRows(index_mark * index_granularity, row); MarkRange data_range( std::max(ranges[i].begin, (index_mark * index_granularity) + num_marks), @@ -1505,9 +1503,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingMergedIndex( part->index_granularity_info.fixed_index_granularity, part->index_granularity_info.index_granularity_bytes); - size_t marks_count = part->getMarksCount(); - size_t final_mark = part->index_granularity.hasFinalMark(); - size_t index_marks_count = (marks_count - final_mark + index_granularity - 1) / index_granularity; + size_t marks_count = part->index_granularity->getMarksCountWithoutFinal(); + size_t index_marks_count = (marks_count + index_granularity - 1) / index_granularity; std::vector> readers; for (const auto & index_helper : indices) @@ -1607,9 +1604,7 @@ void MergeTreeDataSelectExecutor::selectPartsToRead( continue; } - size_t num_granules = part->getMarksCount(); - if (num_granules && part->index_granularity.hasFinalMark()) - --num_granules; + size_t num_granules = part->index_granularity->getMarksCountWithoutFinal(); counters.num_initial_selected_parts += 1; counters.num_initial_selected_granules += num_granules; @@ -1676,9 +1671,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter( if (part->uuid != UUIDHelpers::Nil && ignored_part_uuids->has(part->uuid)) continue; - size_t num_granules = part->getMarksCount(); - if (num_granules && part->index_granularity.hasFinalMark()) - --num_granules; + size_t num_granules = part->index_granularity->getMarksCountWithoutFinal(); counters.num_initial_selected_parts += 1; counters.num_initial_selected_granules += num_granules; diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 6d19f45e2c4..cbec8862f94 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -687,6 +687,13 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl( auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0); bool save_marks_in_cache = (*data_settings)[MergeTreeSetting::prewarm_mark_cache] && data.getContext()->getMarkCache(); + auto index_granularity_ptr = createMergeTreeIndexGranularity( + block.rows(), + block.bytes(), + *data.getSettings(), + new_data_part->index_granularity_info, + /*blocks_are_granules=*/ false); + auto out = std::make_unique( new_data_part, metadata_snapshot, @@ -694,6 +701,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl( indices, statistics, compression_codec, + std::move(index_granularity_ptr), context->getCurrentTransaction() ? context->getCurrentTransaction()->tid : Tx::PrehistoricTID, /*reset_columns=*/ false, save_marks_in_cache, @@ -834,6 +842,13 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl( auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0); bool save_marks_in_cache = (*data.getSettings())[MergeTreeSetting::prewarm_mark_cache] && data.getContext()->getMarkCache(); + auto index_granularity_ptr = createMergeTreeIndexGranularity( + block.rows(), + block.bytes(), + *data.getSettings(), + new_data_part->index_granularity_info, + /*blocks_are_granules=*/ false); + auto out = std::make_unique( new_data_part, metadata_snapshot, @@ -842,6 +857,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl( /// TODO(hanfei): It should be helpful to write statistics for projection result. ColumnsStatistics{}, compression_codec, + std::move(index_granularity_ptr), Tx::PrehistoricTID, /*reset_columns=*/ false, save_marks_in_cache, diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularity.cpp b/src/Storages/MergeTree/MergeTreeIndexGranularity.cpp index bf0ba17d473..f2ed8fdeca8 100644 --- a/src/Storages/MergeTree/MergeTreeIndexGranularity.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexGranularity.cpp @@ -1,77 +1,23 @@ #include -#include +#include +#include +#include +#include #include - namespace DB { + namespace ErrorCodes { extern const int LOGICAL_ERROR; } -MergeTreeIndexGranularity::MergeTreeIndexGranularity(const std::vector & marks_rows_partial_sums_) - : marks_rows_partial_sums(marks_rows_partial_sums_) +namespace MergeTreeSetting { -} - -/// Rows after mark to next mark -size_t MergeTreeIndexGranularity::getMarkRows(size_t mark_index) const -{ - if (mark_index >= getMarksCount()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get non existing mark {}, while size is {}", mark_index, getMarksCount()); - if (mark_index == 0) - return marks_rows_partial_sums[0]; - return marks_rows_partial_sums[mark_index] - marks_rows_partial_sums[mark_index - 1]; -} - -size_t MergeTreeIndexGranularity::getMarkStartingRow(size_t mark_index) const -{ - if (mark_index == 0) - return 0; - return marks_rows_partial_sums[mark_index - 1]; -} - -size_t MergeTreeIndexGranularity::getMarksCount() const -{ - return marks_rows_partial_sums.size(); -} - -size_t MergeTreeIndexGranularity::getTotalRows() const -{ - if (marks_rows_partial_sums.empty()) - return 0; - return marks_rows_partial_sums.back(); -} - -void MergeTreeIndexGranularity::appendMark(size_t rows_count) -{ - if (marks_rows_partial_sums.empty()) - marks_rows_partial_sums.push_back(rows_count); - else - marks_rows_partial_sums.push_back(marks_rows_partial_sums.back() + rows_count); -} - -void MergeTreeIndexGranularity::addRowsToLastMark(size_t rows_count) -{ - if (marks_rows_partial_sums.empty()) - marks_rows_partial_sums.push_back(rows_count); - else - marks_rows_partial_sums.back() += rows_count; -} - -void MergeTreeIndexGranularity::popMark() -{ - if (!marks_rows_partial_sums.empty()) - marks_rows_partial_sums.pop_back(); -} - -size_t MergeTreeIndexGranularity::getRowsCountInRange(size_t begin, size_t end) const -{ - size_t subtrahend = 0; - if (begin != 0) - subtrahend = marks_rows_partial_sums[begin - 1]; - return marks_rows_partial_sums[end - 1] - subtrahend; + extern const MergeTreeSettingsUInt64 index_granularity; + extern const MergeTreeSettingsUInt64 index_granularity_bytes; + extern const MergeTreeSettingsBool use_const_adaptive_granularity; } size_t MergeTreeIndexGranularity::getRowsCountInRange(const MarkRange & range) const @@ -87,55 +33,118 @@ size_t MergeTreeIndexGranularity::getRowsCountInRanges(const MarkRanges & ranges return total; } -size_t MergeTreeIndexGranularity::countMarksForRows(size_t from_mark, size_t number_of_rows) const +size_t MergeTreeIndexGranularity::getMarksCountWithoutFinal() const { - size_t rows_before_mark = getMarkStartingRow(from_mark); - size_t last_row_pos = rows_before_mark + number_of_rows; - auto it = std::upper_bound(marks_rows_partial_sums.begin(), marks_rows_partial_sums.end(), last_row_pos); - size_t to_mark = it - marks_rows_partial_sums.begin(); - return to_mark - from_mark; + size_t total = getMarksCount(); + if (total == 0) + return total; + return total - hasFinalMark(); } -size_t MergeTreeIndexGranularity::countRowsForRows(size_t from_mark, size_t number_of_rows, size_t offset_in_rows) const +size_t MergeTreeIndexGranularity::getMarkStartingRow(size_t mark_index) const { - size_t rows_before_mark = getMarkStartingRow(from_mark); - size_t last_row_pos = rows_before_mark + offset_in_rows + number_of_rows; - auto it = std::upper_bound(marks_rows_partial_sums.begin(), marks_rows_partial_sums.end(), last_row_pos); - size_t to_mark = it - marks_rows_partial_sums.begin(); - - return getRowsCountInRange(from_mark, std::max(1UL, to_mark)) - offset_in_rows; + return getRowsCountInRange(0, mark_index); } -void MergeTreeIndexGranularity::resizeWithFixedGranularity(size_t size, size_t fixed_granularity) +size_t MergeTreeIndexGranularity::getLastMarkRows() const { - marks_rows_partial_sums.resize(size); + return getMarkRows(getMarksCount() - 1); +} - size_t prev = 0; - for (size_t i = 0; i < size; ++i) +size_t MergeTreeIndexGranularity::getLastNonFinalMarkRows() const +{ + size_t last_mark_rows = getMarkRows(getMarksCount() - 1); + if (last_mark_rows != 0) + return last_mark_rows; + return getMarkRows(getMarksCount() - 2); +} + +void MergeTreeIndexGranularity::addRowsToLastMark(size_t rows_count) +{ + if (hasFinalMark()) { - marks_rows_partial_sums[i] = fixed_granularity + prev; - prev = marks_rows_partial_sums[i]; + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add rows to final mark"); + } + else if (empty()) + { + appendMark(rows_count); + } + else + { + adjustLastMark(getLastMarkRows() + rows_count); } } -std::string MergeTreeIndexGranularity::describe() const +size_t computeIndexGranularity( + size_t rows, + size_t bytes_uncompressed, + size_t index_granularity_bytes, + size_t fixed_index_granularity_rows, + bool blocks_are_granules, + bool can_use_adaptive_index_granularity) { - return fmt::format("initialized: {}, marks_rows_partial_sums: [{}]", initialized, fmt::join(marks_rows_partial_sums, ", ")); + size_t index_granularity_for_block; + + if (!can_use_adaptive_index_granularity) + { + index_granularity_for_block = fixed_index_granularity_rows; + } + else + { + if (blocks_are_granules) + { + index_granularity_for_block = rows; + } + else if (bytes_uncompressed >= index_granularity_bytes) + { + size_t granules_in_block = bytes_uncompressed / index_granularity_bytes; + index_granularity_for_block = rows / granules_in_block; + } + else + { + size_t size_of_row_in_bytes = std::max(bytes_uncompressed / rows, 1UL); + index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes; + } + } + + /// We should be less or equal than fixed index granularity. + /// But if block size is a granule size then do not adjust it. + /// Granularity greater than fixed granularity might come from compact part. + if (!blocks_are_granules) + index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block); + + /// Very rare case when index granularity bytes less than single row. + if (index_granularity_for_block == 0) + index_granularity_for_block = 1; + + return index_granularity_for_block; } -void MergeTreeIndexGranularity::shrinkToFitInMemory() +MergeTreeIndexGranularityPtr createMergeTreeIndexGranularity( + size_t rows, + size_t bytes_uncompressed, + const MergeTreeSettings & settings, + const MergeTreeIndexGranularityInfo & info, + bool blocks_are_granules) { - marks_rows_partial_sums.shrink_to_fit(); + bool use_adaptive_granularity = info.mark_type.adaptive; + bool use_const_adaptive_granularity = settings[MergeTreeSetting::use_const_adaptive_granularity]; + bool is_compact_part = info.mark_type.part_type == MergeTreeDataPartType::Compact; + + /// Compact parts cannot work without adaptive granularity. + /// If part is empty create adaptive granularity because constant granularity doesn't support this corner case. + if (rows == 0 || blocks_are_granules || is_compact_part || (use_adaptive_granularity && !use_const_adaptive_granularity)) + return std::make_shared(); + + size_t computed_granularity = computeIndexGranularity( + rows, + bytes_uncompressed, + settings[MergeTreeSetting::index_granularity_bytes], + settings[MergeTreeSetting::index_granularity], + blocks_are_granules, + use_adaptive_granularity); + + return std::make_shared(computed_granularity); } -uint64_t MergeTreeIndexGranularity::getBytesSize() const -{ - return marks_rows_partial_sums.size() * sizeof(size_t); -} -uint64_t MergeTreeIndexGranularity::getBytesAllocated() const -{ - return marks_rows_partial_sums.capacity() * sizeof(size_t); -} - - } diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularity.h b/src/Storages/MergeTree/MergeTreeIndexGranularity.h index c616d2ac49a..cb2593b8ae6 100644 --- a/src/Storages/MergeTree/MergeTreeIndexGranularity.h +++ b/src/Storages/MergeTree/MergeTreeIndexGranularity.h @@ -1,35 +1,28 @@ #pragma once -#include +#include #include namespace DB { -/// Class contains information about index granularity in rows of IMergeTreeDataPart -/// Inside it contains vector of partial sums of rows after mark: -/// |-----|---|----|----| -/// | 5 | 8 | 12 | 16 | -/// If user doesn't specify setting index_granularity_bytes for MergeTree* table -/// all values in inner vector would have constant stride (default 8192). +/// Class that contains information about index granularity in rows of IMergeTreeDataPart class MergeTreeIndexGranularity { -private: - std::vector marks_rows_partial_sums; - bool initialized = false; - public: MergeTreeIndexGranularity() = default; - explicit MergeTreeIndexGranularity(const std::vector & marks_rows_partial_sums_); + virtual ~MergeTreeIndexGranularity() = default; + /// Returns granularity if it is constant for whole part (except last granule). + virtual std::optional getConstantGranularity() const = 0; + /// Return count of rows between marks + virtual size_t getRowsCountInRange(size_t begin, size_t end) const = 0; /// Return count of rows between marks size_t getRowsCountInRange(const MarkRange & range) const; - /// Return count of rows between marks - size_t getRowsCountInRange(size_t begin, size_t end) const; /// Return sum of rows between all ranges size_t getRowsCountInRanges(const MarkRanges & ranges) const; /// Return number of marks, starting from `from_marks` that contain `number_of_rows` - size_t countMarksForRows(size_t from_mark, size_t number_of_rows) const; + virtual size_t countMarksForRows(size_t from_mark, size_t number_of_rows) const = 0; /// Return number of rows, starting from `from_mark`, that contains amount of `number_of_rows` /// and possible some offset_in_rows from `from_mark` @@ -37,74 +30,65 @@ public: /// |-----|---------------------------|----|----| /// ^------------------------^-----------^ //// from_mark offset_in_rows number_of_rows - size_t countRowsForRows(size_t from_mark, size_t number_of_rows, size_t offset_in_rows) const; + virtual size_t countRowsForRows(size_t from_mark, size_t number_of_rows, size_t offset_in_rows) const = 0; /// Total marks - size_t getMarksCount() const; + virtual size_t getMarksCount() const = 0; /// Total rows - size_t getTotalRows() const; + virtual size_t getTotalRows() const = 0; /// Total number marks without final mark if it exists - size_t getMarksCountWithoutFinal() const { return getMarksCount() - hasFinalMark(); } + size_t getMarksCountWithoutFinal() const; /// Rows after mark to next mark - size_t getMarkRows(size_t mark_index) const; + virtual size_t getMarkRows(size_t mark_index) const = 0; /// Return amount of rows before mark size_t getMarkStartingRow(size_t mark_index) const; /// Amount of rows after last mark - size_t getLastMarkRows() const - { - size_t last = marks_rows_partial_sums.size() - 1; - return getMarkRows(last); - } + size_t getLastMarkRows() const; - size_t getLastNonFinalMarkRows() const - { - size_t last_mark_rows = getLastMarkRows(); - if (last_mark_rows != 0) - return last_mark_rows; - return getMarkRows(marks_rows_partial_sums.size() - 2); - } + /// Amount of rows after last non-final mark + size_t getLastNonFinalMarkRows() const; - bool hasFinalMark() const - { - return getLastMarkRows() == 0; - } + virtual bool hasFinalMark() const = 0; + bool empty() const { return getMarksCount() == 0; } - bool empty() const - { - return marks_rows_partial_sums.empty(); - } + /// Add new mark with rows_count. + virtual void appendMark(size_t rows_count) = 0; - bool isInitialized() const - { - return initialized; - } - - void setInitialized() - { - initialized = true; - } - /// Add new mark with rows_count - void appendMark(size_t rows_count); - - /// Extends last mark by rows_count. + /// Sets last mark equal to rows_count. + virtual void adjustLastMark(size_t rows_count) = 0; void addRowsToLastMark(size_t rows_count); - /// Drops last mark if any exists. - void popMark(); + virtual uint64_t getBytesSize() const = 0; + virtual uint64_t getBytesAllocated() const = 0; - /// Add `size` of marks with `fixed_granularity` rows - void resizeWithFixedGranularity(size_t size, size_t fixed_granularity); - - std::string describe() const; - - void shrinkToFitInMemory(); - - uint64_t getBytesSize() const; - uint64_t getBytesAllocated() const; + /// Possibly optimizes values in memory (for example, to constant value). + /// Returns new optimized index granularity structure or nullptr if no optimization is not applicable. + virtual std::shared_ptr optimize() = 0; + virtual std::string describe() const = 0; }; +using MergeTreeIndexGranularityPtr = std::shared_ptr; + +size_t computeIndexGranularity( + size_t rows, + size_t bytes_uncompressed, + size_t index_granularity_bytes, + size_t fixed_index_granularity_rows, + bool blocks_are_granules, + bool can_use_adaptive_index_granularity); + +struct MergeTreeSettings; +struct MergeTreeIndexGranularityInfo; + +MergeTreeIndexGranularityPtr createMergeTreeIndexGranularity( + size_t rows, + size_t bytes_uncompressed, + const MergeTreeSettings & settings, + const MergeTreeIndexGranularityInfo & info, + bool blocks_are_granules); + } diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityAdaptive.cpp b/src/Storages/MergeTree/MergeTreeIndexGranularityAdaptive.cpp new file mode 100644 index 00000000000..d51afb5be69 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityAdaptive.cpp @@ -0,0 +1,152 @@ +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +MergeTreeIndexGranularityAdaptive::MergeTreeIndexGranularityAdaptive(const std::vector & marks_rows_partial_sums_) + : marks_rows_partial_sums(marks_rows_partial_sums_) +{ +} + +/// Rows after mark to next mark +size_t MergeTreeIndexGranularityAdaptive::getMarkRows(size_t mark_index) const +{ + if (mark_index >= getMarksCount()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get non existing mark {}, while size is {}", mark_index, getMarksCount()); + + if (mark_index == 0) + return marks_rows_partial_sums[0]; + + return marks_rows_partial_sums[mark_index] - marks_rows_partial_sums[mark_index - 1]; +} + +bool MergeTreeIndexGranularityAdaptive::hasFinalMark() const +{ + if (marks_rows_partial_sums.empty()) + return false; + return getLastMarkRows() == 0; +} + +size_t MergeTreeIndexGranularityAdaptive::getMarksCount() const +{ + return marks_rows_partial_sums.size(); +} + +size_t MergeTreeIndexGranularityAdaptive::getTotalRows() const +{ + if (marks_rows_partial_sums.empty()) + return 0; + return marks_rows_partial_sums.back(); +} + +void MergeTreeIndexGranularityAdaptive::appendMark(size_t rows_count) +{ + if (hasFinalMark()) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot append mark after final"); + } + else if (marks_rows_partial_sums.empty()) + { + marks_rows_partial_sums.push_back(rows_count); + } + else + { + marks_rows_partial_sums.push_back(marks_rows_partial_sums.back() + rows_count); + } +} + +void MergeTreeIndexGranularityAdaptive::adjustLastMark(size_t rows_count) +{ + if (hasFinalMark()) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot adjust final mark"); + } + else if (marks_rows_partial_sums.empty()) + { + marks_rows_partial_sums.push_back(rows_count); + } + else + { + marks_rows_partial_sums.pop_back(); + appendMark(rows_count); + } +} + +size_t MergeTreeIndexGranularityAdaptive::getRowsCountInRange(size_t begin, size_t end) const +{ + if (end > getMarksCount()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get marks in range [{}; {}), while size is {}", begin, end, getMarksCount()); + + if (end == 0) + return 0; + + size_t subtrahend = 0; + if (begin != 0) + subtrahend = marks_rows_partial_sums[begin - 1]; + + return marks_rows_partial_sums[end - 1] - subtrahend; +} + +size_t MergeTreeIndexGranularityAdaptive::countMarksForRows(size_t from_mark, size_t number_of_rows) const +{ + size_t rows_before_mark = getMarkStartingRow(from_mark); + size_t last_row_pos = rows_before_mark + number_of_rows; + auto it = std::upper_bound(marks_rows_partial_sums.begin(), marks_rows_partial_sums.end(), last_row_pos); + size_t to_mark = it - marks_rows_partial_sums.begin(); + return to_mark - from_mark; +} + +size_t MergeTreeIndexGranularityAdaptive::countRowsForRows(size_t from_mark, size_t number_of_rows, size_t offset_in_rows) const +{ + size_t rows_before_mark = getMarkStartingRow(from_mark); + size_t last_row_pos = rows_before_mark + offset_in_rows + number_of_rows; + auto it = std::upper_bound(marks_rows_partial_sums.begin(), marks_rows_partial_sums.end(), last_row_pos); + size_t to_mark = it - marks_rows_partial_sums.begin(); + + return getRowsCountInRange(from_mark, std::max(1UL, to_mark)) - offset_in_rows; +} + +uint64_t MergeTreeIndexGranularityAdaptive::getBytesSize() const +{ + return marks_rows_partial_sums.size() * sizeof(size_t); +} + +uint64_t MergeTreeIndexGranularityAdaptive::getBytesAllocated() const +{ + return marks_rows_partial_sums.capacity() * sizeof(size_t); +} + +std::shared_ptr MergeTreeIndexGranularityAdaptive::optimize() +{ + size_t marks_count = getMarksCountWithoutFinal(); + if (marks_count == 0) + return nullptr; + + size_t first_mark = getMarkRows(0); + for (size_t i = 1; i < marks_count - 1; ++i) + { + if (getMarkRows(i) != first_mark) + { + /// We cannot optimize to constant but at least optimize memory usage. + marks_rows_partial_sums.shrink_to_fit(); + return nullptr; + } + } + + size_t last_mark = getMarkRows(marks_count - 1); + return std::make_shared(first_mark, last_mark, marks_count, hasFinalMark()); +} + +std::string MergeTreeIndexGranularityAdaptive::describe() const +{ + return fmt::format("Adaptive(marks_rows_partial_sums: [{}])", fmt::join(marks_rows_partial_sums, ", ")); +} + +} diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityAdaptive.h b/src/Storages/MergeTree/MergeTreeIndexGranularityAdaptive.h new file mode 100644 index 00000000000..c37417a2993 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityAdaptive.h @@ -0,0 +1,42 @@ +#pragma once +#include + +namespace DB +{ + +/// Class that stores adaptive index granularity. +/// Inside it contains vector of partial sums of rows after mark: +/// |-----|---|----|----| +/// | 5 | 8 | 12 | 16 | +class MergeTreeIndexGranularityAdaptive final : public MergeTreeIndexGranularity +{ +public: + MergeTreeIndexGranularityAdaptive() = default; + explicit MergeTreeIndexGranularityAdaptive(const std::vector & marks_rows_partial_sums_); + + std::optional getConstantGranularity() const override { return {}; } + size_t getRowsCountInRange(size_t begin, size_t end) const override; + size_t countMarksForRows(size_t from_mark, size_t number_of_rows) const override; + size_t countRowsForRows(size_t from_mark, size_t number_of_rows, size_t offset_in_rows) const override; + + size_t getMarksCount() const override; + size_t getTotalRows() const override; + + size_t getMarkRows(size_t mark_index) const override; + bool hasFinalMark() const override; + + void appendMark(size_t rows_count) override; + void adjustLastMark(size_t rows_count) override; + + uint64_t getBytesSize() const override; + uint64_t getBytesAllocated() const override; + + std::shared_ptr optimize() override; + std::string describe() const override; + +private: + std::vector marks_rows_partial_sums; +}; + +} + diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityConstant.cpp b/src/Storages/MergeTree/MergeTreeIndexGranularityConstant.cpp new file mode 100644 index 00000000000..ced1f075021 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityConstant.cpp @@ -0,0 +1,143 @@ +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +MergeTreeIndexGranularityConstant::MergeTreeIndexGranularityConstant(size_t constant_granularity_) + : constant_granularity(constant_granularity_) + , last_mark_granularity(constant_granularity_) +{ +} + +MergeTreeIndexGranularityConstant::MergeTreeIndexGranularityConstant(size_t constant_granularity_, size_t last_mark_granularity_, size_t num_marks_without_final_, bool has_final_mark_) + : constant_granularity(constant_granularity_) + , last_mark_granularity(last_mark_granularity_) + , num_marks_without_final(num_marks_without_final_) + , has_final_mark(has_final_mark_) +{ +} + +/// Rows after mark to next mark +size_t MergeTreeIndexGranularityConstant::getMarkRows(size_t mark_index) const +{ + if (mark_index >= getMarksCount()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get non existing mark {}, while size is {}", mark_index, getMarksCount()); + + if (mark_index + 1 < num_marks_without_final) + return constant_granularity; + + if (mark_index + 1 == num_marks_without_final) + return last_mark_granularity; + + return 0; // Final mark. +} + +size_t MergeTreeIndexGranularityConstant::getMarksCount() const +{ + return num_marks_without_final + has_final_mark; +} + +size_t MergeTreeIndexGranularityConstant::getTotalRows() const +{ + if (num_marks_without_final == 0) + return 0; + + return constant_granularity * (num_marks_without_final - 1) + last_mark_granularity; +} + +void MergeTreeIndexGranularityConstant::appendMark(size_t rows_count) +{ + if (has_final_mark) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot append mark after final"); + } + else if (rows_count == 0) + { + has_final_mark = true; + } + else if (rows_count != constant_granularity) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot append mark with {} rows. Granularity is constant ({})", rows_count, constant_granularity); + } + else + { + ++num_marks_without_final; + } +} + +void MergeTreeIndexGranularityConstant::adjustLastMark(size_t rows_count) +{ + if (has_final_mark) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot adjust final mark"); + } + else + { + if (num_marks_without_final == 0) + ++num_marks_without_final; + + last_mark_granularity = rows_count; + } +} + +size_t MergeTreeIndexGranularityConstant::getRowsCountInRange(size_t begin, size_t end) const +{ + if (end > getMarksCount()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get marks in range [{}; {}), while size is {}", begin, end, getMarksCount()); + + if (end == 0) + return 0; + + size_t total_rows = 0; + if (end >= num_marks_without_final) + { + total_rows += last_mark_granularity; + end = num_marks_without_final - 1; + } + + total_rows += constant_granularity * (end - begin); + return total_rows; +} + +size_t MergeTreeIndexGranularityConstant::getMarkUpperBoundForRow(size_t row_index) const +{ + size_t num_rows_with_constant_granularity = (num_marks_without_final - 1) * constant_granularity; + + if (row_index >= getTotalRows()) + return getMarksCount(); + + if (row_index >= num_rows_with_constant_granularity) + return num_marks_without_final - 1; + + return row_index / constant_granularity; +} + +size_t MergeTreeIndexGranularityConstant::countMarksForRows(size_t from_mark, size_t number_of_rows) const +{ + size_t rows_before_mark = getMarkStartingRow(from_mark); + size_t last_row_pos = rows_before_mark + number_of_rows; + + return getMarkUpperBoundForRow(last_row_pos) - from_mark; +} + +size_t MergeTreeIndexGranularityConstant::countRowsForRows(size_t from_mark, size_t number_of_rows, size_t offset_in_rows) const +{ + size_t rows_before_mark = getMarkStartingRow(from_mark); + size_t last_row_pos = rows_before_mark + offset_in_rows + number_of_rows; + + return getRowsCountInRange(from_mark, std::max(1UL, getMarkUpperBoundForRow(last_row_pos))) - offset_in_rows; +} + +std::string MergeTreeIndexGranularityConstant::describe() const +{ + return fmt::format( + "Constant(constant_granularity: {}, last_mark_granularity: {}, num_marks_without_final: {}, has_final_mark: {})", + constant_granularity, last_mark_granularity, num_marks_without_final, has_final_mark); +} + +} diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityConstant.h b/src/Storages/MergeTree/MergeTreeIndexGranularityConstant.h new file mode 100644 index 00000000000..a670d168ddd --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityConstant.h @@ -0,0 +1,47 @@ +#pragma once +#include + +namespace DB +{ + +/// Class that stores constant index granularity for whole part, except +/// last non-zero granule and final granule which always has zero rows. +class MergeTreeIndexGranularityConstant final : public MergeTreeIndexGranularity +{ +private: + size_t constant_granularity; + size_t last_mark_granularity; + + size_t num_marks_without_final = 0; + bool has_final_mark = false; + + size_t getMarkUpperBoundForRow(size_t row_index) const; + +public: + MergeTreeIndexGranularityConstant() = default; + explicit MergeTreeIndexGranularityConstant(size_t constant_granularity_); + MergeTreeIndexGranularityConstant(size_t constant_granularity_, size_t last_mark_granularity_, size_t num_marks_without_final_, bool has_final_mark_); + + std::optional getConstantGranularity() const override { return constant_granularity; } + size_t getRowsCountInRange(size_t begin, size_t end) const override; + size_t countMarksForRows(size_t from_mark, size_t number_of_rows) const override; + size_t countRowsForRows(size_t from_mark, size_t number_of_rows, size_t offset_in_rows) const override; + + size_t getMarksCount() const override; + size_t getTotalRows() const override; + + size_t getMarkRows(size_t mark_index) const override; + bool hasFinalMark() const override { return has_final_mark; } + + void appendMark(size_t rows_count) override; + void adjustLastMark(size_t rows_count) override; + + uint64_t getBytesSize() const override { return sizeof(size_t) * 3 + sizeof(bool); } + uint64_t getBytesAllocated() const override { return getBytesSize(); } + + std::shared_ptr optimize() override { return nullptr; } + std::string describe() const override; +}; + +} + diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h index b302d6b1a4b..62632b683ae 100644 --- a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h @@ -4,12 +4,12 @@ #include #include #include -#include namespace DB { class MergeTreeData; +class IDataPartStorage; /** Various types of mark files are stored in files with various extensions: diff --git a/src/Storages/MergeTree/MergeTreeReadTask.cpp b/src/Storages/MergeTree/MergeTreeReadTask.cpp index 72fddb93a6d..a2303ee0899 100644 --- a/src/Storages/MergeTree/MergeTreeReadTask.cpp +++ b/src/Storages/MergeTree/MergeTreeReadTask.cpp @@ -151,7 +151,7 @@ UInt64 MergeTreeReadTask::estimateNumRows() const return rows_to_read; const auto & index_granularity = info->data_part->index_granularity; - return index_granularity.countRowsForRows(range_readers.main.currentMark(), rows_to_read, range_readers.main.numReadRowsInCurrentGranule()); + return index_granularity->countRowsForRows(range_readers.main.currentMark(), rows_to_read, range_readers.main.numReadRowsInCurrentGranule()); } MergeTreeReadTask::BlockAndProgress MergeTreeReadTask::read() diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index 83714d4e197..81497032b08 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -230,7 +230,7 @@ try if (!isCancelled() && current_row < data_part->rows_count) { - size_t rows_to_read = data_part->index_granularity.getMarkRows(current_mark); + size_t rows_to_read = data_part->index_granularity->getMarkRows(current_mark); bool continue_reading = (current_mark != 0); const auto & sample = reader->getColumns(); diff --git a/src/Storages/MergeTree/MergeTreeSettings.cpp b/src/Storages/MergeTree/MergeTreeSettings.cpp index fcd4e05cf00..28ae933c3de 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.cpp +++ b/src/Storages/MergeTree/MergeTreeSettings.cpp @@ -187,6 +187,8 @@ namespace ErrorCodes DECLARE(UInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).", 0) \ DECLARE(UInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).", 0) \ DECLARE(UInt64, min_index_granularity_bytes, 1024, "Minimum amount of bytes in single granule.", 1024) \ + DECLARE(Bool, use_const_adaptive_granularity, false, "Always use constant granularity for whole part. It allows to compress in memory values of index granularity. It can be useful in extremely large workloads with thin tables.", 0) \ + DECLARE(Bool, enable_index_granularity_compression, true, "Compress in memory values of index granularity if it is possible", 0) \ DECLARE(Int64, merge_with_ttl_timeout, 3600 * 4, "Minimal time in seconds, when merge with delete TTL can be repeated.", 0) \ DECLARE(Int64, merge_with_recompression_ttl_timeout, 3600 * 4, "Minimal time in seconds, when merge with recompression TTL can be repeated.", 0) \ DECLARE(Bool, ttl_only_drop_parts, false, "Only drop altogether the expired parts and not partially prune them.", 0) \ diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 14a521ce429..9fa8b279e86 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -1,9 +1,9 @@ #include +#include #include #include #include #include -#include #include @@ -15,6 +15,10 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +namespace MergeTreeSetting +{ + extern const MergeTreeSettingsBool enable_index_granularity_compression; +} MergedBlockOutputStream::MergedBlockOutputStream( const MergeTreeMutableDataPartPtr & data_part, @@ -23,12 +27,12 @@ MergedBlockOutputStream::MergedBlockOutputStream( const MergeTreeIndices & skip_indices, const ColumnsStatistics & statistics, CompressionCodecPtr default_codec_, + MergeTreeIndexGranularityPtr index_granularity_ptr, TransactionID tid, bool reset_columns_, bool save_marks_in_cache, bool blocks_are_granules_size, - const WriteSettings & write_settings_, - const MergeTreeIndexGranularity & computed_index_granularity) + const WriteSettings & write_settings_) : IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, columns_list_, reset_columns_) , columns_list(columns_list_) , default_codec(default_codec_) @@ -53,11 +57,22 @@ MergedBlockOutputStream::MergedBlockOutputStream( data_part->storeVersionMetadata(); writer = createMergeTreeDataPartWriter(data_part->getType(), - data_part->name, data_part->storage.getLogName(), data_part->getSerializations(), - data_part_storage, data_part->index_granularity_info, - storage_settings, - columns_list, data_part->getColumnPositions(), metadata_snapshot, data_part->storage.getVirtualsPtr(), - skip_indices, statistics, data_part->getMarksFileExtension(), default_codec, writer_settings, computed_index_granularity); + data_part->name, + data_part->storage.getLogName(), + data_part->getSerializations(), + data_part_storage, + data_part->index_granularity_info, + storage_settings, + columns_list, + data_part->getColumnPositions(), + metadata_snapshot, + data_part->storage.getVirtualsPtr(), + skip_indices, + statistics, + data_part->getMarksFileExtension(), + default_codec, + writer_settings, + std::move(index_granularity_ptr)); } /// If data is pre-sorted. @@ -207,10 +222,14 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync( new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk()); new_part->setBytesUncompressedOnDisk(checksums.getTotalSizeUncompressedOnDisk()); new_part->index_granularity = writer->getIndexGranularity(); - /// Just in case - new_part->index_granularity.shrinkToFitInMemory(); new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(writer->getColumnsSample()); + if ((*new_part->storage.getSettings())[MergeTreeSetting::enable_index_granularity_compression]) + { + if (auto new_index_granularity = new_part->index_granularity->optimize()) + new_part->index_granularity = std::move(new_index_granularity); + } + /// In mutation, existing_rows_count is already calculated in PartMergerWriter /// In merge situation, lightweight deleted rows was physically deleted, existing_rows_count equals rows_count if (!new_part->existing_rows_count.has_value()) diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.h b/src/Storages/MergeTree/MergedBlockOutputStream.h index 060778866e0..94bdb2c491b 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.h +++ b/src/Storages/MergeTree/MergedBlockOutputStream.h @@ -22,12 +22,12 @@ public: const MergeTreeIndices & skip_indices, const ColumnsStatistics & statistics, CompressionCodecPtr default_codec_, + MergeTreeIndexGranularityPtr index_granularity_ptr, TransactionID tid, bool reset_columns_ = false, bool save_marks_in_cache = false, bool blocks_are_granules_size = false, - const WriteSettings & write_settings = {}, - const MergeTreeIndexGranularity & computed_index_granularity = {}); + const WriteSettings & write_settings = {}); Block getHeader() const { return metadata_snapshot->getSampleBlock(); } diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp index bed539dfe02..4163bceeab7 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp @@ -15,25 +15,25 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( const MergeTreeMutableDataPartPtr & data_part, const StorageMetadataPtr & metadata_snapshot_, const NamesAndTypesList & columns_list_, - CompressionCodecPtr default_codec, const MergeTreeIndices & indices_to_recalc, - const ColumnsStatistics & stats_to_recalc_, - WrittenOffsetColumns * offset_columns_, - bool save_marks_in_cache, - const MergeTreeIndexGranularity & index_granularity, - const MergeTreeIndexGranularityInfo * index_granularity_info) + const ColumnsStatistics & stats_to_recalc, + CompressionCodecPtr default_codec, + MergeTreeIndexGranularityPtr index_granularity_ptr, + WrittenOffsetColumns * offset_columns, + bool save_marks_in_cache) : IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, columns_list_, /*reset_columns=*/ true) { const auto & global_settings = data_part->storage.getContext()->getSettingsRef(); + /// Granularity is never recomputed while writing only columns. MergeTreeWriterSettings writer_settings( global_settings, data_part->storage.getContext()->getWriteSettings(), storage_settings, - index_granularity_info ? index_granularity_info->mark_type.adaptive : data_part->storage.canUseAdaptiveGranularity(), - /* rewrite_primary_key = */ false, + data_part->index_granularity_info.mark_type.adaptive, + /*rewrite_primary_key=*/ false, save_marks_in_cache, - /* blocks_are_granules_size = */ false); + /*blocks_are_granules_size=*/ false); writer = createMergeTreeDataPartWriter( data_part->getType(), @@ -45,17 +45,17 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( metadata_snapshot_, data_part->storage.getVirtualsPtr(), indices_to_recalc, - stats_to_recalc_, + stats_to_recalc, data_part->getMarksFileExtension(), default_codec, writer_settings, - index_granularity); + std::move(index_granularity_ptr)); auto * writer_on_disk = dynamic_cast(writer.get()); if (!writer_on_disk) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MergedColumnOnlyOutputStream supports only parts stored on disk"); - writer_on_disk->setWrittenOffsetColumns(offset_columns_); + writer_on_disk->setWrittenOffsetColumns(offset_columns); } void MergedColumnOnlyOutputStream::write(const Block & block) diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h index f6bf9e37a58..0338273e96c 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h @@ -18,13 +18,12 @@ public: const MergeTreeMutableDataPartPtr & data_part, const StorageMetadataPtr & metadata_snapshot_, const NamesAndTypesList & columns_list_, - CompressionCodecPtr default_codec_, - const MergeTreeIndices & indices_to_recalc_, - const ColumnsStatistics & stats_to_recalc_, - WrittenOffsetColumns * offset_columns_ = nullptr, - bool save_marks_in_cache = false, - const MergeTreeIndexGranularity & index_granularity = {}, - const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr); + const MergeTreeIndices & indices_to_recalc, + const ColumnsStatistics & stats_to_recalc, + CompressionCodecPtr default_codec, + MergeTreeIndexGranularityPtr index_granularity_ptr, + WrittenOffsetColumns * offset_columns = nullptr, + bool save_marks_in_cache = false); void write(const Block & block) override; diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 7f6588fc632..4d3910e2e94 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -74,6 +74,7 @@ namespace MergeTreeSetting extern const MergeTreeSettingsFloat ratio_of_defaults_for_sparse_serialization; extern const MergeTreeSettingsBool replace_long_file_name_to_hash; extern const MergeTreeSettingsBool ttl_only_drop_parts; + extern const MergeTreeSettingsBool enable_index_granularity_compression; } namespace ErrorCodes @@ -984,12 +985,16 @@ void finalizeMutatedPart( new_data_part->rows_count = source_part->rows_count; new_data_part->index_granularity = source_part->index_granularity; - /// Just in case - new_data_part->index_granularity.shrinkToFitInMemory(); new_data_part->setIndex(*source_part->getIndex()); new_data_part->minmax_idx = source_part->minmax_idx; new_data_part->modification_time = time(nullptr); + if ((*new_data_part->storage.getSettings())[MergeTreeSetting::enable_index_granularity_compression]) + { + if (auto new_index_granularity = new_data_part->index_granularity->optimize()) + new_data_part->index_granularity = std::move(new_index_granularity); + } + /// Load rest projections which are hardlinked bool noop; new_data_part->loadProjections(false, false, noop, true /* if_not_loaded */); @@ -1599,7 +1604,6 @@ private: ctx->minmax_idx = std::make_shared(); - MergeTreeIndexGranularity computed_granularity; bool has_delete = false; for (auto & command_for_interpreter : ctx->for_interpreter) @@ -1612,9 +1616,21 @@ private: } } + MergeTreeIndexGranularityPtr index_granularity_ptr; /// Reuse source part granularity if mutation does not change number of rows if (!has_delete && ctx->execute_ttl_type == ExecuteTTLType::NONE) - computed_granularity = ctx->source_part->index_granularity; + { + index_granularity_ptr = ctx->source_part->index_granularity; + } + else + { + index_granularity_ptr = createMergeTreeIndexGranularity( + ctx->new_data_part->rows_count, + ctx->new_data_part->getBytesUncompressedOnDisk(), + *ctx->data->getSettings(), + ctx->new_data_part->index_granularity_info, + /*blocks_are_granules=*/ false); + } ctx->out = std::make_shared( ctx->new_data_part, @@ -1623,12 +1639,12 @@ private: skip_indices, stats_to_rewrite, ctx->compression_codec, + std::move(index_granularity_ptr), ctx->txn ? ctx->txn->tid : Tx::PrehistoricTID, /*reset_columns=*/ true, /*save_marks_in_cache=*/ false, /*blocks_are_granules_size=*/ false, - ctx->context->getWriteSettings(), - computed_granularity); + ctx->context->getWriteSettings()); ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); ctx->mutating_pipeline.setProgressCallback(ctx->progress_callback); @@ -1850,14 +1866,10 @@ private: ctx->new_data_part, ctx->metadata_snapshot, ctx->updated_header.getNamesAndTypesList(), - ctx->compression_codec, std::vector(ctx->indices_to_recalc.begin(), ctx->indices_to_recalc.end()), ColumnsStatistics(ctx->stats_to_recalc.begin(), ctx->stats_to_recalc.end()), - nullptr, - /*save_marks_in_cache=*/ false, - ctx->source_part->index_granularity, - &ctx->source_part->index_granularity_info - ); + ctx->compression_codec, + ctx->source_part->index_granularity); ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); ctx->mutating_pipeline.setProgressCallback(ctx->progress_callback); diff --git a/src/Storages/MergeTree/RangesInDataPart.cpp b/src/Storages/MergeTree/RangesInDataPart.cpp index 50e0781b4e6..8256c20d81d 100644 --- a/src/Storages/MergeTree/RangesInDataPart.cpp +++ b/src/Storages/MergeTree/RangesInDataPart.cpp @@ -99,7 +99,7 @@ size_t RangesInDataPart::getMarksCount() const size_t RangesInDataPart::getRowsCount() const { - return data_part->index_granularity.getRowsCountInRanges(ranges); + return data_part->index_granularity->getRowsCountInRanges(ranges); } diff --git a/src/Storages/StorageMergeTreeIndex.cpp b/src/Storages/StorageMergeTreeIndex.cpp index 1d641add275..35e966b5489 100644 --- a/src/Storages/StorageMergeTreeIndex.cpp +++ b/src/Storages/StorageMergeTreeIndex.cpp @@ -63,7 +63,7 @@ protected: marks_loader = createMarksLoader(part, MergeTreeDataPartCompact::DATA_FILE_NAME, part->getColumns().size()); size_t num_columns = header.columns(); - size_t num_rows = index_granularity.getMarksCount(); + size_t num_rows = index_granularity->getMarksCount(); const auto & part_name_column = StorageMergeTreeIndex::part_name_column; const auto & mark_number_column = StorageMergeTreeIndex::mark_number_column; @@ -115,7 +115,7 @@ protected: data.resize(num_rows); for (size_t i = 0; i < num_rows; ++i) - data[i] = index_granularity.getMarkRows(i); + data[i] = index_granularity->getMarkRows(i); result_columns[pos] = std::move(column); } @@ -159,7 +159,7 @@ private: { size_t col_idx = 0; bool has_marks_in_part = false; - size_t num_rows = part->index_granularity.getMarksCount(); + size_t num_rows = part->index_granularity->getMarksCount(); if (isWidePart(part)) { diff --git a/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity.cpp b/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity.cpp index 0a3b3bd328f..02dcb62deb0 100644 --- a/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity.cpp +++ b/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity.cpp @@ -1,12 +1,15 @@ #include #include #include +#include // I know that inclusion of .cpp is not good at all -#include // NOLINT #include // NOLINT +#include +#include using namespace DB; + static Block getBlockWithSize(size_t required_size_in_bytes, size_t size_of_row_in_bytes) { @@ -25,16 +28,16 @@ TEST(AdaptiveIndexGranularity, FillGranularityToyTests) auto block1 = getBlockWithSize(80, 8); EXPECT_EQ(block1.bytes(), 80); { /// Granularity bytes are not set. Take default index_granularity. - MergeTreeIndexGranularity index_granularity; - auto granularity = computeIndexGranularityImpl(block1, 0, 100, false, false); + MergeTreeIndexGranularityAdaptive index_granularity; + auto granularity = computeIndexGranularity(block1.rows(), block1.bytes(), 0, 100, false, false); fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows()); EXPECT_EQ(index_granularity.getMarksCount(), 1); EXPECT_EQ(index_granularity.getMarkRows(0), 100); } { /// Granule size is less than block size. Block contains multiple granules. - MergeTreeIndexGranularity index_granularity; - auto granularity = computeIndexGranularityImpl(block1, 16, 100, false, true); + MergeTreeIndexGranularityAdaptive index_granularity; + auto granularity = computeIndexGranularity(block1.rows(), block1.bytes(), 16, 100, false, true); fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows()); EXPECT_EQ(index_granularity.getMarksCount(), 5); /// First granule with 8 rows, and second with 1 row for (size_t i = 0; i < index_granularity.getMarksCount(); ++i) @@ -43,8 +46,8 @@ TEST(AdaptiveIndexGranularity, FillGranularityToyTests) { /// Granule size is more than block size. Whole block (and maybe more) can be placed in single granule. - MergeTreeIndexGranularity index_granularity; - auto granularity = computeIndexGranularityImpl(block1, 512, 100, false, true); + MergeTreeIndexGranularityAdaptive index_granularity; + auto granularity = computeIndexGranularity(block1.rows(), block1.bytes(), 512, 100, false, true); fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows()); EXPECT_EQ(index_granularity.getMarksCount(), 1); for (size_t i = 0; i < index_granularity.getMarksCount(); ++i) @@ -53,8 +56,8 @@ TEST(AdaptiveIndexGranularity, FillGranularityToyTests) { /// Blocks with granule size - MergeTreeIndexGranularity index_granularity; - auto granularity = computeIndexGranularityImpl(block1, 1, 100, true, true); + MergeTreeIndexGranularityAdaptive index_granularity; + auto granularity = computeIndexGranularity(block1.rows(), block1.bytes(), 1, 100, true, true); fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows()); EXPECT_EQ(index_granularity.getMarksCount(), 1); for (size_t i = 0; i < index_granularity.getMarksCount(); ++i) @@ -62,8 +65,8 @@ TEST(AdaptiveIndexGranularity, FillGranularityToyTests) } { /// Shift in index offset - MergeTreeIndexGranularity index_granularity; - auto granularity = computeIndexGranularityImpl(block1, 16, 100, false, true); + MergeTreeIndexGranularityAdaptive index_granularity; + auto granularity = computeIndexGranularity(block1.rows(), block1.bytes(), 16, 100, false, true); fillIndexGranularityImpl(index_granularity, 6, granularity, block1.rows()); EXPECT_EQ(index_granularity.getMarksCount(), 2); for (size_t i = 0; i < index_granularity.getMarksCount(); ++i) @@ -78,10 +81,10 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks) auto block1 = getBlockWithSize(65536, 8); auto block2 = getBlockWithSize(65536, 8); auto block3 = getBlockWithSize(65536, 8); - MergeTreeIndexGranularity index_granularity; + MergeTreeIndexGranularityAdaptive index_granularity; for (const auto & block : {block1, block2, block3}) { - auto granularity = computeIndexGranularityImpl(block, 1024, 8192, false, true); + auto granularity = computeIndexGranularity(block.rows(), block.bytes(), 1024, 8192, false, true); fillIndexGranularityImpl(index_granularity, 0, granularity, block.rows()); } @@ -94,10 +97,10 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks) auto block2 = getBlockWithSize(32768, 32); auto block3 = getBlockWithSize(2048, 32); EXPECT_EQ(block1.rows() + block2.rows() + block3.rows(), 3136); - MergeTreeIndexGranularity index_granularity; + MergeTreeIndexGranularityAdaptive index_granularity; for (const auto & block : {block1, block2, block3}) { - auto granularity = computeIndexGranularityImpl(block, 1024, 8192, false, true); + auto granularity = computeIndexGranularity(block.rows(), block.bytes(), 1024, 8192, false, true); fillIndexGranularityImpl(index_granularity, 0, granularity, block.rows()); } @@ -113,11 +116,11 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks) EXPECT_EQ(block1.rows() + block2.rows() + block3.rows(), (2048 + 4096 + 8192) / 32); - MergeTreeIndexGranularity index_granularity; + MergeTreeIndexGranularityAdaptive index_granularity; size_t index_offset = 0; for (const auto & block : {block1, block2, block3}) { - auto granularity = computeIndexGranularityImpl(block, 16384, 8192, false, true); + auto granularity = computeIndexGranularity(block.rows(), block.bytes(), 16384, 8192, false, true); fillIndexGranularityImpl(index_granularity, index_offset, granularity, block.rows()); index_offset = index_granularity.getLastMarkRows() - block.rows(); } @@ -128,10 +131,10 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks) } -TEST(AdaptiveIndexGranularity, TestIndexGranularityClass) +TEST(AdaptiveIndexGranularity, TestIndexGranularityAdaptive) { { - MergeTreeIndexGranularity index_granularity; + MergeTreeIndexGranularityAdaptive index_granularity; size_t sum_rows = 0; size_t sum_marks = 0; for (size_t i = 10; i <= 100; i+=10) @@ -148,11 +151,70 @@ TEST(AdaptiveIndexGranularity, TestIndexGranularityClass) EXPECT_EQ(index_granularity.getMarkStartingRow(2), 30); EXPECT_EQ(index_granularity.getMarkStartingRow(3), 60); - EXPECT_EQ(index_granularity.getRowsCountInRange({0, 10}), sum_rows); - EXPECT_EQ(index_granularity.getRowsCountInRange({0, 1}), 10); - EXPECT_EQ(index_granularity.getRowsCountInRange({2, 5}), 30 + 40 + 50); - + EXPECT_EQ(index_granularity.getRowsCountInRange(0, 10), sum_rows); + EXPECT_EQ(index_granularity.getRowsCountInRange(0, 1), 10); + EXPECT_EQ(index_granularity.getRowsCountInRange(2, 5), 30 + 40 + 50); EXPECT_EQ(index_granularity.getRowsCountInRanges({{2, 5}, {0, 1}, {0, 10}}), 10 + 30 + 40 + 50 + sum_rows); } } + +TEST(AdaptiveIndexGranularity, TestIndexGranularityConstant) +{ + auto test = [](MergeTreeIndexGranularity & index_granularity, size_t granularity_rows) + { + size_t sum_marks = 10; + size_t sum_rows = granularity_rows * sum_marks; + + for (size_t i = 0; i < 10; ++i) + index_granularity.appendMark(granularity_rows); + + size_t new_granularity_rows = granularity_rows / 2; + index_granularity.adjustLastMark(new_granularity_rows); + sum_rows -= (granularity_rows - new_granularity_rows); + + index_granularity.appendMark(0); + ++sum_marks; + + EXPECT_EQ(index_granularity.getMarksCount(), sum_marks); + EXPECT_EQ(index_granularity.getMarksCountWithoutFinal(), sum_marks - 1); + EXPECT_EQ(index_granularity.hasFinalMark(), true); + EXPECT_EQ(index_granularity.getTotalRows(), sum_rows); + EXPECT_EQ(index_granularity.getTotalRows(), sum_rows); + EXPECT_EQ(index_granularity.getLastMarkRows(), 0); + EXPECT_EQ(index_granularity.getLastNonFinalMarkRows(), granularity_rows / 2); + + EXPECT_EQ(index_granularity.getMarkStartingRow(0), 0); + EXPECT_EQ(index_granularity.getMarkStartingRow(3), 30); + EXPECT_EQ(index_granularity.getMarkStartingRow(9), 90); + EXPECT_EQ(index_granularity.getMarkStartingRow(10), sum_rows); + EXPECT_EQ(index_granularity.getMarkStartingRow(11), sum_rows); + + EXPECT_EQ(index_granularity.getRowsCountInRange(0, 10), sum_rows); + EXPECT_EQ(index_granularity.getRowsCountInRange(0, 11), sum_rows); + EXPECT_EQ(index_granularity.getRowsCountInRange(0, 1), 10); + EXPECT_EQ(index_granularity.getRowsCountInRange(2, 5), 30); + EXPECT_EQ(index_granularity.getRowsCountInRange(3, 9), 60); + EXPECT_EQ(index_granularity.getRowsCountInRange(5, 10), 45); + EXPECT_EQ(index_granularity.getRowsCountInRange(5, 11), 45); + + EXPECT_EQ(index_granularity.countMarksForRows(0, 35), 3); + EXPECT_EQ(index_granularity.countMarksForRows(5, 29), 2); + EXPECT_EQ(index_granularity.countMarksForRows(0, 89), 8); + EXPECT_EQ(index_granularity.countMarksForRows(0, 90), 9); + EXPECT_EQ(index_granularity.countMarksForRows(0, 92), 9); + EXPECT_EQ(index_granularity.countMarksForRows(0, 95), sum_marks); + EXPECT_EQ(index_granularity.countMarksForRows(0, 99), sum_marks); + }; + + const size_t granularity_rows = 10; + + { + MergeTreeIndexGranularityConstant index_granularity(granularity_rows); + test(index_granularity, granularity_rows); + } + { + MergeTreeIndexGranularityAdaptive index_granularity; + test(index_granularity, granularity_rows); + } +} diff --git a/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity_compact_parts.cpp b/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity_compact_parts.cpp index 09b24c7dad6..ad0add709a6 100644 --- a/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity_compact_parts.cpp +++ b/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity_compact_parts.cpp @@ -4,6 +4,7 @@ // I know that inclusion of .cpp is not good at all #include // NOLINT +#include using namespace DB; @@ -13,7 +14,7 @@ TEST(IndexGranularityCompactParts, FillGranularitySequenceOfBlocks) size_t rows = 8; size_t granularity = 32; - MergeTreeIndexGranularity index_granularity; + MergeTreeIndexGranularityAdaptive index_granularity; size_t index_offset = 0; size_t rows_written = 0; for (size_t i = 0; i < 3; ++i) @@ -34,7 +35,7 @@ TEST(IndexGranularityCompactParts, FillGranularitySequenceOfBlocks) size_t rows2 = 8; size_t granularity = 32; - MergeTreeIndexGranularity index_granularity; + MergeTreeIndexGranularityAdaptive index_granularity; size_t index_offset = 0; fillIndexGranularityImpl(index_granularity, index_offset, granularity, rows1); @@ -51,7 +52,7 @@ TEST(IndexGranularityCompactParts, FillGranularitySequenceOfBlocks) size_t rows2 = 25; size_t granularity = 32; - MergeTreeIndexGranularity index_granularity; + MergeTreeIndexGranularityAdaptive index_granularity; size_t index_offset = 0; fillIndexGranularityImpl(index_granularity, index_offset, granularity, rows1); @@ -68,7 +69,7 @@ TEST(IndexGranularityCompactParts, FillGranularitySequenceOfBlocks) size_t rows = 40; size_t granularity = 32; - MergeTreeIndexGranularity index_granularity; + MergeTreeIndexGranularityAdaptive index_granularity; size_t index_offset = 0; for (size_t i = 0; i < 3; ++i) diff --git a/tests/clickhouse-test b/tests/clickhouse-test index 9c035b7cc35..df323daac16 100755 --- a/tests/clickhouse-test +++ b/tests/clickhouse-test @@ -981,6 +981,8 @@ class MergeTreeSettingsRandomizer: "cache_populated_by_fetch": lambda: random.randint(0, 1), "concurrent_part_removal_threshold": threshold_generator(0.2, 0.3, 0, 100), "old_parts_lifetime": threshold_generator(0.2, 0.3, 10, 8 * 60), + "use_const_adaptive_granularity": lambda: random.randint(0, 1), + "enable_index_granularity_compression": lambda: random.randint(0, 1), } @staticmethod diff --git a/tests/queries/0_stateless/03262_const_adaptive_index_granularity.reference b/tests/queries/0_stateless/03262_const_adaptive_index_granularity.reference new file mode 100644 index 00000000000..92255c7f154 --- /dev/null +++ b/tests/queries/0_stateless/03262_const_adaptive_index_granularity.reference @@ -0,0 +1,54 @@ +adaptive non-const, before merge +all_1_1_0 0 10 0 +all_1_1_0 1 5 10 +all_1_1_0 2 0 14 +all_2_2_0 0 2 15 +all_2_2_0 1 2 17 +all_2_2_0 2 2 19 +all_2_2_0 3 2 21 +all_2_2_0 4 2 23 +all_2_2_0 5 2 25 +all_2_2_0 6 2 27 +all_2_2_0 7 1 29 +all_2_2_0 8 0 29 +all_1_1_0 25 +all_2_2_0 25 +adaptive non-const, after merge +all_1_2_1 0 10 0 +all_1_2_1 1 5 10 +all_1_2_1 2 2 15 +all_1_2_1 3 2 17 +all_1_2_1 4 2 19 +all_1_2_1 5 2 21 +all_1_2_1 6 2 23 +all_1_2_1 7 2 25 +all_1_2_1 8 2 27 +all_1_2_1 9 1 29 +all_1_2_1 10 0 29 +all_1_2_1 88 +adaptive const, before merge +all_1_1_0 0 10 0 +all_1_1_0 1 5 10 +all_1_1_0 2 0 14 +all_2_2_0 0 2 15 +all_2_2_0 1 2 17 +all_2_2_0 2 2 19 +all_2_2_0 3 2 21 +all_2_2_0 4 2 23 +all_2_2_0 5 2 25 +all_2_2_0 6 2 27 +all_2_2_0 7 1 29 +all_2_2_0 8 0 29 +all_1_1_0 25 +all_2_2_0 25 +adaptive const, after merge +all_1_2_1 0 4 0 +all_1_2_1 1 4 4 +all_1_2_1 2 4 8 +all_1_2_1 3 4 12 +all_1_2_1 4 4 16 +all_1_2_1 5 4 20 +all_1_2_1 6 4 24 +all_1_2_1 7 2 28 +all_1_2_1 8 0 29 +all_1_2_1 25 diff --git a/tests/queries/0_stateless/03262_const_adaptive_index_granularity.sql b/tests/queries/0_stateless/03262_const_adaptive_index_granularity.sql new file mode 100644 index 00000000000..7445f66dc1a --- /dev/null +++ b/tests/queries/0_stateless/03262_const_adaptive_index_granularity.sql @@ -0,0 +1,53 @@ +DROP TABLE IF EXISTS t_index_granularity; + +CREATE TABLE t_index_granularity (id UInt64, s String) +ENGINE = MergeTree ORDER BY id +SETTINGS min_bytes_for_wide_part = 0, + index_granularity = 10, + index_granularity_bytes = 4096, + merge_max_block_size = 10, + merge_max_block_size_bytes = 4096, + enable_index_granularity_compression = 1, + use_const_adaptive_granularity = 0, + enable_vertical_merge_algorithm = 0; + +INSERT INTO t_index_granularity SELECT number, 'a' FROM numbers(15); +INSERT INTO t_index_granularity SELECT number, repeat('a', 2048) FROM numbers(15, 15); + +SELECT 'adaptive non-const, before merge'; +SELECT * FROM mergeTreeIndex(currentDatabase(), t_index_granularity) ORDER BY ALL; +SELECT name, index_granularity_bytes_in_memory FROM system.parts WHERE database = currentDatabase() AND table = 't_index_granularity' AND active; + +OPTIMIZE TABLE t_index_granularity FINAL; + +SELECT 'adaptive non-const, after merge'; +SELECT * FROM mergeTreeIndex(currentDatabase(), t_index_granularity) ORDER BY ALL; +SELECT name, index_granularity_bytes_in_memory FROM system.parts WHERE database = currentDatabase() AND table = 't_index_granularity' AND active; + +DROP TABLE t_index_granularity; + +CREATE TABLE t_index_granularity (id UInt64, s String) +ENGINE = MergeTree ORDER BY id +SETTINGS min_bytes_for_wide_part = 0, + index_granularity = 10, + index_granularity_bytes = 4096, + merge_max_block_size = 10, + merge_max_block_size_bytes = 4096, + enable_index_granularity_compression = 1, + use_const_adaptive_granularity = 1, + enable_vertical_merge_algorithm = 0; + +INSERT INTO t_index_granularity SELECT number, 'a' FROM numbers(15); +INSERT INTO t_index_granularity SELECT number, repeat('a', 2048) FROM numbers(15, 15); + +SELECT 'adaptive const, before merge'; +SELECT * FROM mergeTreeIndex(currentDatabase(), t_index_granularity) ORDER BY ALL; +SELECT name, index_granularity_bytes_in_memory FROM system.parts WHERE database = currentDatabase() AND table = 't_index_granularity' AND active; + +OPTIMIZE TABLE t_index_granularity FINAL; + +SELECT 'adaptive const, after merge'; +SELECT * FROM mergeTreeIndex(currentDatabase(), t_index_granularity) ORDER BY ALL; +SELECT name, index_granularity_bytes_in_memory FROM system.parts WHERE database = currentDatabase() AND table = 't_index_granularity' AND active; + +DROP TABLE t_index_granularity; diff --git a/tests/queries/0_stateless/03268_system_parts_index_granularity.reference b/tests/queries/0_stateless/03268_system_parts_index_granularity.reference index f301cd54ad2..f66f10ced96 100644 --- a/tests/queries/0_stateless/03268_system_parts_index_granularity.reference +++ b/tests/queries/0_stateless/03268_system_parts_index_granularity.reference @@ -1 +1,2 @@ -88 88 +88 128 +25 25 diff --git a/tests/queries/0_stateless/03268_system_parts_index_granularity.sql b/tests/queries/0_stateless/03268_system_parts_index_granularity.sql index 1bab7840856..3df9f6be028 100644 --- a/tests/queries/0_stateless/03268_system_parts_index_granularity.sql +++ b/tests/queries/0_stateless/03268_system_parts_index_granularity.sql @@ -8,8 +8,14 @@ CREATE TABLE t ( ENGINE MergeTree() ORDER by key SETTINGS index_granularity = 10, index_granularity_bytes = '1024K'; +ALTER TABLE t MODIFY SETTING enable_index_granularity_compression = 0; + INSERT INTO t SELECT number, toString(number) FROM numbers(100); -SELECT index_granularity_bytes_in_memory, index_granularity_bytes_in_memory_allocated FROM system.parts where table = 't' and database = currentDatabase(); +ALTER TABLE t MODIFY SETTING enable_index_granularity_compression = 1; + +INSERT INTO t SELECT number, toString(number) FROM numbers(100); + +SELECT index_granularity_bytes_in_memory, index_granularity_bytes_in_memory_allocated FROM system.parts where table = 't' and database = currentDatabase() ORDER BY name; DROP TABLE IF EXISTS t;