mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-14 19:45:11 +00:00
Fix refreshable MV in system database breaking server startup
This commit is contained in:
parent
e78ad9d5e6
commit
f7d9c3a9b1
@ -69,6 +69,7 @@
|
||||
#include <Interpreters/registerInterpreters.h>
|
||||
#include <Interpreters/JIT/CompiledExpressionCache.h>
|
||||
#include <Access/AccessControl.h>
|
||||
#include <Storages/MaterializedView/RefreshSet.h>
|
||||
#include <Storages/MergeTree/MergeTreeSettings.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Storages/System/attachSystemTables.h>
|
||||
@ -2083,6 +2084,12 @@ try
|
||||
|
||||
try
|
||||
{
|
||||
/// Don't run background queries until we have loaded tables.
|
||||
/// (In particular things would break if a background drop query happens before the
|
||||
/// loadMarkedAsDroppedTables() call below - it'll see dropped table metadata and try to
|
||||
/// drop the table a second time and throw an exception.)
|
||||
global_context->getRefreshSet().setRefreshesStopped(true);
|
||||
|
||||
auto & database_catalog = DatabaseCatalog::instance();
|
||||
/// We load temporary database first, because projections need it.
|
||||
database_catalog.initializeAndLoadTemporaryDatabase();
|
||||
@ -2122,6 +2129,8 @@ try
|
||||
database_catalog.assertDatabaseExists(default_database);
|
||||
/// Load user-defined SQL functions.
|
||||
global_context->getUserDefinedSQLObjectsStorage().loadObjects();
|
||||
|
||||
global_context->getRefreshSet().setRefreshesStopped(false);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
|
@ -176,7 +176,26 @@ void RefreshSet::notifyDependents(const StorageID & id) const
|
||||
res.push_back(task);
|
||||
}
|
||||
for (const RefreshTaskPtr & t : res)
|
||||
t->notifyDependencyProgress();
|
||||
t->notify();
|
||||
}
|
||||
|
||||
void RefreshSet::setRefreshesStopped(bool stopped)
|
||||
{
|
||||
|
||||
TaskMap tasks_copy;
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
refreshes_stopped.store(stopped);
|
||||
tasks_copy = tasks;
|
||||
}
|
||||
for (const auto & kv : tasks_copy)
|
||||
for (const RefreshTaskPtr & t : kv.second)
|
||||
t->notify();
|
||||
}
|
||||
|
||||
bool RefreshSet::refreshesStopped() const
|
||||
{
|
||||
return refreshes_stopped.load();
|
||||
}
|
||||
|
||||
RefreshSet::Handle::Handle(RefreshSet * parent_set_, StorageID id_, std::optional<StorageID> inner_table_id_, RefreshTaskList::iterator iter_, RefreshTaskList::iterator inner_table_iter_, std::vector<StorageID> dependencies_)
|
||||
|
@ -62,9 +62,12 @@ public:
|
||||
|
||||
RefreshTaskPtr tryGetTaskForInnerTable(const StorageID & inner_table_id) const;
|
||||
|
||||
/// Calls notifyDependencyProgress() on all tasks that depend on `id`.
|
||||
/// Calls notify() on all tasks that depend on `id`.
|
||||
void notifyDependents(const StorageID & id) const;
|
||||
|
||||
void setRefreshesStopped(bool stopped);
|
||||
bool refreshesStopped() const;
|
||||
|
||||
private:
|
||||
using TaskMap = std::unordered_map<StorageID, RefreshTaskList, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
|
||||
using DependentsMap = std::unordered_map<StorageID, std::unordered_set<RefreshTaskPtr>, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
|
||||
@ -78,6 +81,8 @@ private:
|
||||
DependentsMap dependents;
|
||||
InnerTableMap inner_tables;
|
||||
|
||||
std::atomic<bool> refreshes_stopped {false};
|
||||
|
||||
RefreshTaskList::iterator addTaskLocked(StorageID id, RefreshTaskPtr task);
|
||||
void removeTaskLocked(StorageID id, RefreshTaskList::iterator iter);
|
||||
RefreshTaskList::iterator addInnerTableLocked(StorageID inner_table_id, RefreshTaskPtr task);
|
||||
|
@ -299,9 +299,11 @@ std::chrono::sys_seconds RefreshTask::getNextRefreshTimeslot() const
|
||||
return refresh_schedule.advance(coordination.root_znode.last_completed_timeslot);
|
||||
}
|
||||
|
||||
void RefreshTask::notifyDependencyProgress()
|
||||
void RefreshTask::notify()
|
||||
{
|
||||
std::lock_guard guard(mutex);
|
||||
if (view && view->getContext()->getRefreshSet().refreshesStopped())
|
||||
interruptExecution();
|
||||
scheduling.dependencies_satisfied_until = std::chrono::sys_seconds(std::chrono::seconds(-1));
|
||||
refresh_task->schedule();
|
||||
}
|
||||
@ -367,7 +369,7 @@ void RefreshTask::refreshTask()
|
||||
|
||||
chassert(lock.owns_lock());
|
||||
|
||||
if (scheduling.stop_requested || coordination.read_only)
|
||||
if (scheduling.stop_requested || view->getContext()->getRefreshSet().refreshesStopped() || coordination.read_only)
|
||||
{
|
||||
/// Exit the task and wait for the user to start or resume, which will schedule the task again.
|
||||
setState(RefreshState::Disabled, lock);
|
||||
|
@ -83,8 +83,9 @@ public:
|
||||
/// A measure of how far this view has progressed. Used by dependent views.
|
||||
std::chrono::sys_seconds getNextRefreshTimeslot() const;
|
||||
|
||||
/// Called when progress is made (i.e. getNextRefreshTimeslot() changes) in any task that this task depends on.
|
||||
void notifyDependencyProgress();
|
||||
/// Called when refresh scheduling needs to be reconsidered, e.g. after a refresh happens in
|
||||
/// any task that this task depends on.
|
||||
void notify();
|
||||
|
||||
/// For tests
|
||||
void setFakeTime(std::optional<Int64> t);
|
||||
|
@ -1631,6 +1631,7 @@ class ClickHouseCluster:
|
||||
instance_env_variables=False,
|
||||
image="clickhouse/integration-test",
|
||||
tag=None,
|
||||
# keep the docker container running when clickhouse server is stopped
|
||||
stay_alive=False,
|
||||
ipv4_address=None,
|
||||
ipv6_address=None,
|
||||
|
@ -21,6 +21,7 @@ node1 = cluster.add_instance(
|
||||
user_configs=["configs/users.xml"],
|
||||
with_zookeeper=True,
|
||||
macros={"shard": "shard1", "replica": "1"},
|
||||
stay_alive=True,
|
||||
)
|
||||
node2 = cluster.add_instance(
|
||||
"node2",
|
||||
@ -199,3 +200,12 @@ def test_refreshable_mv_in_replicated_db(started_cluster):
|
||||
|
||||
node1.query("drop database re sync")
|
||||
node2.query("drop database re sync")
|
||||
|
||||
def test_refreshable_mv_in_system_db(started_cluster):
    """Check that a refreshable materialized view in the `system` database does not
    break server startup and can still be refreshed after a restart."""
    node1.query(
        "create materialized view system.a refresh every 1 second (x Int64) engine Memory as select number+1 as x from numbers(2);"
        "system refresh view system.a;"
    )

    try:
        # The server must come back up with the refreshable view present in `system`.
        node1.restart_clickhouse()

        # `system refresh view` only triggers a refresh; wait for it to finish so the
        # select below does not race with the refresh and read an empty table.
        node1.query("system refresh view system.a")
        node1.query("system wait view system.a")

        assert node1.query("select count(), sum(x) from system.a") == "2\t3\n"
    finally:
        # Drop the view so it does not keep refreshing every second during other
        # tests that use the same cluster.
        node1.query("drop table if exists system.a")
|
||||
|
Loading…
Reference in New Issue
Block a user