Merge pull request #55732 from canhld94/rocks_db_serialization

Do not re-create serialization for each row during rocksdb sinking
This commit is contained in:
Han Fei 2023-10-19 15:54:50 +02:00 committed by GitHub
commit 448e178528
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 6 additions and 7 deletions

View File

@ -26,12 +26,13 @@ EmbeddedRocksDBSink::EmbeddedRocksDBSink(
break;
++primary_key_pos;
}
serializations = getHeader().getSerializations();
}
void EmbeddedRocksDBSink::consume(Chunk chunk)
{
auto rows = chunk.getNumRows();
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
const auto & columns = chunk.getColumns();
WriteBufferFromOwnString wb_key;
WriteBufferFromOwnString wb_value;
@ -43,12 +44,9 @@ void EmbeddedRocksDBSink::consume(Chunk chunk)
wb_key.restart();
wb_value.restart();
size_t idx = 0;
for (const auto & elem : block)
{
elem.type->getDefaultSerialization()->serializeBinary(*elem.column, i, idx == primary_key_pos ? wb_key : wb_value, {});
++idx;
}
for (size_t idx = 0; idx < columns.size(); ++idx)
serializations[idx]->serializeBinary(*columns[idx], i, idx == primary_key_pos ? wb_key : wb_value, {});
status = batch.Put(wb_key.str(), wb_value.str());
if (!status.ok())
throw Exception(ErrorCodes::ROCKSDB_ERROR, "RocksDB write error: {}", status.ToString());

View File

@ -24,6 +24,7 @@ private:
StorageEmbeddedRocksDB & storage;
StorageMetadataPtr metadata_snapshot;
size_t primary_key_pos = 0;
Serializations serializations;
};
}