Draft of proper support for subqueries in live view tables.

This commit is contained in:
Vitaliy Zakaznikov 2019-12-05 00:29:37 +01:00
parent b05c009d77
commit 7f92e6a21f
3 changed files with 134 additions and 23 deletions

View File

@ -0,0 +1,46 @@
#pragma once
#include <Storages/IStorage.h>
namespace DB
{
/// Storage backed by a fixed set of already-prepared input streams.
///
/// read() ignores every argument and hands back the streams given at
/// construction, and getQueryProcessingStage() reports the stage supplied by
/// the creator, so the caller decides up to which stage the data is
/// considered processed.
class StorageBlocks : public IStorage
{
public:
    /// @param database_name_  name returned by getDatabaseName()
    /// @param table_name_     name returned by getTableName()
    /// @param columns_        column description registered through setColumns()
    /// @param streams_        streams that read() will return; taken by value and moved into the storage
    /// @param to_stage_       stage returned by getQueryProcessingStage()
    StorageBlocks(const std::string & database_name_, const std::string & table_name_,
        const ColumnsDescription & columns_, BlockInputStreams streams_,
        QueryProcessingStage::Enum to_stage_)
        : database_name(database_name_), table_name(table_name_), streams(std::move(streams_)), to_stage(to_stage_)
    {
        setColumns(columns_);
    }

    /// Convenience factory returning the storage as a StoragePtr.
    /// Names and columns are accepted by const reference because the constructor
    /// copies them anyway; only the stream list is moved through.
    static StoragePtr createStorage(const std::string & database_name, const std::string & table_name, const ColumnsDescription & columns, BlockInputStreams streams, QueryProcessingStage::Enum to_stage)
    {
        return std::make_shared<StorageBlocks>(database_name, table_name, columns, std::move(streams), to_stage);
    }

    std::string getName() const override { return "Blocks"; }
    std::string getTableName() const override { return table_name; }
    std::string getDatabaseName() const override { return database_name; }

    /// Reports the stage chosen at construction time, independent of the context.
    QueryProcessingStage::Enum getQueryProcessingStage(const Context & /*context*/) const override { return to_stage; }

    /// Returns a copy of the stored stream list; all arguments are ignored.
    /// Marked `override` so that a signature mismatch with IStorage::read is a compile error
    /// instead of silently declaring an unrelated function.
    BlockInputStreams read(
        const Names & /*column_names*/,
        const SelectQueryInfo & /*query_info*/,
        const Context & /*context*/,
        QueryProcessingStage::Enum /*processed_stage*/,
        size_t /*max_block_size*/,
        unsigned /*num_streams*/) override
    {
        return streams;
    }

private:
    std::string database_name;
    std::string table_name;
    BlockInputStreams streams;
    QueryProcessingStage::Enum to_stage;
};
}

View File

@ -33,6 +33,7 @@ limitations under the License. */
#include <Storages/LiveView/LiveViewBlockOutputStream.h> #include <Storages/LiveView/LiveViewBlockOutputStream.h>
#include <Storages/LiveView/LiveViewEventsBlockInputStream.h> #include <Storages/LiveView/LiveViewEventsBlockInputStream.h>
#include <Storages/LiveView/ProxyStorage.h> #include <Storages/LiveView/ProxyStorage.h>
#include <Storages/LiveView/StorageBlocks.h>
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>
#include <Parsers/ASTTablesInSelectQuery.h> #include <Parsers/ASTTablesInSelectQuery.h>
@ -52,13 +53,44 @@ namespace ErrorCodes
extern const int SUPPORT_IS_DISABLED; extern const int SUPPORT_IS_DISABLED;
} }
static void extractDependentTable(ASTSelectQuery & query, String & select_database_name, String & select_table_name) static ASTTableExpression * getTableExpression(ASTSelectQuery & select, size_t table_number)
{ {
auto db_and_table = getDatabaseAndTable(query, 0); if (!select.tables())
ASTPtr subquery = extractTableExpression(query, 0); return {};
const auto & tables_in_select_query = select.tables()->as<ASTTablesInSelectQuery &>();
if (tables_in_select_query.children.size() <= table_number)
return {};
const auto & tables_element = tables_in_select_query.children[table_number]->as<ASTTablesInSelectQueryElement &>();
if (!tables_element.table_expression)
return {};
return tables_element.table_expression->as<ASTTableExpression>();
}
static void extractDependentTable(ASTPtr & query, String & select_database_name, String & select_table_name, const String & database_name, const String & table_name, ASTPtr & inner_outer_query, ASTPtr & inner_subquery)
{
ASTSelectQuery & select_query = typeid_cast<ASTSelectQuery &>(*query);
auto db_and_table = getDatabaseAndTable(select_query, 0);
ASTPtr subquery = extractTableExpression(select_query, 0);
if (!db_and_table && !subquery) if (!db_and_table && !subquery)
{
if (inner_outer_query && inner_subquery)
{
auto table_expression = getTableExpression(inner_outer_query->as<ASTSelectQuery &>(), 0);
//String table_alias = getTableExpressionAlias(table_expression);
table_expression->subquery = nullptr;
table_expression->database_and_table_name = createTableIdentifier(database_name, table_name);
//if (!table_alias.empty())
// table_expression->database_and_table_name->setAlias(table_alias);
}
return; return;
}
if (db_and_table) if (db_and_table)
{ {
@ -68,7 +100,7 @@ static void extractDependentTable(ASTSelectQuery & query, String & select_databa
{ {
db_and_table->database = select_database_name; db_and_table->database = select_database_name;
AddDefaultDatabaseVisitor visitor(select_database_name); AddDefaultDatabaseVisitor visitor(select_database_name);
visitor.visit(query); visitor.visit(select_query);
} }
else else
select_database_name = db_and_table->database; select_database_name = db_and_table->database;
@ -78,9 +110,10 @@ static void extractDependentTable(ASTSelectQuery & query, String & select_databa
if (ast_select->list_of_selects->children.size() != 1) if (ast_select->list_of_selects->children.size() != 1)
throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW); throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);
auto & inner_query = ast_select->list_of_selects->children.at(0); inner_outer_query = query;
inner_subquery = ast_select->list_of_selects->children.at(0);
extractDependentTable(inner_query->as<ASTSelectQuery &>(), select_database_name, select_table_name); extractDependentTable(inner_subquery, select_database_name, select_table_name, database_name, table_name, inner_outer_query, inner_subquery);
} }
else else
throw Exception("Logical error while creating StorageLiveView." throw Exception("Logical error while creating StorageLiveView."
@ -112,6 +145,10 @@ void StorageLiveView::writeIntoLiveView(
BlockInputStreams from; BlockInputStreams from;
BlocksPtrs mergeable_blocks; BlocksPtrs mergeable_blocks;
BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>(); BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();
ASTPtr mergeable_query = live_view.getInnerQuery();
if (live_view.getInnerSubQuery())
mergeable_query = live_view.getInnerSubQuery();
{ {
std::lock_guard lock(live_view.mutex); std::lock_guard lock(live_view.mutex);
@ -121,7 +158,7 @@ void StorageLiveView::writeIntoLiveView(
{ {
mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>(); mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
BlocksPtr base_mergeable_blocks = std::make_shared<Blocks>(); BlocksPtr base_mergeable_blocks = std::make_shared<Blocks>();
InterpreterSelectQuery interpreter(live_view.getInnerQuery(), context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names()); InterpreterSelectQuery interpreter(mergeable_query, context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>( auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
interpreter.execute().in); interpreter.execute().in);
while (Block this_block = view_mergeable_stream->read()) while (Block this_block = view_mergeable_stream->read())
@ -148,8 +185,7 @@ void StorageLiveView::writeIntoLiveView(
auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName()); auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName());
BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)}; BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(streams), QueryProcessingStage::FetchColumns); auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(streams), QueryProcessingStage::FetchColumns);
InterpreterSelectQuery select_block(live_view.getInnerQuery(), InterpreterSelectQuery select_block(mergeable_query, context, proxy_storage,
context, proxy_storage,
QueryProcessingStage::WithMergeableState); QueryProcessingStage::WithMergeableState);
auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>( auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
select_block.execute().in); select_block.execute().in);
@ -178,10 +214,18 @@ void StorageLiveView::writeIntoLiveView(
} }
auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName()); auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName());
auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(from), QueryProcessingStage::WithMergeableState);
InterpreterSelectQuery select(live_view.getInnerQuery(), context, proxy_storage, QueryProcessingStage::Complete); auto blocks_storage = StorageBlocks::createStorage(live_view.database_name, live_view.table_name, parent_storage->getColumns(), std::move(from), QueryProcessingStage::WithMergeableState);
InterpreterSelectQuery select(mergeable_query->clone(), context, blocks_storage, SelectQueryOptions(QueryProcessingStage::Complete));
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in); BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
if (live_view.getInnerSubQuery())
{
auto outer_blocks_storage = StorageBlocks::createStorage(live_view.database_name, live_view.table_name, ColumnsDescription(data->getHeader().getNamesAndTypesList()), {data}, QueryProcessingStage::FetchColumns);
InterpreterSelectQuery outer_select(live_view.getInnerOuterQuery(), context, outer_blocks_storage, SelectQueryOptions(QueryProcessingStage::Complete));
data = std::make_shared<MaterializingBlockInputStream>(outer_select.execute().in);
}
/// Squashing is needed here because the view query can generate a lot of blocks /// Squashing is needed here because the view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered). /// and two-level aggregation is triggered).
@ -216,8 +260,7 @@ StorageLiveView::StorageLiveView(
inner_query = query.select->list_of_selects->children.at(0); inner_query = query.select->list_of_selects->children.at(0);
ASTSelectQuery & select_query = typeid_cast<ASTSelectQuery &>(*inner_query); extractDependentTable(inner_query, select_database_name, select_table_name, database_name, table_name, inner_outer_query, inner_subquery);
extractDependentTable(select_query, select_database_name, select_table_name);
/// If the table is not specified - use the table `system.one` /// If the table is not specified - use the table `system.one`
if (select_table_name.empty()) if (select_table_name.empty())
@ -260,9 +303,7 @@ Block StorageLiveView::getHeader() const
if (!sample_block) if (!sample_block)
{ {
auto storage = global_context.getTable(select_database_name, select_table_name); sample_block = InterpreterSelectQuery(inner_query->clone(), *live_view_context, SelectQueryOptions(QueryProcessingStage::Complete)).getSampleBlock();
sample_block = InterpreterSelectQuery(inner_query, *live_view_context, storage,
SelectQueryOptions(QueryProcessingStage::Complete)).getSampleBlock();
sample_block.insert({DataTypeUInt64().createColumnConst( sample_block.insert({DataTypeUInt64().createColumnConst(
sample_block.rows(), 0)->convertToFullColumnIfConst(), sample_block.rows(), 0)->convertToFullColumnIfConst(),
std::make_shared<DataTypeUInt64>(), std::make_shared<DataTypeUInt64>(),
@ -274,7 +315,6 @@ Block StorageLiveView::getHeader() const
sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst(); sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst();
} }
} }
return sample_block; return sample_block;
} }
@ -285,20 +325,31 @@ bool StorageLiveView::getNewBlocks()
BlocksPtr new_blocks = std::make_shared<Blocks>(); BlocksPtr new_blocks = std::make_shared<Blocks>();
BlocksMetadataPtr new_blocks_metadata = std::make_shared<BlocksMetadata>(); BlocksMetadataPtr new_blocks_metadata = std::make_shared<BlocksMetadata>();
BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>(); BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();
ASTPtr mergeable_query = inner_query;
InterpreterSelectQuery interpreter(inner_query->clone(), *live_view_context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names()); if (inner_subquery)
mergeable_query = inner_subquery;
InterpreterSelectQuery interpreter(mergeable_query->clone(), *live_view_context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
auto mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().in); auto mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().in);
while (Block block = mergeable_stream->read()) while (Block block = mergeable_stream->read())
new_mergeable_blocks->push_back(block); new_mergeable_blocks->push_back(block);
mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>(); mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
mergeable_blocks->push_back(new_mergeable_blocks); mergeable_blocks->push_back(new_mergeable_blocks);
BlockInputStreamPtr from = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(new_mergeable_blocks), mergeable_stream->getHeader()); BlockInputStreamPtr from = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(new_mergeable_blocks), mergeable_stream->getHeader());
auto proxy_storage = ProxyStorage::createProxyStorage(global_context.getTable(select_database_name, select_table_name), {from}, QueryProcessingStage::WithMergeableState); auto blocks_storage = StorageBlocks::createStorage(database_name, table_name, global_context.getTable(select_database_name, select_table_name)->getColumns(), {from}, QueryProcessingStage::WithMergeableState);
InterpreterSelectQuery select(inner_query->clone(), *live_view_context, proxy_storage, SelectQueryOptions(QueryProcessingStage::Complete)); InterpreterSelectQuery select(mergeable_query->clone(), *live_view_context, blocks_storage, SelectQueryOptions(QueryProcessingStage::Complete));
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in); BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
if (inner_subquery)
{
auto outer_blocks_storage = StorageBlocks::createStorage(database_name, table_name,ColumnsDescription(data->getHeader().getNamesAndTypesList()), {data}, QueryProcessingStage::FetchColumns);
InterpreterSelectQuery outer_select(inner_outer_query->clone(), *live_view_context, outer_blocks_storage, SelectQueryOptions(QueryProcessingStage::Complete));
data = std::make_shared<MaterializingBlockInputStream>(outer_select.execute().in);
}
/// Squashing is needed here because the view query can generate a lot of blocks /// Squashing is needed here because the view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered). /// and two-level aggregation is triggered).

View File

@ -51,6 +51,18 @@ public:
// const NamesAndTypesList & getColumnsListImpl() const override { return *columns; } // const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
ASTPtr getInnerQuery() const { return inner_query->clone(); } ASTPtr getInnerQuery() const { return inner_query->clone(); }
/// Returns a fresh clone of the stored subquery, or nullptr when none is stored.
ASTPtr getInnerSubQuery() const
{
    if (!inner_subquery)
        return nullptr;
    return inner_subquery->clone();
}
/// Returns a fresh clone of the stored outer query, or nullptr when none is stored.
ASTPtr getInnerOuterQuery() const
{
    if (!inner_outer_query)
        return nullptr;
    return inner_outer_query->clone();
}
/// It is passed inside the query and solved at its level. /// It is passed inside the query and solved at its level.
bool supportsSampling() const override { return true; } bool supportsSampling() const override { return true; }
@ -146,7 +158,9 @@ private:
String select_table_name; String select_table_name;
String table_name; String table_name;
String database_name; String database_name;
ASTPtr inner_query; ASTPtr inner_query; /// stored query : SELECT * FROM ( SELECT a FROM A)
ASTPtr inner_subquery; /// stored query's subquery if any : SELECT a FROM A
ASTPtr inner_outer_query; /// the query right before innermost subquery : ... SELECT * FROM ( subquery )
Context & global_context; Context & global_context;
std::unique_ptr<Context> live_view_context; std::unique_ptr<Context> live_view_context;