From 183f90e45a7601e5ad4af63b24dabfc506a637ae Mon Sep 17 00:00:00 2001 From: Nikolay Degterinsky Date: Mon, 22 May 2023 02:02:09 +0000 Subject: [PATCH 1/6] Update MongoDB protocol --- .../Foundation/include/Poco/BinaryReader.h | 3 + .../Foundation/include/Poco/BinaryWriter.h | 5 + base/poco/Foundation/src/BinaryReader.cpp | 25 ++ base/poco/Foundation/src/BinaryWriter.cpp | 11 +- base/poco/MongoDB/CMakeLists.txt | 1 + .../poco/MongoDB/include/Poco/MongoDB/Array.h | 36 +- .../MongoDB/include/Poco/MongoDB/Binary.h | 2 +- .../MongoDB/include/Poco/MongoDB/Connection.h | 22 +- .../MongoDB/include/Poco/MongoDB/Cursor.h | 3 + .../MongoDB/include/Poco/MongoDB/Database.h | 83 +++- .../MongoDB/include/Poco/MongoDB/Document.h | 27 +- .../MongoDB/include/Poco/MongoDB/Element.h | 6 +- .../include/Poco/MongoDB/JavaScriptCode.h | 2 +- .../include/Poco/MongoDB/MessageHeader.h | 11 +- .../MongoDB/include/Poco/MongoDB/MongoDB.h | 12 + .../MongoDB/include/Poco/MongoDB/ObjectId.h | 2 +- .../include/Poco/MongoDB/OpMsgCursor.h | 96 ++++ .../include/Poco/MongoDB/OpMsgMessage.h | 163 +++++++ .../Poco/MongoDB/PoolableConnectionFactory.h | 16 + .../include/Poco/MongoDB/RegularExpression.h | 2 +- .../include/Poco/MongoDB/ResponseMessage.h | 3 + base/poco/MongoDB/src/Array.cpp | 4 +- base/poco/MongoDB/src/Connection.cpp | 26 ++ base/poco/MongoDB/src/Cursor.cpp | 6 + base/poco/MongoDB/src/Database.cpp | 48 +- base/poco/MongoDB/src/DeleteRequest.cpp | 4 +- base/poco/MongoDB/src/Document.cpp | 14 +- base/poco/MongoDB/src/Element.cpp | 2 +- base/poco/MongoDB/src/GetMoreRequest.cpp | 2 +- base/poco/MongoDB/src/InsertRequest.cpp | 2 +- base/poco/MongoDB/src/KillCursorsRequest.cpp | 2 +- base/poco/MongoDB/src/Message.cpp | 2 +- base/poco/MongoDB/src/MessageHeader.cpp | 12 +- base/poco/MongoDB/src/ObjectId.cpp | 2 +- base/poco/MongoDB/src/OpMsgCursor.cpp | 187 ++++++++ base/poco/MongoDB/src/OpMsgMessage.cpp | 412 ++++++++++++++++++ base/poco/MongoDB/src/QueryRequest.cpp | 6 +- base/poco/MongoDB/src/RegularExpression.cpp | 4 +- base/poco/MongoDB/src/ReplicaSet.cpp | 6 +- base/poco/MongoDB/src/RequestMessage.cpp | 4 +- base/poco/MongoDB/src/ResponseMessage.cpp | 20 +- base/poco/MongoDB/src/UpdateRequest.cpp | 2 +- .../runner/compose/docker_compose_mongo.yml | 2 +- .../compose/docker_compose_mongo_secure.yml | 2 +- src/Dictionaries/MongoDBDictionarySource.cpp | 15 +- src/Dictionaries/MongoDBDictionarySource.h | 1 - src/Processors/Sources/MongoDBSource.cpp | 76 +++- src/Processors/Sources/MongoDBSource.h | 32 +- src/Storages/StorageMongoDB.cpp | 34 +- .../integration/test_storage_mongodb/test.py | 42 +- 50 files changed, 1399 insertions(+), 103 deletions(-) create mode 100644 base/poco/MongoDB/include/Poco/MongoDB/OpMsgCursor.h create mode 100644 base/poco/MongoDB/include/Poco/MongoDB/OpMsgMessage.h create mode 100644 base/poco/MongoDB/src/OpMsgCursor.cpp create mode 100644 base/poco/MongoDB/src/OpMsgMessage.cpp diff --git a/base/poco/Foundation/include/Poco/BinaryReader.h b/base/poco/Foundation/include/Poco/BinaryReader.h index 4042b507a2f..2b9bca29944 100644 --- a/base/poco/Foundation/include/Poco/BinaryReader.h +++ b/base/poco/Foundation/include/Poco/BinaryReader.h @@ -117,6 +117,9 @@ public: void readRaw(char * buffer, std::streamsize length); /// Reads length bytes of raw data into buffer. + void readCString(std::string& value); + /// Reads zero-terminated C-string into value. + void readBOM(); /// Reads a byte-order mark from the stream and configures /// the reader for the encountered byte order. diff --git a/base/poco/Foundation/include/Poco/BinaryWriter.h b/base/poco/Foundation/include/Poco/BinaryWriter.h index aa280d4ccab..a35d76d84bc 100644 --- a/base/poco/Foundation/include/Poco/BinaryWriter.h +++ b/base/poco/Foundation/include/Poco/BinaryWriter.h @@ -56,6 +56,8 @@ public: LITTLE_ENDIAN_BYTE_ORDER = 3 /// little-endian byte-order }; + static const std::streamsize DEFAULT_MAX_CSTR_LENGTH { 1024 }; + BinaryWriter(std::ostream & ostr, StreamByteOrder byteOrder = NATIVE_BYTE_ORDER); /// Creates the BinaryWriter. @@ -131,6 +133,9 @@ public: void writeRaw(const char * buffer, std::streamsize length); /// Writes length raw bytes from the given buffer to the stream. + void writeCString(const char* cString, std::streamsize maxLength = DEFAULT_MAX_CSTR_LENGTH); + /// Writes zero-terminated C-string. + void writeBOM(); /// Writes a byte-order mark to the stream. A byte order mark is /// a 16-bit integer with a value of 0xFEFF, written in host byte-order. diff --git a/base/poco/Foundation/src/BinaryReader.cpp b/base/poco/Foundation/src/BinaryReader.cpp index f2961e03966..37ec2bc9040 100644 --- a/base/poco/Foundation/src/BinaryReader.cpp +++ b/base/poco/Foundation/src/BinaryReader.cpp @@ -274,6 +274,31 @@ void BinaryReader::readRaw(char* buffer, std::streamsize length) } +void BinaryReader::readCString(std::string& value) +{ + value.clear(); + if (!_istr.good()) + { + return; + } + value.reserve(256); + while (true) + { + char c; + _istr.get(c); + if (!_istr.good()) + { + break; + } + if (c == '\0') + { + break; + } + value += c; + } +} + + void BinaryReader::readBOM() { UInt16 bom; diff --git a/base/poco/Foundation/src/BinaryWriter.cpp b/base/poco/Foundation/src/BinaryWriter.cpp index 6db5ab7cb90..c3fcabc4374 100644 --- a/base/poco/Foundation/src/BinaryWriter.cpp +++ b/base/poco/Foundation/src/BinaryWriter.cpp @@ -271,7 +271,7 @@ BinaryWriter& BinaryWriter::operator << (const std::string& value) BinaryWriter& BinaryWriter::operator << (const char* value) { poco_check_ptr (value); - + if (_pTextConverter) { std::string converted; @@ -332,6 +332,15 @@ void BinaryWriter::writeRaw(const char* buffer, std::streamsize length) } +void BinaryWriter::writeCString(const char* cString, std::streamsize maxLength) +{ + const std::size_t len = ::strnlen(cString, maxLength); + writeRaw(cString, len); + static const char zero = '\0'; + _ostr.write(&zero, sizeof(zero)); +} + + void BinaryWriter::writeBOM() { UInt16 value = 0xFEFF; diff --git a/base/poco/MongoDB/CMakeLists.txt b/base/poco/MongoDB/CMakeLists.txt index 8f5c6be2cae..bb6f90ed8f5 100644 --- a/base/poco/MongoDB/CMakeLists.txt +++ b/base/poco/MongoDB/CMakeLists.txt @@ -13,3 +13,4 @@ target_compile_options (_poco_mongodb target_include_directories (_poco_mongodb SYSTEM PUBLIC "include") target_link_libraries (_poco_mongodb PUBLIC Poco::Net) + diff --git a/base/poco/MongoDB/include/Poco/MongoDB/Array.h b/base/poco/MongoDB/include/Poco/MongoDB/Array.h index 4ed9cdd87ee..8a30c785b2d 100644 --- a/base/poco/MongoDB/include/Poco/MongoDB/Array.h +++ b/base/poco/MongoDB/include/Poco/MongoDB/Array.h @@ -33,7 +33,7 @@ namespace MongoDB /// This class represents a BSON Array. { public: - typedef SharedPtr Ptr; + using Ptr = SharedPtr; Array(); /// Creates an empty Array. @@ -41,8 +41,31 @@ namespace MongoDB virtual ~Array(); /// Destroys the Array. + // Document template functions available for backward compatibility + using Document::add; + using Document::get; + template - T get(int pos) const + Document & add(T value) + /// Creates an element with the name from the current pos and value and + /// adds it to the array document. + /// + /// The active document is returned to allow chaining of the add methods. + { + return Document::add(Poco::NumberFormatter::format(size()), value); + } + + Document & add(const char * value) + /// Creates an element with a name from the current pos and value and + /// adds it to the array document. + /// + /// The active document is returned to allow chaining of the add methods. + { + return Document::add(Poco::NumberFormatter::format(size()), value); + } + + template + T get(std::size_t pos) const /// Returns the element at the given index and tries to convert /// it to the template type. If the element is not found, a /// Poco::NotFoundException will be thrown. If the element cannot be @@ -52,7 +75,7 @@ namespace MongoDB } template - T get(int pos, const T & deflt) const + T get(std::size_t pos, const T & deflt) const /// Returns the element at the given index and tries to convert /// it to the template type. If the element is not found, or /// has the wrong type, the deflt argument will be returned. @@ -60,12 +83,12 @@ namespace MongoDB return Document::get(Poco::NumberFormatter::format(pos), deflt); } - Element::Ptr get(int pos) const; + Element::Ptr get(std::size_t pos) const; /// Returns the element at the given index. /// An empty element will be returned if the element is not found. template - bool isType(int pos) const + bool isType(std::size_t pos) const /// Returns true if the type of the element equals the TypeId of ElementTrait, /// otherwise false. { @@ -74,6 +97,9 @@ namespace MongoDB std::string toString(int indent = 0) const; /// Returns a string representation of the Array. + + private: + friend void BSONReader::read(Array::Ptr & to); }; diff --git a/base/poco/MongoDB/include/Poco/MongoDB/Binary.h b/base/poco/MongoDB/include/Poco/MongoDB/Binary.h index 1005cb000f5..aad8736e8b6 100644 --- a/base/poco/MongoDB/include/Poco/MongoDB/Binary.h +++ b/base/poco/MongoDB/include/Poco/MongoDB/Binary.h @@ -40,7 +40,7 @@ namespace MongoDB /// A Binary stores its data in a Poco::Buffer. { public: - typedef SharedPtr Ptr; + using Ptr = SharedPtr; Binary(); /// Creates an empty Binary with subtype 0. diff --git a/base/poco/MongoDB/include/Poco/MongoDB/Connection.h b/base/poco/MongoDB/include/Poco/MongoDB/Connection.h index dcb813b75bc..cf679d530aa 100644 --- a/base/poco/MongoDB/include/Poco/MongoDB/Connection.h +++ b/base/poco/MongoDB/include/Poco/MongoDB/Connection.h @@ -18,6 +18,7 @@ #define MongoDB_Connection_INCLUDED +#include "Poco/MongoDB/OpMsgMessage.h" #include "Poco/MongoDB/RequestMessage.h" #include "Poco/MongoDB/ResponseMessage.h" #include "Poco/Mutex.h" @@ -39,7 +40,7 @@ namespace MongoDB /// for more information on the wire protocol. { public: - typedef Poco::SharedPtr Ptr; + using Ptr = Poco::SharedPtr; class MongoDB_API SocketFactory { @@ -90,7 +91,7 @@ namespace MongoDB Poco::Net::SocketAddress address() const; /// Returns the address of the MongoDB server. - + const std::string & uri() const; /// Returns the uri on which the connection was made. @@ -145,6 +146,21 @@ namespace MongoDB /// Use this when a response is expected: only a "query" or "getmore" /// request will return a response. + void sendRequest(OpMsgMessage & request, OpMsgMessage & response); + /// Sends a request to the MongoDB server and receives the response + /// using newer wire protocol with OP_MSG. + + void sendRequest(OpMsgMessage & request); + /// Sends an unacknowledged request to the MongoDB server using newer + /// wire protocol with OP_MSG. + /// No response is sent by the server. + + void readResponse(OpMsgMessage & response); + /// Reads additional response data when previous message's flag moreToCome + /// indicates that server will send more data. + /// NOTE: See comments in OpMsgCursor code. + + protected: void connect(); @@ -164,7 +180,7 @@ namespace MongoDB } inline const std::string & Connection::uri() const { - return _uri; + return _uri; } diff --git a/base/poco/MongoDB/include/Poco/MongoDB/Cursor.h b/base/poco/MongoDB/include/Poco/MongoDB/Cursor.h index 4aed9fe64fb..8849d737a62 100644 --- a/base/poco/MongoDB/include/Poco/MongoDB/Cursor.h +++ b/base/poco/MongoDB/include/Poco/MongoDB/Cursor.h @@ -40,6 +40,9 @@ namespace MongoDB Cursor(const std::string & fullCollectionName, QueryRequest::Flags flags = QueryRequest::QUERY_DEFAULT); /// Creates a Cursor for the given database and collection ("database.collection"), using the specified flags. + Cursor(const Document & aggregationResponse); + /// Creates a Cursor for the given aggregation query response. + virtual ~Cursor(); /// Destroys the Cursor. diff --git a/base/poco/MongoDB/include/Poco/MongoDB/Database.h b/base/poco/MongoDB/include/Poco/MongoDB/Database.h index 62aea632b08..3334a673df6 100644 --- a/base/poco/MongoDB/include/Poco/MongoDB/Database.h +++ b/base/poco/MongoDB/include/Poco/MongoDB/Database.h @@ -26,6 +26,8 @@ #include "Poco/MongoDB/QueryRequest.h" #include "Poco/MongoDB/UpdateRequest.h" +#include "Poco/MongoDB/OpMsgCursor.h" +#include "Poco/MongoDB/OpMsgMessage.h" namespace Poco { @@ -45,6 +47,9 @@ namespace MongoDB virtual ~Database(); /// Destroys the Database. + const std::string & name() const; + /// Database name + bool authenticate( Connection & connection, const std::string & username, @@ -62,34 +67,49 @@ namespace MongoDB /// May throw a Poco::ProtocolException if authentication fails for a reason other than /// invalid credentials. + Document::Ptr queryBuildInfo(Connection & connection) const; + /// Queries server build info (all wire protocols) + + Document::Ptr queryServerHello(Connection & connection) const; + /// Queries hello response from server (all wire protocols) + Int64 count(Connection & connection, const std::string & collectionName) const; - /// Sends a count request for the given collection to MongoDB. + /// Sends a count request for the given collection to MongoDB. (old wire protocol) /// /// If the command fails, -1 is returned. Poco::SharedPtr createCommand() const; - /// Creates a QueryRequest for a command. + /// Creates a QueryRequest for a command. (old wire protocol) Poco::SharedPtr createCountRequest(const std::string & collectionName) const; /// Creates a QueryRequest to count the given collection. - /// The collectionname must not contain the database name. + /// The collectionname must not contain the database name. (old wire protocol) Poco::SharedPtr createDeleteRequest(const std::string & collectionName) const; /// Creates a DeleteRequest to delete documents in the given collection. - /// The collectionname must not contain the database name. + /// The collectionname must not contain the database name. (old wire protocol) Poco::SharedPtr createInsertRequest(const std::string & collectionName) const; /// Creates an InsertRequest to insert new documents in the given collection. - /// The collectionname must not contain the database name. + /// The collectionname must not contain the database name. (old wire protocol) Poco::SharedPtr createQueryRequest(const std::string & collectionName) const; - /// Creates a QueryRequest. + /// Creates a QueryRequest. (old wire protocol) /// The collectionname must not contain the database name. Poco::SharedPtr createUpdateRequest(const std::string & collectionName) const; - /// Creates an UpdateRequest. + /// Creates an UpdateRequest. (old wire protocol) /// The collectionname must not contain the database name. + Poco::SharedPtr createOpMsgMessage(const std::string & collectionName) const; + /// Creates OpMsgMessage. (new wire protocol) + + Poco::SharedPtr createOpMsgMessage() const; + /// Creates OpMsgMessage for database commands that do not require collection as an argument. (new wire protocol) + + Poco::SharedPtr createOpMsgCursor(const std::string & collectionName) const; + /// Creates OpMsgCursor. (new wire protocol) + Poco::MongoDB::Document::Ptr ensureIndex( Connection & connection, const std::string & collection, @@ -100,14 +120,16 @@ namespace MongoDB int version = 0, int ttl = 0); /// Creates an index. The document returned is the result of a getLastError call. - /// For more info look at the ensureIndex information on the MongoDB website. + /// For more info look at the ensureIndex information on the MongoDB website. (old wire protocol) Document::Ptr getLastErrorDoc(Connection & connection) const; /// Sends the getLastError command to the database and returns the error document. + /// (old wire protocol) std::string getLastError(Connection & connection) const; /// Sends the getLastError command to the database and returns the err element /// from the error document. When err is null, an empty string is returned. + /// (old wire protocol) static const std::string AUTH_MONGODB_CR; /// Default authentication mechanism prior to MongoDB 3.0. @@ -115,6 +137,27 @@ namespace MongoDB static const std::string AUTH_SCRAM_SHA1; /// Default authentication mechanism for MongoDB 3.0. + enum WireVersion + /// Wire version as reported by the command hello. + /// See details in MongoDB github, repository specifications. + /// @see queryServerHello + { + VER_26 = 1, + VER_26_2 = 2, + VER_30 = 3, + VER_32 = 4, + VER_34 = 5, + VER_36 = 6, ///< First wire version that supports OP_MSG + VER_40 = 7, + VER_42 = 8, + VER_44 = 9, + VER_50 = 13, + VER_51 = 14, ///< First wire version that supports only OP_MSG + VER_52 = 15, + VER_53 = 16, + VER_60 = 17 + }; + protected: bool authCR(Connection & connection, const std::string & username, const std::string & password); bool authSCRAM(Connection & connection, const std::string & username, const std::string & password); @@ -127,6 +170,12 @@ namespace MongoDB // // inlines // + inline const std::string & Database::name() const + { + return _dbname; + } + + inline Poco::SharedPtr Database::createCommand() const { Poco::SharedPtr cmd = createQueryRequest("$cmd"); @@ -158,6 +207,24 @@ namespace MongoDB return new Poco::MongoDB::UpdateRequest(_dbname + '.' + collectionName); } + // -- New wire protocol commands + + inline Poco::SharedPtr Database::createOpMsgMessage(const std::string & collectionName) const + { + return new Poco::MongoDB::OpMsgMessage(_dbname, collectionName); + } + + inline Poco::SharedPtr Database::createOpMsgMessage() const + { + // Collection name for database commands is not needed. + return createOpMsgMessage(""); + } + + inline Poco::SharedPtr Database::createOpMsgCursor(const std::string & collectionName) const + { + return new Poco::MongoDB::OpMsgCursor(_dbname, collectionName); + } + } } // namespace Poco::MongoDB diff --git a/base/poco/MongoDB/include/Poco/MongoDB/Document.h b/base/poco/MongoDB/include/Poco/MongoDB/Document.h index 12889663827..9e1df349e20 100644 --- a/base/poco/MongoDB/include/Poco/MongoDB/Document.h +++ b/base/poco/MongoDB/include/Poco/MongoDB/Document.h @@ -31,6 +31,7 @@ namespace Poco namespace MongoDB { + class Array; class ElementFindByName { @@ -48,8 +49,8 @@ namespace MongoDB /// Represents a MongoDB (BSON) document. { public: - typedef SharedPtr Ptr; - typedef std::vector Vector; + using Ptr = SharedPtr; + using Vector = std::vector; Document(); /// Creates an empty Document. @@ -86,6 +87,10 @@ namespace MongoDB /// Unlike the other add methods, this method returns /// a reference to the new document. + Array & addNewArray(const std::string & name); + /// Create a new array and add it to this document. + /// Method returns a reference to the new array. + void clear(); /// Removes all elements from the document. @@ -95,7 +100,7 @@ namespace MongoDB bool empty() const; /// Returns true if the document doesn't contain any documents. - bool exists(const std::string & name); + bool exists(const std::string & name) const; /// Returns true if the document has an element with the given name. template @@ -158,6 +163,9 @@ namespace MongoDB /// return an Int64. When the element is not found, a /// Poco::NotFoundException will be thrown. + bool remove(const std::string & name); + /// Removes an element from the document. + template bool isType(const std::string & name) const /// Returns true when the type of the element equals the TypeId of ElementTrait. @@ -227,12 +235,23 @@ namespace MongoDB } - inline bool Document::exists(const std::string & name) + inline bool Document::exists(const std::string & name) const { return std::find_if(_elements.begin(), _elements.end(), ElementFindByName(name)) != _elements.end(); } + inline bool Document::remove(const std::string & name) + { + auto it = std::find_if(_elements.begin(), _elements.end(), ElementFindByName(name)); + if (it == _elements.end()) + return false; + + _elements.erase(it); + return true; + } + + inline std::size_t Document::size() const { return _elements.size(); diff --git a/base/poco/MongoDB/include/Poco/MongoDB/Element.h b/base/poco/MongoDB/include/Poco/MongoDB/Element.h index b5592bd0e0b..26525d7d02b 100644 --- a/base/poco/MongoDB/include/Poco/MongoDB/Element.h +++ b/base/poco/MongoDB/include/Poco/MongoDB/Element.h @@ -45,7 +45,7 @@ namespace MongoDB /// Represents an Element of a Document or an Array. { public: - typedef Poco::SharedPtr Ptr; + using Ptr = Poco::SharedPtr; explicit Element(const std::string & name); /// Creates the Element with the given name. @@ -80,7 +80,7 @@ namespace MongoDB } - typedef std::list ElementSet; + using ElementSet = std::list; template @@ -266,7 +266,7 @@ namespace MongoDB } - typedef Nullable NullValue; + using NullValue = Nullable; // BSON Null Value diff --git a/base/poco/MongoDB/include/Poco/MongoDB/JavaScriptCode.h b/base/poco/MongoDB/include/Poco/MongoDB/JavaScriptCode.h index df1edc16817..c0f584b7c19 100644 --- a/base/poco/MongoDB/include/Poco/MongoDB/JavaScriptCode.h +++ b/base/poco/MongoDB/include/Poco/MongoDB/JavaScriptCode.h @@ -35,7 +35,7 @@ namespace MongoDB /// Represents JavaScript type in BSON. { public: - typedef SharedPtr Ptr; + using Ptr = SharedPtr; JavaScriptCode(); /// Creates an empty JavaScriptCode object. diff --git a/base/poco/MongoDB/include/Poco/MongoDB/MessageHeader.h b/base/poco/MongoDB/include/Poco/MongoDB/MessageHeader.h index 2b88e30fc74..98f45e876c1 100644 --- a/base/poco/MongoDB/include/Poco/MongoDB/MessageHeader.h +++ b/base/poco/MongoDB/include/Poco/MongoDB/MessageHeader.h @@ -28,6 +28,9 @@ namespace MongoDB { + class Message; // Required to disambiguate friend declaration in MessageHeader. + + class MongoDB_API MessageHeader /// Represents the message header which is always prepended to a /// MongoDB request or response message. @@ -37,14 +40,18 @@ namespace MongoDB enum OpCode { + // Opcodes deprecated in MongoDB 5.0 OP_REPLY = 1, - OP_MSG = 1000, OP_UPDATE = 2001, OP_INSERT = 2002, OP_QUERY = 2004, OP_GET_MORE = 2005, OP_DELETE = 2006, - OP_KILL_CURSORS = 2007 + OP_KILL_CURSORS = 2007, + + /// Opcodes supported in MongoDB 5.1 and later + OP_COMPRESSED = 2012, + OP_MSG = 2013 }; explicit MessageHeader(OpCode); diff --git a/base/poco/MongoDB/include/Poco/MongoDB/MongoDB.h b/base/poco/MongoDB/include/Poco/MongoDB/MongoDB.h index 253f1f8ab27..de246ddc9dd 100644 --- a/base/poco/MongoDB/include/Poco/MongoDB/MongoDB.h +++ b/base/poco/MongoDB/include/Poco/MongoDB/MongoDB.h @@ -33,6 +33,13 @@ // +#if defined(_WIN32) && defined(POCO_DLL) +# if defined(MongoDB_EXPORTS) +# define MongoDB_API __declspec(dllexport) +# else +# define MongoDB_API __declspec(dllimport) +# endif +#endif #if !defined(MongoDB_API) @@ -47,6 +54,11 @@ // // Automatically link MongoDB library. // +#if defined(_MSC_VER) +# if !defined(POCO_NO_AUTOMATIC_LIBS) && !defined(MongoDB_EXPORTS) +# pragma comment(lib, "PocoMongoDB" POCO_LIB_SUFFIX) +# endif +#endif #endif // MongoDBMongoDB_INCLUDED diff --git a/base/poco/MongoDB/include/Poco/MongoDB/ObjectId.h b/base/poco/MongoDB/include/Poco/MongoDB/ObjectId.h index 76bb190db48..8a335320ea0 100644 --- a/base/poco/MongoDB/include/Poco/MongoDB/ObjectId.h +++ b/base/poco/MongoDB/include/Poco/MongoDB/ObjectId.h @@ -44,7 +44,7 @@ namespace MongoDB /// as its value. { public: - typedef SharedPtr Ptr; + using Ptr = SharedPtr; explicit ObjectId(const std::string & id); /// Creates an ObjectId from a string. diff --git a/base/poco/MongoDB/include/Poco/MongoDB/OpMsgCursor.h b/base/poco/MongoDB/include/Poco/MongoDB/OpMsgCursor.h new file mode 100644 index 00000000000..a465a71bb1c --- /dev/null +++ b/base/poco/MongoDB/include/Poco/MongoDB/OpMsgCursor.h @@ -0,0 +1,96 @@ +// +// OpMsgCursor.h +// +// Library: MongoDB +// Package: MongoDB +// Module: OpMsgCursor +// +// Definition of the OpMsgCursor class. +// +// Copyright (c) 2012, Applied Informatics Software Engineering GmbH. +// and Contributors. +// +// SPDX-License-Identifier: BSL-1.0 +// + + +#ifndef MongoDB_OpMsgCursor_INCLUDED +#define MongoDB_OpMsgCursor_INCLUDED + + +#include "Poco/MongoDB/Connection.h" +#include "Poco/MongoDB/MongoDB.h" +#include "Poco/MongoDB/OpMsgMessage.h" + +namespace Poco +{ +namespace MongoDB +{ + + + class MongoDB_API OpMsgCursor : public Document + /// OpMsgCursor is an helper class for querying multiple documents using OpMsgMessage. + { + public: + OpMsgCursor(const std::string & dbname, const std::string & collectionName); + /// Creates a OpMsgCursor for the given database and collection. + + virtual ~OpMsgCursor(); + /// Destroys the OpMsgCursor. + + void setEmptyFirstBatch(bool empty); + /// Empty first batch is used to get error response faster with little server processing + + bool emptyFirstBatch() const; + + void setBatchSize(Int32 batchSize); + /// Set non-default batch size + + Int32 batchSize() const; + /// Current batch size (zero or negative number indicates default batch size) + + Int64 cursorID() const; + + OpMsgMessage & next(Connection & connection); + /// Tries to get the next documents. As long as response message has a + /// cursor ID next can be called to retrieve the next bunch of documents. + /// + /// The cursor must be killed (see kill()) when not all documents are needed. + + OpMsgMessage & query(); + /// Returns the associated query. + + void kill(Connection & connection); + /// Kills the cursor and reset it so that it can be reused. + + private: + OpMsgMessage _query; + OpMsgMessage _response; + + bool _emptyFirstBatch{false}; + Int32 _batchSize{-1}; + /// Batch size used in the cursor. Zero or negative value means that default shall be used. + + Int64 _cursorID{0}; + }; + + + // + // inlines + // + inline OpMsgMessage & OpMsgCursor::query() + { + return _query; + } + + inline Int64 OpMsgCursor::cursorID() const + { + return _cursorID; + } + + +} +} // namespace Poco::MongoDB + + +#endif // MongoDB_OpMsgCursor_INCLUDED diff --git a/base/poco/MongoDB/include/Poco/MongoDB/OpMsgMessage.h b/base/poco/MongoDB/include/Poco/MongoDB/OpMsgMessage.h new file mode 100644 index 00000000000..699c7fc4e12 --- /dev/null +++ b/base/poco/MongoDB/include/Poco/MongoDB/OpMsgMessage.h @@ -0,0 +1,163 @@ +// +// OpMsgMessage.h +// +// Library: MongoDB +// Package: MongoDB +// Module: OpMsgMessage +// +// Definition of the OpMsgMessage class. +// +// Copyright (c) 2022, Applied Informatics Software Engineering GmbH. +// and Contributors. +// +// SPDX-License-Identifier: BSL-1.0 +// + + +#ifndef MongoDB_OpMsgMessage_INCLUDED +#define MongoDB_OpMsgMessage_INCLUDED + + +#include "Poco/MongoDB/Document.h" +#include "Poco/MongoDB/Message.h" +#include "Poco/MongoDB/MongoDB.h" + +#include + +namespace Poco +{ +namespace MongoDB +{ + + + class MongoDB_API OpMsgMessage : public Message + /// This class represents a request/response (OP_MSG) to send requests and receive responses to/from MongoDB. + { + public: + // Constants for most often used MongoDB commands that can be sent using OP_MSG + // For complete list see: https://www.mongodb.com/docs/manual/reference/command/ + + // Query and write + static const std::string CMD_INSERT; + static const std::string CMD_DELETE; + static const std::string CMD_UPDATE; + static const std::string CMD_FIND; + static const std::string CMD_FIND_AND_MODIFY; + static const std::string CMD_GET_MORE; + + // Aggregation + static const std::string CMD_AGGREGATE; + static const std::string CMD_COUNT; + static const std::string CMD_DISTINCT; + static const std::string CMD_MAP_REDUCE; + + // Replication and administration + static const std::string CMD_HELLO; + static const std::string CMD_REPL_SET_GET_STATUS; + static const std::string CMD_REPL_SET_GET_CONFIG; + + static const std::string CMD_CREATE; + static const std::string CMD_CREATE_INDEXES; + static const std::string CMD_DROP; + static const std::string CMD_DROP_DATABASE; + static const std::string CMD_KILL_CURSORS; + static const std::string CMD_LIST_DATABASES; + static const std::string CMD_LIST_INDEXES; + + // Diagnostic + static const std::string CMD_BUILD_INFO; + static const std::string CMD_COLL_STATS; + static const std::string CMD_DB_STATS; + static const std::string CMD_HOST_INFO; + + + enum Flags : UInt32 + { + MSG_FLAGS_DEFAULT = 0, + + MSG_CHECKSUM_PRESENT = (1 << 0), + + MSG_MORE_TO_COME = (1 << 1), + /// Sender will send another message and is not prepared for overlapping messages + + MSG_EXHAUST_ALLOWED = (1 << 16) + /// Client is prepared for multiple replies (using the moreToCome bit) to this request + }; + + OpMsgMessage(); + /// Creates an OpMsgMessage for response. + + OpMsgMessage(const std::string & databaseName, const std::string & collectionName, UInt32 flags = MSG_FLAGS_DEFAULT); + /// Creates an OpMsgMessage for requests. + + virtual ~OpMsgMessage(); + + const std::string & databaseName() const; + + const std::string & collectionName() const; + + void setCommandName(const std::string & command); + /// Sets the command name and clears the command document + + void setCursor(Poco::Int64 cursorID, Poco::Int32 batchSize = -1); + /// Sets the command "getMore" for the cursor id with batch size (if it is not negative). + + const std::string & commandName() const; + /// Current command name. + + void setAcknowledgedRequest(bool ack); + /// Set false to create request that does not return response. + /// It has effect only for commands that write or delete documents. + /// Default is true (request returns acknowledge response). + + bool acknowledgedRequest() const; + + UInt32 flags() const; + + Document & body(); + /// Access to body document. + /// Additional query arguments shall be added after setting the command name. + + const Document & body() const; + + Document::Vector & documents(); + /// Documents prepared for request or retrieved in response. + + const Document::Vector & documents() const; + /// Documents prepared for request or retrieved in response. + + bool responseOk() const; + /// Reads "ok" status from the response message. + + void clear(); + /// Clears the message. + + void send(std::ostream & ostr); + /// Writes the request to stream. + + void read(std::istream & istr); + /// Reads the response from the stream. + + private: + enum PayloadType : UInt8 + { + PAYLOAD_TYPE_0 = 0, + PAYLOAD_TYPE_1 = 1 + }; + + std::string _databaseName; + std::string _collectionName; + UInt32 _flags{MSG_FLAGS_DEFAULT}; + std::string _commandName; + bool _acknowledged{true}; + + Document _body; + Document::Vector _documents; + }; + + +} +} // namespace Poco::MongoDB + + +#endif // MongoDB_OpMsgMessage_INCLUDED diff --git a/base/poco/MongoDB/include/Poco/MongoDB/PoolableConnectionFactory.h b/base/poco/MongoDB/include/Poco/MongoDB/PoolableConnectionFactory.h index 9d35c728e5e..53f4a5127ef 100644 --- a/base/poco/MongoDB/include/Poco/MongoDB/PoolableConnectionFactory.h +++ b/base/poco/MongoDB/include/Poco/MongoDB/PoolableConnectionFactory.h @@ -94,7 +94,23 @@ namespace MongoDB operator Connection::Ptr() { return _connection; } +#if defined(POCO_ENABLE_CPP11) + // Disable copy to prevent unwanted release of resources: C++11 way + PooledConnection(const PooledConnection &) = delete; + PooledConnection & operator=(const PooledConnection &) = delete; + + // Enable move semantics + PooledConnection(PooledConnection && other) = default; + PooledConnection & operator=(PooledConnection &&) = default; +#endif + private: +#if !defined(POCO_ENABLE_CPP11) + // Disable copy to prevent unwanted release of resources: pre C++11 way + PooledConnection(const PooledConnection &); + PooledConnection & operator=(const PooledConnection &); +#endif + Poco::ObjectPool & _pool; Connection::Ptr _connection; }; diff --git a/base/poco/MongoDB/include/Poco/MongoDB/RegularExpression.h b/base/poco/MongoDB/include/Poco/MongoDB/RegularExpression.h index b9a8694d321..244b8c14163 100644 --- a/base/poco/MongoDB/include/Poco/MongoDB/RegularExpression.h +++ b/base/poco/MongoDB/include/Poco/MongoDB/RegularExpression.h @@ -33,7 +33,7 @@ namespace MongoDB /// Represents a regular expression in BSON format. { public: - typedef SharedPtr Ptr; + using Ptr = SharedPtr; RegularExpression(); /// Creates an empty RegularExpression. diff --git a/base/poco/MongoDB/include/Poco/MongoDB/ResponseMessage.h b/base/poco/MongoDB/include/Poco/MongoDB/ResponseMessage.h index 132859cc75f..9cb92cb16c4 100644 --- a/base/poco/MongoDB/include/Poco/MongoDB/ResponseMessage.h +++ b/base/poco/MongoDB/include/Poco/MongoDB/ResponseMessage.h @@ -38,6 +38,9 @@ namespace MongoDB ResponseMessage(); /// Creates an empty ResponseMessage. + ResponseMessage(const Int64 & cursorID); + /// Creates an ResponseMessage for existing cursor ID. + virtual ~ResponseMessage(); /// Destroys the ResponseMessage. diff --git a/base/poco/MongoDB/src/Array.cpp b/base/poco/MongoDB/src/Array.cpp index c6d96d1371d..6fff0994d82 100644 --- a/base/poco/MongoDB/src/Array.cpp +++ b/base/poco/MongoDB/src/Array.cpp @@ -20,7 +20,7 @@ namespace Poco { namespace MongoDB { -Array::Array(): +Array::Array(): Document() { } @@ -31,7 +31,7 @@ Array::~Array() } -Element::Ptr Array::get(int pos) const +Element::Ptr Array::get(std::size_t pos) const { std::string name = Poco::NumberFormatter::format(pos); return Document::get(name); diff --git a/base/poco/MongoDB/src/Connection.cpp b/base/poco/MongoDB/src/Connection.cpp index 38c31d2250a..fa20887054b 100644 --- a/base/poco/MongoDB/src/Connection.cpp +++ b/base/poco/MongoDB/src/Connection.cpp @@ -319,4 +319,30 @@ void Connection::sendRequest(RequestMessage& request, ResponseMessage& response) } +void Connection::sendRequest(OpMsgMessage& request, OpMsgMessage& response) +{ + Poco::Net::SocketOutputStream sos(_socket); + request.send(sos); + + response.clear(); + readResponse(response); +} + + +void Connection::sendRequest(OpMsgMessage& request) +{ + request.setAcknowledgedRequest(false); + Poco::Net::SocketOutputStream sos(_socket); + request.send(sos); +} + + +void Connection::readResponse(OpMsgMessage& response) +{ + Poco::Net::SocketInputStream sis(_socket); + response.read(sis); +} + + + } } // Poco::MongoDB diff --git a/base/poco/MongoDB/src/Cursor.cpp b/base/poco/MongoDB/src/Cursor.cpp index 69031e0ab65..ef7a4ca961d 100644 --- a/base/poco/MongoDB/src/Cursor.cpp +++ b/base/poco/MongoDB/src/Cursor.cpp @@ -33,6 +33,12 @@ Cursor::Cursor(const std::string& fullCollectionName, QueryRequest::Flags flags) } +Cursor::Cursor(const Document& aggregationResponse) : + _query(aggregationResponse.get("cursor")->get("ns")), + _response(aggregationResponse.get("cursor")->get("id")) +{ +} + Cursor::~Cursor() { try diff --git a/base/poco/MongoDB/src/Database.cpp b/base/poco/MongoDB/src/Database.cpp index 2b31523bdc4..1a0d3cfe559 100644 --- a/base/poco/MongoDB/src/Database.cpp +++ b/base/poco/MongoDB/src/Database.cpp @@ -334,6 +334,50 @@ bool Database::authSCRAM(Connection& connection, const std::string& username, co } +Document::Ptr Database::queryBuildInfo(Connection& connection) const +{ + // build info can be issued on "config" system database + Poco::SharedPtr request = createCommand(); + request->selector().add("buildInfo", 1); + + Poco::MongoDB::ResponseMessage response; + connection.sendRequest(*request, response); + + Document::Ptr buildInfo; + if ( response.documents().size() > 0 ) + { + buildInfo = response.documents()[0]; + } + else + { + throw Poco::ProtocolException("Didn't get a response from the buildinfo command"); + } + return buildInfo; +} + + +Document::Ptr Database::queryServerHello(Connection& connection) const +{ + // hello can be issued on "config" system database + Poco::SharedPtr request = createCommand(); + request->selector().add("hello", 1); + + Poco::MongoDB::ResponseMessage response; + connection.sendRequest(*request, response); + + Document::Ptr hello; + if ( response.documents().size() > 0 ) + { + hello = response.documents()[0]; + } + else + { + throw Poco::ProtocolException("Didn't get a response from the hello command"); + } + return hello; +} + + Int64 Database::count(Connection& connection, const std::string& collectionName) const { Poco::SharedPtr countRequest = createCountRequest(collectionName); @@ -390,7 +434,7 @@ Document::Ptr Database::getLastErrorDoc(Connection& connection) const { Document::Ptr errorDoc; - Poco::SharedPtr request = createQueryRequest("$cmd"); + Poco::SharedPtr request = createCommand(); request->setNumberToReturn(1); request->selector().add("getLastError", 1); @@ -420,7 +464,7 @@ std::string Database::getLastError(Connection& connection) const Poco::SharedPtr Database::createCountRequest(const std::string& collectionName) const { - Poco::SharedPtr request = createQueryRequest("$cmd"); + Poco::SharedPtr request = createCommand(); request->setNumberToReturn(1); request->selector().add("count", collectionName); return request; diff --git a/base/poco/MongoDB/src/DeleteRequest.cpp b/base/poco/MongoDB/src/DeleteRequest.cpp index 67a88c33302..ba75beb55fb 100644 --- a/base/poco/MongoDB/src/DeleteRequest.cpp +++ b/base/poco/MongoDB/src/DeleteRequest.cpp @@ -20,8 +20,8 @@ namespace MongoDB { DeleteRequest::DeleteRequest(const std::string& collectionName, DeleteRequest::Flags flags): - RequestMessage(MessageHeader::OP_DELETE), - _flags(flags), + RequestMessage(MessageHeader::OP_DELETE), + _flags(flags), _fullCollectionName(collectionName), _selector() { diff --git a/base/poco/MongoDB/src/Document.cpp b/base/poco/MongoDB/src/Document.cpp index 114fc993891..f7c5c9c5dc6 100644 --- a/base/poco/MongoDB/src/Document.cpp +++ b/base/poco/MongoDB/src/Document.cpp @@ -35,6 +35,14 @@ Document::~Document() } +Array& Document::addNewArray(const std::string& name) +{ + Array::Ptr newArray = new Array(); + add(name, newArray); + return *newArray; +} + + Element::Ptr Document::get(const std::string& name) const { Element::Ptr element; @@ -84,7 +92,7 @@ void Document::read(BinaryReader& reader) while (type != '\0') { Element::Ptr element; - + std::string name = BSONReader(reader).readCString(); switch (type) @@ -198,7 +206,7 @@ void Document::write(BinaryWriter& writer) else { std::stringstream sstream; - Poco::BinaryWriter tempWriter(sstream); + Poco::BinaryWriter tempWriter(sstream, BinaryWriter::LITTLE_ENDIAN_BYTE_ORDER); for (ElementSet::iterator it = _elements.begin(); it != _elements.end(); ++it) { tempWriter << static_cast((*it)->type()); @@ -207,7 +215,7 @@ void Document::write(BinaryWriter& writer) element->write(tempWriter); } tempWriter.flush(); - + Poco::Int32 len = static_cast(5 + sstream.tellp()); /* 5 = sizeof(len) + 0-byte */ writer << len; writer.writeRaw(sstream.str()); diff --git a/base/poco/MongoDB/src/Element.cpp b/base/poco/MongoDB/src/Element.cpp index 89629e0503e..f91ce264493 100644 --- a/base/poco/MongoDB/src/Element.cpp +++ b/base/poco/MongoDB/src/Element.cpp @@ -24,7 +24,7 @@ Element::Element(const std::string& name) : _name(name) } -Element::~Element() +Element::~Element() { } diff --git a/base/poco/MongoDB/src/GetMoreRequest.cpp b/base/poco/MongoDB/src/GetMoreRequest.cpp index f8a6b73c6ad..2c1f6909eb7 100644 --- a/base/poco/MongoDB/src/GetMoreRequest.cpp +++ b/base/poco/MongoDB/src/GetMoreRequest.cpp @@ -21,7 +21,7 @@ namespace MongoDB { GetMoreRequest::GetMoreRequest(const std::string& collectionName, Int64 cursorID): - RequestMessage(MessageHeader::OP_GET_MORE), + RequestMessage(MessageHeader::OP_GET_MORE), _fullCollectionName(collectionName), _numberToReturn(100), _cursorID(cursorID) diff --git a/base/poco/MongoDB/src/InsertRequest.cpp b/base/poco/MongoDB/src/InsertRequest.cpp index ec8dc9cf94a..65be5654b3e 100644 --- a/base/poco/MongoDB/src/InsertRequest.cpp +++ b/base/poco/MongoDB/src/InsertRequest.cpp @@ -20,7 +20,7 @@ namespace MongoDB { InsertRequest::InsertRequest(const std::string& collectionName, Flags flags): - RequestMessage(MessageHeader::OP_INSERT), + RequestMessage(MessageHeader::OP_INSERT), _flags(flags), _fullCollectionName(collectionName) { diff --git a/base/poco/MongoDB/src/KillCursorsRequest.cpp b/base/poco/MongoDB/src/KillCursorsRequest.cpp index 6baa0e0be8f..448002aa16a 100644 --- a/base/poco/MongoDB/src/KillCursorsRequest.cpp +++ b/base/poco/MongoDB/src/KillCursorsRequest.cpp @@ -37,7 +37,7 @@ void KillCursorsRequest::buildRequest(BinaryWriter& writer) for (std::vector::iterator it = _cursors.begin(); it != _cursors.end(); ++it) { writer << *it; - } + } } diff --git a/base/poco/MongoDB/src/Message.cpp b/base/poco/MongoDB/src/Message.cpp index c29d282d15a..7b1cb23bab6 100644 --- a/base/poco/MongoDB/src/Message.cpp +++ b/base/poco/MongoDB/src/Message.cpp @@ -19,7 +19,7 @@ namespace Poco { namespace MongoDB { -Message::Message(MessageHeader::OpCode opcode): +Message::Message(MessageHeader::OpCode opcode): _header(opcode) { } diff --git a/base/poco/MongoDB/src/MessageHeader.cpp b/base/poco/MongoDB/src/MessageHeader.cpp index 222121243db..b472bcec465 100644 --- a/base/poco/MongoDB/src/MessageHeader.cpp +++ b/base/poco/MongoDB/src/MessageHeader.cpp @@ -20,10 +20,10 @@ namespace Poco { namespace MongoDB { -MessageHeader::MessageHeader(OpCode opCode): - _messageLength(0), - _requestID(0), - _responseTo(0), +MessageHeader::MessageHeader(OpCode opCode): + _messageLength(0), + _requestID(0), + _responseTo(0), _opCode(opCode) { } @@ -42,7 +42,7 @@ void MessageHeader::read(BinaryReader& reader) Int32 opCode; reader >> opCode; - _opCode = (OpCode) opCode; + _opCode = static_cast(opCode); if (!reader.good()) { @@ -56,7 +56,7 @@ void MessageHeader::write(BinaryWriter& writer) writer << _messageLength; writer << _requestID; writer << _responseTo; - writer << (Int32) _opCode; + writer << static_cast(_opCode); } diff --git a/base/poco/MongoDB/src/ObjectId.cpp b/base/poco/MongoDB/src/ObjectId.cpp index 3065a2ffc30..0125c246c2d 100644 --- a/base/poco/MongoDB/src/ObjectId.cpp +++ b/base/poco/MongoDB/src/ObjectId.cpp @@ -32,7 +32,7 @@ ObjectId::ObjectId(const std::string& id) poco_assert_dbg(id.size() == 24); const char* p = id.c_str(); - for (std::size_t i = 0; i < 12; ++i) + for (std::size_t i = 0; i < 12; ++i) { _id[i] = fromHex(p); p += 2; diff --git a/base/poco/MongoDB/src/OpMsgCursor.cpp b/base/poco/MongoDB/src/OpMsgCursor.cpp new file mode 100644 index 00000000000..bc95851ae33 --- /dev/null +++ b/base/poco/MongoDB/src/OpMsgCursor.cpp @@ -0,0 +1,187 @@ +// +// OpMsgCursor.cpp +// +// Library: MongoDB +// Package: MongoDB +// Module: OpMsgCursor +// +// Copyright (c) 2022, Applied Informatics Software Engineering GmbH. +// and Contributors. +// +// SPDX-License-Identifier: BSL-1.0 +// + + +#include "Poco/MongoDB/OpMsgCursor.h" +#include "Poco/MongoDB/Array.h" + +// +// NOTE: +// +// MongoDB specification indicates that the flag MSG_EXHAUST_ALLOWED shall be +// used in the request when the receiver is ready to receive multiple messages +// without sending additional requests in between. Sender (MongoDB) indicates +// that more messages follow with flag MSG_MORE_TO_COME. +// +// It seems that this does not work properly. MSG_MORE_TO_COME is set and reading +// next messages sometimes works, however often the data is missing in response +// or the message header contains wrong message length and reading blocks. +// Opcode in the header is correct. +// +// Using MSG_EXHAUST_ALLOWED is therefore currently disabled. +// +// It seems that related JIRA ticket is: +// +// https://jira.mongodb.org/browse/SERVER-57297 +// +// https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst +// + +#define MONGODB_EXHAUST_ALLOWED_WORKS false + +namespace Poco { +namespace MongoDB { + + +static const std::string keyCursor {"cursor"}; +static const std::string keyFirstBatch {"firstBatch"}; +static const std::string keyNextBatch {"nextBatch"}; + +static Poco::Int64 cursorIdFromResponse(const MongoDB::Document& doc); + + +OpMsgCursor::OpMsgCursor(const std::string& db, const std::string& collection): +#if MONGODB_EXHAUST_ALLOWED_WORKS + _query(db, collection, OpMsgMessage::MSG_EXHAUST_ALLOWED) +#else + _query(db, collection) +#endif +{ +} + +OpMsgCursor::~OpMsgCursor() +{ + try + { + poco_assert_dbg(_cursorID == 0); + } + catch (...) + { + } +} + + +void OpMsgCursor::setEmptyFirstBatch(bool empty) +{ + _emptyFirstBatch = empty; +} + + +bool OpMsgCursor::emptyFirstBatch() const +{ + return _emptyFirstBatch; +} + + +void OpMsgCursor::setBatchSize(Int32 batchSize) +{ + _batchSize = batchSize; +} + + +Int32 OpMsgCursor::batchSize() const +{ + return _batchSize; +} + + +OpMsgMessage& OpMsgCursor::next(Connection& connection) +{ + if (_cursorID == 0) + { + _response.clear(); + + if (_emptyFirstBatch || _batchSize > 0) + { + Int32 bsize = _emptyFirstBatch ? 0 : _batchSize; + if (_query.commandName() == OpMsgMessage::CMD_FIND) + { + _query.body().add("batchSize", bsize); + } + else if (_query.commandName() == OpMsgMessage::CMD_AGGREGATE) + { + auto& cursorDoc = _query.body().addNewDocument("cursor"); + cursorDoc.add("batchSize", bsize); + } + } + + connection.sendRequest(_query, _response); + + const auto& rdoc = _response.body(); + _cursorID = cursorIdFromResponse(rdoc); + } + else + { +#if MONGODB_EXHAUST_ALLOWED_WORKS + std::cout << "Response flags: " << _response.flags() << std::endl; + if (_response.flags() & OpMsgMessage::MSG_MORE_TO_COME) + { + std::cout << "More to come. Reading more response: " << std::endl; + _response.clear(); + connection.readResponse(_response); + } + else +#endif + { + _response.clear(); + _query.setCursor(_cursorID, _batchSize); + connection.sendRequest(_query, _response); + } + } + + const auto& rdoc = _response.body(); + _cursorID = cursorIdFromResponse(rdoc); + + return _response; +} + + +void OpMsgCursor::kill(Connection& connection) +{ + _response.clear(); + if (_cursorID != 0) + { + _query.setCommandName(OpMsgMessage::CMD_KILL_CURSORS); + + MongoDB::Array::Ptr cursors = new MongoDB::Array(); + cursors->add(_cursorID); + _query.body().add("cursors", cursors); + + connection.sendRequest(_query, _response); + + const auto killed = _response.body().get("cursorsKilled", nullptr); + if (!killed || killed->size() != 1 || killed->get(0, -1) != _cursorID) + { + throw Poco::ProtocolException("Cursor not killed as expected: " + std::to_string(_cursorID)); + } + + _cursorID = 0; + _query.clear(); + _response.clear(); + } +} + + +Poco::Int64 cursorIdFromResponse(const MongoDB::Document& doc) +{ + Poco::Int64 id {0}; + auto cursorDoc = doc.get(keyCursor, nullptr); + if(cursorDoc) + { + id = cursorDoc->get("id", 0); + } + return id; +} + + +} } // Namespace Poco::MongoDB diff --git a/base/poco/MongoDB/src/OpMsgMessage.cpp b/base/poco/MongoDB/src/OpMsgMessage.cpp new file mode 100644 index 00000000000..2b55772ca59 --- /dev/null +++ b/base/poco/MongoDB/src/OpMsgMessage.cpp @@ -0,0 +1,412 @@ +// +// OpMsgMessage.cpp +// +// Library: MongoDB +// Package: MongoDB +// Module: OpMsgMessage +// +// Copyright (c) 2022, Applied Informatics Software Engineering GmbH. +// and Contributors. +// +// SPDX-License-Identifier: BSL-1.0 +// + +#include "Poco/MongoDB/OpMsgMessage.h" +#include "Poco/MongoDB/MessageHeader.h" +#include "Poco/MongoDB/Array.h" +#include "Poco/StreamCopier.h" +#include "Poco/Logger.h" + +#define POCO_MONGODB_DUMP false + +namespace Poco { +namespace MongoDB { + +// Query and write +const std::string OpMsgMessage::CMD_INSERT { "insert" }; +const std::string OpMsgMessage::CMD_DELETE { "delete" }; +const std::string OpMsgMessage::CMD_UPDATE { "update" }; +const std::string OpMsgMessage::CMD_FIND { "find" }; +const std::string OpMsgMessage::CMD_FIND_AND_MODIFY { "findAndModify" }; +const std::string OpMsgMessage::CMD_GET_MORE { "getMore" }; + +// Aggregation +const std::string OpMsgMessage::CMD_AGGREGATE { "aggregate" }; +const std::string OpMsgMessage::CMD_COUNT { "count" }; +const std::string OpMsgMessage::CMD_DISTINCT { "distinct" }; +const std::string OpMsgMessage::CMD_MAP_REDUCE { "mapReduce" }; + +// Replication and administration +const std::string OpMsgMessage::CMD_HELLO { "hello" }; +const std::string OpMsgMessage::CMD_REPL_SET_GET_STATUS { "replSetGetStatus" }; +const std::string OpMsgMessage::CMD_REPL_SET_GET_CONFIG { "replSetGetConfig" }; + +const std::string OpMsgMessage::CMD_CREATE { "create" }; +const std::string OpMsgMessage::CMD_CREATE_INDEXES { "createIndexes" }; +const std::string OpMsgMessage::CMD_DROP { "drop" }; +const std::string OpMsgMessage::CMD_DROP_DATABASE { "dropDatabase" }; +const std::string OpMsgMessage::CMD_KILL_CURSORS { "killCursors" }; +const std::string OpMsgMessage::CMD_LIST_DATABASES { "listDatabases" }; +const std::string OpMsgMessage::CMD_LIST_INDEXES { "listIndexes" }; + +// Diagnostic +const std::string OpMsgMessage::CMD_BUILD_INFO { "buildInfo" }; +const std::string OpMsgMessage::CMD_COLL_STATS { "collStats" }; +const std::string OpMsgMessage::CMD_DB_STATS { "dbStats" }; +const std::string OpMsgMessage::CMD_HOST_INFO { "hostInfo" }; + + +static const std::string& commandIdentifier(const std::string& command); + /// Commands have different names for the payload that is sent in a separate section + + +static const std::string keyCursor {"cursor"}; +static const std::string keyFirstBatch {"firstBatch"}; +static const std::string keyNextBatch {"nextBatch"}; + + +OpMsgMessage::OpMsgMessage() : + Message(MessageHeader::OP_MSG) +{ +} + + +OpMsgMessage::OpMsgMessage(const std::string& databaseName, const std::string& collectionName, UInt32 flags) : + Message(MessageHeader::OP_MSG), + _databaseName(databaseName), + _collectionName(collectionName), + _flags(flags) +{ +} + + +OpMsgMessage::~OpMsgMessage() +{ +} + +const std::string& OpMsgMessage::databaseName() const +{ + return _databaseName; +} + + +const std::string& OpMsgMessage::collectionName() const +{ + return _collectionName; +} + + +void OpMsgMessage::setCommandName(const std::string& command) +{ + _commandName = command; + _body.clear(); + + // IMPORTANT: Command name must be first + if (_collectionName.empty()) + { + // Collection is not specified. It is assumed that this particular command does + // not need it. + _body.add(_commandName, Int32(1)); + } + else + { + _body.add(_commandName, _collectionName); + } + _body.add("$db", _databaseName); +} + + +void OpMsgMessage::setCursor(Poco::Int64 cursorID, Poco::Int32 batchSize) +{ + _commandName = OpMsgMessage::CMD_GET_MORE; + _body.clear(); + + // IMPORTANT: Command name must be first + _body.add(_commandName, cursorID); + _body.add("$db", _databaseName); + _body.add("collection", _collectionName); + if (batchSize > 0) + { + _body.add("batchSize", batchSize); + } +} + + +const std::string& OpMsgMessage::commandName() const +{ + return _commandName; +} + + +void OpMsgMessage::setAcknowledgedRequest(bool ack) +{ + const auto& id = commandIdentifier(_commandName); + if (id.empty()) + return; + + _acknowledged = ack; + + auto writeConcern = _body.get("writeConcern", nullptr); + if (writeConcern) + writeConcern->remove("w"); + + if (ack) + { + _flags = _flags & (~MSG_MORE_TO_COME); + } + else + { + _flags = _flags | MSG_MORE_TO_COME; + if (!writeConcern) + _body.addNewDocument("writeConcern").add("w", 0); + else + writeConcern->add("w", 0); + } + +} + + +bool OpMsgMessage::acknowledgedRequest() const +{ + return _acknowledged; +} + + +UInt32 OpMsgMessage::flags() const +{ + return _flags; +} + + +Document& OpMsgMessage::body() +{ + return _body; +} + + +const Document& OpMsgMessage::body() const +{ + return _body; +} + + +Document::Vector& OpMsgMessage::documents() +{ + return _documents; +} + + +const Document::Vector& OpMsgMessage::documents() const +{ + return _documents; +} + + +bool OpMsgMessage::responseOk() const +{ + Poco::Int64 ok {false}; + if (_body.exists("ok")) + { + ok = _body.getInteger("ok"); + } + return (ok != 0); +} + + +void OpMsgMessage::clear() +{ + _flags = MSG_FLAGS_DEFAULT; + _commandName.clear(); + _body.clear(); + _documents.clear(); +} + + +void OpMsgMessage::send(std::ostream& ostr) +{ + BinaryWriter socketWriter(ostr, BinaryWriter::LITTLE_ENDIAN_BYTE_ORDER); + + // Serialise the body + std::stringstream ss; + BinaryWriter writer(ss, BinaryWriter::LITTLE_ENDIAN_BYTE_ORDER); + writer << _flags; + + writer << PAYLOAD_TYPE_0; + _body.write(writer); + + if (!_documents.empty()) + { + // Serialise attached documents + + std::stringstream ssdoc; + BinaryWriter wdoc(ssdoc, BinaryWriter::LITTLE_ENDIAN_BYTE_ORDER); + for (auto& doc: _documents) + { + doc->write(wdoc); + } + wdoc.flush(); + + const std::string& identifier = commandIdentifier(_commandName); + const Poco::Int32 size = static_cast(sizeof(size) + identifier.size() + 1 + ssdoc.tellp()); + writer << PAYLOAD_TYPE_1; + writer << size; + writer.writeCString(identifier.c_str()); + StreamCopier::copyStream(ssdoc, ss); + } + writer.flush(); + +#if POCO_MONGODB_DUMP + const std::string section = ss.str(); + std::string dump; + Logger::formatDump(dump, section.data(), section.length()); + std::cout << dump << std::endl; +#endif + + messageLength(static_cast(ss.tellp())); + + _header.write(socketWriter); + StreamCopier::copyStream(ss, ostr); + + ostr.flush(); +} + + +void OpMsgMessage::read(std::istream& istr) +{ + std::string message; + { + BinaryReader reader(istr, BinaryReader::LITTLE_ENDIAN_BYTE_ORDER); + _header.read(reader); + + poco_assert_dbg(_header.opCode() == _header.OP_MSG); + + const std::streamsize remainingSize {_header.getMessageLength() - _header.MSG_HEADER_SIZE }; + message.reserve(remainingSize); + +#if POCO_MONGODB_DUMP + std::cout + << "Message hdr: " << _header.getMessageLength() << " " << remainingSize << " " + << _header.opCode() << " " << _header.getRequestID() << " " << _header.responseTo() + << std::endl; +#endif + + reader.readRaw(remainingSize, message); + +#if POCO_MONGODB_DUMP + std::string dump; + Logger::formatDump(dump, message.data(), message.length()); + std::cout << dump << std::endl; +#endif + } + // Read complete message and then interpret it. + + std::istringstream msgss(message); + BinaryReader reader(msgss, BinaryReader::LITTLE_ENDIAN_BYTE_ORDER); + + Poco::UInt8 payloadType {0xFF}; + + reader >> _flags; + reader >> payloadType; + poco_assert_dbg(payloadType == PAYLOAD_TYPE_0); + + _body.read(reader); + + // Read next sections from the buffer + while (msgss.good()) + { + // NOTE: Not tested yet with database, because it returns everything in the body. + // Does MongoDB ever return documents as Payload type 1? + reader >> payloadType; + if (!msgss.good()) + { + break; + } + poco_assert_dbg(payloadType == PAYLOAD_TYPE_1); +#if POCO_MONGODB_DUMP + std::cout << "section payload: " << payloadType << std::endl; +#endif + + Poco::Int32 sectionSize {0}; + reader >> sectionSize; + poco_assert_dbg(sectionSize > 0); + +#if POCO_MONGODB_DUMP + std::cout << "section size: " << sectionSize << std::endl; +#endif + std::streamoff offset = sectionSize - sizeof(sectionSize); + std::streampos endOfSection = msgss.tellg() + offset; + + std::string identifier; + reader.readCString(identifier); +#if POCO_MONGODB_DUMP + std::cout << "section identifier: " << identifier << std::endl; +#endif + + // Loop to read documents from this section. + while (msgss.tellg() < endOfSection) + { +#if POCO_MONGODB_DUMP + std::cout << "section doc: " << msgss.tellg() << " " << endOfSection << std::endl; +#endif + Document::Ptr doc = new Document(); + doc->read(reader); + _documents.push_back(doc); + if (msgss.tellg() < 0) + { + break; + } + } + } + + // Extract documents from the cursor batch if they are there. + MongoDB::Array::Ptr batch; + auto curDoc = _body.get(keyCursor, nullptr); + if (curDoc) + { + batch = curDoc->get(keyFirstBatch, nullptr); + if (!batch) + { + batch = curDoc->get(keyNextBatch, nullptr); + } + } + if (batch) + { + for(std::size_t i = 0; i < batch->size(); i++) + { + const auto& d = batch->get(i, nullptr); + if (d) + { + _documents.push_back(d); + } + } + } + +} + +const std::string& commandIdentifier(const std::string& command) +{ + // Names of identifiers for commands that send bulk documents in the request + // The identifier is set in the section type 1. + static std::map identifiers { + { OpMsgMessage::CMD_INSERT, "documents" }, + { OpMsgMessage::CMD_DELETE, "deletes" }, + { OpMsgMessage::CMD_UPDATE, "updates" }, + + // Not sure if create index can send document section + { OpMsgMessage::CMD_CREATE_INDEXES, "indexes" } + }; + + const auto i = identifiers.find(command); + if (i != identifiers.end()) + { + return i->second; + } + + // This likely means that documents are incorrectly set for a command + // that does not send list of documents in section type 1. + static const std::string emptyIdentifier; + return emptyIdentifier; +} + + +} } // namespace Poco::MongoDB diff --git a/base/poco/MongoDB/src/QueryRequest.cpp b/base/poco/MongoDB/src/QueryRequest.cpp index 7044335ba30..6d7d23a8456 100644 --- a/base/poco/MongoDB/src/QueryRequest.cpp +++ b/base/poco/MongoDB/src/QueryRequest.cpp @@ -20,10 +20,10 @@ namespace MongoDB { QueryRequest::QueryRequest(const std::string& collectionName, QueryRequest::Flags flags): - RequestMessage(MessageHeader::OP_QUERY), - _flags(flags), + RequestMessage(MessageHeader::OP_QUERY), + _flags(flags), _fullCollectionName(collectionName), - _numberToSkip(0), + _numberToSkip(0), _numberToReturn(100), _selector(), _returnFieldSelector() diff --git a/base/poco/MongoDB/src/RegularExpression.cpp b/base/poco/MongoDB/src/RegularExpression.cpp index e95e7da82e1..5f7eb6bb51b 100644 --- a/base/poco/MongoDB/src/RegularExpression.cpp +++ b/base/poco/MongoDB/src/RegularExpression.cpp @@ -25,8 +25,8 @@ RegularExpression::RegularExpression() } -RegularExpression::RegularExpression(const std::string& pattern, const std::string& options): - _pattern(pattern), +RegularExpression::RegularExpression(const std::string& pattern, const std::string& options): + _pattern(pattern), _options(options) { } diff --git a/base/poco/MongoDB/src/ReplicaSet.cpp b/base/poco/MongoDB/src/ReplicaSet.cpp index b56fea49311..fce2f2bdada 100644 --- a/base/poco/MongoDB/src/ReplicaSet.cpp +++ b/base/poco/MongoDB/src/ReplicaSet.cpp @@ -21,7 +21,7 @@ namespace Poco { namespace MongoDB { -ReplicaSet::ReplicaSet(const std::vector &addresses): +ReplicaSet::ReplicaSet(const std::vector &addresses): _addresses(addresses) { } @@ -81,8 +81,8 @@ Connection::Ptr ReplicaSet::isMaster(const Net::SocketAddress& address) { conn = 0; } - - return 0; + + return 0; } diff --git a/base/poco/MongoDB/src/RequestMessage.cpp b/base/poco/MongoDB/src/RequestMessage.cpp index 6391d966198..999ed8a6ba1 100644 --- a/base/poco/MongoDB/src/RequestMessage.cpp +++ b/base/poco/MongoDB/src/RequestMessage.cpp @@ -21,7 +21,7 @@ namespace Poco { namespace MongoDB { -RequestMessage::RequestMessage(MessageHeader::OpCode opcode): +RequestMessage::RequestMessage(MessageHeader::OpCode opcode): Message(opcode) { } @@ -35,7 +35,7 @@ RequestMessage::~RequestMessage() void RequestMessage::send(std::ostream& ostr) { std::stringstream ss; - BinaryWriter requestWriter(ss); + BinaryWriter requestWriter(ss, BinaryWriter::LITTLE_ENDIAN_BYTE_ORDER); buildRequest(requestWriter); requestWriter.flush(); diff --git a/base/poco/MongoDB/src/ResponseMessage.cpp b/base/poco/MongoDB/src/ResponseMessage.cpp index 3254ace63e6..e8216767494 100644 --- a/base/poco/MongoDB/src/ResponseMessage.cpp +++ b/base/poco/MongoDB/src/ResponseMessage.cpp @@ -21,10 +21,20 @@ namespace MongoDB { ResponseMessage::ResponseMessage(): - Message(MessageHeader::OP_REPLY), - _responseFlags(0), - _cursorID(0), - _startingFrom(0), + Message(MessageHeader::OP_REPLY), + _responseFlags(0), + _cursorID(0), + _startingFrom(0), + _numberReturned(0) +{ +} + + +ResponseMessage::ResponseMessage(const Int64& cursorID): + Message(MessageHeader::OP_REPLY), + _responseFlags(0), + _cursorID(cursorID), + _startingFrom(0), _numberReturned(0) { } @@ -50,7 +60,7 @@ void ResponseMessage::read(std::istream& istr) clear(); BinaryReader reader(istr, BinaryReader::LITTLE_ENDIAN_BYTE_ORDER); - + _header.read(reader); reader >> _responseFlags; diff --git a/base/poco/MongoDB/src/UpdateRequest.cpp b/base/poco/MongoDB/src/UpdateRequest.cpp index 2af4621ff64..7477fc752d5 100644 --- a/base/poco/MongoDB/src/UpdateRequest.cpp +++ b/base/poco/MongoDB/src/UpdateRequest.cpp @@ -20,7 +20,7 @@ namespace MongoDB { UpdateRequest::UpdateRequest(const std::string& collectionName, UpdateRequest::Flags flags): - RequestMessage(MessageHeader::OP_UPDATE), + RequestMessage(MessageHeader::OP_UPDATE), _flags(flags), _fullCollectionName(collectionName), _selector(), diff --git a/docker/test/integration/runner/compose/docker_compose_mongo.yml b/docker/test/integration/runner/compose/docker_compose_mongo.yml index 9a6eae6ca8c..60361e9e98d 100644 --- a/docker/test/integration/runner/compose/docker_compose_mongo.yml +++ b/docker/test/integration/runner/compose/docker_compose_mongo.yml @@ -1,7 +1,7 @@ version: '2.3' services: mongo1: - image: mongo:5.0 + image: mongo:5.1 restart: always environment: MONGO_INITDB_ROOT_USERNAME: root diff --git a/docker/test/integration/runner/compose/docker_compose_mongo_secure.yml b/docker/test/integration/runner/compose/docker_compose_mongo_secure.yml index 193e5d26568..f5b0ffed130 100644 --- a/docker/test/integration/runner/compose/docker_compose_mongo_secure.yml +++ b/docker/test/integration/runner/compose/docker_compose_mongo_secure.yml @@ -1,7 +1,7 @@ version: '2.3' services: mongo1: - image: mongo:3.6 + image: mongo:3.5 restart: always environment: MONGO_INITDB_ROOT_USERNAME: root diff --git a/src/Dictionaries/MongoDBDictionarySource.cpp b/src/Dictionaries/MongoDBDictionarySource.cpp index b7e342f3c80..46910fa9f6a 100644 --- a/src/Dictionaries/MongoDBDictionarySource.cpp +++ b/src/Dictionaries/MongoDBDictionarySource.cpp @@ -170,7 +170,7 @@ MongoDBDictionarySource::~MongoDBDictionarySource() = default; QueryPipeline MongoDBDictionarySource::loadAll() { - return QueryPipeline(std::make_shared(connection, createCursor(db, collection, sample_block), sample_block, max_block_size)); + return QueryPipeline(std::make_shared(connection, db, collection, Poco::MongoDB::Document{}, sample_block, max_block_size)); } QueryPipeline MongoDBDictionarySource::loadIds(const std::vector & ids) @@ -178,7 +178,7 @@ QueryPipeline MongoDBDictionarySource::loadIds(const std::vector & ids) if (!dict_struct.id) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'id' is required for selective loading"); - auto cursor = createCursor(db, collection, sample_block); + Poco::MongoDB::Document query; /** NOTE: While building array, Poco::MongoDB requires passing of different unused element names, along with values. * In general, Poco::MongoDB is quite inefficient and bulky. @@ -188,9 +188,9 @@ QueryPipeline MongoDBDictionarySource::loadIds(const std::vector & ids) for (const UInt64 id : ids) ids_array->add(DB::toString(id), static_cast(id)); - cursor->query().selector().addNewDocument(dict_struct.id->name).add("$in", ids_array); + query.addNewDocument(dict_struct.id->name).add("$in", ids_array); - return QueryPipeline(std::make_shared(connection, std::move(cursor), sample_block, max_block_size)); + return QueryPipeline(std::make_shared(connection, db, collection, query, sample_block, max_block_size)); } @@ -199,8 +199,7 @@ QueryPipeline MongoDBDictionarySource::loadKeys(const Columns & key_columns, con if (!dict_struct.key) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'key' is required for selective loading"); - auto cursor = createCursor(db, collection, sample_block); - + Poco::MongoDB::Document query; Poco::MongoDB::Array::Ptr keys_array(new Poco::MongoDB::Array); for (const auto row_idx : requested_rows) @@ -254,9 +253,9 @@ QueryPipeline MongoDBDictionarySource::loadKeys(const Columns & key_columns, con } /// If more than one key we should use $or - cursor->query().selector().add("$or", keys_array); + query.add("$or", keys_array); - return QueryPipeline(std::make_shared(connection, std::move(cursor), sample_block, max_block_size)); + return QueryPipeline(std::make_shared(connection, db, collection, query, sample_block, max_block_size)); } std::string MongoDBDictionarySource::toString() const diff --git a/src/Dictionaries/MongoDBDictionarySource.h b/src/Dictionaries/MongoDBDictionarySource.h index fefcb1bff9f..6d93bc6c090 100644 --- a/src/Dictionaries/MongoDBDictionarySource.h +++ b/src/Dictionaries/MongoDBDictionarySource.h @@ -16,7 +16,6 @@ namespace Util namespace MongoDB { class Connection; - class Cursor; } } diff --git a/src/Processors/Sources/MongoDBSource.cpp b/src/Processors/Sources/MongoDBSource.cpp index 279a842143f..94b9cb7ad64 100644 --- a/src/Processors/Sources/MongoDBSource.cpp +++ b/src/Processors/Sources/MongoDBSource.cpp @@ -3,10 +3,12 @@ #include #include +#include +#include #include #include +#include #include -#include #include #include @@ -365,27 +367,79 @@ namespace } -std::unique_ptr createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select) +bool isMongoDBWireProtocolOld(Poco::MongoDB::Connection & connection_) { - auto cursor = std::make_unique(database, collection); + Poco::MongoDB::Database db("config"); + Poco::MongoDB::Document::Ptr doc = db.queryServerHello(connection_); + auto _wireVersion = doc->getInteger("maxWireVersion"); + return _wireVersion < Poco::MongoDB::Database::WireVersion::VER_36; +} + + +MongoDBCursor::MongoDBCursor( + const std::string & database, + const std::string & collection, + const Block & sample_block_to_select, + const Poco::MongoDB::Document & query, + Poco::MongoDB::Connection & connection) + : is_wire_protocol_old(isMongoDBWireProtocolOld(connection)) +{ + Poco::MongoDB::Document projection; /// Looks like selecting _id column is implicit by default. if (!sample_block_to_select.has("_id")) - cursor->query().returnFieldSelector().add("_id", 0); + projection.add("_id", 0); for (const auto & column : sample_block_to_select) - cursor->query().returnFieldSelector().add(column.name, 1); - return cursor; + projection.add(column.name, 1); + + if (is_wire_protocol_old) + { + old_cursor = std::make_unique(database, collection); + old_cursor->query().selector() = query; + old_cursor->query().returnFieldSelector() = projection; + } + else + { + new_cursor = std::make_unique(database, collection); + new_cursor->query().setCommandName(Poco::MongoDB::OpMsgMessage::CMD_FIND); + new_cursor->query().body().addNewDocument("filter") = query; + new_cursor->query().body().addNewDocument("projection") = projection; + } } +Poco::MongoDB::Document::Vector MongoDBCursor::nextDocuments(Poco::MongoDB::Connection & connection) +{ + if (is_wire_protocol_old) + { + auto response = old_cursor->next(connection); + cursorID_ = response.cursorID(); + return std::move(response.documents()); + } + else + { + auto response = new_cursor->next(connection); + cursorID_ = new_cursor->cursorID(); + return std::move(response.documents()); + } +} + +Int64 MongoDBCursor::cursorID() +{ + return cursorID_; +} + + MongoDBSource::MongoDBSource( std::shared_ptr & connection_, - std::unique_ptr cursor_, + const String & database_name_, + const String & collection_name_, + const Poco::MongoDB::Document & query_, const Block & sample_block, UInt64 max_block_size_) : ISource(sample_block.cloneEmpty()) , connection(connection_) - , cursor{std::move(cursor_)} + , cursor(database_name_, collection_name_, sample_block, query_, *connection_) , max_block_size{max_block_size_} { description.init(sample_block); @@ -412,9 +466,9 @@ Chunk MongoDBSource::generate() size_t num_rows = 0; while (num_rows < max_block_size) { - Poco::MongoDB::ResponseMessage & response = cursor->next(*connection); + auto documents = cursor.nextDocuments(*connection); - for (auto & document : response.documents()) + for (auto & document : documents) { if (document->exists("ok") && document->exists("$err") && document->exists("code") && document->getInteger("ok") == 0) @@ -458,7 +512,7 @@ Chunk MongoDBSource::generate() } } - if (response.cursorID() == 0) + if (cursor.cursorID() == 0) { all_read = true; break; diff --git a/src/Processors/Sources/MongoDBSource.h b/src/Processors/Sources/MongoDBSource.h index d4681d2c05f..f816ccfd1c9 100644 --- a/src/Processors/Sources/MongoDBSource.h +++ b/src/Processors/Sources/MongoDBSource.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -14,7 +15,9 @@ namespace Poco namespace MongoDB { class Connection; + class Document; class Cursor; + class OpMsgCursor; } } @@ -30,7 +33,28 @@ struct MongoDBArrayInfo void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password); -std::unique_ptr createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select); +bool isMongoDBWireProtocolOld(Poco::MongoDB::Connection & connection_); + +class MongoDBCursor +{ +public: + MongoDBCursor( + const std::string & database, + const std::string & collection, + const Block & sample_block_to_select, + const Poco::MongoDB::Document & query, + Poco::MongoDB::Connection & connection); + + Poco::MongoDB::Document::Vector nextDocuments(Poco::MongoDB::Connection & connection); + + Int64 cursorID(); + +private: + const bool is_wire_protocol_old; + std::unique_ptr old_cursor; + std::unique_ptr new_cursor; + Int64 cursorID_ = 0; +}; /// Converts MongoDB Cursor to a stream of Blocks class MongoDBSource final : public ISource @@ -38,7 +62,9 @@ class MongoDBSource final : public ISource public: MongoDBSource( std::shared_ptr & connection_, - std::unique_ptr cursor_, + const String & database_name_, + const String & collection_name_, + const Poco::MongoDB::Document & query_, const Block & sample_block, UInt64 max_block_size_); @@ -50,7 +76,7 @@ private: Chunk generate() override; std::shared_ptr connection; - std::unique_ptr cursor; + MongoDBCursor cursor; const UInt64 max_block_size; ExternalResultDescription description; bool all_read = false; diff --git a/src/Storages/StorageMongoDB.cpp b/src/Storages/StorageMongoDB.cpp index 63b8c2d00a1..2a1d7e80c07 100644 --- a/src/Storages/StorageMongoDB.cpp +++ b/src/Storages/StorageMongoDB.cpp @@ -99,6 +99,7 @@ public: , db_name(db_name_) , metadata_snapshot{metadata_snapshot_} , connection(connection_) + , is_wire_protocol_old(isMongoDBWireProtocolOld(*connection_)) { } @@ -107,7 +108,7 @@ public: void consume(Chunk chunk) override { Poco::MongoDB::Database db(db_name); - Poco::MongoDB::Document::Ptr index = new Poco::MongoDB::Document(); + Poco::MongoDB::Document::Vector documents; auto block = getHeader().cloneWithColumns(chunk.detachColumns()); @@ -118,20 +119,35 @@ public: const auto data_types = block.getDataTypes(); const auto data_names = block.getNames(); - std::vector row(num_cols); + documents.reserve(num_rows); + for (const auto i : collections::range(0, num_rows)) { + Poco::MongoDB::Document::Ptr document = new Poco::MongoDB::Document(); + for (const auto j : collections::range(0, num_cols)) { WriteBufferFromOwnString ostr; data_types[j]->getDefaultSerialization()->serializeText(*columns[j], i, ostr, FormatSettings{}); - row[j] = ostr.str(); - index->add(data_names[j], row[j]); + document->add(data_names[j], ostr.str()); } + + documents.push_back(std::move(document)); + } + + if (is_wire_protocol_old) + { + Poco::SharedPtr insert_request = db.createInsertRequest(collection_name); + insert_request->documents() = std::move(documents); + connection->sendRequest(*insert_request); + } + else + { + Poco::SharedPtr insert_request = db.createOpMsgMessage(collection_name); + insert_request->setCommandName(Poco::MongoDB::OpMsgMessage::CMD_INSERT); + insert_request->documents() = std::move(documents); + connection->sendRequest(*insert_request); } - Poco::SharedPtr insert_request = db.createInsertRequest(collection_name); - insert_request->documents().push_back(index); - connection->sendRequest(*insert_request); } private: @@ -139,6 +155,8 @@ private: String db_name; StorageMetadataPtr metadata_snapshot; std::shared_ptr connection; + + const bool is_wire_protocol_old; }; @@ -162,7 +180,7 @@ Pipe StorageMongoDB::read( sample_block.insert({ column_data.type, column_data.name }); } - return Pipe(std::make_shared(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size)); + return Pipe(std::make_shared(connection, database_name, collection_name, Poco::MongoDB::Document{}, sample_block, max_block_size)); } SinkToStoragePtr StorageMongoDB::write(const ASTPtr & /* query */, const StorageMetadataPtr & metadata_snapshot, ContextPtr /* context */) diff --git a/tests/integration/test_storage_mongodb/test.py b/tests/integration/test_storage_mongodb/test.py index 6ba5520704d..e6e77c64515 100644 --- a/tests/integration/test_storage_mongodb/test.py +++ b/tests/integration/test_storage_mongodb/test.py @@ -71,6 +71,39 @@ def test_simple_select(started_cluster): simple_mongo_table.drop() +def test_simple_select_from_view(started_cluster): + mongo_connection = get_mongo_connection(started_cluster) + db = mongo_connection["test"] + db.add_user("root", "clickhouse") + simple_mongo_table = db["simple_table"] + data = [] + for i in range(0, 100): + data.append({"key": i, "data": hex(i * i)}) + simple_mongo_table.insert_many(data) + simple_mongo_table_view = db.create_collection( + "simple_table_view", viewOn="simple_table" + ) + + node = started_cluster.instances["node"] + node.query( + "CREATE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo2:27017', 'test', 'simple_table_view', 'root', 'clickhouse')" + ) + + assert node.query("SELECT COUNT() FROM simple_mongo_table") == "100\n" + assert ( + node.query("SELECT sum(key) FROM simple_mongo_table") + == str(sum(range(0, 100))) + "\n" + ) + + assert ( + node.query("SELECT data from simple_mongo_table where key = 42") + == hex(42 * 42) + "\n" + ) + node.query("DROP TABLE simple_mongo_table") + simple_mongo_table_view.drop() + simple_mongo_table.drop() + + @pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"]) def test_arrays(started_cluster): mongo_connection = get_mongo_connection(started_cluster) @@ -411,13 +444,16 @@ def test_simple_insert_select(started_cluster): node.query( "CREATE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse')" ) - node.query("INSERT INTO simple_mongo_table SELECT 1, 'kek'") + node.query( + "INSERT INTO simple_mongo_table SELECT number, 'kek' || toString(number) FROM numbers(10)" + ) assert ( - node.query("SELECT data from simple_mongo_table where key = 1").strip() == "kek" + node.query("SELECT data from simple_mongo_table where key = 7").strip() + == "kek7" ) node.query("INSERT INTO simple_mongo_table(key) SELECT 12") - assert int(node.query("SELECT count() from simple_mongo_table")) == 2 + assert int(node.query("SELECT count() from simple_mongo_table")) == 11 assert ( node.query("SELECT data from simple_mongo_table where key = 12").strip() == "" ) From 90f4b1777832a8e6c3a343671be091c727f3e065 Mon Sep 17 00:00:00 2001 From: Nikolay Degterinsky Date: Mon, 22 May 2023 15:45:18 +0000 Subject: [PATCH 2/6] Fix build & test --- .../test/integration/runner/compose/docker_compose_mongo.yml | 4 ++-- src/Processors/Sources/MongoDBSource.cpp | 2 +- src/Processors/Sources/MongoDBSource.h | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docker/test/integration/runner/compose/docker_compose_mongo.yml b/docker/test/integration/runner/compose/docker_compose_mongo.yml index 60361e9e98d..8cdcbc421e8 100644 --- a/docker/test/integration/runner/compose/docker_compose_mongo.yml +++ b/docker/test/integration/runner/compose/docker_compose_mongo.yml @@ -1,7 +1,7 @@ version: '2.3' services: mongo1: - image: mongo:5.1 + image: mongo:6.0 restart: always environment: MONGO_INITDB_ROOT_USERNAME: root @@ -11,7 +11,7 @@ services: command: --profile=2 --verbose mongo2: - image: mongo:5.0 + image: mongo:6.0 restart: always ports: - ${MONGO_NO_CRED_EXTERNAL_PORT:-27017}:${MONGO_NO_CRED_INTERNAL_PORT:-27017} diff --git a/src/Processors/Sources/MongoDBSource.cpp b/src/Processors/Sources/MongoDBSource.cpp index 94b9cb7ad64..74dfa13158c 100644 --- a/src/Processors/Sources/MongoDBSource.cpp +++ b/src/Processors/Sources/MongoDBSource.cpp @@ -424,7 +424,7 @@ Poco::MongoDB::Document::Vector MongoDBCursor::nextDocuments(Poco::MongoDB::Conn } } -Int64 MongoDBCursor::cursorID() +Int64 MongoDBCursor::cursorID() const { return cursorID_; } diff --git a/src/Processors/Sources/MongoDBSource.h b/src/Processors/Sources/MongoDBSource.h index f816ccfd1c9..2bc5481e20b 100644 --- a/src/Processors/Sources/MongoDBSource.h +++ b/src/Processors/Sources/MongoDBSource.h @@ -47,7 +47,7 @@ public: Poco::MongoDB::Document::Vector nextDocuments(Poco::MongoDB::Connection & connection); - Int64 cursorID(); + Int64 cursorID() const; private: const bool is_wire_protocol_old; From be49281044eba2be91c46666ce12a28da446585c Mon Sep 17 00:00:00 2001 From: Nikolay Degterinsky Date: Wed, 24 May 2023 00:48:09 +0000 Subject: [PATCH 3/6] Try to fix test --- .../test/integration/runner/compose/docker_compose_mongo.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/test/integration/runner/compose/docker_compose_mongo.yml b/docker/test/integration/runner/compose/docker_compose_mongo.yml index 8cdcbc421e8..9a6eae6ca8c 100644 --- a/docker/test/integration/runner/compose/docker_compose_mongo.yml +++ b/docker/test/integration/runner/compose/docker_compose_mongo.yml @@ -1,7 +1,7 @@ version: '2.3' services: mongo1: - image: mongo:6.0 + image: mongo:5.0 restart: always environment: MONGO_INITDB_ROOT_USERNAME: root @@ -11,7 +11,7 @@ services: command: --profile=2 --verbose mongo2: - image: mongo:6.0 + image: mongo:5.0 restart: always ports: - ${MONGO_NO_CRED_EXTERNAL_PORT:-27017}:${MONGO_NO_CRED_INTERNAL_PORT:-27017} From f48845fa0d56dc81a44fd6314342849325d78b0b Mon Sep 17 00:00:00 2001 From: Nikolay Degterinsky Date: Thu, 25 May 2023 12:23:35 +0000 Subject: [PATCH 4/6] Fix test once again --- .../integration/runner/compose/docker_compose_mongo_secure.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/test/integration/runner/compose/docker_compose_mongo_secure.yml b/docker/test/integration/runner/compose/docker_compose_mongo_secure.yml index f5b0ffed130..193e5d26568 100644 --- a/docker/test/integration/runner/compose/docker_compose_mongo_secure.yml +++ b/docker/test/integration/runner/compose/docker_compose_mongo_secure.yml @@ -1,7 +1,7 @@ version: '2.3' services: mongo1: - image: mongo:3.5 + image: mongo:3.6 restart: always environment: MONGO_INITDB_ROOT_USERNAME: root From ad85faabd1941b891b406225ecf7c6b568c8328f Mon Sep 17 00:00:00 2001 From: Nikolay Degterinsky Date: Mon, 5 Jun 2023 16:09:53 +0000 Subject: [PATCH 5/6] Fix test --- tests/integration/test_storage_mongodb/test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_storage_mongodb/test.py b/tests/integration/test_storage_mongodb/test.py index e6e77c64515..6ce71fb91fa 100644 --- a/tests/integration/test_storage_mongodb/test.py +++ b/tests/integration/test_storage_mongodb/test.py @@ -71,6 +71,7 @@ def test_simple_select(started_cluster): simple_mongo_table.drop() +@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"]) def test_simple_select_from_view(started_cluster): mongo_connection = get_mongo_connection(started_cluster) db = mongo_connection["test"] @@ -86,7 +87,7 @@ def test_simple_select_from_view(started_cluster): node = started_cluster.instances["node"] node.query( - "CREATE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo2:27017', 'test', 'simple_table_view', 'root', 'clickhouse')" + "CREATE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table_view', 'root', 'clickhouse')" ) assert node.query("SELECT COUNT() FROM simple_mongo_table") == "100\n" From c71edb6c798163ad50a077a19a3bf74eb57ba212 Mon Sep 17 00:00:00 2001 From: Nikolay Degterinsky Date: Fri, 9 Jun 2023 17:29:42 +0000 Subject: [PATCH 6/6] Fix style --- src/Processors/Sources/MongoDBSource.cpp | 10 +++++----- src/Processors/Sources/MongoDBSource.h | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Processors/Sources/MongoDBSource.cpp b/src/Processors/Sources/MongoDBSource.cpp index 74dfa13158c..cd4db416a29 100644 --- a/src/Processors/Sources/MongoDBSource.cpp +++ b/src/Processors/Sources/MongoDBSource.cpp @@ -371,8 +371,8 @@ bool isMongoDBWireProtocolOld(Poco::MongoDB::Connection & connection_) { Poco::MongoDB::Database db("config"); Poco::MongoDB::Document::Ptr doc = db.queryServerHello(connection_); - auto _wireVersion = doc->getInteger("maxWireVersion"); - return _wireVersion < Poco::MongoDB::Database::WireVersion::VER_36; + auto wire_version = doc->getInteger("maxWireVersion"); + return wire_version < Poco::MongoDB::Database::WireVersion::VER_36; } @@ -413,20 +413,20 @@ Poco::MongoDB::Document::Vector MongoDBCursor::nextDocuments(Poco::MongoDB::Conn if (is_wire_protocol_old) { auto response = old_cursor->next(connection); - cursorID_ = response.cursorID(); + cursor_id = response.cursorID(); return std::move(response.documents()); } else { auto response = new_cursor->next(connection); - cursorID_ = new_cursor->cursorID(); + cursor_id = new_cursor->cursorID(); return std::move(response.documents()); } } Int64 MongoDBCursor::cursorID() const { - return cursorID_; + return cursor_id; } diff --git a/src/Processors/Sources/MongoDBSource.h b/src/Processors/Sources/MongoDBSource.h index 2bc5481e20b..0e95d42c028 100644 --- a/src/Processors/Sources/MongoDBSource.h +++ b/src/Processors/Sources/MongoDBSource.h @@ -53,7 +53,7 @@ private: const bool is_wire_protocol_old; std::unique_ptr old_cursor; std::unique_ptr new_cursor; - Int64 cursorID_ = 0; + Int64 cursor_id = 0; }; /// Converts MongoDB Cursor to a stream of Blocks