dbms: MongoDB dictionary source draft (actually it works, but there are some improvements to be made) [#METR-17854]

This commit is contained in:
Andrey Mironov 2015-10-09 17:51:31 +03:00
parent cbe8f384ce
commit 8e4f30fff0
6 changed files with 453 additions and 3 deletions

View File

@ -294,6 +294,7 @@ namespace ErrorCodes
LIMIT_EXCEEDED = 290,
DATABASE_ACCESS_DENIED = 291,
LEADERSHIP_CHANGED = 292,
MONGODB_INIT_FAILED = 293,
KEEPER_EXCEPTION = 999,
POCO_EXCEPTION = 1000,

View File

@ -25,7 +25,7 @@ public:
ClickHouseDictionarySource(const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block, Context & context)
const Block & sample_block, Context & context)
: dict_struct{dict_struct},
host{config.getString(config_prefix + ".host")},
port(config.getInt(config_prefix + ".port")),

View File

@ -5,6 +5,7 @@
#include <DB/Dictionaries/FileDictionarySource.h>
#include <DB/Dictionaries/MySQLDictionarySource.h>
#include <DB/Dictionaries/ClickHouseDictionarySource.h>
#include <DB/Dictionaries/MongoDBDictionarySource.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <common/singleton.h>
#include <memory>
@ -84,6 +85,11 @@ public:
return std::make_unique<ClickHouseDictionarySource>(dict_struct, config, config_prefix + ".clickhouse",
sample_block, context);
}
else if ("mongodb" == source_type)
{
return std::make_unique<MongoDBDictionarySource>(dict_struct, config, config_prefix + ".mongodb",
sample_block, context);
}
throw Exception{
name + ": unknown dictionary source type: " + source_type,

View File

@ -0,0 +1,316 @@
#pragma once
#include <DB/Core/Block.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeDate.h>
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/Columns/ColumnString.h>
#include <ext/range.hpp>
#include <mongo/client/dbclient.h>
#include <vector>
#include <string>
namespace DB
{
/// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining
class MongoDBBlockInputStream final : public IProfilingBlockInputStream
{
enum struct value_type_t
{
UInt8,
UInt16,
UInt32,
UInt64,
Int8,
Int16,
Int32,
Int64,
Float32,
Float64,
String,
Date,
DateTime
};
public:
MongoDBBlockInputStream(
std::unique_ptr<mongo::DBClientCursor> cursor_, const Block & sample_block, const std::size_t max_block_size)
: cursor{std::move(cursor_)}, sample_block{sample_block}, max_block_size{max_block_size}
{
if (!cursor->more())
return;
types.reserve(sample_block.columns());
for (const auto idx : ext::range(0, sample_block.columns()))
{
const auto & column = sample_block.getByPosition(idx);
const auto type = column.type.get();
if (typeid_cast<const DataTypeUInt8 *>(type))
types.push_back(value_type_t::UInt8);
else if (typeid_cast<const DataTypeUInt16 *>(type))
types.push_back(value_type_t::UInt16);
else if (typeid_cast<const DataTypeUInt32 *>(type))
types.push_back(value_type_t::UInt32);
else if (typeid_cast<const DataTypeUInt64 *>(type))
types.push_back(value_type_t::UInt64);
else if (typeid_cast<const DataTypeInt8 *>(type))
types.push_back(value_type_t::Int8);
else if (typeid_cast<const DataTypeInt16 *>(type))
types.push_back(value_type_t::Int16);
else if (typeid_cast<const DataTypeInt32 *>(type))
types.push_back(value_type_t::Int32);
else if (typeid_cast<const DataTypeInt64 *>(type))
types.push_back(value_type_t::Int64);
else if (typeid_cast<const DataTypeFloat32 *>(type))
types.push_back(value_type_t::Float32);
else if (typeid_cast<const DataTypeInt64 *>(type))
types.push_back(value_type_t::Float64);
else if (typeid_cast<const DataTypeString *>(type))
types.push_back(value_type_t::String);
else if (typeid_cast<const DataTypeDate *>(type))
types.push_back(value_type_t::Date);
else if (typeid_cast<const DataTypeDateTime *>(type))
types.push_back(value_type_t::DateTime);
else
throw Exception{
"Unsupported type " + type->getName(),
ErrorCodes::UNKNOWN_TYPE
};
names.emplace_back(column.name);
}
}
String getName() const override { return "MongoDB"; }
String getID() const override
{
using stream = std::ostringstream;
return "MongoDB(@" + static_cast<stream &>(stream{} << cursor.get()).str() + ")";
}
private:
Block readImpl() override
{
if (!cursor->more())
return {};
auto block = sample_block.cloneEmpty();
/// cache pointers returned by the calls to getByPosition
std::vector<IColumn *> columns(block.columns());
const auto size = columns.size();
for (const auto i : ext::range(0, size))
columns[i] = block.getByPosition(i).column.get();
std::size_t num_rows = 0;
while (cursor->more())
{
const auto row = cursor->next();
for (const auto idx : ext::range(0, size))
{
const auto value = row[names[idx]];
if (value.ok())
insertValue(columns[idx], types[idx], value);
else
insertDefaultValue(columns[idx], types[idx]);
}
++num_rows;
if (num_rows == max_block_size)
break;
}
return block;
}
static void insertValue(IColumn * const column, const value_type_t type, const mongo::BSONElement & value)
{
switch (type)
{
case value_type_t::UInt8:
{
if (value.type() != mongo::Bool)
throw Exception{
"Type mismatch, expected Bool, got " + std::string{mongo::typeName(value.type())},
ErrorCodes::TYPE_MISMATCH
};
static_cast<ColumnFloat64 *>(column)->insert(value.boolean());
break;
}
case value_type_t::UInt16:
{
if (!value.isNumber())
throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
ErrorCodes::TYPE_MISMATCH
};
static_cast<ColumnUInt16 *>(column)->insert(value.numberInt());
break;
}
case value_type_t::UInt32:
{
if (!value.isNumber())
throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
ErrorCodes::TYPE_MISMATCH
};
static_cast<ColumnUInt32 *>(column)->insert(value.numberInt());
break;
}
case value_type_t::UInt64:
{
if (!value.isNumber())
throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
ErrorCodes::TYPE_MISMATCH
};
static_cast<ColumnUInt64 *>(column)->insert(value.numberLong());
break;
}
case value_type_t::Int8:
{
if (!value.isNumber())
throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
ErrorCodes::TYPE_MISMATCH
};
static_cast<ColumnInt8 *>(column)->insert(value.numberInt());
break;
}
case value_type_t::Int16:
{
if (!value.isNumber())
throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
ErrorCodes::TYPE_MISMATCH
};
static_cast<ColumnInt16 *>(column)->insert(value.numberInt());
break;
}
case value_type_t::Int32:
{
if (!value.isNumber())
throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
ErrorCodes::TYPE_MISMATCH
};
static_cast<ColumnInt32 *>(column)->insert(value.numberInt());
break;
}
case value_type_t::Int64:
{
if (!value.isNumber())
throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
ErrorCodes::TYPE_MISMATCH
};
static_cast<ColumnInt64 *>(column)->insert(value.numberLong());
break;
}
case value_type_t::Float32:
{
if (!value.isNumber())
throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
ErrorCodes::TYPE_MISMATCH
};
static_cast<ColumnFloat32 *>(column)->insert(value.number());
break;
}
case value_type_t::Float64:
{
if (!value.isNumber())
throw Exception{
"Type mismatch, expected a number, got " + std::string{mongo::typeName(value.type())},
ErrorCodes::TYPE_MISMATCH
};
static_cast<ColumnFloat64 *>(column)->insert(value.number());
break;
}
case value_type_t::String:
{
if (value.type() != mongo::String)
throw Exception{
"Type mismatch, expected String, got " + std::string{mongo::typeName(value.type())},
ErrorCodes::TYPE_MISMATCH
};
const auto string = value.String();
static_cast<ColumnString *>(column)->insertDataWithTerminatingZero(string.data(), string.size() + 1);
break;
}
case value_type_t::Date:
{
if (value.type() != mongo::Date)
throw Exception{
"Type mismatch, expected Date, got " + std::string{mongo::typeName(value.type())},
ErrorCodes::TYPE_MISMATCH
};
static_cast<ColumnUInt16 *>(column)->insert(
UInt16{DateLUT::instance().toDayNum(value.date().toTimeT())});
break;
}
case value_type_t::DateTime:
{
if (value.type() != mongo::Date)
throw Exception{
"Type mismatch, expected Date, got " + std::string{mongo::typeName(value.type())},
ErrorCodes::TYPE_MISMATCH
};
static_cast<ColumnUInt32 *>(column)->insert(value.date().toTimeT());
break;
}
}
}
/// @todo insert default value from the dictionary attribute definition
static void insertDefaultValue(IColumn * const column, const value_type_t type)
{
switch (type)
{
case value_type_t::UInt8: static_cast<ColumnUInt8 *>(column)->insertDefault(); break;
case value_type_t::UInt16: static_cast<ColumnUInt16 *>(column)->insertDefault(); break;
case value_type_t::UInt32: static_cast<ColumnUInt32 *>(column)->insertDefault(); break;
case value_type_t::UInt64: static_cast<ColumnUInt64 *>(column)->insertDefault(); break;
case value_type_t::Int8: static_cast<ColumnInt8 *>(column)->insertDefault(); break;
case value_type_t::Int16: static_cast<ColumnInt16 *>(column)->insertDefault(); break;
case value_type_t::Int32: static_cast<ColumnInt32 *>(column)->insertDefault(); break;
case value_type_t::Int64: static_cast<ColumnInt64 *>(column)->insertDefault(); break;
case value_type_t::Float32: static_cast<ColumnFloat32 *>(column)->insertDefault(); break;
case value_type_t::Float64: static_cast<ColumnFloat64 *>(column)->insertDefault(); break;
case value_type_t::String: static_cast<ColumnString *>(column)->insertDefault(); break;
case value_type_t::Date: static_cast<ColumnUInt16 *>(column)->insertDefault(); break;
case value_type_t::DateTime: static_cast<ColumnUInt32 *>(column)->insertDefault(); break;
}
}
std::unique_ptr<mongo::DBClientCursor> cursor;
Block sample_block;
const std::size_t max_block_size;
std::vector<value_type_t> types;
std::vector<mongo::StringData> names;
};
}

View File

@ -0,0 +1,128 @@
#pragma once
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/MongoDBBlockInputStream.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <mongo/client/dbclient.h>
namespace DB
{
/// Allows loading dictionaries from a MySQL database
class MongoDBDictionarySource final : public IDictionarySource
{
MongoDBDictionarySource(
const DictionaryStructure & dict_struct, const std::string & host, const std::string & port,
const std::string & user, const std::string & password,
const std::string & db, const std::string & collection,
const Block & sample_block, Context & context)
: dict_struct{dict_struct}, host{host}, port{port}, user{user}, password{password},
db{db}, collection{collection}, sample_block{sample_block}, context(context),
connection{true}
{
init();
connection.connect(host + ':' + port);
if (!user.empty())
{
std::string error;
if (!connection.auth(db, user, password, error))
throw DB::Exception{
"Could not authenticate to a MongoDB database " + db + " with provided credentials: " + error,
ErrorCodes::WRONG_PASSWORD
};
}
/// compose BSONObj containing all requested fields
mongo::BSONObjBuilder builder;
builder << "_id" << 0;
for (const auto & column : sample_block.getColumns())
builder << column.name << 1;
fields_to_query = builder.obj();
}
static void init()
{
static const auto mongo_init_status = mongo::client::initialize();
if (!mongo_init_status.isOK())
throw DB::Exception{
"mongo::client::initialize() failed: " + mongo_init_status.toString(),
ErrorCodes::MONGODB_INIT_FAILED
};
LOG_TRACE(&Logger::get("MongoDBDictionarySource"), "mongo::client::initialize() ok");
}
public:
MongoDBDictionarySource(
const DictionaryStructure & dict_struct, const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix, Block & sample_block, Context & context)
: MongoDBDictionarySource{
dict_struct,
config.getString(config_prefix + ".host"),
config.getString(config_prefix + ".port"),
config.getString(config_prefix + ".user", ""),
config.getString(config_prefix + ".password", ""),
config.getString(config_prefix + ".db", ""),
config.getString(config_prefix + ".collection"),
sample_block, context
}
{
}
MongoDBDictionarySource(const MongoDBDictionarySource & other)
: MongoDBDictionarySource{
other.dict_struct, other.host, other.port, other.user, other.password,
other.db, other.collection, other.sample_block, other.context
}
{
}
BlockInputStreamPtr loadAll() override
{
return new MongoDBBlockInputStream{
connection.query(db + '.' + collection, {}, 0, 0, &fields_to_query),
sample_block, 8192
};
}
bool supportsSelectiveLoad() const override { return true; }
BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> & ids) override
{
return new MongoDBBlockInputStream{
connection.query(db + '.' + collection, {}, 0, 0, &fields_to_query),
sample_block, 8192
};
}
bool isModified() const override { return false; }
DictionarySourcePtr clone() const override { return std::make_unique<MongoDBDictionarySource>(*this); }
std::string toString() const override
{
return "MongoDB: " + db + '.' + collection + ',' + (user.empty() ? " " : " " + user + '@') + host + ':' + port;
}
private:
const DictionaryStructure dict_struct;
const std::string host;
const std::string port;
const std::string user;
const std::string password;
const std::string db;
const std::string collection;
Block sample_block;
Context & context;
mongo::DBClientConnection connection;
mongo::BSONObj fields_to_query;
};
}

View File

@ -19,7 +19,7 @@ class MySQLDictionarySource final : public IDictionarySource
public:
MySQLDictionarySource(const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
Block & sample_block)
const Block & sample_block)
: dict_struct{dict_struct},
db{config.getString(config_prefix + ".db", "")},
table{config.getString(config_prefix + ".table")},
@ -78,7 +78,6 @@ public:
private:
Logger * log = &Logger::get("MySQLDictionarySource");
static std::string quoteForLike(const std::string s)
{
std::string tmp;