This commit is contained in:
Nikolai Kochetov 2021-04-20 14:55:23 +03:00
parent adc103e338
commit 5e589ca6b2
5 changed files with 73 additions and 20 deletions

View File

@ -484,7 +484,7 @@ DataTypes Block::getDataTypes() const
template <typename ReturnType>
static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, const std::string & context_description)
static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, const std::string & context_description, bool allow_remove_constants)
{
auto on_error = [](const std::string & message [[maybe_unused]], int code [[maybe_unused]])
{
@ -515,7 +515,16 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons
if (!actual.column || !expected.column)
continue;
if (actual.column->getName() != expected.column->getName())
const IColumn * actual_column = actual.column.get();
/// If we allow to remove constants, and expected column is not const, then unwrap actual constant column.
if (allow_remove_constants && !isColumnConst(*expected.column))
{
if (const auto * column_const = typeid_cast<const ColumnConst *>(actual_column))
actual_column = &column_const->getDataColumn();
}
if (actual_column->getName() != expected.column->getName())
return on_error("Block structure mismatch in " + context_description + " stream: different columns:\n"
+ lhs.dumpStructure() + "\n" + rhs.dumpStructure(), ErrorCodes::LOGICAL_ERROR);
@ -537,13 +546,25 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
{
return checkBlockStructure<bool>(lhs, rhs, {});
return checkBlockStructure<bool>(lhs, rhs, {}, false);
}
void assertBlocksHaveEqualStructure(const Block & lhs, const Block & rhs, const std::string & context_description)
{
checkBlockStructure<void>(lhs, rhs, context_description);
checkBlockStructure<void>(lhs, rhs, context_description, false);
}
bool isCompatibleHeader(const Block & actual, const Block & desired)
{
return checkBlockStructure<bool>(actual, desired, {}, true);
}
void assertCompatibleHeader(const Block & actual, const Block & desired, const std::string & context_description)
{
checkBlockStructure<void>(actual, desired, context_description, true);
}

View File

@ -184,6 +184,12 @@ bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs);
/// Throw exception when blocks are different.
void assertBlocksHaveEqualStructure(const Block & lhs, const Block & rhs, const std::string & context_description);
/// Actual header is compatible to desired if block have equal structure except constants.
/// It is allowed when column from actual header is constant, but in desired is not.
/// If both columns are constant, it is checked that they have the same value.
bool isCompatibleHeader(const Block & actual, const Block & desired);
void assertCompatibleHeader(const Block & actual, const Block & desired, const std::string & context_description);
/// Calculate difference in structure of blocks and write description into output strings. NOTE It doesn't compare values of constant columns.
void getBlocksDifference(const Block & lhs, const Block & rhs, std::string & out_lhs_diff, std::string & out_rhs_diff);

View File

@ -8,6 +8,7 @@
#include <Processors/Transforms/ExtremesTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/NullSource.h>
#include <Columns/ColumnConst.h>
namespace DB
{
@ -250,6 +251,44 @@ static Pipes removeEmptyPipes(Pipes pipes)
return res;
}
static Block getCommonHeader(const Pipes & pipes)
{
Block res;
for (const auto & pipe : pipes)
{
if (const auto & header = pipe.getHeader())
{
res = header;
break;
}
}
for (const auto & pipe : pipes)
{
const auto & header = pipe.getHeader();
for (size_t i = 0; i < res.columns(); ++i)
{
/// We do not check that headers are compatible here. Will do it later.
if (i >= header.columns())
break;
auto & common = res.getByPosition(i).column;
const auto & cur = header.getByPosition(i).column;
/// Only remove const from common header if it is not const for current pipe.
if (cur && common && !isColumnConst(*cur))
{
if (const auto * column_const = typeid_cast<const ColumnConst *>(common.get()))
common = column_const->getDataColumnPtr();
}
}
}
return res;
}
Pipe Pipe::unitePipes(Pipes pipes)
{
return Pipe::unitePipes(std::move(pipes), nullptr, false);
@ -276,23 +315,12 @@ Pipe Pipe::unitePipes(Pipes pipes, Processors * collected_processors, bool allow
OutputPortRawPtrs totals;
OutputPortRawPtrs extremes;
res.collected_processors = collected_processors;
res.header = pipes.front().header;
if (allow_empty_header && !res.header)
{
for (const auto & pipe : pipes)
{
if (const auto & header = pipe.getHeader())
{
res.header = header;
break;
}
}
}
res.header = getCommonHeader(pipes);
for (auto & pipe : pipes)
{
if (!allow_empty_header || pipe.header)
assertBlocksHaveEqualStructure(res.header, pipe.header, "Pipe::unitePipes");
assertCompatibleHeader(pipe.header, res.header, "Pipe::unitePipes");
res.processors.insert(res.processors.end(), pipe.processors.begin(), pipe.processors.end());
res.output_ports.insert(res.output_ports.end(), pipe.output_ports.begin(), pipe.output_ports.end());

View File

@ -16,7 +16,7 @@ void connect(OutputPort & output, InputPort & input)
auto out_name = output.getProcessor().getName();
auto in_name = input.getProcessor().getName();
assertBlocksHaveEqualStructure(input.getHeader(), output.getHeader(), " function connect between " + out_name + " and " + in_name);
assertCompatibleHeader(output.getHeader(), input.getHeader(), " function connect between " + out_name + " and " + in_name);
input.output_port = &output;
output.input_port = &input;

View File

@ -232,8 +232,6 @@ QueryPipeline QueryPipeline::unitePipelines(
pipeline.checkInitialized();
pipeline.pipe.collected_processors = collected_processors;
assertBlocksHaveEqualStructure(pipeline.getHeader(), common_header, "QueryPipeline::unitePipelines");
pipes.emplace_back(std::move(pipeline.pipe));
max_threads += pipeline.max_threads;