Some more async writes.

This commit is contained in:
Nikolai Kochetov 2022-01-14 19:53:55 +00:00
parent 843983ea06
commit 6d49a62666
14 changed files with 212 additions and 83 deletions

View File

@ -516,7 +516,7 @@ AsynchronousReaderPtr IDiskRemote::getThreadPoolReader()
ThreadPool & IDiskRemote::getThreadPoolWriter() ThreadPool & IDiskRemote::getThreadPoolWriter()
{ {
constexpr size_t pool_size = 50; constexpr size_t pool_size = 300;
constexpr size_t queue_size = 1000000; constexpr size_t queue_size = 1000000;
static ThreadPool writer(pool_size, pool_size, queue_size); static ThreadPool writer(pool_size, pool_size, queue_size);
return writer; return writer;

View File

@ -593,8 +593,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
CompressionCodecFactory::instance().get("NONE", {})); CompressionCodecFactory::instance().get("NONE", {}));
part_out.write(block); part_out.write(block);
auto written_files = part_out.finalizePart(new_projection_part); auto finalizer = part_out.finalizePart(new_projection_part, false);
part_out.finish(new_projection_part, std::move(written_files), false); part_out.finish(std::move(finalizer));
new_projection_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true); new_projection_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true);
new_data_part->addProjectionPart(projection_name, std::move(new_projection_part)); new_data_part->addProjectionPart(projection_name, std::move(new_projection_part));
} }
@ -618,8 +618,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
CompressionCodecFactory::instance().get("NONE", {})); CompressionCodecFactory::instance().get("NONE", {}));
part_out.write(block); part_out.write(block);
auto written_files = part_out.finalizePart(new_data_part); auto finalizer = part_out.finalizePart(new_data_part, false);
part_out.finish(new_data_part, std::move(written_files), false); part_out.finish(std::move(finalizer));
new_data_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true); new_data_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true);
return new_data_part; return new_data_part;

View File

@ -632,14 +632,14 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const
global_ctx->new_data_part->addProjectionPart(part->name, std::move(part)); global_ctx->new_data_part->addProjectionPart(part->name, std::move(part));
} }
MergedBlockOutputStream::WrittenFiles written_files; std::optional<MergedBlockOutputStream::Finalizer> finalizer;
if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical) if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical)
written_files = global_ctx->to->finalizePart(global_ctx->new_data_part); finalizer = global_ctx->to->finalizePart(global_ctx->new_data_part, ctx->need_sync);
else else
written_files = global_ctx->to->finalizePart( finalizer = global_ctx->to->finalizePart(
global_ctx->new_data_part, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns); global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns);
global_ctx->to->finish(global_ctx->new_data_part, std::move(written_files), ctx->need_sync); global_ctx->to->finish(std::move(*finalizer));
global_ctx->promise.set_value(global_ctx->new_data_part); global_ctx->promise.set_value(global_ctx->new_data_part);

View File

@ -125,14 +125,14 @@ void MergeTreeDataPartInMemory::flushToDisk(const String & base_path, const Stri
projection_compression_codec); projection_compression_codec);
projection_out.write(projection_part->block); projection_out.write(projection_part->block);
auto written_files = projection_out.finalizePart(projection_data_part); auto finalizer = projection_out.finalizePart(projection_data_part, false);
projection_out.finish(projection_data_part, std::move(written_files), false); projection_out.finish(std::move(finalizer));
new_data_part->addProjectionPart(projection_name, std::move(projection_data_part)); new_data_part->addProjectionPart(projection_name, std::move(projection_data_part));
} }
} }
auto written_files = out.finalizePart(new_data_part); auto finalizer = out.finalizePart(new_data_part, false);
out.finish(new_data_part, std::move(written_files), false); out.finish(std::move(finalizer));
} }
void MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const void MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const

View File

@ -137,6 +137,12 @@ void updateTTL(
} }
void MergeTreeDataWriter::TempPart::finalize()
{
for (auto & stream : streams)
stream.stream->finish(std::move(stream.finalizer));
}
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts( BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{ {
@ -270,9 +276,10 @@ Block MergeTreeDataWriter::mergeBlock(
return block.cloneWithColumns(status.chunk.getColumns()); return block.cloneWithColumns(status.chunk.getColumns());
} }
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart( MergeTreeDataWriter::TempPart MergeTreeDataWriter::writeTempPart(
BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{ {
TempPart temp_part;
Block & block = block_with_partition.block; Block & block = block_with_partition.block;
static const String TMP_PREFIX = "tmp_insert_"; static const String TMP_PREFIX = "tmp_insert_";
@ -343,7 +350,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(
/// If optimize_on_insert is true, block may become empty after merge. /// If optimize_on_insert is true, block may become empty after merge.
/// There is no need to create empty part. /// There is no need to create empty part.
if (expected_size == 0) if (expected_size == 0)
return nullptr; return temp_part;
DB::IMergeTreeDataPart::TTLInfos move_ttl_infos; DB::IMergeTreeDataPart::TTLInfos move_ttl_infos;
const auto & move_ttl_entries = metadata_snapshot->getMoveTTLs(); const auto & move_ttl_entries = metadata_snapshot->getMoveTTLs();
@ -418,31 +425,37 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0); auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
const auto & index_factory = MergeTreeIndexFactory::instance(); const auto & index_factory = MergeTreeIndexFactory::instance();
MergedBlockOutputStream out(new_data_part, metadata_snapshot,columns, auto out = std::make_unique<MergedBlockOutputStream>(new_data_part, metadata_snapshot,columns,
index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec); index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec);
bool sync_on_insert = data_settings->fsync_after_insert; out->writeWithPermutation(block, perm_ptr);
out.writeWithPermutation(block, perm_ptr);
for (const auto & projection : metadata_snapshot->getProjections()) for (const auto & projection : metadata_snapshot->getProjections())
{ {
auto projection_block = projection.calculate(block, context); auto projection_block = projection.calculate(block, context);
if (projection_block.rows()) if (projection_block.rows())
new_data_part->addProjectionPart( {
projection.name, writeProjectionPart(data, log, projection_block, projection, new_data_part.get())); auto proj_temp_part = writeProjectionPart(data, log, projection_block, projection, new_data_part.get());
new_data_part->addProjectionPart(projection.name, std::move(proj_temp_part.part));
for (auto & stream : proj_temp_part.streams)
temp_part.streams.emplace_back(std::move(stream));
}
} }
auto written_files = out.finalizePart(new_data_part); auto finalizer = out->finalizePart(new_data_part, data_settings->fsync_after_insert);
out.finish(new_data_part, std::move(written_files), sync_on_insert);
temp_part.part = new_data_part;
temp_part.streams.emplace_back(TempPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)});
/// out.finish(new_data_part, std::move(written_files), sync_on_insert);
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows()); ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows());
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes()); ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes());
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->getBytesOnDisk()); ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->getBytesOnDisk());
return new_data_part; return temp_part;
} }
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl( MergeTreeDataWriter::TempPart MergeTreeDataWriter::writeProjectionPartImpl(
const String & part_name, const String & part_name,
MergeTreeDataPartType part_type, MergeTreeDataPartType part_type,
const String & relative_path, const String & relative_path,
@ -453,6 +466,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl(
Block block, Block block,
const ProjectionDescription & projection) const ProjectionDescription & projection)
{ {
TempPart temp_part;
const StorageMetadataPtr & metadata_snapshot = projection.metadata; const StorageMetadataPtr & metadata_snapshot = projection.metadata;
MergeTreePartInfo new_part_info("all", 0, 0, 0); MergeTreePartInfo new_part_info("all", 0, 0, 0);
auto new_data_part = data.createPart( auto new_data_part = data.createPart(
@ -524,25 +538,28 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl(
/// either default lz4 or compression method with zero thresholds on absolute and relative part size. /// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0); auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
MergedBlockOutputStream out( auto out = std::make_unique<MergedBlockOutputStream>(
new_data_part, new_data_part,
metadata_snapshot, metadata_snapshot,
columns, columns,
{}, MergeTreeIndices{},
compression_codec); compression_codec);
out.writeWithPermutation(block, perm_ptr); out->writeWithPermutation(block, perm_ptr);
auto written_files = out.finalizePart(new_data_part); auto finalizer = out->finalizePart(new_data_part, false);
out.finish(new_data_part, std::move(written_files), false); temp_part.part = new_data_part;
temp_part.streams.emplace_back(TempPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)});
// out.finish(new_data_part, std::move(written_files), false);
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterRows, block.rows()); ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterRows, block.rows());
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterUncompressedBytes, block.bytes()); ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterUncompressedBytes, block.bytes());
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterCompressedBytes, new_data_part->getBytesOnDisk()); ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterCompressedBytes, new_data_part->getBytesOnDisk());
return new_data_part; return temp_part;
} }
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPart( MergeTreeDataWriter::TempPart MergeTreeDataWriter::writeProjectionPart(
MergeTreeData & data, Poco::Logger * log, Block block, const ProjectionDescription & projection, const IMergeTreeDataPart * parent_part) MergeTreeData & data, Poco::Logger * log, Block block, const ProjectionDescription & projection, const IMergeTreeDataPart * parent_part)
{ {
String part_name = projection.name; String part_name = projection.name;
@ -574,7 +591,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPart(
/// This is used for projection materialization process which may contain multiple stages of /// This is used for projection materialization process which may contain multiple stages of
/// projection part merges. /// projection part merges.
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempProjectionPart( MergeTreeDataWriter::TempPart MergeTreeDataWriter::writeTempProjectionPart(
MergeTreeData & data, MergeTreeData & data,
Poco::Logger * log, Poco::Logger * log,
Block block, Block block,
@ -609,7 +626,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempProjectionPart(
projection); projection);
} }
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeInMemoryProjectionPart( MergeTreeDataWriter::TempPart MergeTreeDataWriter::writeInMemoryProjectionPart(
const MergeTreeData & data, const MergeTreeData & data,
Poco::Logger * log, Poco::Logger * log,
Block block, Block block,

View File

@ -10,6 +10,7 @@
#include <Interpreters/sortBlock.h> #include <Interpreters/sortBlock.h>
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
namespace DB namespace DB
@ -46,11 +47,25 @@ public:
*/ */
MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert); MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert);
MergeTreeData::MutableDataPartPtr struct TempPart
writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context); {
MergeTreeData::MutableDataPartPtr part;
struct Stream
{
std::unique_ptr<MergedBlockOutputStream> stream;
MergedBlockOutputStream::Finalizer finalizer;
};
std::vector<Stream> streams;
void finalize();
};
TempPart writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
/// For insertion. /// For insertion.
static MergeTreeData::MutableDataPartPtr writeProjectionPart( static TempPart writeProjectionPart(
MergeTreeData & data, MergeTreeData & data,
Poco::Logger * log, Poco::Logger * log,
Block block, Block block,
@ -58,7 +73,7 @@ public:
const IMergeTreeDataPart * parent_part); const IMergeTreeDataPart * parent_part);
/// For mutation: MATERIALIZE PROJECTION. /// For mutation: MATERIALIZE PROJECTION.
static MergeTreeData::MutableDataPartPtr writeTempProjectionPart( static TempPart writeTempProjectionPart(
MergeTreeData & data, MergeTreeData & data,
Poco::Logger * log, Poco::Logger * log,
Block block, Block block,
@ -67,7 +82,7 @@ public:
size_t block_num); size_t block_num);
/// For WriteAheadLog AddPart. /// For WriteAheadLog AddPart.
static MergeTreeData::MutableDataPartPtr writeInMemoryProjectionPart( static TempPart writeInMemoryProjectionPart(
const MergeTreeData & data, const MergeTreeData & data,
Poco::Logger * log, Poco::Logger * log,
Block block, Block block,
@ -82,7 +97,7 @@ public:
const MergeTreeData::MergingParams & merging_params); const MergeTreeData::MergingParams & merging_params);
private: private:
static MergeTreeData::MutableDataPartPtr writeProjectionPartImpl( static TempPart writeProjectionPartImpl(
const String & part_name, const String & part_name,
MergeTreeDataPartType part_type, MergeTreeDataPartType part_type,
const String & relative_path, const String & relative_path,

View File

@ -7,6 +7,21 @@
namespace DB namespace DB
{ {
MergeTreeSink::~MergeTreeSink() = default;
MergeTreeSink::MergeTreeSink(
StorageMergeTree & storage_,
StorageMetadataPtr metadata_snapshot_,
size_t max_parts_per_block_,
ContextPtr context_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, max_parts_per_block(max_parts_per_block_)
, context(context_)
{
}
void MergeTreeSink::onStart() void MergeTreeSink::onStart()
{ {
/// Only check "too many parts" before write, /// Only check "too many parts" before write,
@ -14,6 +29,17 @@ void MergeTreeSink::onStart()
storage.delayInsertOrThrowIfNeeded(); storage.delayInsertOrThrowIfNeeded();
} }
void MergeTreeSink::onFinish()
{
finishPrevPart();
}
struct MergeTreeSink::PrevPart
{
MergeTreeDataWriter::TempPart temp_part;
UInt64 elapsed_ns;
};
void MergeTreeSink::consume(Chunk chunk) void MergeTreeSink::consume(Chunk chunk)
{ {
@ -24,22 +50,49 @@ void MergeTreeSink::consume(Chunk chunk)
{ {
Stopwatch watch; Stopwatch watch;
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, context); auto temp_part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);
LOG_TRACE(&Poco::Logger::get("MergeTreeSink"), "Written part {}", temp_part.part->getNameWithState());
UInt64 elapsed_ns = watch.elapsed();
/// If optimize_on_insert setting is true, current_block could become empty after merge /// If optimize_on_insert setting is true, current_block could become empty after merge
/// and we didn't create part. /// and we didn't create part.
if (!part) if (!temp_part.part)
continue; continue;
finishPrevPart();
prev_part = std::make_unique<MergeTreeSink::PrevPart>();
prev_part->temp_part = std::move(temp_part);
prev_part->elapsed_ns = elapsed_ns;
}
}
void MergeTreeSink::finishPrevPart()
{
if (prev_part)
{
LOG_TRACE(&Poco::Logger::get("MergeTreeSink"), "Finalizing part {}", prev_part->temp_part.part->getNameWithState());
prev_part->temp_part.finalize();
LOG_TRACE(&Poco::Logger::get("MergeTreeSink"), "Finalized part {}", prev_part->temp_part.part->getNameWithState());
auto & part = prev_part->temp_part.part;
/// Part can be deduplicated, so increment counters and add to part log only if it's really added /// Part can be deduplicated, so increment counters and add to part log only if it's really added
if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog())) if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog()))
{ {
PartLog::addNewPart(storage.getContext(), part, watch.elapsed()); PartLog::addNewPart(storage.getContext(), part, prev_part->elapsed_ns);
/// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'. /// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.
storage.background_operations_assignee.trigger(); storage.background_operations_assignee.trigger();
} }
LOG_TRACE(&Poco::Logger::get("MergeTreeSink"), "Renamed part {}", prev_part->temp_part.part->getNameWithState());
} }
prev_part.reset();
} }
} }

View File

@ -16,26 +16,27 @@ class MergeTreeSink : public SinkToStorage
public: public:
MergeTreeSink( MergeTreeSink(
StorageMergeTree & storage_, StorageMergeTree & storage_,
const StorageMetadataPtr metadata_snapshot_, StorageMetadataPtr metadata_snapshot_,
size_t max_parts_per_block_, size_t max_parts_per_block_,
ContextPtr context_) ContextPtr context_);
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_) ~MergeTreeSink() override;
, metadata_snapshot(metadata_snapshot_)
, max_parts_per_block(max_parts_per_block_)
, context(context_)
{
}
String getName() const override { return "MergeTreeSink"; } String getName() const override { return "MergeTreeSink"; }
void consume(Chunk chunk) override; void consume(Chunk chunk) override;
void onStart() override; void onStart() override;
void onFinish() override;
private: private:
StorageMergeTree & storage; StorageMergeTree & storage;
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
size_t max_parts_per_block; size_t max_parts_per_block;
ContextPtr context; ContextPtr context;
struct PrevPart;
std::unique_ptr<PrevPart> prev_part;
void finishPrevPart();
}; };
} }

View File

@ -208,13 +208,13 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
for (const auto & projection : metadata_snapshot->getProjections()) for (const auto & projection : metadata_snapshot->getProjections())
{ {
auto projection_block = projection.calculate(block, context); auto projection_block = projection.calculate(block, context);
auto temp_part = MergeTreeDataWriter::writeInMemoryProjectionPart(storage, log, projection_block, projection, part.get());
temp_part.finalize();
if (projection_block.rows()) if (projection_block.rows())
part->addProjectionPart( part->addProjectionPart(projection.name, std::move(temp_part.part));
projection.name,
MergeTreeDataWriter::writeInMemoryProjectionPart(storage, log, projection_block, projection, part.get()));
} }
auto written_files = part_out.finalizePart(part); auto finalizer = part_out.finalizePart(part, false);
part_out.finish(part, std::move(written_files), false); part_out.finish(std::move(finalizer));
min_block_number = std::min(min_block_number, part->info.min_block); min_block_number = std::min(min_block_number, part->info.min_block);
max_block_number = std::max(max_block_number, part->info.max_block); max_block_number = std::max(max_block_number, part->info.max_block);

View File

@ -51,8 +51,21 @@ void MergedBlockOutputStream::writeWithPermutation(const Block & block, const IC
writeImpl(block, permutation); writeImpl(block, permutation);
} }
MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePart( struct MergedBlockOutputStream::Finalizer::Impl
{
MergeTreeData::MutableDataPartPtr part;
std::vector<std::unique_ptr<WriteBufferFromFileBase>> written_files;
bool sync;
};
MergedBlockOutputStream::Finalizer::~Finalizer() = default;
MergedBlockOutputStream::Finalizer::Finalizer(Finalizer &&) = default;
MergedBlockOutputStream::Finalizer & MergedBlockOutputStream::Finalizer::operator=(Finalizer &&) = default;
MergedBlockOutputStream::Finalizer::Finalizer(std::unique_ptr<Impl> impl_) : impl(std::move(impl_)) {}
MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePart(
MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::MutableDataPartPtr & new_part,
bool sync,
const NamesAndTypesList * total_columns_list, const NamesAndTypesList * total_columns_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums) MergeTreeData::DataPart::Checksums * additional_column_checksums)
{ {
@ -65,6 +78,8 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePart(
/// Finish columns serialization. /// Finish columns serialization.
writer->fillChecksums(checksums); writer->fillChecksums(checksums);
LOG_TRACE(&Poco::Logger::get("MergedBlockOutputStream"), "filled checksums {}", new_part->getNameWithState());
for (const auto & [projection_name, projection_part] : new_part->getProjectionParts()) for (const auto & [projection_name, projection_part] : new_part->getProjectionParts())
checksums.addFile( checksums.addFile(
projection_name + ".proj", projection_name + ".proj",
@ -81,9 +96,11 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePart(
? new_serialization_infos ? new_serialization_infos
: new_part->getSerializationInfos(); : new_part->getSerializationInfos();
WrittenFiles written_files; auto finalizer = std::make_unique<Finalizer::Impl>();
finalizer->sync = sync;
finalizer->part = new_part;
if (new_part->isStoredOnDisk()) if (new_part->isStoredOnDisk())
written_files = finalizePartOnDisk(new_part, part_columns, serialization_infos, checksums); finalizer->written_files = finalizePartOnDisk(new_part, part_columns, serialization_infos, checksums);
if (reset_columns) if (reset_columns)
new_part->setColumns(part_columns, serialization_infos); new_part->setColumns(part_columns, serialization_infos);
@ -96,23 +113,24 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePart(
new_part->index_granularity = writer->getIndexGranularity(); new_part->index_granularity = writer->getIndexGranularity();
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(); new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk();
return written_files; if (default_codec != nullptr)
new_part->default_codec = default_codec;
return Finalizer(std::move(finalizer));
} }
void MergedBlockOutputStream::finish(MergeTreeData::MutableDataPartPtr & new_part, WrittenFiles files_to_finalize, bool sync) void MergedBlockOutputStream::finish(Finalizer finalizer)
{ {
writer->finish(sync); writer->finish(finalizer.impl->sync);
for (auto & file : files_to_finalize) for (auto & file : finalizer.impl->written_files)
{ {
file->finalize(); file->finalize();
if (sync) if (sync)
file->sync(); file->sync();
} }
if (default_codec != nullptr) finalizer.impl->part->storage.lockSharedData(*finalizer.impl->part);
new_part->default_codec = default_codec;
new_part->storage.lockSharedData(*new_part);
} }
MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDisk( MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDisk(
@ -121,6 +139,9 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
SerializationInfoByName & serialization_infos, SerializationInfoByName & serialization_infos,
MergeTreeData::DataPart::Checksums & checksums) MergeTreeData::DataPart::Checksums & checksums)
{ {
LOG_TRACE(&Poco::Logger::get("MergedBlockOutputStream"), "finalizing {}", new_part->getNameWithState());
WrittenFiles written_files; WrittenFiles written_files;
if (new_part->isProjectionPart()) if (new_part->isProjectionPart())
{ {

View File

@ -32,16 +32,26 @@ public:
*/ */
void writeWithPermutation(const Block & block, const IColumn::Permutation * permutation); void writeWithPermutation(const Block & block, const IColumn::Permutation * permutation);
using WrittenFiles = std::vector<std::unique_ptr<WriteBufferFromFileBase>>; struct Finalizer
{
struct Impl;
std::unique_ptr<Impl> impl;
explicit Finalizer(std::unique_ptr<Impl> impl_);
~Finalizer();
Finalizer(Finalizer &&);
Finalizer & operator=(Finalizer &&);
};
/// Finalize writing part and fill inner structures /// Finalize writing part and fill inner structures
/// If part is new and contains projections, they should be added before invoking this method. /// If part is new and contains projections, they should be added before invoking this method.
WrittenFiles finalizePart( Finalizer finalizePart(
MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::MutableDataPartPtr & new_part,
bool sync,
const NamesAndTypesList * total_columns_list = nullptr, const NamesAndTypesList * total_columns_list = nullptr,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr); MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
void finish(MergeTreeData::MutableDataPartPtr & new_part, WrittenFiles files_to_finalize, bool sync); void finish(Finalizer finalizer);
private: private:
/** If `permutation` is given, it rearranges the values in the columns when writing. /** If `permutation` is given, it rearranges the values in the columns when writing.
@ -49,7 +59,8 @@ private:
*/ */
void writeImpl(const Block & block, const IColumn::Permutation * permutation); void writeImpl(const Block & block, const IColumn::Permutation * permutation);
MergedBlockOutputStream::WrittenFiles finalizePartOnDisk( using WrittenFiles = std::vector<std::unique_ptr<WriteBufferFromFileBase>>;
WrittenFiles finalizePartOnDisk(
const MergeTreeData::MutableDataPartPtr & new_part, const MergeTreeData::MutableDataPartPtr & new_part,
NamesAndTypesList & part_columns, NamesAndTypesList & part_columns,
SerializationInfoByName & serialization_infos, SerializationInfoByName & serialization_infos,

View File

@ -805,8 +805,12 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
const auto & projection = *ctx->projections_to_build[i]; const auto & projection = *ctx->projections_to_build[i];
auto projection_block = projection_squashes[i].add(projection.calculate(cur_block, ctx->context)); auto projection_block = projection_squashes[i].add(projection.calculate(cur_block, ctx->context));
if (projection_block) if (projection_block)
projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart( {
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num)); auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num);
tmp_part.finalize();
projection_parts[projection.name].emplace_back(std::move(tmp_part.part));
}
} }
(*ctx->mutate_entry)->rows_written += cur_block.rows(); (*ctx->mutate_entry)->rows_written += cur_block.rows();
@ -824,8 +828,10 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
auto projection_block = projection_squash.add({}); auto projection_block = projection_squash.add({});
if (projection_block) if (projection_block)
{ {
projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart( auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num)); *ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num);
temp_part.finalize();
projection_parts[projection.name].emplace_back(std::move(temp_part.part));
} }
} }
@ -977,8 +983,8 @@ private:
ctx->mutating_executor.reset(); ctx->mutating_executor.reset();
ctx->mutating_pipeline.reset(); ctx->mutating_pipeline.reset();
auto written_files = static_pointer_cast<MergedBlockOutputStream>(ctx->out)->finalizePart(ctx->new_data_part); auto finalizer = static_pointer_cast<MergedBlockOutputStream>(ctx->out)->finalizePart(ctx->new_data_part, ctx->need_sync);
static_pointer_cast<MergedBlockOutputStream>(ctx->out)->finish(ctx->new_data_part, std::move(written_files), ctx->need_sync); static_pointer_cast<MergedBlockOutputStream>(ctx->out)->finish(std::move(finalizer));
ctx->out.reset(); ctx->out.reset();
} }

View File

@ -147,11 +147,11 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
/// Write part to the filesystem under temporary name. Calculate a checksum. /// Write part to the filesystem under temporary name. Calculate a checksum.
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, context); auto temp_part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);
/// If optimize_on_insert setting is true, current_block could become empty after merge /// If optimize_on_insert setting is true, current_block could become empty after merge
/// and we didn't create part. /// and we didn't create part.
if (!part) if (!temp_part.part)
continue; continue;
String block_id; String block_id;
@ -160,7 +160,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
{ {
/// We add the hash from the data and partition identifier to deduplication ID. /// We add the hash from the data and partition identifier to deduplication ID.
/// That is, do not insert the same data to the same partition twice. /// That is, do not insert the same data to the same partition twice.
block_id = part->getZeroLevelPartBlockID(); block_id = temp_part.part->getZeroLevelPartBlockID();
LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows()); LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows());
} }
@ -169,6 +169,11 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
LOG_DEBUG(log, "Wrote block with {} rows", current_block.block.rows()); LOG_DEBUG(log, "Wrote block with {} rows", current_block.block.rows());
} }
for (auto & stream : temp_part.streams)
stream.stream->finish(std::move(stream.finalizer));
auto & part = temp_part.part;
try try
{ {
commitPart(zookeeper, part, block_id); commitPart(zookeeper, part, block_id);

View File

@ -7405,8 +7405,8 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
/// TODO(ab): What projections should we add to the empty part? How can we make sure that it /// TODO(ab): What projections should we add to the empty part? How can we make sure that it
/// won't block future merges? Perhaps we should also check part emptiness when selecting parts /// won't block future merges? Perhaps we should also check part emptiness when selecting parts
/// to merge. /// to merge.
auto written_files = out.finalizePart(new_data_part); auto finalizer = out.finalizePart(new_data_part, sync_on_insert);
out.finish(new_data_part, std::move(written_files), sync_on_insert); out.finish(std::move(finalizer));
try try
{ {