From 577d5713001b4078037e3501c3eb920eb51432a8 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 12 Mar 2021 19:33:41 +0300 Subject: [PATCH] ColumnSparse: initial implementation --- src/AggregateFunctions/AggregateFunctionSum.h | 2 + src/AggregateFunctions/IAggregateFunction.h | 55 ++ src/Columns/ColumnArray.cpp | 18 +- src/Columns/ColumnArray.h | 4 +- src/Columns/ColumnSparse.cpp | 486 ++++++++++++++++++ src/Columns/ColumnSparse.h | 161 ++++++ src/Columns/ColumnString.cpp | 16 +- src/Columns/ColumnString.h | 4 +- src/Columns/ColumnVector.h | 25 +- src/Columns/IColumn.cpp | 11 + src/Columns/IColumn.h | 12 +- src/Common/SparseArray.h | 0 src/DataStreams/NativeBlockInputStream.cpp | 14 +- src/DataStreams/NativeBlockOutputStream.cpp | 16 +- src/DataStreams/NativeBlockOutputStream.h | 2 +- src/DataTypes/DataTypeTuple.cpp | 4 +- src/DataTypes/IDataType.cpp | 28 +- src/DataTypes/IDataType.h | 5 +- src/DataTypes/Serializations/ISerialization.h | 58 ++- .../Serializations/SerializationInfo.cpp | 36 +- .../Serializations/SerializationInfo.h | 12 +- .../SerializationLowCardinality.cpp | 42 +- .../Serializations/SerializationSparse.cpp | 256 ++++++--- .../Serializations/SerializationSparse.h | 5 + .../Serializations/SerializationTuple.cpp | 39 +- src/Interpreters/Aggregator.cpp | 22 +- src/Interpreters/Aggregator.h | 1 + .../Formats/Impl/PrettyBlockOutputFormat.cpp | 4 +- .../Impl/PrettySpaceBlockOutputFormat.cpp | 2 +- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 32 +- src/Storages/MergeTree/IMergeTreeDataPart.h | 11 +- .../MergeTree/MergeTreeDataPartWide.cpp | 2 +- .../MergeTreeDataPartWriterCompact.cpp | 12 +- .../MergeTreeDataPartWriterOnDisk.cpp | 2 +- .../MergeTree/MergeTreeDataPartWriterOnDisk.h | 3 + .../MergeTree/MergeTreeDataPartWriterWide.cpp | 22 +- .../MergeTree/MergeTreeDataPartWriterWide.h | 3 - .../MergeTree/MergeTreeReaderWide.cpp | 8 +- src/Storages/MergeTree/MergeTreeSettings.h | 1 + .../MergeTree/MergedBlockOutputStream.cpp | 22 +- 40 files changed, 
1206 insertions(+), 252 deletions(-) create mode 100644 src/Columns/ColumnSparse.cpp create mode 100644 src/Columns/ColumnSparse.h create mode 100644 src/Common/SparseArray.h diff --git a/src/AggregateFunctions/AggregateFunctionSum.h b/src/AggregateFunctions/AggregateFunctionSum.h index bd1f9fc302e..343686a6f6d 100644 --- a/src/AggregateFunctions/AggregateFunctionSum.h +++ b/src/AggregateFunctions/AggregateFunctionSum.h @@ -361,6 +361,8 @@ public: } } + void addManyDefaults(size_t /* length */) const override {} + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override { this->data(place).merge(this->data(rhs)); diff --git a/src/AggregateFunctions/IAggregateFunction.h b/src/AggregateFunctions/IAggregateFunction.h index d15ff4e8a78..42ffa0cd4eb 100644 --- a/src/AggregateFunctions/IAggregateFunction.h +++ b/src/AggregateFunctions/IAggregateFunction.h @@ -12,6 +12,7 @@ #include #include #include +#include namespace DB @@ -153,11 +154,20 @@ public: Arena * arena, ssize_t if_argument_pos = -1) const = 0; + virtual void addBatchSparse( + AggregateDataPtr * places, + size_t place_offset, + const IColumn ** columns, + Arena * arena) const = 0; + /** The same for single place. */ virtual void addBatchSinglePlace( size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const = 0; + virtual void addBatchSparseSinglePlace( + AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0; + /** The same for single place when need to aggregate only filtered data. 
*/ virtual void addBatchSinglePlaceNotNull( @@ -213,6 +223,13 @@ public: */ virtual AggregateFunctionPtr getNestedFunction() const { return {}; } + virtual bool supportsSparseArguments() const { return false; } + + virtual void addManyDefaults(size_t /* length */) const + { + throw Exception("Method addManyDefaults is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); + } + const DataTypes & getArgumentTypes() const { return argument_types; } const Array & getParameters() const { return parameters; } @@ -278,6 +295,32 @@ public: } } + void addBatchSparse( + AggregateDataPtr * places, + size_t place_offset, + const IColumn ** columns, + Arena * arena) const override + { + const auto & column_sparse = assert_cast(*columns[0]); + const auto * values = &column_sparse.getValuesColumn(); + const auto & offsets_data = column_sparse.getOffsetsData(); + + size_t offset_pos = 0; + size_t offsets_size = offsets_data.size(); + for (size_t i = 0; i < column_sparse.size(); ++i) + { + if (offset_pos < offsets_size && i == offsets_data[offset_pos]) + { + static_cast(this)->add(places[i] + place_offset, &values, offset_pos + 1, arena); + ++offset_pos; + } + else + { + static_cast(this)->add(places[i] + place_offset, &values, 0, arena); + } + } + } + void addBatchSinglePlace( size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const override { @@ -297,6 +340,18 @@ public: } } + void addBatchSparseSinglePlace( + AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override + { + const auto & column_sparse = assert_cast(*columns[0]); + const auto * values = &column_sparse.getValuesColumn(); + + for (size_t i = 1; i < values->size(); ++i) + static_cast(this)->add(place, &values, i, arena); + + static_cast(this)->addManyDefaults(column_sparse.getNumberOfDefaults()); + } + void addBatchSinglePlaceNotNull( size_t batch_size, AggregateDataPtr place, diff --git a/src/Columns/ColumnArray.cpp 
b/src/Columns/ColumnArray.cpp index 9bea4c4bedc..e8e60cd941e 100644 --- a/src/Columns/ColumnArray.cpp +++ b/src/Columns/ColumnArray.cpp @@ -1206,7 +1206,7 @@ void ColumnArray::gather(ColumnGathererStream & gatherer) gatherer.gather(*this); } -void ColumnArray::getIndicesOfNonDefaultValues(IColumn::Offsets & indices) const +void ColumnArray::getIndicesOfNonDefaultValues(IColumn::Offsets & indices, size_t, size_t) const { const auto & offsets_data = getOffsets(); for (size_t i = 0; i < offsets_data.size(); ++i) @@ -1214,14 +1214,14 @@ void ColumnArray::getIndicesOfNonDefaultValues(IColumn::Offsets & indices) const indices.push_back(i); } -size_t ColumnArray::getNumberOfNonDefaultValues() const -{ - const auto & offsets_data = getOffsets(); - size_t res = 0; - for (size_t i = 0; i < offsets_data.size(); ++i) - res += (offsets_data[i] != offsets_data[i - 1]); +// size_t ColumnArray::getNumberOfDefaultRows() const +// { +// const auto & offsets_data = getOffsets(); +// size_t res = 0; +// for (size_t i = 0; i < offsets_data.size(); ++i) +// res += (offsets_data[i] != offsets_data[i - 1]); - return res; -} +// return res; +// } } diff --git a/src/Columns/ColumnArray.h b/src/Columns/ColumnArray.h index 19614365b2a..8e285eb3dff 100644 --- a/src/Columns/ColumnArray.h +++ b/src/Columns/ColumnArray.h @@ -139,9 +139,9 @@ public: return false; } - void getIndicesOfNonDefaultValues(IColumn::Offsets & indices) const override; + void getIndicesOfNonDefaultValues(IColumn::Offsets & indices, size_t offset, size_t limit) const override; - size_t getNumberOfNonDefaultValues() const override; + // size_t getNumberOfDefaultRows() const override; bool isCollationSupported() const override { return getData().isCollationSupported(); } diff --git a/src/Columns/ColumnSparse.cpp b/src/Columns/ColumnSparse.cpp new file mode 100644 index 00000000000..e67cb45847d --- /dev/null +++ b/src/Columns/ColumnSparse.cpp @@ -0,0 +1,486 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + 
extern const int LOGICAL_ERROR; +} + +ColumnSparse::ColumnSparse(MutableColumnPtr && values_) + : values(std::move(values_)), _size(0) +{ + if (!values->empty()) + throw Exception("Not empty values passed to ColumnSparse, but no offsets passed", ErrorCodes::LOGICAL_ERROR); + + values->insertDefault(); + offsets = ColumnUInt64::create(); +} + +ColumnSparse::ColumnSparse(MutableColumnPtr && values_, MutableColumnPtr && offsets_, size_t size_) + : values(std::move(values_)), offsets(std::move(offsets_)), _size(size_) +{ + const ColumnUInt64 * offsets_concrete = typeid_cast(offsets.get()); + + if (!offsets_concrete) + throw Exception("offsets_column must be a ColumnUInt64", ErrorCodes::LOGICAL_ERROR); + + if (offsets->size() + 1 != values->size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Values size is inconsistent with offsets size. Expected: {}, got {}", offsets->size() + 1, values->size()); +} + +MutableColumnPtr ColumnSparse::cloneResized(size_t new_size) const +{ + if (new_size == 0) + return ColumnSparse::create(values->cloneEmpty()); + + if (new_size >= _size) + return ColumnSparse::create(IColumn::mutate(values), IColumn::mutate(offsets), new_size); + + auto res = ColumnSparse::create(values->cloneEmpty()); + res->insertRangeFrom(*this, 0, new_size); + return res; +} + +bool ColumnSparse::isNullAt(size_t n) const +{ + return values->isNullAt(getValueIndex(n)); +} + +Field ColumnSparse::operator[](size_t n) const +{ + return (*values)[getValueIndex(n)]; +} + +void ColumnSparse::get(size_t n, Field & res) const +{ + values->get(n, res); +} + +bool ColumnSparse::getBool(size_t n) const +{ + return values->getBool(getValueIndex(n)); +} + +UInt64 ColumnSparse::get64(size_t n) const +{ + return values->get64(getValueIndex(n)); +} + +StringRef ColumnSparse::getDataAt(size_t n) const +{ + return values->getDataAt(getValueIndex(n)); +} + +ColumnPtr ColumnSparse::convertToFullColumnIfSparse() const +{ + auto res = values->cloneEmpty(); + const auto & offsets_data 
= getOffsetsData(); + size_t current_offset = 0; + for (size_t i = 0; i < offsets_data.size(); ++i) + { + size_t offsets_diff = offsets_data[i] - current_offset; + current_offset = offsets_data[i]; + if (offsets_diff > 1) + res->insertManyDefaults(offsets_diff - 1); + res->insertFrom(*values, i + 1); + } + + res->insertManyDefaults(_size - current_offset); + return res; +} + + +void ColumnSparse::insertData(const char * pos, size_t length) +{ + _size += length; + return values->insertData(pos, length); +} + +StringRef ColumnSparse::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const +{ + return values->serializeValueIntoArena(getValueIndex(n), arena, begin); +} + +const char * ColumnSparse::deserializeAndInsertFromArena(const char * pos) +{ + UNUSED(pos); + throwMustBeDense(); +} + +void ColumnSparse::insertRangeFrom(const IColumn & src, size_t start, size_t length) +{ + size_t end = start + length; + auto & offsets_data = getOffsetsData(); + + if (const auto * src_sparse = typeid_cast(&src)) + { + const auto & src_offsets = src_sparse->getOffsetsData(); + const auto & src_values = src_sparse->getValuesColumn(); + + size_t offset_start = std::lower_bound(src_offsets.begin(), src_offsets.end(), start) - src_offsets.begin(); + size_t offset_end = std::lower_bound(src_offsets.begin(), src_offsets.end(), end) - src_offsets.begin(); + + insertManyDefaults(offset_start - start); + offsets_data.push_back(_size); + + for (size_t i = offset_start + 1; i < offset_end; ++i) + { + size_t current_diff = src_offsets[i] - src_offsets[i - 1]; + insertManyDefaults(current_diff - 1); + offsets_data.push_back(_size); + ++_size; + } + + insertManyDefaults(end - offset_end); + values->insertRangeFrom(src_values, offset_start + 1, offset_end - offset_start); + } + else + { + for (size_t i = start; i < end; ++i) + { + offsets_data.push_back(_size); + ++_size; + } + + values->insertRangeFrom(src, start, length); + } +} + +void ColumnSparse::insert(const Field & x) 
+{ + getOffsetsData().push_back(_size); + values->insert(x); + ++_size; +} + +void ColumnSparse::insertFrom(const IColumn & src, size_t n) +{ + + if (const auto * src_sparse = typeid_cast(&src)) + { + if (size_t value_index = src_sparse->getValueIndex(n)) + { + getOffsetsData().push_back(_size); + values->insertFrom(src_sparse->getValuesColumn(), value_index); + } + } + else + { + getOffsetsData().push_back(_size); + values->insertFrom(src, n); + } + + ++_size; +} + +void ColumnSparse::insertDefault() +{ + ++_size; +} + +void ColumnSparse::insertManyDefaults(size_t length) +{ + _size += length; +} + +void ColumnSparse::popBack(size_t n) +{ + assert(n < _size); + + auto & offsets_data = getOffsetsData(); + size_t new_size = _size - n; + + size_t removed_values = 0; + while(!offsets_data.empty() && offsets_data.back() >= new_size) + { + offsets_data.pop_back(); + ++removed_values; + } + + if (removed_values) + values->popBack(removed_values); + + _size = new_size; +} + +ColumnPtr ColumnSparse::filter(const Filter & filt, ssize_t) const +{ + if (_size != filt.size()) + throw Exception("Size of filter doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); + + if (offsets->empty()) + { + auto res = cloneEmpty(); + res->insertManyDefaults(countBytesInFilter(filt)); + return res; + } + + const auto & offsets_data = getOffsetsData(); + + auto res_offsets = offsets->cloneEmpty(); + auto & res_offsets_data = assert_cast(*res_offsets).getData(); + + Filter values_filter; + values_filter.reserve(values->size()); + values_filter.push_back(1); + size_t values_result_size_hint = 1; + + size_t offset_pos = 0; + size_t res_offset = 0; + + for (size_t i = 0; i < _size; ++i) + { + if (offset_pos < offsets_data.size() && i == offsets_data[offset_pos]) + { + if (filt[i]) + { + res_offsets_data.push_back(res_offset); + values_filter.push_back(1); + ++res_offset; + ++values_result_size_hint; + } + else + { + values_filter.push_back(0); + } + + ++offset_pos; + } + 
else + { + res_offset += filt[i] != 0; + } + } + + auto res_values = values->filter(values_filter, values_result_size_hint); + return this->create(std::move(res_values), std::move(res_offsets), res_offset); +} + +ColumnPtr ColumnSparse::permute(const Permutation & perm, size_t limit) const +{ + UNUSED(perm); + UNUSED(limit); + + throwMustBeDense(); +} + +ColumnPtr ColumnSparse::index(const IColumn & indexes, size_t limit) const +{ + UNUSED(indexes); + UNUSED(limit); + + throwMustBeDense(); +} + +int ColumnSparse::compareAt(size_t n, size_t m, const IColumn & rhs_, int null_direction_hint) const +{ + UNUSED(n); + UNUSED(m); + UNUSED(rhs_); + UNUSED(null_direction_hint); + + std::cerr << "rhs: " << rhs_.dumpStructure() << "\n"; + + throwMustBeDense(); +} + +void ColumnSparse::compareColumn(const IColumn & rhs, size_t rhs_row_num, + PaddedPODArray * row_indexes, PaddedPODArray & compare_results, + int direction, int nan_direction_hint) const +{ + UNUSED(rhs); + UNUSED(rhs_row_num); + UNUSED(row_indexes); + UNUSED(compare_results); + UNUSED(direction); + UNUSED(nan_direction_hint); + + throwMustBeDense(); +} + +int ColumnSparse::compareAtWithCollation(size_t n, size_t m, const IColumn & rhs, int null_direction_hint, const Collator &) const +{ + UNUSED(n); + UNUSED(m); + UNUSED(rhs); + UNUSED(null_direction_hint); + + throwMustBeDense(); +} + +bool ColumnSparse::hasEqualValues() const +{ + return offsets->size() == 0; +} + +void ColumnSparse::getPermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const +{ + UNUSED(reverse); + UNUSED(limit); + UNUSED(null_direction_hint); + UNUSED(res); + + throwMustBeDense(); +} + +void ColumnSparse::updatePermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res, EqualRanges & equal_range) const +{ + UNUSED(reverse); + UNUSED(null_direction_hint); + UNUSED(limit); + UNUSED(res); + UNUSED(equal_range); + + throwMustBeDense(); +} + +void 
ColumnSparse::getPermutationWithCollation(const Collator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res) const +{ + UNUSED(collator); + UNUSED(reverse); + UNUSED(limit); + UNUSED(null_direction_hint); + UNUSED(res); + + throwMustBeDense(); +} + +void ColumnSparse::updatePermutationWithCollation( + const Collator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res, EqualRanges& equal_range) const +{ + UNUSED(collator); + UNUSED(reverse); + UNUSED(limit); + UNUSED(null_direction_hint); + UNUSED(res); + UNUSED(equal_range); + + throwMustBeDense(); +} + +void ColumnSparse::reserve(size_t) +{ +} + +size_t ColumnSparse::byteSize() const +{ + return values->byteSize() + offsets->byteSize(); +} + +size_t ColumnSparse::byteSizeAt(size_t n) const +{ + size_t index = getValueIndex(n); + size_t res = values->byteSizeAt(index); + if (index) + res += sizeof(UInt64); + + return res; +} + +size_t ColumnSparse::allocatedBytes() const +{ + return values->allocatedBytes() + offsets->allocatedBytes(); +} + +void ColumnSparse::protect() +{ + throwMustBeDense(); +} + +ColumnPtr ColumnSparse::replicate(const Offsets & replicate_offsets) const +{ + UNUSED(replicate_offsets); + throwMustBeDense(); +} + +void ColumnSparse::updateHashWithValue(size_t n, SipHash & hash) const +{ + UNUSED(n); + UNUSED(hash); + throwMustBeDense(); +} + +void ColumnSparse::updateWeakHash32(WeakHash32 & hash) const +{ + UNUSED(hash); + throwMustBeDense(); +} + +void ColumnSparse::updateHashFast(SipHash & hash) const +{ + UNUSED(hash); + throwMustBeDense(); +} + +void ColumnSparse::getExtremes(Field & min, Field & max) const +{ + UNUSED(min); + UNUSED(max); + throwMustBeDense(); +} + +void ColumnSparse::getIndicesOfNonDefaultValues(IColumn::Offsets & indices, size_t from, size_t limit) const +{ + const auto & offsets_data = getOffsetsData(); + auto start = from ? 
std::lower_bound(offsets_data.begin(), offsets_data.end(), from) : offsets_data.begin(); + auto end = limit ? std::lower_bound(offsets_data.begin(), offsets_data.end(), from + limit) : offsets_data.end(); + + indices.assign(start, end); +} + +size_t ColumnSparse::getNumberOfDefaultRows(size_t step) const +{ + return (_size - offsets->size()) / step; +} + +MutableColumns ColumnSparse::scatter(ColumnIndex num_columns, const Selector & selector) const +{ + UNUSED(num_columns); + UNUSED(selector); + throwMustBeDense(); +} + +void ColumnSparse::gather(ColumnGathererStream & gatherer_stream) +{ + UNUSED(gatherer_stream); + throwMustBeDense(); +} + +ColumnPtr ColumnSparse::compress() const +{ + throwMustBeDense(); +} + +bool ColumnSparse::structureEquals(const IColumn & rhs) const +{ + UNUSED(rhs); + throwMustBeDense(); +} + +const IColumn::Offsets & ColumnSparse::getOffsetsData() const +{ + return assert_cast(*offsets).getData(); +} + +IColumn::Offsets & ColumnSparse::getOffsetsData() +{ + return assert_cast(*offsets).getData(); +} + +size_t ColumnSparse::getValueIndex(size_t n) const +{ + assert(n < _size); + + const auto & offsets_data = getOffsetsData(); + auto it = std::lower_bound(offsets_data.begin(), offsets_data.end(), n); + if (it == offsets_data.end() || *it != n) + return 0; + + return it - offsets_data.begin() + 1; +} + +} diff --git a/src/Columns/ColumnSparse.h b/src/Columns/ColumnSparse.h new file mode 100644 index 00000000000..c14c64c010e --- /dev/null +++ b/src/Columns/ColumnSparse.h @@ -0,0 +1,161 @@ +#pragma once + +#include +#include +#include +#include +#include + +class Collator; + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +class ColumnSparse final : public COWHelper +{ +private: + friend class COWHelper; + + explicit ColumnSparse(MutableColumnPtr && values_); + ColumnSparse(MutableColumnPtr && values_, MutableColumnPtr && offsets_, size_t size_); + ColumnSparse(const ColumnSparse &) = default; + +public: + 
/** Create immutable column using immutable arguments. This arguments may be shared with other columns. + * Use IColumn::mutate in order to make mutable column and mutate shared nested columns. + */ + using Base = COWHelper; + static Ptr create(const ColumnPtr & values_, const ColumnPtr & offsets_, size_t size_) + { + return Base::create(values_->assumeMutable(), offsets_->assumeMutable(), size_); + } + + static MutablePtr create(MutableColumnPtr && values_, MutableColumnPtr && offsets_, size_t size_) + { + return Base::create(std::move(values_), std::move(offsets_), size_); + } + + static Ptr create(const ColumnPtr & values_) + { + return Base::create(values_->assumeMutable()); + } + + template >> + static MutablePtr create(Arg && arg) + { + return Base::create(std::forward(arg)); + } + + const char * getFamilyName() const override { return "Sparse"; } + std::string getName() const override { return "Sparse(" + values->getName() + ")"; } + TypeIndex getDataType() const override { return values->getDataType(); } + MutableColumnPtr cloneResized(size_t new_size) const override; + size_t size() const override { return _size; } + bool isNullAt(size_t n) const override; + Field operator[](size_t n) const override; + void get(size_t n, Field & res) const override; + bool getBool(size_t n) const override; + UInt64 get64(size_t n) const override; + StringRef getDataAt(size_t n) const override; + + ColumnPtr convertToFullColumnIfSparse() const override; + + /// Will insert null value if pos=nullptr + void insertData(const char * pos, size_t length) override; + StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; + const char * deserializeAndInsertFromArena(const char * pos) override; + void insertRangeFrom(const IColumn & src, size_t start, size_t length) override; + void insert(const Field & x) override; + void insertFrom(const IColumn & src, size_t n) override; + void insertDefault() override; + void insertManyDefaults(size_t 
length) override; + + void popBack(size_t n) override; + ColumnPtr filter(const Filter & filt, ssize_t) const override; + ColumnPtr permute(const Permutation & perm, size_t limit) const override; + ColumnPtr index(const IColumn & indexes, size_t limit) const override; + int compareAt(size_t n, size_t m, const IColumn & rhs_, int null_direction_hint) const override; + void compareColumn(const IColumn & rhs, size_t rhs_row_num, + PaddedPODArray * row_indexes, PaddedPODArray & compare_results, + int direction, int nan_direction_hint) const override; + int compareAtWithCollation(size_t n, size_t m, const IColumn & rhs, int null_direction_hint, const Collator &) const override; + bool hasEqualValues() const override; + void getPermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const override; + void updatePermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res, EqualRanges & equal_range) const override; + void getPermutationWithCollation(const Collator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res) const override; + void updatePermutationWithCollation( + const Collator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res, EqualRanges& equal_range) const override; + void reserve(size_t n) override; + size_t byteSize() const override; + size_t byteSizeAt(size_t n) const override; + size_t allocatedBytes() const override; + void protect() override; + ColumnPtr replicate(const Offsets & replicate_offsets) const override; + void updateHashWithValue(size_t n, SipHash & hash) const override; + void updateWeakHash32(WeakHash32 & hash) const override; + void updateHashFast(SipHash & hash) const override; + void getExtremes(Field & min, Field & max) const override; + + void getIndicesOfNonDefaultValues(IColumn::Offsets & indices, size_t from, size_t limit) const override; + size_t getNumberOfDefaultRows(size_t step) const override; + + MutableColumns 
scatter(ColumnIndex num_columns, const Selector & selector) const override; + + void gather(ColumnGathererStream & gatherer_stream) override; + + ColumnPtr compress() const override; + + void forEachSubcolumn(ColumnCallback callback) override + { + callback(values); + callback(offsets); + } + + bool structureEquals(const IColumn & rhs) const override; + + bool isNullable() const override { return values->isNullable(); } + bool isFixedAndContiguous() const override { return false; } + bool valuesHaveFixedSize() const override { return values->valuesHaveFixedSize(); } + size_t sizeOfValueIfFixed() const override { return values->sizeOfValueIfFixed() + values->sizeOfValueIfFixed(); } + bool isCollationSupported() const override { return values->isCollationSupported(); } + + size_t getNumberOfDefaults() const { return _size - offsets->size(); } + size_t getNumberOfTrailingDefaults() const + { + return offsets->empty() ? _size : _size - getOffsetsData().back() - 1; + } + + size_t getValueIndex(size_t n) const; + + const IColumn & getValuesColumn() const { return *values; } + IColumn & getValuesColumn() { return *values; } + + const ColumnPtr & getValuesPtr() const { return values; } + ColumnPtr & getValuesPtr() { return values; } + + const IColumn::Offsets & getOffsetsData() const; + IColumn::Offsets & getOffsetsData(); + + const ColumnPtr & getOffsetsPtr() const { return offsets; } + ColumnPtr & getOffsetsPtr() { return offsets; } + + const IColumn & getOffsetsColumn() const { return *offsets; } + IColumn & getOffsetsColumn() { return *offsets; } + +private: + [[noreturn]] void throwMustBeDense() const + { + throw Exception("Not implemented for ColumnSparse", ErrorCodes::LOGICAL_ERROR); + } + + WrappedPtr values; + WrappedPtr offsets; + size_t _size; +}; + +} diff --git a/src/Columns/ColumnString.cpp b/src/Columns/ColumnString.cpp index 61276909a55..cab848c8b07 100644 --- a/src/Columns/ColumnString.cpp +++ b/src/Columns/ColumnString.cpp @@ -530,21 +530,21 @@ void 
ColumnString::getExtremes(Field & min, Field & max) const get(max_idx, max); } -void ColumnString::getIndicesOfNonDefaultValues(Offsets & indices) const +void ColumnString::getIndicesOfNonDefaultValues(Offsets & indices, size_t, size_t) const { for (size_t i = 0; i < offsets.size(); ++i) if (offsets[i] - offsets[i - 1] > 1) indices.push_back(i); } -size_t ColumnString::getNumberOfNonDefaultValues() const -{ - size_t res = 0; - for (size_t i = 0; i < offsets.size(); ++i) - res += (offsets[i] - offsets[i - 1] > 1); +// size_t ColumnString::getNumberOfDefaultRows() const +// { +// size_t res = 0; +// for (size_t i = 0; i < offsets.size(); ++i) +// res += (offsets[i] - offsets[i - 1] > 1); - return res; -} +// return res; +// } ColumnPtr ColumnString::compress() const { diff --git a/src/Columns/ColumnString.h b/src/Columns/ColumnString.h index fe09abda7b5..ef9f51b9cfe 100644 --- a/src/Columns/ColumnString.h +++ b/src/Columns/ColumnString.h @@ -277,8 +277,8 @@ public: return typeid(rhs) == typeid(ColumnString); } - void getIndicesOfNonDefaultValues(Offsets & indices) const override; - size_t getNumberOfNonDefaultValues() const override; + void getIndicesOfNonDefaultValues(Offsets & indices, size_t from, size_t limit) const override; + // size_t getNumberOfDefaultRows() const override; Chars & getChars() { return chars; } const Chars & getChars() const { return chars; } diff --git a/src/Columns/ColumnVector.h b/src/Columns/ColumnVector.h index a7ee420c2cc..c2620a369de 100644 --- a/src/Columns/ColumnVector.h +++ b/src/Columns/ColumnVector.h @@ -303,19 +303,34 @@ public: return typeid(rhs) == typeid(ColumnVector); } - void getIndicesOfNonDefaultValues(IColumn::Offsets & offsets) const override + void getIndicesOfNonDefaultValues(IColumn::Offsets & offsets, size_t from, size_t limit) const override { offsets.reserve(data.size()); - for (size_t i = 0; i < data.size(); ++i) + size_t to = limit && from + limit < size() ? 
from + limit : size(); + for (size_t i = from; i < to; ++i) if (data[i] != T{}) offsets.push_back(i); } - size_t getNumberOfNonDefaultValues() const override + void insertAtOffsetsFrom(const IColumn::Offsets & offsets, const IColumn & values, size_t total_rows_hint) override + { + const auto & values_data = assert_cast(values).getData(); + + ssize_t position = static_cast(data.size()) - 1; + data.resize_fill(data.size() + total_rows_hint); + + for (size_t i = 0; i < offsets.size(); ++i) + { + position += offsets[i] + 1; + data[position] = values_data[i]; + } + } + + size_t getNumberOfDefaultRows(size_t step) const override { size_t res = 0; - for (size_t i = 0; i < data.size(); ++i) - res += (data[i] != T{}); + for (size_t i = 0; i < data.size(); i += step) + res += (data[i] == T{}); return res; } diff --git a/src/Columns/IColumn.cpp b/src/Columns/IColumn.cpp index a3ed0885651..88bc8fa3ead 100644 --- a/src/Columns/IColumn.cpp +++ b/src/Columns/IColumn.cpp @@ -30,6 +30,17 @@ void IColumn::insertFrom(const IColumn & src, size_t n) insert(src[n]); } +void IColumn::insertAtOffsetsFrom(const Offsets & offsets, const IColumn & values, size_t) +{ + assert(offsets.size() == values.size()); + for (size_t i = 0; i < offsets.size(); ++i) + { + if (offsets[i]) + insertManyDefaults(offsets[i]); + insertFrom(values, i); + } +} + bool isColumnNullable(const IColumn & column) { return checkColumn(column); diff --git a/src/Columns/IColumn.h b/src/Columns/IColumn.h index 44b7e280118..902353618f6 100644 --- a/src/Columns/IColumn.h +++ b/src/Columns/IColumn.h @@ -67,6 +67,8 @@ public: /// If column is ColumnLowCardinality, transforms is to full column. virtual Ptr convertToFullColumnIfLowCardinality() const { return getPtr(); } + virtual Ptr convertToFullColumnIfSparse() const { return getPtr(); } + /// Creates empty column with the same type. 
virtual MutablePtr cloneEmpty() const { return cloneResized(0); } @@ -363,8 +365,14 @@ public: throw Exception("Method structureEquals is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); } - virtual void getIndicesOfNonDefaultValues(Offsets & /* offsets */) const {} - virtual size_t getNumberOfNonDefaultValues() const { return 0; } + virtual void getIndicesOfNonDefaultValues(Offsets & /* offsets */, size_t, size_t) const {} + + virtual void insertAtOffsetsFrom(const Offsets & offsets, const IColumn & values, size_t total_rows_hint); + + static constexpr auto DEFAULT_ROWS_SEARCH_STEP = 8; + static constexpr auto MIN_ROWS_TO_SEARCH_DEFAULTS = DEFAULT_ROWS_SEARCH_STEP * 16; + + virtual size_t getNumberOfDefaultRows(size_t /* step */) const { return {}; } /// Compress column in memory to some representation that allows to decompress it back. /// Return itself if compression is not applicable for this column type. diff --git a/src/Common/SparseArray.h b/src/Common/SparseArray.h new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/DataStreams/NativeBlockInputStream.cpp b/src/DataStreams/NativeBlockInputStream.cpp index 9e39b4b2b28..9f0b0031fde 100644 --- a/src/DataStreams/NativeBlockInputStream.cpp +++ b/src/DataStreams/NativeBlockInputStream.cpp @@ -11,6 +11,8 @@ #include #include +#include + namespace DB { @@ -79,7 +81,7 @@ void NativeBlockInputStream::readData(const IDataType & type, ColumnPtr & column settings.position_independent_encoding = false; ISerialization::DeserializeBinaryBulkStatePtr state; - auto serialization = type.getDefaultSerialization(); + auto serialization = type.getSerialization(*column); serialization->deserializeBinaryBulkStatePrefix(settings, state); serialization->deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state); @@ -150,6 +152,10 @@ Block NativeBlockInputStream::readImpl() readBinary(type_name, istr); column.type = data_type_factory.get(type_name); + /// TODO: check revision. 
+ SerializationKind serialization_kind; + readIntBinary(serialization_kind, istr); + if (use_index) { /// Index allows to do more checks. @@ -161,13 +167,19 @@ Block NativeBlockInputStream::readImpl() /// Data ColumnPtr read_column = column.type->createColumn(); + if (serialization_kind == SerializationKind::SPARSE) + read_column = ColumnSparse::create(read_column); double avg_value_size_hint = avg_value_size_hints.empty() ? 0 : avg_value_size_hints[i]; if (rows) /// If no rows, nothing to read. readData(*column.type, read_column, istr, rows, avg_value_size_hint); + /// TODO: maybe remove. + read_column = read_column->convertToFullColumnIfSparse(); column.column = std::move(read_column); + // std::cerr << "column.column: " << column.column->dumpStructure() << "\n"; + if (header) { /// Support insert from old clients without low cardinality type. diff --git a/src/DataStreams/NativeBlockOutputStream.cpp b/src/DataStreams/NativeBlockOutputStream.cpp index da68376201f..bee93efcc9f 100644 --- a/src/DataStreams/NativeBlockOutputStream.cpp +++ b/src/DataStreams/NativeBlockOutputStream.cpp @@ -41,7 +41,7 @@ void NativeBlockOutputStream::flush() } -void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit) +void NativeBlockOutputStream::writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit) { /** If there are columns-constants - then we materialize them. * (Since the data type does not know how to serialize / deserialize constants.) 
@@ -53,12 +53,10 @@ void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr settings.position_independent_encoding = false; settings.low_cardinality_max_dictionary_size = 0; - auto serialization = type.getDefaultSerialization(); - ISerialization::SerializeBinaryBulkStatePtr state; - serialization->serializeBinaryBulkStatePrefix(settings, state); - serialization->serializeBinaryBulkWithMultipleStreams(*full_column, offset, limit, settings, state); - serialization->serializeBinaryBulkStateSuffix(settings, state); + serialization.serializeBinaryBulkStatePrefix(settings, state); + serialization.serializeBinaryBulkWithMultipleStreams(*full_column, offset, limit, settings, state); + serialization.serializeBinaryBulkStateSuffix(settings, state); } @@ -121,9 +119,13 @@ void NativeBlockOutputStream::write(const Block & block) writeStringBinary(type_name, ostr); + /// TODO: add revision + auto serialization = column.type->getSerialization(*column.column); + writeIntBinary(serialization->getKind(), ostr); + /// Data if (rows) /// Zero items of data is always represented as zero number of bytes. 
- writeData(*column.type, column.column, ostr, 0, 0); + writeData(*serialization, column.column, ostr, 0, 0); if (index_ostr) { diff --git a/src/DataStreams/NativeBlockOutputStream.h b/src/DataStreams/NativeBlockOutputStream.h index 64ccd267634..25ad4e1b470 100644 --- a/src/DataStreams/NativeBlockOutputStream.h +++ b/src/DataStreams/NativeBlockOutputStream.h @@ -30,7 +30,7 @@ public: void write(const Block & block) override; void flush() override; - static void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit); + static void writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit); String getContentType() const override { return "application/octet-stream"; } diff --git a/src/DataTypes/DataTypeTuple.cpp b/src/DataTypes/DataTypeTuple.cpp index 371a50a32cf..435cdcbbe6e 100644 --- a/src/DataTypes/DataTypeTuple.cpp +++ b/src/DataTypes/DataTypeTuple.cpp @@ -340,8 +340,8 @@ SerializationPtr DataTypeTuple::getSerialization(const String & column_name, con ISerialization::Settings settings = { .num_rows = info.getNumberOfRows(), - .num_non_default_rows = info.getNumberOfNonDefaultValues(subcolumn_name), - .min_ratio_for_dense_serialization = 10 + .num_default_rows = info.getNumberOfDefaultRows(subcolumn_name), + .ratio_for_sparse_serialization = info.getRatioForSparseSerialization() }; auto serializaion = elems[i]->getSerialization(settings); diff --git a/src/DataTypes/IDataType.cpp b/src/DataTypes/IDataType.cpp index 2b09bc12054..999dfada206 100644 --- a/src/DataTypes/IDataType.cpp +++ b/src/DataTypes/IDataType.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -158,13 +159,14 @@ SerializationPtr IDataType::getSubcolumnSerialization(const String & subcolumn_n throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName()); } + SerializationPtr IDataType::getSerialization(const String & 
column_name, const SerializationInfo & info) const { ISerialization::Settings settings = { .num_rows = info.getNumberOfRows(), - .num_non_default_rows = info.getNumberOfNonDefaultValues(column_name), - .min_ratio_for_dense_serialization = 10 + .num_default_rows = info.getNumberOfDefaultRows(column_name), + .ratio_for_sparse_serialization = info.getRatioForSparseSerialization() }; return getSerialization(settings); @@ -172,11 +174,14 @@ SerializationPtr IDataType::getSerialization(const String & column_name, const S SerializationPtr IDataType::getSerialization(const IColumn & column) const { + if (typeid_cast(&column)) + return getSparseSerialization(); + ISerialization::Settings settings = { .num_rows = column.size(), - .num_non_default_rows = column.getNumberOfNonDefaultValues(), - .min_ratio_for_dense_serialization = 10 + .num_default_rows = column.getNumberOfDefaultRows(IColumn::DEFAULT_ROWS_SEARCH_STEP), + .ratio_for_sparse_serialization = 10 }; return getSerialization(settings); @@ -184,10 +189,9 @@ SerializationPtr IDataType::getSerialization(const IColumn & column) const SerializationPtr IDataType::getSerialization(const ISerialization::Settings & settings) const { - // if (settings.num_non_default_rows * settings.min_ratio_for_dense_serialization < settings.num_rows) - // return getSparseSerialization(); - - UNUSED(settings); + double ratio = settings.num_rows ? 
std::min(static_cast(settings.num_default_rows) / settings.num_rows, 1.0) : 0.0; + if (ratio > settings.ratio_for_sparse_serialization) + return getSparseSerialization(); return getDefaultSerialization(); } @@ -215,9 +219,6 @@ SerializationPtr IDataType::getSerialization(const String & column_name, const S if (callback(sparse_idx_name)) return getSparseSerialization(); - UNUSED(column_name); - UNUSED(callback); - return getDefaultSerialization(); } @@ -238,4 +239,9 @@ void IDataType::enumerateStreams(const SerializationPtr & serialization, const S }, path); } +bool isSparseSerializaion(const SerializationPtr & serialization) +{ + return typeid_cast(serialization.get()); +} + } diff --git a/src/DataTypes/IDataType.h b/src/DataTypes/IDataType.h index 9e60c0b3b1d..865c081d859 100644 --- a/src/DataTypes/IDataType.h +++ b/src/DataTypes/IDataType.h @@ -75,6 +75,7 @@ public: using StreamExistenceCallback = std::function; using BaseSerializationGetter = std::function; + virtual SerializationPtr getSerialization(const IColumn & column) const; virtual SerializationPtr getSerialization(const String & column_name, const StreamExistenceCallback & callback) const; virtual SerializationPtr getSubcolumnSerialization( const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const; @@ -86,7 +87,7 @@ public: virtual SerializationPtr getSerialization(const String & column_name, const SerializationInfo & info) const; SerializationPtr getSerialization(const ISerialization::Settings & settings) const; - SerializationPtr getSerialization(const IColumn & column) const; + // SerializationPtr getSerialization(const IColumn & column) const; using StreamCallbackWithType = std::function; @@ -503,4 +504,6 @@ template <> inline constexpr bool IsDataTypeDateOrDateTime = true; template <> inline constexpr bool IsDataTypeDateOrDateTime = true; template <> inline constexpr bool IsDataTypeDateOrDateTime = true; +bool isSparseSerializaion(const SerializationPtr & 
serialization); + } diff --git a/src/DataTypes/Serializations/ISerialization.h b/src/DataTypes/Serializations/ISerialization.h index 9c0a6e683a5..b72feb72b6f 100644 --- a/src/DataTypes/Serializations/ISerialization.h +++ b/src/DataTypes/Serializations/ISerialization.h @@ -2,6 +2,8 @@ #include #include +#include +#include #include #include @@ -9,6 +11,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + class IDataType; class ReadBuffer; @@ -25,6 +32,12 @@ class Field; struct FormatSettings; struct NameAndTypePair; +enum class SerializationKind : UInt8 +{ + DEFAULT = 0, + SPARSE = 1 +}; + class ISerialization { public: @@ -90,6 +103,8 @@ public: String toString() const; }; + virtual SerializationKind getKind() const { return SerializationKind::DEFAULT; } + /// Cache for common substreams of one type, but possible different its subcolumns. /// E.g. sizes of arrays of Nested data type. using SubstreamsCache = std::unordered_map; @@ -143,8 +158,8 @@ public: struct Settings { size_t num_rows; - size_t num_non_default_rows; - size_t min_ratio_for_dense_serialization; + size_t num_default_rows; + double ratio_for_sparse_serialization; }; /// Call before serializeBinaryBulkWithMultipleStreams chain to write something before first mark. 
@@ -258,9 +273,48 @@ public: static ColumnPtr getFromSubstreamsCache(SubstreamsCache * cache, const SubstreamPath & path); static bool isSpecialCompressionAllowed(const SubstreamPath & path); + + template + static State * checkAndGetSerializeState(SerializeBinaryBulkStatePtr & state, const Serialization &); + + template + static State * checkAndGetDeserializeState(DeserializeBinaryBulkStatePtr & state, const Serialization &); }; using SerializationPtr = std::shared_ptr; using Serializations = std::vector; +template +static State * checkAndGetState(StatePtr & state) +{ + if (!state) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Got empty state for {}", demangle(typeid(Serialization).name())); + + auto * state_concrete = typeid_cast(state.get()); + if (!state_concrete) + { + auto & state_ref = *state; + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Invalid State for {}. Expected: {}, got {}", + demangle(typeid(Serialization).name()), + demangle(typeid(State).name()), + demangle(typeid(state_ref).name())); + } + + return state_concrete; +} + +template +State * ISerialization::checkAndGetSerializeState(SerializeBinaryBulkStatePtr & state, const Serialization &) +{ + return checkAndGetState(state); +} + +template +State * ISerialization::checkAndGetDeserializeState(DeserializeBinaryBulkStatePtr & state, const Serialization &) +{ + return checkAndGetState(state); +} + } diff --git a/src/DataTypes/Serializations/SerializationInfo.cpp b/src/DataTypes/Serializations/SerializationInfo.cpp index cb86bac7514..124ee30729e 100644 --- a/src/DataTypes/Serializations/SerializationInfo.cpp +++ b/src/DataTypes/Serializations/SerializationInfo.cpp @@ -17,17 +17,25 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +SerializationInfo::SerializationInfo( + double ratio_for_sparse_serialization_, + size_t default_rows_search_step_) + : ratio_for_sparse_serialization(ratio_for_sparse_serialization_) + , default_rows_search_step(default_rows_search_step_) +{ +} + void 
SerializationInfo::add(const Block & block)
 {
     number_of_rows += block.rows();
     for (const auto & elem : block)
     {
-        non_default_values[elem.name] = elem.column->getNumberOfNonDefaultValues();
+        default_rows[elem.name] += elem.column->getNumberOfDefaultRows(default_rows_search_step) * default_rows_search_step;
         for (const auto & subname : elem.type->getSubcolumnNames())
         {
             auto subcolumn = elem.type->getSubcolumn(subname, *elem.column);
             auto full_name = Nested::concatenateName(elem.name, subname);
-            non_default_values[full_name] += subcolumn->getNumberOfNonDefaultValues();
+            default_rows[full_name] += subcolumn->getNumberOfDefaultRows(default_rows_search_step) * default_rows_search_step;
         }
     }
 }
@@ -35,14 +43,14 @@ void SerializationInfo::add(const Block & block)
 void SerializationInfo::add(const SerializationInfo & other)
 {
     number_of_rows += other.number_of_rows;
-    for (const auto & [name, num] : other.non_default_values)
-        non_default_values[name] += num;
+    for (const auto & [name, num] : other.default_rows)
+        default_rows[name] += num;
 }
 
-size_t SerializationInfo::getNumberOfNonDefaultValues(const String & column_name) const
+size_t SerializationInfo::getNumberOfDefaultRows(const String & column_name) const
 {
-    auto it = non_default_values.find(column_name);
-    if (it == non_default_values.end())
+    auto it = default_rows.find(column_name);
+    if (it == default_rows.end())
         return 0;
     return it->second;
 }
@@ -51,13 +59,15 @@ namespace
 {
 
 constexpr auto KEY_NUMBER_OF_ROWS = "number_of_rows";
-constexpr auto KEY_NUMBER_OF_NON_DEFAULT_VALUES = "number_of_non_default_values";
+constexpr auto KEY_NUMBER_OF_DEFAULT_ROWS = "number_of_default_rows";
 constexpr auto KEY_NUMBER = "number";
 constexpr auto KEY_NAME = "name";
 constexpr auto KEY_VERSION = "version";
 
 }
 
+/// TODO: add all fields.
+
 void SerializationInfo::fromJSON(const String & json_str)
 {
     Poco::JSON::Parser parser;
@@ -66,9 +76,9 @@ void SerializationInfo::fromJSON(const String & json_str)
     if (object->has(KEY_NUMBER_OF_ROWS))
         number_of_rows = object->getValue(KEY_NUMBER_OF_ROWS);
 
-    if (object->has(KEY_NUMBER_OF_NON_DEFAULT_VALUES))
+    if (object->has(KEY_NUMBER_OF_DEFAULT_ROWS))
     {
-        auto array = object->getArray(KEY_NUMBER_OF_NON_DEFAULT_VALUES);
+        auto array = object->getArray(KEY_NUMBER_OF_DEFAULT_ROWS);
         for (const auto & elem : *array)
         {
             auto elem_object = elem.extract();
@@ -78,7 +88,7 @@ void SerializationInfo::fromJSON(const String & json_str)
             auto name = elem_object->getValue(KEY_NAME);
             auto number = elem_object->getValue(KEY_NUMBER);
 
-            non_default_values[name] = number;
+            default_rows[name] = number;
         }
     }
 }
@@ -90,7 +100,7 @@ String SerializationInfo::toJSON() const
     info.set(KEY_NUMBER_OF_ROWS, number_of_rows);
 
     Poco::JSON::Array column_infos;
-    for (const auto & [name, num] : non_default_values)
+    for (const auto & [name, num] : default_rows)
     {
         Poco::JSON::Object column_info;
         column_info.set(KEY_NAME, name);
@@ -98,7 +108,7 @@ String SerializationInfo::toJSON() const
         column_infos.add(std::move(column_info));
     }
 
-    info.set(KEY_NUMBER_OF_NON_DEFAULT_VALUES, std::move(column_infos));
+    info.set(KEY_NUMBER_OF_DEFAULT_ROWS, std::move(column_infos));
 
     std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
     oss.exceptions(std::ios::failbit);
diff --git a/src/DataTypes/Serializations/SerializationInfo.h b/src/DataTypes/Serializations/SerializationInfo.h
index ceee705cf57..6205c3c7e0c 100644
--- a/src/DataTypes/Serializations/SerializationInfo.h
+++ b/src/DataTypes/Serializations/SerializationInfo.h
@@ -12,11 +12,16 @@ public:
 
     using NameToNumber = std::unordered_map;
 
+    SerializationInfo(
+        double ratio_for_sparse_serialization_,
+        size_t default_rows_search_step_ = IColumn::DEFAULT_ROWS_SEARCH_STEP);
+
     void add(const Block & block);
     void add(const SerializationInfo & other);
 
-    size_t 
getNumberOfNonDefaultValues(const String & column_name) const; + size_t getNumberOfDefaultRows(const String & column_name) const; size_t getNumberOfRows() const { return number_of_rows; } + double getRatioForSparseSerialization() const { return ratio_for_sparse_serialization; } void read(ReadBuffer & in); void write(WriteBuffer & out) const; @@ -25,8 +30,11 @@ private: void fromJSON(const String & json_str); String toJSON() const; + double ratio_for_sparse_serialization; + size_t default_rows_search_step; + size_t number_of_rows = 0; - NameToNumber non_default_values; + NameToNumber default_rows; }; } diff --git a/src/DataTypes/Serializations/SerializationLowCardinality.cpp b/src/DataTypes/Serializations/SerializationLowCardinality.cpp index 31058cb6e57..71216435a7c 100644 --- a/src/DataTypes/Serializations/SerializationLowCardinality.cpp +++ b/src/DataTypes/Serializations/SerializationLowCardinality.cpp @@ -196,42 +196,6 @@ struct DeserializeStateLowCardinality : public ISerialization::DeserializeBinary explicit DeserializeStateLowCardinality(UInt64 key_version_) : key_version(key_version_) {} }; -static SerializeStateLowCardinality * checkAndGetLowCardinalitySerializeState( - ISerialization::SerializeBinaryBulkStatePtr & state) -{ - if (!state) - throw Exception("Got empty state for SerializationLowCardinality.", ErrorCodes::LOGICAL_ERROR); - - auto * low_cardinality_state = typeid_cast(state.get()); - if (!low_cardinality_state) - { - auto & state_ref = *state; - throw Exception("Invalid SerializeBinaryBulkState for SerializationLowCardinality. 
Expected: " - + demangle(typeid(SerializeStateLowCardinality).name()) + ", got " - + demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR); - } - - return low_cardinality_state; -} - -static DeserializeStateLowCardinality * checkAndGetLowCardinalityDeserializeState( - ISerialization::DeserializeBinaryBulkStatePtr & state) -{ - if (!state) - throw Exception("Got empty state for SerializationLowCardinality.", ErrorCodes::LOGICAL_ERROR); - - auto * low_cardinality_state = typeid_cast(state.get()); - if (!low_cardinality_state) - { - auto & state_ref = *state; - throw Exception("Invalid DeserializeBinaryBulkState for SerializationLowCardinality. Expected: " - + demangle(typeid(DeserializeStateLowCardinality).name()) + ", got " - + demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR); - } - - return low_cardinality_state; -} - void SerializationLowCardinality::serializeBinaryBulkStatePrefix( SerializeBinaryBulkSettings & settings, SerializeBinaryBulkStatePtr & state) const @@ -256,7 +220,7 @@ void SerializationLowCardinality::serializeBinaryBulkStateSuffix( SerializeBinaryBulkSettings & settings, SerializeBinaryBulkStatePtr & state) const { - auto * low_cardinality_state = checkAndGetLowCardinalitySerializeState(state); + auto * low_cardinality_state = checkAndGetSerializeState(state, *this); KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value); if (low_cardinality_state->shared_dictionary && settings.low_cardinality_max_dictionary_size) @@ -495,7 +459,7 @@ void SerializationLowCardinality::serializeBinaryBulkWithMultipleStreams( const ColumnLowCardinality & low_cardinality_column = typeid_cast(column); - auto * low_cardinality_state = checkAndGetLowCardinalitySerializeState(state); + auto * low_cardinality_state = checkAndGetSerializeState(state, *this); auto & global_dictionary = low_cardinality_state->shared_dictionary; KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value); @@ -594,7 +558,7 @@ 
void SerializationLowCardinality::deserializeBinaryBulkWithMultipleStreams( if (!indexes_stream) throw Exception("Got empty stream for SerializationLowCardinality indexes.", ErrorCodes::LOGICAL_ERROR); - auto * low_cardinality_state = checkAndGetLowCardinalityDeserializeState(state); + auto * low_cardinality_state = checkAndGetDeserializeState(state, *this); KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value); auto read_dictionary = [this, low_cardinality_state, keys_stream]() diff --git a/src/DataTypes/Serializations/SerializationSparse.cpp b/src/DataTypes/Serializations/SerializationSparse.cpp index 19dddf84464..291d98d3135 100644 --- a/src/DataTypes/Serializations/SerializationSparse.cpp +++ b/src/DataTypes/Serializations/SerializationSparse.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -12,28 +13,118 @@ namespace DB namespace { -void serializeOffsetsPositionIndependent(const IColumn::Offsets & offsets, WriteBuffer & ostr) +static constexpr auto END_OF_GRANULE_FLAG = 1ULL << 63; + +struct SerializeStateSparse : public ISerialization::SerializeBinaryBulkState { + size_t num_trailing_default_values = 0; + ISerialization::SerializeBinaryBulkStatePtr nested; +}; + +struct DeserializeStateSparse : public ISerialization::DeserializeBinaryBulkState +{ + size_t num_trailing_defaults = 0; + bool has_value_after_defaults = false; + ISerialization::DeserializeBinaryBulkStatePtr nested; +}; + +void serializeOffsetsPositionIndependent(const IColumn::Offsets & offsets, WriteBuffer & ostr, size_t start, size_t end) +{ + // std::cerr << "writing start: " << start << ", end: " << end << "\n"; + // std::cerr << "offsets: "; + // for (const auto & x : offsets) + // std::cerr << x << " "; + // std::cerr << "\n"; + size_t size = offsets.size(); - IColumn::Offset prev_offset = 0; for (size_t i = 0; i < size; ++i) { - IColumn::Offset current_offset = offsets[i]; - writeIntBinary(current_offset - prev_offset, ostr); 
- prev_offset = current_offset; + size_t group_size = offsets[i] - start; + + // std::cerr << "writing group_size: " << group_size << "\n"; + + writeIntBinary(group_size, ostr); + start += group_size + 1; } + + // std::cerr << "writing start: " << start << ", end: " << end << "\n"; + size_t group_size = start < end ? end - start : 0; + // std::cerr << "writing end group_size: " << group_size << "\n"; + group_size |= END_OF_GRANULE_FLAG; + writeIntBinary(group_size, ostr); } -void deserializeOffsetsPositionIndependent(IColumn::Offsets & offsets, ReadBuffer & istr) +// struct DeserializedRows +// { +// size_t total = 0; +// size_t trailing_defaults = 0; +// }; + +size_t deserializeOffsetsPositionIndependent(IColumn::Offsets & offsets, + ReadBuffer & istr, size_t limit, DeserializeStateSparse & state) { - IColumn::Offset current_offset = 0; + // std::cerr << "limit: " << limit << ", num_trailing: " << state.num_trailing_defaults + // << ", has_value_after_defaults: " << state.has_value_after_defaults << "\n"; + + if (limit && state.num_trailing_defaults >= limit) + { + state.num_trailing_defaults -= limit; + return limit; + } + + size_t total_rows = state.num_trailing_defaults; + if (state.has_value_after_defaults) + { + size_t start_of_group = offsets.empty() ? 
0 : offsets.back() + 1; + offsets.push_back(start_of_group + state.num_trailing_defaults); + + state.has_value_after_defaults = false; + state.num_trailing_defaults = 0; + ++total_rows; + } + + size_t group_size; while (!istr.eof()) { - IColumn::Offset current_size = 0; - readIntBinary(current_size, istr); - current_offset += current_size; - offsets.push_back(current_offset); + readIntBinary(group_size, istr); + + bool end_of_granule = group_size & END_OF_GRANULE_FLAG; + group_size &= ~END_OF_GRANULE_FLAG; + + // std::cerr << "read group_size: " << group_size << ", end_of_granule: " << end_of_granule << "\n"; + size_t next_total_rows = total_rows + group_size; + group_size += state.num_trailing_defaults; + + + // std::cerr << "group_size: " << group_size << ", end_of_granule: " << end_of_granule << "\n"; + // std::cerr << "next_total_rows: " << next_total_rows << "\n"; + + if (limit && next_total_rows >= limit) + { + state.num_trailing_defaults = next_total_rows - limit; + state.has_value_after_defaults = !end_of_granule; + return limit; + } + + if (end_of_granule) + { + state.has_value_after_defaults = false; + state.num_trailing_defaults = group_size; + } + else + { + size_t start_of_group = offsets.empty() ? 
0 : offsets.back() + 1; + offsets.push_back(start_of_group + group_size); + + state.num_trailing_defaults = 0; + state.has_value_after_defaults = false; + ++next_total_rows; + } + + total_rows = next_total_rows; } + + return total_rows; } } @@ -56,27 +147,13 @@ void SerializationSparse::serializeBinaryBulkStatePrefix( SerializeBinaryBulkSettings & settings, SerializeBinaryBulkStatePtr & state) const { - settings.path.push_back(Substream::SparseElements); - nested_serialization->serializeBinaryBulkStatePrefix(settings, state); - settings.path.pop_back(); -} + auto state_sparse = std::make_shared(); -void SerializationSparse::serializeBinaryBulkStateSuffix( - SerializeBinaryBulkSettings & settings, - SerializeBinaryBulkStatePtr & state) const -{ settings.path.push_back(Substream::SparseElements); - nested_serialization->serializeBinaryBulkStateSuffix(settings, state); + nested_serialization->serializeBinaryBulkStatePrefix(settings, state_sparse->nested); settings.path.pop_back(); -} -void SerializationSparse::deserializeBinaryBulkStatePrefix( - DeserializeBinaryBulkSettings & settings, - DeserializeBinaryBulkStatePtr & state) const -{ - settings.path.push_back(Substream::SparseElements); - nested_serialization->deserializeBinaryBulkStatePrefix(settings, state); - settings.path.pop_back(); + state = std::move(state_sparse); } void SerializationSparse::serializeBinaryBulkWithMultipleStreams( @@ -86,30 +163,67 @@ void SerializationSparse::serializeBinaryBulkWithMultipleStreams( SerializeBinaryBulkSettings & settings, SerializeBinaryBulkStatePtr & state) const { - UNUSED(limit); - UNUSED(offset); - - /// TODO: inefficient. 
- /// TODO: use limit and offset size_t size = column.size(); + auto * state_sparse = checkAndGetSerializeState(state, *this); + + // std::cerr << "writing column: " << column.dumpStructure() << "\n"; auto offsets_column = DataTypeNumber().createColumn(); auto & offsets_data = assert_cast &>(*offsets_column).getData(); - - column.getIndicesOfNonDefaultValues(offsets_data); - auto values = column.index(*offsets_column, 0); - offsets_data.push_back(size); + column.getIndicesOfNonDefaultValues(offsets_data, offset, limit); settings.path.push_back(Substream::SparseOffsets); if (auto * stream = settings.getter(settings.path)) - serializeOffsetsPositionIndependent(offsets_data, *stream); + { + size_t end = limit && offset + limit < size ? offset + limit : size; + serializeOffsetsPositionIndependent(offsets_data, *stream, offset, end); + } - settings.path.back() = Substream::SparseElements; - nested_serialization->serializeBinaryBulkWithMultipleStreams(*values, 0, 0, settings, state); + if (!offsets_data.empty()) + { + settings.path.back() = Substream::SparseElements; + if (const auto * column_sparse = typeid_cast(&column)) + { + const auto & values = column_sparse->getValuesColumn(); + size_t begin = column_sparse->getValueIndex(offsets_data[0]); + size_t end = column_sparse->getValueIndex(offsets_data.back()); + // std::cerr << "begin: " << begin << ", end: " << end << "\n"; + nested_serialization->serializeBinaryBulkWithMultipleStreams(values, begin, end - begin + 1, settings, state_sparse->nested); + } + else + { + auto values = column.index(*offsets_column, 0); + nested_serialization->serializeBinaryBulkWithMultipleStreams(*values, 0, values->size(), settings, state_sparse->nested); + } + } settings.path.pop_back(); } +void SerializationSparse::serializeBinaryBulkStateSuffix( + SerializeBinaryBulkSettings & settings, + SerializeBinaryBulkStatePtr & state) const +{ + auto * state_sparse = checkAndGetSerializeState(state, *this); + + 
settings.path.push_back(Substream::SparseElements); + nested_serialization->serializeBinaryBulkStateSuffix(settings, state_sparse->nested); + settings.path.pop_back(); +} + +void SerializationSparse::deserializeBinaryBulkStatePrefix( + DeserializeBinaryBulkSettings & settings, + DeserializeBinaryBulkStatePtr & state) const +{ + auto state_sparse = std::make_shared(); + + settings.path.push_back(Substream::SparseElements); + nested_serialization->deserializeBinaryBulkStatePrefix(settings, state_sparse->nested); + settings.path.pop_back(); + + state = std::move(state_sparse); +} + void SerializationSparse::deserializeBinaryBulkWithMultipleStreams( ColumnPtr & column, size_t limit, @@ -118,38 +232,50 @@ void SerializationSparse::deserializeBinaryBulkWithMultipleStreams( SubstreamsCache * cache) const { settings.path.push_back(Substream::SparseOffsets); - - auto offsets_column = DataTypeNumber().createColumn(); - auto & offsets_data = assert_cast &>(*offsets_column).getData(); - - if (auto * stream = settings.getter(settings.path)) - deserializeOffsetsPositionIndependent(offsets_data, *stream); - - settings.path.back() = Substream::SparseElements; - - ColumnPtr values = column->cloneEmpty(); - nested_serialization->deserializeBinaryBulkWithMultipleStreams(values, limit, settings, state, cache); + auto * state_sparse = checkAndGetDeserializeState(state, *this); auto mutable_column = column->assumeMutable(); - size_t size = values->size(); - ssize_t prev_offset = -1; + auto & column_sparse = assert_cast(*mutable_column); + auto & offsets_data = column_sparse.getOffsetsData(); - for (size_t i = 0; i < size; ++i) - { - size_t offsets_diff = static_cast(offsets_data[i]) - prev_offset; + size_t old_size = offsets_data.size(); - if (offsets_diff > 1) - mutable_column->insertManyDefaults(offsets_diff - 1); + size_t read_rows = 0; + if (auto * stream = settings.getter(settings.path)) + read_rows = deserializeOffsetsPositionIndependent(offsets_data, *stream, limit, 
*state_sparse); - mutable_column->insertFrom(*values, i); - prev_offset = offsets_data[i]; - } - - size_t offsets_diff = offsets_data[size] - prev_offset; - if (offsets_diff > 1) - mutable_column->insertManyDefaults(offsets_diff - 1); + auto & values_column = column_sparse.getValuesPtr(); + size_t values_limit = offsets_data.size() - old_size; + settings.path.back() = Substream::SparseElements; + nested_serialization->deserializeBinaryBulkWithMultipleStreams(values_column, values_limit, settings, state_sparse->nested, cache); settings.path.pop_back(); + + if (offsets_data.size() + 1 != values_column->size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent sizes of values and offsets in SerializationSparse." + " Offsets size: {}, values size: {}", offsets_data.size(), values_column->size()); + + column_sparse.insertManyDefaults(read_rows); + + // std::cerr << "column_sparse: " << column_sparse.dumpStructure() << "\n"; + // std::cerr << "offsets: "; + // for (const auto & x : column_sparse.getOffsetsData()) + // std::cerr << x << " "; + // std::cerr << "\n"; + + // std::cerr << "values: "; + // for (size_t i = 0; i < column_sparse.getValuesColumn().size(); ++i) + // std::cerr << toString(column_sparse.getValuesColumn()[i]) << " "; + // std::cerr << "\n"; + + column = std::move(mutable_column); } +// void SerializationSparse::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const +// { +// const auto & column_sparse = assert_cast(column); +// const auto & values_column = column_sparse.getValuesColumn(); +// nested_serialization->serializeText(values_column, column_sparse.getValueIndex(row_num), ostr, settings); +// } + } diff --git a/src/DataTypes/Serializations/SerializationSparse.h b/src/DataTypes/Serializations/SerializationSparse.h index a5f8c7547c3..73daf801dd2 100644 --- a/src/DataTypes/Serializations/SerializationSparse.h +++ b/src/DataTypes/Serializations/SerializationSparse.h @@ -10,6 +10,8 
@@ class SerializationSparse final : public SerializationWrapper public: SerializationSparse(const SerializationPtr & nested_); + SerializationKind getKind() const override { return SerializationKind::SPARSE; } + void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override; void serializeBinaryBulkStatePrefix( @@ -37,6 +39,9 @@ public: DeserializeBinaryBulkSettings & settings, DeserializeBinaryBulkStatePtr & state, SubstreamsCache * cache) const override; + + // void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override; + }; } diff --git a/src/DataTypes/Serializations/SerializationTuple.cpp b/src/DataTypes/Serializations/SerializationTuple.cpp index bdeea80477e..0a0d1a8ec32 100644 --- a/src/DataTypes/Serializations/SerializationTuple.cpp +++ b/src/DataTypes/Serializations/SerializationTuple.cpp @@ -293,39 +293,6 @@ struct DeserializeBinaryBulkStateTuple : public ISerialization::DeserializeBinar std::vector states; }; -static SerializeBinaryBulkStateTuple * checkAndGetTupleSerializeState(ISerialization::SerializeBinaryBulkStatePtr & state) -{ - if (!state) - throw Exception("Got empty state for DataTypeTuple.", ErrorCodes::LOGICAL_ERROR); - - auto * tuple_state = typeid_cast(state.get()); - if (!tuple_state) - { - auto & state_ref = *state; - throw Exception("Invalid SerializeBinaryBulkState for DataTypeTuple. 
Expected: " - + demangle(typeid(SerializeBinaryBulkStateTuple).name()) + ", got " - + demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR); - } - - return tuple_state; -} - -static DeserializeBinaryBulkStateTuple * checkAndGetTupleDeserializeState(ISerialization::DeserializeBinaryBulkStatePtr & state) -{ - if (!state) - throw Exception("Got empty state for DataTypeTuple.", ErrorCodes::LOGICAL_ERROR); - - auto * tuple_state = typeid_cast(state.get()); - if (!tuple_state) - { - auto & state_ref = *state; - throw Exception("Invalid DeserializeBinaryBulkState for DataTypeTuple. Expected: " - + demangle(typeid(DeserializeBinaryBulkStateTuple).name()) + ", got " - + demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR); - } - - return tuple_state; -} void SerializationTuple::serializeBinaryBulkStatePrefix( SerializeBinaryBulkSettings & settings, @@ -344,7 +311,7 @@ void SerializationTuple::serializeBinaryBulkStateSuffix( SerializeBinaryBulkSettings & settings, SerializeBinaryBulkStatePtr & state) const { - auto * tuple_state = checkAndGetTupleSerializeState(state); + auto * tuple_state = checkAndGetSerializeState(state, *this); for (size_t i = 0; i < elems.size(); ++i) elems[i]->serializeBinaryBulkStateSuffix(settings, tuple_state->states[i]); @@ -370,7 +337,7 @@ void SerializationTuple::serializeBinaryBulkWithMultipleStreams( SerializeBinaryBulkSettings & settings, SerializeBinaryBulkStatePtr & state) const { - auto * tuple_state = checkAndGetTupleSerializeState(state); + auto * tuple_state = checkAndGetSerializeState(state, *this); for (const auto i : ext::range(0, ext::size(elems))) { @@ -386,7 +353,7 @@ void SerializationTuple::deserializeBinaryBulkWithMultipleStreams( DeserializeBinaryBulkStatePtr & state, SubstreamsCache * cache) const { - auto * tuple_state = checkAndGetTupleDeserializeState(state); + auto * tuple_state = checkAndGetDeserializeState(state, *this); auto mutable_column = column->assumeMutable(); auto & column_tuple = 
assert_cast(*mutable_column); diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index abff6f21acf..2bc75dc9533 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -590,6 +591,8 @@ void NO_INLINE Aggregator::executeImplBatch( { if (inst->offsets) inst->batch_that->addBatchArray(rows, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool); + else if (inst->has_sparse_arguments) + inst->batch_that->addBatchSparse(places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool); else inst->batch_that->addBatch(rows, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool); } @@ -608,6 +611,8 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl( if (inst->offsets) inst->batch_that->addBatchSinglePlace( inst->offsets[static_cast(rows - 1)], res + inst->state_offset, inst->batch_arguments, arena); + else if(inst->has_sparse_arguments) + inst->batch_that->addBatchSparseSinglePlace(res + inst->state_offset, inst->batch_arguments, arena); else inst->batch_that->addBatchSinglePlace(rows, res + inst->state_offset, inst->batch_arguments, arena); } @@ -643,19 +648,30 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns for (size_t i = 0; i < params.aggregates_size; ++i) { + bool allow_sparse_arguments = aggregate_columns[i].size() == 1; + bool has_sparse_arguments = false; + for (size_t j = 0; j < aggregate_columns[i].size(); ++j) { materialized_columns.push_back(columns.at(params.aggregates[i].arguments[j])->convertToFullColumnIfConst()); aggregate_columns[i][j] = materialized_columns.back().get(); - auto column_no_lc = recursiveRemoveLowCardinality(aggregate_columns[i][j]->getPtr()); - if (column_no_lc.get() != aggregate_columns[i][j]) + auto full_column = allow_sparse_arguments + ? 
aggregate_columns[i][j]->getPtr() + : aggregate_columns[i][j]->convertToFullColumnIfSparse(); + + full_column = recursiveRemoveLowCardinality(full_column); + if (full_column.get() != aggregate_columns[i][j]) { - materialized_columns.emplace_back(std::move(column_no_lc)); + materialized_columns.emplace_back(std::move(full_column)); aggregate_columns[i][j] = materialized_columns.back().get(); } + + if (typeid_cast(aggregate_columns[i][j])) + has_sparse_arguments = true; } + aggregate_functions_instructions[i].has_sparse_arguments = has_sparse_arguments; aggregate_functions_instructions[i].arguments = aggregate_columns[i].data(); aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i]; auto * that = aggregate_functions[i]; diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index c5bcc1eb27f..e541733ee61 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -1038,6 +1038,7 @@ protected: const IAggregateFunction * batch_that; const IColumn ** batch_arguments; const UInt64 * offsets = nullptr; + bool has_sparse_arguments = false; }; using AggregateFunctionInstructions = std::vector; diff --git a/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp b/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp index 0825d9f329e..d962fa74ce5 100644 --- a/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp @@ -157,7 +157,7 @@ void PrettyBlockOutputFormat::write(const Chunk & chunk, PortKind port_kind) Serializations serializations(num_columns); for (size_t i = 0; i < num_columns; ++i) - serializations[i] = header.getByPosition(i).type->getDefaultSerialization(); + serializations[i] = header.getByPosition(i).type->getSerialization(*columns[i]); WidthsPerColumn widths; Widths max_widths; @@ -291,6 +291,8 @@ void PrettyBlockOutputFormat::write(const Chunk & chunk, PortKind port_kind) writeCString(grid_symbols.bar, out); + std::cerr << 
"current row: " << toString((*columns[0])[i]) << "\n"; + for (size_t j = 0; j < num_columns; ++j) { if (j != 0) diff --git a/src/Processors/Formats/Impl/PrettySpaceBlockOutputFormat.cpp b/src/Processors/Formats/Impl/PrettySpaceBlockOutputFormat.cpp index fa987c6b949..9d9269341e5 100644 --- a/src/Processors/Formats/Impl/PrettySpaceBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/PrettySpaceBlockOutputFormat.cpp @@ -26,7 +26,7 @@ void PrettySpaceBlockOutputFormat::write(const Chunk & chunk, PortKind port_kind Serializations serializations(num_columns); for (size_t i = 0; i < num_columns; ++i) - serializations[i] = header.getByPosition(i).type->getDefaultSerialization(); + serializations[i] = header.getByPosition(i).type->getSerialization(*columns[i]); WidthsPerColumn widths; Widths max_widths; diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 8175a648f64..23b104abd92 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -252,17 +252,16 @@ static void decrementTypeMetric(MergeTreeDataPartType type) IMergeTreeDataPart::IMergeTreeDataPart( - MergeTreeData & storage_, const String & name_, const VolumePtr & volume_, const std::optional & relative_path_, Type part_type_) - : storage(storage_) - , name(name_) - , info(MergeTreePartInfo::fromPartName(name_, storage.format_version)) - , volume(volume_) - , relative_path(relative_path_.value_or(name_)) - , index_granularity_info(storage_, part_type_) - , part_type(part_type_) + const MergeTreeData & storage_, + const String & name_, + const VolumePtr & volume_, + const std::optional & relative_path_, + Type part_type_) + : IMergeTreeDataPart( + storage_, name_, + MergeTreePartInfo::fromPartName(name_, storage_.format_version), + volume_, relative_path_, part_type_) { - incrementStateMetric(state); - incrementTypeMetric(part_type); } IMergeTreeDataPart::IMergeTreeDataPart( @@ -278,6 +277,7 @@ 
IMergeTreeDataPart::IMergeTreeDataPart( , volume(volume_) , relative_path(relative_path_.value_or(name_)) , index_granularity_info(storage_, part_type_) + , serialization_info(storage_.getSettings()->ratio_for_sparse_serialization) , part_type(part_type_) { incrementStateMetric(state); @@ -563,7 +563,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks if (check_consistency) checkConsistency(require_columns_checksums); loadDefaultCompressionCodec(); - + loadSerializationInfo(); } void IMergeTreeDataPart::loadIndexGranularity() @@ -930,6 +930,16 @@ void IMergeTreeDataPart::loadUUID() } } +void IMergeTreeDataPart::loadSerializationInfo() +{ + String path = getFullRelativePath() + SERIALIZATION_FILE_NAME; + if (volume->getDisk()->exists(path)) + { + auto in = openForReading(volume->getDisk(), path); + serialization_info.read(*in); + } +} + void IMergeTreeDataPart::loadColumns(bool require) { String path = getFullRelativePath() + "columns.txt"; diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 4098077d8de..7f85849688d 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -72,7 +72,7 @@ public: Type part_type_); IMergeTreeDataPart( - MergeTreeData & storage_, + const MergeTreeData & storage_, const String & name_, const VolumePtr & volume, const std::optional & relative_path, @@ -178,6 +178,9 @@ public: mutable String relative_path; MergeTreeIndexGranularityInfo index_granularity_info; + /// TODO: add comment + SerializationInfo serialization_info; + size_t rows_count = 0; @@ -222,8 +225,6 @@ public: TTLInfos ttl_infos; - SerializationInfo serialization_info; - /// Current state of the part. 
If the part is in working set already, it should be accessed via data_parts mutex void setState(State new_state) const; State getState() const; @@ -360,6 +361,8 @@ public: static inline constexpr auto UUID_FILE_NAME = "uuid.txt"; + static inline constexpr auto SERIALIZATION_FILE_NAME = "serialization.txt"; + /// Checks that all TTLs (table min/max, column ttls, so on) for part /// calculated. Part without calculated TTL may exist if TTL was added after /// part creation (using alter query with materialize_ttl setting). @@ -421,6 +424,8 @@ private: /// Loads ttl infos in json format from file ttl.txt. If file doesn't exists assigns ttl infos with all zeros void loadTTLInfos(); + void loadSerializationInfo(); + void loadPartitionAndMinMaxIndex(); /// Load default compression codec from file default_compression_codec.txt diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index 1cbc9859fe9..d5f34e379a3 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -238,7 +238,7 @@ bool MergeTreeDataPartWide::hasColumnFiles(const NameAndTypePair & column) const String MergeTreeDataPartWide::getFileNameForColumn(const NameAndTypePair & column) const { String filename; - auto serialization = column.type->getSerialization(column.name, serialization_info); + auto serialization = getSerializationForColumn(column); serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) { if (filename.empty()) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index c7b5051ebd9..2aa517a9833 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -33,8 +33,12 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact( , marks(*marks_file) { const auto & storage_columns = 
metadata_snapshot->getColumns(); + serializations.reserve(columns_list.size()); for (const auto & column : columns_list) + { + serializations.emplace(column.name, column.type->getDefaultSerialization()); addStreams(column, storage_columns.getCodecDescOrDefault(column.name, default_codec)); + } } void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column, const ASTPtr & effective_codec_desc) @@ -63,7 +67,7 @@ void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column, compressed_streams.emplace(stream_name, stream); }; - column.type->enumerateStreams(column.type->getDefaultSerialization(), callback); + column.type->enumerateStreams(serializations[column.name], callback); } namespace @@ -105,6 +109,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity, /// Write single granule of one column (rows between 2 marks) void writeColumnSingleGranule( const ColumnWithTypeAndName & column, + const SerializationPtr & serialization, ISerialization::OutputStreamGetter stream_getter, size_t from_row, size_t number_of_rows) @@ -116,7 +121,6 @@ void writeColumnSingleGranule( serialize_settings.position_independent_encoding = true; serialize_settings.low_cardinality_max_dictionary_size = 0; - auto serialization = column.type->getDefaultSerialization(); serialization->serializeBinaryBulkStatePrefix(serialize_settings, state); serialization->serializeBinaryBulkWithMultipleStreams(*column.column, from_row, number_of_rows, serialize_settings, state); serialization->serializeBinaryBulkStateSuffix(serialize_settings, state); @@ -203,7 +207,9 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G writeIntBinary(plain_hashing.count(), marks); writeIntBinary(UInt64(0), marks); - writeColumnSingleGranule(block.getByName(name_and_type->name), stream_getter, granule.start_row, granule.rows_to_write); + writeColumnSingleGranule( + block.getByName(name_and_type->name), 
serializations[name_and_type->name], + stream_getter, granule.start_row, granule.rows_to_write); /// Each type always have at least one substream prev_stream->hashing_buf.next(); //-V522 diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index 2ea35969a4e..2494195223a 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -121,7 +121,7 @@ static size_t computeIndexGranularityImpl( } else { - size_t size_of_row_in_bytes = block_size_in_memory / rows_in_block; + size_t size_of_row_in_bytes = std::max(block_size_in_memory / rows_in_block, 1UL); index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes; } } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h index 704b38ba6d5..d952950e461 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h @@ -132,6 +132,9 @@ protected: MergeTreeIndexAggregators skip_indices_aggregators; std::vector skip_index_accumulated_marks; + using SerializationsMap = std::unordered_map; + SerializationsMap serializations; + std::unique_ptr index_file_stream; std::unique_ptr index_stream; DataTypes index_types; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 04179fbb781..b5b36ab6f44 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -81,7 +81,10 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide( { const auto & columns = metadata_snapshot->getColumns(); for (const auto & it : columns_list) + { + serializations.emplace(it.name, it.type->getSerialization(it.name, data_part->serialization_info)); addStreams(it, columns.getCodecDescOrDefault(it.name, 
default_codec)); + } } @@ -112,9 +115,7 @@ void MergeTreeDataPartWriterWide::addStreams( settings.max_compress_block_size); }; - auto serialization = column.type->getSerialization(column.name, data_part->serialization_info); - column.type->enumerateStreams(serialization, callback); - serializations.emplace(column.name, std::move(serialization)); + column.type->enumerateStreams(serializations[column.name], callback); } @@ -193,7 +194,14 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm fillIndexGranularity(index_granularity_for_block, block.rows()); } - auto granules_to_write = getGranulesToWrite(index_granularity, block.rows(), getCurrentMark(), rows_written_in_last_mark); + Block block_to_write = block; + for (auto & col : block_to_write) + { + if (serializations[col.name]->getKind() != SerializationKind::SPARSE) + col.column = col.column->convertToFullColumnIfSparse(); + } + + auto granules_to_write = getGranulesToWrite(index_granularity, block_to_write.rows(), getCurrentMark(), rows_written_in_last_mark); auto offset_columns = written_offset_columns ? 
*written_offset_columns : WrittenOffsetColumns{}; Block primary_key_block; @@ -205,7 +213,7 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm auto it = columns_list.begin(); for (size_t i = 0; i < columns_list.size(); ++i, ++it) { - const ColumnWithTypeAndName & column = block.getByName(it->name); + const ColumnWithTypeAndName & column = block_to_write.getByName(it->name); if (permutation) { @@ -301,7 +309,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule( ISerialization::SerializeBinaryBulkSettings & serialize_settings, const Granule & granule) { - auto serialization = serializations[name_and_type.name]; + const auto & serialization = serializations[name_and_type.name]; serialization->serializeBinaryBulkWithMultipleStreams(column, granule.start_row, granule.rows_to_write, serialize_settings, serialization_state); /// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one. @@ -406,7 +414,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name, size_t mark_num; - auto serialization = type.getDefaultSerialization(); + const auto & serialization = serializations[name]; for (mark_num = 0; !mrk_in.eof(); ++mark_num) { diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index 227c95edfb2..5eaaa0c1bbe 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -110,9 +110,6 @@ private: using ColumnStreams = std::map; ColumnStreams column_streams; - using Serializations = std::map; - Serializations serializations; - /// Non written marks to disk (for each column). Waiting until all rows for /// this marks will be written to disk. 
using MarksForColumns = std::unordered_map; diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.cpp b/src/Storages/MergeTree/MergeTreeReaderWide.cpp index 0da2f643eb0..efa9e429259 100644 --- a/src/Storages/MergeTree/MergeTreeReaderWide.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderWide.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include #include @@ -83,7 +84,12 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si /// The column is already present in the block so we will append the values to the end. bool append = res_columns[pos] != nullptr; if (!append) - res_columns[pos] = type->createColumn(); + { + if (isSparseSerializaion(serializations[name])) + res_columns[pos] = ColumnSparse::create(type->createColumn()); + else + res_columns[pos] = type->createColumn(); + } auto & column = res_columns[pos]; try diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 16657b4083d..7cd02fdf85e 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -32,6 +32,7 @@ struct Settings; M(UInt64, min_rows_for_compact_part, 0, "Experimental. Minimal number of rows to create part in compact format instead of saving it in RAM", 0) \ M(Bool, in_memory_parts_enable_wal, true, "Whether to write blocks in Native format to write-ahead-log before creation in-memory part", 0) \ M(UInt64, write_ahead_log_max_bytes, 1024 * 1024 * 1024, "Rotate WAL, if it exceeds that amount of bytes", 0) \ + M(Float, ratio_for_sparse_serialization, 1.1, "", 0) \ \ /** Merge settings. 
*/ \ M(UInt64, merge_max_block_size, DEFAULT_MERGE_BLOCK_SIZE, "How many rows in blocks should be formed for merge operations.", 0) \ diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index aefbe28b45b..1301eef0421 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -24,6 +24,7 @@ MergedBlockOutputStream::MergedBlockOutputStream( : IMergedBlockOutputStream(data_part, metadata_snapshot_) , columns_list(columns_list_) , default_codec(default_codec_) + , serialization_info(storage.getSettings()->ratio_for_sparse_serialization) { MergeTreeWriterSettings writer_settings( storage.global_context.getSettings(), @@ -147,6 +148,18 @@ void MergedBlockOutputStream::finalizePartOnDisk( removeEmptyColumnsFromPart(new_part, part_columns, checksums); + if (serialization_info.getNumberOfRows() > 0) + { + auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096); + HashingWriteBuffer out_hashing(*out); + serialization_info.write(out_hashing); + checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count(); + checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_hash = out_hashing.getHash(); + out->finalize(); + if (sync) + out->sync(); + } + { /// Write a file with a description of columns. auto out = volume->getDisk()->writeFile(part_path + "columns.txt", 4096); @@ -156,15 +169,6 @@ void MergedBlockOutputStream::finalizePartOnDisk( out->sync(); } - // if (serialization_info.getNumberOfRows() > 0) - // { - // auto out = volume->getDisk()->writeFile(part_path + "serialization.txt", 4096); - // serialization_info.write(*out); - // out->finalize(); - // if (sync) - // out->sync(); - // } - if (default_codec != nullptr) { auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);