From f7d9c3a9b19d44ef0c6f1b0ce99d9da5d9ceb086 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Tue, 8 Oct 2024 03:32:13 +0000 Subject: [PATCH] Fix refreshable MV in system database breaking server startup --- programs/server/Server.cpp | 9 ++++++++ src/Storages/MaterializedView/RefreshSet.cpp | 21 ++++++++++++++++++- src/Storages/MaterializedView/RefreshSet.h | 7 ++++++- src/Storages/MaterializedView/RefreshTask.cpp | 6 ++++-- src/Storages/MaterializedView/RefreshTask.h | 5 +++-- tests/integration/helpers/cluster.py | 1 + tests/integration/test_refreshable_mv/test.py | 10 +++++++++ 7 files changed, 53 insertions(+), 6 deletions(-) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 0dbc0c727ab..99e6f5d900b 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -69,6 +69,7 @@ #include #include #include +#include #include #include #include @@ -2083,6 +2084,12 @@ try try { + /// Don't run background queries until we loaded tables. + /// (In particular things would break if a background drop query happens before the + /// loadMarkedAsDroppedTables() call below - it'll see dropped table metadata and try to + /// drop the table a second time and throw an exception.) + global_context->getRefreshSet().setRefreshesStopped(true); + auto & database_catalog = DatabaseCatalog::instance(); /// We load temporary database first, because projections need it. database_catalog.initializeAndLoadTemporaryDatabase(); @@ -2122,6 +2129,8 @@ try database_catalog.assertDatabaseExists(default_database); /// Load user-defined SQL functions. global_context->getUserDefinedSQLObjectsStorage().loadObjects(); + + global_context->getRefreshSet().setRefreshesStopped(false); } catch (...) { diff --git a/src/Storages/MaterializedView/RefreshSet.cpp b/src/Storages/MaterializedView/RefreshSet.cpp index 4c13bdf1dd0..f1000cfd3d9 100644 --- a/src/Storages/MaterializedView/RefreshSet.cpp +++ b/src/Storages/MaterializedView/RefreshSet.cpp @@ -176,7 +176,26 @@ void RefreshSet::notifyDependents(const StorageID & id) const res.push_back(task); } for (const RefreshTaskPtr & t : res) - t->notifyDependencyProgress(); + t->notify(); +} + +void RefreshSet::setRefreshesStopped(bool stopped) +{ + + TaskMap tasks_copy; + { + std::lock_guard lock(mutex); + refreshes_stopped.store(stopped); + tasks_copy = tasks; + } + for (const auto & kv : tasks_copy) + for (const RefreshTaskPtr & t : kv.second) + t->notify(); +} + +bool RefreshSet::refreshesStopped() const +{ + return refreshes_stopped.load(); } RefreshSet::Handle::Handle(RefreshSet * parent_set_, StorageID id_, std::optional inner_table_id_, RefreshTaskList::iterator iter_, RefreshTaskList::iterator inner_table_iter_, std::vector dependencies_) diff --git a/src/Storages/MaterializedView/RefreshSet.h b/src/Storages/MaterializedView/RefreshSet.h index 205a5512ffb..d8822b1aa4f 100644 --- a/src/Storages/MaterializedView/RefreshSet.h +++ b/src/Storages/MaterializedView/RefreshSet.h @@ -62,9 +62,12 @@ public: RefreshTaskPtr tryGetTaskForInnerTable(const StorageID & inner_table_id) const; - /// Calls notifyDependencyProgress() on all tasks that depend on `id`. + /// Calls notify() on all tasks that depend on `id`. void notifyDependents(const StorageID & id) const; + void setRefreshesStopped(bool stopped); + bool refreshesStopped() const; + private: using TaskMap = std::unordered_map; using DependentsMap = std::unordered_map, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>; @@ -78,6 +81,8 @@ private: DependentsMap dependents; InnerTableMap inner_tables; + std::atomic refreshes_stopped {false}; + RefreshTaskList::iterator addTaskLocked(StorageID id, RefreshTaskPtr task); void removeTaskLocked(StorageID id, RefreshTaskList::iterator iter); RefreshTaskList::iterator addInnerTableLocked(StorageID inner_table_id, RefreshTaskPtr task); diff --git a/src/Storages/MaterializedView/RefreshTask.cpp b/src/Storages/MaterializedView/RefreshTask.cpp index edfd62c621e..8d46f9834f1 100644 --- a/src/Storages/MaterializedView/RefreshTask.cpp +++ b/src/Storages/MaterializedView/RefreshTask.cpp @@ -299,9 +299,11 @@ std::chrono::sys_seconds RefreshTask::getNextRefreshTimeslot() const return refresh_schedule.advance(coordination.root_znode.last_completed_timeslot); } -void RefreshTask::notifyDependencyProgress() +void RefreshTask::notify() { std::lock_guard guard(mutex); + if (view && view->getContext()->getRefreshSet().refreshesStopped()) + interruptExecution(); scheduling.dependencies_satisfied_until = std::chrono::sys_seconds(std::chrono::seconds(-1)); refresh_task->schedule(); } @@ -367,7 +369,7 @@ void RefreshTask::refreshTask() chassert(lock.owns_lock()); - if (scheduling.stop_requested || coordination.read_only) + if (scheduling.stop_requested || view->getContext()->getRefreshSet().refreshesStopped() || coordination.read_only) { /// Exit the task and wait for the user to start or resume, which will schedule the task again. setState(RefreshState::Disabled, lock); diff --git a/src/Storages/MaterializedView/RefreshTask.h b/src/Storages/MaterializedView/RefreshTask.h index ceb073c8313..59898390984 100644 --- a/src/Storages/MaterializedView/RefreshTask.h +++ b/src/Storages/MaterializedView/RefreshTask.h @@ -83,8 +83,9 @@ public: /// A measure of how far this view has progressed. Used by dependent views. std::chrono::sys_seconds getNextRefreshTimeslot() const; - /// Called when progress is made (i.e. getNextRefreshTimeslot() changes) in any task that this task depends on. - void notifyDependencyProgress(); + /// Called when refresh scheduling needs to be reconsidered, e.g. after a refresh happens in + /// any task that this task depends on. + void notify(); /// For tests void setFakeTime(std::optional t); diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index d26487e9aa4..9b7fc9dc5d7 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -1631,6 +1631,7 @@ class ClickHouseCluster: instance_env_variables=False, image="clickhouse/integration-test", tag=None, + # keep the docker container running when clickhouse server is stopped stay_alive=False, ipv4_address=None, ipv6_address=None, diff --git a/tests/integration/test_refreshable_mv/test.py b/tests/integration/test_refreshable_mv/test.py index 5e764e381f3..e537f977e0e 100644 --- a/tests/integration/test_refreshable_mv/test.py +++ b/tests/integration/test_refreshable_mv/test.py @@ -21,6 +21,7 @@ node1 = cluster.add_instance( user_configs=["configs/users.xml"], with_zookeeper=True, macros={"shard": "shard1", "replica": "1"}, + stay_alive=True, ) node2 = cluster.add_instance( "node2", @@ -199,3 +200,12 @@ def test_refreshable_mv_in_replicated_db(started_cluster): node1.query("drop database re sync") node2.query("drop database re sync") + +def test_refreshable_mv_in_system_db(started_cluster): + node1.query( + "create materialized view system.a refresh every 1 second (x Int64) engine Memory as select number+1 as x from numbers(2);" + "system refresh view system.a;" + ) + node1.restart_clickhouse() + node1.query("system refresh view system.a") + assert(node1.query("select count(), sum(x) from system.a") == "2\t3\n")