Improve the performance of serialized aggregation method when involving multiple [nullable] columns.

This commit is contained in:
Amos Bird 2024-02-26 02:29:00 +08:00
parent 8141e1c3d1
commit 3b04f5c605
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
43 changed files with 905 additions and 968 deletions

View File

@ -1,10 +1,7 @@
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/MaskOperations.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromArena.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Common/AlignedBuffer.h>
#include <Common/Arena.h>
#include <Common/FieldVisitorToString.h>
@ -14,6 +11,10 @@
#include <Common/assert_cast.h>
#include <Common/iota.h>
#include <Common/typeid_cast.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromArena.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
namespace DB
@ -542,7 +543,7 @@ void ColumnAggregateFunction::insertDefault()
pushBackAndCreateState(data, arena, func.get());
}
StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin, const UInt8 *) const
StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin) const
{
WriteBufferFromArena out(arena, begin);
func->serialize(data[n], out, version);
@ -650,11 +651,6 @@ void ColumnAggregateFunction::getPermutation(PermutationSortDirection /*directio
void ColumnAggregateFunction::updatePermutation(PermutationSortDirection, PermutationSortStability,
size_t, int, Permutation &, EqualRanges&) const {}
void ColumnAggregateFunction::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
void ColumnAggregateFunction::getExtremes(Field & min, Field & max) const
{
/// Place serialized default values into min/max.
@ -690,7 +686,7 @@ ColumnAggregateFunction::MutablePtr ColumnAggregateFunction::createView() const
}
ColumnAggregateFunction::ColumnAggregateFunction(const ColumnAggregateFunction & src_)
: COWHelper<IColumn, ColumnAggregateFunction>(src_),
: COWHelper<IColumnHelper<ColumnAggregateFunction>, ColumnAggregateFunction>(src_),
foreign_arenas(concatArenas(src_.foreign_arenas, src_.my_arena)),
func(src_.func), src(src_.getPtr()), data(src_.data.begin(), src_.data.end())
{

View File

@ -51,13 +51,13 @@ using ConstArenas = std::vector<ConstArenaPtr>;
* specifying which individual values should be destroyed and which ones should not.
* Clearly, this method would have a substantially non-zero price.
*/
class ColumnAggregateFunction final : public COWHelper<IColumn, ColumnAggregateFunction>
class ColumnAggregateFunction final : public COWHelper<IColumnHelper<ColumnAggregateFunction>, ColumnAggregateFunction>
{
public:
using Container = PaddedPODArray<AggregateDataPtr>;
private:
friend class COWHelper<IColumn, ColumnAggregateFunction>;
friend class COWHelper<IColumnHelper<ColumnAggregateFunction>, ColumnAggregateFunction>;
/// Arenas used by function states that are created elsewhere. We own these
/// arenas in the sense of extending their lifetime, but do not modify them.
@ -164,7 +164,7 @@ public:
void insertDefault() override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
const char * deserializeAndInsertFromArena(const char * src_arena) override;
@ -203,8 +203,6 @@ public:
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer_stream) override;
int compareAt(size_t, size_t, const IColumn &, int) const override
{
return 0;

View File

@ -8,7 +8,6 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/MaskOperations.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Common/Exception.h>
#include <Common/Arena.h>
#include <Common/SipHash.h>
@ -205,7 +204,7 @@ void ColumnArray::insertData(const char * pos, size_t length)
}
StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
size_t array_size = sizeAt(n);
size_t offset = offsetAt(n);
@ -226,6 +225,18 @@ StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char con
}
void ColumnArray::serializeValueIntoMemory(size_t n, char *& memory) const
{
size_t array_size = sizeAt(n);
size_t offset = offsetAt(n);
memcpy(memory, &array_size, sizeof(array_size));
memory += sizeof(array_size);
for (size_t i = 0; i < array_size; ++i)
getData().serializeValueIntoMemory(offset + i, memory);
}
const char * ColumnArray::deserializeAndInsertFromArena(const char * pos)
{
size_t array_size = unalignedLoad<size_t>(pos);
@ -390,19 +401,6 @@ int ColumnArray::compareAtWithCollation(size_t n, size_t m, const IColumn & rhs_
return compareAtImpl(n, m, rhs_, nan_direction_hint, &collator);
}
void ColumnArray::compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const
{
return doCompareColumn<ColumnArray>(assert_cast<const ColumnArray &>(rhs), rhs_row_num, row_indexes,
compare_results, direction, nan_direction_hint);
}
bool ColumnArray::hasEqualValues() const
{
return hasEqualValuesImpl<ColumnArray>();
}
struct ColumnArray::ComparatorBase
{
const ColumnArray & parent;
@ -988,22 +986,6 @@ ColumnPtr ColumnArray::compress() const
});
}
double ColumnArray::getRatioOfDefaultRows(double sample_ratio) const
{
return getRatioOfDefaultRowsImpl<ColumnArray>(sample_ratio);
}
UInt64 ColumnArray::getNumberOfDefaultRows() const
{
return getNumberOfDefaultRowsImpl<ColumnArray>();
}
void ColumnArray::getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const
{
return getIndicesOfNonDefaultRowsImpl<ColumnArray>(indices, from, limit);
}
ColumnPtr ColumnArray::replicate(const Offsets & replicate_offsets) const
{
if (replicate_offsets.empty())
@ -1298,11 +1280,6 @@ ColumnPtr ColumnArray::replicateTuple(const Offsets & replicate_offsets) const
assert_cast<const ColumnArray &>(*temporary_arrays.front()).getOffsetsPtr());
}
void ColumnArray::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
size_t ColumnArray::getNumberOfDimensions() const
{
const auto * nested_array = checkAndGetColumn<ColumnArray>(*data);

View File

@ -15,10 +15,10 @@ namespace DB
* In memory, it is represented as one column of a nested type, whose size is equal to the sum of the sizes of all arrays,
* and as an array of offsets in it, which allows you to get each element.
*/
class ColumnArray final : public COWHelper<IColumn, ColumnArray>
class ColumnArray final : public COWHelper<IColumnHelper<ColumnArray>, ColumnArray>
{
private:
friend class COWHelper<IColumn, ColumnArray>;
friend class COWHelper<IColumnHelper<ColumnArray>, ColumnArray>;
/** Create an array column with specified values and offsets. */
ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && offsets_column);
@ -48,7 +48,7 @@ public:
/** Create immutable column using immutable arguments. This arguments may be shared with other columns.
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
*/
using Base = COWHelper<IColumn, ColumnArray>;
using Base = COWHelper<IColumnHelper<ColumnArray>, ColumnArray>;
static Ptr create(const ColumnPtr & nested_column, const ColumnPtr & offsets_column)
{
@ -77,7 +77,8 @@ public:
StringRef getDataAt(size_t n) const override;
bool isDefaultAt(size_t n) const override;
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
void serializeValueIntoMemory(size_t, char *& memory) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
@ -95,11 +96,7 @@ public:
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
template <typename Type> ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;
int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override;
void compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const override;
int compareAtWithCollation(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint, const Collator & collator) const override;
bool hasEqualValues() const override;
void getPermutation(PermutationSortDirection direction, PermutationSortStability stability,
size_t limit, int nan_direction_hint, Permutation & res) const override;
void updatePermutation(PermutationSortDirection direction, PermutationSortStability stability,
@ -148,13 +145,6 @@ public:
/// For example, `getDataInRange(0, size())` is the same as `getDataPtr()->clone()`.
MutableColumnPtr getDataInRange(size_t start, size_t length) const;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
return scatterImpl<ColumnArray>(num_columns, selector);
}
void gather(ColumnGathererStream & gatherer_stream) override;
ColumnPtr compress() const override;
void forEachSubcolumn(MutableColumnCallback callback) override
@ -178,11 +168,6 @@ public:
return false;
}
double getRatioOfDefaultRows(double sample_ratio) const override;
UInt64 getNumberOfDefaultRows() const override;
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;
void finalize() override { data->finalize(); }
bool isFinalized() const override { return data->isFinalized(); }

View File

@ -30,7 +30,7 @@ namespace ErrorCodes
*
* Also in-memory compression allows to keep more data in RAM.
*/
class ColumnCompressed : public COWHelper<IColumn, ColumnCompressed>
class ColumnCompressed : public COWHelper<IColumnHelper<ColumnCompressed>, ColumnCompressed>
{
public:
using Lazy = std::function<ColumnPtr()>;
@ -89,7 +89,8 @@ public:
void insertData(const char *, size_t) override { throwMustBeDecompressed(); }
void insertDefault() override { throwMustBeDecompressed(); }
void popBack(size_t) override { throwMustBeDecompressed(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override { throwMustBeDecompressed(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwMustBeDecompressed(); }
void serializeValueIntoMemory(size_t, char *&) const override { throwMustBeDecompressed(); }
const char * deserializeAndInsertFromArena(const char *) override { throwMustBeDecompressed(); }
const char * skipSerializedInArena(const char *) const override { throwMustBeDecompressed(); }
void updateHashWithValue(size_t, SipHash &) const override { throwMustBeDecompressed(); }

View File

@ -20,10 +20,10 @@ namespace ErrorCodes
/** ColumnConst contains another column with single element,
* but looks like a column with arbitrary amount of same elements.
*/
class ColumnConst final : public COWHelper<IColumn, ColumnConst>
class ColumnConst final : public COWHelper<IColumnHelper<ColumnConst>, ColumnConst>
{
private:
friend class COWHelper<IColumn, ColumnConst>;
friend class COWHelper<IColumnHelper<ColumnConst>, ColumnConst>;
WrappedPtr data;
size_t s;
@ -160,11 +160,16 @@ public:
s -= n;
}
StringRef serializeValueIntoArena(size_t, Arena & arena, char const *& begin, const UInt8 *) const override
StringRef serializeValueIntoArena(size_t, Arena & arena, char const *& begin) const override
{
return data->serializeValueIntoArena(0, arena, begin);
}
void serializeValueIntoMemory(size_t, char *& memory) const override
{
return data->serializeValueIntoMemory(0, memory);
}
const char * deserializeAndInsertFromArena(const char * pos) override
{
const auto * res = data->deserializeAndInsertFromArena(pos);

View File

@ -42,46 +42,6 @@ int ColumnDecimal<T>::compareAt(size_t n, size_t m, const IColumn & rhs_, int) c
return decimalLess<T>(b, a, other.scale, scale) ? 1 : (decimalLess<T>(a, b, scale, other.scale) ? -1 : 0);
}
template <is_decimal T>
void ColumnDecimal<T>::compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const
{
return this->template doCompareColumn<ColumnDecimal<T>>(static_cast<const Self &>(rhs), rhs_row_num, row_indexes,
compare_results, direction, nan_direction_hint);
}
template <is_decimal T>
bool ColumnDecimal<T>::hasEqualValues() const
{
return this->template hasEqualValuesImpl<ColumnDecimal<T>>();
}
template <is_decimal T>
StringRef ColumnDecimal<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
memcpy(pos, &data[n], sizeof(T));
return res;
}
template <is_decimal T>
const char * ColumnDecimal<T>::deserializeAndInsertFromArena(const char * pos)
{
@ -470,12 +430,6 @@ ColumnPtr ColumnDecimal<T>::replicate(const IColumn::Offsets & offsets) const
return res;
}
template <is_decimal T>
void ColumnDecimal<T>::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
template <is_decimal T>
ColumnPtr ColumnDecimal<T>::compress() const
{

View File

@ -1,14 +1,12 @@
#pragma once
#include <cmath>
#include <base/sort.h>
#include <base/TypeName.h>
#include <Core/Field.h>
#include <Core/DecimalFunctions.h>
#include <Core/TypeId.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnVectorHelper.h>
#include <Columns/ColumnFixedSizeHelper.h>
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
@ -18,11 +16,11 @@ namespace DB
/// A ColumnVector for Decimals
template <is_decimal T>
class ColumnDecimal final : public COWHelper<ColumnVectorHelper, ColumnDecimal<T>>
class ColumnDecimal final : public COWHelper<IColumnHelper<ColumnDecimal<T>, ColumnFixedSizeHelper>, ColumnDecimal<T>>
{
private:
using Self = ColumnDecimal;
friend class COWHelper<ColumnVectorHelper, Self>;
friend class COWHelper<IColumnHelper<Self, ColumnFixedSizeHelper>, Self>;
public:
using ValueType = T;
@ -82,17 +80,12 @@ public:
Float64 getFloat64(size_t n) const final { return DecimalUtils::convertTo<Float64>(data[n], scale); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
void updateWeakHash32(WeakHash32 & hash) const override;
void updateHashFast(SipHash & hash) const override;
int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override;
void compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const override;
bool hasEqualValues() const override;
void getPermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int nan_direction_hint, IColumn::Permutation & res) const override;
void updatePermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
@ -119,13 +112,6 @@ public:
ColumnPtr replicate(const IColumn::Offsets & offsets) const override;
void getExtremes(Field & min, Field & max) const override;
MutableColumns scatter(IColumn::ColumnIndex num_columns, const IColumn::Selector & selector) const override
{
return this->template scatterImpl<Self>(num_columns, selector);
}
void gather(ColumnGathererStream & gatherer_stream) override;
bool structureEquals(const IColumn & rhs) const override
{
if (auto rhs_concrete = typeid_cast<const ColumnDecimal<T> *>(&rhs))
@ -133,21 +119,6 @@ public:
return false;
}
double getRatioOfDefaultRows(double sample_ratio) const override
{
return this->template getRatioOfDefaultRowsImpl<Self>(sample_ratio);
}
UInt64 getNumberOfDefaultRows() const override
{
return this->template getNumberOfDefaultRowsImpl<Self>();
}
void getIndicesOfNonDefaultRows(IColumn::Offsets & indices, size_t from, size_t limit) const override
{
return this->template getIndicesOfNonDefaultRowsImpl<Self>(indices, from, limit);
}
ColumnPtr compress() const override;
void insertValue(const T value) { data.push_back(value); }

View File

@ -7,7 +7,7 @@
namespace DB
{
/** Allows to access internal array of ColumnVector or ColumnFixedString without cast to concrete type.
/** Allows to access internal array of fixed-size column without cast to concrete type.
* We will inherit ColumnVector and ColumnFixedString from this class instead of IColumn.
* Assumes data layout of ColumnVector, ColumnFixedString and PODArray.
*
@ -22,7 +22,7 @@ namespace DB
* To allow functional tests to work under UBSan we have to separate some base class that will present the memory layout in explicit way,
* and we will do static_cast to this class.
*/
class ColumnVectorHelper : public IColumn
class ColumnFixedSizeHelper : public IColumn
{
public:
template <size_t ELEMENT_SIZE>

View File

@ -2,7 +2,6 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnCompressed.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteHelpers.h>
#include <Common/Arena.h>
#include <Common/HashTable/Hash.h>
@ -97,30 +96,6 @@ void ColumnFixedString::insertData(const char * pos, size_t length)
memset(chars.data() + old_size + length, 0, n - length);
}
StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + n;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = n;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
memcpy(pos, &chars[n * index], n);
return res;
}
const char * ColumnFixedString::deserializeAndInsertFromArena(const char * pos)
{
size_t old_size = chars.size();
@ -375,11 +350,6 @@ ColumnPtr ColumnFixedString::replicate(const Offsets & offsets) const
return res;
}
void ColumnFixedString::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
void ColumnFixedString::getExtremes(Field & min, Field & max) const
{
min = String();

View File

@ -6,7 +6,7 @@
#include <Common/assert_cast.h>
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
#include <Columns/ColumnVectorHelper.h>
#include <Columns/ColumnFixedSizeHelper.h>
#include <Core/Field.h>
@ -16,10 +16,10 @@ namespace DB
/** A column of values of "fixed-length string" type.
* If you insert a smaller string, it will be padded with zero bytes.
*/
class ColumnFixedString final : public COWHelper<ColumnVectorHelper, ColumnFixedString>
class ColumnFixedString final : public COWHelper<IColumnHelper<ColumnFixedString, ColumnFixedSizeHelper>, ColumnFixedString>
{
public:
friend class COWHelper<ColumnVectorHelper, ColumnFixedString>;
friend class COWHelper<IColumnHelper<ColumnFixedString, ColumnFixedSizeHelper>, ColumnFixedString>;
using Chars = PaddedPODArray<UInt8>;
@ -107,7 +107,7 @@ public:
chars.resize_fill(chars.size() + n);
}
virtual void insertManyDefaults(size_t length) override
void insertManyDefaults(size_t length) override
{
chars.resize_fill(chars.size() + n * length);
}
@ -117,8 +117,6 @@ public:
chars.resize_assume_reserved(chars.size() - n * elems);
}
StringRef serializeValueIntoArena(size_t index, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
@ -136,24 +134,6 @@ public:
return memcmpSmallAllowOverflow15(chars.data() + p1 * n, rhs.chars.data() + p2 * n, n);
}
void compareColumn(
const IColumn & rhs_,
size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes,
PaddedPODArray<Int8> & compare_results,
int direction,
int nan_direction_hint) const override
{
const ColumnFixedString & rhs = assert_cast<const ColumnFixedString &>(rhs_);
chassert(this->n == rhs.n);
return doCompareColumn<ColumnFixedString>(rhs, rhs_row_num, row_indexes, compare_results, direction, nan_direction_hint);
}
bool hasEqualValues() const override
{
return hasEqualValuesImpl<ColumnFixedString>();
}
void getPermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int nan_direction_hint, Permutation & res) const override;
@ -175,13 +155,6 @@ public:
ColumnPtr replicate(const Offsets & offsets) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
return scatterImpl<ColumnFixedString>(num_columns, selector);
}
void gather(ColumnGathererStream & gatherer_stream) override;
ColumnPtr compress() const override;
void reserve(size_t size) override
@ -208,21 +181,6 @@ public:
return false;
}
double getRatioOfDefaultRows(double sample_ratio) const override
{
return getRatioOfDefaultRowsImpl<ColumnFixedString>(sample_ratio);
}
UInt64 getNumberOfDefaultRows() const override
{
return getNumberOfDefaultRowsImpl<ColumnFixedString>();
}
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override
{
return getIndicesOfNonDefaultRowsImpl<ColumnFixedString>(indices, from, limit);
}
bool canBeInsideNullable() const override { return true; }
bool isFixedAndContiguous() const override { return true; }

View File

@ -19,10 +19,10 @@ using FunctionBasePtr = std::shared_ptr<const IFunctionBase>;
/** A column containing a lambda expression.
* Contains an expression and captured columns, but not input arguments.
*/
class ColumnFunction final : public COWHelper<IColumn, ColumnFunction>
class ColumnFunction final : public COWHelper<IColumnHelper<ColumnFunction>, ColumnFunction>
{
private:
friend class COWHelper<IColumn, ColumnFunction>;
friend class COWHelper<IColumnHelper<ColumnFunction>, ColumnFunction>;
ColumnFunction(
size_t size,
@ -102,7 +102,7 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot insert into {}", getName());
}
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot serialize from {}", getName());
}

View File

@ -2,7 +2,6 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <DataTypes/NumberTraits.h>
#include <Common/HashTable/HashMap.h>
#include <Common/WeakHash.h>
@ -137,7 +136,7 @@ ColumnLowCardinality::ColumnLowCardinality(MutableColumnPtr && column_unique_, M
void ColumnLowCardinality::insert(const Field & x)
{
compactIfSharedDictionary();
idx.insertPosition(dictionary.getColumnUnique().uniqueInsert(x));
idx.insertPosition(getDictionary().uniqueInsert(x));
}
bool ColumnLowCardinality::tryInsert(const Field & x)
@ -175,14 +174,14 @@ void ColumnLowCardinality::insertFrom(const IColumn & src, size_t n)
{
compactIfSharedDictionary();
const auto & nested = *low_cardinality_src->getDictionary().getNestedColumn();
idx.insertPosition(dictionary.getColumnUnique().uniqueInsertFrom(nested, position));
idx.insertPosition(getDictionary().uniqueInsertFrom(nested, position));
}
}
void ColumnLowCardinality::insertFromFullColumn(const IColumn & src, size_t n)
{
compactIfSharedDictionary();
idx.insertPosition(dictionary.getColumnUnique().uniqueInsertFrom(src, n));
idx.insertPosition(getDictionary().uniqueInsertFrom(src, n));
}
void ColumnLowCardinality::insertRangeFrom(const IColumn & src, size_t start, size_t length)
@ -209,7 +208,7 @@ void ColumnLowCardinality::insertRangeFrom(const IColumn & src, size_t start, si
auto src_nested = low_cardinality_src->getDictionary().getNestedColumn();
auto used_keys = src_nested->index(*idx_map, 0);
auto inserted_indexes = dictionary.getColumnUnique().uniqueInsertRangeFrom(*used_keys, 0, used_keys->size());
auto inserted_indexes = getDictionary().uniqueInsertRangeFrom(*used_keys, 0, used_keys->size());
idx.insertPositionsRange(*inserted_indexes->index(*sub_idx, 0), 0, length);
}
}
@ -217,7 +216,7 @@ void ColumnLowCardinality::insertRangeFrom(const IColumn & src, size_t start, si
void ColumnLowCardinality::insertRangeFromFullColumn(const IColumn & src, size_t start, size_t length)
{
compactIfSharedDictionary();
auto inserted_indexes = dictionary.getColumnUnique().uniqueInsertRangeFrom(src, start, length);
auto inserted_indexes = getDictionary().uniqueInsertRangeFrom(src, start, length);
idx.insertPositionsRange(*inserted_indexes, 0, length);
}
@ -257,27 +256,50 @@ void ColumnLowCardinality::insertRangeFromDictionaryEncodedColumn(const IColumn
{
checkPositionsAreLimited(positions, keys.size());
compactIfSharedDictionary();
auto inserted_indexes = dictionary.getColumnUnique().uniqueInsertRangeFrom(keys, 0, keys.size());
auto inserted_indexes = getDictionary().uniqueInsertRangeFrom(keys, 0, keys.size());
idx.insertPositionsRange(*inserted_indexes->index(positions, 0), 0, positions.size());
}
void ColumnLowCardinality::insertData(const char * pos, size_t length)
{
compactIfSharedDictionary();
idx.insertPosition(dictionary.getColumnUnique().uniqueInsertData(pos, length));
idx.insertPosition(getDictionary().uniqueInsertData(pos, length));
}
StringRef ColumnLowCardinality::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
StringRef ColumnLowCardinality::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
return getDictionary().serializeValueIntoArena(getIndexes().getUInt(n), arena, begin);
}
void ColumnLowCardinality::serializeValueIntoMemory(size_t n, char *& memory) const
{
return getDictionary().serializeValueIntoMemory(getIndexes().getUInt(n), memory);
}
void ColumnLowCardinality::collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const
{
/// nullable is handled internally.
chassert(is_null == nullptr);
if (empty())
return;
size_t rows = size();
if (sizes.empty())
sizes.resize_fill(rows);
else if (sizes.size() != rows)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of sizes: {} doesn't match rows_num: {}. It is a bug", sizes.size(), rows);
PaddedPODArray<UInt64> dict_sizes;
getDictionary().collectSerializedValueSizes(dict_sizes, nullptr);
idx.collectSerializedValueSizes(sizes, dict_sizes);
}
const char * ColumnLowCardinality::deserializeAndInsertFromArena(const char * pos)
{
compactIfSharedDictionary();
const char * new_pos;
idx.insertPosition(dictionary.getColumnUnique().uniqueDeserializeAndInsertFromArena(pos, new_pos));
idx.insertPosition(getDictionary().uniqueDeserializeAndInsertFromArena(pos, new_pos));
return new_pos;
}
@ -308,11 +330,6 @@ void ColumnLowCardinality::updateHashFast(SipHash & hash) const
getDictionary().getNestedColumn()->updateHashFast(hash);
}
void ColumnLowCardinality::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
MutableColumnPtr ColumnLowCardinality::cloneResized(size_t size) const
{
auto unique_ptr = dictionary.getColumnUniquePtr();
@ -354,15 +371,6 @@ int ColumnLowCardinality::compareAtWithCollation(size_t n, size_t m, const IColu
return compareAtImpl(n, m, rhs, nan_direction_hint, &collator);
}
void ColumnLowCardinality::compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const
{
return doCompareColumn<ColumnLowCardinality>(
assert_cast<const ColumnLowCardinality &>(rhs), rhs_row_num, row_indexes,
compare_results, direction, nan_direction_hint);
}
bool ColumnLowCardinality::hasEqualValues() const
{
if (getDictionary().size() <= 1)
@ -502,7 +510,7 @@ void ColumnLowCardinality::setSharedDictionary(const ColumnPtr & column_unique)
ColumnLowCardinality::MutablePtr ColumnLowCardinality::cutAndCompact(size_t start, size_t length) const
{
auto sub_positions = IColumn::mutate(idx.getPositions()->cut(start, length));
auto new_column_unique = Dictionary::compact(dictionary.getColumnUnique(), sub_positions);
auto new_column_unique = Dictionary::compact(getDictionary(), sub_positions);
return ColumnLowCardinality::create(std::move(new_column_unique), std::move(sub_positions));
}
@ -812,6 +820,20 @@ void ColumnLowCardinality::Index::updateWeakHash(WeakHash32 & hash, WeakHash32 &
callForType(std::move(update_weak_hash), size_of_type);
}
void ColumnLowCardinality::Index::collectSerializedValueSizes(
PaddedPODArray<UInt64> & sizes, const PaddedPODArray<UInt64> & dict_sizes) const
{
auto func = [&](auto x)
{
using CurIndexType = decltype(x);
auto & data = getPositionsData<CurIndexType>();
size_t rows = sizes.size();
for (size_t i = 0; i < rows; ++i)
sizes[i] += dict_sizes[data[i]];
};
callForType(std::move(func), size_of_type);
}
ColumnLowCardinality::Dictionary::Dictionary(MutableColumnPtr && column_unique_, bool is_shared)
: column_unique(std::move(column_unique_)), shared(is_shared)

View File

@ -1,9 +1,10 @@
#pragma once
#include <Columns/ColumnsNumber.h>
#include <Columns/IColumn.h>
#include <Columns/IColumnUnique.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include "ColumnsNumber.h"
#include <Common/typeid_cast.h>
namespace DB
@ -23,9 +24,9 @@ namespace ErrorCodes
*
* @note The indices column always contains the default value (empty StringRef) with the first index.
*/
class ColumnLowCardinality final : public COWHelper<IColumn, ColumnLowCardinality>
class ColumnLowCardinality final : public COWHelper<IColumnHelper<ColumnLowCardinality>, ColumnLowCardinality>
{
friend class COWHelper<IColumn, ColumnLowCardinality>;
friend class COWHelper<IColumnHelper<ColumnLowCardinality>, ColumnLowCardinality>;
ColumnLowCardinality(MutableColumnPtr && column_unique, MutableColumnPtr && indexes, bool is_shared = false);
ColumnLowCardinality(const ColumnLowCardinality & other) = default;
@ -34,7 +35,7 @@ public:
/** Create immutable column using immutable arguments. This arguments may be shared with other columns.
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
*/
using Base = COWHelper<IColumn, ColumnLowCardinality>;
using Base = COWHelper<IColumnHelper<ColumnLowCardinality>, ColumnLowCardinality>;
static Ptr create(const ColumnPtr & column_unique_, const ColumnPtr & indexes_, bool is_shared = false)
{
return ColumnLowCardinality::create(column_unique_->assumeMutable(), indexes_->assumeMutable(), is_shared);
@ -88,7 +89,10 @@ public:
void popBack(size_t n) override { idx.popBack(n); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
void serializeValueIntoMemory(size_t n, char *& memory) const override;
void collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
@ -125,10 +129,6 @@ public:
int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override;
void compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const override;
int compareAtWithCollation(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint, const Collator &) const override;
bool hasEqualValues() const override;
@ -152,8 +152,6 @@ public:
std::vector<MutableColumnPtr> scatter(ColumnIndex num_columns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer_stream) override;
void getExtremes(Field & min, Field & max) const override
{
return dictionary.getColumnUnique().getNestedColumn()->index(getIndexes(), 0)->getExtremes(min, max); /// TODO: optimize
@ -315,6 +313,8 @@ public:
void updateWeakHash(WeakHash32 & hash, WeakHash32 & dict_hash) const;
void collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const PaddedPODArray<UInt64> & dict_sizes) const;
private:
WrappedPtr positions;
size_t size_of_type = 0;

View File

@ -1,7 +1,5 @@
#include <Columns/ColumnMap.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/IColumnImpl.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/typeid_cast.h>
@ -120,11 +118,16 @@ void ColumnMap::popBack(size_t n)
nested->popBack(n);
}
StringRef ColumnMap::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
StringRef ColumnMap::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
return nested->serializeValueIntoArena(n, arena, begin);
}
void ColumnMap::serializeValueIntoMemory(size_t n, char *& memory) const
{
return nested->serializeValueIntoMemory(n, memory);
}
const char * ColumnMap::deserializeAndInsertFromArena(const char * pos)
{
return nested->deserializeAndInsertFromArena(pos);
@ -208,19 +211,6 @@ int ColumnMap::compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direct
return nested->compareAt(n, m, rhs_map.getNestedColumn(), nan_direction_hint);
}
void ColumnMap::compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const
{
return doCompareColumn<ColumnMap>(assert_cast<const ColumnMap &>(rhs), rhs_row_num, row_indexes,
compare_results, direction, nan_direction_hint);
}
bool ColumnMap::hasEqualValues() const
{
return hasEqualValuesImpl<ColumnMap>();
}
void ColumnMap::getPermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int nan_direction_hint, IColumn::Permutation & res) const
{
@ -233,11 +223,6 @@ void ColumnMap::updatePermutation(IColumn::PermutationSortDirection direction, I
nested->updatePermutation(direction, stability, limit, nan_direction_hint, res, equal_ranges);
}
void ColumnMap::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
void ColumnMap::reserve(size_t n)
{
nested->reserve(n);
@ -310,21 +295,6 @@ bool ColumnMap::structureEquals(const IColumn & rhs) const
return false;
}
double ColumnMap::getRatioOfDefaultRows(double sample_ratio) const
{
return getRatioOfDefaultRowsImpl<ColumnMap>(sample_ratio);
}
UInt64 ColumnMap::getNumberOfDefaultRows() const
{
return getNumberOfDefaultRowsImpl<ColumnMap>();
}
void ColumnMap::getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const
{
return getIndicesOfNonDefaultRowsImpl<ColumnMap>(indices, from, limit);
}
ColumnPtr ColumnMap::compress() const
{
auto compressed = nested->compress();

View File

@ -10,10 +10,10 @@ namespace DB
/** Column, that stores a nested Array(Tuple(key, value)) column.
*/
class ColumnMap final : public COWHelper<IColumn, ColumnMap>
class ColumnMap final : public COWHelper<IColumnHelper<ColumnMap>, ColumnMap>
{
private:
friend class COWHelper<IColumn, ColumnMap>;
friend class COWHelper<IColumnHelper<ColumnMap>, ColumnMap>;
WrappedPtr nested;
@ -25,7 +25,7 @@ public:
/** Create immutable column using immutable arguments. This arguments may be shared with other columns.
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
*/
using Base = COWHelper<IColumn, ColumnMap>;
using Base = COWHelper<IColumnHelper<ColumnMap>, ColumnMap>;
static Ptr create(const ColumnPtr & keys, const ColumnPtr & values, const ColumnPtr & offsets)
{
@ -59,7 +59,8 @@ public:
bool tryInsert(const Field & x) override;
void insertDefault() override;
void popBack(size_t n) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
void serializeValueIntoMemory(size_t n, char *& memory) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
@ -73,12 +74,7 @@ public:
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
ColumnPtr replicate(const Offsets & offsets) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer_stream) override;
int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override;
void compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const override;
bool hasEqualValues() const override;
void getExtremes(Field & min, Field & max) const override;
void getPermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int nan_direction_hint, IColumn::Permutation & res) const override;
@ -94,9 +90,6 @@ public:
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;
double getRatioOfDefaultRows(double sample_ratio) const override;
UInt64 getNumberOfDefaultRows() const override;
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;
void finalize() override { nested->finalize(); }
bool isFinalized() const override { return nested->isFinalized(); }

View File

@ -2,16 +2,11 @@
#include <Common/SipHash.h>
#include <Common/assert_cast.h>
#include <Common/WeakHash.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnsDateTime.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/ColumnLowCardinality.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#if USE_EMBEDDED_COMPILER
#include <DataTypes/Native.h>
@ -35,7 +30,6 @@ ColumnNullable::ColumnNullable(MutableColumnPtr && nested_column_, MutableColumn
{
/// ColumnNullable cannot have constant nested column. But constant argument could be passed. Materialize it.
nested_column = getNestedColumn().convertToFullColumnIfConst();
nested_type = nested_column->getDataType();
if (!getNestedColumn().canBeInsideNullable())
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "{} cannot be inside Nullable column", getNestedColumn().getName());
@ -136,77 +130,35 @@ void ColumnNullable::insertData(const char * pos, size_t length)
}
}
StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
const auto & arr = getNullMapData();
static constexpr auto s = sizeof(arr[0]);
char * pos;
switch (nested_type)
{
case TypeIndex::UInt8:
return static_cast<const ColumnUInt8 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt16:
return static_cast<const ColumnUInt16 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt32:
return static_cast<const ColumnUInt32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt64:
return static_cast<const ColumnUInt64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt128:
return static_cast<const ColumnUInt128 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt256:
return static_cast<const ColumnUInt256 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int8:
return static_cast<const ColumnInt8 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int16:
return static_cast<const ColumnInt16 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int32:
return static_cast<const ColumnInt32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int64:
return static_cast<const ColumnInt64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int128:
return static_cast<const ColumnInt128 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int256:
return static_cast<const ColumnInt256 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Float32:
return static_cast<const ColumnFloat32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Float64:
return static_cast<const ColumnFloat64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Date:
return static_cast<const ColumnDate *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Date32:
return static_cast<const ColumnDate32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::DateTime:
return static_cast<const ColumnDateTime *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::DateTime64:
return static_cast<const ColumnDateTime64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::String:
return static_cast<const ColumnString *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::FixedString:
return static_cast<const ColumnFixedString *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal32:
return static_cast<const ColumnDecimal<Decimal32> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal64:
return static_cast<const ColumnDecimal<Decimal64> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal128:
return static_cast<const ColumnDecimal<Decimal128> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal256:
return static_cast<const ColumnDecimal<Decimal256> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UUID:
return static_cast<const ColumnUUID *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::IPv4:
return static_cast<const ColumnIPv4 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::IPv6:
return static_cast<const ColumnIPv6 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
default:
pos = arena.allocContinue(s, begin);
memcpy(pos, &arr[n], s);
if (arr[n])
return StringRef(pos, s);
auto nested_ref = getNestedColumn().serializeValueIntoArena(n, arena, begin);
/// serializeValueIntoArena may reallocate memory. Have to use ptr from nested_ref.data and move it back.
return StringRef(nested_ref.data - s, nested_ref.size + s);
}
auto * pos = arena.allocContinue(s, begin);
memcpy(pos, &arr[n], s);
if (arr[n])
return StringRef(pos, s);
auto nested_ref = getNestedColumn().serializeValueIntoArena(n, arena, begin);
/// serializeValueIntoArena may reallocate memory. Have to use ptr from nested_ref.data and move it back.
return StringRef(nested_ref.data - s, nested_ref.size + s);
}
void ColumnNullable::serializeValueIntoMemory(size_t n, char *& memory) const
{
const auto & arr = getNullMapData();
static constexpr auto s = sizeof(arr[0]);
memcpy(memory, &arr[n], s);
++memory;
if (arr[n])
return;
getNestedColumn().serializeValueIntoMemory(n, memory);
}
const char * ColumnNullable::deserializeAndInsertFromArena(const char * pos)
@ -418,19 +370,6 @@ int ColumnNullable::compareAtWithCollation(size_t n, size_t m, const IColumn & r
return compareAtImpl(n, m, rhs_, null_direction_hint, &collator);
}
void ColumnNullable::compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const
{
return doCompareColumn<ColumnNullable>(assert_cast<const ColumnNullable &>(rhs), rhs_row_num, row_indexes,
compare_results, direction, nan_direction_hint);
}
bool ColumnNullable::hasEqualValues() const
{
return hasEqualValuesImpl<ColumnNullable>();
}
void ColumnNullable::getPermutationImpl(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int null_direction_hint, Permutation & res, const Collator * collator) const
{
@ -680,11 +619,6 @@ void ColumnNullable::updatePermutationWithCollation(const Collator & collator, I
updatePermutationImpl(direction, stability, limit, null_direction_hint, res, equal_ranges, &collator);
}
void ColumnNullable::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
void ColumnNullable::reserve(size_t n)
{
getNestedColumn().reserve(n);

View File

@ -1,12 +1,10 @@
#pragma once
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include "Core/TypeId.h"
#include "config.h"
@ -27,10 +25,10 @@ using ConstNullMapPtr = const NullMap *;
/// over a bitmap because columns are usually stored on disk as compressed
/// files. In this regard, using a bitmap instead of a byte map would
/// greatly complicate the implementation with little to no benefits.
class ColumnNullable final : public COWHelper<IColumn, ColumnNullable>
class ColumnNullable final : public COWHelper<IColumnHelper<ColumnNullable>, ColumnNullable>
{
private:
friend class COWHelper<IColumn, ColumnNullable>;
friend class COWHelper<IColumnHelper<ColumnNullable>, ColumnNullable>;
ColumnNullable(MutableColumnPtr && nested_column_, MutableColumnPtr && null_map_);
ColumnNullable(const ColumnNullable &) = default;
@ -39,7 +37,7 @@ public:
/** Create immutable column using immutable arguments. This arguments may be shared with other columns.
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
*/
using Base = COWHelper<IColumn, ColumnNullable>;
using Base = COWHelper<IColumnHelper<ColumnNullable>, ColumnNullable>;
static Ptr create(const ColumnPtr & nested_column_, const ColumnPtr & null_map_)
{
return ColumnNullable::create(nested_column_->assumeMutable(), null_map_->assumeMutable());
@ -63,7 +61,8 @@ public:
StringRef getDataAt(size_t) const override;
/// Will insert null value if pos=nullptr
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
void serializeValueIntoMemory(size_t n, char *& memory) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
@ -96,11 +95,7 @@ public:
#endif
void compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const override;
int compareAtWithCollation(size_t n, size_t m, const IColumn & rhs, int null_direction_hint, const Collator &) const override;
bool hasEqualValues() const override;
void getPermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int null_direction_hint, Permutation & res) const override;
void updatePermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
@ -124,13 +119,6 @@ public:
// Special function for nullable minmax index
void getExtremesNullLast(Field & min, Field & max) const;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
return scatterImpl<ColumnNullable>(num_columns, selector);
}
void gather(ColumnGathererStream & gatherer_stream) override;
ColumnPtr compress() const override;
void forEachSubcolumn(MutableColumnCallback callback) override
@ -154,21 +142,6 @@ public:
return false;
}
double getRatioOfDefaultRows(double sample_ratio) const override
{
return getRatioOfDefaultRowsImpl<ColumnNullable>(sample_ratio);
}
UInt64 getNumberOfDefaultRows() const override
{
return getNumberOfDefaultRowsImpl<ColumnNullable>();
}
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override
{
getIndicesOfNonDefaultRowsImpl<ColumnNullable>(indices, from, limit);
}
ColumnPtr createWithOffsets(const Offsets & offsets, const ColumnConst & column_with_default_value, size_t total_rows, size_t shift) const override;
bool isNullable() const override { return true; }
@ -215,8 +188,6 @@ public:
private:
WrappedPtr nested_column;
WrappedPtr null_map;
// optimize serializeValueIntoArena
TypeIndex nested_type;
template <bool negative>
void applyNullMapImpl(const NullMap & map);

View File

@ -12,7 +12,6 @@
#include <Interpreters/castColumn.h>
#include <Interpreters/convertFieldToType.h>
#include <Common/HashTable/HashSet.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <numeric>
@ -852,14 +851,6 @@ void ColumnObject::getPermutation(PermutationSortDirection, PermutationSortStabi
iota(res.data(), res.size(), size_t(0));
}
void ColumnObject::compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const
{
return doCompareColumn<ColumnObject>(assert_cast<const ColumnObject &>(rhs), rhs_row_num, row_indexes,
compare_results, direction, nan_direction_hint);
}
void ColumnObject::getExtremes(Field & min, Field & max) const
{
if (num_rows == 0)
@ -874,16 +865,6 @@ void ColumnObject::getExtremes(Field & min, Field & max) const
}
}
MutableColumns ColumnObject::scatter(ColumnIndex num_columns, const Selector & selector) const
{
return scatterImpl<ColumnObject>(num_columns, selector);
}
void ColumnObject::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
const ColumnObject::Subcolumn & ColumnObject::getSubcolumn(const PathInData & key) const
{
if (const auto * node = subcolumns.findLeaf(key))

View File

@ -48,7 +48,7 @@ FieldInfo getFieldInfo(const Field & field);
* a trie-like structure. ColumnObject is not suitable for writing into tables
* and it should be converted to Tuple with fixed set of subcolumns before that.
*/
class ColumnObject final : public COWHelper<IColumn, ColumnObject>
class ColumnObject final : public COWHelper<IColumnHelper<ColumnObject>, ColumnObject>
{
public:
/** Class that represents one subcolumn.
@ -229,23 +229,17 @@ public:
/// Order of rows in ColumnObject is undefined.
void getPermutation(PermutationSortDirection, PermutationSortStability, size_t, int, Permutation & res) const override;
void compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const override;
void updatePermutation(PermutationSortDirection, PermutationSortStability, size_t, int, Permutation &, EqualRanges &) const override {}
int compareAt(size_t, size_t, const IColumn &, int) const override { return 0; }
void getExtremes(Field & min, Field & max) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer) override;
/// All other methods throw exception.
StringRef getDataAt(size_t) const override { throwMustBeConcrete(); }
bool isDefaultAt(size_t) const override { throwMustBeConcrete(); }
void insertData(const char *, size_t) override { throwMustBeConcrete(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override { throwMustBeConcrete(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwMustBeConcrete(); }
void serializeValueIntoMemory(size_t, char *&) const override { throwMustBeConcrete(); }
const char * deserializeAndInsertFromArena(const char *) override { throwMustBeConcrete(); }
const char * skipSerializedInArena(const char *) const override { throwMustBeConcrete(); }
void updateHashWithValue(size_t, SipHash &) const override { throwMustBeConcrete(); }

View File

@ -1,13 +1,14 @@
#include <Columns/ColumnCompressed.h>
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsCommon.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Columns/ColumnTuple.h>
#include <Common/HashTable/Hash.h>
#include <Common/SipHash.h>
#include <Common/WeakHash.h>
#include <Common/iota.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <algorithm>
#include <bit>
@ -152,11 +153,16 @@ void ColumnSparse::insertData(const char * pos, size_t length)
insertSingleValue([&](IColumn & column) { column.insertData(pos, length); });
}
StringRef ColumnSparse::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
StringRef ColumnSparse::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
return values->serializeValueIntoArena(getValueIndex(n), arena, begin);
}
void ColumnSparse::serializeValueIntoMemory(size_t n, char *& memory) const
{
values->serializeValueIntoMemory(getValueIndex(n), memory);
}
const char * ColumnSparse::deserializeAndInsertFromArena(const char * pos)
{
const char * res = nullptr;
@ -730,16 +736,6 @@ UInt64 ColumnSparse::getNumberOfDefaultRows() const
return _size - offsets->size();
}
MutableColumns ColumnSparse::scatter(ColumnIndex num_columns, const Selector & selector) const
{
return scatterImpl<ColumnSparse>(num_columns, selector);
}
void ColumnSparse::gather(ColumnGathererStream & gatherer_stream)
{
gatherer_stream.gather(*this);
}
ColumnPtr ColumnSparse::compress() const
{
auto values_compressed = values->compress();

View File

@ -1,7 +1,6 @@
#pragma once
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
@ -18,10 +17,10 @@ namespace DB
* values contains also one default value at 0 position to make
* implementation of execution of functions and sorting more convenient.
*/
class ColumnSparse final : public COWHelper<IColumn, ColumnSparse>
class ColumnSparse final : public COWHelper<IColumnHelper<ColumnSparse>, ColumnSparse>
{
private:
friend class COWHelper<IColumn, ColumnSparse>;
friend class COWHelper<IColumnHelper<ColumnSparse>, ColumnSparse>;
explicit ColumnSparse(MutableColumnPtr && values_);
ColumnSparse(MutableColumnPtr && values_, MutableColumnPtr && offsets_, size_t size_);
@ -31,7 +30,7 @@ public:
static constexpr auto DEFAULT_ROWS_SEARCH_SAMPLE_RATIO = 0.1;
static constexpr auto DEFAULT_RATIO_FOR_SPARSE_SERIALIZATION = 0.95;
using Base = COWHelper<IColumn, ColumnSparse>;
using Base = COWHelper<IColumnHelper<ColumnSparse>, ColumnSparse>;
static Ptr create(const ColumnPtr & values_, const ColumnPtr & offsets_, size_t size_)
{
return Base::create(values_->assumeMutable(), offsets_->assumeMutable(), size_);
@ -78,7 +77,8 @@ public:
/// Will insert null value if pos=nullptr
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
void serializeValueIntoMemory(size_t n, char *& memory) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char *) const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
@ -135,10 +135,6 @@ public:
double getRatioOfDefaultRows(double sample_ratio) const override;
UInt64 getNumberOfDefaultRows() const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer_stream) override;
ColumnPtr compress() const override;
void forEachSubcolumn(MutableColumnCallback callback) override;

View File

@ -4,7 +4,6 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/MaskOperations.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Common/Arena.h>
#include <Common/HashTable/Hash.h>
#include <Common/WeakHash.h>
@ -27,7 +26,7 @@ namespace ErrorCodes
ColumnString::ColumnString(const ColumnString & src)
: COWHelper<IColumn, ColumnString>(src),
: COWHelper<IColumnHelper<ColumnString>, ColumnString>(src),
offsets(src.offsets.begin(), src.offsets.end()),
chars(src.chars.begin(), src.chars.end())
{
@ -213,34 +212,68 @@ ColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const
}
StringRef ColumnString::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
void ColumnString::collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const
{
size_t string_size = sizeAt(n);
size_t offset = offsetAt(n);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
if (empty())
return;
size_t rows = size();
if (sizes.empty())
sizes.resize_fill(rows);
else if (sizes.size() != rows)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of sizes: {} doesn't match rows_num: {}. It is a bug", sizes.size(), rows);
if (is_null)
{
res.size = * null_bit ? null_bit_size : null_bit_size + sizeof(string_size) + string_size;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
for (size_t i = 0; i < rows; ++i)
{
if (is_null[i])
{
++sizes[i];
}
else
{
size_t string_size = sizeAt(i);
sizes[i] += sizeof(string_size) + string_size + 1 /* null byte */;
}
}
}
else
{
res.size = sizeof(string_size) + string_size;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
for (size_t i = 0; i < rows; ++i)
{
size_t string_size = sizeAt(i);
sizes[i] += sizeof(string_size) + string_size;
}
}
}
StringRef ColumnString::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
size_t string_size = sizeAt(n);
size_t offset = offsetAt(n);
StringRef res;
res.size = sizeof(string_size) + string_size;
char * pos = arena.allocContinue(res.size, begin);
memcpy(pos, &string_size, sizeof(string_size));
memcpy(pos + sizeof(string_size), &chars[offset], string_size);
res.data = pos;
return res;
}
void ColumnString::serializeValueIntoMemory(size_t n, char *& memory) const
{
size_t string_size = sizeAt(n);
size_t offset = offsetAt(n);
memcpy(memory, &string_size, sizeof(string_size));
memcpy(memory + sizeof(string_size), &chars[offset], string_size);
memory += sizeof(string_size) + string_size;
}
const char * ColumnString::deserializeAndInsertFromArena(const char * pos)
{
const size_t string_size = unalignedLoad<size_t>(pos);
@ -303,20 +336,6 @@ ColumnPtr ColumnString::indexImpl(const PaddedPODArray<Type> & indexes, size_t l
return res;
}
void ColumnString::compareColumn(
const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const
{
return doCompareColumn<ColumnString>(assert_cast<const ColumnString &>(rhs), rhs_row_num, row_indexes,
compare_results, direction, nan_direction_hint);
}
bool ColumnString::hasEqualValues() const
{
return hasEqualValuesImpl<ColumnString>();
}
struct ColumnString::ComparatorBase
{
const ColumnString & parent;
@ -482,13 +501,6 @@ ColumnPtr ColumnString::replicate(const Offsets & replicate_offsets) const
return res;
}
void ColumnString::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
void ColumnString::reserve(size_t n)
{
offsets.reserve_exact(n);

View File

@ -23,14 +23,14 @@ class Arena;
/** Column for String values.
*/
class ColumnString final : public COWHelper<IColumn, ColumnString>
class ColumnString final : public COWHelper<IColumnHelper<ColumnString>, ColumnString>
{
public:
using Char = UInt8;
using Chars = PaddedPODArray<UInt8>;
private:
friend class COWHelper<IColumn, ColumnString>;
friend class COWHelper<IColumnHelper<ColumnString>, ColumnString>;
/// Maps i'th position to offset to i+1'th element. Last offset maps to the end of all chars (is the size of all chars).
Offsets offsets;
@ -179,7 +179,10 @@ public:
offsets.resize_assume_reserved(offsets.size() - n);
}
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
void collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
void serializeValueIntoMemory(size_t n, char *& memory) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
@ -234,12 +237,6 @@ public:
return memcmpSmallAllowOverflow15(chars.data() + offsetAt(n), sizeAt(n) - 1, rhs.chars.data() + rhs.offsetAt(m), rhs.sizeAt(m) - 1);
}
void compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const override;
bool hasEqualValues() const override;
/// Variant of compareAt for string comparison with respect of collation.
int compareAtWithCollation(size_t n, size_t m, const IColumn & rhs_, int, const Collator & collator) const override;
@ -258,13 +255,6 @@ public:
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
return scatterImpl<ColumnString>(num_columns, selector);
}
void gather(ColumnGathererStream & gatherer_stream) override;
ColumnPtr compress() const override;
void reserve(size_t n) override;
@ -272,7 +262,6 @@ public:
void getExtremes(Field & min, Field & max) const override;
bool canBeInsideNullable() const override { return true; }
bool structureEquals(const IColumn & rhs) const override
@ -280,21 +269,6 @@ public:
return typeid(rhs) == typeid(ColumnString);
}
double getRatioOfDefaultRows(double sample_ratio) const override
{
return getRatioOfDefaultRowsImpl<ColumnString>(sample_ratio);
}
UInt64 getNumberOfDefaultRows() const override
{
return getNumberOfDefaultRowsImpl<ColumnString>();
}
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override
{
return getIndicesOfNonDefaultRowsImpl<ColumnString>(indices, from, limit);
}
Chars & getChars() { return chars; }
const Chars & getChars() const { return chars; }

View File

@ -3,15 +3,15 @@
#include <Columns/ColumnCompressed.h>
#include <Columns/IColumnImpl.h>
#include <Core/Field.h>
#include <Common/WeakHash.h>
#include <Common/assert_cast.h>
#include <Common/iota.h>
#include <Common/typeid_cast.h>
#include <DataTypes/Serializations/SerializationInfoTuple.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <base/sort.h>
#include <Common/WeakHash.h>
#include <Common/assert_cast.h>
#include <Common/iota.h>
#include <Common/typeid_cast.h>
namespace DB
@ -197,7 +197,7 @@ void ColumnTuple::popBack(size_t n)
column->popBack(n);
}
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
StringRef res(begin, 0);
for (const auto & column : columns)
@ -210,6 +210,12 @@ StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char con
return res;
}
void ColumnTuple::serializeValueIntoMemory(size_t n, char *& memory) const
{
for (const auto & column : columns)
column->serializeValueIntoMemory(n, memory);
}
const char * ColumnTuple::deserializeAndInsertFromArena(const char * pos)
{
for (auto & column : columns)
@ -351,24 +357,11 @@ int ColumnTuple::compareAt(size_t n, size_t m, const IColumn & rhs, int nan_dire
return compareAtImpl(n, m, rhs, nan_direction_hint);
}
void ColumnTuple::compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const
{
return doCompareColumn<ColumnTuple>(assert_cast<const ColumnTuple &>(rhs), rhs_row_num, row_indexes,
compare_results, direction, nan_direction_hint);
}
int ColumnTuple::compareAtWithCollation(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint, const Collator & collator) const
{
return compareAtImpl(n, m, rhs, nan_direction_hint, &collator);
}
bool ColumnTuple::hasEqualValues() const
{
return hasEqualValuesImpl<ColumnTuple>();
}
template <bool positive>
struct ColumnTuple::Less
{
@ -457,11 +450,6 @@ void ColumnTuple::updatePermutationWithCollation(const Collator & collator, ICol
updatePermutationImpl(direction, stability, limit, nan_direction_hint, res, equal_ranges, &collator);
}
void ColumnTuple::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
void ColumnTuple::reserve(size_t n)
{
const size_t tuple_size = columns.size();
@ -592,21 +580,6 @@ ColumnPtr ColumnTuple::compress() const
});
}
double ColumnTuple::getRatioOfDefaultRows(double sample_ratio) const
{
return getRatioOfDefaultRowsImpl<ColumnTuple>(sample_ratio);
}
UInt64 ColumnTuple::getNumberOfDefaultRows() const
{
return getNumberOfDefaultRowsImpl<ColumnTuple>();
}
void ColumnTuple::getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const
{
return getIndicesOfNonDefaultRowsImpl<ColumnTuple>(indices, from, limit);
}
void ColumnTuple::finalize()
{
for (auto & column : columns)

View File

@ -12,10 +12,10 @@ namespace DB
* Mixed constant/non-constant columns is prohibited in tuple
* for implementation simplicity.
*/
class ColumnTuple final : public COWHelper<IColumn, ColumnTuple>
class ColumnTuple final : public COWHelper<IColumnHelper<ColumnTuple>, ColumnTuple>
{
private:
friend class COWHelper<IColumn, ColumnTuple>;
friend class COWHelper<IColumnHelper<ColumnTuple>, ColumnTuple>;
using TupleColumns = std::vector<WrappedPtr>;
TupleColumns columns;
@ -30,7 +30,7 @@ public:
/** Create immutable column using immutable arguments. This arguments may be shared with other columns.
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
*/
using Base = COWHelper<IColumn, ColumnTuple>;
using Base = COWHelper<IColumnHelper<ColumnTuple>, ColumnTuple>;
static Ptr create(const Columns & columns);
static Ptr create(const TupleColumns & columns);
static Ptr create(Columns && arg) { return create(arg); }
@ -62,7 +62,8 @@ public:
void insertFrom(const IColumn & src_, size_t n) override;
void insertDefault() override;
void popBack(size_t n) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
void serializeValueIntoMemory(size_t n, char *& memory) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
@ -75,13 +76,8 @@ public:
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
ColumnPtr replicate(const Offsets & offsets) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer_stream) override;
int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override;
void compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const override;
int compareAtWithCollation(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint, const Collator & collator) const override;
bool hasEqualValues() const override;
void getExtremes(Field & min, Field & max) const override;
void getPermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int nan_direction_hint, IColumn::Permutation & res) const override;
@ -103,9 +99,6 @@ public:
bool structureEquals(const IColumn & rhs) const override;
bool isCollationSupported() const override;
ColumnPtr compress() const override;
double getRatioOfDefaultRows(double sample_ratio) const override;
UInt64 getNumberOfDefaultRows() const override;
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;
void finalize() override;
bool isFinalized() const override;

View File

@ -1,7 +1,6 @@
#pragma once
#include <Columns/IColumnUnique.h>
#include <Columns/IColumnImpl.h>
#include <Columns/ReverseIndex.h>
#include <Columns/ColumnVector.h>
@ -80,7 +79,9 @@ public:
Float32 getFloat32(size_t n) const override { return getNestedColumn()->getFloat32(n); }
bool getBool(size_t n) const override { return getNestedColumn()->getBool(n); }
bool isNullAt(size_t n) const override { return is_nullable && n == getNullValueIndex(); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
void collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
void serializeValueIntoMemory(size_t n, char *& memory) const override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash_func) const override
{
@ -394,7 +395,21 @@ size_t ColumnUnique<ColumnType>::uniqueInsertData(const char * pos, size_t lengt
}
template <typename ColumnType>
StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
void ColumnUnique<ColumnType>::collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const
{
/// nullable is handled internally.
chassert(is_null == nullptr);
if (IColumn::empty())
return;
if (is_nullable)
column_holder->collectSerializedValueSizes(sizes, assert_cast<const ColumnUInt8 &>(*nested_null_mask).getData().data());
else
column_holder->collectSerializedValueSizes(sizes, nullptr);
}
template <typename ColumnType>
StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
if (is_nullable)
{
@ -417,6 +432,22 @@ StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & ar
return column_holder->serializeValueIntoArena(n, arena, begin);
}
template <typename ColumnType>
void ColumnUnique<ColumnType>::serializeValueIntoMemory(size_t n, char *& memory) const
{
if (is_nullable)
{
UInt8 flag = (n == getNullValueIndex() ? 1 : 0);
unalignedStore<UInt8>(memory, flag);
++memory;
if (n == getNullValueIndex())
return;
}
column_holder->serializeValueIntoMemory(n, memory);
}
template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos)
{

View File

@ -643,7 +643,7 @@ void ColumnVariant::popBack(size_t n)
offsets->popBack(n);
}
StringRef ColumnVariant::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
StringRef ColumnVariant::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
/// During any serialization/deserialization we should always use global discriminators.
Discriminator global_discr = globalDiscriminatorAt(n);
@ -1304,7 +1304,14 @@ UInt64 ColumnVariant::getNumberOfDefaultRows() const
void ColumnVariant::getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const
{
return getIndicesOfNonDefaultRowsImpl<ColumnVariant>(indices, from, limit);
size_t to = limit && from + limit < size() ? from + limit : size();
indices.reserve(indices.size() + to - from);
for (size_t i = from; i < to; ++i)
{
if (!isDefaultAt(i))
indices.push_back(i);
}
}
void ColumnVariant::finalize()

View File

@ -185,7 +185,7 @@ public:
void insertDefault() override;
void insertManyDefaults(size_t length) override;
void popBack(size_t n) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;

View File

@ -51,31 +51,6 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
template <typename T>
StringRef ColumnVector<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
unalignedStore<T>(pos, data[n]);
return res;
}
template <typename T>
const char * ColumnVector<T>::deserializeAndInsertFromArena(const char * pos)
{
@ -862,12 +837,6 @@ ColumnPtr ColumnVector<T>::replicate(const IColumn::Offsets & offsets) const
return res;
}
template <typename T>
void ColumnVector<T>::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
template <typename T>
void ColumnVector<T>::getExtremes(Field & min, Field & max) const
{

View File

@ -1,16 +1,15 @@
#pragma once
#include <cmath>
#include <Columns/ColumnVectorHelper.h>
#include <Columns/ColumnFixedSizeHelper.h>
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
#include <Common/TargetSpecific.h>
#include <Common/assert_cast.h>
#include <Core/CompareHelper.h>
#include <Core/Field.h>
#include <Core/TypeId.h>
#include <base/TypeName.h>
#include <base/unaligned.h>
#include <Common/TargetSpecific.h>
#include <Common/assert_cast.h>
#include "config.h"
@ -30,13 +29,13 @@ namespace ErrorCodes
/** A template for columns that use a simple array to store.
*/
template <typename T>
class ColumnVector final : public COWHelper<ColumnVectorHelper, ColumnVector<T>>
class ColumnVector final : public COWHelper<IColumnHelper<ColumnVector<T>, ColumnFixedSizeHelper>, ColumnVector<T>>
{
static_assert(!is_decimal<T>);
private:
using Self = ColumnVector;
friend class COWHelper<ColumnVectorHelper, Self>;
friend class COWHelper<IColumnHelper<Self, ColumnFixedSizeHelper>, Self>;
struct less;
struct less_stable;
@ -101,8 +100,6 @@ public:
data.resize_assume_reserved(data.size() - n);
}
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
@ -158,19 +155,6 @@ public:
#endif
void compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const override
{
return this->template doCompareColumn<Self>(assert_cast<const Self &>(rhs), rhs_row_num, row_indexes,
compare_results, direction, nan_direction_hint);
}
bool hasEqualValues() const override
{
return this->template hasEqualValuesImpl<Self>();
}
void getPermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int nan_direction_hint, IColumn::Permutation & res) const override;
@ -265,13 +249,6 @@ public:
void getExtremes(Field & min, Field & max) const override;
MutableColumns scatter(IColumn::ColumnIndex num_columns, const IColumn::Selector & selector) const override
{
return this->template scatterImpl<Self>(num_columns, selector);
}
void gather(ColumnGathererStream & gatherer_stream) override;
bool canBeInsideNullable() const override { return true; }
bool isFixedAndContiguous() const override { return true; }
size_t sizeOfValueIfFixed() const override { return sizeof(T); }
@ -293,21 +270,6 @@ public:
return typeid(rhs) == typeid(ColumnVector<T>);
}
double getRatioOfDefaultRows(double sample_ratio) const override
{
return this->template getRatioOfDefaultRowsImpl<Self>(sample_ratio);
}
UInt64 getNumberOfDefaultRows() const override
{
return this->template getNumberOfDefaultRowsImpl<Self>();
}
void getIndicesOfNonDefaultRows(IColumn::Offsets & indices, size_t from, size_t limit) const override
{
return this->template getIndicesOfNonDefaultRowsImpl<Self>(indices, from, limit);
}
ColumnPtr createWithOffsets(const IColumn::Offsets & offsets, const ColumnConst & column_with_default_value, size_t total_rows, size_t shift) const override;
ColumnPtr compress() const override;

View File

@ -1,12 +1,26 @@
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/IColumnDummy.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnFunction.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnObject.h>
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnVector.h>
#include <Core/Field.h>
#include <DataTypes/Serializations/SerializationInfo.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
namespace DB
{
@ -14,6 +28,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
}
String IColumn::dumpStructure() const
@ -94,4 +109,363 @@ bool isColumnConst(const IColumn & column)
return checkColumn<ColumnConst>(column);
}
template <typename Derived, typename Parent>
MutableColumns IColumnHelper<Derived, Parent>::scatter(IColumn::ColumnIndex num_columns, const IColumn::Selector & selector) const
{
const auto & self = static_cast<const Derived &>(*this);
size_t num_rows = self.size();
if (num_rows != selector.size())
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of selector: {} doesn't match size of column: {}",
selector.size(), num_rows);
MutableColumns columns(num_columns);
for (auto & column : columns)
column = self.cloneEmpty();
{
size_t reserve_size = static_cast<size_t>(num_rows * 1.1 / num_columns); /// 1.1 is just a guess. Better to use n-sigma rule.
if (reserve_size > 1)
for (auto & column : columns)
column->reserve(reserve_size);
}
for (size_t i = 0; i < num_rows; ++i)
static_cast<Derived &>(*columns[selector[i]]).insertFrom(*this, i);
return columns;
}
template <typename Derived, typename Parent>
void IColumnHelper<Derived, Parent>::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(static_cast<Derived &>(*this));
}
template <typename Derived, bool reversed>
void compareImpl(
const Derived & lhs,
const Derived & rhs,
size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes [[maybe_unused]],
PaddedPODArray<Int8> & compare_results,
int nan_direction_hint)
{
size_t num_rows = lhs.size();
if (compare_results.empty())
compare_results.resize(num_rows);
else if (compare_results.size() != num_rows)
throw Exception(
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH,
"Size of compare_results: {} doesn't match rows_num: {}",
compare_results.size(),
num_rows);
for (size_t i = 0; i < num_rows; ++i)
{
UInt64 row = i;
int res = lhs.compareAt(row, rhs_row_num, rhs, nan_direction_hint);
assert(res == 1 || res == -1 || res == 0);
compare_results[row] = static_cast<Int8>(res);
if constexpr (reversed)
compare_results[row] = -compare_results[row];
}
}
template <typename Derived, bool reversed>
void compareWithIndexImpl(
const Derived & lhs,
const Derived & rhs,
size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes [[maybe_unused]],
PaddedPODArray<Int8> & compare_results,
int nan_direction_hint)
{
size_t num_rows = lhs.size();
if (compare_results.empty())
compare_results.resize(num_rows);
else if (compare_results.size() != num_rows)
throw Exception(
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH,
"Size of compare_results: {} doesn't match rows_num: {}",
compare_results.size(),
num_rows);
UInt64 * indexes = row_indexes->data();
UInt64 * next_index = indexes;
size_t num_indexes = row_indexes->size();
for (size_t i = 0; i < num_indexes; ++i)
{
UInt64 row = indexes[i];
int res = lhs.compareAt(row, rhs_row_num, rhs, nan_direction_hint);
assert(res == 1 || res == -1 || res == 0);
compare_results[row] = static_cast<Int8>(res);
if constexpr (reversed)
compare_results[row] = -compare_results[row];
if (compare_results[row] == 0)
{
*next_index = row;
++next_index;
}
}
size_t equal_row_indexes_size = next_index - row_indexes->data();
row_indexes->resize(equal_row_indexes_size);
}
template <typename Derived, typename Parent>
void IColumnHelper<Derived, Parent>::compareColumn(
const IColumn & rhs_base,
size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes,
PaddedPODArray<Int8> & compare_results,
int direction,
int nan_direction_hint) const
{
const auto & lhs = static_cast<const Derived &>(*this);
const auto & rhs = static_cast<const Derived &>(rhs_base);
if (direction < 0)
{
if (row_indexes)
compareWithIndexImpl<Derived, true>(lhs, rhs, rhs_row_num, row_indexes, compare_results, nan_direction_hint);
else
compareImpl<Derived, true>(lhs, rhs, rhs_row_num, row_indexes, compare_results, nan_direction_hint);
}
else if (row_indexes)
{
compareWithIndexImpl<Derived, false>(lhs, rhs, rhs_row_num, row_indexes, compare_results, nan_direction_hint);
}
else
{
compareImpl<Derived, false>(lhs, rhs, rhs_row_num, row_indexes, compare_results, nan_direction_hint);
}
}
template <typename Derived, typename Parent>
bool IColumnHelper<Derived, Parent>::hasEqualValues() const
{
const auto & self = static_cast<const Derived &>(*this);
size_t num_rows = self.size();
for (size_t i = 1; i < num_rows; ++i)
{
if (self.compareAt(i, 0, self, false) != 0)
return false;
}
return true;
}
template <typename Derived, typename Parent>
double IColumnHelper<Derived, Parent>::getRatioOfDefaultRows(double sample_ratio) const
{
if (sample_ratio <= 0.0 || sample_ratio > 1.0)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Value of 'sample_ratio' must be in interval (0.0; 1.0], but got: {}", sample_ratio);
static constexpr auto max_number_of_rows_for_full_search = 1000;
const auto & self = static_cast<const Derived &>(*this);
size_t num_rows = self.size();
size_t num_sampled_rows = std::min(static_cast<size_t>(num_rows * sample_ratio), num_rows);
size_t num_checked_rows = 0;
size_t res = 0;
if (num_sampled_rows == num_rows || num_rows <= max_number_of_rows_for_full_search)
{
for (size_t i = 0; i < num_rows; ++i)
res += self.isDefaultAt(i);
num_checked_rows = num_rows;
}
else if (num_sampled_rows != 0)
{
for (size_t i = 0; i < num_rows; ++i)
{
if (num_checked_rows * num_rows <= i * num_sampled_rows)
{
res += self.isDefaultAt(i);
++num_checked_rows;
}
}
}
if (num_checked_rows == 0)
return 0.0;
return static_cast<double>(res) / num_checked_rows;
}
template <typename Derived, typename Parent>
UInt64 IColumnHelper<Derived, Parent>::getNumberOfDefaultRows() const
{
const auto & self = static_cast<const Derived &>(*this);
UInt64 res = 0;
size_t num_rows = self.size();
for (size_t i = 0; i < num_rows; ++i)
res += self.isDefaultAt(i);
return res;
}
template <typename Derived, typename Parent>
void IColumnHelper<Derived, Parent>::getIndicesOfNonDefaultRows(IColumn::Offsets & indices, size_t from, size_t limit) const
{
const auto & self = static_cast<const Derived &>(*this);
size_t to = limit && from + limit < self.size() ? from + limit : self.size();
indices.reserve_exact(indices.size() + to - from);
for (size_t i = from; i < to; ++i)
{
if (!self.isDefaultAt(i))
indices.push_back(i);
}
}
template <typename Derived, typename Parent>
StringRef
IColumnHelper<Derived, Parent>::serializeValueIntoArenaWithNull(size_t n, Arena & arena, char const *& begin, const UInt8 * is_null) const
{
const auto & self = static_cast<const Derived &>(*this);
if (is_null)
{
char * memory;
if (is_null[n])
{
memory = arena.allocContinue(1, begin);
*memory = 1;
return {memory, 1};
}
size_t sz = self.byteSizeAt(n) + 1 /* null byte */;
memory = arena.allocContinue(sz, begin);
StringRef ret(memory, sz);
*memory = 0;
++memory;
self.serializeValueIntoMemory(n, memory);
return ret;
}
else
{
return self.serializeValueIntoArena(n, arena, begin);
}
}
template <typename Derived, typename Parent>
StringRef IColumnHelper<Derived, Parent>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
if constexpr (!std::is_base_of_v<ColumnFixedSizeHelper, Derived>)
return IColumn::serializeValueIntoArena(n, arena, begin);
const auto & self = static_cast<const Derived &>(*this);
size_t sz = self.byteSizeAt(n);
char * memory = arena.allocContinue(sz, begin);
self.serializeValueIntoMemory(n, memory);
return {memory - sz, sz};
}
template <typename Derived, typename Parent>
void IColumnHelper<Derived, Parent>::serializeValueIntoMemoryWithNull(size_t n, char *& memory, const UInt8 * is_null) const
{
const auto & self = static_cast<const Derived &>(*this);
if (is_null)
{
*memory = is_null[n];
++memory;
if (!is_null[n])
self.serializeValueIntoMemory(n, memory);
}
else
{
self.serializeValueIntoMemory(n, memory);
}
}
template <typename Derived, typename Parent>
void IColumnHelper<Derived, Parent>::serializeValueIntoMemory(size_t n, char *& memory) const
{
if constexpr (!std::is_base_of_v<ColumnFixedSizeHelper, Derived>)
return IColumn::serializeValueIntoMemory(n, memory);
const auto & self = static_cast<const Derived &>(*this);
auto raw_data = self.getDataAt(n);
memcpy(memory, raw_data.data, raw_data.size);
memory += raw_data.size;
}
template <typename Derived, typename Parent>
void IColumnHelper<Derived, Parent>::collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const
{
if constexpr (!std::is_base_of_v<ColumnFixedSizeHelper, Derived>)
return IColumn::collectSerializedValueSizes(sizes, is_null);
const auto & self = static_cast<const Derived &>(*this);
size_t rows = self.size();
if (sizes.empty())
sizes.resize_fill(rows);
else if (sizes.size() != rows)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of sizes: {} doesn't match rows_num: {}. It is a bug", sizes.size(), rows);
if (rows == 0)
return;
size_t element_size = self.byteSizeAt(0);
if (is_null)
{
for (size_t i = 0; i < rows; ++i)
{
if (is_null[i])
++sizes[i];
else
sizes[i] += element_size + 1 /* null byte */;
}
}
else
{
for (auto & sz : sizes)
sz += element_size;
}
}
template class IColumnHelper<ColumnVector<UInt8>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<UInt16>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<UInt32>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<UInt64>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<UInt128>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<UInt256>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<Int8>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<Int16>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<Int32>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<Int64>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<Int128>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<Int256>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<Float32>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<Float64>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<UUID>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<IPv4>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnVector<IPv6>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnDecimal<Decimal32>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnDecimal<Decimal64>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnDecimal<Decimal128>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnDecimal<Decimal256>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnDecimal<DateTime64>, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnFixedString, ColumnFixedSizeHelper>;
template class IColumnHelper<ColumnString, IColumn>;
template class IColumnHelper<ColumnLowCardinality, IColumn>;
template class IColumnHelper<ColumnNullable, IColumn>;
template class IColumnHelper<ColumnConst, IColumn>;
template class IColumnHelper<ColumnArray, IColumn>;
template class IColumnHelper<ColumnTuple, IColumn>;
template class IColumnHelper<ColumnMap, IColumn>;
template class IColumnHelper<ColumnSparse, IColumn>;
template class IColumnHelper<ColumnObject, IColumn>;
template class IColumnHelper<ColumnAggregateFunction, IColumn>;
template class IColumnHelper<ColumnFunction, IColumn>;
template class IColumnHelper<ColumnCompressed, IColumn>;
template class IColumnHelper<IColumnDummy, IColumn>;
}

View File

@ -223,7 +223,38 @@ public:
* For example, to obtain unambiguous representation of Array of strings, strings data should be interleaved with their sizes.
* Parameter begin should be used with Arena::allocContinue.
*/
virtual StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit = nullptr) const = 0;
virtual StringRef serializeValueIntoArena(size_t /* n */, Arena & /* arena */, char const *& /* begin */) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method serializeValueIntoArena is not supported for {}", getName());
}
/// Same as above but serialize into already allocated continuous memory.
/// When finished, `memory` will point to the end of the serialization data.
virtual void serializeValueIntoMemory(size_t /* n */, char * & /* memory */) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method serializeValueIntoMemory is not supported for {}", getName());
}
/// Nullable variant to avoid calling virtualized method inside ColumnNullable.
virtual StringRef
serializeValueIntoArenaWithNull(size_t /* n */, Arena & /* arena */, char const *& /* begin */, const UInt8 * /* is_null */) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method serializeValueIntoArenaWithNull is not supported for {}", getName());
}
virtual void serializeValueIntoMemoryWithNull(size_t /* n */, char * & /* memory */, const UInt8 * /* is_null */) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method serializeValueIntoMemoryWithNull is not supported for {}", getName());
}
/// Calculate all the sizes of serialized data in column, then added to `sizes`.
/// If `is_null` is not nullptr, also take null bit into account.
/// This is currently used to facilitate the allocation of memory for an entire continuous row
/// in a single step. For more details, refer to the HashMethodSerialized implementation.
virtual void collectSerializedValueSizes(PaddedPODArray<UInt64> & /* sizes */, const UInt8 * /* is_null */) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method collectSerializedValueSizes is not supported for {}", getName());
}
/// Deserializes a value that was serialized using IColumn::serializeValueIntoArena method.
/// Returns pointer to the position after the read data.
@ -574,43 +605,18 @@ public:
[[nodiscard]] String dumpStructure() const;
protected:
/// Template is to devirtualize calls to insertFrom method.
/// In derived classes (that use final keyword), implement scatter method as call to scatterImpl.
template <typename Derived>
std::vector<MutablePtr> scatterImpl(ColumnIndex num_columns, const Selector & selector) const;
template <typename Derived, bool reversed, bool use_indexes>
void compareImpl(const Derived & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes,
PaddedPODArray<Int8> & compare_results,
int nan_direction_hint) const;
template <typename Derived>
void doCompareColumn(const Derived & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes,
PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const;
template <typename Derived>
bool hasEqualValuesImpl() const;
/// Template is to devirtualize calls to 'isDefaultAt' method.
template <typename Derived>
double getRatioOfDefaultRowsImpl(double sample_ratio) const;
template <typename Derived>
UInt64 getNumberOfDefaultRowsImpl() const;
template <typename Derived>
void getIndicesOfNonDefaultRowsImpl(Offsets & indices, size_t from, size_t limit) const;
template <typename Compare, typename Sort, typename PartialSort>
void getPermutationImpl(size_t limit, Permutation & res, Compare compare,
Sort full_sort, PartialSort partial_sort) const;
void getPermutationImpl(size_t limit, Permutation & res, Compare compare, Sort full_sort, PartialSort partial_sort) const;
template <typename Compare, typename Equals, typename Sort, typename PartialSort>
void updatePermutationImpl(size_t limit, Permutation & res, EqualRanges & equal_ranges, Compare compare, Equals equals,
Sort full_sort, PartialSort partial_sort) const;
void updatePermutationImpl(
size_t limit,
Permutation & res,
EqualRanges & equal_ranges,
Compare compare,
Equals equals,
Sort full_sort,
PartialSort partial_sort) const;
};
using ColumnPtr = IColumn::Ptr;
@ -627,7 +633,7 @@ struct IsMutableColumns;
template <typename Arg, typename ... Args>
struct IsMutableColumns<Arg, Args ...>
{
static const bool value = std::is_assignable<MutableColumnPtr &&, Arg>::value && IsMutableColumns<Args ...>::value;
static const bool value = std::is_assignable_v<MutableColumnPtr &&, Arg> && IsMutableColumns<Args ...>::value;
};
template <>
@ -667,4 +673,47 @@ bool isColumnNullable(const IColumn & column);
/// True if column's is ColumnNullable or ColumnLowCardinality with nullable nested column.
bool isColumnNullableOrLowCardinalityNullable(const IColumn & column);
/// Implement methods to devirtualize some calls of IColumn in final descendents.
/// `typename Parent` is needed because some columns don't inherit IColumn directly.
/// See ColumnFixedSizeHelper for example.
template <typename Derived, typename Parent = IColumn>
class IColumnHelper : public Parent
{
/// Devirtualize insertFrom.
MutableColumns scatter(IColumn::ColumnIndex num_columns, const IColumn::Selector & selector) const override;
/// Devirtualize insertFrom and insertRangeFrom.
void gather(ColumnGathererStream & gatherer) override;
/// Devirtualize compareAt.
void compareColumn(
const IColumn & rhs_base,
size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes,
PaddedPODArray<Int8> & compare_results,
int direction,
int nan_direction_hint) const override;
/// Devirtualize compareAt.
bool hasEqualValues() const override;
/// Devirtualize isDefaultAt.
double getRatioOfDefaultRows(double sample_ratio) const override;
/// Devirtualize isDefaultAt.
UInt64 getNumberOfDefaultRows() const override;
/// Devirtualize isDefaultAt.
void getIndicesOfNonDefaultRows(IColumn::Offsets & indices, size_t from, size_t limit) const override;
/// Devirtualize byteSizeAt.
void collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const override;
/// Move common implementations into the same translation unit to ensure they are properly inlined.
void serializeValueIntoMemoryWithNull(size_t n, char * & memory, const UInt8 * is_null) const override;
StringRef serializeValueIntoArenaWithNull(size_t n, Arena & arena, char const *& begin, const UInt8 * is_null) const override;
void serializeValueIntoMemory(size_t n, char * & memory) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
};
}

View File

@ -35,7 +35,7 @@ bool IColumnDummy::isDefaultAt(size_t) const
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "isDefaultAt is not implemented for {}", getName());
}
StringRef IColumnDummy::serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin, const UInt8 *) const
StringRef IColumnDummy::serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin) const
{
/// Has to put one useless byte into Arena, because serialization into zero number of bytes is ambiguous.
char * res = arena.allocContinue(1, begin);

View File

@ -11,7 +11,7 @@ class Arena;
/** Base class for columns-constants that contain a value that is not in the `Field`.
* Not a full-fledged column and is used in a special way.
*/
class IColumnDummy : public IColumn
class IColumnDummy : public IColumnHelper<IColumnDummy>
{
public:
IColumnDummy() : s(0) {}
@ -49,8 +49,10 @@ public:
++s;
}
StringRef serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin, const UInt8 *) const override;
StringRef serializeValueIntoArena(size_t /*n*/, Arena & arena, char const *& begin) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t /*n*/, SipHash & /*hash*/) const override

View File

@ -2,7 +2,7 @@
/**
* This file implements template methods of IColumn that depend on other types
* we don't want to include.
* Currently, this is only the scatterImpl method that depends on PODArray
* Currently, getPermutationImpl and updatePermutationImpl depend on PODArray
* implementation.
*/
@ -15,11 +15,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int LOGICAL_ERROR;
}
struct DefaultSort
{
@ -39,186 +34,6 @@ struct DefaultPartialSort
}
};
template <typename Derived>
std::vector<IColumn::MutablePtr> IColumn::scatterImpl(ColumnIndex num_columns,
const Selector & selector) const
{
size_t num_rows = size();
if (num_rows != selector.size())
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of selector: {} doesn't match size of column: {}",
selector.size(), num_rows);
std::vector<MutablePtr> columns(num_columns);
for (auto & column : columns)
column = cloneEmpty();
{
size_t reserve_size = static_cast<size_t>(num_rows * 1.1 / num_columns); /// 1.1 is just a guess. Better to use n-sigma rule.
if (reserve_size > 1)
for (auto & column : columns)
column->reserve(reserve_size);
}
for (size_t i = 0; i < num_rows; ++i)
static_cast<Derived &>(*columns[selector[i]]).insertFrom(*this, i);
return columns;
}
template <typename Derived, bool reversed, bool use_indexes>
void IColumn::compareImpl(const Derived & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes [[maybe_unused]],
PaddedPODArray<Int8> & compare_results,
int nan_direction_hint) const
{
size_t num_rows = size();
size_t num_indexes = num_rows;
UInt64 * indexes [[maybe_unused]];
UInt64 * next_index [[maybe_unused]];
if constexpr (use_indexes)
{
num_indexes = row_indexes->size();
indexes = row_indexes->data();
next_index = indexes;
}
compare_results.resize(num_rows);
if (compare_results.empty())
compare_results.resize(num_rows);
else if (compare_results.size() != num_rows)
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of compare_results: {} doesn't match rows_num: {}",
compare_results.size(), num_rows);
for (size_t i = 0; i < num_indexes; ++i)
{
UInt64 row = i;
if constexpr (use_indexes)
row = indexes[i];
int res = static_cast<const Derived *>(this)->compareAt(row, rhs_row_num, rhs, nan_direction_hint);
assert(res == 1 || res == -1 || res == 0);
compare_results[row] = static_cast<Int8>(res);
if constexpr (reversed)
compare_results[row] = -compare_results[row];
if constexpr (use_indexes)
{
if (compare_results[row] == 0)
{
*next_index = row;
++next_index;
}
}
}
if constexpr (use_indexes)
{
size_t equal_row_indexes_size = next_index - row_indexes->data();
row_indexes->resize(equal_row_indexes_size);
}
}
template <typename Derived>
void IColumn::doCompareColumn(const Derived & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes,
PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const
{
if (direction < 0)
{
if (row_indexes)
compareImpl<Derived, true, true>(rhs, rhs_row_num, row_indexes, compare_results, nan_direction_hint);
else
compareImpl<Derived, true, false>(rhs, rhs_row_num, row_indexes, compare_results, nan_direction_hint);
}
else
{
if (row_indexes)
compareImpl<Derived, false, true>(rhs, rhs_row_num, row_indexes, compare_results, nan_direction_hint);
else
compareImpl<Derived, false, false>(rhs, rhs_row_num, row_indexes, compare_results, nan_direction_hint);
}
}
template <typename Derived>
bool IColumn::hasEqualValuesImpl() const
{
size_t num_rows = size();
for (size_t i = 1; i < num_rows; ++i)
{
if (compareAt(i, 0, static_cast<const Derived &>(*this), false) != 0)
return false;
}
return true;
}
template <typename Derived>
double IColumn::getRatioOfDefaultRowsImpl(double sample_ratio) const
{
if (sample_ratio <= 0.0 || sample_ratio > 1.0)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Value of 'sample_ratio' must be in interval (0.0; 1.0], but got: {}", sample_ratio);
static constexpr auto max_number_of_rows_for_full_search = 1000;
size_t num_rows = size();
size_t num_sampled_rows = std::min(static_cast<size_t>(num_rows * sample_ratio), num_rows);
size_t num_checked_rows = 0;
size_t res = 0;
if (num_sampled_rows == num_rows || num_rows <= max_number_of_rows_for_full_search)
{
for (size_t i = 0; i < num_rows; ++i)
res += static_cast<const Derived &>(*this).isDefaultAt(i);
num_checked_rows = num_rows;
}
else if (num_sampled_rows != 0)
{
for (size_t i = 0; i < num_rows; ++i)
{
if (num_checked_rows * num_rows <= i * num_sampled_rows)
{
res += static_cast<const Derived &>(*this).isDefaultAt(i);
++num_checked_rows;
}
}
}
if (num_checked_rows == 0)
return 0.0;
return static_cast<double>(res) / num_checked_rows;
}
template <typename Derived>
UInt64 IColumn::getNumberOfDefaultRowsImpl() const
{
UInt64 res = 0;
size_t num_rows = size();
for (size_t i = 0; i < num_rows; ++i)
res += static_cast<const Derived &>(*this).isDefaultAt(i);
return res;
}
template <typename Derived>
void IColumn::getIndicesOfNonDefaultRowsImpl(Offsets & indices, size_t from, size_t limit) const
{
size_t to = limit && from + limit < size() ? from + limit : size();
indices.reserve_exact(indices.size() + to - from);
for (size_t i = from; i < to; ++i)
{
if (!static_cast<const Derived &>(*this).isDefaultAt(i))
indices.push_back(i);
}
}
template <typename ComparatorBase, IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability>
struct ComparatorHelperImpl : public ComparatorBase
{

View File

@ -117,7 +117,7 @@ void column_unique_unique_deserialize_from_arena_impl(ColumnType & column, const
const char * pos = nullptr;
for (size_t i = 0; i < num_values; ++i)
{
auto ref = column_unique_pattern->serializeValueIntoArena(idx->getUInt(i), arena, pos, nullptr);
auto ref = column_unique_pattern->serializeValueIntoArena(idx->getUInt(i), arena, pos);
const char * new_pos;
column_unique->uniqueDeserializeAndInsertFromArena(ref.data, new_pos);
ASSERT_EQ(new_pos - ref.data, ref.size) << "Deserialized data has different sizes at position " << i;
@ -140,8 +140,8 @@ void column_unique_unique_deserialize_from_arena_impl(ColumnType & column, const
const char * pos_lc = nullptr;
for (size_t i = 0; i < num_values; ++i)
{
auto ref_string = column.serializeValueIntoArena(i, arena_string, pos_string, nullptr);
auto ref_lc = column_unique->serializeValueIntoArena(idx->getUInt(i), arena_lc, pos_lc, nullptr);
auto ref_string = column.serializeValueIntoArena(i, arena_string, pos_string);
auto ref_lc = column_unique->serializeValueIntoArena(idx->getUInt(i), arena_lc, pos_lc);
ASSERT_EQ(ref_string, ref_lc) << "Serialized data is different from pattern at position " << i;
}
}

View File

@ -685,25 +685,75 @@ struct HashMethodKeysFixed
* That is, for example, for strings, it contains first the serialized length of the string, and then the bytes.
* Therefore, when aggregating by several strings, there is no ambiguity.
*/
template <typename Value, typename Mapped>
template <typename Value, typename Mapped, bool nullable, bool prealloc>
struct HashMethodSerialized
: public columns_hashing_impl::HashMethodBase<HashMethodSerialized<Value, Mapped>, Value, Mapped, false>
: public columns_hashing_impl::HashMethodBase<HashMethodSerialized<Value, Mapped, nullable, prealloc>, Value, Mapped, false>
{
using Self = HashMethodSerialized<Value, Mapped>;
using Self = HashMethodSerialized<Value, Mapped, nullable, prealloc>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
static constexpr bool has_cheap_key_calculation = false;
ColumnRawPtrs key_columns;
size_t keys_size;
std::vector<const UInt8 *> null_maps;
PaddedPODArray<UInt64> row_sizes;
HashMethodSerialized(const ColumnRawPtrs & key_columns_, const Sizes & /*key_sizes*/, const HashMethodContextPtr &)
: key_columns(key_columns_), keys_size(key_columns_.size()) {}
: key_columns(key_columns_), keys_size(key_columns_.size())
{
if constexpr (nullable)
{
null_maps.resize(keys_size);
for (size_t i = 0; i < keys_size; ++i)
{
if (const auto * nullable_column = dynamic_cast<const ColumnNullable *>(key_columns[i]))
{
null_maps[i] = nullable_column->getNullMapData().data();
key_columns[i] = nullable_column->getNestedColumnPtr().get();
}
}
}
if constexpr (prealloc)
{
null_maps.resize(keys_size);
for (size_t i = 0; i < keys_size; ++i)
key_columns[i]->collectSerializedValueSizes(row_sizes, null_maps[i]);
}
}
friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
ALWAYS_INLINE SerializedKeyHolder getKeyHolder(size_t row, Arena & pool) const
{
if constexpr (prealloc)
{
const char * begin = nullptr;
char * memory = pool.allocContinue(row_sizes[row], begin);
StringRef key(memory, row_sizes[row]);
for (size_t j = 0; j < keys_size; ++j)
{
if constexpr (nullable)
key_columns[j]->serializeValueIntoMemoryWithNull(row, memory, null_maps[j]);
else
key_columns[j]->serializeValueIntoMemory(row, memory);
}
return SerializedKeyHolder{key, pool};
}
else if constexpr (nullable)
{
const char * begin = nullptr;
size_t sum_size = 0;
for (size_t j = 0; j < keys_size; ++j)
sum_size += key_columns[j]->serializeValueIntoArenaWithNull(row, pool, begin, null_maps[j]).size;
return SerializedKeyHolder{{begin, sum_size}, pool};
}
return SerializedKeyHolder{
serializeKeysToPoolContiguous(row, keys_size, key_columns, pool),
pool};

View File

@ -48,7 +48,7 @@ public:
SerializationPtr serialization = type->getDefaultSerialization();
auto col_res = ColumnArray::create(ColumnString::create());
ColumnString & col_res_strings = typeid_cast<ColumnString &>(col_res->getData());
ColumnVectorHelper::Offsets & col_res_offsets = typeid_cast<ColumnArray::Offsets &>(col_res->getOffsets());
ColumnFixedSizeHelper::Offsets & col_res_offsets = typeid_cast<ColumnArray::Offsets &>(col_res->getOffsets());
serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
col_res_strings.insert(substream_path.toString());

View File

@ -89,7 +89,7 @@ void fillFixedBatch(size_t keys_size, const ColumnRawPtrs & key_columns, const S
/// Note: here we violate strict aliasing.
/// It should be ok as log as we do not reffer to any value from `out` before filling.
const char * source = static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin<sizeof(T)>();
const char * source = static_cast<const ColumnFixedSizeHelper *>(column)->getRawDataBegin<sizeof(T)>();
size_t offset_to = offset;
if constexpr (std::endian::native == std::endian::big)
offset_to = sizeof(Key) - sizeof(T) - offset;
@ -151,33 +151,33 @@ static inline T ALWAYS_INLINE packFixed(
{
case 1:
{
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin<1>() + index, 1);
memcpy(bytes + offset, static_cast<const ColumnFixedSizeHelper *>(column)->getRawDataBegin<1>() + index, 1);
offset += 1;
}
break;
case 2:
if constexpr (sizeof(T) >= 2) /// To avoid warning about memcpy exceeding object size.
{
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin<2>() + index * 2, 2);
memcpy(bytes + offset, static_cast<const ColumnFixedSizeHelper *>(column)->getRawDataBegin<2>() + index * 2, 2);
offset += 2;
}
break;
case 4:
if constexpr (sizeof(T) >= 4)
{
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin<4>() + index * 4, 4);
memcpy(bytes + offset, static_cast<const ColumnFixedSizeHelper *>(column)->getRawDataBegin<4>() + index * 4, 4);
offset += 4;
}
break;
case 8:
if constexpr (sizeof(T) >= 8)
{
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin<8>() + index * 8, 8);
memcpy(bytes + offset, static_cast<const ColumnFixedSizeHelper *>(column)->getRawDataBegin<8>() + index * 8, 8);
offset += 8;
}
break;
default:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(column)->getRawDataBegin<1>() + index * key_sizes[j], key_sizes[j]);
memcpy(bytes + offset, static_cast<const ColumnFixedSizeHelper *>(column)->getRawDataBegin<1>() + index * key_sizes[j], key_sizes[j]);
offset += key_sizes[j];
}
}
@ -227,23 +227,23 @@ static inline T ALWAYS_INLINE packFixed(
switch (key_sizes[j])
{
case 1:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin<1>() + i, 1);
memcpy(bytes + offset, static_cast<const ColumnFixedSizeHelper *>(key_columns[j])->getRawDataBegin<1>() + i, 1);
offset += 1;
break;
case 2:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin<2>() + i * 2, 2);
memcpy(bytes + offset, static_cast<const ColumnFixedSizeHelper *>(key_columns[j])->getRawDataBegin<2>() + i * 2, 2);
offset += 2;
break;
case 4:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin<4>() + i * 4, 4);
memcpy(bytes + offset, static_cast<const ColumnFixedSizeHelper *>(key_columns[j])->getRawDataBegin<4>() + i * 4, 4);
offset += 4;
break;
case 8:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin<8>() + i * 8, 8);
memcpy(bytes + offset, static_cast<const ColumnFixedSizeHelper *>(key_columns[j])->getRawDataBegin<8>() + i * 8, 8);
offset += 8;
break;
default:
memcpy(bytes + offset, static_cast<const ColumnVectorHelper *>(key_columns[j])->getRawDataBegin<1>() + i * key_sizes[j], key_sizes[j]);
memcpy(bytes + offset, static_cast<const ColumnFixedSizeHelper *>(key_columns[j])->getRawDataBegin<1>() + i * key_sizes[j], key_sizes[j]);
offset += key_sizes[j];
}
}

View File

@ -774,6 +774,17 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
}
}
bool all_keys_are_numbers_or_strings = true;
for (size_t j = 0; j < params.keys_size; ++j)
{
if (!types_removed_nullable[j]->isValueRepresentedByNumber() && !isString(types_removed_nullable[j])
&& !isFixedString(types_removed_nullable[j]))
{
all_keys_are_numbers_or_strings = false;
break;
}
}
if (has_nullable_key)
{
/// Optimization for one key
@ -832,8 +843,11 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
return AggregatedDataVariants::Type::low_cardinality_key_fixed_string;
}
if (params.keys_size > 1 && all_keys_are_numbers_or_strings)
return AggregatedDataVariants::Type::nullable_prealloc_serialized;
/// Fallback case.
return AggregatedDataVariants::Type::serialized;
return AggregatedDataVariants::Type::nullable_serialized;
}
/// No key has been found to be nullable.
@ -915,6 +929,9 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
return AggregatedDataVariants::Type::key_string;
}
if (params.keys_size > 1 && all_keys_are_numbers_or_strings)
return AggregatedDataVariants::Type::prealloc_serialized;
return AggregatedDataVariants::Type::serialized;
}
@ -3308,12 +3325,15 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
auto merge_method = method_chosen;
#define APPLY_FOR_VARIANTS_THAT_MAY_USE_BETTER_HASH_FUNCTION(M) \
M(key64) \
M(key_string) \
M(key_fixed_string) \
M(keys128) \
M(keys256) \
M(serialized) \
M(key64) \
M(key_string) \
M(key_fixed_string) \
M(keys128) \
M(keys256) \
M(serialized) \
M(nullable_serialized) \
M(prealloc_serialized) \
M(nullable_prealloc_serialized) \
#define M(NAME) \
if (merge_method == AggregatedDataVariants::Type::NAME) \

View File

@ -227,17 +227,17 @@ struct AggregationMethodOneNumber
// Insert the key from the hash table into columns.
static void insertKeyIntoColumns(const Key & key, std::vector<IColumn *> & key_columns, const Sizes & /*key_sizes*/)
{
ColumnVectorHelper * column;
ColumnFixedSizeHelper * column;
if constexpr (nullable)
{
ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*key_columns[0]);
ColumnUInt8 * null_map = assert_cast<ColumnUInt8 *>(&nullable_col.getNullMapColumn());
null_map->insertDefault();
column = static_cast<ColumnVectorHelper *>(&nullable_col.getNestedColumn());
column = static_cast<ColumnFixedSizeHelper *>(&nullable_col.getNestedColumn());
}
else
{
column = static_cast<ColumnVectorHelper *>(key_columns[0]);
column = static_cast<ColumnFixedSizeHelper *>(key_columns[0]);
}
static_assert(sizeof(FieldType) <= sizeof(Key));
const auto * key_holder = reinterpret_cast<const char *>(&key);
@ -561,7 +561,7 @@ struct AggregationMethodKeysFixed
* That is, for example, for strings, it contains first the serialized length of the string, and then the bytes.
* Therefore, when aggregating by several strings, there is no ambiguity.
*/
template <typename TData>
template <typename TData, bool nullable = false, bool prealloc = false>
struct AggregationMethodSerialized
{
using Data = TData;
@ -580,7 +580,7 @@ struct AggregationMethodSerialized
}
template <bool use_cache>
using StateImpl = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped>;
using StateImpl = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped, nullable, prealloc>;
using State = StateImpl<true>;
using StateNoCache = StateImpl<false>;
@ -598,6 +598,14 @@ struct AggregationMethodSerialized
}
};
template <typename TData>
using AggregationMethodNullableSerialized = AggregationMethodSerialized<TData, true>;
template <typename TData>
using AggregationMethodPreallocSerialized = AggregationMethodSerialized<TData, false, true>;
template <typename TData>
using AggregationMethodNullablePreallocSerialized = AggregationMethodSerialized<TData, true, true>;
class Aggregator;
@ -655,7 +663,10 @@ struct AggregatedDataVariants : private boost::noncopyable
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt64Key>> keys64;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128>> keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256>> keys256;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKey>> serialized;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKey>> serialized;
std::unique_ptr<AggregationMethodNullableSerialized<AggregatedDataWithStringKey>> nullable_serialized;
std::unique_ptr<AggregationMethodPreallocSerialized<AggregatedDataWithStringKey>> prealloc_serialized;
std::unique_ptr<AggregationMethodNullablePreallocSerialized<AggregatedDataWithStringKey>> nullable_prealloc_serialized;
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>> key32_two_level;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>> key64_two_level;
@ -665,14 +676,20 @@ struct AggregatedDataVariants : private boost::noncopyable
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyTwoLevel>> keys64_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel>> keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel>> keys256_two_level;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel>> serialized_two_level;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel>> serialized_two_level;
std::unique_ptr<AggregationMethodNullableSerialized<AggregatedDataWithStringKeyTwoLevel>> nullable_serialized_two_level;
std::unique_ptr<AggregationMethodPreallocSerialized<AggregatedDataWithStringKeyTwoLevel>> prealloc_serialized_two_level;
std::unique_ptr<AggregationMethodNullablePreallocSerialized<AggregatedDataWithStringKeyTwoLevel>> nullable_prealloc_serialized_two_level;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyHash64>> key64_hash64;
std::unique_ptr<AggregationMethodString<AggregatedDataWithStringKeyHash64>> key_string_hash64;
std::unique_ptr<AggregationMethodFixedString<AggregatedDataWithStringKeyHash64>> key_fixed_string_hash64;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128Hash64>> keys128_hash64;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256Hash64>> keys256_hash64;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyHash64>> serialized_hash64;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyHash64>> serialized_hash64;
std::unique_ptr<AggregationMethodNullableSerialized<AggregatedDataWithStringKeyHash64>> nullable_serialized_hash64;
std::unique_ptr<AggregationMethodPreallocSerialized<AggregatedDataWithStringKeyHash64>> prealloc_serialized_hash64;
std::unique_ptr<AggregationMethodNullablePreallocSerialized<AggregatedDataWithStringKeyHash64>> nullable_prealloc_serialized_hash64;
/// Support for nullable keys.
std::unique_ptr<AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false, true>> nullable_key8;
@ -723,7 +740,10 @@ struct AggregatedDataVariants : private boost::noncopyable
M(keys64, false) \
M(keys128, false) \
M(keys256, false) \
M(serialized, false) \
M(serialized, false) \
M(nullable_serialized, false) \
M(prealloc_serialized, false) \
M(nullable_prealloc_serialized, false) \
M(key32_two_level, true) \
M(key64_two_level, true) \
M(key_string_two_level, true) \
@ -732,13 +752,19 @@ struct AggregatedDataVariants : private boost::noncopyable
M(keys64_two_level, true) \
M(keys128_two_level, true) \
M(keys256_two_level, true) \
M(serialized_two_level, true) \
M(serialized_two_level, true) \
M(nullable_serialized_two_level, true) \
M(prealloc_serialized_two_level, true) \
M(nullable_prealloc_serialized_two_level, true) \
M(key64_hash64, false) \
M(key_string_hash64, false) \
M(key_fixed_string_hash64, false) \
M(keys128_hash64, false) \
M(keys256_hash64, false) \
M(serialized_hash64, false) \
M(serialized_hash64, false) \
M(nullable_serialized_hash64, false) \
M(prealloc_serialized_hash64, false) \
M(nullable_prealloc_serialized_hash64, false) \
M(nullable_key8, false) \
M(nullable_key16, false) \
M(nullable_key32, false) \
@ -863,6 +889,9 @@ struct AggregatedDataVariants : private boost::noncopyable
M(keys128) \
M(keys256) \
M(serialized) \
M(nullable_serialized) \
M(prealloc_serialized) \
M(nullable_prealloc_serialized) \
M(nullable_key32) \
M(nullable_key64) \
M(nullable_key_string) \
@ -889,6 +918,9 @@ struct AggregatedDataVariants : private boost::noncopyable
M(keys128_hash64) \
M(keys256_hash64) \
M(serialized_hash64) \
M(nullable_serialized_hash64) \
M(prealloc_serialized_hash64) \
M(nullable_prealloc_serialized_hash64) \
M(low_cardinality_key8) \
M(low_cardinality_key16) \
@ -925,6 +957,9 @@ struct AggregatedDataVariants : private boost::noncopyable
M(keys128_two_level) \
M(keys256_two_level) \
M(serialized_two_level) \
M(nullable_serialized_two_level) \
M(prealloc_serialized_two_level) \
M(nullable_prealloc_serialized_two_level) \
M(nullable_key32_two_level) \
M(nullable_key64_two_level) \
M(nullable_key_string_two_level) \

View File

@ -1,8 +1,7 @@
<test>
<settings>
<max_insert_threads>8</max_insert_threads>
<allow_experimental_projection_optimization>0</allow_experimental_projection_optimization>
<max_threads>4</max_threads>
<max_threads>1</max_threads>
</settings>
<create_query>
@ -29,4 +28,4 @@
<query>select toDecimal64(key_int64_1, 3),toDecimal64(key_int64_2, 3),toDecimal64(key_int64_3, 3),toDecimal64(key_int64_4, 3),toDecimal64(key_int64_5, 3), min(m1) from t_nullable group by toDecimal64(key_int64_1, 3),toDecimal64(key_int64_2, 3),toDecimal64(key_int64_3, 3),toDecimal64(key_int64_4, 3),toDecimal64(key_int64_5, 3) limit 10</query>
<drop_query>drop table if exists t_nullable</drop_query>
</test>
</test>