dbms: sparse array implementation of FlatDictionary. [#METR-13298]

This commit is contained in:
Andrey Mironov 2015-01-22 17:32:38 +03:00
parent 300b7342e7
commit 92664ed612
8 changed files with 522 additions and 28 deletions

View File

@ -1,7 +1,6 @@
#pragma once
#include <DB/Dictionaries/FlatDictionary.h>
#include <DB/Dictionaries/IDictionary.h>
#include <Yandex/singleton.h>
#include <statdaemons/ext/memory.hpp>
#include <Poco/Util/AbstractConfiguration.h>
@ -12,9 +11,40 @@ namespace DB
class DictionaryFactory : public Singleton<DictionaryFactory>
{
public:
DictionaryPtr create() const
DictionaryPtr create(const Poco::Util::XMLConfiguration & config, const std::string & config_prefix,
const Context & context) const
{
return ext::make_unique<FlatDictionary>();
const auto & layout_prefix = config_prefix + "layout.";
auto dict_struct = DictionaryStructure::fromXML(config, config_prefix + "structure");
if (config.has(layout_prefix + "flat"))
{
return ext::make_unique<FlatDictionary>(dict_struct, config, config_prefix, context);
}
else if (config.has(layout_prefix + "hashed"))
{
throw Exception{
"Dictionary of type 'hashed' is not yet implemented",
ErrorCodes::NOT_IMPLEMENTED
};
}
else if (config.has(layout_prefix + "cache"))
{
const auto size = config.getInt(layout_prefix + "cache.size", 0);
if (size == 0)
throw Exception{
"Dictionary of type 'cache' cannot have size of 0 bytes",
ErrorCodes::TOO_SMALL_BUFFER_SIZE
};
throw Exception{
"Dictionary of type 'cache' is not yet implemented",
ErrorCodes::NOT_IMPLEMENTED
};
}
throw Exception{"No dictionary type specified", ErrorCodes::BAD_ARGUMENTS};
}
};

View File

@ -1,6 +1,11 @@
#pragma once
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Core/Block.h>
#include <DB/Interpreters/Context.h>
#include <DB/DataStreams/TabSeparatedRowInputStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <statdaemons/ext/range.hpp>
#include <Poco/Util/XMLConfiguration.h>
#include <map>
#include <vector>
@ -8,25 +13,259 @@
namespace DB
{
const auto max_array_size = 500000;
const auto max_block_size = 8192;
/// @todo manage arrays using std::vector or PODArray, start with an initial size, expand up to max_array_size
class FlatDictionary : public IDictionary
{
public:
FlatDictionary() = default;
FlatDictionary(const DictionaryStructure & dict_struct, const Poco::Util::XMLConfiguration & config,
const std::string & config_prefix, const Context & context)
{
for (const auto & attribute : dict_struct.attributes)
{
attributes.emplace(attribute.name,
createAttributeWithType(getAttributeTypeByName(attribute.type), attribute.null_value));
StringRef getString(const id_t id, const std::string & attribute_name) const override {
return { "", 0 };
}
if (attribute.hierarchical)
hierarchical_attribute = &attributes[attribute.name];
}
UInt64 getUInt64(const id_t id, const std::string & attribute_name) const override {
return 0;
if (config.has(config_prefix + "source.file"))
{
const auto & file_name = config.getString(config_prefix + "source.file.path");
const auto & format = config.getString(config_prefix + "source.file.format");
ReadBufferFromFile in{file_name};
auto sample_block = createSampleBlock(dict_struct, context);
auto stream = context.getFormatFactory().getInput(
format, in, sample_block, max_block_size, context.getDataTypeFactory());
while (const auto block = stream->read())
{
const auto & id_column = *block.getByPosition(0).column;
for (const auto attribute_idx : ext::range(1, attributes.size()))
{
const auto & attribute_column = *block.getByPosition(attribute_idx).column;
auto & attribute = attributes[dict_struct.attributes[attribute_idx - 1].name];
for (const auto row_idx : ext::range(1, attribute_column.size()))
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
}
}
}
}
private:
using value_t = std::pair<id_t, Field>;
using attribute_t = std::vector<value_t>;
using attributes_t = std::map<std::string, attribute_t>;
UInt64 getUInt64(const id_t id, const std::string & attribute_name) const override
{
const auto & attribute = findAttribute(attribute_name);
attribute_t attributes;
if (attribute.type != attribute_type::uint64)
throw Exception{
"Type mismatch: attribute " + attribute_name + " has a type different from UInt64",
ErrorCodes::TYPE_MISMATCH
};
if (id < max_array_size)
return attribute.uint64_array[id];
return attribute.uint64_null_value;
}
StringRef getString(const id_t id, const std::string & attribute_name) const override
{
const auto & attribute = findAttribute(attribute_name);
if (attribute.type != attribute_type::string)
throw Exception{
"Type mismatch: attribute " + attribute_name + " has a type different from String",
ErrorCodes::TYPE_MISMATCH
};
if (id < max_array_size)
return { attribute.string_array[id].data(), attribute.string_array[id].size() };
return { attribute.string_null_value.data(), attribute.string_null_value.size() };
}
private:
enum class attribute_type
{
uint8,
uint16,
uint32,
uint64,
int8,
int16,
int32,
int64,
string
};
struct attribute_t
{
attribute_type type;
UInt8 uint8_null_value;
UInt16 uint16_null_value;
UInt32 uint32_null_value;
UInt64 uint64_null_value;
Int8 int8_null_value;
Int16 int16_null_value;
Int32 int32_null_value;
Int64 int64_null_value;
String string_null_value;
std::unique_ptr<UInt8[]> uint8_array;
std::unique_ptr<UInt16[]> uint16_array;
std::unique_ptr<UInt32[]> uint32_array;
std::unique_ptr<UInt64[]> uint64_array;
std::unique_ptr<Int8[]> int8_array;
std::unique_ptr<Int16[]> int16_array;
std::unique_ptr<Int32[]> int32_array;
std::unique_ptr<Int64[]> int64_array;
std::unique_ptr<String[]> string_array;
};
using attributes_t = std::map<std::string, attribute_t>;
attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value)
{
attribute_t attr{type};
switch (type)
{
case attribute_type::uint8:
attr.uint8_null_value = DB::parse<UInt8>(null_value);
attr.uint8_array.reset(new UInt8[max_array_size]);
std::fill(attr.uint8_array.get(), attr.uint8_array.get() + max_array_size, attr.uint8_null_value);
break;
case attribute_type::uint16:
attr.uint16_null_value = DB::parse<UInt16>(null_value);
attr.uint16_array.reset(new UInt16[max_array_size]);
std::fill(attr.uint16_array.get(), attr.uint16_array.get() + max_array_size, attr.uint16_null_value);
break;
case attribute_type::uint32:
attr.uint32_null_value = DB::parse<UInt32>(null_value);
attr.uint32_array.reset(new UInt32[max_array_size]);
std::fill(attr.uint32_array.get(), attr.uint32_array.get() + max_array_size, attr.uint32_null_value);
break;
case attribute_type::uint64:
attr.uint64_null_value = DB::parse<UInt64>(null_value);
attr.uint64_array.reset(new UInt64[max_array_size]);
std::fill(attr.uint64_array.get(), attr.uint64_array.get() + max_array_size, attr.uint64_null_value);
break;
case attribute_type::int8:
attr.int8_null_value = DB::parse<Int8>(null_value);
attr.int8_array.reset(new Int8[max_array_size]);
std::fill(attr.int8_array.get(), attr.int8_array.get() + max_array_size, attr.int8_null_value);
break;
case attribute_type::int16:
attr.int16_null_value = DB::parse<Int16>(null_value);
attr.int16_array.reset(new Int16[max_array_size]);
std::fill(attr.int16_array.get(), attr.int16_array.get() + max_array_size, attr.int16_null_value);
break;
case attribute_type::int32:
attr.int32_null_value = DB::parse<Int32>(null_value);
attr.int32_array.reset(new Int32[max_array_size]);
std::fill(attr.int32_array.get(), attr.int32_array.get() + max_array_size, attr.int32_null_value);
break;
case attribute_type::int64:
attr.int64_null_value = DB::parse<Int64>(null_value);
attr.int64_array.reset(new Int64[max_array_size]);
std::fill(attr.int64_array.get(), attr.int64_array.get() + max_array_size, attr.int64_null_value);
break;
case attribute_type::string:
attr.string_null_value = null_value;
attr.string_array.reset(new String[max_array_size]);
std::fill(attr.string_array.get(), attr.string_array.get() + max_array_size, attr.string_null_value);
break;
}
return attr;
}
attribute_type getAttributeTypeByName(const std::string & type)
{
static const std::unordered_map<std::string, attribute_type> dictionary{
{ "UInt8", attribute_type::uint8 },
{ "UInt16", attribute_type::uint16 },
{ "UInt32", attribute_type::uint32 },
{ "UInt64", attribute_type::uint64 },
{ "Int8", attribute_type::int8 },
{ "Int16", attribute_type::int16 },
{ "Int32", attribute_type::int32 },
{ "Int64", attribute_type::int64 },
{ "String", attribute_type::string },
};
const auto it = dictionary.find(type);
if (it != std::end(dictionary))
return it->second;
throw Exception{
"Unknown type " + type,
ErrorCodes::UNKNOWN_TYPE
};
}
void setAttributeValue(attribute_t & attribute, const id_t id, const Field & value)
{
if (id >= max_array_size)
throw Exception{
"Identifier should be less than " + toString(max_array_size),
ErrorCodes::ARGUMENT_OUT_OF_BOUND
};
switch (attribute.type)
{
case attribute_type::uint8: attribute.uint8_array[id] = value.get<UInt64>(); break;
case attribute_type::uint16: attribute.uint16_array[id] = value.get<UInt64>(); break;
case attribute_type::uint32: attribute.uint32_array[id] = value.get<UInt64>(); break;
case attribute_type::uint64: attribute.uint64_array[id] = value.get<UInt64>(); break;
case attribute_type::int8: attribute.int8_array[id] = value.get<Int64>(); break;
case attribute_type::int16: attribute.int16_array[id] = value.get<Int64>(); break;
case attribute_type::int32: attribute.int32_array[id] = value.get<Int64>(); break;
case attribute_type::int64: attribute.int64_array[id] = value.get<Int64>(); break;
case attribute_type::string: attribute.string_array[id] = value.get<String>(); break;
}
}
static Block createSampleBlock(const DictionaryStructure & dict_struct, const Context & context)
{
Block block{
ColumnWithNameAndType{
new ColumnUInt64,
new DataTypeUInt64,
dict_struct.id_name
}
};
for (const auto & attribute : dict_struct.attributes)
{
const auto & type = context.getDataTypeFactory().get(attribute.type);
block.insert(ColumnWithNameAndType{
type->createColumn(), type, attribute.name
});
}
return block;
}
const attribute_t & findAttribute(const std::string & attribute_name) const
{
const auto it = attributes.find(attribute_name);
if (it == std::end(attributes))
throw Exception{
"No such attribute '" + attribute_name + "'",
ErrorCodes::BAD_ARGUMENTS
};
return it->second;
}
attributes_t attributes;
const attribute_t * hierarchical_attribute = nullptr;
};
}

View File

@ -2,6 +2,7 @@
#include <DB/Core/Field.h>
#include <memory>
#include <Poco/Util/XMLConfiguration.h>
namespace DB
{
@ -11,12 +12,78 @@ class IDictionary
public:
using id_t = std::uint64_t;
virtual StringRef getString(const id_t id, const std::string & attribute_name) const = 0;
virtual UInt64 getUInt64(const id_t id, const std::string & attribute_name) const = 0;
virtual StringRef getString(const id_t id, const std::string & attribute_name) const = 0;
virtual ~IDictionary() = default;
};
using DictionaryPtr = std::unique_ptr<IDictionary>;
struct DictionaryAttribute
{
std::string name;
std::string type;
std::string null_value;
bool hierarchical;
bool injective;
};
struct DictionaryStructure
{
std::string id_name;
std::vector<DictionaryAttribute> attributes;
static DictionaryStructure fromXML(const Poco::Util::XMLConfiguration & config, const std::string & config_prefix)
{
const auto & id_name = config.getString(config_prefix + ".id.name");
if (id_name.empty())
throw Exception{
"No 'id' specified for dictionary",
ErrorCodes::BAD_ARGUMENTS
};
DictionaryStructure result{id_name};
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
auto has_hierarchy = false;
for (const auto & key : keys)
{
if (0 != strncmp(key.data(), "attribute", strlen("attribute")))
continue;
const auto & prefix = config_prefix + '.' + key + '.';
const auto & name = config.getString(prefix + "name");
const auto & type = config.getString(prefix + "type");
const auto & null_value = config.getString(prefix + "null_value");
const auto hierarchical = config.getBool(prefix + "hierarchical", false);
const auto injective = config.getBool(prefix + "injective", false);
if (name.empty() || type.empty())
throw Exception{
"Properties 'name' and 'type' of an attribute cannot be empty",
ErrorCodes::BAD_ARGUMENTS
};
if (has_hierarchy && hierarchical)
throw Exception{
"Only one hierarchical attribute supported",
ErrorCodes::BAD_ARGUMENTS
};
has_hierarchy = has_hierarchy || hierarchical;
result.attributes.emplace_back(DictionaryAttribute{name, type, null_value, hierarchical, injective});
}
if (result.attributes.empty())
throw Exception{
"Dictionary has no attributes defined",
ErrorCodes::BAD_ARGUMENTS
};
return result;
}
};
}

View File

@ -908,4 +908,146 @@ private:
const Dictionaries & dictionaries;
};
template <typename IntegralType>
class FunctionDictGetInteger: public IFunction
{
public:
static const std::string name;
static IFunction * create(const Context & context)
{
return new FunctionDictGetInteger{context.getDictionaries()};
};
FunctionDictGetInteger(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}
String getName() const override { return name; }
private:
DataTypePtr getReturnType(const DataTypes & arguments) const override
{
if (arguments.size() != 3)
throw Exception{
"Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 3.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
};
if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
{
throw Exception{
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
if (!typeid_cast<const DataTypeString *>(arguments[1].get()))
{
throw Exception{
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
const auto id_arg = arguments[2].get();
if (!typeid_cast<const DataTypeUInt8 *>(id_arg) &&
!typeid_cast<const DataTypeUInt16 *>(id_arg) &&
!typeid_cast<const DataTypeUInt32 *>(id_arg) &&
!typeid_cast<const DataTypeUInt64 *>(id_arg) &&
!typeid_cast<const DataTypeInt8 *>(id_arg) &&
!typeid_cast<const DataTypeInt16 *>(id_arg) &&
!typeid_cast<const DataTypeInt32 *>(id_arg) &&
!typeid_cast<const DataTypeInt64 *>(id_arg))
{
throw Exception{
"Illegal type " + arguments[2]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
return new typename DataTypeFromFieldType<IntegralType>::Type;
}
void execute(Block & block, const ColumnNumbers & arguments, const size_t result)
{
const auto dict_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[0]).column.get());
if (!dict_name_col)
throw Exception{
"First argument of function " + getName() + " must be a constant string",
ErrorCodes::ILLEGAL_COLUMN
};
auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
const auto attr_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[1]).column.get());
if (!attr_name_col)
throw Exception{
"Second argument of function " + getName() + " must be a constant string",
ErrorCodes::ILLEGAL_COLUMN
};
const auto & attr_name = attr_name_col->getData();
const auto id_col = block.getByPosition(arguments[2]).column.get();
if (!execute<UInt8>(block, result, dict, attr_name, id_col) &&
!execute<UInt16>(block, result, dict, attr_name, id_col) &&
!execute<UInt32>(block, result, dict, attr_name, id_col) &&
!execute<UInt64>(block, result, dict, attr_name, id_col) &&
!execute<Int8>(block, result, dict, attr_name, id_col) &&
!execute<Int16>(block, result, dict, attr_name, id_col) &&
!execute<Int32>(block, result, dict, attr_name, id_col) &&
!execute<Int64>(block, result, dict, attr_name, id_col))
{
throw Exception{
"Third argument of function " + getName() + " must be integral",
ErrorCodes::ILLEGAL_COLUMN
};
}
}
template <typename T>
bool execute(Block & block, const size_t result, const MultiVersion<IDictionary>::Version & dictionary,
const std::string & attr_name, const IColumn * const id_col_untyped)
{
if (const auto id_col = typeid_cast<const ColumnVector<T> *>(id_col_untyped))
{
const auto out = new ColumnVector<IntegralType>;
block.getByPosition(result).column = out;
for (const auto & id : id_col->getData())
out->insert(dictionary->getUInt64(id, attr_name));
return true;
}
else if (const auto id_col = typeid_cast<const ColumnConst<T> *>(id_col_untyped))
{
block.getByPosition(result).column = new ColumnConst<IntegralType>{
id_col->size(),
static_cast<IntegralType>(dictionary->getUInt64(id_col->getData(), attr_name))
};
return true;
};
return false;
}
const Dictionaries & dictionaries;
};
template <typename IntegralType>
const std::string FunctionDictGetInteger<IntegralType>::name = "dictGet" + TypeName<IntegralType>::get();
using FunctionDictGetUInt8 = FunctionDictGetInteger<UInt8>;
using FunctionDictGetUInt16 = FunctionDictGetInteger<UInt16>;
using FunctionDictGetUInt32 = FunctionDictGetInteger<UInt32>;
using FunctionDictGetUInt64 = FunctionDictGetInteger<UInt64>;
using FunctionDictGetInt8 = FunctionDictGetInteger<Int8>;
using FunctionDictGetInt16 = FunctionDictGetInteger<Int16>;
using FunctionDictGetInt32 = FunctionDictGetInteger<Int32>;
using FunctionDictGetInt64 = FunctionDictGetInteger<Int64>;
}

View File

@ -13,7 +13,6 @@
#include <statdaemons/RegionsNames.h>
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Dictionaries/FlatDictionary.h>
namespace DB
@ -21,6 +20,8 @@ namespace DB
using Poco::SharedPtr;
class Context;
/// Словари Метрики, которые могут использоваться в функциях.
class Dictionaries
@ -32,6 +33,7 @@ private:
MultiVersion<RegionsNames> regions_names;
std::unordered_map<std::string, std::shared_ptr<MultiVersion<IDictionary>>> external_dictionaries;
const Context & context;
/// Периодичность обновления справочников, в секундах.
int reload_period;
@ -153,8 +155,8 @@ private:
public:
/// Справочники будут обновляться в отдельном потоке, каждые reload_period секунд.
Dictionaries(int reload_period_ = 3600)
: reload_period(reload_period_),
Dictionaries(const Context & context, int reload_period_ = 3600)
: context(context), reload_period(reload_period_),
log(&Logger::get("Dictionaries"))
{
reloadImpl();

View File

@ -25,6 +25,14 @@ void registerFunctionsDictionaries(FunctionFactory & factory)
factory.registerFunction<FunctionSEHierarchy>();
factory.registerFunction<FunctionCategoryHierarchy>();
factory.registerFunction<FunctionRegionToName>();
factory.registerFunction<FunctionDictGetUInt8>();
factory.registerFunction<FunctionDictGetUInt16>();
factory.registerFunction<FunctionDictGetUInt32>();
factory.registerFunction<FunctionDictGetUInt64>();
factory.registerFunction<FunctionDictGetInt8>();
factory.registerFunction<FunctionDictGetInt16>();
factory.registerFunction<FunctionDictGetInt32>();
factory.registerFunction<FunctionDictGetInt64>();
factory.registerFunction<FunctionDictGetString>();
}

View File

@ -496,7 +496,7 @@ const Dictionaries & Context::getDictionaries() const
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->dictionaries)
shared->dictionaries = new Dictionaries;
shared->dictionaries = new Dictionaries{*this};
return *shared->dictionaries;
}

View File

@ -18,6 +18,9 @@ namespace
void Dictionaries::reloadExternalDictionaries()
{
const auto config_path = Poco::Util::Application::instance().config().getString("dictionaries_config");
if (config_path.empty())
return;
const config_ptr_t<Poco::Util::XMLConfiguration> config{new Poco::Util::XMLConfiguration{config_path}};
/// get all dictionaries' definitions
@ -29,28 +32,31 @@ void Dictionaries::reloadExternalDictionaries()
{
if (0 != strncmp(key.data(), "dictionary", strlen("dictionary")))
{
/// @todo maybe output a warning
LOG_WARNING(log, "unknown node in dictionaries file: '" + key + "', 'dictionary'");
continue;
}
std::cout << key << std::endl;
const auto & prefix = key + '.';
const auto & name = config->getString(prefix + "name");
if (name.empty())
{
/// @todo handle error, dictionary name cannot be empty
LOG_WARNING(log, "dictionary name cannot be empty");
continue;
}
auto dict_ptr = DictionaryFactory::instance().create();
const auto it = external_dictionaries.find(name);
if (it == std::end(external_dictionaries))
try
{
external_dictionaries.emplace(name, std::make_shared<MultiVersion<IDictionary>>(dict_ptr.release()));
auto dict_ptr = DictionaryFactory::instance().create(*config, prefix, context);
const auto it = external_dictionaries.find(name);
if (it == std::end(external_dictionaries))
external_dictionaries.emplace(name, std::make_shared<MultiVersion<IDictionary>>(dict_ptr.release()));
else
it->second->set(dict_ptr.release());
}
else
catch (const Exception &)
{
it->second->set(dict_ptr.release());
handleException();
}
}
};