Merge pull request #29428 from vdimir/rocksdb-istream-to-source

This commit is contained in:
Vladimir C 2021-09-28 12:06:41 +03:00 committed by GitHub
commit 6c2db9c29b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 77 additions and 134 deletions

View File

@ -1,66 +0,0 @@
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Context.h>
#include <IO/ReadBufferFromString.h>
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <Storages/RocksDB/EmbeddedRocksDBBlockInputStream.h>
#include <rocksdb/db.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ROCKSDB_ERROR;
}
/// Binds the stream to a storage and a metadata snapshot and caches the header block
/// together with the position of the primary key column inside that header.
/// Members are listed in declaration order, so `sample_block` is ready before
/// `primary_key_pos` is computed from it.
EmbeddedRocksDBBlockInputStream::EmbeddedRocksDBBlockInputStream(
    StorageEmbeddedRocksDB & storage_,
    const StorageMetadataPtr & metadata_snapshot_,
    size_t max_block_size_)
    : storage(storage_)
    , metadata_snapshot(metadata_snapshot_)
    , max_block_size(max_block_size_)
    , sample_block(metadata_snapshot->getSampleBlock())
    , primary_key_pos(sample_block.getPositionByName(storage.primary_key))
{
}
/// Reads the next batch of at most `max_block_size` key-value pairs from RocksDB.
/// The primary key column is deserialized from the RocksDB key, every other column
/// from the RocksDB value. Returns an empty Block once the iterator was exhausted
/// by a previous call. Throws ROCKSDB_ERROR if the iterator reports a failure.
Block EmbeddedRocksDBBlockInputStream::readImpl()
{
    if (finished)
        return {};

    /// The iterator is created lazily on the first read.
    if (!iterator)
    {
        iterator.reset(storage.rocksdb_ptr->NewIterator(rocksdb::ReadOptions()));
        iterator->SeekToFirst();
    }

    MutableColumns columns = sample_block.cloneEmptyColumns();

    size_t rows_read = 0;
    while (iterator->Valid() && rows_read < max_block_size)
    {
        ReadBufferFromString key_buffer(iterator->key());
        ReadBufferFromString value_buffer(iterator->value());

        size_t column_pos = 0;
        for (const auto & column : sample_block)
        {
            /// Only the primary key lives in the RocksDB key; the rest is packed into the value.
            ReadBufferFromString & source = (column_pos == primary_key_pos) ? key_buffer : value_buffer;
            auto serialization = column.type->getDefaultSerialization();
            serialization->deserializeBinary(*columns[column_pos], source);
            ++column_pos;
        }

        ++rows_read;
        iterator->Next();
    }

    /// Remember exhaustion so that the next call returns an empty block right away.
    finished = !iterator->Valid();

    if (!iterator->status().ok())
    {
        throw Exception("Engine " + getName() + " got error while seeking key value data: " + iterator->status().ToString(),
            ErrorCodes::ROCKSDB_ERROR);
    }

    return sample_block.cloneWithColumns(std::move(columns));
}
}

View File

@ -1,39 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace rocksdb
{
class Iterator;
}
namespace DB
{
class StorageEmbeddedRocksDB;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
/// Block input stream over a StorageEmbeddedRocksDB table.
/// The stream implementation (readImpl) walks a rocksdb::Iterator from the first key
/// and produces blocks of at most `max_block_size` rows.
class EmbeddedRocksDBBlockInputStream : public IBlockInputStream
{
public:
/// Stores the storage reference, the metadata snapshot and the per-block row limit.
EmbeddedRocksDBBlockInputStream(
StorageEmbeddedRocksDB & storage_, const StorageMetadataPtr & metadata_snapshot_, size_t max_block_size_);
/// Fixed stream name; also used in the error message thrown by readImpl.
String getName() const override { return "EmbeddedRocksDB"; }
/// Header of the produced blocks (a copy of the cached sample block).
Block getHeader() const override { return sample_block; }
/// Produces the next block; defined in the corresponding .cpp file.
Block readImpl() override;
private:
/// Storage whose RocksDB instance and primary key name are read.
/// NOTE(review): held by reference - presumably the storage outlives the stream; confirm with callers.
StorageEmbeddedRocksDB & storage;
StorageMetadataPtr metadata_snapshot;
/// Upper bound on the number of rows in one produced block.
const size_t max_block_size;
/// Sample block taken from the metadata snapshot; declared before primary_key_pos,
/// which is derived from it.
Block sample_block;
/// Created on the first readImpl call.
std::unique_ptr<rocksdb::Iterator> iterator;
/// Position of the primary key column inside sample_block.
size_t primary_key_pos;
/// Set once the iterator stops being valid; further reads return an empty block.
bool finished = false;
};
}

View File

@ -1,6 +1,5 @@
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <Storages/RocksDB/EmbeddedRocksDBSink.h>
#include <Storages/RocksDB/EmbeddedRocksDBBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
@ -182,15 +181,15 @@ class EmbeddedRocksDBSource : public SourceWithProgress
public:
EmbeddedRocksDBSource(
const StorageEmbeddedRocksDB & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Block & header,
FieldVectorPtr keys_,
FieldVector::const_iterator begin_,
FieldVector::const_iterator end_,
const size_t max_block_size_)
: SourceWithProgress(metadata_snapshot_->getSampleBlock())
: SourceWithProgress(header)
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, keys(std::move(keys_))
, primary_key_pos(header.getPositionByName(storage.getPrimaryKey()))
, keys(keys_)
, begin(begin_)
, end(end_)
, it(begin)
@ -198,12 +197,29 @@ public:
{
}
String getName() const override
EmbeddedRocksDBSource(
const StorageEmbeddedRocksDB & storage_,
const Block & header,
std::unique_ptr<rocksdb::Iterator> iterator_,
const size_t max_block_size_)
: SourceWithProgress(header)
, storage(storage_)
, primary_key_pos(header.getPositionByName(storage.getPrimaryKey()))
, iterator(std::move(iterator_))
, max_block_size(max_block_size_)
{
return storage.getName();
}
/// Name of this source; it is simply the name reported by the underlying storage.
String getName() const override
{
    return storage.getName();
}
/// Produces the next chunk. A non-null `keys` pointer selects the point-lookup mode
/// (source built from a key list); otherwise the source iterates over the whole table.
Chunk generate() override
{
    return keys ? generateWithKeys() : generateFullScan();
}
Chunk generateWithKeys()
{
if (it >= end)
return {};
@ -213,16 +229,15 @@ public:
std::vector<std::string> serialized_keys(num_keys);
std::vector<rocksdb::Slice> slices_keys(num_keys);
const auto & sample_block = metadata_snapshot->getSampleBlock();
const auto & key_column = sample_block.getByName(storage.getPrimaryKey());
auto columns = sample_block.cloneEmptyColumns();
size_t primary_key_pos = sample_block.getPositionByName(storage.getPrimaryKey());
const auto & sample_block = getPort().getHeader();
const auto & key_column_type = sample_block.getByName(storage.getPrimaryKey()).type;
size_t rows_processed = 0;
while (it < end && rows_processed < max_block_size)
{
WriteBufferFromString wb(serialized_keys[rows_processed]);
key_column.type->getDefaultSerialization()->serializeBinary(*it, wb);
key_column_type->getDefaultSerialization()->serializeBinary(*it, wb);
wb.finalize();
slices_keys[rows_processed] = std::move(serialized_keys[rows_processed]);
@ -230,6 +245,7 @@ public:
++rows_processed;
}
MutableColumns columns = sample_block.cloneEmptyColumns();
std::vector<String> values;
auto statuses = storage.multiGet(slices_keys, values);
for (size_t i = 0; i < statuses.size(); ++i)
@ -238,13 +254,7 @@ public:
{
ReadBufferFromString key_buffer(slices_keys[i]);
ReadBufferFromString value_buffer(values[i]);
size_t idx = 0;
for (const auto & elem : sample_block)
{
elem.type->getDefaultSerialization()->deserializeBinary(*columns[idx], idx == primary_key_pos ? key_buffer : value_buffer);
++idx;
}
fillColumns(key_buffer, value_buffer, columns);
}
}
@ -252,14 +262,54 @@ public:
return Chunk(std::move(columns), num_rows);
}
/// Full-scan mode: reads up to `max_block_size` key-value pairs from the RocksDB iterator
/// and returns them as one chunk. Returns an empty chunk when no rows are left.
/// Throws ROCKSDB_ERROR if the iterator reports a failure.
Chunk generateFullScan()
{
    const auto & sample_block = getPort().getHeader();
    MutableColumns columns = sample_block.cloneEmptyColumns();

    size_t num_rows = 0;
    for (; iterator->Valid() && num_rows < max_block_size; ++num_rows, iterator->Next())
    {
        ReadBufferFromString key_buffer(iterator->key());
        ReadBufferFromString value_buffer(iterator->value());
        fillColumns(key_buffer, value_buffer, columns);
    }

    /// Valid() is false both at the end of data and after an iterator error, so the status
    /// has to be checked before an invalid iterator is treated as "no more rows". Checking it
    /// unconditionally also covers a failure that happened before the first row was read.
    if (!iterator->status().ok())
    {
        throw Exception("Engine " + getName() + " got error while seeking key value data: " + iterator->status().ToString(),
            ErrorCodes::ROCKSDB_ERROR);
    }

    if (num_rows == 0)
        return {};

    /// Build the chunk straight from the filled columns, the same way generateWithKeys does.
    return Chunk(std::move(columns), num_rows);
}
/// Appends one row to `columns`: the column at `primary_key_pos` is deserialized from
/// `key_buffer`, all other columns are deserialized, in header order, from `value_buffer`.
void fillColumns(ReadBufferFromString & key_buffer, ReadBufferFromString & value_buffer, MutableColumns & columns)
{
    size_t column_pos = 0;
    for (const auto & column : getPort().getHeader())
    {
        ReadBufferFromString & source = (column_pos == primary_key_pos) ? key_buffer : value_buffer;
        column.type->getDefaultSerialization()->deserializeBinary(*columns[column_pos], source);
        ++column_pos;
    }
}
private:
const StorageEmbeddedRocksDB & storage;
const StorageMetadataPtr metadata_snapshot;
FieldVectorPtr keys;
size_t primary_key_pos;
/// For key scan
FieldVectorPtr keys = nullptr;
FieldVector::const_iterator begin;
FieldVector::const_iterator end;
FieldVector::const_iterator it;
/// For full scan
std::unique_ptr<rocksdb::Iterator> iterator = nullptr;
const size_t max_block_size;
};
@ -379,7 +429,6 @@ void StorageEmbeddedRocksDB::initDB()
rocksdb_ptr = std::unique_ptr<rocksdb::DB>(db);
}
Pipe StorageEmbeddedRocksDB::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -394,13 +443,14 @@ Pipe StorageEmbeddedRocksDB::read(
FieldVectorPtr keys;
bool all_scan = false;
auto primary_key_data_type = metadata_snapshot->getSampleBlock().getByName(primary_key).type;
Block sample_block = metadata_snapshot->getSampleBlock();
auto primary_key_data_type = sample_block.getByName(primary_key).type;
std::tie(keys, all_scan) = getFilterKeys(primary_key, primary_key_data_type, query_info);
if (all_scan)
{
auto reader = std::make_shared<EmbeddedRocksDBBlockInputStream>(
*this, metadata_snapshot, max_block_size);
return Pipe(std::make_shared<SourceFromInputStream>(reader));
auto iterator = std::unique_ptr<rocksdb::Iterator>(rocksdb_ptr->NewIterator(rocksdb::ReadOptions()));
iterator->SeekToFirst();
return Pipe(std::make_shared<EmbeddedRocksDBSource>(*this, sample_block, std::move(iterator), max_block_size));
}
else
{
@ -424,7 +474,7 @@ Pipe StorageEmbeddedRocksDB::read(
size_t end = num_keys * (thread_idx + 1) / num_threads;
pipes.emplace_back(std::make_shared<EmbeddedRocksDBSource>(
*this, metadata_snapshot, keys, keys->begin() + begin, keys->begin() + end, max_block_size));
*this, sample_block, keys, keys->begin() + begin, keys->begin() + end, max_block_size));
}
return Pipe::unitePipes(std::move(pipes));
}
@ -436,7 +486,6 @@ SinkToStoragePtr StorageEmbeddedRocksDB::write(
return std::make_shared<EmbeddedRocksDBSink>(*this, metadata_snapshot);
}
static StoragePtr create(const StorageFactory::Arguments & args)
{
// TODO custom RocksDBSettings, table function

View File

@ -23,7 +23,6 @@ class StorageEmbeddedRocksDB final : public shared_ptr_helper<StorageEmbeddedRoc
{
friend struct shared_ptr_helper<StorageEmbeddedRocksDB>;
friend class EmbeddedRocksDBSink;
friend class EmbeddedRocksDBBlockInputStream;
public:
std::string getName() const override { return "EmbeddedRocksDB"; }