Improve performance two times

This commit is contained in:
Alexey Milovidov 2020-11-08 20:09:41 +03:00 committed by sundy-li
parent 0001433b82
commit 73e2f1b7ed
3 changed files with 30 additions and 18 deletions

View File

@ -12,7 +12,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ROCKSDB_ERROR;
}
EmbeddedRocksDBBlockInputStream::EmbeddedRocksDBBlockInputStream(
@ -39,15 +39,18 @@ Block EmbeddedRocksDBBlockInputStream::readImpl()
}
MutableColumns columns = sample_block.cloneEmptyColumns();
size_t rows = 0;
for (; iterator->Valid(); iterator->Next())
{
ReadBufferFromString key_buffer(iterator->key());
ReadBufferFromString value_buffer(iterator->value());
for (const auto [idx, column_type] : ext::enumerate(sample_block.getColumnsWithTypeAndName()))
size_t idx = 0;
for (const auto & elem : sample_block)
{
column_type.type->deserializeBinary(*columns[idx], idx == primary_key_pos? key_buffer: value_buffer);
elem.type->deserializeBinary(*columns[idx], idx == primary_key_pos ? key_buffer : value_buffer);
++idx;
}
++rows;
if (rows >= max_block_size)
@ -58,7 +61,7 @@ Block EmbeddedRocksDBBlockInputStream::readImpl()
if (!iterator->status().ok())
{
throw Exception("Engine " + getName() + " got error while seeking key value datas: " + iterator->status().ToString(),
ErrorCodes::LOGICAL_ERROR);
ErrorCodes::ROCKSDB_ERROR);
}
return sample_block.cloneWithColumns(std::move(columns));
}

View File

@ -11,6 +11,21 @@ namespace ErrorCodes
extern const int ROCKSDB_ERROR;
}
EmbeddedRocksDBBlockOutputStream::EmbeddedRocksDBBlockOutputStream(
StorageEmbeddedRocksDB & storage_,
const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
{
Block sample_block = metadata_snapshot->getSampleBlock();
for (const auto & elem : sample_block)
{
if (elem.name == storage.primary_key)
break;
++primary_key_pos;
}
}
Block EmbeddedRocksDBBlockOutputStream::getHeader() const
{
return metadata_snapshot->getSampleBlock();
@ -25,24 +40,20 @@ void EmbeddedRocksDBBlockOutputStream::write(const Block & block)
WriteBufferFromOwnString wb_value;
rocksdb::WriteBatch batch;
auto columns = metadata_snapshot->getColumns();
for (size_t i = 0; i < rows; i++)
{
wb_key.restart();
wb_value.restart();
for (const auto & col : columns)
size_t idx = 0;
for (const auto & elem : block)
{
const auto & type = block.getByName(col.name).type;
const auto & column = block.getByName(col.name).column;
if (col.name == storage.primary_key)
type->serializeBinary(*column, i, wb_key);
else
type->serializeBinary(*column, i, wb_value);
elem.type->serializeBinary(*elem.column, i, idx == primary_key_pos ? wb_key : wb_value);
++idx;
}
batch.Put(wb_key.str(), wb_value.str());
}
auto status = storage.rocksdb_ptr->Write(rocksdb::WriteOptions(), &batch);
if (!status.ok())
throw Exception("RocksDB write error: " + status.ToString(), ErrorCodes::ROCKSDB_ERROR);

View File

@ -12,12 +12,9 @@ class StorageEmbeddedRocksDB;
class EmbeddedRocksDBBlockOutputStream : public IBlockOutputStream
{
public:
explicit EmbeddedRocksDBBlockOutputStream(
EmbeddedRocksDBBlockOutputStream(
StorageEmbeddedRocksDB & storage_,
const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
{}
const StorageMetadataPtr & metadata_snapshot_);
Block getHeader() const override;
void write(const Block & block) override;
@ -25,6 +22,7 @@ public:
private:
StorageEmbeddedRocksDB & storage;
StorageMetadataPtr metadata_snapshot;
size_t primary_key_pos = 0;
};
}