From 9c763f8090ebb7de942874bcd338bd45f804e612 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 9 Jul 2018 21:19:03 +0300 Subject: [PATCH] Make DatsTypeWithDictionary independent from index type. --- dbms/src/Columns/ColumnUnique.h | 143 +++-- dbms/src/Columns/ColumnWithDictionary.cpp | 555 ++++++++++++++++-- dbms/src/Columns/ColumnWithDictionary.h | 285 +++++---- dbms/src/Columns/IColumnUnique.h | 17 +- dbms/src/DataTypes/DataTypeWithDictionary.cpp | 414 +++++++------ dbms/src/DataTypes/DataTypeWithDictionary.h | 19 +- dbms/src/DataTypes/IDataType.h | 4 +- dbms/src/Functions/FunctionsConversion.h | 6 +- dbms/src/Functions/FunctionsMiscellaneous.cpp | 47 +- dbms/src/Functions/IFunction.cpp | 18 +- dbms/src/Interpreters/Settings.h | 4 + .../Storages/MergeTree/MergeTreeReader.cpp | 32 +- dbms/src/Storages/MergeTree/MergeTreeReader.h | 1 + .../MergeTree/MergedBlockOutputStream.cpp | 36 +- 14 files changed, 1015 insertions(+), 566 deletions(-) diff --git a/dbms/src/Columns/ColumnUnique.h b/dbms/src/Columns/ColumnUnique.h index e1988723640..fb32c123560 100644 --- a/dbms/src/Columns/ColumnUnique.h +++ b/dbms/src/Columns/ColumnUnique.h @@ -39,10 +39,10 @@ struct StringRefWrapper namespace ZeroTraits { template - bool check(const StringRefWrapper x) { return nullptr == x.column; } + bool check(const StringRefWrapper x) { return nullptr == x.column && nullptr == x.ref.data; } template - void set(StringRefWrapper & x) { x.column = nullptr; } + void set(StringRefWrapper & x) { x.column = nullptr; x.ref.data = nullptr; } }; @@ -52,7 +52,7 @@ namespace DB template class ColumnUnique final : public COWPtrHelper> { - friend class COWPtrHelper; + friend class COWPtrHelper>; private: explicit ColumnUnique(MutableColumnPtr && holder, bool is_nullable); @@ -60,7 +60,9 @@ private: ColumnUnique(const ColumnUnique & other) : column_holder(other.column_holder), is_nullable(other.is_nullable) {} public: - ColumnPtr getNestedColumn() const override; + MutableColumnPtr cloneEmpty() const override; + + const ColumnPtr & getNestedColumn() const override; const ColumnPtr & getNestedNotNullableColumn() const override { return column_holder; } size_t uniqueInsert(const Field & x) override; @@ -71,7 +73,6 @@ public: size_t uniqueInsertData(const char * pos, size_t length) override; size_t uniqueInsertDataWithTerminatingZero(const char * pos, size_t length) override; size_t uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) override; - IColumnUnique::SerializableState getSerializableState() const override; size_t getDefaultValueIndex() const override { return is_nullable ? 1 : 0; } size_t getNullValueIndex() const override; @@ -119,6 +120,7 @@ public: void forEachSubcolumn(IColumn::ColumnCallback callback) override { callback(column_holder); + index = nullptr; } private: @@ -129,6 +131,7 @@ private: /// For DataTypeNullable, stores null map. mutable ColumnPtr cached_null_mask; + mutable ColumnPtr cached_column_nullable; /// Lazy initialized. std::unique_ptr index; @@ -147,11 +150,19 @@ private: const IColumn & src, size_t start, size_t length, + size_t num_added_rows, typename ColumnVector::MutablePtr && positions_column, ColumnType * overflowed_keys, + IndexMapType * secondary_index, size_t max_dictionary_size); }; +template +MutableColumnPtr ColumnUnique::cloneEmpty() const +{ + return ColumnUnique::create(column_holder->cloneResized(numSpecialValues()), is_nullable); +} + template ColumnUnique::ColumnUnique(const IDataType & type) : is_nullable(type.isNullable()) { @@ -170,7 +181,7 @@ ColumnUnique::ColumnUnique(MutableColumnPtr && holder, bool is_nulla } template -ColumnPtr ColumnUnique::getNestedColumn() const +const ColumnPtr & ColumnUnique::getNestedColumn() const { if (is_nullable) { @@ -180,6 +191,7 @@ ColumnPtr ColumnUnique::getNestedColumn() const ColumnUInt8::MutablePtr null_mask = ColumnUInt8::create(size, UInt8(0)); null_mask->getData()[getNullValueIndex()] = 1; cached_null_mask = std::move(null_mask); + cached_column_nullable = ColumnNullable::create(column_holder, cached_null_mask); } if (cached_null_mask->size() != size) @@ -187,9 +199,10 @@ ColumnPtr ColumnUnique::getNestedColumn() const MutableColumnPtr null_mask = (*std::move(cached_null_mask)).mutate(); static_cast(*null_mask).getData().resize_fill(size); cached_null_mask = std::move(null_mask); + cached_column_nullable = ColumnNullable::create(column_holder, cached_null_mask); } - return ColumnNullable::create(column_holder, cached_null_mask); + return cached_column_nullable; } return column_holder; } @@ -219,7 +232,7 @@ void ColumnUnique::buildIndex() } template -IndexType ColumnUnique::insertIntoMap(const StringRefWrapper & ref, IndexType value) +UInt64 ColumnUnique::insertIntoMap(const StringRefWrapper & ref, UInt64 value) { if (!index) buildIndex(); @@ -242,7 +255,7 @@ size_t ColumnUnique::uniqueInsert(const Field & x) return getNullValueIndex(); auto column = getRawColumnPtr(); - auto prev_size = static_cast(column->size()); + auto prev_size = static_cast(column->size()); if ((*column)[getDefaultValueIndex()] == x) return getDefaultValueIndex(); @@ -261,6 +274,9 @@ size_t ColumnUnique::uniqueInsertFrom(const IColumn & src, size_t n) if (is_nullable && src.isNullAt(n)) return getNullValueIndex(); + if (auto * nullable = typeid_cast(&src)) + return uniqueInsertFrom(nullable->getNestedColumn(), n); + auto ref = src.getDataAt(n); return uniqueInsertData(ref.data, ref.size); } @@ -336,14 +352,31 @@ size_t ColumnUnique::uniqueDeserializeAndInsertFromArena(const char return static_cast(index_pos); } +template +static void checkIndexes(const ColumnVector & indexes, size_t max_dictionary_size) +{ + auto & data = indexes.getData(); + for (size_t i = 0; i < data.size(); ++i) + { + if (data[i] >= max_dictionary_size) + { + throw Exception("Found index " + toString(data[i]) + " at position " + toString(i) + + " which is grated or equal than dictionary size " + toString(max_dictionary_size), + ErrorCodes::LOGICAL_ERROR); + } + } +} + template template MutableColumnPtr ColumnUnique::uniqueInsertRangeImpl( const IColumn & src, size_t start, size_t length, + size_t num_added_rows, typename ColumnVector::MutablePtr && positions_column, ColumnType * overflowed_keys, + IndexMapType * secondary_index, size_t max_dictionary_size) { if (!index) @@ -353,9 +386,11 @@ MutableColumnPtr ColumnUnique::uniqueInsertRangeImpl( const NullMap * null_map = nullptr; auto & positions = positions_column->getData(); - using SuperiorIndexType = NumberTraits::Construct::Type; - auto updatePosition = [&](UInt64 & next_position, UInt64 num_added_rows) -> MutableColumnPtr + auto updatePosition = [&](UInt64 & next_position) -> MutableColumnPtr { + constexpr auto next_size = NumberTraits::nextSize(sizeof(IndexType)); + using SuperiorIndexType = typename NumberTraits::Construct::Type; + ++next_position; if (next_position > std::numeric_limits::max()) @@ -364,102 +399,108 @@ MutableColumnPtr ColumnUnique::uniqueInsertRangeImpl( throw Exception("Can't find superior index type for type " + demangle(typeid(IndexType).name()), ErrorCodes::LOGICAL_ERROR); - auto expanded_column = ColumnVector::create(length); + auto expanded_column = ColumnVector::create(length); auto & expanded_data = expanded_column->getData(); for (size_t i = 0; i < num_added_rows; ++i) expanded_data[i] = positions[i]; return uniqueInsertRangeImpl( src, - start + num_added_rows, - length - num_added_rows, + start, + length, + num_added_rows, std::move(expanded_column), overflowed_keys, + secondary_index, max_dictionary_size); } return nullptr; }; - if (src.isColumnNullable()) + if (auto nullable_column = typeid_cast(&src)) { - auto nullable_column = static_cast(&src); - src_column = static_cast(&nullable_column->getNestedColumn()); + src_column = typeid_cast(&nullable_column->getNestedColumn()); null_map = &nullable_column->getNullMapData(); } else - src_column = static_cast(&src); + src_column = typeid_cast(&src); - std::unique_ptr secondary_index; - if (overflowed_keys) - secondary_index = std::make_unique(); + if (src_column == nullptr) + throw Exception("Invalid column type for ColumnUnique::insertRangeFrom. Expected " + column_holder->getName() + + ", got " + src.getName(), ErrorCodes::ILLEGAL_COLUMN); auto column = getRawColumnPtr(); UInt64 next_position = column->size(); - for (auto i : ext::range(0, length)) + if (secondary_index) + next_position += secondary_index->size(); + + for (; num_added_rows < length; ++num_added_rows) { - auto row = start + i; + auto row = start + num_added_rows; if (null_map && (*null_map)[row]) - positions[i] = getNullValueIndex(); + positions[num_added_rows] = getNullValueIndex(); else if (column->compareAt(getDefaultValueIndex(), row, *src_column, 1) == 0) - positions[i] = getDefaultValueIndex(); + positions[num_added_rows] = getDefaultValueIndex(); else { auto it = index->find(StringRefWrapper(src_column, row)); if (it == index->end()) { - if (overflowed_keys && next_position >= max_dictionary_size + numSpecialValues()) + if (overflowed_keys && next_position >= max_dictionary_size) { auto jt = secondary_index->find(StringRefWrapper(src_column, row)); if (jt == secondary_index->end()) { - positions[i] = next_position; + positions[num_added_rows] = next_position; auto ref = src_column->getDataAt(row); overflowed_keys->insertData(ref.data, ref.size); (*secondary_index)[StringRefWrapper(src_column, row)] = next_position; - if (auto res = updatePosition(next_position, i)) + if (auto res = updatePosition(next_position)) return res; } else - positions[i] = jt->second; + positions[num_added_rows] = jt->second; } else { - positions[i] = next_position; + positions[num_added_rows] = next_position; auto ref = src_column->getDataAt(row); column->insertData(ref.data, ref.size); (*index)[StringRefWrapper(column, next_position)] = next_position; - if (auto res = updatePosition(next_position, i)) + if (auto res = updatePosition(next_position)) return res; } } else - positions[i] = it->second; + positions[num_added_rows] = it->second; } } - return positions_column; + checkIndexes(*positions_column, column->size() + (overflowed_keys ? overflowed_keys->size() : 0)); + + return std::move(positions_column); } template MutableColumnPtr ColumnUnique::uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length) { - size_t size = getRawColumnPtr()->size(); - auto callForType = [&](auto x) + auto callForType = [this, &src, start, length](auto x) -> MutableColumnPtr { + size_t size = getRawColumnPtr()->size(); + using IndexType = decltype(x); if (size <= std::numeric_limits::max()) { - auto positions_column = ColumnVector::create(length); - auto & positions = positions_column->getData(); - - return uniqueInsertRangeImpl(src, start, length, positions, nullptr, 0); + auto positions = ColumnVector::create(length); + return this->uniqueInsertRangeImpl(src, start, length, 0, + std::move(positions), nullptr, nullptr, 0); } return nullptr; @@ -488,21 +529,22 @@ IColumnUnique::IndexesWithOverflow ColumnUnique::uniqueInsertRangeWi size_t max_dictionary_size) { - size_t size = getRawColumnPtr()->size(); auto overflowed_keys = column_holder->cloneEmpty(); auto overflowed_keys_ptr = typeid_cast(overflowed_keys.get()); if (!overflowed_keys_ptr) throw Exception("Invalid keys type for ColumnUnique.", ErrorCodes::LOGICAL_ERROR); - auto callForType = [&](auto x) + auto callForType = [this, &src, start, length, overflowed_keys_ptr, max_dictionary_size](auto x) -> MutableColumnPtr { + size_t size = getRawColumnPtr()->size(); + using IndexType = decltype(x); if (size <= std::numeric_limits::max()) { - auto positions_column = ColumnVector::create(length); - auto & positions = positions_column->getData(); - - return uniqueInsertRangeImpl(src, start, length, positions, overflowed_keys_ptr, max_dictionary_size); + auto positions = ColumnVector::create(length); + IndexMapType secondary_index; + return this->uniqueInsertRangeImpl(src, start, length, 0, std::move(positions), + overflowed_keys_ptr, &secondary_index, max_dictionary_size); } return nullptr; @@ -526,15 +568,4 @@ IColumnUnique::IndexesWithOverflow ColumnUnique::uniqueInsertRangeWi return indexes_with_overflow; } -template -IColumnUnique::SerializableState ColumnUnique::getSerializableState() const -{ - IColumnUnique::SerializableState state; - state.column = column_holder; - state.offset = numSpecialValues(); - state.limit = column_holder->size() - state.offset; - - return state; -} - }; diff --git a/dbms/src/Columns/ColumnWithDictionary.cpp b/dbms/src/Columns/ColumnWithDictionary.cpp index b4489dc5b81..0f64d4bcbe0 100644 --- a/dbms/src/Columns/ColumnWithDictionary.cpp +++ b/dbms/src/Columns/ColumnWithDictionary.cpp @@ -1,21 +1,192 @@ #include +#include #include +#include +#include namespace DB { -ColumnWithDictionary::ColumnWithDictionary(MutableColumnPtr && column_unique_, MutableColumnPtr && indexes_) - : column_unique(std::move(column_unique_)), indexes(std::move(indexes_)) +namespace { - if (!dynamic_cast(column_unique.get())) - throw Exception("ColumnUnique expected as argument of ColumnWithDictionary.", ErrorCodes::ILLEGAL_COLUMN); + template + PaddedPODArray * getIndexesData(IColumn & indexes) + { + auto * column = typeid_cast *>(&indexes); + if (column) + return &column->getData(); - getSizeOfCurrentIndexType(); + return nullptr; + } + + template + MutableColumnPtr mapUniqueIndexImpl(PaddedPODArray & index) + { + PaddedPODArray copy(index.cbegin(), index.cend()); + + HashMap hash_map; + for (auto val : index) + hash_map.insert({val, hash_map.size()}); + + auto res_col = ColumnVector::create(); + auto & data = res_col->getData(); + + data.resize(hash_map.size()); + for (auto val : hash_map) + data[val.second] = val.first; + + for (auto & ind : index) + ind = hash_map[ind]; + + for (size_t i = 0; i < index.size(); ++i) + if (data[index[i]] != copy[i]) + throw Exception("Expected " + toString(data[index[i]]) + ", but got " + toString(copy[i]), ErrorCodes::LOGICAL_ERROR); + + return std::move(res_col); + } + + /// Returns unique values of column. Write new index to column. + MutableColumnPtr mapUniqueIndex(IColumn & column) + { + if (auto * data_uint8 = getIndexesData(column)) + return mapUniqueIndexImpl(*data_uint8); + else if (auto * data_uint16 = getIndexesData(column)) + return mapUniqueIndexImpl(*data_uint16); + else if (auto * data_uint32 = getIndexesData(column)) + return mapUniqueIndexImpl(*data_uint32); + else if (auto * data_uint64 = getIndexesData(column)) + return mapUniqueIndexImpl(*data_uint64); + else + throw Exception("Indexes column for getUniqueIndex must be ColumnUInt, got" + column.getName(), + ErrorCodes::LOGICAL_ERROR); + } } -ColumnWithDictionary::ColumnWithDictionary(const ColumnWithDictionary & other) - : column_unique(other.column_unique), indexes(other.indexes) + +ColumnWithDictionary::ColumnWithDictionary(MutableColumnPtr && column_unique_, MutableColumnPtr && indexes_) + : dictionary(std::move(column_unique_)), idx(std::move(indexes_)) { + idx.check(getDictionary().size()); +} + +void ColumnWithDictionary::insert(const Field & x) +{ + compactIfSharedDictionary(); + idx.insertPosition(dictionary.getColumnUnique().uniqueInsert(x)); + idx.check(getDictionary().size()); +} + +void ColumnWithDictionary::insertDefault() +{ + idx.insertPosition(getDictionary().getDefaultValueIndex()); +} + +void ColumnWithDictionary::insertFrom(const IColumn & src, size_t n) +{ + auto * src_with_dict = typeid_cast(&src); + + if (!src_with_dict) + throw Exception("Expected ColumnWithDictionary, got" + src.getName(), ErrorCodes::ILLEGAL_COLUMN); + + size_t position = src_with_dict->getIndexes().getUInt(n); + + if (&src_with_dict->getDictionary() == &getDictionary()) + { + /// Dictionary is shared with src column. Insert only index. + idx.insertPosition(position); + } + else + { + compactIfSharedDictionary(); + const auto & nested = *src_with_dict->getDictionary().getNestedColumn(); + idx.insertPosition(dictionary.getColumnUnique().uniqueInsertFrom(nested, position)); + } + + idx.check(getDictionary().size()); +} + +void ColumnWithDictionary::insertFromFullColumn(const IColumn & src, size_t n) +{ + compactIfSharedDictionary(); + idx.insertPosition(dictionary.getColumnUnique().uniqueInsertFrom(src, n)); + idx.check(getDictionary().size()); +} + +void ColumnWithDictionary::insertRangeFrom(const IColumn & src, size_t start, size_t length) +{ + auto * src_with_dict = typeid_cast(&src); + + if (!src_with_dict) + throw Exception("Expected ColumnWithDictionary, got" + src.getName(), ErrorCodes::ILLEGAL_COLUMN); + + if (&src_with_dict->getDictionary() == &getDictionary()) + { + /// Dictionary is shared with src column. Insert only indexes. + idx.insertPositionsRange(src_with_dict->getIndexes(), start, length); + } + else + { + compactIfSharedDictionary(); + + /// TODO: Support native insertion from other unique column. It will help to avoid null map creation. + + auto sub_idx = (*src_with_dict->getIndexes().cut(start, length)).mutate(); + auto idx_map = mapUniqueIndex(*sub_idx); + + auto src_nested = src_with_dict->getDictionary().getNestedColumn(); + auto used_keys = src_nested->index(*idx_map, 0); + + auto inserted_indexes = dictionary.getColumnUnique().uniqueInsertRangeFrom(*used_keys, 0, used_keys->size()); + idx.insertPositionsRange(*inserted_indexes->index(*sub_idx, 0), 0, length); + } + idx.check(getDictionary().size()); +} + +void ColumnWithDictionary::insertRangeFromFullColumn(const IColumn & src, size_t start, size_t length) +{ + compactIfSharedDictionary(); + auto inserted_indexes = dictionary.getColumnUnique().uniqueInsertRangeFrom(src, start, length); + idx.insertPositionsRange(*inserted_indexes, 0, length); + idx.check(getDictionary().size()); +} + +void ColumnWithDictionary::insertRangeFromDictionaryEncodedColumn(const IColumn & keys, const IColumn & positions) +{ + Index(positions.getPtr()).check(keys.size()); + compactIfSharedDictionary(); + auto inserted_indexes = dictionary.getColumnUnique().uniqueInsertRangeFrom(keys, 0, keys.size()); + idx.insertPositionsRange(*inserted_indexes->index(positions, 0), 0, positions.size()); + idx.check(getDictionary().size()); +} + +void ColumnWithDictionary::insertData(const char * pos, size_t length) +{ + compactIfSharedDictionary(); + idx.insertPosition(dictionary.getColumnUnique().uniqueInsertData(pos, length)); + idx.check(getDictionary().size()); +} + +void ColumnWithDictionary::insertDataWithTerminatingZero(const char * pos, size_t length) +{ + compactIfSharedDictionary(); + idx.insertPosition(dictionary.getColumnUnique().uniqueInsertDataWithTerminatingZero(pos, length)); + idx.check(getDictionary().size()); +} + +StringRef ColumnWithDictionary::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const +{ + return getDictionary().serializeValueIntoArena(getIndexes().getUInt(n), arena, begin); +} + +const char * ColumnWithDictionary::deserializeAndInsertFromArena(const char * pos) +{ + compactIfSharedDictionary(); + + const char * new_pos; + idx.insertPosition(dictionary.getColumnUnique().uniqueDeserializeAndInsertFromArena(pos, new_pos)); + + idx.check(getDictionary().size()); + return new_pos; } void ColumnWithDictionary::gather(ColumnGathererStream & gatherer) @@ -25,88 +196,364 @@ void ColumnWithDictionary::gather(ColumnGathererStream & gatherer) MutableColumnPtr ColumnWithDictionary::cloneResized(size_t size) const { - auto unique_ptr = column_unique; - return ColumnWithDictionary::create((*std::move(unique_ptr)).mutate(), indexes->cloneResized(size)); + auto unique_ptr = dictionary.getColumnUniquePtr(); + return ColumnWithDictionary::create((*std::move(unique_ptr)).mutate(), getIndexes().cloneResized(size)); } -size_t ColumnWithDictionary::getSizeOfCurrentIndexType() const +int ColumnWithDictionary::compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const { - if (typeid_cast(indexes.get())) - return sizeof(UInt8); - if (typeid_cast(indexes.get())) - return sizeof(UInt16); - if (typeid_cast(indexes.get())) - return sizeof(UInt32); - if (typeid_cast(indexes.get())) - return sizeof(UInt64); + const auto & column_with_dictionary = static_cast(rhs); + size_t n_index = getIndexes().getUInt(n); + size_t m_index = column_with_dictionary.getIndexes().getUInt(m); + return getDictionary().compareAt(n_index, m_index, column_with_dictionary.getDictionary(), nan_direction_hint); +} - throw Exception("Unexpected indexes type for ColumnWithDictionary. Expected ColumnUInt, got " + indexes->getName(), +void ColumnWithDictionary::getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const +{ + if (limit == 0) + limit = size(); + + size_t unique_limit = std::min(limit, getDictionary().size()); + Permutation unique_perm; + getDictionary().getNestedColumn()->getPermutation(reverse, unique_limit, nan_direction_hint, unique_perm); + + /// TODO: optimize with sse. + + /// Get indexes per row in column_unique. + std::vector> indexes_per_row(getDictionary().size()); + size_t indexes_size = getIndexes().size(); + for (size_t row = 0; row < indexes_size; ++row) + indexes_per_row[getIndexes().getUInt(row)].push_back(row); + + /// Replicate permutation. + size_t perm_size = std::min(indexes_size, limit); + res.resize(perm_size); + size_t perm_index = 0; + for (size_t row = 0; row < indexes_size && perm_index < perm_size; ++row) + { + const auto & row_indexes = indexes_per_row[unique_perm[row]]; + for (auto row_index : row_indexes) + { + res[perm_index] = row_index; + ++perm_index; + + if (perm_index == perm_size) + break; + } + } +} + +std::vector ColumnWithDictionary::scatter(ColumnIndex num_columns, const Selector & selector) const +{ + auto columns = getIndexes().scatter(num_columns, selector); + for (auto & column : columns) + { + auto unique_ptr = dictionary.getColumnUniquePtr(); + column = ColumnWithDictionary::create((*std::move(unique_ptr)).mutate(), std::move(column)); + } + + return columns; +} + +void ColumnWithDictionary::setSharedDictionary(const ColumnPtr & column_unique) +{ + if (!empty()) + throw Exception("Can't set ColumnUnique for ColumnWithDictionary because is't not empty.", + ErrorCodes::LOGICAL_ERROR); + + dictionary.setShared(column_unique); +} + +ColumnWithDictionary::MutablePtr ColumnWithDictionary::compact() +{ + auto positions = idx.getPositions(); + /// Create column with new indexes and old dictionary. + auto column = ColumnWithDictionary::create(getDictionary().assumeMutable(), (*std::move(positions)).mutate()); + /// Will create new dictionary. + column->compactInplace(); + + return column; +} + +ColumnWithDictionary::MutablePtr ColumnWithDictionary::cutAndCompact(size_t start, size_t length) const +{ + auto sub_positions = (*idx.getPositions()->cut(start, length)).mutate(); + /// Create column with new indexes and old dictionary. + auto column = ColumnWithDictionary::create(getDictionary().assumeMutable(), std::move(sub_positions)); + /// Will create new dictionary. + column->compactInplace(); + + return column; +} + +void ColumnWithDictionary::compactInplace() +{ + auto positions = idx.detachPositions(); + dictionary.compact(positions); + idx.attachPositions(std::move(positions)); +} + +void ColumnWithDictionary::compactIfSharedDictionary() +{ + if (dictionary.isShared()) + compactInplace(); +} + + +ColumnWithDictionary::Index::Index() : positions(ColumnUInt8::create()), size_of_type(sizeof(UInt8)) {} + +ColumnWithDictionary::Index::Index(MutableColumnPtr && positions) : positions(std::move(positions)) +{ + updateSizeOfType(); +} + +ColumnWithDictionary::Index::Index(ColumnPtr positions) : positions(std::move(positions)) +{ + updateSizeOfType(); +} + +template +void ColumnWithDictionary::Index::callForType(Callback && callback, size_t size_of_type) +{ + switch (size_of_type) + { + case sizeof(UInt8): { callback(UInt8()); break; } + case sizeof(UInt16): { callback(UInt16()); break; } + case sizeof(UInt32): { callback(UInt32()); break; } + case sizeof(UInt64): { callback(UInt64()); break; } + default: { + throw Exception("Unexpected size of index type for ColumnWithDictionary: " + toString(size_of_type), + ErrorCodes::LOGICAL_ERROR); + } + } +} + +size_t ColumnWithDictionary::Index::getSizeOfIndexType(const IColumn & column, size_t hint) +{ + auto checkFor = [&](auto type) { return typeid_cast *>(&column) != nullptr; }; + auto tryGetSizeFor = [&](auto type) -> size_t { return checkFor(type) ? sizeof(decltype(type)) : 0; }; + + if (hint) + { + size_t size = 0; + callForType([&](auto type) { size = tryGetSizeFor(type); }, hint); + + if (size) + return size; + } + + if (auto size = tryGetSizeFor(UInt8())) + return size; + if (auto size = tryGetSizeFor(UInt16())) + return size; + if (auto size = tryGetSizeFor(UInt32())) + return size; + if (auto size = tryGetSizeFor(UInt64())) + return size; + + throw Exception("Unexpected indexes type for ColumnWithDictionary. Expected UInt, got " + column.getName(), ErrorCodes::ILLEGAL_COLUMN); } +void ColumnWithDictionary::Index::attachPositions(ColumnPtr positions_) +{ + positions = std::move(positions_); + updateSizeOfType(); +} + template -void ColumnWithDictionary::convertIndexes() +typename ColumnVector::Container & ColumnWithDictionary::Index::getPositionsData() +{ + auto * positions_ptr = typeid_cast *>(positions->assumeMutable().get()); + if (!positions_ptr) + throw Exception("Invalid indexes type for ColumnWithDictionary." + " Expected UInt" + toString(8 * sizeof(IndexType)) + ", got " + positions->getName(), + ErrorCodes::LOGICAL_ERROR); + + return positions_ptr->getData(); +} + +template +void ColumnWithDictionary::Index::convertPositions() { auto convert = [&](auto x) { - using CurIndexType = typeof(x); - if (auto * index_col = typeid_cast *>(indexes.get())) + using CurIndexType = decltype(x); + auto & data = getPositionsData(); + + if (sizeof(CurIndexType) > sizeof(IndexType)) + throw Exception("Converting indexes to smaller type: from " + toString(sizeof(CurIndexType)) + + " to " + toString(sizeof(IndexType)), ErrorCodes::LOGICAL_ERROR); + + if (sizeof(CurIndexType) != sizeof(IndexType)) { - if (sizeof(CurIndexType) != sizeof(IndexType)) - { - size_t size = index_col->size(); - auto new_index_col = ColumnVector::create(size); - auto & data = index_col->getData(); - auto & new_data = new_index_col->getData(); + size_t size = data.size(); + auto new_positions = ColumnVector::create(size); + auto & new_data = new_positions->getData(); - for (size_t i = 0; i < size; ++i) - new_data[i] = data[i]; + /// TODO: Optimize with SSE? + for (size_t i = 0; i < size; ++i) + new_data[i] = data[i]; - indexes = std::move(new_index_col); - } - - return true; + positions = std::move(new_positions); + size_of_type = sizeof(IndexType); } - return false; }; - if (!convert(UInt8()) && - !convert(UInt16()) && - !convert(UInt32()) && - !convert(UInt64())) - throw Exception("Unexpected indexes type for ColumnWithDictionary. Expected ColumnUInt, got " - + indexes->getName(), ErrorCodes::ILLEGAL_COLUMN); + callForType(std::move(convert), size_of_type); + + checkSizeOfType(); } -void ColumnWithDictionary::insertIndex(size_t value) +void ColumnWithDictionary::Index::expandType() { - auto current_index_type = getSizeOfCurrentIndexType(); - - auto insertForType = [&](auto x) + auto expand = [&](auto type) { - using IndexType = typeof(x); - if (value <= std::numeric_limits::max()) + using CurIndexType = decltype(type); + constexpr auto next_size = NumberTraits::nextSize(sizeof(CurIndexType)); + if (next_size == sizeof(CurIndexType)) + throw Exception("Can't expand indexes type for ColumnWithDictionary from type: " + + demangle(typeid(CurIndexType).name()), ErrorCodes::LOGICAL_ERROR); + + using NewIndexType = typename NumberTraits::Construct::Type; + convertPositions(); + }; + + callForType(std::move(expand), size_of_type); +} + +UInt64 ColumnWithDictionary::Index::getMaxPositionForCurrentType() const +{ + UInt64 value = 0; + callForType([&](auto type) { value = std::numeric_limits::max(); }, size_of_type); + return value; +} + +void ColumnWithDictionary::Index::insertPosition(UInt64 position) +{ + while (position > getMaxPositionForCurrentType()) + expandType(); + + positions->assumeMutableRef().insert(UInt64(position)); + checkSizeOfType(); +} + +void ColumnWithDictionary::Index::insertPositionsRange(const IColumn & column, size_t offset, size_t limit) +{ + auto insertForType = [&](auto type) + { + using ColumnType = decltype(type); + const auto * column_ptr = typeid_cast *>(&column); + + if (!column_ptr) + return false; + + if (size_of_type < sizeof(ColumnType)) + convertPositions(); + + if (size_of_type == sizeof(ColumnType)) + positions->assumeMutableRef().insertRangeFrom(column, offset, limit); + else { - if (sizeof(IndexType) > current_index_type) - convertIndexes(); + auto copy = [&](auto cur_type) + { + using CurIndexType = decltype(cur_type); + auto & positions_data = getPositionsData(); + const auto & column_data = column_ptr->getData(); - getIndexes()->insert(UInt64(value)); + size_t size = positions_data.size(); + positions_data.resize(size + limit); - return true; + for (size_t i = 0; i < limit; ++i) + positions_data[size + i] = column_data[offset + i]; + }; + + callForType(std::move(copy), size_of_type); } - return false; + + return true; }; if (!insertForType(UInt8()) && !insertForType(UInt16()) && !insertForType(UInt32()) && !insertForType(UInt64())) - throw Exception("Unexpected indexes type for ColumnWithDictionary.", ErrorCodes::ILLEGAL_COLUMN); + throw Exception("Invalid column for ColumnWithDictionary index. Expected UInt, got " + column.getName(), + ErrorCodes::ILLEGAL_COLUMN); + + checkSizeOfType(); } -void ColumnWithDictionary::insertIndexesRange(const ColumnPtr & column) +void ColumnWithDictionary::Index::check(size_t max_dictionary_size) +{ + auto check = [&](auto cur_type) + { + using CurIndexType = decltype(cur_type); + auto & positions_data = getPositionsData(); + + for (size_t i = 0; i < positions_data.size(); ++i) + { + if (positions_data[i] >= max_dictionary_size) + { + throw Exception("Found index " + toString(positions_data[i]) + " at position " + toString(i) + + " which is grated or equal than dictionary size " + toString(max_dictionary_size), + ErrorCodes::LOGICAL_ERROR); + } + } + }; + + callForType(std::move(check), size_of_type); +} + +void ColumnWithDictionary::Index::checkSizeOfType() +{ + if (size_of_type != getSizeOfIndexType(*positions, size_of_type)) + throw Exception("Invalid size of type. Expected " + toString(8 * size_of_type) + + ", but positions are " + positions->getName(), ErrorCodes::LOGICAL_ERROR); +} + + +ColumnWithDictionary::Dictionary::Dictionary(MutableColumnPtr && column_unique_) + : column_unique(std::move(column_unique_)) +{ + checkColumn(*column_unique); +} +ColumnWithDictionary::Dictionary::Dictionary(ColumnPtr column_unique_) + : column_unique(std::move(column_unique_)) +{ + checkColumn(*column_unique); +} + +void ColumnWithDictionary::Dictionary::checkColumn(const IColumn & column) { + if (!dynamic_cast(&column)) + throw Exception("ColumnUnique expected as an argument of ColumnWithDictionary.", ErrorCodes::ILLEGAL_COLUMN); +} + +void ColumnWithDictionary::Dictionary::setShared(const ColumnPtr & dictionary) +{ + checkColumn(*dictionary); + + column_unique = dictionary; + shared = true; +} + +void ColumnWithDictionary::Dictionary::compact(ColumnPtr & positions) +{ + auto new_column_unique = column_unique->cloneEmpty(); + + auto & unique = getColumnUnique(); + auto & new_unique = static_cast(*new_column_unique); + + auto indexes = mapUniqueIndex(positions->assumeMutableRef()); + auto sub_keys = unique.getNestedColumn()->index(*indexes, 0); + auto new_indexes = new_unique.uniqueInsertRangeFrom(*sub_keys, 0, sub_keys->size()); + + positions = (*new_indexes->index(*positions, 0)).mutate(); + column_unique = std::move(new_column_unique); + + shared = false; } } diff --git a/dbms/src/Columns/ColumnWithDictionary.h b/dbms/src/Columns/ColumnWithDictionary.h index 935c4adf156..aaced565e14 100644 --- a/dbms/src/Columns/ColumnWithDictionary.h +++ b/dbms/src/Columns/ColumnWithDictionary.h @@ -18,7 +18,7 @@ class ColumnWithDictionary final : public COWPtrHelper; ColumnWithDictionary(MutableColumnPtr && column_unique, MutableColumnPtr && indexes); - ColumnWithDictionary(const ColumnWithDictionary & other); + ColumnWithDictionary(const ColumnWithDictionary & other) = default; public: /** Create immutable column using immutable arguments. This arguments may be shared with other columns. @@ -37,216 +37,203 @@ public: std::string getName() const override { return "ColumnWithDictionary"; } const char * getFamilyName() const override { return "ColumnWithDictionary"; } - ColumnPtr convertToFullColumn() const { return getUnique()->getNestedColumn()->index(*indexes, 0); } + ColumnPtr convertToFullColumn() const { return getDictionary().getNestedColumn()->index(getIndexes(), 0); } ColumnPtr convertToFullColumnIfWithDictionary() const override { return convertToFullColumn(); } MutableColumnPtr cloneResized(size_t size) const override; - size_t size() const override { return indexes->size(); } + size_t size() const override { return getIndexes().size(); } - Field operator[](size_t n) const override { return (*column_unique)[indexes->getUInt(n)]; } - void get(size_t n, Field & res) const override { column_unique->get(indexes->getUInt(n), res); } + Field operator[](size_t n) const override { return getDictionary()[getIndexes().getUInt(n)]; } + void get(size_t n, Field & res) const override { getDictionary().get(getIndexes().getUInt(n), res); } - StringRef getDataAt(size_t n) const override { return column_unique->getDataAt(indexes->getUInt(n)); } + StringRef getDataAt(size_t n) const override { return getDictionary().getDataAt(getIndexes().getUInt(n)); } StringRef getDataAtWithTerminatingZero(size_t n) const override { - return column_unique->getDataAtWithTerminatingZero(indexes->getUInt(n)); + return getDictionary().getDataAtWithTerminatingZero(getIndexes().getUInt(n)); } - UInt64 get64(size_t n) const override { return column_unique->get64(indexes->getUInt(n)); } - UInt64 getUInt(size_t n) const override { return column_unique->getUInt(indexes->getUInt(n)); } - Int64 getInt(size_t n) const override { return column_unique->getInt(indexes->getUInt(n)); } - bool isNullAt(size_t n) const override { return column_unique->isNullAt(indexes->getUInt(n)); } + UInt64 get64(size_t n) const override { return getDictionary().get64(getIndexes().getUInt(n)); } + UInt64 getUInt(size_t n) const override { return getDictionary().getUInt(getIndexes().getUInt(n)); } + Int64 getInt(size_t n) const override { return getDictionary().getInt(getIndexes().getUInt(n)); } + bool isNullAt(size_t n) const override { return getDictionary().isNullAt(getIndexes().getUInt(n)); } ColumnPtr cut(size_t start, size_t length) const override { - return ColumnWithDictionary::create(column_unique, indexes->cut(start, length)); + return ColumnWithDictionary::create(dictionary.getColumnUniquePtr(), getIndexes().cut(start, length)); } - void insert(const Field & x) override { getIndexes()->insert(Field(UInt64(getUnique()->uniqueInsert(x)))); } + void insert(const Field & x) override; + void insertDefault() override; - void insertFromFullColumn(const IColumn & src, size_t n) - { - getIndexes()->insert(getUnique()->uniqueInsertFrom(src, n)); - } - void insertFrom(const IColumn & src, size_t n) override - { - if (!typeid_cast(&src)) - throw Exception("Expected ColumnWithDictionary, got" + src.getName(), ErrorCodes::ILLEGAL_COLUMN); + void insertFrom(const IColumn & src, size_t n) override; + void insertFromFullColumn(const IColumn & src, size_t n); - auto & src_with_dict = static_cast(src); - size_t idx = src_with_dict.getIndexes()->getUInt(n); - insertFromFullColumn(*src_with_dict.getUnique()->getNestedColumn(), idx); - } + void insertRangeFrom(const IColumn & src, size_t start, size_t length) override; + void insertRangeFromFullColumn(const IColumn & src, size_t start, size_t length); + void insertRangeFromDictionaryEncodedColumn(const IColumn & keys, const IColumn & positions); - void insertRangeFromFullColumn(const IColumn & src, size_t start, size_t length) - { - auto inserted_indexes = getUnique()->uniqueInsertRangeFrom(src, start, length); - getIndexes()->insertRangeFrom(*inserted_indexes, 0, length); - } - void insertRangeFrom(const IColumn & src, size_t start, size_t length) override - { - if (!typeid_cast(&src)) - throw Exception("Expected ColumnWithDictionary, got" + src.getName(), ErrorCodes::ILLEGAL_COLUMN); + void insertData(const char * pos, size_t length) override; + void insertDataWithTerminatingZero(const char * pos, size_t length) override; - auto & src_with_dict = static_cast(src); - /// TODO: Support native insertion from other unique column. It will help to avoid null map creation. - auto src_nested = src_with_dict.getUnique()->getNestedColumn(); - auto inserted_idx = getUnique()->uniqueInsertRangeFrom(*src_nested, 0, src_nested->size()); - auto idx = inserted_idx->index(*src_with_dict.getIndexes()->cut(start, length), 0); - getIndexes()->insertRangeFrom(*idx, 0, length); - } - void insertData(const char * pos, size_t length) override - { - getIndexes()->insert(Field(UInt64(getUnique()->uniqueInsertData(pos, length)))); - } + void popBack(size_t n) override { idx.popBack(n); } - void insertDataWithTerminatingZero(const char * pos, size_t length) override - { - getIndexes()->insert(Field(UInt64(getUnique()->uniqueInsertDataWithTerminatingZero(pos, length)))); - } + StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; - void insertDefault() override - { - getIndexes()->insert(getUnique()->getDefaultValueIndex()); - } - - void popBack(size_t n) override { getIndexes()->popBack(n); } - - StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override - { - return getUnique()->serializeValueIntoArena(indexes->getUInt(n), arena, begin); - } - - const char * deserializeAndInsertFromArena(const char * pos) override - { - const char * new_pos; - getIndexes()->insert(getUnique()->uniqueDeserializeAndInsertFromArena(pos, new_pos)); - return new_pos; - } + const char * deserializeAndInsertFromArena(const char * pos) override; void updateHashWithValue(size_t n, SipHash & hash) const override { - return getUnique()->updateHashWithValue(indexes->getUInt(n), hash); + return getDictionary().updateHashWithValue(getIndexes().getUInt(n), hash); } ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override { - return ColumnWithDictionary::create(column_unique, indexes->filter(filt, result_size_hint)); + return ColumnWithDictionary::create(dictionary.getColumnUniquePtr(), getIndexes().filter(filt, result_size_hint)); } ColumnPtr permute(const Permutation & perm, size_t limit) const override { - return ColumnWithDictionary::create(column_unique, indexes->permute(perm, limit)); + return ColumnWithDictionary::create(dictionary.getColumnUniquePtr(), getIndexes().permute(perm, limit)); } ColumnPtr index(const IColumn & indexes_, size_t limit) const override { - return ColumnWithDictionary::create(column_unique, indexes->index(indexes_, limit)); + return ColumnWithDictionary::create(dictionary.getColumnUniquePtr(), getIndexes().index(indexes_, limit)); } - int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override - { - const auto & column_with_dictionary = static_cast(rhs); - size_t n_index = indexes->getUInt(n); - size_t m_index = column_with_dictionary.indexes->getUInt(m); - return getUnique()->compareAt(n_index, m_index, *column_with_dictionary.column_unique, nan_direction_hint); - } + int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override; - void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override - { - if (limit == 0) - limit = size(); - - size_t unique_limit = std::min(limit, getUnique()->size()); - Permutation unique_perm; - getUnique()->getNestedColumn()->getPermutation(reverse, unique_limit, nan_direction_hint, unique_perm); - - /// TODO: optimize with sse. - - /// Get indexes per row in column_unique. - std::vector> indexes_per_row(getUnique()->size()); - size_t indexes_size = indexes->size(); - for (size_t row = 0; row < indexes_size; ++row) - indexes_per_row[indexes->getUInt(row)].push_back(row); - - /// Replicate permutation. - size_t perm_size = std::min(indexes_size, limit); - res.resize(perm_size); - size_t perm_index = 0; - for (size_t row = 0; row < indexes_size && perm_index < perm_size; ++row) - { - const auto & row_indexes = indexes_per_row[unique_perm[row]]; - for (auto row_index : row_indexes) - { - res[perm_index] = row_index; - ++perm_index; - - if (perm_index == perm_size) - break; - } - } - } + void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override; ColumnPtr replicate(const Offsets & offsets) const override { - return ColumnWithDictionary::create(column_unique, indexes->replicate(offsets)); + return ColumnWithDictionary::create(dictionary.getColumnUniquePtr(), getIndexes().replicate(offsets)); } - std::vector scatter(ColumnIndex num_columns, const Selector & selector) const override - { - auto columns = indexes->scatter(num_columns, selector); - for (auto & column : columns) - { - auto unique_ptr = column_unique; - column = ColumnWithDictionary::create((*std::move(unique_ptr)).mutate(), std::move(column)); - } - - return columns; - } + std::vector scatter(ColumnIndex num_columns, const Selector & selector) const override; void gather(ColumnGathererStream & gatherer_stream) override ; - void getExtremes(Field & min, Field & max) const override { return column_unique->getExtremes(min, max); } + void getExtremes(Field & min, Field & max) const override { + return getDictionary().index(getIndexes(), 0)->getExtremes(min, max); /// TODO: optimize + } - void reserve(size_t n) override { getIndexes()->reserve(n); } + void reserve(size_t n) override { idx.reserve(n); } - size_t byteSize() const override { return indexes->byteSize() + column_unique->byteSize(); } - size_t allocatedBytes() const override { return indexes->allocatedBytes() + column_unique->allocatedBytes(); } + size_t byteSize() const override { return idx.getPositions()->byteSize() + getDictionary().byteSize(); } + size_t allocatedBytes() const override { return idx.getPositions()->allocatedBytes() + getDictionary().allocatedBytes(); } void forEachSubcolumn(ColumnCallback callback) override { - callback(column_unique); - callback(indexes); + callback(idx.getPositionsPtr()); + + /// Column doesn't own dictionary if it's shared. + if (!dictionary.isShared()) + callback(dictionary.getColumnUniquePtr()); } - bool valuesHaveFixedSize() const override { return column_unique->valuesHaveFixedSize(); } - bool isFixedAndContiguous() const override { return column_unique->isFixedAndContiguous(); } - size_t sizeOfValueIfFixed() const override { return column_unique->sizeOfValueIfFixed(); } - bool isNumeric() const override { return column_unique->isNumeric(); } + bool valuesHaveFixedSize() const override { return getDictionary().valuesHaveFixedSize(); } + bool isFixedAndContiguous() const override { return getDictionary().isFixedAndContiguous(); } + size_t sizeOfValueIfFixed() const override { return getDictionary().sizeOfValueIfFixed(); } + bool isNumeric() const override { return getDictionary().isNumeric(); } - IColumnUnique * getUnique() { return static_cast(column_unique->assumeMutable().get()); } - const IColumnUnique * getUnique() const { return static_cast(column_unique->assumeMutable().get()); } - ColumnPtr getUniquePtr() const { return column_unique; } + const IColumnUnique & getDictionary() const { return dictionary.getColumnUnique(); } + /// IColumnUnique & getUnique() { return static_cast(*column_unique->assumeMutable()); } + /// ColumnPtr getUniquePtr() const { return column_unique; } - IColumn * getIndexes() { return indexes->assumeMutable().get(); } - const IColumn * getIndexes() const { return indexes.get(); } - const ColumnPtr & getIndexesPtr() const { return indexes; } + /// IColumn & getIndexes() { return idx.getPositions()->assumeMutableRef(); } + const IColumn & getIndexes() const { return *idx.getPositions(); } + const ColumnPtr & getIndexesPtr() const { return idx.getPositions(); } - void setIndexes(MutableColumnPtr && indexes_) { indexes = std::move(indexes_); } - void setUnique(const ColumnPtr & unique) { column_unique = unique; } + ///void setIndexes(MutableColumnPtr && indexes_) { indexes = std::move(indexes_); } + + /// Set shared ColumnUnique for empty column with dictionary. + void setSharedDictionary(const ColumnPtr & column_unique); + + /// Create column new dictionary with only keys that are mentioned in index. + MutablePtr compact(); + + /// Cut + compact. + MutablePtr cutAndCompact(size_t start, size_t length) const; bool withDictionary() const override { return true; } + class Index + { + public: + Index(); + Index(const Index & other) = default; + explicit Index(MutableColumnPtr && positions); + explicit Index(ColumnPtr positions); + + const ColumnPtr & getPositions() const { return positions; } + ColumnPtr & getPositionsPtr() { return positions; } + void insertPosition(UInt64 position); + void insertPositionsRange(const IColumn & column, size_t offset, size_t limit); + + void popBack(size_t n) { positions->assumeMutableRef().popBack(n); } + void reserve(size_t n) { positions->assumeMutableRef().reserve(n); } + + UInt64 getMaxPositionForCurrentType() const; + + static size_t getSizeOfIndexType(const IColumn & column, size_t hint); + + void check(size_t max_dictionary_size); + void checkSizeOfType(); + + ColumnPtr detachPositions() { return std::move(positions); } + void attachPositions(ColumnPtr positions_); + + private: + ColumnPtr positions; + size_t size_of_type = 0; + + void updateSizeOfType() { size_of_type = getSizeOfIndexType(*positions, size_of_type); } + void expandType(); + + template + typename ColumnVector::Container & getPositionsData(); + + template + void convertPositions(); + + template + static void callForType(Callback && callback, size_t size_of_type); + }; + private: - ColumnPtr column_unique; - ColumnPtr indexes; + class Dictionary + { + public: + Dictionary(const Dictionary & other) = default; + explicit Dictionary(MutableColumnPtr && column_unique); + explicit Dictionary(ColumnPtr column_unique); - size_t getSizeOfCurrentIndexType() const; + const ColumnPtr & getColumnUniquePtr() const { return column_unique; } + ColumnPtr & getColumnUniquePtr() { return column_unique; } - template - void convertIndexes(); - void insertIndex(size_t value); - void insertIndexesRange(const ColumnPtr & column); + const IColumnUnique & getColumnUnique() const { return static_cast(*column_unique); } + IColumnUnique & getColumnUnique() { return static_cast(column_unique->assumeMutableRef()); } + /// Dictionary may be shared for several mutable columns. + /// Immutable columns may have the same column unique, which isn't necessarily shared dictionary. + void setShared(const ColumnPtr & dictionary); + bool isShared() const { return shared; } + + /// Create new dictionary with only keys that are mentioned in positions. + void compact(ColumnPtr & positions); + + private: + ColumnPtr column_unique; + bool shared = false; + + void checkColumn(const IColumn & column); + }; + + Dictionary dictionary; + Index idx; + + void compactInplace(); + void compactIfSharedDictionary(); }; diff --git a/dbms/src/Columns/IColumnUnique.h b/dbms/src/Columns/IColumnUnique.h index aedaf3fed1f..44d655f2aab 100644 --- a/dbms/src/Columns/IColumnUnique.h +++ b/dbms/src/Columns/IColumnUnique.h @@ -12,7 +12,7 @@ public: /// Column always contains Null if it's Nullable and empty string if it's String or Nullable(String). /// So, size may be greater than the number of inserted unique values. - virtual ColumnPtr getNestedColumn() const = 0; + virtual const ColumnPtr & getNestedColumn() const = 0; /// The same as getNestedColumn, but removes null map if nested column is nullable. virtual const ColumnPtr & getNestedNotNullableColumn() const = 0; @@ -50,21 +50,6 @@ public: virtual size_t uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) = 0; - /// Column which contains the set of necessary for serialization keys. Such that empty column after - /// uniqueInsertRangeFrom(column->cut(offset, limit), 0, limit) call will contain the same set of keys. - struct SerializableState - { - ColumnPtr column; - size_t offset; - size_t limit; - }; - - virtual SerializableState getSerializableState() const = 0; - -// virtual MutableColumnPtr getInsertionPoints(const ColumnPtr & keys) const = 0; -// -// virtual bool has(const char * pos, size_t length) const { return getInsertionPoint(pos, length) != size(); } - const char * getFamilyName() const override { return "ColumnUnique"; } void insert(const Field &) override diff --git a/dbms/src/DataTypes/DataTypeWithDictionary.cpp b/dbms/src/DataTypes/DataTypeWithDictionary.cpp index 178404a7368..5259718c9b0 100644 --- a/dbms/src/DataTypes/DataTypeWithDictionary.cpp +++ b/dbms/src/DataTypes/DataTypeWithDictionary.cpp @@ -34,13 +34,9 @@ namespace } } -DataTypeWithDictionary::DataTypeWithDictionary(DataTypePtr dictionary_type_, DataTypePtr indexes_type_) - : dictionary_type(std::move(dictionary_type_)), indexes_type(std::move(indexes_type_)) +DataTypeWithDictionary::DataTypeWithDictionary(DataTypePtr dictionary_type_) + : dictionary_type(std::move(dictionary_type_)) { - if (!indexes_type->isUnsignedInteger()) - throw Exception("Index type of DataTypeWithDictionary must be unsigned integer, but got " - + indexes_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - auto inner_type = dictionary_type; if (dictionary_type->isNullable()) inner_type = static_cast(*dictionary_type).getNestedType(); @@ -57,7 +53,7 @@ void DataTypeWithDictionary::enumerateStreams(const StreamCallback & callback, S path.push_back(Substream::DictionaryKeys); dictionary_type->enumerateStreams(callback, path); path.back() = Substream::DictionaryIndexes; - indexes_type->enumerateStreams(callback, path); + callback(path); path.pop_back(); } @@ -147,20 +143,20 @@ struct IndexesSerializationType type = static_cast(resetFlags(val)); } - IndexesSerializationType(const IDataType & data_type, bool has_additional_keys, bool need_global_dictionary) + IndexesSerializationType(const IColumn & column, bool has_additional_keys, bool need_global_dictionary) : has_additional_keys(has_additional_keys), need_global_dictionary(need_global_dictionary) { - if (typeid_cast(&data_type)) + if (typeid_cast(&column)) type = TUInt8; - else if (typeid_cast(&data_type)) + else if (typeid_cast(&column)) type = TUInt16; - else if (typeid_cast(&data_type)) + else if (typeid_cast(&column)) type = TUInt32; - else if (typeid_cast(&data_type)) + else if (typeid_cast(&column)) type = TUInt64; else - throw Exception("Invalid DataType for IndexesSerializationType. Expected UInt*, got " + data_type.getName(), - ErrorCodes::LOGICAL_ERROR); + throw Exception("Invalid Indexes column for IndexesSerializationType. Expected ColumnUInt*, got " + + column.getName(), ErrorCodes::LOGICAL_ERROR); } DataTypePtr getDataType() const @@ -196,10 +192,9 @@ struct DeserializeStateWithDictionary : public IDataType::DeserializeBinaryBulkS { KeysSerializationVersion key_version; ColumnUniquePtr global_dictionary; - UInt64 num_bytes_in_dictionary; IndexesSerializationType index_type; - MutableColumnPtr additional_keys; + ColumnPtr additional_keys; UInt64 num_pending_rows = 0; explicit DeserializeStateWithDictionary(UInt64 key_version) : key_version(key_version) {} @@ -252,7 +247,7 @@ void DataTypeWithDictionary::serializeBinaryBulkStatePrefix( writeIntBinary(key_version, *stream); - auto column_unique = createColumnUnique(*dictionary_type, *indexes_type); + auto column_unique = createColumnUnique(*dictionary_type); state = std::make_shared(key_version, std::move(column_unique)); } @@ -263,24 +258,21 @@ void DataTypeWithDictionary::serializeBinaryBulkStateSuffix( auto * state_with_dictionary = checkAndGetWithDictionarySerializeState(state); KeysSerializationVersion::checkVersion(state_with_dictionary->key_version.value); - if (state_with_dictionary->global_dictionary) + if (state_with_dictionary->global_dictionary && settings.max_dictionary_size) { - auto unique_state = state_with_dictionary->global_dictionary->getSerializableState(); - UInt64 num_keys = unique_state.limit; - if (settings.max_dictionary_size) - { - settings.path.push_back(Substream::DictionaryKeys); - auto * stream = settings.getter(settings.path); - settings.path.pop_back(); + auto nested_column = state_with_dictionary->global_dictionary->getNestedNotNullableColumn(); - if (!stream) - throw Exception("Got empty stream in DataTypeWithDictionary::serializeBinaryBulkStateSuffix", - ErrorCodes::LOGICAL_ERROR); + settings.path.push_back(Substream::DictionaryKeys); + auto * stream = settings.getter(settings.path); + settings.path.pop_back(); - writeIntBinary(num_keys, *stream); - removeNullable(dictionary_type)->serializeBinaryBulk(*unique_state.column, *stream, - unique_state.offset, unique_state.limit); - } + if (!stream) + throw Exception("Got empty stream in DataTypeWithDictionary::serializeBinaryBulkStateSuffix", + ErrorCodes::LOGICAL_ERROR); + + UInt64 num_keys = nested_column->size(); + writeIntBinary(num_keys, *stream); + removeNullable(dictionary_type)->serializeBinaryBulk(*nested_column, *stream, 0, num_keys); } } @@ -314,79 +306,76 @@ namespace return nullptr; } - template - MutableColumnPtr mapUniqueIndexImpl(PaddedPODArray & index) + struct IndexMapsWithAdditionalKeys { - HashMap hash_map; - for (auto val : index) - hash_map.insert({val, hash_map.size()}); - - auto res_col = ColumnVector::create(); - auto & data = res_col->getData(); - - data.resize(hash_map.size()); - for (auto val : hash_map) - data[val.second] = val.first; - - for (auto & ind : index) - ind = hash_map[ind]; - - return std::move(res_col); - } - - /// Returns unique values of column. Write new index to column. - MutableColumnPtr mapUniqueIndex(IColumn & column) - { - if (auto * data_uint8 = getIndexesData(column)) - return mapUniqueIndexImpl(*data_uint8); - else if (auto * data_uint16 = getIndexesData(column)) - return mapUniqueIndexImpl(*data_uint16); - else if (auto * data_uint32 = getIndexesData(column)) - return mapUniqueIndexImpl(*data_uint32); - else if (auto * data_uint64 = getIndexesData(column)) - return mapUniqueIndexImpl(*data_uint64); - else - throw Exception("Indexes column for getUniqueIndex must be ColumnUInt, got" + column.getName(), - ErrorCodes::LOGICAL_ERROR); - } + MutableColumnPtr dictionary_map; + MutableColumnPtr additional_keys_map; + }; template - MutableColumnPtr mapIndexWithOverflow(PaddedPODArray & index, size_t max_val) + IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeys(PaddedPODArray & index, size_t dict_size) { - HashMap hash_map; + PaddedPODArray copy(index.cbegin(), index.cend()); + + HashMap dict_map; + HashMap add_keys_map; for (auto val : index) { - if (val < max_val) - hash_map.insert({val, hash_map.size()}); + if (val < dict_size) + dict_map.insert({val, dict_map.size()}); + else + add_keys_map.insert({val, add_keys_map.size()}); } - auto index_map_col = ColumnVector::create(); - auto & index_data = index_map_col->getData(); + auto dictionary_map = ColumnVector::create(dict_map.size()); + auto additional_keys_map = ColumnVector::create(add_keys_map.size()); + auto & dict_data = dictionary_map->getData(); + auto & add_keys_data = additional_keys_map->getData(); - index_data.resize(hash_map.size()); - for (auto val : hash_map) - index_data[val.second] = val.first; + for (auto val : dict_map) + dict_data[val.second] = val.first; + + for (auto val : add_keys_map) + add_keys_data[val.second] = val.first - dict_size; for (auto & val : index) - val = val < max_val ? hash_map[val] - : val - max_val + hash_map.size(); + val = val < dict_size ? dict_map[val] + : add_keys_map[val] + dict_map.size(); - return index_map_col; + for (size_t i = 0; i < index.size(); ++i) + { + T expected = index[i] < dict_data.size() ? dict_data[index[i]] + : add_keys_data[index[i] - dict_data.size()] + dict_size; + if (expected != copy[i]) + throw Exception("Expected " + toString(expected) + ", but got " + toString(copy[i]), ErrorCodes::LOGICAL_ERROR); + + } + + return {std::move(dictionary_map), std::move(additional_keys_map)}; } - MutableColumnPtr mapIndexWithOverflow(IColumn & column, size_t max_size) + /// Update column and return map with old indexes. + /// Let N is the number of distinct values which are less than max_size; + /// old_column - column before function call; + /// new_column - column after function call; + /// map - function result (map.size() is N): + /// * if old_column[i] < max_size, than + /// map[new_column[i]] = old_column[i] + /// * else + /// new_column[i] = old_column[i] - max_size + N + IndexMapsWithAdditionalKeys mapIndexWithAdditionalKeys(IColumn & column, size_t dict_size) { if (auto * data_uint8 = getIndexesData(column)) - return mapIndexWithOverflow(*data_uint8, max_size); + return mapIndexWithAdditionalKeys(*data_uint8, dict_size); else if (auto * data_uint16 = getIndexesData(column)) - return mapIndexWithOverflow(*data_uint16, max_size); + return mapIndexWithAdditionalKeys(*data_uint16, dict_size); else if (auto * data_uint32 = getIndexesData(column)) - return mapIndexWithOverflow(*data_uint32, max_size); + return mapIndexWithAdditionalKeys(*data_uint32, dict_size); else if (auto * data_uint64 = getIndexesData(column)) - return mapIndexWithOverflow(*data_uint64, max_size); + return mapIndexWithAdditionalKeys(*data_uint64, dict_size); else - throw Exception("Indexes column for makeIndexWithOverflow must be ColumnUInt, got" + column.getName(), + throw Exception("Indexes column for mapIndexWithAdditionalKeys must be UInt, got" + column.getName(), ErrorCodes::LOGICAL_ERROR); } } @@ -419,57 +408,65 @@ void DataTypeWithDictionary::serializeBinaryBulkWithMultipleStreams( auto & global_dictionary = state_with_dictionary->global_dictionary; KeysSerializationVersion::checkVersion(state_with_dictionary->key_version.value); - auto unique_state = global_dictionary->getSerializableState(); - bool was_global_dictionary_written = unique_state.limit >= settings.max_dictionary_size; - - const auto & indexes = column_with_dictionary.getIndexesPtr(); - const auto & keys = column_with_dictionary.getUnique()->getSerializableState().column; - size_t max_limit = column.size() - offset; limit = limit ? std::min(limit, max_limit) : max_limit; - /// Create pair (used_keys, sub_index) which is the dictionary for [offset, offset + limit) range. - MutableColumnPtr sub_index = (*indexes->cut(offset, limit)).mutate(); - auto unique_indexes = mapUniqueIndex(*sub_index); - /// unique_indexes->index(*sub_index) == indexes[offset:offset + limit] - MutableColumnPtr used_keys = (*keys->index(*unique_indexes, 0)).mutate(); + auto sub_column = column_with_dictionary.cutAndCompact(offset, limit); + ColumnPtr positions = sub_column->getIndexesPtr(); + ColumnPtr keys = sub_column->getDictionary().getNestedColumn(); if (settings.max_dictionary_size) { /// Insert used_keys into global dictionary and update sub_index. - auto indexes_with_overflow = global_dictionary->uniqueInsertRangeWithOverflow(*used_keys, 0, used_keys->size(), + auto indexes_with_overflow = global_dictionary->uniqueInsertRangeWithOverflow(*keys, 0, keys->size(), settings.max_dictionary_size); - sub_index = (*indexes_with_overflow.indexes->index(*sub_index, 0)).mutate(); - used_keys = std::move(indexes_with_overflow.overflowed_keys); + size_t max_size = settings.max_dictionary_size + indexes_with_overflow.overflowed_keys->size(); + ColumnWithDictionary::Index(indexes_with_overflow.indexes->getPtr()).check(max_size); + + if (global_dictionary->size() > settings.max_dictionary_size) + throw Exception("Got dictionary with size " + toString(global_dictionary->size()) + + " but max dictionary size is " + toString(settings.max_dictionary_size), + ErrorCodes::LOGICAL_ERROR); + + positions = indexes_with_overflow.indexes->index(*positions, 0); + keys = std::move(indexes_with_overflow.overflowed_keys); + + if (global_dictionary->size() < settings.max_dictionary_size && !keys->empty()) + throw Exception("Has additional keys, but dict size is " + toString(global_dictionary->size()) + + " which is less then max dictionary size (" + toString(settings.max_dictionary_size) + ")", + ErrorCodes::LOGICAL_ERROR); } - bool need_additional_keys = !used_keys->empty(); + if (auto nullable_keys = typeid_cast(keys.get())) + keys = nullable_keys->getNestedColumnPtr(); + + bool need_additional_keys = !keys->empty(); bool need_dictionary = settings.max_dictionary_size != 0; - bool need_write_dictionary = !was_global_dictionary_written && unique_state.limit >= settings.max_dictionary_size; + bool need_write_dictionary = !settings.use_single_dictionary_for_part + && global_dictionary->size() >= settings.max_dictionary_size; - IndexesSerializationType index_version(*indexes_type, need_additional_keys, need_dictionary); + IndexesSerializationType index_version(*positions, need_additional_keys, need_dictionary); index_version.serialize(*indexes_stream); - unique_state = global_dictionary->getSerializableState(); - if (need_write_dictionary) { - /// Write global dictionary if it wasn't written and has too many keys. - UInt64 num_keys = unique_state.limit; + const auto & nested_column = global_dictionary->getNestedNotNullableColumn(); + UInt64 num_keys = nested_column->size(); writeIntBinary(num_keys, *keys_stream); - removeNullable(dictionary_type)->serializeBinaryBulk(*unique_state.column, *keys_stream, unique_state.offset, num_keys); + removeNullable(dictionary_type)->serializeBinaryBulk(*nested_column, *keys_stream, 0, num_keys); + state_with_dictionary->global_dictionary = createColumnUnique(*dictionary_type); } if (need_additional_keys) { - UInt64 num_keys = used_keys->size(); + UInt64 num_keys = keys->size(); writeIntBinary(num_keys, *indexes_stream); - removeNullable(dictionary_type)->serializeBinaryBulk(*used_keys, *indexes_stream, 0, num_keys); + removeNullable(dictionary_type)->serializeBinaryBulk(*keys, *indexes_stream, 0, num_keys); } - UInt64 num_rows = sub_index->size(); + UInt64 num_rows = positions->size(); writeIntBinary(num_rows, *indexes_stream); - indexes_type->serializeBinaryBulk(*sub_index, *indexes_stream, 0, num_rows); + index_version.getDataType()->serializeBinaryBulk(*positions, *indexes_stream, 0, num_rows); } void DataTypeWithDictionary::deserializeBinaryBulkWithMultipleStreams( @@ -507,8 +504,7 @@ void DataTypeWithDictionary::deserializeBinaryBulkWithMultipleStreams( auto global_dict_keys = keys_type->createColumn(); keys_type->deserializeBinaryBulk(*global_dict_keys, *keys_stream, num_keys, 0); - auto column_unique = createColumnUnique(*dictionary_type, *indexes_type); - column_unique->uniqueInsertRangeFrom(*global_dict_keys, 0, num_keys); + auto column_unique = createColumnUnique(*dictionary_type, std::move(global_dict_keys)); state_with_dictionary->global_dictionary = std::move(column_unique); }; @@ -517,61 +513,60 @@ void DataTypeWithDictionary::deserializeBinaryBulkWithMultipleStreams( UInt64 num_keys; readIntBinary(num_keys, *indexes_stream); auto keys_type = removeNullable(dictionary_type); - state_with_dictionary->additional_keys = keys_type->createColumn(); - keys_type->deserializeBinaryBulk(*state_with_dictionary->additional_keys, *indexes_stream, num_keys, 0); + auto additional_keys = keys_type->createColumn(); + keys_type->deserializeBinaryBulk(*additional_keys, *indexes_stream, num_keys, 0); + state_with_dictionary->additional_keys = std::move(additional_keys); }; - auto readIndexes = [this, state_with_dictionary, indexes_stream, &column_with_dictionary](UInt64 num_rows, - bool need_dictionary) + auto readIndexes = [this, state_with_dictionary, indexes_stream, &column_with_dictionary](UInt64 num_rows) { + auto indexes_type = state_with_dictionary->index_type.getDataType(); MutableColumnPtr indexes_column = indexes_type->createColumn(); indexes_type->deserializeBinaryBulk(*indexes_column, *indexes_stream, num_rows, 0); auto & global_dictionary = state_with_dictionary->global_dictionary; const auto & additional_keys = state_with_dictionary->additional_keys; - auto * column_unique = column_with_dictionary.getUnique(); - bool has_additional_keys = state_with_dictionary->additional_keys != nullptr; + bool has_additional_keys = state_with_dictionary->index_type.has_additional_keys; bool column_is_empty = column_with_dictionary.empty(); - bool column_with_global_dictionary = column_unique == global_dictionary.get(); - if (!has_additional_keys && (column_is_empty || column_with_global_dictionary)) + if (!state_with_dictionary->index_type.need_global_dictionary) + { + column_with_dictionary.insertRangeFromDictionaryEncodedColumn(*additional_keys, *indexes_column); + } + else if (!has_additional_keys) { if (column_is_empty) - column_with_dictionary.setUnique(global_dictionary); + column_with_dictionary.setSharedDictionary(global_dictionary); - column_with_dictionary.getIndexes()->insertRangeFrom(*indexes_column, 0, num_rows); - } - else if (!need_dictionary) - { - auto indexes = column_unique->uniqueInsertRangeFrom(*additional_keys, 0, additional_keys->size()); - column_with_dictionary.getIndexes()->insertRangeFrom(*indexes->index(*indexes_column, 0), 0, num_rows); + auto local_column = ColumnWithDictionary::create(global_dictionary, std::move(indexes_column)); + column_with_dictionary.insertRangeFrom(*local_column, 0, num_rows); } else { - if (column_with_global_dictionary) + auto maps = mapIndexWithAdditionalKeys(*indexes_column, global_dictionary->size()); + + ColumnWithDictionary::Index(maps.additional_keys_map->getPtr()).check(additional_keys->size()); + + ColumnWithDictionary::Index(indexes_column->getPtr()).check( + maps.dictionary_map->size() + maps.additional_keys_map->size()); + + auto used_keys = (*std::move(global_dictionary->getNestedColumn()->index(*maps.dictionary_map, 0))).mutate(); + + if (!maps.additional_keys_map->empty()) { - auto unique_indexes = mapUniqueIndex(*column_with_dictionary.getIndexes()); - auto sub_keys = column_with_dictionary.getUnique()->getNestedColumn()->index(*unique_indexes, 0); - auto new_unique = createColumnUnique(*dictionary_type, *indexes_type); - auto new_idx = new_unique->uniqueInsertRangeFrom(*sub_keys, 0, sub_keys->size()); - column_with_dictionary.setUnique(std::move(new_unique)); - column_with_dictionary.setIndexes((*(new_idx->index(*column_with_dictionary.getIndexes(), 0))).mutate()); - column_unique = column_with_dictionary.getUnique(); + auto used_add_keys = additional_keys->index(*maps.additional_keys_map, 0); + + if (dictionary_type->isNullable()) + { + ColumnPtr null_map = ColumnUInt8::create(used_add_keys->size(), 0); + used_add_keys = ColumnNullable::create(used_add_keys, null_map); + } + + used_keys->insertRangeFrom(*used_add_keys, 0, used_add_keys->size()); } - auto index_map = mapIndexWithOverflow(*indexes_column, global_dictionary->size()); - auto used_keys = global_dictionary->getNestedColumn()->index(*index_map, 0); - auto indexes = column_unique->uniqueInsertRangeFrom(*used_keys, 0, used_keys->size()); - - if (additional_keys) - { - size_t num_keys = additional_keys->size(); - auto additional_indexes = column_unique->uniqueInsertRangeFrom(*additional_keys, 0, num_keys); - indexes->insertRangeFrom(*additional_indexes, 0, num_keys); - } - - column_with_dictionary.getIndexes()->insertRangeFrom(*indexes->index(*indexes_column, 0), 0, num_rows); + column_with_dictionary.insertRangeFromDictionaryEncodedColumn(*used_keys, *indexes_column); } }; @@ -596,7 +591,7 @@ void DataTypeWithDictionary::deserializeBinaryBulkWithMultipleStreams( } size_t num_rows_to_read = std::min(limit, state_with_dictionary->num_pending_rows); - readIndexes(num_rows_to_read, state_with_dictionary->index_type.need_global_dictionary); + readIndexes(num_rows_to_read); limit -= num_rows_to_read; state_with_dictionary->num_pending_rows -= num_rows_to_read; } @@ -617,8 +612,8 @@ void DataTypeWithDictionary::serializeImpl( DataTypeWithDictionary::SerealizeFunctionPtr func, Args & ... args) const { auto & column_with_dictionary = getColumnWithDictionary(column); - size_t unique_row_number = column_with_dictionary.getIndexes()->getUInt(row_num); - (dictionary_type.get()->*func)(*column_with_dictionary.getUnique()->getNestedColumn(), unique_row_number, ostr, std::forward(args)...); + size_t unique_row_number = column_with_dictionary.getIndexes().getUInt(row_num); + (dictionary_type.get()->*func)(*column_with_dictionary.getDictionary().getNestedColumn(), unique_row_number, ostr, std::forward(args)...); } template @@ -627,77 +622,56 @@ void DataTypeWithDictionary::deserializeImpl( DataTypeWithDictionary::DeserealizeFunctionPtr func, Args & ... args) const { auto & column_with_dictionary = getColumnWithDictionary(column); - auto temp_column = column_with_dictionary.getUnique()->cloneEmpty(); + auto temp_column = column_with_dictionary.getDictionary().cloneEmpty(); (dictionary_type.get()->*func)(*temp_column, istr, std::forward(args)...); column_with_dictionary.insertFromFullColumn(*temp_column, 0); } -template -MutableColumnUniquePtr DataTypeWithDictionary::createColumnUniqueImpl(const IDataType & keys_type) +namespace { - return ColumnUnique::create(keys_type); + template + struct CreateColumnVector + { + MutableColumnUniquePtr & column; + const IDataType & keys_type; + const Creator & creator; + + CreateColumnVector(MutableColumnUniquePtr & column, const IDataType & keys_type, const Creator & creator) + : column(column), keys_type(keys_type), creator(creator) + { + } + + template + void operator()() + { + if (typeid_cast *>(&keys_type)) + column = creator((ColumnVector *)(nullptr)); + } + }; } -template +template MutableColumnUniquePtr DataTypeWithDictionary::createColumnUniqueImpl(const IDataType & keys_type, - const IDataType & indexes_type) -{ - if (typeid_cast(&indexes_type)) - return createColumnUniqueImpl(keys_type); - if (typeid_cast(&indexes_type)) - return createColumnUniqueImpl(keys_type); - if (typeid_cast(&indexes_type)) - return createColumnUniqueImpl(keys_type); - if (typeid_cast(&indexes_type)) - return createColumnUniqueImpl(keys_type); - - throw Exception("The type of indexes must be unsigned integer, but got " + indexes_type.getName(), - ErrorCodes::LOGICAL_ERROR); -} - -struct CreateColumnVector -{ - MutableColumnUniquePtr & column; - const IDataType & keys_type; - const IDataType & indexes_type; - const IDataType * nested_type; - - CreateColumnVector(MutableColumnUniquePtr & column, const IDataType & keys_type, const IDataType & indexes_type) - : column(column), keys_type(keys_type), indexes_type(indexes_type), nested_type(&keys_type) - { - if (auto nullable_type = typeid_cast(&keys_type)) - nested_type = nullable_type->getNestedType().get(); - } - - template - void operator()() - { - if (typeid_cast *>(nested_type)) - column = DataTypeWithDictionary::createColumnUniqueImpl>(keys_type, indexes_type); - } -}; - -MutableColumnUniquePtr DataTypeWithDictionary::createColumnUnique(const IDataType & keys_type, - const IDataType & indexes_type) + const Creator & creator) { auto * type = &keys_type; - if (type->isNullable()) - type = static_cast(keys_type).getNestedType().get(); + if (auto * nullable_type = typeid_cast(&keys_type)) + type = nullable_type->getNestedType().get(); if (type->isString()) - return createColumnUniqueImpl(keys_type, indexes_type); + return creator((ColumnString *)(nullptr)); if (type->isFixedString()) - return createColumnUniqueImpl(keys_type, indexes_type); + return creator((ColumnFixedString *)(nullptr)); if (typeid_cast(type)) - return createColumnUniqueImpl>(keys_type, indexes_type); + return creator((ColumnVector *)(nullptr)); if (typeid_cast(type)) - return createColumnUniqueImpl>(keys_type, indexes_type); + return creator((ColumnVector *)(nullptr)); if (type->isNumber()) { MutableColumnUniquePtr column; - TypeListNumbers::forEach(CreateColumnVector(column, keys_type, indexes_type)); + TypeListNumbers::forEach(CreateColumnVector(column, *type, creator)); if (!column) throw Exception("Unexpected numeric type: " + type->getName(), ErrorCodes::LOGICAL_ERROR); @@ -709,10 +683,31 @@ MutableColumnUniquePtr DataTypeWithDictionary::createColumnUnique(const IDataTyp ErrorCodes::LOGICAL_ERROR); } + +MutableColumnUniquePtr DataTypeWithDictionary::createColumnUnique(const IDataType & keys_type) +{ + auto creator = [&](auto x) + { + using ColumnType = typename std::remove_pointer::type; + return ColumnUnique::create(keys_type); + }; + return createColumnUniqueImpl(keys_type, creator); +} + +MutableColumnUniquePtr DataTypeWithDictionary::createColumnUnique(const IDataType & keys_type, MutableColumnPtr && keys) +{ + auto creator = [&](auto x) + { + using ColumnType = typename std::remove_pointer::type; + return ColumnUnique::create(std::move(keys), keys_type.isNullable()); + }; + return createColumnUniqueImpl(keys_type, creator); +} + MutableColumnPtr DataTypeWithDictionary::createColumn() const { - MutableColumnPtr indexes = indexes_type->createColumn(); - MutableColumnPtr dictionary = createColumnUnique(*dictionary_type, *indexes_type); + MutableColumnPtr indexes = DataTypeUInt8().createColumn(); + MutableColumnPtr dictionary = createColumnUnique(*dictionary_type); return ColumnWithDictionary::create(std::move(dictionary), std::move(indexes)); } @@ -722,20 +717,17 @@ bool DataTypeWithDictionary::equals(const IDataType & rhs) const return false; auto & rhs_with_dictionary = static_cast(rhs); - return dictionary_type->equals(*rhs_with_dictionary.dictionary_type) - && indexes_type->equals(*rhs_with_dictionary.indexes_type); + return dictionary_type->equals(*rhs_with_dictionary.dictionary_type); } - static DataTypePtr create(const ASTPtr & arguments) { - if (!arguments || arguments->children.size() != 2) - throw Exception("WithDictionary data type family must have two arguments - type of elements and type of indices" - , ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + if (!arguments || arguments->children.size() != 1) + throw Exception("WithDictionary data type family must have single argument - type of elements", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - return std::make_shared(DataTypeFactory::instance().get(arguments->children[0]), - DataTypeFactory::instance().get(arguments->children[1])); + return std::make_shared(DataTypeFactory::instance().get(arguments->children[0])); } void registerDataTypeWithDictionary(DataTypeFactory & factory) diff --git a/dbms/src/DataTypes/DataTypeWithDictionary.h b/dbms/src/DataTypes/DataTypeWithDictionary.h index fe6848d389e..4fa7bc80ea0 100644 --- a/dbms/src/DataTypes/DataTypeWithDictionary.h +++ b/dbms/src/DataTypes/DataTypeWithDictionary.h @@ -9,17 +9,15 @@ class DataTypeWithDictionary : public IDataType { private: DataTypePtr dictionary_type; - DataTypePtr indexes_type; public: - DataTypeWithDictionary(DataTypePtr dictionary_type_, DataTypePtr indexes_type_); + DataTypeWithDictionary(DataTypePtr dictionary_type_); const DataTypePtr & getDictionaryType() const { return dictionary_type; } - const DataTypePtr & getIndexesType() const { return indexes_type; } String getName() const override { - return "WithDictionary(" + dictionary_type->getName() + ", " + indexes_type->getName() + ")"; + return "WithDictionary(" + dictionary_type->getName() + ")"; } const char * getFamilyName() const override { return "WithDictionary"; } @@ -146,7 +144,8 @@ public: bool onlyNull() const override { return false; } bool withDictionary() const override { return true; } - static MutableColumnUniquePtr createColumnUnique(const IDataType & keys_type, const IDataType & indexes_type); + static MutableColumnUniquePtr createColumnUnique(const IDataType & keys_type); + static MutableColumnUniquePtr createColumnUnique(const IDataType & keys_type, MutableColumnPtr && keys); private: @@ -164,14 +163,8 @@ private: void deserializeImpl(IColumn & column, ReadBuffer & istr, DeserealizeFunctionPtr func, Args & ... args) const; - template - static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type); - - template - static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type, const IDataType & indexes_type); - - - friend struct CreateColumnVector; + template + static MutableColumnUniquePtr createColumnUniqueImpl(const IDataType & keys_type, const Creator & creator); }; } diff --git a/dbms/src/DataTypes/IDataType.h b/dbms/src/DataTypes/IDataType.h index 5608f72f9e3..b180dd9365b 100644 --- a/dbms/src/DataTypes/IDataType.h +++ b/dbms/src/DataTypes/IDataType.h @@ -121,8 +121,10 @@ public: OutputStreamGetter getter; SubstreamPath path; - bool position_independent_encoding = true; size_t max_dictionary_size = 0; + bool use_single_dictionary_for_part = true; + + bool position_independent_encoding = true; }; struct DeserializeBinaryBulkSettings diff --git a/dbms/src/Functions/FunctionsConversion.h b/dbms/src/Functions/FunctionsConversion.h index b10fea32d8d..a6a2862ca8f 100644 --- a/dbms/src/Functions/FunctionsConversion.h +++ b/dbms/src/Functions/FunctionsConversion.h @@ -1576,7 +1576,7 @@ private: if (from_with_dict) { auto * col_with_dict = typeid_cast(prev_arg_col.get()); - arg.column = col_with_dict->getUnique()->getNestedColumn(); + arg.column = col_with_dict->getDictionary().getNestedColumn(); arg.type = from_with_dict->getDictionaryType(); tmp_rows_count = arg.column->size(); @@ -1602,9 +1602,7 @@ private: if (from_with_dict) { auto res_keys = std::move(res.column); - - auto idx = col_with_dict->getUnique()->uniqueInsertRangeFrom(*res_keys, 0, res_keys->size()); - col_with_dict->getIndexes()->insertRangeFrom(*idx->index(*res_indexes, 0), 0, res_indexes->size()); + col_with_dict->insertRangeFromDictionaryEncodedColumn(*res_keys, *res_indexes); } else col_with_dict->insertRangeFromFullColumn(*res.column, 0, res.column->size()); diff --git a/dbms/src/Functions/FunctionsMiscellaneous.cpp b/dbms/src/Functions/FunctionsMiscellaneous.cpp index b2054190327..ebd7e3aa3db 100644 --- a/dbms/src/Functions/FunctionsMiscellaneous.cpp +++ b/dbms/src/Functions/FunctionsMiscellaneous.cpp @@ -1836,35 +1836,11 @@ public: }; -template -struct FunctionMakeDictionaryName; -template <> -struct FunctionMakeDictionaryName -{ - static constexpr auto name = "makeDictionaryUInt8"; -}; -template <> -struct FunctionMakeDictionaryName -{ - static constexpr auto name = "makeDictionaryUInt16"; -}; -template <> -struct FunctionMakeDictionaryName -{ - static constexpr auto name = "makeDictionaryUInt32"; -}; -template <> -struct FunctionMakeDictionaryName -{ - static constexpr auto name = "makeDictionaryUInt64"; -}; - -template class FunctionMakeDictionary: public IFunction { public: - static constexpr auto name = FunctionMakeDictionaryName::name; - static FunctionPtr create(const Context &) { return std::make_shared>(); } + static constexpr auto name = "makeDictionary"; + static FunctionPtr create(const Context &) { return std::make_shared(); } String getName() const override { return name; } @@ -1875,7 +1851,7 @@ public: DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override { - return std::make_shared(arguments[0], std::make_shared>()); + return std::make_shared(arguments[0]); } void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override @@ -1910,7 +1886,7 @@ public: throw Exception("First first argument of function dictionaryIndexes must be ColumnWithDictionary, but got" + arguments[0]->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - return type->getIndexesType(); + return std::make_shared(); } void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override @@ -1918,7 +1894,13 @@ public: auto arg_num = arguments[0]; const auto & arg = block.getByPosition(arg_num); auto & res = block.getByPosition(result); - res.column = typeid_cast(arg.column.get())->getIndexesPtr(); + auto indexes_col = typeid_cast(arg.column.get())->getIndexesPtr(); + auto new_indexes_col = ColumnUInt64::create(indexes_col->size()); + auto & data = new_indexes_col->getData(); + for (size_t i = 0; i < data.size(); ++i) + data[i] = indexes_col->getUInt(i); + + res.column = std::move(new_indexes_col); } }; @@ -1952,7 +1934,7 @@ public: const auto & arg = block.getByPosition(arg_num); auto & res = block.getByPosition(result); const auto * column_with_dictionary = typeid_cast(arg.column.get()); - res.column = column_with_dictionary->getUnique()->getNestedColumn()->cloneResized(arg.column->size()); + res.column = column_with_dictionary->getDictionary().getNestedColumn()->cloneResized(arg.column->size()); } }; @@ -2009,10 +1991,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory) factory.registerFunction(); factory.registerFunction(); - factory.registerFunction>(); - factory.registerFunction>(); - factory.registerFunction>(); - factory.registerFunction>(); + factory.registerFunction(); factory.registerFunction(); factory.registerFunction(); } diff --git a/dbms/src/Functions/IFunction.cpp b/dbms/src/Functions/IFunction.cpp index 2a2492e4577..3d7d8ef4f73 100644 --- a/dbms/src/Functions/IFunction.cpp +++ b/dbms/src/Functions/IFunction.cpp @@ -254,7 +254,7 @@ static Block removeColumnsWithDictionary(Block & block, const ColumnNumbers & ar else { has_with_dictionary = true; - column_with_dict_size = column_with_dict->getUnique()->size(); + column_with_dict_size = column_with_dict->getDictionary().size(); indexes = column_with_dict->getIndexesPtr(); } } @@ -291,7 +291,7 @@ static Block removeColumnsWithDictionary(Block & block, const ColumnNumbers & ar + column.type->getName(), ErrorCodes::LOGICAL_ERROR); ColumnPtr new_column = convert_all_to_full ? column_with_dict->convertToFullColumn() - : column_with_dict->getUnique()->getNestedColumn(); + : column_with_dict->getDictionary().getNestedColumn(); temp_block.insert({new_column, type_with_dict->getDictionaryType(), column.name}); } @@ -355,7 +355,6 @@ void FunctionBuilderImpl::checkNumberOfArguments(size_t number_of_arguments) con struct ArgumentsWithoutDictionary { ColumnsWithTypeAndName arguments; - DataTypePtr common_index_type; bool all_without_dictionary = true; explicit ArgumentsWithoutDictionary(const ColumnsWithTypeAndName & args) @@ -372,12 +371,8 @@ struct ArgumentsWithoutDictionary arguments = args; } arguments[i].type = arg_with_dict->getDictionaryType(); - index_types.push_back(arg_with_dict->getIndexesType()); } } - - if (!all_without_dictionary) - common_index_type = getLeastSupertype(index_types); } }; @@ -387,13 +382,13 @@ DataTypePtr FunctionBuilderImpl::getReturnTypeWithoutDictionary(const ColumnsWit if (!arguments.empty() && useDefaultImplementationForNulls()) { - NullPresence null_presense = getNullPresense(arguments); + NullPresence null_presence = getNullPresense(arguments); - if (null_presense.has_null_constant) + if (null_presence.has_null_constant) { return makeNullable(std::make_shared()); } - if (null_presense.has_nullable) + if (null_presence.has_nullable) { Block nested_block = createBlockWithNestedColumns(Block(arguments), ext::collection_cast(ext::range(0, arguments.size()))); auto return_type = getReturnTypeImpl(ColumnsWithTypeAndName(nested_block.begin(), nested_block.end())); @@ -479,8 +474,7 @@ DataTypePtr FunctionBuilderImpl::getReturnType(const ColumnsWithTypeAndName & ar ArgumentsWithoutDictionary arguments_without_dictionary(arguments); if (!arguments_without_dictionary.all_without_dictionary) return std::make_shared( - getReturnTypeWithoutDictionary(arguments_without_dictionary.arguments), - arguments_without_dictionary.common_index_type); + getReturnTypeWithoutDictionary(arguments_without_dictionary.arguments)); } return getReturnTypeWithoutDictionary(arguments); diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index c4d0d7654e6..aa2dbe0a579 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -264,6 +264,10 @@ struct Settings M(SettingUInt64, enable_conditional_computation, 0, "Enable conditional computations") \ \ M(SettingDateTimeInputFormat, date_time_input_format, FormatSettings::DateTimeInputFormat::Basic, "Method to read DateTime from text input formats. Possible values: 'basic' and 'best_effort'.") \ + \ + M(SettingUInt64, max_dictionary_size, 8192, "Maximum size (in rows) of shared global dictionary for WithDictionary type.") \ + M(SettingBool, use_single_dictionary_for_part, true, "WithDictionary type serialization setting. If is true, than will use additional keys when global dictionary overflows. Otherwise, will create several shared dictionaries.") \ + #define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \ TYPE NAME {DEFAULT}; diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp index c060f9e7982..d4947d9097a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp @@ -200,6 +200,11 @@ MergeTreeReader::Stream::Stream( getMark(right).offset_in_compressed_file - getMark(all_mark_ranges[i].begin).offset_in_compressed_file); } + /// Avoid empty buffer. May happen while reading dictionary for DataTypeWithDictionary. + /// For example: part has single dictionary and all marks point to the same position. + if (max_mark_range == 0) + max_mark_range = max_read_buffer_size; + size_t buffer_size = std::min(max_read_buffer_size, max_mark_range); /// Estimate size of the data to be read. @@ -329,6 +334,26 @@ void MergeTreeReader::Stream::seekToMark(size_t index) } +void MergeTreeReader::Stream::seekToStart() +{ + try + { + if (cached_buffer) + cached_buffer->seek(0, 0); + if (non_cached_buffer) + non_cached_buffer->seek(0, 0); + } + catch (Exception & e) + { + /// Better diagnostics. + if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND) + e.addMessage("(while seeking to start of column " + path_prefix + ")"); + + throw; + } +} + + void MergeTreeReader::addStreams(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges, const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type) { @@ -379,7 +404,12 @@ void MergeTreeReader::readData( Stream & stream = *it->second; - if (!continue_reading && !stream_for_prefix) + if (stream_for_prefix) + { + stream.seekToStart(); + continue_reading = false; + } + else if (!continue_reading) stream.seekToMark(from_mark); return stream.data_buffer; diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.h b/dbms/src/Storages/MergeTree/MergeTreeReader.h index 7271524ca86..a389918fdc4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.h @@ -64,6 +64,7 @@ private: const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type); void seekToMark(size_t index); + void seekToStart(); ReadBuffer * data_buffer; diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 0bf03531db5..a3da1e28319 100644 --- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -97,9 +97,11 @@ void IMergedBlockOutputStream::writeData( bool skip_offsets, IDataType::SerializeBinaryBulkStatePtr & serialization_state) { - IDataType::SerializeBinaryBulkSettings settings; - settings.getter = createStreamGetter(name, offset_columns, skip_offsets); - settings.max_dictionary_size = 1024; + auto & settings = storage.context.getSettingsRef(); + IDataType::SerializeBinaryBulkSettings serialize_settings; + serialize_settings.getter = createStreamGetter(name, offset_columns, skip_offsets); + serialize_settings.max_dictionary_size = settings.max_dictionary_size; + serialize_settings.use_single_dictionary_for_part = settings.use_single_dictionary_for_part != 0; size_t size = column.size(); size_t prev_mark = 0; @@ -135,10 +137,10 @@ void IMergedBlockOutputStream::writeData( writeIntBinary(stream.plain_hashing.count(), stream.marks); writeIntBinary(stream.compressed.offset(), stream.marks); - }, settings.path); + }, serialize_settings.path); } - type.serializeBinaryBulkWithMultipleStreams(column, prev_mark, limit, settings, serialization_state); + type.serializeBinaryBulkWithMultipleStreams(column, prev_mark, limit, serialize_settings, serialization_state); /// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one. type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path) @@ -154,7 +156,7 @@ void IMergedBlockOutputStream::writeData( return; column_streams[stream_name]->compressed.nextIfAtEnd(); - }, settings.path); + }, serialize_settings.path); prev_mark += limit; } @@ -168,7 +170,7 @@ void IMergedBlockOutputStream::writeData( String stream_name = IDataType::getFileNameForStream(name, substream_path); offset_columns.insert(stream_name); } - }, settings.path); + }, serialize_settings.path); } @@ -296,14 +298,16 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart( MergeTreeData::DataPart::Checksums * additional_column_checksums) { /// Finish columns serialization. - IDataType::SerializeBinaryBulkSettings settings; - settings.max_dictionary_size = 1024; + auto & settings = storage.context.getSettingsRef(); + IDataType::SerializeBinaryBulkSettings serialize_settings; + serialize_settings.max_dictionary_size = settings.max_dictionary_size; + serialize_settings.use_single_dictionary_for_part = settings.use_single_dictionary_for_part != 0; OffsetColumns offset_columns; auto it = columns_list.begin(); for (size_t i = 0; i < columns_list.size(); ++i, ++it) { - settings.getter = createStreamGetter(it->name, offset_columns, false); - it->type->serializeBinaryBulkStateSuffix(settings, serialization_states[i]); + serialize_settings.getter = createStreamGetter(it->name, offset_columns, false); + it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]); } if (!total_column_list) @@ -550,14 +554,16 @@ void MergedColumnOnlyOutputStream::writeSuffix() MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums() { /// Finish columns serialization. - IDataType::SerializeBinaryBulkSettings settings; - settings.max_dictionary_size = 1024; + auto & settings = storage.context.getSettingsRef(); + IDataType::SerializeBinaryBulkSettings serialize_settings; + serialize_settings.max_dictionary_size = settings.max_dictionary_size; + serialize_settings.use_single_dictionary_for_part = settings.use_single_dictionary_for_part != 0; OffsetColumns offset_columns; for (size_t i = 0; i < header.columns(); ++i) { auto & column = header.safeGetByPosition(i); - settings.getter = createStreamGetter(column.name, offset_columns, skip_offsets); - column.type->serializeBinaryBulkStateSuffix(settings, serialization_states[i]); + serialize_settings.getter = createStreamGetter(column.name, offset_columns, skip_offsets); + column.type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]); } MergeTreeData::DataPart::Checksums checksums;