Merge pull request #50061 from evillique/update-mongo

Update MongoDB protocol & fix inserts
This commit is contained in:
Nikolay Degterinsky 2023-06-10 22:52:15 +02:00 committed by GitHub
commit dc9001a969
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
48 changed files with 1398 additions and 101 deletions

View File

@ -117,6 +117,9 @@ public:
void readRaw(char * buffer, std::streamsize length);
/// Reads length bytes of raw data into buffer.
void readCString(std::string& value);
/// Reads zero-terminated C-string into value.
void readBOM();
/// Reads a byte-order mark from the stream and configures
/// the reader for the encountered byte order.

View File

@ -56,6 +56,8 @@ public:
LITTLE_ENDIAN_BYTE_ORDER = 3 /// little-endian byte-order
};
static const std::streamsize DEFAULT_MAX_CSTR_LENGTH { 1024 };
BinaryWriter(std::ostream & ostr, StreamByteOrder byteOrder = NATIVE_BYTE_ORDER);
/// Creates the BinaryWriter.
@ -131,6 +133,9 @@ public:
void writeRaw(const char * buffer, std::streamsize length);
/// Writes length raw bytes from the given buffer to the stream.
void writeCString(const char* cString, std::streamsize maxLength = DEFAULT_MAX_CSTR_LENGTH);
/// Writes zero-terminated C-string.
void writeBOM();
/// Writes a byte-order mark to the stream. A byte order mark is
/// a 16-bit integer with a value of 0xFEFF, written in host byte-order.

View File

@ -274,6 +274,31 @@ void BinaryReader::readRaw(char* buffer, std::streamsize length)
}
void BinaryReader::readCString(std::string& value)
{
value.clear();
if (!_istr.good())
{
return;
}
value.reserve(256);
while (true)
{
char c;
_istr.get(c);
if (!_istr.good())
{
break;
}
if (c == '\0')
{
break;
}
value += c;
}
}
void BinaryReader::readBOM()
{
UInt16 bom;

View File

@ -332,6 +332,15 @@ void BinaryWriter::writeRaw(const char* buffer, std::streamsize length)
}
void BinaryWriter::writeCString(const char* cString, std::streamsize maxLength)
{
const std::size_t len = ::strnlen(cString, maxLength);
writeRaw(cString, len);
static const char zero = '\0';
_ostr.write(&zero, sizeof(zero));
}
void BinaryWriter::writeBOM()
{
UInt16 value = 0xFEFF;

View File

@ -13,3 +13,4 @@ target_compile_options (_poco_mongodb
target_include_directories (_poco_mongodb SYSTEM PUBLIC "include")
target_link_libraries (_poco_mongodb PUBLIC Poco::Net)

View File

@ -33,7 +33,7 @@ namespace MongoDB
/// This class represents a BSON Array.
{
public:
typedef SharedPtr<Array> Ptr;
using Ptr = SharedPtr<Array>;
Array();
/// Creates an empty Array.
@ -41,8 +41,31 @@ namespace MongoDB
virtual ~Array();
/// Destroys the Array.
// Document template functions available for backward compatibility
using Document::add;
using Document::get;
template <typename T>
T get(int pos) const
Document & add(T value)
/// Creates an element with the name from the current pos and value and
/// adds it to the array document.
///
/// The active document is returned to allow chaining of the add methods.
{
return Document::add<T>(Poco::NumberFormatter::format(size()), value);
}
Document & add(const char * value)
/// Creates an element with a name from the current pos and value and
/// adds it to the array document.
///
/// The active document is returned to allow chaining of the add methods.
{
return Document::add(Poco::NumberFormatter::format(size()), value);
}
template <typename T>
T get(std::size_t pos) const
/// Returns the element at the given index and tries to convert
/// it to the template type. If the element is not found, a
/// Poco::NotFoundException will be thrown. If the element cannot be
@ -52,7 +75,7 @@ namespace MongoDB
}
template <typename T>
T get(int pos, const T & deflt) const
T get(std::size_t pos, const T & deflt) const
/// Returns the element at the given index and tries to convert
/// it to the template type. If the element is not found, or
/// has the wrong type, the deflt argument will be returned.
@ -60,12 +83,12 @@ namespace MongoDB
return Document::get<T>(Poco::NumberFormatter::format(pos), deflt);
}
Element::Ptr get(int pos) const;
Element::Ptr get(std::size_t pos) const;
/// Returns the element at the given index.
/// An empty element will be returned if the element is not found.
template <typename T>
bool isType(int pos) const
bool isType(std::size_t pos) const
/// Returns true if the type of the element equals the TypeId of ElementTrait,
/// otherwise false.
{
@ -74,6 +97,9 @@ namespace MongoDB
std::string toString(int indent = 0) const;
/// Returns a string representation of the Array.
private:
friend void BSONReader::read<Array::Ptr>(Array::Ptr & to);
};

View File

@ -40,7 +40,7 @@ namespace MongoDB
/// A Binary stores its data in a Poco::Buffer<unsigned char>.
{
public:
typedef SharedPtr<Binary> Ptr;
using Ptr = SharedPtr<Binary>;
Binary();
/// Creates an empty Binary with subtype 0.

View File

@ -18,6 +18,7 @@
#define MongoDB_Connection_INCLUDED
#include "Poco/MongoDB/OpMsgMessage.h"
#include "Poco/MongoDB/RequestMessage.h"
#include "Poco/MongoDB/ResponseMessage.h"
#include "Poco/Mutex.h"
@ -39,7 +40,7 @@ namespace MongoDB
/// for more information on the wire protocol.
{
public:
typedef Poco::SharedPtr<Connection> Ptr;
using Ptr = Poco::SharedPtr<Connection>;
class MongoDB_API SocketFactory
{
@ -145,6 +146,21 @@ namespace MongoDB
/// Use this when a response is expected: only a "query" or "getmore"
/// request will return a response.
void sendRequest(OpMsgMessage & request, OpMsgMessage & response);
/// Sends a request to the MongoDB server and receives the response
/// using newer wire protocol with OP_MSG.
void sendRequest(OpMsgMessage & request);
/// Sends an unacknowledged request to the MongoDB server using newer
/// wire protocol with OP_MSG.
/// No response is sent by the server.
void readResponse(OpMsgMessage & response);
/// Reads additional response data when previous message's flag moreToCome
/// indicates that server will send more data.
/// NOTE: See comments in OpMsgCursor code.
protected:
void connect();

View File

@ -40,6 +40,9 @@ namespace MongoDB
Cursor(const std::string & fullCollectionName, QueryRequest::Flags flags = QueryRequest::QUERY_DEFAULT);
/// Creates a Cursor for the given database and collection ("database.collection"), using the specified flags.
Cursor(const Document & aggregationResponse);
/// Creates a Cursor for the given aggregation query response.
virtual ~Cursor();
/// Destroys the Cursor.

View File

@ -26,6 +26,8 @@
#include "Poco/MongoDB/QueryRequest.h"
#include "Poco/MongoDB/UpdateRequest.h"
#include "Poco/MongoDB/OpMsgCursor.h"
#include "Poco/MongoDB/OpMsgMessage.h"
namespace Poco
{
@ -45,6 +47,9 @@ namespace MongoDB
virtual ~Database();
/// Destroys the Database.
const std::string & name() const;
/// Database name
bool authenticate(
Connection & connection,
const std::string & username,
@ -62,34 +67,49 @@ namespace MongoDB
/// May throw a Poco::ProtocolException if authentication fails for a reason other than
/// invalid credentials.
Document::Ptr queryBuildInfo(Connection & connection) const;
/// Queries server build info (all wire protocols)
Document::Ptr queryServerHello(Connection & connection) const;
/// Queries hello response from server (all wire protocols)
Int64 count(Connection & connection, const std::string & collectionName) const;
/// Sends a count request for the given collection to MongoDB.
/// Sends a count request for the given collection to MongoDB. (old wire protocol)
///
/// If the command fails, -1 is returned.
Poco::SharedPtr<Poco::MongoDB::QueryRequest> createCommand() const;
/// Creates a QueryRequest for a command.
/// Creates a QueryRequest for a command. (old wire protocol)
Poco::SharedPtr<Poco::MongoDB::QueryRequest> createCountRequest(const std::string & collectionName) const;
/// Creates a QueryRequest to count the given collection.
/// The collectionname must not contain the database name.
/// The collectionname must not contain the database name. (old wire protocol)
Poco::SharedPtr<Poco::MongoDB::DeleteRequest> createDeleteRequest(const std::string & collectionName) const;
/// Creates a DeleteRequest to delete documents in the given collection.
/// The collectionname must not contain the database name.
/// The collectionname must not contain the database name. (old wire protocol)
Poco::SharedPtr<Poco::MongoDB::InsertRequest> createInsertRequest(const std::string & collectionName) const;
/// Creates an InsertRequest to insert new documents in the given collection.
/// The collectionname must not contain the database name.
/// The collectionname must not contain the database name. (old wire protocol)
Poco::SharedPtr<Poco::MongoDB::QueryRequest> createQueryRequest(const std::string & collectionName) const;
/// Creates a QueryRequest.
/// Creates a QueryRequest. (old wire protocol)
/// The collectionname must not contain the database name.
Poco::SharedPtr<Poco::MongoDB::UpdateRequest> createUpdateRequest(const std::string & collectionName) const;
/// Creates an UpdateRequest.
/// Creates an UpdateRequest. (old wire protocol)
/// The collectionname must not contain the database name.
Poco::SharedPtr<Poco::MongoDB::OpMsgMessage> createOpMsgMessage(const std::string & collectionName) const;
/// Creates OpMsgMessage. (new wire protocol)
Poco::SharedPtr<Poco::MongoDB::OpMsgMessage> createOpMsgMessage() const;
/// Creates OpMsgMessage for database commands that do not require collection as an argument. (new wire protocol)
Poco::SharedPtr<Poco::MongoDB::OpMsgCursor> createOpMsgCursor(const std::string & collectionName) const;
/// Creates OpMsgCursor. (new wire protocol)
Poco::MongoDB::Document::Ptr ensureIndex(
Connection & connection,
const std::string & collection,
@ -100,14 +120,16 @@ namespace MongoDB
int version = 0,
int ttl = 0);
/// Creates an index. The document returned is the result of a getLastError call.
/// For more info look at the ensureIndex information on the MongoDB website.
/// For more info look at the ensureIndex information on the MongoDB website. (old wire protocol)
Document::Ptr getLastErrorDoc(Connection & connection) const;
/// Sends the getLastError command to the database and returns the error document.
/// (old wire protocol)
std::string getLastError(Connection & connection) const;
/// Sends the getLastError command to the database and returns the err element
/// from the error document. When err is null, an empty string is returned.
/// (old wire protocol)
static const std::string AUTH_MONGODB_CR;
/// Default authentication mechanism prior to MongoDB 3.0.
@ -115,6 +137,27 @@ namespace MongoDB
static const std::string AUTH_SCRAM_SHA1;
/// Default authentication mechanism for MongoDB 3.0.
enum WireVersion
/// Wire version as reported by the command hello.
/// See details in MongoDB github, repository specifications.
/// @see queryServerHello
{
VER_26 = 1,
VER_26_2 = 2,
VER_30 = 3,
VER_32 = 4,
VER_34 = 5,
VER_36 = 6, ///< First wire version that supports OP_MSG
VER_40 = 7,
VER_42 = 8,
VER_44 = 9,
VER_50 = 13,
VER_51 = 14, ///< First wire version that supports only OP_MSG
VER_52 = 15,
VER_53 = 16,
VER_60 = 17
};
protected:
bool authCR(Connection & connection, const std::string & username, const std::string & password);
bool authSCRAM(Connection & connection, const std::string & username, const std::string & password);
@ -127,6 +170,12 @@ namespace MongoDB
//
// inlines
//
inline const std::string & Database::name() const
{
return _dbname;
}
inline Poco::SharedPtr<Poco::MongoDB::QueryRequest> Database::createCommand() const
{
Poco::SharedPtr<Poco::MongoDB::QueryRequest> cmd = createQueryRequest("$cmd");
@ -158,6 +207,24 @@ namespace MongoDB
return new Poco::MongoDB::UpdateRequest(_dbname + '.' + collectionName);
}
// -- New wire protocol commands
inline Poco::SharedPtr<Poco::MongoDB::OpMsgMessage> Database::createOpMsgMessage(const std::string & collectionName) const
{
return new Poco::MongoDB::OpMsgMessage(_dbname, collectionName);
}
inline Poco::SharedPtr<Poco::MongoDB::OpMsgMessage> Database::createOpMsgMessage() const
{
// Collection name for database commands is not needed.
return createOpMsgMessage("");
}
inline Poco::SharedPtr<Poco::MongoDB::OpMsgCursor> Database::createOpMsgCursor(const std::string & collectionName) const
{
return new Poco::MongoDB::OpMsgCursor(_dbname, collectionName);
}
}
} // namespace Poco::MongoDB

View File

@ -31,6 +31,7 @@ namespace Poco
namespace MongoDB
{
class Array;
class ElementFindByName
{
@ -48,8 +49,8 @@ namespace MongoDB
/// Represents a MongoDB (BSON) document.
{
public:
typedef SharedPtr<Document> Ptr;
typedef std::vector<Document::Ptr> Vector;
using Ptr = SharedPtr<Document>;
using Vector = std::vector<Document::Ptr>;
Document();
/// Creates an empty Document.
@ -86,6 +87,10 @@ namespace MongoDB
/// Unlike the other add methods, this method returns
/// a reference to the new document.
Array & addNewArray(const std::string & name);
/// Create a new array and add it to this document.
/// Method returns a reference to the new array.
void clear();
/// Removes all elements from the document.
@ -95,7 +100,7 @@ namespace MongoDB
bool empty() const;
/// Returns true if the document doesn't contain any documents.
bool exists(const std::string & name);
bool exists(const std::string & name) const;
/// Returns true if the document has an element with the given name.
template <typename T>
@ -158,6 +163,9 @@ namespace MongoDB
/// return an Int64. When the element is not found, a
/// Poco::NotFoundException will be thrown.
bool remove(const std::string & name);
/// Removes an element from the document.
template <typename T>
bool isType(const std::string & name) const
/// Returns true when the type of the element equals the TypeId of ElementTrait.
@ -227,12 +235,23 @@ namespace MongoDB
}
inline bool Document::exists(const std::string & name)
inline bool Document::exists(const std::string & name) const
{
return std::find_if(_elements.begin(), _elements.end(), ElementFindByName(name)) != _elements.end();
}
inline bool Document::remove(const std::string & name)
{
auto it = std::find_if(_elements.begin(), _elements.end(), ElementFindByName(name));
if (it == _elements.end())
return false;
_elements.erase(it);
return true;
}
inline std::size_t Document::size() const
{
return _elements.size();

View File

@ -45,7 +45,7 @@ namespace MongoDB
/// Represents an Element of a Document or an Array.
{
public:
typedef Poco::SharedPtr<Element> Ptr;
using Ptr = Poco::SharedPtr<Element>;
explicit Element(const std::string & name);
/// Creates the Element with the given name.
@ -80,7 +80,7 @@ namespace MongoDB
}
typedef std::list<Element::Ptr> ElementSet;
using ElementSet = std::list<Element::Ptr>;
template <typename T>
@ -266,7 +266,7 @@ namespace MongoDB
}
typedef Nullable<unsigned char> NullValue;
using NullValue = Nullable<unsigned char>;
// BSON Null Value

View File

@ -35,7 +35,7 @@ namespace MongoDB
/// Represents JavaScript type in BSON.
{
public:
typedef SharedPtr<JavaScriptCode> Ptr;
using Ptr = SharedPtr<JavaScriptCode>;
JavaScriptCode();
/// Creates an empty JavaScriptCode object.

View File

@ -28,6 +28,9 @@ namespace MongoDB
{
class Message; // Required to disambiguate friend declaration in MessageHeader.
class MongoDB_API MessageHeader
/// Represents the message header which is always prepended to a
/// MongoDB request or response message.
@ -37,14 +40,18 @@ namespace MongoDB
enum OpCode
{
// Opcodes deprecated in MongoDB 5.0
OP_REPLY = 1,
OP_MSG = 1000,
OP_UPDATE = 2001,
OP_INSERT = 2002,
OP_QUERY = 2004,
OP_GET_MORE = 2005,
OP_DELETE = 2006,
OP_KILL_CURSORS = 2007
OP_KILL_CURSORS = 2007,
/// Opcodes supported in MongoDB 5.1 and later
OP_COMPRESSED = 2012,
OP_MSG = 2013
};
explicit MessageHeader(OpCode);

View File

@ -33,6 +33,13 @@
//
#if defined(_WIN32) && defined(POCO_DLL)
# if defined(MongoDB_EXPORTS)
# define MongoDB_API __declspec(dllexport)
# else
# define MongoDB_API __declspec(dllimport)
# endif
#endif
#if !defined(MongoDB_API)
@ -47,6 +54,11 @@
//
// Automatically link MongoDB library.
//
#if defined(_MSC_VER)
# if !defined(POCO_NO_AUTOMATIC_LIBS) && !defined(MongoDB_EXPORTS)
# pragma comment(lib, "PocoMongoDB" POCO_LIB_SUFFIX)
# endif
#endif
#endif // MongoDBMongoDB_INCLUDED

View File

@ -44,7 +44,7 @@ namespace MongoDB
/// as its value.
{
public:
typedef SharedPtr<ObjectId> Ptr;
using Ptr = SharedPtr<ObjectId>;
explicit ObjectId(const std::string & id);
/// Creates an ObjectId from a string.

View File

@ -0,0 +1,96 @@
//
// OpMsgCursor.h
//
// Library: MongoDB
// Package: MongoDB
// Module: OpMsgCursor
//
// Definition of the OpMsgCursor class.
//
// Copyright (c) 2012, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef MongoDB_OpMsgCursor_INCLUDED
#define MongoDB_OpMsgCursor_INCLUDED
#include "Poco/MongoDB/Connection.h"
#include "Poco/MongoDB/MongoDB.h"
#include "Poco/MongoDB/OpMsgMessage.h"
namespace Poco
{
namespace MongoDB
{
class MongoDB_API OpMsgCursor : public Document
/// OpMsgCursor is an helper class for querying multiple documents using OpMsgMessage.
{
public:
OpMsgCursor(const std::string & dbname, const std::string & collectionName);
/// Creates a OpMsgCursor for the given database and collection.
virtual ~OpMsgCursor();
/// Destroys the OpMsgCursor.
void setEmptyFirstBatch(bool empty);
/// Empty first batch is used to get error response faster with little server processing
bool emptyFirstBatch() const;
void setBatchSize(Int32 batchSize);
/// Set non-default batch size
Int32 batchSize() const;
/// Current batch size (zero or negative number indicates default batch size)
Int64 cursorID() const;
OpMsgMessage & next(Connection & connection);
/// Tries to get the next documents. As long as response message has a
/// cursor ID next can be called to retrieve the next bunch of documents.
///
/// The cursor must be killed (see kill()) when not all documents are needed.
OpMsgMessage & query();
/// Returns the associated query.
void kill(Connection & connection);
/// Kills the cursor and reset it so that it can be reused.
private:
OpMsgMessage _query;
OpMsgMessage _response;
bool _emptyFirstBatch{false};
Int32 _batchSize{-1};
/// Batch size used in the cursor. Zero or negative value means that default shall be used.
Int64 _cursorID{0};
};
//
// inlines
//
inline OpMsgMessage & OpMsgCursor::query()
{
return _query;
}
inline Int64 OpMsgCursor::cursorID() const
{
return _cursorID;
}
}
} // namespace Poco::MongoDB
#endif // MongoDB_OpMsgCursor_INCLUDED

View File

@ -0,0 +1,163 @@
//
// OpMsgMessage.h
//
// Library: MongoDB
// Package: MongoDB
// Module: OpMsgMessage
//
// Definition of the OpMsgMessage class.
//
// Copyright (c) 2022, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef MongoDB_OpMsgMessage_INCLUDED
#define MongoDB_OpMsgMessage_INCLUDED
#include "Poco/MongoDB/Document.h"
#include "Poco/MongoDB/Message.h"
#include "Poco/MongoDB/MongoDB.h"
#include <string>
namespace Poco
{
namespace MongoDB
{
class MongoDB_API OpMsgMessage : public Message
/// This class represents a request/response (OP_MSG) to send requests and receive responses to/from MongoDB.
{
public:
// Constants for most often used MongoDB commands that can be sent using OP_MSG
// For complete list see: https://www.mongodb.com/docs/manual/reference/command/
// Query and write
static const std::string CMD_INSERT;
static const std::string CMD_DELETE;
static const std::string CMD_UPDATE;
static const std::string CMD_FIND;
static const std::string CMD_FIND_AND_MODIFY;
static const std::string CMD_GET_MORE;
// Aggregation
static const std::string CMD_AGGREGATE;
static const std::string CMD_COUNT;
static const std::string CMD_DISTINCT;
static const std::string CMD_MAP_REDUCE;
// Replication and administration
static const std::string CMD_HELLO;
static const std::string CMD_REPL_SET_GET_STATUS;
static const std::string CMD_REPL_SET_GET_CONFIG;
static const std::string CMD_CREATE;
static const std::string CMD_CREATE_INDEXES;
static const std::string CMD_DROP;
static const std::string CMD_DROP_DATABASE;
static const std::string CMD_KILL_CURSORS;
static const std::string CMD_LIST_DATABASES;
static const std::string CMD_LIST_INDEXES;
// Diagnostic
static const std::string CMD_BUILD_INFO;
static const std::string CMD_COLL_STATS;
static const std::string CMD_DB_STATS;
static const std::string CMD_HOST_INFO;
enum Flags : UInt32
{
MSG_FLAGS_DEFAULT = 0,
MSG_CHECKSUM_PRESENT = (1 << 0),
MSG_MORE_TO_COME = (1 << 1),
/// Sender will send another message and is not prepared for overlapping messages
MSG_EXHAUST_ALLOWED = (1 << 16)
/// Client is prepared for multiple replies (using the moreToCome bit) to this request
};
OpMsgMessage();
/// Creates an OpMsgMessage for response.
OpMsgMessage(const std::string & databaseName, const std::string & collectionName, UInt32 flags = MSG_FLAGS_DEFAULT);
/// Creates an OpMsgMessage for requests.
virtual ~OpMsgMessage();
const std::string & databaseName() const;
const std::string & collectionName() const;
void setCommandName(const std::string & command);
/// Sets the command name and clears the command document
void setCursor(Poco::Int64 cursorID, Poco::Int32 batchSize = -1);
/// Sets the command "getMore" for the cursor id with batch size (if it is not negative).
const std::string & commandName() const;
/// Current command name.
void setAcknowledgedRequest(bool ack);
/// Set false to create request that does not return response.
/// It has effect only for commands that write or delete documents.
/// Default is true (request returns acknowledge response).
bool acknowledgedRequest() const;
UInt32 flags() const;
Document & body();
/// Access to body document.
/// Additional query arguments shall be added after setting the command name.
const Document & body() const;
Document::Vector & documents();
/// Documents prepared for request or retrieved in response.
const Document::Vector & documents() const;
/// Documents prepared for request or retrieved in response.
bool responseOk() const;
/// Reads "ok" status from the response message.
void clear();
/// Clears the message.
void send(std::ostream & ostr);
/// Writes the request to stream.
void read(std::istream & istr);
/// Reads the response from the stream.
private:
enum PayloadType : UInt8
{
PAYLOAD_TYPE_0 = 0,
PAYLOAD_TYPE_1 = 1
};
std::string _databaseName;
std::string _collectionName;
UInt32 _flags{MSG_FLAGS_DEFAULT};
std::string _commandName;
bool _acknowledged{true};
Document _body;
Document::Vector _documents;
};
}
} // namespace Poco::MongoDB
#endif // MongoDB_OpMsgMessage_INCLUDED

View File

@ -94,7 +94,23 @@ namespace MongoDB
operator Connection::Ptr() { return _connection; }
#if defined(POCO_ENABLE_CPP11)
// Disable copy to prevent unwanted release of resources: C++11 way
PooledConnection(const PooledConnection &) = delete;
PooledConnection & operator=(const PooledConnection &) = delete;
// Enable move semantics
PooledConnection(PooledConnection && other) = default;
PooledConnection & operator=(PooledConnection &&) = default;
#endif
private:
#if !defined(POCO_ENABLE_CPP11)
// Disable copy to prevent unwanted release of resources: pre C++11 way
PooledConnection(const PooledConnection &);
PooledConnection & operator=(const PooledConnection &);
#endif
Poco::ObjectPool<Connection, Connection::Ptr> & _pool;
Connection::Ptr _connection;
};

View File

@ -33,7 +33,7 @@ namespace MongoDB
/// Represents a regular expression in BSON format.
{
public:
typedef SharedPtr<RegularExpression> Ptr;
using Ptr = SharedPtr<RegularExpression>;
RegularExpression();
/// Creates an empty RegularExpression.

View File

@ -38,6 +38,9 @@ namespace MongoDB
ResponseMessage();
/// Creates an empty ResponseMessage.
ResponseMessage(const Int64 & cursorID);
/// Creates an ResponseMessage for existing cursor ID.
virtual ~ResponseMessage();
/// Destroys the ResponseMessage.

View File

@ -31,7 +31,7 @@ Array::~Array()
}
Element::Ptr Array::get(int pos) const
Element::Ptr Array::get(std::size_t pos) const
{
std::string name = Poco::NumberFormatter::format(pos);
return Document::get(name);

View File

@ -319,4 +319,30 @@ void Connection::sendRequest(RequestMessage& request, ResponseMessage& response)
}
void Connection::sendRequest(OpMsgMessage& request, OpMsgMessage& response)
{
Poco::Net::SocketOutputStream sos(_socket);
request.send(sos);
response.clear();
readResponse(response);
}
void Connection::sendRequest(OpMsgMessage& request)
{
request.setAcknowledgedRequest(false);
Poco::Net::SocketOutputStream sos(_socket);
request.send(sos);
}
void Connection::readResponse(OpMsgMessage& response)
{
Poco::Net::SocketInputStream sis(_socket);
response.read(sis);
}
} } // Poco::MongoDB

View File

@ -33,6 +33,12 @@ Cursor::Cursor(const std::string& fullCollectionName, QueryRequest::Flags flags)
}
Cursor::Cursor(const Document& aggregationResponse) :
_query(aggregationResponse.get<Poco::MongoDB::Document::Ptr>("cursor")->get<std::string>("ns")),
_response(aggregationResponse.get<Poco::MongoDB::Document::Ptr>("cursor")->get<Int64>("id"))
{
}
Cursor::~Cursor()
{
try

View File

@ -334,6 +334,50 @@ bool Database::authSCRAM(Connection& connection, const std::string& username, co
}
Document::Ptr Database::queryBuildInfo(Connection& connection) const
{
// build info can be issued on "config" system database
Poco::SharedPtr<Poco::MongoDB::QueryRequest> request = createCommand();
request->selector().add("buildInfo", 1);
Poco::MongoDB::ResponseMessage response;
connection.sendRequest(*request, response);
Document::Ptr buildInfo;
if ( response.documents().size() > 0 )
{
buildInfo = response.documents()[0];
}
else
{
throw Poco::ProtocolException("Didn't get a response from the buildinfo command");
}
return buildInfo;
}
Document::Ptr Database::queryServerHello(Connection& connection) const
{
// hello can be issued on "config" system database
Poco::SharedPtr<Poco::MongoDB::QueryRequest> request = createCommand();
request->selector().add("hello", 1);
Poco::MongoDB::ResponseMessage response;
connection.sendRequest(*request, response);
Document::Ptr hello;
if ( response.documents().size() > 0 )
{
hello = response.documents()[0];
}
else
{
throw Poco::ProtocolException("Didn't get a response from the hello command");
}
return hello;
}
Int64 Database::count(Connection& connection, const std::string& collectionName) const
{
Poco::SharedPtr<Poco::MongoDB::QueryRequest> countRequest = createCountRequest(collectionName);
@ -390,7 +434,7 @@ Document::Ptr Database::getLastErrorDoc(Connection& connection) const
{
Document::Ptr errorDoc;
Poco::SharedPtr<Poco::MongoDB::QueryRequest> request = createQueryRequest("$cmd");
Poco::SharedPtr<Poco::MongoDB::QueryRequest> request = createCommand();
request->setNumberToReturn(1);
request->selector().add("getLastError", 1);
@ -420,7 +464,7 @@ std::string Database::getLastError(Connection& connection) const
Poco::SharedPtr<Poco::MongoDB::QueryRequest> Database::createCountRequest(const std::string& collectionName) const
{
Poco::SharedPtr<Poco::MongoDB::QueryRequest> request = createQueryRequest("$cmd");
Poco::SharedPtr<Poco::MongoDB::QueryRequest> request = createCommand();
request->setNumberToReturn(1);
request->selector().add("count", collectionName);
return request;

View File

@ -35,6 +35,14 @@ Document::~Document()
}
Array& Document::addNewArray(const std::string& name)
{
Array::Ptr newArray = new Array();
add(name, newArray);
return *newArray;
}
Element::Ptr Document::get(const std::string& name) const
{
Element::Ptr element;
@ -198,7 +206,7 @@ void Document::write(BinaryWriter& writer)
else
{
std::stringstream sstream;
Poco::BinaryWriter tempWriter(sstream);
Poco::BinaryWriter tempWriter(sstream, BinaryWriter::LITTLE_ENDIAN_BYTE_ORDER);
for (ElementSet::iterator it = _elements.begin(); it != _elements.end(); ++it)
{
tempWriter << static_cast<unsigned char>((*it)->type());

View File

@ -42,7 +42,7 @@ void MessageHeader::read(BinaryReader& reader)
Int32 opCode;
reader >> opCode;
_opCode = (OpCode) opCode;
_opCode = static_cast<OpCode>(opCode);
if (!reader.good())
{
@ -56,7 +56,7 @@ void MessageHeader::write(BinaryWriter& writer)
writer << _messageLength;
writer << _requestID;
writer << _responseTo;
writer << (Int32) _opCode;
writer << static_cast<Int32>(_opCode);
}

View File

@ -0,0 +1,187 @@
//
// OpMsgCursor.cpp
//
// Library: MongoDB
// Package: MongoDB
// Module: OpMsgCursor
//
// Copyright (c) 2022, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#include "Poco/MongoDB/OpMsgCursor.h"
#include "Poco/MongoDB/Array.h"
//
// NOTE:
//
// MongoDB specification indicates that the flag MSG_EXHAUST_ALLOWED shall be
// used in the request when the receiver is ready to receive multiple messages
// without sending additional requests in between. Sender (MongoDB) indicates
// that more messages follow with flag MSG_MORE_TO_COME.
//
// It seems that this does not work properly. MSG_MORE_TO_COME is set and reading
// next messages sometimes works, however often the data is missing in response
// or the message header contains wrong message length and reading blocks.
// Opcode in the header is correct.
//
// Using MSG_EXHAUST_ALLOWED is therefore currently disabled.
//
// It seems that related JIRA ticket is:
//
// https://jira.mongodb.org/browse/SERVER-57297
//
// https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst
//
#define MONGODB_EXHAUST_ALLOWED_WORKS false
namespace Poco {
namespace MongoDB {
static const std::string keyCursor {"cursor"};
static const std::string keyFirstBatch {"firstBatch"};
static const std::string keyNextBatch {"nextBatch"};
static Poco::Int64 cursorIdFromResponse(const MongoDB::Document& doc);
OpMsgCursor::OpMsgCursor(const std::string& db, const std::string& collection):
#if MONGODB_EXHAUST_ALLOWED_WORKS
_query(db, collection, OpMsgMessage::MSG_EXHAUST_ALLOWED)
#else
_query(db, collection)
#endif
{
}
OpMsgCursor::~OpMsgCursor()
{
try
{
poco_assert_dbg(_cursorID == 0);
}
catch (...)
{
}
}
void OpMsgCursor::setEmptyFirstBatch(bool empty)
{
_emptyFirstBatch = empty;
}
bool OpMsgCursor::emptyFirstBatch() const
{
return _emptyFirstBatch;
}
void OpMsgCursor::setBatchSize(Int32 batchSize)
{
_batchSize = batchSize;
}
Int32 OpMsgCursor::batchSize() const
{
return _batchSize;
}
OpMsgMessage& OpMsgCursor::next(Connection& connection)
{
if (_cursorID == 0)
{
_response.clear();
if (_emptyFirstBatch || _batchSize > 0)
{
Int32 bsize = _emptyFirstBatch ? 0 : _batchSize;
if (_query.commandName() == OpMsgMessage::CMD_FIND)
{
_query.body().add("batchSize", bsize);
}
else if (_query.commandName() == OpMsgMessage::CMD_AGGREGATE)
{
auto& cursorDoc = _query.body().addNewDocument("cursor");
cursorDoc.add("batchSize", bsize);
}
}
connection.sendRequest(_query, _response);
const auto& rdoc = _response.body();
_cursorID = cursorIdFromResponse(rdoc);
}
else
{
#if MONGODB_EXHAUST_ALLOWED_WORKS
std::cout << "Response flags: " << _response.flags() << std::endl;
if (_response.flags() & OpMsgMessage::MSG_MORE_TO_COME)
{
std::cout << "More to come. Reading more response: " << std::endl;
_response.clear();
connection.readResponse(_response);
}
else
#endif
{
_response.clear();
_query.setCursor(_cursorID, _batchSize);
connection.sendRequest(_query, _response);
}
}
const auto& rdoc = _response.body();
_cursorID = cursorIdFromResponse(rdoc);
return _response;
}
void OpMsgCursor::kill(Connection& connection)
{
_response.clear();
if (_cursorID != 0)
{
_query.setCommandName(OpMsgMessage::CMD_KILL_CURSORS);
MongoDB::Array::Ptr cursors = new MongoDB::Array();
cursors->add<Poco::Int64>(_cursorID);
_query.body().add("cursors", cursors);
connection.sendRequest(_query, _response);
const auto killed = _response.body().get<MongoDB::Array::Ptr>("cursorsKilled", nullptr);
if (!killed || killed->size() != 1 || killed->get<Poco::Int64>(0, -1) != _cursorID)
{
throw Poco::ProtocolException("Cursor not killed as expected: " + std::to_string(_cursorID));
}
_cursorID = 0;
_query.clear();
_response.clear();
}
}
Poco::Int64 cursorIdFromResponse(const MongoDB::Document& doc)
{
Poco::Int64 id {0};
auto cursorDoc = doc.get<Document::Ptr>(keyCursor, nullptr);
if(cursorDoc)
{
id = cursorDoc->get<Poco::Int64>("id", 0);
}
return id;
}
} } // Namespace Poco::MongoDB

View File

@ -0,0 +1,412 @@
//
// OpMsgMessage.cpp
//
// Library: MongoDB
// Package: MongoDB
// Module: OpMsgMessage
//
// Copyright (c) 2022, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#include "Poco/MongoDB/OpMsgMessage.h"
#include "Poco/MongoDB/MessageHeader.h"
#include "Poco/MongoDB/Array.h"
#include "Poco/StreamCopier.h"
#include "Poco/Logger.h"
#define POCO_MONGODB_DUMP false
namespace Poco {
namespace MongoDB {
// Query and write
const std::string OpMsgMessage::CMD_INSERT { "insert" };
const std::string OpMsgMessage::CMD_DELETE { "delete" };
const std::string OpMsgMessage::CMD_UPDATE { "update" };
const std::string OpMsgMessage::CMD_FIND { "find" };
const std::string OpMsgMessage::CMD_FIND_AND_MODIFY { "findAndModify" };
const std::string OpMsgMessage::CMD_GET_MORE { "getMore" };
// Aggregation
const std::string OpMsgMessage::CMD_AGGREGATE { "aggregate" };
const std::string OpMsgMessage::CMD_COUNT { "count" };
const std::string OpMsgMessage::CMD_DISTINCT { "distinct" };
const std::string OpMsgMessage::CMD_MAP_REDUCE { "mapReduce" };
// Replication and administration
const std::string OpMsgMessage::CMD_HELLO { "hello" };
const std::string OpMsgMessage::CMD_REPL_SET_GET_STATUS { "replSetGetStatus" };
const std::string OpMsgMessage::CMD_REPL_SET_GET_CONFIG { "replSetGetConfig" };
const std::string OpMsgMessage::CMD_CREATE { "create" };
const std::string OpMsgMessage::CMD_CREATE_INDEXES { "createIndexes" };
const std::string OpMsgMessage::CMD_DROP { "drop" };
const std::string OpMsgMessage::CMD_DROP_DATABASE { "dropDatabase" };
const std::string OpMsgMessage::CMD_KILL_CURSORS { "killCursors" };
const std::string OpMsgMessage::CMD_LIST_DATABASES { "listDatabases" };
const std::string OpMsgMessage::CMD_LIST_INDEXES { "listIndexes" };
// Diagnostic
const std::string OpMsgMessage::CMD_BUILD_INFO { "buildInfo" };
const std::string OpMsgMessage::CMD_COLL_STATS { "collStats" };
const std::string OpMsgMessage::CMD_DB_STATS { "dbStats" };
const std::string OpMsgMessage::CMD_HOST_INFO { "hostInfo" };
static const std::string& commandIdentifier(const std::string& command);
/// Commands have different names for the payload that is sent in a separate section
static const std::string keyCursor {"cursor"};
static const std::string keyFirstBatch {"firstBatch"};
static const std::string keyNextBatch {"nextBatch"};
OpMsgMessage::OpMsgMessage() :
Message(MessageHeader::OP_MSG)
{
}
OpMsgMessage::OpMsgMessage(const std::string& databaseName, const std::string& collectionName, UInt32 flags) :
Message(MessageHeader::OP_MSG),
_databaseName(databaseName),
_collectionName(collectionName),
_flags(flags)
{
}
OpMsgMessage::~OpMsgMessage()
{
}
const std::string& OpMsgMessage::databaseName() const
{
return _databaseName;
}
const std::string& OpMsgMessage::collectionName() const
{
return _collectionName;
}
void OpMsgMessage::setCommandName(const std::string& command)
{
_commandName = command;
_body.clear();
// IMPORTANT: Command name must be first
if (_collectionName.empty())
{
// Collection is not specified. It is assumed that this particular command does
// not need it.
_body.add(_commandName, Int32(1));
}
else
{
_body.add(_commandName, _collectionName);
}
_body.add("$db", _databaseName);
}
void OpMsgMessage::setCursor(Poco::Int64 cursorID, Poco::Int32 batchSize)
{
_commandName = OpMsgMessage::CMD_GET_MORE;
_body.clear();
// IMPORTANT: Command name must be first
_body.add(_commandName, cursorID);
_body.add("$db", _databaseName);
_body.add("collection", _collectionName);
if (batchSize > 0)
{
_body.add("batchSize", batchSize);
}
}
const std::string& OpMsgMessage::commandName() const
{
return _commandName;
}
void OpMsgMessage::setAcknowledgedRequest(bool ack)
{
const auto& id = commandIdentifier(_commandName);
if (id.empty())
return;
_acknowledged = ack;
auto writeConcern = _body.get<Document::Ptr>("writeConcern", nullptr);
if (writeConcern)
writeConcern->remove("w");
if (ack)
{
_flags = _flags & (~MSG_MORE_TO_COME);
}
else
{
_flags = _flags | MSG_MORE_TO_COME;
if (!writeConcern)
_body.addNewDocument("writeConcern").add("w", 0);
else
writeConcern->add("w", 0);
}
}
bool OpMsgMessage::acknowledgedRequest() const
{
return _acknowledged;
}
UInt32 OpMsgMessage::flags() const
{
return _flags;
}
Document& OpMsgMessage::body()
{
return _body;
}
const Document& OpMsgMessage::body() const
{
return _body;
}
Document::Vector& OpMsgMessage::documents()
{
return _documents;
}
const Document::Vector& OpMsgMessage::documents() const
{
return _documents;
}
bool OpMsgMessage::responseOk() const
{
Poco::Int64 ok {false};
if (_body.exists("ok"))
{
ok = _body.getInteger("ok");
}
return (ok != 0);
}
void OpMsgMessage::clear()
{
_flags = MSG_FLAGS_DEFAULT;
_commandName.clear();
_body.clear();
_documents.clear();
}
void OpMsgMessage::send(std::ostream& ostr)
{
BinaryWriter socketWriter(ostr, BinaryWriter::LITTLE_ENDIAN_BYTE_ORDER);
// Serialise the body
std::stringstream ss;
BinaryWriter writer(ss, BinaryWriter::LITTLE_ENDIAN_BYTE_ORDER);
writer << _flags;
writer << PAYLOAD_TYPE_0;
_body.write(writer);
if (!_documents.empty())
{
// Serialise attached documents
std::stringstream ssdoc;
BinaryWriter wdoc(ssdoc, BinaryWriter::LITTLE_ENDIAN_BYTE_ORDER);
for (auto& doc: _documents)
{
doc->write(wdoc);
}
wdoc.flush();
const std::string& identifier = commandIdentifier(_commandName);
const Poco::Int32 size = static_cast<Poco::Int32>(sizeof(size) + identifier.size() + 1 + ssdoc.tellp());
writer << PAYLOAD_TYPE_1;
writer << size;
writer.writeCString(identifier.c_str());
StreamCopier::copyStream(ssdoc, ss);
}
writer.flush();
#if POCO_MONGODB_DUMP
const std::string section = ss.str();
std::string dump;
Logger::formatDump(dump, section.data(), section.length());
std::cout << dump << std::endl;
#endif
messageLength(static_cast<Poco::Int32>(ss.tellp()));
_header.write(socketWriter);
StreamCopier::copyStream(ss, ostr);
ostr.flush();
}
void OpMsgMessage::read(std::istream& istr)
{
std::string message;
{
BinaryReader reader(istr, BinaryReader::LITTLE_ENDIAN_BYTE_ORDER);
_header.read(reader);
poco_assert_dbg(_header.opCode() == _header.OP_MSG);
const std::streamsize remainingSize {_header.getMessageLength() - _header.MSG_HEADER_SIZE };
message.reserve(remainingSize);
#if POCO_MONGODB_DUMP
std::cout
<< "Message hdr: " << _header.getMessageLength() << " " << remainingSize << " "
<< _header.opCode() << " " << _header.getRequestID() << " " << _header.responseTo()
<< std::endl;
#endif
reader.readRaw(remainingSize, message);
#if POCO_MONGODB_DUMP
std::string dump;
Logger::formatDump(dump, message.data(), message.length());
std::cout << dump << std::endl;
#endif
}
// Read complete message and then interpret it.
std::istringstream msgss(message);
BinaryReader reader(msgss, BinaryReader::LITTLE_ENDIAN_BYTE_ORDER);
Poco::UInt8 payloadType {0xFF};
reader >> _flags;
reader >> payloadType;
poco_assert_dbg(payloadType == PAYLOAD_TYPE_0);
_body.read(reader);
// Read next sections from the buffer
while (msgss.good())
{
// NOTE: Not tested yet with database, because it returns everything in the body.
// Does MongoDB ever return documents as Payload type 1?
reader >> payloadType;
if (!msgss.good())
{
break;
}
poco_assert_dbg(payloadType == PAYLOAD_TYPE_1);
#if POCO_MONGODB_DUMP
std::cout << "section payload: " << payloadType << std::endl;
#endif
Poco::Int32 sectionSize {0};
reader >> sectionSize;
poco_assert_dbg(sectionSize > 0);
#if POCO_MONGODB_DUMP
std::cout << "section size: " << sectionSize << std::endl;
#endif
std::streamoff offset = sectionSize - sizeof(sectionSize);
std::streampos endOfSection = msgss.tellg() + offset;
std::string identifier;
reader.readCString(identifier);
#if POCO_MONGODB_DUMP
std::cout << "section identifier: " << identifier << std::endl;
#endif
// Loop to read documents from this section.
while (msgss.tellg() < endOfSection)
{
#if POCO_MONGODB_DUMP
std::cout << "section doc: " << msgss.tellg() << " " << endOfSection << std::endl;
#endif
Document::Ptr doc = new Document();
doc->read(reader);
_documents.push_back(doc);
if (msgss.tellg() < 0)
{
break;
}
}
}
// Extract documents from the cursor batch if they are there.
MongoDB::Array::Ptr batch;
auto curDoc = _body.get<MongoDB::Document::Ptr>(keyCursor, nullptr);
if (curDoc)
{
batch = curDoc->get<MongoDB::Array::Ptr>(keyFirstBatch, nullptr);
if (!batch)
{
batch = curDoc->get<MongoDB::Array::Ptr>(keyNextBatch, nullptr);
}
}
if (batch)
{
for(std::size_t i = 0; i < batch->size(); i++)
{
const auto& d = batch->get<MongoDB::Document::Ptr>(i, nullptr);
if (d)
{
_documents.push_back(d);
}
}
}
}
const std::string& commandIdentifier(const std::string& command)
{
// Names of identifiers for commands that send bulk documents in the request
// The identifier is set in the section type 1.
static std::map<std::string, std::string> identifiers {
{ OpMsgMessage::CMD_INSERT, "documents" },
{ OpMsgMessage::CMD_DELETE, "deletes" },
{ OpMsgMessage::CMD_UPDATE, "updates" },
// Not sure if create index can send document section
{ OpMsgMessage::CMD_CREATE_INDEXES, "indexes" }
};
const auto i = identifiers.find(command);
if (i != identifiers.end())
{
return i->second;
}
// This likely means that documents are incorrectly set for a command
// that does not send list of documents in section type 1.
static const std::string emptyIdentifier;
return emptyIdentifier;
}
} } // namespace Poco::MongoDB

View File

@ -35,7 +35,7 @@ RequestMessage::~RequestMessage()
void RequestMessage::send(std::ostream& ostr)
{
std::stringstream ss;
BinaryWriter requestWriter(ss);
BinaryWriter requestWriter(ss, BinaryWriter::LITTLE_ENDIAN_BYTE_ORDER);
buildRequest(requestWriter);
requestWriter.flush();

View File

@ -30,6 +30,16 @@ ResponseMessage::ResponseMessage():
}
ResponseMessage::ResponseMessage(const Int64& cursorID):
Message(MessageHeader::OP_REPLY),
_responseFlags(0),
_cursorID(cursorID),
_startingFrom(0),
_numberReturned(0)
{
}
ResponseMessage::~ResponseMessage()
{
}

View File

@ -170,7 +170,7 @@ MongoDBDictionarySource::~MongoDBDictionarySource() = default;
QueryPipeline MongoDBDictionarySource::loadAll()
{
return QueryPipeline(std::make_shared<MongoDBSource>(connection, createCursor(db, collection, sample_block), sample_block, max_block_size));
return QueryPipeline(std::make_shared<MongoDBSource>(connection, db, collection, Poco::MongoDB::Document{}, sample_block, max_block_size));
}
QueryPipeline MongoDBDictionarySource::loadIds(const std::vector<UInt64> & ids)
@ -178,7 +178,7 @@ QueryPipeline MongoDBDictionarySource::loadIds(const std::vector<UInt64> & ids)
if (!dict_struct.id)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'id' is required for selective loading");
auto cursor = createCursor(db, collection, sample_block);
Poco::MongoDB::Document query;
/** NOTE: While building array, Poco::MongoDB requires passing of different unused element names, along with values.
* In general, Poco::MongoDB is quite inefficient and bulky.
@ -188,9 +188,9 @@ QueryPipeline MongoDBDictionarySource::loadIds(const std::vector<UInt64> & ids)
for (const UInt64 id : ids)
ids_array->add(DB::toString(id), static_cast<Int32>(id));
cursor->query().selector().addNewDocument(dict_struct.id->name).add("$in", ids_array);
query.addNewDocument(dict_struct.id->name).add("$in", ids_array);
return QueryPipeline(std::make_shared<MongoDBSource>(connection, std::move(cursor), sample_block, max_block_size));
return QueryPipeline(std::make_shared<MongoDBSource>(connection, db, collection, query, sample_block, max_block_size));
}
@ -199,8 +199,7 @@ QueryPipeline MongoDBDictionarySource::loadKeys(const Columns & key_columns, con
if (!dict_struct.key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'key' is required for selective loading");
auto cursor = createCursor(db, collection, sample_block);
Poco::MongoDB::Document query;
Poco::MongoDB::Array::Ptr keys_array(new Poco::MongoDB::Array);
for (const auto row_idx : requested_rows)
@ -254,9 +253,9 @@ QueryPipeline MongoDBDictionarySource::loadKeys(const Columns & key_columns, con
}
/// If more than one key we should use $or
cursor->query().selector().add("$or", keys_array);
query.add("$or", keys_array);
return QueryPipeline(std::make_shared<MongoDBSource>(connection, std::move(cursor), sample_block, max_block_size));
return QueryPipeline(std::make_shared<MongoDBSource>(connection, db, collection, query, sample_block, max_block_size));
}
std::string MongoDBDictionarySource::toString() const

View File

@ -16,7 +16,6 @@ namespace Util
namespace MongoDB
{
class Connection;
class Cursor;
}
}

View File

@ -3,10 +3,12 @@
#include <string>
#include <vector>
#include <Poco/MongoDB/Array.h>
#include <Poco/MongoDB/Database.h>
#include <Poco/MongoDB/Connection.h>
#include <Poco/MongoDB/Cursor.h>
#include <Poco/MongoDB/OpMsgCursor.h>
#include <Poco/MongoDB/ObjectId.h>
#include <Poco/MongoDB/Array.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
@ -365,27 +367,79 @@ namespace
}
std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select)
bool isMongoDBWireProtocolOld(Poco::MongoDB::Connection & connection_)
{
auto cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection);
Poco::MongoDB::Database db("config");
Poco::MongoDB::Document::Ptr doc = db.queryServerHello(connection_);
auto wire_version = doc->getInteger("maxWireVersion");
return wire_version < Poco::MongoDB::Database::WireVersion::VER_36;
}
MongoDBCursor::MongoDBCursor(
const std::string & database,
const std::string & collection,
const Block & sample_block_to_select,
const Poco::MongoDB::Document & query,
Poco::MongoDB::Connection & connection)
: is_wire_protocol_old(isMongoDBWireProtocolOld(connection))
{
Poco::MongoDB::Document projection;
/// Looks like selecting _id column is implicit by default.
if (!sample_block_to_select.has("_id"))
cursor->query().returnFieldSelector().add("_id", 0);
projection.add("_id", 0);
for (const auto & column : sample_block_to_select)
cursor->query().returnFieldSelector().add(column.name, 1);
return cursor;
projection.add(column.name, 1);
if (is_wire_protocol_old)
{
old_cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection);
old_cursor->query().selector() = query;
old_cursor->query().returnFieldSelector() = projection;
}
else
{
new_cursor = std::make_unique<Poco::MongoDB::OpMsgCursor>(database, collection);
new_cursor->query().setCommandName(Poco::MongoDB::OpMsgMessage::CMD_FIND);
new_cursor->query().body().addNewDocument("filter") = query;
new_cursor->query().body().addNewDocument("projection") = projection;
}
}
Poco::MongoDB::Document::Vector MongoDBCursor::nextDocuments(Poco::MongoDB::Connection & connection)
{
if (is_wire_protocol_old)
{
auto response = old_cursor->next(connection);
cursor_id = response.cursorID();
return std::move(response.documents());
}
else
{
auto response = new_cursor->next(connection);
cursor_id = new_cursor->cursorID();
return std::move(response.documents());
}
}
Int64 MongoDBCursor::cursorID() const
{
return cursor_id;
}
MongoDBSource::MongoDBSource(
std::shared_ptr<Poco::MongoDB::Connection> & connection_,
std::unique_ptr<Poco::MongoDB::Cursor> cursor_,
const String & database_name_,
const String & collection_name_,
const Poco::MongoDB::Document & query_,
const Block & sample_block,
UInt64 max_block_size_)
: ISource(sample_block.cloneEmpty())
, connection(connection_)
, cursor{std::move(cursor_)}
, cursor(database_name_, collection_name_, sample_block, query_, *connection_)
, max_block_size{max_block_size_}
{
description.init(sample_block);
@ -412,9 +466,9 @@ Chunk MongoDBSource::generate()
size_t num_rows = 0;
while (num_rows < max_block_size)
{
Poco::MongoDB::ResponseMessage & response = cursor->next(*connection);
auto documents = cursor.nextDocuments(*connection);
for (auto & document : response.documents())
for (auto & document : documents)
{
if (document->exists("ok") && document->exists("$err")
&& document->exists("code") && document->getInteger("ok") == 0)
@ -458,7 +512,7 @@ Chunk MongoDBSource::generate()
}
}
if (response.cursorID() == 0)
if (cursor.cursorID() == 0)
{
all_read = true;
break;

View File

@ -1,6 +1,7 @@
#pragma once
#include <Poco/MongoDB/Element.h>
#include <Poco/MongoDB/Array.h>
#include <Core/Block.h>
#include <Processors/ISource.h>
@ -14,7 +15,9 @@ namespace Poco
namespace MongoDB
{
class Connection;
class Document;
class Cursor;
class OpMsgCursor;
}
}
@ -30,7 +33,28 @@ struct MongoDBArrayInfo
void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password);
std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select);
bool isMongoDBWireProtocolOld(Poco::MongoDB::Connection & connection_);
class MongoDBCursor
{
public:
MongoDBCursor(
const std::string & database,
const std::string & collection,
const Block & sample_block_to_select,
const Poco::MongoDB::Document & query,
Poco::MongoDB::Connection & connection);
Poco::MongoDB::Document::Vector nextDocuments(Poco::MongoDB::Connection & connection);
Int64 cursorID() const;
private:
const bool is_wire_protocol_old;
std::unique_ptr<Poco::MongoDB::Cursor> old_cursor;
std::unique_ptr<Poco::MongoDB::OpMsgCursor> new_cursor;
Int64 cursor_id = 0;
};
/// Converts MongoDB Cursor to a stream of Blocks
class MongoDBSource final : public ISource
@ -38,7 +62,9 @@ class MongoDBSource final : public ISource
public:
MongoDBSource(
std::shared_ptr<Poco::MongoDB::Connection> & connection_,
std::unique_ptr<Poco::MongoDB::Cursor> cursor_,
const String & database_name_,
const String & collection_name_,
const Poco::MongoDB::Document & query_,
const Block & sample_block,
UInt64 max_block_size_);
@ -50,7 +76,7 @@ private:
Chunk generate() override;
std::shared_ptr<Poco::MongoDB::Connection> connection;
std::unique_ptr<Poco::MongoDB::Cursor> cursor;
MongoDBCursor cursor;
const UInt64 max_block_size;
ExternalResultDescription description;
bool all_read = false;

View File

@ -99,6 +99,7 @@ public:
, db_name(db_name_)
, metadata_snapshot{metadata_snapshot_}
, connection(connection_)
, is_wire_protocol_old(isMongoDBWireProtocolOld(*connection_))
{
}
@ -107,7 +108,7 @@ public:
void consume(Chunk chunk) override
{
Poco::MongoDB::Database db(db_name);
Poco::MongoDB::Document::Ptr index = new Poco::MongoDB::Document();
Poco::MongoDB::Document::Vector documents;
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
@ -118,27 +119,44 @@ public:
const auto data_types = block.getDataTypes();
const auto data_names = block.getNames();
std::vector<std::string> row(num_cols);
documents.reserve(num_rows);
for (const auto i : collections::range(0, num_rows))
{
Poco::MongoDB::Document::Ptr document = new Poco::MongoDB::Document();
for (const auto j : collections::range(0, num_cols))
{
WriteBufferFromOwnString ostr;
data_types[j]->getDefaultSerialization()->serializeText(*columns[j], i, ostr, FormatSettings{});
row[j] = ostr.str();
index->add(data_names[j], row[j]);
document->add(data_names[j], ostr.str());
}
documents.push_back(std::move(document));
}
if (is_wire_protocol_old)
{
Poco::SharedPtr<Poco::MongoDB::InsertRequest> insert_request = db.createInsertRequest(collection_name);
insert_request->documents().push_back(index);
insert_request->documents() = std::move(documents);
connection->sendRequest(*insert_request);
}
else
{
Poco::SharedPtr<Poco::MongoDB::OpMsgMessage> insert_request = db.createOpMsgMessage(collection_name);
insert_request->setCommandName(Poco::MongoDB::OpMsgMessage::CMD_INSERT);
insert_request->documents() = std::move(documents);
connection->sendRequest(*insert_request);
}
}
private:
String collection_name;
String db_name;
StorageMetadataPtr metadata_snapshot;
std::shared_ptr<Poco::MongoDB::Connection> connection;
const bool is_wire_protocol_old;
};
@ -162,7 +180,7 @@ Pipe StorageMongoDB::read(
sample_block.insert({ column_data.type, column_data.name });
}
return Pipe(std::make_shared<MongoDBSource>(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size));
return Pipe(std::make_shared<MongoDBSource>(connection, database_name, collection_name, Poco::MongoDB::Document{}, sample_block, max_block_size));
}
SinkToStoragePtr StorageMongoDB::write(const ASTPtr & /* query */, const StorageMetadataPtr & metadata_snapshot, ContextPtr /* context */, bool /*async_insert*/)

View File

@ -71,6 +71,40 @@ def test_simple_select(started_cluster):
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_simple_select_from_view(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
simple_mongo_table = db["simple_table"]
data = []
for i in range(0, 100):
data.append({"key": i, "data": hex(i * i)})
simple_mongo_table.insert_many(data)
simple_mongo_table_view = db.create_collection(
"simple_table_view", viewOn="simple_table"
)
node = started_cluster.instances["node"]
node.query(
"CREATE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table_view', 'root', 'clickhouse')"
)
assert node.query("SELECT COUNT() FROM simple_mongo_table") == "100\n"
assert (
node.query("SELECT sum(key) FROM simple_mongo_table")
== str(sum(range(0, 100))) + "\n"
)
assert (
node.query("SELECT data from simple_mongo_table where key = 42")
== hex(42 * 42) + "\n"
)
node.query("DROP TABLE simple_mongo_table")
simple_mongo_table_view.drop()
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_arrays(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
@ -411,13 +445,16 @@ def test_simple_insert_select(started_cluster):
node.query(
"CREATE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse')"
)
node.query("INSERT INTO simple_mongo_table SELECT 1, 'kek'")
node.query(
"INSERT INTO simple_mongo_table SELECT number, 'kek' || toString(number) FROM numbers(10)"
)
assert (
node.query("SELECT data from simple_mongo_table where key = 1").strip() == "kek"
node.query("SELECT data from simple_mongo_table where key = 7").strip()
== "kek7"
)
node.query("INSERT INTO simple_mongo_table(key) SELECT 12")
assert int(node.query("SELECT count() from simple_mongo_table")) == 2
assert int(node.query("SELECT count() from simple_mongo_table")) == 11
assert (
node.query("SELECT data from simple_mongo_table where key = 12").strip() == ""
)