From a264de35d1eec53fd38b783cfc75fc497ed0c29b Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Wed, 20 Nov 2024 10:44:07 +0100 Subject: [PATCH 1/2] fix cancelation for PartitionedSink --- src/Storages/PartitionedSink.cpp | 6 ++++++ src/Storages/PartitionedSink.h | 2 ++ 2 files changed, 8 insertions(+) diff --git a/src/Storages/PartitionedSink.cpp b/src/Storages/PartitionedSink.cpp index ee2570756ed..0f93d1a5b75 100644 --- a/src/Storages/PartitionedSink.cpp +++ b/src/Storages/PartitionedSink.cpp @@ -146,6 +146,12 @@ String PartitionedSink::replaceWildcards(const String & haystack, const String & return boost::replace_all_copy(haystack, PartitionedSink::PARTITION_ID_WILDCARD, partition_id); } +PartitionedSink::~PartitionedSink() +{ + if (isCancelled()) + for (auto & item : partition_id_to_sink) + item.second->cancel(); +} } // NOLINTEND(clang-analyzer-optin.core.EnumCastOutOfRange) diff --git a/src/Storages/PartitionedSink.h b/src/Storages/PartitionedSink.h index fcd67556dc9..6487eaecfd1 100644 --- a/src/Storages/PartitionedSink.h +++ b/src/Storages/PartitionedSink.h @@ -18,6 +18,8 @@ public: PartitionedSink(const ASTPtr & partition_by, ContextPtr context_, const Block & sample_block_); + ~PartitionedSink() override; + String getName() const override { return "PartitionedSink"; } void consume(Chunk & chunk) override; From e68011e8f59894a240c04c18d283aba05cd5d6cc Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Wed, 20 Nov 2024 11:04:38 +0100 Subject: [PATCH 2/2] fix StorageJoin when not persistent --- src/Storages/StorageJoin.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Storages/StorageJoin.cpp b/src/Storages/StorageJoin.cpp index 0bab4861fa1..b8e3d9f8576 100644 --- a/src/Storages/StorageJoin.cpp +++ b/src/Storages/StorageJoin.cpp @@ -230,6 +230,11 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context) disk->replaceFile(path + tmp_backup_file_name, path + std::to_string(increment) + ".bin"); } + else + { + compressed_backup_buf.cancel(); + backup_buf->cancel(); + } } HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr analyzed_join, ContextPtr context, const Names & required_columns_names) const