Merge pull request #15857 from amosbird/e2

Refactor processors.
This commit is contained in:
Nikolai Kochetov 2020-10-15 10:45:00 +03:00 committed by GitHub
commit e8038ae84f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 57 additions and 45 deletions

View File

@ -22,9 +22,9 @@
#include <Interpreters/ClientInfo.h>
#include <Compression/CompressionFactory.h>
#include <Processors/Pipe.h>
#include <Processors/QueryPipeline.h>
#include <Processors/ISink.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/ConcatProcessor.h>
#include <pcg_random.hpp>
#if !defined(ARCADIA_BUILD)
@ -646,16 +646,17 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
PipelineExecutorPtr executor;
auto on_cancel = [& executor]() { executor->cancel(); };
if (elem->pipe->numOutputPorts() > 1)
elem->pipe->addTransform(std::make_shared<ConcatProcessor>(elem->pipe->getHeader(), elem->pipe->numOutputPorts()));
auto sink = std::make_shared<ExternalTableDataSink>(elem->pipe->getHeader(), *this, *elem, std::move(on_cancel));
DB::connect(*elem->pipe->getOutputPort(0), sink->getPort());
auto processors = Pipe::detachProcessors(std::move(*elem->pipe));
processors.push_back(sink);
executor = std::make_shared<PipelineExecutor>(processors);
QueryPipeline pipeline;
pipeline.init(std::move(*elem->pipe));
pipeline.resize(1);
auto sink = std::make_shared<ExternalTableDataSink>(pipeline.getHeader(), *this, *elem, std::move(on_cancel));
pipeline.setSinks([&](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr
{
if (type != QueryPipeline::StreamType::Main)
return nullptr;
return sink;
});
executor = pipeline.execute();
executor->execute(/*num_threads = */ 1);
auto read_rows = sink->getNumReadRows();

View File

@ -11,7 +11,6 @@
#include <Processors/Pipe.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/ConcatProcessor.h>
#include <Core/ExternalTable.h>
#include <Poco/Net/MessageHeader.h>
#include <common/find_symbols.h>
@ -171,8 +170,7 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
BlockOutputStreamPtr output = storage->write(ASTPtr(), storage->getInMemoryMetadataPtr(), context);
/// Write data
if (data->pipe->numOutputPorts() > 1)
data->pipe->addTransform(std::make_shared<ConcatProcessor>(data->pipe->getHeader(), data->pipe->numOutputPorts()));
data->pipe->resize(1);
auto sink = std::make_shared<SinkToOutputStream>(std::move(output));
connect(*data->pipe->getOutputPort(0), sink->getPort());

View File

@ -8,7 +8,7 @@
#include <common/logger_useful.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/IBlockInputStream.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>

View File

@ -30,7 +30,7 @@ IOutputFormat::Status IOutputFormat::prepare()
if (!input.hasData())
return Status::NeedData;
current_chunk = input.pull();
current_chunk = input.pull(true);
current_block_kind = kind;
has_input = true;
return Status::Ready;

View File

@ -24,7 +24,7 @@ ISink::Status ISink::prepare()
if (!input.hasData())
return Status::NeedData;
current_chunk = input.pull();
current_chunk = input.pull(true);
has_input = true;
return Status::Ready;
}

View File

@ -6,19 +6,19 @@ namespace DB
{
/// Sink which closes input port and reads nothing.
class NullSink : public IProcessor
class NullSink : public ISink
{
public:
explicit NullSink(Block header) : IProcessor({std::move(header)}, {}) {}
explicit NullSink(Block header) : ISink(std::move(header)) {}
String getName() const override { return "NullSink"; }
Status prepare() override
{
inputs.front().close();
input.close();
return Status::Finished;
}
InputPort & getPort() { return inputs.front(); }
protected:
void consume(Chunk) override {}
};
/// Sink which reads everything and do nothing with it.

View File

@ -620,6 +620,24 @@ void Pipe::addSimpleTransform(const ProcessorGetter & getter)
addSimpleTransform([&](const Block & stream_header, StreamType) { return getter(stream_header); });
}
void Pipe::resize(size_t num_streams, bool force, bool strict)
{
if (output_ports.empty())
throw Exception("Cannot resize an empty Pipe.", ErrorCodes::LOGICAL_ERROR);
if (!force && num_streams == numOutputPorts())
return;
ProcessorPtr resize;
if (strict)
resize = std::make_shared<StrictResizeProcessor>(getHeader(), numOutputPorts(), num_streams);
else
resize = std::make_shared<ResizeProcessor>(getHeader(), numOutputPorts(), num_streams);
addTransform(std::move(resize));
}
void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
{
if (output_ports.empty())

View File

@ -82,6 +82,9 @@ public:
void addSimpleTransform(const ProcessorGetter & getter);
void addSimpleTransform(const ProcessorGetterWithStreamKind & getter);
/// Changes the number of output ports if needed. Adds ResizeTransform.
void resize(size_t num_streams, bool force = false, bool strict = false);
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform Pipe in general way.

View File

@ -378,7 +378,7 @@ public:
/// * If you finish port, it isFinished().
/// * If port isFinished(), you can do nothing with it.
/// * If port not isNeeded(), you can only finish() it.
/// * You can hush only if port doesn't hasData().
/// * You can push only if port doesn't hasData().
class OutputPort : public Port
{
friend void connect(OutputPort &, InputPort &);

View File

@ -1,7 +1,6 @@
#include <Processors/QueryPipeline.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/LimitTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <Processors/Transforms/ExtremesTransform.h>
@ -124,18 +123,7 @@ void QueryPipeline::addMergingAggregatedMemoryEfficientTransform(AggregatingTran
void QueryPipeline::resize(size_t num_streams, bool force, bool strict)
{
checkInitializedAndNotCompleted();
if (!force && num_streams == getNumStreams())
return;
ProcessorPtr resize;
if (strict)
resize = std::make_shared<StrictResizeProcessor>(getHeader(), getNumStreams(), num_streams);
else
resize = std::make_shared<ResizeProcessor>(getHeader(), getNumStreams(), num_streams);
pipe.addTransform(std::move(resize));
pipe.resize(num_streams, force, strict);
}
void QueryPipeline::addTotalsHavingTransform(ProcessorPtr transform)

View File

@ -76,7 +76,7 @@ public:
void addMergingAggregatedMemoryEfficientTransform(AggregatingTransformParamsPtr params, size_t num_merging_processors);
/// Changes the number of input ports if needed. Adds ResizeTransform.
/// Changes the number of output ports if needed. Adds ResizeTransform.
void resize(size_t num_streams, bool force = false, bool strict = false);
/// Unite several pipelines together. Result pipeline would have common_header structure.

View File

@ -30,12 +30,12 @@ DelayedSource::DelayedSource(const Block & header, Creator processors_creator, b
IProcessor::Status DelayedSource::prepare()
{
/// At first, wait for main input is needed and expand pipeline.
/// At first, wait for main output is needed and expand pipeline.
if (inputs.empty())
{
auto & first_output = outputs.front();
/// If main port was finished before callback was called, stop execution.
/// If main output port was finished before callback was called, stop execution.
if (first_output.isFinished())
{
for (auto & output : outputs)
@ -75,7 +75,7 @@ IProcessor::Status DelayedSource::prepare()
input->setNeeded();
if (!input->hasData())
return Status::PortFull;
return Status::NeedData;
output->pushData(input->pullData(true));
return Status::PortFull;
@ -122,8 +122,7 @@ void DelayedSource::work()
return;
}
if (pipe.numOutputPorts() > 1)
pipe.addTransform(std::make_shared<ResizeProcessor>(header, pipe.numOutputPorts(), 1));
pipe.resize(1);
main_output = pipe.getOutputPort(0);
totals_output = pipe.getTotalsPort();

View File

@ -113,7 +113,7 @@ void FillingTransform::transform(Chunk & chunk)
if (generate_suffix)
{
const auto & empty_columns = inputs.front().getHeader().getColumns();
const auto & empty_columns = input.getHeader().getColumns();
init_columns_by_positions(empty_columns, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(empty_columns, old_other_columns, res_other_columns, other_column_positions);

View File

@ -511,7 +511,7 @@ void addMergingAggregatedMemoryEfficientTransform(
/// --> GroupingAggregated --> ResizeProcessor --> MergingAggregatedBucket --> SortingAggregated -->
/// --> --> MergingAggregatedBucket -->
pipe.addTransform(std::make_shared<ResizeProcessor>(Block(), 1, num_merging_processors));
pipe.resize(num_merging_processors);
pipe.addSimpleTransform([params](const Block &)
{

View File

@ -21,7 +21,6 @@ namespace DB
*/
class StorageMemory final : public ext::shared_ptr_helper<StorageMemory>, public IStorage
{
friend class MemoryBlockInputStream;
friend class MemoryBlockOutputStream;
friend struct ext::shared_ptr_helper<StorageMemory>;

View File

@ -250,6 +250,8 @@ Pipe StorageMerge::read(
auto pipe = Pipe::unitePipes(std::move(pipes));
if (!pipe.empty())
// It's possible to have many tables read from merge, resize(num_streams) might open too many files at the same time.
// Using narrowPipe instead.
narrowPipe(pipe, num_streams);
return pipe;
@ -324,6 +326,8 @@ Pipe StorageMerge::createSources(
if (!pipe.empty())
{
if (concat_streams && pipe.numOutputPorts() > 1)
// It's possible to have many tables read from merge, resize(1) might open too many files at the same time.
// Using concat instead.
pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts()));
if (has_table_virtual_column)

View File

@ -320,6 +320,8 @@ Pipe StorageS3::read(
key));
auto pipe = Pipe::unitePipes(std::move(pipes));
// It's possible to have many buckets read from s3, resize(num_streams) might open too many handles at the same time.
// Using narrowPipe instead.
narrowPipe(pipe, num_streams);
return pipe;
}