mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
Overhaul dependencies
This commit is contained in:
parent
bd18522cad
commit
a524e8c51e
@ -253,7 +253,8 @@
|
||||
M(MergeTreeAllRangesAnnouncementsSent, "The current number of announcement being sent in flight from the remote server to the initiator server about the set of data parts (for MergeTree tables). Measured on the remote server side.") \
|
||||
M(CreatedTimersInQueryProfiler, "Number of Created thread local timers in QueryProfiler") \
|
||||
M(ActiveTimersInQueryProfiler, "Number of Active thread local timers in QueryProfiler") \
|
||||
M(RefreshingViews, "Number of active refreshes") \
|
||||
M(RefreshableViews, "Number materialized views with periodic refreshing (REFRESH)") \
|
||||
M(RefreshingViews, "Number of materialized views currently executing a refresh") \
|
||||
|
||||
#ifdef APPLY_FOR_EXTERNAL_METRICS
|
||||
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)
|
||||
|
@ -60,7 +60,7 @@ public:
|
||||
/// Removes all dependencies of "table_id", returns those dependencies.
|
||||
std::vector<StorageID> removeDependencies(const StorageID & table_id, bool remove_isolated_tables = false);
|
||||
|
||||
/// Removes a table from the graph and removes all references to in from the graph (both from its dependencies and dependents).
|
||||
/// Removes a table from the graph and removes all references to it from the graph (both from its dependencies and dependents).
|
||||
bool removeTable(const StorageID & table_id);
|
||||
|
||||
/// Removes tables from the graph by a specified filter.
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Parsers/ASTQueryWithTableAndOutput.h>
|
||||
#include <Parsers/ASTRenameQuery.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTRefreshStrategy.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTSubquery.h>
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
@ -87,6 +88,12 @@ public:
|
||||
visit(child);
|
||||
}
|
||||
|
||||
void visit(ASTRefreshStrategy & refresh) const
|
||||
{
|
||||
ASTPtr unused;
|
||||
visit(refresh, unused);
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
ContextPtr context;
|
||||
@ -229,6 +236,12 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
void visit(ASTRefreshStrategy & refresh, ASTPtr &) const
|
||||
{
|
||||
for (auto & table : refresh.children)
|
||||
tryVisit<ASTTableIdentifier>(table);
|
||||
}
|
||||
|
||||
void visitChildren(IAST & ast) const
|
||||
{
|
||||
for (auto & child : ast.children)
|
||||
|
@ -1210,6 +1210,15 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
||||
visitor.visit(*create.select);
|
||||
}
|
||||
|
||||
if (create.refresh_strategy)
|
||||
{
|
||||
/// TODO: This doesn't work for some reason.
|
||||
AddDefaultDatabaseVisitor visitor(getContext(), current_database);
|
||||
visitor.visit(*create.refresh_strategy);
|
||||
|
||||
/// TODO: For DEPENDS ON, check that the specified tables exist.
|
||||
}
|
||||
|
||||
if (create.columns_list)
|
||||
{
|
||||
AddDefaultDatabaseVisitor visitor(getContext(), current_database);
|
||||
|
@ -54,10 +54,12 @@ bool ParserRefreshStrategy::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
|
||||
if (ParserKeyword{"DEPENDS ON"}.ignore(pos, expected))
|
||||
{
|
||||
ASTPtr dependencies;
|
||||
|
||||
auto list_parser = ParserList{
|
||||
std::make_unique<ParserIdentifier>(),
|
||||
std::make_unique<ParserCompoundIdentifier>(
|
||||
/*table_name_with_optional_uuid_*/ true, /*allow_query_parameter_*/ false),
|
||||
std::make_unique<ParserToken>(TokenType::Comma),
|
||||
/* allow_empty= */ false};
|
||||
/*allow_empty*/ false};
|
||||
if (!list_parser.parse(pos, dependencies, expected))
|
||||
return false;
|
||||
refresh->set(refresh->dependencies, dependencies);
|
||||
|
@ -1,58 +0,0 @@
|
||||
#include <Storages/MaterializedView/RefreshAllCombiner.h>
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
RefreshAllCombiner::RefreshAllCombiner()
|
||||
: time_arrived{false}
|
||||
{}
|
||||
|
||||
RefreshAllCombiner::RefreshAllCombiner(const std::vector<StorageID> & parents)
|
||||
: time_arrived{false}
|
||||
{
|
||||
parents_arrived.reserve(parents.size());
|
||||
for (auto && parent : parents)
|
||||
parents_arrived.emplace(parent.uuid, false);
|
||||
}
|
||||
|
||||
bool RefreshAllCombiner::arriveTime()
|
||||
{
|
||||
std::lock_guard lock(combiner_mutex);
|
||||
time_arrived = true;
|
||||
return allArrivedLocked();
|
||||
}
|
||||
|
||||
bool RefreshAllCombiner::arriveParent(const StorageID & id)
|
||||
{
|
||||
std::lock_guard lock(combiner_mutex);
|
||||
parents_arrived[id.uuid] = true;
|
||||
return allArrivedLocked();
|
||||
}
|
||||
|
||||
void RefreshAllCombiner::flush()
|
||||
{
|
||||
std::lock_guard lock(combiner_mutex);
|
||||
flushLocked();
|
||||
}
|
||||
|
||||
bool RefreshAllCombiner::allArrivedLocked()
|
||||
{
|
||||
auto is_value = [](auto && key_value) { return key_value.second; };
|
||||
if (time_arrived && std::ranges::all_of(parents_arrived, is_value))
|
||||
{
|
||||
flushLocked();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void RefreshAllCombiner::flushLocked()
|
||||
{
|
||||
for (auto & [parent, arrived] : parents_arrived)
|
||||
arrived = false;
|
||||
time_arrived = false;
|
||||
}
|
||||
|
||||
}
|
@ -1,34 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/StorageID.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Concurrent primitive for dependency completeness registration
|
||||
/// When arrive methods return true, dependent task must be executed (or scheduled)
|
||||
/// TODO: Doesn't need to be thread safe.
|
||||
class RefreshAllCombiner
|
||||
{
|
||||
public:
|
||||
RefreshAllCombiner();
|
||||
|
||||
explicit RefreshAllCombiner(const std::vector<StorageID> & parents);
|
||||
|
||||
bool arriveTime();
|
||||
|
||||
bool arriveParent(const StorageID & id);
|
||||
|
||||
void flush();
|
||||
|
||||
private:
|
||||
bool allArrivedLocked();
|
||||
|
||||
void flushLocked();
|
||||
|
||||
std::mutex combiner_mutex;
|
||||
std::unordered_map<UUID, bool> parents_arrived;
|
||||
bool time_arrived;
|
||||
};
|
||||
|
||||
}
|
@ -1,60 +0,0 @@
|
||||
#include <Storages/MaterializedView/RefreshDependencies.h>
|
||||
|
||||
#include <Storages/MaterializedView/RefreshTask.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
RefreshDependencies::Entry::Entry(RefreshDependencies & deps, ContainerIter it)
|
||||
: dependencies{&deps}
|
||||
, entry_it{it}
|
||||
{}
|
||||
|
||||
RefreshDependencies::Entry::Entry(Entry && other) noexcept
|
||||
: dependencies(std::exchange(other.dependencies, nullptr))
|
||||
, entry_it(std::move(other.entry_it))
|
||||
{}
|
||||
|
||||
RefreshDependencies::Entry & RefreshDependencies::Entry::operator=(Entry && other) noexcept
|
||||
{
|
||||
if (this == &other)
|
||||
return *this;
|
||||
cleanup(std::exchange(dependencies, std::exchange(other.dependencies, nullptr)));
|
||||
entry_it = std::move(other.entry_it);
|
||||
return *this;
|
||||
}
|
||||
|
||||
RefreshDependencies::Entry::~Entry()
|
||||
{
|
||||
cleanup(dependencies);
|
||||
}
|
||||
|
||||
void RefreshDependencies::Entry::cleanup(RefreshDependencies * deps)
|
||||
{
|
||||
if (deps)
|
||||
deps->erase(entry_it);
|
||||
}
|
||||
|
||||
RefreshDependenciesEntry RefreshDependencies::add(RefreshTaskHolder dependency)
|
||||
{
|
||||
std::lock_guard lock(dependencies_mutex);
|
||||
return Entry(*this, dependencies.emplace(dependencies.end(), dependency));
|
||||
}
|
||||
|
||||
void RefreshDependencies::notifyAll(const StorageID & id)
|
||||
{
|
||||
std::lock_guard lock(dependencies_mutex);
|
||||
for (auto && dep : dependencies)
|
||||
{
|
||||
if (auto task = dep.lock())
|
||||
task->notify(id);
|
||||
}
|
||||
}
|
||||
|
||||
void RefreshDependencies::erase(ContainerIter it)
|
||||
{
|
||||
std::lock_guard lock(dependencies_mutex);
|
||||
dependencies.erase(it);
|
||||
}
|
||||
|
||||
}
|
@ -1,56 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/MaterializedView/RefreshTask_fwd.h>
|
||||
|
||||
#include <Interpreters/StorageID.h>
|
||||
|
||||
#include <list>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class RefreshTask;
|
||||
|
||||
/// Concurrent primitive for managing list of dependent task and notifying them
|
||||
class RefreshDependencies
|
||||
{
|
||||
using Container = std::list<RefreshTaskObserver>;
|
||||
using ContainerIter = typename Container::iterator;
|
||||
|
||||
public:
|
||||
class Entry
|
||||
{
|
||||
friend class RefreshDependencies;
|
||||
|
||||
public:
|
||||
Entry(Entry &&) noexcept;
|
||||
Entry & operator=(Entry &&) noexcept;
|
||||
|
||||
~Entry();
|
||||
|
||||
private:
|
||||
Entry(RefreshDependencies & deps, ContainerIter it);
|
||||
|
||||
void cleanup(RefreshDependencies * deps);
|
||||
|
||||
RefreshDependencies * dependencies;
|
||||
ContainerIter entry_it;
|
||||
};
|
||||
|
||||
RefreshDependencies() = default;
|
||||
|
||||
Entry add(RefreshTaskHolder dependency);
|
||||
|
||||
void notifyAll(const StorageID & id);
|
||||
|
||||
private:
|
||||
void erase(ContainerIter it);
|
||||
|
||||
std::mutex dependencies_mutex;
|
||||
std::list<RefreshTaskObserver> dependencies;
|
||||
};
|
||||
|
||||
using RefreshDependenciesEntry = RefreshDependencies::Entry;
|
||||
|
||||
}
|
@ -3,15 +3,16 @@
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric RefreshingViews;
|
||||
extern const Metric RefreshableViews;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
RefreshSetElement::RefreshSetElement(StorageID id, RefreshTaskHolder task)
|
||||
RefreshSetElement::RefreshSetElement(StorageID id, std::vector<StorageID> deps, RefreshTaskHolder task)
|
||||
: corresponding_task(task)
|
||||
, view_id(std::move(id))
|
||||
, dependencies(std::move(deps))
|
||||
{}
|
||||
|
||||
RefreshInfo RefreshSetElement::getInfo() const
|
||||
@ -36,40 +37,21 @@ RefreshInfo RefreshSetElement::getInfo() const
|
||||
};
|
||||
}
|
||||
|
||||
const StorageID & RefreshSetElement::getID() const
|
||||
{
|
||||
return view_id;
|
||||
}
|
||||
|
||||
RefreshTaskHolder RefreshSetElement::getTask() const
|
||||
{
|
||||
return corresponding_task.lock();
|
||||
}
|
||||
|
||||
bool RefreshSetLess::operator()(const RefreshSetElement & l, const RefreshSetElement & r) const
|
||||
const StorageID & RefreshSetElement::getID() const
|
||||
{
|
||||
return l.getID().uuid < r.getID().uuid;
|
||||
return view_id;
|
||||
}
|
||||
|
||||
bool RefreshSetLess::operator()(const StorageID & l, const RefreshSetElement & r) const
|
||||
const std::vector<StorageID> & RefreshSetElement::getDependencies() const
|
||||
{
|
||||
return l.uuid < r.getID().uuid;
|
||||
return dependencies;
|
||||
}
|
||||
|
||||
bool RefreshSetLess::operator()(const RefreshSetElement & l, const StorageID & r) const
|
||||
{
|
||||
return l.getID().uuid < r.uuid;
|
||||
}
|
||||
|
||||
bool RefreshSetLess::operator()(const StorageID & l, const StorageID & r) const
|
||||
{
|
||||
return l.uuid < r.uuid;
|
||||
}
|
||||
|
||||
RefreshSet::Entry::Entry()
|
||||
: parent_set{nullptr}
|
||||
{}
|
||||
|
||||
RefreshSet::Entry::Entry(Entry && other) noexcept
|
||||
: parent_set{std::exchange(other.parent_set, nullptr)}
|
||||
, iter(std::move(other.iter))
|
||||
@ -80,7 +62,8 @@ RefreshSet::Entry & RefreshSet::Entry::operator=(Entry && other) noexcept
|
||||
{
|
||||
if (this == &other)
|
||||
return *this;
|
||||
cleanup(std::exchange(parent_set, std::exchange(other.parent_set, nullptr)));
|
||||
reset();
|
||||
parent_set = std::exchange(other.parent_set, nullptr);
|
||||
iter = std::move(other.iter);
|
||||
metric_increment = std::move(other.metric_increment);
|
||||
return *this;
|
||||
@ -88,34 +71,51 @@ RefreshSet::Entry & RefreshSet::Entry::operator=(Entry && other) noexcept
|
||||
|
||||
RefreshSet::Entry::~Entry()
|
||||
{
|
||||
cleanup(parent_set);
|
||||
reset();
|
||||
}
|
||||
|
||||
RefreshSet::Entry::Entry(RefreshSet & set, ContainerIter it, const CurrentMetrics::Metric & metric)
|
||||
: parent_set{&set}, iter(std::move(it)), metric_increment(metric)
|
||||
RefreshSet::Entry::Entry(RefreshSet & set, ElementMapIter it)
|
||||
: parent_set{&set}, iter(std::move(it)), metric_increment(CurrentMetrics::RefreshableViews)
|
||||
{}
|
||||
|
||||
void RefreshSet::Entry::cleanup(RefreshSet * set)
|
||||
void RefreshSet::Entry::reset()
|
||||
{
|
||||
if (set)
|
||||
set->erase(iter);
|
||||
if (!parent_set)
|
||||
return;
|
||||
std::exchange(parent_set, nullptr)->erase(iter);
|
||||
metric_increment.reset();
|
||||
}
|
||||
|
||||
RefreshSet::RefreshSet()
|
||||
: set_metric(CurrentMetrics::RefreshingViews)
|
||||
{}
|
||||
RefreshSet::RefreshSet() {}
|
||||
|
||||
RefreshSet::Entry RefreshSet::emplace(StorageID id, std::vector<StorageID> dependencies, RefreshTaskHolder task)
|
||||
{
|
||||
std::lock_guard guard(mutex);
|
||||
auto [it, is_inserted] = elements.emplace(std::piecewise_construct, std::forward_as_tuple(id), std::forward_as_tuple(id, dependencies, std::move(task)));
|
||||
if (!is_inserted)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Refresh set entry already exists for table {}", id.getFullTableName());
|
||||
|
||||
for (const StorageID & dep : dependencies)
|
||||
{
|
||||
auto [unused, dep_inserted] = dependents[dep].insert(id);
|
||||
if (!dep_inserted)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Refresh set entry already contains dependency of {} on {}", id.getFullTableName(), dep.getFullTableName());
|
||||
}
|
||||
|
||||
return Entry(*this, std::move(it));
|
||||
}
|
||||
|
||||
RefreshTaskHolder RefreshSet::getTask(const StorageID & id) const
|
||||
{
|
||||
std::lock_guard lock(elements_mutex);
|
||||
if (auto element = elements.find(id.uuid); element != elements.end())
|
||||
std::lock_guard lock(mutex);
|
||||
if (auto element = elements.find(id); element != elements.end())
|
||||
return element->second.getTask();
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
RefreshSet::InfoContainer RefreshSet::getInfo() const
|
||||
{
|
||||
std::lock_guard lock(elements_mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
InfoContainer res;
|
||||
res.reserve(elements.size());
|
||||
for (auto && element : elements)
|
||||
@ -123,9 +123,29 @@ RefreshSet::InfoContainer RefreshSet::getInfo() const
|
||||
return res;
|
||||
}
|
||||
|
||||
void RefreshSet::erase(ContainerIter it)
|
||||
std::vector<RefreshTaskHolder> RefreshSet::getDependents(const StorageID & id) const
|
||||
{
|
||||
std::lock_guard lock(elements_mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
std::vector<RefreshTaskHolder> res;
|
||||
auto it = dependents.find(id);
|
||||
if (it == dependents.end())
|
||||
return {};
|
||||
for (auto & dep_id : it->second)
|
||||
if (auto element = elements.find(dep_id); element != elements.end())
|
||||
res.push_back(element->second.getTask());
|
||||
return res;
|
||||
}
|
||||
|
||||
void RefreshSet::erase(ElementMapIter it)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
for (const StorageID & dep : it->second.getDependencies())
|
||||
{
|
||||
auto & set = dependents[dep];
|
||||
set.erase(it->second.getID());
|
||||
if (set.empty())
|
||||
dependents.erase(dep);
|
||||
}
|
||||
elements.erase(it);
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,8 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
using DatabaseAndTableNameSet = std::unordered_set<StorageID, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
|
||||
|
||||
struct RefreshInfo
|
||||
{
|
||||
String database;
|
||||
@ -33,7 +35,7 @@ class RefreshSetElement
|
||||
{
|
||||
friend class RefreshTask;
|
||||
public:
|
||||
RefreshSetElement(StorageID id, RefreshTaskHolder task);
|
||||
RefreshSetElement(StorageID id, std::vector<StorageID> deps, RefreshTaskHolder task);
|
||||
|
||||
RefreshSetElement(const RefreshSetElement &) = delete;
|
||||
RefreshSetElement & operator=(const RefreshSetElement &) = delete;
|
||||
@ -41,12 +43,13 @@ public:
|
||||
RefreshInfo getInfo() const;
|
||||
|
||||
RefreshTaskHolder getTask() const;
|
||||
|
||||
const StorageID & getID() const;
|
||||
const std::vector<StorageID> & getDependencies() const;
|
||||
|
||||
private:
|
||||
RefreshTaskObserver corresponding_task;
|
||||
StorageID view_id;
|
||||
std::vector<StorageID> dependencies;
|
||||
|
||||
std::atomic<UInt64> read_rows{0};
|
||||
std::atomic<UInt64> read_bytes{0};
|
||||
@ -63,73 +66,62 @@ private:
|
||||
std::atomic<RefreshTaskStateUnderlying> last_result{0};
|
||||
};
|
||||
|
||||
struct RefreshSetLess
|
||||
{
|
||||
using is_transparent = std::true_type;
|
||||
|
||||
bool operator()(const RefreshSetElement & l, const RefreshSetElement & r) const;
|
||||
bool operator()(const StorageID & l, const RefreshSetElement & r) const;
|
||||
bool operator()(const RefreshSetElement & l, const StorageID & r) const;
|
||||
bool operator()(const StorageID & l, const StorageID & r) const;
|
||||
};
|
||||
|
||||
/// Set of refreshable views
|
||||
class RefreshSet
|
||||
{
|
||||
private:
|
||||
using Container = std::map<UUID, RefreshSetElement>;
|
||||
using ContainerIter = typename Container::iterator;
|
||||
using ElementMap = std::unordered_map<StorageID, RefreshSetElement, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
|
||||
using ElementMapIter = typename ElementMap::iterator;
|
||||
|
||||
public:
|
||||
class Entry
|
||||
{
|
||||
friend class RefreshSet;
|
||||
public:
|
||||
Entry();
|
||||
Entry() = default;
|
||||
|
||||
Entry(Entry &&) noexcept;
|
||||
Entry & operator=(Entry &&) noexcept;
|
||||
|
||||
~Entry();
|
||||
|
||||
RefreshSetElement * operator->() { return &iter->second; }
|
||||
explicit operator bool() const { return parent_set != nullptr; }
|
||||
RefreshSetElement * operator->() { chassert(parent_set); return &iter->second; }
|
||||
|
||||
void reset();
|
||||
|
||||
private:
|
||||
RefreshSet * parent_set;
|
||||
ContainerIter iter;
|
||||
RefreshSet * parent_set = nullptr;
|
||||
ElementMapIter iter;
|
||||
std::optional<CurrentMetrics::Increment> metric_increment;
|
||||
|
||||
Entry(
|
||||
RefreshSet & set,
|
||||
ContainerIter it,
|
||||
const CurrentMetrics::Metric & metric);
|
||||
|
||||
void cleanup(RefreshSet * set);
|
||||
Entry(RefreshSet & set, ElementMapIter it);
|
||||
};
|
||||
|
||||
using InfoContainer = std::vector<RefreshInfo>;
|
||||
|
||||
RefreshSet();
|
||||
|
||||
std::optional<Entry> emplace(StorageID id, RefreshTaskHolder task)
|
||||
{
|
||||
std::lock_guard guard(elements_mutex);
|
||||
auto [it, is_inserted] = elements.emplace(std::piecewise_construct, std::forward_as_tuple(id.uuid), std::forward_as_tuple(id, std::move(task)));
|
||||
if (is_inserted)
|
||||
return Entry(*this, std::move(it), set_metric);
|
||||
return {};
|
||||
}
|
||||
Entry emplace(StorageID id, std::vector<StorageID> dependencies, RefreshTaskHolder task);
|
||||
|
||||
RefreshTaskHolder getTask(const StorageID & id) const;
|
||||
|
||||
InfoContainer getInfo() const;
|
||||
|
||||
private:
|
||||
mutable std::mutex elements_mutex;
|
||||
Container elements;
|
||||
CurrentMetrics::Metric set_metric;
|
||||
/// Get tasks that depend on the given one.
|
||||
std::vector<RefreshTaskHolder> getDependents(const StorageID & id) const;
|
||||
|
||||
void erase(ContainerIter it);
|
||||
private:
|
||||
using DependentsMap = std::unordered_map<StorageID, DatabaseAndTableNameSet, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
|
||||
|
||||
/// Protects the two maps below, not locked for any nontrivial operations (e.g. operations that
|
||||
/// block or lock other mutexes).
|
||||
mutable std::mutex mutex;
|
||||
|
||||
ElementMap elements;
|
||||
DependentsMap dependents;
|
||||
|
||||
void erase(ElementMapIter it);
|
||||
};
|
||||
|
||||
using RefreshSetEntry = RefreshSet::Entry;
|
||||
|
@ -2,20 +2,21 @@
|
||||
|
||||
#include <Storages/StorageMaterializedView.h>
|
||||
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/InterpreterInsertQuery.h>
|
||||
#include <Interpreters/InterpreterDropQuery.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Processors/Executors/ManualPipelineExecutor.h>
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric RefreshingViews;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
@ -27,26 +28,12 @@ std::uniform_int_distribution<Int64> makeSpreadDistribution(const ASTTimePeriod
|
||||
return std::uniform_int_distribution(-limit, limit);
|
||||
}
|
||||
|
||||
std::variant<RefreshEveryTimer, RefreshAfterTimer> makeRefreshTimer(const ASTRefreshStrategy & strategy)
|
||||
{
|
||||
using enum ASTRefreshStrategy::ScheduleKind;
|
||||
switch (strategy.schedule_kind)
|
||||
{
|
||||
case EVERY:
|
||||
return RefreshEveryTimer{*strategy.period, strategy.interval};
|
||||
case AFTER:
|
||||
return RefreshAfterTimer{strategy.interval};
|
||||
default:
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown refresh strategy kind");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
RefreshTask::RefreshTask(
|
||||
const ASTRefreshStrategy & strategy)
|
||||
: log(&Poco::Logger::get("RefreshTask"))
|
||||
, refresh_timer(makeRefreshTimer(strategy))
|
||||
, refresh_timer(strategy)
|
||||
, refresh_spread{makeSpreadDistribution(strategy.spread)}
|
||||
{}
|
||||
|
||||
@ -63,28 +50,13 @@ RefreshTaskHolder RefreshTask::create(
|
||||
if (auto t = self.lock())
|
||||
t->refreshTask();
|
||||
});
|
||||
task->set_entry = context->getRefreshSet().emplace(view.getStorageID(), task).value();
|
||||
|
||||
std::vector<StorageID> deps;
|
||||
if (strategy.dependencies)
|
||||
{
|
||||
if (strategy.schedule_kind != ASTRefreshStrategy::ScheduleKind::AFTER)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Dependencies are allowed only for AFTER refresh kind");
|
||||
|
||||
task->deps_entries.reserve(strategy.dependencies->children.size());
|
||||
for (auto && dependency : strategy.dependencies->children)
|
||||
{
|
||||
StorageID dep_id(dependency->as<const ASTTableIdentifier &>());
|
||||
/// TODO:
|
||||
/// * This depends on the order in which different tables are initialized.
|
||||
/// Is the order guaranteed on startup?
|
||||
/// * At what point does the table name from the query get mapped to the table's UUID?
|
||||
/// Does it work at all? Is it reliable?
|
||||
/// * Don't silently ignore if the table is missing.
|
||||
if (auto dep_task = context->getRefreshSet().getTask(dep_id))
|
||||
task->deps_entries.push_back(dep_task->dependencies.add(task));
|
||||
}
|
||||
deps.emplace_back(dependency->as<const ASTTableIdentifier &>());
|
||||
|
||||
/// TODO: Initialize combiner.
|
||||
}
|
||||
task->set_entry = context->getRefreshSet().emplace(view.getStorageID(), deps, task);
|
||||
|
||||
return task;
|
||||
}
|
||||
@ -93,6 +65,7 @@ void RefreshTask::initializeAndStart(std::shared_ptr<StorageMaterializedView> vi
|
||||
{
|
||||
view_to_refresh = view;
|
||||
/// TODO: Add a setting to stop views on startup, set `stop_requested = true` in that case.
|
||||
populateDependencies();
|
||||
calculateNextRefreshTime(std::chrono::system_clock::now());
|
||||
refresh_task->schedule();
|
||||
}
|
||||
@ -148,14 +121,78 @@ void RefreshTask::resume()
|
||||
refresh_task->schedule();
|
||||
}
|
||||
|
||||
void RefreshTask::notify(const StorageID & parent_id)
|
||||
void RefreshTask::shutdown()
|
||||
{
|
||||
{
|
||||
std::lock_guard guard(mutex);
|
||||
stop_requested = true;
|
||||
interrupt_execution.store(true);
|
||||
}
|
||||
|
||||
/// Wait for the task to return and prevent it from being scheduled in future.
|
||||
refresh_task->deactivate();
|
||||
|
||||
/// Remove from RefreshSet on DROP, without waiting for the IStorage to be destroyed.
|
||||
/// This matters because a table may get dropped and immediately created again with the same name,
|
||||
/// while the old table's IStorage still exists (pinned by ongoing queries).
|
||||
std::lock_guard guard(mutex);
|
||||
set_entry.reset();
|
||||
}
|
||||
|
||||
void RefreshTask::notify(const StorageID & parent_id, std::chrono::system_clock::time_point scheduled_time_without_spread, const RefreshTimer & parent_timer)
|
||||
{
|
||||
std::lock_guard guard(mutex);
|
||||
if (!combiner.arriveParent(parent_id))
|
||||
return;
|
||||
if (std::exchange(refresh_immediately, true))
|
||||
if (!set_entry)
|
||||
return; // we've shut down
|
||||
|
||||
/// In the general case, it's not clear what the meaning of dependencies should be.
|
||||
/// E.g. what behavior would the user want/expect in the following cases?:
|
||||
/// * REFRESH EVERY 3 HOUR depends on REFRESH EVERY 2 HOUR
|
||||
/// * REFRESH AFTER 3 HOUR depends on REFRESH AFTER 2 HOUR
|
||||
/// * REFRESH AFTER 3 HOUR depends on REFRESH EVERY 1 DAY
|
||||
/// I don't know.
|
||||
///
|
||||
/// Cases that are important to support well include:
|
||||
/// (1) REFRESH EVERY 1 DAY depends on REFRESH EVERY 1 DAY
|
||||
/// Here the second refresh should start only after the first refresh completed *for the same day*.
|
||||
/// Yesterday's refresh of the dependency shouldn't trigger today's refresh of the dependent,
|
||||
/// even if it completed today.
|
||||
/// (2) REFRESH EVERY 1 DAY OFFSET 2 HOUR depends on REFRESH EVERY 1 DAY OFFSET 1 HOUR
|
||||
/// (3) REFRESH EVERY 1 DAY OFFSET 1 HOUR depends on REFRESH EVERY 1 DAY OFFSET 23 HOUR
|
||||
/// Here the dependency's refresh on day X should trigger dependent's refresh on day X+1.
|
||||
/// (4) REFRESH EVERY 2 HOUR depends on REFRESH EVERY 1 HOUR
|
||||
/// The 2 HOUR refresh should happen after the 1 HOUR refresh for every other hour, e.g.
|
||||
/// after the 2pm refresh, then after the 4pm refresh, etc.
|
||||
/// (5) REFRESH AFTER 1 HOUR depends on REFRESH AFTER 1 HOUR
|
||||
/// Here the two views should try to synchronize their schedules instead of arbitrarily drifting
|
||||
/// apart. In particular, consider the case where the dependency refreshes slightly faster than
|
||||
/// the dependent. If we don't do anything special, the DEPENDS ON will have pretty much no effect.
|
||||
/// To apply some synchronization pressure, we reduce the dependent's delay by some percentage
|
||||
/// after the dependent completed.
|
||||
/// (6) REFRESH AFTER 1 HOUR depends on REFRESH AFTER 2 HOUR
|
||||
/// REFRESH EVERY 1 HOUR depends on REFRESH EVERY 2 HOUR
|
||||
/// Not sure about these. Currently we just make the dependent refresh at the same rate as
|
||||
/// the dependency, i.e. the 1 HOUR table will actually be refreshed every 2 hours.
|
||||
|
||||
/// Only accept the dependency's refresh if its next refresh time is after ours.
|
||||
/// This takes care of cases (1)-(4), and seems harmless in all other cases.
|
||||
/// Might be mildly helpful in weird cases like REFRESH AFTER 3 HOUR depends on REFRESH AFTER 2 HOUR.
|
||||
if (parent_timer.next(scheduled_time_without_spread) <= next_refresh_without_spread)
|
||||
return;
|
||||
|
||||
if (arriveDependency(parent_id) && !std::exchange(refresh_immediately, true))
|
||||
refresh_task->schedule();
|
||||
|
||||
/// Decrease delay in case (5).
|
||||
/// Maybe we should do it for all AFTER-AFTER dependencies, even if periods are different.
|
||||
if (refresh_timer == parent_timer && refresh_timer.tryGetAfter())
|
||||
{
|
||||
/// TODO: Implement this:
|
||||
/// * Add setting max_after_delay_adjustment_pct
|
||||
/// * Decrease both next_refresh_without_spread and next_refresh_with_spread,
|
||||
/// but only if they haven't already been decreased this way during current period
|
||||
/// * refresh_task->schedule()
|
||||
}
|
||||
}
|
||||
|
||||
void RefreshTask::refreshTask()
|
||||
@ -205,7 +242,7 @@ void RefreshTask::refreshTask()
|
||||
auto now = std::chrono::system_clock::now();
|
||||
if (now >= next_refresh_with_spread)
|
||||
{
|
||||
if (combiner.arriveTime())
|
||||
if (arriveTime())
|
||||
refresh_immediately = true;
|
||||
else
|
||||
{
|
||||
@ -239,6 +276,9 @@ void RefreshTask::refreshTask()
|
||||
|
||||
reportState(RefreshState::Running);
|
||||
|
||||
CurrentMetrics::Increment metric_inc(CurrentMetrics::RefreshingViews);
|
||||
auto scheduled_time_without_spread = next_refresh_without_spread;
|
||||
|
||||
lock.unlock();
|
||||
|
||||
bool finished = false;
|
||||
@ -251,7 +291,7 @@ void RefreshTask::refreshTask()
|
||||
finished = executeRefresh();
|
||||
|
||||
if (finished)
|
||||
completeRefresh(view, LastTaskResult::Finished);
|
||||
completeRefresh(view, LastTaskResult::Finished, scheduled_time_without_spread);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -311,12 +351,17 @@ bool RefreshTask::executeRefresh()
|
||||
return !not_finished;
|
||||
}
|
||||
|
||||
void RefreshTask::completeRefresh(std::shared_ptr<StorageMaterializedView> view, LastTaskResult result)
|
||||
void RefreshTask::completeRefresh(std::shared_ptr<StorageMaterializedView> view, LastTaskResult result, std::chrono::system_clock::time_point scheduled_time_without_spread)
|
||||
{
|
||||
auto stale_table = view->exchangeTargetTable(refresh_query->table_id);
|
||||
dependencies.notifyAll(view->getStorageID());
|
||||
|
||||
auto drop_context = Context::createCopy(view->getContext());
|
||||
auto context = view->getContext();
|
||||
StorageID my_id = set_entry->getID();
|
||||
auto dependents = context->getRefreshSet().getDependents(my_id);
|
||||
for (const RefreshTaskHolder & dep_task : dependents)
|
||||
dep_task->notify(my_id, scheduled_time_without_spread, refresh_timer);
|
||||
|
||||
auto drop_context = Context::createCopy(context);
|
||||
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, drop_context, drop_context, stale_table, /*sync=*/true);
|
||||
|
||||
cleanState();
|
||||
@ -351,16 +396,6 @@ void RefreshTask::cleanState()
|
||||
refresh_query.reset();
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
template <typename... Ts>
|
||||
struct CombinedVisitor : Ts... { using Ts::operator()...; };
|
||||
template <typename... Ts>
|
||||
CombinedVisitor(Ts...) -> CombinedVisitor<Ts...>;
|
||||
|
||||
}
|
||||
|
||||
void RefreshTask::calculateNextRefreshTime(std::chrono::system_clock::time_point now)
|
||||
{
|
||||
/// TODO: Add a setting to randomize initial delay in case of AFTER, for the case when the server
|
||||
@ -368,21 +403,11 @@ void RefreshTask::calculateNextRefreshTime(std::chrono::system_clock::time_point
|
||||
/// TODO: Maybe do something like skip_update_after_seconds and skip_update_after_ratio.
|
||||
/// Unclear if that's useful at all if the last refresh timestamp is not remembered across restarts.
|
||||
|
||||
auto advance = [&](std::chrono::system_clock::time_point t)
|
||||
{
|
||||
CombinedVisitor refresh_time_visitor{
|
||||
[t](const RefreshAfterTimer & timer) { return timer.after(t); },
|
||||
[t](const RefreshEveryTimer & timer) { return timer.next(t); }};
|
||||
auto r = std::visit<std::chrono::sys_seconds>(std::move(refresh_time_visitor), refresh_timer);
|
||||
chassert(r > t);
|
||||
return r;
|
||||
};
|
||||
|
||||
/// It's important to use time without spread here, otherwise we would do multiple refreshes instead
|
||||
/// of one, if the generated spread is negative and the first refresh completes faster than the spread.
|
||||
std::chrono::sys_seconds next = advance(next_refresh_without_spread);
|
||||
std::chrono::sys_seconds next = refresh_timer.next(next_refresh_without_spread);
|
||||
if (next < now)
|
||||
next = advance(now); // fell behind, skip to current time
|
||||
next = refresh_timer.next(now); // fell behind, skip to current time
|
||||
|
||||
next_refresh_without_spread = next;
|
||||
next_refresh_with_spread = next + std::chrono::seconds{refresh_spread(thread_local_rng)};
|
||||
@ -390,6 +415,32 @@ void RefreshTask::calculateNextRefreshTime(std::chrono::system_clock::time_point
|
||||
reportNextRefreshTime(next_refresh_with_spread);
|
||||
}
|
||||
|
||||
bool RefreshTask::arriveDependency(const StorageID & parent_table_or_timer)
|
||||
{
|
||||
remaining_dependencies.erase(parent_table_or_timer);
|
||||
if (!remaining_dependencies.empty() || !time_arrived)
|
||||
return false;
|
||||
populateDependencies();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool RefreshTask::arriveTime()
|
||||
{
|
||||
time_arrived = true;
|
||||
if (!remaining_dependencies.empty() || !time_arrived)
|
||||
return false;
|
||||
populateDependencies();
|
||||
return true;
|
||||
}
|
||||
|
||||
void RefreshTask::populateDependencies()
|
||||
{
|
||||
chassert(remaining_dependencies.empty());
|
||||
auto deps = set_entry->getDependencies();
|
||||
remaining_dependencies.insert(deps.begin(), deps.end());
|
||||
time_arrived = false;
|
||||
}
|
||||
|
||||
std::shared_ptr<StorageMaterializedView> RefreshTask::lockView()
|
||||
{
|
||||
return std::static_pointer_cast<StorageMaterializedView>(view_to_refresh.lock());
|
||||
|
@ -1,7 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/MaterializedView/RefreshAllCombiner.h>
|
||||
#include <Storages/MaterializedView/RefreshDependencies.h>
|
||||
#include <Storages/MaterializedView/RefreshSet.h>
|
||||
#include <Storages/MaterializedView/RefreshTask_fwd.h>
|
||||
#include <Storages/MaterializedView/RefreshTimers.h>
|
||||
@ -70,8 +68,11 @@ public:
|
||||
/// Resume task execution
|
||||
void resume();
|
||||
|
||||
/// Permanently disable task scheduling and remove this table from RefreshSet.
|
||||
void shutdown();
|
||||
|
||||
/// Notify dependent task
|
||||
void notify(const StorageID & parent_id);
|
||||
void notify(const StorageID & parent_id, std::chrono::system_clock::time_point scheduled_time_without_spread, const RefreshTimer & parent_timer);
|
||||
|
||||
private:
|
||||
Poco::Logger * log = nullptr;
|
||||
@ -79,7 +80,7 @@ private:
|
||||
RefreshSet::Entry set_entry;
|
||||
|
||||
/// Refresh schedule
|
||||
std::variant<RefreshEveryTimer, RefreshAfterTimer> refresh_timer;
|
||||
RefreshTimer refresh_timer;
|
||||
std::uniform_int_distribution<Int64> refresh_spread;
|
||||
|
||||
/// Task execution. Non-empty iff a refresh is in progress (possibly paused).
|
||||
@ -88,10 +89,9 @@ private:
|
||||
std::optional<BlockIO> refresh_block;
|
||||
std::shared_ptr<ASTInsertQuery> refresh_query;
|
||||
|
||||
/// Concurrent dependency management
|
||||
RefreshAllCombiner combiner;
|
||||
RefreshDependencies dependencies;
|
||||
std::vector<RefreshDependencies::Entry> deps_entries;
|
||||
/// StorageIDs of our dependencies that we're waiting for.
|
||||
DatabaseAndTableNameSet remaining_dependencies;
|
||||
bool time_arrived = false;
|
||||
|
||||
/// Protects all fields below (they're accessed by both refreshTask() and public methods).
|
||||
/// Never locked for blocking operations (e.g. creating or dropping the internal table).
|
||||
@ -132,13 +132,18 @@ private:
|
||||
/// Methods that do the actual work: creating/dropping internal table, executing the query.
|
||||
void initializeRefresh(std::shared_ptr<const StorageMaterializedView> view);
|
||||
bool executeRefresh();
|
||||
void completeRefresh(std::shared_ptr<StorageMaterializedView> view, LastTaskResult result);
|
||||
void completeRefresh(std::shared_ptr<StorageMaterializedView> view, LastTaskResult result, std::chrono::system_clock::time_point scheduled_time_without_spread);
|
||||
void cancelRefresh(LastTaskResult result);
|
||||
void cleanState();
|
||||
|
||||
/// Assigns next_refresh_*
|
||||
void calculateNextRefreshTime(std::chrono::system_clock::time_point now);
|
||||
|
||||
/// Returns true if all dependencies are fulfilled now. Refills remaining_dependencies in this case.
|
||||
bool arriveDependency(const StorageID & parent_table_or_timer);
|
||||
bool arriveTime();
|
||||
void populateDependencies();
|
||||
|
||||
std::shared_ptr<StorageMaterializedView> lockView();
|
||||
|
||||
/// Methods that push information to RefreshSet, for observability.
|
||||
|
@ -1,10 +1,15 @@
|
||||
#include <Storages/MaterializedView/RefreshTimers.h>
|
||||
|
||||
#include <Parsers/ASTTimeInterval.h>
|
||||
#include <Parsers/ASTRefreshStrategy.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
constexpr std::chrono::days ZERO_DAYS{0};
|
||||
@ -68,6 +73,14 @@ void RefreshAfterTimer::setWithKind(IntervalKind kind, UInt64 val)
|
||||
}
|
||||
}
|
||||
|
||||
bool RefreshAfterTimer::operator==(const RefreshAfterTimer & rhs) const
|
||||
{
|
||||
/// (Or maybe different implementations of standard library have different sizes of chrono types.
|
||||
/// If so, feel free to just remove this assert.)
|
||||
static_assert(sizeof(*this) == 40, "RefreshAfterTimer fields appear to have changed. Please update this operator==() here.");
|
||||
return std::tie(seconds, minutes, hours, days, weeks, months, years) == std::tie(rhs.seconds, rhs.minutes, rhs.hours, rhs.days, rhs.weeks, rhs.months, rhs.years);
|
||||
}
|
||||
|
||||
RefreshEveryTimer::RefreshEveryTimer(const ASTTimePeriod & time_period, const ASTTimeInterval * time_offset)
|
||||
: offset(time_offset)
|
||||
, value{static_cast<UInt32>(time_period.value)}
|
||||
@ -240,4 +253,50 @@ std::chrono::sys_seconds RefreshEveryTimer::alignedToSeconds(std::chrono::system
|
||||
return tp_minutes + next_seconds;
|
||||
}
|
||||
|
||||
bool RefreshEveryTimer::operator==(const RefreshEveryTimer & rhs) const
|
||||
{
|
||||
static_assert(sizeof(*this) == sizeof(offset) + 8, "RefreshEveryTimer fields appear to have changed. Please update this operator==() here.");
|
||||
return std::tie(offset, value, kind) == std::tie(rhs.offset, rhs.value, rhs.kind);
|
||||
}
|
||||
|
||||
std::variant<RefreshEveryTimer, RefreshAfterTimer> makeTimer(const ASTRefreshStrategy & strategy)
|
||||
{
|
||||
using enum ASTRefreshStrategy::ScheduleKind;
|
||||
switch (strategy.schedule_kind)
|
||||
{
|
||||
case EVERY:
|
||||
return RefreshEveryTimer{*strategy.period, strategy.interval};
|
||||
case AFTER:
|
||||
return RefreshAfterTimer{strategy.interval};
|
||||
default:
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown refresh strategy kind");
|
||||
}
|
||||
}
|
||||
|
||||
RefreshTimer::RefreshTimer(const ASTRefreshStrategy & strategy) : timer(makeTimer(strategy)) {}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
template <typename... Ts>
|
||||
struct CombinedVisitor : Ts... { using Ts::operator()...; };
|
||||
template <typename... Ts>
|
||||
CombinedVisitor(Ts...) -> CombinedVisitor<Ts...>;
|
||||
|
||||
}
|
||||
|
||||
std::chrono::sys_seconds RefreshTimer::next(std::chrono::system_clock::time_point tp) const
|
||||
{
|
||||
CombinedVisitor visitor{
|
||||
[tp](const RefreshAfterTimer & timer_) { return timer_.after(tp); },
|
||||
[tp](const RefreshEveryTimer & timer_) { return timer_.next(tp); }};
|
||||
auto r = std::visit<std::chrono::sys_seconds>(std::move(visitor), timer);
|
||||
chassert(r > tp);
|
||||
return r;
|
||||
}
|
||||
|
||||
bool RefreshTimer::operator==(const RefreshTimer & rhs) const { return timer == rhs.timer; }
|
||||
const RefreshAfterTimer * RefreshTimer::tryGetAfter() const { return std::get_if<RefreshAfterTimer>(&timer); }
|
||||
const RefreshEveryTimer * RefreshTimer::tryGetEvery() const { return std::get_if<RefreshEveryTimer>(&timer); }
|
||||
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ namespace DB
|
||||
|
||||
class ASTTimeInterval;
|
||||
class ASTTimePeriod;
|
||||
class ASTRefreshStrategy;
|
||||
|
||||
/// Schedule timer for MATERIALIZED VIEW ... REFRESH AFTER ... queries
|
||||
class RefreshAfterTimer
|
||||
@ -26,6 +27,8 @@ public:
|
||||
std::chrono::months getMonths() const { return months; }
|
||||
std::chrono::years getYears() const { return years; }
|
||||
|
||||
bool operator==(const RefreshAfterTimer & rhs) const;
|
||||
|
||||
private:
|
||||
void setWithKind(IntervalKind kind, UInt64 val);
|
||||
|
||||
@ -46,6 +49,8 @@ public:
|
||||
|
||||
std::chrono::sys_seconds next(std::chrono::system_clock::time_point tp) const;
|
||||
|
||||
bool operator==(const RefreshEveryTimer & rhs) const;
|
||||
|
||||
private:
|
||||
std::chrono::sys_seconds alignedToYears(std::chrono::system_clock::time_point tp) const;
|
||||
|
||||
@ -66,4 +71,18 @@ private:
|
||||
IntervalKind kind{IntervalKind::Second};
|
||||
};
|
||||
|
||||
struct RefreshTimer
|
||||
{
|
||||
std::variant<RefreshEveryTimer, RefreshAfterTimer> timer;
|
||||
|
||||
explicit RefreshTimer(const ASTRefreshStrategy & strategy);
|
||||
|
||||
std::chrono::sys_seconds next(std::chrono::system_clock::time_point tp) const;
|
||||
|
||||
bool operator==(const RefreshTimer & rhs) const;
|
||||
|
||||
const RefreshAfterTimer * tryGetAfter() const;
|
||||
const RefreshEveryTimer * tryGetEvery() const;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -451,7 +451,7 @@ void StorageMaterializedView::startup()
|
||||
void StorageMaterializedView::shutdown(bool)
|
||||
{
|
||||
if (refresher)
|
||||
refresher->stop();
|
||||
refresher->shutdown();
|
||||
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
const auto & select_query = metadata_snapshot->getSelectQuery();
|
||||
|
Loading…
Reference in New Issue
Block a user