2020-06-08 06:27:30 +00:00
|
|
|
#include <numeric>
|
|
|
|
#include <regex>
#include <utility>
|
2020-03-01 18:08:52 +00:00
|
|
|
#include <DataStreams/ExpressionBlockInputStream.h>
|
2020-01-14 03:07:31 +00:00
|
|
|
#include <DataStreams/IBlockOutputStream.h>
|
|
|
|
#include <DataStreams/MaterializingBlockInputStream.h>
|
|
|
|
#include <DataStreams/SquashingBlockInputStream.h>
|
|
|
|
#include <DataStreams/copyData.h>
|
|
|
|
#include <DataTypes/DataTypeDateTime.h>
|
|
|
|
#include <DataTypes/DataTypesNumber.h>
|
2020-02-12 17:39:57 +00:00
|
|
|
#include <Functions/FunctionFactory.h>
|
2020-01-14 03:07:31 +00:00
|
|
|
#include <Functions/FunctionsWindow.h>
|
|
|
|
#include <Interpreters/AddDefaultDatabaseVisitor.h>
|
2020-06-09 08:48:04 +00:00
|
|
|
#include <Interpreters/Context.h>
|
2020-01-14 03:07:31 +00:00
|
|
|
#include <Interpreters/InDepthNodeVisitor.h>
|
2020-02-12 17:39:57 +00:00
|
|
|
#include <Interpreters/InterpreterAlterQuery.h>
|
2020-01-14 03:07:31 +00:00
|
|
|
#include <Interpreters/InterpreterDropQuery.h>
|
2020-03-27 16:44:09 +00:00
|
|
|
#include <Interpreters/QueryAliasesVisitor.h>
|
2020-01-14 03:07:31 +00:00
|
|
|
#include <Interpreters/getTableExpressions.h>
|
2020-02-12 17:39:57 +00:00
|
|
|
#include <Parsers/ASTAlterQuery.h>
|
|
|
|
#include <Parsers/ASTAsterisk.h>
|
2020-01-14 03:07:31 +00:00
|
|
|
#include <Parsers/ASTCreateQuery.h>
|
|
|
|
#include <Parsers/ASTDropQuery.h>
|
|
|
|
#include <Parsers/ASTIdentifier.h>
|
|
|
|
#include <Parsers/ASTLiteral.h>
|
|
|
|
#include <Parsers/ASTSelectQuery.h>
|
2020-03-22 15:03:16 +00:00
|
|
|
#include <Parsers/ASTSetQuery.h>
|
2020-01-14 03:07:31 +00:00
|
|
|
#include <Parsers/ASTSubquery.h>
|
|
|
|
#include <Parsers/ASTTablesInSelectQuery.h>
|
|
|
|
#include <Parsers/ASTWatchQuery.h>
|
|
|
|
#include <Parsers/formatAST.h>
|
2020-03-22 15:03:16 +00:00
|
|
|
#include <Parsers/queryToString.h>
|
2020-02-22 17:06:10 +00:00
|
|
|
#include <Processors/Sources/SourceFromInputStream.h>
|
|
|
|
#include <Processors/Sources/SourceFromSingleChunk.h>
|
|
|
|
#include <Processors/Transforms/AddingConstColumnTransform.h>
|
|
|
|
#include <Processors/Transforms/FilterTransform.h>
|
2020-01-14 03:07:31 +00:00
|
|
|
#include <Storages/StorageFactory.h>
|
2020-02-14 08:07:03 +00:00
|
|
|
#include <boost/lexical_cast.hpp>
|
2020-01-14 03:07:31 +00:00
|
|
|
#include <Common/typeid_cast.h>
|
|
|
|
|
2020-06-08 06:27:30 +00:00
|
|
|
#include <Storages/WindowView/ReplaceWindowColumnBlockInputStream.h>
|
2020-01-14 03:07:31 +00:00
|
|
|
#include <Storages/WindowView/StorageWindowView.h>
|
2020-02-21 12:35:26 +00:00
|
|
|
#include <Storages/WindowView/WatermarkBlockInputStream.h>
|
2020-01-14 03:07:31 +00:00
|
|
|
#include <Storages/WindowView/WindowViewBlockInputStream.h>
|
|
|
|
#include <Storages/WindowView/WindowViewProxyStorage.h>
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
namespace ErrorCodes
{
    /// Error codes raised from this translation unit. They are only declared here (extern);
    /// their definitions live in another translation unit.
    extern const int ARGUMENT_OUT_OF_BOUND;
    extern const int CANNOT_PARSE_TEXT;
    extern const int ILLEGAL_COLUMN;
    extern const int INCORRECT_QUERY;
    extern const int LOGICAL_ERROR;
    extern const int QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW;
    extern const int SUPPORT_IS_DISABLED;
    extern const int TABLE_WAS_NOT_DROPPED;
}
|
|
|
|
|
|
|
|
namespace
|
|
|
|
{
|
|
|
|
/// Delay constant, presumably in milliseconds given the name; its users are outside this view — confirm.
constexpr int RESCHEDULE_MS = 500;
|
|
|
|
|
2020-06-08 06:27:30 +00:00
|
|
|
struct MergeableQueryVisitorData
|
2020-01-14 03:07:31 +00:00
|
|
|
{
|
2020-03-22 15:03:16 +00:00
|
|
|
using TypeToVisit = ASTFunction;
|
2020-01-14 03:07:31 +00:00
|
|
|
|
2020-03-22 15:03:16 +00:00
|
|
|
ASTPtr window_function;
|
2020-06-08 06:27:30 +00:00
|
|
|
String window_id_name;
|
|
|
|
String window_id_alias;
|
|
|
|
String serialized_window_function;
|
2020-03-22 15:03:16 +00:00
|
|
|
String timestamp_column_name;
|
|
|
|
bool is_tumble = false;
|
|
|
|
bool is_hop = false;
|
2020-01-14 03:07:31 +00:00
|
|
|
|
2020-03-22 15:03:16 +00:00
|
|
|
void visit(const ASTFunction & node, ASTPtr & node_ptr)
|
2020-01-14 03:07:31 +00:00
|
|
|
{
|
2020-06-19 09:35:18 +00:00
|
|
|
if (node.name == "TUMBLE" || node.name == "HOP")
|
2020-01-14 03:07:31 +00:00
|
|
|
{
|
2020-06-19 09:35:18 +00:00
|
|
|
is_tumble = node.name == "TUMBLE";
|
|
|
|
is_hop = node.name == "HOP";
|
2020-03-22 15:03:16 +00:00
|
|
|
if (!window_function)
|
2020-01-14 03:07:31 +00:00
|
|
|
{
|
2020-06-19 09:35:18 +00:00
|
|
|
std::static_pointer_cast<ASTFunction>(node_ptr)->name = "WINDOW_ID";
|
2020-06-08 06:27:30 +00:00
|
|
|
window_id_name = node.getColumnName();
|
|
|
|
window_id_alias = node.alias;
|
2020-03-22 15:03:16 +00:00
|
|
|
window_function = node.clone();
|
2020-06-08 06:27:30 +00:00
|
|
|
window_function->setAlias("");
|
|
|
|
serialized_window_function = serializeAST(*window_function);
|
2020-03-22 15:03:16 +00:00
|
|
|
timestamp_column_name = node.arguments->children[0]->getColumnName();
|
2020-01-14 03:07:31 +00:00
|
|
|
}
|
2020-06-08 06:27:30 +00:00
|
|
|
else
|
|
|
|
{
|
|
|
|
auto temp_node = node.clone();
|
|
|
|
temp_node->setAlias("");
|
|
|
|
if (serializeAST(*temp_node) != serialized_window_function)
|
|
|
|
throw Exception("WINDOW VIEW only support ONE WINDOW FUNCTION", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);
|
|
|
|
}
|
2020-01-14 03:07:31 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2020-02-14 08:55:56 +00:00
|
|
|
|
2020-06-19 09:35:18 +00:00
|
|
|
struct ReplaceWindowIdVisitorData
|
2020-06-08 06:27:30 +00:00
|
|
|
{
|
|
|
|
using TypeToVisit = ASTFunction;
|
2020-06-17 15:06:19 +00:00
|
|
|
bool is_tumble;
|
2020-06-08 06:27:30 +00:00
|
|
|
|
2020-06-19 16:29:44 +00:00
|
|
|
void visit(const ASTFunction & node, ASTPtr & node_ptr) const
|
2020-06-08 06:27:30 +00:00
|
|
|
{
|
2020-06-17 15:06:19 +00:00
|
|
|
if (node.name == "WINDOW_ID")
|
|
|
|
{
|
|
|
|
if (is_tumble)
|
|
|
|
std::static_pointer_cast<ASTFunction>(node_ptr)->name = "TUMBLE";
|
|
|
|
else
|
|
|
|
std::static_pointer_cast<ASTFunction>(node_ptr)->name = "HOP";
|
|
|
|
}
|
2020-06-08 06:27:30 +00:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
struct ReplaceFunctionNowVisitorData
|
2020-03-22 15:03:16 +00:00
|
|
|
{
|
|
|
|
using TypeToVisit = ASTFunction;
|
|
|
|
|
|
|
|
bool is_time_column_func_now = false;
|
2020-06-08 06:27:30 +00:00
|
|
|
String window_id_name;
|
2020-03-22 15:03:16 +00:00
|
|
|
|
|
|
|
void visit(ASTFunction & node, ASTPtr & node_ptr)
|
|
|
|
{
|
2020-06-19 09:35:18 +00:00
|
|
|
if (node.name == "WINDOW_ID")
|
2020-03-22 15:03:16 +00:00
|
|
|
{
|
|
|
|
if (const auto * t = node.arguments->children[0]->as<ASTFunction>(); t && t->name == "now")
|
|
|
|
{
|
|
|
|
is_time_column_func_now = true;
|
|
|
|
node_ptr->children[0]->children[0] = std::make_shared<ASTIdentifier>("____timestamp");
|
2020-06-08 06:27:30 +00:00
|
|
|
window_id_name = node.getColumnName();
|
2020-03-22 15:03:16 +00:00
|
|
|
}
|
|
|
|
}
|
2020-06-08 06:27:30 +00:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2020-06-19 09:35:18 +00:00
|
|
|
struct ReplaceFunctionWindowVisitorData
|
2020-06-08 06:27:30 +00:00
|
|
|
{
|
|
|
|
using TypeToVisit = ASTFunction;
|
|
|
|
|
2020-06-09 15:57:47 +00:00
|
|
|
static void visit(ASTFunction & node, ASTPtr & node_ptr)
|
2020-06-08 06:27:30 +00:00
|
|
|
{
|
2020-06-19 09:35:18 +00:00
|
|
|
if (node.name == "HOP" || node.name == "TUMBLE")
|
2020-06-17 15:06:19 +00:00
|
|
|
std::static_pointer_cast<ASTFunction>(node_ptr)->name = "WINDOW_ID";
|
2020-03-22 15:03:16 +00:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
class ReplaceFunctionWindowMatcher
|
2020-02-17 05:06:03 +00:00
|
|
|
{
|
|
|
|
public:
|
2020-03-22 15:03:16 +00:00
|
|
|
using Visitor = InDepthNodeVisitor<ReplaceFunctionWindowMatcher, true>;
|
2020-02-17 05:06:03 +00:00
|
|
|
|
|
|
|
struct Data
|
|
|
|
{
|
2020-06-08 06:27:30 +00:00
|
|
|
String window_id_name;
|
|
|
|
String window_id_alias;
|
2020-03-27 16:44:09 +00:00
|
|
|
Aliases * aliases;
|
2020-02-17 05:06:03 +00:00
|
|
|
};
|
|
|
|
|
2020-03-22 15:03:16 +00:00
|
|
|
static bool needChildVisit(ASTPtr &, const ASTPtr &) { return true; }
|
2020-02-17 05:06:03 +00:00
|
|
|
|
|
|
|
static void visit(ASTPtr & ast, Data & data)
|
|
|
|
{
|
|
|
|
if (const auto * t = ast->as<ASTFunction>())
|
|
|
|
visit(*t, ast, data);
|
2020-03-22 15:03:16 +00:00
|
|
|
if (const auto * t = ast->as<ASTIdentifier>())
|
|
|
|
visit(*t, ast, data);
|
2020-02-17 05:06:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
2020-03-27 16:44:09 +00:00
|
|
|
static void visit(const ASTFunction & node, ASTPtr & node_ptr, Data &)
|
2020-02-17 05:06:03 +00:00
|
|
|
{
|
2020-03-27 16:44:09 +00:00
|
|
|
if (node.name == "tuple")
|
|
|
|
return;
|
|
|
|
else
|
|
|
|
node_ptr = std::make_shared<ASTIdentifier>(node.getColumnName());
|
2020-02-17 05:06:03 +00:00
|
|
|
}
|
2020-03-22 15:03:16 +00:00
|
|
|
|
|
|
|
static void visit(const ASTIdentifier & node, ASTPtr & node_ptr, Data & data)
|
|
|
|
{
|
2020-06-08 06:27:30 +00:00
|
|
|
if (node.getColumnName() == data.window_id_alias)
|
|
|
|
dynamic_cast<ASTIdentifier *>(node_ptr.get())->name = data.window_id_name;
|
2020-03-27 16:44:09 +00:00
|
|
|
else if (auto it = data.aliases->find(node.getColumnName()); it != data.aliases->end())
|
|
|
|
dynamic_cast<ASTIdentifier *>(node_ptr.get())->name = it->second->getColumnName();
|
2020-03-22 15:03:16 +00:00
|
|
|
}
|
2020-02-17 05:06:03 +00:00
|
|
|
};
|
|
|
|
|
2020-03-23 03:21:01 +00:00
|
|
|
/// Maps an interval unit name ("Second" ... "Year") to the corresponding IntervalKind.
/// @throws Exception INCORRECT_QUERY for any other name.
IntervalKind strToIntervalKind(const String& interval_str)
{
    if (interval_str == "Second")
        return IntervalKind::Second;
    if (interval_str == "Minute")
        return IntervalKind::Minute;
    if (interval_str == "Hour")
        return IntervalKind::Hour;
    if (interval_str == "Day")
        return IntervalKind::Day;
    if (interval_str == "Week")
        return IntervalKind::Week;
    if (interval_str == "Month")
        return IntervalKind::Month;
    if (interval_str == "Quarter")
        return IntervalKind::Quarter;
    if (interval_str == "Year")
        return IntervalKind::Year;

    /// Falling off the end must not be undefined behaviour (__builtin_unreachable) because the
    /// name may originate from the user's query; report it instead.
    throw Exception("Unsupported interval kind: " + interval_str, ErrorCodes::INCORRECT_QUERY);
}
|
2020-02-12 17:39:57 +00:00
|
|
|
|
2020-03-23 03:21:01 +00:00
|
|
|
/// Name of the inner table backing a window view: the view name prefixed with ".inner.".
String generateInnerTableName(const String & table_name)
{
    static constexpr auto inner_prefix = ".inner.";
    return inner_prefix + table_name;
}
|
2020-01-14 03:07:31 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Finds the source table of a window view's SELECT.
/// If the first table expression is a plain table, its name is written to `select_table_name`. Its
/// database is either written to `select_database_name`, or, when the query omits it,
/// `select_database_name` is applied to the query as the default. For a single-SELECT subquery the
/// search recurses into it.
/// @throws Exception QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW for UNION subqueries, LOGICAL_ERROR when
///         the table expression is neither a table nor a SELECT subquery.
static void extractDependentTable(ASTSelectQuery & query, String & select_database_name, String & select_table_name)
{
    auto db_and_table = getDatabaseAndTable(query, 0);
    ASTPtr subquery = extractTableExpression(query, 0);

    if (db_and_table)
    {
        select_table_name = db_and_table->table;

        if (!db_and_table->database.empty())
        {
            select_database_name = db_and_table->database;
            return;
        }

        db_and_table->database = select_database_name;
        AddDefaultDatabaseVisitor visitor(select_database_name);
        visitor.visit(query);
        return;
    }

    if (!subquery)
        return;

    auto * union_query = subquery->as<ASTSelectWithUnionQuery>();
    if (!union_query)
        throw Exception(
            "Logical error while creating StorageWindowView."
            " Could not retrieve table name from select query.",
            DB::ErrorCodes::LOGICAL_ERROR);

    if (union_query->list_of_selects->children.size() != 1)
        throw Exception("UNION is not supported for WINDOW VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);

    auto & only_select = union_query->list_of_selects->children.at(0);
    extractDependentTable(only_select->as<ASTSelectQuery &>(), select_database_name, select_table_name);
}
|
|
|
|
|
2020-07-13 14:31:54 +00:00
|
|
|
/// Returns the window id below which inner-table rows may be deleted, or 0 when nothing may be
/// cleaned yet. The bound is the highest fired watermark. When event time is used with allowed
/// lateness, it is further capped at the lower window bound of (max_timestamp - lateness).
UInt32 StorageWindowView::getCleanupBound()
{
    std::lock_guard lock(fire_signal_mutex);

    const UInt32 fired_bound = max_fired_watermark;
    if (fired_bound == 0)
        return 0;

    if (is_proctime)
        return fired_bound;

    if (max_watermark == 0)
        return 0;

    if (!allowed_lateness)
        return fired_bound;

    UInt32 lateness_bound = getWindowLowerBound(addTime(max_timestamp, lateness_kind, -1 * lateness_num_units));
    return lateness_bound < fired_bound ? lateness_bound : fired_bound;
}
|
|
|
|
|
|
|
|
/// Builds an ALTER ... DELETE query on the inner table that removes rows whose window id
/// column is less than getCleanupBound().
ASTPtr StorageWindowView::generateCleanupQuery()
{
    ASTPtr expired_predicate
        = makeASTFunction("less", std::make_shared<ASTIdentifier>(window_id_name), std::make_shared<ASTLiteral>(getCleanupBound()));

    auto delete_command = std::make_shared<ASTAlterCommand>();
    delete_command->type = ASTAlterCommand::DELETE;
    delete_command->predicate = expired_predicate;
    delete_command->children.push_back(delete_command->predicate);

    auto commands = std::make_shared<ASTAlterCommandList>();
    commands->add(delete_command);

    auto cleanup_alter = std::make_shared<ASTAlterQuery>();
    cleanup_alter->database = inner_table_id.database_name;
    cleanup_alter->table = inner_table_id.table_name;
    cleanup_alter->set(cleanup_alter->command_list, commands);
    return cleanup_alter;
}
|
|
|
|
|
2020-01-14 03:07:31 +00:00
|
|
|
void StorageWindowView::checkTableCanBeDropped() const
|
|
|
|
{
|
2020-01-24 02:45:45 +00:00
|
|
|
auto table_id = getStorageID();
|
2020-03-22 16:36:11 +00:00
|
|
|
Dependencies dependencies = DatabaseCatalog::instance().getDependencies(table_id);
|
2020-01-14 03:07:31 +00:00
|
|
|
if (!dependencies.empty())
|
|
|
|
{
|
2020-01-24 02:45:45 +00:00
|
|
|
StorageID dependent_table_id = dependencies.front();
|
|
|
|
throw Exception("Table has dependency " + dependent_table_id.getNameForLogs(), ErrorCodes::TABLE_WAS_NOT_DROPPED);
|
2020-01-14 03:07:31 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-31 14:02:53 +00:00
|
|
|
/// Runs a DROP/DETACH/TRUNCATE (per `kind`) against `target_table_id` if that table exists.
/// A missing table is silently skipped.
static void executeDropQuery(ASTDropQuery::Kind kind, Context & context, const StorageID & target_table_id)
{
    if (!DatabaseCatalog::instance().tryGetTable(target_table_id, context))
        return;

    auto drop_query = std::make_shared<ASTDropQuery>();
    drop_query->database = target_table_id.database_name;
    drop_query->table = target_table_id.table_name;
    drop_query->kind = kind;
    /// The table can disappear between the existence check above and execution (e.g. a concurrent
    /// DROP); IF EXISTS keeps that race from surfacing as an error.
    drop_query->if_exists = true;

    ASTPtr ast_drop_query = drop_query;
    InterpreterDropQuery drop_interpreter(ast_drop_query, context);
    drop_interpreter.execute();
}
|
|
|
|
|
2020-06-09 08:48:04 +00:00
|
|
|
void StorageWindowView::drop()
|
2020-01-14 03:07:31 +00:00
|
|
|
{
|
2020-01-24 02:45:45 +00:00
|
|
|
auto table_id = getStorageID();
|
2020-03-22 16:36:11 +00:00
|
|
|
DatabaseCatalog::instance().removeDependency(select_table_id, table_id);
|
2020-03-27 16:44:09 +00:00
|
|
|
executeDropQuery(ASTDropQuery::Kind::Drop, global_context, inner_table_id);
|
2020-02-12 17:39:57 +00:00
|
|
|
|
2020-01-14 03:07:31 +00:00
|
|
|
std::lock_guard lock(mutex);
|
|
|
|
is_dropped = true;
|
2020-02-23 15:32:34 +00:00
|
|
|
fire_condition.notify_all();
|
2020-01-14 03:07:31 +00:00
|
|
|
}
|
|
|
|
|
2020-02-17 08:18:27 +00:00
|
|
|
void StorageWindowView::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
|
|
|
|
{
|
2020-03-27 16:44:09 +00:00
|
|
|
executeDropQuery(ASTDropQuery::Kind::Truncate, global_context, inner_table_id);
|
2020-02-17 08:18:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool StorageWindowView::optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context)
|
|
|
|
{
|
|
|
|
return getInnerStorage()->optimize(query, partition, final, deduplicate, context);
|
|
|
|
}
|
|
|
|
|
2020-03-03 04:42:12 +00:00
|
|
|
/// Wraps each block in its own single-chunk source with header `sample_block`, one pipe per block.
Pipes StorageWindowView::blocksToPipes(BlocksList & blocks, Block & sample_block)
{
    Pipes pipes;
    /// The final size is known up front; avoid repeated reallocation while appending.
    pipes.reserve(blocks.size());
    for (const auto & block : blocks)
        pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(sample_block, Chunk(block.getColumns(), block.rows())));
    return pipes;
}
|
|
|
|
|
2020-07-13 14:31:54 +00:00
|
|
|
inline void StorageWindowView::cleanup()
|
2020-02-12 17:39:57 +00:00
|
|
|
{
|
2020-07-13 14:31:54 +00:00
|
|
|
InterpreterAlterQuery alt_query(generateCleanupQuery(), *wv_context);
|
2020-03-27 16:44:09 +00:00
|
|
|
alt_query.execute();
|
2020-02-20 17:30:58 +00:00
|
|
|
|
2020-03-03 04:42:12 +00:00
|
|
|
std::lock_guard lock(fire_signal_mutex);
|
2020-03-01 18:08:52 +00:00
|
|
|
watch_streams.remove_if([](std::weak_ptr<WindowViewBlockInputStream> & ptr) { return ptr.expired(); });
|
2020-02-12 17:39:57 +00:00
|
|
|
}
|
|
|
|
|
2020-03-01 18:08:52 +00:00
|
|
|
/// Emits the window results for `watermark`.
/// Every produced block is handed to each live WATCH stream. When a target table is configured,
/// the block is also written to it. Waiters on `fire_condition` are woken afterwards.
/// NOTE(review): `watch_streams` is read here without taking `fire_signal_mutex`, while cleanup()
/// mutates it under that mutex — presumably callers of fire() already hold it; confirm.
inline void StorageWindowView::fire(UInt32 watermark)
{
    if (target_table_id.empty() && watch_streams.empty())
        return;

    BlockInputStreamPtr in_stream;
    {
        std::lock_guard lock(mutex);
        in_stream = getNewBlocksInputStreamPtr(watermark);
    }

    /// Passes one block to every WATCH stream that is still alive.
    auto notify_watchers = [this](Block & block)
    {
        for (auto & weak_watcher : watch_streams)
        {
            if (auto watcher = weak_watcher.lock())
                watcher->addBlock(block);
        }
    };

    if (!target_table_id.empty())
    {
        StoragePtr target_table = getTargetStorage();
        auto table_lock = target_table->lockStructureForShare(true, wv_context->getCurrentQueryId(), wv_context->getSettingsRef().lock_acquire_timeout);
        auto out_stream = target_table->write(getFinalQuery(), *wv_context);

        in_stream->readPrefix();
        out_stream->writePrefix();
        while (auto block = in_stream->read())
        {
            notify_watchers(block);
            out_stream->write(std::move(block));
        }
        in_stream->readSuffix();
        out_stream->writeSuffix();
    }
    else
    {
        in_stream->readPrefix();
        while (auto block = in_stream->read())
            notify_watchers(block);
        in_stream->readSuffix();
    }

    fire_condition.notify_all();
}
|
|
|
|
|
2020-03-27 16:44:09 +00:00
|
|
|
/// Builds the CREATE query for the inner table that stores the mergeable aggregation state of the
/// window view's inner query.
/// Column declarations come from the sample block of the inner query at WithMergeableState, plus an
/// explicit UInt32 window id column when the time column is now().
/// With no user `storage`, the engine is AggregatingMergeTree ordered by the GROUP BY keys (window id
/// first) with the window id as primary key. Otherwise the user's MergeTree-family storage is copied
/// and its key expressions are rewritten to refer to inner-table columns.
/// @throws Exception INCORRECT_QUERY if the default engine is used without GROUP BY, or the user
///         engine is not MergeTree-family; QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW if TTL is given.
std::shared_ptr<ASTCreateQuery> StorageWindowView::generateInnerTableCreateQuery(ASTStorage * storage, const String & database_name, const String & table_name)
{
    /// We will create a query to create an internal table.
    auto inner_create_query = std::make_shared<ASTCreateQuery>();
    inner_create_query->database = database_name;
    inner_create_query->table = table_name;

    auto inner_select_query = std::static_pointer_cast<ASTSelectQuery>(getInnerQuery());

    /// Collect the inner query's aliases so identifiers referring to them can be resolved below.
    Aliases aliases;
    QueryAliasesVisitor::Data query_aliases_data{aliases};
    QueryAliasesVisitor(query_aliases_data).visit(inner_select_query);

    auto t_sample_block
        = InterpreterSelectQuery(inner_select_query, *wv_context, getParentStorage(), SelectQueryOptions(QueryProcessingStage::WithMergeableState))
              .getSampleBlock();

    auto columns_list = std::make_shared<ASTExpressionList>();

    if (is_time_column_func_now)
    {
        auto column_window = std::make_shared<ASTColumnDeclaration>();
        column_window->name = window_id_name;
        column_window->type = std::make_shared<ASTIdentifier>("UInt32");
        columns_list->children.push_back(column_window);
    }

    /// One column per sample-block column; the type name is re-parsed into a type AST.
    /// The parser carries no per-call state, so a single instance is reused.
    ParserIdentifierWithOptionalParameters type_parser;
    for (const auto & column : t_sample_block.getColumnsWithTypeAndName())
    {
        String sql = column.type->getName();
        ASTPtr ast = parseQuery(type_parser, sql.data(), sql.data() + sql.size(), "data type", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
        auto column_dec = std::make_shared<ASTColumnDeclaration>();
        column_dec->name = column.name;
        column_dec->type = ast;
        columns_list->children.push_back(column_dec);
    }

    ReplaceFunctionWindowMatcher::Data query_data;
    query_data.window_id_name = window_id_name;
    query_data.window_id_alias = window_id_alias;
    query_data.aliases = &aliases;
    ReplaceFunctionWindowMatcher::Visitor visitor(query_data);

    ReplaceFunctionNowVisitorData time_now_data;
    ReplaceFunctionWindowVisitorData func_hop_data;
    InDepthNodeVisitor<OneTypeMatcher<ReplaceFunctionNowVisitorData>, true> time_now_visitor(time_now_data);
    InDepthNodeVisitor<OneTypeMatcher<ReplaceFunctionWindowVisitorData>, true> func_window_visitor(func_hop_data);

    /// Clones a user-supplied storage expression (PARTITION BY / PRIMARY KEY / ORDER BY / SAMPLE BY)
    /// and applies the rewrite chain to it: now() -> ____timestamp (only when the time column is
    /// now()), then TUMBLE/HOP -> WINDOW_ID, then functions/aliases -> inner-table column names.
    auto rewrite_storage_expression = [&](const IAST & expression)
    {
        ASTPtr rewritten = expression.clone();
        if (is_time_column_func_now)
            time_now_visitor.visit(rewritten);
        func_window_visitor.visit(rewritten);
        visitor.visit(rewritten);
        return rewritten;
    };

    auto new_storage = std::make_shared<ASTStorage>();
    if (storage == nullptr)
    {
        new_storage->set(new_storage->engine, makeASTFunction("AggregatingMergeTree"));

        ASTPtr group_by = inner_select_query->groupBy();
        /// The sorting key below is built from the GROUP BY list; fail clearly instead of
        /// dereferencing a missing one.
        if (!group_by)
            throw Exception("GROUP BY query is required for WINDOW VIEW", ErrorCodes::INCORRECT_QUERY);

        /// Strip aliases from the GROUP BY keys before reusing them as the sorting key.
        for (auto & child : group_by->children)
            if (auto * ast_with_alias = dynamic_cast<ASTWithAlias *>(child.get()))
                ast_with_alias->setAlias("");

        auto order_by = std::make_shared<ASTFunction>();
        order_by->name = "tuple";
        order_by->arguments = group_by;
        order_by->children.push_back(order_by->arguments);

        ASTPtr order_by_ptr = order_by;
        if (is_time_column_func_now)
            time_now_visitor.visit(order_by_ptr);
        visitor.visit(order_by_ptr);

        /// Put the window id key first in the sorting key.
        auto & keys = order_by->arguments->children;
        for (auto & key : keys)
        {
            if (key->getColumnName() == window_id_name)
            {
                std::swap(key, keys[0]);
                break;
            }
        }

        new_storage->set(new_storage->order_by, order_by_ptr);
        new_storage->set(new_storage->primary_key, std::make_shared<ASTIdentifier>(window_id_name));
    }
    else
    {
        if (storage->ttl_table)
            throw Exception("TTL is not supported for inner table in Window View", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);
        if (!endsWith(storage->engine->name, "MergeTree"))
            throw Exception(
                "The ENGINE of WindowView must be MergeTree family of table engines including the engines with replication support",
                ErrorCodes::INCORRECT_QUERY);

        new_storage->set(new_storage->engine, storage->engine->clone());
        if (storage->partition_by)
            new_storage->set(new_storage->partition_by, rewrite_storage_expression(*storage->partition_by));
        if (storage->primary_key)
            new_storage->set(new_storage->primary_key, rewrite_storage_expression(*storage->primary_key));
        if (storage->order_by)
            new_storage->set(new_storage->order_by, rewrite_storage_expression(*storage->order_by));
        if (storage->sample_by)
            new_storage->set(new_storage->sample_by, rewrite_storage_expression(*storage->sample_by));
        if (storage->settings)
            new_storage->set(new_storage->settings, storage->settings->clone());
    }

    auto new_columns = std::make_shared<ASTColumns>();
    new_columns->set(new_columns->columns, columns_list);
    inner_create_query->set(inner_create_query->columns_list, new_columns);
    inner_create_query->set(inner_create_query->storage, new_storage);

    return inner_create_query;
}
|
|
|
|
|
2020-06-08 06:27:30 +00:00
|
|
|
/// Adds `num_units` intervals of `kind` (negative values subtract) to `time_sec` in the view's time
/// zone. The runtime kind is dispatched to the matching compile-time AddTime specialisation.
inline UInt32 StorageWindowView::addTime(UInt32 time_sec, IntervalKind::Kind kind, Int64 num_units) const
{
    switch (kind)
    {
        case IntervalKind::Second:
            return AddTime<IntervalKind::Second>::execute(time_sec, num_units, time_zone);
        case IntervalKind::Minute:
            return AddTime<IntervalKind::Minute>::execute(time_sec, num_units, time_zone);
        case IntervalKind::Hour:
            return AddTime<IntervalKind::Hour>::execute(time_sec, num_units, time_zone);
        case IntervalKind::Day:
            return AddTime<IntervalKind::Day>::execute(time_sec, num_units, time_zone);
        case IntervalKind::Week:
            return AddTime<IntervalKind::Week>::execute(time_sec, num_units, time_zone);
        case IntervalKind::Month:
            return AddTime<IntervalKind::Month>::execute(time_sec, num_units, time_zone);
        case IntervalKind::Quarter:
            return AddTime<IntervalKind::Quarter>::execute(time_sec, num_units, time_zone);
        case IntervalKind::Year:
            return AddTime<IntervalKind::Year>::execute(time_sec, num_units, time_zone);
    }
    __builtin_unreachable();
}
|
|
|
|
|
2020-03-03 04:42:12 +00:00
|
|
|
/// Lower window bound for `time_sec`.
/// TUMBLE: `time_sec` aligned down to the window interval.
/// HOP: the end of `time_sec`'s hop slot (aligned down to the hop interval, plus one hop), minus
/// window_num_units.
/// NOTE(review): the HOP branch applies window_num_units with the hop interval kind; presumably HOP
/// requires both intervals to share a kind — confirm.
inline UInt32 StorageWindowView::getWindowLowerBound(UInt32 time_sec)
{
    IntervalKind slot_kind;
    if (!is_tumble)
        slot_kind = hop_kind;
    else
        slot_kind = window_kind;

    switch (slot_kind)
    {
#define WINDOW_LOWER_BOUND_CASE(KIND) \
    case IntervalKind::KIND: \
    { \
        if (is_tumble) \
            return ToStartOfTransform<IntervalKind::KIND>::execute(time_sec, window_num_units, time_zone); \
        UInt32 slot_start = ToStartOfTransform<IntervalKind::KIND>::execute(time_sec, hop_num_units, time_zone); \
        UInt32 slot_end = AddTime<IntervalKind::KIND>::execute(slot_start, hop_num_units, time_zone); \
        return AddTime<IntervalKind::KIND>::execute(slot_end, -1 * window_num_units, time_zone); \
    }
        WINDOW_LOWER_BOUND_CASE(Second)
        WINDOW_LOWER_BOUND_CASE(Minute)
        WINDOW_LOWER_BOUND_CASE(Hour)
        WINDOW_LOWER_BOUND_CASE(Day)
        WINDOW_LOWER_BOUND_CASE(Week)
        WINDOW_LOWER_BOUND_CASE(Month)
        WINDOW_LOWER_BOUND_CASE(Quarter)
        WINDOW_LOWER_BOUND_CASE(Year)
#undef WINDOW_LOWER_BOUND_CASE
    }
    __builtin_unreachable();
}
|
|
|
|
|
|
|
|
/// Upper bound of the interval containing `time_sec`: align down to the window interval (TUMBLE) or
/// the hop interval (HOP), then add one such interval.
inline UInt32 StorageWindowView::getWindowUpperBound(UInt32 time_sec)
{
    IntervalKind slot_kind;
    if (!is_tumble)
        slot_kind = hop_kind;
    else
        slot_kind = window_kind;

    switch (slot_kind)
    {
#define WINDOW_UPPER_BOUND_CASE(KIND) \
    case IntervalKind::KIND: \
    { \
        if (!is_tumble) \
        { \
            UInt32 slot_start = ToStartOfTransform<IntervalKind::KIND>::execute(time_sec, hop_num_units, time_zone); \
            return AddTime<IntervalKind::KIND>::execute(slot_start, hop_num_units, time_zone); \
        } \
        UInt32 window_start = ToStartOfTransform<IntervalKind::KIND>::execute(time_sec, window_num_units, time_zone); \
        return AddTime<IntervalKind::KIND>::execute(window_start, window_num_units, time_zone); \
    }
        WINDOW_UPPER_BOUND_CASE(Second)
        WINDOW_UPPER_BOUND_CASE(Minute)
        WINDOW_UPPER_BOUND_CASE(Hour)
        WINDOW_UPPER_BOUND_CASE(Day)
        WINDOW_UPPER_BOUND_CASE(Week)
        WINDOW_UPPER_BOUND_CASE(Month)
        WINDOW_UPPER_BOUND_CASE(Quarter)
        WINDOW_UPPER_BOUND_CASE(Year)
#undef WINDOW_UPPER_BOUND_CASE
    }
    __builtin_unreachable();
}
|
|
|
|
|
2020-03-03 04:42:12 +00:00
|
|
|
inline void StorageWindowView::addFireSignal(std::set<UInt32> & signals)
|
2020-02-23 15:32:34 +00:00
|
|
|
{
|
2020-03-03 04:42:12 +00:00
|
|
|
std::lock_guard lock(fire_signal_mutex);
|
2020-06-17 15:06:19 +00:00
|
|
|
for (const auto & signal : signals)
|
2020-03-23 03:21:01 +00:00
|
|
|
fire_signal.push_back(signal);
|
2020-03-01 18:08:52 +00:00
|
|
|
fire_signal_condition.notify_all();
|
2020-02-23 15:32:34 +00:00
|
|
|
}
|
|
|
|
|
2020-03-01 18:08:52 +00:00
|
|
|
/// Raises `max_timestamp` to `timestamp` if it is newer. Runs under `fire_signal_mutex`.
inline void StorageWindowView::updateMaxTimestamp(UInt32 timestamp)
{
    std::lock_guard lock(fire_signal_mutex);
    if (timestamp <= max_timestamp)
        return;
    max_timestamp = timestamp;
}
|
|
|
|
|
2020-03-01 18:08:52 +00:00
|
|
|
/// Advances `max_watermark` using `watermark` and queues a fire signal for every window end passed,
/// recording each as `max_fired_watermark`. Waiters on `fire_signal_condition` are notified if the
/// watermark moved. Runs under `fire_signal_mutex`.
/// The first call only initialises `max_watermark` to the upper bound of `watermark - 1`.
inline void StorageWindowView::updateMaxWatermark(UInt32 watermark)
{
    std::lock_guard lock(fire_signal_mutex);

    if (max_watermark == 0)
    {
        max_watermark = getWindowUpperBound(watermark - 1);
        return;
    }

    /// Next window end after `from`: one window interval for TUMBLE, one hop interval for HOP.
    auto next_window_end = [this](UInt32 from)
    {
        return is_tumble ? addTime(from, window_kind, window_num_units) : addTime(from, hop_kind, hop_num_units);
    };

    bool updated = false;
    if (is_watermark_strictly_ascending)
    {
        updated = max_watermark < watermark;
        while (max_watermark < watermark)
        {
            fire_signal.push_back(max_watermark);
            max_fired_watermark = max_watermark;
            max_watermark = next_window_end(max_watermark);
        }
    }
    else /// ascending || bounded: fire only once max_timestamp passes the (delayed) window end.
    {
        UInt32 max_watermark_bias = addTime(max_watermark, watermark_kind, watermark_num_units);
        updated = max_watermark_bias <= watermark;
        while (max_watermark_bias <= max_timestamp)
        {
            fire_signal.push_back(max_watermark);
            max_fired_watermark = max_watermark;
            max_watermark = next_window_end(max_watermark);
            /// NOTE(review): the initial bias adds the watermark delay (watermark_kind/num_units),
            /// but this recomputation adds the window/hop interval instead — confirm this is intended.
            max_watermark_bias = next_window_end(max_watermark);
        }
    }

    if (updated)
        fire_signal_condition.notify_all();
}
|
|
|
|
|
2020-07-13 14:31:54 +00:00
|
|
|
void StorageWindowView::threadFuncCleanup()
|
2020-02-12 17:39:57 +00:00
|
|
|
{
|
|
|
|
while (!shutdown_called)
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
2020-02-17 08:18:27 +00:00
|
|
|
sleep(clean_interval);
|
2020-07-13 14:31:54 +00:00
|
|
|
cleanup();
|
2020-02-12 17:39:57 +00:00
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (!shutdown_called)
|
2020-03-24 02:46:56 +00:00
|
|
|
clean_cache_task->scheduleAfter(RESCHEDULE_MS);
|
2020-02-12 17:39:57 +00:00
|
|
|
}
|
|
|
|
|
2020-03-01 18:08:52 +00:00
|
|
|
void StorageWindowView::threadFuncFireProc()
|
2020-01-14 03:07:31 +00:00
|
|
|
{
|
2020-02-23 15:32:34 +00:00
|
|
|
std::unique_lock lock(fire_signal_mutex);
|
|
|
|
while (!shutdown_called)
|
2020-01-14 03:07:31 +00:00
|
|
|
{
|
2020-02-23 15:32:34 +00:00
|
|
|
UInt32 timestamp_now = std::time(nullptr);
|
|
|
|
while (next_fire_signal <= timestamp_now)
|
2020-01-14 03:07:31 +00:00
|
|
|
{
|
2020-06-08 06:27:30 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
fire(next_fire_signal);
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
2020-03-31 06:28:04 +00:00
|
|
|
max_fired_watermark = next_fire_signal;
|
2020-06-08 06:27:30 +00:00
|
|
|
next_fire_signal = addTime(next_fire_signal, window_kind, window_num_units);
|
2020-01-14 03:07:31 +00:00
|
|
|
}
|
|
|
|
|
2020-03-01 18:08:52 +00:00
|
|
|
next_fire_signal = getWindowUpperBound(timestamp_now);
|
2020-02-14 08:07:03 +00:00
|
|
|
UInt64 timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochMicroseconds());
|
2020-03-01 18:08:52 +00:00
|
|
|
fire_signal_condition.wait_for(lock, std::chrono::microseconds(static_cast<UInt64>(next_fire_signal) * 1000000 - timestamp_usec));
|
|
|
|
}
|
|
|
|
if (!shutdown_called)
|
2020-03-24 02:46:56 +00:00
|
|
|
fire_task->scheduleAfter(RESCHEDULE_MS);
|
2020-03-01 18:08:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void StorageWindowView::threadFuncFireEvent()
|
|
|
|
{
|
|
|
|
std::unique_lock lock(fire_signal_mutex);
|
|
|
|
while (!shutdown_called)
|
|
|
|
{
|
|
|
|
bool signaled = std::cv_status::no_timeout == fire_signal_condition.wait_for(lock, std::chrono::seconds(5));
|
|
|
|
if (!signaled)
|
|
|
|
continue;
|
2020-02-23 15:32:34 +00:00
|
|
|
|
2020-03-01 18:08:52 +00:00
|
|
|
while (!fire_signal.empty())
|
2020-02-23 15:32:34 +00:00
|
|
|
{
|
2020-03-01 18:08:52 +00:00
|
|
|
fire(fire_signal.front());
|
|
|
|
fire_signal.pop_front();
|
2020-02-23 15:32:34 +00:00
|
|
|
}
|
2020-02-14 08:07:03 +00:00
|
|
|
}
|
|
|
|
if (!shutdown_called)
|
2020-03-24 02:46:56 +00:00
|
|
|
fire_task->scheduleAfter(RESCHEDULE_MS);
|
2020-02-14 08:07:03 +00:00
|
|
|
}
|
|
|
|
|
2020-01-14 03:07:31 +00:00
|
|
|
/// Handle `WATCH <window view>`: create a WindowViewBlockInputStream bound to this storage, register it
/// in watch_streams (under fire_signal_mutex) and return it as the only stream.
/// An explicit `LIMIT n` of the WATCH query is forwarded to the stream; the heartbeat interval comes
/// from the window_view_heartbeat_interval setting.
BlockInputStreams StorageWindowView::watch(
    const Names & /*column_names*/,
    const SelectQueryInfo & query_info,
    const Context & context,
    QueryProcessingStage::Enum & processed_stage,
    size_t /*max_block_size*/,
    const unsigned /*num_streams*/)
{
    auto & watch_query = typeid_cast<ASTWatchQuery &>(*query_info.query);

    const bool has_limit = static_cast<bool>(watch_query.limit_length);
    const UInt64 limit = has_limit ? safeGet<UInt64>(typeid_cast<ASTLiteral &>(*watch_query.limit_length).value) : 0;

    auto reader = std::make_shared<WindowViewBlockInputStream>(
        std::static_pointer_cast<StorageWindowView>(shared_from_this()),
        has_limit,
        limit,
        context.getSettingsRef().window_view_heartbeat_interval.totalSeconds());

    {
        std::lock_guard lock(fire_signal_mutex);
        watch_streams.push_back(reader);
    }

    processed_stage = QueryProcessingStage::Complete;
    return {reader};
}
|
|
|
|
|
|
|
|
/// Create or attach a window view.
///
/// Steps:
/// - Validate the inner SELECT and derive the mergeable and final queries from it.
/// - Read the watermark and ALLOWED_LATENESS clauses.
/// - Register the dependency on the source table.
/// - Unless `attach_` is set, create the inner table that stores mergeable states.
/// - Create the (deactivated) cleanup and fire background tasks; startup() activates them.
///
/// Throws INCORRECT_QUERY, QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW, ILLEGAL_COLUMN, CANNOT_PARSE_TEXT or
/// ARGUMENT_OUT_OF_BOUND for invalid definitions.
StorageWindowView::StorageWindowView(
    const StorageID & table_id_,
    Context & local_context,
    const ASTCreateQuery & query,
    const ColumnsDescription & columns_,
    bool attach_)
    : IStorage(table_id_)
    , global_context(local_context.getGlobalContext())
    , time_zone(DateLUT::instance())
{
    wv_context = std::make_unique<Context>(global_context);
    wv_context->makeQueryContext();

    setColumns(columns_);

    if (!query.select)
        throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);

    if (query.select->list_of_selects->children.size() != 1)
        throw Exception("UNION is not supported for Window View", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);

    ASTSelectQuery & select_query = typeid_cast<ASTSelectQuery &>(*query.select->list_of_selects->children.at(0));
    String select_database_name = global_context.getCurrentDatabase();
    String select_table_name;
    extractDependentTable(select_query, select_database_name, select_table_name);

    /// If the table is not specified - use the table `system.one`.
    /// This must happen before select_table_id is built; previously it ran afterwards and had no effect,
    /// so the dependency was registered with an empty table name.
    if (select_table_name.empty())
    {
        select_database_name = "system";
        select_table_name = "one";
    }
    select_table_id = StorageID(select_database_name, select_table_name);

    /// innerQueryParser() also fills is_tumble, window_id_name and the window/hop interval members
    /// that the code below relies on.
    inner_query = innerQueryParser(select_query);

    mergeable_query = inner_query->clone();

    ReplaceFunctionNowVisitorData func_now_data;
    InDepthNodeVisitor<OneTypeMatcher<ReplaceFunctionNowVisitorData>, true>(func_now_data).visit(mergeable_query);
    is_time_column_func_now = func_now_data.is_time_column_func_now;
    if (is_time_column_func_now)
        window_id_name = func_now_data.window_id_name;

    final_query = mergeable_query->clone();

    ReplaceWindowIdVisitorData final_query_data;
    final_query_data.is_tumble = is_tumble;
    InDepthNodeVisitor<OneTypeMatcher<ReplaceWindowIdVisitorData>, true>(final_query_data).visit(final_query);

    is_watermark_strictly_ascending = query.is_watermark_strictly_ascending;
    is_watermark_ascending = query.is_watermark_ascending;
    is_watermark_bounded = query.is_watermark_bounded;

    DatabaseCatalog::instance().addDependency(select_table_id, table_id_);

    target_table_id = query.to_table_id;

    clean_interval = global_context.getSettingsRef().window_view_clean_interval.totalSeconds();
    next_fire_signal = getWindowUpperBound(std::time(nullptr));

    if (query.is_watermark_strictly_ascending || query.is_watermark_ascending || query.is_watermark_bounded)
    {
        is_proctime = false;
        if (is_time_column_func_now)
            throw Exception("now() is not support for Event time processing.", ErrorCodes::INCORRECT_QUERY);
        if (query.is_watermark_ascending)
        {
            /// ASCENDING is handled as a bounded watermark with a one-second delay.
            is_watermark_bounded = true;
            watermark_kind = IntervalKind::Second;
            watermark_num_units = 1;
        }
        else if (query.is_watermark_bounded)
        {
            // parser watermark function
            const auto watermark_function = std::dynamic_pointer_cast<ASTFunction>(query.watermark_function);
            if (!watermark_function || !startsWith(watermark_function->name, "toInterval"))
                throw Exception("Illegal type of WATERMARK function, should be Interval", ErrorCodes::ILLEGAL_COLUMN);

            const auto & interval_units_p1 = std::static_pointer_cast<ASTLiteral>(watermark_function->children.front()->children.front());
            /// Strip the "toInterval" prefix (10 characters) to get the interval kind name.
            watermark_kind = strToIntervalKind(watermark_function->name.substr(10));
            try
            {
                watermark_num_units = boost::lexical_cast<int>(interval_units_p1->value.get<String>());
            }
            catch (const boost::bad_lexical_cast &)
            {
                throw Exception("Cannot parse string '" + interval_units_p1->value.get<String>() + "' as Integer.", ErrorCodes::CANNOT_PARSE_TEXT);
            }
            if (watermark_num_units <= 0)
                throw Exception("Value for WATERMARK function must be positive.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
        }
    }

    if (query.allowed_lateness)
    {
        allowed_lateness = true;

        // parser lateness function
        const auto lateness_function = std::dynamic_pointer_cast<ASTFunction>(query.lateness_function);
        if (!lateness_function || !startsWith(lateness_function->name, "toInterval"))
            throw Exception("Illegal type of ALLOWED_LATENESS function, should be Interval", ErrorCodes::ILLEGAL_COLUMN);

        const auto & interval_units_p1 = std::static_pointer_cast<ASTLiteral>(lateness_function->children.front()->children.front());
        lateness_kind = strToIntervalKind(lateness_function->name.substr(10));
        try
        {
            lateness_num_units = boost::lexical_cast<int>(interval_units_p1->value.get<String>());
        }
        catch (const boost::bad_lexical_cast &)
        {
            throw Exception(
                "Cannot parse string '" + interval_units_p1->value.get<String>() + "' as Integer.", ErrorCodes::CANNOT_PARSE_TEXT);
        }
        if (lateness_num_units <= 0)
            throw Exception("Value for ALLOWED_LATENESS function must be positive.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
    }

    if (attach_)
    {
        inner_table_id = StorageID(table_id_.database_name, generateInnerTableName(table_id_.table_name));
    }
    else
    {
        auto inner_create_query
            = generateInnerTableCreateQuery(query.storage, table_id_.database_name, generateInnerTableName(table_id_.table_name));

        InterpreterCreateQuery create_interpreter(inner_create_query, *wv_context);
        create_interpreter.setInternal(true);
        create_interpreter.execute();
        inner_storage = DatabaseCatalog::instance().getTable(StorageID(inner_create_query->database, inner_create_query->table), global_context);
        inner_table_id = inner_storage->getStorageID();
    }

    if (is_tumble)
        window_column_name = std::regex_replace(window_id_name, std::regex("WINDOW_ID"), "TUMBLE");
    else
        window_column_name = std::regex_replace(window_id_name, std::regex("WINDOW_ID"), "HOP");

    clean_cache_task = wv_context->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
    if (is_proctime)
        fire_task = wv_context->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFireProc(); });
    else
        fire_task = wv_context->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFireEvent(); });
    clean_cache_task->deactivate();
    fire_task->deactivate();
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Validate the inner SELECT of a window view and extract the window description from it.
///
/// Returns a clone of `query` after it has been passed through the MergeableQueryVisitorData visitor
/// (which may rewrite it). As side effects it sets the following members:
/// - always: window_id_name, window_id_alias, timestamp_column_name, is_tumble, window_kind,
///   window_num_units;
/// - for HOP only: hop_kind, hop_num_units, slice_num_units.
///
/// Throws:
/// - INCORRECT_QUERY when GROUP BY or the window function is missing;
/// - ILLEGAL_COLUMN when an interval argument is not a toInterval* function;
/// - ARGUMENT_OUT_OF_BOUND for non-positive interval values.
ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query)
{
    if (!query.groupBy())
        throw Exception("GROUP BY query is required for " + getName(), ErrorCodes::INCORRECT_QUERY);

    // parse stage mergeable
    ASTPtr result = query.clone();
    ASTPtr expr_list = result;
    MergeableQueryVisitorData stage_mergeable_data;
    InDepthNodeVisitor<OneTypeMatcher<MergeableQueryVisitorData, NeedChild::all>, true>(stage_mergeable_data).visit(expr_list);
    if (!stage_mergeable_data.is_tumble && !stage_mergeable_data.is_hop)
        throw Exception("WINDOW FUNCTION is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
    window_id_name = stage_mergeable_data.window_id_name;
    window_id_alias = stage_mergeable_data.window_id_alias;
    timestamp_column_name = stage_mergeable_data.timestamp_column_name;
    is_tumble = stage_mergeable_data.is_tumble;

    // parser window function
    ASTFunction & window_function = typeid_cast<ASTFunction &>(*stage_mergeable_data.window_function);
    const auto & arguments = window_function.arguments->children;

    /// dynamic_pointer_cast, not static_pointer_cast: the argument may be any AST node, and the null
    /// check below must actually be able to fire. The message names the window function because arg1
    /// is null in that case and must not be dereferenced.
    const auto arg1 = std::dynamic_pointer_cast<ASTFunction>(arguments.at(1));
    if (!arg1 || !startsWith(arg1->name, "toInterval"))
        throw Exception("Illegal type of second argument of function " + window_function.name + " should be Interval", ErrorCodes::ILLEGAL_COLUMN);
    /// Strip the "toInterval" prefix (10 characters) to get the interval kind name.
    window_kind = strToIntervalKind(arg1->name.substr(10));
    const auto & interval_units_p1 = std::static_pointer_cast<ASTLiteral>(arg1->children.front()->children.front());
    window_num_units = stoi(interval_units_p1->value.get<String>());
    if (window_num_units <= 0)
        throw Exception("Interval value for WINDOW function must be positive.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

    if (!is_tumble)
    {
        /// HOP(time, hop_interval, window_interval): the interval parsed above is the hop;
        /// the third argument is the window size.
        hop_kind = window_kind;
        hop_num_units = window_num_units;
        const auto arg2 = std::dynamic_pointer_cast<ASTFunction>(arguments.at(2));
        if (!arg2 || !startsWith(arg2->name, "toInterval"))
            throw Exception("Illegal type of last argument of function " + window_function.name + " should be Interval", ErrorCodes::ILLEGAL_COLUMN);
        window_kind = strToIntervalKind(arg2->name.substr(10));
        const auto & interval_units_p2 = std::static_pointer_cast<ASTLiteral>(arg2->children.front()->children.front());
        window_num_units = stoi(interval_units_p2->value.get<String>());
        if (window_num_units <= 0)
            throw Exception("Interval value for WINDOW function must be positive.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

        /// Slice width shared by hop and window sizes; getFetchColumnQuery() steps through a window
        /// in slices of this many window_kind units.
        slice_num_units = std::gcd(hop_num_units, window_num_units);
    }
    return result;
}
|
|
|
|
|
|
|
|
/// Convert an inserted `block` into mergeable states of `window_view`'s inner query and append them to
/// the inner table.
///
/// Processing time: a shared lock on fire_signal_mutex is held until the write completes. The
/// processing-time fire loop locks the same mutex exclusively, so firing waits for this insert.
///
/// Event time, in order:
/// - With ALLOWED_LATENESS, rows older than the lateness bound are filtered out.
/// - The result goes through a WatermarkBlockInputStream, which receives the block's maximum timestamp
///   and the last fired watermark.
void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context)
{
    Pipe pipe(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), Chunk(block.getColumns(), block.rows())));
    BlockInputStreamPtr source_stream;

    std::shared_lock<std::shared_mutex> fire_signal_lock;
    if (window_view.is_proctime)
    {
        fire_signal_lock = std::shared_lock<std::shared_mutex>(window_view.fire_signal_mutex);
        if (window_view.is_time_column_func_now)
            pipe.addSimpleTransform(std::make_shared<AddingConstColumnTransform<UInt32>>(
                pipe.getHeader(), std::make_shared<DataTypeDateTime>(), std::time(nullptr), "____timestamp"));
        InterpreterSelectQuery select_block(window_view.getMergeableQuery(), context, {std::move(pipe)}, QueryProcessingStage::WithMergeableState);

        source_stream = select_block.execute().getInputStream();
        source_stream = std::make_shared<SquashingBlockInputStream>(
            source_stream, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
    }
    else
    {
        UInt32 t_max_fired_watermark = 0;
        if (window_view.allowed_lateness)
        {
            UInt32 t_max_timestamp = 0;
            UInt32 t_max_watermark = 0;
            {
                std::lock_guard lock(window_view.fire_signal_mutex);
                t_max_fired_watermark = window_view.max_fired_watermark;
                t_max_watermark = window_view.max_watermark;
                t_max_timestamp = window_view.max_timestamp;
            }

            if (t_max_timestamp != 0)
            {
                /// Rows with a timestamp below this bound are dropped before aggregation.
                UInt32 lateness_bound
                    = window_view.addTime(t_max_timestamp, window_view.lateness_kind, -1 * window_view.lateness_num_units);
                if (window_view.is_watermark_bounded)
                {
                    UInt32 watermark_lower_bound = window_view.is_tumble
                        ? window_view.addTime(t_max_watermark, window_view.window_kind, -1 * window_view.window_num_units)
                        : window_view.addTime(t_max_watermark, window_view.hop_kind, -1 * window_view.hop_num_units);
                    if (watermark_lower_bound < lateness_bound)
                        lateness_bound = watermark_lower_bound;
                }

                ColumnsWithTypeAndName columns;
                columns.emplace_back(nullptr, std::make_shared<DataTypeDateTime>(), window_view.timestamp_column_name);
                ExpressionActionsPtr filter_expressions = std::make_shared<ExpressionActions>(columns, context);
                filter_expressions->add(
                    ExpressionAction::addColumn({std::make_shared<DataTypeDateTime>()->createColumnConst(1, toField(lateness_bound)),
                                                 std::make_shared<DataTypeDateTime>(),
                                                 "____lateness_bound"}));
                const auto & function_greater = FunctionFactory::instance().get("greaterOrEquals", context);
                filter_expressions->add(ExpressionAction::applyFunction(
                    function_greater, Names{window_view.timestamp_column_name, "____lateness_bound"}, "____filter"));
                pipe.addSimpleTransform(std::make_shared<FilterTransform>(pipe.getHeader(), filter_expressions, "____filter", true));
            }
        }

        InterpreterSelectQuery select_block(window_view.getMergeableQuery(), context, {std::move(pipe)}, QueryProcessingStage::WithMergeableState);

        source_stream = select_block.execute().getInputStream();
        source_stream = std::make_shared<SquashingBlockInputStream>(
            source_stream, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);

        /// Keep a typed handle instead of downcasting source_stream again for every setter.
        auto watermark_stream = std::make_shared<WatermarkBlockInputStream>(source_stream, window_view, window_view.window_id_name);
        source_stream = watermark_stream;

        /// Previously this condition was checked twice, nested inside itself.
        if (window_view.is_watermark_bounded || window_view.allowed_lateness)
        {
            /// Maximum event time of the inserted block, taken from the unfiltered input.
            UInt32 block_max_timestamp = 0;
            const auto & column_timestamp = block.getByName(window_view.timestamp_column_name).column;
            const ColumnUInt32::Container & timestamp_data = static_cast<const ColumnUInt32 &>(*column_timestamp).getData();
            for (const auto & timestamp : timestamp_data)
            {
                if (timestamp > block_max_timestamp)
                    block_max_timestamp = timestamp;
            }
            watermark_stream->setMaxTimestamp(block_max_timestamp);
        }

        if (window_view.allowed_lateness && t_max_fired_watermark != 0)
            watermark_stream->setAllowedLateness(t_max_fired_watermark);
    }

    auto & inner_storage = window_view.getInnerStorage();
    auto lock = inner_storage->lockStructureForShare(true, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
    auto stream = inner_storage->write(window_view.getMergeableQuery(), context);
    copyData(*source_stream, *stream);
}
|
|
|
|
|
|
|
|
void StorageWindowView::startup()
|
|
|
|
{
|
|
|
|
// Start the working thread
|
2020-03-24 02:46:56 +00:00
|
|
|
clean_cache_task->activateAndSchedule();
|
|
|
|
fire_task->activateAndSchedule();
|
2020-01-14 03:07:31 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void StorageWindowView::shutdown()
|
|
|
|
{
|
|
|
|
bool expected = false;
|
|
|
|
if (!shutdown_called.compare_exchange_strong(expected, true))
|
|
|
|
return;
|
2020-03-24 02:46:56 +00:00
|
|
|
clean_cache_task->deactivate();
|
|
|
|
fire_task->deactivate();
|
2020-01-14 03:07:31 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Delegates to shutdown(), which is a no-op if shutdown already happened.
StorageWindowView::~StorageWindowView()
{
    shutdown();
}
|
|
|
|
|
2020-03-03 04:42:12 +00:00
|
|
|
/// Lazily compute (under sample_block_lock) and cache the result header of the final query run
/// against the parent storage, with every constant column materialized into a full column.
Block & StorageWindowView::getHeader() const
{
    std::lock_guard lock(sample_block_lock);
    if (sample_block)
        return sample_block;

    sample_block = InterpreterSelectQuery(
                       getFinalQuery(), *wv_context, getParentStorage(), SelectQueryOptions(QueryProcessingStage::Complete))
                       .getSampleBlock();

    const size_t column_count = sample_block.columns();
    for (size_t pos = 0; pos < column_count; ++pos)
    {
        auto & entry = sample_block.safeGetByPosition(pos);
        entry.column = entry.column->convertToFullColumnIfConst();
    }
    return sample_block;
}
|
|
|
|
|
|
|
|
/// Resolve the source table (select_table_id) through the DatabaseCatalog on first use and cache it.
/// NOTE(review): the lazy initialization is unsynchronized; confirm it is never raced.
StoragePtr StorageWindowView::getParentStorage() const
{
    if (!parent_storage)
        parent_storage = DatabaseCatalog::instance().getTable(select_table_id, global_context);
    return parent_storage;
}
|
|
|
|
|
|
|
|
/// Return a reference to the cached inner table, resolving inner_table_id through the DatabaseCatalog
/// when it has not been resolved yet (e.g. after ATTACH).
/// NOTE(review): the lazy initialization is unsynchronized; confirm it is never raced.
StoragePtr & StorageWindowView::getInnerStorage() const
{
    if (!inner_storage)
        inner_storage = DatabaseCatalog::instance().getTable(inner_table_id, global_context);
    return inner_storage;
}
|
|
|
|
|
2020-06-08 06:27:30 +00:00
|
|
|
/// Build `SELECT * FROM <inner table> PREWHERE ...` selecting the stored mergeable states of one window.
///
/// TUMBLE: the window id must equal `w_end`.
/// HOP: the window id must be one of the slice ends w_end, w_end - slice, ... that are greater than
/// `w_start`, each step being slice_num_units of window_kind.
ASTPtr StorageWindowView::getFetchColumnQuery(UInt32 w_start, UInt32 w_end) const
{
    auto table_expr = std::make_shared<ASTTableExpression>();
    table_expr->database_and_table_name = createTableIdentifier(inner_table_id.database_name, inner_table_id.table_name);
    table_expr->children.push_back(table_expr->database_and_table_name);

    auto tables_elem = std::make_shared<ASTTablesInSelectQueryElement>();
    tables_elem->table_expression = table_expr;
    tables_elem->children.push_back(table_expr);

    auto select = std::make_shared<ASTExpressionList>();
    select->children.push_back(std::make_shared<ASTAsterisk>());

    auto res_query = std::make_shared<ASTSelectQuery>();
    res_query->setExpression(ASTSelectQuery::Expression::SELECT, select);
    res_query->setExpression(ASTSelectQuery::Expression::TABLES, std::make_shared<ASTTablesInSelectQuery>());
    res_query->tables()->children.push_back(tables_elem);

    ASTPtr prewhere;
    if (is_tumble)
    {
        prewhere = makeASTFunction("equals", std::make_shared<ASTIdentifier>(window_id_name), std::make_shared<ASTLiteral>(w_end));
    }
    else
    {
        auto window_ids = makeASTFunction("array");
        for (UInt32 slice_end = w_end; slice_end > w_start; slice_end = addTime(slice_end, window_kind, -1 * slice_num_units))
            window_ids->arguments->children.push_back(std::make_shared<ASTLiteral>(slice_end));
        prewhere = makeASTFunction("has", window_ids, std::make_shared<ASTIdentifier>(window_id_name));
    }
    res_query->setExpression(ASTSelectQuery::Expression::PREWHERE, std::move(prewhere));

    return res_query;
}
|
|
|
|
|
2020-03-03 04:42:12 +00:00
|
|
|
/// Return a reference to the cached TO-table storage. It is resolved on first use when target_table_id
/// is set; otherwise the returned pointer stays null.
/// NOTE(review): the lazy initialization is unsynchronized; confirm it is never raced.
StoragePtr & StorageWindowView::getTargetStorage() const
{
    if (target_storage || target_table_id.empty())
        return target_storage;

    target_storage = DatabaseCatalog::instance().getTable(target_table_id, global_context);
    return target_storage;
}
|
|
|
|
|
2020-03-01 18:08:52 +00:00
|
|
|
/// Produce the final result blocks of the window ending at `watermark`. The steps are:
/// 1. Fetch its mergeable states from the inner table.
/// 2. Rewrite the window column for the range [w_start, watermark].
/// 3. Run the final query over a proxy storage that serves those states, with limits and quotas ignored.
/// 4. Squash the output to the min_insert_block_size_* settings.
BlockInputStreamPtr StorageWindowView::getNewBlocksInputStreamPtr(UInt32 watermark)
{
    const UInt32 w_start = addTime(watermark, window_kind, -1 * window_num_units);

    InterpreterSelectQuery fetch(getFetchColumnQuery(w_start, watermark), *wv_context, getInnerStorage(), SelectQueryOptions(QueryProcessingStage::FetchColumns));
    BlockInputStreamPtr in_stream = std::make_shared<ReplaceWindowColumnBlockInputStream>(
        fetch.execute().getInputStream(), window_column_name, w_start, watermark);

    Pipes pipes;
    pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(in_stream)));

    auto proxy_storage = std::make_shared<WindowViewProxyStorage>(
        StorageID(getStorageID().database_name, "WindowViewProxyStorage"), getParentStorage()->getColumns(), std::move(pipes), QueryProcessingStage::WithMergeableState);

    SelectQueryOptions query_options(QueryProcessingStage::Complete);
    query_options.ignore_limits = true;
    query_options.ignore_quota = true;
    InterpreterSelectQuery select(getFinalQuery(), *wv_context, proxy_storage, query_options);

    const auto & settings = global_context.getSettingsRef();
    return std::make_shared<SquashingBlockInputStream>(
        select.execute().getInputStream(), settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
}
|
|
|
|
|
|
|
|
/// Register the "WindowView" engine. CREATE requires the allow_experimental_window_view setting;
/// the check is skipped for ATTACH.
void registerStorageWindowView(StorageFactory & factory)
{
    auto creator = [](const StorageFactory::Arguments & args)
    {
        const bool allowed = args.attach || args.local_context.getSettingsRef().allow_experimental_window_view;
        if (!allowed)
            throw Exception(
                "Experimental WINDOW VIEW feature is not enabled (the setting 'allow_experimental_window_view')",
                ErrorCodes::SUPPORT_IS_DISABLED);

        return StorageWindowView::create(args.table_id, args.local_context, args.query, args.columns, args.attach);
    };

    factory.registerStorage("WindowView", creator);
}
|
2020-02-12 17:39:57 +00:00
|
|
|
|
2020-01-14 16:24:26 +00:00
|
|
|
}
|