2018-02-25 00:50:53 +00:00
|
|
|
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
|
|
|
|
#include <Interpreters/InterpreterSelectQuery.h>
|
|
|
|
#include <Parsers/ASTSelectWithUnionQuery.h>
|
2018-02-27 20:16:58 +00:00
|
|
|
#include <Parsers/ASTSelectQuery.h>
|
2018-02-25 00:50:53 +00:00
|
|
|
#include <DataStreams/UnionBlockInputStream.h>
|
|
|
|
#include <DataStreams/NullBlockInputStream.h>
|
2018-02-28 02:33:04 +00:00
|
|
|
#include <DataStreams/ConcatBlockInputStream.h>
|
2018-02-26 06:12:59 +00:00
|
|
|
#include <DataStreams/ConvertingBlockInputStream.h>
|
2019-05-31 12:36:44 +00:00
|
|
|
#include <Columns/getLeastSuperColumn.h>
|
2018-02-26 06:12:59 +00:00
|
|
|
#include <Columns/ColumnConst.h>
|
2018-02-25 07:39:45 +00:00
|
|
|
#include <Common/typeid_cast.h>
|
2018-05-30 19:23:15 +00:00
|
|
|
#include <Parsers/queryToString.h>
|
2019-02-11 19:53:55 +00:00
|
|
|
#include <Parsers/ASTExpressionList.h>
|
2018-02-25 00:50:53 +00:00
|
|
|
|
2019-03-26 18:28:37 +00:00
|
|
|
#include <Processors/Sources/NullSource.h>
|
|
|
|
#include <Processors/QueryPipeline.h>
|
2019-11-05 17:33:03 +00:00
|
|
|
#include <Processors/Pipe.h>
|
2019-03-26 18:28:37 +00:00
|
|
|
|
2018-08-05 02:39:09 +00:00
|
|
|
|
2018-02-25 00:50:53 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes used by this file. They are only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
    /// Used for internal invariant violations, e.g. a UNION query AST without SELECT children.
    extern const int LOGICAL_ERROR;
    /// Used when the SELECTs of a UNION ALL produce a different number of columns.
    extern const int UNION_ALL_RESULT_STRUCTURES_MISMATCH;
}
|
|
|
|
|
|
|
|
|
|
|
|
/** Creates one InterpreterSelectQuery per SELECT of the UNION ALL and computes the common result header.
  *
  * 'required_result_column_names' uses the column names of the first SELECT. The other SELECTs may
  * name the same positions differently, so for them the names are translated by position.
  *
  * Throws LOGICAL_ERROR if the query contains no SELECTs and UNION_ALL_RESULT_STRUCTURES_MISMATCH
  * if the SELECTs produce a different number of columns.
  */
InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
    const ASTPtr & query_ptr_,
    const Context & context_,
    const SelectQueryOptions & options_,
    const Names & required_result_column_names)
    : options(options_),
    query_ptr(query_ptr_),
    context(context_)
{
    const auto & ast = query_ptr->as<ASTSelectWithUnionQuery &>();
    auto & selects = ast.list_of_selects->children;

    size_t num_selects = selects.size();
    if (num_selects == 0)
        throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR);

    nested_interpreters.reserve(num_selects);

    /// Required column names per SELECT. Element 0 stays empty:
    /// the first SELECT receives 'required_result_column_names' as is.
    std::vector<Names> names_per_select(num_selects);

    const size_t num_required = required_result_column_names.size();

    if (num_selects > 1 && num_required != 0)
    {
        /// Header of a single SELECT when nothing is filtered by required column names.
        auto analyze_header = [&](const ASTPtr & select) -> Block
        {
            return InterpreterSelectQuery(select, context, options.copy().analyze().noModify()).getSampleBlock();
        };

        /// The unfiltered header of the first SELECT tells at which positions
        /// of the SELECT clause the required columns are located.
        Block first_header = analyze_header(selects.at(0));

        std::vector<size_t> required_positions;
        required_positions.reserve(num_required);
        for (const auto & required_name : required_result_column_names)
            required_positions.push_back(first_header.getPositionByName(required_name));

        for (size_t select_idx = 1; select_idx != num_selects; ++select_idx)
        {
            Block current_header = analyze_header(selects.at(select_idx));

            if (current_header.columns() != first_header.columns())
                throw Exception("Different number of columns in UNION ALL elements:\n"
                    + first_header.dumpNames()
                    + "\nand\n"
                    + current_header.dumpNames() + "\n",
                    ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH);

            /// Take the names this SELECT uses at the required positions.
            Names & translated_names = names_per_select[select_idx];
            translated_names.reserve(num_required);
            for (size_t position : required_positions)
                translated_names.push_back(current_header.getByPosition(position).name);
        }
    }

    /// Initialize the interpreter of every SELECT with the names that apply to it.
    for (size_t select_idx = 0; select_idx != num_selects; ++select_idx)
    {
        const Names & names_for_select
            = select_idx != 0 ? names_per_select[select_idx] : required_result_column_names;

        nested_interpreters.emplace_back(std::make_unique<InterpreterSelectQuery>(
            selects.at(select_idx),
            context,
            options,
            names_for_select));
    }

    /// Determine structure of the result.
    if (num_selects > 1)
    {
        Blocks headers;
        headers.reserve(num_selects);
        for (const auto & nested : nested_interpreters)
            headers.push_back(nested->getSampleBlock());

        result_header = getCommonHeaderForUnion(headers);
    }
    else
    {
        /// The only SELECT defines the result structure by itself.
        result_header = nested_interpreters.front()->getSampleBlock();
    }
}
|
2018-02-26 06:12:59 +00:00
|
|
|
|
|
|
|
|
2019-04-09 13:07:07 +00:00
|
|
|
/** Computes the header that the results of all SELECTs of a UNION ALL are brought to.
  *
  * The result starts as a copy of the first header. When there are several headers, every column
  * is replaced by getLeastSuperColumn() of the columns at the same position in all headers.
  *
  * Throws LOGICAL_ERROR if 'headers' is empty and UNION_ALL_RESULT_STRUCTURES_MISMATCH
  * if the headers have a different number of columns.
  */
Block InterpreterSelectWithUnionQuery::getCommonHeaderForUnion(const Blocks & headers)
{
    /// front() on an empty vector is undefined behaviour, so reject it explicitly.
    if (headers.empty())
        throw Exception("Logical error: no headers passed to getCommonHeaderForUnion", ErrorCodes::LOGICAL_ERROR);

    size_t num_selects = headers.size();
    Block common_header = headers.front();
    size_t num_columns = common_header.columns();

    /// Validate every header first, so that a mismatch is always reported with the dedicated
    /// error before any column of a not yet validated header is accessed.
    for (size_t query_num = 1; query_num < num_selects; ++query_num)
    {
        if (headers[query_num].columns() != num_columns)
            throw Exception("Different number of columns in UNION ALL elements:\n"
                + common_header.dumpNames()
                + "\nand\n"
                + headers[query_num].dumpNames() + "\n",
                ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH);
    }

    /// With a single header there is nothing to unify; it is returned unchanged.
    if (num_selects == 1)
        return common_header;

    /// Columns at the current position of every header; the buffer is reused for each position.
    std::vector<const ColumnWithTypeAndName *> columns(num_selects);

    /// Each position is processed exactly once (the result does not depend on the SELECT number).
    for (size_t column_num = 0; column_num < num_columns; ++column_num)
    {
        for (size_t i = 0; i < num_selects; ++i)
            columns[i] = &headers[i].getByPosition(column_num);

        ColumnWithTypeAndName & result_elem = common_header.getByPosition(column_num);
        result_elem = getLeastSuperColumn(columns);
    }

    return common_header;
}
|
|
|
|
|
|
|
|
|
2018-02-27 20:16:58 +00:00
|
|
|
/// Destructor is defaulted in this source file.
/// NOTE(review): presumably so that the element type of nested_interpreters is complete here - confirm against the header.
InterpreterSelectWithUnionQuery::~InterpreterSelectWithUnionQuery() = default;
|
|
|
|
|
|
|
|
|
2018-02-25 00:50:53 +00:00
|
|
|
/// Returns a copy of the result header that was computed in the constructor.
Block InterpreterSelectWithUnionQuery::getSampleBlock()
{
    return result_header;
}
|
|
|
|
|
|
|
|
/** Returns the result header of 'query_ptr' without executing the query.
  *
  * Headers are memoized in the sample block cache of 'context', keyed by the query text.
  * On a miss the query is analyzed with a temporary interpreter and the header is stored in the cache.
  */
Block InterpreterSelectWithUnionQuery::getSampleBlock(
    const ASTPtr & query_ptr,
    const Context & context)
{
    auto & cache = context.getSampleBlockCache();
    /// Using query string because query_ptr changes for every internal SELECT
    auto key = queryToString(query_ptr);

    /// Single lookup on a hit: reuse the iterator instead of searching for the key again.
    auto cached = cache.find(key);
    if (cached != cache.end())
        return cached->second;

    /// No iterator is kept across the analysis: it is given the same 'context',
    /// so it may add entries to this cache itself.
    return cache[key] = InterpreterSelectWithUnionQuery(query_ptr, context, SelectQueryOptions().analyze()).getSampleBlock();
}
|
|
|
|
|
|
|
|
|
2019-11-15 18:41:18 +00:00
|
|
|
/** Executes every nested SELECT and returns all of their streams as one flat list,
  * in the order of the SELECTs.
  *
  * If the union consists of more than one SELECT, every stream is wrapped in a
  * ConvertingBlockInputStream targeting 'result_header' with the Position matching mode.
  */
BlockInputStreams InterpreterSelectWithUnionQuery::executeWithMultipleStreams(QueryPipeline & parent_pipeline)
{
    BlockInputStreams all_streams;

    for (auto & nested : nested_interpreters)
    {
        BlockInputStreams select_streams = nested->executeWithMultipleStreams(parent_pipeline);
        all_streams.insert(all_streams.end(), select_streams.begin(), select_streams.end());
    }

    /// With at most one SELECT the streams are returned as they are.
    if (nested_interpreters.size() <= 1)
        return all_streams;

    /// Unify data structure.
    for (auto & stream : all_streams)
    {
        stream = std::make_shared<ConvertingBlockInputStream>(
            context, stream, result_header, ConvertingBlockInputStream::MatchColumnsMode::Position);
    }

    return all_streams;
}
|
|
|
|
|
|
|
|
|
|
|
|
/** Executes the UNION and returns a BlockIO whose 'in' is a single stream:
  *  - a UnionBlockInputStream over all nested streams if there are several of them;
  *  - the only nested stream itself if there is exactly one;
  *  - a NullBlockInputStream built from the sample block if there are none.
  */
BlockIO InterpreterSelectWithUnionQuery::execute()
{
    const Settings & settings = context.getSettingsRef();

    BlockIO res;
    BlockInputStreams nested_streams = executeWithMultipleStreams(res.pipeline);

    const size_t num_streams = nested_streams.size();

    BlockInputStreamPtr united_stream;
    if (num_streams > 1)
        united_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr, settings.max_threads);
    else if (num_streams == 1)
        united_stream = nested_streams.front();
    else
        united_stream = std::make_shared<NullBlockInputStream>(getSampleBlock());

    /// The local handles are no longer needed once the resulting stream is set up.
    nested_streams.clear();

    res.in = united_stream;
    return res;
}
|
|
|
|
|
2018-02-25 06:34:20 +00:00
|
|
|
|
2019-03-26 18:28:37 +00:00
|
|
|
/** Builds the processors pipeline of the whole UNION.
  *
  * The first nested SELECT provides the main pipeline. The pipelines of the remaining SELECTs
  * are united into it under the header returned by getCommonHeaderForUnion().
  * If there are no nested SELECTs, the main pipeline is initialized with a NullSource
  * built from the sample block.
  */
QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
{
    QueryPipeline main_pipeline;
    std::vector<QueryPipeline> other_pipelines;

    /// Headers of all pipelines, in the order of the SELECTs.
    Blocks headers;
    headers.reserve(nested_interpreters.size());

    auto current = nested_interpreters.begin();
    const auto last = nested_interpreters.end();

    if (current == last)
    {
        main_pipeline.init(Pipe(std::make_shared<NullSource>(getSampleBlock())));
    }
    else
    {
        /// The first SELECT becomes the main pipeline.
        main_pipeline = (*current)->executeWithProcessors();
        headers.emplace_back(main_pipeline.getHeader());

        /// Every other SELECT gets a pipeline of its own.
        for (++current; current != last; ++current)
        {
            other_pipelines.emplace_back((*current)->executeWithProcessors());
            headers.emplace_back(other_pipelines.back().getHeader());
        }
    }

    if (!other_pipelines.empty())
    {
        auto common_header = getCommonHeaderForUnion(headers);
        main_pipeline.unitePipelines(std::move(other_pipelines), common_header, context);
    }

    return main_pipeline;
}
|
|
|
|
|
|
|
|
|
2018-02-25 06:34:20 +00:00
|
|
|
void InterpreterSelectWithUnionQuery::ignoreWithTotals()
|
|
|
|
{
|
|
|
|
for (auto & interpreter : nested_interpreters)
|
|
|
|
interpreter->ignoreWithTotals();
|
|
|
|
}
|
|
|
|
|
2018-02-25 00:50:53 +00:00
|
|
|
}
|