Parallel compression: development

This commit is contained in:
Alexey Milovidov 2024-10-20 03:23:07 +02:00
parent 7229ffd507
commit 66024821cf

View File

@ -77,6 +77,7 @@ void ParallelCompressedWriteBuffer::nextImpl()
}
/// Wait while the buffer becomes not busy
if (current_buffer->busy)
{
CurrentMetrics::Increment metric_increment(CurrentMetrics::ParallelCompressedWriteBufferWait);
cond.wait(lock, [&]{ return !current_buffer->busy; });
@ -107,10 +108,11 @@ void ParallelCompressedWriteBuffer::compress(Iterator buffer)
CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(buffer->compressed.data(), compressed_size);
/// Wait while all previous buffers have been written.
if (buffer->previous)
{
CurrentMetrics::Increment metric_wait_increment(CurrentMetrics::ParallelCompressedWriteBufferWait);
std::unique_lock lock(mutex);
cond.wait(lock, [&]{ return !buffer->previous || !buffer->previous->busy || buffer->previous->sequence_num > buffer->sequence_num; });
cond.wait(lock, [&]{ return !buffer->previous->busy || buffer->previous->sequence_num > buffer->sequence_num; });
}
writeBinaryLittleEndian(checksum.low64, out);