separate Read and Write lock holders, better method names

This commit is contained in:
Alexey Zatelepin 2019-03-07 21:04:47 +03:00
parent ab025805a3
commit 3344955154
36 changed files with 106 additions and 103 deletions

View File

@ -115,7 +115,7 @@ public:
size_t checkDepth(size_t max_depth) const { return checkDepthImpl(max_depth, max_depth); }
/// Do not allow to change the table while the blocks stream and its children are alive.
void addTableLock(const TableStructureLockHolder & lock) { table_locks.push_back(lock); }
void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
/// Get information about execution speed.
const BlockStreamProfileInfo & getProfileInfo() const { return info; }
@ -242,7 +242,7 @@ public:
protected:
/// Order is important: `table_locks` must be destroyed after `children` so that tables from
/// which child streams read are protected by the locks during the lifetime of the child streams.
std::vector<TableStructureLockHolder> table_locks;
std::vector<TableStructureReadLockHolder> table_locks;
BlockInputStreams children;
std::shared_mutex children_mutex;

View File

@ -58,10 +58,10 @@ public:
/** Don't let to alter table while instance of stream is alive.
*/
void addTableLock(const TableStructureLockHolder & lock) { table_locks.push_back(lock); }
void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
private:
std::vector<TableStructureLockHolder> table_locks;
std::vector<TableStructureReadLockHolder> table_locks;
};
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;

View File

@ -20,7 +20,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
* but it's clear that here is not the best place for this functionality.
*/
addTableLock(storage->lockStructure(true, context.getCurrentQueryId()));
addTableLock(storage->lockStructureForShare(true, context.getCurrentQueryId()));
/// If the "root" table deduplactes blocks, there are no need to make deduplication for children
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks
@ -45,7 +45,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
auto & materialized_view = dynamic_cast<const StorageMaterializedView &>(*dependent_table);
if (StoragePtr inner_table = materialized_view.tryGetTargetTable())
addTableLock(inner_table->lockStructure(true, context.getCurrentQueryId()));
addTableLock(inner_table->lockStructureForShare(true, context.getCurrentQueryId()));
auto query = materialized_view.getInnerQuery();
BlockOutputStreamPtr out = std::make_shared<PushingToViewsBlockOutputStream>(

View File

@ -65,7 +65,7 @@ FunctionBasePtr FunctionBuilderJoinGet::buildImpl(const ColumnsWithTypeAndName &
auto join = storage_join->getJoin();
DataTypes data_types(arguments.size());
auto table_lock = storage_join->lockStructure(false, context.getCurrentQueryId());
auto table_lock = storage_join->lockStructureForShare(false, context.getCurrentQueryId());
for (size_t i = 0; i < arguments.size(); ++i)
data_types[i] = arguments[i].type;

View File

@ -15,7 +15,7 @@ public:
static constexpr auto name = "joinGet";
FunctionJoinGet(
TableStructureLockHolder table_lock, StoragePtr storage_join, JoinPtr join, const String & attr_name, DataTypePtr return_type)
TableStructureReadLockHolder table_lock, StoragePtr storage_join, JoinPtr join, const String & attr_name, DataTypePtr return_type)
: table_lock(std::move(table_lock))
, storage_join(std::move(storage_join))
, join(std::move(join))
@ -35,7 +35,7 @@ private:
size_t getNumberOfArguments() const override { return 0; }
private:
TableStructureLockHolder table_lock;
TableStructureReadLockHolder table_lock;
StoragePtr storage_join;
JoinPtr join;
const String attr_name;

View File

@ -74,7 +74,7 @@ BlockIO InterpreterAlterQuery::execute()
if (!alter_commands.empty())
{
auto structure_lock = table->lockIntentionForAlter(context.getCurrentQueryId());
auto structure_lock = table->lockAlterIntention(context.getCurrentQueryId());
alter_commands.validate(*table, context);
table->alter(alter_commands, database_name, table_name, context, structure_lock);
}

View File

@ -587,11 +587,11 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
String as_table_name = create.as_table;
StoragePtr as_storage;
TableStructureLockHolder as_storage_lock;
TableStructureReadLockHolder as_storage_lock;
if (!as_table_name.empty())
{
as_storage = context.getTable(as_database_name, as_table_name);
as_storage_lock = as_storage->lockStructure(false, context.getCurrentQueryId());
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
}
/// Set and retrieve list of columns.

View File

@ -93,7 +93,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
table = context.getTable(database_name, table_name);
}
auto table_lock = table->lockStructure(false, context.getCurrentQueryId());
auto table_lock = table->lockStructureForShare(false, context.getCurrentQueryId());
columns = table->getColumns().getAll();
column_defaults = table->getColumns().defaults;
column_comments = table->getColumns().comments;

View File

@ -69,7 +69,6 @@ BlockIO InterpreterDropQuery::executeToTable(String & database_name_, String & t
{
database_and_table.second->shutdown();
/// If table was already dropped by anyone, an exception will be thrown
// TODO
auto table_lock = database_and_table.second->lockExclusively(context.getCurrentQueryId());
/// Drop table from memory, don't touch data and metadata
database_and_table.first->detachTable(database_and_table.second->getTableName());

View File

@ -96,7 +96,7 @@ BlockIO InterpreterInsertQuery::execute()
checkAccess(query);
StoragePtr table = getTable(query);
auto table_lock = table->lockStructure(true, context.getCurrentQueryId());
auto table_lock = table->lockStructureForShare(true, context.getCurrentQueryId());
/// We create a pipeline of several streams, into which we will write data.
BlockOutputStreamPtr out;

View File

@ -23,7 +23,7 @@ BlockIO InterpreterOptimizeQuery::execute()
return executeDDLQueryOnCluster(query_ptr, context, {ast.database});
StoragePtr table = context.getTable(ast.database, ast.table);
auto table_lock = table->lockStructure(true, context.getCurrentQueryId());
auto table_lock = table->lockStructureForShare(true, context.getCurrentQueryId());
table->optimize(query_ptr, ast.partition, ast.final, ast.deduplicate, context);
return {};
}

View File

@ -96,7 +96,7 @@ BlockIO InterpreterRenameQuery::execute()
table_guards.emplace(to, context.getDDLGuard(to.database_name, to.table_name));
}
std::vector<TableStructureLockHolder> locks;
std::vector<TableStructureWriteLockHolder> locks;
locks.reserve(unique_tables_from.size());
for (const auto & names : unique_tables_from)

View File

@ -200,7 +200,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
if (storage)
table_lock = storage->lockStructure(false, context.getCurrentQueryId());
table_lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
syntax_analyzer_result = SyntaxAnalyzer(context, subquery_depth).analyze(
query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage);

View File

@ -248,7 +248,7 @@ private:
/// Table from where to read data, if not subquery.
StoragePtr storage;
TableStructureLockHolder table_lock;
TableStructureReadLockHolder table_lock;
/// Used when we read from prepared input, not table or subquery.
BlockInputStreamPtr input;

View File

@ -5,7 +5,7 @@
namespace DB
{
void IStorage::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context, TableStructureLockHolder & structure_lock)
void IStorage::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context, TableStructureWriteLockHolder & structure_lock)
{
for (const auto & param : params)
{
@ -13,7 +13,7 @@ void IStorage::alter(const AlterCommands & params, const String & database_name,
throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
lockStructureForAlter(structure_lock, context.getCurrentQueryId());
lockStructureExclusively(structure_lock, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndicesDescription();
params.apply(new_columns);

View File

@ -84,9 +84,9 @@ public:
virtual bool supportsDeduplication() const { return false; }
TableStructureLockHolder lockStructure(bool will_add_new_data, const String & query_id)
TableStructureReadLockHolder lockStructureForShare(bool will_add_new_data, const String & query_id)
{
TableStructureLockHolder result;
TableStructureReadLockHolder result;
if (will_add_new_data)
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Read, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Read, query_id);
@ -96,9 +96,9 @@ public:
return result;
}
TableStructureLockHolder lockIntentionForAlter(const String & query_id)
TableStructureWriteLockHolder lockAlterIntention(const String & query_id)
{
TableStructureLockHolder result;
TableStructureWriteLockHolder result;
result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
if (is_dropped)
@ -106,19 +106,16 @@ public:
return result;
}
void lockDataForAlter(TableStructureLockHolder & lock_holder, const String & query_id)
void lockNewDataStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id)
{
if (!lock_holder.alter_intention_lock)
throw Exception("Alter intention lock for table " + getTableName() + " was not taken. This is a bug.",
ErrorCodes::LOGICAL_ERROR);
lock_holder.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
}
void lockStructureForAlter(TableStructureLockHolder & lock_holder, const String & query_id)
void lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id)
{
if (!lock_holder.alter_intention_lock)
throw Exception("Alter intention lock for table " + getTableName() + " was not taken. This is a bug.",
@ -127,21 +124,19 @@ public:
if (!lock_holder.new_data_structure_lock)
lock_holder.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
lock_holder.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
}
TableStructureLockHolder lockExclusively(const String & query_id)
TableStructureWriteLockHolder lockExclusively(const String & query_id)
{
TableStructureLockHolder result;
TableStructureWriteLockHolder result;
result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
return result;
}
@ -221,7 +216,7 @@ public:
* This method must fully execute the ALTER query, taking care of the locks itself.
* To update the table metadata on disk, this method should call InterpreterAlterQuery::updateMetadata.
*/
virtual void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context, TableStructureLockHolder & structure_lock);
virtual void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context, TableStructureWriteLockHolder & structure_lock);
/** ALTER tables with regard to its partitions.
* Should handle locks for each command on its own.

View File

@ -79,7 +79,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
try
{
auto storage_lock = owned_storage->lockStructure(false, RWLockImpl::NO_QUERY);
auto storage_lock = owned_storage->lockStructureForShare(false, RWLockImpl::NO_QUERY);
MergeTreeData::DataPartPtr part = findPart(part_name);

View File

@ -108,7 +108,6 @@ void ReplicatedMergeTreeAlterThread::run()
LOG_INFO(log, "Version of metadata nodes in ZooKeeper changed. Waiting for structure write lock.");
// TODO
auto table_lock = storage.lockExclusively(RWLockImpl::NO_QUERY);
if (columns_in_zk == storage.getColumns() && metadata_diff.empty())
@ -135,7 +134,7 @@ void ReplicatedMergeTreeAlterThread::run()
/// Update parts.
if (changed_columns_version || force_recheck_parts)
{
auto table_lock = storage.lockStructure(false, RWLockImpl::NO_QUERY);
auto table_lock = storage.lockStructureForShare(false, RWLockImpl::NO_QUERY);
if (changed_columns_version)
LOG_INFO(log, "ALTER-ing parts");

View File

@ -202,7 +202,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
else if (part->name == part_name)
{
auto zookeeper = storage.getZooKeeper();
auto table_lock = storage.lockStructure(false, RWLockImpl::NO_QUERY);
auto table_lock = storage.lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
part->columns, part->checksums);

View File

@ -150,7 +150,7 @@ BlockInputStreams StorageBuffer::read(
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
auto destination_lock = destination->lockStructure(false, context.getCurrentQueryId());
auto destination_lock = destination->lockStructureForShare(false, context.getCurrentQueryId());
const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [this, destination](const String& column_name)
{
@ -677,9 +677,9 @@ void StorageBuffer::flushThread()
}
void StorageBuffer::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context, TableStructureLockHolder & structure_lock)
void StorageBuffer::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context, TableStructureWriteLockHolder & structure_lock)
{
lockStructureForAlter(structure_lock, context.getCurrentQueryId());
lockStructureExclusively(structure_lock, context.getCurrentQueryId());
/// So that no blocks of the old structure remain.
optimize({} /*query*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context);

View File

@ -83,7 +83,7 @@ public:
/// The structure of the subordinate table is not checked and does not change.
void alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & context, TableStructureLockHolder & structure_locks) override;
const Context & context, TableStructureWriteLockHolder & structure_lock) override;
private:
String name;

View File

@ -338,9 +338,9 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context & c
void StorageDistributed::alter(
const AlterCommands & params, const String & database_name, const String & current_table_name,
const Context & context, TableStructureLockHolder & structure_lock)
const Context & context, TableStructureWriteLockHolder & structure_lock)
{
lockStructureForAlter(structure_lock, context.getCurrentQueryId());
lockStructureExclusively(structure_lock, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndicesDescription();

View File

@ -83,7 +83,7 @@ public:
/// the structure of the sub-table is not checked
void alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & context, TableStructureLockHolder & structure_lock) override;
const Context & context, TableStructureWriteLockHolder & structure_lock) override;
void startup() override;
void shutdown() override;

View File

@ -190,7 +190,7 @@ BlockInputStreams StorageMaterializedView::read(
const unsigned num_streams)
{
auto storage = getTargetTable();
auto lock = storage->lockStructure(false, context.getCurrentQueryId());
auto lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
auto streams = storage->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
for (auto & stream : streams)
stream->addTableLock(lock);
@ -200,7 +200,7 @@ BlockInputStreams StorageMaterializedView::read(
BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const Context & context)
{
auto storage = getTargetTable();
auto lock = storage->lockStructure(true, context.getCurrentQueryId());
auto lock = storage->lockStructureForShare(true, context.getCurrentQueryId());
auto stream = storage->write(query, context);
stream->addTableLock(lock);
return stream;

View File

@ -224,7 +224,7 @@ BlockInputStreams StorageMerge::read(
current_streams = std::max(size_t(1), current_streams);
StoragePtr storage = it->first;
TableStructureLockHolder struct_lock = it->second;
TableStructureReadLockHolder struct_lock = it->second;
BlockInputStreams source_streams;
@ -262,7 +262,7 @@ BlockInputStreams StorageMerge::read(
BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size, const Block & header, const StoragePtr & storage,
const TableStructureLockHolder & struct_lock, Names & real_column_names,
const TableStructureReadLockHolder & struct_lock, Names & real_column_names,
Context & modified_context, size_t streams_num, bool has_table_virtual_column,
bool concat_streams)
{
@ -345,7 +345,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String
{
auto & table = iterator->table();
if (table.get() != this)
selected_tables.emplace_back(table, table->lockStructure(false, query_id));
selected_tables.emplace_back(table, table->lockStructureForShare(false, query_id));
}
iterator->next();
@ -375,7 +375,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr
if (storage.get() != this)
{
virtual_column->insert(storage->getTableName());
selected_tables.emplace_back(storage, get_lock ? storage->lockStructure(false, query_id) : TableStructureLockHolder{});
selected_tables.emplace_back(storage, get_lock ? storage->lockStructureForShare(false, query_id) : TableStructureReadLockHolder{});
}
}
@ -397,9 +397,9 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr
void StorageMerge::alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & context, TableStructureLockHolder & structure_lock)
const Context & context, TableStructureWriteLockHolder & structure_lock)
{
lockStructureForAlter(structure_lock, context.getCurrentQueryId());
lockStructureExclusively(structure_lock, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndicesDescription();

View File

@ -46,7 +46,7 @@ public:
/// the structure of sub-tables is not checked
void alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & context, TableStructureLockHolder & structure_lock) override;
const Context & context, TableStructureWriteLockHolder & structure_lock) override;
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override;
@ -56,7 +56,7 @@ private:
OptimizedRegularExpression table_name_regexp;
Context global_context;
using StorageListWithLocks = std::list<std::pair<StoragePtr, TableStructureLockHolder>>;
using StorageListWithLocks = std::list<std::pair<StoragePtr, TableStructureReadLockHolder>>;
StorageListWithLocks getSelectedTables(const String & query_id) const;
@ -78,7 +78,7 @@ protected:
BlockInputStreams createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size, const Block & header, const StoragePtr & storage,
const TableStructureLockHolder & struct_lock, Names & real_column_names,
const TableStructureReadLockHolder & struct_lock, Names & real_column_names,
Context & modified_context, size_t streams_num, bool has_table_virtual_column,
bool concat_streams = false);

View File

@ -196,11 +196,11 @@ void StorageMergeTree::alter(
const String & current_database_name,
const String & current_table_name,
const Context & context,
TableStructureLockHolder & structure_lock)
TableStructureWriteLockHolder & structure_lock)
{
if (!params.is_mutable())
{
lockStructureForAlter(structure_lock, context.getCurrentQueryId());
lockStructureExclusively(structure_lock, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndicesDescription();
params.apply(new_columns);
@ -212,7 +212,7 @@ void StorageMergeTree::alter(
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
lockDataForAlter(structure_lock, context.getCurrentQueryId());
lockNewDataStructureExclusively(structure_lock, context.getCurrentQueryId());
data.checkAlter(params, context);
@ -231,7 +231,7 @@ void StorageMergeTree::alter(
transactions.push_back(std::move(transaction));
}
lockStructureForAlter(structure_lock, context.getCurrentQueryId());
lockStructureExclusively(structure_lock, context.getCurrentQueryId());
IDatabase::ASTModifier storage_modifier = [&] (IAST & ast)
{
@ -453,7 +453,7 @@ bool StorageMergeTree::merge(
bool deduplicate,
String * out_disable_reason)
{
auto structure_lock = lockStructure(true, RWLockImpl::NO_QUERY);
auto structure_lock = lockStructureForShare(true, RWLockImpl::NO_QUERY);
FutureMergedMutatedPart future_part;
@ -563,7 +563,7 @@ bool StorageMergeTree::merge(
bool StorageMergeTree::tryMutatePart()
{
auto structure_lock = lockStructure(true, RWLockImpl::NO_QUERY);
auto structure_lock = lockStructureForShare(true, RWLockImpl::NO_QUERY);
FutureMergedMutatedPart future_part;
MutationCommands commands;
@ -775,7 +775,7 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Fi
auto merge_blocker = merger_mutator.actions_blocker.cancel();
/// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
auto lock_read_structure = lockStructure(false, context.getCurrentQueryId());
auto lock_read_structure = lockStructureForShare(false, context.getCurrentQueryId());
String partition_id = data.getPartitionIDFromQuery(partition, context);
auto parts = data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
@ -880,7 +880,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockStructure(false, context.getCurrentQueryId());
auto lock = lockStructureForShare(false, context.getCurrentQueryId());
data.freezePartition(command.partition, command.with_name, context);
}
break;
@ -891,7 +891,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructure(false, context.getCurrentQueryId());
auto lock = lockStructureForShare(false, context.getCurrentQueryId());
data.freezeAll(command.with_name, context);
}
break;
@ -909,7 +909,6 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, cons
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
/// Waits for completion of merge and does not start new ones.
// TODO
auto lock = lockExclusively(context.getCurrentQueryId());
String partition_id = data.getPartitionIDFromQuery(partition, context);
@ -993,8 +992,8 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context)
{
auto lock1 = lockStructure(false, context.getCurrentQueryId());
auto lock2 = source_table->lockStructure(false, context.getCurrentQueryId());
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());
Stopwatch watch;
MergeTreeData * src_data = data.checkStructureAndGetMergeTreeData(source_table);

View File

@ -79,7 +79,7 @@ public:
void alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & context, TableStructureLockHolder & structure_lock) override;
const Context & context, TableStructureWriteLockHolder & structure_lock) override;
void checkTableCanBeDropped() const override;

View File

@ -32,9 +32,9 @@ void registerStorageNull(StorageFactory & factory)
void StorageNull::alter(
const AlterCommands & params, const String & current_database_name, const String & current_table_name,
const Context & context, TableStructureLockHolder & structure_lock)
const Context & context, TableStructureWriteLockHolder & structure_lock)
{
lockStructureForAlter(structure_lock, context.getCurrentQueryId());
lockStructureExclusively(structure_lock, context.getCurrentQueryId());
ColumnsDescription new_columns = getColumns();
IndicesDescription new_indices = getIndicesDescription();

View File

@ -43,7 +43,7 @@ public:
void alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & context, TableStructureLockHolder & structure_lock) override;
const Context & context, TableStructureWriteLockHolder & structure_lock) override;
private:
String table_name;

View File

@ -1089,7 +1089,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
/// Can throw an exception.
DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_merge);
auto table_lock = lockStructure(false, RWLockImpl::NO_QUERY);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
FutureMergedMutatedPart future_merged_part(parts);
if (future_merged_part.name != entry.new_part_name)
@ -1219,7 +1219,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
/// Can throw an exception.
DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_result);
auto table_lock = lockStructure(false, RWLockImpl::NO_QUERY);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
MergeTreeData::MutableDataPartPtr new_part;
MergeTreeData::Transaction transaction(data);
@ -1528,7 +1528,7 @@ void StorageReplicatedMergeTree::executeClearColumnInPartition(const LogEntry &
/// We don't change table structure, only data in some parts
/// To disable reading from these parts, we will sequentially acquire write lock for each part inside alterDataPart()
/// If we will lock the whole table here, a deadlock can occur. For example, if use use Buffer table (CLICKHOUSE-3238)
auto lock_read_structure = lockStructure(false, RWLockImpl::NO_QUERY);
auto lock_read_structure = lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto zookeeper = getZooKeeper();
@ -1624,7 +1624,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
PartDescriptions parts_to_add;
MergeTreeData::DataPartsVector parts_to_remove;
auto structure_lock_dst_table = lockStructure(false, RWLockImpl::NO_QUERY);
auto structure_lock_dst_table = lockStructureForShare(false, RWLockImpl::NO_QUERY);
for (size_t i = 0; i < entry_replace.new_part_names.size(); ++i)
{
@ -1662,7 +1662,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
}
StoragePtr source_table;
TableStructureLockHolder structure_lock_src_table;
TableStructureReadLockHolder structure_lock_src_table;
String source_table_name = entry_replace.from_database + "." + entry_replace.from_table;
auto clone_data_parts_from_source_table = [&] () -> size_t
@ -1686,7 +1686,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
return 0;
}
structure_lock_src_table = source_table->lockStructure(false, RWLockImpl::NO_QUERY);
structure_lock_src_table = source_table->lockStructureForShare(false, RWLockImpl::NO_QUERY);
MergeTreeData::DataPartStates valid_states{MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed,
MergeTreeDataPartState::Outdated};
@ -2719,9 +2719,9 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
LOG_DEBUG(log, "Fetching part " << part_name << " from " << source_replica_path);
TableStructureLockHolder table_lock;
TableStructureReadLockHolder table_lock;
if (!to_detached)
table_lock = lockStructure(true, RWLockImpl::NO_QUERY);
table_lock = lockStructureForShare(true, RWLockImpl::NO_QUERY);
/// Logging
Stopwatch stopwatch;
@ -3089,7 +3089,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
void StorageReplicatedMergeTree::alter(
const AlterCommands & params, const String & /*database_name*/, const String & /*table_name*/,
const Context & query_context, TableStructureLockHolder & structure_lock)
const Context & query_context, TableStructureWriteLockHolder & structure_lock)
{
assertNotReadonly();
@ -3125,7 +3125,7 @@ void StorageReplicatedMergeTree::alter(
{
/// Just to read current structure. Alter will be done in separate thread.
auto table_lock = lockStructure(false, query_context.getCurrentQueryId());
auto table_lock = lockStructureForShare(false, query_context.getCurrentQueryId());
if (is_readonly)
throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY);
@ -3385,7 +3385,7 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockStructure(false, query_context.getCurrentQueryId());
auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
data.freezePartition(command.partition, command.with_name, query_context);
}
break;
@ -3396,7 +3396,7 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructure(false, query_context.getCurrentQueryId());
auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
data.freezeAll(command.with_name, query_context);
}
break;
@ -4451,7 +4451,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{
/// Critical section is not required (since grabOldParts() returns unique part set on each call)
auto table_lock = lockStructure(false, RWLockImpl::NO_QUERY);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto zookeeper = getZooKeeper();
MergeTreeData::DataPartsVector parts = data.grabOldParts();
@ -4743,8 +4743,8 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace,
const Context & context)
{
auto lock1 = lockStructure(false, context.getCurrentQueryId());
auto lock2 = source_table->lockStructure(false, context.getCurrentQueryId());
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());
Stopwatch watch;
MergeTreeData * src_data = data.checkStructureAndGetMergeTreeData(source_table);

View File

@ -118,7 +118,7 @@ public:
void alter(
const AlterCommands & params, const String & database_name, const String & table_name,
const Context & query_context, TableStructureLockHolder & structure_lock) override;
const Context & query_context, TableStructureWriteLockHolder & structure_lock) override;
void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) override;

View File

@ -100,11 +100,11 @@ protected:
{
StoragePtr storage = storages.at(std::make_pair(database_name, table_name));
TableStructureLockHolder table_lock;
TableStructureReadLockHolder table_lock;
try
{
table_lock = storage->lockStructure(false, query_id);
table_lock = storage->lockStructureForShare(false, query_id);
}
catch (const Exception & e)
{

View File

@ -167,7 +167,7 @@ public:
try
{
/// For table not to be dropped and set of columns to remain constant.
info.table_lock = info.storage->lockStructure(false, query_id);
info.table_lock = info.storage->lockStructureForShare(false, query_id);
}
catch (const Exception & e)
{

View File

@ -34,7 +34,7 @@ public:
struct StoragesInfo
{
StoragePtr storage;
TableStructureLockHolder table_lock;
TableStructureReadLockHolder table_lock;
String database;
String table;

View File

@ -5,19 +5,30 @@
namespace DB
{
struct TableStructureLockHolder
struct TableStructureWriteLockHolder
{
void release()
{
*this = TableStructureWriteLockHolder();
}
private:
friend class IStorage;
/// Order is important.
RWLockImpl::LockHolder alter_intention_lock;
RWLockImpl::LockHolder new_data_structure_lock;
RWLockImpl::LockHolder structure_lock;
};
void release()
{
structure_lock.reset();
new_data_structure_lock.reset();
alter_intention_lock.reset();
}
struct TableStructureReadLockHolder
{
private:
friend class IStorage;
/// Order is important.
RWLockImpl::LockHolder new_data_structure_lock;
RWLockImpl::LockHolder structure_lock;
};
}