diff --git a/src/Interpreters/TransactionVersionMetadata.cpp b/src/Interpreters/TransactionVersionMetadata.cpp index f7d2c885f55..cbcff6057a5 100644 --- a/src/Interpreters/TransactionVersionMetadata.cpp +++ b/src/Interpreters/TransactionVersionMetadata.cpp @@ -44,6 +44,13 @@ void VersionMetadata::lockMaxTID(const TransactionID & tid, const String & error bool locked = maxtid_lock.compare_exchange_strong(expected_max_lock_value, max_lock_value); if (!locked) { + if (tid == Tx::PrehistoricTID && expected_max_lock_value == Tx::PrehistoricTID.getHash()) + { + /// Don't need to lock part for queries without transaction + //FIXME Transactions: why is it possible? + return; + } + throw Exception(ErrorCodes::SERIALIZATION_ERROR, "Serialization error: " "Transaction {} tried to remove data part, " "but it's locked ({}) by another transaction {} which is currently removing this part. {}", diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index dde6ff7dd72..b558e58997d 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -172,10 +172,15 @@ static void logQuery(const String & query, ContextPtr context, bool internal) if (!comment.empty()) comment = fmt::format(" (comment: {})", comment); - LOG_DEBUG(&Poco::Logger::get("executeQuery"), "(from {}{}{}){} {}", + String transaction_info; + if (auto txn = context->getCurrentTransaction()) + transaction_info = fmt::format(" (TID: {}, TIDH: {})", txn->tid, txn->tid.getHash()); + + LOG_DEBUG(&Poco::Logger::get("executeQuery"), "(from {}{}{}){}{} {}", client_info.current_address.toString(), (current_user != "default" ? ", user: " + current_user : ""), (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string()), + transaction_info, comment, joinLines(query)); diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index fa5f2c28b06..3bdd535b585 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -538,11 +538,6 @@ public: /// Similar to above but checks for DETACH. It's only used for DICTIONARIES. virtual void checkTableCanBeDetached() const {} - /// Checks that Partition could be dropped right now - /// Otherwise - throws an exception with detailed information. - /// We do not use mutex because it is not very important that the size could change during the operation. - virtual void checkPartitionCanBeDropped(const ASTPtr & /*partition*/) {} - /// Returns true if Storage may store some data on disk. /// NOTE: may not be equivalent to !getDataPaths().empty() virtual bool storesDataOnDisk() const { return false; } diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 0a318276059..d6b25a5a6ef 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -226,6 +226,7 @@ public: */ enum class State { + ///TODO Transactions: rename Committed to Active, because it becomes confusing Temporary, /// the part is generating now, it is not in data_parts list PreCommitted, /// the part is in data_parts, but not used for SELECTs Committed, /// active data part, used by current and upcoming SELECTs diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index cfb5a32ae9e..d14a4d9260f 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3142,9 +3142,23 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & return getActiveContainingPart(part_info); } -MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(MergeTreeData::DataPartState state, const String & partition_id) const +MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartition(ContextPtr local_context, const String & partition_id) const { - DataPartStateAndPartitionID state_with_partition{state, partition_id}; + if (const auto * txn = local_context->getCurrentTransaction().get()) + { + DataPartStateAndPartitionID active_parts{MergeTreeDataPartState::Committed, partition_id}; + DataPartStateAndPartitionID outdated_parts{MergeTreeDataPartState::Outdated, partition_id}; + DataPartsVector res; + { + auto lock = lockParts(); + res.insert(res.end(), data_parts_by_state_and_info.lower_bound(active_parts), data_parts_by_state_and_info.upper_bound(active_parts)); + res.insert(res.end(), data_parts_by_state_and_info.lower_bound(outdated_parts), data_parts_by_state_and_info.upper_bound(outdated_parts)); + } + filterVisibleDataParts(res, txn->getSnapshot(), txn->tid); + return res; + } + + DataPartStateAndPartitionID state_with_partition{MergeTreeDataPartState::Committed, partition_id}; auto lock = lockParts(); return DataPartsVector( @@ -3152,19 +3166,37 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(Merg data_parts_by_state_and_info.upper_bound(state_with_partition)); } -MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartitions(MergeTreeData::DataPartState state, const std::unordered_set & partition_ids) const +MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartitions(ContextPtr local_context, const std::unordered_set & partition_ids) const { - auto lock = lockParts(); + auto txn = local_context->getCurrentTransaction(); DataPartsVector res; - for (const auto & partition_id : partition_ids) { - DataPartStateAndPartitionID state_with_partition{state, partition_id}; - insertAtEnd( - res, - DataPartsVector( - data_parts_by_state_and_info.lower_bound(state_with_partition), - data_parts_by_state_and_info.upper_bound(state_with_partition))); + auto lock = lockParts(); + for (const auto & partition_id : partition_ids) + { + DataPartStateAndPartitionID active_parts{MergeTreeDataPartState::Committed, partition_id}; + insertAtEnd( + res, + DataPartsVector( + data_parts_by_state_and_info.lower_bound(active_parts), + data_parts_by_state_and_info.upper_bound(active_parts))); + + if (txn) + { + DataPartStateAndPartitionID outdated_parts{MergeTreeDataPartState::Committed, partition_id}; + + insertAtEnd( + res, + DataPartsVector( + data_parts_by_state_and_info.lower_bound(outdated_parts), + data_parts_by_state_and_info.upper_bound(outdated_parts))); + } + } } + + if (txn) + filterVisibleDataParts(res, txn->getSnapshot(), txn->tid); + return res; } @@ -3295,10 +3327,10 @@ void MergeTreeData::checkAlterPartitionIsPossible( } } -void MergeTreeData::checkPartitionCanBeDropped(const ASTPtr & partition) +void MergeTreeData::checkPartitionCanBeDropped(const ASTPtr & partition, ContextPtr local_context) { - const String partition_id = getPartitionIDFromQuery(partition, getContext()); - auto parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); + const String partition_id = getPartitionIDFromQuery(partition, local_context); + auto parts_to_remove = getVisibleDataPartsVectorInPartition(local_context, partition_id); UInt64 partition_size = 0; @@ -3337,7 +3369,7 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String & throw Exception("Part " + partition_id + " is not exists or not active", ErrorCodes::NO_SUCH_DATA_PART); } else - parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); + parts = getVisibleDataPartsVectorInPartition(local_context, partition_id); auto disk = getStoragePolicy()->getDiskByName(name); if (!disk) @@ -3382,7 +3414,7 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String throw Exception("Part " + partition_id + " is not exists or not active", ErrorCodes::NO_SUCH_DATA_PART); } else - parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); + parts = getVisibleDataPartsVectorInPartition(local_context, partition_id); auto volume = getStoragePolicy()->getVolumeByName(name); if (!volume) @@ -3454,7 +3486,7 @@ Pipe MergeTreeData::alterPartition( } else { - checkPartitionCanBeDropped(command.partition); + checkPartitionCanBeDropped(command.partition, query_context); dropPartition(command.partition, command.detach, query_context); } } @@ -3503,7 +3535,7 @@ Pipe MergeTreeData::alterPartition( case PartitionCommand::REPLACE_PARTITION: { if (command.replace) - checkPartitionCanBeDropped(command.partition); + checkPartitionCanBeDropped(command.partition, query_context); String from_database = query_context->resolveDatabase(command.from_database); auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table}, query_context); replacePartitionFrom(from_storage, command.partition, command.replace, query_context); @@ -3564,7 +3596,7 @@ BackupEntries MergeTreeData::backup(const ASTs & partitions, ContextPtr local_co if (partitions.empty()) data_parts = getDataPartsVector(); else - data_parts = getDataPartsVectorInPartitions(MergeTreeDataPartState::Committed, getPartitionIDsFromQuery(partitions, local_context)); + data_parts = getVisibleDataPartsVectorInPartitions(local_context, getPartitionIDsFromQuery(partitions, local_context)); return backupDataParts(data_parts); } @@ -3771,26 +3803,54 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc } -DataPartsVector MergeTreeData::getDataPartsVector(ContextPtr local_context) const +DataPartsVector MergeTreeData::getVisibleDataPartsVector(ContextPtr local_context) const { - return getVisibleDataPartsVector(local_context->getCurrentTransaction()); + DataPartsVector res; + if (const auto * txn = local_context->getCurrentTransaction().get()) + { + res = getDataPartsVector({DataPartState::Committed, DataPartState::Outdated}); + filterVisibleDataParts(res, txn->getSnapshot(), txn->tid); + } + else + { + res = getDataPartsVector(); + } + return res; } MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVector(const MergeTreeTransactionPtr & txn) const { - if (!txn) - return getDataPartsVector(); + DataPartsVector res; + if (txn) + { + res = getDataPartsVector({DataPartState::Committed, DataPartState::Outdated}); + filterVisibleDataParts(res, txn->getSnapshot(), txn->tid); + } + else + { + res = getDataPartsVector(); + } + return res; +} - DataPartsVector maybe_visible_parts = getDataPartsVector({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated}); +MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVector(Snapshot snapshot_version, TransactionID current_tid) const +{ + auto res = getDataPartsVector({DataPartState::Committed, DataPartState::Outdated}); + filterVisibleDataParts(res, snapshot_version, current_tid); + return res; +} + +void MergeTreeData::filterVisibleDataParts(DataPartsVector & maybe_visible_parts, Snapshot snapshot_version, TransactionID current_tid) const +{ if (maybe_visible_parts.empty()) - return maybe_visible_parts; + return; auto it = maybe_visible_parts.begin(); auto it_last = maybe_visible_parts.end() - 1; String visible_parts_str; while (it <= it_last) { - if ((*it)->versions.isVisible(*txn)) + if ((*it)->versions.isVisible(snapshot_version, current_tid)) { visible_parts_str += (*it)->name; visible_parts_str += " "; @@ -3804,9 +3864,8 @@ MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVector(const Me } size_t new_size = it_last - maybe_visible_parts.begin() + 1; - LOG_TRACE(log, "Got {} parts visible for {}: {}", new_size, txn->tid, visible_parts_str); + LOG_TEST(log, "Got {} parts visible in snapshot {} (TID {}): {}", new_size, snapshot_version, current_tid, visible_parts_str); maybe_visible_parts.resize(new_size); - return maybe_visible_parts; } @@ -4238,7 +4297,7 @@ MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affo return res; } -MergeTreeData::DataParts MergeTreeData::getDataParts() const +MergeTreeData::DataParts MergeTreeData::getDataPartsForInternalUsage() const { return getDataParts({DataPartState::Committed}); } @@ -4879,7 +4938,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( max_added_blocks = std::make_shared(replicated->getMaxAddedBlocks()); } - auto parts = getDataPartsVector(query_context); + auto parts = getVisibleDataPartsVector(query_context); // If minmax_count_projection is a valid candidate, check its completeness. if (minmax_conut_projection_candidate) @@ -5237,7 +5296,7 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher( const String shadow_path = "shadow/"; /// Acquire a snapshot of active data parts to prevent removing while doing backup. - const auto data_parts = getDataParts(); + const auto data_parts = getVisibleDataPartsVector(local_context); String backup_name = (!with_name.empty() ? escapeForFileName(with_name) : toString(increment)); String backup_path = fs::path(shadow_path) / backup_name / ""; diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index bf00db2c5c1..903cf0f979c 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -425,6 +425,7 @@ public: Int64 getMaxBlockNumber() const; + /// Returns a copy of the list so that the caller shouldn't worry about locks. DataParts getDataParts(const DataPartStates & affordable_states) const; @@ -436,24 +437,18 @@ public: /// Returns absolutely all parts (and snapshot of their states) DataPartsVector getAllDataPartsVector(DataPartStateVector * out_states = nullptr, bool require_projection_parts = false) const; - /// Returns all detached parts - DetachedPartsInfo getDetachedParts() const; - - void validateDetachedPartName(const String & name) const; - - void dropDetached(const ASTPtr & partition, bool part, ContextPtr context); - - MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part, - ContextPtr context, PartsTemporaryRename & renamed_parts); - - /// Returns Committed parts - DataParts getDataParts() const; + /// Returns parts in Committed state (NOT in terms of transactions, should be used carefully) + DataParts getDataPartsForInternalUsage() const; DataPartsVector getDataPartsVector() const; - DataPartsVector getDataPartsVector(ContextPtr local_context) const; - DataPartsVector getVisibleDataPartsVector(const MergeTreeTransactionPtr & txn) const; + void filterVisibleDataParts(DataPartsVector & maybe_visible_parts, Snapshot snapshot_version, TransactionID current_tid) const; - /// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr. + /// Returns parts that visible with current snapshot + DataPartsVector getVisibleDataPartsVector(ContextPtr local_context) const; + DataPartsVector getVisibleDataPartsVector(const MergeTreeTransactionPtr & txn) const; + DataPartsVector getVisibleDataPartsVector(Snapshot snapshot_version, TransactionID current_tid) const; + + /// Returns a part in Committed state with the given name or a part containing it. If there is no such part, returns nullptr. DataPartPtr getActiveContainingPart(const String & part_name) const; DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info) const; DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock) const; @@ -463,8 +458,8 @@ public: void swapActivePart(MergeTreeData::DataPartPtr part_copy); /// Returns all parts in specified partition - DataPartsVector getDataPartsVectorInPartition(DataPartState state, const String & partition_id) const; - DataPartsVector getDataPartsVectorInPartitions(DataPartState state, const std::unordered_set & partition_ids) const; + DataPartsVector getVisibleDataPartsVectorInPartition(ContextPtr local_context, const String & partition_id) const; + DataPartsVector getVisibleDataPartsVectorInPartitions(ContextPtr local_context, const std::unordered_set & partition_ids) const; /// Returns the part with the given name and state or nullptr if no such part. DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states); @@ -484,6 +479,18 @@ public: /// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition. std::optional getMinPartDataVersion() const; + + /// Returns all detached parts + DetachedPartsInfo getDetachedParts() const; + + void validateDetachedPartName(const String & name) const; + + void dropDetached(const ASTPtr & partition, bool part, ContextPtr context); + + MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part, + ContextPtr context, PartsTemporaryRename & renamed_parts); + + /// If the table contains too many active parts, sleep for a while to give them time to merge. /// If until is non-null, wake up from the sleep earlier if the event happened. void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const; @@ -656,7 +663,10 @@ public: /// Moves partition to specified Volume void movePartitionToVolume(const ASTPtr & partition, const String & name, bool moving_part, ContextPtr context); - void checkPartitionCanBeDropped(const ASTPtr & partition) override; + /// Checks that Partition could be dropped right now + /// Otherwise - throws an exception with detailed information. + /// We do not use mutex because it is not very important that the size could change during the operation. + void checkPartitionCanBeDropped(const ASTPtr & partition, ContextPtr local_context); void checkPartCanBeDropped(const String & part_name); diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 3a6944df633..7fa55005f93 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -395,7 +395,7 @@ MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::selectAllPartsFromPar { MergeTreeData::DataPartsVector parts_from_partition; - MergeTreeData::DataParts data_parts = data.getDataParts(); + MergeTreeData::DataParts data_parts = data.getDataPartsForInternalUsage(); for (const auto & current_part : data_parts) { diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 1fc61012e6f..c766c775ac7 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -130,7 +130,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( return std::make_unique(); const auto & settings = context->getSettingsRef(); - auto parts = data.getDataPartsVector(context); + auto parts = data.getVisibleDataPartsVector(context); if (!query_info.projection) { diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index e1165dbe3c8..ec7504715e8 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -87,7 +87,7 @@ StorageMergeTree::StorageMergeTree( { loadDataParts(has_force_restore_data_flag); - if (!attach && !getDataParts().empty()) + if (!attach && !getDataPartsForInternalUsage().empty()) throw Exception("Data directory for table already containing data parts - probably it was unclean DROP table or manual intervention. You must either clear directory by hand or use ATTACH TABLE instead of CREATE TABLE if you need to use that parts.", ErrorCodes::INCORRECT_DATA); increment.set(getMaxBlockNumber()); @@ -258,7 +258,7 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont /// This protects against "revival" of data for a removed partition after completion of merge. auto merge_blocker = stopMergesAndWait(); - auto parts_to_remove = getDataPartsVector(); + auto parts_to_remove = getVisibleDataPartsVector(local_context); removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), parts_to_remove, true); LOG_INFO(log, "Removed {} parts.", parts_to_remove.size()); @@ -713,9 +713,9 @@ std::shared_ptr StorageMergeTree::selectPartsToMerge( { /// Cannot merge parts if some of them is not visible in current snapshot /// TODO We can use simplified visibility rules (without CSN lookup) here - if (left && !left->versions.isVisible(*tx)) + if (left && !left->versions.isVisible(tx->getSnapshot(), Tx::EmptyTID)) return false; - if (right && !right->versions.isVisible(*tx)) + if (right && !right->versions.isVisible(tx->getSnapshot(), Tx::EmptyTID)) return false; } @@ -1288,7 +1288,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont /// This protects against "revival" of data for a removed partition after completion of merge. auto merge_blocker = stopMergesAndWait(); String partition_id = getPartitionIDFromQuery(partition, local_context); - parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); + parts_to_remove = getVisibleDataPartsVectorInPartition(local_context, partition_id); /// TODO should we throw an exception if parts_to_remove is empty? removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), parts_to_remove, true); @@ -1370,7 +1370,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, my_metadata_snapshot); String partition_id = getPartitionIDFromQuery(partition, local_context); - DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); + DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id); MutableDataPartsVector dst_parts; static const String TMP_PREFIX = "tmp_replace_from_"; @@ -1455,7 +1455,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const MergeTreeData & src_data = dest_table_storage->checkStructureAndGetMergeTreeData(*this, metadata_snapshot, dest_metadata_snapshot); String partition_id = getPartitionIDFromQuery(partition, local_context); - DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); + DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id); MutableDataPartsVector dst_parts; static const String TMP_PREFIX = "tmp_move_from_"; @@ -1535,7 +1535,7 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_ if (const auto & check_query = query->as(); check_query.partition) { String partition_id = getPartitionIDFromQuery(check_query.partition, local_context); - data_parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); + data_parts = getVisibleDataPartsVectorInPartition(local_context, partition_id); } else data_parts = getDataPartsVector(); diff --git a/src/Storages/StorageProxy.h b/src/Storages/StorageProxy.h index 304f84c02eb..c3f4536da82 100644 --- a/src/Storages/StorageProxy.h +++ b/src/Storages/StorageProxy.h @@ -145,7 +145,6 @@ public: CheckResults checkData(const ASTPtr & query , ContextPtr context) override { return getNested()->checkData(query, context); } void checkTableCanBeDropped() const override { getNested()->checkTableCanBeDropped(); } - void checkPartitionCanBeDropped(const ASTPtr & partition) override { getNested()->checkPartitionCanBeDropped(partition); } bool storesDataOnDisk() const override { return getNested()->storesDataOnDisk(); } Strings getDataPaths() const override { return getNested()->getDataPaths(); } StoragePolicyPtr getStoragePolicy() const override { return getNested()->getStoragePolicy(); } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 306d4d4f7e9..3a0aa3089a2 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -405,7 +405,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( if (!attach) { - if (!getDataParts().empty()) + if (!getDataPartsForInternalUsage().empty()) throw Exception("Data directory for table already contains data parts" " - probably it was unclean DROP table or manual intervention." " You must either clear directory by hand or use ATTACH TABLE" @@ -2452,7 +2452,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo tryRemovePartsFromZooKeeperWithRetries(parts_to_remove_from_zk); - auto local_active_parts = getDataParts(); + auto local_active_parts = getDataPartsForInternalUsage(); DataPartsVector parts_to_remove_from_working_set; @@ -4187,7 +4187,7 @@ ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock StorageReplicatedMerg { ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock max_added_blocks; - for (const auto & data_part : getDataParts()) + for (const auto & data_part : getDataPartsForInternalUsage()) { max_added_blocks[data_part->info.partition_id] = std::max(max_added_blocks[data_part->info.partition_id], data_part->info.max_block); @@ -4293,6 +4293,7 @@ void StorageReplicatedMergeTree::foreachCommittedParts(Func && func, bool select max_added_blocks = getMaxAddedBlocks(); auto lock = lockParts(); + /// TODO Transactions: should we count visible parts only? for (const auto & part : getDataPartsStateRange(DataPartState::Committed)) { if (part->isEmpty()) @@ -6246,7 +6247,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( String partition_id = getPartitionIDFromQuery(partition, query_context); /// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet. - DataPartsVector src_all_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); + DataPartsVector src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id); LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size()); @@ -7068,7 +7069,7 @@ CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, Context if (const auto & check_query = query->as(); check_query.partition) { String partition_id = getPartitionIDFromQuery(check_query.partition, local_context); - data_parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); + data_parts = getVisibleDataPartsVectorInPartition(local_context, partition_id); } else data_parts = getDataPartsVector(); diff --git a/tests/queries/0_stateless/01169_alter_partition_isolation_stress.reference b/tests/queries/0_stateless/01169_alter_partition_isolation_stress.reference new file mode 100644 index 00000000000..bf9c6e88bb2 --- /dev/null +++ b/tests/queries/0_stateless/01169_alter_partition_isolation_stress.reference @@ -0,0 +1,6 @@ +1 1 +2 1 +3 1 +4 1 +1 +10 100 diff --git a/tests/queries/0_stateless/01169_alter_partition_isolation_stress.sh b/tests/queries/0_stateless/01169_alter_partition_isolation_stress.sh new file mode 100755 index 00000000000..f05a3ec2b24 --- /dev/null +++ b/tests/queries/0_stateless/01169_alter_partition_isolation_stress.sh @@ -0,0 +1,113 @@ +#!/usr/bin/env bash +# Tags: long + +# shellcheck disable=SC2015 + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +set -e + +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS src"; +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS dst"; +$CLICKHOUSE_CLIENT --query "CREATE TABLE src (n UInt64, type UInt8) ENGINE=MergeTree ORDER BY type SETTINGS old_parts_lifetime=0"; +$CLICKHOUSE_CLIENT --query "CREATE TABLE dst (n UInt64, type UInt8) ENGINE=MergeTree ORDER BY type SETTINGS old_parts_lifetime=0"; + +function thread_insert() +{ + set -e + trap "exit 0" INT + while true; do + action="ROLLBACK" + if (( RANDOM % 2 )); then + action="COMMIT" + fi + val=$RANDOM + $CLICKHOUSE_CLIENT --multiquery --query " + BEGIN TRANSACTION; + INSERT INTO src VALUES ($val, 1); + INSERT INTO src VALUES ($val, 2); + COMMIT;" + sleep 0.$RANDOM; + done +} + + +# NOTE +# ALTER PARTITION query stops merges, +# but serialization error is still possible if some merge was assigned (and committed) between BEGIN and ALTER. +function thread_partition_src_to_dst() +{ + set -e + count=0 + sum=0 + for i in {1..20}; do + out=$( + $CLICKHOUSE_CLIENT --multiquery --query " + BEGIN TRANSACTION; + INSERT INTO src VALUES ($i, 3); + INSERT INTO dst SELECT * FROM src; + ALTER TABLE src DROP PARTITION ID 'all'; + SELECT throwIf((SELECT (count(), sum(n)) FROM merge(currentDatabase(), '') WHERE type=3) != ($count + 1, $sum + $i)) FORMAT Null; + COMMIT;" 2>&1) ||: + + echo "$out" | grep -Fv "SERIALIZATION_ERROR" | grep -F "Received from " && $CLICKHOUSE_CLIENT -q "SELECT _table, type, arraySort(groupArray(n)) FROM merge(currentDatabase(), '') GROUP BY _table, type ORDER BY _table, type" ||: + echo "$out" | grep -Fa "SERIALIZATION_ERROR" >/dev/null || count=$((count+1)) && sum=$((sum+i)) + done +} + +function thread_partition_dst_to_src() +{ + set -e + for i in {1..20}; do + action="ROLLBACK" + if (( i % 2 )); then + action="COMMIT" + fi + $CLICKHOUSE_CLIENT --multiquery --query " + SYSTEM STOP MERGES dst; + BEGIN TRANSACTION; + INSERT INTO dst VALUES ($i, 4); + INSERT INTO src SELECT * FROM dst; + ALTER TABLE dst DROP PARTITION ID 'all'; + SYSTEM START MERGES dst; + SELECT throwIf((SELECT (count(), sum(n)) FROM merge(currentDatabase(), '') WHERE type=4) != (toUInt8($i/2 + 1), (select sum(number) from numbers(1, $i) where number % 2 or number=$i))) FORMAT Null; + $action;" || $CLICKHOUSE_CLIENT -q "SELECT _table, type, arraySort(groupArray(n)) FROM merge(currentDatabase(), '') GROUP BY _table, type ORDER BY _table, type" + done +} + +function thread_select() +{ + set -e + trap "exit 0" INT + while true; do + $CLICKHOUSE_CLIENT --multiquery --query " + BEGIN TRANSACTION; + -- no duplicates + SELECT type, throwIf(count(n) != countDistinct(n)) FROM src GROUP BY type FORMAT Null; + SELECT type, throwIf(count(n) != countDistinct(n)) FROM dst GROUP BY type FORMAT Null; + -- rows inserted by thread_insert moved together + SELECT _table, throwIf(arraySort(groupArrayIf(n, type=1)) != arraySort(groupArrayIf(n, type=2))) FROM merge(currentDatabase(), '') GROUP BY _table FORMAT Null; + COMMIT;" || $CLICKHOUSE_CLIENT -q "SELECT _table, type, arraySort(groupArray(n)) FROM merge(currentDatabase(), '') GROUP BY _table, type ORDER BY _table, type" + done +} + +thread_insert & PID_1=$! +thread_select & PID_2=$! + +thread_partition_src_to_dst & PID_3=$! +thread_partition_dst_to_src & PID_4=$! +wait $PID_3 && wait $PID_4 + +kill -INT $PID_1 +kill -INT $PID_2 +wait + +$CLICKHOUSE_CLIENT -q "SELECT type, count(n) = countDistinct(n) FROM merge(currentDatabase(), '') GROUP BY type ORDER BY type" +$CLICKHOUSE_CLIENT -q "SELECT DISTINCT arraySort(groupArrayIf(n, type=1)) = arraySort(groupArrayIf(n, type=2)) FROM merge(currentDatabase(), '') GROUP BY _table ORDER BY _table" +$CLICKHOUSE_CLIENT -q "SELECT count(n), sum(n) FROM merge(currentDatabase(), '') WHERE type=4" + + +$CLICKHOUSE_CLIENT --query "DROP TABLE src"; +$CLICKHOUSE_CLIENT --query "DROP TABLE dst"; diff --git a/tests/queries/0_stateless/01170_alter_partition_isolation.reference b/tests/queries/0_stateless/01170_alter_partition_isolation.reference new file mode 100644 index 00000000000..fc772355a57 --- /dev/null +++ b/tests/queries/0_stateless/01170_alter_partition_isolation.reference @@ -0,0 +1,30 @@ +1 1 +2 3 +3 2 +3 4 +4 3 + +5 3 +5 5 + +6 3 +6 5 +6 6 +7 8 +8 3 +8 5 +8 7 +8 9 +SERIALIZATION_ERROR +INVALID_TRANSACTION +9 8 + +10 8 + +11 8 +11 11 +11 12 +12 8 +12 8 +12 11 +12 12 diff --git a/tests/queries/0_stateless/01170_alter_partition_isolation.sh b/tests/queries/0_stateless/01170_alter_partition_isolation.sh new file mode 100755 index 00000000000..4174f8215fe --- /dev/null +++ b/tests/queries/0_stateless/01170_alter_partition_isolation.sh @@ -0,0 +1,70 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh +# shellcheck source=./transactions.lib +. "$CURDIR"/transactions.lib + +$CLICKHOUSE_CLIENT -q "drop table if exists mt" +$CLICKHOUSE_CLIENT -q "create table mt (n int) engine=MergeTree order by n" + +tx 1 "begin transaction" +tx 1 "insert into mt values (1)" +tx 2 "begin transaction" +tx 2 "insert into mt values (2)" +tx 1 "select 1, n from mt order by n" +tx 1 "alter table mt drop partition id 'all'" +tx 2 "insert into mt values (4)" +tx 1 "insert into mt values (3)" +tx 1 "select 2, n from mt order by n" +tx 2 "select 3, n from mt order by n" +tx 2 "alter table mt drop partition id 'all'" +tx 2 "insert into mt values (5)" +tx 1 "select 4, n from mt order by n" +tx 2 "commit" +tx 1 "commit" + +echo '' +$CLICKHOUSE_CLIENT -q "select 5, n from mt order by n" +echo '' + +tx 4 "begin transaction" +tx 4 "insert into mt values (6)" +tx 3 "begin transaction" +tx 3 "insert into mt values (7)" +tx 4 "select 6, n from mt order by n" +tx 4 "alter table mt drop partition id 'all'" +tx 3 "insert into mt values (9)" +tx 4 "insert into mt values (8)" +tx 4 "select 7, n from mt order by n" +tx 3 "select 8, n from mt order by n" +tx 3 "alter table mt drop partition id 'all'" | grep -Eo "SERIALIZATION_ERROR" | uniq +tx 3 "insert into mt values (10)" | grep -Eo "INVALID_TRANSACTION" | uniq +tx 4 "select 9, n from mt order by n" +tx 3 "rollback" +tx 4 "commit" + +echo '' +$CLICKHOUSE_CLIENT -q "select 10, n from mt order by n" +echo '' + +$CLICKHOUSE_CLIENT -q "drop table if exists another_mt" +$CLICKHOUSE_CLIENT -q "create table another_mt (n int) engine=MergeTree order by n" + +tx 5 "begin transaction" +tx 5 "insert into another_mt values (11)" +tx 6 "begin transaction" +tx 6 "insert into mt values (12)" +tx 6 "insert into another_mt values (13)" +tx 5 "alter table another_mt move partition id 'all' to table mt" +tx 6 "alter table another_mt replace partition id 'all' from mt" +tx 5 "alter table another_mt attach partition id 'all' from mt" +tx 5 "commit" +tx 6 "commit" + +$CLICKHOUSE_CLIENT -q "select 11, n from mt order by n" +$CLICKHOUSE_CLIENT -q "select 12, n from another_mt order by n" + +$CLICKHOUSE_CLIENT -q "drop table another_mt" +$CLICKHOUSE_CLIENT -q "drop table mt" diff --git a/tests/queries/0_stateless/01171_mv_select_insert_isolation_long.sh b/tests/queries/0_stateless/01171_mv_select_insert_isolation_long.sh index c48f86f4aee..672a49df5fc 100755 --- a/tests/queries/0_stateless/01171_mv_select_insert_isolation_long.sh +++ b/tests/queries/0_stateless/01171_mv_select_insert_isolation_long.sh @@ -22,6 +22,7 @@ $CLICKHOUSE_CLIENT --query "INSERT INTO src VALUES (0, 0)" # some transactions will fail due to constraint function thread_insert_commit() { + set -e for i in {1..100}; do $CLICKHOUSE_CLIENT --multiquery --query " BEGIN TRANSACTION; @@ -34,6 +35,7 @@ function thread_insert_commit() function thread_insert_rollback() { + set -e for _ in {1..100}; do $CLICKHOUSE_CLIENT --multiquery --query " BEGIN TRANSACTION; @@ -46,11 +48,17 @@ function thread_insert_rollback() # make merges more aggressive function thread_optimize() { + set -e trap "exit 0" INT while true; do optimize_query="OPTIMIZE TABLE src" + partition_id=$(( RANDOM % 2 )) if (( RANDOM % 2 )); then optimize_query="OPTIMIZE TABLE dst" + partition_id="all" + fi + if (( RANDOM % 2 )); then + optimize_query="$optimize_query PARTITION ID '$partition_id'" fi if (( RANDOM % 2 )); then optimize_query="$optimize_query FINAL" @@ -71,6 +79,7 @@ function thread_optimize() function thread_select() { + set -e trap "exit 0" INT while true; do $CLICKHOUSE_CLIENT --multiquery --query " @@ -86,6 +95,7 @@ function thread_select() function thread_select_insert() { + set -e trap "exit 0" INT while true; do $CLICKHOUSE_CLIENT --multiquery --query " diff --git a/tests/queries/0_stateless/transactions.lib b/tests/queries/0_stateless/transactions.lib new file mode 100755 index 00000000000..2d3eafc784a --- /dev/null +++ b/tests/queries/0_stateless/transactions.lib @@ -0,0 +1,14 @@ +#!/usr/bin/env bash + +function tx() +{ + tx_num=$1 + query=$2 + + url_without_session="https://${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_HTTPS}/?" + session="${CLICKHOUSE_TEST_ZOOKEEPER_PREFIX}_$tx_num" + url="${url_without_session}session_id=$session&database=$CLICKHOUSE_DATABASE" + + ${CLICKHOUSE_CURL} -m 30 -sSk "$url" --data "$query" +} +