From 6d29ed99d9b0270dc6e14438f914bd5f04fef001 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 19 Aug 2019 13:37:04 +0300 Subject: [PATCH] Fix bug with enable_mixed_granularity_parts and mutations --- .../MergeTree/IMergedBlockOutputStream.cpp | 15 ++++++++------- .../Storages/MergeTree/IMergedBlockOutputStream.h | 5 ++++- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 3 ++- .../MergeTree/MergeTreeDataMergerMutator.cpp | 4 +++- .../MergeTree/MergedColumnOnlyOutputStream.cpp | 6 ++++-- .../MergeTree/MergedColumnOnlyOutputStream.h | 3 ++- .../integration/test_adaptive_granularity/test.py | 11 +++++++++++ 7 files changed, 34 insertions(+), 13 deletions(-) diff --git a/dbms/src/Storages/MergeTree/IMergedBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/IMergedBlockOutputStream.cpp index 4109a5511af..407fcb18ad5 100644 --- a/dbms/src/Storages/MergeTree/IMergedBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/IMergedBlockOutputStream.cpp @@ -25,25 +25,26 @@ IMergedBlockOutputStream::IMergedBlockOutputStream( size_t aio_threshold_, bool blocks_are_granules_size_, const std::vector & indices_to_recalc, - const MergeTreeIndexGranularity & index_granularity_) + const MergeTreeIndexGranularity & index_granularity_, + const MergeTreeIndexGranularityInfo * index_granularity_info_) : storage(storage_) , part_path(part_path_) , min_compress_block_size(min_compress_block_size_) , max_compress_block_size(max_compress_block_size_) , aio_threshold(aio_threshold_) - , marks_file_extension(storage.canUseAdaptiveGranularity() ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension()) + , can_use_adaptive_granularity(index_granularity_info_ ? index_granularity_info_->is_adaptive : storage.canUseAdaptiveGranularity()) + , marks_file_extension(can_use_adaptive_granularity ? 
getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension()) , blocks_are_granules_size(blocks_are_granules_size_) , index_granularity(index_granularity_) , compute_granularity(index_granularity.empty()) , codec(std::move(codec_)) , skip_indices(indices_to_recalc) - , with_final_mark(storage.settings.write_final_mark && storage.canUseAdaptiveGranularity()) + , with_final_mark(storage.settings.write_final_mark && can_use_adaptive_granularity) { if (blocks_are_granules_size && !index_granularity.empty()) throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR); } - void IMergedBlockOutputStream::addStreams( const String & path, const String & name, @@ -145,7 +146,7 @@ void IMergedBlockOutputStream::fillIndexGranularity(const Block & block) blocks_are_granules_size, index_offset, index_granularity, - storage.canUseAdaptiveGranularity()); + can_use_adaptive_granularity); } void IMergedBlockOutputStream::writeSingleMark( @@ -176,7 +177,7 @@ void IMergedBlockOutputStream::writeSingleMark( writeIntBinary(stream.plain_hashing.count(), stream.marks); writeIntBinary(stream.compressed.offset(), stream.marks); - if (storage.canUseAdaptiveGranularity()) + if (can_use_adaptive_granularity) writeIntBinary(number_of_rows, stream.marks); }, path); } @@ -362,7 +363,7 @@ void IMergedBlockOutputStream::calculateAndSerializeSkipIndices( writeIntBinary(stream.compressed.offset(), stream.marks); /// Actually this numbers is redundant, but we have to store them /// to be compatible with normal .mrk2 file format - if (storage.canUseAdaptiveGranularity()) + if (can_use_adaptive_granularity) writeIntBinary(1UL, stream.marks); ++skip_index_current_mark; diff --git a/dbms/src/Storages/MergeTree/IMergedBlockOutputStream.h b/dbms/src/Storages/MergeTree/IMergedBlockOutputStream.h index cbf78c1a2ea..97c7922042d 100644 --- a/dbms/src/Storages/MergeTree/IMergedBlockOutputStream.h +++ 
b/dbms/src/Storages/MergeTree/IMergedBlockOutputStream.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -23,7 +24,8 @@ public: size_t aio_threshold_, bool blocks_are_granules_size_, const std::vector & indices_to_recalc, - const MergeTreeIndexGranularity & index_granularity_); + const MergeTreeIndexGranularity & index_granularity_, + const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr); using WrittenOffsetColumns = std::set; @@ -141,6 +143,7 @@ protected: size_t current_mark = 0; size_t skip_index_mark = 0; + const bool can_use_adaptive_granularity; const std::string marks_file_extension; const bool blocks_are_granules_size; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 3b57c27d3e9..d8871b9e1a8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -1581,7 +1581,8 @@ void MergeTreeData::alterDataPart( true /* skip_offsets */, {}, unused_written_offsets, - part->index_granularity); + part->index_granularity, + &part->index_granularity_info); in.readPrefix(); out.writePrefix(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 19d775890d8..74193fa7156 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -934,6 +934,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor new_data_part->relative_path = "tmp_mut_" + future_part.name; new_data_part->is_temp = true; new_data_part->ttl_infos = source_part->ttl_infos; + new_data_part->index_granularity_info = source_part->index_granularity_info; String new_part_tmp_path = new_data_part->getFullPath(); @@ -1069,7 +1070,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor /* skip_offsets = */ false, 
std::vector(indices_to_recalc.begin(), indices_to_recalc.end()), unused_written_offsets, - source_part->index_granularity + source_part->index_granularity, + &source_part->index_granularity_info ); in->readPrefix(); diff --git a/dbms/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp b/dbms/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp index e79ec7dd046..3c15bd54df2 100644 --- a/dbms/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp @@ -8,14 +8,16 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( CompressionCodecPtr default_codec_, bool skip_offsets_, const std::vector & indices_to_recalc_, WrittenOffsetColumns & already_written_offset_columns_, - const MergeTreeIndexGranularity & index_granularity_) + const MergeTreeIndexGranularity & index_granularity_, + const MergeTreeIndexGranularityInfo * index_granularity_info_) : IMergedBlockOutputStream( storage_, part_path_, storage_.global_context.getSettings().min_compress_block_size, storage_.global_context.getSettings().max_compress_block_size, default_codec_, storage_.global_context.getSettings().min_bytes_to_use_direct_io, false, indices_to_recalc_, - index_granularity_), + index_granularity_, + index_granularity_info_), header(header_), sync(sync_), skip_offsets(skip_offsets_), already_written_offset_columns(already_written_offset_columns_) { diff --git a/dbms/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h b/dbms/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h index b8d637f37fb..8970bf19565 100644 --- a/dbms/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h +++ b/dbms/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h @@ -17,7 +17,8 @@ public: CompressionCodecPtr default_codec_, bool skip_offsets_, const std::vector & indices_to_recalc_, WrittenOffsetColumns & already_written_offset_columns_, - const MergeTreeIndexGranularity & index_granularity_); + const MergeTreeIndexGranularity & 
index_granularity_, + const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr); Block getHeader() const override { return header; } void write(const Block & block) override; diff --git a/dbms/tests/integration/test_adaptive_granularity/test.py b/dbms/tests/integration/test_adaptive_granularity/test.py index db653427f02..50b43fc08ec 100644 --- a/dbms/tests/integration/test_adaptive_granularity/test.py +++ b/dbms/tests/integration/test_adaptive_granularity/test.py @@ -288,6 +288,17 @@ def test_mixed_granularity_single_node(start_dynamic_cluster, node): node.exec_in_container(["bash", "-c", "find {p} -name '*.mrk' | grep '.*'".format(p=path_to_old_part)]) # check that we have non adaptive files + node.query("ALTER TABLE table_with_default_granularity UPDATE dummy = dummy + 1 WHERE 1") + # still works + assert node.query("SELECT count() from table_with_default_granularity") == '6\n' + + node.query("ALTER TABLE table_with_default_granularity MODIFY COLUMN dummy String") + node.query("ALTER TABLE table_with_default_granularity ADD COLUMN dummy2 Float64") + + # still works + assert node.query("SELECT count() from table_with_default_granularity") == '6\n' + + def test_version_update_two_nodes(start_dynamic_cluster): node11.query("INSERT INTO table_with_default_granularity VALUES (toDate('2018-10-01'), 1, 333), (toDate('2018-10-02'), 2, 444)") node12.query("SYSTEM SYNC REPLICA table_with_default_granularity")