Print pipeline on PipelineExecutor init exception.

This commit is contained in:
Nikolai Kochetov 2020-03-25 19:02:51 +03:00
parent 778b95797f
commit 0b1ad7f7c8
3 changed files with 82 additions and 1 deletions

View File

@ -40,7 +40,21 @@ PipelineExecutor::PipelineExecutor(Processors & processors_, QueryStatus * elem)
, expand_pipeline_task(nullptr)
, process_list_element(elem)
{
buildGraph();
try
{
buildGraph();
}
catch (Exception & exception)
{
/// If exception was thrown while pipeline initialization, it means that query pipeline was not build correctly.
/// It is logical error, and we need more information about pipeline.
WriteBufferFromOwnString buf;
printPipeline(processors, buf);
buf.finalize();
exception.addMessage("Query pipeline:\n" + buf.str());
throw;
}
}
bool PipelineExecutor::addEdges(UInt64 node)

View File

@ -42,6 +42,9 @@ void printPipeline(const Processors & processors, const Statuses & statuses, Wri
{
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & curr = *processor;
const IProcessor & next = port.getInputPort().getProcessor();

View File

@ -0,0 +1,64 @@
#include <gtest/gtest.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/NullSink.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
using namespace DB;
TEST(Processors, PortsConnected)
{
auto col = ColumnUInt8::create(1, 1);
Columns columns;
columns.emplace_back(std::move(col));
Chunk chunk(std::move(columns), 1);
Block header = {ColumnWithTypeAndName(ColumnUInt8::create(), std::make_shared<DataTypeUInt8>(), "x")};
auto source = std::make_shared<SourceFromSingleChunk>(std::move(header), std::move(chunk));
auto sink = std::make_shared<NullSink>(source->getPort().getHeader());
connect(source->getPort(), sink->getPort());
Processors processors;
processors.emplace_back(std::move(source));
processors.emplace_back(std::move(sink));
PipelineExecutor executor(processors);
executor.execute(1);
}
TEST(Processors, PortsNotConnected)
{
auto col = ColumnUInt8::create(1, 1);
Columns columns;
columns.emplace_back(std::move(col));
Chunk chunk(std::move(columns), 1);
Block header = {ColumnWithTypeAndName(ColumnUInt8::create(), std::make_shared<DataTypeUInt8>(), "x")};
auto source = std::make_shared<SourceFromSingleChunk>(std::move(header), std::move(chunk));
auto sink = std::make_shared<NullSink>(source->getPort().getHeader());
/// connect(source->getPort(), sink->getPort());
Processors processors;
processors.emplace_back(std::move(source));
processors.emplace_back(std::move(sink));
PipelineExecutor executor(processors);
ASSERT_THROW(
try
{
executor.execute(1);
}
catch (DB::Exception & e)
{
std::cout << e.displayText() << std::endl;
throw;
}, DB::Exception);
}