refreshable materialized views

This commit is contained in:
koloshmet 2023-02-12 21:17:55 +02:00 committed by Michael Kolupaev
parent a9ac8dfb74
commit c52aa984ee
42 changed files with 2065 additions and 15 deletions

View File

@ -177,6 +177,7 @@ enum class AccessType
M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \
M(SYSTEM_CLEANUP, "SYSTEM STOP CLEANUP, SYSTEM START CLEANUP", TABLE, SYSTEM) \
M(SYSTEM_VIEWS, "SYSTEM REFRESH VIEW, START VIEWS, STOP VIEWS, CANCEL VIEW, PAUSE VIEW, RESUME VIEW", VIEW, SYSTEM) \
M(SYSTEM_DISTRIBUTED_SENDS, "SYSTEM STOP DISTRIBUTED SENDS, SYSTEM START DISTRIBUTED SENDS, STOP DISTRIBUTED SENDS, START DISTRIBUTED SENDS", TABLE, SYSTEM_SENDS) \
M(SYSTEM_REPLICATED_SENDS, "SYSTEM STOP REPLICATED SENDS, SYSTEM START REPLICATED SENDS, STOP REPLICATED SENDS, START REPLICATED SENDS", TABLE, SYSTEM_SENDS) \
M(SYSTEM_SENDS, "SYSTEM STOP SENDS, SYSTEM START SENDS, STOP SENDS, START SENDS", GROUP, SYSTEM) \

View File

@ -226,6 +226,7 @@ add_object_library(clickhouse_storages_statistics Storages/Statistics)
add_object_library(clickhouse_storages_liveview Storages/LiveView)
add_object_library(clickhouse_storages_windowview Storages/WindowView)
add_object_library(clickhouse_storages_s3queue Storages/S3Queue)
add_object_library(clickhouse_storages_materializedview Storages/MaterializedView)
add_object_library(clickhouse_client Client)
add_object_library(clickhouse_bridge BridgeHelper)
add_object_library(clickhouse_server Server)

View File

@ -253,6 +253,7 @@
M(MergeTreeAllRangesAnnouncementsSent, "The current number of announcement being sent in flight from the remote server to the initiator server about the set of data parts (for MergeTree tables). Measured on the remote server side.") \
M(CreatedTimersInQueryProfiler, "Number of Created thread local timers in QueryProfiler") \
M(ActiveTimersInQueryProfiler, "Number of Active thread local timers in QueryProfiler") \
M(Refresh, "Number of active refreshes") \
#ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)

View File

@ -71,6 +71,8 @@ struct IntervalKind
/// Returns false if the conversion did not succeed.
/// For example, `IntervalKind::tryParseString('second', result)` returns `result` equals `IntervalKind::Kind::Second`.
static bool tryParseString(const std::string & kind, IntervalKind::Kind & result);
auto operator<=>(const IntervalKind & other) const { return kind <=> other.kind; }
};
/// NOLINTNEXTLINE

View File

@ -18,6 +18,7 @@ namespace ActionLocks
extern const StorageActionBlockType PartsMove = 7;
extern const StorageActionBlockType PullReplicationLog = 8;
extern const StorageActionBlockType Cleanup = 9;
extern const StorageActionBlockType ViewRefresh = 8;
}

View File

@ -95,6 +95,7 @@
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Storages/MaterializedView/RefreshSet.h>
#include <Interpreters/SynonymsExtensions.h>
#include <Interpreters/Lemmatizers.h>
#include <Interpreters/ClusterDiscovery.h>
@ -289,6 +290,7 @@ struct ContextSharedPart : boost::noncopyable
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
ReplicatedFetchList replicated_fetch_list;
RefreshSet refresh_set; /// The list of active refreshes (for MaterializedView)
ConfigurationPtr users_config TSA_GUARDED_BY(mutex); /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
@ -825,6 +827,8 @@ MovesList & Context::getMovesList() { return shared->moves_list; }
const MovesList & Context::getMovesList() const { return shared->moves_list; }
ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; }
const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; }
RefreshSet & Context::getRefreshSet() { return shared->refresh_set; }
const RefreshSet & Context::getRefreshSet() const { return shared->refresh_set; }
String Context::resolveDatabase(const String & database_name) const
{

View File

@ -74,6 +74,7 @@ class BackgroundSchedulePool;
class MergeList;
class MovesList;
class ReplicatedFetchList;
class RefreshSet;
class Cluster;
class Compiler;
class MarkCache;
@ -922,6 +923,9 @@ public:
ReplicatedFetchList & getReplicatedFetchList();
const ReplicatedFetchList & getReplicatedFetchList() const;
RefreshSet & getRefreshSet();
const RefreshSet & getRefreshSet() const;
/// If the current session is expired at the time of the call, synchronously creates and returns a new session with the startNewSession() call.
/// If no ZooKeeper configured, throws an exception.
std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;

View File

@ -54,6 +54,7 @@
#include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Storages/StorageAzureBlob.h>
#include <Storages/MaterializedView/RefreshTask.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Storages/System/StorageSystemFilesystemCache.h>
#include <Parsers/ASTSystemQuery.h>
@ -108,6 +109,7 @@ namespace ActionLocks
extern const StorageActionBlockType PartsMove;
extern const StorageActionBlockType PullReplicationLog;
extern const StorageActionBlockType Cleanup;
extern const StorageActionBlockType ViewRefresh;
}
@ -165,6 +167,8 @@ AccessType getRequiredAccessType(StorageActionBlockType action_type)
return AccessType::SYSTEM_PULLING_REPLICATION_LOG;
else if (action_type == ActionLocks::Cleanup)
return AccessType::SYSTEM_CLEANUP;
else if (action_type == ActionLocks::ViewRefresh)
return AccessType::SYSTEM_VIEWS;
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown action type: {}", std::to_string(action_type));
}
@ -605,6 +609,30 @@ BlockIO InterpreterSystemQuery::execute()
case Type::START_CLEANUP:
startStopAction(ActionLocks::Cleanup, true);
break;
case Type::START_VIEWS:
startStopAction(ActionLocks::ViewRefresh, true);
break;
case Type::STOP_VIEWS:
startStopAction(ActionLocks::ViewRefresh, false);
break;
case Type::START_VIEW:
startStopAction(ActionLocks::ViewRefresh, true);
break;
case Type::STOP_VIEW:
startStopAction(ActionLocks::ViewRefresh, false);
break;
case Type::REFRESH_VIEW:
getRefreshTask()->run();
break;
case Type::CANCEL_VIEW:
getRefreshTask()->cancel();
break;
case Type::PAUSE_VIEW:
getRefreshTask()->pause();
break;
case Type::RESUME_VIEW:
getRefreshTask()->resume();
break;
case Type::DROP_REPLICA:
dropReplica(query);
break;
@ -1092,6 +1120,17 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery &)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "SYSTEM RESTART DISK is not supported");
}
RefreshTaskHolder InterpreterSystemQuery::getRefreshTask()
{
auto ctx = getContext();
ctx->checkAccess(AccessType::SYSTEM_VIEWS);
auto task = ctx->getRefreshSet().getTask(table_id);
if (!task)
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Refreshable view {} doesn't exist", table_id.getNameForLogs());
return task;
}
AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() const
{
@ -1241,6 +1280,21 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_REPLICATION_QUEUES, query.getDatabase(), query.getTable());
break;
}
case Type::REFRESH_VIEW:
case Type::START_VIEW:
case Type::START_VIEWS:
case Type::STOP_VIEW:
case Type::STOP_VIEWS:
case Type::CANCEL_VIEW:
case Type::PAUSE_VIEW:
case Type::RESUME_VIEW:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_VIEWS);
else
required_access.emplace_back(AccessType::SYSTEM_VIEWS, query.getDatabase(), query.getTable());
break;
}
case Type::DROP_REPLICA:
case Type::DROP_DATABASE_REPLICA:
{

View File

@ -3,6 +3,7 @@
#include <Interpreters/IInterpreter.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/MaterializedView/RefreshTask_fwd.h>
#include <Interpreters/StorageID.h>
#include <Common/ActionLock.h>
#include <Disks/IVolume.h>
@ -72,6 +73,8 @@ private:
void flushDistributed(ASTSystemQuery & query);
[[noreturn]] void restartDisk(String & name);
RefreshTaskHolder getRefreshTask();
AccessRightsElements getRequiredAccessForDDLOnCluster() const;
void startStopAction(StorageActionBlockType action_type, bool start);
};

View File

@ -2,7 +2,6 @@
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Common/quoteString.h>
#include <Interpreters/StorageID.h>
#include <IO/Operators.h>
@ -340,6 +339,12 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
formatOnCluster(settings);
}
if (refresh_strategy)
{
settings.ostr << settings.nl_or_ws;
refresh_strategy->formatImpl(settings, state, frame);
}
if (to_table_id)
{
assert((is_materialized_view || is_window_view) && to_inner_uuid == UUIDHelpers::Nil);

View File

@ -5,6 +5,7 @@
#include <Parsers/ASTDictionary.h>
#include <Parsers/ASTDictionaryAttributeDeclaration.h>
#include <Parsers/ASTTableOverrides.h>
#include <Parsers/ASTRefreshStrategy.h>
#include <Interpreters/StorageID.h>
namespace DB
@ -116,6 +117,7 @@ public:
ASTExpressionList * dictionary_attributes_list = nullptr; /// attributes of
ASTDictionary * dictionary = nullptr; /// dictionary definition (layout, primary key, etc.)
ASTRefreshStrategy * refresh_strategy = nullptr; // For CREATE MATERIALIZED VIEW ... REFRESH ...
std::optional<UInt64> live_view_periodic_refresh; /// For CREATE LIVE VIEW ... WITH [PERIODIC] REFRESH ...
bool is_watermark_strictly_ascending{false}; /// STRICTLY ASCENDING WATERMARK STRATEGY FOR WINDOW VIEW

View File

@ -0,0 +1,74 @@
#include <Parsers/ASTRefreshStrategy.h>
#include <IO/Operators.h>
namespace DB
{
ASTPtr ASTRefreshStrategy::clone() const
{
auto res = std::make_shared<ASTRefreshStrategy>(*this);
res->children.clear();
if (interval)
res->set(res->interval, interval->clone());
if (period)
res->set(res->period, period->clone());
if (periodic_offset)
res->set(res->periodic_offset, periodic_offset->clone());
if (spread)
res->set(res->spread, spread->clone());
if (settings)
res->set(res->settings, settings->clone());
if (dependencies)
res->set(res->dependencies, dependencies->clone());
res->interval = interval;
res->spread = spread;
res->schedule_kind = schedule_kind;
return res;
}
void ASTRefreshStrategy::formatImpl(
const IAST::FormatSettings & f_settings, IAST::FormatState & state, IAST::FormatStateStacked frame) const
{
frame.need_parens = false;
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << "REFRESH ";
using enum ScheduleKind;
switch (schedule_kind)
{
case AFTER:
f_settings.ostr << "AFTER ";
interval->formatImpl(f_settings, state, frame);
break;
case EVERY:
f_settings.ostr << "EVERY ";
period->formatImpl(f_settings, state, frame);
if (periodic_offset)
{
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " OFFSET ";
periodic_offset->formatImpl(f_settings, state, frame);
}
break;
default:
break;
}
if (spread)
{
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " RANDOMIZE FOR ";
spread->formatImpl(f_settings, state, frame);
}
if (dependencies)
{
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " DEPENDS ON ";
dependencies->formatImpl(f_settings, state, frame);
}
if (settings)
{
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << " SETTINGS ";
settings->formatImpl(f_settings, state, frame);
}
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTTimeInterval.h>
namespace DB
{
/// Strategy for MATERIALIZED VIEW ... REFRESH ..
class ASTRefreshStrategy : public IAST
{
public:
enum class ScheduleKind : UInt8
{
UNKNOWN = 0,
AFTER,
EVERY
};
ASTSetQuery * settings = nullptr;
ASTExpressionList * dependencies = nullptr;
ASTTimeInterval * interval = nullptr;
ASTTimePeriod * period = nullptr;
ASTTimeInterval * periodic_offset = nullptr;
ASTTimePeriod * spread = nullptr;
ScheduleKind schedule_kind{ScheduleKind::UNKNOWN};
String getID(char) const override { return "Refresh strategy definition"; }
ASTPtr clone() const override;
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override;
};
}

View File

@ -90,6 +90,14 @@ public:
STOP_CLEANUP,
START_CLEANUP,
RESET_COVERAGE,
REFRESH_VIEW,
START_VIEW,
START_VIEWS,
STOP_VIEW,
STOP_VIEWS,
CANCEL_VIEW,
PAUSE_VIEW,
RESUME_VIEW,
END
};

View File

@ -0,0 +1,40 @@
#include <Parsers/ASTTimeInterval.h>
#include <IO/Operators.h>
#include <ranges>
namespace DB
{
ASTPtr ASTTimePeriod::clone() const
{
return std::make_shared<ASTTimePeriod>(*this);
}
void ASTTimePeriod::formatImpl(const FormatSettings & f_settings, FormatState &, FormatStateStacked frame) const
{
frame.need_parens = false;
f_settings.ostr << (f_settings.hilite ? hilite_none : "") << value << ' ';
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << kind.toKeyword();
}
ASTPtr ASTTimeInterval::clone() const
{
return std::make_shared<ASTTimeInterval>(*this);
}
void ASTTimeInterval::formatImpl(const FormatSettings & f_settings, FormatState &, FormatStateStacked frame) const
{
frame.need_parens = false;
for (bool is_first = true; auto [kind, value] : kinds | std::views::reverse)
{
if (!std::exchange(is_first, false))
f_settings.ostr << ' ';
f_settings.ostr << (f_settings.hilite ? hilite_none : "") << value << ' ';
f_settings.ostr << (f_settings.hilite ? hilite_keyword : "") << kind.toKeyword();
}
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <Parsers/IAST.h>
#include <Common/IntervalKind.h>
#include <map>
namespace DB
{
/// Simple periodic time interval like 10 SECOND
class ASTTimePeriod : public IAST
{
public:
UInt64 value{0};
IntervalKind kind{IntervalKind::Second};
String getID(char) const override { return "TimePeriod"; }
ASTPtr clone() const override;
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
/// Compound time interval like 1 YEAR 3 DAY 15 MINUTE
class ASTTimeInterval : public IAST
{
public:
std::map<IntervalKind, UInt64> kinds;
String getID(char) const override { return "TimeInterval"; }
ASTPtr clone() const override;
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
}

View File

@ -20,6 +20,7 @@
#include <Parsers/ParserProjectionSelectQuery.h>
#include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/ParserRefreshStrategy.h>
#include <Common/typeid_cast.h>
#include <Parsers/ASTColumnDeclaration.h>
@ -1390,6 +1391,7 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
ASTPtr as_database;
ASTPtr as_table;
ASTPtr select;
ASTPtr refresh_strategy;
String cluster_str;
bool attach = false;
@ -1436,6 +1438,15 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
return false;
}
if (ParserKeyword{"REFRESH"}.ignore(pos, expected))
{
// REFRESH only with materialized views
if (!is_materialized_view)
return false;
if (!ParserRefreshStrategy{}.parse(pos, refresh_strategy, expected))
return false;
}
if (is_materialized_view && ParserKeyword{"TO INNER UUID"}.ignore(pos, expected))
{
ParserStringLiteral literal_p;
@ -1527,6 +1538,8 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
query->set(query->columns_list, columns_list);
query->set(query->storage, storage);
if (refresh_strategy)
query->set(query->refresh_strategy, refresh_strategy);
if (comment)
query->set(query->comment, comment);
@ -1535,7 +1548,6 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
query->set(query->select, select);
return true;
}
bool ParserCreateNamedCollectionQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)

View File

@ -0,0 +1,77 @@
#include <Parsers/ParserRefreshStrategy.h>
#include <Parsers/ASTRefreshStrategy.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/ParserTimeInterval.h>
#include <Parsers/CommonParsers.h>
namespace DB
{
bool ParserRefreshStrategy::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto refresh = std::make_shared<ASTRefreshStrategy>();
if (ParserKeyword{"AFTER"}.ignore(pos, expected))
{
refresh->schedule_kind = ASTRefreshStrategy::ScheduleKind::AFTER;
ASTPtr interval;
if (!ParserTimeInterval{}.parse(pos, interval, expected))
return false;
refresh->set(refresh->interval, interval);
}
else if (ParserKeyword{"EVERY"}.ignore(pos, expected))
{
refresh->schedule_kind = ASTRefreshStrategy::ScheduleKind::EVERY;
ASTPtr period;
ASTPtr periodic_offset;
if (!ParserTimePeriod{}.parse(pos, period, expected))
return false;
if (!ParserTimeInterval{}.parse(pos, periodic_offset, expected))
return false;
refresh->set(refresh->period, period);
refresh->set(refresh->periodic_offset, periodic_offset);
}
if (refresh->schedule_kind == ASTRefreshStrategy::ScheduleKind::UNKNOWN)
return false;
if (ParserKeyword{"RANDOMIZE FOR"}.ignore(pos, expected))
{
ASTPtr spread;
if (!ParserTimePeriod{}.parse(pos, spread, expected))
return false;
refresh->set(refresh->spread, spread);
}
if (ParserKeyword{"DEPENDS ON"}.ignore(pos, expected))
{
ASTPtr dependencies;
auto list_parser = ParserList{
std::make_unique<ParserIdentifier>(),
std::make_unique<ParserToken>(TokenType::Comma),
/* allow_empty= */ false};
if (!list_parser.parse(pos, dependencies, expected))
return false;
refresh->set(refresh->dependencies, dependencies);
}
// Refresh SETTINGS
if (ParserKeyword{"SETTINGS"}.ignore(pos, expected))
{
/// Settings are written like SET query, so parse them with ParserSetQuery
ASTPtr settings;
if (!ParserSetQuery{true}.parse(pos, settings, expected))
return false;
refresh->set(refresh->settings, settings);
}
node = refresh;
return true;
}
}

View File

@ -0,0 +1,16 @@
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/// Parser for ASTRefreshStrategy
class ParserRefreshStrategy : public IParserBase
{
protected:
const char * getName() const override { return "refresh strategy"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -388,6 +388,20 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
parseDatabaseAndTableAsAST(pos, expected, res->database, res->table);
break;
case Type::REFRESH_VIEW:
case Type::START_VIEW:
case Type::STOP_VIEW:
case Type::CANCEL_VIEW:
case Type::PAUSE_VIEW:
case Type::RESUME_VIEW:
if (!parseDatabaseAndTableAsAST(pos, expected, res->database, res->table))
return false;
break;
case Type::START_VIEWS:
case Type::STOP_VIEWS:
break;
case Type::SUSPEND:
{
if (!parseQueryWithOnCluster(res, pos, expected))

View File

@ -0,0 +1,71 @@
#include <Parsers/ParserTimeInterval.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/parseIntervalKind.h>
#include <Parsers/ASTTimeInterval.h>
#include <Parsers/ASTLiteral.h>
namespace DB
{
namespace
{
struct ValKind
{
UInt64 val;
IntervalKind kind;
bool empty;
};
std::optional<ValKind> parseValKind(IParser::Pos & pos, Expected & expected)
{
ASTPtr value;
IntervalKind kind;
if (!ParserNumber{}.parse(pos, value, expected))
return ValKind{ .empty = true };
if (!parseIntervalKind(pos, expected, kind))
return {};
return ValKind{ value->as<ASTLiteral &>().value.safeGet<UInt64>(), kind, false };
}
}
bool ParserTimePeriod::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto parsed = parseValKind(pos, expected);
if (!parsed || parsed->empty || parsed->val == 0)
return false;
auto time_period = std::make_shared<ASTTimePeriod>();
time_period->value = parsed->val;
time_period->kind = parsed->kind;
node = time_period;
return true;
}
bool ParserTimeInterval::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto time_interval = std::make_shared<ASTTimeInterval>();
auto parsed = parseValKind(pos, expected);
while (parsed && !parsed->empty)
{
if (parsed->val == 0)
return false;
auto [it, inserted] = time_interval->kinds.emplace(parsed->kind, parsed->val);
if (!inserted)
return false;
parsed = parseValKind(pos, expected);
}
if (!parsed || time_interval->kinds.empty())
return false;
node = time_interval;
return true;
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/// Parser for ASTTimePeriod
class ParserTimePeriod : public IParserBase
{
protected:
const char * getName() const override { return "time period"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/// Parser for ASTTimeInterval
class ParserTimeInterval : public IParserBase
{
protected:
const char * getName() const override { return "time interval"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -0,0 +1,55 @@
#include <Processors/Executors/ManualPipelineExecutor.h>
#include <QueryPipeline/QueryPipeline.h>
#include <QueryPipeline/ReadProgressCallback.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
QueryPipeline & validatePipeline(QueryPipeline & query_pipeline)
{
if (!query_pipeline.completed())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for ManualPipelineExecutor must be completed");
return query_pipeline;
}
}
ManualPipelineExecutor::ManualPipelineExecutor(QueryPipeline & query_pipeline)
: pipeline{&validatePipeline(query_pipeline)}
, executor(pipeline->processors, pipeline->process_list_element)
{
executor.setReadProgressCallback(pipeline->getReadProgressCallback());
}
ManualPipelineExecutor::~ManualPipelineExecutor()
{
try
{
executor.cancel();
}
catch (...)
{
tryLogCurrentException("ManualPipelineExecutor");
}
}
bool ManualPipelineExecutor::executeStep()
{
return executor.executeStep();
}
bool ManualPipelineExecutor::executeStep(std::atomic_bool & yield_flag)
{
return executor.executeStep(&yield_flag);
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <Processors/Executors/PipelineExecutor.h>
#include <atomic>
namespace DB
{
class QueryPipeline;
/// Simple executor for step by step execution of completed QueryPipeline
class ManualPipelineExecutor
{
public:
explicit ManualPipelineExecutor(QueryPipeline & query_pipeline);
~ManualPipelineExecutor();
bool executeStep();
bool executeStep(std::atomic_bool & yield_flag);
private:
QueryPipeline * pipeline;
PipelineExecutor executor;
};
}

View File

@ -167,6 +167,7 @@ private:
friend class PushingAsyncPipelineExecutor;
friend class PullingAsyncPipelineExecutor;
friend class CompletedPipelineExecutor;
friend class ManualPipelineExecutor;
friend class QueryPipelineBuilder;
};

View File

@ -0,0 +1,58 @@
#include <Storages/MaterializedView/RefreshAllCombiner.h>
#include <algorithm>
namespace DB
{
RefreshAllCombiner::RefreshAllCombiner()
: time_arrived{false}
{}
RefreshAllCombiner::RefreshAllCombiner(const std::vector<StorageID> & parents)
: time_arrived{false}
{
parents_arrived.reserve(parents.size());
for (auto && parent : parents)
parents_arrived.emplace(parent.uuid, false);
}
bool RefreshAllCombiner::arriveTime()
{
std::lock_guard lock(combiner_mutex);
time_arrived = true;
return allArrivedLocked();
}
bool RefreshAllCombiner::arriveParent(const StorageID & id)
{
std::lock_guard lock(combiner_mutex);
parents_arrived[id.uuid] = true;
return allArrivedLocked();
}
void RefreshAllCombiner::flush()
{
std::lock_guard lock(combiner_mutex);
flushLocked();
}
bool RefreshAllCombiner::allArrivedLocked()
{
auto is_value = [](auto && key_value) { return key_value.second; };
if (time_arrived && std::ranges::all_of(parents_arrived, is_value))
{
flushLocked();
return true;
}
return false;
}
void RefreshAllCombiner::flushLocked()
{
for (auto & [parent, arrived] : parents_arrived)
arrived = false;
time_arrived = false;
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <Interpreters/StorageID.h>
namespace DB
{
/// Concurrent primitive for dependency completeness registration
/// When arrive methods return true, dependant task must be executed (or scheduled)
class RefreshAllCombiner
{
public:
RefreshAllCombiner();
explicit RefreshAllCombiner(const std::vector<StorageID> & parents);
bool arriveTime();
bool arriveParent(const StorageID & id);
void flush();
private:
bool allArrivedLocked();
void flushLocked();
std::mutex combiner_mutex;
std::unordered_map<UUID, bool> parents_arrived;
bool time_arrived;
};
}

View File

@ -0,0 +1,60 @@
#include <Storages/MaterializedView/RefreshDependencies.h>
#include <Storages/MaterializedView/RefreshTask.h>
namespace DB
{
RefreshDependencies::Entry::Entry(RefreshDependencies & deps, ContainerIter it)
: dependencies{&deps}
, entry_it{it}
{}
RefreshDependencies::Entry::Entry(Entry && other) noexcept
: dependencies(std::exchange(other.dependencies, nullptr))
, entry_it(std::move(other.entry_it))
{}
RefreshDependencies::Entry & RefreshDependencies::Entry::operator=(Entry && other) noexcept
{
if (this == &other)
return *this;
cleanup(std::exchange(dependencies, std::exchange(other.dependencies, nullptr)));
entry_it = std::move(other.entry_it);
return *this;
}
RefreshDependencies::Entry::~Entry()
{
cleanup(dependencies);
}
void RefreshDependencies::Entry::cleanup(RefreshDependencies * deps)
{
if (deps)
deps->erase(entry_it);
}
RefreshDependenciesEntry RefreshDependencies::add(RefreshTaskHolder dependency)
{
std::lock_guard lock(dependencies_mutex);
return Entry(*this, dependencies.emplace(dependencies.end(), dependency));
}
void RefreshDependencies::notifyAll(const StorageID & id)
{
std::lock_guard lock(dependencies_mutex);
for (auto && dep : dependencies)
{
if (auto task = dep.lock())
task->notify(id);
}
}
void RefreshDependencies::erase(ContainerIter it)
{
std::lock_guard lock(dependencies_mutex);
dependencies.erase(it);
}
}

View File

@ -0,0 +1,56 @@
#pragma once
#include <Storages/MaterializedView/RefreshTask_fwd.h>
#include <Interpreters/StorageID.h>
#include <list>
namespace DB
{
class RefreshTask;
/// Concurrent primitive for managing list of dependant task and notifying them
class RefreshDependencies
{
using Container = std::list<RefreshTaskObserver>;
using ContainerIter = typename Container::iterator;
public:
class Entry
{
friend class RefreshDependencies;
public:
Entry(Entry &&) noexcept;
Entry & operator=(Entry &&) noexcept;
~Entry();
private:
Entry(RefreshDependencies & deps, ContainerIter it);
void cleanup(RefreshDependencies * deps);
RefreshDependencies * dependencies;
ContainerIter entry_it;
};
RefreshDependencies() = default;
Entry add(RefreshTaskHolder dependency);
void notifyAll(const StorageID & id);
private:
void erase(ContainerIter it);
std::mutex dependencies_mutex;
std::list<RefreshTaskObserver> dependencies;
};
using RefreshDependenciesEntry = RefreshDependencies::Entry;
}

View File

@ -0,0 +1,128 @@
#include <Storages/MaterializedView/RefreshSet.h>
#include <Storages/MaterializedView/RefreshTask.h>
namespace DB
{
RefreshSetElement::RefreshSetElement(RefreshTaskHolder task, StorageID id)
: corresponding_task(task)
, view_id(std::move(id))
{}
RefreshInfo RefreshSetElement::getInfo() const
{
return {
.database = view_id.getDatabaseName(),
.view_name = view_id.getTableName(),
.refresh_status = toString(RefreshTask::TaskState{state.load()}),
.last_refresh_status = toString(RefreshTask::LastTaskState{last_state.load()}),
.last_refresh_time = static_cast<UInt32>(last_s.load(std::memory_order_relaxed)),
.next_refresh_time = static_cast<UInt32>(next_s.load(std::memory_order_relaxed)),
.progress = static_cast<Float64>(written_rows) / total_rows_to_read,
.elapsed_ns = elapsed_ns / 1e9,
.read_rows = read_rows.load(std::memory_order_relaxed),
.read_bytes = read_bytes.load(std::memory_order_relaxed),
.total_rows_to_read = total_rows_to_read.load(std::memory_order_relaxed),
.total_bytes_to_read = total_bytes_to_read.load(std::memory_order_relaxed),
.written_rows = written_rows.load(std::memory_order_relaxed),
.written_bytes = written_bytes.load(std::memory_order_relaxed),
.result_rows = result_rows.load(std::memory_order_relaxed),
.result_bytes = result_bytes.load(std::memory_order_relaxed)
};
}
const StorageID & RefreshSetElement::getID() const
{
return view_id;
}
RefreshTaskHolder RefreshSetElement::getTask() const
{
return corresponding_task.lock();
}
bool RefreshSetLess::operator()(const RefreshSetElement & l, const RefreshSetElement & r) const
{
return l.getID().uuid < r.getID().uuid;
}
bool RefreshSetLess::operator()(const StorageID & l, const RefreshSetElement & r) const
{
return l.uuid < r.getID().uuid;
}
bool RefreshSetLess::operator()(const RefreshSetElement & l, const StorageID & r) const
{
return l.getID().uuid < r.uuid;
}
bool RefreshSetLess::operator()(const StorageID & l, const StorageID & r) const
{
return l.uuid < r.uuid;
}
RefreshSet::Entry::Entry()
: parent_set{nullptr}
, metric_increment{}
{}
RefreshSet::Entry::Entry(Entry && other) noexcept
: parent_set{std::exchange(other.parent_set, nullptr)}
, iter(std::move(other.iter))
, metric_increment(std::move(other.metric_increment))
{}
RefreshSet::Entry & RefreshSet::Entry::operator=(Entry && other) noexcept
{
if (this == &other)
return *this;
cleanup(std::exchange(parent_set, std::exchange(other.parent_set, nullptr)));
iter = std::move(other.iter);
metric_increment = std::move(other.metric_increment);
return *this;
}
RefreshSet::Entry::~Entry()
{
cleanup(parent_set);
}
RefreshSet::Entry::Entry(RefreshSet & set, ContainerIter it, const CurrentMetrics::Metric & metric)
: parent_set{&set}, iter(std::move(it)), metric_increment(metric)
{}
void RefreshSet::Entry::cleanup(RefreshSet * set)
{
if (set)
set->erase(iter);
}
RefreshSet::RefreshSet()
: set_metric(CurrentMetrics::Refresh)
{}
RefreshTaskHolder RefreshSet::getTask(const StorageID & id) const
{
std::lock_guard lock(elements_mutex);
if (auto element = elements.find(id); element != elements.end())
return element->getTask();
return nullptr;
}
RefreshSet::InfoContainer RefreshSet::getInfo() const
{
std::lock_guard lock(elements_mutex);
InfoContainer res;
res.reserve(elements.size());
for (auto && element : elements)
res.emplace_back(element.getInfo());
return res;
}
void RefreshSet::erase(ContainerIter it)
{
std::lock_guard lock(elements_mutex);
elements.erase(it);
}
}

View File

@ -0,0 +1,142 @@
#pragma once
#include <Parsers/ASTIdentifier.h>
#include <Storages/IStorage.h>
#include <Storages/MaterializedView/RefreshTask_fwd.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric Refresh;
}
namespace DB
{
struct RefreshInfo
{
String database;
String view_name;
String refresh_status;
String last_refresh_status;
UInt32 last_refresh_time;
UInt32 next_refresh_time;
Float64 progress;
Float64 elapsed_ns;
UInt64 read_rows;
UInt64 read_bytes;
UInt64 total_rows_to_read;
UInt64 total_bytes_to_read;
UInt64 written_rows;
UInt64 written_bytes;
UInt64 result_rows;
UInt64 result_bytes;
};
class RefreshSetElement
{
friend class RefreshTask;
public:
RefreshSetElement(RefreshTaskHolder task, StorageID id);
RefreshSetElement(const RefreshSetElement &) = delete;
RefreshSetElement & operator=(const RefreshSetElement &) = delete;
RefreshInfo getInfo() const;
RefreshTaskHolder getTask() const;
const StorageID & getID() const;
private:
RefreshTaskObserver corresponding_task;
StorageID view_id;
mutable std::atomic<UInt64> read_rows{0};
mutable std::atomic<UInt64> read_bytes{0};
mutable std::atomic<UInt64> total_rows_to_read{0};
mutable std::atomic<UInt64> total_bytes_to_read{0};
mutable std::atomic<UInt64> written_rows{0};
mutable std::atomic<UInt64> written_bytes{0};
mutable std::atomic<UInt64> result_rows{0};
mutable std::atomic<UInt64> result_bytes{0};
mutable std::atomic<UInt64> elapsed_ns{0};
mutable std::atomic<Int64> last_s{0};
mutable std::atomic<Int64> next_s{0};
mutable std::atomic<RefreshTaskStateUnderlying> state{0};
mutable std::atomic<RefreshTaskStateUnderlying> last_state{0};
};
struct RefreshSetLess
{
using is_transparent = std::true_type;
bool operator()(const RefreshSetElement & l, const RefreshSetElement & r) const;
bool operator()(const StorageID & l, const RefreshSetElement & r) const;
bool operator()(const RefreshSetElement & l, const StorageID & r) const;
bool operator()(const StorageID & l, const StorageID & r) const;
};
/// Set of refreshable views
class RefreshSet
{
private:
using Container = std::set<RefreshSetElement, RefreshSetLess>;
using ContainerIter = typename Container::iterator;
public:
class Entry
{
friend class RefreshSet;
public:
Entry();
Entry(Entry &&) noexcept;
Entry & operator=(Entry &&) noexcept;
~Entry();
const RefreshSetElement * operator->() const { return std::to_address(iter); }
private:
RefreshSet * parent_set;
ContainerIter iter;
std::optional<CurrentMetrics::Increment> metric_increment;
Entry(
RefreshSet & set,
ContainerIter it,
const CurrentMetrics::Metric & metric);
void cleanup(RefreshSet * set);
};
using InfoContainer = std::vector<RefreshInfo>;
RefreshSet();
template <typename... Args>
std::optional<Entry> emplace(Args &&... args)
{
std::lock_guard guard(elements_mutex);
if (auto [it, is_inserted] = elements.emplace(std::forward<Args>(args)...); is_inserted)
return Entry(*this, std::move(it), set_metric);
return {};
}
RefreshTaskHolder getTask(const StorageID & id) const;
InfoContainer getInfo() const;
private:
mutable std::mutex elements_mutex;
Container elements;
CurrentMetrics::Metric set_metric;
void erase(ContainerIter it);
};
using RefreshSetEntry = RefreshSet::Entry;
}

View File

@ -0,0 +1,292 @@
#include <Storages/MaterializedView/RefreshTask.h>
#include <Storages/StorageMaterializedView.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Processors/Executors/ManualPipelineExecutor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
std::uniform_int_distribution<Int64> makeSpreadDistribution(const ASTTimePeriod * spread)
{
if (!spread)
return std::uniform_int_distribution<Int64>(0, 0);
Int64 limit = spread->kind.toAvgSeconds() * spread->value / 2;
return std::uniform_int_distribution(-limit, limit);
}
std::variant<RefreshEveryTimer, RefreshAfterTimer> makeRefreshTimer(const ASTRefreshStrategy & strategy)
{
using enum ASTRefreshStrategy::ScheduleKind;
switch (strategy.schedule_kind)
{
case EVERY:
return RefreshEveryTimer{*strategy.period, strategy.interval};
case AFTER:
return RefreshAfterTimer{strategy.interval};
default:
throw Exception("Unknown refresh strategy kind", ErrorCodes::BAD_ARGUMENTS);
}
}
}
RefreshTask::RefreshTask(
const ASTRefreshStrategy & strategy)
: refresh_timer(makeRefreshTimer(strategy))
, refresh_spread{makeSpreadDistribution(strategy.spread)}
, refresh_immediately{false}
, interrupt_execution{false}
, canceled{false}
{}
RefreshTaskHolder RefreshTask::create(
const StorageMaterializedView & view,
ContextMutablePtr context,
const DB::ASTRefreshStrategy & strategy)
{
auto task = std::make_shared<RefreshTask>(strategy);
task->refresh_task = context->getSchedulePool().createTask("MaterializedViewRefresherTask", task->makePoolTask());
task->set_entry = context->getRefreshSet().emplace(task, view.getStorageID()).value();
if (strategy.dependencies)
{
if (strategy.schedule_kind != ASTRefreshStrategy::ScheduleKind::AFTER)
throw Exception("Dependencies are allowed only for AFTER refresh kind", ErrorCodes::BAD_ARGUMENTS);
task->deps_entries.reserve(strategy.dependencies->children.size());
for (auto && dependency : strategy.dependencies->children)
{
StorageID dep_id(dependency->as<const ASTTableIdentifier &>());
if (auto dep_task = context->getRefreshSet().getTask(dep_id))
task->deps_entries.push_back(dep_task->dependencies.add(task));
}
}
return task;
}
void RefreshTask::initialize(std::shared_ptr<StorageMaterializedView> view)
{
view_to_refresh = view;
}
void RefreshTask::start()
{
storeState(TaskState::Scheduled);
refresh_task->activateAndSchedule();
}
void RefreshTask::stop()
{
refresh_task->deactivate();
cancel();
storeState(TaskState::Disabled);
}
void RefreshTask::run()
{
refresh_immediately.store(true);
refresh_task->activateAndSchedule();
}
void RefreshTask::cancel()
{
canceled.store(true);
interrupt_execution.store(true);
}
void RefreshTask::pause()
{
interrupt_execution.store(true);
}
void RefreshTask::resume()
{
interrupt_execution.store(false);
refresh_immediately.store(true);
refresh_task->schedule();
}
void RefreshTask::notify(const StorageID & parent_id)
{
if (combiner.arriveParent(parent_id))
{
refresh_immediately.store(true);
refresh_task->schedule();
}
}
void RefreshTask::doRefresh()
{
if (refresh_immediately.exchange(false))
{
refresh();
}
else
{
auto now = std::chrono::system_clock::now();
if (now >= next_refresh)
{
if (combiner.arriveTime())
refresh();
}
else
scheduleRefresh(now);
}
}
void RefreshTask::refresh()
{
auto view = lockView();
if (!view)
return;
if (!refresh_executor)
initializeRefresh(view);
storeState(TaskState::Running);
switch (executeRefresh())
{
case ExecutionResult::Paused:
storeState(TaskState::Paused);
return;
case ExecutionResult::Finished:
completeRefresh(view);
storeLastState(LastTaskState::Finished);
break;
case ExecutionResult::Cancelled:
storeLastState(LastTaskState::Canceled);
break;
}
refresh_executor.reset();
refresh_block.reset();
storeLastRefresh(std::chrono::system_clock::now());
scheduleRefresh(last_refresh);
}
RefreshTask::ExecutionResult RefreshTask::executeRefresh()
{
bool not_finished{true};
while (!interrupt_execution.load() && not_finished)
not_finished = refresh_executor->executeStep(interrupt_execution);
if (!not_finished)
return ExecutionResult::Finished;
if (interrupt_execution.load() && !canceled.load())
return ExecutionResult::Paused;
return ExecutionResult::Cancelled;
}
void RefreshTask::initializeRefresh(std::shared_ptr<StorageMaterializedView> view)
{
refresh_query = view->prepareRefreshQuery();
auto refresh_context = Context::createCopy(view->getContext());
refresh_block = InterpreterInsertQuery(refresh_query, refresh_context).execute();
refresh_block->pipeline.setProgressCallback([this](const Progress & progress){ progressCallback(progress); });
canceled.store(false);
interrupt_execution.store(false);
refresh_executor.emplace(refresh_block->pipeline);
}
void RefreshTask::completeRefresh(std::shared_ptr<StorageMaterializedView> view)
{
view->updateInnerTableAfterRefresh(refresh_query);
dependencies.notifyAll(view->getStorageID());
}
void RefreshTask::scheduleRefresh(std::chrono::system_clock::time_point now)
{
using namespace std::chrono_literals;
auto scheduled_refresh = calculateRefreshTime(now) + genSpreadSeconds();
storeNextRefresh(scheduled_refresh);
auto schedule_time = std::chrono::ceil<std::chrono::milliseconds>(scheduled_refresh - now);
storeState(TaskState::Scheduled);
refresh_task->scheduleAfter(std::max(schedule_time, 0ms).count());
}
namespace
{
template <typename... Ts>
struct CombinedVisitor : Ts... { using Ts::operator()...; };
template <typename... Ts>
CombinedVisitor(Ts...) -> CombinedVisitor<Ts...>;
}
std::chrono::sys_seconds RefreshTask::calculateRefreshTime(std::chrono::system_clock::time_point now) const
{
CombinedVisitor refresh_time_visitor{
[now](const RefreshAfterTimer & timer) { return timer.after(now); },
[now](const RefreshEveryTimer & timer) { return timer.next(now); }};
return std::visit<std::chrono::sys_seconds>(std::move(refresh_time_visitor), refresh_timer);
}
std::chrono::seconds RefreshTask::genSpreadSeconds()
{
return std::chrono::seconds{refresh_spread(thread_local_rng)};
}
void RefreshTask::progressCallback(const Progress & progress)
{
set_entry->read_rows.store(progress.read_rows, std::memory_order_relaxed);
set_entry->read_bytes.store(progress.read_bytes, std::memory_order_relaxed);
set_entry->total_rows_to_read.store(progress.total_rows_to_read, std::memory_order_relaxed);
set_entry->total_bytes_to_read.store(progress.total_bytes_to_read, std::memory_order_relaxed);
set_entry->written_rows.store(progress.written_rows, std::memory_order_relaxed);
set_entry->written_bytes.store(progress.written_bytes, std::memory_order_relaxed);
set_entry->result_rows.store(progress.result_rows, std::memory_order_relaxed);
set_entry->result_bytes.store(progress.result_bytes, std::memory_order_relaxed);
set_entry->elapsed_ns.store(progress.elapsed_ns, std::memory_order_relaxed);
}
std::shared_ptr<StorageMaterializedView> RefreshTask::lockView()
{
return std::static_pointer_cast<StorageMaterializedView>(view_to_refresh.lock());
}
void RefreshTask::storeState(TaskState task_state)
{
state.store(task_state);
set_entry->state.store(static_cast<RefreshTaskStateUnderlying>(task_state));
}
void RefreshTask::storeLastState(LastTaskState task_state)
{
last_state = task_state;
set_entry->last_state.store(static_cast<RefreshTaskStateUnderlying>(task_state));
}
void RefreshTask::storeLastRefresh(std::chrono::system_clock::time_point last)
{
last_refresh = last;
auto secs = std::chrono::floor<std::chrono::seconds>(last);
set_entry->last_s.store(secs.time_since_epoch().count());
}
void RefreshTask::storeNextRefresh(std::chrono::system_clock::time_point next)
{
next_refresh = next;
auto secs = std::chrono::floor<std::chrono::seconds>(next);
set_entry->next_s.store(secs.time_since_epoch().count());
}
}

View File

@ -0,0 +1,150 @@
#pragma once
#include <Storages/MaterializedView/RefreshAllCombiner.h>
#include <Storages/MaterializedView/RefreshDependencies.h>
#include <Storages/MaterializedView/RefreshSet.h>
#include <Storages/MaterializedView/RefreshTask_fwd.h>
#include <Storages/MaterializedView/RefreshTimers.h>
#include <Processors/Executors/ManualPipelineExecutor.h>
#include <Core/BackgroundSchedulePool.h>
#include <random>
namespace DB
{
class StorageMaterializedView;
class ASTRefreshStrategy;
class RefreshTask : public std::enable_shared_from_this<RefreshTask>
{
public:
enum class TaskState : RefreshTaskStateUnderlying
{
Disabled = 0,
Scheduled,
Running,
Paused
};
enum class LastTaskState : RefreshTaskStateUnderlying
{
Unknown = 0,
Canceled,
Finished
};
/// Never call it manual, public for shared_ptr construction only
RefreshTask(const ASTRefreshStrategy & strategy);
/// The only proper way to construct task
static RefreshTaskHolder create(
const StorageMaterializedView & view,
ContextMutablePtr context,
const DB::ASTRefreshStrategy & strategy);
void initialize(std::shared_ptr<StorageMaterializedView> view);
/// Enable task scheduling
void start();
/// Disable task scheduling
void stop();
/// Schedule task immediately
void run();
/// Cancel task execution
void cancel();
/// Pause task execution (must be either resumed or canceled later)
void pause();
/// Resume task execution
void resume();
/// Notify dependant task
void notify(const StorageID & parent_id);
private:
enum class ExecutionResult : UInt8
{
Finished,
Paused,
Cancelled
};
void doRefresh();
void scheduleRefresh(std::chrono::system_clock::time_point now);
void refresh();
ExecutionResult executeRefresh();
void initializeRefresh(std::shared_ptr<StorageMaterializedView> view);
void completeRefresh(std::shared_ptr<StorageMaterializedView> view);
std::chrono::sys_seconds calculateRefreshTime(std::chrono::system_clock::time_point now) const;
std::chrono::seconds genSpreadSeconds();
void progressCallback(const Progress & progress);
auto makePoolTask()
{
return [self = this->weak_from_this()]
{
if (auto task = self.lock())
task->doRefresh();
};
}
std::shared_ptr<StorageMaterializedView> lockView();
void storeState(TaskState task_state);
void storeLastState(LastTaskState task_state);
void storeLastRefresh(std::chrono::system_clock::time_point last);
void storeNextRefresh(std::chrono::system_clock::time_point next);
/// Task ownership
BackgroundSchedulePool::TaskHolder refresh_task;
std::weak_ptr<IStorage> view_to_refresh;
RefreshSet::Entry set_entry;
/// Task execution
std::optional<ManualPipelineExecutor> refresh_executor;
std::optional<BlockIO> refresh_block;
std::shared_ptr<ASTInsertQuery> refresh_query;
/// Concurrent dependency management
RefreshAllCombiner combiner;
RefreshDependencies dependencies;
std::vector<RefreshDependencies::Entry> deps_entries;
/// Refresh time settings and data
std::chrono::system_clock::time_point last_refresh;
std::chrono::system_clock::time_point next_refresh;
std::variant<RefreshEveryTimer, RefreshAfterTimer> refresh_timer;
/// Refresh time randomization
std::uniform_int_distribution<Int64> refresh_spread;
/// Task state
std::atomic<TaskState> state{TaskState::Disabled};
LastTaskState last_state{LastTaskState::Unknown};
/// Outer triggers
std::atomic_bool refresh_immediately;
std::atomic_bool interrupt_execution;
std::atomic_bool canceled;
};
}

View File

@ -0,0 +1,15 @@
#pragma once
#include <base/types.h>
#include <memory>
namespace DB
{
class RefreshTask;
using RefreshTaskStateUnderlying = UInt8;
using RefreshTaskHolder = std::shared_ptr<RefreshTask>;
using RefreshTaskObserver = std::weak_ptr<RefreshTask>;
}

View File

@ -0,0 +1,243 @@
#include <Storages/MaterializedView/RefreshTimers.h>
#include <Parsers/ASTTimeInterval.h>
namespace DB
{
namespace
{
constexpr std::chrono::days ZERO_DAYS{0};
constexpr std::chrono::days ONE_DAY{1};
}
RefreshAfterTimer::RefreshAfterTimer(const ASTTimeInterval * time_interval)
{
if (time_interval)
{
for (auto && [kind, value] : time_interval->kinds)
setWithKind(kind, value);
}
}
std::chrono::sys_seconds RefreshAfterTimer::after(std::chrono::system_clock::time_point tp) const
{
auto tp_date = std::chrono::floor<std::chrono::days>(tp);
auto tp_time_offset = std::chrono::floor<std::chrono::seconds>(tp - tp_date);
std::chrono::year_month_day ymd(tp_date);
ymd += years;
ymd += months;
std::chrono::sys_days date = ymd;
date += weeks;
date += days;
auto result = std::chrono::time_point_cast<std::chrono::seconds>(date);
result += tp_time_offset;
result += hours;
result += minutes;
result += seconds;
return result;
}
void RefreshAfterTimer::setWithKind(IntervalKind kind, UInt64 val)
{
switch (kind)
{
case IntervalKind::Second:
seconds = std::chrono::seconds{val};
break;
case IntervalKind::Minute:
minutes = std::chrono::minutes{val};
break;
case IntervalKind::Hour:
hours = std::chrono::hours{val};
break;
case IntervalKind::Day:
days = std::chrono::days{val};
break;
case IntervalKind::Week:
weeks = std::chrono::weeks{val};
break;
case IntervalKind::Month:
months = std::chrono::months{val};
break;
case IntervalKind::Year:
years = std::chrono::years{val};
break;
default:
break;
}
}
RefreshEveryTimer::RefreshEveryTimer(const ASTTimePeriod & time_period, const ASTTimeInterval * time_offset)
: offset(time_offset)
, value{static_cast<UInt32>(time_period.value)}
, kind{time_period.kind}
{
// TODO: validate invariants
}
std::chrono::sys_seconds RefreshEveryTimer::next(std::chrono::system_clock::time_point tp) const
{
if (value == 0)
return std::chrono::floor<std::chrono::seconds>(tp);
switch (kind)
{
case IntervalKind::Second:
return alignedToSeconds(tp);
case IntervalKind::Minute:
return alignedToMinutes(tp);
case IntervalKind::Hour:
return alignedToHours(tp);
case IntervalKind::Day:
return alignedToDays(tp);
case IntervalKind::Week:
return alignedToWeeks(tp);
case IntervalKind::Month:
return alignedToMonths(tp);
case IntervalKind::Year:
return alignedToYears(tp);
default:
return std::chrono::ceil<std::chrono::seconds>(tp);
}
}
std::chrono::sys_seconds RefreshEveryTimer::alignedToYears(std::chrono::system_clock::time_point tp) const
{
using namespace std::chrono_literals;
auto tp_days = std::chrono::floor<std::chrono::days>(tp);
std::chrono::year_month_day tp_ymd(tp_days);
auto normalize_years = [](std::chrono::year year) -> std::chrono::sys_days
{
return year / std::chrono::January / 1d;
};
auto prev_years = normalize_years(tp_ymd.year());
if (auto prev_time = offset.after(prev_years); prev_time > tp)
return prev_time;
auto next_years = normalize_years(tp_ymd.year() + std::chrono::years{1});
return offset.after(next_years);
}
std::chrono::sys_seconds RefreshEveryTimer::alignedToMonths(std::chrono::system_clock::time_point tp) const
{
using namespace std::chrono_literals;
auto tp_days = std::chrono::floor<std::chrono::days>(tp);
std::chrono::year_month_day tp_ymd(tp_days);
auto normalize_months = [](const std::chrono::year_month_day & ymd, unsigned month_value) -> std::chrono::sys_days
{
return ymd.year() / std::chrono::month{month_value} / 1d;
};
auto prev_month_value = static_cast<unsigned>(tp_ymd.month()) / value * value;
auto prev_months = normalize_months(tp_ymd, prev_month_value);
if (auto prev_time = offset.after(prev_months); prev_time > tp)
return prev_time;
auto next_month_value = (static_cast<unsigned>(tp_ymd.month()) / value + 1) * value;
auto next_months = normalize_months(tp_ymd, next_month_value);
std::chrono::year_month_day next_ymd(next_months);
if (next_ymd.year() > tp_ymd.year())
return offset.after(normalize_months(next_ymd, value));
return offset.after(next_months);
}
std::chrono::sys_seconds RefreshEveryTimer::alignedToWeeks(std::chrono::system_clock::time_point tp) const
{
using namespace std::chrono_literals;
auto cpp_weekday = offset.getDays() + ONE_DAY;
std::chrono::weekday offset_weekday((cpp_weekday - std::chrono::floor<std::chrono::weeks>(cpp_weekday)).count());
auto tp_days = std::chrono::floor<std::chrono::days>(tp);
std::chrono::year_month_weekday tp_ymd(tp_days);
auto normalize_weeks = [offset_weekday](const std::chrono::year_month_weekday & ymd, unsigned week_value)
{
return std::chrono::sys_days(ymd.year() / ymd.month() / std::chrono::weekday{offset_weekday}[week_value]);
};
auto prev_week_value = tp_ymd.index() / value * value;
auto prev_days = normalize_weeks(tp_ymd, prev_week_value);
if (auto prev_time = offset.after(prev_days - offset.getDays()); prev_time > tp)
return prev_time;
auto next_day_value = (tp_ymd.index() / value + 1) * value;
auto next_days = normalize_weeks(tp_ymd, next_day_value);
std::chrono::year_month_weekday next_ymd(next_days);
if (next_ymd.year() > tp_ymd.year() || next_ymd.month() > tp_ymd.month())
return offset.after(normalize_weeks(next_ymd, value) - offset.getDays());
return offset.after(next_days);
}
std::chrono::sys_seconds RefreshEveryTimer::alignedToDays(std::chrono::system_clock::time_point tp) const
{
auto tp_days = std::chrono::floor<std::chrono::days>(tp);
std::chrono::year_month_day tp_ymd(tp_days);
auto normalize_days = [](const std::chrono::year_month_day & ymd, unsigned day_value) -> std::chrono::sys_days
{
return ymd.year() / ymd.month() / std::chrono::day{day_value};
};
auto prev_day_value = static_cast<unsigned>(tp_ymd.day()) / value * value;
auto prev_days = normalize_days(tp_ymd, prev_day_value);
if (auto prev_time = offset.after(prev_days); prev_time > tp)
return prev_time;
auto next_day_value = (static_cast<unsigned>(tp_ymd.day()) / value + 1) * value;
auto next_days = normalize_days(tp_ymd, next_day_value);
std::chrono::year_month_day next_ymd(next_days);
if (next_ymd.year() > tp_ymd.year() || next_ymd.month() > tp_ymd.month())
return offset.after(normalize_days(next_ymd, value));
return offset.after(next_days);
}
std::chrono::sys_seconds RefreshEveryTimer::alignedToHours(std::chrono::system_clock::time_point tp) const
{
using namespace std::chrono_literals;
auto tp_days = std::chrono::floor<std::chrono::days>(tp);
auto tp_hours = std::chrono::floor<std::chrono::hours>(tp - tp_days);
auto prev_hours = (tp_hours / value) * value;
if (auto prev_time = offset.after(tp_days + prev_hours); prev_time > tp)
return prev_time;
auto next_hours = (tp_hours / value + 1h) * value;
if (std::chrono::floor<std::chrono::days>(next_hours - 1h) > ZERO_DAYS)
return offset.after(tp_days + ONE_DAY + std::chrono::hours{value});
return offset.after(tp_days + next_hours);
}
std::chrono::sys_seconds RefreshEveryTimer::alignedToMinutes(std::chrono::system_clock::time_point tp) const
{
using namespace std::chrono_literals;
auto tp_hours = std::chrono::floor<std::chrono::hours>(tp);
auto tp_minutes = std::chrono::floor<std::chrono::minutes>(tp - tp_hours);
auto prev_minutes = (tp_minutes / value) * value;
if (auto prev_time = offset.after(tp_hours + prev_minutes); prev_time > tp)
return prev_time;
auto next_minutes = (tp_minutes / value + 1min) * value;
if (std::chrono::floor<std::chrono::hours>(next_minutes - 1min) > 0h)
return offset.after(tp_hours + 1h + std::chrono::minutes{value});
return offset.after(tp_hours + next_minutes);
}
std::chrono::sys_seconds RefreshEveryTimer::alignedToSeconds(std::chrono::system_clock::time_point tp) const
{
using namespace std::chrono_literals;
auto tp_minutes = std::chrono::floor<std::chrono::minutes>(tp);
auto tp_seconds = std::chrono::floor<std::chrono::seconds>(tp - tp_minutes);
auto next_seconds= (tp_seconds / value + 1s) * value;
if (std::chrono::floor<std::chrono::minutes>(next_seconds - 1s) > 0min)
return tp_minutes + 1min + std::chrono::seconds{value};
return tp_minutes + next_seconds;
}
}

View File

@ -0,0 +1,69 @@
#pragma once
#include <Common/IntervalKind.h>
#include <chrono>
namespace DB
{
class ASTTimeInterval;
class ASTTimePeriod;
/// Schedule timer for MATERIALIZED VIEW ... REFRESH AFTER ... queries
class RefreshAfterTimer
{
public:
explicit RefreshAfterTimer(const ASTTimeInterval * time_interval);
std::chrono::sys_seconds after(std::chrono::system_clock::time_point tp) const;
std::chrono::seconds getSeconds() const { return seconds; }
std::chrono::minutes getMinutes() const { return minutes; }
std::chrono::hours getHours() const { return hours; }
std::chrono::days getDays() const { return days; }
std::chrono::weeks getWeeks() const { return weeks; }
std::chrono::months getMonths() const { return months; }
std::chrono::years getYears() const { return years; }
private:
void setWithKind(IntervalKind kind, UInt64 val);
std::chrono::seconds seconds{0};
std::chrono::minutes minutes{0};
std::chrono::hours hours{0};
std::chrono::days days{0};
std::chrono::weeks weeks{0};
std::chrono::months months{0};
std::chrono::years years{0};
};
/// Schedule timer for MATERIALIZED VIEW ... REFRESH EVERY ... queries
class RefreshEveryTimer
{
public:
explicit RefreshEveryTimer(const ASTTimePeriod & time_period, const ASTTimeInterval * time_offset);
std::chrono::sys_seconds next(std::chrono::system_clock::time_point tp) const;
private:
std::chrono::sys_seconds alignedToYears(std::chrono::system_clock::time_point tp) const;
std::chrono::sys_seconds alignedToMonths(std::chrono::system_clock::time_point tp) const;
std::chrono::sys_seconds alignedToWeeks(std::chrono::system_clock::time_point tp) const;
std::chrono::sys_seconds alignedToDays(std::chrono::system_clock::time_point tp) const;
std::chrono::sys_seconds alignedToHours(std::chrono::system_clock::time_point tp) const;
std::chrono::sys_seconds alignedToMinutes(std::chrono::system_clock::time_point tp) const;
std::chrono::sys_seconds alignedToSeconds(std::chrono::system_clock::time_point tp) const;
RefreshAfterTimer offset;
UInt32 value{0};
IntervalKind kind{IntervalKind::Second};
};
}

View File

@ -0,0 +1,27 @@
#include <gtest/gtest.h>
#include <Storages/MaterializedView/RefreshTimers.h>
#include <Parsers/ASTTimeInterval.h>
using namespace DB;
TEST(Timers, AfterTimer)
{
using namespace std::chrono;
auto interval = std::make_shared<ASTTimeInterval>();
interval->kinds = {
{IntervalKind::Week, 2},
{IntervalKind::Day, 3},
{IntervalKind::Minute, 15},
};
RefreshAfterTimer timer(interval.get());
sys_days date_in = 2023y / January / 18d;
auto secs_in = date_in + 23h + 57min;
sys_days date_out = 2023y / February / 5d;
auto secs_out = date_out + 0h + 12min;
ASSERT_EQ(secs_out, timer.after(secs_in));
}

View File

@ -1,5 +1,7 @@
#include <Storages/StorageMaterializedView.h>
#include <Storages/MaterializedView/RefreshTask.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTCreateQuery.h>
@ -7,6 +9,7 @@
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Access/Common/AccessFlags.h>
@ -38,6 +41,11 @@ namespace ErrorCodes
extern const int QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW;
}
namespace ActionLocks
{
extern const StorageActionBlockType ViewRefresh;
}
static inline String generateInnerTableName(const StorageID & view_id)
{
if (view_id.hasUUID())
@ -126,6 +134,12 @@ StorageMaterializedView::StorageMaterializedView(
target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->getDatabase(), manual_create_query->getTable()}, getContext())->getStorageID();
}
if (query.refresh_strategy)
refresher = RefreshTask::create(
*this,
getContext(),
*query.refresh_strategy);
}
QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(
@ -236,23 +250,24 @@ void StorageMaterializedView::dropInnerTableIfAny(bool sync, ContextPtr local_co
/// See the comment in StorageMaterializedView::drop.
/// DDL queries with StorageMaterializedView are fundamentally broken.
/// Best-effort to make them work: the inner table name is almost always less than the MV name (so it's safe to lock DDLGuard)
bool may_lock_ddl_guard = getStorageID().getQualifiedName() < target_table_id.getQualifiedName();
auto inner_table_id = getTargetTableId();
bool may_lock_ddl_guard = getStorageID().getQualifiedName() < inner_table_id.getQualifiedName();
if (has_inner_table && tryGetTargetTable())
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id,
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, inner_table_id,
sync, /* ignore_sync_setting */ true, may_lock_ddl_guard);
}
void StorageMaterializedView::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{
if (has_inner_table)
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Truncate, getContext(), local_context, target_table_id, true);
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Truncate, getContext(), local_context, getTargetTableId(), true);
}
void StorageMaterializedView::checkStatementCanBeForwarded() const
{
if (!has_inner_table)
throw Exception(ErrorCodes::INCORRECT_QUERY, "MATERIALIZED VIEW targets existing table {}. "
"Execute the statement directly on it.", target_table_id.getNameForLogs());
"Execute the statement directly on it.", getTargetTableId().getNameForLogs());
}
bool StorageMaterializedView::optimize(
@ -270,6 +285,48 @@ bool StorageMaterializedView::optimize(
return getTargetTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, local_context);
}
std::shared_ptr<ASTInsertQuery> StorageMaterializedView::prepareRefreshQuery()
{
auto inner_table_id = getTargetTableId();
auto new_table_name = ".tmp" + generateInnerTableName(getStorageID());
auto db = DatabaseCatalog::instance().getDatabase(inner_table_id.database_name);
auto create_table_query = db->getCreateTableQuery(inner_table_id.table_name, getContext());
auto & create_query = create_table_query->as<ASTCreateQuery &>();
create_query.setTable(new_table_name);
create_query.setDatabase(db->getDatabaseName());
create_query.create_or_replace = true;
create_query.replace_table = true;
create_query.uuid = UUIDHelpers::Nil;
auto create_ctx = Context::createCopy(getContext());
InterpreterCreateQuery create_interpreter(create_table_query, create_ctx);
create_interpreter.setInternal(true);
create_interpreter.execute();
auto insert_query = std::make_shared<ASTInsertQuery>();
insert_query->setTable(new_table_name);
insert_query->setDatabase(db->getDatabaseName());
insert_query->select = getInMemoryMetadataPtr()->getSelectQuery().select_query;
return insert_query;
}
void StorageMaterializedView::updateInnerTableAfterRefresh(std::shared_ptr<ASTInsertQuery> refresh_query)
{
auto inner_table_id = getTargetTableId();
auto db = DatabaseCatalog::instance().getDatabase(inner_table_id.database_name);
auto target_db = DatabaseCatalog::instance().getDatabase(refresh_query->getDatabase());
auto rename_ctx = Context::createCopy(getContext());
target_db->renameTable(
rename_ctx, refresh_query->getTable(), *db, inner_table_id.table_name, /*exchange=*/true, /*dictionary=*/false);
setTargetTableId(db->getTable(refresh_query->getTable(), getContext())->getStorageID());
}
void StorageMaterializedView::alter(
const AlterCommands & params,
ContextPtr local_context,
@ -332,6 +389,7 @@ void StorageMaterializedView::mutate(const MutationCommands & commands, ContextP
void StorageMaterializedView::renameInMemory(const StorageID & new_table_id)
{
auto old_table_id = getStorageID();
auto inner_table_id = getTargetTableId();
auto metadata_snapshot = getInMemoryMetadataPtr();
bool from_atomic_to_atomic_database = old_table_id.hasUUID() && new_table_id.hasUUID();
@ -340,14 +398,14 @@ void StorageMaterializedView::renameInMemory(const StorageID & new_table_id)
auto new_target_table_name = generateInnerTableName(new_table_id);
auto rename = std::make_shared<ASTRenameQuery>();
assert(target_table_id.database_name == old_table_id.database_name);
assert(inner_table_id.database_name == old_table_id.database_name);
ASTRenameQuery::Element elem
{
ASTRenameQuery::Table
{
target_table_id.database_name.empty() ? nullptr : std::make_shared<ASTIdentifier>(target_table_id.database_name),
std::make_shared<ASTIdentifier>(target_table_id.table_name)
inner_table_id.database_name.empty() ? nullptr : std::make_shared<ASTIdentifier>(inner_table_id.database_name),
std::make_shared<ASTIdentifier>(inner_table_id.table_name)
},
ASTRenameQuery::Table
{
@ -358,15 +416,14 @@ void StorageMaterializedView::renameInMemory(const StorageID & new_table_id)
rename->elements.emplace_back(std::move(elem));
InterpreterRenameQuery(rename, getContext()).execute();
target_table_id.database_name = new_table_id.database_name;
target_table_id.table_name = new_target_table_name;
updateTargetTableId(new_table_id.database_name, new_target_table_name);
}
IStorage::renameInMemory(new_table_id);
if (from_atomic_to_atomic_database && has_inner_table)
{
assert(target_table_id.database_name == old_table_id.database_name);
target_table_id.database_name = new_table_id.database_name;
assert(inner_table_id.database_name == old_table_id.database_name);
updateTargetTableId(new_table_id.database_name, std::nullopt);
}
const auto & select_query = metadata_snapshot->getSelectQuery();
// TODO Actually we don't need to update dependency if MV has UUID, but then db and table name will be outdated
@ -379,10 +436,19 @@ void StorageMaterializedView::startup()
const auto & select_query = metadata_snapshot->getSelectQuery();
if (!select_query.select_table_id.empty())
DatabaseCatalog::instance().addViewDependency(select_query.select_table_id, getStorageID());
if (refresher)
{
refresher->initialize(std::static_pointer_cast<StorageMaterializedView>(shared_from_this()));
refresher->start();
}
}
void StorageMaterializedView::shutdown(bool)
{
if (refresher)
refresher->stop();
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto & select_query = metadata_snapshot->getSelectQuery();
/// Make sure the dependency is removed after DETACH TABLE
@ -393,13 +459,13 @@ void StorageMaterializedView::shutdown(bool)
StoragePtr StorageMaterializedView::getTargetTable() const
{
checkStackSize();
return DatabaseCatalog::instance().getTable(target_table_id, getContext());
return DatabaseCatalog::instance().getTable(getTargetTableId(), getContext());
}
StoragePtr StorageMaterializedView::tryGetTargetTable() const
{
checkStackSize();
return DatabaseCatalog::instance().tryGetTable(target_table_id, getContext());
return DatabaseCatalog::instance().tryGetTable(getTargetTableId(), getContext());
}
NamesAndTypesList StorageMaterializedView::getVirtuals() const
@ -472,6 +538,8 @@ std::optional<UInt64> StorageMaterializedView::totalBytesUncompressed(const Sett
ActionLock StorageMaterializedView::getActionLock(StorageActionBlockType type)
{
if (type == ActionLocks::ViewRefresh && refresher)
refresher->stop();
if (has_inner_table)
{
if (auto target_table = tryGetTargetTable())
@ -487,6 +555,34 @@ bool StorageMaterializedView::isRemote() const
return false;
}
void StorageMaterializedView::onActionLockRemove(StorageActionBlockType action_type)
{
if (action_type == ActionLocks::ViewRefresh && refresher)
refresher->start();
/// TODO: Do we need to release action lock on inner table?
}
DB::StorageID StorageMaterializedView::getTargetTableId() const
{
std::lock_guard guard(target_table_id_mutex);
return target_table_id;
}
void StorageMaterializedView::setTargetTableId(DB::StorageID id)
{
std::lock_guard guard(target_table_id_mutex);
target_table_id = std::move(id);
}
void StorageMaterializedView::updateTargetTableId(std::optional<String> database_name, std::optional<String> table_name)
{
std::lock_guard guard(target_table_id_mutex);
if (database_name)
target_table_id.database_name = *std::move(database_name);
if (table_name)
target_table_id.table_name = *std::move(table_name);
}
void registerStorageMaterializedView(StorageFactory & factory)
{
factory.registerStorage("MaterializedView", [](const StorageFactory::Arguments & args)

View File

@ -5,6 +5,7 @@
#include <Storages/IStorage.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/MaterializedView/RefreshTask_fwd.h>
namespace DB
{
@ -76,6 +77,7 @@ public:
NamesAndTypesList getVirtuals() const override;
ActionLock getActionLock(StorageActionBlockType type) override;
void onActionLockRemove(StorageActionBlockType action_type) override;
void read(
QueryPlan & query_plan,
@ -98,12 +100,25 @@ public:
std::optional<UInt64> totalBytesUncompressed(const Settings & settings) const override;
private:
mutable std::mutex target_table_id_mutex;
/// Will be initialized in constructor
StorageID target_table_id = StorageID::createEmpty();
RefreshTaskHolder refresher;
bool has_inner_table = false;
friend class RefreshTask;
void checkStatementCanBeForwarded() const;
std::shared_ptr<ASTInsertQuery> prepareRefreshQuery();
void updateInnerTableAfterRefresh(std::shared_ptr<ASTInsertQuery> refresh_query);
StorageID getTargetTableId() const;
void setTargetTableId(StorageID id);
void updateTargetTableId(std::optional<String> database_name, std::optional<String> table_name);
};
}

View File

@ -0,0 +1,67 @@
#include <Storages/System/StorageSystemViewRefreshes.h>
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Storages/MaterializedView/RefreshSet.h>
namespace DB
{
NamesAndTypesList StorageSystemViewRefreshes::getNamesAndTypes()
{
return {
{"database", std::make_shared<DataTypeString>()},
{"view", std::make_shared<DataTypeString>()},
{"refresh_status", std::make_shared<DataTypeString>()},
{"last_refresh_status", std::make_shared<DataTypeString>()},
{"last_refresh_time", std::make_shared<DataTypeDateTime>()},
{"next_refresh_time", std::make_shared<DataTypeDateTime>()},
{"progress", std::make_shared<DataTypeFloat64>()},
{"elapsed", std::make_shared<DataTypeFloat64>()},
{"read_rows", std::make_shared<DataTypeUInt64>()},
{"read_bytes", std::make_shared<DataTypeUInt64>()},
{"total_rows", std::make_shared<DataTypeUInt64>()},
{"total_bytes", std::make_shared<DataTypeUInt64>()},
{"written_rows", std::make_shared<DataTypeUInt64>()},
{"written_bytes", std::make_shared<DataTypeUInt64>()},
{"result_rows", std::make_shared<DataTypeUInt64>()},
{"result_bytes", std::make_shared<DataTypeUInt64>()},
};
}
void StorageSystemViewRefreshes::fillData(
MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
auto access = context->getAccess();
// TODO: Do we need to add new access type?
auto valid_access = AccessType::SHOW_TABLES;
bool check_access_for_tables = !access->isGranted(valid_access);
for (const auto & refresh : context->getRefreshSet().getInfo())
{
if (check_access_for_tables && !access->isGranted(valid_access, refresh.database, refresh.view_name))
continue;
std::size_t i = 0;
res_columns[i++]->insert(refresh.database);
res_columns[i++]->insert(refresh.view_name);
res_columns[i++]->insert(refresh.refresh_status);
res_columns[i++]->insert(refresh.last_refresh_status);
res_columns[i++]->insert(refresh.last_refresh_time);
res_columns[i++]->insert(refresh.next_refresh_time);
res_columns[i++]->insert(refresh.progress);
res_columns[i++]->insert(refresh.elapsed_ns);
res_columns[i++]->insert(refresh.read_rows);
res_columns[i++]->insert(refresh.read_bytes);
res_columns[i++]->insert(refresh.total_rows_to_read);
res_columns[i++]->insert(refresh.total_bytes_to_read);
res_columns[i++]->insert(refresh.written_rows);
res_columns[i++]->insert(refresh.written_bytes);
res_columns[i++]->insert(refresh.result_rows);
res_columns[i++]->insert(refresh.result_bytes);
}
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class StorageSystemViewRefreshes final : public IStorageSystemOneBlock<StorageSystemViewRefreshes>
{
public:
std::string getName() const override { return "SystemViewRefreshes"; }
static NamesAndTypesList getNamesAndTypes();
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
};
}

View File

@ -87,6 +87,7 @@
#include <Storages/System/StorageSystemScheduler.h>
#include <Storages/System/StorageSystemS3Queue.h>
#include <Storages/System/StorageSystemDashboards.h>
#include <Storages/System/StorageSystemViewRefreshes.h>
#if defined(__ELF__) && !defined(OS_FREEBSD)
#include <Storages/System/StorageSystemSymbols.h>
@ -209,6 +210,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b
attach<StorageSystemJemallocBins>(context, system_database, "jemalloc_bins");
attach<StorageSystemS3Queue>(context, system_database, "s3queue");
attach<StorageSystemDashboards>(context, system_database, "dashboards");
attach<StorageSystemViewRefreshes>(context, system_database, "view_refreshes");
if (has_zookeeper)
{