ClickHouse/src/Processors/Transforms/DistinctTransform.cpp

115 lines
3.1 KiB
C++
Raw Normal View History

2019-03-25 16:58:59 +00:00
#include <Processors/Transforms/DistinctTransform.h>
2019-03-25 16:37:27 +00:00
2019-03-25 16:58:59 +00:00
namespace DB
{
/// Error codes referenced in this file. They are only declared here (`extern`);
/// the definitions live elsewhere in the project.
namespace ErrorCodes
{
/// Passed to the set size limit check performed for DISTINCT in DistinctTransform::transform.
extern const int SET_SIZE_LIMIT_EXCEEDED;
}
/// Builds a DISTINCT transform over chunks described by `header_`.
///
/// @param header_           Block describing the columns of the processed chunks; it is passed
///                          twice to ISimpleTransform (presumably as input and output header).
/// @param set_size_limits_  Limits checked against the set of already seen keys.
/// @param limit_hint_       If non-zero, reading is stopped once the set holds at least this many rows.
/// @param columns_          Names of the columns forming the DISTINCT key; when empty,
///                          every column of `header_` is considered.
///
/// The constructor remembers the positions of the key columns in `key_columns_pos`.
/// A column is skipped only when the header holds a non-null column that is constant.
DistinctTransform::DistinctTransform(
    const Block & header_,
    const SizeLimits & set_size_limits_,
    UInt64 limit_hint_,
    const Names & columns_)
    : ISimpleTransform(header_, header_, true)
    , limit_hint(limit_hint_)
    , set_size_limits(set_size_limits_)
{
    const size_t num_columns = columns_.empty() ? header_.columns() : columns_.size();

    /// Reserve for the number of columns actually visited below: when `columns_` is empty
    /// the loop still walks every header column, so `columns_.size()` would reserve nothing.
    key_columns_pos.reserve(num_columns);

    for (size_t i = 0; i < num_columns; ++i)
    {
        /// With an explicit column list, translate the name to its position in the header;
        /// otherwise the loop index is already the position.
        auto pos = columns_.empty() ? i
                                    : header_.getPositionByName(columns_[i]);

        const auto & col = header_.getByPosition(pos).column;

        /// Constant columns are not stored as key columns.
        if (!(col && isColumnConst(*col)))
            key_columns_pos.emplace_back(pos);
    }
}
template <typename Method>
void DistinctTransform::buildFilter(
Method & method,
const ColumnRawPtrs & columns,
IColumn::Filter & filter,
size_t rows,
SetVariants & variants) const
{
typename Method::State state(columns, key_sizes, nullptr);
for (size_t i = 0; i < rows; ++i)
{
auto emplace_result = state.emplaceKey(method.data, i, variants.string_pool);
/// Emit the record if there is no such key in the current set yet.
/// Skip it otherwise.
filter[i] = emplace_result.isInserted();
}
}
/// Leaves in `chunk` only the rows whose key has not been seen in previous rows or chunks.
///
/// The columns are detached from `chunk` first; on every early return below, except the
/// constants-only case, the chunk is left without columns.
void DistinctTransform::transform(Chunk & chunk)
{
    const auto rows_in_chunk = chunk.getNumRows();
    auto chunk_columns = chunk.detachColumns();

    /// The single constant row was already emitted, or the set already holds
    /// at least `limit_hint` rows: ask to stop reading.
    const auto limit_reached = [this] { return limit_hint && data.getTotalRowCount() >= limit_hint; };
    if (no_more_rows || limit_reached())
    {
        stopReading();
        return;
    }

    /// Collect raw pointers to the key columns of this chunk.
    ColumnRawPtrs key_columns;
    key_columns.reserve(key_columns_pos.size());
    for (const auto key_pos : key_columns_pos)
        key_columns.emplace_back(chunk_columns[key_pos].get());

    if (key_columns.empty())
    {
        /// There are no key columns (only constants), so the result is a single row.
        no_more_rows = true;

        for (auto & col : chunk_columns)
            col = col->cut(0, 1);

        chunk.setColumns(std::move(chunk_columns), 1);
        return;
    }

    /// Choose the set implementation lazily, from the first chunk's key columns.
    if (data.empty())
        data.init(SetVariants::chooseMethod(key_columns, key_sizes));

    const auto rows_before = data.getTotalRowCount();

    IColumn::Filter row_filter(rows_in_chunk);

    /// Dispatch to buildFilter for whichever set variant is active.
    switch (data.type)
    {
        case SetVariants::Type::EMPTY:
            break;
#define M(NAME) \
        case SetVariants::Type::NAME: \
            buildFilter(*data.NAME, key_columns, row_filter, rows_in_chunk, data); \
            break;
        APPLY_FOR_SET_VARIANTS(M)
#undef M
    }

    /// The set is not modified past this point, so its row count can be read once.
    const auto rows_after = data.getTotalRowCount();

    /// No new keys in this chunk: nothing to output.
    if (rows_after == rows_before)
        return;

    /// Output nothing when the size limit check does not pass.
    if (!set_size_limits.check(rows_after, data.getTotalByteCount(), "DISTINCT", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
        return;

    for (auto & col : chunk_columns)
        col = col->filter(row_filter, -1);

    /// The number of surviving rows equals the number of keys added by this chunk.
    chunk.setColumns(std::move(chunk_columns), rows_after - rows_before);
}
}