diff --git a/src/Dictionaries/MongoDBBlockInputStream.cpp b/src/DataStreams/MongoDBBlockInputStream.cpp similarity index 61% rename from src/Dictionaries/MongoDBBlockInputStream.cpp rename to src/DataStreams/MongoDBBlockInputStream.cpp index fbfb738f057..1f68c2d5264 100644 --- a/src/Dictionaries/MongoDBBlockInputStream.cpp +++ b/src/DataStreams/MongoDBBlockInputStream.cpp @@ -2,37 +2,167 @@ #include #include +#include #include #include #include +#include #include #include #include #include #include +#include +#include +#include +#include +#include +#include +#include + +// only after poco +// naming conflict: +// Poco/MongoDB/BSONWriter.h:54: void writeCString(const std::string & value); +// src/IO/WriteHelpers.h:146 #define writeCString(s, buf) #include #include -#include -#include -#include "DictionaryStructure.h" -#include "MongoDBBlockInputStream.h" - +#include namespace DB { + namespace ErrorCodes { extern const int TYPE_MISMATCH; + extern const int UNSUPPORTED_METHOD; + extern const int MONGODB_CANNOT_AUTHENTICATE; + extern const int NOT_FOUND_COLUMN_IN_BLOCK; } +#if POCO_VERSION < 0x01070800 +/// See https://pocoproject.org/forum/viewtopic.php?f=10&t=6326&p=11426&hilit=mongodb+auth#p11485 +void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password) +{ + Poco::MongoDB::Database db(database); + + /// Challenge-response authentication. + std::string nonce; + + /// First step: request nonce. + { + auto command = db.createCommand(); + command->setNumberToReturn(1); + command->selector().add("getnonce", 1); + + Poco::MongoDB::ResponseMessage response; + connection.sendRequest(*command, response); + + if (response.documents().empty()) + throw Exception( + "Cannot authenticate in MongoDB: server returned empty response for 'getnonce' command", + ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); + + auto doc = response.documents()[0]; + try + { + double ok = doc->get("ok", 0); + if (ok != 1) + throw Exception( + "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that" + " has field 'ok' missing or having wrong value", + ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); + + nonce = doc->get("nonce", ""); + if (nonce.empty()) + throw Exception( + "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that" + " has field 'nonce' missing or empty", + ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); + } + catch (Poco::NotFoundException & e) + { + throw Exception( + "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that has missing required field: " + + e.displayText(), + ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); + } + } + + /// Second step: use nonce to calculate digest and send it back to the server. + /// Digest is hex_md5(n.nonce + username + hex_md5(username + ":mongo:" + password)) + { + std::string first = user + ":mongo:" + password; + + Poco::MD5Engine md5; + md5.update(first); + std::string digest_first(Poco::DigestEngine::digestToHex(md5.digest())); + std::string second = nonce + user + digest_first; + md5.reset(); + md5.update(second); + std::string digest_second(Poco::DigestEngine::digestToHex(md5.digest())); + + auto command = db.createCommand(); + command->setNumberToReturn(1); + command->selector() + .add("authenticate", 1) + .add("user", user) + .add("nonce", nonce) + .add("key", digest_second); + + Poco::MongoDB::ResponseMessage response; + connection.sendRequest(*command, response); + + if (response.empty()) + throw Exception( + "Cannot authenticate in MongoDB: server returned empty response for 'authenticate' command", + ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); + + auto doc = response.documents()[0]; + try + { + double ok = doc->get("ok", 0); + if (ok != 1) + throw Exception( + "Cannot authenticate in MongoDB: server returned response for 'authenticate' command that" + " has field 'ok' missing or having wrong value", + ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); + } + catch (Poco::NotFoundException & e) + { + throw Exception( + "Cannot authenticate in MongoDB: server returned response for 'authenticate' command that has missing required field: " + + e.displayText(), + ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); + } + } +} +#endif + +std::unique_ptr createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select) +{ + auto cursor = std::make_unique(database, collection); + + /// Looks like selecting _id column is implicit by default. + if (!sample_block_to_select.has("_id")) + cursor->query().returnFieldSelector().add("_id", 0); + + for (const auto & column : sample_block_to_select) + cursor->query().returnFieldSelector().add(column.name, 1); + return cursor; +} + MongoDBBlockInputStream::MongoDBBlockInputStream( std::shared_ptr & connection_, std::unique_ptr cursor_, const Block & sample_block, - const UInt64 max_block_size_) - : connection(connection_), cursor{std::move(cursor_)}, max_block_size{max_block_size_} + UInt64 max_block_size_, + bool strict_check_names_) + : connection(connection_) + , cursor{std::move(cursor_)} + , max_block_size{max_block_size_} + , strict_check_names{strict_check_names_} { description.init(sample_block); } @@ -192,13 +322,17 @@ Block MongoDBBlockInputStream::readImpl() { Poco::MongoDB::ResponseMessage & response = cursor->next(*connection); - for (const auto & document : response.documents()) + for (auto & document : response.documents()) { ++num_rows; for (const auto idx : ext::range(0, size)) { const auto & name = description.sample_block.getByPosition(idx).name; + + if (strict_check_names && !document->exists(name)) + throw Exception(fmt::format("Column {} is absent in MongoDB collection", backQuote(name)), ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK); + const Poco::MongoDB::Element::Ptr value = document->get(name); if (value.isNull() || value->type() == Poco::MongoDB::ElementTraits::TypeId) diff --git a/src/Dictionaries/MongoDBBlockInputStream.h b/src/DataStreams/MongoDBBlockInputStream.h similarity index 70% rename from src/Dictionaries/MongoDBBlockInputStream.h rename to src/DataStreams/MongoDBBlockInputStream.h index d5d692c827c..a844e3104e4 100644 --- a/src/Dictionaries/MongoDBBlockInputStream.h +++ b/src/DataStreams/MongoDBBlockInputStream.h @@ -14,9 +14,13 @@ namespace MongoDB } } - namespace DB { + +void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password); + +std::unique_ptr createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select); + /// Converts MongoDB Cursor to a stream of Blocks class MongoDBBlockInputStream final : public IBlockInputStream { @@ -25,7 +29,8 @@ public: std::shared_ptr & connection_, std::unique_ptr cursor_, const Block & sample_block, - const UInt64 max_block_size_); + UInt64 max_block_size_, + bool strict_check_names_ = false); ~MongoDBBlockInputStream() override; @@ -41,6 +46,7 @@ private: const UInt64 max_block_size; ExternalResultDescription description; bool all_read = false; + bool strict_check_names; }; } diff --git a/src/DataStreams/ya.make b/src/DataStreams/ya.make index 4c391cf839a..2ce82156c9d 100644 --- a/src/DataStreams/ya.make +++ b/src/DataStreams/ya.make @@ -44,6 +44,7 @@ SRCS( SquashingBlockOutputStream.cpp SquashingTransform.cpp TTLBlockInputStream.cpp + MongoDBBlockInputStream.cpp ) END() diff --git a/src/Dictionaries/MongoDBDictionarySource.cpp b/src/Dictionaries/MongoDBDictionarySource.cpp index b597bb72487..3ec21b3bb75 100644 --- a/src/Dictionaries/MongoDBDictionarySource.cpp +++ b/src/Dictionaries/MongoDBDictionarySource.cpp @@ -52,7 +52,7 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory) #include #include #include -#include "MongoDBBlockInputStream.h" +#include namespace DB @@ -67,104 +67,6 @@ namespace ErrorCodes static const UInt64 max_block_size = 8192; -#if POCO_VERSION < 0x01070800 -/// See https://pocoproject.org/forum/viewtopic.php?f=10&t=6326&p=11426&hilit=mongodb+auth#p11485 -void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password) -{ - Poco::MongoDB::Database db(database); - - /// Challenge-response authentication. - std::string nonce; - - /// First step: request nonce. - { - auto command = db.createCommand(); - command->setNumberToReturn(1); - command->selector().add("getnonce", 1); - - Poco::MongoDB::ResponseMessage response; - connection.sendRequest(*command, response); - - if (response.documents().empty()) - throw Exception( - "Cannot authenticate in MongoDB: server returned empty response for 'getnonce' command", - ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); - - auto doc = response.documents()[0]; - try - { - double ok = doc->get("ok", 0); - if (ok != 1) - throw Exception( - "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that" - " has field 'ok' missing or having wrong value", - ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); - - nonce = doc->get("nonce", ""); - if (nonce.empty()) - throw Exception( - "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that" - " has field 'nonce' missing or empty", - ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); - } - catch (Poco::NotFoundException & e) - { - throw Exception( - "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that has missing required field: " - + e.displayText(), - ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); - } - } - - /// Second step: use nonce to calculate digest and send it back to the server. - /// Digest is hex_md5(n.nonce + username + hex_md5(username + ":mongo:" + password)) - { - std::string first = user + ":mongo:" + password; - - Poco::MD5Engine md5; - md5.update(first); - std::string digest_first(Poco::DigestEngine::digestToHex(md5.digest())); - std::string second = nonce + user + digest_first; - md5.reset(); - md5.update(second); - std::string digest_second(Poco::DigestEngine::digestToHex(md5.digest())); - - auto command = db.createCommand(); - command->setNumberToReturn(1); - command->selector() - .add("authenticate", 1) - .add("user", user) - .add("nonce", nonce) - .add("key", digest_second); - - Poco::MongoDB::ResponseMessage response; - connection.sendRequest(*command, response); - - if (response.empty()) - throw Exception( - "Cannot authenticate in MongoDB: server returned empty response for 'authenticate' command", - ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); - - auto doc = response.documents()[0]; - try - { - double ok = doc->get("ok", 0); - if (ok != 1) - throw Exception( - "Cannot authenticate in MongoDB: server returned response for 'authenticate' command that" - " has field 'ok' missing or having wrong value", - ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); - } - catch (Poco::NotFoundException & e) - { - throw Exception( - "Cannot authenticate in MongoDB: server returned response for 'authenticate' command that has missing required field: " - + e.displayText(), - ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); - } - } -} -#endif MongoDBDictionarySource::MongoDBDictionarySource( @@ -245,19 +147,6 @@ MongoDBDictionarySource::MongoDBDictionarySource(const MongoDBDictionarySource & MongoDBDictionarySource::~MongoDBDictionarySource() = default; -std::unique_ptr createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select) -{ - auto cursor = std::make_unique(database, collection); - - /// Looks like selecting _id column is implicit by default. - if (!sample_block_to_select.has("_id")) - cursor->query().returnFieldSelector().add("_id", 0); - - for (const auto & column : sample_block_to_select) - cursor->query().returnFieldSelector().add(column.name, 1); - return cursor; -} - BlockInputStreamPtr MongoDBDictionarySource::loadAll() { diff --git a/src/Dictionaries/MongoDBDictionarySource.h b/src/Dictionaries/MongoDBDictionarySource.h index d8ade4b17c1..ad7b66fe1a7 100644 --- a/src/Dictionaries/MongoDBDictionarySource.h +++ b/src/Dictionaries/MongoDBDictionarySource.h @@ -19,7 +19,6 @@ namespace MongoDB } } - namespace DB { namespace ErrorCodes @@ -27,10 +26,6 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; } -void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password); - -std::unique_ptr createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select); - /// Allows loading dictionaries from a MongoDB collection class MongoDBDictionarySource final : public IDictionarySource { diff --git a/src/Dictionaries/ya.make b/src/Dictionaries/ya.make index 3de623a9a8b..e5522affd4b 100644 --- a/src/Dictionaries/ya.make +++ b/src/Dictionaries/ya.make @@ -53,7 +53,6 @@ SRCS( HTTPDictionarySource.cpp LibraryDictionarySource.cpp LibraryDictionarySourceExternal.cpp - MongoDBBlockInputStream.cpp MongoDBDictionarySource.cpp MySQLDictionarySource.cpp PolygonDictionary.cpp diff --git a/src/Storages/StorageMongoDB.cpp b/src/Storages/StorageMongoDB.cpp index db19e08b990..ee6296a6c8f 100644 --- a/src/Storages/StorageMongoDB.cpp +++ b/src/Storages/StorageMongoDB.cpp @@ -14,8 +14,7 @@ #include #include #include - -#include +#include namespace DB { @@ -47,20 +46,23 @@ StorageMongoDB::StorageMongoDB( , global_context(context_) , connection{std::make_shared(host, port)} { - setColumns(columns_); - setConstraints(constraints_); + StorageInMemoryMetadata storage_metadata; + storage_metadata.setColumns(columns_); + storage_metadata.setConstraints(constraints_); + setInMemoryMetadata(storage_metadata); } Pipes StorageMongoDB::read( const Names & column_names, + const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & /*query_info*/, const Context & /*context*/, QueryProcessingStage::Enum /*processed_stage*/, size_t max_block_size, unsigned) { - check(column_names); + metadata_snapshot->check(column_names, getVirtuals(), getStorageID()); #if POCO_VERSION >= 0x01070800 Poco::MongoDB::Database poco_db(database_name); @@ -73,13 +75,13 @@ Pipes StorageMongoDB::read( Block sample_block; for (const String & column_name : column_names) { - auto column_data = getColumns().getPhysical(column_name); + auto column_data = metadata_snapshot->getColumns().getPhysical(column_name); sample_block.insert({ column_data.type, column_data.name }); } Pipes pipes; pipes.emplace_back(std::make_shared( - std::make_shared(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size))); + std::make_shared(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size, true))); return pipes; } @@ -106,7 +108,6 @@ void registerStorageMongoDB(StorageFactory & factory) const String & username = engine_args[3]->as().value.safeGet(); const String & password = engine_args[4]->as().value.safeGet(); - return StorageMongoDB::create( args.table_id, parsed_host_port.first, diff --git a/src/Storages/StorageMongoDB.h b/src/Storages/StorageMongoDB.h index c037972f36b..951fda7ef40 100644 --- a/src/Storages/StorageMongoDB.h +++ b/src/Storages/StorageMongoDB.h @@ -7,8 +7,8 @@ #include #include -#include -#include + +#include namespace DB @@ -38,6 +38,7 @@ public: Pipes read( const Names & column_names, + const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage, diff --git a/src/Storages/ya.make b/src/Storages/ya.make index 4ca1dad1b1b..88e0968307d 100644 --- a/src/Storages/ya.make +++ b/src/Storages/ya.make @@ -179,6 +179,7 @@ SRCS( TTLDescription.cpp KeyDescription.cpp SelectQueryDescription.cpp + StorageMongoDB.cpp ) END() diff --git a/tests/integration/test_storage_mongodb/test.py b/tests/integration/test_storage_mongodb/test.py new file mode 100644 index 00000000000..90534949b0b --- /dev/null +++ b/tests/integration/test_storage_mongodb/test.py @@ -0,0 +1,86 @@ +import pymongo + +import pytest +from helpers.client import QueryRuntimeException + + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +node = cluster.add_instance('node', with_mongo=True) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + yield cluster + + finally: + cluster.shutdown() + + +def get_mongo_connection(): + connection_str = 'mongodb://root:clickhouse@localhost:27018' + return pymongo.MongoClient(connection_str) + + +def test_simple_select(started_cluster): + mongo_connection = get_mongo_connection() + db = mongo_connection['test'] + db.add_user('root', 'clickhouse') + simple_mongo_table = db['simple_table'] + data = [] + for i in range(0, 100): + data.append({'key': i, 'data': hex(i * i)}) + simple_mongo_table.insert_many(data) + + node.query("CREATE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse')") + + assert node.query("SELECT COUNT() FROM simple_mongo_table") == '100\n' + assert node.query("SELECT sum(key) FROM simple_mongo_table") == str(sum(range(0, 100))) + '\n' + + assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + '\n' + + +def test_complex_data_type(started_cluster): + mongo_connection = get_mongo_connection() + db = mongo_connection['test'] + db.add_user('root', 'clickhouse') + incomplete_mongo_table = db['complex_table'] + data = [] + for i in range(0, 100): + data.append({'key': i, 'data': hex(i * i), 'dict': {'a' : i, 'b': str(i)}}) + incomplete_mongo_table.insert_many(data) + + node.query("CREATE TABLE incomplete_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse')") + + assert node.query("SELECT COUNT() FROM incomplete_mongo_table") == '100\n' + assert node.query("SELECT sum(key) FROM incomplete_mongo_table") == str(sum(range(0, 100))) + '\n' + + assert node.query("SELECT data from incomplete_mongo_table where key = 42") == hex(42 * 42) + '\n' + + +def test_incorrect_data_type(started_cluster): + mongo_connection = get_mongo_connection() + db = mongo_connection['test'] + db.add_user('root', 'clickhouse') + strange_mongo_table = db['strange_table'] + data = [] + for i in range(0, 100): + data.append({'key': i, 'data': hex(i * i), 'aaaa': 'Hello'}) + strange_mongo_table.insert_many(data) + + node.query("CREATE TABLE strange_mongo_table(key String, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'strange_table', 'root', 'clickhouse')") + + with pytest.raises(QueryRuntimeException): + node.query("SELECT COUNT() FROM strange_mongo_table") + + with pytest.raises(QueryRuntimeException): + node.query("SELECT uniq(key) FROM strange_mongo_table") + + node.query("CREATE TABLE strange_mongo_table2(key UInt64, data String, bbbb String) ENGINE = MongoDB('mongo1:27017', 'test', 'strange_table', 'root', 'clickhouse')") + + with pytest.raises(QueryRuntimeException): + node.query("SELECT bbbb FROM strange_mongo_table2")