dbms: better optimization of ARRAY JOIN; tests for it. [#METR-11017]

This commit is contained in:
Michael Kolupaev 2014-04-30 23:19:29 +04:00
parent fc8aa282c0
commit 4cee45e004
8 changed files with 162 additions and 53 deletions

View File

@ -26,9 +26,6 @@ class ExpressionActions
public:
struct Action
{
private:
Action() {}
public:
enum Type
{
@ -168,6 +165,14 @@ public:
/// Добавляет в начало удаление всех лишних столбцов.
void prependProjectInput();
/// Добавить в начало указанное действие типа ARRAY JOIN. Поменять соответствующие входные типы на массивы.
/// Если в списке ARRAY JOIN есть неизвестные столбцы, взять их типы из sample_block, а сразу после ARRAY JOIN удалить.
void prependArrayJoin(const Action & action, const Block & sample_block);
/// Если последнее действие - ARRAY JOIN, и оно не влияет на столбцы из required_columns, выбросить и вернуть его.
/// Поменять соответствующие выходные типы на массивы.
bool popUnusedArrayJoin(const Names & required_columns, Action & out_action);
/// - Добавляет действия для удаления всех столбцов, кроме указанных.
/// - Убирает неиспользуемые входные столбцы.
@ -246,39 +251,9 @@ struct ExpressionActionsChain
Settings settings;
Steps steps;
void addStep()
{
if (steps.empty())
throw Exception("Cannot add action to empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR);
ColumnsWithNameAndType columns = steps.back().actions->getSampleBlock().getColumns();
steps.push_back(Step(new ExpressionActions(columns, settings)));
}
void addStep();
void finalize()
{
for (int i = static_cast<int>(steps.size()) - 1; i >= 0; --i)
{
steps[i].actions->finalize(steps[i].required_output);
if (i > 0)
{
Names & previous_output = steps[i - 1].required_output;
const NamesAndTypesList & columns = steps[i].actions->getRequiredColumnsWithTypes();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
previous_output.push_back(it->first);
std::sort(previous_output.begin(), previous_output.end());
previous_output.erase(std::unique(previous_output.begin(), previous_output.end()), previous_output.end());
/// Если на выходе предыдущего шага образуются ненужные столбцы, добавим в начало этого шага их выбрасывание.
/// За исключением случая, когда мы выбросим все столбцы и потеряем количество строк в блоке.
if (!steps[i].actions->getRequiredColumnsWithTypes().empty()
&& previous_output.size() > steps[i].actions->getRequiredColumnsWithTypes().size())
steps[i].actions->prependProjectInput();
}
}
}
void finalize();
void clear()
{
@ -300,6 +275,8 @@ struct ExpressionActionsChain
return steps.back();
}
std::string dumpChain();
};
}

View File

@ -413,6 +413,51 @@ void ExpressionActions::prependProjectInput()
actions.insert(actions.begin(), Action::project(getRequiredColumns()));
}
void ExpressionActions::prependArrayJoin(const Action & action, const Block & sample_block)
{
if (action.type != Action::ARRAY_JOIN)
throw Exception("ARRAY_JOIN action expected", ErrorCodes::LOGICAL_ERROR);
NameSet array_join_set(action.array_joined_columns.begin(), action.array_joined_columns.end());
for (auto & it : input_columns)
{
if (array_join_set.count(it.first))
{
array_join_set.erase(it.first);
it.second = new DataTypeArray(it.second);
}
}
for (const std::string & name : array_join_set)
{
input_columns.push_back(NameAndTypePair(name, sample_block.getByName(name).type));
actions.insert(actions.begin(), Action::removeColumn(name));
}
actions.insert(actions.begin(), action);
optimizeArrayJoin();
}
bool ExpressionActions::popUnusedArrayJoin(const Names & required_columns, Action & out_action)
{
if (actions.empty() || actions.back().type != Action::ARRAY_JOIN)
return false;
NameSet required_set(required_columns.begin(), required_columns.end());
for (const std::string & name : actions.back().array_joined_columns)
{
if (required_set.count(name))
return false;
}
for (const std::string & name : actions.back().array_joined_columns)
{
DataTypePtr & type = sample_block.getByName(name).type;
type = new DataTypeArray(type);
}
out_action = actions.back();
actions.pop_back();
return true;
}
void ExpressionActions::execute(Block & block) const
{
for (size_t i = 0; i < actions.size(); ++i)
@ -454,10 +499,6 @@ void ExpressionActions::finalize(const Names & output_columns)
final_columns.insert(name);
}
/// Не будем оставлять блок пустым, чтобы не потерять количество строк в нем.
if (final_columns.empty())
final_columns.insert(getSmallestColumn(input_columns));
/// Какие столбцы нужны, чтобы выполнить действия от текущего до последнего.
NameSet needed_columns = final_columns;
/// Какие столбцы никто не будет трогать от текущего действия до последнего.
@ -486,14 +527,24 @@ void ExpressionActions::finalize(const Names & output_columns)
/// Не будем ARRAY JOIN-ить столбцы, которые дальше не используются.
/// Обычно такие столбцы не используются и до ARRAY JOIN, и поэтому выбрасываются дальше в этой функции.
/// Не будем убирать все столбцы, чтобы не потерять количество строк.
NameSet::iterator it = action.array_joined_columns.begin();
while (it != action.array_joined_columns.end() && action.array_joined_columns.size() > 1)
for (auto it = action.array_joined_columns.begin(); it != action.array_joined_columns.end();)
{
NameSet::iterator jt = it;
++it;
if (!needed_columns.count(*jt))
bool need = needed_columns.count(*it);
if (!need && action.array_joined_columns.size() > 1)
{
action.array_joined_columns.erase(jt);
action.array_joined_columns.erase(it++);
}
else
{
needed_columns.insert(*it);
unmodified_columns.erase(*it);
/// Если никакие результаты ARRAY JOIN не используются, принудительно оставим на выходе произвольный столбец,
/// чтобы не потерять количество строк.
if (!need)
final_columns.insert(*it);
++it;
}
}
}
@ -531,6 +582,10 @@ void ExpressionActions::finalize(const Names & output_columns)
/// Не будем выбрасывать все входные столбцы, чтобы не потерять количество строк в блоке.
if (needed_columns.empty() && !input_columns.empty())
needed_columns.insert(getSmallestColumn(input_columns));
/// Не будем оставлять блок пустым, чтобы не потерять количество строк в нем.
if (final_columns.empty())
final_columns.insert(getSmallestColumn(input_columns));
for (NamesAndTypesList::iterator it = input_columns.begin(); it != input_columns.end();)
{
@ -699,4 +754,65 @@ void ExpressionActions::optimizeArrayJoin()
}
}
void ExpressionActionsChain::addStep()
{
if (steps.empty())
throw Exception("Cannot add action to empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR);
ColumnsWithNameAndType columns = steps.back().actions->getSampleBlock().getColumns();
steps.push_back(Step(new ExpressionActions(columns, settings)));
}
void ExpressionActionsChain::finalize()
{
/// Финализируем все шаги. Справа налево, чтобы определять ненужные входные столбцы.
for (int i = static_cast<int>(steps.size()) - 1; i >= 0; --i)
{
Names required_output = steps[i].required_output;
if (i + 1 < static_cast<int>(steps.size()))
{
for (const auto & it : steps[i + 1].actions->getRequiredColumnsWithTypes())
required_output.push_back(it.first);
}
steps[i].actions->finalize(required_output);
}
/// Когда возможно, перенесем ARRAY JOIN из более ранних шагов в более поздние.
for (size_t i = 1; i < steps.size(); ++i)
{
ExpressionActions::Action action;
if (steps[i - 1].actions->popUnusedArrayJoin(steps[i - 1].required_output, action))
steps[i].actions->prependArrayJoin(action, steps[i - 1].actions->getSampleBlock());
}
/// Добавим выбрасывание ненужных столбцов в начало каждого шага.
for (size_t i = 1; i < steps.size(); ++i)
{
size_t columns_from_previous = steps[i - 1].actions->getSampleBlock().columns();
/// Если на выходе предыдущего шага образуются ненужные столбцы, добавим в начало этого шага их выбрасывание.
/// За исключением случая, когда мы выбросим все столбцы и потеряем количество строк в блоке.
if (!steps[i].actions->getRequiredColumnsWithTypes().empty()
&& columns_from_previous > steps[i].actions->getRequiredColumnsWithTypes().size())
steps[i].actions->prependProjectInput();
}
}
std::string ExpressionActionsChain::dumpChain()
{
std::stringstream ss;
for (size_t i = 0; i < steps.size(); ++i)
{
ss << "step " << i << "\n";
ss << "required output:\n";
for (const std::string & name : steps[i].required_output)
ss << name << "\n";
ss << "\n" << steps[i].actions->dumpActions() << "\n";
}
return ss.str();
}
}

View File

@ -1283,9 +1283,6 @@ bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool on
addMultipleArrayJoinAction(*step.actions);
for (NameToNameMap::iterator it = array_join_result_to_source.begin(); it != array_join_result_to_source.end(); ++it)
step.required_output.push_back(it->first);
return true;
}

View File

@ -240,8 +240,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
need_aggregate = query_analyzer->hasAggregation();
if (query_analyzer->appendArrayJoin(chain, !first_stage))
array_join = chain.getLastActions();
query_analyzer->appendArrayJoin(chain, !first_stage);
if (query_analyzer->appendWhere(chain, !first_stage))
{
@ -340,7 +339,6 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
executeHaving(streams, before_having);
executeExpression(streams, before_order_and_select);
executeDistinct(streams, true, selected_columns);
need_second_distinct_pass = streams.size() > 1;

View File

@ -1,4 +1,5 @@
CREATE DATABASE IF NOT EXISTS test;
DROP TABLE IF EXISTS test.big_array;
CREATE TABLE IF NOT EXISTS test.big_array (x Array(UInt8)) ENGINE=TinyLog;
CREATE TABLE test.big_array (x Array(UInt8)) ENGINE=TinyLog;
INSERT INTO test.big_array SELECT groupArray(number % 255) AS x FROM (SELECT * FROM system.numbers LIMIT 1000000);
SELECT sum(y) AS s FROM remote('127.0.0.{1,2}', test, big_array) ARRAY JOIN x AS y;

View File

@ -1 +1,2 @@
SELECT sum(s) FROM (SELECT y AS s FROM remote('127.0.0.{1,2}', test, big_array) ARRAY JOIN x AS y)
SELECT sum(s) FROM (SELECT y AS s FROM remote('127.0.0.{1,2}', test, big_array) ARRAY JOIN x AS y);
DROP TABLE test.big_array;

View File

@ -0,0 +1,6 @@
1000000
1000000
1000000 499999500000
1000000
1000000 499999500000
1000000

View File

@ -0,0 +1,13 @@
CREATE DATABASE IF NOT EXISTS test;
DROP TABLE IF EXISTS test.big_array;
CREATE TABLE test.big_array (x Array(UInt8)) ENGINE=TinyLog;
INSERT INTO test.big_array SELECT groupArray(number % 255) AS x FROM (SELECT * FROM system.numbers LIMIT 1000000);
SELECT count() FROM test.big_array ARRAY JOIN x;
SELECT count() FROM test.big_array ARRAY JOIN x AS y;
SELECT countIf(has(x, toUInt64(10))), sum(y) FROM temp.ar ARRAY JOIN x AS y;
SELECT countIf(has(x, toUInt64(10))) FROM temp.ar ARRAY JOIN x AS y;
SELECT countIf(has(x, toUInt64(10))), sum(y) FROM temp.ar ARRAY JOIN x AS y WHERE 1;
SELECT countIf(has(x, toUInt64(10))) FROM temp.ar ARRAY JOIN x AS y WHERE 1;
DROP TABLE test.big_array;