Less memcpy

This commit is contained in:
Alexey Milovidov 2024-10-20 03:33:42 +02:00
parent 66024821cf
commit d6e0da1777

View File

@ -102,23 +102,55 @@ void ParallelCompressedWriteBuffer::compress(Iterator buffer)
UInt32 uncompressed_size = static_cast<UInt32>(buffer->uncompressed_size);
UInt32 compressed_reserve_size = codec->getCompressedReserveSize(uncompressed_size);
buffer->compressed.resize(compressed_reserve_size);
UInt32 compressed_size = codec->compress(buffer->uncompressed.data(), uncompressed_size, buffer->compressed.data());
/// If all previous buffers have been written,
/// and if the output buffer has the required capacity,
/// we can compress data directly into the output buffer.
size_t required_out_capacity = compressed_reserve_size + sizeof(CityHash_v1_0_2::uint128);
bool can_write_directly = false;
CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(buffer->compressed.data(), compressed_size);
/// Wait while all previous buffers have been written.
if (buffer->previous)
if (!buffer->previous)
{
can_write_directly = out.available() >= required_out_capacity;
}
else
{
CurrentMetrics::Increment metric_wait_increment(CurrentMetrics::ParallelCompressedWriteBufferWait);
std::unique_lock lock(mutex);
cond.wait(lock, [&]{ return !buffer->previous->busy || buffer->previous->sequence_num > buffer->sequence_num; });
can_write_directly = (!buffer->previous->busy || buffer->previous->sequence_num > buffer->sequence_num)
&& out.available() >= required_out_capacity;
}
writeBinaryLittleEndian(checksum.low64, out);
writeBinaryLittleEndian(checksum.high64, out);
if (can_write_directly)
{
char * out_compressed_ptr = out.position() + sizeof(CityHash_v1_0_2::uint128);
UInt32 compressed_size = codec->compress(working_buffer.begin(), uncompressed_size, out_compressed_ptr);
out.write(buffer->compressed.data(), compressed_size);
CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(out_compressed_ptr, compressed_size);
writeBinaryLittleEndian(checksum.low64, out);
writeBinaryLittleEndian(checksum.high64, out);
out.position() += compressed_size;
}
else
{
buffer->compressed.resize(compressed_reserve_size);
UInt32 compressed_size = codec->compress(buffer->uncompressed.data(), uncompressed_size, buffer->compressed.data());
CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(buffer->compressed.data(), compressed_size);
/// Wait while all previous buffers have been written.
if (buffer->previous)
{
CurrentMetrics::Increment metric_wait_increment(CurrentMetrics::ParallelCompressedWriteBufferWait);
std::unique_lock lock(mutex);
cond.wait(lock, [&]{ return !buffer->previous->busy || buffer->previous->sequence_num > buffer->sequence_num; });
}
writeBinaryLittleEndian(checksum.low64, out);
writeBinaryLittleEndian(checksum.high64, out);
out.write(buffer->compressed.data(), compressed_size);
}
std::unique_lock lock(mutex);
buffer->busy = false;