Optimize memory consumption

This commit is contained in:
comunodi 2019-01-28 01:22:18 +03:00
parent cc5678f6cc
commit 933906403a
3 changed files with 21 additions and 40 deletions

View File

@ -35,10 +35,11 @@ namespace DB
RedisBlockInputStream::RedisBlockInputStream( RedisBlockInputStream::RedisBlockInputStream(
const Poco::Redis::Array & reply_array_, const std::shared_ptr<Poco::Redis::Client> & client_,
const Poco::Redis::Array & keys_,
const DB::Block & sample_block, const DB::Block & sample_block,
const size_t max_block_size) const size_t max_block_size)
: reply_array(reply_array_), max_block_size{max_block_size} : client(client_), keys(keys_), max_block_size{max_block_size}
{ {
description.init(sample_block); description.init(sample_block);
} }
@ -102,6 +103,7 @@ namespace DB
ErrorCodes::TYPE_MISMATCH}; ErrorCodes::TYPE_MISMATCH};
} }
}; };
switch (type) switch (type)
{ {
case ValueType::UInt8: case ValueType::UInt8:
@ -204,9 +206,7 @@ namespace DB
}; };
size_t num_rows = 0; size_t num_rows = 0;
Poco::Redis::Command commandForValues("MGET");
const auto & keys = reply_array.get<Poco::Redis::Array>(0);
const auto & values = reply_array.get<Poco::Redis::Array>(1);
while (num_rows < max_block_size) while (num_rows < max_block_size)
{ {
@ -220,17 +220,21 @@ namespace DB
const auto & key = *(keys.begin() + cursor); const auto & key = *(keys.begin() + cursor);
insertValueByIdx(0, key); insertValueByIdx(0, key);
commandForValues.addRedisType(key);
}
const auto & value = *(values.begin() + cursor); if (num_rows == 0)
return {};
Poco::Redis::Array values = client->execute<Poco::Redis::Array>(commandForValues);
for (size_t i = 0; i < num_rows; ++i) {
const Poco::Redis::RedisType::Ptr & value = *(values.begin() + i);
if (value.isNull()) if (value.isNull())
insertDefaultValue(*columns[1], *description.sample_block.getByPosition(1).column); insertDefaultValue(*columns[1], *description.sample_block.getByPosition(1).column);
else else
insertValueByIdx(1, value); insertValueByIdx(1, value);
} }
if (num_rows == 0)
return {};
return description.sample_block.cloneWithColumns(std::move(columns)); return description.sample_block.cloneWithColumns(std::move(columns));
} }

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include <Core/Block.h> #include <Core/Block.h>
#include <DataStreams/IProfilingBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include "ExternalResultDescription.h" #include "ExternalResultDescription.h"
@ -18,11 +18,12 @@ namespace Poco
namespace DB namespace DB
{ {
/// Converts Redis Cursor to a stream of Blocks /// Converts Redis Cursor to a stream of Blocks
class RedisBlockInputStream final : public IProfilingBlockInputStream class RedisBlockInputStream final : public IBlockInputStream
{ {
public: public:
RedisBlockInputStream( RedisBlockInputStream(
const Poco::Redis::Array & reply_array_, const std::shared_ptr<Poco::Redis::Client> & client_,
const Poco::Redis::Array & keys_,
const Block & sample_block, const Block & sample_block,
const size_t max_block_size); const size_t max_block_size);
@ -35,7 +36,8 @@ namespace DB
private: private:
Block readImpl() override; Block readImpl() override;
Poco::Redis::Array reply_array; std::shared_ptr<Poco::Redis::Client> client;
Poco::Redis::Array keys;
const size_t max_block_size; const size_t max_block_size;
ExternalResultDescription description; ExternalResultDescription description;
size_t cursor = 0; size_t cursor = 0;

View File

@ -53,17 +53,6 @@ namespace DB
# include "RedisBlockInputStream.h" # include "RedisBlockInputStream.h"
namespace
{
template <class K, class V>
Poco::Redis::Array makeResult(const K & keys, const V & values) {
Poco::Redis::Array result;
result << keys << values;
return result;
}
}
namespace DB namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
@ -122,14 +111,7 @@ namespace DB
Poco::Redis::Array keys = client->execute<Poco::Redis::Array>(commandForKeys); Poco::Redis::Array keys = client->execute<Poco::Redis::Array>(commandForKeys);
Poco::Redis::Array commandForValues; return std::make_shared<RedisBlockInputStream>(client, std::move(keys), sample_block, max_block_size);
commandForValues << "MGET";
for (const Poco::Redis::RedisType::Ptr & key : keys)
commandForValues.addRedisType(key);
Poco::Redis::Array values = client->execute<Poco::Redis::Array>(commandForValues);
return std::make_shared<RedisBlockInputStream>(makeResult(keys, values), sample_block, max_block_size);
} }
@ -139,18 +121,11 @@ namespace DB
throw Exception{"'id' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD}; throw Exception{"'id' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD};
Poco::Redis::Array keys; Poco::Redis::Array keys;
Poco::Redis::Array command;
command << "MGET";
for (const UInt64 id : ids) for (const UInt64 id : ids)
{
keys << static_cast<Int64>(id); keys << static_cast<Int64>(id);
command << static_cast<Int64>(id);
}
Poco::Redis::Array values = client->execute<Poco::Redis::Array>(command); return std::make_shared<RedisBlockInputStream>(client, std::move(keys), sample_block, max_block_size);
return std::make_shared<RedisBlockInputStream>(makeResult(keys, values), sample_block, max_block_size);
} }