mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge pull request #49225 from MikhailBurdukov/mongo_dict_tls
Connection options for MongoDB dictionaries
This commit is contained in:
commit
7d867d7632
@ -1658,6 +1658,7 @@ Example of settings:
|
||||
<password></password>
|
||||
<db>test</db>
|
||||
<collection>dictionary_source</collection>
|
||||
<options>ssl=true</options>
|
||||
</mongodb>
|
||||
</source>
|
||||
```
|
||||
@ -1672,6 +1673,7 @@ SOURCE(MONGODB(
|
||||
password ''
|
||||
db 'test'
|
||||
collection 'dictionary_source'
|
||||
options 'ssl=true'
|
||||
))
|
||||
```
|
||||
|
||||
@ -1683,6 +1685,8 @@ Setting fields:
|
||||
- `password` – Password of the MongoDB user.
|
||||
- `db` – Name of the database.
|
||||
- `collection` – Name of the collection.
|
||||
- `options` - MongoDB connection string options (optional parameter).
|
||||
|
||||
|
||||
### Redis
|
||||
|
||||
|
@ -3,13 +3,13 @@
|
||||
#include "DictionaryStructure.h"
|
||||
#include "registerDictionaries.h"
|
||||
#include <Storages/ExternalDataSourceConfiguration.h>
|
||||
|
||||
#include <Storages/StorageMongoDBSocketFactory.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
|
||||
"host", "port", "user", "password", "db", "database", "uri", "collection", "name", "method"};
|
||||
"host", "port", "user", "password", "db", "database", "uri", "collection", "name", "method", "options"};
|
||||
|
||||
void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
|
||||
{
|
||||
@ -51,6 +51,7 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
|
||||
config.getString(config_prefix + ".method", ""),
|
||||
configuration.database,
|
||||
config.getString(config_prefix + ".collection"),
|
||||
config.getString(config_prefix + ".options", ""),
|
||||
sample_block);
|
||||
};
|
||||
|
||||
@ -98,6 +99,7 @@ MongoDBDictionarySource::MongoDBDictionarySource(
|
||||
const std::string & method_,
|
||||
const std::string & db_,
|
||||
const std::string & collection_,
|
||||
const std::string & options_,
|
||||
const Block & sample_block_)
|
||||
: dict_struct{dict_struct_}
|
||||
, uri{uri_}
|
||||
@ -108,13 +110,15 @@ MongoDBDictionarySource::MongoDBDictionarySource(
|
||||
, method{method_}
|
||||
, db{db_}
|
||||
, collection{collection_}
|
||||
, options(options_)
|
||||
, sample_block{sample_block_}
|
||||
, connection{std::make_shared<Poco::MongoDB::Connection>()}
|
||||
{
|
||||
|
||||
StorageMongoDBSocketFactory socket_factory;
|
||||
if (!uri.empty())
|
||||
{
|
||||
// Connect with URI.
|
||||
Poco::MongoDB::Connection::SocketFactory socket_factory;
|
||||
connection->connect(uri, socket_factory);
|
||||
|
||||
Poco::URI poco_uri(connection->uri());
|
||||
@ -140,8 +144,10 @@ MongoDBDictionarySource::MongoDBDictionarySource(
|
||||
}
|
||||
else
|
||||
{
|
||||
// Connect with host/port/user/etc.
|
||||
connection->connect(host, port);
|
||||
// Connect with host/port/user/etc through constructing the uri
|
||||
std::string uri_constructed("mongodb://" + host + ":" + std::to_string(port) + "/" + db + (options.empty() ? "" : "?" + options));
|
||||
connection->connect(uri_constructed, socket_factory);
|
||||
|
||||
if (!user.empty())
|
||||
{
|
||||
Poco::MongoDB::Database poco_db(db);
|
||||
@ -154,7 +160,9 @@ MongoDBDictionarySource::MongoDBDictionarySource(
|
||||
|
||||
MongoDBDictionarySource::MongoDBDictionarySource(const MongoDBDictionarySource & other)
|
||||
: MongoDBDictionarySource{
|
||||
other.dict_struct, other.uri, other.host, other.port, other.user, other.password, other.method, other.db, other.collection, other.sample_block}
|
||||
other.dict_struct, other.uri, other.host, other.port, other.user, other.password, other.method, other.db,
|
||||
other.collection, other.options, other.sample_block
|
||||
}
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -41,6 +41,7 @@ public:
|
||||
const std::string & method_,
|
||||
const std::string & db_,
|
||||
const std::string & collection_,
|
||||
const std::string & options,
|
||||
const Block & sample_block_);
|
||||
|
||||
MongoDBDictionarySource(const MongoDBDictionarySource & other);
|
||||
@ -80,6 +81,7 @@ private:
|
||||
const std::string method;
|
||||
std::string db;
|
||||
const std::string collection;
|
||||
const std::string options;
|
||||
Block sample_block;
|
||||
|
||||
std::shared_ptr<Poco::MongoDB::Connection> connection;
|
||||
|
@ -161,6 +161,29 @@ class SourceMySQL(ExternalSource):
|
||||
|
||||
|
||||
class SourceMongo(ExternalSource):
|
||||
def __init__(
|
||||
self,
|
||||
name,
|
||||
internal_hostname,
|
||||
internal_port,
|
||||
docker_hostname,
|
||||
docker_port,
|
||||
user,
|
||||
password,
|
||||
secure=False,
|
||||
):
|
||||
ExternalSource.__init__(
|
||||
self,
|
||||
name,
|
||||
internal_hostname,
|
||||
internal_port,
|
||||
docker_hostname,
|
||||
docker_port,
|
||||
user,
|
||||
password,
|
||||
)
|
||||
self.secure = secure
|
||||
|
||||
def get_source_str(self, table_name):
|
||||
return """
|
||||
<mongodb>
|
||||
@ -170,6 +193,7 @@ class SourceMongo(ExternalSource):
|
||||
<password>{password}</password>
|
||||
<db>test</db>
|
||||
<collection>{tbl}</collection>
|
||||
{options}
|
||||
</mongodb>
|
||||
""".format(
|
||||
host=self.docker_hostname,
|
||||
@ -177,6 +201,7 @@ class SourceMongo(ExternalSource):
|
||||
user=self.user,
|
||||
password=self.password,
|
||||
tbl=table_name,
|
||||
options="<options>ssl=true</options>" if self.secure else "",
|
||||
)
|
||||
|
||||
def prepare(self, structure, table_name, cluster):
|
||||
@ -186,6 +211,8 @@ class SourceMongo(ExternalSource):
|
||||
user=self.user,
|
||||
password=self.password,
|
||||
)
|
||||
if self.secure:
|
||||
connection_str += "/?tls=true&tlsAllowInvalidCertificates=true"
|
||||
self.connection = pymongo.MongoClient(connection_str)
|
||||
self.converters = {}
|
||||
for field in structure.get_all_fields():
|
||||
@ -228,7 +255,7 @@ class SourceMongoURI(SourceMongo):
|
||||
def get_source_str(self, table_name):
|
||||
return """
|
||||
<mongodb>
|
||||
<uri>mongodb://{user}:{password}@{host}:{port}/test</uri>
|
||||
<uri>mongodb://{user}:{password}@{host}:{port}/test{options}</uri>
|
||||
<collection>{tbl}</collection>
|
||||
</mongodb>
|
||||
""".format(
|
||||
@ -237,6 +264,7 @@ class SourceMongoURI(SourceMongo):
|
||||
user=self.user,
|
||||
password=self.password,
|
||||
tbl=table_name,
|
||||
options="?ssl=true" if self.secure else "",
|
||||
)
|
||||
|
||||
|
||||
|
@ -0,0 +1,8 @@
|
||||
<clickhouse>
|
||||
<openSSL>
|
||||
<client>
|
||||
<!-- For self-signed certificate -->
|
||||
<verificationMode>none</verificationMode>
|
||||
</client>
|
||||
</openSSL>
|
||||
</clickhouse>
|
@ -17,14 +17,71 @@ ranged_tester = None
|
||||
test_name = "mongo"
|
||||
|
||||
|
||||
def setup_module(module):
|
||||
global cluster
|
||||
global node
|
||||
global simple_tester
|
||||
global complex_tester
|
||||
global ranged_tester
|
||||
@pytest.fixture(scope="module")
|
||||
def secure_connection(request):
|
||||
return request.param
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def cluster(secure_connection):
|
||||
return ClickHouseCluster(__file__)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def source(secure_connection, cluster):
|
||||
return SourceMongo(
|
||||
"MongoDB",
|
||||
"localhost",
|
||||
cluster.mongo_port,
|
||||
cluster.mongo_host,
|
||||
"27017",
|
||||
"root",
|
||||
"clickhouse",
|
||||
secure=secure_connection,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def simple_tester(source):
|
||||
tester = SimpleLayoutTester(test_name)
|
||||
tester.cleanup()
|
||||
tester.create_dictionaries(source)
|
||||
return tester
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def complex_tester(source):
|
||||
tester = ComplexLayoutTester(test_name)
|
||||
tester.create_dictionaries(source)
|
||||
return tester
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def ranged_tester(source):
|
||||
tester = RangedLayoutTester(test_name)
|
||||
tester.create_dictionaries(source)
|
||||
return tester
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def main_config(secure_connection):
|
||||
main_config = []
|
||||
if secure_connection:
|
||||
main_config.append(os.path.join("configs", "disable_ssl_verification.xml"))
|
||||
else:
|
||||
main_config.append(os.path.join("configs", "ssl_verification.xml"))
|
||||
return main_config
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster(
|
||||
secure_connection,
|
||||
cluster,
|
||||
main_config,
|
||||
simple_tester,
|
||||
ranged_tester,
|
||||
complex_tester,
|
||||
):
|
||||
SOURCE = SourceMongo(
|
||||
"MongoDB",
|
||||
"localhost",
|
||||
@ -33,35 +90,18 @@ def setup_module(module):
|
||||
"27017",
|
||||
"root",
|
||||
"clickhouse",
|
||||
secure=secure_connection,
|
||||
)
|
||||
|
||||
simple_tester = SimpleLayoutTester(test_name)
|
||||
simple_tester.cleanup()
|
||||
simple_tester.create_dictionaries(SOURCE)
|
||||
|
||||
complex_tester = ComplexLayoutTester(test_name)
|
||||
complex_tester.create_dictionaries(SOURCE)
|
||||
|
||||
ranged_tester = RangedLayoutTester(test_name)
|
||||
ranged_tester.create_dictionaries(SOURCE)
|
||||
# Since that all .xml configs were created
|
||||
|
||||
main_configs = []
|
||||
main_configs.append(os.path.join("configs", "disable_ssl_verification.xml"))
|
||||
|
||||
dictionaries = simple_tester.list_dictionaries()
|
||||
|
||||
node = cluster.add_instance(
|
||||
"node", main_configs=main_configs, dictionaries=dictionaries, with_mongo=True
|
||||
"node",
|
||||
main_configs=main_config,
|
||||
dictionaries=dictionaries,
|
||||
with_mongo=True,
|
||||
with_mongo_secure=secure_connection,
|
||||
)
|
||||
|
||||
|
||||
def teardown_module(module):
|
||||
simple_tester.cleanup()
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
|
||||
@ -75,16 +115,25 @@ def started_cluster():
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("secure_connection", [False], indirect=["secure_connection"])
|
||||
@pytest.mark.parametrize("layout_name", sorted(LAYOUTS_SIMPLE))
|
||||
def test_simple(started_cluster, layout_name):
|
||||
simple_tester.execute(layout_name, node)
|
||||
def test_simple(secure_connection, started_cluster, layout_name, simple_tester):
|
||||
simple_tester.execute(layout_name, started_cluster.instances["node"])
|
||||
|
||||
|
||||
@pytest.mark.parametrize("secure_connection", [False], indirect=["secure_connection"])
|
||||
@pytest.mark.parametrize("layout_name", sorted(LAYOUTS_COMPLEX))
|
||||
def test_complex(started_cluster, layout_name):
|
||||
complex_tester.execute(layout_name, node)
|
||||
def test_complex(secure_connection, started_cluster, layout_name, complex_tester):
|
||||
complex_tester.execute(layout_name, started_cluster.instances["node"])
|
||||
|
||||
|
||||
@pytest.mark.parametrize("secure_connection", [False], indirect=["secure_connection"])
|
||||
@pytest.mark.parametrize("layout_name", sorted(LAYOUTS_RANGED))
|
||||
def test_ranged(started_cluster, layout_name):
|
||||
ranged_tester.execute(layout_name, node)
|
||||
def test_ranged(secure_connection, started_cluster, layout_name, ranged_tester):
|
||||
ranged_tester.execute(layout_name, started_cluster.instances["node"])
|
||||
|
||||
|
||||
@pytest.mark.parametrize("secure_connection", [True], indirect=["secure_connection"])
|
||||
@pytest.mark.parametrize("layout_name", sorted(LAYOUTS_SIMPLE))
|
||||
def test_simple_ssl(secure_connection, started_cluster, layout_name, simple_tester):
|
||||
simple_tester.execute(layout_name, started_cluster.instances["node"])
|
||||
|
@ -8,25 +8,22 @@ from helpers.cluster import ClickHouseCluster
|
||||
from helpers.dictionary import Field, Row, Dictionary, DictionaryStructure, Layout
|
||||
from helpers.external_sources import SourceMongoURI
|
||||
|
||||
SOURCE = None
|
||||
cluster = None
|
||||
node = None
|
||||
simple_tester = None
|
||||
complex_tester = None
|
||||
ranged_tester = None
|
||||
test_name = "mongo_uri"
|
||||
|
||||
|
||||
def setup_module(module):
|
||||
global cluster
|
||||
global node
|
||||
global simple_tester
|
||||
global complex_tester
|
||||
global ranged_tester
|
||||
@pytest.fixture(scope="module")
|
||||
def secure_connection(request):
|
||||
return request.param
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
SOURCE = SourceMongoURI(
|
||||
@pytest.fixture(scope="module")
|
||||
def cluster(secure_connection):
|
||||
return ClickHouseCluster(__file__)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def source(secure_connection, cluster):
|
||||
return SourceMongoURI(
|
||||
"MongoDB",
|
||||
"localhost",
|
||||
cluster.mongo_port,
|
||||
@ -34,52 +31,55 @@ def setup_module(module):
|
||||
"27017",
|
||||
"root",
|
||||
"clickhouse",
|
||||
secure=secure_connection,
|
||||
)
|
||||
|
||||
simple_tester = SimpleLayoutTester(test_name)
|
||||
simple_tester.cleanup()
|
||||
simple_tester.create_dictionaries(SOURCE)
|
||||
|
||||
complex_tester = ComplexLayoutTester(test_name)
|
||||
complex_tester.create_dictionaries(SOURCE)
|
||||
@pytest.fixture(scope="module")
|
||||
def simple_tester(source):
|
||||
tester = SimpleLayoutTester(test_name)
|
||||
tester.cleanup()
|
||||
tester.create_dictionaries(source)
|
||||
return tester
|
||||
|
||||
ranged_tester = RangedLayoutTester(test_name)
|
||||
ranged_tester.create_dictionaries(SOURCE)
|
||||
# Since that all .xml configs were created
|
||||
|
||||
main_configs = []
|
||||
main_configs.append(os.path.join("configs", "disable_ssl_verification.xml"))
|
||||
@pytest.fixture(scope="module")
|
||||
def main_config(secure_connection):
|
||||
main_config = []
|
||||
if secure_connection:
|
||||
main_config.append(os.path.join("configs", "disable_ssl_verification.xml"))
|
||||
else:
|
||||
main_config.append(os.path.join("configs", "ssl_verification.xml"))
|
||||
return main_config
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster(secure_connection, cluster, main_config, simple_tester):
|
||||
dictionaries = simple_tester.list_dictionaries()
|
||||
|
||||
node = cluster.add_instance(
|
||||
"uri_node",
|
||||
main_configs=main_configs,
|
||||
main_configs=main_config,
|
||||
dictionaries=dictionaries,
|
||||
with_mongo=True,
|
||||
with_mongo_secure=secure_connection,
|
||||
)
|
||||
|
||||
|
||||
def teardown_module(module):
|
||||
simple_tester.cleanup()
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
|
||||
simple_tester.prepare(cluster)
|
||||
complex_tester.prepare(cluster)
|
||||
ranged_tester.prepare(cluster)
|
||||
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
# See comment in SourceMongoURI
|
||||
@pytest.mark.parametrize("secure_connection", [False], indirect=["secure_connection"])
|
||||
@pytest.mark.parametrize("layout_name", ["flat"])
|
||||
def test_simple(started_cluster, layout_name):
|
||||
simple_tester.execute(layout_name, node)
|
||||
def test_simple(secure_connection, started_cluster, simple_tester, layout_name):
|
||||
simple_tester.execute(layout_name, started_cluster.instances["uri_node"])
|
||||
|
||||
|
||||
@pytest.mark.parametrize("secure_connection", [True], indirect=["secure_connection"])
|
||||
@pytest.mark.parametrize("layout_name", ["flat"])
|
||||
def test_simple_ssl(secure_connection, started_cluster, simple_tester, layout_name):
|
||||
simple_tester.execute(layout_name, started_cluster.instances["uri_node"])
|
||||
|
Loading…
Reference in New Issue
Block a user