/// LimitTransform.cpp — ClickHouse, dbms/src/Processors/LimitTransform.cpp
/// (web-view residue — "144 lines, 3.3 KiB, C++, Raw/Normal View/History" — removed).

#include <Processors/LimitTransform.h>
#include <Processors/LimitTransform.h>
namespace DB
{
2019-04-09 10:17:25 +00:00
LimitTransform::LimitTransform(
2019-04-11 14:51:25 +00:00
const Block & header, size_t limit, size_t offset,
2019-04-09 10:17:25 +00:00
bool always_read_till_end, bool do_count_rows_before_limit)
2019-04-11 14:51:25 +00:00
: IProcessor({header}, {header})
2019-04-09 10:17:25 +00:00
, input(inputs.front()), output(outputs.front())
, limit(limit), offset(offset)
, always_read_till_end(always_read_till_end)
, do_count_rows_before_limit(do_count_rows_before_limit)
{
}
/// Port-state machine for LIMIT/OFFSET.
///
/// Decides, without doing heavy work, whether the processor can make progress:
/// pushes ready data, pulls the next chunk, short-circuits whole chunks that
/// fall entirely inside or entirely outside the [offset, offset + limit) window,
/// and returns Status::Ready only when the chunk straddles the window boundary
/// and must be cut by work().
LimitTransform::Status LimitTransform::prepare()
{
    /// Check can output.
    if (output.isFinished())
    {
        /// Downstream no longer wants data — nothing more to do.
        input.close();
        return Status::Finished;
    }

    if (!output.canPush())
    {
        input.setNotNeeded();
        return Status::PortFull;
    }

    /// Push block if can (a chunk previously cut by work()).
    if (has_block && block_processed)
    {
        output.push(std::move(current_chunk));
        has_block = false;
        block_processed = false;
    }

    /// Check if we are done with pushing.
    bool pushing_is_finished = rows_read >= offset + limit;
    if (pushing_is_finished)
    {
        output.finish();
        if (!always_read_till_end)
        {
            input.close();
            return Status::Finished;
        }
        /// Otherwise fall through and keep draining the input below.
    }

    /// Check can input.
    if (input.isFinished())
    {
        output.finish();
        return Status::Finished;
    }

    input.setNeeded();
    if (!input.hasData())
        return Status::NeedData;

    current_chunk = input.pull();
    has_block = true;

    auto rows = current_chunk.getNumRows();
    if (do_count_rows_before_limit)
        rows_before_limit_at_least += rows;

    /// Skip block (for 'always_read_till_end' case): limit already satisfied,
    /// we are only draining the input.
    if (pushing_is_finished)
    {
        current_chunk.clear();
        has_block = false;
        /// Now, we pulled from input, and it must be empty.
        return Status::NeedData;
    }

    /// Process block.
    rows_read += rows;

    /// Chunk lies entirely before the offset — drop it whole.
    if (rows_read <= offset)
    {
        current_chunk.clear();
        has_block = false;
        /// Now, we pulled from input, and it must be empty.
        return Status::NeedData;
    }

    /// Return the whole block: it fits entirely inside [offset, offset + limit),
    /// so no cutting is needed.
    if (rows_read >= offset + rows && rows_read <= offset + limit)
    {
        if (output.hasData())
            return Status::PortFull;

        output.push(std::move(current_chunk));
        has_block = false;
        return Status::NeedData;
    }

    /// No more data is needed after this chunk.
    if (rows_read >= offset + limit)
        input.close();

    /// Chunk straddles the window boundary — work() will cut it.
    return Status::Ready;
}
void LimitTransform::work()
{
size_t num_rows = current_chunk.getNumRows();
size_t num_columns = current_chunk.getNumColumns();
/// return a piece of the block
size_t start = std::max(
static_cast<Int64>(0),
static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows));
size_t length = std::min(
static_cast<Int64>(limit), std::min(
static_cast<Int64>(rows_read) - static_cast<Int64>(offset),
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows)));
auto columns = current_chunk.detachColumns();
for (size_t i = 0; i < num_columns; ++i)
columns[i] = columns[i]->cut(start, length);
current_chunk.setColumns(std::move(columns), length);
2019-02-07 18:51:53 +00:00
block_processed = true;
}
}