diff --git a/dbms/include/DB/Common/PODArray.h b/dbms/include/DB/Common/PODArray.h index 7ab10342e8c..43b93568a97 100644 --- a/dbms/include/DB/Common/PODArray.h +++ b/dbms/include/DB/Common/PODArray.h @@ -232,6 +232,17 @@ public: c_end = c_start + byte_size(n); } + void resize_fill(size_t n, const T & value) /* Resizes to n elements; when growing, every newly added element is set to value. */ + { + size_t old_size = size(); + if (n > old_size) + { + reserve(n); + std::fill(t_end(), t_end() + (n - old_size), value); /* the end of the fill range is counted in elements from t_end(); c_end is positioned with byte_size(), so adding an element count to it stops short when sizeof(T) is not 1 */ + } + c_end = c_start + byte_size(n); + } + void clear() { c_end = c_start; diff --git a/dbms/include/DB/Core/StringRef.h b/dbms/include/DB/Core/StringRef.h index 8ab432210a9..e7ca944e33a 100644 --- a/dbms/include/DB/Core/StringRef.h +++ b/dbms/include/DB/Core/StringRef.h @@ -21,7 +21,7 @@ struct StringRef StringRef(const char * data_, size_t size_) : data(data_), size(size_) {} StringRef(const unsigned char * data_, size_t size_) : data(reinterpret_cast(data_)), size(size_) {} StringRef(const std::string & s) : data(s.data()), size(s.size()) {} - StringRef() {} + StringRef() = default; std::string toString() const { return std::string(data, size); } }; diff --git a/dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h b/dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h new file mode 100644 index 00000000000..badd32057a1 --- /dev/null +++ b/dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h @@ -0,0 +1,127 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +const auto max_connections = 1; /* passed as the first argument when pool is created */ +/* Dictionary source backed by a table: loadAll() runs a SELECT over it, in-process when isLocal() reported true and through pool otherwise. */ +class ClickhouseDictionarySource final : public IDictionarySource +{ + static const auto max_block_size = 8192; + /* The constructor reads host, port, user, password, db and table from config under config_prefix; user, password and db default to an empty string. */ +public: + ClickhouseDictionarySource(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, + Block & sample_block, Context & context) + : host{config.getString(config_prefix + "host")}, + port(config.getInt(config_prefix + "port")), + user{config.getString(config_prefix + "user", "")}, + 
password{config.getString(config_prefix + "password", "")}, + db{config.getString(config_prefix + "db", "")}, + table{config.getString(config_prefix + "table")}, + sample_block{sample_block}, context(context), + is_local{isLocal(host, port)}, + pool{is_local ? nullptr : ext::make_unique( + max_connections, host, port, db, user, password, context.getDataTypeFactory(), + "ClickhouseDictionarySource") + }, + load_all_query{composeLoadAllQuery(sample_block, table)} + {} + /* Copy constructor: copies every setting from other (table is taken from other.table, not other.db) and, for a non-local source, creates its own pool instead of sharing other's. */ + ClickhouseDictionarySource(const ClickhouseDictionarySource & other) + : host{other.host}, port{other.port}, user{other.user}, password{other.password}, + db{other.db}, table{other.table}, + sample_block{other.sample_block}, context(other.context), + is_local{other.is_local}, + pool{is_local ? nullptr : ext::make_unique( + max_connections, host, port, db, user, password, context.getDataTypeFactory(), + "ClickhouseDictionarySource")}, + load_all_query{other.load_all_query} + {} + /* Runs load_all_query: through executeQuery() when is_local is set, otherwise through a RemoteBlockInputStream created on pool. */ + BlockInputStreamPtr loadAll() override + { + if (is_local) + return executeQuery(load_all_query, context).in; + return new RemoteBlockInputStream{pool.get(), load_all_query, nullptr}; + } + /* Not supported: always throws Exception with ErrorCodes::NOT_IMPLEMENTED. */ + BlockInputStreamPtr loadId(const std::uint64_t id) override + { + throw Exception{ + "Method unsupported", + ErrorCodes::NOT_IMPLEMENTED + }; + } + /* Not supported: always throws Exception with ErrorCodes::NOT_IMPLEMENTED. */ + BlockInputStreamPtr loadIds(const std::vector ids) override + { + throw Exception{ + "Method unsupported", + ErrorCodes::NOT_IMPLEMENTED + }; + } + + /// @todo check update time somehow + bool isModified() const override { return true; } + + DictionarySourcePtr clone() const override { return ext::make_unique(*this); } + +private: + static std::string composeLoadAllQuery(const Block & block, const std::string & table) + { + std::string query{"SELECT "}; + + auto first = true; + for (const auto idx : ext::range(0, block.columns())) + { + if (!first) + query += ", "; + + query += block.getByPosition(idx).name; + first = false; + } + + query += " FROM " + table + ';'; + + return query; + } + + 
static bool isLocal(const std::string & host, const UInt16 port) /* True when port equals the tcp_port value of the application config and host, converted to Poco::Net::IPAddress, equals the address of one of the listed network interfaces. */ + { + const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0); + static auto interfaces = Poco::Net::NetworkInterface::list(); /* function-local static: the list is obtained on the first call and reused afterwards */ + + if (clickhouse_port == port) + { + return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(), + [&] (const Poco::Net::NetworkInterface & interface) { + return interface.address() == Poco::Net::IPAddress(host); /* NOTE(review): host is handed to IPAddress as-is; presumably it must be a literal address rather than a name - confirm */ + }); + } + + return false; + } + /* Data members; the constructors initialize them in this declaration order, so is_local is set before pool reads it. */ + const std::string host; + const UInt16 port; + const std::string user; + const std::string password; + const std::string db; + const std::string table; + Block sample_block; + Context & context; + const bool is_local; + std::unique_ptr pool; + const std::string load_all_query; +}; + +} diff --git a/dbms/include/DB/Dictionaries/DictionaryFactory.h b/dbms/include/DB/Dictionaries/DictionaryFactory.h new file mode 100644 index 00000000000..a46d9a41e8b --- /dev/null +++ b/dbms/include/DB/Dictionaries/DictionaryFactory.h @@ -0,0 +1,18 @@ +#pragma once + +#include +#include + +namespace DB +{ + +class Context; +/* Declares create(), which returns a DictionaryPtr for the given name and configuration prefix; the definition is not part of this header. */ +class DictionaryFactory : public Singleton +{ +public: + DictionaryPtr create(const std::string & name, Poco::Util::AbstractConfiguration & config, + const std::string & config_prefix, Context & context) const; +}; + +} diff --git a/dbms/include/DB/Dictionaries/DictionarySourceFactory.h b/dbms/include/DB/Dictionaries/DictionarySourceFactory.h new file mode 100644 index 00000000000..b6678d11b57 --- /dev/null +++ b/dbms/include/DB/Dictionaries/DictionarySourceFactory.h @@ -0,0 +1,71 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace /* NOTE(review): unnamed namespace inside a header - every translation unit that includes it gets its own copy of createSampleBlock */ +{ +/* Builds a block whose first column is a UInt64 column named dict_struct.id_name, followed by one column per attribute created from the attribute's data type. */ +Block createSampleBlock(const DictionaryStructure & dict_struct, const Context & context) +{ + Block block{ + ColumnWithNameAndType{ + new ColumnUInt64, + new DataTypeUInt64, + dict_struct.id_name + } + }; + + for (const auto & attribute : 
dict_struct.attributes) + { + const auto & type = context.getDataTypeFactory().get(attribute.type); /* attribute.type is the type name string taken from the dictionary structure */ + block.insert(ColumnWithNameAndType{ + type->createColumn(), type, attribute.name + }); + } + + return block; +} + +} +/* Creates a dictionary source; the implementation is chosen by which of the keys file, mysql or clickhouse exists under config_prefix. */ +class DictionarySourceFactory : public Singleton +{ +public: + DictionarySourcePtr create(Poco::Util::AbstractConfiguration & config, + const std::string & config_prefix, + const DictionaryStructure & dict_struct, + Context & context) const + { + auto sample_block = createSampleBlock(dict_struct, context); + + if (config.has(config_prefix + "file")) + { + const auto & filename = config.getString(config_prefix + "file.path"); + const auto & format = config.getString(config_prefix + "file.format"); + return ext::make_unique(filename, format, sample_block, context); + } + else if (config.has(config_prefix + "mysql")) + { + return ext::make_unique(config, config_prefix + "mysql.", sample_block, context); + } + else if (config.has(config_prefix + "clickhouse")) + { /* NOTE(review): the construction below is commented out, so this branch hands a null source to the caller - confirm callers cope with that */ + return nullptr;//ext::make_unique(config, config_prefix + "clickhouse.", + //sample_block, context); + } + + throw Exception{"unsupported source type"}; + } +}; + +} diff --git a/dbms/include/DB/Dictionaries/DictionaryStructure.h b/dbms/include/DB/Dictionaries/DictionaryStructure.h new file mode 100644 index 00000000000..cd552784551 --- /dev/null +++ b/dbms/include/DB/Dictionaries/DictionaryStructure.h @@ -0,0 +1,157 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ +/* Value types a dictionary attribute can have. */ +enum class attribute_type +{ + uint8, + uint16, + uint32, + uint64, + int8, + int16, + int32, + int64, + float32, + float64, + string +}; +/* Maps a type name such as UInt8 or String to its attribute_type; throws Exception with ErrorCodes::UNKNOWN_TYPE for a name that is not in the table. */ +inline attribute_type getAttributeTypeByName(const std::string & type) +{ + static const std::unordered_map dictionary{ + { "UInt8", attribute_type::uint8 }, + { "UInt16", attribute_type::uint16 }, + { "UInt32", attribute_type::uint32 }, + { "UInt64", attribute_type::uint64 }, + { "Int8", attribute_type::int8 }, + { "Int16", attribute_type::int16 }, 
+ { "Int32", attribute_type::int32 }, + { "Int64", attribute_type::int64 }, + { "Float32", attribute_type::float32 }, + { "Float64", attribute_type::float64 }, + { "String", attribute_type::string }, + }; + + const auto it = dictionary.find(type); + if (it != std::end(dictionary)) + return it->second; + + throw Exception{ + "Unknown type " + type, + ErrorCodes::UNKNOWN_TYPE + }; +} +/* Returns the type name for an attribute_type (the inverse of getAttributeTypeByName); a value outside the enumerators throws Exception with ErrorCodes::ARGUMENT_OUT_OF_BOUND. */ +inline std::string toString(const attribute_type type) +{ + switch (type) + { + case attribute_type::uint8: return "UInt8"; + case attribute_type::uint16: return "UInt16"; + case attribute_type::uint32: return "UInt32"; + case attribute_type::uint64: return "UInt64"; + case attribute_type::int8: return "Int8"; + case attribute_type::int16: return "Int16"; + case attribute_type::int32: return "Int32"; + case attribute_type::int64: return "Int64"; + case attribute_type::float32: return "Float32"; + case attribute_type::float64: return "Float64"; + case attribute_type::string: return "String"; + } + + throw Exception{ + "Unknown attribute_type " + std::to_string(static_cast<int>(type)), /* report the raw enum value; calling toString(type) here would re-enter this same function without end */ + ErrorCodes::ARGUMENT_OUT_OF_BOUND + }; +} +/* Lifetime bounds of a dictionary; fromConfig() reads the .min and .max keys under config_prefix when .min exists, otherwise it uses the single value at config_prefix for both bounds. */ +struct DictionaryLifetime +{ + std::uint64_t min_sec; + std::uint64_t max_sec; + + static DictionaryLifetime fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix) + { + const auto & lifetime_min_key = config_prefix + ".min"; + const auto has_min = config.has(lifetime_min_key); + const std::uint64_t min_update_time = has_min ? config.getInt(lifetime_min_key) : config.getInt(config_prefix); + const std::uint64_t max_update_time = has_min ? 
config.getInt(config_prefix + ".max") : min_update_time; + return { min_update_time, max_update_time }; + } +}; +/* Description of one attribute as read from the configuration; type and null_value are kept as strings here. */ +struct DictionaryAttribute +{ + std::string name; + std::string type; + std::string null_value; + bool hierarchical; + bool injective; +}; +/* Name of the id column plus the list of attributes; fromConfig() builds it from the keys under config_prefix. */ +struct DictionaryStructure +{ + std::string id_name; + std::vector attributes; + /* Throws Exception with ErrorCodes::BAD_ARGUMENTS when the id name is empty, an attribute lacks a name or type, more than one attribute is hierarchical, or no attribute is defined. */ + static DictionaryStructure fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix) + { + const auto & id_name = config.getString(config_prefix + ".id.name"); + if (id_name.empty()) + throw Exception{ + "No 'id' specified for dictionary", + ErrorCodes::BAD_ARGUMENTS + }; + + DictionaryStructure result{id_name}; + + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_prefix, keys); + auto has_hierarchy = false; + for (const auto & key : keys) + { + if (0 != strncmp(key.data(), "attribute", strlen("attribute"))) /* only keys that start with the word attribute describe attributes; every other key is skipped */ + continue; + + const auto & prefix = config_prefix + '.' + key + '.'; + const auto & name = config.getString(prefix + "name"); + const auto & type = config.getString(prefix + "type"); + const auto & null_value = config.getString(prefix + "null_value"); /* read without a default, unlike hierarchical and injective below */ + const auto hierarchical = config.getBool(prefix + "hierarchical", false); + const auto injective = config.getBool(prefix + "injective", false); + if (name.empty() || type.empty()) + throw Exception{ + "Properties 'name' and 'type' of an attribute cannot be empty", + ErrorCodes::BAD_ARGUMENTS + }; + + if (has_hierarchy && hierarchical) + throw Exception{ + "Only one hierarchical attribute supported", + ErrorCodes::BAD_ARGUMENTS + }; + + has_hierarchy = has_hierarchy || hierarchical; + + result.attributes.emplace_back(DictionaryAttribute{name, type, null_value, hierarchical, injective}); + } + + if (result.attributes.empty()) + throw Exception{ + "Dictionary has no attributes defined", + ErrorCodes::BAD_ARGUMENTS + }; + + return result; + } +}; + +} diff --git a/dbms/include/DB/Dictionaries/FileDictionarySource.h 
b/dbms/include/DB/Dictionaries/FileDictionarySource.h new file mode 100644 index 00000000000..ac90f103727 --- /dev/null +++ b/dbms/include/DB/Dictionaries/FileDictionarySource.h @@ -0,0 +1,70 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace DB +{ +/* Dictionary source that reads a file in the given format and tracks the file's last-modified timestamp to report changes. */ +class FileDictionarySource final : public IDictionarySource +{ + static const auto max_block_size = 8192; /* passed to the format factory in loadAll() */ + +public: + FileDictionarySource(const std::string & filename, const std::string & format, Block & sample_block, + Context & context) + : filename{filename}, format{format}, sample_block{sample_block}, context(context), + last_modification{getLastModification()} + {} + /* Copy constructor: copies all fields, including the remembered modification timestamp. */ + FileDictionarySource(const FileDictionarySource & other) + : filename{other.filename}, format{other.format}, + sample_block{other.sample_block}, context(other.context), + last_modification{other.last_modification} + {} + /* Opens the file, asks the format factory for an input stream over it and refreshes last_modification; the returned stream takes ownership of the read buffer. */ + BlockInputStreamPtr loadAll() override + { + auto in_ptr = ext::make_unique(filename); + auto stream = context.getFormatFactory().getInput( + format, *in_ptr, sample_block, max_block_size, context.getDataTypeFactory()); + last_modification = getLastModification(); + + return new OwningBufferBlockInputStream{stream, std::move(in_ptr)}; + } + /* Not supported: always throws Exception with ErrorCodes::NOT_IMPLEMENTED. */ + BlockInputStreamPtr loadId(const std::uint64_t id) override + { + throw Exception{ + "Method unsupported", + ErrorCodes::NOT_IMPLEMENTED + }; + } + /* Not supported: always throws Exception with ErrorCodes::NOT_IMPLEMENTED. */ + BlockInputStreamPtr loadIds(const std::vector ids) override + { + throw Exception{ + "Method unsupported", + ErrorCodes::NOT_IMPLEMENTED + }; + } + /* True when the file's current last-modified timestamp is newer than the one remembered at construction or at the last loadAll(). */ + bool isModified() const override { return getLastModification() > last_modification; } + + DictionarySourcePtr clone() const override { return ext::make_unique(*this); } + +private: + Poco::Timestamp getLastModification() const { return Poco::File{filename}.getLastModified(); } + /* filename is declared before last_modification, so it is already initialized when the constructor calls getLastModification(). */ + const std::string filename; + const std::string format; + Block sample_block; + Context & context; + Poco::Timestamp last_modification; +}; + +} diff --git 
a/dbms/include/DB/Dictionaries/FlatDictionary.h b/dbms/include/DB/Dictionaries/FlatDictionary.h new file mode 100644 index 00000000000..c0604ccd701 --- /dev/null +++ b/dbms/include/DB/Dictionaries/FlatDictionary.h @@ -0,0 +1,383 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +const auto initial_array_size = 1024; /* number of slots each attribute array is pre-filled with on creation */ +const auto max_array_size = 500000; /* ids greater than or equal to this are rejected when values are stored */ +/* Dictionary that keeps each attribute in an array indexed directly by id; ids beyond the array yield the attribute's null value. */ +class FlatDictionary final : public IDictionary +{ +public: + FlatDictionary(const std::string & name, const DictionaryStructure & dict_struct, + DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime) + : name{name}, dict_struct(dict_struct), + source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime) + { + createAttributes(); + loadData(); + } + /* Copy constructor: delegates to the main constructor with a clone of the source, so attributes are created and all data is loaded again. */ + FlatDictionary(const FlatDictionary & other) + : FlatDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime} + {} + + std::string getName() const override { return name; } + + std::string getTypeName() const override { return "FlatDictionary"; } + + bool isCached() const override { return false; } + + DictionaryPtr clone() const override { return ext::make_unique(*this); } + + const IDictionarySource * const getSource() const override { return source_ptr.get(); } + + const DictionaryLifetime & getLifetime() const override { return dict_lifetime; } + /* True when some attribute was marked hierarchical, i.e. hierarchical_attribute is not null. */ + bool hasHierarchy() const override { return hierarchical_attribute; } + /* Returns the hierarchical attribute's value for id, or its null value when id is past the array; dereferences hierarchical_attribute without a null check. */ + id_t toParent(const id_t id) const override + { + const auto attr = hierarchical_attribute; + + switch (hierarchical_attribute->type) + { + case attribute_type::uint8: return id < attr->uint8_array->size() ? (*attr->uint8_array)[id] : attr->uint8_null_value; + case attribute_type::uint16: return id < attr->uint16_array->size() ? (*attr->uint16_array)[id] : attr->uint16_null_value; + case attribute_type::uint32: return id < attr->uint32_array->size() ? 
(*attr->uint32_array)[id] : attr->uint32_null_value; + case attribute_type::uint64: return id < attr->uint64_array->size() ? (*attr->uint64_array)[id] : attr->uint64_null_value; + case attribute_type::int8: return id < attr->int8_array->size() ? (*attr->int8_array)[id] : attr->int8_null_value; + case attribute_type::int16: return id < attr->int16_array->size() ? (*attr->int16_array)[id] : attr->int16_null_value; + case attribute_type::int32: return id < attr->int32_array->size() ? (*attr->int32_array)[id] : attr->int32_null_value; + case attribute_type::int64: return id < attr->int64_array->size() ? (*attr->int64_array)[id] : attr->int64_null_value; + case attribute_type::float32: + case attribute_type::float64: + case attribute_type::string: + break; + } + /* Reached only for the float and string types, which break out of the switch above. */ + throw Exception{ + "Hierarchical attribute has non-integer type " + toString(hierarchical_attribute->type), + ErrorCodes::TYPE_MISMATCH + }; + } + /* Generates the by-name getters: look the attribute up by name, throw TYPE_MISMATCH when its type differs from the getter's, then return the stored value or the null value when id is past the array. */ +#define DECLARE_SAFE_GETTER(TYPE, NAME, LC_TYPE) \ + TYPE get##NAME(const std::string & attribute_name, const id_t id) const override\ + {\ + const auto idx = getAttributeIndex(attribute_name);\ + const auto & attribute = attributes[idx];\ + if (attribute.type != attribute_type::LC_TYPE)\ + throw Exception{\ + "Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\ + ErrorCodes::TYPE_MISMATCH\ + };\ + if (id < attribute.LC_TYPE##_array->size())\ + return (*attribute.LC_TYPE##_array)[id];\ + return attribute.LC_TYPE##_null_value;\ + } + DECLARE_SAFE_GETTER(UInt8, UInt8, uint8) + DECLARE_SAFE_GETTER(UInt16, UInt16, uint16) + DECLARE_SAFE_GETTER(UInt32, UInt32, uint32) + DECLARE_SAFE_GETTER(UInt64, UInt64, uint64) + DECLARE_SAFE_GETTER(Int8, Int8, int8) + DECLARE_SAFE_GETTER(Int16, Int16, int16) + DECLARE_SAFE_GETTER(Int32, Int32, int32) + DECLARE_SAFE_GETTER(Int64, Int64, int64) + DECLARE_SAFE_GETTER(Float32, Float32, float32) + DECLARE_SAFE_GETTER(Float64, Float64, float64) + DECLARE_SAFE_GETTER(StringRef, String, string) +#undef 
DECLARE_SAFE_GETTER + /* Returns the position of the named attribute in attributes; throws Exception with ErrorCodes::BAD_ARGUMENTS when the name is unknown. */ + std::size_t getAttributeIndex(const std::string & attribute_name) const override + { + const auto it = attribute_index_by_name.find(attribute_name); + if (it == std::end(attribute_index_by_name)) + throw Exception{ + "No such attribute '" + attribute_name + "'", + ErrorCodes::BAD_ARGUMENTS + }; + + return it->second; + } + /* Generates the isUInt8 ... isString predicates, which compare the type of the attribute at attribute_idx; the index is not range-checked. */ +#define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\ + bool is##NAME(const std::size_t attribute_idx) const override\ + {\ + return attributes[attribute_idx].type == attribute_type::LC_NAME;\ + } + DECLARE_TYPE_CHECKER(UInt8, uint8) + DECLARE_TYPE_CHECKER(UInt16, uint16) + DECLARE_TYPE_CHECKER(UInt32, uint32) + DECLARE_TYPE_CHECKER(UInt64, uint64) + DECLARE_TYPE_CHECKER(Int8, int8) + DECLARE_TYPE_CHECKER(Int16, int16) + DECLARE_TYPE_CHECKER(Int32, int32) + DECLARE_TYPE_CHECKER(Int64, int64) + DECLARE_TYPE_CHECKER(Float32, float32) + DECLARE_TYPE_CHECKER(Float64, float64) + DECLARE_TYPE_CHECKER(String, string) +#undef DECLARE_TYPE_CHECKER + /* Generates the by-index getters: unlike the by-name getters they check neither the attribute index nor the attribute type; only id is compared with the array size. */ +#define DECLARE_UNSAFE_GETTER(TYPE, NAME, LC_NAME)\ + TYPE get##NAME##Unsafe(const std::size_t attribute_idx, const id_t id) const override\ + {\ + const auto & attribute = attributes[attribute_idx];\ + if (id < attribute.LC_NAME##_array->size())\ + return (*attribute.LC_NAME##_array)[id];\ + return attribute.LC_NAME##_null_value;\ + } + DECLARE_UNSAFE_GETTER(UInt8, UInt8, uint8) + DECLARE_UNSAFE_GETTER(UInt16, UInt16, uint16) + DECLARE_UNSAFE_GETTER(UInt32, UInt32, uint32) + DECLARE_UNSAFE_GETTER(UInt64, UInt64, uint64) + DECLARE_UNSAFE_GETTER(Int8, Int8, int8) + DECLARE_UNSAFE_GETTER(Int16, Int16, int16) + DECLARE_UNSAFE_GETTER(Int32, Int32, int32) + DECLARE_UNSAFE_GETTER(Int64, Int64, int64) + DECLARE_UNSAFE_GETTER(Float32, Float32, float32) + DECLARE_UNSAFE_GETTER(Float64, Float64, float64) + DECLARE_UNSAFE_GETTER(StringRef, String, string) +#undef DECLARE_UNSAFE_GETTER + /* Per-attribute storage: one null value and one array pointer per supported type; createAttributeWithType() sets only the pair that matches type. */ +private: + struct attribute_t + { + attribute_type type; + UInt8 uint8_null_value; + UInt16 uint16_null_value; + 
UInt32 uint32_null_value; + UInt64 uint64_null_value; + Int8 int8_null_value; + Int16 int16_null_value; + Int32 int32_null_value; + Int64 int64_null_value; + Float32 float32_null_value; + Float64 float64_null_value; + String string_null_value; + std::unique_ptr> uint8_array; + std::unique_ptr> uint16_array; + std::unique_ptr> uint32_array; + std::unique_ptr> uint64_array; + std::unique_ptr> int8_array; + std::unique_ptr> int16_array; + std::unique_ptr> int32_array; + std::unique_ptr> int64_array; + std::unique_ptr> float32_array; + std::unique_ptr> float64_array; + std::unique_ptr string_arena; + std::unique_ptr> string_array; + }; + /* Creates one attribute_t per configured attribute, records its index by name and remembers the address of the one marked hierarchical. */ + void createAttributes() + { + const auto size = dict_struct.attributes.size(); + attributes.reserve(size); /* reserved up front: hierarchical_attribute stores a pointer to an element, so the vector must not reallocate while it is being filled */ + for (const auto & attribute : dict_struct.attributes) + { + attribute_index_by_name.emplace(attribute.name, attributes.size()); + attributes.push_back(std::move(createAttributeWithType(getAttributeTypeByName(attribute.type), + attribute.null_value))); + + if (attribute.hierarchical) + hierarchical_attribute = &attributes.back(); + } + } + /* Reads every block from the source; column 0 of a block holds the ids and column attribute_idx + 1 holds the values of attribute attribute_idx. */ + void loadData() + { + auto stream = source_ptr->loadAll(); + + while (const auto block = stream->read()) + { + const auto & id_column = *block.getByPosition(0).column; + + for (const auto attribute_idx : ext::range(0, attributes.size())) + { + const auto & attribute_column = *block.getByPosition(attribute_idx + 1).column; + auto & attribute = attributes[attribute_idx]; + + for (const auto row_idx : ext::range(0, id_column.size())) + setAttributeValue(attribute, id_column[row_idx].get(), attribute_column[row_idx]); + } + } + } + /* Builds the storage for one attribute: parses null_value for the given type, allocates the matching array and pre-fills initial_array_size slots with the null value. */ + attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value) + { + attribute_t attr{type}; + + switch (type) + { + case attribute_type::uint8: + attr.uint8_null_value = DB::parse(null_value); + attr.uint8_array.reset(new PODArray); + attr.uint8_array->resize_fill(initial_array_size, attr.uint8_null_value); + break; + case 
attribute_type::uint16: /* every numeric case below repeats the same three steps: parse null_value, allocate the array, pre-fill initial_array_size slots with the null value */ + attr.uint16_null_value = DB::parse(null_value); + attr.uint16_array.reset(new PODArray); + attr.uint16_array->resize_fill(initial_array_size, attr.uint16_null_value); + break; + case attribute_type::uint32: + attr.uint32_null_value = DB::parse(null_value); + attr.uint32_array.reset(new PODArray); + attr.uint32_array->resize_fill(initial_array_size, attr.uint32_null_value); + break; + case attribute_type::uint64: + attr.uint64_null_value = DB::parse(null_value); + attr.uint64_array.reset(new PODArray); + attr.uint64_array->resize_fill(initial_array_size, attr.uint64_null_value); + break; + case attribute_type::int8: + attr.int8_null_value = DB::parse(null_value); + attr.int8_array.reset(new PODArray); + attr.int8_array->resize_fill(initial_array_size, attr.int8_null_value); + break; + case attribute_type::int16: + attr.int16_null_value = DB::parse(null_value); + attr.int16_array.reset(new PODArray); + attr.int16_array->resize_fill(initial_array_size, attr.int16_null_value); + break; + case attribute_type::int32: + attr.int32_null_value = DB::parse(null_value); + attr.int32_array.reset(new PODArray); + attr.int32_array->resize_fill(initial_array_size, attr.int32_null_value); + break; + case attribute_type::int64: + attr.int64_null_value = DB::parse(null_value); + attr.int64_array.reset(new PODArray); + attr.int64_array->resize_fill(initial_array_size, attr.int64_null_value); + break; + case attribute_type::float32: + attr.float32_null_value = DB::parse(null_value); + attr.float32_array.reset(new PODArray); + attr.float32_array->resize_fill(initial_array_size, attr.float32_null_value); + break; + case attribute_type::float64: + attr.float64_null_value = DB::parse(null_value); + attr.float64_array.reset(new PODArray); + attr.float64_array->resize_fill(initial_array_size, attr.float64_null_value); + break; + case attribute_type::string: /* the string case keeps null_value unparsed and additionally allocates an arena for the string bytes */ + attr.string_null_value = null_value; + attr.string_arena.reset(new Arena); + attr.string_array.reset(new 
PODArray); + attr.string_array->resize_fill(initial_array_size, attr.string_null_value); /* NOTE(review): the fill value is converted from attr.string_null_value, so the stored references presumably point into that string's buffer; attr is afterwards returned and moved into the attributes vector - confirm the references stay valid */ + break; + } + + return attr; + } + /* Stores value for id in the array that matches attribute.type. Throws ARGUMENT_OUT_OF_BOUND when id >= max_array_size. When id is past the end, the array grows to id + 1 elements with the new slots set to the null value. String bytes are copied into the attribute's arena and the array keeps a reference to that copy. */ + void setAttributeValue(attribute_t & attribute, const id_t id, const Field & value) + { + if (id >= max_array_size) + throw Exception{ + "Identifier should be less than " + toString(max_array_size), + ErrorCodes::ARGUMENT_OUT_OF_BOUND + }; + + switch (attribute.type) + { + case attribute_type::uint8: + { + if (id >= attribute.uint8_array->size()) + attribute.uint8_array->resize_fill(id + 1, attribute.uint8_null_value); /* the size must be id + 1 for index id to be valid; the same holds for every case below */ + (*attribute.uint8_array)[id] = value.get(); + break; + } + case attribute_type::uint16: + { + if (id >= attribute.uint16_array->size()) + attribute.uint16_array->resize_fill(id + 1, attribute.uint16_null_value); + (*attribute.uint16_array)[id] = value.get(); + break; + } + case attribute_type::uint32: + { + if (id >= attribute.uint32_array->size()) + attribute.uint32_array->resize_fill(id + 1, attribute.uint32_null_value); + (*attribute.uint32_array)[id] = value.get(); + break; + } + case attribute_type::uint64: + { + if (id >= attribute.uint64_array->size()) + attribute.uint64_array->resize_fill(id + 1, attribute.uint64_null_value); + (*attribute.uint64_array)[id] = value.get(); + break; + } + case attribute_type::int8: + { + if (id >= attribute.int8_array->size()) + attribute.int8_array->resize_fill(id + 1, attribute.int8_null_value); + (*attribute.int8_array)[id] = value.get(); + break; + } + case attribute_type::int16: + { + if (id >= attribute.int16_array->size()) + attribute.int16_array->resize_fill(id + 1, attribute.int16_null_value); + (*attribute.int16_array)[id] = value.get(); + break; + } + case attribute_type::int32: + { + if (id >= attribute.int32_array->size()) + attribute.int32_array->resize_fill(id + 1, attribute.int32_null_value); + (*attribute.int32_array)[id] = value.get(); + break; + } + case attribute_type::int64: + { + if (id >= attribute.int64_array->size()) + attribute.int64_array->resize_fill(id + 1, attribute.int64_null_value); + 
(*attribute.int64_array)[id] = value.get(); + break; + } + case attribute_type::float32: + { + if (id >= attribute.float32_array->size()) + attribute.float32_array->resize_fill(id + 1, attribute.float32_null_value); + (*attribute.float32_array)[id] = value.get(); + break; + } + case attribute_type::float64: + { + if (id >= attribute.float64_array->size()) + attribute.float64_array->resize_fill(id + 1, attribute.float64_null_value); + (*attribute.float64_array)[id] = value.get(); + break; + } + case attribute_type::string: + { + if (id >= attribute.string_array->size()) + attribute.string_array->resize_fill(id + 1, attribute.string_null_value); + const auto & string = value.get(); + const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size()); /* copy the bytes into the arena so the stored reference does not depend on value */ + (*attribute.string_array)[id] = StringRef{string_in_arena, string.size()}; + break; + } + }; + } + + const std::string name; + const DictionaryStructure dict_struct; + const DictionarySourcePtr source_ptr; + const DictionaryLifetime dict_lifetime; + + std::map attribute_index_by_name; + std::vector attributes; + const attribute_t * hierarchical_attribute = nullptr; +}; + +} diff --git a/dbms/include/DB/Dictionaries/HashedDictionary.h b/dbms/include/DB/Dictionaries/HashedDictionary.h new file mode 100644 index 00000000000..4c83f6395ca --- /dev/null +++ b/dbms/include/DB/Dictionaries/HashedDictionary.h @@ -0,0 +1,376 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class HashedDictionary final : public IDictionary +{ +public: + HashedDictionary(const std::string & name, const DictionaryStructure & dict_struct, + DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime) + : name{name}, dict_struct(dict_struct), + source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime) + { + createAttributes(); + loadData(); + } + + HashedDictionary(const HashedDictionary & other) + : HashedDictionary{other.name, other.dict_struct, 
other.source_ptr->clone(), other.dict_lifetime} + {} + + std::string getName() const override { return name; } + + std::string getTypeName() const override { return "HashedDictionary"; } + + bool isCached() const override { return false; } + + DictionaryPtr clone() const override { return ext::make_unique(*this); } + + const IDictionarySource * const getSource() const override { return source_ptr.get(); } + + const DictionaryLifetime & getLifetime() const override { return dict_lifetime; } + + bool hasHierarchy() const override { return hierarchical_attribute; } + + id_t toParent(const id_t id) const override + { + const auto attr = hierarchical_attribute; + + switch (hierarchical_attribute->type) + { + case attribute_type::uint8: + { + const auto it = attr->uint8_map->find(id); + return it != attr->uint8_map->end() ? it->second : attr->uint8_null_value; + } + case attribute_type::uint16: + { + const auto it = attr->uint16_map->find(id); + return it != attr->uint16_map->end() ? it->second : attr->uint16_null_value; + } + case attribute_type::uint32: + { + const auto it = attr->uint32_map->find(id); + return it != attr->uint32_map->end() ? it->second : attr->uint32_null_value; + } + case attribute_type::uint64: + { + const auto it = attr->uint64_map->find(id); + return it != attr->uint64_map->end() ? it->second : attr->uint64_null_value; + } + case attribute_type::int8: + { + const auto it = attr->int8_map->find(id); + return it != attr->int8_map->end() ? it->second : attr->int8_null_value; + } + case attribute_type::int16: + { + const auto it = attr->int16_map->find(id); + return it != attr->int16_map->end() ? it->second : attr->int16_null_value; + } + case attribute_type::int32: + { + const auto it = attr->int32_map->find(id); + return it != attr->int32_map->end() ? it->second : attr->int32_null_value; + } + case attribute_type::int64: + { + const auto it = attr->int64_map->find(id); + return it != attr->int64_map->end() ? 
it->second : attr->int64_null_value; + } + case attribute_type::float32: + case attribute_type::float64: + case attribute_type::string: + break; + }; + + throw Exception{ + "Hierarchical attribute has non-integer type " + toString(hierarchical_attribute->type), + ErrorCodes::TYPE_MISMATCH + }; + } + +#define DECLARE_SAFE_GETTER(TYPE, NAME, LC_TYPE) \ + TYPE get##NAME(const std::string & attribute_name, const id_t id) const override\ + {\ + const auto idx = getAttributeIndex(attribute_name);\ + const auto & attribute = attributes[idx];\ + if (attribute.type != attribute_type::LC_TYPE)\ + throw Exception{\ + "Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\ + ErrorCodes::TYPE_MISMATCH\ + };\ + const auto it = attribute.LC_TYPE##_map->find(id);\ + if (it != attribute.LC_TYPE##_map->end())\ + return it->second;\ + return attribute.LC_TYPE##_null_value;\ + } + DECLARE_SAFE_GETTER(UInt8, UInt8, uint8) + DECLARE_SAFE_GETTER(UInt16, UInt16, uint16) + DECLARE_SAFE_GETTER(UInt32, UInt32, uint32) + DECLARE_SAFE_GETTER(UInt64, UInt64, uint64) + DECLARE_SAFE_GETTER(Int8, Int8, int8) + DECLARE_SAFE_GETTER(Int16, Int16, int16) + DECLARE_SAFE_GETTER(Int32, Int32, int32) + DECLARE_SAFE_GETTER(Int64, Int64, int64) + DECLARE_SAFE_GETTER(Float32, Float32, float32) + DECLARE_SAFE_GETTER(Float64, Float64, float64) + DECLARE_SAFE_GETTER(StringRef, String, string) +#undef DECLARE_SAFE_GETTER + + std::size_t getAttributeIndex(const std::string & attribute_name) const override + { + const auto it = attribute_index_by_name.find(attribute_name); + if (it == std::end(attribute_index_by_name)) + throw Exception{ + "No such attribute '" + attribute_name + "'", + ErrorCodes::BAD_ARGUMENTS + }; + + return it->second; + } + +#define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\ + bool is##NAME(const std::size_t attribute_idx) const override\ + {\ + return attributes[attribute_idx].type == attribute_type::LC_NAME;\ + } + DECLARE_TYPE_CHECKER(UInt8, uint8) + 
DECLARE_TYPE_CHECKER(UInt16, uint16) + DECLARE_TYPE_CHECKER(UInt32, uint32) + DECLARE_TYPE_CHECKER(UInt64, uint64) + DECLARE_TYPE_CHECKER(Int8, int8) + DECLARE_TYPE_CHECKER(Int16, int16) + DECLARE_TYPE_CHECKER(Int32, int32) + DECLARE_TYPE_CHECKER(Int64, int64) + DECLARE_TYPE_CHECKER(Float32, float32) + DECLARE_TYPE_CHECKER(Float64, float64) + DECLARE_TYPE_CHECKER(String, string) +#undef DECLARE_TYPE_CHECKER + +#define DECLARE_UNSAFE_GETTER(TYPE, NAME, LC_NAME)\ + TYPE get##NAME##Unsafe(const std::size_t attribute_idx, const id_t id) const override\ + {\ + const auto & attribute = attributes[attribute_idx];\ + const auto it = attribute.LC_NAME##_map->find(id);\ + if (it != attribute.LC_NAME##_map->end())\ + return it->second;\ + return attribute.LC_NAME##_null_value;\ + } + DECLARE_UNSAFE_GETTER(UInt8, UInt8, uint8) + DECLARE_UNSAFE_GETTER(UInt16, UInt16, uint16) + DECLARE_UNSAFE_GETTER(UInt32, UInt32, uint32) + DECLARE_UNSAFE_GETTER(UInt64, UInt64, uint64) + DECLARE_UNSAFE_GETTER(Int8, Int8, int8) + DECLARE_UNSAFE_GETTER(Int16, Int16, int16) + DECLARE_UNSAFE_GETTER(Int32, Int32, int32) + DECLARE_UNSAFE_GETTER(Int64, Int64, int64) + DECLARE_UNSAFE_GETTER(Float32, Float32, float32) + DECLARE_UNSAFE_GETTER(Float64, Float64, float64) + DECLARE_UNSAFE_GETTER(StringRef, String, string) +#undef DECLARE_UNSAFE_GETTER + +private: + struct attribute_t + { + attribute_type type; + UInt8 uint8_null_value; + UInt16 uint16_null_value; + UInt32 uint32_null_value; + UInt64 uint64_null_value; + Int8 int8_null_value; + Int16 int16_null_value; + Int32 int32_null_value; + Int64 int64_null_value; + Float32 float32_null_value; + Float64 float64_null_value; + String string_null_value; + std::unique_ptr> uint8_map; + std::unique_ptr> uint16_map; + std::unique_ptr> uint32_map; + std::unique_ptr> uint64_map; + std::unique_ptr> int8_map; + std::unique_ptr> int16_map; + std::unique_ptr> int32_map; + std::unique_ptr> int64_map; + std::unique_ptr> float32_map; + std::unique_ptr> float64_map; + 
std::unique_ptr string_arena; + std::unique_ptr> string_map; + }; + + void createAttributes() + { + const auto size = dict_struct.attributes.size(); + attributes.reserve(size); + for (const auto & attribute : dict_struct.attributes) + { + attribute_index_by_name.emplace(attribute.name, attributes.size()); + attributes.push_back(std::move(createAttributeWithType(getAttributeTypeByName(attribute.type), + attribute.null_value))); + + if (attribute.hierarchical) + hierarchical_attribute = &attributes.back(); + } + } + + void loadData() + { + auto stream = source_ptr->loadAll(); + + while (const auto block = stream->read()) + { + const auto & id_column = *block.getByPosition(0).column; + + for (const auto attribute_idx : ext::range(0, attributes.size())) + { + const auto & attribute_column = *block.getByPosition(attribute_idx + 1).column; + auto & attribute = attributes[attribute_idx]; + + for (const auto row_idx : ext::range(0, id_column.size())) + setAttributeValue(attribute, id_column[row_idx].get(), attribute_column[row_idx]); + } + } + } + + attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value) + { + attribute_t attr{type}; + + switch (type) + { + case attribute_type::uint8: + attr.uint8_null_value = DB::parse(null_value); + attr.uint8_map.reset(new HashMap); + break; + case attribute_type::uint16: + attr.uint16_null_value = DB::parse(null_value); + attr.uint16_map.reset(new HashMap); + break; + case attribute_type::uint32: + attr.uint32_null_value = DB::parse(null_value); + attr.uint32_map.reset(new HashMap); + break; + case attribute_type::uint64: + attr.uint64_null_value = DB::parse(null_value); + attr.uint64_map.reset(new HashMap); + break; + case attribute_type::int8: + attr.int8_null_value = DB::parse(null_value); + attr.int8_map.reset(new HashMap); + break; + case attribute_type::int16: + attr.int16_null_value = DB::parse(null_value); + attr.int16_map.reset(new HashMap); + break; + case attribute_type::int32: + 
attr.int32_null_value = DB::parse(null_value); + attr.int32_map.reset(new HashMap); + break; + case attribute_type::int64: + attr.int64_null_value = DB::parse(null_value); + attr.int64_map.reset(new HashMap); + break; + case attribute_type::float32: + attr.float32_null_value = DB::parse(null_value); + attr.float32_map.reset(new HashMap); + break; + case attribute_type::float64: + attr.float64_null_value = DB::parse(null_value); + attr.float64_map.reset(new HashMap); + break; + case attribute_type::string: + attr.string_null_value = null_value; + attr.string_arena.reset(new Arena); + attr.string_map.reset(new HashMap); + break; + } + + return attr; + } + + void setAttributeValue(attribute_t & attribute, const id_t id, const Field & value) + { + switch (attribute.type) + { + case attribute_type::uint8: + { + attribute.uint8_map->insert({ id, value.get() }); + break; + } + case attribute_type::uint16: + { + attribute.uint16_map->insert({ id, value.get() }); + break; + } + case attribute_type::uint32: + { + attribute.uint32_map->insert({ id, value.get() }); + break; + } + case attribute_type::uint64: + { + attribute.uint64_map->insert({ id, value.get() }); + break; + } + case attribute_type::int8: + { + attribute.int8_map->insert({ id, value.get() }); + break; + } + case attribute_type::int16: + { + attribute.int16_map->insert({ id, value.get() }); + break; + } + case attribute_type::int32: + { + attribute.int32_map->insert({ id, value.get() }); + break; + } + case attribute_type::int64: + { + attribute.int64_map->insert({ id, value.get() }); + break; + } + case attribute_type::float32: + { + attribute.float32_map->insert({ id, value.get() }); + break; + } + case attribute_type::float64: + { + attribute.float64_map->insert({ id, value.get() }); + break; + } + case attribute_type::string: + { + const auto & string = value.get(); + const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size()); + attribute.string_map->insert({ id, 
StringRef{string_in_arena, string.size()} }); + break; + } + }; + } + + const std::string name; + const DictionaryStructure dict_struct; + const DictionarySourcePtr source_ptr; + const DictionaryLifetime dict_lifetime; + + std::map attribute_index_by_name; + std::vector attributes; + const attribute_t * hierarchical_attribute = nullptr; + +}; + +} diff --git a/dbms/include/DB/Dictionaries/IDictionary.h b/dbms/include/DB/Dictionaries/IDictionary.h new file mode 100644 index 00000000000..088c9901912 --- /dev/null +++ b/dbms/include/DB/Dictionaries/IDictionary.h @@ -0,0 +1,95 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +class IDictionarySource; + +class IDictionary; +using DictionaryPtr = std::unique_ptr; + +class DictionaryLifetime; + +class IDictionary +{ +public: + using id_t = std::uint64_t; + + virtual std::string getName() const = 0; + + virtual std::string getTypeName() const = 0; + + virtual bool isCached() const = 0; + virtual void reload() {} + virtual DictionaryPtr clone() const = 0; + + virtual const IDictionarySource * const getSource() const = 0; + + virtual const DictionaryLifetime & getLifetime() const = 0; + + virtual bool hasHierarchy() const = 0; + + /// do not call unless you ensure that hasHierarchy() returns true + virtual id_t toParent(id_t id) const = 0; + + bool in(id_t child_id, const id_t ancestor_id) const + { + while (child_id != 0 && child_id != ancestor_id) + child_id = toParent(child_id); + + return child_id != 0; + } + + /// safe and slow functions, perform map lookup and type checks + virtual UInt8 getUInt8(const std::string & attribute_name, id_t id) const = 0; + virtual UInt16 getUInt16(const std::string & attribute_name, id_t id) const = 0; + virtual UInt32 getUInt32(const std::string & attribute_name, id_t id) const = 0; + virtual UInt64 getUInt64(const std::string & attribute_name, id_t id) const = 0; + virtual Int8 getInt8(const std::string & attribute_name, id_t id) const = 0; + virtual Int16 
getInt16(const std::string & attribute_name, id_t id) const = 0; + virtual Int32 getInt32(const std::string & attribute_name, id_t id) const = 0; + virtual Int64 getInt64(const std::string & attribute_name, id_t id) const = 0; + virtual Float32 getFloat32(const std::string & attribute_name, id_t id) const = 0; + virtual Float64 getFloat64(const std::string & attribute_name, id_t id) const = 0; + virtual StringRef getString(const std::string & attribute_name, id_t id) const = 0; + + /// unsafe functions for maximum performance, you are on your own ensuring type-safety + + /// returns persistent attribute index for usage with following functions + virtual std::size_t getAttributeIndex(const std::string & attribute_name) const = 0; + + /// type-checking functions + virtual bool isUInt8(std::size_t attribute_idx) const = 0; + virtual bool isUInt16(std::size_t attribute_idx) const = 0; + virtual bool isUInt32(std::size_t attribute_idx) const = 0; + virtual bool isUInt64(std::size_t attribute_idx) const = 0; + virtual bool isInt8(std::size_t attribute_idx) const = 0; + virtual bool isInt16(std::size_t attribute_idx) const = 0; + virtual bool isInt32(std::size_t attribute_idx) const = 0; + virtual bool isInt64(std::size_t attribute_idx) const = 0; + virtual bool isFloat32(std::size_t attribute_idx) const = 0; + virtual bool isFloat64(std::size_t attribute_idx) const = 0; + virtual bool isString(std::size_t attribute_idx) const = 0; + + /// plain load from target container without any checks + virtual UInt8 getUInt8Unsafe(std::size_t attribute_idx, id_t id) const = 0; + virtual UInt16 getUInt16Unsafe(std::size_t attribute_idx, id_t id) const = 0; + virtual UInt32 getUInt32Unsafe(std::size_t attribute_idx, id_t id) const = 0; + virtual UInt64 getUInt64Unsafe(std::size_t attribute_idx, id_t id) const = 0; + virtual Int8 getInt8Unsafe(std::size_t attribute_idx, id_t id) const = 0; + virtual Int16 getInt16Unsafe(std::size_t attribute_idx, id_t id) const = 0; + virtual Int32 
getInt32Unsafe(std::size_t attribute_idx, id_t id) const = 0; + virtual Int64 getInt64Unsafe(std::size_t attribute_idx, id_t id) const = 0; + virtual Float32 getFloat32Unsafe(std::size_t attribute_idx, id_t id) const = 0; + virtual Float64 getFloat64Unsafe(std::size_t attribute_idx, id_t id) const = 0; + virtual StringRef getStringUnsafe(std::size_t attribute_idx, id_t id) const = 0; + + virtual ~IDictionary() = default; +}; + +} diff --git a/dbms/include/DB/Dictionaries/IDictionarySource.h b/dbms/include/DB/Dictionaries/IDictionarySource.h new file mode 100644 index 00000000000..50404cc682c --- /dev/null +++ b/dbms/include/DB/Dictionaries/IDictionarySource.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include + +namespace DB +{ + +class IDictionarySource; +using DictionarySourcePtr = std::unique_ptr; + +class IDictionarySource +{ +public: + virtual BlockInputStreamPtr loadAll() = 0; + virtual BlockInputStreamPtr loadId(const std::uint64_t id) = 0; + virtual BlockInputStreamPtr loadIds(const std::vector ids) = 0; + virtual bool isModified() const = 0; + + virtual DictionarySourcePtr clone() const = 0; + + virtual ~IDictionarySource() = default; +}; + +} diff --git a/dbms/include/DB/Dictionaries/MysqlBlockInputStream.h b/dbms/include/DB/Dictionaries/MysqlBlockInputStream.h new file mode 100644 index 00000000000..120a02ac47f --- /dev/null +++ b/dbms/include/DB/Dictionaries/MysqlBlockInputStream.h @@ -0,0 +1,114 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class MysqlBlockInputStream final : public IProfilingBlockInputStream +{ +public: + MysqlBlockInputStream(mysqlxx::Query query, const Block & sample_block, const std::size_t max_block_size) + : query{std::move(query)}, result{this->query.use()} /* `this->` is required: a bare `query` here names the constructor parameter, which has just been moved from and is destroyed when the constructor returns */, sample_block{sample_block}, max_block_size{max_block_size} + { + types.reserve(sample_block.columns()); + + for (const auto idx : ext::range(0, sample_block.columns())) + { + const auto type = 
sample_block.getByPosition(idx).type.get(); + if (typeid_cast(type)) + types.push_back(attribute_type::uint8); + else if (typeid_cast(type)) + types.push_back(attribute_type::uint16); + else if (typeid_cast(type)) + types.push_back(attribute_type::uint32); + else if (typeid_cast(type)) + types.push_back(attribute_type::uint64); + else if (typeid_cast(type)) + types.push_back(attribute_type::int8); + else if (typeid_cast(type)) + types.push_back(attribute_type::int16); + else if (typeid_cast(type)) + types.push_back(attribute_type::int32); + else if (typeid_cast(type)) + types.push_back(attribute_type::int64); + else if (typeid_cast(type)) + types.push_back(attribute_type::float32); + else if (typeid_cast(type)) + types.push_back(attribute_type::float64); + else if (typeid_cast(type)) + types.push_back(attribute_type::string); + else + throw Exception{ + "Unsupported type " + type->getName(), + ErrorCodes::UNKNOWN_TYPE + }; + } + } + + String getName() const override { return "MysqlBlockInputStream"; } + + String getID() const override + { + return "Mysql(" + query.str() + ")"; + } + +private: + Block readImpl() override + { + auto block = sample_block.cloneEmpty(); + + if (block.columns() != result.getNumFields()) + throw Exception{ + "mysqlxx::UserQueryResult contains " + toString(result.getNumFields()) + " columns while " + + toString(block.columns()) + " expected", + ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH + }; + + std::size_t rows = 0; + while (auto row = result.fetch()) + { + for (const auto idx : ext::range(0, row.size())) + insertValue(block.getByPosition(idx).column, row[idx], types[idx]); + + ++rows; + if (rows == max_block_size) + break; + } + + return rows == 0 ? 
Block{} : block; + }; + + static void insertValue(ColumnPtr & column, const mysqlxx::Value & value, const attribute_type type) + { + switch (type) + { + case attribute_type::uint8: column->insert(static_cast(value)); break; + case attribute_type::uint16: column->insert(static_cast(value)); break; + case attribute_type::uint32: column->insert(static_cast(value)); break; + case attribute_type::uint64: column->insert(static_cast(value)); break; + case attribute_type::int8: column->insert(static_cast(value)); break; + case attribute_type::int16: column->insert(static_cast(value)); break; + case attribute_type::int32: column->insert(static_cast(value)); break; + case attribute_type::int64: column->insert(static_cast(value)); break; + case attribute_type::float32: column->insert(static_cast(value)); break; + case attribute_type::float64: column->insert(static_cast(value)); break; + case attribute_type::string: column->insert(value.getString()); break; + } + } + + mysqlxx::Query query; + mysqlxx::UseQueryResult result; + Block sample_block; + std::size_t max_block_size; + std::vector types; +}; + +} diff --git a/dbms/include/DB/Dictionaries/MysqlDictionarySource.h b/dbms/include/DB/Dictionaries/MysqlDictionarySource.h new file mode 100644 index 00000000000..32ec45feea9 --- /dev/null +++ b/dbms/include/DB/Dictionaries/MysqlDictionarySource.h @@ -0,0 +1,120 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class MysqlDictionarySource final : public IDictionarySource +{ + static const auto max_block_size = 8192; + +public: + MysqlDictionarySource(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, + Block & sample_block, const Context & context) + : host{config.getString(config_prefix + "host")}, + port(config.getInt(config_prefix + "port")), + user{config.getString(config_prefix + "user", "")}, + password{config.getString(config_prefix + "password", "")}, + db{config.getString(config_prefix + 
"db", "")}, + table{config.getString(config_prefix + "table")}, + sample_block{sample_block}, context(context), + pool{db, host, user, password, port}, + load_all_query{composeLoadAllQuery(sample_block, table)}, + last_modification{getLastModification()} + {} + + MysqlDictionarySource(const MysqlDictionarySource & other) + : host{other.host}, port{other.port}, user{other.user}, password{other.password}, + db{other.db}, table{other.table}, /* copy the table name itself; getLastModification() builds its query from `table` */ + sample_block{other.sample_block}, context(other.context), + pool{db, host, user, password, port}, + load_all_query{other.load_all_query}, last_modification{other.last_modification} + {} + + BlockInputStreamPtr loadAll() override + { + return new MysqlBlockInputStream{pool.Get()->query(load_all_query), sample_block, max_block_size}; + } + + BlockInputStreamPtr loadId(const std::uint64_t id) override + { + throw Exception{ + "Method unsupported", + ErrorCodes::NOT_IMPLEMENTED + }; + } + + BlockInputStreamPtr loadIds(const std::vector ids) override + { + throw Exception{ + "Method unsupported", + ErrorCodes::NOT_IMPLEMENTED + }; + } + + bool isModified() const override { return getLastModification() > last_modification; } + + DictionarySourcePtr clone() const override { return ext::make_unique(*this); } + +private: + mysqlxx::DateTime getLastModification() const + { + const auto Create_time_idx = 11; + const auto Update_time_idx = 12; + + try + { + auto connection = pool.Get(); + auto query = connection->query("SHOW TABLE STATUS LIKE '%" + table + "%';"); + auto result = query.use(); + auto row = result.fetch(); + const auto & update_time = row[Update_time_idx]; + return !update_time.isNull() ? update_time.getDateTime() : row[Create_time_idx].getDateTime(); + } + catch (...) 
+ { + tryLogCurrentException("MysqlDictionarySource"); + } + + return {}; + } + + static std::string composeLoadAllQuery(const Block & block, const std::string & table) + { + std::string query{"SELECT "}; + + auto first = true; + for (const auto idx : ext::range(0, block.columns())) + { + if (!first) + query += ", "; + + query += block.getByPosition(idx).name; + first = false; + } + + query += " FROM " + table + ';'; + + return query; + } + + const std::string host; + const UInt16 port; + const std::string user; + const std::string password; + const std::string db; + const std::string table; + Block sample_block; + const Context & context; + mutable mysqlxx::Pool pool; + const std::string load_all_query; + mysqlxx::DateTime last_modification; +}; + +} diff --git a/dbms/include/DB/Dictionaries/OwningBufferBlockInputStream.h b/dbms/include/DB/Dictionaries/OwningBufferBlockInputStream.h new file mode 100644 index 00000000000..b50d4c322a7 --- /dev/null +++ b/dbms/include/DB/Dictionaries/OwningBufferBlockInputStream.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +class OwningBufferBlockInputStream : public IProfilingBlockInputStream +{ +public: + OwningBufferBlockInputStream(const BlockInputStreamPtr & stream, std::unique_ptr buffer) + : stream{stream}, buffer{std::move(buffer)} + { + children.push_back(stream); + } + +private: + Block readImpl() override { return stream->read(); } + + String getName() const override { return "OwningBufferBlockInputStream"; } + + String getID() const override { + return "OwningBuffer(" + stream->getID() + ")"; + } + + BlockInputStreamPtr stream; + std::unique_ptr buffer; +}; + +} diff --git a/dbms/include/DB/Dictionaries/config_ptr_t.h b/dbms/include/DB/Dictionaries/config_ptr_t.h new file mode 100644 index 00000000000..f5f29792cc7 --- /dev/null +++ b/dbms/include/DB/Dictionaries/config_ptr_t.h @@ -0,0 +1,15 @@ +#pragma once + +#include + +namespace DB +{ + +template struct release +{ + void 
operator()(const T * const ptr) { ptr->release(); } +}; + +template using config_ptr_t = std::unique_ptr>; + +} diff --git a/dbms/include/DB/Functions/FunctionsDictionaries.h b/dbms/include/DB/Functions/FunctionsDictionaries.h index a0e0908d531..c5e2a95989f 100644 --- a/dbms/include/DB/Functions/FunctionsDictionaries.h +++ b/dbms/include/DB/Functions/FunctionsDictionaries.h @@ -11,6 +11,9 @@ #include #include +#include +#include +#include namespace DB @@ -715,4 +718,760 @@ public: } }; + +class FunctionDictGetString : public IFunction +{ +public: + static constexpr auto name = "dictGetString"; + + static IFunction * create(const Context & context) + { + return new FunctionDictGetString{context.getDictionaries()}; + }; + + FunctionDictGetString(const Dictionaries & dictionaries) : dictionaries(dictionaries) {} + + String getName() const override { return name; } + +private: + DataTypePtr getReturnType(const DataTypes & arguments) const override + { + if (arguments.size() != 3) + throw Exception{ + "Number of arguments for function " + getName() + " doesn't match: passed " + + toString(arguments.size()) + ", should be 3.", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH + }; + + if (!typeid_cast(arguments[0].get())) + { + throw Exception{ + "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + } + + if (!typeid_cast(arguments[1].get())) + { + throw Exception{ + "Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + } + + const auto id_arg = arguments[2].get(); + if (!typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg)) + { + throw Exception{ + "Illegal type " + arguments[2]->getName() + " of argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; 
} + + return new DataTypeString; + } + + void execute(Block & block, const ColumnNumbers & arguments, const size_t result) override /* same signature the other dictionary functions in this file mark `override` */ + { + const auto dict_name_col = typeid_cast *>(block.getByPosition(arguments[0]).column.get()); + if (!dict_name_col) + throw Exception{ + "First argument of function " + getName() + " must be a constant string", + ErrorCodes::ILLEGAL_COLUMN + }; + + auto dict = dictionaries.getExternalDictionary(dict_name_col->getData()); + const auto dict_ptr = dict.get(); + + if (!executeDispatch(block, arguments, result, dict_ptr) && + !executeDispatch(block, arguments, result, dict_ptr)) + throw Exception{ + "Unsupported dictionary type " + dict_ptr->getTypeName(), + ErrorCodes::UNKNOWN_TYPE + }; + } + + template + bool executeDispatch(Block & block, const ColumnNumbers & arguments, const size_t result, + const IDictionary * const dictionary) + { + const auto dict = typeid_cast(dictionary); + if (!dict) + return false; + + const auto attr_name_col = typeid_cast *>(block.getByPosition(arguments[1]).column.get()); + if (!attr_name_col) + throw Exception{ + "Second argument of function " + getName() + " must be a constant string", + ErrorCodes::ILLEGAL_COLUMN + }; + + const auto & attr_name = attr_name_col->getData(); + + const auto id_col = block.getByPosition(arguments[2]).column.get(); + if (!execute(block, result, dict, attr_name, id_col) && + !execute(block, result, dict, attr_name, id_col) && + !execute(block, result, dict, attr_name, id_col) && + !execute(block, result, dict, attr_name, id_col) && + !execute(block, result, dict, attr_name, id_col) && + !execute(block, result, dict, attr_name, id_col) && + !execute(block, result, dict, attr_name, id_col) && + !execute(block, result, dict, attr_name, id_col)) + { + throw Exception{ + "Third argument of function " + getName() + " must be integral", + ErrorCodes::ILLEGAL_COLUMN + }; + } + + return true; + } + + template + bool execute(Block & block, const size_t result, const DictionaryType * const 
dictionary, + const std::string & attr_name, const IColumn * const id_col_untyped) + { + if (const auto id_col = typeid_cast *>(id_col_untyped)) + { + const auto out = new ColumnString; + block.getByPosition(result).column = out; + + const auto attribute_idx = dictionary->getAttributeIndex(attr_name); + if (!dictionary->isString(attribute_idx)) + throw Exception{ + "Type mismatch: attribute " + attr_name + " has type different from String", + ErrorCodes::TYPE_MISMATCH + }; + + for (const auto & id : id_col->getData()) + { + const auto string_ref = dictionary->getStringUnsafe(attribute_idx, id); + out->insertData(string_ref.data, string_ref.size); + } + + return true; + } + else if (const auto id_col = typeid_cast *>(id_col_untyped)) + { + block.getByPosition(result).column = new ColumnConst{ + id_col->size(), + dictionary->getString(attr_name, id_col->getData()).toString() + }; + + return true; + }; + + return false; + } + + const Dictionaries & dictionaries; +}; + + +template struct DictGetTraits; +#define DECLARE_DICT_GET_TRAITS(TYPE, DATA_TYPE) \ +template <> struct DictGetTraits\ +{\ + static TYPE get(const IDictionary * const dict, const std::string & name, const IDictionary::id_t id)\ + {\ + return dict->get##TYPE(name, id);\ + }\ + static bool is(const IDictionary * const dict, const std::size_t idx) { return dict->is##TYPE(idx); } \ + static TYPE get(const IDictionary * const dict, const std::size_t idx, const IDictionary::id_t id)\ + {\ + return dict->get##TYPE##Unsafe(idx, id);\ + }\ +}; +DECLARE_DICT_GET_TRAITS(UInt8, DataTypeUInt8) +DECLARE_DICT_GET_TRAITS(UInt16, DataTypeUInt16) +DECLARE_DICT_GET_TRAITS(UInt32, DataTypeUInt32) +DECLARE_DICT_GET_TRAITS(UInt64, DataTypeUInt64) +DECLARE_DICT_GET_TRAITS(Int8, DataTypeInt8) +DECLARE_DICT_GET_TRAITS(Int16, DataTypeInt16) +DECLARE_DICT_GET_TRAITS(Int32, DataTypeInt32) +DECLARE_DICT_GET_TRAITS(Int64, DataTypeInt64) +DECLARE_DICT_GET_TRAITS(Float32, DataTypeFloat32) +DECLARE_DICT_GET_TRAITS(Float64, 
DataTypeFloat64) +#undef DECLARE_DICT_GET_TRAITS + +template +class FunctionDictGet final : public IFunction +{ + using Type = typename DataType::FieldType; + +public: + static const std::string name; + + static IFunction * create(const Context & context) + { + return new FunctionDictGet{context.getDictionaries()}; + }; + + FunctionDictGet(const Dictionaries & dictionaries) : dictionaries(dictionaries) {} + + String getName() const override { return name; } + +private: + DataTypePtr getReturnType(const DataTypes & arguments) const override + { + if (arguments.size() != 3) + throw Exception{ + "Number of arguments for function " + getName() + " doesn't match: passed " + + toString(arguments.size()) + ", should be 3.", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH + }; + + if (!typeid_cast(arguments[0].get())) + { + throw Exception{ + "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + } + + if (!typeid_cast(arguments[1].get())) + { + throw Exception{ + "Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + } + + const auto id_arg = arguments[2].get(); + if (!typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg)) + { + throw Exception{ + "Illegal type " + id_arg->getName() + " of argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + } + + return new DataType; + } + + void execute(Block & block, const ColumnNumbers & arguments, const size_t result) override + { + const auto dict_name_col = typeid_cast *>(block.getByPosition(arguments[0]).column.get()); + if (!dict_name_col) + throw Exception{ + "First argument of function " + getName() + " must be a constant string", + ErrorCodes::ILLEGAL_COLUMN + }; + + auto dict = 
dictionaries.getExternalDictionary(dict_name_col->getData()); + const auto dict_ptr = dict.get(); + + if (!executeDispatch(block, arguments, result, dict_ptr) && + !executeDispatch(block, arguments, result, dict_ptr)) + throw Exception{ + "Unsupported dictionary type " + dict_ptr->getTypeName(), + ErrorCodes::UNKNOWN_TYPE + }; + } + + template + bool executeDispatch(Block & block, const ColumnNumbers & arguments, const size_t result, + const IDictionary * const dictionary) + { + const auto dict = typeid_cast(dictionary); + if (!dict) + return false; + + const auto attr_name_col = typeid_cast *>(block.getByPosition(arguments[1]).column.get()); + if (!attr_name_col) + throw Exception{ + "Second argument of function " + getName() + " must be a constant string", + ErrorCodes::ILLEGAL_COLUMN + }; + + const auto & attr_name = attr_name_col->getData(); + + const auto id_col = block.getByPosition(arguments[2]).column.get(); + if (!execute(block, result, dict, attr_name, id_col) && + !execute(block, result, dict, attr_name, id_col) && + !execute(block, result, dict, attr_name, id_col) && + !execute(block, result, dict, attr_name, id_col) && + !execute(block, result, dict, attr_name, id_col) && + !execute(block, result, dict, attr_name, id_col) && + !execute(block, result, dict, attr_name, id_col) && + !execute(block, result, dict, attr_name, id_col)) + { + throw Exception{ + "Third argument of function " + getName() + " must be integral", + ErrorCodes::ILLEGAL_COLUMN + }; + } + + return true; + } + + template + bool execute(Block & block, const size_t result, const DictionaryType * const dictionary, + const std::string & attr_name, const IColumn * const id_col_untyped) + { + if (const auto id_col = typeid_cast *>(id_col_untyped)) + { + const auto attribute_idx = dictionary->getAttributeIndex(attr_name); + if (!DictGetTraits::is(dictionary, attribute_idx)) + throw Exception{ + "Type mismatch: attribute " + attr_name + " has type different from " + TypeName<Type>::get(), /* name the type this instantiation expects instead of a fixed "UInt64" */ + 
ErrorCodes::TYPE_MISMATCH + }; + + const auto out = new ColumnVector; + block.getByPosition(result).column = out; + + const auto & ids = id_col->getData(); + auto & data = out->getData(); + const auto size = ids.size(); + data.resize(size); + + for (const auto idx : ext::range(0, size)) + data[idx] = DictGetTraits::get(dictionary, attribute_idx, ids[idx]); + + return true; + } + else if (const auto id_col = typeid_cast *>(id_col_untyped)) + { + block.getByPosition(result).column = new ColumnConst{ + id_col->size(), + DictGetTraits::get(dictionary, attr_name, id_col->getData()) + }; + + return true; + }; + + return false; + } + + const Dictionaries & dictionaries; +}; + +template +const std::string FunctionDictGet::name = "dictGet" + TypeName::get(); + + +using FunctionDictGetUInt8 = FunctionDictGet; +using FunctionDictGetUInt16 = FunctionDictGet; +using FunctionDictGetUInt32 = FunctionDictGet; +using FunctionDictGetUInt64 = FunctionDictGet; +using FunctionDictGetInt8 = FunctionDictGet; +using FunctionDictGetInt16 = FunctionDictGet; +using FunctionDictGetInt32 = FunctionDictGet; +using FunctionDictGetInt64 = FunctionDictGet; +using FunctionDictGetFloat32 = FunctionDictGet; +using FunctionDictGetFloat64 = FunctionDictGet; + + +class FunctionDictGetHierarchy final : public IFunction +{ +public: + static constexpr auto name = "dictGetHierarchy"; + + static IFunction * create(const Context & context) + { + return new FunctionDictGetHierarchy{context.getDictionaries()}; + }; + + FunctionDictGetHierarchy(const Dictionaries & dictionaries) : dictionaries(dictionaries) {} + + String getName() const override { return name; } + +private: + DataTypePtr getReturnType(const DataTypes & arguments) const override + { + if (arguments.size() != 2) + throw Exception{ + "Number of arguments for function " + getName() + " doesn't match: passed " + + toString(arguments.size()) + ", should be 2.", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH + }; + + if 
(!typeid_cast(arguments[0].get())) + { + throw Exception{ + "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + } + + const auto id_arg = arguments[1].get(); + if (!typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg) && + !typeid_cast(id_arg)) + { + throw Exception{ + "Illegal type " + id_arg->getName() + " of argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + } + + return new DataTypeArray{new DataTypeUInt64}; + }; + + void execute(Block & block, const ColumnNumbers & arguments, const size_t result) override + { + const auto dict_name_col = typeid_cast *>(block.getByPosition(arguments[0]).column.get()); + if (!dict_name_col) + throw Exception{ + "First argument of function " + getName() + " must be a constant string", + ErrorCodes::ILLEGAL_COLUMN + }; + + auto dict = dictionaries.getExternalDictionary(dict_name_col->getData()); + const auto dict_ptr = dict.get(); + + if (!dict->hasHierarchy()) + throw Exception{ + "Dictionary does not have a hierarchy", + ErrorCodes::UNSUPPORTED_METHOD + }; + + if (!executeDispatch(block, arguments, result, dict_ptr) && + !executeDispatch(block, arguments, result, dict_ptr)) + throw Exception{ + "Unsupported dictionary type " + dict_ptr->getTypeName(), + ErrorCodes::UNKNOWN_TYPE + }; + } + + template + bool executeDispatch(Block & block, const ColumnNumbers & arguments, const size_t result, + const IDictionary * const dictionary) + { + const auto dict = typeid_cast(dictionary); + if (!dict) + return false; + + const auto id_col = block.getByPosition(arguments[1]).column.get(); + if (!execute(block, result, dict, id_col) && + !execute(block, result, dict, id_col) && + !execute(block, result, dict, id_col) && + !execute(block, result, dict, id_col) && + !execute(block, result, dict, id_col) && + 
!execute(block, result, dict, id_col) && + !execute(block, result, dict, id_col) && + !execute(block, result, dict, id_col)) + { + throw Exception{ + "Second argument of function " + getName() + " must be integral", + ErrorCodes::ILLEGAL_COLUMN + }; + } + + return true; + } + + template + bool execute(Block & block, const size_t result, const DictionaryType * const dictionary, + const IColumn * const id_col_untyped) + { + if (const auto id_col = typeid_cast *>(id_col_untyped)) + { + const auto backend = new ColumnVector; + const auto array = new ColumnArray{backend}; + block.getByPosition(result).column = array; + + const auto & in = id_col->getData(); + const auto size = in.size(); + auto & out = backend->getData(); + auto & offsets = array->getOffsets(); + offsets.resize(size); + out.reserve(size * 4); + + for (const auto idx : ext::range(0, size)) + { + IDictionary::id_t cur = in[idx]; + while (cur) + { + out.push_back(cur); + cur = dictionary->toParent(cur); + } + offsets[idx] = out.size(); + }; + + return true; + } + else if (const auto id_col = typeid_cast *>(id_col_untyped)) + { + Array res; + + IDictionary::id_t cur = id_col->getData(); + while (cur) + { + res.push_back(static_cast::Type>(cur)); + cur = dictionary->toParent(cur); + } + + block.getByPosition(result).column = new ColumnConstArray{ + id_col->size(), + res, + new DataTypeArray{new DataTypeUInt64} + }; + + return true; + }; + + return false; + } + + const Dictionaries & dictionaries; +}; + + +class FunctionDictIsIn final : public IFunction +{ +public: + static constexpr auto name = "dictIsIn"; + + static IFunction * create(const Context & context) + { + return new FunctionDictIsIn{context.getDictionaries()}; + }; + + FunctionDictIsIn(const Dictionaries & dictionaries) : dictionaries(dictionaries) {} + + String getName() const override { return name; } + +private: + DataTypePtr getReturnType(const DataTypes & arguments) const override + { + if (arguments.size() != 3) + throw Exception{ + "Number 
of arguments for function " + getName() + " doesn't match: passed " + + toString(arguments.size()) + ", should be 3.", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH + }; + + if (!typeid_cast(arguments[0].get())) + { + throw Exception{ + "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + } + + const auto child_id_arg = arguments[1].get(); + if (!typeid_cast(child_id_arg) && + !typeid_cast(child_id_arg) && + !typeid_cast(child_id_arg) && + !typeid_cast(child_id_arg) && + !typeid_cast(child_id_arg) && + !typeid_cast(child_id_arg) && + !typeid_cast(child_id_arg) && + !typeid_cast(child_id_arg)) + { + throw Exception{ + "Illegal type " + child_id_arg->getName() + " of argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + } + + const auto ancestor_id_arg = arguments[2].get(); + if (!typeid_cast(ancestor_id_arg) && + !typeid_cast(ancestor_id_arg) && + !typeid_cast(ancestor_id_arg) && + !typeid_cast(ancestor_id_arg) && + !typeid_cast(ancestor_id_arg) && + !typeid_cast(ancestor_id_arg) && + !typeid_cast(ancestor_id_arg) && + !typeid_cast(ancestor_id_arg)) + { + throw Exception{ + "Illegal type " + ancestor_id_arg->getName() + " of argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + } + + return new DataTypeUInt8; + } + + void execute(Block & block, const ColumnNumbers & arguments, const size_t result) override + { + const auto dict_name_col = typeid_cast *>(block.getByPosition(arguments[0]).column.get()); + if (!dict_name_col) + throw Exception{ + "First argument of function " + getName() + " must be a constant string", + ErrorCodes::ILLEGAL_COLUMN + }; + + auto dict = dictionaries.getExternalDictionary(dict_name_col->getData()); + const auto dict_ptr = dict.get(); + + if (!dict->hasHierarchy()) + throw Exception{ + "Dictionary does not have a hierarchy", + ErrorCodes::UNSUPPORTED_METHOD + }; + + if (!executeDispatch(block, arguments, 
result, dict_ptr) && + !executeDispatch(block, arguments, result, dict_ptr)) + throw Exception{ + "Unsupported dictionary type " + dict_ptr->getTypeName(), + ErrorCodes::UNKNOWN_TYPE + }; + } + + template + bool executeDispatch(Block & block, const ColumnNumbers & arguments, const size_t result, + const IDictionary * const dictionary) + { + const auto dict = typeid_cast(dictionary); + if (!dict) + return false; + + const auto child_id_col = block.getByPosition(arguments[1]).column.get(); + const auto ancestor_id_col = block.getByPosition(arguments[2]).column.get(); + if (!execute(block, result, dict, child_id_col, ancestor_id_col) && + !execute(block, result, dict, child_id_col, ancestor_id_col) && + !execute(block, result, dict, child_id_col, ancestor_id_col) && + !execute(block, result, dict, child_id_col, ancestor_id_col) && + !execute(block, result, dict, child_id_col, ancestor_id_col) && + !execute(block, result, dict, child_id_col, ancestor_id_col) && + !execute(block, result, dict, child_id_col, ancestor_id_col) && + !execute(block, result, dict, child_id_col, ancestor_id_col)) + { + throw Exception{ + "Illegal column " + child_id_col->getName() + + " of second argument of function " + getName(), + ErrorCodes::ILLEGAL_COLUMN + }; + } + + return true; + } + + template + bool execute(Block & block, const size_t result, const DictionaryType * const dictionary, + const IColumn * const child_id_col_untyped, const IColumn * const ancestor_id_col_untyped) + { + if (execute>(block, result, dictionary, child_id_col_untyped, ancestor_id_col_untyped) || + execute>(block, result, dictionary, child_id_col_untyped, ancestor_id_col_untyped)) + return true; + + return false; + } + + template + bool execute(Block & block, const size_t result, const DictionaryType * dictionary, + const IColumn * const child_id_col_untyped, const IColumn * const ancestor_id_col_untyped) + { + if (const auto child_id_col = typeid_cast(child_id_col_untyped)) + { + if (execute(block, result, 
dictionary, child_id_col, ancestor_id_col_untyped) || + execute(block, result, dictionary, child_id_col, ancestor_id_col_untyped) || + execute(block, result, dictionary, child_id_col, ancestor_id_col_untyped) || + execute(block, result, dictionary, child_id_col, ancestor_id_col_untyped) || + execute(block, result, dictionary, child_id_col, ancestor_id_col_untyped) || + execute(block, result, dictionary, child_id_col, ancestor_id_col_untyped) || + execute(block, result, dictionary, child_id_col, ancestor_id_col_untyped) || + execute(block, result, dictionary, child_id_col, ancestor_id_col_untyped)) + return true; + else + throw Exception{ + "Illegal column " + ancestor_id_col_untyped->getName() + + " of third argument of function " + getName(), + ErrorCodes::ILLEGAL_COLUMN + }; + } + + return false; + } + + template + bool execute(Block & block, const size_t result, const DictionaryType * const dictionary, + const ColumnVector * const child_id_col, const IColumn * const ancestor_id_col_untyped) + { + if (const auto ancestor_id_col = typeid_cast *>(ancestor_id_col_untyped)) + { + const auto out = new ColumnVector; + block.getByPosition(result).column = out; + + const auto & child_ids = child_id_col->getData(); + const auto & ancestor_ids = ancestor_id_col->getData(); + auto & data = out->getData(); + const auto size = child_id_col->size(); + data.resize(size); + + for (const auto idx : ext::range(0, size)) + data[idx] = dictionary->in(child_ids[idx], ancestor_ids[idx]); + + return true; + } + else if (const auto ancestor_id_col = typeid_cast *>(ancestor_id_col_untyped)) + { + const auto out = new ColumnVector; + block.getByPosition(result).column = out; + + const auto & child_ids = child_id_col->getData(); + const auto ancestor_id = ancestor_id_col->getData(); + auto & data = out->getData(); + const auto size = child_id_col->size(); + data.resize(size); + + for (const auto idx : ext::range(0, size)) + data[idx] = dictionary->in(child_ids[idx], ancestor_id); + + 
return true; + } + + return false; + } + + template + bool execute(Block & block, const size_t result, const DictionaryType * const dictionary, + const ColumnConst * const child_id_col, const IColumn * const ancestor_id_col_untyped) + { + if (const auto ancestor_id_col = typeid_cast *>(ancestor_id_col_untyped)) + { + const auto out = new ColumnVector; + block.getByPosition(result).column = out; + + const auto child_id = child_id_col->getData(); + const auto & ancestor_ids = ancestor_id_col->getData(); + auto & data = out->getData(); + const auto size = child_id_col->size(); + data.resize(size); + + for (const auto idx : ext::range(0, size)) + data[idx] = dictionary->in(child_id, ancestor_ids[idx]); + + return true; + } + else if (const auto ancestor_id_col = typeid_cast *>(ancestor_id_col_untyped)) + { + block.getByPosition(result).column = new ColumnConst{ + child_id_col->size(), + dictionary->in(child_id_col->getData(), ancestor_id_col->getData()) + }; + + return true; + } + + return false; + } + + const Dictionaries & dictionaries; +}; + + } diff --git a/dbms/include/DB/Interpreters/Dictionaries.h b/dbms/include/DB/Interpreters/Dictionaries.h index efa6b3bed57..54862706979 100644 --- a/dbms/include/DB/Interpreters/Dictionaries.h +++ b/dbms/include/DB/Interpreters/Dictionaries.h @@ -1,6 +1,10 @@ #pragma once +#include #include +#include +#include +#include #include @@ -16,6 +20,9 @@ namespace DB using Poco::SharedPtr; +class Context; +class IDictionary; + /// Словари Метрики, которые могут использоваться в функциях. class Dictionaries @@ -24,15 +31,23 @@ private: MultiVersion regions_hierarchies; MultiVersion tech_data_hierarchy; MultiVersion regions_names; + mutable std::mutex external_dictionaries_mutex; + std::unordered_map>> external_dictionaries; + std::unordered_map update_times; + std::mt19937 rnd_engine; + Context & context; /// Периодичность обновления справочников, в секундах. 
int reload_period; std::thread reloading_thread; + std::thread reloading_externals_thread; Poco::Event destroy; Logger * log; + Poco::Timestamp dictionaries_last_modified{0}; + void handleException() const { @@ -106,6 +121,9 @@ private: LOG_INFO(log, "Loaded dictionaries."); } + + void reloadExternals(); + /// Обновляет каждые reload_period секунд. void reloadPeriodically() { @@ -118,20 +136,35 @@ private: } } + void reloadExternalsPeriodically() + { + const auto check_period = 5 * 1000; + while (true) + { + if (destroy.tryWait(check_period)) + return; + + reloadExternals(); + } + } + public: /// Справочники будут обновляться в отдельном потоке, каждые reload_period секунд. - Dictionaries(int reload_period_ = 3600) - : reload_period(reload_period_), + Dictionaries(Context & context, int reload_period_ = 3600) + : context(context), reload_period(reload_period_), log(&Logger::get("Dictionaries")) { reloadImpl(); + reloadExternals(); reloading_thread = std::thread([this] { reloadPeriodically(); }); + reloading_externals_thread = std::thread{&Dictionaries::reloadExternalsPeriodically, this}; } ~Dictionaries() { destroy.set(); reloading_thread.join(); + reloading_externals_thread.join(); } MultiVersion::Version getRegionsHierarchies() const @@ -148,6 +181,19 @@ public: { return regions_names.get(); } + + MultiVersion::Version getExternalDictionary(const std::string & name) const + { + const std::lock_guard lock{external_dictionaries_mutex}; + const auto it = external_dictionaries.find(name); + if (it == std::end(external_dictionaries)) + throw Exception{ + "No such dictionary: " + name, + ErrorCodes::BAD_ARGUMENTS + }; + + return it->second->get(); + } }; } diff --git a/dbms/src/Functions/FunctionsDictionaries.cpp b/dbms/src/Functions/FunctionsDictionaries.cpp index 73960dbf13c..093bb0f532a 100644 --- a/dbms/src/Functions/FunctionsDictionaries.cpp +++ b/dbms/src/Functions/FunctionsDictionaries.cpp @@ -21,6 +21,19 @@ void registerFunctionsDictionaries(FunctionFactory & 
factory) factory.registerFunction(); factory.registerFunction(); factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); } - + } diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 16b710b9d7b..ec39314ba58 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -495,7 +495,11 @@ const Dictionaries & Context::getDictionaries() const Poco::ScopedLock lock(shared->mutex); if (!shared->dictionaries) - shared->dictionaries = new Dictionaries; + { + if (!this->global_context) + throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR); + shared->dictionaries = new Dictionaries{ *this->global_context }; + } return *shared->dictionaries; } diff --git a/dbms/src/Interpreters/Dictionaries.cpp b/dbms/src/Interpreters/Dictionaries.cpp new file mode 100644 index 00000000000..d93d0bd49c4 --- /dev/null +++ b/dbms/src/Interpreters/Dictionaries.cpp @@ -0,0 +1,112 @@ +#include +#include +#include +#include +#include + +namespace DB +{ + +void Dictionaries::reloadExternals() +{ + const auto config_path = Poco::Util::Application::instance().config().getString("dictionaries_config"); + if (config_path.empty()) + return; + + const auto last_modified = Poco::File{config_path}.getLastModified(); + if (last_modified > dictionaries_last_modified) + { + /// definitions of dictionaries may have changed, recreate all of them + dictionaries_last_modified = last_modified; + + const config_ptr_t config{new Poco::Util::XMLConfiguration{config_path}}; + + /// get all dictionaries' definitions + Poco::Util::AbstractConfiguration::Keys keys; + 
config->keys(keys); + + /// for each dictionary defined in xml config + for (const auto & key : keys) + { + try + { + if (0 != strncmp(key.data(), "dictionary", strlen("dictionary"))) + { + LOG_WARNING(log, "unknown node in dictionaries file: '" + key + "', 'dictionary'"); + continue; + } + + const auto & prefix = key + '.'; + + const auto & name = config->getString(prefix + "name"); + if (name.empty()) + { + LOG_WARNING(log, "dictionary name cannot be empty"); + continue; + } + + auto dict_ptr = DictionaryFactory::instance().create(name, *config, prefix, context); + if (!dict_ptr->isCached()) + { + const auto & lifetime = dict_ptr->getLifetime(); + std::uniform_int_distribution distribution{lifetime.min_sec, lifetime.max_sec}; + update_times[name] = std::chrono::system_clock::now() + + std::chrono::seconds{distribution(rnd_engine)}; + } + + auto it = external_dictionaries.find(name); + /// add new dictionary or update an existing version + if (it == std::end(external_dictionaries)) + { + const std::lock_guard lock{external_dictionaries_mutex}; + external_dictionaries.emplace(name, std::make_shared>(dict_ptr.release())); + } + else + it->second->set(dict_ptr.release()); + } + catch (...) 
+ { + handleException(); + } + } + } + else + { + /// periodic update + for (auto & dictionary : external_dictionaries) + { + try + { + auto current = dictionary.second->get(); + /// update only non-cached dictionaries + if (!current->isCached()) + { + auto & update_time = update_times[current->getName()]; + + /// check that timeout has passed + if (std::chrono::system_clock::now() < update_time) + continue; + + /// check source modified + if (current->getSource()->isModified()) + { + /// create new version of dictionary + auto new_version = current->clone(); + dictionary.second->set(new_version.release()); + } + + /// calculate next update time + const auto & lifetime = current->getLifetime(); + std::uniform_int_distribution distribution{lifetime.min_sec, lifetime.max_sec}; + update_time = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; + } + } + catch (...) + { + handleException(); + } + } + } +} + +} diff --git a/dbms/src/Interpreters/DictionaryFactory.cpp b/dbms/src/Interpreters/DictionaryFactory.cpp new file mode 100644 index 00000000000..2843a31d08d --- /dev/null +++ b/dbms/src/Interpreters/DictionaryFactory.cpp @@ -0,0 +1,50 @@ +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +DictionaryPtr DictionaryFactory::create(const std::string & name, Poco::Util::AbstractConfiguration & config, + const std::string & config_prefix, Context & context) const +{ + auto dict_struct = DictionaryStructure::fromConfig(config, config_prefix + "structure"); + + auto source_ptr = DictionarySourceFactory::instance().create( + config, config_prefix + "source.", dict_struct, context); + + const auto dict_lifetime = DictionaryLifetime::fromConfig(config, config_prefix + "lifetime"); + + const auto & layout_prefix = config_prefix + "layout."; + + if (config.has(layout_prefix + "flat")) + { + return ext::make_unique(name, dict_struct, std::move(source_ptr), dict_lifetime); + } + else if 
(config.has(layout_prefix + "hashed")) + { + return ext::make_unique(name, dict_struct, std::move(source_ptr), dict_lifetime); + } + else if (config.has(layout_prefix + "cache")) + { + const auto size = config.getInt(layout_prefix + "cache.size", 0); + if (size == 0) + throw Exception{ + "Dictionary of type 'cache' cannot have size of 0 bytes", + ErrorCodes::TOO_SMALL_BUFFER_SIZE + }; + + throw Exception{ + "Dictionary of type 'cache' is not yet implemented", + ErrorCodes::NOT_IMPLEMENTED + }; + } + + throw Exception{"No dictionary type specified", ErrorCodes::BAD_ARGUMENTS}; +}; + +} diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 234ab353c81..837830fa526 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -238,7 +238,7 @@ UsersConfigReloader::~UsersConfigReloader() quit = true; thread.join(); } - catch(...) + catch (...) { tryLogCurrentException("~UsersConfigReloader"); } diff --git a/dbms/src/Server/Server.h b/dbms/src/Server/Server.h index f29440f4b02..c2847e53464 100644 --- a/dbms/src/Server/Server.h +++ b/dbms/src/Server/Server.h @@ -49,7 +49,7 @@ public: std::unique_ptr olap_converter; protected: - void initialize(Application& self) + void initialize(Application & self) { Daemon::initialize(self); logger().information("starting up"); @@ -61,7 +61,7 @@ protected: Daemon::uninitialize(); } - int main(const std::vector& args); + int main(const std::vector & args); }; }