Merge branch 'master' of github.com:yandex/ClickHouse

This commit is contained in:
Alexey Milovidov 2019-08-11 22:24:53 +03:00
commit 1cd87078c2
22 changed files with 450 additions and 408 deletions

View File

@ -113,7 +113,8 @@ public:
} }
TKey key; TKey key;
size_t slot, hash; size_t slot;
size_t hash;
UInt64 count; UInt64 count;
UInt64 error; UInt64 error;
}; };
@ -147,15 +148,13 @@ public:
void insert(const TKey & key, UInt64 increment = 1, UInt64 error = 0) void insert(const TKey & key, UInt64 increment = 1, UInt64 error = 0)
{ {
// Increase weight of a key that already exists // Increase weight of a key that already exists
// It uses hashtable for both value mapping as a presence test (c_i != 0)
auto hash = counter_map.hash(key); auto hash = counter_map.hash(key);
auto it = counter_map.find(key, hash); auto counter = findCounter(key, hash);
if (it != counter_map.end()) if (counter)
{ {
auto c = it->getSecond(); counter->count += increment;
c->count += increment; counter->error += error;
c->error += error; percolate(counter);
percolate(c);
return; return;
} }
// Key doesn't exist, but can fit in the top K // Key doesn't exist, but can fit in the top K
@ -177,6 +176,7 @@ public:
push(new Counter(arena.emplace(key), increment, error, hash)); push(new Counter(arena.emplace(key), increment, error, hash));
return; return;
} }
const size_t alpha_mask = alpha_map.size() - 1; const size_t alpha_mask = alpha_map.size() - 1;
auto & alpha = alpha_map[hash & alpha_mask]; auto & alpha = alpha_map[hash & alpha_mask];
if (alpha + increment < min->count) if (alpha + increment < min->count)
@ -187,22 +187,9 @@ public:
// Erase the current minimum element // Erase the current minimum element
alpha_map[min->hash & alpha_mask] = min->count; alpha_map[min->hash & alpha_mask] = min->count;
it = counter_map.find(min->key, min->hash); destroyLastElement();
// Replace minimum with newly inserted element push(new Counter(arena.emplace(key), alpha + increment, alpha + error, hash));
if (it != counter_map.end())
{
arena.free(min->key);
min->hash = hash;
min->key = arena.emplace(key);
min->count = alpha + increment;
min->error = alpha + error;
percolate(min);
it->getSecond() = min;
it->getFirstMutable() = min->key;
counter_map.reinsert(it, hash);
}
} }
/* /*
@ -242,17 +229,35 @@ public:
// The list is sorted in descending order, we have to scan in reverse // The list is sorted in descending order, we have to scan in reverse
for (auto counter : boost::adaptors::reverse(rhs.counter_list)) for (auto counter : boost::adaptors::reverse(rhs.counter_list))
{ {
if (counter_map.find(counter->key) != counter_map.end()) size_t hash = counter_map.hash(counter->key);
if (auto current = findCounter(counter->key, hash))
{ {
// Subtract m2 previously added, guaranteed not negative // Subtract m2 previously added, guaranteed not negative
insert(counter->key, counter->count - m2, counter->error - m2); current->count += (counter->count - m2);
current->error += (counter->error - m2);
} }
else else
{ {
// Counters not monitored in S1 // Counters not monitored in S1
insert(counter->key, counter->count + m1, counter->error + m1); counter_list.push_back(new Counter(arena.emplace(counter->key), counter->count + m1, counter->error + m1, hash));
} }
} }
std::sort(counter_list.begin(), counter_list.end(), [](Counter * l, Counter * r) { return *l > *r; });
if (counter_list.size() > m_capacity)
{
for (size_t i = m_capacity; i < counter_list.size(); ++i)
{
arena.free(counter_list[i]->key);
delete counter_list[i];
}
counter_list.resize(m_capacity);
}
for (size_t i = 0; i < counter_list.size(); ++i)
counter_list[i]->slot = i;
rebuildCounterMap();
} }
std::vector<Counter> topK(size_t k) const std::vector<Counter> topK(size_t k) const
@ -336,7 +341,10 @@ private:
void destroyElements() void destroyElements()
{ {
for (auto counter : counter_list) for (auto counter : counter_list)
{
arena.free(counter->key);
delete counter; delete counter;
}
counter_map.clear(); counter_map.clear();
counter_list.clear(); counter_list.clear();
@ -346,19 +354,40 @@ private:
void destroyLastElement() void destroyLastElement()
{ {
auto last_element = counter_list.back(); auto last_element = counter_list.back();
auto cell = counter_map.find(last_element->key, last_element->hash);
cell->setZero();
counter_map.reinsert(cell, last_element->hash);
counter_list.pop_back();
arena.free(last_element->key); arena.free(last_element->key);
delete last_element; delete last_element;
counter_list.pop_back();
++removed_keys;
if (removed_keys * 2 > counter_map.size())
rebuildCounterMap();
} }
HashMap<TKey, Counter *, Hash, Grower, Allocator> counter_map; Counter * findCounter(const TKey & key, size_t hash)
{
auto it = counter_map.find(key, hash);
if (it == counter_map.end())
return nullptr;
return it->getSecond();
}
void rebuildCounterMap()
{
removed_keys = 0;
counter_map.clear();
for (auto counter : counter_list)
counter_map[counter->key] = counter;
}
using CounterMap = HashMap<TKey, Counter *, Hash, Grower, Allocator>;
CounterMap counter_map;
std::vector<Counter *> counter_list; std::vector<Counter *> counter_list;
std::vector<UInt64> alpha_map; std::vector<UInt64> alpha_map;
SpaceSavingArena<TKey> arena; SpaceSavingArena<TKey> arena;
size_t m_capacity; size_t m_capacity;
size_t removed_keys = 0;
}; };
} }

View File

@ -80,7 +80,7 @@ ExpressionActionsPtr AnalyzedJoin::createJoinedBlockActions(
ASTPtr query = expression_list; ASTPtr query = expression_list;
auto syntax_result = SyntaxAnalyzer(context).analyze(query, columns_from_joined_table, required_columns); auto syntax_result = SyntaxAnalyzer(context).analyze(query, columns_from_joined_table, required_columns);
ExpressionAnalyzer analyzer(query, syntax_result, context, {}, required_columns_set); ExpressionAnalyzer analyzer(query, syntax_result, context, required_columns_set);
return analyzer.getActions(true, false); return analyzer.getActions(true, false);
} }

View File

@ -33,6 +33,7 @@ struct AnalyzedJoin
private: private:
friend class SyntaxAnalyzer; friend class SyntaxAnalyzer;
friend struct SyntaxAnalyzerResult;
friend class ExpressionAnalyzer; friend class ExpressionAnalyzer;
Names key_names_left; Names key_names_left;

View File

@ -58,7 +58,6 @@
#include <Interpreters/ActionsVisitor.h> #include <Interpreters/ActionsVisitor.h>
#include <Interpreters/ExternalTablesVisitor.h> #include <Interpreters/ExternalTablesVisitor.h>
#include <Interpreters/GlobalSubqueriesVisitor.h> #include <Interpreters/GlobalSubqueriesVisitor.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
namespace DB namespace DB
{ {
@ -77,28 +76,15 @@ ExpressionAnalyzer::ExpressionAnalyzer(
const ASTPtr & query_, const ASTPtr & query_,
const SyntaxAnalyzerResultPtr & syntax_analyzer_result_, const SyntaxAnalyzerResultPtr & syntax_analyzer_result_,
const Context & context_, const Context & context_,
const NamesAndTypesList & additional_source_columns,
const NameSet & required_result_columns_, const NameSet & required_result_columns_,
size_t subquery_depth_, size_t subquery_depth_,
bool do_global_, bool do_global_,
const SubqueriesForSets & subqueries_for_sets_) const SubqueriesForSets & subqueries_for_sets_)
: ExpressionAnalyzerData(syntax_analyzer_result_->source_columns, required_result_columns_, subqueries_for_sets_) : ExpressionAnalyzerData(required_result_columns_, subqueries_for_sets_)
, query(query_), context(context_), settings(context.getSettings()) , query(query_), context(context_), settings(context.getSettings())
, subquery_depth(subquery_depth_), do_global(do_global_) , subquery_depth(subquery_depth_), do_global(do_global_)
, syntax(syntax_analyzer_result_) , syntax(syntax_analyzer_result_)
{ {
storage = syntax->storage;
rewrite_subqueries = syntax->rewrite_subqueries;
if (!additional_source_columns.empty())
{
source_columns.insert(source_columns.end(), additional_source_columns.begin(), additional_source_columns.end());
removeDuplicateColumns(source_columns);
}
/// Delete the unnecessary from `source_columns` list. Form `columns_added_by_join`.
collectUsedColumns();
/// external_tables, subqueries_for_sets for global subqueries. /// external_tables, subqueries_for_sets for global subqueries.
/// Replaces global subqueries with the generated names of temporary tables that will be sent to remote servers. /// Replaces global subqueries with the generated names of temporary tables that will be sent to remote servers.
initGlobalSubqueriesAndExternalTables(); initGlobalSubqueriesAndExternalTables();
@ -115,7 +101,7 @@ ExpressionAnalyzer::ExpressionAnalyzer(
bool ExpressionAnalyzer::isRemoteStorage() const bool ExpressionAnalyzer::isRemoteStorage() const
{ {
return storage && storage->isRemote(); return storage() && storage()->isRemote();
} }
@ -133,7 +119,7 @@ void ExpressionAnalyzer::analyzeAggregation()
if (select_query && (select_query->groupBy() || select_query->having())) if (select_query && (select_query->groupBy() || select_query->having()))
has_aggregation = true; has_aggregation = true;
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(source_columns, context); ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(sourceColumns(), context);
if (select_query) if (select_query)
{ {
@ -256,7 +242,7 @@ void ExpressionAnalyzer::makeSetsForIndex()
{ {
const auto * select_query = query->as<ASTSelectQuery>(); const auto * select_query = query->as<ASTSelectQuery>();
if (storage && select_query && storage->supportsIndexForIn()) if (storage() && select_query && storage()->supportsIndexForIn())
{ {
if (select_query->where()) if (select_query->where())
makeSetsForIndexImpl(select_query->where()); makeSetsForIndexImpl(select_query->where());
@ -312,7 +298,7 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node)
{ {
const IAST & args = *func->arguments; const IAST & args = *func->arguments;
if (storage && storage->mayBenefitFromIndexForIn(args.children.at(0), context)) if (storage() && storage()->mayBenefitFromIndexForIn(args.children.at(0), context))
{ {
const ASTPtr & arg = args.children.at(1); const ASTPtr & arg = args.children.at(1);
if (arg->as<ASTSubquery>() || arg->as<ASTIdentifier>()) if (arg->as<ASTSubquery>() || arg->as<ASTIdentifier>())
@ -322,9 +308,9 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node)
} }
else else
{ {
NamesAndTypesList temp_columns = source_columns; NamesAndTypesList temp_columns = sourceColumns();
temp_columns.insert(temp_columns.end(), array_join_columns.begin(), array_join_columns.end()); temp_columns.insert(temp_columns.end(), array_join_columns.begin(), array_join_columns.end());
for (const auto & joined_column : columns_added_by_join) for (const auto & joined_column : columnsAddedByJoin())
temp_columns.push_back(joined_column); temp_columns.push_back(joined_column);
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(temp_columns, context); ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(temp_columns, context);
getRootActions(func->arguments->children.at(0), true, temp_actions); getRootActions(func->arguments->children.at(0), true, temp_actions);
@ -343,7 +329,7 @@ void ExpressionAnalyzer::getRootActions(const ASTPtr & ast, bool no_subqueries,
{ {
LogAST log; LogAST log;
ActionsVisitor actions_visitor(context, settings.size_limits_for_set, subquery_depth, ActionsVisitor actions_visitor(context, settings.size_limits_for_set, subquery_depth,
source_columns, actions, prepared_sets, subqueries_for_sets, sourceColumns(), actions, prepared_sets, subqueries_for_sets,
no_subqueries, only_consts, !isRemoteStorage(), log.stream()); no_subqueries, only_consts, !isRemoteStorage(), log.stream());
actions_visitor.visit(ast); actions_visitor.visit(ast);
actions = actions_visitor.popActionsLevel(); actions = actions_visitor.popActionsLevel();
@ -356,7 +342,7 @@ void ExpressionAnalyzer::getActionsFromJoinKeys(const ASTTableJoin & table_join,
LogAST log; LogAST log;
ActionsVisitor actions_visitor(context, settings.size_limits_for_set, subquery_depth, ActionsVisitor actions_visitor(context, settings.size_limits_for_set, subquery_depth,
source_columns, actions, prepared_sets, subqueries_for_sets, sourceColumns(), actions, prepared_sets, subqueries_for_sets,
no_subqueries, only_consts, !isRemoteStorage(), log.stream()); no_subqueries, only_consts, !isRemoteStorage(), log.stream());
if (table_join.using_expression_list) if (table_join.using_expression_list)
@ -494,7 +480,7 @@ bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool on
if (!array_join_expression_list) if (!array_join_expression_list)
return false; return false;
initChain(chain, source_columns); initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back(); ExpressionActionsChain::Step & step = chain.steps.back();
getRootActions(array_join_expression_list, only_types, step.actions); getRootActions(array_join_expression_list, only_types, step.actions);
@ -507,12 +493,12 @@ bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool on
void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, bool only_types) const void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, bool only_types) const
{ {
if (only_types) if (only_types)
actions->add(ExpressionAction::ordinaryJoin(nullptr, analyzedJoin().key_names_left, columns_added_by_join)); actions->add(ExpressionAction::ordinaryJoin(nullptr, analyzedJoin().key_names_left, columnsAddedByJoin()));
else else
for (auto & subquery_for_set : subqueries_for_sets) for (auto & subquery_for_set : subqueries_for_sets)
if (subquery_for_set.second.join) if (subquery_for_set.second.join)
actions->add(ExpressionAction::ordinaryJoin(subquery_for_set.second.join, analyzedJoin().key_names_left, actions->add(ExpressionAction::ordinaryJoin(subquery_for_set.second.join, analyzedJoin().key_names_left,
columns_added_by_join)); columnsAddedByJoin()));
} }
static void appendRequiredColumns( static void appendRequiredColumns(
@ -536,7 +522,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
if (!select_query->join()) if (!select_query->join())
return false; return false;
initChain(chain, source_columns); initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back(); ExpressionActionsChain::Step & step = chain.steps.back();
const auto & join_element = select_query->join()->as<ASTTablesInSelectQueryElement &>(); const auto & join_element = select_query->join()->as<ASTTablesInSelectQueryElement &>();
@ -588,7 +574,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
auto & analyzed_join = analyzedJoin(); auto & analyzed_join = analyzedJoin();
/// Actions which need to be calculated on joined block. /// Actions which need to be calculated on joined block.
ExpressionActionsPtr joined_block_actions = ExpressionActionsPtr joined_block_actions =
analyzed_join.createJoinedBlockActions(columns_added_by_join, select_query, context); analyzed_join.createJoinedBlockActions(columnsAddedByJoin(), select_query, context);
/** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs /** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs
* - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1, * - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1,
@ -610,7 +596,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
NameSet required_columns(action_columns.begin(), action_columns.end()); NameSet required_columns(action_columns.begin(), action_columns.end());
appendRequiredColumns( appendRequiredColumns(
required_columns, joined_block_actions->getSampleBlock(), analyzed_join.key_names_right, columns_added_by_join); required_columns, joined_block_actions->getSampleBlock(), analyzed_join.key_names_right, columnsAddedByJoin());
auto original_map = analyzed_join.getOriginalColumnsMap(required_columns); auto original_map = analyzed_join.getOriginalColumnsMap(required_columns);
Names original_columns; Names original_columns;
@ -647,7 +633,7 @@ bool ExpressionAnalyzer::appendPrewhere(
if (!select_query->prewhere()) if (!select_query->prewhere())
return false; return false;
initChain(chain, source_columns); initChain(chain, sourceColumns());
auto & step = chain.getLastStep(); auto & step = chain.getLastStep();
getRootActions(select_query->prewhere(), only_types, step.actions); getRootActions(select_query->prewhere(), only_types, step.actions);
String prewhere_column_name = select_query->prewhere()->getColumnName(); String prewhere_column_name = select_query->prewhere()->getColumnName();
@ -656,7 +642,7 @@ bool ExpressionAnalyzer::appendPrewhere(
{ {
/// Remove unused source_columns from prewhere actions. /// Remove unused source_columns from prewhere actions.
auto tmp_actions = std::make_shared<ExpressionActions>(source_columns, context); auto tmp_actions = std::make_shared<ExpressionActions>(sourceColumns(), context);
getRootActions(select_query->prewhere(), only_types, tmp_actions); getRootActions(select_query->prewhere(), only_types, tmp_actions);
tmp_actions->finalize({prewhere_column_name}); tmp_actions->finalize({prewhere_column_name});
auto required_columns = tmp_actions->getRequiredColumns(); auto required_columns = tmp_actions->getRequiredColumns();
@ -676,7 +662,7 @@ bool ExpressionAnalyzer::appendPrewhere(
auto names = step.actions->getSampleBlock().getNames(); auto names = step.actions->getSampleBlock().getNames();
NameSet name_set(names.begin(), names.end()); NameSet name_set(names.begin(), names.end());
for (const auto & column : source_columns) for (const auto & column : sourceColumns())
if (required_source_columns.count(column.name) == 0) if (required_source_columns.count(column.name) == 0)
name_set.erase(column.name); name_set.erase(column.name);
@ -697,7 +683,7 @@ bool ExpressionAnalyzer::appendPrewhere(
NameSet prewhere_input_names(required_columns.begin(), required_columns.end()); NameSet prewhere_input_names(required_columns.begin(), required_columns.end());
NameSet unused_source_columns; NameSet unused_source_columns;
for (const auto & column : source_columns) for (const auto & column : sourceColumns())
{ {
if (prewhere_input_names.count(column.name) == 0) if (prewhere_input_names.count(column.name) == 0)
{ {
@ -722,7 +708,7 @@ bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_t
if (!select_query->where()) if (!select_query->where())
return false; return false;
initChain(chain, source_columns); initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back(); ExpressionActionsChain::Step & step = chain.steps.back();
step.required_output.push_back(select_query->where()->getColumnName()); step.required_output.push_back(select_query->where()->getColumnName());
@ -742,7 +728,7 @@ bool ExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain, bool only
if (!select_query->groupBy()) if (!select_query->groupBy())
return false; return false;
initChain(chain, source_columns); initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back(); ExpressionActionsChain::Step & step = chain.steps.back();
ASTs asts = select_query->groupBy()->children; ASTs asts = select_query->groupBy()->children;
@ -761,7 +747,7 @@ void ExpressionAnalyzer::appendAggregateFunctionsArguments(ExpressionActionsChai
assertAggregation(); assertAggregation();
initChain(chain, source_columns); initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back(); ExpressionActionsChain::Step & step = chain.steps.back();
for (size_t i = 0; i < aggregate_descriptions.size(); ++i) for (size_t i = 0; i < aggregate_descriptions.size(); ++i)
@ -899,7 +885,7 @@ void ExpressionAnalyzer::appendProjectResult(ExpressionActionsChain & chain) con
void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const ASTPtr & expr, bool only_types) void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const ASTPtr & expr, bool only_types)
{ {
initChain(chain, source_columns); initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back(); ExpressionActionsChain::Step & step = chain.steps.back();
getRootActions(expr, only_types, step.actions); getRootActions(expr, only_types, step.actions);
step.required_output.push_back(expr->getColumnName()); step.required_output.push_back(expr->getColumnName());
@ -921,7 +907,7 @@ void ExpressionAnalyzer::getActionsBeforeAggregation(const ASTPtr & ast, Express
ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool project_result) ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool project_result)
{ {
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(source_columns, context); ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(sourceColumns(), context);
NamesWithAliases result_columns; NamesWithAliases result_columns;
Names result_names; Names result_names;
@ -956,7 +942,7 @@ ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool proje
if (!(add_aliases && project_result)) if (!(add_aliases && project_result))
{ {
/// We will not delete the original columns. /// We will not delete the original columns.
for (const auto & column_name_type : source_columns) for (const auto & column_name_type : sourceColumns())
result_names.push_back(column_name_type.name); result_names.push_back(column_name_type.name);
} }
@ -982,164 +968,4 @@ void ExpressionAnalyzer::getAggregateInfo(Names & key_names, AggregateDescriptio
aggregates = aggregate_descriptions; aggregates = aggregate_descriptions;
} }
void ExpressionAnalyzer::collectUsedColumns()
{
/** Calculate which columns are required to execute the expression.
* Then, delete all other columns from the list of available columns.
* After execution, columns will only contain the list of columns needed to read from the table.
*/
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(query);
NameSet source_column_names;
for (const auto & column : source_columns)
source_column_names.insert(column.name);
NameSet required = columns_context.requiredColumns();
if (columns_context.has_table_join)
{
const AnalyzedJoin & analyzed_join = analyzedJoin();
NameSet avaliable_columns;
for (const auto & name : source_columns)
avaliable_columns.insert(name.name);
/// Add columns obtained by JOIN (if needed).
columns_added_by_join.clear();
for (const auto & joined_column : analyzed_join.available_joined_columns)
{
auto & name = joined_column.name;
if (avaliable_columns.count(name))
continue;
if (required.count(name))
{
/// Optimisation: do not add columns needed only in JOIN ON section.
if (columns_context.nameInclusion(name) > analyzed_join.rightKeyInclusion(name))
columns_added_by_join.push_back(joined_column);
required.erase(name);
}
}
}
NameSet array_join_sources;
if (columns_context.has_array_join)
{
/// Insert the columns required for the ARRAY JOIN calculation into the required columns list.
for (const auto & result_source : syntax->array_join_result_to_source)
array_join_sources.insert(result_source.second);
for (const auto & column_name_type : source_columns)
if (array_join_sources.count(column_name_type.name))
required.insert(column_name_type.name);
}
const auto * select_query = query->as<ASTSelectQuery>();
/// You need to read at least one column to find the number of rows.
if (select_query && required.empty())
{
/// We will find a column with minimum <compressed_size, type_size, uncompressed_size>.
/// Because it is the column that is cheapest to read.
struct ColumnSizeTuple
{
size_t compressed_size;
size_t type_size;
size_t uncompressed_size;
String name;
bool operator<(const ColumnSizeTuple & that) const
{
return std::tie(compressed_size, type_size, uncompressed_size)
< std::tie(that.compressed_size, that.type_size, that.uncompressed_size);
}
};
std::vector<ColumnSizeTuple> columns;
if (storage)
{
auto column_sizes = storage->getColumnSizes();
for (auto & source_column : source_columns)
{
auto c = column_sizes.find(source_column.name);
if (c == column_sizes.end())
continue;
size_t type_size = source_column.type->haveMaximumSizeOfValue() ? source_column.type->getMaximumSizeOfValueInMemory() : 100;
columns.emplace_back(ColumnSizeTuple{c->second.data_compressed, type_size, c->second.data_uncompressed, source_column.name});
}
}
if (columns.size())
required.insert(std::min_element(columns.begin(), columns.end())->name);
else
/// If we have no information about columns sizes, choose a column of minimum size of its data type.
required.insert(ExpressionActions::getSmallestColumn(source_columns));
}
NameSet unknown_required_source_columns = required;
for (NamesAndTypesList::iterator it = source_columns.begin(); it != source_columns.end();)
{
const String & column_name = it->name;
unknown_required_source_columns.erase(column_name);
if (!required.count(column_name))
source_columns.erase(it++);
else
++it;
}
/// If there are virtual columns among the unknown columns. Remove them from the list of unknown and add
/// in columns list, so that when further processing they are also considered.
if (storage)
{
for (auto it = unknown_required_source_columns.begin(); it != unknown_required_source_columns.end();)
{
if (storage->hasColumn(*it))
{
source_columns.push_back(storage->getColumn(*it));
unknown_required_source_columns.erase(it++);
}
else
++it;
}
}
if (!unknown_required_source_columns.empty())
{
std::stringstream ss;
ss << "Missing columns:";
for (const auto & name : unknown_required_source_columns)
ss << " '" << name << "'";
ss << " while processing query: '" << query << "'";
ss << ", required columns:";
for (const auto & name : columns_context.requiredColumns())
ss << " '" << name << "'";
if (!source_column_names.empty())
{
ss << ", source columns:";
for (const auto & name : source_column_names)
ss << " '" << name << "'";
}
else
ss << ", no source columns";
if (columns_context.has_table_join)
{
ss << ", joined columns:";
for (const auto & column : analyzedJoin().available_joined_columns)
ss << " '" << column.name << "'";
}
if (!array_join_sources.empty())
{
ss << ", arrayJoin columns:";
for (const auto & name : array_join_sources)
ss << " '" << name << "'";
}
throw Exception(ss.str(), ErrorCodes::UNKNOWN_IDENTIFIER);
}
}
} }

View File

@ -29,13 +29,8 @@ struct SyntaxAnalyzerResult;
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>; using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
/// ExpressionAnalyzer sources, intermediates and results. It splits data and logic, allows to test them separately. /// ExpressionAnalyzer sources, intermediates and results. It splits data and logic, allows to test them separately.
/// If you are not writing a test you probably don't need it. Use ExpressionAnalyzer itself.
struct ExpressionAnalyzerData struct ExpressionAnalyzerData
{ {
/// Original columns.
/// First, all available columns of the table are placed here. Then (when analyzing the query), unused columns are deleted.
NamesAndTypesList source_columns;
/// If non-empty, ignore all expressions in not from this list. /// If non-empty, ignore all expressions in not from this list.
NameSet required_result_columns; NameSet required_result_columns;
@ -55,18 +50,10 @@ struct ExpressionAnalyzerData
/// All new temporary tables obtained by performing the GLOBAL IN/JOIN subqueries. /// All new temporary tables obtained by performing the GLOBAL IN/JOIN subqueries.
Tables external_tables; Tables external_tables;
/// Predicate optimizer overrides the sub queries
bool rewrite_subqueries = false;
/// Columns will be added to block by join.
NamesAndTypesList columns_added_by_join; /// Subset of analyzed_join.available_joined_columns
protected: protected:
ExpressionAnalyzerData(const NamesAndTypesList & source_columns_, ExpressionAnalyzerData(const NameSet & required_result_columns_,
const NameSet & required_result_columns_,
const SubqueriesForSets & subqueries_for_sets_) const SubqueriesForSets & subqueries_for_sets_)
: source_columns(source_columns_), : required_result_columns(required_result_columns_),
required_result_columns(required_result_columns_),
subqueries_for_sets(subqueries_for_sets_) subqueries_for_sets(subqueries_for_sets_)
{} {}
}; };
@ -102,7 +89,6 @@ public:
const ASTPtr & query_, const ASTPtr & query_,
const SyntaxAnalyzerResultPtr & syntax_analyzer_result_, const SyntaxAnalyzerResultPtr & syntax_analyzer_result_,
const Context & context_, const Context & context_,
const NamesAndTypesList & additional_source_columns = {},
const NameSet & required_result_columns_ = {}, const NameSet & required_result_columns_ = {},
size_t subquery_depth_ = 0, size_t subquery_depth_ = 0,
bool do_global_ = false, bool do_global_ = false,
@ -114,11 +100,6 @@ public:
/// Get a list of aggregation keys and descriptions of aggregate functions if the query contains GROUP BY. /// Get a list of aggregation keys and descriptions of aggregate functions if the query contains GROUP BY.
void getAggregateInfo(Names & key_names, AggregateDescriptions & aggregates) const; void getAggregateInfo(Names & key_names, AggregateDescriptions & aggregates) const;
/** Get a set of columns that are enough to read from the table to evaluate the expression.
* Columns added from another table by JOIN are not counted.
*/
Names getRequiredSourceColumns() const { return source_columns.getNames(); }
/** These methods allow you to build a chain of transformations over a block, that receives values in the desired sections of the query. /** These methods allow you to build a chain of transformations over a block, that receives values in the desired sections of the query.
* *
* Example usage: * Example usage:
@ -182,25 +163,21 @@ public:
/// Create Set-s that we can from IN section to use the index on them. /// Create Set-s that we can from IN section to use the index on them.
void makeSetsForIndex(); void makeSetsForIndex();
bool isRewriteSubqueriesPredicate() { return rewrite_subqueries; }
bool hasGlobalSubqueries() { return has_global_subqueries; } bool hasGlobalSubqueries() { return has_global_subqueries; }
private: private:
ASTPtr query; ASTPtr query;
const Context & context; const Context & context;
const ExtractedSettings settings; const ExtractedSettings settings;
StoragePtr storage; /// The main table in FROM clause, if exists.
size_t subquery_depth; size_t subquery_depth;
bool do_global; /// Do I need to prepare for execution global subqueries when analyzing the query. bool do_global; /// Do I need to prepare for execution global subqueries when analyzing the query.
SyntaxAnalyzerResultPtr syntax; SyntaxAnalyzerResultPtr syntax;
const AnalyzedJoin & analyzedJoin() const { return syntax->analyzed_join; }
/** Remove all unnecessary columns from the list of all available columns of the table (`columns`). const StoragePtr & storage() const { return syntax->storage; } /// The main table in FROM clause, if exists.
* At the same time, form a set of columns added by JOIN (`columns_added_by_join`). const AnalyzedJoin & analyzedJoin() const { return syntax->analyzed_join; }
*/ const NamesAndTypesList & sourceColumns() const { return syntax->required_source_columns; }
void collectUsedColumns(); const NamesAndTypesList & columnsAddedByJoin() const { return syntax->columns_added_by_join; }
/// Find global subqueries in the GLOBAL IN/JOIN sections. Fills in external_tables. /// Find global subqueries in the GLOBAL IN/JOIN sections. Fills in external_tables.
void initGlobalSubqueriesAndExternalTables(); void initGlobalSubqueriesAndExternalTables();

View File

@ -295,9 +295,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
table_lock = storage->lockStructureForShare(false, context.getCurrentQueryId()); table_lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze( syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze(
query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage); query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList());
query_analyzer = std::make_unique<ExpressionAnalyzer>( query_analyzer = std::make_unique<ExpressionAnalyzer>(
query_ptr, syntax_analyzer_result, context, NamesAndTypesList(), query_ptr, syntax_analyzer_result, context,
NameSet(required_result_column_names.begin(), required_result_column_names.end()), NameSet(required_result_column_names.begin(), required_result_column_names.end()),
options.subquery_depth, !options.only_analyze); options.subquery_depth, !options.only_analyze);
@ -320,7 +320,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (!options.only_analyze || options.modify_inplace) if (!options.only_analyze || options.modify_inplace)
{ {
if (query_analyzer->isRewriteSubqueriesPredicate()) if (syntax_analyzer_result->rewrite_subqueries)
{ {
/// remake interpreter_subquery when PredicateOptimizer rewrites subqueries and main table is subquery /// remake interpreter_subquery when PredicateOptimizer rewrites subqueries and main table is subquery
if (is_subquery) if (is_subquery)
@ -339,7 +339,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
interpreter_subquery->ignoreWithTotals(); interpreter_subquery->ignoreWithTotals();
} }
required_columns = query_analyzer->getRequiredSourceColumns(); required_columns = syntax_analyzer_result->requiredSourceColumns();
if (storage) if (storage)
source_header = storage->getSampleBlockForColumns(required_columns); source_header = storage->getSampleBlockForColumns(required_columns);
@ -678,7 +678,16 @@ static SortingInfoPtr optimizeReadInOrder(const MergeTreeData & merge_tree, cons
size_t prefix_size = std::min(order_descr.size(), sorting_key_columns.size()); size_t prefix_size = std::min(order_descr.size(), sorting_key_columns.size());
auto order_by_expr = query.orderBy(); auto order_by_expr = query.orderBy();
auto syntax_result = SyntaxAnalyzer(context).analyze(order_by_expr, merge_tree.getColumns().getAllPhysical()); SyntaxAnalyzerResultPtr syntax_result;
try
{
syntax_result = SyntaxAnalyzer(context).analyze(order_by_expr, merge_tree.getColumns().getAllPhysical());
}
catch (const Exception &)
{
return {};
}
for (size_t i = 0; i < prefix_size; ++i) for (size_t i = 0; i < prefix_size; ++i)
{ {
/// Read in pk order in case of exact match with order key element /// Read in pk order in case of exact match with order key element
@ -792,7 +801,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable /// Try transferring some condition from WHERE to PREWHERE if enabled and viable
if (settings.optimize_move_to_prewhere && query.where() && !query.prewhere() && !query.final()) if (settings.optimize_move_to_prewhere && query.where() && !query.prewhere() && !query.final())
MergeTreeWhereOptimizer{current_info, context, merge_tree, query_analyzer->getRequiredSourceColumns(), log}; MergeTreeWhereOptimizer{current_info, context, merge_tree, syntax_analyzer_result->requiredSourceColumns(), log};
}; };
if (const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get())) if (const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))

View File

@ -186,8 +186,7 @@ void MutationsInterpreter::prepare(bool dry_run)
{ {
auto query = column.default_desc.expression->clone(); auto query = column.default_desc.expression->clone();
auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns); auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns);
ExpressionAnalyzer analyzer(query, syntax_result, context); for (const String & dependency : syntax_result->requiredSourceColumns())
for (const String & dependency : analyzer.getRequiredSourceColumns())
{ {
if (updated_columns.count(dependency)) if (updated_columns.count(dependency))
column_to_affected_materialized[dependency].push_back(column.name); column_to_affected_materialized[dependency].push_back(column.name);

View File

@ -40,7 +40,7 @@ static std::vector<String> extractNamesFromLambda(const ASTFunction & node)
return names; return names;
} }
bool RequiredSourceColumnsMatcher::needChildVisit(ASTPtr & node, const ASTPtr & child) bool RequiredSourceColumnsMatcher::needChildVisit(const ASTPtr & node, const ASTPtr & child)
{ {
if (child->as<ASTSelectQuery>()) if (child->as<ASTSelectQuery>())
return false; return false;
@ -60,7 +60,7 @@ bool RequiredSourceColumnsMatcher::needChildVisit(ASTPtr & node, const ASTPtr &
return true; return true;
} }
void RequiredSourceColumnsMatcher::visit(ASTPtr & ast, Data & data) void RequiredSourceColumnsMatcher::visit(const ASTPtr & ast, Data & data)
{ {
/// results are columns /// results are columns
@ -111,7 +111,7 @@ void RequiredSourceColumnsMatcher::visit(ASTPtr & ast, Data & data)
} }
} }
void RequiredSourceColumnsMatcher::visit(ASTSelectQuery & select, const ASTPtr &, Data & data) void RequiredSourceColumnsMatcher::visit(const ASTSelectQuery & select, const ASTPtr &, Data & data)
{ {
/// special case for top-level SELECT items: they are publics /// special case for top-level SELECT items: they are publics
for (auto & node : select.select()->children) for (auto & node : select.select()->children)
@ -128,7 +128,7 @@ void RequiredSourceColumnsMatcher::visit(ASTSelectQuery & select, const ASTPtr &
Visitor(data).visit(node); Visitor(data).visit(node);
/// revisit select_expression_list (with children) when all the aliases are set /// revisit select_expression_list (with children) when all the aliases are set
Visitor(data).visit(select.refSelect()); Visitor(data).visit(select.select());
} }
void RequiredSourceColumnsMatcher::visit(const ASTIdentifier & node, const ASTPtr &, Data & data) void RequiredSourceColumnsMatcher::visit(const ASTIdentifier & node, const ASTPtr &, Data & data)
@ -158,7 +158,7 @@ void RequiredSourceColumnsMatcher::visit(const ASTFunction & node, const ASTPtr
} }
} }
void RequiredSourceColumnsMatcher::visit(ASTTablesInSelectQueryElement & node, const ASTPtr &, Data & data) void RequiredSourceColumnsMatcher::visit(const ASTTablesInSelectQueryElement & node, const ASTPtr &, Data & data)
{ {
ASTTableExpression * expr = nullptr; ASTTableExpression * expr = nullptr;
ASTTableJoin * join = nullptr; ASTTableJoin * join = nullptr;
@ -177,7 +177,7 @@ void RequiredSourceColumnsMatcher::visit(ASTTablesInSelectQueryElement & node, c
} }
/// ASTIdentifiers here are tables. Do not visit them as generic ones. /// ASTIdentifiers here are tables. Do not visit them as generic ones.
void RequiredSourceColumnsMatcher::visit(ASTTableExpression & node, const ASTPtr &, Data & data) void RequiredSourceColumnsMatcher::visit(const ASTTableExpression & node, const ASTPtr &, Data & data)
{ {
if (node.database_and_table_name) if (node.database_and_table_name)
data.addTableAliasIfAny(*node.database_and_table_name); data.addTableAliasIfAny(*node.database_and_table_name);

View File

@ -21,19 +21,19 @@ struct ASTTableExpression;
class RequiredSourceColumnsMatcher class RequiredSourceColumnsMatcher
{ {
public: public:
using Visitor = InDepthNodeVisitor<RequiredSourceColumnsMatcher, false>; using Visitor = ConstInDepthNodeVisitor<RequiredSourceColumnsMatcher, false>;
using Data = ColumnNamesContext; using Data = ColumnNamesContext;
static bool needChildVisit(ASTPtr & node, const ASTPtr & child); static bool needChildVisit(const ASTPtr & node, const ASTPtr & child);
static void visit(ASTPtr & ast, Data & data); static void visit(const ASTPtr & ast, Data & data);
private: private:
static void visit(const ASTIdentifier & node, const ASTPtr &, Data & data); static void visit(const ASTIdentifier & node, const ASTPtr &, Data & data);
static void visit(const ASTFunction & node, const ASTPtr &, Data & data); static void visit(const ASTFunction & node, const ASTPtr &, Data & data);
static void visit(ASTTablesInSelectQueryElement & node, const ASTPtr &, Data & data); static void visit(const ASTTablesInSelectQueryElement & node, const ASTPtr &, Data & data);
static void visit(ASTTableExpression & node, const ASTPtr &, Data & data); static void visit(const ASTTableExpression & node, const ASTPtr &, Data & data);
static void visit(const ASTArrayJoin & node, const ASTPtr &, Data & data); static void visit(const ASTArrayJoin & node, const ASTPtr &, Data & data);
static void visit(ASTSelectQuery & select, const ASTPtr &, Data & data); static void visit(const ASTSelectQuery & select, const ASTPtr &, Data & data);
}; };
/// Extracts all the information about columns and tables from ASTSelectQuery block into ColumnNamesContext object. /// Extracts all the information about columns and tables from ASTSelectQuery block into ColumnNamesContext object.

View File

@ -13,6 +13,7 @@
#include <Interpreters/CollectJoinOnKeysVisitor.h> #include <Interpreters/CollectJoinOnKeysVisitor.h>
#include <Interpreters/ExternalDictionaries.h> #include <Interpreters/ExternalDictionaries.h>
#include <Interpreters/OptimizeIfWithConstantConditionVisitor.h> #include <Interpreters/OptimizeIfWithConstantConditionVisitor.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Parsers/ASTExpressionList.h> #include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
@ -44,6 +45,7 @@ namespace ErrorCodes
extern const int INVALID_JOIN_ON_EXPRESSION; extern const int INVALID_JOIN_ON_EXPRESSION;
extern const int EMPTY_LIST_OF_COLUMNS_QUERIED; extern const int EMPTY_LIST_OF_COLUMNS_QUERIED;
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int UNKNOWN_IDENTIFIER;
} }
NameSet removeDuplicateColumns(NamesAndTypesList & columns) NameSet removeDuplicateColumns(NamesAndTypesList & columns)
@ -558,12 +560,181 @@ void checkJoin(const ASTTablesInSelectQueryElement * join)
} }
/// Calculate which columns are required to execute the expression.
/// Then, delete all other columns from the list of available columns.
/// After execution, columns will only contain the list of columns needed to read from the table.
void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns)
{
/// We caclulate required_source_columns with source_columns modifications and swap them on exit
required_source_columns = source_columns;
if (!additional_source_columns.empty())
{
source_columns.insert(source_columns.end(), additional_source_columns.begin(), additional_source_columns.end());
removeDuplicateColumns(source_columns);
}
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(query);
NameSet source_column_names;
for (const auto & column : source_columns)
source_column_names.insert(column.name);
NameSet required = columns_context.requiredColumns();
if (columns_context.has_table_join)
{
NameSet avaliable_columns;
for (const auto & name : source_columns)
avaliable_columns.insert(name.name);
/// Add columns obtained by JOIN (if needed).
columns_added_by_join.clear();
for (const auto & joined_column : analyzed_join.available_joined_columns)
{
auto & name = joined_column.name;
if (avaliable_columns.count(name))
continue;
if (required.count(name))
{
/// Optimisation: do not add columns needed only in JOIN ON section.
if (columns_context.nameInclusion(name) > analyzed_join.rightKeyInclusion(name))
columns_added_by_join.push_back(joined_column);
required.erase(name);
}
}
}
NameSet array_join_sources;
if (columns_context.has_array_join)
{
/// Insert the columns required for the ARRAY JOIN calculation into the required columns list.
for (const auto & result_source : array_join_result_to_source)
array_join_sources.insert(result_source.second);
for (const auto & column_name_type : source_columns)
if (array_join_sources.count(column_name_type.name))
required.insert(column_name_type.name);
}
const auto * select_query = query->as<ASTSelectQuery>();
/// You need to read at least one column to find the number of rows.
if (select_query && required.empty())
{
/// We will find a column with minimum <compressed_size, type_size, uncompressed_size>.
/// Because it is the column that is cheapest to read.
struct ColumnSizeTuple
{
size_t compressed_size;
size_t type_size;
size_t uncompressed_size;
String name;
bool operator<(const ColumnSizeTuple & that) const
{
return std::tie(compressed_size, type_size, uncompressed_size)
< std::tie(that.compressed_size, that.type_size, that.uncompressed_size);
}
};
std::vector<ColumnSizeTuple> columns;
if (storage)
{
auto column_sizes = storage->getColumnSizes();
for (auto & source_column : source_columns)
{
auto c = column_sizes.find(source_column.name);
if (c == column_sizes.end())
continue;
size_t type_size = source_column.type->haveMaximumSizeOfValue() ? source_column.type->getMaximumSizeOfValueInMemory() : 100;
columns.emplace_back(ColumnSizeTuple{c->second.data_compressed, type_size, c->second.data_uncompressed, source_column.name});
}
}
if (columns.size())
required.insert(std::min_element(columns.begin(), columns.end())->name);
else
/// If we have no information about columns sizes, choose a column of minimum size of its data type.
required.insert(ExpressionActions::getSmallestColumn(source_columns));
}
NameSet unknown_required_source_columns = required;
for (NamesAndTypesList::iterator it = source_columns.begin(); it != source_columns.end();)
{
const String & column_name = it->name;
unknown_required_source_columns.erase(column_name);
if (!required.count(column_name))
source_columns.erase(it++);
else
++it;
}
/// If there are virtual columns among the unknown columns. Remove them from the list of unknown and add
/// in columns list, so that when further processing they are also considered.
if (storage)
{
for (auto it = unknown_required_source_columns.begin(); it != unknown_required_source_columns.end();)
{
if (storage->hasColumn(*it))
{
source_columns.push_back(storage->getColumn(*it));
unknown_required_source_columns.erase(it++);
}
else
++it;
}
}
if (!unknown_required_source_columns.empty())
{
std::stringstream ss;
ss << "Missing columns:";
for (const auto & name : unknown_required_source_columns)
ss << " '" << name << "'";
ss << " while processing query: '" << queryToString(query) << "'";
ss << ", required columns:";
for (const auto & name : columns_context.requiredColumns())
ss << " '" << name << "'";
if (!source_column_names.empty())
{
ss << ", source columns:";
for (const auto & name : source_column_names)
ss << " '" << name << "'";
}
else
ss << ", no source columns";
if (columns_context.has_table_join)
{
ss << ", joined columns:";
for (const auto & column : analyzed_join.available_joined_columns)
ss << " '" << column.name << "'";
}
if (!array_join_sources.empty())
{
ss << ", arrayJoin columns:";
for (const auto & name : array_join_sources)
ss << " '" << name << "'";
}
throw Exception(ss.str(), ErrorCodes::UNKNOWN_IDENTIFIER);
}
required_source_columns.swap(source_columns);
}
SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze( SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
ASTPtr & query, ASTPtr & query,
const NamesAndTypesList & source_columns_, const NamesAndTypesList & source_columns_,
const Names & required_result_columns, const Names & required_result_columns,
StoragePtr storage) const StoragePtr storage,
const NamesAndTypesList & additional_source_columns) const
{ {
auto * select_query = query->as<ASTSelectQuery>(); auto * select_query = query->as<ASTSelectQuery>();
if (!storage && select_query) if (!storage && select_query)
@ -669,6 +840,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
collectJoinedColumns(result.analyzed_join, *select_query, source_columns_set, result.aliases, settings.join_use_nulls); collectJoinedColumns(result.analyzed_join, *select_query, source_columns_set, result.aliases, settings.join_use_nulls);
} }
result.collectUsedColumns(query, additional_source_columns);
return std::make_shared<const SyntaxAnalyzerResult>(result); return std::make_shared<const SyntaxAnalyzerResult>(result);
} }

View File

@ -13,8 +13,13 @@ NameSet removeDuplicateColumns(NamesAndTypesList & columns);
struct SyntaxAnalyzerResult struct SyntaxAnalyzerResult
{ {
StoragePtr storage; StoragePtr storage;
AnalyzedJoin analyzed_join;
NamesAndTypesList source_columns; NamesAndTypesList source_columns;
/// Set of columns that are enough to read from the table to evaluate the expression. It does not include joined columns.
NamesAndTypesList required_source_columns;
/// Columns will be added to block by JOIN. It's a subset of analyzed_join.available_joined_columns
NamesAndTypesList columns_added_by_join;
Aliases aliases; Aliases aliases;
@ -31,10 +36,11 @@ struct SyntaxAnalyzerResult
/// Note: not used further. /// Note: not used further.
NameToNameMap array_join_name_to_alias; NameToNameMap array_join_name_to_alias;
AnalyzedJoin analyzed_join;
/// Predicate optimizer overrides the sub queries /// Predicate optimizer overrides the sub queries
bool rewrite_subqueries = false; bool rewrite_subqueries = false;
void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns);
Names requiredSourceColumns() const { return required_source_columns.getNames(); }
}; };
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>; using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
@ -64,7 +70,8 @@ public:
ASTPtr & query, ASTPtr & query,
const NamesAndTypesList & source_columns_, const NamesAndTypesList & source_columns_,
const Names & required_result_columns = {}, const Names & required_result_columns = {},
StoragePtr storage = {}) const; StoragePtr storage = {},
const NamesAndTypesList & additional_source_columns = {}) const;
private: private:
const Context & context; const Context & context;

View File

@ -61,7 +61,7 @@ void evaluateMissingDefaults(Block & block,
auto syntax_result = SyntaxAnalyzer(context).analyze(default_expr_list, block.getNamesAndTypesList()); auto syntax_result = SyntaxAnalyzer(context).analyze(default_expr_list, block.getNamesAndTypesList());
auto expression_analyzer = ExpressionAnalyzer{default_expr_list, syntax_result, context}; auto expression_analyzer = ExpressionAnalyzer{default_expr_list, syntax_result, context};
auto required_source_columns = expression_analyzer.getRequiredSourceColumns(); auto required_source_columns = syntax_result->requiredSourceColumns();
auto rows_was = copy_block.rows(); auto rows_was = copy_block.rows();
// Delete all not needed columns in DEFAULT expression. // Delete all not needed columns in DEFAULT expression.

View File

@ -19,7 +19,7 @@ KafkaBlockInputStream::KafkaBlockInputStream(
if (!storage.getSchemaName().empty()) if (!storage.getSchemaName().empty())
context.setSetting("format_schema", storage.getSchemaName()); context.setSetting("format_schema", storage.getSchemaName());
virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset"}).cloneEmptyColumns(); virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneEmptyColumns();
} }
KafkaBlockInputStream::~KafkaBlockInputStream() KafkaBlockInputStream::~KafkaBlockInputStream()
@ -60,9 +60,14 @@ void KafkaBlockInputStream::readPrefixImpl()
auto read_callback = [this] auto read_callback = [this]
{ {
const auto * sub_buffer = buffer->subBufferAs<ReadBufferFromKafkaConsumer>(); const auto * sub_buffer = buffer->subBufferAs<ReadBufferFromKafkaConsumer>();
virtual_columns[0]->insert(sub_buffer->currentTopic()); // "topic" virtual_columns[0]->insert(sub_buffer->currentTopic()); // "topic"
virtual_columns[1]->insert(sub_buffer->currentKey()); // "key" virtual_columns[1]->insert(sub_buffer->currentKey()); // "key"
virtual_columns[2]->insert(sub_buffer->currentOffset()); // "offset" virtual_columns[2]->insert(sub_buffer->currentOffset()); // "offset"
virtual_columns[3]->insert(sub_buffer->currentPartition()); // "partition"
auto timestamp = sub_buffer->currentTimestamp();
if (timestamp)
virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(timestamp->get_timestamp()).count()); // "timestamp"
}; };
auto child = FormatFactory::instance().getInput( auto child = FormatFactory::instance().getInput(
@ -79,8 +84,8 @@ Block KafkaBlockInputStream::readImpl()
if (!block) if (!block)
return block; return block;
Block virtual_block = storage.getSampleBlockForColumns({"_topic", "_key", "_offset"}).cloneWithColumns(std::move(virtual_columns)); Block virtual_block = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneWithColumns(std::move(virtual_columns));
virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset"}).cloneEmptyColumns(); virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneEmptyColumns();
for (const auto & column : virtual_block.getColumnsWithTypeAndName()) for (const auto & column : virtual_block.getColumnsWithTypeAndName())
block.insert(column); block.insert(column);

View File

@ -30,6 +30,8 @@ public:
String currentTopic() const { return current[-1].get_topic(); } String currentTopic() const { return current[-1].get_topic(); }
String currentKey() const { return current[-1].get_key(); } String currentKey() const { return current[-1].get_key(); }
auto currentOffset() const { return current[-1].get_offset(); } auto currentOffset() const { return current[-1].get_offset(); }
auto currentPartition() const { return current[-1].get_partition(); }
auto currentTimestamp() const { return current[-1].get_timestamp(); }
private: private:
using Messages = std::vector<cppkafka::Message>; using Messages = std::vector<cppkafka::Message>;

View File

@ -4,6 +4,8 @@
#include <DataStreams/LimitBlockInputStream.h> #include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h> #include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h> #include <DataStreams/copyData.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <Interpreters/InterpreterInsertQuery.h> #include <Interpreters/InterpreterInsertQuery.h>
@ -85,7 +87,9 @@ StorageKafka::StorageKafka(
columns_, columns_,
ColumnsDescription({{"_topic", std::make_shared<DataTypeString>()}, ColumnsDescription({{"_topic", std::make_shared<DataTypeString>()},
{"_key", std::make_shared<DataTypeString>()}, {"_key", std::make_shared<DataTypeString>()},
{"_offset", std::make_shared<DataTypeUInt64>()}}, true)) {"_offset", std::make_shared<DataTypeUInt64>()},
{"_partition", std::make_shared<DataTypeUInt64>()},
{"_timestamp", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())}}, true))
, table_name(table_name_) , table_name(table_name_)
, database_name(database_name_) , database_name(database_name_)
, global_context(context_) , global_context(context_)

View File

@ -134,8 +134,7 @@ MergeTreeData::MergeTreeData(
throw Exception("Sampling expression must be present in the primary key", ErrorCodes::BAD_ARGUMENTS); throw Exception("Sampling expression must be present in the primary key", ErrorCodes::BAD_ARGUMENTS);
auto syntax = SyntaxAnalyzer(global_context).analyze(sample_by_ast, getColumns().getAllPhysical()); auto syntax = SyntaxAnalyzer(global_context).analyze(sample_by_ast, getColumns().getAllPhysical());
columns_required_for_sampling = ExpressionAnalyzer(sample_by_ast, syntax, global_context) columns_required_for_sampling = syntax->requiredSourceColumns();
.getRequiredSourceColumns();
} }
MergeTreeDataFormatVersion min_format_version(0); MergeTreeDataFormatVersion min_format_version(0);
if (!date_column_name.empty()) if (!date_column_name.empty())
@ -295,8 +294,7 @@ void MergeTreeData::setPrimaryKeyIndicesAndColumns(
if (!added_key_column_expr_list->children.empty()) if (!added_key_column_expr_list->children.empty())
{ {
auto syntax = SyntaxAnalyzer(global_context).analyze(added_key_column_expr_list, all_columns); auto syntax = SyntaxAnalyzer(global_context).analyze(added_key_column_expr_list, all_columns);
Names used_columns = ExpressionAnalyzer(added_key_column_expr_list, syntax, global_context) Names used_columns = syntax->requiredSourceColumns();
.getRequiredSourceColumns();
NamesAndTypesList deleted_columns; NamesAndTypesList deleted_columns;
NamesAndTypesList added_columns; NamesAndTypesList added_columns;

View File

@ -8,7 +8,6 @@
#include <Parsers/ASTSelectQuery.h> #include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTExpressionList.h> #include <Parsers/ASTExpressionList.h>
#include <Interpreters/SyntaxAnalyzer.h> #include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Storages/transformQueryForExternalDatabase.h> #include <Storages/transformQueryForExternalDatabase.h>
#include <Storages/MergeTree/KeyCondition.h> #include <Storages/MergeTree/KeyCondition.h>
@ -111,8 +110,7 @@ String transformQueryForExternalDatabase(
{ {
auto clone_query = query.clone(); auto clone_query = query.clone();
auto syntax_result = SyntaxAnalyzer(context).analyze(clone_query, available_columns); auto syntax_result = SyntaxAnalyzer(context).analyze(clone_query, available_columns);
ExpressionAnalyzer analyzer(clone_query, syntax_result, context); const Names used_columns = syntax_result->requiredSourceColumns();
const Names & used_columns = analyzer.getRequiredSourceColumns();
auto select = std::make_shared<ASTSelectQuery>(); auto select = std::make_shared<ASTSelectQuery>();

View File

@ -61,10 +61,10 @@ def wait_kafka_is_available(max_retries=50):
time.sleep(1) time.sleep(1)
def kafka_produce(topic, messages): def kafka_produce(topic, messages, timestamp=None):
producer = KafkaProducer(bootstrap_servers="localhost:9092") producer = KafkaProducer(bootstrap_servers="localhost:9092")
for message in messages: for message in messages:
producer.send(topic=topic, value=message) producer.send(topic=topic, value=message, timestamp_ms=timestamp)
producer.flush() producer.flush()
print ("Produced {} messages for topic {}".format(len(messages), topic)) print ("Produced {} messages for topic {}".format(len(messages), topic))
@ -389,16 +389,16 @@ def test_kafka_virtual_columns(kafka_cluster):
messages = '' messages = ''
for i in range(25): for i in range(25):
messages += json.dumps({'key': i, 'value': i}) + '\n' messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce('virt1', [messages]) kafka_produce('virt1', [messages], 0)
messages = '' messages = ''
for i in range(25, 50): for i in range(25, 50):
messages += json.dumps({'key': i, 'value': i}) + '\n' messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce('virt1', [messages]) kafka_produce('virt1', [messages], 0)
result = '' result = ''
while True: while True:
result += instance.query('SELECT _key, key, _topic, value, _offset FROM test.kafka', ignore_error=True) result += instance.query('SELECT _key, key, _topic, value, _offset, _partition, _timestamp FROM test.kafka', ignore_error=True)
if kafka_check_result(result, False, 'test_kafka_virtual1.reference'): if kafka_check_result(result, False, 'test_kafka_virtual1.reference'):
break break
@ -417,20 +417,20 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
kafka_group_name = 'virt2', kafka_group_name = 'virt2',
kafka_format = 'JSONEachRow', kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n'; kafka_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64) CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64, partition UInt64, timestamp Nullable(DateTime))
ENGINE = MergeTree() ENGINE = MergeTree()
ORDER BY key; ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT *, _key as kafka_key, _topic as topic, _offset as offset FROM test.kafka; SELECT *, _key as kafka_key, _topic as topic, _offset as offset, _partition as partition, _timestamp as timestamp FROM test.kafka;
''') ''')
messages = [] messages = []
for i in range(50): for i in range(50):
messages.append(json.dumps({'key': i, 'value': i})) messages.append(json.dumps({'key': i, 'value': i}))
kafka_produce('virt2', messages) kafka_produce('virt2', messages, 0)
while True: while True:
result = instance.query('SELECT kafka_key, key, topic, value, offset FROM test.view') result = instance.query('SELECT kafka_key, key, topic, value, offset, partition, timestamp FROM test.view')
if kafka_check_result(result, False, 'test_kafka_virtual2.reference'): if kafka_check_result(result, False, 'test_kafka_virtual2.reference'):
break break

View File

@ -1,50 +1,50 @@
0 virt1 0 0 0 virt1 0 0 0 0000-00-00 00:00:00
1 virt1 1 0 1 virt1 1 0 0 0000-00-00 00:00:00
2 virt1 2 0 2 virt1 2 0 0 0000-00-00 00:00:00
3 virt1 3 0 3 virt1 3 0 0 0000-00-00 00:00:00
4 virt1 4 0 4 virt1 4 0 0 0000-00-00 00:00:00
5 virt1 5 0 5 virt1 5 0 0 0000-00-00 00:00:00
6 virt1 6 0 6 virt1 6 0 0 0000-00-00 00:00:00
7 virt1 7 0 7 virt1 7 0 0 0000-00-00 00:00:00
8 virt1 8 0 8 virt1 8 0 0 0000-00-00 00:00:00
9 virt1 9 0 9 virt1 9 0 0 0000-00-00 00:00:00
10 virt1 10 0 10 virt1 10 0 0 0000-00-00 00:00:00
11 virt1 11 0 11 virt1 11 0 0 0000-00-00 00:00:00
12 virt1 12 0 12 virt1 12 0 0 0000-00-00 00:00:00
13 virt1 13 0 13 virt1 13 0 0 0000-00-00 00:00:00
14 virt1 14 0 14 virt1 14 0 0 0000-00-00 00:00:00
15 virt1 15 0 15 virt1 15 0 0 0000-00-00 00:00:00
16 virt1 16 0 16 virt1 16 0 0 0000-00-00 00:00:00
17 virt1 17 0 17 virt1 17 0 0 0000-00-00 00:00:00
18 virt1 18 0 18 virt1 18 0 0 0000-00-00 00:00:00
19 virt1 19 0 19 virt1 19 0 0 0000-00-00 00:00:00
20 virt1 20 0 20 virt1 20 0 0 0000-00-00 00:00:00
21 virt1 21 0 21 virt1 21 0 0 0000-00-00 00:00:00
22 virt1 22 0 22 virt1 22 0 0 0000-00-00 00:00:00
23 virt1 23 0 23 virt1 23 0 0 0000-00-00 00:00:00
24 virt1 24 0 24 virt1 24 0 0 0000-00-00 00:00:00
25 virt1 25 1 25 virt1 25 1 0 0000-00-00 00:00:00
26 virt1 26 1 26 virt1 26 1 0 0000-00-00 00:00:00
27 virt1 27 1 27 virt1 27 1 0 0000-00-00 00:00:00
28 virt1 28 1 28 virt1 28 1 0 0000-00-00 00:00:00
29 virt1 29 1 29 virt1 29 1 0 0000-00-00 00:00:00
30 virt1 30 1 30 virt1 30 1 0 0000-00-00 00:00:00
31 virt1 31 1 31 virt1 31 1 0 0000-00-00 00:00:00
32 virt1 32 1 32 virt1 32 1 0 0000-00-00 00:00:00
33 virt1 33 1 33 virt1 33 1 0 0000-00-00 00:00:00
34 virt1 34 1 34 virt1 34 1 0 0000-00-00 00:00:00
35 virt1 35 1 35 virt1 35 1 0 0000-00-00 00:00:00
36 virt1 36 1 36 virt1 36 1 0 0000-00-00 00:00:00
37 virt1 37 1 37 virt1 37 1 0 0000-00-00 00:00:00
38 virt1 38 1 38 virt1 38 1 0 0000-00-00 00:00:00
39 virt1 39 1 39 virt1 39 1 0 0000-00-00 00:00:00
40 virt1 40 1 40 virt1 40 1 0 0000-00-00 00:00:00
41 virt1 41 1 41 virt1 41 1 0 0000-00-00 00:00:00
42 virt1 42 1 42 virt1 42 1 0 0000-00-00 00:00:00
43 virt1 43 1 43 virt1 43 1 0 0000-00-00 00:00:00
44 virt1 44 1 44 virt1 44 1 0 0000-00-00 00:00:00
45 virt1 45 1 45 virt1 45 1 0 0000-00-00 00:00:00
46 virt1 46 1 46 virt1 46 1 0 0000-00-00 00:00:00
47 virt1 47 1 47 virt1 47 1 0 0000-00-00 00:00:00
48 virt1 48 1 48 virt1 48 1 0 0000-00-00 00:00:00
49 virt1 49 1 49 virt1 49 1 0 0000-00-00 00:00:00

View File

@ -1,50 +1,50 @@
0 virt2 0 0 0 virt2 0 0 0 0000-00-00 00:00:00
1 virt2 1 1 1 virt2 1 1 0 0000-00-00 00:00:00
2 virt2 2 2 2 virt2 2 2 0 0000-00-00 00:00:00
3 virt2 3 3 3 virt2 3 3 0 0000-00-00 00:00:00
4 virt2 4 4 4 virt2 4 4 0 0000-00-00 00:00:00
5 virt2 5 5 5 virt2 5 5 0 0000-00-00 00:00:00
6 virt2 6 6 6 virt2 6 6 0 0000-00-00 00:00:00
7 virt2 7 7 7 virt2 7 7 0 0000-00-00 00:00:00
8 virt2 8 8 8 virt2 8 8 0 0000-00-00 00:00:00
9 virt2 9 9 9 virt2 9 9 0 0000-00-00 00:00:00
10 virt2 10 10 10 virt2 10 10 0 0000-00-00 00:00:00
11 virt2 11 11 11 virt2 11 11 0 0000-00-00 00:00:00
12 virt2 12 12 12 virt2 12 12 0 0000-00-00 00:00:00
13 virt2 13 13 13 virt2 13 13 0 0000-00-00 00:00:00
14 virt2 14 14 14 virt2 14 14 0 0000-00-00 00:00:00
15 virt2 15 15 15 virt2 15 15 0 0000-00-00 00:00:00
16 virt2 16 16 16 virt2 16 16 0 0000-00-00 00:00:00
17 virt2 17 17 17 virt2 17 17 0 0000-00-00 00:00:00
18 virt2 18 18 18 virt2 18 18 0 0000-00-00 00:00:00
19 virt2 19 19 19 virt2 19 19 0 0000-00-00 00:00:00
20 virt2 20 20 20 virt2 20 20 0 0000-00-00 00:00:00
21 virt2 21 21 21 virt2 21 21 0 0000-00-00 00:00:00
22 virt2 22 22 22 virt2 22 22 0 0000-00-00 00:00:00
23 virt2 23 23 23 virt2 23 23 0 0000-00-00 00:00:00
24 virt2 24 24 24 virt2 24 24 0 0000-00-00 00:00:00
25 virt2 25 25 25 virt2 25 25 0 0000-00-00 00:00:00
26 virt2 26 26 26 virt2 26 26 0 0000-00-00 00:00:00
27 virt2 27 27 27 virt2 27 27 0 0000-00-00 00:00:00
28 virt2 28 28 28 virt2 28 28 0 0000-00-00 00:00:00
29 virt2 29 29 29 virt2 29 29 0 0000-00-00 00:00:00
30 virt2 30 30 30 virt2 30 30 0 0000-00-00 00:00:00
31 virt2 31 31 31 virt2 31 31 0 0000-00-00 00:00:00
32 virt2 32 32 32 virt2 32 32 0 0000-00-00 00:00:00
33 virt2 33 33 33 virt2 33 33 0 0000-00-00 00:00:00
34 virt2 34 34 34 virt2 34 34 0 0000-00-00 00:00:00
35 virt2 35 35 35 virt2 35 35 0 0000-00-00 00:00:00
36 virt2 36 36 36 virt2 36 36 0 0000-00-00 00:00:00
37 virt2 37 37 37 virt2 37 37 0 0000-00-00 00:00:00
38 virt2 38 38 38 virt2 38 38 0 0000-00-00 00:00:00
39 virt2 39 39 39 virt2 39 39 0 0000-00-00 00:00:00
40 virt2 40 40 40 virt2 40 40 0 0000-00-00 00:00:00
41 virt2 41 41 41 virt2 41 41 0 0000-00-00 00:00:00
42 virt2 42 42 42 virt2 42 42 0 0000-00-00 00:00:00
43 virt2 43 43 43 virt2 43 43 0 0000-00-00 00:00:00
44 virt2 44 44 44 virt2 44 44 0 0000-00-00 00:00:00
45 virt2 45 45 45 virt2 45 45 0 0000-00-00 00:00:00
46 virt2 46 46 46 virt2 46 46 0 0000-00-00 00:00:00
47 virt2 47 47 47 virt2 47 47 0 0000-00-00 00:00:00
48 virt2 48 48 48 virt2 48 48 0 0000-00-00 00:00:00
49 virt2 49 49 49 virt2 49 49 0 0000-00-00 00:00:00

View File

@ -0,0 +1,3 @@
['0','1','2','3','4','5','6','7','8','9']
['0','1','2','3','4','5','6','7','8','9']
['2999999','2999998','2999997','2999996','2999995','2999994','2999993','2999992','2999991','2999990']

View File

@ -0,0 +1,12 @@
DROP TABLE IF EXISTS topk;
CREATE TABLE topk (val1 String, val2 UInt32) ENGINE = MergeTree ORDER BY val1;
INSERT INTO topk SELECT toString(number), number FROM numbers(3000000);
INSERT INTO topk SELECT toString(number % 10), 999999999 FROM numbers(1000000);
SELECT arraySort(topK(10)(val1)) FROM topk;
SELECT arraySort(topKWeighted(10)(val1, val2)) FROM topk;
SELECT topKWeighted(10)(toString(number), number) from numbers(3000000);
DROP TABLE topk;