Merge pull request #7574 from ClickHouse/processors-4

Fix tests for processors
alexey-milovidov 2019-11-08 00:46:30 +03:00 committed by GitHub
commit a2e79cc427
23 changed files with 183 additions and 61 deletions

View File

@@ -146,6 +146,9 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
         }
     }

+    if (subquery.set)
+        subquery.set->finishInsert();
+
     if (table_out)
         table_out->writeSuffix();

View File

@@ -12,7 +12,7 @@ namespace DB
 class OneBlockInputStream : public IBlockInputStream
 {
 public:
-    explicit OneBlockInputStream(const Block & block_) : block(block_) {}
+    explicit OneBlockInputStream(Block block_) : block(std::move(block_)) { block.checkNumberOfRows(); }

     String getName() const override { return "One"; }

View File

@@ -1189,8 +1189,9 @@ bool ExpressionActions::checkColumnIsAlwaysFalse(const String & column_name) const
     /// Check has column in (empty set).
     String set_to_check;

-    for (auto & action : actions)
+    for (auto it = actions.rbegin(); it != actions.rend(); ++it)
     {
+        auto & action = *it;
         if (action.type == action.APPLY_FUNCTION && action.function_base)
         {
             auto name = action.function_base->getName();
@@ -1199,6 +1200,7 @@ bool ExpressionActions::checkColumnIsAlwaysFalse(const String & column_name) const
                 && action.argument_names.size() > 1)
             {
                 set_to_check = action.argument_names[1];
+                break;
             }
         }
     }
@@ -1212,7 +1214,7 @@ bool ExpressionActions::checkColumnIsAlwaysFalse(const String & column_name) const
             // Constant ColumnSet cannot be empty, so we only need to check non-constant ones.
             if (auto * column_set = checkAndGetColumn<const ColumnSet>(action.added_column.get()))
             {
-                if (column_set->getData()->getTotalRowCount() == 0)
+                if (column_set->getData()->isCreated() && column_set->getData()->getTotalRowCount() == 0)
                     return true;
             }
         }
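
The new isCreated() guard matters because checkColumnIsAlwaysFalse() can now run before a subquery set has been filled: a set that is merely empty *so far* must not be mistaken for a provably empty constant set. A minimal standalone sketch of the corrected condition (plain C++ stubs, not the ClickHouse Set class):

    #include <cassert>
    #include <cstddef>
    #include <vector>

    // Stub with the same sealing protocol the diff introduces on DB::Set.
    struct SetStub
    {
        std::vector<int> data;
        bool is_created = false;            // mirrors Set::is_created

        void finishInsert() { is_created = true; }
        bool isCreated() const { return is_created; }
        std::size_t getTotalRowCount() const { return data.size(); }
    };

    // Mirrors the corrected condition above: only a *finished* empty set
    // proves that "column IN set" is always false.
    bool columnIsAlwaysFalse(const SetStub & set)
    {
        return set.isCreated() && set.getTotalRowCount() == 0;
    }

    int main()
    {
        SetStub set;
        assert(!columnIsAlwaysFalse(set));  // still being filled: no conclusion yet
        set.finishInsert();
        assert(columnIsAlwaysFalse(set));   // sealed and empty: filter is always false
    }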

View File

@@ -249,6 +249,8 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
         if (!set->insertFromBlock(block))
             return;
     }
+
+    set->finishInsert();
     res.in->readSuffix();

     prepared_sets[set_key] = std::move(set);

View File

@@ -92,6 +92,7 @@
 #include <Processors/Transforms/FinishSortingTransform.h>
 #include <DataTypes/DataTypeAggregateFunction.h>
 #include <DataStreams/materializeBlock.h>
+#include <Processors/Pipe.h>

 namespace DB
@@ -1034,7 +1035,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
     if (options.only_analyze)
     {
         if constexpr (pipeline_with_processors)
-            pipeline.init({std::make_shared<NullSource>(source_header)});
+            pipeline.init(Pipe(std::make_shared<NullSource>(source_header)));
         else
             pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(source_header));
@@ -1075,7 +1076,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
     if (prepared_input)
     {
         if constexpr (pipeline_with_processors)
-            pipeline.init({std::make_shared<SourceFromInputStream>(prepared_input)});
+            pipeline.init(Pipe(std::make_shared<SourceFromInputStream>(prepared_input)));
         else
             pipeline.streams.push_back(prepared_input);
     }
@@ -1401,7 +1402,7 @@ void InterpreterSelectQuery::executeFetchColumns(
             auto istream = std::make_shared<OneBlockInputStream>(block_with_count);
             if constexpr (pipeline_with_processors)
-                pipeline.init({std::make_shared<SourceFromInputStream>(istream)});
+                pipeline.init(Pipe(std::make_shared<SourceFromInputStream>(istream)));
             else
                 pipeline.streams.emplace_back(istream);
             from_stage = QueryProcessingStage::WithMergeableState;
@@ -1666,9 +1667,19 @@ void InterpreterSelectQuery::executeFetchColumns(
         query_info.prewhere_info = prewhere_info;
         query_info.sorting_info = sorting_info;

-        auto streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams);
+        BlockInputStreams streams;
+        Pipes pipes;

-        if (streams.empty())
+        /// Will work with pipes directly if storage support processors.
+        /// Code is temporarily copy-pasted while moving to new pipeline.
+        bool use_pipes = pipeline_with_processors && storage->supportProcessorsPipeline();
+
+        if (use_pipes)
+            pipes = storage->readWithProcessors(required_columns, query_info, context, processing_stage, max_block_size, max_streams);
+        else
+            streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams);
+
+        if (streams.empty() && !use_pipes)
         {
             streams = {std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns))};
@@ -1691,9 +1702,36 @@ void InterpreterSelectQuery::executeFetchColumns(
             }
         }

+        /// Copy-paste from prev if.
+        if (pipes.empty() && use_pipes)
+        {
+            Pipe pipe(std::make_shared<NullSource>(storage->getSampleBlockForColumns(required_columns)));
+
+            if (query_info.prewhere_info)
+            {
+                pipe.addSimpleTransform(std::make_shared<FilterTransform>(
+                        pipe.getHeader(),
+                        prewhere_info->prewhere_actions,
+                        prewhere_info->prewhere_column_name,
+                        prewhere_info->remove_prewhere_column));
+
+                if (query_info.prewhere_info->remove_columns_actions)
+                    pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), query_info.prewhere_info->remove_columns_actions));
+            }
+
+            pipes.emplace_back(std::move(pipe));
+        }
+
         for (auto & stream : streams)
             stream->addTableLock(table_lock);

+        if constexpr (pipeline_with_processors)
+        {
+            /// Table lock is stored inside pipeline here.
+            if (use_pipes)
+                pipeline.addTableLock(table_lock);
+        }
+
         /// Set the limits and quota for reading data, the speed and time of the query.
         {
             IBlockInputStream::LocalLimits limits;
@@ -1728,11 +1766,21 @@ void InterpreterSelectQuery::executeFetchColumns(
                 if (options.to_stage == QueryProcessingStage::Complete)
                     stream->setQuota(quota);
             }
+
+            /// Copy-paste
+            for (auto & pipe : pipes)
+            {
+                if (!options.ignore_limits)
+                    pipe.setLimits(limits);
+
+                if (options.to_stage == QueryProcessingStage::Complete)
+                    pipe.setQuota(quota);
+            }
         }

         if constexpr (pipeline_with_processors)
         {
-            if (streams.size() == 1)
+            if (streams.size() == 1 || pipes.size() == 1)
                 pipeline.setMaxThreads(streams.size());

             /// Unify streams. They must have same headers.
@@ -1744,9 +1792,8 @@ void InterpreterSelectQuery::executeFetchColumns(
             if (first_header.columns() > 1 && first_header.has("_dummy"))
                 first_header.erase("_dummy");

-            for (size_t i = 0; i < streams.size(); ++i)
+            for (auto & stream : streams)
             {
-                auto & stream = streams[i];
                 auto header = stream->getHeader();
                 auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;

                 if (!blocksHaveEqualStructure(first_header, header))
@@ -1754,12 +1801,6 @@ void InterpreterSelectQuery::executeFetchColumns(
                 }
             }

-            Processors sources;
-            sources.reserve(streams.size());
-
-            /// Pin sources for merge tree tables.
-            bool pin_sources = dynamic_cast<const MergeTreeData *>(storage.get()) != nullptr;
-
             for (auto & stream : streams)
             {
                 bool force_add_agg_info = processing_stage == QueryProcessingStage::WithMergeableState;
@@ -1768,13 +1809,18 @@ void InterpreterSelectQuery::executeFetchColumns(
                 if (processing_stage == QueryProcessingStage::Complete)
                     source->addTotalsPort();

-                if (pin_sources)
-                    source->setStream(sources.size());
-
-                sources.emplace_back(std::move(source));
+                pipes.emplace_back(std::move(source));
             }

-            pipeline.init(std::move(sources));
+            /// Pin sources for merge tree tables.
+            bool pin_sources = dynamic_cast<const MergeTreeData *>(storage.get()) != nullptr;
+
+            if (pin_sources)
+            {
+                for (size_t i = 0; i < pipes.size(); ++i)
+                    pipes[i].pinSources(i);
+            }
+
+            pipeline.init(std::move(pipes));
         }
         else
             pipeline.streams = std::move(streams);
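
Taken together, the hunks above give executeFetchColumns() the following shape during the migration (a condensed, non-compilable restatement that uses only identifiers visible in this diff):

    BlockInputStreams streams;
    Pipes pipes;

    /// New path only when both the pipeline and the storage support processors.
    bool use_pipes = pipeline_with_processors && storage->supportProcessorsPipeline();

    if (use_pipes)
        pipes = storage->readWithProcessors(required_columns, query_info, context,
                                            processing_stage, max_block_size, max_streams);
    else
        streams = storage->read(required_columns, query_info, context,
                                processing_stage, max_block_size, max_streams);

    /// Each branch keeps its own "storage returned nothing" fallback: a single
    /// NullBlockInputStream, or a NullSource pipe with PREWHERE re-applied on top.
    /// Locks, limits and quotas are then applied to whichever container is in use,
    /// and for MergeTreeData each pipe is pinned to a preferred executor thread
    /// via pipes[i].pinSources(i) before pipeline.init(std::move(pipes)).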

View File

@@ -14,6 +14,7 @@
 #include <Processors/Sources/NullSource.h>
 #include <Processors/QueryPipeline.h>
+#include <Processors/Pipe.h>

 namespace DB
@@ -236,7 +237,7 @@ QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
     }

     if (!has_main_pipeline)
-        main_pipeline.init({ std::make_shared<NullSource>(getSampleBlock()) });
+        main_pipeline.init(Pipe(std::make_shared<NullSource>(getSampleBlock())));

     if (!pipelines.empty())
     {

View File

@@ -293,6 +293,7 @@ void Set::createFromAST(const DataTypes & types, ASTPtr node, const Context & context)

     Block block = header.cloneWithColumns(std::move(columns));
     insertFromBlock(block);
+    finishInsert();
 }

View File

@@ -56,6 +56,10 @@ public:
     /// Returns false, if some limit was exceeded and no need to insert more data.
     bool insertFromBlock(const Block & block);

+    /// Call after all blocks were inserted. To get the information that set is already created.
+    void finishInsert() { is_created = true; }
+
+    bool isCreated() const { return is_created; }
+
     /** For columns of 'block', check belonging of corresponding rows to the set.
       * Return UInt8 column with the result.
@@ -111,6 +115,9 @@ private:
     /// Do we need to additionally store all elements of the set in explicit form for subsequent use for index.
     bool fill_set_elements;

+    /// Check if set contains all the data.
+    bool is_created = false;
+
     /// If in the left part columns contains the same types as the elements of the set.
     void executeOrdinary(
         const ColumnRawPtrs & key_columns,
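
The pair of methods documents a fill protocol: feed blocks with insertFromBlock() until it returns false (a limit was exceeded), then seal the set with finishInsert(); readers use isCreated() to tell "empty" apart from "not filled yet". A standalone sketch of the protocol (stub types; the real class enforces its configured size limits rather than a constant):

    #include <cassert>
    #include <cstddef>
    #include <vector>

    struct Set
    {
        std::vector<int> rows;
        bool is_created = false;
        static constexpr std::size_t max_rows = 4;   // stand-in for the real limits

        bool insertFromBlock(const std::vector<int> & block)
        {
            rows.insert(rows.end(), block.begin(), block.end());
            return rows.size() <= max_rows;          // false: stop feeding blocks
        }
        void finishInsert() { is_created = true; }
        bool isCreated() const { return is_created; }
    };

    int main()
    {
        Set set;
        for (const auto & block : {std::vector<int>{1, 2}, std::vector<int>{3}})
            if (!set.insertFromBlock(block))
                break;

        set.finishInsert();   // the step this PR adds at every fill site
        assert(set.isCreated());
    }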

View File

@@ -44,8 +44,8 @@ static void checkSource(const IProcessor & source)
         throw Exception("Source for pipe should have single output, but it doesn't have any",
                 ErrorCodes::LOGICAL_ERROR);

-    if (source.getOutputs().size() != 1)
-        throw Exception("Source for pipe should have single output, but " + source.getName() + " has " +
+    if (source.getOutputs().size() > 2)
+        throw Exception("Source for pipe should have single or two outputs, but " + source.getName() + " has " +
                 toString(source.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
 }
@@ -54,6 +54,10 @@ Pipe::Pipe(ProcessorPtr source)
 {
     checkSource(*source);
     output_port = &source->getOutputs().front();
+
+    if (source->getOutputs().size() > 1)
+        totals = &source->getOutputs().back();
+
     processors.emplace_back(std::move(source));
 }
@@ -84,4 +88,31 @@ void Pipe::addSimpleTransform(ProcessorPtr transform)
     processors.emplace_back(std::move(transform));
 }

+void Pipe::setLimits(const ISourceWithProgress::LocalLimits & limits)
+{
+    for (auto & processor : processors)
+    {
+        if (auto * source_with_progress = dynamic_cast<ISourceWithProgress *>(processor.get()))
+            source_with_progress->setLimits(limits);
+    }
+}
+
+void Pipe::setQuota(QuotaForIntervals & quota)
+{
+    for (auto & processor : processors)
+    {
+        if (auto * source_with_progress = dynamic_cast<ISourceWithProgress *>(processor.get()))
+            source_with_progress->setQuota(quota);
+    }
+}
+
+void Pipe::pinSources(size_t executor_number)
+{
+    for (auto & processor : processors)
+    {
+        if (auto * source = dynamic_cast<ISource *>(processor.get()))
+            source->setStream(executor_number);
+    }
+}
+
 }

View File

@@ -1,4 +1,6 @@
 #pragma once
 #include <Processors/IProcessor.h>
+#include <Processors/Sources/SourceWithProgress.h>
+
 namespace DB
 {
@@ -6,6 +8,8 @@ namespace DB
 class Pipe;
 using Pipes = std::vector<Pipe>;

+class QuotaForIntervals;
+
 /// Pipe is a set of processors which represents the part of pipeline with single output.
 /// All processors in pipe are connected. All ports are connected except the output one.
 class Pipe
@@ -33,9 +37,20 @@ public:
     Processors detachProcessors() && { return std::move(processors); }

+    /// Specify quotas and limits for every ISourceWithProgress.
+    void setLimits(const SourceWithProgress::LocalLimits & limits);
+    void setQuota(QuotaForIntervals & quota);
+
+    /// Set information about preferred executor number for sources.
+    void pinSources(size_t executor_number);
+
+    void setTotalsPort(OutputPort * totals_) { totals = totals_; }
+    OutputPort * getTotalsPort() const { return totals; }
+
 private:
     Processors processors;
     OutputPort * output_port = nullptr;
+    OutputPort * totals = nullptr;
 };

 }
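
A hedged usage sketch of the extended Pipe API (fragment, not standalone: `header`, `actions`, `limits`, `quota` and `pipeline` stand for the values at the call sites earlier in this PR):

    Pipe pipe(std::make_shared<NullSource>(header));   // 1 output, or 2 when the source has totals

    pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), actions));

    pipe.setLimits(limits);   // forwarded to every ISourceWithProgress inside the pipe
    pipe.setQuota(quota);     // likewise, only sources that track progress are affected
    pipe.pinSources(0);       // hint: all ISource processors prefer executor stream 0

    Pipes pipes;
    pipes.emplace_back(std::move(pipe));
    pipeline.init(std::move(pipes));                   // pipeline takes ownership of the processors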

View File

@@ -14,7 +14,6 @@
 #include <Processors/Executors/PipelineExecutor.h>
 #include <Processors/Transforms/PartialSortingTransform.h>
 #include <Processors/Sources/SourceFromSingleChunk.h>
-#include <IO/WriteHelpers.h>
 #include <Interpreters/Context.h>
 #include <Common/typeid_cast.h>
@@ -48,36 +47,41 @@ void QueryPipeline::checkSource(const ProcessorPtr & source, bool can_have_totals)
                 toString(source->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
 }

-void QueryPipeline::init(Processors sources)
+void QueryPipeline::init(Pipe pipe)
+{
+    Pipes pipes;
+    pipes.emplace_back(std::move(pipe));
+    init(std::move(pipes));
+}
+
+void QueryPipeline::init(Pipes pipes)
 {
     if (initialized())
         throw Exception("Pipeline has already been initialized.", ErrorCodes::LOGICAL_ERROR);

-    if (sources.empty())
-        throw Exception("Can't initialize pipeline with empty source list.", ErrorCodes::LOGICAL_ERROR);
+    if (pipes.empty())
+        throw Exception("Can't initialize pipeline with empty pipes list.", ErrorCodes::LOGICAL_ERROR);

     std::vector<OutputPort *> totals;

-    for (auto & source : sources)
+    for (auto & pipe : pipes)
     {
-        checkSource(source, true);
-        auto & header = source->getOutputs().front().getHeader();
+        auto & header = pipe.getHeader();

         if (current_header)
             assertBlocksHaveEqualStructure(current_header, header, "QueryPipeline");
         else
             current_header = header;

-        if (source->getOutputs().size() > 1)
+        if (auto * totals_port = pipe.getTotalsPort())
         {
-            assertBlocksHaveEqualStructure(current_header, source->getOutputs().back().getHeader(), "QueryPipeline");
-            totals.emplace_back(&source->getOutputs().back());
+            assertBlocksHaveEqualStructure(current_header, totals_port->getHeader(), "QueryPipeline");
+            totals.emplace_back(totals_port);
         }

-        /// source->setStream(streams.size());
-        streams.emplace_back(&source->getOutputs().front());
-        processors.emplace_back(std::move(source));
+        streams.emplace_back(&pipe.getPort());
+        auto cur_processors = std::move(pipe).detachProcessors();
+        processors.insert(processors.end(), cur_processors.begin(), cur_processors.end());
     }

     if (!totals.empty())

View File

@@ -1,6 +1,7 @@
 #pragma once
 #include <Processors/IProcessor.h>
 #include <Processors/Executors/PipelineExecutor.h>
+#include <Processors/Pipe.h>

 #include <DataStreams/IBlockInputStream.h>
 #include <DataStreams/IBlockOutputStream.h>
@@ -11,7 +12,7 @@ namespace DB

 class TableStructureReadLock;
 using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
-using TableStructureReadLocks = std::vector<TableStructureReadLockPtr>;
+using TableStructureReadLocks = std::vector<TableStructureReadLockHolder>;

 class Context;
@@ -22,8 +23,9 @@ class QueryPipeline
 public:
     QueryPipeline() = default;

-    /// Each source must have single output port and no inputs. All outputs must have same header.
-    void init(Processors sources);
+    /// All pipes must have same header.
+    void init(Pipes pipes);
+    void init(Pipe pipe); /// Simple init for single pipe
     bool initialized() { return !processors.empty(); }

     enum class StreamType
@@ -72,7 +74,7 @@ public:

     const Block & getHeader() const { return current_header; }

-    void addTableLock(const TableStructureReadLockPtr & lock) { table_locks.push_back(lock); }
+    void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }

     /// For compatibility with IBlockInputStream.
     void setProgressCallback(const ProgressCallback & callback);
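
Sketch of the two init() overloads (fragment; `header` stands for whatever block the query produces):

    QueryPipeline pipeline;

    /// Single-pipe shortcut, as used for the NullSource fallbacks in this PR:
    pipeline.init(Pipe(std::make_shared<NullSource>(header)));

    /// General form: all pipes must carry the same header, and any totals
    /// ports are collected for later unification. init() throws if called
    /// twice or with an empty pipes list.
    // pipeline.init(std::move(pipes));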

View File

@@ -115,8 +115,11 @@ Chunk SourceFromInputStream::generate()

         if (auto totals_block = stream->getTotals())
         {
-            totals.setColumns(totals_block.getColumns(), 1);
-            has_totals = true;
+            if (totals_block.rows() == 1) /// Sometimes we can get empty totals. Skip it.
+            {
+                totals.setColumns(totals_block.getColumns(), 1);
+                has_totals = true;
+            }
         }

         is_stream_finished = true;

View File

@@ -141,6 +141,9 @@ void CreatingSetsTransform::work()
     auto finishCurrentSubquery = [&]()
     {
+        if (subquery.set)
+            subquery.set->finishInsert();
+
         if (table_out)
             table_out->writeSuffix();

View File

@@ -64,7 +64,8 @@ FilterTransform::FilterTransform(

 IProcessor::Status FilterTransform::prepare()
 {
-    if (constant_filter_description.always_false)
+    if (constant_filter_description.always_false
+        || expression->checkColumnIsAlwaysFalse(filter_column_name))
     {
         input.close();
         output.finish();
@@ -83,18 +84,6 @@ void FilterTransform::removeFilterIfNeed(Chunk & chunk)

 void FilterTransform::transform(Chunk & chunk)
 {
-    if (!initialized)
-    {
-        initialized = true;
-
-        /// Cannot check this in prepare. Because in prepare columns for set may be not created yet.
-        if (expression->checkColumnIsAlwaysFalse(filter_column_name))
-        {
-            stopReading();
-            chunk = Chunk(getOutputPort().getHeader().getColumns(), 0);
-            return;
-        }
-    }
-
     size_t num_rows_before_filtration = chunk.getNumRows();
     auto columns = chunk.detachColumns();

View File

@@ -36,8 +36,6 @@ private:
     /// Header after expression, but before removing filter column.
     Block transformed_header;

-    bool initialized = false;
-
     void removeFilterIfNeed(Chunk & chunk);
 };

View File

@@ -268,6 +268,8 @@ public:
         throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
     }

+    virtual bool supportProcessorsPipeline() const { return false; }
+
     /** Writes the data to a table.
       * Receives a description of the query, which can contain information about the data write method.
       * Returns an object by which you can write data sequentially.
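
The default-false virtual lets storages opt into the processors pipeline one at a time (StorageMergeTree and StorageReplicatedMergeTree below). A standalone model of the dispatch (stub types, not IStorage):

    #include <iostream>
    #include <memory>
    #include <vector>

    struct Storage
    {
        virtual ~Storage() = default;
        // Mirrors IStorage::supportProcessorsPipeline(): false unless overridden.
        virtual bool supportProcessorsPipeline() const { return false; }
        virtual std::vector<int> read() const { return {1, 2, 3}; }          // old-style streams
        virtual std::vector<int> readWithProcessors() const { return {}; }   // new-style pipes
    };

    struct MergeTreeLike : Storage
    {
        bool supportProcessorsPipeline() const override { return true; }
        std::vector<int> readWithProcessors() const override { return {4, 5, 6}; }
    };

    int main()
    {
        std::unique_ptr<Storage> storage = std::make_unique<MergeTreeLike>();

        // Same dispatch shape as executeFetchColumns() earlier in this diff.
        auto data = storage->supportProcessorsPipeline() ? storage->readWithProcessors()
                                                         : storage->read();
        for (int x : data)
            std::cout << x << ' ';
    }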

View File

@@ -55,6 +55,7 @@ private:
     HashJoinPtr join;

     void insertBlock(const Block & block) override;
+    void finishInsert() override {}
     size_t getSize() const override;

 protected:

View File

@@ -45,6 +45,8 @@ public:
         size_t max_block_size,
         unsigned num_streams) override;

+    bool supportProcessorsPipeline() const override { return true; }
+
     std::optional<UInt64> totalRows() const override;

     BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;

View File

@@ -97,6 +97,8 @@ public:
         size_t max_block_size,
         unsigned num_streams) override;

+    bool supportProcessorsPipeline() const override { return true; }
+
     std::optional<UInt64> totalRows() const override;

     BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;

View File

@@ -70,6 +70,7 @@ void SetOrJoinBlockOutputStream::write(const Block & block)

 void SetOrJoinBlockOutputStream::writeSuffix()
 {
+    table.finishInsert();
     backup_stream.flush();
     compressed_backup_buf.next();
     backup_buf.next();
@@ -123,6 +124,7 @@ StorageSet::StorageSet(

 void StorageSet::insertBlock(const Block & block) { set->insertFromBlock(block); }
+void StorageSet::finishInsert() { set->finishInsert(); }
 size_t StorageSet::getSize() const { return set->getTotalRowCount(); }
@@ -180,8 +182,11 @@ void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
     NativeBlockInputStream backup_stream(compressed_backup_buf, 0);

     backup_stream.readPrefix();
+
     while (Block block = backup_stream.read())
         insertBlock(block);
+
+    finishInsert();
     backup_stream.readSuffix();

     /// TODO Add speed, compressed bytes, data volume in memory, compression ratio ... Generalize all statistics logging in project.
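
The finishInsert() after the restore loop matters across restarts: StorageSet persists its blocks on disk and replays them at startup, and without re-sealing, readers that gate on isCreated() (like checkColumnIsAlwaysFalse above) would treat the restored set as perpetually unfinished. A standalone sketch of the invariant (stub types):

    #include <cassert>
    #include <vector>

    struct SetStub
    {
        std::vector<int> rows;
        bool is_created = false;
        void insertBlock(int row) { rows.push_back(row); }
        void finishInsert() { is_created = true; }
        bool isCreated() const { return is_created; }
    };

    SetStub restoreFromFile(const std::vector<int> & backup)
    {
        SetStub set;
        for (int row : backup)      // replay the persisted blocks
            set.insertBlock(row);
        set.finishInsert();         // the call this hunk adds: seal after replay
        return set;
    }

    int main()
    {
        auto set = restoreFromFile({1, 2, 3});
        assert(set.isCreated());    // restored sets look exactly like freshly built ones
    }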

View File

@@ -50,6 +50,8 @@ private:
     /// Insert the block into the state.
     virtual void insertBlock(const Block & block) = 0;
+    /// Call after all blocks were inserted.
+    virtual void finishInsert() = 0;
     virtual size_t getSize() const = 0;
 };

@@ -75,6 +77,7 @@ private:
     SetPtr set;

     void insertBlock(const Block & block) override;
+    void finishInsert() override;
     size_t getSize() const override;

 protected:

View File

@@ -1,6 +1,6 @@
 SET max_bytes_before_external_group_by = 200000000;
-SET max_memory_usage = 1000000000;
+SET max_memory_usage = 1500000000;
 SET max_threads = 12;

 SELECT URL, uniq(SearchPhrase) AS u FROM test.hits GROUP BY URL ORDER BY u DESC, URL LIMIT 10;