diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 87de19b8907..2613e9ec116 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -253,7 +253,8 @@ M(MergeTreeAllRangesAnnouncementsSent, "The current number of announcement being sent in flight from the remote server to the initiator server about the set of data parts (for MergeTree tables). Measured on the remote server side.") \ M(CreatedTimersInQueryProfiler, "Number of Created thread local timers in QueryProfiler") \ M(ActiveTimersInQueryProfiler, "Number of Active thread local timers in QueryProfiler") \ - M(RefreshingViews, "Number of active refreshes") \ + M(RefreshableViews, "Number materialized views with periodic refreshing (REFRESH)") \ + M(RefreshingViews, "Number of materialized views currently executing a refresh") \ #ifdef APPLY_FOR_EXTERNAL_METRICS #define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M) diff --git a/src/Databases/TablesDependencyGraph.h b/src/Databases/TablesDependencyGraph.h index e71d5ecc5fc..50be3bbf969 100644 --- a/src/Databases/TablesDependencyGraph.h +++ b/src/Databases/TablesDependencyGraph.h @@ -60,7 +60,7 @@ public: /// Removes all dependencies of "table_id", returns those dependencies. std::vector removeDependencies(const StorageID & table_id, bool remove_isolated_tables = false); - /// Removes a table from the graph and removes all references to in from the graph (both from its dependencies and dependents). + /// Removes a table from the graph and removes all references to it from the graph (both from its dependencies and dependents). bool removeTable(const StorageID & table_id); /// Removes tables from the graph by a specified filter. diff --git a/src/Interpreters/AddDefaultDatabaseVisitor.h b/src/Interpreters/AddDefaultDatabaseVisitor.h index 27639c4b813..e6354467938 100644 --- a/src/Interpreters/AddDefaultDatabaseVisitor.h +++ b/src/Interpreters/AddDefaultDatabaseVisitor.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -87,6 +88,12 @@ public: visit(child); } + void visit(ASTRefreshStrategy & refresh) const + { + ASTPtr unused; + visit(refresh, unused); + } + private: ContextPtr context; @@ -229,6 +236,12 @@ private: } } + void visit(ASTRefreshStrategy & refresh, ASTPtr &) const + { + for (auto & table : refresh.children) + tryVisit(table); + } + void visitChildren(IAST & ast) const { for (auto & child : ast.children) diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 1eadb325e95..801a46f4167 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -1210,6 +1210,15 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) visitor.visit(*create.select); } + if (create.refresh_strategy) + { + /// TODO: This doesn't work for some reason. + AddDefaultDatabaseVisitor visitor(getContext(), current_database); + visitor.visit(*create.refresh_strategy); + + /// TODO: For DEPENDS ON, check that the specified tables exist. + } + if (create.columns_list) { AddDefaultDatabaseVisitor visitor(getContext(), current_database); diff --git a/src/Parsers/ParserRefreshStrategy.cpp b/src/Parsers/ParserRefreshStrategy.cpp index a6fbb373ed3..05dd081e61d 100644 --- a/src/Parsers/ParserRefreshStrategy.cpp +++ b/src/Parsers/ParserRefreshStrategy.cpp @@ -54,10 +54,12 @@ bool ParserRefreshStrategy::parseImpl(Pos & pos, ASTPtr & node, Expected & expec if (ParserKeyword{"DEPENDS ON"}.ignore(pos, expected)) { ASTPtr dependencies; + auto list_parser = ParserList{ - std::make_unique(), + std::make_unique( + /*table_name_with_optional_uuid_*/ true, /*allow_query_parameter_*/ false), std::make_unique(TokenType::Comma), - /* allow_empty= */ false}; + /*allow_empty*/ false}; if (!list_parser.parse(pos, dependencies, expected)) return false; refresh->set(refresh->dependencies, dependencies); diff --git a/src/Storages/MaterializedView/RefreshAllCombiner.cpp b/src/Storages/MaterializedView/RefreshAllCombiner.cpp deleted file mode 100644 index 5cb06ade9c7..00000000000 --- a/src/Storages/MaterializedView/RefreshAllCombiner.cpp +++ /dev/null @@ -1,58 +0,0 @@ -#include - -#include - -namespace DB -{ - -RefreshAllCombiner::RefreshAllCombiner() - : time_arrived{false} -{} - -RefreshAllCombiner::RefreshAllCombiner(const std::vector & parents) - : time_arrived{false} -{ - parents_arrived.reserve(parents.size()); - for (auto && parent : parents) - parents_arrived.emplace(parent.uuid, false); -} - -bool RefreshAllCombiner::arriveTime() -{ - std::lock_guard lock(combiner_mutex); - time_arrived = true; - return allArrivedLocked(); -} - -bool RefreshAllCombiner::arriveParent(const StorageID & id) -{ - std::lock_guard lock(combiner_mutex); - parents_arrived[id.uuid] = true; - return allArrivedLocked(); -} - -void RefreshAllCombiner::flush() -{ - std::lock_guard lock(combiner_mutex); - flushLocked(); -} - -bool RefreshAllCombiner::allArrivedLocked() -{ - auto is_value = [](auto && key_value) { return key_value.second; }; - if (time_arrived && std::ranges::all_of(parents_arrived, is_value)) - { - flushLocked(); - return true; - } - return false; -} - -void RefreshAllCombiner::flushLocked() -{ - for (auto & [parent, arrived] : parents_arrived) - arrived = false; - time_arrived = false; -} - -} diff --git a/src/Storages/MaterializedView/RefreshAllCombiner.h b/src/Storages/MaterializedView/RefreshAllCombiner.h deleted file mode 100644 index f9f3a8d319c..00000000000 --- a/src/Storages/MaterializedView/RefreshAllCombiner.h +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once - -#include - -namespace DB -{ - -/// Concurrent primitive for dependency completeness registration -/// When arrive methods return true, dependent task must be executed (or scheduled) -/// TODO: Doesn't need to be thread safe. -class RefreshAllCombiner -{ -public: - RefreshAllCombiner(); - - explicit RefreshAllCombiner(const std::vector & parents); - - bool arriveTime(); - - bool arriveParent(const StorageID & id); - - void flush(); - -private: - bool allArrivedLocked(); - - void flushLocked(); - - std::mutex combiner_mutex; - std::unordered_map parents_arrived; - bool time_arrived; -}; - -} diff --git a/src/Storages/MaterializedView/RefreshDependencies.cpp b/src/Storages/MaterializedView/RefreshDependencies.cpp deleted file mode 100644 index f1a834a6b3a..00000000000 --- a/src/Storages/MaterializedView/RefreshDependencies.cpp +++ /dev/null @@ -1,60 +0,0 @@ -#include - -#include - -namespace DB -{ - -RefreshDependencies::Entry::Entry(RefreshDependencies & deps, ContainerIter it) - : dependencies{&deps} - , entry_it{it} -{} - -RefreshDependencies::Entry::Entry(Entry && other) noexcept - : dependencies(std::exchange(other.dependencies, nullptr)) - , entry_it(std::move(other.entry_it)) -{} - -RefreshDependencies::Entry & RefreshDependencies::Entry::operator=(Entry && other) noexcept -{ - if (this == &other) - return *this; - cleanup(std::exchange(dependencies, std::exchange(other.dependencies, nullptr))); - entry_it = std::move(other.entry_it); - return *this; -} - -RefreshDependencies::Entry::~Entry() -{ - cleanup(dependencies); -} - -void RefreshDependencies::Entry::cleanup(RefreshDependencies * deps) -{ - if (deps) - deps->erase(entry_it); -} - -RefreshDependenciesEntry RefreshDependencies::add(RefreshTaskHolder dependency) -{ - std::lock_guard lock(dependencies_mutex); - return Entry(*this, dependencies.emplace(dependencies.end(), dependency)); -} - -void RefreshDependencies::notifyAll(const StorageID & id) -{ - std::lock_guard lock(dependencies_mutex); - for (auto && dep : dependencies) - { - if (auto task = dep.lock()) - task->notify(id); - } -} - -void RefreshDependencies::erase(ContainerIter it) -{ - std::lock_guard lock(dependencies_mutex); - dependencies.erase(it); -} - -} diff --git a/src/Storages/MaterializedView/RefreshDependencies.h b/src/Storages/MaterializedView/RefreshDependencies.h deleted file mode 100644 index 8d370f96d40..00000000000 --- a/src/Storages/MaterializedView/RefreshDependencies.h +++ /dev/null @@ -1,56 +0,0 @@ -#pragma once - -#include - -#include - -#include - - -namespace DB -{ - -class RefreshTask; - -/// Concurrent primitive for managing list of dependent task and notifying them -class RefreshDependencies -{ - using Container = std::list; - using ContainerIter = typename Container::iterator; - -public: - class Entry - { - friend class RefreshDependencies; - - public: - Entry(Entry &&) noexcept; - Entry & operator=(Entry &&) noexcept; - - ~Entry(); - - private: - Entry(RefreshDependencies & deps, ContainerIter it); - - void cleanup(RefreshDependencies * deps); - - RefreshDependencies * dependencies; - ContainerIter entry_it; - }; - - RefreshDependencies() = default; - - Entry add(RefreshTaskHolder dependency); - - void notifyAll(const StorageID & id); - -private: - void erase(ContainerIter it); - - std::mutex dependencies_mutex; - std::list dependencies; -}; - -using RefreshDependenciesEntry = RefreshDependencies::Entry; - -} diff --git a/src/Storages/MaterializedView/RefreshSet.cpp b/src/Storages/MaterializedView/RefreshSet.cpp index 9efd82e1afc..bef628bc42b 100644 --- a/src/Storages/MaterializedView/RefreshSet.cpp +++ b/src/Storages/MaterializedView/RefreshSet.cpp @@ -3,15 +3,16 @@ namespace CurrentMetrics { - extern const Metric RefreshingViews; + extern const Metric RefreshableViews; } namespace DB { -RefreshSetElement::RefreshSetElement(StorageID id, RefreshTaskHolder task) +RefreshSetElement::RefreshSetElement(StorageID id, std::vector deps, RefreshTaskHolder task) : corresponding_task(task) , view_id(std::move(id)) + , dependencies(std::move(deps)) {} RefreshInfo RefreshSetElement::getInfo() const @@ -36,40 +37,21 @@ RefreshInfo RefreshSetElement::getInfo() const }; } -const StorageID & RefreshSetElement::getID() const -{ - return view_id; -} - RefreshTaskHolder RefreshSetElement::getTask() const { return corresponding_task.lock(); } -bool RefreshSetLess::operator()(const RefreshSetElement & l, const RefreshSetElement & r) const +const StorageID & RefreshSetElement::getID() const { - return l.getID().uuid < r.getID().uuid; + return view_id; } -bool RefreshSetLess::operator()(const StorageID & l, const RefreshSetElement & r) const +const std::vector & RefreshSetElement::getDependencies() const { - return l.uuid < r.getID().uuid; + return dependencies; } -bool RefreshSetLess::operator()(const RefreshSetElement & l, const StorageID & r) const -{ - return l.getID().uuid < r.uuid; -} - -bool RefreshSetLess::operator()(const StorageID & l, const StorageID & r) const -{ - return l.uuid < r.uuid; -} - -RefreshSet::Entry::Entry() - : parent_set{nullptr} -{} - RefreshSet::Entry::Entry(Entry && other) noexcept : parent_set{std::exchange(other.parent_set, nullptr)} , iter(std::move(other.iter)) @@ -80,7 +62,8 @@ RefreshSet::Entry & RefreshSet::Entry::operator=(Entry && other) noexcept { if (this == &other) return *this; - cleanup(std::exchange(parent_set, std::exchange(other.parent_set, nullptr))); + reset(); + parent_set = std::exchange(other.parent_set, nullptr); iter = std::move(other.iter); metric_increment = std::move(other.metric_increment); return *this; @@ -88,34 +71,51 @@ RefreshSet::Entry & RefreshSet::Entry::operator=(Entry && other) noexcept RefreshSet::Entry::~Entry() { - cleanup(parent_set); + reset(); } -RefreshSet::Entry::Entry(RefreshSet & set, ContainerIter it, const CurrentMetrics::Metric & metric) - : parent_set{&set}, iter(std::move(it)), metric_increment(metric) +RefreshSet::Entry::Entry(RefreshSet & set, ElementMapIter it) + : parent_set{&set}, iter(std::move(it)), metric_increment(CurrentMetrics::RefreshableViews) {} -void RefreshSet::Entry::cleanup(RefreshSet * set) +void RefreshSet::Entry::reset() { - if (set) - set->erase(iter); + if (!parent_set) + return; + std::exchange(parent_set, nullptr)->erase(iter); + metric_increment.reset(); } -RefreshSet::RefreshSet() - : set_metric(CurrentMetrics::RefreshingViews) -{} +RefreshSet::RefreshSet() {} + +RefreshSet::Entry RefreshSet::emplace(StorageID id, std::vector dependencies, RefreshTaskHolder task) +{ + std::lock_guard guard(mutex); + auto [it, is_inserted] = elements.emplace(std::piecewise_construct, std::forward_as_tuple(id), std::forward_as_tuple(id, dependencies, std::move(task))); + if (!is_inserted) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Refresh set entry already exists for table {}", id.getFullTableName()); + + for (const StorageID & dep : dependencies) + { + auto [unused, dep_inserted] = dependents[dep].insert(id); + if (!dep_inserted) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Refresh set entry already contains dependency of {} on {}", id.getFullTableName(), dep.getFullTableName()); + } + + return Entry(*this, std::move(it)); +} RefreshTaskHolder RefreshSet::getTask(const StorageID & id) const { - std::lock_guard lock(elements_mutex); - if (auto element = elements.find(id.uuid); element != elements.end()) + std::lock_guard lock(mutex); + if (auto element = elements.find(id); element != elements.end()) return element->second.getTask(); return nullptr; } RefreshSet::InfoContainer RefreshSet::getInfo() const { - std::lock_guard lock(elements_mutex); + std::lock_guard lock(mutex); InfoContainer res; res.reserve(elements.size()); for (auto && element : elements) @@ -123,9 +123,29 @@ RefreshSet::InfoContainer RefreshSet::getInfo() const return res; } -void RefreshSet::erase(ContainerIter it) +std::vector RefreshSet::getDependents(const StorageID & id) const { - std::lock_guard lock(elements_mutex); + std::lock_guard lock(mutex); + std::vector res; + auto it = dependents.find(id); + if (it == dependents.end()) + return {}; + for (auto & dep_id : it->second) + if (auto element = elements.find(dep_id); element != elements.end()) + res.push_back(element->second.getTask()); + return res; +} + +void RefreshSet::erase(ElementMapIter it) +{ + std::lock_guard lock(mutex); + for (const StorageID & dep : it->second.getDependencies()) + { + auto & set = dependents[dep]; + set.erase(it->second.getID()); + if (set.empty()) + dependents.erase(dep); + } elements.erase(it); } diff --git a/src/Storages/MaterializedView/RefreshSet.h b/src/Storages/MaterializedView/RefreshSet.h index cc5b0006218..b2685d67883 100644 --- a/src/Storages/MaterializedView/RefreshSet.h +++ b/src/Storages/MaterializedView/RefreshSet.h @@ -9,6 +9,8 @@ namespace DB { +using DatabaseAndTableNameSet = std::unordered_set; + struct RefreshInfo { String database; @@ -33,7 +35,7 @@ class RefreshSetElement { friend class RefreshTask; public: - RefreshSetElement(StorageID id, RefreshTaskHolder task); + RefreshSetElement(StorageID id, std::vector deps, RefreshTaskHolder task); RefreshSetElement(const RefreshSetElement &) = delete; RefreshSetElement & operator=(const RefreshSetElement &) = delete; @@ -41,12 +43,13 @@ public: RefreshInfo getInfo() const; RefreshTaskHolder getTask() const; - const StorageID & getID() const; + const std::vector & getDependencies() const; private: RefreshTaskObserver corresponding_task; StorageID view_id; + std::vector dependencies; std::atomic read_rows{0}; std::atomic read_bytes{0}; @@ -63,73 +66,62 @@ private: std::atomic last_result{0}; }; -struct RefreshSetLess -{ - using is_transparent = std::true_type; - - bool operator()(const RefreshSetElement & l, const RefreshSetElement & r) const; - bool operator()(const StorageID & l, const RefreshSetElement & r) const; - bool operator()(const RefreshSetElement & l, const StorageID & r) const; - bool operator()(const StorageID & l, const StorageID & r) const; -}; - /// Set of refreshable views class RefreshSet { private: - using Container = std::map; - using ContainerIter = typename Container::iterator; + using ElementMap = std::unordered_map; + using ElementMapIter = typename ElementMap::iterator; public: class Entry { friend class RefreshSet; public: - Entry(); + Entry() = default; Entry(Entry &&) noexcept; Entry & operator=(Entry &&) noexcept; ~Entry(); - RefreshSetElement * operator->() { return &iter->second; } + explicit operator bool() const { return parent_set != nullptr; } + RefreshSetElement * operator->() { chassert(parent_set); return &iter->second; } + + void reset(); private: - RefreshSet * parent_set; - ContainerIter iter; + RefreshSet * parent_set = nullptr; + ElementMapIter iter; std::optional metric_increment; - Entry( - RefreshSet & set, - ContainerIter it, - const CurrentMetrics::Metric & metric); - - void cleanup(RefreshSet * set); + Entry(RefreshSet & set, ElementMapIter it); }; using InfoContainer = std::vector; RefreshSet(); - std::optional emplace(StorageID id, RefreshTaskHolder task) - { - std::lock_guard guard(elements_mutex); - auto [it, is_inserted] = elements.emplace(std::piecewise_construct, std::forward_as_tuple(id.uuid), std::forward_as_tuple(id, std::move(task))); - if (is_inserted) - return Entry(*this, std::move(it), set_metric); - return {}; - } + Entry emplace(StorageID id, std::vector dependencies, RefreshTaskHolder task); RefreshTaskHolder getTask(const StorageID & id) const; InfoContainer getInfo() const; -private: - mutable std::mutex elements_mutex; - Container elements; - CurrentMetrics::Metric set_metric; + /// Get tasks that depend on the given one. + std::vector getDependents(const StorageID & id) const; - void erase(ContainerIter it); +private: + using DependentsMap = std::unordered_map; + + /// Protects the two maps below, not locked for any nontrivial operations (e.g. operations that + /// block or lock other mutexes). + mutable std::mutex mutex; + + ElementMap elements; + DependentsMap dependents; + + void erase(ElementMapIter it); }; using RefreshSetEntry = RefreshSet::Entry; diff --git a/src/Storages/MaterializedView/RefreshTask.cpp b/src/Storages/MaterializedView/RefreshTask.cpp index 0a85f533a27..000ee7aa1bd 100644 --- a/src/Storages/MaterializedView/RefreshTask.cpp +++ b/src/Storages/MaterializedView/RefreshTask.cpp @@ -2,20 +2,21 @@ #include +#include #include #include #include #include #include +namespace CurrentMetrics +{ + extern const Metric RefreshingViews; +} + namespace DB { -namespace ErrorCodes -{ - extern const int BAD_ARGUMENTS; -} - namespace { @@ -27,26 +28,12 @@ std::uniform_int_distribution makeSpreadDistribution(const ASTTimePeriod return std::uniform_int_distribution(-limit, limit); } -std::variant makeRefreshTimer(const ASTRefreshStrategy & strategy) -{ - using enum ASTRefreshStrategy::ScheduleKind; - switch (strategy.schedule_kind) - { - case EVERY: - return RefreshEveryTimer{*strategy.period, strategy.interval}; - case AFTER: - return RefreshAfterTimer{strategy.interval}; - default: - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown refresh strategy kind"); - } -} - } RefreshTask::RefreshTask( const ASTRefreshStrategy & strategy) : log(&Poco::Logger::get("RefreshTask")) - , refresh_timer(makeRefreshTimer(strategy)) + , refresh_timer(strategy) , refresh_spread{makeSpreadDistribution(strategy.spread)} {} @@ -63,28 +50,13 @@ RefreshTaskHolder RefreshTask::create( if (auto t = self.lock()) t->refreshTask(); }); - task->set_entry = context->getRefreshSet().emplace(view.getStorageID(), task).value(); + + std::vector deps; if (strategy.dependencies) - { - if (strategy.schedule_kind != ASTRefreshStrategy::ScheduleKind::AFTER) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Dependencies are allowed only for AFTER refresh kind"); - - task->deps_entries.reserve(strategy.dependencies->children.size()); for (auto && dependency : strategy.dependencies->children) - { - StorageID dep_id(dependency->as()); - /// TODO: - /// * This depends on the order in which different tables are initialized. - /// Is the order guaranteed on startup? - /// * At what point does the table name from the query get mapped to the table's UUID? - /// Does it work at all? Is it reliable? - /// * Don't silently ignore if the table is missing. - if (auto dep_task = context->getRefreshSet().getTask(dep_id)) - task->deps_entries.push_back(dep_task->dependencies.add(task)); - } + deps.emplace_back(dependency->as()); - /// TODO: Initialize combiner. - } + task->set_entry = context->getRefreshSet().emplace(view.getStorageID(), deps, task); return task; } @@ -93,6 +65,7 @@ void RefreshTask::initializeAndStart(std::shared_ptr vi { view_to_refresh = view; /// TODO: Add a setting to stop views on startup, set `stop_requested = true` in that case. + populateDependencies(); calculateNextRefreshTime(std::chrono::system_clock::now()); refresh_task->schedule(); } @@ -148,14 +121,78 @@ void RefreshTask::resume() refresh_task->schedule(); } -void RefreshTask::notify(const StorageID & parent_id) +void RefreshTask::shutdown() +{ + { + std::lock_guard guard(mutex); + stop_requested = true; + interrupt_execution.store(true); + } + + /// Wait for the task to return and prevent it from being scheduled in future. + refresh_task->deactivate(); + + /// Remove from RefreshSet on DROP, without waiting for the IStorage to be destroyed. + /// This matters because a table may get dropped and immediately created again with the same name, + /// while the old table's IStorage still exists (pinned by ongoing queries). + std::lock_guard guard(mutex); + set_entry.reset(); +} + +void RefreshTask::notify(const StorageID & parent_id, std::chrono::system_clock::time_point scheduled_time_without_spread, const RefreshTimer & parent_timer) { std::lock_guard guard(mutex); - if (!combiner.arriveParent(parent_id)) + if (!set_entry) + return; // we've shut down + + /// In the general case, it's not clear what the meaning of dependencies should be. + /// E.g. what behavior would the user want/expect in the following cases?: + /// * REFRESH EVERY 3 HOUR depends on REFRESH EVERY 2 HOUR + /// * REFRESH AFTER 3 HOUR depends on REFRESH AFTER 2 HOUR + /// * REFRESH AFTER 3 HOUR depends on REFRESH EVERY 1 DAY + /// I don't know. + /// + /// Cases that are important to support well include: + /// (1) REFRESH EVERY 1 DAY depends on REFRESH EVERY 1 DAY + /// Here the second refresh should start only after the first refresh completed *for the same day*. + /// Yesterday's refresh of the dependency shouldn't trigger today's refresh of the dependent, + /// even if it completed today. + /// (2) REFRESH EVERY 1 DAY OFFSET 2 HOUR depends on REFRESH EVERY 1 DAY OFFSET 1 HOUR + /// (3) REFRESH EVERY 1 DAY OFFSET 1 HOUR depends on REFRESH EVERY 1 DAY OFFSET 23 HOUR + /// Here the dependency's refresh on day X should trigger dependent's refresh on day X+1. + /// (4) REFRESH EVERY 2 HOUR depends on REFRESH EVERY 1 HOUR + /// The 2 HOUR refresh should happen after the 1 HOUR refresh for every other hour, e.g. + /// after the 2pm refresh, then after the 4pm refresh, etc. + /// (5) REFRESH AFTER 1 HOUR depends on REFRESH AFTER 1 HOUR + /// Here the two views should try to synchronize their schedules instead of arbitrarily drifting + /// apart. In particular, consider the case where the dependency refreshes slightly faster than + /// the dependent. If we don't do anything special, the DEPENDS ON will have pretty much no effect. + /// To apply some synchronization pressure, we reduce the dependent's delay by some percentage + /// after the dependent completed. + /// (6) REFRESH AFTER 1 HOUR depends on REFRESH AFTER 2 HOUR + /// REFRESH EVERY 1 HOUR depends on REFRESH EVERY 2 HOUR + /// Not sure about these. Currently we just make the dependent refresh at the same rate as + /// the dependency, i.e. the 1 HOUR table will actually be refreshed every 2 hours. + + /// Only accept the dependency's refresh if its next refresh time is after ours. + /// This takes care of cases (1)-(4), and seems harmless in all other cases. + /// Might be mildly helpful in weird cases like REFRESH AFTER 3 HOUR depends on REFRESH AFTER 2 HOUR. + if (parent_timer.next(scheduled_time_without_spread) <= next_refresh_without_spread) return; - if (std::exchange(refresh_immediately, true)) - return; - refresh_task->schedule(); + + if (arriveDependency(parent_id) && !std::exchange(refresh_immediately, true)) + refresh_task->schedule(); + + /// Decrease delay in case (5). + /// Maybe we should do it for all AFTER-AFTER dependencies, even if periods are different. + if (refresh_timer == parent_timer && refresh_timer.tryGetAfter()) + { + /// TODO: Implement this: + /// * Add setting max_after_delay_adjustment_pct + /// * Decrease both next_refresh_without_spread and next_refresh_with_spread, + /// but only if they haven't already been decreased this way during current period + /// * refresh_task->schedule() + } } void RefreshTask::refreshTask() @@ -205,7 +242,7 @@ void RefreshTask::refreshTask() auto now = std::chrono::system_clock::now(); if (now >= next_refresh_with_spread) { - if (combiner.arriveTime()) + if (arriveTime()) refresh_immediately = true; else { @@ -239,6 +276,9 @@ void RefreshTask::refreshTask() reportState(RefreshState::Running); + CurrentMetrics::Increment metric_inc(CurrentMetrics::RefreshingViews); + auto scheduled_time_without_spread = next_refresh_without_spread; + lock.unlock(); bool finished = false; @@ -251,7 +291,7 @@ void RefreshTask::refreshTask() finished = executeRefresh(); if (finished) - completeRefresh(view, LastTaskResult::Finished); + completeRefresh(view, LastTaskResult::Finished, scheduled_time_without_spread); } catch (...) { @@ -311,12 +351,17 @@ bool RefreshTask::executeRefresh() return !not_finished; } -void RefreshTask::completeRefresh(std::shared_ptr view, LastTaskResult result) +void RefreshTask::completeRefresh(std::shared_ptr view, LastTaskResult result, std::chrono::system_clock::time_point scheduled_time_without_spread) { auto stale_table = view->exchangeTargetTable(refresh_query->table_id); - dependencies.notifyAll(view->getStorageID()); - auto drop_context = Context::createCopy(view->getContext()); + auto context = view->getContext(); + StorageID my_id = set_entry->getID(); + auto dependents = context->getRefreshSet().getDependents(my_id); + for (const RefreshTaskHolder & dep_task : dependents) + dep_task->notify(my_id, scheduled_time_without_spread, refresh_timer); + + auto drop_context = Context::createCopy(context); InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, drop_context, drop_context, stale_table, /*sync=*/true); cleanState(); @@ -351,16 +396,6 @@ void RefreshTask::cleanState() refresh_query.reset(); } -namespace -{ - -template -struct CombinedVisitor : Ts... { using Ts::operator()...; }; -template -CombinedVisitor(Ts...) -> CombinedVisitor; - -} - void RefreshTask::calculateNextRefreshTime(std::chrono::system_clock::time_point now) { /// TODO: Add a setting to randomize initial delay in case of AFTER, for the case when the server @@ -368,21 +403,11 @@ void RefreshTask::calculateNextRefreshTime(std::chrono::system_clock::time_point /// TODO: Maybe do something like skip_update_after_seconds and skip_update_after_ratio. /// Unclear if that's useful at all if the last refresh timestamp is not remembered across restarts. - auto advance = [&](std::chrono::system_clock::time_point t) - { - CombinedVisitor refresh_time_visitor{ - [t](const RefreshAfterTimer & timer) { return timer.after(t); }, - [t](const RefreshEveryTimer & timer) { return timer.next(t); }}; - auto r = std::visit(std::move(refresh_time_visitor), refresh_timer); - chassert(r > t); - return r; - }; - /// It's important to use time without spread here, otherwise we would do multiple refreshes instead /// of one, if the generated spread is negative and the first refresh completes faster than the spread. - std::chrono::sys_seconds next = advance(next_refresh_without_spread); + std::chrono::sys_seconds next = refresh_timer.next(next_refresh_without_spread); if (next < now) - next = advance(now); // fell behind, skip to current time + next = refresh_timer.next(now); // fell behind, skip to current time next_refresh_without_spread = next; next_refresh_with_spread = next + std::chrono::seconds{refresh_spread(thread_local_rng)}; @@ -390,6 +415,32 @@ void RefreshTask::calculateNextRefreshTime(std::chrono::system_clock::time_point reportNextRefreshTime(next_refresh_with_spread); } +bool RefreshTask::arriveDependency(const StorageID & parent_table_or_timer) +{ + remaining_dependencies.erase(parent_table_or_timer); + if (!remaining_dependencies.empty() || !time_arrived) + return false; + populateDependencies(); + return true; +} + +bool RefreshTask::arriveTime() +{ + time_arrived = true; + if (!remaining_dependencies.empty() || !time_arrived) + return false; + populateDependencies(); + return true; +} + +void RefreshTask::populateDependencies() +{ + chassert(remaining_dependencies.empty()); + auto deps = set_entry->getDependencies(); + remaining_dependencies.insert(deps.begin(), deps.end()); + time_arrived = false; +} + std::shared_ptr RefreshTask::lockView() { return std::static_pointer_cast(view_to_refresh.lock()); diff --git a/src/Storages/MaterializedView/RefreshTask.h b/src/Storages/MaterializedView/RefreshTask.h index 192a4776be0..cdb0d22342e 100644 --- a/src/Storages/MaterializedView/RefreshTask.h +++ b/src/Storages/MaterializedView/RefreshTask.h @@ -1,7 +1,5 @@ #pragma once -#include -#include #include #include #include @@ -70,8 +68,11 @@ public: /// Resume task execution void resume(); + /// Permanently disable task scheduling and remove this table from RefreshSet. + void shutdown(); + /// Notify dependent task - void notify(const StorageID & parent_id); + void notify(const StorageID & parent_id, std::chrono::system_clock::time_point scheduled_time_without_spread, const RefreshTimer & parent_timer); private: Poco::Logger * log = nullptr; @@ -79,7 +80,7 @@ private: RefreshSet::Entry set_entry; /// Refresh schedule - std::variant refresh_timer; + RefreshTimer refresh_timer; std::uniform_int_distribution refresh_spread; /// Task execution. Non-empty iff a refresh is in progress (possibly paused). @@ -88,10 +89,9 @@ private: std::optional refresh_block; std::shared_ptr refresh_query; - /// Concurrent dependency management - RefreshAllCombiner combiner; - RefreshDependencies dependencies; - std::vector deps_entries; + /// StorageIDs of our dependencies that we're waiting for. + DatabaseAndTableNameSet remaining_dependencies; + bool time_arrived = false; /// Protects all fields below (they're accessed by both refreshTask() and public methods). /// Never locked for blocking operations (e.g. creating or dropping the internal table). @@ -132,13 +132,18 @@ private: /// Methods that do the actual work: creating/dropping internal table, executing the query. void initializeRefresh(std::shared_ptr view); bool executeRefresh(); - void completeRefresh(std::shared_ptr view, LastTaskResult result); + void completeRefresh(std::shared_ptr view, LastTaskResult result, std::chrono::system_clock::time_point scheduled_time_without_spread); void cancelRefresh(LastTaskResult result); void cleanState(); /// Assigns next_refresh_* void calculateNextRefreshTime(std::chrono::system_clock::time_point now); + /// Returns true if all dependencies are fulfilled now. Refills remaining_dependencies in this case. + bool arriveDependency(const StorageID & parent_table_or_timer); + bool arriveTime(); + void populateDependencies(); + std::shared_ptr lockView(); /// Methods that push information to RefreshSet, for observability. diff --git a/src/Storages/MaterializedView/RefreshTimers.cpp b/src/Storages/MaterializedView/RefreshTimers.cpp index ebef561fc29..0331bad82c3 100644 --- a/src/Storages/MaterializedView/RefreshTimers.cpp +++ b/src/Storages/MaterializedView/RefreshTimers.cpp @@ -1,10 +1,15 @@ #include -#include +#include namespace DB { +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + namespace { constexpr std::chrono::days ZERO_DAYS{0}; @@ -68,6 +73,14 @@ void RefreshAfterTimer::setWithKind(IntervalKind kind, UInt64 val) } } +bool RefreshAfterTimer::operator==(const RefreshAfterTimer & rhs) const +{ + /// (Or maybe different implementations of standard library have different sizes of chrono types. + /// If so, feel free to just remove this assert.) + static_assert(sizeof(*this) == 40, "RefreshAfterTimer fields appear to have changed. Please update this operator==() here."); + return std::tie(seconds, minutes, hours, days, weeks, months, years) == std::tie(rhs.seconds, rhs.minutes, rhs.hours, rhs.days, rhs.weeks, rhs.months, rhs.years); +} + RefreshEveryTimer::RefreshEveryTimer(const ASTTimePeriod & time_period, const ASTTimeInterval * time_offset) : offset(time_offset) , value{static_cast(time_period.value)} @@ -240,4 +253,50 @@ std::chrono::sys_seconds RefreshEveryTimer::alignedToSeconds(std::chrono::system return tp_minutes + next_seconds; } +bool RefreshEveryTimer::operator==(const RefreshEveryTimer & rhs) const +{ + static_assert(sizeof(*this) == sizeof(offset) + 8, "RefreshEveryTimer fields appear to have changed. Please update this operator==() here."); + return std::tie(offset, value, kind) == std::tie(rhs.offset, rhs.value, rhs.kind); +} + +std::variant makeTimer(const ASTRefreshStrategy & strategy) +{ + using enum ASTRefreshStrategy::ScheduleKind; + switch (strategy.schedule_kind) + { + case EVERY: + return RefreshEveryTimer{*strategy.period, strategy.interval}; + case AFTER: + return RefreshAfterTimer{strategy.interval}; + default: + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown refresh strategy kind"); + } +} + +RefreshTimer::RefreshTimer(const ASTRefreshStrategy & strategy) : timer(makeTimer(strategy)) {} + +namespace +{ + +template +struct CombinedVisitor : Ts... { using Ts::operator()...; }; +template +CombinedVisitor(Ts...) -> CombinedVisitor; + +} + +std::chrono::sys_seconds RefreshTimer::next(std::chrono::system_clock::time_point tp) const +{ + CombinedVisitor visitor{ + [tp](const RefreshAfterTimer & timer_) { return timer_.after(tp); }, + [tp](const RefreshEveryTimer & timer_) { return timer_.next(tp); }}; + auto r = std::visit(std::move(visitor), timer); + chassert(r > tp); + return r; +} + +bool RefreshTimer::operator==(const RefreshTimer & rhs) const { return timer == rhs.timer; } +const RefreshAfterTimer * RefreshTimer::tryGetAfter() const { return std::get_if(&timer); } +const RefreshEveryTimer * RefreshTimer::tryGetEvery() const { return std::get_if(&timer); } + } diff --git a/src/Storages/MaterializedView/RefreshTimers.h b/src/Storages/MaterializedView/RefreshTimers.h index 0672782a3f9..4625e8cd344 100644 --- a/src/Storages/MaterializedView/RefreshTimers.h +++ b/src/Storages/MaterializedView/RefreshTimers.h @@ -9,6 +9,7 @@ namespace DB class ASTTimeInterval; class ASTTimePeriod; +class ASTRefreshStrategy; /// Schedule timer for MATERIALIZED VIEW ... REFRESH AFTER ... queries class RefreshAfterTimer @@ -26,6 +27,8 @@ public: std::chrono::months getMonths() const { return months; } std::chrono::years getYears() const { return years; } + bool operator==(const RefreshAfterTimer & rhs) const; + private: void setWithKind(IntervalKind kind, UInt64 val); @@ -46,6 +49,8 @@ public: std::chrono::sys_seconds next(std::chrono::system_clock::time_point tp) const; + bool operator==(const RefreshEveryTimer & rhs) const; + private: std::chrono::sys_seconds alignedToYears(std::chrono::system_clock::time_point tp) const; @@ -66,4 +71,18 @@ private: IntervalKind kind{IntervalKind::Second}; }; +struct RefreshTimer +{ + std::variant timer; + + explicit RefreshTimer(const ASTRefreshStrategy & strategy); + + std::chrono::sys_seconds next(std::chrono::system_clock::time_point tp) const; + + bool operator==(const RefreshTimer & rhs) const; + + const RefreshAfterTimer * tryGetAfter() const; + const RefreshEveryTimer * tryGetEvery() const; +}; + } diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index 4f2ffb38017..6504bfa313b 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -451,7 +451,7 @@ void StorageMaterializedView::startup() void StorageMaterializedView::shutdown(bool) { if (refresher) - refresher->stop(); + refresher->shutdown(); auto metadata_snapshot = getInMemoryMetadataPtr(); const auto & select_query = metadata_snapshot->getSelectQuery();