From 46322370c004f00662b68dd6eea5d6f2efdff26f Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 1 Apr 2020 15:43:09 +0300 Subject: [PATCH] Revert "Remove useless code around locks" --- .../PushingToViewsBlockOutputStream.cpp | 4 +-- dbms/src/Databases/DatabaseMySQL.cpp | 2 +- dbms/src/Functions/FunctionJoinGet.cpp | 2 +- .../Interpreters/InterpreterAlterQuery.cpp | 6 ++--- .../Interpreters/InterpreterCreateQuery.cpp | 2 +- .../Interpreters/InterpreterDescribeQuery.cpp | 2 +- .../Interpreters/InterpreterInsertQuery.cpp | 2 +- .../Interpreters/InterpreterSelectQuery.cpp | 2 +- dbms/src/Storages/IStorage.cpp | 15 +++++++---- dbms/src/Storages/IStorage.h | 11 +++++--- .../src/Storages/LiveView/StorageLiveView.cpp | 11 +++++--- dbms/src/Storages/LiveView/StorageLiveView.h | 2 +- .../Storages/MergeTree/DataPartsExchange.cpp | 2 +- .../ReplicatedMergeTreeCleanupThread.cpp | 2 +- .../ReplicatedMergeTreePartCheckThread.cpp | 2 +- dbms/src/Storages/StorageBuffer.cpp | 2 +- dbms/src/Storages/StorageMaterializedView.cpp | 4 +-- dbms/src/Storages/StorageMerge.cpp | 4 +-- dbms/src/Storages/StorageMergeTree.cpp | 20 +++++++------- .../Storages/StorageReplicatedMergeTree.cpp | 26 +++++++++---------- .../Storages/System/StorageSystemColumns.cpp | 2 +- .../System/StorageSystemPartsBase.cpp | 2 +- .../Storages/System/StorageSystemTables.cpp | 2 +- dbms/src/Storages/TableStructureLockHolder.h | 9 ++++--- 24 files changed, 77 insertions(+), 61 deletions(-) diff --git a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 5752fbaff96..991d206777a 100644 --- a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -25,7 +25,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( * Although now any insertion into the table is done via PushingToViewsBlockOutputStream, * but it's clear that here is not the best place for this functionality. */ - addTableLock(storage->lockStructureForShare(context.getInitialQueryId())); + addTableLock(storage->lockStructureForShare(true, context.getInitialQueryId())); /// If the "root" table deduplactes blocks, there are no need to make deduplication for children /// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks @@ -54,7 +54,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( if (auto * materialized_view = dynamic_cast(dependent_table.get())) { - addTableLock(materialized_view->lockStructureForShare(context.getInitialQueryId())); + addTableLock(materialized_view->lockStructureForShare(true, context.getInitialQueryId())); StoragePtr inner_table = materialized_view->getTargetTable(); auto inner_table_id = inner_table->getStorageID(); diff --git a/dbms/src/Databases/DatabaseMySQL.cpp b/dbms/src/Databases/DatabaseMySQL.cpp index ad40cff9e6b..959121585ea 100644 --- a/dbms/src/Databases/DatabaseMySQL.cpp +++ b/dbms/src/Databases/DatabaseMySQL.cpp @@ -358,7 +358,7 @@ void DatabaseMySQL::cleanOutdatedTables() ++iterator; else { - const auto table_lock = (*iterator)->lockAlterIntention(); + const auto table_lock = (*iterator)->lockAlterIntention(RWLockImpl::NO_QUERY); (*iterator)->shutdown(); (*iterator)->is_dropped = true; diff --git a/dbms/src/Functions/FunctionJoinGet.cpp b/dbms/src/Functions/FunctionJoinGet.cpp index 6a6c0c4a97e..0860deccb14 100644 --- a/dbms/src/Functions/FunctionJoinGet.cpp +++ b/dbms/src/Functions/FunctionJoinGet.cpp @@ -65,7 +65,7 @@ FunctionBaseImplPtr JoinGetOverloadResolver::build(const ColumnsWithTypeAndName auto join = storage_join->getJoin(); DataTypes data_types(arguments.size()); - auto table_lock = storage_join->lockStructureForShare(context.getInitialQueryId()); + auto table_lock = storage_join->lockStructureForShare(false, context.getInitialQueryId()); for (size_t i = 0; i < arguments.size(); ++i) data_types[i] = arguments[i].type; diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.cpp b/dbms/src/Interpreters/InterpreterAlterQuery.cpp index ddf1e27af87..315527765ef 100644 --- a/dbms/src/Interpreters/InterpreterAlterQuery.cpp +++ b/dbms/src/Interpreters/InterpreterAlterQuery.cpp @@ -82,7 +82,7 @@ BlockIO InterpreterAlterQuery::execute() if (!mutation_commands.empty()) { - auto table_lock_holder = table->lockStructureForShare(context.getCurrentQueryId()); + auto table_lock_holder = table->lockStructureForShare(false /* because mutation is executed asyncronously */, context.getCurrentQueryId()); MutationsInterpreter(table, mutation_commands, context, false).validate(table_lock_holder); table->mutate(mutation_commands, context); } @@ -101,7 +101,7 @@ BlockIO InterpreterAlterQuery::execute() switch (command.type) { case LiveViewCommand::REFRESH: - live_view->refresh(); + live_view->refresh(context); break; } } @@ -109,7 +109,7 @@ BlockIO InterpreterAlterQuery::execute() if (!alter_commands.empty()) { - auto table_lock_holder = table->lockAlterIntention(); + auto table_lock_holder = table->lockAlterIntention(context.getCurrentQueryId()); StorageInMemoryMetadata metadata = table->getInMemoryMetadata(); alter_commands.validate(metadata, context); alter_commands.prepare(metadata); diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index b57604828e1..f15796688e1 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -403,7 +403,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS StoragePtr as_storage = DatabaseCatalog::instance().getTable({as_database_name, create.as_table}); /// as_storage->getColumns() and setEngine(...) must be called under structure lock of other_table for CREATE ... AS other_table. - as_storage_lock = as_storage->lockStructureForShare(context.getCurrentQueryId()); + as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId()); properties.columns = as_storage->getColumns(); /// Secondary indices make sense only for MergeTree family of storage engines. diff --git a/dbms/src/Interpreters/InterpreterDescribeQuery.cpp b/dbms/src/Interpreters/InterpreterDescribeQuery.cpp index cf7bb0458e9..1353c01ebf6 100644 --- a/dbms/src/Interpreters/InterpreterDescribeQuery.cpp +++ b/dbms/src/Interpreters/InterpreterDescribeQuery.cpp @@ -89,7 +89,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl() table = DatabaseCatalog::instance().getTable(table_id); } - auto table_lock = table->lockStructureForShare(context.getInitialQueryId()); + auto table_lock = table->lockStructureForShare(false, context.getInitialQueryId()); columns = table->getColumns(); } diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp index f12ac68cede..b4280ee20e6 100644 --- a/dbms/src/Interpreters/InterpreterInsertQuery.cpp +++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp @@ -109,7 +109,7 @@ BlockIO InterpreterInsertQuery::execute() BlockIO res; StoragePtr table = getTable(query); - auto table_lock = table->lockStructureForShare(context.getInitialQueryId()); + auto table_lock = table->lockStructureForShare(true, context.getInitialQueryId()); auto query_sample_block = getSampleBlock(query, table); if (!query.table_function) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index b08e0ce1146..514efb90a00 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -255,7 +255,7 @@ InterpreterSelectQuery::InterpreterSelectQuery( if (storage) { - table_lock = storage->lockStructureForShare(context->getInitialQueryId()); + table_lock = storage->lockStructureForShare(false, context->getInitialQueryId()); table_id = storage->getStorageID(); } diff --git a/dbms/src/Storages/IStorage.cpp b/dbms/src/Storages/IStorage.cpp index c36a28b115f..4d916ca1b46 100644 --- a/dbms/src/Storages/IStorage.cpp +++ b/dbms/src/Storages/IStorage.cpp @@ -314,9 +314,11 @@ bool IStorage::isVirtualColumn(const String & column_name) const return getColumns().get(column_name).is_virtual; } -TableStructureReadLockHolder IStorage::lockStructureForShare(const String & query_id) +TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id) { TableStructureReadLockHolder result; + if (will_add_new_data) + result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Read, query_id); result.structure_lock = structure_lock->getLock(RWLockImpl::Read, query_id); if (is_dropped) @@ -324,10 +326,10 @@ TableStructureReadLockHolder IStorage::lockStructureForShare(const String & quer return result; } -TableStructureWriteLockHolder IStorage::lockAlterIntention() +TableStructureWriteLockHolder IStorage::lockAlterIntention(const String & query_id) { TableStructureWriteLockHolder result; - result.alter_lock = std::unique_lock(alter_lock); + result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id); if (is_dropped) throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED); @@ -336,20 +338,23 @@ TableStructureWriteLockHolder IStorage::lockAlterIntention() void IStorage::lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id) { - if (!lock_holder.alter_lock) + if (!lock_holder.alter_intention_lock) throw Exception("Alter intention lock for table " + getStorageID().getNameForLogs() + " was not taken. This is a bug.", ErrorCodes::LOGICAL_ERROR); + if (!lock_holder.new_data_structure_lock) + lock_holder.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id); lock_holder.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id); } TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id) { TableStructureWriteLockHolder result; - result.alter_lock = std::unique_lock(alter_lock); + result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id); if (is_dropped) throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED); + result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id); result.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id); return result; diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h index 469f39d65df..d3cede6e5c8 100644 --- a/dbms/src/Storages/IStorage.h +++ b/dbms/src/Storages/IStorage.h @@ -199,11 +199,11 @@ public: /// Acquire this lock if you need the table structure to remain constant during the execution of /// the query. If will_add_new_data is true, this means that the query will add new data to the table /// (INSERT or a parts merge). - TableStructureReadLockHolder lockStructureForShare(const String & query_id); + TableStructureReadLockHolder lockStructureForShare(bool will_add_new_data, const String & query_id); /// Acquire this lock at the start of ALTER to lock out other ALTERs and make sure that only you /// can modify the table structure. It can later be upgraded to the exclusive lock. - TableStructureWriteLockHolder lockAlterIntention(); + TableStructureWriteLockHolder lockAlterIntention(const String & query_id); /// Upgrade alter intention lock to the full exclusive structure lock. This is done by ALTER queries /// to ensure that no other query uses the table structure and it can be safely changed. @@ -490,7 +490,12 @@ private: /// If you hold this lock exclusively, you can be sure that no other structure modifying queries /// (e.g. ALTER, DROP) are concurrently executing. But queries that only read table structure /// (e.g. SELECT, INSERT) can continue to execute. - mutable std::mutex alter_lock; + mutable RWLock alter_intention_lock = RWLockImpl::create(); + + /// It is taken for share for the entire INSERT query and the entire merge of the parts (for MergeTree). + /// ALTER COLUMN queries acquire an exclusive lock to ensure that no new parts with the old structure + /// are added to the table and thus the set of parts to modify doesn't change. + mutable RWLock new_data_structure_lock = RWLockImpl::create(); /// Lock for the table column structure (names, types, etc.) and data path. /// It is taken in exclusive mode by queries that modify them (e.g. RENAME, ALTER and DROP) diff --git a/dbms/src/Storages/LiveView/StorageLiveView.cpp b/dbms/src/Storages/LiveView/StorageLiveView.cpp index 93d183a594f..049110a3294 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.cpp +++ b/dbms/src/Storages/LiveView/StorageLiveView.cpp @@ -517,11 +517,14 @@ void StorageLiveView::drop(TableStructureWriteLockHolder &) condition.notify_all(); } -void StorageLiveView::refresh() +void StorageLiveView::refresh(const Context & context) { - std::lock_guard lock(mutex); - if (getNewBlocks()) - condition.notify_all(); + auto alter_lock = lockAlterIntention(context.getCurrentQueryId()); + { + std::lock_guard lock(mutex); + if (getNewBlocks()) + condition.notify_all(); + } } Pipes StorageLiveView::read( diff --git a/dbms/src/Storages/LiveView/StorageLiveView.h b/dbms/src/Storages/LiveView/StorageLiveView.h index b3ed89f8d10..9186132f99d 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.h +++ b/dbms/src/Storages/LiveView/StorageLiveView.h @@ -123,7 +123,7 @@ public: void startup() override; void shutdown() override; - void refresh(); + void refresh(const Context & context); Pipes read( const Names & column_names, diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index 1b216e8bec3..6373c85a15d 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -85,7 +85,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo try { - auto storage_lock = data.lockStructureForShare(RWLockImpl::NO_QUERY); + auto storage_lock = data.lockStructureForShare(false, RWLockImpl::NO_QUERY); MergeTreeData::DataPartPtr part = findPart(part_name); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 27ad6871573..77a5bca7a92 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -57,7 +57,7 @@ void ReplicatedMergeTreeCleanupThread::iterate() { /// TODO: Implement tryLockStructureForShare. - auto lock = storage.lockStructureForShare(""); + auto lock = storage.lockStructureForShare(false, ""); storage.clearOldTemporaryDirectories(); } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index 5c8f878503a..17b716d14c2 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -203,7 +203,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na else if (part->name == part_name) { auto zookeeper = storage.getZooKeeper(); - auto table_lock = storage.lockStructureForShare(RWLockImpl::NO_QUERY); + auto table_lock = storage.lockStructureForShare(false, RWLockImpl::NO_QUERY); auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums( part->getColumns(), part->checksums); diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp index 53fb257d58d..7699f8379d9 100644 --- a/dbms/src/Storages/StorageBuffer.cpp +++ b/dbms/src/Storages/StorageBuffer.cpp @@ -168,7 +168,7 @@ Pipes StorageBuffer::read( if (destination.get() == this) throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP); - auto destination_lock = destination->lockStructureForShare(context.getCurrentQueryId()); + auto destination_lock = destination->lockStructureForShare(false, context.getCurrentQueryId()); const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [this, destination](const String& column_name) { diff --git a/dbms/src/Storages/StorageMaterializedView.cpp b/dbms/src/Storages/StorageMaterializedView.cpp index 63031572cd6..3fb25bf8275 100644 --- a/dbms/src/Storages/StorageMaterializedView.cpp +++ b/dbms/src/Storages/StorageMaterializedView.cpp @@ -185,7 +185,7 @@ Pipes StorageMaterializedView::read( const unsigned num_streams) { auto storage = getTargetTable(); - auto lock = storage->lockStructureForShare(context.getCurrentQueryId()); + auto lock = storage->lockStructureForShare(false, context.getCurrentQueryId()); if (query_info.order_by_optimizer) query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage); @@ -200,7 +200,7 @@ Pipes StorageMaterializedView::read( BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const Context & context) { auto storage = getTargetTable(); - auto lock = storage->lockStructureForShare(context.getCurrentQueryId()); + auto lock = storage->lockStructureForShare(true, context.getCurrentQueryId()); auto stream = storage->write(query, context); stream->addTableLock(lock); return stream; diff --git a/dbms/src/Storages/StorageMerge.cpp b/dbms/src/Storages/StorageMerge.cpp index f102ee1c6f8..f3322c7dfff 100644 --- a/dbms/src/Storages/StorageMerge.cpp +++ b/dbms/src/Storages/StorageMerge.cpp @@ -364,7 +364,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String { auto & table = iterator->table(); if (table.get() != this) - selected_tables.emplace_back(table, table->lockStructureForShare(query_id), iterator->name()); + selected_tables.emplace_back(table, table->lockStructureForShare(false, query_id), iterator->name()); iterator->next(); } @@ -389,7 +389,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr if (storage.get() != this) { - selected_tables.emplace_back(storage, storage->lockStructureForShare(query_id), iterator->name()); + selected_tables.emplace_back(storage, storage->lockStructureForShare(false, query_id), iterator->name()); virtual_column->insert(iterator->name()); } diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 64950a47437..2efeff19657 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -241,7 +241,7 @@ void StorageMergeTree::alter( DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata); - /// We release all locks except alter_lock which allows + /// We release all locks except alter_intention_lock which allows /// to execute alter queries sequentially table_lock_holder.releaseAllExceptAlterIntention(); @@ -537,7 +537,7 @@ bool StorageMergeTree::merge( bool deduplicate, String * out_disable_reason) { - auto table_lock_holder = lockStructureForShare(RWLockImpl::NO_QUERY); + auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); FutureMergedMutatedPart future_part; @@ -655,7 +655,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask() bool StorageMergeTree::tryMutatePart() { - auto table_lock_holder = lockStructureForShare(RWLockImpl::NO_QUERY); + auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements; FutureMergedMutatedPart future_part; @@ -780,7 +780,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask() { { /// TODO: Implement tryLockStructureForShare. - auto lock_structure = lockStructureForShare(""); + auto lock_structure = lockStructureForShare(false, ""); clearOldPartsFromFilesystem(); clearOldTemporaryDirectories(); } @@ -973,14 +973,14 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma case PartitionCommand::FREEZE_PARTITION: { - auto lock = lockStructureForShare(context.getCurrentQueryId()); + auto lock = lockStructureForShare(false, context.getCurrentQueryId()); freezePartition(command.partition, command.with_name, context, lock); } break; case PartitionCommand::FREEZE_ALL_PARTITIONS: { - auto lock = lockStructureForShare(context.getCurrentQueryId()); + auto lock = lockStructureForShare(false, context.getCurrentQueryId()); freezeAll(command.with_name, context, lock); } break; @@ -1045,8 +1045,8 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context) { - auto lock1 = lockStructureForShare(context.getCurrentQueryId()); - auto lock2 = source_table->lockStructureForShare(context.getCurrentQueryId()); + auto lock1 = lockStructureForShare(false, context.getCurrentQueryId()); + auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId()); Stopwatch watch; MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table); @@ -1116,8 +1116,8 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context) { - auto lock1 = lockStructureForShare(context.getCurrentQueryId()); - auto lock2 = dest_table->lockStructureForShare(context.getCurrentQueryId()); + auto lock1 = lockStructureForShare(false, context.getCurrentQueryId()); + auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId()); auto dest_table_storage = std::dynamic_pointer_cast(dest_table); if (!dest_table_storage) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 336fef069d0..8896151561b 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1025,7 +1025,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge, ttl_infos, time(nullptr), max_volume_index); - auto table_lock = lockStructureForShare(RWLockImpl::NO_QUERY); + auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY); FutureMergedMutatedPart future_merged_part(parts, entry.new_part_type); if (future_merged_part.name != entry.new_part_name) @@ -1160,7 +1160,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM /// Can throw an exception. ReservationPtr reserved_space = reserveSpace(estimated_space_for_result, source_part->disk); - auto table_lock = lockStructureForShare(RWLockImpl::NO_QUERY); + auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY); MutableDataPartPtr new_part; Transaction transaction(*this); @@ -1514,7 +1514,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) PartDescriptions parts_to_add; DataPartsVector parts_to_remove; - auto table_lock_holder_dst_table = lockStructureForShare(RWLockImpl::NO_QUERY); + auto table_lock_holder_dst_table = lockStructureForShare(false, RWLockImpl::NO_QUERY); for (size_t i = 0; i < entry_replace.new_part_names.size(); ++i) { @@ -1576,7 +1576,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) return 0; } - table_lock_holder_src_table = source_table->lockStructureForShare(RWLockImpl::NO_QUERY); + table_lock_holder_src_table = source_table->lockStructureForShare(false, RWLockImpl::NO_QUERY); DataPartStates valid_states{MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated}; @@ -2699,7 +2699,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin TableStructureReadLockHolder table_lock_holder; if (!to_detached) - table_lock_holder = lockStructureForShare(RWLockImpl::NO_QUERY); + table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); /// Logging Stopwatch stopwatch; @@ -3223,7 +3223,7 @@ void StorageReplicatedMergeTree::alter( alter_entry.emplace(); mutation_znode.reset(); - /// We can safely read structure, because we guarded with alter_lock + /// We can safely read structure, because we guarded with alter_intention_lock if (is_readonly) throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY); @@ -3428,14 +3428,14 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part case PartitionCommand::FREEZE_PARTITION: { - auto lock = lockStructureForShare(query_context.getCurrentQueryId()); + auto lock = lockStructureForShare(false, query_context.getCurrentQueryId()); freezePartition(command.partition, command.with_name, query_context, lock); } break; case PartitionCommand::FREEZE_ALL_PARTITIONS: { - auto lock = lockStructureForShare(query_context.getCurrentQueryId()); + auto lock = lockStructureForShare(false, query_context.getCurrentQueryId()); freezeAll(command.with_name, query_context, lock); } break; @@ -4443,7 +4443,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK() { /// Critical section is not required (since grabOldParts() returns unique part set on each call) - auto table_lock = lockStructureForShare(RWLockImpl::NO_QUERY); + auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY); auto zookeeper = getZooKeeper(); DataPartsVector parts = grabOldParts(); @@ -4738,8 +4738,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_ const Context & context) { /// First argument is true, because we possibly will add new data to current table. - auto lock1 = lockStructureForShare(context.getCurrentQueryId()); - auto lock2 = source_table->lockStructureForShare(context.getCurrentQueryId()); + auto lock1 = lockStructureForShare(true, context.getCurrentQueryId()); + auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId()); Stopwatch watch; MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table); @@ -4917,8 +4917,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context) { - auto lock1 = lockStructureForShare(context.getCurrentQueryId()); - auto lock2 = dest_table->lockStructureForShare(context.getCurrentQueryId()); + auto lock1 = lockStructureForShare(false, context.getCurrentQueryId()); + auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId()); auto dest_table_storage = std::dynamic_pointer_cast(dest_table); if (!dest_table_storage) diff --git a/dbms/src/Storages/System/StorageSystemColumns.cpp b/dbms/src/Storages/System/StorageSystemColumns.cpp index 9af8904ab26..cbf6ada9ed3 100644 --- a/dbms/src/Storages/System/StorageSystemColumns.cpp +++ b/dbms/src/Storages/System/StorageSystemColumns.cpp @@ -103,7 +103,7 @@ protected: try { - table_lock = storage->lockStructureForShare(query_id); + table_lock = storage->lockStructureForShare(false, query_id); } catch (const Exception & e) { diff --git a/dbms/src/Storages/System/StorageSystemPartsBase.cpp b/dbms/src/Storages/System/StorageSystemPartsBase.cpp index c212b30d268..d8f564b0160 100644 --- a/dbms/src/Storages/System/StorageSystemPartsBase.cpp +++ b/dbms/src/Storages/System/StorageSystemPartsBase.cpp @@ -192,7 +192,7 @@ StoragesInfo StoragesInfoStream::next() try { /// For table not to be dropped and set of columns to remain constant. - info.table_lock = info.storage->lockStructureForShare(query_id); + info.table_lock = info.storage->lockStructureForShare(false, query_id); } catch (const Exception & e) { diff --git a/dbms/src/Storages/System/StorageSystemTables.cpp b/dbms/src/Storages/System/StorageSystemTables.cpp index 5d0aec921de..a8d5fc2ec57 100644 --- a/dbms/src/Storages/System/StorageSystemTables.cpp +++ b/dbms/src/Storages/System/StorageSystemTables.cpp @@ -244,7 +244,7 @@ protected: if (need_lock_structure) { table = tables_it->table(); - lock = table->lockStructureForShare(context.getCurrentQueryId()); + lock = table->lockStructureForShare(false, context.getCurrentQueryId()); } } catch (const Exception & e) diff --git a/dbms/src/Storages/TableStructureLockHolder.h b/dbms/src/Storages/TableStructureLockHolder.h index 50f196517e3..b5fc0c620ad 100644 --- a/dbms/src/Storages/TableStructureLockHolder.h +++ b/dbms/src/Storages/TableStructureLockHolder.h @@ -12,11 +12,12 @@ struct TableStructureWriteLockHolder { void release() { - *this = {}; + *this = TableStructureWriteLockHolder(); } void releaseAllExceptAlterIntention() { + new_data_structure_lock.reset(); structure_lock.reset(); } @@ -24,7 +25,8 @@ private: friend class IStorage; /// Order is important. - std::unique_lock alter_lock; + RWLockImpl::LockHolder alter_intention_lock; + RWLockImpl::LockHolder new_data_structure_lock; RWLockImpl::LockHolder structure_lock; }; @@ -32,13 +34,14 @@ struct TableStructureReadLockHolder { void release() { - *this = {}; + *this = TableStructureReadLockHolder(); } private: friend class IStorage; /// Order is important. + RWLockImpl::LockHolder new_data_structure_lock; RWLockImpl::LockHolder structure_lock; };