Fix limit transfrom prepare.

This commit is contained in:
Nikolai Kochetov 2019-04-12 17:43:28 +03:00
parent a8fdddfc88
commit 5aa3d98ea2

View File

@ -18,30 +18,35 @@ LimitTransform::LimitTransform(
LimitTransform::Status LimitTransform::prepare() LimitTransform::Status LimitTransform::prepare()
{ {
/// Check if we are done with pushing.
bool pushing_is_finished = rows_read >= offset + limit;
/// Check can output. /// Check can output.
if (output.isFinished()) if (output.isFinished())
{ {
input.close(); pushing_is_finished = true;
return Status::Finished; if (!always_read_till_end)
{
input.close();
return Status::Finished;
}
} }
if (!output.canPush()) if (!pushing_is_finished && !output.canPush())
{ {
input.setNotNeeded(); input.setNotNeeded();
return Status::PortFull; return Status::PortFull;
} }
/// Push block if can. /// Push block if can.
if (has_block && block_processed) if (!pushing_is_finished && has_block && block_processed)
{ {
output.push(std::move(current_chunk)); output.push(std::move(current_chunk));
has_block = false; has_block = false;
block_processed = false; block_processed = false;
} }
/// Check if we are done with pushing.
bool pushing_is_finished = rows_read >= offset + limit;
if (pushing_is_finished) if (pushing_is_finished)
{ {
if (!always_read_till_end) if (!always_read_till_end)
@ -77,6 +82,12 @@ LimitTransform::Status LimitTransform::prepare()
current_chunk.clear(); current_chunk.clear();
has_block = false; has_block = false;
if (input.isFinished())
{
output.finish();
return Status::Finished;
}
/// Now, we pulled from input, and it must be empty. /// Now, we pulled from input, and it must be empty.
return Status::NeedData; return Status::NeedData;
} }
@ -90,6 +101,12 @@ LimitTransform::Status LimitTransform::prepare()
current_chunk.clear(); current_chunk.clear();
has_block = false; has_block = false;
if (input.isFinished())
{
output.finish();
return Status::Finished;
}
/// Now, we pulled from input, and it must be empty. /// Now, we pulled from input, and it must be empty.
return Status::NeedData; return Status::NeedData;
} }
@ -103,6 +120,12 @@ LimitTransform::Status LimitTransform::prepare()
output.push(std::move(current_chunk)); output.push(std::move(current_chunk));
has_block = false; has_block = false;
if (input.isFinished())
{
output.finish();
return Status::Finished;
}
return Status::NeedData; return Status::NeedData;
} }