Merge pull request #37213 from aaapetrenko/master

add TableFunctionMongoDB and tests
This commit is contained in:
Kseniia Sumarokova 2022-07-14 12:09:07 +02:00 committed by GitHub
commit 530dac6487
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 516 additions and 1 deletions

View File

@ -401,11 +401,11 @@ dbms_target_link_libraries (
clickhouse_parsers
ch_contrib::lz4
Poco::JSON
Poco::MongoDB
string_utils
PUBLIC
boost::system
clickhouse_common_io
Poco::MongoDB
)
if (TARGET ch::mysqlxx)

View File

@ -15,6 +15,7 @@
#include <Parsers/ASTLiteral.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/Transforms/MongoDBSource.h>
#include <Processors/Sinks/SinkToStorage.h>
namespace DB
{
@ -86,6 +87,62 @@ void StorageMongoDB::connectIfNotConnected()
}
class StorageMongoDBSink : public SinkToStorage
{
public:
explicit StorageMongoDBSink(
const std::string & collection_name_,
const std::string & db_name_,
const StorageMetadataPtr & metadata_snapshot_,
std::shared_ptr<Poco::MongoDB::Connection> connection_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, collection_name(collection_name_)
, db_name(db_name_)
, metadata_snapshot{metadata_snapshot_}
, connection(connection_)
{
}
String getName() const override { return "StorageMongoDBSink"; }
void consume(Chunk chunk) override
{
Poco::MongoDB::Database db(db_name);
Poco::MongoDB::Document::Ptr index = new Poco::MongoDB::Document();
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
size_t num_rows = block.rows();
size_t num_cols = block.columns();
const auto columns = block.getColumns();
const auto data_types = block.getDataTypes();
const auto data_names = block.getNames();
std::vector<std::string> row(num_cols);
for (const auto i : collections::range(0, num_rows))
{
for (const auto j : collections::range(0, num_cols))
{
WriteBufferFromOwnString ostr;
data_types[j]->getDefaultSerialization()->serializeText(*columns[j], i, ostr, FormatSettings{});
row[j] = ostr.str();
index->add(data_names[j], row[j]);
}
}
Poco::SharedPtr<Poco::MongoDB::InsertRequest> insert_request = db.createInsertRequest(collection_name);
insert_request->documents().push_back(index);
connection->sendRequest(*insert_request);
}
private:
String collection_name;
String db_name;
StorageMetadataPtr metadata_snapshot;
std::shared_ptr<Poco::MongoDB::Connection> connection;
};
Pipe StorageMongoDB::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
@ -109,6 +166,11 @@ Pipe StorageMongoDB::read(
return Pipe(std::make_shared<MongoDBSource>(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size));
}
SinkToStoragePtr StorageMongoDB::write(const ASTPtr & /* query */, const StorageMetadataPtr & metadata_snapshot, ContextPtr /* context */)
{
connectIfNotConnected();
return std::make_shared<StorageMongoDBSink>(collection_name, database_name, metadata_snapshot, connection);
}
StorageMongoDBConfiguration StorageMongoDB::getConfiguration(ASTs engine_args, ContextPtr context)
{

View File

@ -39,6 +39,11 @@ public:
size_t max_block_size,
unsigned num_streams) override;
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr context) override;
static StorageMongoDBConfiguration getConfiguration(ASTs engine_args, ContextPtr context);
private:

View File

@ -0,0 +1,104 @@
#include <Common/Exception.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <TableFunctions/TableFunctionMongoDB.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <TableFunctions/registerTableFunctions.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/ColumnsDescription.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
StoragePtr TableFunctionMongoDB::executeImpl(const ASTPtr & /*ast_function*/,
ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
auto storage = std::make_shared<StorageMongoDB>(
StorageID(configuration->database, table_name),
configuration->host,
configuration->port,
configuration->database,
configuration->table,
configuration->username,
configuration->password,
configuration->options,
columns,
ConstraintsDescription(),
String{});
storage->startup();
return storage;
}
ColumnsDescription TableFunctionMongoDB::getActualTableStructure(ContextPtr context) const
{
return parseColumnsListFromString(structure, context);
}
void TableFunctionMongoDB::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
const auto & func_args = ast_function->as<ASTFunction &>();
if (!func_args.arguments)
throw Exception("Table function 'mongodb' must have arguments.", ErrorCodes::BAD_ARGUMENTS);
ASTs & args = func_args.arguments->children;
if (args.size() < 6 || args.size() > 7)
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Table function 'mongodb' requires from 6 to 7 parameters: mongodb('host:port', database, collection, 'user', 'password', structure, [, 'options'])");
}
ASTs main_arguments(args.begin(), args.begin() + 5);
for (size_t i = 5; i < args.size(); ++i)
{
if (const auto * ast_func = typeid_cast<const ASTFunction *>(args[i].get()))
{
const auto * args_expr = assert_cast<const ASTExpressionList *>(ast_func->arguments.get());
auto function_args = args_expr->children;
if (function_args.size() != 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
auto arg_name = function_args[0]->as<ASTIdentifier>()->name();
if (arg_name == "structure")
structure = checkAndGetLiteralArgument<String>(function_args[1], "structure");
else if (arg_name == "options")
main_arguments.push_back(function_args[1]);
}
else if (i == 5)
{
structure = checkAndGetLiteralArgument<String>(args[i], "structure");
}
else if (i == 6)
{
main_arguments.push_back(args[i]);
}
}
configuration = StorageMongoDB::getConfiguration(main_arguments, context);
}
void registerTableFunctionMongoDB(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionMongoDB>();
}
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <TableFunctions/ITableFunction.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/StorageMongoDB.h>
namespace DB
{
class TableFunctionMongoDB : public ITableFunction
{
public:
static constexpr auto name = "mongodb";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context,
const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "MongoDB"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
std::optional<StorageMongoDBConfiguration> configuration;
String structure;
};
}

View File

@ -19,6 +19,7 @@ void registerTableFunctions()
registerTableFunctionValues(factory);
registerTableFunctionInput(factory);
registerTableFunctionGenerate(factory);
registerTableFunctionMongoDB(factory);
registerTableFunctionMeiliSearch(factory);

View File

@ -17,6 +17,7 @@ void registerTableFunctionURL(TableFunctionFactory & factory);
void registerTableFunctionValues(TableFunctionFactory & factory);
void registerTableFunctionInput(TableFunctionFactory & factory);
void registerTableFunctionGenerate(TableFunctionFactory & factory);
void registerTableFunctionMongoDB(TableFunctionFactory & factory);
void registerTableFunctionMeiliSearch(TableFunctionFactory & factory);

View File

@ -253,3 +253,30 @@ def test_missing_columns(started_cluster):
result = node.query("SELECT count() FROM simple_mongo_table WHERE isNull(data)")
assert result == "10\n"
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_simple_insert_select(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
simple_mongo_table = db["simple_table"]
node = started_cluster.instances["node"]
node.query("DROP TABLE IF EXISTS simple_mongo_table")
node.query(
"CREATE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse')"
)
node.query("INSERT INTO simple_mongo_table SELECT 1, 'kek'")
assert (
node.query("SELECT data from simple_mongo_table where key = 1").strip() == "kek"
)
node.query("INSERT INTO simple_mongo_table(key) SELECT 12")
assert int(node.query("SELECT count() from simple_mongo_table")) == 2
assert (
node.query("SELECT data from simple_mongo_table where key = 12").strip() == ""
)
node.query("DROP TABLE simple_mongo_table")
simple_mongo_table.drop()

View File

@ -0,0 +1,8 @@
<clickhouse>
<openSSL>
<client>
<!-- For self-signed certificate -->
<verificationMode>none</verificationMode>
</client>
</openSSL>
</clickhouse>

View File

@ -0,0 +1,276 @@
import pymongo
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
@pytest.fixture(scope="module")
def started_cluster(request):
try:
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
with_mongo=True,
main_configs=[
"configs_secure/config.d/ssl_conf.xml",
],
with_mongo_secure=request.param,
)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_mongo_connection(started_cluster, secure=False, with_credentials=True):
connection_str = ""
if with_credentials:
connection_str = "mongodb://root:clickhouse@localhost:{}".format(
started_cluster.mongo_port
)
else:
connection_str = "mongodb://localhost:{}".format(
started_cluster.mongo_no_cred_port
)
if secure:
connection_str += "/?tls=true&tlsAllowInvalidCertificates=true"
return pymongo.MongoClient(connection_str)
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_simple_select(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
simple_mongo_table = db["simple_table"]
node = started_cluster.instances["node"]
for i in range(0, 100):
node.query(
"INSERT INTO FUNCTION mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String') (key, data) VALUES ({}, '{}')".format(
i, hex(i * i)
)
)
assert (
node.query(
"SELECT COUNT() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
)
== "100\n"
)
assert (
node.query(
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
)
== str(sum(range(0, 100))) + "\n"
)
assert (
node.query(
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', 'key UInt64, data String')"
)
== str(sum(range(0, 100))) + "\n"
)
assert (
node.query(
"SELECT data from mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String') where key = 42"
)
== hex(42 * 42) + "\n"
)
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_complex_data_type(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
incomplete_mongo_table = db["complex_table"]
data = []
for i in range(0, 100):
data.append({"key": i, "data": hex(i * i), "dict": {"a": i, "b": str(i)}})
incomplete_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
assert (
node.query(
"SELECT COUNT() FROM mongodb('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse', structure='key UInt64, data String, dict Map(UInt64, String)')"
)
== "100\n"
)
assert (
node.query(
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse', structure='key UInt64, data String, dict Map(UInt64, String)')"
)
== str(sum(range(0, 100))) + "\n"
)
assert (
node.query(
"SELECT data from mongodb('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse', structure='key UInt64, data String, dict Map(UInt64, String)') where key = 42"
)
== hex(42 * 42) + "\n"
)
incomplete_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_incorrect_data_type(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
strange_mongo_table = db["strange_table"]
data = []
for i in range(0, 100):
data.append({"key": i, "data": hex(i * i), "aaaa": "Hello"})
strange_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
with pytest.raises(QueryRuntimeException):
node.query(
"SELECT aaaa FROM mongodb('mongo1:27017', 'test', 'strange_table', 'root', 'clickhouse', structure='key UInt64, data String')"
)
strange_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [True], indirect=["started_cluster"])
def test_secure_connection(started_cluster):
mongo_connection = get_mongo_connection(started_cluster, secure=True)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
simple_mongo_table = db["simple_table"]
data = []
for i in range(0, 100):
data.append({"key": i, "data": hex(i * i)})
simple_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
assert (
node.query(
"SELECT COUNT() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='ssl=true')"
)
== "100\n"
)
assert (
node.query(
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='ssl=true')"
)
== str(sum(range(0, 100))) + "\n"
)
assert (
node.query(
"SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', 'key UInt64, data String', 'ssl=true')"
)
== str(sum(range(0, 100))) + "\n"
)
assert (
node.query(
"SELECT data from mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='ssl=true') where key = 42"
)
== hex(42 * 42) + "\n"
)
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_predefined_connection_configuration(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
simple_mongo_table = db["simple_table"]
data = []
for i in range(0, 100):
data.append({"key": i, "data": hex(i * i)})
simple_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
assert (
node.query(
"SELECT count() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
)
== "100\n"
)
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_no_credentials(started_cluster):
mongo_connection = get_mongo_connection(started_cluster, with_credentials=False)
db = mongo_connection["test"]
simple_mongo_table = db["simple_table"]
data = []
for i in range(0, 100):
data.append({"key": i, "data": hex(i * i)})
simple_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
assert (
node.query(
"SELECT count() FROM mongodb('mongo2:27017', 'test', 'simple_table', '', '', structure='key UInt64, data String')"
)
== "100\n"
)
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_auth_source(started_cluster):
mongo_connection = get_mongo_connection(started_cluster, with_credentials=False)
admin_db = mongo_connection["admin"]
admin_db.add_user(
"root",
"clickhouse",
roles=[{"role": "userAdminAnyDatabase", "db": "admin"}, "readWriteAnyDatabase"],
)
simple_mongo_table = admin_db["simple_table"]
data = []
for i in range(0, 50):
data.append({"key": i, "data": hex(i * i)})
simple_mongo_table.insert_many(data)
db = mongo_connection["test"]
simple_mongo_table = db["simple_table"]
data = []
for i in range(0, 100):
data.append({"key": i, "data": hex(i * i)})
simple_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
node.query_and_get_error(
"SELECT count() FROM mongodb('mongo2:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
)
assert (
node.query(
"SELECT count() FROM mongodb('mongo2:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='authSource=admin')"
)
== "100\n"
)
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_missing_columns(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
simple_mongo_table = db["simple_table"]
data = []
for i in range(0, 10):
data.append({"key": i, "data": hex(i * i)})
for i in range(0, 10):
data.append({"key": i})
simple_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
result = node.query(
"SELECT count() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data Nullable(String)') WHERE isNull(data)"
)
assert result == "10\n"
simple_mongo_table.drop()