diff --git a/dbms/src/DataStreams/CastTypeBlockInputStream.cpp b/dbms/src/DataStreams/CastTypeBlockInputStream.cpp deleted file mode 100644 index 5028799d41d..00000000000 --- a/dbms/src/DataStreams/CastTypeBlockInputStream.cpp +++ /dev/null @@ -1,76 +0,0 @@ -#include -#include - - -namespace DB -{ - - -CastTypeBlockInputStream::CastTypeBlockInputStream( - const Context & context_, - const BlockInputStreamPtr & input, - const Block & reference_definition) - : context(context_) -{ - children.emplace_back(input); - - Block input_header = input->getHeader(); - - for (size_t col_num = 0, num_columns = input_header.columns(); col_num < num_columns; ++col_num) - { - const auto & elem = input_header.getByPosition(col_num); - - if (!reference_definition.has(elem.name)) - { - header.insert(elem); - continue; - } - - const auto & ref_column = reference_definition.getByName(elem.name); - - /// Force conversion if source and destination types is different. - if (ref_column.type->equals(*elem.type)) - { - header.insert(elem); - } - else - { - header.insert({ castColumn(elem, ref_column.type, context), ref_column.type, elem.name }); - cast_description.emplace(col_num, ref_column.type); - } - } -} - -String CastTypeBlockInputStream::getName() const -{ - return "CastType"; -} - -Block CastTypeBlockInputStream::readImpl() -{ - Block block = children.back()->read(); - - if (!block) - return block; - - if (cast_description.empty()) - return block; - - size_t num_columns = block.columns(); - Block res = block; - - for (size_t col = 0; col < num_columns; ++col) - { - auto it = cast_description.find(col); - if (cast_description.end() != it) - { - auto & elem = res.getByPosition(col); - elem.column = castColumn(elem, it->second, context); - elem.type = it->second; - } - } - - return res; -} - -} diff --git a/dbms/src/DataStreams/CastTypeBlockInputStream.h b/dbms/src/DataStreams/CastTypeBlockInputStream.h deleted file mode 100644 index f84f6dacf0e..00000000000 --- a/dbms/src/DataStreams/CastTypeBlockInputStream.h +++ /dev/null @@ -1,33 +0,0 @@ -#pragma once - -#include -#include - - -namespace DB -{ - -/// Implicitly converts types. 
-class CastTypeBlockInputStream : public IProfilingBlockInputStream -{ -public: - CastTypeBlockInputStream(const Context & context, - const BlockInputStreamPtr & input, - const Block & reference_definition); - - String getName() const override; - - Block getHeader() const override { return header; } - -private: - Block readImpl() override; - - const Context & context; - Block header; - - /// Describes required conversions on source block - /// Contains column numbers in source block that should be converted - std::unordered_map cast_description; -}; - -} diff --git a/dbms/src/DataStreams/ConvertingBlockInputStream.cpp b/dbms/src/DataStreams/ConvertingBlockInputStream.cpp new file mode 100644 index 00000000000..8313f5820e5 --- /dev/null +++ b/dbms/src/DataStreams/ConvertingBlockInputStream.cpp @@ -0,0 +1,100 @@ +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int THERE_IS_NO_COLUMN; + extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE; + extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH; +} + + +ConvertingBlockInputStream::ConvertingBlockInputStream( + const Context & context_, + const BlockInputStreamPtr & input, + const Block & result_header, + MatchColumnsMode mode) + : context(context_), header(result_header), conversion(header.columns()) +{ + children.emplace_back(input); + + Block input_header = input->getHeader(); + + size_t num_input_columns = input_header.columns(); + size_t num_result_columns = result_header.columns(); + + if (mode == MatchColumnsMode::Position && num_input_columns != num_result_columns) + throw Exception("Number of columns doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH); + + for (size_t result_col_num = 0; result_col_num < num_result_columns; ++result_col_num) + { + const auto & res_elem = result_header.getByPosition(result_col_num); + + switch (mode) + { + case MatchColumnsMode::Position: + conversion[result_col_num] = result_col_num; + break; + + case MatchColumnsMode::Name: + if (input_header.has(res_elem.name)) + conversion[result_col_num] = input_header.getPositionByName(res_elem.name); + else + throw Exception("Cannot find column " + backQuoteIfNeed(res_elem.name) + " in source stream", + ErrorCodes::THERE_IS_NO_COLUMN); + break; + } + + const auto & src_elem = input_header.getByPosition(conversion[result_col_num]); + + /// Check constants. + + if (res_elem.column->isColumnConst()) + { + if (!src_elem.column->isColumnConst()) + throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name) + + " because it is non constant in source stream but must be constant in result", + ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE); + else if (static_cast(*src_elem.column).getField() != static_cast(*res_elem.column).getField()) + throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name) + + " because it is constant but values of constants are different in source and result", + ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE); + } + + /// Check conversion by dry run CAST function. 
+ + castColumn(src_elem, res_elem.type, context); + } +} + + +Block ConvertingBlockInputStream::readImpl() +{ + Block src = children.back()->read(); + + if (!src) + return src; + + Block res = header.cloneEmpty(); + for (size_t res_pos = 0, size = conversion.size(); res_pos < size; ++res_pos) + { + const auto & src_elem = src.getByPosition(conversion[res_pos]); + auto & res_elem = res.getByPosition(res_pos); + + ColumnPtr converted = castColumn(src_elem, res_elem.type, context); + + if (src_elem.column->isColumnConst() && !res_elem.column->isColumnConst()) + converted = converted->convertToFullColumnIfConst(); + + res_elem.column = std::move(converted); + } + return res; +} + +} diff --git a/dbms/src/DataStreams/ConvertingBlockInputStream.h b/dbms/src/DataStreams/ConvertingBlockInputStream.h new file mode 100644 index 00000000000..e4511477a72 --- /dev/null +++ b/dbms/src/DataStreams/ConvertingBlockInputStream.h @@ -0,0 +1,54 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +/** Converts one block structure to another: + * + * Leaves only the necessary columns; + * + * Matches columns of the source to columns of the result either by position or by name, + * depending on MatchColumnsMode. + * + * Converts the types of matching columns (with the CAST function). + * + * Materializes columns that are const in the source and non-const in the result; + * throws if a column is const in the result but non-const in the source, + * or if both are const but have different values. + */ +class ConvertingBlockInputStream : public IProfilingBlockInputStream +{ +public: + enum class MatchColumnsMode + { + /// Require the same number of columns in source and result. Match columns by position, regardless of names. + Position, + /// Find columns in the source by their names. Extra columns in the source are allowed. + Name + }; + + ConvertingBlockInputStream( + const Context & context, + const BlockInputStreamPtr & input, + const Block & result_header, + MatchColumnsMode mode); + + String getName() const override { return "Converting"; } + Block getHeader() const override { return header; } + +private: + Block readImpl() override; + + const Context & context; + Block header; + + /// How to construct the result block: for each result column, the position of the source column to take it from.
+ using Conversion = std::vector; + Conversion conversion; +}; + +} diff --git a/dbms/src/DataStreams/NullableAdapterBlockInputStream.cpp b/dbms/src/DataStreams/NullableAdapterBlockInputStream.cpp deleted file mode 100644 index d7d23633b72..00000000000 --- a/dbms/src/DataStreams/NullableAdapterBlockInputStream.cpp +++ /dev/null @@ -1,123 +0,0 @@ -#include -#include -#include -#include - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN; - extern const int TYPE_MISMATCH; - extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH; -} - - -static Block transform(const Block & block, const NullableAdapterBlockInputStream::Actions & actions, const std::vector> & rename) -{ - size_t num_columns = block.columns(); - - Block res; - for (size_t i = 0; i < num_columns; ++i) - { - const auto & elem = block.getByPosition(i); - - switch (actions[i]) - { - case NullableAdapterBlockInputStream::TO_ORDINARY: - { - const auto & nullable_col = static_cast(*elem.column); - const auto & nullable_type = static_cast(*elem.type); - - const auto & null_map = nullable_col.getNullMapData(); - bool has_nulls = !memoryIsZero(null_map.data(), null_map.size()); - - if (has_nulls) - throw Exception{"Cannot insert NULL value into non-nullable column", - ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN}; - else - res.insert({ - nullable_col.getNestedColumnPtr(), - nullable_type.getNestedType(), - rename[i].value_or(elem.name)}); - break; - } - case NullableAdapterBlockInputStream::TO_NULLABLE: - { - ColumnPtr null_map = ColumnUInt8::create(elem.column->size(), 0); - - res.insert({ - ColumnNullable::create(elem.column, null_map), - std::make_shared(elem.type), - rename[i].value_or(elem.name)}); - break; - } - case NullableAdapterBlockInputStream::NONE: - { - res.insert({elem.column, elem.type, rename[i].value_or(elem.name)}); - break; - } - } - } - - return res; -} - - -NullableAdapterBlockInputStream::NullableAdapterBlockInputStream( - const BlockInputStreamPtr & input, - const Block & src_header, const Block & res_header) -{ - buildActions(src_header, res_header); - children.push_back(input); - header = transform(src_header, actions, rename); -} - - -Block NullableAdapterBlockInputStream::readImpl() -{ - Block block = children.back()->read(); - - if (!block) - return block; - - return transform(block, actions, rename); -} - -void NullableAdapterBlockInputStream::buildActions( - const Block & src_header, - const Block & res_header) -{ - size_t in_size = src_header.columns(); - - if (res_header.columns() != in_size) - throw Exception("Number of columns in INSERT SELECT doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH); - - actions.reserve(in_size); - rename.reserve(in_size); - - for (size_t i = 0; i < in_size; ++i) - { - const auto & in_elem = src_header.getByPosition(i); - const auto & out_elem = res_header.getByPosition(i); - - bool is_in_nullable = in_elem.type->isNullable(); - bool is_out_nullable = out_elem.type->isNullable(); - - if (is_in_nullable && !is_out_nullable) - actions.push_back(TO_ORDINARY); - else if (!is_in_nullable && is_out_nullable) - actions.push_back(TO_NULLABLE); - else - actions.push_back(NONE); - - if (in_elem.name != out_elem.name) - rename.emplace_back(std::make_optional(out_elem.name)); - else - rename.emplace_back(); - } -} - -} diff --git a/dbms/src/DataStreams/NullableAdapterBlockInputStream.h b/dbms/src/DataStreams/NullableAdapterBlockInputStream.h deleted file mode 100644 index 60c2b2ec16e..00000000000 --- 
a/dbms/src/DataStreams/NullableAdapterBlockInputStream.h +++ /dev/null @@ -1,57 +0,0 @@ -#pragma once - -#include -#include - -namespace DB -{ - -/// This stream allows perfoming INSERT requests in which the types of -/// the target and source blocks are compatible up to nullability: -/// -/// - if a target column is nullable while the corresponding source -/// column is not, we embed the source column into a nullable column; -/// - if a source column is nullable while the corresponding target -/// column is not, we extract the nested column from the source -/// while checking that is doesn't actually contain NULLs; -/// - otherwise we just perform an identity mapping. -class NullableAdapterBlockInputStream : public IProfilingBlockInputStream -{ -public: - NullableAdapterBlockInputStream(const BlockInputStreamPtr & input, const Block & src_header, const Block & res_header); - - String getName() const override { return "NullableAdapter"; } - - Block getHeader() const override { return header; } - - - /// Given a column of a block we have just read, - /// how must we process it? - enum Action - { - /// Do nothing. - NONE = 0, - /// Convert nullable column to ordinary column. - TO_ORDINARY, - /// Convert non-nullable column to nullable column. - TO_NULLABLE - }; - - /// Actions to be taken for each column of a block. - using Actions = std::vector; - -private: - Block readImpl() override; - - /// Determine the actions to be taken using the source sample block, - /// which describes the columns from which we fetch data inside an INSERT - /// query, and the target sample block which contains the columns - /// we insert data into. - void buildActions(const Block & src_header, const Block & res_header); - - Block header; - Actions actions; - std::vector> rename; -}; - -} diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp index 60cd5ea70cb..a01b42761da 100644 --- a/dbms/src/Interpreters/InterpreterInsertQuery.cpp +++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp @@ -3,11 +3,9 @@ #include #include -#include #include -#include +#include #include -#include #include #include #include @@ -104,8 +102,6 @@ BlockIO InterpreterInsertQuery::execute() out = std::make_shared(query.database, query.table, table, context, query_ptr, query.no_destination); - out = std::make_shared(out, table->getSampleBlock()); - out = std::make_shared( out, getSampleBlock(query, table), required_columns, table->column_defaults, context, static_cast(context.getSettingsRef().strict_insert_defaults)); @@ -127,8 +123,7 @@ BlockIO InterpreterInsertQuery::execute() res.in = interpreter_select.execute().in; - res.in = std::make_shared(res.in, res.in->getHeader(), res.out->getHeader()); - res.in = std::make_shared(context, res.in, res.out->getHeader()); + res.in = std::make_shared(context, res.in, res.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Position); res.in = std::make_shared(res.in, res.out); res.out = nullptr; diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index da60ef7ada4..07ba3675546 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -186,6 +186,8 @@ void InterpreterSelectQuery::basicInit() } table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); + + /// TODO This looks weird. 
source_header = storage->getSampleBlockNonMaterialized(); } } @@ -402,6 +404,87 @@ void InterpreterSelectQuery::executeWithoutUnionImpl(Pipeline & pipeline, const executeSingleQuery(pipeline); } + +InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage) +{ + AnalysisResult res; + + /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing. + res.first_stage = from_stage < QueryProcessingStage::WithMergeableState + && to_stage >= QueryProcessingStage::WithMergeableState; + /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing. + res.second_stage = from_stage <= QueryProcessingStage::WithMergeableState + && to_stage > QueryProcessingStage::WithMergeableState; + + /** First we compose a chain of actions and remember the necessary steps from it. + * Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and + * throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries. + */ + + { + ExpressionActionsChain chain; + + res.need_aggregate = query_analyzer->hasAggregation(); + + query_analyzer->appendArrayJoin(chain, !res.first_stage); + + if (query_analyzer->appendJoin(chain, !res.first_stage)) + { + res.has_join = true; + res.before_join = chain.getLastActions(); + chain.addStep(); + } + + if (query_analyzer->appendWhere(chain, !res.first_stage)) + { + res.has_where = true; + res.before_where = chain.getLastActions(); + chain.addStep(); + } + + if (res.need_aggregate) + { + query_analyzer->appendGroupBy(chain, !res.first_stage); + query_analyzer->appendAggregateFunctionsArguments(chain, !res.first_stage); + res.before_aggregation = chain.getLastActions(); + + chain.finalize(); + chain.clear(); + + if (query_analyzer->appendHaving(chain, !res.second_stage)) + { + res.has_having = true; + res.before_having = chain.getLastActions(); + chain.addStep(); + } + } + + /// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers. + query_analyzer->appendSelect(chain, res.need_aggregate ? !res.second_stage : !res.first_stage); + res.selected_columns = chain.getLastStep().required_output; + res.has_order_by = query_analyzer->appendOrderBy(chain, res.need_aggregate ? !res.second_stage : !res.first_stage); + res.before_order_and_select = chain.getLastActions(); + chain.addStep(); + + query_analyzer->appendProjectResult(chain); + res.final_projection = chain.getLastActions(); + + chain.finalize(); + chain.clear(); + } + + /// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys). + if (res.has_where) + res.before_where->prependProjectInput(); + if (res.has_having) + res.before_having->prependProjectInput(); + + res.subqueries_for_sets = query_analyzer->getSubqueriesForSets(); + + return res; +} + + void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline) { /** Streams of data. When the query is executed in parallel, we have several data streams. 
@@ -423,101 +506,17 @@ void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline) LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage)); + AnalysisResult expressions = analyzeExpressions(from_stage); + const Settings & settings = context.getSettingsRef(); if (to_stage > QueryProcessingStage::FetchColumns) { - bool has_join = false; - bool has_where = false; - bool need_aggregate = false; - bool has_having = false; - bool has_order_by = false; - - ExpressionActionsPtr before_join; /// including JOIN - ExpressionActionsPtr before_where; - ExpressionActionsPtr before_aggregation; - ExpressionActionsPtr before_having; - ExpressionActionsPtr before_order_and_select; - ExpressionActionsPtr final_projection; - - /// Columns from the SELECT list, before renaming them to aliases. - Names selected_columns; - - /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing. - bool first_stage = from_stage < QueryProcessingStage::WithMergeableState - && to_stage >= QueryProcessingStage::WithMergeableState; - /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing. - bool second_stage = from_stage <= QueryProcessingStage::WithMergeableState - && to_stage > QueryProcessingStage::WithMergeableState; - - /** First we compose a chain of actions and remember the necessary steps from it. - * Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and - * throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries. - */ - - { - ExpressionActionsChain chain; - - need_aggregate = query_analyzer->hasAggregation(); - - query_analyzer->appendArrayJoin(chain, !first_stage); - - if (query_analyzer->appendJoin(chain, !first_stage)) - { - has_join = true; - before_join = chain.getLastActions(); - chain.addStep(); - } - - if (query_analyzer->appendWhere(chain, !first_stage)) - { - has_where = true; - before_where = chain.getLastActions(); - chain.addStep(); - } - - if (need_aggregate) - { - query_analyzer->appendGroupBy(chain, !first_stage); - query_analyzer->appendAggregateFunctionsArguments(chain, !first_stage); - before_aggregation = chain.getLastActions(); - - chain.finalize(); - chain.clear(); - - if (query_analyzer->appendHaving(chain, !second_stage)) - { - has_having = true; - before_having = chain.getLastActions(); - chain.addStep(); - } - } - - /// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers. - query_analyzer->appendSelect(chain, need_aggregate ? !second_stage : !first_stage); - selected_columns = chain.getLastStep().required_output; - has_order_by = query_analyzer->appendOrderBy(chain, need_aggregate ? !second_stage : !first_stage); - before_order_and_select = chain.getLastActions(); - chain.addStep(); - - query_analyzer->appendProjectResult(chain); - final_projection = chain.getLastActions(); - - chain.finalize(); - chain.clear(); - } - - /// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys). - if (has_where) - before_where->prependProjectInput(); - if (has_having) - before_having->prependProjectInput(); - /// Now we will compose block streams that perform the necessary actions. /// Do I need to aggregate in a separate row rows that have not passed max_rows_to_group_by. 
bool aggregate_overflow_row = - need_aggregate && + expressions.need_aggregate && query.group_by_with_totals && settings.limits.max_rows_to_group_by && settings.limits.group_by_overflow_mode == OverflowMode::ANY && @@ -525,32 +524,32 @@ void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline) /// Do I need to immediately finalize the aggregate functions after the aggregation? bool aggregate_final = - need_aggregate && + expressions.need_aggregate && to_stage > QueryProcessingStage::WithMergeableState && !query.group_by_with_totals; - if (first_stage) + if (expressions.first_stage) { - if (has_join) + if (expressions.has_join) { const ASTTableJoin & join = static_cast(*query.join()->table_join); if (join.kind == ASTTableJoin::Kind::Full || join.kind == ASTTableJoin::Kind::Right) - pipeline.stream_with_non_joined_data = before_join->createStreamWithNonJoinedDataIfFullOrRightJoin( + pipeline.stream_with_non_joined_data = expressions.before_join->createStreamWithNonJoinedDataIfFullOrRightJoin( pipeline.firstStream()->getHeader(), settings.max_block_size); for (auto & stream : pipeline.streams) /// Applies to all sources except stream_with_non_joined_data. - stream = std::make_shared(stream, before_join); + stream = std::make_shared(stream, expressions.before_join); } - if (has_where) - executeWhere(pipeline, before_where); + if (expressions.has_where) + executeWhere(pipeline, expressions.before_where); - if (need_aggregate) - executeAggregation(pipeline, before_aggregation, aggregate_overflow_row, aggregate_final); + if (expressions.need_aggregate) + executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final); else { - executeExpression(pipeline, before_order_and_select); - executeDistinct(pipeline, true, selected_columns); + executeExpression(pipeline, expressions.before_order_and_select); + executeDistinct(pipeline, true, expressions.selected_columns); } /** For distributed query processing, @@ -558,36 +557,36 @@ void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline) * but there is an ORDER or LIMIT, * then we will perform the preliminary sorting and LIMIT on the remote server. 
*/ - if (!second_stage && !need_aggregate && !has_having) + if (!expressions.second_stage && !expressions.need_aggregate && !expressions.has_having) { - if (has_order_by) + if (expressions.has_order_by) executeOrder(pipeline); - if (has_order_by && query.limit_length) - executeDistinct(pipeline, false, selected_columns); + if (expressions.has_order_by && query.limit_length) + executeDistinct(pipeline, false, expressions.selected_columns); if (query.limit_length) executePreLimit(pipeline); } } - if (second_stage) + if (expressions.second_stage) { bool need_second_distinct_pass; - if (need_aggregate) + if (expressions.need_aggregate) { /// If you need to combine aggregated results from multiple servers - if (!first_stage) + if (!expressions.first_stage) executeMergeAggregated(pipeline, aggregate_overflow_row, aggregate_final); if (!aggregate_final) - executeTotalsAndHaving(pipeline, has_having, before_having, aggregate_overflow_row); - else if (has_having) - executeHaving(pipeline, before_having); + executeTotalsAndHaving(pipeline, expressions.has_having, expressions.before_having, aggregate_overflow_row); + else if (expressions.has_having) + executeHaving(pipeline, expressions.before_having); - executeExpression(pipeline, before_order_and_select); - executeDistinct(pipeline, true, selected_columns); + executeExpression(pipeline, expressions.before_order_and_select); + executeDistinct(pipeline, true, expressions.selected_columns); need_second_distinct_pass = query.distinct && pipeline.hasMoreThanOneStream(); } @@ -599,19 +598,19 @@ void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline) executeTotalsAndHaving(pipeline, false, nullptr, aggregate_overflow_row); } - if (has_order_by) + if (expressions.has_order_by) { /** If there is an ORDER BY for distributed query processing, * but there is no aggregation, then on the remote servers ORDER BY was made * - therefore, we merge the sorted streams from remote servers. */ - if (!first_stage && !need_aggregate && !(query.group_by_with_totals && !aggregate_final)) + if (!expressions.first_stage && !expressions.need_aggregate && !(query.group_by_with_totals && !aggregate_final)) executeMergeSorted(pipeline); else /// Otherwise, just sort. executeOrder(pipeline); } - executeProjection(pipeline, final_projection); + executeProjection(pipeline, expressions.final_projection); /// At this stage, we can calculate the minimums and maximums, if necessary. if (settings.extremes) @@ -624,8 +623,8 @@ void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline) } /** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT, - * limiting the number of entries in each up to `offset + limit`. - */ + * limiting the number of rows in each up to `offset + limit`. 
+ */ if (query.limit_length && pipeline.hasMoreThanOneStream() && !query.distinct && !query.limit_by_expression_list) executePreLimit(pipeline); @@ -653,9 +652,8 @@ void InterpreterSelectQuery::executeSingleQuery(Pipeline & pipeline) } } - SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets(); - if (!subqueries_for_sets.empty()) - executeSubqueriesInSetsAndJoins(pipeline, subqueries_for_sets); + if (!expressions.subqueries_for_sets.empty()) + executeSubqueriesInSetsAndJoins(pipeline, expressions.subqueries_for_sets); } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.h b/dbms/src/Interpreters/InterpreterSelectQuery.h index 14416e5fd46..aa87c048af0 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.h +++ b/dbms/src/Interpreters/InterpreterSelectQuery.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -128,6 +129,36 @@ private: /// Execute one SELECT query from the UNION ALL chain. void executeSingleQuery(Pipeline & pipeline); + + struct AnalysisResult + { + bool has_join = false; + bool has_where = false; + bool need_aggregate = false; + bool has_having = false; + bool has_order_by = false; + + ExpressionActionsPtr before_join; /// including JOIN + ExpressionActionsPtr before_where; + ExpressionActionsPtr before_aggregation; + ExpressionActionsPtr before_having; + ExpressionActionsPtr before_order_and_select; + ExpressionActionsPtr final_projection; + + /// Columns from the SELECT list, before renaming them to aliases. + Names selected_columns; + + /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing. + bool first_stage = false; + /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing. + bool second_stage = false; + + SubqueriesForSets subqueries_for_sets; + }; + + AnalysisResult analyzeExpressions(QueryProcessingStage::Enum from_stage); + + /** Leave only the necessary columns of the SELECT section in each query of the UNION ALL chain. * However, if you use at least one DISTINCT in the chain, then all the columns are considered necessary, * since otherwise DISTINCT would work differently. @@ -150,10 +181,6 @@ private: */ void getDatabaseAndTableNames(String & database_name, String & table_name); - /** Select from the list of columns any, better - with minimum size. - */ - String getAnyColumn(); - /// Different stages of query execution. /// Fetch data from the table. Returns the stage to which the query was processed in Storage. 
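The InterpreterInsertQuery change above collapses the former NullableAdapterBlockInputStream + CastTypeBlockInputStream pair into a single ConvertingBlockInputStream in Position mode. A minimal sketch of that wiring, assuming only the constructor and enum declared in ConvertingBlockInputStream.h; the helper function and its name are hypothetical and the ClickHouse includes are omitted:

/// Hypothetical helper (not part of the patch): adapt the stream produced by the SELECT
/// so that its blocks match the structure expected by the INSERT sink.
/// Position mode requires an equal number of columns; matching columns are CASTed to the
/// result types and const/non-const differences are reconciled, per the header comment.
BlockInputStreamPtr adaptSelectForInsert(
    const Context & context,
    const BlockInputStreamPtr & select_stream,
    const Block & insert_header)
{
    return std::make_shared<ConvertingBlockInputStream>(
        context, select_stream, insert_header,
        ConvertingBlockInputStream::MatchColumnsMode::Position);
}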
diff --git a/dbms/src/Storages/StorageMerge.cpp b/dbms/src/Storages/StorageMerge.cpp index d94a41e37ab..6193ffce779 100644 --- a/dbms/src/Storages/StorageMerge.cpp +++ b/dbms/src/Storages/StorageMerge.cpp @@ -2,23 +2,26 @@ #include #include #include +#include +#include +#include +#include #include #include #include +#include #include #include #include -#include +#include #include #include +#include +#include #include #include #include #include -#include -#include -#include -#include namespace DB @@ -47,6 +50,22 @@ StorageMerge::StorageMerge( { } + +NameAndTypePair StorageMerge::getColumn(const String & column_name) const +{ + auto type = VirtualColumnFactory::tryGetType(column_name); + if (type) + return NameAndTypePair(column_name, type); + + return IStorage::getColumn(column_name); +} + +bool StorageMerge::hasColumn(const String & column_name) const +{ + return VirtualColumnFactory::hasColumn(column_name) || IStorage::hasColumn(column_name); +} + + bool StorageMerge::isRemote() const { auto database = context.getDatabase(source_database); @@ -67,38 +86,6 @@ bool StorageMerge::isRemote() const return false; } -NameAndTypePair StorageMerge::getColumn(const String & column_name) const -{ - auto type = VirtualColumnFactory::tryGetType(column_name); - if (type) - return NameAndTypePair(column_name, type); - - return IStorage::getColumn(column_name); -} - -bool StorageMerge::hasColumn(const String & column_name) const -{ - return VirtualColumnFactory::hasColumn(column_name) || IStorage::hasColumn(column_name); -} - -static Names collectIdentifiersInFirstLevelOfSelectQuery(ASTPtr ast) -{ - ASTSelectQuery & select = typeid_cast(*ast); - ASTExpressionList & node = typeid_cast(*select.select_expression_list); - ASTs & asts = node.children; - - Names names; - for (size_t i = 0; i < asts.size(); ++i) - { - if (const ASTIdentifier * identifier = typeid_cast(&* asts[i])) - { - if (identifier->kind == ASTIdentifier::Kind::Column) - names.push_back(identifier->name); - } - } - return names; -} - namespace { @@ -137,12 +124,19 @@ BlockInputStreams StorageMerge::read( { BlockInputStreams res; - Names virt_column_names, real_column_names; - for (const auto & it : column_names) - if (it != "_table") - real_column_names.push_back(it); + bool has_table_virtual_column = false; + Names real_column_names; + real_column_names.reserve(column_names.size()); + + for (const auto & name : column_names) + { + if (name == "_table") + { + has_table_virtual_column = true; + } else - virt_column_names.push_back(it); + real_column_names.push_back(name); + } std::optional processed_stage_in_source_tables; @@ -161,8 +155,8 @@ BlockInputStreams StorageMerge::read( Block virtual_columns_block = getBlockWithVirtualColumns(selected_tables); - /// If at least one virtual column is requested, try indexing - if (!virt_column_names.empty()) + /// If _table column is requested, try filtering + if (has_table_virtual_column) { VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context); auto values = VirtualColumnUtils::extractSingleValueFromBlock(virtual_columns_block, "_table"); @@ -177,7 +171,8 @@ BlockInputStreams StorageMerge::read( Context modified_context = context; modified_context.getSettingsRef().optimize_move_to_prewhere = false; - Block header = getSampleBlockForColumns(real_column_names); + /// What will be result structure depending on query processed stage in source tables? 
+ Block header; size_t tables_count = selected_tables.size(); @@ -219,24 +214,41 @@ BlockInputStreams StorageMerge::read( throw Exception("Source tables for Merge table are processing data up to different stages", ErrorCodes::INCOMPATIBLE_SOURCE_TABLES); - /// The table may return excessive columns if we query only its virtual column. - /// We filter excessive columns. This is done only if query was not processed more than FetchColumns. - if (processed_stage_in_source_table == QueryProcessingStage::FetchColumns) + if (!header) + { + switch (processed_stage_in_source_table) + { + case QueryProcessingStage::FetchColumns: + header = getSampleBlockForColumns(column_names); + break; + case QueryProcessingStage::WithMergeableState: + header = materializeBlock(InterpreterSelectQuery(query_info.query, context, QueryProcessingStage::WithMergeableState, 0, + std::make_shared(getSampleBlockForColumns(column_names))).execute().in->getHeader()); + break; + case QueryProcessingStage::Complete: + header = materializeBlock(InterpreterSelectQuery(query_info.query, context, QueryProcessingStage::Complete, 0, + std::make_shared(getSampleBlockForColumns(column_names))).execute().in->getHeader()); + break; + } + } + + if (has_table_virtual_column) for (auto & stream : source_streams) - stream = std::make_shared(stream, real_column_names, true); + stream = std::make_shared>( + stream, std::make_shared(), table->getTableName(), "_table"); /// Subordinary tables could have different but convertible types, like numeric types of different width. /// We must return streams with structure equals to structure of Merge table. for (auto & stream : source_streams) - { - /// will throw if some columns not convertible - stream = std::make_shared(context, stream, header); - } + stream = std::make_shared(context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name); } else { + if (!processed_stage_in_source_tables) + throw Exception("Logical error: unknown processed stage in source tables", ErrorCodes::LOGICAL_ERROR); + /// If many streams, initialize it lazily, to avoid long delay before start of query processing. - source_streams.emplace_back(std::make_shared(header, [=] + source_streams.emplace_back(std::make_shared(header, [=]() -> BlockInputStreamPtr { QueryProcessingStage::Enum processed_stage_in_source_table = processed_stage; BlockInputStreams streams = table->read( @@ -247,36 +259,30 @@ BlockInputStreams StorageMerge::read( max_block_size, 1); - if (!processed_stage_in_source_tables) - throw Exception("Logical error: unknown processed stage in source tables", - ErrorCodes::LOGICAL_ERROR); - else if (processed_stage_in_source_table != *processed_stage_in_source_tables) + if (processed_stage_in_source_table != *processed_stage_in_source_tables) throw Exception("Source tables for Merge table are processing data up to different stages", ErrorCodes::INCOMPATIBLE_SOURCE_TABLES); - if (processed_stage_in_source_table == QueryProcessingStage::FetchColumns) - for (auto & stream : streams) - stream = std::make_shared(stream, real_column_names, true); - - auto stream = streams.empty() ? std::make_shared(header) : streams.front(); - if (!streams.empty()) + if (streams.empty()) { - /// will throw if some columns not convertible - stream = std::make_shared(context, stream, header); + return std::make_shared(header); + } + else + { + BlockInputStreamPtr stream = streams.size() > 1 ? 
std::make_shared(streams) : streams[0]; + + if (has_table_virtual_column) + stream = std::make_shared>( + stream, std::make_shared(), table->getTableName(), "_table"); + + return std::make_shared(context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name); } - return stream; })); } for (auto & stream : source_streams) stream->addTableLock(table_lock); - for (auto & virtual_column : virt_column_names) - if (virtual_column == "_table") - for (auto & stream : source_streams) - stream = std::make_shared>( - stream, std::make_shared(), table->getTableName(), "_table"); - res.insert(res.end(), source_streams.begin(), source_streams.end()); } @@ -287,44 +293,6 @@ BlockInputStreams StorageMerge::read( return res; res = narrowBlockInputStreams(res, num_streams); - - /// Added to avoid different block structure from different sources - if (!processed_stage_in_source_tables || *processed_stage_in_source_tables == QueryProcessingStage::FetchColumns) - { - for (auto & stream : res) - stream = std::make_shared(stream, column_names, true); - } - else - { - /// Blocks from distributed tables may have extra columns. TODO Why? - /// We need to remove them to make blocks compatible. - - /// Remove columns that are in "column_names" but not in first level of SELECT query. - - Names filtered_columns = res.at(0)->getHeader().getNames(); - std::set filtered_columns_set(filtered_columns.begin(), filtered_columns.end()); - bool need_remove = false; - - auto identifiers = collectIdentifiersInFirstLevelOfSelectQuery(query); - std::set identifiers_set(identifiers.begin(), identifiers.end()); - - for (const auto & column : column_names) - { - if (filtered_columns_set.count(column) && !identifiers_set.count(column)) - { - need_remove = true; - filtered_columns_set.erase(column); - } - } - - if (need_remove) - { - filtered_columns.assign(filtered_columns_set.begin(), filtered_columns_set.end()); - for (auto & stream : res) - stream = std::make_shared(stream, filtered_columns, true); - } - } - return res; } diff --git a/dbms/tests/queries/0_stateless/00550_join_insert_select.sh b/dbms/tests/queries/0_stateless/00550_join_insert_select.sh index 6cc71fedc06..3e78942f989 100755 --- a/dbms/tests/queries/0_stateless/00550_join_insert_select.sh +++ b/dbms/tests/queries/0_stateless/00550_join_insert_select.sh @@ -20,6 +20,6 @@ INSERT INTO test.test1 SELECT id, name FROM test.test2 ANY LEFT OUTER JOIN test. DROP TABLE test.test1; DROP TABLE test.test2; DROP TABLE test.test3; -" 2>&1 | grep -F "Number of columns in INSERT SELECT doesn't match" | wc -l +" 2>&1 | grep -F "Number of columns doesn't match" | wc -l $CLICKHOUSE_CLIENT --query="SELECT 1";
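The StorageMerge changes use the other mode: each underlying table's stream is converted to the Merge table's header by name, which is what allows the underlying tables to have different but convertible column types. A sketch under the same assumptions (hypothetical helper name, includes omitted):

/// Hypothetical helper (not part of the patch): bring a per-table stream to the common
/// header of the Merge table. Columns are matched by name, extra source columns are dropped,
/// and convertible type differences (e.g. numeric types of different width) are CASTed.
BlockInputStreamPtr convertToMergeHeader(
    const Context & context,
    const BlockInputStreamPtr & table_stream,
    const Block & merge_header)
{
    return std::make_shared<ConvertingBlockInputStream>(
        context, table_stream, merge_header,
        ConvertingBlockInputStream::MatchColumnsMode::Name);
}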