ClickHouse/src/Dictionaries/CassandraDictionarySource.cpp

125 lines
3.9 KiB
C++
Raw Normal View History

2019-02-16 10:11:49 +00:00
#include "CassandraDictionarySource.h"
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
namespace DB
{
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
}
void registerDictionarySourceCassandra(DictionarySourceFactory & factory)
{
auto createTableSource = [=](const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
2020-05-19 17:48:28 +00:00
const Context & /* context */,
bool /*check_config*/) -> DictionarySourcePtr {
2019-02-16 10:11:49 +00:00
#if USE_CASSANDRA
2019-05-29 23:01:25 +00:00
return std::make_unique<CassandraDictionarySource>(dict_struct, config, config_prefix + ".cassandra", sample_block);
2019-02-16 10:11:49 +00:00
#else
(void)dict_struct;
(void)config;
(void)config_prefix;
(void)sample_block;
throw Exception{"Dictionary source of type `cassandra` is disabled because library was built without cassandra support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
};
factory.registerSource("cassandra", createTableSource);
}
}
#if USE_CASSANDRA
# include <cassandra.h>
2019-05-22 21:09:29 +00:00
# include <IO/WriteHelpers.h>
# include "CassandraBlockInputStream.h"
2019-02-16 10:11:49 +00:00
namespace DB
{
2019-05-29 23:01:25 +00:00
namespace ErrorCodes
{
2019-02-16 10:11:49 +00:00
extern const int UNSUPPORTED_METHOD;
extern const int WRONG_PASSWORD;
}
static const size_t max_block_size = 8192;
CassandraDictionarySource::CassandraDictionarySource(
2020-05-19 17:48:28 +00:00
const DB::DictionaryStructure & dict_struct_,
const std::string & host_,
UInt16 port_,
const std::string & user_,
const std::string & password_,
const std::string & method_,
const std::string & db_,
const DB::Block & sample_block_)
: dict_struct(dict_struct_)
, host(host_)
, port(port_)
, user(user_)
, password(password_)
, method(method_)
, db(db_)
, sample_block(sample_block_)
, cluster(cass_cluster_new())
, session(cass_session_new())
2019-02-16 10:11:49 +00:00
{
cass_cluster_set_contact_points(cluster, toConnectionString(host, port).c_str());
}
CassandraDictionarySource::CassandraDictionarySource(
2020-05-19 17:48:28 +00:00
const DB::DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DB::Block & sample_block_)
2019-02-16 10:11:49 +00:00
: CassandraDictionarySource(
2020-05-19 17:48:28 +00:00
dict_struct_,
2019-02-16 10:11:49 +00:00
config.getString(config_prefix + ".host"),
config.getUInt(config_prefix + ".port"),
config.getString(config_prefix + ".user", ""),
config.getString(config_prefix + ".password", ""),
config.getString(config_prefix + ".method", ""),
config.getString(config_prefix + ".db", ""),
2020-05-19 17:48:28 +00:00
sample_block_)
2019-02-16 10:11:49 +00:00
{
}
CassandraDictionarySource::CassandraDictionarySource(const CassandraDictionarySource & other)
: CassandraDictionarySource{other.dict_struct,
other.host,
other.port,
other.user,
other.password,
other.method,
other.db,
other.sample_block}
{
}
CassandraDictionarySource::~CassandraDictionarySource() {
cass_session_free(session);
cass_cluster_free(cluster);
}
std::string CassandraDictionarySource::toConnectionString(const std::string &host, const UInt16 port) {
return host + (port != 0 ? ":" + std::to_string(port) : "");
}
2019-05-22 21:09:29 +00:00
BlockInputStreamPtr CassandraDictionarySource::loadAll() {
return std::make_shared<CassandraBlockInputStream>(nullptr, "", sample_block, max_block_size);
}
std::string CassandraDictionarySource::toString() const {
return "Cassandra: " + /*db + '.' + collection + ',' + (user.empty() ? " " : " " + user + '@') + */ host + ':' + DB::toString(port);
}
2019-02-16 10:11:49 +00:00
}
#endif