diff --git a/src/Core/Block.cpp b/src/Core/Block.cpp index 0c9a470dc1d..0f470c10b81 100644 --- a/src/Core/Block.cpp +++ b/src/Core/Block.cpp @@ -484,7 +484,7 @@ DataTypes Block::getDataTypes() const template -static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, const std::string & context_description) +static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, const std::string & context_description, bool allow_remove_constants) { auto on_error = [](const std::string & message [[maybe_unused]], int code [[maybe_unused]]) { @@ -515,7 +515,16 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons if (!actual.column || !expected.column) continue; - if (actual.column->getName() != expected.column->getName()) + const IColumn * actual_column = actual.column.get(); + + /// If we allow to remove constants, and expected column is not const, then unwrap actual constant column. + if (allow_remove_constants && !isColumnConst(*expected.column)) + { + if (const auto * column_const = typeid_cast(actual_column)) + actual_column = &column_const->getDataColumn(); + } + + if (actual_column->getName() != expected.column->getName()) return on_error("Block structure mismatch in " + context_description + " stream: different columns:\n" + lhs.dumpStructure() + "\n" + rhs.dumpStructure(), ErrorCodes::LOGICAL_ERROR); @@ -537,13 +546,25 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs) { - return checkBlockStructure(lhs, rhs, {}); + return checkBlockStructure(lhs, rhs, {}, false); } void assertBlocksHaveEqualStructure(const Block & lhs, const Block & rhs, const std::string & context_description) { - checkBlockStructure(lhs, rhs, context_description); + checkBlockStructure(lhs, rhs, context_description, false); +} + + +bool isCompatibleHeader(const Block & actual, const Block & desired) +{ + return checkBlockStructure(actual, desired, {}, true); +} + + +void assertCompatibleHeader(const Block & actual, const Block & desired, const std::string & context_description) +{ + checkBlockStructure(actual, desired, context_description, true); } diff --git a/src/Core/Block.h b/src/Core/Block.h index 0cfd17b27dc..6a94034b8fd 100644 --- a/src/Core/Block.h +++ b/src/Core/Block.h @@ -184,6 +184,12 @@ bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs); /// Throw exception when blocks are different. void assertBlocksHaveEqualStructure(const Block & lhs, const Block & rhs, const std::string & context_description); +/// Actual header is compatible to desired if block have equal structure except constants. +/// It is allowed when column from actual header is constant, but in desired is not. +/// If both columns are constant, it is checked that they have the same value. +bool isCompatibleHeader(const Block & actual, const Block & desired); +void assertCompatibleHeader(const Block & actual, const Block & desired, const std::string & context_description); + /// Calculate difference in structure of blocks and write description into output strings. NOTE It doesn't compare values of constant columns. void getBlocksDifference(const Block & lhs, const Block & rhs, std::string & out_lhs_diff, std::string & out_rhs_diff); diff --git a/src/Processors/Pipe.cpp b/src/Processors/Pipe.cpp index 677956db299..044975448ad 100644 --- a/src/Processors/Pipe.cpp +++ b/src/Processors/Pipe.cpp @@ -8,6 +8,7 @@ #include #include #include +#include namespace DB { @@ -250,6 +251,47 @@ static Pipes removeEmptyPipes(Pipes pipes) return res; } +/// Calculate common header for pipes. +/// This function is needed only to remove ColumnConst from common header in case if some columns are const, and some not. +/// E.g. if the first header is `x, const y, const z` and the second is `const x, y, const z`, the common header will be `x, y, const z`. +static Block getCommonHeader(const Pipes & pipes) +{ + Block res; + + for (const auto & pipe : pipes) + { + if (const auto & header = pipe.getHeader()) + { + res = header; + break; + } + } + + for (const auto & pipe : pipes) + { + const auto & header = pipe.getHeader(); + for (size_t i = 0; i < res.columns(); ++i) + { + /// We do not check that headers are compatible here. Will do it later. + + if (i >= header.columns()) + break; + + auto & common = res.getByPosition(i).column; + const auto & cur = header.getByPosition(i).column; + + /// Only remove const from common header if it is not const for current pipe. + if (cur && common && !isColumnConst(*cur)) + { + if (const auto * column_const = typeid_cast(common.get())) + common = column_const->getDataColumnPtr(); + } + } + } + + return res; +} + Pipe Pipe::unitePipes(Pipes pipes) { return Pipe::unitePipes(std::move(pipes), nullptr, false); @@ -276,23 +318,12 @@ Pipe Pipe::unitePipes(Pipes pipes, Processors * collected_processors, bool allow OutputPortRawPtrs totals; OutputPortRawPtrs extremes; res.collected_processors = collected_processors; - res.header = pipes.front().header; - if (allow_empty_header && !res.header) - { - for (const auto & pipe : pipes) - { - if (const auto & header = pipe.getHeader()) - { - res.header = header; - break; - } - } - } + res.header = getCommonHeader(pipes); for (auto & pipe : pipes) { if (!allow_empty_header || pipe.header) - assertBlocksHaveEqualStructure(res.header, pipe.header, "Pipe::unitePipes"); + assertCompatibleHeader(pipe.header, res.header, "Pipe::unitePipes"); res.processors.insert(res.processors.end(), pipe.processors.begin(), pipe.processors.end()); res.output_ports.insert(res.output_ports.end(), pipe.output_ports.begin(), pipe.output_ports.end()); diff --git a/src/Processors/Port.cpp b/src/Processors/Port.cpp index 7e7ccb1adad..0a6026b27f2 100644 --- a/src/Processors/Port.cpp +++ b/src/Processors/Port.cpp @@ -16,7 +16,7 @@ void connect(OutputPort & output, InputPort & input) auto out_name = output.getProcessor().getName(); auto in_name = input.getProcessor().getName(); - assertBlocksHaveEqualStructure(input.getHeader(), output.getHeader(), " function connect between " + out_name + " and " + in_name); + assertCompatibleHeader(output.getHeader(), input.getHeader(), " function connect between " + out_name + " and " + in_name); input.output_port = &output; output.input_port = &input; diff --git a/src/Processors/QueryPipeline.cpp b/src/Processors/QueryPipeline.cpp index cabf5f19190..1b803ec0886 100644 --- a/src/Processors/QueryPipeline.cpp +++ b/src/Processors/QueryPipeline.cpp @@ -232,8 +232,6 @@ QueryPipeline QueryPipeline::unitePipelines( pipeline.checkInitialized(); pipeline.pipe.collected_processors = collected_processors; - assertBlocksHaveEqualStructure(pipeline.getHeader(), common_header, "QueryPipeline::unitePipelines"); - pipes.emplace_back(std::move(pipeline.pipe)); max_threads += pipeline.max_threads; diff --git a/tests/queries/0_stateless/01822_union_and_constans_error.reference b/tests/queries/0_stateless/01822_union_and_constans_error.reference new file mode 100644 index 00000000000..d00491fd7e5 --- /dev/null +++ b/tests/queries/0_stateless/01822_union_and_constans_error.reference @@ -0,0 +1 @@ +1 diff --git a/tests/queries/0_stateless/01822_union_and_constans_error.sql b/tests/queries/0_stateless/01822_union_and_constans_error.sql new file mode 100644 index 00000000000..38b7df700cd --- /dev/null +++ b/tests/queries/0_stateless/01822_union_and_constans_error.sql @@ -0,0 +1,20 @@ +drop table if exists t0; +CREATE TABLE t0 (c0 String) ENGINE = Log(); + +SELECT isNull(t0.c0) OR COUNT('\n?pVa') +FROM t0 +GROUP BY t0.c0 +HAVING isNull(t0.c0) +UNION ALL +SELECT isNull(t0.c0) OR COUNT('\n?pVa') +FROM t0 +GROUP BY t0.c0 +HAVING NOT isNull(t0.c0) +UNION ALL +SELECT isNull(t0.c0) OR COUNT('\n?pVa') +FROM t0 +GROUP BY t0.c0 +HAVING isNull(isNull(t0.c0)) +SETTINGS aggregate_functions_null_for_empty = 1, enable_optimize_predicate_expression = 0; + +drop table if exists t0;