update fetch column logic

This commit is contained in:
Vxider 2020-03-31 13:48:01 +08:00
parent 107ddb21b3
commit 8f84599ba3
2 changed files with 27 additions and 50 deletions

View File

@ -224,25 +224,6 @@ namespace
}
String generateInnerTableName(const String & table_name) { return ".inner." + table_name; }
std::shared_ptr<ASTSelectQuery> generateFetchColumnsQuery(const StorageID & inner_storage)
{
auto res_query = std::make_shared<ASTSelectQuery>();
auto select = std::make_shared<ASTExpressionList>();
select->children.push_back(std::make_shared<ASTAsterisk>());
res_query->setExpression(ASTSelectQuery::Expression::SELECT, select);
res_query->setExpression(ASTSelectQuery::Expression::TABLES, std::make_shared<ASTTablesInSelectQuery>());
auto tables = res_query->tables();
auto tables_elem = std::make_shared<ASTTablesInSelectQueryElement>();
auto table_expr = std::make_shared<ASTTableExpression>();
tables->children.push_back(tables_elem);
tables_elem->table_expression = table_expr;
tables_elem->children.push_back(table_expr);
table_expr->database_and_table_name = createTableIdentifier(inner_storage.database_name, inner_storage.table_name);
table_expr->children.push_back(table_expr->database_and_table_name);
return res_query;
}
}
static void extractDependentTable(ASTSelectQuery & query, String & select_database_name, String & select_table_name)
@ -285,7 +266,7 @@ static void extractDependentTable(ASTSelectQuery & query, String & select_databa
ASTPtr StorageWindowView::generateCleanCacheQuery(UInt32 timestamp)
{
auto function_tuple
= makeASTFunction("tupleElement", std::make_shared<ASTIdentifier>(window_column_name), std::make_shared<ASTLiteral>("2"));
= makeASTFunction("tupleElement", std::make_shared<ASTIdentifier>(window_column_name), std::make_shared<ASTLiteral>(Field{UInt8(2)}));
auto function_equal = makeASTFunction("less", function_tuple, std::make_shared<ASTLiteral>(timestamp));
auto alter_command = std::make_shared<ASTAlterCommand>();
@ -453,7 +434,7 @@ std::shared_ptr<ASTCreateQuery> StorageWindowView::generateInnerTableCreateQuery
inner_create_query->database = database_name;
inner_create_query->table = table_name;
auto inner_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(getInnerQuery());
auto inner_select_query = std::static_pointer_cast<ASTSelectQuery>(getInnerQuery());
Aliases aliases;
QueryAliasesVisitor::Data query_aliases_data{aliases};
@ -896,7 +877,6 @@ StorageWindowView::StorageWindowView(
inner_storage = DatabaseCatalog::instance().getTable(StorageID(inner_create_query->database, inner_create_query->table));
inner_table_id = inner_storage->getStorageID();
}
fetch_column_query = generateFetchColumnsQuery(inner_table_id);
clean_cache_task = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanCache(); });
if (is_proctime)
@ -1099,6 +1079,29 @@ StoragePtr & StorageWindowView::getInnerStorage() const
return inner_storage;
}
ASTPtr StorageWindowView::getFetchColumnQuery(UInt32 watermark) const
{
auto res_query = std::make_shared<ASTSelectQuery>();
auto select = std::make_shared<ASTExpressionList>();
select->children.push_back(std::make_shared<ASTAsterisk>());
res_query->setExpression(ASTSelectQuery::Expression::SELECT, select);
res_query->setExpression(ASTSelectQuery::Expression::TABLES, std::make_shared<ASTTablesInSelectQuery>());
auto tables_elem = std::make_shared<ASTTablesInSelectQueryElement>();
auto table_expr = std::make_shared<ASTTableExpression>();
res_query->tables()->children.push_back(tables_elem);
tables_elem->table_expression = table_expr;
tables_elem->children.push_back(table_expr);
table_expr->database_and_table_name = createTableIdentifier(inner_table_id.database_name, inner_table_id.table_name);
table_expr->children.push_back(table_expr->database_and_table_name);
auto func_tuple
= makeASTFunction("tupleElement", std::make_shared<ASTIdentifier>(window_column_name), std::make_shared<ASTLiteral>(Field{UInt8(2)}));
auto func_equals = makeASTFunction("equals", func_tuple, std::make_shared<ASTLiteral>(watermark));
res_query->setExpression(ASTSelectQuery::Expression::WHERE, func_equals);
return res_query;
}
StoragePtr & StorageWindowView::getTargetStorage() const
{
if (target_storage == nullptr && !target_table_id.empty())
@ -1111,38 +1114,12 @@ BlockInputStreamPtr StorageWindowView::getNewBlocksInputStreamPtr(UInt32 waterma
Pipes pipes;
auto & storage = getInnerStorage();
InterpreterSelectQuery fetch(fetch_column_query, global_context, storage, SelectQueryOptions(QueryProcessingStage::FetchColumns));
InterpreterSelectQuery fetch(getFetchColumnQuery(watermark), global_context, storage, SelectQueryOptions(QueryProcessingStage::FetchColumns));
QueryPipeline pipeline;
BlockInputStreams streams = fetch.executeWithMultipleStreams(pipeline);
for (auto & stream : streams)
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
ColumnsWithTypeAndName t_columns;
t_columns.emplace_back(
nullptr,
std::make_shared<DataTypeTuple>(DataTypes{std::make_shared<DataTypeDateTime>(), std::make_shared<DataTypeDateTime>()}),
window_column_name);
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(t_columns, global_context);
const auto & function_tuple = FunctionFactory::instance().get("tupleElement", global_context);
actions->add(ExpressionAction::addColumn(
{std::make_shared<DataTypeUInt8>()->createColumnConst(1, toField(2)), std::make_shared<DataTypeUInt8>(), "____tuple_arg"}));
actions->add(ExpressionAction::applyFunction(function_tuple, Names{window_column_name, "____tuple_arg"}, "____w_end"));
actions->add(ExpressionAction::removeColumn("____tuple_arg"));
actions->add(ExpressionAction::addColumn({std::make_shared<DataTypeDateTime>()->createColumnConst(1, toField(watermark)),
std::make_shared<DataTypeDateTime>(),
"____watermark"}));
const auto & function_equals = FunctionFactory::instance().get("equals", global_context);
ExpressionActionsPtr apply_function_actions = std::make_shared<ExpressionActions>(t_columns, global_context);
actions->add(ExpressionAction::applyFunction(function_equals, Names{"____w_end", "____watermark"}, "____filter"));
actions->add(ExpressionAction::removeColumn("____w_end"));
actions->add(ExpressionAction::removeColumn("____watermark"));
for (auto & pipe : pipes)
pipe.addSimpleTransform(std::make_shared<FilterTransform>(pipe.getHeader(), actions,
"____filter", true));
auto proxy_storage = std::make_shared<WindowViewProxyStorage>(
StorageID(global_context.getCurrentDatabase(), "WindowViewProxyStorage"), getParentStorage()->getColumns(), std::move(pipes), QueryProcessingStage::WithMergeableState);

View File

@ -55,7 +55,6 @@ public:
private:
ASTPtr inner_query;
ASTPtr final_query;
ASTPtr fetch_column_query;
Context & global_context;
bool is_proctime{true};
@ -127,6 +126,7 @@ private:
ASTPtr getInnerQuery() const { return inner_query->clone(); }
ASTPtr getFinalQuery() const { return final_query->clone(); }
ASTPtr getFetchColumnQuery(UInt32 watermark) const;
StoragePtr getParentStorage() const;