#include "RedisDictionarySource.h" #include "DictionarySourceFactory.h" #include "DictionaryStructure.h" namespace DB { namespace ErrorCodes { extern const int SUPPORT_IS_DISABLED; } void registerDictionarySourceRedis(DictionarySourceFactory & factory) { auto createTableSource = [=](const DictionaryStructure & dict_struct, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Block & sample_block, const Context & /* context */) -> DictionarySourcePtr { #if USE_POCO_REDIS return std::make_unique(dict_struct, config, config_prefix + ".redis", sample_block); #else (void)dict_struct; (void)config; (void)config_prefix; (void)sample_block; throw Exception{"Dictionary source of type `redis` is disabled because poco library was built without redis support.", ErrorCodes::SUPPORT_IS_DISABLED}; #endif }; factory.registerSource("redis", createTableSource); } } #if USE_POCO_REDIS # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include "RedisBlockInputStream.h" # include "Poco/Logger.h" # include "common/logger_useful.h" namespace DB { namespace ErrorCodes { extern const int UNSUPPORTED_METHOD; extern const int CANNOT_SELECT; } static const size_t max_block_size = 8192; RedisDictionarySource::RedisDictionarySource( const DictionaryStructure & dict_struct, const std::string & host, UInt16 port, UInt8 db_index, const Block & sample_block) : dict_struct{dict_struct} , host{host} , port{port} , db_index{db_index} , sample_block{sample_block} , client{std::make_shared(host, port)} { if (db_index != 0) { Poco::Redis::Array command; command << "SELECT" << static_cast(db_index); std::string reply = client->execute(command); if (reply != "+OK\r\n") throw Exception{"Selecting db with index " + DB::toString(db_index) + " failed with reason " + reply, ErrorCodes::CANNOT_SELECT}; } } RedisDictionarySource::RedisDictionarySource( const DictionaryStructure & dict_struct, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Block & sample_block) : RedisDictionarySource( dict_struct, config.getString(config_prefix + ".host"), config.getUInt(config_prefix + ".port"), config.getUInt(config_prefix + ".db_index", 0), sample_block) { } RedisDictionarySource::RedisDictionarySource(const RedisDictionarySource & other) : RedisDictionarySource{other.dict_struct, other.host, other.port, other.db_index, other.sample_block} { } RedisDictionarySource::~RedisDictionarySource() = default; BlockInputStreamPtr RedisDictionarySource::loadAll() { LOG_ERROR(&Logger::get("Redis"), "Redis in loadAll"); Poco::Redis::Array commandForKeys; commandForKeys << "KEYS" << "*"; LOG_ERROR(&Logger::get("Redis"), "Command for keys: " + commandForKeys.toString()); Poco::Redis::Array keys = client->execute(commandForKeys); LOG_ERROR(&Logger::get("Redis"), "Command for keys executed"); LOG_ERROR(&Logger::get("Redis"), "KEYS: " + keys.toString()); return std::make_shared(client, std::move(keys), sample_block, max_block_size); } BlockInputStreamPtr RedisDictionarySource::loadIds(const std::vector & ids) { LOG_ERROR(&Logger::get("Redis"), "Redis in loadIds"); if (!dict_struct.id) throw Exception{"'id' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD}; Poco::Redis::Array keys; for (UInt64 id : ids) keys << static_cast(id); LOG_ERROR(&Logger::get("Redis"), "KEYS: " + keys.toString()); return std::make_shared(client, std::move(keys), sample_block, max_block_size); } std::string RedisDictionarySource::toString() const { return "Redis: " + host + ':' + DB::toString(port); } } #endif