function now support for windowview populate

This commit is contained in:
Vxider 2022-05-08 23:43:08 +08:00
parent 1e9c1c0829
commit 34f1821eb9
2 changed files with 28 additions and 29 deletions

View File

@ -1436,9 +1436,17 @@ BlockIO InterpreterCreateQuery::fillTableIfNeeded(const ASTCreateQuery & create)
{
/// If the query is a CREATE SELECT, insert the data into the table.
if (create.select && !create.attach
&& !create.is_ordinary_view && !create.is_live_view && !create.is_window_view
&& !create.is_ordinary_view && !create.is_live_view
&& (!create.is_materialized_view || create.is_populate))
{
if (create.is_window_view)
{
auto table = DatabaseCatalog::instance().getTable({create.getDatabase(), create.getTable(), create.uuid}, getContext());
if (auto * window_view = typeid_cast<StorageWindowView *>(table.get()))
return window_view->populate();
return {};
}
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = {create.getDatabase(), create.getTable(), create.uuid};
insert->select = create.select->clone();
@ -1446,13 +1454,6 @@ BlockIO InterpreterCreateQuery::fillTableIfNeeded(const ASTCreateQuery & create)
return InterpreterInsertQuery(insert, getContext(),
getContext()->getSettingsRef().insert_allow_materialized_columns).execute();
}
else if (create.select && !create.attach && create.is_window_view && create.is_populate)
{
auto table = DatabaseCatalog::instance().getTable({create.getDatabase(), create.getTable(), create.uuid}, getContext());
if (auto * window_view = dynamic_cast<StorageWindowView *>(table.get()))
return window_view->populate();
return {};
}
return {};
}

View File

@ -1207,32 +1207,30 @@ private:
BlockIO StorageWindowView::populate()
{
if (is_time_column_func_now)
throw Exception(
ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW, "POPULATE is not supported when using function now() as the time column");
QueryPipelineBuilder pipeline;
InterpreterSelectQuery interpreter_fetch{select_query, getContext(), SelectQueryOptions(QueryProcessingStage::FetchColumns)};
pipeline = interpreter_fetch.buildQueryPipeline();
SortDescription order_descr;
order_descr.emplace_back(timestamp_column_name);
pipeline.addSimpleTransform(
[&](const Block & header)
{
return std::make_shared<MergeSortingTransform>(
header,
order_descr,
getContext()->getSettingsRef().max_block_size,
0 /*LIMIT*/,
getContext()->getSettingsRef().max_bytes_before_remerge_sort,
getContext()->getSettingsRef().remerge_sort_lowered_memory_bytes_ratio,
getContext()->getSettingsRef().max_bytes_before_external_sort,
getContext()->getTemporaryVolume(),
getContext()->getSettingsRef().min_free_disk_space_for_temporary_data);
});
if (!is_time_column_func_now)
{
SortDescription order_descr;
order_descr.emplace_back(timestamp_column_name);
pipeline.addSimpleTransform(
[&](const Block & header)
{
return std::make_shared<MergeSortingTransform>(
header,
order_descr,
getContext()->getSettingsRef().max_block_size,
0 /*LIMIT*/,
getContext()->getSettingsRef().max_bytes_before_remerge_sort,
getContext()->getSettingsRef().remerge_sort_lowered_memory_bytes_ratio,
getContext()->getSettingsRef().max_bytes_before_external_sort,
getContext()->getTemporaryVolume(),
getContext()->getSettingsRef().min_free_disk_space_for_temporary_data);
});
}
auto sink = std::make_shared<PushingToWindowViewSink>(interpreter_fetch.getSampleBlock(), *this, nullptr, getContext());