version with processors

This commit is contained in:
chertus 2020-01-20 17:17:55 +03:00
parent d9835979eb
commit 0669eff031
7 changed files with 149 additions and 9 deletions

View File

@ -44,7 +44,7 @@ Block ExpressionBlockInputStream::readImpl()
return res;
}
Block SplittingExpressionBlockInputStream::readImpl()
Block InflatingExpressionBlockInputStream::readImpl()
{
if (!initialized)
{

View File

@ -35,10 +35,10 @@ private:
};
/// ExpressionBlockInputStream that could generate many out blocks for single input block.
class SplittingExpressionBlockInputStream : public ExpressionBlockInputStream
class InflatingExpressionBlockInputStream : public ExpressionBlockInputStream
{
public:
SplittingExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_)
InflatingExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_)
: ExpressionBlockInputStream(input, expression_)
{}

View File

@ -75,6 +75,7 @@
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/InflatingExpressionTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
@ -1104,7 +1105,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType type)
{
bool on_totals = type == QueryPipeline::StreamType::Totals;
return std::make_shared<ExpressionTransform>(header, expressions.before_join, on_totals, default_totals);
return std::make_shared<InflatingExpressionTransform>(header, expressions.before_join, on_totals, default_totals);
});
}
else
@ -1112,7 +1113,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
header_before_join = pipeline.firstStream()->getHeader();
/// Applies to all sources except stream_with_non_joined_data.
for (auto & stream : pipeline.streams)
stream = std::make_shared<SplittingExpressionBlockInputStream>(stream, expressions.before_join);
stream = std::make_shared<InflatingExpressionBlockInputStream>(stream, expressions.before_join);
if (isMergeJoin(expressions.before_join->getTableJoinAlgo()) && settings.partial_merge_join_optimizations)
{

View File

@ -0,0 +1,102 @@
#include <Processors/Transforms/InflatingExpressionTransform.h>
#include <Interpreters/ExpressionAnalyzer.h>
namespace DB
{
/// Builds the output header of the transform: runs `expression` over a copy of the input
/// header (taken by value) and returns the modified copy.
/// NOTE(review): the literal `true` is passed as the second argument of ExpressionActions::execute;
/// its exact meaning is not visible here — confirm against that declaration.
static Block transformHeader(Block header, const ExpressionActionsPtr & expression)
{
    auto & actions = *expression;
    actions.execute(header, true);

    return header;
}
InflatingExpressionTransform::InflatingExpressionTransform(Block input_header, ExpressionActionsPtr expression_,
bool on_totals_, bool default_totals_)
: ISimpleTransform(input_header, transformHeader(input_header, expression_), on_totals_)
, expression(std::move(expression_))
, on_totals(on_totals_)
, default_totals(default_totals_)
{}
/// Processes the current chunk in place.
/// Mirrors ISimpleTransform::work() with one difference: `has_input` is cleared only when
/// there is no pending `not_processed` state left by the expression.
void InflatingExpressionTransform::work()
{
    /// A previously stored exception means there is nothing left to do.
    if (current_data.exception)
        return;

    try
    {
        transform(current_data.chunk);
    }
    catch (DB::Exception &)
    {
        /// Keep the exception in the port data instead of propagating it from here.
        current_data.exception = std::current_exception();
        transformed = true;
        has_input = false;
        return;
    }

    /// The only change from ISimpleTransform::work()
    /// NOTE(review): has_input presumably stays set so that work() is called again
    /// and readExec() can continue from `not_processed` — confirm against ISimpleTransform::prepare().
    if (!not_processed)
        has_input = false;

    if (current_data.chunk)
    {
        transformed = true;
    }
    else
    {
        if (!skip_empty_chunks)
            transformed = true;

        /// Support invariant that chunks must have the same number of columns as header.
        if (transformed)
            current_data.chunk = Chunk(getOutputPort().getHeader().cloneEmpty().getColumns(), 0);
    }
}
/// Applies the expression to `chunk`, replacing its columns with the result.
///  - On the very first call, if the expression reports that its result is always empty,
///    reading is stopped and `chunk` becomes a zero-row chunk built from the output header.
///  - On the totals path, `chunk` is left untouched when default totals are requested and
///    the expression has no totals in join; otherwise the totals are computed.
///  - On the regular path, the work is delegated to readExec().
void InflatingExpressionTransform::transform(Chunk & chunk)
{
    const bool first_call = !initialized;
    initialized = true;

    if (first_call && expression->resultIsAlwaysEmpty())
    {
        stopReading();
        chunk = Chunk(getOutputPort().getHeader().getColumns(), 0);
        return;
    }

    /// Nothing to compute: the incoming totals chunk is passed through as is.
    if (on_totals && default_totals && !expression->hasTotalsInJoin())
        return;

    Block block = on_totals ? readExecOnTotals(chunk) : readExec(chunk);

    /// Row count is taken before the columns are handed over to the chunk.
    const auto rows_produced = block.rows();
    chunk.setColumns(block.getColumns(), rows_produced);
}
/// Produces the next output block on the regular (non-totals) path.
/// If no `not_processed` state is pending, the columns of `chunk` are detached into a block
/// with the input header and the expression is executed on it (skipped when that block is empty).
/// Otherwise `chunk` is ignored and execution continues from the block stored in `not_processed`.
/// In both cases `not_processed` and `action_number` are passed by reference to the expression,
/// which may update them.
Block InflatingExpressionTransform::readExec(Chunk & chunk)
{
    Block result;

    if (likely(!not_processed))
    {
        result = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());

        /// An empty block is returned as is, without running the expression.
        if (!result)
            return result;
    }
    else
    {
        /// Take the pending block out of the continuation state before executing on it.
        result = std::move(not_processed->block);
    }

    expression->execute(result, not_processed, action_number);
    return result;
}
/// Produces the output block on the totals path: detaches the columns of `chunk` into a block
/// with the input header and runs ExpressionActions::executeOnTotals() on it.
Block InflatingExpressionTransform::readExecOnTotals(Chunk & chunk)
{
    const Block & input_header = getInputPort().getHeader();

    Block totals = input_header.cloneWithColumns(chunk.detachColumns());
    expression->executeOnTotals(totals);

    return totals;
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <Processors/ISimpleTransform.h>
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/// Simple transform that applies an expression to incoming chunks and keeps continuation
/// state (`not_processed`, `action_number`) between calls, so that processing of one input
/// can be resumed by a subsequent work() call instead of consuming new input.
class InflatingExpressionTransform : public ISimpleTransform
{
public:
    /// input_header — header of incoming chunks; expression_ — actions to apply;
    /// on_totals_ — process the chunk as totals; default_totals_ — consulted on the totals path.
    InflatingExpressionTransform(Block input_header, ExpressionActionsPtr expression_,
                                 bool on_totals_ = false, bool default_totals_ = false);

    String getName() const override { return "InflatingExpressionTransform"; }

    /// Overridden so that the input is not released while `not_processed` is set.
    void work() override;

protected:
    void transform(Chunk & chunk) override;

private:
    /// NOTE: members are declared in the order used by the constructor's initializer list.
    ExpressionActionsPtr expression;
    bool on_totals;
    bool default_totals;

    /// Set on the first transform() call; guards the one-time "result is always empty" check.
    bool initialized = false;

    /// Continuation state handed to ExpressionActions::execute by reference.
    /// When non-null, readExec() continues from `not_processed->block` instead of the input chunk.
    ExtraBlockPtr not_processed;

    /// Passed by reference to ExpressionActions::execute together with `not_processed`.
    /// NOTE(review): presumably the index of the action to resume from — confirm.
    size_t action_number = 0;

    /// Regular path: executes the expression on the chunk or continues from `not_processed`.
    Block readExec(Chunk & chunk);

    /// Totals path: executes the expression on the totals chunk.
    Block readExecOnTotals(Chunk & chunk);
};
}

View File

@ -3,7 +3,7 @@ defaults
10000000
10000000 10
errors
max_joined_block_size = 2000
max_joined_block_size_rows = 2000
10000000
10000000 10000
10000000 10

View File

@ -19,7 +19,7 @@ SELECT count(1), uniqExact(n) FROM (
USING k);
SELECT 'errors';
SET max_joined_block_size = 0;
SET max_joined_block_size_rows = 0;
SELECT count(1) FROM (
SELECT materialize(1) as k, n FROM numbers(10)
@ -31,8 +31,8 @@ SELECT count(1) FROM (
JOIN (SELECT materialize(1) AS k, number n FROM numbers(10000)) j
USING k); -- { serverError 241 }
SELECT 'max_joined_block_size = 2000';
SET max_joined_block_size = 2000;
SELECT 'max_joined_block_size_rows = 2000';
SET max_joined_block_size_rows = 2000;
SELECT count(1) FROM (
SELECT materialize(1) as k, n FROM numbers(10)