diff --git a/src/Storages/RocksDB/EmbeddedRocksDBSink.cpp b/src/Storages/RocksDB/EmbeddedRocksDBSink.cpp index 47d036c943d..c451cfd1bf5 100644 --- a/src/Storages/RocksDB/EmbeddedRocksDBSink.cpp +++ b/src/Storages/RocksDB/EmbeddedRocksDBSink.cpp @@ -26,12 +26,13 @@ EmbeddedRocksDBSink::EmbeddedRocksDBSink( break; ++primary_key_pos; } + serializations = getHeader().getSerializations(); } void EmbeddedRocksDBSink::consume(Chunk chunk) { auto rows = chunk.getNumRows(); - auto block = getHeader().cloneWithColumns(chunk.detachColumns()); + const auto & columns = chunk.getColumns(); WriteBufferFromOwnString wb_key; WriteBufferFromOwnString wb_value; @@ -43,12 +44,9 @@ void EmbeddedRocksDBSink::consume(Chunk chunk) wb_key.restart(); wb_value.restart(); - size_t idx = 0; - for (const auto & elem : block) - { - elem.type->getDefaultSerialization()->serializeBinary(*elem.column, i, idx == primary_key_pos ? wb_key : wb_value, {}); - ++idx; - } + for (size_t idx = 0; idx < columns.size(); ++idx) + serializations[idx]->serializeBinary(*columns[idx], i, idx == primary_key_pos ? wb_key : wb_value, {}); + status = batch.Put(wb_key.str(), wb_value.str()); if (!status.ok()) throw Exception(ErrorCodes::ROCKSDB_ERROR, "RocksDB write error: {}", status.ToString()); diff --git a/src/Storages/RocksDB/EmbeddedRocksDBSink.h b/src/Storages/RocksDB/EmbeddedRocksDBSink.h index e9e98c7df50..011322df829 100644 --- a/src/Storages/RocksDB/EmbeddedRocksDBSink.h +++ b/src/Storages/RocksDB/EmbeddedRocksDBSink.h @@ -24,6 +24,7 @@ private: StorageEmbeddedRocksDB & storage; StorageMetadataPtr metadata_snapshot; size_t primary_key_pos = 0; + Serializations serializations; }; }