Better locks in Storages

This commit is contained in:
alesapin 2020-06-18 19:10:47 +03:00
parent c8a58299ac
commit d79982f497
64 changed files with 162 additions and 287 deletions

View File

@ -53,7 +53,7 @@ std::ostream & operator<<(std::ostream & stream, const IStorage & what)
return stream;
}
std::ostream & operator<<(std::ostream & stream, const TableStructureReadLock &)
std::ostream & operator<<(std::ostream & stream, const TableLockHolder &)
{
stream << "TableStructureReadLock()";
return stream;

View File

@ -22,9 +22,6 @@ std::ostream & operator<<(std::ostream & stream, const IDataType & what);
class IStorage;
std::ostream & operator<<(std::ostream & stream, const IStorage & what);
class TableStructureReadLock;
std::ostream & operator<<(std::ostream & stream, const TableStructureReadLock & what);
class IFunctionOverloadResolver;
std::ostream & operator<<(std::ostream & stream, const IFunctionOverloadResolver & what);

View File

@ -109,7 +109,7 @@ public:
size_t checkDepth(size_t max_depth) const { return checkDepthImpl(max_depth, max_depth); }
/// Do not allow to change the table while the blocks stream and its children are alive.
void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
/// Get information about execution speed.
const BlockStreamProfileInfo & getProfileInfo() const { return info; }
@ -229,7 +229,7 @@ public:
protected:
/// Order is important: `table_locks` must be destroyed after `children` so that tables from
/// which child streams read are protected by the locks during the lifetime of the child streams.
std::vector<TableStructureReadLockHolder> table_locks;
std::vector<TableLockHolder> table_locks;
BlockInputStreams children;
std::shared_mutex children_mutex;

View File

@ -61,10 +61,10 @@ public:
/** Don't let to alter table while instance of stream is alive.
*/
void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
private:
std::vector<TableStructureReadLockHolder> table_locks;
std::vector<TableLockHolder> table_locks;
};
}

View File

@ -33,7 +33,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
* but it's clear that here is not the best place for this functionality.
*/
addTableLock(
storage->lockStructureForShare(true, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout));
storage->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout));
/// If the "root" table deduplactes blocks, there are no need to make deduplication for children
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks
@ -74,8 +74,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get()))
{
addTableLock(
materialized_view->lockStructureForShare(
true, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout));
materialized_view->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout));
StoragePtr inner_table = materialized_view->getTargetTable();
auto inner_table_id = inner_table->getStorageID();

View File

@ -362,7 +362,7 @@ void DatabaseMySQL::cleanOutdatedTables()
++iterator;
else
{
const auto table_lock = (*iterator)->lockAlterIntention(RWLockImpl::NO_QUERY, lock_acquire_timeout);
const auto table_lock = (*iterator)->lockExclusively(RWLockImpl::NO_QUERY, lock_acquire_timeout);
(*iterator)->shutdown();
(*iterator)->is_dropped = true;

View File

@ -266,7 +266,7 @@ void DatabaseOnDisk::renameTable(
}
auto table_data_relative_path = getTableDataPath(table_name);
TableStructureWriteLockHolder table_lock;
TableExclusiveLockHolder table_lock;
String table_metadata_path;
ASTPtr attach_query;
/// DatabaseLazy::detachTable may return nullptr even if table exists, so we need tryGetTable for this case.

View File

@ -22,7 +22,6 @@ class Context;
struct Settings;
struct ConstraintsDescription;
struct IndicesDescription;
struct TableStructureWriteLockHolder;
class ASTCreateQuery;
using Dictionaries = std::vector<String>;

View File

@ -67,8 +67,7 @@ FunctionBaseImplPtr JoinGetOverloadResolver<or_null>::build(const ColumnsWithTyp
auto join = storage_join->getJoin();
DataTypes data_types(arguments.size());
auto table_lock = storage_join->lockStructureForShare(
false, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_lock = storage_join->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
for (size_t i = 0; i < arguments.size(); ++i)
data_types[i] = arguments[i].type;

View File

@ -37,7 +37,7 @@ class FunctionJoinGet final : public IFunctionBaseImpl
public:
static constexpr auto name = or_null ? "joinGetOrNull" : "joinGet";
FunctionJoinGet(TableStructureReadLockHolder table_lock_, StoragePtr storage_join_,
FunctionJoinGet(TableLockHolder table_lock_, StoragePtr storage_join_,
HashJoinPtr join_, String attr_name_,
DataTypes argument_types_, DataTypePtr return_type_)
: table_lock(std::move(table_lock_))
@ -57,7 +57,7 @@ public:
ExecutableFunctionImplPtr prepare(const Block & sample_block, const ColumnNumbers & arguments, size_t result) const override;
private:
TableStructureReadLockHolder table_lock;
TableLockHolder table_lock;
StoragePtr storage_join;
HashJoinPtr join;
const String attr_name;

View File

@ -43,6 +43,7 @@ BlockIO InterpreterAlterQuery::execute()
context.checkAccess(getRequiredAccess());
auto table_id = context.resolveStorageID(alter, Context::ResolveOrdinary);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context);
auto alter_lock = table->lockForAlter(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
/// Add default database to table identifiers that we can encounter in e.g. default expressions,
@ -83,10 +84,7 @@ BlockIO InterpreterAlterQuery::execute()
if (!mutation_commands.empty())
{
auto table_lock_holder = table->lockStructureForShare(
false /* because mutation is executed asyncronously */,
context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
MutationsInterpreter(table, metadata_snapshot, mutation_commands, context, false).validate(table_lock_holder);
MutationsInterpreter(table, metadata_snapshot, mutation_commands, context, false).validate();
table->mutate(mutation_commands, context);
}
@ -112,13 +110,11 @@ BlockIO InterpreterAlterQuery::execute()
if (!alter_commands.empty())
{
auto table_lock_holder = table->lockAlterIntention(
context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
StorageInMemoryMetadata metadata = table->getInMemoryMetadata();
alter_commands.validate(metadata, context);
alter_commands.prepare(metadata);
table->checkAlterIsPossible(alter_commands, context.getSettingsRef());
table->alter(alter_commands, context, table_lock_holder);
table->alter(alter_commands, context, alter_lock);
}
return {};

View File

@ -405,7 +405,7 @@ ConstraintsDescription InterpreterCreateQuery::getConstraintsDescription(const A
InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(ASTCreateQuery & create) const
{
TableProperties properties;
TableStructureReadLockHolder as_storage_lock;
TableLockHolder as_storage_lock;
if (create.columns_list)
{
@ -428,8 +428,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
StoragePtr as_storage = DatabaseCatalog::instance().getTable({as_database_name, create.as_table}, context);
/// as_storage->getColumns() and setEngine(...) must be called under structure lock of other_table for CREATE ... AS other_table.
as_storage_lock = as_storage->lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
as_storage_lock = as_storage->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto as_storage_metadata = as_storage->getInMemoryMetadataPtr();
properties.columns = as_storage_metadata->getColumns();

View File

@ -89,8 +89,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
table = DatabaseCatalog::instance().getTable(table_id, context);
}
auto table_lock = table->lockStructureForShare(
false, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_lock = table->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
columns = metadata_snapshot->getColumns();
}

View File

@ -93,7 +93,7 @@ BlockIO InterpreterDropQuery::executeToTable(
{
context.checkAccess(table->isView() ? AccessType::DROP_VIEW : AccessType::DROP_TABLE, table_id);
table->shutdown();
TableStructureWriteLockHolder table_lock;
TableExclusiveLockHolder table_lock;
if (database->getEngineName() != "Atomic")
table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
/// Drop table from memory, don't touch data and metadata
@ -116,7 +116,7 @@ BlockIO InterpreterDropQuery::executeToTable(
table->shutdown();
TableStructureWriteLockHolder table_lock;
TableExclusiveLockHolder table_lock;
if (database->getEngineName() != "Atomic")
table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);

View File

@ -118,8 +118,7 @@ BlockIO InterpreterInsertQuery::execute()
BlockIO res;
StoragePtr table = getTable(query);
auto table_lock = table->lockStructureForShare(
true, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_lock = table->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
auto query_sample_block = getSampleBlock(query, table, metadata_snapshot);

View File

@ -255,8 +255,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (storage)
{
table_lock = storage->lockStructureForShare(
false, context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
table_id = storage->getStorageID();
if (metadata_snapshot == nullptr)
metadata_snapshot = storage->getInMemoryMetadataPtr();
@ -277,7 +276,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
{
/// Rewritten with subquery. Free storage locks here.
storage = {};
table_lock.release();
table_lock.reset();
table_id = StorageID::createEmpty();
}
}

View File

@ -185,7 +185,7 @@ private:
/// Table from where to read data, if not subquery.
StoragePtr storage;
StorageID table_id = StorageID::createEmpty(); /// Will be initialized if storage is not nullptr
TableStructureReadLockHolder table_lock;
TableLockHolder table_lock;
/// Used when we read from prepared input, not table or subquery.
BlockInputStreamPtr input;

View File

@ -671,7 +671,7 @@ BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(const std::ve
return in;
}
void MutationsInterpreter::validate(TableStructureReadLockHolder &)
void MutationsInterpreter::validate()
{
const Settings & settings = context.getSettingsRef();
@ -696,7 +696,7 @@ void MutationsInterpreter::validate(TableStructureReadLockHolder &)
addStreamsForLaterStages(stages, in)->getHeader();
}
BlockInputStreamPtr MutationsInterpreter::execute(TableStructureReadLockHolder &)
BlockInputStreamPtr MutationsInterpreter::execute()
{
if (!can_execute)
throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);

View File

@ -32,12 +32,12 @@ public:
const Context & context_,
bool can_execute_);
void validate(TableStructureReadLockHolder & table_lock_holder);
void validate();
size_t evaluateCommandsSize();
/// The resulting stream will return blocks containing only changed columns and columns, that we need to recalculate indices.
BlockInputStreamPtr execute(TableStructureReadLockHolder & table_lock_holder);
BlockInputStreamPtr execute();
/// Only changed columns.
const Block & getUpdatedHeader() const;

View File

@ -62,12 +62,12 @@ public:
/// Do not allow to change the table while the processors of pipe are alive.
/// TODO: move it to pipeline.
void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
/// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible.
void addInterpreterContext(std::shared_ptr<Context> context) { interpreter_context.emplace_back(std::move(context)); }
void addStorageHolder(StoragePtr storage) { storage_holders.emplace_back(std::move(storage)); }
const std::vector<TableStructureReadLockHolder> & getTableLocks() const { return table_locks; }
const std::vector<TableLockHolder> & getTableLocks() const { return table_locks; }
const std::vector<std::shared_ptr<Context>> & getContexts() const { return interpreter_context; }
const std::vector<StoragePtr> & getStorageHolders() const { return storage_holders; }
@ -80,7 +80,7 @@ private:
/// It is the max number of processors which can be executed in parallel for each step. See QueryPipeline::Streams.
size_t max_parallel_streams = 0;
std::vector<TableStructureReadLockHolder> table_locks;
std::vector<TableLockHolder> table_locks;
/// Some processors may implicitly use Context or temporary Storage created by Interpreter.
/// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here,

View File

@ -7,14 +7,13 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/TableStructureLockHolder.h>
namespace DB
{
class TableStructureReadLock;
using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
using TableStructureReadLocks = std::vector<TableStructureReadLockHolder>;
using TableLockHolders = std::vector<TableLockHolder>;
class Context;
class IOutputFormat;
@ -146,7 +145,7 @@ public:
const Block & getHeader() const { return current_header; }
void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
void addInterpreterContext(std::shared_ptr<Context> context) { interpreter_context.emplace_back(std::move(context)); }
void addStorageHolder(StoragePtr storage) { storage_holders.emplace_back(std::move(storage)); }
@ -180,7 +179,7 @@ private:
/// because QueryPipeline is alive until query is finished.
std::vector<std::shared_ptr<Context>> interpreter_context;
std::vector<StoragePtr> storage_holders;
TableStructureReadLocks table_locks;
TableLockHolders table_locks;
/// Common header for each stream.
Block current_header;

View File

@ -47,58 +47,43 @@ RWLockImpl::LockHolder IStorage::tryLockTimed(
return lock_holder;
}
TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id, const SettingSeconds & acquire_timeout)
TableLockHolder IStorage::lockForShare(const String & query_id, const SettingSeconds & acquire_timeout)
{
TableStructureReadLockHolder result;
if (will_add_new_data)
result.new_data_structure_lock = tryLockTimed(new_data_structure_lock, RWLockImpl::Read, query_id, acquire_timeout);
result.structure_lock = tryLockTimed(structure_lock, RWLockImpl::Read, query_id, acquire_timeout);
TableLockHolder result = tryLockTimed(drop_lock, RWLockImpl::Read, query_id, acquire_timeout);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return result;
}
TableStructureWriteLockHolder IStorage::lockAlterIntention(const String & query_id, const SettingSeconds & acquire_timeout)
TableLockHolder IStorage::lockForAlter(const String & query_id, const SettingSeconds & acquire_timeout)
{
TableStructureWriteLockHolder result;
result.alter_intention_lock = tryLockTimed(alter_intention_lock, RWLockImpl::Write, query_id, acquire_timeout);
TableLockHolder result = tryLockTimed(alter_lock, RWLockImpl::Write, query_id, acquire_timeout);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return result;
}
void IStorage::lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id, const SettingSeconds & acquire_timeout)
{
if (!lock_holder.alter_intention_lock)
throw Exception("Alter intention lock for table " + getStorageID().getNameForLogs() + " was not taken. This is a bug.", ErrorCodes::LOGICAL_ERROR);
if (!lock_holder.new_data_structure_lock)
lock_holder.new_data_structure_lock = tryLockTimed(new_data_structure_lock, RWLockImpl::Write, query_id, acquire_timeout);
lock_holder.structure_lock = tryLockTimed(structure_lock, RWLockImpl::Write, query_id, acquire_timeout);
}
TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id, const SettingSeconds & acquire_timeout)
TableExclusiveLockHolder IStorage::lockExclusively(const String & query_id, const SettingSeconds & acquire_timeout)
{
TableStructureWriteLockHolder result;
result.alter_intention_lock = tryLockTimed(alter_intention_lock, RWLockImpl::Write, query_id, acquire_timeout);
TableExclusiveLockHolder result;
result.alter_lock = tryLockTimed(alter_lock, RWLockImpl::Write, query_id, acquire_timeout);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
result.new_data_structure_lock = tryLockTimed(new_data_structure_lock, RWLockImpl::Write, query_id, acquire_timeout);
result.structure_lock = tryLockTimed(structure_lock, RWLockImpl::Write, query_id, acquire_timeout);
result.drop_lock = tryLockTimed(drop_lock, RWLockImpl::Write, query_id, acquire_timeout);
return result;
}
void IStorage::alter(
const AlterCommands & params,
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
const AlterCommands & params, const Context & context, TableLockHolder &)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_id = getStorageID();
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
params.apply(new_metadata, context);

View File

@ -135,7 +135,7 @@ public:
using ColumnSizeByName = std::unordered_map<std::string, ColumnSize>;
virtual ColumnSizeByName getColumnSizes() const { return {}; }
public: /// thread-unsafe part. lockStructure must be acquired
public:
StorageInMemoryMetadata getInMemoryMetadata() const { return *metadata.get(); }
StorageMetadataPtr getInMemoryMetadataPtr() const { return metadata.get(); }
@ -174,21 +174,11 @@ private:
const RWLock & rwlock, RWLockImpl::Type type, const String & query_id, const SettingSeconds & acquire_timeout) const;
public:
/// Acquire this lock if you need the table structure to remain constant during the execution of
/// the query. If will_add_new_data is true, this means that the query will add new data to the table
/// (INSERT or a parts merge).
TableStructureReadLockHolder lockStructureForShare(bool will_add_new_data, const String & query_id, const SettingSeconds & acquire_timeout);
TableLockHolder lockForShare(const String & query_id, const SettingSeconds & acquire_timeout);
/// Acquire this lock at the start of ALTER to lock out other ALTERs and make sure that only you
/// can modify the table structure. It can later be upgraded to the exclusive lock.
TableStructureWriteLockHolder lockAlterIntention(const String & query_id, const SettingSeconds & acquire_timeout);
TableLockHolder lockForAlter(const String & query_id, const SettingSeconds & acquire_timeout);
/// Upgrade alter intention lock to the full exclusive structure lock. This is done by ALTER queries
/// to ensure that no other query uses the table structure and it can be safely changed.
void lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id, const SettingSeconds & acquire_timeout);
/// Acquire the full exclusive lock immediately. No other queries can run concurrently.
TableStructureWriteLockHolder lockExclusively(const String & query_id, const SettingSeconds & acquire_timeout);
TableExclusiveLockHolder lockExclusively(const String & query_id, const SettingSeconds & acquire_timeout);
/** Returns stage to which query is going to be processed in read() function.
* (Normally, the function only reads the columns from the list, but in other cases,
@ -297,7 +287,7 @@ public:
const ASTPtr & /*query*/,
const StorageMetadataPtr & /* metadata_snapshot */,
const Context & /* context */,
TableStructureWriteLockHolder &)
TableExclusiveLockHolder &)
{
throw Exception("Truncate is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
@ -323,7 +313,7 @@ public:
* This method must fully execute the ALTER query, taking care of the locks itself.
* To update the table metadata on disk, this method should call InterpreterAlterQuery::updateMetadata->
*/
virtual void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder);
virtual void alter(const AlterCommands & params, const Context & context, TableLockHolder & alter_lock_holder);
/** Checks that alter commands can be applied to storage. For example, columns can be modified,
* or primary key can be changes, etc.
@ -441,22 +431,9 @@ public:
}
private:
/// You always need to take the next three locks in this order.
mutable RWLock alter_lock = RWLockImpl::create();
/// If you hold this lock exclusively, you can be sure that no other structure modifying queries
/// (e.g. ALTER, DROP) are concurrently executing. But queries that only read table structure
/// (e.g. SELECT, INSERT) can continue to execute.
mutable RWLock alter_intention_lock = RWLockImpl::create();
/// It is taken for share for the entire INSERT query and the entire merge of the parts (for MergeTree).
/// ALTER COLUMN queries acquire an exclusive lock to ensure that no new parts with the old structure
/// are added to the table and thus the set of parts to modify doesn't change.
mutable RWLock new_data_structure_lock = RWLockImpl::create();
/// Lock for the table column structure (names, types, etc.) and data path.
/// It is taken in exclusive mode by queries that modify them (e.g. RENAME, ALTER and DROP)
/// and in share mode by other queries.
mutable RWLock structure_lock = RWLockImpl::create();
mutable RWLock drop_lock = RWLockImpl::create();
};
}

View File

@ -514,7 +514,7 @@ void StorageLiveView::drop()
void StorageLiveView::refresh(const Context & context)
{
auto alter_lock = lockAlterIntention(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto alter_lock = lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
{
std::lock_guard lock(mutex);
if (getNewBlocks())

View File

@ -1436,7 +1436,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
void MergeTreeData::changeSettings(
const ASTPtr & new_settings,
TableStructureWriteLockHolder & /* table_lock_holder */)
TableLockHolder & /* table_lock_holder */)
{
if (new_settings)
{
@ -1481,7 +1481,7 @@ void MergeTreeData::changeSettings(
}
}
void MergeTreeData::freezeAll(const String & with_name, const Context & context, TableStructureReadLockHolder &)
void MergeTreeData::freezeAll(const String & with_name, const Context & context, TableLockHolder &)
{
freezePartitionsByMatcher([] (const DataPartPtr &){ return true; }, with_name, context);
}
@ -2289,7 +2289,7 @@ void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part
}
void MergeTreeData::freezePartition(const ASTPtr & partition_ast, const String & with_name, const Context & context, TableStructureReadLockHolder &)
void MergeTreeData::freezePartition(const ASTPtr & partition_ast, const String & with_name, const Context & context, TableLockHolder &)
{
std::optional<String> prefix;
String partition_id;

View File

@ -477,7 +477,7 @@ public:
/// Delete all directories which names begin with "tmp"
/// Set non-negative parameter value to override MergeTreeSettings temporary_directories_lifetime
/// Must be called with locked lockStructureForShare().
/// Must be called with locked lockForShare().
void clearOldTemporaryDirectories(ssize_t custom_directories_lifetime_seconds = -1);
/// After the call to dropAllData() no method can be called.
@ -489,7 +489,7 @@ public:
/// Moves the entire data directory.
/// Flushes the uncompressed blocks cache and the marks cache.
/// Must be called with locked lockStructureForAlter().
/// Must be called with locked lockForShare().
void rename(const String & new_table_path, const StorageID & new_table_id) override;
/// Check if the ALTER can be performed:
@ -502,10 +502,10 @@ public:
/// Change MergeTreeSettings
void changeSettings(
const ASTPtr & new_settings,
TableStructureWriteLockHolder & table_lock_holder);
TableLockHolder & table_lock_holder);
/// Freezes all parts.
void freezeAll(const String & with_name, const Context & context, TableStructureReadLockHolder & table_lock_holder);
void freezeAll(const String & with_name, const Context & context, TableLockHolder & table_lock_holder);
/// Should be called if part data is suspected to be corrupted.
void reportBrokenPart(const String & name) const
@ -527,7 +527,7 @@ public:
* Backup is created in directory clickhouse_dir/shadow/i/, where i - incremental number,
* or if 'with_name' is specified - backup is created in directory with specified name.
*/
void freezePartition(const ASTPtr & partition, const String & with_name, const Context & context, TableStructureReadLockHolder & table_lock_holder);
void freezePartition(const ASTPtr & partition, const String & with_name, const Context & context, TableLockHolder & table_lock_holder);
public:

View File

@ -581,7 +581,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
const FutureMergedMutatedPart & future_part,
const StorageMetadataPtr & metadata_snapshot,
MergeList::Entry & merge_entry,
TableStructureReadLockHolder &,
TableLockHolder &,
time_t time_of_merge,
const ReservationPtr & space_reservation,
bool deduplicate,
@ -995,7 +995,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
time_t time_of_mutation,
const Context & context,
const ReservationPtr & space_reservation,
TableStructureReadLockHolder & table_lock_holder)
TableLockHolder &)
{
checkOperationIsNotCanceled(merge_entry);
@ -1046,7 +1046,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
if (!for_interpreter.empty())
{
interpreter.emplace(storage_from_source_part, metadata_snapshot, for_interpreter, context_for_reading, true);
in = interpreter->execute(table_lock_holder);
in = interpreter->execute();
updated_header = interpreter->getUpdatedHeader();
in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress));
}

View File

@ -107,7 +107,7 @@ public:
const FutureMergedMutatedPart & future_part,
const StorageMetadataPtr & metadata_snapshot,
MergeListEntry & merge_entry,
TableStructureReadLockHolder & table_lock_holder,
TableLockHolder & table_lock_holder,
time_t time_of_merge,
const ReservationPtr & space_reservation,
bool deduplicate,
@ -122,7 +122,7 @@ public:
time_t time_of_mutation,
const Context & context,
const ReservationPtr & space_reservation,
TableStructureReadLockHolder & table_lock_holder);
TableLockHolder & table_lock_holder);
MergeTreeData::DataPartPtr renameMergedTemporaryPart(
MergeTreeData::MutableDataPartPtr & new_data_part,

View File

@ -58,9 +58,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
storage.clearOldPartsAndRemoveFromZK();
{
/// TODO: Implement tryLockStructureForShare.
auto lock = storage.lockStructureForShare(
false, RWLockImpl::NO_QUERY, storage.getSettings()->lock_acquire_timeout_for_background_operations);
auto lock = storage.lockForShare(RWLockImpl::NO_QUERY, storage.getSettings()->lock_acquire_timeout_for_background_operations);
storage.clearOldTemporaryDirectories();
}

View File

@ -201,8 +201,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
{
auto zookeeper = storage.getZooKeeper();
auto table_lock = storage.lockStructureForShare(
false, RWLockImpl::NO_QUERY, storage.getSettings()->lock_acquire_timeout_for_background_operations);
auto table_lock = storage.lockForShare(RWLockImpl::NO_QUERY, storage.getSettings()->lock_acquire_timeout_for_background_operations);
auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
part->getColumns(), part->checksums);

View File

@ -33,13 +33,13 @@ ReadInOrderOptimizer::ReadInOrderOptimizer(
InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & storage, const StorageMetadataPtr & metadata_snapshot) const
{
Names sorting_key_columns;
if (const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get()))
if (dynamic_cast<const MergeTreeData *>(storage.get()))
{
if (!metadata_snapshot->hasSortingKey())
return {};
sorting_key_columns = metadata_snapshot->getSortingKeyColumns();
}
else if (const auto * part = dynamic_cast<const StorageFromMergeTreeDataPart *>(storage.get()))
else if (dynamic_cast<const StorageFromMergeTreeDataPart *>(storage.get()))
{
if (!metadata_snapshot->hasSortingKey())
return {};

View File

@ -163,8 +163,7 @@ Pipes StorageBuffer::read(
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
auto destination_lock = destination->lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto destination_lock = destination->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto destination_metadata_snapshot = destination->getInMemoryMetadataPtr();
@ -804,10 +803,8 @@ std::optional<UInt64> StorageBuffer::totalBytes() const
return bytes;
}
void StorageBuffer::alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
void StorageBuffer::alter(const AlterCommands & params, const Context & context, TableLockHolder &)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_id = getStorageID();
checkAlterIsPossible(params, context.getSettingsRef());
auto metadata_snapshot = getInMemoryMetadataPtr();

View File

@ -89,7 +89,7 @@ public:
void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) const override;
/// The structure of the subordinate table is not checked and does not change.
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void alter(const AlterCommands & params, const Context & context, TableLockHolder & table_lock_holder) override;
std::optional<UInt64> totalRows() const override;
std::optional<UInt64> totalBytes() const override;

View File

@ -564,9 +564,8 @@ void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, co
}
}
void StorageDistributed::alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
void StorageDistributed::alter(const AlterCommands & params, const Context & context, TableLockHolder &)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_id = getStorageID();
checkAlterIsPossible(params, context.getSettingsRef());
@ -619,7 +618,7 @@ Strings StorageDistributed::getDataPaths() const
return paths;
}
void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableStructureWriteLockHolder &)
void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
{
std::lock_guard lock(cluster_nodes_mutex);

View File

@ -82,7 +82,7 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
/// Removes temporary data in local filesystem.
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
void renameOnDisk(const String & new_path_to_table_data);
@ -91,7 +91,7 @@ public:
/// in the sub-tables, you need to manually add and delete columns
/// the structure of the sub-table is not checked
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void alter(const AlterCommands & params, const Context & context, TableLockHolder & table_lock_holder) override;
void startup() override;
void shutdown() override;

View File

@ -527,7 +527,7 @@ void StorageFile::truncate(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /* metadata_snapshot */,
const Context & /* context */,
TableStructureWriteLockHolder &)
TableExclusiveLockHolder &)
{
if (paths.size() != 1)
throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);

View File

@ -42,7 +42,7 @@ public:
const ASTPtr & /*query*/,
const StorageMetadataPtr & /* metadata_snapshot */,
const Context & /* context */,
TableStructureWriteLockHolder &) override;
TableExclusiveLockHolder &) override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;

View File

@ -65,7 +65,7 @@ StorageJoin::StorageJoin(
void StorageJoin::truncate(
const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableStructureWriteLockHolder &)
const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder&)
{
Poco::File(path).remove(true);
Poco::File(path).createDirectories();

View File

@ -27,7 +27,7 @@ class StorageJoin final : public ext::shared_ptr_helper<StorageJoin>, public Sto
public:
String getName() const override { return "Join"; }
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
/// Access the innards.
HashJoinPtr & getJoin() { return join; }

View File

@ -535,7 +535,7 @@ void StorageLog::rename(const String & new_path_to_table_data, const StorageID &
renameInMemory(new_table_id);
}
void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableStructureWriteLockHolder &)
void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &)
{
std::shared_lock<std::shared_mutex> lock(rwlock);

View File

@ -39,7 +39,7 @@ public:
CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) override;
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; }

View File

@ -116,8 +116,7 @@ Pipes StorageMaterializedView::read(
const unsigned num_streams)
{
auto storage = getTargetTable();
auto lock = storage->lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock = storage->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
if (query_info.order_optimizer)
@ -134,8 +133,7 @@ Pipes StorageMaterializedView::read(
BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context)
{
auto storage = getTargetTable();
auto lock = storage->lockStructureForShare(
true, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock = storage->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
auto stream = storage->write(query, metadata_snapshot, context);
@ -173,7 +171,7 @@ void StorageMaterializedView::drop()
executeDropQuery(ASTDropQuery::Kind::Drop, global_context, target_table_id);
}
void StorageMaterializedView::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableStructureWriteLockHolder &)
void StorageMaterializedView::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
{
if (has_inner_table)
executeDropQuery(ASTDropQuery::Kind::Truncate, global_context, target_table_id);
@ -204,9 +202,8 @@ bool StorageMaterializedView::optimize(
void StorageMaterializedView::alter(
const AlterCommands & params,
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
TableLockHolder &)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_id = getStorageID();
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();

View File

@ -37,7 +37,7 @@ public:
void drop() override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;
bool optimize(
const ASTPtr & query,
@ -47,7 +47,7 @@ public:
bool deduplicate,
const Context & context) override;
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void alter(const AlterCommands & params, const Context & context, TableLockHolder & table_lock_holder) override;
void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings) const override;

View File

@ -149,7 +149,7 @@ void StorageMemory::drop()
}
void StorageMemory::truncate(
const ASTPtr &, const StorageMetadataPtr &, const Context &, TableStructureWriteLockHolder &)
const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
{
std::lock_guard lock(mutex);
data.clear();

View File

@ -41,7 +41,7 @@ public:
void drop() override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;
std::optional<UInt64> totalRows() const override;
std::optional<UInt64> totalBytes() const override;

View File

@ -333,7 +333,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String
const auto & table = iterator->table();
if (table && table.get() != this)
selected_tables.emplace_back(
table, table->lockStructureForShare(false, query_id, settings.lock_acquire_timeout), iterator->name());
table, table->lockForShare(query_id, settings.lock_acquire_timeout), iterator->name());
iterator->next();
}
@ -362,7 +362,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
if (storage.get() != this)
{
selected_tables.emplace_back(
storage, storage->lockStructureForShare(false, query_id, settings.lock_acquire_timeout), iterator->name());
storage, storage->lockForShare(query_id, settings.lock_acquire_timeout), iterator->name());
virtual_column->insert(iterator->name());
}
@ -405,9 +405,8 @@ void StorageMerge::checkAlterIsPossible(const AlterCommands & commands, const Se
}
void StorageMerge::alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
const AlterCommands & params, const Context & context, TableLockHolder &)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_id = getStorageID();
StorageInMemoryMetadata storage_metadata = getInMemoryMetadata();

View File

@ -42,7 +42,7 @@ public:
/// you need to add and remove columns in the sub-tables manually
/// the structure of sub-tables is not checked
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void alter(const AlterCommands & params, const Context & context, TableLockHolder & table_lock_holder) override;
bool mayBenefitFromIndexForIn(
const ASTPtr & left_in_operand, const Context & query_context, const StorageMetadataPtr & metadata_snapshot) const override;
@ -52,7 +52,7 @@ private:
OptimizedRegularExpression table_name_regexp;
Context global_context;
using StorageWithLockAndName = std::tuple<StoragePtr, TableStructureReadLockHolder, String>;
using StorageWithLockAndName = std::tuple<StoragePtr, TableLockHolder, String>;
using StorageListWithLocks = std::list<StorageWithLockAndName>;
StorageListWithLocks getSelectedTables(const String & query_id, const Settings & settings) const;

View File

@ -231,7 +231,7 @@ void StorageMergeTree::drop()
dropAllData();
}
void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableStructureWriteLockHolder &)
void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
{
{
/// Asks to complete merges and does not allow them to start.
@ -254,7 +254,7 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, cons
void StorageMergeTree::alter(
const AlterCommands & commands,
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
TableLockHolder & table_lock_holder)
{
auto table_id = getStorageID();
@ -268,8 +268,6 @@ void StorageMergeTree::alter(
/// This alter can be performed at new_metadata level only
if (commands.isSettingsAlter())
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
changeSettings(new_metadata.settings_changes, table_lock_holder);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id, new_metadata);
@ -277,10 +275,6 @@ void StorageMergeTree::alter(
else
{
{
/// TODO (relax this lock and remove this action lock)
auto merges_block = getActionLock(ActionLocks::PartsMerge);
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
changeSettings(new_metadata.settings_changes, table_lock_holder);
/// Reinitialize primary key because primary key column types might have changed.
setProperties(new_metadata, old_metadata);
@ -290,9 +284,6 @@ void StorageMergeTree::alter(
if (!maybe_mutation_commands.empty())
mutation_version = startMutation(maybe_mutation_commands, mutation_file_name);
/// We release all locks except alter_intention_lock which allows
/// to execute alter queries sequentially
table_lock_holder.releaseAllExceptAlterIntention();
}
/// Always execute required mutations synchronously, because alters
@ -591,8 +582,7 @@ bool StorageMergeTree::merge(
bool deduplicate,
String * out_disable_reason)
{
auto table_lock_holder = lockStructureForShare(
true, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto metadata_snapshot = getInMemoryMetadataPtr();
FutureMergedMutatedPart future_part;
@ -740,8 +730,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask()
bool StorageMergeTree::tryMutatePart()
{
auto table_lock_holder = lockStructureForShare(
true, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr();
size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements;
@ -876,13 +865,8 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
/// Clear old parts. It is unnecessary to do it more than once a second.
if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
{
{
/// TODO: Implement tryLockStructureForShare.
auto lock_structure = lockStructureForShare(
false, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
clearOldPartsFromFilesystem();
clearOldTemporaryDirectories();
}
clearOldMutations();
}
@ -1078,16 +1062,14 @@ void StorageMergeTree::alterPartition(
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock = lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
freezePartition(command.partition, command.with_name, context, lock);
}
break;
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock = lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
freezeAll(command.with_name, context, lock);
}
break;
@ -1156,8 +1138,8 @@ void StorageMergeTree::attachPartition(
void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context)
{
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock1 = lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock2 = source_table->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr();
auto my_metadata_snapshot = getInMemoryMetadataPtr();
@ -1229,8 +1211,8 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context)
{
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock1 = lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock2 = dest_table->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto dest_table_storage = std::dynamic_pointer_cast<StorageMergeTree>(dest_table);
if (!dest_table_storage)

View File

@ -75,9 +75,9 @@ public:
CancellationCode killMutation(const String & mutation_id) override;
void drop() override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;
void alter(const AlterCommands & commands, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void alter(const AlterCommands & commands, const Context & context, TableLockHolder & table_lock_holder) override;
void checkTableCanBeDropped() const override;

View File

@ -45,10 +45,8 @@ void StorageNull::checkAlterIsPossible(const AlterCommands & commands, const Set
}
void StorageNull::alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
void StorageNull::alter(const AlterCommands & params, const Context & context, TableLockHolder &)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_id = getStorageID();
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();

View File

@ -44,7 +44,7 @@ public:
void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) const override;
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void alter(const AlterCommands & params, const Context & context, TableLockHolder & table_lock_holder) override;
std::optional<UInt64> totalRows() const override
{

View File

@ -1309,8 +1309,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge,
ttl_infos, time(nullptr), max_volume_index);
auto table_lock = lockStructureForShare(
false, RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
auto table_lock = lockForShare(RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr();
FutureMergedMutatedPart future_merged_part(parts, entry.new_part_type);
@ -1436,8 +1435,8 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
/// Can throw an exception.
ReservationPtr reserved_space = reserveSpace(estimated_space_for_result, source_part->volume);
auto table_lock = lockStructureForShare(
false, RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
auto table_lock = lockForShare(
RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr();
MutableDataPartPtr new_part;
@ -1793,8 +1792,8 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
PartDescriptions parts_to_add;
DataPartsVector parts_to_remove;
auto table_lock_holder_dst_table = lockStructureForShare(
false, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto table_lock_holder_dst_table = lockForShare(
RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto dst_metadata_snapshot = getInMemoryMetadataPtr();
for (size_t i = 0; i < entry_replace.new_part_names.size(); ++i)
@ -1833,7 +1832,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
}
StoragePtr source_table;
TableStructureReadLockHolder table_lock_holder_src_table;
TableLockHolder table_lock_holder_src_table;
StorageID source_table_id{entry_replace.from_database, entry_replace.from_table};
auto clone_data_parts_from_source_table = [&] () -> size_t
@ -1857,11 +1856,11 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
return 0;
}
table_lock_holder_src_table = source_table->lockStructureForShare(
false, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
table_lock_holder_src_table = source_table->lockForShare(
RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
DataPartStates valid_states{MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed,
MergeTreeDataPartState::Outdated};
DataPartStates valid_states{
MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated};
size_t num_clonable_parts = 0;
for (PartDescriptionPtr & part_desc : parts_to_add)
@ -3092,10 +3091,9 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
LOG_DEBUG(log, "Fetching part {} from {}", part_name, source_replica_path);
TableStructureReadLockHolder table_lock_holder;
TableLockHolder table_lock_holder;
if (!to_detached)
table_lock_holder = lockStructureForShare(
true, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
/// Logging
Stopwatch stopwatch;
@ -3636,10 +3634,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
zookeeper->multi(requests);
{
/// TODO (relax this lock and remove this action locks)
auto merges_block = getActionLock(ActionLocks::PartsMerge);
auto fetchers_block = getActionLock(ActionLocks::PartsFetch);
auto table_lock = lockExclusively(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto alter_lock = lockForAlter(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
LOG_INFO(log, "Metadata changed in ZooKeeper. Applying changes locally.");
@ -3658,7 +3653,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
void StorageReplicatedMergeTree::alter(
const AlterCommands & params, const Context & query_context, TableStructureWriteLockHolder & table_lock_holder)
const AlterCommands & params, const Context & query_context, TableLockHolder & table_lock_holder)
{
assertNotReadonly();
@ -3666,8 +3661,6 @@ void StorageReplicatedMergeTree::alter(
if (params.isSettingsAlter())
{
lockStructureExclusively(
table_lock_holder, query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
/// We don't replicate storage_settings_ptr ALTER. It's local operation.
/// Also we don't upgrade alter lock to table structure lock.
StorageInMemoryMetadata future_metadata = getInMemoryMetadata();
@ -3732,8 +3725,6 @@ void StorageReplicatedMergeTree::alter(
if (ast_to_str(current_metadata->settings_changes) != ast_to_str(future_metadata.settings_changes))
{
lockStructureExclusively(
table_lock_holder, query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
/// Just change settings
StorageInMemoryMetadata metadata_copy = *current_metadata;
metadata_copy.settings_changes = future_metadata.settings_changes;
@ -3824,7 +3815,7 @@ void StorageReplicatedMergeTree::alter(
}
table_lock_holder.release();
table_lock_holder.reset();
std::vector<String> unwaited;
if (query_context.getSettingsRef().replication_alter_partitions_sync == 2)
@ -3908,16 +3899,14 @@ void StorageReplicatedMergeTree::alterPartition(
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockStructureForShare(
false, query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
freezePartition(command.partition, command.with_name, query_context, lock);
}
break;
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructureForShare(
false, query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
freezeAll(command.with_name, query_context, lock);
}
break;
@ -4012,7 +4001,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPt
void StorageReplicatedMergeTree::truncate(
const ASTPtr & query, const StorageMetadataPtr &, const Context & query_context, TableStructureWriteLockHolder & table_lock)
const ASTPtr & query, const StorageMetadataPtr &, const Context & query_context, TableExclusiveLockHolder & table_lock)
{
table_lock.release(); /// Truncate is done asynchronously.
@ -4925,10 +4914,8 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{
/// Critical section is not required (since grabOldParts() returns unique part set on each call)
auto table_lock = lockStructureForShare(
false, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto table_lock = lockForShare(
RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto zookeeper = getZooKeeper();
DataPartsVector parts = grabOldParts();
@ -5219,8 +5206,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
const Context & context)
{
/// First argument is true, because we possibly will add new data to current table.
auto lock1 = lockStructureForShare(true, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock1 = lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock2 = source_table->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr();
auto metadata_snapshot = getInMemoryMetadataPtr();
@ -5397,16 +5384,16 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
/// If necessary, wait until the operation is performed on all replicas.
if (context.getSettingsRef().replication_alter_partitions_sync > 1)
{
lock2.release();
lock1.release();
lock2.reset();
lock1.reset();
waitForAllReplicasToProcessLogEntry(entry);
}
}
void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & query_context)
{
auto lock1 = lockStructureForShare(false, query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
auto lock2 = dest_table->lockStructureForShare(false, query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
auto lock1 = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
auto lock2 = dest_table->lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
auto dest_table_storage = std::dynamic_pointer_cast<StorageReplicatedMergeTree>(dest_table);
if (!dest_table_storage)
@ -5583,7 +5570,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
if (query_context.getSettingsRef().replication_alter_partitions_sync > 1)
{
lock2.release();
lock2.reset();
dest_table_storage->waitForAllReplicasToProcessLogEntry(entry);
}
@ -5600,7 +5587,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
if (query_context.getSettingsRef().replication_alter_partitions_sync > 1)
{
lock1.release();
lock1.reset();
waitForAllReplicasToProcessLogEntry(entry_delete);
}

View File

@ -103,7 +103,7 @@ public:
bool optimize(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context) override;
void alter(const AlterCommands & params, const Context & query_context, TableStructureWriteLockHolder & table_lock_holder) override;
void alter(const AlterCommands & params, const Context & query_context, TableLockHolder & table_lock_holder) override;
void alterPartition(
const ASTPtr & query,
@ -120,7 +120,7 @@ public:
*/
void drop() override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
@ -315,7 +315,7 @@ private:
void checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot);
/// A part of ALTER: apply metadata changes only (data parts are altered separately).
/// Must be called under IStorage::lockStructureForAlter() lock.
/// Must be called under IStorage::lockForAlter() lock.
void setTableStructure(ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff);
/** Check that the set of parts corresponds to that in ZK (/replicas/me/parts/).

View File

@ -142,7 +142,7 @@ void StorageSet::finishInsert() { set->finishInsert(); }
size_t StorageSet::getSize() const { return set->getTotalRowCount(); }
void StorageSet::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableStructureWriteLockHolder &)
void StorageSet::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &)
{
Poco::File(path).remove(true);
Poco::File(path).createDirectories();

View File

@ -67,7 +67,7 @@ public:
/// Access the insides.
SetPtr & getSet() { return set; }
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
private:
SetPtr set;

View File

@ -326,7 +326,7 @@ CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Conte
return file_checker.check();
}
void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableStructureWriteLockHolder &)
void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
{
std::shared_lock<std::shared_mutex> lock(rwlock);

View File

@ -42,7 +42,7 @@ public:
Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; }
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder&) override;
protected:
StorageStripeLog(

View File

@ -430,7 +430,7 @@ CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context
}
void StorageTinyLog::truncate(
const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableStructureWriteLockHolder &)
const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &)
{
std::unique_lock<std::shared_mutex> lock(rwlock);

View File

@ -41,7 +41,7 @@ public:
Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; }
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
void drop() override;

View File

@ -103,11 +103,11 @@ protected:
{
StoragePtr storage = storages.at(std::make_pair(database_name, table_name));
TableStructureReadLockHolder table_lock;
TableLockHolder table_lock;
try
{
table_lock = storage->lockStructureForShare(false, query_id, lock_acquire_timeout);
table_lock = storage->lockForShare(query_id, lock_acquire_timeout);
}
catch (const Exception & e)
{

View File

@ -196,7 +196,7 @@ StoragesInfo StoragesInfoStream::next()
try
{
/// For table not to be dropped and set of columns to remain constant.
info.table_lock = info.storage->lockStructureForShare(false, query_id, settings.lock_acquire_timeout);
info.table_lock = info.storage->lockForShare(query_id, settings.lock_acquire_timeout);
}
catch (const Exception & e)
{

View File

@ -14,7 +14,7 @@ class Context;
struct StoragesInfo
{
StoragePtr storage = nullptr;
TableStructureReadLockHolder table_lock;
TableLockHolder table_lock;
String database;
String table;

View File

@ -245,7 +245,7 @@ protected:
continue;
StoragePtr table = nullptr;
TableStructureReadLockHolder lock;
TableLockHolder lock;
if (need_lock_structure)
{
@ -257,8 +257,7 @@ protected:
}
try
{
lock = table->lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
lock = table->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
}
catch (const Exception & e)
{

View File

@ -5,44 +5,17 @@
namespace DB
{
/// Structs that hold table structure (columns, their types, default values etc.) locks when executing queries.
/// See IStorage::lock* methods for comments.
struct TableStructureWriteLockHolder
struct TableExclusiveLockHolder
{
void release()
{
*this = TableStructureWriteLockHolder();
}
void releaseAllExceptAlterIntention()
{
new_data_structure_lock.reset();
structure_lock.reset();
}
void release() { *this = TableExclusiveLockHolder(); }
private:
friend class IStorage;
/// Order is important.
RWLockImpl::LockHolder alter_intention_lock;
RWLockImpl::LockHolder new_data_structure_lock;
RWLockImpl::LockHolder structure_lock;
};
struct TableStructureReadLockHolder
{
void release()
{
*this = TableStructureReadLockHolder();
}
private:
friend class IStorage;
/// Order is important.
RWLockImpl::LockHolder new_data_structure_lock;
RWLockImpl::LockHolder structure_lock;
RWLockImpl::LockHolder alter_lock;
RWLockImpl::LockHolder drop_lock;
};
using TableLockHolder = RWLockImpl::LockHolder;
}