#include "RedisDictionarySource.h" #include "DictionarySourceFactory.h" #include "DictionaryStructure.h" namespace DB { namespace ErrorCodes { extern const int SUPPORT_IS_DISABLED; } void registerDictionarySourceRedis(DictionarySourceFactory & factory) { auto createTableSource = [=](const DictionaryStructure & dict_struct, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Block & sample_block, const Context & /* context */) -> DictionarySourcePtr { #if USE_POCO_REDIS return std::make_unique(dict_struct, config, config_prefix + ".redis", sample_block); #else (void)dict_struct; (void)config; (void)config_prefix; (void)sample_block; throw Exception{"Dictionary source of type `redis` is disabled because poco library was built without redis support.", ErrorCodes::SUPPORT_IS_DISABLED}; #endif }; factory.registerSource("redis", createTableSource); } } #if USE_POCO_REDIS # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include "RedisBlockInputStream.h" # include "Poco/Logger.h" # include "common/logger_useful.h" namespace DB { namespace ErrorCodes { extern const int UNSUPPORTED_METHOD; extern const int CANNOT_SELECT; extern const int INVALID_CONFIG_PARAMETER; } static const size_t max_block_size = 8192; RedisDictionarySource::RedisDictionarySource( const DictionaryStructure & dict_struct, const std::string & host, UInt16 port, UInt8 db_index, RedisStorageType::Id storage_type, const Block & sample_block) : dict_struct{dict_struct} , host{host} , port{port} , db_index{db_index} , storage_type{storage_type} , sample_block{sample_block} , client{std::make_shared(host, port)} { LOG_INFO(&Logger::get("Redis"), "in ctor"); LOG_INFO(&Logger::get("Redis"), dict_struct.attributes.size()); if (dict_struct.attributes.size() != 1) throw Exception{"Invalid number of non key columns for Redis source: " + DB::toString(dict_struct.attributes.size()) + ", expected 1", ErrorCodes::INVALID_CONFIG_PARAMETER}; LOG_INFO(&Logger::get("Redis"), "After first check"); if (storage_type == RedisStorageType::HASH_MAP) { LOG_INFO(&Logger::get("Redis"), "SET STORAGE_TYPE"); if (!dict_struct.key.has_value()) throw Exception{"Redis source with storage type \'hash_map\' must have key", ErrorCodes::INVALID_CONFIG_PARAMETER}; if (dict_struct.key.value().size() > 2) throw Exception{"Redis source with complex keys having more than 2 attributes are unsupported", ErrorCodes::INVALID_CONFIG_PARAMETER}; // suppose key[0] is primary key, key[1] is secondary key } LOG_INFO(&Logger::get("Redis"), "After second check"); if (db_index != 0) { LOG_INFO(&Logger::get("Redis"), "SET DB_INDEX"); Poco::Redis::Command command("SELECT"); command << static_cast(db_index); std::string reply = client->execute(command); if (reply != "+OK\r\n") throw Exception{"Selecting db with index " + DB::toString(db_index) + " failed with reason " + reply, ErrorCodes::CANNOT_SELECT}; } LOG_INFO(&Logger::get("Redis"), "After third check"); } RedisDictionarySource::RedisDictionarySource( const DictionaryStructure & dict_struct, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Block & sample_block) : RedisDictionarySource( dict_struct, config.getString(config_prefix + ".host"), config.getUInt(config_prefix + ".port"), config.getUInt(config_prefix + ".db_index", 0), parseStorageType(config.getString(config_prefix + ".storage_type", "")), sample_block) { } RedisDictionarySource::RedisDictionarySource(const RedisDictionarySource & other) : RedisDictionarySource{other.dict_struct, other.host, other.port, other.db_index, other.storage_type, other.sample_block} { } RedisDictionarySource::~RedisDictionarySource() = default; BlockInputStreamPtr RedisDictionarySource::loadAll() { LOG_INFO(&Logger::get("Redis"), "Redis in loadAll"); Poco::Redis::Command command_for_keys("KEYS"); command_for_keys << "*"; LOG_INFO(&Logger::get("Redis"), "Command for keys: " + command_for_keys.toString()); Poco::Redis::Array keys = client->execute(command_for_keys); LOG_INFO(&Logger::get("Redis"), "Command for keys executed"); LOG_INFO(&Logger::get("Redis"), "KEYS: " + keys.toString()); if (storage_type == RedisStorageType::HASH_MAP && dict_struct.key->size() == 2) { Poco::Redis::Array hkeys; for (const auto & key : keys) { Poco::Redis::Command command_for_secondary_keys("HKEYS"); command_for_secondary_keys.addRedisType(key); Poco::Redis::Array reply_for_primary_key = client->execute(command_for_secondary_keys); LOG_INFO(&Logger::get("Redis"), "Command for hkeys executed"); Poco::SharedPtr primary_with_secondary; primary_with_secondary->addRedisType(key); for (const auto & secondary_key : reply_for_primary_key) primary_with_secondary->addRedisType(secondary_key); LOG_INFO(&Logger::get("Redis"), "HKEYS: " + primary_with_secondary->toString()); hkeys.add(*primary_with_secondary); } keys = hkeys; } return std::make_shared(client, std::move(keys), sample_block, max_block_size); } BlockInputStreamPtr RedisDictionarySource::loadIds(const std::vector & ids) { LOG_INFO(&Logger::get("Redis"), "Redis in loadIds"); if (storage_type != RedisStorageType::SIMPLE) throw Exception{"Cannot use loadIds with \'simple\' storage type", ErrorCodes::UNSUPPORTED_METHOD}; if (!dict_struct.id) throw Exception{"'id' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD}; Poco::Redis::Array keys; for (UInt64 id : ids) keys << static_cast(id); LOG_INFO(&Logger::get("Redis"), "KEYS: " + keys.toString()); return std::make_shared(client, std::move(keys), sample_block, max_block_size); } std::string RedisDictionarySource::toString() const { return "Redis: " + host + ':' + DB::toString(port); } RedisStorageType::Id RedisDictionarySource::parseStorageType(const std::string & storage_type) { RedisStorageType::Id storage_type_id = RedisStorageType::valueOf(storage_type); if (storage_type_id == RedisStorageType::UNKNOWN) storage_type_id = RedisStorageType::SIMPLE; return storage_type_id; } } #endif