#include "StorageMongoDB.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int MONGODB_CANNOT_AUTHENTICATE; } StorageMongoDB::StorageMongoDB( const StorageID & table_id_, const std::string & host_, uint16_t port_, const std::string & database_name_, const std::string & collection_name_, const std::string & username_, const std::string & password_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, const Context & context_) : IStorage(table_id_) , host(host_) , port(port_) , database_name(database_name_) , collection_name(collection_name_) , username(username_) , password(password_) , global_context(context_) , connection{std::make_shared(host, port)} { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); storage_metadata.setConstraints(constraints_); setInMemoryMetadata(storage_metadata); } Pipes StorageMongoDB::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & /*query_info*/, const Context & /*context*/, QueryProcessingStage::Enum /*processed_stage*/, size_t max_block_size, unsigned) { metadata_snapshot->check(column_names, getVirtuals(), getStorageID()); #if POCO_VERSION >= 0x01070800 Poco::MongoDB::Database poco_db(database_name); if (!poco_db.authenticate(*connection, username, password, Poco::MongoDB::Database::AUTH_SCRAM_SHA1)) throw Exception("Cannot authenticate in MongoDB, incorrect user or password", ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); #else authenticate(*connection, database_name, username, password); #endif Block sample_block; for (const String & column_name : column_names) { auto column_data = metadata_snapshot->getColumns().getPhysical(column_name); sample_block.insert({ column_data.type, column_data.name }); } Pipes pipes; pipes.emplace_back(std::make_shared( std::make_shared(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size, true))); return pipes; } void registerStorageMongoDB(StorageFactory & factory) { factory.registerStorage("MongoDB", [](const StorageFactory::Arguments & args) { ASTs & engine_args = args.engine_args; if (engine_args.size() != 5) throw Exception( "Storage MongoDB requires 5 parameters: MongoDB('host:port', database, collection, 'user', 'password').", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); for (auto & engine_arg : engine_args) engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.local_context); /// 27017 is the default MongoDB port. auto parsed_host_port = parseAddress(engine_args[0]->as().value.safeGet(), 27017); const String & remote_database = engine_args[1]->as().value.safeGet(); const String & collection = engine_args[2]->as().value.safeGet(); const String & username = engine_args[3]->as().value.safeGet(); const String & password = engine_args[4]->as().value.safeGet(); return StorageMongoDB::create( args.table_id, parsed_host_port.first, parsed_host_port.second, remote_database, collection, username, password, args.columns, args.constraints, args.context); }, { .source_access_type = AccessType::MONGO, }); } }