diff --git a/dbms/include/DB/Client/Connection.h b/dbms/include/DB/Client/Connection.h index 899113da061..2d968532101 100644 --- a/dbms/include/DB/Client/Connection.h +++ b/dbms/include/DB/Client/Connection.h @@ -4,6 +4,8 @@ #include +#include + #include #include #include @@ -33,6 +35,7 @@ typedef std::pair ExternalTableData; /// Вектор пар, описывающих таблицы typedef std::vector ExternalTablesData; + /** Соединение с сервером БД для использования в клиенте. * Как использовать - см. Core/Protocol.h * (Реализацию на стороне сервера - см. Server/TCPHandler.h) @@ -69,6 +72,12 @@ public: virtual ~Connection() {}; + /// Установить ограничитель сетевого трафика. Один ограничитель может использоваться одновременно для нескольких разных соединений. + void setThrottler(const ThrottlerPtr & throttler_) + { + throttler = throttler_; + } + /// Пакет, который может быть получен от сервера. struct Packet @@ -161,6 +170,11 @@ private: const DataTypeFactory & data_type_factory; + /** Если не nullptr, то используется, чтобы ограничить сетевой трафик. + * Учитывается только трафик при передаче блоков. Другие пакеты не учитываются. + */ + ThrottlerPtr throttler; + Poco::Timespan connect_timeout; Poco::Timespan receive_timeout; Poco::Timespan send_timeout; diff --git a/dbms/include/DB/Client/ParallelReplicas.h b/dbms/include/DB/Client/ParallelReplicas.h index 47b4ade5a1f..4dec86bf592 100644 --- a/dbms/include/DB/Client/ParallelReplicas.h +++ b/dbms/include/DB/Client/ParallelReplicas.h @@ -1,85 +1,93 @@ #pragma once +#include #include #include + namespace DB { - /** Множество реплик одного шарда. + + +/** Для получения данных сразу из нескольких реплик (соединений) в рамках одного потока. + * В качестве вырожденного случая, может также работать с одним соединением. + * + * Интерфейс почти совпадает с Connection. + */ +class ParallelReplicas final : private boost::noncopyable +{ +public: + /// Принимает готовое соединение. + ParallelReplicas(Connection * connection_, Settings * settings_, ThrottlerPtr throttler_); + + /// Принимает пул, из которого нужно будет достать одно или несколько соединений. + ParallelReplicas(IConnectionPool * pool_, Settings * settings_, ThrottlerPtr throttler_); + + /// Отправить на реплики всё содержимое внешних таблиц. + void sendExternalTablesData(std::vector & data); + + /// Отправить запрос на реплики. + void sendQuery(const String & query, const String & query_id = "", + UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false); + + /// Получить пакет от какой-нибудь реплики. + Connection::Packet receivePacket(); + + /// Отменить запросы к репликам + void sendCancel(); + + /** На каждой реплике читать и пропускать все пакеты до EndOfStream или Exception. + * Возвращает EndOfStream, если не было получено никакого исключения. В противном + * случае возвращает последний полученный пакет типа Exception. */ - class ParallelReplicas final - { - public: - /// Принимает готовое соединение. - ParallelReplicas(Connection * connection_, Settings * settings_); + Connection::Packet drain(); - /// Принимает пул, из которого нужно будет достать одно или несколько соединений. - ParallelReplicas(IConnectionPool * pool_, Settings * settings_); + /// Получить адреса реплик в виде строки. + std::string dumpAddresses() const; - ParallelReplicas(const ParallelReplicas &) = delete; - ParallelReplicas & operator=(const ParallelReplicas &) = delete; + /// Возвращает количесто реплик. + size_t size() const { return replica_map.size(); } - /// Отправить на реплики всё содержимое внешних таблиц. - void sendExternalTablesData(std::vector & data); + /// Проверить, есть ли действительные реплики. + bool hasActiveReplicas() const { return active_replica_count > 0; } - /// Отправить запрос на реплики. - void sendQuery(const String & query, const String & query_id = "", - UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false); +private: + /// Реплики хэшированные по id сокета + using ReplicaMap = std::unordered_map; - /// Получить пакет от какой-нибудь реплики. - Connection::Packet receivePacket(); - /// Отменить запросы к репликам - void sendCancel(); + /// Зарегистрировать реплику. + void registerReplica(Connection * connection); - /** На каждой реплике читать и пропускать все пакеты до EndOfStream или Exception. - * Возвращает EndOfStream, если не было получено никакого исключения. В противном - * случае возвращает последний полученный пакет типа Exception. - */ - Connection::Packet drain(); + /// Получить реплику, на которой можно прочитать данные. + ReplicaMap::iterator getReplicaForReading(); - /// Получить адреса реплик в виде строки. - std::string dumpAddresses() const; + /** Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. + * Возвращает одну такую реплику, если она найдётся. + */ + ReplicaMap::iterator waitForReadEvent(); - /// Возвращает количесто реплик. - size_t size() const { return replica_map.size(); } + /// Пометить реплику как недействительную. + void invalidateReplica(ReplicaMap::iterator it); - /// Проверить, есть ли действительные реплики. - bool hasActiveReplicas() const { return active_replica_count > 0; } - private: - /// Реплики хэшированные по id сокета - using ReplicaMap = std::unordered_map; + Settings * settings; + ReplicaMap replica_map; - private: - /// Зарегистрировать реплику. - void registerReplica(Connection * connection); + /// Если не nullptr, то используется, чтобы ограничить сетевой трафик. + ThrottlerPtr throttler; - /// Получить реплику, на которой можно прочитать данные. - ReplicaMap::iterator getReplicaForReading(); + std::vector pool_entries; + ConnectionPool::Entry pool_entry; - /** Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. - * Возвращает одну такую реплику, если она найдётся. - */ - ReplicaMap::iterator waitForReadEvent(); + /// Текущее количество действительных соединений к репликам. + size_t active_replica_count; + /// Запрос выполняется параллельно на нескольких репликах. + bool supports_parallel_execution; + /// Отправили запрос + bool sent_query = false; + /// Отменили запрос + bool cancelled = false; +}; - /// Пометить реплику как недействительную. - void invalidateReplica(ReplicaMap::iterator it); - - private: - Settings * settings; - ReplicaMap replica_map; - - std::vector pool_entries; - ConnectionPool::Entry pool_entry; - - /// Текущее количество действительных соединений к репликам. - size_t active_replica_count; - /// Запрос выполняется параллельно на нескольких репликах. - bool supports_parallel_execution; - /// Отправили запрос - bool sent_query = false; - /// Отменили запрос - bool cancelled = false; - }; } diff --git a/dbms/include/DB/Common/Throttler.h b/dbms/include/DB/Common/Throttler.h new file mode 100644 index 00000000000..7f0e5848f0d --- /dev/null +++ b/dbms/include/DB/Common/Throttler.h @@ -0,0 +1,59 @@ +#pragma once + +#include +#include +#include + + +/** Позволяет ограничить скорость чего либо (в штуках в секунду) с помощью sleep. + * Особенности работы: + * - считается только средняя скорость, от момента первого вызова функции add; + * если были периоды с низкой скоростью, то в течение промежутка времени после них, скорость будет выше; + */ +class Throttler +{ +public: + Throttler(size_t max_speed_) : max_speed(max_speed_) {} + + void add(size_t amount) + { + size_t new_count; + UInt64 elapsed_ns; + + { + std::lock_guard lock(mutex); + + if (0 == count) + { + watch.start(); + elapsed_ns = 0; + } + else + elapsed_ns = watch.elapsed(); + + count += amount; + new_count = count; + } + + /// Сколько должно было бы пройти времени, если бы скорость была равна max_speed. + UInt64 desired_ns = new_count * 1000000000 / max_speed; + + if (desired_ns > elapsed_ns) + { + UInt64 sleep_ns = desired_ns - elapsed_ns; + timespec sleep_ts; + sleep_ts.tv_sec = sleep_ns / 1000000000; + sleep_ts.tv_nsec = sleep_ns % 1000000000; + nanosleep(&sleep_ts, nullptr); /// NOTE Завершается раньше в случае сигнала. Это считается нормальным. + } + } + +private: + size_t max_speed; + size_t count = 0; + Stopwatch watch {CLOCK_MONOTONIC_COARSE}; + std::mutex mutex; +}; + + +typedef std::shared_ptr ThrottlerPtr; diff --git a/dbms/include/DB/Common/isLocalAddress.h b/dbms/include/DB/Common/isLocalAddress.h new file mode 100644 index 00000000000..62318b34ba6 --- /dev/null +++ b/dbms/include/DB/Common/isLocalAddress.h @@ -0,0 +1,27 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +inline bool isLocalAddress(const Poco::Net::SocketAddress & address) +{ + const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0); + static auto interfaces = Poco::Net::NetworkInterface::list(); + + if (clickhouse_port == address.port()) + { + return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(), + [&] (const Poco::Net::NetworkInterface & interface) { + return interface.address() == address.host(); + }); + } + + return false; +} + +} diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 5d37b757df3..0acafa906c2 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -32,29 +33,29 @@ private: public: /// Принимает готовое соединение. - RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_, + RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, const Context & context = getDefaultContext()) - : connection(&connection_), query(query_), external_tables(external_tables_), stage(stage_), context(context) + : connection(&connection_), query(query_), throttler(throttler_), external_tables(external_tables_), stage(stage_), context(context) { init(settings_); } /// Принимает готовое соединение. Захватывает владение соединением из пула. - RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_, + RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, const Context & context = getDefaultContext()) - : pool_entry(pool_entry_), connection(&*pool_entry_), query(query_), + : pool_entry(pool_entry_), connection(&*pool_entry_), query(query_), throttler(throttler_), external_tables(external_tables_), stage(stage_), context(context) { init(settings_); } /// Принимает пул, из которого нужно будет достать одно или несколько соединений. - RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_, + RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, const Context & context = getDefaultContext()) - : pool(pool_), query(query_), external_tables(external_tables_), stage(stage_), context(context) + : pool(pool_), query(query_), throttler(throttler_), external_tables(external_tables_), stage(stage_), context(context) { init(settings_); } @@ -257,9 +258,9 @@ protected: { Settings * parallel_replicas_settings = send_settings ? &settings : nullptr; if (connection != nullptr) - parallel_replicas = ext::make_unique(connection, parallel_replicas_settings); + parallel_replicas = std::make_unique(connection, parallel_replicas_settings, throttler); else - parallel_replicas = ext::make_unique(pool, parallel_replicas_settings); + parallel_replicas = std::make_unique(pool, parallel_replicas_settings, throttler); } /// Возвращает true, если запрос отправлен, а ещё не выполнен. @@ -290,6 +291,8 @@ private: const String query; bool send_settings; Settings settings; + /// Если не nullptr, то используется, чтобы ограничить сетевой трафик. + ThrottlerPtr throttler; /// Временные таблицы, которые необходимо переслать на удаленные сервера. Tables external_tables; QueryProcessingStage::Enum stage; diff --git a/dbms/include/DB/Dictionaries/CacheDictionary.h b/dbms/include/DB/Dictionaries/CacheDictionary.h new file mode 100644 index 00000000000..6959e99682d --- /dev/null +++ b/dbms/include/DB/Dictionaries/CacheDictionary.h @@ -0,0 +1,137 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +class CacheDictionary final : public IDictionary +{ +public: + CacheDictionary(const std::string & name, const DictionaryStructure & dict_struct, + DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, + const std::size_t size) + : name{name}, dict_struct(dict_struct), + source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime), + size{size} + { + if (!this->source_ptr->supportsSelectiveLoad()) + throw Exception{ + "Source cannot be used with CacheDictionary", + ErrorCodes::UNSUPPORTED_METHOD + }; + } + + CacheDictionary(const CacheDictionary & other) + : CacheDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.size} + {} + + std::string getName() const override { return name; } + + std::string getTypeName() const override { return "CacheDictionary"; } + + bool isCached() const override { return true; } + + DictionaryPtr clone() const override { return std::make_unique(*this); } + + const IDictionarySource * getSource() const override { return source_ptr.get(); } + + const DictionaryLifetime & getLifetime() const override { return dict_lifetime; } + + bool hasHierarchy() const override { return false; } + + id_t toParent(const id_t id) const override { return 0; } + +#define DECLARE_SAFE_GETTER(TYPE, NAME, LC_TYPE) \ + TYPE get##NAME(const std::string & attribute_name, const id_t id) const override\ + {\ + return {};\ + } + DECLARE_SAFE_GETTER(UInt8, UInt8, uint8) + DECLARE_SAFE_GETTER(UInt16, UInt16, uint16) + DECLARE_SAFE_GETTER(UInt32, UInt32, uint32) + DECLARE_SAFE_GETTER(UInt64, UInt64, uint64) + DECLARE_SAFE_GETTER(Int8, Int8, int8) + DECLARE_SAFE_GETTER(Int16, Int16, int16) + DECLARE_SAFE_GETTER(Int32, Int32, int32) + DECLARE_SAFE_GETTER(Int64, Int64, int64) + DECLARE_SAFE_GETTER(Float32, Float32, float32) + DECLARE_SAFE_GETTER(Float64, Float64, float64) + DECLARE_SAFE_GETTER(StringRef, String, string) +#undef DECLARE_SAFE_GETTER + + std::size_t getAttributeIndex(const std::string & attribute_name) const override + { + return {}; + } + +#define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\ + bool is##NAME(const std::size_t attribute_idx) const override\ + {\ + return true;\ + } + DECLARE_TYPE_CHECKER(UInt8, uint8) + DECLARE_TYPE_CHECKER(UInt16, uint16) + DECLARE_TYPE_CHECKER(UInt32, uint32) + DECLARE_TYPE_CHECKER(UInt64, uint64) + DECLARE_TYPE_CHECKER(Int8, int8) + DECLARE_TYPE_CHECKER(Int16, int16) + DECLARE_TYPE_CHECKER(Int32, int32) + DECLARE_TYPE_CHECKER(Int64, int64) + DECLARE_TYPE_CHECKER(Float32, float32) + DECLARE_TYPE_CHECKER(Float64, float64) + DECLARE_TYPE_CHECKER(String, string) +#undef DECLARE_TYPE_CHECKER + +#define DECLARE_UNSAFE_GETTER(TYPE, NAME, LC_NAME)\ + TYPE get##NAME##Unsafe(const std::size_t attribute_idx, const id_t id) const override\ + {\ + return {};\ + } + DECLARE_UNSAFE_GETTER(UInt8, UInt8, uint8) + DECLARE_UNSAFE_GETTER(UInt16, UInt16, uint16) + DECLARE_UNSAFE_GETTER(UInt32, UInt32, uint32) + DECLARE_UNSAFE_GETTER(UInt64, UInt64, uint64) + DECLARE_UNSAFE_GETTER(Int8, Int8, int8) + DECLARE_UNSAFE_GETTER(Int16, Int16, int16) + DECLARE_UNSAFE_GETTER(Int32, Int32, int32) + DECLARE_UNSAFE_GETTER(Int64, Int64, int64) + DECLARE_UNSAFE_GETTER(Float32, Float32, float32) + DECLARE_UNSAFE_GETTER(Float64, Float64, float64) + DECLARE_UNSAFE_GETTER(StringRef, String, string) +#undef DECLARE_UNSAFE_GETTER + +private: + const std::string name; + const DictionaryStructure dict_struct; + const DictionarySourcePtr source_ptr; + const DictionaryLifetime dict_lifetime; + const std::size_t size; + + union item + { + UInt8 uint8_value; + UInt16 uint16_value; + UInt32 uint32_value; + UInt64 uint64_value; + Int8 int8_value; + Int16 int16_value; + Int32 int32_value; + Int64 int64_value; + Float32 float32_value; + Float64 float64_value; + StringRef string_value; + }; + + struct cell + { + id_t id; + std::vector attrs; + }; + + std::vector cells; +}; + +} diff --git a/dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h b/dbms/include/DB/Dictionaries/ClickHouseDictionarySource.h similarity index 64% rename from dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h rename to dbms/include/DB/Dictionaries/ClickHouseDictionarySource.h index 10846fdf26e..d56f9472e2d 100644 --- a/dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h +++ b/dbms/include/DB/Dictionaries/ClickHouseDictionarySource.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -13,44 +14,50 @@ namespace DB const auto max_connections = 1; -class ClickhouseDictionarySource final : public IDictionarySource +/** Allows loading dictionaries from local or remote ClickHouse instance +* @todo use ConnectionPoolWithFailover +* @todo invent a way to keep track of source modifications +*/ +class ClickHouseDictionarySource final : public IDictionarySource { - static const auto max_block_size = 8192; - public: - ClickhouseDictionarySource(const Poco::Util::AbstractConfiguration & config, + ClickHouseDictionarySource(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Block & sample_block, Context & context) - : host{config.getString(config_prefix + "host")}, - port(config.getInt(config_prefix + "port")), - user{config.getString(config_prefix + "user", "")}, - password{config.getString(config_prefix + "password", "")}, - db{config.getString(config_prefix + "db", "")}, - table{config.getString(config_prefix + "table")}, + : host{config.getString(config_prefix + ".host")}, + port(config.getInt(config_prefix + ".port")), + user{config.getString(config_prefix + ".user", "")}, + password{config.getString(config_prefix + ".password", "")}, + db{config.getString(config_prefix + ".db", "")}, + table{config.getString(config_prefix + ".table")}, sample_block{sample_block}, context(context), - is_local{isLocal(host, port)}, - pool{is_local ? nullptr : ext::make_unique( + is_local{isLocalAddress({ host, port })}, + pool{is_local ? nullptr : std::make_unique( max_connections, host, port, db, user, password, context.getDataTypeFactory(), - "ClickhouseDictionarySource") + "ClickHouseDictionarySource") }, load_all_query{composeLoadAllQuery(sample_block, table)} {} - ClickhouseDictionarySource(const ClickhouseDictionarySource & other) + /// copy-constructor is provided in order to support cloneability + ClickHouseDictionarySource(const ClickHouseDictionarySource & other) : host{other.host}, port{other.port}, user{other.user}, password{other.password}, db{other.db}, table{other.db}, sample_block{other.sample_block}, context(other.context), is_local{other.is_local}, - pool{is_local ? nullptr : ext::make_unique( + pool{is_local ? nullptr : std::make_unique( max_connections, host, port, db, user, password, context.getDataTypeFactory(), - "ClickhouseDictionarySource")}, + "ClickHouseDictionarySource")}, load_all_query{other.load_all_query} {} BlockInputStreamPtr loadAll() override { + /** Query to local ClickHouse is marked internal in order to avoid + * the necessity of holding process_list_element shared pointer. + */ if (is_local) - return executeQuery(load_all_query, context).in; + return executeQuery(load_all_query, context, true).in; return new RemoteBlockInputStream{pool.get(), load_all_query, nullptr}; } @@ -70,12 +77,13 @@ public: }; } - /// @todo check update time somehow bool isModified() const override { return true; } + bool supportsSelectiveLoad() const override { return true; } - DictionarySourcePtr clone() const override { return ext::make_unique(*this); } + DictionarySourcePtr clone() const override { return std::make_unique(*this); } private: + /// @todo escape table and column names static std::string composeLoadAllQuery(const Block & block, const std::string & table) { std::string query{"SELECT "}; diff --git a/dbms/include/DB/Dictionaries/DictionarySourceFactory.h b/dbms/include/DB/Dictionaries/DictionarySourceFactory.h index e46c8e8fe5c..810f87e5d46 100644 --- a/dbms/include/DB/Dictionaries/DictionarySourceFactory.h +++ b/dbms/include/DB/Dictionaries/DictionarySourceFactory.h @@ -3,11 +3,11 @@ #include #include #include -#include -#include +#include +#include #include #include -#include +#include namespace DB { @@ -38,6 +38,7 @@ Block createSampleBlock(const DictionaryStructure & dict_struct, const Context & } +/// creates IDictionarySource instance from config and DictionaryStructure class DictionarySourceFactory : public Singleton { public: @@ -46,25 +47,38 @@ public: const DictionaryStructure & dict_struct, Context & context) const { + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_prefix, keys); + if (keys.size() != 1) + throw Exception{ + "Element dictionary.source should have exactly one child element", + ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG + }; + auto sample_block = createSampleBlock(dict_struct, context); - if (config.has(config_prefix + "file")) + const auto & source_type = keys.front(); + + if ("file" == source_type) { - const auto & filename = config.getString(config_prefix + "file.path"); - const auto & format = config.getString(config_prefix + "file.format"); - return ext::make_unique(filename, format, sample_block, context); + const auto filename = config.getString(config_prefix + ".file.path"); + const auto format = config.getString(config_prefix + ".file.format"); + return std::make_unique(filename, format, sample_block, context); } - else if (config.has(config_prefix + "mysql")) + else if ("mysql" == source_type) { - return ext::make_unique(config, config_prefix + "mysql", sample_block, context); + return std::make_unique(config, config_prefix + ".mysql", sample_block); } - else if (config.has(config_prefix + "clickhouse")) + else if ("clickhouse" == source_type) { - return ext::make_unique(config, config_prefix + "clickhouse.", + return std::make_unique(config, config_prefix + ".clickhouse", sample_block, context); } - throw Exception{"unsupported source type"}; + throw Exception{ + "Unknown dictionary source type: " + source_type, + ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG + }; } }; diff --git a/dbms/include/DB/Dictionaries/DictionaryStructure.h b/dbms/include/DB/Dictionaries/DictionaryStructure.h index cd552784551..15ed7946e1f 100644 --- a/dbms/include/DB/Dictionaries/DictionaryStructure.h +++ b/dbms/include/DB/Dictionaries/DictionaryStructure.h @@ -9,7 +9,7 @@ namespace DB { -enum class attribute_type +enum class AttributeType { uint8, uint16, @@ -24,20 +24,20 @@ enum class attribute_type string }; -inline attribute_type getAttributeTypeByName(const std::string & type) +inline AttributeType getAttributeTypeByName(const std::string & type) { - static const std::unordered_map dictionary{ - { "UInt8", attribute_type::uint8 }, - { "UInt16", attribute_type::uint16 }, - { "UInt32", attribute_type::uint32 }, - { "UInt64", attribute_type::uint64 }, - { "Int8", attribute_type::int8 }, - { "Int16", attribute_type::int16 }, - { "Int32", attribute_type::int32 }, - { "Int64", attribute_type::int64 }, - { "Float32", attribute_type::float32 }, - { "Float64", attribute_type::float64 }, - { "String", attribute_type::string }, + static const std::unordered_map dictionary{ + { "UInt8", AttributeType::uint8 }, + { "UInt16", AttributeType::uint16 }, + { "UInt32", AttributeType::uint32 }, + { "UInt64", AttributeType::uint64 }, + { "Int8", AttributeType::int8 }, + { "Int16", AttributeType::int16 }, + { "Int32", AttributeType::int32 }, + { "Int64", AttributeType::int64 }, + { "Float32", AttributeType::float32 }, + { "Float64", AttributeType::float64 }, + { "String", AttributeType::string }, }; const auto it = dictionary.find(type); @@ -50,44 +50,53 @@ inline attribute_type getAttributeTypeByName(const std::string & type) }; } -inline std::string toString(const attribute_type type) +inline std::string toString(const AttributeType type) { switch (type) { - case attribute_type::uint8: return "UInt8"; - case attribute_type::uint16: return "UInt16"; - case attribute_type::uint32: return "UInt32"; - case attribute_type::uint64: return "UInt64"; - case attribute_type::int8: return "Int8"; - case attribute_type::int16: return "Int16"; - case attribute_type::int32: return "Int32"; - case attribute_type::int64: return "Int64"; - case attribute_type::float32: return "Float32"; - case attribute_type::float64: return "Float64"; - case attribute_type::string: return "String"; + case AttributeType::uint8: return "UInt8"; + case AttributeType::uint16: return "UInt16"; + case AttributeType::uint32: return "UInt32"; + case AttributeType::uint64: return "UInt64"; + case AttributeType::int8: return "Int8"; + case AttributeType::int16: return "Int16"; + case AttributeType::int32: return "Int32"; + case AttributeType::int64: return "Int64"; + case AttributeType::float32: return "Float32"; + case AttributeType::float64: return "Float64"; + case AttributeType::string: return "String"; } throw Exception{ - "Unknown attribute_type " + toString(type), + "Unknown attribute_type " + toString(static_cast(type)), ErrorCodes::ARGUMENT_OUT_OF_BOUND }; } +/// Min and max lifetimes for a dictionary or it's entry struct DictionaryLifetime { std::uint64_t min_sec; std::uint64_t max_sec; - static DictionaryLifetime fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix) + DictionaryLifetime(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix) { const auto & lifetime_min_key = config_prefix + ".min"; const auto has_min = config.has(lifetime_min_key); - const std::uint64_t min_update_time = has_min ? config.getInt(lifetime_min_key) : config.getInt(config_prefix); - const std::uint64_t max_update_time = has_min ? config.getInt(config_prefix + ".max") : min_update_time; - return { min_update_time, max_update_time }; + + this->min_sec = has_min ? config.getInt(lifetime_min_key) : config.getInt(config_prefix); + this->max_sec = has_min ? config.getInt(config_prefix + ".max") : this->min_sec; } }; +/** Holds the description of a single dictionary attribute: +* - name, used for lookup into dictionary and source; +* - type, used in conjunction with DataTypeFactory and getAttributeTypeByname; +* - null_value, used as a default value for non-existent entries in the dictionary, +* decimal representation for numeric attributes; +* - hierarchical, whether this attribute defines a hierarchy; +* - injective, whether the mapping to parent is injective (can be used for optimization of GROUP BY?) +*/ struct DictionaryAttribute { std::string name; @@ -97,34 +106,34 @@ struct DictionaryAttribute bool injective; }; +/// Name of identifier plus list of attributes struct DictionaryStructure { std::string id_name; std::vector attributes; - static DictionaryStructure fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix) + DictionaryStructure(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix) + : id_name{config.getString(config_prefix + ".id.name")} { - const auto & id_name = config.getString(config_prefix + ".id.name"); if (id_name.empty()) throw Exception{ "No 'id' specified for dictionary", ErrorCodes::BAD_ARGUMENTS }; - DictionaryStructure result{id_name}; - Poco::Util::AbstractConfiguration::Keys keys; config.keys(config_prefix, keys); auto has_hierarchy = false; + for (const auto & key : keys) { if (0 != strncmp(key.data(), "attribute", strlen("attribute"))) continue; - const auto & prefix = config_prefix + '.' + key + '.'; - const auto & name = config.getString(prefix + "name"); - const auto & type = config.getString(prefix + "type"); - const auto & null_value = config.getString(prefix + "null_value"); + const auto prefix = config_prefix + '.' + key + '.'; + const auto name = config.getString(prefix + "name"); + const auto type = config.getString(prefix + "type"); + const auto null_value = config.getString(prefix + "null_value"); const auto hierarchical = config.getBool(prefix + "hierarchical", false); const auto injective = config.getBool(prefix + "injective", false); if (name.empty() || type.empty()) @@ -141,16 +150,16 @@ struct DictionaryStructure has_hierarchy = has_hierarchy || hierarchical; - result.attributes.emplace_back(DictionaryAttribute{name, type, null_value, hierarchical, injective}); + attributes.emplace_back(DictionaryAttribute{ + name, type, null_value, hierarchical, injective + }); } - if (result.attributes.empty()) + if (attributes.empty()) throw Exception{ "Dictionary has no attributes defined", ErrorCodes::BAD_ARGUMENTS }; - - return result; } }; diff --git a/dbms/include/DB/Dictionaries/FileDictionarySource.h b/dbms/include/DB/Dictionaries/FileDictionarySource.h index ac90f103727..52659058b36 100644 --- a/dbms/include/DB/Dictionaries/FileDictionarySource.h +++ b/dbms/include/DB/Dictionaries/FileDictionarySource.h @@ -10,13 +10,14 @@ namespace DB { +/// Allows loading dictionaries from a file with given format, does not support "random access" class FileDictionarySource final : public IDictionarySource { static const auto max_block_size = 8192; public: FileDictionarySource(const std::string & filename, const std::string & format, Block & sample_block, - Context & context) + const Context & context) : filename{filename}, format{format}, sample_block{sample_block}, context(context), last_modification{getLastModification()} {} @@ -29,7 +30,7 @@ public: BlockInputStreamPtr loadAll() override { - auto in_ptr = ext::make_unique(filename); + auto in_ptr = std::make_unique(filename); auto stream = context.getFormatFactory().getInput( format, *in_ptr, sample_block, max_block_size, context.getDataTypeFactory()); last_modification = getLastModification(); @@ -54,8 +55,9 @@ public: } bool isModified() const override { return getLastModification() > last_modification; } + bool supportsSelectiveLoad() const override { return false; } - DictionarySourcePtr clone() const override { return ext::make_unique(*this); } + DictionarySourcePtr clone() const override { return std::make_unique(*this); } private: Poco::Timestamp getLastModification() const { return Poco::File{filename}.getLastModified(); } @@ -63,7 +65,7 @@ private: const std::string filename; const std::string format; Block sample_block; - Context & context; + const Context & context; Poco::Timestamp last_modification; }; diff --git a/dbms/include/DB/Dictionaries/FlatDictionary.h b/dbms/include/DB/Dictionaries/FlatDictionary.h index c0604ccd701..3b018a061ec 100644 --- a/dbms/include/DB/Dictionaries/FlatDictionary.h +++ b/dbms/include/DB/Dictionaries/FlatDictionary.h @@ -2,10 +2,8 @@ #include #include -#include #include #include -#include #include namespace DB @@ -36,9 +34,9 @@ public: bool isCached() const override { return false; } - DictionaryPtr clone() const override { return ext::make_unique(*this); } + DictionaryPtr clone() const override { return std::make_unique(*this); } - const IDictionarySource * const getSource() const override { return source_ptr.get(); } + const IDictionarySource * getSource() const override { return source_ptr.get(); } const DictionaryLifetime & getLifetime() const override { return dict_lifetime; } @@ -50,18 +48,18 @@ public: switch (hierarchical_attribute->type) { - case attribute_type::uint8: return id < attr->uint8_array->size() ? (*attr->uint8_array)[id] : attr->uint8_null_value; - case attribute_type::uint16: return id < attr->uint16_array->size() ? (*attr->uint16_array)[id] : attr->uint16_null_value; - case attribute_type::uint32: return id < attr->uint32_array->size() ? (*attr->uint32_array)[id] : attr->uint32_null_value; - case attribute_type::uint64: return id < attr->uint64_array->size() ? (*attr->uint64_array)[id] : attr->uint64_null_value; - case attribute_type::int8: return id < attr->int8_array->size() ? (*attr->int8_array)[id] : attr->int8_null_value; - case attribute_type::int16: return id < attr->int16_array->size() ? (*attr->int16_array)[id] : attr->int16_null_value; - case attribute_type::int32: return id < attr->int32_array->size() ? (*attr->int32_array)[id] : attr->int32_null_value; - case attribute_type::int64: return id < attr->int64_array->size() ? (*attr->int64_array)[id] : attr->int64_null_value; - case attribute_type::float32: - case attribute_type::float64: - case attribute_type::string: - break; + case AttributeType::uint8: return id < attr->uint8_array->size() ? (*attr->uint8_array)[id] : attr->uint8_null_value; + case AttributeType::uint16: return id < attr->uint16_array->size() ? (*attr->uint16_array)[id] : attr->uint16_null_value; + case AttributeType::uint32: return id < attr->uint32_array->size() ? (*attr->uint32_array)[id] : attr->uint32_null_value; + case AttributeType::uint64: return id < attr->uint64_array->size() ? (*attr->uint64_array)[id] : attr->uint64_null_value; + case AttributeType::int8: return id < attr->int8_array->size() ? (*attr->int8_array)[id] : attr->int8_null_value; + case AttributeType::int16: return id < attr->int16_array->size() ? (*attr->int16_array)[id] : attr->int16_null_value; + case AttributeType::int32: return id < attr->int32_array->size() ? (*attr->int32_array)[id] : attr->int32_null_value; + case AttributeType::int64: return id < attr->int64_array->size() ? (*attr->int64_array)[id] : attr->int64_null_value; + case AttributeType::float32: + case AttributeType::float64: + case AttributeType::string: + break; } throw Exception{ @@ -75,7 +73,7 @@ public: {\ const auto idx = getAttributeIndex(attribute_name);\ const auto & attribute = attributes[idx];\ - if (attribute.type != attribute_type::LC_TYPE)\ + if (attribute.type != AttributeType::LC_TYPE)\ throw Exception{\ "Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\ ErrorCodes::TYPE_MISMATCH\ @@ -112,7 +110,7 @@ public: #define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\ bool is##NAME(const std::size_t attribute_idx) const override\ {\ - return attributes[attribute_idx].type == attribute_type::LC_NAME;\ + return attributes[attribute_idx].type == AttributeType::LC_NAME;\ } DECLARE_TYPE_CHECKER(UInt8, uint8) DECLARE_TYPE_CHECKER(UInt16, uint16) @@ -151,7 +149,7 @@ public: private: struct attribute_t { - attribute_type type; + AttributeType type; UInt8 uint8_null_value; UInt16 uint16_null_value; UInt32 uint32_null_value; @@ -195,6 +193,7 @@ private: void loadData() { auto stream = source_ptr->loadAll(); + stream->readPrefix(); while (const auto block = stream->read()) { @@ -209,65 +208,67 @@ private: setAttributeValue(attribute, id_column[row_idx].get(), attribute_column[row_idx]); } } + + stream->readSuffix(); } - attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value) + attribute_t createAttributeWithType(const AttributeType type, const std::string & null_value) { attribute_t attr{type}; switch (type) { - case attribute_type::uint8: + case AttributeType::uint8: attr.uint8_null_value = DB::parse(null_value); attr.uint8_array.reset(new PODArray); attr.uint8_array->resize_fill(initial_array_size, attr.uint8_null_value); break; - case attribute_type::uint16: + case AttributeType::uint16: attr.uint16_null_value = DB::parse(null_value); attr.uint16_array.reset(new PODArray); attr.uint16_array->resize_fill(initial_array_size, attr.uint16_null_value); break; - case attribute_type::uint32: + case AttributeType::uint32: attr.uint32_null_value = DB::parse(null_value); attr.uint32_array.reset(new PODArray); attr.uint32_array->resize_fill(initial_array_size, attr.uint32_null_value); break; - case attribute_type::uint64: + case AttributeType::uint64: attr.uint64_null_value = DB::parse(null_value); attr.uint64_array.reset(new PODArray); attr.uint64_array->resize_fill(initial_array_size, attr.uint64_null_value); break; - case attribute_type::int8: + case AttributeType::int8: attr.int8_null_value = DB::parse(null_value); attr.int8_array.reset(new PODArray); attr.int8_array->resize_fill(initial_array_size, attr.int8_null_value); break; - case attribute_type::int16: + case AttributeType::int16: attr.int16_null_value = DB::parse(null_value); attr.int16_array.reset(new PODArray); attr.int16_array->resize_fill(initial_array_size, attr.int16_null_value); break; - case attribute_type::int32: + case AttributeType::int32: attr.int32_null_value = DB::parse(null_value); attr.int32_array.reset(new PODArray); attr.int32_array->resize_fill(initial_array_size, attr.int32_null_value); break; - case attribute_type::int64: + case AttributeType::int64: attr.int64_null_value = DB::parse(null_value); attr.int64_array.reset(new PODArray); attr.int64_array->resize_fill(initial_array_size, attr.int64_null_value); break; - case attribute_type::float32: + case AttributeType::float32: attr.float32_null_value = DB::parse(null_value); attr.float32_array.reset(new PODArray); attr.float32_array->resize_fill(initial_array_size, attr.float32_null_value); break; - case attribute_type::float64: + case AttributeType::float64: attr.float64_null_value = DB::parse(null_value); attr.float64_array.reset(new PODArray); attr.float64_array->resize_fill(initial_array_size, attr.float64_null_value); break; - case attribute_type::string: + case AttributeType::string: attr.string_null_value = null_value; attr.string_arena.reset(new Arena); attr.string_array.reset(new PODArray); @@ -288,77 +289,77 @@ private: switch (attribute.type) { - case attribute_type::uint8: + case AttributeType::uint8: { if (id >= attribute.uint8_array->size()) attribute.uint8_array->resize_fill(id, attribute.uint8_null_value); (*attribute.uint8_array)[id] = value.get(); break; } - case attribute_type::uint16: + case AttributeType::uint16: { if (id >= attribute.uint16_array->size()) attribute.uint16_array->resize_fill(id, attribute.uint16_null_value); (*attribute.uint16_array)[id] = value.get(); break; } - case attribute_type::uint32: + case AttributeType::uint32: { if (id >= attribute.uint32_array->size()) attribute.uint32_array->resize_fill(id, attribute.uint32_null_value); (*attribute.uint32_array)[id] = value.get(); break; } - case attribute_type::uint64: + case AttributeType::uint64: { if (id >= attribute.uint64_array->size()) attribute.uint64_array->resize_fill(id, attribute.uint64_null_value); (*attribute.uint64_array)[id] = value.get(); break; } - case attribute_type::int8: + case AttributeType::int8: { if (id >= attribute.int8_array->size()) attribute.int8_array->resize_fill(id, attribute.int8_null_value); (*attribute.int8_array)[id] = value.get(); break; } - case attribute_type::int16: + case AttributeType::int16: { if (id >= attribute.int16_array->size()) attribute.int16_array->resize_fill(id, attribute.int16_null_value); (*attribute.int16_array)[id] = value.get(); break; } - case attribute_type::int32: + case AttributeType::int32: { if (id >= attribute.int32_array->size()) attribute.int32_array->resize_fill(id, attribute.int32_null_value); (*attribute.int32_array)[id] = value.get(); break; } - case attribute_type::int64: + case AttributeType::int64: { if (id >= attribute.int64_array->size()) attribute.int64_array->resize_fill(id, attribute.int64_null_value); (*attribute.int64_array)[id] = value.get(); break; } - case attribute_type::float32: + case AttributeType::float32: { if (id >= attribute.float32_array->size()) attribute.float32_array->resize_fill(id, attribute.float32_null_value); (*attribute.float32_array)[id] = value.get(); break; } - case attribute_type::float64: + case AttributeType::float64: { if (id >= attribute.float64_array->size()) attribute.float64_array->resize_fill(id, attribute.float64_null_value); (*attribute.float64_array)[id] = value.get(); break; } - case attribute_type::string: + case AttributeType::string: { if (id >= attribute.string_array->size()) attribute.string_array->resize_fill(id, attribute.string_null_value); diff --git a/dbms/include/DB/Dictionaries/HashedDictionary.h b/dbms/include/DB/Dictionaries/HashedDictionary.h index 4c83f6395ca..34015f36087 100644 --- a/dbms/include/DB/Dictionaries/HashedDictionary.h +++ b/dbms/include/DB/Dictionaries/HashedDictionary.h @@ -2,11 +2,10 @@ #include #include -#include #include #include #include -#include +#include namespace DB { @@ -33,9 +32,9 @@ public: bool isCached() const override { return false; } - DictionaryPtr clone() const override { return ext::make_unique(*this); } + DictionaryPtr clone() const override { return std::make_unique(*this); } - const IDictionarySource * const getSource() const override { return source_ptr.get(); } + const IDictionarySource * getSource() const override { return source_ptr.get(); } const DictionaryLifetime & getLifetime() const override { return dict_lifetime; } @@ -47,49 +46,49 @@ public: switch (hierarchical_attribute->type) { - case attribute_type::uint8: + case AttributeType::uint8: { const auto it = attr->uint8_map->find(id); return it != attr->uint8_map->end() ? it->second : attr->uint8_null_value; } - case attribute_type::uint16: + case AttributeType::uint16: { const auto it = attr->uint16_map->find(id); return it != attr->uint16_map->end() ? it->second : attr->uint16_null_value; } - case attribute_type::uint32: + case AttributeType::uint32: { const auto it = attr->uint32_map->find(id); return it != attr->uint32_map->end() ? it->second : attr->uint32_null_value; } - case attribute_type::uint64: + case AttributeType::uint64: { const auto it = attr->uint64_map->find(id); return it != attr->uint64_map->end() ? it->second : attr->uint64_null_value; } - case attribute_type::int8: + case AttributeType::int8: { const auto it = attr->int8_map->find(id); return it != attr->int8_map->end() ? it->second : attr->int8_null_value; } - case attribute_type::int16: + case AttributeType::int16: { const auto it = attr->int16_map->find(id); return it != attr->int16_map->end() ? it->second : attr->int16_null_value; } - case attribute_type::int32: + case AttributeType::int32: { const auto it = attr->int32_map->find(id); return it != attr->int32_map->end() ? it->second : attr->int32_null_value; } - case attribute_type::int64: + case AttributeType::int64: { const auto it = attr->int64_map->find(id); return it != attr->int64_map->end() ? it->second : attr->int64_null_value; } - case attribute_type::float32: - case attribute_type::float64: - case attribute_type::string: + case AttributeType::float32: + case AttributeType::float64: + case AttributeType::string: break; }; @@ -104,7 +103,7 @@ public: {\ const auto idx = getAttributeIndex(attribute_name);\ const auto & attribute = attributes[idx];\ - if (attribute.type != attribute_type::LC_TYPE)\ + if (attribute.type != AttributeType::LC_TYPE)\ throw Exception{\ "Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\ ErrorCodes::TYPE_MISMATCH\ @@ -142,7 +141,7 @@ public: #define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\ bool is##NAME(const std::size_t attribute_idx) const override\ {\ - return attributes[attribute_idx].type == attribute_type::LC_NAME;\ + return attributes[attribute_idx].type == AttributeType::LC_NAME;\ } DECLARE_TYPE_CHECKER(UInt8, uint8) DECLARE_TYPE_CHECKER(UInt16, uint16) @@ -182,7 +181,7 @@ public: private: struct attribute_t { - attribute_type type; + AttributeType type; UInt8 uint8_null_value; UInt16 uint16_null_value; UInt32 uint32_null_value; @@ -226,6 +225,7 @@ private: void loadData() { auto stream = source_ptr->loadAll(); + stream->readPrefix(); while (const auto block = stream->read()) { @@ -240,55 +240,57 @@ private: setAttributeValue(attribute, id_column[row_idx].get(), attribute_column[row_idx]); } } + + stream->readSuffix(); } - attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value) + attribute_t createAttributeWithType(const AttributeType type, const std::string & null_value) { attribute_t attr{type}; switch (type) { - case attribute_type::uint8: + case AttributeType::uint8: attr.uint8_null_value = DB::parse(null_value); attr.uint8_map.reset(new HashMap); break; - case attribute_type::uint16: + case AttributeType::uint16: attr.uint16_null_value = DB::parse(null_value); attr.uint16_map.reset(new HashMap); break; - case attribute_type::uint32: + case AttributeType::uint32: attr.uint32_null_value = DB::parse(null_value); attr.uint32_map.reset(new HashMap); break; - case attribute_type::uint64: + case AttributeType::uint64: attr.uint64_null_value = DB::parse(null_value); attr.uint64_map.reset(new HashMap); break; - case attribute_type::int8: + case AttributeType::int8: attr.int8_null_value = DB::parse(null_value); attr.int8_map.reset(new HashMap); break; - case attribute_type::int16: + case AttributeType::int16: attr.int16_null_value = DB::parse(null_value); attr.int16_map.reset(new HashMap); break; - case attribute_type::int32: + case AttributeType::int32: attr.int32_null_value = DB::parse(null_value); attr.int32_map.reset(new HashMap); break; - case attribute_type::int64: + case AttributeType::int64: attr.int64_null_value = DB::parse(null_value); attr.int64_map.reset(new HashMap); break; - case attribute_type::float32: + case AttributeType::float32: attr.float32_null_value = DB::parse(null_value); attr.float32_map.reset(new HashMap); break; - case attribute_type::float64: + case AttributeType::float64: attr.float64_null_value = DB::parse(null_value); attr.float64_map.reset(new HashMap); break; - case attribute_type::string: + case AttributeType::string: attr.string_null_value = null_value; attr.string_arena.reset(new Arena); attr.string_map.reset(new HashMap); @@ -302,57 +304,57 @@ private: { switch (attribute.type) { - case attribute_type::uint8: + case AttributeType::uint8: { attribute.uint8_map->insert({ id, value.get() }); break; } - case attribute_type::uint16: + case AttributeType::uint16: { attribute.uint16_map->insert({ id, value.get() }); break; } - case attribute_type::uint32: + case AttributeType::uint32: { attribute.uint32_map->insert({ id, value.get() }); break; } - case attribute_type::uint64: + case AttributeType::uint64: { attribute.uint64_map->insert({ id, value.get() }); break; } - case attribute_type::int8: + case AttributeType::int8: { attribute.int8_map->insert({ id, value.get() }); break; } - case attribute_type::int16: + case AttributeType::int16: { attribute.int16_map->insert({ id, value.get() }); break; } - case attribute_type::int32: + case AttributeType::int32: { attribute.int32_map->insert({ id, value.get() }); break; } - case attribute_type::int64: + case AttributeType::int64: { attribute.int64_map->insert({ id, value.get() }); break; } - case attribute_type::float32: + case AttributeType::float32: { attribute.float32_map->insert({ id, value.get() }); break; } - case attribute_type::float64: + case AttributeType::float64: { attribute.float64_map->insert({ id, value.get() }); break; } - case attribute_type::string: + case AttributeType::string: { const auto & string = value.get(); const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size()); diff --git a/dbms/include/DB/Dictionaries/IDictionary.h b/dbms/include/DB/Dictionaries/IDictionary.h index 088c9901912..09361caa6cf 100644 --- a/dbms/include/DB/Dictionaries/IDictionary.h +++ b/dbms/include/DB/Dictionaries/IDictionary.h @@ -18,7 +18,7 @@ class DictionaryLifetime; class IDictionary { public: - using id_t = std::uint64_t; + using id_t = std::uint64_t; virtual std::string getName() const = 0; @@ -28,7 +28,7 @@ public: virtual void reload() {} virtual DictionaryPtr clone() const = 0; - virtual const IDictionarySource * const getSource() const = 0; + virtual const IDictionarySource * getSource() const = 0; virtual const DictionaryLifetime & getLifetime() const = 0; @@ -89,7 +89,7 @@ public: virtual Float64 getFloat64Unsafe(std::size_t attribute_idx, id_t id) const = 0; virtual StringRef getStringUnsafe(std::size_t attribute_idx, id_t id) const = 0; - virtual ~IDictionary() = default; + virtual ~IDictionary() = default; }; } diff --git a/dbms/include/DB/Dictionaries/IDictionarySource.h b/dbms/include/DB/Dictionaries/IDictionarySource.h index 50404cc682c..3a0eea143db 100644 --- a/dbms/include/DB/Dictionaries/IDictionarySource.h +++ b/dbms/include/DB/Dictionaries/IDictionarySource.h @@ -9,12 +9,28 @@ namespace DB class IDictionarySource; using DictionarySourcePtr = std::unique_ptr; +/** Data-provider interface for external dictionaries, +* abstracts out the data source (file, MySQL, ClickHouse, external program, network request et cetera) +* from the presentation and memory layout (the dictionary itself). +*/ class IDictionarySource { public: + /// returns an input stream with all the data available from this source virtual BlockInputStreamPtr loadAll() = 0; + + /** Indicates whether this source supports "random access" loading of data + * loadId and loadIds can only be used if this function returns true. + */ + virtual bool supportsSelectiveLoad() const = 0; + + /// returns an input stream with the data for the requested identifier virtual BlockInputStreamPtr loadId(const std::uint64_t id) = 0; + + /// returns an input stream with the data for a collection of identifiers virtual BlockInputStreamPtr loadIds(const std::vector ids) = 0; + + /// indicates whether the source has been modified since last load* operation virtual bool isModified() const = 0; virtual DictionarySourcePtr clone() const = 0; diff --git a/dbms/include/DB/Dictionaries/MysqlBlockInputStream.h b/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h similarity index 54% rename from dbms/include/DB/Dictionaries/MysqlBlockInputStream.h rename to dbms/include/DB/Dictionaries/MySQLBlockInputStream.h index 120a02ac47f..26942ad9b28 100644 --- a/dbms/include/DB/Dictionaries/MysqlBlockInputStream.h +++ b/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h @@ -12,10 +12,11 @@ namespace DB { -class MysqlBlockInputStream final : public IProfilingBlockInputStream +/// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining +class MySQLBlockInputStream final : public IProfilingBlockInputStream { public: - MysqlBlockInputStream(mysqlxx::Query query, const Block & sample_block, const std::size_t max_block_size) + MySQLBlockInputStream(mysqlxx::Query query, const Block & sample_block, const std::size_t max_block_size) : query{std::move(query)}, result{query.use()}, sample_block{sample_block}, max_block_size{max_block_size} { types.reserve(sample_block.columns()); @@ -24,27 +25,27 @@ public: { const auto type = sample_block.getByPosition(idx).type.get(); if (typeid_cast(type)) - types.push_back(attribute_type::uint8); + types.push_back(AttributeType::uint8); else if (typeid_cast(type)) - types.push_back(attribute_type::uint16); + types.push_back(AttributeType::uint16); else if (typeid_cast(type)) - types.push_back(attribute_type::uint32); + types.push_back(AttributeType::uint32); else if (typeid_cast(type)) - types.push_back(attribute_type::uint64); + types.push_back(AttributeType::uint64); else if (typeid_cast(type)) - types.push_back(attribute_type::int8); + types.push_back(AttributeType::int8); else if (typeid_cast(type)) - types.push_back(attribute_type::int16); + types.push_back(AttributeType::int16); else if (typeid_cast(type)) - types.push_back(attribute_type::int32); + types.push_back(AttributeType::int32); else if (typeid_cast(type)) - types.push_back(attribute_type::int64); + types.push_back(AttributeType::int64); else if (typeid_cast(type)) - types.push_back(attribute_type::float32); + types.push_back(AttributeType::float32); else if (typeid_cast(type)) - types.push_back(attribute_type::float64); + types.push_back(AttributeType::float64); else if (typeid_cast(type)) - types.push_back(attribute_type::string); + types.push_back(AttributeType::string); else throw Exception{ "Unsupported type " + type->getName(), @@ -53,11 +54,11 @@ public: } } - String getName() const override { return "MysqlBlockInputStream"; } + String getName() const override { return "MySQLBlockInputStream"; } String getID() const override { - return "Mysql(" + query.str() + ")"; + return "MySQL(" + query.str() + ")"; } private: @@ -67,7 +68,7 @@ private: if (block.columns() != result.getNumFields()) throw Exception{ - "mysqlxx::UserQueryResult contains " + toString(result.getNumFields()) + " columns while " + + "mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while " + toString(block.columns()) + " expected", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH }; @@ -86,29 +87,29 @@ private: return rows == 0 ? Block{} : block; }; - static void insertValue(ColumnPtr & column, const mysqlxx::Value & value, const attribute_type type) + static void insertValue(ColumnPtr & column, const mysqlxx::Value & value, const AttributeType type) { switch (type) { - case attribute_type::uint8: column->insert(static_cast(value)); break; - case attribute_type::uint16: column->insert(static_cast(value)); break; - case attribute_type::uint32: column->insert(static_cast(value)); break; - case attribute_type::uint64: column->insert(static_cast(value)); break; - case attribute_type::int8: column->insert(static_cast(value)); break; - case attribute_type::int16: column->insert(static_cast(value)); break; - case attribute_type::int32: column->insert(static_cast(value)); break; - case attribute_type::int64: column->insert(static_cast(value)); break; - case attribute_type::float32: column->insert(static_cast(value)); break; - case attribute_type::float64: column->insert(static_cast(value)); break; - case attribute_type::string: column->insert(value.getString()); break; + case AttributeType::uint8: column->insert(static_cast(value)); break; + case AttributeType::uint16: column->insert(static_cast(value)); break; + case AttributeType::uint32: column->insert(static_cast(value)); break; + case AttributeType::uint64: column->insert(static_cast(value)); break; + case AttributeType::int8: column->insert(static_cast(value)); break; + case AttributeType::int16: column->insert(static_cast(value)); break; + case AttributeType::int32: column->insert(static_cast(value)); break; + case AttributeType::int64: column->insert(static_cast(value)); break; + case AttributeType::float32: column->insert(static_cast(value)); break; + case AttributeType::float64: column->insert(static_cast(value)); break; + case AttributeType::string: column->insert(value.getString()); break; } } mysqlxx::Query query; mysqlxx::UseQueryResult result; Block sample_block; - std::size_t max_block_size; - std::vector types; + const std::size_t max_block_size; + std::vector types; }; } diff --git a/dbms/include/DB/Dictionaries/MysqlDictionarySource.h b/dbms/include/DB/Dictionaries/MySQLDictionarySource.h similarity index 64% rename from dbms/include/DB/Dictionaries/MysqlDictionarySource.h rename to dbms/include/DB/Dictionaries/MySQLDictionarySource.h index e97382cd223..dd1f49aca75 100644 --- a/dbms/include/DB/Dictionaries/MysqlDictionarySource.h +++ b/dbms/include/DB/Dictionaries/MySQLDictionarySource.h @@ -1,39 +1,42 @@ #pragma once #include -#include -#include +#include #include #include #include +#include namespace DB { -class MysqlDictionarySource final : public IDictionarySource +/// Allows loading dictionaries from a MySQL database +class MySQLDictionarySource final : public IDictionarySource { static const auto max_block_size = 8192; public: - MysqlDictionarySource(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, - Block & sample_block, const Context & context) + MySQLDictionarySource(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, + Block & sample_block) : table{config.getString(config_prefix + ".table")}, - sample_block{sample_block}, context(context), + sample_block{sample_block}, pool{config, config_prefix}, load_all_query{composeLoadAllQuery(sample_block, table)}, last_modification{getLastModification()} {} - MysqlDictionarySource(const MysqlDictionarySource & other) + /// copy-constructor is provided in order to support cloneability + MySQLDictionarySource(const MySQLDictionarySource & other) : table{other.table}, - sample_block{other.sample_block}, context(other.context), + sample_block{other.sample_block}, pool{other.pool}, load_all_query{other.load_all_query}, last_modification{other.last_modification} {} BlockInputStreamPtr loadAll() override { - return new MysqlBlockInputStream{pool.Get()->query(load_all_query), sample_block, max_block_size}; + last_modification = getLastModification(); + return new MySQLBlockInputStream{pool.Get()->query(load_all_query), sample_block, max_block_size}; } BlockInputStreamPtr loadId(const std::uint64_t id) override @@ -53,8 +56,9 @@ public: } bool isModified() const override { return getLastModification() > last_modification; } + bool supportsSelectiveLoad() const override { return true; } - DictionarySourcePtr clone() const override { return ext::make_unique(*this); } + DictionarySourcePtr clone() const override { return std::make_unique(*this); } private: mysqlxx::DateTime getLastModification() const @@ -64,20 +68,23 @@ private: try { auto connection = pool.Get(); - auto query = connection->query("SHOW TABLE STATUS LIKE '%" + table + "%';"); + auto query = connection->query("SHOW TABLE STATUS LIKE '%" + strconvert::escaped_for_like(table) + "%';"); auto result = query.use(); auto row = result.fetch(); const auto & update_time = row[Update_time_idx]; - return !update_time.isNull() ? update_time.getDateTime() : mysqlxx::DateTime{std::time(nullptr)}; + if (!update_time.isNull()) + return update_time.getDateTime(); } catch (...) { - tryLogCurrentException("MysqlDictionarySource"); + tryLogCurrentException("MySQLDictionarySource"); } - return {}; + /// we suppose failure to get modification time is not an error, therefore return current time + return mysqlxx::DateTime{std::time(nullptr)}; } + /// @todo escape table and column names static std::string composeLoadAllQuery(const Block & block, const std::string & table) { std::string query{"SELECT "}; @@ -99,7 +106,6 @@ private: const std::string table; Block sample_block; - const Context & context; mutable mysqlxx::PoolWithFailover pool; const std::string load_all_query; mysqlxx::DateTime last_modification; diff --git a/dbms/include/DB/Dictionaries/OwningBufferBlockInputStream.h b/dbms/include/DB/Dictionaries/OwningBufferBlockInputStream.h index b50d4c322a7..cc4c066f9f9 100644 --- a/dbms/include/DB/Dictionaries/OwningBufferBlockInputStream.h +++ b/dbms/include/DB/Dictionaries/OwningBufferBlockInputStream.h @@ -7,6 +7,9 @@ namespace DB { +/** Provides reading from a Buffer, taking exclusive ownership over it's lifetime, +* simplifies usage of ReadBufferFromFile (no need to manage buffer lifetime) etc. +*/ class OwningBufferBlockInputStream : public IProfilingBlockInputStream { public: @@ -21,9 +24,7 @@ private: String getName() const override { return "OwningBufferBlockInputStream"; } - String getID() const override { - return "OwningBuffer(" + stream->getID() + ")"; - } + String getID() const override { return "OwningBuffer(" + stream->getID() + ")"; } BlockInputStreamPtr stream; std::unique_ptr buffer; diff --git a/dbms/include/DB/Dictionaries/config_ptr_t.h b/dbms/include/DB/Dictionaries/config_ptr_t.h deleted file mode 100644 index f5f29792cc7..00000000000 --- a/dbms/include/DB/Dictionaries/config_ptr_t.h +++ /dev/null @@ -1,15 +0,0 @@ -#pragma once - -#include - -namespace DB -{ - -template struct release -{ - void operator()(const T * const ptr) { ptr->release(); } -}; - -template using config_ptr_t = std::unique_ptr>; - -} diff --git a/dbms/include/DB/Functions/FunctionsDictionaries.h b/dbms/include/DB/Functions/FunctionsDictionaries.h index c5e2a95989f..8bfef963177 100644 --- a/dbms/include/DB/Functions/FunctionsDictionaries.h +++ b/dbms/include/DB/Functions/FunctionsDictionaries.h @@ -14,6 +14,7 @@ #include #include #include +#include namespace DB @@ -35,6 +36,19 @@ namespace DB * * Получить массив идентификаторов регионов, состоящий из исходного и цепочки родителей. Порядок implementation defined. * regionHierarchy, OSHierarchy, SEHierarchy. + * + * Функции, использующие подключаемые (внешние) словари. + * + * Получить значение аттрибута заданного типа. + * dictGetType(dictionary, attribute, id), + * Type - placeholder для имени типа, в данный момент поддерживаются любые числовые и строковой типы. + * Тип должен соответствовать реальному типу аттрибута, с которым он был объявлен в структуре словаря. + * + * Получить массив идентификаторов, состоящий из исходного и цепочки родителей. + * dictGetHierarchy(dictionary, id). + * + * Является ли первы йидентификатор потомком второго. + * dictIsIn(dictionary, child_id, parent_id). */ @@ -726,10 +740,10 @@ public: static IFunction * create(const Context & context) { - return new FunctionDictGetString{context.getDictionaries()}; + return new FunctionDictGetString{context.getExternalDictionaries()}; }; - FunctionDictGetString(const Dictionaries & dictionaries) : dictionaries(dictionaries) {} + FunctionDictGetString(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {} String getName() const override { return name; } @@ -746,7 +760,8 @@ private: if (!typeid_cast(arguments[0].get())) { throw Exception{ - "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[0]->getName() + " of first argument of function " + getName() + + ", expected a string.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -754,7 +769,8 @@ private: if (!typeid_cast(arguments[1].get())) { throw Exception{ - "Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[1]->getName() + " of second argument of function " + getName() + + ", expected a string.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -770,7 +786,8 @@ private: !typeid_cast(id_arg)) { throw Exception{ - "Illegal type " + arguments[2]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[2]->getName() + " of third argument of function " + getName() + + ", expected an integer.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -787,11 +804,12 @@ private: ErrorCodes::ILLEGAL_COLUMN }; - auto dict = dictionaries.getExternalDictionary(dict_name_col->getData()); + auto dict = dictionaries.getDictionary(dict_name_col->getData()); const auto dict_ptr = dict.get(); if (!executeDispatch(block, arguments, result, dict_ptr) && - !executeDispatch(block, arguments, result, dict_ptr)) + !executeDispatch(block, arguments, result, dict_ptr) && + !executeDispatch(block, arguments, result, dict_ptr)) throw Exception{ "Unsupported dictionary type " + dict_ptr->getTypeName(), ErrorCodes::UNKNOWN_TYPE @@ -871,7 +889,7 @@ private: return false; } - const Dictionaries & dictionaries; + const ExternalDictionaries & dictionaries; }; @@ -911,10 +929,10 @@ public: static IFunction * create(const Context & context) { - return new FunctionDictGet{context.getDictionaries()}; + return new FunctionDictGet{context.getExternalDictionaries()}; }; - FunctionDictGet(const Dictionaries & dictionaries) : dictionaries(dictionaries) {} + FunctionDictGet(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {} String getName() const override { return name; } @@ -931,7 +949,8 @@ private: if (!typeid_cast(arguments[0].get())) { throw Exception{ - "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[0]->getName() + " of first argument of function " + getName() + + ", expected a string.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -939,7 +958,8 @@ private: if (!typeid_cast(arguments[1].get())) { throw Exception{ - "Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[1]->getName() + " of second argument of function " + getName() + + ", expected a string.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -955,7 +975,8 @@ private: !typeid_cast(id_arg)) { throw Exception{ - "Illegal type " + id_arg->getName() + " of argument of function " + getName(), + "Illegal type " + id_arg->getName() + " of third argument of function " + getName() + + ", expected an integer.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -972,11 +993,12 @@ private: ErrorCodes::ILLEGAL_COLUMN }; - auto dict = dictionaries.getExternalDictionary(dict_name_col->getData()); + auto dict = dictionaries.getDictionary(dict_name_col->getData()); const auto dict_ptr = dict.get(); if (!executeDispatch(block, arguments, result, dict_ptr) && - !executeDispatch(block, arguments, result, dict_ptr)) + !executeDispatch(block, arguments, result, dict_ptr) && + !executeDispatch(block, arguments, result, dict_ptr)) throw Exception{ "Unsupported dictionary type " + dict_ptr->getTypeName(), ErrorCodes::UNKNOWN_TYPE @@ -1058,7 +1080,7 @@ private: return false; } - const Dictionaries & dictionaries; + const ExternalDictionaries & dictionaries; }; template @@ -1084,10 +1106,10 @@ public: static IFunction * create(const Context & context) { - return new FunctionDictGetHierarchy{context.getDictionaries()}; + return new FunctionDictGetHierarchy{context.getExternalDictionaries()}; }; - FunctionDictGetHierarchy(const Dictionaries & dictionaries) : dictionaries(dictionaries) {} + FunctionDictGetHierarchy(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {} String getName() const override { return name; } @@ -1104,7 +1126,8 @@ private: if (!typeid_cast(arguments[0].get())) { throw Exception{ - "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[0]->getName() + " of first argument of function " + getName() + + ", expected a string.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -1120,7 +1143,8 @@ private: !typeid_cast(id_arg)) { throw Exception{ - "Illegal type " + id_arg->getName() + " of argument of function " + getName(), + "Illegal type " + id_arg->getName() + " of second argument of function " + getName() + + ", expected an integer.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -1137,7 +1161,7 @@ private: ErrorCodes::ILLEGAL_COLUMN }; - auto dict = dictionaries.getExternalDictionary(dict_name_col->getData()); + auto dict = dictionaries.getDictionary(dict_name_col->getData()); const auto dict_ptr = dict.get(); if (!dict->hasHierarchy()) @@ -1147,7 +1171,8 @@ private: }; if (!executeDispatch(block, arguments, result, dict_ptr) && - !executeDispatch(block, arguments, result, dict_ptr)) + !executeDispatch(block, arguments, result, dict_ptr) && + !executeDispatch(block, arguments, result, dict_ptr)) throw Exception{ "Unsupported dictionary type " + dict_ptr->getTypeName(), ErrorCodes::UNKNOWN_TYPE @@ -1234,7 +1259,7 @@ private: return false; } - const Dictionaries & dictionaries; + const ExternalDictionaries & dictionaries; }; @@ -1245,10 +1270,10 @@ public: static IFunction * create(const Context & context) { - return new FunctionDictIsIn{context.getDictionaries()}; + return new FunctionDictIsIn{context.getExternalDictionaries()}; }; - FunctionDictIsIn(const Dictionaries & dictionaries) : dictionaries(dictionaries) {} + FunctionDictIsIn(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {} String getName() const override { return name; } @@ -1265,7 +1290,8 @@ private: if (!typeid_cast(arguments[0].get())) { throw Exception{ - "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[0]->getName() + " of first argument of function " + getName() + + ", expected a string.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -1281,7 +1307,8 @@ private: !typeid_cast(child_id_arg)) { throw Exception{ - "Illegal type " + child_id_arg->getName() + " of argument of function " + getName(), + "Illegal type " + child_id_arg->getName() + " of second argument of function " + getName() + + ", expected an integer.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -1297,7 +1324,8 @@ private: !typeid_cast(ancestor_id_arg)) { throw Exception{ - "Illegal type " + ancestor_id_arg->getName() + " of argument of function " + getName(), + "Illegal type " + ancestor_id_arg->getName() + " of argument of third function " + getName() + + ", expected an integer.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -1314,7 +1342,7 @@ private: ErrorCodes::ILLEGAL_COLUMN }; - auto dict = dictionaries.getExternalDictionary(dict_name_col->getData()); + auto dict = dictionaries.getDictionary(dict_name_col->getData()); const auto dict_ptr = dict.get(); if (!dict->hasHierarchy()) @@ -1324,7 +1352,8 @@ private: }; if (!executeDispatch(block, arguments, result, dict_ptr) && - !executeDispatch(block, arguments, result, dict_ptr)) + !executeDispatch(block, arguments, result, dict_ptr) && + !executeDispatch(block, arguments, result, dict_ptr)) throw Exception{ "Unsupported dictionary type " + dict_ptr->getTypeName(), ErrorCodes::UNKNOWN_TYPE @@ -1470,7 +1499,7 @@ private: return false; } - const Dictionaries & dictionaries; + const ExternalDictionaries & dictionaries; }; diff --git a/dbms/include/DB/Functions/FunctionsStringArray.h b/dbms/include/DB/Functions/FunctionsStringArray.h index 629d215bad0..12f8f994b94 100644 --- a/dbms/include/DB/Functions/FunctionsStringArray.h +++ b/dbms/include/DB/Functions/FunctionsStringArray.h @@ -267,7 +267,7 @@ public: throw Exception("Illegal column " + col->getName() + " of first argument of function " + getName() + ". Must be constant string.", ErrorCodes::ILLEGAL_COLUMN); - re = Regexps::get(col->getData()); + re = Regexps::get(col->getData()); capture = re->getNumberOfSubpatterns() > 0 ? 1 : 0; matches.resize(capture + 1); diff --git a/dbms/include/DB/Functions/FunctionsStringSearch.h b/dbms/include/DB/Functions/FunctionsStringSearch.h index c8ed6b852f6..ffc95a8928d 100644 --- a/dbms/include/DB/Functions/FunctionsStringSearch.h +++ b/dbms/include/DB/Functions/FunctionsStringSearch.h @@ -3,7 +3,7 @@ #include #include -#include +#include #include #include @@ -299,24 +299,24 @@ namespace Regexps } template - inline Regexp createRegexp(const std::string & pattern) { return pattern; } + inline Regexp createRegexp(const std::string & pattern, int flags) { return {pattern, flags}; } template <> - inline Regexp createRegexp(const std::string & pattern) { return likePatternToRegexp(pattern); } + inline Regexp createRegexp(const std::string & pattern, int flags) { return {likePatternToRegexp(pattern), flags}; } - template + template inline Pointer get(const std::string & pattern) { /// C++11 has thread-safe function-local statics on most modern compilers. - static KnownRegexps known_regexps; + static KnownRegexps known_regexps; /// Разные переменные для разных параметров шаблона. static std::mutex mutex; std::lock_guard lock{mutex}; auto it = known_regexps.find(pattern); if (known_regexps.end() == it) - it = known_regexps.emplace(pattern, ext::make_unique()).first; + it = known_regexps.emplace(pattern, std::make_unique()).first; return it->second->get([&pattern] { - return new Regexp{createRegexp(pattern)}; + return new Regexp{createRegexp(pattern, no_capture ? OptimizedRegularExpression::RE_NO_CAPTURE : 0)}; }); } } @@ -373,7 +373,7 @@ struct MatchImpl } else { - const auto & regexp = Regexps::get(pattern); + const auto & regexp = Regexps::get(pattern); size_t size = offsets.size(); for (size_t i = 0; i < size; ++i) res[i] = revert ^ regexp->match(reinterpret_cast(&data[i != 0 ? offsets[i - 1] : 0]), (i != 0 ? offsets[i] - offsets[i - 1] : offsets[0]) - 1); @@ -382,7 +382,7 @@ struct MatchImpl static void constant(const std::string & data, const std::string & pattern, UInt8 & res) { - const auto & regexp = Regexps::get(pattern); + const auto & regexp = Regexps::get(pattern); res = revert ^ regexp->match(data); } }; @@ -397,7 +397,7 @@ struct ExtractImpl res_data.reserve(data.size() / 5); res_offsets.resize(offsets.size()); - const auto & regexp = Regexps::get(pattern); + const auto & regexp = Regexps::get(pattern); unsigned capture = regexp->getNumberOfSubpatterns() > 0 ? 1 : 0; OptimizedRegularExpression::MatchVec matches; diff --git a/dbms/include/DB/Interpreters/Context.h b/dbms/include/DB/Interpreters/Context.h index 3cdc20dde70..18a228bef83 100644 --- a/dbms/include/DB/Interpreters/Context.h +++ b/dbms/include/DB/Interpreters/Context.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -87,6 +88,7 @@ struct ContextShared DataTypeFactory data_type_factory; /// Типы данных. FormatFactory format_factory; /// Форматы. mutable SharedPtr dictionaries; /// Словари Метрики. Инициализируются лениво. + mutable SharedPtr external_dictionaries; Users users; /// Известные пользователи. Quotas quotas; /// Известные квоты на использование ресурсов. mutable UncompressedCachePtr uncompressed_cache; /// Кэш разжатых блоков. @@ -259,6 +261,7 @@ public: const DataTypeFactory & getDataTypeFactory() const { return shared->data_type_factory; } const FormatFactory & getFormatFactory() const { return shared->format_factory; } const Dictionaries & getDictionaries() const; + const ExternalDictionaries & getExternalDictionaries() const; InterserverIOHandler & getInterserverIOHandler() { return shared->interserver_io_handler; } diff --git a/dbms/include/DB/Interpreters/Dictionaries.h b/dbms/include/DB/Interpreters/Dictionaries.h index e943d12fe61..94e12484daa 100644 --- a/dbms/include/DB/Interpreters/Dictionaries.h +++ b/dbms/include/DB/Interpreters/Dictionaries.h @@ -1,13 +1,5 @@ #pragma once -#include -#include -#include -#include -#include - -#include - #include #include #include @@ -18,10 +10,7 @@ namespace DB { -using Poco::SharedPtr; - class Context; -class IDictionary; /// Словари Метрики, которые могут использоваться в функциях. @@ -31,22 +20,15 @@ private: MultiVersion regions_hierarchies; MultiVersion tech_data_hierarchy; MultiVersion regions_names; - mutable std::mutex external_dictionaries_mutex; - std::unordered_map>> external_dictionaries; - std::unordered_map update_times; - std::mt19937 rnd_engine; - Context & context; /// Периодичность обновления справочников, в секундах. int reload_period; std::thread reloading_thread; - std::thread reloading_externals_thread; - Poco::Event destroy{false}; + Poco::Event destroy; Logger * log; - Poco::Timestamp dictionaries_last_modified{0}; void handleException() const @@ -122,7 +104,6 @@ private: } - void reloadExternals(); /// Обновляет каждые reload_period секунд. void reloadPeriodically() @@ -136,35 +117,19 @@ private: } } - void reloadExternalsPeriodically() - { - const auto check_period = 5 * 1000; - while (true) - { - if (destroy.tryWait(check_period)) - return; - - reloadExternals(); - } - } - public: /// Справочники будут обновляться в отдельном потоке, каждые reload_period секунд. - Dictionaries(Context & context, int reload_period_ = 3600) - : context(context), reload_period(reload_period_), - log(&Logger::get("Dictionaries")) + Dictionaries(int reload_period_ = 3600) + : reload_period(reload_period_), log(&Logger::get("Dictionaries")) { reloadImpl(); - reloadExternals(); reloading_thread = std::thread([this] { reloadPeriodically(); }); - reloading_externals_thread = std::thread{&Dictionaries::reloadExternalsPeriodically, this}; } ~Dictionaries() { destroy.set(); reloading_thread.join(); - reloading_externals_thread.join(); } MultiVersion::Version getRegionsHierarchies() const @@ -181,19 +146,6 @@ public: { return regions_names.get(); } - - MultiVersion::Version getExternalDictionary(const std::string & name) const - { - const std::lock_guard lock{external_dictionaries_mutex}; - const auto it = external_dictionaries.find(name); - if (it == std::end(external_dictionaries)) - throw Exception{ - "No such dictionary: " + name, - ErrorCodes::BAD_ARGUMENTS - }; - - return it->second->get(); - } }; } diff --git a/dbms/include/DB/Interpreters/ExternalDictionaries.h b/dbms/include/DB/Interpreters/ExternalDictionaries.h new file mode 100644 index 00000000000..171d1a39242 --- /dev/null +++ b/dbms/include/DB/Interpreters/ExternalDictionaries.h @@ -0,0 +1,122 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class Context; +class IDictionary; + +/** Manages user-defined dictionaries. +* Monitors configuration file and automatically reloads dictionaries in a separate thread. +* The monitoring thread wakes up every @check_period_sec seconds and checks +* modification time of dictionaries' configuration file. If said time is greater than +* @config_last_modified, the dictionaries are created from scratch using configuration file, +* possibly overriding currently existing dictionaries with the same name (previous versions of +* overridden dictionaries will live as long as there are any users retaining them). +* +* Apart from checking configuration file for modifications, each non-cached dictionary +* has a lifetime of its own and may be updated if it's source reports that it has been +* modified. The time of next update is calculated by choosing uniformly a random number +* distributed between lifetime.min_sec and lifetime.max_sec. +* If either of lifetime.min_sec and lifetime.max_sec is zero, such dictionary is never updated. +*/ +class ExternalDictionaries +{ +private: + static const auto check_period_sec = 5; + + mutable std::mutex dictionaries_mutex; + std::unordered_map>> dictionaries; + std::unordered_map update_times; + std::mt19937_64 rnd_engine{getSeed()}; + + Context & context; + + std::thread reloading_thread; + Poco::Event destroy; + + Logger * log; + + Poco::Timestamp config_last_modified{0}; + + void handleException() const + { + try + { + throw; + } + catch (const Poco::Exception & e) + { + LOG_ERROR(log, "Cannot load exter dictionary! You must resolve this manually. " << e.displayText()); + return; + } + catch (...) + { + LOG_ERROR(log, "Cannot load dictionary! You must resolve this manually."); + return; + } + } + + void reloadImpl(); + + void reloadPeriodically() + { + while (true) + { + if (destroy.tryWait(check_period_sec * 1000)) + return; + + reloadImpl(); + } + } + + static std::uint64_t getSeed() + { + timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return ts.tv_nsec ^ getpid(); + } + +public: + /// Справочники будут обновляться в отдельном потоке, каждые reload_period секунд. + ExternalDictionaries(Context & context) + : context(context), log(&Logger::get("ExternalDictionaries")) + { + reloadImpl(); + reloading_thread = std::thread{&ExternalDictionaries::reloadPeriodically, this}; + } + + ~ExternalDictionaries() + { + destroy.set(); + reloading_thread.join(); + } + + MultiVersion::Version getDictionary(const std::string & name) const + { + const std::lock_guard lock{dictionaries_mutex}; + const auto it = dictionaries.find(name); + if (it == std::end(dictionaries)) + throw Exception{ + "No such dictionary: " + name, + ErrorCodes::BAD_ARGUMENTS + }; + + return it->second->get(); + } +}; + +} diff --git a/dbms/include/DB/Interpreters/Limits.h b/dbms/include/DB/Interpreters/Limits.h index 6f6ee03b483..7383ee4e4d8 100644 --- a/dbms/include/DB/Interpreters/Limits.h +++ b/dbms/include/DB/Interpreters/Limits.h @@ -87,6 +87,9 @@ struct Limits \ /** Максимальное использование памяти при обработке запроса. 0 - не ограничено. */ \ M(SettingUInt64, max_memory_usage, 0) \ + \ + /** Максимальная скорость обмена данными по сети в байтах в секунду. 0 - не ограничена. */ \ + M(SettingUInt64, max_network_bandwidth, 0) \ #define DECLARE(TYPE, NAME, DEFAULT) \ TYPE NAME {DEFAULT}; diff --git a/dbms/include/DB/Interpreters/evaluateMissingDefaults.h b/dbms/include/DB/Interpreters/evaluateMissingDefaults.h index 5473b847f71..420cda457c7 100644 --- a/dbms/include/DB/Interpreters/evaluateMissingDefaults.h +++ b/dbms/include/DB/Interpreters/evaluateMissingDefaults.h @@ -16,7 +16,7 @@ inline void evaluateMissingDefaults(Block & block, if (column_defaults.empty()) return; - ASTPtr default_expr_list{ext::make_unique().release()}; + ASTPtr default_expr_list{std::make_unique().release()}; for (const auto & column : required_columns) { diff --git a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h index d3e57785d42..8d1e0b17f8b 100644 --- a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h +++ b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include diff --git a/dbms/include/DB/Storages/IStorage.h b/dbms/include/DB/Storages/IStorage.h index 815464cffcb..aeed983161d 100644 --- a/dbms/include/DB/Storages/IStorage.h +++ b/dbms/include/DB/Storages/IStorage.h @@ -14,7 +14,7 @@ #include #include #include -#include +#include namespace DB @@ -129,7 +129,7 @@ public: */ TableDataWriteLockPtr lockDataForAlter() { - auto res = ext::make_unique(data_lock); + auto res = std::make_unique(data_lock); if (is_dropped) throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED); return res; @@ -137,7 +137,7 @@ public: TableStructureWriteLockPtr lockStructureForAlter() { - auto res = ext::make_unique(structure_lock); + auto res = std::make_unique(structure_lock); if (is_dropped) throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED); return res; diff --git a/dbms/include/DB/Storages/MergeTree/MergeList.h b/dbms/include/DB/Storages/MergeTree/MergeList.h index 138db6ba944..533d8c46b1d 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeList.h +++ b/dbms/include/DB/Storages/MergeTree/MergeList.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include #include @@ -67,7 +67,7 @@ public: EntryPtr insert(Args &&... args) { std::lock_guard lock{mutex}; - return ext::make_unique(*this, merges.emplace(merges.end(), std::forward(args)...)); + return std::make_unique(*this, merges.emplace(merges.end(), std::forward(args)...)); } container_t get() const diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeWhereOptimizer.h b/dbms/include/DB/Storages/MergeTree/MergeTreeWhereOptimizer.h index fa272831634..8ec5657b756 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeWhereOptimizer.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeWhereOptimizer.h @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/dbms/include/DB/TableFunctions/TableFunctionRemote.h b/dbms/include/DB/TableFunctions/TableFunctionRemote.h index 8ce917e30c7..5bd461fe87e 100644 --- a/dbms/include/DB/TableFunctions/TableFunctionRemote.h +++ b/dbms/include/DB/TableFunctions/TableFunctionRemote.h @@ -92,9 +92,8 @@ private: /// Отправляем на первый попавшийся шард BlockInputStreamPtr input{ new RemoteBlockInputStream{ - cluster.pools.front().get(), query, &settings, - Tables(), QueryProcessingStage::Complete, context - } + cluster.pools.front().get(), query, &settings, nullptr, + Tables(), QueryProcessingStage::Complete, context} }; input->readPrefix(); diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index 57a93d60c4d..9b498318b1a 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -277,15 +277,22 @@ void Connection::sendData(const Block & block, const String & name) if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) writeStringBinary(name, *out); + size_t prev_bytes = out->count(); + block.checkNestedArraysOffsets(); block_out->write(block); maybe_compressed_out->next(); out->next(); + + if (throttler) + throttler->add(out->count() - prev_bytes); } void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String & name) { + /// NOTE В этом методе не используется throttler (хотя можно использовать, но это пока не важно). + writeVarUInt(Protocol::Client::Data, *out); if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) @@ -436,8 +443,15 @@ Block Connection::receiveData() if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) readStringBinary(external_table_name, *in); + size_t prev_bytes = in->count(); + /// Прочитать из сети один блок - return block_in->read(); + Block res = block_in->read(); + + if (throttler) + throttler->add(in->count() - prev_bytes); + + return res; } diff --git a/dbms/src/Client/ParallelReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp index 9e5fee8c3d7..7e8d52baa6f 100644 --- a/dbms/src/Client/ParallelReplicas.cpp +++ b/dbms/src/Client/ParallelReplicas.cpp @@ -2,112 +2,163 @@ namespace DB { - ParallelReplicas::ParallelReplicas(Connection * connection_, Settings * settings_) - : settings(settings_), - active_replica_count(1), - supports_parallel_execution(false) + +ParallelReplicas::ParallelReplicas(Connection * connection_, Settings * settings_, ThrottlerPtr throttler_) + : settings(settings_), throttler(throttler_), + active_replica_count(1), + supports_parallel_execution(false) +{ + registerReplica(connection_); +} + +ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, Settings * settings_, ThrottlerPtr throttler_) + : settings(settings_), throttler(throttler_) +{ + if (pool_ == nullptr) + throw Exception("Null pool specified", ErrorCodes::LOGICAL_ERROR); + + bool has_many_replicas = (settings != nullptr) && (settings->max_parallel_replicas > 1); + if (has_many_replicas) { - registerReplica(connection_); + pool_entries = pool_->getMany(settings); + active_replica_count = pool_entries.size(); + supports_parallel_execution = (active_replica_count > 1); + + if (active_replica_count == 0) + throw Exception("No connection available", ErrorCodes::LOGICAL_ERROR); + + replica_map.reserve(active_replica_count); + for (auto & entry : pool_entries) + registerReplica(&*entry); } - - ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, Settings * settings_) - : settings(settings_) + else { - if (pool_ == nullptr) - throw Exception("Null pool specified", ErrorCodes::LOGICAL_ERROR); + active_replica_count = 1; + supports_parallel_execution = false; - bool has_many_replicas = (settings != nullptr) && (settings->max_parallel_replicas > 1); - if (has_many_replicas) - { - pool_entries = pool_->getMany(settings); - active_replica_count = pool_entries.size(); - supports_parallel_execution = (active_replica_count > 1); - - if (active_replica_count == 0) - throw Exception("No connection available", ErrorCodes::LOGICAL_ERROR); - - replica_map.reserve(active_replica_count); - for (auto & entry : pool_entries) - registerReplica(&*entry); - } - else - { - active_replica_count = 1; - supports_parallel_execution = false; - - pool_entry = pool_->get(settings); - registerReplica(&*pool_entry); - } + pool_entry = pool_->get(settings); + registerReplica(&*pool_entry); } +} - void ParallelReplicas::sendExternalTablesData(std::vector & data) +void ParallelReplicas::sendExternalTablesData(std::vector & data) +{ + if (!sent_query) + throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR); + + if (data.size() < active_replica_count) + throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); + + auto it = data.begin(); + for (auto & e : replica_map) { - if (!sent_query) - throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR); + Connection * connection = e.second; + if (connection != nullptr) + connection->sendExternalTablesData(*it); + ++it; + } +} - if (data.size() < active_replica_count) - throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); +void ParallelReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data) +{ + if (sent_query) + throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR); + + if (supports_parallel_execution) + { + Settings query_settings = *settings; + query_settings.parallel_replicas_count = active_replica_count; + UInt64 offset = 0; - auto it = data.begin(); for (auto & e : replica_map) { Connection * connection = e.second; if (connection != nullptr) - connection->sendExternalTablesData(*it); - ++it; - } - } - - void ParallelReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data) - { - if (sent_query) - throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR); - - if (supports_parallel_execution) - { - Settings query_settings = *settings; - query_settings.parallel_replicas_count = active_replica_count; - UInt64 offset = 0; - - for (auto & e : replica_map) { - Connection * connection = e.second; - if (connection != nullptr) - { - query_settings.parallel_replica_offset = offset; - connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data); - ++offset; - } - } - - if (offset > 0) - sent_query = true; - } - else - { - auto it = replica_map.begin(); - Connection * connection = it->second; - if (connection != nullptr) - { - connection->sendQuery(query, query_id, stage, settings, with_pending_data); - sent_query = true; + query_settings.parallel_replica_offset = offset; + connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data); + ++offset; } } + + if (offset > 0) + sent_query = true; } - - Connection::Packet ParallelReplicas::receivePacket() + else { - if (!sent_query) - throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR); - if (!hasActiveReplicas()) - throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR); - - auto it = getReplicaForReading(); - if (it == replica_map.end()) - throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA); - + auto it = replica_map.begin(); Connection * connection = it->second; - Connection::Packet packet = connection->receivePacket(); + if (connection != nullptr) + { + connection->sendQuery(query, query_id, stage, settings, with_pending_data); + sent_query = true; + } + } +} + +Connection::Packet ParallelReplicas::receivePacket() +{ + if (!sent_query) + throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR); + if (!hasActiveReplicas()) + throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR); + + auto it = getReplicaForReading(); + if (it == replica_map.end()) + throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA); + + Connection * connection = it->second; + Connection::Packet packet = connection->receivePacket(); + + switch (packet.type) + { + case Protocol::Server::Data: + case Protocol::Server::Progress: + case Protocol::Server::ProfileInfo: + case Protocol::Server::Totals: + case Protocol::Server::Extremes: + break; + + case Protocol::Server::EndOfStream: + invalidateReplica(it); + break; + + case Protocol::Server::Exception: + default: + connection->disconnect(); + invalidateReplica(it); + break; + } + + return packet; +} + +void ParallelReplicas::sendCancel() +{ + if (!sent_query || cancelled) + throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR); + + for (auto & e : replica_map) + { + Connection * connection = e.second; + if (connection != nullptr) + connection->sendCancel(); + } + + cancelled = true; +} + +Connection::Packet ParallelReplicas::drain() +{ + if (!cancelled) + throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR); + + Connection::Packet res; + res.type = Protocol::Server::EndOfStream; + + while (hasActiveReplicas()) + { + Connection::Packet packet = receivePacket(); switch (packet.type) { @@ -116,153 +167,109 @@ namespace DB case Protocol::Server::ProfileInfo: case Protocol::Server::Totals: case Protocol::Server::Extremes: - break; - case Protocol::Server::EndOfStream: - invalidateReplica(it); break; case Protocol::Server::Exception: default: - connection->disconnect(); - invalidateReplica(it); + /// Если мы получили исключение или неизвестный пакет, сохраняем его. + res = packet; break; } - - return packet; } - void ParallelReplicas::sendCancel() + return res; +} + +std::string ParallelReplicas::dumpAddresses() const +{ + bool is_first = true; + std::ostringstream os; + for (auto & e : replica_map) { - if (!sent_query || cancelled) - throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR); + const Connection * connection = e.second; + if (connection != nullptr) + { + os << (is_first ? "" : "; ") << connection->getServerAddress(); + if (is_first) { is_first = false; } + } + } + + return os.str(); +} + + +void ParallelReplicas::registerReplica(Connection * connection) +{ + if (connection == nullptr) + throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR); + auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection)); + if (!res.second) + throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR); + + if (throttler) + connection->setThrottler(throttler); +} + + +ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getReplicaForReading() +{ + ReplicaMap::iterator it; + + if (supports_parallel_execution) + it = waitForReadEvent(); + else + { + it = replica_map.begin(); + if (it->second == nullptr) + it = replica_map.end(); + } + + return it; +} + +ParallelReplicas::ReplicaMap::iterator ParallelReplicas::waitForReadEvent() +{ + Poco::Net::Socket::SocketList read_list; + read_list.reserve(active_replica_count); + + /** Сначала проверяем, есть ли данные, которые уже лежат в буфере + * хоть одного соединения. + */ + for (auto & e : replica_map) + { + Connection * connection = e.second; + if ((connection != nullptr) && connection->hasReadBufferPendingData()) + read_list.push_back(connection->socket); + } + + /** Если не было найдено никаких данных, то проверяем, есть ли соединения + * готовые для чтения. + */ + if (read_list.empty()) + { + Poco::Net::Socket::SocketList write_list; + Poco::Net::Socket::SocketList except_list; for (auto & e : replica_map) { Connection * connection = e.second; if (connection != nullptr) - connection->sendCancel(); - } - - cancelled = true; - } - - Connection::Packet ParallelReplicas::drain() - { - if (!cancelled) - throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR); - - Connection::Packet res; - res.type = Protocol::Server::EndOfStream; - - while (hasActiveReplicas()) - { - Connection::Packet packet = receivePacket(); - - switch (packet.type) - { - case Protocol::Server::Data: - case Protocol::Server::Progress: - case Protocol::Server::ProfileInfo: - case Protocol::Server::Totals: - case Protocol::Server::Extremes: - case Protocol::Server::EndOfStream: - break; - - case Protocol::Server::Exception: - default: - /// Если мы получили исключение или неизвестный пакет, сохраняем его. - res = packet; - break; - } - } - - return res; - } - - std::string ParallelReplicas::dumpAddresses() const - { - bool is_first = true; - std::ostringstream os; - for (auto & e : replica_map) - { - const Connection * connection = e.second; - if (connection != nullptr) - { - os << (is_first ? "" : "; ") << connection->getServerAddress(); - if (is_first) { is_first = false; } - } - } - - return os.str(); - } - - void ParallelReplicas::registerReplica(Connection * connection) - { - if (connection == nullptr) - throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR); - auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection)); - if (!res.second) - throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR); - } - - ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getReplicaForReading() - { - ReplicaMap::iterator it; - - if (supports_parallel_execution) - it = waitForReadEvent(); - else - { - it = replica_map.begin(); - if (it->second == nullptr) - it = replica_map.end(); - } - - return it; - } - - ParallelReplicas::ReplicaMap::iterator ParallelReplicas::waitForReadEvent() - { - Poco::Net::Socket::SocketList read_list; - read_list.reserve(active_replica_count); - - /** Сначала проверяем, есть ли данные, которые уже лежат в буфере - * хоть одного соединения. - */ - for (auto & e : replica_map) - { - Connection * connection = e.second; - if ((connection != nullptr) && connection->hasReadBufferPendingData()) read_list.push_back(connection->socket); } - - /** Если не было найдено никаких данных, то проверяем, есть ли соединения - * готовые для чтения. - */ - if (read_list.empty()) - { - Poco::Net::Socket::SocketList write_list; - Poco::Net::Socket::SocketList except_list; - - for (auto & e : replica_map) - { - Connection * connection = e.second; - if (connection != nullptr) - read_list.push_back(connection->socket); - } - int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->poll_interval * 1000000); - if (n == 0) - return replica_map.end(); - } - - auto & socket = read_list[rand() % read_list.size()]; - return replica_map.find(socket.impl()->sockfd()); + int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->poll_interval * 1000000); + if (n == 0) + return replica_map.end(); } - void ParallelReplicas::invalidateReplica(ParallelReplicas::ReplicaMap::iterator it) - { - it->second = nullptr; - --active_replica_count; - } + auto & socket = read_list[rand() % read_list.size()]; + return replica_map.find(socket.impl()->sockfd()); +} + +void ParallelReplicas::invalidateReplica(ParallelReplicas::ReplicaMap::iterator it) +{ + it->second = nullptr; + --active_replica_count; +} + } diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index 336adfa9b78..2ceef5e7e52 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -11,7 +11,7 @@ #include #include -#include +#include #include diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index f18be042568..7da53e13f85 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -255,19 +256,7 @@ bool Cluster::isLocal(const Address & address) /// - её порт совпадает с портом, который слушает сервер; /// - её хост резолвится в набор адресов, один из которых совпадает с одним из адресов сетевых интерфейсов сервера /// то нужно всегда ходить на этот шард без межпроцессного взаимодействия - const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0); - static auto interfaces = Poco::Net::NetworkInterface::list(); - - if (clickhouse_port == address.host_port.port() && - interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(), - [&](const Poco::Net::NetworkInterface & interface) { return interface.address() == address.host_port.host(); })) - { - LOG_INFO(&Poco::Util::Application::instance().logger(), - "Replica with address " << address.host_port.toString() << " will be processed as local."); - return true; - } - - return false; + return isLocalAddress(address.host_port); } } diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index ec39314ba58..3e123550a44 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -495,13 +495,24 @@ const Dictionaries & Context::getDictionaries() const Poco::ScopedLock lock(shared->mutex); if (!shared->dictionaries) + shared->dictionaries = new Dictionaries; + + return *shared->dictionaries; +} + + +const ExternalDictionaries & Context::getExternalDictionaries() const +{ + Poco::ScopedLock lock(shared->mutex); + + if (!shared->external_dictionaries) { if (!this->global_context) throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR); - shared->dictionaries = new Dictionaries{ *this->global_context }; + shared->external_dictionaries = new ExternalDictionaries{*this->global_context}; } - return *shared->dictionaries; + return *shared->external_dictionaries; } diff --git a/dbms/src/Interpreters/DictionaryFactory.cpp b/dbms/src/Interpreters/DictionaryFactory.cpp index 2843a31d08d..d36926a0c29 100644 --- a/dbms/src/Interpreters/DictionaryFactory.cpp +++ b/dbms/src/Interpreters/DictionaryFactory.cpp @@ -2,9 +2,10 @@ #include #include #include +#include #include +#include #include -#include namespace DB { @@ -12,39 +13,48 @@ namespace DB DictionaryPtr DictionaryFactory::create(const std::string & name, Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Context & context) const { - auto dict_struct = DictionaryStructure::fromConfig(config, config_prefix + "structure"); + Poco::Util::AbstractConfiguration::Keys keys; + const auto & layout_prefix = config_prefix + ".layout"; + config.keys(layout_prefix, keys); + if (keys.size() != 1) + throw Exception{ + "Element dictionary.layout should have exactly one child element", + ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG + }; + + const DictionaryStructure dict_struct{config, config_prefix + ".structure"}; auto source_ptr = DictionarySourceFactory::instance().create( - config, config_prefix + "source.", dict_struct, context); + config, config_prefix + ".source", dict_struct, context); - const auto dict_lifetime = DictionaryLifetime::fromConfig(config, config_prefix + "lifetime"); + const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"}; - const auto & layout_prefix = config_prefix + "layout."; + const auto & layout_type = keys.front(); - if (config.has(layout_prefix + "flat")) + if ("flat" == layout_type) { - return ext::make_unique(name, dict_struct, std::move(source_ptr), dict_lifetime); + return std::make_unique(name, dict_struct, std::move(source_ptr), dict_lifetime); } - else if (config.has(layout_prefix + "hashed")) + else if ("hashed" == layout_type) { - return ext::make_unique(name, dict_struct, std::move(source_ptr), dict_lifetime); + return std::make_unique(name, dict_struct, std::move(source_ptr), dict_lifetime); } - else if (config.has(layout_prefix + "cache")) + else if ("cache" == layout_type) { - const auto size = config.getInt(layout_prefix + "cache.size", 0); + const auto size = config.getInt(layout_prefix + ".cache.size"); if (size == 0) throw Exception{ "Dictionary of type 'cache' cannot have size of 0 bytes", ErrorCodes::TOO_SMALL_BUFFER_SIZE }; - throw Exception{ - "Dictionary of type 'cache' is not yet implemented", - ErrorCodes::NOT_IMPLEMENTED - }; + return std::make_unique(name, dict_struct, std::move(source_ptr), dict_lifetime, size); } - throw Exception{"No dictionary type specified", ErrorCodes::BAD_ARGUMENTS}; + throw Exception{ + "Unknown dictionary layout type: " + layout_type, + ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG + }; }; } diff --git a/dbms/src/Interpreters/Dictionaries.cpp b/dbms/src/Interpreters/ExternalDictionaries.cpp similarity index 80% rename from dbms/src/Interpreters/Dictionaries.cpp rename to dbms/src/Interpreters/ExternalDictionaries.cpp index a4e713e68eb..3865b72feeb 100644 --- a/dbms/src/Interpreters/Dictionaries.cpp +++ b/dbms/src/Interpreters/ExternalDictionaries.cpp @@ -1,9 +1,9 @@ -#include +#include #include #include #include -#include #include +#include namespace DB { @@ -29,7 +29,7 @@ namespace } } -void Dictionaries::reloadExternals() +void ExternalDictionaries::reloadImpl() { const auto config_path = getDictionariesConfigPath(Poco::Util::Application::instance().config()); const Poco::File config_file{config_path}; @@ -41,12 +41,15 @@ void Dictionaries::reloadExternals() else { const auto last_modified = config_file.getLastModified(); - if (last_modified > dictionaries_last_modified) + if (last_modified > config_last_modified) { /// definitions of dictionaries may have changed, recreate all of them - dictionaries_last_modified = last_modified; + config_last_modified = last_modified; - const config_ptr_t config{new Poco::Util::XMLConfiguration{config_path}}; + const auto config = new Poco::Util::XMLConfiguration{config_path}; + SCOPE_EXIT( + config->release(); + ); /// get all dictionaries' definitions Poco::Util::AbstractConfiguration::Keys keys; @@ -63,16 +66,14 @@ void Dictionaries::reloadExternals() continue; } - const auto & prefix = key + '.'; - - const auto & name = config->getString(prefix + "name"); + const auto name = config->getString(key + ".name"); if (name.empty()) { LOG_WARNING(log, "dictionary name cannot be empty"); continue; } - auto dict_ptr = DictionaryFactory::instance().create(name, *config, prefix, context); + auto dict_ptr = DictionaryFactory::instance().create(name, *config, key, context); if (!dict_ptr->isCached()) { const auto & lifetime = dict_ptr->getLifetime(); @@ -87,12 +88,12 @@ void Dictionaries::reloadExternals() } } - auto it = external_dictionaries.find(name); + auto it = dictionaries.find(name); /// add new dictionary or update an existing version - if (it == std::end(external_dictionaries)) + if (it == std::end(dictionaries)) { - const std::lock_guard lock{external_dictionaries_mutex}; - external_dictionaries.emplace(name, std::make_shared>(dict_ptr.release())); + const std::lock_guard lock{dictionaries_mutex}; + dictionaries.emplace(name, std::make_shared>(dict_ptr.release())); } else it->second->set(dict_ptr.release()); @@ -106,7 +107,7 @@ void Dictionaries::reloadExternals() } /// periodic update - for (auto & dictionary : external_dictionaries) + for (auto & dictionary : dictionaries) { try { @@ -126,11 +127,11 @@ void Dictionaries::reloadExternals() if (std::chrono::system_clock::now() < update_time) continue; - scope_exit({ + SCOPE_EXIT( /// calculate next update time std::uniform_int_distribution distribution{lifetime.min_sec, lifetime.max_sec}; update_time = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; - }); + ); /// check source modified if (current->getSource()->isModified()) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 837830fa526..b4b1c3e0ee7 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include #include @@ -491,7 +491,7 @@ int Server::main(const std::vector & args) global_context->setMacros(Macros(config(), "macros")); std::string users_config_path = config().getString("users_config", config().getString("config-file", "config.xml")); - auto users_config_reloader = ext::make_unique(users_config_path, global_context.get()); + auto users_config_reloader = std::make_unique(users_config_path, global_context.get()); /// Максимальное количество одновременно выполняющихся запросов. global_context->getProcessList().setMaxSize(config().getInt("max_concurrent_queries", 0)); @@ -536,7 +536,7 @@ int Server::main(const std::vector & args) { const auto profile_events_transmitter = config().getBool("use_graphite", true) - ? ext::make_unique() + ? std::make_unique() : nullptr; const std::string listen_host = config().getString("listen_host", "::"); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 8bfd831fd1b..5064169eb37 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -324,7 +324,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts( { MarkRanges ranges(1, MarkRange(0, parts[i]->size)); - auto input = ext::make_unique( + auto input = std::make_unique( data.getFullPath() + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data, parts[i], ranges, false, nullptr, ""); @@ -348,19 +348,19 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts( switch (data.mode) { case MergeTreeData::Ordinary: - merged_stream = ext::make_unique(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); + merged_stream = std::make_unique(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); break; case MergeTreeData::Collapsing: - merged_stream = ext::make_unique(src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE); + merged_stream = std::make_unique(src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE); break; case MergeTreeData::Summing: - merged_stream = ext::make_unique(src_streams, data.getSortDescription(), data.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE); + merged_stream = std::make_unique(src_streams, data.getSortDescription(), data.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE); break; case MergeTreeData::Aggregating: - merged_stream = ext::make_unique(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); + merged_stream = std::make_unique(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); break; default: diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index 5860b8c326c..bcbae892eaa 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -12,7 +12,7 @@ #include -#include +#include namespace DB { @@ -160,10 +160,15 @@ BlockInputStreams StorageDistributed::read( query, remote_database, remote_table); const auto & modified_query = queryToString(modified_query_ast); + /// Ограничение сетевого трафика, если нужно. + ThrottlerPtr throttler; + if (settings.limits.max_network_bandwidth) + throttler.reset(new Throttler(settings.limits.max_network_bandwidth)); + /// Цикл по шардам. for (auto & conn_pool : cluster.pools) res.emplace_back(new RemoteBlockInputStream{ - conn_pool, modified_query, &new_settings, + conn_pool, modified_query, &new_settings, throttler, external_tables, processed_stage, context}); /// Добавляем запросы к локальному ClickHouse. @@ -234,7 +239,7 @@ bool StorageDistributed::hasColumn(const String & column_name) const void StorageDistributed::createDirectoryMonitor(const std::string & name) { - directory_monitors.emplace(name, ext::make_unique(*this, name)); + directory_monitors.emplace(name, std::make_unique(*this, name)); } void StorageDistributed::createDirectoryMonitors() diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index c8484c491de..8a9ceb09d99 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2107,7 +2107,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, auto zookeeper = getZooKeeper(); const MergeTreeMergeBlocker merge_blocker{merger}; const auto unreplicated_merge_blocker = unreplicated_merger ? - ext::make_unique(*unreplicated_merger) : nullptr; + std::make_unique(*unreplicated_merger) : nullptr; LOG_DEBUG(log, "Doing ALTER"); diff --git a/tools/init.d/template b/tools/init.d/template index b2ecbe66dd8..ca290798493 100755 --- a/tools/init.d/template +++ b/tools/init.d/template @@ -93,6 +93,11 @@ any_runs() if [[ $(running_processes) -gt 0 ]]; then return 0; else return 1; fi } +all_runs() +{ + [[ $(running_processes) -eq $NUMBER_OF_PROCESSES ]] +} + wait4done() { while any_runs; do @@ -108,7 +113,7 @@ start() echo -n "Start $PROGRAM service: " - if [[ $(running_processes) -eq $NUMBER_OF_PROCESSES ]]; then + if all_runs; then echo -n "already running " EXIT_STATUS=1 else @@ -236,7 +241,7 @@ main() restart ;; condstart) - any_runs || start + all_runs || start ;; condstop) any_runs && stop