From 741f630141f714ad39b39d769dacfd1a1daa884b Mon Sep 17 00:00:00 2001 From: comunodi Date: Sun, 27 Jan 2019 16:14:02 +0300 Subject: [PATCH] Support loadIds --- .../Dictionaries/RedisBlockInputStream.cpp | 71 +++++------ dbms/src/Dictionaries/RedisBlockInputStream.h | 7 +- .../Dictionaries/RedisDictionarySource.cpp | 113 ++++++++---------- dbms/src/Dictionaries/RedisDictionarySource.h | 9 +- 4 files changed, 87 insertions(+), 113 deletions(-) diff --git a/dbms/src/Dictionaries/RedisBlockInputStream.cpp b/dbms/src/Dictionaries/RedisBlockInputStream.cpp index dfbb03a0034..a7d0b27bd09 100644 --- a/dbms/src/Dictionaries/RedisBlockInputStream.cpp +++ b/dbms/src/Dictionaries/RedisBlockInputStream.cpp @@ -35,10 +35,10 @@ namespace DB RedisBlockInputStream::RedisBlockInputStream( - std::shared_ptr client_, + const Poco::Redis::Array & reply_array_, const DB::Block & sample_block, const size_t max_block_size) - : client(client_), max_block_size{max_block_size} + : reply_array(reply_array_), max_block_size{max_block_size} { description.init(sample_block); } @@ -190,55 +190,42 @@ namespace DB for (const auto i : ext::range(0, size)) columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty(); + const auto insertValueByIdx = [this, &columns](size_t idx, const auto & value) + { + const auto & name = description.sample_block.getByPosition(idx).name; + if (description.types[idx].second) + { + ColumnNullable & column_nullable = static_cast(*columns[idx]); + insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value, name); + column_nullable.getNullMapData().emplace_back(0); + } + else + insertValue(*columns[idx], description.types[idx].first, value, name); + }; + size_t num_rows = 0; + + const auto & keys = reply_array.get(0); + const auto & values = reply_array.get(1); + while (num_rows < max_block_size) { - RedisArray commandForKeys; - commandForKeys << "SCAN" << cursor; - - auto replyForKeys = client->execute(commandForKeys); - if (cursor = replyForKeys.get(0); cursor == 0) - { + if (cursor == keys.size()) { all_read = true; break; } - auto response = replyForKeys.get(1); - if (response.isNull()) - continue; + ++num_rows; + ++cursor; - Poco::Redis::Array commandForValues; - commandForValues << "MGET"; + const auto & key = *(keys.begin() + cursor); + insertValueByIdx(0, key); - const auto insertValueByIdx = [this, &columns](size_t idx, const auto & value) - { - const auto & name = description.sample_block.getByPosition(idx).name; - if (description.types[idx].second) - { - ColumnNullable & column_nullable = static_cast(*columns[idx]); - insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value, name); - column_nullable.getNullMapData().emplace_back(0); - } - else - insertValue(*columns[idx], description.types[idx].first, value, name); - }; - - for (const auto & key : response) - { - ++num_rows; - String keyS = static_cast *>(key.get())->value(); - commandForValues << keyS; - insertValueByIdx(0, key); - } - - auto replyForValues = client->execute(commandForValues); - for (const auto & value : replyForValues) - { - if (value.isNull()) - insertDefaultValue(*columns[1], *description.sample_block.getByPosition(1).column); - else - insertValueByIdx(1, value); - } + const auto & value = *(values.begin() + cursor); + if (value.isNull()) + insertDefaultValue(*columns[1], *description.sample_block.getByPosition(1).column); + else + insertValueByIdx(1, value); } if (num_rows == 0) diff --git a/dbms/src/Dictionaries/RedisBlockInputStream.h b/dbms/src/Dictionaries/RedisBlockInputStream.h index 7e32b3ff8ff..1884ce7a0f6 100644 --- a/dbms/src/Dictionaries/RedisBlockInputStream.h +++ b/dbms/src/Dictionaries/RedisBlockInputStream.h @@ -9,6 +9,7 @@ namespace Poco { namespace Redis { + class Array; class Client; } } @@ -21,7 +22,7 @@ namespace DB { public: RedisBlockInputStream( - std::shared_ptr client_, + const Poco::Redis::Array & reply_array_, const Block & sample_block, const size_t max_block_size); @@ -34,10 +35,10 @@ namespace DB private: Block readImpl() override; - std::shared_ptr client; + Poco::Redis::Array reply_array; const size_t max_block_size; ExternalResultDescription description; - int64_t cursor = 0; + size_t cursor = 0; bool all_read = false; }; diff --git a/dbms/src/Dictionaries/RedisDictionarySource.cpp b/dbms/src/Dictionaries/RedisDictionarySource.cpp index 1fb5472b48b..90229c087dd 100644 --- a/dbms/src/Dictionaries/RedisDictionarySource.cpp +++ b/dbms/src/Dictionaries/RedisDictionarySource.cpp @@ -53,6 +53,17 @@ namespace DB # include "RedisBlockInputStream.h" +namespace +{ + template + Poco::Redis::Array makeResult(const K & keys, const V & values) { + Poco::Redis::Array result; + result << keys << values; + return result; + } +} + + namespace DB { namespace ErrorCodes @@ -106,83 +117,57 @@ namespace DB BlockInputStreamPtr RedisDictionarySource::loadAll() { - return std::make_shared(client, sample_block, max_block_size); + Int64 cursor = 0; + Poco::Redis::Array keys; + + do + { + Poco::Redis::Array commandForKeys; + commandForKeys << "SCAN" << cursor << "COUNT 1000"; + + Poco::Redis::Array replyForKeys = client->execute(commandForKeys); + cursor = replyForKeys.get(0); + + Poco::Redis::Array response = replyForKeys.get(1); + if (response.isNull()) + continue; + + for (const Poco::Redis::RedisType::Ptr & key : response) + keys.addRedisType(key); + } + while (cursor != 0); + + Poco::Redis::Array commandForValues; + commandForValues << "MGET"; + for (const Poco::Redis::RedisType::Ptr & key : keys) + commandForValues.addRedisType(key); + + Poco::Redis::Array values = client->execute(commandForValues); + + return std::make_shared(makeResult(keys, values), sample_block, max_block_size); } -/* + BlockInputStreamPtr RedisDictionarySource::loadIds(const std::vector & ids) { if (!dict_struct.id) throw Exception{"'id' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD}; - Poco::Redis::Array ids_array(new Poco::Redis::Array); + Poco::Redis::Array keys; + Poco::Redis::Array command; + command << "MGET"; + for (const UInt64 id : ids) - ids_array->add(DB::toString(id), Int32(id)); - - cursor->query().selector().addNewDocument(dict_struct.id->name).add("$in", ids_array); - - return std::make_shared(connection, sample_block, max_block_size); - } - - - BlockInputStreamPtr RedisDictionarySource::loadKeys(const Columns & key_columns, const std::vector & requested_rows) - { - if (!dict_struct.key) - throw Exception{"'key' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD}; - - Poco::Redis::Array::Ptr keys_array(new Poco::Redis::Array); - - for (const auto row_idx : requested_rows) { - auto & key = keys_array->addNewDocument(DB::toString(row_idx)); - - for (const auto attr : ext::enumerate(*dict_struct.key)) - { - switch (attr.second.underlying_type) - { - case AttributeUnderlyingType::UInt8: - case AttributeUnderlyingType::UInt16: - case AttributeUnderlyingType::UInt32: - case AttributeUnderlyingType::UInt64: - case AttributeUnderlyingType::UInt128: - case AttributeUnderlyingType::Int8: - case AttributeUnderlyingType::Int16: - case AttributeUnderlyingType::Int32: - case AttributeUnderlyingType::Int64: - case AttributeUnderlyingType::Decimal32: - case AttributeUnderlyingType::Decimal64: - case AttributeUnderlyingType::Decimal128: - key.add(attr.second.name, Int32(key_columns[attr.first]->get64(row_idx))); - break; - - case AttributeUnderlyingType::Float32: - case AttributeUnderlyingType::Float64: - key.add(attr.second.name, applyVisitor(FieldVisitorConvertToNumber(), (*key_columns[attr.first])[row_idx])); - break; - - case AttributeUnderlyingType::String: - String _str(get((*key_columns[attr.first])[row_idx])); - /// Convert string to ObjectID - if (attr.second.is_object_id) - { - Poco::Redis::ObjectId::Ptr _id(new Poco::Redis::ObjectId(_str)); - key.add(attr.second.name, _id); - } - else - { - key.add(attr.second.name, _str); - } - break; - } - } + keys << static_cast(id); + command << static_cast(id); } - /// If more than one key we should use $or - cursor->query().selector().add("$or", keys_array); + Poco::Redis::Array values = client->execute(command); - return std::make_shared(connection, sample_block, max_block_size); + return std::make_shared(makeResult(keys, values), sample_block, max_block_size); } -*/ + std::string RedisDictionarySource::toString() const { diff --git a/dbms/src/Dictionaries/RedisDictionarySource.h b/dbms/src/Dictionaries/RedisDictionarySource.h index 61417fac393..e3566731f06 100644 --- a/dbms/src/Dictionaries/RedisDictionarySource.h +++ b/dbms/src/Dictionaries/RedisDictionarySource.h @@ -51,14 +51,15 @@ namespace DB bool supportsSelectiveLoad() const override { return true; } - BlockInputStreamPtr loadIds(const std::vector & /* ids */) override {throw 1;}; + BlockInputStreamPtr loadIds(const std::vector & ids) override; - BlockInputStreamPtr loadKeys(const Columns & /* key_columns */, const std::vector & /* requested_rows */) override {throw 1;}; + BlockInputStreamPtr loadKeys(const Columns & /* key_columns */, const std::vector & /* requested_rows */) override + { + throw Exception{"Method loadKeys is unsupported for RedisDictionarySource", ErrorCodes::NOT_IMPLEMENTED}; + }; - /// @todo: for Redis, modification date can somehow be determined from the `_id` object field bool isModified() const override { return true; } - ///Not yet supported bool hasUpdateField() const override { return false; } DictionarySourcePtr clone() const override { return std::make_unique(*this); }