#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int MONGODB_CANNOT_AUTHENTICATE; } StorageMongoDB::StorageMongoDB( const StorageID & table_id_, const std::string & host_, uint16_t port_, const std::string & database_name_, const std::string & collection_name_, const std::string & username_, const std::string & password_, const std::string & options_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, const String & comment) : IStorage(table_id_) , database_name(database_name_) , collection_name(collection_name_) , username(username_) , password(password_) , uri("mongodb://" + host_ + ":" + std::to_string(port_) + "/" + database_name_ + "?" + options_) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); storage_metadata.setConstraints(constraints_); storage_metadata.setComment(comment); setInMemoryMetadata(storage_metadata); } void StorageMongoDB::connectIfNotConnected() { std::lock_guard lock{connection_mutex}; if (!connection) { StorageMongoDBSocketFactory factory; connection = std::make_shared(uri, factory); } if (!authenticated) { Poco::URI poco_uri(uri); auto query_params = poco_uri.getQueryParameters(); auto auth_source = std::find_if(query_params.begin(), query_params.end(), [&](const std::pair & param) { return param.first == "authSource"; }); auto auth_db = database_name; if (auth_source != query_params.end()) auth_db = auth_source->second; if (!username.empty() && !password.empty()) { Poco::MongoDB::Database poco_db(auth_db); if (!poco_db.authenticate(*connection, username, password, Poco::MongoDB::Database::AUTH_SCRAM_SHA1)) throw Exception(ErrorCodes::MONGODB_CANNOT_AUTHENTICATE, "Cannot authenticate in MongoDB, incorrect user or password"); } authenticated = true; } } class StorageMongoDBSink : public SinkToStorage { public: explicit StorageMongoDBSink( const std::string & collection_name_, const std::string & db_name_, const StorageMetadataPtr & metadata_snapshot_, std::shared_ptr connection_) : SinkToStorage(metadata_snapshot_->getSampleBlock()) , collection_name(collection_name_) , db_name(db_name_) , metadata_snapshot{metadata_snapshot_} , connection(connection_) , is_wire_protocol_old(isMongoDBWireProtocolOld(*connection_)) { } String getName() const override { return "StorageMongoDBSink"; } void consume(Chunk chunk) override { Poco::MongoDB::Database db(db_name); Poco::MongoDB::Document::Vector documents; auto block = getHeader().cloneWithColumns(chunk.detachColumns()); size_t num_rows = block.rows(); size_t num_cols = block.columns(); const auto columns = block.getColumns(); const auto data_types = block.getDataTypes(); const auto data_names = block.getNames(); documents.reserve(num_rows); for (const auto i : collections::range(0, num_rows)) { Poco::MongoDB::Document::Ptr document = new Poco::MongoDB::Document(); for (const auto j : collections::range(0, num_cols)) { insertValueIntoMongoDB(*document, data_names[j], *data_types[j], *columns[j], i); } documents.push_back(std::move(document)); } if (is_wire_protocol_old) { Poco::SharedPtr insert_request = db.createInsertRequest(collection_name); insert_request->documents() = std::move(documents); connection->sendRequest(*insert_request); } else { Poco::SharedPtr insert_request = db.createOpMsgMessage(collection_name); insert_request->setCommandName(Poco::MongoDB::OpMsgMessage::CMD_INSERT); insert_request->documents() = std::move(documents); connection->sendRequest(*insert_request); } } private: void insertValueIntoMongoDB( Poco::MongoDB::Document & document, const std::string & name, const IDataType & data_type, const IColumn & column, size_t idx) { WhichDataType which(data_type); if (which.isArray()) { const ColumnArray & column_array = assert_cast(column); const ColumnArray::Offsets & offsets = column_array.getOffsets(); size_t offset = offsets[idx - 1]; size_t next_offset = offsets[idx]; const IColumn & nested_column = column_array.getData(); const auto * array_type = assert_cast(&data_type); const DataTypePtr & nested_type = array_type->getNestedType(); Poco::MongoDB::Array::Ptr array = new Poco::MongoDB::Array(); for (size_t i = 0; i + offset < next_offset; ++i) { insertValueIntoMongoDB(*array, Poco::NumberFormatter::format(i), *nested_type, nested_column, i + offset); } document.add(name, array); return; } /// MongoDB does not support UInt64 type, so just cast it to Int64 if (which.isNativeUInt()) document.add(name, static_cast(column.getUInt(idx))); else if (which.isNativeInt()) document.add(name, static_cast(column.getInt(idx))); else if (which.isFloat32()) document.add(name, static_cast(column.getFloat32(idx))); else if (which.isFloat64()) document.add(name, static_cast(column.getFloat64(idx))); else if (which.isDate()) document.add(name, Poco::Timestamp(DateLUT::instance().fromDayNum(DayNum(column.getUInt(idx))) * 1000000)); else if (which.isDateTime()) document.add(name, Poco::Timestamp(column.getUInt(idx) * 1000000)); else { WriteBufferFromOwnString ostr; data_type.getDefaultSerialization()->serializeText(column, idx, ostr, FormatSettings{}); document.add(name, ostr.str()); } } String collection_name; String db_name; StorageMetadataPtr metadata_snapshot; std::shared_ptr connection; const bool is_wire_protocol_old; }; Pipe StorageMongoDB::read( const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & /*query_info*/, ContextPtr /*context*/, QueryProcessingStage::Enum /*processed_stage*/, size_t max_block_size, size_t /*num_streams*/) { connectIfNotConnected(); storage_snapshot->check(column_names); Block sample_block; for (const String & column_name : column_names) { auto column_data = storage_snapshot->metadata->getColumns().getPhysical(column_name); sample_block.insert({ column_data.type, column_data.name }); } return Pipe(std::make_shared(connection, database_name, collection_name, Poco::MongoDB::Document{}, sample_block, max_block_size)); } SinkToStoragePtr StorageMongoDB::write(const ASTPtr & /* query */, const StorageMetadataPtr & metadata_snapshot, ContextPtr /* context */, bool /*async_insert*/) { connectIfNotConnected(); return std::make_shared(collection_name, database_name, metadata_snapshot, connection); } StorageMongoDB::Configuration StorageMongoDB::getConfiguration(ASTs engine_args, ContextPtr context) { Configuration configuration; if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context)) { validateNamedCollection( *named_collection, ValidateKeysMultiset{"host", "port", "user", "username", "password", "database", "db", "collection", "table"}, {"options"}); configuration.host = named_collection->getAny({"host", "hostname"}); configuration.port = static_cast(named_collection->get("port")); configuration.username = named_collection->getAny({"user", "username"}); configuration.password = named_collection->get("password"); configuration.database = named_collection->getAny({"database", "db"}); configuration.table = named_collection->getAny({"collection", "table"}); configuration.options = named_collection->getOrDefault("options", ""); } else { if (engine_args.size() < 5 || engine_args.size() > 6) throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Storage MongoDB requires from 5 to 6 parameters: " "MongoDB('host:port', database, collection, 'user', 'password' [, 'options'])."); for (auto & engine_arg : engine_args) engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); /// 27017 is the default MongoDB port. auto parsed_host_port = parseAddress(checkAndGetLiteralArgument(engine_args[0], "host:port"), 27017); configuration.host = parsed_host_port.first; configuration.port = parsed_host_port.second; configuration.database = checkAndGetLiteralArgument(engine_args[1], "database"); configuration.table = checkAndGetLiteralArgument(engine_args[2], "table"); configuration.username = checkAndGetLiteralArgument(engine_args[3], "username"); configuration.password = checkAndGetLiteralArgument(engine_args[4], "password"); if (engine_args.size() >= 6) configuration.options = checkAndGetLiteralArgument(engine_args[5], "database"); } context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port)); return configuration; } void registerStorageMongoDB(StorageFactory & factory) { factory.registerStorage("MongoDB", [](const StorageFactory::Arguments & args) { auto configuration = StorageMongoDB::getConfiguration(args.engine_args, args.getLocalContext()); return std::make_shared( args.table_id, configuration.host, configuration.port, configuration.database, configuration.table, configuration.username, configuration.password, configuration.options, args.columns, args.constraints, args.comment); }, { .source_access_type = AccessType::MONGO, }); } }