Rewrite PushingToViewsBlockOutputStream part 1.

This commit is contained in:
Nikolai Kochetov 2021-08-23 13:46:52 +03:00
parent 6b1030c9b8
commit d7e78f3ea9
5 changed files with 378 additions and 38 deletions

View File

@ -210,6 +210,7 @@ add_object_library(clickhouse_processors_formats Processors/Formats)
add_object_library(clickhouse_processors_formats_impl Processors/Formats/Impl)
add_object_library(clickhouse_processors_transforms Processors/Transforms)
add_object_library(clickhouse_processors_sources Processors/Sources)
+add_object_library(clickhouse_processors_sinks Processors/Sinks)
add_object_library(clickhouse_processors_merges Processors/Merges)
add_object_library(clickhouse_processors_merges_algorithms Processors/Merges/Algorithms)
add_object_library(clickhouse_processors_queryplan Processors/QueryPlan)

View File

@ -10,6 +10,9 @@
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Parsers/ASTInsertQuery.h>
+#include <Processors/Transforms/SquashingChunksTransform.h>
+#include <Processors/Transforms/ExpressionTransform.h>
+#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Storages/LiveView/StorageLiveView.h>
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Storages/StorageMaterializedView.h>
@ -30,6 +33,115 @@
namespace DB
{
+class ExceptionCollectingTransform : public IProcessor
+{
+public:
+    ExceptionCollectingTransform(const Block & header, size_t num_inputs)
+        : IProcessor(InputPorts(num_inputs, header), {header})
+        , output(outputs.front())
+    {
+        has_exception.assign(num_inputs, false);
+    }
+
+    Status prepare() override
+    {
+        if (output.isFinished())
+        {
+            for (auto & input : inputs)
+                input.close();
+
+            return Status::Finished;
+        }
+
+        if (!output.canPush())
+            return Status::PortFull;
+
+        size_t num_finished = 0;
+        size_t pos = 0;
+        for (auto & input : inputs)
+        {
+            auto i = pos;
+            ++pos;
+
+            if (input.isFinished())
+            {
+                ++num_finished;
+                continue;
+            }
+
+            input.setNeeded();
+            if (input.hasData())
+            {
+                auto data = input.pullData();
+                if (data.exception)
+                {
+                    if (i == 0 || !has_exception[i])
+                    {
+                        has_exception[i] = true;
+                        output.pushData(std::move(data));
+                        return Status::PortFull;
+                    }
+                }
+
+                if (input.isFinished())
+                    ++num_finished;
+            }
+        }
+
+        if (num_finished == inputs.size())
+        {
+            output.finish();
+            return Status::Finished;
+        }
+
+        return Status::NeedData;
+    }
+
+private:
+    OutputPort & output;
+    std::vector<bool> has_exception;
+};
+
+class ExceptionHandlingSink : public IProcessor
+{
+public:
+    explicit ExceptionHandlingSink(Block header)
+        : IProcessor({std::move(header)}, {})
+        , input(inputs.front())
+    {
+    }
+
+    Status prepare() override
+    {
+        while (!input.isFinished())
+        {
+            input.setNeeded();
+            if (!input.hasData())
+                return Status::NeedData;
+
+            auto data = input.pullData();
+            if (data.exception)
+                exceptions.emplace_back(std::move(data.exception));
+        }
+
+        if (!exceptions.empty())
+            return Status::Ready;
+
+        return Status::Finished;
+    }
+
+    void work() override
+    {
+        auto exception = std::move(exceptions.at(0));
+        exceptions.clear();
+        std::rethrow_exception(std::move(exception));
+    }
+
+private:
+    InputPort & input;
+    std::vector<std::exception_ptr> exceptions;
+};
Drain buildPushingToViewsDrainImpl(
    const StoragePtr & storage,
    const StorageMetadataPtr & metadata_snapshot,
@ -179,12 +291,11 @@ Drain buildPushingToViewsDrainImpl(
    /// Do not push to destination table if the flag is set
    if (!no_destination)
    {
-        auto sink = storage->write(query_ptr, storage->getInMemoryMetadataPtr(), getContext());
+        auto sink = storage->write(query_ptr, storage->getInMemoryMetadataPtr(), context);

-        metadata_snapshot->check(sink->getPort().getHeader().getColumnsWithTypeAndName());
+        metadata_snapshot->check(sink->getHeader().getColumnsWithTypeAndName());

-        replicated_output = dynamic_cast<ReplicatedMergeTreeSink *>(sink.get());
-        output = std::make_shared<PushingToSinkBlockOutputStream>(std::move(sink));
+        auto replicated_output = dynamic_cast<ReplicatedMergeTreeSink *>(sink.get());
    }
}
@ -403,9 +514,21 @@ void PushingToViewsBlockOutputStream::flush()
    view.out->flush();
}

-void PushingToViewsBlockOutputStream::process(const Block & block, ViewRuntimeData & view)
+static void process(Block & block, ViewRuntimeData & view)
{
-    BlockInputStreamPtr in;
+    const auto & storage = view.storage;
+    const auto & metadata_snapshot = view.metadata_snapshot;
+    const auto & context = view.context;
+
+    /// We create a table with the same name as the original table and the same alias columns,
+    /// but it will contain a single block (the one that is INSERT-ed into the main table).
+    /// InterpreterSelectQuery will do the processing of alias columns.
+    auto local_context = Context::createCopy(context);
+    local_context->addViewSource(StorageValues::create(
+        storage->getStorageID(),
+        metadata_snapshot->getColumns(),
+        block,
+        storage->getVirtuals()));

    /// We need to keep InterpreterSelectQuery until the processing is finished, since:
    ///
@ -418,45 +541,76 @@ void PushingToViewsBlockOutputStream::process(const Block & block, ViewRuntimeDa
    /// (the problem arises only when a function uses the context from the
    ///  execute*() method, like FunctionDictGet does)
    /// - These objects live inside the query pipeline (DataStreams) and the references become dangling.
-    std::optional<InterpreterSelectQuery> select;
+    InterpreterSelectQuery select(view.query, local_context, SelectQueryOptions());

-    if (view.runtime_stats.type == QueryViewsLogElement::ViewType::MATERIALIZED)
-    {
-        /// We create a table with the same name as the original table and the same alias columns,
-        /// but it will contain a single block (the one that is INSERT-ed into the main table).
-        /// InterpreterSelectQuery will do the processing of alias columns.
+    auto io = select.execute();
+    io.pipeline.resize(1);

-        auto local_context = Context::createCopy(select_context);
-        local_context->addViewSource(
-            StorageValues::create(storage->getStorageID(), metadata_snapshot->getColumns(), block, storage->getVirtuals()));
-        select.emplace(view.query, local_context, SelectQueryOptions());
-        in = std::make_shared<MaterializingBlockInputStream>(select->execute().getInputStream());
+    /// Squashing is needed here because the materialized view query can generate a lot of blocks
+    /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
+    /// and two-level aggregation is triggered).
+    io.pipeline.addTransform(std::make_shared<SquashingChunksTransform>(
+        io.pipeline.getHeader(),
+        context->getSettingsRef().min_insert_block_size_rows,
+        context->getSettingsRef().min_insert_block_size_bytes));

-        /// Squashing is needed here because the materialized view query can generate a lot of blocks
-        /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
-        /// and two-level aggregation is triggered).
-        in = std::make_shared<SquashingBlockInputStream>(
-            in, getContext()->getSettingsRef().min_insert_block_size_rows, getContext()->getSettingsRef().min_insert_block_size_bytes);
-        in = std::make_shared<ConvertingBlockInputStream>(in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name);
-    }
-    else
-        in = std::make_shared<OneBlockInputStream>(block);
+    auto converting = ActionsDAG::makeConvertingActions(
+        io.pipeline.getHeader().getColumnsWithTypeAndName(),
+        view.sample_block.getColumnsWithTypeAndName(),
+        ActionsDAG::MatchColumnsMode::Name);

-    in->setProgressCallback([this](const Progress & progress)
+    io.pipeline.addTransform(std::make_shared<ExpressionTransform>(
+        io.pipeline.getHeader(),
+        std::make_shared<ExpressionActions>(std::move(converting))));
+
+    io.pipeline.setProgressCallback([context](const Progress & progress)
    {
        CurrentThread::updateProgressIn(progress);
-        this->onProgress(progress);
+        if (auto callback = context->getProgressCallback())
+            callback(progress);
    });

-    in->readPrefix();
+    PullingPipelineExecutor executor(io.pipeline);
+    if (!executor.pull(block))
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Nothing is returned from view inner query {}", view.query);

-    while (Block result_block = in->read())
+    if (executor.pull(block))
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Single chunk is expected from view inner query {}", view.query);
+}
+
+void ExecutingInnerQueryFromViewTransform::transform(Chunk & chunk)
+{
+    Stopwatch watch;
+
+    auto * original_thread = current_thread;
+    SCOPE_EXIT({ current_thread = original_thread; });
+
+    if (view.runtime_stats.thread_status)
    {
-        Nested::validateArraySizes(result_block);
-        view.out->write(result_block);
+        /// Change the thread context to store individual metrics per view. Once the work is done, go back to the original thread.
+        view.runtime_stats.thread_status->resetPerformanceCountersLastUsage();
+        current_thread = view.runtime_stats.thread_status.get();
    }

-    in->readSuffix();
+    try
+    {
+        auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns());
+        process(block, view);
+        chunk.setColumns(block.getColumns(), block.rows());
+    }
+    catch (Exception & ex)
+    {
+        ex.addMessage("while pushing to view " + view.table_id.getNameForLogs());
+        view.setException(std::current_exception());
+    }
+    catch (...)
+    {
+        view.setException(std::current_exception());
+    }
+
+    if (view.runtime_stats.thread_status)
+        view.runtime_stats.thread_status->updatePerformanceCounters();
+    view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds();
+}
void PushingToViewsBlockOutputStream::checkExceptionsInViews()
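
Editor's note: the two helper processors added above (ExceptionCollectingTransform and ExceptionHandlingSink) are the new error-propagation machinery: exceptions travel through ports as ordinary Port::Data instead of unwinding the executor. A rough sketch of how they can be wired, assuming the standard IProcessor/Port API (connect, getInputs, getOutputs); the wireViewOutputs helper and its signature are hypothetical, not part of this commit:

/// Fan N per-view outputs into one collector, then terminate with the rethrowing sink.
static Processors wireViewOutputs(const std::vector<OutputPort *> & view_outputs, const Block & header)
{
    auto collector = std::make_shared<ExceptionCollectingTransform>(header, view_outputs.size());
    auto sink = std::make_shared<ExceptionHandlingSink>(header);

    size_t i = 0;
    for (auto & input : collector->getInputs())
        connect(*view_outputs[i++], input);    /// each view reports either data or an exception here

    /// The collector forwards at most one exception per input; the sink gathers
    /// whatever arrives and rethrows the first exception from work().
    connect(collector->getOutputs().front(), sink->getInputs().front());
    return {collector, sink};
}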

View File

@ -6,6 +6,7 @@
#include <Storages/IStorage.h>
#include <Common/Stopwatch.h>
#include <Processors/Drain.h>
+#include <Processors/ISimpleTransform.h>
namespace Poco
{
@ -20,8 +21,14 @@ class ReplicatedMergeTreeSink;
struct ViewRuntimeData
{
    const ASTPtr query;
+    Block sample_block;
    StorageID table_id;
    Drain out;
+    StoragePtr storage;
+    StorageMetadataPtr metadata_snapshot;
+    ContextPtr context;
    std::exception_ptr exception;
    QueryViewsLogElement::ViewRuntimeStats runtime_stats;
@ -71,5 +78,22 @@ private:
    void logQueryViews();
};

+class ExecutingInnerQueryFromViewTransform final : public ExceptionKeepingTransform
+{
+public:
+    ExecutingInnerQueryFromViewTransform(const Block & header, ViewRuntimeData view_data)
+        : ExceptionKeepingTransform(header, view_data.sample_block)
+        , view(std::move(view_data))
+    {
+    }
+
+    String getName() const override { return "ExecutingInnerQueryFromView"; }
+
+protected:
+    void transform(Chunk & chunk) override;
+
+private:
+    ViewRuntimeData view;
+};
}
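
Editor's note: ViewRuntimeData now carries everything the inner-query stage needs (the query, the expected output header, the target storage, its metadata snapshot, and the context), so each view gets its own self-contained transform. A hedged sketch of how such a transform plugs into a chain; storage_header, view_data, source_output and view_sink_input are assumed to exist and are not part of the commit:

/// view_data.sample_block is the header the inner query must produce after conversion.
auto executing_query = std::make_shared<ExecutingInnerQueryFromViewTransform>(
    storage_header,         /// header of the blocks INSERT-ed into the main table
    std::move(view_data));  /// per-view state from the struct above

connect(source_output, executing_query->getInputPort());     /// blocks from the INSERT
connect(executing_query->getOutputPort(), view_sink_input);  /// converted result into the view's sink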

View File

@ -0,0 +1,112 @@
+#include <Processors/Sinks/SinkToStorage.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
+ExceptionKeepingTransform::ExceptionKeepingTransform(const Block & in_header, const Block & out_header)
+    : IProcessor({in_header}, {out_header})
+    , input(inputs.front()), output(outputs.front())
+{
+}
+
+IProcessor::Status ExceptionKeepingTransform::prepare()
+{
+    if (!was_on_start_called)
+        return Status::Ready;
+
+    /// Check whether we can push to the output.
+    if (output.isFinished())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Output port is finished for {}", getName());
+
+    if (!output.canPush())
+    {
+        input.setNotNeeded();
+        return Status::PortFull;
+    }
+
+    /// Push data to the output if we have any.
+    if (ready_output)
+    {
+        output.pushData(std::move(data));
+        ready_output = false;
+        return Status::PortFull;
+    }
+
+    if (!ready_input)
+    {
+        if (input.isFinished())
+        {
+            if (!was_on_finish_called)
+                return Status::Ready;
+
+            output.finish();
+            return Status::Finished;
+        }
+
+        input.setNeeded();
+
+        if (!input.hasData())
+            return Status::NeedData;
+
+        data = input.pullData(true);
+
+        if (data.exception)
+        {
+            output.pushData(std::move(data));
+            return Status::PortFull;
+        }
+
+        ready_input = true;
+    }
+
+    return Status::Ready;
+}
+
+void ExceptionKeepingTransform::work()
+{
+    if (!was_on_start_called)
+    {
+        was_on_start_called = true;
+        onStart();
+    }
+
+    if (ready_input)
+    {
+        ready_input = false;
+        ready_output = true;
+
+        try
+        {
+            transform(data.chunk);
+        }
+        catch (...)
+        {
+            data.chunk.clear();
+            data.exception = std::current_exception();
+        }
+    }
+    else if (!was_on_finish_called)
+    {
+        was_on_finish_called = true;
+
+        try
+        {
+            onFinish();
+        }
+        catch (...)
+        {
+            ready_input = true;
+            data.exception = std::current_exception();
+        }
+    }
+}
+
+SinkToStorage::SinkToStorage(const Block & header) : ExceptionKeepingTransform(header, header) {}
+
+void SinkToStorage::transform(Chunk & chunk)
+{
+    consume(chunk.clone());
+}
+
+}
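
Editor's note: the prepare()/work() split above follows the usual IProcessor contract: prepare() only moves data between ports, while work() runs the user code (onStart, transform, onFinish) and converts any throw into data.exception, which the next prepare() call pushes downstream as ordinary port data. A minimal hypothetical subclass, not part of the commit, showing the effect:

class FailOnEmptyChunk final : public ExceptionKeepingTransform
{
public:
    explicit FailOnEmptyChunk(const Block & header) : ExceptionKeepingTransform(header, header) {}

    String getName() const override { return "FailOnEmptyChunk"; }

protected:
    void transform(Chunk & chunk) override
    {
        /// This throw is caught in work() and travels through the output port
        /// instead of killing the pipeline executor.
        if (chunk.getNumRows() == 0)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty chunk is not expected");
    }
};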

View File

@ -1,21 +1,70 @@
#pragma once
-#include <Processors/ISink.h>
+#include <Processors/IProcessor.h>
#include <Storages/TableLockHolder.h>
namespace DB
{
+/// Has one input and one output.
+/// Works similarly to ISimpleTransform, but with much more care about exceptions.
+///
+/// If the input contains an exception, this exception is pushed directly to the output port.
+/// If the input contains a data chunk, transform() is called for it.
+/// When transform() itself throws, the data chunk is replaced by the caught exception.
+/// The transformed chunk or the newly caught exception is pushed to the output.
+///
+/// Any number of exceptions may be read from the input; the transform keeps their order.
+/// It is expected that the output port won't be closed from the other side before all data is processed.
+///
+/// Method onStart() is called before reading any data.
+/// An exception from it is not pushed into the pipeline, but thrown immediately.
+///
+/// Method onFinish() is called after all data from the input is processed.
+/// In case of exception, it is additionally pushed into the pipeline.
+class ExceptionKeepingTransform : public IProcessor
+{
+private:
+    InputPort & input;
+    OutputPort & output;
+    Port::Data data;
+
+    bool ready_input = false;
+    bool ready_output = false;
+    bool was_on_start_called = false;
+    bool was_on_finish_called = false;
+
+protected:
+    virtual void transform(Chunk & chunk) = 0;
+    virtual void onStart() {}
+    virtual void onFinish() {}
+
+public:
+    ExceptionKeepingTransform(const Block & in_header, const Block & out_header);
+
+    Status prepare() override;
+    void work() override;
+
+    InputPort & getInputPort() { return input; }
+    OutputPort & getOutputPort() { return output; }
+};
/// Sink which is returned from Storage::write.
/// The same as ISink, but it can also hold a table lock.
-class SinkToStorage : public ISink
+class SinkToStorage : public ExceptionKeepingTransform
{
public:
-    using ISink::ISink;
+    explicit SinkToStorage(const Block & header);
+
+    const Block & getHeader() const { return inputs.front().getHeader(); }
    void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

protected:
    virtual void consume(Chunk chunk) = 0;

private:
    std::vector<TableLockHolder> table_locks;
+
+    void transform(Chunk & chunk) override;
};

using SinkToStoragePtr = std::shared_ptr<SinkToStorage>;
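
Editor's note: under the new base class a storage sink only overrides the consume(Chunk) hook; port handling and exception propagation come from ExceptionKeepingTransform, and SinkToStorage::transform feeds consume() a clone of every chunk. A minimal hypothetical sink, not part of the commit:

class CountingSink final : public SinkToStorage
{
public:
    explicit CountingSink(const Block & header) : SinkToStorage(header) {}

    String getName() const override { return "CountingSink"; }

    size_t getNumRows() const { return rows; }

protected:
    void consume(Chunk chunk) override { rows += chunk.getNumRows(); }

private:
    size_t rows = 0;
};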