diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp
index da3b5557dbf..c9737e2e846 100644
--- a/src/Common/CurrentMetrics.cpp
+++ b/src/Common/CurrentMetrics.cpp
@@ -103,6 +103,9 @@
     M(IOThreads, "Number of threads in the IO thread pool.") \
     M(IOThreadsActive, "Number of threads in the IO thread pool running a task.") \
     M(IOThreadsScheduled, "Number of queued or active jobs in the IO thread pool.") \
+    M(CompressionThread, "Number of threads in compression thread pools.") \
+    M(CompressionThreadActive, "Number of threads in compression thread pools running a task.") \
+    M(CompressionThreadScheduled, "Number of queued or active jobs in compression thread pools.") \
     M(ThreadPoolRemoteFSReaderThreads, "Number of threads in the thread pool for remote_filesystem_read_method=threadpool.") \
     M(ThreadPoolRemoteFSReaderThreadsActive, "Number of threads in the thread pool for remote_filesystem_read_method=threadpool running a task.") \
     M(ThreadPoolRemoteFSReaderThreadsScheduled, "Number of queued or active jobs in the thread pool for remote_filesystem_read_method=threadpool.") \
diff --git a/src/IO/WriteSettings.h b/src/IO/WriteSettings.h
index 94410f787f0..8016dede6ea 100644
--- a/src/IO/WriteSettings.h
+++ b/src/IO/WriteSettings.h
@@ -28,6 +28,8 @@ struct WriteSettings
     bool use_adaptive_write_buffer = false;
     size_t adaptive_write_buffer_initial_size = 16 * 1024;
 
+    size_t max_compression_threads = 1;
+
     bool write_through_distributed_cache = false;
     DistributedCacheSettings distributed_cache_settings;
 
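// Illustrative sketch (not part of the patch): how the new WriteSettings field and the three
// CompressionThread* metrics are meant to be combined. A pool is only created when more than one
// compression thread is requested; with the default of 1 the writer keeps the single-threaded
// compressor. The helper name makeCompressionPool is hypothetical, but the ThreadPool constructor
// arguments mirror the emplace() call that appears in MergeTreeDataPartWriterOnDisk.cpp below.

#include <memory>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadPool.h>
#include <IO/WriteSettings.h>

namespace CurrentMetrics
{
    extern const Metric CompressionThread;
    extern const Metric CompressionThreadActive;
    extern const Metric CompressionThreadScheduled;
}

namespace DB
{

/// Returns nullptr when parallel compression was not requested.
std::unique_ptr<ThreadPool> makeCompressionPool(const WriteSettings & settings)
{
    if (settings.max_compression_threads <= 1)
        return nullptr;

    return std::make_unique<ThreadPool>(
        CurrentMetrics::CompressionThread,
        CurrentMetrics::CompressionThreadActive,
        CurrentMetrics::CompressionThreadScheduled,
        settings.max_compression_threads);
}

}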
diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp
index c250726aba1..8047d1cf2e9 100644
--- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp
@@ -3,16 +3,26 @@
 #include
 #include
 #include
+#include
 #include
 
+
 namespace ProfileEvents
 {
-extern const Event MergeTreeDataWriterSkipIndicesCalculationMicroseconds;
-extern const Event MergeTreeDataWriterStatisticsCalculationMicroseconds;
+    extern const Event MergeTreeDataWriterSkipIndicesCalculationMicroseconds;
+    extern const Event MergeTreeDataWriterStatisticsCalculationMicroseconds;
+}
+
+namespace CurrentMetrics
+{
+    extern const Metric CompressionThread;
+    extern const Metric CompressionThreadActive;
+    extern const Metric CompressionThreadScheduled;
 }
 
 namespace DB
 {
+
 namespace MergeTreeSetting
 {
     extern const MergeTreeSettingsUInt64 index_granularity;
@@ -32,9 +42,9 @@ void MergeTreeDataPartWriterOnDisk::Stream::preFinalize()
     /// Otherwise some data might stuck in the buffers above plain_file and marks_file
     /// Also the order is important
 
-    compressed_hashing.finalize();
+    compressed_hashing->finalize();
     compressor->finalize();
-    plain_hashing.finalize();
+    plain_hashing->finalize();
 
     if (marks_hashing)
     {
@@ -86,12 +96,36 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
     escaped_column_name(escaped_column_name_),
     data_file_extension{data_file_extension_},
     marks_file_extension{marks_file_extension_},
-    plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
-    plain_hashing(*plain_file),
-    compressor(std::make_unique<CompressedWriteBuffer>(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size)),
-    compressed_hashing(*compressor),
     compress_marks(MarkType(marks_file_extension).compressed)
 {
+    plain_file = data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings);
+    plain_hashing.emplace(*plain_file);
+
+    if (query_write_settings.max_compression_threads > 1)
+    {
+        compression_thread_pool.emplace(
+            CurrentMetrics::CompressionThread, CurrentMetrics::CompressionThreadActive, CurrentMetrics::CompressionThreadScheduled,
+            query_write_settings.max_compression_threads);
+
+        compressor = std::make_unique<ParallelCompressedWriteBuffer>(
+            *plain_hashing,
+            compression_codec_,
+            max_compress_block_size_,
+            query_write_settings.max_compression_threads,
+            *compression_thread_pool);
+    }
+    else
+    {
+        compressor = std::make_unique<CompressedWriteBuffer>(
+            *plain_hashing,
+            compression_codec_,
+            max_compress_block_size_,
+            query_write_settings.use_adaptive_write_buffer,
+            query_write_settings.adaptive_write_buffer_initial_size);
+    }
+
+    compressed_hashing.emplace(*compressor);
+
     marks_file = data_part_storage->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings);
     marks_hashing.emplace(*marks_file);
     marks_compressor.emplace(*marks_hashing, marks_compression_codec_, marks_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size);
@@ -110,7 +144,7 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
     data_file_extension{data_file_extension_},
     plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
     plain_hashing(*plain_file),
-    compressor(std::make_unique<CompressedWriteBuffer>(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size)),
+    compressor(std::make_unique<CompressedWriteBuffer>(*plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size)),
     compressed_hashing(*compressor),
     compress_marks(false)
 {
@@ -121,10 +155,10 @@ void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPa
     String name = escaped_column_name;
 
     checksums.files[name + data_file_extension].is_compressed = true;
-    checksums.files[name + data_file_extension].uncompressed_size = compressed_hashing.count();
-    checksums.files[name + data_file_extension].uncompressed_hash = compressed_hashing.getHash();
-    checksums.files[name + data_file_extension].file_size = plain_hashing.count();
-    checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();
+    checksums.files[name + data_file_extension].uncompressed_size = compressed_hashing->count();
+    checksums.files[name + data_file_extension].uncompressed_hash = compressed_hashing->getHash();
+    checksums.files[name + data_file_extension].file_size = plain_hashing->count();
+    checksums.files[name + data_file_extension].file_hash = plain_hashing->getHash();
 
     if (marks_hashing)
     {
@@ -391,7 +425,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
         {
             if (skip_index_accumulated_marks[i] == index_helper->index.granularity)
             {
-                skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed_hashing);
+                skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(*stream.compressed_hashing);
                 skip_index_accumulated_marks[i] = 0;
             }
 
@@ -399,11 +433,11 @@
             {
                 skip_indices_aggregators[i] = index_helper->createIndexAggregatorForPart(store, settings);
 
-                if (stream.compressed_hashing.offset() >= settings.min_compress_block_size)
-                    stream.compressed_hashing.next();
+                if (stream.compressed_hashing->offset() >= settings.min_compress_block_size)
+                    stream.compressed_hashing->next();
 
-                writeBinaryLittleEndian(stream.plain_hashing.count(), marks_out);
-                writeBinaryLittleEndian(stream.compressed_hashing.offset(), marks_out);
+                writeBinaryLittleEndian(stream.plain_hashing->count(), marks_out);
+                writeBinaryLittleEndian(stream.compressed_hashing->offset(), marks_out);
 
                 /// Actually this numbers is redundant, but we have to store them
                 /// to be compatible with the normal .mrk2 file format
@@ -483,7 +517,7 @@ void MergeTreeDataPartWriterOnDisk::fillSkipIndicesChecksums(MergeTreeData::Data
     {
         auto & stream = *skip_indices_streams[i];
         if (!skip_indices_aggregators[i]->empty())
-            skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed_hashing);
+            skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(*stream.compressed_hashing);
 
         /// Register additional files written only by the full-text index. Required because otherwise DROP TABLE complains about unknown
         /// files. Note that the provided actual checksums are bogus. The problem is that at this point the file writes happened already and
@@ -523,7 +557,7 @@ void MergeTreeDataPartWriterOnDisk::fillStatisticsChecksums(MergeTreeData::DataP
     for (size_t i = 0; i < stats.size(); i++)
     {
         auto & stream = *stats_streams[i];
-        stats[i]->serialize(stream.compressed_hashing);
+        stats[i]->serialize(*stream.compressed_hashing);
         stream.preFinalize();
         stream.addToChecksums(checksums);
     }
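// Sketch of the compressor selection performed in Stream's constructor above, pulled out into a
// standalone helper for readability. makeCompressor is a hypothetical name and the include path of
// ParallelCompressedWriteBuffer.h is assumed from the usual ClickHouse layout; the constructor
// argument lists mirror the two branches of the if/else in the hunk above (the parallel buffer
// takes the thread count and the pool instead of the adaptive-buffer parameters).

#include <memory>
#include <optional>
#include <Common/ThreadPool.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Compression/ParallelCompressedWriteBuffer.h>
#include <IO/WriteSettings.h>

namespace DB
{

std::unique_ptr<WriteBuffer> makeCompressor(
    WriteBuffer & out,
    CompressionCodecPtr codec,
    size_t max_compress_block_size,
    const WriteSettings & settings,
    std::optional<ThreadPool> & pool)
{
    /// Parallel path: needs a pool sized by max_compression_threads.
    if (settings.max_compression_threads > 1 && pool)
        return std::make_unique<ParallelCompressedWriteBuffer>(
            out, codec, max_compress_block_size,
            settings.max_compression_threads, *pool);

    /// Single-threaded path keeps the adaptive write buffer behaviour.
    return std::make_unique<CompressedWriteBuffer>(
        out, codec, max_compress_block_size,
        settings.use_adaptive_write_buffer,
        settings.adaptive_write_buffer_initial_size);
}

}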
diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h
index 0d80333368d..046571cb83f 100644
--- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h
+++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h
@@ -27,7 +27,7 @@ struct Granule
     /// this granule can be continuation of the previous one.
     bool mark_on_start;
     /// if true: When this granule will be written to disk all rows for corresponding mark will
-    /// be wrtten. It doesn't mean that rows_to_write == index_granularity.getMarkRows(mark_number),
+    /// be written. It doesn't mean that rows_to_write == index_granularity.getMarkRows(mark_number),
     /// We may have a lot of small blocks between two marks and this may be the last one.
     bool is_complete;
 };
@@ -74,10 +74,10 @@ public:
 
         /// compressed_hashing -> compressor -> plain_hashing -> plain_file
         std::unique_ptr<WriteBufferFromFileBase> plain_file;
-        HashingWriteBuffer plain_hashing;
+        std::optional<HashingWriteBuffer> plain_hashing;
         /// This could be either CompressedWriteBuffer or ParallelCompressedWriteBuffer
         std::unique_ptr<WriteBuffer> compressor;
-        HashingWriteBuffer compressed_hashing;
+        std::optional<HashingWriteBuffer> compressed_hashing;
 
         /// marks_compressed_hashing -> marks_compressor -> marks_hashing -> marks_file
         std::unique_ptr<WriteBufferFromFileBase> marks_file;
@@ -88,10 +88,11 @@ public:
 
         bool is_prefinalized = false;
 
+        /// Thread pool for parallel compression.
+        std::optional<ThreadPool> compression_thread_pool;
+
         void preFinalize();
-
         void finalize();
-
         void sync() const;
 
         void addToChecksums(MergeTreeDataPartChecksums & checksums);
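// Sketch of the buffer chain documented in the header above
// ("compressed_hashing -> compressor -> plain_hashing -> plain_file") and of the finalization
// order that Stream::preFinalize() relies on: the chain is finalized from the top (closest to
// the writer) down to the file, otherwise data could be left behind in the upper buffers.
// writeThroughChain is a hypothetical helper; file name, data and codec are example values.

#include <memory>
#include <string>
#include <Compression/CompressedWriteBuffer.h>
#include <Compression/CompressionFactory.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/WriteBufferFromFile.h>

using namespace DB;

void writeThroughChain(const std::string & path, const std::string & data)
{
    auto plain_file = std::make_unique<WriteBufferFromFile>(path);
    HashingWriteBuffer plain_hashing(*plain_file);

    CompressedWriteBuffer compressor(plain_hashing, CompressionCodecFactory::instance().getDefaultCodec());
    HashingWriteBuffer compressed_hashing(compressor);

    compressed_hashing.write(data.data(), data.size());

    /// Same order as in Stream::preFinalize().
    compressed_hashing.finalize();
    compressor.finalize();
    plain_hashing.finalize();
    plain_file->finalize();
}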
diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp
index d1d4aa4f5b0..523e4c4a31d 100644
--- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp
@@ -9,6 +9,7 @@
 #include
 #include
 
+
 namespace DB
 {
 
@@ -230,7 +231,7 @@ ISerialization::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGett
         if (is_offsets && offset_columns.contains(stream_name))
             return nullptr;
 
-        return &column_streams.at(stream_name)->compressed_hashing;
+        return &column_streams.at(stream_name)->compressed_hashing.value();
     };
 }
 
@@ -399,13 +400,13 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
         auto & stream = *column_streams[stream_name];
 
         /// There could already be enough data to compress into the new block.
-        if (stream.compressed_hashing.offset() >= min_compress_block_size)
-            stream.compressed_hashing.next();
+        if (stream.compressed_hashing->offset() >= min_compress_block_size)
+            stream.compressed_hashing->next();
 
         StreamNameAndMark stream_with_mark;
         stream_with_mark.stream_name = stream_name;
-        stream_with_mark.mark.offset_in_compressed_file = stream.plain_hashing.count();
-        stream_with_mark.mark.offset_in_decompressed_block = stream.compressed_hashing.offset();
+        stream_with_mark.mark.offset_in_compressed_file = stream.plain_hashing->count();
+        stream_with_mark.mark.offset_in_decompressed_block = stream.compressed_hashing->offset();
 
         result.push_back(stream_with_mark);
     }, name_and_type.type, column_sample);
@@ -438,7 +439,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
         if (is_offsets && offset_columns.contains(stream_name))
             return;
 
-        column_streams.at(stream_name)->compressed_hashing.nextIfAtEnd();
+        column_streams.at(stream_name)->compressed_hashing->nextIfAtEnd();
     }, name_and_type.type, column.getPtr());
 }
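// Small sketch of the access pattern used in MergeTreeDataPartWriterWide now that
// plain_hashing/compressed_hashing are std::optional<HashingWriteBuffer>: operator-> assumes the
// optional is engaged, while .value() returns a reference (throwing std::bad_optional_access if
// it is not), which is what the stream getter relies on when handing out a WriteBuffer pointer.
// The string buffer below is a stand-in for a real Stream; it is not part of the patch.

#include <optional>
#include <IO/HashingWriteBuffer.h>
#include <IO/WriteBufferFromString.h>

using namespace DB;

int main()
{
    WriteBufferFromOwnString out;                         /// stand-in for the plain file buffer
    std::optional<HashingWriteBuffer> compressed_hashing;
    compressed_hashing.emplace(out);                      /// constructed late, as in Stream's constructor

    compressed_hashing->write("abc", 3);                  /// operator-> access, as in writeSingleGranule()
    WriteBuffer * stream = &compressed_hashing.value();   /// .value() access, as in createStreamGetter()
    stream->write("def", 3);

    compressed_hashing->finalize();
    out.finalize();
    return 0;
}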