Preparation

Alexey Milovidov 2024-10-21 00:12:06 +02:00
parent 1236422559
commit ab10830317
5 changed files with 72 additions and 31 deletions

@@ -103,6 +103,9 @@
M(IOThreads, "Number of threads in the IO thread pool.") \
M(IOThreadsActive, "Number of threads in the IO thread pool running a task.") \
M(IOThreadsScheduled, "Number of queued or active jobs in the IO thread pool.") \
M(CompressionThread, "Number of threads in compression thread pools.") \
M(CompressionThreadActive, "Number of threads in compression thread pools running a task.") \
M(CompressionThreadScheduled, "Number of queued or active jobs in compression thread pools.") \
M(ThreadPoolRemoteFSReaderThreads, "Number of threads in the thread pool for remote_filesystem_read_method=threadpool.") \
M(ThreadPoolRemoteFSReaderThreadsActive, "Number of threads in the thread pool for remote_filesystem_read_method=threadpool running a task.") \
M(ThreadPoolRemoteFSReaderThreadsScheduled, "Number of queued or active jobs in the thread pool for remote_filesystem_read_method=threadpool.") \

@@ -28,6 +28,8 @@ struct WriteSettings
bool use_adaptive_write_buffer = false;
size_t adaptive_write_buffer_initial_size = 16 * 1024;
size_t max_compression_threads = 1;
bool write_through_distributed_cache = false;
DistributedCacheSettings distributed_cache_settings;
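
The new max_compression_threads field (default 1) is the only knob that gates the parallel path wired up below. A tiny standalone sketch of that gate, using invented names rather than the real ClickHouse types:

// Sketch of the gating rule this setting introduces: the default of 1 keeps
// the existing single-threaded CompressedWriteBuffer path (invented names).
#include <cstddef>
#include <iostream>

struct WriteSettingsSketch
{
    size_t max_compression_threads = 1;
};

bool useParallelCompression(const WriteSettingsSketch & settings)
{
    return settings.max_compression_threads > 1;
}

int main()
{
    WriteSettingsSketch settings;
    std::cout << useParallelCompression(settings) << '\n';   // 0: serial by default
    settings.max_compression_threads = 4;
    std::cout << useParallelCompression(settings) << '\n';   // 1: the parallel buffer is chosen
}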

@@ -3,16 +3,26 @@
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Compression/ParallelCompressedWriteBuffer.h>
#include <Common/logger_useful.h>
namespace ProfileEvents
{
extern const Event MergeTreeDataWriterSkipIndicesCalculationMicroseconds;
extern const Event MergeTreeDataWriterStatisticsCalculationMicroseconds;
extern const Event MergeTreeDataWriterSkipIndicesCalculationMicroseconds;
extern const Event MergeTreeDataWriterStatisticsCalculationMicroseconds;
}
namespace CurrentMetrics
{
extern const Metric CompressionThread;
extern const Metric CompressionThreadActive;
extern const Metric CompressionThreadScheduled;
}
namespace DB
{
namespace MergeTreeSetting
{
extern const MergeTreeSettingsUInt64 index_granularity;
@@ -32,9 +42,9 @@ void MergeTreeDataPartWriterOnDisk::Stream::preFinalize()
/// Otherwise some data might stuck in the buffers above plain_file and marks_file
/// Also the order is important
compressed_hashing.finalize();
compressed_hashing->finalize();
compressor->finalize();
plain_hashing.finalize();
plain_hashing->finalize();
if (marks_hashing)
{
@@ -86,12 +96,36 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
escaped_column_name(escaped_column_name_),
data_file_extension{data_file_extension_},
marks_file_extension{marks_file_extension_},
plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
plain_hashing(*plain_file),
compressor(std::make_unique<CompressedWriteBuffer>(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size)),
compressed_hashing(*compressor),
compress_marks(MarkType(marks_file_extension).compressed)
{
plain_file = data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings);
plain_hashing.emplace(*plain_file);
if (query_write_settings.max_compression_threads > 1)
{
compression_thread_pool.emplace(
CurrentMetrics::CompressionThread, CurrentMetrics::CompressionThreadActive, CurrentMetrics::CompressionThreadScheduled,
query_write_settings.max_compression_threads);
compressor = std::make_unique<ParallelCompressedWriteBuffer>(
*plain_hashing,
compression_codec_,
max_compress_block_size_,
query_write_settings.max_compression_threads,
*compression_thread_pool);
}
else
{
compressor = std::make_unique<CompressedWriteBuffer>(
*plain_hashing,
compression_codec_,
max_compress_block_size_,
query_write_settings.use_adaptive_write_buffer,
query_write_settings.adaptive_write_buffer_initial_size);
}
compressed_hashing.emplace(*compressor);
marks_file = data_part_storage->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings);
marks_hashing.emplace(*marks_file);
marks_compressor.emplace(*marks_hashing, marks_compression_codec_, marks_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size);
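
The hunk above is the core of the change: the compressor type is now chosen at run time (ParallelCompressedWriteBuffer when max_compression_threads > 1, the plain CompressedWriteBuffer otherwise), so plain_hashing and compressed_hashing can no longer be built in the member-initializer list and are emplaced in the constructor body instead. A minimal standalone sketch of that pattern, with invented stand-in types rather than the ClickHouse classes:

// Standalone sketch: pick a serial or parallel compressor at run time, then
// construct the dependent wrapper only after the choice is made (hence std::optional).
#include <cstddef>
#include <iostream>
#include <memory>
#include <optional>
#include <string>

struct Compressor
{
    virtual ~Compressor() = default;
    virtual std::string compress(const std::string & data) = 0;
};

struct SerialCompressor : Compressor
{
    std::string compress(const std::string & data) override { return "[serial]" + data; }
};

struct ParallelCompressor : Compressor
{
    explicit ParallelCompressor(size_t threads_) : threads(threads_) {}
    std::string compress(const std::string & data) override
    {
        return "[parallel x" + std::to_string(threads) + "]" + data;
    }
    size_t threads;
};

/// Stands in for HashingWriteBuffer: wraps the chosen compressor and counts bytes.
struct CountingWrapper
{
    explicit CountingWrapper(Compressor & next_) : next(next_) {}
    std::string write(const std::string & data) { count += data.size(); return next.compress(data); }
    Compressor & next;
    size_t count = 0;
};

/// Stands in for the writer Stream above.
struct StreamSketch
{
    explicit StreamSketch(size_t max_compression_threads)
    {
        if (max_compression_threads > 1)
            compressor = std::make_unique<ParallelCompressor>(max_compression_threads);
        else
            compressor = std::make_unique<SerialCompressor>();

        /// Analogue of compressed_hashing.emplace(*compressor): possible only
        /// after the compressor exists, which is why the member is optional.
        compressed_hashing.emplace(*compressor);
    }

    std::unique_ptr<Compressor> compressor;
    std::optional<CountingWrapper> compressed_hashing;
};

int main()
{
    StreamSketch serial(1);
    StreamSketch parallel(4);
    std::cout << serial.compressed_hashing->write("abc") << '\n';     // [serial]abc
    std::cout << parallel.compressed_hashing->write("abc") << '\n';   // [parallel x4]abc
}

The same reason drives the "." to "->" changes at every call site later in this diff: the hashing buffers are now accessed through std::optional.
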
@@ -110,7 +144,7 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
data_file_extension{data_file_extension_},
plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
plain_hashing(*plain_file),
compressor(std::make_unique<CompressedWriteBuffer>(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size)),
compressor(std::make_unique<CompressedWriteBuffer>(*plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size)),
compressed_hashing(*compressor),
compress_marks(false)
{
@@ -121,10 +155,10 @@ void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPa
String name = escaped_column_name;
checksums.files[name + data_file_extension].is_compressed = true;
checksums.files[name + data_file_extension].uncompressed_size = compressed_hashing.count();
checksums.files[name + data_file_extension].uncompressed_hash = compressed_hashing.getHash();
checksums.files[name + data_file_extension].file_size = plain_hashing.count();
checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();
checksums.files[name + data_file_extension].uncompressed_size = compressed_hashing->count();
checksums.files[name + data_file_extension].uncompressed_hash = compressed_hashing->getHash();
checksums.files[name + data_file_extension].file_size = plain_hashing->count();
checksums.files[name + data_file_extension].file_hash = plain_hashing->getHash();
if (marks_hashing)
{
@@ -391,7 +425,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
{
if (skip_index_accumulated_marks[i] == index_helper->index.granularity)
{
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed_hashing);
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(*stream.compressed_hashing);
skip_index_accumulated_marks[i] = 0;
}
@@ -399,11 +433,11 @@
{
skip_indices_aggregators[i] = index_helper->createIndexAggregatorForPart(store, settings);
if (stream.compressed_hashing.offset() >= settings.min_compress_block_size)
stream.compressed_hashing.next();
if (stream.compressed_hashing->offset() >= settings.min_compress_block_size)
stream.compressed_hashing->next();
writeBinaryLittleEndian(stream.plain_hashing.count(), marks_out);
writeBinaryLittleEndian(stream.compressed_hashing.offset(), marks_out);
writeBinaryLittleEndian(stream.plain_hashing->count(), marks_out);
writeBinaryLittleEndian(stream.compressed_hashing->offset(), marks_out);
/// Actually this numbers is redundant, but we have to store them
/// to be compatible with the normal .mrk2 file format
@@ -483,7 +517,7 @@ void MergeTreeDataPartWriterOnDisk::fillSkipIndicesChecksums(MergeTreeData::Data
{
auto & stream = *skip_indices_streams[i];
if (!skip_indices_aggregators[i]->empty())
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed_hashing);
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(*stream.compressed_hashing);
/// Register additional files written only by the full-text index. Required because otherwise DROP TABLE complains about unknown
/// files. Note that the provided actual checksums are bogus. The problem is that at this point the file writes happened already and
@@ -523,7 +557,7 @@ void MergeTreeDataPartWriterOnDisk::fillStatisticsChecksums(MergeTreeData::DataP
for (size_t i = 0; i < stats.size(); i++)
{
auto & stream = *stats_streams[i];
stats[i]->serialize(stream.compressed_hashing);
stats[i]->serialize(*stream.compressed_hashing);
stream.preFinalize();
stream.addToChecksums(checksums);
}

@@ -27,7 +27,7 @@ struct Granule
/// this granule can be continuation of the previous one.
bool mark_on_start;
/// if true: When this granule will be written to disk all rows for corresponding mark will
/// be wrtten. It doesn't mean that rows_to_write == index_granularity.getMarkRows(mark_number),
/// be written. It doesn't mean that rows_to_write == index_granularity.getMarkRows(mark_number),
/// We may have a lot of small blocks between two marks and this may be the last one.
bool is_complete;
};
@@ -74,10 +74,10 @@ public:
/// compressed_hashing -> compressor -> plain_hashing -> plain_file
std::unique_ptr<WriteBufferFromFileBase> plain_file;
HashingWriteBuffer plain_hashing;
std::optional<HashingWriteBuffer> plain_hashing;
/// This could be either CompressedWriteBuffer or ParallelCompressedWriteBuffer
std::unique_ptr<WriteBuffer> compressor;
HashingWriteBuffer compressed_hashing;
std::optional<HashingWriteBuffer> compressed_hashing;
/// marks_compressed_hashing -> marks_compressor -> marks_hashing -> marks_file
std::unique_ptr<WriteBufferFromFileBase> marks_file;
@@ -88,10 +88,11 @@ public:
bool is_prefinalized = false;
/// Thread pool for parallel compression.
std::optional<ThreadPool> compression_thread_pool;
void preFinalize();
void finalize();
void sync() const;
void addToChecksums(MergeTreeDataPartChecksums & checksums);
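
The comment in the hunk above documents the buffer chain compressed_hashing -> compressor -> plain_hashing -> plain_file: each stage wraps the next and forwards transformed bytes downstream, and addToChecksums() reads the uncompressed size from the inner counter and the on-disk size from the outer one. A standalone sketch of that chaining idea, with invented stage types rather than the ClickHouse buffer classes:

// Standalone sketch of a write chain: each stage wraps the next one, so bytes
// flow  compressed_hashing -> compressor -> plain_hashing -> plain_file.
#include <cstddef>
#include <iostream>
#include <string>

struct Stage
{
    virtual ~Stage() = default;
    virtual void write(const std::string & bytes) = 0;
};

/// Terminal stage; stands in for the plain file on disk.
struct MemoryFile : Stage
{
    void write(const std::string & bytes) override { data += bytes; }
    std::string data;
};

/// Pass-through stage that counts bytes; stands in for HashingWriteBuffer.
struct CountingStage : Stage
{
    explicit CountingStage(Stage & next_) : next(next_) {}
    void write(const std::string & bytes) override { count += bytes.size(); next.write(bytes); }
    Stage & next;
    size_t count = 0;
};

/// Transforming stage; stands in for the compressor (here it only tags the data).
struct TaggingStage : Stage
{
    explicit TaggingStage(Stage & next_) : next(next_) {}
    void write(const std::string & bytes) override { next.write("<z>" + bytes); }
    Stage & next;
};

int main()
{
    MemoryFile plain_file;
    CountingStage plain_hashing(plain_file);        /// counts bytes that reach "disk"
    TaggingStage compressor(plain_hashing);
    CountingStage compressed_hashing(compressor);   /// counts bytes before "compression"

    compressed_hashing.write("hello");
    std::cout << "uncompressed: " << compressed_hashing.count
              << ", on disk: " << plain_hashing.count << '\n';   // 5 vs 8
}

Note that in this commit only the data chain gets the new optional thread pool; the marks sub-chain (marks_compressor) keeps the single-threaded buffer.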

@@ -9,6 +9,7 @@
#include <Storages/ColumnsDescription.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
namespace DB
{
@@ -230,7 +231,7 @@ ISerialization::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGett
if (is_offsets && offset_columns.contains(stream_name))
return nullptr;
return &column_streams.at(stream_name)->compressed_hashing;
return &column_streams.at(stream_name)->compressed_hashing.value();
};
}
@@ -399,13 +400,13 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
auto & stream = *column_streams[stream_name];
/// There could already be enough data to compress into the new block.
if (stream.compressed_hashing.offset() >= min_compress_block_size)
stream.compressed_hashing.next();
if (stream.compressed_hashing->offset() >= min_compress_block_size)
stream.compressed_hashing->next();
StreamNameAndMark stream_with_mark;
stream_with_mark.stream_name = stream_name;
stream_with_mark.mark.offset_in_compressed_file = stream.plain_hashing.count();
stream_with_mark.mark.offset_in_decompressed_block = stream.compressed_hashing.offset();
stream_with_mark.mark.offset_in_compressed_file = stream.plain_hashing->count();
stream_with_mark.mark.offset_in_decompressed_block = stream.compressed_hashing->offset();
result.push_back(stream_with_mark);
}, name_and_type.type, column_sample);
@@ -438,7 +439,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
if (is_offsets && offset_columns.contains(stream_name))
return;
column_streams.at(stream_name)->compressed_hashing.nextIfAtEnd();
column_streams.at(stream_name)->compressed_hashing->nextIfAtEnd();
}, name_and_type.type, column.getPtr());
}