Rewrite PushingToViewsBlockOutputStream part 4.

This commit is contained in:
Nikolai Kochetov 2021-09-01 21:41:50 +03:00
parent 5be05c2ef9
commit 79ecb6667e
21 changed files with 446 additions and 587 deletions

View File

@ -7,14 +7,14 @@ namespace DB
{
AddingDefaultBlockOutputStream::AddingDefaultBlockOutputStream(
const BlockOutputStreamPtr & output_,
const Block & header_,
const Block & in_header,
const Block & out_header,
const ColumnsDescription & columns_,
ContextPtr context_,
bool null_as_default_)
: output(output_), header(header_)
{
auto dag = addMissingDefaults(header_, output->getHeader().getNamesAndTypesList(), columns_, context_, null_as_default_);
auto dag = addMissingDefaults(in_header, output_header.getNamesAndTypesList(), columns_, context_, null_as_default_);
adding_defaults_actions = std::make_shared<ExpressionActions>(std::move(dag), ExpressionActionsSettings::fromContext(context_, CompileExpressions::yes));
}
@ -25,19 +25,6 @@ void AddingDefaultBlockOutputStream::write(const Block & block)
output->write(copy);
}
/// Forwards the flush request to the wrapped output stream unchanged.
void AddingDefaultBlockOutputStream::flush()
{
output->flush();
}
/// Forwards writePrefix() to the wrapped output stream unchanged.
void AddingDefaultBlockOutputStream::writePrefix()
{
output->writePrefix();
}
/// Forwards writeSuffix() to the wrapped output stream unchanged.
void AddingDefaultBlockOutputStream::writeSuffix()
{
output->writeSuffix();
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/ISimpleTransform.h>
#include <Columns/ColumnConst.h>
#include <Storages/ColumnsDescription.h>
@ -20,18 +20,17 @@ class Context;
* Also the stream can substitute NULL with the DEFAULT value in case of an INSERT SELECT query (null_as_default) if the corresponding setting is 1.
* All three types of columns are materialized (not constants).
*/
class AddingDefaultBlockOutputStream : public IBlockOutputStream
class AddingMissingDefaultsTransform : public ISimpleTransform
{
public:
AddingDefaultBlockOutputStream(
const BlockOutputStreamPtr & output_,
const Block & header_,
AddingMissingDefaultsTransform(
const Block & in_header,
const Block & out_header,
const ColumnsDescription & columns_,
ContextPtr context_,
bool null_as_default_ = false);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void transform(Chunk & chunk) override;
void flush() override;
@ -39,8 +38,6 @@ public:
void writeSuffix() override;
private:
BlockOutputStreamPtr output;
const Block header;
ExpressionActionsPtr adding_defaults_actions;
};

View File

@ -5,6 +5,7 @@
#include <functional>
#include <Processors/QueryPipeline.h>
#include <Processors/Chain.h>
namespace DB
@ -25,7 +26,7 @@ struct BlockIO
std::shared_ptr<ProcessListEntry> process_list_entry;
BlockOutputStreamPtr out;
Chain out;
BlockInputStreamPtr in;
QueryPipeline pipeline;

View File

@ -22,26 +22,24 @@ namespace ErrorCodes
}
CheckConstraintsBlockOutputStream::CheckConstraintsBlockOutputStream(
CheckConstraintsTransform::CheckConstraintsTransform(
const StorageID & table_id_,
const BlockOutputStreamPtr & output_,
const Block & header_,
const Block & header,
const ConstraintsDescription & constraints_,
ContextPtr context_)
: table_id(table_id_),
output(output_),
header(header_),
constraints(constraints_),
expressions(constraints_.getExpressions(context_, header.getNamesAndTypesList()))
: ISimpleTransform(header, header, false)
, table_id(table_id_)
, constraints(constraints_)
, expressions(constraints_.getExpressions(context_, header.getNamesAndTypesList()))
{
}
void CheckConstraintsBlockOutputStream::write(const Block & block)
void CheckConstraintsTransform::transform(Chunk & chunk)
{
if (block.rows() > 0)
if (chunk.getNumRows() > 0)
{
Block block_to_calculate = block;
Block block_to_calculate = getInputPort().getHeader().cloneWithColumns(chunk.getColumns());
for (size_t i = 0; i < expressions.size(); ++i)
{
auto constraint_expr = expressions[i];
@ -101,7 +99,7 @@ void CheckConstraintsBlockOutputStream::write(const Block & block)
column_values_msg.reserve(approx_bytes_for_col * related_columns.size());
for (const auto & name : related_columns)
{
const IColumn & column = *block.getByName(name).column;
const IColumn & column = *chunk.getColumns()[getInputPort().getHeader().getPositionByName(name)];
assert(row_idx < column.size());
if (!first)
@ -124,23 +122,7 @@ void CheckConstraintsBlockOutputStream::write(const Block & block)
}
}
output->write(block);
rows_written += block.rows();
}
void CheckConstraintsBlockOutputStream::flush()
{
output->flush();
}
void CheckConstraintsBlockOutputStream::writePrefix()
{
output->writePrefix();
}
void CheckConstraintsBlockOutputStream::writeSuffix()
{
output->writeSuffix();
rows_written += chunk.getNumRows();
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/ISimpleTransform.h>
#include <Storages/ConstraintsDescription.h>
#include <Interpreters/StorageID.h>
@ -12,28 +12,21 @@ namespace DB
* Otherwise just pass block to output unchanged.
*/
class CheckConstraintsBlockOutputStream : public IBlockOutputStream
class CheckConstraintsTransform final : public ISimpleTransform
{
public:
CheckConstraintsBlockOutputStream(
CheckConstraintsTransform(
const StorageID & table_,
const BlockOutputStreamPtr & output_,
const Block & header_,
const Block & header,
const ConstraintsDescription & constraints_,
ContextPtr context_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
String getName() const override { return "CheckConstraintsTransform"; }
void flush() override;
void writePrefix() override;
void writeSuffix() override;
void transform(Chunk & chunk) override;
private:
StorageID table_id;
BlockOutputStreamPtr output;
Block header;
const ConstraintsDescription constraints;
const ConstraintsExpressions expressions;
size_t rows_written = 0;

View File

@ -12,11 +12,9 @@ namespace ProfileEvents
namespace DB
{
void CountingBlockOutputStream::write(const Block & block)
void CountingTransform::transform(Chunk & chunk)
{
stream->write(block);
Progress local_progress(block.rows(), block.bytes(), 0);
Progress local_progress(chunk.getNumRows(), chunk.bytes(), 0);
progress.incrementPiecewiseAtomically(local_progress);
ProfileEvents::increment(ProfileEvents::InsertedRows, local_progress.read_rows);

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/ISimpleTransform.h>
#include <Interpreters/ProcessList.h>
@ -9,11 +9,12 @@ namespace DB
/// Proxy class which counts the number of written blocks, rows and bytes.
class CountingBlockOutputStream : public IBlockOutputStream
class CountingTransform final : public ISimpleTransform
{
public:
CountingBlockOutputStream(const BlockOutputStreamPtr & stream_)
: stream(stream_) {}
explicit CountingTransform(const Block & header) : ISimpleTransform(header, header, false) {}
String getName() const override { return "CountingTransform"; }
void setProgressCallback(const ProgressCallback & callback)
{
@ -30,17 +31,9 @@ public:
return progress;
}
Block getHeader() const override { return stream->getHeader(); }
void write(const Block & block) override;
void writePrefix() override { stream->writePrefix(); }
void writeSuffix() override { stream->writeSuffix(); }
void flush() override { stream->flush(); }
void onProgress(const Progress & current_progress) override { stream->onProgress(current_progress); }
String getContentType() const override { return stream->getContentType(); }
void transform(Chunk & chunk) override;
protected:
BlockOutputStreamPtr stream;
Progress progress;
ProgressCallback progress_callback;
QueryStatus * process_elem = nullptr;

View File

@ -1,10 +1,4 @@
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
@ -19,12 +13,9 @@
#include <Storages/StorageValues.h>
#include <Common/CurrentThread.h>
#include <Common/MemoryTracker.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/ThreadStatus.h>
#include <Common/checkStackSize.h>
#include <Common/setThreadName.h>
#include <common/logger_useful.h>
#include <common/scope_guard.h>
#include <atomic>
@ -128,9 +119,18 @@ class FinalizingViewsTransform final : public IProcessor
bool is_first = false;
};
/// Builds the list of input ports, one per header, in the order the headers are given.
/// The headers are taken by value so each one can be moved into its port.
static InputPorts initPorts(std::vector<Block> headers)
{
    InputPorts ports;
    for (auto it = headers.begin(); it != headers.end(); ++it)
        ports.emplace_back(std::move(*it));
    return ports;
}
public:
FinalizingViewsTransform(const Block & header, ViewsDataPtr data)
: IProcessor(InputPorts(data->views.size(), header), {header})
FinalizingViewsTransform(std::vector<Block> headers, ViewsDataPtr data)
: IProcessor(initPorts(std::move(headers)), {Block()})
, output(outputs.front())
, views_data(std::move(data))
{
@ -224,53 +224,54 @@ private:
std::exception_ptr any_exception;
};
/// Processor with a single input port and no output ports.
/// It drains its input, remembers every exception that arrives together with the data,
/// and rethrows the first remembered exception from work().
class ExceptionHandlingSink : public IProcessor
{
public:
/// Creates the sink; its only input port is built from `header`.
explicit ExceptionHandlingSink(Block header)
: IProcessor({std::move(header)}, {})
, input(inputs.front())
{
}
//class ExceptionHandlingSink : public IProcessor
//{
//public:
// explicit ExceptionHandlingSink(Block header)
// : IProcessor({std::move(header)}, {})
// , input(inputs.front())
// {
// }
//
// Status prepare() override
// {
// while (!input.isFinished())
// {
// input.setNeeded();
// if (!input.hasData())
// return Status::NeedData;
//
// auto data = input.pullData();
// if (data.exception)
// exceptions.emplace_back(std::move(data.exception));
// }
//
// if (!exceptions.empty())
// return Status::Ready;
//
// return Status::Finished;
// }
//
// void work() override
// {
// auto exception = std::move(exceptions.at(0));
// exceptions.clear();
// std::rethrow_exception(std::move(exception));
// }
//
//private:
// InputPort & input;
// std::vector<std::exception_ptr> exceptions;
//};
/// Pulls everything currently available from the input and collects the exceptions it carries.
/// Returns NeedData while the input is not finished and has nothing to pull,
/// Ready once the input is finished and at least one exception was collected,
/// and Finished when the input is finished without any exception.
Status prepare() override
{
while (!input.isFinished())
{
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
auto data = input.pullData();
/// Only the exception part of the pulled data is kept; the payload itself is dropped here.
if (data.exception)
exceptions.emplace_back(std::move(data.exception));
}
if (!exceptions.empty())
return Status::Ready;
return Status::Finished;
}
/// Rethrows the first collected exception. The list is cleared before throwing,
/// so any further collected exceptions are discarded.
void work() override
{
auto exception = std::move(exceptions.at(0));
exceptions.clear();
std::rethrow_exception(std::move(exception));
}
private:
/// The single input port (reference to the first element of `inputs`).
InputPort & input;
/// Exceptions received from the input, in arrival order.
std::vector<std::exception_ptr> exceptions;
};
Drain buildPushingToViewsDrainImpl(
Chain buildPushingToViewsDrain(
const StoragePtr & storage,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const ASTPtr & query_ptr,
bool no_destination,
std::vector<TableLockHolder> & locks)
std::vector<TableLockHolder> & locks,
ExceptionKeepingTransformRuntimeDataPtr runtime_data)
{
checkStackSize();
@ -316,7 +317,7 @@ Drain buildPushingToViewsDrainImpl(
}
std::list<ViewRuntimeData> views;
std::vector<Drain> drains;
std::vector<Chain> chains;
for (const auto & database_table : dependencies)
{
@ -324,10 +325,38 @@ Drain buildPushingToViewsDrainImpl(
auto dependent_metadata_snapshot = dependent_table->getInMemoryMetadataPtr();
ASTPtr query;
Drain out;
Chain out;
QueryViewsLogElement::ViewType type = QueryViewsLogElement::ViewType::DEFAULT;
String target_name = database_table.getFullTableName();
/// If the materialized view is executed outside of a query, for example as a result of SYSTEM FLUSH LOGS or
/// SYSTEM FLUSH DISTRIBUTED ..., we can't attach to any thread group and we won't log, so there is no point on collecting metrics
std::unique_ptr<ThreadStatus> thread_status = nullptr;
ThreadGroupStatusPtr running_group = current_thread && current_thread->getThreadGroup()
? current_thread->getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
if (running_group)
{
/// We are creating a ThreadStatus per view to store its metrics individually
/// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls
/// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work,
/// and switch back to the original thread_status.
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
thread_status = std::make_unique<ThreadStatus>();
/// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one
/// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means
/// N times more interruptions
thread_status->disableProfiling();
thread_status->attachQuery(running_group);
}
auto view_runtime_data = std::make_shared<ExceptionKeepingTransformRuntimeData>(
std::move(thread_status),
database_table.getNameForLogs());
if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get()))
{
type = QueryViewsLogElement::ViewType::MATERIALIZED;
@ -359,7 +388,7 @@ Drain buildPushingToViewsDrainImpl(
insert->columns = std::move(list);
ASTPtr insert_query_ptr(insert.release());
InterpreterInsertQuery interpreter(insert_query_ptr, insert_context);
InterpreterInsertQuery interpreter(insert_query_ptr, insert_context, false, false, false, runtime_data);
BlockIO io = interpreter.execute();
out = std::move(io.out);
}
@ -367,36 +396,12 @@ Drain buildPushingToViewsDrainImpl(
{
type = QueryViewsLogElement::ViewType::LIVE;
query = live_view->getInnerQuery(); // Used only to log in system.query_views_log
out = buildPushingToViewsDrainImpl(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, locks);
out = buildPushingToViewsDrain(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, locks, view_runtime_data);
}
else
out = buildPushingToViewsDrainImpl(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), false, locks);
/// If the materialized view is executed outside of a query, for example as a result of SYSTEM FLUSH LOGS or
/// SYSTEM FLUSH DISTRIBUTED ..., we can't attach to any thread group and we won't log, so there is no point on collecting metrics
std::unique_ptr<ThreadStatus> thread_status = nullptr;
ThreadGroupStatusPtr running_group = current_thread && current_thread->getThreadGroup()
? current_thread->getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
if (running_group)
{
/// We are creating a ThreadStatus per view to store its metrics individually
/// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls
/// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work,
/// and switch back to the original thread_status.
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
thread_status = std::make_unique<ThreadStatus>();
/// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one
/// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means
/// N times more interruptions
thread_status->disableProfiling();
thread_status->attachQuery(running_group);
}
out = buildPushingToViewsDrain(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), false, locks, view_runtime_data);
QueryViewsLogElement::ViewRuntimeStats runtime_stats{
target_name,
@ -408,7 +413,7 @@ Drain buildPushingToViewsDrainImpl(
views.emplace_back(ViewRuntimeData{
std::move(query),
out.getHeader(),
out.getInputHeader(),
database_table,
dependent_table,
dependent_metadata_snapshot,
@ -417,9 +422,10 @@ Drain buildPushingToViewsDrainImpl(
std::move(runtime_stats)});
auto executing_inner_query = std::make_shared<ExecutingInnerQueryFromViewTransform>(storage_header, views.back());
out.addTransform(std::move(executing_inner_query));
executing_inner_query->setRuntimeData(view_runtime_data);
drains.emplace_back(std::move(out));
out.addSource(std::move(executing_inner_query));
chains.emplace_back(std::move(out));
/// Add the view to the query access info so it can appear in system.query_log
if (!no_destination)
@ -429,240 +435,46 @@ Drain buildPushingToViewsDrainImpl(
}
}
Chain result_chain;
size_t num_views = views.size();
if (num_views != 0)
{
std::vector<Block> headers;
headers.reserve(num_views);
for (const auto & chain : chains)
headers.push_back(chain.getOutputHeader());
auto views_data = std::make_shared<ViewsData>(std::move(views), context);
auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, std::move(views_data));
auto it = copying_data->getOutputs().begin();
for (auto & drain : drains)
connect(*it, drain.getPort());
auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, views_data);
auto finalizing_views = std::make_shared<FinalizingViewsTransform>(std::move(headers), views_data);
auto out = copying_data->getOutputs().begin();
auto in = finalizing_views->getInputs().begin();
std::list<ProcessorPtr> processors;
/// Wire every view chain between its own pair of ports:
///   copying_data output #i -> chain #i -> finalizing_views input #i.
/// finalizing_views was built with one input per chain header (see `headers` above);
/// copying_data presumably exposes one output per view as well — NOTE(review): confirm.
for (auto & chain : chains)
{
    connect(*out, chain.getInputPort());
    connect(chain.getOutputPort(), *in);
    /// Advance both iterators, otherwise every chain would be attached to the first ports only.
    ++out;
    ++in;
    processors.splice(processors.end(), Chain::getProcessors(std::move(chain)));
}
processors.emplace_front(std::move(copying_data));
processors.emplace_back(std::move(finalizing_views));
result_chain = Chain(std::move(processors));
}
/// Do not push to destination table if the flag is set
if (!no_destination)
{
auto sink = storage->write(query_ptr, storage->getInMemoryMetadataPtr(), context);
sink->setRuntimeData(runtime_data);
metadata_snapshot->check(sink->getHeader().getColumnsWithTypeAndName());
}
}
/// Returns the structure of blocks this stream accepts.
Block PushingToViewsBlockOutputStream::getHeader() const
{
    /// Without a direct output to the destination table the incoming blocks are expected
    /// to already carry precalculated virtual columns, so the header includes them.
    if (!output)
        return metadata_snapshot->getSampleBlockWithVirtuals(storage->getVirtuals());

    return metadata_snapshot->getSampleBlock();
}
/// Auxiliary function to do the setup and teardown to run a view individually and collect its metrics inside the view ThreadStatus
void inline runViewStage(ViewRuntimeData & view, const std::string & action, std::function<void()> stage)
{
Stopwatch watch;
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
if (view.runtime_stats.thread_status)
{
/// Change thread context to store individual metrics per view. Once the work in done, go back to the original thread
view.runtime_stats.thread_status->resetPerformanceCountersLastUsage();
current_thread = view.runtime_stats.thread_status.get();
result_chain.addSource(std::move(sink));
}
try
{
stage();
}
catch (Exception & ex)
{
ex.addMessage(action + " " + view.table_id.getNameForLogs());
view.setException(std::current_exception());
}
catch (...)
{
view.setException(std::current_exception());
}
if (view.runtime_stats.thread_status)
view.runtime_stats.thread_status->updatePerformanceCounters();
view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds();
}
void PushingToViewsBlockOutputStream::write(const Block & block)
{
/** Throw an exception if the sizes of arrays - elements of nested data structures doesn't match.
* We have to make this assertion before writing to table, because storage engine may assume that they have equal sizes.
* NOTE It'd better to do this check in serialization of nested structures (in place when this assumption is required),
* but currently we don't have methods for serialization of nested structures "as a whole".
*/
Nested::validateArraySizes(block);
if (auto * live_view = dynamic_cast<StorageLiveView *>(storage.get()))
{
StorageLiveView::writeIntoLiveView(*live_view, block, getContext());
}
else
{
if (output)
/// TODO: to support virtual and alias columns inside MVs, we should return here the inserted block extended
/// with additional columns directly from storage and pass it to MVs instead of raw block.
output->write(block);
}
if (views.empty())
return;
/// Don't process materialized views if this block is duplicate
const Settings & settings = getContext()->getSettingsRef();
if (!settings.deduplicate_blocks_in_dependent_materialized_views && replicated_output && replicated_output->lastBlockIsDuplicate())
return;
size_t max_threads = 1;
if (settings.parallel_view_processing)
max_threads = settings.max_threads ? std::min(static_cast<size_t>(settings.max_threads), views.size()) : views.size();
if (max_threads > 1)
{
ThreadPool pool(max_threads);
for (auto & view : views)
{
pool.scheduleOrThrowOnError([&] {
setThreadName("PushingToViews");
runViewStage(view, "while pushing to view", [&]() { process(block, view); });
});
}
pool.wait();
}
else
{
for (auto & view : views)
{
runViewStage(view, "while pushing to view", [&]() { process(block, view); });
}
}
}
/// Writes the prefix to the destination output (if any) and then to every view, in order.
/// The first view that fails stops the loop: the views are logged and its exception is rethrown.
void PushingToViewsBlockOutputStream::writePrefix()
{
    if (output)
        output->writePrefix();

    for (auto & view : views)
    {
        runViewStage(view, "while writing prefix to view", [&] { view.out->writePrefix(); });

        if (!view.exception)
            continue;

        logQueryViews();
        std::rethrow_exception(view.exception);
    }
}
/// Writes the suffix to the destination output (if any) and then to every view that has not
/// already failed, either sequentially or through a thread pool.
/// If any view has an exception (from an earlier stage or from this one), checkExceptionsInViews()
/// is called. Finally the total time is logged (for more than one view) and the views are logged.
void PushingToViewsBlockOutputStream::writeSuffix()
{
    if (output)
        output->writeSuffix();

    if (views.empty())
        return;

    auto process_suffix = [](ViewRuntimeData & view)
    {
        view.out->writeSuffix();
        view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::QUERY_FINISH);
    };
    static const std::string stage_step = "while writing suffix to view";

    /// Run writeSuffix() for views in separate thread pool.
    /// It could have been done in PushingToViewsBlockOutputStream::process, however
    /// it is not good if insert into main table fail but into view succeed.
    const Settings & settings = getContext()->getSettingsRef();
    size_t max_threads = 1;
    if (settings.parallel_view_processing)
        max_threads = settings.max_threads ? std::min(static_cast<size_t>(settings.max_threads), views.size()) : views.size();

    /// A flag rather than a narrow counter: an 8-bit counter wraps to zero after 256 failures
    /// and would then report that nothing failed. Declared before the pool so it outlives the workers.
    std::atomic<bool> exception_happened{false};

    if (max_threads > 1)
    {
        ThreadPool pool(max_threads);
        for (auto & view : views)
        {
            /// Views that already failed on an earlier stage are not finalized.
            if (view.exception)
            {
                exception_happened.store(true, std::memory_order_relaxed);
                continue;
            }

            pool.scheduleOrThrowOnError([&] {
                setThreadName("PushingToViews");

                runViewStage(view, stage_step, [&] { process_suffix(view); });
                if (view.exception)
                {
                    exception_happened.store(true, std::memory_order_relaxed);
                }
                else
                {
                    LOG_TRACE(
                        log,
                        "Pushing (parallel {}) from {} to {} took {} ms.",
                        max_threads,
                        storage->getStorageID().getNameForLogs(),
                        view.table_id.getNameForLogs(),
                        view.runtime_stats.elapsed_ms);
                }
            });
        }
        pool.wait();
    }
    else
    {
        for (auto & view : views)
        {
            /// Views that already failed on an earlier stage are not finalized.
            if (view.exception)
            {
                exception_happened.store(true, std::memory_order_relaxed);
                continue;
            }

            runViewStage(view, stage_step, [&] { process_suffix(view); });
            if (view.exception)
            {
                exception_happened.store(true, std::memory_order_relaxed);
            }
            else
            {
                LOG_TRACE(
                    log,
                    "Pushing (sequentially) from {} to {} took {} ms.",
                    storage->getStorageID().getNameForLogs(),
                    view.table_id.getNameForLogs(),
                    view.runtime_stats.elapsed_ms);
            }
        }
    }

    if (exception_happened.load(std::memory_order_relaxed))
        checkExceptionsInViews();

    if (views.size() > 1)
    {
        UInt64 milliseconds = main_watch.elapsedMilliseconds();
        LOG_DEBUG(log, "Pushing from {} to {} views took {} ms.", storage->getStorageID().getNameForLogs(), views.size(), milliseconds);
    }
    logQueryViews();
}
void PushingToViewsBlockOutputStream::flush()
{
if (output)
output->flush();
for (auto & view : views)
view.out->flush();
return result_chain;
}
static void process(Block & block, ViewRuntimeData & view)
@ -729,24 +541,9 @@ static void process(Block & block, ViewRuntimeData & view)
void ExecutingInnerQueryFromViewTransform::transform(Chunk & chunk)
{
runViewStage(view, "while pushing to view", [&]
{
auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns());
process(block, view);
chunk.setColumns(block.getColumns(), block.rows());
});
}
void PushingToViewsBlockOutputStream::checkExceptionsInViews()
{
for (auto & view : views)
{
if (view.exception)
{
logQueryViews();
std::rethrow_exception(view.exception);
}
}
auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns());
process(block, view);
chunk.setColumns(block.getColumns(), block.rows());
}
static void logQueryViews(std::list<ViewRuntimeData> & views, ContextPtr context)

View File

@ -3,10 +3,11 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Interpreters/QueryViewsLog.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage.h>
#include <Common/Stopwatch.h>
#include <Processors/Drain.h>
#include <Processors/Chain.h>
#include <Processors/ISimpleTransform.h>
#include <Storages/IStorage.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Common/Stopwatch.h>
namespace Poco
{
@ -18,6 +19,9 @@ namespace DB
class ReplicatedMergeTreeSink;
struct ExceptionKeepingTransformRuntimeData;
using ExceptionKeepingTransformRuntimeDataPtr = std::shared_ptr<ExceptionKeepingTransformRuntimeData>;
struct ViewRuntimeData
{
const ASTPtr query;
@ -41,49 +45,22 @@ struct ViewRuntimeData
/** Writes data to the specified table and to all dependent materialized views.
*/
class PushingToViewsBlockOutputStream : public IBlockOutputStream, WithContext
{
public:
PushingToViewsBlockOutputStream(
const StoragePtr & storage_,
const StorageMetadataPtr & metadata_snapshot_,
ContextPtr context_,
const ASTPtr & query_ptr_,
bool no_destination = false);
Chain buildPushingToViewsDrain(
const StoragePtr & storage,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const ASTPtr & query_ptr,
bool no_destination,
std::vector<TableLockHolder> & locks,
ExceptionKeepingTransformRuntimeDataPtr runtime_data);
Block getHeader() const override;
void write(const Block & block) override;
void flush() override;
void writePrefix() override;
void writeSuffix() override;
void onProgress(const Progress & progress) override;
private:
StoragePtr storage;
StorageMetadataPtr metadata_snapshot;
BlockOutputStreamPtr output;
ReplicatedMergeTreeSink * replicated_output = nullptr;
Poco::Logger * log;
ASTPtr query_ptr;
Stopwatch main_watch;
std::vector<ViewRuntimeData> views;
ContextMutablePtr select_context;
ContextMutablePtr insert_context;
void process(const Block & block, ViewRuntimeData & view);
void checkExceptionsInViews();
void logQueryViews();
};
class ExecutingInnerQueryFromViewTransform final : public ExceptionKeepingTransform
{
public:
ExecutingInnerQueryFromViewTransform(const Block & header, ViewRuntimeData & view_data)
: ExceptionKeepingTransform(header, view_data.sample_block)
, view(std::move(view_data))
, view(view_data)
{
}
@ -91,7 +68,6 @@ public:
protected:
void transform(Chunk & chunk) override;
void onFinish() override;
private:
ViewRuntimeData & view;

View File

@ -19,7 +19,9 @@
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageMaterializedView.h>
#include <TableFunctions/TableFunctionFactory.h>
@ -28,6 +30,7 @@
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/processColumnTransformers.h>
#include <Interpreters/addMissingDefaults.h>
#include <DataTypes/DataTypeNullable.h>
#include <Columns/ColumnNullable.h>
@ -43,12 +46,14 @@ namespace ErrorCodes
}
InterpreterInsertQuery::InterpreterInsertQuery(
const ASTPtr & query_ptr_, ContextPtr context_, bool allow_materialized_, bool no_squash_, bool no_destination_)
const ASTPtr & query_ptr_, ContextPtr context_, bool allow_materialized_, bool no_squash_, bool no_destination_,
ExceptionKeepingTransformRuntimeDataPtr runtime_data_)
: WithContext(context_)
, query_ptr(query_ptr_)
, allow_materialized(allow_materialized_)
, no_squash(no_squash_)
, no_destination(no_destination_)
, runtime_data(runtime_data_)
{
checkStackSize();
}
@ -174,7 +179,7 @@ BlockIO InterpreterInsertQuery::execute()
}
}
BlockOutputStreams out_streams;
std::vector<Chain> out_chains;
if (!is_distributed_insert_select || query.watch)
{
size_t out_streams_size = 1;
@ -266,28 +271,45 @@ BlockIO InterpreterInsertQuery::execute()
for (size_t i = 0; i < out_streams_size; i++)
{
/// We create a pipeline of several streams, into which we will write data.
BlockOutputStreamPtr out;
Chain out;
/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.
/// Otherwise we'll get duplicates when MV reads same rows again from Kafka.
if (table->noPushingToViews() && !no_destination)
out = std::make_shared<PushingToSinkBlockOutputStream>(table->write(query_ptr, metadata_snapshot, getContext()));
{
auto sink = table->write(query_ptr, metadata_snapshot, getContext());
sink->setRuntimeData(runtime_data);
out.addSource(std::move(sink));
}
else
out = std::make_shared<PushingToViewsBlockOutputStream>(table, metadata_snapshot, getContext(), query_ptr, no_destination);
{
std::vector<TableLockHolder> locks;
out = buildPushingToViewsDrain(table, metadata_snapshot, getContext(), query_ptr, no_destination, locks, runtime_data);
for (auto & lock : locks)
res.pipeline.addTableLock(std::move(lock));
}
/// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order.
/// Checking constraints. It must be done after calculation of all defaults, so we can check them on calculated columns.
if (const auto & constraints = metadata_snapshot->getConstraints(); !constraints.empty())
out = std::make_shared<CheckConstraintsBlockOutputStream>(
query.table_id, out, out->getHeader(), metadata_snapshot->getConstraints(), getContext());
out.addSource(std::make_shared<CheckConstraintsTransform>(
query.table_id, out.getInputHeader(), metadata_snapshot->getConstraints(), getContext()));
bool null_as_default = query.select && getContext()->getSettingsRef().insert_null_as_default;
auto adding_missing_defaults_dag = addMissingDefaults(
query_sample_block,
out.getInputHeader().getNamesAndTypesList(),
metadata_snapshot->getColumns(),
getContext(),
null_as_default);
auto adding_missing_defaults_actions = std::make_shared<ExpressionActions>(adding_missing_defaults_dag);
/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, metadata_snapshot->getColumns(), getContext(), null_as_default);
out.addSource(std::make_shared<ExpressionTransform>(query_sample_block, adding_missing_defaults_actions));
/// It's important to squash blocks as early as possible (before other transforms),
/// because other transforms may work inefficient if block size is small.
@ -298,16 +320,17 @@ BlockIO InterpreterInsertQuery::execute()
{
bool table_prefers_large_blocks = table->prefersLargeBlocks();
out = std::make_shared<SquashingBlockOutputStream>(
out,
out->getHeader(),
out.addSource(std::make_shared<SquashingChunksTransform>(
out.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0);
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0));
}
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
out_wrapper->setProcessListElement(getContext()->getProcessListElement());
out_streams.emplace_back(std::move(out_wrapper));
auto counting = std::make_shared<CountingTransform>(out.getInputHeader());
counting->setProcessListElement(getContext()->getProcessListElement());
out.addSource(std::move(counting));
out_chains.emplace_back(std::move(out));
}
}
@ -318,7 +341,7 @@ BlockIO InterpreterInsertQuery::execute()
}
else if (query.select || query.watch)
{
const auto & header = out_streams.at(0)->getHeader();
const auto & header = out_chains.at(0).getInputHeader();
auto actions_dag = ActionsDAG::makeConvertingActions(
res.pipeline.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
@ -330,15 +353,11 @@ BlockIO InterpreterInsertQuery::execute()
return std::make_shared<ExpressionTransform>(in_header, actions);
});
res.pipeline.setSinks([&](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr
res.pipeline.addChains(std::move(out_chains));
res.pipeline.setSinks([&](const Block & cur_header, QueryPipeline::StreamType) -> ProcessorPtr
{
if (type != QueryPipeline::StreamType::Main)
return nullptr;
auto stream = std::move(out_streams.back());
out_streams.pop_back();
return std::make_shared<SinkToOutputStream>(std::move(stream));
return std::make_shared<EmptySink>(cur_header);
});
if (!allow_materialized)
@ -353,13 +372,14 @@ BlockIO InterpreterInsertQuery::execute()
auto pipe = getSourceFromFromASTInsertQuery(query_ptr, nullptr, query_sample_block, getContext(), nullptr);
res.pipeline.init(std::move(pipe));
res.pipeline.resize(1);
res.pipeline.setSinks([&](const Block &, Pipe::StreamType)
res.pipeline.addChains(std::move(out_chains));
res.pipeline.setSinks([&](const Block & cur_header, Pipe::StreamType)
{
return std::make_shared<SinkToOutputStream>(out_streams.at(0));
return std::make_shared<EmptySink>(cur_header);
});
}
else
res.out = std::move(out_streams.at(0));
res.out = std::move(out_chains.at(0));
res.pipeline.addStorageHolder(table);
if (const auto * mv = dynamic_cast<const StorageMaterializedView *>(table.get()))

View File

@ -9,6 +9,9 @@
namespace DB
{
struct ExceptionKeepingTransformRuntimeData;
using ExceptionKeepingTransformRuntimeDataPtr = std::shared_ptr<ExceptionKeepingTransformRuntimeData>;
/** Interprets the INSERT query.
*/
class InterpreterInsertQuery : public IInterpreter, WithContext
@ -19,7 +22,8 @@ public:
ContextPtr context_,
bool allow_materialized_ = false,
bool no_squash_ = false,
bool no_destination_ = false);
bool no_destination_ = false,
ExceptionKeepingTransformRuntimeDataPtr runtime_data = nullptr);
/** Prepare a request for execution. Return block streams
* - the stream into which you can write data to execute the query, if INSERT;
@ -40,6 +44,7 @@ private:
const bool allow_materialized;
const bool no_squash;
const bool no_destination;
ExceptionKeepingTransformRuntimeDataPtr runtime_data;
};

108
src/Processors/Chain.cpp Normal file
View File

@ -0,0 +1,108 @@
#include <IO/WriteHelpers.h>
#include <Processors/Chain.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Verifies that `transform` can be attached to a chain on its input side:
/// it must have exactly one input port, and that port must not be connected yet.
/// Throws LOGICAL_ERROR otherwise.
static void checkSingleInput(const IProcessor & transform)
{
    const auto & inputs = transform.getInputs();
    const size_t num_inputs = inputs.size();

    if (num_inputs != 1)
        throw Exception("Transform for chain should have single input, "
                        "but " + transform.getName() + " has " +
                        toString(num_inputs) + " inputs.", ErrorCodes::LOGICAL_ERROR);

    const auto & only_input = inputs.front();
    if (only_input.isConnected())
        throw Exception("Transform for chain has connected input.", ErrorCodes::LOGICAL_ERROR);
}
/// Verifies that `transform` can be attached to a chain on its output side:
/// it must have exactly one output port, and that port must not be connected yet.
/// Throws LOGICAL_ERROR otherwise.
static void checkSingleOutput(const IProcessor & transform)
{
    if (transform.getOutputs().size() != 1)
        throw Exception("Transform for chain should have single output, "
                        "but " + transform.getName() + " has " +
                        toString(transform.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);

    /// This check is about the output port, so the message must say "output"
    /// (otherwise it is indistinguishable from the failure in checkSingleInput).
    if (transform.getOutputs().front().isConnected())
        throw Exception("Transform for chain has connected output.", ErrorCodes::LOGICAL_ERROR);
}
/// Verifies that `transform` is usable as a chain element:
/// a single unconnected input and a single unconnected output.
/// The input side is checked first, so its error takes precedence.
static void checkTransform(const IProcessor & transform)
{
checkSingleInput(transform);
checkSingleOutput(transform);
}
/// Throws LOGICAL_ERROR if the chain holds no processors yet,
/// i.e. it has no input/output port that could be returned.
static void checkInitialized(const std::list<ProcessorPtr> & processors)
{
    /// The message names Chain (this helper was copied from Drain and kept the old name).
    if (processors.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Chain is not initialized");
}
/// Creates a chain consisting of a single processor.
/// The processor must have exactly one unconnected input and one unconnected output
/// (see checkTransform); otherwise LOGICAL_ERROR is thrown.
Chain::Chain(ProcessorPtr processor)
{
checkTransform(*processor);
processors.emplace_back(std::move(processor));
}
/// Creates a chain from an already wired list of processors.
/// An empty list produces an empty (uninitialized) chain.
/// Otherwise the first processor must have a single free input, the last one a single free output,
/// and every other port of every processor must already be connected.
/// Throws LOGICAL_ERROR if any of these requirements is violated.
Chain::Chain(std::list<ProcessorPtr> processors_) : processors(std::move(processors_))
{
    if (processors.empty())
        return;

    checkSingleInput(*processors.front());
    checkSingleOutput(*processors.back());

    /// The only two ports which are allowed to stay unconnected.
    const InputPort * chain_input = &getInputPort();
    const OutputPort * chain_output = &getOutputPort();

    for (const auto & processor : processors)
    {
        for (const auto & input : processor->getInputs())
        {
            const bool is_chain_input = (&input == chain_input);
            if (!is_chain_input && !input.isConnected())
                throw Exception(ErrorCodes::LOGICAL_ERROR,
                                "Cannot initialize chain because there is a not connected input for {}",
                                processor->getName());
        }

        for (const auto & output : processor->getOutputs())
        {
            const bool is_chain_output = (&output == chain_output);
            if (!is_chain_output && !output.isConnected())
                throw Exception(ErrorCodes::LOGICAL_ERROR,
                                "Cannot initialize chain because there is a not connected output for {}",
                                processor->getName());
        }
    }
}
/// Prepends `processor` to the chain: its output is connected to the current
/// input port of the chain (if the chain is not empty), and it becomes the new front.
/// The processor must pass checkTransform.
void Chain::addSource(ProcessorPtr processor)
{
    checkTransform(*processor);

    const bool has_downstream = !processors.empty();
    if (has_downstream)
    {
        auto & new_output = processor->getOutputs().front();
        connect(new_output, getInputPort());
    }

    processors.push_front(std::move(processor));
}
/// Appends `processor` to the chain: the current output port of the chain (if the chain
/// is not empty) is connected to the processor's input, and it becomes the new back.
/// The processor must pass checkTransform.
void Chain::addSink(ProcessorPtr processor)
{
    checkTransform(*processor);

    if (!processors.empty())
        connect(getOutputPort(), processor->getInputs().front());

    /// The new processor is downstream of everything already in the chain, so it must be
    /// stored at the back: getOutputPort() reads processors.back() and getInputPort() reads
    /// processors.front(). Storing it at the front would make both return connected ports.
    processors.emplace_back(std::move(processor));
}
/// Returns the first input port of the front processor, i.e. the entry point of the chain.
/// Throws LOGICAL_ERROR if the chain has no processors.
InputPort & Chain::getInputPort() const
{
checkInitialized(processors);
return processors.front()->getInputs().front();
}
/// Returns the first output port of the back processor, i.e. the exit point of the chain.
/// Throws LOGICAL_ERROR if the chain has no processors.
OutputPort & Chain::getOutputPort() const
{
checkInitialized(processors);
return processors.back()->getOutputs().front();
}
}

41
src/Processors/Chain.h Normal file
View File

@ -0,0 +1,41 @@
#pragma once
#include <Processors/IProcessor.h>
namespace DB
{
/// A list of processors which exposes one input port (taken from the front processor)
/// and one output port (taken from the back processor).
/// The class is move-only.
class Chain
{
public:
Chain() = default;
Chain(Chain &&) = default;
Chain(const Chain &) = delete;
Chain & operator=(Chain &&) = default;
Chain & operator=(const Chain &) = delete;
/// Creates a chain from a single processor.
explicit Chain(ProcessorPtr processor);
/// Creates a chain from a list of processors.
explicit Chain(std::list<ProcessorPtr> processors);
/// True if the chain holds no processors.
bool empty() const { return processors.empty(); }
/// Connects the output of `processor` to the chain's current input port (if any).
void addSource(ProcessorPtr processor);
/// Connects the chain's current output port (if any) to the input of `processor`.
void addSink(ProcessorPtr processor);
/// Input port of the front processor.
InputPort & getInputPort() const;
/// Output port of the back processor.
OutputPort & getOutputPort() const;
/// Header of the chain's input port.
const Block & getInputHeader() const { return getInputPort().getHeader(); }
/// Header of the chain's output port.
const Block & getOutputHeader() const { return getOutputPort().getHeader(); }
/// Takes the chain by value and returns its processors; the passed chain is consumed.
static std::list<ProcessorPtr> getProcessors(Chain chain) { return std::move(chain.processors); }
private:
/// -> source -> transform -> ... -> transform -> sink ->
/// ^ -> -> -> -> ^
/// input port output port
std::list<ProcessorPtr> processors;
};
}

View File

@ -1,75 +0,0 @@
#include <Processors/Drain.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Verifies that `sink` can terminate a drain: no outputs at all and exactly one
/// input which is not connected yet. Throws LOGICAL_ERROR otherwise.
static void checkSink(const IProcessor & sink)
{
    const auto & outputs = sink.getOutputs();
    if (!outputs.empty())
        throw Exception("Sink for drain shouldn't have any output, but " + sink.getName() + " has " +
                        toString(outputs.size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);

    const auto & inputs = sink.getInputs();
    if (inputs.empty())
        throw Exception("Sink for drain should have single input, but it doesn't have any",
                        ErrorCodes::LOGICAL_ERROR);

    if (inputs.size() > 1)
        throw Exception("Sink for drain should have single input, but " + sink.getName() + " has " +
                        toString(inputs.size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);

    const auto & only_input = inputs.front();
    if (only_input.isConnected())
        throw Exception("Sink for drain has connected input.", ErrorCodes::LOGICAL_ERROR);
}
/// Verifies that `transform` can be inserted into a drain: exactly one input and
/// exactly one output, both not connected yet. Throws LOGICAL_ERROR otherwise.
static void checkTransform(const IProcessor & transform)
{
    if (transform.getInputs().size() != 1)
        throw Exception("Transform for drain should have single input, "
                        "but " + transform.getName() + " has " +
                        toString(transform.getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);

    if (transform.getOutputs().size() != 1)
        throw Exception("Transform for drain should have single output, "
                        "but " + transform.getName() + " has " +
                        toString(transform.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);

    if (transform.getInputs().front().isConnected())
        throw Exception("Transform for drain has connected input.", ErrorCodes::LOGICAL_ERROR);

    /// This check is about the output port, so the message must say "output".
    if (transform.getOutputs().front().isConnected())
        throw Exception("Transform for drain has connected output.", ErrorCodes::LOGICAL_ERROR);
}
/// Throws LOGICAL_ERROR if the drain holds no processors yet.
void checkInitialized(const Processors & processors)
{
if (processors.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Drain is not initialized");
}
/// Creates a drain from its terminating sink.
/// The sink must have no outputs and a single unconnected input (see checkSink).
Drain::Drain(ProcessorPtr processor)
{
checkSink(*processor);
processors.emplace_back(std::move(processor));
}
/// Puts `processor` in front of the drain's current input: its output is connected to the
/// input of the most recently added processor, and it becomes the new entry point.
/// Throws LOGICAL_ERROR if the drain is not initialized or the processor fails checkTransform.
void Drain::addTransform(ProcessorPtr processor)
{
    checkInitialized(processors);
    checkTransform(*processor);

    auto & upstream_output = processor->getOutputs().front();
    auto & current_input = processors.back()->getInputs().front();
    connect(upstream_output, current_input);

    processors.push_back(std::move(processor));
}
/// Returns the first input port of the most recently added processor.
/// Throws LOGICAL_ERROR if the drain has no processors.
InputPort & Drain::getPort() const
{
checkInitialized(processors);
return processors.back()->getInputs().front();
}
}

View File

@ -1,23 +0,0 @@
#pragma once
#include <Processors/IProcessor.h>
namespace DB
{
/// A sink with optional transforms attached in front of it, exposing a single input port.
class Drain
{
public:
Drain() = default;
/// Creates a drain from a sink processor.
explicit Drain(ProcessorPtr processor);
/// Connects the output of `processor` to the drain's current input port.
void addTransform(ProcessorPtr processor);
/// Input port of the most recently added processor.
InputPort & getPort() const;
/// Header of the drain's input port.
const Block & getHeader() const { return getPort().getHeader(); }
private:
Processors processors;
};
}

View File

@ -667,6 +667,41 @@ void Pipe::addSimpleTransform(const ProcessorGetter & getter)
addSimpleTransform([&](const Block & stream_header, StreamType) { return getter(stream_header); });
}
/// Attaches one chain to every output port of the pipe.
/// `chains[i]` is connected to output port `i`; the output port of the chain then replaces
/// the pipe's port, and the chain's processors are moved into the pipe.
/// All chains must produce the same output header, which becomes the header of the pipe.
/// Throws LOGICAL_ERROR if the number of chains differs from the number of output ports.
void Pipe::addChains(std::vector<Chain> chains)
{
    if (chains.size() != output_ports.size())
        throw Exception(ErrorCodes::LOGICAL_ERROR,
                        "Cannot add chains to Pipe because "
                        "number of output ports ({}) is not equal to the number of chains ({})",
                        output_ports.size(), chains.size());

    /// NOTE(review): presumably discards the totals/extremes streams, chains are only
    /// attached to the regular output ports - confirm against dropTotals/dropExtremes.
    dropTotals();
    dropExtremes();

    Block new_header;
    for (size_t port_idx = 0; port_idx < output_ports.size(); ++port_idx)
    {
        auto & chain = chains[port_idx];

        /// The first chain defines the header; the others must match it.
        const bool is_first = (port_idx == 0);
        if (is_first)
            new_header = chain.getOutputHeader();
        else
            assertBlocksHaveEqualStructure(new_header, chain.getOutputHeader(), "QueryPipeline");

        /// Ports must be taken before the chain is consumed by getProcessors below.
        connect(*output_ports[port_idx], chain.getInputPort());
        output_ports[port_idx] = &chain.getOutputPort();

        auto chain_processors = Chain::getProcessors(std::move(chain));
        for (auto & chain_processor : chain_processors)
        {
            if (collected_processors)
                collected_processors->emplace_back(chain_processor);

            processors.emplace_back(std::move(chain_processor));
        }
    }

    header = std::move(new_header);
}
void Pipe::resize(size_t num_streams, bool force, bool strict)
{
if (output_ports.empty())

View File

@ -3,6 +3,7 @@
#include <Processors/IProcessor.h>
#include <Processors/QueryPlan/QueryIdHolder.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Chain.h>
#include <Access/EnabledQuota.h>
#include <DataStreams/SizeLimits.h>
#include <Storages/TableLockHolder.h>
@ -86,6 +87,9 @@ public:
void addSimpleTransform(const ProcessorGetter & getter);
void addSimpleTransform(const ProcessorGetterWithStreamKind & getter);
/// Add chain to every output port.
void addChains(std::vector<Chain> chains);
/// Changes the number of output ports if needed. Adds ResizeTransform.
void resize(size_t num_streams, bool force = false, bool strict = false);

View File

@ -103,6 +103,12 @@ void QueryPipeline::addTransform(ProcessorPtr transform, InputPort * totals, Inp
pipe.addTransform(std::move(transform), totals, extremes);
}
/// Attaches the given chains to the underlying pipe (see Pipe::addChains).
/// checkInitializedAndNotCompleted() is called first to validate the pipeline state.
void QueryPipeline::addChains(std::vector<Chain> chains)
{
checkInitializedAndNotCompleted();
pipe.addChains(std::move(chains));
}
void QueryPipeline::transform(const Transformer & transformer)
{
checkInitializedAndNotCompleted();

View File

@ -56,6 +56,8 @@ public:
void addTransform(ProcessorPtr transform);
void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes);
void addChains(std::vector<Chain> chains);
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform pipeline in general way.
void transform(const Transformer & transformer);

View File

@ -6,6 +6,13 @@
namespace DB
{
/// Takes ownership of `thread_status_` and stores `additional_exception_message_`.
/// `elapsed_ms` keeps its default member initializer.
ExceptionKeepingTransformRuntimeData::ExceptionKeepingTransformRuntimeData(
std::unique_ptr<ThreadStatus> thread_status_,
std::string additional_exception_message_)
: thread_status(std::move(thread_status_))
, additional_exception_message(std::move(additional_exception_message_))
{
}
ExceptionKeepingTransform::ExceptionKeepingTransform(const Block & in_header, const Block & out_header)
: IProcessor({in_header}, {out_header})
@ -68,7 +75,7 @@ IProcessor::Status ExceptionKeepingTransform::prepare()
return Status::Ready;
}
static std::exception_ptr runStep(std::function<void()> step, ExceptionKeepingTransform::RuntimeData * runtime_data)
static std::exception_ptr runStep(std::function<void()> step, ExceptionKeepingTransformRuntimeData * runtime_data)
{
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });

View File

@ -7,6 +7,20 @@ namespace DB
class ThreadStatus;
/// Runtime state attached to an ExceptionKeepingTransform via setRuntimeData().
/// It is held through a shared_ptr (see ExceptionKeepingTransformRuntimeDataPtr).
struct ExceptionKeepingTransformRuntimeData
{
/// Owned thread status. NOTE(review): presumably installed as the current thread
/// while a step of the transform runs - confirm against runStep.
std::unique_ptr<ThreadStatus> thread_status = nullptr;
/// NOTE(review): presumably the accumulated execution time in milliseconds - confirm.
UInt64 elapsed_ms = 0;
/// NOTE(review): presumably added to the message of exceptions thrown by the transform - confirm.
std::string additional_exception_message;
ExceptionKeepingTransformRuntimeData(
std::unique_ptr<ThreadStatus> thread_status_,
std::string additional_exception_message_);
};
using ExceptionKeepingTransformRuntimeDataPtr = std::shared_ptr<ExceptionKeepingTransformRuntimeData>;
/// Has one input and one output.
/// Works similarly to ISimpleTransform, but with much care about exceptions.
///
@ -48,19 +62,10 @@ public:
InputPort & getInputPort() { return input; }
OutputPort & getOutputPort() { return output; }
struct RuntimeData
{
std::unique_ptr<ThreadStatus> thread_status = nullptr;
UInt64 elapsed_ms = 0;
std::string additional_exception_message;
};
using RuntimeDataPtr = std::shared_ptr<RuntimeData>;
void setRuntimeData(RuntimeDataPtr runtime_data_) { runtime_data = std::move(runtime_data_); }
void setRuntimeData(ExceptionKeepingTransformRuntimeDataPtr runtime_data_) { runtime_data = std::move(runtime_data_); }
private:
RuntimeDataPtr runtime_data;
ExceptionKeepingTransformRuntimeDataPtr runtime_data;
};