Added prefix and suffix serialization states for IDataType's serializeBinaryBulkWithMultipleStreams. Supported a dictionary serialization format with a single global dictionary (limited by max_dictionary_size) plus additional keys stored per granule. Changed the IDataType::enumerateStreams interface. Added (de)serialization settings for binary bulk multiple-stream (de)serialization. Changed the IColumn::index interface.

This commit is contained in:
Nikolai Kochetov 2018-06-07 21:14:37 +03:00
parent 58cbcbd5c3
commit f56d16769b
46 changed files with 1364 additions and 510 deletions
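
A minimal sketch of the new prefix/bulk/suffix calling convention introduced here (assuming an IDataType & type, a fully materialized column and a WriteBuffer ostr; it mirrors the NativeBlockOutputStream::writeData change below):

    IDataType::SerializeBinaryBulkSettings settings;
    settings.getter = [&ostr](IDataType::SubstreamPath) -> WriteBuffer * { return &ostr; };
    settings.position_independent_encoding = false;
    settings.max_dictionary_size = 0;                                /// as in the Native format writer below

    IDataType::SerializeBinaryBulkStatePtr state;
    type.serializeBinaryBulkStatePrefix(settings, state);            /// e.g. writes the keys serialization version
    type.serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, state);   /// offset = 0, limit = 0 (whole column)
    type.serializeBinaryBulkStateSuffix(settings, state);            /// e.g. flushes the global dictionary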

View File

@ -162,7 +162,7 @@ ColumnPtr ColumnAggregateFunction::permute(const Permutation & perm, size_t limi
return std::move(res);
}
ColumnPtr ColumnAggregateFunction::index(const ColumnPtr & indexes, size_t limit) const
ColumnPtr ColumnAggregateFunction::index(const IColumn & indexes, size_t limit) const
{
return selectIndexImpl(*this, indexes, limit);
}

View File

@ -156,7 +156,7 @@ public:
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const ColumnPtr & indexes, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
template <typename Type>
ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;

View File

@ -626,7 +626,7 @@ ColumnPtr ColumnArray::permute(const Permutation & perm, size_t limit) const
return std::move(res);
}
ColumnPtr ColumnArray::index(const ColumnPtr & indexes, size_t limit) const
ColumnPtr ColumnArray::index(const IColumn & indexes, size_t limit) const
{
return selectIndexImpl(*this, indexes, limit);
}
@ -657,7 +657,7 @@ ColumnPtr ColumnArray::indexImpl(const PaddedPODArray<T> & indexes, size_t limit
}
if (current_offset != 0)
res->data = data->index(std::move(nested_indexes_column), current_offset);
res->data = data->index(*nested_indexes_column, current_offset);
return std::move(res);
}

View File

@ -71,7 +71,7 @@ public:
void popBack(size_t n) override;
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const ColumnPtr & indexes, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
template <typename Type> ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;
int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override;
void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;

View File

@ -63,13 +63,13 @@ ColumnPtr ColumnConst::permute(const Permutation & perm, size_t limit) const
return ColumnConst::create(data, limit);
}
ColumnPtr ColumnConst::index(const ColumnPtr & indexes, size_t limit) const
ColumnPtr ColumnConst::index(const IColumn & indexes, size_t limit) const
{
if (limit == 0)
limit = indexes->size();
limit = indexes.size();
if (indexes->size() < limit)
throw Exception("Size of indexes (" + toString(indexes->size()) + ") is less than required (" + toString(limit) + ")",
if (indexes.size() < limit)
throw Exception("Size of indexes (" + toString(indexes.size()) + ") is less than required (" + toString(limit) + ")",
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
return ColumnConst::create(data, limit);

View File

@ -153,7 +153,7 @@ public:
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
ColumnPtr replicate(const Offsets & offsets) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const ColumnPtr & indexes, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
size_t byteSize() const override

View File

@ -260,7 +260,7 @@ ColumnPtr ColumnFixedString::permute(const Permutation & perm, size_t limit) con
}
ColumnPtr ColumnFixedString::index(const ColumnPtr & indexes, size_t limit) const
ColumnPtr ColumnFixedString::index(const IColumn & indexes, size_t limit) const
{
return selectIndexImpl(*this, indexes, limit);
}

View File

@ -108,7 +108,7 @@ public:
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const ColumnPtr & indexes, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
template <typename Type>
ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;

View File

@ -86,7 +86,7 @@ ColumnPtr ColumnFunction::permute(const Permutation & perm, size_t limit) const
return ColumnFunction::create(limit, function, capture);
}
ColumnPtr ColumnFunction::index(const ColumnPtr & indexes, size_t limit) const
ColumnPtr ColumnFunction::index(const IColumn & indexes, size_t limit) const
{
ColumnsWithTypeAndName capture = captured_columns;
for (auto & column : capture)

View File

@ -33,7 +33,7 @@ public:
ColumnPtr replicate(const Offsets & offsets) const override;
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const ColumnPtr & indexes, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
void insertDefault() override;
void popBack(size_t n) override;
std::vector<MutableColumnPtr> scatter(IColumn::ColumnIndex num_columns,

View File

@ -166,7 +166,7 @@ ColumnPtr ColumnNullable::permute(const Permutation & perm, size_t limit) const
return ColumnNullable::create(permuted_data, permuted_null_map);
}
ColumnPtr ColumnNullable::index(const ColumnPtr & indexes, size_t limit) const
ColumnPtr ColumnNullable::index(const IColumn & indexes, size_t limit) const
{
ColumnPtr indexed_data = getNestedColumn().index(indexes, limit);
ColumnPtr indexed_null_map = getNullMapColumn().index(indexes, limit);

View File

@ -65,7 +65,7 @@ public:
void popBack(size_t n) override;
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const ColumnPtr & indexes, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
int compareAt(size_t n, size_t m, const IColumn & rhs_, int null_direction_hint) const override;
void getPermutation(bool reverse, size_t limit, int null_direction_hint, Permutation & res) const override;
void reserve(size_t n) override;

View File

@ -162,7 +162,7 @@ ColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const
}
ColumnPtr ColumnString::index(const ColumnPtr & indexes, size_t limit) const
ColumnPtr ColumnString::index(const IColumn & indexes, size_t limit) const
{
return selectIndexImpl(*this, indexes, limit);
}

View File

@ -220,7 +220,7 @@ public:
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const ColumnPtr & indexes, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
template <typename Type>
ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;

View File

@ -179,7 +179,7 @@ ColumnPtr ColumnTuple::permute(const Permutation & perm, size_t limit) const
return ColumnTuple::create(new_columns);
}
ColumnPtr ColumnTuple::index(const ColumnPtr & indexes, size_t limit) const
ColumnPtr ColumnTuple::index(const IColumn & indexes, size_t limit) const
{
const size_t tuple_size = columns.size();
Columns new_columns(tuple_size);

View File

@ -60,7 +60,7 @@ public:
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const ColumnPtr & indexes, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
ColumnPtr replicate(const Offsets & offsets) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer_stream) override;

View File

@ -55,7 +55,7 @@ class ColumnUnique final : public COWPtrHelper<IColumnUnique, ColumnUnique<Colum
private:
explicit ColumnUnique(MutableColumnPtr && holder);
explicit ColumnUnique(const DataTypePtr & type);
explicit ColumnUnique(const IDataType & type);
ColumnUnique(const ColumnUnique & other)
: column_holder(other.column_holder), nullable_column(other.nullable_column)
, nullable_column_map(other.nullable_column_map), is_nullable(other.is_nullable) {}
@ -64,10 +64,13 @@ public:
const ColumnPtr & getNestedColumn() const override;
size_t uniqueInsert(const Field & x) override;
size_t uniqueInsertFrom(const IColumn & src, size_t n) override;
ColumnPtr uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length) override;
MutableColumnPtr uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length) override;
IColumnUnique::IndexesWithOverflow uniqueInsertRangeWithOverflow(const IColumn & src, size_t start, size_t length,
size_t max_dictionary_size) override;
size_t uniqueInsertData(const char * pos, size_t length) override;
size_t uniqueInsertDataWithTerminatingZero(const char * pos, size_t length) override;
size_t uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) override;
IColumnUnique::SerializableState getSerializableState() const override;
size_t getDefaultValueIndex() const override { return is_nullable ? 1 : 0; }
size_t getNullValueIndex() const override;
@ -146,21 +149,28 @@ private:
const ColumnType * getRawColumnPtr() const { return static_cast<ColumnType *>(column_holder.get()); }
IndexType insertIntoMap(const StringRefWrapper<ColumnType> & ref, IndexType value);
void uniqueInsertRangeImpl(
const IColumn & src,
size_t start,
size_t length,
typename ColumnVector<IndexType>::Container & positions,
ColumnType * overflowed_keys,
size_t max_dictionary_size);
};
template <typename ColumnType, typename IndexType>
ColumnUnique<ColumnType, IndexType>::ColumnUnique(const DataTypePtr & type) : is_nullable(type->isNullable())
ColumnUnique<ColumnType, IndexType>::ColumnUnique(const IDataType & type) : is_nullable(type.isNullable())
{
if (is_nullable)
{
nullable_column = type->createColumn()->cloneResized(numSpecialValues());
nullable_column = type.createColumn()->cloneResized(numSpecialValues());
auto & column_nullable = static_cast<ColumnNullable &>(nullable_column->assumeMutableRef());
column_holder = column_nullable.getNestedColumnPtr();
nullable_column_map = &column_nullable.getNullMapData();
(*nullable_column_map)[getDefaultValueIndex()] = 0;
}
else
column_holder = type->createColumn()->cloneResized(numSpecialValues());
column_holder = type.createColumn()->cloneResized(numSpecialValues());
}
template <typename ColumnType, typename IndexType>
@ -330,7 +340,13 @@ size_t ColumnUnique<ColumnType, IndexType>::uniqueDeserializeAndInsertFromArena(
}
template <typename ColumnType, typename IndexType>
ColumnPtr ColumnUnique<ColumnType, IndexType>::uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length)
void ColumnUnique<ColumnType, IndexType>::uniqueInsertRangeImpl(
const IColumn & src,
size_t start,
size_t length,
typename ColumnVector<IndexType>::Container & positions,
ColumnType * overflowed_keys,
size_t max_dictionary_size)
{
if (!index)
buildIndex();
@ -347,9 +363,11 @@ ColumnPtr ColumnUnique<ColumnType, IndexType>::uniqueInsertRangeFrom(const IColu
else
src_column = static_cast<const ColumnType *>(&src);
std::unique_ptr<IndexMapType> secondary_index;
if (overflowed_keys)
secondary_index = std::make_unique<IndexMapType>();
auto column = getRawColumnPtr();
auto positions_column = ColumnVector<IndexType>::create(length);
auto & positions = positions_column->getData();
size_t next_position = column->size();
for (auto i : ext::range(0, length))
@ -365,18 +383,80 @@ ColumnPtr ColumnUnique<ColumnType, IndexType>::uniqueInsertRangeFrom(const IColu
auto it = index->find(StringRefWrapper<ColumnType>(src_column, row));
if (it == index->end())
{
positions[i] = next_position;
auto ref = src_column->getDataAt(row);
column->insertData(ref.data, ref.size);
(*index)[StringRefWrapper<ColumnType>(column, next_position)] = next_position;
++next_position;
if (overflowed_keys && next_position >= max_dictionary_size + numSpecialValues())
{
auto jt = secondary_index->find(StringRefWrapper<ColumnType>(src_column, row));
if (jt == secondary_index->end())
{
positions[i] = next_position;
auto ref = src_column->getDataAt(row);
overflowed_keys->insertData(ref.data, ref.size);
(*secondary_index)[StringRefWrapper<ColumnType>(src_column, row)] = next_position;
++next_position;
}
else
positions[i] = jt->second;
}
else
{
positions[i] = next_position;
auto ref = src_column->getDataAt(row);
column->insertData(ref.data, ref.size);
(*index)[StringRefWrapper<ColumnType>(column, next_position)] = next_position;
++next_position;
}
}
else
positions[i] = it->second;
}
}
}
template <typename ColumnType, typename IndexType>
MutableColumnPtr ColumnUnique<ColumnType, IndexType>::uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length)
{
auto positions_column = ColumnVector<IndexType>::create(length);
auto & positions = positions_column->getData();
uniqueInsertRangeImpl(src, start, length, positions, nullptr, 0);
return positions_column;
}
template <typename ColumnType, typename IndexType>
IColumnUnique::IndexesWithOverflow ColumnUnique<ColumnType, IndexType>::uniqueInsertRangeWithOverflow(
const IColumn & src,
size_t start,
size_t length,
size_t max_dictionary_size)
{
auto positions_column = ColumnVector<IndexType>::create(length);
auto overflowed_keys = column_holder->cloneEmpty();
auto & positions = positions_column->getData();
auto overflowed_keys_ptr = typeid_cast<ColumnType *>(overflowed_keys.get());
if (!overflowed_keys_ptr)
throw Exception("Invalid keys type for ColumnUnique.", ErrorCodes::LOGICAL_ERROR);
uniqueInsertRangeImpl(src, start, length, positions, overflowed_keys_ptr, max_dictionary_size);
IColumnUnique::IndexesWithOverflow indexes_with_overflow;
indexes_with_overflow.indexes = std::move(positions_column);
indexes_with_overflow.overflowed_keys = std::move(overflowed_keys);
return indexes_with_overflow;
}
template <typename ColumnType, typename IndexType>
IColumnUnique::SerializableState ColumnUnique<ColumnType, IndexType>::getSerializableState() const
{
IColumnUnique::SerializableState state;
state.column = column_holder;
state.offset = numSpecialValues();
state.limit = column_holder->size() - state.offset;
return state;
}
};

View File

@ -233,7 +233,7 @@ ColumnPtr ColumnVector<T>::permute(const IColumn::Permutation & perm, size_t lim
}
template <typename T>
ColumnPtr ColumnVector<T>::index(const ColumnPtr & indexes, size_t limit) const
ColumnPtr ColumnVector<T>::index(const IColumn & indexes, size_t limit) const
{
return selectIndexImpl(*this, indexes, limit);
}

View File

@ -252,7 +252,7 @@ public:
ColumnPtr permute(const IColumn::Permutation & perm, size_t limit) const override;
ColumnPtr index(const ColumnPtr & indexes, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
template <typename Type>
ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;

View File

@ -5,7 +5,7 @@ namespace DB
{
ColumnWithDictionary::ColumnWithDictionary(MutableColumnPtr && column_unique_, MutableColumnPtr && indexes_)
: column_unique(std::move(column_unique_)), indexes(std::move(indexes_))
: column_unique(std::move(column_unique_)), indexes(std::move(indexes_))
{
if (!dynamic_cast<const IColumnUnique *>(column_unique.get()))
throw Exception("ColumnUnique expected as argument of ColumnWithDictionary.", ErrorCodes::ILLEGAL_COLUMN);

View File

@ -39,7 +39,7 @@ public:
ColumnPtr convertToFullColumn() const
{
return getUnique()->getNestedColumn()->index(indexes, 0);
return getUnique()->getNestedColumn()->index(*indexes, 0);
}
ColumnPtr convertToFullColumnIfWithDictionary() const override { return convertToFullColumn(); }
@ -102,7 +102,7 @@ public:
auto & src_with_dict = static_cast<const ColumnWithDictionary &>(src);
auto & src_nested = src_with_dict.getUnique()->getNestedColumn();
auto inserted_idx = getUnique()->uniqueInsertRangeFrom(*src_nested, 0, src_nested->size());
auto idx = inserted_idx->index(src_with_dict.getIndexes()->cut(start, length), 0);
auto idx = inserted_idx->index(*src_with_dict.getIndexes()->cut(start, length), 0);
getIndexes()->insertRangeFrom(*idx, 0, length);
}
@ -150,7 +150,7 @@ public:
return ColumnWithDictionary::create(column_unique, indexes->permute(perm, limit));
}
ColumnPtr index(const ColumnPtr & indexes_, size_t limit) const override
ColumnPtr index(const IColumn & indexes_, size_t limit) const override
{
return ColumnWithDictionary::create(column_unique, indexes->index(indexes_, limit));
}
@ -233,13 +233,14 @@ public:
IColumnUnique * getUnique() { return static_cast<IColumnUnique *>(column_unique->assumeMutable().get()); }
const IColumnUnique * getUnique() const { return static_cast<const IColumnUnique *>(column_unique->assumeMutable().get()); }
const ColumnPtr & getUniquePtr() const { return column_unique; }
ColumnPtr getUniquePtr() const { return column_unique; }
IColumn * getIndexes() { return indexes->assumeMutable().get(); }
const IColumn * getIndexes() const { return indexes.get(); }
const ColumnPtr & getIndexesPtr() const { return indexes; }
void setIndexes(MutableColumnPtr && indexes_) { indexes = std::move(indexes_); }
void setUnique(const ColumnPtr & unique) { column_unique = unique; }
bool withDictionary() const override { return true; }

View File

@ -315,65 +315,19 @@ INSTANTIATE(Float64)
namespace detail
{
template <typename T>
const PaddedPODArray<T> * getIndexesData(const ColumnPtr & indexes)
const PaddedPODArray<T> * getIndexesData(const IColumn & indexes)
{
auto * column = typeid_cast<const ColumnVector<T> *>(indexes.get());
auto * column = typeid_cast<const ColumnVector<T> *>(&indexes);
if (column)
return &column->getData();
return nullptr;
}
template <typename T>
PaddedPODArray<T> * getIndexesData(IColumn & indexes)
{
auto * column = typeid_cast<ColumnVector<T> *>(&indexes);
if (column)
return &column->getData();
return nullptr;
}
template const PaddedPODArray<UInt8> * getIndexesData<UInt8>(const DB::ColumnPtr & indexes);
template const PaddedPODArray<UInt16> * getIndexesData<UInt16>(const DB::ColumnPtr & indexes);
template const PaddedPODArray<UInt32> * getIndexesData<UInt32>(const DB::ColumnPtr & indexes);
template const PaddedPODArray<UInt64> * getIndexesData<UInt64>(const DB::ColumnPtr & indexes);
template <typename T>
MutableColumnPtr getUniqueIndexImpl(PaddedPODArray<T> & index)
{
HashMap<T, T> hash_map;
for (auto val : index)
hash_map.insert({val, hash_map.size()});
auto res_col = ColumnVector<T>::create();
auto & data = res_col->getData();
data.resize(hash_map.size());
for (auto val : hash_map)
data[val.second] = val.first;
for (auto & ind : index)
ind = hash_map[ind];
return std::move(res_col);
}
}
/// Returns unique values of column. Write new index to column.
MutableColumnPtr makeSubIndex(IColumn & column)
{
if (auto * data_uint8 = detail::getIndexesData<UInt8>(column))
return detail::getUniqueIndexImpl(*data_uint8);
else if (auto * data_uint16 = detail::getIndexesData<UInt16>(column))
return detail::getUniqueIndexImpl(*data_uint16);
else if (auto * data_uint32 = detail::getIndexesData<UInt32>(column))
return detail::getUniqueIndexImpl(*data_uint32);
else if (auto * data_uint64 = detail::getIndexesData<UInt64>(column))
return detail::getUniqueIndexImpl(*data_uint64);
else
throw Exception("Indexes column for makeSubindex must be ColumnUInt, got" + column.getName(),
ErrorCodes::LOGICAL_ERROR);
template const PaddedPODArray<UInt8> * getIndexesData<UInt8>(const IColumn & indexes);
template const PaddedPODArray<UInt16> * getIndexesData<UInt16>(const IColumn & indexes);
template const PaddedPODArray<UInt32> * getIndexesData<UInt32>(const IColumn & indexes);
template const PaddedPODArray<UInt64> * getIndexesData<UInt64>(const IColumn & indexes);
}
}

View File

@ -41,17 +41,17 @@ void filterArraysImplOnlyData(
namespace detail
{
template <typename T>
const PaddedPODArray<T> * getIndexesData(const ColumnPtr & indexes);
const PaddedPODArray<T> * getIndexesData(const IColumn & indexes);
}
/// Checks that limit <= indexes.size() and calls column.indexImpl(const PaddedPODArray<Type> & indexes, size_t limit).
template <typename Column>
ColumnPtr selectIndexImpl(const Column & column, const ColumnPtr & indexes, size_t limit)
ColumnPtr selectIndexImpl(const Column & column, const IColumn & indexes, size_t limit)
{
if (limit == 0)
limit = indexes->size();
limit = indexes.size();
if (indexes->size() < limit)
if (indexes.size() < limit)
throw Exception("Size of indexes is less than required.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
if (auto * data_uint8 = detail::getIndexesData<UInt8>(indexes))
@ -63,7 +63,7 @@ ColumnPtr selectIndexImpl(const Column & column, const ColumnPtr & indexes, size
else if (auto * data_uint64 = detail::getIndexesData<UInt64>(indexes))
return column.template indexImpl<UInt64>(*data_uint64, limit);
else
throw Exception("Indexes column for IColumn::select must be ColumnUInt, got" + indexes->getName(),
throw Exception("Indexes column for IColumn::select must be ColumnUInt, got" + indexes.getName(),
ErrorCodes::LOGICAL_ERROR);
}
@ -72,9 +72,4 @@ ColumnPtr selectIndexImpl(const Column & column, const ColumnPtr & indexes, size
template ColumnPtr Column::indexImpl<UInt16>(const PaddedPODArray<UInt16> & indexes, size_t limit) const; \
template ColumnPtr Column::indexImpl<UInt32>(const PaddedPODArray<UInt32> & indexes, size_t limit) const; \
template ColumnPtr Column::indexImpl<UInt64>(const PaddedPODArray<UInt64> & indexes, size_t limit) const;
/// Get unique values from index column (ColumnUInt*).
MutableColumnPtr makeSubIndex(IColumn & column);
}

View File

@ -194,7 +194,7 @@ public:
/// Creates new column with values column[indexes[:limit]]. If limit is 0, all indexes are used.
/// Indexes must be one of the ColumnUInt. For default implementation, see selectIndexImpl from ColumnsCommon.h
virtual Ptr index(const Ptr & indexes, size_t limit) const = 0;
virtual Ptr index(const IColumn & indexes, size_t limit) const = 0;
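/// A brief usage sketch for the changed signature (hypothetical columns; idx is a ColumnUInt64 holding [2, 0]):
///   Old call site:  ColumnPtr res = col->index(idx /* ColumnPtr */, 0);
///   New call site:  ColumnPtr res = col->index(*idx, 0);
/// res then holds (*col)[2] and (*col)[0]; limit == 0 means all indexes are used.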
/** Compares (*this)[n] and rhs[m].
* Returns negative number, 0, or positive number (*this)[n] is less, equal, greater than rhs[m] respectively.

View File

@ -87,9 +87,9 @@ public:
return cloneDummy(limit ? std::min(s, limit) : s);
}
ColumnPtr index(const ColumnPtr & indexes, size_t limit) const override
ColumnPtr index(const IColumn & indexes, size_t limit) const override
{
if (indexes->size() < limit)
if (indexes.size() < limit)
throw Exception("Size of indexes is less than required.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
return cloneDummy(limit ? limit : s);

View File

@ -7,9 +7,13 @@ namespace DB
class IColumnUnique : public IColumn
{
public:
using ColumnUniquePtr = IColumn::template immutable_ptr<IColumnUnique>;
using MutableColumnUniquePtr = IColumn::template mutable_ptr<IColumnUnique>;
/// The column always contains Null if it's Nullable and the empty string if it's String or Nullable(String).
/// So size may be greater than the number of inserted unique values.
virtual const ColumnPtr & getNestedColumn() const = 0;
size_t size() const override { return getNestedColumn()->size(); }
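/// An illustrative note on the special values mentioned above (assumed behaviour, based on getDefaultValueIndex()
/// and the ColumnUnique constructor in this diff):
///   ColumnUnique over String:           nested column == [""]        -> size() == 1, getDefaultValueIndex() == 0
///   ColumnUnique over Nullable(String): nested column == [NULL, ""]  -> size() == 2, getNullValueIndex() == 0,
///                                                                        getDefaultValueIndex() == 1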
/// Appends new value at the end of column (column's size is increased by 1).
@ -19,7 +23,18 @@ public:
virtual size_t uniqueInsertFrom(const IColumn & src, size_t n) = 0;
/// Appends range of elements from other column.
/// Could be used to concatenate columns.
virtual ColumnPtr uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length) = 0;
virtual MutableColumnPtr uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length) = 0;
struct IndexesWithOverflow
{
MutableColumnPtr indexes;
MutableColumnPtr overflowed_keys;
};
/// Like uniqueInsertRangeFrom, but doesn't insert keys if inner dictionary has more than max_dictionary_size keys.
/// Keys that are not inserted into the dictionary are put into overflowed_keys; indexes are calculated for the
/// concatenation of the nested column (which can be obtained from getNestedColumn()) and overflowed_keys.
virtual IndexesWithOverflow uniqueInsertRangeWithOverflow(const IColumn & src, size_t start,
size_t length, size_t max_dictionary_size) = 0;
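/// A hedged worked example of the overflow behaviour described above (hypothetical values; non-nullable String
/// dictionary whose position 0 holds the default empty string, max_dictionary_size = 2):
///   src = ["a", "b", "a", "c", "b"]
///   "a" and "b" still fit below max_dictionary_size + numSpecialValues(); "c" does not:
///     getNestedColumn() -> ["", "a", "b"]   (positions 0, 1, 2)
///     overflowed_keys   -> ["c"]            (position 3 in the nested + overflowed concatenation)
///     indexes           -> [1, 2, 1, 3, 2]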
/// Appends data located in specified memory chunk if it is possible (throws an exception if it cannot be implemented).
/// Is used to optimize some computations (in aggregation, for example).
@ -33,7 +48,18 @@ public:
virtual size_t uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) = 0;
// virtual size_t getInsertionPoint(const char * pos, size_t length) const = 0;
/// Column which contains the set of keys necessary for serialization, such that an empty column after a
/// uniqueInsertRangeFrom(column->cut(offset, limit), 0, limit) call will contain the same set of keys.
struct SerializableState
{
ColumnPtr column;
size_t offset;
size_t limit;
};
virtual SerializableState getSerializableState() const = 0;
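/// A small sketch of how this state is meant to be consumed when writing keys, skipping the special values
/// (it mirrors DataTypeWithDictionary::serializeBinaryBulkStateSuffix further down; dictionary_type and stream are assumed):
///
///   auto state = column_unique.getSerializableState();
///   UInt64 num_keys = state.limit;                                    /// number of inserted unique values
///   writeIntBinary(num_keys, *stream);
///   /// state.offset skips the special values (NULL / default) stored at the front of the column.
///   removeNullable(dictionary_type)->serializeBinaryBulk(*state.column, *stream, state.offset, state.limit);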
// virtual MutableColumnPtr getInsertionPoints(const ColumnPtr & keys) const = 0;
//
// virtual bool has(const char * pos, size_t length) const { return getInsertionPoint(pos, length) != size(); }
@ -74,7 +100,7 @@ public:
throw Exception("Method deserializeAndInsertFromArena is not supported for ColumnUnique.", ErrorCodes::NOT_IMPLEMENTED);
}
ColumnPtr index(const ColumnPtr &, size_t) const override
ColumnPtr index(const IColumn &, size_t) const override
{
throw Exception("Method index is not supported for ColumnUnique.", ErrorCodes::NOT_IMPLEMENTED);
}
@ -110,4 +136,7 @@ public:
}
};
using ColumnUniquePtr = IColumnUnique::ColumnUniquePtr;
using MutableColumnUniquePtr = IColumnUnique::MutableColumnUniquePtr;
}

View File

@ -59,9 +59,14 @@ NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server
void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
{
IDataType::InputStreamGetter input_stream_getter = [&] (const IDataType::SubstreamPath &) { return &istr; };
auto state = type.createDeserializeBinaryBulkState();
type.deserializeBinaryBulkWithMultipleStreams(column, input_stream_getter, rows, avg_value_size_hint, false, {}, state);
IDataType::DeserializeBinaryBulkSettings settings;
settings.getter = [&](IDataType::SubstreamPath) -> ReadBuffer * { return &istr; };
settings.avg_value_size_hint = avg_value_size_hint;
settings.position_independent_encoding = false;
IDataType::DeserializeBinaryBulkStatePtr state;
type.deserializeBinaryBulkStatePrefix(settings, state);
type.deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state);
if (column.size() != rows)
throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);

View File

@ -52,8 +52,15 @@ void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr
else
full_column = column;
IDataType::OutputStreamGetter output_stream_getter = [&] (const IDataType::SubstreamPath &) { return &ostr; };
type.serializeBinaryBulkWithMultipleStreams(*full_column, output_stream_getter, offset, limit, false, {});
IDataType::SerializeBinaryBulkSettings settings;
settings.getter = [&ostr](IDataType::SubstreamPath) -> WriteBuffer * { return &ostr; };
settings.position_independent_encoding = false;
settings.max_dictionary_size = 0;
IDataType::SerializeBinaryBulkStatePtr state;
type.serializeBinaryBulkStatePrefix(settings, state);
type.serializeBinaryBulkWithMultipleStreams(*full_column, offset, limit, settings, state);
type.serializeBinaryBulkStateSuffix(settings, state);
}

View File

@ -144,37 +144,67 @@ namespace
}
void DataTypeArray::enumerateStreams(StreamCallback callback, SubstreamPath path) const
void DataTypeArray::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
path.push_back(Substream::ArraySizes);
callback(path);
path.back() = Substream::ArrayElements;
nested->enumerateStreams(callback, path);
path.pop_back();
}
void DataTypeArray::serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::ArrayElements);
nested->serializeBinaryBulkStatePrefix(settings, state);
settings.path.pop_back();
}
void DataTypeArray::serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::ArrayElements);
nested->serializeBinaryBulkStateSuffix(settings, state);
settings.path.pop_back();
}
void DataTypeArray::deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::ArrayElements);
nested->deserializeBinaryBulkStatePrefix(settings, state);
settings.path.pop_back();
}
void DataTypeArray::serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
OutputStreamGetter getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath path) const
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
const ColumnArray & column_array = typeid_cast<const ColumnArray &>(column);
/// First serialize array sizes.
path.push_back(Substream::ArraySizes);
if (auto stream = getter(path))
settings.path.push_back(Substream::ArraySizes);
if (auto stream = settings.getter(settings.path))
{
if (position_independent_encoding)
if (settings.position_independent_encoding)
serializeArraySizesPositionIndependent(column, *stream, offset, limit);
else
DataTypeNumber<ColumnArray::Offset>().serializeBinaryBulk(*column_array.getOffsetsPtr(), *stream, offset, limit);
}
/// Then serialize contents of arrays.
path.back() = Substream::ArrayElements;
settings.path.back() = Substream::ArrayElements;
const ColumnArray::Offsets & offset_values = column_array.getOffsets();
if (offset > offset_values.size())
@ -196,31 +226,29 @@ void DataTypeArray::serializeBinaryBulkWithMultipleStreams(
: 0;
if (limit == 0 || nested_limit)
nested->serializeBinaryBulkWithMultipleStreams(column_array.getData(), getter, nested_offset, nested_limit, position_independent_encoding, path);
nested->serializeBinaryBulkWithMultipleStreams(column_array.getData(), nested_offset, nested_limit, settings, state);
settings.path.pop_back();
}
void DataTypeArray::deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
InputStreamGetter getter,
size_t limit,
double /*avg_value_size_hint*/,
bool position_independent_encoding,
SubstreamPath path,
const DeserializeBinaryBulkStatePtr & state) const
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const
{
ColumnArray & column_array = typeid_cast<ColumnArray &>(column);
path.push_back(Substream::ArraySizes);
if (auto stream = getter(path))
settings.path.push_back(Substream::ArraySizes);
if (auto stream = settings.getter(settings.path))
{
if (position_independent_encoding)
if (settings.position_independent_encoding)
deserializeArraySizesPositionIndependent(column, *stream, limit);
else
DataTypeNumber<ColumnArray::Offset>().deserializeBinaryBulk(column_array.getOffsetsColumn(), *stream, limit, 0);
}
path.back() = Substream::ArrayElements;
settings.path.back() = Substream::ArrayElements;
ColumnArray::Offsets & offset_values = column_array.getOffsets();
IColumn & nested_column = column_array.getData();
@ -230,7 +258,8 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams(
if (last_offset < nested_column.size())
throw Exception("Nested column is longer than last offset", ErrorCodes::LOGICAL_ERROR);
size_t nested_limit = last_offset - nested_column.size();
nested->deserializeBinaryBulkWithMultipleStreams(nested_column, getter, nested_limit, 0, position_independent_encoding, path, state);
nested->deserializeBinaryBulkWithMultipleStreams(nested_column, nested_limit, settings, state);
settings.path.pop_back();
/// Check consistency between offsets and elements subcolumns.
/// But if the elements column is empty, it's ok for columns of Nested types that were added by ALTER.

View File

@ -61,29 +61,32 @@ public:
* This is necessary, because when implementing nested structures, several arrays can have common sizes.
*/
void enumerateStreams(StreamCallback callback, SubstreamPath path) const override;
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
void serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
void deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const override;
void serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
OutputStreamGetter getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath path) const override;
DeserializeBinaryBulkStatePtr createDeserializeBinaryBulkState() const override
{
return nested->createDeserializeBinaryBulkState();
}
const IColumn & column,
size_t offset,
size_t limit,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
void deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
InputStreamGetter getter,
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath path,
const DeserializeBinaryBulkStatePtr & state) const override;
IColumn & column,
size_t limit,
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const override;
MutableColumnPtr createColumn() const override;

View File

@ -37,54 +37,83 @@ bool DataTypeNullable::onlyNull() const
}
void DataTypeNullable::enumerateStreams(StreamCallback callback, SubstreamPath path) const
void DataTypeNullable::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
path.push_back(Substream::NullMap);
callback(path);
path.back() = Substream::NullableElements;
nested_data_type->enumerateStreams(callback, path);
path.pop_back();
}
void DataTypeNullable::serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::NullableElements);
nested_data_type->serializeBinaryBulkStatePrefix(settings, state);
settings.path.pop_back();
}
void DataTypeNullable::serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::NullableElements);
nested_data_type->serializeBinaryBulkStateSuffix(settings, state);
settings.path.pop_back();
}
void DataTypeNullable::deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::NullableElements);
nested_data_type->deserializeBinaryBulkStatePrefix(settings, state);
settings.path.pop_back();
}
void DataTypeNullable::serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
OutputStreamGetter getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath path) const
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
const ColumnNullable & col = static_cast<const ColumnNullable &>(column);
col.checkConsistency();
/// First serialize null map.
path.push_back(Substream::NullMap);
if (auto stream = getter(path))
settings.path.push_back(Substream::NullMap);
if (auto stream = settings.getter(settings.path))
DataTypeUInt8().serializeBinaryBulk(col.getNullMapColumn(), *stream, offset, limit);
/// Then serialize contents of the nested column.
path.back() = Substream::NullableElements;
nested_data_type->serializeBinaryBulkWithMultipleStreams(col.getNestedColumn(), getter, offset, limit, position_independent_encoding, path);
settings.path.back() = Substream::NullableElements;
nested_data_type->serializeBinaryBulkWithMultipleStreams(col.getNestedColumn(), offset, limit, settings, state);
settings.path.pop_back();
}
void DataTypeNullable::deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
InputStreamGetter getter,
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath path,
const DeserializeBinaryBulkStatePtr & state) const
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const
{
ColumnNullable & col = static_cast<ColumnNullable &>(column);
path.push_back(Substream::NullMap);
if (auto stream = getter(path))
settings.path.push_back(Substream::NullMap);
if (auto stream = settings.getter(settings.path))
DataTypeUInt8().deserializeBinaryBulk(col.getNullMapColumn(), *stream, limit, 0);
path.back() = Substream::NullableElements;
nested_data_type->deserializeBinaryBulkWithMultipleStreams(col.getNestedColumn(), getter, limit, avg_value_size_hint, position_independent_encoding, path, state);
settings.path.back() = Substream::NullableElements;
nested_data_type->deserializeBinaryBulkWithMultipleStreams(col.getNestedColumn(), limit, settings, state);
settings.path.pop_back();
}

View File

@ -17,29 +17,32 @@ public:
std::string getName() const override { return "Nullable(" + nested_data_type->getName() + ")"; }
const char * getFamilyName() const override { return "Nullable"; }
void enumerateStreams(StreamCallback callback, SubstreamPath path) const override;
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
void serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
void deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const override;
void serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
OutputStreamGetter getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath path) const override;
DeserializeBinaryBulkStatePtr createDeserializeBinaryBulkState() const override
{
return nested_data_type->createDeserializeBinaryBulkState();
}
const IColumn & column,
size_t offset,
size_t limit,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
void deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
InputStreamGetter getter,
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath path,
const DeserializeBinaryBulkStatePtr & state) const override;
IColumn & column,
size_t limit,
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const override;
void serializeBinary(const Field & field, WriteBuffer & ostr) const override { nested_data_type->serializeBinary(field, ostr); }
void deserializeBinary(Field & field, ReadBuffer & istr) const override { nested_data_type->deserializeBinary(field, istr); }

View File

@ -282,7 +282,7 @@ void DataTypeTuple::deserializeTextCSV(IColumn & column, ReadBuffer & istr, cons
});
}
void DataTypeTuple::enumerateStreams(StreamCallback callback, SubstreamPath path) const
void DataTypeTuple::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
path.push_back(Substream::TupleElement);
for (const auto i : ext::range(0, ext::size(elems)))
@ -290,44 +290,133 @@ void DataTypeTuple::enumerateStreams(StreamCallback callback, SubstreamPath path
path.back().tuple_element_name = names[i];
elems[i]->enumerateStreams(callback, path);
}
path.pop_back();
}
struct SerializeBinaryBulkStateTuple : public IDataType::SerializeBinaryBulkState
{
std::vector<IDataType::SerializeBinaryBulkStatePtr> states;
};
struct DeserializeBinaryBulkStateTuple : public IDataType::DeserializeBinaryBulkState
{
std::vector<IDataType::DeserializeBinaryBulkStatePtr> states;
};
static SerializeBinaryBulkStateTuple * checkAndGetTupleSerializeState(IDataType::SerializeBinaryBulkStatePtr & state)
{
if (!state)
throw Exception("Got empty state for DataTypeTuple.", ErrorCodes::LOGICAL_ERROR);
auto * tuple_state = typeid_cast<SerializeBinaryBulkStateTuple *>(state.get());
if (!tuple_state)
throw Exception("Invalid SerializeBinaryBulkState for DataTypeTuple. Expected: "
+ demangle(typeid(SerializeBinaryBulkStateTuple).name()) + ", got "
+ demangle(typeid(*state).name()), ErrorCodes::LOGICAL_ERROR);
return tuple_state;
}
static DeserializeBinaryBulkStateTuple * checkAndGetTupleDeserializeState(IDataType::DeserializeBinaryBulkStatePtr & state)
{
if (!state)
throw Exception("Got empty state for DataTypeTuple.", ErrorCodes::LOGICAL_ERROR);
auto * tuple_state = typeid_cast<DeserializeBinaryBulkStateTuple *>(state.get());
if (!tuple_state)
throw Exception("Invalid DeserializeBinaryBulkState for DataTypeTuple. Expected: "
+ demangle(typeid(DeserializeBinaryBulkStateTuple).name()) + ", got "
+ demangle(typeid(*state).name()), ErrorCodes::LOGICAL_ERROR);
return tuple_state;
}
void DataTypeTuple::serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
auto tuple_state = std::make_shared<SerializeBinaryBulkStateTuple>();
tuple_state->states.resize(elems.size());
settings.path.push_back(Substream::TupleElement);
for (size_t i = 0; i < elems.size(); ++i)
{
settings.path.back().tuple_element_name = names[i];
elems[i]->serializeBinaryBulkStatePrefix(settings, tuple_state->states[i]);
}
settings.path.pop_back();
state = std::move(tuple_state);
}
void DataTypeTuple::serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
auto * tuple_state = checkAndGetTupleSerializeState(state);
settings.path.push_back(Substream::TupleElement);
for (size_t i = 0; i < elems.size(); ++i)
{
settings.path.back().tuple_element_name = names[i];
elems[i]->serializeBinaryBulkStateSuffix(settings, tuple_state->states[i]);
}
settings.path.pop_back();
}
void DataTypeTuple::deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const
{
auto tuple_state = std::make_shared<DeserializeBinaryBulkStateTuple>();
tuple_state->states.resize(elems.size());
settings.path.push_back(Substream::TupleElement);
for (size_t i = 0; i < elems.size(); ++i)
{
settings.path.back().tuple_element_name = names[i];
elems[i]->deserializeBinaryBulkStatePrefix(settings, tuple_state->states[i]);
}
settings.path.pop_back();
state = std::move(tuple_state);
}
void DataTypeTuple::serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
OutputStreamGetter getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath path) const
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
path.push_back(Substream::TupleElement);
auto * tuple_state = checkAndGetTupleSerializeState(state);
settings.path.push_back(Substream::TupleElement);
for (const auto i : ext::range(0, ext::size(elems)))
{
path.back().tuple_element_name = names[i];
elems[i]->serializeBinaryBulkWithMultipleStreams(
extractElementColumn(column, i), getter, offset, limit, position_independent_encoding, path);
settings.path.back().tuple_element_name = names[i];
auto & element_col = extractElementColumn(column, i);
elems[i]->serializeBinaryBulkWithMultipleStreams(element_col, offset, limit, settings, tuple_state->states[i]);
}
settings.path.pop_back();
}
void DataTypeTuple::deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
InputStreamGetter getter,
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath path,
const DeserializeBinaryBulkStatePtr & state) const
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const
{
auto * tuple_state = typeid_cast<DeserializeBinaryBulkStateTuple *>(state.get());
auto * tuple_state = checkAndGetTupleDeserializeState(state);
path.push_back(Substream::TupleElement);
settings.path.push_back(Substream::TupleElement);
for (const auto i : ext::range(0, ext::size(elems)))
{
path.back().tuple_element_name = names[i];
elems[i]->deserializeBinaryBulkWithMultipleStreams(
extractElementColumn(column, i), getter, limit, avg_value_size_hint,
position_independent_encoding, path, tuple_state->states[i]);
settings.path.back().tuple_element_name = names[i];
auto & element_col = extractElementColumn(column, i);
elems[i]->deserializeBinaryBulkWithMultipleStreams(element_col, limit, settings, tuple_state->states[i]);
}
settings.path.pop_back();
}
MutableColumnPtr DataTypeTuple::createColumn() const

View File

@ -53,40 +53,32 @@ public:
/** Each sub-column in a tuple is serialized in a separate stream.
*/
void enumerateStreams(StreamCallback callback, SubstreamPath path) const override;
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
void serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
void deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const override;
void serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
OutputStreamGetter getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath path) const override;
struct DeserializeBinaryBulkStateTuple : public IDataType::DeserializeBinaryBulkState
{
std::vector<DeserializeBinaryBulkStatePtr> states;
DeserializeBinaryBulkStateTuple(const DataTypes & types)
{
states.reserve(types.size());
for (auto & type : types)
states.emplace_back(type->createDeserializeBinaryBulkState());
}
};
DeserializeBinaryBulkStatePtr createDeserializeBinaryBulkState() const override
{
return std::make_shared<DeserializeBinaryBulkStateTuple>(elems);
}
const IColumn & column,
size_t offset,
size_t limit,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
void deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
InputStreamGetter getter,
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath path,
const DeserializeBinaryBulkStatePtr & state) const override;
IColumn & column,
size_t limit,
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const override;
MutableColumnPtr createColumn() const override;

View File

@ -52,134 +52,543 @@ DataTypeWithDictionary::DataTypeWithDictionary(DataTypePtr dictionary_type_, Dat
+ dictionary_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
void DataTypeWithDictionary::enumerateStreams(StreamCallback callback, SubstreamPath path) const
void DataTypeWithDictionary::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
path.push_back(Substream::DictionaryKeys);
dictionary_type->enumerateStreams(callback, path);
path.back() = Substream::DictionaryIndexes;
indexes_type->enumerateStreams(callback, path);
path.pop_back();
}
struct KeysSerializationVersion
{
/// Write keys as a full column. No indexes are written. Structure:
/// <name>.dict.bin : [version - 32 bits][keys]
/// <name>.dict.mrk : [marks for keys]
// FullColumn = 0,
/// Write all keys in serializePostfix and read in deserializePrefix.
/// <name>.dict.bin : [version - 32 bits][indexes type - 32 bits][keys]
/// <name>.bin : [indexes]
/// <name>.mrk : [marks for indexes]
// SingleDictionary,
/// Write distinct set of keys for each granule. Structure:
/// <name>.dict.bin : [version - 32 bits][indexes type - 32 bits][keys]
/// <name>.dict.mrk : [marks for keys]
/// <name>.bin : [indexes]
/// <name>.mrk : [marks for indexes]
// DictionaryPerGranule,
enum Value
{
SingleDictionaryWithAdditionalKeysPerBlock = 1,
};
Value value;
static void checkVersion(UInt64 version)
{
if (version != SingleDictionaryWithAdditionalKeysPerBlock)
throw Exception("Invalid version for DataTypeWithDictionary key column.", ErrorCodes::LOGICAL_ERROR);
}
KeysSerializationVersion(UInt64 version) : value(static_cast<Value>(version)) { checkVersion(version); }
};
struct IndexesSerializationType
{
using SerializationType = UInt64;
static constexpr UInt64 NeedGlobalDictionaryBit = 1u << 8u;
static constexpr UInt64 HasAdditionalKeysBit = 1u << 9u;
enum Type
{
TUInt8 = 0,
TUInt16,
TUInt32,
TUInt64,
};
Type type;
bool has_additional_keys;
bool need_global_dictionary;
static constexpr SerializationType resetFlags(SerializationType type)
{
return type & (~(HasAdditionalKeysBit | NeedGlobalDictionaryBit));
}
static void checkType(SerializationType type)
{
UInt64 value = resetFlags(type);
if (value <= TUInt64)
return;
throw Exception("Invalid type for DataTypeWithDictionary index column.", ErrorCodes::LOGICAL_ERROR);
}
void serialize(WriteBuffer & buffer) const
{
SerializationType val = type;
if (has_additional_keys)
val |= HasAdditionalKeysBit;
if (need_global_dictionary)
val |= NeedGlobalDictionaryBit;
writeIntBinary(val, buffer);
}
void deserialize(ReadBuffer & buffer)
{
SerializationType val;
readIntBinary(val, buffer);
checkType(val);
has_additional_keys = (val & HasAdditionalKeysBit) != 0;
need_global_dictionary = (val & NeedGlobalDictionaryBit) != 0;
type = static_cast<Type>(resetFlags(val));
}
IndexesSerializationType(const IDataType & data_type, bool has_additional_keys, bool need_global_dictionary)
: has_additional_keys(has_additional_keys), need_global_dictionary(need_global_dictionary)
{
if (typeid_cast<const DataTypeUInt8 *>(&data_type))
type = TUInt8;
else if (typeid_cast<const DataTypeUInt16 *>(&data_type))
type = TUInt16;
else if (typeid_cast<const DataTypeUInt32 *>(&data_type))
type = TUInt32;
else if (typeid_cast<const DataTypeUInt64 *>(&data_type))
type = TUInt64;
else
throw Exception("Invalid DataType for IndexesSerializationType. Expected UInt*, got " + data_type.getName(),
ErrorCodes::LOGICAL_ERROR);
}
DataTypePtr getDataType() const
{
if (type == TUInt8)
return std::make_shared<DataTypeUInt8>();
if (type == TUInt16)
return std::make_shared<DataTypeUInt16>();
if (type == TUInt32)
return std::make_shared<DataTypeUInt32>();
if (type == TUInt64)
return std::make_shared<DataTypeUInt64>();
throw Exception("Can't create DataType from IndexesSerializationType.", ErrorCodes::LOGICAL_ERROR);
}
IndexesSerializationType() = default;
};
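/// An illustration of the flag encoding above (hedged; out is an assumed WriteBuffer):
///
///   /// UInt16 indexes, additional per-granule keys present, no global dictionary needed:
///   IndexesSerializationType index_type(DataTypeUInt16(), /* has_additional_keys = */ true, /* need_global_dictionary = */ false);
///   index_type.serialize(out);   /// writes the UInt64 value TUInt16 | HasAdditionalKeysBit == 0x201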
struct SerializeStateWithDictionary : public IDataType::SerializeBinaryBulkState
{
KeysSerializationVersion key_version;
MutableColumnUniquePtr global_dictionary;
explicit SerializeStateWithDictionary(
UInt64 key_version,
MutableColumnUniquePtr && column_unique)
: key_version(key_version)
, global_dictionary(std::move(column_unique)) {}
};
struct DeserializeStateWithDictionary : public IDataType::DeserializeBinaryBulkState
{
KeysSerializationVersion key_version;
ColumnUniquePtr global_dictionary;
UInt64 num_bytes_in_dictionary;
IndexesSerializationType index_type;
MutableColumnPtr additional_keys;
UInt64 num_pending_rows = 0;
explicit DeserializeStateWithDictionary(UInt64 key_version) : key_version(key_version) {}
};
static SerializeStateWithDictionary * checkAndGetWithDictionarySerializeState(
IDataType::SerializeBinaryBulkStatePtr & state)
{
if (!state)
throw Exception("Got empty state for DataTypeWithDictionary.", ErrorCodes::LOGICAL_ERROR);
auto * with_dictionary_state = typeid_cast<SerializeStateWithDictionary *>(state.get());
if (!with_dictionary_state)
throw Exception("Invalid SerializeBinaryBulkState for DataTypeWithDictionary. Expected: "
+ demangle(typeid(SerializeStateWithDictionary).name()) + ", got "
+ demangle(typeid(*state).name()), ErrorCodes::LOGICAL_ERROR);
return with_dictionary_state;
}
static DeserializeStateWithDictionary * checkAndGetWithDictionaryDeserializeState(
IDataType::DeserializeBinaryBulkStatePtr & state)
{
if (!state)
throw Exception("Got empty state for DataTypeWithDictionary.", ErrorCodes::LOGICAL_ERROR);
auto * with_dictionary_state = typeid_cast<DeserializeStateWithDictionary *>(state.get());
if (!with_dictionary_state)
throw Exception("Invalid DeserializeBinaryBulkState for DataTypeWithDictionary. Expected: "
+ demangle(typeid(DeserializeStateWithDictionary).name()) + ", got "
+ demangle(typeid(*state).name()), ErrorCodes::LOGICAL_ERROR);
return with_dictionary_state;
}
void DataTypeWithDictionary::serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::DictionaryKeys);
auto * stream = settings.getter(settings.path);
settings.path.pop_back();
if (!stream)
throw Exception("Got empty stream in DataTypeWithDictionary::serializeBinaryBulkStatePrefix",
ErrorCodes::LOGICAL_ERROR);
/// Write version and create SerializeBinaryBulkState.
UInt64 key_version = KeysSerializationVersion::SingleDictionaryWithAdditionalKeysPerBlock;
writeIntBinary(key_version, *stream);
auto column_unique = createColumnUnique(*dictionary_type, *indexes_type);
state = std::make_shared<SerializeStateWithDictionary>(key_version, std::move(column_unique));
}
void DataTypeWithDictionary::serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
auto * state_with_dictionary = checkAndGetWithDictionarySerializeState(state);
KeysSerializationVersion::checkVersion(state_with_dictionary->key_version.value);
if (state_with_dictionary->global_dictionary)
{
auto unique_state = state_with_dictionary->global_dictionary->getSerializableState();
UInt64 num_keys = unique_state.limit;
if (settings.max_dictionary_size)
{
settings.path.push_back(Substream::DictionaryKeys);
auto * stream = settings.getter(settings.path);
settings.path.pop_back();
if (!stream)
throw Exception("Got empty stream in DataTypeWithDictionary::serializeBinaryBulkStateSuffix",
ErrorCodes::LOGICAL_ERROR);
writeIntBinary(num_keys, *stream);
removeNullable(dictionary_type)->serializeBinaryBulk(*unique_state.column, *stream,
unique_state.offset, unique_state.limit);
}
}
}
void DataTypeWithDictionary::deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::DictionaryKeys);
auto * stream = settings.getter(settings.path);
settings.path.pop_back();
if (!stream)
throw Exception("Got empty stream in DataTypeWithDictionary::deserializeBinaryBulkStatePrefix",
ErrorCodes::LOGICAL_ERROR);
UInt64 keys_version;
readIntBinary(keys_version, *stream);
state = std::make_shared<DeserializeStateWithDictionary>(keys_version);
}
namespace
{
template <typename T>
PaddedPODArray<T> * getIndexesData(IColumn & indexes)
{
auto * column = typeid_cast<ColumnVector<T> *>(&indexes);
if (column)
return &column->getData();
return nullptr;
}
template <typename T>
MutableColumnPtr mapUniqueIndexImpl(PaddedPODArray<T> & index)
{
HashMap<T, T> hash_map;
for (auto val : index)
hash_map.insert({val, hash_map.size()});
auto res_col = ColumnVector<T>::create();
auto & data = res_col->getData();
data.resize(hash_map.size());
for (auto val : hash_map)
data[val.second] = val.first;
for (auto & ind : index)
ind = hash_map[ind];
return std::move(res_col);
}
/// Returns unique values of the column. Writes the new index back into the column.
MutableColumnPtr mapUniqueIndex(IColumn & column)
{
if (auto * data_uint8 = getIndexesData<UInt8>(column))
return mapUniqueIndexImpl(*data_uint8);
else if (auto * data_uint16 = getIndexesData<UInt16>(column))
return mapUniqueIndexImpl(*data_uint16);
else if (auto * data_uint32 = getIndexesData<UInt32>(column))
return mapUniqueIndexImpl(*data_uint32);
else if (auto * data_uint64 = getIndexesData<UInt64>(column))
return mapUniqueIndexImpl(*data_uint64);
else
throw Exception("Indexes column for getUniqueIndex must be ColumnUInt, got" + column.getName(),
ErrorCodes::LOGICAL_ERROR);
}
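/// A worked example of mapUniqueIndex on illustrative values:
///   index (in/out) before = [5, 3, 5, 7, 3]
///   returned column (unique values in order of first appearance) = [5, 3, 7]
///   index after the call                                         = [0, 1, 0, 2, 1]
/// so returned_column->index(index, 0) reproduces the original values.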
template <typename T>
MutableColumnPtr mapIndexWithOverflow(PaddedPODArray<T> & index, size_t max_val)
{
HashMap<T, T> hash_map;
HashMap<T, T> hash_map_with_overflow;
for (auto val : index)
{
auto & map = val < max_val ? hash_map : hash_map_with_overflow;
map.insert({val, map.size()});
}
auto index_map_col = ColumnVector<T>::create();
auto & index_data = index_map_col->getData();
index_data.resize(hash_map.size());
for (auto val : hash_map)
index_data[val.second] = val.first;
for (auto & val : index)
val = val < max_val ? hash_map[val]
: hash_map_with_overflow[val] + hash_map.size();
return index_map_col;
}
MutableColumnPtr mapIndexWithOverflow(IColumn & column, size_t max_size)
{
if (auto * data_uint8 = getIndexesData<UInt8>(column))
return mapIndexWithOverflow(*data_uint8, max_size);
else if (auto * data_uint16 = getIndexesData<UInt16>(column))
return mapIndexWithOverflow(*data_uint16, max_size);
else if (auto * data_uint32 = getIndexesData<UInt32>(column))
return mapIndexWithOverflow(*data_uint32, max_size);
else if (auto * data_uint64 = getIndexesData<UInt64>(column))
return mapIndexWithOverflow(*data_uint64, max_size);
else
throw Exception("Indexes column for makeIndexWithOverflow must be ColumnUInt, got" + column.getName(),
ErrorCodes::LOGICAL_ERROR);
}
}
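Likewise, a hedged standalone sketch of the split that mapIndexWithOverflow performs (simplified, std::unordered_map instead of HashMap; the function name is hypothetical): indexes below max_val are remapped into the range addressed by the shared dictionary, and the overflowed ones follow after it so they can later be served from additional per-granule keys.
#include <cstdint>
#include <unordered_map>
#include <vector>
/// Remaps `index` in place: values below max_val come first, overflowed values after them.
/// Returns the original values of the non-overflowed part in their new order.
std::vector<uint64_t> mapIndexWithOverflowSketch(std::vector<uint64_t> & index, uint64_t max_val)
{
    std::unordered_map<uint64_t, uint64_t> dict_part;      /// value -> new index
    std::unordered_map<uint64_t, uint64_t> overflow_part;  /// value -> offset within overflow
    for (auto val : index)
    {
        auto & map = val < max_val ? dict_part : overflow_part;
        map.emplace(val, map.size());
    }
    std::vector<uint64_t> dict_values(dict_part.size());
    for (auto [value, pos] : dict_part)
        dict_values[pos] = value;
    for (auto & val : index)
        val = val < max_val ? dict_part[val]
                            : overflow_part[val] + dict_part.size();
    return dict_values;
}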
void DataTypeWithDictionary::serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
OutputStreamGetter getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath path) const
const IColumn & column,
size_t offset,
size_t limit,
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const
{
settings.path.push_back(Substream::DictionaryKeys);
auto * keys_stream = settings.getter(settings.path);
settings.path.back() = Substream::DictionaryIndexes;
auto * indexes_stream = settings.getter(settings.path);
settings.path.pop_back();
if (!keys_stream && !indexes_stream)
return;
if (!keys_stream)
throw Exception("Got empty stream for DataTypeWithDictionary keys.", ErrorCodes::LOGICAL_ERROR);
if (!indexes_stream)
throw Exception("Got empty stream for DataTypeWithDictionary indexes.", ErrorCodes::LOGICAL_ERROR);
const ColumnWithDictionary & column_with_dictionary = typeid_cast<const ColumnWithDictionary &>(column);
auto * state_with_dictionary = checkAndGetWithDictionarySerializeState(state);
auto & global_dictionary = state_with_dictionary->global_dictionary;
KeysSerializationVersion::checkVersion(state_with_dictionary->key_version.value);
auto unique_state = global_dictionary->getSerializableState();
bool was_global_dictionary_written = unique_state.limit >= settings.max_dictionary_size;
const auto & indexes = column_with_dictionary.getIndexesPtr();
const auto & keys = column_with_dictionary.getUnique()->getSerializableState().column;
size_t max_limit = column.size() - offset;
limit = limit ? std::min(limit, max_limit) : max_limit;
path.push_back(Substream::DictionaryIndexes);
if (auto stream = getter(path))
/// Create pair (used_keys, sub_index) which is the dictionary for [offset, offset + limit) range.
MutableColumnPtr sub_index = (*indexes->cut(offset, limit)).mutate();
auto unique_indexes = mapUniqueIndex(*sub_index);
/// unique_indexes->index(*sub_index) == indexes[offset:offset + limit]
MutableColumnPtr used_keys = (*keys->index(*unique_indexes, 0)).mutate();
if (settings.max_dictionary_size)
{
const auto & indexes = column_with_dictionary.getIndexesPtr();
const auto & keys = column_with_dictionary.getUnique()->getNestedColumn();
MutableColumnPtr sub_index = (*indexes->cut(offset, limit)).mutate();
ColumnPtr unique_indexes = makeSubIndex(*sub_index);
/// unique_indexes->index(sub_index) == indexes[offset:offset + limit]
auto used_keys = keys->index(unique_indexes, 0);
/// (used_keys, sub_index) is ColumnWithDictionary for range [offset:offset + limit]
UInt64 used_keys_size = used_keys->size();
writeIntBinary(used_keys_size, *stream);
UInt64 indexes_size = sub_index->size();
writeIntBinary(indexes_size, *stream);
path.back() = Substream::DictionaryKeys;
dictionary_type->serializeBinaryBulkWithMultipleStreams(*used_keys, getter, 0, 0,
position_independent_encoding, path);
indexes_type->serializeBinaryBulk(*sub_index, *stream, 0, limit);
/// Insert used_keys into global dictionary and update sub_index.
auto indexes_with_overflow = global_dictionary->uniqueInsertRangeWithOverflow(*used_keys, 0, used_keys->size(),
settings.max_dictionary_size);
sub_index = (*indexes_with_overflow.indexes->index(*sub_index, 0)).mutate();
used_keys = std::move(indexes_with_overflow.overflowed_keys);
}
}
struct DeserializeBinaryBulkStateWithDictionary : public IDataType::DeserializeBinaryBulkState
{
UInt64 num_rows_to_read_until_next_index = 0;
ColumnPtr index;
IDataType::DeserializeBinaryBulkStatePtr state;
bool need_additional_keys = !used_keys->empty();
bool need_dictionary = settings.max_dictionary_size != 0;
bool need_write_dictionary = !was_global_dictionary_written && unique_state.limit >= settings.max_dictionary_size;
explicit DeserializeBinaryBulkStateWithDictionary(IDataType::DeserializeBinaryBulkStatePtr && state)
: state(std::move(state)) {}
};
IndexesSerializationType index_version(*indexes_type, need_additional_keys, need_dictionary);
index_version.serialize(*indexes_stream);
IDataType::DeserializeBinaryBulkStatePtr DataTypeWithDictionary::createDeserializeBinaryBulkState() const
{
return std::make_shared<DeserializeBinaryBulkStateWithDictionary>(
dictionary_type->createDeserializeBinaryBulkState());
unique_state = global_dictionary->getSerializableState();
if (need_write_dictionary)
{
/// Write global dictionary if it wasn't written and has too many keys.
UInt64 num_keys = unique_state.limit;
writeIntBinary(num_keys, *keys_stream);
removeNullable(dictionary_type)->serializeBinaryBulk(*unique_state.column, *keys_stream, unique_state.offset, num_keys);
}
if (need_additional_keys)
{
UInt64 num_keys = used_keys->size();
writeIntBinary(num_keys, *indexes_stream);
removeNullable(dictionary_type)->serializeBinaryBulk(*used_keys, *indexes_stream, 0, num_keys);
}
UInt64 num_rows = sub_index->size();
writeIntBinary(num_rows, *indexes_stream);
indexes_type->serializeBinaryBulk(*sub_index, *indexes_stream, 0, num_rows);
}
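Taken together, the writes above produce roughly the following per-granule layout (a summary read off this function, not an authoritative format description):
/// DictionaryKeys stream:
///     written when the shared dictionary first grows past max_dictionary_size,
///     and/or in serializeBinaryBulkStateSuffix:
///     UInt64 num_keys, then the keys serialized with removeNullable(dictionary_type)->serializeBinaryBulk.
///
/// DictionaryIndexes stream, for each serialized range:
///     IndexesSerializationType header,
///     optionally UInt64 num_keys plus the additional (per-granule) keys,
///     UInt64 num_rows, then num_rows indexes serialized with indexes_type.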
void DataTypeWithDictionary::deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
InputStreamGetter getter,
size_t limit,
double /*avg_value_size_hint*/,
bool position_independent_encoding,
SubstreamPath path,
const DeserializeBinaryBulkStatePtr & state) const
IColumn & column,
size_t limit,
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const
{
ColumnWithDictionary & column_with_dictionary = typeid_cast<ColumnWithDictionary &>(column);
auto dict_state = typeid_cast<DeserializeBinaryBulkStateWithDictionary *>(state.get());
if (dict_state == nullptr)
throw Exception("Invalid DeserializeBinaryBulkState.", ErrorCodes::LOGICAL_ERROR);
auto * state_with_dictionary = checkAndGetWithDictionaryDeserializeState(state);
KeysSerializationVersion::checkVersion(state_with_dictionary->key_version.value);
auto readIndexes = [&](ReadBuffer * stream, const ColumnPtr & index, size_t num_rows)
settings.path.push_back(Substream::DictionaryKeys);
auto * keys_stream = settings.getter(settings.path);
settings.path.back() = Substream::DictionaryIndexes;
auto * indexes_stream = settings.getter(settings.path);
settings.path.pop_back();
if (!keys_stream && !indexes_stream)
return;
if (!keys_stream)
throw Exception("Got empty stream for DataTypeWithDictionary keys.", ErrorCodes::LOGICAL_ERROR);
if (!indexes_stream)
throw Exception("Got empty stream for DataTypeWithDictionary indexes.", ErrorCodes::LOGICAL_ERROR);
auto readDictionary = [this, state_with_dictionary, keys_stream, &column_with_dictionary]()
{
auto index_col = indexes_type->createColumn();
indexes_type->deserializeBinaryBulk(*index_col, *stream, num_rows, 0);
column_with_dictionary.getIndexes()->insertRangeFrom(*index->index(std::move(index_col), 0), 0, num_rows);
UInt64 num_keys;
readIntBinary(num_keys, *keys_stream);
auto keys_type = removeNullable(dictionary_type);
auto global_dict_keys = keys_type->createColumn();
keys_type->deserializeBinaryBulk(*global_dict_keys, *keys_stream, num_keys, 0);
auto column_unique = createColumnUnique(*dictionary_type, *indexes_type);
column_unique->uniqueInsertRangeFrom(*global_dict_keys, 0, num_keys);
state_with_dictionary->global_dictionary = std::move(column_unique);
};
using CachedStreams = std::unordered_map<std::string, ReadBuffer *>;
CachedStreams cached_streams;
IDataType::InputStreamGetter cached_stream_getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
auto readAdditionalKeys = [this, state_with_dictionary, indexes_stream]()
{
std::string stream_name = IDataType::getFileNameForStream("", path);
auto iter = cached_streams.find(stream_name);
if (iter == cached_streams.end())
iter = cached_streams.insert({stream_name, getter(path)}).first;
return iter->second;
UInt64 num_keys;
readIntBinary(num_keys, *indexes_stream);
auto keys_type = removeNullable(dictionary_type);
state_with_dictionary->additional_keys = keys_type->createColumn();
keys_type->deserializeBinaryBulk(*state_with_dictionary->additional_keys, *indexes_stream, num_keys, 0);
};
auto readDict = [&](UInt64 num_keys)
auto readIndexes = [this, state_with_dictionary, indexes_stream, &column_with_dictionary](UInt64 num_rows,
bool need_dictionary)
{
auto dict_column = dictionary_type->createColumn();
dictionary_type->deserializeBinaryBulkWithMultipleStreams(*dict_column, cached_stream_getter, num_keys, 0,
position_independent_encoding, path, dict_state->state);
return column_with_dictionary.getUnique()->uniqueInsertRangeFrom(*dict_column, 0, num_keys);
};
MutableColumnPtr indexes_column = indexes_type->createColumn();
indexes_type->deserializeBinaryBulk(*indexes_column, *indexes_stream, num_rows, 0);
path.push_back(Substream::DictionaryIndexes);
auto & global_dictionary = state_with_dictionary->global_dictionary;
const auto & additional_keys = state_with_dictionary->additional_keys;
auto * column_unique = column_with_dictionary.getUnique();
if (auto stream = getter(path))
{
path.back() = Substream::DictionaryKeys;
bool has_additional_keys = state_with_dictionary->additional_keys != nullptr;
bool column_is_empty = column_with_dictionary.empty();
bool column_with_global_dictionary = column_unique == global_dictionary.get();
while (limit)
if (!has_additional_keys && (column_is_empty || column_with_global_dictionary))
{
if (dict_state->num_rows_to_read_until_next_index == 0)
{
if (stream->eof())
break;
if (column_is_empty)
column_with_dictionary.setUnique(global_dictionary);
UInt64 num_keys;
readIntBinary(num_keys, *stream);
readIntBinary(dict_state->num_rows_to_read_until_next_index, *stream);
dict_state->index = readDict(num_keys);
column_with_dictionary.getIndexes()->insertRangeFrom(*indexes_column, 0, num_rows);
}
else if (!need_dictionary)
{
auto indexes = column_unique->uniqueInsertRangeFrom(*additional_keys, 0, additional_keys->size());
column_with_dictionary.getIndexes()->insertRangeFrom(*indexes->index(*indexes_column, 0), 0, num_rows);
}
else
{
auto index_map = mapIndexWithOverflow(*indexes_column, global_dictionary->size());
auto used_keys = global_dictionary->getNestedColumn()->index(*index_map, 0);
auto indexes = column_unique->uniqueInsertRangeFrom(*used_keys, 0, used_keys->size());
if (additional_keys)
{
size_t num_keys = additional_keys->size();
auto additional_indexes = column_unique->uniqueInsertRangeFrom(*additional_keys, 0, num_keys);
indexes->insertRangeFrom(*additional_indexes, 0, num_keys);
}
size_t num_rows_to_read = std::min(limit, dict_state->num_rows_to_read_until_next_index);
readIndexes(stream, dict_state->index, num_rows_to_read);
limit -= num_rows_to_read;
dict_state->num_rows_to_read_until_next_index -= num_rows_to_read;
column_with_dictionary.getIndexes()->insertRangeFrom(*indexes->index(*indexes_column, 0), 0, num_rows);
}
};
while (limit)
{
if (state_with_dictionary->num_pending_rows == 0)
{
if (indexes_stream->eof())
break;
state_with_dictionary->index_type.deserialize(*indexes_stream);
if (state_with_dictionary->index_type.need_global_dictionary && !state_with_dictionary->global_dictionary)
readDictionary();
if (state_with_dictionary->index_type.has_additional_keys)
readAdditionalKeys();
else
state_with_dictionary->additional_keys = nullptr;
readIntBinary(state_with_dictionary->num_pending_rows, *indexes_stream);
}
size_t num_rows_to_read = std::min(limit, state_with_dictionary->num_pending_rows);
readIndexes(num_rows_to_read, state_with_dictionary->index_type.need_global_dictionary);
limit -= num_rows_to_read;
state_with_dictionary->num_pending_rows -= num_rows_to_read;
}
}
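The reading side mirrors that layout; condensed (informal pseudocode, not part of the commit):
/// while rows remain to be read:
///     if num_pending_rows == 0:
///         read IndexesSerializationType from the indexes stream;
///         if it needs the global dictionary and it is not loaded yet, read it from the keys stream;
///         if it announces additional keys, read them from the indexes stream;
///         read num_pending_rows;
///     read min(limit, num_pending_rows) indexes, remap them through the global
///     dictionary and/or the additional keys, and append the result to the column.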
@ -216,64 +625,69 @@ void DataTypeWithDictionary::deserializeImpl(
}
template <typename ColumnType, typename IndexType>
MutableColumnPtr DataTypeWithDictionary::createColumnImpl() const
MutableColumnUniquePtr DataTypeWithDictionary::createColumnUniqueImpl(const IDataType & keys_type)
{
return ColumnWithDictionary::create(ColumnUnique<ColumnType, IndexType>::create(dictionary_type),
indexes_type->createColumn());
return ColumnUnique<ColumnType, IndexType>::create(keys_type);
}
template <typename ColumnType>
MutableColumnPtr DataTypeWithDictionary::createColumnImpl() const
MutableColumnUniquePtr DataTypeWithDictionary::createColumnUniqueImpl(const IDataType & keys_type,
const IDataType & indexes_type)
{
if (typeid_cast<const DataTypeUInt8 *>(indexes_type.get()))
return createColumnImpl<ColumnType, UInt8>();
if (typeid_cast<const DataTypeUInt16 *>(indexes_type.get()))
return createColumnImpl<ColumnType, UInt16>();
if (typeid_cast<const DataTypeUInt32 *>(indexes_type.get()))
return createColumnImpl<ColumnType, UInt32>();
if (typeid_cast<const DataTypeUInt64 *>(indexes_type.get()))
return createColumnImpl<ColumnType, UInt64>();
if (typeid_cast<const DataTypeUInt8 *>(&indexes_type))
return createColumnUniqueImpl<ColumnType, UInt8>(keys_type);
if (typeid_cast<const DataTypeUInt16 *>(&indexes_type))
return createColumnUniqueImpl<ColumnType, UInt16>(keys_type);
if (typeid_cast<const DataTypeUInt32 *>(&indexes_type))
return createColumnUniqueImpl<ColumnType, UInt32>(keys_type);
if (typeid_cast<const DataTypeUInt64 *>(&indexes_type))
return createColumnUniqueImpl<ColumnType, UInt64>(keys_type);
throw Exception("The type of indexes must be unsigned integer, but got " + dictionary_type->getName(),
throw Exception("The type of indexes must be unsigned integer, but got " + indexes_type.getName(),
ErrorCodes::LOGICAL_ERROR);
}
struct CreateColumnVector
{
MutableColumnPtr & column;
const DataTypeWithDictionary * data_type_with_dictionary;
const IDataType * type;
MutableColumnUniquePtr & column;
const IDataType & keys_type;
const IDataType & indexes_type;
const IDataType * nested_type;
CreateColumnVector(MutableColumnPtr & column, const DataTypeWithDictionary * data_type_with_dictionary,
const IDataType * type)
: column(column), data_type_with_dictionary(data_type_with_dictionary), type(type) {}
CreateColumnVector(MutableColumnUniquePtr & column, const IDataType & keys_type, const IDataType & indexes_type)
: column(column), keys_type(keys_type), indexes_type(indexes_type), nested_type(&keys_type)
{
if (auto nullable_type = typeid_cast<const DataTypeNullable *>(&keys_type))
nested_type = nullable_type->getNestedType().get();
}
template <typename T, size_t>
void operator()()
{
if (typeid_cast<const DataTypeNumber<T> *>(type))
column = data_type_with_dictionary->createColumnImpl<ColumnVector<T>>();
if (typeid_cast<const DataTypeNumber<T> *>(nested_type))
column = DataTypeWithDictionary::createColumnUniqueImpl<ColumnVector<T>>(keys_type, indexes_type);
}
};
MutableColumnPtr DataTypeWithDictionary::createColumn() const
MutableColumnUniquePtr DataTypeWithDictionary::createColumnUnique(const IDataType & keys_type,
const IDataType & indexes_type)
{
auto type = dictionary_type;
auto * type = &keys_type;
if (type->isNullable())
type = static_cast<const DataTypeNullable &>(*dictionary_type).getNestedType();
type = static_cast<const DataTypeNullable &>(keys_type).getNestedType().get();
if (type->isString())
return createColumnImpl<ColumnString>();
return createColumnUniqueImpl<ColumnString>(keys_type, indexes_type);
if (type->isFixedString())
return createColumnImpl<ColumnFixedString>();
if (typeid_cast<const DataTypeDate *>(type.get()))
return createColumnImpl<ColumnVector<UInt16>>();
if (typeid_cast<const DataTypeDateTime *>(type.get()))
return createColumnImpl<ColumnVector<UInt32>>();
return createColumnUniqueImpl<ColumnFixedString>(keys_type, indexes_type);
if (typeid_cast<const DataTypeDate *>(type))
return createColumnUniqueImpl<ColumnVector<UInt16>>(keys_type, indexes_type);
if (typeid_cast<const DataTypeDateTime *>(type))
return createColumnUniqueImpl<ColumnVector<UInt32>>(keys_type, indexes_type);
if (type->isNumber())
{
MutableColumnPtr column;
TypeListNumbers::forEach(CreateColumnVector(column, this, type.get()));
MutableColumnUniquePtr column;
TypeListNumbers::forEach(CreateColumnVector(column, keys_type, indexes_type));
if (!column)
throw Exception("Unexpected numeric type: " + type->getName(), ErrorCodes::LOGICAL_ERROR);
@ -285,6 +699,13 @@ MutableColumnPtr DataTypeWithDictionary::createColumn() const
ErrorCodes::LOGICAL_ERROR);
}
MutableColumnPtr DataTypeWithDictionary::createColumn() const
{
MutableColumnPtr indexes = indexes_type->createColumn();
MutableColumnPtr dictionary = createColumnUnique(*dictionary_type, *indexes_type);
return ColumnWithDictionary::create(std::move(dictionary), std::move(indexes));
}
bool DataTypeWithDictionary::equals(const IDataType & rhs) const
{
if (typeid(rhs) != typeid(*this))
View File
@ -1,5 +1,6 @@
#pragma once
#include <DataTypes/IDataType.h>
#include <Columns/IColumnUnique.h>
namespace DB
{
@ -22,26 +23,32 @@ public:
}
const char * getFamilyName() const override { return "WithDictionary"; }
void enumerateStreams(StreamCallback callback, SubstreamPath path) const override;
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
void serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
void deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const override;
void serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
OutputStreamGetter getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath path) const override;
DeserializeBinaryBulkStatePtr createDeserializeBinaryBulkState() const override;
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & state) const override;
void deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
InputStreamGetter getter,
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath path,
const DeserializeBinaryBulkStatePtr & state) const override;
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const override;
void serializeBinary(const Field & field, WriteBuffer & ostr) const override;
void deserializeBinary(Field & field, ReadBuffer & istr) const override;
@ -139,6 +146,8 @@ public:
bool onlyNull() const override { return false; }
bool withDictionary() const override { return true; }
static MutableColumnUniquePtr createColumnUnique(const IDataType & keys_type, const IDataType & indexes_type);
private:
template <typename ... Args>
@ -156,10 +165,11 @@ private:
DeserealizeFunctionPtr<Args ...> func, Args ... args) const;
template <typename ColumnType, typename IndexType>
MutableColumnPtr createColumnImpl() const;
static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type);
template <typename ColumnType>
MutableColumnPtr createColumnImpl() const;
static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type, const IDataType & indexes_type);
friend struct CreateColumnVector;
};
View File
@ -94,14 +94,62 @@ public:
using SubstreamPath = std::vector<Substream>;
using StreamCallback = std::function<void(const SubstreamPath &)>;
virtual void enumerateStreams(StreamCallback callback, SubstreamPath path) const
virtual void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
callback(path);
}
void enumerateStreams(const StreamCallback & callback, SubstreamPath && path) const { enumerateStreams(callback, path); }
void enumerateStreams(const StreamCallback & callback) const { enumerateStreams(callback, {}); }
using OutputStreamGetter = std::function<WriteBuffer*(const SubstreamPath &)>;
using InputStreamGetter = std::function<ReadBuffer*(const SubstreamPath &)>;
struct SerializeBinaryBulkState
{
virtual ~SerializeBinaryBulkState() = default;
};
struct DeserializeBinaryBulkState
{
virtual ~DeserializeBinaryBulkState() = default;
};
using SerializeBinaryBulkStatePtr = std::shared_ptr<SerializeBinaryBulkState>;
using DeserializeBinaryBulkStatePtr = std::shared_ptr<DeserializeBinaryBulkState>;
struct SerializeBinaryBulkSettings
{
OutputStreamGetter getter;
SubstreamPath path;
bool position_independent_encoding = true;
size_t max_dictionary_size = 0;
};
struct DeserializeBinaryBulkSettings
{
InputStreamGetter getter;
SubstreamPath path;
bool position_independent_encoding = true;
/// If not zero, may be used to avoid reallocations while reading column of String type.
double avg_value_size_hint = 0;
};
/// Call before serializeBinaryBulkWithMultipleStreams chain to write something before first mark.
virtual void serializeBinaryBulkStatePrefix(
SerializeBinaryBulkSettings & /*settings*/,
SerializeBinaryBulkStatePtr & /*state*/) const {}
/// Call after serializeBinaryBulkWithMultipleStreams chain to finish serialization.
virtual void serializeBinaryBulkStateSuffix(
SerializeBinaryBulkSettings & /*settings*/,
SerializeBinaryBulkStatePtr & /*state*/) const {}
/// Call before deserializeBinaryBulkWithMultipleStreams chain to get DeserializeBinaryBulkStatePtr.
virtual void deserializeBinaryBulkStatePrefix(
DeserializeBinaryBulkSettings & /*settings*/,
DeserializeBinaryBulkStatePtr & /*state*/) const {}
/** 'offset' and 'limit' are used to specify the range.
* limit = 0 means no limit.
* offset must not be greater than the size of the column.
@ -110,38 +158,24 @@ public:
*/
virtual void serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
OutputStreamGetter getter,
size_t offset,
size_t limit,
bool /*position_independent_encoding*/,
SubstreamPath path) const
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & /*state*/) const
{
if (WriteBuffer * stream = getter(path))
if (WriteBuffer * stream = settings.getter(settings.path))
serializeBinaryBulk(column, *stream, offset, limit);
}
struct DeserializeBinaryBulkState
{
virtual ~DeserializeBinaryBulkState() = default;
};
using DeserializeBinaryBulkStatePtr = std::shared_ptr<DeserializeBinaryBulkState>;
virtual DeserializeBinaryBulkStatePtr createDeserializeBinaryBulkState() const { return nullptr; }
/** Read no more than limit values and append them into column.
* avg_value_size_hint - if not zero, may be used to avoid reallocations while reading column of String type.
*/
/// Read no more than limit values and append them into column.
virtual void deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
InputStreamGetter getter,
size_t limit,
double avg_value_size_hint,
bool /*position_independent_encoding*/,
SubstreamPath path,
const DeserializeBinaryBulkStatePtr & /*state*/) const
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & /*state*/) const
{
if (ReadBuffer * stream = getter(path))
deserializeBinaryBulk(column, *stream, limit, avg_value_size_hint);
if (ReadBuffer * stream = settings.getter(settings.path))
deserializeBinaryBulk(column, *stream, limit, settings.avg_value_size_hint);
}
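As a usage note, a minimal sketch of how a writer is expected to drive the new chain (it mirrors MergedBlockOutputStream further below; writeColumnSketch and getWriteBuffer are hypothetical names, and the ClickHouse headers declaring IDataType and IColumn are assumed to be available):
/// Drives prefix -> per-range serialization -> suffix for one column.
void writeColumnSketch(const IDataType & type, const IColumn & column,
                       const IDataType::OutputStreamGetter & getWriteBuffer)
{
    IDataType::SerializeBinaryBulkSettings settings;
    settings.getter = getWriteBuffer;
    settings.max_dictionary_size = 1024;                    /// cap for the shared dictionary
    IDataType::SerializeBinaryBulkStatePtr state;
    type.serializeBinaryBulkStatePrefix(settings, state);   /// written before the first mark
    const size_t granule = 8192;                            /// rows per call, illustrative
    for (size_t offset = 0; offset < column.size(); offset += granule)
    {
        size_t rows = column.size() - offset;
        if (rows > granule)
            rows = granule;
        type.serializeBinaryBulkWithMultipleStreams(column, offset, rows, settings, state);
    }
    type.serializeBinaryBulkStateSuffix(settings, state);   /// e.g. flush the remaining dictionary keys
}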
/** Override these methods for data types that require just single stream (most of data types).
View File
@ -1601,7 +1601,7 @@ private:
auto res_keys = std::move(res.column);
auto idx = col_with_dict->getUnique()->uniqueInsertRangeFrom(*res_keys, 0, res_keys->size());
col_with_dict->getIndexes()->insertRangeFrom(*idx->index(res_indexes, 0), 0, res_indexes->size());
col_with_dict->getIndexes()->insertRangeFrom(*idx->index(*res_indexes, 0), 0, res_indexes->size());
}
else
col_with_dict->insertRangeFromFullColumn(*res.column, 0, res.column->size());
@ -1609,7 +1609,7 @@ private:
res.column = std::move(res_column);
}
else
res.column = res.column->index(res_indexes, 0);
res.column = res.column->index(*res_indexes, 0);
};
}
View File
@ -329,7 +329,8 @@ void PreparedFunctionImpl::execute(Block & block, const ColumnNumbers & args, si
ErrorCodes::LOGICAL_ERROR);
col_with_dict->insertRangeFromFullColumn(*temp_res_col, 0, temp_res_col->size());
res_col.column = indexes ? col_with_dict->index(indexes, 0) : std::move(col_wit_dict_ptr);
res_col.column = indexes ? col_with_dict->index(*indexes, 0)
: std::move(col_wit_dict_ptr);
return;
}
}
View File
@ -546,12 +546,13 @@ void MergeTreeDataPart::accumulateColumnSizes(ColumnToSize & column_to_size) con
for (const NameAndTypePair & name_type : storage.getColumns().getAllPhysical())
{
IDataType::SubstreamPath path;
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
Poco::File bin_file(getFullPath() + IDataType::getFileNameForStream(name_type.name, substream_path) + ".bin");
if (bin_file.exists())
column_to_size[name_type.name] += bin_file.getSize();
}, {});
}, path);
}
}
@ -597,6 +598,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
{
for (const NameAndTypePair & name_type : columns)
{
IDataType::SubstreamPath stream_path;
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
@ -608,7 +610,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
if (!checksums.files.count(bin_file_name))
throw Exception("No " + bin_file_name + " file checksum for column " + name + " in part " + path,
ErrorCodes::NO_FILE_IN_DATA_PART);
}, {});
}, stream_path);
}
}
@ -678,7 +680,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
throw Exception("Part " + path + " is broken: marks have different sizes.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
}, {});
});
}
}
}
View File
@ -353,7 +353,8 @@ void MergeTreeReader::addStreams(const String & name, const IDataType & type, co
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
};
type.enumerateStreams(callback, {});
IDataType::SubstreamPath path;
type.enumerateStreams(callback, path);
}
@ -362,36 +363,42 @@ void MergeTreeReader::readData(
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
bool with_offsets)
{
IDataType::InputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
auto get_stream_getter = [&](bool stream_for_prefix) -> IDataType::InputStreamGetter
{
/// If offsets for arrays have already been read.
if (!with_offsets && path.size() == 1 && path[0].type == IDataType::Substream::ArraySizes)
return nullptr;
return [&](const IDataType::SubstreamPath & path) -> ReadBuffer *
{
/// If offsets for arrays have already been read.
if (!with_offsets && path.size() == 1 && path[0].type == IDataType::Substream::ArraySizes)
return nullptr;
String stream_name = IDataType::getFileNameForStream(name, path);
String stream_name = IDataType::getFileNameForStream(name, path);
auto it = streams.find(stream_name);
if (it == streams.end())
return nullptr;
auto it = streams.find(stream_name);
if (it == streams.end())
return nullptr;
Stream & stream = *it->second;
Stream & stream = *it->second;
if (!continue_reading)
stream.seekToMark(from_mark);
if (!continue_reading && !stream_for_prefix)
stream.seekToMark(from_mark);
return stream.data_buffer;
return stream.data_buffer;
};
};
if (!continue_reading)
deserialize_binary_bulk_state_map[name] = type.createDeserializeBinaryBulkState();
double & avg_value_size_hint = avg_value_size_hints[name];
IDataType::DeserializeBinaryBulkSettings settings;
settings.avg_value_size_hint = avg_value_size_hint;
if (deserialize_binary_bulk_state_map.count(name) == 0)
throw Exception("DeserializeBinaryBulkState wasn't created for column " + name, ErrorCodes::LOGICAL_ERROR);
{
settings.getter = get_stream_getter(true);
type.deserializeBinaryBulkStatePrefix(settings, deserialize_binary_bulk_state_map[name]);
}
double & avg_value_size_hint = avg_value_size_hints[name];
settings.getter = get_stream_getter(false);
auto & deserialize_state = deserialize_binary_bulk_state_map[name];
type.deserializeBinaryBulkWithMultipleStreams(column, stream_getter, max_rows_to_read,
avg_value_size_hint, true, {}, deserialize_state);
type.deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, settings, deserialize_state);
IDataType::updateAvgValueSizeHint(column, avg_value_size_hint);
}
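For symmetry with the write side, a minimal sketch of the read chain under the new interface (readColumnSketch and getReadBuffer are hypothetical names; the getter passed to the prefix call should not seek, since the prefix is stored before the first mark):
void readColumnSketch(const IDataType & type, IColumn & column, size_t rows,
                      const IDataType::InputStreamGetter & getReadBuffer)
{
    IDataType::DeserializeBinaryBulkSettings settings;
    settings.getter = getReadBuffer;
    IDataType::DeserializeBinaryBulkStatePtr state;
    type.deserializeBinaryBulkStatePrefix(settings, state);   /// e.g. DataTypeWithDictionary reads its keys header here
    type.deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state);
}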
View File
@ -64,7 +64,28 @@ void IMergedBlockOutputStream::addStreams(
aio_threshold);
};
type.enumerateStreams(callback, {});
IDataType::SubstreamPath stream_path;
type.enumerateStreams(callback, stream_path);
}
IDataType::OutputStreamGetter IMergedBlockOutputStream::createStreamGetter(
const String & name, OffsetColumns & offset_columns, bool skip_offsets)
{
return [&, skip_offsets] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && skip_offsets)
return nullptr;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Don't write offsets more than one time for Nested type.
if (is_offsets && offset_columns.count(stream_name))
return nullptr;
return &column_streams[stream_name]->compressed;
};
}
@ -73,8 +94,13 @@ void IMergedBlockOutputStream::writeData(
const IDataType & type,
const IColumn & column,
OffsetColumns & offset_columns,
bool skip_offsets)
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state)
{
IDataType::SerializeBinaryBulkSettings settings;
settings.getter = createStreamGetter(name, offset_columns, skip_offsets);
settings.max_dictionary_size = 1024;
size_t size = column.size();
size_t prev_mark = 0;
while (prev_mark < size)
@ -109,25 +135,10 @@ void IMergedBlockOutputStream::writeData(
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}, {});
}, settings.path);
}
IDataType::OutputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && skip_offsets)
return nullptr;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Don't write offsets more than one time for Nested type.
if (is_offsets && offset_columns.count(stream_name))
return nullptr;
return &column_streams[stream_name]->compressed;
};
type.serializeBinaryBulkWithMultipleStreams(column, stream_getter, prev_mark, limit, true, {});
type.serializeBinaryBulkWithMultipleStreams(column, prev_mark, limit, settings, serialization_state);
/// So that marks point to the beginning of the next compressed block rather than to the end of the current one.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
@ -143,7 +154,7 @@ void IMergedBlockOutputStream::writeData(
return;
column_streams[stream_name]->compressed.nextIfAtEnd();
}, {});
}, settings.path);
prev_mark += limit;
}
@ -157,7 +168,7 @@ void IMergedBlockOutputStream::writeData(
String stream_name = IDataType::getFileNameForStream(name, substream_path);
offset_columns.insert(stream_name);
}
}, {});
}, settings.path);
}
@ -284,6 +295,17 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
const NamesAndTypesList * total_column_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums)
{
/// Finish columns serialization.
IDataType::SerializeBinaryBulkSettings settings;
settings.max_dictionary_size = 1024;
OffsetColumns offset_columns;
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
{
settings.getter = createStreamGetter(it->name, offset_columns, false);
it->type->serializeBinaryBulkStateSuffix(settings, serialization_states[i]);
}
if (!total_column_list)
total_column_list = &columns_list;
@ -404,28 +426,44 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
index_columns[i] = primary_columns[i].column->cloneEmpty();
}
/// Now write the data.
for (const auto & it : columns_list)
if (serialization_states.empty())
{
const ColumnWithTypeAndName & column = block.getByName(it.name);
serialization_states.reserve(columns_list.size());
OffsetColumns tmp_offset_columns;
IDataType::SerializeBinaryBulkSettings settings;
for (const auto & col : columns_list)
{
settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
serialization_states.emplace_back(nullptr);
col.type->serializeBinaryBulkStatePrefix(settings, serialization_states.back());
}
}
/// Now write the data.
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
{
const ColumnWithTypeAndName & column = block.getByName(it->name);
if (permutation)
{
auto primary_column_it = primary_columns_name_to_position.find(it.name);
auto primary_column_it = primary_columns_name_to_position.find(it->name);
if (primary_columns_name_to_position.end() != primary_column_it)
{
writeData(column.name, *column.type, *primary_columns[primary_column_it->second].column, offset_columns, false);
auto & primary_column = *primary_columns[primary_column_it->second].column;
writeData(column.name, *column.type, primary_column, offset_columns, false, serialization_states[i]);
}
else
{
/// We rearrange the columns that are not included in the primary key here; the result is then released to save RAM.
ColumnPtr permutted_column = column.column->permute(*permutation, 0);
writeData(column.name, *column.type, *permutted_column, offset_columns, false);
ColumnPtr permuted_column = column.column->permute(*permutation, 0);
writeData(column.name, *column.type, *permuted_column, offset_columns, false, serialization_states[i]);
}
}
else
{
writeData(column.name, *column.type, *column.column, offset_columns, false);
writeData(column.name, *column.type, *column.column, offset_columns, false, serialization_states[i]);
}
}
@ -479,11 +517,21 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
if (!initialized)
{
column_streams.clear();
serialization_states.clear();
serialization_states.reserve(block.columns());
OffsetColumns tmp_offset_columns;
IDataType::SerializeBinaryBulkSettings settings;
for (size_t i = 0; i < block.columns(); ++i)
{
addStreams(part_path, block.safeGetByPosition(i).name,
*block.safeGetByPosition(i).type, 0, skip_offsets);
const auto & col = block.safeGetByPosition(i);
addStreams(part_path, col.name, *col.type, 0, skip_offsets);
serialization_states.emplace_back(nullptr);
settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
col.type->serializeBinaryBulkStatePrefix(settings, serialization_states.back());
}
initialized = true;
}
@ -493,7 +541,7 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
writeData(column.name, *column.type, *column.column, offset_columns, skip_offsets);
writeData(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i]);
}
size_t written_for_last_mark = (storage.index_granularity - index_offset + rows) % storage.index_granularity;
@ -507,6 +555,17 @@ void MergedColumnOnlyOutputStream::writeSuffix()
MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums()
{
/// Finish columns serialization.
IDataType::SerializeBinaryBulkSettings settings;
settings.max_dictionary_size = 1024;
OffsetColumns offset_columns;
for (size_t i = 0; i < header.columns(); ++i)
{
auto & column = header.safeGetByPosition(i);
settings.getter = createStreamGetter(column.name, offset_columns, skip_offsets);
column.type->serializeBinaryBulkStateSuffix(settings, serialization_states[i]);
}
MergeTreeData::DataPart::Checksums checksums;
for (auto & column_stream : column_streams)
@ -519,6 +578,7 @@ MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndG
}
column_streams.clear();
serialization_states.clear();
initialized = false;
return checksums;
View File
@ -25,6 +25,8 @@ public:
protected:
using OffsetColumns = std::set<std::string>;
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::vector<SerializationState>;
struct ColumnStream
{
@ -64,8 +66,12 @@ protected:
void addStreams(const String & path, const String & name, const IDataType & type, size_t estimated_size, bool skip_offsets);
IDataType::OutputStreamGetter createStreamGetter(const String & name, OffsetColumns & offset_columns, bool skip_offsets);
/// Write data of one column.
void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns, bool skip_offsets);
void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns,
bool skip_offsets, IDataType::SerializeBinaryBulkStatePtr & serialization_state);
MergeTreeData & storage;
@ -135,6 +141,7 @@ private:
private:
NamesAndTypesList columns_list;
SerializationStates serialization_states;
String part_path;
size_t rows_count = 0;
@ -161,6 +168,7 @@ public:
private:
Block header;
SerializationStates serialization_states;
String part_path;
bool initialized = false;
View File
@ -249,6 +249,8 @@ MergeTreeData::DataPart::Checksums checkDataPart(
while (true)
{
IDataType::DeserializeBinaryBulkSettings settings;
/// Check that mark points to current position in file.
bool marks_eof = false;
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
@ -270,7 +272,7 @@ MergeTreeData::DataPart::Checksums checkDataPart(
+ ", mrk file offset: " + toString(stream.mrk_hashing_buf.count()));
throw;
}
}, {});
}, settings.path);
++mark_num;
@ -278,19 +280,18 @@ MergeTreeData::DataPart::Checksums checkDataPart(
/// NOTE Shared array sizes of Nested columns are read more than once. That's Ok.
MutableColumnPtr tmp_column = name_type.type->createColumn();
auto state = name_type.type->createDeserializeBinaryBulkState();
name_type.type->deserializeBinaryBulkWithMultipleStreams(
*tmp_column,
[&](const IDataType::SubstreamPath & substream_path)
{
String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
auto stream_it = streams.find(file_name);
if (stream_it == streams.end())
throw Exception("Logical error: cannot find stream " + file_name);
return &stream_it->second.uncompressed_hashing_buf;
},
index_granularity,
0, true, {}, state);
settings.getter = [&](const IDataType::SubstreamPath & substream_path)
{
String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
auto stream_it = streams.find(file_name);
if (stream_it == streams.end())
throw Exception("Logical error: cannot find stream " + file_name);
return &stream_it->second.uncompressed_hashing_buf;
};
IDataType::DeserializeBinaryBulkStatePtr state;
name_type.type->deserializeBinaryBulkStatePrefix(settings, state);
name_type.type->deserializeBinaryBulkWithMultipleStreams(*tmp_column, index_granularity, settings, state);
size_t read_size = tmp_column->size();
column_size += read_size;
View File
@ -76,6 +76,7 @@ protected:
Block readImpl() override;
private:
size_t block_size;
NamesAndTypesList columns;
StorageLog & storage;
@ -101,6 +102,10 @@ private:
using FileStreams = std::map<std::string, Stream>;
FileStreams streams;
using DeserializeState = IDataType::DeserializeBinaryBulkStatePtr;
using DeserializeStates = std::map<String, DeserializeState>;
DeserializeStates deserialize_states;
void readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read);
};
@ -167,6 +172,12 @@ private:
WriteBufferFromFile marks_stream; /// Declared below `lock` to make the file open when rwlock is captured.
using SerializeState = IDataType::SerializeBinaryBulkStatePtr;
using SerializeStates = std::map<String, SerializeState>;
SerializeStates serialize_states;
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenStreams & written_streams);
void writeData(const String & name, const IDataType & type, const IColumn & column,
MarksForColumns & out_marks,
WrittenStreams & written_streams);
@ -225,26 +236,36 @@ Block LogBlockInputStream::readImpl()
void LogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read)
{
IDataType::InputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
IDataType::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint.
auto createStringGetter = [&](bool stream_for_prefix)
{
String stream_name = IDataType::getFileNameForStream(name, path);
return [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
{
String stream_name = IDataType::getFileNameForStream(name, path);
const auto & file_it = storage.files.find(stream_name);
if (storage.files.end() == file_it)
throw Exception("Logical error: no information about file " + stream_name + " in StorageLog", ErrorCodes::LOGICAL_ERROR);
const auto & file_it = storage.files.find(stream_name);
if (storage.files.end() == file_it)
throw Exception("Logical error: no information about file " + stream_name + " in StorageLog", ErrorCodes::LOGICAL_ERROR);
auto it = streams.try_emplace(stream_name,
file_it->second.data_file.path(),
mark_number
? file_it->second.marks[mark_number].offset
: 0,
max_read_buffer_size).first;
UInt64 offset = 0;
if (!stream_for_prefix && mark_number)
offset = file_it->second.marks[mark_number].offset;
return &it->second.compressed;
auto & data_file_path = file_it->second.data_file.path();
auto it = streams.try_emplace(stream_name, data_file_path, offset, max_read_buffer_size).first;
return &it->second.compressed;
};
};
auto state = type.createDeserializeBinaryBulkState();
type.deserializeBinaryBulkWithMultipleStreams(column, stream_getter, max_rows_to_read, 0, true, {}, state); /// TODO Use avg_value_size_hint.
if (deserialize_states.count(name) == 0)
{
settings.getter = createStringGetter(true);
type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]);
}
settings.getter = createStringGetter(false);
type.deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, settings, deserialize_states[name]);
}
@ -274,6 +295,18 @@ void LogBlockOutputStream::writeSuffix()
return;
done = true;
WrittenStreams written_streams;
IDataType::SerializeBinaryBulkSettings settings;
for (const auto & column : getHeader())
{
auto it = serialize_states.find(column.name);
if (it != serialize_states.end())
{
settings.getter = createStreamGetter(column.name, written_streams);
column.type->serializeBinaryBulkStateSuffix(settings, it->second);
}
}
/// Finish write.
marks_stream.next();
@ -291,27 +324,10 @@ void LogBlockOutputStream::writeSuffix()
}
void LogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column,
MarksForColumns & out_marks,
WrittenStreams & written_streams)
IDataType::OutputStreamGetter LogBlockOutputStream::createStreamGetter(const String & name,
WrittenStreams & written_streams)
{
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
{
String stream_name = IDataType::getFileNameForStream(name, path);
if (written_streams.count(stream_name))
return;
const auto & file = storage.files[stream_name];
const auto stream_it = streams.try_emplace(stream_name, storage.files[stream_name].data_file.path(), storage.max_compress_block_size).first;
Mark mark;
mark.rows = (file.marks.empty() ? 0 : file.marks.back().rows) + column.size();
mark.offset = stream_it->second.plain_offset + stream_it->second.plain.count();
out_marks.emplace_back(file.column_index, mark);
}, {});
IDataType::OutputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
return [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
{
String stream_name = IDataType::getFileNameForStream(name, path);
if (written_streams.count(stream_name))
@ -319,11 +335,50 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
auto it = streams.find(stream_name);
if (streams.end() == it)
throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream", ErrorCodes::LOGICAL_ERROR);
throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream",
ErrorCodes::LOGICAL_ERROR);
return &it->second.compressed;
};
}
type.serializeBinaryBulkWithMultipleStreams(column, stream_getter, 0, 0, true, {});
void LogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column,
MarksForColumns & out_marks,
WrittenStreams & written_streams)
{
IDataType::SerializeBinaryBulkSettings settings;
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
{
String stream_name = IDataType::getFileNameForStream(name, path);
if (written_streams.count(stream_name))
return;
streams.try_emplace(stream_name, storage.files[stream_name].data_file.path(), storage.max_compress_block_size);
}, settings.path);
settings.getter = createStreamGetter(name, written_streams);
if (serialize_states.count(name) == 0)
type.serializeBinaryBulkStatePrefix(settings, serialize_states[name]);
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
{
String stream_name = IDataType::getFileNameForStream(name, path);
if (written_streams.count(stream_name))
return;
const auto & file = storage.files[stream_name];
const auto stream_it = streams.find(stream_name);
Mark mark;
mark.rows = (file.marks.empty() ? 0 : file.marks.back().rows) + column.size();
mark.offset = stream_it->second.plain_offset + stream_it->second.plain.count();
out_marks.emplace_back(file.column_index, mark);
}, settings.path);
type.serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]);
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
{
@ -335,7 +390,7 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
if (streams.end() == it)
throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream", ErrorCodes::LOGICAL_ERROR);
it->second.compressed.next();
}, {});
}, settings.path);
}
@ -401,7 +456,8 @@ void StorageLog::addFiles(const String & column_name, const IDataType & type)
}
};
type.enumerateStreams(stream_callback, {});
IDataType::SubstreamPath path;
type.enumerateStreams(stream_callback, path);
}
@ -474,11 +530,12 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount() const
* If this is a data type with multiple streams, get the first stream, which we assume has the real row count.
* (Example: for the Array data type, the first stream contains array sizes, and the number of array sizes is the number of arrays.)
*/
IDataType::SubstreamPath path;
column_type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
if (filename.empty())
filename = IDataType::getFileNameForStream(column_name, substream_path);
}, {});
}, path);
Files_t::const_iterator it = files.find(filename);
if (files.end() == it)
View File
@ -95,6 +95,10 @@ private:
using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
FileStreams streams;
using DeserializeState = IDataType::DeserializeBinaryBulkStatePtr;
using DeserializeStates = std::map<String, DeserializeState>;
DeserializeStates deserialize_states;
void readData(const String & name, const IDataType & type, IColumn & column, size_t limit);
};
@ -149,8 +153,13 @@ private:
using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
FileStreams streams;
using SerializeState = IDataType::SerializeBinaryBulkStatePtr;
using SerializeStates = std::map<String, SerializeState>;
SerializeStates serialize_states;
using WrittenStreams = std::set<std::string>;
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenStreams & written_streams);
void writeData(const String & name, const IDataType & type, const IColumn & column, WrittenStreams & written_streams);
};
@ -206,7 +215,8 @@ Block TinyLogBlockInputStream::readImpl()
void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t limit)
{
IDataType::InputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
IDataType::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint.
settings.getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
{
String stream_name = IDataType::getFileNameForStream(name, path);
@ -216,14 +226,17 @@ void TinyLogBlockInputStream::readData(const String & name, const IDataType & ty
return &streams[stream_name]->compressed;
};
auto state = type.createDeserializeBinaryBulkState();
type.deserializeBinaryBulkWithMultipleStreams(column, stream_getter, limit, 0, true, {}, state); /// TODO Use avg_value_size_hint.
if (deserialize_states.count(name) == 0)
type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]);
type.deserializeBinaryBulkWithMultipleStreams(column, limit, settings, deserialize_states[name]);
}
void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, WrittenStreams & written_streams)
IDataType::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(const String & name,
WrittenStreams & written_streams)
{
IDataType::OutputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
return [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
{
String stream_name = IDataType::getFileNameForStream(name, path);
@ -231,12 +244,23 @@ void TinyLogBlockOutputStream::writeData(const String & name, const IDataType &
return nullptr;
if (!streams.count(stream_name))
streams[stream_name] = std::make_unique<Stream>(storage.files[stream_name].data_file.path(), storage.max_compress_block_size);
streams[stream_name] = std::make_unique<Stream>(storage.files[stream_name].data_file.path(),
storage.max_compress_block_size);
return &streams[stream_name]->compressed;
};
}
type.serializeBinaryBulkWithMultipleStreams(column, stream_getter, 0, 0, true, {});
void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, WrittenStreams & written_streams)
{
IDataType::SerializeBinaryBulkSettings settings;
settings.getter = createStreamGetter(name, written_streams);
if (serialize_states.count(name) == 0)
type.serializeBinaryBulkStatePrefix(settings, serialize_states[name]);
type.serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]);
}
@ -246,6 +270,18 @@ void TinyLogBlockOutputStream::writeSuffix()
return;
done = true;
WrittenStreams written_streams;
IDataType::SerializeBinaryBulkSettings settings;
for (const auto & column : getHeader())
{
auto it = serialize_states.find(column.name);
if (it != serialize_states.end())
{
settings.getter = createStreamGetter(column.name, written_streams);
column.type->serializeBinaryBulkStateSuffix(settings, it->second);
}
}
/// Finish write.
for (auto & stream : streams)
stream.second->finalize();
@ -321,7 +357,8 @@ void StorageTinyLog::addFiles(const String & column_name, const IDataType & type
}
};
type.enumerateStreams(stream_callback, {});
IDataType::SubstreamPath path;
type.enumerateStreams(stream_callback, path);
}