mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
Better UNION ALL: development #1947
This commit is contained in:
parent
a586fd119a
commit
6ef9917fe2
@ -24,6 +24,7 @@
|
||||
#include <DataStreams/OneBlockInputStream.h>
|
||||
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
@ -94,7 +95,7 @@ void InterpreterSelectQuery::init()
|
||||
auto table_expression = query.table();
|
||||
|
||||
/// Read from subquery.
|
||||
if (table_expression && typeid_cast<const ASTSelectQuery *>(table_expression.get()))
|
||||
if (table_expression && typeid_cast<const ASTSelectWithUnionQuery *>(table_expression.get()))
|
||||
{
|
||||
source_header = InterpreterSelectQuery::getSampleBlock(table_expression, context);
|
||||
}
|
||||
@ -564,7 +565,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
|
||||
}
|
||||
|
||||
auto query_table = query.table();
|
||||
if (query_table && typeid_cast<ASTSelectQuery *>(query_table.get()))
|
||||
if (query_table && typeid_cast<ASTSelectWithUnionQuery *>(query_table.get()))
|
||||
{
|
||||
/** There are no limits on the maximum size of the result for the subquery.
|
||||
* Since the result of the query is not the result of the entire query.
|
||||
@ -1072,6 +1073,31 @@ void InterpreterSelectQuery::executeLimitBy(Pipeline & pipeline)
|
||||
}
|
||||
|
||||
|
||||
bool hasWithTotalsInAnySubqueryInFromClause(const ASTSelectQuery & query)
|
||||
{
|
||||
if (query.group_by_with_totals)
|
||||
return true;
|
||||
|
||||
/** NOTE You can also check that the table in the subquery is distributed, and that it only looks at one shard.
|
||||
* In other cases, totals will be computed on the initiating server of the query, and it is not necessary to read the data to the end.
|
||||
*/
|
||||
|
||||
auto query_table = query.table();
|
||||
if (query_table)
|
||||
{
|
||||
auto ast_union = typeid_cast<const ASTSelectWithUnionQuery *>(query_table.get());
|
||||
if (ast_union)
|
||||
{
|
||||
for (const auto & elem : ast_union->list_of_selects->children)
|
||||
if (hasWithTotalsInAnySubqueryInFromClause(typeid_cast<const ASTSelectQuery &>(*elem)))
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
void InterpreterSelectQuery::executeLimit(Pipeline & pipeline)
|
||||
{
|
||||
size_t limit_length = 0;
|
||||
@ -1093,34 +1119,10 @@ void InterpreterSelectQuery::executeLimit(Pipeline & pipeline)
|
||||
bool always_read_till_end = false;
|
||||
|
||||
if (query.group_by_with_totals && !query.order_expression_list)
|
||||
{
|
||||
always_read_till_end = true;
|
||||
}
|
||||
|
||||
auto query_table = query.table();
|
||||
if (!query.group_by_with_totals && query_table && typeid_cast<const ASTSelectQuery *>(query_table.get()))
|
||||
{
|
||||
const ASTSelectQuery * subquery = static_cast<const ASTSelectQuery *>(query_table.get());
|
||||
|
||||
while (subquery->table())
|
||||
{
|
||||
if (subquery->group_by_with_totals)
|
||||
{
|
||||
/** NOTE You can also check that the table in the subquery is distributed, and that it only looks at one shard.
|
||||
* In other cases, totals will be computed on the initiating server of the query, and it is not necessary to read the data to the end.
|
||||
*/
|
||||
|
||||
always_read_till_end = true;
|
||||
break;
|
||||
}
|
||||
|
||||
auto subquery_table = subquery->table();
|
||||
if (typeid_cast<const ASTSelectQuery *>(subquery_table.get()))
|
||||
subquery = static_cast<const ASTSelectQuery *>(subquery_table.get());
|
||||
else
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!query.group_by_with_totals && hasWithTotalsInAnySubqueryInFromClause(query))
|
||||
always_read_till_end = true;
|
||||
|
||||
pipeline.transform([&](auto & stream)
|
||||
{
|
||||
|
@ -3,6 +3,9 @@
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
#include <DataStreams/UnionBlockInputStream.h>
|
||||
#include <DataStreams/NullBlockInputStream.h>
|
||||
#include <DataStreams/ConvertingBlockInputStream.h>
|
||||
#include <DataTypes/getLeastSupertype.h>
|
||||
#include <Columns/ColumnConst.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
|
||||
|
||||
@ -12,6 +15,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int UNION_ALL_RESULT_STRUCTURES_MISMATCH;
|
||||
}
|
||||
|
||||
|
||||
@ -35,6 +39,8 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
|
||||
|
||||
for (const auto & select : ast.list_of_selects->children)
|
||||
nested_interpreters.emplace_back(std::make_unique<InterpreterSelectQuery>(select, context, to_stage, subquery_depth));
|
||||
|
||||
init();
|
||||
}
|
||||
|
||||
|
||||
@ -59,28 +65,88 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
|
||||
|
||||
for (const auto & select : ast.list_of_selects->children)
|
||||
nested_interpreters.emplace_back(std::make_unique<InterpreterSelectQuery>(select, context, required_column_names, to_stage, subquery_depth));
|
||||
|
||||
init();
|
||||
}
|
||||
|
||||
|
||||
InterpreterSelectWithUnionQuery::~InterpreterSelectWithUnionQuery() = default;
|
||||
|
||||
|
||||
void InterpreterSelectWithUnionQuery::init()
|
||||
{
|
||||
size_t num_selects = nested_interpreters.size();
|
||||
|
||||
if (!num_selects)
|
||||
throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (num_selects == 1)
|
||||
{
|
||||
result_header = nested_interpreters.front()->getSampleBlock();
|
||||
}
|
||||
else
|
||||
{
|
||||
Blocks headers(num_selects);
|
||||
for (size_t query_num = 0; query_num < num_selects; ++query_num)
|
||||
headers[query_num] = nested_interpreters[query_num]->getSampleBlock();
|
||||
|
||||
result_header = headers.front();
|
||||
size_t num_columns = result_header.columns();
|
||||
|
||||
for (size_t query_num = 1; query_num < num_selects; ++query_num)
|
||||
if (headers[query_num].columns() != num_columns)
|
||||
throw Exception("Different number of columns in UNION ALL elements", ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH);
|
||||
|
||||
for (size_t column_num = 0; column_num < num_columns; ++column_num)
|
||||
{
|
||||
ColumnWithTypeAndName & result_elem = result_header.getByPosition(column_num);
|
||||
|
||||
/// Determine common type.
|
||||
|
||||
DataTypes types(num_selects);
|
||||
for (size_t query_num = 0; query_num < num_selects; ++query_num)
|
||||
types[query_num] = headers[query_num].getByPosition(column_num).type;
|
||||
|
||||
result_elem.type = getLeastSupertype(types);
|
||||
|
||||
/// If there are different constness or different values of constants, the result must be non-constant.
|
||||
|
||||
if (result_elem.column->isColumnConst())
|
||||
{
|
||||
bool need_materialize = false;
|
||||
for (size_t query_num = 1; query_num < num_selects; ++query_num)
|
||||
{
|
||||
const ColumnWithTypeAndName & source_elem = headers[query_num].getByPosition(column_num);
|
||||
|
||||
if (!source_elem.column->isColumnConst()
|
||||
|| (static_cast<const ColumnConst &>(*result_elem.column).getField()
|
||||
!= static_cast<const ColumnConst &>(*source_elem.column).getField()))
|
||||
{
|
||||
need_materialize = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (need_materialize)
|
||||
result_elem.column = result_elem.type->createColumn();
|
||||
}
|
||||
|
||||
/// BTW, result column names are from first SELECT.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Block InterpreterSelectWithUnionQuery::getSampleBlock()
|
||||
{
|
||||
return nested_interpreters.front()->getSampleBlock();
|
||||
return result_header;
|
||||
}
|
||||
|
||||
Block InterpreterSelectWithUnionQuery::getSampleBlock(
|
||||
const ASTPtr & query_ptr,
|
||||
const Context & context)
|
||||
{
|
||||
const ASTSelectWithUnionQuery & ast = typeid_cast<const ASTSelectWithUnionQuery &>(*query_ptr);
|
||||
|
||||
size_t num_selects = ast.list_of_selects->children.size();
|
||||
if (!num_selects)
|
||||
throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
return InterpreterSelectQuery::getSampleBlock(ast.list_of_selects->children.front(), context);
|
||||
return InterpreterSelectWithUnionQuery(query_ptr, context).getSampleBlock();
|
||||
}
|
||||
|
||||
|
||||
@ -114,8 +180,12 @@ BlockIO InterpreterSelectWithUnionQuery::execute()
|
||||
}
|
||||
else
|
||||
{
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
result_stream = std::make_shared<UnionBlockInputStream<>>(nested_streams, nullptr, settings.max_threads);
|
||||
/// Unify data structure.
|
||||
if (nested_interpreters.size() > 1)
|
||||
for (auto & stream : nested_streams)
|
||||
stream = std::make_shared<ConvertingBlockInputStream>(context, stream, result_header, ConvertingBlockInputStream::MatchColumnsMode::Position);
|
||||
|
||||
result_stream = std::make_shared<UnionBlockInputStream<>>(nested_streams, nullptr, context.getSettingsRef().max_threads);
|
||||
nested_streams.clear();
|
||||
}
|
||||
|
||||
|
@ -51,6 +51,10 @@ private:
|
||||
size_t subquery_depth;
|
||||
|
||||
std::vector<std::unique_ptr<InterpreterSelectQuery>> nested_interpreters;
|
||||
|
||||
Block result_header;
|
||||
|
||||
void init();
|
||||
};
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user