Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-23 16:12:01 +00:00)
Add status to PipelineExecutor. Verify status of pushing pipeline.
This commit is contained in:
parent cabe90d048
commit 04286bc270
@@ -1462,7 +1462,7 @@ try
         while (in_executor.pull(block))
         {
             Columns columns = obfuscator.generate(block.getColumns());
-            out_executor.push(header.cloneWithColumns(columns));
+            std::ignore = out_executor.push(header.cloneWithColumns(columns));
             processed_rows += block.rows();
             if (!silent)
                 std::cerr << "Processed " << processed_rows << " rows\n";
@@ -287,13 +287,17 @@ void LocalConnection::sendData(const Block & block, const String &, bool)
     if (!block)
         return;

+    bool inserted = false;
     if (state->pushing_async_executor)
-        state->pushing_async_executor->push(block);
+        inserted = state->pushing_async_executor->push(block);
     else if (state->pushing_executor)
-        state->pushing_executor->push(block);
+        inserted = state->pushing_executor->push(block);
     else
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown executor");

+    if (!inserted)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data");
+
     if (send_profile_events)
         sendProfileEvents();
 }
@@ -556,7 +556,7 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
         PushingPipelineExecutor executor(io.pipeline);

         executor.start();
-        executor.push(block);
+        std::ignore = executor.push(block);
         executor.finish();
     }
     catch (...)
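
The std::ignore assignment above is how call sites opt out of the [[nodiscard]] contract that this commit adds to push(): places that deliberately drop the result, such as the system-log flush here, assign it to std::ignore instead of triggering a compiler warning. A minimal, self-contained sketch of the two calling styles (push_row is a hypothetical stand-in, not part of this commit):

    #include <tuple>    // std::ignore

    [[nodiscard]] static bool push_row()   // hypothetical stand-in for the [[nodiscard]] push() added below
    {
        return true;
    }

    int main()
    {
        if (!push_row())            // normal call sites react to 'false' (pipeline was cancelled)
            return 1;

        std::ignore = push_row();   // best-effort call sites discard the result explicitly
        return 0;
    }
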
@@ -96,7 +96,7 @@ bool ExecutingGraph::addEdges(uint64_t node)
     return was_edge_added;
 }

-bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
+ExecutingGraph::UpdateNodeStatus ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
 {
     auto & cur_node = *nodes[pid];
     Processors new_processors;
@@ -108,7 +108,7 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
     catch (...)
     {
         cur_node.exception = std::current_exception();
-        return false;
+        return UpdateNodeStatus::Exception;
     }

     {
@@ -118,7 +118,7 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
         {
             for (auto & processor : new_processors)
                 processor->cancel();
-            return false;
+            return UpdateNodeStatus::Cancelled;
         }
         processors->insert(processors->end(), new_processors.begin(), new_processors.end());

@@ -178,7 +178,7 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
         }
     }

-    return true;
+    return UpdateNodeStatus::Done;
 }

 void ExecutingGraph::initializeExecution(Queue & queue)
@@ -213,7 +213,7 @@ void ExecutingGraph::initializeExecution(Queue & queue)
     }
 }


-bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue)
+ExecutingGraph::UpdateNodeStatus ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue)
 {
     std::stack<Edge *> updated_edges;
     std::stack<uint64_t> updated_processors;
@@ -309,7 +309,7 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue)
             catch (...)
             {
                 node.exception = std::current_exception();
-                return false;
+                return UpdateNodeStatus::Exception;
             }

 #ifndef NDEBUG
@@ -386,8 +386,9 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue)
                 read_lock.unlock();
                 {
                     std::unique_lock lock(nodes_mutex);
-                    if (!expandPipeline(updated_processors, pid))
-                        return false;
+                    auto status = expandPipeline(updated_processors, pid);
+                    if (status != UpdateNodeStatus::Done)
+                        return status;
                 }
                 read_lock.lock();

@@ -397,7 +398,7 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue)
         }
     }

-    return true;
+    return UpdateNodeStatus::Done;
 }

 void ExecutingGraph::cancel(bool cancel_all_processors)
@@ -138,10 +138,17 @@ public:
     /// Traverse graph the first time to update all the childless nodes.
     void initializeExecution(Queue & queue);

+    enum class UpdateNodeStatus
+    {
+        Done,
+        Exception,
+        Cancelled,
+    };
+
     /// Update processor with pid number (call IProcessor::prepare).
     /// Check parents and children of current processor and push them to stacks if they also need to be updated.
     /// If processor wants to be expanded, lock will be upgraded to get write access to pipeline.
-    bool updateNode(uint64_t pid, Queue & queue, Queue & async_queue);
+    UpdateNodeStatus updateNode(uint64_t pid, Queue & queue, Queue & async_queue);

     void cancel(bool cancel_all_processors = true);

@@ -155,7 +162,7 @@ private:

     /// Update graph after processor (pid) returned ExpandPipeline status.
     /// All new nodes and nodes with updated ports are pushed into stack.
-    bool expandPipeline(std::stack<uint64_t> & stack, uint64_t pid);
+    UpdateNodeStatus expandPipeline(std::stack<uint64_t> & stack, uint64_t pid);

     std::shared_ptr<Processors> processors;
     std::vector<bool> source_processors;
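
updateNode() and expandPipeline() previously collapsed three different outcomes into a bool; the enum keeps "finished normally", "stored an exception", and "was cancelled" distinct, so the executor can react differently to each (see the executeStepImpl changes below). A self-contained model of that pattern, independent of the ClickHouse code:

    #include <iostream>

    // Tri-state result, mirroring ExecutingGraph::UpdateNodeStatus above.
    enum class UpdateNodeStatus { Done, Exception, Cancelled };

    // Toy stand-in for updateNode(): negative pids "fail", pid 0 is "cancelled".
    static UpdateNodeStatus update_node(int pid)
    {
        if (pid < 0)
            return UpdateNodeStatus::Exception;   // the real code stores std::current_exception() first
        if (pid == 0)
            return UpdateNodeStatus::Cancelled;
        return UpdateNodeStatus::Done;
    }

    int main()
    {
        for (int pid : {-1, 0, 1})
        {
            switch (update_node(pid))
            {
                case UpdateNodeStatus::Done:      std::cout << pid << ": schedule children\n"; break;
                case UpdateNodeStatus::Exception: std::cout << pid << ": cancel with an error status\n"; break;
                case UpdateNodeStatus::Cancelled: std::cout << pid << ": stop without scheduling more work\n"; break;
            }
        }
    }
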
@@ -77,9 +77,9 @@ const Processors & PipelineExecutor::getProcessors() const
     return graph->getProcessors();
 }

-void PipelineExecutor::cancel()
+void PipelineExecutor::cancel(ExecutionStatus reason)
 {
-    cancelled = true;
+    tryUpdateExecutionStatus(ExecutionStatus::Executing, reason);
     finish();
     graph->cancel();
 }
@@ -98,6 +98,11 @@ void PipelineExecutor::finish()
     tasks.finish();
 }

+bool PipelineExecutor::tryUpdateExecutionStatus(ExecutionStatus expected, ExecutionStatus desired)
+{
+    return execution_status.compare_exchange_strong(expected, desired);
+}
+
 void PipelineExecutor::execute(size_t num_threads, bool concurrency_control)
 {
     checkTimeLimit();
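
tryUpdateExecutionStatus() makes the status transition race-free: compare_exchange_strong only installs the new status when the current value still matches the expected one, so the first terminal status (Exception, CancelledByUser, CancelledByTimeout) wins and is not overwritten by a later cancel(). A self-contained illustration of that property, using plain std::atomic rather than the ClickHouse classes:

    #include <atomic>
    #include <cassert>

    enum class ExecutionStatus { NotStarted, Executing, Finished, Exception, CancelledByUser, CancelledByTimeout };

    static std::atomic<ExecutionStatus> execution_status{ExecutionStatus::NotStarted};

    static bool tryUpdateExecutionStatus(ExecutionStatus expected, ExecutionStatus desired)
    {
        // Stores 'desired' only if the current value still equals 'expected'.
        return execution_status.compare_exchange_strong(expected, desired);
    }

    int main()
    {
        assert(tryUpdateExecutionStatus(ExecutionStatus::NotStarted, ExecutionStatus::Executing));

        // The first transition out of Executing decides the final status...
        assert(tryUpdateExecutionStatus(ExecutionStatus::Executing, ExecutionStatus::Exception));
        // ...a later cancellation attempt no longer matches Executing and is ignored.
        assert(!tryUpdateExecutionStatus(ExecutionStatus::Executing, ExecutionStatus::CancelledByUser));
        assert(execution_status.load() == ExecutionStatus::Exception);
        return 0;
    }
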
@@ -120,7 +125,7 @@ void PipelineExecutor::execute(size_t num_threads, bool concurrency_control)
     }
     catch (...)
     {
-        span.addAttribute(ExecutionStatus::fromCurrentException());
+        span.addAttribute(DB::ExecutionStatus::fromCurrentException());

 #ifndef NDEBUG
         LOG_TRACE(log, "Exception while executing query. Current state:\n{}", dumpPipeline());
@@ -169,7 +174,7 @@ bool PipelineExecutor::checkTimeLimitSoft()
     // We call cancel here so that all processors are notified and tasks waken up
     // so that the "break" is faster and doesn't wait for long events
     if (!continuing)
-        cancel();
+        cancel(ExecutionStatus::CancelledByTimeout);

     return continuing;
 }
@@ -195,7 +200,8 @@ void PipelineExecutor::finalizeExecution()
 {
     checkTimeLimit();

-    if (cancelled)
+    auto status = execution_status.load();
+    if (status == ExecutionStatus::CancelledByTimeout || status == ExecutionStatus::CancelledByUser)
         return;

     bool all_processors_finished = true;
@@ -271,7 +277,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yield_flag)
             break;

         if (!context.executeTask())
-            cancel();
+            cancel(ExecutionStatus::Exception);

         if (tasks.isFinished())
             break;
@@ -289,11 +295,13 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yield_flag)
             Queue async_queue;

             /// Prepare processor after execution.
-            if (!graph->updateNode(context.getProcessorID(), queue, async_queue))
-                cancel();
+            auto status = graph->updateNode(context.getProcessorID(), queue, async_queue);
+            if (status == ExecutingGraph::UpdateNodeStatus::Exception)
+                cancel(ExecutionStatus::Exception);

             /// Push other tasks to global queue.
-            tasks.pushTasks(queue, async_queue, context);
+            if (status == ExecutingGraph::UpdateNodeStatus::Done)
+                tasks.pushTasks(queue, async_queue, context);
         }

 #ifndef NDEBUG
@@ -309,7 +317,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yield_flag)
         {
             /// spawnThreads can throw an exception, for example CANNOT_SCHEDULE_TASK.
             /// We should cancel execution properly before rethrow.
-            cancel();
+            cancel(ExecutionStatus::Exception);
             throw;
         }

@@ -328,6 +336,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yield_flag)
 void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_control)
 {
     is_execution_initialized = true;
+    tryUpdateExecutionStatus(ExecutionStatus::NotStarted, ExecutionStatus::Executing);

     size_t use_threads = num_threads;

@@ -393,7 +402,7 @@ void PipelineExecutor::executeImpl(size_t num_threads, bool concurrency_control)
         {
             /// If finished_flag is not set, there was an exception.
             /// Cancel execution in this case.
-            cancel();
+            cancel(ExecutionStatus::Exception);
             if (pool)
                 pool->wait();
         }
@@ -48,8 +48,20 @@ public:

     const Processors & getProcessors() const;

+    enum class ExecutionStatus
+    {
+        NotStarted,
+        Executing,
+        Finished,
+        Exception,
+        CancelledByUser,
+        CancelledByTimeout,
+    };
+
     /// Cancel execution. May be called from another thread.
-    void cancel();
+    void cancel() { cancel(ExecutionStatus::CancelledByUser); }
+
+    ExecutionStatus getExecutionStatus() const { return execution_status.load(); }

     /// Cancel processors which only read data from source. May be called from another thread.
     void cancelReading();
@@ -81,7 +93,7 @@ private:
     /// system.opentelemetry_span_log
     bool trace_processors = false;

-    std::atomic_bool cancelled = false;
+    std::atomic<ExecutionStatus> execution_status = ExecutionStatus::NotStarted;
     std::atomic_bool cancelled_reading = false;

     LoggerPtr log = getLogger("PipelineExecutor");
@@ -105,6 +117,10 @@ private:
     void executeStepImpl(size_t thread_num, std::atomic_bool * yield_flag = nullptr);
     void executeSingleThread(size_t thread_num);
     void finish();
+    void cancel(ExecutionStatus reason);
+
+    /// If execution_status == expected, change it to desired.
+    bool tryUpdateExecutionStatus(ExecutionStatus expected, ExecutionStatus desired);

     String dumpPipeline() const;
 };
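
The header now exposes the final status alongside the cancellation entry points, so code driving an executor can tell a deliberate stop from a failure. A hedged sketch of a caller-side check; it mirrors the checkExecutionStatus() helpers added further down, and wasCancelled is a hypothetical name, not part of this commit:

    // Hypothetical helper in calling code: a timeout/user cancellation is a soft stop,
    // anything else observed after an early exit is treated as an internal error.
    static bool wasCancelled(const PipelineExecutor & executor)
    {
        auto status = executor.getExecutionStatus();
        return status == PipelineExecutor::ExecutionStatus::CancelledByUser
            || status == PipelineExecutor::ExecutionStatus::CancelledByTimeout;
    }
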
@@ -176,7 +176,17 @@ void PushingAsyncPipelineExecutor::start()
     data->thread = ThreadFromGlobalPool(std::move(func));
 }

-void PushingAsyncPipelineExecutor::push(Chunk chunk)
+static void checkExecutionStatus(PipelineExecutor::ExecutionStatus status)
+{
+    if (status == PipelineExecutor::ExecutionStatus::CancelledByTimeout
+        || status == PipelineExecutor::ExecutionStatus::CancelledByUser)
+        return;
+
+    throw Exception(ErrorCodes::LOGICAL_ERROR,
+        "Pipeline for PushingPipelineExecutor was finished before all data was inserted");
+}
+
+bool PushingAsyncPipelineExecutor::push(Chunk chunk)
 {
     if (!started)
         start();
@@ -185,13 +195,14 @@ void PushingAsyncPipelineExecutor::push(Chunk chunk)
     data->rethrowExceptionIfHas();

     if (!is_pushed)
-        throw Exception(ErrorCodes::LOGICAL_ERROR,
-            "Pipeline for PushingAsyncPipelineExecutor was finished before all data was inserted");
+        checkExecutionStatus(data->executor->getExecutionStatus());
+
+    return is_pushed;
 }

-void PushingAsyncPipelineExecutor::push(Block block)
+bool PushingAsyncPipelineExecutor::push(Block block)
 {
-    push(Chunk(block.getColumns(), block.rows()));
+    return push(Chunk(block.getColumns(), block.rows()));
 }

 void PushingAsyncPipelineExecutor::finish()
@@ -36,8 +36,11 @@ public:

     void start();

-    void push(Chunk chunk);
-    void push(Block block);
+    /// Return 'true' if push was successful.
+    /// Return 'false' if pipeline was cancelled without exception.
+    /// This may happen in case of timeout_overflow_mode = 'break' OR internal bug.
+    [[nodiscard]] bool push(Chunk chunk);
+    [[nodiscard]] bool push(Block block);

     void finish();

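
With the [[nodiscard]] signatures, a writer is expected to stop pushing as soon as push() returns false (the pipeline was cancelled, e.g. by timeout_overflow_mode = 'break') and to let exceptions propagate for real failures. A hedged sketch of such a write loop; the function name, the block source, and the choice to cancel() rather than finish() on cancellation are assumptions, not part of this commit:

    // Hypothetical write loop against the new bool-returning API.
    void writeAll(QueryPipeline & pipeline, std::vector<Block> blocks_to_insert)
    {
        PushingAsyncPipelineExecutor executor(pipeline);
        executor.start();

        bool cancelled = false;
        for (auto & block : blocks_to_insert)
        {
            if (!executor.push(block))        // false: pipeline was cancelled without an exception
            {
                cancelled = true;
                break;
            }
        }

        if (cancelled)
            executor.cancel();                // unwind without finalizing the insert
        else
            executor.finish();
    }
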
@@ -80,36 +80,56 @@ const Block & PushingPipelineExecutor::getHeader() const
     return pushing_source->getPort().getHeader();
 }

-void PushingPipelineExecutor::start()
+static void checkExecutionStatus(PipelineExecutor::ExecutionStatus status)
+{
+    if (status == PipelineExecutor::ExecutionStatus::CancelledByTimeout
+        || status == PipelineExecutor::ExecutionStatus::CancelledByUser)
+        return;
+
+    throw Exception(ErrorCodes::LOGICAL_ERROR,
+        "Pipeline for PushingPipelineExecutor was finished before all data was inserted");
+}
+
+bool PushingPipelineExecutor::start()
 {
     if (started)
-        return;
+        return true;

     started = true;
     executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
     executor->setReadProgressCallback(pipeline.getReadProgressCallback());

     if (!executor->executeStep(&input_wait_flag))
-        throw Exception(ErrorCodes::LOGICAL_ERROR,
-            "Pipeline for PushingPipelineExecutor was finished before all data was inserted");
+    {
+        checkExecutionStatus(executor->getExecutionStatus());
+        return false;
+    }
+
+    return true;
 }

-void PushingPipelineExecutor::push(Chunk chunk)
+bool PushingPipelineExecutor::push(Chunk chunk)
 {
     if (!started)
-        start();
+    {
+        if (!start())
+            return false;
+    }

     pushing_source->setData(std::move(chunk));

     if (!executor->executeStep(&input_wait_flag))
-        throw Exception(ErrorCodes::LOGICAL_ERROR,
-            "Pipeline for PushingPipelineExecutor was finished before all data was inserted");
+    {
+        checkExecutionStatus(executor->getExecutionStatus());
+        return false;
+    }
+
+    return true;
 }

-void PushingPipelineExecutor::push(Block block)
+bool PushingPipelineExecutor::push(Block block)
 {
-    push(Chunk(block.getColumns(), block.rows()));
+    return push(Chunk(block.getColumns(), block.rows()));
 }

 void PushingPipelineExecutor::finish()
@@ -35,10 +35,13 @@ public:
     /// Get structure of returned block or chunk.
     const Block & getHeader() const;

-    void start();
+    bool start();

-    void push(Chunk chunk);
-    void push(Block block);
+    /// Return 'true' if push was successful.
+    /// Return 'false' if pipeline was cancelled without exception.
+    /// This may happen in case of timeout_overflow_mode = 'break' OR internal bug.
+    [[nodiscard]] bool push(Chunk chunk);
+    [[nodiscard]] bool push(Block block);

     void finish();

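
Taken together with the async variant above, the call sites updated below apply three policies to the new result: the client-facing insert path in TCPHandler::processInsertQuery maps false to a QUERY_WAS_CANCELLED error, internal writers that must not silently lose data (such as LocalConnection, CreatingSetsTransform, DistributedSink, StorageBuffer) treat false as a LOGICAL_ERROR, and best-effort paths (the obfuscator, SystemLog flushing, a unit test) discard the result via std::ignore.
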
@@ -215,7 +215,8 @@ void CreatingSetsTransform::consume(Chunk chunk)
     if (!done_with_table)
     {
         block = materializeBlock(block);
-        executor->push(block);
+        if (!executor->push(block))
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot insert into a table");

         rows_to_transfer += block.rows();
         bytes_to_transfer += block.bytes();
@@ -1012,7 +1012,8 @@ namespace
     while (pipeline_executor->pull(block))
     {
         if (block)
-            executor.push(block);
+            if (!executor.push(block))
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data");
     }

     if (isQueryCancelled())
@@ -932,12 +932,18 @@ void TCPHandler::processInsertQuery()
     executor.start();

     if (processed_data)
-        executor.push(std::move(processed_data));
+    {
+        if (!executor.push(std::move(processed_data)))
+            throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
+    }
     else
         startInsertQuery();

     while (readDataNext())
-        executor.push(std::move(state.block_for_insert));
+    {
+        if (!executor.push(std::move(state.block_for_insert)))
+            throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
+    }

     if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
         executor.cancel();
@@ -2034,7 +2040,8 @@ bool TCPHandler::receiveData(bool scalar)
         QueryPipeline temporary_table_out(storage->write(ASTPtr(), metadata_snapshot, query_context, /*async_insert=*/false));
         PushingPipelineExecutor executor(temporary_table_out);
         executor.start();
-        executor.push(block);
+        if (!executor.push(block))
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot insert into temporary table");
         executor.finish();
     }
     else if (state.need_receive_data_for_input)
@@ -89,7 +89,8 @@ static void writeBlockConvert(PushingPipelineExecutor & executor, const Block &
     {
         Block adopted_block = adoptBlock(executor.getHeader(), block, log);
         for (size_t i = 0; i < repeats; ++i)
-            executor.push(adopted_block);
+            if (!executor.push(adopted_block))
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data");
     }


@@ -408,7 +409,8 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
                 CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};

                 Block adopted_shard_block = adoptBlock(job.executor->getHeader(), shard_block, log);
-                job.executor->push(adopted_shard_block);
+                if (!job.executor->push(adopted_shard_block))
+                    throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data");
             }
             else // local
             {
@@ -1069,7 +1069,8 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
     auto block_io = interpreter.execute();
     PushingPipelineExecutor executor(block_io.pipeline);
     executor.start();
-    executor.push(std::move(block_to_write));
+    if (!executor.push(std::move(block_to_write)))
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "StorageBuffer could not write data to destination table");
     executor.finish();
 }

@@ -98,7 +98,7 @@ std::string writeData(int rows, DB::StoragePtr & table, const DB::ContextPtr con
     QueryPipeline pipeline(table->write({}, metadata_snapshot, context, /*async_insert=*/false));

     PushingPipelineExecutor executor(pipeline);
-    executor.push(block);
+    std::ignore = executor.push(block);
     executor.finish();

     return data;
@@ -0,0 +1,2 @@
+QUERY_WAS_CANCELLED
+QUERY_WAS_CANCELLED
tests/queries/0_stateless/03221_insert_timeout_overflow_mode.sh (new executable file, 8 lines)
@@ -0,0 +1,8 @@
+#!/usr/bin/env bash
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+${CLICKHOUSE_CLIENT} --query "create table null_t (number UInt64) engine = Null;"
+${CLICKHOUSE_CLIENT} --query "select sleep(0.1) from system.numbers settings max_block_size = 1 format Native" 2>/dev/null | ${CLICKHOUSE_CLIENT} --max_execution_time 0.3 --timeout_overflow_mode 'break' --query "insert into null_t format Native" 2>&1 | grep -o "QUERY_WAS_CANCELLED"
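
The new test exercises this path end to end: a slow source (one row per block, sleep(0.1) each) is piped as Native data into an INSERT running with max_execution_time 0.3 and timeout_overflow_mode 'break', so the insert pipeline is expected to be cancelled with CancelledByTimeout, push() to return false, and the server to report QUERY_WAS_CANCELLED, which grep extracts and the reference file above expects.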