add support of cache layout for redis dictionaries with complex key

This commit is contained in:
Anton Popov 2020-10-14 22:23:58 +03:00
parent 57575a5a12
commit 3050b2e67b
3 changed files with 42 additions and 9 deletions

View File

@ -42,6 +42,7 @@ namespace DB
extern const int UNSUPPORTED_METHOD;
extern const int INVALID_CONFIG_PARAMETER;
extern const int INTERNAL_REDIS_ERROR;
extern const int LOGICAL_ERROR;
}
@ -79,7 +80,13 @@ namespace DB
throw Exception{"Redis source with storage type \'hash_map\' requires 2 keys",
ErrorCodes::INVALID_CONFIG_PARAMETER};
// suppose key[0] is primary key, key[1] is secondary key
for (const auto & key : *dict_struct.key)
if (!isInteger(key.type) && !isString(key.type))
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER,
"Redis source supports only integer or string key, but key '{}' of type {} given", key.name, key.type->getName());
}
if (!password.empty())
{
RedisCommand command("AUTH");
@ -207,8 +214,8 @@ namespace DB
if (!client->isConnected())
client->connect(host, port);
if (storage_type != RedisStorageType::SIMPLE)
throw Exception{"Cannot use loadIds with \'simple\' storage type", ErrorCodes::UNSUPPORTED_METHOD};
if (storage_type == RedisStorageType::HASH_MAP)
throw Exception{"Cannot use loadIds with 'hash_map' storage type", ErrorCodes::UNSUPPORTED_METHOD};
if (!dict_struct.id)
throw Exception{"'id' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD};
@ -221,6 +228,36 @@ namespace DB
return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size);
}
BlockInputStreamPtr RedisDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
if (!client->isConnected())
client->connect(host, port);
if (key_columns.size() != dict_struct.key->size())
throw Exception{"The size of key_columns does not equal to the size of dictionary key", ErrorCodes::LOGICAL_ERROR};
RedisArray keys;
for (auto row : requested_rows)
{
RedisArray key;
for (size_t i = 0; i < key_columns.size(); ++i)
{
const auto & type = dict_struct.key->at(i).type;
if (isInteger(type))
key << DB::toString(key_columns[i]->get64(row));
else if (isString(type))
key << get<String>((*key_columns[i])[row]);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected type of key in Redis dictionary");
}
keys.add(key);
}
return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size);
}
String RedisDictionarySource::toString() const
{
return "Redis: " + host + ':' + DB::toString(port);

View File

@ -70,11 +70,7 @@ namespace ErrorCodes
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & /* key_columns */, const std::vector<size_t> & /* requested_rows */) override
{
// Redis does not support native indexing
throw Exception{"Method loadKeys is unsupported for RedisDictionarySource", ErrorCodes::NOT_IMPLEMENTED};
}
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override { return true; }

View File

@ -68,9 +68,9 @@ LAYOUTS = [
Layout("hashed"),
Layout("cache"),
Layout("complex_key_hashed"),
# Layout("complex_key_cache"), # Currently not supported
Layout("complex_key_cache"),
Layout("direct"),
# Layout("complex_key_direct") # Currently not supported
Layout("complex_key_direct")
]
DICTIONARIES = []