mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
Merge
This commit is contained in:
parent
952f177eff
commit
f7aedd227d
@ -68,7 +68,9 @@ public:
|
||||
void appendProjectResult(ExpressionActionsChain & chain);
|
||||
|
||||
/// Если ast не запрос SELECT, просто получает все действия для вычисления выражения.
|
||||
ExpressionActionsPtr getActions();
|
||||
/// Если project_result, в выходном блоке останутся только вычисленные значения в нужном порядке, переименованные в алиасы.
|
||||
/// Иначе, из блока будут удаляться только временные столбцы.
|
||||
ExpressionActionsPtr getActions(bool project_result);
|
||||
|
||||
/// Действия, которые можно сделать над пустым блоком: добавление констант и применение функций, зависящих только от констант.
|
||||
/// Не выполняет подзапросы.
|
||||
|
@ -61,7 +61,7 @@ struct Limits
|
||||
size_t max_subquery_depth;
|
||||
size_t max_pipeline_depth;
|
||||
size_t max_ast_depth; /// Проверяются не во время парсинга,
|
||||
size_t max_ast_elements; /// а уже после парсинга запроса. TODO: циклы при разборе алиасов в Expression.
|
||||
size_t max_ast_elements; /// а уже после парсинга запроса.
|
||||
|
||||
bool readonly;
|
||||
|
||||
|
@ -3,7 +3,6 @@
|
||||
#include <sstream>
|
||||
|
||||
#include <DB/Interpreters/Context.h>
|
||||
#include <DB/Interpreters/Expression.h>
|
||||
#include <DB/Storages/IStorage.h>
|
||||
#include <DB/Core/SortDescription.h>
|
||||
#include <DB/Parsers/ASTExpressionList.h>
|
||||
|
@ -5,7 +5,7 @@
|
||||
|
||||
#include <DB/Core/SortDescription.h>
|
||||
#include <DB/Interpreters/Context.h>
|
||||
#include <DB/Interpreters/Expression.h>
|
||||
#include <DB/Interpreters/ExpressionActions.h>
|
||||
#include <DB/Storages/IStorage.h>
|
||||
|
||||
|
||||
@ -172,7 +172,7 @@ private:
|
||||
|
||||
StorageMergeTreeSettings settings;
|
||||
|
||||
SharedPtr<Expression> primary_expr;
|
||||
ExpressionActionsPtr primary_expr;
|
||||
SortDescription sort_descr;
|
||||
Block primary_key_sample;
|
||||
|
||||
@ -298,7 +298,7 @@ private:
|
||||
BlockInputStreams spreadMarkRangesAmongThreadsFinal(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size);
|
||||
|
||||
/// Создать выражение "Sign == 1".
|
||||
void createPositiveSignCondition(ExpressionPtr & out_expression, String & out_column);
|
||||
void createPositiveSignCondition(ExpressionActionsPtr & out_expression, String & out_column);
|
||||
|
||||
/// Загрузить множество кусков с данными с диска. Вызывается один раз - при создании объекта.
|
||||
void loadDataParts();
|
||||
|
@ -912,7 +912,7 @@ void ExpressionAnalyzer::getActionsBeforeAggregationImpl(ASTPtr ast, ExpressionA
|
||||
}
|
||||
|
||||
|
||||
ExpressionActionsPtr ExpressionAnalyzer::getActions()
|
||||
ExpressionActionsPtr ExpressionAnalyzer::getActions(bool project_result)
|
||||
{
|
||||
ExpressionActionsPtr actions = new ExpressionActions(columns, settings);
|
||||
NamesWithAliases result_columns;
|
||||
@ -924,18 +924,26 @@ ExpressionActionsPtr ExpressionAnalyzer::getActions()
|
||||
for (size_t i = 0; i < asts.size(); ++i)
|
||||
{
|
||||
result_columns.push_back(NameWithAlias(asts[i]->getColumnName(), asts[i]->getAlias()));
|
||||
result_names.push_back(result_columns.back().first);
|
||||
result_names.push_back(result_columns.back().second);
|
||||
getActionsImpl(asts[i], false, false, *actions);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
result_columns.push_back(NameWithAlias(ast->getColumnName(), ast->getAlias()));
|
||||
result_names.push_back(result_columns.back().first);
|
||||
result_names.push_back(result_columns.back().second);
|
||||
getActionsImpl(ast, false, false, *actions);
|
||||
}
|
||||
|
||||
actions->add(ExpressionActions::Action::project(result_columns));
|
||||
if (project_result)
|
||||
{
|
||||
actions->add(ExpressionActions::Action::project(result_columns));
|
||||
}
|
||||
else
|
||||
{
|
||||
for (NamesAndTypesList::iterator it = columns.begin(); it != columns.end(); ++it)
|
||||
result_names.push_back(it->first);
|
||||
}
|
||||
|
||||
actions->finalize(result_names);
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include <DB/Storages/MergeTree/PKCondition.h>
|
||||
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
||||
#include <DB/Interpreters/ExpressionAnalyzer.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -15,7 +16,7 @@ PKCondition::PKCondition(ASTPtr query, const Context & context_, const NamesAndT
|
||||
/** Вычисление выражений, зависящих только от констант.
|
||||
* Чтобы индекс мог использоваться, если написано, например WHERE Date = toDate(now()).
|
||||
*/
|
||||
Expression expr_for_constant_folding(query, context_, all_columns);
|
||||
ExpressionActionsPtr expr_for_constant_folding = ExpressionAnalyzer(query, context_, all_columns).getConstActions();
|
||||
Block block_with_constants;
|
||||
|
||||
/// В блоке должен быть хотя бы один столбец, чтобы у него было известно число строк.
|
||||
@ -25,7 +26,7 @@ PKCondition::PKCondition(ASTPtr query, const Context & context_, const NamesAndT
|
||||
dummy_column.column = new ColumnConstUInt8(1, 0);
|
||||
block_with_constants.insert(dummy_column);
|
||||
|
||||
expr_for_constant_folding.execute(block_with_constants, 0, true);
|
||||
expr_for_constant_folding->execute(block_with_constants);
|
||||
|
||||
/// Преобразуем секцию WHERE в обратную польскую строку.
|
||||
ASTSelectQuery & select = dynamic_cast<ASTSelectQuery &>(*query);
|
||||
|
@ -31,7 +31,6 @@
|
||||
#include <DB/DataStreams/narrowBlockInputStreams.h>
|
||||
#include <DB/DataStreams/copyData.h>
|
||||
#include <DB/DataStreams/FilterBlockInputStream.h>
|
||||
#include <DB/DataStreams/OldExpressionBlockInputStream.h>
|
||||
|
||||
#include <DB/Parsers/ASTExpressionList.h>
|
||||
#include <DB/Parsers/ASTSelectQuery.h>
|
||||
@ -40,6 +39,7 @@
|
||||
#include <DB/Parsers/ASTIdentifier.h>
|
||||
|
||||
#include <DB/Interpreters/sortBlock.h>
|
||||
#include <DB/Interpreters/ExpressionAnalyzer.h>
|
||||
|
||||
#include <DB/Storages/StorageMergeTree.h>
|
||||
#include <DB/Storages/MergeTree/PKCondition.h>
|
||||
@ -83,7 +83,7 @@ StorageMergeTree::StorageMergeTree(
|
||||
sort_descr.push_back(SortColumnDescription(name, 1));
|
||||
}
|
||||
|
||||
primary_expr = new Expression(primary_expr_ast, context, *columns);
|
||||
primary_expr = ExpressionAnalyzer(primary_expr_ast, context, *columns).getActions(false);
|
||||
primary_key_sample = primary_expr->getSampleBlock();
|
||||
|
||||
merge_threads = new boost::threadpool::pool(settings.merging_threads);
|
||||
@ -147,7 +147,7 @@ BlockInputStreams StorageMergeTree::read(
|
||||
UInt64 sampling_column_value_limit = 0;
|
||||
typedef Poco::SharedPtr<ASTFunction> ASTFunctionPtr;
|
||||
ASTFunctionPtr filter_function;
|
||||
ExpressionPtr filter_expression;
|
||||
ExpressionActionsPtr filter_expression;
|
||||
|
||||
ASTSelectQuery & select = *dynamic_cast<ASTSelectQuery*>(&*query);
|
||||
if (select.sample_size)
|
||||
@ -181,7 +181,7 @@ BlockInputStreams StorageMergeTree::read(
|
||||
}
|
||||
|
||||
UInt64 sampling_column_max = 0;
|
||||
DataTypePtr type = Expression(sampling_expression, context, *columns).getReturnTypes()[0];
|
||||
DataTypePtr type = primary_expr->getSampleBlock().getByName(sampling_expression->getColumnName()).type;
|
||||
|
||||
if (type->getName() == "UInt64")
|
||||
sampling_column_max = std::numeric_limits<UInt64>::max();
|
||||
@ -211,7 +211,7 @@ BlockInputStreams StorageMergeTree::read(
|
||||
filter_function->arguments = filter_function_args;
|
||||
filter_function->children.push_back(filter_function->arguments);
|
||||
|
||||
filter_expression = new Expression(filter_function, context, *columns);
|
||||
filter_expression = ExpressionAnalyzer(filter_function, context, *columns).getActions(false);
|
||||
|
||||
/// Добавим столбцы, нужные для sampling_expression.
|
||||
std::vector<String> add_columns = filter_expression->getRequiredColumns();
|
||||
@ -274,7 +274,7 @@ BlockInputStreams StorageMergeTree::read(
|
||||
for (size_t i = 0; i < res.size(); ++i)
|
||||
{
|
||||
BlockInputStreamPtr original_stream = res[i];
|
||||
BlockInputStreamPtr expression_stream = new OldExpressionBlockInputStream(original_stream, filter_expression);
|
||||
BlockInputStreamPtr expression_stream = new ExpressionBlockInputStream(original_stream, filter_expression);
|
||||
BlockInputStreamPtr filter_stream = new FilterBlockInputStream(expression_stream, filter_function->getColumnName());
|
||||
res[i] = filter_stream;
|
||||
}
|
||||
@ -391,7 +391,7 @@ BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreads(RangesInDataPar
|
||||
/// Распределить засечки между потоками и сделать, чтобы в ответе (почти) все данные были сколлапсированы (модификатор FINAL).
|
||||
BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreadsFinal(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size)
|
||||
{
|
||||
ExpressionPtr sign_filter_expression;
|
||||
ExpressionActionsPtr sign_filter_expression;
|
||||
String sign_filter_column;
|
||||
createPositiveSignCondition(sign_filter_expression, sign_filter_column);
|
||||
|
||||
@ -405,12 +405,12 @@ BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreadsFinal(RangesInDa
|
||||
max_block_size, column_names, *this,
|
||||
part.data_part, part.ranges, thisPtr());
|
||||
|
||||
to_collapse.push_back(new OldExpressionBlockInputStream(source_stream, primary_expr));
|
||||
to_collapse.push_back(new ExpressionBlockInputStream(source_stream, primary_expr));
|
||||
}
|
||||
|
||||
BlockInputStreams res;
|
||||
if (to_collapse.size() == 1)
|
||||
res.push_back(new FilterBlockInputStream(new OldExpressionBlockInputStream(to_collapse[0], sign_filter_expression), sign_filter_column));
|
||||
res.push_back(new FilterBlockInputStream(new ExpressionBlockInputStream(to_collapse[0], sign_filter_expression), sign_filter_column));
|
||||
else if (to_collapse.size() > 1)
|
||||
res.push_back(new CollapsingFinalBlockInputStream(to_collapse, sort_descr, sign_column));
|
||||
|
||||
@ -418,7 +418,7 @@ BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreadsFinal(RangesInDa
|
||||
}
|
||||
|
||||
|
||||
void StorageMergeTree::createPositiveSignCondition(ExpressionPtr & out_expression, String & out_column)
|
||||
void StorageMergeTree::createPositiveSignCondition(ExpressionActionsPtr & out_expression, String & out_column)
|
||||
{
|
||||
ASTFunction * function = new ASTFunction;
|
||||
ASTPtr function_ptr = function;
|
||||
@ -445,7 +445,7 @@ void StorageMergeTree::createPositiveSignCondition(ExpressionPtr & out_expressio
|
||||
one->type = new DataTypeInt8;
|
||||
one->value = Field(static_cast<Int64>(1));
|
||||
|
||||
out_expression = new Expression(function_ptr, context, *columns);
|
||||
out_expression = ExpressionAnalyzer(function_ptr, context, *columns).getActions(false);
|
||||
out_column = function->getColumnName();
|
||||
}
|
||||
|
||||
@ -839,7 +839,7 @@ void StorageMergeTree::mergeParts(std::vector<DataPartPtr> parts)
|
||||
for (size_t i = 0; i < parts.size(); ++i)
|
||||
{
|
||||
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
|
||||
src_streams.push_back(new OldExpressionBlockInputStream(new MergeTreeBlockInputStream(
|
||||
src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
|
||||
full_path + parts[i]->name + '/', DEFAULT_BLOCK_SIZE, all_column_names, *this, parts[i], ranges, StoragePtr()), primary_expr));
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user