ClickHouse/dbms/src/DataStreams/LimitBlockInputStream.cpp

166 lines
4.4 KiB
C++
Raw Normal View History

2010-03-12 18:25:35 +00:00
#include <algorithm>
#include <DataStreams/LimitBlockInputStream.h>
2010-03-12 18:25:35 +00:00
namespace DB
{
2019-04-19 13:38:25 +00:00
namespace detail
{
2019-04-19 15:45:42 +00:00
/// gets pointers to all columns of block, which were used for ORDER BY
2019-04-19 13:38:25 +00:00
ColumnRawPtrs getBlockColumns(const Block & block, const SortDescription description)
{
size_t size = description.size();
ColumnRawPtrs res;
res.reserve(size);
for (size_t i = 0; i < size; ++i)
{
const IColumn * column = !description[i].column_name.empty()
? block.getByName(description[i].column_name).column.get()
: block.safeGetByPosition(description[i].column_number).column.get();
res.emplace_back(column);
}
return res;
}
2019-04-19 15:45:42 +00:00
void setSharedBlockRowRef(SharedBlockRowRef &row_ref, SharedBlockPtr shared_block, ColumnRawPtrs *columns,
size_t row_num)
{
row_ref.row_num = row_num;
row_ref.columns = columns;
row_ref.shared_block = shared_block;
}
2019-04-19 13:38:25 +00:00
}
/// Reads from `input` and passes on only rows [offset, offset + limit).
/// With `with_ties_`, rows after the limit that are equal (on the columns of `description_`)
/// to the last row inside the limit are passed on as well; see readImpl().
LimitBlockInputStream::LimitBlockInputStream(
    const BlockInputStreamPtr & input,
    UInt64 limit_,
    UInt64 offset_,
    bool always_read_till_end_,
    bool use_limit_as_total_rows_approx,
    bool with_ties_,
    const SortDescription & description_)
    : limit(limit_)
    , offset(offset_)
    , always_read_till_end(always_read_till_end_)
    , with_ties(with_ties_)
    , description(description_)
{
    /// Optionally report `limit` as the approximate total number of rows.
    if (use_limit_as_total_rows_approx)
        addTotalRowsApprox(static_cast<size_t>(limit));

    children.push_back(input);
}
2011-09-04 21:23:19 +00:00
/// Returns the next block containing rows in [offset, offset + limit) of the child stream.
/// With `with_ties`, the rows following the limit that are equal (on the columns of `description`)
/// to the last row inside the limit are returned too; they may span several following blocks.
/// An empty Block means the end of the stream.
Block LimitBlockInputStream::readImpl()
{
    Block res;
    UInt64 rows = 0;

    /// Moves `block` into a shared block that row references can keep alive. The ORDER BY columns are
    /// stored inside the shared block itself, so `&ptr->sort_columns` stays valid for as long as any
    /// SharedBlockRowRef (including the `ties_row_ref` member, which outlives this call) holds the block.
    auto make_shared_block = [this](Block && block)
    {
        SharedBlockPtr shared = new detail::SharedBlock(std::move(block));
        if (with_ties)
            shared->sort_columns = detail::getBlockColumns(*shared, description);
        return shared;
    };

    /// The limit has been reached and every row after the last row within the limit has so far been
    /// equal to `ties_row_ref`, so the current block may still start with ties.
    if (with_ties && ties_row_ref.shared_block && pos >= offset + limit)
    {
        res = children.back()->read();
        if (!res)
        {
            /// End of stream: nothing is left to compare with.
            ties_row_ref.reset();
            return res;
        }

        rows = res.rows();
        pos += rows;
        SharedBlockPtr ptr = make_shared_block(std::move(res));

        /// Count the leading rows equal to `ties_row_ref`.
        UInt64 len = 0;
        for (; len < rows; ++len)
        {
            SharedBlockRowRef current_row;
            detail::setSharedBlockRowRef(current_row, ptr, &ptr->sort_columns, len);
            if (current_row != ties_row_ref)
            {
                /// Ties end in this block; later blocks are not part of the result.
                ties_row_ref.reset();
                break;
            }
        }

        /// Drop the first non-equal row and everything after it.
        if (len < rows)
        {
            for (size_t i = 0; i < ptr->columns(); ++i)
                ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(0, len);
        }

        return *ptr;
    }

    /// pos - how many rows were read, including the last read block
    if (pos >= offset + limit)
    {
        if (always_read_till_end)
        {
            while (children.back()->read())
                ;
        }
        return res;
    }

    do
    {
        res = children.back()->read();
        if (!res)
            return res;
        rows = res.rows();
        pos += rows;
    } while (pos <= offset);

    /// give away the whole block
    if (pos >= offset + rows && pos <= offset + limit)
    {
        /// The block ends exactly at the limit, so the next block may start with ties of its last row.
        if (with_ties && rows > 0 && pos == offset + limit)
        {
            SharedBlockPtr ptr = make_shared_block(std::move(res));
            detail::setSharedBlockRowRef(ties_row_ref, ptr, &ptr->sort_columns, rows - 1);
            return *ptr;
        }
        return res;
    }

    /// give away a piece of the block
    UInt64 start = std::max(
        static_cast<Int64>(0),
        static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows));

    UInt64 length = std::min(
        static_cast<Int64>(limit), std::min(
        static_cast<Int64>(pos) - static_cast<Int64>(offset),
        static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows)));

    SharedBlockPtr ptr = make_shared_block(std::move(res));

    /// The limit ends inside this block (and at least one row is inside the limit): extend the piece
    /// with the following rows equal to the last row within the limit. If the block runs out before a
    /// differing row is found, `ties_row_ref` stays set and the next call continues in the next block.
    /// If the piece was cut only by `offset`, the limit is not reached yet and there are no ties.
    if (with_ties && length > 0 && pos >= offset + limit)
    {
        detail::setSharedBlockRowRef(ties_row_ref, ptr, &ptr->sort_columns, start + length - 1);

        for (size_t i = ties_row_ref.row_num + 1; i < rows; ++i)
        {
            SharedBlockRowRef current_row;
            detail::setSharedBlockRowRef(current_row, ptr, &ptr->sort_columns, i);
            if (current_row == ties_row_ref)
                ++length;
            else
            {
                ties_row_ref.reset();
                break;
            }
        }
    }

    for (size_t i = 0; i < ptr->columns(); ++i)
        ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(start, length);

    // TODO: we should provide feedback to child-block, so it will know how many rows are actually consumed.
    //       It's crucial for streaming engines like Kafka.

    return *ptr;
}
2019-04-19 13:38:25 +00:00
2010-03-12 18:25:35 +00:00
}