#include #include #include #include #include #include using namespace DB; TEST(Processors, PortsConnected) { auto col = ColumnUInt8::create(1, 1); Columns columns; columns.emplace_back(std::move(col)); Chunk chunk(std::move(columns), 1); Block header = {ColumnWithTypeAndName(ColumnUInt8::create(), std::make_shared(), "x")}; auto source = std::make_shared(std::move(header), std::move(chunk)); auto sink = std::make_shared(source->getPort().getHeader()); connect(source->getPort(), sink->getPort()); Processors processors; processors.emplace_back(std::move(source)); processors.emplace_back(std::move(sink)); PipelineExecutor executor(processors); executor.execute(1); } TEST(Processors, PortsNotConnected) { auto col = ColumnUInt8::create(1, 1); Columns columns; columns.emplace_back(std::move(col)); Chunk chunk(std::move(columns), 1); Block header = {ColumnWithTypeAndName(ColumnUInt8::create(), std::make_shared(), "x")}; auto source = std::make_shared(std::move(header), std::move(chunk)); auto sink = std::make_shared(source->getPort().getHeader()); /// connect(source->getPort(), sink->getPort()); Processors processors; processors.emplace_back(std::move(source)); processors.emplace_back(std::move(sink)); PipelineExecutor executor(processors); auto exec = [&]() { try { executor.execute(1); } catch (DB::Exception & e) { std::cout << e.displayText() << std::endl; ASSERT_TRUE(e.displayText().find("pipeline") != std::string::npos); } }; exec(); }