From c5ae139c972d46d1e0bfa6ab5f165a049b6786f5 Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Tue, 13 Aug 2024 15:18:07 +0000
Subject: [PATCH] Cleanup.

---
 programs/obfuscator/Obfuscator.cpp            |  2 +-
 src/Client/LocalConnection.cpp                |  8 ++---
 src/Interpreters/SystemLog.cpp                |  2 +-
 .../PushingAsyncPipelineExecutor.cpp          | 15 ++++----
 .../Executors/PushingAsyncPipelineExecutor.h  |  7 ++--
 .../Executors/PushingPipelineExecutor.cpp     | 34 ++++++-------------
 .../Executors/PushingPipelineExecutor.h       |  9 ++---
 .../Transforms/CreatingSetsTransform.cpp      |  3 +-
 src/Server/GRPCServer.cpp                     |  3 +-
 src/Server/TCPHandler.cpp                     | 13 ++-----
 src/Storages/Distributed/DistributedSink.cpp  |  6 ++--
 src/Storages/StorageBuffer.cpp                |  3 +-
 src/Storages/tests/gtest_storage_log.cpp      |  2 +-
 13 files changed, 36 insertions(+), 71 deletions(-)

diff --git a/programs/obfuscator/Obfuscator.cpp b/programs/obfuscator/Obfuscator.cpp
index 7c13215e350..688ae1a1143 100644
--- a/programs/obfuscator/Obfuscator.cpp
+++ b/programs/obfuscator/Obfuscator.cpp
@@ -1462,7 +1462,7 @@ try
         while (in_executor.pull(block))
         {
             Columns columns = obfuscator.generate(block.getColumns());
-            std::ignore = out_executor.push(header.cloneWithColumns(columns));
+            out_executor.push(header.cloneWithColumns(columns));
             processed_rows += block.rows();
             if (!silent)
                 std::cerr << "Processed " << processed_rows << " rows\n";
diff --git a/src/Client/LocalConnection.cpp b/src/Client/LocalConnection.cpp
index 8f1e0958002..072184e0a66 100644
--- a/src/Client/LocalConnection.cpp
+++ b/src/Client/LocalConnection.cpp
@@ -287,17 +287,13 @@ void LocalConnection::sendData(const Block & block, const String &, bool)
     if (!block)
         return;
 
-    bool inserted = false;
     if (state->pushing_async_executor)
-        inserted = state->pushing_async_executor->push(block);
+        state->pushing_async_executor->push(block);
     else if (state->pushing_executor)
-        inserted = state->pushing_executor->push(block);
+        state->pushing_executor->push(block);
     else
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown executor");
 
-    if (!inserted)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data");
-
     if (send_profile_events)
         sendProfileEvents();
 }
diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp
index 0cad56af00a..572481e6b12 100644
--- a/src/Interpreters/SystemLog.cpp
+++ b/src/Interpreters/SystemLog.cpp
@@ -556,7 +556,7 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
         PushingPipelineExecutor executor(io.pipeline);
 
         executor.start();
-        std::ignore = executor.push(block);
+        executor.push(block);
         executor.finish();
     }
     catch (...)
diff --git a/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp b/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp
index db5cf451c9e..866d224a08d 100644
--- a/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp
+++ b/src/Processors/Executors/PushingAsyncPipelineExecutor.cpp
@@ -15,6 +15,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
+    extern const int QUERY_WAS_CANCELLED;
 }
 
 class PushingAsyncSource : public ISource
@@ -176,17 +177,17 @@ void PushingAsyncPipelineExecutor::start()
     data->thread = ThreadFromGlobalPool(std::move(func));
 }
 
-static void checkExecutionStatus(PipelineExecutor::ExecutionStatus status)
+[[noreturn]] static void throwOnExecutionStatus(PipelineExecutor::ExecutionStatus status)
 {
     if (status == PipelineExecutor::ExecutionStatus::CancelledByTimeout
         || status == PipelineExecutor::ExecutionStatus::CancelledByUser)
-        return;
+        throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
 
     throw Exception(ErrorCodes::LOGICAL_ERROR,
         "Pipeline for PushingPipelineExecutor was finished before all data was inserted");
 }
 
-bool PushingAsyncPipelineExecutor::push(Chunk chunk)
+void PushingAsyncPipelineExecutor::push(Chunk chunk)
 {
     if (!started)
         start();
@@ -195,14 +196,12 @@ bool PushingAsyncPipelineExecutor::push(Chunk chunk)
     data->rethrowExceptionIfHas();
 
     if (!is_pushed)
-        checkExecutionStatus(data->executor->getExecutionStatus());
-
-    return is_pushed;
+        throwOnExecutionStatus(data->executor->getExecutionStatus());
 }
 
-bool PushingAsyncPipelineExecutor::push(Block block)
+void PushingAsyncPipelineExecutor::push(Block block)
 {
-    return push(Chunk(block.getColumns(), block.rows()));
+    push(Chunk(block.getColumns(), block.rows()));
 }
 
 void PushingAsyncPipelineExecutor::finish()
diff --git a/src/Processors/Executors/PushingAsyncPipelineExecutor.h b/src/Processors/Executors/PushingAsyncPipelineExecutor.h
index 7835aaf596f..f976cd4c339 100644
--- a/src/Processors/Executors/PushingAsyncPipelineExecutor.h
+++ b/src/Processors/Executors/PushingAsyncPipelineExecutor.h
@@ -36,11 +36,8 @@ public:
 
     void start();
 
-    /// Return 'true' if push was successful.
-    /// Return 'false' if pipline was cancelled without exception.
-    /// This may happen in case of timeout_overflow_mode = 'break' OR internal bug.
-    [[nodiscard]] bool push(Chunk chunk);
-    [[nodiscard]] bool push(Block block);
+    void push(Chunk chunk);
+    void push(Block block);
 
     void finish();
 
diff --git a/src/Processors/Executors/PushingPipelineExecutor.cpp b/src/Processors/Executors/PushingPipelineExecutor.cpp
index 3133cfd9a1e..7a1c0111a3a 100644
--- a/src/Processors/Executors/PushingPipelineExecutor.cpp
+++ b/src/Processors/Executors/PushingPipelineExecutor.cpp
@@ -11,6 +11,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
+    extern const int QUERY_WAS_CANCELLED;
 }
 
 class PushingSource : public ISource
@@ -80,56 +81,43 @@ const Block & PushingPipelineExecutor::getHeader() const
     return pushing_source->getPort().getHeader();
 }
 
-static void checkExecutionStatus(PipelineExecutor::ExecutionStatus status)
+[[noreturn]] static void throwOnExecutionStatus(PipelineExecutor::ExecutionStatus status)
 {
     if (status == PipelineExecutor::ExecutionStatus::CancelledByTimeout
         || status == PipelineExecutor::ExecutionStatus::CancelledByUser)
-        return;
+        throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
 
     throw Exception(ErrorCodes::LOGICAL_ERROR,
         "Pipeline for PushingPipelineExecutor was finished before all data was inserted");
 }
 
-bool PushingPipelineExecutor::start()
+void PushingPipelineExecutor::start()
 {
     if (started)
-        return true;
+        return;
 
     started = true;
     executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
     executor->setReadProgressCallback(pipeline.getReadProgressCallback());
 
     if (!executor->executeStep(&input_wait_flag))
-    {
-        checkExecutionStatus(executor->getExecutionStatus());
-        return false;
-    }
-
-    return true;
+        throwOnExecutionStatus(executor->getExecutionStatus());
 }
 
-bool PushingPipelineExecutor::push(Chunk chunk)
+void PushingPipelineExecutor::push(Chunk chunk)
 {
     if (!started)
-    {
-        if (!start())
-            return false;
-    }
+        start();
 
     pushing_source->setData(std::move(chunk));
 
     if (!executor->executeStep(&input_wait_flag))
-    {
-        checkExecutionStatus(executor->getExecutionStatus());
-        return false;
-    }
-
-    return true;
+        throwOnExecutionStatus(executor->getExecutionStatus());
 }
 
-bool PushingPipelineExecutor::push(Block block)
+void PushingPipelineExecutor::push(Block block)
 {
-    return push(Chunk(block.getColumns(), block.rows()));
+    push(Chunk(block.getColumns(), block.rows()));
 }
 
 void PushingPipelineExecutor::finish()
diff --git a/src/Processors/Executors/PushingPipelineExecutor.h b/src/Processors/Executors/PushingPipelineExecutor.h
index 4021f61fb6b..f549c9482db 100644
--- a/src/Processors/Executors/PushingPipelineExecutor.h
+++ b/src/Processors/Executors/PushingPipelineExecutor.h
@@ -35,13 +35,10 @@ public:
     /// Get structure of returned block or chunk.
     const Block & getHeader() const;
 
-    bool start();
+    void start();
 
-    /// Return 'true' if push was successful.
-    /// Return 'false' if pipline was cancelled without exception.
-    /// This may happen in case of timeout_overflow_mode = 'break' OR internal bug.
-    [[nodiscard]] bool push(Chunk chunk);
-    [[nodiscard]] bool push(Block block);
+    void push(Chunk chunk);
+    void push(Block block);
 
     void finish();
 
diff --git a/src/Processors/Transforms/CreatingSetsTransform.cpp b/src/Processors/Transforms/CreatingSetsTransform.cpp
index 857233ac028..eeb8f4a6060 100644
--- a/src/Processors/Transforms/CreatingSetsTransform.cpp
+++ b/src/Processors/Transforms/CreatingSetsTransform.cpp
@@ -215,8 +215,7 @@ void CreatingSetsTransform::consume(Chunk chunk)
     if (!done_with_table)
     {
         block = materializeBlock(block);
-        if (!executor->push(block))
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot insert into a table");
+        executor->push(block);
 
         rows_to_transfer += block.rows();
         bytes_to_transfer += block.bytes();
diff --git a/src/Server/GRPCServer.cpp b/src/Server/GRPCServer.cpp
index c261d76ef33..d8a4d7f0e1f 100644
--- a/src/Server/GRPCServer.cpp
+++ b/src/Server/GRPCServer.cpp
@@ -1012,8 +1012,7 @@ namespace
             while (pipeline_executor->pull(block))
             {
                 if (block)
-                    if (!executor.push(block))
-                        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data");
+                    executor.push(block);
             }
 
             if (isQueryCancelled())
diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp
index 283b60b533c..448dfafbd9d 100644
--- a/src/Server/TCPHandler.cpp
+++ b/src/Server/TCPHandler.cpp
@@ -932,18 +932,12 @@ void TCPHandler::processInsertQuery()
         executor.start();
 
         if (processed_data)
-        {
-            if (!executor.push(std::move(processed_data)))
-                throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
-        }
+            executor.push(std::move(processed_data));
         else
             startInsertQuery();
 
         while (readDataNext())
-        {
-            if (!executor.push(std::move(state.block_for_insert)))
-                throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
-        }
+            executor.push(std::move(state.block_for_insert));
 
         if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
             executor.cancel();
@@ -2040,8 +2034,7 @@ bool TCPHandler::receiveData(bool scalar)
             QueryPipeline temporary_table_out(storage->write(ASTPtr(), metadata_snapshot, query_context, /*async_insert=*/false));
             PushingPipelineExecutor executor(temporary_table_out);
             executor.start();
-            if (!executor.push(block))
-                throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot insert into temporary table");
+            executor.push(block);
             executor.finish();
         }
         else if (state.need_receive_data_for_input)
diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp
index 69f9e5b9380..e3e73e42096 100644
--- a/src/Storages/Distributed/DistributedSink.cpp
+++ b/src/Storages/Distributed/DistributedSink.cpp
@@ -89,8 +89,7 @@ static void writeBlockConvert(PushingPipelineExecutor & executor, const Block &
 {
     Block adopted_block = adoptBlock(executor.getHeader(), block, log);
     for (size_t i = 0; i < repeats; ++i)
-        if (!executor.push(adopted_block))
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data");
+        executor.push(adopted_block);
 }
 
 
@@ -409,8 +408,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
                 CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
 
                 Block adopted_shard_block = adoptBlock(job.executor->getHeader(), shard_block, log);
-                if (!job.executor->push(adopted_shard_block))
-                    throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data");
+                job.executor->push(adopted_shard_block);
             }
             else // local
             {
diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp
index 3223a2813a3..f753d369d2d 100644
--- a/src/Storages/StorageBuffer.cpp
+++ b/src/Storages/StorageBuffer.cpp
@@ -1069,8 +1069,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
     auto block_io = interpreter.execute();
     PushingPipelineExecutor executor(block_io.pipeline);
     executor.start();
-    if (!executor.push(std::move(block_to_write)))
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "StorageBuffer could not write data to destination table");
+    executor.push(std::move(block_to_write));
     executor.finish();
 }
 
diff --git a/src/Storages/tests/gtest_storage_log.cpp b/src/Storages/tests/gtest_storage_log.cpp
index 60890337cb4..d75f3616f21 100644
--- a/src/Storages/tests/gtest_storage_log.cpp
+++ b/src/Storages/tests/gtest_storage_log.cpp
@@ -98,7 +98,7 @@ std::string writeData(int rows, DB::StoragePtr & table, const DB::ContextPtr con
 
     QueryPipeline pipeline(table->write({}, metadata_snapshot, context, /*async_insert=*/false));
     PushingPipelineExecutor executor(pipeline);
-    std::ignore = executor.push(block);
+    executor.push(block);
     executor.finish();
 
     return data;
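
After this patch, both pushing executors report failure by throwing instead of returning false: a cancelled query (for example, timeout_overflow_mode = 'break' or an explicit cancellation) surfaces as QUERY_WAS_CANCELLED, while a pipeline that finished before all data was inserted surfaces as LOGICAL_ERROR. The sketch below is illustrative only and is not part of the patch; writeBlockOrReportCancel is a hypothetical helper, and the includes assume the usual ClickHouse source layout.

#include <Core/Block.h>
#include <Common/Exception.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <QueryPipeline/QueryPipeline.h>

namespace DB::ErrorCodes
{
    extern const int QUERY_WAS_CANCELLED;
}

/// Hypothetical helper (illustration only): insert one block through a pushing
/// pipeline and report whether the data was written or the query was cancelled.
static bool writeBlockOrReportCancel(DB::QueryPipeline & insert_pipeline, DB::Block block)
{
    DB::PushingPipelineExecutor executor(insert_pipeline);
    try
    {
        executor.start();
        /// No return value to check or std::ignore any more: push() throws
        /// QUERY_WAS_CANCELLED or LOGICAL_ERROR instead of returning false.
        executor.push(std::move(block));
        executor.finish();
        return true;
    }
    catch (const DB::Exception & e)
    {
        if (e.code() == DB::ErrorCodes::QUERY_WAS_CANCELLED)
            return false;   /// cancelled by user or by timeout, not a logical error
        throw;              /// propagate anything else
    }
}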