Merge pull request #10915 from ClickHouse/aku/mongodb-uri

Support MongoDB URI
This commit is contained in:
Alexander Kuzmenkov 2020-05-21 23:02:12 +03:00 committed by GitHub
commit fecd84d3c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 103 additions and 50 deletions

View File

@ -8,3 +8,4 @@ services:
MONGO_INITDB_ROOT_PASSWORD: clickhouse
ports:
- 27018:27017
command: --profile=2 --verbose

View File

@ -5,6 +5,8 @@
#include "DictionaryStructure.h"
#include "getDictionaryConfigurationFromAST.h"
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
@ -41,6 +43,9 @@ DictionaryPtr DictionaryFactory::create(
const DictionaryStructure dict_struct{config, config_prefix + ".structure"};
DictionarySourcePtr source_ptr = DictionarySourceFactory::instance().create(name, config, config_prefix + ".source", dict_struct, context, check_source_config);
LOG_TRACE(&Poco::Logger::get("DictionaryFactory"),
"Created dictionary source '" << source_ptr->toString()
<< "' for dictionary '" << name << "'");
const auto & layout_type = keys.front();

View File

@ -8,25 +8,40 @@ namespace DB
void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
{
auto create_table_source = [=](const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
const Context & /* context */,
bool /* check_config */) -> DictionarySourcePtr {
return std::make_unique<MongoDBDictionarySource>(dict_struct, config, config_prefix + ".mongodb", sample_block);
auto create_mongo_db_dictionary = [](
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & root_config_prefix,
Block & sample_block,
const Context &,
bool /* check_config */)
{
const auto config_prefix = root_config_prefix + ".mongodb";
return std::make_unique<MongoDBDictionarySource>(dict_struct,
config.getString(config_prefix + ".uri", ""),
config.getString(config_prefix + ".host", ""),
config.getUInt(config_prefix + ".port", 0),
config.getString(config_prefix + ".user", ""),
config.getString(config_prefix + ".password", ""),
config.getString(config_prefix + ".method", ""),
config.getString(config_prefix + ".db", ""),
config.getString(config_prefix + ".collection"),
sample_block);
};
factory.registerSource("mongodb", create_table_source);
factory.registerSource("mongodb", create_mongo_db_dictionary);
}
}
#include <common/logger_useful.h>
#include <Poco/MongoDB/Array.h>
#include <Poco/MongoDB/Connection.h>
#include <Poco/MongoDB/Cursor.h>
#include <Poco/MongoDB/Database.h>
#include <Poco/MongoDB/ObjectId.h>
#include <Poco/URI.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Version.h>
@ -155,6 +170,7 @@ authenticate(Poco::MongoDB::Connection & connection, const std::string & databas
MongoDBDictionarySource::MongoDBDictionarySource(
const DictionaryStructure & dict_struct_,
const std::string & uri_,
const std::string & host_,
UInt16 port_,
const std::string & user_,
@ -164,6 +180,7 @@ MongoDBDictionarySource::MongoDBDictionarySource(
const std::string & collection_,
const Block & sample_block_)
: dict_struct{dict_struct_}
, uri{uri_}
, host{host_}
, port{port_}
, user{user_}
@ -172,43 +189,56 @@ MongoDBDictionarySource::MongoDBDictionarySource(
, db{db_}
, collection{collection_}
, sample_block{sample_block_}
, connection{std::make_shared<Poco::MongoDB::Connection>(host, port)}
, connection{std::make_shared<Poco::MongoDB::Connection>()}
{
if (!user.empty())
if (!uri.empty())
{
#if POCO_VERSION >= 0x01070800
Poco::MongoDB::Database poco_db(db);
if (!poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method))
throw Exception("Cannot authenticate in MongoDB, incorrect user or password", ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
#else
authenticate(*connection, db, user, password);
#endif
Poco::URI poco_uri(uri);
// Parse database from URI. This is required for correctness -- the
// cursor is created using database name and colleciton name, so we have
// to specify them properly.
db = poco_uri.getPath();
// getPath() may return a leading slash, remove it.
if (!db.empty() && db[0] == '/')
{
db.erase(0, 1);
}
// Parse some other parts from URI, for logging and display purposes.
host = poco_uri.getHost();
port = poco_uri.getPort();
user = poco_uri.getUserInfo();
if (size_t separator = user.find(':'); separator != std::string::npos)
{
user.resize(separator);
}
// Connect with URI.
Poco::MongoDB::Connection::SocketFactory socket_factory;
connection->connect(uri, socket_factory);
}
else
{
// Connect with host/port/user/etc.
connection->connect(host, port);
if (!user.empty())
{
#if POCO_VERSION >= 0x01070800
Poco::MongoDB::Database poco_db(db);
if (!poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method))
throw Exception("Cannot authenticate in MongoDB, incorrect user or password", ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
#else
authenticate(*connection, db, user, password);
#endif
}
}
}
MongoDBDictionarySource::MongoDBDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block_)
: MongoDBDictionarySource(
dict_struct_,
config.getString(config_prefix + ".host"),
config.getUInt(config_prefix + ".port"),
config.getString(config_prefix + ".user", ""),
config.getString(config_prefix + ".password", ""),
config.getString(config_prefix + ".method", ""),
config.getString(config_prefix + ".db", ""),
config.getString(config_prefix + ".collection"),
sample_block_)
{
}
MongoDBDictionarySource::MongoDBDictionarySource(const MongoDBDictionarySource & other)
: MongoDBDictionarySource{
other.dict_struct, other.host, other.port, other.user, other.password, other.method, other.db, other.collection, other.sample_block}
other.dict_struct, other.uri, other.host, other.port, other.user, other.password, other.method, other.db, other.collection, other.sample_block}
{
}

View File

@ -29,8 +29,10 @@ namespace ErrorCodes
/// Allows loading dictionaries from a MongoDB collection
class MongoDBDictionarySource final : public IDictionarySource
{
public:
MongoDBDictionarySource(
const DictionaryStructure & dict_struct_,
const std::string & uri_,
const std::string & host_,
UInt16 port_,
const std::string & user_,
@ -40,13 +42,6 @@ class MongoDBDictionarySource final : public IDictionarySource
const std::string & collection_,
const Block & sample_block_);
public:
MongoDBDictionarySource(
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block);
MongoDBDictionarySource(const MongoDBDictionarySource & other);
~MongoDBDictionarySource() override;
@ -76,12 +71,13 @@ public:
private:
const DictionaryStructure dict_struct;
const std::string host;
const UInt16 port;
const std::string user;
const std::string uri;
std::string host;
UInt16 port;
std::string user;
const std::string password;
const std::string method;
const std::string db;
std::string db;
const std::string collection;
Block sample_block;

View File

@ -178,6 +178,26 @@ class SourceMongo(ExternalSource):
result = tbl.insert_many(to_insert)
class SourceMongoURI(SourceMongo):
def compatible_with_layout(self, layout):
# It is enough to test one layout for this dictionary, since we're
# only testing that the connection with URI works.
return layout.name == 'flat'
def get_source_str(self, table_name):
return '''
<mongodb>
<uri>mongodb://{user}:{password}@{host}:{port}/test</uri>
<collection>{tbl}</collection>
</mongodb>
'''.format(
host=self.docker_hostname,
port=self.docker_port,
user=self.user,
password=self.password,
tbl=table_name,
)
class SourceClickHouse(ExternalSource):
def get_source_str(self, table_name):

View File

@ -4,7 +4,7 @@ import os
from helpers.cluster import ClickHouseCluster
from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout
from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed
from external_sources import SourceMongo, SourceHTTP, SourceHTTPS, SourceRedis
from external_sources import SourceMongo, SourceMongoURI, SourceHTTP, SourceHTTPS, SourceRedis
import math
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -106,9 +106,9 @@ VALUES = {
LAYOUTS = [
Layout("flat"),
Layout("hashed"),
Layout("cache"),
Layout("flat"),
Layout("complex_key_hashed"),
Layout("complex_key_cache"),
Layout("range_hashed"),
@ -118,6 +118,7 @@ LAYOUTS = [
SOURCES = [
SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"),
SourceMongoURI("MongoDB_URI", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"),
SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"),
SourceClickHouse("RemoteClickHouse", "localhost", "9000", "clickhouse1", "9000", "default", ""),
SourceClickHouse("LocalClickHouse", "localhost", "9000", "node", "9000", "default", ""),