read in order enabled

This commit is contained in:
Dmitry 2020-05-05 17:35:23 +03:00
parent d9ac0e8fef
commit e98c23a8cf
9 changed files with 77 additions and 31 deletions

View File

@ -57,6 +57,12 @@ struct SortColumnDescription
{
return !(*this == other);
}
std::string dump() const {
std::stringstream ss;
ss << column_name << ":" << column_number << ":dir " << direction;
return ss.str();
}
};
/// Description of the sorting rule for several columns.

View File

@ -1042,7 +1042,7 @@ protected:
size_t total_size_of_aggregate_states = 0; /// The total size of the row from the aggregate functions.
// add info to track alignment requirement
// If there are states whose alignmentment are v1, ..vn, align_aggregate_states will be max(v1, ... vn)
// If there are states whose alignment are v1, ..vn, align_aggregate_states will be max(v1, ... vn)
size_t align_aggregate_states = 1;
bool all_aggregates_has_trivial_destructor = false;

View File

@ -747,6 +747,10 @@ bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain
group_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(all_columns, context));
getRootActions(child, only_types, group_by_elements_actions.back());
}
// std::cerr << "group_by_elements_actions\n";
// for (const auto & elem : group_by_elements_actions) {
// std::cerr << elem->dumpActions() << "\n";
// }
}
return true;
@ -840,8 +844,11 @@ bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain
order_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(all_columns, context));
getRootActions(child, only_types, order_by_elements_actions.back());
}
// std::cerr << "order_by_elements_actions\n";
// for (const auto & elem : order_by_elements_actions) {
// std::cerr << elem->dumpActions() << "\n";
// }
}
return true;
}

View File

@ -1744,19 +1744,11 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
/// TODO better case determination
if (group_by_info && settings.optimize_aggregation_in_order)
{
// std::cerr << "\n\n";
// for (const auto & elem : group_by_info->order_key_prefix_descr)
// std::cerr << elem.column_name << " ";
// std::cerr << "\n\n";
auto & query = getSelectQuery();
SortDescription group_by_descr = getSortDescriptionFromGroupBy(query, *context);
UInt64 limit = getLimitForSorting(query, *context);
///TODO Finish sorting first
// UInt64 limit = getLimitForSorting(query, *context);
// executeOrderOptimized(pipeline, group_by_info, limit, group_by_descr);
pipeline.resize(1);
executeOrderOptimized(pipeline, group_by_info, limit, group_by_descr);
pipeline.addSimpleTransform([&](const Block & header)
{
@ -2153,7 +2145,6 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting
* and then merge them into one sorted stream.
* At this stage we merge per-thread streams into one.
*/
std::cerr << "\nHello optimized order here!\n";
executeOrderOptimized(pipeline, input_sorting_info, limit, output_order_descr);
return;
}

View File

@ -32,7 +32,6 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
column_description.column_name.clear();
}
}
res_key_columns.resize(params->params.keys_size);
res_aggregate_columns.resize(params->params.aggregates_size);
@ -63,11 +62,14 @@ static bool less(const MutableColumns & lhs, const ColumnRawPtrs & rhs, size_t i
}
return false;
}
/// TODO something broken when there are 10'000'000 rows od data need to investigate
/// TODO maybe move all things inside the Aggregator?
void AggregatingInOrderTransform::consume(Chunk chunk)
{
// std::cerr << "\nchunk " << x++ << " of size " << chunk.getNumRows() << "\n";
// sz += chunk.getNumRows();
/// Find the position of last already read key in current chunk.
size_t rows = chunk.getNumRows();
@ -92,7 +94,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
if (!res_block_size)
{
// std::cerr << "\nCreating first state with key " << key_begin << "\n";
// std::cerr << "Creating first state with key " << key_begin << "\n";
params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns);
++res_block_size;
}
@ -129,11 +131,11 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
if (key_begin != rows)
{
// std::cerr << "\nFinalizing the last state.\n";
// std::cerr << "Finalizing the last state.\n";
/// We finalize last key aggregation states if a new key found (Not found if high == rows)
params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
// std::cerr << "\nCreating state with key " << key_begin << "\n";
// std::cerr << "Creating state with key " << key_begin << "\n";
/// We create a new state for the new key and update res_key_columns
params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns);
++res_block_size;
@ -218,14 +220,16 @@ IProcessor::Status AggregatingInOrderTransform::prepare()
void AggregatingInOrderTransform::generate()
{
// std::cerr << sz << "\n";
// std::cerr << "\nFinalizing the last state in generate().\n";
params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
Block res = params->getHeader().cloneEmpty();
for (size_t i = 0; i < res_key_columns.size(); ++i)
{
res.getByPosition(i).column = std::move(res_key_columns[i]);
}
for (size_t i = 0; i < res_aggregate_columns.size(); ++i)
{
res.getByPosition(i + res_key_columns.size()).column = std::move(res_aggregate_columns[i]);

View File

@ -26,6 +26,8 @@ public:
private:
void generate();
// size_t x = 1;
// size_t sz = 0;
size_t res_block_size{};
MutableColumns res_key_columns;

View File

@ -646,6 +646,27 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
settings,
reader_settings);
}
else if (settings.optimize_aggregation_in_order && query_info.group_by_info)
{
size_t prefix_size = query_info.group_by_info->order_key_prefix_descr.size();
auto order_key_prefix_ast = data.sorting_key_expr_ast->clone();
order_key_prefix_ast->children.resize(prefix_size);
auto syntax_result = SyntaxAnalyzer(context).analyze(order_key_prefix_ast, data.getColumns().getAllPhysical());
auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActions(false);
res = spreadMarkRangesAmongStreamsWithOrder(
std::move(parts_with_ranges),
num_streams,
column_names_to_read,
max_block_size,
settings.use_uncompressed_cache,
query_info,
sorting_key_prefix_expr,
virt_column_names,
settings,
reader_settings);
}
else
{
res = spreadMarkRangesAmongStreams(
@ -827,6 +848,8 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
{
size_t sum_marks = 0;
const InputSortingInfoPtr & input_sorting_info = query_info.input_sorting_info;
const InputSortingInfoPtr & group_by_info = query_info.group_by_info;
size_t adaptive_parts = 0;
std::vector<size_t> sum_marks_in_parts(parts.size());
const auto data_settings = data.getSettings();
@ -969,10 +992,13 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
}
parts.emplace_back(part);
}
/// TODO Better code
if (group_by_info)
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, group_by_info->direction);
else
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_sorting_info->direction);
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_sorting_info->direction);
if (input_sorting_info->direction == 1)
if (group_by_info || input_sorting_info->direction == 1)
{
pipes.emplace_back(std::make_shared<MergeTreeSelectProcessor>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
@ -995,9 +1021,17 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
if (pipes.size() > 1)
{
SortDescription sort_description;
for (size_t j = 0; j < input_sorting_info->order_key_prefix_descr.size(); ++j)
sort_description.emplace_back(data.sorting_key_columns[j],
input_sorting_info->direction, 1);
/// TODO Better code
if (group_by_info)
{
for (size_t j = 0; j < group_by_info->order_key_prefix_descr.size(); ++j)
sort_description.emplace_back(data.sorting_key_columns[j], group_by_info->direction, 1);
}
else
{
for (size_t j = 0; j < input_sorting_info->order_key_prefix_descr.size(); ++j)
sort_description.emplace_back(data.sorting_key_columns[j], input_sorting_info->direction, 1);
}
for (auto & pipe : pipes)
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), sorting_key_prefix_expr));

View File

@ -55,7 +55,7 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
int read_direction = required_sort_description.at(0).direction;
size_t prefix_size = std::min(required_sort_description.size(), sorting_key_columns.size());
std::cerr << "Looking for common prefix\n";
for (size_t i = 0; i < prefix_size; ++i)
{
if (forbidden_columns.count(required_sort_description[i].column_name))
@ -72,6 +72,7 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
bool found_function = false;
for (const auto & action : elements_actions[i]->getActions())
{
std::cerr << action.toString() << "\n";
if (action.type != ExpressionAction::APPLY_FUNCTION)
continue;
@ -82,7 +83,7 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
}
else
found_function = true;
std::cerr << "Function was found\n";
if (action.argument_names.size() != 1 || action.argument_names.at(0) != sorting_key_columns[i])
{
current_direction = 0;
@ -95,7 +96,7 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
current_direction = 0;
break;
}
std::cerr << "Function has info about monotonicity\n";
auto monotonicity = func.getMonotonicityForRange(*func.getArgumentTypes().at(0), {}, {});
if (!monotonicity.is_monotonic)
{
@ -104,14 +105,15 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
}
else if (!monotonicity.is_positive)
current_direction *= -1;
std::cerr << "Function is monotonic\n";
}
if (!found_function)
current_direction = 0;
std::cerr << current_direction << " " << read_direction << "\n";
if (!current_direction || (i > 0 && current_direction != read_direction))
break;
std::cerr << "Adding function\n";
if (i == 0)
read_direction = current_direction;

View File

@ -80,7 +80,7 @@ struct SelectQueryInfo
/// We can modify it while reading from storage
mutable InputSortingInfoPtr input_sorting_info;
mutable InputSortingInfoPtr group_by_info;
InputSortingInfoPtr group_by_info;
/// Prepared sets are used for indices by storage engine.
/// Example: x IN (1, 2, 3)