From 204d0ac9556e91718dea571c614c344468d607a3 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 19 Mar 2020 17:11:37 +0300 Subject: [PATCH] Fix bugs after method split --- .../MergeTree/MergeTreeDataMergerMutator.cpp | 49 +++++++------------ .../MergeTree/MergeTreeDataMergerMutator.h | 4 ++ dbms/src/Storages/StorageMergeTree.cpp | 2 - .../00991_system_parts_race_condition.sh | 2 +- 4 files changed, 22 insertions(+), 35 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 44e2c6048a6..f46162d352f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -946,15 +946,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor const ReservationPtr & space_reservation, TableStructureReadLockHolder & table_lock_holder) { - auto check_not_cancelled = [&]() - { - if (merges_blocker.isCancelled() || merge_entry->is_cancelled) - throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED); - - return true; - }; - - check_not_cancelled(); + checkOperationIsNotCanceled(merge_entry); if (future_part.parts.size() != 1) throw Exception("Trying to mutate " + toString(future_part.parts.size()) + " parts, not one. " @@ -1000,8 +992,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor NamesAndTypesList all_columns = data.getColumns().getAllPhysical(); - LOG_DEBUG(log, "All columns:" << all_columns.toString()); - if (!for_interpreter.empty()) { interpreter.emplace(storage_from_source_part, for_interpreter, context_for_reading, true); @@ -1091,6 +1081,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor mutateSomePartColumns( source_part, indices_to_recalc, + updated_header, new_data_part, in, time_of_mutation, @@ -1408,8 +1399,7 @@ std::set MergeTreeDataMergerMutator::getIndicesToRecalc( if (!indices_to_recalc.empty() && input_stream) { - auto indices_recalc_syntax = - SyntaxAnalyzer(context).analyze(indices_recalc_expr_list, updated_columns); + auto indices_recalc_syntax = SyntaxAnalyzer(context).analyze(indices_recalc_expr_list, input_stream->getHeader().getNamesAndTypesList()); auto indices_recalc_expr = ExpressionAnalyzer( indices_recalc_expr_list, indices_recalc_syntax, context).getActions(false); @@ -1453,14 +1443,6 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns( if (mutating_stream == nullptr) throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR); - auto check_not_cancelled = [&]() - { - if (merges_blocker.isCancelled() || merge_entry->is_cancelled) - throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED); - - return true; - }; - if (data.hasPrimaryKey() || data.hasSkipIndices()) mutating_stream = std::make_shared( std::make_shared(mutating_stream, data.primary_key_and_skip_indices_expr)); @@ -1479,7 +1461,7 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns( out.writePrefix(); Block block; - while (check_not_cancelled() && (block = mutating_stream->read())) + while (checkOperationIsNotCanceled(merge_entry) && (block = mutating_stream->read())) { minmax_idx.update(block, data.minmax_idx_columns); out.write(block); @@ -1498,6 +1480,7 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns( void MergeTreeDataMergerMutator::mutateSomePartColumns( const MergeTreeDataPartPtr & source_part, const std::set & indices_to_recalc, + const Block & mutation_header, MergeTreeData::MutableDataPartPtr new_data_part, BlockInputStreamPtr mutating_stream, time_t time_of_mutation, @@ -1505,24 +1488,17 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns( MergeListEntry & merge_entry, bool need_remove_expired_values) const { - auto check_not_cancelled = [&]() - { - if (merges_blocker.isCancelled() || merge_entry->is_cancelled) - throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED); - - return true; - }; - if (mutating_stream == nullptr) throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR); + if (need_remove_expired_values) mutating_stream = std::make_shared(mutating_stream, data, new_data_part, time_of_mutation, true); IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets; MergedColumnOnlyOutputStream out( new_data_part, - mutating_stream->getHeader(), + mutation_header, /* sync = */ false, compression_codec, /* skip_offsets = */ false, @@ -1536,7 +1512,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns( out.writePrefix(); Block block; - while (check_not_cancelled() && (block = mutating_stream->read())) + while (checkOperationIsNotCanceled(merge_entry) && (block = mutating_stream->read())) { out.write(block); @@ -1590,4 +1566,13 @@ void MergeTreeDataMergerMutator::finalizeMutatedPart( } + +bool MergeTreeDataMergerMutator::checkOperationIsNotCanceled(const MergeListEntry & merge_entry) const +{ + if (merges_blocker.isCancelled() || merge_entry->is_cancelled) + throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED); + + return true; +} + } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 4aad9ea4f51..48aaab7c129 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -183,6 +183,7 @@ private: void mutateSomePartColumns( const MergeTreeDataPartPtr & source_part, const std::set & indices_to_recalc, + const Block & mutation_header, MergeTreeData::MutableDataPartPtr new_data_part, BlockInputStreamPtr mutating_stream, time_t time_of_mutation, @@ -216,6 +217,9 @@ private: const MergeTreeData::DataPartsVector & parts, size_t rows_upper_bound, const NamesAndTypesList & gathering_columns, bool deduplicate, bool need_remove_expired_values) const; + bool checkOperationIsNotCanceled(const MergeListEntry & merge_entry) const; + + private: MergeTreeData & data; const size_t background_pool_size; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index aefaaafd9aa..3213e8db93b 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -667,12 +667,10 @@ bool StorageMergeTree::tryMutatePart() if (current_mutations_by_version.empty()) return false; - LOG_DEBUG(log, "Looking at parts"); auto mutations_end_it = current_mutations_by_version.end(); for (const auto & part : getDataPartsVector()) { - LOG_DEBUG(log, "Iterating parts"); if (currently_merging_mutating_parts.count(part)) continue; diff --git a/dbms/tests/queries/0_stateless/00991_system_parts_race_condition.sh b/dbms/tests/queries/0_stateless/00991_system_parts_race_condition.sh index f8cd9fd7b36..4205f8be8c6 100755 --- a/dbms/tests/queries/0_stateless/00991_system_parts_race_condition.sh +++ b/dbms/tests/queries/0_stateless/00991_system_parts_race_condition.sh @@ -19,7 +19,7 @@ function thread1() function thread2() { - while true; do $CLICKHOUSE_CLIENT -n --query "ALTER TABLE alter_table ADD COLUMN h String; ALTER TABLE alter_table MODIFY COLUMN h UInt64; ALTER TABLE alter_table DROP COLUMN h;"; done + while true; do $CLICKHOUSE_CLIENT -n --query "ALTER TABLE alter_table ADD COLUMN h String '0'; ALTER TABLE alter_table MODIFY COLUMN h UInt64; ALTER TABLE alter_table DROP COLUMN h;"; done } function thread3()