mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Fixes
This commit is contained in:
parent
11f88340a5
commit
4969da85d9
@ -2,37 +2,167 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
#include <Poco/MongoDB/Connection.h>
|
||||
#include <Poco/MongoDB/Cursor.h>
|
||||
#include <Poco/MongoDB/Element.h>
|
||||
#include <Poco/MongoDB/Database.h>
|
||||
#include <Poco/MongoDB/ObjectId.h>
|
||||
|
||||
#include <Columns/ColumnNullable.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <ext/range.h>
|
||||
#include <DataStreams/MongoDBBlockInputStream.h>
|
||||
#include <Poco/URI.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Poco/Version.h>
|
||||
|
||||
// only after poco
|
||||
// naming conflict:
|
||||
// Poco/MongoDB/BSONWriter.h:54: void writeCString(const std::string & value);
|
||||
// src/IO/WriteHelpers.h:146 #define writeCString(s, buf)
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Common/FieldVisitors.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <ext/range.h>
|
||||
#include "DictionaryStructure.h"
|
||||
#include "MongoDBBlockInputStream.h"
|
||||
|
||||
#include <ext/enumerate.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int TYPE_MISMATCH;
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
extern const int MONGODB_CANNOT_AUTHENTICATE;
|
||||
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
|
||||
}
|
||||
|
||||
|
||||
#if POCO_VERSION < 0x01070800
|
||||
/// See https://pocoproject.org/forum/viewtopic.php?f=10&t=6326&p=11426&hilit=mongodb+auth#p11485
|
||||
void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password)
|
||||
{
|
||||
Poco::MongoDB::Database db(database);
|
||||
|
||||
/// Challenge-response authentication.
|
||||
std::string nonce;
|
||||
|
||||
/// First step: request nonce.
|
||||
{
|
||||
auto command = db.createCommand();
|
||||
command->setNumberToReturn(1);
|
||||
command->selector().add<Int32>("getnonce", 1);
|
||||
|
||||
Poco::MongoDB::ResponseMessage response;
|
||||
connection.sendRequest(*command, response);
|
||||
|
||||
if (response.documents().empty())
|
||||
throw Exception(
|
||||
"Cannot authenticate in MongoDB: server returned empty response for 'getnonce' command",
|
||||
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
|
||||
|
||||
auto doc = response.documents()[0];
|
||||
try
|
||||
{
|
||||
double ok = doc->get<double>("ok", 0);
|
||||
if (ok != 1)
|
||||
throw Exception(
|
||||
"Cannot authenticate in MongoDB: server returned response for 'getnonce' command that"
|
||||
" has field 'ok' missing or having wrong value",
|
||||
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
|
||||
|
||||
nonce = doc->get<std::string>("nonce", "");
|
||||
if (nonce.empty())
|
||||
throw Exception(
|
||||
"Cannot authenticate in MongoDB: server returned response for 'getnonce' command that"
|
||||
" has field 'nonce' missing or empty",
|
||||
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
|
||||
}
|
||||
catch (Poco::NotFoundException & e)
|
||||
{
|
||||
throw Exception(
|
||||
"Cannot authenticate in MongoDB: server returned response for 'getnonce' command that has missing required field: "
|
||||
+ e.displayText(),
|
||||
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
|
||||
}
|
||||
}
|
||||
|
||||
/// Second step: use nonce to calculate digest and send it back to the server.
|
||||
/// Digest is hex_md5(n.nonce + username + hex_md5(username + ":mongo:" + password))
|
||||
{
|
||||
std::string first = user + ":mongo:" + password;
|
||||
|
||||
Poco::MD5Engine md5;
|
||||
md5.update(first);
|
||||
std::string digest_first(Poco::DigestEngine::digestToHex(md5.digest()));
|
||||
std::string second = nonce + user + digest_first;
|
||||
md5.reset();
|
||||
md5.update(second);
|
||||
std::string digest_second(Poco::DigestEngine::digestToHex(md5.digest()));
|
||||
|
||||
auto command = db.createCommand();
|
||||
command->setNumberToReturn(1);
|
||||
command->selector()
|
||||
.add<Int32>("authenticate", 1)
|
||||
.add<std::string>("user", user)
|
||||
.add<std::string>("nonce", nonce)
|
||||
.add<std::string>("key", digest_second);
|
||||
|
||||
Poco::MongoDB::ResponseMessage response;
|
||||
connection.sendRequest(*command, response);
|
||||
|
||||
if (response.empty())
|
||||
throw Exception(
|
||||
"Cannot authenticate in MongoDB: server returned empty response for 'authenticate' command",
|
||||
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
|
||||
|
||||
auto doc = response.documents()[0];
|
||||
try
|
||||
{
|
||||
double ok = doc->get<double>("ok", 0);
|
||||
if (ok != 1)
|
||||
throw Exception(
|
||||
"Cannot authenticate in MongoDB: server returned response for 'authenticate' command that"
|
||||
" has field 'ok' missing or having wrong value",
|
||||
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
|
||||
}
|
||||
catch (Poco::NotFoundException & e)
|
||||
{
|
||||
throw Exception(
|
||||
"Cannot authenticate in MongoDB: server returned response for 'authenticate' command that has missing required field: "
|
||||
+ e.displayText(),
|
||||
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select)
|
||||
{
|
||||
auto cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection);
|
||||
|
||||
/// Looks like selecting _id column is implicit by default.
|
||||
if (!sample_block_to_select.has("_id"))
|
||||
cursor->query().returnFieldSelector().add("_id", 0);
|
||||
|
||||
for (const auto & column : sample_block_to_select)
|
||||
cursor->query().returnFieldSelector().add(column.name, 1);
|
||||
return cursor;
|
||||
}
|
||||
|
||||
MongoDBBlockInputStream::MongoDBBlockInputStream(
|
||||
std::shared_ptr<Poco::MongoDB::Connection> & connection_,
|
||||
std::unique_ptr<Poco::MongoDB::Cursor> cursor_,
|
||||
const Block & sample_block,
|
||||
const UInt64 max_block_size_)
|
||||
: connection(connection_), cursor{std::move(cursor_)}, max_block_size{max_block_size_}
|
||||
UInt64 max_block_size_,
|
||||
bool strict_check_names_)
|
||||
: connection(connection_)
|
||||
, cursor{std::move(cursor_)}
|
||||
, max_block_size{max_block_size_}
|
||||
, strict_check_names{strict_check_names_}
|
||||
{
|
||||
description.init(sample_block);
|
||||
}
|
||||
@ -192,13 +322,17 @@ Block MongoDBBlockInputStream::readImpl()
|
||||
{
|
||||
Poco::MongoDB::ResponseMessage & response = cursor->next(*connection);
|
||||
|
||||
for (const auto & document : response.documents())
|
||||
for (auto & document : response.documents())
|
||||
{
|
||||
++num_rows;
|
||||
|
||||
for (const auto idx : ext::range(0, size))
|
||||
{
|
||||
const auto & name = description.sample_block.getByPosition(idx).name;
|
||||
|
||||
if (strict_check_names && !document->exists(name))
|
||||
throw Exception(fmt::format("Column {} is absent in MongoDB collection", backQuote(name)), ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
|
||||
|
||||
const Poco::MongoDB::Element::Ptr value = document->get(name);
|
||||
|
||||
if (value.isNull() || value->type() == Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId)
|
@ -14,9 +14,13 @@ namespace MongoDB
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password);
|
||||
|
||||
std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select);
|
||||
|
||||
/// Converts MongoDB Cursor to a stream of Blocks
|
||||
class MongoDBBlockInputStream final : public IBlockInputStream
|
||||
{
|
||||
@ -25,7 +29,8 @@ public:
|
||||
std::shared_ptr<Poco::MongoDB::Connection> & connection_,
|
||||
std::unique_ptr<Poco::MongoDB::Cursor> cursor_,
|
||||
const Block & sample_block,
|
||||
const UInt64 max_block_size_);
|
||||
UInt64 max_block_size_,
|
||||
bool strict_check_names_ = false);
|
||||
|
||||
~MongoDBBlockInputStream() override;
|
||||
|
||||
@ -41,6 +46,7 @@ private:
|
||||
const UInt64 max_block_size;
|
||||
ExternalResultDescription description;
|
||||
bool all_read = false;
|
||||
bool strict_check_names;
|
||||
};
|
||||
|
||||
}
|
@ -44,6 +44,7 @@ SRCS(
|
||||
SquashingBlockOutputStream.cpp
|
||||
SquashingTransform.cpp
|
||||
TTLBlockInputStream.cpp
|
||||
MongoDBBlockInputStream.cpp
|
||||
)
|
||||
|
||||
END()
|
||||
|
@ -52,7 +52,7 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Common/FieldVisitors.h>
|
||||
#include <ext/enumerate.h>
|
||||
#include "MongoDBBlockInputStream.h"
|
||||
#include <DataStreams/MongoDBBlockInputStream.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -67,104 +67,6 @@ namespace ErrorCodes
|
||||
static const UInt64 max_block_size = 8192;
|
||||
|
||||
|
||||
#if POCO_VERSION < 0x01070800
|
||||
/// See https://pocoproject.org/forum/viewtopic.php?f=10&t=6326&p=11426&hilit=mongodb+auth#p11485
|
||||
void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password)
|
||||
{
|
||||
Poco::MongoDB::Database db(database);
|
||||
|
||||
/// Challenge-response authentication.
|
||||
std::string nonce;
|
||||
|
||||
/// First step: request nonce.
|
||||
{
|
||||
auto command = db.createCommand();
|
||||
command->setNumberToReturn(1);
|
||||
command->selector().add<Int32>("getnonce", 1);
|
||||
|
||||
Poco::MongoDB::ResponseMessage response;
|
||||
connection.sendRequest(*command, response);
|
||||
|
||||
if (response.documents().empty())
|
||||
throw Exception(
|
||||
"Cannot authenticate in MongoDB: server returned empty response for 'getnonce' command",
|
||||
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
|
||||
|
||||
auto doc = response.documents()[0];
|
||||
try
|
||||
{
|
||||
double ok = doc->get<double>("ok", 0);
|
||||
if (ok != 1)
|
||||
throw Exception(
|
||||
"Cannot authenticate in MongoDB: server returned response for 'getnonce' command that"
|
||||
" has field 'ok' missing or having wrong value",
|
||||
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
|
||||
|
||||
nonce = doc->get<std::string>("nonce", "");
|
||||
if (nonce.empty())
|
||||
throw Exception(
|
||||
"Cannot authenticate in MongoDB: server returned response for 'getnonce' command that"
|
||||
" has field 'nonce' missing or empty",
|
||||
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
|
||||
}
|
||||
catch (Poco::NotFoundException & e)
|
||||
{
|
||||
throw Exception(
|
||||
"Cannot authenticate in MongoDB: server returned response for 'getnonce' command that has missing required field: "
|
||||
+ e.displayText(),
|
||||
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
|
||||
}
|
||||
}
|
||||
|
||||
/// Second step: use nonce to calculate digest and send it back to the server.
|
||||
/// Digest is hex_md5(n.nonce + username + hex_md5(username + ":mongo:" + password))
|
||||
{
|
||||
std::string first = user + ":mongo:" + password;
|
||||
|
||||
Poco::MD5Engine md5;
|
||||
md5.update(first);
|
||||
std::string digest_first(Poco::DigestEngine::digestToHex(md5.digest()));
|
||||
std::string second = nonce + user + digest_first;
|
||||
md5.reset();
|
||||
md5.update(second);
|
||||
std::string digest_second(Poco::DigestEngine::digestToHex(md5.digest()));
|
||||
|
||||
auto command = db.createCommand();
|
||||
command->setNumberToReturn(1);
|
||||
command->selector()
|
||||
.add<Int32>("authenticate", 1)
|
||||
.add<std::string>("user", user)
|
||||
.add<std::string>("nonce", nonce)
|
||||
.add<std::string>("key", digest_second);
|
||||
|
||||
Poco::MongoDB::ResponseMessage response;
|
||||
connection.sendRequest(*command, response);
|
||||
|
||||
if (response.empty())
|
||||
throw Exception(
|
||||
"Cannot authenticate in MongoDB: server returned empty response for 'authenticate' command",
|
||||
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
|
||||
|
||||
auto doc = response.documents()[0];
|
||||
try
|
||||
{
|
||||
double ok = doc->get<double>("ok", 0);
|
||||
if (ok != 1)
|
||||
throw Exception(
|
||||
"Cannot authenticate in MongoDB: server returned response for 'authenticate' command that"
|
||||
" has field 'ok' missing or having wrong value",
|
||||
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
|
||||
}
|
||||
catch (Poco::NotFoundException & e)
|
||||
{
|
||||
throw Exception(
|
||||
"Cannot authenticate in MongoDB: server returned response for 'authenticate' command that has missing required field: "
|
||||
+ e.displayText(),
|
||||
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
MongoDBDictionarySource::MongoDBDictionarySource(
|
||||
@ -245,19 +147,6 @@ MongoDBDictionarySource::MongoDBDictionarySource(const MongoDBDictionarySource &
|
||||
MongoDBDictionarySource::~MongoDBDictionarySource() = default;
|
||||
|
||||
|
||||
std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select)
|
||||
{
|
||||
auto cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection);
|
||||
|
||||
/// Looks like selecting _id column is implicit by default.
|
||||
if (!sample_block_to_select.has("_id"))
|
||||
cursor->query().returnFieldSelector().add("_id", 0);
|
||||
|
||||
for (const auto & column : sample_block_to_select)
|
||||
cursor->query().returnFieldSelector().add(column.name, 1);
|
||||
return cursor;
|
||||
}
|
||||
|
||||
|
||||
BlockInputStreamPtr MongoDBDictionarySource::loadAll()
|
||||
{
|
||||
|
@ -19,7 +19,6 @@ namespace MongoDB
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
@ -27,10 +26,6 @@ namespace ErrorCodes
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password);
|
||||
|
||||
std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select);
|
||||
|
||||
/// Allows loading dictionaries from a MongoDB collection
|
||||
class MongoDBDictionarySource final : public IDictionarySource
|
||||
{
|
||||
|
@ -53,7 +53,6 @@ SRCS(
|
||||
HTTPDictionarySource.cpp
|
||||
LibraryDictionarySource.cpp
|
||||
LibraryDictionarySourceExternal.cpp
|
||||
MongoDBBlockInputStream.cpp
|
||||
MongoDBDictionarySource.cpp
|
||||
MySQLDictionarySource.cpp
|
||||
PolygonDictionary.cpp
|
||||
|
@ -14,8 +14,7 @@
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Processors/Sources/SourceFromInputStream.h>
|
||||
#include <Processors/Pipe.h>
|
||||
|
||||
#include <Dictionaries/MongoDBBlockInputStream.h>
|
||||
#include <DataStreams/MongoDBBlockInputStream.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -47,20 +46,23 @@ StorageMongoDB::StorageMongoDB(
|
||||
, global_context(context_)
|
||||
, connection{std::make_shared<Poco::MongoDB::Connection>(host, port)}
|
||||
{
|
||||
setColumns(columns_);
|
||||
setConstraints(constraints_);
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
storage_metadata.setColumns(columns_);
|
||||
storage_metadata.setConstraints(constraints_);
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
}
|
||||
|
||||
|
||||
Pipes StorageMongoDB::read(
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & /*query_info*/,
|
||||
const Context & /*context*/,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t max_block_size,
|
||||
unsigned)
|
||||
{
|
||||
check(column_names);
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
|
||||
#if POCO_VERSION >= 0x01070800
|
||||
Poco::MongoDB::Database poco_db(database_name);
|
||||
@ -73,13 +75,13 @@ Pipes StorageMongoDB::read(
|
||||
Block sample_block;
|
||||
for (const String & column_name : column_names)
|
||||
{
|
||||
auto column_data = getColumns().getPhysical(column_name);
|
||||
auto column_data = metadata_snapshot->getColumns().getPhysical(column_name);
|
||||
sample_block.insert({ column_data.type, column_data.name });
|
||||
}
|
||||
|
||||
Pipes pipes;
|
||||
pipes.emplace_back(std::make_shared<SourceFromInputStream>(
|
||||
std::make_shared<MongoDBBlockInputStream>(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size)));
|
||||
std::make_shared<MongoDBBlockInputStream>(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size, true)));
|
||||
|
||||
return pipes;
|
||||
}
|
||||
@ -106,7 +108,6 @@ void registerStorageMongoDB(StorageFactory & factory)
|
||||
const String & username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
|
||||
|
||||
return StorageMongoDB::create(
|
||||
args.table_id,
|
||||
parsed_host_port.first,
|
||||
|
@ -7,8 +7,8 @@
|
||||
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <mysqlxx/Pool.h>
|
||||
#include <Dictionaries/MongoDBDictionarySource.h>
|
||||
|
||||
#include <Poco/MongoDB/Connection.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -38,6 +38,7 @@ public:
|
||||
|
||||
Pipes read(
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
|
@ -179,6 +179,7 @@ SRCS(
|
||||
TTLDescription.cpp
|
||||
KeyDescription.cpp
|
||||
SelectQueryDescription.cpp
|
||||
StorageMongoDB.cpp
|
||||
)
|
||||
|
||||
END()
|
||||
|
86
tests/integration/test_storage_mongodb/test.py
Normal file
86
tests/integration/test_storage_mongodb/test.py
Normal file
@ -0,0 +1,86 @@
|
||||
import pymongo
|
||||
|
||||
import pytest
|
||||
from helpers.client import QueryRuntimeException
|
||||
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node = cluster.add_instance('node', with_mongo=True)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def get_mongo_connection():
|
||||
connection_str = 'mongodb://root:clickhouse@localhost:27018'
|
||||
return pymongo.MongoClient(connection_str)
|
||||
|
||||
|
||||
def test_simple_select(started_cluster):
|
||||
mongo_connection = get_mongo_connection()
|
||||
db = mongo_connection['test']
|
||||
db.add_user('root', 'clickhouse')
|
||||
simple_mongo_table = db['simple_table']
|
||||
data = []
|
||||
for i in range(0, 100):
|
||||
data.append({'key': i, 'data': hex(i * i)})
|
||||
simple_mongo_table.insert_many(data)
|
||||
|
||||
node.query("CREATE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse')")
|
||||
|
||||
assert node.query("SELECT COUNT() FROM simple_mongo_table") == '100\n'
|
||||
assert node.query("SELECT sum(key) FROM simple_mongo_table") == str(sum(range(0, 100))) + '\n'
|
||||
|
||||
assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + '\n'
|
||||
|
||||
|
||||
def test_complex_data_type(started_cluster):
|
||||
mongo_connection = get_mongo_connection()
|
||||
db = mongo_connection['test']
|
||||
db.add_user('root', 'clickhouse')
|
||||
incomplete_mongo_table = db['complex_table']
|
||||
data = []
|
||||
for i in range(0, 100):
|
||||
data.append({'key': i, 'data': hex(i * i), 'dict': {'a' : i, 'b': str(i)}})
|
||||
incomplete_mongo_table.insert_many(data)
|
||||
|
||||
node.query("CREATE TABLE incomplete_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse')")
|
||||
|
||||
assert node.query("SELECT COUNT() FROM incomplete_mongo_table") == '100\n'
|
||||
assert node.query("SELECT sum(key) FROM incomplete_mongo_table") == str(sum(range(0, 100))) + '\n'
|
||||
|
||||
assert node.query("SELECT data from incomplete_mongo_table where key = 42") == hex(42 * 42) + '\n'
|
||||
|
||||
|
||||
def test_incorrect_data_type(started_cluster):
|
||||
mongo_connection = get_mongo_connection()
|
||||
db = mongo_connection['test']
|
||||
db.add_user('root', 'clickhouse')
|
||||
strange_mongo_table = db['strange_table']
|
||||
data = []
|
||||
for i in range(0, 100):
|
||||
data.append({'key': i, 'data': hex(i * i), 'aaaa': 'Hello'})
|
||||
strange_mongo_table.insert_many(data)
|
||||
|
||||
node.query("CREATE TABLE strange_mongo_table(key String, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'strange_table', 'root', 'clickhouse')")
|
||||
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node.query("SELECT COUNT() FROM strange_mongo_table")
|
||||
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node.query("SELECT uniq(key) FROM strange_mongo_table")
|
||||
|
||||
node.query("CREATE TABLE strange_mongo_table2(key UInt64, data String, bbbb String) ENGINE = MongoDB('mongo1:27017', 'test', 'strange_table', 'root', 'clickhouse')")
|
||||
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node.query("SELECT bbbb FROM strange_mongo_table2")
|
Loading…
Reference in New Issue
Block a user