Rewrite PushingToViewsBIS part 1.

This commit is contained in:
Nikolai Kochetov 2021-08-13 19:32:29 +03:00
parent ad00aaa18c
commit 6b1030c9b8
5 changed files with 129 additions and 28 deletions

View File

@ -145,7 +145,6 @@ protected:
Poco::Logger * log = nullptr;
friend class CurrentThread;
friend class PushingToViewsBlockOutputStream;
/// Use ptr not to add extra dependencies in the header
std::unique_ptr<RUsageCounters> last_rusage;
@ -188,6 +187,11 @@ public:
return query_context.lock();
}
void disableProfiling()
{
query_profiled_enabled = false;
}
/// Starts new query and create new thread group for it, current thread becomes master thread of the query
void initializeQuery();

View File

@ -30,17 +30,13 @@
namespace DB
{
PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
const StoragePtr & storage_,
const StorageMetadataPtr & metadata_snapshot_,
ContextPtr context_,
const ASTPtr & query_ptr_,
bool no_destination)
: WithContext(context_)
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, log(&Poco::Logger::get("PushingToViewsBlockOutputStream"))
, query_ptr(query_ptr_)
Drain buildPushingToViewsDrainImpl(
const StoragePtr & storage,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const ASTPtr & query_ptr,
bool no_destination,
std::vector<TableLockHolder> & locks)
{
checkStackSize();
@ -48,19 +44,20 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
* but it's clear that here is not the best place for this functionality.
*/
addTableLock(
storage->lockForShare(getContext()->getInitialQueryId(), getContext()->getSettingsRef().lock_acquire_timeout));
locks.emplace_back(storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout));
/// If the "root" table deduplicates blocks, there are no need to make deduplication for children
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks
bool disable_deduplication_for_children = false;
if (!getContext()->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views)
if (!context->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views)
disable_deduplication_for_children = !no_destination && storage->supportsDeduplication();
auto table_id = storage->getStorageID();
Dependencies dependencies = DatabaseCatalog::instance().getDependencies(table_id);
/// We need special context for materialized views insertions
ContextMutablePtr select_context;
ContextMutablePtr insert_context;
if (!dependencies.empty())
{
select_context = Context::createCopy(context);
@ -79,21 +76,22 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
insert_context->setSetting("min_insert_block_size_bytes", insert_settings.min_insert_block_size_bytes_for_materialized_views.value);
}
std::vector<ViewRuntimeData> views;
for (const auto & database_table : dependencies)
{
auto dependent_table = DatabaseCatalog::instance().getTable(database_table, getContext());
auto dependent_table = DatabaseCatalog::instance().getTable(database_table, context);
auto dependent_metadata_snapshot = dependent_table->getInMemoryMetadataPtr();
ASTPtr query;
BlockOutputStreamPtr out;
Drain out;
QueryViewsLogElement::ViewType type = QueryViewsLogElement::ViewType::DEFAULT;
String target_name = database_table.getFullTableName();
if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get()))
{
type = QueryViewsLogElement::ViewType::MATERIALIZED;
addTableLock(
materialized_view->lockForShare(getContext()->getInitialQueryId(), getContext()->getSettingsRef().lock_acquire_timeout));
locks.emplace_back(materialized_view->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout));
StoragePtr inner_table = materialized_view->getTargetTable();
auto inner_table_id = inner_table->getStorageID();
@ -129,12 +127,12 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
{
type = QueryViewsLogElement::ViewType::LIVE;
query = live_view->getInnerQuery(); // Used only to log in system.query_views_log
out = std::make_shared<PushingToViewsBlockOutputStream>(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true);
out = buildPushingToViewsDrainImpl(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, locks);
}
else
out = std::make_shared<PushingToViewsBlockOutputStream>(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr());
out = buildPushingToViewsDrainImpl(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), false, locks);
/// If the materialized view is executed outside of a query, for example as a result of SYSTEM FLUSH LOGS or
/// SYSTEM FLUSH DISTRIBUTED ..., we can't attach to any thread group and we won't log, so there is no point on collecting metrics
@ -142,7 +140,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
ThreadGroupStatusPtr running_group = current_thread && current_thread->getThreadGroup()
? current_thread->getThreadGroup()
: MainThreadStatus::getInstance().thread_group;
: MainThreadStatus::getInstance().getThreadGroup();
if (running_group)
{
/// We are creating a ThreadStatus per view to store its metrics individually
@ -156,8 +154,8 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
/// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one
/// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means
/// N times more interruptions
thread_status->query_profiled_enabled = false;
thread_status->setupState(running_group);
thread_status->disableProfiling();
thread_status->attachQuery(running_group);
}
QueryViewsLogElement::ViewRuntimeStats runtime_stats{
@ -173,7 +171,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
/// Add the view to the query access info so it can appear in system.query_log
if (!no_destination)
{
getContext()->getQueryContext()->addQueryAccessInfo(
context->getQueryContext()->addQueryAccessInfo(
backQuoteIfNeed(database_table.getDatabaseName()), target_name, {}, "", database_table.getFullTableName());
}
}

View File

@ -5,6 +5,7 @@
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage.h>
#include <Common/Stopwatch.h>
#include <Processors/Drain.h>
namespace Poco
{
@ -20,7 +21,7 @@ struct ViewRuntimeData
{
const ASTPtr query;
StorageID table_id;
BlockOutputStreamPtr out;
Drain out;
std::exception_ptr exception;
QueryViewsLogElement::ViewRuntimeStats runtime_stats;

75
src/Processors/Drain.cpp Normal file
View File

@ -0,0 +1,75 @@
#include <Processors/Drain.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static void checkSink(const IProcessor & sink)
{
if (!sink.getOutputs().empty())
throw Exception("Sink for drain shouldn't have any output, but " + sink.getName() + " has " +
toString(sink.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
if (sink.getInputs().empty())
throw Exception("Sink for drain should have single input, but it doesn't have any",
ErrorCodes::LOGICAL_ERROR);
if (sink.getInputs().size() > 1)
throw Exception("Sink for drain should have single input, but " + sink.getName() + " has " +
toString(sink.getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
if (sink.getInputs().front().isConnected())
throw Exception("Sink for drain has connected input.", ErrorCodes::LOGICAL_ERROR);
}
static void checkTransform(const IProcessor & transform)
{
if (transform.getInputs().size() != 1)
throw Exception("Transform for drain should have single input, "
"but " + transform.getName() + " has " +
toString(transform.getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
if (transform.getOutputs().size() != 1)
throw Exception("Transform for drain should have single output, "
"but " + transform.getName() + " has " +
toString(transform.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
if (transform.getInputs().front().isConnected())
throw Exception("Transform for drain has connected input.", ErrorCodes::LOGICAL_ERROR);
if (transform.getOutputs().front().isConnected())
throw Exception("Transform for drain has connected input.", ErrorCodes::LOGICAL_ERROR);
}
void checkInitialized(const Processors & processors)
{
if (processors.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Drain is not initialized");
}
Drain::Drain(ProcessorPtr processor)
{
checkSink(*processor);
processors.emplace_back(std::move(processor));
}
void Drain::addTransform(ProcessorPtr processor)
{
checkInitialized(processors);
checkTransform(*processor);
connect(processor->getOutputs().front(), processors.back()->getInputs().front());
processors.emplace_back(std::move(processor));
}
InputPort & Drain::getPort() const
{
checkInitialized(processors);
return processors.back()->getInputs().front();
}
}

23
src/Processors/Drain.h Normal file
View File

@ -0,0 +1,23 @@
#pragma once
#include <Processors/IProcessor.h>
namespace DB
{
class Drain
{
public:
Drain() = default;
explicit Drain(ProcessorPtr processor);
void addTransform(ProcessorPtr processor);
InputPort & getPort() const;
const Block & getHeader() const { return getPort().getHeader(); }
private:
Processors processors;
};
}