Merge branch 'master' of github.com:yandex/ClickHouse

Alexey Milovidov 2018-08-21 02:22:30 +03:00
commit bcbcbcdbb1
123 changed files with 1027 additions and 401 deletions

View File

@ -151,7 +151,7 @@ endif ()
if (USE_INTERNAL_LLVM_LIBRARY)
# ld: unknown option: --color-diagnostics
if (APPLE AND COMPILER_GCC)
if (APPLE)
set (LINKER_SUPPORTS_COLOR_DIAGNOSTICS 0 CACHE INTERNAL "")
endif ()
add_subdirectory (llvm/llvm)

View File

@ -157,6 +157,7 @@ target_link_libraries (dbms
${RE2_LIBRARY}
${RE2_ST_LIBRARY}
${BTRIE_LIBRARIES}
${Boost_PROGRAM_OPTIONS_LIBRARY}
)
if (NOT USE_INTERNAL_RE2_LIBRARY)

View File

@ -0,0 +1,30 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionRetention.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
namespace DB
{
namespace
{
AggregateFunctionPtr createAggregateFunctionRetention(const std::string & name, const DataTypes & arguments, const Array & params)
{
assertNoParameters(name, params);
if (arguments.size() > AggregateFunctionRetentionData::max_events)
throw Exception("Too many event arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return std::make_shared<AggregateFunctionRetention>(arguments);
}
}
void registerAggregateFunctionRetention(AggregateFunctionFactory & factory)
{
factory.registerFunction("retention", createAggregateFunctionRetention, AggregateFunctionFactory::CaseInsensitive);
}
}

View File

@ -0,0 +1,150 @@
#pragma once
#include <iostream>
#include <sstream>
#include <unordered_set>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/ArenaAllocator.h>
#include <Common/typeid_cast.h>
#include <ext/range.h>
#include <bitset>
#include <AggregateFunctions/IAggregateFunction.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
struct AggregateFunctionRetentionData
{
static constexpr auto max_events = 32;
using Events = std::bitset<max_events>;
Events events;
void add(UInt8 event)
{
events.set(event);
}
void merge(const AggregateFunctionRetentionData & other)
{
events |= other.events;
}
void serialize(WriteBuffer & buf) const
{
UInt32 event_value = events.to_ulong();
writeBinary(event_value, buf);
}
void deserialize(ReadBuffer & buf)
{
UInt32 event_value;
readBinary(event_value, buf);
events = event_value;
}
};
/**
* The maximum number of events is 32; that's enough for retention analytics
*
* Usage:
* - retention(cond1, cond2, cond3, ....)
* - returns [cond1_flag, cond1_flag && cond2_flag, cond1_flag && cond3_flag, ...]
*/
class AggregateFunctionRetention final
: public IAggregateFunctionDataHelper<AggregateFunctionRetentionData, AggregateFunctionRetention>
{
private:
UInt8 events_size;
public:
String getName() const override
{
return "retention";
}
AggregateFunctionRetention(const DataTypes & arguments)
{
for (const auto i : ext::range(0, arguments.size()))
{
auto cond_arg = arguments[i].get();
if (!typeid_cast<const DataTypeUInt8 *>(cond_arg))
throw Exception{"Illegal type " + cond_arg->getName() + " of argument " + toString(i) + " of aggregate function "
+ getName() + ", must be UInt8",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}
events_size = arguments.size();
}
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt8>());
}
void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num, Arena *) const override
{
for (const auto i : ext::range(0, events_size))
{
auto event = static_cast<const ColumnVector<UInt8> *>(columns[i])->getData()[row_num];
if (event)
{
this->data(place).add(i);
break;
}
}
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).serialize(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).deserialize(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
auto & data_to = static_cast<ColumnArray &>(to).getData();
auto & offsets_to = static_cast<ColumnArray &>(to).getOffsets();
const bool first_flag = this->data(place).events.test(0);
data_to.insert(first_flag ? Field(static_cast<UInt64>(1)) : Field(static_cast<UInt64>(0)));
for (const auto i : ext::range(1, events_size))
{
if (first_flag && this->data(place).events.test(i))
data_to.insert(Field(static_cast<UInt64>(1)));
else
data_to.insert(Field(static_cast<UInt64>(0)));
}
offsets_to.push_back(offsets_to.size() == 0 ? events_size : offsets_to.back() + events_size);
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}
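
A minimal, self-contained sketch (plain std::bitset and std::vector, not ClickHouse types; values are illustrative) of the result semantics documented above: slot 0 reports whether the first condition fired, and every later slot i reports cond1 && cond_i, mirroring insertResultInto().

#include <bitset>
#include <iostream>
#include <vector>

int main()
{
    std::bitset<32> events;        // same layout as AggregateFunctionRetentionData::events
    events.set(0);                 // cond1 fired for this key
    events.set(2);                 // cond3 fired for this key

    const size_t events_size = 3;  // retention(cond1, cond2, cond3)
    std::vector<unsigned> result;
    result.push_back(events.test(0) ? 1 : 0);
    for (size_t i = 1; i < events_size; ++i)
        result.push_back((events.test(0) && events.test(i)) ? 1 : 0);

    for (unsigned v : result)
        std::cout << v << ' ';     // prints: 1 0 1
    std::cout << '\n';
}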

View File

@ -34,6 +34,7 @@ void registerAggregateFunctionCombinatorMerge(AggregateFunctionCombinatorFactory
void registerAggregateFunctionCombinatorNull(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionHistogram(AggregateFunctionFactory & factory);
void registerAggregateFunctionRetention(AggregateFunctionFactory & factory);
void registerAggregateFunctions()
{
@ -59,6 +60,7 @@ void registerAggregateFunctions()
registerAggregateFunctionsBitwise(factory);
registerAggregateFunctionsMaxIntersections(factory);
registerAggregateFunctionHistogram(factory);
registerAggregateFunctionRetention(factory);
}
{

View File

@ -6,7 +6,7 @@
#include <memory>
#include <common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
namespace ProfileEvents

View File

@ -1,4 +1,4 @@
#include <Common/BackgroundSchedulePool.h>
#include "BackgroundSchedulePool.h"
#include <Common/MemoryTracker.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>

View File

@ -15,6 +15,20 @@ namespace ErrorCodes
}
static ColumnPtr castColumnWithDiagnostic(const ColumnWithTypeAndName & src_elem, const ColumnWithTypeAndName & res_elem, const Context & context)
{
try
{
return castColumn(src_elem, res_elem.type, context);
}
catch (Exception & e)
{
e.addMessage("while converting source column " + backQuoteIfNeed(src_elem.name) + " to destination column " + backQuoteIfNeed(res_elem.name));
throw;
}
}
ConvertingBlockInputStream::ConvertingBlockInputStream(
const Context & context_,
const BlockInputStreamPtr & input,
@ -69,7 +83,7 @@ ConvertingBlockInputStream::ConvertingBlockInputStream(
/// Check conversion by dry run CAST function.
castColumn(src_elem, res_elem.type, context);
castColumnWithDiagnostic(src_elem, res_elem, context);
}
}
@ -87,7 +101,7 @@ Block ConvertingBlockInputStream::readImpl()
const auto & src_elem = src.getByPosition(conversion[res_pos]);
auto & res_elem = res.getByPosition(res_pos);
ColumnPtr converted = castColumn(src_elem, res_elem.type, context);
ColumnPtr converted = castColumnWithDiagnostic(src_elem, res_elem, context);
if (src_elem.column->isColumnConst() && !res_elem.column->isColumnConst())
converted = converted->convertToFullColumnIfConst();
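
The castColumnWithDiagnostic() change above wraps the conversion so that failures name the offending columns. A standalone sketch of the same wrap-and-rethrow pattern, using std::runtime_error in place of DB::Exception::addMessage() (column names are illustrative):

#include <iostream>
#include <stdexcept>
#include <string>

// Plain conversion that may throw (std::stoi throws std::invalid_argument on bad input).
static int castValue(const std::string & value)
{
    return std::stoi(value);
}

// Wrap the conversion and enrich the error with the column names before rethrowing.
static int castValueWithDiagnostic(const std::string & value, const std::string & src_name, const std::string & dst_name)
{
    try
    {
        return castValue(value);
    }
    catch (const std::exception & e)
    {
        throw std::runtime_error(std::string(e.what())
            + ": while converting source column `" + src_name + "` to destination column `" + dst_name + "`");
    }
}

int main()
{
    try
    {
        castValueWithDiagnostic("not-a-number", "raw_value", "value_int");
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << '\n';  // the message now names both columns
    }
}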

View File

@ -17,8 +17,9 @@ namespace ErrorCodes
}
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name)
: expression(expression_)
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_,
const String & filter_column_name, bool remove_filter)
: remove_filter(remove_filter), expression(expression_)
{
children.push_back(input);
@ -40,6 +41,9 @@ FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input
FilterDescription filter_description_check(*column_elem.column);
column_elem.column = column_elem.type->createColumnConst(header.rows(), UInt64(1));
}
if (remove_filter)
header.erase(filter_column_name);
}
@ -69,7 +73,7 @@ Block FilterBlockInputStream::readImpl()
Block res;
if (constant_filter_description.always_false)
return res;
return removeFilterIfNeed(std::move(res));
/// Until non-empty block after filtering or end of stream.
while (1)
@ -81,7 +85,7 @@ Block FilterBlockInputStream::readImpl()
expression->execute(res);
if (constant_filter_description.always_true)
return res;
return removeFilterIfNeed(std::move(res));
size_t columns = res.columns();
ColumnPtr column = res.safeGetByPosition(filter_column).column;
@ -100,7 +104,7 @@ Block FilterBlockInputStream::readImpl()
}
if (constant_filter_description.always_true)
return res;
return removeFilterIfNeed(std::move(res));
FilterDescription filter_and_holder(*column);
@ -142,7 +146,7 @@ Block FilterBlockInputStream::readImpl()
/// Replace the column with the filter by a constant.
res.safeGetByPosition(filter_column).column = res.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, UInt64(1));
/// No need to touch the rest of the columns.
return res;
return removeFilterIfNeed(std::move(res));
}
/// Filter the rest of the columns.
@ -170,9 +174,18 @@ Block FilterBlockInputStream::readImpl()
current_column.column = current_column.column->filter(*filter_and_holder.data, -1);
}
return res;
return removeFilterIfNeed(std::move(res));
}
}
Block FilterBlockInputStream::removeFilterIfNeed(Block && block)
{
if (block && remove_filter)
block.erase(static_cast<size_t>(filter_column));
return std::move(block);
}
}

View File

@ -20,7 +20,8 @@ private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
public:
FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_);
FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_,
const String & filter_column_name_, bool remove_filter = false);
String getName() const override;
Block getTotals() override;
@ -29,12 +30,16 @@ public:
protected:
Block readImpl() override;
bool remove_filter;
private:
ExpressionActionsPtr expression;
Block header;
ssize_t filter_column;
ConstantFilterDescription constant_filter_description;
Block removeFilterIfNeed(Block && block);
};
}
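
A toy, self-contained illustration of the new remove_filter behaviour (plain std::map/std::vector instead of Block/IColumn; names are illustrative): rows are filtered by the filter column, and the filter column itself is erased when nothing downstream needs it, as removeFilterIfNeed() does.

#include <iostream>
#include <map>
#include <string>
#include <vector>

using Column = std::vector<int>;
using Block = std::map<std::string, Column>;

// Filter every column by the filter column, then optionally erase the filter column.
static Block filterBlock(Block block, const std::string & filter_column, bool remove_filter)
{
    const Column filter = block.at(filter_column);
    for (auto & entry : block)
    {
        Column filtered;
        for (size_t row = 0; row < entry.second.size(); ++row)
            if (filter[row])
                filtered.push_back(entry.second[row]);
        entry.second = std::move(filtered);
    }
    if (remove_filter)
        block.erase(filter_column);
    return block;
}

int main()
{
    Block block = {{"x", {1, 2, 3, 4}}, {"greater(x, 2)", {0, 0, 1, 1}}};
    Block result = filterBlock(block, "greater(x, 2)", /* remove_filter = */ true);
    for (const auto & entry : result)
    {
        std::cout << entry.first << ":";
        for (int v : entry.second)
            std::cout << ' ' << v;
        std::cout << '\n';              // prints only: x: 3 4
    }
}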

View File

@ -139,7 +139,7 @@ void RemoteBlockInputStream::sendExternalTables()
for (const auto & table : external_tables)
{
StoragePtr cur = table.second;
QueryProcessingStage::Enum read_from_table_stage = QueryProcessingStage::Complete;
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(context);
BlockInputStreams input = cur->read(cur->getColumns().getNamesOfPhysical(), {}, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
if (input.size() == 0)

View File

@ -47,7 +47,7 @@ try
Names column_names;
column_names.push_back("number");
QueryProcessingStage::Enum stage;
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
BlockInputStreamPtr in;
in = table->read(column_names, {}, context, stage, 8192, 1)[0];

View File

@ -52,7 +52,7 @@ try
Names column_names;
column_names.push_back("number");
QueryProcessingStage::Enum stage;
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
BlockInputStreamPtr in = table->read(column_names, {}, context, stage, 8192, 1)[0];
in = std::make_shared<FilterBlockInputStream>(in, expression, "equals(modulo(number, 3), 1)");

View File

@ -34,7 +34,7 @@ try
StoragePtr table = context.getTable("default", "hits6");
QueryProcessingStage::Enum stage;
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
BlockInputStreams streams = table->read(column_names, {}, context, stage, settings.max_block_size, settings.max_threads);
for (size_t i = 0, size = streams.size(); i < size; ++i)

View File

@ -1,21 +1,18 @@
#include <map>
#include <set>
#include <boost/functional/hash/hash.hpp>
#include <Poco/Mutex.h>
#include <Poco/File.h>
#include <Poco/UUID.h>
#include <Poco/Net/IPAddress.h>
#include <common/logger_useful.h>
#include <pcg_random.hpp>
#include <Common/Macros.h>
#include <Common/escapeForFileName.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
#include <Formats/FormatFactory.h>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>

View File

@ -1079,10 +1079,25 @@ void ExpressionActionsChain::finalize()
for (int i = static_cast<int>(steps.size()) - 1; i >= 0; --i)
{
Names required_output = steps[i].required_output;
std::unordered_map<String, size_t> required_output_indexes;
for (size_t j = 0; j < required_output.size(); ++j)
required_output_indexes[required_output[j]] = j;
auto & can_remove_required_output = steps[i].can_remove_required_output;
if (i + 1 < static_cast<int>(steps.size()))
{
const NameSet & additional_input = steps[i + 1].additional_input;
for (const auto & it : steps[i + 1].actions->getRequiredColumnsWithTypes())
required_output.push_back(it.name);
{
if (additional_input.count(it.name) == 0)
{
auto iter = required_output_indexes.find(it.name);
if (iter == required_output_indexes.end())
required_output.push_back(it.name);
else if (!can_remove_required_output.empty())
can_remove_required_output[iter->second] = false;
}
}
}
steps[i].actions->finalize(required_output);
}

View File

@ -241,7 +241,14 @@ struct ExpressionActionsChain
struct Step
{
ExpressionActionsPtr actions;
/// Columns which were added to the block before the current step, in addition to the previous step's output.
NameSet additional_input;
/// Columns which are required in the result of the current step.
Names required_output;
/// True if the column from required_output is needed only for the current step and is not used in later actions
/// (and can therefore be removed from the block). Example: the filter column for WHERE actions.
/// If not empty, has the same size as required_output; it is filled in finalize().
std::vector<bool> can_remove_required_output;
Step(const ExpressionActionsPtr & actions_ = nullptr, const Names & required_output_ = Names())
: actions(actions_), required_output(required_output_) {}
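
A standalone sketch (plain STL containers, illustrative column names) of the bookkeeping that ExpressionActionsChain::finalize() performs for can_remove_required_output, shown earlier in this diff: an output of step i stays removable only if no later step consumes it and it was not injected as additional_input.

#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

int main()
{
    // Outputs of a PREWHERE-like step: the filter column and a data column.
    std::vector<std::string> required_output = {"greater(B, 0)", "A"};
    // The filter column is removable unless a later step turns out to need it.
    std::vector<bool> can_remove_required_output = {true, false};

    std::unordered_set<std::string> additional_input;   // nothing was injected into the next step
    std::vector<std::string> next_step_inputs = {"A"};   // the next step only consumes column A

    std::unordered_map<std::string, size_t> required_output_indexes;
    for (size_t i = 0; i < required_output.size(); ++i)
        required_output_indexes[required_output[i]] = i;

    for (const auto & name : next_step_inputs)
    {
        if (additional_input.count(name))
            continue;                                    // injected columns are not demanded from this step
        auto it = required_output_indexes.find(name);
        if (it == required_output_indexes.end())
            required_output.push_back(name);             // a later step needs a column we did not plan to keep
        else if (!can_remove_required_output.empty())
            can_remove_required_output[it->second] = false;  // still consumed later, so it must stay
    }

    std::cout << "filter column removable: " << std::boolalpha
              << static_cast<bool>(can_remove_required_output[0]) << '\n';  // prints: true
}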

View File

@ -273,8 +273,8 @@ ExpressionAnalyzer::ExpressionAnalyzer(
/// array_join_alias_to_name, array_join_result_to_source.
getArrayJoinedColumns();
/// Push the predicate expression down to the sub-queries.
rewrite_sub_queries = PredicateExpressionsOptimizer(select_query, settings).optimize();
/// Push the predicate expression down to the subqueries.
rewrite_subqueries = PredicateExpressionsOptimizer(select_query, settings).optimize();
/// Delete the unnecessary from `source_columns` list. Create `unknown_required_source_columns`. Form `columns_added_by_join`.
collectUsedColumns();
@ -2733,6 +2733,67 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
return true;
}
bool ExpressionAnalyzer::appendPrewhere(ExpressionActionsChain & chain, bool only_types)
{
assertSelect();
if (!select_query->prewhere_expression)
return false;
initChain(chain, source_columns);
auto & step = chain.getLastStep();
getRootActions(select_query->prewhere_expression, only_types, false, step.actions);
String prewhere_column_name = select_query->prewhere_expression->getColumnName();
step.required_output.push_back(prewhere_column_name);
step.can_remove_required_output.push_back(true);
{
/// Remove unused source_columns from prewhere actions.
auto tmp_actions = std::make_shared<ExpressionActions>(source_columns, settings);
getRootActions(select_query->prewhere_expression, only_types, false, tmp_actions);
tmp_actions->finalize({prewhere_column_name});
auto required_columns = tmp_actions->getRequiredColumns();
NameSet required_source_columns(required_columns.begin(), required_columns.end());
auto names = step.actions->getSampleBlock().getNames();
NameSet name_set(names.begin(), names.end());
for (const auto & column : source_columns)
if (required_source_columns.count(column.name) == 0)
name_set.erase(column.name);
Names required_output(name_set.begin(), name_set.end());
step.actions->finalize(required_output);
}
{
/// Add an empty action with input = {prewhere actions output} + {unused source columns}
/// Reasons:
/// 1. Remove source columns which are used only in prewhere actions while the prewhere actions are executed.
/// Example: SELECT A PREWHERE B > 0. B can be removed at the prewhere step.
/// 2. Store side columns which were calculated during prewhere actions execution if they are used later.
/// Example: SELECT F(A) PREWHERE F(A) > 0. F(A) can be saved from the prewhere step.
/// 3. Check if we can remove the filter column at the prewhere step. If we can, the action will store a single REMOVE_COLUMN.
ColumnsWithTypeAndName columns = step.actions->getSampleBlock().getColumnsWithTypeAndName();
auto required_columns = step.actions->getRequiredColumns();
NameSet prewhere_input_names(required_columns.begin(), required_columns.end());
NameSet unused_source_columns;
for (const auto & column : source_columns)
{
if (prewhere_input_names.count(column.name) == 0)
{
columns.emplace_back(column.type, column.name);
unused_source_columns.emplace(column.name);
}
}
chain.steps.emplace_back(std::make_shared<ExpressionActions>(std::move(columns), settings));
chain.steps.back().additional_input = std::move(unused_source_columns);
}
return true;
}
bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_types)
{
@ -2745,6 +2806,8 @@ bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_t
ExpressionActionsChain::Step & step = chain.steps.back();
step.required_output.push_back(select_query->where_expression->getColumnName());
step.can_remove_required_output = {true};
getRootActions(select_query->where_expression, only_types, false, step.actions);
return true;

View File

@ -152,6 +152,8 @@ public:
/// Before aggregation:
bool appendArrayJoin(ExpressionActionsChain & chain, bool only_types);
bool appendJoin(ExpressionActionsChain & chain, bool only_types);
/// remove_filter is set in ExpressionActionsChain::finalize();
bool appendPrewhere(ExpressionActionsChain & chain, bool only_types);
bool appendWhere(ExpressionActionsChain & chain, bool only_types);
bool appendGroupBy(ExpressionActionsChain & chain, bool only_types);
void appendAggregateFunctionsArguments(ExpressionActionsChain & chain, bool only_types);
@ -189,7 +191,7 @@ public:
/// Create the Sets from the IN section that we can use together with the index.
void makeSetsForIndex();
bool isRewriteSubQueriesPredicate() { return rewrite_sub_queries; }
bool isRewriteSubqueriesPredicate() { return rewrite_subqueries; }
private:
ASTPtr ast;
@ -303,7 +305,7 @@ private:
size_t external_table_id = 1;
/// Predicate optimizer overrides the sub queries
bool rewrite_sub_queries = false;
bool rewrite_subqueries = false;
/** Remove all unnecessary columns from the list of all available columns of the table (`columns`).
* At the same time, form a set of unknown columns (`unknown_required_source_columns`),

View File

@ -198,7 +198,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (!context.tryGetExternalTable(it.first))
context.addExternalTable(it.first, it.second);
if (query_analyzer->isRewriteSubQueriesPredicate())
if (query_analyzer->isRewriteSubqueriesPredicate())
interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
table_expression, getSubqueryContext(context), required_columns, QueryProcessingStage::Complete, subquery_depth + 1, only_analyze);
}
@ -293,9 +293,37 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
* throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries.
*/
bool has_prewhere = false;
bool has_where = false;
size_t where_step_num;
auto finalizeChain = [&](ExpressionActionsChain & chain)
{
chain.finalize();
if (has_prewhere)
res.prewhere_info->remove_prewhere_column = chain.steps.at(0).can_remove_required_output.at(0);
if (has_where)
res.remove_where_filter = chain.steps.at(where_step_num).can_remove_required_output.at(0);
has_prewhere = has_where = false;
chain.clear();
};
{
ExpressionActionsChain chain;
if (query_analyzer->appendPrewhere(chain, !res.first_stage))
{
has_prewhere = true;
res.prewhere_info = std::make_shared<PrewhereInfo>(
chain.steps.front().actions, query.prewhere_expression->getColumnName());
chain.addStep();
}
res.need_aggregate = query_analyzer->hasAggregation();
query_analyzer->appendArrayJoin(chain, dry_run || !res.first_stage);
@ -309,7 +337,8 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
if (query_analyzer->appendWhere(chain, dry_run || !res.first_stage))
{
res.has_where = true;
where_step_num = chain.steps.size() - 1;
has_where = res.has_where = true;
res.before_where = chain.getLastActions();
chain.addStep();
}
@ -320,8 +349,7 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
query_analyzer->appendAggregateFunctionsArguments(chain, dry_run || !res.first_stage);
res.before_aggregation = chain.getLastActions();
chain.finalize();
chain.clear();
finalizeChain(chain);
if (query_analyzer->appendHaving(chain, dry_run || !res.second_stage))
{
@ -348,8 +376,7 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
query_analyzer->appendProjectResult(chain);
res.final_projection = chain.getLastActions();
chain.finalize();
chain.clear();
finalizeChain(chain);
}
/// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys).
@ -376,31 +403,65 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
* then perform the remaining operations with one resulting stream.
*/
const Settings & settings = context.getSettingsRef();
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
/// PREWHERE optimization
if (storage)
{
if (!dry_run)
from_stage = storage->getQueryProcessingStage(context);
query_analyzer->makeSetsForIndex();
auto optimize_prewhere = [&](auto & merge_tree)
{
SelectQueryInfo query_info;
query_info.query = query_ptr;
query_info.sets = query_analyzer->getPreparedSets();
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
if (settings.optimize_move_to_prewhere && query.where_expression && !query.prewhere_expression && !query.final())
MergeTreeWhereOptimizer{query_info, context, merge_tree.getData(), query_analyzer->getRequiredSourceColumns(), log};
};
if (const StorageMergeTree * merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get()))
optimize_prewhere(*merge_tree);
else if (const StorageReplicatedMergeTree * merge_tree = dynamic_cast<const StorageReplicatedMergeTree *>(storage.get()))
optimize_prewhere(*merge_tree);
}
AnalysisResult expressions;
if (dry_run)
{
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(source_header));
expressions = analyzeExpressions(QueryProcessingStage::FetchColumns, true);
if (expressions.prewhere_info)
pipeline.streams.back() = std::make_shared<FilterBlockInputStream>(
pipeline.streams.back(), expressions.prewhere_info->prewhere_actions,
expressions.prewhere_info->prewhere_column_name, expressions.prewhere_info->remove_prewhere_column
);
}
else
{
if (input)
pipeline.streams.push_back(input);
/** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
QueryProcessingStage::Enum from_stage = executeFetchColumns(pipeline);
expressions = analyzeExpressions(from_stage, false);
if (from_stage == QueryProcessingStage::WithMergeableState && to_stage == QueryProcessingStage::WithMergeableState)
if (from_stage == QueryProcessingStage::WithMergeableState &&
to_stage == QueryProcessingStage::WithMergeableState)
throw Exception("Distributed on Distributed is not supported", ErrorCodes::NOT_IMPLEMENTED);
/** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
executeFetchColumns(from_stage, pipeline, expressions.prewhere_info);
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
expressions = analyzeExpressions(from_stage, false);
}
const Settings & settings = context.getSettingsRef();
if (to_stage > QueryProcessingStage::FetchColumns)
{
/// Now we will compose block streams that perform the necessary actions.
@ -433,7 +494,7 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
}
if (expressions.has_where)
executeWhere(pipeline, expressions.before_where);
executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter);
if (expressions.need_aggregate)
executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final);
@ -562,7 +623,8 @@ static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, siz
}
}
QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline & pipeline)
void InterpreterSelectQuery::executeFetchColumns(
QueryProcessingStage::Enum processing_stage, Pipeline & pipeline, const PrewhereInfoPtr & prewhere_info)
{
/// Actions to calculate ALIAS if required.
ExpressionActionsPtr alias_actions;
@ -652,8 +714,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
max_streams = 1;
}
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
/// Initialize the initial data streams to which the query transforms are superimposed. Table or subquery or prepared input?
if (!pipeline.streams.empty())
{
@ -685,32 +745,24 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
if (max_streams > 1 && !is_remote)
max_streams *= settings.max_streams_to_max_threads_ratio;
query_analyzer->makeSetsForIndex();
SelectQueryInfo query_info;
query_info.query = query_ptr;
query_info.sets = query_analyzer->getPreparedSets();
query_info.prewhere_info = prewhere_info;
/// PREWHERE optimization
{
auto optimize_prewhere = [&](auto & merge_tree)
{
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
if (settings.optimize_move_to_prewhere && query.where_expression && !query.prewhere_expression && !query.final())
MergeTreeWhereOptimizer{query_info, context, merge_tree.getData(), required_columns, log};
};
if (const StorageMergeTree * merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get()))
optimize_prewhere(*merge_tree);
else if (const StorageReplicatedMergeTree * merge_tree = dynamic_cast<const StorageReplicatedMergeTree *>(storage.get()))
optimize_prewhere(*merge_tree);
}
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
pipeline.streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams);
if (pipeline.streams.empty())
{
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns)));
if (query_info.prewhere_info)
pipeline.streams.back() = std::make_shared<FilterBlockInputStream>(
pipeline.streams.back(), prewhere_info->prewhere_actions,
prewhere_info->prewhere_column_name, prewhere_info->remove_prewhere_column
);
}
pipeline.transform([&](auto & stream)
{
stream->addTableLock(table_lock);
@ -755,23 +807,21 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR);
/// Aliases in table declaration.
if (from_stage == QueryProcessingStage::FetchColumns && alias_actions)
if (processing_stage == QueryProcessingStage::FetchColumns && alias_actions)
{
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<ExpressionBlockInputStream>(stream, alias_actions);
});
}
return from_stage;
}
void InterpreterSelectQuery::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression)
void InterpreterSelectQuery::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter)
{
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<FilterBlockInputStream>(stream, expression, query.where_expression->getColumnName());
stream = std::make_shared<FilterBlockInputStream>(stream, expression, query.where_expression->getColumnName(), remove_filter);
});
}

View File

@ -8,6 +8,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/SelectQueryInfo.h>
namespace Poco { class Logger; }
@ -137,6 +138,8 @@ private:
bool has_order_by = false;
bool has_limit_by = false;
bool remove_where_filter = false;
ExpressionActionsPtr before_join; /// including JOIN
ExpressionActionsPtr before_where;
ExpressionActionsPtr before_aggregation;
@ -154,6 +157,7 @@ private:
bool second_stage = false;
SubqueriesForSets subqueries_for_sets;
PrewhereInfoPtr prewhere_info;
};
AnalysisResult analyzeExpressions(QueryProcessingStage::Enum from_stage, bool dry_run);
@ -168,10 +172,9 @@ private:
/// dry_run - don't read from table, use empty header block instead.
void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run);
/// Fetch data from the table. processing_stage - the stage to which the query was processed in Storage.
QueryProcessingStage::Enum executeFetchColumns(Pipeline & pipeline);
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, Pipeline & pipeline, const PrewhereInfoPtr & prewhere_info);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row);

View File

@ -24,20 +24,20 @@ bool PredicateExpressionsOptimizer::optimize()
if (!settings.enable_optimize_predicate_expression || !ast_select || !ast_select->tables)
return false;
SubQueriesProjectionColumns all_subquery_projection_columns;
SubqueriesProjectionColumns all_subquery_projection_columns;
getAllSubqueryProjectionColumns(ast_select->tables.get(), all_subquery_projection_columns);
bool is_rewrite_sub_queries = false;
bool is_rewrite_subqueries = false;
if (!all_subquery_projection_columns.empty())
{
is_rewrite_sub_queries |= optimizeImpl(ast_select->where_expression, all_subquery_projection_columns, false);
is_rewrite_sub_queries |= optimizeImpl(ast_select->prewhere_expression, all_subquery_projection_columns, true);
is_rewrite_subqueries |= optimizeImpl(ast_select->where_expression, all_subquery_projection_columns, false);
is_rewrite_subqueries |= optimizeImpl(ast_select->prewhere_expression, all_subquery_projection_columns, true);
}
return is_rewrite_sub_queries;
return is_rewrite_subqueries;
}
bool PredicateExpressionsOptimizer::optimizeImpl(
ASTPtr & outer_expression, SubQueriesProjectionColumns & sub_queries_projection_columns, bool is_prewhere)
ASTPtr & outer_expression, SubqueriesProjectionColumns & subqueries_projection_columns, bool is_prewhere)
{
/// split predicate with `and`
PredicateExpressions outer_predicate_expressions = splitConjunctionPredicate(outer_expression);
@ -49,7 +49,7 @@ bool PredicateExpressionsOptimizer::optimizeImpl(
getExpressionDependentColumns(outer_predicate, outer_predicate_dependent);
/// TODO: remove origin expression
for (const auto & subquery_projection_columns : sub_queries_projection_columns)
for (const auto & subquery_projection_columns : subqueries_projection_columns)
{
auto subquery = static_cast<ASTSelectQuery *>(subquery_projection_columns.first);
const ProjectionsWithAliases projection_columns = subquery_projection_columns.second;
@ -168,7 +168,7 @@ bool PredicateExpressionsOptimizer::isAggregateFunction(ASTPtr & node)
return false;
}
void PredicateExpressionsOptimizer::getAllSubqueryProjectionColumns(IAST * node, SubQueriesProjectionColumns & all_subquery_projection_columns)
void PredicateExpressionsOptimizer::getAllSubqueryProjectionColumns(IAST * node, SubqueriesProjectionColumns & all_subquery_projection_columns)
{
if (auto ast_subquery = typeid_cast<ASTSubquery *>(node))
{
@ -221,7 +221,7 @@ bool PredicateExpressionsOptimizer::optimizeExpression(const ASTPtr & outer_expr
return true;
}
void PredicateExpressionsOptimizer::getSubqueryProjectionColumns(IAST * subquery, SubQueriesProjectionColumns & all_subquery_projection_columns, ASTs & output_projections)
void PredicateExpressionsOptimizer::getSubqueryProjectionColumns(IAST * subquery, SubqueriesProjectionColumns & all_subquery_projection_columns, ASTs & output_projections)
{
if (auto * with_union_subquery = typeid_cast<ASTSelectWithUnionQuery *>(subquery))
for (auto & select : with_union_subquery->list_of_selects->children)

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
using PredicateExpressions = std::vector<ASTPtr>;
using ProjectionWithAlias = std::pair<ASTPtr, String>;
using ProjectionsWithAliases = std::vector<ProjectionWithAlias>;
using SubQueriesProjectionColumns = std::map<IAST *, ProjectionsWithAliases>;
using SubqueriesProjectionColumns = std::map<IAST *, ProjectionsWithAliases>;
/** This class provides functions for push-down of predicate expressions
@ -61,7 +61,7 @@ private:
bool optimizeExpression(const ASTPtr & outer_expression, ASTPtr & subquery_expression, ASTSelectQuery * subquery);
bool optimizeImpl(ASTPtr & outer_expression, SubQueriesProjectionColumns & sub_queries_projection_columns, bool is_prewhere);
bool optimizeImpl(ASTPtr & outer_expression, SubqueriesProjectionColumns & subqueries_projection_columns, bool is_prewhere);
bool cannotPushDownOuterPredicate(
const ProjectionsWithAliases & subquery_projection_columns, ASTSelectQuery * subquery,
@ -72,9 +72,9 @@ private:
ASTPtr & inner_predicate);
void getAllSubqueryProjectionColumns(IAST * node, SubQueriesProjectionColumns & all_subquery_projection_columns);
void getAllSubqueryProjectionColumns(IAST * node, SubqueriesProjectionColumns & all_subquery_projection_columns);
void getSubqueryProjectionColumns(IAST * subquery, SubQueriesProjectionColumns & all_subquery_projection_columns, ASTs & output_projections);
void getSubqueryProjectionColumns(IAST * subquery, SubqueriesProjectionColumns & all_subquery_projection_columns, ASTs & output_projections);
};
}

View File

@ -13,8 +13,8 @@ bool ParserUnionQueryElement::parseImpl(Pos & pos, ASTPtr & node, Expected & exp
if (!ParserSubquery().parse(pos, node, expected) && !ParserSelectQuery().parse(pos, node, expected))
return false;
if (auto * ast_sub_query = typeid_cast<ASTSubquery *>(node.get()))
node = ast_sub_query->children.at(0);
if (auto * ast_subquery = typeid_cast<ASTSubquery *>(node.get()))
node = ast_subquery->children.at(0);
return true;
}

View File

@ -157,6 +157,11 @@ public:
return res;
}
/** Returns the stage to which the query is going to be processed in the read() function.
* (Normally, the function only reads the columns from the list, but in other cases,
* for example, the request can be partially processed on a remote server.)
*/
virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const { return QueryProcessingStage::FetchColumns; }
/** Read a set of columns from the table.
* Accepts a list of columns to read, as well as a description of the query,
@ -164,9 +169,7 @@ public:
* (indexes, locks, etc.)
* Returns a stream with which you can read data sequentially
* or multiple streams for parallel data reading.
* The `processed_stage` info is also written to what stage the request was processed.
* (Normally, the function only reads the columns from the list, but in other cases,
* for example, the request can be partially processed on a remote server.)
* The `processed_stage` must be the result of the getQueryProcessingStage() function.
*
* context contains settings for one query.
* Usually Storage does not care about these settings, since they are used in the interpreter.
@ -181,7 +184,7 @@ public:
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned /*num_streams*/)
{
@ -344,6 +347,20 @@ protected:
using ITableDeclaration::ITableDeclaration;
using std::enable_shared_from_this<IStorage>::shared_from_this;
void checkQueryProcessingStage(QueryProcessingStage::Enum processed_stage, const Context & context)
{
auto expected_stage = getQueryProcessingStage(context);
checkQueryProcessingStage(processed_stage, expected_stage);
}
void checkQueryProcessingStage(QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum expected_stage)
{
if (processed_stage != expected_stage)
throw Exception("Unexpected query processing stage for storage " + getName() +
": expected " + QueryProcessingStage::toString(expected_stage) +
", got " + QueryProcessingStage::toString(processed_stage), ErrorCodes::LOGICAL_ERROR);
}
private:
friend class TableStructureReadLock;
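
A toy, self-contained model of the new contract (toy enum and class, not the real IStorage): the caller first asks the storage at which stage it will return data, then passes that exact value back into read(), where it can be verified the way checkQueryProcessingStage() does above. The test changes later in this diff follow the same pattern.

#include <iostream>
#include <stdexcept>
#include <string>

enum class Stage { FetchColumns, WithMergeableState, Complete };

static std::string toString(Stage stage)
{
    switch (stage)
    {
        case Stage::FetchColumns:       return "FetchColumns";
        case Stage::WithMergeableState: return "WithMergeableState";
        case Stage::Complete:           return "Complete";
    }
    return "Unknown";
}

struct ToyStorage
{
    // Analogue of getQueryProcessingStage(): an ordinary storage reports FetchColumns;
    // a distributed-like storage could report WithMergeableState instead.
    Stage getQueryProcessingStage() const { return Stage::FetchColumns; }

    // read() no longer reports the stage through an out-parameter; it expects the caller
    // to pass in exactly the stage it advertised, and may verify it.
    void read(Stage processed_stage) const
    {
        if (processed_stage != getQueryProcessingStage())
            throw std::logic_error("Unexpected query processing stage: expected "
                + toString(getQueryProcessingStage()) + ", got " + toString(processed_stage));
        std::cout << "reading at stage " << toString(processed_stage) << '\n';
    }
};

int main()
{
    ToyStorage table;
    Stage stage = table.getQueryProcessingStage();  // ask first...
    table.read(stage);                              // ...then pass the same value back into read()
}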

View File

@ -269,12 +269,12 @@ BlockInputStreams StorageKafka::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
checkQueryProcessingStage(processed_stage, context);
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
if (num_consumers == 0)
return BlockInputStreams();

View File

@ -39,7 +39,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -6,6 +6,7 @@
#include <Columns/ColumnArray.h>
#include <Common/typeid_cast.h>
#include <ext/range.h>
#include <DataTypes/DataTypeNothing.h>
namespace DB
@ -20,8 +21,7 @@ namespace ErrorCodes
MergeTreeBaseBlockInputStream::MergeTreeBaseBlockInputStream(
MergeTreeData & storage,
const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column_name,
const PrewhereInfoPtr & prewhere_info,
UInt64 max_block_size_rows,
UInt64 preferred_block_size_bytes,
UInt64 preferred_max_column_in_block_size_bytes,
@ -32,8 +32,7 @@ MergeTreeBaseBlockInputStream::MergeTreeBaseBlockInputStream(
const Names & virt_column_names)
:
storage(storage),
prewhere_actions(prewhere_actions),
prewhere_column_name(prewhere_column_name),
prewhere_info(prewhere_info),
max_block_size_rows(max_block_size_rows),
preferred_block_size_bytes(preferred_block_size_bytes),
preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes),
@ -117,20 +116,20 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
if (!task->range_reader.isInitialized())
{
if (prewhere_actions)
if (prewhere_info)
{
if (reader->getColumns().empty())
{
task->range_reader = MergeTreeRangeReader(
pre_reader.get(), index_granularity, nullptr, prewhere_actions,
&prewhere_column_name, &task->ordered_names,
pre_reader.get(), index_granularity, nullptr, prewhere_info->prewhere_actions,
&prewhere_info->prewhere_column_name, &task->ordered_names,
task->should_reorder, task->remove_prewhere_column, true);
}
else
{
task->pre_range_reader = MergeTreeRangeReader(
pre_reader.get(), index_granularity, nullptr, prewhere_actions,
&prewhere_column_name, &task->ordered_names,
pre_reader.get(), index_granularity, nullptr, prewhere_info->prewhere_actions,
&prewhere_info->prewhere_column_name, &task->ordered_names,
task->should_reorder, task->remove_prewhere_column, false);
task->range_reader = MergeTreeRangeReader(
@ -141,7 +140,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
else
{
task->range_reader = MergeTreeRangeReader(
reader.get(), index_granularity, nullptr, prewhere_actions,
reader.get(), index_granularity, nullptr, nullptr,
nullptr, &task->ordered_names, task->should_reorder, false, true);
}
}
@ -167,10 +166,10 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
task->size_predictor->update(read_result.block);
}
if (read_result.block && prewhere_actions && !task->remove_prewhere_column)
if (read_result.block && prewhere_info && !task->remove_prewhere_column)
{
/// Convert const column to full here because it's cheaper to filter const column than full.
auto & column = read_result.block.getByName(prewhere_column_name);
auto & column = read_result.block.getByName(prewhere_info->prewhere_column_name);
column.column = column.column->convertToFullColumnIfConst();
}
@ -215,6 +214,20 @@ void MergeTreeBaseBlockInputStream::injectVirtualColumns(Block & block) const
}
void MergeTreeBaseBlockInputStream::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info)
{
if (prewhere_info)
{
prewhere_info->prewhere_actions->execute(block);
if (prewhere_info->remove_prewhere_column)
block.erase(prewhere_info->prewhere_column_name);
if (!block)
block.insert({nullptr, std::make_shared<DataTypeNothing>(), "_nothing"});
}
}
MergeTreeBaseBlockInputStream::~MergeTreeBaseBlockInputStream() = default;
}

View File

@ -3,6 +3,7 @@
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
@ -18,8 +19,7 @@ class MergeTreeBaseBlockInputStream : public IProfilingBlockInputStream
public:
MergeTreeBaseBlockInputStream(
MergeTreeData & storage,
const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column,
const PrewhereInfoPtr & prewhere_info,
UInt64 max_block_size_rows,
UInt64 preferred_block_size_bytes,
UInt64 preferred_max_column_in_block_size_bytes,
@ -31,8 +31,10 @@ public:
~MergeTreeBaseBlockInputStream() override;
static void executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info);
protected:
Block readImpl() override final;
Block readImpl() final;
/// Creates a new this->task and initializes the readers
virtual bool getNewTask() = 0;
@ -47,8 +49,7 @@ protected:
protected:
MergeTreeData & storage;
ExpressionActionsPtr prewhere_actions;
String prewhere_column_name;
PrewhereInfoPtr prewhere_info;
UInt64 max_block_size_rows;
UInt64 preferred_block_size_bytes;

View File

@ -24,8 +24,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
Names column_names,
const MarkRanges & mark_ranges_,
bool use_uncompressed_cache_,
ExpressionActionsPtr prewhere_actions_,
String prewhere_column_,
const PrewhereInfoPtr & prewhere_info,
bool check_columns,
size_t min_bytes_to_use_direct_io_,
size_t max_read_buffer_size_,
@ -34,10 +33,10 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
size_t part_index_in_query_,
bool quiet)
:
MergeTreeBaseBlockInputStream{storage_, prewhere_actions_, prewhere_column_, max_block_size_rows_,
MergeTreeBaseBlockInputStream{storage_, prewhere_info, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names},
ordered_names{column_names},
required_columns{column_names},
data_part{owned_data_part_},
part_columns_lock(data_part->columns_lock),
all_mark_ranges(mark_ranges_),
@ -61,7 +60,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
addTotalRowsApprox(total_rows);
header = storage.getSampleBlockForColumns(ordered_names);
header = storage.getSampleBlockForColumns(required_columns);
/// Types may be different during ALTER (when this stream is used to perform an ALTER).
/// NOTE: We may use similar code to implement non blocking ALTERs.
@ -79,6 +78,9 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
}
injectVirtualColumns(header);
executePrewhereActions(header, prewhere_info);
ordered_names = getHeader().getNames();
}
@ -99,15 +101,15 @@ try
}
is_first_task = false;
Names pre_column_names, column_names = ordered_names;
bool remove_prewhere_column = false;
Names pre_column_names;
Names column_names = required_columns;
/// inject columns required for defaults evaluation
bool should_reorder = !injectRequiredColumns(storage, data_part, column_names).empty();
if (prewhere_actions)
if (prewhere_info)
{
pre_column_names = prewhere_actions->getRequiredColumns();
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
if (pre_column_names.empty())
pre_column_names.push_back(column_names[0]);
@ -117,9 +119,6 @@ try
should_reorder = true;
const NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
/// If the expression in PREWHERE is not a column of the table, you do not need to output a column with it
/// (from storage expect to receive only the columns of the table).
remove_prewhere_column = !pre_name_set.count(prewhere_column_name);
Names post_column_names;
for (const auto & name : column_names)
@ -159,9 +158,9 @@ try
auto size_predictor = (preferred_block_size_bytes == 0) ? nullptr
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, data_part->storage.getSampleBlock());
task = std::make_unique<MergeTreeReadTask>(data_part, remaining_mark_ranges, part_index_in_query, ordered_names,
column_name_set, columns, pre_columns, remove_prewhere_column, should_reorder,
std::move(size_predictor));
task = std::make_unique<MergeTreeReadTask>(
data_part, remaining_mark_ranges, part_index_in_query, ordered_names, column_name_set, columns, pre_columns,
prewhere_info && prewhere_info->remove_prewhere_column, should_reorder, std::move(size_predictor));
if (!reader)
{
@ -175,7 +174,7 @@ try
owned_mark_cache.get(), save_marks_in_cache, storage,
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
if (prewhere_actions)
if (prewhere_info)
pre_reader = std::make_unique<MergeTreeReader>(
path, data_part, pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), save_marks_in_cache, storage,

View File

@ -4,6 +4,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
@ -24,8 +25,7 @@ public:
Names column_names,
const MarkRanges & mark_ranges,
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
String prewhere_column,
const PrewhereInfoPtr & prewhere_info,
bool check_columns,
size_t min_bytes_to_use_direct_io,
size_t max_read_buffer_size,
@ -51,6 +51,8 @@ private:
Block header;
/// Used by Task
Names required_columns;
/// Names from the header. Used to order the columns in read blocks.
Names ordered_names;
NameSet column_name_set;
NamesAndTypesList columns;

View File

@ -1208,7 +1208,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
MarkRanges ranges{MarkRange(0, part->marks_count)};
BlockInputStreamPtr part_in = std::make_shared<MergeTreeBlockInputStream>(
*this, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, expression->getRequiredColumns(), ranges,
false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
false, nullptr, false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
auto compression_settings = this->context.chooseCompressionSettings(
part->bytes_on_disk,

View File

@ -572,6 +572,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes()) is locked after part->columns_lock
/// (which is locked in shared mode when input streams are created) and when inserting new data
/// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus
/// deadlock is impossible.
auto compression_settings = data.context.chooseCompressionSettings(
merge_entry->total_size_bytes_compressed,
static_cast<double> (merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
String rows_sources_file_path;
std::unique_ptr<WriteBuffer> rows_sources_uncompressed_write_buf;
std::unique_ptr<WriteBuffer> rows_sources_write_buf;
@ -603,7 +612,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
{
auto input = std::make_unique<MergeTreeBlockInputStream>(
data, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, merging_column_names, MarkRanges(1, MarkRange(0, part->marks_count)),
false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
false, nullptr, true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback(MergeProgressCallback(
merge_entry, sum_input_rows_upper_bound, column_sizes, watch_prev_elapsed, merge_alg));
@ -674,10 +683,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
if (deduplicate)
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, SizeLimits(), 0 /*limit_hint*/, Names());
auto compression_settings = data.context.chooseCompressionSettings(
merge_entry->total_size_bytes_compressed,
static_cast<double> (merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
MergedBlockOutputStream to{
data, new_part_tmp_path, merging_columns, compression_settings, merged_column_to_size, aio_threshold};
@ -747,7 +752,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
{
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->marks_count)},
false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true);
false, nullptr, true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true);
column_part_stream->setProgressCallback(MergeProgressCallbackVerticalStep(
merge_entry, sum_input_rows_exact, column_sizes, column_name, watch_prev_elapsed));
@ -969,6 +974,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
String new_part_tmp_path = new_data_part->getFullPath();
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes()) is locked after part->columns_lock
/// (which is locked in shared mode when input streams are created) and when inserting new data
/// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus
/// deadlock is impossible.
auto compression_settings = context.chooseCompressionSettings(
source_part->bytes_on_disk,
static_cast<double>(source_part->bytes_on_disk) / data.getTotalActiveSizeInBytes());
auto in = createInputStreamWithMutatedData(storage_from_source_part, commands, context_for_reading);
if (data.hasPrimaryKey())
@ -979,10 +993,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
auto compression_settings = context.chooseCompressionSettings(
source_part->bytes_on_disk,
static_cast<double>(source_part->bytes_on_disk) / data.getTotalActiveSizeInBytes());
MergedBlockOutputStream out(data, new_part_tmp_path, all_columns, compression_settings);
MergeTreeDataPart::MinMaxIndex minmax_idx;

View File

@ -135,13 +135,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
const Names & column_names_to_return,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned num_streams,
Int64 max_block_number_to_read) const
{
return readFromParts(
data.getDataPartsVector(), column_names_to_return, query_info, context, processed_stage,
data.getDataPartsVector(), column_names_to_return, query_info, context,
max_block_size, num_streams, max_block_number_to_read);
}
@ -150,7 +149,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
const Names & column_names_to_return,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned num_streams,
Int64 max_block_number_to_read) const
@ -207,7 +205,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
std::multiset<String> part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
data.check(real_column_names);
processed_stage = QueryProcessingStage::FetchColumns;
const Settings & settings = context.getSettingsRef();
Names primary_sort_columns = data.getPrimarySortColumns();
@ -510,23 +507,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
LOG_DEBUG(log, "MinMax index condition: " << minmax_idx_condition->toString());
/// PREWHERE
ExpressionActionsPtr prewhere_actions;
String prewhere_column;
if (select.prewhere_expression)
{
ExpressionAnalyzer analyzer(select.prewhere_expression, context, nullptr, available_real_columns);
prewhere_actions = analyzer.getActions(false);
prewhere_column = select.prewhere_expression->getColumnName();
SubqueriesForSets prewhere_subqueries = analyzer.getSubqueriesForSets();
/** Compute the subqueries right now.
* NOTE Disadvantage - these calculations do not fit into the query execution pipeline.
* They are done before the execution of the pipeline; they can not be interrupted; during the computation, packets of progress are not sent.
*/
if (!prewhere_subqueries.empty())
CreatingSetsBlockInputStream(std::make_shared<NullBlockInputStream>(Block()), prewhere_subqueries,
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode)).read();
}
RangesInDataParts parts_with_ranges;
@ -583,8 +566,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
column_names_to_read,
max_block_size,
settings.use_uncompressed_cache,
prewhere_actions,
prewhere_column,
query_info.prewhere_info,
virt_column_names,
settings);
}
@ -596,8 +578,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
column_names_to_read,
max_block_size,
settings.use_uncompressed_cache,
prewhere_actions,
prewhere_column,
query_info.prewhere_info,
virt_column_names,
settings);
}
@ -622,8 +603,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
const Names & column_names,
size_t max_block_size,
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column,
const PrewhereInfoPtr & prewhere_info,
const Names & virt_columns,
const Settings & settings) const
{
@ -658,7 +638,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size());
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
num_streams, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_actions, prewhere_column, true,
num_streams, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_info, true,
column_names, MergeTreeReadPool::BackoffSettings(settings), settings.preferred_block_size_bytes, false);
/// Let's estimate total number of rows for progress bar.
@ -670,7 +650,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
res.emplace_back(std::make_shared<MergeTreeThreadBlockInputStream>(
i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, data, use_uncompressed_cache,
prewhere_actions, prewhere_column, settings, virt_columns));
prewhere_info, settings, virt_columns));
if (i == 0)
{
@ -744,7 +724,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
use_uncompressed_cache, prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io,
use_uncompressed_cache, prewhere_info, true, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query);
res.push_back(source_stream);
@ -763,8 +743,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
const Names & column_names,
size_t max_block_size,
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column,
const PrewhereInfoPtr & prewhere_info,
const Names & virt_columns,
const Settings & settings) const
{
@ -790,7 +769,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
prewhere_info, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
virt_columns, part.part_index_in_query);
to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.getPrimaryExpression()));

View File

@ -26,7 +26,6 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams,
Int64 max_block_number_to_read) const;
@ -36,7 +35,6 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams,
Int64 max_block_number_to_read) const;
@ -52,8 +50,7 @@ private:
const Names & column_names,
size_t max_block_size,
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column,
const PrewhereInfoPtr & prewhere_info,
const Names & virt_columns,
const Settings & settings) const;
@ -62,8 +59,7 @@ private:
const Names & column_names,
size_t max_block_size,
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column,
const PrewhereInfoPtr & prewhere_info,
const Names & virt_columns,
const Settings & settings) const;

View File

@ -1,10 +1,12 @@
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Columns/FilterDescription.h>
#include <ext/range.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnNothing.h>
#include <ext/range.h>
#if __SSE2__
#include <emmintrin.h>
#include <DataTypes/DataTypeNothing.h>
#endif
namespace DB
@ -436,7 +438,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
merge_tree_reader->evaluateMissingDefaults(read_result.block);
if (should_reorder || always_reorder || block.columns())
merge_tree_reader->reorderColumns(read_result.block, *ordered_names);
merge_tree_reader->reorderColumns(read_result.block, *ordered_names, prewhere_column_name);
}
}
else
@ -452,7 +454,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
merge_tree_reader->evaluateMissingDefaults(read_result.block);
if (should_reorder || always_reorder)
merge_tree_reader->reorderColumns(read_result.block, *ordered_names);
merge_tree_reader->reorderColumns(read_result.block, *ordered_names, prewhere_column_name);
}
}
@ -611,23 +613,25 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
if (!result.block)
return;
auto getNumRows = [&]()
{
/// If block has single column, it's filter. We need to count bytes in it in order to get the number of rows.
if (result.block.columns() > 1)
return result.block.rows();
else if (result.getFilter())
return countBytesInFilter(result.getFilter()->getData());
else
return prev_rows;
};
if (remove_prewhere_column)
result.block.erase(*prewhere_column_name);
else
{
/// Calculate the number of rows in block in order to create const column.
size_t rows = result.block.rows();
/// If block has single column, it's filter. We need to count bytes in it in order to get the number of rows.
if (result.block.columns() == 1)
{
if (result.getFilter())
rows = countBytesInFilter(result.getFilter()->getData());
else
rows = prev_rows;
}
prewhere_column.column = prewhere_column.type->createColumnConst(getNumRows(), UInt64(1));
prewhere_column.column = prewhere_column.type->createColumnConst(rows, UInt64(1));
}
/// If block is empty, create column in order to store rows number.
if (last_reader_in_chain && result.block.columns() == 0)
result.block.insert({ColumnNothing::create(getNumRows()), std::make_shared<DataTypeNothing>(), "_nothing"});
}
}
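For context on the row-counting logic in the hunk above: when the block carries only the PREWHERE filter column, the number of surviving rows equals the number of non-zero bytes in that filter. The following is a minimal standalone sketch of that idea in plain C++ (illustrative names only, not the actual IColumn/countBytesInFilter API):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Illustrative stand-in for a filter column: one byte per input row,
// non-zero means the row passed the PREWHERE condition.
static size_t countBytesInFilterSketch(const std::vector<uint8_t> & filter)
{
    size_t rows = 0;
    for (uint8_t flag : filter)
        rows += flag != 0;
    return rows;
}

int main()
{
    std::vector<uint8_t> filter{1, 0, 1, 1, 0};              // 5 input rows, 3 pass
    std::cout << countBytesInFilterSketch(filter) << "\n";   // prints 3
}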

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <ext/range.h>
#include <Storages/MergeTree/MergeTreeBaseBlockInputStream.h>
namespace ProfileEvents
@ -15,19 +16,20 @@ namespace DB
MergeTreeReadPool::MergeTreeReadPool(
const size_t threads, const size_t sum_marks, const size_t min_marks_for_concurrent_read,
RangesInDataParts parts, MergeTreeData & data, const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column_name, const bool check_columns, const Names & column_names,
RangesInDataParts parts, MergeTreeData & data, const PrewhereInfoPtr & prewhere_info,
const bool check_columns, const Names & column_names,
const BackoffSettings & backoff_settings, size_t preferred_block_size_bytes,
const bool do_not_steal_tasks)
: backoff_settings{backoff_settings}, backoff_state{threads}, data{data},
column_names{column_names}, do_not_steal_tasks{do_not_steal_tasks}, predict_block_size_bytes{preferred_block_size_bytes > 0}
column_names{column_names}, do_not_steal_tasks{do_not_steal_tasks},
predict_block_size_bytes{preferred_block_size_bytes > 0}, prewhere_info{prewhere_info}
{
const auto per_part_sum_marks = fillPerPartInfo(parts, prewhere_actions, prewhere_column_name, check_columns);
const auto per_part_sum_marks = fillPerPartInfo(parts, prewhere_info, check_columns);
fillPerThreadInfo(threads, sum_marks, per_part_sum_marks, parts, min_marks_for_concurrent_read);
}
MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, const size_t thread)
MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, const size_t thread, const Names & ordered_names)
{
const std::lock_guard<std::mutex> lock{mutex};
@ -111,9 +113,9 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
: std::make_unique<MergeTreeBlockSizePredictor>(*per_part_size_predictor[part_idx]); /// make a copy
return std::make_unique<MergeTreeReadTask>(
part.data_part, ranges_to_get_from_part, part.part_index_in_query, column_names,
part.data_part, ranges_to_get_from_part, part.part_index_in_query, ordered_names,
per_part_column_name_set[part_idx], per_part_columns[part_idx], per_part_pre_columns[part_idx],
per_part_remove_prewhere_column[part_idx], per_part_should_reorder[part_idx], std::move(curr_task_size_predictor));
prewhere_info && prewhere_info->remove_prewhere_column, per_part_should_reorder[part_idx], std::move(curr_task_size_predictor));
}
@ -122,7 +124,6 @@ Block MergeTreeReadPool::getHeader() const
return data.getSampleBlockForColumns(column_names);
}
void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInfo info)
{
if (backoff_settings.min_read_latency_ms == 0 || do_not_steal_tasks)
@ -165,8 +166,7 @@ void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInf
std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
RangesInDataParts & parts, const ExpressionActionsPtr & prewhere_actions, const String & prewhere_column_name,
const bool check_columns)
RangesInDataParts & parts, const PrewhereInfoPtr & prewhere_info, const bool check_columns)
{
std::vector<size_t> per_part_sum_marks;
Block sample_block = data.getSampleBlock();
@ -193,10 +193,10 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
Names required_pre_column_names;
if (prewhere_actions)
if (prewhere_info)
{
/// collect columns required for PREWHERE evaluation
required_pre_column_names = prewhere_actions->getRequiredColumns();
required_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
/// there must be at least one column required for PREWHERE
if (required_pre_column_names.empty())
@ -208,13 +208,7 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
should_reoder = true;
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const NameSet pre_name_set{
std::begin(required_pre_column_names), std::end(required_pre_column_names)
};
/** If expression in PREWHERE is not table column, then no need to return column with it to caller
* (because storage is expected only to read table columns).
*/
per_part_remove_prewhere_column.push_back(0 == pre_name_set.count(prewhere_column_name));
const NameSet pre_name_set(required_pre_column_names.begin(), required_pre_column_names.end());
Names post_column_names;
for (const auto & name : required_column_names)
@ -223,8 +217,6 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
required_column_names = post_column_names;
}
else
per_part_remove_prewhere_column.push_back(false);
per_part_column_name_set.emplace_back(std::begin(required_column_names), std::end(required_column_names));
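The hunk above splits the requested columns into those needed for the PREWHERE step and the remaining "post" columns read afterwards. As an independent sketch of that split using plain standard containers and hypothetical column names (not the actual MergeTreeReadPool code):

#include <iostream>
#include <set>
#include <string>
#include <vector>

int main()
{
    // Columns the query asks for, and columns the PREWHERE expression needs.
    std::vector<std::string> required_column_names{"id", "value", "flag"};
    std::vector<std::string> required_pre_column_names{"flag"};

    // Same idea as pre_name_set above: used to tell PREWHERE columns apart.
    std::set<std::string> pre_name_set(required_pre_column_names.begin(),
                                       required_pre_column_names.end());

    // Everything not consumed by PREWHERE is read in the main (post) step.
    std::vector<std::string> post_column_names;
    for (const auto & name : required_column_names)
        if (!pre_name_set.count(name))
            post_column_names.push_back(name);

    for (const auto & name : post_column_names)
        std::cout << name << "\n";   // id, value
}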

View File

@ -3,6 +3,7 @@
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/SelectQueryInfo.h>
#include <mutex>
@ -66,12 +67,12 @@ private:
public:
MergeTreeReadPool(
const size_t threads, const size_t sum_marks, const size_t min_marks_for_concurrent_read,
RangesInDataParts parts, MergeTreeData & data, const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column_name, const bool check_columns, const Names & column_names,
RangesInDataParts parts, MergeTreeData & data, const PrewhereInfoPtr & prewhere_info,
const bool check_columns, const Names & column_names,
const BackoffSettings & backoff_settings, size_t preferred_block_size_bytes,
const bool do_not_steal_tasks = false);
MergeTreeReadTaskPtr getTask(const size_t min_marks_to_read, const size_t thread);
MergeTreeReadTaskPtr getTask(const size_t min_marks_to_read, const size_t thread, const Names & ordered_names);
/** Each worker can call this method to report its read performance.
* If read performance is too low, the pool may decide to lower the number of threads: it stops assigning new tasks to some of them.
@ -83,8 +84,7 @@ public:
private:
std::vector<size_t> fillPerPartInfo(
RangesInDataParts & parts, const ExpressionActionsPtr & prewhere_actions, const String & prewhere_column_name,
const bool check_columns);
RangesInDataParts & parts, const PrewhereInfoPtr & prewhere_info, const bool check_columns);
void fillPerThreadInfo(
const size_t threads, const size_t sum_marks, std::vector<size_t> per_part_sum_marks,
@ -93,15 +93,15 @@ private:
std::vector<std::shared_lock<std::shared_mutex>> per_part_columns_lock;
MergeTreeData & data;
Names column_names;
Names ordered_names;
bool do_not_steal_tasks;
bool predict_block_size_bytes;
std::vector<NameSet> per_part_column_name_set;
std::vector<NamesAndTypesList> per_part_columns;
std::vector<NamesAndTypesList> per_part_pre_columns;
/// @todo actually all of these values are either true or false for the whole query, thus no vector required
std::vector<char> per_part_remove_prewhere_column;
std::vector<char> per_part_should_reorder;
std::vector<MergeTreeBlockSizePredictorPtr> per_part_size_predictor;
PrewhereInfoPtr prewhere_info;
struct Part
{

View File

@ -542,7 +542,7 @@ void MergeTreeReader::fillMissingColumns(Block & res, bool & should_reorder, boo
}
}
void MergeTreeReader::reorderColumns(Block & res, const Names & ordered_names)
void MergeTreeReader::reorderColumns(Block & res, const Names & ordered_names, const String * filter_name)
{
try
{
@ -552,6 +552,9 @@ void MergeTreeReader::reorderColumns(Block & res, const Names & ordered_names)
if (res.has(name))
ordered_block.insert(res.getByName(name));
if (filter_name && !ordered_block.has(*filter_name) && res.has(*filter_name))
ordered_block.insert(res.getByName(*filter_name));
std::swap(res, ordered_block);
}
catch (Exception & e)

View File

@ -45,7 +45,8 @@ public:
/// If at least one column was added, reorders all columns in the block according to ordered_names.
void fillMissingColumns(Block & res, bool & should_reorder, bool & should_evaluate_missing_defaults);
/// Sort columns to ensure consistent order among all blocks.
void reorderColumns(Block & res, const Names & ordered_names);
/// If filter_name is not nullptr and the block has the filter column, move it to the end of the block.
void reorderColumns(Block & res, const Names & ordered_names, const String * filter_name);
/// Evaluate defaulted columns if necessary.
void evaluateMissingDefaults(Block & res);
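A rough illustration of the reorder semantics described above, using plain containers instead of Block (a hypothetical sketch, not the MergeTreeReader API): columns are emitted in the order given by ordered_names, and if a filter column name is supplied, exists in the block, and is not already listed, it is appended at the end.

#include <algorithm>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Reorder "columns" (here just names mapped to placeholder data) by ordered_names;
// append the filter column last if it is present but not among ordered_names.
static std::vector<std::string> reorderSketch(
    const std::map<std::string, int> & block,
    const std::vector<std::string> & ordered_names,
    const std::string * filter_name)
{
    std::vector<std::string> ordered;
    for (const auto & name : ordered_names)
        if (block.count(name))
            ordered.push_back(name);

    if (filter_name && block.count(*filter_name)
        && std::find(ordered.begin(), ordered.end(), *filter_name) == ordered.end())
        ordered.push_back(*filter_name);

    return ordered;
}

int main()
{
    std::map<std::string, int> block{{"a", 0}, {"b", 0}, {"_prewhere", 0}};
    std::vector<std::string> ordered_names{"b", "a"};
    std::string filter = "_prewhere";

    for (const auto & name : reorderSketch(block, ordered_names, &filter))
        std::cout << name << "\n";   // b, a, _prewhere
}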

View File

@ -16,12 +16,11 @@ MergeTreeThreadBlockInputStream::MergeTreeThreadBlockInputStream(
size_t preferred_max_column_in_block_size_bytes,
MergeTreeData & storage,
const bool use_uncompressed_cache,
const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column,
const PrewhereInfoPtr & prewhere_info,
const Settings & settings,
const Names & virt_column_names)
:
MergeTreeBaseBlockInputStream{storage, prewhere_actions, prewhere_column, max_block_size_rows,
MergeTreeBaseBlockInputStream{storage, prewhere_info, max_block_size_rows,
preferred_block_size_bytes, preferred_max_column_in_block_size_bytes, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, use_uncompressed_cache, true, virt_column_names},
thread{thread},
@ -35,6 +34,8 @@ MergeTreeThreadBlockInputStream::MergeTreeThreadBlockInputStream(
}
else
min_marks_to_read = min_marks_to_read_;
ordered_names = getHeader().getNames();
}
@ -42,6 +43,7 @@ Block MergeTreeThreadBlockInputStream::getHeader() const
{
auto res = pool->getHeader();
injectVirtualColumns(res);
executePrewhereActions(res, prewhere_info);
return res;
}
@ -49,7 +51,7 @@ Block MergeTreeThreadBlockInputStream::getHeader() const
/// Requests read task from MergeTreeReadPool and signals whether it got one
bool MergeTreeThreadBlockInputStream::getNewTask()
{
task = pool->getTask(min_marks_to_read, thread);
task = pool->getTask(min_marks_to_read, thread, ordered_names);
if (!task)
{
@ -78,7 +80,7 @@ bool MergeTreeThreadBlockInputStream::getNewTask()
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback);
if (prewhere_actions)
if (prewhere_info)
pre_reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
storage, task->mark_ranges, min_bytes_to_use_direct_io,
@ -92,7 +94,7 @@ bool MergeTreeThreadBlockInputStream::getNewTask()
storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size,
reader->getAvgValueSizeHints(), profile_callback);
if (prewhere_actions)
if (prewhere_info)
pre_reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
storage, task->mark_ranges, min_bytes_to_use_direct_io,

View File

@ -23,8 +23,7 @@ public:
size_t preferred_max_column_in_block_size_bytes,
MergeTreeData & storage,
const bool use_uncompressed_cache,
const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column,
const PrewhereInfoPtr & prewhere_info,
const Settings & settings,
const Names & virt_column_names);
@ -38,11 +37,15 @@ protected:
/// Requests read task from MergeTreeReadPool and signals whether it got one
bool getNewTask() override;
private:
/// "thread" index (there are N threads and each thread is assigned index in interval [0..N-1])
size_t thread;
std::shared_ptr<MergeTreeReadPool> pool;
size_t min_marks_to_read;
/// Names from the header. Used to order the columns in read blocks.
Names ordered_names;
};
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <thread>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
#include <Common/ZooKeeper/Types.h>
#include <Core/Types.h>
#include <common/logger_useful.h>

View File

@ -4,10 +4,9 @@
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <common/logger_useful.h>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
#include <thread>
#include <map>
#include <pcg_random.hpp>

View File

@ -10,7 +10,7 @@
#include <Poco/Event.h>
#include <Core/Types.h>
#include <common/logger_useful.h>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
namespace DB
{

View File

@ -10,7 +10,7 @@
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
namespace DB

View File

@ -2,7 +2,7 @@
#include <Poco/Event.h>
#include <common/logger_useful.h>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
#include <Core/Types.h>
#include <thread>
#include <atomic>

View File

@ -22,12 +22,12 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams) override
{
return MergeTreeDataSelectExecutor(part->storage).readFromParts(
{part}, column_names, query_info, context, processed_stage, max_block_size, num_streams, 0);
{part}, column_names, query_info, context, max_block_size, num_streams, 0);
}
protected:

View File

@ -10,12 +10,29 @@ namespace DB
class IAST;
using ASTPtr = std::shared_ptr<IAST>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class Set;
using SetPtr = std::shared_ptr<Set>;
/// Information about calculated sets in right hand side of IN.
using PreparedSets = std::unordered_map<StringRange, SetPtr, StringRangePointersHash, StringRangePointersEqualTo>;
struct PrewhereInfo
{
/// Actions executed on the block to obtain the filter column for the PREWHERE step.
ExpressionActionsPtr prewhere_actions;
String prewhere_column_name;
bool remove_prewhere_column = false;
PrewhereInfo() = default;
explicit PrewhereInfo(ExpressionActionsPtr prewhere_actions_, String prewhere_column_name_)
: prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {}
};
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
/** Query along with some additional data,
* that can be used during query processing
@ -25,6 +42,8 @@ struct SelectQueryInfo
{
ASTPtr query;
PrewhereInfoPtr prewhere_info;
/// Prepared sets are used for indices by storage engine.
/// Example: x IN (1, 2, 3)
PreparedSets sets;
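The PrewhereInfo struct above is the core of this refactoring: instead of threading a separate ExpressionActionsPtr and column name through every reader, a single optional PrewhereInfoPtr is passed, and a null pointer means "no PREWHERE step". Below is a self-contained mock of that pattern with invented names, not the real ClickHouse types (the actual object also carries the prewhere_actions to execute):

#include <iostream>
#include <memory>
#include <string>

// Minimal stand-in: one optional object bundles what the reading code
// needs for the PREWHERE step.
struct PrewhereInfoSketch
{
    std::string prewhere_column_name;
    bool remove_prewhere_column = false;
};

using PrewhereInfoSketchPtr = std::shared_ptr<PrewhereInfoSketch>;

static void readSketch(const PrewhereInfoSketchPtr & prewhere_info)
{
    if (!prewhere_info)
    {
        std::cout << "plain read, no PREWHERE step\n";
        return;
    }

    std::cout << "filter by column " << prewhere_info->prewhere_column_name;
    if (prewhere_info->remove_prewhere_column)
        std::cout << " (dropped from the result)";
    std::cout << "\n";
}

int main()
{
    readSketch(nullptr);                          // no PREWHERE
    auto info = std::make_shared<PrewhereInfoSketch>();
    info->prewhere_column_name = "flag";
    info->remove_prewhere_column = true;
    readSketch(info);                             // PREWHERE on "flag"
}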

View File

@ -103,15 +103,30 @@ private:
};
QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context) const
{
if (!no_destination)
{
auto destination = context.getTable(destination_database, destination_table);
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
return destination->getQueryProcessingStage(context);
}
return QueryProcessingStage::FetchColumns;
}
BlockInputStreams StorageBuffer::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
processed_stage = QueryProcessingStage::FetchColumns;
checkQueryProcessingStage(processed_stage, context);
BlockInputStreams streams_from_dst;

View File

@ -53,11 +53,13 @@ public:
std::string getName() const override { return "Buffer"; }
std::string getTableName() const override { return name; }
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -57,10 +57,9 @@ public:
reader->readSuffix();
}
Block getHeader() const override { return sample_block; }
Block getHeader() const override { return reader->getHeader(); }
private:
Block sample_block;
std::unique_ptr<ReadBufferFromFileDescriptor> read_buf;
BlockInputStreamPtr reader;
std::string file_name;
@ -78,8 +77,8 @@ static std::string resolvePath(const boost::filesystem::path & base_path, std::s
{
boost::filesystem::path resolved_path(path);
if (!resolved_path.is_absolute())
return (base_path / resolved_path).string();
return resolved_path.string();
return boost::filesystem::canonical(resolved_path, base_path).string();
return boost::filesystem::canonical(resolved_path).string();
}
static void checkCreationIsAllowed(const String & base_path, const String & path)
@ -262,10 +261,12 @@ void StorageCatBoostPool::createSampleBlockAndColumns()
BlockInputStreams StorageCatBoostPool::read(const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum & /*processed_stage*/,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned /*threads*/)
{
checkQueryProcessingStage(processed_stage, context);
auto stream = std::make_shared<CatBoostDatasetBlockInputStream>(
data_description_file_name, "TSV", sample_block, context, max_block_size);

View File

@ -17,7 +17,7 @@ public:
BlockInputStreams read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned threads) override;

View File

@ -38,11 +38,11 @@ BlockInputStreams StorageDictionary::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
const size_t max_block_size,
const unsigned /*threads*/)
{
processed_stage = QueryProcessingStage::FetchColumns;
checkQueryProcessingStage(processed_stage, context);
auto dictionary = context.getExternalDictionaries().getDictionary(dictionary_name);
return BlockInputStreams{dictionary->getBlockInputStream(column_names, max_block_size)};
}

View File

@ -27,7 +27,7 @@ public:
BlockInputStreams read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;

View File

@ -220,17 +220,14 @@ StoragePtr StorageDistributed::createWithOwnCluster(
return res;
}
BlockInputStreams StorageDistributed::read(
const Names & /*column_names*/,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context) const
{
auto cluster = getCluster();
return getQueryProcessingStage(context, cluster);
}
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context, const ClusterPtr & cluster) const
{
const Settings & settings = context.getSettingsRef();
size_t num_local_shards = cluster->getLocalShardCount();
@ -238,11 +235,24 @@ BlockInputStreams StorageDistributed::read(
size_t result_size = (num_remote_shards * settings.max_parallel_replicas) + num_local_shards;
if (settings.distributed_group_by_no_merge)
processed_stage = QueryProcessingStage::Complete;
return QueryProcessingStage::Complete;
else /// Normal mode.
processed_stage = result_size == 1
? QueryProcessingStage::Complete
: QueryProcessingStage::WithMergeableState;
return result_size == 1 ? QueryProcessingStage::Complete
: QueryProcessingStage::WithMergeableState;
}
BlockInputStreams StorageDistributed::read(
const Names & /*column_names*/,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
auto cluster = getCluster();
checkQueryProcessingStage(processed_stage, getQueryProcessingStage(context, cluster));
const Settings & settings = context.getSettingsRef();
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table, remote_table_function_ptr);

View File

@ -60,11 +60,14 @@ public:
bool isRemote() const override { return true; }
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ClusterPtr & cluster) const;
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -190,10 +190,11 @@ BlockInputStreams StorageFile::read(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum & /*processed_stage*/,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned /*num_streams*/)
{
checkQueryProcessingStage(processed_stage, context);
return BlockInputStreams(1, std::make_shared<StorageFileBlockInputStream>(*this, context, max_block_size));
}

View File

@ -35,7 +35,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -572,12 +572,12 @@ BlockInputStreams StorageLog::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
checkQueryProcessingStage(processed_stage, context);
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
loadMarks();
NamesAndTypesList all_columns = Nested::collect(getColumns().getAllPhysical().addTypes(column_names));

View File

@ -30,7 +30,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -168,11 +168,16 @@ bool StorageMaterializedView::hasColumn(const String & column_name) const
return getTargetTable()->hasColumn(column_name);
}
QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context) const
{
return getTargetTable()->getQueryProcessingStage(context);
}
BlockInputStreams StorageMaterializedView::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
const size_t max_block_size,
const unsigned num_streams)
{
@ -255,6 +260,12 @@ void StorageMaterializedView::freezePartition(const ASTPtr & partition, const St
getTargetTable()->freezePartition(partition, with_name, context);
}
void StorageMaterializedView::mutate(const MutationCommands & commands, const Context & context)
{
checkStatementCanBeForwarded();
getTargetTable()->mutate(commands, context);
}
void StorageMaterializedView::shutdown()
{
/// Make sure the dependency is removed after DETACH TABLE

View File

@ -39,17 +39,20 @@ public:
void clearColumnInPartition(const ASTPtr & partition, const Field & column_name, const Context & context) override;
void attachPartition(const ASTPtr & partition, bool part, const Context & context) override;
void freezePartition(const ASTPtr & partition, const String & with_name, const Context & context) override;
void mutate(const MutationCommands & commands, const Context & context) override;
void shutdown() override;
void checkTableCanBeDropped() const override;
void checkPartitionCanBeDropped(const ASTPtr & partition) override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -83,13 +83,13 @@ StorageMemory::StorageMemory(String table_name_, ColumnsDescription columns_desc
BlockInputStreams StorageMemory::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum & processed_stage,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t /*max_block_size*/,
unsigned num_streams)
{
checkQueryProcessingStage(processed_stage, context);
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
std::lock_guard<std::mutex> lock(mutex);

View File

@ -32,7 +32,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -134,11 +134,46 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) cons
}
QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context) const
{
auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
auto database = context.getDatabase(source_database);
auto iterator = database->getIterator(context);
bool first = true;
while (iterator->isValid())
{
if (table_name_regexp.match(iterator->name()))
{
auto & table = iterator->table();
if (table.get() != this)
{
auto stage = table->getQueryProcessingStage(context);
if (first)
stage_in_source_tables = stage;
else if (stage != stage_in_source_tables)
throw Exception("Source tables for Merge table are processing data up to different stages",
ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
first = false;
}
}
iterator->next();
}
return stage_in_source_tables;
}
BlockInputStreams StorageMerge::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
const size_t max_block_size,
const unsigned num_streams)
{
@ -158,8 +193,6 @@ BlockInputStreams StorageMerge::read(
real_column_names.push_back(name);
}
std::optional<QueryProcessingStage::Enum> processed_stage_in_source_tables;
/** First we make a list of the selected tables to find out its size.
* This is necessary to correctly pass the recommended number of threads to each table.
*/
@ -167,11 +200,20 @@ BlockInputStreams StorageMerge::read(
const ASTPtr & query = query_info.query;
/// If PREWHERE is used in query, you need to make sure that all tables support this.
if (typeid_cast<const ASTSelectQuery &>(*query).prewhere_expression)
for (const auto & elem : selected_tables)
for (const auto & elem : selected_tables)
{
/// Check processing stage again in case new table was added after getQueryProcessingStage call.
auto stage = elem.first->getQueryProcessingStage(context);
if (stage != processed_stage)
throw Exception("Source tables for Merge table are processing data up to different stages",
ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
/// If PREWHERE is used in query, you need to make sure that all tables support this.
if (typeid_cast<const ASTSelectQuery &>(*query).prewhere_expression)
if (!elem.first->supportsPrewhere())
throw Exception("Storage " + elem.first->getName() + " doesn't support PREWHERE.", ErrorCodes::ILLEGAL_PREWHERE);
throw Exception("Storage " + elem.first->getName() + " doesn't support PREWHERE.",
ErrorCodes::ILLEGAL_PREWHERE);
}
Block virtual_columns_block = getBlockWithVirtualColumns(selected_tables);
@ -212,30 +254,24 @@ BlockInputStreams StorageMerge::read(
SelectQueryInfo modified_query_info;
modified_query_info.query = modified_query_ast;
modified_query_info.prewhere_info = query_info.prewhere_info;
modified_query_info.sets = query_info.sets;
BlockInputStreams source_streams;
if (curr_table_number < num_streams)
{
QueryProcessingStage::Enum processed_stage_in_source_table = processed_stage;
source_streams = table->read(
real_column_names,
modified_query_info,
modified_context,
processed_stage_in_source_table,
processed_stage,
max_block_size,
tables_count >= num_streams ? 1 : (num_streams / tables_count));
if (!processed_stage_in_source_tables)
processed_stage_in_source_tables.emplace(processed_stage_in_source_table);
else if (processed_stage_in_source_table != *processed_stage_in_source_tables)
throw Exception("Source tables for Merge table are processing data up to different stages",
ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
if (!header)
{
switch (processed_stage_in_source_table)
switch (processed_stage)
{
case QueryProcessingStage::FetchColumns:
header = getSampleBlockForColumns(column_names);
@ -244,7 +280,7 @@ BlockInputStreams StorageMerge::read(
case QueryProcessingStage::Complete:
header = materializeBlock(InterpreterSelectQuery(
query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)),
processed_stage_in_source_table, true).getSampleBlock());
processed_stage, true).getSampleBlock());
break;
}
}
@ -261,25 +297,17 @@ BlockInputStreams StorageMerge::read(
}
else
{
if (!processed_stage_in_source_tables)
throw Exception("Logical error: unknown processed stage in source tables", ErrorCodes::LOGICAL_ERROR);
/// If many streams, initialize it lazily, to avoid long delay before start of query processing.
source_streams.emplace_back(std::make_shared<LazyBlockInputStream>(header, [=]() -> BlockInputStreamPtr
{
QueryProcessingStage::Enum processed_stage_in_source_table = processed_stage;
BlockInputStreams streams = table->read(
real_column_names,
modified_query_info,
modified_context,
processed_stage_in_source_table,
processed_stage,
max_block_size,
1);
if (processed_stage_in_source_table != *processed_stage_in_source_tables)
throw Exception("Source tables for Merge table are processing data up to different stages",
ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
if (streams.empty())
{
return std::make_shared<NullBlockInputStream>(header);
@ -303,9 +331,6 @@ BlockInputStreams StorageMerge::read(
res.insert(res.end(), source_streams.begin(), source_streams.end());
}
if (processed_stage_in_source_tables)
processed_stage = *processed_stage_in_source_tables;
if (res.empty())
return res;

View File

@ -29,11 +29,13 @@ public:
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const override;
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -110,11 +110,12 @@ BlockInputStreams StorageMergeTree::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
const size_t max_block_size,
const unsigned num_streams)
{
return reader.read(column_names, query_info, context, processed_stage, max_block_size, num_streams, 0);
checkQueryProcessingStage(processed_stage, context);
return reader.read(column_names, query_info, context, max_block_size, num_streams, 0);
}
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Settings & /*settings*/)

View File

@ -56,7 +56,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -51,12 +51,12 @@ BlockInputStreams StorageMySQL::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
checkQueryProcessingStage(processed_stage, context);
String query = transformQueryForExternalDatabase(
*query_info.query, getColumns().ordinary, IdentifierQuotingStyle::Backticks, remote_database_name, remote_table_name, context);

View File

@ -36,7 +36,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -23,11 +23,12 @@ public:
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo &,
const Context &,
QueryProcessingStage::Enum &,
const Context & context,
QueryProcessingStage::Enum processing_stage,
size_t,
unsigned) override
{
checkQueryProcessingStage(processing_stage, context);
return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) };
}

View File

@ -75,10 +75,12 @@ std::function<void(std::ostream &)> StorageODBC::getReadPOSTDataCallback(const N
BlockInputStreams StorageODBC::read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
check(column_names);
checkQueryProcessingStage(processed_stage, context);
odbc_bridge_helper.startODBCBridgeSync();
return IStorageURLBase::read(column_names, query_info, context, processed_stage, max_block_size, num_streams);

View File

@ -22,7 +22,7 @@ public:
BlockInputStreams read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -2826,10 +2826,11 @@ BlockInputStreams StorageReplicatedMergeTree::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
const size_t max_block_size,
const unsigned num_streams)
{
checkQueryProcessingStage(processed_stage, context);
const Settings & settings = context.getSettingsRef();
/** The `select_sequential_consistency` setting has two meanings:
@ -2867,8 +2868,7 @@ BlockInputStreams StorageReplicatedMergeTree::read(
}
}
return reader.read(
column_names, query_info, context, processed_stage, max_block_size, num_streams, max_block_number_to_read);
return reader.read(column_names, query_info, context, max_block_size, num_streams, max_block_number_to_read);
}

View File

@ -23,7 +23,7 @@
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/LeaderElection.h>
#include <Common/BackgroundSchedulePool.h>
#include <Core/BackgroundSchedulePool.h>
namespace DB
@ -106,7 +106,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -235,14 +235,14 @@ BlockInputStreams StorageStripeLog::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
const size_t /*max_block_size*/,
unsigned num_streams)
{
checkQueryProcessingStage(processed_stage, context);
std::shared_lock<std::shared_mutex> lock(rwlock);
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
NameSet column_names_set(column_names.begin(), column_names.end());

View File

@ -32,7 +32,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -384,12 +384,12 @@ BlockInputStreams StorageTinyLog::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
const size_t max_block_size,
const unsigned /*num_streams*/)
{
checkQueryProcessingStage(processed_stage, context);
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
return BlockInputStreams(1, std::make_shared<TinyLogBlockInputStream>(
max_block_size, Nested::collect(getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size));
}

View File

@ -31,7 +31,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -156,10 +156,12 @@ std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(con
BlockInputStreams IStorageURLBase::read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned /*num_streams*/)
{
checkQueryProcessingStage(processed_stage, context);
auto request_uri = uri;
auto params = getReadURIParams(column_names, query_info, context, processed_stage, max_block_size);
for (const auto & [param, value] : params)

View File

@ -24,7 +24,7 @@ public:
BlockInputStreams read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -33,11 +33,11 @@ BlockInputStreams StorageView::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
processed_stage = QueryProcessingStage::FetchColumns;
checkQueryProcessingStage(processed_stage, context);
BlockInputStreams res = InterpreterSelectWithUnionQuery(inner_query, context, column_names).executeWithMultipleStreams();
/// It's expected that the columns read from storage are not constant.

View File

@ -25,7 +25,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -33,12 +33,12 @@ public:
BlockInputStreams read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t /*max_block_size*/,
unsigned /*num_streams*/) override
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
checkQueryProcessingStage(processed_stage, context);
Block sample_block = getSampleBlock();
MutableColumns res_columns = sample_block.cloneEmptyColumns();

View File

@ -53,13 +53,13 @@ StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multi
BlockInputStreams StorageSystemNumbers::read(
const Names & column_names,
const SelectQueryInfo &,
const Context &,
QueryProcessingStage::Enum & processed_stage,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
checkQueryProcessingStage(processed_stage, context);
if (limit && limit < max_block_size)
{

View File

@ -29,7 +29,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -20,13 +20,13 @@ StorageSystemOne::StorageSystemOne(const std::string & name_)
BlockInputStreams StorageSystemOne::read(
const Names & column_names,
const SelectQueryInfo &,
const Context &,
QueryProcessingStage::Enum & processed_stage,
const Context & context,
QueryProcessingStage::Enum processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
checkQueryProcessingStage(processed_stage, context);
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(
Block{ColumnWithTypeAndName(

View File

@ -25,7 +25,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -237,13 +237,12 @@ BlockInputStreams StorageSystemPartsBase::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
bool has_state_column = hasStateColumn(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
checkQueryProcessingStage(processed_stage, context);
StoragesInfoStream stream(query_info, context, has_state_column);

View File

@ -27,7 +27,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -55,12 +55,12 @@ BlockInputStreams StorageSystemReplicas::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
checkQueryProcessingStage(processed_stage, context);
/// We collect a set of replicated tables.
std::map<String, std::map<String, StoragePtr>> replicated_tables;

View File

@ -22,7 +22,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

@ -60,11 +60,11 @@ BlockInputStreams StorageSystemTables::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
processed_stage = QueryProcessingStage::FetchColumns;
checkQueryProcessingStage(processed_stage, context);
check(column_names);

View File

@ -22,7 +22,7 @@ public:
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;

View File

Some files were not shown because too many files have changed in this diff.