Merge pull request #73193 from ClickHouse/backport/24.9/72126

Backport #72126 to 24.9: fix cancelation for PartitionedSink
This commit is contained in:
Sema Checherinda 2024-12-13 16:59:33 +01:00 committed by GitHub
commit 25d896b2df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 13 additions and 0 deletions

View File

@ -146,6 +146,12 @@ String PartitionedSink::replaceWildcards(const String & haystack, const String &
return boost::replace_all_copy(haystack, PartitionedSink::PARTITION_ID_WILDCARD, partition_id); return boost::replace_all_copy(haystack, PartitionedSink::PARTITION_ID_WILDCARD, partition_id);
} }
PartitionedSink::~PartitionedSink()
{
if (isCancelled())
for (auto & item : partition_id_to_sink)
item.second->cancel();
}
} }
// NOLINTEND(clang-analyzer-optin.core.EnumCastOutOfRange) // NOLINTEND(clang-analyzer-optin.core.EnumCastOutOfRange)

View File

@ -18,6 +18,8 @@ public:
PartitionedSink(const ASTPtr & partition_by, ContextPtr context_, const Block & sample_block_); PartitionedSink(const ASTPtr & partition_by, ContextPtr context_, const Block & sample_block_);
~PartitionedSink() override;
String getName() const override { return "PartitionedSink"; } String getName() const override { return "PartitionedSink"; }
void consume(Chunk & chunk) override; void consume(Chunk & chunk) override;

View File

@ -231,6 +231,11 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
disk->replaceFile(path + tmp_backup_file_name, path + std::to_string(increment) + ".bin"); disk->replaceFile(path + tmp_backup_file_name, path + std::to_string(increment) + ".bin");
} }
else
{
compressed_backup_buf.cancel();
backup_buf->cancel();
}
} }
HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join, ContextPtr context, const Names & required_columns_names) const HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join, ContextPtr context, const Names & required_columns_names) const