This commit is contained in:
kssenii 2021-12-09 14:40:51 +00:00
parent 75fef5d3e4
commit f5a77fca39
9 changed files with 52 additions and 14 deletions

View File

@ -9,3 +9,9 @@ services:
ports:
- ${MONGO_EXTERNAL_PORT}:${MONGO_INTERNAL_PORT}
command: --profile=2 --verbose
mongo2:
image: mongo:latest
restart: always
ports:
- "27018:27017"

View File

@ -601,6 +601,7 @@
M(631, UNKNOWN_FILE_SIZE) \
M(632, UNEXPECTED_DATA_AFTER_PARSED_VALUE) \
M(633, QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW) \
M(634, MONGODB_ERROR) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -36,6 +36,7 @@ namespace ErrorCodes
extern const int MONGODB_CANNOT_AUTHENTICATE;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int UNKNOWN_TYPE;
extern const int MONGODB_ERROR;
}
@ -327,6 +328,14 @@ Chunk MongoDBSource::generate()
for (auto & document : response.documents())
{
if (document->exists("ok") && document->exists("$err")
&& document->exists("code") && document->getInteger("ok") == 0)
{
auto code = document->getInteger("code");
const Poco::MongoDB::Element::Ptr value = document->get("$err");
auto message = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(*value).value();
throw Exception(ErrorCodes::MONGODB_ERROR, "Got error from MongoDB: {}, code: {}", message, code);
}
++num_rows;
for (const auto idx : collections::range(0, size))

View File

@ -86,7 +86,7 @@ std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const
configuration.username = config.getString(collection_prefix + ".user", "");
configuration.password = config.getString(collection_prefix + ".password", "");
configuration.database = config.getString(collection_prefix + ".database", "");
configuration.table = config.getString(collection_prefix + ".table", "");
configuration.table = config.getString(collection_prefix + ".table", config.getString(collection_prefix + ".collection", ""));
configuration.schema = config.getString(collection_prefix + ".schema", "");
configuration.addresses_expr = config.getString(collection_prefix + ".addresses_expr", "");

View File

@ -40,7 +40,6 @@ struct StorageMySQLConfiguration : ExternalDataSourceConfiguration
struct StorageMongoDBConfiguration : ExternalDataSourceConfiguration
{
String collection;
String options;
};

View File

@ -67,9 +67,12 @@ void StorageMongoDB::connectIfNotConnected()
if (!authenticated)
{
# if POCO_VERSION >= 0x01070800
Poco::MongoDB::Database poco_db(database_name);
if (!poco_db.authenticate(*connection, username, password, Poco::MongoDB::Database::AUTH_SCRAM_SHA1))
throw Exception("Cannot authenticate in MongoDB, incorrect user or password", ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
if (!username.empty() && !password.empty())
{
Poco::MongoDB::Database poco_db(database_name);
if (!poco_db.authenticate(*connection, username, password, Poco::MongoDB::Database::AUTH_SCRAM_SHA1))
throw Exception("Cannot authenticate in MongoDB, incorrect user or password", ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
}
# else
authenticate(*connection, database_name, username, password);
# endif
@ -112,9 +115,7 @@ StorageMongoDBConfiguration StorageMongoDB::getConfiguration(ASTs engine_args, C
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "collection")
configuration.collection = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else if (arg_name == "options")
if (arg_name == "options")
configuration.options = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -139,7 +140,7 @@ StorageMongoDBConfiguration StorageMongoDB::getConfiguration(ASTs engine_args, C
configuration.host = parsed_host_port.first;
configuration.port = parsed_host_port.second;
configuration.database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
configuration.collection = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
configuration.table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
configuration.username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
configuration.password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
@ -163,7 +164,7 @@ void registerStorageMongoDB(StorageFactory & factory)
configuration.host,
configuration.port,
configuration.database,
configuration.collection,
configuration.table,
configuration.username,
configuration.password,
configuration.options,

View File

@ -725,6 +725,8 @@ class ClickHouseCluster:
env_variables['MONGO_HOST'] = self.mongo_host
env_variables['MONGO_EXTERNAL_PORT'] = str(self.mongo_port)
env_variables['MONGO_INTERNAL_PORT'] = "27017"
env_variables['MONGO_EXTERNAL_PORT_2'] = "27018"
env_variables['MONGO_INTERNAL_PORT_2'] = "27017"
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_mongo.yml')])
self.base_mongo_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
'--file', p.join(docker_compose_yml_dir, 'docker_compose_mongo.yml')]
@ -2107,7 +2109,7 @@ class ClickHouseInstance:
except Exception as e:
logging.warning(f"Current start attempt failed. Will kill {pid} just in case.")
self.exec_in_container(["bash", "-c", f"kill -9 {pid}"], user='root', nothrow=True)
time.sleep(time_to_sleep)
time.sleep(time_to_sleep)
raise Exception("Cannot start ClickHouse, see additional info in logs")

View File

@ -6,7 +6,7 @@
<host>mongo1</host>
<port>27017</port>
<database>test</database>
<table>simple_table</table>
<collection>simple_table</collection>
</mongo1>
</named_collections>
</clickhouse>

View File

@ -20,8 +20,12 @@ def started_cluster(request):
cluster.shutdown()
def get_mongo_connection(started_cluster, secure=False):
connection_str = 'mongodb://root:clickhouse@localhost:{}'.format(started_cluster.mongo_port)
def get_mongo_connection(started_cluster, secure=False, with_credentials=True):
connection_str = ''
if with_credentials:
connection_str = 'mongodb://root:clickhouse@localhost:{}'.format(started_cluster.mongo_port)
else:
connection_str = 'mongodb://localhost:27018'
if secure:
connection_str += '/?tls=true&tlsAllowInvalidCertificates=true'
return pymongo.MongoClient(connection_str)
@ -138,4 +142,20 @@ def test_predefined_connection_configuration(started_cluster):
node = started_cluster.instances['node']
node.query("create table simple_mongo_table(key UInt64, data String) engine = MongoDB(mongo1)")
assert node.query("SELECT count() FROM simple_mongo_table") == '100\n'
simple_mongo_table.drop()
@pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster'])
def test_no_credentials(started_cluster):
mongo_connection = get_mongo_connection(started_cluster, with_credentials=False)
db = mongo_connection['test']
simple_mongo_table = db['simple_table']
data = []
for i in range(0, 100):
data.append({'key': i, 'data': hex(i * i)})
simple_mongo_table.insert_many(data)
node = started_cluster.instances['node']
node.query("create table simple_mongo_table_2(key UInt64, data String) engine = MongoDB('mongo2:27017', 'test', 'simple_table', '', '')")
assert node.query("SELECT count() FROM simple_mongo_table_2") == '100\n'
simple_mongo_table.drop()