Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-22 15:42:02 +00:00)
Remove some code.
This commit is contained in: parent a697560738, commit 1fa795988f
@@ -22,10 +22,6 @@ public:
      */
     virtual BlockIO execute() = 0;
 
-    virtual QueryPipeline executeWithProcessors() { throw Exception("executeWithProcessors not implemented", ErrorCodes::NOT_IMPLEMENTED); }
-
-    virtual bool canExecuteWithProcessors() const { return false; }
-
     virtual bool ignoreQuota() const { return false; }
     virtual bool ignoreLimits() const { return false; }
 
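With the processors-specific virtuals gone, execute() becomes the single entry point of every interpreter. For orientation, a minimal sketch of the trimmed interface after this hunk (a reconstruction, not the literal file; it assumes the rest of IInterpreter.h is unchanged):

class IInterpreter
{
public:
    /// The only execution entry point now: the returned BlockIO carries an
    /// initialized `pipeline` when the interpreter ran on the new processors
    /// engine, and a plain `in` stream on the old BlockInputStream path.
    virtual BlockIO execute() = 0;

    /// Per-interpreter opt-outs for quota and limit enforcement are kept.
    virtual bool ignoreQuota() const { return false; }
    virtual bool ignoreLimits() const { return false; }

    virtual ~IInterpreter() = default;
};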
File diff suppressed because it is too large.
@@ -77,12 +77,6 @@ public:
     /// Execute a query. Get the stream of blocks to read.
     BlockIO execute() override;
 
-    /// Execute the query and return multiple streams for parallel processing.
-    BlockInputStreams executeWithMultipleStreams(QueryPipeline & parent_pipeline);
-
-    QueryPipeline executeWithProcessors() override;
-    bool canExecuteWithProcessors() const override { return true; }
-
     bool ignoreLimits() const override { return options.ignore_limits; }
     bool ignoreQuota() const override { return options.ignore_quota; }
 
@@ -108,89 +102,15 @@ private:
 
     Block getSampleBlockImpl();
 
-    struct Pipeline
-    {
-        /** Streams of data.
-          * The source data streams are produced in the executeFetchColumns function.
-          * Then they are converted (wrapped in other streams) using the `execute*` functions,
-          * to get the whole pipeline running the query.
-          */
-        BlockInputStreams streams;
-
-        /** When executing FULL or RIGHT JOIN, there will be a data stream from which you can read "not joined" rows.
-          * It has a special meaning, since reading from it should be done after reading from the main streams.
-          * It is appended to the main streams in UnionBlockInputStream or ParallelAggregatingBlockInputStream.
-          */
-        BlockInputStreamPtr stream_with_non_joined_data;
-        bool union_stream = false;
-
-        /// Cached value of InterpreterSelectQuery::max_streams
-        size_t max_threads = 1;
-
-        BlockInputStreamPtr & firstStream() { return streams.at(0); }
-
-        template <typename Transform>
-        void transform(Transform && transformation)
-        {
-            for (auto & stream : streams)
-                transformation(stream);
-
-            if (stream_with_non_joined_data)
-                transformation(stream_with_non_joined_data);
-        }
-
-        bool hasMoreThanOneStream() const
-        {
-            return streams.size() + (stream_with_non_joined_data ? 1 : 0) > 1;
-        }
-
-        /// The resulting stream is a mix of other streams' data. Distinct and/or order guarantees are broken.
-        bool hasMixedStreams() const
-        {
-            return hasMoreThanOneStream() || union_stream;
-        }
-
-        bool hasDelayedStream() const { return stream_with_non_joined_data != nullptr; }
-        bool initialized() const { return !streams.empty(); }
-
-        /// Compatibility with QueryPipeline (Processors)
-        void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
-        size_t getNumThreads() const { return max_threads; }
-    };
-
-    template <typename TPipeline>
-    void executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe, QueryPipeline & save_context_and_storage);
+    void executeImpl(QueryPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe);
 
     /// Different stages of query execution.
 
-    /// dry_run - don't read from the table, use an empty header block instead.
-    void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run);
-
-    template <typename TPipeline>
-    void executeFetchColumns(QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
+    void executeFetchColumns(
+        QueryProcessingStage::Enum processing_stage,
+        QueryPipeline & pipeline,
         const PrewhereInfoPtr & prewhere_info,
-        const Names & columns_to_remove_after_prewhere,
-        QueryPipeline & save_context_and_storage);
-
-    void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
-    void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
-    void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final);
-    void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
-    void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression);
-    static void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression);
-    void executeOrder(Pipeline & pipeline, InputSortingInfoPtr sorting_info);
-    void executeWithFill(Pipeline & pipeline);
-    void executeMergeSorted(Pipeline & pipeline);
-    void executePreLimit(Pipeline & pipeline);
-    void executeUnion(Pipeline & pipeline, Block header);
-    void executeLimitBy(Pipeline & pipeline);
-    void executeLimit(Pipeline & pipeline);
-    void executeOffset(Pipeline & pipeline);
-    static void executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression);
-    void executeDistinct(Pipeline & pipeline, bool before_order, Names columns);
-    void executeExtremes(Pipeline & pipeline);
-    void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, const std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
-    void executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit);
+        const Names & columns_to_remove_after_prewhere);
 
     void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
     void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
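The deleted Pipeline struct was the backbone of the old tree-shaped execution: each execute* stage rewrapped every stream in place through the transform() template, including the delayed non-joined-data stream. A hedged sketch of how a stage such as executeWhere used it (reconstructed from this header, not taken from the commit; the filter column name is illustrative):

void InterpreterSelectQuery::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter)
{
    /// Wrap every stream of the tree-shaped pipeline in a filtering stream;
    /// transform() also visits stream_with_non_joined_data if it is set.
    pipeline.transform([&](auto & stream)
    {
        stream = std::make_shared<FilterBlockInputStream>(stream, expression, "illustrative_filter_column", remove_filter);
    });
}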
@@ -213,17 +133,12 @@ private:
 
     String generateFilterActions(ExpressionActionsPtr & actions, const ASTPtr & row_policy_filter, const Names & prerequisite_columns = {}) const;
 
-    /// Add ConvertingBlockInputStream to specified header.
-    static void unifyStreams(Pipeline & pipeline, Block header);
-
     enum class Modificator
     {
         ROLLUP = 0,
         CUBE = 1
     };
 
-    void executeRollupOrCube(Pipeline & pipeline, Modificator modificator);
-
     void executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator);
 
     /** If there is a SETTINGS section in the SELECT query, then apply settings from it.
@@ -3,15 +3,9 @@
 #include <Interpreters/Context.h>
 #include <Parsers/ASTSelectWithUnionQuery.h>
 #include <Parsers/ASTSelectQuery.h>
-#include <DataStreams/UnionBlockInputStream.h>
-#include <DataStreams/NullBlockInputStream.h>
-#include <DataStreams/ConcatBlockInputStream.h>
-#include <DataStreams/ConvertingBlockInputStream.h>
 #include <Columns/getLeastSuperColumn.h>
 #include <Columns/ColumnConst.h>
 #include <Common/typeid_cast.h>
 #include <Parsers/queryToString.h>
 #include <Parsers/ASTExpressionList.h>
 
 #include <Processors/Sources/NullSource.h>
 #include <Processors/QueryPipeline.h>
@@ -180,69 +174,10 @@ Block InterpreterSelectWithUnionQuery::getSampleBlock(
 }
 
 
-BlockInputStreams InterpreterSelectWithUnionQuery::executeWithMultipleStreams(QueryPipeline & parent_pipeline)
-{
-    BlockInputStreams nested_streams;
-
-    for (auto & interpreter : nested_interpreters)
-    {
-        BlockInputStreams streams = interpreter->executeWithMultipleStreams(parent_pipeline);
-        nested_streams.insert(nested_streams.end(), streams.begin(), streams.end());
-    }
-
-    /// Unify data structure.
-    if (nested_interpreters.size() > 1)
-    {
-        for (auto & stream : nested_streams)
-            stream = std::make_shared<ConvertingBlockInputStream>(stream, result_header, ConvertingBlockInputStream::MatchColumnsMode::Position);
-        parent_pipeline.addInterpreterContext(context);
-    }
-
-    /// Update max_streams due to:
-    /// - max_distributed_connections for Distributed() engine
-    /// - max_streams_to_max_threads_ratio
-    ///
-    /// XXX: res.pipeline.getMaxThreads() cannot be used since it is capped to
-    /// number of streams, which is empty for non-Processors case.
-    max_streams = (*std::min_element(nested_interpreters.begin(), nested_interpreters.end(), [](const auto & a, const auto & b)
-    {
-        return a->getMaxStreams() < b->getMaxStreams();
-    }))->getMaxStreams();
-
-    return nested_streams;
-}
-
-
 BlockIO InterpreterSelectWithUnionQuery::execute()
 {
     BlockIO res;
-    BlockInputStreams nested_streams = executeWithMultipleStreams(res.pipeline);
-    BlockInputStreamPtr result_stream;
-
-    if (nested_streams.empty())
-    {
-        result_stream = std::make_shared<NullBlockInputStream>(getSampleBlock());
-    }
-    else if (nested_streams.size() == 1)
-    {
-        result_stream = nested_streams.front();
-        nested_streams.clear();
-    }
-    else
-    {
-        result_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr, max_streams);
-        nested_streams.clear();
-    }
-
-    res.in = result_stream;
-    res.pipeline.addInterpreterContext(context);
-    return res;
-}
-
-
-QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
-{
-    QueryPipeline main_pipeline;
+    QueryPipeline & main_pipeline = res.pipeline;
     std::vector<QueryPipeline> pipelines;
     bool has_main_pipeline = false;
 
@@ -254,12 +189,12 @@ QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
         if (!has_main_pipeline)
         {
             has_main_pipeline = true;
-            main_pipeline = interpreter->executeWithProcessors();
+            main_pipeline = interpreter->execute().pipeline;
             headers.emplace_back(main_pipeline.getHeader());
         }
         else
         {
-            pipelines.emplace_back(interpreter->executeWithProcessors());
+            pipelines.emplace_back(interpreter->execute().pipeline);
             headers.emplace_back(pipelines.back().getHeader());
         }
     }
@@ -280,7 +215,7 @@ QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
 
     main_pipeline.addInterpreterContext(context);
 
-    return main_pipeline;
+    return res;
 }
 
 
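Read together, the three hunks above leave a single execute() that builds the processors pipeline directly into the returned BlockIO. An approximate post-commit shape, assembled from the hunks (the header-unification and pipeline-uniting code between them is elided and assumed unchanged):

BlockIO InterpreterSelectWithUnionQuery::execute()
{
    BlockIO res;
    QueryPipeline & main_pipeline = res.pipeline;   /// was a local QueryPipeline before
    std::vector<QueryPipeline> pipelines;
    bool has_main_pipeline = false;
    Blocks headers;   /// declared in the elided code; shown here for completeness

    for (auto & interpreter : nested_interpreters)
    {
        if (!has_main_pipeline)
        {
            has_main_pipeline = true;
            main_pipeline = interpreter->execute().pipeline;
            headers.emplace_back(main_pipeline.getHeader());
        }
        else
        {
            pipelines.emplace_back(interpreter->execute().pipeline);
            headers.emplace_back(pipelines.back().getHeader());
        }
    }

    /// ... unify headers and unite `pipelines` into main_pipeline (unchanged code between the hunks) ...

    main_pipeline.addInterpreterContext(context);
    return res;   /// was `return main_pipeline;`
}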
@@ -29,12 +29,6 @@ public:
 
     BlockIO execute() override;
 
-    /// Execute the query without union of streams.
-    BlockInputStreams executeWithMultipleStreams(QueryPipeline & parent_pipeline);
-
-    QueryPipeline executeWithProcessors() override;
-    bool canExecuteWithProcessors() const override { return true; }
-
     bool ignoreLimits() const override { return options.ignore_limits; }
     bool ignoreQuota() const override { return options.ignore_quota; }
 
@@ -280,7 +280,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
     /// Copy query into string. It will be written to log and presented in processlist. If an INSERT query, string will not include data to insertion.
     String query(begin, query_end);
     BlockIO res;
-    QueryPipeline & pipeline = res.pipeline;
 
     String query_for_logging;
 
@@ -338,7 +337,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
     context.resetInputCallbacks();
 
     auto interpreter = InterpreterFactory::get(ast, context, stage);
-    bool use_processors = interpreter->canExecuteWithProcessors();
 
     std::shared_ptr<const EnabledQuota> quota;
     if (!interpreter->ignoreQuota())
@@ -358,10 +356,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
         limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
     }
 
-    if (use_processors)
-        pipeline = interpreter->executeWithProcessors();
-    else
-        res = interpreter->execute();
+    res = interpreter->execute();
+    QueryPipeline & pipeline = res.pipeline;
+    bool use_processors = pipeline.initialized();
 
-    if (res.pipeline.initialized())
-        use_processors = true;
 
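The dispatch in executeQueryImpl thus inverts: instead of probing the interpreter up front with canExecuteWithProcessors() and choosing between two execution calls, it always calls execute() and derives the mode from the result, since only the processors path leaves the pipeline initialized. The same three added lines, annotated:

res = interpreter->execute();
QueryPipeline & pipeline = res.pipeline;       /// stays uninitialized on the old BlockInputStream path
bool use_processors = pipeline.initialized();  /// replaces the removed canExecuteWithProcessors() probe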
@@ -397,25 +397,6 @@ void IStorage::checkAlterIsPossible(const AlterCommands & commands, const Settin
     }
 }
 
-BlockInputStreams IStorage::readStreams(
-    const Names & column_names,
-    const SelectQueryInfo & query_info,
-    const Context & context,
-    QueryProcessingStage::Enum processed_stage,
-    size_t max_block_size,
-    unsigned num_streams)
-{
-    ForceTreeShapedPipeline enable_tree_shape(query_info);
-    auto pipes = read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
-
-    BlockInputStreams res;
-    res.reserve(pipes.size());
-
-    for (auto & pipe : pipes)
-        res.emplace_back(std::make_shared<TreeExecutorBlockInputStream>(std::move(pipe)));
-
-    return res;
-}
-
 StorageID IStorage::getStorageID() const
 {
@@ -303,16 +303,6 @@ public:
         throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
     }
 
-    /** The same as read, but returns BlockInputStreams.
-      */
-    BlockInputStreams readStreams(
-        const Names & /*column_names*/,
-        const SelectQueryInfo & /*query_info*/,
-        const Context & /*context*/,
-        QueryProcessingStage::Enum /*processed_stage*/,
-        size_t /*max_block_size*/,
-        unsigned /*num_streams*/);
-
     /** Writes the data to a table.
       * Receives a description of the query, which can contain information about the data write method.
       * Returns an object by which you can write data sequentially.
@@ -80,28 +80,6 @@ struct SelectQueryInfo
     /// Prepared sets are used for indices by storage engine.
     /// Example: x IN (1, 2, 3)
     PreparedSets sets;
-
-    /// Temporary flag is needed to support old pipeline with input streams.
-    /// If enabled, then pipeline returned by storage must be a tree.
-    /// Processors from the tree can't return ExpandPipeline status.
-    mutable bool force_tree_shaped_pipeline = false;
 };
 
-/// RAII class to enable force_tree_shaped_pipeline for SelectQueryInfo.
-/// Looks awful, but I hope it's temporary.
-struct ForceTreeShapedPipeline
-{
-    explicit ForceTreeShapedPipeline(const SelectQueryInfo & info_) : info(info_)
-    {
-        force_tree_shaped_pipeline = info.force_tree_shaped_pipeline;
-        info.force_tree_shaped_pipeline = true;
-    }
-
-    ~ForceTreeShapedPipeline() { info.force_tree_shaped_pipeline = force_tree_shaped_pipeline; }
-
-private:
-    bool force_tree_shaped_pipeline;
-    const SelectQueryInfo & info;
-};
-
 }
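The deleted guard is the classic save-set-restore RAII idiom, made awkward only by mutating a `mutable` field through a const reference. A self-contained sketch of the same pattern using a hypothetical generic guard (ScopedValue is not a ClickHouse type):

#include <utility>

/// Remember the old value, install a new one, restore the old value on scope
/// exit: exactly what ForceTreeShapedPipeline did for force_tree_shaped_pipeline.
template <typename T>
class ScopedValue
{
public:
    ScopedValue(T & target_, T new_value)
        : target(target_), old_value(std::move(target_))
    {
        target = std::move(new_value);
    }

    ~ScopedValue() { target = std::move(old_value); }

    ScopedValue(const ScopedValue &) = delete;
    ScopedValue & operator=(const ScopedValue &) = delete;

private:
    T & target;
    T old_value;
};

/// Usage mirroring the removed code (valid because the flag is `mutable`,
/// so it is writable even through a const SelectQueryInfo reference):
///     ScopedValue guard(info.force_tree_shaped_pipeline, true);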
@@ -64,16 +64,8 @@ Pipes StorageView::read(
 
     QueryPipeline pipeline;
     InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, {}, column_names);
-    /// FIXME: res may implicitly use some objects owned by pipeline, but they will be destructed after return.
-    if (query_info.force_tree_shaped_pipeline)
-    {
-        BlockInputStreams streams = interpreter.executeWithMultipleStreams(pipeline);
-        for (auto & stream : streams)
-            pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
-    }
-    else
-        /// TODO: support multiple streams here. Need more general interface than pipes.
-        pipes.emplace_back(interpreter.executeWithProcessors().getPipe());
+    /// TODO: support multiple streams here. Need more general interface than pipes.
+    pipes.emplace_back(interpreter.execute().pipeline.getPipe());
 
     /// It's expected that the columns read from storage are not constant.
     /// Because method 'getSampleBlockForColumns' is used to obtain a structure of result in InterpreterSelectQuery.