#include "RedisDictionarySource.h" #include "DictionarySourceFactory.h" #include "DictionaryStructure.h" namespace DB { namespace ErrorCodes { extern const int SUPPORT_IS_DISABLED; } void registerDictionarySourceRedis(DictionarySourceFactory & factory) { auto createTableSource = [=](const DictionaryStructure & dict_struct, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Block & sample_block, const Context & /* context */) -> DictionarySourcePtr { #if USE_POCO_REDIS return std::make_unique(dict_struct, config, config_prefix + ".redis", sample_block); #else (void)dict_struct; (void)config; (void)config_prefix; (void)sample_block; throw Exception{"Dictionary source of type `redis` is disabled because poco library was built without redis support.", ErrorCodes::SUPPORT_IS_DISABLED}; #endif }; factory.registerSource("redis", createTableSource); } } #if USE_POCO_REDIS # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include "RedisBlockInputStream.h" namespace DB { namespace ErrorCodes { extern const int UNSUPPORTED_METHOD; } static const size_t max_block_size = 8192; RedisDictionarySource::RedisDictionarySource( const DictionaryStructure & dict_struct, const std::string & host, UInt16 port, const Block & sample_block) : dict_struct{dict_struct} , host{host} , port{port} , sample_block{sample_block} , client{std::make_shared(host, port)} { } RedisDictionarySource::RedisDictionarySource( const DictionaryStructure & dict_struct, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Block & sample_block) : RedisDictionarySource( dict_struct, config.getString(config_prefix + ".host"), config.getUInt(config_prefix + ".port"), sample_block) { } RedisDictionarySource::RedisDictionarySource(const RedisDictionarySource & other) : RedisDictionarySource{other.dict_struct, other.host, other.port, other.sample_block} { } RedisDictionarySource::~RedisDictionarySource() = default; BlockInputStreamPtr RedisDictionarySource::loadAll() { Poco::Redis::Array commandForKeys; commandForKeys << "KEYS" << "*"; Poco::Redis::Array keys = client->execute(commandForKeys); return std::make_shared(client, std::move(keys), sample_block, max_block_size); } BlockInputStreamPtr RedisDictionarySource::loadIds(const std::vector & ids) { if (!dict_struct.id) throw Exception{"'id' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD}; Poco::Redis::Array keys; for (UInt64 id : ids) keys << static_cast(id); return std::make_shared(client, std::move(keys), sample_block, max_block_size); } std::string RedisDictionarySource::toString() const { return "Redis: " + host + ':' + DB::toString(port); } } #endif