Support loadIds

This commit is contained in:
comunodi 2019-01-27 16:14:02 +03:00
parent 8472b26f07
commit 741f630141
4 changed files with 87 additions and 113 deletions

View File

@ -35,10 +35,10 @@ namespace DB
RedisBlockInputStream::RedisBlockInputStream(
std::shared_ptr<Poco::Redis::Client> client_,
const Poco::Redis::Array & reply_array_,
const DB::Block & sample_block,
const size_t max_block_size)
: client(client_), max_block_size{max_block_size}
: reply_array(reply_array_), max_block_size{max_block_size}
{
description.init(sample_block);
}
@ -190,55 +190,42 @@ namespace DB
for (const auto i : ext::range(0, size))
columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty();
const auto insertValueByIdx = [this, &columns](size_t idx, const auto & value)
{
const auto & name = description.sample_block.getByPosition(idx).name;
if (description.types[idx].second)
{
ColumnNullable & column_nullable = static_cast<ColumnNullable &>(*columns[idx]);
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value, name);
column_nullable.getNullMapData().emplace_back(0);
}
else
insertValue(*columns[idx], description.types[idx].first, value, name);
};
size_t num_rows = 0;
const auto & keys = reply_array.get<Poco::Redis::Array>(0);
const auto & values = reply_array.get<Poco::Redis::Array>(1);
while (num_rows < max_block_size)
{
RedisArray commandForKeys;
commandForKeys << "SCAN" << cursor;
auto replyForKeys = client->execute<RedisArray>(commandForKeys);
if (cursor = replyForKeys.get<int64_t>(0); cursor == 0)
{
if (cursor == keys.size()) {
all_read = true;
break;
}
auto response = replyForKeys.get<RedisArray>(1);
if (response.isNull())
continue;
++num_rows;
++cursor;
Poco::Redis::Array commandForValues;
commandForValues << "MGET";
const auto & key = *(keys.begin() + cursor);
insertValueByIdx(0, key);
const auto insertValueByIdx = [this, &columns](size_t idx, const auto & value)
{
const auto & name = description.sample_block.getByPosition(idx).name;
if (description.types[idx].second)
{
ColumnNullable & column_nullable = static_cast<ColumnNullable &>(*columns[idx]);
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value, name);
column_nullable.getNullMapData().emplace_back(0);
}
else
insertValue(*columns[idx], description.types[idx].first, value, name);
};
for (const auto & key : response)
{
++num_rows;
String keyS = static_cast<const Poco::Redis::Type<String> *>(key.get())->value();
commandForValues << keyS;
insertValueByIdx(0, key);
}
auto replyForValues = client->execute<RedisArray>(commandForValues);
for (const auto & value : replyForValues)
{
if (value.isNull())
insertDefaultValue(*columns[1], *description.sample_block.getByPosition(1).column);
else
insertValueByIdx(1, value);
}
const auto & value = *(values.begin() + cursor);
if (value.isNull())
insertDefaultValue(*columns[1], *description.sample_block.getByPosition(1).column);
else
insertValueByIdx(1, value);
}
if (num_rows == 0)

View File

@ -9,6 +9,7 @@ namespace Poco
{
namespace Redis
{
class Array;
class Client;
}
}
@ -21,7 +22,7 @@ namespace DB
{
public:
RedisBlockInputStream(
std::shared_ptr<Poco::Redis::Client> client_,
const Poco::Redis::Array & reply_array_,
const Block & sample_block,
const size_t max_block_size);
@ -34,10 +35,10 @@ namespace DB
private:
Block readImpl() override;
std::shared_ptr<Poco::Redis::Client> client;
Poco::Redis::Array reply_array;
const size_t max_block_size;
ExternalResultDescription description;
int64_t cursor = 0;
size_t cursor = 0;
bool all_read = false;
};

View File

@ -53,6 +53,17 @@ namespace DB
# include "RedisBlockInputStream.h"
namespace
{
template <class K, class V>
Poco::Redis::Array makeResult(const K & keys, const V & values) {
Poco::Redis::Array result;
result << keys << values;
return result;
}
}
namespace DB
{
namespace ErrorCodes
@ -106,83 +117,57 @@ namespace DB
BlockInputStreamPtr RedisDictionarySource::loadAll()
{
return std::make_shared<RedisBlockInputStream>(client, sample_block, max_block_size);
Int64 cursor = 0;
Poco::Redis::Array keys;
do
{
Poco::Redis::Array commandForKeys;
commandForKeys << "SCAN" << cursor << "COUNT 1000";
Poco::Redis::Array replyForKeys = client->execute<Poco::Redis::Array>(commandForKeys);
cursor = replyForKeys.get<Int64>(0);
Poco::Redis::Array response = replyForKeys.get<Poco::Redis::Array>(1);
if (response.isNull())
continue;
for (const Poco::Redis::RedisType::Ptr & key : response)
keys.addRedisType(key);
}
while (cursor != 0);
Poco::Redis::Array commandForValues;
commandForValues << "MGET";
for (const Poco::Redis::RedisType::Ptr & key : keys)
commandForValues.addRedisType(key);
Poco::Redis::Array values = client->execute<Poco::Redis::Array>(commandForValues);
return std::make_shared<RedisBlockInputStream>(makeResult(keys, values), sample_block, max_block_size);
}
/*
BlockInputStreamPtr RedisDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
if (!dict_struct.id)
throw Exception{"'id' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD};
Poco::Redis::Array ids_array(new Poco::Redis::Array);
Poco::Redis::Array keys;
Poco::Redis::Array command;
command << "MGET";
for (const UInt64 id : ids)
ids_array->add(DB::toString(id), Int32(id));
cursor->query().selector().addNewDocument(dict_struct.id->name).add("$in", ids_array);
return std::make_shared<RedisBlockInputStream>(connection, sample_block, max_block_size);
}
BlockInputStreamPtr RedisDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
if (!dict_struct.key)
throw Exception{"'key' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD};
Poco::Redis::Array::Ptr keys_array(new Poco::Redis::Array);
for (const auto row_idx : requested_rows)
{
auto & key = keys_array->addNewDocument(DB::toString(row_idx));
for (const auto attr : ext::enumerate(*dict_struct.key))
{
switch (attr.second.underlying_type)
{
case AttributeUnderlyingType::UInt8:
case AttributeUnderlyingType::UInt16:
case AttributeUnderlyingType::UInt32:
case AttributeUnderlyingType::UInt64:
case AttributeUnderlyingType::UInt128:
case AttributeUnderlyingType::Int8:
case AttributeUnderlyingType::Int16:
case AttributeUnderlyingType::Int32:
case AttributeUnderlyingType::Int64:
case AttributeUnderlyingType::Decimal32:
case AttributeUnderlyingType::Decimal64:
case AttributeUnderlyingType::Decimal128:
key.add(attr.second.name, Int32(key_columns[attr.first]->get64(row_idx)));
break;
case AttributeUnderlyingType::Float32:
case AttributeUnderlyingType::Float64:
key.add(attr.second.name, applyVisitor(FieldVisitorConvertToNumber<Float64>(), (*key_columns[attr.first])[row_idx]));
break;
case AttributeUnderlyingType::String:
String _str(get<String>((*key_columns[attr.first])[row_idx]));
/// Convert string to ObjectID
if (attr.second.is_object_id)
{
Poco::Redis::ObjectId::Ptr _id(new Poco::Redis::ObjectId(_str));
key.add(attr.second.name, _id);
}
else
{
key.add(attr.second.name, _str);
}
break;
}
}
keys << static_cast<Int64>(id);
command << static_cast<Int64>(id);
}
/// If more than one key we should use $or
cursor->query().selector().add("$or", keys_array);
Poco::Redis::Array values = client->execute<Poco::Redis::Array>(command);
return std::make_shared<RedisBlockInputStream>(connection, sample_block, max_block_size);
return std::make_shared<RedisBlockInputStream>(makeResult(keys, values), sample_block, max_block_size);
}
*/
std::string RedisDictionarySource::toString() const
{

View File

@ -51,14 +51,15 @@ namespace DB
bool supportsSelectiveLoad() const override { return true; }
BlockInputStreamPtr loadIds(const std::vector<UInt64> & /* ids */) override {throw 1;};
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & /* key_columns */, const std::vector<size_t> & /* requested_rows */) override {throw 1;};
BlockInputStreamPtr loadKeys(const Columns & /* key_columns */, const std::vector<size_t> & /* requested_rows */) override
{
throw Exception{"Method loadKeys is unsupported for RedisDictionarySource", ErrorCodes::NOT_IMPLEMENTED};
};
/// @todo: for Redis, modification date can somehow be determined from the `_id` object field
bool isModified() const override { return true; }
///Not yet supported
bool hasUpdateField() const override { return false; }
DictionarySourcePtr clone() const override { return std::make_unique<RedisDictionarySource>(*this); }