mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Parallel compression
This commit is contained in:
parent
bd1905635c
commit
7d81ecb183
@ -41,6 +41,10 @@
|
||||
M(PostgreSQLConnection, "Number of client connections using PostgreSQL protocol") \
|
||||
M(OpenFileForRead, "Number of files open for reading") \
|
||||
M(OpenFileForWrite, "Number of files open for writing") \
|
||||
M(Compressing, "Number of compress operations using internal compression codecs") \
|
||||
M(Decompressing, "Number of decompress operations using internal compression codecs") \
|
||||
M(ParallelCompressedWriteBufferThreads, "Number of threads in all instances of ParallelCompressedWriteBuffer - these threads are doing parallel compression and writing") \
|
||||
M(ParallelCompressedWriteBufferWait, "Number of threads in all instances of ParallelCompressedWriteBuffer that are currently waiting for buffer to become available for writing") \
|
||||
M(TotalTemporaryFiles, "Number of temporary files created") \
|
||||
M(TemporaryFilesForSort, "Number of temporary files created for external sorting") \
|
||||
M(TemporaryFilesForAggregation, "Number of temporary files created for external aggregation") \
|
||||
|
@ -2,7 +2,6 @@
|
||||
#include <cstring>
|
||||
|
||||
#include <base/types.h>
|
||||
#include <base/unaligned.h>
|
||||
#include <base/defines.h>
|
||||
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
@ -5,11 +5,18 @@
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <base/unaligned.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Compression/CompressionCodecMultiple.h>
|
||||
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric Compressing;
|
||||
extern const Metric Decompressing;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -80,6 +87,8 @@ UInt32 ICompressionCodec::compress(const char * source, UInt32 source_size, char
|
||||
{
|
||||
assert(source != nullptr && dest != nullptr);
|
||||
|
||||
CurrentMetrics::Increment metric_increment(CurrentMetrics::Compressing);
|
||||
|
||||
dest[0] = getMethodByte();
|
||||
UInt8 header_size = getHeaderSize();
|
||||
/// Write data from header_size
|
||||
@ -93,6 +102,8 @@ UInt32 ICompressionCodec::decompress(const char * source, UInt32 source_size, ch
|
||||
{
|
||||
assert(source != nullptr && dest != nullptr);
|
||||
|
||||
CurrentMetrics::Increment metric_increment(CurrentMetrics::Decompressing);
|
||||
|
||||
UInt8 header_size = getHeaderSize();
|
||||
if (source_size < header_size)
|
||||
throw Exception(decompression_error_code,
|
||||
|
118
src/Compression/ParallelCompressedWriteBuffer.cpp
Normal file
118
src/Compression/ParallelCompressedWriteBuffer.cpp
Normal file
@ -0,0 +1,118 @@
|
||||
#include <city.h>
|
||||
#include <cstring>
|
||||
|
||||
#include <base/types.h>
|
||||
#include <base/defines.h>
|
||||
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/scope_guard_safe.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
|
||||
#include <Compression/ParallelCompressedWriteBuffer.h>
|
||||
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric ParallelCompressedWriteBufferThreads;
|
||||
extern const Metric ParallelCompressedWriteBufferWait;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
ParallelCompressedWriteBuffer::ParallelCompressedWriteBuffer(
|
||||
WriteBuffer & out_,
|
||||
CompressionCodecPtr codec_,
|
||||
size_t buf_size_,
|
||||
size_t num_threads_,
|
||||
ThreadPool & pool_)
|
||||
: WriteBuffer(nullptr, 0), out(out_), codec(codec_), buf_size(buf_size_), num_threads(num_threads_), pool(pool_)
|
||||
{
|
||||
buffers.emplace_back(buf_size);
|
||||
current_buffer = buffers.begin();
|
||||
BufferBase::set(current_buffer->uncompressed.data(), buf_size, 0);
|
||||
}
|
||||
|
||||
void ParallelCompressedWriteBuffer::nextImpl()
|
||||
{
|
||||
if (!offset())
|
||||
return;
|
||||
|
||||
std::unique_lock lock(mutex);
|
||||
|
||||
/// The buffer will be compressed and processed in the thread.
|
||||
current_buffer->busy = true;
|
||||
pool.trySchedule([this, my_current_buffer = current_buffer, thread_group = CurrentThread::getGroup()]
|
||||
{
|
||||
SCOPE_EXIT_SAFE(
|
||||
if (thread_group)
|
||||
CurrentThread::detachFromGroupIfNotDetached();
|
||||
);
|
||||
|
||||
if (thread_group)
|
||||
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||
setThreadName("ParallelCompres");
|
||||
|
||||
compress(my_current_buffer);
|
||||
});
|
||||
|
||||
const BufferPair * previous_buffer = &*current_buffer;
|
||||
++current_buffer;
|
||||
if (current_buffer == buffers.end())
|
||||
{
|
||||
if (buffers.size() < num_threads)
|
||||
{
|
||||
/// If we didn't use all num_threads buffers yet, create a new one.
|
||||
current_buffer = buffers.emplace(current_buffer, buf_size);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Otherwise, wrap around to the first buffer in the list.
|
||||
current_buffer = buffers.begin();
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait while the buffer becomes not busy
|
||||
{
|
||||
CurrentMetrics::Increment metric_increment(CurrentMetrics::ParallelCompressedWriteBufferWait);
|
||||
cond.wait(lock, [&]{ return !current_buffer->busy; });
|
||||
}
|
||||
|
||||
/// Now this buffer can be used.
|
||||
current_buffer->previous = previous_buffer;
|
||||
BufferBase::set(current_buffer->uncompressed.data(), buf_size, 0);
|
||||
}
|
||||
|
||||
void ParallelCompressedWriteBuffer::compress(Iterator buffer)
|
||||
{
|
||||
CurrentMetrics::Increment metric_increment(CurrentMetrics::ParallelCompressedWriteBufferThreads);
|
||||
|
||||
chassert(offset() <= INT_MAX);
|
||||
UInt32 decompressed_size = static_cast<UInt32>(offset());
|
||||
UInt32 compressed_reserve_size = codec->getCompressedReserveSize(decompressed_size);
|
||||
|
||||
buffer->compressed.resize(compressed_reserve_size);
|
||||
UInt32 compressed_size = codec->compress(working_buffer.begin(), decompressed_size, buffer->compressed.data());
|
||||
|
||||
CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(buffer->compressed.data(), compressed_size);
|
||||
|
||||
/// Wait while all previous buffers have been written.
|
||||
{
|
||||
CurrentMetrics::Increment metric_wait_increment(CurrentMetrics::ParallelCompressedWriteBufferWait);
|
||||
std::unique_lock lock(mutex);
|
||||
cond.wait(lock, [&]{ return !buffer->previous || !buffer->previous->busy; });
|
||||
}
|
||||
|
||||
writeBinaryLittleEndian(checksum.low64, out);
|
||||
writeBinaryLittleEndian(checksum.high64, out);
|
||||
|
||||
out.write(buffer->compressed.data(), compressed_size);
|
||||
|
||||
std::unique_lock lock(mutex);
|
||||
buffer->busy = false;
|
||||
cond.notify_all();
|
||||
}
|
||||
|
||||
}
|
87
src/Compression/ParallelCompressedWriteBuffer.h
Normal file
87
src/Compression/ParallelCompressedWriteBuffer.h
Normal file
@ -0,0 +1,87 @@
|
||||
#pragma once
|
||||
|
||||
#include <list>
|
||||
#include <memory>
|
||||
|
||||
#include <Common/PODArray.h>
|
||||
|
||||
#include <IO/WriteBuffer.h>
|
||||
#include <IO/BufferWithOwnMemory.h>
|
||||
#include <Compression/ICompressionCodec.h>
|
||||
#include <Compression/CompressionFactory.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Uses multi-buffering for parallel compression.
|
||||
* When the buffer is filled, it will be compressed in the background,
|
||||
* and a new buffer is created for the next input data.
|
||||
*/
|
||||
class ParallelCompressedWriteBuffer final : public WriteBuffer
|
||||
{
|
||||
public:
|
||||
explicit ParallelCompressedWriteBuffer(
|
||||
WriteBuffer & out_,
|
||||
CompressionCodecPtr codec_,
|
||||
size_t buf_size_,
|
||||
size_t num_threads_,
|
||||
ThreadPool & pool_);
|
||||
|
||||
~ParallelCompressedWriteBuffer() override;
|
||||
|
||||
/// The amount of compressed data
|
||||
size_t getCompressedBytes()
|
||||
{
|
||||
nextIfAtEnd();
|
||||
return out.count();
|
||||
}
|
||||
|
||||
/// How many uncompressed bytes were written to the buffer
|
||||
size_t getUncompressedBytes()
|
||||
{
|
||||
return count();
|
||||
}
|
||||
|
||||
/// How many bytes are in the buffer (not yet compressed)
|
||||
size_t getRemainingBytes()
|
||||
{
|
||||
nextIfAtEnd();
|
||||
return offset();
|
||||
}
|
||||
|
||||
private:
|
||||
void nextImpl() override;
|
||||
void finalizeImpl() override;
|
||||
|
||||
WriteBuffer & out;
|
||||
CompressionCodecPtr codec;
|
||||
size_t buf_size;
|
||||
size_t num_threads;
|
||||
ThreadPool & pool;
|
||||
|
||||
struct BufferPair
|
||||
{
|
||||
BufferPair(size_t input_size)
|
||||
: uncompressed(input_size)
|
||||
{
|
||||
}
|
||||
|
||||
Memory<> uncompressed;
|
||||
PODArray<char> compressed;
|
||||
const BufferPair * previous = nullptr;
|
||||
bool busy = false;
|
||||
};
|
||||
|
||||
std::mutex mutex;
|
||||
std::condition_variable cond;
|
||||
std::list<BufferPair> buffers;
|
||||
|
||||
using Iterator = std::list<BufferPair>::iterator;
|
||||
Iterator current_buffer;
|
||||
|
||||
void compress(Iterator buffer);
|
||||
};
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user