Better UNION ALL: development #1947

Alexey Milovidov 2018-02-28 04:29:55 +03:00
parent c35727c7ed
commit 9ea0a603a0
4 changed files with 54 additions and 66 deletions

dbms/src/Interpreters/ExpressionAnalyzer.cpp

@@ -60,6 +60,8 @@
 #include <DataTypes/DataTypeFunction.h>
 #include <Functions/FunctionsMiscellaneous.h>
 
+#include <Core/iostream_debug_helpers.h>
+
 namespace DB
 {
@ -187,7 +189,12 @@ ExpressionAnalyzer::ExpressionAnalyzer(
} }
} }
removeDuplicateColumns(source_columns); if (storage && source_columns.empty())
source_columns = storage->getSampleBlock().getNamesAndTypesList();
else
removeDuplicateColumns(source_columns);
DUMP(source_columns);
addAliasColumns(); addAliasColumns();
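
The DUMP(source_columns) call and the <Core/iostream_debug_helpers.h> include added above look like temporary instrumentation, consistent with a commit titled "development". As a rough sketch of what a DUMP-style helper does (illustrative only; the real header also knows how to print containers and other ClickHouse types), assuming nothing beyond standard iostreams:

#include <iostream>

/// Minimal DUMP-style helper: prints the expression text and its value to
/// stderr. A sketch, not the real iostream_debug_helpers implementation.
#define DUMP(expr) do { std::cerr << #expr << " = " << (expr) << '\n'; } while (false)

int main()
{
    int columns_count = 2;
    DUMP(columns_count); /// prints: columns_count = 2
}
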
@@ -2686,7 +2693,7 @@ void ExpressionAnalyzer::collectUsedColumns()
     if (required.empty())
         required.insert(ExpressionActions::getSmallestColumn(source_columns));
 
-    unknown_required_source_columns = required;
+    NameSet unknown_required_source_columns = required;
 
     for (NamesAndTypesList::iterator it = source_columns.begin(); it != source_columns.end();)
     {
@ -2713,6 +2720,9 @@ void ExpressionAnalyzer::collectUsedColumns()
++it; ++it;
} }
} }
if (!unknown_required_source_columns.empty())
throw Exception("Unknown identifier: " + *unknown_required_source_columns.begin(), ErrorCodes::UNKNOWN_IDENTIFIER);
} }
void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAndTypesList & joined_columns_name_type) void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAndTypesList & joined_columns_name_type)
@@ -2775,14 +2785,7 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAndTypesList & joined_columns_name_type)
 
 Names ExpressionAnalyzer::getRequiredSourceColumns() const
 {
-    if (!unknown_required_source_columns.empty())
-        throw Exception("Unknown identifier: " + *unknown_required_source_columns.begin(), ErrorCodes::UNKNOWN_IDENTIFIER);
-
-    Names res;
-    for (const auto & column_name_type : source_columns)
-        res.push_back(column_name_type.name);
-
-    return res;
+    return source_columns.getNames();
 }
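
The net effect of the collectUsedColumns() and getRequiredSourceColumns() hunks above: the "Unknown identifier" check now fires eagerly during analysis, and the getter shrinks to a one-line projection over source_columns. A standalone sketch of the eager check, with std::set and std::vector standing in for NameSet and NamesAndTypesList:

#include <iostream>
#include <set>
#include <stdexcept>
#include <string>
#include <vector>

/// Simplified model: every required name must resolve to a source column;
/// anything left over is reported immediately, not at getter time.
void checkColumnsKnown(const std::set<std::string> & required,
                       const std::vector<std::string> & source_columns)
{
    std::set<std::string> unknown = required;
    for (const auto & name : source_columns)
        unknown.erase(name);

    if (!unknown.empty())
        throw std::runtime_error("Unknown identifier: " + *unknown.begin());
}

int main()
{
    checkColumnsKnown({"a", "b"}, {"a", "b", "c"}); /// passes silently

    try
    {
        checkColumnsKnown({"a", "x"}, {"a", "b", "c"});
    }
    catch (const std::exception & e)
    {
        std::cerr << e.what() << '\n'; /// prints: Unknown identifier: x
    }
}
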

dbms/src/Interpreters/ExpressionAnalyzer.h

@ -69,7 +69,7 @@ public:
const ASTPtr & ast_, const ASTPtr & ast_,
const Context & context_, const Context & context_,
const StoragePtr & storage_, const StoragePtr & storage_,
const NamesAndTypesList & source_columns_, const NamesAndTypesList & source_columns_ = {},
const Names & required_result_columns_ = {}, const Names & required_result_columns_ = {},
size_t subquery_depth_ = 0, size_t subquery_depth_ = 0,
bool do_global_ = false, bool do_global_ = false,
@ -146,9 +146,6 @@ private:
Settings settings; Settings settings;
size_t subquery_depth; size_t subquery_depth;
/// Columns that are mentioned in the expression, but were not specified in the constructor.
NameSet unknown_required_source_columns;
/** Original columns. /** Original columns.
* First, all available columns of the table are placed here. Then (when analyzing the query), unused columns are deleted. * First, all available columns of the table are placed here. Then (when analyzing the query), unused columns are deleted.
*/ */
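
With source_columns_ defaulted to {} here and the constructor hunk in ExpressionAnalyzer.cpp falling back to the storage's sample block when the list is empty, callers no longer have to pre-compute a source header. A minimal sketch of that fallback, with hypothetical simplified types (Storage, Analyzer) in place of the real classes:

#include <cassert>
#include <string>
#include <utility>
#include <vector>

/// Hypothetical stand-ins for NamesAndTypesList and IStorage.
using NamesAndTypesList = std::vector<std::string>;

struct Storage
{
    NamesAndTypesList getSampleBlockColumns() const { return {"x", "y"}; }
};

struct Analyzer
{
    NamesAndTypesList source_columns;

    /// Mirrors the changed signature: columns may be omitted, in which case
    /// they are derived from the storage (when one is available).
    explicit Analyzer(const Storage * storage, NamesAndTypesList source_columns_ = {})
        : source_columns(std::move(source_columns_))
    {
        if (storage && source_columns.empty())
            source_columns = storage->getSampleBlockColumns();
    }
};

int main()
{
    Storage storage;
    Analyzer implicit_columns(&storage);        /// derived from storage
    Analyzer explicit_columns(&storage, {"x"}); /// caller-provided
    assert(implicit_columns.source_columns.size() == 2);
    assert(explicit_columns.source_columns.size() == 1);
}
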

dbms/src/Interpreters/InterpreterSelectQuery.cpp

@@ -19,7 +19,6 @@
 #include <DataStreams/CreatingSetsBlockInputStream.h>
 #include <DataStreams/MaterializingBlockInputStream.h>
 #include <DataStreams/ConcatBlockInputStream.h>
-#include <DataStreams/OneBlockInputStream.h>
 
 #include <Parsers/ASTSelectQuery.h>
 #include <Parsers/ASTSelectWithUnionQuery.h>
@@ -113,53 +112,44 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names)
     max_streams = settings.max_threads;
 
-    /// Read from prepared input.
+    const auto & table_expression = query.table();
+
+    NamesAndTypesList source_columns;
+
     if (input)
     {
-        source_header = input->getHeader();
+        /// Read from prepared input.
+        source_columns = input->getHeader().getNamesAndTypesList();
+    }
+    else if (table_expression && typeid_cast<const ASTSelectWithUnionQuery *>(table_expression.get()))
+    {
+        /// Read from subquery.
+        source_columns = InterpreterSelectWithUnionQuery::getSampleBlock(table_expression, context).getNamesAndTypesList();
+    }
+    else if (table_expression && typeid_cast<const ASTFunction *>(table_expression.get()))
+    {
+        /// Read from table function.
+        TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(
+            typeid_cast<const ASTFunction *>(table_expression.get())->name, context);
+
+        /// Run it and remember the result
+        storage = table_function_ptr->execute(table_expression, context);
     }
     else
     {
-        auto table_expression = query.table();
-
-        /// Read from subquery.
-        if (table_expression && typeid_cast<const ASTSelectWithUnionQuery *>(table_expression.get()))
-        {
-            source_header = InterpreterSelectWithUnionQuery::getSampleBlock(table_expression, context);
-        }
-        else
-        {
-            /// Read from table function.
-            if (table_expression && typeid_cast<const ASTFunction *>(table_expression.get()))
-            {
-                /// Get the table function
-                TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(
-                    typeid_cast<const ASTFunction *>(table_expression.get())->name, context);
-                /// Run it and remember the result
-                storage = table_function_ptr->execute(table_expression, context);
-            }
-            else
-            {
-                /// Read from table.
-                String database_name;
-                String table_name;
-
-                getDatabaseAndTableNames(database_name, table_name);
-
-                storage = context.getTable(database_name, table_name);
-            }
-
-            table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
-            source_header = storage->getSampleBlock();
-        }
+        /// Read from table. Even without table expression (implicit SELECT ... FROM system.one).
+        String database_name;
+        String table_name;
+
+        getDatabaseAndTableNames(database_name, table_name);
+
+        storage = context.getTable(database_name, table_name);
     }
 
-    if (!source_header)
-        throw Exception("There are no available columns", ErrorCodes::THERE_IS_NO_COLUMN);
+    if (storage)
+        table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
 
     query_analyzer = std::make_unique<ExpressionAnalyzer>(
-        query_ptr, context, storage, source_header.getNamesAndTypesList(), required_result_column_names, subquery_depth, !only_analyze);
+        query_ptr, context, storage, source_columns, required_result_column_names, subquery_depth, !only_analyze);
 
     if (query.sample_size() && (input || !storage || !storage->supportsSampling()))
         throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
@@ -208,7 +198,7 @@ void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, String & table_name)
 Block InterpreterSelectQuery::getSampleBlock()
 {
     Pipeline pipeline;
-    executeImpl(pipeline, std::make_shared<OneBlockInputStream>(source_header));
+    executeImpl(pipeline, input, true);
     auto res = pipeline.firstStream()->getHeader();
     return res;
 }
@@ -223,7 +213,7 @@ Block InterpreterSelectQuery::getSampleBlock(const ASTPtr & query_ptr_, const Context & context_)
 BlockIO InterpreterSelectQuery::execute()
 {
     Pipeline pipeline;
-    executeImpl(pipeline, input);
+    executeImpl(pipeline, input, false);
     executeUnion(pipeline);
 
     BlockIO res;
@@ -234,7 +224,7 @@ BlockIO InterpreterSelectQuery::execute()
 BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams()
 {
     Pipeline pipeline;
-    executeImpl(pipeline, input);
+    executeImpl(pipeline, input, false);
     return pipeline.streams;
 }
@@ -319,7 +309,7 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage)
 }
 
-void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputStreamPtr & input)
+void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run)
 {
     if (input)
         pipeline.streams.push_back(input);
@@ -335,7 +325,7 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run)
      */
 
     /** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
-    QueryProcessingStage::Enum from_stage = executeFetchColumns(pipeline);
+    QueryProcessingStage::Enum from_stage = executeFetchColumns(pipeline, dry_run);
 
     LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
@@ -508,7 +498,7 @@ static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, size_t & offset)
     }
 }
 
-QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline & pipeline)
+QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline & pipeline, bool dry_run)
 {
     /// List of columns to read to execute the query.
     Names required_columns = query_analyzer->getRequiredSourceColumns();
@@ -544,7 +534,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline & pipeline, bool dry_run)
             required_columns_expr_list->children.emplace_back(std::make_shared<ASTIdentifier>(column));
         }
 
-        alias_actions = ExpressionAnalyzer{required_columns_expr_list, context, storage, source_header.getNamesAndTypesList()}.getActions(true);
+        alias_actions = ExpressionAnalyzer(required_columns_expr_list, context, storage).getActions(true);
 
         /// The set of required columns could be added as a result of adding an action to calculate ALIAS.
         required_columns = alias_actions->getRequiredColumns();
@@ -668,8 +658,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline & pipeline, bool dry_run)
                 optimize_prewhere(*merge_tree);
         }
 
-        /// If there was no already prepared input.
-        if (pipeline.streams.empty())
+        if (!dry_run)
             pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
 
         if (pipeline.streams.empty())
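
The dry_run flag threaded through executeImpl() and executeFetchColumns() is what lets getSampleBlock() build the pipeline without reading data: the storage->read() call is skipped, and the empty-pipeline check that follows (its body is outside this hunk) presumably supplies a header-only stream. A simplified standalone model of that flow, with a string standing in for a block input stream:

#include <iostream>
#include <string>
#include <vector>

/// Simplified model: a "stream" is just a label here. In a dry run the real
/// storage read is skipped, so the empty-pipeline fallback kicks in and the
/// caller can still inspect the header of the resulting stream.
struct Pipeline { std::vector<std::string> streams; };

void fetchColumns(Pipeline & pipeline, bool dry_run)
{
    if (!dry_run)
        pipeline.streams.push_back("stream reading data from storage");

    if (pipeline.streams.empty())
        pipeline.streams.push_back("header-only stream, no data");
}

int main()
{
    Pipeline for_sample_block, for_execution;
    fetchColumns(for_sample_block, /*dry_run=*/ true); /// getSampleBlock() path
    fetchColumns(for_execution, /*dry_run=*/ false);   /// execute() path
    std::cout << for_sample_block.streams.front() << '\n';
    std::cout << for_execution.streams.front() << '\n';
}
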

dbms/src/Interpreters/InterpreterSelectQuery.h

@@ -33,7 +33,7 @@ public:
      * You can perform till the intermediate aggregation state, which are combined from different servers for distributed query processing.
      *
      * subquery_depth
-     * - to control the restrictions on the depth of nesting of subqueries. For subqueries, a value that is incremented by one is passed;
+     * - to control the limit on the depth of nesting of subqueries. For subqueries, a value that is incremented by one is passed;
      *   for INSERT SELECT, a value 1 is passed instead of 0.
      *
      * input
@@ -109,7 +109,7 @@ private:
     void init(const Names & required_result_column_names);
 
-    void executeImpl(Pipeline & pipeline, const BlockInputStreamPtr & input);
+    void executeImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run);
 
     struct AnalysisResult
@@ -147,10 +147,10 @@ private:
     /// Different stages of query execution.
 
-    /// Fetch data from the table. Returns the stage to which the query was processed in Storage.
-    QueryProcessingStage::Enum executeFetchColumns(Pipeline & pipeline);
+    void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run);
 
-    void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input);
+    /// Fetch data from the table. Returns the stage to which the query was processed in Storage.
+    QueryProcessingStage::Enum executeFetchColumns(Pipeline & pipeline, bool dry_run);
 
     void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression);
     void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
@@ -182,7 +182,6 @@ private:
     QueryProcessingStage::Enum to_stage;
     size_t subquery_depth;
     std::unique_ptr<ExpressionAnalyzer> query_analyzer;
-    Block source_header;
 
     /// How many streams we ask for storage to produce, and in how many threads we will do further processing.
     size_t max_streams = 1;