Make DatsTypeWithDictionary independent from index type.

This commit is contained in:
Nikolai Kochetov 2018-07-09 21:19:03 +03:00
parent c542cb6314
commit 9c763f8090
14 changed files with 1015 additions and 566 deletions

View File

@ -39,10 +39,10 @@ struct StringRefWrapper
namespace ZeroTraits
{
template <typename ColumnType>
bool check(const StringRefWrapper<ColumnType> x) { return nullptr == x.column; }
bool check(const StringRefWrapper<ColumnType> x) { return nullptr == x.column && nullptr == x.ref.data; }
template <typename ColumnType>
void set(StringRefWrapper<ColumnType> & x) { x.column = nullptr; }
void set(StringRefWrapper<ColumnType> & x) { x.column = nullptr; x.ref.data = nullptr; }
};
@ -52,7 +52,7 @@ namespace DB
template <typename ColumnType>
class ColumnUnique final : public COWPtrHelper<IColumnUnique, ColumnUnique<ColumnType>>
{
friend class COWPtrHelper<IColumnUnique, ColumnUnique<ColumnType>;
friend class COWPtrHelper<IColumnUnique, ColumnUnique<ColumnType>>;
private:
explicit ColumnUnique(MutableColumnPtr && holder, bool is_nullable);
@ -60,7 +60,9 @@ private:
ColumnUnique(const ColumnUnique & other) : column_holder(other.column_holder), is_nullable(other.is_nullable) {}
public:
ColumnPtr getNestedColumn() const override;
MutableColumnPtr cloneEmpty() const override;
const ColumnPtr & getNestedColumn() const override;
const ColumnPtr & getNestedNotNullableColumn() const override { return column_holder; }
size_t uniqueInsert(const Field & x) override;
@ -71,7 +73,6 @@ public:
size_t uniqueInsertData(const char * pos, size_t length) override;
size_t uniqueInsertDataWithTerminatingZero(const char * pos, size_t length) override;
size_t uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) override;
IColumnUnique::SerializableState getSerializableState() const override;
size_t getDefaultValueIndex() const override { return is_nullable ? 1 : 0; }
size_t getNullValueIndex() const override;
@ -119,6 +120,7 @@ public:
void forEachSubcolumn(IColumn::ColumnCallback callback) override
{
callback(column_holder);
index = nullptr;
}
private:
@ -129,6 +131,7 @@ private:
/// For DataTypeNullable, stores null map.
mutable ColumnPtr cached_null_mask;
mutable ColumnPtr cached_column_nullable;
/// Lazy initialized.
std::unique_ptr<IndexMapType> index;
@ -147,11 +150,19 @@ private:
const IColumn & src,
size_t start,
size_t length,
size_t num_added_rows,
typename ColumnVector<IndexType>::MutablePtr && positions_column,
ColumnType * overflowed_keys,
IndexMapType * secondary_index,
size_t max_dictionary_size);
};
template <typename ColumnType>
MutableColumnPtr ColumnUnique<ColumnType>::cloneEmpty() const
{
return ColumnUnique<ColumnType>::create(column_holder->cloneResized(numSpecialValues()), is_nullable);
}
template <typename ColumnType>
ColumnUnique<ColumnType>::ColumnUnique(const IDataType & type) : is_nullable(type.isNullable())
{
@ -170,7 +181,7 @@ ColumnUnique<ColumnType>::ColumnUnique(MutableColumnPtr && holder, bool is_nulla
}
template <typename ColumnType>
ColumnPtr ColumnUnique<ColumnType>::getNestedColumn() const
const ColumnPtr & ColumnUnique<ColumnType>::getNestedColumn() const
{
if (is_nullable)
{
@ -180,6 +191,7 @@ ColumnPtr ColumnUnique<ColumnType>::getNestedColumn() const
ColumnUInt8::MutablePtr null_mask = ColumnUInt8::create(size, UInt8(0));
null_mask->getData()[getNullValueIndex()] = 1;
cached_null_mask = std::move(null_mask);
cached_column_nullable = ColumnNullable::create(column_holder, cached_null_mask);
}
if (cached_null_mask->size() != size)
@ -187,9 +199,10 @@ ColumnPtr ColumnUnique<ColumnType>::getNestedColumn() const
MutableColumnPtr null_mask = (*std::move(cached_null_mask)).mutate();
static_cast<ColumnUInt8 &>(*null_mask).getData().resize_fill(size);
cached_null_mask = std::move(null_mask);
cached_column_nullable = ColumnNullable::create(column_holder, cached_null_mask);
}
return ColumnNullable::create(column_holder, cached_null_mask);
return cached_column_nullable;
}
return column_holder;
}
@ -219,7 +232,7 @@ void ColumnUnique<ColumnType>::buildIndex()
}
template <typename ColumnType>
IndexType ColumnUnique<ColumnType>::insertIntoMap(const StringRefWrapper<ColumnType> & ref, IndexType value)
UInt64 ColumnUnique<ColumnType>::insertIntoMap(const StringRefWrapper<ColumnType> & ref, UInt64 value)
{
if (!index)
buildIndex();
@ -242,7 +255,7 @@ size_t ColumnUnique<ColumnType>::uniqueInsert(const Field & x)
return getNullValueIndex();
auto column = getRawColumnPtr();
auto prev_size = static_cast<IndexType>(column->size());
auto prev_size = static_cast<UInt64>(column->size());
if ((*column)[getDefaultValueIndex()] == x)
return getDefaultValueIndex();
@ -261,6 +274,9 @@ size_t ColumnUnique<ColumnType>::uniqueInsertFrom(const IColumn & src, size_t n)
if (is_nullable && src.isNullAt(n))
return getNullValueIndex();
if (auto * nullable = typeid_cast<const ColumnNullable *>(&src))
return uniqueInsertFrom(nullable->getNestedColumn(), n);
auto ref = src.getDataAt(n);
return uniqueInsertData(ref.data, ref.size);
}
@ -336,14 +352,31 @@ size_t ColumnUnique<ColumnType>::uniqueDeserializeAndInsertFromArena(const char
return static_cast<size_t>(index_pos);
}
template <typename IndexType>
static void checkIndexes(const ColumnVector<IndexType> & indexes, size_t max_dictionary_size)
{
auto & data = indexes.getData();
for (size_t i = 0; i < data.size(); ++i)
{
if (data[i] >= max_dictionary_size)
{
throw Exception("Found index " + toString(data[i]) + " at position " + toString(i)
+ " which is grated or equal than dictionary size " + toString(max_dictionary_size),
ErrorCodes::LOGICAL_ERROR);
}
}
}
template <typename ColumnType>
template <typename IndexType>
MutableColumnPtr ColumnUnique<ColumnType>::uniqueInsertRangeImpl(
const IColumn & src,
size_t start,
size_t length,
size_t num_added_rows,
typename ColumnVector<IndexType>::MutablePtr && positions_column,
ColumnType * overflowed_keys,
IndexMapType * secondary_index,
size_t max_dictionary_size)
{
if (!index)
@ -353,9 +386,11 @@ MutableColumnPtr ColumnUnique<ColumnType>::uniqueInsertRangeImpl(
const NullMap * null_map = nullptr;
auto & positions = positions_column->getData();
using SuperiorIndexType = NumberTraits::Construct<false, false, NumberTraits::nextSize(sizeof(IndexType))>::Type;
auto updatePosition = [&](UInt64 & next_position, UInt64 num_added_rows) -> MutableColumnPtr
auto updatePosition = [&](UInt64 & next_position) -> MutableColumnPtr
{
constexpr auto next_size = NumberTraits::nextSize(sizeof(IndexType));
using SuperiorIndexType = typename NumberTraits::Construct<false, false, next_size>::Type;
++next_position;
if (next_position > std::numeric_limits<IndexType>::max())
@ -364,102 +399,108 @@ MutableColumnPtr ColumnUnique<ColumnType>::uniqueInsertRangeImpl(
throw Exception("Can't find superior index type for type " + demangle(typeid(IndexType).name()),
ErrorCodes::LOGICAL_ERROR);
auto expanded_column = ColumnVector<IndexType>::create(length);
auto expanded_column = ColumnVector<SuperiorIndexType>::create(length);
auto & expanded_data = expanded_column->getData();
for (size_t i = 0; i < num_added_rows; ++i)
expanded_data[i] = positions[i];
return uniqueInsertRangeImpl<SuperiorIndexType>(
src,
start + num_added_rows,
length - num_added_rows,
start,
length,
num_added_rows,
std::move(expanded_column),
overflowed_keys,
secondary_index,
max_dictionary_size);
}
return nullptr;
};
if (src.isColumnNullable())
if (auto nullable_column = typeid_cast<const ColumnNullable *>(&src))
{
auto nullable_column = static_cast<const ColumnNullable *>(&src);
src_column = static_cast<const ColumnType *>(&nullable_column->getNestedColumn());
src_column = typeid_cast<const ColumnType *>(&nullable_column->getNestedColumn());
null_map = &nullable_column->getNullMapData();
}
else
src_column = static_cast<const ColumnType *>(&src);
src_column = typeid_cast<const ColumnType *>(&src);
std::unique_ptr<IndexMapType> secondary_index;
if (overflowed_keys)
secondary_index = std::make_unique<IndexMapType>();
if (src_column == nullptr)
throw Exception("Invalid column type for ColumnUnique::insertRangeFrom. Expected " + column_holder->getName() +
", got " + src.getName(), ErrorCodes::ILLEGAL_COLUMN);
auto column = getRawColumnPtr();
UInt64 next_position = column->size();
for (auto i : ext::range(0, length))
if (secondary_index)
next_position += secondary_index->size();
for (; num_added_rows < length; ++num_added_rows)
{
auto row = start + i;
auto row = start + num_added_rows;
if (null_map && (*null_map)[row])
positions[i] = getNullValueIndex();
positions[num_added_rows] = getNullValueIndex();
else if (column->compareAt(getDefaultValueIndex(), row, *src_column, 1) == 0)
positions[i] = getDefaultValueIndex();
positions[num_added_rows] = getDefaultValueIndex();
else
{
auto it = index->find(StringRefWrapper<ColumnType>(src_column, row));
if (it == index->end())
{
if (overflowed_keys && next_position >= max_dictionary_size + numSpecialValues())
if (overflowed_keys && next_position >= max_dictionary_size)
{
auto jt = secondary_index->find(StringRefWrapper<ColumnType>(src_column, row));
if (jt == secondary_index->end())
{
positions[i] = next_position;
positions[num_added_rows] = next_position;
auto ref = src_column->getDataAt(row);
overflowed_keys->insertData(ref.data, ref.size);
(*secondary_index)[StringRefWrapper<ColumnType>(src_column, row)] = next_position;
if (auto res = updatePosition(next_position, i))
if (auto res = updatePosition(next_position))
return res;
}
else
positions[i] = jt->second;
positions[num_added_rows] = jt->second;
}
else
{
positions[i] = next_position;
positions[num_added_rows] = next_position;
auto ref = src_column->getDataAt(row);
column->insertData(ref.data, ref.size);
(*index)[StringRefWrapper<ColumnType>(column, next_position)] = next_position;
if (auto res = updatePosition(next_position, i))
if (auto res = updatePosition(next_position))
return res;
}
}
else
positions[i] = it->second;
positions[num_added_rows] = it->second;
}
}
return positions_column;
checkIndexes(*positions_column, column->size() + (overflowed_keys ? overflowed_keys->size() : 0));
return std::move(positions_column);
}
template <typename ColumnType>
MutableColumnPtr ColumnUnique<ColumnType>::uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length)
{
size_t size = getRawColumnPtr()->size();
auto callForType = [&](auto x)
auto callForType = [this, &src, start, length](auto x) -> MutableColumnPtr
{
size_t size = getRawColumnPtr()->size();
using IndexType = decltype(x);
if (size <= std::numeric_limits<IndexType>::max())
{
auto positions_column = ColumnVector<IndexType>::create(length);
auto & positions = positions_column->getData();
return uniqueInsertRangeImpl(src, start, length, positions, nullptr, 0);
auto positions = ColumnVector<IndexType>::create(length);
return this->uniqueInsertRangeImpl<IndexType>(src, start, length, 0,
std::move(positions), nullptr, nullptr, 0);
}
return nullptr;
@ -488,21 +529,22 @@ IColumnUnique::IndexesWithOverflow ColumnUnique<ColumnType>::uniqueInsertRangeWi
size_t max_dictionary_size)
{
size_t size = getRawColumnPtr()->size();
auto overflowed_keys = column_holder->cloneEmpty();
auto overflowed_keys_ptr = typeid_cast<ColumnType *>(overflowed_keys.get());
if (!overflowed_keys_ptr)
throw Exception("Invalid keys type for ColumnUnique.", ErrorCodes::LOGICAL_ERROR);
auto callForType = [&](auto x)
auto callForType = [this, &src, start, length, overflowed_keys_ptr, max_dictionary_size](auto x) -> MutableColumnPtr
{
size_t size = getRawColumnPtr()->size();
using IndexType = decltype(x);
if (size <= std::numeric_limits<IndexType>::max())
{
auto positions_column = ColumnVector<IndexType>::create(length);
auto & positions = positions_column->getData();
return uniqueInsertRangeImpl(src, start, length, positions, overflowed_keys_ptr, max_dictionary_size);
auto positions = ColumnVector<IndexType>::create(length);
IndexMapType secondary_index;
return this->uniqueInsertRangeImpl<IndexType>(src, start, length, 0, std::move(positions),
overflowed_keys_ptr, &secondary_index, max_dictionary_size);
}
return nullptr;
@ -526,15 +568,4 @@ IColumnUnique::IndexesWithOverflow ColumnUnique<ColumnType>::uniqueInsertRangeWi
return indexes_with_overflow;
}
template <typename ColumnType>
IColumnUnique::SerializableState ColumnUnique<ColumnType>::getSerializableState() const
{
IColumnUnique::SerializableState state;
state.column = column_holder;
state.offset = numSpecialValues();
state.limit = column_holder->size() - state.offset;
return state;
}
};

View File

@ -1,21 +1,192 @@
#include <Columns/ColumnWithDictionary.h>
#include <Columns/ColumnsNumber.h>
#include <DataStreams/ColumnGathererStream.h>
#include <DataTypes/NumberTraits.h>
#include <Common/HashTable/HashMap.h>
namespace DB
{
ColumnWithDictionary::ColumnWithDictionary(MutableColumnPtr && column_unique_, MutableColumnPtr && indexes_)
: column_unique(std::move(column_unique_)), indexes(std::move(indexes_))
namespace
{
if (!dynamic_cast<const IColumnUnique *>(column_unique.get()))
throw Exception("ColumnUnique expected as argument of ColumnWithDictionary.", ErrorCodes::ILLEGAL_COLUMN);
template <typename T>
PaddedPODArray<T> * getIndexesData(IColumn & indexes)
{
auto * column = typeid_cast<ColumnVector<T> *>(&indexes);
if (column)
return &column->getData();
getSizeOfCurrentIndexType();
return nullptr;
}
template <typename T>
MutableColumnPtr mapUniqueIndexImpl(PaddedPODArray<T> & index)
{
PaddedPODArray<T> copy(index.cbegin(), index.cend());
HashMap<T, T> hash_map;
for (auto val : index)
hash_map.insert({val, hash_map.size()});
auto res_col = ColumnVector<T>::create();
auto & data = res_col->getData();
data.resize(hash_map.size());
for (auto val : hash_map)
data[val.second] = val.first;
for (auto & ind : index)
ind = hash_map[ind];
for (size_t i = 0; i < index.size(); ++i)
if (data[index[i]] != copy[i])
throw Exception("Expected " + toString(data[index[i]]) + ", but got " + toString(copy[i]), ErrorCodes::LOGICAL_ERROR);
return std::move(res_col);
}
/// Returns unique values of column. Write new index to column.
MutableColumnPtr mapUniqueIndex(IColumn & column)
{
if (auto * data_uint8 = getIndexesData<UInt8>(column))
return mapUniqueIndexImpl(*data_uint8);
else if (auto * data_uint16 = getIndexesData<UInt16>(column))
return mapUniqueIndexImpl(*data_uint16);
else if (auto * data_uint32 = getIndexesData<UInt32>(column))
return mapUniqueIndexImpl(*data_uint32);
else if (auto * data_uint64 = getIndexesData<UInt64>(column))
return mapUniqueIndexImpl(*data_uint64);
else
throw Exception("Indexes column for getUniqueIndex must be ColumnUInt, got" + column.getName(),
ErrorCodes::LOGICAL_ERROR);
}
}
ColumnWithDictionary::ColumnWithDictionary(const ColumnWithDictionary & other)
: column_unique(other.column_unique), indexes(other.indexes)
ColumnWithDictionary::ColumnWithDictionary(MutableColumnPtr && column_unique_, MutableColumnPtr && indexes_)
: dictionary(std::move(column_unique_)), idx(std::move(indexes_))
{
idx.check(getDictionary().size());
}
void ColumnWithDictionary::insert(const Field & x)
{
compactIfSharedDictionary();
idx.insertPosition(dictionary.getColumnUnique().uniqueInsert(x));
idx.check(getDictionary().size());
}
void ColumnWithDictionary::insertDefault()
{
idx.insertPosition(getDictionary().getDefaultValueIndex());
}
void ColumnWithDictionary::insertFrom(const IColumn & src, size_t n)
{
auto * src_with_dict = typeid_cast<const ColumnWithDictionary *>(&src);
if (!src_with_dict)
throw Exception("Expected ColumnWithDictionary, got" + src.getName(), ErrorCodes::ILLEGAL_COLUMN);
size_t position = src_with_dict->getIndexes().getUInt(n);
if (&src_with_dict->getDictionary() == &getDictionary())
{
/// Dictionary is shared with src column. Insert only index.
idx.insertPosition(position);
}
else
{
compactIfSharedDictionary();
const auto & nested = *src_with_dict->getDictionary().getNestedColumn();
idx.insertPosition(dictionary.getColumnUnique().uniqueInsertFrom(nested, position));
}
idx.check(getDictionary().size());
}
void ColumnWithDictionary::insertFromFullColumn(const IColumn & src, size_t n)
{
compactIfSharedDictionary();
idx.insertPosition(dictionary.getColumnUnique().uniqueInsertFrom(src, n));
idx.check(getDictionary().size());
}
void ColumnWithDictionary::insertRangeFrom(const IColumn & src, size_t start, size_t length)
{
auto * src_with_dict = typeid_cast<const ColumnWithDictionary *>(&src);
if (!src_with_dict)
throw Exception("Expected ColumnWithDictionary, got" + src.getName(), ErrorCodes::ILLEGAL_COLUMN);
if (&src_with_dict->getDictionary() == &getDictionary())
{
/// Dictionary is shared with src column. Insert only indexes.
idx.insertPositionsRange(src_with_dict->getIndexes(), start, length);
}
else
{
compactIfSharedDictionary();
/// TODO: Support native insertion from other unique column. It will help to avoid null map creation.
auto sub_idx = (*src_with_dict->getIndexes().cut(start, length)).mutate();
auto idx_map = mapUniqueIndex(*sub_idx);
auto src_nested = src_with_dict->getDictionary().getNestedColumn();
auto used_keys = src_nested->index(*idx_map, 0);
auto inserted_indexes = dictionary.getColumnUnique().uniqueInsertRangeFrom(*used_keys, 0, used_keys->size());
idx.insertPositionsRange(*inserted_indexes->index(*sub_idx, 0), 0, length);
}
idx.check(getDictionary().size());
}
void ColumnWithDictionary::insertRangeFromFullColumn(const IColumn & src, size_t start, size_t length)
{
compactIfSharedDictionary();
auto inserted_indexes = dictionary.getColumnUnique().uniqueInsertRangeFrom(src, start, length);
idx.insertPositionsRange(*inserted_indexes, 0, length);
idx.check(getDictionary().size());
}
void ColumnWithDictionary::insertRangeFromDictionaryEncodedColumn(const IColumn & keys, const IColumn & positions)
{
Index(positions.getPtr()).check(keys.size());
compactIfSharedDictionary();
auto inserted_indexes = dictionary.getColumnUnique().uniqueInsertRangeFrom(keys, 0, keys.size());
idx.insertPositionsRange(*inserted_indexes->index(positions, 0), 0, positions.size());
idx.check(getDictionary().size());
}
void ColumnWithDictionary::insertData(const char * pos, size_t length)
{
compactIfSharedDictionary();
idx.insertPosition(dictionary.getColumnUnique().uniqueInsertData(pos, length));
idx.check(getDictionary().size());
}
void ColumnWithDictionary::insertDataWithTerminatingZero(const char * pos, size_t length)
{
compactIfSharedDictionary();
idx.insertPosition(dictionary.getColumnUnique().uniqueInsertDataWithTerminatingZero(pos, length));
idx.check(getDictionary().size());
}
StringRef ColumnWithDictionary::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
return getDictionary().serializeValueIntoArena(getIndexes().getUInt(n), arena, begin);
}
const char * ColumnWithDictionary::deserializeAndInsertFromArena(const char * pos)
{
compactIfSharedDictionary();
const char * new_pos;
idx.insertPosition(dictionary.getColumnUnique().uniqueDeserializeAndInsertFromArena(pos, new_pos));
idx.check(getDictionary().size());
return new_pos;
}
void ColumnWithDictionary::gather(ColumnGathererStream & gatherer)
@ -25,88 +196,364 @@ void ColumnWithDictionary::gather(ColumnGathererStream & gatherer)
MutableColumnPtr ColumnWithDictionary::cloneResized(size_t size) const
{
auto unique_ptr = column_unique;
return ColumnWithDictionary::create((*std::move(unique_ptr)).mutate(), indexes->cloneResized(size));
auto unique_ptr = dictionary.getColumnUniquePtr();
return ColumnWithDictionary::create((*std::move(unique_ptr)).mutate(), getIndexes().cloneResized(size));
}
size_t ColumnWithDictionary::getSizeOfCurrentIndexType() const
int ColumnWithDictionary::compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const
{
if (typeid_cast<const ColumnUInt8 *>(indexes.get()))
return sizeof(UInt8);
if (typeid_cast<const ColumnUInt16 *>(indexes.get()))
return sizeof(UInt16);
if (typeid_cast<const ColumnUInt32 *>(indexes.get()))
return sizeof(UInt32);
if (typeid_cast<const ColumnUInt64 *>(indexes.get()))
return sizeof(UInt64);
const auto & column_with_dictionary = static_cast<const ColumnWithDictionary &>(rhs);
size_t n_index = getIndexes().getUInt(n);
size_t m_index = column_with_dictionary.getIndexes().getUInt(m);
return getDictionary().compareAt(n_index, m_index, column_with_dictionary.getDictionary(), nan_direction_hint);
}
throw Exception("Unexpected indexes type for ColumnWithDictionary. Expected ColumnUInt, got " + indexes->getName(),
void ColumnWithDictionary::getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const
{
if (limit == 0)
limit = size();
size_t unique_limit = std::min(limit, getDictionary().size());
Permutation unique_perm;
getDictionary().getNestedColumn()->getPermutation(reverse, unique_limit, nan_direction_hint, unique_perm);
/// TODO: optimize with sse.
/// Get indexes per row in column_unique.
std::vector<std::vector<size_t>> indexes_per_row(getDictionary().size());
size_t indexes_size = getIndexes().size();
for (size_t row = 0; row < indexes_size; ++row)
indexes_per_row[getIndexes().getUInt(row)].push_back(row);
/// Replicate permutation.
size_t perm_size = std::min(indexes_size, limit);
res.resize(perm_size);
size_t perm_index = 0;
for (size_t row = 0; row < indexes_size && perm_index < perm_size; ++row)
{
const auto & row_indexes = indexes_per_row[unique_perm[row]];
for (auto row_index : row_indexes)
{
res[perm_index] = row_index;
++perm_index;
if (perm_index == perm_size)
break;
}
}
}
std::vector<MutableColumnPtr> ColumnWithDictionary::scatter(ColumnIndex num_columns, const Selector & selector) const
{
auto columns = getIndexes().scatter(num_columns, selector);
for (auto & column : columns)
{
auto unique_ptr = dictionary.getColumnUniquePtr();
column = ColumnWithDictionary::create((*std::move(unique_ptr)).mutate(), std::move(column));
}
return columns;
}
void ColumnWithDictionary::setSharedDictionary(const ColumnPtr & column_unique)
{
if (!empty())
throw Exception("Can't set ColumnUnique for ColumnWithDictionary because is't not empty.",
ErrorCodes::LOGICAL_ERROR);
dictionary.setShared(column_unique);
}
ColumnWithDictionary::MutablePtr ColumnWithDictionary::compact()
{
auto positions = idx.getPositions();
/// Create column with new indexes and old dictionary.
auto column = ColumnWithDictionary::create(getDictionary().assumeMutable(), (*std::move(positions)).mutate());
/// Will create new dictionary.
column->compactInplace();
return column;
}
ColumnWithDictionary::MutablePtr ColumnWithDictionary::cutAndCompact(size_t start, size_t length) const
{
auto sub_positions = (*idx.getPositions()->cut(start, length)).mutate();
/// Create column with new indexes and old dictionary.
auto column = ColumnWithDictionary::create(getDictionary().assumeMutable(), std::move(sub_positions));
/// Will create new dictionary.
column->compactInplace();
return column;
}
void ColumnWithDictionary::compactInplace()
{
auto positions = idx.detachPositions();
dictionary.compact(positions);
idx.attachPositions(std::move(positions));
}
void ColumnWithDictionary::compactIfSharedDictionary()
{
if (dictionary.isShared())
compactInplace();
}
ColumnWithDictionary::Index::Index() : positions(ColumnUInt8::create()), size_of_type(sizeof(UInt8)) {}
ColumnWithDictionary::Index::Index(MutableColumnPtr && positions) : positions(std::move(positions))
{
updateSizeOfType();
}
ColumnWithDictionary::Index::Index(ColumnPtr positions) : positions(std::move(positions))
{
updateSizeOfType();
}
template <typename Callback>
void ColumnWithDictionary::Index::callForType(Callback && callback, size_t size_of_type)
{
switch (size_of_type)
{
case sizeof(UInt8): { callback(UInt8()); break; }
case sizeof(UInt16): { callback(UInt16()); break; }
case sizeof(UInt32): { callback(UInt32()); break; }
case sizeof(UInt64): { callback(UInt64()); break; }
default: {
throw Exception("Unexpected size of index type for ColumnWithDictionary: " + toString(size_of_type),
ErrorCodes::LOGICAL_ERROR);
}
}
}
size_t ColumnWithDictionary::Index::getSizeOfIndexType(const IColumn & column, size_t hint)
{
auto checkFor = [&](auto type) { return typeid_cast<const ColumnVector<decltype(type)> *>(&column) != nullptr; };
auto tryGetSizeFor = [&](auto type) -> size_t { return checkFor(type) ? sizeof(decltype(type)) : 0; };
if (hint)
{
size_t size = 0;
callForType([&](auto type) { size = tryGetSizeFor(type); }, hint);
if (size)
return size;
}
if (auto size = tryGetSizeFor(UInt8()))
return size;
if (auto size = tryGetSizeFor(UInt16()))
return size;
if (auto size = tryGetSizeFor(UInt32()))
return size;
if (auto size = tryGetSizeFor(UInt64()))
return size;
throw Exception("Unexpected indexes type for ColumnWithDictionary. Expected UInt, got " + column.getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
void ColumnWithDictionary::Index::attachPositions(ColumnPtr positions_)
{
positions = std::move(positions_);
updateSizeOfType();
}
template <typename IndexType>
void ColumnWithDictionary::convertIndexes()
typename ColumnVector<IndexType>::Container & ColumnWithDictionary::Index::getPositionsData()
{
auto * positions_ptr = typeid_cast<ColumnVector<IndexType> *>(positions->assumeMutable().get());
if (!positions_ptr)
throw Exception("Invalid indexes type for ColumnWithDictionary."
" Expected UInt" + toString(8 * sizeof(IndexType)) + ", got " + positions->getName(),
ErrorCodes::LOGICAL_ERROR);
return positions_ptr->getData();
}
template <typename IndexType>
void ColumnWithDictionary::Index::convertPositions()
{
auto convert = [&](auto x)
{
using CurIndexType = typeof(x);
if (auto * index_col = typeid_cast<const ColumnVector<CurIndexType> *>(indexes.get()))
using CurIndexType = decltype(x);
auto & data = getPositionsData<CurIndexType>();
if (sizeof(CurIndexType) > sizeof(IndexType))
throw Exception("Converting indexes to smaller type: from " + toString(sizeof(CurIndexType)) +
" to " + toString(sizeof(IndexType)), ErrorCodes::LOGICAL_ERROR);
if (sizeof(CurIndexType) != sizeof(IndexType))
{
if (sizeof(CurIndexType) != sizeof(IndexType))
{
size_t size = index_col->size();
auto new_index_col = ColumnVector<IndexType>::create(size);
auto & data = index_col->getData();
auto & new_data = new_index_col->getData();
size_t size = data.size();
auto new_positions = ColumnVector<IndexType>::create(size);
auto & new_data = new_positions->getData();
for (size_t i = 0; i < size; ++i)
new_data[i] = data[i];
/// TODO: Optimize with SSE?
for (size_t i = 0; i < size; ++i)
new_data[i] = data[i];
indexes = std::move(new_index_col);
}
return true;
positions = std::move(new_positions);
size_of_type = sizeof(IndexType);
}
return false;
};
if (!convert(UInt8()) &&
!convert(UInt16()) &&
!convert(UInt32()) &&
!convert(UInt64()))
throw Exception("Unexpected indexes type for ColumnWithDictionary. Expected ColumnUInt, got "
+ indexes->getName(), ErrorCodes::ILLEGAL_COLUMN);
callForType(std::move(convert), size_of_type);
checkSizeOfType();
}
void ColumnWithDictionary::insertIndex(size_t value)
void ColumnWithDictionary::Index::expandType()
{
auto current_index_type = getSizeOfCurrentIndexType();
auto insertForType = [&](auto x)
auto expand = [&](auto type)
{
using IndexType = typeof(x);
if (value <= std::numeric_limits<IndexType>::max())
using CurIndexType = decltype(type);
constexpr auto next_size = NumberTraits::nextSize(sizeof(CurIndexType));
if (next_size == sizeof(CurIndexType))
throw Exception("Can't expand indexes type for ColumnWithDictionary from type: "
+ demangle(typeid(CurIndexType).name()), ErrorCodes::LOGICAL_ERROR);
using NewIndexType = typename NumberTraits::Construct<false, false, next_size>::Type;
convertPositions<NewIndexType>();
};
callForType(std::move(expand), size_of_type);
}
UInt64 ColumnWithDictionary::Index::getMaxPositionForCurrentType() const
{
UInt64 value = 0;
callForType([&](auto type) { value = std::numeric_limits<decltype(type)>::max(); }, size_of_type);
return value;
}
void ColumnWithDictionary::Index::insertPosition(UInt64 position)
{
while (position > getMaxPositionForCurrentType())
expandType();
positions->assumeMutableRef().insert(UInt64(position));
checkSizeOfType();
}
void ColumnWithDictionary::Index::insertPositionsRange(const IColumn & column, size_t offset, size_t limit)
{
auto insertForType = [&](auto type)
{
using ColumnType = decltype(type);
const auto * column_ptr = typeid_cast<const ColumnVector<ColumnType> *>(&column);
if (!column_ptr)
return false;
if (size_of_type < sizeof(ColumnType))
convertPositions<ColumnType>();
if (size_of_type == sizeof(ColumnType))
positions->assumeMutableRef().insertRangeFrom(column, offset, limit);
else
{
if (sizeof(IndexType) > current_index_type)
convertIndexes<IndexType>();
auto copy = [&](auto cur_type)
{
using CurIndexType = decltype(cur_type);
auto & positions_data = getPositionsData<CurIndexType>();
const auto & column_data = column_ptr->getData();
getIndexes()->insert(UInt64(value));
size_t size = positions_data.size();
positions_data.resize(size + limit);
return true;
for (size_t i = 0; i < limit; ++i)
positions_data[size + i] = column_data[offset + i];
};
callForType(std::move(copy), size_of_type);
}
return false;
return true;
};
if (!insertForType(UInt8()) &&
!insertForType(UInt16()) &&
!insertForType(UInt32()) &&
!insertForType(UInt64()))
throw Exception("Unexpected indexes type for ColumnWithDictionary.", ErrorCodes::ILLEGAL_COLUMN);
throw Exception("Invalid column for ColumnWithDictionary index. Expected UInt, got " + column.getName(),
ErrorCodes::ILLEGAL_COLUMN);
checkSizeOfType();
}
void ColumnWithDictionary::insertIndexesRange(const ColumnPtr & column)
void ColumnWithDictionary::Index::check(size_t max_dictionary_size)
{
auto check = [&](auto cur_type)
{
using CurIndexType = decltype(cur_type);
auto & positions_data = getPositionsData<CurIndexType>();
for (size_t i = 0; i < positions_data.size(); ++i)
{
if (positions_data[i] >= max_dictionary_size)
{
throw Exception("Found index " + toString(positions_data[i]) + " at position " + toString(i)
+ " which is grated or equal than dictionary size " + toString(max_dictionary_size),
ErrorCodes::LOGICAL_ERROR);
}
}
};
callForType(std::move(check), size_of_type);
}
void ColumnWithDictionary::Index::checkSizeOfType()
{
if (size_of_type != getSizeOfIndexType(*positions, size_of_type))
throw Exception("Invalid size of type. Expected " + toString(8 * size_of_type) +
", but positions are " + positions->getName(), ErrorCodes::LOGICAL_ERROR);
}
ColumnWithDictionary::Dictionary::Dictionary(MutableColumnPtr && column_unique_)
: column_unique(std::move(column_unique_))
{
checkColumn(*column_unique);
}
ColumnWithDictionary::Dictionary::Dictionary(ColumnPtr column_unique_)
: column_unique(std::move(column_unique_))
{
checkColumn(*column_unique);
}
void ColumnWithDictionary::Dictionary::checkColumn(const IColumn & column)
{
if (!dynamic_cast<const IColumnUnique *>(&column))
throw Exception("ColumnUnique expected as an argument of ColumnWithDictionary.", ErrorCodes::ILLEGAL_COLUMN);
}
void ColumnWithDictionary::Dictionary::setShared(const ColumnPtr & dictionary)
{
checkColumn(*dictionary);
column_unique = dictionary;
shared = true;
}
void ColumnWithDictionary::Dictionary::compact(ColumnPtr & positions)
{
auto new_column_unique = column_unique->cloneEmpty();
auto & unique = getColumnUnique();
auto & new_unique = static_cast<IColumnUnique &>(*new_column_unique);
auto indexes = mapUniqueIndex(positions->assumeMutableRef());
auto sub_keys = unique.getNestedColumn()->index(*indexes, 0);
auto new_indexes = new_unique.uniqueInsertRangeFrom(*sub_keys, 0, sub_keys->size());
positions = (*new_indexes->index(*positions, 0)).mutate();
column_unique = std::move(new_column_unique);
shared = false;
}
}

View File

@ -18,7 +18,7 @@ class ColumnWithDictionary final : public COWPtrHelper<IColumn, ColumnWithDictio
friend class COWPtrHelper<IColumn, ColumnWithDictionary>;
ColumnWithDictionary(MutableColumnPtr && column_unique, MutableColumnPtr && indexes);
ColumnWithDictionary(const ColumnWithDictionary & other);
ColumnWithDictionary(const ColumnWithDictionary & other) = default;
public:
/** Create immutable column using immutable arguments. This arguments may be shared with other columns.
@ -37,216 +37,203 @@ public:
std::string getName() const override { return "ColumnWithDictionary"; }
const char * getFamilyName() const override { return "ColumnWithDictionary"; }
ColumnPtr convertToFullColumn() const { return getUnique()->getNestedColumn()->index(*indexes, 0); }
ColumnPtr convertToFullColumn() const { return getDictionary().getNestedColumn()->index(getIndexes(), 0); }
ColumnPtr convertToFullColumnIfWithDictionary() const override { return convertToFullColumn(); }
MutableColumnPtr cloneResized(size_t size) const override;
size_t size() const override { return indexes->size(); }
size_t size() const override { return getIndexes().size(); }
Field operator[](size_t n) const override { return (*column_unique)[indexes->getUInt(n)]; }
void get(size_t n, Field & res) const override { column_unique->get(indexes->getUInt(n), res); }
Field operator[](size_t n) const override { return getDictionary()[getIndexes().getUInt(n)]; }
void get(size_t n, Field & res) const override { getDictionary().get(getIndexes().getUInt(n), res); }
StringRef getDataAt(size_t n) const override { return column_unique->getDataAt(indexes->getUInt(n)); }
StringRef getDataAt(size_t n) const override { return getDictionary().getDataAt(getIndexes().getUInt(n)); }
StringRef getDataAtWithTerminatingZero(size_t n) const override
{
return column_unique->getDataAtWithTerminatingZero(indexes->getUInt(n));
return getDictionary().getDataAtWithTerminatingZero(getIndexes().getUInt(n));
}
UInt64 get64(size_t n) const override { return column_unique->get64(indexes->getUInt(n)); }
UInt64 getUInt(size_t n) const override { return column_unique->getUInt(indexes->getUInt(n)); }
Int64 getInt(size_t n) const override { return column_unique->getInt(indexes->getUInt(n)); }
bool isNullAt(size_t n) const override { return column_unique->isNullAt(indexes->getUInt(n)); }
UInt64 get64(size_t n) const override { return getDictionary().get64(getIndexes().getUInt(n)); }
UInt64 getUInt(size_t n) const override { return getDictionary().getUInt(getIndexes().getUInt(n)); }
Int64 getInt(size_t n) const override { return getDictionary().getInt(getIndexes().getUInt(n)); }
bool isNullAt(size_t n) const override { return getDictionary().isNullAt(getIndexes().getUInt(n)); }
ColumnPtr cut(size_t start, size_t length) const override
{
return ColumnWithDictionary::create(column_unique, indexes->cut(start, length));
return ColumnWithDictionary::create(dictionary.getColumnUniquePtr(), getIndexes().cut(start, length));
}
void insert(const Field & x) override { getIndexes()->insert(Field(UInt64(getUnique()->uniqueInsert(x)))); }
void insert(const Field & x) override;
void insertDefault() override;
void insertFromFullColumn(const IColumn & src, size_t n)
{
getIndexes()->insert(getUnique()->uniqueInsertFrom(src, n));
}
void insertFrom(const IColumn & src, size_t n) override
{
if (!typeid_cast<const ColumnWithDictionary *>(&src))
throw Exception("Expected ColumnWithDictionary, got" + src.getName(), ErrorCodes::ILLEGAL_COLUMN);
void insertFrom(const IColumn & src, size_t n) override;
void insertFromFullColumn(const IColumn & src, size_t n);
auto & src_with_dict = static_cast<const ColumnWithDictionary &>(src);
size_t idx = src_with_dict.getIndexes()->getUInt(n);
insertFromFullColumn(*src_with_dict.getUnique()->getNestedColumn(), idx);
}
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
void insertRangeFromFullColumn(const IColumn & src, size_t start, size_t length);
void insertRangeFromDictionaryEncodedColumn(const IColumn & keys, const IColumn & positions);
void insertRangeFromFullColumn(const IColumn & src, size_t start, size_t length)
{
auto inserted_indexes = getUnique()->uniqueInsertRangeFrom(src, start, length);
getIndexes()->insertRangeFrom(*inserted_indexes, 0, length);
}
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override
{
if (!typeid_cast<const ColumnWithDictionary *>(&src))
throw Exception("Expected ColumnWithDictionary, got" + src.getName(), ErrorCodes::ILLEGAL_COLUMN);
void insertData(const char * pos, size_t length) override;
void insertDataWithTerminatingZero(const char * pos, size_t length) override;
auto & src_with_dict = static_cast<const ColumnWithDictionary &>(src);
/// TODO: Support native insertion from other unique column. It will help to avoid null map creation.
auto src_nested = src_with_dict.getUnique()->getNestedColumn();
auto inserted_idx = getUnique()->uniqueInsertRangeFrom(*src_nested, 0, src_nested->size());
auto idx = inserted_idx->index(*src_with_dict.getIndexes()->cut(start, length), 0);
getIndexes()->insertRangeFrom(*idx, 0, length);
}
void insertData(const char * pos, size_t length) override
{
getIndexes()->insert(Field(UInt64(getUnique()->uniqueInsertData(pos, length))));
}
void popBack(size_t n) override { idx.popBack(n); }
void insertDataWithTerminatingZero(const char * pos, size_t length) override
{
getIndexes()->insert(Field(UInt64(getUnique()->uniqueInsertDataWithTerminatingZero(pos, length))));
}
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
void insertDefault() override
{
getIndexes()->insert(getUnique()->getDefaultValueIndex());
}
void popBack(size_t n) override { getIndexes()->popBack(n); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override
{
return getUnique()->serializeValueIntoArena(indexes->getUInt(n), arena, begin);
}
const char * deserializeAndInsertFromArena(const char * pos) override
{
const char * new_pos;
getIndexes()->insert(getUnique()->uniqueDeserializeAndInsertFromArena(pos, new_pos));
return new_pos;
}
const char * deserializeAndInsertFromArena(const char * pos) override;
void updateHashWithValue(size_t n, SipHash & hash) const override
{
return getUnique()->updateHashWithValue(indexes->getUInt(n), hash);
return getDictionary().updateHashWithValue(getIndexes().getUInt(n), hash);
}
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override
{
return ColumnWithDictionary::create(column_unique, indexes->filter(filt, result_size_hint));
return ColumnWithDictionary::create(dictionary.getColumnUniquePtr(), getIndexes().filter(filt, result_size_hint));
}
ColumnPtr permute(const Permutation & perm, size_t limit) const override
{
return ColumnWithDictionary::create(column_unique, indexes->permute(perm, limit));
return ColumnWithDictionary::create(dictionary.getColumnUniquePtr(), getIndexes().permute(perm, limit));
}
ColumnPtr index(const IColumn & indexes_, size_t limit) const override
{
return ColumnWithDictionary::create(column_unique, indexes->index(indexes_, limit));
return ColumnWithDictionary::create(dictionary.getColumnUniquePtr(), getIndexes().index(indexes_, limit));
}
int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override
{
const auto & column_with_dictionary = static_cast<const ColumnWithDictionary &>(rhs);
size_t n_index = indexes->getUInt(n);
size_t m_index = column_with_dictionary.indexes->getUInt(m);
return getUnique()->compareAt(n_index, m_index, *column_with_dictionary.column_unique, nan_direction_hint);
}
int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override;
void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override
{
if (limit == 0)
limit = size();
size_t unique_limit = std::min(limit, getUnique()->size());
Permutation unique_perm;
getUnique()->getNestedColumn()->getPermutation(reverse, unique_limit, nan_direction_hint, unique_perm);
/// TODO: optimize with sse.
/// Get indexes per row in column_unique.
std::vector<std::vector<size_t>> indexes_per_row(getUnique()->size());
size_t indexes_size = indexes->size();
for (size_t row = 0; row < indexes_size; ++row)
indexes_per_row[indexes->getUInt(row)].push_back(row);
/// Replicate permutation.
size_t perm_size = std::min(indexes_size, limit);
res.resize(perm_size);
size_t perm_index = 0;
for (size_t row = 0; row < indexes_size && perm_index < perm_size; ++row)
{
const auto & row_indexes = indexes_per_row[unique_perm[row]];
for (auto row_index : row_indexes)
{
res[perm_index] = row_index;
++perm_index;
if (perm_index == perm_size)
break;
}
}
}
void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
ColumnPtr replicate(const Offsets & offsets) const override
{
return ColumnWithDictionary::create(column_unique, indexes->replicate(offsets));
return ColumnWithDictionary::create(dictionary.getColumnUniquePtr(), getIndexes().replicate(offsets));
}
std::vector<MutableColumnPtr> scatter(ColumnIndex num_columns, const Selector & selector) const override
{
auto columns = indexes->scatter(num_columns, selector);
for (auto & column : columns)
{
auto unique_ptr = column_unique;
column = ColumnWithDictionary::create((*std::move(unique_ptr)).mutate(), std::move(column));
}
return columns;
}
std::vector<MutableColumnPtr> scatter(ColumnIndex num_columns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer_stream) override ;
void getExtremes(Field & min, Field & max) const override { return column_unique->getExtremes(min, max); }
void getExtremes(Field & min, Field & max) const override {
return getDictionary().index(getIndexes(), 0)->getExtremes(min, max); /// TODO: optimize
}
void reserve(size_t n) override { getIndexes()->reserve(n); }
void reserve(size_t n) override { idx.reserve(n); }
size_t byteSize() const override { return indexes->byteSize() + column_unique->byteSize(); }
size_t allocatedBytes() const override { return indexes->allocatedBytes() + column_unique->allocatedBytes(); }
size_t byteSize() const override { return idx.getPositions()->byteSize() + getDictionary().byteSize(); }
size_t allocatedBytes() const override { return idx.getPositions()->allocatedBytes() + getDictionary().allocatedBytes(); }
void forEachSubcolumn(ColumnCallback callback) override
{
callback(column_unique);
callback(indexes);
callback(idx.getPositionsPtr());
/// Column doesn't own dictionary if it's shared.
if (!dictionary.isShared())
callback(dictionary.getColumnUniquePtr());
}
bool valuesHaveFixedSize() const override { return column_unique->valuesHaveFixedSize(); }
bool isFixedAndContiguous() const override { return column_unique->isFixedAndContiguous(); }
size_t sizeOfValueIfFixed() const override { return column_unique->sizeOfValueIfFixed(); }
bool isNumeric() const override { return column_unique->isNumeric(); }
bool valuesHaveFixedSize() const override { return getDictionary().valuesHaveFixedSize(); }
bool isFixedAndContiguous() const override { return getDictionary().isFixedAndContiguous(); }
size_t sizeOfValueIfFixed() const override { return getDictionary().sizeOfValueIfFixed(); }
bool isNumeric() const override { return getDictionary().isNumeric(); }
IColumnUnique * getUnique() { return static_cast<IColumnUnique *>(column_unique->assumeMutable().get()); }
const IColumnUnique * getUnique() const { return static_cast<const IColumnUnique *>(column_unique->assumeMutable().get()); }
ColumnPtr getUniquePtr() const { return column_unique; }
const IColumnUnique & getDictionary() const { return dictionary.getColumnUnique(); }
/// IColumnUnique & getUnique() { return static_cast<IColumnUnique &>(*column_unique->assumeMutable()); }
/// ColumnPtr getUniquePtr() const { return column_unique; }
IColumn * getIndexes() { return indexes->assumeMutable().get(); }
const IColumn * getIndexes() const { return indexes.get(); }
const ColumnPtr & getIndexesPtr() const { return indexes; }
/// IColumn & getIndexes() { return idx.getPositions()->assumeMutableRef(); }
const IColumn & getIndexes() const { return *idx.getPositions(); }
const ColumnPtr & getIndexesPtr() const { return idx.getPositions(); }
void setIndexes(MutableColumnPtr && indexes_) { indexes = std::move(indexes_); }
void setUnique(const ColumnPtr & unique) { column_unique = unique; }
///void setIndexes(MutableColumnPtr && indexes_) { indexes = std::move(indexes_); }
/// Set shared ColumnUnique for empty column with dictionary.
void setSharedDictionary(const ColumnPtr & column_unique);
/// Create column new dictionary with only keys that are mentioned in index.
MutablePtr compact();
/// Cut + compact.
MutablePtr cutAndCompact(size_t start, size_t length) const;
bool withDictionary() const override { return true; }
class Index
{
public:
Index();
Index(const Index & other) = default;
explicit Index(MutableColumnPtr && positions);
explicit Index(ColumnPtr positions);
const ColumnPtr & getPositions() const { return positions; }
ColumnPtr & getPositionsPtr() { return positions; }
void insertPosition(UInt64 position);
void insertPositionsRange(const IColumn & column, size_t offset, size_t limit);
void popBack(size_t n) { positions->assumeMutableRef().popBack(n); }
void reserve(size_t n) { positions->assumeMutableRef().reserve(n); }
UInt64 getMaxPositionForCurrentType() const;
static size_t getSizeOfIndexType(const IColumn & column, size_t hint);
void check(size_t max_dictionary_size);
void checkSizeOfType();
ColumnPtr detachPositions() { return std::move(positions); }
void attachPositions(ColumnPtr positions_);
private:
ColumnPtr positions;
size_t size_of_type = 0;
void updateSizeOfType() { size_of_type = getSizeOfIndexType(*positions, size_of_type); }
void expandType();
template <typename IndexType>
typename ColumnVector<IndexType>::Container & getPositionsData();
template <typename IndexType>
void convertPositions();
template <typename Callback>
static void callForType(Callback && callback, size_t size_of_type);
};
private:
ColumnPtr column_unique;
ColumnPtr indexes;
class Dictionary
{
public:
Dictionary(const Dictionary & other) = default;
explicit Dictionary(MutableColumnPtr && column_unique);
explicit Dictionary(ColumnPtr column_unique);
size_t getSizeOfCurrentIndexType() const;
const ColumnPtr & getColumnUniquePtr() const { return column_unique; }
ColumnPtr & getColumnUniquePtr() { return column_unique; }
template <typename IndexType>
void convertIndexes();
void insertIndex(size_t value);
void insertIndexesRange(const ColumnPtr & column);
const IColumnUnique & getColumnUnique() const { return static_cast<const IColumnUnique &>(*column_unique); }
IColumnUnique & getColumnUnique() { return static_cast<IColumnUnique &>(column_unique->assumeMutableRef()); }
/// Dictionary may be shared for several mutable columns.
/// Immutable columns may have the same column unique, which isn't necessarily shared dictionary.
void setShared(const ColumnPtr & dictionary);
bool isShared() const { return shared; }
/// Create new dictionary with only keys that are mentioned in positions.
void compact(ColumnPtr & positions);
private:
ColumnPtr column_unique;
bool shared = false;
void checkColumn(const IColumn & column);
};
Dictionary dictionary;
Index idx;
void compactInplace();
void compactIfSharedDictionary();
};

View File

@ -12,7 +12,7 @@ public:
/// Column always contains Null if it's Nullable and empty string if it's String or Nullable(String).
/// So, size may be greater than the number of inserted unique values.
virtual ColumnPtr getNestedColumn() const = 0;
virtual const ColumnPtr & getNestedColumn() const = 0;
/// The same as getNestedColumn, but removes null map if nested column is nullable.
virtual const ColumnPtr & getNestedNotNullableColumn() const = 0;
@ -50,21 +50,6 @@ public:
virtual size_t uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) = 0;
/// Column which contains the set of necessary for serialization keys. Such that empty column after
/// uniqueInsertRangeFrom(column->cut(offset, limit), 0, limit) call will contain the same set of keys.
struct SerializableState
{
ColumnPtr column;
size_t offset;
size_t limit;
};
virtual SerializableState getSerializableState() const = 0;
// virtual MutableColumnPtr getInsertionPoints(const ColumnPtr & keys) const = 0;
//
// virtual bool has(const char * pos, size_t length) const { return getInsertionPoint(pos, length) != size(); }
const char * getFamilyName() const override { return "ColumnUnique"; }
void insert(const Field &) override

View File

@ -34,13 +34,9 @@ namespace
}
}
DataTypeWithDictionary::DataTypeWithDictionary(DataTypePtr dictionary_type_, DataTypePtr indexes_type_)
: dictionary_type(std::move(dictionary_type_)), indexes_type(std::move(indexes_type_))
DataTypeWithDictionary::DataTypeWithDictionary(DataTypePtr dictionary_type_)
: dictionary_type(std::move(dictionary_type_))
{
if (!indexes_type->isUnsignedInteger())
throw Exception("Index type of DataTypeWithDictionary must be unsigned integer, but got "
+ indexes_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto inner_type = dictionary_type;
if (dictionary_type->isNullable())
inner_type = static_cast<const DataTypeNullable &>(*dictionary_type).getNestedType();
@ -57,7 +53,7 @@ void DataTypeWithDictionary::enumerateStreams(const StreamCallback & callback, S
path.push_back(Substream::DictionaryKeys);
dictionary_type->enumerateStreams(callback, path);
path.back() = Substream::DictionaryIndexes;
indexes_type->enumerateStreams(callback, path);
callback(path);
path.pop_back();
}
@ -147,20 +143,20 @@ struct IndexesSerializationType
type = static_cast<Type>(resetFlags(val));
}
IndexesSerializationType(const IDataType & data_type, bool has_additional_keys, bool need_global_dictionary)
IndexesSerializationType(const IColumn & column, bool has_additional_keys, bool need_global_dictionary)
: has_additional_keys(has_additional_keys), need_global_dictionary(need_global_dictionary)
{
if (typeid_cast<const DataTypeUInt8 *>(&data_type))
if (typeid_cast<const ColumnUInt8 *>(&column))
type = TUInt8;
else if (typeid_cast<const DataTypeUInt16 *>(&data_type))
else if (typeid_cast<const ColumnUInt16 *>(&column))
type = TUInt16;
else if (typeid_cast<const DataTypeUInt32 *>(&data_type))
else if (typeid_cast<const ColumnUInt32 *>(&column))
type = TUInt32;
else if (typeid_cast<const DataTypeUInt64 *>(&data_type))
else if (typeid_cast<const ColumnUInt64 *>(&column))
type = TUInt64;
else
throw Exception("Invalid DataType for IndexesSerializationType. Expected UInt*, got " + data_type.getName(),
ErrorCodes::LOGICAL_ERROR);
throw Exception("Invalid Indexes column for IndexesSerializationType. Expected ColumnUInt*, got "
+ column.getName(), ErrorCodes::LOGICAL_ERROR);
}
DataTypePtr getDataType() const
@ -196,10 +192,9 @@ struct DeserializeStateWithDictionary : public IDataType::DeserializeBinaryBulkS
{
KeysSerializationVersion key_version;
ColumnUniquePtr global_dictionary;
UInt64 num_bytes_in_dictionary;
IndexesSerializationType index_type;
MutableColumnPtr additional_keys;
ColumnPtr additional_keys;
UInt64 num_pending_rows = 0;
explicit DeserializeStateWithDictionary(UInt64 key_version) : key_version(key_version) {}
@ -252,7 +247,7 @@ void DataTypeWithDictionary::serializeBinaryBulkStatePrefix(
writeIntBinary(key_version, *stream);
auto column_unique = createColumnUnique(*dictionary_type, *indexes_type);
auto column_unique = createColumnUnique(*dictionary_type);
state = std::make_shared<SerializeStateWithDictionary>(key_version, std::move(column_unique));
}
@ -263,24 +258,21 @@ void DataTypeWithDictionary::serializeBinaryBulkStateSuffix(
auto * state_with_dictionary = checkAndGetWithDictionarySerializeState(state);
KeysSerializationVersion::checkVersion(state_with_dictionary->key_version.value);
if (state_with_dictionary->global_dictionary)
if (state_with_dictionary->global_dictionary && settings.max_dictionary_size)
{
auto unique_state = state_with_dictionary->global_dictionary->getSerializableState();
UInt64 num_keys = unique_state.limit;
if (settings.max_dictionary_size)
{
settings.path.push_back(Substream::DictionaryKeys);
auto * stream = settings.getter(settings.path);
settings.path.pop_back();
auto nested_column = state_with_dictionary->global_dictionary->getNestedNotNullableColumn();
if (!stream)
throw Exception("Got empty stream in DataTypeWithDictionary::serializeBinaryBulkStateSuffix",
ErrorCodes::LOGICAL_ERROR);
settings.path.push_back(Substream::DictionaryKeys);
auto * stream = settings.getter(settings.path);
settings.path.pop_back();
writeIntBinary(num_keys, *stream);
removeNullable(dictionary_type)->serializeBinaryBulk(*unique_state.column, *stream,
unique_state.offset, unique_state.limit);
}
if (!stream)
throw Exception("Got empty stream in DataTypeWithDictionary::serializeBinaryBulkStateSuffix",
ErrorCodes::LOGICAL_ERROR);
UInt64 num_keys = nested_column->size();
writeIntBinary(num_keys, *stream);
removeNullable(dictionary_type)->serializeBinaryBulk(*nested_column, *stream, 0, num_keys);
}
}
@ -314,79 +306,76 @@ namespace
return nullptr;
}
template <typename T>
MutableColumnPtr mapUniqueIndexImpl(PaddedPODArray<T> & index)
struct IndexMapsWithAdditionalKeys
{
HashMap<T, T> hash_map;
for (auto val : index)
hash_map.insert({val, hash_map.size()});
auto res_col = ColumnVector<T>::create();
auto & data = res_col->getData();
data.resize(hash_map.size());
for (auto val : hash_map)
data[val.second] = val.first;
for (auto & ind : index)
ind = hash_map[ind];
return std::move(res_col);
}
/// Returns unique values of column. Write new index to column.
MutableColumnPtr mapUniqueIndex(IColumn & column)
{
if (auto * data_uint8 = getIndexesData<UInt8>(column))
return mapUniqueIndexImpl(*data_uint8);
else if (auto * data_uint16 = getIndexesData<UInt16>(column))
return mapUniqueIndexImpl(*data_uint16);
else if (auto * data_uint32 = getIndexesData<UInt32>(column))
return mapUniqueIndexImpl(*data_uint32);
else if (auto * data_uint64 = getIndexesData<UInt64>(column))
return mapUniqueIndexImpl(*data_uint64);
else
throw Exception("Indexes column for getUniqueIndex must be ColumnUInt, got" + column.getName(),
ErrorCodes::LOGICAL_ERROR);
}
MutableColumnPtr dictionary_map;
MutableColumnPtr additional_keys_map;
};
template <typename T>
MutableColumnPtr mapIndexWithOverflow(PaddedPODArray<T> & index, size_t max_val)
IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeys(PaddedPODArray<T> & index, size_t dict_size)
{
HashMap<T, T> hash_map;
PaddedPODArray<T> copy(index.cbegin(), index.cend());
HashMap<T, T> dict_map;
HashMap<T, T> add_keys_map;
for (auto val : index)
{
if (val < max_val)
hash_map.insert({val, hash_map.size()});
if (val < dict_size)
dict_map.insert({val, dict_map.size()});
else
add_keys_map.insert({val, add_keys_map.size()});
}
auto index_map_col = ColumnVector<T>::create();
auto & index_data = index_map_col->getData();
auto dictionary_map = ColumnVector<T>::create(dict_map.size());
auto additional_keys_map = ColumnVector<T>::create(add_keys_map.size());
auto & dict_data = dictionary_map->getData();
auto & add_keys_data = additional_keys_map->getData();
index_data.resize(hash_map.size());
for (auto val : hash_map)
index_data[val.second] = val.first;
for (auto val : dict_map)
dict_data[val.second] = val.first;
for (auto val : add_keys_map)
add_keys_data[val.second] = val.first - dict_size;
for (auto & val : index)
val = val < max_val ? hash_map[val]
: val - max_val + hash_map.size();
val = val < dict_size ? dict_map[val]
: add_keys_map[val] + dict_map.size();
return index_map_col;
for (size_t i = 0; i < index.size(); ++i)
{
T expected = index[i] < dict_data.size() ? dict_data[index[i]]
: add_keys_data[index[i] - dict_data.size()] + dict_size;
if (expected != copy[i])
throw Exception("Expected " + toString(expected) + ", but got " + toString(copy[i]), ErrorCodes::LOGICAL_ERROR);
}
return {std::move(dictionary_map), std::move(additional_keys_map)};
}
MutableColumnPtr mapIndexWithOverflow(IColumn & column, size_t max_size)
/// Update column and return map with old indexes.
/// Let N is the number of distinct values which are less than max_size;
/// old_column - column before function call;
/// new_column - column after function call;
/// map - function result (map.size() is N):
/// * if old_column[i] < max_size, than
/// map[new_column[i]] = old_column[i]
/// * else
/// new_column[i] = old_column[i] - max_size + N
IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeys(IColumn & column, size_t dict_size)
{
if (auto * data_uint8 = getIndexesData<UInt8>(column))
return mapIndexWithOverflow(*data_uint8, max_size);
return mapIndexWithAdditionalKeys(*data_uint8, dict_size);
else if (auto * data_uint16 = getIndexesData<UInt16>(column))
return mapIndexWithOverflow(*data_uint16, max_size);
return mapIndexWithAdditionalKeys(*data_uint16, dict_size);
else if (auto * data_uint32 = getIndexesData<UInt32>(column))
return mapIndexWithOverflow(*data_uint32, max_size);
return mapIndexWithAdditionalKeys(*data_uint32, dict_size);
else if (auto * data_uint64 = getIndexesData<UInt64>(column))
return mapIndexWithOverflow(*data_uint64, max_size);
return mapIndexWithAdditionalKeys(*data_uint64, dict_size);
else
throw Exception("Indexes column for makeIndexWithOverflow must be ColumnUInt, got" + column.getName(),
throw Exception("Indexes column for mapIndexWithAdditionalKeys must be UInt, got" + column.getName(),
ErrorCodes::LOGICAL_ERROR);
}
}
@ -419,57 +408,65 @@ void DataTypeWithDictionary::serializeBinaryBulkWithMultipleStreams(
auto & global_dictionary = state_with_dictionary->global_dictionary;
KeysSerializationVersion::checkVersion(state_with_dictionary->key_version.value);
auto unique_state = global_dictionary->getSerializableState();
bool was_global_dictionary_written = unique_state.limit >= settings.max_dictionary_size;
const auto & indexes = column_with_dictionary.getIndexesPtr();
const auto & keys = column_with_dictionary.getUnique()->getSerializableState().column;
size_t max_limit = column.size() - offset;
limit = limit ? std::min(limit, max_limit) : max_limit;
/// Create pair (used_keys, sub_index) which is the dictionary for [offset, offset + limit) range.
MutableColumnPtr sub_index = (*indexes->cut(offset, limit)).mutate();
auto unique_indexes = mapUniqueIndex(*sub_index);
/// unique_indexes->index(*sub_index) == indexes[offset:offset + limit]
MutableColumnPtr used_keys = (*keys->index(*unique_indexes, 0)).mutate();
auto sub_column = column_with_dictionary.cutAndCompact(offset, limit);
ColumnPtr positions = sub_column->getIndexesPtr();
ColumnPtr keys = sub_column->getDictionary().getNestedColumn();
if (settings.max_dictionary_size)
{
/// Insert used_keys into global dictionary and update sub_index.
auto indexes_with_overflow = global_dictionary->uniqueInsertRangeWithOverflow(*used_keys, 0, used_keys->size(),
auto indexes_with_overflow = global_dictionary->uniqueInsertRangeWithOverflow(*keys, 0, keys->size(),
settings.max_dictionary_size);
sub_index = (*indexes_with_overflow.indexes->index(*sub_index, 0)).mutate();
used_keys = std::move(indexes_with_overflow.overflowed_keys);
size_t max_size = settings.max_dictionary_size + indexes_with_overflow.overflowed_keys->size();
ColumnWithDictionary::Index(indexes_with_overflow.indexes->getPtr()).check(max_size);
if (global_dictionary->size() > settings.max_dictionary_size)
throw Exception("Got dictionary with size " + toString(global_dictionary->size()) +
" but max dictionary size is " + toString(settings.max_dictionary_size),
ErrorCodes::LOGICAL_ERROR);
positions = indexes_with_overflow.indexes->index(*positions, 0);
keys = std::move(indexes_with_overflow.overflowed_keys);
if (global_dictionary->size() < settings.max_dictionary_size && !keys->empty())
throw Exception("Has additional keys, but dict size is " + toString(global_dictionary->size()) +
" which is less then max dictionary size (" + toString(settings.max_dictionary_size) + ")",
ErrorCodes::LOGICAL_ERROR);
}
bool need_additional_keys = !used_keys->empty();
if (auto nullable_keys = typeid_cast<const ColumnNullable *>(keys.get()))
keys = nullable_keys->getNestedColumnPtr();
bool need_additional_keys = !keys->empty();
bool need_dictionary = settings.max_dictionary_size != 0;
bool need_write_dictionary = !was_global_dictionary_written && unique_state.limit >= settings.max_dictionary_size;
bool need_write_dictionary = !settings.use_single_dictionary_for_part
&& global_dictionary->size() >= settings.max_dictionary_size;
IndexesSerializationType index_version(*indexes_type, need_additional_keys, need_dictionary);
IndexesSerializationType index_version(*positions, need_additional_keys, need_dictionary);
index_version.serialize(*indexes_stream);
unique_state = global_dictionary->getSerializableState();
if (need_write_dictionary)
{
/// Write global dictionary if it wasn't written and has too many keys.
UInt64 num_keys = unique_state.limit;
const auto & nested_column = global_dictionary->getNestedNotNullableColumn();
UInt64 num_keys = nested_column->size();
writeIntBinary(num_keys, *keys_stream);
removeNullable(dictionary_type)->serializeBinaryBulk(*unique_state.column, *keys_stream, unique_state.offset, num_keys);
removeNullable(dictionary_type)->serializeBinaryBulk(*nested_column, *keys_stream, 0, num_keys);
state_with_dictionary->global_dictionary = createColumnUnique(*dictionary_type);
}
if (need_additional_keys)
{
UInt64 num_keys = used_keys->size();
UInt64 num_keys = keys->size();
writeIntBinary(num_keys, *indexes_stream);
removeNullable(dictionary_type)->serializeBinaryBulk(*used_keys, *indexes_stream, 0, num_keys);
removeNullable(dictionary_type)->serializeBinaryBulk(*keys, *indexes_stream, 0, num_keys);
}
UInt64 num_rows = sub_index->size();
UInt64 num_rows = positions->size();
writeIntBinary(num_rows, *indexes_stream);
indexes_type->serializeBinaryBulk(*sub_index, *indexes_stream, 0, num_rows);
index_version.getDataType()->serializeBinaryBulk(*positions, *indexes_stream, 0, num_rows);
}
void DataTypeWithDictionary::deserializeBinaryBulkWithMultipleStreams(
@ -507,8 +504,7 @@ void DataTypeWithDictionary::deserializeBinaryBulkWithMultipleStreams(
auto global_dict_keys = keys_type->createColumn();
keys_type->deserializeBinaryBulk(*global_dict_keys, *keys_stream, num_keys, 0);
auto column_unique = createColumnUnique(*dictionary_type, *indexes_type);
column_unique->uniqueInsertRangeFrom(*global_dict_keys, 0, num_keys);
auto column_unique = createColumnUnique(*dictionary_type, std::move(global_dict_keys));
state_with_dictionary->global_dictionary = std::move(column_unique);
};
@ -517,61 +513,60 @@ void DataTypeWithDictionary::deserializeBinaryBulkWithMultipleStreams(
UInt64 num_keys;
readIntBinary(num_keys, *indexes_stream);
auto keys_type = removeNullable(dictionary_type);
state_with_dictionary->additional_keys = keys_type->createColumn();
keys_type->deserializeBinaryBulk(*state_with_dictionary->additional_keys, *indexes_stream, num_keys, 0);
auto additional_keys = keys_type->createColumn();
keys_type->deserializeBinaryBulk(*additional_keys, *indexes_stream, num_keys, 0);
state_with_dictionary->additional_keys = std::move(additional_keys);
};
auto readIndexes = [this, state_with_dictionary, indexes_stream, &column_with_dictionary](UInt64 num_rows,
bool need_dictionary)
auto readIndexes = [this, state_with_dictionary, indexes_stream, &column_with_dictionary](UInt64 num_rows)
{
auto indexes_type = state_with_dictionary->index_type.getDataType();
MutableColumnPtr indexes_column = indexes_type->createColumn();
indexes_type->deserializeBinaryBulk(*indexes_column, *indexes_stream, num_rows, 0);
auto & global_dictionary = state_with_dictionary->global_dictionary;
const auto & additional_keys = state_with_dictionary->additional_keys;
auto * column_unique = column_with_dictionary.getUnique();
bool has_additional_keys = state_with_dictionary->additional_keys != nullptr;
bool has_additional_keys = state_with_dictionary->index_type.has_additional_keys;
bool column_is_empty = column_with_dictionary.empty();
bool column_with_global_dictionary = column_unique == global_dictionary.get();
if (!has_additional_keys && (column_is_empty || column_with_global_dictionary))
if (!state_with_dictionary->index_type.need_global_dictionary)
{
column_with_dictionary.insertRangeFromDictionaryEncodedColumn(*additional_keys, *indexes_column);
}
else if (!has_additional_keys)
{
if (column_is_empty)
column_with_dictionary.setUnique(global_dictionary);
column_with_dictionary.setSharedDictionary(global_dictionary);
column_with_dictionary.getIndexes()->insertRangeFrom(*indexes_column, 0, num_rows);
}
else if (!need_dictionary)
{
auto indexes = column_unique->uniqueInsertRangeFrom(*additional_keys, 0, additional_keys->size());
column_with_dictionary.getIndexes()->insertRangeFrom(*indexes->index(*indexes_column, 0), 0, num_rows);
auto local_column = ColumnWithDictionary::create(global_dictionary, std::move(indexes_column));
column_with_dictionary.insertRangeFrom(*local_column, 0, num_rows);
}
else
{
if (column_with_global_dictionary)
auto maps = mapIndexWithAdditionalKeys(*indexes_column, global_dictionary->size());
ColumnWithDictionary::Index(maps.additional_keys_map->getPtr()).check(additional_keys->size());
ColumnWithDictionary::Index(indexes_column->getPtr()).check(
maps.dictionary_map->size() + maps.additional_keys_map->size());
auto used_keys = (*std::move(global_dictionary->getNestedColumn()->index(*maps.dictionary_map, 0))).mutate();
if (!maps.additional_keys_map->empty())
{
auto unique_indexes = mapUniqueIndex(*column_with_dictionary.getIndexes());
auto sub_keys = column_with_dictionary.getUnique()->getNestedColumn()->index(*unique_indexes, 0);
auto new_unique = createColumnUnique(*dictionary_type, *indexes_type);
auto new_idx = new_unique->uniqueInsertRangeFrom(*sub_keys, 0, sub_keys->size());
column_with_dictionary.setUnique(std::move(new_unique));
column_with_dictionary.setIndexes((*(new_idx->index(*column_with_dictionary.getIndexes(), 0))).mutate());
column_unique = column_with_dictionary.getUnique();
auto used_add_keys = additional_keys->index(*maps.additional_keys_map, 0);
if (dictionary_type->isNullable())
{
ColumnPtr null_map = ColumnUInt8::create(used_add_keys->size(), 0);
used_add_keys = ColumnNullable::create(used_add_keys, null_map);
}
used_keys->insertRangeFrom(*used_add_keys, 0, used_add_keys->size());
}
auto index_map = mapIndexWithOverflow(*indexes_column, global_dictionary->size());
auto used_keys = global_dictionary->getNestedColumn()->index(*index_map, 0);
auto indexes = column_unique->uniqueInsertRangeFrom(*used_keys, 0, used_keys->size());
if (additional_keys)
{
size_t num_keys = additional_keys->size();
auto additional_indexes = column_unique->uniqueInsertRangeFrom(*additional_keys, 0, num_keys);
indexes->insertRangeFrom(*additional_indexes, 0, num_keys);
}
column_with_dictionary.getIndexes()->insertRangeFrom(*indexes->index(*indexes_column, 0), 0, num_rows);
column_with_dictionary.insertRangeFromDictionaryEncodedColumn(*used_keys, *indexes_column);
}
};
@ -596,7 +591,7 @@ void DataTypeWithDictionary::deserializeBinaryBulkWithMultipleStreams(
}
size_t num_rows_to_read = std::min(limit, state_with_dictionary->num_pending_rows);
readIndexes(num_rows_to_read, state_with_dictionary->index_type.need_global_dictionary);
readIndexes(num_rows_to_read);
limit -= num_rows_to_read;
state_with_dictionary->num_pending_rows -= num_rows_to_read;
}
@ -617,8 +612,8 @@ void DataTypeWithDictionary::serializeImpl(
DataTypeWithDictionary::SerealizeFunctionPtr<Args ...> func, Args & ... args) const
{
auto & column_with_dictionary = getColumnWithDictionary(column);
size_t unique_row_number = column_with_dictionary.getIndexes()->getUInt(row_num);
(dictionary_type.get()->*func)(*column_with_dictionary.getUnique()->getNestedColumn(), unique_row_number, ostr, std::forward<Args>(args)...);
size_t unique_row_number = column_with_dictionary.getIndexes().getUInt(row_num);
(dictionary_type.get()->*func)(*column_with_dictionary.getDictionary().getNestedColumn(), unique_row_number, ostr, std::forward<Args>(args)...);
}
template <typename ... Args>
@ -627,77 +622,56 @@ void DataTypeWithDictionary::deserializeImpl(
DataTypeWithDictionary::DeserealizeFunctionPtr<Args ...> func, Args & ... args) const
{
auto & column_with_dictionary = getColumnWithDictionary(column);
auto temp_column = column_with_dictionary.getUnique()->cloneEmpty();
auto temp_column = column_with_dictionary.getDictionary().cloneEmpty();
(dictionary_type.get()->*func)(*temp_column, istr, std::forward<Args>(args)...);
column_with_dictionary.insertFromFullColumn(*temp_column, 0);
}
template <typename ColumnType, typename IndexType>
MutableColumnUniquePtr DataTypeWithDictionary::createColumnUniqueImpl(const IDataType & keys_type)
namespace
{
return ColumnUnique<ColumnType, IndexType>::create(keys_type);
template <typename Creator>
struct CreateColumnVector
{
MutableColumnUniquePtr & column;
const IDataType & keys_type;
const Creator & creator;
CreateColumnVector(MutableColumnUniquePtr & column, const IDataType & keys_type, const Creator & creator)
: column(column), keys_type(keys_type), creator(creator)
{
}
template <typename T, size_t>
void operator()()
{
if (typeid_cast<const DataTypeNumber<T> *>(&keys_type))
column = creator((ColumnVector<T> *)(nullptr));
}
};
}
template <typename ColumnType>
template <typename Creator>
MutableColumnUniquePtr DataTypeWithDictionary::createColumnUniqueImpl(const IDataType & keys_type,
const IDataType & indexes_type)
{
if (typeid_cast<const DataTypeUInt8 *>(&indexes_type))
return createColumnUniqueImpl<ColumnType, UInt8>(keys_type);
if (typeid_cast<const DataTypeUInt16 *>(&indexes_type))
return createColumnUniqueImpl<ColumnType, UInt16>(keys_type);
if (typeid_cast<const DataTypeUInt32 *>(&indexes_type))
return createColumnUniqueImpl<ColumnType, UInt32>(keys_type);
if (typeid_cast<const DataTypeUInt64 *>(&indexes_type))
return createColumnUniqueImpl<ColumnType, UInt64>(keys_type);
throw Exception("The type of indexes must be unsigned integer, but got " + indexes_type.getName(),
ErrorCodes::LOGICAL_ERROR);
}
struct CreateColumnVector
{
MutableColumnUniquePtr & column;
const IDataType & keys_type;
const IDataType & indexes_type;
const IDataType * nested_type;
CreateColumnVector(MutableColumnUniquePtr & column, const IDataType & keys_type, const IDataType & indexes_type)
: column(column), keys_type(keys_type), indexes_type(indexes_type), nested_type(&keys_type)
{
if (auto nullable_type = typeid_cast<const DataTypeNullable *>(&keys_type))
nested_type = nullable_type->getNestedType().get();
}
template <typename T, size_t>
void operator()()
{
if (typeid_cast<const DataTypeNumber<T> *>(nested_type))
column = DataTypeWithDictionary::createColumnUniqueImpl<ColumnVector<T>>(keys_type, indexes_type);
}
};
MutableColumnUniquePtr DataTypeWithDictionary::createColumnUnique(const IDataType & keys_type,
const IDataType & indexes_type)
const Creator & creator)
{
auto * type = &keys_type;
if (type->isNullable())
type = static_cast<const DataTypeNullable &>(keys_type).getNestedType().get();
if (auto * nullable_type = typeid_cast<const DataTypeNullable *>(&keys_type))
type = nullable_type->getNestedType().get();
if (type->isString())
return createColumnUniqueImpl<ColumnString>(keys_type, indexes_type);
return creator((ColumnString *)(nullptr));
if (type->isFixedString())
return createColumnUniqueImpl<ColumnFixedString>(keys_type, indexes_type);
return creator((ColumnFixedString *)(nullptr));
if (typeid_cast<const DataTypeDate *>(type))
return createColumnUniqueImpl<ColumnVector<UInt16>>(keys_type, indexes_type);
return creator((ColumnVector<UInt16> *)(nullptr));
if (typeid_cast<const DataTypeDateTime *>(type))
return createColumnUniqueImpl<ColumnVector<UInt32>>(keys_type, indexes_type);
return creator((ColumnVector<UInt32> *)(nullptr));
if (type->isNumber())
{
MutableColumnUniquePtr column;
TypeListNumbers::forEach(CreateColumnVector(column, keys_type, indexes_type));
TypeListNumbers::forEach(CreateColumnVector(column, *type, creator));
if (!column)
throw Exception("Unexpected numeric type: " + type->getName(), ErrorCodes::LOGICAL_ERROR);
@ -709,10 +683,31 @@ MutableColumnUniquePtr DataTypeWithDictionary::createColumnUnique(const IDataTyp
ErrorCodes::LOGICAL_ERROR);
}
MutableColumnUniquePtr DataTypeWithDictionary::createColumnUnique(const IDataType & keys_type)
{
auto creator = [&](auto x)
{
using ColumnType = typename std::remove_pointer<decltype(x)>::type;
return ColumnUnique<ColumnType>::create(keys_type);
};
return createColumnUniqueImpl(keys_type, creator);
}
MutableColumnUniquePtr DataTypeWithDictionary::createColumnUnique(const IDataType & keys_type, MutableColumnPtr && keys)
{
auto creator = [&](auto x)
{
using ColumnType = typename std::remove_pointer<decltype(x)>::type;
return ColumnUnique<ColumnType>::create(std::move(keys), keys_type.isNullable());
};
return createColumnUniqueImpl(keys_type, creator);
}
MutableColumnPtr DataTypeWithDictionary::createColumn() const
{
MutableColumnPtr indexes = indexes_type->createColumn();
MutableColumnPtr dictionary = createColumnUnique(*dictionary_type, *indexes_type);
MutableColumnPtr indexes = DataTypeUInt8().createColumn();
MutableColumnPtr dictionary = createColumnUnique(*dictionary_type);
return ColumnWithDictionary::create(std::move(dictionary), std::move(indexes));
}
@ -722,20 +717,17 @@ bool DataTypeWithDictionary::equals(const IDataType & rhs) const
return false;
auto & rhs_with_dictionary = static_cast<const DataTypeWithDictionary &>(rhs);
return dictionary_type->equals(*rhs_with_dictionary.dictionary_type)
&& indexes_type->equals(*rhs_with_dictionary.indexes_type);
return dictionary_type->equals(*rhs_with_dictionary.dictionary_type);
}
static DataTypePtr create(const ASTPtr & arguments)
{
if (!arguments || arguments->children.size() != 2)
throw Exception("WithDictionary data type family must have two arguments - type of elements and type of indices"
, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!arguments || arguments->children.size() != 1)
throw Exception("WithDictionary data type family must have single argument - type of elements",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return std::make_shared<DataTypeWithDictionary>(DataTypeFactory::instance().get(arguments->children[0]),
DataTypeFactory::instance().get(arguments->children[1]));
return std::make_shared<DataTypeWithDictionary>(DataTypeFactory::instance().get(arguments->children[0]));
}
void registerDataTypeWithDictionary(DataTypeFactory & factory)

View File

@ -9,17 +9,15 @@ class DataTypeWithDictionary : public IDataType
{
private:
DataTypePtr dictionary_type;
DataTypePtr indexes_type;
public:
DataTypeWithDictionary(DataTypePtr dictionary_type_, DataTypePtr indexes_type_);
DataTypeWithDictionary(DataTypePtr dictionary_type_);
const DataTypePtr & getDictionaryType() const { return dictionary_type; }
const DataTypePtr & getIndexesType() const { return indexes_type; }
String getName() const override
{
return "WithDictionary(" + dictionary_type->getName() + ", " + indexes_type->getName() + ")";
return "WithDictionary(" + dictionary_type->getName() + ")";
}
const char * getFamilyName() const override { return "WithDictionary"; }
@ -146,7 +144,8 @@ public:
bool onlyNull() const override { return false; }
bool withDictionary() const override { return true; }
static MutableColumnUniquePtr createColumnUnique(const IDataType & keys_type, const IDataType & indexes_type);
static MutableColumnUniquePtr createColumnUnique(const IDataType & keys_type);
static MutableColumnUniquePtr createColumnUnique(const IDataType & keys_type, MutableColumnPtr && keys);
private:
@ -164,14 +163,8 @@ private:
void deserializeImpl(IColumn & column, ReadBuffer & istr,
DeserealizeFunctionPtr<Args ...> func, Args & ... args) const;
template <typename ColumnType, typename IndexType>
static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type);
template <typename ColumnType>
static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type, const IDataType & indexes_type);
friend struct CreateColumnVector;
template <typename Creator>
static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type, const Creator & creator);
};
}

View File

@ -121,8 +121,10 @@ public:
OutputStreamGetter getter;
SubstreamPath path;
bool position_independent_encoding = true;
size_t max_dictionary_size = 0;
bool use_single_dictionary_for_part = true;
bool position_independent_encoding = true;
};
struct DeserializeBinaryBulkSettings

View File

@ -1576,7 +1576,7 @@ private:
if (from_with_dict)
{
auto * col_with_dict = typeid_cast<const ColumnWithDictionary *>(prev_arg_col.get());
arg.column = col_with_dict->getUnique()->getNestedColumn();
arg.column = col_with_dict->getDictionary().getNestedColumn();
arg.type = from_with_dict->getDictionaryType();
tmp_rows_count = arg.column->size();
@ -1602,9 +1602,7 @@ private:
if (from_with_dict)
{
auto res_keys = std::move(res.column);
auto idx = col_with_dict->getUnique()->uniqueInsertRangeFrom(*res_keys, 0, res_keys->size());
col_with_dict->getIndexes()->insertRangeFrom(*idx->index(*res_indexes, 0), 0, res_indexes->size());
col_with_dict->insertRangeFromDictionaryEncodedColumn(*res_keys, *res_indexes);
}
else
col_with_dict->insertRangeFromFullColumn(*res.column, 0, res.column->size());

View File

@ -1836,35 +1836,11 @@ public:
};
template <typename IndexType>
struct FunctionMakeDictionaryName;
template <>
struct FunctionMakeDictionaryName<UInt8>
{
static constexpr auto name = "makeDictionaryUInt8";
};
template <>
struct FunctionMakeDictionaryName<UInt16>
{
static constexpr auto name = "makeDictionaryUInt16";
};
template <>
struct FunctionMakeDictionaryName<UInt32>
{
static constexpr auto name = "makeDictionaryUInt32";
};
template <>
struct FunctionMakeDictionaryName<UInt64>
{
static constexpr auto name = "makeDictionaryUInt64";
};
template <typename IndexType>
class FunctionMakeDictionary: public IFunction
{
public:
static constexpr auto name = FunctionMakeDictionaryName<IndexType>::name;
static FunctionPtr create(const Context &) { return std::make_shared<FunctionMakeDictionary<IndexType>>(); }
static constexpr auto name = "makeDictionary";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionMakeDictionary>(); }
String getName() const override { return name; }
@ -1875,7 +1851,7 @@ public:
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
return std::make_shared<DataTypeWithDictionary>(arguments[0], std::make_shared<DataTypeNumber<IndexType>>());
return std::make_shared<DataTypeWithDictionary>(arguments[0]);
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override
@ -1910,7 +1886,7 @@ public:
throw Exception("First first argument of function dictionaryIndexes must be ColumnWithDictionary, but got"
+ arguments[0]->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return type->getIndexesType();
return std::make_shared<DataTypeUInt64>();
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override
@ -1918,7 +1894,13 @@ public:
auto arg_num = arguments[0];
const auto & arg = block.getByPosition(arg_num);
auto & res = block.getByPosition(result);
res.column = typeid_cast<const ColumnWithDictionary *>(arg.column.get())->getIndexesPtr();
auto indexes_col = typeid_cast<const ColumnWithDictionary *>(arg.column.get())->getIndexesPtr();
auto new_indexes_col = ColumnUInt64::create(indexes_col->size());
auto & data = new_indexes_col->getData();
for (size_t i = 0; i < data.size(); ++i)
data[i] = indexes_col->getUInt(i);
res.column = std::move(new_indexes_col);
}
};
@ -1952,7 +1934,7 @@ public:
const auto & arg = block.getByPosition(arg_num);
auto & res = block.getByPosition(result);
const auto * column_with_dictionary = typeid_cast<const ColumnWithDictionary *>(arg.column.get());
res.column = column_with_dictionary->getUnique()->getNestedColumn()->cloneResized(arg.column->size());
res.column = column_with_dictionary->getDictionary().getNestedColumn()->cloneResized(arg.column->size());
}
};
@ -2009,10 +1991,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
factory.registerFunction<FunctionRunningIncome>();
factory.registerFunction<FunctionFinalizeAggregation>();
factory.registerFunction<FunctionMakeDictionary<UInt8>>();
factory.registerFunction<FunctionMakeDictionary<UInt16>>();
factory.registerFunction<FunctionMakeDictionary<UInt32>>();
factory.registerFunction<FunctionMakeDictionary<UInt64>>();
factory.registerFunction<FunctionMakeDictionary>();
factory.registerFunction<FunctionDictionaryIndexes>();
factory.registerFunction<FunctionDictionaryValues>();
}

View File

@ -254,7 +254,7 @@ static Block removeColumnsWithDictionary(Block & block, const ColumnNumbers & ar
else
{
has_with_dictionary = true;
column_with_dict_size = column_with_dict->getUnique()->size();
column_with_dict_size = column_with_dict->getDictionary().size();
indexes = column_with_dict->getIndexesPtr();
}
}
@ -291,7 +291,7 @@ static Block removeColumnsWithDictionary(Block & block, const ColumnNumbers & ar
+ column.type->getName(), ErrorCodes::LOGICAL_ERROR);
ColumnPtr new_column = convert_all_to_full ? column_with_dict->convertToFullColumn()
: column_with_dict->getUnique()->getNestedColumn();
: column_with_dict->getDictionary().getNestedColumn();
temp_block.insert({new_column, type_with_dict->getDictionaryType(), column.name});
}
@ -355,7 +355,6 @@ void FunctionBuilderImpl::checkNumberOfArguments(size_t number_of_arguments) con
struct ArgumentsWithoutDictionary
{
ColumnsWithTypeAndName arguments;
DataTypePtr common_index_type;
bool all_without_dictionary = true;
explicit ArgumentsWithoutDictionary(const ColumnsWithTypeAndName & args)
@ -372,12 +371,8 @@ struct ArgumentsWithoutDictionary
arguments = args;
}
arguments[i].type = arg_with_dict->getDictionaryType();
index_types.push_back(arg_with_dict->getIndexesType());
}
}
if (!all_without_dictionary)
common_index_type = getLeastSupertype(index_types);
}
};
@ -387,13 +382,13 @@ DataTypePtr FunctionBuilderImpl::getReturnTypeWithoutDictionary(const ColumnsWit
if (!arguments.empty() && useDefaultImplementationForNulls())
{
NullPresence null_presense = getNullPresense(arguments);
NullPresence null_presence = getNullPresense(arguments);
if (null_presense.has_null_constant)
if (null_presence.has_null_constant)
{
return makeNullable(std::make_shared<DataTypeNothing>());
}
if (null_presense.has_nullable)
if (null_presence.has_nullable)
{
Block nested_block = createBlockWithNestedColumns(Block(arguments), ext::collection_cast<ColumnNumbers>(ext::range(0, arguments.size())));
auto return_type = getReturnTypeImpl(ColumnsWithTypeAndName(nested_block.begin(), nested_block.end()));
@ -479,8 +474,7 @@ DataTypePtr FunctionBuilderImpl::getReturnType(const ColumnsWithTypeAndName & ar
ArgumentsWithoutDictionary arguments_without_dictionary(arguments);
if (!arguments_without_dictionary.all_without_dictionary)
return std::make_shared<DataTypeWithDictionary>(
getReturnTypeWithoutDictionary(arguments_without_dictionary.arguments),
arguments_without_dictionary.common_index_type);
getReturnTypeWithoutDictionary(arguments_without_dictionary.arguments));
}
return getReturnTypeWithoutDictionary(arguments);

View File

@ -264,6 +264,10 @@ struct Settings
M(SettingUInt64, enable_conditional_computation, 0, "Enable conditional computations") \
\
M(SettingDateTimeInputFormat, date_time_input_format, FormatSettings::DateTimeInputFormat::Basic, "Method to read DateTime from text input formats. Possible values: 'basic' and 'best_effort'.") \
\
M(SettingUInt64, max_dictionary_size, 8192, "Maximum size (in rows) of shared global dictionary for WithDictionary type.") \
M(SettingBool, use_single_dictionary_for_part, true, "WithDictionary type serialization setting. If is true, than will use additional keys when global dictionary overflows. Otherwise, will create several shared dictionaries.") \
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};

View File

@ -200,6 +200,11 @@ MergeTreeReader::Stream::Stream(
getMark(right).offset_in_compressed_file - getMark(all_mark_ranges[i].begin).offset_in_compressed_file);
}
/// Avoid empty buffer. May happen while reading dictionary for DataTypeWithDictionary.
/// For example: part has single dictionary and all marks point to the same position.
if (max_mark_range == 0)
max_mark_range = max_read_buffer_size;
size_t buffer_size = std::min(max_read_buffer_size, max_mark_range);
/// Estimate size of the data to be read.
@ -329,6 +334,26 @@ void MergeTreeReader::Stream::seekToMark(size_t index)
}
void MergeTreeReader::Stream::seekToStart()
{
try
{
if (cached_buffer)
cached_buffer->seek(0, 0);
if (non_cached_buffer)
non_cached_buffer->seek(0, 0);
}
catch (Exception & e)
{
/// Better diagnostics.
if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND)
e.addMessage("(while seeking to start of column " + path_prefix + ")");
throw;
}
}
void MergeTreeReader::addStreams(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
{
@ -379,7 +404,12 @@ void MergeTreeReader::readData(
Stream & stream = *it->second;
if (!continue_reading && !stream_for_prefix)
if (stream_for_prefix)
{
stream.seekToStart();
continue_reading = false;
}
else if (!continue_reading)
stream.seekToMark(from_mark);
return stream.data_buffer;

View File

@ -64,6 +64,7 @@ private:
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
void seekToMark(size_t index);
void seekToStart();
ReadBuffer * data_buffer;

View File

@ -97,9 +97,11 @@ void IMergedBlockOutputStream::writeData(
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state)
{
IDataType::SerializeBinaryBulkSettings settings;
settings.getter = createStreamGetter(name, offset_columns, skip_offsets);
settings.max_dictionary_size = 1024;
auto & settings = storage.context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.getter = createStreamGetter(name, offset_columns, skip_offsets);
serialize_settings.max_dictionary_size = settings.max_dictionary_size;
serialize_settings.use_single_dictionary_for_part = settings.use_single_dictionary_for_part != 0;
size_t size = column.size();
size_t prev_mark = 0;
@ -135,10 +137,10 @@ void IMergedBlockOutputStream::writeData(
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}, settings.path);
}, serialize_settings.path);
}
type.serializeBinaryBulkWithMultipleStreams(column, prev_mark, limit, settings, serialization_state);
type.serializeBinaryBulkWithMultipleStreams(column, prev_mark, limit, serialize_settings, serialization_state);
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
@ -154,7 +156,7 @@ void IMergedBlockOutputStream::writeData(
return;
column_streams[stream_name]->compressed.nextIfAtEnd();
}, settings.path);
}, serialize_settings.path);
prev_mark += limit;
}
@ -168,7 +170,7 @@ void IMergedBlockOutputStream::writeData(
String stream_name = IDataType::getFileNameForStream(name, substream_path);
offset_columns.insert(stream_name);
}
}, settings.path);
}, serialize_settings.path);
}
@ -296,14 +298,16 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
MergeTreeData::DataPart::Checksums * additional_column_checksums)
{
/// Finish columns serialization.
IDataType::SerializeBinaryBulkSettings settings;
settings.max_dictionary_size = 1024;
auto & settings = storage.context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.max_dictionary_size = settings.max_dictionary_size;
serialize_settings.use_single_dictionary_for_part = settings.use_single_dictionary_for_part != 0;
OffsetColumns offset_columns;
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
{
settings.getter = createStreamGetter(it->name, offset_columns, false);
it->type->serializeBinaryBulkStateSuffix(settings, serialization_states[i]);
serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
}
if (!total_column_list)
@ -550,14 +554,16 @@ void MergedColumnOnlyOutputStream::writeSuffix()
MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums()
{
/// Finish columns serialization.
IDataType::SerializeBinaryBulkSettings settings;
settings.max_dictionary_size = 1024;
auto & settings = storage.context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.max_dictionary_size = settings.max_dictionary_size;
serialize_settings.use_single_dictionary_for_part = settings.use_single_dictionary_for_part != 0;
OffsetColumns offset_columns;
for (size_t i = 0; i < header.columns(); ++i)
{
auto & column = header.safeGetByPosition(i);
settings.getter = createStreamGetter(column.name, offset_columns, skip_offsets);
column.type->serializeBinaryBulkStateSuffix(settings, serialization_states[i]);
serialize_settings.getter = createStreamGetter(column.name, offset_columns, skip_offsets);
column.type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
}
MergeTreeData::DataPart::Checksums checksums;