Merge pull request #11243 from ClickHouse/remove-experimental-use-processors-flag-4

Remove some code.
alexey-milovidov 2020-05-30 21:05:14 +03:00 committed by GitHub
commit 3eea042d16
30 changed files with 175 additions and 1141 deletions
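
The recurring change throughout this commit replaces direct use of `BlockIO::in` with `BlockIO::getInputStream()`, so call sites no longer care whether an interpreter produced an old-style input stream or a processors-based `QueryPipeline`. Below is a minimal, self-contained sketch of that accessor pattern using toy stand-in types; the real `BlockIO` lives in `DataStreams/BlockIO.h` and its exact fallback logic is not shown in this diff, so the conversion step here is an assumption for illustration only.

```cpp
#include <memory>
#include <stdexcept>

// Toy stand-ins for IBlockInputStream and QueryPipeline; the real ClickHouse
// classes are far richer, this only models the accessor pattern.
struct ToyInputStream { /* read(), getHeader(), ... */ };

struct ToyPipeline
{
    bool initialized() const { return has_processors; }
    // Assumed conversion step: in ClickHouse this would go through a
    // pipeline-executing stream; here we just fabricate a stream.
    std::shared_ptr<ToyInputStream> convertToInputStream() { return std::make_shared<ToyInputStream>(); }
    bool has_processors = false;
};

struct ToyBlockIO
{
    std::shared_ptr<ToyInputStream> in;   /// old-style stream, may be empty
    ToyPipeline pipeline;                 /// processors pipeline, may be empty

    /// Callers use this instead of touching `in` directly: return the stream
    /// if one was built, otherwise adapt the pipeline.
    std::shared_ptr<ToyInputStream> getInputStream()
    {
        if (in)
            return in;
        if (pipeline.initialized())
            return pipeline.convertToInputStream();
        throw std::logic_error("BlockIO has neither stream nor pipeline");
    }
};
```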


@ -1293,7 +1293,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
local_context.setSettings(task_cluster->settings_pull);
local_context.setSetting("skip_unavailable_shards", true);
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_select_ast, local_context)->execute().in);
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_select_ast, local_context)->execute().getInputStream());
count = (block) ? block.safeGetByPosition(0).column->getUInt(0) : 0;
}
@ -1403,7 +1403,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
BlockIO io_select = InterpreterFactory::get(query_select_ast, context_select)->execute();
BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute();
input = io_select.in;
input = io_select.getInputStream();
output = io_insert.out;
}
@ -1690,7 +1690,7 @@ std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti
Context local_context = context;
local_context.setSettings(task_cluster->settings_pull);
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().in);
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().getInputStream());
std::set<String> res;
if (block)
@ -1735,7 +1735,7 @@ const auto & settings = context.getSettingsRef();
Context local_context = context;
local_context.setSettings(task_cluster->settings_pull);
return InterpreterFactory::get(query_ast, local_context)->execute().in->read().rows() != 0;
return InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()->read().rows() != 0;
}
bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTimeouts & timeouts,
@ -1774,7 +1774,7 @@ bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTi
Context local_context = context;
local_context.setSettings(task_cluster->settings_pull);
auto result = InterpreterFactory::get(query_ast, local_context)->execute().in->read().rows();
auto result = InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()->read().rows();
if (result != 0)
LOG_DEBUG(log, "Partition {} piece number {} is PRESENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
else


@ -13,6 +13,7 @@ limitations under the License. */
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Transforms/AggregatingTransform.h>
namespace DB
@ -38,7 +39,12 @@ protected:
Block res = *it;
++it;
return Chunk(res.getColumns(), res.rows());
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = res.info.bucket_num;
info->is_overflows = res.info.is_overflows;
return Chunk(res.getColumns(), res.rows(), std::move(info));
}
private:
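
The hunk above stops dropping aggregation metadata: instead of building a bare `Chunk`, it attaches an `AggregatedChunkInfo` carrying `bucket_num` and `is_overflows`. A simplified, self-contained sketch of carrying per-chunk metadata follows; the types are toy models, not the real `Chunk`/`ChunkInfo`, and the field meanings in the comments are assumptions for illustration.

```cpp
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Toy model of a chunk that can carry extra per-chunk info, mirroring how the
// diff above attaches AggregatedChunkInfo (bucket_num, is_overflows) to a Chunk.
struct ChunkInfoBase { virtual ~ChunkInfoBase() = default; };

struct ToyAggregatedChunkInfo : ChunkInfoBase
{
    int32_t bucket_num = -1;      /// assumed: two-level aggregation bucket, -1 if single-level
    bool is_overflows = false;    /// assumed: marks the "overflow rows" block
};

struct ToyChunk
{
    std::vector<int> columns;                   /// stand-in for real columns
    size_t num_rows = 0;
    std::shared_ptr<const ChunkInfoBase> info;  /// optional metadata

    ToyChunk(std::vector<int> columns_, size_t rows_, std::shared_ptr<const ChunkInfoBase> info_ = nullptr)
        : columns(std::move(columns_)), num_rows(rows_), info(std::move(info_)) {}
};

ToyChunk makeChunkWithAggregationInfo(std::vector<int> columns, size_t rows, int32_t bucket, bool overflows)
{
    auto info = std::make_shared<ToyAggregatedChunkInfo>();
    info->bucket_num = bucket;
    info->is_overflows = overflows;
    /// Without the info, downstream merging steps would treat every chunk
    /// as a plain single-level, non-overflow block.
    return ToyChunk(std::move(columns), rows, std::move(info));
}
```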


@ -274,7 +274,7 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
StorageValues::create(
storage->getStorageID(), storage->getColumns(), block, storage->getVirtuals()));
select.emplace(view.query, local_context, SelectQueryOptions());
in = std::make_shared<MaterializingBlockInputStream>(select->execute().in);
in = std::make_shared<MaterializingBlockInputStream>(select->execute().getInputStream());
/// Squashing is needed here because the materialized view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
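
The comment above explains why a squashing step follows the materialized-view SELECT: one inserted block can fan out into many tiny blocks. A self-contained sketch of the idea, accumulating rows until a threshold before emitting, is shown below; the types are toy models and the threshold logic is an assumption, not the real ClickHouse squashing stream.

```cpp
#include <cstddef>
#include <optional>

/// Toy "block": just a row count here; real blocks carry columns.
struct ToyBlock { size_t rows = 0; };

/// Accumulates small blocks and only emits once enough rows are collected,
/// mirroring why a squashing step follows the view's SELECT in the diff above.
class ToySquasher
{
public:
    explicit ToySquasher(size_t min_rows_) : min_rows(min_rows_) {}

    /// Returns a squashed block once the threshold is reached, otherwise nothing.
    std::optional<ToyBlock> add(const ToyBlock & block)
    {
        accumulated.rows += block.rows;
        if (accumulated.rows < min_rows)
            return std::nullopt;
        ToyBlock out = accumulated;
        accumulated = {};
        return out;
    }

    /// Flush whatever is left at end of stream.
    std::optional<ToyBlock> finish()
    {
        if (accumulated.rows == 0)
            return std::nullopt;
        ToyBlock out = accumulated;
        accumulated = {};
        return out;
    }

private:
    size_t min_rows;
    ToyBlock accumulated;
};
```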


@ -706,7 +706,7 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
{
auto interpreter = interpretSubquery(right_in_operand, data.context, data.subquery_depth, {});
subquery_for_set.source = std::make_shared<LazyBlockInputStream>(
interpreter->getSampleBlock(), [interpreter]() mutable { return interpreter->execute().in; });
interpreter->getSampleBlock(), [interpreter]() mutable { return interpreter->execute().getInputStream(); });
/** Why is LazyBlockInputStream used?
*
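
Both here and in SubqueryForSet::makeSource further down, the interpreter is captured in a lambda and only executed when the stream is first read, which is what LazyBlockInputStream provides. A minimal self-contained sketch of that deferred-creation pattern, with toy types rather than the real LazyBlockInputStream:

```cpp
#include <functional>
#include <memory>

struct ToyStream
{
    virtual ~ToyStream() = default;
    virtual int read() = 0;    /// 0 means "end of data" in this toy model
};

/// Defers building the underlying stream until the first read() call, so an
/// expensive subquery is not executed unless its result is actually needed.
class ToyLazyStream : public ToyStream
{
public:
    using Creator = std::function<std::shared_ptr<ToyStream>()>;

    explicit ToyLazyStream(Creator creator_) : creator(std::move(creator_)) {}

    int read() override
    {
        if (!inner)
            inner = creator();   /// e.g. interpreter->execute().getInputStream()
        return inner->read();
    }

private:
    Creator creator;
    std::shared_ptr<ToyStream> inner;
};
```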


@ -15,6 +15,7 @@
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
namespace ProfileEvents
{
@ -70,35 +71,14 @@ SelectStreamFactory::SelectStreamFactory(
namespace
{
Pipe createLocalStream(
const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage,
bool add_totals_port, bool add_extremes_port, bool force_tree_shaped_pipeline)
QueryPipeline createLocalStream(
const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage)
{
checkStackSize();
InterpreterSelectQuery interpreter{query_ast, context, SelectQueryOptions(processed_stage)};
if (force_tree_shaped_pipeline)
{
/// This flag means that pipeline must be tree-shaped,
/// so we can't enable processors for InterpreterSelectQuery here.
auto stream = interpreter.execute().in;
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
if (add_totals_port)
source->addTotalsPort();
if (add_extremes_port)
source->addExtremesPort();
Pipe pipe(std::move(source));
pipe.addSimpleTransform(std::make_shared<ConvertingTransform>(
pipe.getHeader(), header, ConvertingTransform::MatchColumnsMode::Name));
return pipe;
}
auto pipeline = interpreter.executeWithProcessors();
auto pipeline = interpreter.execute().pipeline;
pipeline.addSimpleTransform([&](const Block & source_header)
{
@ -116,7 +96,8 @@ Pipe createLocalStream(
*/
/// return std::make_shared<MaterializingBlockInputStream>(stream);
return std::move(pipeline).getPipe();
pipeline.setMaxThreads(1);
return pipeline;
}
String formattedAST(const ASTPtr & ast)
@ -134,7 +115,7 @@ void SelectStreamFactory::createForShard(
const Cluster::ShardInfo & shard_info,
const String &, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
const SelectQueryInfo & query_info,
const SelectQueryInfo &,
Pipes & res)
{
bool force_add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
@ -152,8 +133,7 @@ void SelectStreamFactory::createForShard(
auto emplace_local_stream = [&]()
{
res.emplace_back(createLocalStream(modified_query_ast, header, context, processed_stage,
add_totals_port, add_extremes_port, query_info.force_tree_shaped_pipeline));
res.emplace_back(createLocalStream(modified_query_ast, header, context, processed_stage).getPipe());
};
String modified_query = formattedAST(modified_query_ast);
@ -266,7 +246,7 @@ void SelectStreamFactory::createForShard(
auto lazily_create_stream = [
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, context, throttler,
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
stage = processed_stage, local_delay, add_totals_port, add_extremes_port]()
stage = processed_stage, local_delay]()
-> BlockInputStreamPtr
{
auto current_settings = context.getSettingsRef();
@ -297,8 +277,8 @@ void SelectStreamFactory::createForShard(
}
if (try_results.empty() || local_delay < max_remote_delay)
return std::make_shared<TreeExecutorBlockInputStream>(
createLocalStream(modified_query_ast, header, context, stage, add_totals_port, add_extremes_port, true));
return std::make_shared<PipelineExecutingBlockInputStream>(
createLocalStream(modified_query_ast, header, context, stage));
else
{
std::vector<IConnectionPool::Entry> connections;
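
When the local replica wins, the new code wraps the `QueryPipeline` from `createLocalStream` in a `PipelineExecutingBlockInputStream` instead of the old `TreeExecutorBlockInputStream`: an adapter that lets pipeline results be consumed through the legacy pull-based stream interface. A rough self-contained sketch of such an adapter with toy types; the real class drives the pipeline with a pulling executor, which is only modelled here.

```cpp
#include <memory>
#include <optional>
#include <utility>
#include <vector>

/// Toy pipeline: produces a fixed sequence of "blocks" (row counts here).
struct ToyPipeline
{
    std::vector<size_t> blocks;
};

/// Toy pulling executor over the pipeline.
class ToyPullingExecutor
{
public:
    explicit ToyPullingExecutor(ToyPipeline pipeline_) : pipeline(std::move(pipeline_)) {}

    std::optional<size_t> pull()
    {
        if (pos >= pipeline.blocks.size())
            return std::nullopt;
        return pipeline.blocks[pos++];
    }

private:
    ToyPipeline pipeline;
    size_t pos = 0;
};

/// Adapter exposing the old stream-style read() on top of a pipeline,
/// loosely modelling what PipelineExecutingBlockInputStream is used for above.
class ToyPipelineExecutingStream
{
public:
    explicit ToyPipelineExecutingStream(ToyPipeline pipeline_) : executor(std::move(pipeline_)) {}

    /// Returns 0 when the pipeline is exhausted, mimicking an empty Block.
    size_t read()
    {
        auto chunk = executor.pull();
        return chunk ? *chunk : 0;
    }

private:
    ToyPullingExecutor executor;
};
```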


@ -111,11 +111,11 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
}
else
{
BlockIO res = interpreter.execute();
auto stream = interpreter.execute().getInputStream();
try
{
block = res.in->read();
block = stream->read();
if (!block)
{
@ -126,7 +126,7 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
return;
}
if (block.rows() != 1 || res.in->read())
if (block.rows() != 1 || stream->read())
throw Exception("Scalar subquery returned more than one row", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
}
catch (const Exception & e)
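
The rewritten scalar-subquery visitor reads one block from the stream and then reads again to make sure nothing else follows; more than one row means the subquery is not scalar and INCORRECT_RESULT_OF_SCALAR_SUBQUERY is thrown. A compact self-contained sketch of that check with a toy stream type; the diff handles the empty-result case in its own branch, and this toy just signals it.

```cpp
#include <deque>
#include <stdexcept>

/// Toy stream of "blocks"; each block is just a row count, 0 meaning "no block".
struct ToyStream
{
    std::deque<size_t> blocks;

    size_t read()
    {
        if (blocks.empty())
            return 0;
        size_t b = blocks.front();
        blocks.pop_front();
        return b;
    }
};

/// Returns the scalar block's row count, enforcing "exactly one row".
size_t readScalarResult(ToyStream & stream)
{
    size_t block = stream.read();
    if (block == 0)
        throw std::runtime_error("Scalar subquery returned empty result");
    /// Either more than one row in the first block, or a second block exists.
    if (block != 1 || stream.read() != 0)
        throw std::runtime_error("Scalar subquery returned more than one row");
    return block;
}
```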


@ -297,13 +297,13 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
}
auto interpreter_subquery = interpretSubquery(subquery_or_table_name, context, {}, query_options);
BlockIO res = interpreter_subquery->execute();
auto stream = interpreter_subquery->execute().getInputStream();
SetPtr set = std::make_shared<Set>(settings.size_limits_for_set, true, context.getSettingsRef().transform_null_in);
set->setHeader(res.in->getHeader());
set->setHeader(stream->getHeader());
res.in->readPrefix();
while (Block block = res.in->read())
stream->readPrefix();
while (Block block = stream->read())
{
/// If the limits have been exceeded, give up and let the default subquery processing actions take place.
if (!set->insertFromBlock(block))
@ -311,7 +311,7 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
}
set->finishInsert();
res.in->readSuffix();
stream->readSuffix();
prepared_sets[set_key] = std::move(set);
}
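
The index-set path above now drives the stream directly: readPrefix(), a read() loop that feeds blocks into the Set and gives up if the size limits are exceeded, then readSuffix(). A self-contained sketch of that open/loop/close protocol with toy types; the real Set, SizeLimits, and insertFromBlock semantics are only approximated here.

```cpp
#include <cstddef>
#include <deque>
#include <unordered_set>

/// Toy stream of integer "blocks" (0 = end of data), with the prefix/suffix protocol.
struct ToyStream
{
    std::deque<int> blocks;
    bool open = false;

    void readPrefix() { open = true; }
    int read()
    {
        if (!open || blocks.empty())
            return 0;
        int b = blocks.front();
        blocks.pop_front();
        return b;
    }
    void readSuffix() { open = false; }
};

/// Toy set with a size limit, standing in for Set + SizeLimits.
struct ToySet
{
    std::unordered_set<int> values;
    size_t max_size;

    explicit ToySet(size_t max_size_) : max_size(max_size_) {}

    /// Returns false when the limit is exceeded, mirroring how the diff gives up
    /// when insertFromBlock reports that the limits were hit.
    bool insert(int value)
    {
        values.insert(value);
        return values.size() <= max_size;
    }
};

/// Mirrors the loop in the diff: return false if limits are hit, so the
/// default subquery processing can take over instead.
bool buildSetFromStream(ToyStream & stream, ToySet & set)
{
    stream.readPrefix();
    while (int block = stream.read())
    {
        if (!set.insert(block))
        {
            stream.readSuffix();
            return false;
        }
    }
    stream.readSuffix();
    return true;
}
```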


@ -134,7 +134,7 @@ public:
ast = database_and_table_name;
external_tables[external_table_name] = external_storage_holder;
subqueries_for_sets[external_table_name].source = interpreter->execute().in;
subqueries_for_sets[external_table_name].source = interpreter->execute().getInputStream();
subqueries_for_sets[external_table_name].table = external_storage;
/** NOTE If it was written IN tmp_table - the existing temporary (but not external) table,


@ -2,14 +2,8 @@
#include <DataStreams/BlockIO.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/** Interpreters interface for different queries.
*/
@ -22,14 +16,10 @@ public:
*/
virtual BlockIO execute() = 0;
virtual QueryPipeline executeWithProcessors() { throw Exception("executeWithProcessors not implemented", ErrorCodes::NOT_IMPLEMENTED); }
virtual bool canExecuteWithProcessors() const { return false; }
virtual bool ignoreQuota() const { return false; }
virtual bool ignoreLimits() const { return false; }
virtual ~IInterpreter() {}
virtual ~IInterpreter() = default;
};
}


@ -204,7 +204,7 @@ BlockIO InterpreterInsertQuery::execute()
{
/// Passing 1 as subquery_depth will disable limiting size of intermediate result.
InterpreterSelectWithUnionQuery interpreter_select{ query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res.pipeline = interpreter_select.executeWithProcessors();
res = interpreter_select.execute();
if (table->supportsParallelInsert() && settings.max_insert_threads > 1)
out_streams_size = std::min(size_t(settings.max_insert_threads), res.pipeline.getNumStreams());

File diff suppressed because it is too large.


@ -77,12 +77,6 @@ public:
/// Execute a query. Get the stream of blocks to read.
BlockIO execute() override;
/// Execute the query and return multiple streams for parallel processing.
BlockInputStreams executeWithMultipleStreams(QueryPipeline & parent_pipeline);
QueryPipeline executeWithProcessors() override;
bool canExecuteWithProcessors() const override { return true; }
bool ignoreLimits() const override { return options.ignore_limits; }
bool ignoreQuota() const override { return options.ignore_quota; }
@ -108,89 +102,15 @@ private:
Block getSampleBlockImpl();
struct Pipeline
{
/** Streams of data.
* The source data streams are produced in the executeFetchColumns function.
* Then they are converted (wrapped in other streams) using the `execute*` functions,
* to get the whole pipeline running the query.
*/
BlockInputStreams streams;
/** When executing FULL or RIGHT JOIN, there will be a data stream from which you can read "not joined" rows.
* It has a special meaning, since reading from it should be done after reading from the main streams.
* It is appended to the main streams in UnionBlockInputStream or ParallelAggregatingBlockInputStream.
*/
BlockInputStreamPtr stream_with_non_joined_data;
bool union_stream = false;
/// Cache value of InterpreterSelectQuery::max_streams
size_t max_threads = 1;
BlockInputStreamPtr & firstStream() { return streams.at(0); }
template <typename Transform>
void transform(Transform && transformation)
{
for (auto & stream : streams)
transformation(stream);
if (stream_with_non_joined_data)
transformation(stream_with_non_joined_data);
}
bool hasMoreThanOneStream() const
{
return streams.size() + (stream_with_non_joined_data ? 1 : 0) > 1;
}
/// Resulting stream is mix of other streams data. Distinct and/or order guaranties are broken.
bool hasMixedStreams() const
{
return hasMoreThanOneStream() || union_stream;
}
bool hasDelayedStream() const { return stream_with_non_joined_data != nullptr; }
bool initialized() const { return !streams.empty(); }
/// Compatibility with QueryPipeline (Processors)
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
size_t getNumThreads() const { return max_threads; }
};
template <typename TPipeline>
void executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe, QueryPipeline & save_context_and_storage);
void executeImpl(QueryPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe);
/// Different stages of query execution.
/// dry_run - don't read from table, use empty header block instead.
void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run);
template <typename TPipeline>
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
void executeFetchColumns(
QueryProcessingStage::Enum processing_stage,
QueryPipeline & pipeline,
const PrewhereInfoPtr & prewhere_info,
const Names & columns_to_remove_after_prewhere,
QueryPipeline & save_context_and_storage);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression);
static void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(Pipeline & pipeline, InputSortingInfoPtr sorting_info);
void executeWithFill(Pipeline & pipeline);
void executeMergeSorted(Pipeline & pipeline);
void executePreLimit(Pipeline & pipeline);
void executeUnion(Pipeline & pipeline, Block header);
void executeLimitBy(Pipeline & pipeline);
void executeLimit(Pipeline & pipeline);
void executeOffset(Pipeline & pipeline);
static void executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeDistinct(Pipeline & pipeline, bool before_order, Names columns);
void executeExtremes(Pipeline & pipeline);
void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, const std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit);
const Names & columns_to_remove_after_prewhere);
void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
@ -213,17 +133,12 @@ private:
String generateFilterActions(ExpressionActionsPtr & actions, const ASTPtr & row_policy_filter, const Names & prerequisite_columns = {}) const;
/// Add ConvertingBlockInputStream to specified header.
static void unifyStreams(Pipeline & pipeline, Block header);
enum class Modificator
{
ROLLUP = 0,
CUBE = 1
};
void executeRollupOrCube(Pipeline & pipeline, Modificator modificator);
void executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator);
/** If there is a SETTINGS section in the SELECT query, then apply settings from it.


@ -3,15 +3,9 @@
#include <Interpreters/Context.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <Columns/getLeastSuperColumn.h>
#include <Columns/ColumnConst.h>
#include <Common/typeid_cast.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTExpressionList.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPipeline.h>
@ -180,69 +174,10 @@ Block InterpreterSelectWithUnionQuery::getSampleBlock(
}
BlockInputStreams InterpreterSelectWithUnionQuery::executeWithMultipleStreams(QueryPipeline & parent_pipeline)
{
BlockInputStreams nested_streams;
for (auto & interpreter : nested_interpreters)
{
BlockInputStreams streams = interpreter->executeWithMultipleStreams(parent_pipeline);
nested_streams.insert(nested_streams.end(), streams.begin(), streams.end());
}
/// Unify data structure.
if (nested_interpreters.size() > 1)
{
for (auto & stream : nested_streams)
stream = std::make_shared<ConvertingBlockInputStream>(stream, result_header,ConvertingBlockInputStream::MatchColumnsMode::Position);
parent_pipeline.addInterpreterContext(context);
}
/// Update max_streams due to:
/// - max_distributed_connections for Distributed() engine
/// - max_streams_to_max_threads_ratio
///
/// XXX: res.pipeline.getMaxThreads() cannot be used since it is capped to
/// number of streams, which is empty for non-Processors case.
max_streams = (*std::min_element(nested_interpreters.begin(), nested_interpreters.end(), [](const auto &a, const auto &b)
{
return a->getMaxStreams() < b->getMaxStreams();
}))->getMaxStreams();
return nested_streams;
}
BlockIO InterpreterSelectWithUnionQuery::execute()
{
BlockIO res;
BlockInputStreams nested_streams = executeWithMultipleStreams(res.pipeline);
BlockInputStreamPtr result_stream;
if (nested_streams.empty())
{
result_stream = std::make_shared<NullBlockInputStream>(getSampleBlock());
}
else if (nested_streams.size() == 1)
{
result_stream = nested_streams.front();
nested_streams.clear();
}
else
{
result_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr, max_streams);
nested_streams.clear();
}
res.in = result_stream;
res.pipeline.addInterpreterContext(context);
return res;
}
QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
{
QueryPipeline main_pipeline;
QueryPipeline & main_pipeline = res.pipeline;
std::vector<QueryPipeline> pipelines;
bool has_main_pipeline = false;
@ -254,12 +189,12 @@ QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
if (!has_main_pipeline)
{
has_main_pipeline = true;
main_pipeline = interpreter->executeWithProcessors();
main_pipeline = interpreter->execute().pipeline;
headers.emplace_back(main_pipeline.getHeader());
}
else
{
pipelines.emplace_back(interpreter->executeWithProcessors());
pipelines.emplace_back(interpreter->execute().pipeline);
headers.emplace_back(pipelines.back().getHeader());
}
}
@ -280,7 +215,7 @@ QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
main_pipeline.addInterpreterContext(context);
return main_pipeline;
return res;
}


@ -29,12 +29,6 @@ public:
BlockIO execute() override;
/// Execute the query without union of streams.
BlockInputStreams executeWithMultipleStreams(QueryPipeline & parent_pipeline);
QueryPipeline executeWithProcessors() override;
bool canExecuteWithProcessors() const override { return true; }
bool ignoreLimits() const override { return options.ignore_limits; }
bool ignoreQuota() const override { return options.ignore_quota; }


@ -184,7 +184,7 @@ bool isStorageTouchedByMutations(
/// For some reason it may copy context and give it into ExpressionBlockInputStream
/// after that we will use context from destroyed stack frame in our stream.
InterpreterSelectQuery interpreter(select_query, context_copy, storage, SelectQueryOptions().ignoreLimits());
BlockInputStreamPtr in = interpreter.execute().in;
BlockInputStreamPtr in = interpreter.execute().getInputStream();
Block block = in->read();
if (!block.rows())
@ -687,7 +687,7 @@ void MutationsInterpreter::validate(TableStructureReadLockHolder &)
}
/// Do not use getSampleBlock in order to check the whole pipeline.
Block first_stage_header = select_interpreter->execute().in->getHeader();
Block first_stage_header = select_interpreter->execute().getInputStream()->getHeader();
BlockInputStreamPtr in = std::make_shared<NullBlockInputStream>(first_stage_header);
addStreamsForLaterStages(stages, in)->getHeader();
}
@ -697,7 +697,7 @@ BlockInputStreamPtr MutationsInterpreter::execute(TableStructureReadLockHolder &
if (!can_execute)
throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);
BlockInputStreamPtr in = select_interpreter->execute().in;
BlockInputStreamPtr in = select_interpreter->execute().getInputStream();
auto result_stream = addStreamsForLaterStages(stages, in);


@ -13,7 +13,7 @@ void SubqueryForSet::makeSource(std::shared_ptr<InterpreterSelectWithUnionQuery>
{
joined_block_aliases = std::move(joined_block_aliases_);
source = std::make_shared<LazyBlockInputStream>(interpreter->getSampleBlock(),
[interpreter]() mutable { return interpreter->execute().in; });
[interpreter]() mutable { return interpreter->execute().getInputStream(); });
sample_block = source->getHeader();
renameColumns(sample_block);


@ -280,7 +280,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Copy query into string. It will be written to log and presented in processlist. If an INSERT query, string will not include data to insertion.
String query(begin, query_end);
BlockIO res;
QueryPipeline & pipeline = res.pipeline;
String query_for_logging;
@ -338,7 +337,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
context.resetInputCallbacks();
auto interpreter = InterpreterFactory::get(ast, context, stage);
bool use_processors = interpreter->canExecuteWithProcessors();
std::shared_ptr<const EnabledQuota> quota;
if (!interpreter->ignoreQuota())
@ -358,10 +356,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
}
if (use_processors)
pipeline = interpreter->executeWithProcessors();
else
res = interpreter->execute();
res = interpreter->execute();
QueryPipeline & pipeline = res.pipeline;
bool use_processors = pipeline.initialized();
if (res.pipeline.initialized())
use_processors = true;


@ -74,10 +74,6 @@ void PullingPipelineExecutor::cancel()
/// Cancel execution if it wasn't finished.
if (executor)
executor->cancel();
/// Read all data and finish execution.
Chunk chunk;
while (pull(chunk));
}
Chunk PullingPipelineExecutor::getTotals()


@ -59,7 +59,11 @@ ConvertingTransform::ConvertingTransform(
break;
case MatchColumnsMode::Name:
if (source.has(res_elem.name))
/// It may seem strange, but sometimes block may have columns with the same name.
/// For this specific case, try to get column from the same position if it has correct name first.
if (result_col_num < source.columns() && source.getByPosition(result_col_num).name == res_elem.name)
conversion[result_col_num] = result_col_num;
else if (source.has(res_elem.name))
conversion[result_col_num] = source.getPositionByName(res_elem.name);
else
throw Exception("Cannot find column " + backQuoteIfNeed(res_elem.name) + " in source stream",


@ -397,25 +397,6 @@ void IStorage::checkAlterIsPossible(const AlterCommands & commands, const Settin
}
}
BlockInputStreams IStorage::readStreams(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
ForceTreeShapedPipeline enable_tree_shape(query_info);
auto pipes = read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
BlockInputStreams res;
res.reserve(pipes.size());
for (auto & pipe : pipes)
res.emplace_back(std::make_shared<TreeExecutorBlockInputStream>(std::move(pipe)));
return res;
}
StorageID IStorage::getStorageID() const
{


@ -306,16 +306,6 @@ public:
throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** The same as read, but returns BlockInputStreams.
*/
BlockInputStreams readStreams(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned /*num_streams*/);
/** Writes the data to a table.
* Receives a description of the query, which can contain information about the data write method.
* Returns an object by which you can write data sequentially.


@ -111,7 +111,7 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(const Context & conte
InterpreterSelectQuery interpreter(mergeable_query->clone(), context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().in);
auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().getInputStream());
while (Block this_block = view_mergeable_stream->read())
base_blocks->push_back(this_block);
@ -148,7 +148,7 @@ BlockInputStreamPtr StorageLiveView::completeQuery(Pipes pipes)
block_context->addExternalTable(getBlocksTableName(), TemporaryTableHolder(global_context, creator));
InterpreterSelectQuery select(getInnerBlocksQuery(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete));
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().getInputStream());
/// Squashing is needed here because the view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
@ -218,7 +218,7 @@ void StorageLiveView::writeIntoLiveView(
QueryProcessingStage::WithMergeableState);
auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
select_block.execute().in);
select_block.execute().getInputStream());
while (Block this_block = data_mergeable_stream->read())
new_mergeable_blocks->push_back(this_block);


@ -1151,7 +1151,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (num_streams > settings.max_final_threads)
num_streams = settings.max_final_threads;
if (num_streams <= 1 || sort_description.empty() || query_info.force_tree_shaped_pipeline)
if (num_streams <= 1 || sort_description.empty())
{
Pipe pipe(std::move(pipes), get_merging_processor());


@ -80,28 +80,6 @@ struct SelectQueryInfo
/// Prepared sets are used for indices by storage engine.
/// Example: x IN (1, 2, 3)
PreparedSets sets;
/// Temporary flag is needed to support old pipeline with input streams.
/// If enabled, then pipeline returned by storage must be a tree.
/// Processors from the tree can't return ExpandPipeline status.
mutable bool force_tree_shaped_pipeline = false;
};
/// RAII class to enable force_tree_shaped_pipeline for SelectQueryInfo.
/// Looks awful, but I hope it's temporary.
struct ForceTreeShapedPipeline
{
explicit ForceTreeShapedPipeline(const SelectQueryInfo & info_) : info(info_)
{
force_tree_shaped_pipeline = info.force_tree_shaped_pipeline;
info.force_tree_shaped_pipeline = true;
}
~ForceTreeShapedPipeline() { info.force_tree_shaped_pipeline = force_tree_shaped_pipeline; }
private:
bool force_tree_shaped_pipeline;
const SelectQueryInfo & info;
};
}
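
The removed ForceTreeShapedPipeline helper followed the usual save/set/restore RAII idiom for the temporary force_tree_shaped_pipeline flag. A generic self-contained sketch of that idiom, purely for illustration of what the deleted code did; the template and names below are hypothetical, not ClickHouse code.

```cpp
#include <cassert>

/// Generic save/set/restore guard, the pattern the removed ForceTreeShapedPipeline
/// helper used for SelectQueryInfo::force_tree_shaped_pipeline.
template <typename T>
class ScopedFlagOverride
{
public:
    ScopedFlagOverride(T & flag_, T new_value) : flag(flag_), saved(flag_)
    {
        flag = new_value;
    }

    ~ScopedFlagOverride() { flag = saved; }

    ScopedFlagOverride(const ScopedFlagOverride &) = delete;
    ScopedFlagOverride & operator=(const ScopedFlagOverride &) = delete;

private:
    T & flag;
    T saved;
};

int main()
{
    bool force_tree_shaped_pipeline = false;
    {
        ScopedFlagOverride<bool> guard(force_tree_shaped_pipeline, true);
        assert(force_tree_shaped_pipeline);
    }
    assert(!force_tree_shaped_pipeline);   /// restored on scope exit
}
```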


@ -234,7 +234,7 @@ Pipes StorageBuffer::read(
*/
if (processed_stage > QueryProcessingStage::FetchColumns)
for (auto & pipe : pipes_from_buffers)
pipe = InterpreterSelectQuery(query_info.query, context, std::move(pipe), SelectQueryOptions(processed_stage)).executeWithProcessors().getPipe();
pipe = InterpreterSelectQuery(query_info.query, context, std::move(pipe), SelectQueryOptions(processed_stage)).execute().pipeline.getPipe();
if (query_info.prewhere_info)
{


@ -238,20 +238,9 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
if (!storage)
{
if (query_info.force_tree_shaped_pipeline)
{
/// This flag means that pipeline must be tree-shaped,
/// so we can't enable processors for InterpreterSelectQuery here.
auto stream = InterpreterSelectQuery(modified_query_info.query, *modified_context, std::make_shared<OneBlockInputStream>(header),
SelectQueryOptions(processed_stage).analyze()).execute().in;
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
return pipes;
}
auto pipe = InterpreterSelectQuery(modified_query_info.query, *modified_context,
std::make_shared<OneBlockInputStream>(header),
SelectQueryOptions(processed_stage).analyze()).executeWithProcessors().getPipe();
SelectQueryOptions(processed_stage).analyze()).execute().pipeline.getPipe();
pipe.addInterpreterContext(modified_context);
pipes.emplace_back(std::move(pipe));
return pipes;
@ -276,15 +265,8 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
InterpreterSelectQuery interpreter{modified_query_info.query, *modified_context, SelectQueryOptions(processed_stage)};
if (query_info.force_tree_shaped_pipeline)
{
BlockInputStreamPtr stream = interpreter.execute().in;
Pipe pipe(std::make_shared<SourceFromInputStream>(std::move(stream)));
pipes.emplace_back(std::move(pipe));
}
else
{
Pipe pipe = interpreter.executeWithProcessors().getPipe();
Pipe pipe = interpreter.execute().pipeline.getPipe();
pipes.emplace_back(std::move(pipe));
}


@ -65,42 +65,24 @@ Pipes StorageView::read(
current_inner_query = getRuntimeViewQuery(*query_info.query->as<const ASTSelectQuery>(), context);
InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, {}, column_names);
/// FIXME res may implicitly use some objects owned by pipeline, but they will be destructed after return
if (query_info.force_tree_shaped_pipeline)
auto pipeline = interpreter.execute().pipeline;
/// It's expected that the columns read from storage are not constant.
/// Because method 'getSampleBlockForColumns' is used to obtain a structure of result in InterpreterSelectQuery.
pipeline.addSimpleTransform([](const Block & header)
{
QueryPipeline pipeline;
BlockInputStreams streams = interpreter.executeWithMultipleStreams(pipeline);
return std::make_shared<MaterializingTransform>(header);
});
for (auto & stream : streams)
{
stream = std::make_shared<MaterializingBlockInputStream>(stream);
stream = std::make_shared<ConvertingBlockInputStream>(stream, getSampleBlockForColumns(column_names),
ConvertingBlockInputStream::MatchColumnsMode::Name);
}
for (auto & stream : streams)
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
}
else
/// And also convert to expected structure.
pipeline.addSimpleTransform([&](const Block & header)
{
auto pipeline = interpreter.executeWithProcessors();
return std::make_shared<ConvertingTransform>(header, getSampleBlockForColumns(column_names),
ConvertingTransform::MatchColumnsMode::Name);
});
/// It's expected that the columns read from storage are not constant.
/// Because method 'getSampleBlockForColumns' is used to obtain a structure of result in InterpreterSelectQuery.
pipeline.addSimpleTransform([](const Block & header)
{
return std::make_shared<MaterializingTransform>(header);
});
/// And also convert to expected structure.
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, getSampleBlockForColumns(column_names),
ConvertingTransform::MatchColumnsMode::Name);
});
pipes = std::move(pipeline).getPipes();
}
pipes = std::move(pipeline).getPipes();
return pipes;
}
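
StorageView now always goes through the pipeline: every stream gets a MaterializingTransform, so constant columns produced by the view's query become full columns (the comment in the diff notes that getSampleBlockForColumns assumes non-constant columns), followed by a ConvertingTransform to the expected header. Below is a self-contained sketch of just the materialization step on a toy constant column; the real transforms operate on whole blocks of IColumn objects, which this model only approximates.

```cpp
#include <cstddef>
#include <variant>
#include <vector>

/// Toy column: either a constant (one value + row count) or a full vector.
struct ConstColumn { int value = 0; size_t rows = 0; };
using FullColumn = std::vector<int>;
using ToyColumn = std::variant<ConstColumn, FullColumn>;

/// What a materializing step does per column: expand constants into full
/// columns so later stages never see a constant where they expect real data.
FullColumn materialize(const ToyColumn & column)
{
    if (const auto * full = std::get_if<FullColumn>(&column))
        return *full;
    const auto & constant = std::get<ConstColumn>(column);
    return FullColumn(constant.rows, constant.value);
}
```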


@ -864,6 +864,8 @@ def test_double_move_while_select(started_cluster, name, positive):
thread = threading.Thread(target=long_select)
thread.start()
time.sleep(1)
node1.query("ALTER TABLE {name} MOVE PART '{part}' TO DISK 'jbod1'".format(name=name, part=parts[0]))
# Fill jbod1 to force ClickHouse to make move of partition 1 to external.


@ -1,4 +1,4 @@
0
0
100000
200000
800000
1600000


@ -54,7 +54,7 @@ EOL
echo "create table null_01278 as data_01278 Engine=Null();" | execute
for i in $(seq 1 $TEST_01278_PARTS); do
echo "create table part_01278_$i as data_01278 Engine=Buffer(currentDatabase(), null_01278, 1, 86400, 86400, 1e5, 1e6, 10e6, 100e6);"
echo "create materialized view mv_01278_$i to part_01278_$i as select * from data_01278 where key%$TEST_01278_PARTS+1 == $i;"
echo "create materialized view mv_01278_$i to part_01278_$i as select * from data_01278 where key%$TEST_01278_PARTS+1 != $i;"
done | execute
echo "create table out_01278 as data_01278 Engine=Merge(currentDatabase(), 'part_01278_');" | execute