From 1247021c1ed7fdc6dbf6698a66bd258a7cd1d649 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Fri, 5 Jul 2024 19:51:05 +0000 Subject: [PATCH] Fix 'WriteBuffer is not finalized' with timeout_overflow_mode=break --- src/Processors/Executors/ExecutionThreadContext.cpp | 10 ++++++---- src/QueryPipeline/ReadProgressCallback.h | 2 ++ src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp | 10 ++++++++++ src/Storages/MergeTree/ReplicatedMergeTreeSink.h | 2 ++ .../03201_insert_timeout_overflow_mode.reference | 1 + .../0_stateless/03201_insert_timeout_overflow_mode.sql | 5 +++++ 6 files changed, 26 insertions(+), 4 deletions(-) create mode 100644 tests/queries/0_stateless/03201_insert_timeout_overflow_mode.reference create mode 100644 tests/queries/0_stateless/03201_insert_timeout_overflow_mode.sql diff --git a/src/Processors/Executors/ExecutionThreadContext.cpp b/src/Processors/Executors/ExecutionThreadContext.cpp index 05669725f9a..935f7ff8937 100644 --- a/src/Processors/Executors/ExecutionThreadContext.cpp +++ b/src/Processors/Executors/ExecutionThreadContext.cpp @@ -40,7 +40,7 @@ static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception && exception.code() != ErrorCodes::QUERY_WAS_CANCELLED; } -static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_progress_callback) +static bool executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_progress_callback) { try { @@ -60,7 +60,7 @@ static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_ read_progress_callback->addTotalBytes(read_progress->counters.total_bytes); if (!read_progress_callback->onProgress(read_progress->counters.read_rows, read_progress->counters.read_bytes, read_progress->limits)) - node->processor->cancel(); + return false; } } } @@ -71,6 +71,7 @@ static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_ exception.addMessage("While executing " + node->processor->getName()); throw exception; } + return true; } bool ExecutionThreadContext::executeTask() @@ -91,9 +92,10 @@ bool ExecutionThreadContext::executeTask() execution_time_watch.emplace(); #endif + bool ok = false; try { - executeJob(node, read_progress_callback); + ok = executeJob(node, read_progress_callback); ++node->num_executed_jobs; } catch (...) @@ -113,7 +115,7 @@ bool ExecutionThreadContext::executeTask() if (trace_processors) span->addAttribute("execution_time_ns", execution_time_watch->elapsed()); #endif - return node->exception == nullptr; + return ok; } void ExecutionThreadContext::rethrowExceptionIfHas() diff --git a/src/QueryPipeline/ReadProgressCallback.h b/src/QueryPipeline/ReadProgressCallback.h index 7dfed9df5da..c20147e0948 100644 --- a/src/QueryPipeline/ReadProgressCallback.h +++ b/src/QueryPipeline/ReadProgressCallback.h @@ -29,6 +29,8 @@ public: /// For merges in mutations it may need special logic, it's done inside ProgressCallback. void disableProfileEventUpdate() { update_profile_events = false; } + /// Returns false if the query should be quietly cancelled. + /// Throws exception if the query should fail. bool onProgress(uint64_t read_rows, uint64_t read_bytes, const StorageLimitsList & storage_limits); private: diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 4b4f4c33e7d..72169812b4f 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -1155,6 +1155,16 @@ void ReplicatedMergeTreeSinkImpl::onFinish() finishDelayedChunk(std::make_shared(zookeeper)); } +template +void ReplicatedMergeTreeSinkImpl::onCancel() +{ + if (!delayed_chunk) + return; + for (auto & partition : delayed_chunk->partitions) + partition.temp_part.cancel(); + delayed_chunk.reset(); +} + template void ReplicatedMergeTreeSinkImpl::waitForQuorum( const ZooKeeperWithFaultInjectionPtr & zookeeper, diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.h b/src/Storages/MergeTree/ReplicatedMergeTreeSink.h index 39623c20584..b1796a35ed2 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.h @@ -54,6 +54,8 @@ public: void consume(Chunk chunk) override; void onFinish() override; + void onCancel() override; + String getName() const override { return "ReplicatedMergeTreeSink"; } /// For ATTACHing existing data on filesystem. diff --git a/tests/queries/0_stateless/03201_insert_timeout_overflow_mode.reference b/tests/queries/0_stateless/03201_insert_timeout_overflow_mode.reference new file mode 100644 index 00000000000..d00491fd7e5 --- /dev/null +++ b/tests/queries/0_stateless/03201_insert_timeout_overflow_mode.reference @@ -0,0 +1 @@ +1 diff --git a/tests/queries/0_stateless/03201_insert_timeout_overflow_mode.sql b/tests/queries/0_stateless/03201_insert_timeout_overflow_mode.sql new file mode 100644 index 00000000000..90694e5880b --- /dev/null +++ b/tests/queries/0_stateless/03201_insert_timeout_overflow_mode.sql @@ -0,0 +1,5 @@ +drop table if exists a sync; +create table a (x Int8) engine ReplicatedMergeTree('/tables/{database}/a','0') order by x; +insert into a select sleepEachRow(1) from numbers(10000) settings max_block_size=1, min_insert_block_size_rows=1, max_execution_time=2, timeout_overflow_mode='break'; +insert into a select sleepEachRow(1) from numbers(10000) settings max_block_size=1, min_insert_block_size_rows=1, max_execution_time=2, timeout_overflow_mode='break', deduplicate_blocks_in_dependent_materialized_views=1; +select count() < 30 from a;