2018-05-23 20:19:33 +00:00
|
|
|
#include <Processors/LimitTransform.h>
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2019-04-09 10:17:25 +00:00
|
|
|
/** Creates a LIMIT/OFFSET processor with `num_streams` independent (input, output) port pairs.
  *
  * @param header_               Block structure shared by every input and output port.
  * @param limit_                Maximum number of rows to pass through (after skipping the offset).
  * @param offset_               Number of leading rows to skip.
  * @param num_streams           Number of port pairs; pair i connects input i to output i.
  * @param always_read_till_end_ If true, inputs keep being drained even after the limit is reached.
  * @param with_ties_            If true, rows equal (by sort key) to the last row inside the limit are also passed.
  * @param description_          Sort description used to locate the sort-key columns for WITH TIES.
  */
LimitTransform::LimitTransform(
    const Block & header_, size_t limit_, size_t offset_, size_t num_streams,
    bool always_read_till_end_, bool with_ties_,
    SortDescription description_)
    /// One port per stream. Passing `{header_}` would create a single port regardless of num_streams.
    : IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_))
    , limit(limit_), offset(offset_)
    , always_read_till_end(always_read_till_end_)
    , with_ties(with_ties_), description(std::move(description_))
{
    /// Cache raw pointers so that port pairs can be addressed by position in prepare().
    input_ports.reserve(num_streams);
    output_ports.reserve(num_streams);

    for (auto & input : inputs)
        input_ports.emplace_back(&input);

    for (auto & output : outputs)
        output_ports.emplace_back(&output);

    /// prepare() indexes this by port position, so it must have one entry per port pair.
    is_port_pair_finished.assign(num_streams, false);

    /// Resolve every sort-key column to its position in the header:
    /// by name when the description has one, otherwise by the stored column number.
    for (const auto & desc : description)
    {
        if (!desc.column_name.empty())
            sort_column_positions.push_back(header_.getPositionByName(desc.column_name));
        else
            sort_column_positions.push_back(desc.column_number);
    }
}
|
|
|
|
|
2020-03-02 12:56:47 +00:00
|
|
|
/// Builds a one-row chunk that contains only the sort-key columns of `chunk`,
/// each holding the value taken from row `row`. `row` must be a valid row index of `chunk`.
Chunk LimitTransform::makeChunkWithPreviousRow(const Chunk & chunk, size_t row) const
{
    assert(row < chunk.getNumRows());

    const ColumnRawPtrs sort_key_columns = extractSortColumns(chunk.getColumns());

    MutableColumns single_row_columns;
    for (const auto * sort_key_column : sort_key_columns)
    {
        /// Start from an empty column of the same type, then copy the one requested value into it.
        single_row_columns.emplace_back(sort_key_column->cloneEmpty());
        single_row_columns.back()->insertFrom(*sort_key_column, row);
    }

    return Chunk(std::move(single_row_columns), 1);
}
|
|
|
|
|
2018-05-23 20:19:33 +00:00
|
|
|
|
2020-03-12 15:29:35 +00:00
|
|
|
/** Runs preparePair() for every port pair whose input or output was reported as updated
  * and folds the per-pair results into one processor status:
  *  - Finished when every port pair has finished;
  *  - PortFull when at least one pair processed in this call reported PortFull;
  *  - NeedData otherwise.
  * Throws LOGICAL_ERROR if preparePair() returns any other status.
  */
IProcessor::Status LimitTransform::prepare(
    const PortNumbers & updated_input_ports,
    const PortNumbers & updated_output_ports)
{
    bool any_pair_is_full = false;

    auto handle_pair = [&](size_t pair_index)
    {
        const auto pair_status = preparePair(*input_ports[pair_index], *output_ports[pair_index]);

        if (pair_status == IProcessor::Status::NeedData)
            return;

        if (pair_status == IProcessor::Status::PortFull)
        {
            any_pair_is_full = true;
            return;
        }

        if (pair_status == IProcessor::Status::Finished)
        {
            /// A pair may report Finished more than once; count it only the first time.
            if (!is_port_pair_finished[pair_index])
            {
                is_port_pair_finished[pair_index] = true;
                ++num_finished_port_pairs;
            }

            return;
        }

        throw Exception(
            "Unexpected status for LimitTransform::preparePair : " + IProcessor::statusToName(pair_status),
            ErrorCodes::LOGICAL_ERROR);
    };

    /// Inputs first, then outputs; a pair present in both lists is processed twice.
    for (auto pair_index : updated_input_ports)
        handle_pair(pair_index);

    for (auto pair_index : updated_output_ports)
        handle_pair(pair_index);

    if (num_finished_port_pairs == input_ports.size())
        return Status::Finished;

    return any_pair_is_full ? Status::PortFull : Status::NeedData;
}
|
|
|
|
|
|
|
|
/** Advances one (input, output) port pair by at most one chunk.
  *
  * Returns:
  *  - Finished when this pair will produce nothing more (output finished, limit reached, or input exhausted);
  *  - PortFull when the output cannot accept data yet, or a chunk has just been pushed to it;
  *  - NeedData when the input has no chunk available, or the pulled chunk was discarded.
  *
  * When `always_read_till_end` is set, the input keeps being drained (and counted in
  * `rows_before_limit_at_least`) after the limit is reached or the output is finished;
  * such chunks are discarded and never pushed.
  */
LimitTransform::Status LimitTransform::preparePair(InputPort & input, OutputPort & output)
{
    /// Check can output.
    bool output_finished = false;
    if (output.isFinished())
    {
        output_finished = true;
        if (!always_read_till_end)
        {
            input.close();
            return Status::Finished;
        }
    }

    if (!output_finished && !output.canPush())
    {
        input.setNotNeeded();
        return Status::PortFull;
    }

    /// Push block if can.
    if (!output_finished && has_block && block_processed)
    {
        output.push(std::move(current_chunk));
        has_block = false;
        block_processed = false;
    }

    /// Check if we are done with pushing.
    /// While a WITH TIES candidate row is saved, the next chunk still has to be inspected.
    bool is_limit_reached = (rows_read >= offset + limit) && !previous_row_chunk;
    if (is_limit_reached)
    {
        if (!always_read_till_end)
        {
            output.finish();
            input.close();
            return Status::Finished;
        }
    }

    /// Check can input.
    if (input.isFinished())
    {
        output.finish();
        return Status::Finished;
    }

    input.setNeeded();
    if (!input.hasData())
        return Status::NeedData;

    current_chunk = input.pull(true);
    has_block = true;

    auto rows = current_chunk.getNumRows();
    rows_before_limit_at_least += rows;

    /// Skip block (for 'always_read_till_end' case).
    /// This also covers an already finished output: the chunk must only be counted,
    /// never pushed into a finished port.
    if (is_limit_reached || output_finished)
    {
        current_chunk.clear();
        has_block = false;

        if (input.isFinished())
        {
            output.finish();
            return Status::Finished;
        }

        /// Now, we pulled from input, and it must be empty.
        input.setNeeded();
        return Status::NeedData;
    }

    /// Process block.

    rows_read += rows;

    /// The whole chunk lies inside the offset: drop it.
    if (rows_read <= offset)
    {
        current_chunk.clear();
        has_block = false;

        if (input.isFinished())
        {
            output.finish();
            return Status::Finished;
        }

        /// Now, we pulled from input, and it must be empty.
        input.setNeeded();
        return Status::NeedData;
    }

    /// NOTE(review): on this return the pulled chunk stays in current_chunk with has_block set
    /// but block_processed unchanged - confirm it cannot be overwritten by the next pull.
    if (output.hasData())
        return Status::PortFull;

    if (rows_read >= offset + rows && rows_read <= offset + limit)
    {
        /// Return the whole chunk.

        /// Save the last row of current chunk to check if next block begins with the same row (for WITH TIES).
        if (with_ties && rows_read == offset + limit)
            previous_row_chunk = makeChunkWithPreviousRow(current_chunk, current_chunk.getNumRows() - 1);
    }
    else
        splitChunk();

    bool may_need_more_data_for_ties = previous_row_chunk || rows_read - rows <= offset + limit;
    /// No more data is needed.
    if (!always_read_till_end && (rows_read >= offset + limit) && !may_need_more_data_for_ties)
        input.close();

    output.push(std::move(current_chunk));
    has_block = false;

    return Status::PortFull;
}
|
|
|
|
|
|
|
|
|
2020-03-12 15:29:35 +00:00
|
|
|
void LimitTransform::splitChunk()
|
2018-05-23 20:19:33 +00:00
|
|
|
{
|
2020-03-02 12:56:47 +00:00
|
|
|
auto current_chunk_sort_columns = extractSortColumns(current_chunk.getColumns());
|
|
|
|
size_t num_rows = current_chunk.getNumRows();
|
|
|
|
size_t num_columns = current_chunk.getNumColumns();
|
2019-08-27 17:48:42 +00:00
|
|
|
|
2020-02-28 20:20:39 +00:00
|
|
|
if (previous_row_chunk && rows_read >= offset + limit)
|
2019-08-27 17:48:42 +00:00
|
|
|
{
|
2020-02-28 20:20:39 +00:00
|
|
|
/// Scan until the first row, which is not equal to previous_row_chunk (for WITH TIES)
|
|
|
|
size_t current_row_num = 0;
|
|
|
|
for (; current_row_num < num_rows; ++current_row_num)
|
2019-08-27 17:48:42 +00:00
|
|
|
{
|
2020-03-02 12:56:47 +00:00
|
|
|
if (!sortColumnsEqualAt(current_chunk_sort_columns, current_row_num))
|
2019-08-27 17:48:42 +00:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2020-03-02 12:56:47 +00:00
|
|
|
auto columns = current_chunk.detachColumns();
|
2019-08-27 17:48:42 +00:00
|
|
|
|
2020-02-28 20:20:39 +00:00
|
|
|
if (current_row_num < num_rows)
|
2019-08-27 17:48:42 +00:00
|
|
|
{
|
2020-02-28 20:20:39 +00:00
|
|
|
previous_row_chunk = {};
|
2019-08-27 17:48:42 +00:00
|
|
|
for (size_t i = 0; i < num_columns; ++i)
|
2020-02-28 20:20:39 +00:00
|
|
|
columns[i] = columns[i]->cut(0, current_row_num);
|
2019-08-27 17:48:42 +00:00
|
|
|
}
|
|
|
|
|
2020-02-28 20:20:39 +00:00
|
|
|
current_chunk.setColumns(std::move(columns), current_row_num);
|
2019-08-27 17:48:42 +00:00
|
|
|
block_processed = true;
|
|
|
|
return;
|
|
|
|
}
|
2018-05-23 20:19:33 +00:00
|
|
|
|
|
|
|
/// return a piece of the block
|
|
|
|
size_t start = std::max(
|
|
|
|
static_cast<Int64>(0),
|
2019-02-18 16:36:07 +00:00
|
|
|
static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows));
|
2018-05-23 20:19:33 +00:00
|
|
|
|
|
|
|
size_t length = std::min(
|
|
|
|
static_cast<Int64>(limit), std::min(
|
|
|
|
static_cast<Int64>(rows_read) - static_cast<Int64>(offset),
|
2019-02-18 16:36:07 +00:00
|
|
|
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows)));
|
2018-05-23 20:19:33 +00:00
|
|
|
|
2019-08-27 17:48:42 +00:00
|
|
|
/// check if other rows in current block equals to last one in limit
|
2020-02-28 20:20:39 +00:00
|
|
|
if (with_ties && length)
|
2019-08-27 17:48:42 +00:00
|
|
|
{
|
2020-02-28 20:20:39 +00:00
|
|
|
size_t current_row_num = start + length;
|
2020-03-02 12:56:47 +00:00
|
|
|
previous_row_chunk = makeChunkWithPreviousRow(current_chunk, current_row_num - 1);
|
2019-08-27 17:48:42 +00:00
|
|
|
|
2020-02-28 20:20:39 +00:00
|
|
|
for (; current_row_num < num_rows; ++current_row_num)
|
2019-08-27 17:48:42 +00:00
|
|
|
{
|
2020-03-02 12:56:47 +00:00
|
|
|
if (!sortColumnsEqualAt(current_chunk_sort_columns, current_row_num))
|
2019-08-27 17:48:42 +00:00
|
|
|
{
|
2020-02-28 20:20:39 +00:00
|
|
|
previous_row_chunk = {};
|
2019-08-27 17:48:42 +00:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2020-02-28 20:20:39 +00:00
|
|
|
|
|
|
|
length = current_row_num - start;
|
2019-08-27 17:48:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (length == num_rows)
|
|
|
|
{
|
|
|
|
block_processed = true;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2020-03-02 12:56:47 +00:00
|
|
|
auto columns = current_chunk.detachColumns();
|
2019-02-18 16:36:07 +00:00
|
|
|
|
|
|
|
for (size_t i = 0; i < num_columns; ++i)
|
|
|
|
columns[i] = columns[i]->cut(start, length);
|
|
|
|
|
|
|
|
current_chunk.setColumns(std::move(columns), length);
|
2019-02-07 18:51:53 +00:00
|
|
|
|
|
|
|
block_processed = true;
|
2018-05-23 20:19:33 +00:00
|
|
|
}
|
|
|
|
|
2020-02-28 20:20:39 +00:00
|
|
|
/// Returns non-owning pointers to the sort-key columns of `columns`,
/// in the order of `sort_column_positions`.
ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns) const
{
    ColumnRawPtrs sort_key_columns;
    sort_key_columns.reserve(description.size());

    for (const size_t position : sort_column_positions)
    {
        const auto & column = columns[position];
        sort_key_columns.push_back(column.get());
    }

    return sort_key_columns;
}
|
|
|
|
|
2020-03-02 12:56:47 +00:00
|
|
|
bool LimitTransform::sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, size_t current_chunk_row_num) const
|
|
|
|
{
|
|
|
|
assert(current_chunk_sort_columns.size() == previous_row_chunk.getNumColumns());
|
|
|
|
size_t size = current_chunk_sort_columns.size();
|
|
|
|
const auto & previous_row_sort_columns = previous_row_chunk.getColumns();
|
|
|
|
for (size_t i = 0; i < size; ++i)
|
|
|
|
if (0 != current_chunk_sort_columns[i]->compareAt(current_chunk_row_num, 0, *previous_row_sort_columns[i], 1))
|
|
|
|
return false;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2018-05-23 20:19:33 +00:00
|
|
|
}
|
|
|
|
|