ColumnSparse: sorting and dummy aggregation

This commit is contained in:
Anton Popov 2021-04-06 03:02:48 +03:00
parent d46958a8d2
commit d7ca6ecc50
4 changed files with 88 additions and 35 deletions

View File

@ -1,5 +1,6 @@
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnCompressed.h>
#include <Common/WeakHash.h>
#include <Common/SipHash.h>
@ -337,14 +338,25 @@ void ColumnSparse::compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const
{
UNUSED(rhs);
UNUSED(rhs_row_num);
UNUSED(row_indexes);
UNUSED(compare_results);
UNUSED(direction);
UNUSED(nan_direction_hint);
if (row_indexes)
{
/// TODO: implement without conversion to full column.
auto this_full = convertToFullColumnIfSparse();
auto rhs_full = rhs.convertToFullColumnIfSparse();
this_full->compareColumn(*rhs_full, rhs_row_num, row_indexes, compare_results, direction, nan_direction_hint);
}
else
{
const auto & rhs_sparse = assert_cast<const ColumnSparse &>(rhs);
PaddedPODArray<Int8> nested_result;
values->compareColumn(rhs_sparse.getValuesColumn(), rhs_sparse.getValueIndex(rhs_row_num),
nullptr, nested_result, direction, nan_direction_hint);
throwMustBeDense();
const auto & offsets_data = getOffsetsData();
compare_results.resize(_size, nested_result[0]);
for (size_t i = 0; i < offsets_data.size(); ++i)
compare_results[offsets_data[i]] = nested_result[i + 1];
}
}
int ColumnSparse::compareAtWithCollation(size_t n, size_t m, const IColumn & rhs, int null_direction_hint, const Collator & collator) const
@ -360,7 +372,7 @@ bool ColumnSparse::hasEqualValues() const
return offsets->size() == 0;
}
void ColumnSparse::getPermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const
void ColumnSparse::getPermutationImpl(bool reverse, size_t limit, int null_direction_hint, Permutation & res, const Collator * collator) const
{
if (_size == 0)
return;
@ -373,7 +385,10 @@ void ColumnSparse::getPermutation(bool reverse, size_t limit, int null_direction
return;
Permutation perm;
values->getPermutation(reverse, limit, null_direction_hint, perm);
if (collator)
values->getPermutationWithCollation(*collator, reverse, limit, null_direction_hint, perm);
else
values->getPermutation(reverse, limit, null_direction_hint, perm);
if (limit == 0 || limit > _size)
limit = _size;
@ -412,39 +427,27 @@ void ColumnSparse::getPermutation(bool reverse, size_t limit, int null_direction
assert(row == limit);
}
void ColumnSparse::getPermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const
{
return getPermutationImpl(reverse, limit, null_direction_hint, res, nullptr);
}
void ColumnSparse::updatePermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res, EqualRanges & equal_range) const
{
UNUSED(reverse);
UNUSED(null_direction_hint);
UNUSED(limit);
UNUSED(res);
UNUSED(equal_range);
throwMustBeDense();
auto this_full = convertToFullColumnIfSparse();
this_full->updatePermutation(reverse, limit, null_direction_hint, res, equal_range);
}
void ColumnSparse::getPermutationWithCollation(const Collator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res) const
{
UNUSED(collator);
UNUSED(reverse);
UNUSED(limit);
UNUSED(null_direction_hint);
UNUSED(res);
throwMustBeDense();
return getPermutationImpl(reverse, limit, null_direction_hint, res, &collator);
}
void ColumnSparse::updatePermutationWithCollation(
const Collator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res, EqualRanges& equal_range) const
{
UNUSED(collator);
UNUSED(reverse);
UNUSED(limit);
UNUSED(null_direction_hint);
UNUSED(res);
UNUSED(equal_range);
throwMustBeDense();
auto this_full = convertToFullColumnIfSparse();
this_full->updatePermutationWithCollation(collator, reverse, limit, null_direction_hint, res, equal_range);
}
void ColumnSparse::reserve(size_t)
@ -479,8 +482,37 @@ void ColumnSparse::protect()
ColumnPtr ColumnSparse::replicate(const Offsets & replicate_offsets) const
{
UNUSED(replicate_offsets);
throwMustBeDense();
/// TODO: implement specializations.
if (_size != replicate_offsets.size())
throw Exception("Size of offsets doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
if (_size == 0)
return ColumnSparse::create(values->cloneEmpty());
const auto & offsets_data = getOffsetsData();
auto res_offsets = offsets->cloneEmpty();
auto & res_offsets_data = assert_cast<ColumnUInt64 &>(*res_offsets).getData();
auto res_values = values->cloneEmpty();
res_values->insertDefault();
size_t current_offset = 0;
for (size_t i = 0; i < _size; ++i)
{
if (current_offset < offsets_data.size() && i == offsets_data[current_offset])
{
size_t replicate_size = replicate_offsets[i] - replicate_offsets[i - 1];
res_offsets_data.reserve(res_offsets_data.size() + replicate_size);
for (size_t row = replicate_offsets[i - 1]; row < replicate_offsets[i]; ++row)
{
res_offsets_data.push_back(row);
res_values->insertFrom(*values, current_offset + 1);
}
++current_offset;
}
}
return ColumnSparse::create(std::move(res_values), std::move(res_offsets), replicate_offsets.back());
}
void ColumnSparse::updateHashWithValue(size_t n, SipHash & hash) const
@ -535,7 +567,16 @@ void ColumnSparse::gather(ColumnGathererStream & gatherer_stream)
ColumnPtr ColumnSparse::compress() const
{
throwMustBeDense();
auto values_compressed = values->compress();
auto offsets_compressed = offsets->compress();
size_t byte_size = values_compressed->byteSize() + offsets_compressed->byteSize();
return ColumnCompressed::create(size(), byte_size,
[values_compressed = std::move(values_compressed), offsets_compressed = std::move(offsets_compressed), size = size()]
{
return ColumnSparse::create(values_compressed->decompress(), offsets_compressed->decompress(), size);
});
}
bool ColumnSparse::structureEquals(const IColumn & rhs) const

View File

@ -90,13 +90,18 @@ public:
void compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const override;
int compareAtWithCollation(size_t n, size_t m, const IColumn & rhs, int null_direction_hint, const Collator & collator) const override;
bool hasEqualValues() const override;
void getPermutationImpl(bool reverse, size_t limit, int null_direction_hint, Permutation & res, const Collator * collator) const;
void getPermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const override;
void updatePermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res, EqualRanges & equal_range) const override;
void getPermutationWithCollation(const Collator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res) const override;
void updatePermutationWithCollation(
const Collator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res, EqualRanges& equal_range) const override;
void reserve(size_t n) override;
size_t byteSize() const override;
size_t byteSizeAt(size_t n) const override;

View File

@ -707,7 +707,7 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
/// Remember the columns we will work with
for (size_t i = 0; i < params.keys_size; ++i)
{
materialized_columns.push_back(columns.at(params.keys[i])->convertToFullColumnIfConst());
materialized_columns.push_back(columns.at(params.keys[i])->convertToFullColumnIfSparse()->convertToFullColumnIfConst());
key_columns[i] = materialized_columns.back().get();
if (!result.isLowCardinality())

View File

@ -37,6 +37,13 @@ MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t
if (chunk.getNumRows() == 0)
continue;
size_t num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
for (auto & column : columns)
column = column->convertToFullColumnIfSparse();
chunk.setColumns(std::move(columns), num_rows);
cursors.emplace_back(chunk.getColumns(), description);
has_collation |= cursors.back().has_collation;