Merge pull request #23359 from ClickHouse/try-fix-23029

Fix #23029
This commit is contained in:
Nikolai Kochetov 2021-04-21 08:46:21 +03:00 committed by GitHub
commit 9ec6fd1943
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 97 additions and 20 deletions

View File

@ -484,7 +484,7 @@ DataTypes Block::getDataTypes() const
template <typename ReturnType>
static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, const std::string & context_description)
static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, const std::string & context_description, bool allow_remove_constants)
{
auto on_error = [](const std::string & message [[maybe_unused]], int code [[maybe_unused]])
{
@ -515,7 +515,16 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons
if (!actual.column || !expected.column)
continue;
if (actual.column->getName() != expected.column->getName())
const IColumn * actual_column = actual.column.get();
/// If we allow to remove constants, and expected column is not const, then unwrap actual constant column.
if (allow_remove_constants && !isColumnConst(*expected.column))
{
if (const auto * column_const = typeid_cast<const ColumnConst *>(actual_column))
actual_column = &column_const->getDataColumn();
}
if (actual_column->getName() != expected.column->getName())
return on_error("Block structure mismatch in " + context_description + " stream: different columns:\n"
+ lhs.dumpStructure() + "\n" + rhs.dumpStructure(), ErrorCodes::LOGICAL_ERROR);
@ -537,13 +546,25 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
{
return checkBlockStructure<bool>(lhs, rhs, {});
return checkBlockStructure<bool>(lhs, rhs, {}, false);
}
void assertBlocksHaveEqualStructure(const Block & lhs, const Block & rhs, const std::string & context_description)
{
checkBlockStructure<void>(lhs, rhs, context_description);
checkBlockStructure<void>(lhs, rhs, context_description, false);
}
bool isCompatibleHeader(const Block & actual, const Block & desired)
{
return checkBlockStructure<bool>(actual, desired, {}, true);
}
void assertCompatibleHeader(const Block & actual, const Block & desired, const std::string & context_description)
{
checkBlockStructure<void>(actual, desired, context_description, true);
}

View File

@ -184,6 +184,12 @@ bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs);
/// Throw exception when blocks are different.
void assertBlocksHaveEqualStructure(const Block & lhs, const Block & rhs, const std::string & context_description);
/// Actual header is compatible to desired if block have equal structure except constants.
/// It is allowed when column from actual header is constant, but in desired is not.
/// If both columns are constant, it is checked that they have the same value.
bool isCompatibleHeader(const Block & actual, const Block & desired);
void assertCompatibleHeader(const Block & actual, const Block & desired, const std::string & context_description);
/// Calculate difference in structure of blocks and write description into output strings. NOTE It doesn't compare values of constant columns.
void getBlocksDifference(const Block & lhs, const Block & rhs, std::string & out_lhs_diff, std::string & out_rhs_diff);

View File

@ -8,6 +8,7 @@
#include <Processors/Transforms/ExtremesTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/NullSource.h>
#include <Columns/ColumnConst.h>
namespace DB
{
@ -250,6 +251,47 @@ static Pipes removeEmptyPipes(Pipes pipes)
return res;
}
/// Calculate common header for pipes.
/// This function is needed only to remove ColumnConst from common header in case if some columns are const, and some not.
/// E.g. if the first header is `x, const y, const z` and the second is `const x, y, const z`, the common header will be `x, y, const z`.
static Block getCommonHeader(const Pipes & pipes)
{
Block res;
for (const auto & pipe : pipes)
{
if (const auto & header = pipe.getHeader())
{
res = header;
break;
}
}
for (const auto & pipe : pipes)
{
const auto & header = pipe.getHeader();
for (size_t i = 0; i < res.columns(); ++i)
{
/// We do not check that headers are compatible here. Will do it later.
if (i >= header.columns())
break;
auto & common = res.getByPosition(i).column;
const auto & cur = header.getByPosition(i).column;
/// Only remove const from common header if it is not const for current pipe.
if (cur && common && !isColumnConst(*cur))
{
if (const auto * column_const = typeid_cast<const ColumnConst *>(common.get()))
common = column_const->getDataColumnPtr();
}
}
}
return res;
}
Pipe Pipe::unitePipes(Pipes pipes)
{
return Pipe::unitePipes(std::move(pipes), nullptr, false);
@ -276,23 +318,12 @@ Pipe Pipe::unitePipes(Pipes pipes, Processors * collected_processors, bool allow
OutputPortRawPtrs totals;
OutputPortRawPtrs extremes;
res.collected_processors = collected_processors;
res.header = pipes.front().header;
if (allow_empty_header && !res.header)
{
for (const auto & pipe : pipes)
{
if (const auto & header = pipe.getHeader())
{
res.header = header;
break;
}
}
}
res.header = getCommonHeader(pipes);
for (auto & pipe : pipes)
{
if (!allow_empty_header || pipe.header)
assertBlocksHaveEqualStructure(res.header, pipe.header, "Pipe::unitePipes");
assertCompatibleHeader(pipe.header, res.header, "Pipe::unitePipes");
res.processors.insert(res.processors.end(), pipe.processors.begin(), pipe.processors.end());
res.output_ports.insert(res.output_ports.end(), pipe.output_ports.begin(), pipe.output_ports.end());

View File

@ -16,7 +16,7 @@ void connect(OutputPort & output, InputPort & input)
auto out_name = output.getProcessor().getName();
auto in_name = input.getProcessor().getName();
assertBlocksHaveEqualStructure(input.getHeader(), output.getHeader(), " function connect between " + out_name + " and " + in_name);
assertCompatibleHeader(output.getHeader(), input.getHeader(), " function connect between " + out_name + " and " + in_name);
input.output_port = &output;
output.input_port = &input;

View File

@ -232,8 +232,6 @@ QueryPipeline QueryPipeline::unitePipelines(
pipeline.checkInitialized();
pipeline.pipe.collected_processors = collected_processors;
assertBlocksHaveEqualStructure(pipeline.getHeader(), common_header, "QueryPipeline::unitePipelines");
pipes.emplace_back(std::move(pipeline.pipe));
max_threads += pipeline.max_threads;

View File

@ -0,0 +1,20 @@
drop table if exists t0;
CREATE TABLE t0 (c0 String) ENGINE = Log();
SELECT isNull(t0.c0) OR COUNT('\n?pVa')
FROM t0
GROUP BY t0.c0
HAVING isNull(t0.c0)
UNION ALL
SELECT isNull(t0.c0) OR COUNT('\n?pVa')
FROM t0
GROUP BY t0.c0
HAVING NOT isNull(t0.c0)
UNION ALL
SELECT isNull(t0.c0) OR COUNT('\n?pVa')
FROM t0
GROUP BY t0.c0
HAVING isNull(isNull(t0.c0))
SETTINGS aggregate_functions_null_for_empty = 1, enable_optimize_predicate_expression = 0;
drop table if exists t0;