Merge pull request #70460 from ClickHouse/dep

Fix refreshable MV in system database breaking server startup
Michael Kolupaev 2024-10-08 21:17:12 +00:00 committed by GitHub
commit ab06938478
7 changed files with 57 additions and 6 deletions

View File

@@ -69,6 +69,7 @@
 #include <Interpreters/registerInterpreters.h>
 #include <Interpreters/JIT/CompiledExpressionCache.h>
 #include <Access/AccessControl.h>
+#include <Storages/MaterializedView/RefreshSet.h>
 #include <Storages/MergeTree/MergeTreeSettings.h>
 #include <Storages/StorageReplicatedMergeTree.h>
 #include <Storages/System/attachSystemTables.h>
@@ -2083,6 +2084,12 @@ try
     try
     {
+        /// Don't run background queries until we loaded tables.
+        /// (In particular things would break if a background drop query happens before the
+        ///  loadMarkedAsDroppedTables() call below - it'll see dropped table metadata and try to
+        ///  drop the table a second time and throw an exception.)
+        global_context->getRefreshSet().setRefreshesStopped(true);
+
         auto & database_catalog = DatabaseCatalog::instance();
         /// We load temporary database first, because projections need it.
         database_catalog.initializeAndLoadTemporaryDatabase();
@@ -2122,6 +2129,8 @@ try
         database_catalog.assertDatabaseExists(default_database);
         /// Load user-defined SQL functions.
         global_context->getUserDefinedSQLObjectsStorage().loadObjects();
+
+        global_context->getRefreshSet().setRefreshesStopped(false);
     }
     catch (...)
     {
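Taken together, the two Server.cpp hunks bracket table loading: refreshes are switched off before any metadata is loaded and switched back on once tables and user-defined objects are in place. A minimal sketch of the bracketing pattern, with stand-in names (RefreshSetSketch, loadAllTables, startupSketch are illustrative, not from the diff):

#include <atomic>

// Stand-in for RefreshSet: a single flag that refresh tasks poll.
struct RefreshSetSketch
{
    std::atomic<bool> refreshes_stopped{false};
    void setRefreshesStopped(bool stopped) { refreshes_stopped.store(stopped); }
    bool refreshesStopped() const { return refreshes_stopped.load(); }
};

void loadAllTables() { /* placeholder for the catalog-loading calls above */ }

void startupSketch(RefreshSetSketch & refresh_set)
{
    refresh_set.setRefreshesStopped(true);  // no background refresh queries may run yet
    loadAllTables();                        // safe: tasks see the flag and stay parked
    refresh_set.setRefreshesStopped(false); // re-enable; the real code also notifies each task
}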

View File

@@ -176,7 +176,26 @@ void RefreshSet::notifyDependents(const StorageID & id) const
             res.push_back(task);
     }
     for (const RefreshTaskPtr & t : res)
-        t->notifyDependencyProgress();
+        t->notify();
 }
 
+void RefreshSet::setRefreshesStopped(bool stopped)
+{
+    TaskMap tasks_copy;
+    {
+        std::lock_guard lock(mutex);
+        refreshes_stopped.store(stopped);
+        tasks_copy = tasks;
+    }
+    for (const auto & kv : tasks_copy)
+        for (const RefreshTaskPtr & t : kv.second)
+            t->notify();
+}
+
+bool RefreshSet::refreshesStopped() const
+{
+    return refreshes_stopped.load();
+}
+
 RefreshSet::Handle::Handle(RefreshSet * parent_set_, StorageID id_, std::optional<StorageID> inner_table_id_, RefreshTaskList::iterator iter_, RefreshTaskList::iterator inner_table_iter_, std::vector<StorageID> dependencies_)
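Note the copy-under-lock, notify-outside-lock idiom in setRefreshesStopped(): the task map is copied while holding the set's mutex, and notify() is only called after the lock is released. Since RefreshTask::notify() takes the task's own mutex (see RefreshTask.cpp below), notifying under the set's mutex would nest the two locks and invite lock-order inversion. A sketch of the idiom with simplified stand-in types (Task, SetSketch, and the int-keyed map are placeholders):

#include <atomic>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>

struct Task
{
    std::mutex task_mutex;
    void notify() { std::lock_guard g(task_mutex); /* reschedule the task */ }
};
using TaskPtr = std::shared_ptr<Task>;

struct SetSketch
{
    mutable std::mutex mutex;
    std::unordered_map<int, std::vector<TaskPtr>> tasks;
    std::atomic<bool> refreshes_stopped{false};

    void setRefreshesStopped(bool stopped)
    {
        std::unordered_map<int, std::vector<TaskPtr>> tasks_copy;
        {
            std::lock_guard lock(mutex);      // guards the map only
            refreshes_stopped.store(stopped);
            tasks_copy = tasks;               // shared_ptr copies keep tasks alive
        }
        // Set mutex released: taking each task's mutex here cannot deadlock
        // against a task that is simultaneously calling back into the set.
        for (const auto & kv : tasks_copy)
            for (const TaskPtr & t : kv.second)
                t->notify();
    }
};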

View File

@@ -62,9 +62,12 @@ public:
     RefreshTaskPtr tryGetTaskForInnerTable(const StorageID & inner_table_id) const;
 
-    /// Calls notifyDependencyProgress() on all tasks that depend on `id`.
+    /// Calls notify() on all tasks that depend on `id`.
     void notifyDependents(const StorageID & id) const;
 
+    void setRefreshesStopped(bool stopped);
+    bool refreshesStopped() const;
+
 private:
     using TaskMap = std::unordered_map<StorageID, RefreshTaskList, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
     using DependentsMap = std::unordered_map<StorageID, std::unordered_set<RefreshTaskPtr>, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
@@ -78,6 +81,8 @@ private:
     DependentsMap dependents;
     InnerTableMap inner_tables;
 
+    std::atomic<bool> refreshes_stopped {false};
+
     RefreshTaskList::iterator addTaskLocked(StorageID id, RefreshTaskPtr task);
     void removeTaskLocked(StorageID id, RefreshTaskList::iterator iter);
     RefreshTaskList::iterator addInnerTableLocked(StorageID inner_table_id, RefreshTaskPtr task);

View File

@@ -299,9 +299,11 @@ std::chrono::sys_seconds RefreshTask::getNextRefreshTimeslot() const
     return refresh_schedule.advance(coordination.root_znode.last_completed_timeslot);
 }
 
-void RefreshTask::notifyDependencyProgress()
+void RefreshTask::notify()
 {
     std::lock_guard guard(mutex);
+    if (view && view->getContext()->getRefreshSet().refreshesStopped())
+        interruptExecution();
     scheduling.dependencies_satisfied_until = std::chrono::sys_seconds(std::chrono::seconds(-1));
     refresh_task->schedule();
 }
@@ -367,7 +369,7 @@ void RefreshTask::refreshTask()
             chassert(lock.owns_lock());
 
-            if (scheduling.stop_requested || coordination.read_only)
+            if (scheduling.stop_requested || view->getContext()->getRefreshSet().refreshesStopped() || coordination.read_only)
             {
                 /// Exit the task and wait for the user to start or resume, which will schedule the task again.
                 setState(RefreshState::Disabled, lock);
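The flag is consumed in two places: notify() interrupts a refresh that is already executing, and the scheduling body exits to RefreshState::Disabled, so a task stays parked until a later notify() finds the flag cleared and reschedules it. A rough sketch of that park/wake handshake, with a condition variable standing in for the background schedule pool (all names illustrative, not the real API):

#include <atomic>
#include <condition_variable>
#include <mutex>

struct TaskLoopSketch
{
    std::mutex mutex;
    std::condition_variable wake;
    std::atomic<bool> refreshes_stopped{true}; // stopped during startup

    // Called on any scheduling change: flag flipped, a dependency progressed, ...
    void notify() { wake.notify_all(); }

    void refreshTaskBody()
    {
        std::unique_lock lock(mutex);
        // Equivalent of parking in RefreshState::Disabled: wait until a
        // notify() arrives while the stop flag is cleared.
        wake.wait(lock, [&] { return !refreshes_stopped.load(); });
        // ... run one refresh ...
    }
};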

View File

@@ -83,8 +83,9 @@ public:
     /// A measure of how far this view has progressed. Used by dependent views.
     std::chrono::sys_seconds getNextRefreshTimeslot() const;
 
-    /// Called when progress is made (i.e. getNextRefreshTimeslot() changes) in any task that this task depends on.
-    void notifyDependencyProgress();
+    /// Called when refresh scheduling needs to be reconsidered, e.g. after a refresh happens in
+    /// any task that this task depends on.
+    void notify();
 
     /// For tests
     void setFakeTime(std::optional<Int64> t);

View File

@@ -1631,6 +1631,7 @@ class ClickHouseCluster:
         instance_env_variables=False,
         image="clickhouse/integration-test",
         tag=None,
+        # keep the docker container running when clickhouse server is stopped
         stay_alive=False,
         ipv4_address=None,
         ipv6_address=None,

View File

@@ -21,6 +21,7 @@ node1 = cluster.add_instance(
     user_configs=["configs/users.xml"],
     with_zookeeper=True,
     macros={"shard": "shard1", "replica": "1"},
+    stay_alive=True,
 )
 node2 = cluster.add_instance(
     "node2",
@@ -203,3 +204,16 @@ def test_refreshable_mv_in_replicated_db(started_cluster):
     node1.query("drop database re sync")
     node2.query("drop database re sync")
 
+
+def test_refreshable_mv_in_system_db(started_cluster):
+    node1.query(
+        "create materialized view system.a refresh every 1 second (x Int64) engine Memory as select number+1 as x from numbers(2);"
+        "system refresh view system.a;"
+    )
+
+    node1.restart_clickhouse()
+    node1.query("system refresh view system.a")
+    assert node1.query("select count(), sum(x) from system.a") == "2\t3\n"
+
+    node1.query("drop table system.a")