mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
fix poco redis array NPE
This commit is contained in:
parent
9a495cbf99
commit
23d6c835d8
@ -109,6 +109,8 @@ RedisArrayPtr getRedisHashMapKeys(const RedisConnectionPtr & connection, RedisAr
|
||||
command_for_secondary_keys.addRedisType(key);
|
||||
|
||||
auto secondary_keys = connection->client->execute<RedisArray>(command_for_secondary_keys);
|
||||
if (secondary_keys.isNull())
|
||||
continue;
|
||||
|
||||
RedisArray primary_with_secondary;
|
||||
primary_with_secondary.addRedisType(key);
|
||||
@ -131,4 +133,24 @@ RedisArrayPtr getRedisHashMapKeys(const RedisConnectionPtr & connection, RedisAr
|
||||
return hkeys;
|
||||
}
|
||||
|
||||
RedisColumnType getRedisColumnType(RedisStorageType storage_type, const Names & all_columns, const String & column)
|
||||
{
|
||||
String redis_col_key = all_columns.at(0);
|
||||
if (column == redis_col_key)
|
||||
return RedisColumnType::KEY;
|
||||
|
||||
if (storage_type == RedisStorageType::HASH_MAP)
|
||||
{
|
||||
String redis_col_field = all_columns.at(1);
|
||||
if (column == redis_col_field)
|
||||
return RedisColumnType::FIELD;
|
||||
else
|
||||
return RedisColumnType::VALUE;
|
||||
}
|
||||
else
|
||||
{
|
||||
return RedisColumnType::VALUE;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
#include <Core/Defines.h>
|
||||
#include <base/BorrowedObjectPool.h>
|
||||
#include <Core/Names.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -28,7 +29,7 @@ enum class RedisColumnType
|
||||
{
|
||||
/// Redis key
|
||||
KEY,
|
||||
/// Redis map field
|
||||
/// Redis hash field
|
||||
FIELD,
|
||||
/// Redis value
|
||||
VALUE
|
||||
@ -80,4 +81,8 @@ RedisConnectionPtr getRedisConnection(RedisPoolPtr pool, const RedisConfiguratio
|
||||
/// eg: keys -> [key1, key2] and get [[key1, field1, field2], [key2, field1, field2]]
|
||||
RedisArrayPtr getRedisHashMapKeys(const RedisConnectionPtr & connection, RedisArray & keys);
|
||||
|
||||
/// Get RedisColumnType of a column, If storage_type is
|
||||
/// SIMPLE: all_columns must have 2 iterm and the first one is Redis key the second one is value
|
||||
/// HASH_MAP: all_columns must have 2 iterm and the first one is Redis key the second is field, the third is value.
|
||||
RedisColumnType getRedisColumnType(RedisStorageType storage_type, const Names & all_columns, const String & column);
|
||||
}
|
||||
|
@ -5,15 +5,11 @@
|
||||
#include <Storages/KVStorageUtils.h>
|
||||
|
||||
#include <unordered_set>
|
||||
#include <Core/Settings.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Processors/Sinks/SinkToStorage.h>
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/NamedCollections/NamedCollections.h>
|
||||
#include <Common/parseAddress.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
@ -28,29 +24,6 @@ namespace ErrorCodes
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
RedisColumnType getRedisColumnType(RedisStorageType storage_type, const Names & all_columns, const String & column)
|
||||
{
|
||||
String redis_col_key = all_columns.at(0);
|
||||
if (column == redis_col_key)
|
||||
return RedisColumnType::KEY;
|
||||
|
||||
if (storage_type == RedisStorageType::HASH_MAP)
|
||||
{
|
||||
String redis_col_field = all_columns.at(1);
|
||||
if (column == redis_col_field)
|
||||
return RedisColumnType::FIELD;
|
||||
else
|
||||
return RedisColumnType::VALUE;
|
||||
}
|
||||
else
|
||||
{
|
||||
return RedisColumnType::VALUE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
StorageRedis::StorageRedis(
|
||||
const StorageID & table_id_,
|
||||
const RedisConfiguration & configuration_,
|
||||
@ -79,9 +52,7 @@ Pipe StorageRedis::read(
|
||||
size_t max_block_size,
|
||||
size_t num_streams)
|
||||
{
|
||||
LOG_INFO(log, "num_streams {}", num_streams);// TODO delete
|
||||
auto connection = getRedisConnection(pool, configuration);
|
||||
|
||||
storage_snapshot->check(column_names);
|
||||
|
||||
Block sample_block;
|
||||
@ -93,7 +64,6 @@ Pipe StorageRedis::read(
|
||||
auto column_data = storage_snapshot->metadata->getColumns().getPhysical(column_name);
|
||||
sample_block.insert({column_data.type, column_data.name});
|
||||
redis_types.push_back(getRedisColumnType(configuration.storage_type, all_columns, column_name));
|
||||
LOG_INFO(log, "Request column: {}, Redis type: {}", column_data.name, *redis_types.crbegin()); // TODO delete
|
||||
}
|
||||
|
||||
FieldVectorPtr fields;
|
||||
@ -104,16 +74,15 @@ Pipe StorageRedis::read(
|
||||
|
||||
std::tie(fields, all_scan) = getFilterKeys(primary_key, primary_key_data_type, query_info, context);
|
||||
|
||||
/// TODO hash_map hgetall
|
||||
if (all_scan)
|
||||
{
|
||||
RedisCommand command_for_keys("KEYS");
|
||||
/// generate keys by table name prefix
|
||||
command_for_keys << table_id.getTableName() + ":" + toString(configuration.storage_type) + ":*";
|
||||
command_for_keys << table_id.getTableName() + ":" + storageTypeToKeyType(configuration.storage_type) + ":*";
|
||||
|
||||
auto all_keys = connection->client->execute<RedisArray>(command_for_keys);
|
||||
|
||||
if (all_keys.size() == 0)
|
||||
if (all_keys.isNull() || all_keys.size() == 0)
|
||||
return {};
|
||||
|
||||
Pipes pipes;
|
||||
|
@ -11,8 +11,8 @@ namespace DB
|
||||
* Read only.
|
||||
*
|
||||
* Note If storage_type is
|
||||
* simple: there should be 2 columns and the first one is key in Redis, the second one is value.
|
||||
* hash_map: there should be 3 columns and the first one is key in Redis and the second is the field of Redis Map.
|
||||
* SIMPLE: there should be 2 columns and the first one is key in Redis, the second one is value.
|
||||
* HASH_MAP: there should be 3 columns and the first one is key in Redis and the second is the field of Redis Map.
|
||||
*/
|
||||
class StorageRedis : public IStorage
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user