fix cancelation for PartitionedSink

This commit is contained in:
Sema Checherinda 2024-11-20 10:44:07 +01:00
parent 0fd196771e
commit a264de35d1
2 changed files with 8 additions and 0 deletions

View File

@ -146,6 +146,12 @@ String PartitionedSink::replaceWildcards(const String & haystack, const String &
return boost::replace_all_copy(haystack, PartitionedSink::PARTITION_ID_WILDCARD, partition_id); return boost::replace_all_copy(haystack, PartitionedSink::PARTITION_ID_WILDCARD, partition_id);
} }
PartitionedSink::~PartitionedSink()
{
if (isCancelled())
for (auto & item : partition_id_to_sink)
item.second->cancel();
}
} }
// NOLINTEND(clang-analyzer-optin.core.EnumCastOutOfRange) // NOLINTEND(clang-analyzer-optin.core.EnumCastOutOfRange)

View File

@ -18,6 +18,8 @@ public:
PartitionedSink(const ASTPtr & partition_by, ContextPtr context_, const Block & sample_block_); PartitionedSink(const ASTPtr & partition_by, ContextPtr context_, const Block & sample_block_);
~PartitionedSink() override;
String getName() const override { return "PartitionedSink"; } String getName() const override { return "PartitionedSink"; }
void consume(Chunk & chunk) override; void consume(Chunk & chunk) override;