This commit is contained in:
Andrey Mironov 2015-02-04 14:01:06 +03:00
commit cee209db3e
23 changed files with 2606 additions and 8 deletions

View File

@ -232,6 +232,17 @@ public:
c_end = c_start + byte_size(n);
}
/// Resizes the array to n elements. When the array grows, every newly added element is set to `value`;
/// when it shrinks (or n equals the current size) only the end marker is moved.
void resize_fill(size_t n, const T & value)
{
    size_t old_size = size();
    if (n > old_size)
    {
        reserve(n);
        /// The end of the fill range is computed in elements of T. The previous form added the element
        /// count (n - old_size) to the byte pointer c_end before casting, which covers only that many
        /// bytes and leaves most of the new elements unfilled whenever sizeof(T) > 1.
        std::fill(t_end(), t_end() + (n - old_size), value);
    }
    c_end = c_start + byte_size(n);
}
void clear()
{
c_end = c_start;

View File

@ -21,7 +21,7 @@ struct StringRef
StringRef(const char * data_, size_t size_) : data(data_), size(size_) {}
StringRef(const unsigned char * data_, size_t size_) : data(reinterpret_cast<const char *>(data_)), size(size_) {}
StringRef(const std::string & s) : data(s.data()), size(s.size()) {}
StringRef() {}
StringRef() = default;
std::string toString() const { return std::string(data, size); }
};

View File

@ -0,0 +1,127 @@
#pragma once
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/executeQuery.h>
#include <statdaemons/ext/range.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Net/NetworkInterface.h>
namespace DB
{
const auto max_connections = 1;
class ClickhouseDictionarySource final : public IDictionarySource
{
static const auto max_block_size = 8192;
public:
/// Builds the source from the configuration subtree addressed by config_prefix.
/// Keys "host", "port" and "table" are read without a default; "user", "password" and "db"
/// fall back to an empty string.
/// A ConnectionPool is created only when isLocal(host, port) is false; otherwise `pool` stays null.
/// The text of the load-all query is composed once, from the sample block and the table name.
ClickhouseDictionarySource(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
Block & sample_block, Context & context)
: host{config.getString(config_prefix + "host")},
port(config.getInt(config_prefix + "port")),
user{config.getString(config_prefix + "user", "")},
password{config.getString(config_prefix + "password", "")},
db{config.getString(config_prefix + "db", "")},
table{config.getString(config_prefix + "table")},
sample_block{sample_block}, context(context),
is_local{isLocal(host, port)},
pool{is_local ? nullptr : ext::make_unique<ConnectionPool>(
max_connections, host, port, db, user, password, context.getDataTypeFactory(),
"ClickhouseDictionarySource")
},
load_all_query{composeLoadAllQuery(sample_block, table)}
{}
ClickhouseDictionarySource(const ClickhouseDictionarySource & other)
: host{other.host}, port{other.port}, user{other.user}, password{other.password},
db{other.db}, table{other.db},
sample_block{other.sample_block}, context(other.context),
is_local{other.is_local},
pool{is_local ? nullptr : ext::make_unique<ConnectionPool>(
max_connections, host, port, db, user, password, context.getDataTypeFactory(),
"ClickhouseDictionarySource")},
load_all_query{other.load_all_query}
{}
/// Returns a stream over the result of the load-all query.
/// A non-local source reads through its connection pool; a local one runs the query
/// with executeQuery on the stored context and hands back the resulting input stream.
BlockInputStreamPtr loadAll() override
{
    if (!is_local)
        return new RemoteBlockInputStream{pool.get(), load_all_query, nullptr};

    return executeQuery(load_all_query, context).in;
}
/// Loading a single entry by id is not supported by this source: always throws NOT_IMPLEMENTED.
BlockInputStreamPtr loadId(const std::uint64_t id) override
{
    throw Exception{"Method unsupported", ErrorCodes::NOT_IMPLEMENTED};
}
/// Loading a set of entries by id is not supported by this source: always throws NOT_IMPLEMENTED.
BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override
{
    throw Exception{"Method unsupported", ErrorCodes::NOT_IMPLEMENTED};
}
/// @todo check update time somehow
/// Until then the source always reports itself as modified.
bool isModified() const override { return true; }
/// Returns a new source produced by the copy constructor.
DictionarySourcePtr clone() const override { return ext::make_unique<ClickhouseDictionarySource>(*this); }
private:
/// Builds "SELECT <col1>, <col2>, ... FROM <table>;" using the column names of `block`
/// in their positional order. Names are inserted verbatim, without quoting.
static std::string composeLoadAllQuery(const Block & block, const std::string & table)
{
    std::string query{"SELECT "};

    const auto column_count = block.columns();
    for (std::size_t position = 0; position < column_count; ++position)
    {
        /// A separator goes before every column except the first one.
        if (position != 0)
            query += ", ";

        query += block.getByPosition(position).name;
    }

    query += " FROM " + table + ';';
    return query;
}
static bool isLocal(const std::string & host, const UInt16 port)
{
const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
static auto interfaces = Poco::Net::NetworkInterface::list();
if (clickhouse_port == port)
{
return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
[&] (const Poco::Net::NetworkInterface & interface) {
return interface.address() == Poco::Net::IPAddress(host);
});
}
return false;
}
/// Connection settings, read from the configuration in the constructor.
const std::string host;
const UInt16 port;
const std::string user;
const std::string password;
const std::string db;
const std::string table;
/// Block whose column names are used to compose the load-all query.
Block sample_block;
Context & context;
/// Result of isLocal(host, port); when true no connection pool is created.
const bool is_local;
/// Null for a local source.
std::unique_ptr<ConnectionPool> pool;
/// Query text produced by composeLoadAllQuery.
const std::string load_all_query;
};
}

View File

@ -0,0 +1,18 @@
#pragma once
#include <DB/Dictionaries/IDictionary.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
class Context;
/// Singleton factory for dictionaries. Only the declaration of create() is given here;
/// its definition lives elsewhere.
class DictionaryFactory : public Singleton<DictionaryFactory>
{
public:
/// NOTE(review): presumably builds the dictionary `name` from the settings under config_prefix —
/// confirm against the definition.
DictionaryPtr create(const std::string & name, Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix, Context & context) const;
};
}

View File

@ -0,0 +1,71 @@
#pragma once
#include <DB/Core/Block.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Dictionaries/FileDictionarySource.h>
#include <DB/Dictionaries/MysqlDictionarySource.h>
#include <DB/Dictionaries/ClickhouseDictionarySource.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <Yandex/singleton.h>
#include <statdaemons/ext/memory.hpp>
namespace DB
{
namespace
{
/// Creates a block describing the dictionary layout: the first column is a UInt64 column named after
/// the dictionary id, followed by one column per attribute, typed through the context's data type factory.
Block createSampleBlock(const DictionaryStructure & dict_struct, const Context & context)
{
    Block sample{
        ColumnWithNameAndType{new ColumnUInt64, new DataTypeUInt64, dict_struct.id_name}
    };

    for (const auto & description : dict_struct.attributes)
    {
        const auto & data_type = context.getDataTypeFactory().get(description.type);
        sample.insert(ColumnWithNameAndType{data_type->createColumn(), data_type, description.name});
    }

    return sample;
}
}
/// Singleton factory that picks a dictionary source implementation from the configuration.
class DictionarySourceFactory : public Singleton<DictionarySourceFactory>
{
public:
    /// Creates the source configured under config_prefix.
    /// The first of the keys "file", "mysql", "clickhouse" that is present decides the source type;
    /// when none is present an Exception is thrown.
    DictionarySourcePtr create(Poco::Util::AbstractConfiguration & config,
        const std::string & config_prefix,
        const DictionaryStructure & dict_struct,
        Context & context) const
    {
        auto sample_block = createSampleBlock(dict_struct, context);

        if (config.has(config_prefix + "file"))
        {
            const auto & path = config.getString(config_prefix + "file.path");
            const auto & input_format = config.getString(config_prefix + "file.format");
            return ext::make_unique<FileDictionarySource>(path, input_format, sample_block, context);
        }

        if (config.has(config_prefix + "mysql"))
            return ext::make_unique<MysqlDictionarySource>(config, config_prefix + "mysql.", sample_block, context);

        /// Construction of ClickhouseDictionarySource (prefix config_prefix + "clickhouse.") is not
        /// enabled yet, so a null pointer is returned for this source type.
        if (config.has(config_prefix + "clickhouse"))
            return nullptr;

        throw Exception{"unsupported source type"};
    }
};
}

View File

@ -0,0 +1,157 @@
#pragma once
#include <DB/Core/ErrorCodes.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <vector>
#include <string>
#include <map>
namespace DB
{
/// Value types a dictionary attribute can have. Each enumerator corresponds to one of the
/// type names accepted by getAttributeTypeByName ("UInt8" ... "Float64", "String").
enum class attribute_type
{
uint8,
uint16,
uint32,
uint64,
int8,
int16,
int32,
int64,
float32,
float64,
string
};
/// Maps a type name ("UInt8", ..., "Float64", "String") to the matching attribute_type.
/// The comparison is exact; an unrecognised name raises an Exception with code UNKNOWN_TYPE.
inline attribute_type getAttributeTypeByName(const std::string & type)
{
    static const std::unordered_map<std::string, attribute_type> types_by_name{
        { "UInt8", attribute_type::uint8 },
        { "UInt16", attribute_type::uint16 },
        { "UInt32", attribute_type::uint32 },
        { "UInt64", attribute_type::uint64 },
        { "Int8", attribute_type::int8 },
        { "Int16", attribute_type::int16 },
        { "Int32", attribute_type::int32 },
        { "Int64", attribute_type::int64 },
        { "Float32", attribute_type::float32 },
        { "Float64", attribute_type::float64 },
        { "String", attribute_type::string },
    };

    const auto found = types_by_name.find(type);
    if (found == types_by_name.end())
        throw Exception{"Unknown type " + type, ErrorCodes::UNKNOWN_TYPE};

    return found->second;
}
/// Returns the type name ("UInt8", ..., "Float64", "String") for an attribute_type.
/// A value outside the enumeration raises an Exception with code ARGUMENT_OUT_OF_BOUND.
inline std::string toString(const attribute_type type)
{
    switch (type)
    {
        case attribute_type::uint8: return "UInt8";
        case attribute_type::uint16: return "UInt16";
        case attribute_type::uint32: return "UInt32";
        case attribute_type::uint64: return "UInt64";
        case attribute_type::int8: return "Int8";
        case attribute_type::int16: return "Int16";
        case attribute_type::int32: return "Int32";
        case attribute_type::int64: return "Int64";
        case attribute_type::float32: return "Float32";
        case attribute_type::float64: return "Float64";
        case attribute_type::string: return "String";
    }

    /// The message is built from the numeric value of the enum. Calling toString(type) here, as was done
    /// before, re-enters this function with the same unknown value and recurses without end.
    throw Exception{
        "Unknown attribute_type " + std::to_string(static_cast<int>(type)),
        ErrorCodes::ARGUMENT_OUT_OF_BOUND
    };
}
/// Lifetime bounds of a dictionary, as read from the configuration.
struct DictionaryLifetime
{
    std::uint64_t min_sec;
    std::uint64_t max_sec;

    /// Reads the lifetime from `config`.
    /// If the key config_prefix + ".min" exists, the bounds come from ".min" and ".max";
    /// otherwise the single integer stored at config_prefix is used for both bounds.
    static DictionaryLifetime fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
    {
        const std::string min_key = config_prefix + ".min";

        if (!config.has(min_key))
        {
            const std::uint64_t fixed = config.getInt(config_prefix);
            return { fixed, fixed };
        }

        const std::uint64_t lower = config.getInt(min_key);
        const std::uint64_t upper = config.getInt(config_prefix + ".max");
        return { lower, upper };
    }
};
/// Description of one dictionary attribute as read by DictionaryStructure::fromConfig.
struct DictionaryAttribute
{
std::string name;
/// Type name as written in the configuration (not yet converted to attribute_type).
std::string type;
/// Textual null value from the configuration key "null_value".
std::string null_value;
/// Configuration flags "hierarchical" and "injective"; both default to false when absent.
bool hierarchical;
bool injective;
};
/// Structure of a dictionary: the name of its id plus the list of attribute descriptions.
struct DictionaryStructure
{
    std::string id_name;
    std::vector<DictionaryAttribute> attributes;

    /// Reads the structure from the configuration subtree at config_prefix.
    /// The id name is taken from ".id.name"; every child key starting with "attribute" describes one
    /// attribute through its "name", "type", "null_value", "hierarchical" and "injective" sub-keys.
    /// Throws BAD_ARGUMENTS when the id name is empty, an attribute has an empty name or type,
    /// more than one attribute is hierarchical, or no attributes are defined at all.
    static DictionaryStructure fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
    {
        const std::string identifier = config.getString(config_prefix + ".id.name");
        if (identifier.empty())
            throw Exception{"No 'id' specified for dictionary", ErrorCodes::BAD_ARGUMENTS};

        DictionaryStructure structure{identifier};

        Poco::Util::AbstractConfiguration::Keys child_keys;
        config.keys(config_prefix, child_keys);

        const std::string attribute_marker{"attribute"};
        bool hierarchy_seen = false;

        for (const auto & child : child_keys)
        {
            /// Only children whose key begins with "attribute" are attribute descriptions.
            if (child.compare(0, attribute_marker.size(), attribute_marker) != 0)
                continue;

            const std::string attribute_prefix = config_prefix + '.' + child + '.';

            const std::string attribute_name = config.getString(attribute_prefix + "name");
            const std::string attribute_type_name = config.getString(attribute_prefix + "type");
            const std::string attribute_null_value = config.getString(attribute_prefix + "null_value");
            const bool is_hierarchical = config.getBool(attribute_prefix + "hierarchical", false);
            const bool is_injective = config.getBool(attribute_prefix + "injective", false);

            if (attribute_name.empty() || attribute_type_name.empty())
                throw Exception{
                    "Properties 'name' and 'type' of an attribute cannot be empty",
                    ErrorCodes::BAD_ARGUMENTS
                };

            if (is_hierarchical)
            {
                if (hierarchy_seen)
                    throw Exception{
                        "Only one hierarchical attribute supported",
                        ErrorCodes::BAD_ARGUMENTS
                    };

                hierarchy_seen = true;
            }

            structure.attributes.push_back(DictionaryAttribute{
                attribute_name, attribute_type_name, attribute_null_value, is_hierarchical, is_injective});
        }

        if (structure.attributes.empty())
            throw Exception{"Dictionary has no attributes defined", ErrorCodes::BAD_ARGUMENTS};

        return structure;
    }
};
}

View File

@ -0,0 +1,70 @@
#pragma once
#include <DB/Interpreters/Context.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/OwningBufferBlockInputStream.h>
#include <Poco/Timestamp.h>
#include <Poco/File.h>
namespace DB
{
/// Dictionary source that reads the complete dictionary from a file in a given input format.
class FileDictionarySource final : public IDictionarySource
{
    static const auto max_block_size = 8192;

public:
    /// Remembers the file name, format, sample block and context, and records the file's
    /// current modification time.
    FileDictionarySource(const std::string & filename, const std::string & format, Block & sample_block,
        Context & context)
        : filename{filename}, format{format}, sample_block{sample_block}, context(context),
        last_modification{getLastModification()}
    {}

    /// Copies all settings, including the recorded modification time.
    FileDictionarySource(const FileDictionarySource & other)
        : filename{other.filename}, format{other.format},
        sample_block{other.sample_block}, context(other.context),
        last_modification{other.last_modification}
    {}

    /// Opens the file and returns an input stream in the configured format.
    /// The returned stream takes ownership of the read buffer. The recorded modification time
    /// is refreshed as part of the call.
    BlockInputStreamPtr loadAll() override
    {
        auto file_buffer = ext::make_unique<ReadBufferFromFile>(filename);
        auto formatted_stream = context.getFormatFactory().getInput(
            format, *file_buffer, sample_block, max_block_size, context.getDataTypeFactory());

        last_modification = getLastModification();

        return new OwningBufferBlockInputStream{formatted_stream, std::move(file_buffer)};
    }

    /// Loading a single entry by id is not supported: always throws NOT_IMPLEMENTED.
    BlockInputStreamPtr loadId(const std::uint64_t id) override
    {
        throw Exception{"Method unsupported", ErrorCodes::NOT_IMPLEMENTED};
    }

    /// Loading a set of entries by id is not supported: always throws NOT_IMPLEMENTED.
    BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override
    {
        throw Exception{"Method unsupported", ErrorCodes::NOT_IMPLEMENTED};
    }

    /// True when the file's modification time is later than the recorded one.
    bool isModified() const override { return last_modification < getLastModification(); }

    /// Returns a new source produced by the copy constructor.
    DictionarySourcePtr clone() const override { return ext::make_unique<FileDictionarySource>(*this); }

private:
    /// Current modification time of the file, as reported by Poco::File.
    Poco::Timestamp getLastModification() const { return Poco::File{filename}.getLastModified(); }

    const std::string filename;
    const std::string format;
    Block sample_block;
    Context & context;
    Poco::Timestamp last_modification;
};
}

View File

@ -0,0 +1,383 @@
#pragma once
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/DictionarySourceFactory.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <statdaemons/ext/range.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <vector>
namespace DB
{
/// Number of elements every attribute array of FlatDictionary is pre-filled with on creation.
const auto initial_array_size = 1024;
/// Exclusive upper bound for identifiers accepted by FlatDictionary::setAttributeValue.
const auto max_array_size = 500000;
class FlatDictionary final : public IDictionary
{
public:
/// Stores the settings, takes ownership of the source, then creates the attribute storage
/// and loads all data from the source right away.
FlatDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime)
: name{name}, dict_struct(dict_struct),
source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime)
{
createAttributes();
loadData();
}
/// Copying delegates to the main constructor with a clone of the source, so the copy
/// re-creates its attributes and loads the data again instead of copying the arrays.
FlatDictionary(const FlatDictionary & other)
: FlatDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime}
{}
/// Simple accessors for the dictionary's name, type and settings.
std::string getName() const override { return name; }
std::string getTypeName() const override { return "FlatDictionary"; }
bool isCached() const override { return false; }
/// Returns a new dictionary produced by the copy constructor.
DictionaryPtr clone() const override { return ext::make_unique<FlatDictionary>(*this); }
const IDictionarySource * const getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
/// True when one of the attributes was marked hierarchical (the pointer is non-null).
bool hasHierarchy() const override { return hierarchical_attribute; }
id_t toParent(const id_t id) const override
{
const auto attr = hierarchical_attribute;
switch (hierarchical_attribute->type)
{
case attribute_type::uint8: return id < attr->uint8_array->size() ? (*attr->uint8_array)[id] : attr->uint8_null_value;
case attribute_type::uint16: return id < attr->uint16_array->size() ? (*attr->uint16_array)[id] : attr->uint16_null_value;
case attribute_type::uint32: return id < attr->uint32_array->size() ? (*attr->uint32_array)[id] : attr->uint32_null_value;
case attribute_type::uint64: return id < attr->uint64_array->size() ? (*attr->uint64_array)[id] : attr->uint64_null_value;
case attribute_type::int8: return id < attr->int8_array->size() ? (*attr->int8_array)[id] : attr->int8_null_value;
case attribute_type::int16: return id < attr->int16_array->size() ? (*attr->int16_array)[id] : attr->int16_null_value;
case attribute_type::int32: return id < attr->int32_array->size() ? (*attr->int32_array)[id] : attr->int32_null_value;
case attribute_type::int64: return id < attr->int64_array->size() ? (*attr->int64_array)[id] : attr->int64_null_value;
case attribute_type::float32:
case attribute_type::float64:
case attribute_type::string:
break;
}
throw Exception{
"Hierarchical attribute has non-integer type " + toString(hierarchical_attribute->type),
ErrorCodes::TYPE_MISMATCH
};
}
/// Generates get<NAME>(attribute_name, id) for every supported type.
/// Each getter resolves the attribute by name, throws TYPE_MISMATCH when the attribute's type differs
/// from the requested one, and returns the stored element, or the attribute's null value when `id`
/// is not below the array size.
#define DECLARE_SAFE_GETTER(TYPE, NAME, LC_TYPE) \
TYPE get##NAME(const std::string & attribute_name, const id_t id) const override\
{\
const auto idx = getAttributeIndex(attribute_name);\
const auto & attribute = attributes[idx];\
if (attribute.type != attribute_type::LC_TYPE)\
throw Exception{\
"Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\
ErrorCodes::TYPE_MISMATCH\
};\
if (id < attribute.LC_TYPE##_array->size())\
return (*attribute.LC_TYPE##_array)[id];\
return attribute.LC_TYPE##_null_value;\
}
DECLARE_SAFE_GETTER(UInt8, UInt8, uint8)
DECLARE_SAFE_GETTER(UInt16, UInt16, uint16)
DECLARE_SAFE_GETTER(UInt32, UInt32, uint32)
DECLARE_SAFE_GETTER(UInt64, UInt64, uint64)
DECLARE_SAFE_GETTER(Int8, Int8, int8)
DECLARE_SAFE_GETTER(Int16, Int16, int16)
DECLARE_SAFE_GETTER(Int32, Int32, int32)
DECLARE_SAFE_GETTER(Int64, Int64, int64)
DECLARE_SAFE_GETTER(Float32, Float32, float32)
DECLARE_SAFE_GETTER(Float64, Float64, float64)
DECLARE_SAFE_GETTER(StringRef, String, string)
#undef DECLARE_SAFE_GETTER
/// Returns the position of the named attribute in `attributes`.
/// Throws BAD_ARGUMENTS when no attribute with that name exists.
std::size_t getAttributeIndex(const std::string & attribute_name) const override
{
    const auto found = attribute_index_by_name.find(attribute_name);
    if (found != std::end(attribute_index_by_name))
        return found->second;

    throw Exception{
        "No such attribute '" + attribute_name + "'",
        ErrorCodes::BAD_ARGUMENTS
    };
}
/// Generates is<NAME>(attribute_idx) for every supported type: true when the attribute at that
/// index has the given type. The index is not range-checked.
#define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\
bool is##NAME(const std::size_t attribute_idx) const override\
{\
return attributes[attribute_idx].type == attribute_type::LC_NAME;\
}
DECLARE_TYPE_CHECKER(UInt8, uint8)
DECLARE_TYPE_CHECKER(UInt16, uint16)
DECLARE_TYPE_CHECKER(UInt32, uint32)
DECLARE_TYPE_CHECKER(UInt64, uint64)
DECLARE_TYPE_CHECKER(Int8, int8)
DECLARE_TYPE_CHECKER(Int16, int16)
DECLARE_TYPE_CHECKER(Int32, int32)
DECLARE_TYPE_CHECKER(Int64, int64)
DECLARE_TYPE_CHECKER(Float32, float32)
DECLARE_TYPE_CHECKER(Float64, float64)
DECLARE_TYPE_CHECKER(String, string)
#undef DECLARE_TYPE_CHECKER
/// Generates get<NAME>Unsafe(attribute_idx, id) for every supported type.
/// "Unsafe" because neither the attribute index nor the attribute's type is checked: the array of the
/// requested type is dereferenced directly. Returns the null value when `id` is not below the array size.
#define DECLARE_UNSAFE_GETTER(TYPE, NAME, LC_NAME)\
TYPE get##NAME##Unsafe(const std::size_t attribute_idx, const id_t id) const override\
{\
const auto & attribute = attributes[attribute_idx];\
if (id < attribute.LC_NAME##_array->size())\
return (*attribute.LC_NAME##_array)[id];\
return attribute.LC_NAME##_null_value;\
}
DECLARE_UNSAFE_GETTER(UInt8, UInt8, uint8)
DECLARE_UNSAFE_GETTER(UInt16, UInt16, uint16)
DECLARE_UNSAFE_GETTER(UInt32, UInt32, uint32)
DECLARE_UNSAFE_GETTER(UInt64, UInt64, uint64)
DECLARE_UNSAFE_GETTER(Int8, Int8, int8)
DECLARE_UNSAFE_GETTER(Int16, Int16, int16)
DECLARE_UNSAFE_GETTER(Int32, Int32, int32)
DECLARE_UNSAFE_GETTER(Int64, Int64, int64)
DECLARE_UNSAFE_GETTER(Float32, Float32, float32)
DECLARE_UNSAFE_GETTER(Float64, Float64, float64)
DECLARE_UNSAFE_GETTER(StringRef, String, string)
#undef DECLARE_UNSAFE_GETTER
private:
/// Storage for one attribute. The struct has a null-value slot and an array pointer for every
/// supported type; createAttributeWithType sets up only the pair matching `type`, the remaining
/// array pointers stay null.
struct attribute_t
{
attribute_type type;
UInt8 uint8_null_value;
UInt16 uint16_null_value;
UInt32 uint32_null_value;
UInt64 uint64_null_value;
Int8 int8_null_value;
Int16 int16_null_value;
Int32 int32_null_value;
Int64 int64_null_value;
Float32 float32_null_value;
Float64 float64_null_value;
String string_null_value;
std::unique_ptr<PODArray<UInt8>> uint8_array;
std::unique_ptr<PODArray<UInt16>> uint16_array;
std::unique_ptr<PODArray<UInt32>> uint32_array;
std::unique_ptr<PODArray<UInt64>> uint64_array;
std::unique_ptr<PODArray<Int8>> int8_array;
std::unique_ptr<PODArray<Int16>> int16_array;
std::unique_ptr<PODArray<Int32>> int32_array;
std::unique_ptr<PODArray<Int64>> int64_array;
std::unique_ptr<PODArray<Float32>> float32_array;
std::unique_ptr<PODArray<Float64>> float64_array;
/// String attributes keep the character data in the arena; string_array holds references into it.
std::unique_ptr<Arena> string_arena;
std::unique_ptr<PODArray<StringRef>> string_array;
};
/// Creates the storage for every attribute of dict_struct, registers its index by name and
/// remembers the attribute flagged as hierarchical.
void createAttributes()
{
    /// Capacity is reserved up front, so the pointer stored in hierarchical_attribute
    /// is not invalidated by the push_back calls below.
    attributes.reserve(dict_struct.attributes.size());

    for (const auto & description : dict_struct.attributes)
    {
        const auto position = attributes.size();
        attribute_index_by_name.emplace(description.name, position);

        attributes.push_back(
            createAttributeWithType(getAttributeTypeByName(description.type), description.null_value));

        if (description.hierarchical)
            hierarchical_attribute = &attributes[position];
    }
}
/// Reads every block from the source and stores all attribute values.
/// Column 0 of a block holds the identifiers; column k + 1 holds the values of attribute k.
void loadData()
{
    auto stream = source_ptr->loadAll();

    while (const auto block = stream->read())
    {
        const auto & ids = *block.getByPosition(0).column;

        for (std::size_t attribute_idx = 0; attribute_idx < attributes.size(); ++attribute_idx)
        {
            const auto & values = *block.getByPosition(attribute_idx + 1).column;
            auto & target = attributes[attribute_idx];

            for (std::size_t row = 0, rows = ids.size(); row < rows; ++row)
                setAttributeValue(target, ids[row].get<UInt64>(), values[row]);
        }
    }
}
/// Creates the storage for one attribute of the given type.
/// The textual null value is parsed into the slot of that type, the matching array is allocated
/// and pre-filled with initial_array_size copies of the null value. String attributes additionally
/// get an arena for the character data.
attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value)
{
    attribute_t attr{type};

    /// All numeric types follow the same recipe; only the value type and the member prefix differ.
#define INIT_NUMERIC_ATTRIBUTE(TYPE, LC_TYPE) \
    case attribute_type::LC_TYPE: \
        attr.LC_TYPE##_null_value = DB::parse<TYPE>(null_value); \
        attr.LC_TYPE##_array.reset(new PODArray<TYPE>); \
        attr.LC_TYPE##_array->resize_fill(initial_array_size, attr.LC_TYPE##_null_value); \
        break;

    switch (type)
    {
        INIT_NUMERIC_ATTRIBUTE(UInt8, uint8)
        INIT_NUMERIC_ATTRIBUTE(UInt16, uint16)
        INIT_NUMERIC_ATTRIBUTE(UInt32, uint32)
        INIT_NUMERIC_ATTRIBUTE(UInt64, uint64)
        INIT_NUMERIC_ATTRIBUTE(Int8, int8)
        INIT_NUMERIC_ATTRIBUTE(Int16, int16)
        INIT_NUMERIC_ATTRIBUTE(Int32, int32)
        INIT_NUMERIC_ATTRIBUTE(Int64, int64)
        INIT_NUMERIC_ATTRIBUTE(Float32, float32)
        INIT_NUMERIC_ATTRIBUTE(Float64, float64)

        case attribute_type::string:
            /// The null value is kept as text; the array elements reference it through StringRef.
            attr.string_null_value = null_value;
            attr.string_arena.reset(new Arena);
            attr.string_array.reset(new PODArray<StringRef>);
            attr.string_array->resize_fill(initial_array_size, attr.string_null_value);
            break;
    }

#undef INIT_NUMERIC_ATTRIBUTE

    return attr;
}
void setAttributeValue(attribute_t & attribute, const id_t id, const Field & value)
{
if (id >= max_array_size)
throw Exception{
"Identifier should be less than " + toString(max_array_size),
ErrorCodes::ARGUMENT_OUT_OF_BOUND
};
switch (attribute.type)
{
case attribute_type::uint8:
{
if (id >= attribute.uint8_array->size())
attribute.uint8_array->resize_fill(id, attribute.uint8_null_value);
(*attribute.uint8_array)[id] = value.get<UInt64>();
break;
}
case attribute_type::uint16:
{
if (id >= attribute.uint16_array->size())
attribute.uint16_array->resize_fill(id, attribute.uint16_null_value);
(*attribute.uint16_array)[id] = value.get<UInt64>();
break;
}
case attribute_type::uint32:
{
if (id >= attribute.uint32_array->size())
attribute.uint32_array->resize_fill(id, attribute.uint32_null_value);
(*attribute.uint32_array)[id] = value.get<UInt64>();
break;
}
case attribute_type::uint64:
{
if (id >= attribute.uint64_array->size())
attribute.uint64_array->resize_fill(id, attribute.uint64_null_value);
(*attribute.uint64_array)[id] = value.get<UInt64>();
break;
}
case attribute_type::int8:
{
if (id >= attribute.int8_array->size())
attribute.int8_array->resize_fill(id, attribute.int8_null_value);
(*attribute.int8_array)[id] = value.get<Int64>();
break;
}
case attribute_type::int16:
{
if (id >= attribute.int16_array->size())
attribute.int16_array->resize_fill(id, attribute.int16_null_value);
(*attribute.int16_array)[id] = value.get<Int64>();
break;
}
case attribute_type::int32:
{
if (id >= attribute.int32_array->size())
attribute.int32_array->resize_fill(id, attribute.int32_null_value);
(*attribute.int32_array)[id] = value.get<Int64>();
break;
}
case attribute_type::int64:
{
if (id >= attribute.int64_array->size())
attribute.int64_array->resize_fill(id, attribute.int64_null_value);
(*attribute.int64_array)[id] = value.get<Int64>();
break;
}
case attribute_type::float32:
{
if (id >= attribute.float32_array->size())
attribute.float32_array->resize_fill(id, attribute.float32_null_value);
(*attribute.float32_array)[id] = value.get<Float64>();
break;
}
case attribute_type::float64:
{
if (id >= attribute.float64_array->size())
attribute.float64_array->resize_fill(id, attribute.float64_null_value);
(*attribute.float64_array)[id] = value.get<Float64>();
break;
}
case attribute_type::string:
{
if (id >= attribute.string_array->size())
attribute.string_array->resize_fill(id, attribute.string_null_value);
const auto & string = value.get<String>();
const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size());
(*attribute.string_array)[id] = StringRef{string_in_arena, string.size()};
break;
}
};
}
const std::string name;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
/// Maps an attribute name to its position in `attributes`.
std::map<std::string, std::size_t> attribute_index_by_name;
std::vector<attribute_t> attributes;
/// Points into `attributes` at the attribute flagged hierarchical; null when there is none.
const attribute_t * hierarchical_attribute = nullptr;
};
}

View File

@ -0,0 +1,376 @@
#pragma once
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/DictionarySourceFactory.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Common/HashTable/HashMap.h>
#include <statdaemons/ext/range.hpp>
#include <statdaemons/ext/memory.hpp>
namespace DB
{
class HashedDictionary final : public IDictionary
{
public:
/// Stores the settings, takes ownership of the source, then creates the attribute storage
/// and loads all data from the source right away.
HashedDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime)
: name{name}, dict_struct(dict_struct),
source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime)
{
createAttributes();
loadData();
}
/// Copying delegates to the main constructor with a clone of the source, so the copy
/// re-creates its attributes and loads the data again instead of copying the maps.
HashedDictionary(const HashedDictionary & other)
: HashedDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime}
{}
/// Simple accessors for the dictionary's name, type and settings.
std::string getName() const override { return name; }
std::string getTypeName() const override { return "HashedDictionary"; }
bool isCached() const override { return false; }
/// Returns a new dictionary produced by the copy constructor.
DictionaryPtr clone() const override { return ext::make_unique<HashedDictionary>(*this); }
const IDictionarySource * const getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
/// True when one of the attributes was marked hierarchical (the pointer is non-null).
bool hasHierarchy() const override { return hierarchical_attribute; }
id_t toParent(const id_t id) const override
{
const auto attr = hierarchical_attribute;
switch (hierarchical_attribute->type)
{
case attribute_type::uint8:
{
const auto it = attr->uint8_map->find(id);
return it != attr->uint8_map->end() ? it->second : attr->uint8_null_value;
}
case attribute_type::uint16:
{
const auto it = attr->uint16_map->find(id);
return it != attr->uint16_map->end() ? it->second : attr->uint16_null_value;
}
case attribute_type::uint32:
{
const auto it = attr->uint32_map->find(id);
return it != attr->uint32_map->end() ? it->second : attr->uint32_null_value;
}
case attribute_type::uint64:
{
const auto it = attr->uint64_map->find(id);
return it != attr->uint64_map->end() ? it->second : attr->uint64_null_value;
}
case attribute_type::int8:
{
const auto it = attr->int8_map->find(id);
return it != attr->int8_map->end() ? it->second : attr->int8_null_value;
}
case attribute_type::int16:
{
const auto it = attr->int16_map->find(id);
return it != attr->int16_map->end() ? it->second : attr->int16_null_value;
}
case attribute_type::int32:
{
const auto it = attr->int32_map->find(id);
return it != attr->int32_map->end() ? it->second : attr->int32_null_value;
}
case attribute_type::int64:
{
const auto it = attr->int64_map->find(id);
return it != attr->int64_map->end() ? it->second : attr->int64_null_value;
}
case attribute_type::float32:
case attribute_type::float64:
case attribute_type::string:
break;
};
throw Exception{
"Hierarchical attribute has non-integer type " + toString(hierarchical_attribute->type),
ErrorCodes::TYPE_MISMATCH
};
}
/// Generates get<NAME>(attribute_name, id) for every supported type.
/// Each getter resolves the attribute by name, throws TYPE_MISMATCH when the attribute's type differs
/// from the requested one, and returns the mapped value, or the attribute's null value when `id`
/// is not present in the map.
#define DECLARE_SAFE_GETTER(TYPE, NAME, LC_TYPE) \
TYPE get##NAME(const std::string & attribute_name, const id_t id) const override\
{\
const auto idx = getAttributeIndex(attribute_name);\
const auto & attribute = attributes[idx];\
if (attribute.type != attribute_type::LC_TYPE)\
throw Exception{\
"Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\
ErrorCodes::TYPE_MISMATCH\
};\
const auto it = attribute.LC_TYPE##_map->find(id);\
if (it != attribute.LC_TYPE##_map->end())\
return it->second;\
return attribute.LC_TYPE##_null_value;\
}
DECLARE_SAFE_GETTER(UInt8, UInt8, uint8)
DECLARE_SAFE_GETTER(UInt16, UInt16, uint16)
DECLARE_SAFE_GETTER(UInt32, UInt32, uint32)
DECLARE_SAFE_GETTER(UInt64, UInt64, uint64)
DECLARE_SAFE_GETTER(Int8, Int8, int8)
DECLARE_SAFE_GETTER(Int16, Int16, int16)
DECLARE_SAFE_GETTER(Int32, Int32, int32)
DECLARE_SAFE_GETTER(Int64, Int64, int64)
DECLARE_SAFE_GETTER(Float32, Float32, float32)
DECLARE_SAFE_GETTER(Float64, Float64, float64)
DECLARE_SAFE_GETTER(StringRef, String, string)
#undef DECLARE_SAFE_GETTER
/// Returns the position of the named attribute in `attributes`.
/// Throws BAD_ARGUMENTS when no attribute with that name exists.
std::size_t getAttributeIndex(const std::string & attribute_name) const override
{
    const auto found = attribute_index_by_name.find(attribute_name);
    if (found != std::end(attribute_index_by_name))
        return found->second;

    throw Exception{
        "No such attribute '" + attribute_name + "'",
        ErrorCodes::BAD_ARGUMENTS
    };
}
/// Generates is<NAME>(attribute_idx) for every supported type: true when the attribute at that
/// index has the given type. The index is not range-checked.
#define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\
bool is##NAME(const std::size_t attribute_idx) const override\
{\
return attributes[attribute_idx].type == attribute_type::LC_NAME;\
}
DECLARE_TYPE_CHECKER(UInt8, uint8)
DECLARE_TYPE_CHECKER(UInt16, uint16)
DECLARE_TYPE_CHECKER(UInt32, uint32)
DECLARE_TYPE_CHECKER(UInt64, uint64)
DECLARE_TYPE_CHECKER(Int8, int8)
DECLARE_TYPE_CHECKER(Int16, int16)
DECLARE_TYPE_CHECKER(Int32, int32)
DECLARE_TYPE_CHECKER(Int64, int64)
DECLARE_TYPE_CHECKER(Float32, float32)
DECLARE_TYPE_CHECKER(Float64, float64)
DECLARE_TYPE_CHECKER(String, string)
#undef DECLARE_TYPE_CHECKER
/// Generates get<NAME>Unsafe(attribute_idx, id) for every supported type.
/// "Unsafe" because neither the attribute index nor the attribute's type is checked: the map of the
/// requested type is dereferenced directly. Returns the null value when `id` is not present in the map.
#define DECLARE_UNSAFE_GETTER(TYPE, NAME, LC_NAME)\
TYPE get##NAME##Unsafe(const std::size_t attribute_idx, const id_t id) const override\
{\
const auto & attribute = attributes[attribute_idx];\
const auto it = attribute.LC_NAME##_map->find(id);\
if (it != attribute.LC_NAME##_map->end())\
return it->second;\
return attribute.LC_NAME##_null_value;\
}
DECLARE_UNSAFE_GETTER(UInt8, UInt8, uint8)
DECLARE_UNSAFE_GETTER(UInt16, UInt16, uint16)
DECLARE_UNSAFE_GETTER(UInt32, UInt32, uint32)
DECLARE_UNSAFE_GETTER(UInt64, UInt64, uint64)
DECLARE_UNSAFE_GETTER(Int8, Int8, int8)
DECLARE_UNSAFE_GETTER(Int16, Int16, int16)
DECLARE_UNSAFE_GETTER(Int32, Int32, int32)
DECLARE_UNSAFE_GETTER(Int64, Int64, int64)
DECLARE_UNSAFE_GETTER(Float32, Float32, float32)
DECLARE_UNSAFE_GETTER(Float64, Float64, float64)
DECLARE_UNSAFE_GETTER(StringRef, String, string)
#undef DECLARE_UNSAFE_GETTER
private:
/// Storage for one attribute. The struct has a null-value slot and a map pointer (keyed by UInt64
/// identifier) for every supported type; the visible cases of createAttributeWithType set up only
/// the pair matching `type`.
struct attribute_t
{
attribute_type type;
UInt8 uint8_null_value;
UInt16 uint16_null_value;
UInt32 uint32_null_value;
UInt64 uint64_null_value;
Int8 int8_null_value;
Int16 int16_null_value;
Int32 int32_null_value;
Int64 int64_null_value;
Float32 float32_null_value;
Float64 float64_null_value;
String string_null_value;
std::unique_ptr<HashMap<UInt64, UInt8>> uint8_map;
std::unique_ptr<HashMap<UInt64, UInt16>> uint16_map;
std::unique_ptr<HashMap<UInt64, UInt32>> uint32_map;
std::unique_ptr<HashMap<UInt64, UInt64>> uint64_map;
std::unique_ptr<HashMap<UInt64, Int8>> int8_map;
std::unique_ptr<HashMap<UInt64, Int16>> int16_map;
std::unique_ptr<HashMap<UInt64, Int32>> int32_map;
std::unique_ptr<HashMap<UInt64, Int64>> int64_map;
std::unique_ptr<HashMap<UInt64, Float32>> float32_map;
std::unique_ptr<HashMap<UInt64, Float64>> float64_map;
/// NOTE(review): presumably string data lives in the arena and string_map references it,
/// as in FlatDictionary — the code that fills it is outside this view.
std::unique_ptr<Arena> string_arena;
std::unique_ptr<HashMap<UInt64, StringRef>> string_map;
};
/// Creates one attribute_t per attribute described in dict_struct, registers its index
/// under the attribute name and remembers the attribute flagged as hierarchical.
void createAttributes()
{
    /// Reserved up front: hierarchical_attribute points into the vector,
    /// so the vector must not reallocate while it is being filled.
    attributes.reserve(dict_struct.attributes.size());

    for (const auto & description : dict_struct.attributes)
    {
        const auto index = attributes.size();
        attribute_index_by_name.emplace(description.name, index);

        const auto type = getAttributeTypeByName(description.type);
        attributes.push_back(createAttributeWithType(type, description.null_value));

        if (description.hierarchical)
            hierarchical_attribute = &attributes.back();
    }
}
/// Reads every block the source produces and stores each (id, value) pair.
/// Column 0 of a block holds the ids; column i + 1 holds the values of attribute i.
void loadData()
{
    auto stream = source_ptr->loadAll();

    while (const auto block = stream->read())
    {
        const auto & ids = *block.getByPosition(0).column;
        const std::size_t rows = ids.size();

        for (std::size_t attribute_idx = 0; attribute_idx < attributes.size(); ++attribute_idx)
        {
            const auto & values = *block.getByPosition(attribute_idx + 1).column;
            auto & attribute = attributes[attribute_idx];

            for (std::size_t row_idx = 0; row_idx < rows; ++row_idx)
                setAttributeValue(attribute, ids[row_idx].get<UInt64>(), values[row_idx]);
        }
    }
}
/** Builds an attribute_t of the given type.
  * Only the null value and the map that belong to `type` are initialised:
  * numeric null values are parsed from the textual `null_value` with DB::parse<T>,
  * a string null value is taken verbatim. String attributes additionally get an Arena.
  * All other maps of the returned attribute remain null.
  */
attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value)
{
/// aggregate initialisation: `type` is set, every other member is value-initialised
attribute_t attr{type};
switch (type)
{
case attribute_type::uint8:
attr.uint8_null_value = DB::parse<UInt8>(null_value);
attr.uint8_map.reset(new HashMap<UInt64, UInt8>);
break;
case attribute_type::uint16:
attr.uint16_null_value = DB::parse<UInt16>(null_value);
attr.uint16_map.reset(new HashMap<UInt64, UInt16>);
break;
case attribute_type::uint32:
attr.uint32_null_value = DB::parse<UInt32>(null_value);
attr.uint32_map.reset(new HashMap<UInt64, UInt32>);
break;
case attribute_type::uint64:
attr.uint64_null_value = DB::parse<UInt64>(null_value);
attr.uint64_map.reset(new HashMap<UInt64, UInt64>);
break;
case attribute_type::int8:
attr.int8_null_value = DB::parse<Int8>(null_value);
attr.int8_map.reset(new HashMap<UInt64, Int8>);
break;
case attribute_type::int16:
attr.int16_null_value = DB::parse<Int16>(null_value);
attr.int16_map.reset(new HashMap<UInt64, Int16>);
break;
case attribute_type::int32:
attr.int32_null_value = DB::parse<Int32>(null_value);
attr.int32_map.reset(new HashMap<UInt64, Int32>);
break;
case attribute_type::int64:
attr.int64_null_value = DB::parse<Int64>(null_value);
attr.int64_map.reset(new HashMap<UInt64, Int64>);
break;
case attribute_type::float32:
attr.float32_null_value = DB::parse<Float32>(null_value);
attr.float32_map.reset(new HashMap<UInt64, Float32>);
break;
case attribute_type::float64:
attr.float64_null_value = DB::parse<Float64>(null_value);
attr.float64_map.reset(new HashMap<UInt64, Float64>);
break;
case attribute_type::string:
attr.string_null_value = null_value;
/// the arena owns the bytes referenced by the StringRef values stored in string_map
attr.string_arena.reset(new Arena);
attr.string_map.reset(new HashMap<UInt64, StringRef>);
break;
}
return attr;
}
/// Stores `value` under `id` in the map that matches the attribute's type.
/// Numeric values are read from the Field as UInt64 / Int64 / Float64 and converted to the
/// mapped type when the pair is inserted. Strings are copied into the attribute's arena and
/// the map keeps a StringRef to that copy.
void setAttributeValue(attribute_t & attribute, const id_t id, const Field & value)
{
    switch (attribute.type)
    {
        case attribute_type::uint8:
            attribute.uint8_map->insert({ id, value.get<UInt64>() });
            break;
        case attribute_type::uint16:
            attribute.uint16_map->insert({ id, value.get<UInt64>() });
            break;
        case attribute_type::uint32:
            attribute.uint32_map->insert({ id, value.get<UInt64>() });
            break;
        case attribute_type::uint64:
            attribute.uint64_map->insert({ id, value.get<UInt64>() });
            break;
        case attribute_type::int8:
            attribute.int8_map->insert({ id, value.get<Int64>() });
            break;
        case attribute_type::int16:
            attribute.int16_map->insert({ id, value.get<Int64>() });
            break;
        case attribute_type::int32:
            attribute.int32_map->insert({ id, value.get<Int64>() });
            break;
        case attribute_type::int64:
            attribute.int64_map->insert({ id, value.get<Int64>() });
            break;
        case attribute_type::float32:
            attribute.float32_map->insert({ id, value.get<Float64>() });
            break;
        case attribute_type::float64:
            attribute.float64_map->insert({ id, value.get<Float64>() });
            break;
        case attribute_type::string:
        {
            const auto & source = value.get<String>();
            const auto arena_copy = attribute.string_arena->insert(source.data(), source.size());
            attribute.string_map->insert({ id, StringRef{arena_copy, source.size()} });
            break;
        }
    }
}
const std::string name;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
/// attribute name -> position in `attributes`
std::map<std::string, std::size_t> attribute_index_by_name;
/// one entry per attribute of dict_struct, in declaration order
std::vector<attribute_t> attributes;
/// points into `attributes` at the attribute flagged as hierarchical; null if there is none
const attribute_t * hierarchical_attribute = nullptr;
};
}

View File

@ -0,0 +1,95 @@
#pragma once
#include <DB/Core/Field.h>
#include <DB/Core/StringRef.h>
#include <memory>
#include <Poco/Util/XMLConfiguration.h>
namespace DB
{
class IDictionarySource;
class IDictionary;
using DictionaryPtr = std::unique_ptr<IDictionary>;
class DictionaryLifetime;
/** Interface of an external dictionary: a mapping from a 64-bit id to a set of named, typed attributes.
  * Attributes can be read either by name (checked) or by a previously obtained index (unchecked).
  */
class IDictionary
{
public:
/// type of the dictionary key
using id_t = std::uint64_t;
virtual std::string getName() const = 0;
virtual std::string getTypeName() const = 0;
virtual bool isCached() const = 0;
/// does nothing by default
virtual void reload() {}
virtual DictionaryPtr clone() const = 0;
virtual const IDictionarySource * const getSource() const = 0;
virtual const DictionaryLifetime & getLifetime() const = 0;
virtual bool hasHierarchy() const = 0;
/// returns the parent of `id`; do not call unless you ensure that hasHierarchy() returns true
virtual id_t toParent(id_t id) const = 0;
/// Walks up from child_id through toParent() until it reaches ancestor_id or 0.
/// Returns true if ancestor_id was reached (an id counts as its own ancestor), false if the walk ended at 0.
/// Like toParent(), requires hasHierarchy() to be true.
bool in(id_t child_id, const id_t ancestor_id) const
{
while (child_id != 0 && child_id != ancestor_id)
child_id = toParent(child_id);
return child_id != 0;
}
/// safe and slow functions, perform map lookup and type checks
virtual UInt8 getUInt8(const std::string & attribute_name, id_t id) const = 0;
virtual UInt16 getUInt16(const std::string & attribute_name, id_t id) const = 0;
virtual UInt32 getUInt32(const std::string & attribute_name, id_t id) const = 0;
virtual UInt64 getUInt64(const std::string & attribute_name, id_t id) const = 0;
virtual Int8 getInt8(const std::string & attribute_name, id_t id) const = 0;
virtual Int16 getInt16(const std::string & attribute_name, id_t id) const = 0;
virtual Int32 getInt32(const std::string & attribute_name, id_t id) const = 0;
virtual Int64 getInt64(const std::string & attribute_name, id_t id) const = 0;
virtual Float32 getFloat32(const std::string & attribute_name, id_t id) const = 0;
virtual Float64 getFloat64(const std::string & attribute_name, id_t id) const = 0;
virtual StringRef getString(const std::string & attribute_name, id_t id) const = 0;
/// unsafe functions for maximum performance, you are on your own ensuring type-safety
/// returns persistent attribute index for usage with following functions
virtual std::size_t getAttributeIndex(const std::string & attribute_name) const = 0;
/// type-checking functions: tell whether the attribute at attribute_idx has the given type
virtual bool isUInt8(std::size_t attribute_idx) const = 0;
virtual bool isUInt16(std::size_t attribute_idx) const = 0;
virtual bool isUInt32(std::size_t attribute_idx) const = 0;
virtual bool isUInt64(std::size_t attribute_idx) const = 0;
virtual bool isInt8(std::size_t attribute_idx) const = 0;
virtual bool isInt16(std::size_t attribute_idx) const = 0;
virtual bool isInt32(std::size_t attribute_idx) const = 0;
virtual bool isInt64(std::size_t attribute_idx) const = 0;
virtual bool isFloat32(std::size_t attribute_idx) const = 0;
virtual bool isFloat64(std::size_t attribute_idx) const = 0;
virtual bool isString(std::size_t attribute_idx) const = 0;
/// plain load from target container without any checks
virtual UInt8 getUInt8Unsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual UInt16 getUInt16Unsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual UInt32 getUInt32Unsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual UInt64 getUInt64Unsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual Int8 getInt8Unsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual Int16 getInt16Unsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual Int32 getInt32Unsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual Int64 getInt64Unsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual Float32 getFloat32Unsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual Float64 getFloat64Unsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual StringRef getStringUnsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual ~IDictionary() = default;
};
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <DB/DataStreams/IBlockInputStream.h>
#include <vector>
namespace DB
{
class IDictionarySource;
using DictionarySourcePtr = std::unique_ptr<IDictionarySource>;
/** Interface of a data source a dictionary is loaded from.
  * Data is handed over as a block input stream.
  */
class IDictionarySource
{
public:
/// returns a stream over the whole contents of the source
virtual BlockInputStreamPtr loadAll() = 0;
/// returns a stream for a single id
virtual BlockInputStreamPtr loadId(const std::uint64_t id) = 0;
/// returns a stream for the given set of ids
/// NOTE(review): `ids` is taken by value, so every call copies the vector; overriders must match this signature
virtual BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) = 0;
/// presumably tells whether the source changed since it was last read - confirm against implementations
virtual bool isModified() const = 0;
virtual DictionarySourcePtr clone() const = 0;
virtual ~IDictionarySource() = default;
};
}

View File

@ -0,0 +1,114 @@
#pragma once
#include <DB/Core/Block.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
#include <statdaemons/ext/range.hpp>
#include <mysqlxx/Query.h>
#include <vector>
#include <string>
namespace DB
{
/** Block input stream over the result of a MySQL query.
  * Rows are fetched from the result and appended to a block shaped like `sample_block`;
  * every call to readImpl() yields at most `max_block_size` rows, and an empty Block once the result is exhausted.
  * The column types of `sample_block` decide how each MySQL value is converted.
  */
class MysqlBlockInputStream final : public IProfilingBlockInputStream
{
public:
    /// Starts the query and precomputes the attribute type of every column of `sample_block`.
    /// Throws Exception(UNKNOWN_TYPE) if a column has a type other than (U)Int8..64, Float32/64 or String.
    MysqlBlockInputStream(mysqlxx::Query query, const Block & sample_block, const std::size_t max_block_size)
        : query{std::move(query)},
          /// must go through the member: the parameter `query` shadows it and has just been moved from
          result{this->query.use()},
          sample_block{sample_block}, max_block_size{max_block_size}
    {
        types.reserve(sample_block.columns());

        for (const auto idx : ext::range(0, sample_block.columns()))
        {
            const auto type = sample_block.getByPosition(idx).type.get();

            if (typeid_cast<const DataTypeUInt8 *>(type))
                types.push_back(attribute_type::uint8);
            else if (typeid_cast<const DataTypeUInt16 *>(type))
                types.push_back(attribute_type::uint16);
            else if (typeid_cast<const DataTypeUInt32 *>(type))
                types.push_back(attribute_type::uint32);
            else if (typeid_cast<const DataTypeUInt64 *>(type))
                types.push_back(attribute_type::uint64);
            else if (typeid_cast<const DataTypeInt8 *>(type))
                types.push_back(attribute_type::int8);
            else if (typeid_cast<const DataTypeInt16 *>(type))
                types.push_back(attribute_type::int16);
            else if (typeid_cast<const DataTypeInt32 *>(type))
                types.push_back(attribute_type::int32);
            else if (typeid_cast<const DataTypeInt64 *>(type))
                types.push_back(attribute_type::int64);
            else if (typeid_cast<const DataTypeFloat32 *>(type))
                types.push_back(attribute_type::float32);
            /// was a second test for DataTypeInt64, which made Float64 columns fall through to the exception
            else if (typeid_cast<const DataTypeFloat64 *>(type))
                types.push_back(attribute_type::float64);
            else if (typeid_cast<const DataTypeString *>(type))
                types.push_back(attribute_type::string);
            else
                throw Exception{
                    "Unsupported type " + type->getName(),
                    ErrorCodes::UNKNOWN_TYPE
                };
        }
    }

    String getName() const override { return "MysqlBlockInputStream"; }

    String getID() const override
    {
        return "Mysql(" + query.str() + ")";
    }

private:
    /// Fetches up to max_block_size rows into a fresh block; returns an empty Block when no row was fetched.
    /// Throws Exception(NUMBER_OF_COLUMNS_DOESNT_MATCH) if the result and the sample block disagree on the column count.
    Block readImpl() override
    {
        auto block = sample_block.cloneEmpty();

        if (block.columns() != result.getNumFields())
            throw Exception{
                "mysqlxx::UserQueryResult contains " + toString(result.getNumFields()) + " columns while " +
                    toString(block.columns()) + " expected",
                ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH
            };

        std::size_t rows = 0;
        while (auto row = result.fetch())
        {
            for (const auto idx : ext::range(0, row.size()))
                insertValue(block.getByPosition(idx).column, row[idx], types[idx]);

            ++rows;
            if (rows == max_block_size)
                break;
        }

        return rows == 0 ? Block{} : block;
    }

    /// Appends one MySQL value to `column`, converted to the widest field type of its family
    /// (UInt64 for unsigned, Int64 for signed, Float64 for floating point) or taken as a string.
    static void insertValue(ColumnPtr & column, const mysqlxx::Value & value, const attribute_type type)
    {
        switch (type)
        {
            case attribute_type::uint8:
            case attribute_type::uint16:
            case attribute_type::uint32:
            case attribute_type::uint64:
                column->insert(static_cast<UInt64>(value));
                break;
            case attribute_type::int8:
            case attribute_type::int16:
            case attribute_type::int32:
            case attribute_type::int64:
                column->insert(static_cast<Int64>(value));
                break;
            case attribute_type::float32:
            case attribute_type::float64:
                column->insert(static_cast<Float64>(value));
                break;
            case attribute_type::string:
                column->insert(value.getString());
                break;
        }
    }

    /// declaration order matters: `result` is initialised from `query`
    mysqlxx::Query query;
    mysqlxx::UseQueryResult result;
    Block sample_block;
    std::size_t max_block_size;
    /// attribute type of every column of sample_block, by position
    std::vector<attribute_type> types;
};
}

View File

@ -0,0 +1,120 @@
#pragma once
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/MysqlBlockInputStream.h>
#include <DB/Interpreters/Context.h>
#include <statdaemons/ext/range.hpp>
#include <mysqlxx/Pool.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
/** Dictionary source that loads the whole contents of a MySQL table.
  * Connection parameters and the table name are read from the configuration under `config_prefix`;
  * the columns to select are the columns of `sample_block`.
  * Loading by id is not supported.
  */
class MysqlDictionarySource final : public IDictionarySource
{
    static const auto max_block_size = 8192;

public:
    MysqlDictionarySource(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
        Block & sample_block, const Context & context)
        : host{config.getString(config_prefix + "host")},
          port(config.getInt(config_prefix + "port")),
          user{config.getString(config_prefix + "user", "")},
          password{config.getString(config_prefix + "password", "")},
          db{config.getString(config_prefix + "db", "")},
          table{config.getString(config_prefix + "table")},
          sample_block{sample_block}, context(context),
          pool{db, host, user, password, port},
          load_all_query{composeLoadAllQuery(sample_block, table)},
          last_modification{getLastModification()}
    {}

    /// Copies the settings of `other` and opens its own connection pool with them.
    MysqlDictionarySource(const MysqlDictionarySource & other)
        : host{other.host}, port{other.port}, user{other.user}, password{other.password},
          /// `table` used to be initialised from other.db, so a clone checked the wrong table for modifications
          db{other.db}, table{other.table},
          sample_block{other.sample_block}, context(other.context),
          pool{db, host, user, password, port},
          load_all_query{other.load_all_query}, last_modification{other.last_modification}
    {}

    BlockInputStreamPtr loadAll() override
    {
        return new MysqlBlockInputStream{pool.Get()->query(load_all_query), sample_block, max_block_size};
    }

    /// Not supported: always throws Exception(NOT_IMPLEMENTED).
    BlockInputStreamPtr loadId(const std::uint64_t id) override
    {
        throw Exception{
            "Method unsupported",
            ErrorCodes::NOT_IMPLEMENTED
        };
    }

    /// Not supported: always throws Exception(NOT_IMPLEMENTED).
    BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override
    {
        throw Exception{
            "Method unsupported",
            ErrorCodes::NOT_IMPLEMENTED
        };
    }

    /// True if the table's modification time is later than the one recorded at construction.
    bool isModified() const override { return getLastModification() > last_modification; }

    DictionarySourcePtr clone() const override { return ext::make_unique<MysqlDictionarySource>(*this); }

private:
    /// Returns Update_time of the table as reported by SHOW TABLE STATUS, or Create_time if Update_time is NULL.
    /// Any failure is logged and a default-constructed DateTime is returned; the same happens if no row is reported.
    mysqlxx::DateTime getLastModification() const
    {
        /// positions of the columns in the SHOW TABLE STATUS result
        const auto Create_time_idx = 11;
        const auto Update_time_idx = 12;

        try
        {
            auto connection = pool.Get();
            /// Exact-name pattern: the former '%name%' also matched every table whose name merely
            /// contains `table`, so the first row could describe a different table.
            auto query = connection->query("SHOW TABLE STATUS LIKE '" + table + "';");
            auto result = query.use();

            /// the table may be missing: do not index into an absent row
            if (auto row = result.fetch())
            {
                const auto & update_time = row[Update_time_idx];
                return !update_time.isNull() ? update_time.getDateTime() : row[Create_time_idx].getDateTime();
            }
        }
        catch (...)
        {
            tryLogCurrentException("MysqlDictionarySource");
        }

        return {};
    }

    /// Builds "SELECT <col1>, <col2>, ... FROM <table>;" from the column names of `block`.
    static std::string composeLoadAllQuery(const Block & block, const std::string & table)
    {
        std::string query{"SELECT "};

        auto first = true;
        for (const auto idx : ext::range(0, block.columns()))
        {
            if (!first)
                query += ", ";

            query += block.getByPosition(idx).name;
            first = false;
        }

        query += " FROM " + table + ';';

        return query;
    }

    const std::string host;
    const UInt16 port;
    const std::string user;
    const std::string password;
    const std::string db;
    const std::string table;
    Block sample_block;
    const Context & context;
    /// mutable: connections are taken from the pool inside const member functions
    mutable mysqlxx::Pool pool;
    const std::string load_all_query;
    /// modification time of the table observed when this source was constructed
    mysqlxx::DateTime last_modification;
};
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <DB/IO/ReadBuffer.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <memory>
namespace DB
{
/** Wraps a block input stream together with a ReadBuffer it owns,
  * so that the buffer stays alive for as long as this stream object exists.
  * Reading is forwarded to the wrapped stream unchanged.
  */
class OwningBufferBlockInputStream : public IProfilingBlockInputStream
{
public:
OwningBufferBlockInputStream(const BlockInputStreamPtr & stream, std::unique_ptr<ReadBuffer> buffer)
: stream{stream}, buffer{std::move(buffer)}
{
/// the wrapped stream is also registered as a child of this stream
children.push_back(stream);
}
private:
Block readImpl() override { return stream->read(); }
String getName() const override { return "OwningBufferBlockInputStream"; }
String getID() const override {
return "OwningBuffer(" + stream->getID() + ")";
}
/// NOTE(review): `buffer` is declared after `stream`, so it is destroyed first, and `children`
/// still references the wrapped stream at that point - confirm the wrapped stream never touches
/// the buffer during its own destruction.
BlockInputStreamPtr stream;
std::unique_ptr<ReadBuffer> buffer;
};
}

View File

@ -0,0 +1,15 @@
#pragma once
#include <memory>
namespace DB
{
/// Deleter for objects that are disposed of through their own `release()` member function
/// rather than with `delete`.
template <typename T> struct release
{
    /// const-qualified so the deleter can also be invoked through a const reference
    /// (for example the one returned by get_deleter() of a const unique_ptr)
    void operator()(const T * const ptr) const { ptr->release(); }
};

/// std::unique_ptr that calls `release()` on the held object when it gives up ownership.
template <typename T> using config_ptr_t = std::unique_ptr<T, release<T>>;
}

View File

@ -11,6 +11,9 @@
#include <DB/Interpreters/Context.h>
#include <DB/Functions/IFunction.h>
#include <statdaemons/ext/range.hpp>
#include <DB/Dictionaries/FlatDictionary.h>
#include <DB/Dictionaries/HashedDictionary.h>
namespace DB
@ -715,4 +718,760 @@ public:
}
};
class FunctionDictGetString : public IFunction
{
public:
static constexpr auto name = "dictGetString";
static IFunction * create(const Context & context)
{
return new FunctionDictGetString{context.getDictionaries()};
};
FunctionDictGetString(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}
String getName() const override { return name; }
private:
DataTypePtr getReturnType(const DataTypes & arguments) const override
{
if (arguments.size() != 3)
throw Exception{
"Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 3.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
};
if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
{
throw Exception{
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
if (!typeid_cast<const DataTypeString *>(arguments[1].get()))
{
throw Exception{
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
const auto id_arg = arguments[2].get();
if (!typeid_cast<const DataTypeUInt8 *>(id_arg) &&
!typeid_cast<const DataTypeUInt16 *>(id_arg) &&
!typeid_cast<const DataTypeUInt32 *>(id_arg) &&
!typeid_cast<const DataTypeUInt64 *>(id_arg) &&
!typeid_cast<const DataTypeInt8 *>(id_arg) &&
!typeid_cast<const DataTypeInt16 *>(id_arg) &&
!typeid_cast<const DataTypeInt32 *>(id_arg) &&
!typeid_cast<const DataTypeInt64 *>(id_arg))
{
throw Exception{
"Illegal type " + arguments[2]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
return new DataTypeString;
}
void execute(Block & block, const ColumnNumbers & arguments, const size_t result)
{
const auto dict_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[0]).column.get());
if (!dict_name_col)
throw Exception{
"First argument of function " + getName() + " must be a constant string",
ErrorCodes::ILLEGAL_COLUMN
};
auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
const auto dict_ptr = dict.get();
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
ErrorCodes::UNKNOWN_TYPE
};
}
template <typename DictionaryType>
bool executeDispatch(Block & block, const ColumnNumbers & arguments, const size_t result,
const IDictionary * const dictionary)
{
const auto dict = typeid_cast<const DictionaryType *>(dictionary);
if (!dict)
return false;
const auto attr_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[1]).column.get());
if (!attr_name_col)
throw Exception{
"Second argument of function " + getName() + " must be a constant string",
ErrorCodes::ILLEGAL_COLUMN
};
const auto & attr_name = attr_name_col->getData();
const auto id_col = block.getByPosition(arguments[2]).column.get();
if (!execute<UInt8>(block, result, dict, attr_name, id_col) &&
!execute<UInt16>(block, result, dict, attr_name, id_col) &&
!execute<UInt32>(block, result, dict, attr_name, id_col) &&
!execute<UInt64>(block, result, dict, attr_name, id_col) &&
!execute<Int8>(block, result, dict, attr_name, id_col) &&
!execute<Int16>(block, result, dict, attr_name, id_col) &&
!execute<Int32>(block, result, dict, attr_name, id_col) &&
!execute<Int64>(block, result, dict, attr_name, id_col))
{
throw Exception{
"Third argument of function " + getName() + " must be integral",
ErrorCodes::ILLEGAL_COLUMN
};
}
return true;
}
template <typename T, typename DictionaryType>
bool execute(Block & block, const size_t result, const DictionaryType * const dictionary,
const std::string & attr_name, const IColumn * const id_col_untyped)
{
if (const auto id_col = typeid_cast<const ColumnVector<T> *>(id_col_untyped))
{
const auto out = new ColumnString;
block.getByPosition(result).column = out;
const auto attribute_idx = dictionary->getAttributeIndex(attr_name);
if (!dictionary->isString(attribute_idx))
throw Exception{
"Type mismatch: attribute " + attr_name + " has type different from String",
ErrorCodes::TYPE_MISMATCH
};
for (const auto & id : id_col->getData())
{
const auto string_ref = dictionary->getStringUnsafe(attribute_idx, id);
out->insertData(string_ref.data, string_ref.size);
}
return true;
}
else if (const auto id_col = typeid_cast<const ColumnConst<T> *>(id_col_untyped))
{
block.getByPosition(result).column = new ColumnConst<String>{
id_col->size(),
dictionary->getString(attr_name, id_col->getData()).toString()
};
return true;
};
return false;
}
const Dictionaries & dictionaries;
};
/** Maps a numeric data type to the matching IDictionary accessors.
  * For every specialisation:
  *   get(dict, name, id) forwards to dict->get<TYPE>(name, id);
  *   is(dict, idx)       forwards to dict->is<TYPE>(idx);
  *   get(dict, idx, id)  forwards to dict->get<TYPE>Unsafe(idx, id).
  * Only the specialisations declared below exist; the primary template is left undefined.
  */
template <typename DataType> struct DictGetTraits;
#define DECLARE_DICT_GET_TRAITS(TYPE, DATA_TYPE) \
template <> struct DictGetTraits<DATA_TYPE>\
{\
static TYPE get(const IDictionary * const dict, const std::string & name, const IDictionary::id_t id)\
{\
return dict->get##TYPE(name, id);\
}\
static bool is(const IDictionary * const dict, const std::size_t idx) { return dict->is##TYPE(idx); } \
static TYPE get(const IDictionary * const dict, const std::size_t idx, const IDictionary::id_t id)\
{\
return dict->get##TYPE##Unsafe(idx, id);\
}\
};
DECLARE_DICT_GET_TRAITS(UInt8, DataTypeUInt8)
DECLARE_DICT_GET_TRAITS(UInt16, DataTypeUInt16)
DECLARE_DICT_GET_TRAITS(UInt32, DataTypeUInt32)
DECLARE_DICT_GET_TRAITS(UInt64, DataTypeUInt64)
DECLARE_DICT_GET_TRAITS(Int8, DataTypeInt8)
DECLARE_DICT_GET_TRAITS(Int16, DataTypeInt16)
DECLARE_DICT_GET_TRAITS(Int32, DataTypeInt32)
DECLARE_DICT_GET_TRAITS(Int64, DataTypeInt64)
DECLARE_DICT_GET_TRAITS(Float32, DataTypeFloat32)
DECLARE_DICT_GET_TRAITS(Float64, DataTypeFloat64)
#undef DECLARE_DICT_GET_TRAITS
/** dictGet<Type>(dictionary_name, attribute_name, id)
  * Returns the value of a numeric attribute of an external dictionary for the given id(s).
  * `DataType` selects the attribute type and the result type of the function.
  * The dictionary name and the attribute name must be constant strings;
  * the id may be a constant or a full column of any integer type (UInt8..UInt64, Int8..Int64).
  */
template <typename DataType>
class FunctionDictGet final : public IFunction
{
    using Type = typename DataType::FieldType;

public:
    static const std::string name;

    static IFunction * create(const Context & context)
    {
        return new FunctionDictGet{context.getDictionaries()};
    }

    FunctionDictGet(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}

    String getName() const override { return name; }

private:
    /// Validates the argument count and types (String, String, integer); the result type is DataType.
    DataTypePtr getReturnType(const DataTypes & arguments) const override
    {
        if (arguments.size() != 3)
            throw Exception{
                "Number of arguments for function " + getName() + " doesn't match: passed "
                    + toString(arguments.size()) + ", should be 3.",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
            };

        if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
        {
            throw Exception{
                "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
            };
        }

        if (!typeid_cast<const DataTypeString *>(arguments[1].get()))
        {
            throw Exception{
                "Illegal type " + arguments[1]->getName() + " of argument of function " + getName(),
                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
            };
        }

        const auto id_arg = arguments[2].get();
        if (!typeid_cast<const DataTypeUInt8 *>(id_arg) &&
            !typeid_cast<const DataTypeUInt16 *>(id_arg) &&
            !typeid_cast<const DataTypeUInt32 *>(id_arg) &&
            !typeid_cast<const DataTypeUInt64 *>(id_arg) &&
            !typeid_cast<const DataTypeInt8 *>(id_arg) &&
            !typeid_cast<const DataTypeInt16 *>(id_arg) &&
            !typeid_cast<const DataTypeInt32 *>(id_arg) &&
            !typeid_cast<const DataTypeInt64 *>(id_arg))
        {
            throw Exception{
                "Illegal type " + id_arg->getName() + " of argument of function " + getName(),
                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
            };
        }

        return new DataType;
    }

    /// Resolves the dictionary by its (constant) name and dispatches on its concrete type.
    void execute(Block & block, const ColumnNumbers & arguments, const size_t result) override
    {
        const auto dict_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[0]).column.get());
        if (!dict_name_col)
            throw Exception{
                "First argument of function " + getName() + " must be a constant string",
                ErrorCodes::ILLEGAL_COLUMN
            };

        auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
        const auto dict_ptr = dict.get();

        if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
            !executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr))
            throw Exception{
                "Unsupported dictionary type " + dict_ptr->getTypeName(),
                ErrorCodes::UNKNOWN_TYPE
            };
    }

    /// Returns false if `dictionary` is not a DictionaryType; otherwise computes the result,
    /// trying every supported integer type for the id column.
    template <typename DictionaryType>
    bool executeDispatch(Block & block, const ColumnNumbers & arguments, const size_t result,
        const IDictionary * const dictionary)
    {
        const auto dict = typeid_cast<const DictionaryType *>(dictionary);
        if (!dict)
            return false;

        const auto attr_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[1]).column.get());
        if (!attr_name_col)
            throw Exception{
                "Second argument of function " + getName() + " must be a constant string",
                ErrorCodes::ILLEGAL_COLUMN
            };

        const auto & attr_name = attr_name_col->getData();

        const auto id_col = block.getByPosition(arguments[2]).column.get();
        if (!execute<UInt8>(block, result, dict, attr_name, id_col) &&
            !execute<UInt16>(block, result, dict, attr_name, id_col) &&
            !execute<UInt32>(block, result, dict, attr_name, id_col) &&
            !execute<UInt64>(block, result, dict, attr_name, id_col) &&
            !execute<Int8>(block, result, dict, attr_name, id_col) &&
            !execute<Int16>(block, result, dict, attr_name, id_col) &&
            !execute<Int32>(block, result, dict, attr_name, id_col) &&
            !execute<Int64>(block, result, dict, attr_name, id_col))
        {
            throw Exception{
                "Third argument of function " + getName() + " must be integral",
                ErrorCodes::ILLEGAL_COLUMN
            };
        }

        return true;
    }

    /// Handles an id column whose element type is T, either full (ColumnVector) or constant (ColumnConst).
    /// Returns false if the column is neither, so the caller can try the next type.
    template <typename T, typename DictionaryType>
    bool execute(Block & block, const size_t result, const DictionaryType * const dictionary,
        const std::string & attr_name, const IColumn * const id_col_untyped)
    {
        if (const auto id_col = typeid_cast<const ColumnVector<T> *>(id_col_untyped))
        {
            const auto attribute_idx = dictionary->getAttributeIndex(attr_name);
            if (!DictGetTraits<DataType>::is(dictionary, attribute_idx))
                /// name the type this instantiation expects; the message used to say UInt64 for every type
                throw Exception{
                    "Type mismatch: attribute " + attr_name + " has type different from " + TypeName<Type>::get(),
                    ErrorCodes::TYPE_MISMATCH
                };

            const auto out = new ColumnVector<Type>;
            block.getByPosition(result).column = out;

            const auto & ids = id_col->getData();
            auto & data = out->getData();
            const auto size = ids.size();
            data.resize(size);

            /// the type was checked once above, so the unchecked getter is used per row
            for (const auto idx : ext::range(0, size))
                data[idx] = DictGetTraits<DataType>::get(dictionary, attribute_idx, ids[idx]);

            return true;
        }

        if (const auto id_col = typeid_cast<const ColumnConst<T> *>(id_col_untyped))
        {
            /// a single lookup: the checked, by-name getter is used
            block.getByPosition(result).column = new ColumnConst<Type>{
                id_col->size(),
                DictGetTraits<DataType>::get(dictionary, attr_name, id_col->getData())
            };

            return true;
        }

        return false;
    }

    const Dictionaries & dictionaries;
};
/// The function name is "dictGet" followed by the name of the field type, as returned by TypeName<...>::get().
template <typename DataType>
const std::string FunctionDictGet<DataType>::name = "dictGet" + TypeName<typename DataType::FieldType>::get();
/// One function per numeric attribute type.
using FunctionDictGetUInt8 = FunctionDictGet<DataTypeUInt8>;
using FunctionDictGetUInt16 = FunctionDictGet<DataTypeUInt16>;
using FunctionDictGetUInt32 = FunctionDictGet<DataTypeUInt32>;
using FunctionDictGetUInt64 = FunctionDictGet<DataTypeUInt64>;
using FunctionDictGetInt8 = FunctionDictGet<DataTypeInt8>;
using FunctionDictGetInt16 = FunctionDictGet<DataTypeInt16>;
using FunctionDictGetInt32 = FunctionDictGet<DataTypeInt32>;
using FunctionDictGetInt64 = FunctionDictGet<DataTypeInt64>;
using FunctionDictGetFloat32 = FunctionDictGet<DataTypeFloat32>;
using FunctionDictGetFloat64 = FunctionDictGet<DataTypeFloat64>;
/** dictGetHierarchy(dictionary_name, id)
  * Returns, for every id, the array [id, parent(id), parent(parent(id)), ...] built by following
  * toParent() of a hierarchical external dictionary until it yields 0. The result type is Array(UInt64).
  * The dictionary name must be a constant string; the id may be a constant or a full column
  * of any integer type (UInt8..UInt64, Int8..Int64).
  */
class FunctionDictGetHierarchy final : public IFunction
{
public:
    static constexpr auto name = "dictGetHierarchy";

    static IFunction * create(const Context & context)
    {
        return new FunctionDictGetHierarchy{context.getDictionaries()};
    }

    FunctionDictGetHierarchy(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}

    String getName() const override { return name; }

private:
    /// Validates the argument count and types (String, integer); the result type is Array(UInt64).
    DataTypePtr getReturnType(const DataTypes & arguments) const override
    {
        if (arguments.size() != 2)
            throw Exception{
                "Number of arguments for function " + getName() + " doesn't match: passed "
                    + toString(arguments.size()) + ", should be 2.",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
            };

        if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
        {
            throw Exception{
                "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
            };
        }

        const auto id_arg = arguments[1].get();
        if (!typeid_cast<const DataTypeUInt8 *>(id_arg) &&
            !typeid_cast<const DataTypeUInt16 *>(id_arg) &&
            !typeid_cast<const DataTypeUInt32 *>(id_arg) &&
            !typeid_cast<const DataTypeUInt64 *>(id_arg) &&
            !typeid_cast<const DataTypeInt8 *>(id_arg) &&
            !typeid_cast<const DataTypeInt16 *>(id_arg) &&
            !typeid_cast<const DataTypeInt32 *>(id_arg) &&
            !typeid_cast<const DataTypeInt64 *>(id_arg))
        {
            throw Exception{
                "Illegal type " + id_arg->getName() + " of argument of function " + getName(),
                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
            };
        }

        return new DataTypeArray{new DataTypeUInt64};
    }

    /// Resolves the dictionary by its (constant) name, requires it to be hierarchical
    /// and dispatches on its concrete type.
    void execute(Block & block, const ColumnNumbers & arguments, const size_t result) override
    {
        const auto dict_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[0]).column.get());
        if (!dict_name_col)
            throw Exception{
                "First argument of function " + getName() + " must be a constant string",
                ErrorCodes::ILLEGAL_COLUMN
            };

        auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
        const auto dict_ptr = dict.get();

        /// toParent() must not be called on a dictionary without a hierarchy
        if (!dict->hasHierarchy())
            throw Exception{
                "Dictionary does not have a hierarchy",
                ErrorCodes::UNSUPPORTED_METHOD
            };

        if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
            !executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr))
            throw Exception{
                "Unsupported dictionary type " + dict_ptr->getTypeName(),
                ErrorCodes::UNKNOWN_TYPE
            };
    }

    /// Returns false if `dictionary` is not a DictionaryType; otherwise computes the result,
    /// trying every supported integer type for the id column.
    template <typename DictionaryType>
    bool executeDispatch(Block & block, const ColumnNumbers & arguments, const size_t result,
        const IDictionary * const dictionary)
    {
        const auto dict = typeid_cast<const DictionaryType *>(dictionary);
        if (!dict)
            return false;

        const auto id_col = block.getByPosition(arguments[1]).column.get();
        if (!execute<UInt8>(block, result, dict, id_col) &&
            !execute<UInt16>(block, result, dict, id_col) &&
            !execute<UInt32>(block, result, dict, id_col) &&
            !execute<UInt64>(block, result, dict, id_col) &&
            !execute<Int8>(block, result, dict, id_col) &&
            !execute<Int16>(block, result, dict, id_col) &&
            !execute<Int32>(block, result, dict, id_col) &&
            !execute<Int64>(block, result, dict, id_col))
        {
            throw Exception{
                "Second argument of function " + getName() + " must be integral",
                ErrorCodes::ILLEGAL_COLUMN
            };
        }

        return true;
    }

    /// Handles an id column whose element type is T, either full (ColumnVector) or constant (ColumnConst).
    /// Returns false if the column is neither, so the caller can try the next type.
    template <typename T, typename DictionaryType>
    bool execute(Block & block, const size_t result, const DictionaryType * const dictionary,
        const IColumn * const id_col_untyped)
    {
        if (const auto id_col = typeid_cast<const ColumnVector<T> *>(id_col_untyped))
        {
            const auto backend = new ColumnVector<UInt64>;
            const auto array = new ColumnArray{backend};
            block.getByPosition(result).column = array;

            const auto & in = id_col->getData();
            const auto size = in.size();
            auto & out = backend->getData();
            auto & offsets = array->getOffsets();
            offsets.resize(size);
            /// initial guess of four ids per chain; the vector still grows if chains are longer
            out.reserve(size * 4);

            for (const auto idx : ext::range(0, size))
            {
                IDictionary::id_t cur = in[idx];
                while (cur)
                {
                    out.push_back(cur);
                    cur = dictionary->toParent(cur);
                }
                /// offsets[idx] is the end position of row idx inside the flattened id vector
                offsets[idx] = out.size();
            }

            return true;
        }

        if (const auto id_col = typeid_cast<const ColumnConst<T> *>(id_col_untyped))
        {
            Array res;

            IDictionary::id_t cur = id_col->getData();
            while (cur)
            {
                /// Always store UInt64 fields, matching the declared Array(UInt64) and the vector branch;
                /// casting to NearestFieldType<T> produced Int64 fields for signed id columns.
                res.push_back(static_cast<UInt64>(cur));
                cur = dictionary->toParent(cur);
            }

            block.getByPosition(result).column = new ColumnConstArray{
                id_col->size(),
                res,
                new DataTypeArray{new DataTypeUInt64}
            };

            return true;
        }

        return false;
    }

    const Dictionaries & dictionaries;
};
class FunctionDictIsIn final : public IFunction
{
public:
static constexpr auto name = "dictIsIn";
static IFunction * create(const Context & context)
{
return new FunctionDictIsIn{context.getDictionaries()};
};
FunctionDictIsIn(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}
String getName() const override { return name; }
private:
DataTypePtr getReturnType(const DataTypes & arguments) const override
{
if (arguments.size() != 3)
throw Exception{
"Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 3.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
};
if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
{
throw Exception{
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
const auto child_id_arg = arguments[1].get();
if (!typeid_cast<const DataTypeUInt8 *>(child_id_arg) &&
!typeid_cast<const DataTypeUInt16 *>(child_id_arg) &&
!typeid_cast<const DataTypeUInt32 *>(child_id_arg) &&
!typeid_cast<const DataTypeUInt64 *>(child_id_arg) &&
!typeid_cast<const DataTypeInt8 *>(child_id_arg) &&
!typeid_cast<const DataTypeInt16 *>(child_id_arg) &&
!typeid_cast<const DataTypeInt32 *>(child_id_arg) &&
!typeid_cast<const DataTypeInt64 *>(child_id_arg))
{
throw Exception{
"Illegal type " + child_id_arg->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
const auto ancestor_id_arg = arguments[2].get();
if (!typeid_cast<const DataTypeUInt8 *>(ancestor_id_arg) &&
!typeid_cast<const DataTypeUInt16 *>(ancestor_id_arg) &&
!typeid_cast<const DataTypeUInt32 *>(ancestor_id_arg) &&
!typeid_cast<const DataTypeUInt64 *>(ancestor_id_arg) &&
!typeid_cast<const DataTypeInt8 *>(ancestor_id_arg) &&
!typeid_cast<const DataTypeInt16 *>(ancestor_id_arg) &&
!typeid_cast<const DataTypeInt32 *>(ancestor_id_arg) &&
!typeid_cast<const DataTypeInt64 *>(ancestor_id_arg))
{
throw Exception{
"Illegal type " + ancestor_id_arg->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
return new DataTypeUInt8;
}
void execute(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[0]).column.get());
if (!dict_name_col)
throw Exception{
"First argument of function " + getName() + " must be a constant string",
ErrorCodes::ILLEGAL_COLUMN
};
auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
const auto dict_ptr = dict.get();
if (!dict->hasHierarchy())
throw Exception{
"Dictionary does not have a hierarchy",
ErrorCodes::UNSUPPORTED_METHOD
};
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
ErrorCodes::UNKNOWN_TYPE
};
}
template <typename DictionaryType>
bool executeDispatch(Block & block, const ColumnNumbers & arguments, const size_t result,
const IDictionary * const dictionary)
{
const auto dict = typeid_cast<const DictionaryType *>(dictionary);
if (!dict)
return false;
const auto child_id_col = block.getByPosition(arguments[1]).column.get();
const auto ancestor_id_col = block.getByPosition(arguments[2]).column.get();
if (!execute<UInt8>(block, result, dict, child_id_col, ancestor_id_col) &&
!execute<UInt16>(block, result, dict, child_id_col, ancestor_id_col) &&
!execute<UInt32>(block, result, dict, child_id_col, ancestor_id_col) &&
!execute<UInt64>(block, result, dict, child_id_col, ancestor_id_col) &&
!execute<Int8>(block, result, dict, child_id_col, ancestor_id_col) &&
!execute<Int16>(block, result, dict, child_id_col, ancestor_id_col) &&
!execute<Int32>(block, result, dict, child_id_col, ancestor_id_col) &&
!execute<Int64>(block, result, dict, child_id_col, ancestor_id_col))
{
throw Exception{
"Illegal column " + child_id_col->getName()
+ " of second argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN
};
}
return true;
}
template <typename T, typename DictionaryType>
bool execute(Block & block, const size_t result, const DictionaryType * const dictionary,
const IColumn * const child_id_col_untyped, const IColumn * const ancestor_id_col_untyped)
{
if (execute<T, ColumnVector<T>>(block, result, dictionary, child_id_col_untyped, ancestor_id_col_untyped) ||
execute<T, ColumnConst<T>>(block, result, dictionary, child_id_col_untyped, ancestor_id_col_untyped))
return true;
return false;
}
template <typename T, typename ColumnType, typename DictionaryType>
bool execute(Block & block, const size_t result, const DictionaryType * dictionary,
const IColumn * const child_id_col_untyped, const IColumn * const ancestor_id_col_untyped)
{
if (const auto child_id_col = typeid_cast<const ColumnType *>(child_id_col_untyped))
{
if (execute<T, UInt8>(block, result, dictionary, child_id_col, ancestor_id_col_untyped) ||
execute<T, UInt16>(block, result, dictionary, child_id_col, ancestor_id_col_untyped) ||
execute<T, UInt32>(block, result, dictionary, child_id_col, ancestor_id_col_untyped) ||
execute<T, UInt64>(block, result, dictionary, child_id_col, ancestor_id_col_untyped) ||
execute<T, Int8>(block, result, dictionary, child_id_col, ancestor_id_col_untyped) ||
execute<T, Int16>(block, result, dictionary, child_id_col, ancestor_id_col_untyped) ||
execute<T, Int32>(block, result, dictionary, child_id_col, ancestor_id_col_untyped) ||
execute<T, Int64>(block, result, dictionary, child_id_col, ancestor_id_col_untyped))
return true;
else
throw Exception{
"Illegal column " + ancestor_id_col_untyped->getName()
+ " of third argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN
};
}
return false;
}
template <typename T, typename U, typename DictionaryType>
bool execute(Block & block, const size_t result, const DictionaryType * const dictionary,
const ColumnVector<T> * const child_id_col, const IColumn * const ancestor_id_col_untyped)
{
if (const auto ancestor_id_col = typeid_cast<const ColumnVector<T> *>(ancestor_id_col_untyped))
{
const auto out = new ColumnVector<UInt8>;
block.getByPosition(result).column = out;
const auto & child_ids = child_id_col->getData();
const auto & ancestor_ids = ancestor_id_col->getData();
auto & data = out->getData();
const auto size = child_id_col->size();
data.resize(size);
for (const auto idx : ext::range(0, size))
data[idx] = dictionary->in(child_ids[idx], ancestor_ids[idx]);
return true;
}
else if (const auto ancestor_id_col = typeid_cast<const ColumnConst<T> *>(ancestor_id_col_untyped))
{
const auto out = new ColumnVector<UInt8>;
block.getByPosition(result).column = out;
const auto & child_ids = child_id_col->getData();
const auto ancestor_id = ancestor_id_col->getData();
auto & data = out->getData();
const auto size = child_id_col->size();
data.resize(size);
for (const auto idx : ext::range(0, size))
data[idx] = dictionary->in(child_ids[idx], ancestor_id);
return true;
}
return false;
}
template <typename T, typename U, typename DictionaryType>
bool execute(Block & block, const size_t result, const DictionaryType * const dictionary,
const ColumnConst<T> * const child_id_col, const IColumn * const ancestor_id_col_untyped)
{
if (const auto ancestor_id_col = typeid_cast<const ColumnVector<T> *>(ancestor_id_col_untyped))
{
const auto out = new ColumnVector<UInt8>;
block.getByPosition(result).column = out;
const auto child_id = child_id_col->getData();
const auto & ancestor_ids = ancestor_id_col->getData();
auto & data = out->getData();
const auto size = child_id_col->size();
data.resize(size);
for (const auto idx : ext::range(0, size))
data[idx] = dictionary->in(child_id, ancestor_ids[idx]);
return true;
}
else if (const auto ancestor_id_col = typeid_cast<const ColumnConst<T> *>(ancestor_id_col_untyped))
{
block.getByPosition(result).column = new ColumnConst<UInt8>{
child_id_col->size(),
dictionary->in(child_id_col->getData(), ancestor_id_col->getData())
};
return true;
}
return false;
}
const Dictionaries & dictionaries;
};
}

View File

@ -1,6 +1,10 @@
#pragma once
#include <mutex>
#include <thread>
#include <unordered_map>
#include <chrono>
#include <random>
#include <Poco/SharedPtr.h>
@ -16,6 +20,9 @@ namespace DB
using Poco::SharedPtr;
class Context;
class IDictionary;
/// Словари Метрики, которые могут использоваться в функциях.
class Dictionaries
@ -24,15 +31,23 @@ private:
MultiVersion<RegionsHierarchies> regions_hierarchies;
MultiVersion<TechDataHierarchy> tech_data_hierarchy;
MultiVersion<RegionsNames> regions_names;

/// Taken by getExternalDictionary for lookups and by reloadExternals when inserting.
mutable std::mutex external_dictionaries_mutex;
/// External dictionaries, keyed by dictionary name.
std::unordered_map<std::string, std::shared_ptr<MultiVersion<IDictionary>>> external_dictionaries;
/// Earliest time of the next update check, keyed by dictionary name.
std::unordered_map<std::string, std::chrono::system_clock::time_point> update_times;
/// Used to pick an update delay within a dictionary's lifetime bounds. Seeded from
/// std::random_device: a default-constructed engine would produce the same sequence
/// in every process.
std::mt19937 rnd_engine{std::random_device{}()};

Context & context;
/// Reload period of the dictionaries, in seconds.
int reload_period;

std::thread reloading_thread;
std::thread reloading_externals_thread;
/// Manual-reset event: two threads wait on it, and an auto-reset Poco::Event (the default)
/// would release only one of them per set(), leaving the other thread's join() blocked.
Poco::Event destroy{false};

Logger * log;

/// Modification time of the external dictionaries config as of the last full reload.
Poco::Timestamp dictionaries_last_modified{0};
void handleException() const
{
@ -106,6 +121,9 @@ private:
LOG_INFO(log, "Loaded dictionaries.");
}
void reloadExternals();
/// Обновляет каждые reload_period секунд.
void reloadPeriodically()
{
@ -118,20 +136,35 @@ private:
}
}
void reloadExternalsPeriodically()
{
const auto check_period = 5 * 1000;
while (true)
{
if (destroy.tryWait(check_period))
return;
reloadExternals();
}
}
public:
/// The dictionaries will be updated in a separate thread, every reload_period seconds.
/// NOTE(review): the two signature/initializer pairs below look like the old and the new
/// version of the same constructor as rendered by a diff - only one of them can exist in
/// the real header; confirm before editing.
Dictionaries(int reload_period_ = 3600)
: reload_period(reload_period_),
Dictionaries(Context & context, int reload_period_ = 3600)
: context(context), reload_period(reload_period_),
log(&Logger::get("Dictionaries"))
{
/// Load everything once on the calling thread before the background threads start.
reloadImpl();
reloadExternals();
/// One thread per reload loop; both are joined in the destructor.
reloading_thread = std::thread([this] { reloadPeriodically(); });
reloading_externals_thread = std::thread{&Dictionaries::reloadExternalsPeriodically, this};
}
/// Signals the background reload threads to stop and waits for both of them to finish.
~Dictionaries()
{
/// Must happen before the joins: reloadExternalsPeriodically returns only once its
/// tryWait on `destroy` succeeds (reloadPeriodically presumably does the same).
/// NOTE(review): a single set() wakes both threads only if `destroy` is a manual-reset
/// event - confirm how it is constructed.
destroy.set();
reloading_thread.join();
reloading_externals_thread.join();
}
MultiVersion<RegionsHierarchies>::Version getRegionsHierarchies() const
@ -148,6 +181,19 @@ public:
{
return regions_names.get();
}
/// Returns the current version of the external dictionary registered under `name`.
/// The lookup is done under external_dictionaries_mutex.
/// Throws Exception with BAD_ARGUMENTS if no dictionary with that name is known.
MultiVersion<IDictionary>::Version getExternalDictionary(const std::string & name) const
{
    const std::lock_guard<std::mutex> guard{external_dictionaries_mutex};

    const auto found = external_dictionaries.find(name);
    if (found != std::end(external_dictionaries))
        return found->second->get();

    throw Exception{
        "No such dictionary: " + name,
        ErrorCodes::BAD_ARGUMENTS
    };
}
};
}

View File

@ -21,6 +21,19 @@ void registerFunctionsDictionaries(FunctionFactory & factory)
factory.registerFunction<FunctionOSHierarchy>();
factory.registerFunction<FunctionSEHierarchy>();
factory.registerFunction<FunctionRegionToName>();
factory.registerFunction<FunctionDictGetUInt8>();
factory.registerFunction<FunctionDictGetUInt16>();
factory.registerFunction<FunctionDictGetUInt32>();
factory.registerFunction<FunctionDictGetUInt64>();
factory.registerFunction<FunctionDictGetInt8>();
factory.registerFunction<FunctionDictGetInt16>();
factory.registerFunction<FunctionDictGetInt32>();
factory.registerFunction<FunctionDictGetInt64>();
factory.registerFunction<FunctionDictGetFloat32>();
factory.registerFunction<FunctionDictGetFloat64>();
factory.registerFunction<FunctionDictGetString>();
factory.registerFunction<FunctionDictGetHierarchy>();
factory.registerFunction<FunctionDictIsIn>();
}
}

View File

@ -495,7 +495,11 @@ const Dictionaries & Context::getDictionaries() const
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->dictionaries)
shared->dictionaries = new Dictionaries;
{
if (!this->global_context)
throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
shared->dictionaries = new Dictionaries{ *this->global_context };
}
return *shared->dictionaries;
}

View File

@ -0,0 +1,112 @@
#include <DB/Interpreters/Dictionaries.h>
#include <DB/Dictionaries/DictionaryFactory.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/config_ptr_t.h>
namespace DB
{
void Dictionaries::reloadExternals()
{
const auto config_path = Poco::Util::Application::instance().config().getString("dictionaries_config");
if (config_path.empty())
return;
const auto last_modified = Poco::File{config_path}.getLastModified();
if (last_modified > dictionaries_last_modified)
{
/// definitions of dictionaries may have changed, recreate all of them
dictionaries_last_modified = last_modified;
const config_ptr_t<Poco::Util::XMLConfiguration> config{new Poco::Util::XMLConfiguration{config_path}};
/// get all dictionaries' definitions
Poco::Util::AbstractConfiguration::Keys keys;
config->keys(keys);
/// for each dictionary defined in xml config
for (const auto & key : keys)
{
try
{
if (0 != strncmp(key.data(), "dictionary", strlen("dictionary")))
{
LOG_WARNING(log, "unknown node in dictionaries file: '" + key + "', 'dictionary'");
continue;
}
const auto & prefix = key + '.';
const auto & name = config->getString(prefix + "name");
if (name.empty())
{
LOG_WARNING(log, "dictionary name cannot be empty");
continue;
}
auto dict_ptr = DictionaryFactory::instance().create(name, *config, prefix, context);
if (!dict_ptr->isCached())
{
const auto & lifetime = dict_ptr->getLifetime();
std::uniform_int_distribution<std::uint64_t> distribution{lifetime.min_sec, lifetime.max_sec};
update_times[name] = std::chrono::system_clock::now() +
std::chrono::seconds{distribution(rnd_engine)};
}
auto it = external_dictionaries.find(name);
/// add new dictionary or update an existing version
if (it == std::end(external_dictionaries))
{
const std::lock_guard<std::mutex> lock{external_dictionaries_mutex};
external_dictionaries.emplace(name, std::make_shared<MultiVersion<IDictionary>>(dict_ptr.release()));
}
else
it->second->set(dict_ptr.release());
}
catch (...)
{
handleException();
}
}
}
else
{
/// periodic update
for (auto & dictionary : external_dictionaries)
{
try
{
auto current = dictionary.second->get();
/// update only non-cached dictionaries
if (!current->isCached())
{
auto & update_time = update_times[current->getName()];
/// check that timeout has passed
if (std::chrono::system_clock::now() < update_time)
continue;
/// check source modified
if (current->getSource()->isModified())
{
/// create new version of dictionary
auto new_version = current->clone();
dictionary.second->set(new_version.release());
}
/// calculate next update time
const auto & lifetime = current->getLifetime();
std::uniform_int_distribution<std::uint64_t> distribution{lifetime.min_sec, lifetime.max_sec};
update_time = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
}
}
catch (...)
{
handleException();
}
}
}
}
}

View File

@ -0,0 +1,50 @@
#include <DB/Dictionaries/DictionaryFactory.h>
#include <DB/Dictionaries/DictionarySourceFactory.h>
#include <DB/Dictionaries/FlatDictionary.h>
#include <DB/Dictionaries/HashedDictionary.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <Yandex/singleton.h>
#include <statdaemons/ext/memory.hpp>
namespace DB
{
/** Creates the dictionary called `name` from the configuration subtree at `config_prefix`.
  *
  * Structure, source and lifetime are read from the "structure", "source." and "lifetime"
  * sub-keys; the implementation is chosen by the key present under "layout.":
  * "flat" gives a FlatDictionary and "hashed" a HashedDictionary.
  * "cache" is recognised but always throws: TOO_SMALL_BUFFER_SIZE if its size is 0,
  * NOT_IMPLEMENTED otherwise. Any other layout throws BAD_ARGUMENTS.
  */
DictionaryPtr DictionaryFactory::create(const std::string & name, Poco::Util::AbstractConfiguration & config,
    const std::string & config_prefix, Context & context) const
{
    auto structure = DictionaryStructure::fromConfig(config, config_prefix + "structure");

    auto source = DictionarySourceFactory::instance().create(
        config, config_prefix + "source.", structure, context);

    const auto lifetime = DictionaryLifetime::fromConfig(config, config_prefix + "lifetime");

    const std::string layout = config_prefix + "layout.";

    if (config.has(layout + "flat"))
        return ext::make_unique<FlatDictionary>(name, structure, std::move(source), lifetime);

    if (config.has(layout + "hashed"))
        return ext::make_unique<HashedDictionary>(name, structure, std::move(source), lifetime);

    if (config.has(layout + "cache"))
    {
        /// The size is validated first even though the layout is rejected either way.
        const auto cache_size = config.getInt(layout + "cache.size", 0);
        if (0 == cache_size)
            throw Exception{
                "Dictionary of type 'cache' cannot have size of 0 bytes",
                ErrorCodes::TOO_SMALL_BUFFER_SIZE
            };

        throw Exception{
            "Dictionary of type 'cache' is not yet implemented",
            ErrorCodes::NOT_IMPLEMENTED
        };
    }

    throw Exception{"No dictionary type specified", ErrorCodes::BAD_ARGUMENTS};
}
}

View File

@ -238,7 +238,7 @@ UsersConfigReloader::~UsersConfigReloader()
quit = true;
thread.join();
}
catch(...)
catch (...)
{
tryLogCurrentException("~UsersConfigReloader");
}

View File

@ -49,7 +49,7 @@ public:
std::unique_ptr<OLAP::QueryConverter> olap_converter;
protected:
void initialize(Application& self)
void initialize(Application & self)
{
Daemon::initialize(self);
logger().information("starting up");
@ -61,7 +61,7 @@ protected:
Daemon::uninitialize();
}
int main(const std::vector<std::string>& args);
int main(const std::vector<std::string> & args);
};
}