dbms: development [#METR-11370].

This commit is contained in:
Alexey Milovidov 2014-06-13 03:21:38 +04:00
parent 607485d82a
commit 1ba07d65ad
8 changed files with 165 additions and 84 deletions

View File

@ -135,6 +135,7 @@ private:
AggregateDescriptions aggregate_descriptions;
std::unordered_map<String, SetPtr> sets_with_subqueries;
Joins joins;
typedef std::unordered_map<String, ASTPtr> Aliases;
Aliases aliases;
@ -158,8 +159,18 @@ private:
static NamesAndTypesList::iterator findColumn(const String & name, NamesAndTypesList & cols);
NamesAndTypesList::iterator findColumn(const String & name) { return findColumn(name, columns); }
/** Из списка всех доступных столбцов таблицы (columns) удалить все ненужные.
* Заодно, сформировать множество неизвестных столбцов (unknown_required_columns).
*/
void removeUnusedColumns();
/** Найти обычные (не ARRAY) JOIN-ы, записать их в joins.
* Удалить из множества столбцов required_columns те, которые будут присоединены (JOIN).
* То есть, столбцы, являющиеся результатом подзапроса в секции JOIN,
* но не входящие в ключи JOIN-а (USING).
*/
void findJoins(NameSet & required_columns);
/** Создать словарь алиасов.
*/
void createAliasesDict(ASTPtr & ast, int ignore_levels = 0);

View File

@ -16,12 +16,29 @@ namespace DB
class InterpreterSelectQuery
{
public:
InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete, size_t subquery_depth_ = 0, BlockInputStreamPtr input = nullptr);
InterpreterSelectQuery(
ASTPtr query_ptr_,
const Context & context_,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr);
InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, const Names & required_column_names,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete, size_t subquery_depth_ = 0, BlockInputStreamPtr input = nullptr);
InterpreterSelectQuery(
ASTPtr query_ptr_,
const Context & context_,
const Names & required_column_names, bool ignore_unknown_required_columns_,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr);
InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, const Names & required_column_names, const NamesAndTypesList & table_column_names, QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete, size_t subquery_depth_ = 0, BlockInputStreamPtr input = nullptr);
InterpreterSelectQuery(
ASTPtr query_ptr_,
const Context & context_,
const Names & required_column_names, bool ignore_unknown_required_columns_,
const NamesAndTypesList & table_column_names,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr);
/// Выполнить запрос, получить поток блоков для чтения
BlockInputStreamPtr execute();
@ -36,13 +53,13 @@ public:
private:
typedef Poco::SharedPtr<ExpressionAnalyzer> ExpressionAnalyzerPtr;
void init(BlockInputStreamPtr input, const NamesAndTypesList & table_column_names = NamesAndTypesList());
/** Из какой таблицы читать. JOIN-ы не поддерживаются.
*/
void getDatabaseAndTableNames(String & database_name, String & table_name);
/** Выбрать из списка столбцов какой-нибудь, лучше - минимального размера.
*/
String getAnyColumn();

View File

@ -42,9 +42,24 @@ public:
ASTJoin() {}
ASTJoin(StringRange range_) : IAST(range_) {}
/** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "Join"; };
String getID() const
{
String res;
{
WriteBufferFromString wb(res);
if (locality == Global)
writeString("Global", wb);
writeString(strictness == Any ? "Any" : "All", wb);
writeString(kind == Inner ? "Inner" : "Left", wb);
writeString("Join", wb);
}
return res;
};
ASTPtr clone() const
{

View File

@ -33,7 +33,7 @@ public:
ASTSelectQuery() {}
ASTSelectQuery(StringRange range_) : ASTQueryWithOutput(range_) {}
/** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "SelectQuery"; };
@ -52,7 +52,7 @@ public:
}
/// Переписывает select_expression_list, чтобы вернуть только необходимые столбцы в правильном порядке.
void rewriteSelectExpressionList(const Names & column_names)
void rewriteSelectExpressionList(const Names & column_names, bool ignore_unknown = false)
{
ASTPtr result = new ASTExpressionList;
ASTs asts = select_expression_list->children;
@ -80,11 +80,10 @@ public:
done = 1;
}
}
if (!done)
if (!done && !ignore_unknown)
throw Exception("Error while rewriting expressioin list for select query."
" Could not find alias: " + column_names[i],
DB::ErrorCodes::UNKNOWN_IDENTIFIER);
}
for (auto & child : children)

View File

@ -173,7 +173,7 @@ BlockInputStreamPtr getVirtualColumnsBlocks(ASTPtr query, const Block & input, c
new_select.children.push_back(new_select.where_expression);
/// Возвращаем результат выполнения нового запроса на блоке виртуальных функций
InterpreterSelectQuery interpreter(new_query, context, columns, input.getColumnsList(),
InterpreterSelectQuery interpreter(new_query, context, columns, false, input.getColumnsList(),
QueryProcessingStage::Complete, 0, new OneBlockInputStream(input));
return interpreter.execute();

View File

@ -1376,42 +1376,7 @@ Sets ExpressionAnalyzer::getSetsWithSubqueries()
Joins ExpressionAnalyzer::getJoinsWithSubqueries()
{
std::cerr << __PRETTY_FUNCTION__ << std::endl;
if (select_query->join)
{
std::cerr << "Found JOIN" << std::endl;
auto & node = dynamic_cast<ASTJoin &>(*select_query->join);
auto & join_keys_expr_list = dynamic_cast<ASTExpressionList &>(*node.using_expr_list);
size_t num_join_keys = join_keys_expr_list.children.size();
Names join_key_names(num_join_keys);
for (size_t i = 0; i < num_join_keys; ++i)
join_key_names[i] = join_keys_expr_list.children[i]->getColumnName();
JoinPtr join = new Join(join_key_names, settings.limits);
/** Для подзапроса в секции JOIN не действуют ограничения на максимальный размер результата.
* Так как результат этого поздапроса - ещё не результат всего запроса.
* Вместо этого работают ограничения max_rows_in_set, max_bytes_in_set, set_overflow_mode.
* TODO: отдельные ограничения для JOIN.
*/
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
subquery_settings.limits.max_result_rows = 0;
subquery_settings.limits.max_result_bytes = 0;
/// Вычисление extremes не имеет смысла и не нужно (если его делать, то в результате всего запроса могут взяться extremes подзапроса).
subquery_settings.extremes = 0;
subquery_context.setSettings(subquery_settings);
InterpreterSelectQuery interpreter(node.subquery->children[0], subquery_context, QueryProcessingStage::Complete, subquery_depth + 1);
join->setSource(interpreter.execute());
return Joins(1, join);
}
return Joins();
return joins;
}
@ -1548,6 +1513,7 @@ void ExpressionAnalyzer::removeUnusedColumns()
}
getRequiredColumnsImpl(ast, required, ignored);
findJoins(required);
/// Вставляем в список требуемых столбцов столбцы, нужные для вычисления ARRAY JOIN.
NameSet array_join_sources;
@ -1591,6 +1557,65 @@ void ExpressionAnalyzer::removeUnusedColumns()
}
}
void ExpressionAnalyzer::findJoins(NameSet & required_columns)
{
if (!select_query || !select_query->join)
return;
std::cerr << "Found JOIN" << std::endl;
auto & node = dynamic_cast<ASTJoin &>(*select_query->join);
auto & join_keys_expr_list = dynamic_cast<ASTExpressionList &>(*node.using_expr_list);
size_t num_join_keys = join_keys_expr_list.children.size();
Names join_key_names(num_join_keys);
NameSet join_key_names_set;
for (size_t i = 0; i < num_join_keys; ++i)
{
String name = join_keys_expr_list.children[i]->getColumnName();
join_key_names[i] = name;
if (!join_key_names_set.insert(name).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
}
JoinPtr join = new Join(join_key_names, settings.limits);
/** Для подзапроса в секции JOIN не действуют ограничения на максимальный размер результата.
* Так как результат этого поздапроса - ещё не результат всего запроса.
* Вместо этого работают ограничения max_rows_in_set, max_bytes_in_set, set_overflow_mode.
* TODO: отдельные ограничения для JOIN.
*/
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
subquery_settings.limits.max_result_rows = 0;
subquery_settings.limits.max_result_bytes = 0;
/// Вычисление extremes не имеет смысла и не нужно (если его делать, то в результате всего запроса могут взяться extremes подзапроса).
subquery_settings.extremes = 0;
subquery_context.setSettings(subquery_settings);
InterpreterSelectQuery interpreter(
node.subquery->children[0], subquery_context,
/// Будем использовать в подзапросе только столбцы, которые есть в required_columns.
Names(required_columns.begin(), required_columns.end()), true,
QueryProcessingStage::Complete, subquery_depth + 1);
Block right_table_sample = interpreter.getSampleBlock();
join->setSource(interpreter.execute());
joins.push_back(join);
/// Удаляем из required_columns столбцы, которые есть в подзапросе, но нет в USING-е.
for (NameSet::iterator it = required_columns.begin(); it != required_columns.end();)
{
if (right_table_sample.has(*it) && !join_key_names_set.count(*it))
required_columns.erase(it++);
else
++it;
}
}
Names ExpressionAnalyzer::getRequiredColumns()
{
if (!unknown_required_columns.empty())
@ -1605,6 +1630,14 @@ Names ExpressionAnalyzer::getRequiredColumns()
void ExpressionAnalyzer::getRequiredColumnsImpl(ASTPtr ast, NameSet & required_columns, NameSet & ignored_names)
{
/** Найдём все идентификаторы в запросе.
* Будем искать их рекурсивно, обходя в глубину AST.
* При этом:
* - для лямбда функций не будем брать формальные параметры;
* - не опускаемся в подзапросы (там свои идентификаторы);
* - некоторое исключение для секции ARRAY JOIN (в ней идентификаторы немного другие).
*/
if (ASTIdentifier * node = dynamic_cast<ASTIdentifier *>(&*ast))
{
if (node->kind == ASTIdentifier::Column
@ -1629,7 +1662,7 @@ void ExpressionAnalyzer::getRequiredColumnsImpl(ASTPtr ast, NameSet & required_c
if (!lambda_args_tuple || lambda_args_tuple->name != "tuple")
throw Exception("First argument of lambda must be a tuple", ErrorCodes::TYPE_MISMATCH);
/// Не нужно добавлять параметры лямбда-выражения в required_columns.
/// Не нужно добавлять формальные параметры лямбда-выражения в required_columns.
Names added_ignored;
for (size_t i = 0 ; i < lambda_args_tuple->arguments->children.size(); ++i)
{
@ -1655,12 +1688,12 @@ void ExpressionAnalyzer::getRequiredColumnsImpl(ASTPtr ast, NameSet & required_c
ASTSelectQuery * select = dynamic_cast<ASTSelectQuery *>(&*ast);
for (size_t i = 0; i < ast->children.size(); ++i)
/// Рекурсивный обход выражения.
for (auto & child : ast->children)
{
ASTPtr child = ast->children[i];
/// Не пойдем в секцию ARRAY JOIN, потому что там нужно смотреть на имена не-ARRAY-JOIN-енных столбцов.
/// Туда removeUnusedColumns отправит нас отдельно.
/** Не пойдем в секцию ARRAY JOIN, потому что там нужно смотреть на имена не-ARRAY-JOIN-енных столбцов.
* Туда removeUnusedColumns отправит нас отдельно.
*/
if (!dynamic_cast<ASTSubquery *>(&*child) && !dynamic_cast<ASTSelectQuery *>(&*child) &&
!(select && child == select->array_join_expression_list))
getRequiredColumnsImpl(child, required_columns, ignored_names);

View File

@ -96,8 +96,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
init(input_);
}
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, const Names & required_column_names_,
QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_)
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_,
const Names & required_column_names_, bool ignore_unknown_required_columns_,
QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_)
: query_ptr(query_ptr_), query(dynamic_cast<ASTSelectQuery &>(*query_ptr)),
context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_),
log(&Logger::get("InterpreterSelectQuery"))
@ -106,13 +107,14 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
* Но если используется DISTINCT, то все столбцы считаются нужными, так как иначе DISTINCT работал бы по-другому.
*/
if (!query.distinct)
query.rewriteSelectExpressionList(required_column_names_);
query.rewriteSelectExpressionList(required_column_names_, ignore_unknown_required_columns_);
init(input_);
}
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, const Names & required_column_names_,
const NamesAndTypesList & table_column_names, QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_)
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_,
const Names & required_column_names_, bool ignore_unknown_required_columns_,
const NamesAndTypesList & table_column_names, QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_)
: query_ptr(query_ptr_), query(dynamic_cast<ASTSelectQuery &>(*query_ptr)),
context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_),
log(&Logger::get("InterpreterSelectQuery"))
@ -121,7 +123,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
* Но если используется DISTINCT, то все столбцы считаются нужными, так как иначе DISTINCT работал бы по-другому.
*/
if (!query.distinct)
query.rewriteSelectExpressionList(required_column_names_);
query.rewriteSelectExpressionList(required_column_names_, ignore_unknown_required_columns_);
init(input_, table_column_names);
}
@ -223,7 +225,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
/// Столбцы из списка SELECT, до переименования в алиасы.
Names selected_columns;
/// Нужно ли выполнять первую часть конвейера - выполняемую на удаленных серверах при распределенной обработке.
bool first_stage = from_stage < QueryProcessingStage::WithMergeableState
&& to_stage >= QueryProcessingStage::WithMergeableState;
@ -282,9 +284,9 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
/// Перед выполнением HAVING уберем из блока лишние столбцы (в основном, ключи агрегации).
if (has_having)
before_having->prependProjectInput();
/// Теперь составим потоки блоков, выполняющие нужные действия.
/// Нужно ли агрегировать в отдельную строку строки, не прошедшие max_rows_to_group_by.
bool aggregate_overflow_row =
need_aggregate &&
@ -346,7 +348,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
if (has_order_by)
executeOrder(streams);
executeProjection(streams, final_projection);
/// На этой стадии можно считать минимумы и максимумы, если надо.
@ -360,9 +362,9 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
*/
if (query.limit_length && streams.size() > 1 && !query.distinct)
executePreLimit(streams);
executeUnion(streams);
/// Если было более одного источника - то нужно выполнить DISTINCT ещё раз после их слияния.
if (need_second_distinct_pass)
executeDistinct(streams, false, Names());
@ -383,7 +385,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());
/// Ограничения действуют только на конечный результат.
if (to_stage == QueryProcessingStage::Complete)
{
@ -439,7 +441,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
subquery_context.setSettings(subquery_settings);
interpreter_subquery = new InterpreterSelectQuery(
query.table, subquery_context, required_columns, QueryProcessingStage::Complete, subquery_depth + 1);
query.table, subquery_context, required_columns, false, QueryProcessingStage::Complete, subquery_depth + 1);
}
/// если в настройках установлен default_sample != 1, то все запросы выполняем с сэмплингом
@ -450,10 +452,10 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
if (query.sample_size && (!storage || !storage->supportsSampling()))
throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
if (query.final && (!storage || !storage->supportsFinal()))
throw Exception(storage ? "Storage " + storage->getName() + " doesn't support FINAL" : "Illegal FINAL", ErrorCodes::ILLEGAL_FINAL);
if (query.prewhere_expression && (!storage || !storage->supportsPrewhere()))
throw Exception(storage ? "Storage " + storage->getName() + " doesn't support PREWHERE" : "Illegal PREWHERE", ErrorCodes::ILLEGAL_PREWHERE);
@ -471,7 +473,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
Settings settings_for_storage = settings;
if (storage && storage->isRemote())
settings.max_threads = settings.max_distributed_connections;
/// Ограничение на количество столбцов для чтения.
if (settings.limits.max_columns_to_read && required_columns.size() > settings.limits.max_columns_to_read)
throw Exception("Limit for number of columns to read exceeded. "
@ -496,7 +498,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
}
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
query_analyzer->makeSetsForIndex();
/// Инициализируем изначальные потоки данных, на которые накладываются преобразования запроса. Таблица или подзапрос?
if (!interpreter_subquery)
@ -537,7 +539,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
limits.timeout_before_checking_execution_speed = settings.limits.timeout_before_checking_execution_speed;
QuotaForIntervals & quota = context.getQuota();
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&**it))
@ -580,7 +582,7 @@ void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, Exp
query_analyzer->getAggregateInfo(key_names, aggregates);
bool separate_totals = to_stage > QueryProcessingStage::WithMergeableState;
/// Если источников несколько, то выполняем параллельную агрегацию
if (streams.size() > 1)
{
@ -707,7 +709,7 @@ void InterpreterSelectQuery::executeOrder(BlockInputStreams & streams)
limits.max_bytes_to_read = settings.limits.max_bytes_to_sort;
limits.read_overflow_mode = settings.limits.sort_overflow_mode;
sorting_stream->setLimits(limits);
stream = maybeAsynchronous(sorting_stream, is_async);
}

View File

@ -16,6 +16,7 @@ StoragePtr StorageView::create(const String & table_name_, const String & databa
return (new StorageView(table_name_, database_name_, context_, query_, columns_))->thisPtr();
}
StorageView::StorageView(const String & table_name_, const String & database_name_,
Context & context_, ASTPtr & query_, NamesAndTypesListPtr columns_):
table_name(table_name_), database_name(database_name_), context(context_), columns(columns_)
@ -46,9 +47,12 @@ StorageView::StorageView(const String & table_name_, const String & database_nam
" Could not retrieve table name from select query.",
DB::ErrorCodes::LOGICAL_ERROR);
context.getGlobalContext().addDependency(DatabaseAndTableName(select_database_name, select_table_name), DatabaseAndTableName(database_name, table_name));
context.getGlobalContext().addDependency(
DatabaseAndTableName(select_database_name, select_table_name),
DatabaseAndTableName(database_name, table_name));
}
BlockInputStreams StorageView::read(
const Names & column_names,
ASTPtr query,
@ -57,16 +61,16 @@ BlockInputStreams StorageView::read(
size_t max_block_size,
unsigned threads)
{
ASTPtr view_query = getInnerQuery();
InterpreterSelectQuery result (view_query, context, column_names);
BlockInputStreams answer;
answer.push_back(result.execute());
return answer;
return BlockInputStreams(1,
InterpreterSelectQuery(getInnerQuery(), context, column_names, false).execute());
}
void StorageView::drop() {
context.getGlobalContext().removeDependency(DatabaseAndTableName(select_database_name, select_table_name), DatabaseAndTableName(database_name, table_name));
void StorageView::drop()
{
context.getGlobalContext().removeDependency(
DatabaseAndTableName(select_database_name, select_table_name),
DatabaseAndTableName(database_name, table_name));
}