Merge branch 'master' into speed-up-parts-removal

Alexey Milovidov 2019-08-11 22:26:24 +03:00
commit 006a5e51b7
22 changed files with 452 additions and 409 deletions

View File

@ -113,7 +113,8 @@ public:
}
TKey key;
size_t slot, hash;
size_t slot;
size_t hash;
UInt64 count;
UInt64 error;
};
@ -147,15 +148,13 @@ public:
void insert(const TKey & key, UInt64 increment = 1, UInt64 error = 0)
{
// Increase weight of a key that already exists
// The hash table is used both for value lookup and as a presence test (c_i != 0)
auto hash = counter_map.hash(key);
auto it = counter_map.find(key, hash);
if (it != counter_map.end())
auto counter = findCounter(key, hash);
if (counter)
{
auto c = it->getSecond();
c->count += increment;
c->error += error;
percolate(c);
counter->count += increment;
counter->error += error;
percolate(counter);
return;
}
// Key doesn't exist, but can fit in the top K
@ -177,6 +176,7 @@ public:
push(new Counter(arena.emplace(key), increment, error, hash));
return;
}
const size_t alpha_mask = alpha_map.size() - 1;
auto & alpha = alpha_map[hash & alpha_mask];
if (alpha + increment < min->count)
@ -187,22 +187,9 @@ public:
// Erase the current minimum element
alpha_map[min->hash & alpha_mask] = min->count;
it = counter_map.find(min->key, min->hash);
destroyLastElement();
// Replace minimum with newly inserted element
if (it != counter_map.end())
{
arena.free(min->key);
min->hash = hash;
min->key = arena.emplace(key);
min->count = alpha + increment;
min->error = alpha + error;
percolate(min);
it->getSecond() = min;
it->getFirstMutable() = min->key;
counter_map.reinsert(it, hash);
}
push(new Counter(arena.emplace(key), alpha + increment, alpha + error, hash));
}
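
The alpha_map logic above is the "filtered" part of the Filtered Space-Saving scheme: alpha_map[hash & alpha_mask] keeps an upper bound on the count of any unmonitored key whose hash lands in that slot. When the minimum counter is evicted, its count is recorded in its slot, and a newly admitted key starts from its slot's bound. The branch elided by the hunk boundary above, per this scheme, only accumulates the increment into the slot and returns while the bound stays below the current minimum. A self-contained worked example of the slot arithmetic, with illustrative values:

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Worked example of the alpha_map bookkeeping (a sketch, not the real class).
int main()
{
    std::vector<uint64_t> alpha_map(8, 0);      // size is a power of two
    const size_t alpha_mask = alpha_map.size() - 1;

    // Evict the current minimum counter (count 10, hash 0x0B):
    // remember its count in its slot.
    uint64_t min_count = 10;
    size_t min_hash = 0x0B;
    alpha_map[min_hash & alpha_mask] = min_count;

    // Admit a new key with hash 0x13 (0x13 & 7 == 0x0B & 7 == 3, same slot).
    size_t hash = 0x13;
    uint64_t increment = 1;
    uint64_t error = 0;
    uint64_t alpha = alpha_map[hash & alpha_mask];

    uint64_t new_count = alpha + increment;     // 11: an upper bound, not an exact count
    uint64_t new_error = alpha + error;         // 10: how much of the bound may be overcount
    assert(new_count == 11 && new_error == 10);
}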
/*
@ -242,17 +229,35 @@ public:
// The list is sorted in descending order, so we have to scan it in reverse
for (auto counter : boost::adaptors::reverse(rhs.counter_list))
{
if (counter_map.find(counter->key) != counter_map.end())
size_t hash = counter_map.hash(counter->key);
if (auto current = findCounter(counter->key, hash))
{
// Subtract the m2 that was previously added to our counters; guaranteed non-negative
insert(counter->key, counter->count - m2, counter->error - m2);
current->count += (counter->count - m2);
current->error += (counter->error - m2);
}
else
{
// Counters not monitored in S1
insert(counter->key, counter->count + m1, counter->error + m1);
counter_list.push_back(new Counter(arena.emplace(counter->key), counter->count + m1, counter->error + m1, hash));
}
}
std::sort(counter_list.begin(), counter_list.end(), [](Counter * l, Counter * r) { return *l > *r; });
if (counter_list.size() > m_capacity)
{
for (size_t i = m_capacity; i < counter_list.size(); ++i)
{
arena.free(counter_list[i]->key);
delete counter_list[i];
}
counter_list.resize(m_capacity);
}
for (size_t i = 0; i < counter_list.size(); ++i)
counter_list[i]->slot = i;
rebuildCounterMap();
}
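
The m1/m2 arithmetic in this merge follows the parallel Space-Saving combine step: m1 and m2 are the minimum counts of the two summaries and bound how often a key could have occurred without being monitored. The pass that first adds m2 to every counter of this summary falls outside the hunk shown, but the "Subtract m2 previously added" comment relies on it. A small worked example of the arithmetic with plain integers (a sketch, not the class itself):

#include <cassert>

int main()
{
    const int m1 = 5;   // minimum count of this summary (S1)
    const int m2 = 3;   // minimum count of the merged-in summary (S2)

    // Step 1 (outside the hunk above): every counter of S1 gets m2 added,
    // as if none of its keys appeared in S2.
    int x = 10 + m2;    // key 'x' is monitored by both summaries

    // Step 2: 'x' is found in S2 with count 7, so the speculative m2 is
    // subtracted back and the exact sum of both counts remains.
    x += 7 - m2;
    assert(x == 10 + 7);

    // A key monitored only by S2 (count 6) is inserted with m1 added:
    // it may have occurred up to m1 times in S1 without being monitored.
    int y = 6 + m1;
    assert(y == 11);
}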
std::vector<Counter> topK(size_t k) const
@ -336,7 +341,10 @@ private:
void destroyElements()
{
for (auto counter : counter_list)
{
arena.free(counter->key);
delete counter;
}
counter_map.clear();
counter_list.clear();
@ -346,19 +354,40 @@ private:
void destroyLastElement()
{
auto last_element = counter_list.back();
auto cell = counter_map.find(last_element->key, last_element->hash);
cell->setZero();
counter_map.reinsert(cell, last_element->hash);
counter_list.pop_back();
arena.free(last_element->key);
delete last_element;
counter_list.pop_back();
++removed_keys;
if (removed_keys * 2 > counter_map.size())
rebuildCounterMap();
}
HashMap<TKey, Counter *, Hash, Grower, Allocator> counter_map;
Counter * findCounter(const TKey & key, size_t hash)
{
auto it = counter_map.find(key, hash);
if (it == counter_map.end())
return nullptr;
return it->getSecond();
}
void rebuildCounterMap()
{
removed_keys = 0;
counter_map.clear();
for (auto counter : counter_list)
counter_map[counter->key] = counter;
}
using CounterMap = HashMap<TKey, Counter *, Hash, Grower, Allocator>;
CounterMap counter_map;
std::vector<Counter *> counter_list;
std::vector<UInt64> alpha_map;
SpaceSavingArena<TKey> arena;
size_t m_capacity;
size_t removed_keys = 0;
};
}
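
The core speedup in this file replaces per-removal hash-map maintenance (find the cell, zero it, reinsert) with amortized cleanup: destroyLastElement() only deletes the counter and bumps removed_keys, and rebuildCounterMap() recreates the whole map in one pass once stale entries would make up more than half of it. A minimal sketch of the pattern, assuming std::unordered_map in place of ClickHouse's HashMap; unlike the real code, it writes an explicit nullptr tombstone on removal so lookups between a removal and the next rebuild stay well-defined:

#include <cstddef>
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

struct Counter
{
    std::string key;
    unsigned long count;
};

class Sketch
{
public:
    void push(std::string key, unsigned long count)
    {
        list.push_back(std::make_unique<Counter>(Counter{std::move(key), count}));
        map[list.back()->key] = list.back().get();
    }

    Counter * findCounter(const std::string & key)
    {
        auto it = map.find(key);
        if (it == map.end())
            return nullptr;
        return it->second;                  // nullptr for tombstoned keys
    }

    void destroyLastElement()
    {
        map[list.back()->key] = nullptr;    // O(1) tombstone, no map reshuffling
        list.pop_back();

        ++removed_keys;
        if (removed_keys * 2 > map.size())  // same threshold as above
            rebuildCounterMap();
    }

private:
    void rebuildCounterMap()
    {
        removed_keys = 0;
        map.clear();
        for (const auto & counter : list)
            map[counter->key] = counter.get();
    }

    std::vector<std::unique_ptr<Counter>> list;
    std::unordered_map<std::string, Counter *> map;
    std::size_t removed_keys = 0;
};

int main()
{
    Sketch s;
    s.push("a", 3);
    s.push("b", 1);
    s.destroyLastElement();                                // evict the minimum, "b"
    std::cout << (s.findCounter("b") == nullptr) << '\n';  // 1: no longer found
    std::cout << s.findCounter("a")->count << '\n';        // 3
}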

View File

@ -80,7 +80,7 @@ ExpressionActionsPtr AnalyzedJoin::createJoinedBlockActions(
ASTPtr query = expression_list;
auto syntax_result = SyntaxAnalyzer(context).analyze(query, columns_from_joined_table, required_columns);
ExpressionAnalyzer analyzer(query, syntax_result, context, {}, required_columns_set);
ExpressionAnalyzer analyzer(query, syntax_result, context, required_columns_set);
return analyzer.getActions(true, false);
}

View File

@ -33,6 +33,7 @@ struct AnalyzedJoin
private:
friend class SyntaxAnalyzer;
friend struct SyntaxAnalyzerResult;
friend class ExpressionAnalyzer;
Names key_names_left;

View File

@ -58,7 +58,6 @@
#include <Interpreters/ActionsVisitor.h>
#include <Interpreters/ExternalTablesVisitor.h>
#include <Interpreters/GlobalSubqueriesVisitor.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
namespace DB
{
@ -77,28 +76,15 @@ ExpressionAnalyzer::ExpressionAnalyzer(
const ASTPtr & query_,
const SyntaxAnalyzerResultPtr & syntax_analyzer_result_,
const Context & context_,
const NamesAndTypesList & additional_source_columns,
const NameSet & required_result_columns_,
size_t subquery_depth_,
bool do_global_,
const SubqueriesForSets & subqueries_for_sets_)
: ExpressionAnalyzerData(syntax_analyzer_result_->source_columns, required_result_columns_, subqueries_for_sets_)
: ExpressionAnalyzerData(required_result_columns_, subqueries_for_sets_)
, query(query_), context(context_), settings(context.getSettings())
, subquery_depth(subquery_depth_), do_global(do_global_)
, syntax(syntax_analyzer_result_)
{
storage = syntax->storage;
rewrite_subqueries = syntax->rewrite_subqueries;
if (!additional_source_columns.empty())
{
source_columns.insert(source_columns.end(), additional_source_columns.begin(), additional_source_columns.end());
removeDuplicateColumns(source_columns);
}
/// Remove unneeded entries from the `source_columns` list. Form `columns_added_by_join`.
collectUsedColumns();
/// Fill external_tables and subqueries_for_sets for global subqueries.
/// Replaces global subqueries with the generated names of temporary tables that will be sent to remote servers.
initGlobalSubqueriesAndExternalTables();
@ -115,7 +101,7 @@ ExpressionAnalyzer::ExpressionAnalyzer(
bool ExpressionAnalyzer::isRemoteStorage() const
{
return storage && storage->isRemote();
return storage() && storage()->isRemote();
}
@ -133,7 +119,7 @@ void ExpressionAnalyzer::analyzeAggregation()
if (select_query && (select_query->groupBy() || select_query->having()))
has_aggregation = true;
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(source_columns, context);
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(sourceColumns(), context);
if (select_query)
{
@ -256,7 +242,7 @@ void ExpressionAnalyzer::makeSetsForIndex()
{
const auto * select_query = query->as<ASTSelectQuery>();
if (storage && select_query && storage->supportsIndexForIn())
if (storage() && select_query && storage()->supportsIndexForIn())
{
if (select_query->where())
makeSetsForIndexImpl(select_query->where());
@ -312,7 +298,7 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node)
{
const IAST & args = *func->arguments;
if (storage && storage->mayBenefitFromIndexForIn(args.children.at(0), context))
if (storage() && storage()->mayBenefitFromIndexForIn(args.children.at(0), context))
{
const ASTPtr & arg = args.children.at(1);
if (arg->as<ASTSubquery>() || arg->as<ASTIdentifier>())
@ -322,9 +308,9 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node)
}
else
{
NamesAndTypesList temp_columns = source_columns;
NamesAndTypesList temp_columns = sourceColumns();
temp_columns.insert(temp_columns.end(), array_join_columns.begin(), array_join_columns.end());
for (const auto & joined_column : columns_added_by_join)
for (const auto & joined_column : columnsAddedByJoin())
temp_columns.push_back(joined_column);
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(temp_columns, context);
getRootActions(func->arguments->children.at(0), true, temp_actions);
@ -343,7 +329,7 @@ void ExpressionAnalyzer::getRootActions(const ASTPtr & ast, bool no_subqueries,
{
LogAST log;
ActionsVisitor actions_visitor(context, settings.size_limits_for_set, subquery_depth,
source_columns, actions, prepared_sets, subqueries_for_sets,
sourceColumns(), actions, prepared_sets, subqueries_for_sets,
no_subqueries, only_consts, !isRemoteStorage(), log.stream());
actions_visitor.visit(ast);
actions = actions_visitor.popActionsLevel();
@ -356,7 +342,7 @@ void ExpressionAnalyzer::getActionsFromJoinKeys(const ASTTableJoin & table_join,
LogAST log;
ActionsVisitor actions_visitor(context, settings.size_limits_for_set, subquery_depth,
source_columns, actions, prepared_sets, subqueries_for_sets,
sourceColumns(), actions, prepared_sets, subqueries_for_sets,
no_subqueries, only_consts, !isRemoteStorage(), log.stream());
if (table_join.using_expression_list)
@ -494,7 +480,7 @@ bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool on
if (!array_join_expression_list)
return false;
initChain(chain, source_columns);
initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back();
getRootActions(array_join_expression_list, only_types, step.actions);
@ -507,12 +493,12 @@ bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool on
void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, bool only_types) const
{
if (only_types)
actions->add(ExpressionAction::ordinaryJoin(nullptr, analyzedJoin().key_names_left, columns_added_by_join));
actions->add(ExpressionAction::ordinaryJoin(nullptr, analyzedJoin().key_names_left, columnsAddedByJoin()));
else
for (auto & subquery_for_set : subqueries_for_sets)
if (subquery_for_set.second.join)
actions->add(ExpressionAction::ordinaryJoin(subquery_for_set.second.join, analyzedJoin().key_names_left,
columns_added_by_join));
columnsAddedByJoin()));
}
static void appendRequiredColumns(
@ -536,7 +522,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
if (!select_query->join())
return false;
initChain(chain, source_columns);
initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back();
const auto & join_element = select_query->join()->as<ASTTablesInSelectQueryElement &>();
@ -588,7 +574,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
auto & analyzed_join = analyzedJoin();
/// Actions which need to be calculated on joined block.
ExpressionActionsPtr joined_block_actions =
analyzed_join.createJoinedBlockActions(columns_added_by_join, select_query, context);
analyzed_join.createJoinedBlockActions(columnsAddedByJoin(), select_query, context);
/** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs
* - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1,
@ -610,7 +596,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
NameSet required_columns(action_columns.begin(), action_columns.end());
appendRequiredColumns(
required_columns, joined_block_actions->getSampleBlock(), analyzed_join.key_names_right, columns_added_by_join);
required_columns, joined_block_actions->getSampleBlock(), analyzed_join.key_names_right, columnsAddedByJoin());
auto original_map = analyzed_join.getOriginalColumnsMap(required_columns);
Names original_columns;
@ -647,7 +633,7 @@ bool ExpressionAnalyzer::appendPrewhere(
if (!select_query->prewhere())
return false;
initChain(chain, source_columns);
initChain(chain, sourceColumns());
auto & step = chain.getLastStep();
getRootActions(select_query->prewhere(), only_types, step.actions);
String prewhere_column_name = select_query->prewhere()->getColumnName();
@ -656,7 +642,7 @@ bool ExpressionAnalyzer::appendPrewhere(
{
/// Remove unused source_columns from prewhere actions.
auto tmp_actions = std::make_shared<ExpressionActions>(source_columns, context);
auto tmp_actions = std::make_shared<ExpressionActions>(sourceColumns(), context);
getRootActions(select_query->prewhere(), only_types, tmp_actions);
tmp_actions->finalize({prewhere_column_name});
auto required_columns = tmp_actions->getRequiredColumns();
@ -676,7 +662,7 @@ bool ExpressionAnalyzer::appendPrewhere(
auto names = step.actions->getSampleBlock().getNames();
NameSet name_set(names.begin(), names.end());
for (const auto & column : source_columns)
for (const auto & column : sourceColumns())
if (required_source_columns.count(column.name) == 0)
name_set.erase(column.name);
@ -697,7 +683,7 @@ bool ExpressionAnalyzer::appendPrewhere(
NameSet prewhere_input_names(required_columns.begin(), required_columns.end());
NameSet unused_source_columns;
for (const auto & column : source_columns)
for (const auto & column : sourceColumns())
{
if (prewhere_input_names.count(column.name) == 0)
{
@ -722,7 +708,7 @@ bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_t
if (!select_query->where())
return false;
initChain(chain, source_columns);
initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back();
step.required_output.push_back(select_query->where()->getColumnName());
@ -742,7 +728,7 @@ bool ExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain, bool only
if (!select_query->groupBy())
return false;
initChain(chain, source_columns);
initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back();
ASTs asts = select_query->groupBy()->children;
@ -761,7 +747,7 @@ void ExpressionAnalyzer::appendAggregateFunctionsArguments(ExpressionActionsChai
assertAggregation();
initChain(chain, source_columns);
initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back();
for (size_t i = 0; i < aggregate_descriptions.size(); ++i)
@ -899,7 +885,7 @@ void ExpressionAnalyzer::appendProjectResult(ExpressionActionsChain & chain) con
void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const ASTPtr & expr, bool only_types)
{
initChain(chain, source_columns);
initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back();
getRootActions(expr, only_types, step.actions);
step.required_output.push_back(expr->getColumnName());
@ -921,7 +907,7 @@ void ExpressionAnalyzer::getActionsBeforeAggregation(const ASTPtr & ast, Express
ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool project_result)
{
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(source_columns, context);
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(sourceColumns(), context);
NamesWithAliases result_columns;
Names result_names;
@ -956,7 +942,7 @@ ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool proje
if (!(add_aliases && project_result))
{
/// We will not delete the original columns.
for (const auto & column_name_type : source_columns)
for (const auto & column_name_type : sourceColumns())
result_names.push_back(column_name_type.name);
}
@ -982,164 +968,4 @@ void ExpressionAnalyzer::getAggregateInfo(Names & key_names, AggregateDescriptio
aggregates = aggregate_descriptions;
}
void ExpressionAnalyzer::collectUsedColumns()
{
/** Calculate which columns are required to execute the expression.
* Then, delete all other columns from the list of available columns.
* After execution, columns will only contain the list of columns needed to read from the table.
*/
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(query);
NameSet source_column_names;
for (const auto & column : source_columns)
source_column_names.insert(column.name);
NameSet required = columns_context.requiredColumns();
if (columns_context.has_table_join)
{
const AnalyzedJoin & analyzed_join = analyzedJoin();
NameSet avaliable_columns;
for (const auto & name : source_columns)
avaliable_columns.insert(name.name);
/// Add columns obtained by JOIN (if needed).
columns_added_by_join.clear();
for (const auto & joined_column : analyzed_join.available_joined_columns)
{
auto & name = joined_column.name;
if (avaliable_columns.count(name))
continue;
if (required.count(name))
{
/// Optimisation: do not add columns needed only in JOIN ON section.
if (columns_context.nameInclusion(name) > analyzed_join.rightKeyInclusion(name))
columns_added_by_join.push_back(joined_column);
required.erase(name);
}
}
}
NameSet array_join_sources;
if (columns_context.has_array_join)
{
/// Insert the columns required for the ARRAY JOIN calculation into the required columns list.
for (const auto & result_source : syntax->array_join_result_to_source)
array_join_sources.insert(result_source.second);
for (const auto & column_name_type : source_columns)
if (array_join_sources.count(column_name_type.name))
required.insert(column_name_type.name);
}
const auto * select_query = query->as<ASTSelectQuery>();
/// You need to read at least one column to find the number of rows.
if (select_query && required.empty())
{
/// We will find the column with the minimum <compressed_size, type_size, uncompressed_size>,
/// because it is the cheapest column to read.
struct ColumnSizeTuple
{
size_t compressed_size;
size_t type_size;
size_t uncompressed_size;
String name;
bool operator<(const ColumnSizeTuple & that) const
{
return std::tie(compressed_size, type_size, uncompressed_size)
< std::tie(that.compressed_size, that.type_size, that.uncompressed_size);
}
};
std::vector<ColumnSizeTuple> columns;
if (storage)
{
auto column_sizes = storage->getColumnSizes();
for (auto & source_column : source_columns)
{
auto c = column_sizes.find(source_column.name);
if (c == column_sizes.end())
continue;
size_t type_size = source_column.type->haveMaximumSizeOfValue() ? source_column.type->getMaximumSizeOfValueInMemory() : 100;
columns.emplace_back(ColumnSizeTuple{c->second.data_compressed, type_size, c->second.data_uncompressed, source_column.name});
}
}
if (columns.size())
required.insert(std::min_element(columns.begin(), columns.end())->name);
else
/// If we have no information about column sizes, choose the column whose data type has the minimum size.
required.insert(ExpressionActions::getSmallestColumn(source_columns));
}
NameSet unknown_required_source_columns = required;
for (NamesAndTypesList::iterator it = source_columns.begin(); it != source_columns.end();)
{
const String & column_name = it->name;
unknown_required_source_columns.erase(column_name);
if (!required.count(column_name))
source_columns.erase(it++);
else
++it;
}
/// If there are virtual columns among the unknown columns, remove them from the list of unknown columns
/// and add them to the columns list, so that they are also considered during further processing.
if (storage)
{
for (auto it = unknown_required_source_columns.begin(); it != unknown_required_source_columns.end();)
{
if (storage->hasColumn(*it))
{
source_columns.push_back(storage->getColumn(*it));
unknown_required_source_columns.erase(it++);
}
else
++it;
}
}
if (!unknown_required_source_columns.empty())
{
std::stringstream ss;
ss << "Missing columns:";
for (const auto & name : unknown_required_source_columns)
ss << " '" << name << "'";
ss << " while processing query: '" << query << "'";
ss << ", required columns:";
for (const auto & name : columns_context.requiredColumns())
ss << " '" << name << "'";
if (!source_column_names.empty())
{
ss << ", source columns:";
for (const auto & name : source_column_names)
ss << " '" << name << "'";
}
else
ss << ", no source columns";
if (columns_context.has_table_join)
{
ss << ", joined columns:";
for (const auto & column : analyzedJoin().available_joined_columns)
ss << " '" << column.name << "'";
}
if (!array_join_sources.empty())
{
ss << ", arrayJoin columns:";
for (const auto & name : array_join_sources)
ss << " '" << name << "'";
}
throw Exception(ss.str(), ErrorCodes::UNKNOWN_IDENTIFIER);
}
}
}

View File

@ -29,13 +29,8 @@ struct SyntaxAnalyzerResult;
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
/// ExpressionAnalyzer sources, intermediates and results. It splits data and logic, which allows testing them separately.
/// If you are not writing a test you probably don't need it. Use ExpressionAnalyzer itself.
struct ExpressionAnalyzerData
{
/// Original columns.
/// First, all available columns of the table are placed here. Then (when analyzing the query), unused columns are deleted.
NamesAndTypesList source_columns;
/// If non-empty, ignore all expressions not from this list.
NameSet required_result_columns;
@ -55,18 +50,10 @@ struct ExpressionAnalyzerData
/// All new temporary tables obtained by performing the GLOBAL IN/JOIN subqueries.
Tables external_tables;
/// Whether the predicate optimizer has rewritten subqueries
bool rewrite_subqueries = false;
/// Columns that will be added to the block by JOIN.
NamesAndTypesList columns_added_by_join; /// Subset of analyzed_join.available_joined_columns
protected:
ExpressionAnalyzerData(const NamesAndTypesList & source_columns_,
const NameSet & required_result_columns_,
ExpressionAnalyzerData(const NameSet & required_result_columns_,
const SubqueriesForSets & subqueries_for_sets_)
: source_columns(source_columns_),
required_result_columns(required_result_columns_),
: required_result_columns(required_result_columns_),
subqueries_for_sets(subqueries_for_sets_)
{}
};
@ -102,7 +89,6 @@ public:
const ASTPtr & query_,
const SyntaxAnalyzerResultPtr & syntax_analyzer_result_,
const Context & context_,
const NamesAndTypesList & additional_source_columns = {},
const NameSet & required_result_columns_ = {},
size_t subquery_depth_ = 0,
bool do_global_ = false,
@ -114,11 +100,6 @@ public:
/// Get a list of aggregation keys and descriptions of aggregate functions if the query contains GROUP BY.
void getAggregateInfo(Names & key_names, AggregateDescriptions & aggregates) const;
/** Get a set of columns that are enough to read from the table to evaluate the expression.
* Columns added from another table by JOIN are not counted.
*/
Names getRequiredSourceColumns() const { return source_columns.getNames(); }
/** These methods allow you to build a chain of transformations over a block, that receives values in the desired sections of the query.
*
* Example usage:
@ -182,25 +163,21 @@ public:
/// Create Sets from the IN section so that the index can be used on them.
void makeSetsForIndex();
bool isRewriteSubqueriesPredicate() { return rewrite_subqueries; }
bool hasGlobalSubqueries() { return has_global_subqueries; }
private:
ASTPtr query;
const Context & context;
const ExtractedSettings settings;
StoragePtr storage; /// The main table in FROM clause, if exists.
size_t subquery_depth;
bool do_global; /// Whether global subqueries must be prepared for execution when analyzing the query.
SyntaxAnalyzerResultPtr syntax;
const AnalyzedJoin & analyzedJoin() const { return syntax->analyzed_join; }
/** Remove all unnecessary columns from the list of all available columns of the table (`columns`).
* At the same time, form a set of columns added by JOIN (`columns_added_by_join`).
*/
void collectUsedColumns();
const StoragePtr & storage() const { return syntax->storage; } /// The main table in FROM clause, if exists.
const AnalyzedJoin & analyzedJoin() const { return syntax->analyzed_join; }
const NamesAndTypesList & sourceColumns() const { return syntax->required_source_columns; }
const NamesAndTypesList & columnsAddedByJoin() const { return syntax->columns_added_by_join; }
/// Find global subqueries in the GLOBAL IN/JOIN sections. Fills in external_tables.
void initGlobalSubqueriesAndExternalTables();
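
The header now derives storage, source columns and joined columns from the shared SyntaxAnalyzerResult through thin const accessors instead of keeping per-instance copies. A generic sketch of this pattern, with illustrative names rather than the real ClickHouse types:

#include <memory>
#include <string>
#include <vector>

struct AnalysisResult
{
    std::vector<std::string> required_source_columns;
    std::vector<std::string> columns_added_by_join;
};

using AnalysisResultPtr = std::shared_ptr<const AnalysisResult>;

class Analyzer
{
public:
    explicit Analyzer(AnalysisResultPtr syntax_) : syntax(std::move(syntax_)) {}

    // Thin accessors: no copies, no per-instance state to keep in sync.
    const std::vector<std::string> & sourceColumns() const { return syntax->required_source_columns; }
    const std::vector<std::string> & columnsAddedByJoin() const { return syntax->columns_added_by_join; }

private:
    AnalysisResultPtr syntax;
};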

View File

@ -295,9 +295,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
table_lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze(
query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage);
query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList());
query_analyzer = std::make_unique<ExpressionAnalyzer>(
query_ptr, syntax_analyzer_result, context, NamesAndTypesList(),
query_ptr, syntax_analyzer_result, context,
NameSet(required_result_column_names.begin(), required_result_column_names.end()),
options.subquery_depth, !options.only_analyze);
@ -320,7 +320,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (!options.only_analyze || options.modify_inplace)
{
if (query_analyzer->isRewriteSubqueriesPredicate())
if (syntax_analyzer_result->rewrite_subqueries)
{
/// remake interpreter_subquery when PredicateOptimizer rewrites subqueries and main table is subquery
if (is_subquery)
@ -339,7 +339,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
interpreter_subquery->ignoreWithTotals();
}
required_columns = query_analyzer->getRequiredSourceColumns();
required_columns = syntax_analyzer_result->requiredSourceColumns();
if (storage)
source_header = storage->getSampleBlockForColumns(required_columns);
@ -678,7 +678,16 @@ static SortingInfoPtr optimizeReadInOrder(const MergeTreeData & merge_tree, cons
size_t prefix_size = std::min(order_descr.size(), sorting_key_columns.size());
auto order_by_expr = query.orderBy();
auto syntax_result = SyntaxAnalyzer(context).analyze(order_by_expr, merge_tree.getColumns().getAllPhysical());
SyntaxAnalyzerResultPtr syntax_result;
try
{
syntax_result = SyntaxAnalyzer(context).analyze(order_by_expr, merge_tree.getColumns().getAllPhysical());
}
catch (const Exception &)
{
return {};
}
for (size_t i = 0; i < prefix_size; ++i)
{
/// Read in pk order in case of exact match with order key element
@ -792,7 +801,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
if (settings.optimize_move_to_prewhere && query.where() && !query.prewhere() && !query.final())
MergeTreeWhereOptimizer{current_info, context, merge_tree, query_analyzer->getRequiredSourceColumns(), log};
MergeTreeWhereOptimizer{current_info, context, merge_tree, syntax_analyzer_result->requiredSourceColumns(), log};
};
if (const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
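
The new try/catch around SyntaxAnalyzer::analyze() in optimizeReadInOrder() turns an analysis failure of the ORDER BY expression into "optimization not applicable" rather than a query error. A minimal sketch of that guard, with analyzeOrderBy() as a hypothetical stand-in for the analysis call:

#include <optional>
#include <stdexcept>
#include <string>

std::string analyzeOrderBy(const std::string & expr)
{
    if (expr.empty())
        throw std::runtime_error("cannot analyze ORDER BY expression");
    return "read in order of " + expr;
}

std::optional<std::string> tryOptimizeReadInOrder(const std::string & expr)
{
    try
    {
        return analyzeOrderBy(expr);
    }
    catch (const std::exception &)
    {
        return std::nullopt;    // skip the optimization, fall back to read + sort
    }
}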

View File

@ -186,8 +186,7 @@ void MutationsInterpreter::prepare(bool dry_run)
{
auto query = column.default_desc.expression->clone();
auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns);
ExpressionAnalyzer analyzer(query, syntax_result, context);
for (const String & dependency : analyzer.getRequiredSourceColumns())
for (const String & dependency : syntax_result->requiredSourceColumns())
{
if (updated_columns.count(dependency))
column_to_affected_materialized[dependency].push_back(column.name);

View File

@ -40,7 +40,7 @@ static std::vector<String> extractNamesFromLambda(const ASTFunction & node)
return names;
}
bool RequiredSourceColumnsMatcher::needChildVisit(ASTPtr & node, const ASTPtr & child)
bool RequiredSourceColumnsMatcher::needChildVisit(const ASTPtr & node, const ASTPtr & child)
{
if (child->as<ASTSelectQuery>())
return false;
@ -60,7 +60,7 @@ bool RequiredSourceColumnsMatcher::needChildVisit(ASTPtr & node, const ASTPtr &
return true;
}
void RequiredSourceColumnsMatcher::visit(ASTPtr & ast, Data & data)
void RequiredSourceColumnsMatcher::visit(const ASTPtr & ast, Data & data)
{
/// results are columns
@ -111,7 +111,7 @@ void RequiredSourceColumnsMatcher::visit(ASTPtr & ast, Data & data)
}
}
void RequiredSourceColumnsMatcher::visit(ASTSelectQuery & select, const ASTPtr &, Data & data)
void RequiredSourceColumnsMatcher::visit(const ASTSelectQuery & select, const ASTPtr &, Data & data)
{
/// special case for top-level SELECT items: they are public
for (auto & node : select.select()->children)
@ -128,7 +128,7 @@ void RequiredSourceColumnsMatcher::visit(ASTSelectQuery & select, const ASTPtr &
Visitor(data).visit(node);
/// revisit select_expression_list (with children) when all the aliases are set
Visitor(data).visit(select.refSelect());
Visitor(data).visit(select.select());
}
void RequiredSourceColumnsMatcher::visit(const ASTIdentifier & node, const ASTPtr &, Data & data)
@ -158,7 +158,7 @@ void RequiredSourceColumnsMatcher::visit(const ASTFunction & node, const ASTPtr
}
}
void RequiredSourceColumnsMatcher::visit(ASTTablesInSelectQueryElement & node, const ASTPtr &, Data & data)
void RequiredSourceColumnsMatcher::visit(const ASTTablesInSelectQueryElement & node, const ASTPtr &, Data & data)
{
ASTTableExpression * expr = nullptr;
ASTTableJoin * join = nullptr;
@ -177,7 +177,7 @@ void RequiredSourceColumnsMatcher::visit(ASTTablesInSelectQueryElement & node, c
}
/// ASTIdentifiers here are tables. Do not visit them as generic ones.
void RequiredSourceColumnsMatcher::visit(ASTTableExpression & node, const ASTPtr &, Data & data)
void RequiredSourceColumnsMatcher::visit(const ASTTableExpression & node, const ASTPtr &, Data & data)
{
if (node.database_and_table_name)
data.addTableAliasIfAny(*node.database_and_table_name);

View File

@ -21,19 +21,19 @@ struct ASTTableExpression;
class RequiredSourceColumnsMatcher
{
public:
using Visitor = InDepthNodeVisitor<RequiredSourceColumnsMatcher, false>;
using Visitor = ConstInDepthNodeVisitor<RequiredSourceColumnsMatcher, false>;
using Data = ColumnNamesContext;
static bool needChildVisit(ASTPtr & node, const ASTPtr & child);
static void visit(ASTPtr & ast, Data & data);
static bool needChildVisit(const ASTPtr & node, const ASTPtr & child);
static void visit(const ASTPtr & ast, Data & data);
private:
static void visit(const ASTIdentifier & node, const ASTPtr &, Data & data);
static void visit(const ASTFunction & node, const ASTPtr &, Data & data);
static void visit(ASTTablesInSelectQueryElement & node, const ASTPtr &, Data & data);
static void visit(ASTTableExpression & node, const ASTPtr &, Data & data);
static void visit(const ASTTablesInSelectQueryElement & node, const ASTPtr &, Data & data);
static void visit(const ASTTableExpression & node, const ASTPtr &, Data & data);
static void visit(const ASTArrayJoin & node, const ASTPtr &, Data & data);
static void visit(ASTSelectQuery & select, const ASTPtr &, Data & data);
static void visit(const ASTSelectQuery & select, const ASTPtr &, Data & data);
};
/// Extracts all the information about columns and tables from ASTSelectQuery block into ColumnNamesContext object.
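
The matcher is now const-correct end to end: it uses ConstInDepthNodeVisitor, every visit() and needChildVisit() takes nodes by const reference, and the mutating select.refSelect() call became select.select(). A generic sketch of this read-only visitor shape, with Node and the recursive driver as simplified stand-ins for IAST and the visitor template:

#include <memory>
#include <string>
#include <vector>

struct Node
{
    std::string name;
    std::vector<std::shared_ptr<Node>> children;
};

using NodePtr = std::shared_ptr<Node>;

struct CollectNamesMatcher
{
    using Data = std::vector<std::string>;

    static bool needChildVisit(const NodePtr & /*node*/, const NodePtr & /*child*/) { return true; }

    static void visit(const NodePtr & node, Data & data)
    {
        data.push_back(node->name);  // read-only access: the AST cannot be rewritten
    }
};

void visitInDepth(const NodePtr & node, CollectNamesMatcher::Data & data)
{
    CollectNamesMatcher::visit(node, data);
    for (const auto & child : node->children)
        if (CollectNamesMatcher::needChildVisit(node, child))
            visitInDepth(child, data);
}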

View File

@ -13,6 +13,7 @@
#include <Interpreters/CollectJoinOnKeysVisitor.h>
#include <Interpreters/ExternalDictionaries.h>
#include <Interpreters/OptimizeIfWithConstantConditionVisitor.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
@ -44,6 +45,7 @@ namespace ErrorCodes
extern const int INVALID_JOIN_ON_EXPRESSION;
extern const int EMPTY_LIST_OF_COLUMNS_QUERIED;
extern const int NOT_IMPLEMENTED;
extern const int UNKNOWN_IDENTIFIER;
}
NameSet removeDuplicateColumns(NamesAndTypesList & columns)
@ -558,12 +560,181 @@ void checkJoin(const ASTTablesInSelectQueryElement * join)
}
/// Calculate which columns are required to execute the expression.
/// Then, delete all other columns from the list of available columns.
/// After execution, columns will only contain the list of columns needed to read from the table.
void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns)
{
/// We calculate required_source_columns by modifying source_columns, and swap them on exit
required_source_columns = source_columns;
if (!additional_source_columns.empty())
{
source_columns.insert(source_columns.end(), additional_source_columns.begin(), additional_source_columns.end());
removeDuplicateColumns(source_columns);
}
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(query);
NameSet source_column_names;
for (const auto & column : source_columns)
source_column_names.insert(column.name);
NameSet required = columns_context.requiredColumns();
if (columns_context.has_table_join)
{
NameSet avaliable_columns;
for (const auto & name : source_columns)
avaliable_columns.insert(name.name);
/// Add columns obtained by JOIN (if needed).
columns_added_by_join.clear();
for (const auto & joined_column : analyzed_join.available_joined_columns)
{
auto & name = joined_column.name;
if (avaliable_columns.count(name))
continue;
if (required.count(name))
{
/// Optimisation: do not add columns needed only in JOIN ON section.
if (columns_context.nameInclusion(name) > analyzed_join.rightKeyInclusion(name))
columns_added_by_join.push_back(joined_column);
required.erase(name);
}
}
}
NameSet array_join_sources;
if (columns_context.has_array_join)
{
/// Insert the columns required for the ARRAY JOIN calculation into the required columns list.
for (const auto & result_source : array_join_result_to_source)
array_join_sources.insert(result_source.second);
for (const auto & column_name_type : source_columns)
if (array_join_sources.count(column_name_type.name))
required.insert(column_name_type.name);
}
const auto * select_query = query->as<ASTSelectQuery>();
/// You need to read at least one column to find the number of rows.
if (select_query && required.empty())
{
/// We will find the column with the minimum <compressed_size, type_size, uncompressed_size>,
/// because it is the cheapest column to read.
struct ColumnSizeTuple
{
size_t compressed_size;
size_t type_size;
size_t uncompressed_size;
String name;
bool operator<(const ColumnSizeTuple & that) const
{
return std::tie(compressed_size, type_size, uncompressed_size)
< std::tie(that.compressed_size, that.type_size, that.uncompressed_size);
}
};
std::vector<ColumnSizeTuple> columns;
if (storage)
{
auto column_sizes = storage->getColumnSizes();
for (auto & source_column : source_columns)
{
auto c = column_sizes.find(source_column.name);
if (c == column_sizes.end())
continue;
size_t type_size = source_column.type->haveMaximumSizeOfValue() ? source_column.type->getMaximumSizeOfValueInMemory() : 100;
columns.emplace_back(ColumnSizeTuple{c->second.data_compressed, type_size, c->second.data_uncompressed, source_column.name});
}
}
if (columns.size())
required.insert(std::min_element(columns.begin(), columns.end())->name);
else
/// If we have no information about column sizes, choose the column whose data type has the minimum size.
required.insert(ExpressionActions::getSmallestColumn(source_columns));
}
NameSet unknown_required_source_columns = required;
for (NamesAndTypesList::iterator it = source_columns.begin(); it != source_columns.end();)
{
const String & column_name = it->name;
unknown_required_source_columns.erase(column_name);
if (!required.count(column_name))
source_columns.erase(it++);
else
++it;
}
/// If there are virtual columns among the unknown columns, remove them from the list of unknown columns
/// and add them to the columns list, so that they are also considered during further processing.
if (storage)
{
for (auto it = unknown_required_source_columns.begin(); it != unknown_required_source_columns.end();)
{
if (storage->hasColumn(*it))
{
source_columns.push_back(storage->getColumn(*it));
unknown_required_source_columns.erase(it++);
}
else
++it;
}
}
if (!unknown_required_source_columns.empty())
{
std::stringstream ss;
ss << "Missing columns:";
for (const auto & name : unknown_required_source_columns)
ss << " '" << name << "'";
ss << " while processing query: '" << queryToString(query) << "'";
ss << ", required columns:";
for (const auto & name : columns_context.requiredColumns())
ss << " '" << name << "'";
if (!source_column_names.empty())
{
ss << ", source columns:";
for (const auto & name : source_column_names)
ss << " '" << name << "'";
}
else
ss << ", no source columns";
if (columns_context.has_table_join)
{
ss << ", joined columns:";
for (const auto & column : analyzed_join.available_joined_columns)
ss << " '" << column.name << "'";
}
if (!array_join_sources.empty())
{
ss << ", arrayJoin columns:";
for (const auto & name : array_join_sources)
ss << " '" << name << "'";
}
throw Exception(ss.str(), ErrorCodes::UNKNOWN_IDENTIFIER);
}
required_source_columns.swap(source_columns);
}
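
The cheapest-column heuristic above relies on std::tie: it builds tuples of references and lets tuple operator< compare them lexicographically, so compressed size dominates, with type size and uncompressed size as tie-breakers. A self-contained version of just that comparison:

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <tuple>
#include <vector>

struct ColumnSizeTuple
{
    size_t compressed_size;
    size_t type_size;
    size_t uncompressed_size;
    std::string name;

    bool operator<(const ColumnSizeTuple & that) const
    {
        return std::tie(compressed_size, type_size, uncompressed_size)
            < std::tie(that.compressed_size, that.type_size, that.uncompressed_size);
    }
};

int main()
{
    std::vector<ColumnSizeTuple> columns{
        {100, 8, 400, "a"},
        {100, 4, 800, "b"},   // same compressed size, smaller type size wins
        {200, 1, 100, "c"},
    };
    assert(std::min_element(columns.begin(), columns.end())->name == "b");
}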
SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
ASTPtr & query,
const NamesAndTypesList & source_columns_,
const Names & required_result_columns,
StoragePtr storage) const
StoragePtr storage,
const NamesAndTypesList & additional_source_columns) const
{
auto * select_query = query->as<ASTSelectQuery>();
if (!storage && select_query)
@ -669,6 +840,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
collectJoinedColumns(result.analyzed_join, *select_query, source_columns_set, result.aliases, settings.join_use_nulls);
}
result.collectUsedColumns(query, additional_source_columns);
return std::make_shared<const SyntaxAnalyzerResult>(result);
}
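
With collectUsedColumns() living in SyntaxAnalyzerResult, callers that only need the pruned column list (see the MutationsInterpreter hunk earlier and the evaluateMissingDefaults, MergeTreeData and transformQueryForExternalDatabase hunks below) no longer construct an ExpressionAnalyzer at all. A minimal stub of the new caller-side shape, with stand-in types:

#include <memory>
#include <string>
#include <vector>

struct SyntaxResultStub
{
    std::vector<std::string> required_source_columns;

    const std::vector<std::string> & requiredSourceColumns() const { return required_source_columns; }
};

using SyntaxResultStubPtr = std::shared_ptr<const SyntaxResultStub>;

std::vector<std::string> requiredColumns(const SyntaxResultStubPtr & syntax_result)
{
    // Previously: ExpressionAnalyzer(query, syntax_result, context).getRequiredSourceColumns()
    return syntax_result->requiredSourceColumns();
}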

View File

@ -13,8 +13,13 @@ NameSet removeDuplicateColumns(NamesAndTypesList & columns);
struct SyntaxAnalyzerResult
{
StoragePtr storage;
AnalyzedJoin analyzed_join;
NamesAndTypesList source_columns;
/// Set of columns that are enough to read from the table to evaluate the expression. It does not include joined columns.
NamesAndTypesList required_source_columns;
/// Columns that will be added to the block by JOIN; a subset of analyzed_join.available_joined_columns
NamesAndTypesList columns_added_by_join;
Aliases aliases;
@ -31,10 +36,11 @@ struct SyntaxAnalyzerResult
/// Note: not used further.
NameToNameMap array_join_name_to_alias;
AnalyzedJoin analyzed_join;
/// Whether the predicate optimizer has rewritten subqueries
bool rewrite_subqueries = false;
void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns);
Names requiredSourceColumns() const { return required_source_columns.getNames(); }
};
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
@ -64,7 +70,8 @@ public:
ASTPtr & query,
const NamesAndTypesList & source_columns_,
const Names & required_result_columns = {},
StoragePtr storage = {}) const;
StoragePtr storage = {},
const NamesAndTypesList & additional_source_columns = {}) const;
private:
const Context & context;

View File

@ -61,7 +61,7 @@ void evaluateMissingDefaults(Block & block,
auto syntax_result = SyntaxAnalyzer(context).analyze(default_expr_list, block.getNamesAndTypesList());
auto expression_analyzer = ExpressionAnalyzer{default_expr_list, syntax_result, context};
auto required_source_columns = expression_analyzer.getRequiredSourceColumns();
auto required_source_columns = syntax_result->requiredSourceColumns();
auto rows_was = copy_block.rows();
// Delete all unneeded columns from the DEFAULT expression.

View File

@ -19,7 +19,7 @@ KafkaBlockInputStream::KafkaBlockInputStream(
if (!storage.getSchemaName().empty())
context.setSetting("format_schema", storage.getSchemaName());
virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset"}).cloneEmptyColumns();
virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneEmptyColumns();
}
KafkaBlockInputStream::~KafkaBlockInputStream()
@ -60,9 +60,14 @@ void KafkaBlockInputStream::readPrefixImpl()
auto read_callback = [this]
{
const auto * sub_buffer = buffer->subBufferAs<ReadBufferFromKafkaConsumer>();
virtual_columns[0]->insert(sub_buffer->currentTopic()); // "topic"
virtual_columns[1]->insert(sub_buffer->currentKey()); // "key"
virtual_columns[2]->insert(sub_buffer->currentOffset()); // "offset"
virtual_columns[0]->insert(sub_buffer->currentTopic()); // "topic"
virtual_columns[1]->insert(sub_buffer->currentKey()); // "key"
virtual_columns[2]->insert(sub_buffer->currentOffset()); // "offset"
virtual_columns[3]->insert(sub_buffer->currentPartition()); // "partition"
auto timestamp = sub_buffer->currentTimestamp();
if (timestamp)
virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(timestamp->get_timestamp()).count()); // "timestamp"
};
auto child = FormatFactory::instance().getInput(
@ -79,8 +84,8 @@ Block KafkaBlockInputStream::readImpl()
if (!block)
return block;
Block virtual_block = storage.getSampleBlockForColumns({"_topic", "_key", "_offset"}).cloneWithColumns(std::move(virtual_columns));
virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset"}).cloneEmptyColumns();
Block virtual_block = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneWithColumns(std::move(virtual_columns));
virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneEmptyColumns();
for (const auto & column : virtual_block.getColumnsWithTypeAndName())
block.insert(column);

View File

@ -30,6 +30,8 @@ public:
String currentTopic() const { return current[-1].get_topic(); }
String currentKey() const { return current[-1].get_key(); }
auto currentOffset() const { return current[-1].get_offset(); }
auto currentPartition() const { return current[-1].get_partition(); }
auto currentTimestamp() const { return current[-1].get_timestamp(); }
private:
using Messages = std::vector<cppkafka::Message>;
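
The new _timestamp virtual column stores whole seconds, while the broker-side message timestamp arrives in milliseconds and may be absent. A sketch of the conversion done in the read callback above, with std::optional<std::chrono::milliseconds> standing in for the optional timestamp wrapper returned by cppkafka:

#include <chrono>
#include <cstdint>
#include <optional>

std::optional<uint32_t> toDateTimeSeconds(std::optional<std::chrono::milliseconds> ts)
{
    if (!ts)
        return std::nullopt;  // left NULL in the Nullable(DateTime) column
    return static_cast<uint32_t>(
        std::chrono::duration_cast<std::chrono::seconds>(*ts).count());
}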

View File

@ -4,6 +4,8 @@
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/InterpreterInsertQuery.h>
@ -85,7 +87,9 @@ StorageKafka::StorageKafka(
columns_,
ColumnsDescription({{"_topic", std::make_shared<DataTypeString>()},
{"_key", std::make_shared<DataTypeString>()},
{"_offset", std::make_shared<DataTypeUInt64>()}}, true))
{"_offset", std::make_shared<DataTypeUInt64>()},
{"_partition", std::make_shared<DataTypeUInt64>()},
{"_timestamp", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())}}, true))
, table_name(table_name_)
, database_name(database_name_)
, global_context(context_)

View File

@ -134,8 +134,7 @@ MergeTreeData::MergeTreeData(
throw Exception("Sampling expression must be present in the primary key", ErrorCodes::BAD_ARGUMENTS);
auto syntax = SyntaxAnalyzer(global_context).analyze(sample_by_ast, getColumns().getAllPhysical());
columns_required_for_sampling = ExpressionAnalyzer(sample_by_ast, syntax, global_context)
.getRequiredSourceColumns();
columns_required_for_sampling = syntax->requiredSourceColumns();
}
MergeTreeDataFormatVersion min_format_version(0);
if (!date_column_name.empty())
@ -295,8 +294,7 @@ void MergeTreeData::setPrimaryKeyIndicesAndColumns(
if (!added_key_column_expr_list->children.empty())
{
auto syntax = SyntaxAnalyzer(global_context).analyze(added_key_column_expr_list, all_columns);
Names used_columns = ExpressionAnalyzer(added_key_column_expr_list, syntax, global_context)
.getRequiredSourceColumns();
Names used_columns = syntax->requiredSourceColumns();
NamesAndTypesList deleted_columns;
NamesAndTypesList added_columns;
@ -743,7 +741,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
auto lock = lockParts();
data_parts_indexes.clear();
bool has_adaptive_parts = false, has_non_adaptive_parts = false;
bool has_adaptive_parts = false;
bool has_non_adaptive_parts = false;
for (const String & file_name : part_file_names)
{
MergeTreePartInfo part_info;

View File

@ -8,7 +8,6 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Storages/MergeTree/KeyCondition.h>
@ -111,8 +110,7 @@ String transformQueryForExternalDatabase(
{
auto clone_query = query.clone();
auto syntax_result = SyntaxAnalyzer(context).analyze(clone_query, available_columns);
ExpressionAnalyzer analyzer(clone_query, syntax_result, context);
const Names & used_columns = analyzer.getRequiredSourceColumns();
const Names used_columns = syntax_result->requiredSourceColumns();
auto select = std::make_shared<ASTSelectQuery>();

View File

@ -61,10 +61,10 @@ def wait_kafka_is_available(max_retries=50):
time.sleep(1)
def kafka_produce(topic, messages):
def kafka_produce(topic, messages, timestamp=None):
producer = KafkaProducer(bootstrap_servers="localhost:9092")
for message in messages:
producer.send(topic=topic, value=message)
producer.send(topic=topic, value=message, timestamp_ms=timestamp)
producer.flush()
print ("Produced {} messages for topic {}".format(len(messages), topic))
@ -389,16 +389,16 @@ def test_kafka_virtual_columns(kafka_cluster):
messages = ''
for i in range(25):
messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce('virt1', [messages])
kafka_produce('virt1', [messages], 0)
messages = ''
for i in range(25, 50):
messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce('virt1', [messages])
kafka_produce('virt1', [messages], 0)
result = ''
while True:
result += instance.query('SELECT _key, key, _topic, value, _offset FROM test.kafka', ignore_error=True)
result += instance.query('SELECT _key, key, _topic, value, _offset, _partition, _timestamp FROM test.kafka', ignore_error=True)
if kafka_check_result(result, False, 'test_kafka_virtual1.reference'):
break
@ -417,20 +417,20 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
kafka_group_name = 'virt2',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64)
CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64, partition UInt64, timestamp Nullable(DateTime))
ENGINE = MergeTree()
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT *, _key as kafka_key, _topic as topic, _offset as offset FROM test.kafka;
SELECT *, _key as kafka_key, _topic as topic, _offset as offset, _partition as partition, _timestamp as timestamp FROM test.kafka;
''')
messages = []
for i in range(50):
messages.append(json.dumps({'key': i, 'value': i}))
kafka_produce('virt2', messages)
kafka_produce('virt2', messages, 0)
while True:
result = instance.query('SELECT kafka_key, key, topic, value, offset FROM test.view')
result = instance.query('SELECT kafka_key, key, topic, value, offset, partition, timestamp FROM test.view')
if kafka_check_result(result, False, 'test_kafka_virtual2.reference'):
break

View File

@ -1,50 +1,50 @@
0 virt1 0 0
1 virt1 1 0
2 virt1 2 0
3 virt1 3 0
4 virt1 4 0
5 virt1 5 0
6 virt1 6 0
7 virt1 7 0
8 virt1 8 0
9 virt1 9 0
10 virt1 10 0
11 virt1 11 0
12 virt1 12 0
13 virt1 13 0
14 virt1 14 0
15 virt1 15 0
16 virt1 16 0
17 virt1 17 0
18 virt1 18 0
19 virt1 19 0
20 virt1 20 0
21 virt1 21 0
22 virt1 22 0
23 virt1 23 0
24 virt1 24 0
25 virt1 25 1
26 virt1 26 1
27 virt1 27 1
28 virt1 28 1
29 virt1 29 1
30 virt1 30 1
31 virt1 31 1
32 virt1 32 1
33 virt1 33 1
34 virt1 34 1
35 virt1 35 1
36 virt1 36 1
37 virt1 37 1
38 virt1 38 1
39 virt1 39 1
40 virt1 40 1
41 virt1 41 1
42 virt1 42 1
43 virt1 43 1
44 virt1 44 1
45 virt1 45 1
46 virt1 46 1
47 virt1 47 1
48 virt1 48 1
49 virt1 49 1
0 virt1 0 0 0 0000-00-00 00:00:00
1 virt1 1 0 0 0000-00-00 00:00:00
2 virt1 2 0 0 0000-00-00 00:00:00
3 virt1 3 0 0 0000-00-00 00:00:00
4 virt1 4 0 0 0000-00-00 00:00:00
5 virt1 5 0 0 0000-00-00 00:00:00
6 virt1 6 0 0 0000-00-00 00:00:00
7 virt1 7 0 0 0000-00-00 00:00:00
8 virt1 8 0 0 0000-00-00 00:00:00
9 virt1 9 0 0 0000-00-00 00:00:00
10 virt1 10 0 0 0000-00-00 00:00:00
11 virt1 11 0 0 0000-00-00 00:00:00
12 virt1 12 0 0 0000-00-00 00:00:00
13 virt1 13 0 0 0000-00-00 00:00:00
14 virt1 14 0 0 0000-00-00 00:00:00
15 virt1 15 0 0 0000-00-00 00:00:00
16 virt1 16 0 0 0000-00-00 00:00:00
17 virt1 17 0 0 0000-00-00 00:00:00
18 virt1 18 0 0 0000-00-00 00:00:00
19 virt1 19 0 0 0000-00-00 00:00:00
20 virt1 20 0 0 0000-00-00 00:00:00
21 virt1 21 0 0 0000-00-00 00:00:00
22 virt1 22 0 0 0000-00-00 00:00:00
23 virt1 23 0 0 0000-00-00 00:00:00
24 virt1 24 0 0 0000-00-00 00:00:00
25 virt1 25 1 0 0000-00-00 00:00:00
26 virt1 26 1 0 0000-00-00 00:00:00
27 virt1 27 1 0 0000-00-00 00:00:00
28 virt1 28 1 0 0000-00-00 00:00:00
29 virt1 29 1 0 0000-00-00 00:00:00
30 virt1 30 1 0 0000-00-00 00:00:00
31 virt1 31 1 0 0000-00-00 00:00:00
32 virt1 32 1 0 0000-00-00 00:00:00
33 virt1 33 1 0 0000-00-00 00:00:00
34 virt1 34 1 0 0000-00-00 00:00:00
35 virt1 35 1 0 0000-00-00 00:00:00
36 virt1 36 1 0 0000-00-00 00:00:00
37 virt1 37 1 0 0000-00-00 00:00:00
38 virt1 38 1 0 0000-00-00 00:00:00
39 virt1 39 1 0 0000-00-00 00:00:00
40 virt1 40 1 0 0000-00-00 00:00:00
41 virt1 41 1 0 0000-00-00 00:00:00
42 virt1 42 1 0 0000-00-00 00:00:00
43 virt1 43 1 0 0000-00-00 00:00:00
44 virt1 44 1 0 0000-00-00 00:00:00
45 virt1 45 1 0 0000-00-00 00:00:00
46 virt1 46 1 0 0000-00-00 00:00:00
47 virt1 47 1 0 0000-00-00 00:00:00
48 virt1 48 1 0 0000-00-00 00:00:00
49 virt1 49 1 0 0000-00-00 00:00:00

View File

@ -1,50 +1,50 @@
0 virt2 0 0
1 virt2 1 1
2 virt2 2 2
3 virt2 3 3
4 virt2 4 4
5 virt2 5 5
6 virt2 6 6
7 virt2 7 7
8 virt2 8 8
9 virt2 9 9
10 virt2 10 10
11 virt2 11 11
12 virt2 12 12
13 virt2 13 13
14 virt2 14 14
15 virt2 15 15
16 virt2 16 16
17 virt2 17 17
18 virt2 18 18
19 virt2 19 19
20 virt2 20 20
21 virt2 21 21
22 virt2 22 22
23 virt2 23 23
24 virt2 24 24
25 virt2 25 25
26 virt2 26 26
27 virt2 27 27
28 virt2 28 28
29 virt2 29 29
30 virt2 30 30
31 virt2 31 31
32 virt2 32 32
33 virt2 33 33
34 virt2 34 34
35 virt2 35 35
36 virt2 36 36
37 virt2 37 37
38 virt2 38 38
39 virt2 39 39
40 virt2 40 40
41 virt2 41 41
42 virt2 42 42
43 virt2 43 43
44 virt2 44 44
45 virt2 45 45
46 virt2 46 46
47 virt2 47 47
48 virt2 48 48
49 virt2 49 49
0 virt2 0 0 0 0000-00-00 00:00:00
1 virt2 1 1 0 0000-00-00 00:00:00
2 virt2 2 2 0 0000-00-00 00:00:00
3 virt2 3 3 0 0000-00-00 00:00:00
4 virt2 4 4 0 0000-00-00 00:00:00
5 virt2 5 5 0 0000-00-00 00:00:00
6 virt2 6 6 0 0000-00-00 00:00:00
7 virt2 7 7 0 0000-00-00 00:00:00
8 virt2 8 8 0 0000-00-00 00:00:00
9 virt2 9 9 0 0000-00-00 00:00:00
10 virt2 10 10 0 0000-00-00 00:00:00
11 virt2 11 11 0 0000-00-00 00:00:00
12 virt2 12 12 0 0000-00-00 00:00:00
13 virt2 13 13 0 0000-00-00 00:00:00
14 virt2 14 14 0 0000-00-00 00:00:00
15 virt2 15 15 0 0000-00-00 00:00:00
16 virt2 16 16 0 0000-00-00 00:00:00
17 virt2 17 17 0 0000-00-00 00:00:00
18 virt2 18 18 0 0000-00-00 00:00:00
19 virt2 19 19 0 0000-00-00 00:00:00
20 virt2 20 20 0 0000-00-00 00:00:00
21 virt2 21 21 0 0000-00-00 00:00:00
22 virt2 22 22 0 0000-00-00 00:00:00
23 virt2 23 23 0 0000-00-00 00:00:00
24 virt2 24 24 0 0000-00-00 00:00:00
25 virt2 25 25 0 0000-00-00 00:00:00
26 virt2 26 26 0 0000-00-00 00:00:00
27 virt2 27 27 0 0000-00-00 00:00:00
28 virt2 28 28 0 0000-00-00 00:00:00
29 virt2 29 29 0 0000-00-00 00:00:00
30 virt2 30 30 0 0000-00-00 00:00:00
31 virt2 31 31 0 0000-00-00 00:00:00
32 virt2 32 32 0 0000-00-00 00:00:00
33 virt2 33 33 0 0000-00-00 00:00:00
34 virt2 34 34 0 0000-00-00 00:00:00
35 virt2 35 35 0 0000-00-00 00:00:00
36 virt2 36 36 0 0000-00-00 00:00:00
37 virt2 37 37 0 0000-00-00 00:00:00
38 virt2 38 38 0 0000-00-00 00:00:00
39 virt2 39 39 0 0000-00-00 00:00:00
40 virt2 40 40 0 0000-00-00 00:00:00
41 virt2 41 41 0 0000-00-00 00:00:00
42 virt2 42 42 0 0000-00-00 00:00:00
43 virt2 43 43 0 0000-00-00 00:00:00
44 virt2 44 44 0 0000-00-00 00:00:00
45 virt2 45 45 0 0000-00-00 00:00:00
46 virt2 46 46 0 0000-00-00 00:00:00
47 virt2 47 47 0 0000-00-00 00:00:00
48 virt2 48 48 0 0000-00-00 00:00:00
49 virt2 49 49 0 0000-00-00 00:00:00

View File

@ -0,0 +1,3 @@
['0','1','2','3','4','5','6','7','8','9']
['0','1','2','3','4','5','6','7','8','9']
['2999999','2999998','2999997','2999996','2999995','2999994','2999993','2999992','2999991','2999990']

View File

@ -0,0 +1,12 @@
DROP TABLE IF EXISTS topk;
CREATE TABLE topk (val1 String, val2 UInt32) ENGINE = MergeTree ORDER BY val1;
INSERT INTO topk SELECT toString(number), number FROM numbers(3000000);
INSERT INTO topk SELECT toString(number % 10), 999999999 FROM numbers(1000000);
SELECT arraySort(topK(10)(val1)) FROM topk;
SELECT arraySort(topKWeighted(10)(val1, val2)) FROM topk;
SELECT topKWeighted(10)(toString(number), number) from numbers(3000000);
DROP TABLE topk;