cancel method is noexcept

This commit is contained in:
Sema Checherinda 2024-07-15 17:41:32 +02:00
parent 55724a1bcd
commit 344e5b716d
33 changed files with 70 additions and 64 deletions

View File

@ -243,7 +243,7 @@ public:
}
/// Clear and finish queue
void clearAndFinish()
void clearAndFinish() noexcept
{
{
std::lock_guard lock(queue_mutex);

View File

@ -99,7 +99,6 @@ void CompletedPipelineExecutor::execute()
if (is_cancelled_callback())
{
LOG_INFO(getLogger("CompletedPipelineExecutor"), "execute CancelCallback FULLY_CANCELLED");
data->executor->cancel();
}
}
@ -121,7 +120,6 @@ CompletedPipelineExecutor::~CompletedPipelineExecutor()
{
if (data && data->executor)
{
LOG_INFO(getLogger("CompletedPipelineExecutor"), "~CompletedPipelineExecutor");
data->executor->cancel();
}
}

View File

@ -32,7 +32,7 @@ public:
private:
Chunk read() override;
void onCancelX() override
void onCancel() noexcept override
{
is_stopped = 1;
}

View File

@ -32,7 +32,7 @@ public:
protected:
Chunk read() override;
void onCancelX() override
void onCancel() noexcept override
{
is_stopped = 1;
}

View File

@ -64,7 +64,7 @@ public:
protected:
Chunk read() override;
void onCancelX() override { is_stopped = 1; }
void onCancel() noexcept override { is_stopped = 1; }
private:
void prepareFileReader();

View File

@ -34,7 +34,7 @@ public:
protected:
Chunk read() override;
void onCancelX() override
void onCancel() noexcept override
{
is_stopped = 1;
}

View File

@ -96,7 +96,7 @@ namespace DB
}
void ParallelFormattingOutputFormat::finishAndWait()
void ParallelFormattingOutputFormat::finishAndWait() noexcept
{
emergency_stop = true;

View File

@ -122,7 +122,7 @@ public:
started_prefix = true;
}
void onCancelX() override
void onCancel() noexcept override
{
finishAndWait();
}
@ -268,7 +268,7 @@ private:
bool collected_suffix = false;
bool collected_finalize = false;
void finishAndWait();
void finishAndWait() noexcept;
void onBackgroundException()
{

View File

@ -176,7 +176,7 @@ Chunk ParallelParsingInputFormat::read()
if (background_exception)
{
lock.unlock();
onCancelX();
onCancel();
std::rethrow_exception(background_exception);
}

View File

@ -137,7 +137,7 @@ private:
Chunk read() final;
void onCancelX() final
void onCancel() noexcept final
{
/*
* The format parsers themselves are not being cancelled here, so we'll
@ -292,7 +292,7 @@ private:
first_parser_finished.wait();
}
void finishAndWait()
void finishAndWait() noexcept
{
/// Defending concurrent segmentator thread join
std::lock_guard finish_and_wait_lock(finish_and_wait_mutex);

View File

@ -68,7 +68,7 @@ public:
private:
Chunk read() override;
void onCancelX() override
void onCancel() noexcept override
{
is_stopped = 1;
}

View File

@ -268,7 +268,7 @@ void ParquetBlockOutputFormat::resetFormatterImpl()
staging_bytes = 0;
}
void ParquetBlockOutputFormat::onCancelX()
void ParquetBlockOutputFormat::onCancel() noexcept
{
is_stopped = true;
}

View File

@ -112,7 +112,7 @@ private:
void consume(Chunk) override;
void finalizeImpl() override;
void resetFormatterImpl() override;
void onCancelX() override;
void onCancel() noexcept override;
void writeRowGroup(std::vector<Chunk> chunks);
void writeUsingArrow(std::vector<Chunk> chunks);

View File

@ -65,7 +65,7 @@ public:
private:
Chunk read() override;
void onCancelX() override
void onCancel() noexcept override
{
is_stopped = 1;
}

View File

@ -29,7 +29,7 @@ public:
void setRowsBeforeLimit(size_t rows_before_limit) override;
void onCancelX() override
void onCancel() noexcept override
{
queue.clearAndFinish();
}

View File

@ -9,14 +9,14 @@
namespace DB
{
void IProcessor::cancel()
void IProcessor::cancel() noexcept
{
bool already_cancelled = is_cancelled.exchange(true, std::memory_order_acq_rel);
if (already_cancelled)
return;
onCancelX();
onCancel();
}
String IProcessor::debug() const

View File

@ -238,7 +238,7 @@ public:
/// In case if query was cancelled executor will wait till all processors finish their jobs.
/// Generally, there is no reason to check this flag. However, it may be reasonable for long operations (e.g. i/o).
bool isCancelled() const { return is_cancelled.load(std::memory_order_acquire); }
void cancel();
void cancel() noexcept;
/// Additional method which is called in case if ports were updated while work() method.
/// May be used to stop execution in rare cases.
@ -363,7 +363,7 @@ public:
virtual void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr /* counter */) {}
protected:
virtual void onCancelX() {}
virtual void onCancel() noexcept {}
std::atomic<bool> is_cancelled{false};

View File

@ -4,6 +4,8 @@
#include <QueryPipeline/StreamLocalLimits.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Common/Exception.h>
#include <Common/Logger.h>
namespace DB
{
@ -182,9 +184,16 @@ std::optional<Chunk> RemoteSource::tryGenerate()
return chunk;
}
void RemoteSource::onCancelX()
void RemoteSource::onCancel() noexcept
{
query_executor->cancel();
try
{
query_executor->cancel();
}
catch (...)
{
tryLogCurrentException(getLogger("RemoteSource"), "Error occurs on cancelation.");
}
}
void RemoteSource::onUpdatePorts()

View File

@ -36,7 +36,7 @@ public:
protected:
std::optional<Chunk> tryGenerate() override;
void onCancelX() override;
void onCancel() noexcept override;
private:
bool was_query_sent = false;

View File

@ -375,7 +375,7 @@ public:
return prepareTwoLevel();
}
void onCancelX() override
void onCancel() noexcept override
{
shared_data->is_cancelled.store(true, std::memory_order_seq_cst);
}

View File

@ -557,7 +557,6 @@ void TCPHandler::runImpl()
if (getQueryCancellationStatus() == CancellationStatus::FULLY_CANCELLED)
{
LOG_INFO(log, "CancelCallback FULLY_CANCELLED");
return true;
}

View File

@ -596,7 +596,7 @@ void DistributedSink::onFinish()
}
}
void DistributedSink::onCancelX()
void DistributedSink::onCancel() noexcept
{
std::lock_guard lock(execution_mutex);
if (pool && !pool->finished())
@ -607,14 +607,26 @@ void DistributedSink::onCancelX()
}
catch (...)
{
tryLogCurrentException(storage.log);
tryLogCurrentException(storage.log, "Error occurs on cancelation.");
}
}
for (auto & shard_jobs : per_shard_jobs)
{
for (JobReplica & job : shard_jobs.replicas_jobs)
if (job.executor)
job.executor->cancel();
{
try
{
if (job.executor)
job.executor->cancel();
}
catch (...)
{
tryLogCurrentException(storage.log, "Error occurs on cancelation.");
}
}
}
}

View File

@ -53,7 +53,7 @@ public:
void onFinish() override;
private:
void onCancelX() override;
void onCancel() noexcept override;
IColumn::Selector createSelector(const Block & source_block) const;

View File

@ -54,7 +54,7 @@ public:
String getName() const override { return "LiveViewEventsSource"; }
void onCancelX() override
void onCancel() noexcept override
{
if (storage->shutdown_called)
return;

View File

@ -36,7 +36,7 @@ public:
String getName() const override { return "LiveViewSource"; }
void onCancelX() override
void onCancel() noexcept override
{
if (storage->shutdown_called)
return;

View File

@ -48,7 +48,7 @@ public:
ChunkAndProgress read();
void cancel() { is_cancelled = true; }
void cancel() noexcept { is_cancelled = true; }
const MergeTreeReaderSettings & getSettings() const { return reader_settings; }

View File

@ -28,9 +28,6 @@ struct MergeTreeSink::DelayedChunk
MergeTreeSink::~MergeTreeSink()
{
size_t addr = delayed_chunk ? size_t(delayed_chunk.get()) : 0;
LOG_INFO(storage.log, "~ReplicatedMergeTreeSinkImpl, delayed_chunk {}, called from {}", addr, StackTrace().toString());
if (!delayed_chunk)
return;
@ -40,8 +37,6 @@ MergeTreeSink::~MergeTreeSink()
}
delayed_chunk.reset();
LOG_INFO(storage.log, "~ReplicatedMergeTreeSinkImpl end");
}
MergeTreeSink::MergeTreeSink(

View File

@ -149,7 +149,7 @@ std::string MergeTreeSource::getName() const
return processor->getName();
}
void MergeTreeSource::onCancelX()
void MergeTreeSource::onCancel() noexcept
{
processor->cancel();
}

View File

@ -26,7 +26,7 @@ public:
protected:
std::optional<Chunk> tryGenerate() override;
void onCancelX() override;
void onCancel() noexcept override;
private:
MergeTreeSelectProcessorPtr processor;

View File

@ -155,9 +155,6 @@ ReplicatedMergeTreeSinkImpl<async_insert>::ReplicatedMergeTreeSinkImpl(
template<bool async_insert>
ReplicatedMergeTreeSinkImpl<async_insert>::~ReplicatedMergeTreeSinkImpl()
{
size_t addr = delayed_chunk ? size_t(delayed_chunk.get()) : 0;
LOG_INFO(log, "~ReplicatedMergeTreeSinkImpl, delayed_chunk {}, called from {}", addr, StackTrace().toString());
if (!delayed_chunk)
return;
@ -167,8 +164,6 @@ ReplicatedMergeTreeSinkImpl<async_insert>::~ReplicatedMergeTreeSinkImpl()
}
delayed_chunk.reset();
LOG_INFO(log, "~ReplicatedMergeTreeSinkImpl end");
}
template<bool async_insert>
@ -273,8 +268,6 @@ size_t ReplicatedMergeTreeSinkImpl<async_insert>::checkQuorumPrecondition(const
template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
{
LOG_INFO(log, "consume");
if (num_blocks_processed > 0)
storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event, context, false);
@ -448,9 +441,6 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
template<>
void ReplicatedMergeTreeSinkImpl<false>::finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper)
{
size_t addr = delayed_chunk ? size_t(delayed_chunk.get()) : 0;
LOG_INFO(log, "finishDelayedChunk {}", addr);
if (!delayed_chunk)
return;
@ -480,22 +470,16 @@ void ReplicatedMergeTreeSinkImpl<false>::finishDelayedChunk(const ZooKeeperWithF
{
auto counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(partition.part_counters.getPartiallyAtomicSnapshot());
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, partition.elapsed_ns, counters_snapshot), ExecutionStatus::fromCurrentException("", true));
size_t addr1 = delayed_chunk ? size_t(delayed_chunk.get()) : 0;
LOG_INFO(log, "finishDelayedChunk exception, delayed_chunk {}", addr1);
throw;
}
}
delayed_chunk.reset();
LOG_INFO(log, "finishDelayedChunk end, delayed_chunk {}", bool(delayed_chunk));
}
template<>
void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper)
{
if (!delayed_chunk)
return;

View File

@ -1,6 +1,7 @@
#include <Storages/MessageQueueSink.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
namespace DB
@ -79,4 +80,16 @@ void MessageQueueSink::consume(Chunk chunk)
}
void MessageQueueSink::onCancel() noexcept
{
try
{
onFinish();
}
catch (...)
{
tryLogCurrentException(getLogger("MessageQueueSink"), "Error occurs on cancelation.");
}
}
}

View File

@ -33,17 +33,13 @@ public:
const String & storage_name_,
const ContextPtr & context_);
~MessageQueueSink() override
{
onFinish();
}
String getName() const override { return storage_name + "Sink"; }
void consume(Chunk chunk) override;
void onStart() override;
void onFinish() override;
void onCancel() noexcept override;
void onException(std::exception_ptr /* exception */) override { onFinish(); }
protected:

View File

@ -2,8 +2,8 @@
#include <Formats/FormatFactory.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/isValidUTF8.h>
#include "base/defines.h"
#include <Storages/ObjectStorage/Utils.h>
#include <base/defines.h>
namespace DB
{