added DictionaryBlockInputStream

This commit is contained in:
Nikolai Kochetov 2017-04-27 20:16:24 +03:00
parent bdf998d20f
commit 9243439e9b
6 changed files with 192 additions and 4 deletions

View File

@ -1,6 +1,9 @@
#include <functional>
#include <sstream>
#include <memory>
#include <Columns/ColumnsNumber.h>
#include <Dictionaries/CacheDictionary.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnString.h>
#include <Common/BitHelpers.h>
#include <Common/randomSeed.h>
#include <Common/HashTable/Hash.h>
@ -8,6 +11,9 @@
#include <Common/ProfilingScopedRWLock.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <DataTypes/DataTypesNumber.h>
#include <Dictionaries/CacheDictionary.h>
#include <Dictionaries/DictionaryBlockInputStream.h>
#include <ext/size.hpp>
#include <ext/range.hpp>
#include <ext/map.hpp>
@ -946,4 +952,33 @@ CacheDictionary::Attribute & CacheDictionary::getAttribute(const std::string & a
return attributes[it->second];
}
bool CacheDictionary::isEmptyCell(const UInt64 idx) const
{
return (idx != zero_cell_idx && cells[idx].id == 0) || (cells[idx].data
== ext::safe_bit_cast<CellMetadata::time_point_urep_t>(CellMetadata::time_point_t()));
}
PaddedPODArray<CacheDictionary::Key> CacheDictionary::getCachedIds() const
{
PaddedPODArray<Key> array;
for (size_t idx = 0; idx < cells.size(); ++idx)
{
auto& cell = cells[idx];
if (!isEmptyCell(idx))
{
array.push_back(cell.id);
}
}
return array;
}
BlockInputStreamPtr CacheDictionary::blockInputStreamFromCache() const
{
auto block_input_stream = std::make_unique<DictionaryBlockInputStream<CacheDictionary, Key>>(*this, getCachedIds());
return BlockInputStreamPtr(std::move(block_input_stream));
}
}

View File

@ -16,6 +16,7 @@
#include <tuple>
#include <random>
class A;
namespace DB
{
@ -58,6 +59,10 @@ public:
const IDictionarySource * getSource() const override { return source_ptr.get(); }
BlockInputStreamPtr blockInputStreamFromCache() const;
BlockInputStreamPtr getBlockInputStream() const override { return blockInputStreamFromCache(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
const DictionaryStructure & getStructure() const override { return dict_struct; }
@ -208,6 +213,10 @@ private:
const std::vector<Key> & requested_ids, PresentIdHandler && on_cell_updated,
AbsentIdHandler && on_id_not_found) const;
PaddedPODArray<Key> getCachedIds() const;
bool isEmptyCell(const UInt64 idx) const;
UInt64 getCellIdx(const Key id) const;
void setDefaultAttributeValue(Attribute & attribute, const Key idx) const;

View File

@ -0,0 +1,137 @@
#pragma once
#include <Columns/IColumn.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnString.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/DictionaryStructure.h>
#include <ext/range.hpp>
namespace DB {
/*
* BlockInputStream implementation for external dictionaries
* read() returns single block consisting of the in-memory contents of the dictionaries
*/
template <class DictionaryType, class Key>
class DictionaryBlockInputStream : public IProfilingBlockInputStream
{
public:
DictionaryBlockInputStream(const DictionaryType& dictionary, const PaddedPODArray<Key> & ids);
String getID() const override;
String getName() const override { return "DictionaryBlockInputStream"; }
protected:
Block readImpl() override;
void readPrefixImpl() override { was_read = false; }
void readSuffixImpl() override { was_read = false; }
private:
Block block;
bool was_read;
template<class Type>
using DictionaryGetter = void (DictionaryType::*)(const std::string &,
const PaddedPODArray<Key> &, PaddedPODArray<Type> &) const;
template <class AttributeType>
ColumnPtr getColumnFromAttribute(DictionaryGetter<AttributeType> getter, const PaddedPODArray<Key>& ids,
const DictionaryAttribute& attribute, const DictionaryType& dictionary);
ColumnPtr getColumnFromAttributeString(const PaddedPODArray<Key>& ids,
const DictionaryAttribute& attribute, const DictionaryType& dictionary);
ColumnPtr getColumnFromIds(const PaddedPODArray<Key>& ids);
};
template <class DictionaryType, class Key>
DictionaryBlockInputStream<DictionaryType, Key>::DictionaryBlockInputStream(const DictionaryType& dictionary,
const PaddedPODArray<Key>& ids)
: was_read(false)
{
ColumnsWithTypeAndName columns;
const DictionaryStructure& structure = dictionary.getStructure();
std::string id_column_name = "id";
if (structure.id)
id_column_name = structure.id->name;
columns.emplace_back(getColumnFromIds(ids), std::make_shared<DataTypeUInt64>(), id_column_name);
for (const auto idx : ext::range(0, structure.attributes.size()))
{
const DictionaryAttribute& attribute = structure.attributes[idx];
ColumnPtr column;
#define GET_COLUMN_FORM_ATTRIBUTE(TYPE)\
column = getColumnFromAttribute<TYPE>(&DictionaryType::get##TYPE, ids, attribute, dictionary)
switch (attribute.underlying_type)
{
case AttributeUnderlyingType::UInt8: GET_COLUMN_FORM_ATTRIBUTE(UInt8); break;
case AttributeUnderlyingType::UInt16: GET_COLUMN_FORM_ATTRIBUTE(UInt16); break;
case AttributeUnderlyingType::UInt32: GET_COLUMN_FORM_ATTRIBUTE(UInt32); break;
case AttributeUnderlyingType::UInt64: GET_COLUMN_FORM_ATTRIBUTE(UInt64); break;
case AttributeUnderlyingType::Int8: GET_COLUMN_FORM_ATTRIBUTE(Int8); break;
case AttributeUnderlyingType::Int16: GET_COLUMN_FORM_ATTRIBUTE(Int16); break;
case AttributeUnderlyingType::Int32: GET_COLUMN_FORM_ATTRIBUTE(Int32); break;
case AttributeUnderlyingType::Int64: GET_COLUMN_FORM_ATTRIBUTE(Int64); break;
case AttributeUnderlyingType::Float32: GET_COLUMN_FORM_ATTRIBUTE(Float32); break;
case AttributeUnderlyingType::Float64: GET_COLUMN_FORM_ATTRIBUTE(Float64); break;
case AttributeUnderlyingType::String: column = getColumnFromAttributeString(ids, attribute, dictionary); break;
}
columns.emplace_back(column, attribute.type, attribute.name);
}
block = Block(columns);
}
template <class DictionaryType, class Key>
String DictionaryBlockInputStream<DictionaryType, Key>::getID() const
{
std::stringstream ss;
ss << static_cast<const void*> (this);
return ss.str();
}
template <class DictionaryType, class Key>
Block DictionaryBlockInputStream<DictionaryType, Key>::readImpl()
{
if (was_read)
return Block();
was_read = true;
return block;
}
template <class DictionaryType, class Key>
template <class AttributeType>
ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromAttribute(
DictionaryBlockInputStream::DictionaryGetter<AttributeType> getter,
const PaddedPODArray<Key>& ids, const DictionaryAttribute& attribute, const DictionaryType& dictionary)
{
auto column_vector = std::make_unique<ColumnVector<AttributeType>>(ids.size());
(dictionary.*getter)(attribute.name, ids, column_vector->getData());
return ColumnPtr(std::move(column_vector));
}
template <class DictionaryType, class Key>
ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromAttributeString(
const PaddedPODArray<Key>& ids, const DictionaryAttribute& attribute, const DictionaryType& dictionary)
{
auto column_string = std::make_unique<ColumnString>();
dictionary.getString(attribute.name, ids, column_string.get());
return ColumnPtr(std::move(column_string));
}
template <class DictionaryType, class Key>
ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromIds(const PaddedPODArray<Key>& ids)
{
auto column_vector = std::make_unique<ColumnVector<UInt64>>();
column_vector->getData().reserve(ids.size());
for (UInt64 id : ids)
{
column_vector->insert(id);
}
return ColumnPtr(std::move(column_vector));
}
}

View File

@ -191,6 +191,8 @@ private:
const AncestorType & ancestor_ids,
PaddedPODArray<UInt8> & out) const;
PaddedPODArray<Key> getCachedIds() const;
const std::string name;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;

View File

@ -2,6 +2,8 @@
#include <Core/Field.h>
#include <Core/StringRef.h>
#include <DataStreams/IBlockInputStream.h>
#include <Dictionaries/IDictionarySource.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Common/PODArray.h>
#include <memory>
@ -53,6 +55,8 @@ struct IDictionaryBase
virtual bool isInjective(const std::string & attribute_name) const = 0;
virtual BlockInputStreamPtr getBlockInputStream() const { return const_cast<IDictionarySource*>(getSource())->loadAll(); }
virtual ~IDictionaryBase() = default;
};

View File

@ -3,8 +3,10 @@
#include <Parsers/ASTCreateQuery.h>
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/CacheDictionary.h>
#include <Storages/StorageDictionary.h>
#include <Interpreters/Context.h>
#include <common/logger_useful.h>
namespace DB {
@ -61,14 +63,13 @@ BlockInputStreams StorageDictionary::read(
const unsigned threads)
{
processed_stage = QueryProcessingStage::FetchColumns;
IDictionarySource* source = const_cast<IDictionarySource*>(dictionary->getSource());
return BlockInputStreams{source->loadAll()};
return BlockInputStreams{dictionary->getBlockInputStream()};
}
void StorageDictionary::checkNamesAndTypesCompatibleWithDictionary()
{
const DictionaryStructure & dictionaryStructure = dictionary->getStructure();
std::set<NameAndTypePair> dictionaryNamesAndTypes;
for (const auto & attribute : dictionaryStructure.attributes) {
dictionaryNamesAndTypes.insert(NameAndTypePair(attribute.name, attribute.type));