Update LimitTransform.

This commit is contained in:
Nikolai Kochetov 2020-03-12 15:49:42 +03:00
parent aa72aa540c
commit 560576e1c9
2 changed files with 78 additions and 0 deletions

View File

@ -263,5 +263,64 @@ bool LimitTransform::sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort
return true; return true;
} }
LimitReachedCheckingTransform::LimitReachedCheckingTransform(
const Block & header_, size_t num_streams,
size_t limit, size_t offset, LimitTransform::LimitStatePtr limit_state_)
: IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_))
, max_total_rows_to_read(limit + offset), limit_state(std::move(limit_state_))
{
input_ports.reserve(num_streams);
output_ports.reserve(num_streams);
for (auto & input : inputs)
input_ports.emplace_back(&input);
for (auto & output : outputs)
output_ports.emplace_back(&output);
}
static IProcessor::Status processPair(InputPort * input, OutputPort * output)
{
/// Check can output.
if (output->isFinished())
{
input->close();
return IProcessor::Status::Finished;
}
if (!output->canPush())
return IProcessor::Status::PortFull;
/// Check input has data..
if (input->isFinished())
{
output->finish();
return IProcessor::Status::Finished;
}
input->setNeeded();
if (input->hasData())
output->push(input->pull(true));
/// Input can be finished after pull. check it again.
if (input->isFinished())
{
output->finish();
return IProcessor::Status::Finished;
}
return IProcessor::Status::PortFull;
}
IProcessor::Status LimitReachedCheckingTransform::prepare(
const PortNumbers & updated_input_ports,
const PortNumbers & updated_output_ports)
{
}
} }

View File

@ -61,4 +61,23 @@ public:
UInt64 getRowsBeforeLimitAtLeast() const { return rows_before_limit_at_least; } UInt64 getRowsBeforeLimitAtLeast() const { return rows_before_limit_at_least; }
}; };
class LimitReachedCheckingTransform : public IProcessor
{
public:
LimitReachedCheckingTransform(
const Block & header_, size_t num_streams,
size_t limit, size_t offset, LimitTransform::LimitStatePtr limit_state_);
String getName() const override { return "Limit"; }
Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) override;
private:
size_t max_total_rows_to_read;
LimitTransform::LimitStatePtr limit_state;
std::vector<InputPort *> input_ports;
std::vector<OutputPort *> output_ports;
};
} }