From 6d49a6266665304e67b0a51a591e46a44cc7252d Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 14 Jan 2022 19:53:55 +0000 Subject: [PATCH] Some more async writes. --- src/Disks/IDiskRemote.cpp | 2 +- src/Storages/MergeTree/DataPartsExchange.cpp | 8 +-- src/Storages/MergeTree/MergeTask.cpp | 10 ++-- .../MergeTree/MergeTreeDataPartInMemory.cpp | 8 +-- .../MergeTree/MergeTreeDataWriter.cpp | 59 ++++++++++++------- src/Storages/MergeTree/MergeTreeDataWriter.h | 27 +++++++-- src/Storages/MergeTree/MergeTreeSink.cpp | 59 ++++++++++++++++++- src/Storages/MergeTree/MergeTreeSink.h | 19 +++--- .../MergeTree/MergeTreeWriteAheadLog.cpp | 10 ++-- .../MergeTree/MergedBlockOutputStream.cpp | 41 +++++++++---- .../MergeTree/MergedBlockOutputStream.h | 19 ++++-- src/Storages/MergeTree/MutateTask.cpp | 18 ++++-- .../MergeTree/ReplicatedMergeTreeSink.cpp | 11 +++- src/Storages/StorageReplicatedMergeTree.cpp | 4 +- 14 files changed, 212 insertions(+), 83 deletions(-) diff --git a/src/Disks/IDiskRemote.cpp b/src/Disks/IDiskRemote.cpp index ecf1366f171..4f08d567933 100644 --- a/src/Disks/IDiskRemote.cpp +++ b/src/Disks/IDiskRemote.cpp @@ -516,7 +516,7 @@ AsynchronousReaderPtr IDiskRemote::getThreadPoolReader() ThreadPool & IDiskRemote::getThreadPoolWriter() { - constexpr size_t pool_size = 50; + constexpr size_t pool_size = 300; constexpr size_t queue_size = 1000000; static ThreadPool writer(pool_size, pool_size, queue_size); return writer; diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index f9e7431dfb6..76c580cf88a 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -593,8 +593,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory( CompressionCodecFactory::instance().get("NONE", {})); part_out.write(block); - auto written_files = part_out.finalizePart(new_projection_part); - part_out.finish(new_projection_part, std::move(written_files), false); + auto finalizer = part_out.finalizePart(new_projection_part, false); + part_out.finish(std::move(finalizer)); new_projection_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true); new_data_part->addProjectionPart(projection_name, std::move(new_projection_part)); } @@ -618,8 +618,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory( CompressionCodecFactory::instance().get("NONE", {})); part_out.write(block); - auto written_files = part_out.finalizePart(new_data_part); - part_out.finish(new_data_part, std::move(written_files), false); + auto finalizer = part_out.finalizePart(new_data_part, false); + part_out.finish(std::move(finalizer)); new_data_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true); return new_data_part; diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 13385cac32a..3dae7f8927e 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -632,14 +632,14 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const global_ctx->new_data_part->addProjectionPart(part->name, std::move(part)); } - MergedBlockOutputStream::WrittenFiles written_files; + std::optional finalizer; if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical) - written_files = global_ctx->to->finalizePart(global_ctx->new_data_part); + finalizer = global_ctx->to->finalizePart(global_ctx->new_data_part, ctx->need_sync); else - written_files = global_ctx->to->finalizePart( - global_ctx->new_data_part, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns); + finalizer = global_ctx->to->finalizePart( + global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns); - global_ctx->to->finish(global_ctx->new_data_part, std::move(written_files), ctx->need_sync); + global_ctx->to->finish(std::move(*finalizer)); global_ctx->promise.set_value(global_ctx->new_data_part); diff --git a/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp b/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp index b95c9c78fef..a5c144d9810 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp @@ -125,14 +125,14 @@ void MergeTreeDataPartInMemory::flushToDisk(const String & base_path, const Stri projection_compression_codec); projection_out.write(projection_part->block); - auto written_files = projection_out.finalizePart(projection_data_part); - projection_out.finish(projection_data_part, std::move(written_files), false); + auto finalizer = projection_out.finalizePart(projection_data_part, false); + projection_out.finish(std::move(finalizer)); new_data_part->addProjectionPart(projection_name, std::move(projection_data_part)); } } - auto written_files = out.finalizePart(new_data_part); - out.finish(new_data_part, std::move(written_files), false); + auto finalizer = out.finalizePart(new_data_part, false); + out.finish(std::move(finalizer)); } void MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 25e4020096c..428786fb7eb 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -137,6 +137,12 @@ void updateTTL( } +void MergeTreeDataWriter::TempPart::finalize() +{ + for (auto & stream : streams) + stream.stream->finish(std::move(stream.finalizer)); +} + BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts( const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) { @@ -270,9 +276,10 @@ Block MergeTreeDataWriter::mergeBlock( return block.cloneWithColumns(status.chunk.getColumns()); } -MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart( +MergeTreeDataWriter::TempPart MergeTreeDataWriter::writeTempPart( BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) { + TempPart temp_part; Block & block = block_with_partition.block; static const String TMP_PREFIX = "tmp_insert_"; @@ -343,7 +350,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart( /// If optimize_on_insert is true, block may become empty after merge. /// There is no need to create empty part. if (expected_size == 0) - return nullptr; + return temp_part; DB::IMergeTreeDataPart::TTLInfos move_ttl_infos; const auto & move_ttl_entries = metadata_snapshot->getMoveTTLs(); @@ -418,31 +425,37 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart( auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0); const auto & index_factory = MergeTreeIndexFactory::instance(); - MergedBlockOutputStream out(new_data_part, metadata_snapshot,columns, + auto out = std::make_unique(new_data_part, metadata_snapshot,columns, index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec); - bool sync_on_insert = data_settings->fsync_after_insert; - - out.writeWithPermutation(block, perm_ptr); + out->writeWithPermutation(block, perm_ptr); for (const auto & projection : metadata_snapshot->getProjections()) { auto projection_block = projection.calculate(block, context); if (projection_block.rows()) - new_data_part->addProjectionPart( - projection.name, writeProjectionPart(data, log, projection_block, projection, new_data_part.get())); + { + auto proj_temp_part = writeProjectionPart(data, log, projection_block, projection, new_data_part.get()); + new_data_part->addProjectionPart(projection.name, std::move(proj_temp_part.part)); + for (auto & stream : proj_temp_part.streams) + temp_part.streams.emplace_back(std::move(stream)); + } } - auto written_files = out.finalizePart(new_data_part); - out.finish(new_data_part, std::move(written_files), sync_on_insert); + auto finalizer = out->finalizePart(new_data_part, data_settings->fsync_after_insert); + + temp_part.part = new_data_part; + temp_part.streams.emplace_back(TempPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)}); + + /// out.finish(new_data_part, std::move(written_files), sync_on_insert); ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows()); ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes()); ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->getBytesOnDisk()); - return new_data_part; + return temp_part; } -MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl( +MergeTreeDataWriter::TempPart MergeTreeDataWriter::writeProjectionPartImpl( const String & part_name, MergeTreeDataPartType part_type, const String & relative_path, @@ -453,6 +466,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl( Block block, const ProjectionDescription & projection) { + TempPart temp_part; const StorageMetadataPtr & metadata_snapshot = projection.metadata; MergeTreePartInfo new_part_info("all", 0, 0, 0); auto new_data_part = data.createPart( @@ -524,25 +538,28 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl( /// either default lz4 or compression method with zero thresholds on absolute and relative part size. auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0); - MergedBlockOutputStream out( + auto out = std::make_unique( new_data_part, metadata_snapshot, columns, - {}, + MergeTreeIndices{}, compression_codec); - out.writeWithPermutation(block, perm_ptr); - auto written_files = out.finalizePart(new_data_part); - out.finish(new_data_part, std::move(written_files), false); + out->writeWithPermutation(block, perm_ptr); + auto finalizer = out->finalizePart(new_data_part, false); + temp_part.part = new_data_part; + temp_part.streams.emplace_back(TempPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)}); + + // out.finish(new_data_part, std::move(written_files), false); ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterRows, block.rows()); ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterUncompressedBytes, block.bytes()); ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterCompressedBytes, new_data_part->getBytesOnDisk()); - return new_data_part; + return temp_part; } -MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPart( +MergeTreeDataWriter::TempPart MergeTreeDataWriter::writeProjectionPart( MergeTreeData & data, Poco::Logger * log, Block block, const ProjectionDescription & projection, const IMergeTreeDataPart * parent_part) { String part_name = projection.name; @@ -574,7 +591,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPart( /// This is used for projection materialization process which may contain multiple stages of /// projection part merges. -MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempProjectionPart( +MergeTreeDataWriter::TempPart MergeTreeDataWriter::writeTempProjectionPart( MergeTreeData & data, Poco::Logger * log, Block block, @@ -609,7 +626,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempProjectionPart( projection); } -MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeInMemoryProjectionPart( +MergeTreeDataWriter::TempPart MergeTreeDataWriter::writeInMemoryProjectionPart( const MergeTreeData & data, Poco::Logger * log, Block block, diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.h b/src/Storages/MergeTree/MergeTreeDataWriter.h index f16ec877113..e78017dc887 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.h +++ b/src/Storages/MergeTree/MergeTreeDataWriter.h @@ -10,6 +10,7 @@ #include #include +#include namespace DB @@ -46,11 +47,25 @@ public: */ MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert); - MergeTreeData::MutableDataPartPtr - writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context); + struct TempPart + { + MergeTreeData::MutableDataPartPtr part; + + struct Stream + { + std::unique_ptr stream; + MergedBlockOutputStream::Finalizer finalizer; + }; + + std::vector streams; + + void finalize(); + }; + + TempPart writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context); /// For insertion. - static MergeTreeData::MutableDataPartPtr writeProjectionPart( + static TempPart writeProjectionPart( MergeTreeData & data, Poco::Logger * log, Block block, @@ -58,7 +73,7 @@ public: const IMergeTreeDataPart * parent_part); /// For mutation: MATERIALIZE PROJECTION. - static MergeTreeData::MutableDataPartPtr writeTempProjectionPart( + static TempPart writeTempProjectionPart( MergeTreeData & data, Poco::Logger * log, Block block, @@ -67,7 +82,7 @@ public: size_t block_num); /// For WriteAheadLog AddPart. - static MergeTreeData::MutableDataPartPtr writeInMemoryProjectionPart( + static TempPart writeInMemoryProjectionPart( const MergeTreeData & data, Poco::Logger * log, Block block, @@ -82,7 +97,7 @@ public: const MergeTreeData::MergingParams & merging_params); private: - static MergeTreeData::MutableDataPartPtr writeProjectionPartImpl( + static TempPart writeProjectionPartImpl( const String & part_name, MergeTreeDataPartType part_type, const String & relative_path, diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp index 5e97f80d849..7a46e5e040b 100644 --- a/src/Storages/MergeTree/MergeTreeSink.cpp +++ b/src/Storages/MergeTree/MergeTreeSink.cpp @@ -7,6 +7,21 @@ namespace DB { +MergeTreeSink::~MergeTreeSink() = default; + +MergeTreeSink::MergeTreeSink( + StorageMergeTree & storage_, + StorageMetadataPtr metadata_snapshot_, + size_t max_parts_per_block_, + ContextPtr context_) + : SinkToStorage(metadata_snapshot_->getSampleBlock()) + , storage(storage_) + , metadata_snapshot(metadata_snapshot_) + , max_parts_per_block(max_parts_per_block_) + , context(context_) +{ +} + void MergeTreeSink::onStart() { /// Only check "too many parts" before write, @@ -14,6 +29,17 @@ void MergeTreeSink::onStart() storage.delayInsertOrThrowIfNeeded(); } +void MergeTreeSink::onFinish() +{ + finishPrevPart(); +} + +struct MergeTreeSink::PrevPart +{ + MergeTreeDataWriter::TempPart temp_part; + UInt64 elapsed_ns; +}; + void MergeTreeSink::consume(Chunk chunk) { @@ -24,22 +50,49 @@ void MergeTreeSink::consume(Chunk chunk) { Stopwatch watch; - MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, context); + auto temp_part = storage.writer.writeTempPart(current_block, metadata_snapshot, context); + + LOG_TRACE(&Poco::Logger::get("MergeTreeSink"), "Written part {}", temp_part.part->getNameWithState()); + + UInt64 elapsed_ns = watch.elapsed(); /// If optimize_on_insert setting is true, current_block could become empty after merge /// and we didn't create part. - if (!part) + if (!temp_part.part) continue; + finishPrevPart(); + + prev_part = std::make_unique(); + prev_part->temp_part = std::move(temp_part); + prev_part->elapsed_ns = elapsed_ns; + } +} + +void MergeTreeSink::finishPrevPart() +{ + if (prev_part) + { + + LOG_TRACE(&Poco::Logger::get("MergeTreeSink"), "Finalizing part {}", prev_part->temp_part.part->getNameWithState()); + prev_part->temp_part.finalize(); + LOG_TRACE(&Poco::Logger::get("MergeTreeSink"), "Finalized part {}", prev_part->temp_part.part->getNameWithState()); + + auto & part = prev_part->temp_part.part; + /// Part can be deduplicated, so increment counters and add to part log only if it's really added if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog())) { - PartLog::addNewPart(storage.getContext(), part, watch.elapsed()); + PartLog::addNewPart(storage.getContext(), part, prev_part->elapsed_ns); /// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'. storage.background_operations_assignee.trigger(); } + + LOG_TRACE(&Poco::Logger::get("MergeTreeSink"), "Renamed part {}", prev_part->temp_part.part->getNameWithState()); } + + prev_part.reset(); } } diff --git a/src/Storages/MergeTree/MergeTreeSink.h b/src/Storages/MergeTree/MergeTreeSink.h index 60ac62c7592..28288e5bd74 100644 --- a/src/Storages/MergeTree/MergeTreeSink.h +++ b/src/Storages/MergeTree/MergeTreeSink.h @@ -16,26 +16,27 @@ class MergeTreeSink : public SinkToStorage public: MergeTreeSink( StorageMergeTree & storage_, - const StorageMetadataPtr metadata_snapshot_, + StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_, - ContextPtr context_) - : SinkToStorage(metadata_snapshot_->getSampleBlock()) - , storage(storage_) - , metadata_snapshot(metadata_snapshot_) - , max_parts_per_block(max_parts_per_block_) - , context(context_) - { - } + ContextPtr context_); + + ~MergeTreeSink() override; String getName() const override { return "MergeTreeSink"; } void consume(Chunk chunk) override; void onStart() override; + void onFinish() override; private: StorageMergeTree & storage; StorageMetadataPtr metadata_snapshot; size_t max_parts_per_block; ContextPtr context; + + struct PrevPart; + std::unique_ptr prev_part; + + void finishPrevPart(); }; } diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp index 299cd6114bf..f10a211cac6 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp @@ -208,13 +208,13 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor for (const auto & projection : metadata_snapshot->getProjections()) { auto projection_block = projection.calculate(block, context); + auto temp_part = MergeTreeDataWriter::writeInMemoryProjectionPart(storage, log, projection_block, projection, part.get()); + temp_part.finalize(); if (projection_block.rows()) - part->addProjectionPart( - projection.name, - MergeTreeDataWriter::writeInMemoryProjectionPart(storage, log, projection_block, projection, part.get())); + part->addProjectionPart(projection.name, std::move(temp_part.part)); } - auto written_files = part_out.finalizePart(part); - part_out.finish(part, std::move(written_files), false); + auto finalizer = part_out.finalizePart(part, false); + part_out.finish(std::move(finalizer)); min_block_number = std::min(min_block_number, part->info.min_block); max_block_number = std::max(max_block_number, part->info.max_block); diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 2427be40579..0cb4319c877 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -51,8 +51,21 @@ void MergedBlockOutputStream::writeWithPermutation(const Block & block, const IC writeImpl(block, permutation); } -MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePart( +struct MergedBlockOutputStream::Finalizer::Impl +{ + MergeTreeData::MutableDataPartPtr part; + std::vector> written_files; + bool sync; +}; + +MergedBlockOutputStream::Finalizer::~Finalizer() = default; +MergedBlockOutputStream::Finalizer::Finalizer(Finalizer &&) = default; +MergedBlockOutputStream::Finalizer & MergedBlockOutputStream::Finalizer::operator=(Finalizer &&) = default; +MergedBlockOutputStream::Finalizer::Finalizer(std::unique_ptr impl_) : impl(std::move(impl_)) {} + +MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePart( MergeTreeData::MutableDataPartPtr & new_part, + bool sync, const NamesAndTypesList * total_columns_list, MergeTreeData::DataPart::Checksums * additional_column_checksums) { @@ -65,6 +78,8 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePart( /// Finish columns serialization. writer->fillChecksums(checksums); + LOG_TRACE(&Poco::Logger::get("MergedBlockOutputStream"), "filled checksums {}", new_part->getNameWithState()); + for (const auto & [projection_name, projection_part] : new_part->getProjectionParts()) checksums.addFile( projection_name + ".proj", @@ -81,9 +96,11 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePart( ? new_serialization_infos : new_part->getSerializationInfos(); - WrittenFiles written_files; + auto finalizer = std::make_unique(); + finalizer->sync = sync; + finalizer->part = new_part; if (new_part->isStoredOnDisk()) - written_files = finalizePartOnDisk(new_part, part_columns, serialization_infos, checksums); + finalizer->written_files = finalizePartOnDisk(new_part, part_columns, serialization_infos, checksums); if (reset_columns) new_part->setColumns(part_columns, serialization_infos); @@ -96,23 +113,24 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePart( new_part->index_granularity = writer->getIndexGranularity(); new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(); - return written_files; + if (default_codec != nullptr) + new_part->default_codec = default_codec; + + return Finalizer(std::move(finalizer)); } -void MergedBlockOutputStream::finish(MergeTreeData::MutableDataPartPtr & new_part, WrittenFiles files_to_finalize, bool sync) +void MergedBlockOutputStream::finish(Finalizer finalizer) { - writer->finish(sync); + writer->finish(finalizer.impl->sync); - for (auto & file : files_to_finalize) + for (auto & file : finalizer.impl->written_files) { file->finalize(); if (sync) file->sync(); } - if (default_codec != nullptr) - new_part->default_codec = default_codec; - new_part->storage.lockSharedData(*new_part); + finalizer.impl->part->storage.lockSharedData(*finalizer.impl->part); } MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDisk( @@ -121,6 +139,9 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis SerializationInfoByName & serialization_infos, MergeTreeData::DataPart::Checksums & checksums) { + + LOG_TRACE(&Poco::Logger::get("MergedBlockOutputStream"), "finalizing {}", new_part->getNameWithState()); + WrittenFiles written_files; if (new_part->isProjectionPart()) { diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.h b/src/Storages/MergeTree/MergedBlockOutputStream.h index 3b3029bc0e1..88bdacd61a9 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.h +++ b/src/Storages/MergeTree/MergedBlockOutputStream.h @@ -32,16 +32,26 @@ public: */ void writeWithPermutation(const Block & block, const IColumn::Permutation * permutation); - using WrittenFiles = std::vector>; + struct Finalizer + { + struct Impl; + std::unique_ptr impl; + + explicit Finalizer(std::unique_ptr impl_); + ~Finalizer(); + Finalizer(Finalizer &&); + Finalizer & operator=(Finalizer &&); + }; /// Finalize writing part and fill inner structures /// If part is new and contains projections, they should be added before invoking this method. - WrittenFiles finalizePart( + Finalizer finalizePart( MergeTreeData::MutableDataPartPtr & new_part, + bool sync, const NamesAndTypesList * total_columns_list = nullptr, MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr); - void finish(MergeTreeData::MutableDataPartPtr & new_part, WrittenFiles files_to_finalize, bool sync); + void finish(Finalizer finalizer); private: /** If `permutation` is given, it rearranges the values in the columns when writing. @@ -49,7 +59,8 @@ private: */ void writeImpl(const Block & block, const IColumn::Permutation * permutation); - MergedBlockOutputStream::WrittenFiles finalizePartOnDisk( + using WrittenFiles = std::vector>; + WrittenFiles finalizePartOnDisk( const MergeTreeData::MutableDataPartPtr & new_part, NamesAndTypesList & part_columns, SerializationInfoByName & serialization_infos, diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 2d6606e5b72..c5d07a7ba02 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -805,8 +805,12 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() const auto & projection = *ctx->projections_to_build[i]; auto projection_block = projection_squashes[i].add(projection.calculate(cur_block, ctx->context)); if (projection_block) - projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart( - *ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num)); + { + auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( + *ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num); + tmp_part.finalize(); + projection_parts[projection.name].emplace_back(std::move(tmp_part.part)); + } } (*ctx->mutate_entry)->rows_written += cur_block.rows(); @@ -824,8 +828,10 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() auto projection_block = projection_squash.add({}); if (projection_block) { - projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart( - *ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num)); + auto temp_part = MergeTreeDataWriter::writeTempProjectionPart( + *ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num); + temp_part.finalize(); + projection_parts[projection.name].emplace_back(std::move(temp_part.part)); } } @@ -977,8 +983,8 @@ private: ctx->mutating_executor.reset(); ctx->mutating_pipeline.reset(); - auto written_files = static_pointer_cast(ctx->out)->finalizePart(ctx->new_data_part); - static_pointer_cast(ctx->out)->finish(ctx->new_data_part, std::move(written_files), ctx->need_sync); + auto finalizer = static_pointer_cast(ctx->out)->finalizePart(ctx->new_data_part, ctx->need_sync); + static_pointer_cast(ctx->out)->finish(std::move(finalizer)); ctx->out.reset(); } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 1ce748640dc..12f7c2d7c1c 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -147,11 +147,11 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk) /// Write part to the filesystem under temporary name. Calculate a checksum. - MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, context); + auto temp_part = storage.writer.writeTempPart(current_block, metadata_snapshot, context); /// If optimize_on_insert setting is true, current_block could become empty after merge /// and we didn't create part. - if (!part) + if (!temp_part.part) continue; String block_id; @@ -160,7 +160,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk) { /// We add the hash from the data and partition identifier to deduplication ID. /// That is, do not insert the same data to the same partition twice. - block_id = part->getZeroLevelPartBlockID(); + block_id = temp_part.part->getZeroLevelPartBlockID(); LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows()); } @@ -169,6 +169,11 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk) LOG_DEBUG(log, "Wrote block with {} rows", current_block.block.rows()); } + for (auto & stream : temp_part.streams) + stream.stream->finish(std::move(stream.finalizer)); + + auto & part = temp_part.part; + try { commitPart(zookeeper, part, block_id); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 89f096a6d2a..d7bb849a213 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -7405,8 +7405,8 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP /// TODO(ab): What projections should we add to the empty part? How can we make sure that it /// won't block future merges? Perhaps we should also check part emptiness when selecting parts /// to merge. - auto written_files = out.finalizePart(new_data_part); - out.finish(new_data_part, std::move(written_files), sync_on_insert); + auto finalizer = out.finalizePart(new_data_part, sync_on_insert); + out.finish(std::move(finalizer)); try {