Merge branch 'mongo' of https://github.com/ageraab/ClickHouse into storage_mongodb

This commit is contained in:
alesapin 2020-06-26 16:03:06 +03:00
commit 11f88340a5
9 changed files with 201 additions and 5 deletions

View File

@ -151,6 +151,7 @@ enum class AccessType
M(FILE, "", GLOBAL, SOURCES) \
M(URL, "", GLOBAL, SOURCES) \
M(REMOTE, "", GLOBAL, SOURCES) \
M(MONGO, "", GLOBAL, SOURCES) \
M(MYSQL, "", GLOBAL, SOURCES) \
M(ODBC, "", GLOBAL, SOURCES) \
M(JDBC, "", GLOBAL, SOURCES) \

View File

@ -278,6 +278,7 @@ dbms_target_link_libraries (
clickhouse_parsers
lz4
Poco::JSON
Poco::MongoDB
string_utils
PUBLIC
${MYSQLXX_LIBRARY}

View File

@ -69,8 +69,7 @@ static const UInt64 max_block_size = 8192;
#if POCO_VERSION < 0x01070800
/// See https://pocoproject.org/forum/viewtopic.php?f=10&t=6326&p=11426&hilit=mongodb+auth#p11485
static void
authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password)
void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password)
{
Poco::MongoDB::Database db(database);
@ -246,8 +245,7 @@ MongoDBDictionarySource::MongoDBDictionarySource(const MongoDBDictionarySource &
MongoDBDictionarySource::~MongoDBDictionarySource() = default;
static std::unique_ptr<Poco::MongoDB::Cursor>
createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select)
std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select)
{
auto cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection);
@ -257,7 +255,6 @@ createCursor(const std::string & database, const std::string & collection, const
for (const auto & column : sample_block_to_select)
cursor->query().returnFieldSelector().add(column.name, 1);
return cursor;
}

View File

@ -15,6 +15,7 @@ namespace Util
namespace MongoDB
{
class Connection;
class Cursor;
}
}
@ -26,6 +27,10 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password);
std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select);
/// Allows loading dictionaries from a MongoDB collection
class MongoDBDictionarySource final : public IDictionarySource
{

View File

@ -0,0 +1,127 @@
#include "StorageMongoDB.h"
#include <Poco/MongoDB/Connection.h>
#include <Poco/MongoDB/Cursor.h>
#include <Poco/MongoDB/Database.h>
#include <Poco/Version.h>
#include <Storages/StorageFactory.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/parseAddress.h>
#include <IO/Operators.h>
#include <Parsers/ASTLiteral.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>
#include <Dictionaries/MongoDBBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int MONGODB_CANNOT_AUTHENTICATE;
}
StorageMongoDB::StorageMongoDB(
const StorageID & table_id_,
const std::string & host_,
uint16_t port_,
const std::string & database_name_,
const std::string & collection_name_,
const std::string & username_,
const std::string & password_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_)
: IStorage(table_id_)
, host(host_)
, port(port_)
, database_name(database_name_)
, collection_name(collection_name_)
, username(username_)
, password(password_)
, global_context(context_)
, connection{std::make_shared<Poco::MongoDB::Connection>(host, port)}
{
setColumns(columns_);
setConstraints(constraints_);
}
Pipes StorageMongoDB::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned)
{
check(column_names);
#if POCO_VERSION >= 0x01070800
Poco::MongoDB::Database poco_db(database_name);
if (!poco_db.authenticate(*connection, username, password, Poco::MongoDB::Database::AUTH_SCRAM_SHA1))
throw Exception("Cannot authenticate in MongoDB, incorrect user or password", ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
#else
authenticate(*connection, database_name, username, password);
#endif
Block sample_block;
for (const String & column_name : column_names)
{
auto column_data = getColumns().getPhysical(column_name);
sample_block.insert({ column_data.type, column_data.name });
}
Pipes pipes;
pipes.emplace_back(std::make_shared<SourceFromInputStream>(
std::make_shared<MongoDBBlockInputStream>(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size)));
return pipes;
}
void registerStorageMongoDB(StorageFactory & factory)
{
factory.registerStorage("MongoDB", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 5)
throw Exception(
"Storage MongoDB requires 5 parameters: MongoDB('host:port', database, collection, 'user', 'password').",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.local_context);
/// 27017 is the default MongoDB port.
auto parsed_host_port = parseAddress(engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(), 27017);
const String & remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
const String & collection = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
const String & username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
return StorageMongoDB::create(
args.table_id,
parsed_host_port.first,
parsed_host_port.second,
remote_database,
collection,
username,
password,
args.columns,
args.constraints,
args.context);
},
{
.source_access_type = AccessType::MONGO,
});
}
}

View File

@ -0,0 +1,60 @@
#pragma once
#include "config_core.h"
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
#include <mysqlxx/Pool.h>
#include <Dictionaries/MongoDBDictionarySource.h>
namespace DB
{
/* Implements storage in the MongoDB database.
* Use ENGINE = mysql(host_port, database_name, table_name, user_name, password)
* Read only.
*/
class StorageMongoDB final : public ext::shared_ptr_helper<StorageMongoDB>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageMongoDB>;
public:
StorageMongoDB(
const StorageID & table_id_,
const std::string & host_,
short unsigned int port_,
const std::string & database_name_,
const std::string & collection_name_,
const std::string & username_,
const std::string & password_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_);
std::string getName() const override { return "MongoDB"; }
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
std::string host;
short unsigned int port;
std::string database_name;
std::string collection_name;
std::string username;
std::string password;
Context global_context;
std::shared_ptr<Poco::MongoDB::Connection> connection;
};
}

View File

@ -43,6 +43,8 @@ void registerStorages()
registerStorageMySQL(factory);
#endif
registerStorageMongoDB(factory);
#if USE_RDKAFKA
registerStorageKafka(factory);
#endif

View File

@ -43,6 +43,8 @@ void registerStorageJDBC(StorageFactory & factory);
void registerStorageMySQL(StorageFactory & factory);
#endif
void registerStorageMongoDB(StorageFactory & factory);
#if USE_RDKAFKA
void registerStorageKafka(StorageFactory & factory);
#endif

View File

@ -104,6 +104,7 @@ INTROSPECTION ['INTROSPECTION FUNCTIONS'] \N ALL
FILE [] GLOBAL SOURCES
URL [] GLOBAL SOURCES
REMOTE [] GLOBAL SOURCES
MONGO [] GLOBAL SOURCES
MYSQL [] GLOBAL SOURCES
ODBC [] GLOBAL SOURCES
JDBC [] GLOBAL SOURCES