2019-01-15 22:08:56 +00:00
|
|
|
#include "RedisDictionarySource.h"
|
|
|
|
#include "DictionarySourceFactory.h"
|
|
|
|
#include "DictionaryStructure.h"
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int SUPPORT_IS_DISABLED;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Registers the "redis" dictionary source type in the given factory.
/// When the build has no Poco Redis support (USE_POCO_REDIS is false), the registered
/// creator throws SUPPORT_IS_DISABLED instead of building a source.
void registerDictionarySourceRedis(DictionarySourceFactory & factory)
{
    auto create_redis_source = [=](const DictionaryStructure & dict_struct,
                                   const Poco::Util::AbstractConfiguration & config,
                                   const std::string & config_prefix,
                                   Block & sample_block,
                                   const Context & /* context */) -> DictionarySourcePtr
    {
#if USE_POCO_REDIS
        /// The source reads its settings from the ".redis" sub-section of the dictionary config.
        const std::string redis_prefix = config_prefix + ".redis";
        return std::make_unique<RedisDictionarySource>(dict_struct, config, redis_prefix, sample_block);
#else
        /// Reference the arguments so that they are not reported as unused in this branch.
        static_cast<void>(dict_struct);
        static_cast<void>(config);
        static_cast<void>(config_prefix);
        static_cast<void>(sample_block);
        throw Exception{"Dictionary source of type `redis` is disabled because poco library was built without redis support.",
                        ErrorCodes::SUPPORT_IS_DISABLED};
#endif
    };

    factory.registerSource("redis", create_redis_source);
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
#if USE_POCO_REDIS
|
|
|
|
|
|
|
|
# include <Poco/Redis/Array.h>
|
|
|
|
# include <Poco/Redis/Client.h>
|
|
|
|
# include <Poco/Redis/Command.h>
|
|
|
|
# include <Poco/Redis/Type.h>
|
|
|
|
# include <Poco/Util/AbstractConfiguration.h>
|
|
|
|
|
|
|
|
# include <Common/FieldVisitors.h>
|
2019-05-30 21:16:12 +00:00
|
|
|
# include <IO/WriteHelpers.h>
|
|
|
|
|
2019-01-15 22:08:56 +00:00
|
|
|
# include "RedisBlockInputStream.h"
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int UNSUPPORTED_METHOD;
|
2019-04-14 17:50:05 +00:00
|
|
|
extern const int CANNOT_SELECT;
|
2019-04-16 23:13:07 +00:00
|
|
|
extern const int INVALID_CONFIG_PARAMETER;
|
2019-01-15 22:08:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-09-17 13:35:19 +00:00
|
|
|
/// Block size passed to every RedisBlockInputStream created in this file; loadAll() also uses it
/// to cap how many secondary keys are grouped with one primary key in a single request.
static const size_t max_block_size = 8192;
|
2019-01-15 22:08:56 +00:00
|
|
|
|
|
|
|
RedisDictionarySource::RedisDictionarySource(
|
2019-09-12 14:48:28 +00:00
|
|
|
const DictionaryStructure & dict_struct_,
|
|
|
|
const std::string & host_,
|
|
|
|
UInt16 port_,
|
|
|
|
UInt8 db_index_,
|
2019-09-16 16:17:56 +00:00
|
|
|
RedisStorageType storage_type_,
|
2019-09-12 14:48:28 +00:00
|
|
|
const Block & sample_block_)
|
|
|
|
: dict_struct{dict_struct_}
|
|
|
|
, host{host_}
|
|
|
|
, port{port_}
|
|
|
|
, db_index{db_index_}
|
|
|
|
, storage_type{storage_type_}
|
|
|
|
, sample_block{sample_block_}
|
2019-01-15 22:08:56 +00:00
|
|
|
, client{std::make_shared<Poco::Redis::Client>(host, port)}
|
|
|
|
{
|
2019-04-16 23:13:07 +00:00
|
|
|
if (dict_struct.attributes.size() != 1)
|
|
|
|
throw Exception{"Invalid number of non key columns for Redis source: " +
|
|
|
|
DB::toString(dict_struct.attributes.size()) + ", expected 1",
|
|
|
|
ErrorCodes::INVALID_CONFIG_PARAMETER};
|
|
|
|
|
2019-09-12 14:48:28 +00:00
|
|
|
if (storage_type == RedisStorageType::HASH_MAP)
|
2019-09-16 16:17:56 +00:00
|
|
|
{
|
2019-04-16 23:13:07 +00:00
|
|
|
if (!dict_struct.key.has_value())
|
2019-05-25 00:28:09 +00:00
|
|
|
throw Exception{"Redis source with storage type \'hash_map\' must have key",
|
2019-04-16 23:13:07 +00:00
|
|
|
ErrorCodes::INVALID_CONFIG_PARAMETER};
|
2019-09-16 16:17:56 +00:00
|
|
|
|
2019-04-16 23:13:07 +00:00
|
|
|
if (dict_struct.key.value().size() > 2)
|
|
|
|
throw Exception{"Redis source with complex keys having more than 2 attributes are unsupported",
|
|
|
|
ErrorCodes::INVALID_CONFIG_PARAMETER};
|
|
|
|
// suppose key[0] is primary key, key[1] is secondary key
|
|
|
|
}
|
|
|
|
|
2019-04-14 17:05:50 +00:00
|
|
|
if (db_index != 0)
|
|
|
|
{
|
2019-09-16 16:17:56 +00:00
|
|
|
RedisCommand command("SELECT");
|
2019-04-16 23:13:07 +00:00
|
|
|
command << static_cast<Int64>(db_index);
|
2019-04-14 17:44:44 +00:00
|
|
|
std::string reply = client->execute<std::string>(command);
|
2019-04-14 17:05:50 +00:00
|
|
|
if (reply != "+OK\r\n")
|
|
|
|
throw Exception{"Selecting db with index " + DB::toString(db_index) + " failed with reason " + reply,
|
2019-04-14 17:50:05 +00:00
|
|
|
ErrorCodes::CANNOT_SELECT};
|
2019-04-14 17:05:50 +00:00
|
|
|
}
|
2019-01-15 22:08:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Creates the source from the configuration section config_prefix_.
/// Reads "host" and "port" (no defaults are supplied here), "db_index" (default 0) and
/// "storage_type" (default empty string, which parseStorageType maps to the simple type),
/// then delegates to the main constructor.
RedisDictionarySource::RedisDictionarySource(
    const DictionaryStructure & dict_struct_,
    const Poco::Util::AbstractConfiguration & config_,
    const std::string & config_prefix_,
    Block & sample_block_)
    : RedisDictionarySource(
        dict_struct_,
        config_.getString(config_prefix_ + ".host"),
        /// getUInt returns a wider unsigned value; it is narrowed to the parameter types here.
        static_cast<UInt16>(config_.getUInt(config_prefix_ + ".port")),
        static_cast<UInt8>(config_.getUInt(config_prefix_ + ".db_index", 0)),
        parseStorageType(config_.getString(config_prefix_ + ".storage_type", "")),
        sample_block_)
{
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Copy constructor. Delegates to the main constructor with the settings of `other`,
/// so the copy creates its own client instead of reusing the one held by `other`.
RedisDictionarySource::RedisDictionarySource(const RedisDictionarySource & other)
    : RedisDictionarySource(
        other.dict_struct,
        other.host,
        other.port,
        other.db_index,
        other.storage_type,
        other.sample_block)
{
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Destructor is defined out of line and uses the compiler-generated implementation.
RedisDictionarySource::~RedisDictionarySource() = default;
|
|
|
|
|
2019-09-16 16:17:56 +00:00
|
|
|
static std::string storageTypeToKeyType(RedisStorageType type)
|
2019-09-13 17:38:56 +00:00
|
|
|
{
|
|
|
|
switch (type)
|
|
|
|
{
|
2019-09-16 16:17:56 +00:00
|
|
|
case RedisStorageType::SIMPLE:
|
2019-09-13 17:38:56 +00:00
|
|
|
return "string";
|
2019-09-16 16:17:56 +00:00
|
|
|
case RedisStorageType::HASH_MAP:
|
2019-09-13 17:38:56 +00:00
|
|
|
return "hash";
|
|
|
|
default:
|
|
|
|
return "none";
|
|
|
|
}
|
|
|
|
|
2019-09-16 16:17:56 +00:00
|
|
|
__builtin_unreachable();
|
2019-09-13 17:38:56 +00:00
|
|
|
}
|
2019-01-15 22:08:56 +00:00
|
|
|
|
|
|
|
/// Returns a stream over all keys of the connected database that match the storage type.
///
/// The key list is obtained with `KEYS *` and filtered with one `TYPE` request per key.
/// For the hash_map storage type every remaining key is expanded with `HKEYS` into arrays
/// of the form [primary, secondary...], each holding at most max_block_size secondary keys.
BlockInputStreamPtr RedisDictionarySource::loadAll()
{
    RedisCommand list_keys_command("KEYS");
    list_keys_command << "*";

    auto found_keys = client->execute<RedisArray>(list_keys_command);

    /// A null reply yields a stream over an empty key array.
    if (found_keys.isNull())
        return std::make_shared<RedisBlockInputStream>(client, RedisArray{}, storage_type, sample_block, max_block_size);

    /// Get only keys for specified storage type.
    const auto expected_type = storageTypeToKeyType(storage_type);
    RedisArray keys;
    for (auto & candidate : found_keys)
    {
        RedisCommand type_command("TYPE");
        type_command.addRedisType(candidate);

        if (client->execute<std::string>(type_command) == expected_type)
            keys.addRedisType(std::move(candidate));
    }

    if (storage_type != RedisStorageType::HASH_MAP)
        return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size);

    RedisArray grouped_keys;
    for (const auto & primary_key : keys)
    {
        RedisCommand fields_command("HKEYS");
        fields_command.addRedisType(primary_key);

        auto fields = client->execute<RedisArray>(fields_command);

        /// Every batch starts with the primary key, followed by its secondary keys.
        RedisArray batch;
        batch.addRedisType(primary_key);

        for (const auto & field : fields)
        {
            batch.addRedisType(field);

            /// Do not store more than max_block_size values for one request.
            if (batch.size() == max_block_size + 1)
            {
                grouped_keys.add(std::move(batch));
                batch.clear();
                batch.addRedisType(primary_key);
            }
        }

        /// Flush the remainder, unless it holds only the primary key.
        if (batch.size() > 1)
            grouped_keys.add(std::move(batch));
    }

    return std::make_shared<RedisBlockInputStream>(client, std::move(grouped_keys), storage_type, sample_block, max_block_size);
}
|
|
|
|
|
|
|
|
|
2019-01-27 13:14:02 +00:00
|
|
|
/// Returns a stream over the values stored under the given ids.
/// Each id is converted to its decimal text form and used as a Redis key.
///
/// Throws UNSUPPORTED_METHOD if the storage type is not simple or if the dictionary
/// structure has no 'id'.
BlockInputStreamPtr RedisDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
    /// Only the simple storage type can be addressed by a plain id.
    if (storage_type != RedisStorageType::SIMPLE)
        throw Exception{"Cannot use loadIds with storage type other than 'simple'", ErrorCodes::UNSUPPORTED_METHOD};

    if (!dict_struct.id)
        throw Exception{"'id' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD};

    RedisArray keys;

    for (UInt64 id : ids)
        keys << DB::toString(id);

    return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size);
}
|
2019-01-27 13:14:02 +00:00
|
|
|
|
2019-01-15 22:08:56 +00:00
|
|
|
std::string RedisDictionarySource::toString() const
|
|
|
|
{
|
|
|
|
return "Redis: " + host + ':' + DB::toString(port);
|
|
|
|
}
|
|
|
|
|
2019-09-16 16:17:56 +00:00
|
|
|
/// Converts the "storage_type" configuration value to RedisStorageType.
/// An empty string and "simple" give SIMPLE, "hash_map" gives HASH_MAP;
/// any other value throws INVALID_CONFIG_PARAMETER.
RedisStorageType RedisDictionarySource::parseStorageType(const std::string & storage_type_str)
{
    if (storage_type_str.empty() || storage_type_str == "simple")
        return RedisStorageType::SIMPLE;

    if (storage_type_str == "hash_map")
        return RedisStorageType::HASH_MAP;

    throw Exception("Unknown storage type " + storage_type_str + " for Redis dictionary", ErrorCodes::INVALID_CONFIG_PARAMETER);
}
|
2019-01-15 22:08:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#endif
|