mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 01:22:04 +00:00
dbms: allow use of clickhouse as a dictionary source [#METR-13298]
This commit is contained in:
parent
95bb52b8e5
commit
7addd501fe
109
dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h
Normal file
109
dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h
Normal file
@ -0,0 +1,109 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/Interpreters/Context.h>
|
||||
#include <DB/Dictionaries/IDictionarySource.h>
|
||||
#include <DB/Client/ConnectionPool.h>
|
||||
#include <statdaemons/ext/range.hpp>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <DB/DataStreams/RemoteBlockInputStream.h>
|
||||
#include <DB/Interpreters/InterpreterSelectQuery.h>
|
||||
#include <DB/Interpreters/executeQuery.h>
|
||||
#include <Poco/Net/NetworkInterface.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ClickhouseDictionarySource final : public IDictionarySource
|
||||
{
|
||||
static const auto max_block_size = 8192;
|
||||
static const auto max_connections = 1;
|
||||
|
||||
public:
|
||||
ClickhouseDictionarySource(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
|
||||
Block & sample_block, const Context & context)
|
||||
: host{config.getString(config_prefix + "host")},
|
||||
port(config.getInt(config_prefix + "port")),
|
||||
is_local{isLocal(host, port)},
|
||||
pool{is_local ? nullptr : ext::make_unique<ConnectionPool>(
|
||||
max_connections, host, port,
|
||||
config.getString(config_prefix + "db", ""),
|
||||
config.getString(config_prefix + "user", ""),
|
||||
config.getString(config_prefix + "password", ""),
|
||||
context.getDataTypeFactory(),
|
||||
"ClickhouseDictionarySource")
|
||||
},
|
||||
sample_block{sample_block}, context(context),
|
||||
table{config.getString(config_prefix + "table")},
|
||||
load_all_query{composeLoadAllQuery(sample_block, table)}
|
||||
{}
|
||||
|
||||
private:
|
||||
BlockInputStreamPtr loadAll() override
|
||||
{
|
||||
if (is_local)
|
||||
return executeQuery(load_all_query, context).in;
|
||||
return new RemoteBlockInputStream{pool.get(), load_all_query, nullptr};
|
||||
}
|
||||
|
||||
BlockInputStreamPtr loadId(const std::uint64_t id) override
|
||||
{
|
||||
throw Exception{
|
||||
"Method unsupported",
|
||||
ErrorCodes::NOT_IMPLEMENTED
|
||||
};
|
||||
}
|
||||
|
||||
BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override
|
||||
{
|
||||
throw Exception{
|
||||
"Method unsupported",
|
||||
ErrorCodes::NOT_IMPLEMENTED
|
||||
};
|
||||
}
|
||||
|
||||
static std::string composeLoadAllQuery(const Block & block, const std::string & table)
|
||||
{
|
||||
std::string query{"SELECT "};
|
||||
|
||||
auto first = true;
|
||||
for (const auto idx : ext::range(0, block.columns()))
|
||||
{
|
||||
if (!first)
|
||||
query += ", ";
|
||||
|
||||
query += block.getByPosition(idx).name;
|
||||
first = false;
|
||||
}
|
||||
|
||||
query += " FROM " + table + ';';
|
||||
|
||||
return query;
|
||||
}
|
||||
|
||||
static bool isLocal(const std::string & host, const UInt16 port)
|
||||
{
|
||||
const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
|
||||
static auto interfaces = Poco::Net::NetworkInterface::list();
|
||||
|
||||
if (clickhouse_port == port)
|
||||
{
|
||||
return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
|
||||
[&] (const Poco::Net::NetworkInterface & interface) {
|
||||
return interface.address() == Poco::Net::IPAddress(host);
|
||||
});
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
const std::string host;
|
||||
const UInt16 port;
|
||||
const bool is_local;
|
||||
std::unique_ptr<ConnectionPool> pool;
|
||||
Block sample_block;
|
||||
Context context;
|
||||
const std::string table;
|
||||
const std::string load_all_query;
|
||||
};
|
||||
|
||||
}
|
@ -4,6 +4,7 @@
|
||||
#include <DB/Dictionaries/DictionaryStructure.h>
|
||||
#include <DB/Dictionaries/FileDictionarySource.h>
|
||||
#include <DB/Dictionaries/MysqlDictionarySource.h>
|
||||
#include <DB/Dictionaries/ClickhouseDictionarySource.h>
|
||||
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
||||
#include <Yandex/singleton.h>
|
||||
#include <statdaemons/ext/memory.hpp>
|
||||
@ -57,6 +58,11 @@ public:
|
||||
{
|
||||
return ext::make_unique<MysqlDictionarySource>(config, config_prefix + "mysql.", sample_block, context);
|
||||
}
|
||||
else if (config.has(config_prefix + "clickhouse"))
|
||||
{
|
||||
return ext::make_unique<ClickhouseDictionarySource>(config, config_prefix + "clickhouse.",
|
||||
sample_block, context);
|
||||
}
|
||||
|
||||
throw Exception{"unsupported source type"};
|
||||
}
|
||||
|
@ -2,12 +2,10 @@
|
||||
|
||||
#include <DB/Interpreters/Context.h>
|
||||
#include <DB/Dictionaries/MysqlBlockInputStream.h>
|
||||
#include <DB/Dictionaries/DictionaryStructure.h>
|
||||
#include <DB/Dictionaries/IDictionarySource.h>
|
||||
#include <DB/Dictionaries/config_ptr_t.h>
|
||||
#include <statdaemons/ext/range.hpp>
|
||||
#include <mysqlxx/Pool.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Poco/Util/LayeredConfiguration.h>
|
||||
|
||||
namespace DB
|
||||
|
Loading…
Reference in New Issue
Block a user