diff --git a/src/Storages/PartitionedSink.cpp b/src/Storages/PartitionedSink.cpp index ee2570756ed..0f93d1a5b75 100644 --- a/src/Storages/PartitionedSink.cpp +++ b/src/Storages/PartitionedSink.cpp @@ -146,6 +146,12 @@ String PartitionedSink::replaceWildcards(const String & haystack, const String & return boost::replace_all_copy(haystack, PartitionedSink::PARTITION_ID_WILDCARD, partition_id); } +PartitionedSink::~PartitionedSink() +{ + if (isCancelled()) + for (auto & item : partition_id_to_sink) + item.second->cancel(); +} } // NOLINTEND(clang-analyzer-optin.core.EnumCastOutOfRange) diff --git a/src/Storages/PartitionedSink.h b/src/Storages/PartitionedSink.h index fcd67556dc9..6487eaecfd1 100644 --- a/src/Storages/PartitionedSink.h +++ b/src/Storages/PartitionedSink.h @@ -18,6 +18,8 @@ public: PartitionedSink(const ASTPtr & partition_by, ContextPtr context_, const Block & sample_block_); + ~PartitionedSink() override; + String getName() const override { return "PartitionedSink"; } void consume(Chunk & chunk) override; diff --git a/src/Storages/StorageJoin.cpp b/src/Storages/StorageJoin.cpp index 0bab4861fa1..b8e3d9f8576 100644 --- a/src/Storages/StorageJoin.cpp +++ b/src/Storages/StorageJoin.cpp @@ -230,6 +230,11 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context) disk->replaceFile(path + tmp_backup_file_name, path + std::to_string(increment) + ".bin"); } + else + { + compressed_backup_buf.cancel(); + backup_buf->cancel(); + } } HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr analyzed_join, ContextPtr context, const Names & required_columns_names) const