removing function populate in windowview

This commit is contained in:
Vxider 2022-05-10 15:06:58 +08:00
parent 8c3c80f84c
commit 5d0a5d34c8
5 changed files with 86 additions and 75 deletions

View File

@ -1437,19 +1437,17 @@ BlockIO InterpreterCreateQuery::fillTableIfNeeded(const ASTCreateQuery & create)
/// If the query is a CREATE SELECT, insert the data into the table.
if (create.select && !create.attach
&& !create.is_ordinary_view && !create.is_live_view
&& (!create.is_materialized_view || create.is_populate))
&& (!(create.is_materialized_view || create.is_window_view) || create.is_populate))
{
if (create.is_window_view)
{
auto table = DatabaseCatalog::instance().getTable({create.getDatabase(), create.getTable(), create.uuid}, getContext());
if (auto * window_view = typeid_cast<StorageWindowView *>(table.get()))
return window_view->populate();
return {};
}
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = {create.getDatabase(), create.getTable(), create.uuid};
insert->select = create.select->clone();
if (create.is_window_view)
{
auto table = DatabaseCatalog::instance().getTable(insert->table_id, getContext());
insert->select = typeid_cast<StorageWindowView *>(table.get())->getSourceTableSelectQuery();
}
else
insert->select = create.select->clone();
return InterpreterInsertQuery(insert, getContext(),
getContext()->getSettingsRef().insert_allow_materialized_columns).execute();

View File

@ -314,7 +314,7 @@ Chain buildPushingToViewsChain(
runtime_stats->type = QueryViewsLogElement::ViewType::WINDOW;
query = window_view->getMergeableQuery(); // Used only to log in system.query_views_log
out = buildPushingToViewsChain(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, view_thread_status, view_counter_ms, storage_header);
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, view_thread_status, view_counter_ms);
}
else
out = buildPushingToViewsChain(
@ -392,7 +392,7 @@ Chain buildPushingToViewsChain(
}
else if (auto * window_view = dynamic_cast<StorageWindowView *>(storage.get()))
{
auto sink = std::make_shared<PushingToWindowViewSink>(live_view_header, *window_view, storage, context);
auto sink = std::make_shared<PushingToWindowViewSink>(window_view->getHeader(), *window_view, storage, context);
sink->setRuntimeData(thread_status, elapsed_counter_ms);
result_chain.addSource(std::move(sink));
}

View File

@ -606,6 +606,54 @@ inline void StorageWindowView::fire(UInt32 watermark)
}
}
ASTPtr StorageWindowView::getSourceTableSelectQuery()
{
auto select_query_ = select_query->clone();
auto & modified_select = select_query_->as<ASTSelectQuery &>();
if (hasJoin(modified_select))
{
auto analyzer_res = TreeRewriterResult({});
removeJoin(modified_select, analyzer_res, getContext());
}
else
{
modified_select.setExpression(ASTSelectQuery::Expression::HAVING, {});
modified_select.setExpression(ASTSelectQuery::Expression::GROUP_BY, {});
}
auto select_list = std::make_shared<ASTExpressionList>();
for (const auto & column_name : source_header.getNames())
select_list->children.emplace_back(std::make_shared<ASTIdentifier>(column_name));
modified_select.setExpression(ASTSelectQuery::Expression::SELECT, select_list);
if (!is_time_column_func_now)
{
auto select_query_ = select_query->clone();
DropTableIdentifierMatcher::Data drop_table_identifier_data;
DropTableIdentifierMatcher::Visitor drop_table_identifier_visitor(drop_table_identifier_data);
drop_table_identifier_visitor.visit(select_query_);
FetchQueryInfoMatcher::Data query_info_data;
FetchQueryInfoMatcher::Visitor(query_info_data).visit(select_query_);
auto order_by = std::make_shared<ASTExpressionList>();
auto order_by_elem = std::make_shared<ASTOrderByElement>();
order_by_elem->children.push_back(std::make_shared<ASTIdentifier>(query_info_data.timestamp_column_name));
order_by_elem->direction = 1;
order_by->children.push_back(order_by_elem);
modified_select.setExpression(ASTSelectQuery::Expression::ORDER_BY, std::move(order_by));
}
else
modified_select.setExpression(ASTSelectQuery::Expression::ORDER_BY, {});
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
select_with_union_query->list_of_selects->children.push_back(select_query_);
return select_with_union_query;
}
std::shared_ptr<ASTCreateQuery> StorageWindowView::getInnerTableCreateQuery(
const ASTPtr & inner_query, ASTStorage * storage, const String & database_name, const String & table_name)
{
@ -1002,16 +1050,11 @@ StorageWindowView::StorageWindowView(
const StorageID & table_id_,
ContextPtr context_,
const ASTCreateQuery & query,
const ColumnsDescription & columns_,
bool attach_)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get(fmt::format("StorageWindowView({}.{})", table_id_.database_name, table_id_.table_name)))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);
if (!query.select)
throw Exception(ErrorCodes::INCORRECT_QUERY, "SELECT query is not specified for {}", getName());
@ -1021,6 +1064,13 @@ StorageWindowView::StorageWindowView(
"UNION is not supported for {}", getName());
select_query = query.select->list_of_selects->children.at(0)->clone();
source_header = InterpreterSelectQuery(select_query->clone(), getContext(), SelectQueryOptions(QueryProcessingStage::FetchColumns))
.getSampleBlock();
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(ColumnsDescription(source_header.getNamesAndTypesList()));
setInMemoryMetadata(storage_metadata);
String select_database_name = getContext()->getCurrentDatabase();
String select_table_name;
auto select_query_tmp = select_query->clone();
@ -1205,52 +1255,6 @@ private:
ContextPtr context;
};
BlockIO StorageWindowView::populate()
{
QueryPipelineBuilder pipeline;
InterpreterSelectQuery interpreter_fetch{select_query, getContext(), SelectQueryOptions(QueryProcessingStage::FetchColumns)};
pipeline = interpreter_fetch.buildQueryPipeline();
if (!is_time_column_func_now)
{
SortDescription order_descr;
order_descr.emplace_back(timestamp_column_name);
pipeline.addSimpleTransform(
[&](const Block & header)
{
return std::make_shared<MergeSortingTransform>(
header,
order_descr,
getContext()->getSettingsRef().max_block_size,
0 /*LIMIT*/,
getContext()->getSettingsRef().max_bytes_before_remerge_sort,
getContext()->getSettingsRef().remerge_sort_lowered_memory_bytes_ratio,
getContext()->getSettingsRef().max_bytes_before_external_sort,
getContext()->getTemporaryVolume(),
getContext()->getSettingsRef().min_free_disk_space_for_temporary_data);
});
}
auto sink = std::make_shared<PushingToWindowViewSink>(interpreter_fetch.getSampleBlock(), *this, nullptr, getContext());
BlockIO res;
pipeline.addChain(Chain(std::move(sink)));
pipeline.setMaxThreads(1);
pipeline.setSinks([&](const Block & cur_header, QueryPipelineBuilder::StreamType) -> ProcessorPtr
{
return std::make_shared<EmptySink>(cur_header);
});
res.pipeline = QueryPipelineBuilder::getPipeline(std::move(pipeline));
res.pipeline.addStorageHolder(shared_from_this());
res.pipeline.addStorageHolder(getInnerTable());
return res;
}
void StorageWindowView::writeIntoWindowView(
StorageWindowView & window_view, const Block & block, ContextPtr local_context)
{
@ -1492,21 +1496,26 @@ void StorageWindowView::dropInnerTableIfAny(bool no_delay, ContextPtr local_cont
}
}
Block & StorageWindowView::getHeader() const
Block StorageWindowView::getHeader() const
{
return source_header;
}
Block StorageWindowView::getTargetHeader() const
{
std::lock_guard lock(sample_block_lock);
if (!sample_block)
if (!target_header)
{
sample_block = InterpreterSelectQuery(select_query->clone(), getContext(), SelectQueryOptions(QueryProcessingStage::Complete))
target_header = InterpreterSelectQuery(select_query->clone(), getContext(), SelectQueryOptions(QueryProcessingStage::Complete))
.getSampleBlock();
/// convert all columns to full columns
/// in case some of them are constant
for (size_t i = 0; i < sample_block.columns(); ++i)
for (size_t i = 0; i < target_header.columns(); ++i)
{
sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst();
target_header.safeGetByPosition(i).column = target_header.safeGetByPosition(i).column->convertToFullColumnIfConst();
}
}
return sample_block;
return target_header;
}
StoragePtr StorageWindowView::getSourceTable() const
@ -1578,7 +1587,7 @@ void registerStorageWindowView(StorageFactory & factory)
"Experimental WINDOW VIEW feature is not enabled (the setting 'allow_experimental_window_view')",
ErrorCodes::SUPPORT_IS_DISABLED);
return std::make_shared<StorageWindowView>(args.table_id, args.getLocalContext(), args.query, args.columns, args.attach);
return std::make_shared<StorageWindowView>(args.table_id, args.getLocalContext(), args.query, args.attach);
});
}

View File

@ -108,7 +108,6 @@ public:
const StorageID & table_id_,
ContextPtr context_,
const ASTCreateQuery & query,
const ColumnsDescription & columns,
bool attach_);
String getName() const override { return "WindowView"; }
@ -153,6 +152,10 @@ public:
ASTPtr getMergeableQuery() const { return mergeable_query->clone(); }
ASTPtr getSourceTableSelectQuery();
Block getHeader() const;
private:
Poco::Logger * log;
@ -168,7 +171,8 @@ private:
bool is_tumble; // false if is hop
std::atomic<bool> shutdown_called{false};
bool has_inner_table{true};
mutable Block sample_block;
mutable Block source_header;
mutable Block target_header;
UInt64 clean_interval_ms;
const DateLUTImpl * time_zone = nullptr;
UInt32 max_timestamp = 0;
@ -240,6 +244,6 @@ private:
StoragePtr getInnerTable() const;
StoragePtr getTargetTable() const;
Block & getHeader() const;
Block getTargetHeader() const;
};
}

View File

@ -20,7 +20,7 @@ public:
: SourceWithProgress(
is_events_ ? Block(
{ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark")})
: storage_->getHeader())
: storage_->getTargetHeader())
, storage(storage_)
, is_events(is_events_)
, window_view_timezone(window_view_timezone_)
@ -32,7 +32,7 @@ public:
header.insert(
ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark"));
else
header = storage->getHeader();
header = storage->getTargetHeader();
}
String getName() const override { return "WindowViewSource"; }