Merge pull request #1950 from yandex/unify-cast-column-and-nullable-adapter-streams

Unify cast column and nullable adapter streams
alexey-milovidov 2018-02-23 09:15:24 +03:00 committed by GitHub
commit 7434ddd185
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 383 additions and 530 deletions

View File

@ -1,76 +0,0 @@
#include <DataStreams/CastTypeBlockInputStream.h>
#include <Interpreters/castColumn.h>
namespace DB
{
CastTypeBlockInputStream::CastTypeBlockInputStream(
const Context & context_,
const BlockInputStreamPtr & input,
const Block & reference_definition)
: context(context_)
{
children.emplace_back(input);
Block input_header = input->getHeader();
for (size_t col_num = 0, num_columns = input_header.columns(); col_num < num_columns; ++col_num)
{
const auto & elem = input_header.getByPosition(col_num);
if (!reference_definition.has(elem.name))
{
header.insert(elem);
continue;
}
const auto & ref_column = reference_definition.getByName(elem.name);
/// Force conversion if the source and destination types are different.
if (ref_column.type->equals(*elem.type))
{
header.insert(elem);
}
else
{
header.insert({ castColumn(elem, ref_column.type, context), ref_column.type, elem.name });
cast_description.emplace(col_num, ref_column.type);
}
}
}
String CastTypeBlockInputStream::getName() const
{
return "CastType";
}
Block CastTypeBlockInputStream::readImpl()
{
Block block = children.back()->read();
if (!block)
return block;
if (cast_description.empty())
return block;
size_t num_columns = block.columns();
Block res = block;
for (size_t col = 0; col < num_columns; ++col)
{
auto it = cast_description.find(col);
if (cast_description.end() != it)
{
auto & elem = res.getByPosition(col);
elem.column = castColumn(elem, it->second, context);
elem.type = it->second;
}
}
return res;
}
}

View File

@ -1,33 +0,0 @@
#pragma once
#include <unordered_map>
#include <DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
/// Implicitly converts types.
class CastTypeBlockInputStream : public IProfilingBlockInputStream
{
public:
CastTypeBlockInputStream(const Context & context,
const BlockInputStreamPtr & input,
const Block & reference_definition);
String getName() const override;
Block getHeader() const override { return header; }
private:
Block readImpl() override;
const Context & context;
Block header;
/// Describes the required conversions on the source block:
/// maps numbers of the source-block columns that should be converted to their target types.
std::unordered_map<size_t, DataTypePtr> cast_description;
};
}
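The per-column conversion removed above is built on the castColumn helper, whose call shape is visible in the deleted readImpl. A minimal sketch of that pattern, assuming the caller already has a block, a target type and a query context (the function name and parameters below are illustrative, not part of this change):

#include <Core/Block.h>
#include <Interpreters/castColumn.h>

/// Sketch: cast one column of a block in place, as the removed stream did column by column.
static void castSingleColumn(DB::Block & block, size_t position,
                             const DB::DataTypePtr & target_type, const DB::Context & context)
{
    auto & elem = block.getByPosition(position);
    if (elem.type->equals(*target_type))
        return;                                                 /// types already match, nothing to do
    elem.column = DB::castColumn(elem, target_type, context);   /// may throw if the cast is not possible
    elem.type = target_type;
}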

View File

@ -0,0 +1,100 @@
#include <DataStreams/ConvertingBlockInputStream.h>
#include <Interpreters/castColumn.h>
#include <Columns/ColumnConst.h>
#include <Parsers/IAST.h>
namespace DB
{
namespace ErrorCodes
{
extern const int THERE_IS_NO_COLUMN;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
}
ConvertingBlockInputStream::ConvertingBlockInputStream(
const Context & context_,
const BlockInputStreamPtr & input,
const Block & result_header,
MatchColumnsMode mode)
: context(context_), header(result_header), conversion(header.columns())
{
children.emplace_back(input);
Block input_header = input->getHeader();
size_t num_input_columns = input_header.columns();
size_t num_result_columns = result_header.columns();
if (mode == MatchColumnsMode::Position && num_input_columns != num_result_columns)
throw Exception("Number of columns doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
for (size_t result_col_num = 0; result_col_num < num_result_columns; ++result_col_num)
{
const auto & res_elem = result_header.getByPosition(result_col_num);
switch (mode)
{
case MatchColumnsMode::Position:
conversion[result_col_num] = result_col_num;
break;
case MatchColumnsMode::Name:
if (input_header.has(res_elem.name))
conversion[result_col_num] = input_header.getPositionByName(res_elem.name);
else
throw Exception("Cannot find column " + backQuoteIfNeed(res_elem.name) + " in source stream",
ErrorCodes::THERE_IS_NO_COLUMN);
break;
}
const auto & src_elem = input_header.getByPosition(conversion[result_col_num]);
/// Check constants.
if (res_elem.column->isColumnConst())
{
if (!src_elem.column->isColumnConst())
throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name)
+ " because it is non constant in source stream but must be constant in result",
ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
else if (static_cast<const ColumnConst &>(*src_elem.column).getField() != static_cast<const ColumnConst &>(*res_elem.column).getField())
throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name)
+ " because it is constant but values of constants are different in source and result",
ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
}
/// Check the conversion by doing a dry run of the CAST function.
castColumn(src_elem, res_elem.type, context);
}
}
Block ConvertingBlockInputStream::readImpl()
{
Block src = children.back()->read();
if (!src)
return src;
Block res = header.cloneEmpty();
for (size_t res_pos = 0, size = conversion.size(); res_pos < size; ++res_pos)
{
const auto & src_elem = src.getByPosition(conversion[res_pos]);
auto & res_elem = res.getByPosition(res_pos);
ColumnPtr converted = castColumn(src_elem, res_elem.type, context);
if (src_elem.column->isColumnConst() && !res_elem.column->isColumnConst())
converted = converted->convertToFullColumnIfConst();
res_elem.column = std::move(converted);
}
return res;
}
}

View File

@ -0,0 +1,54 @@
#pragma once
#include <unordered_map>
#include <DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
/** Converts one block structure to another:
*
* Leaves only the necessary columns.
*
* Columns are looked up in the source first by name,
* and if there is no column with the same name, then by position.
*
* Converts the types of matching columns (with the CAST function).
*
* Materializes columns which are const in the source and non-const in the result;
* throws if they are const in the result but non-const in the source,
* or if both are const but have different values.
*/
class ConvertingBlockInputStream : public IProfilingBlockInputStream
{
public:
enum class MatchColumnsMode
{
/// Require the same number of columns in source and result. Match columns by their positions, regardless of names.
Position,
/// Find columns in the source by their names. Extra columns are allowed in the source.
Name
};
ConvertingBlockInputStream(
const Context & context,
const BlockInputStreamPtr & input,
const Block & result_header,
MatchColumnsMode mode);
String getName() const override { return "Converting"; }
Block getHeader() const override { return header; }
private:
Block readImpl() override;
const Context & context;
Block header;
/// How to construct the result block: for each result column, the position in the source block to take it from.
using Conversion = std::vector<size_t>;
Conversion conversion;
};
}
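A hedged usage sketch for the new stream, matching the constructor declared above; `context`, `input` and `expected_header` are assumed to be provided by the caller:

/// Wrap an arbitrary input stream so that every block it produces has exactly
/// the columns and types of `expected_header`, matching columns by name
/// (extra columns in the source are allowed and dropped).
BlockInputStreamPtr converting = std::make_shared<ConvertingBlockInputStream>(
    context, input, expected_header, ConvertingBlockInputStream::MatchColumnsMode::Name);

while (Block block = converting->read())
{
    /// `block` now has the structure of `expected_header`.
}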

View File

@ -1,123 +0,0 @@
#include <DataStreams/NullableAdapterBlockInputStream.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsCommon.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN;
extern const int TYPE_MISMATCH;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
}
static Block transform(const Block & block, const NullableAdapterBlockInputStream::Actions & actions, const std::vector<std::optional<String>> & rename)
{
size_t num_columns = block.columns();
Block res;
for (size_t i = 0; i < num_columns; ++i)
{
const auto & elem = block.getByPosition(i);
switch (actions[i])
{
case NullableAdapterBlockInputStream::TO_ORDINARY:
{
const auto & nullable_col = static_cast<const ColumnNullable &>(*elem.column);
const auto & nullable_type = static_cast<const DataTypeNullable &>(*elem.type);
const auto & null_map = nullable_col.getNullMapData();
bool has_nulls = !memoryIsZero(null_map.data(), null_map.size());
if (has_nulls)
throw Exception{"Cannot insert NULL value into non-nullable column",
ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN};
else
res.insert({
nullable_col.getNestedColumnPtr(),
nullable_type.getNestedType(),
rename[i].value_or(elem.name)});
break;
}
case NullableAdapterBlockInputStream::TO_NULLABLE:
{
ColumnPtr null_map = ColumnUInt8::create(elem.column->size(), 0);
res.insert({
ColumnNullable::create(elem.column, null_map),
std::make_shared<DataTypeNullable>(elem.type),
rename[i].value_or(elem.name)});
break;
}
case NullableAdapterBlockInputStream::NONE:
{
res.insert({elem.column, elem.type, rename[i].value_or(elem.name)});
break;
}
}
}
return res;
}
NullableAdapterBlockInputStream::NullableAdapterBlockInputStream(
const BlockInputStreamPtr & input,
const Block & src_header, const Block & res_header)
{
buildActions(src_header, res_header);
children.push_back(input);
header = transform(src_header, actions, rename);
}
Block NullableAdapterBlockInputStream::readImpl()
{
Block block = children.back()->read();
if (!block)
return block;
return transform(block, actions, rename);
}
void NullableAdapterBlockInputStream::buildActions(
const Block & src_header,
const Block & res_header)
{
size_t in_size = src_header.columns();
if (res_header.columns() != in_size)
throw Exception("Number of columns in INSERT SELECT doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
actions.reserve(in_size);
rename.reserve(in_size);
for (size_t i = 0; i < in_size; ++i)
{
const auto & in_elem = src_header.getByPosition(i);
const auto & out_elem = res_header.getByPosition(i);
bool is_in_nullable = in_elem.type->isNullable();
bool is_out_nullable = out_elem.type->isNullable();
if (is_in_nullable && !is_out_nullable)
actions.push_back(TO_ORDINARY);
else if (!is_in_nullable && is_out_nullable)
actions.push_back(TO_NULLABLE);
else
actions.push_back(NONE);
if (in_elem.name != out_elem.name)
rename.emplace_back(std::make_optional(out_elem.name));
else
rename.emplace_back();
}
}
}

View File

@ -1,57 +0,0 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <optional>
namespace DB
{
/// This stream allows performing INSERT requests in which the types of
/// the target and source blocks are compatible up to nullability:
///
/// - if a target column is nullable while the corresponding source
/// column is not, we embed the source column into a nullable column;
/// - if a source column is nullable while the corresponding target
/// column is not, we extract the nested column from the source
/// while checking that it doesn't actually contain NULLs;
/// - otherwise we just perform an identity mapping.
class NullableAdapterBlockInputStream : public IProfilingBlockInputStream
{
public:
NullableAdapterBlockInputStream(const BlockInputStreamPtr & input, const Block & src_header, const Block & res_header);
String getName() const override { return "NullableAdapter"; }
Block getHeader() const override { return header; }
/// Given a column of a block we have just read,
/// how must we process it?
enum Action
{
/// Do nothing.
NONE = 0,
/// Convert nullable column to ordinary column.
TO_ORDINARY,
/// Convert non-nullable column to nullable column.
TO_NULLABLE
};
/// Actions to be taken for each column of a block.
using Actions = std::vector<Action>;
private:
Block readImpl() override;
/// Determine the actions to be taken using the source sample block,
/// which describes the columns from which we fetch data inside an INSERT
/// query, and the target sample block which contains the columns
/// we insert data into.
void buildActions(const Block & src_header, const Block & res_header);
Block header;
Actions actions;
std::vector<std::optional<String>> rename;
};
}
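For reference, the two non-trivial actions of the removed adapter, sketched with the same calls that appear in the deleted implementation above; after this commit the equivalent work is expressed as CASTs inside ConvertingBlockInputStream. The helper names below are illustrative only:

#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsCommon.h>

/// TO_NULLABLE: wrap an ordinary column into a Nullable column with an all-zero null map.
static DB::ColumnPtr makeNullable(const DB::ColumnPtr & column)
{
    DB::ColumnPtr null_map = DB::ColumnUInt8::create(column->size(), 0);
    return DB::ColumnNullable::create(column, null_map);
}

/// TO_ORDINARY: reject real NULLs, then return the nested column of a Nullable column.
static DB::ColumnPtr makeOrdinary(const DB::ColumnPtr & column)
{
    const auto & nullable = static_cast<const DB::ColumnNullable &>(*column);
    const auto & null_map = nullable.getNullMapData();
    if (!DB::memoryIsZero(null_map.data(), null_map.size()))
        throw DB::Exception{"Cannot insert NULL value into non-nullable column",
            DB::ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN};
    return nullable.getNestedColumnPtr();
}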

View File

@ -3,11 +3,9 @@
#include <Common/typeid_cast.h>
#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <DataStreams/CastTypeBlockInputStream.h>
#include <DataStreams/CountingBlockOutputStream.h>
#include <DataStreams/MaterializingBlockOutputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DataStreams/NullableAdapterBlockInputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/copyData.h>
@ -104,8 +102,6 @@ BlockIO InterpreterInsertQuery::execute()
out = std::make_shared<PushingToViewsBlockOutputStream>(query.database, query.table, table, context, query_ptr, query.no_destination);
out = std::make_shared<MaterializingBlockOutputStream>(out, table->getSampleBlock());
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, getSampleBlock(query, table), required_columns, table->column_defaults, context,
static_cast<bool>(context.getSettingsRef().strict_insert_defaults));
@ -127,8 +123,7 @@ BlockIO InterpreterInsertQuery::execute()
res.in = interpreter_select.execute().in;
res.in = std::make_shared<NullableAdapterBlockInputStream>(res.in, res.in->getHeader(), res.out->getHeader());
res.in = std::make_shared<CastTypeBlockInputStream>(context, res.in, res.out->getHeader());
res.in = std::make_shared<ConvertingBlockInputStream>(context, res.in, res.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Position);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, res.out);
res.out = nullptr;
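For clarity, the resulting INSERT ... SELECT pipeline after this change should read roughly as follows (a sketch; variable names as in the hunk above):

res.in = interpreter_select.execute().in;
/// One Position-mode converting stream now covers what NullableAdapterBlockInputStream
/// and CastTypeBlockInputStream did before: nullability adaptation plus type casts.
res.in = std::make_shared<ConvertingBlockInputStream>(
    context, res.in, res.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Position);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, res.out);
res.out = nullptr;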

View File

@ -186,6 +186,8 @@ void InterpreterSelectQuery::basicInit()
}
table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
/// TODO This looks weird.
source_header = storage->getSampleBlockNonMaterialized();
}
}
@ -402,6 +404,87 @@ void InterpreterSelectQuery::executeWithoutUnionImpl(Pipeline & pipeline, const
executeSingleQuery(pipeline);
}
InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage)
{
AnalysisResult res;
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
res.first_stage = from_stage < QueryProcessingStage::WithMergeableState
&& to_stage >= QueryProcessingStage::WithMergeableState;
/// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
res.second_stage = from_stage <= QueryProcessingStage::WithMergeableState
&& to_stage > QueryProcessingStage::WithMergeableState;
/** First we compose a chain of actions and remember the necessary steps from it.
* Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and
* throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries.
*/
{
ExpressionActionsChain chain;
res.need_aggregate = query_analyzer->hasAggregation();
query_analyzer->appendArrayJoin(chain, !res.first_stage);
if (query_analyzer->appendJoin(chain, !res.first_stage))
{
res.has_join = true;
res.before_join = chain.getLastActions();
chain.addStep();
}
if (query_analyzer->appendWhere(chain, !res.first_stage))
{
res.has_where = true;
res.before_where = chain.getLastActions();
chain.addStep();
}
if (res.need_aggregate)
{
query_analyzer->appendGroupBy(chain, !res.first_stage);
query_analyzer->appendAggregateFunctionsArguments(chain, !res.first_stage);
res.before_aggregation = chain.getLastActions();
chain.finalize();
chain.clear();
if (query_analyzer->appendHaving(chain, !res.second_stage))
{
res.has_having = true;
res.before_having = chain.getLastActions();
chain.addStep();
}
}
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer->appendSelect(chain, res.need_aggregate ? !res.second_stage : !res.first_stage);
res.selected_columns = chain.getLastStep().required_output;
res.has_order_by = query_analyzer->appendOrderBy(chain, res.need_aggregate ? !res.second_stage : !res.first_stage);
res.before_order_and_select = chain.getLastActions();
chain.addStep();
query_analyzer->appendProjectResult(chain);
res.final_projection = chain.getLastActions();
chain.finalize();
chain.clear();
}
/// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys).
if (res.has_where)
res.before_where->prependProjectInput();
if (res.has_having)
res.before_having->prependProjectInput();
res.subqueries_for_sets = query_analyzer->getSubqueriesForSets();
return res;
}
void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline)
{
/** Streams of data. When the query is executed in parallel, we have several data streams.
@ -423,101 +506,17 @@ void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline)
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
AnalysisResult expressions = analyzeExpressions(from_stage);
const Settings & settings = context.getSettingsRef();
if (to_stage > QueryProcessingStage::FetchColumns)
{
bool has_join = false;
bool has_where = false;
bool need_aggregate = false;
bool has_having = false;
bool has_order_by = false;
ExpressionActionsPtr before_join; /// including JOIN
ExpressionActionsPtr before_where;
ExpressionActionsPtr before_aggregation;
ExpressionActionsPtr before_having;
ExpressionActionsPtr before_order_and_select;
ExpressionActionsPtr final_projection;
/// Columns from the SELECT list, before renaming them to aliases.
Names selected_columns;
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
bool first_stage = from_stage < QueryProcessingStage::WithMergeableState
&& to_stage >= QueryProcessingStage::WithMergeableState;
/// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
bool second_stage = from_stage <= QueryProcessingStage::WithMergeableState
&& to_stage > QueryProcessingStage::WithMergeableState;
/** First we compose a chain of actions and remember the necessary steps from it.
* Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and
* throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries.
*/
{
ExpressionActionsChain chain;
need_aggregate = query_analyzer->hasAggregation();
query_analyzer->appendArrayJoin(chain, !first_stage);
if (query_analyzer->appendJoin(chain, !first_stage))
{
has_join = true;
before_join = chain.getLastActions();
chain.addStep();
}
if (query_analyzer->appendWhere(chain, !first_stage))
{
has_where = true;
before_where = chain.getLastActions();
chain.addStep();
}
if (need_aggregate)
{
query_analyzer->appendGroupBy(chain, !first_stage);
query_analyzer->appendAggregateFunctionsArguments(chain, !first_stage);
before_aggregation = chain.getLastActions();
chain.finalize();
chain.clear();
if (query_analyzer->appendHaving(chain, !second_stage))
{
has_having = true;
before_having = chain.getLastActions();
chain.addStep();
}
}
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer->appendSelect(chain, need_aggregate ? !second_stage : !first_stage);
selected_columns = chain.getLastStep().required_output;
has_order_by = query_analyzer->appendOrderBy(chain, need_aggregate ? !second_stage : !first_stage);
before_order_and_select = chain.getLastActions();
chain.addStep();
query_analyzer->appendProjectResult(chain);
final_projection = chain.getLastActions();
chain.finalize();
chain.clear();
}
/// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys).
if (has_where)
before_where->prependProjectInput();
if (has_having)
before_having->prependProjectInput();
/// Now we will compose block streams that perform the necessary actions.
/// Do I need to aggregate, in a separate row, the rows that have not passed max_rows_to_group_by?
bool aggregate_overflow_row =
need_aggregate &&
expressions.need_aggregate &&
query.group_by_with_totals &&
settings.limits.max_rows_to_group_by &&
settings.limits.group_by_overflow_mode == OverflowMode::ANY &&
@ -525,32 +524,32 @@ void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline)
/// Do I need to immediately finalize the aggregate functions after the aggregation?
bool aggregate_final =
need_aggregate &&
expressions.need_aggregate &&
to_stage > QueryProcessingStage::WithMergeableState &&
!query.group_by_with_totals;
if (first_stage)
if (expressions.first_stage)
{
if (has_join)
if (expressions.has_join)
{
const ASTTableJoin & join = static_cast<const ASTTableJoin &>(*query.join()->table_join);
if (join.kind == ASTTableJoin::Kind::Full || join.kind == ASTTableJoin::Kind::Right)
pipeline.stream_with_non_joined_data = before_join->createStreamWithNonJoinedDataIfFullOrRightJoin(
pipeline.stream_with_non_joined_data = expressions.before_join->createStreamWithNonJoinedDataIfFullOrRightJoin(
pipeline.firstStream()->getHeader(), settings.max_block_size);
for (auto & stream : pipeline.streams) /// Applies to all sources except stream_with_non_joined_data.
stream = std::make_shared<ExpressionBlockInputStream>(stream, before_join);
stream = std::make_shared<ExpressionBlockInputStream>(stream, expressions.before_join);
}
if (has_where)
executeWhere(pipeline, before_where);
if (expressions.has_where)
executeWhere(pipeline, expressions.before_where);
if (need_aggregate)
executeAggregation(pipeline, before_aggregation, aggregate_overflow_row, aggregate_final);
if (expressions.need_aggregate)
executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final);
else
{
executeExpression(pipeline, before_order_and_select);
executeDistinct(pipeline, true, selected_columns);
executeExpression(pipeline, expressions.before_order_and_select);
executeDistinct(pipeline, true, expressions.selected_columns);
}
/** For distributed query processing,
@ -558,36 +557,36 @@ void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline)
* but there is an ORDER or LIMIT,
* then we will perform the preliminary sorting and LIMIT on the remote server.
*/
if (!second_stage && !need_aggregate && !has_having)
if (!expressions.second_stage && !expressions.need_aggregate && !expressions.has_having)
{
if (has_order_by)
if (expressions.has_order_by)
executeOrder(pipeline);
if (has_order_by && query.limit_length)
executeDistinct(pipeline, false, selected_columns);
if (expressions.has_order_by && query.limit_length)
executeDistinct(pipeline, false, expressions.selected_columns);
if (query.limit_length)
executePreLimit(pipeline);
}
}
if (second_stage)
if (expressions.second_stage)
{
bool need_second_distinct_pass;
if (need_aggregate)
if (expressions.need_aggregate)
{
/// If you need to combine aggregated results from multiple servers
if (!first_stage)
if (!expressions.first_stage)
executeMergeAggregated(pipeline, aggregate_overflow_row, aggregate_final);
if (!aggregate_final)
executeTotalsAndHaving(pipeline, has_having, before_having, aggregate_overflow_row);
else if (has_having)
executeHaving(pipeline, before_having);
executeTotalsAndHaving(pipeline, expressions.has_having, expressions.before_having, aggregate_overflow_row);
else if (expressions.has_having)
executeHaving(pipeline, expressions.before_having);
executeExpression(pipeline, before_order_and_select);
executeDistinct(pipeline, true, selected_columns);
executeExpression(pipeline, expressions.before_order_and_select);
executeDistinct(pipeline, true, expressions.selected_columns);
need_second_distinct_pass = query.distinct && pipeline.hasMoreThanOneStream();
}
@ -599,19 +598,19 @@ void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline)
executeTotalsAndHaving(pipeline, false, nullptr, aggregate_overflow_row);
}
if (has_order_by)
if (expressions.has_order_by)
{
/** If there is an ORDER BY for distributed query processing,
* but there is no aggregation, then on the remote servers ORDER BY was made
* - therefore, we merge the sorted streams from remote servers.
*/
if (!first_stage && !need_aggregate && !(query.group_by_with_totals && !aggregate_final))
if (!expressions.first_stage && !expressions.need_aggregate && !(query.group_by_with_totals && !aggregate_final))
executeMergeSorted(pipeline);
else /// Otherwise, just sort.
executeOrder(pipeline);
}
executeProjection(pipeline, final_projection);
executeProjection(pipeline, expressions.final_projection);
/// At this stage, we can calculate the minimums and maximums, if necessary.
if (settings.extremes)
@ -624,7 +623,7 @@ void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline)
}
/** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
* limiting the number of entries in each up to `offset + limit`.
* limiting the number of rows in each up to `offset + limit`.
*/
if (query.limit_length && pipeline.hasMoreThanOneStream() && !query.distinct && !query.limit_by_expression_list)
executePreLimit(pipeline);
@ -653,9 +652,8 @@ void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline)
}
}
SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets();
if (!subqueries_for_sets.empty())
executeSubqueriesInSetsAndJoins(pipeline, subqueries_for_sets);
if (!expressions.subqueries_for_sets.empty())
executeSubqueriesInSetsAndJoins(pipeline, expressions.subqueries_for_sets);
}

View File

@ -3,6 +3,7 @@
#include <Core/QueryProcessingStage.h>
#include <Interpreters/Context.h>
#include <Interpreters/IInterpreter.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <DataStreams/IBlockInputStream.h>
@ -128,6 +129,36 @@ private:
/// Execute one SELECT query from the UNION ALL chain.
void executeSingleQuery(Pipeline & pipeline);
struct AnalysisResult
{
bool has_join = false;
bool has_where = false;
bool need_aggregate = false;
bool has_having = false;
bool has_order_by = false;
ExpressionActionsPtr before_join; /// including JOIN
ExpressionActionsPtr before_where;
ExpressionActionsPtr before_aggregation;
ExpressionActionsPtr before_having;
ExpressionActionsPtr before_order_and_select;
ExpressionActionsPtr final_projection;
/// Columns from the SELECT list, before renaming them to aliases.
Names selected_columns;
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
bool first_stage = false;
/// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
bool second_stage = false;
SubqueriesForSets subqueries_for_sets;
};
AnalysisResult analyzeExpressions(QueryProcessingStage::Enum from_stage);
/** Leave only the necessary columns of the SELECT section in each query of the UNION ALL chain.
* However, if you use at least one DISTINCT in the chain, then all the columns are considered necessary,
* since otherwise DISTINCT would work differently.
@ -150,10 +181,6 @@ private:
*/
void getDatabaseAndTableNames(String & database_name, String & table_name);
/** Select any column from the list - preferably one of minimal size.
*/
String getAnyColumn();
/// Different stages of query execution.
/// Fetch data from the table. Returns the stage to which the query was processed in Storage.

View File

@ -2,23 +2,26 @@
#include <DataStreams/narrowBlockInputStreams.h>
#include <DataStreams/LazyBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <Storages/StorageMerge.h>
#include <Storages/StorageFactory.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/VirtualColumnFactory.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Storages/VirtualColumnFactory.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTExpressionList.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Common/typeid_cast.h>
#include <Databases/IDatabase.h>
#include <DataStreams/CastTypeBlockInputStream.h>
#include <DataStreams/FilterColumnsBlockInputStream.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTExpressionList.h>
namespace DB
@ -47,6 +50,22 @@ StorageMerge::StorageMerge(
{
}
NameAndTypePair StorageMerge::getColumn(const String & column_name) const
{
auto type = VirtualColumnFactory::tryGetType(column_name);
if (type)
return NameAndTypePair(column_name, type);
return IStorage::getColumn(column_name);
}
bool StorageMerge::hasColumn(const String & column_name) const
{
return VirtualColumnFactory::hasColumn(column_name) || IStorage::hasColumn(column_name);
}
bool StorageMerge::isRemote() const
{
auto database = context.getDatabase(source_database);
@ -67,38 +86,6 @@ bool StorageMerge::isRemote() const
return false;
}
NameAndTypePair StorageMerge::getColumn(const String & column_name) const
{
auto type = VirtualColumnFactory::tryGetType(column_name);
if (type)
return NameAndTypePair(column_name, type);
return IStorage::getColumn(column_name);
}
bool StorageMerge::hasColumn(const String & column_name) const
{
return VirtualColumnFactory::hasColumn(column_name) || IStorage::hasColumn(column_name);
}
static Names collectIdentifiersInFirstLevelOfSelectQuery(ASTPtr ast)
{
ASTSelectQuery & select = typeid_cast<ASTSelectQuery &>(*ast);
ASTExpressionList & node = typeid_cast<ASTExpressionList &>(*select.select_expression_list);
ASTs & asts = node.children;
Names names;
for (size_t i = 0; i < asts.size(); ++i)
{
if (const ASTIdentifier * identifier = typeid_cast<const ASTIdentifier *>(&* asts[i]))
{
if (identifier->kind == ASTIdentifier::Kind::Column)
names.push_back(identifier->name);
}
}
return names;
}
namespace
{
@ -137,12 +124,19 @@ BlockInputStreams StorageMerge::read(
{
BlockInputStreams res;
Names virt_column_names, real_column_names;
for (const auto & it : column_names)
if (it != "_table")
real_column_names.push_back(it);
bool has_table_virtual_column = false;
Names real_column_names;
real_column_names.reserve(column_names.size());
for (const auto & name : column_names)
{
if (name == "_table")
{
has_table_virtual_column = true;
}
else
virt_column_names.push_back(it);
real_column_names.push_back(name);
}
std::optional<QueryProcessingStage::Enum> processed_stage_in_source_tables;
@ -161,8 +155,8 @@ BlockInputStreams StorageMerge::read(
Block virtual_columns_block = getBlockWithVirtualColumns(selected_tables);
/// If at least one virtual column is requested, try indexing
if (!virt_column_names.empty())
/// If _table column is requested, try filtering
if (has_table_virtual_column)
{
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);
auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");
@ -177,7 +171,8 @@ BlockInputStreams StorageMerge::read(
Context modified_context = context;
modified_context.getSettingsRef().optimize_move_to_prewhere = false;
Block header = getSampleBlockForColumns(real_column_names);
/// What will the result structure be, depending on the processed stage of the query in the source tables?
Block header;
size_t tables_count = selected_tables.size();
@ -219,24 +214,41 @@ BlockInputStreams StorageMerge::read(
throw Exception("Source tables for Merge table are processing data up to different stages",
ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
/// The table may return excessive columns if we query only its virtual column.
/// We filter out the excessive columns. This is done only if the query was not processed beyond FetchColumns.
if (processed_stage_in_source_table == QueryProcessingStage::FetchColumns)
if (!header)
{
switch (processed_stage_in_source_table)
{
case QueryProcessingStage::FetchColumns:
header = getSampleBlockForColumns(column_names);
break;
case QueryProcessingStage::WithMergeableState:
header = materializeBlock(InterpreterSelectQuery(query_info.query, context, QueryProcessingStage::WithMergeableState, 0,
std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names))).execute().in->getHeader());
break;
case QueryProcessingStage::Complete:
header = materializeBlock(InterpreterSelectQuery(query_info.query, context, QueryProcessingStage::Complete, 0,
std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names))).execute().in->getHeader());
break;
}
}
if (has_table_virtual_column)
for (auto & stream : source_streams)
stream = std::make_shared<FilterColumnsBlockInputStream>(stream, real_column_names, true);
stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
stream, std::make_shared<DataTypeString>(), table->getTableName(), "_table");
/// Subordinate tables could have different but convertible types, like numeric types of different width.
/// We must return streams whose structure equals that of the Merge table.
for (auto & stream : source_streams)
{
/// will throw if some columns are not convertible
stream = std::make_shared<CastTypeBlockInputStream>(context, stream, header);
}
stream = std::make_shared<ConvertingBlockInputStream>(context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
}
else
{
if (!processed_stage_in_source_tables)
throw Exception("Logical error: unknown processed stage in source tables", ErrorCodes::LOGICAL_ERROR);
/// If there are many streams, initialize them lazily, to avoid a long delay before the start of query processing.
source_streams.emplace_back(std::make_shared<LazyBlockInputStream>(header, [=]
source_streams.emplace_back(std::make_shared<LazyBlockInputStream>(header, [=]() -> BlockInputStreamPtr
{
QueryProcessingStage::Enum processed_stage_in_source_table = processed_stage;
BlockInputStreams streams = table->read(
@ -247,36 +259,30 @@ BlockInputStreams StorageMerge::read(
max_block_size,
1);
if (!processed_stage_in_source_tables)
throw Exception("Logical error: unknown processed stage in source tables",
ErrorCodes::LOGICAL_ERROR);
else if (processed_stage_in_source_table != *processed_stage_in_source_tables)
if (processed_stage_in_source_table != *processed_stage_in_source_tables)
throw Exception("Source tables for Merge table are processing data up to different stages",
ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
if (processed_stage_in_source_table == QueryProcessingStage::FetchColumns)
for (auto & stream : streams)
stream = std::make_shared<FilterColumnsBlockInputStream>(stream, real_column_names, true);
auto stream = streams.empty() ? std::make_shared<NullBlockInputStream>(header) : streams.front();
if (!streams.empty())
if (streams.empty())
{
/// will throw if some columns are not convertible
stream = std::make_shared<CastTypeBlockInputStream>(context, stream, header);
return std::make_shared<NullBlockInputStream>(header);
}
else
{
BlockInputStreamPtr stream = streams.size() > 1 ? std::make_shared<ConcatBlockInputStream>(streams) : streams[0];
if (has_table_virtual_column)
stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
stream, std::make_shared<DataTypeString>(), table->getTableName(), "_table");
return std::make_shared<ConvertingBlockInputStream>(context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
}
return stream;
}));
}
for (auto & stream : source_streams)
stream->addTableLock(table_lock);
for (auto & virtual_column : virt_column_names)
if (virtual_column == "_table")
for (auto & stream : source_streams)
stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
stream, std::make_shared<DataTypeString>(), table->getTableName(), "_table");
res.insert(res.end(), source_streams.begin(), source_streams.end());
}
@ -287,44 +293,6 @@ BlockInputStreams StorageMerge::read(
return res;
res = narrowBlockInputStreams(res, num_streams);
/// Added to avoid getting blocks of different structure from different sources
if (!processed_stage_in_source_tables || *processed_stage_in_source_tables == QueryProcessingStage::FetchColumns)
{
for (auto & stream : res)
stream = std::make_shared<FilterColumnsBlockInputStream>(stream, column_names, true);
}
else
{
/// Blocks from distributed tables may have extra columns. TODO Why?
/// We need to remove them to make blocks compatible.
/// Remove columns that are in "column_names" but not in the first level of the SELECT query.
Names filtered_columns = res.at(0)->getHeader().getNames();
std::set<String> filtered_columns_set(filtered_columns.begin(), filtered_columns.end());
bool need_remove = false;
auto identifiers = collectIdentifiersInFirstLevelOfSelectQuery(query);
std::set<String> identifiers_set(identifiers.begin(), identifiers.end());
for (const auto & column : column_names)
{
if (filtered_columns_set.count(column) && !identifiers_set.count(column))
{
need_remove = true;
filtered_columns_set.erase(column);
}
}
if (need_remove)
{
filtered_columns.assign(filtered_columns_set.begin(), filtered_columns_set.end());
for (auto & stream : res)
stream = std::make_shared<FilterColumnsBlockInputStream>(stream, filtered_columns, true);
}
}
return res;
}
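To make the Name-mode conversion above concrete, a hedged sketch with hypothetical variables: `narrow_stream` reads `x UInt8` from one subordinate table, `merge_header` is the Merge table's own structure declaring `x UInt64`, and `context` is the query context.

/// The converting stream looks `x` up by name and CASTs it from UInt8 to UInt64,
/// so every source table yields blocks with the structure of the Merge table itself.
BlockInputStreamPtr unified = std::make_shared<ConvertingBlockInputStream>(
    context, narrow_stream, merge_header, ConvertingBlockInputStream::MatchColumnsMode::Name);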

View File

@ -20,6 +20,6 @@ INSERT INTO test.test1 SELECT id, name FROM test.test2 ANY LEFT OUTER JOIN test.
DROP TABLE test.test1;
DROP TABLE test.test2;
DROP TABLE test.test3;
" 2>&1 | grep -F "Number of columns in INSERT SELECT doesn't match" | wc -l
" 2>&1 | grep -F "Number of columns doesn't match" | wc -l
$CLICKHOUSE_CLIENT --query="SELECT 1";