From 04286bc270f9f473a07bc4ae27ae61d96256f775 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 13 Aug 2024 14:45:05 +0000 Subject: [PATCH 1/3] Add status to PipelineExecutor. Verify status of pushing pipeline. --- programs/obfuscator/Obfuscator.cpp | 2 +- src/Client/LocalConnection.cpp | 8 +++- src/Interpreters/SystemLog.cpp | 2 +- src/Processors/Executors/ExecutingGraph.cpp | 19 ++++----- src/Processors/Executors/ExecutingGraph.h | 11 ++++- src/Processors/Executors/PipelineExecutor.cpp | 31 +++++++++----- src/Processors/Executors/PipelineExecutor.h | 20 +++++++++- .../PushingAsyncPipelineExecutor.cpp | 21 +++++++--- .../Executors/PushingAsyncPipelineExecutor.h | 7 +++- .../Executors/PushingPipelineExecutor.cpp | 40 ++++++++++++++----- .../Executors/PushingPipelineExecutor.h | 9 +++-- .../Transforms/CreatingSetsTransform.cpp | 3 +- src/Server/GRPCServer.cpp | 3 +- src/Server/TCPHandler.cpp | 13 ++++-- src/Storages/Distributed/DistributedSink.cpp | 6 ++- src/Storages/StorageBuffer.cpp | 3 +- src/Storages/tests/gtest_storage_log.cpp | 2 +- ...221_insert_timeout_overflow_mode.reference | 2 + .../03221_insert_timeout_overflow_mode.sh | 8 ++++ 19 files changed, 153 insertions(+), 57 deletions(-) create mode 100644 tests/queries/0_stateless/03221_insert_timeout_overflow_mode.reference create mode 100755 tests/queries/0_stateless/03221_insert_timeout_overflow_mode.sh diff --git a/programs/obfuscator/Obfuscator.cpp b/programs/obfuscator/Obfuscator.cpp index 688ae1a1143..7c13215e350 100644 --- a/programs/obfuscator/Obfuscator.cpp +++ b/programs/obfuscator/Obfuscator.cpp @@ -1462,7 +1462,7 @@ try while (in_executor.pull(block)) { Columns columns = obfuscator.generate(block.getColumns()); - out_executor.push(header.cloneWithColumns(columns)); + std::ignore = out_executor.push(header.cloneWithColumns(columns)); processed_rows += block.rows(); if (!silent) std::cerr << "Processed " << processed_rows << " rows\n"; diff --git a/src/Client/LocalConnection.cpp 
b/src/Client/LocalConnection.cpp index 072184e0a66..8f1e0958002 100644 --- a/src/Client/LocalConnection.cpp +++ b/src/Client/LocalConnection.cpp @@ -287,13 +287,17 @@ void LocalConnection::sendData(const Block & block, const String &, bool) if (!block) return; + bool inserted = false; if (state->pushing_async_executor) - state->pushing_async_executor->push(block); + inserted = state->pushing_async_executor->push(block); else if (state->pushing_executor) - state->pushing_executor->push(block); + inserted = state->pushing_executor->push(block); else throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown executor"); + if (!inserted) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data"); + if (send_profile_events) sendProfileEvents(); } diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp index 572481e6b12..0cad56af00a 100644 --- a/src/Interpreters/SystemLog.cpp +++ b/src/Interpreters/SystemLog.cpp @@ -556,7 +556,7 @@ void SystemLog::flushImpl(const std::vector & to_flush, PushingPipelineExecutor executor(io.pipeline); executor.start(); - executor.push(block); + std::ignore = executor.push(block); executor.finish(); } catch (...) diff --git a/src/Processors/Executors/ExecutingGraph.cpp b/src/Processors/Executors/ExecutingGraph.cpp index 6d5b60d8159..10470325bb8 100644 --- a/src/Processors/Executors/ExecutingGraph.cpp +++ b/src/Processors/Executors/ExecutingGraph.cpp @@ -96,7 +96,7 @@ bool ExecutingGraph::addEdges(uint64_t node) return was_edge_added; } -bool ExecutingGraph::expandPipeline(std::stack & stack, uint64_t pid) +ExecutingGraph::UpdateNodeStatus ExecutingGraph::expandPipeline(std::stack & stack, uint64_t pid) { auto & cur_node = *nodes[pid]; Processors new_processors; @@ -108,7 +108,7 @@ bool ExecutingGraph::expandPipeline(std::stack & stack, uint64_t pid) catch (...) 
{ cur_node.exception = std::current_exception(); - return false; + return UpdateNodeStatus::Exception; } { @@ -118,7 +118,7 @@ bool ExecutingGraph::expandPipeline(std::stack & stack, uint64_t pid) { for (auto & processor : new_processors) processor->cancel(); - return false; + return UpdateNodeStatus::Cancelled; } processors->insert(processors->end(), new_processors.begin(), new_processors.end()); @@ -178,7 +178,7 @@ bool ExecutingGraph::expandPipeline(std::stack & stack, uint64_t pid) } } - return true; + return UpdateNodeStatus::Done; } void ExecutingGraph::initializeExecution(Queue & queue) @@ -213,7 +213,7 @@ void ExecutingGraph::initializeExecution(Queue & queue) } -bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue) +ExecutingGraph::UpdateNodeStatus ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue) { std::stack updated_edges; std::stack updated_processors; @@ -309,7 +309,7 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue catch (...) 
{ node.exception = std::current_exception(); - return false; + return UpdateNodeStatus::Exception; } #ifndef NDEBUG @@ -386,8 +386,9 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue read_lock.unlock(); { std::unique_lock lock(nodes_mutex); - if (!expandPipeline(updated_processors, pid)) - return false; + auto status = expandPipeline(updated_processors, pid); + if (status != UpdateNodeStatus::Done) + return status; } read_lock.lock(); @@ -397,7 +398,7 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue } } - return true; + return UpdateNodeStatus::Done; } void ExecutingGraph::cancel(bool cancel_all_processors) diff --git a/src/Processors/Executors/ExecutingGraph.h b/src/Processors/Executors/ExecutingGraph.h index 71dcd360a2c..e1a6ac96203 100644 --- a/src/Processors/Executors/ExecutingGraph.h +++ b/src/Processors/Executors/ExecutingGraph.h @@ -138,10 +138,17 @@ public: /// Traverse graph the first time to update all the childless nodes. void initializeExecution(Queue & queue); + enum class UpdateNodeStatus + { + Done, + Exception, + Cancelled, + }; + /// Update processor with pid number (call IProcessor::prepare). /// Check parents and children of current processor and push them to stacks if they also need to be updated. /// If processor wants to be expanded, lock will be upgraded to get write access to pipeline. - bool updateNode(uint64_t pid, Queue & queue, Queue & async_queue); + UpdateNodeStatus updateNode(uint64_t pid, Queue & queue, Queue & async_queue); void cancel(bool cancel_all_processors = true); @@ -155,7 +162,7 @@ private: /// Update graph after processor (pid) returned ExpandPipeline status. /// All new nodes and nodes with updated ports are pushed into stack. 
- bool expandPipeline(std::stack & stack, uint64_t pid); + UpdateNodeStatus expandPipeline(std::stack & stack, uint64_t pid); std::shared_ptr processors; std::vector source_processors; diff --git a/src/Processors/Executors/PipelineExecutor.cpp b/src/Processors/Executors/PipelineExecutor.cpp index 82cad471a29..23b3a6d9f5f 100644 --- a/src/Processors/Executors/PipelineExecutor.cpp +++ b/src/Processors/Executors/PipelineExecutor.cpp @@ -77,9 +77,9 @@ const Processors & PipelineExecutor::getProcessors() const return graph->getProcessors(); } -void PipelineExecutor::cancel() +void PipelineExecutor::cancel(ExecutionStatus reason) { - cancelled = true; + tryUpdateExecutionStatus(ExecutionStatus::Executing, reason); finish(); graph->cancel(); } @@ -98,6 +98,11 @@ void PipelineExecutor::finish() tasks.finish(); } +bool PipelineExecutor::tryUpdateExecutionStatus(ExecutionStatus expected, ExecutionStatus desired) +{ + return execution_status.compare_exchange_strong(expected, desired); +} + void PipelineExecutor::execute(size_t num_threads, bool concurrency_control) { checkTimeLimit(); @@ -120,7 +125,7 @@ void PipelineExecutor::execute(size_t num_threads, bool concurrency_control) } catch (...) { - span.addAttribute(ExecutionStatus::fromCurrentException()); + span.addAttribute(DB::ExecutionStatus::fromCurrentException()); #ifndef NDEBUG LOG_TRACE(log, "Exception while executing query. 
Current state:\n{}", dumpPipeline()); @@ -169,7 +174,7 @@ bool PipelineExecutor::checkTimeLimitSoft() // We call cancel here so that all processors are notified and tasks waken up // so that the "break" is faster and doesn't wait for long events if (!continuing) - cancel(); + cancel(ExecutionStatus::CancelledByTimeout); return continuing; } @@ -195,7 +200,8 @@ void PipelineExecutor::finalizeExecution() { checkTimeLimit(); - if (cancelled) + auto status = execution_status.load(); + if (status == ExecutionStatus::CancelledByTimeout || status == ExecutionStatus::CancelledByUser) return; bool all_processors_finished = true; @@ -271,7 +277,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie break; if (!context.executeTask()) - cancel(); + cancel(ExecutionStatus::Exception); if (tasks.isFinished()) break; @@ -289,11 +295,13 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie Queue async_queue; /// Prepare processor after execution. - if (!graph->updateNode(context.getProcessorID(), queue, async_queue)) - cancel(); + auto status = graph->updateNode(context.getProcessorID(), queue, async_queue); + if (status == ExecutingGraph::UpdateNodeStatus::Exception) + cancel(ExecutionStatus::Exception); /// Push other tasks to global queue. - tasks.pushTasks(queue, async_queue, context); + if (status == ExecutingGraph::UpdateNodeStatus::Done) + tasks.pushTasks(queue, async_queue, context); } #ifndef NDEBUG @@ -309,7 +317,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie { /// spawnThreads can throw an exception, for example CANNOT_SCHEDULE_TASK. /// We should cancel execution properly before rethrow. 
- cancel(); + cancel(ExecutionStatus::Exception); throw; } @@ -328,6 +336,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_control) { is_execution_initialized = true; + tryUpdateExecutionStatus(ExecutionStatus::NotStarted, ExecutionStatus::Executing); size_t use_threads = num_threads; @@ -393,7 +402,7 @@ void PipelineExecutor::executeImpl(size_t num_threads, bool concurrency_control) { /// If finished_flag is not set, there was an exception. /// Cancel execution in this case. - cancel(); + cancel(ExecutionStatus::Exception); if (pool) pool->wait(); } diff --git a/src/Processors/Executors/PipelineExecutor.h b/src/Processors/Executors/PipelineExecutor.h index ae119355cb5..79d0a29d4e1 100644 --- a/src/Processors/Executors/PipelineExecutor.h +++ b/src/Processors/Executors/PipelineExecutor.h @@ -48,8 +48,20 @@ public: const Processors & getProcessors() const; + enum class ExecutionStatus + { + NotStarted, + Executing, + Finished, + Exception, + CancelledByUser, + CancelledByTimeout, + }; + /// Cancel execution. May be called from another thread. - void cancel(); + void cancel() { cancel(ExecutionStatus::CancelledByUser); } + + ExecutionStatus getExecutionStatus() const { return execution_status.load(); } /// Cancel processors which only read data from source. May be called from another thread. 
void cancelReading(); @@ -81,7 +93,7 @@ private: /// system.opentelemetry_span_log bool trace_processors = false; - std::atomic_bool cancelled = false; + std::atomic execution_status = ExecutionStatus::NotStarted; std::atomic_bool cancelled_reading = false; LoggerPtr log = getLogger("PipelineExecutor"); @@ -105,6 +117,10 @@ private: void executeStepImpl(size_t thread_num, std::atomic_bool * yield_flag = nullptr); void executeSingleThread(size_t thread_num); void finish(); + void cancel(ExecutionStatus reason); + + /// If execution_status == expected, change it to desired. + bool tryUpdateExecutionStatus(ExecutionStatus expected, ExecutionStatus desired); String dumpPipeline() const; }; diff --git a/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp b/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp index 830a96533ed..db5cf451c9e 100644 --- a/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp +++ b/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp @@ -176,7 +176,17 @@ void PushingAsyncPipelineExecutor::start() data->thread = ThreadFromGlobalPool(std::move(func)); } -void PushingAsyncPipelineExecutor::push(Chunk chunk) +static void checkExecutionStatus(PipelineExecutor::ExecutionStatus status) +{ + if (status == PipelineExecutor::ExecutionStatus::CancelledByTimeout + || status == PipelineExecutor::ExecutionStatus::CancelledByUser) + return; + + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Pipeline for PushingPipelineExecutor was finished before all data was inserted"); +} + +bool PushingAsyncPipelineExecutor::push(Chunk chunk) { if (!started) start(); @@ -185,13 +195,14 @@ void PushingAsyncPipelineExecutor::push(Chunk chunk) data->rethrowExceptionIfHas(); if (!is_pushed) - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Pipeline for PushingAsyncPipelineExecutor was finished before all data was inserted"); + checkExecutionStatus(data->executor->getExecutionStatus()); + + return is_pushed; } -void PushingAsyncPipelineExecutor::push(Block 
block) +bool PushingAsyncPipelineExecutor::push(Block block) { - push(Chunk(block.getColumns(), block.rows())); + return push(Chunk(block.getColumns(), block.rows())); } void PushingAsyncPipelineExecutor::finish() diff --git a/src/Processors/Executors/PushingAsyncPipelineExecutor.h b/src/Processors/Executors/PushingAsyncPipelineExecutor.h index f976cd4c339..7835aaf596f 100644 --- a/src/Processors/Executors/PushingAsyncPipelineExecutor.h +++ b/src/Processors/Executors/PushingAsyncPipelineExecutor.h @@ -36,8 +36,11 @@ public: void start(); - void push(Chunk chunk); - void push(Block block); + /// Return 'true' if push was successful. + /// Return 'false' if pipline was cancelled without exception. + /// This may happen in case of timeout_overflow_mode = 'break' OR internal bug. + [[nodiscard]] bool push(Chunk chunk); + [[nodiscard]] bool push(Block block); void finish(); diff --git a/src/Processors/Executors/PushingPipelineExecutor.cpp b/src/Processors/Executors/PushingPipelineExecutor.cpp index 696932932df..3133cfd9a1e 100644 --- a/src/Processors/Executors/PushingPipelineExecutor.cpp +++ b/src/Processors/Executors/PushingPipelineExecutor.cpp @@ -80,36 +80,56 @@ const Block & PushingPipelineExecutor::getHeader() const return pushing_source->getPort().getHeader(); } +static void checkExecutionStatus(PipelineExecutor::ExecutionStatus status) +{ + if (status == PipelineExecutor::ExecutionStatus::CancelledByTimeout + || status == PipelineExecutor::ExecutionStatus::CancelledByUser) + return; -void PushingPipelineExecutor::start() + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Pipeline for PushingPipelineExecutor was finished before all data was inserted"); +} + +bool PushingPipelineExecutor::start() { if (started) - return; + return true; started = true; executor = std::make_shared(pipeline.processors, pipeline.process_list_element); executor->setReadProgressCallback(pipeline.getReadProgressCallback()); if (!executor->executeStep(&input_wait_flag)) - throw 
Exception(ErrorCodes::LOGICAL_ERROR, - "Pipeline for PushingPipelineExecutor was finished before all data was inserted"); + { + checkExecutionStatus(executor->getExecutionStatus()); + return false; + } + + return true; } -void PushingPipelineExecutor::push(Chunk chunk) +bool PushingPipelineExecutor::push(Chunk chunk) { if (!started) - start(); + { + if (!start()) + return false; + } pushing_source->setData(std::move(chunk)); if (!executor->executeStep(&input_wait_flag)) - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Pipeline for PushingPipelineExecutor was finished before all data was inserted"); + { + checkExecutionStatus(executor->getExecutionStatus()); + return false; + } + + return true; } -void PushingPipelineExecutor::push(Block block) +bool PushingPipelineExecutor::push(Block block) { - push(Chunk(block.getColumns(), block.rows())); + return push(Chunk(block.getColumns(), block.rows())); } void PushingPipelineExecutor::finish() diff --git a/src/Processors/Executors/PushingPipelineExecutor.h b/src/Processors/Executors/PushingPipelineExecutor.h index f549c9482db..4021f61fb6b 100644 --- a/src/Processors/Executors/PushingPipelineExecutor.h +++ b/src/Processors/Executors/PushingPipelineExecutor.h @@ -35,10 +35,13 @@ public: /// Get structure of returned block or chunk. const Block & getHeader() const; - void start(); + bool start(); - void push(Chunk chunk); - void push(Block block); + /// Return 'true' if push was successful. + /// Return 'false' if pipline was cancelled without exception. + /// This may happen in case of timeout_overflow_mode = 'break' OR internal bug. 
+ [[nodiscard]] bool push(Chunk chunk); + [[nodiscard]] bool push(Block block); void finish(); diff --git a/src/Processors/Transforms/CreatingSetsTransform.cpp b/src/Processors/Transforms/CreatingSetsTransform.cpp index eeb8f4a6060..857233ac028 100644 --- a/src/Processors/Transforms/CreatingSetsTransform.cpp +++ b/src/Processors/Transforms/CreatingSetsTransform.cpp @@ -215,7 +215,8 @@ void CreatingSetsTransform::consume(Chunk chunk) if (!done_with_table) { block = materializeBlock(block); - executor->push(block); + if (!executor->push(block)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot insert into a table"); rows_to_transfer += block.rows(); bytes_to_transfer += block.bytes(); diff --git a/src/Server/GRPCServer.cpp b/src/Server/GRPCServer.cpp index d8a4d7f0e1f..c261d76ef33 100644 --- a/src/Server/GRPCServer.cpp +++ b/src/Server/GRPCServer.cpp @@ -1012,7 +1012,8 @@ namespace while (pipeline_executor->pull(block)) { if (block) - executor.push(block); + if (!executor.push(block)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data"); } if (isQueryCancelled()) diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 448dfafbd9d..283b60b533c 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -932,12 +932,18 @@ void TCPHandler::processInsertQuery() executor.start(); if (processed_data) - executor.push(std::move(processed_data)); + { + if (!executor.push(std::move(processed_data))) + throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled"); + } else startInsertQuery(); while (readDataNext()) - executor.push(std::move(state.block_for_insert)); + { + if (!executor.push(std::move(state.block_for_insert))) + throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled"); + } if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED) executor.cancel(); @@ -2034,7 +2040,8 @@ bool TCPHandler::receiveData(bool scalar) QueryPipeline temporary_table_out(storage->write(ASTPtr(), 
metadata_snapshot, query_context, /*async_insert=*/false)); PushingPipelineExecutor executor(temporary_table_out); executor.start(); - executor.push(block); + if (!executor.push(block)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot insert into temporary table"); executor.finish(); } else if (state.need_receive_data_for_input) diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index e3e73e42096..69f9e5b9380 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ b/src/Storages/Distributed/DistributedSink.cpp @@ -89,7 +89,8 @@ static void writeBlockConvert(PushingPipelineExecutor & executor, const Block & { Block adopted_block = adoptBlock(executor.getHeader(), block, log); for (size_t i = 0; i < repeats; ++i) - executor.push(adopted_block); + if (!executor.push(adopted_block)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data"); } @@ -408,7 +409,8 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend}; Block adopted_shard_block = adoptBlock(job.executor->getHeader(), shard_block, log); - job.executor->push(adopted_shard_block); + if (!job.executor->push(adopted_shard_block)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data"); } else // local { diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index f753d369d2d..3223a2813a3 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -1069,7 +1069,8 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl auto block_io = interpreter.execute(); PushingPipelineExecutor executor(block_io.pipeline); executor.start(); - executor.push(std::move(block_to_write)); + if (!executor.push(std::move(block_to_write))) + throw Exception(ErrorCodes::LOGICAL_ERROR, "StorageBuffer could not write data to destination table"); executor.finish(); } diff --git 
a/src/Storages/tests/gtest_storage_log.cpp b/src/Storages/tests/gtest_storage_log.cpp index d75f3616f21..60890337cb4 100644 --- a/src/Storages/tests/gtest_storage_log.cpp +++ b/src/Storages/tests/gtest_storage_log.cpp @@ -98,7 +98,7 @@ std::string writeData(int rows, DB::StoragePtr & table, const DB::ContextPtr con QueryPipeline pipeline(table->write({}, metadata_snapshot, context, /*async_insert=*/false)); PushingPipelineExecutor executor(pipeline); - executor.push(block); + std::ignore = executor.push(block); executor.finish(); return data; diff --git a/tests/queries/0_stateless/03221_insert_timeout_overflow_mode.reference b/tests/queries/0_stateless/03221_insert_timeout_overflow_mode.reference new file mode 100644 index 00000000000..68538c3f75b --- /dev/null +++ b/tests/queries/0_stateless/03221_insert_timeout_overflow_mode.reference @@ -0,0 +1,2 @@ +QUERY_WAS_CANCELLED +QUERY_WAS_CANCELLED diff --git a/tests/queries/0_stateless/03221_insert_timeout_overflow_mode.sh b/tests/queries/0_stateless/03221_insert_timeout_overflow_mode.sh new file mode 100755 index 00000000000..030c5211b2d --- /dev/null +++ b/tests/queries/0_stateless/03221_insert_timeout_overflow_mode.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +${CLICKHOUSE_CLIENT} --query "create table null_t (number UInt64) engine = Null;" +${CLICKHOUSE_CLIENT} --query "select sleep(0.1) from system.numbers settings max_block_size = 1 format Native" 2>/dev/null | ${CLICKHOUSE_CLIENT} --max_execution_time = 0.3 --timeout_overflow_mode = 'break' --query "insert into null_t format Native" 2>&1 | grep -o "QUERY_WAS_CANCELLED" From c5ae139c972d46d1e0bfa6ab5f165a049b6786f5 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 13 Aug 2024 15:18:07 +0000 Subject: [PATCH 2/3] Cleanup. 
--- programs/obfuscator/Obfuscator.cpp | 2 +- src/Client/LocalConnection.cpp | 8 ++--- src/Interpreters/SystemLog.cpp | 2 +- .../PushingAsyncPipelineExecutor.cpp | 15 ++++---- .../Executors/PushingAsyncPipelineExecutor.h | 7 ++-- .../Executors/PushingPipelineExecutor.cpp | 34 ++++++------------- .../Executors/PushingPipelineExecutor.h | 9 ++--- .../Transforms/CreatingSetsTransform.cpp | 3 +- src/Server/GRPCServer.cpp | 3 +- src/Server/TCPHandler.cpp | 13 ++----- src/Storages/Distributed/DistributedSink.cpp | 6 ++-- src/Storages/StorageBuffer.cpp | 3 +- src/Storages/tests/gtest_storage_log.cpp | 2 +- 13 files changed, 36 insertions(+), 71 deletions(-) diff --git a/programs/obfuscator/Obfuscator.cpp b/programs/obfuscator/Obfuscator.cpp index 7c13215e350..688ae1a1143 100644 --- a/programs/obfuscator/Obfuscator.cpp +++ b/programs/obfuscator/Obfuscator.cpp @@ -1462,7 +1462,7 @@ try while (in_executor.pull(block)) { Columns columns = obfuscator.generate(block.getColumns()); - std::ignore = out_executor.push(header.cloneWithColumns(columns)); + out_executor.push(header.cloneWithColumns(columns)); processed_rows += block.rows(); if (!silent) std::cerr << "Processed " << processed_rows << " rows\n"; diff --git a/src/Client/LocalConnection.cpp b/src/Client/LocalConnection.cpp index 8f1e0958002..072184e0a66 100644 --- a/src/Client/LocalConnection.cpp +++ b/src/Client/LocalConnection.cpp @@ -287,17 +287,13 @@ void LocalConnection::sendData(const Block & block, const String &, bool) if (!block) return; - bool inserted = false; if (state->pushing_async_executor) - inserted = state->pushing_async_executor->push(block); + state->pushing_async_executor->push(block); else if (state->pushing_executor) - inserted = state->pushing_executor->push(block); + state->pushing_executor->push(block); else throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown executor"); - if (!inserted) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data"); - if (send_profile_events) 
sendProfileEvents(); } diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp index 0cad56af00a..572481e6b12 100644 --- a/src/Interpreters/SystemLog.cpp +++ b/src/Interpreters/SystemLog.cpp @@ -556,7 +556,7 @@ void SystemLog::flushImpl(const std::vector & to_flush, PushingPipelineExecutor executor(io.pipeline); executor.start(); - std::ignore = executor.push(block); + executor.push(block); executor.finish(); } catch (...) diff --git a/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp b/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp index db5cf451c9e..866d224a08d 100644 --- a/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp +++ b/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp @@ -15,6 +15,7 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int QUERY_WAS_CANCELLED; } class PushingAsyncSource : public ISource @@ -176,17 +177,17 @@ void PushingAsyncPipelineExecutor::start() data->thread = ThreadFromGlobalPool(std::move(func)); } -static void checkExecutionStatus(PipelineExecutor::ExecutionStatus status) +[[noreturn]] static void throwOnExecutionStatus(PipelineExecutor::ExecutionStatus status) { if (status == PipelineExecutor::ExecutionStatus::CancelledByTimeout || status == PipelineExecutor::ExecutionStatus::CancelledByUser) - return; + throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PushingPipelineExecutor was finished before all data was inserted"); } -bool PushingAsyncPipelineExecutor::push(Chunk chunk) +void PushingAsyncPipelineExecutor::push(Chunk chunk) { if (!started) start(); @@ -195,14 +196,12 @@ bool PushingAsyncPipelineExecutor::push(Chunk chunk) data->rethrowExceptionIfHas(); if (!is_pushed) - checkExecutionStatus(data->executor->getExecutionStatus()); - - return is_pushed; + throwOnExecutionStatus(data->executor->getExecutionStatus()); } -bool 
PushingAsyncPipelineExecutor::push(Block block) +void PushingAsyncPipelineExecutor::push(Block block) { - return push(Chunk(block.getColumns(), block.rows())); + push(Chunk(block.getColumns(), block.rows())); } void PushingAsyncPipelineExecutor::finish() diff --git a/src/Processors/Executors/PushingAsyncPipelineExecutor.h b/src/Processors/Executors/PushingAsyncPipelineExecutor.h index 7835aaf596f..f976cd4c339 100644 --- a/src/Processors/Executors/PushingAsyncPipelineExecutor.h +++ b/src/Processors/Executors/PushingAsyncPipelineExecutor.h @@ -36,11 +36,8 @@ public: void start(); - /// Return 'true' if push was successful. - /// Return 'false' if pipline was cancelled without exception. - /// This may happen in case of timeout_overflow_mode = 'break' OR internal bug. - [[nodiscard]] bool push(Chunk chunk); - [[nodiscard]] bool push(Block block); + void push(Chunk chunk); + void push(Block block); void finish(); diff --git a/src/Processors/Executors/PushingPipelineExecutor.cpp b/src/Processors/Executors/PushingPipelineExecutor.cpp index 3133cfd9a1e..7a1c0111a3a 100644 --- a/src/Processors/Executors/PushingPipelineExecutor.cpp +++ b/src/Processors/Executors/PushingPipelineExecutor.cpp @@ -11,6 +11,7 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int QUERY_WAS_CANCELLED; } class PushingSource : public ISource @@ -80,56 +81,43 @@ const Block & PushingPipelineExecutor::getHeader() const return pushing_source->getPort().getHeader(); } -static void checkExecutionStatus(PipelineExecutor::ExecutionStatus status) +[[noreturn]] static void throwOnExecutionStatus(PipelineExecutor::ExecutionStatus status) { if (status == PipelineExecutor::ExecutionStatus::CancelledByTimeout || status == PipelineExecutor::ExecutionStatus::CancelledByUser) - return; + throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PushingPipelineExecutor was finished before all data was 
inserted"); } -bool PushingPipelineExecutor::start() +void PushingPipelineExecutor::start() { if (started) - return true; + return; started = true; executor = std::make_shared(pipeline.processors, pipeline.process_list_element); executor->setReadProgressCallback(pipeline.getReadProgressCallback()); if (!executor->executeStep(&input_wait_flag)) - { - checkExecutionStatus(executor->getExecutionStatus()); - return false; - } - - return true; + throwOnExecutionStatus(executor->getExecutionStatus()); } -bool PushingPipelineExecutor::push(Chunk chunk) +void PushingPipelineExecutor::push(Chunk chunk) { if (!started) - { - if (!start()) - return false; - } + start(); pushing_source->setData(std::move(chunk)); if (!executor->executeStep(&input_wait_flag)) - { - checkExecutionStatus(executor->getExecutionStatus()); - return false; - } - - return true; + throwOnExecutionStatus(executor->getExecutionStatus()); } -bool PushingPipelineExecutor::push(Block block) +void PushingPipelineExecutor::push(Block block) { - return push(Chunk(block.getColumns(), block.rows())); + push(Chunk(block.getColumns(), block.rows())); } void PushingPipelineExecutor::finish() diff --git a/src/Processors/Executors/PushingPipelineExecutor.h b/src/Processors/Executors/PushingPipelineExecutor.h index 4021f61fb6b..f549c9482db 100644 --- a/src/Processors/Executors/PushingPipelineExecutor.h +++ b/src/Processors/Executors/PushingPipelineExecutor.h @@ -35,13 +35,10 @@ public: /// Get structure of returned block or chunk. const Block & getHeader() const; - bool start(); + void start(); - /// Return 'true' if push was successful. - /// Return 'false' if pipline was cancelled without exception. - /// This may happen in case of timeout_overflow_mode = 'break' OR internal bug. 
- [[nodiscard]] bool push(Chunk chunk); - [[nodiscard]] bool push(Block block); + void push(Chunk chunk); + void push(Block block); void finish(); diff --git a/src/Processors/Transforms/CreatingSetsTransform.cpp b/src/Processors/Transforms/CreatingSetsTransform.cpp index 857233ac028..eeb8f4a6060 100644 --- a/src/Processors/Transforms/CreatingSetsTransform.cpp +++ b/src/Processors/Transforms/CreatingSetsTransform.cpp @@ -215,8 +215,7 @@ void CreatingSetsTransform::consume(Chunk chunk) if (!done_with_table) { block = materializeBlock(block); - if (!executor->push(block)) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot insert into a table"); + executor->push(block); rows_to_transfer += block.rows(); bytes_to_transfer += block.bytes(); diff --git a/src/Server/GRPCServer.cpp b/src/Server/GRPCServer.cpp index c261d76ef33..d8a4d7f0e1f 100644 --- a/src/Server/GRPCServer.cpp +++ b/src/Server/GRPCServer.cpp @@ -1012,8 +1012,7 @@ namespace while (pipeline_executor->pull(block)) { if (block) - if (!executor.push(block)) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data"); + executor.push(block); } if (isQueryCancelled()) diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 283b60b533c..448dfafbd9d 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -932,18 +932,12 @@ void TCPHandler::processInsertQuery() executor.start(); if (processed_data) - { - if (!executor.push(std::move(processed_data))) - throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled"); - } + executor.push(std::move(processed_data)); else startInsertQuery(); while (readDataNext()) - { - if (!executor.push(std::move(state.block_for_insert))) - throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled"); - } + executor.push(std::move(state.block_for_insert)); if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED) executor.cancel(); @@ -2040,8 +2034,7 @@ bool TCPHandler::receiveData(bool scalar) QueryPipeline 
temporary_table_out(storage->write(ASTPtr(), metadata_snapshot, query_context, /*async_insert=*/false)); PushingPipelineExecutor executor(temporary_table_out); executor.start(); - if (!executor.push(block)) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot insert into temporary table"); + executor.push(block); executor.finish(); } else if (state.need_receive_data_for_input) diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index 69f9e5b9380..e3e73e42096 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ b/src/Storages/Distributed/DistributedSink.cpp @@ -89,8 +89,7 @@ static void writeBlockConvert(PushingPipelineExecutor & executor, const Block & { Block adopted_block = adoptBlock(executor.getHeader(), block, log); for (size_t i = 0; i < repeats; ++i) - if (!executor.push(adopted_block)) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data"); + executor.push(adopted_block); } @@ -409,8 +408,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend}; Block adopted_shard_block = adoptBlock(job.executor->getHeader(), shard_block, log); - if (!job.executor->push(adopted_shard_block)) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data"); + job.executor->push(adopted_shard_block); } else // local { diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index 3223a2813a3..f753d369d2d 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -1069,8 +1069,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl auto block_io = interpreter.execute(); PushingPipelineExecutor executor(block_io.pipeline); executor.start(); - if (!executor.push(std::move(block_to_write))) - throw Exception(ErrorCodes::LOGICAL_ERROR, "StorageBuffer could not write data to destination table"); + 
executor.push(std::move(block_to_write)); executor.finish(); } diff --git a/src/Storages/tests/gtest_storage_log.cpp b/src/Storages/tests/gtest_storage_log.cpp index 60890337cb4..d75f3616f21 100644 --- a/src/Storages/tests/gtest_storage_log.cpp +++ b/src/Storages/tests/gtest_storage_log.cpp @@ -98,7 +98,7 @@ std::string writeData(int rows, DB::StoragePtr & table, const DB::ContextPtr con QueryPipeline pipeline(table->write({}, metadata_snapshot, context, /*async_insert=*/false)); PushingPipelineExecutor executor(pipeline); - std::ignore = executor.push(block); + executor.push(block); executor.finish(); return data; From dcf96fa9f4363f7607e9b5ed82056d94c49a6ee3 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 13 Aug 2024 17:57:06 +0200 Subject: [PATCH 3/3] Update 03221_insert_timeout_overflow_mode.sh --- tests/queries/0_stateless/03221_insert_timeout_overflow_mode.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/03221_insert_timeout_overflow_mode.sh b/tests/queries/0_stateless/03221_insert_timeout_overflow_mode.sh index 030c5211b2d..db943a665cb 100755 --- a/tests/queries/0_stateless/03221_insert_timeout_overflow_mode.sh +++ b/tests/queries/0_stateless/03221_insert_timeout_overflow_mode.sh @@ -5,4 +5,4 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) . 
"$CUR_DIR"/../shell_config.sh ${CLICKHOUSE_CLIENT} --query "create table null_t (number UInt64) engine = Null;" -${CLICKHOUSE_CLIENT} --query "select sleep(0.1) from system.numbers settings max_block_size = 1 format Native" 2>/dev/null | ${CLICKHOUSE_CLIENT} --max_execution_time = 0.3 --timeout_overflow_mode = 'break' --query "insert into null_t format Native" 2>&1 | grep -o "QUERY_WAS_CANCELLED" +${CLICKHOUSE_CLIENT} --query "select sleep(0.1) from system.numbers settings max_block_size = 1 format Native" 2>/dev/null | ${CLICKHOUSE_CLIENT} --max_execution_time 0.3 --timeout_overflow_mode break --query "insert into null_t format Native" 2>&1 | grep -o "QUERY_WAS_CANCELLED"