stop background ops before converting db engine

This commit is contained in:
Alexander Tokmakov 2022-09-09 18:27:19 +02:00
parent 12e7cd6b19
commit 9bcec2d49a
5 changed files with 61 additions and 22 deletions

View File

@ -2974,7 +2974,7 @@ const IHostContextPtr & Context::getHostContext() const
}
std::shared_ptr<ActionLocksManager> Context::getActionLocksManager()
std::shared_ptr<ActionLocksManager> Context::getActionLocksManager() const
{
auto lock = getLock();

View File

@ -936,7 +936,7 @@ public:
bool applyDeletedMask() const { return apply_deleted_mask; }
void setApplyDeletedMask(bool apply) { apply_deleted_mask = apply; }
ActionLocksManagerPtr getActionLocksManager();
ActionLocksManagerPtr getActionLocksManager() const;
enum class ApplicationType
{

View File

@ -180,15 +180,28 @@ void InterpreterSystemQuery::startStopAction(StorageActionBlockType action_type,
{
for (auto & elem : DatabaseCatalog::instance().getDatabases())
{
for (auto iterator = elem.second->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
startStopActionInDatabase(action_type, start, elem.first, elem.second, getContext(), log);
}
}
}
void InterpreterSystemQuery::startStopActionInDatabase(StorageActionBlockType action_type, bool start,
const String & database_name, const DatabasePtr & database,
const ContextPtr & local_context, Poco::Logger * log)
{
auto manager = local_context->getActionLocksManager();
auto access = local_context->getAccess();
auto required_access_type = getRequiredAccessType(action_type);
for (auto iterator = database->getTablesIterator(local_context); iterator->isValid(); iterator->next())
{
StoragePtr table = iterator->table();
if (!table)
continue;
if (!access->isGranted(required_access_type, elem.first, iterator->name()))
if (!access->isGranted(required_access_type, database_name, iterator->name()))
{
LOG_INFO(log, "Access {} denied, skipping {}.{}", toString(required_access_type), elem.first, iterator->name());
LOG_INFO(log, "Access {} denied, skipping {}.{}", toString(required_access_type), database_name, iterator->name());
continue;
}
@ -201,8 +214,6 @@ void InterpreterSystemQuery::startStopAction(StorageActionBlockType action_type,
manager->add(table, action_type);
}
}
}
}
InterpreterSystemQuery::InterpreterSystemQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_)

View File

@ -16,6 +16,9 @@ namespace DB
class Context;
class AccessRightsElements;
class ASTSystemQuery;
class IDatabase;
using DatabasePtr = std::shared_ptr<IDatabase>;
/** Implement various SYSTEM queries.
@ -37,6 +40,10 @@ public:
BlockIO execute() override;
static void startStopActionInDatabase(StorageActionBlockType action_type, bool start,
const String & database_name, const DatabasePtr & database,
const ContextPtr & local_context, Poco::Logger * log);
private:
ASTPtr query_ptr;
Poco::Logger * log = nullptr;

View File

@ -5,6 +5,7 @@
#include <Parsers/parseQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterSystemQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/loadMetadata.h>
#include <Interpreters/executeQuery.h>
@ -32,6 +33,14 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace ActionLocks
{
extern StorageActionBlockType PartsMerge;
extern StorageActionBlockType PartsFetch;
extern StorageActionBlockType PartsSend;
extern StorageActionBlockType DistributedSend;
}
static void executeCreateQuery(
const String & query,
ContextMutablePtr context,
@ -327,9 +336,21 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons
}
auto local_context = Context::createCopy(context);
/// We have to stop background operations that may lock table for share to avoid DEADLOCK_AVOIDED error
/// on moving tables from Ordinary database. Server has not started to accept connections yet,
/// so there are no user queries, only background operations
LOG_INFO(log, "Will stop background operations to be able to rename tables in Ordinary database {}", database_name);
for (const auto & action : {ActionLocks::PartsMerge, ActionLocks::PartsFetch, ActionLocks::PartsSend, ActionLocks::DistributedSend})
InterpreterSystemQuery::startStopActionInDatabase(action, /* start */ false, database_name, database, context, log);
local_context->setSetting("check_table_dependencies", false);
convertOrdinaryDatabaseToAtomic(log, local_context, database, database_name, tmp_name);
LOG_INFO(log, "Will start background operations after renaming tables in database {}", database_name);
for (const auto & action : {ActionLocks::PartsMerge, ActionLocks::PartsFetch, ActionLocks::PartsSend, ActionLocks::DistributedSend})
InterpreterSystemQuery::startStopActionInDatabase(action, /* start */ true, database_name, database, context, log);
auto new_database = DatabaseCatalog::instance().getDatabase(database_name);
UUID db_uuid = new_database->getUUID();
std::vector<UUID> tables_uuids;