ColumnSparse: initial implementation

Anton Popov 2021-03-12 19:33:41 +03:00
parent f7c7c5a9c7
commit 577d571300
40 changed files with 1206 additions and 252 deletions

View File

@ -361,6 +361,8 @@ public:
}
}
void addManyDefaults(size_t /* length */) const override {}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));

View File

@ -12,6 +12,7 @@
#include <Core/Field.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnSparse.h>
namespace DB
@ -153,11 +154,20 @@ public:
Arena * arena,
ssize_t if_argument_pos = -1) const = 0;
virtual void addBatchSparse(
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena) const = 0;
/** The same for single place.
*/
virtual void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const = 0;
virtual void addBatchSparseSinglePlace(
AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
/** The same for single place when we need to aggregate only filtered data.
*/
virtual void addBatchSinglePlaceNotNull(
@ -213,6 +223,13 @@ public:
*/
virtual AggregateFunctionPtr getNestedFunction() const { return {}; }
virtual bool supportsSparseArguments() const { return false; }
virtual void addManyDefaults(size_t /* length */) const
{
throw Exception("Method addManyDefaults is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
const DataTypes & getArgumentTypes() const { return argument_types; }
const Array & getParameters() const { return parameters; }
@ -278,6 +295,32 @@ public:
}
}
void addBatchSparse(
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena) const override
{
const auto & column_sparse = assert_cast<const ColumnSparse &>(*columns[0]);
const auto * values = &column_sparse.getValuesColumn();
const auto & offsets_data = column_sparse.getOffsetsData();
size_t offset_pos = 0;
size_t offsets_size = offsets_data.size();
for (size_t i = 0; i < column_sparse.size(); ++i)
{
if (offset_pos < offsets_size && i == offsets_data[offset_pos])
{
static_cast<const Derived *>(this)->add(places[i] + place_offset, &values, offset_pos + 1, arena);
++offset_pos;
}
else
{
static_cast<const Derived *>(this)->add(places[i] + place_offset, &values, 0, arena);
}
}
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const override
{
@ -297,6 +340,18 @@ public:
}
}
void addBatchSparseSinglePlace(
AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
{
const auto & column_sparse = assert_cast<const ColumnSparse &>(*columns[0]);
const auto * values = &column_sparse.getValuesColumn();
for (size_t i = 1; i < values->size(); ++i)
static_cast<const Derived *>(this)->add(place, &values, i, arena);
static_cast<const Derived *>(this)->addManyDefaults(column_sparse.getNumberOfDefaults());
}
void addBatchSinglePlaceNotNull(
size_t batch_size,
AggregateDataPtr place,
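The two sparse batch methods above never materialize a full column: they walk the rows once and consult the sorted offsets array to decide whether the current row carries an explicit value (stored at offset_pos + 1, since row 0 of the values column is reserved for the shared default). A minimal standalone sketch of that traversal, using plain std::vector stand-ins instead of the real IColumn types:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

int main()
{
    std::vector<uint64_t> offsets = {1, 4};   /// rows 1 and 4 carry explicit values
    std::vector<int> values = {0, 10, 20};    /// values[0] is the shared default
    size_t size = 6;                          /// total number of rows

    size_t offset_pos = 0;
    for (size_t i = 0; i < size; ++i)
    {
        if (offset_pos < offsets.size() && i == offsets[offset_pos])
        {
            /// Explicit value: an aggregate function would read row offset_pos + 1.
            std::cout << "row " << i << " -> values[" << offset_pos + 1 << "] = "
                      << values[offset_pos + 1] << '\n';
            ++offset_pos;
        }
        else
        {
            /// Default row: an aggregate function would read row 0 of the values column.
            std::cout << "row " << i << " -> default = " << values[0] << '\n';
        }
    }
}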

View File

@ -1206,7 +1206,7 @@ void ColumnArray::gather(ColumnGathererStream & gatherer)
gatherer.gather(*this);
}
void ColumnArray::getIndicesOfNonDefaultValues(IColumn::Offsets & indices) const
void ColumnArray::getIndicesOfNonDefaultValues(IColumn::Offsets & indices, size_t from, size_t limit) const
{
const auto & offsets_data = getOffsets();
size_t to = limit && from + limit < offsets_data.size() ? from + limit : offsets_data.size();
for (size_t i = from; i < to; ++i)
@ -1214,14 +1214,14 @@ void ColumnArray::getIndicesOfNonDefaultValues(IColumn::Offsets & indices) const
if (offsets_data[i] != offsets_data[i - 1])
indices.push_back(i);
}
size_t ColumnArray::getNumberOfNonDefaultValues() const
{
const auto & offsets_data = getOffsets();
size_t res = 0;
for (size_t i = 0; i < offsets_data.size(); ++i)
res += (offsets_data[i] != offsets_data[i - 1]);
return res;
}
// size_t ColumnArray::getNumberOfDefaultRows() const
// {
//     const auto & offsets_data = getOffsets();
//     size_t res = 0;
//     for (size_t i = 0; i < offsets_data.size(); ++i)
//         res += (offsets_data[i] != offsets_data[i - 1]);
//     return res;
// }
}

View File

@ -139,9 +139,9 @@ public:
return false;
}
void getIndicesOfNonDefaultValues(IColumn::Offsets & indices) const override;
void getIndicesOfNonDefaultValues(IColumn::Offsets & indices, size_t offset, size_t limit) const override;
size_t getNumberOfNonDefaultValues() const override;
// size_t getNumberOfDefaultRows() const override;
bool isCollationSupported() const override { return getData().isCollationSupported(); }

View File

@ -0,0 +1,486 @@
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnsCommon.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
}
ColumnSparse::ColumnSparse(MutableColumnPtr && values_)
: values(std::move(values_)), _size(0)
{
if (!values->empty())
throw Exception("Not empty values passed to ColumnSparse, but no offsets passed", ErrorCodes::LOGICAL_ERROR);
values->insertDefault();
offsets = ColumnUInt64::create();
}
ColumnSparse::ColumnSparse(MutableColumnPtr && values_, MutableColumnPtr && offsets_, size_t size_)
: values(std::move(values_)), offsets(std::move(offsets_)), _size(size_)
{
const ColumnUInt64 * offsets_concrete = typeid_cast<const ColumnUInt64 *>(offsets.get());
if (!offsets_concrete)
throw Exception("offsets_column must be a ColumnUInt64", ErrorCodes::LOGICAL_ERROR);
if (offsets->size() + 1 != values->size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Values size is inconsistent with offsets size. Expected: {}, got {}", offsets->size() + 1, values->size());
}
MutableColumnPtr ColumnSparse::cloneResized(size_t new_size) const
{
if (new_size == 0)
return ColumnSparse::create(values->cloneEmpty());
if (new_size >= _size)
return ColumnSparse::create(IColumn::mutate(values), IColumn::mutate(offsets), new_size);
auto res = ColumnSparse::create(values->cloneEmpty());
res->insertRangeFrom(*this, 0, new_size);
return res;
}
bool ColumnSparse::isNullAt(size_t n) const
{
return values->isNullAt(getValueIndex(n));
}
Field ColumnSparse::operator[](size_t n) const
{
return (*values)[getValueIndex(n)];
}
void ColumnSparse::get(size_t n, Field & res) const
{
values->get(getValueIndex(n), res);
}
bool ColumnSparse::getBool(size_t n) const
{
return values->getBool(getValueIndex(n));
}
UInt64 ColumnSparse::get64(size_t n) const
{
return values->get64(getValueIndex(n));
}
StringRef ColumnSparse::getDataAt(size_t n) const
{
return values->getDataAt(getValueIndex(n));
}
ColumnPtr ColumnSparse::convertToFullColumnIfSparse() const
{
auto res = values->cloneEmpty();
res->reserve(_size);
const auto & offsets_data = getOffsetsData();
size_t current_offset = 0;
for (size_t i = 0; i < offsets_data.size(); ++i)
{
/// Defaults between the previous non-default row and this one.
res->insertManyDefaults(offsets_data[i] - current_offset);
res->insertFrom(*values, i + 1);
current_offset = offsets_data[i] + 1;
}
/// Trailing defaults after the last non-default row.
res->insertManyDefaults(_size - current_offset);
return res;
}
void ColumnSparse::insertData(const char * pos, size_t length)
{
/// insertData inserts a single value, so the new row is always recorded as non-default.
getOffsetsData().push_back(_size);
values->insertData(pos, length);
++_size;
}
StringRef ColumnSparse::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
return values->serializeValueIntoArena(getValueIndex(n), arena, begin);
}
const char * ColumnSparse::deserializeAndInsertFromArena(const char * pos)
{
UNUSED(pos);
throwMustBeDense();
}
void ColumnSparse::insertRangeFrom(const IColumn & src, size_t start, size_t length)
{
size_t end = start + length;
auto & offsets_data = getOffsetsData();
if (const auto * src_sparse = typeid_cast<const ColumnSparse *>(&src))
{
const auto & src_offsets = src_sparse->getOffsetsData();
const auto & src_values = src_sparse->getValuesColumn();
/// Positions in src_offsets of the first and one-past-the-last non-default rows in [start, end).
size_t offset_start = std::lower_bound(src_offsets.begin(), src_offsets.end(), start) - src_offsets.begin();
size_t offset_end = std::lower_bound(src_offsets.begin(), src_offsets.end(), end) - src_offsets.begin();
if (offset_start == offset_end)
{
insertManyDefaults(length);
}
else
{
/// Defaults before the first non-default row in the range.
insertManyDefaults(src_offsets[offset_start] - start);
offsets_data.push_back(_size);
++_size;
for (size_t i = offset_start + 1; i < offset_end; ++i)
{
/// Defaults between two consecutive non-default rows.
insertManyDefaults(src_offsets[i] - src_offsets[i - 1] - 1);
offsets_data.push_back(_size);
++_size;
}
/// Defaults after the last non-default row in the range.
insertManyDefaults(end - src_offsets[offset_end - 1] - 1);
values->insertRangeFrom(src_values, offset_start + 1, offset_end - offset_start);
}
}
else
{
for (size_t i = start; i < end; ++i)
{
offsets_data.push_back(_size);
++_size;
}
values->insertRangeFrom(src, start, length);
}
}
void ColumnSparse::insert(const Field & x)
{
getOffsetsData().push_back(_size);
values->insert(x);
++_size;
}
void ColumnSparse::insertFrom(const IColumn & src, size_t n)
{
if (const auto * src_sparse = typeid_cast<const ColumnSparse *>(&src))
{
if (size_t value_index = src_sparse->getValueIndex(n))
{
getOffsetsData().push_back(_size);
values->insertFrom(src_sparse->getValuesColumn(), value_index);
}
}
else
{
getOffsetsData().push_back(_size);
values->insertFrom(src, n);
}
++_size;
}
void ColumnSparse::insertDefault()
{
++_size;
}
void ColumnSparse::insertManyDefaults(size_t length)
{
_size += length;
}
void ColumnSparse::popBack(size_t n)
{
assert(n < _size);
auto & offsets_data = getOffsetsData();
size_t new_size = _size - n;
size_t removed_values = 0;
while (!offsets_data.empty() && offsets_data.back() >= new_size)
{
offsets_data.pop_back();
++removed_values;
}
if (removed_values)
values->popBack(removed_values);
_size = new_size;
}
ColumnPtr ColumnSparse::filter(const Filter & filt, ssize_t) const
{
if (_size != filt.size())
throw Exception("Size of filter doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
if (offsets->empty())
{
auto res = cloneEmpty();
res->insertManyDefaults(countBytesInFilter(filt));
return res;
}
const auto & offsets_data = getOffsetsData();
auto res_offsets = offsets->cloneEmpty();
auto & res_offsets_data = assert_cast<ColumnUInt64 &>(*res_offsets).getData();
Filter values_filter;
values_filter.reserve(values->size());
values_filter.push_back(1);
size_t values_result_size_hint = 1;
size_t offset_pos = 0;
size_t res_offset = 0;
for (size_t i = 0; i < _size; ++i)
{
if (offset_pos < offsets_data.size() && i == offsets_data[offset_pos])
{
if (filt[i])
{
res_offsets_data.push_back(res_offset);
values_filter.push_back(1);
++res_offset;
++values_result_size_hint;
}
else
{
values_filter.push_back(0);
}
++offset_pos;
}
else
{
res_offset += filt[i] != 0;
}
}
auto res_values = values->filter(values_filter, values_result_size_hint);
return this->create(std::move(res_values), std::move(res_offsets), res_offset);
}
ColumnPtr ColumnSparse::permute(const Permutation & perm, size_t limit) const
{
UNUSED(perm);
UNUSED(limit);
throwMustBeDense();
}
ColumnPtr ColumnSparse::index(const IColumn & indexes, size_t limit) const
{
UNUSED(indexes);
UNUSED(limit);
throwMustBeDense();
}
int ColumnSparse::compareAt(size_t n, size_t m, const IColumn & rhs_, int null_direction_hint) const
{
UNUSED(n);
UNUSED(m);
UNUSED(rhs_);
UNUSED(null_direction_hint);
std::cerr << "rhs: " << rhs_.dumpStructure() << "\n";
throwMustBeDense();
}
void ColumnSparse::compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const
{
UNUSED(rhs);
UNUSED(rhs_row_num);
UNUSED(row_indexes);
UNUSED(compare_results);
UNUSED(direction);
UNUSED(nan_direction_hint);
throwMustBeDense();
}
int ColumnSparse::compareAtWithCollation(size_t n, size_t m, const IColumn & rhs, int null_direction_hint, const Collator &) const
{
UNUSED(n);
UNUSED(m);
UNUSED(rhs);
UNUSED(null_direction_hint);
throwMustBeDense();
}
bool ColumnSparse::hasEqualValues() const
{
return offsets->size() == 0;
}
void ColumnSparse::getPermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const
{
UNUSED(reverse);
UNUSED(limit);
UNUSED(null_direction_hint);
UNUSED(res);
throwMustBeDense();
}
void ColumnSparse::updatePermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res, EqualRanges & equal_range) const
{
UNUSED(reverse);
UNUSED(null_direction_hint);
UNUSED(limit);
UNUSED(res);
UNUSED(equal_range);
throwMustBeDense();
}
void ColumnSparse::getPermutationWithCollation(const Collator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res) const
{
UNUSED(collator);
UNUSED(reverse);
UNUSED(limit);
UNUSED(null_direction_hint);
UNUSED(res);
throwMustBeDense();
}
void ColumnSparse::updatePermutationWithCollation(
const Collator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res, EqualRanges& equal_range) const
{
UNUSED(collator);
UNUSED(reverse);
UNUSED(limit);
UNUSED(null_direction_hint);
UNUSED(res);
UNUSED(equal_range);
throwMustBeDense();
}
void ColumnSparse::reserve(size_t)
{
}
size_t ColumnSparse::byteSize() const
{
return values->byteSize() + offsets->byteSize();
}
size_t ColumnSparse::byteSizeAt(size_t n) const
{
size_t index = getValueIndex(n);
size_t res = values->byteSizeAt(index);
if (index)
res += sizeof(UInt64);
return res;
}
size_t ColumnSparse::allocatedBytes() const
{
return values->allocatedBytes() + offsets->allocatedBytes();
}
void ColumnSparse::protect()
{
throwMustBeDense();
}
ColumnPtr ColumnSparse::replicate(const Offsets & replicate_offsets) const
{
UNUSED(replicate_offsets);
throwMustBeDense();
}
void ColumnSparse::updateHashWithValue(size_t n, SipHash & hash) const
{
UNUSED(n);
UNUSED(hash);
throwMustBeDense();
}
void ColumnSparse::updateWeakHash32(WeakHash32 & hash) const
{
UNUSED(hash);
throwMustBeDense();
}
void ColumnSparse::updateHashFast(SipHash & hash) const
{
UNUSED(hash);
throwMustBeDense();
}
void ColumnSparse::getExtremes(Field & min, Field & max) const
{
UNUSED(min);
UNUSED(max);
throwMustBeDense();
}
void ColumnSparse::getIndicesOfNonDefaultValues(IColumn::Offsets & indices, size_t from, size_t limit) const
{
const auto & offsets_data = getOffsetsData();
auto start = from ? std::lower_bound(offsets_data.begin(), offsets_data.end(), from) : offsets_data.begin();
auto end = limit ? std::lower_bound(offsets_data.begin(), offsets_data.end(), from + limit) : offsets_data.end();
indices.assign(start, end);
}
size_t ColumnSparse::getNumberOfDefaultRows(size_t step) const
{
return (_size - offsets->size()) / step;
}
MutableColumns ColumnSparse::scatter(ColumnIndex num_columns, const Selector & selector) const
{
UNUSED(num_columns);
UNUSED(selector);
throwMustBeDense();
}
void ColumnSparse::gather(ColumnGathererStream & gatherer_stream)
{
UNUSED(gatherer_stream);
throwMustBeDense();
}
ColumnPtr ColumnSparse::compress() const
{
throwMustBeDense();
}
bool ColumnSparse::structureEquals(const IColumn & rhs) const
{
UNUSED(rhs);
throwMustBeDense();
}
const IColumn::Offsets & ColumnSparse::getOffsetsData() const
{
return assert_cast<const ColumnUInt64 &>(*offsets).getData();
}
IColumn::Offsets & ColumnSparse::getOffsetsData()
{
return assert_cast<ColumnUInt64 &>(*offsets).getData();
}
size_t ColumnSparse::getValueIndex(size_t n) const
{
assert(n < _size);
const auto & offsets_data = getOffsetsData();
auto it = std::lower_bound(offsets_data.begin(), offsets_data.end(), n);
if (it == offsets_data.end() || *it != n)
return 0;
return it - offsets_data.begin() + 1;
}
}
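Everything in this file relies on a single invariant: offsets is a strictly increasing list of the row numbers that carry explicit values, and values stores those values shifted by one position, with the shared default at index 0 (inserted by the constructor). getValueIndex is then one binary search. A compilable sketch of that mapping, independent of the real classes:

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iostream>
#include <vector>

/// Sketch of ColumnSparse::getValueIndex: 0 (the default slot) for default
/// rows, otherwise 1 + the position of the row number in `offsets`.
size_t getValueIndex(const std::vector<uint64_t> & offsets, size_t size, size_t n)
{
    assert(n < size);
    auto it = std::lower_bound(offsets.begin(), offsets.end(), n);
    if (it == offsets.end() || *it != n)
        return 0;
    return it - offsets.begin() + 1;
}

int main()
{
    /// 8 rows; rows 2 and 5 carry explicit values.
    std::vector<uint64_t> offsets = {2, 5};
    for (size_t n = 0; n < 8; ++n)
        std::cout << "row " << n << " -> value index " << getValueIndex(offsets, 8, n) << '\n';
}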

src/Columns/ColumnSparse.h Normal file
View File

@ -0,0 +1,161 @@
#pragma once
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
class Collator;
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class ColumnSparse final : public COWHelper<IColumn, ColumnSparse>
{
private:
friend class COWHelper<IColumn, ColumnSparse>;
explicit ColumnSparse(MutableColumnPtr && values_);
ColumnSparse(MutableColumnPtr && values_, MutableColumnPtr && offsets_, size_t size_);
ColumnSparse(const ColumnSparse &) = default;
public:
/** Create immutable column using immutable arguments. These arguments may be shared with other columns.
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
*/
using Base = COWHelper<IColumn, ColumnSparse>;
static Ptr create(const ColumnPtr & values_, const ColumnPtr & offsets_, size_t size_)
{
return Base::create(values_->assumeMutable(), offsets_->assumeMutable(), size_);
}
static MutablePtr create(MutableColumnPtr && values_, MutableColumnPtr && offsets_, size_t size_)
{
return Base::create(std::move(values_), std::move(offsets_), size_);
}
static Ptr create(const ColumnPtr & values_)
{
return Base::create(values_->assumeMutable());
}
template <typename Arg, typename = std::enable_if_t<std::is_rvalue_reference_v<Arg &&>>>
static MutablePtr create(Arg && arg)
{
return Base::create(std::forward<Arg>(arg));
}
const char * getFamilyName() const override { return "Sparse"; }
std::string getName() const override { return "Sparse(" + values->getName() + ")"; }
TypeIndex getDataType() const override { return values->getDataType(); }
MutableColumnPtr cloneResized(size_t new_size) const override;
size_t size() const override { return _size; }
bool isNullAt(size_t n) const override;
Field operator[](size_t n) const override;
void get(size_t n, Field & res) const override;
bool getBool(size_t n) const override;
UInt64 get64(size_t n) const override;
StringRef getDataAt(size_t n) const override;
ColumnPtr convertToFullColumnIfSparse() const override;
/// Will insert null value if pos=nullptr
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
void insert(const Field & x) override;
void insertFrom(const IColumn & src, size_t n) override;
void insertDefault() override;
void insertManyDefaults(size_t length) override;
void popBack(size_t n) override;
ColumnPtr filter(const Filter & filt, ssize_t) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
int compareAt(size_t n, size_t m, const IColumn & rhs_, int null_direction_hint) const override;
void compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const override;
int compareAtWithCollation(size_t n, size_t m, const IColumn & rhs, int null_direction_hint, const Collator &) const override;
bool hasEqualValues() const override;
void getPermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const override;
void updatePermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res, EqualRanges & equal_range) const override;
void getPermutationWithCollation(const Collator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res) const override;
void updatePermutationWithCollation(
const Collator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res, EqualRanges& equal_range) const override;
void reserve(size_t n) override;
size_t byteSize() const override;
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
void updateWeakHash32(WeakHash32 & hash) const override;
void updateHashFast(SipHash & hash) const override;
void getExtremes(Field & min, Field & max) const override;
void getIndicesOfNonDefaultValues(IColumn::Offsets & indices, size_t from, size_t limit) const override;
size_t getNumberOfDefaultRows(size_t step) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer_stream) override;
ColumnPtr compress() const override;
void forEachSubcolumn(ColumnCallback callback) override
{
callback(values);
callback(offsets);
}
bool structureEquals(const IColumn & rhs) const override;
bool isNullable() const override { return values->isNullable(); }
bool isFixedAndContiguous() const override { return false; }
bool valuesHaveFixedSize() const override { return values->valuesHaveFixedSize(); }
size_t sizeOfValueIfFixed() const override { return values->sizeOfValueIfFixed() + sizeof(UInt64); }
bool isCollationSupported() const override { return values->isCollationSupported(); }
size_t getNumberOfDefaults() const { return _size - offsets->size(); }
size_t getNumberOfTrailingDefaults() const
{
return offsets->empty() ? _size : _size - getOffsetsData().back() - 1;
}
size_t getValueIndex(size_t n) const;
const IColumn & getValuesColumn() const { return *values; }
IColumn & getValuesColumn() { return *values; }
const ColumnPtr & getValuesPtr() const { return values; }
ColumnPtr & getValuesPtr() { return values; }
const IColumn::Offsets & getOffsetsData() const;
IColumn::Offsets & getOffsetsData();
const ColumnPtr & getOffsetsPtr() const { return offsets; }
ColumnPtr & getOffsetsPtr() { return offsets; }
const IColumn & getOffsetsColumn() const { return *offsets; }
IColumn & getOffsetsColumn() { return *offsets; }
private:
[[noreturn]] void throwMustBeDense() const
{
throw Exception("Not implemented for ColumnSparse", ErrorCodes::LOGICAL_ERROR);
}
WrappedPtr values;
WrappedPtr offsets;
size_t _size;
};
}
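A usage sketch against the interface above. It assumes a ClickHouse build context with the headers from this commit; makeExampleSparseColumn is a hypothetical helper, not part of the commit:

#include <Columns/ColumnSparse.h>
#include <Columns/ColumnsNumber.h>
#include <Core/Field.h>

using namespace DB;

ColumnPtr makeExampleSparseColumn()
{
    /// An empty sparse column over UInt64; the constructor inserts the dummy
    /// default into `values`, so values->size() == offsets->size() + 1 holds.
    auto sparse = ColumnSparse::create(ColumnUInt64::create());

    sparse->insertDefault();            /// row 0: only bumps _size
    sparse->insert(Field(UInt64(42)));  /// row 1: recorded in offsets and values
    sparse->insertManyDefaults(3);      /// rows 2..4: only _size changes

    /// Expands into a dense column [0, 42, 0, 0, 0].
    return sparse->convertToFullColumnIfSparse();
}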

View File

@ -530,21 +530,21 @@ void ColumnString::getExtremes(Field & min, Field & max) const
get(max_idx, max);
}
void ColumnString::getIndicesOfNonDefaultValues(Offsets & indices) const
void ColumnString::getIndicesOfNonDefaultValues(Offsets & indices, size_t from, size_t limit) const
{
size_t to = limit && from + limit < offsets.size() ? from + limit : offsets.size();
for (size_t i = from; i < to; ++i)
if (offsets[i] - offsets[i - 1] > 1)
indices.push_back(i);
}
size_t ColumnString::getNumberOfNonDefaultValues() const
{
size_t res = 0;
for (size_t i = 0; i < offsets.size(); ++i)
res += (offsets[i] - offsets[i - 1] > 1);
return res;
}
// size_t ColumnString::getNumberOfDefaultRows() const
// {
//     size_t res = 0;
//     for (size_t i = 0; i < offsets.size(); ++i)
//         res += (offsets[i] - offsets[i - 1] > 1);
//     return res;
// }
ColumnPtr ColumnString::compress() const
{

View File

@ -277,8 +277,8 @@ public:
return typeid(rhs) == typeid(ColumnString);
}
void getIndicesOfNonDefaultValues(Offsets & indices) const override;
size_t getNumberOfNonDefaultValues() const override;
void getIndicesOfNonDefaultValues(Offsets & indices, size_t from, size_t limit) const override;
// size_t getNumberOfDefaultRows() const override;
Chars & getChars() { return chars; }
const Chars & getChars() const { return chars; }

View File

@ -303,19 +303,34 @@ public:
return typeid(rhs) == typeid(ColumnVector<T>);
}
void getIndicesOfNonDefaultValues(IColumn::Offsets & offsets) const override
void getIndicesOfNonDefaultValues(IColumn::Offsets & offsets, size_t from, size_t limit) const override
{
offsets.reserve(data.size());
for (size_t i = 0; i < data.size(); ++i)
size_t to = limit && from + limit < size() ? from + limit : size();
for (size_t i = from; i < to; ++i)
if (data[i] != T{})
offsets.push_back(i);
}
size_t getNumberOfNonDefaultValues() const override
void insertAtOffsetsFrom(const IColumn::Offsets & offsets, const IColumn & values, size_t total_rows_hint) override
{
const auto & values_data = assert_cast<const Self &>(values).getData();
ssize_t position = static_cast<ssize_t>(data.size()) - 1;
data.resize_fill(data.size() + total_rows_hint);
for (size_t i = 0; i < offsets.size(); ++i)
{
position += offsets[i] + 1;
data[position] = values_data[i];
}
}
size_t getNumberOfDefaultRows(size_t step) const override
{
size_t res = 0;
for (size_t i = 0; i < data.size(); ++i)
res += (data[i] != T{});
for (size_t i = 0; i < data.size(); i += step)
res += (data[i] == T{});
return res;
}
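In insertAtOffsetsFrom above, offsets[i] is the number of default rows preceding the i-th explicit value (the same contract as the generic IColumn fallback in the next file); the override fills all gaps with a single resize_fill and then writes only the explicit values. A standalone sketch of the index arithmetic:

#include <cstddef>
#include <iostream>
#include <vector>

int main()
{
    /// gaps[i]: number of default rows before the i-th explicit value.
    std::vector<size_t> gaps = {2, 0, 3};
    std::vector<int> values = {7, 8, 9};

    size_t total_rows = 0;
    for (size_t g : gaps)
        total_rows += g + 1;

    std::vector<int> data;                      /// dense destination column
    std::ptrdiff_t position = static_cast<std::ptrdiff_t>(data.size()) - 1;
    data.resize(data.size() + total_rows, 0);   /// resize_fill: defaults everywhere
    for (size_t i = 0; i < gaps.size(); ++i)
    {
        position += gaps[i] + 1;                /// skip the gap, land on the value slot
        data[position] = values[i];
    }

    for (int x : data)
        std::cout << x << ' ';                  /// prints: 0 0 7 8 0 0 0 9
    std::cout << '\n';
}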

View File

@ -30,6 +30,17 @@ void IColumn::insertFrom(const IColumn & src, size_t n)
insert(src[n]);
}
void IColumn::insertAtOffsetsFrom(const Offsets & offsets, const IColumn & values, size_t)
{
assert(offsets.size() == values.size());
for (size_t i = 0; i < offsets.size(); ++i)
{
if (offsets[i])
insertManyDefaults(offsets[i]);
insertFrom(values, i);
}
}
bool isColumnNullable(const IColumn & column)
{
return checkColumn<ColumnNullable>(column);

View File

@ -67,6 +67,8 @@ public:
/// If column is ColumnLowCardinality, transforms is to full column.
virtual Ptr convertToFullColumnIfLowCardinality() const { return getPtr(); }
virtual Ptr convertToFullColumnIfSparse() const { return getPtr(); }
/// Creates empty column with the same type.
virtual MutablePtr cloneEmpty() const { return cloneResized(0); }
@ -363,8 +365,14 @@ public:
throw Exception("Method structureEquals is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
virtual void getIndicesOfNonDefaultValues(Offsets & /* offsets */) const {}
virtual size_t getNumberOfNonDefaultValues() const { return 0; }
virtual void getIndicesOfNonDefaultValues(Offsets & /* offsets */, size_t, size_t) const {}
virtual void insertAtOffsetsFrom(const Offsets & offsets, const IColumn & values, size_t total_rows_hint);
static constexpr auto DEFAULT_ROWS_SEARCH_STEP = 8;
static constexpr auto MIN_ROWS_TO_SEARCH_DEFAULTS = DEFAULT_ROWS_SEARCH_STEP * 16;
virtual size_t getNumberOfDefaultRows(size_t /* step */) const { return {}; }
/// Compress column in memory to some representation that allows to decompress it back.
/// Return itself if compression is not applicable for this column type.
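getNumberOfDefaultRows is deliberately sampled: with DEFAULT_ROWS_SEARCH_STEP = 8 only every eighth row is inspected, and SerializationInfo later multiplies the sampled count back by the step, so the result is an estimate rather than an exact count. A standalone sketch of that arithmetic:

#include <cstddef>
#include <iostream>
#include <vector>

int main()
{
    constexpr size_t step = 8;                  /// IColumn::DEFAULT_ROWS_SEARCH_STEP
    std::vector<int> data(8000, 0);
    for (size_t i = 0; i < data.size(); i += 10)
        data[i] = 1;                            /// ~10% of rows non-default

    size_t sampled_defaults = 0;
    for (size_t i = 0; i < data.size(); i += step)
        sampled_defaults += (data[i] == 0);

    /// True default count is 7200; the sample (step 8 against a period-10
    /// pattern) estimates 6400, which is close enough for choosing a format.
    std::cout << "estimated defaults: " << sampled_defaults * step
              << " of " << data.size() << " rows\n";
}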

src/Common/SparseArray.h Normal file
View File

View File

@ -11,6 +11,8 @@
#include <DataStreams/NativeBlockInputStream.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnSparse.h>
namespace DB
{
@ -79,7 +81,7 @@ void NativeBlockInputStream::readData(const IDataType & type, ColumnPtr & column
settings.position_independent_encoding = false;
ISerialization::DeserializeBinaryBulkStatePtr state;
auto serialization = type.getDefaultSerialization();
auto serialization = type.getSerialization(*column);
serialization->deserializeBinaryBulkStatePrefix(settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state);
@ -150,6 +152,10 @@ Block NativeBlockInputStream::readImpl()
readBinary(type_name, istr);
column.type = data_type_factory.get(type_name);
/// TODO: check revision.
SerializationKind serialization_kind;
readIntBinary(serialization_kind, istr);
if (use_index)
{
/// Index allows to do more checks.
@ -161,13 +167,19 @@ Block NativeBlockInputStream::readImpl()
/// Data
ColumnPtr read_column = column.type->createColumn();
if (serialization_kind == SerializationKind::SPARSE)
read_column = ColumnSparse::create(read_column);
double avg_value_size_hint = avg_value_size_hints.empty() ? 0 : avg_value_size_hints[i];
if (rows) /// If no rows, nothing to read.
readData(*column.type, read_column, istr, rows, avg_value_size_hint);
/// TODO: maybe remove.
read_column = read_column->convertToFullColumnIfSparse();
column.column = std::move(read_column);
// std::cerr << "column.column: " << column.column->dumpStructure() << "\n";
if (header)
{
/// Support insert from old clients without low cardinality type.

View File

@ -41,7 +41,7 @@ void NativeBlockOutputStream::flush()
}
void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit)
void NativeBlockOutputStream::writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit)
{
/** If there are columns-constants - then we materialize them.
* (Since the data type does not know how to serialize / deserialize constants.)
@ -53,12 +53,10 @@ void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr
settings.position_independent_encoding = false;
settings.low_cardinality_max_dictionary_size = 0;
auto serialization = type.getDefaultSerialization();
ISerialization::SerializeBinaryBulkStatePtr state;
serialization->serializeBinaryBulkStatePrefix(settings, state);
serialization->serializeBinaryBulkWithMultipleStreams(*full_column, offset, limit, settings, state);
serialization->serializeBinaryBulkStateSuffix(settings, state);
serialization.serializeBinaryBulkStatePrefix(settings, state);
serialization.serializeBinaryBulkWithMultipleStreams(*full_column, offset, limit, settings, state);
serialization.serializeBinaryBulkStateSuffix(settings, state);
}
@ -121,9 +119,13 @@ void NativeBlockOutputStream::write(const Block & block)
writeStringBinary(type_name, ostr);
/// TODO: add revision
auto serialization = column.type->getSerialization(*column.column);
writeIntBinary(serialization->getKind(), ostr);
/// Data
if (rows) /// Zero items of data is always represented as zero number of bytes.
writeData(*column.type, column.column, ostr, 0, 0);
writeData(*serialization, column.column, ostr, 0, 0);
if (index_ostr)
{

View File

@ -30,7 +30,7 @@ public:
void write(const Block & block) override;
void flush() override;
static void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit);
static void writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit);
String getContentType() const override { return "application/octet-stream"; }

View File

@ -340,8 +340,8 @@ SerializationPtr DataTypeTuple::getSerialization(const String & column_name, con
ISerialization::Settings settings =
{
.num_rows = info.getNumberOfRows(),
.num_non_default_rows = info.getNumberOfNonDefaultValues(subcolumn_name),
.min_ratio_for_dense_serialization = 10
.num_default_rows = info.getNumberOfDefaultRows(subcolumn_name),
.ratio_for_sparse_serialization = info.getRatioForSparseSerialization()
};
auto serializaion = elems[i]->getSerialization(settings);

View File

@ -1,5 +1,6 @@
#include <Columns/IColumn.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnSparse.h>
#include <Common/Exception.h>
#include <Common/escapeForFileName.h>
@ -158,13 +159,14 @@ SerializationPtr IDataType::getSubcolumnSerialization(const String & subcolumn_n
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
SerializationPtr IDataType::getSerialization(const String & column_name, const SerializationInfo & info) const
{
ISerialization::Settings settings =
{
.num_rows = info.getNumberOfRows(),
.num_non_default_rows = info.getNumberOfNonDefaultValues(column_name),
.min_ratio_for_dense_serialization = 10
.num_default_rows = info.getNumberOfDefaultRows(column_name),
.ratio_for_sparse_serialization = info.getRatioForSparseSerialization()
};
return getSerialization(settings);
@ -172,11 +174,14 @@ SerializationPtr IDataType::getSerialization(const String & column_name, const S
SerializationPtr IDataType::getSerialization(const IColumn & column) const
{
if (typeid_cast<const ColumnSparse *>(&column))
return getSparseSerialization();
ISerialization::Settings settings =
{
.num_rows = column.size(),
.num_non_default_rows = column.getNumberOfNonDefaultValues(),
.min_ratio_for_dense_serialization = 10
.num_default_rows = column.getNumberOfDefaultRows(IColumn::DEFAULT_ROWS_SEARCH_STEP),
.ratio_for_sparse_serialization = 10
};
return getSerialization(settings);
@ -184,10 +189,9 @@ SerializationPtr IDataType::getSerialization(const IColumn & column) const
SerializationPtr IDataType::getSerialization(const ISerialization::Settings & settings) const
{
// if (settings.num_non_default_rows * settings.min_ratio_for_dense_serialization < settings.num_rows)
// return getSparseSerialization();
UNUSED(settings);
double ratio = settings.num_rows ? std::min(static_cast<double>(settings.num_default_rows) / settings.num_rows, 1.0) : 0.0;
if (ratio > settings.ratio_for_sparse_serialization)
return getSparseSerialization();
return getDefaultSerialization();
}
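The decision above clamps the share of default rows to [0, 1] and compares it against ratio_for_sparse_serialization. Note that getSerialization(const IColumn &) hardcodes a ratio of 10, which a value clamped to [0, 1] can never exceed, so that particular call site still always picks the default serialization in this commit. A minimal sketch of the decision with a hypothetical threshold of 0.9:

#include <algorithm>
#include <cstddef>
#include <iostream>

int main()
{
    size_t num_rows = 1000;
    size_t num_default_rows = 950;
    double threshold = 0.9;     /// hypothetical ratio_for_sparse_serialization

    double ratio = num_rows
        ? std::min(static_cast<double>(num_default_rows) / num_rows, 1.0)
        : 0.0;

    /// 0.95 > 0.9, so this column would get the sparse serialization.
    std::cout << (ratio > threshold ? "sparse" : "default") << '\n';
}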
@ -215,9 +219,6 @@ SerializationPtr IDataType::getSerialization(const String & column_name, const S
if (callback(sparse_idx_name))
return getSparseSerialization();
UNUSED(column_name);
UNUSED(callback);
return getDefaultSerialization();
}
@ -238,4 +239,9 @@ void IDataType::enumerateStreams(const SerializationPtr & serialization, const S
}, path);
}
bool isSparseSerialization(const SerializationPtr & serialization)
{
return typeid_cast<const SerializationSparse *>(serialization.get());
}
}

View File

@ -75,6 +75,7 @@ public:
using StreamExistenceCallback = std::function<bool(const String &)>;
using BaseSerializationGetter = std::function<SerializationPtr(const IDataType &)>;
virtual SerializationPtr getSerialization(const IColumn & column) const;
virtual SerializationPtr getSerialization(const String & column_name, const StreamExistenceCallback & callback) const;
virtual SerializationPtr getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const;
@ -86,7 +87,7 @@ public:
virtual SerializationPtr getSerialization(const String & column_name, const SerializationInfo & info) const;
SerializationPtr getSerialization(const ISerialization::Settings & settings) const;
SerializationPtr getSerialization(const IColumn & column) const;
// SerializationPtr getSerialization(const IColumn & column) const;
using StreamCallbackWithType = std::function<void(const ISerialization::SubstreamPath &, const IDataType &)>;
@ -503,4 +504,6 @@ template <> inline constexpr bool IsDataTypeDateOrDateTime<DataTypeDate> = true;
template <> inline constexpr bool IsDataTypeDateOrDateTime<DataTypeDateTime> = true;
template <> inline constexpr bool IsDataTypeDateOrDateTime<DataTypeDateTime64> = true;
bool isSparseSerialization(const SerializationPtr & serialization);
}

View File

@ -2,6 +2,8 @@
#include <Common/COW.h>
#include <Core/Types.h>
#include <common/demangle.h>
#include <Common/typeid_cast.h>
#include <unordered_map>
#include <memory>
@ -9,6 +11,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class IDataType;
class ReadBuffer;
@ -25,6 +32,12 @@ class Field;
struct FormatSettings;
struct NameAndTypePair;
enum class SerializationKind : UInt8
{
DEFAULT = 0,
SPARSE = 1
};
class ISerialization
{
public:
@ -90,6 +103,8 @@ public:
String toString() const;
};
virtual SerializationKind getKind() const { return SerializationKind::DEFAULT; }
/// Cache for common substreams of one type, but possibly for different subcolumns of it.
/// E.g. sizes of arrays of Nested data type.
using SubstreamsCache = std::unordered_map<String, ColumnPtr>;
@ -143,8 +158,8 @@ public:
struct Settings
{
size_t num_rows;
size_t num_non_default_rows;
size_t min_ratio_for_dense_serialization;
size_t num_default_rows;
double ratio_for_sparse_serialization;
};
/// Call before serializeBinaryBulkWithMultipleStreams chain to write something before first mark.
@ -258,9 +273,48 @@ public:
static ColumnPtr getFromSubstreamsCache(SubstreamsCache * cache, const SubstreamPath & path);
static bool isSpecialCompressionAllowed(const SubstreamPath & path);
template <typename State, typename Serialization>
static State * checkAndGetSerializeState(SerializeBinaryBulkStatePtr & state, const Serialization &);
template <typename State, typename Serialization>
static State * checkAndGetDeserializeState(DeserializeBinaryBulkStatePtr & state, const Serialization &);
};
using SerializationPtr = std::shared_ptr<const ISerialization>;
using Serializations = std::vector<SerializationPtr>;
template <typename State, typename Serialization, typename StatePtr>
static State * checkAndGetState(StatePtr & state)
{
if (!state)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Got empty state for {}", demangle(typeid(Serialization).name()));
auto * state_concrete = typeid_cast<State *>(state.get());
if (!state_concrete)
{
auto & state_ref = *state;
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid State for {}. Expected: {}, got {}",
demangle(typeid(Serialization).name()),
demangle(typeid(State).name()),
demangle(typeid(state_ref).name()));
}
return state_concrete;
}
template <typename State, typename Serialization>
State * ISerialization::checkAndGetSerializeState(SerializeBinaryBulkStatePtr & state, const Serialization &)
{
return checkAndGetState<State, Serialization>(state);
}
template <typename State, typename Serialization>
State * ISerialization::checkAndGetDeserializeState(DeserializeBinaryBulkStatePtr & state, const Serialization &)
{
return checkAndGetState<State, Serialization>(state);
}
}
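The checkAndGetState helpers above replace the per-type boilerplate that is deleted from SerializationLowCardinality and SerializationTuple later in this diff. A usage sketch, assuming a ClickHouse build context; SerializeStateExample and exampleSuffix are hypothetical names, not part of the commit:

#include <DataTypes/Serializations/ISerialization.h>

using namespace DB;

struct SerializeStateExample : public ISerialization::SerializeBinaryBulkState
{
    size_t rows_written = 0;
};

void exampleSuffix(const ISerialization & serialization, ISerialization::SerializeBinaryBulkStatePtr & state)
{
    /// Throws LOGICAL_ERROR with demangled type names if `state` is empty
    /// or was created by a different serialization.
    auto * my_state = ISerialization::checkAndGetSerializeState<SerializeStateExample>(state, serialization);
    ++my_state->rows_written;
}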

View File

@ -17,17 +17,25 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
SerializationInfo::SerializationInfo(
double ratio_for_sparse_serialization_,
size_t default_rows_search_step_)
: ratio_for_sparse_serialization(ratio_for_sparse_serialization_)
, default_rows_search_step(default_rows_search_step_)
{
}
void SerializationInfo::add(const Block & block)
{
number_of_rows += block.rows();
for (const auto & elem : block)
{
non_default_values[elem.name] = elem.column->getNumberOfNonDefaultValues();
default_rows[elem.name] += elem.column->getNumberOfDefaultRows(default_rows_search_step) * default_rows_search_step;
for (const auto & subname : elem.type->getSubcolumnNames())
{
auto subcolumn = elem.type->getSubcolumn(subname, *elem.column);
auto full_name = Nested::concatenateName(elem.name, subname);
non_default_values[full_name] += subcolumn->getNumberOfNonDefaultValues();
default_rows[full_name] += subcolumn->getNumberOfDefaultRows(default_rows_search_step) * default_rows_search_step;
}
}
}
@ -35,14 +43,14 @@ void SerializationInfo::add(const Block & block)
void SerializationInfo::add(const SerializationInfo & other)
{
number_of_rows += other.number_of_rows;
for (const auto & [name, num] : other.non_default_values)
non_default_values[name] += num;
for (const auto & [name, num] : other.default_rows)
default_rows[name] += num;
}
size_t SerializationInfo::getNumberOfNonDefaultValues(const String & column_name) const
size_t SerializationInfo::getNumberOfDefaultRows(const String & column_name) const
{
auto it = non_default_values.find(column_name);
if (it == non_default_values.end())
auto it = default_rows.find(column_name);
if (it == default_rows.end())
return 0;
return it->second;
}
@ -51,13 +59,15 @@ namespace
{
constexpr auto KEY_NUMBER_OF_ROWS = "number_of_rows";
constexpr auto KEY_NUMBER_OF_NON_DEFAULT_VALUES = "number_of_non_default_values";
constexpr auto KEY_NUMBER_OF_DEFAULT_ROWS = "number_of_default_rows";
constexpr auto KEY_NUMBER = "number";
constexpr auto KEY_NAME = "name";
constexpr auto KEY_VERSION = "version";
}
/// TODO: add all fields.
void SerializationInfo::fromJSON(const String & json_str)
{
Poco::JSON::Parser parser;
@ -66,9 +76,9 @@ void SerializationInfo::fromJSON(const String & json_str)
if (object->has(KEY_NUMBER_OF_ROWS))
number_of_rows = object->getValue<size_t>(KEY_NUMBER_OF_ROWS);
if (object->has(KEY_NUMBER_OF_NON_DEFAULT_VALUES))
if (object->has(KEY_NUMBER_OF_DEFAULT_ROWS))
{
auto array = object->getArray(KEY_NUMBER_OF_NON_DEFAULT_VALUES);
auto array = object->getArray(KEY_NUMBER_OF_DEFAULT_ROWS);
for (const auto & elem : *array)
{
auto elem_object = elem.extract<Poco::JSON::Object::Ptr>();
@ -78,7 +88,7 @@ void SerializationInfo::fromJSON(const String & json_str)
auto name = elem_object->getValue<String>(KEY_NAME);
auto number = elem_object->getValue<size_t>(KEY_NUMBER);
non_default_values[name] = number;
default_rows[name] = number;
}
}
}
@ -90,7 +100,7 @@ String SerializationInfo::toJSON() const
info.set(KEY_NUMBER_OF_ROWS, number_of_rows);
Poco::JSON::Array column_infos;
for (const auto & [name, num] : non_default_values)
for (const auto & [name, num] : default_rows)
{
Poco::JSON::Object column_info;
column_info.set(KEY_NAME, name);
@ -98,7 +108,7 @@ String SerializationInfo::toJSON() const
column_infos.add(std::move(column_info));
}
info.set(KEY_NUMBER_OF_NON_DEFAULT_VALUES, std::move(column_infos));
info.set(KEY_NUMBER_OF_DEFAULT_ROWS, std::move(column_infos));
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);

View File

@ -12,11 +12,16 @@ public:
using NameToNumber = std::unordered_map<String, size_t>;
SerializationInfo(
double ratio_for_sparse_serialization_,
size_t default_rows_search_step_ = IColumn::DEFAULT_ROWS_SEARCH_STEP);
void add(const Block & block);
void add(const SerializationInfo & other);
size_t getNumberOfNonDefaultValues(const String & column_name) const;
size_t getNumberOfDefaultRows(const String & column_name) const;
size_t getNumberOfRows() const { return number_of_rows; }
double getRatioForSparseSerialization() const { return ratio_for_sparse_serialization; }
void read(ReadBuffer & in);
void write(WriteBuffer & out) const;
@ -25,8 +30,11 @@ private:
void fromJSON(const String & json_str);
String toJSON() const;
double ratio_for_sparse_serialization;
size_t default_rows_search_step;
size_t number_of_rows = 0;
NameToNumber non_default_values;
NameToNumber default_rows;
};
}

View File

@ -196,42 +196,6 @@ struct DeserializeStateLowCardinality : public ISerialization::DeserializeBinary
explicit DeserializeStateLowCardinality(UInt64 key_version_) : key_version(key_version_) {}
};
static SerializeStateLowCardinality * checkAndGetLowCardinalitySerializeState(
ISerialization::SerializeBinaryBulkStatePtr & state)
{
if (!state)
throw Exception("Got empty state for SerializationLowCardinality.", ErrorCodes::LOGICAL_ERROR);
auto * low_cardinality_state = typeid_cast<SerializeStateLowCardinality *>(state.get());
if (!low_cardinality_state)
{
auto & state_ref = *state;
throw Exception("Invalid SerializeBinaryBulkState for SerializationLowCardinality. Expected: "
+ demangle(typeid(SerializeStateLowCardinality).name()) + ", got "
+ demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR);
}
return low_cardinality_state;
}
static DeserializeStateLowCardinality * checkAndGetLowCardinalityDeserializeState(
ISerialization::DeserializeBinaryBulkStatePtr & state)
{
if (!state)
throw Exception("Got empty state for SerializationLowCardinality.", ErrorCodes::LOGICAL_ERROR);
auto * low_cardinality_state = typeid_cast<DeserializeStateLowCardinality *>(state.get());
if (!low_cardinality_state)
{
auto & state_ref = *state;
throw Exception("Invalid DeserializeBinaryBulkState for SerializationLowCardinality. Expected: "
+ demangle(typeid(DeserializeStateLowCardinality).name()) + ", got "
+ demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR);
}
return low_cardinality_state;
}
void SerializationLowCardinality::serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
@ -256,7 +220,7 @@ void SerializationLowCardinality::serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
auto * low_cardinality_state = checkAndGetLowCardinalitySerializeState(state);
auto * low_cardinality_state = checkAndGetSerializeState<SerializeStateLowCardinality>(state, *this);
KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value);
if (low_cardinality_state->shared_dictionary && settings.low_cardinality_max_dictionary_size)
@ -495,7 +459,7 @@ void SerializationLowCardinality::serializeBinaryBulkWithMultipleStreams(
const ColumnLowCardinality & low_cardinality_column = typeid_cast<const ColumnLowCardinality &>(column);
auto * low_cardinality_state = checkAndGetLowCardinalitySerializeState(state);
auto * low_cardinality_state = checkAndGetSerializeState<SerializeStateLowCardinality>(state, *this);
auto & global_dictionary = low_cardinality_state->shared_dictionary;
KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value);
@ -594,7 +558,7 @@ void SerializationLowCardinality::deserializeBinaryBulkWithMultipleStreams(
if (!indexes_stream)
throw Exception("Got empty stream for SerializationLowCardinality indexes.", ErrorCodes::LOGICAL_ERROR);
auto * low_cardinality_state = checkAndGetLowCardinalityDeserializeState(state);
auto * low_cardinality_state = checkAndGetDeserializeState<DeserializeStateLowCardinality>(state, *this);
KeysSerializationVersion::checkVersion(low_cardinality_state->key_version.value);
auto read_dictionary = [this, low_cardinality_state, keys_stream]()

View File

@ -2,6 +2,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnSparse.h>
#include <Common/assert_cast.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -12,28 +13,118 @@ namespace DB
namespace
{
void serializeOffsetsPositionIndependent(const IColumn::Offsets & offsets, WriteBuffer & ostr)
static constexpr auto END_OF_GRANULE_FLAG = 1ULL << 63;
struct SerializeStateSparse : public ISerialization::SerializeBinaryBulkState
{
size_t num_trailing_default_values = 0;
ISerialization::SerializeBinaryBulkStatePtr nested;
};
struct DeserializeStateSparse : public ISerialization::DeserializeBinaryBulkState
{
size_t num_trailing_defaults = 0;
bool has_value_after_defaults = false;
ISerialization::DeserializeBinaryBulkStatePtr nested;
};
void serializeOffsetsPositionIndependent(const IColumn::Offsets & offsets, WriteBuffer & ostr, size_t start, size_t end)
{
// std::cerr << "writing start: " << start << ", end: " << end << "\n";
// std::cerr << "offsets: ";
// for (const auto & x : offsets)
// std::cerr << x << " ";
// std::cerr << "\n";
size_t size = offsets.size();
IColumn::Offset prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
IColumn::Offset current_offset = offsets[i];
writeIntBinary(current_offset - prev_offset, ostr);
prev_offset = current_offset;
size_t group_size = offsets[i] - start;
// std::cerr << "writing group_size: " << group_size << "\n";
writeIntBinary(group_size, ostr);
start += group_size + 1;
}
// std::cerr << "writing start: " << start << ", end: " << end << "\n";
size_t group_size = start < end ? end - start : 0;
// std::cerr << "writing end group_size: " << group_size << "\n";
group_size |= END_OF_GRANULE_FLAG;
writeIntBinary(group_size, ostr);
}
void deserializeOffsetsPositionIndependent(IColumn::Offsets & offsets, ReadBuffer & istr)
// struct DeserializedRows
// {
// size_t total = 0;
// size_t trailing_defaults = 0;
// };
size_t deserializeOffsetsPositionIndependent(IColumn::Offsets & offsets,
ReadBuffer & istr, size_t limit, DeserializeStateSparse & state)
{
IColumn::Offset current_offset = 0;
// std::cerr << "limit: " << limit << ", num_trailing: " << state.num_trailing_defaults
// << ", has_value_after_defaults: " << state.has_value_after_defaults << "\n";
if (limit && state.num_trailing_defaults >= limit)
{
state.num_trailing_defaults -= limit;
return limit;
}
size_t total_rows = state.num_trailing_defaults;
if (state.has_value_after_defaults)
{
size_t start_of_group = offsets.empty() ? 0 : offsets.back() + 1;
offsets.push_back(start_of_group + state.num_trailing_defaults);
state.has_value_after_defaults = false;
state.num_trailing_defaults = 0;
++total_rows;
}
size_t group_size;
while (!istr.eof())
{
IColumn::Offset current_size = 0;
readIntBinary(current_size, istr);
current_offset += current_size;
offsets.push_back(current_offset);
readIntBinary(group_size, istr);
bool end_of_granule = group_size & END_OF_GRANULE_FLAG;
group_size &= ~END_OF_GRANULE_FLAG;
// std::cerr << "read group_size: " << group_size << ", end_of_granule: " << end_of_granule << "\n";
size_t next_total_rows = total_rows + group_size;
group_size += state.num_trailing_defaults;
// std::cerr << "group_size: " << group_size << ", end_of_granule: " << end_of_granule << "\n";
// std::cerr << "next_total_rows: " << next_total_rows << "\n";
if (limit && next_total_rows >= limit)
{
state.num_trailing_defaults = next_total_rows - limit;
state.has_value_after_defaults = !end_of_granule;
return limit;
}
if (end_of_granule)
{
state.has_value_after_defaults = false;
state.num_trailing_defaults = group_size;
}
else
{
size_t start_of_group = offsets.empty() ? 0 : offsets.back() + 1;
offsets.push_back(start_of_group + group_size);
state.num_trailing_defaults = 0;
state.has_value_after_defaults = false;
++next_total_rows;
}
total_rows = next_total_rows;
}
return total_rows;
}
}
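The offsets stream produced by serializeOffsetsPositionIndependent is a sequence of group sizes: one entry per explicit value counting the defaults that precede it, plus one final entry per granule that counts the trailing defaults and carries END_OF_GRANULE_FLAG in its top bit. A standalone sketch of the encoder side:

#include <cstdint>
#include <iostream>
#include <vector>

static constexpr uint64_t END_OF_GRANULE_FLAG = 1ULL << 63;

/// Encodes the rows [start, end) of a granule whose explicit values sit at the
/// row numbers listed in `offsets`, mirroring the writer above.
std::vector<uint64_t> encode(const std::vector<uint64_t> & offsets, uint64_t start, uint64_t end)
{
    std::vector<uint64_t> stream;
    for (uint64_t offset : offsets)
    {
        stream.push_back(offset - start);   /// defaults before this explicit value
        start = offset + 1;
    }
    /// Trailing defaults, flagged as the last group of the granule.
    stream.push_back((start < end ? end - start : 0) | END_OF_GRANULE_FLAG);
    return stream;
}

int main()
{
    /// 8 rows [0, 8) with explicit values at rows 2 and 3 encode as the
    /// groups {2, 0, 4 | FLAG}: two leading defaults, a value, zero defaults,
    /// a value, then four trailing defaults.
    for (uint64_t group : encode({2, 3}, 0, 8))
        std::cout << (group & ~END_OF_GRANULE_FLAG)
                  << ((group & END_OF_GRANULE_FLAG) ? " (end of granule)" : "") << '\n';
}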
@ -56,27 +147,13 @@ void SerializationSparse::serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::SparseElements);
nested_serialization->serializeBinaryBulkStatePrefix(settings, state);
settings.path.pop_back();
}
{
auto state_sparse = std::make_shared<SerializeStateSparse>();
settings.path.push_back(Substream::SparseElements);
nested_serialization->serializeBinaryBulkStatePrefix(settings, state_sparse->nested);
settings.path.pop_back();
state = std::move(state_sparse);
}
void SerializationSparse::serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::SparseElements);
nested_serialization->serializeBinaryBulkStateSuffix(settings, state);
settings.path.pop_back();
}
void SerializationSparse::deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::SparseElements);
nested_serialization->deserializeBinaryBulkStatePrefix(settings, state);
settings.path.pop_back();
}
void SerializationSparse::serializeBinaryBulkWithMultipleStreams(
@ -86,30 +163,67 @@ void SerializationSparse::serializeBinaryBulkWithMultipleStreams(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
UNUSED(limit);
UNUSED(offset);
/// TODO: inefficient.
/// TODO: use limit and offset
size_t size = column.size();
auto * state_sparse = checkAndGetSerializeState<SerializeStateSparse>(state, *this);
// std::cerr << "writing column: " << column.dumpStructure() << "\n";
auto offsets_column = DataTypeNumber<IColumn::Offset>().createColumn();
auto & offsets_data = assert_cast<ColumnVector<IColumn::Offset> &>(*offsets_column).getData();
column.getIndicesOfNonDefaultValues(offsets_data);
auto values = column.index(*offsets_column, 0);
offsets_data.push_back(size);
column.getIndicesOfNonDefaultValues(offsets_data, offset, limit);
settings.path.push_back(Substream::SparseOffsets);
if (auto * stream = settings.getter(settings.path))
serializeOffsetsPositionIndependent(offsets_data, *stream);
{
size_t end = limit && offset + limit < size ? offset + limit : size;
serializeOffsetsPositionIndependent(offsets_data, *stream, offset, end);
}
settings.path.back() = Substream::SparseElements;
nested_serialization->serializeBinaryBulkWithMultipleStreams(*values, 0, 0, settings, state);
if (!offsets_data.empty())
{
settings.path.back() = Substream::SparseElements;
if (const auto * column_sparse = typeid_cast<const ColumnSparse *>(&column))
{
const auto & values = column_sparse->getValuesColumn();
size_t begin = column_sparse->getValueIndex(offsets_data[0]);
size_t end = column_sparse->getValueIndex(offsets_data.back());
// std::cerr << "begin: " << begin << ", end: " << end << "\n";
nested_serialization->serializeBinaryBulkWithMultipleStreams(values, begin, end - begin + 1, settings, state_sparse->nested);
}
else
{
auto values = column.index(*offsets_column, 0);
nested_serialization->serializeBinaryBulkWithMultipleStreams(*values, 0, values->size(), settings, state_sparse->nested);
}
}
settings.path.pop_back();
}
void SerializationSparse::serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
auto * state_sparse = checkAndGetSerializeState<SerializeStateSparse>(state, *this);
settings.path.push_back(Substream::SparseElements);
nested_serialization->serializeBinaryBulkStateSuffix(settings, state_sparse->nested);
settings.path.pop_back();
}
void SerializationSparse::deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const
{
auto state_sparse = std::make_shared<DeserializeStateSparse>();
settings.path.push_back(Substream::SparseElements);
nested_serialization->deserializeBinaryBulkStatePrefix(settings, state_sparse->nested);
settings.path.pop_back();
state = std::move(state_sparse);
}
void SerializationSparse::deserializeBinaryBulkWithMultipleStreams(
ColumnPtr & column,
size_t limit,
@ -118,38 +232,50 @@ void SerializationSparse::deserializeBinaryBulkWithMultipleStreams(
SubstreamsCache * cache) const
{
settings.path.push_back(Substream::SparseOffsets);
auto offsets_column = DataTypeNumber<IColumn::Offset>().createColumn();
auto & offsets_data = assert_cast<ColumnVector<IColumn::Offset> &>(*offsets_column).getData();
if (auto * stream = settings.getter(settings.path))
deserializeOffsetsPositionIndependent(offsets_data, *stream);
settings.path.back() = Substream::SparseElements;
ColumnPtr values = column->cloneEmpty();
nested_serialization->deserializeBinaryBulkWithMultipleStreams(values, limit, settings, state, cache);
auto mutable_column = column->assumeMutable();
size_t size = values->size();
ssize_t prev_offset = -1;
for (size_t i = 0; i < size; ++i)
{
size_t offsets_diff = static_cast<ssize_t>(offsets_data[i]) - prev_offset;
if (offsets_diff > 1)
mutable_column->insertManyDefaults(offsets_diff - 1);
mutable_column->insertFrom(*values, i);
prev_offset = offsets_data[i];
}
size_t offsets_diff = offsets_data[size] - prev_offset;
if (offsets_diff > 1)
mutable_column->insertManyDefaults(offsets_diff - 1);
auto * state_sparse = checkAndGetDeserializeState<DeserializeStateSparse>(state, *this);
auto mutable_column = column->assumeMutable();
auto & column_sparse = assert_cast<ColumnSparse &>(*mutable_column);
auto & offsets_data = column_sparse.getOffsetsData();
settings.path.push_back(Substream::SparseOffsets);
size_t old_size = offsets_data.size();
size_t read_rows = 0;
if (auto * stream = settings.getter(settings.path))
read_rows = deserializeOffsetsPositionIndependent(offsets_data, *stream, limit, *state_sparse);
auto & values_column = column_sparse.getValuesPtr();
size_t values_limit = offsets_data.size() - old_size;
settings.path.back() = Substream::SparseElements;
nested_serialization->deserializeBinaryBulkWithMultipleStreams(values_column, values_limit, settings, state_sparse->nested, cache);
settings.path.pop_back();
if (offsets_data.size() + 1 != values_column->size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent sizes of values and offsets in SerializationSparse."
" Offsets size: {}, values size: {}", offsets_data.size(), values_column->size());
column_sparse.insertManyDefaults(read_rows);
// std::cerr << "column_sparse: " << column_sparse.dumpStructure() << "\n";
// std::cerr << "offsets: ";
// for (const auto & x : column_sparse.getOffsetsData())
// std::cerr << x << " ";
// std::cerr << "\n";
// std::cerr << "values: ";
// for (size_t i = 0; i < column_sparse.getValuesColumn().size(); ++i)
// std::cerr << toString(column_sparse.getValuesColumn()[i]) << " ";
// std::cerr << "\n";
column = std::move(mutable_column);
}
// void SerializationSparse::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
// {
// const auto & column_sparse = assert_cast<const ColumnSparse &>(column);
// const auto & values_column = column_sparse.getValuesColumn();
// nested_serialization->serializeText(values_column, column_sparse.getValueIndex(row_num), ostr, settings);
// }
}

View File

@ -10,6 +10,8 @@ class SerializationSparse final : public SerializationWrapper
public:
SerializationSparse(const SerializationPtr & nested_);
SerializationKind getKind() const override { return SerializationKind::SPARSE; }
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void serializeBinaryBulkStatePrefix(
@ -37,6 +39,9 @@ public:
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state,
SubstreamsCache * cache) const override;
// void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override;
};
}

View File

@ -293,39 +293,6 @@ struct DeserializeBinaryBulkStateTuple : public ISerialization::DeserializeBinar
std::vector<ISerialization::DeserializeBinaryBulkStatePtr> states;
};
static SerializeBinaryBulkStateTuple * checkAndGetTupleSerializeState(ISerialization::SerializeBinaryBulkStatePtr & state)
{
if (!state)
throw Exception("Got empty state for DataTypeTuple.", ErrorCodes::LOGICAL_ERROR);
auto * tuple_state = typeid_cast<SerializeBinaryBulkStateTuple *>(state.get());
if (!tuple_state)
{
auto & state_ref = *state;
throw Exception("Invalid SerializeBinaryBulkState for DataTypeTuple. Expected: "
+ demangle(typeid(SerializeBinaryBulkStateTuple).name()) + ", got "
+ demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR);
}
return tuple_state;
}
static DeserializeBinaryBulkStateTuple * checkAndGetTupleDeserializeState(ISerialization::DeserializeBinaryBulkStatePtr & state)
{
if (!state)
throw Exception("Got empty state for DataTypeTuple.", ErrorCodes::LOGICAL_ERROR);
auto * tuple_state = typeid_cast<DeserializeBinaryBulkStateTuple *>(state.get());
if (!tuple_state)
{
auto & state_ref = *state;
throw Exception("Invalid DeserializeBinaryBulkState for DataTypeTuple. Expected: "
+ demangle(typeid(DeserializeBinaryBulkStateTuple).name()) + ", got "
+ demangle(typeid(state_ref).name()), ErrorCodes::LOGICAL_ERROR);
}
return tuple_state;
}
void SerializationTuple::serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
@ -344,7 +311,7 @@ void SerializationTuple::serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
auto * tuple_state = checkAndGetTupleSerializeState(state);
auto * tuple_state = checkAndGetSerializeState<SerializeBinaryBulkStateTuple>(state, *this);
for (size_t i = 0; i < elems.size(); ++i)
elems[i]->serializeBinaryBulkStateSuffix(settings, tuple_state->states[i]);
@ -370,7 +337,7 @@ void SerializationTuple::serializeBinaryBulkWithMultipleStreams(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
auto * tuple_state = checkAndGetTupleSerializeState(state);
auto * tuple_state = checkAndGetSerializeState<SerializeBinaryBulkStateTuple>(state, *this);
for (const auto i : ext::range(0, ext::size(elems)))
{
@ -386,7 +353,7 @@ void SerializationTuple::deserializeBinaryBulkWithMultipleStreams(
DeserializeBinaryBulkStatePtr & state,
SubstreamsCache * cache) const
{
auto * tuple_state = checkAndGetTupleDeserializeState(state);
auto * tuple_state = checkAndGetDeserializeState<DeserializeBinaryBulkStateTuple>(state, *this);
auto mutable_column = column->assumeMutable();
auto & column_tuple = assert_cast<ColumnTuple &>(*mutable_column);

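For reference, a plausible reconstruction of the shared helper that replaces the deleted checkAndGet*State functions above, inferred from the new call sites and the deleted bodies rather than quoted from the commit (the actual definition lives elsewhere in this change set):

#include <Common/typeid_cast.h> /// include paths assumed
#include <common/demangle.h>

/// Reconstructed sketch, not a verbatim excerpt from the commit.
template <typename State, typename Serialization>
static State * checkAndGetSerializeState(
    DB::ISerialization::SerializeBinaryBulkStatePtr & state,
    const Serialization & /* used only for the error message */)
{
    if (!state)
        throw DB::Exception("Got empty serialize state for " + demangle(typeid(Serialization).name()),
            DB::ErrorCodes::LOGICAL_ERROR);

    auto * typed_state = typeid_cast<State *>(state.get());
    if (!typed_state)
    {
        auto & state_ref = *state;
        throw DB::Exception("Invalid serialize state: expected " + demangle(typeid(State).name())
            + ", got " + demangle(typeid(state_ref).name()), DB::ErrorCodes::LOGICAL_ERROR);
    }
    return typed_state;
}

checkAndGetDeserializeState would presumably mirror this over DeserializeBinaryBulkStatePtr.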
View File

@ -8,6 +8,7 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnSparse.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/materializeBlock.h>
#include <IO/WriteBufferFromFile.h>
@ -590,6 +591,8 @@ void NO_INLINE Aggregator::executeImplBatch(
{
if (inst->offsets)
inst->batch_that->addBatchArray(rows, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
else if (inst->has_sparse_arguments)
inst->batch_that->addBatchSparse(places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
else
inst->batch_that->addBatch(rows, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
}
@ -608,6 +611,8 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
if (inst->offsets)
inst->batch_that->addBatchSinglePlace(
inst->offsets[static_cast<ssize_t>(rows - 1)], res + inst->state_offset, inst->batch_arguments, arena);
else if (inst->has_sparse_arguments)
inst->batch_that->addBatchSparseSinglePlace(res + inst->state_offset, inst->batch_arguments, arena);
else
inst->batch_that->addBatchSinglePlace(rows, res + inst->state_offset, inst->batch_arguments, arena);
}
@ -643,19 +648,30 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns
for (size_t i = 0; i < params.aggregates_size; ++i)
{
bool allow_sparse_arguments = aggregate_columns[i].size() == 1;
bool has_sparse_arguments = false;
for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
{
materialized_columns.push_back(columns.at(params.aggregates[i].arguments[j])->convertToFullColumnIfConst());
aggregate_columns[i][j] = materialized_columns.back().get();
auto column_no_lc = recursiveRemoveLowCardinality(aggregate_columns[i][j]->getPtr());
if (column_no_lc.get() != aggregate_columns[i][j])
auto full_column = allow_sparse_arguments
? aggregate_columns[i][j]->getPtr()
: aggregate_columns[i][j]->convertToFullColumnIfSparse();
full_column = recursiveRemoveLowCardinality(full_column);
if (full_column.get() != aggregate_columns[i][j])
{
materialized_columns.emplace_back(std::move(column_no_lc));
materialized_columns.emplace_back(std::move(full_column));
aggregate_columns[i][j] = materialized_columns.back().get();
}
if (typeid_cast<const ColumnSparse *>(aggregate_columns[i][j]))
has_sparse_arguments = true;
}
aggregate_functions_instructions[i].has_sparse_arguments = has_sparse_arguments;
aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];
auto * that = aggregate_functions[i];

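Putting the two dispatch hunks together, the per-instruction routing now works as in this condensed sketch (a hypothetical free function, not a verbatim excerpt):

#include <AggregateFunctions/IAggregateFunction.h>

/// Condensed sketch of the routing added above.
static void executeInstruction(
    const DB::IAggregateFunction & func,
    size_t rows,
    DB::AggregateDataPtr * places,
    size_t state_offset,
    const DB::IColumn ** arguments,
    const UInt64 * offsets,
    bool has_sparse_arguments,
    DB::Arena * arena)
{
    if (offsets)                    /// -Array combinator: offsets delimit array elements
        func.addBatchArray(rows, places, state_offset, arguments, offsets, arena);
    else if (has_sparse_arguments)  /// single sparse argument: defaults handled cheaply
        func.addBatchSparse(places, state_offset, arguments, arena);
    else
        func.addBatch(rows, places, state_offset, arguments, arena);
}

Note the guard in prepareAggregateInstructions: a sparse column is passed through only when the aggregate takes a single argument; multi-argument functions always receive dense columns.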
View File

@ -1038,6 +1038,7 @@ protected:
const IAggregateFunction * batch_that;
const IColumn ** batch_arguments;
const UInt64 * offsets = nullptr;
bool has_sparse_arguments = false;
};
using AggregateFunctionInstructions = std::vector<AggregateFunctionInstruction>;

View File

@ -157,7 +157,7 @@ void PrettyBlockOutputFormat::write(const Chunk & chunk, PortKind port_kind)
Serializations serializations(num_columns);
for (size_t i = 0; i < num_columns; ++i)
serializations[i] = header.getByPosition(i).type->getDefaultSerialization();
serializations[i] = header.getByPosition(i).type->getSerialization(*columns[i]);
WidthsPerColumn widths;
Widths max_widths;
@ -291,6 +291,8 @@ void PrettyBlockOutputFormat::write(const Chunk & chunk, PortKind port_kind)
writeCString(grid_symbols.bar, out);
std::cerr << "current row: " << toString((*columns[0])[i]) << "\n";
for (size_t j = 0; j < num_columns; ++j)
{
if (j != 0)

View File

@ -26,7 +26,7 @@ void PrettySpaceBlockOutputFormat::write(const Chunk & chunk, PortKind port_kind
Serializations serializations(num_columns);
for (size_t i = 0; i < num_columns; ++i)
serializations[i] = header.getByPosition(i).type->getDefaultSerialization();
serializations[i] = header.getByPosition(i).type->getSerialization(*columns[i]);
WidthsPerColumn widths;
Widths max_widths;

View File

@ -252,17 +252,16 @@ static void decrementTypeMetric(MergeTreeDataPartType type)
IMergeTreeDataPart::IMergeTreeDataPart(
MergeTreeData & storage_, const String & name_, const VolumePtr & volume_, const std::optional<String> & relative_path_, Type part_type_)
: storage(storage_)
, name(name_)
, info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
, volume(volume_)
, relative_path(relative_path_.value_or(name_))
, index_granularity_info(storage_, part_type_)
, part_type(part_type_)
const MergeTreeData & storage_,
const String & name_,
const VolumePtr & volume_,
const std::optional<String> & relative_path_,
Type part_type_)
: IMergeTreeDataPart(
storage_, name_,
MergeTreePartInfo::fromPartName(name_, storage_.format_version),
volume_, relative_path_, part_type_)
{
incrementStateMetric(state);
incrementTypeMetric(part_type);
}
IMergeTreeDataPart::IMergeTreeDataPart(
@ -278,6 +277,7 @@ IMergeTreeDataPart::IMergeTreeDataPart(
, volume(volume_)
, relative_path(relative_path_.value_or(name_))
, index_granularity_info(storage_, part_type_)
, serialization_info(storage_.getSettings()->ratio_for_sparse_serialization)
, part_type(part_type_)
{
incrementStateMetric(state);
@ -563,7 +563,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
if (check_consistency)
checkConsistency(require_columns_checksums);
loadDefaultCompressionCodec();
loadSerializationInfo();
}
void IMergeTreeDataPart::loadIndexGranularity()
@ -930,6 +930,16 @@ void IMergeTreeDataPart::loadUUID()
}
}
void IMergeTreeDataPart::loadSerializationInfo()
{
String path = getFullRelativePath() + SERIALIZATION_FILE_NAME;
if (volume->getDisk()->exists(path))
{
auto in = openForReading(volume->getDisk(), path);
serialization_info.read(*in);
}
}
void IMergeTreeDataPart::loadColumns(bool require)
{
String path = getFullRelativePath() + "columns.txt";

View File

@ -72,7 +72,7 @@ public:
Type part_type_);
IMergeTreeDataPart(
MergeTreeData & storage_,
const MergeTreeData & storage_,
const String & name_,
const VolumePtr & volume,
const std::optional<String> & relative_path,
@ -178,6 +178,9 @@ public:
mutable String relative_path;
MergeTreeIndexGranularityInfo index_granularity_info;
/// Information about serializations of the part's columns (in particular,
/// for which columns sparse serialization was chosen). Read from the
/// serialization.txt file of the part.
SerializationInfo serialization_info;
size_t rows_count = 0;
@ -222,8 +225,6 @@ public:
TTLInfos ttl_infos;
SerializationInfo serialization_info;
/// Current state of the part. If the part is in working set already, it should be accessed via data_parts mutex
void setState(State new_state) const;
State getState() const;
@ -360,6 +361,8 @@ public:
static inline constexpr auto UUID_FILE_NAME = "uuid.txt";
static inline constexpr auto SERIALIZATION_FILE_NAME = "serialization.txt";
/// Checks that all TTLs (table min/max, column ttls, so on) for part
/// calculated. Part without calculated TTL may exist if TTL was added after
/// part creation (using alter query with materialize_ttl setting).
@ -421,6 +424,8 @@ private:
/// Loads ttl infos in json format from file ttl.txt. If file doesn't exists assigns ttl infos with all zeros
void loadTTLInfos();
void loadSerializationInfo();
void loadPartitionAndMinMaxIndex();
/// Load default compression codec from file default_compression_codec.txt

View File

@ -238,7 +238,7 @@ bool MergeTreeDataPartWide::hasColumnFiles(const NameAndTypePair & column) const
String MergeTreeDataPartWide::getFileNameForColumn(const NameAndTypePair & column) const
{
String filename;
auto serialization = column.type->getSerialization(column.name, serialization_info);
auto serialization = getSerializationForColumn(column);
serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
if (filename.empty())

View File

@ -33,8 +33,12 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
, marks(*marks_file)
{
const auto & storage_columns = metadata_snapshot->getColumns();
serializations.reserve(columns_list.size());
for (const auto & column : columns_list)
{
serializations.emplace(column.name, column.type->getDefaultSerialization());
addStreams(column, storage_columns.getCodecDescOrDefault(column.name, default_codec));
}
}
void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column, const ASTPtr & effective_codec_desc)
@ -63,7 +67,7 @@ void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column,
compressed_streams.emplace(stream_name, stream);
};
column.type->enumerateStreams(column.type->getDefaultSerialization(), callback);
column.type->enumerateStreams(serializations[column.name], callback);
}
namespace
@ -105,6 +109,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
/// Write single granule of one column (rows between 2 marks)
void writeColumnSingleGranule(
const ColumnWithTypeAndName & column,
const SerializationPtr & serialization,
ISerialization::OutputStreamGetter stream_getter,
size_t from_row,
size_t number_of_rows)
@ -116,7 +121,6 @@ void writeColumnSingleGranule(
serialize_settings.position_independent_encoding = true;
serialize_settings.low_cardinality_max_dictionary_size = 0;
auto serialization = column.type->getDefaultSerialization();
serialization->serializeBinaryBulkStatePrefix(serialize_settings, state);
serialization->serializeBinaryBulkWithMultipleStreams(*column.column, from_row, number_of_rows, serialize_settings, state);
serialization->serializeBinaryBulkStateSuffix(serialize_settings, state);
@ -203,7 +207,9 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
writeIntBinary(plain_hashing.count(), marks);
writeIntBinary(UInt64(0), marks);
writeColumnSingleGranule(block.getByName(name_and_type->name), stream_getter, granule.start_row, granule.rows_to_write);
writeColumnSingleGranule(
block.getByName(name_and_type->name), serializations[name_and_type->name],
stream_getter, granule.start_row, granule.rows_to_write);
/// Each type always have at least one substream
prev_stream->hashing_buf.next(); //-V522

View File

@ -121,7 +121,7 @@ static size_t computeIndexGranularityImpl(
}
else
{
size_t size_of_row_in_bytes = block_size_in_memory / rows_in_block;
size_t size_of_row_in_bytes = std::max(block_size_in_memory / rows_in_block, 1UL);
index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes;
}
}

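The max-with-1 guard matters once sparse columns exist: a block can occupy fewer bytes in memory than it has rows, so the integer division yields zero and the subsequent division would crash. A standalone check of the arithmetic, with illustrative numbers:

#include <algorithm>
#include <cstddef>
#include <iostream>

int main()
{
    size_t index_granularity_bytes = 10 * 1024 * 1024; /// typical setting value
    size_t block_size_in_memory = 40000;               /// sparse block: few bytes...
    size_t rows_in_block = 100000;                     /// ...but many rows

    /// Unguarded: 40000 / 100000 == 0, then index_granularity_bytes / 0 below.
    size_t size_of_row_in_bytes = std::max(block_size_in_memory / rows_in_block, size_t(1));
    std::cout << index_granularity_bytes / size_of_row_in_bytes << '\n'; /// 10485760
}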
View File

@ -132,6 +132,9 @@ protected:
MergeTreeIndexAggregators skip_indices_aggregators;
std::vector<size_t> skip_index_accumulated_marks;
using SerializationsMap = std::unordered_map<String, SerializationPtr>;
SerializationsMap serializations;
std::unique_ptr<WriteBufferFromFileBase> index_file_stream;
std::unique_ptr<HashingWriteBuffer> index_stream;
DataTypes index_types;

View File

@ -81,7 +81,10 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
{
const auto & columns = metadata_snapshot->getColumns();
for (const auto & it : columns_list)
{
serializations.emplace(it.name, it.type->getSerialization(it.name, data_part->serialization_info));
addStreams(it, columns.getCodecDescOrDefault(it.name, default_codec));
}
}
@ -112,9 +115,7 @@ void MergeTreeDataPartWriterWide::addStreams(
settings.max_compress_block_size);
};
auto serialization = column.type->getSerialization(column.name, data_part->serialization_info);
column.type->enumerateStreams(serialization, callback);
serializations.emplace(column.name, std::move(serialization));
column.type->enumerateStreams(serializations[column.name], callback);
}
@ -193,7 +194,14 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm
fillIndexGranularity(index_granularity_for_block, block.rows());
}
auto granules_to_write = getGranulesToWrite(index_granularity, block.rows(), getCurrentMark(), rows_written_in_last_mark);
Block block_to_write = block;
for (auto & col : block_to_write)
{
if (serializations[col.name]->getKind() != SerializationKind::SPARSE)
col.column = col.column->convertToFullColumnIfSparse();
}
auto granules_to_write = getGranulesToWrite(index_granularity, block_to_write.rows(), getCurrentMark(), rows_written_in_last_mark);
auto offset_columns = written_offset_columns ? *written_offset_columns : WrittenOffsetColumns{};
Block primary_key_block;
@ -205,7 +213,7 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
{
const ColumnWithTypeAndName & column = block.getByName(it->name);
const ColumnWithTypeAndName & column = block_to_write.getByName(it->name);
if (permutation)
{
@ -301,7 +309,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
ISerialization::SerializeBinaryBulkSettings & serialize_settings,
const Granule & granule)
{
auto serialization = serializations[name_and_type.name];
const auto & serialization = serializations[name_and_type.name];
serialization->serializeBinaryBulkWithMultipleStreams(column, granule.start_row, granule.rows_to_write, serialize_settings, serialization_state);
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
@ -406,7 +414,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,
size_t mark_num;
auto serialization = type.getDefaultSerialization();
const auto & serialization = serializations[name];
for (mark_num = 0; !mrk_in.eof(); ++mark_num)
{

View File

@ -110,9 +110,6 @@ private:
using ColumnStreams = std::map<String, StreamPtr>;
ColumnStreams column_streams;
using Serializations = std::map<String, SerializationPtr>;
Serializations serializations;
/// Non written marks to disk (for each column). Waiting until all rows for
/// this marks will be written to disk.
using MarksForColumns = std::unordered_map<String, StreamsWithMarks>;

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/MergeTreeReaderWide.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnSparse.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/inplaceBlockConversions.h>
@ -83,7 +84,12 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
/// The column is already present in the block so we will append the values to the end.
bool append = res_columns[pos] != nullptr;
if (!append)
res_columns[pos] = type->createColumn();
{
if (isSparseSerialization(serializations[name]))
res_columns[pos] = ColumnSparse::create(type->createColumn());
else
res_columns[pos] = type->createColumn();
}
auto & column = res_columns[pos];
try

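The isSparseSerialization helper called above is not visible in this part of the diff; a plausible definition, assumed from the getKind interface that the commit introduces:

#include <DataTypes/Serializations/ISerialization.h> /// include path assumed

/// Assumed shape, inferred from the call site above.
static bool isSparseSerialization(const DB::SerializationPtr & serialization)
{
    return serialization && serialization->getKind() == DB::SerializationKind::SPARSE;
}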
View File

@ -32,6 +32,7 @@ struct Settings;
M(UInt64, min_rows_for_compact_part, 0, "Experimental. Minimal number of rows to create part in compact format instead of saving it in RAM", 0) \
M(Bool, in_memory_parts_enable_wal, true, "Whether to write blocks in Native format to write-ahead-log before creation in-memory part", 0) \
M(UInt64, write_ahead_log_max_bytes, 1024 * 1024 * 1024, "Rotate WAL, if it exceeds that amount of bytes", 0) \
M(Float, ratio_for_sparse_serialization, 1.1, "Minimal ratio of default values in a column to choose sparse serialization for it. Experimental. Values greater than 1 effectively disable sparse serialization", 0) \
\
/** Merge settings. */ \
M(UInt64, merge_max_block_size, DEFAULT_MERGE_BLOCK_SIZE, "How many rows in blocks should be formed for merge operations.", 0) \

View File

@ -24,6 +24,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
: IMergedBlockOutputStream(data_part, metadata_snapshot_)
, columns_list(columns_list_)
, default_codec(default_codec_)
, serialization_info(storage.getSettings()->ratio_for_sparse_serialization)
{
MergeTreeWriterSettings writer_settings(
storage.global_context.getSettings(),
@ -147,6 +148,18 @@ void MergedBlockOutputStream::finalizePartOnDisk(
removeEmptyColumnsFromPart(new_part, part_columns, checksums);
if (serialization_info.getNumberOfRows() > 0)
{
auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096);
HashingWriteBuffer out_hashing(*out);
serialization_info.write(out_hashing);
checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_hash = out_hashing.getHash();
out->finalize();
if (sync)
out->sync();
}
{
/// Write a file with a description of columns.
auto out = volume->getDisk()->writeFile(part_path + "columns.txt", 4096);
@ -156,15 +169,6 @@ void MergedBlockOutputStream::finalizePartOnDisk(
out->sync();
}
if (default_codec != nullptr)
{
auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);