Parallel compression: development

This commit is contained in:
Alexey Milovidov 2024-10-20 03:15:10 +02:00
parent 5e433ea537
commit 7229ffd507

View File

@ -1,9 +1,7 @@
#include <city.h>
#include <iostream>
#include <base/types.h>
#include <base/defines.h>
#include <base/getThreadId.h>
#include <IO/WriteHelpers.h>
#include <Common/setThreadName.h>
@ -31,7 +29,6 @@ ParallelCompressedWriteBuffer::ParallelCompressedWriteBuffer(
ThreadPool & pool_)
: WriteBuffer(nullptr, 0), out(out_), codec(codec_), buf_size(buf_size_), num_threads(num_threads_), pool(pool_)
{
std::cerr << getThreadId() << " Create a new buffer 1\n";
buffers.emplace_back(buf_size);
current_buffer = buffers.begin();
BufferBase::set(current_buffer->uncompressed.data(), buf_size, 0);
@ -70,19 +67,15 @@ void ParallelCompressedWriteBuffer::nextImpl()
if (buffers.size() < num_threads)
{
/// If we didn't use all num_threads buffers yet, create a new one.
std::cerr << getThreadId() << " Create a new buffer " << (buffers.size() + 1) << "\n";
current_buffer = buffers.emplace(current_buffer, buf_size);
}
else
{
/// Otherwise, wrap around to the first buffer in the list.
std::cerr << getThreadId() << " Wrap around\n";
current_buffer = buffers.begin();
}
}
if (current_buffer->busy)
std::cerr << getThreadId() << " Wait while the buffer " << current_buffer->sequence_num << " becomes not busy\n";
/// Wait until the buffer is no longer busy.
{
CurrentMetrics::Increment metric_increment(CurrentMetrics::ParallelCompressedWriteBufferWait);
@ -102,7 +95,6 @@ void ParallelCompressedWriteBuffer::finalizeImpl()
void ParallelCompressedWriteBuffer::compress(Iterator buffer)
{
std::cerr << getThreadId() << " Compressing " << buffer->sequence_num << "...\n";
CurrentMetrics::Increment metric_increment(CurrentMetrics::ParallelCompressedWriteBufferThreads);
chassert(buffer->uncompressed_size <= INT_MAX);
@ -114,8 +106,6 @@ void ParallelCompressedWriteBuffer::compress(Iterator buffer)
CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(buffer->compressed.data(), compressed_size);
if (buffer->previous && buffer->previous->busy)
std::cerr << getThreadId() << " Compressed " << buffer->sequence_num << ", waiting for prev buffer to be written...\n";
/// Wait until all previous buffers have been written.
{
CurrentMetrics::Increment metric_wait_increment(CurrentMetrics::ParallelCompressedWriteBufferWait);
@ -123,8 +113,6 @@ void ParallelCompressedWriteBuffer::compress(Iterator buffer)
cond.wait(lock, [&]{ return !buffer->previous || !buffer->previous->busy || buffer->previous->sequence_num > buffer->sequence_num; });
}
std::cerr << getThreadId() << " Writing " << buffer->sequence_num << "...\n";
writeBinaryLittleEndian(checksum.low64, out);
writeBinaryLittleEndian(checksum.high64, out);