Make building column arrays for chunk processing straightforward

This commit is contained in:
Igor Nikonov 2022-07-27 07:44:42 +00:00
parent 24f3a6905f
commit 589104fa6e
2 changed files with 57 additions and 68 deletions

View File

@ -46,58 +46,73 @@ DistinctSortedTransform::DistinctSortedTransform(
/// Filters `chunk` in place, keeping only the rows that buildFilter() marks in the filter.
/// - A chunk without rows is left untouched.
/// - The chunk is cleared when buildFilter() reports no new data, or when the set size limit check fails
///   (in the latter case reading is stopped as well).
/// The unfiltered chunk and its clearing-hint column pointers are stored in `prev_chunk`.
void DistinctSortedTransform::transform(Chunk & chunk)
{
    if (!chunk.hasRows())
        return;

    /// Get DISTINCT columns from the chunk.
    ColumnRawPtrs column_ptrs;
    column_ptrs.reserve(column_positions.size());
    for (const auto pos : column_positions)
    {
        const auto & column = chunk.getColumns()[pos];
        column_ptrs.emplace_back(column.get());
    }

    /// Get DISTINCT columns from the chunk which form the sort prefix of the sort description.
    ColumnRawPtrs clearing_hint_columns;
    clearing_hint_columns.reserve(sort_prefix_positions.size());
    for (const auto pos : sort_prefix_positions)
    {
        const auto & column = chunk.getColumns()[pos];
        clearing_hint_columns.emplace_back(column.get());
    }

    /// The set variant is chosen lazily, on the first chunk that has rows.
    if (data.type == ClearableSetVariants::Type::EMPTY)
        data.init(ClearableSetVariants::chooseMethod(column_ptrs, key_sizes));

    const size_t rows = chunk.getNumRows();
    IColumn::Filter filter(rows);

    /// Dispatch to the buildFilter() instantiation that matches the active set variant.
    bool has_new_data = false;
    switch (data.type)
    {
        case ClearableSetVariants::Type::EMPTY:
            break;
#define M(NAME) \
        case ClearableSetVariants::Type::NAME: \
            has_new_data = buildFilter(*data.NAME, column_ptrs, clearing_hint_columns, filter, rows, data); \
            break;
        APPLY_FOR_SET_VARIANTS(M)
#undef M
    }

    /// Just go to the next block if there isn't any new record in the current one.
    if (!has_new_data)
    {
        chunk.clear();
        return;
    }

    if (!set_size_limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "DISTINCT", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
    {
        stopReading();
        chunk.clear();
        return;
    }

    /// Stop reading if we already reached the limit. The current chunk is still emitted.
    if (limit_hint && data.getTotalRowCount() >= limit_hint)
        stopReading();

    /// Keep the unfiltered chunk together with the pointers taken from its columns.
    /// NOTE(review): presumably this keeps the columns behind clearing_hint_columns alive
    /// for the next call - confirm against buildFilter().
    prev_chunk.chunk = std::move(chunk);
    prev_chunk.clearing_hint_columns = std::move(clearing_hint_columns);

    /// Build the result from the stored chunk by applying the filter to every column.
    /// NOTE(review): -1 is presumably "no result size hint" - confirm against IColumn::filter.
    size_t all_columns = prev_chunk.chunk.getNumColumns();
    Chunk res_chunk;
    for (size_t i = 0; i < all_columns; ++i)
        res_chunk.addColumn(prev_chunk.chunk.getColumns().at(i)->filter(filter, -1));

    chunk = std::move(res_chunk);
}
@ -142,28 +157,6 @@ bool DistinctSortedTransform::buildFilter(
return has_new_data; return has_new_data;
} }
/// Returns raw pointers to the columns of `chunk` found at `column_positions`,
/// in the order in which the positions are stored.
/// The pointers come from `get()` on the chunk's column handles, so they are non-owning.
ColumnRawPtrs DistinctSortedTransform::getKeyColumns(const Chunk & chunk) const
{
    const auto & chunk_columns = chunk.getColumns();

    ColumnRawPtrs key_columns;
    key_columns.reserve(column_positions.size());

    for (const auto position : column_positions)
        key_columns.emplace_back(chunk_columns[position].get());

    return key_columns;
}
/// Picks from `key_columns` the entries addressed by `sort_prefix_positions`
/// and returns them in that order.
/// NOTE(review): every value in `sort_prefix_positions` is used as an index into `key_columns`;
/// this is only valid if those positions are relative to the key columns and not to the
/// whole chunk - confirm where `sort_prefix_positions` is filled.
ColumnRawPtrs DistinctSortedTransform::getClearingColumns(const ColumnRawPtrs & key_columns) const
{
    ColumnRawPtrs prefix_columns;
    prefix_columns.reserve(sort_prefix_positions.size());

    for (const auto position : sort_prefix_positions)
    {
        const auto * prefix_column = key_columns[position];
        prefix_columns.emplace_back(prefix_column);
    }

    return prefix_columns;
}
bool DistinctSortedTransform::rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m) bool DistinctSortedTransform::rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m)
{ {
for (size_t column_index = 0, num_columns = lhs.size(); column_index < num_columns; ++column_index) for (size_t column_index = 0, num_columns = lhs.size(); column_index < num_columns; ++column_index)

View File

@ -32,10 +32,6 @@ protected:
void transform(Chunk & chunk) override; void transform(Chunk & chunk) override;
private: private:
ColumnRawPtrs getKeyColumns(const Chunk & chunk) const;
/// When the clearing columns change, the HashSet can be cleared to save memory.
/// The clearing columns are the left prefix of the SortDescription that is present in key_columns.
ColumnRawPtrs getClearingColumns(const ColumnRawPtrs & key_columns) const;
static bool rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m); static bool rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m);
/// return true if has new data /// return true if has new data