Mirror of https://github.com/ClickHouse/ClickHouse.git
commit a3308bf675
parent 66905a1366

    added IStorage::getQueryProcessingStage
contrib/poco (vendored submodule)

@@ -1 +1 @@
-Subproject commit 930a7ec1154f4f9711edfb4b4a39f9fff2a5bbb5
+Subproject commit 2d5a158303adf9d47b980cdcfdb26cee1460704e
InterpreterSelectQuery.cpp

@@ -235,9 +235,9 @@ BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams()
 }


-InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage,
-    ExpressionActionsChain & chain)
+InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage)
 {
+    ExpressionActionsChain chain;
     AnalysisResult res;

     /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
@@ -253,8 +253,16 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
     */

     std::shared_ptr<bool> remove_where_filter;
+    std::shared_ptr<bool> remove_prewhere_filter;

     {
+        if (query_analyzer->appendPrewhere(chain, false, remove_prewhere_filter))
+        {
+            res.prewhere_info = std::make_shared<PrewhereInfo>(
+                    chain.steps.front().actions, query.prewhere_expression->getColumnName());
+            chain.addStep();
+        }
+
         res.need_aggregate = query_analyzer->hasAggregation();

         query_analyzer->appendArrayJoin(chain, !res.first_stage);
@@ -311,6 +319,9 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
         chain.clear();
     }

+    if (res.prewhere_info)
+        res.prewhere_info->remove_prewhere_column = *remove_prewhere_filter;
+
     /// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys).
     if (res.has_where)
     {
@@ -343,13 +354,14 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt

     AnalysisResult expressions;
     {
-        ExpressionActionsChain chain;
-        PrewhereInfoPtr prewhere_info;
-        std::shared_ptr<bool> remove_prewhere_filter;
+        QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
+        if (storage)
+            from_stage = storage->getQueryProcessingStage(context);
+
+        expressions = analyzeExpressions(from_stage);

         /** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
-        QueryProcessingStage::Enum from_stage = executeFetchColumns(pipeline, dry_run, chain,
-            prewhere_info, remove_prewhere_filter);
+        executeFetchColumns(from_stage, pipeline, dry_run, expressions.prewhere_info);

         if (from_stage == QueryProcessingStage::WithMergeableState &&
             to_stage == QueryProcessingStage::WithMergeableState)
@@ -358,11 +370,6 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
         if (!dry_run)
             LOG_TRACE(log,
                 QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
-
-        expressions = analyzeExpressions(from_stage, chain);
-
-        if (prewhere_info)
-            prewhere_info->remove_prewhere_column = *remove_prewhere_filter;
     }

     const Settings & settings = context.getSettingsRef();
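Note on the executeImpl() rework above: previously executeFetchColumns() both read the data and reported, through an out-parameter, to what stage the storage had already processed the query, so expression analysis could only run after the read. After this commit the interpreter asks the storage for the stage up front via getQueryProcessingStage(), analyzes expressions for that stage, and only then fetches columns. Below is a minimal standalone C++ model of that control flow; every type in it (Context, StorageModel, the QueryProcessingStage namespace) is a simplified stand-in for the corresponding ClickHouse class, not the real API.

// Minimal standalone model of the reworked control flow; all types here are
// simplified stand-ins, not ClickHouse's real classes.
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>

namespace QueryProcessingStage
{
    enum Enum { FetchColumns, WithMergeableState, Complete };

    inline std::string toString(Enum stage)
    {
        switch (stage)
        {
            case FetchColumns:       return "FetchColumns";
            case WithMergeableState: return "WithMergeableState";
            case Complete:           return "Complete";
        }
        return "Unknown";
    }
}

struct Context {};  /// stand-in for the query context

class StorageModel
{
public:
    virtual ~StorageModel() = default;
    virtual std::string getName() const = 0;

    /// New virtual: the storage declares up front to what stage read() will
    /// process the query. Default matches IStorage: only column fetching.
    virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const
    {
        return QueryProcessingStage::FetchColumns;
    }

    /// read() now receives the stage by value instead of filling an out-parameter.
    virtual void read(QueryProcessingStage::Enum processed_stage, const Context & context)
    {
        checkQueryProcessingStage(processed_stage, context);
        std::cout << getName() << " reading columns\n";
    }

protected:
    /// Mirrors IStorage::checkQueryProcessingStage: the caller must pass
    /// exactly the stage the storage itself reported.
    void checkQueryProcessingStage(QueryProcessingStage::Enum processed_stage, const Context & context) const
    {
        auto expected = getQueryProcessingStage(context);
        if (processed_stage != expected)
            throw std::logic_error("Unexpected query processing stage for storage " + getName() +
                ": expected " + QueryProcessingStage::toString(expected) +
                ", got " + QueryProcessingStage::toString(processed_stage));
    }
};

/// The executeImpl() shape after this commit: stage first, analysis second, read third.
void executeImplModel(const std::shared_ptr<StorageModel> & storage, const Context & context)
{
    auto from_stage = QueryProcessingStage::FetchColumns;
    if (storage)
        from_stage = storage->getQueryProcessingStage(context);

    // ... analyzeExpressions(from_stage) would run here ...

    if (storage)
        storage->read(from_stage, context);
}

The design point: the processing stage stops being a mutable output of read() and becomes a checked precondition, so a storage whose reported stage disagrees with what the interpreter planned fails loudly with a logic error instead of silently desynchronizing.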
@@ -528,9 +535,8 @@ static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, siz
     }
 }

-QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(
-    Pipeline & pipeline, bool dry_run, ExpressionActionsChain & chain,
-    PrewhereInfoPtr & prewhere_info, std::shared_ptr<bool> & remove_prewhere_filter)
+void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum from_stage, Pipeline & pipeline,
+    bool dry_run, const PrewhereInfoPtr & prewhere_info)
 {
     /// List of columns to read to execute the query.
     Names required_columns = query_analyzer->getRequiredSourceColumns();
@@ -648,8 +654,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(
         max_streams = 1;
     }

-    QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
-
     query_analyzer->makeSetsForIndex();

     /// Initialize the initial data streams to which the query transforms are superimposed. Table or subquery or prepared input?
@@ -680,6 +684,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(
         SelectQueryInfo query_info;
         query_info.query = query_ptr;
         query_info.sets = query_analyzer->getPreparedSets();
+        query_info.prewhere_info = prewhere_info;

         /// PREWHERE optimization
         {
@@ -696,13 +701,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(
                 optimize_prewhere(*merge_tree);
         }

-        if (!dry_run && query_analyzer->appendPrewhere(chain, false, remove_prewhere_filter))
-        {
-            query_info.prewhere_info = prewhere_info = std::make_shared<PrewhereInfo>(
-                chain.steps.front().actions, query.prewhere_expression->getColumnName());
-            chain.addStep();
-        }
-
         if (!dry_run)
             pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);

@@ -711,8 +709,10 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(
             pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns)));

             if (query_info.prewhere_info)
-                pipeline.streams.back() = std::make_shared<PrewhereFilterBlockInputStream>(pipeline.streams.back(),
-                    query_info.prewhere_info);
+                pipeline.streams.back() = std::make_shared<FilterBlockInputStream>(
+                    pipeline.streams.back(), prewhere_info->prewhere_actions,
+                    prewhere_info->prewhere_column_name, prewhere_info->remove_prewhere_column
+                );
         }

         pipeline.transform([&](auto & stream)
InterpreterSelectQuery.h

@@ -144,9 +144,10 @@ private:
         bool second_stage = false;

         SubqueriesForSets subqueries_for_sets;
+        PrewhereInfoPtr prewhere_info;
     };

-    AnalysisResult analyzeExpressions(QueryProcessingStage::Enum from_stage, ExpressionActionsChain & chain);
+    AnalysisResult analyzeExpressions(QueryProcessingStage::Enum from_stage);


     /** From which table to read. With JOIN, the "left" table is returned.
@@ -158,10 +159,9 @@ private:
     /// dry_run - don't read from table, use empty header block instead.
     void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run);

-    /// Fetch data from the table. Returns the stage to which the query was processed in Storage.
-    QueryProcessingStage::Enum executeFetchColumns(Pipeline & pipeline, bool dry_run, ExpressionActionsChain & chain,
-                                                   PrewhereInfoPtr & prewhere_info,
-                                                   std::shared_ptr<bool> & remove_prewhere_filter);
+    /// Fetch data from the table.
+    void executeFetchColumns(QueryProcessingStage::Enum processing_stage, Pipeline & pipeline, bool dry_run,
+                             const PrewhereInfoPtr & prewhere_info);

     void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
     void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
IStorage.h

@@ -150,6 +150,11 @@ public:
         return res;
     }

+    /** Returns stage to which query is going to be processed in read() function.
+      * (Normally, the function only reads the columns from the list, but in other cases,
+      *  for example, the request can be partially processed on a remote server.)
+      */
+    virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const { return QueryProcessingStage::FetchColumns; }
+
     /** Read a set of columns from the table.
       * Accepts a list of columns to read, as well as a description of the query,
@@ -157,9 +162,7 @@ public:
       * (indexes, locks, etc.)
      * Returns a stream with which you can read data sequentially
      * or multiple streams for parallel data reading.
-      * The `processed_stage` info is also written to what stage the request was processed.
-      * (Normally, the function only reads the columns from the list, but in other cases,
-      *  for example, the request can be partially processed on a remote server.)
+      * The `processed_stage` must be the result of getQueryProcessingStage() function.
      *
      * context contains settings for one query.
      * Usually Storage does not care about these settings, since they are used in the interpreter.
@@ -174,7 +177,7 @@ public:
         const Names & /*column_names*/,
         const SelectQueryInfo & /*query_info*/,
         const Context & /*context*/,
-        QueryProcessingStage::Enum & /*processed_stage*/,
+        QueryProcessingStage::Enum /*processed_stage*/,
         size_t /*max_block_size*/,
         unsigned /*num_streams*/)
     {
@@ -303,6 +306,15 @@ protected:
     using ITableDeclaration::ITableDeclaration;
     using std::enable_shared_from_this<IStorage>::shared_from_this;

+    void checkQueryProcessingStage(QueryProcessingStage::Enum processed_stage, const Context & context)
+    {
+        auto expected = getQueryProcessingStage(context);
+        if (processed_stage != expected)
+            throw Exception("Unexpected query processing stage for storage " + getName() +
+                            ": expected " + QueryProcessingStage::toString(expected) +
+                            ", got " + QueryProcessingStage::toString(processed_stage), ErrorCodes::LOGICAL_ERROR);
+    }
+
 private:
     friend class TableStructureReadLock;
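A usage sketch for the new IStorage contract, reusing the stand-in types from the executeImplModel example earlier; RemoteStorageModel is hypothetical and only illustrates a storage that, like StorageDistributed below, reports a later stage than the default.

// Usage sketch for the new contract, building on the stand-in types defined in
// the executeImplModel example above (RemoteStorageModel is hypothetical).
class RemoteStorageModel : public StorageModel
{
public:
    std::string getName() const override { return "RemoteStorageModel"; }

    /// A distributed-like storage pre-aggregates remotely, so it reports a later stage.
    QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const override
    {
        return QueryProcessingStage::WithMergeableState;
    }
};

int main()
{
    Context context;
    auto storage = std::make_shared<RemoteStorageModel>();

    executeImplModel(storage, context);  // passes: the interpreter asks first, then reads

    try
    {
        storage->read(QueryProcessingStage::FetchColumns, context);  // stale stage
    }
    catch (const std::logic_error & e)
    {
        std::cout << "rejected: " << e.what() << "\n";  // mirrors the LOGICAL_ERROR path
    }
}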
MergeTreeDataSelectExecutor.cpp

@@ -134,7 +134,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
     const Names & column_names_to_return,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
     const size_t max_block_size,
     const unsigned num_streams,
     Int64 max_block_number_to_read) const
@@ -193,7 +192,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
     std::multiset<String> part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");

     data.check(real_column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;

     const Settings & settings = context.getSettingsRef();
     SortDescription sort_descr = data.getPrimarySortDescription();
MergeTreeDataSelectExecutor.h

@@ -26,7 +26,6 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
         size_t max_block_size,
         unsigned num_streams,
         Int64 max_block_number_to_read) const;
StorageBuffer.cpp

@@ -102,15 +102,30 @@ private:
 };


+QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context) const
+{
+    if (!no_destination)
+    {
+        auto destination = context.getTable(destination_database, destination_table);
+
+        if (destination.get() == this)
+            throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
+
+        return destination->getQueryProcessingStage(context);
+    }
+
+    return QueryProcessingStage::FetchColumns;
+}
+
 BlockInputStreams StorageBuffer::read(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     size_t max_block_size,
     unsigned num_streams)
 {
-    processed_stage = QueryProcessingStage::FetchColumns;
+    checkQueryProcessingStage(processed_stage, context);

     BlockInputStreams streams_from_dst;

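StorageBuffer shows the delegation pattern: a buffer's stage is whatever its destination table reports, with a guard against a buffer that points at itself. A standalone sketch of just that pattern follows; TableModel and BufferModel are illustrative stand-ins, not ClickHouse classes, and destination resolution through Context is replaced by a plain shared pointer.

// Delegation sketch with stand-in types; in the real code the destination is
// looked up through the Context by database and table name.
#include <memory>
#include <stdexcept>

enum class Stage { FetchColumns, WithMergeableState, Complete };

struct TableModel
{
    virtual ~TableModel() = default;
    virtual Stage getQueryProcessingStage() const { return Stage::FetchColumns; }
};

struct BufferModel : TableModel
{
    bool no_destination = false;
    std::shared_ptr<TableModel> destination;  /// resolved via Context in the real code

    Stage getQueryProcessingStage() const override
    {
        if (!no_destination)
        {
            /// Guard against a buffer configured to flush into itself.
            if (destination.get() == this)
                throw std::runtime_error("Destination table is myself. Read will cause infinite loop.");
            return destination->getQueryProcessingStage();
        }
        return Stage::FetchColumns;
    }
};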
StorageBuffer.h

@@ -53,11 +53,13 @@ public:
     std::string getName() const override { return "Buffer"; }
     std::string getTableName() const override { return name; }

+    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
+
     BlockInputStreams read(
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
StorageCatBoostPool.cpp

@@ -259,10 +259,12 @@ void StorageCatBoostPool::createSampleBlockAndColumns()
 BlockInputStreams StorageCatBoostPool::read(const Names & column_names,
                        const SelectQueryInfo & /*query_info*/,
                        const Context & context,
-                       QueryProcessingStage::Enum & /*processed_stage*/,
+                       QueryProcessingStage::Enum processed_stage,
                        size_t max_block_size,
                        unsigned /*threads*/)
 {
+    checkQueryProcessingStage(processed_stage, context);
+
     auto stream = std::make_shared<CatBoostDatasetBlockInputStream>(
             data_description_file_name, "TSV", sample_block, context, max_block_size);
StorageCatBoostPool.h

@@ -17,7 +17,7 @@ public:
     BlockInputStreams read(const Names & column_names,
                        const SelectQueryInfo & query_info,
                        const Context & context,
-                       QueryProcessingStage::Enum & processed_stage,
+                       QueryProcessingStage::Enum processed_stage,
                        size_t max_block_size,
                        unsigned threads) override;
StorageDictionary.cpp

@@ -36,13 +36,13 @@ StorageDictionary::StorageDictionary(

 BlockInputStreams StorageDictionary::read(
     const Names & column_names,
-    const SelectQueryInfo & /*query_info*/,
+    const SelectQueryInfo query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t max_block_size,
     const unsigned /*threads*/)
 {
-    processed_stage = QueryProcessingStage::FetchColumns;
+    checkQueryProcessingStage(processed_stage, context);
     auto dictionary = context.getExternalDictionaries().getDictionary(dictionary_name);
     return BlockInputStreams{dictionary->getBlockInputStream(column_names, max_block_size)};
 }
StorageDictionary.h

@@ -27,7 +27,7 @@ public:
     BlockInputStreams read(const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size = DEFAULT_BLOCK_SIZE,
         unsigned threads = 1) override;
StorageDistributed.cpp

@@ -175,14 +175,7 @@ StoragePtr StorageDistributed::createWithOwnCluster(
     return res;
 }


-BlockInputStreams StorageDistributed::read(
-    const Names & /*column_names*/,
-    const SelectQueryInfo & query_info,
-    const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
-    const size_t /*max_block_size*/,
-    const unsigned /*num_streams*/)
+QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context) const
 {
     auto cluster = getCluster();

@@ -193,11 +186,24 @@ BlockInputStreams StorageDistributed::read(
     size_t result_size = (num_remote_shards * settings.max_parallel_replicas) + num_local_shards;

     if (settings.distributed_group_by_no_merge)
-        processed_stage = QueryProcessingStage::Complete;
+        return QueryProcessingStage::Complete;
     else /// Normal mode.
-        processed_stage = result_size == 1
-            ? QueryProcessingStage::Complete
-            : QueryProcessingStage::WithMergeableState;
+        return result_size == 1 ? QueryProcessingStage::Complete
+                                : QueryProcessingStage::WithMergeableState;
+}
+
+BlockInputStreams StorageDistributed::read(
+    const Names & /*column_names*/,
+    const SelectQueryInfo & query_info,
+    const Context & context,
+    QueryProcessingStage::Enum processed_stage,
+    const size_t /*max_block_size*/,
+    const unsigned /*num_streams*/)
+{
+    checkQueryProcessingStage(processed_stage, context);
+
+    auto cluster = getCluster();
+    const Settings & settings = context.getSettingsRef();

     const auto & modified_query_ast = rewriteSelectQuery(
         query_info.query, remote_database, remote_table);
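The rule StorageDistributed::getQueryProcessingStage() encodes: with distributed_group_by_no_merge set, every shard completes the query on its own; otherwise the query is Complete only when exactly one stream participates (remote shards times max_parallel_replicas, plus local shards), and WithMergeableState when the initiator still has to merge. A sketch of the same rule isolated as a pure function over assumed inputs:

// The stage-selection rule from the hunk above, as a standalone pure function;
// the enum and parameter names are stand-ins for the real settings and counts.
#include <cstddef>

enum class Stage { FetchColumns, WithMergeableState, Complete };

Stage distributedStage(bool distributed_group_by_no_merge,
                       std::size_t num_remote_shards,
                       std::size_t num_local_shards,
                       std::size_t max_parallel_replicas)
{
    if (distributed_group_by_no_merge)
        return Stage::Complete;  /// every shard finishes its own GROUP BY

    /// One participating stream can produce a complete result by itself;
    /// more than one means the initiator must still merge mergeable states.
    std::size_t result_size = num_remote_shards * max_parallel_replicas + num_local_shards;
    return result_size == 1 ? Stage::Complete : Stage::WithMergeableState;
}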
StorageDistributed.h

@@ -52,11 +52,13 @@ public:

     bool isRemote() const override { return true; }

+    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
+
     BlockInputStreams read(
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
StorageFile.cpp

@@ -170,10 +170,11 @@ BlockInputStreams StorageFile::read(
     const Names & /*column_names*/,
     const SelectQueryInfo & /*query_info*/,
     const Context & context,
-    QueryProcessingStage::Enum & /*processed_stage*/,
+    QueryProcessingStage::Enum processed_stage,
     size_t max_block_size,
     unsigned /*num_streams*/)
 {
+    checkQueryProcessingStage(processed_stage, context);
     return BlockInputStreams(1, std::make_shared<StorageFileBlockInputStream>(*this, context, max_block_size));
 }
StorageFile.h

@@ -35,7 +35,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
StorageKafka.cpp

@@ -236,12 +236,12 @@ BlockInputStreams StorageKafka::read(
     const Names & column_names,
     const SelectQueryInfo & /*query_info*/,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     size_t max_block_size,
     unsigned num_streams)
 {
+    checkQueryProcessingStage(processed_stage, context);
     check(column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;

     if (num_consumers == 0)
         return BlockInputStreams();
StorageKafka.h

@@ -39,7 +39,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
StorageLog.cpp

@@ -491,12 +491,12 @@ BlockInputStreams StorageLog::read(
     const Names & column_names,
     const SelectQueryInfo & /*query_info*/,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     size_t max_block_size,
     unsigned num_streams)
 {
+    checkQueryProcessingStage(processed_stage, context);
     check(column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;
     loadMarks();

     NamesAndTypesList all_columns = Nested::collect(getColumns().getAllPhysical().addTypes(column_names));
StorageLog.h

@@ -30,7 +30,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
StorageMaterializedView.cpp

@@ -170,11 +170,16 @@ bool StorageMaterializedView::hasColumn(const String & column_name) const
     return getTargetTable()->hasColumn(column_name);
 }

+QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context) const
+{
+    return getTargetTable()->getQueryProcessingStage(context);
+}
+
 BlockInputStreams StorageMaterializedView::read(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t max_block_size,
     const unsigned num_streams)
 {
StorageMaterializedView.h

@@ -41,11 +41,13 @@ public:
     void shutdown() override;
     bool checkTableCanBeDropped() const override;

+    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
+
     BlockInputStreams read(
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
StorageMemory.cpp

@@ -83,11 +83,12 @@ StorageMemory::StorageMemory(String table_name_, ColumnsDescription columns_desc
 BlockInputStreams StorageMemory::read(
     const Names & column_names,
     const SelectQueryInfo & /*query_info*/,
-    const Context & /*context*/,
-    QueryProcessingStage::Enum & processed_stage,
+    const Context & context,
+    QueryProcessingStage::Enum processed_stage,
     size_t /*max_block_size*/,
     unsigned num_streams)
 {
+    checkQueryProcessingStage(processed_stage, context);
     check(column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;

StorageMemory.h

@@ -32,7 +32,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
StorageMerge.cpp

@@ -132,11 +132,46 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) cons
 }


+QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context) const
+{
+    auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
+
+    auto database = context.getDatabase(source_database);
+    auto iterator = database->getIterator(context);
+
+    bool first = true;
+
+    while (iterator->isValid())
+    {
+        if (table_name_regexp.match(iterator->name()))
+        {
+            auto & table = iterator->table();
+            if (table.get() != this)
+            {
+                auto stage = table->getQueryProcessingStage(context);
+
+                if (first)
+                    stage_in_source_tables = stage;
+                else if (stage != stage_in_source_tables)
+                    throw Exception("Source tables for Merge table are processing data up to different stages",
+                                    ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
+
+                first = false;
+            }
+        }
+
+        iterator->next();
+    }
+
+    return stage_in_source_tables;
+}
+
+
 BlockInputStreams StorageMerge::read(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t max_block_size,
     const unsigned num_streams)
 {
@@ -156,8 +191,6 @@ BlockInputStreams StorageMerge::read(
         real_column_names.push_back(name);
     }

-    std::optional<QueryProcessingStage::Enum> processed_stage_in_source_tables;
-
     /** First we make list of selected tables to find out its size.
       * This is necessary to correctly pass the recommended number of threads to each table.
       */
@@ -218,24 +251,17 @@ BlockInputStreams StorageMerge::read(

         if (curr_table_number < num_streams)
         {
-            QueryProcessingStage::Enum processed_stage_in_source_table = processed_stage;
             source_streams = table->read(
                 real_column_names,
                 modified_query_info,
                 modified_context,
-                processed_stage_in_source_table,
+                processed_stage,
                 max_block_size,
                 tables_count >= num_streams ? 1 : (num_streams / tables_count));

-            if (!processed_stage_in_source_tables)
-                processed_stage_in_source_tables.emplace(processed_stage_in_source_table);
-            else if (processed_stage_in_source_table != *processed_stage_in_source_tables)
-                throw Exception("Source tables for Merge table are processing data up to different stages",
-                    ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
-
             if (!header)
             {
-                switch (processed_stage_in_source_table)
+                switch (processed_stage)
                 {
                     case QueryProcessingStage::FetchColumns:
                         header = getSampleBlockForColumns(column_names);
@@ -263,25 +289,17 @@ BlockInputStreams StorageMerge::read(
         }
         else
         {
-            if (!processed_stage_in_source_tables)
-                throw Exception("Logical error: unknown processed stage in source tables", ErrorCodes::LOGICAL_ERROR);
-
             /// If many streams, initialize it lazily, to avoid long delay before start of query processing.
             source_streams.emplace_back(std::make_shared<LazyBlockInputStream>(header, [=]() -> BlockInputStreamPtr
             {
-                QueryProcessingStage::Enum processed_stage_in_source_table = processed_stage;
                 BlockInputStreams streams = table->read(
                     real_column_names,
                     modified_query_info,
                     modified_context,
-                    processed_stage_in_source_table,
+                    processed_stage,
                     max_block_size,
                     1);

-                if (processed_stage_in_source_table != *processed_stage_in_source_tables)
-                    throw Exception("Source tables for Merge table are processing data up to different stages",
-                        ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
-
                 if (streams.empty())
                 {
                     return std::make_shared<NullBlockInputStream>(header);
@@ -305,9 +323,6 @@ BlockInputStreams StorageMerge::read(
         res.insert(res.end(), source_streams.begin(), source_streams.end());
     }

-    if (processed_stage_in_source_tables)
-        processed_stage = *processed_stage_in_source_tables;
-
     if (res.empty())
         return res;

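Since StorageMerge can only report one stage for the whole table, it requires every source table matched by the regexp to agree; the INCOMPATIBLE_SOURCE_TABLES check that previously ran per-stream inside read() now runs once, up front. A standalone sketch of that rule (the enum and exception type are stand-ins):

// The consistency rule from StorageMerge::getQueryProcessingStage as a
// standalone sketch: every matched source table must report the same stage.
#include <stdexcept>
#include <vector>

enum class Stage { FetchColumns, WithMergeableState, Complete };

Stage mergeStage(const std::vector<Stage> & source_table_stages)
{
    Stage result = Stage::FetchColumns;
    bool first = true;

    for (Stage stage : source_table_stages)
    {
        if (first)
            result = stage;
        else if (stage != result)
            throw std::runtime_error(
                "Source tables for Merge table are processing data up to different stages");
        first = false;
    }

    return result;  /// FetchColumns when no table matched the regexp
}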
StorageMerge.h

@@ -29,11 +29,13 @@ public:
     NameAndTypePair getColumn(const String & column_name) const override;
     bool hasColumn(const String & column_name) const override;

+    QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const override;
+
     BlockInputStreams read(
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
StorageMergeTree.cpp

@@ -101,11 +101,12 @@ BlockInputStreams StorageMergeTree::read(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t max_block_size,
     const unsigned num_streams)
 {
-    return reader.read(column_names, query_info, context, processed_stage, max_block_size, num_streams, 0);
+    checkQueryProcessingStage(processed_stage, context);
+    return reader.read(column_names, query_info, context, max_block_size, num_streams, 0);
 }

 BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Settings & /*settings*/)
StorageMergeTree.h

@@ -56,7 +56,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
StorageMySQL.cpp

@@ -37,12 +37,12 @@ BlockInputStreams StorageMySQL::read(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     size_t max_block_size,
     unsigned)
 {
     check(column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;
+    checkQueryProcessingStage(processed_stage, context);
     String query = transformQueryForExternalDatabase(*query_info.query, getColumns().ordinary, remote_database_name, remote_table_name, context);

     Block sample_block;
StorageMySQL.h

@@ -33,7 +33,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
StorageNull.h

@@ -23,11 +23,12 @@ public:
     BlockInputStreams read(
         const Names & column_names,
         const SelectQueryInfo &,
-        const Context &,
-        QueryProcessingStage::Enum &,
+        const Context & context,
+        QueryProcessingStage::Enum processing_stage,
         size_t,
         unsigned) override
     {
+        checkQueryProcessingStage(processed_stage, context);
         return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) };
     }
StorageODBC.cpp

@@ -37,12 +37,12 @@ BlockInputStreams StorageODBC::read(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     size_t max_block_size,
     unsigned /*num_streams*/)
 {
     check(column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;
+    checkQueryProcessingStage(processed_stage, context);
     String query = transformQueryForExternalDatabase(
         *query_info.query, getColumns().ordinary, remote_database_name, remote_table_name, context);

StorageODBC.h

@@ -35,7 +35,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
StorageReplicatedMergeTree.cpp

@@ -2439,10 +2439,11 @@ BlockInputStreams StorageReplicatedMergeTree::read(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t max_block_size,
     const unsigned num_streams)
 {
+    checkQueryProcessingStage(processed_stage, context);
     const Settings & settings = context.getSettingsRef();

     /** The `select_sequential_consistency` setting has two meanings:
@@ -2480,8 +2481,7 @@ BlockInputStreams StorageReplicatedMergeTree::read(
         }
     }

-    return reader.read(
-        column_names, query_info, context, processed_stage, max_block_size, num_streams, max_block_number_to_read);
+    return reader.read(column_names, query_info, context, max_block_size, num_streams, max_block_number_to_read);
 }

StorageReplicatedMergeTree.h

@@ -101,7 +101,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
StorageStripeLog.cpp

@@ -232,14 +232,14 @@ BlockInputStreams StorageStripeLog::read(
     const Names & column_names,
     const SelectQueryInfo & /*query_info*/,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t /*max_block_size*/,
     unsigned num_streams)
 {
+    checkQueryProcessingStage(processed_stage, context);
     std::shared_lock<std::shared_mutex> lock(rwlock);

     check(column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;

     NameSet column_names_set(column_names.begin(), column_names.end());

StorageStripeLog.h

@@ -32,7 +32,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
StorageTinyLog.cpp

@@ -341,12 +341,12 @@ BlockInputStreams StorageTinyLog::read(
     const Names & column_names,
     const SelectQueryInfo & /*query_info*/,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t max_block_size,
     const unsigned /*num_streams*/)
 {
+    checkQueryProcessingStage(processed_stage, context);
     check(column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;
     return BlockInputStreams(1, std::make_shared<TinyLogBlockInputStream>(
         max_block_size, Nested::collect(getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size));
 }
StorageTinyLog.h

@@ -31,7 +31,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
StorageView.cpp

@@ -33,11 +33,11 @@ BlockInputStreams StorageView::read(
     const Names & column_names,
     const SelectQueryInfo & /*query_info*/,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t /*max_block_size*/,
     const unsigned /*num_streams*/)
 {
-    processed_stage = QueryProcessingStage::FetchColumns;
+    checkQueryProcessingStage(processed_stage, context);
     BlockInputStreams res = InterpreterSelectWithUnionQuery(inner_query, context, column_names).executeWithMultipleStreams();

     /// It's expected that the columns read from storage are not constant.
StorageView.h

@@ -25,7 +25,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;