Rewrite PushingToViewsBlockOutputStream part 3.

This commit is contained in:
Nikolai Kochetov 2021-08-31 16:50:07 +03:00
parent eccee47e72
commit 5be05c2ef9
5 changed files with 121 additions and 42 deletions

View File

@ -35,13 +35,18 @@ namespace DB
struct ViewsData
{
std::vector<ViewRuntimeData> views;
std::list<ViewRuntimeData> views;
ContextPtr context;
/// In case of exception happened while inserting into main table, it is pushed to pipeline.
/// Remember the first one, we should keep them after view processing.
std::atomic_bool has_exception = false;
std::exception_ptr first_exception;
ViewsData(std::list<ViewRuntimeData> views_, ContextPtr context_)
: views(std::move(views_)), context(std::move(context_))
{
}
};
using ViewsDataPtr = std::shared_ptr<ViewsData>;
@ -113,7 +118,7 @@ private:
ViewsDataPtr views_data;
};
static void logQueryViews(std::vector<ViewRuntimeData> & views, ContextPtr context);
static void logQueryViews(std::list<ViewRuntimeData> & views, ContextPtr context);
class FinalizingViewsTransform final : public IProcessor
{
@ -192,11 +197,12 @@ public:
void work() override
{
size_t num_views = statuses.size();
for (size_t i = 0; i < num_views; ++i)
size_t i = 0;
for (auto & view : views_data->views)
{
auto & view = views_data->views[i];
auto & status = statuses[i];
++i;
if (status.exception)
{
if (!any_exception)
@ -268,6 +274,11 @@ Drain buildPushingToViewsDrainImpl(
{
checkStackSize();
/// If we don't write directly to the destination
/// then expect that we're inserting with precalculated virtual columns
auto storage_header = no_destination ? metadata_snapshot->getSampleBlockWithVirtuals(storage->getVirtuals())
: metadata_snapshot->getSampleBlock();
/** TODO This is a very important line. At any insertion into the table one of streams should own lock.
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
* but it's clear that here is not the best place for this functionality.
@ -304,7 +315,8 @@ Drain buildPushingToViewsDrainImpl(
insert_context->setSetting("min_insert_block_size_bytes", insert_settings.min_insert_block_size_bytes_for_materialized_views.value);
}
std::vector<ViewRuntimeData> views;
std::list<ViewRuntimeData> views;
std::vector<Drain> drains;
for (const auto & database_table : dependencies)
{
@ -349,7 +361,7 @@ Drain buildPushingToViewsDrainImpl(
ASTPtr insert_query_ptr(insert.release());
InterpreterInsertQuery interpreter(insert_query_ptr, insert_context);
BlockIO io = interpreter.execute();
out = io.out;
out = std::move(io.out);
}
else if (const auto * live_view = dynamic_cast<const StorageLiveView *>(dependent_table.get()))
{
@ -393,8 +405,21 @@ Drain buildPushingToViewsDrainImpl(
0,
std::chrono::system_clock::now(),
QueryViewsLogElement::ViewStatus::EXCEPTION_BEFORE_START};
views.emplace_back(ViewRuntimeData{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)});
views.emplace_back(ViewRuntimeData{
std::move(query),
out.getHeader(),
database_table,
dependent_table,
dependent_metadata_snapshot,
select_context,
nullptr,
std::move(runtime_stats)});
auto executing_inner_query = std::make_shared<ExecutingInnerQueryFromViewTransform>(storage_header, views.back());
out.addTransform(std::move(executing_inner_query));
drains.emplace_back(std::move(out));
/// Add the view to the query access info so it can appear in system.query_log
if (!no_destination)
@ -404,14 +429,24 @@ Drain buildPushingToViewsDrainImpl(
}
}
size_t num_views = views.size();
if (num_views != 0)
{
auto views_data = std::make_shared<ViewsData>(std::move(views), context);
auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, std::move(views_data));
auto it = copying_data->getOutputs().begin();
for (auto & drain : drains)
connect(*it, drain.getPort());
}
/// Do not push to destination table if the flag is set
if (!no_destination)
{
auto sink = storage->write(query_ptr, storage->getInMemoryMetadataPtr(), context);
metadata_snapshot->check(sink->getHeader().getColumnsWithTypeAndName());
auto replicated_output = dynamic_cast<ReplicatedMergeTreeSink *>(sink.get());
}
}
@ -632,8 +667,6 @@ void PushingToViewsBlockOutputStream::flush()
static void process(Block & block, ViewRuntimeData & view)
{
const auto & storage = view.storage;
const auto & metadata_snapshot = view.metadata_snapshot;
const auto & context = view.context;
/// We create a table with the same name as original table and the same alias columns,
@ -641,10 +674,10 @@ static void process(Block & block, ViewRuntimeData & view)
/// InterpreterSelectQuery will do processing of alias columns.
auto local_context = Context::createCopy(context);
local_context->addViewSource(StorageValues::create(
storage->getStorageID(),
metadata_snapshot->getColumns(),
view.table_id,
view.metadata_snapshot->getColumns(),
block,
storage->getVirtuals()));
view.storage->getVirtuals()));
/// We need keep InterpreterSelectQuery, until the processing will be finished, since:
///
@ -716,7 +749,7 @@ void PushingToViewsBlockOutputStream::checkExceptionsInViews()
}
}
static void logQueryViews(std::vector<ViewRuntimeData> & views, ContextPtr context)
static void logQueryViews(std::list<ViewRuntimeData> & views, ContextPtr context)
{
const auto & settings = context->getSettingsRef();
const UInt64 min_query_duration = settings.log_queries_min_query_duration_ms.totalMilliseconds();

View File

@ -81,7 +81,7 @@ private:
class ExecutingInnerQueryFromViewTransform final : public ExceptionKeepingTransform
{
public:
ExecutingInnerQueryFromViewTransform(const Block & header, ViewRuntimeData view_data)
ExecutingInnerQueryFromViewTransform(const Block & header, ViewRuntimeData & view_data)
: ExceptionKeepingTransform(header, view_data.sample_block)
, view(std::move(view_data))
{
@ -94,7 +94,7 @@ protected:
void onFinish() override;
private:
ViewRuntimeData view;
ViewRuntimeData & view;
};
}

View File

@ -1,6 +1,7 @@
#include <Processors/Sinks/SinkToStorage.h>
#include <Common/ThreadStatus.h>
#include <Common/Scope
#include <Common/Stopwatch.h>
#include <common/scope_guard.h>
namespace DB
{
@ -40,7 +41,7 @@ IProcessor::Status ExceptionKeepingTransform::prepare()
{
if (input.isFinished())
{
if (!was_on_finish_called)
if (!was_on_finish_called && !has_exception)
return Status::Ready;
output.finish();
@ -56,6 +57,7 @@ IProcessor::Status ExceptionKeepingTransform::prepare()
if (data.exception)
{
has_exception = true;
output.pushData(std::move(data));
return Status::PortFull;
}
@ -66,10 +68,46 @@ IProcessor::Status ExceptionKeepingTransform::prepare()
return Status::Ready;
}
static std::exception_ptr runStep(std::function<void()> func, ExceptionKeepingTransform::RuntimeData * runtime_data)
static std::exception_ptr runStep(std::function<void()> step, ExceptionKeepingTransform::RuntimeData * runtime_data)
{
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
if (runtime_data && runtime_data->thread_status)
{
/// Change thread context to store individual metrics. Once the work in done, go back to the original thread
runtime_data->thread_status->resetPerformanceCountersLastUsage();
current_thread = runtime_data->thread_status.get();
}
std::exception_ptr res;
Stopwatch watch;
try
{
step();
}
catch (Exception & exception)
{
if (runtime_data && !runtime_data->additional_exception_message.empty())
exception.addMessage(runtime_data->additional_exception_message);
res = std::current_exception();
}
catch (...)
{
res = std::current_exception();
}
if (runtime_data)
{
if (runtime_data->thread_status)
runtime_data->thread_status->updatePerformanceCounters();
runtime_data->elapsed_ms += watch.elapsedMilliseconds();
}
return res;
}
void ExceptionKeepingTransform::work()
@ -77,35 +115,37 @@ void ExceptionKeepingTransform::work()
if (!was_on_start_called)
{
was_on_start_called = true;
onStart();
}
if (ready_input)
if (auto exception = runStep([this] { onStart(); }, runtime_data.get()))
{
has_exception = true;
ready_output = true;
data.exception = std::move(exception);
}
}
else if (ready_input)
{
ready_input = false;
ready_output = true;
try
{
transform(data.chunk);
}
catch (...)
if (auto exception = runStep([this] { transform(data.chunk); }, runtime_data.get()))
{
has_exception = true;
data.chunk.clear();
data.exception = std::current_exception();
data.exception = std::move(exception);
}
if (data.chunk || data.exception)
ready_output = true;
}
else if (!was_on_finish_called)
{
was_on_finish_called = true;
try
if (auto exception = runStep([this] { onFinish(); }, runtime_data.get()))
{
onFinish();
}
catch (...)
{
ready_input = true;
data.exception = std::current_exception();
has_exception = true;
ready_output = true;
data.exception = std::move(exception);
}
}
}
@ -115,6 +155,8 @@ SinkToStorage::SinkToStorage(const Block & header) : ExceptionKeepingTransform(h
void SinkToStorage::transform(Chunk & chunk)
{
consume(chunk.clone());
if (lastBlockIsDuplicate())
chunk.clear();
}
}

View File

@ -19,9 +19,7 @@ class ThreadStatus;
/// It is expected that output port won't be closed from the other side before all data is processed.
///
/// Method onStart() is called before reading any data.
/// Exception from it is not pushed into pipeline, but thrown immediately.
///
/// Method onFinish() is called after all data from input is processed.
/// Method onFinish() is called after all data from input is processed, if no exception happened.
/// In case of exception, it is additionally pushed into pipeline.
class ExceptionKeepingTransform : public IProcessor
{
@ -32,6 +30,7 @@ private:
bool ready_input = false;
bool ready_output = false;
bool has_exception = false;
bool was_on_start_called = false;
bool was_on_finish_called = false;
@ -76,6 +75,7 @@ public:
protected:
virtual void consume(Chunk chunk) = 0;
virtual bool lastBlockIsDuplicate() const { return false; }
private:
std::vector<TableLockHolder> table_locks;

View File

@ -44,8 +44,12 @@ public:
void writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
/// For proper deduplication in MaterializedViews
bool lastBlockIsDuplicate() const
bool lastBlockIsDuplicate() const override
{
/// If MV is responsible for deduplication, block is not considered duplicating.
if (context->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views)
return false;
return last_block_is_duplicate;
}