Fix other tests.

This commit is contained in:
Nikolai Kochetov 2021-09-08 21:29:38 +03:00
parent f34cb097d8
commit 999a4fe831
26 changed files with 141 additions and 64 deletions

View File

@ -1,5 +1,6 @@
#include <DataStreams/CountingBlockOutputStream.h> #include <DataStreams/CountingBlockOutputStream.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <iostream>
namespace ProfileEvents namespace ProfileEvents
@ -17,6 +18,8 @@ void CountingTransform::transform(Chunk & chunk)
Progress local_progress(chunk.getNumRows(), chunk.bytes(), 0); Progress local_progress(chunk.getNumRows(), chunk.bytes(), 0);
progress.incrementPiecewiseAtomically(local_progress); progress.incrementPiecewiseAtomically(local_progress);
std::cerr << "============ counting adding progress for " << static_cast<const void *>(thread_status) << ' ' << chunk.getNumRows() << " rows\n";
if (thread_status) if (thread_status)
{ {
thread_status->performance_counters.increment(ProfileEvents::InsertedRows, local_progress.read_rows); thread_status->performance_counters.increment(ProfileEvents::InsertedRows, local_progress.read_rows);

View File

@ -173,6 +173,7 @@ public:
if (input.hasData()) if (input.hasData())
{ {
auto data = input.pullData(); auto data = input.pullData();
//std::cerr << "********** FinalizingViewsTransform got input " << i << " has exc " << bool(data.exception) << std::endl;
if (data.exception) if (data.exception)
{ {
if (views_data->has_exception && views_data->first_exception == data.exception) if (views_data->has_exception && views_data->first_exception == data.exception)
@ -197,6 +198,9 @@ public:
if (!statuses.empty()) if (!statuses.empty())
return Status::Ready; return Status::Ready;
if (any_exception)
output.pushException(std::move(any_exception));
output.finish(); output.finish();
return Status::Finished; return Status::Finished;
} }
@ -217,6 +221,7 @@ public:
if (!any_exception) if (!any_exception)
any_exception = status.exception; any_exception = status.exception;
//std::cerr << "=== Setting exception for " << view.table_id.getFullNameNotQuoted() << std::endl;
view.setException(std::move(status.exception)); view.setException(std::move(status.exception));
} }
else else
@ -265,10 +270,10 @@ Chain buildPushingToViewsDrain(
ContextPtr context, ContextPtr context,
const ASTPtr & query_ptr, const ASTPtr & query_ptr,
bool no_destination, bool no_destination,
std::vector<TableLockHolder> & locks,
ExceptionKeepingTransformRuntimeDataPtr runtime_data) ExceptionKeepingTransformRuntimeDataPtr runtime_data)
{ {
checkStackSize(); checkStackSize();
Chain result_chain;
/// If we don't write directly to the destination /// If we don't write directly to the destination
/// then expect that we're inserting with precalculated virtual columns /// then expect that we're inserting with precalculated virtual columns
@ -279,7 +284,7 @@ Chain buildPushingToViewsDrain(
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream, * Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
* but it's clear that here is not the best place for this functionality. * but it's clear that here is not the best place for this functionality.
*/ */
locks.emplace_back(storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout)); result_chain.addTableLock(storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout));
/// If the "root" table deduplicates blocks, there are no need to make deduplication for children /// If the "root" table deduplicates blocks, there are no need to make deduplication for children
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks /// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks
@ -355,7 +360,7 @@ Chain buildPushingToViewsDrain(
if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get())) if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get()))
{ {
type = QueryViewsLogElement::ViewType::MATERIALIZED; type = QueryViewsLogElement::ViewType::MATERIALIZED;
locks.emplace_back(materialized_view->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout)); result_chain.addTableLock(materialized_view->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout));
StoragePtr inner_table = materialized_view->getTargetTable(); StoragePtr inner_table = materialized_view->getTargetTable();
auto inner_table_id = inner_table->getStorageID(); auto inner_table_id = inner_table->getStorageID();
@ -383,7 +388,7 @@ Chain buildPushingToViewsDrain(
insert->columns = std::move(list); insert->columns = std::move(list);
ASTPtr insert_query_ptr(insert.release()); ASTPtr insert_query_ptr(insert.release());
InterpreterInsertQuery interpreter(insert_query_ptr, insert_context, false, false, false, runtime_data); InterpreterInsertQuery interpreter(insert_query_ptr, insert_context, false, false, false, view_runtime_data);
BlockIO io = interpreter.execute(); BlockIO io = interpreter.execute();
out = std::move(io.out); out = std::move(io.out);
} }
@ -392,11 +397,11 @@ Chain buildPushingToViewsDrain(
type = QueryViewsLogElement::ViewType::LIVE; type = QueryViewsLogElement::ViewType::LIVE;
query = live_view->getInnerQuery(); // Used only to log in system.query_views_log query = live_view->getInnerQuery(); // Used only to log in system.query_views_log
out = buildPushingToViewsDrain( out = buildPushingToViewsDrain(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, locks, view_runtime_data); dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, view_runtime_data);
} }
else else
out = buildPushingToViewsDrain( out = buildPushingToViewsDrain(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), false, locks, view_runtime_data); dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), false, view_runtime_data);
QueryViewsLogElement::ViewRuntimeStats runtime_stats{ QueryViewsLogElement::ViewRuntimeStats runtime_stats{
target_name, target_name,
@ -435,8 +440,6 @@ Chain buildPushingToViewsDrain(
} }
} }
Chain result_chain;
size_t num_views = views_data->views.size(); size_t num_views = views_data->views.size();
if (num_views != 0) if (num_views != 0)
{ {
@ -454,6 +457,7 @@ Chain buildPushingToViewsDrain(
for (auto & chain : chains) for (auto & chain : chains)
{ {
result_chain.attachResourcesFrom(chain);
connect(*out, chain.getInputPort()); connect(*out, chain.getInputPort());
connect(chain.getOutputPort(), *in); connect(chain.getOutputPort(), *in);
++in; ++in;
@ -581,6 +585,8 @@ static void logQueryViews(std::list<ViewRuntimeData> & views, ContextPtr context
try try
{ {
//std::cerr << "============ Logging for " << static_cast<const void *>(view.runtime_stats.thread_status.get()) << ' ' << view.table_id.getNameForLogs() << "\n";
if (view.runtime_stats.thread_status) if (view.runtime_stats.thread_status)
view.runtime_stats.thread_status->logToQueryViewsLog(view); view.runtime_stats.thread_status->logToQueryViewsLog(view);
} }

View File

@ -51,7 +51,6 @@ Chain buildPushingToViewsDrain(
ContextPtr context, ContextPtr context,
const ASTPtr & query_ptr, const ASTPtr & query_ptr,
bool no_destination, bool no_destination,
std::vector<TableLockHolder> & locks,
ExceptionKeepingTransformRuntimeDataPtr runtime_data); ExceptionKeepingTransformRuntimeDataPtr runtime_data);

View File

@ -26,6 +26,8 @@
#include <Parsers/ASTSubquery.h> #include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h> #include <Parsers/ASTTablesInSelectQuery.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h> #include <Interpreters/ExpressionActions.h>
#include <Interpreters/misc.h> #include <Interpreters/misc.h>

View File

@ -56,6 +56,7 @@
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h> #include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Parsers/formatAST.h> #include <Parsers/formatAST.h>
namespace DB namespace DB

View File

@ -51,7 +51,7 @@ InterpreterInsertQuery::InterpreterInsertQuery(
, allow_materialized(allow_materialized_) , allow_materialized(allow_materialized_)
, no_squash(no_squash_) , no_squash(no_squash_)
, no_destination(no_destination_) , no_destination(no_destination_)
, runtime_data(runtime_data_) , runtime_data(std::move(runtime_data_))
{ {
checkStackSize(); checkStackSize();
} }
@ -281,10 +281,7 @@ BlockIO InterpreterInsertQuery::execute()
} }
else else
{ {
std::vector<TableLockHolder> locks; out = buildPushingToViewsDrain(table, metadata_snapshot, getContext(), query_ptr, no_destination, runtime_data);
out = buildPushingToViewsDrain(table, metadata_snapshot, getContext(), query_ptr, no_destination, locks, runtime_data);
for (auto & lock : locks)
res.pipeline.addTableLock(std::move(lock));
} }
/// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order. /// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order.

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <Processors/IProcessor.h> #include <Processors/IProcessor.h>
#include <Processors/PipelineResourcesHolder.h>
namespace DB namespace DB
{ {
@ -35,11 +36,16 @@ public:
const std::list<ProcessorPtr> & getProcessors() const { return processors; } const std::list<ProcessorPtr> & getProcessors() const { return processors; }
static std::list<ProcessorPtr> getProcessors(Chain chain) { return std::move(chain.processors); } static std::list<ProcessorPtr> getProcessors(Chain chain) { return std::move(chain.processors); }
void addTableLock(TableLockHolder lock) { holder.table_locks.emplace_back(std::move(lock)); }
void attachResourcesFrom(Chain & other) { holder = std::move(other.holder); }
PipelineResourcesHolder detachResources() { return std::move(holder); }
private: private:
/// -> source -> transform -> ... -> transform -> sink -> /// -> source -> transform -> ... -> transform -> sink ->
/// ^ -> -> -> -> ^ /// ^ -> -> -> -> ^
/// input port output port /// input port output port
std::list<ProcessorPtr> processors; std::list<ProcessorPtr> processors;
PipelineResourcesHolder holder;
}; };
} }

View File

@ -9,6 +9,7 @@
#include <Processors/Transforms/ExtremesTransform.h> #include <Processors/Transforms/ExtremesTransform.h>
#include <Processors/Formats/IOutputFormat.h> #include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/NullSource.h> #include <Processors/Sources/NullSource.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Columns/ColumnConst.h> #include <Columns/ColumnConst.h>
namespace DB namespace DB
@ -98,18 +99,9 @@ static OutputPort * uniteTotals(const OutputPortRawPtrs & ports, const Block & h
return totals_port; return totals_port;
} }
Pipe::Holder & Pipe::Holder::operator=(Holder && rhs) void Pipe::addQueryPlan(std::unique_ptr<QueryPlan> plan)
{ {
table_locks.insert(table_locks.end(), rhs.table_locks.begin(), rhs.table_locks.end()); holder.query_plans.emplace_back(std::move(plan));
storage_holders.insert(storage_holders.end(), rhs.storage_holders.begin(), rhs.storage_holders.end());
interpreter_context.insert(interpreter_context.end(),
rhs.interpreter_context.begin(), rhs.interpreter_context.end());
for (auto & plan : rhs.query_plans)
query_plans.emplace_back(std::move(plan));
query_id_holder = std::move(rhs.query_id_holder);
return *this;
} }
Pipe::Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, OutputPort * extremes) Pipe::Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, OutputPort * extremes)
@ -689,6 +681,7 @@ void Pipe::addChains(std::vector<Chain> chains)
connect(*output_ports[i], chains[i].getInputPort()); connect(*output_ports[i], chains[i].getInputPort());
output_ports[i] = &chains[i].getOutputPort(); output_ports[i] = &chains[i].getOutputPort();
holder = chains[i].detachResources();
auto added_processors = Chain::getProcessors(std::move(chains[i])); auto added_processors = Chain::getProcessors(std::move(chains[i]));
for (auto & transform : added_processors) for (auto & transform : added_processors)
{ {

View File

@ -1,12 +1,10 @@
#pragma once #pragma once
#include <Processors/IProcessor.h> #include <Processors/IProcessor.h>
#include <Processors/QueryPlan/QueryIdHolder.h> #include <Processors/PipelineResourcesHolder.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Chain.h> #include <Processors/Chain.h>
#include <Access/EnabledQuota.h> #include <Access/EnabledQuota.h>
#include <DataStreams/SizeLimits.h> #include <DataStreams/SizeLimits.h>
#include <Storages/TableLockHolder.h>
namespace DB namespace DB
{ {
@ -18,9 +16,6 @@ using Pipes = std::vector<Pipe>;
class QueryPipeline; class QueryPipeline;
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
using OutputPortRawPtrs = std::vector<OutputPort *>; using OutputPortRawPtrs = std::vector<OutputPort *>;
/// Pipe is a set of processors which represents the part of pipeline. /// Pipe is a set of processors which represents the part of pipeline.
@ -118,29 +113,11 @@ public:
void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); } void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); }
void addQueryIdHolder(std::shared_ptr<QueryIdHolder> query_id_holder) { holder.query_id_holder = std::move(query_id_holder); } void addQueryIdHolder(std::shared_ptr<QueryIdHolder> query_id_holder) { holder.query_id_holder = std::move(query_id_holder); }
/// For queries with nested interpreters (i.e. StorageDistributed) /// For queries with nested interpreters (i.e. StorageDistributed)
void addQueryPlan(std::unique_ptr<QueryPlan> plan) { holder.query_plans.emplace_back(std::move(plan)); } void addQueryPlan(std::unique_ptr<QueryPlan> plan);
private: private:
/// Destruction order: processors, header, locks, temporary storages, local contexts /// Destruction order: processors, header, locks, temporary storages, local contexts
PipelineResourcesHolder holder;
struct Holder
{
Holder() = default;
Holder(Holder &&) = default;
/// Custom mode assignment does not destroy data from lhs. It appends data from rhs to lhs.
Holder& operator=(Holder &&);
/// Some processors may implicitly use Context or temporary Storage created by Interpreter.
/// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here,
/// because QueryPipeline is alive until query is finished.
std::vector<std::shared_ptr<const Context>> interpreter_context;
std::vector<StoragePtr> storage_holders;
std::vector<TableLockHolder> table_locks;
std::vector<std::unique_ptr<QueryPlan>> query_plans;
std::shared_ptr<QueryIdHolder> query_id_holder;
};
Holder holder;
/// Header is common for all output below. /// Header is common for all output below.
Block header; Block header;

View File

@ -0,0 +1,25 @@
#include <Processors/PipelineResourcesHolder.h>
#include <Processors/QueryPlan/QueryPlan.h>
namespace DB
{
PipelineResourcesHolder::PipelineResourcesHolder() = default;
PipelineResourcesHolder::PipelineResourcesHolder(PipelineResourcesHolder &&) = default;
PipelineResourcesHolder::~PipelineResourcesHolder() = default;
PipelineResourcesHolder & PipelineResourcesHolder::operator=(PipelineResourcesHolder && rhs)
{
table_locks.insert(table_locks.end(), rhs.table_locks.begin(), rhs.table_locks.end());
storage_holders.insert(storage_holders.end(), rhs.storage_holders.begin(), rhs.storage_holders.end());
interpreter_context.insert(interpreter_context.end(),
rhs.interpreter_context.begin(), rhs.interpreter_context.end());
for (auto & plan : rhs.query_plans)
query_plans.emplace_back(std::move(plan));
query_id_holder = std::move(rhs.query_id_holder);
return *this;
}
}

View File

@ -0,0 +1,34 @@
#pragma once
#include <Storages/TableLockHolder.h>
#include <Processors/QueryPlan/QueryIdHolder.h>
namespace DB
{
class QueryPipeline;
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
class QueryPlan;
class Context;
struct PipelineResourcesHolder
{
PipelineResourcesHolder();
PipelineResourcesHolder(PipelineResourcesHolder &&);
~PipelineResourcesHolder();
/// Custom mode assignment does not destroy data from lhs. It appends data from rhs to lhs.
PipelineResourcesHolder& operator=(PipelineResourcesHolder &&);
/// Some processors may implicitly use Context or temporary Storage created by Interpreter.
/// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here,
/// because QueryPipeline is alive until query is finished.
std::vector<std::shared_ptr<const Context>> interpreter_context;
std::vector<StoragePtr> storage_holders;
std::vector<TableLockHolder> table_locks;
std::vector<std::unique_ptr<QueryPlan>> query_plans;
std::shared_ptr<QueryIdHolder> query_id_holder;
};
}

View File

@ -21,6 +21,7 @@
#include <Processors/DelayedPortsProcessor.h> #include <Processors/DelayedPortsProcessor.h>
#include <Processors/RowsBeforeLimitCounter.h> #include <Processors/RowsBeforeLimitCounter.h>
#include <Processors/Sources/RemoteSource.h> #include <Processors/Sources/RemoteSource.h>
#include <Processors/QueryPlan/QueryPlan.h>
namespace DB namespace DB
{ {
@ -29,6 +30,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
void QueryPipeline::addQueryPlan(std::unique_ptr<QueryPlan> plan)
{
pipe.addQueryPlan(std::move(plan));
}
void QueryPipeline::checkInitialized() void QueryPipeline::checkInitialized()
{ {
if (!initialized()) if (!initialized())

View File

@ -123,7 +123,7 @@ public:
void addTableLock(TableLockHolder lock) { pipe.addTableLock(std::move(lock)); } void addTableLock(TableLockHolder lock) { pipe.addTableLock(std::move(lock)); }
void addInterpreterContext(std::shared_ptr<const Context> context) { pipe.addInterpreterContext(std::move(context)); } void addInterpreterContext(std::shared_ptr<const Context> context) { pipe.addInterpreterContext(std::move(context)); }
void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); } void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); }
void addQueryPlan(std::unique_ptr<QueryPlan> plan) { pipe.addQueryPlan(std::move(plan)); } void addQueryPlan(std::unique_ptr<QueryPlan> plan);
void setLimits(const StreamLocalLimits & limits) { pipe.setLimits(limits); } void setLimits(const StreamLocalLimits & limits) { pipe.setLimits(limits); }
void setLeafLimits(const SizeLimits & limits) { pipe.setLeafLimits(limits); } void setLeafLimits(const SizeLimits & limits) { pipe.setLeafLimits(limits); }
void setQuota(const std::shared_ptr<const EnabledQuota> & quota) { pipe.setQuota(quota); } void setQuota(const std::shared_ptr<const EnabledQuota> & quota) { pipe.setQuota(quota); }

View File

@ -1,4 +1,5 @@
#include <Processors/QueryPlan/CreatingSetsStep.h> #include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPipeline.h> #include <Processors/QueryPipeline.h>
#include <Processors/Transforms/CreatingSetsTransform.h> #include <Processors/Transforms/CreatingSetsTransform.h>
#include <IO/Operators.h> #include <IO/Operators.h>

View File

@ -1,4 +1,5 @@
#include <Processors/QueryPlan/ReadFromRemote.h> #include <Processors/QueryPlan/ReadFromRemote.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ExpressionStep.h> #include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h> #include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <DataStreams/RemoteQueryExecutor.h> #include <DataStreams/RemoteQueryExecutor.h>

View File

@ -98,6 +98,7 @@ static std::exception_ptr runStep(std::function<void()> step, ExceptionKeepingTr
} }
catch (Exception & exception) catch (Exception & exception)
{ {
// std::cerr << "===== got exception " << getExceptionMessage(exception, false);
if (runtime_data && !runtime_data->additional_exception_message.empty()) if (runtime_data && !runtime_data->additional_exception_message.empty())
exception.addMessage(runtime_data->additional_exception_message); exception.addMessage(runtime_data->additional_exception_message);
@ -105,6 +106,7 @@ static std::exception_ptr runStep(std::function<void()> step, ExceptionKeepingTr
} }
catch (...) catch (...)
{ {
// std::cerr << "===== got exception " << getExceptionMessage(std::current_exception(), false);
res = std::current_exception(); res = std::current_exception();
} }
@ -118,9 +120,10 @@ static std::exception_ptr runStep(std::function<void()> step, ExceptionKeepingTr
return res; return res;
} }
\
void ExceptionKeepingTransform::work() void ExceptionKeepingTransform::work()
{ {
// std::cerr << "============ Executing " << getName() << std::endl;
if (!was_on_start_called) if (!was_on_start_called)
{ {
was_on_start_called = true; was_on_start_called = true;
@ -138,6 +141,8 @@ void ExceptionKeepingTransform::work()
if (auto exception = runStep([this] { transform(data.chunk); }, runtime_data.get())) if (auto exception = runStep([this] { transform(data.chunk); }, runtime_data.get()))
{ {
// std::cerr << "===== got exception in " << getName() << std::endl;
// std::cerr << getExceptionMessage(exception, true) << std::endl;
has_exception = true; has_exception = true;
data.chunk.clear(); data.chunk.clear();
data.exception = std::move(exception); data.exception = std::move(exception);

View File

@ -37,7 +37,7 @@ using ExceptionKeepingTransformRuntimeDataPtr = std::shared_ptr<ExceptionKeeping
/// In case of exception, it is additionally pushed into pipeline. /// In case of exception, it is additionally pushed into pipeline.
class ExceptionKeepingTransform : public IProcessor class ExceptionKeepingTransform : public IProcessor
{ {
private: protected:
InputPort & input; InputPort & input;
OutputPort & output; OutputPort & output;
Port::Data data; Port::Data data;
@ -48,7 +48,7 @@ private:
bool was_on_start_called = false; bool was_on_start_called = false;
bool was_on_finish_called = false; bool was_on_finish_called = false;
protected: //protected:
virtual void transform(Chunk & chunk) = 0; virtual void transform(Chunk & chunk) = 0;
virtual void onStart() {} virtual void onStart() {}
virtual void onFinish() {} virtual void onFinish() {}

View File

@ -6,23 +6,33 @@ namespace DB
SquashingChunksTransform::SquashingChunksTransform( SquashingChunksTransform::SquashingChunksTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, bool reserve_memory) const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, bool reserve_memory)
: IAccumulatingTransform(header, header) : ExceptionKeepingTransform(header, header)
, squashing(min_block_size_rows, min_block_size_bytes, reserve_memory) , squashing(min_block_size_rows, min_block_size_bytes, reserve_memory)
{ {
} }
void SquashingChunksTransform::consume(Chunk chunk) void SquashingChunksTransform::transform(Chunk & chunk)
{ {
if (auto block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()))) if (auto block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())))
{ {
setReadyChunk(Chunk(block.getColumns(), block.rows())); chunk.setColumns(block.getColumns(), block.rows());
} }
} }
Chunk SquashingChunksTransform::generate() void SquashingChunksTransform::onFinish()
{ {
auto block = squashing.add({}); auto block = squashing.add({});
return Chunk(block.getColumns(), block.rows()); finish_chunk.setColumns(block.getColumns(), block.rows());
}
void SquashingChunksTransform::work()
{
ExceptionKeepingTransform::work();
if (finish_chunk)
{
data.chunk = std::move(finish_chunk);
ready_output = true;
}
} }
} }

View File

@ -1,11 +1,11 @@
#pragma once #pragma once
#include <Processors/IAccumulatingTransform.h> #include <Processors/Sinks/SinkToStorage.h>
#include <DataStreams/SquashingTransform.h> #include <DataStreams/SquashingTransform.h>
namespace DB namespace DB
{ {
class SquashingChunksTransform : public IAccumulatingTransform class SquashingChunksTransform : public ExceptionKeepingTransform
{ {
public: public:
explicit SquashingChunksTransform( explicit SquashingChunksTransform(
@ -13,12 +13,16 @@ public:
String getName() const override { return "SquashingTransform"; } String getName() const override { return "SquashingTransform"; }
void work() override;
protected: protected:
void consume(Chunk chunk) override; void transform(Chunk & chunk) override;
Chunk generate() override; void onFinish() override;
private: private:
SquashingTransform squashing; SquashingTransform squashing;
Chunk finish_chunk;
}; };
} }

View File

@ -11,6 +11,7 @@
#include <Parsers/ASTSetQuery.h> #include <Parsers/ASTSetQuery.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h> #include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Storages/AlterCommands.h> #include <Storages/AlterCommands.h>

View File

@ -26,6 +26,7 @@
#include <Processors/Transforms/ExpressionTransform.h> #include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Sources/SourceFromInputStream.h> #include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/SinkToStorage.h> #include <Processors/Sinks/SinkToStorage.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h> #include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h> #include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/UnionStep.h> #include <Processors/QueryPlan/UnionStep.h>

View File

@ -53,6 +53,7 @@
#include <Interpreters/getTableExpressions.h> #include <Interpreters/getTableExpressions.h>
#include <Functions/IFunction.h> #include <Functions/IFunction.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h> #include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h> #include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h> #include <Processors/QueryPlan/ReadFromPreparedSource.h>

View File

@ -22,6 +22,7 @@
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/checkStackSize.h> #include <Common/checkStackSize.h>
#include <Processors/Sources/SourceFromInputStream.h> #include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h> #include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/ExpressionStep.h> #include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h> #include <Processors/QueryPlan/BuildQueryPipelineSettings.h>

View File

@ -24,6 +24,7 @@
#include <Storages/MergeTree/MergeList.h> #include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/checkDataPart.h> #include <Storages/MergeTree/checkDataPart.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h> #include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h> #include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <optional> #include <optional>

View File

@ -42,6 +42,7 @@
#include <Parsers/ASTCheckQuery.h> #include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTSetQuery.h> #include <Parsers/ASTSetQuery.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h> #include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h> #include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>

View File

@ -16,6 +16,7 @@
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Processors/Transforms/MaterializingTransform.h> #include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ExpressionStep.h> #include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h> #include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h> #include <Processors/QueryPlan/BuildQueryPipelineSettings.h>