From 7addd501febbee4a1510494f47e1bc63938e61a5 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Thu, 29 Jan 2015 14:51:52 +0300 Subject: [PATCH] dbms: allow use of clickhouse as a dictionary source [#METR-13298] --- .../Dictionaries/ClickhouseDictionarySource.h | 109 ++++++++++++++++++ .../DB/Dictionaries/DictionarySourceFactory.h | 6 + .../DB/Dictionaries/MysqlDictionarySource.h | 2 - 3 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h diff --git a/dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h b/dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h new file mode 100644 index 00000000000..fdb360edc47 --- /dev/null +++ b/dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h @@ -0,0 +1,109 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class ClickhouseDictionarySource final : public IDictionarySource +{ + static const auto max_block_size = 8192; + static const auto max_connections = 1; + +public: + ClickhouseDictionarySource(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, + Block & sample_block, const Context & context) + : host{config.getString(config_prefix + "host")}, + port(config.getInt(config_prefix + "port")), + is_local{isLocal(host, port)}, + pool{is_local ? nullptr : ext::make_unique( + max_connections, host, port, + config.getString(config_prefix + "db", ""), + config.getString(config_prefix + "user", ""), + config.getString(config_prefix + "password", ""), + context.getDataTypeFactory(), + "ClickhouseDictionarySource") + }, + sample_block{sample_block}, context(context), + table{config.getString(config_prefix + "table")}, + load_all_query{composeLoadAllQuery(sample_block, table)} + {} + +private: + BlockInputStreamPtr loadAll() override + { + if (is_local) + return executeQuery(load_all_query, context).in; + return new RemoteBlockInputStream{pool.get(), load_all_query, nullptr}; + } + + BlockInputStreamPtr loadId(const std::uint64_t id) override + { + throw Exception{ + "Method unsupported", + ErrorCodes::NOT_IMPLEMENTED + }; + } + + BlockInputStreamPtr loadIds(const std::vector ids) override + { + throw Exception{ + "Method unsupported", + ErrorCodes::NOT_IMPLEMENTED + }; + } + + static std::string composeLoadAllQuery(const Block & block, const std::string & table) + { + std::string query{"SELECT "}; + + auto first = true; + for (const auto idx : ext::range(0, block.columns())) + { + if (!first) + query += ", "; + + query += block.getByPosition(idx).name; + first = false; + } + + query += " FROM " + table + ';'; + + return query; + } + + static bool isLocal(const std::string & host, const UInt16 port) + { + const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0); + static auto interfaces = Poco::Net::NetworkInterface::list(); + + if (clickhouse_port == port) + { + return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(), + [&] (const Poco::Net::NetworkInterface & interface) { + return interface.address() == Poco::Net::IPAddress(host); + }); + } + + return false; + } + + const std::string host; + const UInt16 port; + const bool is_local; + std::unique_ptr pool; + Block sample_block; + Context context; + const std::string table; + const std::string load_all_query; +}; + +} diff --git a/dbms/include/DB/Dictionaries/DictionarySourceFactory.h b/dbms/include/DB/Dictionaries/DictionarySourceFactory.h index 63ccadcc005..aa322bcc20e 100644 --- a/dbms/include/DB/Dictionaries/DictionarySourceFactory.h +++ b/dbms/include/DB/Dictionaries/DictionarySourceFactory.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -57,6 +58,11 @@ public: { return ext::make_unique(config, config_prefix + "mysql.", sample_block, context); } + else if (config.has(config_prefix + "clickhouse")) + { + return ext::make_unique(config, config_prefix + "clickhouse.", + sample_block, context); + } throw Exception{"unsupported source type"}; } diff --git a/dbms/include/DB/Dictionaries/MysqlDictionarySource.h b/dbms/include/DB/Dictionaries/MysqlDictionarySource.h index 2a340ccc6c1..c55b552674a 100644 --- a/dbms/include/DB/Dictionaries/MysqlDictionarySource.h +++ b/dbms/include/DB/Dictionaries/MysqlDictionarySource.h @@ -2,12 +2,10 @@ #include #include -#include #include #include #include #include -#include #include namespace DB