Fix 'WriteBuffer is not finalized' with timeout_overflow_mode=break

This commit is contained in:
Michael Kolupaev 2024-07-05 19:51:05 +00:00
parent 76119a4567
commit 1247021c1e
6 changed files with 26 additions and 4 deletions

View File

@ -40,7 +40,7 @@ static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception
&& exception.code() != ErrorCodes::QUERY_WAS_CANCELLED;
}
static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_progress_callback)
static bool executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_progress_callback)
{
try
{
@ -60,7 +60,7 @@ static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_
read_progress_callback->addTotalBytes(read_progress->counters.total_bytes);
if (!read_progress_callback->onProgress(read_progress->counters.read_rows, read_progress->counters.read_bytes, read_progress->limits))
node->processor->cancel();
return false;
}
}
}
@ -71,6 +71,7 @@ static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_
exception.addMessage("While executing " + node->processor->getName());
throw exception;
}
return true;
}
bool ExecutionThreadContext::executeTask()
@ -91,9 +92,10 @@ bool ExecutionThreadContext::executeTask()
execution_time_watch.emplace();
#endif
bool ok = false;
try
{
executeJob(node, read_progress_callback);
ok = executeJob(node, read_progress_callback);
++node->num_executed_jobs;
}
catch (...)
@ -113,7 +115,7 @@ bool ExecutionThreadContext::executeTask()
if (trace_processors)
span->addAttribute("execution_time_ns", execution_time_watch->elapsed());
#endif
return node->exception == nullptr;
return ok;
}
void ExecutionThreadContext::rethrowExceptionIfHas()

View File

@ -29,6 +29,8 @@ public:
/// For merges in mutations it may need special logic, it's done inside ProgressCallback.
void disableProfileEventUpdate() { update_profile_events = false; }
/// Returns false if the query should be quietly cancelled.
/// Throws exception if the query should fail.
bool onProgress(uint64_t read_rows, uint64_t read_bytes, const StorageLimitsList & storage_limits);
private:

View File

@ -1155,6 +1155,16 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::onFinish()
finishDelayedChunk(std::make_shared<ZooKeeperWithFaultInjection>(zookeeper));
}
template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::onCancel()
{
if (!delayed_chunk)
return;
for (auto & partition : delayed_chunk->partitions)
partition.temp_part.cancel();
delayed_chunk.reset();
}
template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::waitForQuorum(
const ZooKeeperWithFaultInjectionPtr & zookeeper,

View File

@ -54,6 +54,8 @@ public:
void consume(Chunk chunk) override;
void onFinish() override;
void onCancel() override;
String getName() const override { return "ReplicatedMergeTreeSink"; }
/// For ATTACHing existing data on filesystem.

View File

@ -0,0 +1,5 @@
drop table if exists a sync;
create table a (x Int8) engine ReplicatedMergeTree('/tables/{database}/a','0') order by x;
insert into a select sleepEachRow(1) from numbers(10000) settings max_block_size=1, min_insert_block_size_rows=1, max_execution_time=2, timeout_overflow_mode='break';
insert into a select sleepEachRow(1) from numbers(10000) settings max_block_size=1, min_insert_block_size_rows=1, max_execution_time=2, timeout_overflow_mode='break', deduplicate_blocks_in_dependent_materialized_views=1;
select count() < 30 from a;