dbms: GLOBAL JOINs: development [#METR-11370].

This commit is contained in:
Alexey Milovidov 2014-07-04 05:40:22 +04:00
parent 694f1205d6
commit be8c22f47f
2 changed files with 105 additions and 126 deletions

View File

@ -199,8 +199,8 @@ private:
/// Превратить перечисление значений или подзапрос в ASTSet. node - функция in или notIn.
void makeSet(ASTFunction * node, const Block & sample_block);
/// Запустить подзапрос в секции GLOBAL IN, создать временную таблицу типа Memory и запомнить эту пару в переменной external_tables.
void addExternalStorage(ASTFunction * node);
/// Запустить подзапрос в секции GLOBAL IN/JOIN, создать временную таблицу типа Memory и запомнить эту пару в переменной external_tables.
void addExternalStorage(ASTPtr & subquery_or_table_name);
void getArrayJoinedColumns();
void getArrayJoinedColumnsImpl(ASTPtr ast);
@ -211,13 +211,13 @@ private:
struct ScopeStack;
void getActionsImpl(ASTPtr ast, bool no_subqueries, bool only_consts, ScopeStack & actions_stack);
void getRootActionsImpl(ASTPtr ast, bool no_subqueries, bool only_consts, ExpressionActionsPtr & actions);
void getRootActions(ASTPtr ast, bool no_subqueries, bool only_consts, ExpressionActionsPtr & actions);
void getActionsBeforeAggregationImpl(ASTPtr ast, ExpressionActionsPtr & actions, bool no_subqueries);
void getActionsBeforeAggregation(ASTPtr ast, ExpressionActionsPtr & actions, bool no_subqueries);
/// Добавить агрегатные функции в aggregate_descriptions.
/// Установить has_aggregation = true, если есть хоть одна агрегатная функция.
void getAggregatesImpl(ASTPtr ast, ExpressionActionsPtr & actions);
void getAggregates(ASTPtr ast, ExpressionActionsPtr & actions);
/** Получить множество нужных столбцов для чтения из таблицы.
* При этом, столбцы, указанные в ignored_names, считаются ненужными. И параметр ignored_names может модифицироваться.

View File

@ -41,7 +41,7 @@ void ExpressionAnalyzer::init()
{
select_query = typeid_cast<ASTSelectQuery *>(&*ast);
createAliasesDict(ast); /// Если есть агрегатные функции, присвоит has_aggregation=true.
createAliasesDict(ast); /// Если есть агрегатные функции, присвоит has_aggregation = true.
normalizeTree();
findExternalTables(ast);
@ -58,17 +58,17 @@ void ExpressionAnalyzer::init()
if (select_query && select_query->array_join_expression_list)
{
getRootActionsImpl(select_query->array_join_expression_list, true, false, temp_actions);
getRootActions(select_query->array_join_expression_list, true, false, temp_actions);
addMultipleArrayJoinAction(temp_actions);
}
if (select_query && select_query->join)
{
getRootActionsImpl(typeid_cast<ASTJoin &>(*select_query->join).using_expr_list, true, false, temp_actions);
getRootActions(typeid_cast<ASTJoin &>(*select_query->join).using_expr_list, true, false, temp_actions);
addJoinAction(temp_actions, true);
}
getAggregatesImpl(ast, temp_actions);
getAggregates(ast, temp_actions);
if (has_aggregation)
{
@ -81,7 +81,7 @@ void ExpressionAnalyzer::init()
const ASTs & group_asts = select_query->group_expression_list->children;
for (size_t i = 0; i < group_asts.size(); ++i)
{
getRootActionsImpl(group_asts[i], true, false, temp_actions);
getRootActions(group_asts[i], true, false, temp_actions);
NameAndTypePair key;
key.first = group_asts[i]->getColumnName();
@ -315,8 +315,9 @@ void ExpressionAnalyzer::normalizeTreeImpl(ASTPtr & ast, MapOfASTs & finished_as
node->kind = ASTFunction::FUNCTION;
}
/// Для GLOBAL IN.
if (do_global && (node->name == "globalIn" || node->name == "globalNotIn"))
addExternalStorage(node);
addExternalStorage(node->arguments->children.at(1));
}
if (ASTJoin * node = typeid_cast<ASTJoin *>(&*ast))
@ -325,8 +326,9 @@ void ExpressionAnalyzer::normalizeTreeImpl(ASTPtr & ast, MapOfASTs & finished_as
if (ASTIdentifier * right = typeid_cast<ASTIdentifier *>(&*node->table))
right->kind = ASTIdentifier::Table;
/* if (do_global && node->locality == ASTJoin::Global)
addExternalStorage(node->table);*/
/// Для GLOBAL JOIN.
if (do_global && node->locality == ASTJoin::Global)
addExternalStorage(node->table);
}
current_asts.erase(initial_ast);
@ -371,113 +373,99 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(ASTPtr & node, const Block & sampl
/// Walk the tree rooted at `ast` and record in external_tables every table name
/// for which context.tryGetExternalTable() returns a storage.
void ExpressionAnalyzer::findExternalTables(ASTPtr & ast)
{
/// Bottom-up traversal via recursive calls: children are visited first. Deliberately descends into subqueries.
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
findExternalTables(*it);
/// Case 1: the node itself is an identifier of kind Table.
StoragePtr external_storage;
if (ASTIdentifier * node = typeid_cast<ASTIdentifier *>(&*ast))
if (node->kind == ASTIdentifier::Kind::Table)
if ((external_storage = context.tryGetExternalTable(node->name)))
external_tables[node->name] = external_storage;
/// Case 2: the node is one of the IN-family functions listed below; its second argument is inspected.
/// NOTE(review): "In"/"NotIn" are capitalized here, unlike "globalIn"/"globalNotIn" — confirm they match the real function names.
if (ASTFunction * node = typeid_cast<ASTFunction *>(&*ast))
{
if (node->name == "globalIn" || node->name == "globalNotIn" || node->name == "In" || node->name == "NotIn")
{
IAST & args = *node->arguments;
/// NOTE(review): unchecked index; presumably these functions always carry two arguments — confirm.
ASTPtr & arg = args.children[1];
/// If the second argument is an identifier naming an external table, remember it as well.
if (ASTIdentifier * id = typeid_cast<ASTIdentifier *>(&*arg))
if ((external_storage = context.tryGetExternalTable(id->name)))
external_tables[id->name] = external_storage;
}
}
}
void ExpressionAnalyzer::addExternalStorage(ASTFunction * node)
void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name)
{
/// Сгенерируем имя для внешней таблицы.
String external_table_name = "_data";
while (context.tryGetExternalTable(external_table_name + toString(external_table_id)))
while (context.tryGetExternalTable("_data" + toString(external_table_id)))
++external_table_id;
IAST & args = *node->arguments; /// TODO Для JOIN.
ASTPtr & arg = args.children[1];
StoragePtr external_storage;
/// Если подзапрос или имя таблицы для селекта
if (typeid_cast<const ASTSubquery *>(&*arg) || typeid_cast<const ASTIdentifier *>(&*arg))
/// Подзапрос или имя таблицы. Имя таблицы аналогично подзапросу SELECT * FROM t.
const ASTSubquery * subquery = typeid_cast<const ASTSubquery *>(&*subquery_or_table_name);
const ASTIdentifier * table = typeid_cast<const ASTIdentifier *>(&*subquery_or_table_name);
if (!subquery && !table)
throw Exception("IN/JOIN supports only SELECT subqueries.", ErrorCodes::BAD_ARGUMENTS);
/** Для подзапроса в секции IN/JOIN не действуют ограничения на максимальный размер результата.
* Так как результат этого подзапроса - ещё не результат всего запроса.
* Вместо этого работают ограничения
* max_rows_in_set, max_bytes_in_set, set_overflow_mode,
* max_rows_in_join, max_bytes_in_join, join_overflow_mode.
*/
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
subquery_settings.limits.max_result_rows = 0;
subquery_settings.limits.max_result_bytes = 0;
/// Вычисление extremes не имеет смысла и не нужно (если его делать, то в результате всего запроса могут взяться extremes подзапроса).
subquery_settings.extremes = 0;
subquery_context.setSettings(subquery_settings);
ASTPtr query;
if (table)
{
/** Для подзапроса в секции IN/JOIN не действуют ограничения на максимальный размер результата.
* Так как результат этого подзапроса - ещё не результат всего запроса.
* Вместо этого работают ограничения
* max_rows_in_set, max_bytes_in_set, set_overflow_mode,
* max_rows_in_join, max_bytes_in_join, join_overflow_mode.
*/
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
subquery_settings.limits.max_result_rows = 0;
subquery_settings.limits.max_result_bytes = 0;
/// Вычисление extremes не имеет смысла и не нужно (если его делать, то в результате всего запроса могут взяться extremes подзапроса).
subquery_settings.extremes = 0;
subquery_context.setSettings(subquery_settings);
ParserSelectQuery parser;
ASTPtr subquery;
if (const ASTIdentifier * table = typeid_cast<const ASTIdentifier *>(&*arg))
StoragePtr existing_storage;
/// Если это уже внешняя таблица, ничего заполнять не нужно. Просто запоминаем ее наличие.
if ((existing_storage = context.tryGetExternalTable(table->name)))
{
ParserSelectQuery parser;
StoragePtr existing_storage;
/// Если это уже внешняя таблица, ничего заполнять не нужно. Просто запоминаем ее наличие.
if ((existing_storage = context.tryGetExternalTable(table->name)))
{
external_tables[table->name] = existing_storage;
return;
}
String query = "SELECT * FROM " + table->name;
const char * begin = query.data();
const char * end = begin + query.size();
const char * pos = begin;
Expected expected = "";
bool parse_res = parser.parse(pos, end, subquery, expected);
if (!parse_res)
throw Exception("Error in parsing SELECT query while creating set or join for table " + table->name + ".",
ErrorCodes::LOGICAL_ERROR);
external_tables[table->name] = existing_storage;
return;
}
else
subquery = arg->children[0];
InterpreterSelectQuery interpreter(subquery, subquery_context, QueryProcessingStage::Complete, subquery_depth + 1);
String query_str = "SELECT * FROM " + table->name;
const char * begin = query_str.data();
const char * end = begin + query_str.size();
const char * pos = begin;
Expected expected = "";
Block sample = interpreter.getSampleBlock();
NamesAndTypesListPtr columns = new NamesAndTypesList(sample.getColumnsList());
String external_table_name = "_data" + toString(external_table_id++);
external_storage = StorageMemory::create(external_table_name, columns);
ASTIdentifier * ast_ident = new ASTIdentifier;
ast_ident->kind = ASTIdentifier::Table;
ast_ident->name = external_storage->getTableName();
arg = ast_ident;
external_tables[external_table_name] = external_storage;
external_data[external_table_name] = interpreter.execute();
/// Добавляем множество, при обработке которого будет заполнена внешняя таблица.
ASTSet * ast_set = new ASTSet("external_" + arg->getColumnName());
ast_set->set = new Set(settings.limits);
ast_set->set->setSource(external_data[external_table_name]);
ast_set->set->setExternalOutput(external_tables[external_table_name]);
ast_set->set->setOnlyExternal(true);
sets_with_subqueries[ast_set->getColumnName()] = ast_set->set;
bool parse_res = parser.parse(pos, end, query, expected);
if (!parse_res)
throw Exception("Error in parsing SELECT query while creating set or join for table " + table->name + ".",
ErrorCodes::LOGICAL_ERROR);
}
else
throw Exception("IN/JOIN supports only SELECT subqueries.", ErrorCodes::BAD_ARGUMENTS);
query = subquery->children.at(0);
InterpreterSelectQuery interpreter(query, subquery_context, QueryProcessingStage::Complete, subquery_depth + 1);
Block sample = interpreter.getSampleBlock();
NamesAndTypesListPtr columns = new NamesAndTypesList(sample.getColumnsList());
String external_table_name = "_data" + toString(external_table_id++);
external_storage = StorageMemory::create(external_table_name, columns);
ASTIdentifier * ast_ident = new ASTIdentifier;
ast_ident->kind = ASTIdentifier::Table;
ast_ident->name = external_storage->getTableName();
subquery_or_table_name = ast_ident;
external_tables[external_table_name] = external_storage;
external_data[external_table_name] = interpreter.execute();
/// Добавляем множество, при обработке которого будет заполнена внешняя таблица. // TODO JOIN
ASTSet * ast_set = new ASTSet("external_" + subquery_or_table_name->getColumnName());
ast_set->set = new Set(settings.limits);
ast_set->set->setSource(external_data[external_table_name]);
ast_set->set->setExternalOutput(external_tables[external_table_name]);
ast_set->set->setOnlyExternal(true);
sets_with_subqueries[ast_set->getColumnName()] = ast_set->set;
}
@ -757,7 +745,7 @@ struct ExpressionAnalyzer::ScopeStack
};
void ExpressionAnalyzer::getRootActionsImpl(ASTPtr ast, bool no_subqueries, bool only_consts, ExpressionActionsPtr & actions)
void ExpressionAnalyzer::getRootActions(ASTPtr ast, bool no_subqueries, bool only_consts, ExpressionActionsPtr & actions)
{
ScopeStack scopes(actions, settings);
getActionsImpl(ast, no_subqueries, only_consts, scopes);
@ -1103,7 +1091,7 @@ void ExpressionAnalyzer::getActionsImpl(ASTPtr ast, bool no_subqueries, bool onl
}
void ExpressionAnalyzer::getAggregatesImpl(ASTPtr ast, ExpressionActionsPtr & actions)
void ExpressionAnalyzer::getAggregates(ASTPtr ast, ExpressionActionsPtr & actions)
{
ASTFunction * node = typeid_cast<ASTFunction *>(&*ast);
if (node && node->kind == ASTFunction::AGGREGATE_FUNCTION)
@ -1122,7 +1110,7 @@ void ExpressionAnalyzer::getAggregatesImpl(ASTPtr ast, ExpressionActionsPtr & ac
for (size_t i = 0; i < arguments.size(); ++i)
{
getRootActionsImpl(arguments[i], true, false, actions);
getRootActions(arguments[i], true, false, actions);
const std::string & name = arguments[i]->getColumnName();
types[i] = actions->getSampleBlock().getByName(name).type;
aggregate.argument_names[i] = name;
@ -1158,7 +1146,7 @@ void ExpressionAnalyzer::getAggregatesImpl(ASTPtr ast, ExpressionActionsPtr & ac
{
ASTPtr child = ast->children[i];
if (!typeid_cast<ASTSubquery *>(&*child) && !typeid_cast<ASTSelectQuery *>(&*child))
getAggregatesImpl(child, actions);
getAggregates(child, actions);
}
}
}
@ -1207,7 +1195,7 @@ bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool on
initChain(chain, columns);
ExpressionActionsChain::Step & step = chain.steps.back();
getRootActionsImpl(select_query->array_join_expression_list, only_types, false, step.actions);
getRootActions(select_query->array_join_expression_list, only_types, false, step.actions);
addMultipleArrayJoinAction(step.actions);
@ -1230,7 +1218,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
ExpressionActionsChain::Step & step = chain.steps.back();
ASTJoin & ast_join = typeid_cast<ASTJoin &>(*select_query->join);
getRootActionsImpl(ast_join.using_expr_list, only_types, false, step.actions);
getRootActions(ast_join.using_expr_list, only_types, false, step.actions);
{
Names join_key_names_left(join_key_names_left_set.begin(), join_key_names_left_set.end());
@ -1285,7 +1273,7 @@ bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_t
ExpressionActionsChain::Step & step = chain.steps.back();
step.required_output.push_back(select_query->where_expression->getColumnName());
getRootActionsImpl(select_query->where_expression, only_types, false, step.actions);
getRootActions(select_query->where_expression, only_types, false, step.actions);
return true;
}
@ -1304,7 +1292,7 @@ bool ExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain, bool only
for (size_t i = 0; i < asts.size(); ++i)
{
step.required_output.push_back(asts[i]->getColumnName());
getRootActionsImpl(asts[i], only_types, false, step.actions);
getRootActions(asts[i], only_types, false, step.actions);
}
return true;
@ -1325,13 +1313,13 @@ void ExpressionAnalyzer::appendAggregateFunctionsArguments(ExpressionActionsChai
}
}
getActionsBeforeAggregationImpl(select_query->select_expression_list, step.actions, only_types);
getActionsBeforeAggregation(select_query->select_expression_list, step.actions, only_types);
if (select_query->having_expression)
getActionsBeforeAggregationImpl(select_query->having_expression, step.actions, only_types);
getActionsBeforeAggregation(select_query->having_expression, step.actions, only_types);
if (select_query->order_expression_list)
getActionsBeforeAggregationImpl(select_query->order_expression_list, step.actions, only_types);
getActionsBeforeAggregation(select_query->order_expression_list, step.actions, only_types);
}
bool ExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain, bool only_types)
@ -1345,7 +1333,7 @@ bool ExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain, bool only_
ExpressionActionsChain::Step & step = chain.steps.back();
step.required_output.push_back(select_query->having_expression->getColumnName());
getRootActionsImpl(select_query->having_expression, only_types, false, step.actions);
getRootActions(select_query->having_expression, only_types, false, step.actions);
return true;
}
@ -1357,7 +1345,7 @@ void ExpressionAnalyzer::appendSelect(ExpressionActionsChain & chain, bool only_
initChain(chain, aggregated_columns);
ExpressionActionsChain::Step & step = chain.steps.back();
getRootActionsImpl(select_query->select_expression_list, only_types, false, step.actions);
getRootActions(select_query->select_expression_list, only_types, false, step.actions);
ASTs asts = select_query->select_expression_list->children;
for (size_t i = 0; i < asts.size(); ++i)
@ -1376,7 +1364,7 @@ bool ExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, bool only
initChain(chain, aggregated_columns);
ExpressionActionsChain::Step & step = chain.steps.back();
getRootActionsImpl(select_query->order_expression_list, only_types, false, step.actions);
getRootActions(select_query->order_expression_list, only_types, false, step.actions);
ASTs asts = select_query->order_expression_list->children;
for (size_t i = 0; i < asts.size(); ++i)
@ -1436,7 +1424,7 @@ Block ExpressionAnalyzer::getSelectSampleBlock()
for (size_t i = 0; i < asts.size(); ++i)
{
result_columns.push_back(NameWithAlias(asts[i]->getColumnName(), asts[i]->getAliasOrColumnName()));
getRootActionsImpl(asts[i], true, false, temp_actions);
getRootActions(asts[i], true, false, temp_actions);
}
temp_actions->add(ExpressionAction::project(result_columns));
@ -1444,25 +1432,16 @@ Block ExpressionAnalyzer::getSelectSampleBlock()
return temp_actions->getSampleBlock();
}
void ExpressionAnalyzer::getActionsBeforeAggregationImpl(ASTPtr ast, ExpressionActionsPtr & actions, bool no_subqueries)
void ExpressionAnalyzer::getActionsBeforeAggregation(ASTPtr ast, ExpressionActionsPtr & actions, bool no_subqueries)
{
ASTFunction * node = typeid_cast<ASTFunction *>(&*ast);
if (node && node->kind == ASTFunction::AGGREGATE_FUNCTION)
{
ASTs & arguments = node->arguments->children;
for (size_t i = 0; i < arguments.size(); ++i)
{
getRootActionsImpl(arguments[i], no_subqueries, false, actions);
}
}
if (node && node->kind == ASTFunction::AGGREGATE_FUNCTION)
for (auto & argument : node->arguments->children)
getRootActions(argument, no_subqueries, false, actions);
else
{
for (size_t i = 0; i < ast->children.size(); ++i)
{
getActionsBeforeAggregationImpl(ast->children[i], actions, no_subqueries);
}
}
for (auto & child : ast->children)
getActionsBeforeAggregation(child, actions, no_subqueries);
}
@ -1489,7 +1468,7 @@ ExpressionActionsPtr ExpressionAnalyzer::getActions(bool project_result)
alias = name;
result_columns.push_back(NameWithAlias(name, alias));
result_names.push_back(alias);
getRootActionsImpl(asts[i], false, false, actions);
getRootActions(asts[i], false, false, actions);
}
if (project_result)
@ -1513,7 +1492,7 @@ ExpressionActionsPtr ExpressionAnalyzer::getConstActions()
{
ExpressionActionsPtr actions = new ExpressionActions(NamesAndTypesList(), settings);
getRootActionsImpl(ast, true, true, actions);
getRootActions(ast, true, true, actions);
return actions;
}