This commit is contained in:
Alexander Tokmakov 2020-04-09 02:59:39 +03:00
parent dd1590830b
commit 140cd88c20
6 changed files with 35 additions and 7 deletions

View File

@ -404,7 +404,7 @@ struct ContextShared
if (system_logs) if (system_logs)
system_logs->shutdown(); system_logs->shutdown();
DatabaseCatalog::instance().shutdown(); DatabaseCatalog::shutdown();
/// Preemptive destruction is important, because these objects may have a refcount to ContextShared (cyclic reference). /// Preemptive destruction is important, because these objects may have a refcount to ContextShared (cyclic reference).
/// TODO: Get rid of this. /// TODO: Get rid of this.

View File

@ -27,6 +27,7 @@ namespace ErrorCodes
extern const int DATABASE_NOT_EMPTY; extern const int DATABASE_NOT_EMPTY;
extern const int DATABASE_ACCESS_DENIED; extern const int DATABASE_ACCESS_DENIED;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int NULL_POINTER_DEREFERENCE;
} }
TemporaryTableHolder::TemporaryTableHolder(const Context & context_, TemporaryTableHolder::TemporaryTableHolder(const Context & context_,
@ -107,6 +108,7 @@ StoragePtr TemporaryTableHolder::getTable() const
void DatabaseCatalog::loadDatabases() void DatabaseCatalog::loadDatabases()
{ {
drop_delay_s = global_context->getConfigRef().getInt("database_atomic_delay_before_drop_table_s", 60);
auto db_for_temporary_and_external_tables = std::make_shared<DatabaseMemory>(TEMPORARY_DATABASE); auto db_for_temporary_and_external_tables = std::make_shared<DatabaseMemory>(TEMPORARY_DATABASE);
attachDatabase(TEMPORARY_DATABASE, db_for_temporary_and_external_tables); attachDatabase(TEMPORARY_DATABASE, db_for_temporary_and_external_tables);
@ -117,7 +119,7 @@ void DatabaseCatalog::loadDatabases()
(*drop_task)->activateAndSchedule(); (*drop_task)->activateAndSchedule();
} }
void DatabaseCatalog::shutdown() void DatabaseCatalog::shutdownImpl()
{ {
if (drop_task) if (drop_task)
(*drop_task)->deactivate(); (*drop_task)->deactivate();
@ -378,8 +380,7 @@ DatabaseCatalog::DatabaseCatalog(Context * global_context_)
: global_context(global_context_), log(&Poco::Logger::get("DatabaseCatalog")) : global_context(global_context_), log(&Poco::Logger::get("DatabaseCatalog"))
{ {
if (!global_context) if (!global_context)
throw Exception("DatabaseCatalog is not initialized. It's a bug.", ErrorCodes::LOGICAL_ERROR); throw Exception("DatabaseCatalog is not initialized. It's a bug.", ErrorCodes::NULL_POINTER_DEREFERENCE);
drop_delay_s = global_context->getConfigRef().getInt("database_atomic_delay_before_drop_table_s", 60);
} }
DatabaseCatalog & DatabaseCatalog::init(Context * global_context_) DatabaseCatalog & DatabaseCatalog::init(Context * global_context_)
@ -393,6 +394,23 @@ DatabaseCatalog & DatabaseCatalog::instance()
return init(nullptr); return init(nullptr);
} }
void DatabaseCatalog::shutdown()
{
try
{
instance().shutdownImpl();
}
catch (const Exception & e)
{
/// If catalog was not initialized yet by init(global_context), instance() throws NULL_POINTER_DEREFERENCE.
/// It can happen if some exception was thrown on first steps of startup (e.g. command line arguments parsing).
/// Ignore it.
if (e.code() == ErrorCodes::NULL_POINTER_DEREFERENCE)
return;
throw;
}
}
DatabasePtr DatabaseCatalog::getDatabase(const String & database_name, const Context & local_context) const DatabasePtr DatabaseCatalog::getDatabase(const String & database_name, const Context & local_context) const
{ {
String resolved_database = local_context.resolveDatabase(database_name); String resolved_database = local_context.resolveDatabase(database_name);

View File

@ -102,9 +102,9 @@ public:
static DatabaseCatalog & init(Context * global_context_); static DatabaseCatalog & init(Context * global_context_);
static DatabaseCatalog & instance(); static DatabaseCatalog & instance();
static void shutdown();
void loadDatabases(); void loadDatabases();
void shutdown();
/// Get an object that protects the table from concurrently executing multiple DDL operations. /// Get an object that protects the table from concurrently executing multiple DDL operations.
std::unique_ptr<DDLGuard> getDDLGuard(const String & database, const String & table); std::unique_ptr<DDLGuard> getDDLGuard(const String & database, const String & table);
@ -166,6 +166,8 @@ private:
void assertDatabaseExistsUnlocked(const String & database_name) const; void assertDatabaseExistsUnlocked(const String & database_name) const;
void assertDatabaseDoesntExistUnlocked(const String & database_name) const; void assertDatabaseDoesntExistUnlocked(const String & database_name) const;
void shutdownImpl();
struct UUIDToStorageMapPart struct UUIDToStorageMapPart
{ {
@ -222,7 +224,7 @@ private:
mutable std::mutex tables_marked_dropped_mutex; mutable std::mutex tables_marked_dropped_mutex;
std::unique_ptr<BackgroundSchedulePoolTaskHolder> drop_task; std::unique_ptr<BackgroundSchedulePoolTaskHolder> drop_task;
time_t drop_delay_s; time_t drop_delay_s = 60;
}; };
} }

View File

@ -2880,11 +2880,15 @@ void StorageReplicatedMergeTree::startup()
move_parts_task_handle = pool.createTask([this] { return movePartsTask(); }); move_parts_task_handle = pool.createTask([this] { return movePartsTask(); });
pool.startTask(move_parts_task_handle); pool.startTask(move_parts_task_handle);
} }
need_shutdown.store(true);
} }
void StorageReplicatedMergeTree::shutdown() void StorageReplicatedMergeTree::shutdown()
{ {
if (!need_shutdown.load())
return;
clearOldPartsFromFilesystem(true); clearOldPartsFromFilesystem(true);
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever(); fetcher.blocker.cancelForever();
@ -2917,6 +2921,7 @@ void StorageReplicatedMergeTree::shutdown()
std::unique_lock lock(data_parts_exchange_endpoint->rwlock); std::unique_lock lock(data_parts_exchange_endpoint->rwlock);
} }
data_parts_exchange_endpoint.reset(); data_parts_exchange_endpoint.reset();
need_shutdown.store(false);
} }

View File

@ -288,6 +288,8 @@ private:
/// True if replica was created for existing table with fixed granularity /// True if replica was created for existing table with fixed granularity
bool other_replicas_fixed_granularity = false; bool other_replicas_fixed_granularity = false;
std::atomic_bool need_shutdown{false};
template <class Func> template <class Func>
void foreachCommittedParts(const Func & func) const; void foreachCommittedParts(const Func & func) const;

View File

@ -23,7 +23,8 @@ ALTER TABLE table_for_rename_pk RENAME COLUMN key3 TO renamed_key3; --{serverErr
ALTER TABLE table_for_rename_pk RENAME COLUMN key2 TO renamed_key2; --{serverError 44} ALTER TABLE table_for_rename_pk RENAME COLUMN key2 TO renamed_key2; --{serverError 44}
DROP TABLE IF EXISTS table_for_rename_pk; DROP TABLE IF EXISTS table_for_rename_pk NO DELAY;
SELECT sleep(1) FORMAT Null;
DROP TABLE IF EXISTS table_for_rename_with_primary_key; DROP TABLE IF EXISTS table_for_rename_with_primary_key;