mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Remove redundant information from RWLock.
And fix broken exception in some race conditions
This commit is contained in:
parent
470f96ce19
commit
8b5a05947c
@ -53,7 +53,7 @@ public:
|
||||
};
|
||||
|
||||
|
||||
RWLockImpl::LockHandler RWLockImpl::getLock(RWLockImpl::Type type, RWLockImpl::Client client)
|
||||
RWLockImpl::LockHandler RWLockImpl::getLock(RWLockImpl::Type type)
|
||||
{
|
||||
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
|
||||
CurrentMetrics::Increment waiting_client_increment((type == Read) ? CurrentMetrics::RWLockWaitingReaders
|
||||
@ -78,16 +78,17 @@ RWLockImpl::LockHandler RWLockImpl::getLock(RWLockImpl::Type type, RWLockImpl::C
|
||||
{
|
||||
auto handler_ptr = it_handler->second.lock();
|
||||
|
||||
if (!handler_ptr)
|
||||
throw Exception("Lock handler cannot be nullptr. This is a bug", ErrorCodes::LOGICAL_ERROR);
|
||||
/// Lock may be released in another thread, but not yet deleted inside |~LogHandlerImpl()|
|
||||
|
||||
if (handler_ptr)
|
||||
{
|
||||
/// XXX: it means we can't upgrade lock from read to write - with proper waiting!
|
||||
if (type != Read || handler_ptr->it_group->type != Read)
|
||||
throw Exception("Attempt to acquire exclusive lock recursively", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
handler_ptr->it_client->info += "; " + client.info;
|
||||
|
||||
return handler_ptr;
|
||||
}
|
||||
}
|
||||
|
||||
if (type == Type::Write || queue.empty() || queue.back().type == Type::Write)
|
||||
{
|
||||
@ -104,7 +105,7 @@ RWLockImpl::LockHandler RWLockImpl::getLock(RWLockImpl::Type type, RWLockImpl::C
|
||||
auto & clients = it_group->clients;
|
||||
try
|
||||
{
|
||||
it_client = clients.emplace(clients.end(), std::move(client));
|
||||
it_client = clients.emplace(clients.end(), type);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -114,10 +115,6 @@ RWLockImpl::LockHandler RWLockImpl::getLock(RWLockImpl::Type type, RWLockImpl::C
|
||||
throw;
|
||||
}
|
||||
|
||||
it_client->thread_number = Poco::ThreadNumber::get();
|
||||
it_client->enqueue_time = time(nullptr);
|
||||
it_client->type = type;
|
||||
|
||||
LockHandler res(new LockHandlerImpl(shared_from_this(), it_group, it_client));
|
||||
|
||||
/// Insert myself (weak_ptr to the handler) to threads set to implement recursive lock
|
||||
@ -128,7 +125,6 @@ RWLockImpl::LockHandler RWLockImpl::getLock(RWLockImpl::Type type, RWLockImpl::C
|
||||
/// If we are not the first client in the group, a notification could be already sent
|
||||
if (it_group == queue.begin())
|
||||
{
|
||||
it_client->start_time = it_client->enqueue_time;
|
||||
finalize_metrics();
|
||||
return res;
|
||||
}
|
||||
@ -136,7 +132,6 @@ RWLockImpl::LockHandler RWLockImpl::getLock(RWLockImpl::Type type, RWLockImpl::C
|
||||
/// Wait a notification
|
||||
it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); });
|
||||
|
||||
it_client->start_time = time(nullptr);
|
||||
finalize_metrics();
|
||||
return res;
|
||||
}
|
||||
@ -169,7 +164,7 @@ RWLockImpl::LockHandlerImpl::~LockHandlerImpl()
|
||||
RWLockImpl::LockHandlerImpl::LockHandlerImpl(RWLock && parent, RWLockImpl::GroupsContainer::iterator it_group,
|
||||
RWLockImpl::ClientsContainer::iterator it_client)
|
||||
: parent{std::move(parent)}, it_group{it_group}, it_client{it_client},
|
||||
active_client_increment{(it_client->type == RWLockImpl::Read) ? CurrentMetrics::RWLockActiveReaders
|
||||
active_client_increment{(*it_client == RWLockImpl::Read) ? CurrentMetrics::RWLockActiveReaders
|
||||
: CurrentMetrics::RWLockActiveWriters}
|
||||
{}
|
||||
|
||||
|
@ -27,23 +27,6 @@ public:
|
||||
Write,
|
||||
};
|
||||
|
||||
private:
|
||||
/// Client is that who wants to acquire the lock.
|
||||
struct Client
|
||||
{
|
||||
explicit Client(const std::string & info = {}) : info{info} {}
|
||||
|
||||
bool isStarted() { return start_time != 0; }
|
||||
|
||||
/// TODO: delete extra info below if there is no need fot it already.
|
||||
std::string info;
|
||||
int thread_number = 0;
|
||||
std::time_t enqueue_time = 0;
|
||||
std::time_t start_time = 0;
|
||||
Type type = Read;
|
||||
};
|
||||
|
||||
public:
|
||||
static RWLock create() { return RWLock(new RWLockImpl); }
|
||||
|
||||
/// Just use LockHandler::reset() to release the lock
|
||||
@ -53,21 +36,21 @@ public:
|
||||
|
||||
|
||||
/// Waits in the queue and returns appropriate lock
|
||||
LockHandler getLock(Type type, Client client = Client{});
|
||||
LockHandler getLock(Type type, const std::string & who) { return getLock(type, Client(who)); }
|
||||
LockHandler getLock(Type type);
|
||||
|
||||
private:
|
||||
RWLockImpl() = default;
|
||||
|
||||
struct Group;
|
||||
using GroupsContainer = std::list<Group>;
|
||||
using ClientsContainer = std::list<Client>;
|
||||
using ClientsContainer = std::list<Type>;
|
||||
using ThreadToHandler = std::map<std::thread::id, std::weak_ptr<LockHandlerImpl>>;
|
||||
|
||||
/// Group of clients that should be executed concurrently
|
||||
/// i.e. a group could contain several readers, but only one writer
|
||||
struct Group
|
||||
{
|
||||
// FIXME: there is only redundant |type| information inside |clients|.
|
||||
const Type type;
|
||||
ClientsContainer clients;
|
||||
|
||||
|
@ -38,7 +38,7 @@ TEST(Common, RWLock_1)
|
||||
auto type = (std::uniform_int_distribution<>(0, 9)(gen) >= round) ? RWLockImpl::Read : RWLockImpl::Write;
|
||||
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
|
||||
|
||||
auto lock = fifo_lock->getLock(type, "RW");
|
||||
auto lock = fifo_lock->getLock(type);
|
||||
|
||||
if (type == RWLockImpl::Write)
|
||||
{
|
@ -20,7 +20,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
|
||||
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
|
||||
* but it's clear that here is not the best place for this functionality.
|
||||
*/
|
||||
addTableLock(storage->lockStructure(true, __PRETTY_FUNCTION__));
|
||||
addTableLock(storage->lockStructure(true));
|
||||
|
||||
/// If the "root" table deduplactes blocks, there are no need to make deduplication for children
|
||||
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks
|
||||
@ -45,7 +45,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
|
||||
auto & materialized_view = dynamic_cast<const StorageMaterializedView &>(*dependent_table);
|
||||
|
||||
if (StoragePtr inner_table = materialized_view.tryGetTargetTable())
|
||||
addTableLock(inner_table->lockStructure(true, __PRETTY_FUNCTION__));
|
||||
addTableLock(inner_table->lockStructure(true));
|
||||
|
||||
auto query = materialized_view.getInnerQuery();
|
||||
BlockOutputStreamPtr out = std::make_shared<PushingToViewsBlockOutputStream>(
|
||||
|
@ -547,7 +547,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
||||
if (!as_table_name.empty())
|
||||
{
|
||||
as_storage = context.getTable(as_database_name, as_table_name);
|
||||
as_storage_lock = as_storage->lockStructure(false, __PRETTY_FUNCTION__);
|
||||
as_storage_lock = as_storage->lockStructure(false);
|
||||
}
|
||||
|
||||
/// Set and retrieve list of columns.
|
||||
|
@ -102,7 +102,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
|
||||
table = context.getTable(database_name, table_name);
|
||||
}
|
||||
|
||||
auto table_lock = table->lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto table_lock = table->lockStructure(false);
|
||||
columns = table->getColumns().getAll();
|
||||
column_defaults = table->getColumns().defaults;
|
||||
column_comments = table->getColumns().comments;
|
||||
|
@ -69,7 +69,7 @@ BlockIO InterpreterDropQuery::executeToTable(String & database_name_, String & t
|
||||
{
|
||||
database_and_table.second->shutdown();
|
||||
/// If table was already dropped by anyone, an exception will be thrown
|
||||
auto table_lock = database_and_table.second->lockForAlter(__PRETTY_FUNCTION__);
|
||||
auto table_lock = database_and_table.second->lockForAlter();
|
||||
/// Drop table from memory, don't touch data and metadata
|
||||
database_and_table.first->detachTable(database_and_table.second->getTableName());
|
||||
}
|
||||
@ -78,7 +78,7 @@ BlockIO InterpreterDropQuery::executeToTable(String & database_name_, String & t
|
||||
database_and_table.second->checkTableCanBeDropped();
|
||||
|
||||
/// If table was already dropped by anyone, an exception will be thrown
|
||||
auto table_lock = database_and_table.second->lockForAlter(__PRETTY_FUNCTION__);
|
||||
auto table_lock = database_and_table.second->lockForAlter();
|
||||
/// Drop table data, don't touch metadata
|
||||
database_and_table.second->truncate(query_ptr);
|
||||
}
|
||||
@ -88,7 +88,7 @@ BlockIO InterpreterDropQuery::executeToTable(String & database_name_, String & t
|
||||
|
||||
database_and_table.second->shutdown();
|
||||
/// If table was already dropped by anyone, an exception will be thrown
|
||||
auto table_lock = database_and_table.second->lockForAlter(__PRETTY_FUNCTION__);
|
||||
auto table_lock = database_and_table.second->lockForAlter();
|
||||
/// Delete table metdata and table itself from memory
|
||||
database_and_table.first->removeTable(context, database_and_table.second->getTableName());
|
||||
/// Delete table data
|
||||
@ -124,7 +124,7 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(String & table_name, ASTDr
|
||||
if (kind == ASTDropQuery::Kind::Truncate)
|
||||
{
|
||||
/// If table was already dropped by anyone, an exception will be thrown
|
||||
auto table_lock = table->lockForAlter(__PRETTY_FUNCTION__);
|
||||
auto table_lock = table->lockForAlter();
|
||||
/// Drop table data, don't touch metadata
|
||||
table->truncate(query_ptr);
|
||||
}
|
||||
@ -133,7 +133,7 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(String & table_name, ASTDr
|
||||
context_handle.tryRemoveExternalTable(table_name);
|
||||
table->shutdown();
|
||||
/// If table was already dropped by anyone, an exception will be thrown
|
||||
auto table_lock = table->lockForAlter(__PRETTY_FUNCTION__);
|
||||
auto table_lock = table->lockForAlter();
|
||||
/// Delete table data
|
||||
table->drop();
|
||||
table->is_dropped = true;
|
||||
|
@ -92,7 +92,7 @@ BlockIO InterpreterInsertQuery::execute()
|
||||
checkAccess(query);
|
||||
StoragePtr table = getTable(query);
|
||||
|
||||
auto table_lock = table->lockStructure(true, __PRETTY_FUNCTION__);
|
||||
auto table_lock = table->lockStructure(true);
|
||||
|
||||
/// We create a pipeline of several streams, into which we will write data.
|
||||
BlockOutputStreamPtr out;
|
||||
|
@ -23,7 +23,7 @@ BlockIO InterpreterOptimizeQuery::execute()
|
||||
return executeDDLQueryOnCluster(query_ptr, context, {ast.database});
|
||||
|
||||
StoragePtr table = context.getTable(ast.database, ast.table);
|
||||
auto table_lock = table->lockStructure(true, __PRETTY_FUNCTION__);
|
||||
auto table_lock = table->lockStructure(true);
|
||||
table->optimize(query_ptr, ast.partition, ast.final, ast.deduplicate, context);
|
||||
return {};
|
||||
}
|
||||
|
@ -101,7 +101,7 @@ BlockIO InterpreterRenameQuery::execute()
|
||||
|
||||
for (const auto & names : unique_tables_from)
|
||||
if (auto table = context.tryGetTable(names.database_name, names.table_name))
|
||||
locks.emplace_back(table->lockForAlter(__PRETTY_FUNCTION__));
|
||||
locks.emplace_back(table->lockForAlter());
|
||||
|
||||
/** All tables are locked. If there are more than one rename in chain,
|
||||
* we need to hold global lock while doing all renames. Order matters to avoid deadlocks.
|
||||
|
@ -182,7 +182,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
|
||||
}
|
||||
|
||||
if (storage)
|
||||
table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
|
||||
table_lock = storage->lockStructure(false);
|
||||
|
||||
syntax_analyzer_result = SyntaxAnalyzer(context, storage)
|
||||
.analyze(query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, subquery_depth);
|
||||
|
@ -238,7 +238,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const String & database_nam
|
||||
table->shutdown();
|
||||
|
||||
/// If table was already dropped by anyone, an exception will be thrown
|
||||
auto table_lock = table->lockForAlter(__PRETTY_FUNCTION__);
|
||||
auto table_lock = table->lockForAlter();
|
||||
create_ast = system_context.getCreateTableQuery(database_name, table_name);
|
||||
|
||||
database->detachTable(table_name);
|
||||
|
@ -4,13 +4,13 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
TableStructureReadLock::TableStructureReadLock(StoragePtr storage_, bool lock_structure, bool lock_data, const std::string & who)
|
||||
TableStructureReadLock::TableStructureReadLock(StoragePtr storage_, bool lock_structure, bool lock_data)
|
||||
: storage(storage_)
|
||||
{
|
||||
if (lock_data)
|
||||
data_lock = storage->data_lock->getLock(RWLockImpl::Read, who);
|
||||
data_lock = storage->data_lock->getLock(RWLockImpl::Read);
|
||||
if (lock_structure)
|
||||
structure_lock = storage->structure_lock->getLock(RWLockImpl::Read, who);
|
||||
structure_lock = storage->structure_lock->getLock(RWLockImpl::Read);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -65,7 +65,7 @@ private:
|
||||
RWLockImpl::LockHandler structure_lock;
|
||||
|
||||
public:
|
||||
TableStructureReadLock(StoragePtr storage_, bool lock_structure, bool lock_data, const std::string & who);
|
||||
TableStructureReadLock(StoragePtr storage_, bool lock_structure, bool lock_data);
|
||||
};
|
||||
|
||||
|
||||
@ -115,14 +115,13 @@ public:
|
||||
/** Does not allow you to change the structure or name of the table.
|
||||
* If you change the data in the table, you will need to specify will_modify_data = true.
|
||||
* This will take an extra lock that does not allow starting ALTER MODIFY.
|
||||
* Parameter 'who' identifies a client of the lock (ALTER query, merge process, etc), used for diagnostic purposes.
|
||||
*
|
||||
* WARNING: You need to call methods from ITableDeclaration under such a lock. Without it, they are not thread safe.
|
||||
* WARNING: To avoid deadlocks, this method must not be called under lock of Context.
|
||||
*/
|
||||
TableStructureReadLockPtr lockStructure(bool will_modify_data, const std::string & who)
|
||||
TableStructureReadLockPtr lockStructure(bool will_modify_data)
|
||||
{
|
||||
TableStructureReadLockPtr res = std::make_shared<TableStructureReadLock>(shared_from_this(), true, will_modify_data, who);
|
||||
TableStructureReadLockPtr res = std::make_shared<TableStructureReadLock>(shared_from_this(), true, will_modify_data);
|
||||
if (is_dropped)
|
||||
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
|
||||
return res;
|
||||
@ -130,11 +129,11 @@ public:
|
||||
|
||||
/** Does not allow reading the table structure. It is taken for ALTER, RENAME and DROP, TRUNCATE.
|
||||
*/
|
||||
TableFullWriteLock lockForAlter(const std::string & who = "Alter")
|
||||
TableFullWriteLock lockForAlter()
|
||||
{
|
||||
/// The calculation order is important.
|
||||
auto res_data_lock = lockDataForAlter(who);
|
||||
auto res_structure_lock = lockStructureForAlter(who);
|
||||
auto res_data_lock = lockDataForAlter();
|
||||
auto res_structure_lock = lockStructureForAlter();
|
||||
|
||||
return {std::move(res_data_lock), std::move(res_structure_lock)};
|
||||
}
|
||||
@ -143,17 +142,17 @@ public:
|
||||
* It is taken during write temporary data in ALTER MODIFY.
|
||||
* Under this lock, you can take lockStructureForAlter() to change the structure of the table.
|
||||
*/
|
||||
TableDataWriteLock lockDataForAlter(const std::string & who = "Alter")
|
||||
TableDataWriteLock lockDataForAlter()
|
||||
{
|
||||
auto res = data_lock->getLock(RWLockImpl::Write, who);
|
||||
auto res = data_lock->getLock(RWLockImpl::Write);
|
||||
if (is_dropped)
|
||||
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
|
||||
return res;
|
||||
}
|
||||
|
||||
TableStructureWriteLock lockStructureForAlter(const std::string & who = "Alter")
|
||||
TableStructureWriteLock lockStructureForAlter()
|
||||
{
|
||||
auto res = structure_lock->getLock(RWLockImpl::Write, who);
|
||||
auto res = structure_lock->getLock(RWLockImpl::Write);
|
||||
if (is_dropped)
|
||||
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
|
||||
return res;
|
||||
@ -243,7 +242,7 @@ public:
|
||||
throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
auto lock = lockStructureForAlter(__PRETTY_FUNCTION__);
|
||||
auto lock = lockStructureForAlter();
|
||||
auto new_columns = getColumns();
|
||||
params.apply(new_columns);
|
||||
context.getDatabase(database_name)->alterTable(context, table_name, new_columns, {});
|
||||
|
@ -79,7 +79,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
|
||||
|
||||
try
|
||||
{
|
||||
auto storage_lock = owned_storage->lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto storage_lock = owned_storage->lockStructure(false);
|
||||
|
||||
MergeTreeData::DataPartPtr part = findPart(part_name);
|
||||
|
||||
|
@ -84,7 +84,7 @@ void ReplicatedMergeTreeAlterThread::run()
|
||||
|
||||
LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
|
||||
|
||||
auto table_lock = storage.lockStructureForAlter(__PRETTY_FUNCTION__);
|
||||
auto table_lock = storage.lockStructureForAlter();
|
||||
|
||||
if (columns_in_zk != storage.getColumns())
|
||||
{
|
||||
@ -113,7 +113,7 @@ void ReplicatedMergeTreeAlterThread::run()
|
||||
/// Update parts.
|
||||
if (changed_version || force_recheck_parts)
|
||||
{
|
||||
auto table_lock = storage.lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto table_lock = storage.lockStructure(false);
|
||||
|
||||
if (changed_version)
|
||||
LOG_INFO(log, "ALTER-ing parts");
|
||||
|
@ -202,7 +202,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
|
||||
else if (part->name == part_name)
|
||||
{
|
||||
auto zookeeper = storage.getZooKeeper();
|
||||
auto table_lock = storage.lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto table_lock = storage.lockStructure(false);
|
||||
|
||||
/// If the part is in ZooKeeper, check its data with its checksums, and them with ZooKeeper.
|
||||
if (zookeeper->exists(storage.replica_path + "/parts/" + part_name))
|
||||
|
@ -634,7 +634,7 @@ void StorageBuffer::alter(const AlterCommands & params, const String & database_
|
||||
if (param.type == AlterCommand::MODIFY_PRIMARY_KEY)
|
||||
throw Exception("Storage engine " + getName() + " doesn't support primary key.", ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
auto lock = lockStructureForAlter(__PRETTY_FUNCTION__);
|
||||
auto lock = lockStructureForAlter();
|
||||
|
||||
/// So that no blocks of the old structure remain.
|
||||
optimize({} /*query*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context);
|
||||
|
@ -308,7 +308,7 @@ void StorageDistributed::alter(const AlterCommands & params, const String & data
|
||||
if (param.type == AlterCommand::MODIFY_PRIMARY_KEY)
|
||||
throw Exception("Storage engine " + getName() + " doesn't support primary key.", ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
auto lock = lockStructureForAlter(__PRETTY_FUNCTION__);
|
||||
auto lock = lockStructureForAlter();
|
||||
|
||||
ColumnsDescription new_columns = getColumns();
|
||||
params.apply(new_columns);
|
||||
|
@ -344,7 +344,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables() const
|
||||
{
|
||||
auto & table = iterator->table();
|
||||
if (table.get() != this)
|
||||
selected_tables.emplace_back(table, table->lockStructure(false, __PRETTY_FUNCTION__));
|
||||
selected_tables.emplace_back(table, table->lockStructure(false));
|
||||
}
|
||||
|
||||
iterator->next();
|
||||
@ -374,7 +374,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr
|
||||
if (storage.get() != this)
|
||||
{
|
||||
virtual_column->insert(storage->getTableName());
|
||||
selected_tables.emplace_back(storage, get_lock ? storage->lockStructure(false, __PRETTY_FUNCTION__) : TableStructureReadLockPtr{});
|
||||
selected_tables.emplace_back(storage, get_lock ? storage->lockStructure(false) : TableStructureReadLockPtr{});
|
||||
}
|
||||
}
|
||||
|
||||
@ -400,7 +400,7 @@ void StorageMerge::alter(const AlterCommands & params, const String & database_n
|
||||
if (param.type == AlterCommand::MODIFY_PRIMARY_KEY)
|
||||
throw Exception("Storage engine " + getName() + " doesn't support primary key.", ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
auto lock = lockStructureForAlter(__PRETTY_FUNCTION__);
|
||||
auto lock = lockStructureForAlter();
|
||||
|
||||
ColumnsDescription new_columns = getColumns();
|
||||
params.apply(new_columns);
|
||||
|
@ -190,7 +190,7 @@ void StorageMergeTree::alter(
|
||||
{
|
||||
if (!params.is_mutable())
|
||||
{
|
||||
auto table_soft_lock = lockStructureForAlter(__PRETTY_FUNCTION__);
|
||||
auto table_soft_lock = lockStructureForAlter();
|
||||
auto new_columns = getColumns();
|
||||
params.apply(new_columns);
|
||||
context.getDatabase(database_name)->alterTable(context, table_name, new_columns, {});
|
||||
@ -201,7 +201,7 @@ void StorageMergeTree::alter(
|
||||
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
|
||||
auto merge_blocker = merger_mutator.actions_blocker.cancel();
|
||||
|
||||
auto table_soft_lock = lockDataForAlter(__PRETTY_FUNCTION__);
|
||||
auto table_soft_lock = lockDataForAlter();
|
||||
|
||||
data.checkAlter(params);
|
||||
|
||||
@ -234,7 +234,7 @@ void StorageMergeTree::alter(
|
||||
transactions.push_back(std::move(transaction));
|
||||
}
|
||||
|
||||
auto table_hard_lock = lockStructureForAlter(__PRETTY_FUNCTION__);
|
||||
auto table_hard_lock = lockStructureForAlter();
|
||||
|
||||
IDatabase::ASTModifier storage_modifier;
|
||||
if (primary_key_is_modified)
|
||||
@ -400,7 +400,7 @@ bool StorageMergeTree::merge(
|
||||
bool deduplicate,
|
||||
String * out_disable_reason)
|
||||
{
|
||||
auto structure_lock = lockStructure(true, __PRETTY_FUNCTION__);
|
||||
auto structure_lock = lockStructure(true);
|
||||
|
||||
MergeTreeDataMergerMutator::FuturePart future_part;
|
||||
|
||||
@ -505,7 +505,7 @@ bool StorageMergeTree::merge(
|
||||
|
||||
bool StorageMergeTree::tryMutatePart()
|
||||
{
|
||||
auto structure_lock = lockStructure(true, __PRETTY_FUNCTION__);
|
||||
auto structure_lock = lockStructure(true);
|
||||
|
||||
MergeTreeDataMergerMutator::FuturePart future_part;
|
||||
MutationCommands commands;
|
||||
@ -705,7 +705,7 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Fi
|
||||
auto merge_blocker = merger_mutator.actions_blocker.cancel();
|
||||
|
||||
/// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
|
||||
auto lock_read_structure = lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto lock_read_structure = lockStructure(false);
|
||||
|
||||
String partition_id = data.getPartitionIDFromQuery(partition, context);
|
||||
auto parts = data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
|
||||
@ -807,7 +807,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
|
||||
|
||||
case PartitionCommand::FREEZE_PARTITION:
|
||||
{
|
||||
auto lock = lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto lock = lockStructure(false);
|
||||
data.freezePartition(command.partition, command.with_name, context);
|
||||
}
|
||||
break;
|
||||
@ -818,7 +818,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
|
||||
|
||||
case PartitionCommand::FREEZE_ALL_PARTITIONS:
|
||||
{
|
||||
auto lock = lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto lock = lockStructure(false);
|
||||
data.freezeAll(command.with_name, context);
|
||||
}
|
||||
break;
|
||||
@ -836,7 +836,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, cons
|
||||
/// This protects against "revival" of data for a removed partition after completion of merge.
|
||||
auto merge_blocker = merger_mutator.actions_blocker.cancel();
|
||||
/// Waits for completion of merge and does not start new ones.
|
||||
auto lock = lockForAlter(__PRETTY_FUNCTION__);
|
||||
auto lock = lockForAlter();
|
||||
|
||||
String partition_id = data.getPartitionIDFromQuery(partition, context);
|
||||
|
||||
@ -919,8 +919,8 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool part, cons
|
||||
|
||||
void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context)
|
||||
{
|
||||
auto lock1 = lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto lock2 = source_table->lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto lock1 = lockStructure(false);
|
||||
auto lock2 = source_table->lockStructure(false);
|
||||
|
||||
Stopwatch watch;
|
||||
MergeTreeData * src_data = data.checkStructureAndGetMergeTreeData(source_table);
|
||||
|
@ -31,7 +31,7 @@ void registerStorageNull(StorageFactory & factory)
|
||||
|
||||
void StorageNull::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context)
|
||||
{
|
||||
auto lock = lockStructureForAlter(__PRETTY_FUNCTION__);
|
||||
auto lock = lockStructureForAlter();
|
||||
|
||||
ColumnsDescription new_columns = getColumns();
|
||||
params.apply(new_columns);
|
||||
|
@ -1146,7 +1146,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
|
||||
/// Can throw an exception.
|
||||
DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_merge);
|
||||
|
||||
auto table_lock = lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto table_lock = lockStructure(false);
|
||||
|
||||
MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, entry.new_part_name, parts);
|
||||
|
||||
@ -1276,7 +1276,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
|
||||
/// Can throw an exception.
|
||||
DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_result);
|
||||
|
||||
auto table_lock = lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto table_lock = lockStructure(false);
|
||||
|
||||
MergeTreeData::MutableDataPartPtr new_part;
|
||||
MergeTreeData::Transaction transaction(data);
|
||||
@ -1582,7 +1582,7 @@ void StorageReplicatedMergeTree::executeClearColumnInPartition(const LogEntry &
|
||||
/// We don't change table structure, only data in some parts
|
||||
/// To disable reading from these parts, we will sequentially acquire write lock for each part inside alterDataPart()
|
||||
/// If we will lock the whole table here, a deadlock can occur. For example, if use use Buffer table (CLICKHOUSE-3238)
|
||||
auto lock_read_structure = lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto lock_read_structure = lockStructure(false);
|
||||
|
||||
auto zookeeper = getZooKeeper();
|
||||
|
||||
@ -1683,7 +1683,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
PartDescriptions parts_to_add;
|
||||
MergeTreeData::DataPartsVector parts_to_remove;
|
||||
|
||||
auto structure_lock_dst_table = lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto structure_lock_dst_table = lockStructure(false);
|
||||
|
||||
for (size_t i = 0; i < entry_replace.new_part_names.size(); ++i)
|
||||
{
|
||||
@ -1745,7 +1745,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
return 0;
|
||||
}
|
||||
|
||||
structure_lock_src_table = source_table->lockStructure(false, __PRETTY_FUNCTION__);
|
||||
structure_lock_src_table = source_table->lockStructure(false);
|
||||
|
||||
MergeTreeData::DataPartStates valid_states{MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed,
|
||||
MergeTreeDataPartState::Outdated};
|
||||
@ -2767,7 +2767,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
|
||||
|
||||
TableStructureReadLockPtr table_lock;
|
||||
if (!to_detached)
|
||||
table_lock = lockStructure(true, __PRETTY_FUNCTION__);
|
||||
table_lock = lockStructure(true);
|
||||
|
||||
/// Logging
|
||||
Stopwatch stopwatch;
|
||||
@ -3130,7 +3130,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
|
||||
|
||||
{
|
||||
/// Just to read current structure. Alter will be done in separate thread.
|
||||
auto table_lock = lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto table_lock = lockStructure(false);
|
||||
|
||||
if (is_readonly)
|
||||
throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY);
|
||||
@ -3312,7 +3312,7 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
|
||||
|
||||
case PartitionCommand::FREEZE_PARTITION:
|
||||
{
|
||||
auto lock = lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto lock = lockStructure(false);
|
||||
data.freezePartition(command.partition, command.with_name, context);
|
||||
}
|
||||
break;
|
||||
@ -3323,7 +3323,7 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
|
||||
|
||||
case PartitionCommand::FREEZE_ALL_PARTITIONS:
|
||||
{
|
||||
auto lock = lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto lock = lockStructure(false);
|
||||
data.freezeAll(command.with_name, context);
|
||||
}
|
||||
break;
|
||||
@ -4325,7 +4325,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
|
||||
{
|
||||
/// Critical section is not required (since grabOldParts() returns unique part set on each call)
|
||||
|
||||
auto table_lock = lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto table_lock = lockStructure(false);
|
||||
auto zookeeper = getZooKeeper();
|
||||
|
||||
MergeTreeData::DataPartsVector parts = data.grabOldParts();
|
||||
@ -4600,8 +4600,8 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
|
||||
void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace,
|
||||
const Context & context)
|
||||
{
|
||||
auto lock1 = lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto lock2 = source_table->lockStructure(false, __PRETTY_FUNCTION__);
|
||||
auto lock1 = lockStructure(false);
|
||||
auto lock2 = source_table->lockStructure(false);
|
||||
|
||||
Stopwatch watch;
|
||||
MergeTreeData * src_data = data.checkStructureAndGetMergeTreeData(source_table);
|
||||
|
@ -98,7 +98,7 @@ protected:
|
||||
|
||||
try
|
||||
{
|
||||
table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
|
||||
table_lock = storage->lockStructure(false);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
|
@ -166,7 +166,7 @@ public:
|
||||
try
|
||||
{
|
||||
/// For table not to be dropped and set of columns to remain constant.
|
||||
info.table_lock = info.storage->lockStructure(false, __PRETTY_FUNCTION__);
|
||||
info.table_lock = info.storage->lockStructure(false);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
|
@ -6038,8 +6038,8 @@
|
||||
"CodeNext": 0,
|
||||
"CodePrev": 0,
|
||||
"ErrorCode": "V001",
|
||||
"FileName": "gtest_rw_lock_fifo.cpp.cpp",
|
||||
"Message": "A code fragment from 'gtest_rw_lock_fifo.cpp.cpp' cannot be analyzed."
|
||||
"FileName": "gtest_rw_lock.cpp",
|
||||
"Message": "A code fragment from 'gtest_rw_lock.cpp' cannot be analyzed."
|
||||
},
|
||||
{
|
||||
"CodeCurrent": 0,
|
||||
|
Loading…
Reference in New Issue
Block a user