Incomplete

Alexey Milovidov 2024-10-21 18:39:20 +02:00
parent dba7c9cf4a
commit e6bae901ed
5 changed files with 29 additions and 25 deletions

View File

@@ -44,6 +44,8 @@ void ParallelCompressedWriteBuffer::nextImpl()
     /// The buffer will be compressed and processed in the thread.
     current_buffer->busy = true;
     current_buffer->sequence_num = current_sequence_num;
+    current_buffer->out_callback = callback;
+    callback = {};
     ++current_sequence_num;
     current_buffer->uncompressed_size = offset();
     pool.scheduleOrThrowOnError([this, my_current_buffer = current_buffer, thread_group = CurrentThread::getGroup()]
@@ -60,7 +62,7 @@ void ParallelCompressedWriteBuffer::nextImpl()
         compress(my_current_buffer);
     });
 
-    const BufferPair * previous_buffer = &*current_buffer;
+    BufferPair * previous_buffer = &*current_buffer;
     ++current_buffer;
     if (current_buffer == buffers.end())
     {
@@ -153,6 +155,8 @@ void ParallelCompressedWriteBuffer::compress(Iterator buffer)
     }
 
     std::unique_lock lock(mutex);
+    if (buffer->out_callback)
+        buffer->out_callback();
     buffer->busy = false;
     cond.notify_all();
 }
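Taken together, the three hunks above thread a one-shot completion callback through the compression pipeline: nextImpl() moves the pending callback onto the buffer it is about to schedule and clears it, and compress() invokes it under the mutex once that buffer's data has been compressed and written out. Below is a minimal, self-contained sketch of the hand-off, with invented names (Slot, pending_callback) standing in for the real BufferPair machinery:

    #include <condition_variable>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <thread>

    /// Toy stand-in for BufferPair: the real struct also carries the
    /// uncompressed Memory<> and the compressed PODArray<char>.
    struct Slot
    {
        bool busy = false;
        std::function<void()> out_callback;  /// one-shot, set by the producer
    };

    int main()
    {
        std::mutex mutex;
        std::condition_variable cond;
        Slot slot;

        std::function<void()> pending_callback = [] { std::cout << "mark recorded\n"; };

        /// Producer side (mirrors nextImpl): hand the pending callback to
        /// the slot being scheduled, then clear it so it fires exactly once.
        slot.busy = true;
        slot.out_callback = pending_callback;
        pending_callback = {};

        std::thread worker([&]
        {
            /// ... the slot's data would be compressed here ...
            std::unique_lock lock(mutex);
            if (slot.out_callback)
                slot.out_callback();
            slot.busy = false;
            cond.notify_all();
        });

        /// Producer blocks until the slot is free again (mirrors the
        /// wait on busy in the real buffer).
        std::unique_lock lock(mutex);
        cond.wait(lock, [&] { return !slot.busy; });
        lock.unlock();
        worker.join();
    }

Firing the callback while still holding the mutex, as the hunk does, keeps it ordered with respect to busy = false, at the cost of running caller code under the lock.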

View File

@@ -31,24 +31,11 @@ public:
     ~ParallelCompressedWriteBuffer() override;
 
-    /// The amount of compressed data
-    size_t getCompressedBytes()
+    /// This function will be called once after compressing the next data and sending it to the out.
+    /// It can be used to fill information about marks.
+    void setCompletionCallback(std::function<void()> callback_)
     {
-        nextIfAtEnd();
-        return out.count();
-    }
-
-    /// How many uncompressed bytes were written to the buffer
-    size_t getUncompressedBytes()
-    {
-        return count();
-    }
-
-    /// How many bytes are in the buffer (not yet compressed)
-    size_t getRemainingBytes()
-    {
-        nextIfAtEnd();
-        return offset();
+        callback = callback_;
     }
 
 private:
@@ -71,15 +58,18 @@ private:
         Memory<> uncompressed;
         size_t uncompressed_size = 0;
         PODArray<char> compressed;
-        const BufferPair * previous = nullptr;
+        BufferPair * previous = nullptr;
         size_t sequence_num = 0;
        bool busy = false;
+        std::function<void()> out_callback;
     };
 
     std::mutex mutex;
     std::condition_variable cond;
     std::list<BufferPair> buffers;
+    std::function<void()> callback;
 
     using Iterator = std::list<BufferPair>::iterator;
     Iterator current_buffer;
     size_t current_sequence_num = 0;
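The header change removes the three byte-counting getters and adds setCompletionCallback(), which per its comment exists to fill in mark information. Combined with the .cpp hunk (callback = {} right after the hand-off), the semantics are one-shot: the callback applies only to the next block that gets flushed. Here is a toy model of just that behavior, where OneShotNotifier is an invented stand-in rather than the real class:

    #include <functional>
    #include <iostream>

    class OneShotNotifier
    {
    public:
        /// Same shape as the new API: remember the callback...
        void setCompletionCallback(std::function<void()> callback_) { callback = callback_; }

        /// ...and consume it on the next flush (stands in for nextImpl).
        void next()
        {
            auto out_callback = std::move(callback);
            callback = {};
            /// Compression would be scheduled here; on completion:
            if (out_callback)
                out_callback();
        }

    private:
        std::function<void()> callback;
    };

    int main()
    {
        OneShotNotifier out;
        out.setCompletionCallback([] { std::cout << "after first flush\n"; });
        out.next();  /// prints once
        out.next();  /// prints nothing: the callback was already consumed
    }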

View File

@@ -113,6 +113,8 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
             max_compress_block_size_,
             query_write_settings.max_compression_threads,
             *compression_thread_pool);
+        is_compressor_parallel = true;
     }
     else
     {

View File

@@ -76,6 +76,7 @@ public:
         std::unique_ptr<WriteBufferFromFileBase> plain_file;
         std::optional<HashingWriteBuffer> plain_hashing;
 
         /// This could be either CompressedWriteBuffer or ParallelCompressedWriteBuffer
+        bool is_compressor_parallel = false;
         std::unique_ptr<WriteBuffer> compressor;
         std::optional<HashingWriteBuffer> compressed_hashing;
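Because the compressor is held as std::unique_ptr<WriteBuffer>, its concrete type is erased at this point; the new flag records at construction time which implementation was chosen, so later code can branch without a dynamic_cast. A stub illustration follows (the types are empty stand-ins, and the branch bodies are only my reading of why the flag exists):

    #include <iostream>
    #include <memory>

    struct WriteBuffer { virtual ~WriteBuffer() = default; };
    struct CompressedWriteBuffer : WriteBuffer {};
    struct ParallelCompressedWriteBuffer : WriteBuffer {};

    /// Mirrors the Stream members: the concrete compressor type is
    /// erased, so a flag remembers which one was built.
    struct Stream
    {
        bool is_compressor_parallel = false;
        std::unique_ptr<WriteBuffer> compressor;
    };

    int main()
    {
        Stream stream;
        stream.compressor = std::make_unique<ParallelCompressedWriteBuffer>();
        stream.is_compressor_parallel = true;  /// set in the parallel branch, as above

        if (stream.is_compressor_parallel)
            std::cout << "offsets are only valid after the completion callback\n";
        else
            std::cout << "offsets can be read synchronously\n";
    }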

View File

@@ -400,15 +400,22 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
         auto & stream = *column_streams[stream_name];
 
         /// There could already be enough data to compress into the new block.
+        auto push_mark = [&]
+        {
+            StreamNameAndMark stream_with_mark;
+            stream_with_mark.stream_name = stream_name;
+            stream_with_mark.mark.offset_in_compressed_file = stream.plain_hashing->count();
+            stream_with_mark.mark.offset_in_decompressed_block = stream.compressed_hashing->offset();
+            result.push_back(stream_with_mark);
+        };
+
         if (stream.compressed_hashing->offset() >= min_compress_block_size)
+        {
             stream.compressed_hashing->next();
+        }
 
-        StreamNameAndMark stream_with_mark;
-        stream_with_mark.stream_name = stream_name;
-        stream_with_mark.mark.offset_in_compressed_file = stream.plain_hashing->count();
-        stream_with_mark.mark.offset_in_decompressed_block = stream.compressed_hashing->offset();
-        result.push_back(stream_with_mark);
+        push_mark();
     }, name_and_type.type, column_sample);
 
     return result;
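Factoring the mark construction into push_mark separates what a mark records from when it is recorded, and the braces newly added around the if body suggest more statements are coming there. Given the commit title ("Incomplete") and the new setCompletionCallback(), a plausible follow-up, which is my assumption and not shown in this diff, is that the parallel path defers push_mark until the block has actually been compressed, while the serial path keeps calling it inline:

    #include <functional>
    #include <iostream>
    #include <vector>

    int main()
    {
        std::vector<int> marks;
        int current_offset = 42;

        /// As in the hunk: the mark-recording logic becomes a value
        /// that can run now or later.
        auto push_mark = [&] { marks.push_back(current_offset); };

        bool is_compressor_parallel = false;  /// flip to defer instead
        std::function<void()> deferred;

        if (is_compressor_parallel)
            deferred = push_mark;  /// e.g. handed to setCompletionCallback
        else
            push_mark();           /// serial path: record immediately

        if (deferred)
            deferred();  /// would fire from the compression thread

        std::cout << marks.size() << " mark(s) recorded\n";
    }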