added DataTypeWithDictionary serialization per granule

Nikolai Kochetov 2018-05-04 23:11:32 +03:00
parent 5b0ac680ec
commit bfc42259cb
3 changed files with 34 additions and 17 deletions


@@ -6,6 +6,7 @@
#include <Common/typeid_cast.h>
#include <Columns/ColumnVector.h>
#include <Common/HashTable/HashSet.h>
+#include <Common/HashTable/HashMap.h>
namespace DB
@@ -323,30 +324,44 @@ namespace detail
return nullptr;
}
template <typename T>
PaddedPODArray<T> * getIndexesData(IColumn & indexes)
{
auto * column = typeid_cast<const ColumnVector<T> *>(&indexes);
if (column)
return &column->getData();
return nullptr;
}
template const PaddedPODArray<UInt8> * getIndexesData<UInt8>(const DB::ColumnPtr & indexes);
template const PaddedPODArray<UInt16> * getIndexesData<UInt16>(const DB::ColumnPtr & indexes);
template const PaddedPODArray<UInt32> * getIndexesData<UInt32>(const DB::ColumnPtr & indexes);
template const PaddedPODArray<UInt64> * getIndexesData<UInt64>(const DB::ColumnPtr & indexes);
template <typename T>
-MutableColumnPtr getUniqueIndexImpl(const PaddedPODArray<T> & index)
+MutableColumnPtr getUniqueIndexImpl(PaddedPODArray<T> & index)
{
-HashSet<T> hash_table;
+HashMap<T, T> hash_map;
for (auto val : index)
-hash_table.insert(val);
+hash_map.insert({val, hash_map.size()});
auto res_col = ColumnVector<T>::create();
auto & data = res_col->getData();
-data.reserve(hash_table.size());
-for (auto val : hash_table)
-data.push_back(val);
+data.resize(hash_map.size());
+for (auto val : hash_map)
+data[val.second] = val.first;
+for (auto & ind : index)
+ind = hash_map[ind];
return std::move(res_col);
}
}
-MutableColumnPtr getUniqueIndex(const ColumnPtr & column)
+/// Returns unique values of column. Write new index to column.
+MutableColumnPtr makeSubIndex(IColumn & column)
{
if (auto * data_uint8 = detail::getIndexesData<UInt8>(column))
return detail::getUniqueIndexImpl(*data_uint8);
@@ -357,7 +372,7 @@ MutableColumnPtr getUniqueIndex(const ColumnPtr & column)
else if (auto * data_uint64 = detail::getIndexesData<UInt64>(column))
return detail::getUniqueIndexImpl(*data_uint64);
else
throw Exception("Indexes column for getUniqueIndex must be ColumnUInt, got" + column->getName(),
throw Exception("Indexes column for makeSubindex must be ColumnUInt, got" + column->getName(),
ErrorCodes::LOGICAL_ERROR);
}
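To make the change above easier to follow: makeSubIndex now remaps an index column in place so that every distinct value gets a dense position in order of first appearance, and returns the distinct values placed at those positions. Below is a minimal standalone sketch of that idea, using std::vector and std::unordered_map in place of ClickHouse's PaddedPODArray and HashMap, and folding the fill and remap loops of the real code into a single pass; all names in the snippet are illustrative and not part of the ClickHouse API.

```cpp
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

/// Illustrative stand-in for detail::getUniqueIndexImpl / makeSubIndex:
/// rewrites `indexes` to dense positions 0..N-1 (order of first appearance)
/// and returns the original values placed at those positions.
std::vector<uint64_t> makeSubIndexSketch(std::vector<uint64_t> & indexes)
{
    std::unordered_map<uint64_t, uint64_t> position_of_value;
    std::vector<uint64_t> unique_values;

    for (auto & ind : indexes)
    {
        auto [it, inserted] = position_of_value.emplace(ind, position_of_value.size());
        if (inserted)
            unique_values.push_back(ind);   /// value keeps the slot it got on first sight
        ind = it->second;                   /// remap the index in place
    }
    return unique_values;
}

int main()
{
    std::vector<uint64_t> indexes{7, 3, 7, 9, 3};
    auto unique = makeSubIndexSketch(indexes);

    for (auto v : unique)  std::cout << v << ' ';   /// 7 3 9
    std::cout << '\n';
    for (auto v : indexes) std::cout << v << ' ';   /// 0 1 0 2 1
    std::cout << '\n';
}
```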


@@ -75,6 +75,6 @@ ColumnPtr selectIndexImpl(const Column & column, const ColumnPtr & indexes, size
/// Get unique values from index column (ColumnUInt*).
-MutableColumnPtr getUniqueIndex(const ColumnPtr & column);
+MutableColumnPtr makeSubIndex(const ColumnPtr & column);
}


@@ -69,8 +69,7 @@ void DataTypeWithDictionary::serializeBinaryBulkWithMultipleStreams(
SubstreamPath path) const
{
const ColumnWithDictionary & column_with_dictionary = typeid_cast<const ColumnWithDictionary &>(column);
-const auto & indexes = column_with_dictionary.getIndexesPtr();
-const auto & keys = column_with_dictionary.getUnique()->getNestedColumn();
+MutableColumnPtr sub_index;
if (limit == 0)
limit = indexes->size();
@@ -78,10 +77,10 @@ void DataTypeWithDictionary::serializeBinaryBulkWithMultipleStreams(
path.push_back(Substream::DictionaryKeys);
if (auto stream = getter(path))
{
-bool full_column = offset == 0 && limit >= indexes->size();
-ColumnPtr unique_indexes = getUniqueIndex(full_column ? indexes : indexes->cut(offset, limit - offset));
+const auto & indexes = column_with_dictionary.getIndexesPtr();
+const auto & keys = column_with_dictionary.getUnique()->getNestedColumn();
+sub_index = (*indexes->cut(offset, limit - offset)).mutate();
+ColumnPtr unique_indexes = makeSubIndex(sub_index);
auto used_keys = keys->index(unique_indexes, 0);
UInt64 used_keys_size = used_keys->size();
@@ -92,7 +91,10 @@ void DataTypeWithDictionary::serializeBinaryBulkWithMultipleStreams(
path.back() = Substream::DictionaryIndexes;
if (auto stream = getter(path))
{
-indexes_type->serializeBinaryBulk(*indexes, *stream, offset, limit);
+if (!sub_index)
+throw Exception("Dictionary keys wasn't serialized", ErrorCodes::LOGICAL_ERROR);
+indexes_type->serializeBinaryBulk(*sub_index, *stream, offset, limit);
}
}
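Taken together, the serialization hunks above do the following per serialized range: cut the index column to [offset, offset + limit), remap it with makeSubIndex, use the resulting unique positions to select only the keys that range actually references (keys->index(unique_indexes, 0)), then write the number of used keys and the keys themselves to the DictionaryKeys substream and the remapped indexes to the DictionaryIndexes substream. Here is a self-contained sketch of that data flow, with plain std containers and string keys standing in for the real columns; the struct and function names are mine, and the actual on-disk encoding is left to the nested types' serializeBinaryBulk in the real code.

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

/// What one granule contributes to the two substreams, in simplified form.
struct GranuleSketch
{
    std::vector<std::string> used_keys;  /// DictionaryKeys substream (preceded by its size)
    std::vector<uint64_t> sub_index;     /// DictionaryIndexes substream
};

/// Cut the index column to [offset, offset + limit), remap it to granule-local
/// positions and collect the keys the granule actually references.
GranuleSketch serializeGranuleSketch(
    const std::vector<std::string> & keys,  /// full dictionary of the column
    const std::vector<uint64_t> & indexes,  /// full index column
    size_t offset, size_t limit)
{
    GranuleSketch granule;
    std::unordered_map<uint64_t, uint64_t> local_position;

    for (size_t i = offset; i < offset + limit && i < indexes.size(); ++i)
    {
        auto [it, inserted] = local_position.emplace(indexes[i], local_position.size());
        if (inserted)
            granule.used_keys.push_back(keys[indexes[i]]);  /// keys->index(unique_indexes, 0)
        granule.sub_index.push_back(it->second);
    }
    return granule;
}

int main()
{
    std::vector<std::string> keys{"a", "b", "c", "d"};
    std::vector<uint64_t> indexes{0, 3, 3, 1, 0, 2};

    auto granule = serializeGranuleSketch(keys, indexes, /*offset=*/0, /*limit=*/4);

    for (const auto & k : granule.used_keys) std::cout << k << ' ';  /// a d b
    std::cout << '\n';
    for (auto v : granule.sub_index) std::cout << v << ' ';          /// 0 1 1 2
    std::cout << '\n';
}
```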
@@ -112,7 +114,7 @@ void DataTypeWithDictionary::deserializeBinaryBulkWithMultipleStreams(
{
UInt64 num_keys;
readIntBinary(num_keys, *stream);
-auto dict_column = column_with_dictionary.getUnique()->getNestedColumn()->cloneEmpty();
+auto dict_column = dictionary_type->cloneEmpty();
dictionary_type->deserializeBinaryBulkWithMultipleStreams(*dict_column, getter, num_keys, 0, position_independent_encoding, path);
indexes = column_with_dictionary.getUnique()->uniqueInsertRangeFrom(*dict_column, 0, num_keys);
}
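The deserialization hunk mirrors the per-granule layout: num_keys is read from the keys substream, the granule's keys are deserialized into a fresh column of the dictionary type, and uniqueInsertRangeFrom merges them into the column's global dictionary, yielding for each granule-local key its position in that dictionary; the granule's indexes can then (in code outside this hunk) be translated through that mapping. Below is a hedged sketch of the merge-and-translate step with std containers; DictionarySketch and insertRange are illustrative stand-ins, not ClickHouse types.

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

/// Global dictionary of the column being read, in simplified form.
struct DictionarySketch
{
    std::vector<std::string> values;
    std::unordered_map<std::string, uint64_t> position;

    /// Counterpart of ColumnUnique::uniqueInsertRangeFrom in this sketch: inserts
    /// the granule's keys (deduplicating against what is already there) and
    /// returns, for each granule-local key, its position in the global dictionary.
    std::vector<uint64_t> insertRange(const std::vector<std::string> & granule_keys)
    {
        std::vector<uint64_t> global_positions;
        global_positions.reserve(granule_keys.size());
        for (const auto & key : granule_keys)
        {
            auto [it, inserted] = position.emplace(key, values.size());
            if (inserted)
                values.push_back(key);
            global_positions.push_back(it->second);
        }
        return global_positions;
    }
};

int main()
{
    DictionarySketch dict;
    dict.insertRange({"a", "b"});                    /// keys seen in an earlier granule

    /// Granule read from disk: its local keys and its local indexes.
    std::vector<std::string> granule_keys{"a", "d", "b"};
    std::vector<uint64_t> granule_indexes{0, 1, 1, 2};

    auto remap = dict.insertRange(granule_keys);     /// a->0, d->2, b->1

    for (auto & ind : granule_indexes)
        ind = remap[ind];                            /// translate local -> global

    for (auto v : granule_indexes) std::cout << v << ' ';  /// 0 2 2 1
    std::cout << '\n';
}
```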