diff --git a/dbms/src/Dictionaries/RedisBlockInputStream.cpp b/dbms/src/Dictionaries/RedisBlockInputStream.cpp index a7d0b27bd09..32d9abc71a8 100644 --- a/dbms/src/Dictionaries/RedisBlockInputStream.cpp +++ b/dbms/src/Dictionaries/RedisBlockInputStream.cpp @@ -35,10 +35,11 @@ namespace DB RedisBlockInputStream::RedisBlockInputStream( - const Poco::Redis::Array & reply_array_, + const std::shared_ptr & client_, + const Poco::Redis::Array & keys_, const DB::Block & sample_block, const size_t max_block_size) - : reply_array(reply_array_), max_block_size{max_block_size} + : client(client_), keys(keys_), max_block_size{max_block_size} { description.init(sample_block); } @@ -102,6 +103,7 @@ namespace DB ErrorCodes::TYPE_MISMATCH}; } }; + switch (type) { case ValueType::UInt8: @@ -204,9 +206,7 @@ namespace DB }; size_t num_rows = 0; - - const auto & keys = reply_array.get(0); - const auto & values = reply_array.get(1); + Poco::Redis::Command commandForValues("MGET"); while (num_rows < max_block_size) { @@ -220,17 +220,21 @@ namespace DB const auto & key = *(keys.begin() + cursor); insertValueByIdx(0, key); + commandForValues.addRedisType(key); + } - const auto & value = *(values.begin() + cursor); + if (num_rows == 0) + return {}; + + Poco::Redis::Array values = client->execute(commandForValues); + for (size_t i = 0; i < num_rows; ++i) { + const Poco::Redis::RedisType::Ptr & value = *(values.begin() + i); if (value.isNull()) insertDefaultValue(*columns[1], *description.sample_block.getByPosition(1).column); else insertValueByIdx(1, value); } - if (num_rows == 0) - return {}; - return description.sample_block.cloneWithColumns(std::move(columns)); } diff --git a/dbms/src/Dictionaries/RedisBlockInputStream.h b/dbms/src/Dictionaries/RedisBlockInputStream.h index 1884ce7a0f6..d1c3ad157e9 100644 --- a/dbms/src/Dictionaries/RedisBlockInputStream.h +++ b/dbms/src/Dictionaries/RedisBlockInputStream.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include "ExternalResultDescription.h" @@ -18,11 +18,12 @@ namespace Poco namespace DB { /// Converts Redis Cursor to a stream of Blocks - class RedisBlockInputStream final : public IProfilingBlockInputStream + class RedisBlockInputStream final : public IBlockInputStream { public: RedisBlockInputStream( - const Poco::Redis::Array & reply_array_, + const std::shared_ptr & client_, + const Poco::Redis::Array & keys_, const Block & sample_block, const size_t max_block_size); @@ -35,7 +36,8 @@ namespace DB private: Block readImpl() override; - Poco::Redis::Array reply_array; + std::shared_ptr client; + Poco::Redis::Array keys; const size_t max_block_size; ExternalResultDescription description; size_t cursor = 0; diff --git a/dbms/src/Dictionaries/RedisDictionarySource.cpp b/dbms/src/Dictionaries/RedisDictionarySource.cpp index b3ec940a8d1..b4c1ac97330 100644 --- a/dbms/src/Dictionaries/RedisDictionarySource.cpp +++ b/dbms/src/Dictionaries/RedisDictionarySource.cpp @@ -53,17 +53,6 @@ namespace DB # include "RedisBlockInputStream.h" -namespace -{ - template - Poco::Redis::Array makeResult(const K & keys, const V & values) { - Poco::Redis::Array result; - result << keys << values; - return result; - } -} - - namespace DB { namespace ErrorCodes @@ -122,14 +111,7 @@ namespace DB Poco::Redis::Array keys = client->execute(commandForKeys); - Poco::Redis::Array commandForValues; - commandForValues << "MGET"; - for (const Poco::Redis::RedisType::Ptr & key : keys) - commandForValues.addRedisType(key); - - Poco::Redis::Array values = client->execute(commandForValues); - - return std::make_shared(makeResult(keys, values), sample_block, max_block_size); + return std::make_shared(client, std::move(keys), sample_block, max_block_size); } @@ -139,18 +121,11 @@ namespace DB throw Exception{"'id' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD}; Poco::Redis::Array keys; - Poco::Redis::Array command; - command << "MGET"; for (const UInt64 id : ids) - { keys << static_cast(id); - command << static_cast(id); - } - Poco::Redis::Array values = client->execute(command); - - return std::make_shared(makeResult(keys, values), sample_block, max_block_size); + return std::make_shared(client, std::move(keys), sample_block, max_block_size); }