ClickHouse/src/Dictionaries/RedisDictionarySource.cpp

280 lines
10 KiB
C++
Raw Normal View History

2019-01-15 22:08:56 +00:00
#include "RedisDictionarySource.h"
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
2019-12-15 06:34:43 +00:00
#include "registerDictionaries.h"
2019-01-15 22:08:56 +00:00
namespace DB
{
/// Registers the "redis" dictionary source type in the given factory.
/// The registered creator builds a RedisDictionarySource from the ".redis" subsection of the
/// passed config prefix; the context, default database and check_config arguments are ignored.
void registerDictionarySourceRedis(DictionarySourceFactory & factory)
{
    factory.registerSource(
        "redis",
        [](const DictionaryStructure & dict_struct,
           const Poco::Util::AbstractConfiguration & config,
           const String & config_prefix,
           Block & sample_block,
           ContextPtr /* context */,
           const std::string & /* default_database */,
           bool /* check_config */) -> DictionarySourcePtr
        {
            const String redis_config_prefix = config_prefix + ".redis";
            return std::make_unique<RedisDictionarySource>(dict_struct, config, redis_config_prefix, sample_block);
        });
}
}
2019-01-15 22:08:56 +00:00
#include <Poco/Redis/Array.h>
#include <Poco/Redis/Client.h>
#include <Poco/Redis/Command.h>
#include <Poco/Redis/Type.h>
#include <Poco/Util/AbstractConfiguration.h>
2019-01-15 22:08:56 +00:00
#include <IO/WriteHelpers.h>
2019-05-30 21:16:12 +00:00
#include "RedisBlockInputStream.h"
2019-01-15 22:08:56 +00:00
namespace DB
{
/// Error codes thrown by the Redis dictionary source in this file.
/// They are only declared here (extern); their definitions are not part of this file.
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
2019-04-16 23:13:07 +00:00
extern const int INVALID_CONFIG_PARAMETER;
extern const int INTERNAL_REDIS_ERROR;
extern const int LOGICAL_ERROR;
2019-01-15 22:08:56 +00:00
}
/// Block size passed to every RedisBlockInputStream created in this file. loadAll() also uses it
/// to cap how many secondary keys are grouped with one primary key in a single request.
static constexpr size_t max_block_size = 8192;
2019-01-15 22:08:56 +00:00
/// Main constructor: stores the connection settings, constructs the Redis client from host and
/// port, validates the dictionary structure and prepares the session.
///
/// Throws INVALID_CONFIG_PARAMETER if the structure does not have exactly one attribute, or if
/// the storage type is 'hash_map' and the key is missing, does not consist of exactly two parts,
/// or has a part that is neither an integer nor a string.
/// Throws INTERNAL_REDIS_ERROR if AUTH or SELECT is answered with anything other than "OK".
RedisDictionarySource::RedisDictionarySource(
    const DictionaryStructure & dict_struct_,
    const String & host_,
    UInt16 port_,
    UInt8 db_index_,
    const String & password_,
    RedisStorageType storage_type_,
    const Block & sample_block_)
    : dict_struct{dict_struct_}
    , host{host_}
    , port{port_}
    , db_index{db_index_}
    , password{password_}
    , storage_type{storage_type_}
    , sample_block{sample_block_}
    , client{std::make_shared<Poco::Redis::Client>(host, port)}
{
    /// Exactly one non-key column is supported.
    const auto attribute_count = dict_struct.attributes.size();
    if (attribute_count != 1)
        throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER,
            "Invalid number of non key columns for Redis source: {}, expected 1",
            DB::toString(attribute_count));

    if (storage_type == RedisStorageType::HASH_MAP)
    {
        if (!dict_struct.key)
            throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER,
                "Redis source with storage type \'hash_map\' must have key");

        const auto & key_parts = *dict_struct.key;
        if (key_parts.size() != 2)
            throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER,
                "Redis source with storage type \'hash_map\' requires 2 keys");

        /// key[0] is treated as the primary key, key[1] as the secondary key.
        for (const auto & part : key_parts)
        {
            const bool is_supported_type = isInteger(part.type) || isString(part.type);
            if (!is_supported_type)
                throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER,
                    "Redis source supports only integer or string key, but key '{}' of type {} given",
                    part.name,
                    part.type->getName());
        }
    }

    /// Authenticate only when a password is configured.
    if (!password.empty())
    {
        RedisCommand auth_command("AUTH");
        auth_command << password;

        const String auth_reply = client->execute<String>(auth_command);
        if (auth_reply != "OK")
            throw Exception(ErrorCodes::INTERNAL_REDIS_ERROR,
                "Authentication failed with reason {}",
                auth_reply);
    }

    /// Index 0 is not selected explicitly; any other index is.
    if (db_index != 0)
    {
        RedisCommand select_command("SELECT");
        select_command << std::to_string(db_index);

        const String select_reply = client->execute<String>(select_command);
        if (select_reply != "OK")
            throw Exception(ErrorCodes::INTERNAL_REDIS_ERROR,
                "Selecting database with index {} failed with reason {}",
                DB::toString(db_index),
                select_reply);
    }
}
RedisDictionarySource::RedisDictionarySource(
2019-09-12 14:48:28 +00:00
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config_,
const String & config_prefix_,
2019-09-12 14:48:28 +00:00
Block & sample_block_)
2019-01-15 22:08:56 +00:00
: RedisDictionarySource(
2019-09-12 14:48:28 +00:00
dict_struct_,
config_.getString(config_prefix_ + ".host"),
config_.getUInt(config_prefix_ + ".port"),
config_.getUInt(config_prefix_ + ".db_index", 0),
config_.getString(config_prefix_ + ".password",""),
2019-09-12 14:48:28 +00:00
parseStorageType(config_.getString(config_prefix_ + ".storage_type", "")),
sample_block_)
2019-01-15 22:08:56 +00:00
{
}
/// Copy constructor: forwards the settings of `other` to the main constructor, so the copy is
/// validated again and gets a client of its own instead of sharing the one held by `other`.
RedisDictionarySource::RedisDictionarySource(const RedisDictionarySource & other)
    : RedisDictionarySource(
        other.dict_struct,
        other.host,
        other.port,
        other.db_index,
        other.password,
        other.storage_type,
        other.sample_block)
{
}

/// Compiler-generated destructor; no manual cleanup is performed here.
RedisDictionarySource::~RedisDictionarySource() = default;
/// Maps a dictionary storage type to the type name it is compared with in loadAll() against
/// the reply of the Redis TYPE command: "string" for SIMPLE, "hash" for HASH_MAP and "none"
/// for any other value.
static String storageTypeToKeyType(RedisStorageType type)
{
    if (type == RedisStorageType::SIMPLE)
        return "string";

    if (type == RedisStorageType::HASH_MAP)
        return "hash";

    return "none";
}
2019-01-15 22:08:56 +00:00
BlockInputStreamPtr RedisDictionarySource::loadAll()
{
if (!client->isConnected())
client->connect(host, port);
RedisCommand command_for_keys("KEYS");
2019-04-16 23:13:07 +00:00
command_for_keys << "*";
2019-02-13 00:05:43 +00:00
2019-09-13 17:38:56 +00:00
/// Get only keys for specified storage type.
auto all_keys = client->execute<RedisArray>(command_for_keys);
if (all_keys.isNull())
return std::make_shared<RedisBlockInputStream>(client, RedisArray{}, storage_type, sample_block, max_block_size);
RedisArray keys;
2019-09-13 17:38:56 +00:00
auto key_type = storageTypeToKeyType(storage_type);
2020-04-22 07:03:43 +00:00
for (const auto & key : all_keys)
if (key_type == client->execute<String>(RedisCommand("TYPE").addRedisType(key)))
2019-09-13 17:38:56 +00:00
keys.addRedisType(std::move(key));
2019-01-27 13:14:02 +00:00
if (storage_type == RedisStorageType::HASH_MAP)
2019-04-16 23:13:07 +00:00
{
RedisArray hkeys;
2019-04-16 23:13:07 +00:00
for (const auto & key : keys)
{
RedisCommand command_for_secondary_keys("HKEYS");
2019-04-16 23:13:07 +00:00
command_for_secondary_keys.addRedisType(key);
2019-06-02 01:22:06 +00:00
auto secondary_keys = client->execute<RedisArray>(command_for_secondary_keys);
2019-04-16 23:13:07 +00:00
RedisArray primary_with_secondary;
2019-06-02 01:22:06 +00:00
primary_with_secondary.addRedisType(key);
2019-09-13 17:38:56 +00:00
for (const auto & secondary_key : secondary_keys)
{
2019-06-02 01:22:06 +00:00
primary_with_secondary.addRedisType(secondary_key);
/// Do not store more than max_block_size values for one request.
if (primary_with_secondary.size() == max_block_size + 1)
{
2020-03-18 02:02:24 +00:00
hkeys.add(primary_with_secondary);
primary_with_secondary.clear();
primary_with_secondary.addRedisType(key);
}
}
2020-07-30 23:06:01 +00:00
if (primary_with_secondary.size() > 1)
hkeys.add(std::move(primary_with_secondary));
2019-04-16 23:13:07 +00:00
}
2019-09-13 17:38:56 +00:00
keys = std::move(hkeys);
2019-04-16 23:13:07 +00:00
}
return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size);
2019-01-15 22:08:56 +00:00
}
2019-01-27 13:14:02 +00:00
BlockInputStreamPtr RedisDictionarySource::loadIds(const std::vector<UInt64> & ids)
2019-01-15 22:08:56 +00:00
{
if (!client->isConnected())
client->connect(host, port);
if (storage_type == RedisStorageType::HASH_MAP)
2021-04-10 18:48:36 +00:00
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot use loadIds with 'hash_map' storage type");
2019-04-16 23:13:07 +00:00
2019-01-27 13:14:02 +00:00
if (!dict_struct.id)
2021-04-10 18:48:36 +00:00
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'id' is required for selective loading");
2019-01-15 22:08:56 +00:00
RedisArray keys;
2019-01-15 22:08:56 +00:00
2019-03-30 21:42:13 +00:00
for (UInt64 id : ids)
2019-05-25 22:53:31 +00:00
keys << DB::toString(id);
2019-01-15 22:08:56 +00:00
return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size);
2019-01-15 22:08:56 +00:00
}
2019-01-27 13:14:02 +00:00
BlockInputStreamPtr RedisDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
if (!client->isConnected())
client->connect(host, port);
if (key_columns.size() != dict_struct.key->size())
2021-04-10 18:48:36 +00:00
throw Exception(ErrorCodes::LOGICAL_ERROR, "The size of key_columns does not equal to the size of dictionary key");
RedisArray keys;
for (auto row : requested_rows)
{
RedisArray key;
for (size_t i = 0; i < key_columns.size(); ++i)
{
const auto & type = dict_struct.key->at(i).type;
if (isInteger(type))
key << DB::toString(key_columns[i]->get64(row));
else if (isString(type))
key << get<String>((*key_columns[i])[row]);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected type of key in Redis dictionary");
}
keys.add(key);
}
return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size);
}
/// Returns a short description of this source in the form "Redis: <host>:<port>".
String RedisDictionarySource::toString() const
{
    String description = "Redis: ";
    description += host;
    description += ':';
    description += DB::toString(port);
    return description;
}
/// Converts the textual storage type from the configuration into RedisStorageType.
/// "hash_map" selects HASH_MAP; "simple" and the empty string select SIMPLE.
/// Throws INVALID_CONFIG_PARAMETER for any other value.
RedisStorageType RedisDictionarySource::parseStorageType(const String & storage_type_str)
{
    if (storage_type_str == "hash_map")
        return RedisStorageType::HASH_MAP;

    const bool is_simple = storage_type_str.empty() || storage_type_str == "simple";
    if (is_simple)
        return RedisStorageType::SIMPLE;

    throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Unknown storage type {} for Redis dictionary", storage_type_str);
}
2019-01-15 22:08:56 +00:00
}