ClickHouse/dbms/Dictionaries/MongoDBDictionarySource.cpp

353 lines
12 KiB
C++
Raw Normal View History

#include "MongoDBDictionarySource.h"
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
2019-12-15 06:34:43 +00:00
#include "registerDictionaries.h"
namespace DB
{
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
}
void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
{
auto create_table_source = [=](const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
const Context & /* context */,
bool /* check_config */) -> DictionarySourcePtr {
#if USE_POCO_MONGODB
return std::make_unique<MongoDBDictionarySource>(dict_struct, config, config_prefix + ".mongodb", sample_block);
#else
(void)dict_struct;
(void)config;
(void)config_prefix;
(void)sample_block;
throw Exception{"Dictionary source of type `mongodb` is disabled because poco library was built without mongodb support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
};
factory.registerSource("mongodb", create_table_source);
}
}
2017-12-01 20:21:35 +00:00
#if USE_POCO_MONGODB
# include <Poco/MongoDB/Array.h>
# include <Poco/MongoDB/Connection.h>
# include <Poco/MongoDB/Cursor.h>
# include <Poco/MongoDB/Database.h>
# include <Poco/MongoDB/ObjectId.h>
# include <Poco/Util/AbstractConfiguration.h>
# include <Poco/Version.h>
// only after poco
// naming conflict:
2018-08-24 05:25:00 +00:00
// Poco/MongoDB/BSONWriter.h:54: void writeCString(const std::string & value);
// dbms/IO/WriteHelpers.h:146 #define writeCString(s, buf)
# include <IO/WriteHelpers.h>
# include <Common/FieldVisitors.h>
# include <ext/enumerate.h>
# include "MongoDBBlockInputStream.h"
2016-12-08 02:49:04 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
extern const int MONGODB_CANNOT_AUTHENTICATE;
2016-12-08 02:49:04 +00:00
}
2019-02-10 16:55:12 +00:00
static const UInt64 max_block_size = 8192;
2016-12-08 02:49:04 +00:00
# if POCO_VERSION < 0x01070800
/// See https://pocoproject.org/forum/viewtopic.php?f=10&t=6326&p=11426&hilit=mongodb+auth#p11485
static void
authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password)
2016-12-08 02:49:04 +00:00
{
Poco::MongoDB::Database db(database);
/// Challenge-response authentication.
std::string nonce;
/// First step: request nonce.
{
auto command = db.createCommand();
command->setNumberToReturn(1);
command->selector().add<Int32>("getnonce", 1);
Poco::MongoDB::ResponseMessage response;
connection.sendRequest(*command, response);
if (response.documents().empty())
throw Exception(
"Cannot authenticate in MongoDB: server returned empty response for 'getnonce' command",
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
auto doc = response.documents()[0];
try
{
double ok = doc->get<double>("ok", 0);
if (ok != 1)
throw Exception(
"Cannot authenticate in MongoDB: server returned response for 'getnonce' command that"
" has field 'ok' missing or having wrong value",
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
nonce = doc->get<std::string>("nonce", "");
if (nonce.empty())
throw Exception(
"Cannot authenticate in MongoDB: server returned response for 'getnonce' command that"
" has field 'nonce' missing or empty",
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
}
catch (Poco::NotFoundException & e)
{
throw Exception(
"Cannot authenticate in MongoDB: server returned response for 'getnonce' command that has missing required field: "
+ e.displayText(),
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
}
}
/// Second step: use nonce to calculate digest and send it back to the server.
/// Digest is hex_md5(n.nonce + username + hex_md5(username + ":mongo:" + password))
{
std::string first = user + ":mongo:" + password;
Poco::MD5Engine md5;
md5.update(first);
std::string digest_first(Poco::DigestEngine::digestToHex(md5.digest()));
2018-11-26 00:56:50 +00:00
std::string second = nonce + user + digest_first;
md5.reset();
md5.update(second);
std::string digest_second(Poco::DigestEngine::digestToHex(md5.digest()));
auto command = db.createCommand();
command->setNumberToReturn(1);
command->selector()
.add<Int32>("authenticate", 1)
.add<std::string>("user", user)
.add<std::string>("nonce", nonce)
.add<std::string>("key", digest_second);
Poco::MongoDB::ResponseMessage response;
connection.sendRequest(*command, response);
if (response.empty())
throw Exception(
"Cannot authenticate in MongoDB: server returned empty response for 'authenticate' command",
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
auto doc = response.documents()[0];
try
{
double ok = doc->get<double>("ok", 0);
if (ok != 1)
throw Exception(
"Cannot authenticate in MongoDB: server returned response for 'authenticate' command that"
" has field 'ok' missing or having wrong value",
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
}
catch (Poco::NotFoundException & e)
{
throw Exception(
"Cannot authenticate in MongoDB: server returned response for 'authenticate' command that has missing required field: "
+ e.displayText(),
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
}
}
}
# endif
2016-12-08 02:49:04 +00:00
MongoDBDictionarySource::MongoDBDictionarySource(
2019-08-03 11:02:40 +00:00
const DictionaryStructure & dict_struct_,
const std::string & host_,
UInt16 port_,
const std::string & user_,
const std::string & password_,
const std::string & method_,
const std::string & db_,
const std::string & collection_,
const Block & sample_block_)
: dict_struct{dict_struct_}
, host{host_}
, port{port_}
, user{user_}
, password{password_}
, method{method_}
, db{db_}
, collection{collection_}
, sample_block{sample_block_}
, connection{std::make_shared<Poco::MongoDB::Connection>(host, port)}
2016-12-08 02:49:04 +00:00
{
if (!user.empty())
{
# if POCO_VERSION >= 0x01070800
Poco::MongoDB::Database poco_db(db);
2019-02-25 10:45:22 +00:00
if (!poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method))
throw Exception("Cannot authenticate in MongoDB, incorrect user or password", ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
# else
authenticate(*connection, db, user, password);
# endif
}
2016-12-08 02:49:04 +00:00
}
MongoDBDictionarySource::MongoDBDictionarySource(
2019-08-03 11:02:40 +00:00
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
2019-08-03 11:02:40 +00:00
Block & sample_block_)
: MongoDBDictionarySource(
2019-08-03 11:02:40 +00:00
dict_struct_,
config.getString(config_prefix + ".host"),
config.getUInt(config_prefix + ".port"),
config.getString(config_prefix + ".user", ""),
config.getString(config_prefix + ".password", ""),
config.getString(config_prefix + ".method", ""),
config.getString(config_prefix + ".db", ""),
config.getString(config_prefix + ".collection"),
2019-08-03 11:02:40 +00:00
sample_block_)
2016-12-08 02:49:04 +00:00
{
}
MongoDBDictionarySource::MongoDBDictionarySource(const MongoDBDictionarySource & other)
: MongoDBDictionarySource{other.dict_struct,
other.host,
other.port,
other.user,
other.password,
other.method,
other.db,
other.collection,
other.sample_block}
2016-12-08 02:49:04 +00:00
{
}
MongoDBDictionarySource::~MongoDBDictionarySource() = default;
static std::unique_ptr<Poco::MongoDB::Cursor>
createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select)
{
auto cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection);
/// Looks like selecting _id column is implicit by default.
if (!sample_block_to_select.has("_id"))
cursor->query().returnFieldSelector().add("_id", 0);
for (const auto & column : sample_block_to_select)
cursor->query().returnFieldSelector().add(column.name, 1);
return cursor;
}
2016-12-08 02:49:04 +00:00
BlockInputStreamPtr MongoDBDictionarySource::loadAll()
{
return std::make_shared<MongoDBBlockInputStream>(connection, createCursor(db, collection, sample_block), sample_block, max_block_size);
2016-12-08 02:49:04 +00:00
}
BlockInputStreamPtr MongoDBDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
if (!dict_struct.id)
throw Exception{"'id' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD};
2016-12-08 02:49:04 +00:00
auto cursor = createCursor(db, collection, sample_block);
/** NOTE: While building array, Poco::MongoDB requires passing of different unused element names, along with values.
* In general, Poco::MongoDB is quite inefficient and bulky.
*/
Poco::MongoDB::Array::Ptr ids_array(new Poco::MongoDB::Array);
for (const UInt64 id : ids)
ids_array->add(DB::toString(id), Int32(id));
cursor->query().selector().addNewDocument(dict_struct.id->name).add("$in", ids_array);
2016-12-08 02:49:04 +00:00
return std::make_shared<MongoDBBlockInputStream>(connection, std::move(cursor), sample_block, max_block_size);
2016-12-08 02:49:04 +00:00
}
BlockInputStreamPtr MongoDBDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
2016-12-08 02:49:04 +00:00
{
if (!dict_struct.key)
throw Exception{"'key' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD};
auto cursor = createCursor(db, collection, sample_block);
Poco::MongoDB::Array::Ptr keys_array(new Poco::MongoDB::Array);
2018-02-06 09:45:52 +00:00
for (const auto row_idx : requested_rows)
{
2018-02-06 11:15:13 +00:00
auto & key = keys_array->addNewDocument(DB::toString(row_idx));
for (const auto attr : ext::enumerate(*dict_struct.key))
{
switch (attr.second.underlying_type)
{
2019-08-03 11:02:40 +00:00
case AttributeUnderlyingType::utUInt8:
case AttributeUnderlyingType::utUInt16:
case AttributeUnderlyingType::utUInt32:
case AttributeUnderlyingType::utUInt64:
case AttributeUnderlyingType::utUInt128:
case AttributeUnderlyingType::utInt8:
case AttributeUnderlyingType::utInt16:
case AttributeUnderlyingType::utInt32:
case AttributeUnderlyingType::utInt64:
case AttributeUnderlyingType::utDecimal32:
case AttributeUnderlyingType::utDecimal64:
case AttributeUnderlyingType::utDecimal128:
key.add(attr.second.name, Int32(key_columns[attr.first]->get64(row_idx)));
break;
2019-08-03 11:02:40 +00:00
case AttributeUnderlyingType::utFloat32:
case AttributeUnderlyingType::utFloat64:
2019-12-29 19:36:02 +00:00
key.add(attr.second.name, key_columns[attr.first]->getFloat64(row_idx));
break;
2019-08-03 11:02:40 +00:00
case AttributeUnderlyingType::utString:
String loaded_str(get<String>((*key_columns[attr.first])[row_idx]));
2018-02-06 09:45:52 +00:00
/// Convert string to ObjectID
if (attr.second.is_object_id)
2018-02-06 09:45:52 +00:00
{
Poco::MongoDB::ObjectId::Ptr loaded_id(new Poco::MongoDB::ObjectId(loaded_str));
key.add(attr.second.name, loaded_id);
2018-02-06 09:45:52 +00:00
}
else
{
key.add(attr.second.name, loaded_str);
2018-02-06 09:45:52 +00:00
}
break;
}
}
}
/// If more than one key we should use $or
cursor->query().selector().add("$or", keys_array);
return std::make_shared<MongoDBBlockInputStream>(connection, std::move(cursor), sample_block, max_block_size);
2016-12-08 02:49:04 +00:00
}
std::string MongoDBDictionarySource::toString() const
{
return "MongoDB: " + db + '.' + collection + ',' + (user.empty() ? " " : " " + user + '@') + host + ':' + DB::toString(port);
2016-12-08 02:49:04 +00:00
}
}
#endif