This commit is contained in:
Vitaliy Zakaznikov 2020-01-01 11:44:45 -05:00
parent cf22cde702
commit 2e36fd417d

View File

@ -54,35 +54,6 @@ namespace ErrorCodes
extern const int SUPPORT_IS_DISABLED;
}
static ASTTableExpression * getTableExpression(ASTSelectQuery & select, size_t table_number)
{
if (!select.tables())
return {};
const auto & tables_in_select_query = select.tables()->as<ASTTablesInSelectQuery &>();
if (tables_in_select_query.children.size() <= table_number)
return {};
const auto & tables_element = tables_in_select_query.children[table_number]->as<ASTTablesInSelectQueryElement &>();
if (!tables_element.table_expression)
return {};
return tables_element.table_expression->as<ASTTableExpression>();
}
static String getTableExpressionAlias(const ASTTableExpression * table_expression)
{
if (table_expression->subquery)
return table_expression->subquery->tryGetAlias();
else if (table_expression->table_function)
return table_expression->table_function->tryGetAlias();
else if (table_expression->database_and_table_name)
return table_expression->database_and_table_name->tryGetAlias();
return String();
}
static void extractDependentTable(ASTPtr & query, String & select_database_name, String & select_table_name, const String & database_name, const String & table_name, ASTPtr & inner_outer_query, ASTPtr & inner_subquery)
{
ASTSelectQuery & select_query = typeid_cast<ASTSelectQuery &>(*query);
@ -107,16 +78,7 @@ static void extractDependentTable(ASTPtr & query, String & select_database_name,
else
select_database_name = db_and_table->database;
if (inner_subquery)
{
auto table_expression = getTableExpression(inner_outer_query->as<ASTSelectQuery &>(), 0);
String table_alias = getTableExpressionAlias(table_expression);
table_expression->subquery = nullptr;
table_expression->database_and_table_name = createTableIdentifier("", table_name + "_blocks");
if (!table_alias.empty())
table_expression->database_and_table_name->setAlias(table_alias);
}
select_query.replaceDatabaseAndTable("", table_name + "_blocks");
}
else if (auto * ast_select = subquery->as<ASTSelectWithUnionQuery>())
{
@ -124,9 +86,9 @@ static void extractDependentTable(ASTPtr & query, String & select_database_name,
throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);
inner_outer_query = query;
inner_subquery = ast_select->list_of_selects->children.at(0);
inner_subquery = ast_select->list_of_selects->children.at(0)->clone();
extractDependentTable(inner_subquery, select_database_name, select_table_name, database_name, table_name, inner_outer_query, inner_subquery);
extractDependentTable(ast_select->list_of_selects->children.at(0), select_database_name, select_table_name, database_name, table_name, inner_outer_query, inner_subquery);
}
else
throw Exception("Logical error while creating StorageLiveView."
@ -231,16 +193,10 @@ void StorageLiveView::writeIntoLiveView(
}
auto blocks_storage = StorageBlocks::createStorage(live_view.database_name, live_view.table_name, parent_storage->getColumns(), std::move(from), QueryProcessingStage::WithMergeableState);
InterpreterSelectQuery select(mergeable_query->clone(), context, blocks_storage, SelectQueryOptions(QueryProcessingStage::Complete));
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
block_context->addExternalTable(live_view.table_name + "_blocks", blocks_storage);
if (live_view.getInnerSubQuery())
{
auto outer_blocks_storage = StorageBlocks::createStorage("_liveview", live_view.table_name + "_blocks", ColumnsDescription(data->getHeader().getNamesAndTypesList()), {data}, QueryProcessingStage::FetchColumns);
block_context->addExternalTable(live_view.table_name + "_blocks", outer_blocks_storage);
InterpreterSelectQuery outer_select(live_view.getInnerOuterQuery(), *block_context, SelectQueryOptions(QueryProcessingStage::Complete));
data = std::make_shared<MaterializingBlockInputStream>(outer_select.execute().in);
}
InterpreterSelectQuery select(live_view.getInnerOuterQuery(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete));
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
/// Squashing is needed here because the view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
@ -278,6 +234,9 @@ StorageLiveView::StorageLiveView(
inner_outer_query = inner_query->clone();
ASTPtr outer_query = inner_query->clone();
inner_outer_query = inner_query->clone();
InterpreterSelectQuery(inner_outer_query, *live_view_context, SelectQueryOptions().modify().analyze());
extractDependentTable(inner_outer_query, select_database_name, select_table_name, database_name, table_name, outer_query, inner_subquery);
/// If the table is not specified - use the table `system.one`
@ -354,24 +313,19 @@ bool StorageLiveView::getNewBlocks()
while (Block block = mergeable_stream->read())
new_mergeable_blocks->push_back(block);
auto block_context = std::make_unique<Context>(global_context);
block_context->makeQueryContext();
mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
mergeable_blocks->push_back(new_mergeable_blocks);
BlockInputStreamPtr from = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(new_mergeable_blocks), mergeable_stream->getHeader());
auto blocks_storage = StorageBlocks::createStorage(database_name, table_name, global_context.getTable(select_database_name, select_table_name)->getColumns(), {from}, QueryProcessingStage::WithMergeableState);
InterpreterSelectQuery select(mergeable_query->clone(), *live_view_context, blocks_storage, SelectQueryOptions(QueryProcessingStage::Complete));
block_context->addExternalTable(table_name + "_blocks", blocks_storage);
InterpreterSelectQuery select(inner_outer_query->clone(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete));
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
if (inner_subquery)
{
auto block_context = std::make_unique<Context>(global_context);
block_context->makeQueryContext();
auto outer_blocks_storage = StorageBlocks::createStorage("_liveview", table_name + "_blocks", ColumnsDescription(data->getHeader().getNamesAndTypesList()), {data}, QueryProcessingStage::FetchColumns);
block_context->addExternalTable(table_name + "_blocks", outer_blocks_storage);
InterpreterSelectQuery outer_select(inner_outer_query->clone(), *block_context, SelectQueryOptions(QueryProcessingStage::Complete));
data = std::make_shared<MaterializingBlockInputStream>(outer_select.execute().in);
}
/// Squashing is needed here because the view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).