This commit is contained in:
Alexey Arno 2015-02-09 14:40:54 +03:00
commit 78283c6b85
34 changed files with 2868 additions and 42 deletions

View File

@ -20,8 +20,9 @@ DST=${2:-$SOURCE_PATH/../headers};
PATH="/usr/local/bin:/usr/local/sbin:/usr/bin:$PATH"
# The -mcx16 option is passed so that more header files get selected (with a margin).
for i in $(clang -M -xc++ -std=gnu++1y -Wall -Werror -msse4 -mpopcnt -O3 -g -fPIC \
for i in $(clang -M -xc++ -std=gnu++1y -Wall -Werror -msse4 -mcx16 -mpopcnt -O3 -g -fPIC \
$(cat "$SOURCE_PATH/CMakeLists.txt" | grep include_directories | grep -v METRICA_BINARY_DIR | sed -e "s!\${METRICA_SOURCE_DIR}!$SOURCE_PATH!; s!include_directories (!-I !; s!)!!;" | tr '\n' ' ') \
"$SOURCE_PATH/dbms/include/DB/Interpreters/SpecializedAggregator.h" |
tr -d '\\' |

View File

@ -232,6 +232,17 @@ public:
c_end = c_start + byte_size(n);
}
/// Resizes the array to n elements. When the array grows, every newly added element is set to `value`;
/// when it shrinks (or keeps its size) only the end pointer is moved.
void resize_fill(size_t n, const T & value)
{
    const size_t old_size = size();
    if (n > old_size)
    {
        /// reserve() must come first: it may reallocate, and t_end() has to be taken afterwards.
        reserve(n);

        /// The range is computed on T * so the arithmetic is in elements. The previous version added
        /// the element count (n - old_size) to c_end, which is advanced in bytes elsewhere in this
        /// method, so for any T wider than one byte only part of the new tail was filled.
        std::fill(t_end(), t_end() + (n - old_size), value);
    }
    c_end = c_start + byte_size(n);
}
void clear()
{
c_end = c_start;

View File

@ -21,7 +21,7 @@ struct StringRef
StringRef(const char * data_, size_t size_) : data(data_), size(size_) {}
StringRef(const unsigned char * data_, size_t size_) : data(reinterpret_cast<const char *>(data_)), size(size_) {}
StringRef(const std::string & s) : data(s.data()), size(s.size()) {}
StringRef() {}
StringRef() = default;
std::string toString() const { return std::string(data, size); }
};
@ -48,7 +48,7 @@ inline bool memequalSSE2Wide(const char * p1, const char * p2, size_t size)
if ( compareSSE2(p1, p2)
&& compareSSE2(p1 + 16, p2 + 16)
&& compareSSE2(p1 + 32, p2 + 32)
&& compareSSE2(p1 + 40, p2 + 40))
&& compareSSE2(p1 + 48, p2 + 48))
{
p1 += 64;
p2 += 64;

View File

@ -0,0 +1,127 @@
#pragma once
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
#include <DB/Interpreters/executeQuery.h>
#include <statdaemons/ext/range.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Net/NetworkInterface.h>
namespace DB
{
/// Passed as the first argument to the ConnectionPool that ClickhouseDictionarySource creates
/// for a non-local server (by its name, the limit on pooled connections — confirm against ConnectionPool).
const auto max_connections = 1;
/** Dictionary source that loads every row of a dictionary from a ClickHouse table.
  *
  * When the configured host and port point at this very server (see isLocal) the query is executed
  * in-process through executeQuery; otherwise it is sent through a ConnectionPool owned by this object.
  * Only loadAll() is implemented: loadId() and loadIds() throw NOT_IMPLEMENTED.
  */
class ClickhouseDictionarySource final : public IDictionarySource
{
    static const auto max_block_size = 8192;

public:
    /** Reads connection settings from `config`.
      * Keys are formed as config_prefix + "host" etc., so `config_prefix` must already end with the
      * separator. "host", "port" and "table" have no default; "user", "password" and "db" default to "".
      * The connection pool is created only when the target is not the local server.
      */
    ClickhouseDictionarySource(const Poco::Util::AbstractConfiguration & config,
        const std::string & config_prefix,
        Block & sample_block, Context & context)
        : host{config.getString(config_prefix + "host")},
          port(config.getInt(config_prefix + "port")),
          user{config.getString(config_prefix + "user", "")},
          password{config.getString(config_prefix + "password", "")},
          db{config.getString(config_prefix + "db", "")},
          table{config.getString(config_prefix + "table")},
          sample_block{sample_block}, context(context),
          is_local{isLocal(host, port)},
          pool{is_local ? nullptr : ext::make_unique<ConnectionPool>(
              max_connections, host, port, db, user, password, context.getDataTypeFactory(),
              "ClickhouseDictionarySource")
          },
          load_all_query{composeLoadAllQuery(sample_block, table)}
    {}

    /** Copies the settings of `other` and, for a remote target, opens a fresh connection pool
      * (the pool itself is not shared between copies).
      * `table` is taken from other.table; it was previously initialised from other.db by mistake.
      */
    ClickhouseDictionarySource(const ClickhouseDictionarySource & other)
        : host{other.host}, port{other.port}, user{other.user}, password{other.password},
          db{other.db}, table{other.table},
          sample_block{other.sample_block}, context(other.context),
          is_local{other.is_local},
          pool{is_local ? nullptr : ext::make_unique<ConnectionPool>(
              max_connections, host, port, db, user, password, context.getDataTypeFactory(),
              "ClickhouseDictionarySource")},
          load_all_query{other.load_all_query}
    {}

    /// Returns a stream over the whole table: a local query result, or a remote stream over the pool.
    BlockInputStreamPtr loadAll() override
    {
        if (is_local)
            return executeQuery(load_all_query, context).in;

        return new RemoteBlockInputStream{pool.get(), load_all_query, nullptr};
    }

    /// Not supported by this source; always throws NOT_IMPLEMENTED.
    BlockInputStreamPtr loadId(const std::uint64_t id) override
    {
        throw Exception{
            "Method unsupported",
            ErrorCodes::NOT_IMPLEMENTED
        };
    }

    /// Not supported by this source; always throws NOT_IMPLEMENTED.
    BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override
    {
        throw Exception{
            "Method unsupported",
            ErrorCodes::NOT_IMPLEMENTED
        };
    }

    /// @todo check update time somehow
    /// Until then the source always reports itself as modified.
    bool isModified() const override { return true; }

    /// Returns an independent copy made with the copy constructor.
    DictionarySourcePtr clone() const override { return ext::make_unique<ClickhouseDictionarySource>(*this); }

private:
    /// Builds "SELECT <col1>, <col2>, ... FROM <table>;" from the column names of `block`, in block order.
    /// Names are inserted verbatim, without quoting.
    static std::string composeLoadAllQuery(const Block & block, const std::string & table)
    {
        std::string query{"SELECT "};

        auto first = true;
        for (const auto idx : ext::range(0, block.columns()))
        {
            if (!first)
                query += ", ";

            query += block.getByPosition(idx).name;
            first = false;
        }

        query += " FROM " + table + ';';

        return query;
    }

    /** Returns true when `port` equals this server's "tcp_port" setting and `host` is the address
      * of one of the machine's network interfaces.
      * The interface list is fetched once (function-local static) and reused afterwards.
      * NOTE(review): `host` is parsed with Poco::Net::IPAddress, so a host name rather than an
      * address literal presumably throws here — confirm against the Poco documentation.
      */
    static bool isLocal(const std::string & host, const UInt16 port)
    {
        const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
        static auto interfaces = Poco::Net::NetworkInterface::list();

        if (clickhouse_port == port)
        {
            return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
                [&] (const Poco::Net::NetworkInterface & interface) {
                    return interface.address() == Poco::Net::IPAddress(host);
                });
        }

        return false;
    }

    const std::string host;
    const UInt16 port;
    const std::string user;
    const std::string password;
    const std::string db;
    const std::string table;
    Block sample_block;
    Context & context;
    const bool is_local;
    /// Null when is_local is true.
    std::unique_ptr<ConnectionPool> pool;
    const std::string load_all_query;
};
}

View File

@ -0,0 +1,18 @@
#pragma once
#include <DB/Dictionaries/IDictionary.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
class Context;
/// Singleton that exposes a single factory method for dictionaries. Only the declaration lives in
/// this header; the definition is elsewhere.
class DictionaryFactory : public Singleton<DictionaryFactory>
{
public:
/// Presumably builds the dictionary called `name` from the settings found under `config_prefix`
/// in `config` — NOTE(review): confirm the supported layouts and the use of `context` in the definition.
DictionaryPtr create(const std::string & name, Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix, Context & context) const;
};
}

View File

@ -0,0 +1,71 @@
#pragma once
#include <DB/Core/Block.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Dictionaries/FileDictionarySource.h>
#include <DB/Dictionaries/MysqlDictionarySource.h>
#include <DB/Dictionaries/ClickhouseDictionarySource.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <Yandex/singleton.h>
#include <statdaemons/ext/memory.hpp>
namespace DB
{
namespace
{
/// Builds an empty sample block describing the dictionary layout: a UInt64 id column named
/// dict_struct.id_name, followed by one column per attribute whose data type is obtained
/// from the context's data type factory by the attribute's type name.
Block createSampleBlock(const DictionaryStructure & dict_struct, const Context & context)
{
    /// The identifier column always comes first.
    Block sample{
        ColumnWithNameAndType{new ColumnUInt64, new DataTypeUInt64, dict_struct.id_name}
    };

    /// Attribute columns follow in declaration order.
    for (const auto & attr : dict_struct.attributes)
    {
        const auto & data_type = context.getDataTypeFactory().get(attr.type);
        sample.insert(ColumnWithNameAndType{data_type->createColumn(), data_type, attr.name});
    }

    return sample;
}
}
/// Singleton factory that picks a dictionary source implementation from the configuration.
class DictionarySourceFactory : public Singleton<DictionarySourceFactory>
{
public:
    /** Creates the source configured under `config_prefix`.
      * The first of the keys "file", "mysql", "clickhouse" present under the prefix decides the type;
      * if none is present an Exception is thrown.
      * `config_prefix` is concatenated with the key as is, so it must already end with the separator.
      * NOTE(review): the mysql source receives its prefix without a trailing dot while the clickhouse
      * source receives one — presumably each source expects that form; confirm in MysqlDictionarySource.
      */
    DictionarySourcePtr create(Poco::Util::AbstractConfiguration & config,
        const std::string & config_prefix,
        const DictionaryStructure & dict_struct,
        Context & context) const
    {
        auto sample_block = createSampleBlock(dict_struct, context);

        if (config.has(config_prefix + "file"))
        {
            const std::string filename = config.getString(config_prefix + "file.path");
            const std::string format = config.getString(config_prefix + "file.format");
            return ext::make_unique<FileDictionarySource>(filename, format, sample_block, context);
        }

        if (config.has(config_prefix + "mysql"))
            return ext::make_unique<MysqlDictionarySource>(config, config_prefix + "mysql", sample_block, context);

        if (config.has(config_prefix + "clickhouse"))
            return ext::make_unique<ClickhouseDictionarySource>(config, config_prefix + "clickhouse.",
                sample_block, context);

        throw Exception{"unsupported source type"};
    }
};
}

View File

@ -0,0 +1,157 @@
#pragma once
#include <DB/Core/ErrorCodes.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <vector>
#include <string>
#include <map>
namespace DB
{
/// Value types a dictionary attribute can have. Each enumerator corresponds to one of the type-name
/// strings ("UInt8" ... "String") that getAttributeTypeByName() accepts and toString() produces.
enum class attribute_type
{
uint8,
uint16,
uint32,
uint64,
int8,
int16,
int32,
int64,
float32,
float64,
string
};
/// Maps a type name such as "UInt8" or "String" to the matching attribute_type.
/// The lookup is case-sensitive; an unrecognised name throws Exception with ErrorCodes::UNKNOWN_TYPE.
inline attribute_type getAttributeTypeByName(const std::string & type)
{
    /// Built once on first use.
    static const std::unordered_map<std::string, attribute_type> dictionary{
        { "UInt8", attribute_type::uint8 },
        { "UInt16", attribute_type::uint16 },
        { "UInt32", attribute_type::uint32 },
        { "UInt64", attribute_type::uint64 },
        { "Int8", attribute_type::int8 },
        { "Int16", attribute_type::int16 },
        { "Int32", attribute_type::int32 },
        { "Int64", attribute_type::int64 },
        { "Float32", attribute_type::float32 },
        { "Float64", attribute_type::float64 },
        { "String", attribute_type::string },
    };

    const auto found = dictionary.find(type);
    if (found == std::end(dictionary))
        throw Exception{"Unknown type " + type, ErrorCodes::UNKNOWN_TYPE};

    return found->second;
}
/// Returns the type name ("UInt8" ... "String") for an attribute_type.
/// A value outside the enumeration throws Exception with ErrorCodes::ARGUMENT_OUT_OF_BOUND.
inline std::string toString(const attribute_type type)
{
    switch (type)
    {
        case attribute_type::uint8: return "UInt8";
        case attribute_type::uint16: return "UInt16";
        case attribute_type::uint32: return "UInt32";
        case attribute_type::uint64: return "UInt64";
        case attribute_type::int8: return "Int8";
        case attribute_type::int16: return "Int16";
        case attribute_type::int32: return "Int32";
        case attribute_type::int64: return "Int64";
        case attribute_type::float32: return "Float32";
        case attribute_type::float64: return "Float64";
        case attribute_type::string: return "String";
    }

    /// Report the raw numeric value. Calling toString(type) here, as the code used to, re-enters
    /// this very function with the same invalid value and never terminates.
    throw Exception{
        "Unknown attribute_type " + std::to_string(static_cast<int>(type)),
        ErrorCodes::ARGUMENT_OUT_OF_BOUND
    };
}
/// Lifetime bounds of a dictionary, in seconds (per the field names).
struct DictionaryLifetime
{
    std::uint64_t min_sec;
    std::uint64_t max_sec;

    /** Reads the lifetime from `config`.
      * If the key "<config_prefix>.min" exists, min and max are read from "<config_prefix>.min" and
      * "<config_prefix>.max"; otherwise the single integer at `config_prefix` is used for both.
      */
    static DictionaryLifetime fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
    {
        const std::string min_key = config_prefix + ".min";

        if (config.has(min_key))
        {
            const std::uint64_t lower = config.getInt(min_key);
            const std::uint64_t upper = config.getInt(config_prefix + ".max");
            return { lower, upper };
        }

        /// Single-value form: the same number serves as both bounds.
        const std::uint64_t fixed = config.getInt(config_prefix);
        return { fixed, fixed };
    }
};
/// One attribute of a dictionary as read from the configuration by DictionaryStructure::fromConfig.
struct DictionaryAttribute
{
/// Value of the attribute's "name" key.
std::string name;
/// Value of the "type" key, kept as text (e.g. "UInt8").
std::string type;
/// Value of the "null_value" key, kept as text.
std::string null_value;
/// Value of the "hierarchical" key (false when absent); at most one attribute may set it.
bool hierarchical;
/// Value of the "injective" key (false when absent).
bool injective;
};
/// Layout of a dictionary: the name of the id column and the list of its attributes.
struct DictionaryStructure
{
    std::string id_name;
    std::vector<DictionaryAttribute> attributes;

    /** Reads the structure from the configuration subtree at `config_prefix`.
      * The id name comes from "<config_prefix>.id.name"; every child key that starts with "attribute"
      * contributes one attribute (keys "name", "type", "null_value", optional "hierarchical", "injective").
      * Throws Exception with ErrorCodes::BAD_ARGUMENTS when the id name is empty, when an attribute
      * has an empty name or type, when more than one attribute is hierarchical, or when there are
      * no attributes at all.
      */
    static DictionaryStructure fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
    {
        const auto & id_name = config.getString(config_prefix + ".id.name");
        if (id_name.empty())
            throw Exception{"No 'id' specified for dictionary", ErrorCodes::BAD_ARGUMENTS};

        DictionaryStructure structure{id_name};

        Poco::Util::AbstractConfiguration::Keys child_keys;
        config.keys(config_prefix, child_keys);

        bool hierarchy_seen = false;
        for (const auto & child_key : child_keys)
        {
            /// Only children whose key begins with "attribute" describe attributes.
            const bool is_attribute = 0 == strncmp(child_key.data(), "attribute", strlen("attribute"));
            if (!is_attribute)
                continue;

            const auto & attribute_prefix = config_prefix + '.' + child_key + '.';

            const auto & name = config.getString(attribute_prefix + "name");
            const auto & type = config.getString(attribute_prefix + "type");
            const auto & null_value = config.getString(attribute_prefix + "null_value");
            const auto hierarchical = config.getBool(attribute_prefix + "hierarchical", false);
            const auto injective = config.getBool(attribute_prefix + "injective", false);

            if (name.empty() || type.empty())
                throw Exception{
                    "Properties 'name' and 'type' of an attribute cannot be empty",
                    ErrorCodes::BAD_ARGUMENTS
                };

            if (hierarchical)
            {
                if (hierarchy_seen)
                    throw Exception{"Only one hierarchical attribute supported", ErrorCodes::BAD_ARGUMENTS};

                hierarchy_seen = true;
            }

            structure.attributes.emplace_back(DictionaryAttribute{name, type, null_value, hierarchical, injective});
        }

        if (structure.attributes.empty())
            throw Exception{"Dictionary has no attributes defined", ErrorCodes::BAD_ARGUMENTS};

        return structure;
    }
};
}

View File

@ -0,0 +1,70 @@
#pragma once
#include <DB/Interpreters/Context.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/OwningBufferBlockInputStream.h>
#include <Poco/Timestamp.h>
#include <Poco/File.h>
namespace DB
{
/** Dictionary source backed by a file in one of the formats known to the context's format factory.
  * Only loadAll() is implemented; modification is detected through the file's last-modified time.
  */
class FileDictionarySource final : public IDictionarySource
{
    static const auto max_block_size = 8192;

public:
    /// Remembers the file name, format and sample block, and records the file's current
    /// modification time (so the file has to be accessible at construction).
    FileDictionarySource(const std::string & filename, const std::string & format, Block & sample_block,
        Context & context)
        : filename{filename}, format{format}, sample_block{sample_block}, context(context),
          last_modification{getLastModification()}
    {}

    /// Copies all settings, including the remembered modification time.
    FileDictionarySource(const FileDictionarySource & other)
        : filename{other.filename}, format{other.format},
          sample_block{other.sample_block}, context(other.context),
          last_modification{other.last_modification}
    {}

    /// Opens the file and returns a stream that parses it in the configured format.
    /// The returned stream takes ownership of the read buffer. The remembered modification time
    /// is refreshed after the file has been opened.
    BlockInputStreamPtr loadAll() override
    {
        auto file_buffer = ext::make_unique<ReadBufferFromFile>(filename);
        auto parsed_stream = context.getFormatFactory().getInput(
            format, *file_buffer, sample_block, max_block_size, context.getDataTypeFactory());

        last_modification = getLastModification();

        return new OwningBufferBlockInputStream{parsed_stream, std::move(file_buffer)};
    }

    /// Not supported by this source; always throws NOT_IMPLEMENTED.
    BlockInputStreamPtr loadId(const std::uint64_t id) override
    {
        throw Exception{"Method unsupported", ErrorCodes::NOT_IMPLEMENTED};
    }

    /// Not supported by this source; always throws NOT_IMPLEMENTED.
    BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override
    {
        throw Exception{"Method unsupported", ErrorCodes::NOT_IMPLEMENTED};
    }

    /// True when the file's modification time is later than the one remembered at the last load.
    bool isModified() const override
    {
        return last_modification < getLastModification();
    }

    /// Returns an independent copy made with the copy constructor.
    DictionarySourcePtr clone() const override
    {
        return ext::make_unique<FileDictionarySource>(*this);
    }

private:
    /// Current last-modified timestamp of the file, queried on every call.
    Poco::Timestamp getLastModification() const
    {
        return Poco::File{filename}.getLastModified();
    }

    const std::string filename;
    const std::string format;
    Block sample_block;
    Context & context;
    Poco::Timestamp last_modification;
};
}

View File

@ -0,0 +1,383 @@
#pragma once
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/DictionarySourceFactory.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <statdaemons/ext/range.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <vector>
namespace DB
{
/// Number of elements every FlatDictionary attribute array is created with.
const auto initial_array_size = 1024;
/// Exclusive upper bound for identifiers accepted by FlatDictionary::setAttributeValue.
const auto max_array_size = 500000;
class FlatDictionary final : public IDictionary
{
public:
/// Stores the settings, takes ownership of the source, creates the attribute storage and
/// immediately loads all data from the source.
FlatDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime)
: name{name}, dict_struct(dict_struct),
source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime)
{
createAttributes();
loadData();
}
/// Copying delegates to the main constructor with a clone of the source, so the copy
/// reloads its data from that clone rather than copying the arrays of `other`.
FlatDictionary(const FlatDictionary & other)
: FlatDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime}
{}
/// Name given to the dictionary at construction.
std::string getName() const override
{
    return name;
}

/// Fixed identifier of this dictionary implementation.
std::string getTypeName() const override
{
    return "FlatDictionary";
}

/// This implementation always reports itself as not cached.
bool isCached() const override
{
    return false;
}

/// Creates a copy through the copy constructor.
DictionaryPtr clone() const override
{
    return ext::make_unique<FlatDictionary>(*this);
}

/// Non-owning pointer to the source this dictionary was built from.
const IDictionarySource * const getSource() const override
{
    return source_ptr.get();
}

/// Lifetime settings passed at construction.
const DictionaryLifetime & getLifetime() const override
{
    return dict_lifetime;
}

/// True when one of the attributes was declared hierarchical.
bool hasHierarchy() const override
{
    return hierarchical_attribute != nullptr;
}
/// Returns the value of the hierarchical attribute for `id`, or that attribute's null value when
/// `id` lies beyond the array. Throws TYPE_MISMATCH when the hierarchical attribute is not of an
/// integer type. The hierarchical attribute pointer is dereferenced unconditionally, so callers
/// are expected to check hasHierarchy() first.
id_t toParent(const id_t id) const override
{
    const auto attr = hierarchical_attribute;

#define RETURN_PARENT_FROM_ARRAY(LC_TYPE) \
    case attribute_type::LC_TYPE: \
        if (id < attr->LC_TYPE##_array->size()) \
            return (*attr->LC_TYPE##_array)[id]; \
        return attr->LC_TYPE##_null_value;

    switch (attr->type)
    {
        RETURN_PARENT_FROM_ARRAY(uint8)
        RETURN_PARENT_FROM_ARRAY(uint16)
        RETURN_PARENT_FROM_ARRAY(uint32)
        RETURN_PARENT_FROM_ARRAY(uint64)
        RETURN_PARENT_FROM_ARRAY(int8)
        RETURN_PARENT_FROM_ARRAY(int16)
        RETURN_PARENT_FROM_ARRAY(int32)
        RETURN_PARENT_FROM_ARRAY(int64)

        /// Non-integer types cannot hold a parent id; fall through to the exception below.
        case attribute_type::float32:
        case attribute_type::float64:
        case attribute_type::string:
            break;
    }

#undef RETURN_PARENT_FROM_ARRAY

    throw Exception{
        "Hierarchical attribute has non-integer type " + toString(attr->type),
        ErrorCodes::TYPE_MISMATCH
    };
}
/// Generates the by-name getters getUInt8 ... getString. Each getter resolves the attribute via
/// getAttributeIndex() (which throws for an unknown name), throws TYPE_MISMATCH when the attribute's
/// stored type differs from the requested one, and returns the attribute's null value for ids that
/// lie beyond the current array size.
#define DECLARE_SAFE_GETTER(TYPE, NAME, LC_TYPE) \
TYPE get##NAME(const std::string & attribute_name, const id_t id) const override\
{\
const auto idx = getAttributeIndex(attribute_name);\
const auto & attribute = attributes[idx];\
if (attribute.type != attribute_type::LC_TYPE)\
throw Exception{\
"Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\
ErrorCodes::TYPE_MISMATCH\
};\
if (id < attribute.LC_TYPE##_array->size())\
return (*attribute.LC_TYPE##_array)[id];\
return attribute.LC_TYPE##_null_value;\
}
DECLARE_SAFE_GETTER(UInt8, UInt8, uint8)
DECLARE_SAFE_GETTER(UInt16, UInt16, uint16)
DECLARE_SAFE_GETTER(UInt32, UInt32, uint32)
DECLARE_SAFE_GETTER(UInt64, UInt64, uint64)
DECLARE_SAFE_GETTER(Int8, Int8, int8)
DECLARE_SAFE_GETTER(Int16, Int16, int16)
DECLARE_SAFE_GETTER(Int32, Int32, int32)
DECLARE_SAFE_GETTER(Int64, Int64, int64)
DECLARE_SAFE_GETTER(Float32, Float32, float32)
DECLARE_SAFE_GETTER(Float64, Float64, float64)
DECLARE_SAFE_GETTER(StringRef, String, string)
#undef DECLARE_SAFE_GETTER
/// Returns the position of the attribute called `attribute_name` in the attribute list.
/// Throws Exception with ErrorCodes::BAD_ARGUMENTS when there is no such attribute.
std::size_t getAttributeIndex(const std::string & attribute_name) const override
{
    const auto found = attribute_index_by_name.find(attribute_name);
    if (found != std::end(attribute_index_by_name))
        return found->second;

    throw Exception{
        "No such attribute '" + attribute_name + "'",
        ErrorCodes::BAD_ARGUMENTS
    };
}
/// Generates isUInt8 ... isString: each returns whether the attribute at `attribute_idx` has the
/// corresponding type. The index is used without a bounds check.
#define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\
bool is##NAME(const std::size_t attribute_idx) const override\
{\
return attributes[attribute_idx].type == attribute_type::LC_NAME;\
}
DECLARE_TYPE_CHECKER(UInt8, uint8)
DECLARE_TYPE_CHECKER(UInt16, uint16)
DECLARE_TYPE_CHECKER(UInt32, uint32)
DECLARE_TYPE_CHECKER(UInt64, uint64)
DECLARE_TYPE_CHECKER(Int8, int8)
DECLARE_TYPE_CHECKER(Int16, int16)
DECLARE_TYPE_CHECKER(Int32, int32)
DECLARE_TYPE_CHECKER(Int64, int64)
DECLARE_TYPE_CHECKER(Float32, float32)
DECLARE_TYPE_CHECKER(Float64, float64)
DECLARE_TYPE_CHECKER(String, string)
#undef DECLARE_TYPE_CHECKER
/// Generates the by-index getters getUInt8Unsafe ... getStringUnsafe. Unlike the by-name getters they
/// check neither the attribute index nor the attribute type, so the array pointer for the requested
/// type is dereferenced as is; the caller is expected to have verified the type (e.g. with is<Type>()).
/// Ids beyond the array size yield the attribute's null value.
#define DECLARE_UNSAFE_GETTER(TYPE, NAME, LC_NAME)\
TYPE get##NAME##Unsafe(const std::size_t attribute_idx, const id_t id) const override\
{\
const auto & attribute = attributes[attribute_idx];\
if (id < attribute.LC_NAME##_array->size())\
return (*attribute.LC_NAME##_array)[id];\
return attribute.LC_NAME##_null_value;\
}
DECLARE_UNSAFE_GETTER(UInt8, UInt8, uint8)
DECLARE_UNSAFE_GETTER(UInt16, UInt16, uint16)
DECLARE_UNSAFE_GETTER(UInt32, UInt32, uint32)
DECLARE_UNSAFE_GETTER(UInt64, UInt64, uint64)
DECLARE_UNSAFE_GETTER(Int8, Int8, int8)
DECLARE_UNSAFE_GETTER(Int16, Int16, int16)
DECLARE_UNSAFE_GETTER(Int32, Int32, int32)
DECLARE_UNSAFE_GETTER(Int64, Int64, int64)
DECLARE_UNSAFE_GETTER(Float32, Float32, float32)
DECLARE_UNSAFE_GETTER(Float64, Float64, float64)
DECLARE_UNSAFE_GETTER(StringRef, String, string)
#undef DECLARE_UNSAFE_GETTER
private:
/// Storage for one attribute. It carries a null value and an array slot for every supported type,
/// but createAttributeWithType only sets up the pair that matches `type`; the other array pointers
/// stay null.
struct attribute_t
{
attribute_type type;
/// Per-type default returned for ids that have no stored value.
UInt8 uint8_null_value;
UInt16 uint16_null_value;
UInt32 uint32_null_value;
UInt64 uint64_null_value;
Int8 int8_null_value;
Int16 int16_null_value;
Int32 int32_null_value;
Int64 int64_null_value;
Float32 float32_null_value;
Float64 float64_null_value;
String string_null_value;
/// Per-type value arrays indexed directly by id.
std::unique_ptr<PODArray<UInt8>> uint8_array;
std::unique_ptr<PODArray<UInt16>> uint16_array;
std::unique_ptr<PODArray<UInt32>> uint32_array;
std::unique_ptr<PODArray<UInt64>> uint64_array;
std::unique_ptr<PODArray<Int8>> int8_array;
std::unique_ptr<PODArray<Int16>> int16_array;
std::unique_ptr<PODArray<Int32>> int32_array;
std::unique_ptr<PODArray<Int64>> int64_array;
std::unique_ptr<PODArray<Float32>> float32_array;
std::unique_ptr<PODArray<Float64>> float64_array;
/// String values are copied into the arena; string_array holds references into it.
std::unique_ptr<Arena> string_arena;
std::unique_ptr<PODArray<StringRef>> string_array;
};
/// Creates the storage for every attribute of dict_struct, registers each attribute's index under
/// its name, and remembers the hierarchical attribute if one is declared.
void createAttributes()
{
    /// Reserve up front: hierarchical_attribute points into the vector, so the vector must not
    /// reallocate while attributes are being appended.
    attributes.reserve(dict_struct.attributes.size());

    for (const auto & attribute : dict_struct.attributes)
    {
        attribute_index_by_name.emplace(attribute.name, attributes.size());

        /// The call result is already a temporary, so it binds to push_back(T &&) directly;
        /// wrapping it in std::move added nothing.
        attributes.push_back(createAttributeWithType(getAttributeTypeByName(attribute.type),
            attribute.null_value));

        if (attribute.hierarchical)
            hierarchical_attribute = &attributes.back();
    }
}
void loadData()
{
auto stream = source_ptr->loadAll();
while (const auto block = stream->read())
{
const auto & id_column = *block.getByPosition(0).column;
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *block.getByPosition(attribute_idx + 1).column;
auto & attribute = attributes[attribute_idx];
for (const auto row_idx : ext::range(0, id_column.size()))
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
}
}
}
/// Creates the storage for one attribute of the given type: parses `null_value` into the matching
/// *_null_value field and allocates the matching array with initial_array_size elements, all set to
/// that null value. Fields belonging to other types are left untouched.
attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value)
{
    attribute_t attr{type};

#define INIT_NUMERIC_ATTRIBUTE(TYPE, LC_TYPE) \
    case attribute_type::LC_TYPE: \
        attr.LC_TYPE##_null_value = DB::parse<TYPE>(null_value); \
        attr.LC_TYPE##_array.reset(new PODArray<TYPE>); \
        attr.LC_TYPE##_array->resize_fill(initial_array_size, attr.LC_TYPE##_null_value); \
        break;

    switch (type)
    {
        INIT_NUMERIC_ATTRIBUTE(UInt8, uint8)
        INIT_NUMERIC_ATTRIBUTE(UInt16, uint16)
        INIT_NUMERIC_ATTRIBUTE(UInt32, uint32)
        INIT_NUMERIC_ATTRIBUTE(UInt64, uint64)
        INIT_NUMERIC_ATTRIBUTE(Int8, int8)
        INIT_NUMERIC_ATTRIBUTE(Int16, int16)
        INIT_NUMERIC_ATTRIBUTE(Int32, int32)
        INIT_NUMERIC_ATTRIBUTE(Int64, int64)
        INIT_NUMERIC_ATTRIBUTE(Float32, float32)
        INIT_NUMERIC_ATTRIBUTE(Float64, float64)

        case attribute_type::string:
        {
            attr.string_null_value = null_value;
            attr.string_arena.reset(new Arena);
            attr.string_array.reset(new PODArray<StringRef>);

            /// The default entries must not reference attr.string_null_value directly: `attr` is
            /// moved after this function returns, and a std::string that keeps short contents inside
            /// the object changes its data pointer on move, which would leave the array entries
            /// dangling. The arena is heap-allocated and travels with the unique_ptr, so a copy
            /// of the null value stored there stays valid.
            const auto null_in_arena = attr.string_arena->insert(null_value.data(), null_value.size());
            attr.string_array->resize_fill(initial_array_size, StringRef{null_in_arena, null_value.size()});
            break;
        }
    }

#undef INIT_NUMERIC_ATTRIBUTE

    return attr;
}
/// Stores `value` for `id` in the array that matches the attribute's type, growing the array when
/// needed (new elements are set to the attribute's null value).
/// Unsigned, signed and floating-point attributes read the field as UInt64, Int64 and Float64
/// respectively and convert to the element type on assignment; strings are copied into the arena.
/// Throws Exception with ErrorCodes::ARGUMENT_OUT_OF_BOUND when id >= max_array_size.
void setAttributeValue(attribute_t & attribute, const id_t id, const Field & value)
{
    if (id >= max_array_size)
        throw Exception{
            "Identifier should be less than " + toString(max_array_size),
            ErrorCodes::ARGUMENT_OUT_OF_BOUND
        };

/// The array has to hold index `id`, so it is grown to id + 1 elements. Growing to `id` elements,
/// as before, left the written slot one past the end, so the getters still answered with the null
/// value for that id.
#define SET_NUMERIC_VALUE(LC_TYPE, FIELD_TYPE) \
    case attribute_type::LC_TYPE: \
    { \
        auto & array = *attribute.LC_TYPE##_array; \
        if (id >= array.size()) \
            array.resize_fill(id + 1, attribute.LC_TYPE##_null_value); \
        array[id] = value.get<FIELD_TYPE>(); \
        break; \
    }

    switch (attribute.type)
    {
        SET_NUMERIC_VALUE(uint8, UInt64)
        SET_NUMERIC_VALUE(uint16, UInt64)
        SET_NUMERIC_VALUE(uint32, UInt64)
        SET_NUMERIC_VALUE(uint64, UInt64)
        SET_NUMERIC_VALUE(int8, Int64)
        SET_NUMERIC_VALUE(int16, Int64)
        SET_NUMERIC_VALUE(int32, Int64)
        SET_NUMERIC_VALUE(int64, Int64)
        SET_NUMERIC_VALUE(float32, Float64)
        SET_NUMERIC_VALUE(float64, Float64)

        case attribute_type::string:
        {
            auto & array = *attribute.string_array;
            if (id >= array.size())
                array.resize_fill(id + 1, attribute.string_null_value);

            /// Copy the bytes into the arena so the stored reference outlives `value`.
            const auto & string = value.get<String>();
            const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size());
            array[id] = StringRef{string_in_arena, string.size()};
            break;
        }
    }

#undef SET_NUMERIC_VALUE
}
/// Settings received at construction.
const std::string name;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
/// Attribute name -> position in `attributes`.
std::map<std::string, std::size_t> attribute_index_by_name;
std::vector<attribute_t> attributes;
/// Points at the element of `attributes` declared hierarchical, or null if there is none.
const attribute_t * hierarchical_attribute = nullptr;
};
}

View File

@ -0,0 +1,376 @@
#pragma once
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/DictionarySourceFactory.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Common/HashTable/HashMap.h>
#include <statdaemons/ext/range.hpp>
#include <statdaemons/ext/memory.hpp>
namespace DB
{
class HashedDictionary final : public IDictionary
{
public:
/// Stores the settings, takes ownership of the source, creates the attribute storage and
/// immediately loads all data from the source.
HashedDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime)
: name{name}, dict_struct(dict_struct),
source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime)
{
createAttributes();
loadData();
}
/// Copying delegates to the main constructor with a clone of the source, so the copy
/// reloads its data from that clone rather than copying the maps of `other`.
HashedDictionary(const HashedDictionary & other)
: HashedDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime}
{}
/// Name given to the dictionary at construction.
std::string getName() const override
{
    return name;
}

/// Fixed identifier of this dictionary implementation.
std::string getTypeName() const override
{
    return "HashedDictionary";
}

/// This implementation always reports itself as not cached.
bool isCached() const override
{
    return false;
}

/// Creates a copy through the copy constructor.
DictionaryPtr clone() const override
{
    return ext::make_unique<HashedDictionary>(*this);
}

/// Non-owning pointer to the source this dictionary was built from.
const IDictionarySource * const getSource() const override
{
    return source_ptr.get();
}

/// Lifetime settings passed at construction.
const DictionaryLifetime & getLifetime() const override
{
    return dict_lifetime;
}

/// True when one of the attributes was declared hierarchical.
bool hasHierarchy() const override
{
    return hierarchical_attribute != nullptr;
}
/// Returns the value of the hierarchical attribute stored for `id`, or that attribute's null value
/// when the map has no entry for `id`. Throws TYPE_MISMATCH when the hierarchical attribute is not
/// of an integer type. The hierarchical attribute pointer is dereferenced unconditionally, so
/// callers are expected to check hasHierarchy() first.
id_t toParent(const id_t id) const override
{
    const auto attr = hierarchical_attribute;

#define RETURN_PARENT_FROM_MAP(LC_TYPE) \
    case attribute_type::LC_TYPE: \
    { \
        const auto it = attr->LC_TYPE##_map->find(id); \
        if (it != attr->LC_TYPE##_map->end()) \
            return it->second; \
        return attr->LC_TYPE##_null_value; \
    }

    switch (attr->type)
    {
        RETURN_PARENT_FROM_MAP(uint8)
        RETURN_PARENT_FROM_MAP(uint16)
        RETURN_PARENT_FROM_MAP(uint32)
        RETURN_PARENT_FROM_MAP(uint64)
        RETURN_PARENT_FROM_MAP(int8)
        RETURN_PARENT_FROM_MAP(int16)
        RETURN_PARENT_FROM_MAP(int32)
        RETURN_PARENT_FROM_MAP(int64)

        /// Non-integer types cannot hold a parent id; fall through to the exception below.
        case attribute_type::float32:
        case attribute_type::float64:
        case attribute_type::string:
            break;
    }

#undef RETURN_PARENT_FROM_MAP

    throw Exception{
        "Hierarchical attribute has non-integer type " + toString(attr->type),
        ErrorCodes::TYPE_MISMATCH
    };
}
/// Generates the by-name getters getUInt8 ... getString. Each getter resolves the attribute via
/// getAttributeIndex() (which throws for an unknown name), throws TYPE_MISMATCH when the attribute's
/// stored type differs from the requested one, and returns the attribute's null value when the map
/// has no entry for the id.
#define DECLARE_SAFE_GETTER(TYPE, NAME, LC_TYPE) \
TYPE get##NAME(const std::string & attribute_name, const id_t id) const override\
{\
const auto idx = getAttributeIndex(attribute_name);\
const auto & attribute = attributes[idx];\
if (attribute.type != attribute_type::LC_TYPE)\
throw Exception{\
"Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\
ErrorCodes::TYPE_MISMATCH\
};\
const auto it = attribute.LC_TYPE##_map->find(id);\
if (it != attribute.LC_TYPE##_map->end())\
return it->second;\
return attribute.LC_TYPE##_null_value;\
}
DECLARE_SAFE_GETTER(UInt8, UInt8, uint8)
DECLARE_SAFE_GETTER(UInt16, UInt16, uint16)
DECLARE_SAFE_GETTER(UInt32, UInt32, uint32)
DECLARE_SAFE_GETTER(UInt64, UInt64, uint64)
DECLARE_SAFE_GETTER(Int8, Int8, int8)
DECLARE_SAFE_GETTER(Int16, Int16, int16)
DECLARE_SAFE_GETTER(Int32, Int32, int32)
DECLARE_SAFE_GETTER(Int64, Int64, int64)
DECLARE_SAFE_GETTER(Float32, Float32, float32)
DECLARE_SAFE_GETTER(Float64, Float64, float64)
DECLARE_SAFE_GETTER(StringRef, String, string)
#undef DECLARE_SAFE_GETTER
/// Returns the position of the attribute called `attribute_name` in the attribute list.
/// Throws Exception with ErrorCodes::BAD_ARGUMENTS when there is no such attribute.
std::size_t getAttributeIndex(const std::string & attribute_name) const override
{
    const auto found = attribute_index_by_name.find(attribute_name);
    if (found != std::end(attribute_index_by_name))
        return found->second;

    throw Exception{
        "No such attribute '" + attribute_name + "'",
        ErrorCodes::BAD_ARGUMENTS
    };
}
/// Generates isUInt8 ... isString: each returns whether the attribute at `attribute_idx` has the
/// corresponding type. The index is used without a bounds check.
#define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\
bool is##NAME(const std::size_t attribute_idx) const override\
{\
return attributes[attribute_idx].type == attribute_type::LC_NAME;\
}
DECLARE_TYPE_CHECKER(UInt8, uint8)
DECLARE_TYPE_CHECKER(UInt16, uint16)
DECLARE_TYPE_CHECKER(UInt32, uint32)
DECLARE_TYPE_CHECKER(UInt64, uint64)
DECLARE_TYPE_CHECKER(Int8, int8)
DECLARE_TYPE_CHECKER(Int16, int16)
DECLARE_TYPE_CHECKER(Int32, int32)
DECLARE_TYPE_CHECKER(Int64, int64)
DECLARE_TYPE_CHECKER(Float32, float32)
DECLARE_TYPE_CHECKER(Float64, float64)
DECLARE_TYPE_CHECKER(String, string)
#undef DECLARE_TYPE_CHECKER
/// Generates the by-index getters getUInt8Unsafe ... getStringUnsafe. Unlike the by-name getters they
/// check neither the attribute index nor the attribute type, so the map pointer for the requested
/// type is dereferenced as is; the caller is expected to have verified the type (e.g. with is<Type>()).
/// Ids without an entry in the map yield the attribute's null value.
#define DECLARE_UNSAFE_GETTER(TYPE, NAME, LC_NAME)\
TYPE get##NAME##Unsafe(const std::size_t attribute_idx, const id_t id) const override\
{\
const auto & attribute = attributes[attribute_idx];\
const auto it = attribute.LC_NAME##_map->find(id);\
if (it != attribute.LC_NAME##_map->end())\
return it->second;\
return attribute.LC_NAME##_null_value;\
}
DECLARE_UNSAFE_GETTER(UInt8, UInt8, uint8)
DECLARE_UNSAFE_GETTER(UInt16, UInt16, uint16)
DECLARE_UNSAFE_GETTER(UInt32, UInt32, uint32)
DECLARE_UNSAFE_GETTER(UInt64, UInt64, uint64)
DECLARE_UNSAFE_GETTER(Int8, Int8, int8)
DECLARE_UNSAFE_GETTER(Int16, Int16, int16)
DECLARE_UNSAFE_GETTER(Int32, Int32, int32)
DECLARE_UNSAFE_GETTER(Int64, Int64, int64)
DECLARE_UNSAFE_GETTER(Float32, Float32, float32)
DECLARE_UNSAFE_GETTER(Float64, Float64, float64)
DECLARE_UNSAFE_GETTER(StringRef, String, string)
#undef DECLARE_UNSAFE_GETTER
private:
/// Storage for a single dictionary attribute. `type` tells which of the typed members are in
/// use: createAttributeWithType fills in only the null value and the map (plus the arena for
/// strings) that correspond to `type`; the remaining map pointers stay empty.
struct attribute_t
{
/// Type tag deciding which null value / map pair below is meaningful.
attribute_type type;
/// Per-type fallback values, returned by the unsafe getters when an id is not in the map.
UInt8 uint8_null_value;
UInt16 uint16_null_value;
UInt32 uint32_null_value;
UInt64 uint64_null_value;
Int8 int8_null_value;
Int16 int16_null_value;
Int32 int32_null_value;
Int64 int64_null_value;
Float32 float32_null_value;
Float64 float64_null_value;
String string_null_value;
/// Per-type id -> value maps.
std::unique_ptr<HashMap<UInt64, UInt8>> uint8_map;
std::unique_ptr<HashMap<UInt64, UInt16>> uint16_map;
std::unique_ptr<HashMap<UInt64, UInt32>> uint32_map;
std::unique_ptr<HashMap<UInt64, UInt64>> uint64_map;
std::unique_ptr<HashMap<UInt64, Int8>> int8_map;
std::unique_ptr<HashMap<UInt64, Int16>> int16_map;
std::unique_ptr<HashMap<UInt64, Int32>> int32_map;
std::unique_ptr<HashMap<UInt64, Int64>> int64_map;
std::unique_ptr<HashMap<UInt64, Float32>> float32_map;
std::unique_ptr<HashMap<UInt64, Float64>> float64_map;
/// Memory that holds the string bytes; string_map stores StringRefs pointing into it.
std::unique_ptr<Arena> string_arena;
std::unique_ptr<HashMap<UInt64, StringRef>> string_map;
};
/// Builds one attribute_t per attribute described in dict_struct, registers its name -> index
/// mapping and remembers the attribute flagged as hierarchical (the last one, if several are).
void createAttributes()
{
	/// Reserving up front keeps the address stored in hierarchical_attribute stable
	/// while the remaining attributes are appended.
	attributes.reserve(dict_struct.attributes.size());

	for (const auto & description : dict_struct.attributes)
	{
		const auto index = attributes.size();
		attribute_index_by_name.emplace(description.name, index);

		attributes.push_back(createAttributeWithType(
			getAttributeTypeByName(description.type), description.null_value));

		if (description.hierarchical)
			hierarchical_attribute = &attributes[index];
	}
}
/// Reads every block produced by source_ptr->loadAll() and stores its contents:
/// column 0 holds the ids, column i + 1 holds the values of attribute i.
void loadData()
{
	auto stream = source_ptr->loadAll();

	while (const auto block = stream->read())
	{
		const auto & ids = *block.getByPosition(0).column;
		const auto rows = ids.size();

		for (const auto attribute_idx : ext::range(0, attributes.size()))
		{
			auto & target = attributes[attribute_idx];
			const auto & values = *block.getByPosition(attribute_idx + 1).column;

			for (const auto row : ext::range(0, rows))
				setAttributeValue(target, ids[row].get<UInt64>(), values[row]);
		}
	}
}
/// Creates an attribute of the given type: parses null_value into the matching
/// <type>_null_value member and allocates the matching (empty) map. String attributes keep
/// null_value verbatim and additionally get an Arena for the string bytes.
attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value)
{
	attribute_t attr{type};

/// All numeric types are handled identically, only the value type and member prefix differ.
#define INIT_NUMERIC_ATTRIBUTE(TYPE, LC_NAME)\
	case attribute_type::LC_NAME:\
		attr.LC_NAME##_null_value = DB::parse<TYPE>(null_value);\
		attr.LC_NAME##_map.reset(new HashMap<UInt64, TYPE>);\
		break;

	switch (type)
	{
		INIT_NUMERIC_ATTRIBUTE(UInt8, uint8)
		INIT_NUMERIC_ATTRIBUTE(UInt16, uint16)
		INIT_NUMERIC_ATTRIBUTE(UInt32, uint32)
		INIT_NUMERIC_ATTRIBUTE(UInt64, uint64)
		INIT_NUMERIC_ATTRIBUTE(Int8, int8)
		INIT_NUMERIC_ATTRIBUTE(Int16, int16)
		INIT_NUMERIC_ATTRIBUTE(Int32, int32)
		INIT_NUMERIC_ATTRIBUTE(Int64, int64)
		INIT_NUMERIC_ATTRIBUTE(Float32, float32)
		INIT_NUMERIC_ATTRIBUTE(Float64, float64)

		case attribute_type::string:
			attr.string_null_value = null_value;
			attr.string_arena.reset(new Arena);
			attr.string_map.reset(new HashMap<UInt64, StringRef>);
			break;
	}

#undef INIT_NUMERIC_ATTRIBUTE

	return attr;
}
/// Stores `value` for `id` in the map matching the attribute's type.
/// Unsigned attributes read the Field as UInt64, signed ones as Int64, floating-point ones as
/// Float64; strings are copied into the attribute's arena and referenced by a StringRef.
void setAttributeValue(attribute_t & attribute, const id_t id, const Field & value)
{
/// LC_NAME selects the map, FIELD_TYPE is the type the value is read from the Field as.
#define STORE_NUMERIC_VALUE(LC_NAME, FIELD_TYPE)\
	case attribute_type::LC_NAME:\
		attribute.LC_NAME##_map->insert({ id, value.get<FIELD_TYPE>() });\
		break;

	switch (attribute.type)
	{
		STORE_NUMERIC_VALUE(uint8, UInt64)
		STORE_NUMERIC_VALUE(uint16, UInt64)
		STORE_NUMERIC_VALUE(uint32, UInt64)
		STORE_NUMERIC_VALUE(uint64, UInt64)
		STORE_NUMERIC_VALUE(int8, Int64)
		STORE_NUMERIC_VALUE(int16, Int64)
		STORE_NUMERIC_VALUE(int32, Int64)
		STORE_NUMERIC_VALUE(int64, Int64)
		STORE_NUMERIC_VALUE(float32, Float64)
		STORE_NUMERIC_VALUE(float64, Float64)

		case attribute_type::string:
		{
			const auto & source = value.get<String>();
			const auto length = source.size();
			const auto stored = attribute.string_arena->insert(source.data(), length);
			attribute.string_map->insert({ id, StringRef{stored, length} });
			break;
		}
	}

#undef STORE_NUMERIC_VALUE
}
const std::string name;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
std::map<std::string, std::size_t> attribute_index_by_name;
std::vector<attribute_t> attributes;
const attribute_t * hierarchical_attribute = nullptr;
};
}

View File

@ -0,0 +1,95 @@
#pragma once
#include <DB/Core/Field.h>
#include <DB/Core/StringRef.h>
#include <memory>
#include <Poco/Util/XMLConfiguration.h>
namespace DB
{
class IDictionarySource;
class IDictionary;
using DictionaryPtr = std::unique_ptr<IDictionary>;
class DictionaryLifetime;
/** Interface of an external dictionary: a mapping from a 64-bit id to a set of typed
  * attributes, optionally with a parent relation between ids (a hierarchy).
  */
class IDictionary
{
public:
	using id_t = std::uint64_t;

	virtual std::string getName() const = 0;

	virtual std::string getTypeName() const = 0;

	virtual bool isCached() const = 0;

	/// Does nothing unless overridden.
	virtual void reload() {}

	virtual DictionaryPtr clone() const = 0;

	virtual const IDictionarySource * const getSource() const = 0;

	virtual const DictionaryLifetime & getLifetime() const = 0;

	virtual bool hasHierarchy() const = 0;

	/// do not call unless you ensure that hasHierarchy() returns true
	virtual id_t toParent(id_t id) const = 0;

	/** Walks from child_id up through toParent() and tells whether ancestor_id is met on the
	  * way (child_id itself counts). Id 0 terminates the walk, so the answer is always false
	  * when child_id or ancestor_id is 0.
	  */
	bool in(id_t child_id, const id_t ancestor_id) const
	{
		for (; child_id != 0; child_id = toParent(child_id))
			if (child_id == ancestor_id)
				return true;

		return false;
	}

	/// safe and slow functions, perform map lookup and type checks
	virtual UInt8 getUInt8(const std::string & attribute_name, id_t id) const = 0;
	virtual UInt16 getUInt16(const std::string & attribute_name, id_t id) const = 0;
	virtual UInt32 getUInt32(const std::string & attribute_name, id_t id) const = 0;
	virtual UInt64 getUInt64(const std::string & attribute_name, id_t id) const = 0;
	virtual Int8 getInt8(const std::string & attribute_name, id_t id) const = 0;
	virtual Int16 getInt16(const std::string & attribute_name, id_t id) const = 0;
	virtual Int32 getInt32(const std::string & attribute_name, id_t id) const = 0;
	virtual Int64 getInt64(const std::string & attribute_name, id_t id) const = 0;
	virtual Float32 getFloat32(const std::string & attribute_name, id_t id) const = 0;
	virtual Float64 getFloat64(const std::string & attribute_name, id_t id) const = 0;
	virtual StringRef getString(const std::string & attribute_name, id_t id) const = 0;

	/// unsafe functions for maximum performance, you are on your own ensuring type-safety

	/// returns persistent attribute index for usage with following functions
	virtual std::size_t getAttributeIndex(const std::string & attribute_name) const = 0;

	/// type-checking functions
	virtual bool isUInt8(std::size_t attribute_idx) const = 0;
	virtual bool isUInt16(std::size_t attribute_idx) const = 0;
	virtual bool isUInt32(std::size_t attribute_idx) const = 0;
	virtual bool isUInt64(std::size_t attribute_idx) const = 0;
	virtual bool isInt8(std::size_t attribute_idx) const = 0;
	virtual bool isInt16(std::size_t attribute_idx) const = 0;
	virtual bool isInt32(std::size_t attribute_idx) const = 0;
	virtual bool isInt64(std::size_t attribute_idx) const = 0;
	virtual bool isFloat32(std::size_t attribute_idx) const = 0;
	virtual bool isFloat64(std::size_t attribute_idx) const = 0;
	virtual bool isString(std::size_t attribute_idx) const = 0;

	/// plain load from target container without any checks
	virtual UInt8 getUInt8Unsafe(std::size_t attribute_idx, id_t id) const = 0;
	virtual UInt16 getUInt16Unsafe(std::size_t attribute_idx, id_t id) const = 0;
	virtual UInt32 getUInt32Unsafe(std::size_t attribute_idx, id_t id) const = 0;
	virtual UInt64 getUInt64Unsafe(std::size_t attribute_idx, id_t id) const = 0;
	virtual Int8 getInt8Unsafe(std::size_t attribute_idx, id_t id) const = 0;
	virtual Int16 getInt16Unsafe(std::size_t attribute_idx, id_t id) const = 0;
	virtual Int32 getInt32Unsafe(std::size_t attribute_idx, id_t id) const = 0;
	virtual Int64 getInt64Unsafe(std::size_t attribute_idx, id_t id) const = 0;
	virtual Float32 getFloat32Unsafe(std::size_t attribute_idx, id_t id) const = 0;
	virtual Float64 getFloat64Unsafe(std::size_t attribute_idx, id_t id) const = 0;
	virtual StringRef getStringUnsafe(std::size_t attribute_idx, id_t id) const = 0;

	virtual ~IDictionary() = default;
};
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <DB/DataStreams/IBlockInputStream.h>
#include <vector>
namespace DB
{
class IDictionarySource;
/// Owning pointer to a dictionary source.
using DictionarySourcePtr = std::unique_ptr<IDictionarySource>;
/// Abstract source of dictionary data; every loader hands the data back as a block input stream.
/// NOTE(review): the per-method semantics below are inferred from the names and from the MySQL
/// implementation in this commit - confirm against the other implementations.
class IDictionarySource
{
public:
/// Stream over the whole contents of the source (the MySQL source runs a SELECT over the table).
virtual BlockInputStreamPtr loadAll() = 0;
/// Presumably a stream with the data for a single id; the MySQL source throws NOT_IMPLEMENTED.
virtual BlockInputStreamPtr loadId(const std::uint64_t id) = 0;
/// Presumably a stream with the data for the given ids; the MySQL source throws NOT_IMPLEMENTED.
virtual BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) = 0;
/// Whether the underlying data changed (the MySQL source compares table modification times).
virtual bool isModified() const = 0;
/// Returns a new, independently owned source object.
virtual DictionarySourcePtr clone() const = 0;
virtual ~IDictionarySource() = default;
};
}

View File

@ -0,0 +1,114 @@
#pragma once
#include <DB/Core/Block.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
#include <statdaemons/ext/range.hpp>
#include <mysqlxx/Query.h>
#include <vector>
#include <string>
namespace DB
{
/// Block input stream over the result of a MySQL query: rows are fetched through mysqlxx and
/// converted into blocks shaped like `sample_block`, at most `max_block_size` rows per block.
class MysqlBlockInputStream final : public IProfilingBlockInputStream
{
public:
	/** @param query           query to execute; it is moved into the stream.
	  * @param sample_block    block whose column types decide how each MySQL value is converted.
	  * @param max_block_size  upper bound on the number of rows in one block.
	  * Throws UNKNOWN_TYPE when a column of sample_block has a type other than
	  * (U)Int8..64, Float32, Float64 or String.
	  */
	MysqlBlockInputStream(mysqlxx::Query query, const Block & sample_block, const std::size_t max_block_size)
		/// `result` has to be obtained from the member: inside the initializer list the bare name
		/// `query` denotes the constructor parameter, which has just been moved from.
		: query{std::move(query)}, result{this->query.use()}, sample_block{sample_block}, max_block_size{max_block_size}
	{
		types.reserve(sample_block.columns());

		for (const auto idx : ext::range(0, sample_block.columns()))
		{
			const auto type = sample_block.getByPosition(idx).type.get();

			if (typeid_cast<const DataTypeUInt8 *>(type))
				types.push_back(attribute_type::uint8);
			else if (typeid_cast<const DataTypeUInt16 *>(type))
				types.push_back(attribute_type::uint16);
			else if (typeid_cast<const DataTypeUInt32 *>(type))
				types.push_back(attribute_type::uint32);
			else if (typeid_cast<const DataTypeUInt64 *>(type))
				types.push_back(attribute_type::uint64);
			else if (typeid_cast<const DataTypeInt8 *>(type))
				types.push_back(attribute_type::int8);
			else if (typeid_cast<const DataTypeInt16 *>(type))
				types.push_back(attribute_type::int16);
			else if (typeid_cast<const DataTypeInt32 *>(type))
				types.push_back(attribute_type::int32);
			else if (typeid_cast<const DataTypeInt64 *>(type))
				types.push_back(attribute_type::int64);
			else if (typeid_cast<const DataTypeFloat32 *>(type))
				types.push_back(attribute_type::float32);
			/// Must test for Float64 here; testing Int64 a second time made this branch
			/// unreachable and Float64 columns were rejected as unsupported.
			else if (typeid_cast<const DataTypeFloat64 *>(type))
				types.push_back(attribute_type::float64);
			else if (typeid_cast<const DataTypeString *>(type))
				types.push_back(attribute_type::string);
			else
				throw Exception{
					"Unsupported type " + type->getName(),
					ErrorCodes::UNKNOWN_TYPE
				};
		}
	}

	String getName() const override { return "MysqlBlockInputStream"; }

	String getID() const override
	{
		return "Mysql(" + query.str() + ")";
	}

private:
	/// Fetches up to max_block_size rows into a fresh block; returns an empty Block once the
	/// result is exhausted. Throws NUMBER_OF_COLUMNS_DOESNT_MATCH when the query result has a
	/// different number of fields than sample_block has columns.
	Block readImpl() override
	{
		auto block = sample_block.cloneEmpty();

		if (block.columns() != result.getNumFields())
			throw Exception{
				"mysqlxx::UserQueryResult contains " + toString(result.getNumFields()) + " columns while " +
					toString(block.columns()) + " expected",
				ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH
			};

		std::size_t rows = 0;
		while (auto row = result.fetch())
		{
			for (const auto idx : ext::range(0, row.size()))
				insertValue(block.getByPosition(idx).column, row[idx], types[idx]);

			++rows;
			if (rows == max_block_size)
				break;
		}

		return rows == 0 ? Block{} : block;
	}

	/// Appends `value` to `column`, converting it to the field type matching `type`:
	/// UInt64 for unsigned, Int64 for signed, Float64 for floating-point, string otherwise.
	static void insertValue(ColumnPtr & column, const mysqlxx::Value & value, const attribute_type type)
	{
		switch (type)
		{
			case attribute_type::uint8:
			case attribute_type::uint16:
			case attribute_type::uint32:
			case attribute_type::uint64:
				column->insert(static_cast<UInt64>(value));
				break;
			case attribute_type::int8:
			case attribute_type::int16:
			case attribute_type::int32:
			case attribute_type::int64:
				column->insert(static_cast<Int64>(value));
				break;
			case attribute_type::float32:
			case attribute_type::float64:
				column->insert(static_cast<Float64>(value));
				break;
			case attribute_type::string:
				column->insert(value.getString());
				break;
		}
	}

	mysqlxx::Query query;
	mysqlxx::UseQueryResult result;
	Block sample_block;
	std::size_t max_block_size;
	/// Conversion type of every column, in sample_block column order.
	std::vector<attribute_type> types;
};
}

View File

@ -0,0 +1,108 @@
#pragma once
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/MysqlBlockInputStream.h>
#include <DB/Interpreters/Context.h>
#include <statdaemons/ext/range.hpp>
#include <mysqlxx/Pool.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
/// Dictionary source backed by a MySQL table. Only full loads are supported; modification is
/// detected through the Update_time column of SHOW TABLE STATUS.
class MysqlDictionarySource final : public IDictionarySource
{
	static const auto max_block_size = 8192;

public:
	/** @param config         configuration to read the settings from.
	  * @param config_prefix  prefix of the source settings; "<prefix>.table" names the table,
	  *                       the same prefix is handed to the connection pool.
	  * @param sample_block   block whose column names are selected from the table.
	  */
	MysqlDictionarySource(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
		Block & sample_block, const Context & context)
		: table{config.getString(config_prefix + ".table")},
		  sample_block{sample_block}, context(context),
		  pool{config, config_prefix},
		  load_all_query{composeLoadAllQuery(sample_block, table)},
		  last_modification{getLastModification()}
	{}

	MysqlDictionarySource(const MysqlDictionarySource & other)
		: table{other.table},
		  sample_block{other.sample_block}, context(other.context),
		  pool{other.pool},
		  load_all_query{other.load_all_query}, last_modification{other.last_modification}
	{}

	BlockInputStreamPtr loadAll() override
	{
		return new MysqlBlockInputStream{pool.Get()->query(load_all_query), sample_block, max_block_size};
	}

	BlockInputStreamPtr loadId(const std::uint64_t id) override
	{
		throw Exception{
			"Method unsupported",
			ErrorCodes::NOT_IMPLEMENTED
		};
	}

	BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override
	{
		throw Exception{
			"Method unsupported",
			ErrorCodes::NOT_IMPLEMENTED
		};
	}

	/// True when the table reports a modification time later than the one remembered at construction.
	bool isModified() const override { return getLastModification() > last_modification; }

	DictionarySourcePtr clone() const override { return ext::make_unique<MysqlDictionarySource>(*this); }

private:
	/** Asks MySQL for the table's Update_time.
	  * Returns the current time when Update_time is NULL, and a default-constructed DateTime
	  * when the query fails (the error is logged) or yields no row.
	  */
	mysqlxx::DateTime getLastModification() const
	{
		/// Position of the Update_time column in the SHOW TABLE STATUS output.
		const auto Update_time_idx = 12;

		try
		{
			auto connection = pool.Get();
			/// Match the table name itself: wrapping it in '%' wildcards could return the status
			/// row of another table whose name merely contains this one.
			/// (LIKE still treats '_' in the name as a single-character wildcard.)
			auto query = connection->query("SHOW TABLE STATUS LIKE '" + table + "';");
			auto result = query.use();

			/// An absent table yields no row at all; do not index into an empty row.
			if (auto row = result.fetch())
			{
				const auto & update_time = row[Update_time_idx];
				return !update_time.isNull() ? update_time.getDateTime() : mysqlxx::DateTime{std::time(nullptr)};
			}
		}
		catch (...)
		{
			tryLogCurrentException("MysqlDictionarySource");
		}

		return {};
	}

	/// Builds "SELECT <col1>, <col2>, ... FROM <table>;" from the column names of `block`.
	static std::string composeLoadAllQuery(const Block & block, const std::string & table)
	{
		std::string query{"SELECT "};

		auto first = true;
		for (const auto idx : ext::range(0, block.columns()))
		{
			if (!first)
				query += ", ";

			query += block.getByPosition(idx).name;
			first = false;
		}

		query += " FROM " + table + ';';

		return query;
	}

	const std::string table;
	Block sample_block;
	const Context & context;
	mutable mysqlxx::PoolWithFailover pool;
	const std::string load_all_query;
	mysqlxx::DateTime last_modification;
};
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <DB/IO/ReadBuffer.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <memory>
namespace DB
{
class OwningBufferBlockInputStream : public IProfilingBlockInputStream
{
public:
OwningBufferBlockInputStream(const BlockInputStreamPtr & stream, std::unique_ptr<ReadBuffer> buffer)
: stream{stream}, buffer{std::move(buffer)}
{
children.push_back(stream);
}
private:
Block readImpl() override { return stream->read(); }
String getName() const override { return "OwningBufferBlockInputStream"; }
String getID() const override {
return "OwningBuffer(" + stream->getID() + ")";
}
BlockInputStreamPtr stream;
std::unique_ptr<ReadBuffer> buffer;
};
}

View File

@ -0,0 +1,15 @@
#pragma once
#include <memory>
namespace DB
{
/// Deleter that disposes of an object by calling its release() member instead of `delete`.
template <typename T>
struct release
{
	void operator()(const T * const object)
	{
		object->release();
	}
};

/// unique_ptr that calls release() on the pointee when it goes out of scope.
template <typename T>
using config_ptr_t = std::unique_ptr<T, release<T>>;
}

View File

@ -11,6 +11,9 @@
#include <DB/Interpreters/Context.h>
#include <DB/Functions/IFunction.h>
#include <statdaemons/ext/range.hpp>
#include <DB/Dictionaries/FlatDictionary.h>
#include <DB/Dictionaries/HashedDictionary.h>
namespace DB
@ -715,4 +718,760 @@ public:
}
};
class FunctionDictGetString : public IFunction
{
public:
static constexpr auto name = "dictGetString";
static IFunction * create(const Context & context)
{
return new FunctionDictGetString{context.getDictionaries()};
};
FunctionDictGetString(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}
String getName() const override { return name; }
private:
DataTypePtr getReturnType(const DataTypes & arguments) const override
{
if (arguments.size() != 3)
throw Exception{
"Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 3.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
};
if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
{
throw Exception{
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
if (!typeid_cast<const DataTypeString *>(arguments[1].get()))
{
throw Exception{
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
const auto id_arg = arguments[2].get();
if (!typeid_cast<const DataTypeUInt8 *>(id_arg) &&
!typeid_cast<const DataTypeUInt16 *>(id_arg) &&
!typeid_cast<const DataTypeUInt32 *>(id_arg) &&
!typeid_cast<const DataTypeUInt64 *>(id_arg) &&
!typeid_cast<const DataTypeInt8 *>(id_arg) &&
!typeid_cast<const DataTypeInt16 *>(id_arg) &&
!typeid_cast<const DataTypeInt32 *>(id_arg) &&
!typeid_cast<const DataTypeInt64 *>(id_arg))
{
throw Exception{
"Illegal type " + arguments[2]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
return new DataTypeString;
}
void execute(Block & block, const ColumnNumbers & arguments, const size_t result)
{
const auto dict_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[0]).column.get());
if (!dict_name_col)
throw Exception{
"First argument of function " + getName() + " must be a constant string",
ErrorCodes::ILLEGAL_COLUMN
};
auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
const auto dict_ptr = dict.get();
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
ErrorCodes::UNKNOWN_TYPE
};
}
template <typename DictionaryType>
bool executeDispatch(Block & block, const ColumnNumbers & arguments, const size_t result,
const IDictionary * const dictionary)
{
const auto dict = typeid_cast<const DictionaryType *>(dictionary);
if (!dict)
return false;
const auto attr_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[1]).column.get());
if (!attr_name_col)
throw Exception{
"Second argument of function " + getName() + " must be a constant string",
ErrorCodes::ILLEGAL_COLUMN
};
const auto & attr_name = attr_name_col->getData();
const auto id_col = block.getByPosition(arguments[2]).column.get();
if (!execute<UInt8>(block, result, dict, attr_name, id_col) &&
!execute<UInt16>(block, result, dict, attr_name, id_col) &&
!execute<UInt32>(block, result, dict, attr_name, id_col) &&
!execute<UInt64>(block, result, dict, attr_name, id_col) &&
!execute<Int8>(block, result, dict, attr_name, id_col) &&
!execute<Int16>(block, result, dict, attr_name, id_col) &&
!execute<Int32>(block, result, dict, attr_name, id_col) &&
!execute<Int64>(block, result, dict, attr_name, id_col))
{
throw Exception{
"Third argument of function " + getName() + " must be integral",
ErrorCodes::ILLEGAL_COLUMN
};
}
return true;
}
template <typename T, typename DictionaryType>
bool execute(Block & block, const size_t result, const DictionaryType * const dictionary,
const std::string & attr_name, const IColumn * const id_col_untyped)
{
if (const auto id_col = typeid_cast<const ColumnVector<T> *>(id_col_untyped))
{
const auto out = new ColumnString;
block.getByPosition(result).column = out;
const auto attribute_idx = dictionary->getAttributeIndex(attr_name);
if (!dictionary->isString(attribute_idx))
throw Exception{
"Type mismatch: attribute " + attr_name + " has type different from String",
ErrorCodes::TYPE_MISMATCH
};
for (const auto & id : id_col->getData())
{
const auto string_ref = dictionary->getStringUnsafe(attribute_idx, id);
out->insertData(string_ref.data, string_ref.size);
}
return true;
}
else if (const auto id_col = typeid_cast<const ColumnConst<T> *>(id_col_untyped))
{
block.getByPosition(result).column = new ColumnConst<String>{
id_col->size(),
dictionary->getString(attr_name, id_col->getData()).toString()
};
return true;
};
return false;
}
const Dictionaries & dictionaries;
};
/// Maps a numeric DataType to the matching IDictionary accessors, so that FunctionDictGet can
/// be written once for all types. Each specialization generated below provides:
///  - get(dict, name, id): calls dict->get<TYPE>(name, id);
///  - is(dict, idx):       calls dict->is<TYPE>(idx);
///  - get(dict, idx, id):  calls dict->get<TYPE>Unsafe(idx, id).
template <typename DataType> struct DictGetTraits;
#define DECLARE_DICT_GET_TRAITS(TYPE, DATA_TYPE) \
template <> struct DictGetTraits<DATA_TYPE>\
{\
static TYPE get(const IDictionary * const dict, const std::string & name, const IDictionary::id_t id)\
{\
return dict->get##TYPE(name, id);\
}\
static bool is(const IDictionary * const dict, const std::size_t idx) { return dict->is##TYPE(idx); } \
static TYPE get(const IDictionary * const dict, const std::size_t idx, const IDictionary::id_t id)\
{\
return dict->get##TYPE##Unsafe(idx, id);\
}\
};
DECLARE_DICT_GET_TRAITS(UInt8, DataTypeUInt8)
DECLARE_DICT_GET_TRAITS(UInt16, DataTypeUInt16)
DECLARE_DICT_GET_TRAITS(UInt32, DataTypeUInt32)
DECLARE_DICT_GET_TRAITS(UInt64, DataTypeUInt64)
DECLARE_DICT_GET_TRAITS(Int8, DataTypeInt8)
DECLARE_DICT_GET_TRAITS(Int16, DataTypeInt16)
DECLARE_DICT_GET_TRAITS(Int32, DataTypeInt32)
DECLARE_DICT_GET_TRAITS(Int64, DataTypeInt64)
DECLARE_DICT_GET_TRAITS(Float32, DataTypeFloat32)
DECLARE_DICT_GET_TRAITS(Float64, DataTypeFloat64)
#undef DECLARE_DICT_GET_TRAITS
/** dictGet<Type>(dict_name, attr_name, id): value of a numeric attribute of an external
  * dictionary for the given id. DataType selects the attribute/result type; dict_name and
  * attr_name must be constant strings, id may be a column or a constant of any integer type.
  */
template <typename DataType>
class FunctionDictGet final : public IFunction
{
	using Type = typename DataType::FieldType;

public:
	static const std::string name;

	static IFunction * create(const Context & context)
	{
		return new FunctionDictGet{context.getDictionaries()};
	}

	FunctionDictGet(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}

	String getName() const override { return name; }

private:
	/// Validates (String, String, integer) arguments; the result type is DataType.
	DataTypePtr getReturnType(const DataTypes & arguments) const override
	{
		if (arguments.size() != 3)
			throw Exception{
				"Number of arguments for function " + getName() + " doesn't match: passed "
					+ toString(arguments.size()) + ", should be 3.",
				ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
			};

		if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
		{
			throw Exception{
				"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
				ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
			};
		}

		if (!typeid_cast<const DataTypeString *>(arguments[1].get()))
		{
			throw Exception{
				"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(),
				ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
			};
		}

		const auto id_arg = arguments[2].get();
		if (!typeid_cast<const DataTypeUInt8 *>(id_arg) &&
			!typeid_cast<const DataTypeUInt16 *>(id_arg) &&
			!typeid_cast<const DataTypeUInt32 *>(id_arg) &&
			!typeid_cast<const DataTypeUInt64 *>(id_arg) &&
			!typeid_cast<const DataTypeInt8 *>(id_arg) &&
			!typeid_cast<const DataTypeInt16 *>(id_arg) &&
			!typeid_cast<const DataTypeInt32 *>(id_arg) &&
			!typeid_cast<const DataTypeInt64 *>(id_arg))
		{
			throw Exception{
				"Illegal type " + id_arg->getName() + " of argument of function " + getName(),
				ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
			};
		}

		return new DataType;
	}

	/// Looks the dictionary up by its constant name and dispatches on its concrete type.
	void execute(Block & block, const ColumnNumbers & arguments, const size_t result) override
	{
		const auto dict_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[0]).column.get());
		if (!dict_name_col)
			throw Exception{
				"First argument of function " + getName() + " must be a constant string",
				ErrorCodes::ILLEGAL_COLUMN
			};

		auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
		const auto dict_ptr = dict.get();

		if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
			!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr))
			throw Exception{
				"Unsupported dictionary type " + dict_ptr->getTypeName(),
				ErrorCodes::UNKNOWN_TYPE
			};
	}

	/// Returns false when `dictionary` is not a DictionaryType; otherwise computes the result,
	/// trying every supported integer type for the id column.
	template <typename DictionaryType>
	bool executeDispatch(Block & block, const ColumnNumbers & arguments, const size_t result,
		const IDictionary * const dictionary)
	{
		const auto dict = typeid_cast<const DictionaryType *>(dictionary);
		if (!dict)
			return false;

		const auto attr_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[1]).column.get());
		if (!attr_name_col)
			throw Exception{
				"Second argument of function " + getName() + " must be a constant string",
				ErrorCodes::ILLEGAL_COLUMN
			};

		const auto & attr_name = attr_name_col->getData();

		const auto id_col = block.getByPosition(arguments[2]).column.get();
		if (!execute<UInt8>(block, result, dict, attr_name, id_col) &&
			!execute<UInt16>(block, result, dict, attr_name, id_col) &&
			!execute<UInt32>(block, result, dict, attr_name, id_col) &&
			!execute<UInt64>(block, result, dict, attr_name, id_col) &&
			!execute<Int8>(block, result, dict, attr_name, id_col) &&
			!execute<Int16>(block, result, dict, attr_name, id_col) &&
			!execute<Int32>(block, result, dict, attr_name, id_col) &&
			!execute<Int64>(block, result, dict, attr_name, id_col))
		{
			throw Exception{
				"Third argument of function " + getName() + " must be integral",
				ErrorCodes::ILLEGAL_COLUMN
			};
		}

		return true;
	}

	/// Handles an id column of element type T (full or constant); returns false for any other column.
	/// A full column uses the index-based unsafe getter after one explicit type check,
	/// a constant column uses the checked name-based getter.
	template <typename T, typename DictionaryType>
	bool execute(Block & block, const size_t result, const DictionaryType * const dictionary,
		const std::string & attr_name, const IColumn * const id_col_untyped)
	{
		if (const auto id_col = typeid_cast<const ColumnVector<T> *>(id_col_untyped))
		{
			const auto attribute_idx = dictionary->getAttributeIndex(attr_name);
			if (!DictGetTraits<DataType>::is(dictionary, attribute_idx))
				/// Report the type this instantiation actually expects, not a hard-coded UInt64.
				throw Exception{
					"Type mismatch: attribute " + attr_name + " has type different from " + TypeName<Type>::get(),
					ErrorCodes::TYPE_MISMATCH
				};

			const auto out = new ColumnVector<Type>;
			block.getByPosition(result).column = out;

			const auto & ids = id_col->getData();
			auto & data = out->getData();
			const auto size = ids.size();
			data.resize(size);

			for (const auto idx : ext::range(0, size))
				data[idx] = DictGetTraits<DataType>::get(dictionary, attribute_idx, ids[idx]);

			return true;
		}
		else if (const auto id_col = typeid_cast<const ColumnConst<T> *>(id_col_untyped))
		{
			block.getByPosition(result).column = new ColumnConst<Type>{
				id_col->size(),
				DictGetTraits<DataType>::get(dictionary, attr_name, id_col->getData())
			};

			return true;
		}

		return false;
	}

	const Dictionaries & dictionaries;
};
/// The function name is "dictGet" followed by the name of the result field type as reported by TypeName.
template <typename DataType>
const std::string FunctionDictGet<DataType>::name = "dictGet" + TypeName<typename DataType::FieldType>::get();
/// One alias per supported numeric attribute type.
using FunctionDictGetUInt8 = FunctionDictGet<DataTypeUInt8>;
using FunctionDictGetUInt16 = FunctionDictGet<DataTypeUInt16>;
using FunctionDictGetUInt32 = FunctionDictGet<DataTypeUInt32>;
using FunctionDictGetUInt64 = FunctionDictGet<DataTypeUInt64>;
using FunctionDictGetInt8 = FunctionDictGet<DataTypeInt8>;
using FunctionDictGetInt16 = FunctionDictGet<DataTypeInt16>;
using FunctionDictGetInt32 = FunctionDictGet<DataTypeInt32>;
using FunctionDictGetInt64 = FunctionDictGet<DataTypeInt64>;
using FunctionDictGetFloat32 = FunctionDictGet<DataTypeFloat32>;
using FunctionDictGetFloat64 = FunctionDictGet<DataTypeFloat64>;
class FunctionDictGetHierarchy final : public IFunction
{
public:
static constexpr auto name = "dictGetHierarchy";
static IFunction * create(const Context & context)
{
return new FunctionDictGetHierarchy{context.getDictionaries()};
};
FunctionDictGetHierarchy(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}
String getName() const override { return name; }
private:
DataTypePtr getReturnType(const DataTypes & arguments) const override
{
if (arguments.size() != 2)
throw Exception{
"Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 2.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
};
if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
{
throw Exception{
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
const auto id_arg = arguments[1].get();
if (!typeid_cast<const DataTypeUInt8 *>(id_arg) &&
!typeid_cast<const DataTypeUInt16 *>(id_arg) &&
!typeid_cast<const DataTypeUInt32 *>(id_arg) &&
!typeid_cast<const DataTypeUInt64 *>(id_arg) &&
!typeid_cast<const DataTypeInt8 *>(id_arg) &&
!typeid_cast<const DataTypeInt16 *>(id_arg) &&
!typeid_cast<const DataTypeInt32 *>(id_arg) &&
!typeid_cast<const DataTypeInt64 *>(id_arg))
{
throw Exception{
"Illegal type " + id_arg->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
return new DataTypeArray{new DataTypeUInt64};
};
void execute(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[0]).column.get());
if (!dict_name_col)
throw Exception{
"First argument of function " + getName() + " must be a constant string",
ErrorCodes::ILLEGAL_COLUMN
};
auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
const auto dict_ptr = dict.get();
if (!dict->hasHierarchy())
throw Exception{
"Dictionary does not have a hierarchy",
ErrorCodes::UNSUPPORTED_METHOD
};
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
ErrorCodes::UNKNOWN_TYPE
};
}
template <typename DictionaryType>
bool executeDispatch(Block & block, const ColumnNumbers & arguments, const size_t result,
const IDictionary * const dictionary)
{
const auto dict = typeid_cast<const DictionaryType *>(dictionary);
if (!dict)
return false;
const auto id_col = block.getByPosition(arguments[1]).column.get();
if (!execute<UInt8>(block, result, dict, id_col) &&
!execute<UInt16>(block, result, dict, id_col) &&
!execute<UInt32>(block, result, dict, id_col) &&
!execute<UInt64>(block, result, dict, id_col) &&
!execute<Int8>(block, result, dict, id_col) &&
!execute<Int16>(block, result, dict, id_col) &&
!execute<Int32>(block, result, dict, id_col) &&
!execute<Int64>(block, result, dict, id_col))
{
throw Exception{
"Second argument of function " + getName() + " must be integral",
ErrorCodes::ILLEGAL_COLUMN
};
}
return true;
}
/// Builds the result column for an id column whose element type is T.
/// For every id, the chain id, toParent(id), toParent(toParent(id)), ... is emitted
/// until a zero id is reached (zero terminates the walk and is not emitted).
/// Returns false if `id_col_untyped` is neither ColumnVector<T> nor ColumnConst<T>.
template <typename T, typename DictionaryType>
bool execute(Block & block, const size_t result, const DictionaryType * const dictionary,
    const IColumn * const id_col_untyped)
{
    if (const auto vector_ids = typeid_cast<const ColumnVector<T> *>(id_col_untyped))
    {
        /// Result is an array column: flat UInt64 values plus per-row end offsets.
        const auto values_column = new ColumnVector<UInt64>;
        const auto array_column = new ColumnArray{values_column};
        block.getByPosition(result).column = array_column;

        const auto & ids = vector_ids->getData();
        const auto rows = ids.size();
        auto & values = values_column->getData();
        auto & row_ends = array_column->getOffsets();

        row_ends.resize(rows);
        /// Initial capacity guess of four hierarchy levels per row; the vector grows as needed.
        values.reserve(rows * 4);

        for (size_t row = 0; row < rows; ++row)
        {
            for (IDictionary::id_t current = ids[row]; current; current = dictionary->toParent(current))
                values.push_back(current);

            row_ends[row] = values.size();
        }

        return true;
    }

    if (const auto const_id = typeid_cast<const ColumnConst<T> *>(id_col_untyped))
    {
        /// Constant input: compute the chain once and return a constant array column.
        Array chain;

        for (IDictionary::id_t current = const_id->getData(); current; current = dictionary->toParent(current))
            chain.push_back(static_cast<typename NearestFieldType<T>::Type>(current));

        block.getByPosition(result).column = new ColumnConstArray{
            const_id->size(),
            chain,
            new DataTypeArray{new DataTypeUInt64}
        };

        return true;
    }

    return false;
}
const Dictionaries & dictionaries;
};
/** dictIsIn(dict_name, child_id, ancestor_id)
  *
  * dict_name   - constant string, name of an external dictionary that has a hierarchy;
  * child_id    - integer column or constant;
  * ancestor_id - integer column or constant, its type may differ from the type of child_id.
  *
  * The result is UInt8 and equals whatever DictionaryType::in(child_id, ancestor_id) returns
  * for each row (presumably 1 when ancestor_id is reachable from child_id through the
  * hierarchy - confirm against the dictionary implementations).
  */
class FunctionDictIsIn final : public IFunction
{
public:
    static constexpr auto name = "dictIsIn";

    static IFunction * create(const Context & context)
    {
        return new FunctionDictIsIn{context.getDictionaries()};
    }

    FunctionDictIsIn(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}

    String getName() const override { return name; }

private:
    /// True if `type` is one of the eight fixed-width integer data types accepted for ids.
    template <typename Type>
    static bool isIntegerType(const Type * const type)
    {
        return typeid_cast<const DataTypeUInt8 *>(type) ||
            typeid_cast<const DataTypeUInt16 *>(type) ||
            typeid_cast<const DataTypeUInt32 *>(type) ||
            typeid_cast<const DataTypeUInt64 *>(type) ||
            typeid_cast<const DataTypeInt8 *>(type) ||
            typeid_cast<const DataTypeInt16 *>(type) ||
            typeid_cast<const DataTypeInt32 *>(type) ||
            typeid_cast<const DataTypeInt64 *>(type);
    }

    /// Validates the argument types (String, integer, integer) and returns UInt8.
    /// Throws NUMBER_OF_ARGUMENTS_DOESNT_MATCH or ILLEGAL_TYPE_OF_ARGUMENT otherwise.
    DataTypePtr getReturnType(const DataTypes & arguments) const override
    {
        if (arguments.size() != 3)
            throw Exception{
                "Number of arguments for function " + getName() + " doesn't match: passed "
                    + toString(arguments.size()) + ", should be 3.",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
            };

        if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
        {
            throw Exception{
                "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
            };
        }

        const auto child_id_arg = arguments[1].get();
        if (!isIntegerType(child_id_arg))
        {
            throw Exception{
                "Illegal type " + child_id_arg->getName() + " of argument of function " + getName(),
                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
            };
        }

        const auto ancestor_id_arg = arguments[2].get();
        if (!isIntegerType(ancestor_id_arg))
        {
            throw Exception{
                "Illegal type " + ancestor_id_arg->getName() + " of argument of function " + getName(),
                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
            };
        }

        return new DataTypeUInt8;
    }

    /// Resolves the dictionary named by the first (constant string) argument, checks that it
    /// has a hierarchy and forwards to the implementation for its concrete class.
    void execute(Block & block, const ColumnNumbers & arguments, const size_t result) override
    {
        const auto dict_name_col = typeid_cast<const ColumnConst<String> *>(block.getByPosition(arguments[0]).column.get());
        if (!dict_name_col)
            throw Exception{
                "First argument of function " + getName() + " must be a constant string",
                ErrorCodes::ILLEGAL_COLUMN
            };

        auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
        const auto dict_ptr = dict.get();

        if (!dict->hasHierarchy())
            throw Exception{
                "Dictionary does not have a hierarchy",
                ErrorCodes::UNSUPPORTED_METHOD
            };

        /// Both supported dictionary classes must be tried; FlatDictionary used to be listed
        /// twice here, which made every HashedDictionary fail with UNKNOWN_TYPE.
        if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
            !executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr))
            throw Exception{
                "Unsupported dictionary type " + dict_ptr->getTypeName(),
                ErrorCodes::UNKNOWN_TYPE
            };
    }

    /// Returns false if `dictionary` is not a DictionaryType. Otherwise dispatches on the
    /// element type of the child id column and returns true, or throws ILLEGAL_COLUMN.
    template <typename DictionaryType>
    bool executeDispatch(Block & block, const ColumnNumbers & arguments, const size_t result,
        const IDictionary * const dictionary)
    {
        const auto dict = typeid_cast<const DictionaryType *>(dictionary);
        if (!dict)
            return false;

        const auto child_id_col = block.getByPosition(arguments[1]).column.get();
        const auto ancestor_id_col = block.getByPosition(arguments[2]).column.get();

        if (!execute<UInt8>(block, result, dict, child_id_col, ancestor_id_col) &&
            !execute<UInt16>(block, result, dict, child_id_col, ancestor_id_col) &&
            !execute<UInt32>(block, result, dict, child_id_col, ancestor_id_col) &&
            !execute<UInt64>(block, result, dict, child_id_col, ancestor_id_col) &&
            !execute<Int8>(block, result, dict, child_id_col, ancestor_id_col) &&
            !execute<Int16>(block, result, dict, child_id_col, ancestor_id_col) &&
            !execute<Int32>(block, result, dict, child_id_col, ancestor_id_col) &&
            !execute<Int64>(block, result, dict, child_id_col, ancestor_id_col))
        {
            throw Exception{
                "Illegal column " + child_id_col->getName()
                    + " of second argument of function " + getName(),
                ErrorCodes::ILLEGAL_COLUMN
            };
        }

        return true;
    }

    /// Child element type T is fixed; tries the child column as a full column, then as a constant.
    template <typename T, typename DictionaryType>
    bool execute(Block & block, const size_t result, const DictionaryType * const dictionary,
        const IColumn * const child_id_col_untyped, const IColumn * const ancestor_id_col_untyped)
    {
        if (execute<T, ColumnVector<T>>(block, result, dictionary, child_id_col_untyped, ancestor_id_col_untyped) ||
            execute<T, ColumnConst<T>>(block, result, dictionary, child_id_col_untyped, ancestor_id_col_untyped))
            return true;

        return false;
    }

    /// Child column class is fixed (ColumnType); dispatches on the ancestor element type.
    /// Returns false if the child column is not a ColumnType; throws ILLEGAL_COLUMN if the
    /// child matched but the ancestor column has none of the supported types.
    template <typename T, typename ColumnType, typename DictionaryType>
    bool execute(Block & block, const size_t result, const DictionaryType * dictionary,
        const IColumn * const child_id_col_untyped, const IColumn * const ancestor_id_col_untyped)
    {
        if (const auto child_id_col = typeid_cast<const ColumnType *>(child_id_col_untyped))
        {
            if (execute<T, UInt8>(block, result, dictionary, child_id_col, ancestor_id_col_untyped) ||
                execute<T, UInt16>(block, result, dictionary, child_id_col, ancestor_id_col_untyped) ||
                execute<T, UInt32>(block, result, dictionary, child_id_col, ancestor_id_col_untyped) ||
                execute<T, UInt64>(block, result, dictionary, child_id_col, ancestor_id_col_untyped) ||
                execute<T, Int8>(block, result, dictionary, child_id_col, ancestor_id_col_untyped) ||
                execute<T, Int16>(block, result, dictionary, child_id_col, ancestor_id_col_untyped) ||
                execute<T, Int32>(block, result, dictionary, child_id_col, ancestor_id_col_untyped) ||
                execute<T, Int64>(block, result, dictionary, child_id_col, ancestor_id_col_untyped))
                return true;
            else
                throw Exception{
                    "Illegal column " + ancestor_id_col_untyped->getName()
                        + " of third argument of function " + getName(),
                    ErrorCodes::ILLEGAL_COLUMN
                };
        }

        return false;
    }

    /// Child is a full column of T; ancestor is a full or constant column of U.
    /// The ancestor column must be matched against U (not T), otherwise any call with
    /// differently typed child and ancestor ids could never be served.
    template <typename T, typename U, typename DictionaryType>
    bool execute(Block & block, const size_t result, const DictionaryType * const dictionary,
        const ColumnVector<T> * const child_id_col, const IColumn * const ancestor_id_col_untyped)
    {
        if (const auto ancestor_id_col = typeid_cast<const ColumnVector<U> *>(ancestor_id_col_untyped))
        {
            const auto out = new ColumnVector<UInt8>;
            block.getByPosition(result).column = out;

            const auto & child_ids = child_id_col->getData();
            const auto & ancestor_ids = ancestor_id_col->getData();
            auto & data = out->getData();
            const auto size = child_id_col->size();
            data.resize(size);

            for (const auto idx : ext::range(0, size))
                data[idx] = dictionary->in(child_ids[idx], ancestor_ids[idx]);

            return true;
        }
        else if (const auto ancestor_id_col = typeid_cast<const ColumnConst<U> *>(ancestor_id_col_untyped))
        {
            const auto out = new ColumnVector<UInt8>;
            block.getByPosition(result).column = out;

            const auto & child_ids = child_id_col->getData();
            const auto ancestor_id = ancestor_id_col->getData();
            auto & data = out->getData();
            const auto size = child_id_col->size();
            data.resize(size);

            for (const auto idx : ext::range(0, size))
                data[idx] = dictionary->in(child_ids[idx], ancestor_id);

            return true;
        }

        return false;
    }

    /// Child is a constant of T; ancestor is a full or constant column of U.
    /// When both are constants the result is a constant column as well.
    template <typename T, typename U, typename DictionaryType>
    bool execute(Block & block, const size_t result, const DictionaryType * const dictionary,
        const ColumnConst<T> * const child_id_col, const IColumn * const ancestor_id_col_untyped)
    {
        if (const auto ancestor_id_col = typeid_cast<const ColumnVector<U> *>(ancestor_id_col_untyped))
        {
            const auto out = new ColumnVector<UInt8>;
            block.getByPosition(result).column = out;

            const auto child_id = child_id_col->getData();
            const auto & ancestor_ids = ancestor_id_col->getData();
            auto & data = out->getData();
            const auto size = child_id_col->size();
            data.resize(size);

            for (const auto idx : ext::range(0, size))
                data[idx] = dictionary->in(child_id, ancestor_ids[idx]);

            return true;
        }
        else if (const auto ancestor_id_col = typeid_cast<const ColumnConst<U> *>(ancestor_id_col_untyped))
        {
            block.getByPosition(result).column = new ColumnConst<UInt8>{
                child_id_col->size(),
                dictionary->in(child_id_col->getData(), ancestor_id_col->getData())
            };

            return true;
        }

        return false;
    }

    const Dictionaries & dictionaries;
};
}

View File

@ -16,7 +16,6 @@
#include <DB/Storages/IStorage.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Storages/StorageFactory.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <DB/Storages/MergeTree/MergeList.h>
#include <DB/TableFunctions/TableFunctionFactory.h>
@ -86,7 +85,6 @@ struct ContextShared
TableFunctionFactory table_function_factory; /// Table functions.
AggregateFunctionFactory aggregate_function_factory; /// Aggregate functions.
DataTypeFactory data_type_factory; /// Data types.
StorageFactory storage_factory; /// Table engines.
FormatFactory format_factory; /// Formats.
mutable SharedPtr<Dictionaries> dictionaries; /// Metrica dictionaries. Initialized lazily.
Users users; /// Known users.
@ -259,7 +257,6 @@ public:
const TableFunctionFactory & getTableFunctionFactory() const { return shared->table_function_factory; }
const AggregateFunctionFactory & getAggregateFunctionFactory() const { return shared->aggregate_function_factory; }
const DataTypeFactory & getDataTypeFactory() const { return shared->data_type_factory; }
const StorageFactory & getStorageFactory() const { return shared->storage_factory; }
const FormatFactory & getFormatFactory() const { return shared->format_factory; }
const Dictionaries & getDictionaries() const;

View File

@ -1,6 +1,10 @@
#pragma once
#include <mutex>
#include <thread>
#include <unordered_map>
#include <chrono>
#include <random>
#include <Poco/SharedPtr.h>
@ -16,6 +20,9 @@ namespace DB
using Poco::SharedPtr;
class Context;
class IDictionary;
/// Metrica dictionaries that can be used in functions.
class Dictionaries
@ -24,15 +31,23 @@ private:
MultiVersion<RegionsHierarchies> regions_hierarchies;
MultiVersion<TechDataHierarchy> tech_data_hierarchy;
MultiVersion<RegionsNames> regions_names;
mutable std::mutex external_dictionaries_mutex;
std::unordered_map<std::string, std::shared_ptr<MultiVersion<IDictionary>>> external_dictionaries;
std::unordered_map<std::string, std::chrono::system_clock::time_point> update_times;
std::mt19937 rnd_engine;
Context & context;
/// Reload period of the dictionaries, in seconds.
int reload_period;
std::thread reloading_thread;
Poco::Event destroy;
std::thread reloading_externals_thread;
Poco::Event destroy{false};
Logger * log;
Poco::Timestamp dictionaries_last_modified{0};
void handleException() const
{
@ -106,6 +121,9 @@ private:
LOG_INFO(log, "Loaded dictionaries.");
}
void reloadExternals();
/// Reloads every reload_period seconds.
void reloadPeriodically()
{
@ -118,20 +136,35 @@ private:
}
}
/// Body of the background thread for external dictionaries: calls reloadExternals()
/// after every wait interval until the `destroy` event is signalled.
void reloadExternalsPeriodically()
{
    /// Interval handed to Poco::Event::tryWait (which takes milliseconds per Poco's API).
    const auto poll_interval = 5 * 1000;

    /// tryWait() returning true means `destroy` was set, i.e. the object is shutting down.
    while (!destroy.tryWait(poll_interval))
        reloadExternals();
}
public:
/// The dictionaries are reloaded in a separate thread, every reload_period seconds.
Dictionaries(int reload_period_ = 3600)
: reload_period(reload_period_),
Dictionaries(Context & context, int reload_period_ = 3600)
: context(context), reload_period(reload_period_),
log(&Logger::get("Dictionaries"))
{
reloadImpl();
reloadExternals();
reloading_thread = std::thread([this] { reloadPeriodically(); });
reloading_externals_thread = std::thread{&Dictionaries::reloadExternalsPeriodically, this};
}
/// Sets the `destroy` event (which reloadExternalsPeriodically() polls to know when to exit)
/// and then waits for both background threads to finish.
~Dictionaries()
{
destroy.set();
reloading_thread.join();
reloading_externals_thread.join();
}
MultiVersion<RegionsHierarchies>::Version getRegionsHierarchies() const
@ -148,6 +181,19 @@ public:
{
return regions_names.get();
}
/// Returns the current version of the external dictionary registered under `name`.
/// The lookup is done under external_dictionaries_mutex.
/// Throws BAD_ARGUMENTS if there is no dictionary with that name.
MultiVersion<IDictionary>::Version getExternalDictionary(const std::string & name) const
{
    const std::lock_guard<std::mutex> guard{external_dictionaries_mutex};

    const auto found = external_dictionaries.find(name);
    if (found != std::end(external_dictionaries))
        return found->second->get();

    throw Exception{
        "No such dictionary: " + name,
        ErrorCodes::BAD_ARGUMENTS
    };
}
};
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <DB/Storages/IStorage.h>
#include <Yandex/singleton.h>
namespace DB
@ -11,7 +12,7 @@ class Context;
/** Creates a table given the name of its engine.
*/
class StorageFactory
class StorageFactory : public Singleton<StorageFactory>
{
public:
StoragePtr get(

View File

@ -165,7 +165,7 @@ void Connection::forceConnected()
bool Connection::ping()
{
LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")");
// LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")");
try
{

View File

@ -11,7 +11,6 @@
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/Parsers/ASTExpressionList.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <statdaemons/ext/memory.hpp>
#include <DB/Parsers/formatAST.h>

View File

@ -21,6 +21,19 @@ void registerFunctionsDictionaries(FunctionFactory & factory)
factory.registerFunction<FunctionOSHierarchy>();
factory.registerFunction<FunctionSEHierarchy>();
factory.registerFunction<FunctionRegionToName>();
factory.registerFunction<FunctionDictGetUInt8>();
factory.registerFunction<FunctionDictGetUInt16>();
factory.registerFunction<FunctionDictGetUInt32>();
factory.registerFunction<FunctionDictGetUInt64>();
factory.registerFunction<FunctionDictGetInt8>();
factory.registerFunction<FunctionDictGetInt16>();
factory.registerFunction<FunctionDictGetInt32>();
factory.registerFunction<FunctionDictGetInt64>();
factory.registerFunction<FunctionDictGetFloat32>();
factory.registerFunction<FunctionDictGetFloat64>();
factory.registerFunction<FunctionDictGetString>();
factory.registerFunction<FunctionDictGetHierarchy>();
factory.registerFunction<FunctionDictIsIn>();
}
}

View File

@ -0,0 +1,158 @@
/** Reproduces a bug in gcc 4.8.2.
* The bug: an exception is not caught.
*
* /usr/bin/c++ -std=c++11 -Wall -O3 ./io_and_exceptions.cpp && ./a.out
*
* Prints:
* terminate called after throwing an instance of 'int'
* Aborted
*
* But it should print nothing.
*
* With gcc 4.9 and clang 3.6 everything is OK.
*/
typedef unsigned long size_t;
/// Stripped-down buffer base used by this compiler-bug reproduction: holds two memory
/// ranges, a cursor into the working range and a byte counter.
/// NOTE: the exact shape of this code is what triggers the bug; do not restructure it.
class BufferBase
{
public:
typedef char * Position;
/// A range of memory delimited by two pointers, [begin_pos, end_pos).
struct Buffer
{
Buffer(Position begin_pos_, Position end_pos_) : begin_pos(begin_pos_), end_pos(end_pos_) {}
inline Position begin() const { return begin_pos; }
inline Position end() const { return end_pos; }
inline size_t size() const { return end_pos - begin_pos; }
/// Moves only the end pointer; the beginning of the range stays where it is.
inline void resize(size_t size) { end_pos = begin_pos + size; }
private:
Position begin_pos;
Position end_pos;
};
/// Both ranges initially cover [ptr, ptr + size); the cursor starts at ptr + offset and `bytes` at 0.
BufferBase(Position ptr, size_t size, size_t offset)
: internal_buffer(ptr, ptr + size), working_buffer(ptr, ptr + size), pos(ptr + offset), bytes(0) {}
/// Re-points both ranges and the cursor at new memory; `bytes` is left unchanged.
void set(Position ptr, size_t size, size_t offset)
{
internal_buffer = Buffer(ptr, ptr + size);
working_buffer = Buffer(ptr, ptr + size);
pos = ptr + offset;
}
inline Buffer & buffer() { return working_buffer; }
inline Position & position() { return pos; };
/// Distance of the cursor from the beginning of the working range.
inline size_t offset() const { return pos - working_buffer.begin(); }
/// `bytes` plus the current cursor offset.
size_t count() const
{
return bytes + offset();
}
protected:
Buffer internal_buffer;
Buffer working_buffer;
Position pos;
size_t bytes;
};
/// Read side of the reproduction: refills the working range through the virtual nextImpl().
class ReadBuffer : public BufferBase
{
public:
/// Starts with an empty working range, so pos == working_buffer.end() and the first eof() calls next().
ReadBuffer(Position ptr, size_t size) : BufferBase(ptr, size, 0) { working_buffer.resize(0); }
/// Starts with the whole [ptr, ptr + size) range available and the cursor at `offset`.
ReadBuffer(Position ptr, size_t size, size_t offset) : BufferBase(ptr, size, offset) {}
/// Adds the current offset to `bytes`, asks nextImpl() for more data, empties the working
/// range if there is none, and puts the cursor at the beginning of the working range.
/// Returns what nextImpl() returned.
inline bool next()
{
bytes += offset();
bool res = nextImpl();
if (!res)
working_buffer.resize(0);
pos = working_buffer.begin();
return res;
}
virtual ~ReadBuffer() {}
/// True only when the cursor is at the end of the working range and next() yields no more data.
inline bool eof()
{
return pos == working_buffer.end() && !next();
}
private:
/// Default implementation: there is never any more data.
virtual bool nextImpl() { return false; };
};
/// ReadBuffer whose refill always throws the int 1; this is the exception that main() expects to catch.
class CompressedReadBuffer : public ReadBuffer
{
private:
bool nextImpl()
{
throw 1;
/// Unreachable; kept because the reproduction depends on the exact code.
return true;
}
public:
/// No memory at all: the working range is empty, so the first eof() goes straight to nextImpl().
CompressedReadBuffer() : ReadBuffer(nullptr, 0)
{
}
};
/// Sets x to 0 and then inspects the character under the cursor until eof():
/// '+' does nothing, '9' multiplies x by 10, any other character returns.
/// NOTE(review): the loop never advances buf.position(), so a '+' or '9' would make it spin
/// forever; presumably this function exists only to give the compiler the code shape that
/// triggers the bug - confirm before changing anything here.
void readIntText(unsigned & x, ReadBuffer & buf)
{
x = 0;
while (!buf.eof())
{
switch (*buf.position())
{
case '+':
break;
case '9':
x *= 10;
break;
default:
return;
}
}
}
/// Wraps `data` in a ReadBuffer declared as 10 bytes long (regardless of the actual length
/// of the string) and runs readIntText over it.
unsigned parse(const char * data)
{
unsigned res;
ReadBuffer buf(const_cast<char *>(data), 10, 0);
readIntText(res, buf);
return res;
}
/// The actual reproduction: eof() calls next(), which calls nextImpl(), which throws 1.
/// The catch (...) below is supposed to swallow it so that the program prints nothing and returns 0.
int main()
{
CompressedReadBuffer in;
try
{
while (!in.eof())
;
}
catch (...)
{
}
return 0;
}
/// Never called from main(); presumably present only so that parse()/readIntText() are
/// referenced and compiled into the translation unit - part of the reproduction, keep as is.
void f()
{
parse("123");
}

View File

@ -495,7 +495,11 @@ const Dictionaries & Context::getDictionaries() const
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->dictionaries)
shared->dictionaries = new Dictionaries;
{
if (!this->global_context)
throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
shared->dictionaries = new Dictionaries{ *this->global_context };
}
return *shared->dictionaries;
}

View File

@ -0,0 +1,151 @@
#include <DB/Interpreters/Dictionaries.h>
#include <DB/Dictionaries/DictionaryFactory.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/config_ptr_t.h>
#include <statdaemons/ext/scope_guard.hpp>
namespace DB
{
namespace
{
/// Returns the path of the external dictionaries config as given by the
/// "dictionaries_config" setting (empty string if the setting is absent).
/// A path that does not start with '/' is first tried relative to the directory of the
/// main config file ("config-file", default "config.xml"); if that file does not exist,
/// the configured value is returned unchanged.
std::string getDictionariesConfigPath(const Poco::Util::AbstractConfiguration & config)
{
    const auto configured = config.getString("dictionaries_config", "");

    const bool is_relative = !configured.empty() && configured[0] != '/';
    if (!is_relative)
        return configured;

    const auto main_config_path = config.getString("config-file", "config.xml");
    const auto candidate = Poco::Path{main_config_path}.parent().toString() + configured;

    if (Poco::File{candidate}.exists())
        return candidate;

    return configured;
}
}
void Dictionaries::reloadExternals()
{
const auto config_path = getDictionariesConfigPath(Poco::Util::Application::instance().config());
const Poco::File config_file{config_path};
if (config_path.empty() || !config_file.exists())
{
LOG_WARNING(log, "config file '" + config_path + "' does not exist");
}
else
{
const auto last_modified = config_file.getLastModified();
if (last_modified > dictionaries_last_modified)
{
/// definitions of dictionaries may have changed, recreate all of them
dictionaries_last_modified = last_modified;
const config_ptr_t<Poco::Util::XMLConfiguration> config{new Poco::Util::XMLConfiguration{config_path}};
/// get all dictionaries' definitions
Poco::Util::AbstractConfiguration::Keys keys;
config->keys(keys);
/// for each dictionary defined in xml config
for (const auto & key : keys)
{
try
{
if (0 != strncmp(key.data(), "dictionary", strlen("dictionary")))
{
LOG_WARNING(log, "unknown node in dictionaries file: '" + key + "', 'dictionary'");
continue;
}
const auto & prefix = key + '.';
const auto & name = config->getString(prefix + "name");
if (name.empty())
{
LOG_WARNING(log, "dictionary name cannot be empty");
continue;
}
auto dict_ptr = DictionaryFactory::instance().create(name, *config, prefix, context);
if (!dict_ptr->isCached())
{
const auto & lifetime = dict_ptr->getLifetime();
if (lifetime.min_sec != 0 && lifetime.max_sec != 0)
{
std::uniform_int_distribution<std::uint64_t> distribution{
lifetime.min_sec,
lifetime.max_sec
};
update_times[name] = std::chrono::system_clock::now() +
std::chrono::seconds{distribution(rnd_engine)};
}
}
auto it = external_dictionaries.find(name);
/// add new dictionary or update an existing version
if (it == std::end(external_dictionaries))
{
const std::lock_guard<std::mutex> lock{external_dictionaries_mutex};
external_dictionaries.emplace(name, std::make_shared<MultiVersion<IDictionary>>(dict_ptr.release()));
}
else
it->second->set(dict_ptr.release());
}
catch (...)
{
handleException();
}
}
}
}
/// periodic update
for (auto & dictionary : external_dictionaries)
{
try
{
auto current = dictionary.second->get();
const auto & lifetime = current->getLifetime();
/// do not update dictionaries with zero as lifetime
if (lifetime.min_sec == 0 || lifetime.max_sec == 0)
continue;
/// update only non-cached dictionaries
if (!current->isCached())
{
auto & update_time = update_times[current->getName()];
/// check that timeout has passed
if (std::chrono::system_clock::now() < update_time)
continue;
scope_exit({
/// calculate next update time
std::uniform_int_distribution<std::uint64_t> distribution{lifetime.min_sec, lifetime.max_sec};
update_time = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
});
/// check source modified
if (current->getSource()->isModified())
{
/// create new version of dictionary
auto new_version = current->clone();
dictionary.second->set(new_version.release());
}
}
}
catch (...)
{
handleException();
}
}
}
}

View File

@ -0,0 +1,50 @@
#include <DB/Dictionaries/DictionaryFactory.h>
#include <DB/Dictionaries/DictionarySourceFactory.h>
#include <DB/Dictionaries/FlatDictionary.h>
#include <DB/Dictionaries/HashedDictionary.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <Yandex/singleton.h>
#include <statdaemons/ext/memory.hpp>
namespace DB
{
/// Creates the dictionary `name` from the configuration subtree at `config_prefix`.
/// The structure, the source and the lifetime are read first; the layout is then chosen by
/// the first of the keys "layout.flat", "layout.hashed", "layout.cache" that is present.
/// Throws TOO_SMALL_BUFFER_SIZE or NOT_IMPLEMENTED for the "cache" layout and
/// BAD_ARGUMENTS when no known layout is specified.
DictionaryPtr DictionaryFactory::create(const std::string & name, Poco::Util::AbstractConfiguration & config,
    const std::string & config_prefix, Context & context) const
{
    auto structure = DictionaryStructure::fromConfig(config, config_prefix + "structure");

    auto source = DictionarySourceFactory::instance().create(
        config, config_prefix + "source.", structure, context);

    const auto lifetime = DictionaryLifetime::fromConfig(config, config_prefix + "lifetime");

    const std::string layout = config_prefix + "layout.";

    if (config.has(layout + "flat"))
        return ext::make_unique<FlatDictionary>(name, structure, std::move(source), lifetime);

    if (config.has(layout + "hashed"))
        return ext::make_unique<HashedDictionary>(name, structure, std::move(source), lifetime);

    if (!config.has(layout + "cache"))
        throw Exception{"No dictionary type specified", ErrorCodes::BAD_ARGUMENTS};

    /// The cache layout is recognised but not available yet: only its size is validated.
    if (0 == config.getInt(layout + "cache.size", 0))
        throw Exception{
            "Dictionary of type 'cache' cannot have size of 0 bytes",
            ErrorCodes::TOO_SMALL_BUFFER_SIZE
        };

    throw Exception{
        "Dictionary of type 'cache' is not yet implemented",
        ErrorCodes::NOT_IMPLEMENTED
    };
}
}

View File

@ -10,9 +10,6 @@
#include <DB/IO/copyData.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Parsers/formatAST.h>
#include <DB/Storages/StorageMerge.h>
#include <DB/Storages/StorageMergeTree.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <Poco/FileStream.h>

View File

@ -13,6 +13,7 @@
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/Parsers/ASTColumnDeclaration.h>
#include <DB/Storages/StorageFactory.h>
#include <DB/Storages/StorageLog.h>
#include <DB/Storages/StorageSystemNumbers.h>
@ -194,7 +195,7 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists)
else
throw Exception("Incorrect CREATE query: required ENGINE.", ErrorCodes::ENGINE_REQUIRED);
res = context.getStorageFactory().get(
res = StorageFactory::instance().get(
storage_name, data_path, table_name, database_name, context,
context.getGlobalContext(), query_ptr, columns,
materialized_columns, alias_columns, column_defaults, create.attach);

View File

@ -238,7 +238,7 @@ UsersConfigReloader::~UsersConfigReloader()
quit = true;
thread.join();
}
catch(...)
catch (...)
{
tryLogCurrentException("~UsersConfigReloader");
}

View File

@ -49,7 +49,7 @@ public:
std::unique_ptr<OLAP::QueryConverter> olap_converter;
protected:
void initialize(Application& self)
void initialize(Application & self)
{
Daemon::initialize(self);
logger().information("starting up");
@ -61,7 +61,7 @@ protected:
Daemon::uninitialize();
}
int main(const std::vector<std::string>& args);
int main(const std::vector<std::string> & args);
};
}

View File

@ -38,7 +38,7 @@ namespace mysqlxx
*
* TODO: Упростить, используя PoolBase.
*/
class Pool
class Pool final
{
protected:
/** Информация о соединении. */
@ -191,20 +191,27 @@ public:
};
/// Convenience constructor: takes the settings from the application-wide configuration
/// (Poco::Util::Application::instance().config()) and delegates to the constructor
/// that accepts an explicit configuration object.
Pool(const std::string & config_name,
unsigned default_connections_ = MYSQLXX_POOL_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS,
const char * parent_config_name_ = nullptr)
: Pool{
Poco::Util::Application::instance().config(), config_name,
default_connections_, max_connections_, parent_config_name_
}
{}
/**
* @param config_name Name of the parameter in the configuration file
* @param default_connections_ Default number of connections
* @param max_connections_ Maximum number of connections
*/
Pool(const std::string & config_name,
Pool(const Poco::Util::AbstractConfiguration & cfg, const std::string & config_name,
unsigned default_connections_ = MYSQLXX_POOL_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS,
const char * parent_config_name_ = nullptr)
: default_connections(default_connections_), max_connections(max_connections_),
initialized(false), was_successful(false)
: default_connections(default_connections_), max_connections(max_connections_)
{
Poco::Util::LayeredConfiguration & cfg = Poco::Util::Application::instance().config();
server = cfg.getString(config_name + ".host");
if (parent_config_name_)
@ -255,8 +262,19 @@ public:
unsigned default_connections_ = MYSQLXX_POOL_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS)
: default_connections(default_connections_), max_connections(max_connections_),
initialized(false), db(db_), server(server_), user(user_), password(password_), port(port_),
connect_timeout(connect_timeout_), rw_timeout(rw_timeout_), was_successful(false) {}
db(db_), server(server_), user(user_), password(password_), port(port_),
connect_timeout(connect_timeout_), rw_timeout(rw_timeout_) {}
/// Copies only the connection settings of `other`. Members that are not listed here
/// (for instance `initialized` and `was_successful`) get their default initializers.
Pool(const Pool & other)
    : default_connections(other.default_connections),
    max_connections(other.max_connections),
    db(other.db),
    server(other.server),
    user(other.user),
    password(other.password),
    port(other.port),
    connect_timeout(other.connect_timeout),
    rw_timeout(other.rw_timeout)
{
}
Pool & operator=(const Pool &) = delete;
~Pool()
{
@ -340,7 +358,7 @@ protected:
private:
/** Признак того, что мы инициализированы. */
bool initialized;
bool initialized{false};
/** Список соединений. */
typedef std::list<Connection *> Connections;
/** Список соединений. */
@ -360,10 +378,10 @@ private:
unsigned rw_timeout;
/** Хотя бы один раз было успешное соединение. */
bool was_successful;
bool was_successful{false};
/** Выполняет инициализацию класса, если мы еще не инициализированы. */
inline void initialize()
void initialize()
{
if (!initialized)
{

View File

@ -61,7 +61,7 @@ namespace mysqlxx
* </replica>
* </mysql_metrica>
*/
class PoolWithFailover
class PoolWithFailover final
{
private:
typedef Poco::SharedPtr<Pool> PoolPtr;
@ -100,7 +100,17 @@ namespace mysqlxx
PoolWithFailover(const std::string & config_name,
unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name,
unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(const PoolWithFailover & other);
PoolWithFailover & operator=(const PoolWithFailover &) = delete;
/** Выделяет соединение для работы. */
Entry Get();

View File

@ -2,13 +2,11 @@
using namespace mysqlxx;
PoolWithFailover::PoolWithFailover(const std::string & config_name, unsigned default_connections,
unsigned max_connections, size_t max_tries_)
: max_tries(max_tries_)
PoolWithFailover::PoolWithFailover(const Poco::Util::AbstractConfiguration & cfg,
const std::string & config_name, const unsigned default_connections,
const unsigned max_connections, const size_t max_tries)
: max_tries(max_tries)
{
Poco::Util::Application & app = Poco::Util::Application::instance();
Poco::Util::AbstractConfiguration & cfg = app.config();
if (cfg.has(config_name + ".replica"))
{
Poco::Util::AbstractConfiguration::Keys replica_keys;
@ -20,7 +18,7 @@ PoolWithFailover::PoolWithFailover(const std::string & config_name, unsigned def
if (it->size() < std::string("replica").size() || it->substr(0, std::string("replica").size()) != "replica")
throw Poco::Exception("Unknown element in config: " + *it + ", expected replica");
std::string replica_name = config_name + "." + *it;
Replica replica(new Pool(replica_name, default_connections, max_connections, config_name.c_str()),
Replica replica(new Pool(cfg, replica_name, default_connections, max_connections, config_name.c_str()),
cfg.getInt(replica_name + ".priority", 0));
replicas_by_priority[replica.priority].push_back(replica);
}
@ -28,7 +26,28 @@ PoolWithFailover::PoolWithFailover(const std::string & config_name, unsigned def
}
else
{
replicas_by_priority[0].push_back(Replica(new Pool(config_name, default_connections, max_connections), 0));
replicas_by_priority[0].push_back(Replica(new Pool(cfg, config_name, default_connections, max_connections), 0));
}
}
/// Convenience constructor: takes the settings from the application-wide configuration
/// (Poco::Util::Application::instance().config()) and delegates to the constructor
/// that accepts an explicit configuration object.
PoolWithFailover::PoolWithFailover(const std::string & config_name, const unsigned default_connections,
const unsigned max_connections, const size_t max_tries)
: PoolWithFailover{
Poco::Util::Application::instance().config(), config_name,
default_connections, max_connections, max_tries
}
{}
/// Copy constructor: keeps the priority layout of `other`, but every replica gets its own
/// Pool object copy-constructed from the corresponding pool of `other` (the Pool objects
/// themselves are not shared between the two instances).
PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
    : max_tries(other.max_tries)
{
    for (const auto & priority_group : other.replicas_by_priority)
    {
        const auto & source_replicas = priority_group.second;

        Replicas copies;
        copies.reserve(source_replicas.size());

        for (const auto & source : source_replicas)
            copies.emplace_back(new Pool{*source.pool}, source.priority);

        replicas_by_priority.emplace(priority_group.first, std::move(copies));
    }
}