added max_block_size for DictionaryBlockImputStreamBase

This commit is contained in:
Nikolai Kochetov 2017-05-26 19:08:56 +03:00
parent 45c1beca2a
commit 5fb5397941
13 changed files with 296 additions and 160 deletions

View File

@ -954,7 +954,7 @@ CacheDictionary::Attribute & CacheDictionary::getAttribute(const std::string & a
bool CacheDictionary::isEmptyCell(const UInt64 idx) const
{
return (idx != zero_cell_idx && cells[idx].id == 0) || (cells[idx].data
return (idx != zero_cell_idx && cells[idx].id == 0) || (cells[idx].data
== ext::safe_bit_cast<CellMetadata::time_point_urep_t>(CellMetadata::time_point_t()));
}
@ -977,7 +977,7 @@ PaddedPODArray<CacheDictionary::Key> CacheDictionary::getCachedIds() const
BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_names) const
{
using BlockInputStreamType = DictionaryBlockInputStream<CacheDictionary, Key>;
auto block_input_stream = std::make_unique<BlockInputStreamType>(*this, getCachedIds(), column_names);
auto block_input_stream = std::make_unique<BlockInputStreamType>(shared_from_this(), 2, getCachedIds(), column_names);
return BlockInputStreamPtr(std::move(block_input_stream));
}

View File

@ -974,7 +974,7 @@ StringRef ComplexKeyCacheDictionary::copyKey(const StringRef key) const
bool ComplexKeyCacheDictionary::isEmptyCell(const UInt64 idx) const
{
return (cells[idx].key == StringRef{} && (idx != zero_cell_idx
return (cells[idx].key == StringRef{} && (idx != zero_cell_idx
|| cells[idx].data == ext::safe_bit_cast<CellMetadata::time_point_urep_t>(CellMetadata::time_point_t())));
}
@ -990,7 +990,7 @@ BlockInputStreamPtr ComplexKeyCacheDictionary::getBlockInputStream(const Names &
}
using BlockInputStreamType = DictionaryBlockInputStream<ComplexKeyCacheDictionary, UInt64>;
return std::move(std::make_unique<BlockInputStreamType>(*this, keys, column_names));
return std::move(std::make_unique<BlockInputStreamType>(shared_from_this(), 2, keys, column_names));
}

View File

@ -538,7 +538,7 @@ std::vector<StringRef> ComplexKeyHashedDictionary::getKeys(const Attribute& attr
BlockInputStreamPtr ComplexKeyHashedDictionary::getBlockInputStream(const Names & column_names) const
{
using BlockInputStreamType = DictionaryBlockInputStream<ComplexKeyHashedDictionary, UInt64>;
return std::move(std::make_unique<BlockInputStreamType>(*this, getKeys(), column_names));
return std::move(std::make_unique<BlockInputStreamType>(shared_from_this(), 2, getKeys(), column_names));
}

View File

@ -16,6 +16,7 @@
namespace DB
{
class ComplexKeyHashedDictionary final : public IDictionaryBase
{
public:

View File

@ -11,98 +11,135 @@
#include <common/logger_useful.h>
#include <Core/Names.h>
namespace DB
namespace DB
{
/*
* BlockInputStream implementation for external dictionaries
/*
* BlockInputStream implementation for external dictionaries
* read() returns single block consisting of the in-memory contents of the dictionaries
*/
template <class DictionaryType, class Key>
class DictionaryBlockInputStream : public DictionaryBlockInputStreamBase
{
public:
DictionaryBlockInputStream(const DictionaryType& dictionary,
const PaddedPODArray<Key> & ids, const Names & column_names);
DictionaryBlockInputStream(const DictionaryType& dictionary,
using DictionatyPtr = std::shared_ptr<DictionaryType const>;
DictionaryBlockInputStream(std::shared_ptr<const IDictionaryBase> dictionary, size_t max_block_size,
PaddedPODArray<Key> && ids, const Names & column_names);
DictionaryBlockInputStream(std::shared_ptr<const IDictionaryBase> dictionary, size_t max_block_size,
const std::vector<StringRef> & keys, const Names & column_names);
String getName() const override { return "DictionaryBlockInputStream"; }
String getName() const override {
return "DictionaryBlockInputStream";
}
protected:
Block getBlock(size_t start, size_t size) const override;
private:
// pointer types to getXXX functions
// for single key dictionaries
template <class Type>
using DictionaryGetter = void (DictionaryType::*)(
const std::string &, const PaddedPODArray<Key> &, PaddedPODArray<Type> &) const;
const std::string &, const PaddedPODArray<Key> &, PaddedPODArray<Type> &) const;
using DictionaryStringGetter = void (DictionaryType::*)(
const std::string &, const PaddedPODArray<Key> &, ColumnString *) const;
const std::string &, const PaddedPODArray<Key> &, ColumnString *) const;
// for complex complex key dictionaries
template <class Type>
using GetterByKey = void (DictionaryType::*)(
const std::string &, const ConstColumnPlainPtrs &, const DataTypes &, PaddedPODArray<Type> & out) const;
const std::string &, const ConstColumnPlainPtrs &, const DataTypes &, PaddedPODArray<Type> & out) const;
using StringGetterByKey = void (DictionaryType::*)(
const std::string &, const ConstColumnPlainPtrs &, const DataTypes &, ColumnString * out) const;
const std::string &, const ConstColumnPlainPtrs &, const DataTypes &, ColumnString * out) const;
// call getXXX
// for single key dictionaries
template <class Type, class Container>
void callGetter(DictionaryGetter<Type> getter, const PaddedPODArray<Key> & ids,
const ConstColumnPlainPtrs & keys, const DataTypes & data_types,
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary);
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const;
template <class Container>
void callGetter(DictionaryStringGetter getter, const PaddedPODArray<Key> & ids,
const ConstColumnPlainPtrs & keys, const DataTypes & data_types,
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary);
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const;
// for complex complex key dictionaries
template <class Type, class Container>
void callGetter(GetterByKey<Type> getter, const PaddedPODArray<Key> & ids,
const ConstColumnPlainPtrs & keys, const DataTypes & data_types,
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary);
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const;
template <class Container>
void callGetter(StringGetterByKey getter, const PaddedPODArray<Key> & ids,
const ConstColumnPlainPtrs & keys, const DataTypes & data_types,
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary);
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const;
template <template <class> class Getter, class StringGetter>
void fillBlock(const DictionaryType& dictionary, const Names& column_names,
const PaddedPODArray<Key>& ids, const ColumnsWithTypeAndName& keys);
Block fillBlock(const PaddedPODArray<Key>& ids, const ColumnsWithTypeAndName& keys) const;
template <class AttributeType, class Getter>
ColumnPtr getColumnFromAttribute(Getter getter, const PaddedPODArray<Key> & ids,
ColumnPtr getColumnFromAttribute(Getter getter, const PaddedPODArray<Key> & ids,
const ConstColumnPlainPtrs & keys, const DataTypes & data_types,
const DictionaryAttribute & attribute, const DictionaryType & dictionary);
const DictionaryAttribute & attribute, const DictionaryType & dictionary) const;
template <class Getter>
ColumnPtr getColumnFromStringAttribute(Getter getter, const PaddedPODArray<Key> & ids,
const ConstColumnPlainPtrs & keys, const DataTypes & data_types,
const DictionaryAttribute& attribute, const DictionaryType& dictionary);
ColumnPtr getColumnFromIds(const PaddedPODArray<Key>& ids);
const DictionaryAttribute& attribute, const DictionaryType& dictionary) const;
ColumnPtr getColumnFromIds(const PaddedPODArray<Key>& ids) const;
void fillKeyColumns(const std::vector<StringRef> & keys, const DictionaryStructure& dictionary_structure,
ColumnsWithTypeAndName & columns);
void fillKeyColumns(const std::vector<StringRef> & keys, size_t start, size_t size,
const DictionaryStructure& dictionary_structure, ColumnsWithTypeAndName & columns) const;
Poco::Logger * logger;
DictionatyPtr dictionary;
Names column_names;
PaddedPODArray<Key> ids;
ColumnsWithTypeAndName key_columns;
Block (DictionaryBlockInputStream<DictionaryType, Key>::*fillBlockFunction)(
const PaddedPODArray<Key>& ids, const ColumnsWithTypeAndName& keys) const;
};
template <class DictionaryType, class Key>
DictionaryBlockInputStream<DictionaryType, Key>::DictionaryBlockInputStream(
const DictionaryType& dictionary, const PaddedPODArray<Key> & ids, const Names& column_names)
std::shared_ptr<const IDictionaryBase> dictionary, size_t max_block_size,
PaddedPODArray<Key> && ids, const Names& column_names)
: DictionaryBlockInputStreamBase(ids.size(), max_block_size),
dictionary(std::static_pointer_cast<const DictionaryType>(dictionary)), column_names(column_names), ids(std::move(ids))
{
logger = &Poco::Logger::get("DictionaryBlockInputStream");
fillBlock<DictionaryGetter, DictionaryStringGetter>(dictionary, column_names, ids, {});
//fillBlock<DictionaryGetter, DictionaryStringGetter>(dictionary, column_names, ids, {});
fillBlockFunction = &DictionaryBlockInputStream<DictionaryType, Key>::fillBlock<DictionaryGetter, DictionaryStringGetter>;
}
template <class DictionaryType, class Key>
DictionaryBlockInputStream<DictionaryType, Key>::DictionaryBlockInputStream(
const DictionaryType& dictionary, const std::vector<StringRef> & keys, const Names& column_names)
std::shared_ptr<const IDictionaryBase> dictionary, size_t max_block_size,
const std::vector<StringRef> & keys, const Names& column_names)
: DictionaryBlockInputStreamBase(keys.size(), max_block_size),
dictionary(std::static_pointer_cast<const DictionaryType>(dictionary)), column_names(column_names)
{
logger = &Poco::Logger::get("DictionaryBlockInputStream");
const DictionaryStructure& dictionaty_structure = dictionary.getStructure();
ColumnsWithTypeAndName columns;
fillKeyColumns(keys, dictionaty_structure, columns);
fillBlock<GetterByKey, StringGetterByKey>(dictionary, column_names, {}, columns);
const DictionaryStructure& dictionaty_structure = dictionary->getStructure();
fillKeyColumns(keys, 0, keys.size(), dictionaty_structure, key_columns);
//fillBlock<GetterByKey, StringGetterByKey>(dictionary, column_names, {}, columns);
fillBlockFunction = &DictionaryBlockInputStream<DictionaryType, Key>::fillBlock<GetterByKey, StringGetterByKey>;
}
template <class DictionaryType, class Key>
Block DictionaryBlockInputStream<DictionaryType, Key>::getBlock(size_t start, size_t length) const
{
if (ids.empty())
{
ColumnsWithTypeAndName columns;
columns.reserve(key_columns.size());
for (const auto & key_column : key_columns)
columns.emplace_back(key_column.column->cut(start, length), key_column.type, key_column.name);
// throw std::to_string(columns.size()) + " " + std::to_string(columns[0].column->size());
return (this->*fillBlockFunction)({}, columns);
}
else
{
PaddedPODArray<Key> block_ids(ids.begin() + start, ids.begin() + start + length);
return (this->*fillBlockFunction)(block_ids, {});
}
}
template <class DictionaryType, class Key>
@ -110,7 +147,7 @@ template <class Type, class Container>
void DictionaryBlockInputStream<DictionaryType, Key>::callGetter(
DictionaryGetter<Type> getter, const PaddedPODArray<Key> & ids,
const ConstColumnPlainPtrs & keys, const DataTypes & data_types,
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary)
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const
{
(dictionary.*getter)(attribute.name, ids, container);
}
@ -120,7 +157,7 @@ template <class Container>
void DictionaryBlockInputStream<DictionaryType, Key>::callGetter(
DictionaryStringGetter getter, const PaddedPODArray<Key> & ids,
const ConstColumnPlainPtrs & keys, const DataTypes & data_types,
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary)
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const
{
(dictionary.*getter)(attribute.name, ids, container);
}
@ -130,7 +167,7 @@ template <class Type, class Container>
void DictionaryBlockInputStream<DictionaryType, Key>::callGetter(
GetterByKey<Type> getter, const PaddedPODArray<Key> & ids,
const ConstColumnPlainPtrs & keys, const DataTypes & data_types,
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary)
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const
{
(dictionary.*getter)(attribute.name, keys, data_types, container);
}
@ -140,16 +177,15 @@ template <class Container>
void DictionaryBlockInputStream<DictionaryType, Key>::callGetter(
StringGetterByKey getter, const PaddedPODArray<Key> & ids,
const ConstColumnPlainPtrs & keys, const DataTypes & data_types,
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary)
Container & container, const DictionaryAttribute & attribute, const DictionaryType & dictionary) const
{
(dictionary.*getter)(attribute.name, keys, data_types, container);
}
template <class DictionaryType, class Key>
template <template <class> class Getter, class StringGetter>
void DictionaryBlockInputStream<DictionaryType, Key>::fillBlock(
const DictionaryType& dictionary, const Names& column_names,
const PaddedPODArray<Key>& ids, const ColumnsWithTypeAndName& keys)
Block DictionaryBlockInputStream<DictionaryType, Key>::fillBlock(
const PaddedPODArray<Key>& ids, const ColumnsWithTypeAndName& keys) const
{
std::unordered_set<std::string> names(column_names.begin(), column_names.end());
@ -163,7 +199,7 @@ void DictionaryBlockInputStream<DictionaryType, Key>::fillBlock(
}
ColumnsWithTypeAndName columns;
const DictionaryStructure& structure = dictionary.getStructure();
const DictionaryStructure& structure = dictionary->getStructure();
if (structure.id && names.find(structure.id->name) != names.end())
columns.emplace_back(getColumnFromIds(ids), std::make_shared<DataTypeUInt64>(), structure.id->name);
@ -175,36 +211,56 @@ void DictionaryBlockInputStream<DictionaryType, Key>::fillBlock(
for (const auto idx : ext::range(0, structure.attributes.size()))
{
const DictionaryAttribute& attribute = structure.attributes[idx];
if (names.find(attribute.name) != names.end())
if (names.find(attribute.name) != names.end())
{
ColumnPtr column;
#define GET_COLUMN_FORM_ATTRIBUTE(TYPE) \
#define GET_COLUMN_FORM_ATTRIBUTE(TYPE) \
column = getColumnFromAttribute<TYPE, Getter<TYPE>>( \
&DictionaryType::get##TYPE, ids, key_columns_palin_ptrs, data_types, attribute, dictionary)
&DictionaryType::get##TYPE, ids, key_columns_palin_ptrs, data_types, attribute, *dictionary)
switch (attribute.underlying_type)
{
case AttributeUnderlyingType::UInt8: GET_COLUMN_FORM_ATTRIBUTE(UInt8); break;
case AttributeUnderlyingType::UInt16: GET_COLUMN_FORM_ATTRIBUTE(UInt16); break;
case AttributeUnderlyingType::UInt32: GET_COLUMN_FORM_ATTRIBUTE(UInt32); break;
case AttributeUnderlyingType::UInt64: GET_COLUMN_FORM_ATTRIBUTE(UInt64); break;
case AttributeUnderlyingType::Int8: GET_COLUMN_FORM_ATTRIBUTE(Int8); break;
case AttributeUnderlyingType::Int16: GET_COLUMN_FORM_ATTRIBUTE(Int16); break;
case AttributeUnderlyingType::Int32: GET_COLUMN_FORM_ATTRIBUTE(Int32); break;
case AttributeUnderlyingType::Int64: GET_COLUMN_FORM_ATTRIBUTE(Int64); break;
case AttributeUnderlyingType::Float32: GET_COLUMN_FORM_ATTRIBUTE(Float32); break;
case AttributeUnderlyingType::Float64: GET_COLUMN_FORM_ATTRIBUTE(Float64); break;
case AttributeUnderlyingType::String:
{
column = getColumnFromStringAttribute<StringGetter>(
&DictionaryType::getString, ids, key_columns_palin_ptrs, data_types, attribute, dictionary);
break;
}
case AttributeUnderlyingType::UInt8:
GET_COLUMN_FORM_ATTRIBUTE(UInt8);
break;
case AttributeUnderlyingType::UInt16:
GET_COLUMN_FORM_ATTRIBUTE(UInt16);
break;
case AttributeUnderlyingType::UInt32:
GET_COLUMN_FORM_ATTRIBUTE(UInt32);
break;
case AttributeUnderlyingType::UInt64:
GET_COLUMN_FORM_ATTRIBUTE(UInt64);
break;
case AttributeUnderlyingType::Int8:
GET_COLUMN_FORM_ATTRIBUTE(Int8);
break;
case AttributeUnderlyingType::Int16:
GET_COLUMN_FORM_ATTRIBUTE(Int16);
break;
case AttributeUnderlyingType::Int32:
GET_COLUMN_FORM_ATTRIBUTE(Int32);
break;
case AttributeUnderlyingType::Int64:
GET_COLUMN_FORM_ATTRIBUTE(Int64);
break;
case AttributeUnderlyingType::Float32:
GET_COLUMN_FORM_ATTRIBUTE(Float32);
break;
case AttributeUnderlyingType::Float64:
GET_COLUMN_FORM_ATTRIBUTE(Float64);
break;
case AttributeUnderlyingType::String:
{
column = getColumnFromStringAttribute<StringGetter>(
&DictionaryType::getString, ids, key_columns_palin_ptrs, data_types, attribute, *dictionary);
break;
}
}
columns.emplace_back(column, attribute.type, attribute.name);
}
}
block = Block(columns);
return Block(columns);
}
template <class DictionaryType, class Key>
@ -212,7 +268,7 @@ template <class AttributeType, class Getter>
ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromAttribute(
Getter getter, const PaddedPODArray<Key> & ids,
const ConstColumnPlainPtrs & keys, const DataTypes & data_types,
const DictionaryAttribute & attribute, const DictionaryType & dictionary)
const DictionaryAttribute & attribute, const DictionaryType & dictionary) const
{
auto size = ids.size();
if (!keys.empty())
@ -227,7 +283,7 @@ template <class Getter>
ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromStringAttribute(
Getter getter, const PaddedPODArray<Key> & ids,
const ConstColumnPlainPtrs & keys, const DataTypes & data_types,
const DictionaryAttribute& attribute, const DictionaryType& dictionary)
const DictionaryAttribute& attribute, const DictionaryType& dictionary) const
{
auto column_string = std::make_unique<ColumnString>();
auto ptr = column_string.get();
@ -236,11 +292,11 @@ ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromStringAt
}
template <class DictionaryType, class Key>
ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromIds(const PaddedPODArray<Key>& ids)
ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromIds(const PaddedPODArray<Key>& ids) const
{
auto column_vector = std::make_unique<ColumnVector<UInt64>>();
column_vector->getData().reserve(ids.size());
for (UInt64 id : ids)
for (UInt64 id : ids)
{
column_vector->insert(id);
}
@ -249,36 +305,55 @@ ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromIds(cons
template <class DictionaryType, class Key>
void DictionaryBlockInputStream<DictionaryType, Key>::fillKeyColumns(
const std::vector<StringRef> & keys, const DictionaryStructure& dictionary_structure,
ColumnsWithTypeAndName & columns)
const std::vector<StringRef> & keys, size_t start, size_t size,
const DictionaryStructure& dictionary_structure, ColumnsWithTypeAndName & columns) const
{
for (const DictionaryAttribute & attribute: *dictionary_structure.key)
{
#define ADD_COLUMN(TYPE) columns.push_back( \
#define ADD_COLUMN(TYPE) columns.push_back( \
ColumnWithTypeAndName(std::move(std::make_unique<ColumnVector<TYPE>>()), attribute.type, attribute.name))
switch (attribute.underlying_type)
{
case AttributeUnderlyingType::UInt8: ADD_COLUMN(UInt8); break;
case AttributeUnderlyingType::UInt16:ADD_COLUMN(UInt16); break;
case AttributeUnderlyingType::UInt32: ADD_COLUMN(UInt32); break;
case AttributeUnderlyingType::UInt64: ADD_COLUMN(UInt64); break;
case AttributeUnderlyingType::Int8: ADD_COLUMN(Int8); break;
case AttributeUnderlyingType::Int16: ADD_COLUMN(Int16); break;
case AttributeUnderlyingType::Int32: ADD_COLUMN(Int32); break;
case AttributeUnderlyingType::Int64: ADD_COLUMN(Int64); break;
case AttributeUnderlyingType::Float32: ADD_COLUMN(Float32); break;
case AttributeUnderlyingType::Float64: ADD_COLUMN(Float64); break;
case AttributeUnderlyingType::String:
{
columns.push_back(ColumnWithTypeAndName(
std::move(std::make_unique<ColumnString>()), attribute.type, attribute.name));
break;
}
case AttributeUnderlyingType::UInt8:
ADD_COLUMN(UInt8);
break;
case AttributeUnderlyingType::UInt16:
ADD_COLUMN(UInt16);
break;
case AttributeUnderlyingType::UInt32:
ADD_COLUMN(UInt32);
break;
case AttributeUnderlyingType::UInt64:
ADD_COLUMN(UInt64);
break;
case AttributeUnderlyingType::Int8:
ADD_COLUMN(Int8);
break;
case AttributeUnderlyingType::Int16:
ADD_COLUMN(Int16);
break;
case AttributeUnderlyingType::Int32:
ADD_COLUMN(Int32);
break;
case AttributeUnderlyingType::Int64:
ADD_COLUMN(Int64);
break;
case AttributeUnderlyingType::Float32:
ADD_COLUMN(Float32);
break;
case AttributeUnderlyingType::Float64:
ADD_COLUMN(Float64);
break;
case AttributeUnderlyingType::String:
{
columns.push_back(ColumnWithTypeAndName(std::make_shared<ColumnString>(), attribute.type, attribute.name));
break;
}
}
}
for (const auto & key : keys)
for (auto idx : ext::range(start, size))
{
const auto & key = keys[idx];
auto ptr = key.data;
for (const auto & column : columns)
ptr = column.column->deserializeAndInsertFromArena(ptr);
@ -287,4 +362,4 @@ void DictionaryBlockInputStream<DictionaryType, Key>::fillKeyColumns(
}
}
}
}

View File

@ -3,6 +3,11 @@
namespace DB
{
DictionaryBlockInputStreamBase::DictionaryBlockInputStreamBase(size_t rows_count, size_t max_block_size)
: rows_count(rows_count), max_block_size(max_block_size), next_row(0)
{
}
String DictionaryBlockInputStreamBase::getID() const
{
std::stringstream ss;
@ -12,11 +17,13 @@ String DictionaryBlockInputStreamBase::getID() const
Block DictionaryBlockInputStreamBase::readImpl()
{
if (was_read)
if (next_row == rows_count)
return Block();
was_read = true;
size_t block_size = std::min<size_t>(max_block_size, rows_count - next_row);
Block block = getBlock(next_row, block_size);
next_row += block_size;
return block;
}
}
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
namespace DB
namespace DB
{
class DictionaryBlockInputStreamBase : public IProfilingBlockInputStream
@ -9,16 +9,19 @@ class DictionaryBlockInputStreamBase : public IProfilingBlockInputStream
protected:
Block block;
DictionaryBlockInputStreamBase() : was_read(false) {}
DictionaryBlockInputStreamBase(size_t rows_count, size_t max_block_size);
String getID() const override;
virtual Block getBlock(size_t start, size_t length) const = 0;
private:
bool was_read;
const size_t rows_count;
const size_t max_block_size;
size_t next_row;
Block readImpl() override;
void readPrefixImpl() override { was_read = false; }
void readSuffixImpl() override { was_read = false; }
void readPrefixImpl() override { next_row = 0; }
};
}
}

View File

@ -542,7 +542,7 @@ PaddedPODArray<FlatDictionary::Key> FlatDictionary::getIds() const
BlockInputStreamPtr FlatDictionary::getBlockInputStream(const Names & column_names) const
{
using BlockInputStreamType = DictionaryBlockInputStream<FlatDictionary, Key>;
auto block_input_stream = std::make_unique<BlockInputStreamType>(*this, getIds() ,column_names);
auto block_input_stream = std::make_unique<BlockInputStreamType>(shared_from_this(), 2, getIds() ,column_names);
return BlockInputStreamPtr(std::move(block_input_stream));
}

View File

@ -515,7 +515,7 @@ PaddedPODArray<HashedDictionary::Key> HashedDictionary::getIds() const
BlockInputStreamPtr HashedDictionary::getBlockInputStream(const Names & column_names) const
{
using BlockInputStreamType = DictionaryBlockInputStream<HashedDictionary, Key>;
auto block_input_stream = std::make_unique<BlockInputStreamType>(*this, getIds(), column_names);
auto block_input_stream = std::make_unique<BlockInputStreamType>(shared_from_this(), 2, getIds(), column_names);
return BlockInputStreamPtr(std::move(block_input_stream));
}

View File

@ -22,7 +22,7 @@ struct DictionaryStructure;
class ColumnString;
struct IDictionaryBase
struct IDictionaryBase : public std::enable_shared_from_this<IDictionaryBase>
{
using Key = UInt64;

View File

@ -10,23 +10,28 @@
#include <Dictionaries/IDictionary.h>
#include <ext/range.hpp>
namespace DB
namespace DB
{
/*
* BlockInputStream implementation for external dictionaries
/*
* BlockInputStream implementation for external dictionaries
* read() returns single block consisting of the in-memory contents of the dictionaries
*/
template <class DictionaryType, class Key>
class RangeDictionaryBlockInputStream : public DictionaryBlockInputStreamBase
{
public:
using DictionatyPtr = std::shared_ptr<DictionaryType const>;
RangeDictionaryBlockInputStream(
const DictionaryType& dictionary, const Names & column_names, const PaddedPODArray<Key> & ids,
const PaddedPODArray<UInt16> & start_dates, const PaddedPODArray<UInt16> & end_dates);
DictionatyPtr dictionary, size_t max_block_size, const Names & column_names, PaddedPODArray<Key> && ids,
PaddedPODArray<UInt16> && start_dates, PaddedPODArray<UInt16> && end_dates);
String getName() const override { return "RangeDictionaryBlockInputStream"; }
protected:
Block getBlock(size_t start, size_t length) const override;
private:
template <class Type>
using DictionaryGetter = void (DictionaryType::*)(const std::string &, const PaddedPODArray<Key> &,
@ -35,29 +40,104 @@ private:
template <class AttributeType>
ColumnPtr getColumnFromAttribute(DictionaryGetter<AttributeType> getter,
const PaddedPODArray<Key>& ids, const PaddedPODArray<UInt16> & dates,
const DictionaryAttribute& attribute, const DictionaryType& dictionary);
const DictionaryAttribute& attribute, const DictionaryType& dictionary) const;
ColumnPtr getColumnFromAttributeString(const PaddedPODArray<Key>& ids, const PaddedPODArray<UInt16> & dates,
const DictionaryAttribute& attribute, const DictionaryType& dictionary);
const DictionaryAttribute& attribute, const DictionaryType& dictionary) const;
template <class T>
ColumnPtr getColumnFromPODArray(const PaddedPODArray<T>& array);
ColumnPtr getColumnFromPODArray(const PaddedPODArray<T>& array) const;
template <class T>
void addSpecialColumn(
const std::experimental::optional<DictionarySpecialAttribute>& attribute, DataTypePtr type,
const std::string & default_name, const std::unordered_set<std::string> & column_names,
const PaddedPODArray<T> & values, ColumnsWithTypeAndName& columns);
const PaddedPODArray<T> & values, ColumnsWithTypeAndName& columns) const;
Block fillBlock(const PaddedPODArray<Key> & ids,
const PaddedPODArray<UInt16> & start_dates, const PaddedPODArray<UInt16> & end_dates) const;
DictionatyPtr dictionary;
Names column_names;
PaddedPODArray<Key> ids;
PaddedPODArray<UInt16> start_dates;
PaddedPODArray<UInt16> end_dates;
};
template <class DictionaryType, class Key>
RangeDictionaryBlockInputStream<DictionaryType, Key>::RangeDictionaryBlockInputStream(
DictionatyPtr dictionary, size_t max_column_size, const Names & column_names, PaddedPODArray<Key> && ids,
PaddedPODArray<UInt16> && start_dates, PaddedPODArray<UInt16> && end_dates)
: DictionaryBlockInputStreamBase(ids.size(), max_column_size),
dictionary(dictionary), column_names(column_names),
ids(std::move(ids)), start_dates(std::move(start_dates)), end_dates(std::move(end_dates))
{
}
template <class DictionaryType, class Key>
Block RangeDictionaryBlockInputStream<DictionaryType, Key>::getBlock(size_t start, size_t length) const
{
PaddedPODArray<Key> block_ids;
PaddedPODArray<UInt16> block_start_dates;
PaddedPODArray<UInt16> block_end_dates;
block_ids.reserve(length);
block_start_dates.reserve(length);
block_end_dates.reserve(length);
for (auto idx : ext::range(start, start + length))
{
block_ids.push_back(ids[idx]);
block_start_dates.push_back(block_start_dates[idx]);
block_end_dates.push_back(block_end_dates[idx]);
}
return fillBlock(block_ids, block_start_dates, block_end_dates);
}
template <class DictionaryType, class Key>
template <class AttributeType>
ColumnPtr RangeDictionaryBlockInputStream<DictionaryType, Key>::getColumnFromAttribute(
DictionaryGetter<AttributeType> getter, const PaddedPODArray<Key>& ids,
const PaddedPODArray<UInt16> & dates, const DictionaryAttribute& attribute, const DictionaryType& dictionary) const
{
auto column_vector = std::make_unique<ColumnVector<AttributeType>>(ids.size());
(dictionary.*getter)(attribute.name, ids, dates, column_vector->getData());
return ColumnPtr(std::move(column_vector));
}
template <class DictionaryType, class Key>
ColumnPtr RangeDictionaryBlockInputStream<DictionaryType, Key>::getColumnFromAttributeString(
const PaddedPODArray<Key>& ids, const PaddedPODArray<UInt16> & dates,
const DictionaryAttribute& attribute, const DictionaryType& dictionary) const
{
auto column_string = std::make_unique<ColumnString>();
dictionary.getString(attribute.name, ids, dates, column_string.get());
return ColumnPtr(std::move(column_string));
}
template <class DictionaryType, class Key>
template <class T>
ColumnPtr RangeDictionaryBlockInputStream<DictionaryType, Key>::getColumnFromPODArray(const PaddedPODArray<T>& array) const
{
auto column_vector = std::make_unique<ColumnVector<T>>();
column_vector->getData().reserve(array.size());
for (T value : array)
{
column_vector->insert(value);
}
return ColumnPtr(std::move(column_vector));
}
template <class DictionaryType, class Key>
template <class T>
void RangeDictionaryBlockInputStream<DictionaryType, Key>::addSpecialColumn(
const std::experimental::optional<DictionarySpecialAttribute> & attribute, DataTypePtr type,
const std::string& default_name, const std::unordered_set<std::string> & column_names,
const PaddedPODArray<T> & values, ColumnsWithTypeAndName & columns)
const std::string& default_name, const std::unordered_set<std::string> & column_names,
const PaddedPODArray<T> & values, ColumnsWithTypeAndName & columns) const
{
std::string name = default_name;
if (attribute) {
name = attribute->name;
name = attribute->name;
}
if (column_names.find(name) != column_names.end()) {
columns.emplace_back(getColumnFromPODArray(values), type, name);
@ -65,12 +145,12 @@ name = attribute->name;
}
template <class DictionaryType, class Key>
RangeDictionaryBlockInputStream<DictionaryType, Key>::RangeDictionaryBlockInputStream(
const DictionaryType& dictionary, const Names & column_names, const PaddedPODArray<Key>& ids,
const PaddedPODArray<UInt16> & start_dates, const PaddedPODArray<UInt16> & end_dates)
Block RangeDictionaryBlockInputStream<DictionaryType, Key>::fillBlock(
const PaddedPODArray<Key>& ids,
const PaddedPODArray<UInt16> & start_dates, const PaddedPODArray<UInt16> & end_dates) const
{
ColumnsWithTypeAndName columns;
const DictionaryStructure& structure = dictionary.getStructure();
const DictionaryStructure& structure = dictionary->getStructure();
std::unordered_set<std::string> names(column_names.begin(), column_names.end());
@ -81,11 +161,11 @@ RangeDictionaryBlockInputStream<DictionaryType, Key>::RangeDictionaryBlockInputS
for (const auto idx : ext::range(0, structure.attributes.size()))
{
const DictionaryAttribute& attribute = structure.attributes[idx];
if (names.find(attribute.name) != names.end())
if (names.find(attribute.name) != names.end())
{
ColumnPtr column;
#define GET_COLUMN_FORM_ATTRIBUTE(TYPE)\
column = getColumnFromAttribute<TYPE>(&DictionaryType::get##TYPE, ids, start_dates, attribute, dictionary)
column = getColumnFromAttribute<TYPE>(&DictionaryType::get##TYPE, ids, start_dates, attribute, *dictionary)
switch (attribute.underlying_type)
{
case AttributeUnderlyingType::UInt8: GET_COLUMN_FORM_ATTRIBUTE(UInt8); break;
@ -99,47 +179,13 @@ RangeDictionaryBlockInputStream<DictionaryType, Key>::RangeDictionaryBlockInputS
case AttributeUnderlyingType::Float32: GET_COLUMN_FORM_ATTRIBUTE(Float32); break;
case AttributeUnderlyingType::Float64: GET_COLUMN_FORM_ATTRIBUTE(Float64); break;
case AttributeUnderlyingType::String:
column = getColumnFromAttributeString(ids, start_dates, attribute, dictionary); break;
column = getColumnFromAttributeString(ids, start_dates, attribute, *dictionary); break;
}
columns.emplace_back(column, attribute.type, attribute.name);
}
}
block = Block(columns);
return Block(columns);
}
template <class DictionaryType, class Key>
template <class AttributeType>
ColumnPtr RangeDictionaryBlockInputStream<DictionaryType, Key>::getColumnFromAttribute(
DictionaryGetter<AttributeType> getter, const PaddedPODArray<Key>& ids,
const PaddedPODArray<UInt16> & dates, const DictionaryAttribute& attribute, const DictionaryType& dictionary)
{
auto column_vector = std::make_unique<ColumnVector<AttributeType>>(ids.size());
(dictionary.*getter)(attribute.name, ids, dates, column_vector->getData());
return ColumnPtr(std::move(column_vector));
}
template <class DictionaryType, class Key>
ColumnPtr RangeDictionaryBlockInputStream<DictionaryType, Key>::getColumnFromAttributeString(
const PaddedPODArray<Key>& ids, const PaddedPODArray<UInt16> & dates,
const DictionaryAttribute& attribute, const DictionaryType& dictionary)
{
auto column_string = std::make_unique<ColumnString>();
dictionary.getString(attribute.name, ids, dates, column_string.get());
return ColumnPtr(std::move(column_string));
}
template <class DictionaryType, class Key>
template <class T>
ColumnPtr RangeDictionaryBlockInputStream<DictionaryType, Key>::getColumnFromPODArray(const PaddedPODArray<T>& array)
{
auto column_vector = std::make_unique<ColumnVector<T>>();
column_vector->getData().reserve(array.size());
for (T value : array)
{
column_vector->insert(value);
}
return ColumnPtr(std::move(column_vector));
}
}

View File

@ -354,7 +354,7 @@ const RangeHashedDictionary::Attribute & RangeHashedDictionary::getAttributeWith
return attribute;
}
void RangeHashedDictionary::getIdsAndDates(PaddedPODArray<Key> & ids,
void RangeHashedDictionary::getIdsAndDates(PaddedPODArray<Key> & ids,
PaddedPODArray<UInt16> & start_dates, PaddedPODArray<UInt16> & end_dates) const
{
const auto & attribute = attributes.front();
@ -376,14 +376,14 @@ void RangeHashedDictionary::getIdsAndDates(PaddedPODArray<Key> & ids,
}
template <typename T>
void RangeHashedDictionary::getIdsAndDates(const Attribute& attribute, PaddedPODArray<Key> & ids,
void RangeHashedDictionary::getIdsAndDates(const Attribute& attribute, PaddedPODArray<Key> & ids,
PaddedPODArray<UInt16> & start_dates, PaddedPODArray<UInt16> & end_dates) const
{
const HashMap<UInt64, Values<T>> & attr = *std::get<Ptr<T>>(attribute.maps);
for (const auto & key : attr) {
ids.push_back(key.first);
for (const auto & value : key.second)
for (const auto & value : key.second)
{
start_dates.push_back(value.range.first);
end_dates.push_back(value.range.second);
@ -399,7 +399,9 @@ BlockInputStreamPtr RangeHashedDictionary::getBlockInputStream(const Names & col
getIdsAndDates(ids, start_dates, end_dates);
using BlockInputStreamType = RangeDictionaryBlockInputStream<RangeHashedDictionary, Key>;
auto block_input_stream = std::make_unique<BlockInputStreamType>(*this, column_names, ids, start_dates, end_dates);
auto dict_ptr = std::static_pointer_cast<const RangeHashedDictionary>(shared_from_this());
auto block_input_stream = std::make_unique<BlockInputStreamType>(
dict_ptr, 2, column_names, std::move(ids), std::move(start_dates), std::move(end_dates));
return BlockInputStreamPtr(std::move(block_input_stream));
}

View File

@ -128,6 +128,8 @@ public:
void has(const ConstColumnPlainPtrs & key_columns, const DataTypes & key_types, PaddedPODArray<UInt8> & out) const;
BlockInputStreamPtr getBlockInputStream(const Names & column_names) const override { return source_ptr->loadAll(); }
private:
template <typename Value> using ContainerType = std::vector<Value>;
template <typename Value> using ContainerPtrType = std::unique_ptr<ContainerType<Value>>;