Refactor some code.

This commit is contained in:
Nikolai Kochetov 2021-09-09 16:59:15 +03:00
parent 5c220d62b3
commit 47b96add9a
10 changed files with 257 additions and 241 deletions

View File

@ -27,7 +27,7 @@ CheckConstraintsTransform::CheckConstraintsTransform(
const Block & header,
const ConstraintsDescription & constraints_,
ContextPtr context_)
: ISimpleTransform(header, header, false)
: ExceptionKeepingTransform(header, header)
, table_id(table_id_)
, constraints(constraints_)
, expressions(constraints_.getExpressions(context_, header.getNamesAndTypesList()))

View File

@ -1,6 +1,6 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Processors/Transforms/ExceptionKeepingTransform.h>
#include <Storages/ConstraintsDescription.h>
#include <Interpreters/StorageID.h>
@ -12,7 +12,7 @@ namespace DB
* Otherwise just pass block to output unchanged.
*/
class CheckConstraintsTransform final : public ISimpleTransform
class CheckConstraintsTransform final : public ExceptionKeepingTransform
{
public:
CheckConstraintsTransform(

View File

@ -1,6 +1,6 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Processors/Transforms/ExceptionKeepingTransform.h>
#include <Interpreters/ProcessList.h>
@ -9,11 +9,11 @@ namespace DB
/// Proxy class which counts number of written block, rows, bytes
class CountingTransform final : public ISimpleTransform
class CountingTransform final : public ExceptionKeepingTransform
{
public:
explicit CountingTransform(const Block & header, ThreadStatus * thread_status_ = nullptr)
: ISimpleTransform(header, header, false), thread_status(thread_status_) {}
: ExceptionKeepingTransform(header, header), thread_status(thread_status_) {}
String getName() const override { return "CountingTransform"; }

View File

@ -1,175 +1,10 @@
#include <Processors/Sinks/SinkToStorage.h>
#include <Common/ThreadStatus.h>
#include <Common/Stopwatch.h>
#include <common/scope_guard.h>
#include <DataTypes/NestedUtils.h>
#include <iostream>
namespace DB
{
ExceptionKeepingTransformRuntimeData::ExceptionKeepingTransformRuntimeData(
ThreadStatus * thread_status_,
std::string additional_exception_message_)
: thread_status(thread_status_)
, additional_exception_message(std::move(additional_exception_message_))
{
}
/// Passes single-element header lists to IProcessor and binds `input` / `output`
/// to the first (front) input and output port of the processor.
ExceptionKeepingTransform::ExceptionKeepingTransform(const Block & in_header, const Block & out_header)
: IProcessor({in_header}, {out_header})
, input(inputs.front()), output(outputs.front())
{
}
/// Decides what the transform should do next.
///
/// Returns:
///  - Ready    - work() must run: onStart() is pending, a pulled chunk awaits transform(),
///               or input is exhausted and onFinish() is pending;
///  - PortFull - output cannot accept data right now, or data has just been pushed to it;
///  - NeedData - waiting for the next chunk (or exception) from input;
///  - Finished - input is exhausted and output has been finished.
///
/// Throws LOGICAL_ERROR if the output port is already finished.
IProcessor::Status ExceptionKeepingTransform::prepare()
{
    /// onStart() has to run (inside work()) before any data is touched.
    if (!was_on_start_called)
        return Status::Ready;

    /// Check can output.
    if (output.isFinished())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Output port is finished for {}", getName());

    if (!output.canPush())
    {
        input.setNotNeeded();
        return Status::PortFull;
    }

    /// Output if has data.
    if (ready_output)
    {
        output.pushData(std::move(data));
        ready_output = false;
        return Status::PortFull;
    }

    /// This is a loop, not a single check: when a chunk is dropped because of an earlier
    /// exception there is nothing for work() to do. Returning Ready in that state would make
    /// work() fall through to its onFinish() branch while input is still being read.
    while (!ready_input)
    {
        if (input.isFinished())
        {
            /// onFinish() is requested only if nothing has failed so far.
            if (!was_on_finish_called && !has_exception)
                return Status::Ready;

            output.finish();
            return Status::Finished;
        }

        input.setNeeded();

        if (!input.hasData())
            return Status::NeedData;

        data = input.pullData(true);

        if (data.exception)
        {
            /// Exceptions coming from input are forwarded unchanged.
            has_exception = true;
            output.pushData(std::move(data));
            return Status::PortFull;
        }

        if (has_exception)
            /// In case of exception, just drop all other data.
            /// If transform is stateful, its state may be broken after exception from transform()
            data.chunk.clear();
        else
            ready_input = true;
    }

    return Status::Ready;
}
static std::exception_ptr runStep(std::function<void()> step, ExceptionKeepingTransformRuntimeData * runtime_data)
{
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
if (runtime_data && runtime_data->thread_status)
{
/// Change thread context to store individual metrics. Once the work in done, go back to the original thread
runtime_data->thread_status->resetPerformanceCountersLastUsage();
current_thread = runtime_data->thread_status;
}
std::exception_ptr res;
Stopwatch watch;
try
{
step();
}
catch (Exception & exception)
{
// std::cerr << "===== got exception " << getExceptionMessage(exception, false);
if (runtime_data && !runtime_data->additional_exception_message.empty())
exception.addMessage(runtime_data->additional_exception_message);
res = std::current_exception();
}
catch (...)
{
// std::cerr << "===== got exception " << getExceptionMessage(std::current_exception(), false);
res = std::current_exception();
}
if (runtime_data)
{
if (runtime_data->thread_status)
runtime_data->thread_status->updatePerformanceCounters();
runtime_data->elapsed_ms += watch.elapsedMilliseconds();
}
return res;
}
/// Executes exactly one step, chosen from the current state:
///  1. onStart()   - on the first call;
///  2. transform() - when prepare() has pulled a chunk (ready_input);
///  3. onFinish()  - otherwise, once, and only if no exception has been seen.
///
/// An exception thrown by a step is stored into `data` and scheduled for output
/// instead of propagating to the caller.
void ExceptionKeepingTransform::work()
{
    if (!was_on_start_called)
    {
        was_on_start_called = true;

        if (auto exception = runStep([this] { onStart(); }, runtime_data.get()))
        {
            has_exception = true;
            ready_output = true;
            data.exception = std::move(exception);
        }
    }
    else if (ready_input)
    {
        ready_input = false;

        if (auto exception = runStep([this] { transform(data.chunk); }, runtime_data.get()))
        {
            /// The chunk may be partially transformed; replace it with the exception.
            has_exception = true;
            data.chunk.clear();
            data.exception = std::move(exception);
        }

        /// transform() may leave the chunk empty; then there is nothing to push.
        if (data.chunk || data.exception)
            ready_output = true;
    }
    else if (!was_on_finish_called && !has_exception)
    {
        /// The has_exception guard keeps onFinish() from running after a failure,
        /// even if work() is invoked with nothing else to do.
        was_on_finish_called = true;

        if (auto exception = runStep([this] { onFinish(); }, runtime_data.get()))
        {
            has_exception = true;
            ready_output = true;
            data.exception = std::move(exception);
        }
    }
}
SinkToStorage::SinkToStorage(const Block & header) : ExceptionKeepingTransform(header, header) {}
SinkToStorage::SinkToStorage(const Block & header) : ExceptionKeepingTransform(header, header, false) {}
void SinkToStorage::transform(Chunk & chunk)
{

View File

@ -1,74 +1,10 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Storages/TableLockHolder.h>
#include <Processors/Transforms/ExceptionKeepingTransform.h>
namespace DB
{
class ThreadStatus;
/// Optional runtime context of an ExceptionKeepingTransform.
/// It is consulted around every onStart() / transform() / onFinish() call.
struct ExceptionKeepingTransformRuntimeData
{
/// If not null, it is installed as current_thread while a step runs
/// and its performance counters are updated afterwards.
ThreadStatus * thread_status = nullptr;
/// Accumulated time of executed steps in milliseconds, as measured by Stopwatch.
UInt64 elapsed_ms = 0;
/// If not empty, it is appended to the message of an Exception thrown by a step.
std::string additional_exception_message;
/// Stores both arguments; elapsed_ms starts from zero.
ExceptionKeepingTransformRuntimeData(
ThreadStatus * thread_status_,
std::string additional_exception_message_);
};
using ExceptionKeepingTransformRuntimeDataPtr = std::shared_ptr<ExceptionKeepingTransformRuntimeData>;
/// Has one input and one output.
/// Works similarly to ISimpleTransform, but takes much more care about exceptions.
///
/// If input contains an exception, this exception is pushed directly to output port.
/// If input contains a data chunk, transform() is called for it.
/// When transform() throws an exception itself, the data chunk is replaced by the caught exception.
/// Transformed chunk or newly caught exception is pushed to output.
/// After the first exception (read from input or thrown by a step) all following data chunks are dropped.
///
/// There may be any number of exceptions read from input, transform keeps the order.
/// It is expected that output port won't be closed from the other side before all data is processed.
///
/// Method onStart() is called before reading any data.
/// Method onFinish() is called after all data from input is processed, if no exception happened.
/// If onStart() or onFinish() throws, the exception is pushed into the pipeline as well.
class ExceptionKeepingTransform : public IProcessor
{
protected:
/// The single input and output port (front of IProcessor's inputs / outputs).
InputPort & input;
OutputPort & output;
/// Chunk or exception currently held: pulled from input or produced by work().
Port::Data data;
/// A chunk was pulled by prepare() and waits for transform() in work().
bool ready_input = false;
/// `data` has to be pushed to output by the next prepare().
bool ready_output = false;
/// Set once any exception was read from input or thrown by a step.
bool has_exception = false;
/// Guards: onStart() and onFinish() are requested at most once each.
bool was_on_start_called = false;
bool was_on_finish_called = false;
/// Hooks for derived classes. All of them are invoked from work().
virtual void transform(Chunk & chunk) = 0;
virtual void onStart() {}
virtual void onFinish() {}
public:
ExceptionKeepingTransform(const Block & in_header, const Block & out_header);
Status prepare() override;
void work() override;
InputPort & getInputPort() { return input; }
OutputPort & getOutputPort() { return output; }
/// Attaches optional runtime context (thread status, timing, extra exception message).
void setRuntimeData(ExceptionKeepingTransformRuntimeDataPtr runtime_data_) { runtime_data = std::move(runtime_data_); }
private:
/// May be null; then steps are run without extra accounting.
ExceptionKeepingTransformRuntimeDataPtr runtime_data;
};
/// Sink which is returned from Storage::read.
class SinkToStorage : public ExceptionKeepingTransform
{

View File

@ -0,0 +1,172 @@
#include <Processors/Transforms/ExceptionKeepingTransform.h>
#include <Common/ThreadStatus.h>
#include <Common/Stopwatch.h>
#include <common/scope_guard.h>
#include <iostream>
namespace DB
{
ExceptionKeepingTransformRuntimeData::ExceptionKeepingTransformRuntimeData(
ThreadStatus * thread_status_,
std::string additional_exception_message_)
: thread_status(thread_status_)
, additional_exception_message(std::move(additional_exception_message_))
{
}
/// Passes single-element header lists to IProcessor and binds `input` / `output`
/// to the first (front) input and output port of the processor.
/// When ignore_on_start_and_finish_ is true, onStart() and onFinish() are never invoked.
ExceptionKeepingTransform::ExceptionKeepingTransform(const Block & in_header, const Block & out_header, bool ignore_on_start_and_finish_)
: IProcessor({in_header}, {out_header})
, input(inputs.front()), output(outputs.front())
, ignore_on_start_and_finish(ignore_on_start_and_finish_)
{
}
/// Decides what the transform should do next.
///
/// Returns:
///  - Ready    - work() must run: onStart() is pending, a pulled chunk awaits transform(),
///               or input is exhausted and onFinish() is pending;
///  - PortFull - output cannot accept data right now, or data has just been pushed to it;
///  - NeedData - waiting for the next chunk (or exception) from input;
///  - Finished - input is exhausted and output has been finished.
///
/// onStart() / onFinish() are never requested when ignore_on_start_and_finish is set.
/// Throws LOGICAL_ERROR if the output port is already finished.
IProcessor::Status ExceptionKeepingTransform::prepare()
{
    /// onStart() has to run (inside work()) before any data is touched.
    if (!ignore_on_start_and_finish && !was_on_start_called)
        return Status::Ready;

    /// Check can output.
    if (output.isFinished())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Output port is finished for {}", getName());

    if (!output.canPush())
    {
        input.setNotNeeded();
        return Status::PortFull;
    }

    /// Output if has data.
    if (ready_output)
    {
        output.pushData(std::move(data));
        ready_output = false;
        return Status::PortFull;
    }

    /// This is a loop, not a single check: when a chunk is dropped because of an earlier
    /// exception there is nothing for work() to do. Returning Ready in that state would make
    /// work() fall through to its onFinish() branch while input is still being read.
    while (!ready_input)
    {
        if (input.isFinished())
        {
            /// onFinish() is requested only if nothing has failed so far.
            if (!ignore_on_start_and_finish && !was_on_finish_called && !has_exception)
                return Status::Ready;

            output.finish();
            return Status::Finished;
        }

        input.setNeeded();

        if (!input.hasData())
            return Status::NeedData;

        data = input.pullData(true);

        if (data.exception)
        {
            /// Exceptions coming from input are forwarded unchanged.
            has_exception = true;
            output.pushData(std::move(data));
            return Status::PortFull;
        }

        if (has_exception)
            /// In case of exception, just drop all other data.
            /// If transform is stateful, its state may be broken after exception from transform()
            data.chunk.clear();
        else
            ready_input = true;
    }

    return Status::Ready;
}
static std::exception_ptr runStep(std::function<void()> step, ExceptionKeepingTransformRuntimeData * runtime_data)
{
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
if (runtime_data && runtime_data->thread_status)
{
/// Change thread context to store individual metrics. Once the work in done, go back to the original thread
runtime_data->thread_status->resetPerformanceCountersLastUsage();
current_thread = runtime_data->thread_status;
}
std::exception_ptr res;
Stopwatch watch;
try
{
step();
}
catch (Exception & exception)
{
// std::cerr << "===== got exception " << getExceptionMessage(exception, false);
if (runtime_data && !runtime_data->additional_exception_message.empty())
exception.addMessage(runtime_data->additional_exception_message);
res = std::current_exception();
}
catch (...)
{
// std::cerr << "===== got exception " << getExceptionMessage(std::current_exception(), false);
res = std::current_exception();
}
if (runtime_data)
{
if (runtime_data->thread_status)
runtime_data->thread_status->updatePerformanceCounters();
runtime_data->elapsed_ms += watch.elapsedMilliseconds();
}
return res;
}
/// Executes exactly one step, chosen from the current state:
///  1. onStart()   - on the first call, unless ignore_on_start_and_finish is set;
///  2. transform() - when prepare() has pulled a chunk (ready_input);
///  3. onFinish()  - otherwise, once, only if no exception has been seen
///                   and ignore_on_start_and_finish is not set.
///
/// An exception thrown by a step is stored into `data` and scheduled for output
/// instead of propagating to the caller.
void ExceptionKeepingTransform::work()
{
    if (!ignore_on_start_and_finish && !was_on_start_called)
    {
        was_on_start_called = true;

        if (auto exception = runStep([this] { onStart(); }, runtime_data.get()))
        {
            has_exception = true;
            ready_output = true;
            data.exception = std::move(exception);
        }
    }
    else if (ready_input)
    {
        ready_input = false;

        if (auto exception = runStep([this] { transform(data.chunk); }, runtime_data.get()))
        {
            /// The chunk may be partially transformed; replace it with the exception.
            has_exception = true;
            data.chunk.clear();
            data.exception = std::move(exception);
        }

        /// transform() may leave the chunk empty; then there is nothing to push.
        if (data.chunk || data.exception)
            ready_output = true;
    }
    else if (!ignore_on_start_and_finish && !was_on_finish_called && !has_exception)
    {
        /// The has_exception guard keeps onFinish() from running after a failure,
        /// even if work() is invoked with nothing else to do.
        was_on_finish_called = true;

        if (auto exception = runStep([this] { onFinish(); }, runtime_data.get()))
        {
            has_exception = true;
            ready_output = true;
            data.exception = std::move(exception);
        }
    }
}
}

View File

@ -0,0 +1,73 @@
#pragma once
#include <Processors/IProcessor.h>
namespace DB
{
class ThreadStatus;
/// Optional runtime context of an ExceptionKeepingTransform.
/// It is consulted around every onStart() / transform() / onFinish() call.
struct ExceptionKeepingTransformRuntimeData
{
/// If not null, it is installed as current_thread while a step runs
/// and its performance counters are updated afterwards.
ThreadStatus * thread_status = nullptr;
/// Accumulated time of executed steps in milliseconds, as measured by Stopwatch.
UInt64 elapsed_ms = 0;
/// If not empty, it is appended to the message of an Exception thrown by a step.
std::string additional_exception_message;
/// Stores both arguments; elapsed_ms starts from zero.
ExceptionKeepingTransformRuntimeData(
ThreadStatus * thread_status_,
std::string additional_exception_message_);
};
using ExceptionKeepingTransformRuntimeDataPtr = std::shared_ptr<ExceptionKeepingTransformRuntimeData>;
/// Has one input and one output.
/// Works similarly to ISimpleTransform, but takes much more care about exceptions.
///
/// If input contains an exception, this exception is pushed directly to output port.
/// If input contains a data chunk, transform() is called for it.
/// When transform() throws an exception itself, the data chunk is replaced by the caught exception.
/// Transformed chunk or newly caught exception is pushed to output.
/// After the first exception (read from input or thrown by a step) all following data chunks are dropped.
///
/// There may be any number of exceptions read from input, transform keeps the order.
/// It is expected that output port won't be closed from the other side before all data is processed.
///
/// Method onStart() is called before reading any data.
/// Method onFinish() is called after all data from input is processed, if no exception happened.
/// If onStart() or onFinish() throws, the exception is pushed into the pipeline as well.
/// Both methods are skipped entirely when ignore_on_start_and_finish is true (the constructor default).
class ExceptionKeepingTransform : public IProcessor
{
protected:
/// The single input and output port (front of IProcessor's inputs / outputs).
InputPort & input;
OutputPort & output;
/// Chunk or exception currently held: pulled from input or produced by work().
Port::Data data;
/// A chunk was pulled by prepare() and waits for transform() in work().
bool ready_input = false;
/// `data` has to be pushed to output by the next prepare().
bool ready_output = false;
/// Set once any exception was read from input or thrown by a step.
bool has_exception = false;
/// If true, onStart() and onFinish() are never invoked.
const bool ignore_on_start_and_finish = true;
/// Guards: onStart() and onFinish() are requested at most once each.
bool was_on_start_called = false;
bool was_on_finish_called = false;
/// Hooks for derived classes. All of them are invoked from work().
virtual void transform(Chunk & chunk) = 0;
virtual void onStart() {}
virtual void onFinish() {}
public:
ExceptionKeepingTransform(const Block & in_header, const Block & out_header, bool ignore_on_start_and_finish_ = true);
Status prepare() override;
void work() override;
InputPort & getInputPort() { return input; }
OutputPort & getOutputPort() { return output; }
/// Attaches optional runtime context (thread status, timing, extra exception message).
void setRuntimeData(ExceptionKeepingTransformRuntimeDataPtr runtime_data_) { runtime_data = std::move(runtime_data_); }
private:
/// May be null; then steps are run without extra accounting.
ExceptionKeepingTransformRuntimeDataPtr runtime_data;
};
}

View File

@ -10,7 +10,7 @@ Block ExpressionTransform::transformHeader(Block header, const ActionsDAG & expr
ExpressionTransform::ExpressionTransform(const Block & header_, ExpressionActionsPtr expression_)
: ISimpleTransform(header_, transformHeader(header_, expression_->getActionsDAG()), false)
: ExceptionKeepingTransform(header_, transformHeader(header_, expression_->getActionsDAG()))
, expression(std::move(expression_))
{
}

View File

@ -1,5 +1,5 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Processors/Transforms/ExceptionKeepingTransform.h>
namespace DB
{
@ -14,7 +14,7 @@ class ActionsDAG;
* For example: hits * 2 + 3, url LIKE '%yandex%'
* The expression processes each row independently of the others.
*/
class ExpressionTransform : public ISimpleTransform
class ExpressionTransform final : public ExceptionKeepingTransform
{
public:
ExpressionTransform(

View File

@ -6,7 +6,7 @@ namespace DB
SquashingChunksTransform::SquashingChunksTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, bool reserve_memory)
: ExceptionKeepingTransform(header, header)
: ExceptionKeepingTransform(header, header, false)
, squashing(min_block_size_rows, min_block_size_bytes, reserve_memory)
{
}