Revert "Remove useless code around locks"

This commit is contained in:
alesapin 2020-04-01 15:43:09 +03:00 committed by GitHub
parent 8bee467f4c
commit 46322370c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 77 additions and 61 deletions

View File

@ -25,7 +25,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream, * Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
* but it's clear that here is not the best place for this functionality. * but it's clear that here is not the best place for this functionality.
*/ */
addTableLock(storage->lockStructureForShare(context.getInitialQueryId())); addTableLock(storage->lockStructureForShare(true, context.getInitialQueryId()));
/// If the "root" table deduplactes blocks, there are no need to make deduplication for children /// If the "root" table deduplactes blocks, there are no need to make deduplication for children
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks /// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks
@ -54,7 +54,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get())) if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get()))
{ {
addTableLock(materialized_view->lockStructureForShare(context.getInitialQueryId())); addTableLock(materialized_view->lockStructureForShare(true, context.getInitialQueryId()));
StoragePtr inner_table = materialized_view->getTargetTable(); StoragePtr inner_table = materialized_view->getTargetTable();
auto inner_table_id = inner_table->getStorageID(); auto inner_table_id = inner_table->getStorageID();

View File

@ -358,7 +358,7 @@ void DatabaseMySQL::cleanOutdatedTables()
++iterator; ++iterator;
else else
{ {
const auto table_lock = (*iterator)->lockAlterIntention(); const auto table_lock = (*iterator)->lockAlterIntention(RWLockImpl::NO_QUERY);
(*iterator)->shutdown(); (*iterator)->shutdown();
(*iterator)->is_dropped = true; (*iterator)->is_dropped = true;

View File

@ -65,7 +65,7 @@ FunctionBaseImplPtr JoinGetOverloadResolver::build(const ColumnsWithTypeAndName
auto join = storage_join->getJoin(); auto join = storage_join->getJoin();
DataTypes data_types(arguments.size()); DataTypes data_types(arguments.size());
auto table_lock = storage_join->lockStructureForShare(context.getInitialQueryId()); auto table_lock = storage_join->lockStructureForShare(false, context.getInitialQueryId());
for (size_t i = 0; i < arguments.size(); ++i) for (size_t i = 0; i < arguments.size(); ++i)
data_types[i] = arguments[i].type; data_types[i] = arguments[i].type;

View File

@ -82,7 +82,7 @@ BlockIO InterpreterAlterQuery::execute()
if (!mutation_commands.empty()) if (!mutation_commands.empty())
{ {
auto table_lock_holder = table->lockStructureForShare(context.getCurrentQueryId()); auto table_lock_holder = table->lockStructureForShare(false /* because mutation is executed asyncronously */, context.getCurrentQueryId());
MutationsInterpreter(table, mutation_commands, context, false).validate(table_lock_holder); MutationsInterpreter(table, mutation_commands, context, false).validate(table_lock_holder);
table->mutate(mutation_commands, context); table->mutate(mutation_commands, context);
} }
@ -101,7 +101,7 @@ BlockIO InterpreterAlterQuery::execute()
switch (command.type) switch (command.type)
{ {
case LiveViewCommand::REFRESH: case LiveViewCommand::REFRESH:
live_view->refresh(); live_view->refresh(context);
break; break;
} }
} }
@ -109,7 +109,7 @@ BlockIO InterpreterAlterQuery::execute()
if (!alter_commands.empty()) if (!alter_commands.empty())
{ {
auto table_lock_holder = table->lockAlterIntention(); auto table_lock_holder = table->lockAlterIntention(context.getCurrentQueryId());
StorageInMemoryMetadata metadata = table->getInMemoryMetadata(); StorageInMemoryMetadata metadata = table->getInMemoryMetadata();
alter_commands.validate(metadata, context); alter_commands.validate(metadata, context);
alter_commands.prepare(metadata); alter_commands.prepare(metadata);

View File

@ -403,7 +403,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
StoragePtr as_storage = DatabaseCatalog::instance().getTable({as_database_name, create.as_table}); StoragePtr as_storage = DatabaseCatalog::instance().getTable({as_database_name, create.as_table});
/// as_storage->getColumns() and setEngine(...) must be called under structure lock of other_table for CREATE ... AS other_table. /// as_storage->getColumns() and setEngine(...) must be called under structure lock of other_table for CREATE ... AS other_table.
as_storage_lock = as_storage->lockStructureForShare(context.getCurrentQueryId()); as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
properties.columns = as_storage->getColumns(); properties.columns = as_storage->getColumns();
/// Secondary indices make sense only for MergeTree family of storage engines. /// Secondary indices make sense only for MergeTree family of storage engines.

View File

@ -89,7 +89,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
table = DatabaseCatalog::instance().getTable(table_id); table = DatabaseCatalog::instance().getTable(table_id);
} }
auto table_lock = table->lockStructureForShare(context.getInitialQueryId()); auto table_lock = table->lockStructureForShare(false, context.getInitialQueryId());
columns = table->getColumns(); columns = table->getColumns();
} }

View File

@ -109,7 +109,7 @@ BlockIO InterpreterInsertQuery::execute()
BlockIO res; BlockIO res;
StoragePtr table = getTable(query); StoragePtr table = getTable(query);
auto table_lock = table->lockStructureForShare(context.getInitialQueryId()); auto table_lock = table->lockStructureForShare(true, context.getInitialQueryId());
auto query_sample_block = getSampleBlock(query, table); auto query_sample_block = getSampleBlock(query, table);
if (!query.table_function) if (!query.table_function)

View File

@ -255,7 +255,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (storage) if (storage)
{ {
table_lock = storage->lockStructureForShare(context->getInitialQueryId()); table_lock = storage->lockStructureForShare(false, context->getInitialQueryId());
table_id = storage->getStorageID(); table_id = storage->getStorageID();
} }

View File

@ -314,9 +314,11 @@ bool IStorage::isVirtualColumn(const String & column_name) const
return getColumns().get(column_name).is_virtual; return getColumns().get(column_name).is_virtual;
} }
TableStructureReadLockHolder IStorage::lockStructureForShare(const String & query_id) TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id)
{ {
TableStructureReadLockHolder result; TableStructureReadLockHolder result;
if (will_add_new_data)
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Read, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Read, query_id); result.structure_lock = structure_lock->getLock(RWLockImpl::Read, query_id);
if (is_dropped) if (is_dropped)
@ -324,10 +326,10 @@ TableStructureReadLockHolder IStorage::lockStructureForShare(const String & quer
return result; return result;
} }
TableStructureWriteLockHolder IStorage::lockAlterIntention() TableStructureWriteLockHolder IStorage::lockAlterIntention(const String & query_id)
{ {
TableStructureWriteLockHolder result; TableStructureWriteLockHolder result;
result.alter_lock = std::unique_lock(alter_lock); result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
if (is_dropped) if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED); throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
@ -336,20 +338,23 @@ TableStructureWriteLockHolder IStorage::lockAlterIntention()
void IStorage::lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id) void IStorage::lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id)
{ {
if (!lock_holder.alter_lock) if (!lock_holder.alter_intention_lock)
throw Exception("Alter intention lock for table " + getStorageID().getNameForLogs() + " was not taken. This is a bug.", ErrorCodes::LOGICAL_ERROR); throw Exception("Alter intention lock for table " + getStorageID().getNameForLogs() + " was not taken. This is a bug.", ErrorCodes::LOGICAL_ERROR);
if (!lock_holder.new_data_structure_lock)
lock_holder.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
lock_holder.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id); lock_holder.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
} }
TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id) TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id)
{ {
TableStructureWriteLockHolder result; TableStructureWriteLockHolder result;
result.alter_lock = std::unique_lock(alter_lock); result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
if (is_dropped) if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED); throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id); result.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
return result; return result;

View File

@ -199,11 +199,11 @@ public:
/// Acquire this lock if you need the table structure to remain constant during the execution of /// Acquire this lock if you need the table structure to remain constant during the execution of
/// the query. If will_add_new_data is true, this means that the query will add new data to the table /// the query. If will_add_new_data is true, this means that the query will add new data to the table
/// (INSERT or a parts merge). /// (INSERT or a parts merge).
TableStructureReadLockHolder lockStructureForShare(const String & query_id); TableStructureReadLockHolder lockStructureForShare(bool will_add_new_data, const String & query_id);
/// Acquire this lock at the start of ALTER to lock out other ALTERs and make sure that only you /// Acquire this lock at the start of ALTER to lock out other ALTERs and make sure that only you
/// can modify the table structure. It can later be upgraded to the exclusive lock. /// can modify the table structure. It can later be upgraded to the exclusive lock.
TableStructureWriteLockHolder lockAlterIntention(); TableStructureWriteLockHolder lockAlterIntention(const String & query_id);
/// Upgrade alter intention lock to the full exclusive structure lock. This is done by ALTER queries /// Upgrade alter intention lock to the full exclusive structure lock. This is done by ALTER queries
/// to ensure that no other query uses the table structure and it can be safely changed. /// to ensure that no other query uses the table structure and it can be safely changed.
@ -490,7 +490,12 @@ private:
/// If you hold this lock exclusively, you can be sure that no other structure modifying queries /// If you hold this lock exclusively, you can be sure that no other structure modifying queries
/// (e.g. ALTER, DROP) are concurrently executing. But queries that only read table structure /// (e.g. ALTER, DROP) are concurrently executing. But queries that only read table structure
/// (e.g. SELECT, INSERT) can continue to execute. /// (e.g. SELECT, INSERT) can continue to execute.
mutable std::mutex alter_lock; mutable RWLock alter_intention_lock = RWLockImpl::create();
/// It is taken for share for the entire INSERT query and the entire merge of the parts (for MergeTree).
/// ALTER COLUMN queries acquire an exclusive lock to ensure that no new parts with the old structure
/// are added to the table and thus the set of parts to modify doesn't change.
mutable RWLock new_data_structure_lock = RWLockImpl::create();
/// Lock for the table column structure (names, types, etc.) and data path. /// Lock for the table column structure (names, types, etc.) and data path.
/// It is taken in exclusive mode by queries that modify them (e.g. RENAME, ALTER and DROP) /// It is taken in exclusive mode by queries that modify them (e.g. RENAME, ALTER and DROP)

View File

@ -517,11 +517,14 @@ void StorageLiveView::drop(TableStructureWriteLockHolder &)
condition.notify_all(); condition.notify_all();
} }
void StorageLiveView::refresh() void StorageLiveView::refresh(const Context & context)
{ {
std::lock_guard lock(mutex); auto alter_lock = lockAlterIntention(context.getCurrentQueryId());
if (getNewBlocks()) {
condition.notify_all(); std::lock_guard lock(mutex);
if (getNewBlocks())
condition.notify_all();
}
} }
Pipes StorageLiveView::read( Pipes StorageLiveView::read(

View File

@ -123,7 +123,7 @@ public:
void startup() override; void startup() override;
void shutdown() override; void shutdown() override;
void refresh(); void refresh(const Context & context);
Pipes read( Pipes read(
const Names & column_names, const Names & column_names,

View File

@ -85,7 +85,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
try try
{ {
auto storage_lock = data.lockStructureForShare(RWLockImpl::NO_QUERY); auto storage_lock = data.lockStructureForShare(false, RWLockImpl::NO_QUERY);
MergeTreeData::DataPartPtr part = findPart(part_name); MergeTreeData::DataPartPtr part = findPart(part_name);

View File

@ -57,7 +57,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
{ {
/// TODO: Implement tryLockStructureForShare. /// TODO: Implement tryLockStructureForShare.
auto lock = storage.lockStructureForShare(""); auto lock = storage.lockStructureForShare(false, "");
storage.clearOldTemporaryDirectories(); storage.clearOldTemporaryDirectories();
} }

View File

@ -203,7 +203,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
else if (part->name == part_name) else if (part->name == part_name)
{ {
auto zookeeper = storage.getZooKeeper(); auto zookeeper = storage.getZooKeeper();
auto table_lock = storage.lockStructureForShare(RWLockImpl::NO_QUERY); auto table_lock = storage.lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums( auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
part->getColumns(), part->checksums); part->getColumns(), part->checksums);

View File

@ -168,7 +168,7 @@ Pipes StorageBuffer::read(
if (destination.get() == this) if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP); throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
auto destination_lock = destination->lockStructureForShare(context.getCurrentQueryId()); auto destination_lock = destination->lockStructureForShare(false, context.getCurrentQueryId());
const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [this, destination](const String& column_name) const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [this, destination](const String& column_name)
{ {

View File

@ -185,7 +185,7 @@ Pipes StorageMaterializedView::read(
const unsigned num_streams) const unsigned num_streams)
{ {
auto storage = getTargetTable(); auto storage = getTargetTable();
auto lock = storage->lockStructureForShare(context.getCurrentQueryId()); auto lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
if (query_info.order_by_optimizer) if (query_info.order_by_optimizer)
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage); query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage);
@ -200,7 +200,7 @@ Pipes StorageMaterializedView::read(
BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const Context & context) BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const Context & context)
{ {
auto storage = getTargetTable(); auto storage = getTargetTable();
auto lock = storage->lockStructureForShare(context.getCurrentQueryId()); auto lock = storage->lockStructureForShare(true, context.getCurrentQueryId());
auto stream = storage->write(query, context); auto stream = storage->write(query, context);
stream->addTableLock(lock); stream->addTableLock(lock);
return stream; return stream;

View File

@ -364,7 +364,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String
{ {
auto & table = iterator->table(); auto & table = iterator->table();
if (table.get() != this) if (table.get() != this)
selected_tables.emplace_back(table, table->lockStructureForShare(query_id), iterator->name()); selected_tables.emplace_back(table, table->lockStructureForShare(false, query_id), iterator->name());
iterator->next(); iterator->next();
} }
@ -389,7 +389,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr
if (storage.get() != this) if (storage.get() != this)
{ {
selected_tables.emplace_back(storage, storage->lockStructureForShare(query_id), iterator->name()); selected_tables.emplace_back(storage, storage->lockStructureForShare(false, query_id), iterator->name());
virtual_column->insert(iterator->name()); virtual_column->insert(iterator->name());
} }

View File

@ -241,7 +241,7 @@ void StorageMergeTree::alter(
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata); DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata);
/// We release all locks except alter_lock which allows /// We release all locks except alter_intention_lock which allows
/// to execute alter queries sequentially /// to execute alter queries sequentially
table_lock_holder.releaseAllExceptAlterIntention(); table_lock_holder.releaseAllExceptAlterIntention();
@ -537,7 +537,7 @@ bool StorageMergeTree::merge(
bool deduplicate, bool deduplicate,
String * out_disable_reason) String * out_disable_reason)
{ {
auto table_lock_holder = lockStructureForShare(RWLockImpl::NO_QUERY); auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
FutureMergedMutatedPart future_part; FutureMergedMutatedPart future_part;
@ -655,7 +655,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask()
bool StorageMergeTree::tryMutatePart() bool StorageMergeTree::tryMutatePart()
{ {
auto table_lock_holder = lockStructureForShare(RWLockImpl::NO_QUERY); auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements; size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements;
FutureMergedMutatedPart future_part; FutureMergedMutatedPart future_part;
@ -780,7 +780,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
{ {
{ {
/// TODO: Implement tryLockStructureForShare. /// TODO: Implement tryLockStructureForShare.
auto lock_structure = lockStructureForShare(""); auto lock_structure = lockStructureForShare(false, "");
clearOldPartsFromFilesystem(); clearOldPartsFromFilesystem();
clearOldTemporaryDirectories(); clearOldTemporaryDirectories();
} }
@ -973,14 +973,14 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::FREEZE_PARTITION: case PartitionCommand::FREEZE_PARTITION:
{ {
auto lock = lockStructureForShare(context.getCurrentQueryId()); auto lock = lockStructureForShare(false, context.getCurrentQueryId());
freezePartition(command.partition, command.with_name, context, lock); freezePartition(command.partition, command.with_name, context, lock);
} }
break; break;
case PartitionCommand::FREEZE_ALL_PARTITIONS: case PartitionCommand::FREEZE_ALL_PARTITIONS:
{ {
auto lock = lockStructureForShare(context.getCurrentQueryId()); auto lock = lockStructureForShare(false, context.getCurrentQueryId());
freezeAll(command.with_name, context, lock); freezeAll(command.with_name, context, lock);
} }
break; break;
@ -1045,8 +1045,8 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context) void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context)
{ {
auto lock1 = lockStructureForShare(context.getCurrentQueryId()); auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(context.getCurrentQueryId()); auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());
Stopwatch watch; Stopwatch watch;
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table); MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table);
@ -1116,8 +1116,8 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context) void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context)
{ {
auto lock1 = lockStructureForShare(context.getCurrentQueryId()); auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = dest_table->lockStructureForShare(context.getCurrentQueryId()); auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId());
auto dest_table_storage = std::dynamic_pointer_cast<StorageMergeTree>(dest_table); auto dest_table_storage = std::dynamic_pointer_cast<StorageMergeTree>(dest_table);
if (!dest_table_storage) if (!dest_table_storage)

View File

@ -1025,7 +1025,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge, ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge,
ttl_infos, time(nullptr), max_volume_index); ttl_infos, time(nullptr), max_volume_index);
auto table_lock = lockStructureForShare(RWLockImpl::NO_QUERY); auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
FutureMergedMutatedPart future_merged_part(parts, entry.new_part_type); FutureMergedMutatedPart future_merged_part(parts, entry.new_part_type);
if (future_merged_part.name != entry.new_part_name) if (future_merged_part.name != entry.new_part_name)
@ -1160,7 +1160,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
/// Can throw an exception. /// Can throw an exception.
ReservationPtr reserved_space = reserveSpace(estimated_space_for_result, source_part->disk); ReservationPtr reserved_space = reserveSpace(estimated_space_for_result, source_part->disk);
auto table_lock = lockStructureForShare(RWLockImpl::NO_QUERY); auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
MutableDataPartPtr new_part; MutableDataPartPtr new_part;
Transaction transaction(*this); Transaction transaction(*this);
@ -1514,7 +1514,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
PartDescriptions parts_to_add; PartDescriptions parts_to_add;
DataPartsVector parts_to_remove; DataPartsVector parts_to_remove;
auto table_lock_holder_dst_table = lockStructureForShare(RWLockImpl::NO_QUERY); auto table_lock_holder_dst_table = lockStructureForShare(false, RWLockImpl::NO_QUERY);
for (size_t i = 0; i < entry_replace.new_part_names.size(); ++i) for (size_t i = 0; i < entry_replace.new_part_names.size(); ++i)
{ {
@ -1576,7 +1576,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
return 0; return 0;
} }
table_lock_holder_src_table = source_table->lockStructureForShare(RWLockImpl::NO_QUERY); table_lock_holder_src_table = source_table->lockStructureForShare(false, RWLockImpl::NO_QUERY);
DataPartStates valid_states{MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, DataPartStates valid_states{MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed,
MergeTreeDataPartState::Outdated}; MergeTreeDataPartState::Outdated};
@ -2699,7 +2699,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
TableStructureReadLockHolder table_lock_holder; TableStructureReadLockHolder table_lock_holder;
if (!to_detached) if (!to_detached)
table_lock_holder = lockStructureForShare(RWLockImpl::NO_QUERY); table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
/// Logging /// Logging
Stopwatch stopwatch; Stopwatch stopwatch;
@ -3223,7 +3223,7 @@ void StorageReplicatedMergeTree::alter(
alter_entry.emplace(); alter_entry.emplace();
mutation_znode.reset(); mutation_znode.reset();
/// We can safely read structure, because we guarded with alter_lock /// We can safely read structure, because we guarded with alter_intention_lock
if (is_readonly) if (is_readonly)
throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY); throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY);
@ -3428,14 +3428,14 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
case PartitionCommand::FREEZE_PARTITION: case PartitionCommand::FREEZE_PARTITION:
{ {
auto lock = lockStructureForShare(query_context.getCurrentQueryId()); auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
freezePartition(command.partition, command.with_name, query_context, lock); freezePartition(command.partition, command.with_name, query_context, lock);
} }
break; break;
case PartitionCommand::FREEZE_ALL_PARTITIONS: case PartitionCommand::FREEZE_ALL_PARTITIONS:
{ {
auto lock = lockStructureForShare(query_context.getCurrentQueryId()); auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
freezeAll(command.with_name, query_context, lock); freezeAll(command.with_name, query_context, lock);
} }
break; break;
@ -4443,7 +4443,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{ {
/// Critical section is not required (since grabOldParts() returns unique part set on each call) /// Critical section is not required (since grabOldParts() returns unique part set on each call)
auto table_lock = lockStructureForShare(RWLockImpl::NO_QUERY); auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
DataPartsVector parts = grabOldParts(); DataPartsVector parts = grabOldParts();
@ -4738,8 +4738,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
const Context & context) const Context & context)
{ {
/// First argument is true, because we possibly will add new data to current table. /// First argument is true, because we possibly will add new data to current table.
auto lock1 = lockStructureForShare(context.getCurrentQueryId()); auto lock1 = lockStructureForShare(true, context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(context.getCurrentQueryId()); auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());
Stopwatch watch; Stopwatch watch;
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table); MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table);
@ -4917,8 +4917,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context) void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context)
{ {
auto lock1 = lockStructureForShare(context.getCurrentQueryId()); auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = dest_table->lockStructureForShare(context.getCurrentQueryId()); auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId());
auto dest_table_storage = std::dynamic_pointer_cast<StorageReplicatedMergeTree>(dest_table); auto dest_table_storage = std::dynamic_pointer_cast<StorageReplicatedMergeTree>(dest_table);
if (!dest_table_storage) if (!dest_table_storage)

View File

@ -103,7 +103,7 @@ protected:
try try
{ {
table_lock = storage->lockStructureForShare(query_id); table_lock = storage->lockStructureForShare(false, query_id);
} }
catch (const Exception & e) catch (const Exception & e)
{ {

View File

@ -192,7 +192,7 @@ StoragesInfo StoragesInfoStream::next()
try try
{ {
/// For table not to be dropped and set of columns to remain constant. /// For table not to be dropped and set of columns to remain constant.
info.table_lock = info.storage->lockStructureForShare(query_id); info.table_lock = info.storage->lockStructureForShare(false, query_id);
} }
catch (const Exception & e) catch (const Exception & e)
{ {

View File

@ -244,7 +244,7 @@ protected:
if (need_lock_structure) if (need_lock_structure)
{ {
table = tables_it->table(); table = tables_it->table();
lock = table->lockStructureForShare(context.getCurrentQueryId()); lock = table->lockStructureForShare(false, context.getCurrentQueryId());
} }
} }
catch (const Exception & e) catch (const Exception & e)

View File

@ -12,11 +12,12 @@ struct TableStructureWriteLockHolder
{ {
void release() void release()
{ {
*this = {}; *this = TableStructureWriteLockHolder();
} }
void releaseAllExceptAlterIntention() void releaseAllExceptAlterIntention()
{ {
new_data_structure_lock.reset();
structure_lock.reset(); structure_lock.reset();
} }
@ -24,7 +25,8 @@ private:
friend class IStorage; friend class IStorage;
/// Order is important. /// Order is important.
std::unique_lock<std::mutex> alter_lock; RWLockImpl::LockHolder alter_intention_lock;
RWLockImpl::LockHolder new_data_structure_lock;
RWLockImpl::LockHolder structure_lock; RWLockImpl::LockHolder structure_lock;
}; };
@ -32,13 +34,14 @@ struct TableStructureReadLockHolder
{ {
void release() void release()
{ {
*this = {}; *this = TableStructureReadLockHolder();
} }
private: private:
friend class IStorage; friend class IStorage;
/// Order is important. /// Order is important.
RWLockImpl::LockHolder new_data_structure_lock;
RWLockImpl::LockHolder structure_lock; RWLockImpl::LockHolder structure_lock;
}; };