ClickHouse/src/Storages/WindowView/StorageWindowView.cpp

1636 lines
61 KiB
C++
Raw Normal View History

#include <numeric>
#include <regex>
2021-11-19 13:09:12 +00:00
2020-01-14 03:07:31 +00:00
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
2020-02-12 17:39:57 +00:00
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsTimeWindow.h>
2020-01-14 03:07:31 +00:00
#include <Interpreters/AddDefaultDatabaseVisitor.h>
2020-06-09 08:48:04 +00:00
#include <Interpreters/Context.h>
2020-01-14 03:07:31 +00:00
#include <Interpreters/InDepthNodeVisitor.h>
2020-02-12 17:39:57 +00:00
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
2020-01-14 03:07:31 +00:00
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/QueryAliasesVisitor.h>
2021-12-31 07:07:01 +00:00
#include <Interpreters/QueryNormalizer.h>
2020-01-14 03:07:31 +00:00
#include <Interpreters/getTableExpressions.h>
2020-02-12 17:39:57 +00:00
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTAsterisk.h>
2020-01-14 03:07:31 +00:00
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
2020-01-14 03:07:31 +00:00
#include <Parsers/ASTWatchQuery.h>
#include <Parsers/parseQuery.h>
2020-01-14 03:07:31 +00:00
#include <Parsers/formatAST.h>
2021-11-19 13:09:12 +00:00
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Sources/BlocksSource.h>
2020-02-22 17:06:10 +00:00
#include <Processors/Sources/SourceFromSingleChunk.h>
2021-11-19 13:09:12 +00:00
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
2021-05-28 07:36:19 +00:00
#include <Processors/Transforms/ExpressionTransform.h>
2020-02-22 17:06:10 +00:00
#include <Processors/Transforms/FilterTransform.h>
2021-11-19 13:09:12 +00:00
#include <Processors/Transforms/WatermarkTransform.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
2022-05-11 09:00:49 +00:00
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
2021-11-19 13:09:12 +00:00
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Sinks/EmptySink.h>
2020-01-14 03:07:31 +00:00
#include <Storages/StorageFactory.h>
#include <Common/typeid_cast.h>
2021-11-19 13:09:12 +00:00
#include <base/sleep.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
2020-01-14 03:07:31 +00:00
#include <Storages/LiveView/StorageBlocks.h>
2020-01-14 03:07:31 +00:00
#include <Storages/WindowView/StorageWindowView.h>
2021-11-19 13:09:12 +00:00
#include <Storages/WindowView/WindowViewSource.h>
2020-01-14 03:07:31 +00:00
2021-11-19 13:09:12 +00:00
#include <QueryPipeline/printPipeline.h>
2020-01-14 03:07:31 +00:00
namespace DB
{
namespace ErrorCodes
{
2020-03-01 18:08:52 +00:00
extern const int ARGUMENT_OUT_OF_BOUND;
2020-07-21 09:43:42 +00:00
extern const int BAD_ARGUMENTS;
2022-02-13 14:54:03 +00:00
extern const int SYNTAX_ERROR;
2020-07-27 09:32:15 +00:00
extern const int ILLEGAL_COLUMN;
2020-07-21 09:43:42 +00:00
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
2020-01-14 03:07:31 +00:00
extern const int INCORRECT_QUERY;
extern const int LOGICAL_ERROR;
2020-01-14 03:07:31 +00:00
extern const int QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW;
extern const int SUPPORT_IS_DISABLED;
extern const int TABLE_WAS_NOT_DROPPED;
2020-01-14 03:07:31 +00:00
}
namespace
{
2021-12-07 08:14:00 +00:00
/// Fetch all window info and replace tumble or hop node names with windowID
2021-05-28 07:36:19 +00:00
struct FetchQueryInfoMatcher
2020-01-14 03:07:31 +00:00
{
2021-05-28 07:36:19 +00:00
using Visitor = InDepthNodeVisitor<FetchQueryInfoMatcher, true>;
2020-03-22 15:03:16 +00:00
using TypeToVisit = ASTFunction;
2020-01-14 03:07:31 +00:00
2020-07-20 13:32:34 +00:00
struct Data
{
ASTPtr window_function;
String window_id_name;
String window_id_alias;
String serialized_window_function;
String timestamp_column_name;
bool is_tumble = false;
bool is_hop = false;
};
static bool needChildVisit(ASTPtr &, const ASTPtr &) { return true; }
2020-01-14 03:07:31 +00:00
2020-07-20 13:32:34 +00:00
static void visit(ASTPtr & ast, Data & data)
2020-01-14 03:07:31 +00:00
{
2020-07-20 13:32:34 +00:00
if (auto * t = ast->as<ASTFunction>())
2020-01-14 03:07:31 +00:00
{
2021-12-07 08:14:00 +00:00
if (t->name == "tumble" || t->name == "hop")
{
2021-12-07 08:14:00 +00:00
data.is_tumble = t->name == "tumble";
data.is_hop = t->name == "hop";
2021-12-04 12:30:04 +00:00
auto temp_node = t->clone();
temp_node->setAlias("");
2020-07-20 13:32:34 +00:00
if (!data.window_function)
{
2021-12-04 12:30:04 +00:00
data.serialized_window_function = serializeAST(*temp_node);
2021-12-07 08:14:00 +00:00
t->name = "windowID";
2020-07-20 13:32:34 +00:00
data.window_id_name = t->getColumnName();
data.window_id_alias = t->alias;
data.window_function = t->clone();
data.window_function->setAlias("");
data.timestamp_column_name = t->arguments->children[0]->getColumnName();
}
else
{
if (serializeAST(*temp_node) != data.serialized_window_function)
throw Exception("WINDOW VIEW only support ONE TIME WINDOW FUNCTION", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);
2021-12-07 08:14:00 +00:00
t->name = "windowID";
2020-07-20 13:32:34 +00:00
}
}
2020-01-14 03:07:31 +00:00
}
}
};
2020-02-14 08:55:56 +00:00
2021-12-10 15:12:27 +00:00
/// Replace windowID node name with either tumble or hop
2020-07-20 13:32:34 +00:00
struct ReplaceWindowIdMatcher
{
2020-07-20 13:32:34 +00:00
public:
using Visitor = InDepthNodeVisitor<ReplaceWindowIdMatcher, true>;
struct Data
{
String window_name;
};
static bool needChildVisit(ASTPtr &, const ASTPtr &) { return true; }
2020-07-20 13:32:34 +00:00
static void visit(ASTPtr & ast, Data & data)
{
2020-07-20 13:32:34 +00:00
if (auto * t = ast->as<ASTFunction>())
2020-06-17 15:06:19 +00:00
{
2021-12-07 08:14:00 +00:00
if (t->name == "windowID")
2020-07-20 13:32:34 +00:00
t->name = data.window_name;
2020-06-17 15:06:19 +00:00
}
}
};
2021-12-07 08:14:00 +00:00
/// GROUP BY tumble(now(), INTERVAL '5' SECOND)
2021-11-24 09:55:36 +00:00
/// will become
2021-12-07 08:14:00 +00:00
/// GROUP BY tumble(____timestamp, INTERVAL '5' SECOND)
2020-07-19 18:56:55 +00:00
struct ReplaceFunctionNowData
2020-03-22 15:03:16 +00:00
{
using TypeToVisit = ASTFunction;
bool is_time_column_func_now = false;
String window_id_name;
2021-12-03 09:48:18 +00:00
String now_timezone;
2020-03-22 15:03:16 +00:00
void visit(ASTFunction & node, ASTPtr & node_ptr)
{
2021-12-07 08:14:00 +00:00
if (node.name == "windowID" || node.name == "tumble" || node.name == "hop")
2020-03-22 15:03:16 +00:00
{
2021-11-24 09:55:36 +00:00
if (const auto * t = node.arguments->children[0]->as<ASTFunction>();
t && t->name == "now")
2020-03-22 15:03:16 +00:00
{
2021-12-03 09:48:18 +00:00
if (!t->children.empty())
{
const auto & children = t->children[0]->as<ASTExpressionList>()->children;
if (!children.empty())
{
const auto * timezone_ast = children[0]->as<ASTLiteral>();
if (timezone_ast)
now_timezone = timezone_ast->value.safeGet<String>();
}
}
2020-03-22 15:03:16 +00:00
is_time_column_func_now = true;
node_ptr->children[0]->children[0] = std::make_shared<ASTIdentifier>("____timestamp");
window_id_name = node.getColumnName();
2020-03-22 15:03:16 +00:00
}
}
}
};
2020-07-19 18:56:55 +00:00
using ReplaceFunctionNowVisitor = InDepthNodeVisitor<OneTypeMatcher<ReplaceFunctionNowData>, true>;
2020-07-20 13:32:34 +00:00
struct ReplaceFunctionWindowMatcher
{
2020-07-20 13:32:34 +00:00
using Visitor = InDepthNodeVisitor<ReplaceFunctionWindowMatcher, true>;
struct Data{};
2020-07-20 13:32:34 +00:00
static bool needChildVisit(ASTPtr &, const ASTPtr &) { return true; }
static void visit(ASTPtr & ast, Data &)
{
2020-07-20 13:32:34 +00:00
if (auto * t = ast->as<ASTFunction>())
{
2021-12-07 08:14:00 +00:00
if (t->name == "hop" || t->name == "tumble")
t->name = "windowID";
2020-07-20 13:32:34 +00:00
}
2020-03-22 15:03:16 +00:00
}
};
2020-07-19 18:56:55 +00:00
class ToIdentifierMatcher
2020-02-17 05:06:03 +00:00
{
public:
2020-07-19 18:56:55 +00:00
using Visitor = InDepthNodeVisitor<ToIdentifierMatcher, true>;
2020-02-17 05:06:03 +00:00
struct Data
{
String window_id_name;
String window_id_alias;
2020-02-17 05:06:03 +00:00
};
2020-03-22 15:03:16 +00:00
static bool needChildVisit(ASTPtr &, const ASTPtr &) { return true; }
2020-02-17 05:06:03 +00:00
static void visit(ASTPtr & ast, Data & data)
{
if (const auto * t = ast->as<ASTFunction>())
visit(*t, ast, data);
2020-03-22 15:03:16 +00:00
if (const auto * t = ast->as<ASTIdentifier>())
visit(*t, ast, data);
2020-02-17 05:06:03 +00:00
}
private:
static void visit(const ASTFunction & node, ASTPtr & node_ptr, Data &)
2020-02-17 05:06:03 +00:00
{
if (node.name == "tuple")
2021-11-30 08:16:24 +00:00
{
2021-12-07 08:14:00 +00:00
/// tuple(windowID(timestamp, toIntervalSecond('5')))
return;
2021-11-30 08:16:24 +00:00
}
else
2021-11-30 08:16:24 +00:00
{
2021-12-07 08:14:00 +00:00
/// windowID(timestamp, toIntervalSecond('5')) -> identifier.
2021-11-30 08:16:24 +00:00
/// and other...
node_ptr = std::make_shared<ASTIdentifier>(node.getColumnName());
2021-11-30 08:16:24 +00:00
}
2020-02-17 05:06:03 +00:00
}
2020-03-22 15:03:16 +00:00
static void visit(const ASTIdentifier & node, ASTPtr & node_ptr, Data & data)
{
if (node.getColumnName() == data.window_id_alias)
2021-05-29 10:29:38 +00:00
{
if (auto identifier = std::dynamic_pointer_cast<ASTIdentifier>(node_ptr))
identifier->setShortName(data.window_id_name);
}
2020-03-22 15:03:16 +00:00
}
2020-02-17 05:06:03 +00:00
};
struct DropTableIdentifierMatcher
{
using Visitor = InDepthNodeVisitor<DropTableIdentifierMatcher, true>;
struct Data{};
static bool needChildVisit(ASTPtr &, const ASTPtr &) { return true; }
static void visit(ASTPtr & ast, Data &)
{
if (auto * t = ast->as<ASTIdentifier>())
{
ast = std::make_shared<ASTIdentifier>(t->shortName());
}
}
};
2020-07-21 17:41:03 +00:00
/// Maps an interval unit name ("Second", "Minute", ...) to the corresponding IntervalKind.
/// Throws BAD_ARGUMENTS for an unrecognised name instead of running into undefined behaviour.
IntervalKind strToIntervalKind(const String & interval_str)
{
    if (interval_str == "Nanosecond")
        return IntervalKind::Nanosecond;
    if (interval_str == "Microsecond")
        return IntervalKind::Microsecond;
    if (interval_str == "Millisecond")
        return IntervalKind::Millisecond;
    if (interval_str == "Second")
        return IntervalKind::Second;
    if (interval_str == "Minute")
        return IntervalKind::Minute;
    if (interval_str == "Hour")
        return IntervalKind::Hour;
    if (interval_str == "Day")
        return IntervalKind::Day;
    if (interval_str == "Week")
        return IntervalKind::Week;
    if (interval_str == "Month")
        return IntervalKind::Month;
    if (interval_str == "Quarter")
        return IntervalKind::Quarter;
    if (interval_str == "Year")
        return IntervalKind::Year;

    /// The name is derived from a function name found in the query,
    /// so an unknown value must be reported rather than assumed impossible.
    throw Exception("Unknown interval kind: " + interval_str, ErrorCodes::BAD_ARGUMENTS);
}
2020-07-21 09:43:42 +00:00
2020-07-21 17:41:03 +00:00
void extractWindowArgument(const ASTPtr & ast, IntervalKind::Kind & kind, Int64 & num_units, String err_msg)
2020-07-21 09:43:42 +00:00
{
const auto * arg = ast->as<ASTFunction>();
if (!arg || !startsWith(arg->name, "toInterval"))
throw Exception(err_msg, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
2021-11-30 08:16:24 +00:00
kind = strToIntervalKind(arg->name.substr(10));
2020-07-21 09:43:42 +00:00
const auto * interval_unit = arg->children.front()->children.front()->as<ASTLiteral>();
if (!interval_unit
2021-11-22 15:09:45 +00:00
|| (interval_unit->value.getType() != Field::Types::String
&& interval_unit->value.getType() != Field::Types::UInt64))
2020-07-21 09:43:42 +00:00
throw Exception("Interval argument must be integer", ErrorCodes::BAD_ARGUMENTS);
2021-11-22 15:09:45 +00:00
2020-07-21 09:43:42 +00:00
if (interval_unit->value.getType() == Field::Types::String)
2021-11-22 15:09:45 +00:00
num_units = parse<Int64>(interval_unit->value.safeGet<String>());
2020-07-21 09:43:42 +00:00
else
num_units = interval_unit->value.safeGet<UInt64>();
if (num_units <= 0)
throw Exception("Value for Interval argument must be positive.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
2020-07-26 19:11:27 +00:00
/// Shifts `time_sec` by `num_units` intervals of the given kind using the given time zone.
/// Sub-second interval kinds are rejected with SYNTAX_ERROR.
UInt32 addTime(UInt32 time_sec, IntervalKind::Kind kind, Int64 num_units, const DateLUTImpl & time_zone)
{
    switch (kind)
    {
        case IntervalKind::Nanosecond:
        case IntervalKind::Microsecond:
        case IntervalKind::Millisecond:
            throw Exception("Fractional seconds are not supported by windows yet", ErrorCodes::SYNTAX_ERROR);
        case IntervalKind::Second:
            return AddTime<IntervalKind::Second>::execute(time_sec, num_units, time_zone);
        case IntervalKind::Minute:
            return AddTime<IntervalKind::Minute>::execute(time_sec, num_units, time_zone);
        case IntervalKind::Hour:
            return AddTime<IntervalKind::Hour>::execute(time_sec, num_units, time_zone);
        case IntervalKind::Day:
            return AddTime<IntervalKind::Day>::execute(time_sec, num_units, time_zone);
        case IntervalKind::Week:
            return AddTime<IntervalKind::Week>::execute(time_sec, num_units, time_zone);
        case IntervalKind::Month:
            return AddTime<IntervalKind::Month>::execute(time_sec, num_units, time_zone);
        case IntervalKind::Quarter:
            return AddTime<IntervalKind::Quarter>::execute(time_sec, num_units, time_zone);
        case IntervalKind::Year:
            return AddTime<IntervalKind::Year>::execute(time_sec, num_units, time_zone);
    }
    __builtin_unreachable();
}
class AddingAggregatedChunkInfoTransform : public ISimpleTransform
{
public:
2022-05-07 17:33:42 +00:00
explicit AddingAggregatedChunkInfoTransform(Block header) : ISimpleTransform(header, header, false) { }
void transform(Chunk & chunk) override { chunk.setChunkInfo(std::make_shared<AggregatedChunkInfo>()); }
String getName() const override { return "AddingAggregatedChunkInfoTransform"; }
};
2022-05-14 16:46:49 +00:00
2022-05-15 17:40:42 +00:00
/// Name of the inner table: ".inner." followed by the view's UUID when it has one,
/// otherwise by the view's table name.
String generateInnerTableName(const StorageID & storage_id)
{
    const String suffix = storage_id.hasUUID() ? toString(storage_id.uuid) : storage_id.getTableName();
    return ".inner." + suffix;
}
2022-05-15 17:40:42 +00:00
/// Name of the inner target table: ".inner.target." followed by the view's UUID when it has one,
/// otherwise by the view's table name.
String generateTargetTableName(const StorageID & storage_id)
{
    const String suffix = storage_id.hasUUID() ? toString(storage_id.uuid) : storage_id.table_name;
    return ".inner.target." + suffix;
}
2022-05-15 17:40:42 +00:00
/// Builds the AST of `SELECT * FROM <inner_table_id>`.
ASTPtr generateInnerFetchQuery(StorageID inner_table_id)
{
    /// FROM part, assembled bottom-up: identifier -> table expression -> tables element -> tables list.
    auto table_expression = std::make_shared<ASTTableExpression>();
    table_expression->database_and_table_name = std::make_shared<ASTTableIdentifier>(inner_table_id);
    table_expression->children.push_back(table_expression->database_and_table_name);

    auto tables_element = std::make_shared<ASTTablesInSelectQueryElement>();
    tables_element->table_expression = table_expression;
    tables_element->children.push_back(table_expression);

    auto tables_list = std::make_shared<ASTTablesInSelectQuery>();
    tables_list->children.push_back(tables_element);

    /// SELECT part: a single asterisk.
    auto select_list = std::make_shared<ASTExpressionList>();
    select_list->children.push_back(std::make_shared<ASTAsterisk>());

    auto fetch_query = std::make_shared<ASTSelectQuery>();
    fetch_query->setExpression(ASTSelectQuery::Expression::SELECT, select_list);
    fetch_query->setExpression(ASTSelectQuery::Expression::TABLES, tables_list);
    return fetch_query;
}
2020-01-14 03:07:31 +00:00
}
2021-12-06 07:12:21 +00:00
/// Finds the table the SELECT query reads from, descending into a subquery if necessary.
///
/// select_database_name - in: database used when the query does not name one; out: database of the found table.
/// select_table_name    - out: name of the found table (left untouched if the query has no table and no subquery).
///
/// When the table is referenced without a database, the query is rewritten to use `select_database_name`.
/// Throws QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW if the subquery is a UNION of several selects.
static void extractDependentTable(ContextPtr context, ASTPtr & query, String & select_database_name, String & select_table_name)
{
    ASTSelectQuery & select_query = typeid_cast<ASTSelectQuery &>(*query);

    auto db_and_table = getDatabaseAndTable(select_query, 0);
    ASTPtr subquery = extractTableExpression(select_query, 0);

    if (db_and_table)
    {
        select_table_name = db_and_table->table;

        if (!db_and_table->database.empty())
        {
            select_database_name = db_and_table->database;
            return;
        }

        /// No database in the query: qualify the table with the provided one.
        db_and_table->database = select_database_name;
        AddDefaultDatabaseVisitor visitor(context, select_database_name);
        visitor.visit(select_query);
        return;
    }

    if (!subquery)
        return;

    auto * union_query = subquery->as<ASTSelectWithUnionQuery>();
    if (!union_query)
        throw Exception(
            "Logical error while creating StorageWindowView."
            " Could not retrieve table name from select query.",
            DB::ErrorCodes::LOGICAL_ERROR);

    if (union_query->list_of_selects->children.size() != 1)
        throw Exception("UNION is not supported for WINDOW VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);

    auto & nested_select = union_query->list_of_selects->children.at(0);
    extractDependentTable(context, nested_select, select_database_name, select_table_name);
}
2020-07-13 14:31:54 +00:00
/// Returns the bound passed to the cleanup query (see getCleanupQuery):
///  - 0 if nothing has been fired yet;
///  - the max fired watermark in processing-time mode;
///  - otherwise the lower bound of the window containing the max fired watermark,
///    moved back by the allowed lateness when lateness is enabled.
UInt32 StorageWindowView::getCleanupBound()
{
    if (max_fired_watermark == 0)
        return 0;

    if (is_proctime)
        return max_fired_watermark;

    auto bound = max_fired_watermark;
    if (allowed_lateness)
        bound = addTime(bound, lateness_kind, -lateness_num_units, *time_zone);

    return getWindowLowerBound(bound);
}
2021-05-28 07:36:19 +00:00
/// Builds `ALTER TABLE <inner table> DELETE WHERE <window id column> < <cleanup bound>`.
ASTPtr StorageWindowView::getCleanupQuery()
{
    /// Predicate: window id column is less than the cleanup bound.
    ASTPtr delete_predicate = makeASTFunction(
        "less",
        std::make_shared<ASTIdentifier>(inner_window_id_column_name),
        std::make_shared<ASTLiteral>(getCleanupBound()));

    auto cleanup_query = std::make_shared<ASTAlterQuery>();
    cleanup_query->setDatabase(inner_table_id.database_name);
    cleanup_query->setTable(inner_table_id.table_name);
    cleanup_query->uuid = inner_table_id.uuid;
    cleanup_query->set(cleanup_query->command_list, std::make_shared<ASTExpressionList>());
    cleanup_query->alter_object = ASTAlterQuery::AlterObjectType::TABLE;

    auto delete_command = std::make_shared<ASTAlterCommand>();
    delete_command->type = ASTAlterCommand::DELETE;
    delete_command->predicate = delete_predicate;
    delete_command->children.push_back(delete_command->predicate);

    cleanup_query->command_list->children.push_back(delete_command);
    return cleanup_query;
}
2021-05-28 07:36:19 +00:00
void StorageWindowView::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
2020-02-17 08:18:27 +00:00
{
2021-11-20 15:34:45 +00:00
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Truncate, getContext(), local_context, inner_table_id, true);
2020-02-17 08:18:27 +00:00
}
2020-07-15 14:38:50 +00:00
bool StorageWindowView::optimize(
const ASTPtr & query,
2021-05-28 07:36:19 +00:00
const StorageMetadataPtr & /*metadata_snapshot*/,
2020-07-15 14:38:50 +00:00
const ASTPtr & partition,
bool final,
bool deduplicate,
2021-05-28 07:36:19 +00:00
const Names & deduplicate_by_columns,
ContextPtr local_context)
2020-02-22 17:06:10 +00:00
{
2021-05-28 07:36:19 +00:00
auto storage_ptr = getInnerStorage();
auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
return getInnerStorage()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, local_context);
2020-02-22 17:06:10 +00:00
}
2021-11-19 13:09:12 +00:00
/// Computes the result blocks for the window identified by `watermark`.
///
/// The inner table is read, filtered down to the rows of this window, extended with the window column,
/// stripped of the window id column, and then fed through the final query.
/// Returns the non-empty result blocks together with the header of the final pipeline.
std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
{
    /// Start of the window: the watermark moved back by one window size.
    UInt32 w_start = addTime(watermark, window_kind, -window_num_units, *time_zone);

    auto inner_storage = getInnerStorage();
    InterpreterSelectQuery fetch(
        inner_fetch_query,
        getContext(),
        inner_storage,
        inner_storage->getInMemoryMetadataPtr(),
        SelectQueryOptions(QueryProcessingStage::FetchColumns));

    auto builder = fetch.buildQueryPipeline();

    ASTPtr filter_function;
    if (is_tumble)
    {
        /// SELECT * FROM inner_table WHERE window_id_name == w_end
        /// (because we fire at the end of windows)
        filter_function = makeASTFunction(
            "equals",
            std::make_shared<ASTIdentifier>(inner_window_id_column_name),
            std::make_shared<ASTLiteral>(watermark));
    }
    else
    {
        /// slice_num_units = std::gcd(hop_num_units, window_num_units);
        /// We use std::gcd(hop_num_units, window_num_units) as the new window size
        /// to split the overlapped windows into non-overlapped.
        /// For a hopping window with window_size=3 slice=1, the windows might be
        /// [1,3],[2,4],[3,5], which will cause recomputation.
        /// In this case, the slice_num_units will be `gcd(1,3)=1' and the non-overlapped
        /// windows will split into [1], [2], [3]... We compute each split window into
        /// mergeable state and merge them when the window is triggering.
        auto slice_ids = makeASTFunction("array");
        for (auto w_end = watermark; w_start < w_end; w_end = addTime(w_end, window_kind, -slice_num_units, *time_zone))
            slice_ids->arguments->children.push_back(std::make_shared<ASTLiteral>(w_end));

        filter_function = makeASTFunction("has", slice_ids, std::make_shared<ASTIdentifier>(inner_window_id_column_name));
    }

    auto syntax_result = TreeRewriter(getContext()).analyze(filter_function, builder.getHeader().getNamesAndTypesList());
    auto filter_expression = ExpressionAnalyzer(filter_function, syntax_result, getContext()).getActionsDAG(false);

    builder.addSimpleTransform([&](const Block & stream_header)
    {
        return std::make_shared<FilterTransform>(
            stream_header,
            std::make_shared<ExpressionActions>(filter_expression),
            filter_function->getColumnName(),
            true);
    });

    /// Adding window column: a constant tuple (window start, watermark).
    DataTypes window_column_type{std::make_shared<DataTypeDateTime>(), std::make_shared<DataTypeDateTime>()};
    ColumnWithTypeAndName window_column;
    window_column.name = inner_window_column_name;
    window_column.type = std::make_shared<DataTypeTuple>(std::move(window_column_type));
    window_column.column = window_column.type->createColumnConst(0, Tuple{w_start, watermark});

    auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(window_column));
    auto adding_column_actions = std::make_shared<ExpressionActions>(
        std::move(adding_column_dag),
        ExpressionActionsSettings::fromContext(getContext()));

    builder.addSimpleTransform([&](const Block & stream_header)
    {
        return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions);
    });

    /// Removing window id column
    auto header_without_window_id = builder.getHeader();
    header_without_window_id.erase(inner_window_id_column_name);

    auto convert_actions_dag = ActionsDAG::makeConvertingActions(
        builder.getHeader().getColumnsWithTypeAndName(),
        header_without_window_id.getColumnsWithTypeAndName(),
        ActionsDAG::MatchColumnsMode::Name);
    auto convert_actions = std::make_shared<ExpressionActions>(
        convert_actions_dag,
        ExpressionActionsSettings::fromContext(getContext(), CompileExpressions::yes));

    builder.addSimpleTransform([&](const Block & stream_header)
    {
        return std::make_shared<ExpressionTransform>(stream_header, convert_actions);
    });

    builder.addSimpleTransform([&](const Block & stream_header)
    {
        return std::make_shared<AddingAggregatedChunkInfoTransform>(stream_header);
    });

    Pipes pipes;
    pipes.emplace_back(QueryPipelineBuilder::getPipe(std::move(builder)));

    /// The prepared pipe is exposed as a temporary StorageBlocks table
    /// with the parent table's columns plus ____timestamp.
    auto create_blocks_storage = [&](const StorageID & blocks_id_global)
    {
        auto parent_table_metadata = getParentStorage()->getInMemoryMetadataPtr();
        auto required_columns = parent_table_metadata->getColumns();
        required_columns.add(ColumnDescription("____timestamp", std::make_shared<DataTypeDateTime>()));
        return StorageBlocks::createStorage(
            blocks_id_global,
            required_columns,
            std::move(pipes),
            QueryProcessingStage::WithMergeableState);
    };

    TemporaryTableHolder blocks_storage(getContext(), create_blocks_storage);

    /// Run the final query on top of the temporary table.
    InterpreterSelectQuery select(
        getFinalQuery(),
        getContext(),
        blocks_storage.getTable(),
        blocks_storage.getTable()->getInMemoryMetadataPtr(),
        SelectQueryOptions(QueryProcessingStage::Complete));

    builder = select.buildQueryPipeline();

    builder.addSimpleTransform([&](const Block & current_header)
    {
        return std::make_shared<MaterializingTransform>(current_header);
    });

    builder.addSimpleTransform([&](const Block & current_header)
    {
        return std::make_shared<SquashingChunksTransform>(
            current_header,
            getContext()->getSettingsRef().min_insert_block_size_rows,
            getContext()->getSettingsRef().min_insert_block_size_bytes);
    });

    auto header = builder.getHeader();
    auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));

    PullingAsyncPipelineExecutor executor(pipeline);
    BlocksPtr new_blocks = std::make_shared<Blocks>();

    /// Collect only non-empty blocks.
    Block pulled;
    while (executor.pull(pulled))
    {
        if (pulled.rows() != 0)
            new_blocks->push_back(std::move(pulled));
    }

    return std::make_pair(new_blocks, header);
}
2020-03-01 18:08:52 +00:00
/// Fires the window identified by `watermark`: computes its result blocks,
/// hands them to the live watch streams and inserts them into the target table (if there is one).
/// A failure while computing the blocks is logged and the window is skipped.
inline void StorageWindowView::fire(UInt32 watermark)
{
    LOG_TRACE(log, "Watch streams number: {}, target table: {}",
              watch_streams.size(),
              target_table_id.empty() ? "None" : target_table_id.getNameForLogs());

    /// Nobody consumes the result: nothing to do.
    if (target_table_id.empty() && watch_streams.empty())
        return;

    BlocksPtr blocks;
    Block header;
    try
    {
        std::lock_guard lock(mutex);
        std::tie(blocks, header) = getNewBlocks(watermark);
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    /// `blocks` stays null when getNewBlocks threw.
    if (!blocks || blocks->empty())
        return;

    for (const auto & block : *blocks)
    {
        for (auto & weak_stream : watch_streams)
        {
            /// Streams that are already gone are skipped.
            auto stream = weak_stream.lock();
            if (stream)
                stream->addBlock(block, watermark);
        }
        fire_condition.notify_all();
    }

    if (target_table_id.empty())
        return;

    /// Insert the blocks into the target table.
    StoragePtr target_table = getTargetStorage();
    auto insert_query = std::make_shared<ASTInsertQuery>();
    insert_query->table_id = target_table->getStorageID();
    InterpreterInsertQuery interpreter(insert_query, getContext());
    auto block_io = interpreter.execute();

    auto pipe = Pipe(std::make_shared<BlocksSource>(blocks, header));

    /// Columns are matched to the insert pipeline's header by position.
    auto convert_actions_dag = ActionsDAG::makeConvertingActions(
        pipe.getHeader().getColumnsWithTypeAndName(),
        block_io.pipeline.getHeader().getColumnsWithTypeAndName(),
        ActionsDAG::MatchColumnsMode::Position);
    auto convert_actions = std::make_shared<ExpressionActions>(
        convert_actions_dag,
        ExpressionActionsSettings::fromContext(getContext(), CompileExpressions::yes));

    pipe.addSimpleTransform([&](const Block & stream_header)
    {
        return std::make_shared<ExpressionTransform>(stream_header, convert_actions);
    });

    block_io.pipeline.complete(std::move(pipe));
    CompletedPipelineExecutor executor(block_io.pipeline);
    executor.execute();
}
2021-11-22 15:09:45 +00:00
std::shared_ptr<ASTCreateQuery> StorageWindowView::getInnerTableCreateQuery(
const ASTPtr & inner_query, ASTStorage * storage, const String & database_name, const String & table_name)
2020-02-12 17:39:57 +00:00
{
/// We will create a query to create an internal table.
auto inner_create_query = std::make_shared<ASTCreateQuery>();
2021-11-20 10:10:16 +00:00
inner_create_query->setDatabase(database_name);
inner_create_query->setTable(table_name);
2020-02-12 17:39:57 +00:00
2021-12-31 07:07:01 +00:00
Aliases aliases;
QueryAliasesVisitor(aliases).visit(inner_query);
auto inner_query_normalized = inner_query->clone();
QueryNormalizer::Data normalizer_data(aliases, {}, false, getContext()->getSettingsRef(), false);
QueryNormalizer(normalizer_data).visit(inner_query_normalized);
auto inner_select_query = std::static_pointer_cast<ASTSelectQuery>(inner_query_normalized);
2020-03-24 02:46:56 +00:00
auto t_sample_block
= InterpreterSelectQuery(inner_select_query, getContext(), SelectQueryOptions(QueryProcessingStage::WithMergeableState))
2022-04-01 09:58:23 +00:00
.getSampleBlock();
2020-02-12 17:39:57 +00:00
auto columns_list = std::make_shared<ASTExpressionList>();
2020-02-17 05:06:03 +00:00
if (is_time_column_func_now)
2020-02-17 05:06:03 +00:00
{
auto column_window = std::make_shared<ASTColumnDeclaration>();
column_window->name = window_id_name;
column_window->type = std::make_shared<ASTIdentifier>("UInt32");
2020-02-17 05:06:03 +00:00
columns_list->children.push_back(column_window);
2022-05-18 08:05:06 +00:00
inner_window_id_column_name = window_id_name;
2020-02-17 05:06:03 +00:00
}
2020-06-17 15:06:19 +00:00
for (const auto & column : t_sample_block.getColumnsWithTypeAndName())
2020-02-12 17:39:57 +00:00
{
ParserIdentifierWithOptionalParameters parser;
2020-03-24 02:46:56 +00:00
String sql = column.type->getName();
2020-06-09 08:48:04 +00:00
ASTPtr ast = parseQuery(parser, sql.data(), sql.data() + sql.size(), "data type", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
2020-02-12 17:39:57 +00:00
auto column_dec = std::make_shared<ASTColumnDeclaration>();
2020-03-24 02:46:56 +00:00
column_dec->name = column.name;
2020-02-12 17:39:57 +00:00
column_dec->type = ast;
columns_list->children.push_back(column_dec);
2022-05-18 08:05:06 +00:00
if (!is_time_column_func_now && inner_window_id_column_name.empty() && startsWith(column.name, "windowID"))
2021-12-31 07:07:01 +00:00
{
2022-05-18 08:05:06 +00:00
inner_window_id_column_name = column.name;
2021-12-31 07:07:01 +00:00
}
2020-02-12 17:39:57 +00:00
}
2022-05-18 08:05:06 +00:00
if (inner_window_id_column_name.empty())
2021-12-31 07:07:01 +00:00
throw Exception(
"The first argument of time window function should not be a constant value.",
ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);
2022-05-18 08:05:06 +00:00
inner_window_column_name = std::regex_replace(inner_window_id_column_name, std::regex("windowID"), is_tumble ? "tumble" : "hop");
2022-02-24 06:06:37 +00:00
ToIdentifierMatcher::Data query_data;
query_data.window_id_name = window_id_name;
query_data.window_id_alias = window_id_alias;
ToIdentifierMatcher::Visitor to_identifier_visitor(query_data);
ReplaceFunctionNowData time_now_data;
ReplaceFunctionNowVisitor time_now_visitor(time_now_data);
ReplaceFunctionWindowMatcher::Data func_hop_data;
ReplaceFunctionWindowMatcher::Visitor func_window_visitor(func_hop_data);
DropTableIdentifierMatcher::Data drop_table_identifier_data;
DropTableIdentifierMatcher::Visitor drop_table_identifier_visitor(drop_table_identifier_data);
auto visit = [&](const IAST * ast)
{
auto node = ast->clone();
QueryNormalizer(normalizer_data).visit(node);
/// now() -> ____timestamp
if (is_time_column_func_now)
{
time_now_visitor.visit(node);
function_now_timezone = time_now_data.now_timezone;
}
drop_table_identifier_visitor.visit(node);
/// tumble/hop -> windowID
func_window_visitor.visit(node);
to_identifier_visitor.visit(node);
node->setAlias("");
return node;
};
auto new_storage = std::make_shared<ASTStorage>();
2022-05-14 16:46:49 +00:00
/// storage != nullptr in case create window view with INNER ENGINE syntax
2021-11-24 09:55:36 +00:00
if (storage)
2020-03-22 15:03:16 +00:00
{
if (storage->ttl_table)
2021-11-22 15:09:45 +00:00
throw Exception(
ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW,
"TTL is not supported for inner table in Window View");
2022-05-14 16:46:49 +00:00
new_storage->set(new_storage->engine, storage->engine->clone());
if (endsWith(storage->engine->name, "MergeTree"))
{
if (storage->partition_by)
new_storage->set(new_storage->partition_by, visit(storage->partition_by));
if (storage->primary_key)
new_storage->set(new_storage->primary_key, visit(storage->primary_key));
if (storage->order_by)
new_storage->set(new_storage->order_by, visit(storage->order_by));
if (storage->sample_by)
new_storage->set(new_storage->sample_by, visit(storage->sample_by));
if (storage->settings)
new_storage->set(new_storage->settings, storage->settings->clone());
}
2020-03-22 15:03:16 +00:00
}
2021-11-24 09:55:36 +00:00
else
{
new_storage->set(new_storage->engine, makeASTFunction("AggregatingMergeTree"));
2022-02-24 06:06:37 +00:00
if (inner_select_query->groupBy()->children.size() == 1) //GROUP BY windowID
{
auto node = visit(inner_select_query->groupBy()->children[0].get());
new_storage->set(new_storage->order_by, std::make_shared<ASTIdentifier>(node->getColumnName()));
}
else
{
auto group_by_function = makeASTFunction("tuple");
for (auto & child : inner_select_query->groupBy()->children)
{
auto node = visit(child.get());
group_by_function->arguments->children.push_back(std::make_shared<ASTIdentifier>(node->getColumnName()));
}
new_storage->set(new_storage->order_by, group_by_function);
}
2021-11-24 09:55:36 +00:00
}
2020-03-22 15:03:16 +00:00
auto new_columns = std::make_shared<ASTColumns>();
new_columns->set(new_columns->columns, columns_list);
inner_create_query->set(inner_create_query->columns_list, new_columns);
inner_create_query->set(inner_create_query->storage, new_storage);
2020-02-12 17:39:57 +00:00
return inner_create_query;
2020-02-12 17:39:57 +00:00
}
2021-06-01 03:01:35 +00:00
/// Computes the lower bound of the window for the given timestamp (in seconds).
///
/// For a tumble window this is `time_sec` passed through ToStartOfTransform with the window size.
/// For a hop window the end of the hop interval containing `time_sec` is computed first
/// and the window size is then subtracted from it.
///
/// Throws SYNTAX_ERROR for sub-second interval kinds.
UInt32 StorageWindowView::getWindowLowerBound(UInt32 time_sec)
{
    switch (slide_kind)
    {
        case IntervalKind::Nanosecond:
        case IntervalKind::Microsecond:
        case IntervalKind::Millisecond:
            throw Exception("Fractional seconds are not supported by windows yet", ErrorCodes::SYNTAX_ERROR);
#define WINDOW_LOWER_BOUND_CASE(KIND) \
        case IntervalKind::KIND: \
        { \
            if (!is_tumble) \
            { \
                UInt32 hop_start = ToStartOfTransform<IntervalKind::KIND>::execute(time_sec, hop_num_units, *time_zone); \
                UInt32 hop_end = AddTime<IntervalKind::KIND>::execute(hop_start, hop_num_units, *time_zone); \
                return AddTime<IntervalKind::KIND>::execute(hop_end, -window_num_units, *time_zone); \
            } \
            return ToStartOfTransform<IntervalKind::KIND>::execute(time_sec, window_num_units, *time_zone); \
        }
        WINDOW_LOWER_BOUND_CASE(Second)
        WINDOW_LOWER_BOUND_CASE(Minute)
        WINDOW_LOWER_BOUND_CASE(Hour)
        WINDOW_LOWER_BOUND_CASE(Day)
        WINDOW_LOWER_BOUND_CASE(Week)
        WINDOW_LOWER_BOUND_CASE(Month)
        WINDOW_LOWER_BOUND_CASE(Quarter)
        WINDOW_LOWER_BOUND_CASE(Year)
#undef WINDOW_LOWER_BOUND_CASE
    }
    /// Every enumerator either returns or throws above.
    __builtin_unreachable();
}
2021-06-01 03:01:35 +00:00
/// Computes the upper bound of the slide interval for the given timestamp (in seconds):
/// `time_sec` is passed through ToStartOfTransform with the slide size, then one slide is added.
///
/// Throws SYNTAX_ERROR for sub-second interval kinds.
UInt32 StorageWindowView::getWindowUpperBound(UInt32 time_sec)
{
    switch (slide_kind)
    {
        case IntervalKind::Nanosecond:
        case IntervalKind::Microsecond:
        case IntervalKind::Millisecond:
            throw Exception("Fractional seconds are not supported by window view yet", ErrorCodes::SYNTAX_ERROR);
#define WINDOW_UPPER_BOUND_CASE(KIND) \
        case IntervalKind::KIND: \
        { \
            UInt32 slide_start = ToStartOfTransform<IntervalKind::KIND>::execute(time_sec, slide_num_units, *time_zone); \
            return AddTime<IntervalKind::KIND>::execute(slide_start, slide_num_units, *time_zone); \
        }
        WINDOW_UPPER_BOUND_CASE(Second)
        WINDOW_UPPER_BOUND_CASE(Minute)
        WINDOW_UPPER_BOUND_CASE(Hour)
        WINDOW_UPPER_BOUND_CASE(Day)
        WINDOW_UPPER_BOUND_CASE(Week)
        WINDOW_UPPER_BOUND_CASE(Month)
        WINDOW_UPPER_BOUND_CASE(Quarter)
        WINDOW_UPPER_BOUND_CASE(Year)
#undef WINDOW_UPPER_BOUND_CASE
    }
    /// Every enumerator either returns or throws above.
    __builtin_unreachable();
}
2021-11-20 15:34:45 +00:00
void StorageWindowView::addFireSignal(std::set<UInt32> & signals)
{
2020-03-03 04:42:12 +00:00
std::lock_guard lock(fire_signal_mutex);
2020-06-17 15:06:19 +00:00
for (const auto & signal : signals)
2020-03-23 03:21:01 +00:00
fire_signal.push_back(signal);
2020-03-01 18:08:52 +00:00
fire_signal_condition.notify_all();
}
2021-11-20 15:34:45 +00:00
/// Raises `max_timestamp` to `timestamp` if the latter is larger. Guarded by `fire_signal_mutex`.
void StorageWindowView::updateMaxTimestamp(UInt32 timestamp)
{
    std::lock_guard guard(fire_signal_mutex);
    if (max_timestamp < timestamp)
        max_timestamp = timestamp;
}
2021-11-20 15:34:45 +00:00
/// Advances `max_watermark` according to a newly observed watermark and enqueues
/// a fire signal for every slide step that has been passed.
/// Waiters on `fire_signal_condition` are notified only when the watermark is considered updated.
void StorageWindowView::updateMaxWatermark(UInt32 watermark)
{
    std::lock_guard guard(fire_signal_mutex);

    /// The very first watermark only initializes the state; nothing is fired.
    if (max_watermark == 0)
    {
        max_watermark = getWindowUpperBound(watermark - 1);
        return;
    }

    bool updated = false;
    if (is_watermark_strictly_ascending)
    {
        updated = max_watermark < watermark;
        for (; max_watermark < watermark; max_watermark = addTime(max_watermark, slide_kind, slide_num_units, *time_zone))
            fire_signal.push_back(max_watermark);
    }
    else
    {
        /// Any other watermark mode: the watermark lags behind by the configured watermark interval.
        UInt32 biased_watermark = addTime(max_watermark, watermark_kind, watermark_num_units, *time_zone);
        updated = biased_watermark <= watermark;
        while (biased_watermark <= max_timestamp)
        {
            fire_signal.push_back(max_watermark);
            max_watermark = addTime(max_watermark, slide_kind, slide_num_units, *time_zone);
            /// NOTE(review): the bias is recomputed with the slide interval here, while it is
            /// initialized with the watermark interval above - confirm this is intended.
            biased_watermark = addTime(max_watermark, slide_kind, slide_num_units, *time_zone);
        }
    }

    if (!updated)
        return;

    fire_signal_condition.notify_all();
}
2021-11-20 15:34:45 +00:00
inline void StorageWindowView::cleanup()
{
2022-05-16 12:51:19 +00:00
std::lock_guard fire_signal_lock(fire_signal_mutex);
std::lock_guard mutex_lock(mutex);
2022-05-16 11:31:49 +00:00
auto alter_query = getCleanupQuery();
InterpreterAlterQuery interpreter_alter(alter_query, getContext());
interpreter_alter.execute();
2021-11-20 15:34:45 +00:00
watch_streams.remove_if([](std::weak_ptr<WindowViewSource> & ptr) { return ptr.expired(); });
}
2020-07-13 14:31:54 +00:00
void StorageWindowView::threadFuncCleanup()
2020-02-12 17:39:57 +00:00
{
2021-11-20 15:34:45 +00:00
try
2020-02-12 17:39:57 +00:00
{
2021-11-20 15:34:45 +00:00
cleanup();
2020-02-12 17:39:57 +00:00
}
2021-11-20 15:34:45 +00:00
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (!shutdown_called)
clean_cache_task->scheduleAfter(clean_interval_ms);
2020-02-12 17:39:57 +00:00
}
2020-03-01 18:08:52 +00:00
void StorageWindowView::threadFuncFireProc()
2020-01-14 03:07:31 +00:00
{
std::unique_lock lock(fire_signal_mutex);
UInt32 timestamp_now = std::time(nullptr);
2021-11-20 15:34:45 +00:00
2022-05-03 04:42:42 +00:00
while (next_fire_signal <= timestamp_now)
2020-01-14 03:07:31 +00:00
{
try
2020-01-14 03:07:31 +00:00
{
2022-05-03 04:42:42 +00:00
fire(next_fire_signal);
2020-01-14 03:07:31 +00:00
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
2022-05-03 04:42:42 +00:00
max_fired_watermark = next_fire_signal;
2022-05-04 04:02:13 +00:00
auto slide_interval = addTime(0, slide_kind, slide_num_units, *time_zone);
/// Convert DayNum into seconds when the slide interval is larger than Day
if (slide_kind > IntervalKind::Day)
2022-05-03 04:42:42 +00:00
slide_interval *= 86400;
next_fire_signal += slide_interval;
2020-03-01 18:08:52 +00:00
}
2020-08-02 18:07:01 +00:00
UInt64 timestamp_ms = static_cast<UInt64>(Poco::Timestamp().epochMicroseconds()) / 1000;
2020-03-01 18:08:52 +00:00
if (!shutdown_called)
2022-04-10 17:28:17 +00:00
fire_task->scheduleAfter(std::max(
UInt64(0),
2022-05-03 04:42:42 +00:00
static_cast<UInt64>(next_fire_signal) * 1000 - timestamp_ms));
2020-03-01 18:08:52 +00:00
}
void StorageWindowView::threadFuncFireEvent()
{
std::unique_lock lock(fire_signal_mutex);
while (!shutdown_called)
{
bool signaled = std::cv_status::no_timeout == fire_signal_condition.wait_for(lock, std::chrono::seconds(5));
if (!signaled)
continue;
2022-01-06 12:48:38 +00:00
LOG_TRACE(log, "Fire events: {}", fire_signal.size());
2020-03-01 18:08:52 +00:00
while (!fire_signal.empty())
{
2020-03-01 18:08:52 +00:00
fire(fire_signal.front());
2022-05-15 16:50:34 +00:00
max_fired_watermark = fire_signal.front();
2020-03-01 18:08:52 +00:00
fire_signal.pop_front();
}
2020-02-14 08:07:03 +00:00
}
}
2022-05-11 09:00:49 +00:00
/// Pipe-returning overload: builds a query plan with the QueryPlan overload of read()
/// and converts that plan into a Pipe using settings taken from `local_context`.
Pipe StorageWindowView::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    const size_t max_block_size,
    const unsigned num_streams)
{
    QueryPlan query_plan;
    read(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);

    return query_plan.convertToPipe(
        QueryPlanOptimizationSettings::fromContext(local_context),
        BuildQueryPipelineSettings::fromContext(local_context));
}
/// Reads from the target table of the window view.
///
/// Does nothing when no target table is set. Otherwise the read is delegated to the target
/// storage; if its header differs from the window view header, a converting step (matching
/// columns by name) is appended. A final step keeps the share lock on the target table.
void StorageWindowView::read(
    QueryPlan & query_plan,
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    const size_t max_block_size,
    const unsigned num_streams)
{
    if (target_table_id.empty())
        return;

    auto target_storage = getTargetStorage();
    auto target_lock = target_storage->lockForShare(
        local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
    auto target_metadata_snapshot = target_storage->getInMemoryMetadataPtr();
    auto target_storage_snapshot = target_storage->getStorageSnapshot(target_metadata_snapshot, local_context);

    if (query_info.order_optimizer)
        query_info.input_order_info = query_info.order_optimizer->getInputOrder(target_metadata_snapshot, local_context);

    target_storage->read(
        query_plan, column_names, target_storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);

    /// The target storage may leave the plan uninitialized; then there is nothing to post-process.
    if (!query_plan.isInitialized())
        return;

    auto wv_header = getHeaderForProcessingStage(column_names, storage_snapshot, query_info, local_context, processed_stage);
    auto target_header = query_plan.getCurrentDataStream().header;

    if (!blocksHaveEqualStructure(wv_header, target_header))
    {
        auto converting_actions = ActionsDAG::makeConvertingActions(
            target_header.getColumnsWithTypeAndName(),
            wv_header.getColumnsWithTypeAndName(),
            ActionsDAG::MatchColumnsMode::Name);
        auto converting_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), converting_actions);
        converting_step->setStepDescription("Convert Target table structure to WindowView structure");
        query_plan.addStep(std::move(converting_step));
    }

    StreamLocalLimits limits;
    SizeLimits leaf_limits;

    /// Add table lock for target table.
    auto lock_holding_step = std::make_unique<SettingQuotaAndLimitsStep>(
        query_plan.getCurrentDataStream(),
        target_storage,
        std::move(target_lock),
        limits,
        leaf_limits,
        nullptr,
        nullptr);
    lock_holding_step->setStepDescription("Lock target table for WindowView");
    query_plan.addStep(std::move(lock_holding_step));
}
2021-11-19 13:09:12 +00:00
/// Creates a WindowViewSource for a WATCH query, registers it in `watch_streams`
/// and returns a Pipe reading from it. `processed_stage` is set to Complete.
/// The optional LIMIT of the WATCH query is passed to the source.
Pipe StorageWindowView::watch(
    const Names & /*column_names*/,
    const SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum & processed_stage,
    size_t /*max_block_size*/,
    const unsigned /*num_streams*/)
{
    ASTWatchQuery & watch_query = typeid_cast<ASTWatchQuery &>(*query_info.query);

    const bool has_limit = watch_query.limit_length != nullptr;
    UInt64 limit = 0;
    if (has_limit)
        limit = safeGet<UInt64>(typeid_cast<ASTLiteral &>(*watch_query.limit_length).value);

    auto source = std::make_shared<WindowViewSource>(
        std::static_pointer_cast<StorageWindowView>(shared_from_this()),
        watch_query.is_watch_events,
        window_view_timezone,
        has_limit,
        limit,
        local_context->getSettingsRef().window_view_heartbeat_interval.totalSeconds());

    std::lock_guard guard(fire_signal_mutex);
    watch_streams.push_back(source);
    processed_stage = QueryProcessingStage::Complete;

    return Pipe(source);
}
/// Constructs a window view from its CREATE/ATTACH query.
///
/// Validates the query (a SELECT without UNION is required, as well as either a TO table or ENGINE),
/// registers the dependency on the source table, derives the mergeable and final queries,
/// parses the watermark/lateness settings and, unless attaching, creates the inner table
/// (and the inner target table when no TO table is given). Background tasks are created
/// in deactivated state; they are started in startup().
StorageWindowView::StorageWindowView(
    const StorageID & table_id_,
    ContextPtr context_,
    const ASTCreateQuery & query,
    const ColumnsDescription & columns_,
    bool attach_)
    : IStorage(table_id_)
    , WithContext(context_->getGlobalContext())
    , log(&Poco::Logger::get(fmt::format("StorageWindowView({}.{})", table_id_.database_name, table_id_.table_name)))
{
    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(columns_);
    setInMemoryMetadata(storage_metadata);

    if (!query.select)
        throw Exception(ErrorCodes::INCORRECT_QUERY, "SELECT query is not specified for {}", getName());

    /// If the target table is not set, use inner target table
    inner_target_table = query.to_table_id.empty();
    if (inner_target_table && !query.storage)
        throw Exception(
            "You must specify where to save results of a WindowView query: either ENGINE or an existing table in a TO clause",
            ErrorCodes::INCORRECT_QUERY);

    if (query.select->list_of_selects->children.size() != 1)
        throw Exception(
            ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW,
            "UNION is not supported for {}", getName());

    select_query = query.select->list_of_selects->children.at(0)->clone();

    /// Find the table the SELECT reads from; a clone of the query is handed to the extractor.
    String select_database_name = getContext()->getCurrentDatabase();
    String select_table_name;
    auto select_query_tmp = select_query->clone();
    extractDependentTable(getContext(), select_query_tmp, select_database_name, select_table_name);

    /// If the table is not specified - use the table `system.one`
    if (select_table_name.empty())
    {
        select_database_name = "system";
        select_table_name = "one";
    }
    select_table_id = StorageID(select_database_name, select_table_name);
    DatabaseCatalog::instance().addDependency(select_table_id, table_id_);

    /// Extract all info from query; substitute Function_tumble and Function_hop with Function_windowID.
    auto inner_query = innerQueryParser(select_query->as<ASTSelectQuery &>());

    /// Parse mergeable query
    mergeable_query = inner_query->clone();
    ReplaceFunctionNowData func_now_data;
    ReplaceFunctionNowVisitor(func_now_data).visit(mergeable_query);
    is_time_column_func_now = func_now_data.is_time_column_func_now;
    if (is_time_column_func_now)
        window_id_name = func_now_data.window_id_name;

    /// Parse final query (same as mergeable query but has tumble/hop instead of windowID)
    final_query = mergeable_query->clone();
    ReplaceWindowIdMatcher::Data final_query_data;
    final_query_data.window_name = is_tumble ? "tumble" : "hop";
    ReplaceWindowIdMatcher::Visitor(final_query_data).visit(final_query);

    is_watermark_strictly_ascending = query.is_watermark_strictly_ascending;
    is_watermark_ascending = query.is_watermark_ascending;
    is_watermark_bounded = query.is_watermark_bounded;

    /// Extract information about watermark, lateness.
    eventTimeParser(query);

    if (attach_)
    {
        /// On ATTACH the inner tables are only referenced by their generated names.
        inner_table_id = StorageID(table_id_.database_name, generateInnerTableName(table_id_));
        if (inner_target_table)
            target_table_id = StorageID(table_id_.database_name, generateTargetTableName(table_id_));
        else
            target_table_id = query.to_table_id;
    }
    else
    {
        /// create inner table
        auto inner_create_query
            = getInnerTableCreateQuery(inner_query, query.inner_storage, table_id_.database_name, generateInnerTableName(table_id_));
        auto inner_create_context = Context::createCopy(context_);
        InterpreterCreateQuery inner_create_interpreter(inner_create_query, inner_create_context);
        inner_create_interpreter.setInternal(true);
        inner_create_interpreter.execute();
        inner_table_id = StorageID(inner_create_query->getDatabase(), inner_create_query->getTable());

        if (inner_target_table)
        {
            /// create inner target table
            auto target_create_context = Context::createCopy(context_);
            auto target_create_query = std::make_shared<ASTCreateQuery>();
            target_create_query->setDatabase(table_id_.database_name);
            target_create_query->setTable(generateTargetTableName(table_id_));

            auto target_columns_list = std::make_shared<ASTColumns>();
            target_columns_list->set(target_columns_list->columns, query.columns_list->columns->ptr());
            target_create_query->set(target_create_query->columns_list, target_columns_list);
            target_create_query->set(target_create_query->storage, query.storage->ptr());

            InterpreterCreateQuery target_create_interpreter(target_create_query, target_create_context);
            target_create_interpreter.setInternal(true);
            target_create_interpreter.execute();
            target_table_id = StorageID(target_create_query->getDatabase(), target_create_query->getTable());
        }
        else
            target_table_id = query.to_table_id;
    }

    inner_fetch_query = generateInnerFetchQuery(inner_table_id);

    clean_interval_ms = getContext()->getSettingsRef().window_view_clean_interval.totalMilliseconds();
    next_fire_signal = getWindowUpperBound(std::time(nullptr));

    clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
    if (is_proctime)
        fire_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFireProc(); });
    else
        fire_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFireEvent(); });

    /// Tasks stay inactive until startup().
    clean_cache_task->deactivate();
    fire_task->deactivate();
}
2021-12-06 07:12:21 +00:00
/// Analyzes the SELECT of the window view and stores what was found in the object:
/// window id name/alias, timestamp column, window kind (tumble or hop),
/// window/hop/slide intervals and the time zone.
///
/// Returns a clone of `query` after it has been visited by FetchQueryInfoMatcher.
/// Throws INCORRECT_QUERY when GROUP BY or the time window function is missing,
/// ILLEGAL_COLUMN when the time zone argument is not a constant string.
ASTPtr StorageWindowView::innerQueryParser(const ASTSelectQuery & query)
{
    if (!query.groupBy())
        throw Exception(ErrorCodes::INCORRECT_QUERY, "GROUP BY query is required for {}", getName());

    /// Parse stage mergeable
    ASTPtr result = query.clone();
    FetchQueryInfoMatcher::Data query_info_data;
    FetchQueryInfoMatcher::Visitor(query_info_data).visit(result);

    const bool has_window_function = query_info_data.is_tumble || query_info_data.is_hop;
    if (!has_window_function)
        throw Exception(ErrorCodes::INCORRECT_QUERY,
                        "TIME WINDOW FUNCTION is not specified for {}", getName());

    window_id_name = query_info_data.window_id_name;
    window_id_alias = query_info_data.window_id_alias;
    timestamp_column_name = query_info_data.timestamp_column_name;
    is_tumble = query_info_data.is_tumble;

    /// Parse time window function
    ASTFunction & window_function = typeid_cast<ASTFunction &>(*query_info_data.window_function);
    const auto & arguments = window_function.arguments->children;
    extractWindowArgument(
        arguments.at(1), window_kind, window_num_units,
        "Illegal type of second argument of function " + window_function.name + " should be Interval");

    /// The second argument always defines the slide: the window itself for tumble, the hop for hop.
    slide_kind = window_kind;
    slide_num_units = window_num_units;

    if (!is_tumble)
    {
        /// For hop the second argument was the hop interval; the third one is the window interval.
        hop_kind = window_kind;
        hop_num_units = window_num_units;
        extractWindowArgument(
            arguments.at(2), window_kind, window_num_units,
            "Illegal type of third argument of function " + window_function.name + " should be Interval");
        slice_num_units = std::gcd(hop_num_units, window_num_units);
    }

    /// Parse time zone
    size_t time_zone_arg_num = is_tumble ? 2 : 3;
    if (arguments.size() <= time_zone_arg_num)
    {
        /// No explicit time zone argument.
        time_zone = &DateLUT::instance();
        return result;
    }

    const auto & time_zone_arg = arguments.at(time_zone_arg_num);
    const auto * time_zone_literal = time_zone_arg->as<ASTLiteral>();
    if (!time_zone_literal || time_zone_literal->value.getType() != Field::Types::String)
        throw Exception(
            ErrorCodes::ILLEGAL_COLUMN,
            "Illegal column #{} of time zone argument of function, must be constant string",
            time_zone_arg_num);

    window_view_timezone = time_zone_literal->value.safeGet<String>();
    time_zone = &DateLUT::instance(window_view_timezone);
    return result;
}
2020-07-20 13:32:34 +00:00
void StorageWindowView::eventTimeParser(const ASTCreateQuery & query)
{
if (query.is_watermark_strictly_ascending || query.is_watermark_ascending || query.is_watermark_bounded)
{
is_proctime = false;
2021-11-22 15:09:45 +00:00
2020-07-20 13:32:34 +00:00
if (is_time_column_func_now)
2021-11-19 13:09:12 +00:00
throw Exception("now() is not supported for Event time processing.", ErrorCodes::INCORRECT_QUERY);
2021-11-22 15:09:45 +00:00
2020-07-20 13:32:34 +00:00
if (query.is_watermark_ascending)
{
is_watermark_bounded = true;
watermark_kind = IntervalKind::Second;
watermark_num_units = 1;
}
else if (query.is_watermark_bounded)
{
extractWindowArgument(
2021-11-22 15:09:45 +00:00
query.watermark_function, watermark_kind, watermark_num_units,
"Illegal type WATERMARK function should be Interval");
2020-07-20 13:32:34 +00:00
}
}
if (query.allowed_lateness)
{
allowed_lateness = true;
extractWindowArgument(
2021-11-22 15:09:45 +00:00
query.lateness_function, lateness_kind, lateness_num_units,
"Illegal type ALLOWED_LATENESS function should be Interval");
2020-07-20 13:32:34 +00:00
}
}
2021-05-28 07:36:19 +00:00
void StorageWindowView::writeIntoWindowView(
2021-11-22 15:09:45 +00:00
StorageWindowView & window_view, const Block & block, ContextPtr local_context)
2020-01-14 03:07:31 +00:00
{
2020-02-22 17:06:10 +00:00
Pipe pipe(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), Chunk(block.getColumns(), block.rows())));
2020-02-21 12:35:26 +00:00
UInt32 lateness_bound = 0;
UInt32 t_max_watermark = 0;
UInt32 t_max_timestamp = 0;
UInt32 t_max_fired_watermark = 0;
{
std::lock_guard lock(window_view.fire_signal_mutex);
t_max_fired_watermark = window_view.max_fired_watermark;
t_max_watermark = window_view.max_watermark;
t_max_timestamp = window_view.max_timestamp;
}
// Filter outdated data
if (window_view.allowed_lateness && t_max_timestamp != 0)
{
2022-05-04 12:56:09 +00:00
lateness_bound = addTime(t_max_timestamp, window_view.lateness_kind, -window_view.lateness_num_units, *window_view.time_zone);
2021-11-22 15:09:45 +00:00
if (window_view.is_watermark_bounded)
{
2022-05-04 04:02:13 +00:00
UInt32 watermark_lower_bound
= addTime(t_max_watermark, window_view.slide_kind, -window_view.slide_num_units, *window_view.time_zone);
2021-11-22 15:09:45 +00:00
if (watermark_lower_bound < lateness_bound)
lateness_bound = watermark_lower_bound;
}
}
2021-11-20 09:03:39 +00:00
else if (!window_view.is_time_column_func_now)
{
lateness_bound = t_max_fired_watermark;
}
2021-11-22 15:09:45 +00:00
if (lateness_bound > 0) /// Add filter, which leaves rows with timestamp >= lateness_bound
{
2021-05-28 07:36:19 +00:00
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(std::make_shared<ASTIdentifier>(window_view.timestamp_column_name));
args->children.push_back(std::make_shared<ASTLiteral>(lateness_bound));
auto filter_function = std::make_shared<ASTFunction>();
filter_function->name = "greaterOrEquals";
filter_function->arguments = args;
filter_function->children.push_back(filter_function->arguments);
ASTPtr query = filter_function;
NamesAndTypesList columns;
columns.emplace_back(window_view.timestamp_column_name, std::make_shared<DataTypeDateTime>());
auto syntax_result = TreeRewriter(local_context).analyze(query, columns);
auto filter_expression = ExpressionAnalyzer(filter_function, syntax_result, local_context).getActionsDAG(false);
pipe.addSimpleTransform([&](const Block & header)
{
2021-11-22 15:09:45 +00:00
return std::make_shared<FilterTransform>(
header, std::make_shared<ExpressionActions>(filter_expression),
filter_function->getColumnName(), true);
2021-05-28 07:36:19 +00:00
});
}
2020-02-21 12:35:26 +00:00
std::shared_lock<std::shared_mutex> fire_signal_lock;
2021-11-19 13:09:12 +00:00
QueryPipelineBuilder builder;
if (window_view.is_proctime)
2020-02-20 17:30:58 +00:00
{
2020-02-21 12:35:26 +00:00
fire_signal_lock = std::shared_lock<std::shared_mutex>(window_view.fire_signal_mutex);
2021-11-30 08:16:24 +00:00
/// Fill ____timestamp column with current time in case of now() time column.
if (window_view.is_time_column_func_now)
2021-05-28 07:36:19 +00:00
{
ColumnWithTypeAndName column;
column.name = "____timestamp";
2021-12-03 09:48:18 +00:00
const auto & timezone = window_view.function_now_timezone;
if (timezone.empty())
column.type = std::make_shared<DataTypeDateTime>();
else
column.type = std::make_shared<DataTypeDateTime>(timezone);
2021-05-28 07:36:19 +00:00
column.column = column.type->createColumnConst(0, Field(std::time(nullptr)));
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto adding_column_actions = std::make_shared<ExpressionActions>(
std::move(adding_column_dag),
ExpressionActionsSettings::fromContext(local_context));
pipe.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions);
});
}
}
2021-05-28 07:36:19 +00:00
Pipes pipes;
pipes.emplace_back(std::move(pipe));
2020-03-01 18:08:52 +00:00
2022-01-19 02:24:27 +00:00
auto creator = [&](const StorageID & blocks_id_global)
{
auto parent_metadata = window_view.getParentStorage()->getInMemoryMetadataPtr();
auto required_columns = parent_metadata->getColumns();
required_columns.add(ColumnDescription("____timestamp", std::make_shared<DataTypeDateTime>()));
return StorageBlocks::createStorage(blocks_id_global, required_columns, std::move(pipes), QueryProcessingStage::FetchColumns);
};
TemporaryTableHolder blocks_storage(local_context, creator);
2020-03-01 18:08:52 +00:00
InterpreterSelectQuery select_block(
window_view.getMergeableQuery(),
local_context,
blocks_storage.getTable(),
blocks_storage.getTable()->getInMemoryMetadataPtr(),
QueryProcessingStage::WithMergeableState);
builder = select_block.buildQueryPipeline();
2022-01-19 02:24:27 +00:00
builder.addSimpleTransform([&](const Block & current_header)
{
return std::make_shared<SquashingChunksTransform>(
current_header,
local_context->getSettingsRef().min_insert_block_size_rows,
local_context->getSettingsRef().min_insert_block_size_bytes);
});
2020-03-01 18:08:52 +00:00
if (!window_view.is_proctime)
{
2021-11-19 13:09:12 +00:00
UInt32 block_max_timestamp = 0;
if (window_view.is_watermark_bounded || window_view.allowed_lateness)
{
2021-11-22 15:09:45 +00:00
const auto & timestamp_column = *block.getByName(window_view.timestamp_column_name).column;
const auto & timestamp_data = typeid_cast<const ColumnUInt32 &>(timestamp_column).getData();
for (const auto & timestamp : timestamp_data)
{
2021-11-22 15:09:45 +00:00
if (timestamp > block_max_timestamp)
block_max_timestamp = timestamp;
}
}
2020-02-14 08:07:03 +00:00
2021-11-30 08:16:24 +00:00
if (block_max_timestamp)
window_view.updateMaxTimestamp(block_max_timestamp);
2021-11-19 13:09:12 +00:00
UInt32 lateness_upper_bound = 0;
2021-11-30 08:16:24 +00:00
if (window_view.allowed_lateness && t_max_fired_watermark)
2021-11-19 13:09:12 +00:00
lateness_upper_bound = t_max_fired_watermark;
2021-11-30 08:16:24 +00:00
/// On each chunk check window end for each row in a window column, calculating max.
/// Update max watermark (latest seen window end) if needed.
/// If lateness is allowed, add lateness signals.
2021-11-19 13:09:12 +00:00
builder.addSimpleTransform([&](const Block & current_header)
{
return std::make_shared<WatermarkTransform>(
current_header,
window_view,
window_view.window_id_name,
lateness_upper_bound);
});
2020-03-03 04:42:12 +00:00
}
2020-02-12 17:39:57 +00:00
2021-11-20 15:34:45 +00:00
auto inner_storage = window_view.getInnerStorage();
2021-11-22 15:09:45 +00:00
auto lock = inner_storage->lockForShare(
local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
2021-11-19 13:09:12 +00:00
auto metadata_snapshot = inner_storage->getInMemoryMetadataPtr();
auto output = inner_storage->write(window_view.getMergeableQuery(), metadata_snapshot, local_context);
2022-05-05 09:43:23 +00:00
if (!blocksHaveEqualStructure(builder.getHeader(), output->getHeader()))
2022-05-05 06:27:12 +00:00
{
2022-05-05 09:43:23 +00:00
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
builder.getHeader().getColumnsWithTypeAndName(),
output->getHeader().getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto convert_actions = std::make_shared<ExpressionActions>(
convert_actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
builder.addSimpleTransform([&](const Block & header) { return std::make_shared<ExpressionTransform>(header, convert_actions); });
}
2022-05-05 06:27:12 +00:00
2021-11-19 13:09:12 +00:00
builder.addChain(Chain(std::move(output)));
builder.setSinks([&](const Block & cur_header, Pipe::StreamType)
{
return std::make_shared<EmptySink>(cur_header);
});
auto executor = builder.execute();
executor->execute(builder.getNumThreads());
2020-01-14 03:07:31 +00:00
}
void StorageWindowView::startup()
{
2022-05-18 14:34:40 +00:00
if (is_time_column_func_now)
inner_window_id_column_name = window_id_name;
else
{
Aliases aliases;
QueryAliasesVisitor(aliases).visit(mergeable_query);
auto inner_query_normalized = mergeable_query->clone();
QueryNormalizer::Data normalizer_data(aliases, {}, false, getContext()->getSettingsRef(), false);
QueryNormalizer(normalizer_data).visit(inner_query_normalized);
auto inner_select_query = std::static_pointer_cast<ASTSelectQuery>(inner_query_normalized);
auto t_sample_block
= InterpreterSelectQuery(inner_select_query, getContext(), SelectQueryOptions(QueryProcessingStage::WithMergeableState))
.getSampleBlock();
for (const auto & column : t_sample_block.getColumnsWithTypeAndName())
{
if (startsWith(column.name, "windowID"))
{
inner_window_id_column_name = column.name;
break;
}
}
}
inner_window_column_name = std::regex_replace(inner_window_id_column_name, std::regex("windowID"), is_tumble ? "tumble" : "hop");
2020-01-14 03:07:31 +00:00
// Start the working thread
2020-03-24 02:46:56 +00:00
clean_cache_task->activateAndSchedule();
fire_task->activateAndSchedule();
2020-01-14 03:07:31 +00:00
}
void StorageWindowView::shutdown()
{
2021-11-22 15:09:45 +00:00
shutdown_called = true;
2021-11-20 15:34:45 +00:00
2022-05-15 17:14:53 +00:00
fire_condition.notify_all();
fire_signal_condition.notify_all();
2021-11-20 15:34:45 +00:00
2020-03-24 02:46:56 +00:00
clean_cache_task->deactivate();
fire_task->deactivate();
2021-11-20 15:34:45 +00:00
auto table_id = getStorageID();
DatabaseCatalog::instance().removeDependency(select_table_id, table_id);
2020-01-14 03:07:31 +00:00
}
2021-11-20 15:34:45 +00:00
void StorageWindowView::checkTableCanBeDropped() const
{
auto table_id = getStorageID();
Dependencies dependencies = DatabaseCatalog::instance().getDependencies(table_id);
if (!dependencies.empty())
{
StorageID dependent_table_id = dependencies.front();
throw Exception("Table has dependency " + dependent_table_id.getNameForLogs(), ErrorCodes::TABLE_WAS_NOT_DROPPED);
}
}
/// Drops the inner table (if any) without delay, using the storage's own context.
void StorageWindowView::drop()
{
    /// For the Atomic database engine, has_inner_table must already be false at this point;
    /// otherwise there will be a deadlock.
    constexpr bool no_delay = true;
    dropInnerTableIfAny(no_delay, getContext());
}
void StorageWindowView::dropInnerTableIfAny(bool no_delay, ContextPtr local_context)
{
if (!std::exchange(has_inner_table, false))
return;
try
{
InterpreterDropQuery::executeDropQuery(
ASTDropQuery::Kind::Drop, getContext(), local_context, inner_table_id, no_delay);
2022-05-14 16:46:49 +00:00
if (inner_target_table)
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, no_delay);
2021-11-20 15:34:45 +00:00
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
2020-03-03 04:42:12 +00:00
/// Returns the cached sample block of the select query (Complete stage), building it on first use.
/// The build and the emptiness check are done under sample_block_lock.
Block & StorageWindowView::getHeader() const
{
    std::lock_guard lock(sample_block_lock);

    if (sample_block)
        return sample_block;

    sample_block = InterpreterSelectQuery(select_query->clone(), getContext(), SelectQueryOptions(QueryProcessingStage::Complete))
                       .getSampleBlock();

    /// Some columns may be constant: replace every column with its full (non-const) form.
    for (size_t position = 0; position < sample_block.columns(); ++position)
    {
        auto & entry = sample_block.safeGetByPosition(position);
        entry.column = entry.column->convertToFullColumnIfConst();
    }

    return sample_block;
}
/// Looks up the table identified by select_table_id in the database catalog.
StoragePtr StorageWindowView::getParentStorage() const
{
    auto & catalog = DatabaseCatalog::instance();
    return catalog.getTable(select_table_id, getContext());
}
2021-11-20 15:34:45 +00:00
/// Looks up the table identified by inner_table_id in the database catalog.
StoragePtr StorageWindowView::getInnerStorage() const
{
    auto & catalog = DatabaseCatalog::instance();
    return catalog.getTable(inner_table_id, getContext());
}
2021-11-20 15:34:45 +00:00
/// Looks up the table identified by target_table_id in the database catalog.
StoragePtr StorageWindowView::getTargetStorage() const
{
    auto & catalog = DatabaseCatalog::instance();
    return catalog.getTable(target_table_id, getContext());
}
2020-01-14 03:07:31 +00:00
/// Registers the "WindowView" storage engine in the given factory.
void registerStorageWindowView(StorageFactory & factory)
{
    auto create_window_view = [](const StorageFactory::Arguments & args)
    {
        /// The experimental setting is checked only when the table is not being attached.
        if (!args.attach)
        {
            if (!args.getLocalContext()->getSettingsRef().allow_experimental_window_view)
            {
                throw Exception(
                    "Experimental WINDOW VIEW feature is not enabled (the setting 'allow_experimental_window_view')",
                    ErrorCodes::SUPPORT_IS_DISABLED);
            }
        }

        return std::make_shared<StorageWindowView>(args.table_id, args.getLocalContext(), args.query, args.columns, args.attach);
    };

    factory.registerStorage("WindowView", create_window_view);
}
2020-02-12 17:39:57 +00:00
2020-01-14 16:24:26 +00:00
}