mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 10:02:01 +00:00
Cleanup.
This commit is contained in:
parent
04286bc270
commit
c5ae139c97
@ -1462,7 +1462,7 @@ try
|
||||
while (in_executor.pull(block))
|
||||
{
|
||||
Columns columns = obfuscator.generate(block.getColumns());
|
||||
std::ignore = out_executor.push(header.cloneWithColumns(columns));
|
||||
out_executor.push(header.cloneWithColumns(columns));
|
||||
processed_rows += block.rows();
|
||||
if (!silent)
|
||||
std::cerr << "Processed " << processed_rows << " rows\n";
|
||||
|
@ -287,17 +287,13 @@ void LocalConnection::sendData(const Block & block, const String &, bool)
|
||||
if (!block)
|
||||
return;
|
||||
|
||||
bool inserted = false;
|
||||
if (state->pushing_async_executor)
|
||||
inserted = state->pushing_async_executor->push(block);
|
||||
state->pushing_async_executor->push(block);
|
||||
else if (state->pushing_executor)
|
||||
inserted = state->pushing_executor->push(block);
|
||||
state->pushing_executor->push(block);
|
||||
else
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown executor");
|
||||
|
||||
if (!inserted)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data");
|
||||
|
||||
if (send_profile_events)
|
||||
sendProfileEvents();
|
||||
}
|
||||
|
@ -556,7 +556,7 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
|
||||
PushingPipelineExecutor executor(io.pipeline);
|
||||
|
||||
executor.start();
|
||||
std::ignore = executor.push(block);
|
||||
executor.push(block);
|
||||
executor.finish();
|
||||
}
|
||||
catch (...)
|
||||
|
@ -15,6 +15,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int QUERY_WAS_CANCELLED;
|
||||
}
|
||||
|
||||
class PushingAsyncSource : public ISource
|
||||
@ -176,17 +177,17 @@ void PushingAsyncPipelineExecutor::start()
|
||||
data->thread = ThreadFromGlobalPool(std::move(func));
|
||||
}
|
||||
|
||||
static void checkExecutionStatus(PipelineExecutor::ExecutionStatus status)
|
||||
[[noreturn]] static void throwOnExecutionStatus(PipelineExecutor::ExecutionStatus status)
|
||||
{
|
||||
if (status == PipelineExecutor::ExecutionStatus::CancelledByTimeout
|
||||
|| status == PipelineExecutor::ExecutionStatus::CancelledByUser)
|
||||
return;
|
||||
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
|
||||
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Pipeline for PushingPipelineExecutor was finished before all data was inserted");
|
||||
}
|
||||
|
||||
bool PushingAsyncPipelineExecutor::push(Chunk chunk)
|
||||
void PushingAsyncPipelineExecutor::push(Chunk chunk)
|
||||
{
|
||||
if (!started)
|
||||
start();
|
||||
@ -195,14 +196,12 @@ bool PushingAsyncPipelineExecutor::push(Chunk chunk)
|
||||
data->rethrowExceptionIfHas();
|
||||
|
||||
if (!is_pushed)
|
||||
checkExecutionStatus(data->executor->getExecutionStatus());
|
||||
|
||||
return is_pushed;
|
||||
throwOnExecutionStatus(data->executor->getExecutionStatus());
|
||||
}
|
||||
|
||||
bool PushingAsyncPipelineExecutor::push(Block block)
|
||||
void PushingAsyncPipelineExecutor::push(Block block)
|
||||
{
|
||||
return push(Chunk(block.getColumns(), block.rows()));
|
||||
push(Chunk(block.getColumns(), block.rows()));
|
||||
}
|
||||
|
||||
void PushingAsyncPipelineExecutor::finish()
|
||||
|
@ -36,11 +36,8 @@ public:
|
||||
|
||||
void start();
|
||||
|
||||
/// Return 'true' if push was successful.
|
||||
/// Return 'false' if pipline was cancelled without exception.
|
||||
/// This may happen in case of timeout_overflow_mode = 'break' OR internal bug.
|
||||
[[nodiscard]] bool push(Chunk chunk);
|
||||
[[nodiscard]] bool push(Block block);
|
||||
void push(Chunk chunk);
|
||||
void push(Block block);
|
||||
|
||||
void finish();
|
||||
|
||||
|
@ -11,6 +11,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int QUERY_WAS_CANCELLED;
|
||||
}
|
||||
|
||||
class PushingSource : public ISource
|
||||
@ -80,56 +81,43 @@ const Block & PushingPipelineExecutor::getHeader() const
|
||||
return pushing_source->getPort().getHeader();
|
||||
}
|
||||
|
||||
static void checkExecutionStatus(PipelineExecutor::ExecutionStatus status)
|
||||
[[noreturn]] static void throwOnExecutionStatus(PipelineExecutor::ExecutionStatus status)
|
||||
{
|
||||
if (status == PipelineExecutor::ExecutionStatus::CancelledByTimeout
|
||||
|| status == PipelineExecutor::ExecutionStatus::CancelledByUser)
|
||||
return;
|
||||
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
|
||||
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Pipeline for PushingPipelineExecutor was finished before all data was inserted");
|
||||
}
|
||||
|
||||
bool PushingPipelineExecutor::start()
|
||||
void PushingPipelineExecutor::start()
|
||||
{
|
||||
if (started)
|
||||
return true;
|
||||
return;
|
||||
|
||||
started = true;
|
||||
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
|
||||
executor->setReadProgressCallback(pipeline.getReadProgressCallback());
|
||||
|
||||
if (!executor->executeStep(&input_wait_flag))
|
||||
{
|
||||
checkExecutionStatus(executor->getExecutionStatus());
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
throwOnExecutionStatus(executor->getExecutionStatus());
|
||||
}
|
||||
|
||||
bool PushingPipelineExecutor::push(Chunk chunk)
|
||||
void PushingPipelineExecutor::push(Chunk chunk)
|
||||
{
|
||||
if (!started)
|
||||
{
|
||||
if (!start())
|
||||
return false;
|
||||
}
|
||||
start();
|
||||
|
||||
pushing_source->setData(std::move(chunk));
|
||||
|
||||
if (!executor->executeStep(&input_wait_flag))
|
||||
{
|
||||
checkExecutionStatus(executor->getExecutionStatus());
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
throwOnExecutionStatus(executor->getExecutionStatus());
|
||||
}
|
||||
|
||||
bool PushingPipelineExecutor::push(Block block)
|
||||
void PushingPipelineExecutor::push(Block block)
|
||||
{
|
||||
return push(Chunk(block.getColumns(), block.rows()));
|
||||
push(Chunk(block.getColumns(), block.rows()));
|
||||
}
|
||||
|
||||
void PushingPipelineExecutor::finish()
|
||||
|
@ -35,13 +35,10 @@ public:
|
||||
/// Get structure of returned block or chunk.
|
||||
const Block & getHeader() const;
|
||||
|
||||
bool start();
|
||||
void start();
|
||||
|
||||
/// Return 'true' if push was successful.
|
||||
/// Return 'false' if pipline was cancelled without exception.
|
||||
/// This may happen in case of timeout_overflow_mode = 'break' OR internal bug.
|
||||
[[nodiscard]] bool push(Chunk chunk);
|
||||
[[nodiscard]] bool push(Block block);
|
||||
void push(Chunk chunk);
|
||||
void push(Block block);
|
||||
|
||||
void finish();
|
||||
|
||||
|
@ -215,8 +215,7 @@ void CreatingSetsTransform::consume(Chunk chunk)
|
||||
if (!done_with_table)
|
||||
{
|
||||
block = materializeBlock(block);
|
||||
if (!executor->push(block))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot insert into a table");
|
||||
executor->push(block);
|
||||
|
||||
rows_to_transfer += block.rows();
|
||||
bytes_to_transfer += block.bytes();
|
||||
|
@ -1012,8 +1012,7 @@ namespace
|
||||
while (pipeline_executor->pull(block))
|
||||
{
|
||||
if (block)
|
||||
if (!executor.push(block))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data");
|
||||
executor.push(block);
|
||||
}
|
||||
|
||||
if (isQueryCancelled())
|
||||
|
@ -932,18 +932,12 @@ void TCPHandler::processInsertQuery()
|
||||
executor.start();
|
||||
|
||||
if (processed_data)
|
||||
{
|
||||
if (!executor.push(std::move(processed_data)))
|
||||
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
|
||||
}
|
||||
executor.push(std::move(processed_data));
|
||||
else
|
||||
startInsertQuery();
|
||||
|
||||
while (readDataNext())
|
||||
{
|
||||
if (!executor.push(std::move(state.block_for_insert)))
|
||||
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
|
||||
}
|
||||
executor.push(std::move(state.block_for_insert));
|
||||
|
||||
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
|
||||
executor.cancel();
|
||||
@ -2040,8 +2034,7 @@ bool TCPHandler::receiveData(bool scalar)
|
||||
QueryPipeline temporary_table_out(storage->write(ASTPtr(), metadata_snapshot, query_context, /*async_insert=*/false));
|
||||
PushingPipelineExecutor executor(temporary_table_out);
|
||||
executor.start();
|
||||
if (!executor.push(block))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot insert into temporary table");
|
||||
executor.push(block);
|
||||
executor.finish();
|
||||
}
|
||||
else if (state.need_receive_data_for_input)
|
||||
|
@ -89,8 +89,7 @@ static void writeBlockConvert(PushingPipelineExecutor & executor, const Block &
|
||||
{
|
||||
Block adopted_block = adoptBlock(executor.getHeader(), block, log);
|
||||
for (size_t i = 0; i < repeats; ++i)
|
||||
if (!executor.push(adopted_block))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data");
|
||||
executor.push(adopted_block);
|
||||
}
|
||||
|
||||
|
||||
@ -409,8 +408,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
|
||||
|
||||
Block adopted_shard_block = adoptBlock(job.executor->getHeader(), shard_block, log);
|
||||
if (!job.executor->push(adopted_shard_block))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send data");
|
||||
job.executor->push(adopted_shard_block);
|
||||
}
|
||||
else // local
|
||||
{
|
||||
|
@ -1069,8 +1069,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
|
||||
auto block_io = interpreter.execute();
|
||||
PushingPipelineExecutor executor(block_io.pipeline);
|
||||
executor.start();
|
||||
if (!executor.push(std::move(block_to_write)))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "StorageBuffer could not write data to destination table");
|
||||
executor.push(std::move(block_to_write));
|
||||
executor.finish();
|
||||
}
|
||||
|
||||
|
@ -98,7 +98,7 @@ std::string writeData(int rows, DB::StoragePtr & table, const DB::ContextPtr con
|
||||
QueryPipeline pipeline(table->write({}, metadata_snapshot, context, /*async_insert=*/false));
|
||||
|
||||
PushingPipelineExecutor executor(pipeline);
|
||||
std::ignore = executor.push(block);
|
||||
executor.push(block);
|
||||
executor.finish();
|
||||
|
||||
return data;
|
||||
|
Loading…
Reference in New Issue
Block a user