2019-10-01 16:50:08 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
|
2018-02-13 19:34:15 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeRangeReader.h>
|
2019-10-10 16:30:30 +00:00
|
|
|
#include <Storages/MergeTree/IMergeTreeDataPart.h>
|
|
|
|
#include <Storages/MergeTree/IMergeTreeReader.h>
|
2017-04-06 17:21:45 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
|
2021-12-09 10:39:28 +00:00
|
|
|
#include <Storages/MergeTree/RequestResponse.h>
|
2017-12-15 20:48:46 +00:00
|
|
|
#include <Columns/FilterDescription.h>
|
2023-02-07 17:50:31 +00:00
|
|
|
#include <Common/ElapsedTimeProfileEventIncrement.h>
|
2017-07-13 20:58:19 +00:00
|
|
|
#include <Common/typeid_cast.h>
|
2018-04-06 13:58:06 +00:00
|
|
|
#include <DataTypes/DataTypeNothing.h>
|
2020-06-30 16:41:43 +00:00
|
|
|
#include <DataTypes/DataTypeNullable.h>
|
2020-11-20 17:23:53 +00:00
|
|
|
#include <DataTypes/DataTypeUUID.h>
|
2021-02-10 14:12:49 +00:00
|
|
|
#include <DataTypes/DataTypeArray.h>
|
|
|
|
#include <Processors/Transforms/AggregatingTransform.h>
|
2023-02-10 14:08:15 +00:00
|
|
|
|
|
|
|
#include <Interpreters/ActionsDAG.h>
|
|
|
|
|
|
|
|
/// For CAST to bool
|
|
|
|
#include <Functions/CastOverloadResolver.h>
|
|
|
|
#include <Planner/PlannerActionsVisitor.h>
|
|
|
|
|
2021-12-09 10:39:28 +00:00
|
|
|
#include <city.h>
|
|
|
|
|
2023-02-07 17:50:31 +00:00
|
|
|
namespace ProfileEvents
|
|
|
|
{
|
|
|
|
extern const Event WaitPrefetchTaskMicroseconds;
|
|
|
|
};
|
|
|
|
|
2017-04-06 17:21:45 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
2021-06-18 14:28:52 +00:00
|
|
|
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
|
2017-08-01 13:04:48 +00:00
|
|
|
extern const int LOGICAL_ERROR;
|
Fix query cancellation in case of allow_experimental_parallel_reading_from_replicas
CI found one hanged query [1], where the problem was that becase of
allow_experimental_parallel_reading_from_replicas the Cancel packet was
read by receivePartitionMergeTreeReadTaskResponseAssumeLocked() and so
the executor was not cancelled, while the code in
MergeTreeBaseSelectProcessor was not ready for this:
<details>
{
"is_initial_query": 0,
"elapsed": 1727.714379573,
"is_cancelled": 0,
"read_rows": "196577",
"read_bytes": "1179462",
"written_rows": "0",
"written_bytes": "0",
"query": "SELECT `CounterID`, `EventDate` FROM `test`.`hits` ORDER BY `CounterID` DESC, `EventDate` ASC LIMIT 50",
...
}
In logs:
2021.12.31 12:11:55.384735 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> executeQuery: (from [::ffff:127.0.0.1]:58094, initial_query_id: e2966ca5-e836-44ef-8f8e-d1c1b32a>
2021.12.31 12:11:55.454379 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Trace> ContextAccess (default): Access granted: SELECT(EventDate, CounterID) ON test.hits
2021.12.31 12:11:55.457583 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Trace> datasets.hits_v1: Parallel reading from replicas enabled true
2021.12.31 12:11:55.459739 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Trace> InterpreterSelectQuery: FetchColumns -> WithMergeableStateAfterAggregationAndLimit
2021.12.31 12:11:55.471048 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> datasets.hits_v1 (SelectExecutor): Key condition: unknown
2021.12.31 12:11:55.476514 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> datasets.hits_v1 (SelectExecutor): MinMax index condition: unknown
2021.12.31 12:11:55.488302 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> datasets.hits_v1 (SelectExecutor): Selected 2/2 parts by partition key, 2 parts by primary key, >
2021.12.31 12:11:55.494020 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> MergeTreeReverseSelectProcessor: Reading 1 ranges in reverse order from part 201403_20_20_0, app>
2021.12.31 12:11:55.497644 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> MergeTreeReverseSelectProcessor: Reading 138 ranges in reverse order from part 201403_19_19_2, a>
2021.12.31 12:11:55.536372 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Compact part, average mark size is 83886080
2021.12.31 12:11:55.558783 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.563960 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.577512 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.585660 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.613694 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.730597 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.743554 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> MergingSortedTransform: Merge sorted 3 blocks, 65567 rows in 0.243999671 sec., 268717.5754429603>
2021.12.31 12:11:55.744196 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.890923 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.891222 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
</details>
[1]: https://s3.amazonaws.com/clickhouse-test-reports/33341/0685fd99855bacd0bce02507c00a3bd7709eea61/stress_test__address__actions_.html
2022-01-07 16:33:03 +00:00
|
|
|
extern const int QUERY_WAS_CANCELLED;
|
2017-04-06 17:21:45 +00:00
|
|
|
}
|
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
static void injectNonConstVirtualColumns(
|
|
|
|
size_t rows,
|
|
|
|
Block & block,
|
|
|
|
const Names & virtual_columns);
|
|
|
|
|
|
|
|
static void injectPartConstVirtualColumns(
|
|
|
|
size_t rows,
|
|
|
|
Block & block,
|
|
|
|
MergeTreeReadTask * task,
|
|
|
|
const DataTypePtr & partition_value_type,
|
|
|
|
const Names & virtual_columns);
|
|
|
|
|
2017-04-06 17:21:45 +00:00
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
IMergeTreeSelectAlgorithm::IMergeTreeSelectAlgorithm(
|
2019-09-13 15:41:09 +00:00
|
|
|
Block header,
|
2019-08-03 11:02:40 +00:00
|
|
|
const MergeTreeData & storage_,
|
2021-07-09 03:15:41 +00:00
|
|
|
const StorageSnapshotPtr & storage_snapshot_,
|
2021-02-13 22:07:13 +00:00
|
|
|
const PrewhereInfoPtr & prewhere_info_,
|
2023-02-10 15:50:47 +00:00
|
|
|
const ExpressionActionsSettings & actions_settings_,
|
2019-08-03 11:02:40 +00:00
|
|
|
UInt64 max_block_size_rows_,
|
|
|
|
UInt64 preferred_block_size_bytes_,
|
|
|
|
UInt64 preferred_max_column_in_block_size_bytes_,
|
2019-12-18 15:54:45 +00:00
|
|
|
const MergeTreeReaderSettings & reader_settings_,
|
2019-08-03 11:02:40 +00:00
|
|
|
bool use_uncompressed_cache_,
|
2023-02-03 13:34:18 +00:00
|
|
|
const Names & virt_column_names_)
|
2022-11-18 20:09:20 +00:00
|
|
|
: storage(storage_)
|
2021-07-09 03:15:41 +00:00
|
|
|
, storage_snapshot(storage_snapshot_)
|
2021-02-13 22:07:13 +00:00
|
|
|
, prewhere_info(prewhere_info_)
|
2023-02-10 15:50:47 +00:00
|
|
|
, actions_settings(actions_settings_)
|
2023-02-09 14:37:06 +00:00
|
|
|
, prewhere_actions(getPrewhereActions(prewhere_info, actions_settings, reader_settings_.enable_multiple_prewhere_read_steps))
|
2020-06-16 14:25:08 +00:00
|
|
|
, max_block_size_rows(max_block_size_rows_)
|
|
|
|
, preferred_block_size_bytes(preferred_block_size_bytes_)
|
|
|
|
, preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_)
|
|
|
|
, reader_settings(reader_settings_)
|
|
|
|
, use_uncompressed_cache(use_uncompressed_cache_)
|
|
|
|
, virt_column_names(virt_column_names_)
|
2021-04-27 08:15:59 +00:00
|
|
|
, partition_value_type(storage.getPartitionValueType())
|
2023-02-07 17:50:31 +00:00
|
|
|
, owned_uncompressed_cache(use_uncompressed_cache ? storage.getContext()->getUncompressedCache() : nullptr)
|
|
|
|
, owned_mark_cache(storage.getContext()->getMarkCache())
|
2017-04-06 17:21:45 +00:00
|
|
|
{
|
2022-11-18 20:09:20 +00:00
|
|
|
header_without_const_virtual_columns = applyPrewhereActions(std::move(header), prewhere_info);
|
|
|
|
size_t non_const_columns_offset = header_without_const_virtual_columns.columns();
|
|
|
|
injectNonConstVirtualColumns(0, header_without_const_virtual_columns, virt_column_names);
|
2019-10-02 11:57:17 +00:00
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
for (size_t col_num = non_const_columns_offset; col_num < header_without_const_virtual_columns.columns(); ++col_num)
|
|
|
|
non_const_virtual_column_names.emplace_back(header_without_const_virtual_columns.getByPosition(col_num).name);
|
|
|
|
|
|
|
|
result_header = header_without_const_virtual_columns;
|
|
|
|
injectPartConstVirtualColumns(0, result_header, nullptr, partition_value_type, virt_column_names);
|
2022-11-16 17:48:08 +00:00
|
|
|
|
2022-12-28 14:51:16 +00:00
|
|
|
LOG_TEST(log, "PREWHERE actions: {}", (prewhere_actions ? prewhere_actions->dump() : std::string("<nullptr>")));
|
2022-09-05 16:55:00 +00:00
|
|
|
}
|
|
|
|
|
2023-02-10 21:15:27 +00:00
|
|
|
/// Adds a CAST node with the regular name ("CAST(...)") or with the provided name.
|
|
|
|
/// This is different from ActionsDAG::addCast() because it set the name equal to the original name effectively hiding the value before cast,
|
|
|
|
/// but it might be required for further steps with its original uncasted type.
|
|
|
|
static const ActionsDAG::Node & addCast(ActionsDAGPtr dag, const ActionsDAG::Node & node_to_cast, const String & type_name, const String & new_name = {})
|
|
|
|
{
|
|
|
|
Field cast_type_constant_value(type_name);
|
|
|
|
|
|
|
|
ColumnWithTypeAndName column;
|
|
|
|
column.name = calculateConstantActionNodeName(cast_type_constant_value);
|
|
|
|
column.column = DataTypeString().createColumnConst(0, cast_type_constant_value);
|
|
|
|
column.type = std::make_shared<DataTypeString>();
|
|
|
|
|
|
|
|
const auto * cast_type_constant_node = &dag->addColumn(std::move(column));
|
|
|
|
ActionsDAG::NodeRawConstPtrs children = {&node_to_cast, cast_type_constant_node};
|
|
|
|
FunctionOverloadResolverPtr func_builder_cast = CastInternalOverloadResolver<CastType::nonAccurate>::createImpl();
|
|
|
|
|
|
|
|
return dag->addFunction(func_builder_cast, std::move(children), new_name);
|
|
|
|
};
|
2022-03-15 06:34:25 +00:00
|
|
|
|
2023-02-09 14:37:06 +00:00
|
|
|
std::unique_ptr<PrewhereExprInfo> IMergeTreeSelectAlgorithm::getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, bool enable_multiple_prewhere_read_steps)
|
2022-09-05 16:55:00 +00:00
|
|
|
{
|
|
|
|
std::unique_ptr<PrewhereExprInfo> prewhere_actions;
|
2021-06-25 14:49:28 +00:00
|
|
|
if (prewhere_info)
|
|
|
|
{
|
2021-06-29 11:53:34 +00:00
|
|
|
prewhere_actions = std::make_unique<PrewhereExprInfo>();
|
2021-06-25 14:49:28 +00:00
|
|
|
|
|
|
|
if (prewhere_info->row_level_filter)
|
2022-06-07 07:03:11 +00:00
|
|
|
{
|
|
|
|
PrewhereExprStep row_level_filter_step
|
|
|
|
{
|
|
|
|
.actions = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter, actions_settings),
|
|
|
|
.column_name = prewhere_info->row_level_column_name,
|
|
|
|
.remove_column = true,
|
2022-06-13 09:59:00 +00:00
|
|
|
.need_filter = true
|
2022-06-07 07:03:11 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
prewhere_actions->steps.emplace_back(std::move(row_level_filter_step));
|
|
|
|
}
|
|
|
|
|
2023-02-10 14:08:15 +00:00
|
|
|
//std::cerr << "ORIGINAL PREWHERE:\n" << prewhere_info->prewhere_actions->dumpDAG() << std::endl;
|
|
|
|
|
2023-02-10 21:15:27 +00:00
|
|
|
struct Step
|
|
|
|
{
|
|
|
|
ActionsDAGPtr actions;
|
|
|
|
String column_name;
|
|
|
|
};
|
|
|
|
std::vector<Step> steps;
|
|
|
|
|
2023-02-09 14:37:06 +00:00
|
|
|
if (enable_multiple_prewhere_read_steps)
|
|
|
|
{
|
2023-02-10 14:08:15 +00:00
|
|
|
/// Find all conjunctions in prewhere expression.
|
2023-02-09 14:37:06 +00:00
|
|
|
auto conjunctions = getConjunctionNodes(
|
|
|
|
prewhere_info->prewhere_actions->tryFindInOutputs(prewhere_info->prewhere_column_name),
|
|
|
|
{});
|
2023-02-07 19:45:40 +00:00
|
|
|
|
2023-02-10 14:08:15 +00:00
|
|
|
/// Save the list of inputs to the original prewhere expression.
|
2023-02-09 14:37:06 +00:00
|
|
|
auto inputs = prewhere_info->prewhere_actions->getInputs();
|
|
|
|
ColumnsWithTypeAndName all_inputs;
|
|
|
|
for (const auto & input : inputs)
|
|
|
|
all_inputs.emplace_back(input->column, input->result_type, input->result_name);
|
2023-02-07 19:45:40 +00:00
|
|
|
|
2023-02-09 14:37:06 +00:00
|
|
|
ActionsDAG::NodeRawConstPtrs all_conjunctions = std::move(conjunctions.allowed);
|
|
|
|
all_conjunctions.insert(all_conjunctions.end(), conjunctions.rejected.begin(), conjunctions.rejected.end());
|
2023-02-07 19:45:40 +00:00
|
|
|
|
2023-02-10 14:08:15 +00:00
|
|
|
/// Make separate DAG for each step
|
2023-02-09 14:37:06 +00:00
|
|
|
for (const auto & conjunction : all_conjunctions)
|
2023-02-08 16:07:23 +00:00
|
|
|
{
|
2023-02-09 14:37:06 +00:00
|
|
|
auto result_name = conjunction->result_name;
|
|
|
|
auto step_dag = ActionsDAG::cloneActionsForConjunction({conjunction}, all_inputs);
|
|
|
|
const auto & result_node = step_dag->findInOutputs(result_name);
|
2023-02-10 14:08:15 +00:00
|
|
|
/// Cast result to UInt8 if needed
|
2023-02-09 14:37:06 +00:00
|
|
|
if (result_node.result_type->getTypeId() != TypeIndex::UInt8)
|
|
|
|
{
|
2023-02-10 21:15:27 +00:00
|
|
|
const auto & cast_node = addCast(step_dag, result_node, "UInt8");
|
2023-02-10 14:08:15 +00:00
|
|
|
|
2023-02-09 14:37:06 +00:00
|
|
|
step_dag->addOrReplaceInOutputs(cast_node);
|
|
|
|
result_name = cast_node.result_name;
|
|
|
|
}
|
|
|
|
step_dag->removeUnusedActions(Names{result_name}, true, true);
|
|
|
|
steps.emplace_back(Step{step_dag, result_name});
|
2023-02-08 16:07:23 +00:00
|
|
|
}
|
2023-02-10 21:15:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (steps.size() > 1)
|
|
|
|
{
|
|
|
|
/// Save the list of outputs from the original prewhere expression.
|
|
|
|
auto original_outputs = prewhere_info->prewhere_actions->getOutputs();
|
|
|
|
std::unordered_map<String, DataTypePtr> outputs_required_by_next_steps;
|
|
|
|
for (const auto & output : original_outputs)
|
|
|
|
outputs_required_by_next_steps[output->result_name] = output->result_type;
|
2023-02-07 19:45:40 +00:00
|
|
|
|
2023-02-09 14:37:06 +00:00
|
|
|
/// "Rename" the last step result to the combined prewhere column name, because in fact it will be AND of all step results
|
2023-02-10 14:08:15 +00:00
|
|
|
if (steps.back().column_name != prewhere_info->prewhere_column_name &&
|
|
|
|
outputs_required_by_next_steps.contains(prewhere_info->prewhere_column_name))
|
|
|
|
{
|
2023-02-10 21:15:27 +00:00
|
|
|
const auto & prewhere_result_node = addCast(
|
2023-02-10 14:08:15 +00:00
|
|
|
steps.back().actions,
|
|
|
|
steps.back().actions->findInOutputs(steps.back().column_name),
|
|
|
|
outputs_required_by_next_steps[prewhere_info->prewhere_column_name]->getName(),
|
|
|
|
prewhere_info->prewhere_column_name);
|
|
|
|
|
|
|
|
steps.back().actions->addOrReplaceInOutputs(prewhere_result_node);
|
|
|
|
}
|
2023-02-07 23:07:19 +00:00
|
|
|
|
2023-02-09 14:37:06 +00:00
|
|
|
const size_t steps_before_prewhere = prewhere_actions->steps.size();
|
|
|
|
prewhere_actions->steps.resize(steps_before_prewhere + steps.size());
|
2023-02-07 19:45:40 +00:00
|
|
|
|
2023-02-09 14:37:06 +00:00
|
|
|
/// Check the steps in the reverse order so that we can maintain the list of outputs used by the next steps
|
|
|
|
/// and preserve them in the current step.
|
|
|
|
for (ssize_t i = steps.size() - 1; i >= 0; --i)
|
|
|
|
{
|
|
|
|
const auto & step = steps[i];
|
|
|
|
|
|
|
|
/// Return the condition column
|
|
|
|
Names step_outputs{step.column_name};
|
|
|
|
const bool remove_column = !outputs_required_by_next_steps.contains(step.column_name);
|
|
|
|
/// Preserve outputs computed at this step that are used by the next steps
|
|
|
|
for (const auto & output : outputs_required_by_next_steps)
|
2023-02-10 14:08:15 +00:00
|
|
|
if (step.actions->tryRestoreColumn(output.first))
|
|
|
|
step_outputs.emplace_back(output.first);
|
2023-02-09 14:37:06 +00:00
|
|
|
step.actions->removeUnusedActions(step_outputs, true, true);
|
|
|
|
|
|
|
|
/// Add current step columns as outputs that should be preserved from previous steps
|
|
|
|
for (const auto & input :step.actions->getInputs())
|
2023-02-10 14:08:15 +00:00
|
|
|
outputs_required_by_next_steps[input->result_name] = input->result_type;
|
2023-02-09 14:37:06 +00:00
|
|
|
|
|
|
|
//std::cerr << conjunction->result_name << "\n";
|
2023-02-10 14:08:15 +00:00
|
|
|
//std::cerr << "STEP " << i << ":\n" << step.actions->dumpDAG() << "\n";
|
2023-02-09 14:37:06 +00:00
|
|
|
|
|
|
|
PrewhereExprStep prewhere_step
|
|
|
|
{
|
|
|
|
.actions = std::make_shared<ExpressionActions>(step.actions, actions_settings),
|
|
|
|
.column_name = step.column_name,
|
|
|
|
.remove_column = remove_column,
|
|
|
|
.need_filter = false
|
|
|
|
};
|
|
|
|
prewhere_actions->steps[steps_before_prewhere + i] = std::move(prewhere_step);
|
|
|
|
}
|
2023-02-07 19:45:40 +00:00
|
|
|
|
2023-02-09 14:37:06 +00:00
|
|
|
prewhere_actions->steps.back().remove_column = prewhere_info->remove_prewhere_column;
|
|
|
|
prewhere_actions->steps.back().need_filter = prewhere_info->need_filter;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2023-02-07 19:45:40 +00:00
|
|
|
PrewhereExprStep prewhere_step
|
|
|
|
{
|
2023-02-09 14:37:06 +00:00
|
|
|
.actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings),
|
|
|
|
.column_name = prewhere_info->prewhere_column_name,
|
|
|
|
.remove_column = prewhere_info->remove_prewhere_column,
|
|
|
|
.need_filter = prewhere_info->need_filter
|
2023-02-07 19:45:40 +00:00
|
|
|
};
|
|
|
|
|
2023-02-09 14:37:06 +00:00
|
|
|
prewhere_actions->steps.emplace_back(std::move(prewhere_step));
|
|
|
|
}
|
2021-06-25 14:49:28 +00:00
|
|
|
}
|
2022-09-05 16:55:00 +00:00
|
|
|
|
|
|
|
return prewhere_actions;
|
2017-04-06 17:21:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
bool IMergeTreeSelectAlgorithm::getNewTask()
|
2021-12-09 10:39:28 +00:00
|
|
|
{
|
2023-02-03 13:34:18 +00:00
|
|
|
if (getNewTaskImpl())
|
2021-12-09 10:39:28 +00:00
|
|
|
{
|
2023-02-03 13:34:18 +00:00
|
|
|
finalizeNewTask();
|
2021-12-09 10:39:28 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
ChunkAndProgress IMergeTreeSelectAlgorithm::read()
|
2017-04-06 17:21:45 +00:00
|
|
|
{
|
2022-11-16 14:10:56 +00:00
|
|
|
size_t num_read_rows = 0;
|
|
|
|
size_t num_read_bytes = 0;
|
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
while (!is_cancelled)
|
2017-04-06 17:21:45 +00:00
|
|
|
{
|
Fix query cancellation in case of allow_experimental_parallel_reading_from_replicas
CI found one hanged query [1], where the problem was that becase of
allow_experimental_parallel_reading_from_replicas the Cancel packet was
read by receivePartitionMergeTreeReadTaskResponseAssumeLocked() and so
the executor was not cancelled, while the code in
MergeTreeBaseSelectProcessor was not ready for this:
<details>
{
"is_initial_query": 0,
"elapsed": 1727.714379573,
"is_cancelled": 0,
"read_rows": "196577",
"read_bytes": "1179462",
"written_rows": "0",
"written_bytes": "0",
"query": "SELECT `CounterID`, `EventDate` FROM `test`.`hits` ORDER BY `CounterID` DESC, `EventDate` ASC LIMIT 50",
...
}
In logs:
2021.12.31 12:11:55.384735 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> executeQuery: (from [::ffff:127.0.0.1]:58094, initial_query_id: e2966ca5-e836-44ef-8f8e-d1c1b32a>
2021.12.31 12:11:55.454379 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Trace> ContextAccess (default): Access granted: SELECT(EventDate, CounterID) ON test.hits
2021.12.31 12:11:55.457583 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Trace> datasets.hits_v1: Parallel reading from replicas enabled true
2021.12.31 12:11:55.459739 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Trace> InterpreterSelectQuery: FetchColumns -> WithMergeableStateAfterAggregationAndLimit
2021.12.31 12:11:55.471048 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> datasets.hits_v1 (SelectExecutor): Key condition: unknown
2021.12.31 12:11:55.476514 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> datasets.hits_v1 (SelectExecutor): MinMax index condition: unknown
2021.12.31 12:11:55.488302 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> datasets.hits_v1 (SelectExecutor): Selected 2/2 parts by partition key, 2 parts by primary key, >
2021.12.31 12:11:55.494020 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> MergeTreeReverseSelectProcessor: Reading 1 ranges in reverse order from part 201403_20_20_0, app>
2021.12.31 12:11:55.497644 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> MergeTreeReverseSelectProcessor: Reading 138 ranges in reverse order from part 201403_19_19_2, a>
2021.12.31 12:11:55.536372 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Compact part, average mark size is 83886080
2021.12.31 12:11:55.558783 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.563960 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.577512 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.585660 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.613694 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.730597 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.743554 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> MergingSortedTransform: Merge sorted 3 blocks, 65567 rows in 0.243999671 sec., 268717.5754429603>
2021.12.31 12:11:55.744196 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.890923 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.891222 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
</details>
[1]: https://s3.amazonaws.com/clickhouse-test-reports/33341/0685fd99855bacd0bce02507c00a3bd7709eea61/stress_test__address__actions_.html
2022-01-07 16:33:03 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
if ((!task || task->isFinished()) && !getNewTask())
|
2022-11-16 14:10:56 +00:00
|
|
|
break;
|
Fix query cancellation in case of allow_experimental_parallel_reading_from_replicas
CI found one hanged query [1], where the problem was that becase of
allow_experimental_parallel_reading_from_replicas the Cancel packet was
read by receivePartitionMergeTreeReadTaskResponseAssumeLocked() and so
the executor was not cancelled, while the code in
MergeTreeBaseSelectProcessor was not ready for this:
<details>
{
"is_initial_query": 0,
"elapsed": 1727.714379573,
"is_cancelled": 0,
"read_rows": "196577",
"read_bytes": "1179462",
"written_rows": "0",
"written_bytes": "0",
"query": "SELECT `CounterID`, `EventDate` FROM `test`.`hits` ORDER BY `CounterID` DESC, `EventDate` ASC LIMIT 50",
...
}
In logs:
2021.12.31 12:11:55.384735 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> executeQuery: (from [::ffff:127.0.0.1]:58094, initial_query_id: e2966ca5-e836-44ef-8f8e-d1c1b32a>
2021.12.31 12:11:55.454379 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Trace> ContextAccess (default): Access granted: SELECT(EventDate, CounterID) ON test.hits
2021.12.31 12:11:55.457583 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Trace> datasets.hits_v1: Parallel reading from replicas enabled true
2021.12.31 12:11:55.459739 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Trace> InterpreterSelectQuery: FetchColumns -> WithMergeableStateAfterAggregationAndLimit
2021.12.31 12:11:55.471048 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> datasets.hits_v1 (SelectExecutor): Key condition: unknown
2021.12.31 12:11:55.476514 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> datasets.hits_v1 (SelectExecutor): MinMax index condition: unknown
2021.12.31 12:11:55.488302 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> datasets.hits_v1 (SelectExecutor): Selected 2/2 parts by partition key, 2 parts by primary key, >
2021.12.31 12:11:55.494020 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> MergeTreeReverseSelectProcessor: Reading 1 ranges in reverse order from part 201403_20_20_0, app>
2021.12.31 12:11:55.497644 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> MergeTreeReverseSelectProcessor: Reading 138 ranges in reverse order from part 201403_19_19_2, a>
2021.12.31 12:11:55.536372 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Compact part, average mark size is 83886080
2021.12.31 12:11:55.558783 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.563960 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.577512 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.585660 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.613694 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.730597 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.743554 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> MergingSortedTransform: Merge sorted 3 blocks, 65567 rows in 0.243999671 sec., 268717.5754429603>
2021.12.31 12:11:55.744196 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.890923 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.891222 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
</details>
[1]: https://s3.amazonaws.com/clickhouse-test-reports/33341/0685fd99855bacd0bce02507c00a3bd7709eea61/stress_test__address__actions_.html
2022-01-07 16:33:03 +00:00
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
|
|
|
{
|
|
|
|
/// See MergeTreeBaseSelectProcessor::getTaskFromBuffer()
|
|
|
|
if (e.code() == ErrorCodes::QUERY_WAS_CANCELLED)
|
2022-11-16 14:10:56 +00:00
|
|
|
break;
|
Fix query cancellation in case of allow_experimental_parallel_reading_from_replicas
CI found one hanged query [1], where the problem was that becase of
allow_experimental_parallel_reading_from_replicas the Cancel packet was
read by receivePartitionMergeTreeReadTaskResponseAssumeLocked() and so
the executor was not cancelled, while the code in
MergeTreeBaseSelectProcessor was not ready for this:
<details>
{
"is_initial_query": 0,
"elapsed": 1727.714379573,
"is_cancelled": 0,
"read_rows": "196577",
"read_bytes": "1179462",
"written_rows": "0",
"written_bytes": "0",
"query": "SELECT `CounterID`, `EventDate` FROM `test`.`hits` ORDER BY `CounterID` DESC, `EventDate` ASC LIMIT 50",
...
}
In logs:
2021.12.31 12:11:55.384735 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> executeQuery: (from [::ffff:127.0.0.1]:58094, initial_query_id: e2966ca5-e836-44ef-8f8e-d1c1b32a>
2021.12.31 12:11:55.454379 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Trace> ContextAccess (default): Access granted: SELECT(EventDate, CounterID) ON test.hits
2021.12.31 12:11:55.457583 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Trace> datasets.hits_v1: Parallel reading from replicas enabled true
2021.12.31 12:11:55.459739 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Trace> InterpreterSelectQuery: FetchColumns -> WithMergeableStateAfterAggregationAndLimit
2021.12.31 12:11:55.471048 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> datasets.hits_v1 (SelectExecutor): Key condition: unknown
2021.12.31 12:11:55.476514 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> datasets.hits_v1 (SelectExecutor): MinMax index condition: unknown
2021.12.31 12:11:55.488302 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> datasets.hits_v1 (SelectExecutor): Selected 2/2 parts by partition key, 2 parts by primary key, >
2021.12.31 12:11:55.494020 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> MergeTreeReverseSelectProcessor: Reading 1 ranges in reverse order from part 201403_20_20_0, app>
2021.12.31 12:11:55.497644 [ 101532 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> MergeTreeReverseSelectProcessor: Reading 138 ranges in reverse order from part 201403_19_19_2, a>
2021.12.31 12:11:55.536372 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Compact part, average mark size is 83886080
2021.12.31 12:11:55.558783 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.563960 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.577512 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.585660 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.613694 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.730597 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.743554 [ 171701 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Debug> MergingSortedTransform: Merge sorted 3 blocks, 65567 rows in 0.243999671 sec., 268717.5754429603>
2021.12.31 12:11:55.744196 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.890923 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
2021.12.31 12:11:55.891222 [ 170601 ] {7e5f551e-5960-4fda-9447-9bfdae4660c1} <Test> MergeTreeBaseSelectProcessor: Reading from Wide part, average mark size is 3069
</details>
[1]: https://s3.amazonaws.com/clickhouse-test-reports/33341/0685fd99855bacd0bce02507c00a3bd7709eea61/stress_test__address__actions_.html
2022-01-07 16:33:03 +00:00
|
|
|
throw;
|
|
|
|
}
|
2017-04-06 17:21:45 +00:00
|
|
|
|
2019-09-13 15:41:09 +00:00
|
|
|
auto res = readFromPart();
|
2017-04-06 17:21:45 +00:00
|
|
|
|
2022-07-17 18:41:17 +00:00
|
|
|
if (res.row_count)
|
2019-09-13 15:41:09 +00:00
|
|
|
{
|
2022-07-17 18:41:17 +00:00
|
|
|
injectVirtualColumns(res.block, res.row_count, task.get(), partition_value_type, virt_column_names);
|
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
/// Reorder the columns according to result_header
|
2022-07-17 18:41:17 +00:00
|
|
|
Columns ordered_columns;
|
2022-11-18 20:09:20 +00:00
|
|
|
ordered_columns.reserve(result_header.columns());
|
|
|
|
for (size_t i = 0; i < result_header.columns(); ++i)
|
2022-07-17 18:41:17 +00:00
|
|
|
{
|
2022-11-18 20:09:20 +00:00
|
|
|
auto name = result_header.getByPosition(i).name;
|
2022-07-17 18:41:17 +00:00
|
|
|
ordered_columns.push_back(res.block.getByName(name).column);
|
|
|
|
}
|
|
|
|
|
2022-11-16 14:10:56 +00:00
|
|
|
/// Account a progress from previous empty chunks.
|
|
|
|
res.num_read_rows += num_read_rows;
|
|
|
|
res.num_read_bytes += num_read_bytes;
|
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
return ChunkAndProgress{
|
2022-11-15 21:23:18 +00:00
|
|
|
.chunk = Chunk(ordered_columns, res.row_count),
|
|
|
|
.num_read_rows = res.num_read_rows,
|
|
|
|
.num_read_bytes = res.num_read_bytes};
|
2019-09-13 15:41:09 +00:00
|
|
|
}
|
2022-11-16 14:10:56 +00:00
|
|
|
else
|
|
|
|
{
|
|
|
|
num_read_rows += res.num_read_rows;
|
|
|
|
num_read_bytes += res.num_read_bytes;
|
|
|
|
}
|
2017-04-10 14:06:44 +00:00
|
|
|
}
|
|
|
|
|
2022-11-16 14:10:56 +00:00
|
|
|
return {Chunk(), num_read_rows, num_read_bytes};
|
2017-04-06 17:21:45 +00:00
|
|
|
}
|
|
|
|
|
2023-02-07 17:50:31 +00:00
|
|
|
void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForCurrentTask(
|
|
|
|
const StorageMetadataPtr & metadata_snapshot,
|
|
|
|
const IMergeTreeReader::ValueSizeMap & value_size_map,
|
|
|
|
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
|
|
|
|
{
|
|
|
|
if (!task)
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no task");
|
|
|
|
|
|
|
|
if (task->reader.valid())
|
|
|
|
{
|
|
|
|
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);
|
|
|
|
reader = task->reader.get();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
reader = task->data_part->getReader(
|
|
|
|
task->task_columns.columns, metadata_snapshot, task->mark_ranges,
|
|
|
|
owned_uncompressed_cache.get(), owned_mark_cache.get(),
|
|
|
|
reader_settings, value_size_map, profile_callback);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!task->pre_reader_for_step.empty())
|
|
|
|
{
|
|
|
|
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);
|
|
|
|
pre_reader_for_step.clear();
|
|
|
|
for (auto & pre_reader : task->pre_reader_for_step)
|
|
|
|
pre_reader_for_step.push_back(pre_reader.get());
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
initializeMergeTreePreReadersForPart(
|
|
|
|
task->data_part, task->task_columns, metadata_snapshot,
|
|
|
|
task->mark_ranges, value_size_map, profile_callback);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForPart(
|
2022-07-17 18:41:17 +00:00
|
|
|
MergeTreeData::DataPartPtr & data_part,
|
2023-02-07 17:50:31 +00:00
|
|
|
const MergeTreeReadTaskColumns & task_columns,
|
|
|
|
const StorageMetadataPtr & metadata_snapshot,
|
|
|
|
const MarkRanges & mark_ranges,
|
|
|
|
const IMergeTreeReader::ValueSizeMap & value_size_map,
|
2022-07-17 18:41:17 +00:00
|
|
|
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
|
|
|
|
{
|
2023-02-07 17:50:31 +00:00
|
|
|
reader = data_part->getReader(
|
|
|
|
task_columns.columns, metadata_snapshot, mark_ranges,
|
|
|
|
owned_uncompressed_cache.get(), owned_mark_cache.get(),
|
|
|
|
reader_settings, value_size_map, profile_callback);
|
|
|
|
|
|
|
|
initializeMergeTreePreReadersForPart(
|
|
|
|
data_part, task_columns, metadata_snapshot,
|
|
|
|
mark_ranges, value_size_map, profile_callback);
|
|
|
|
}
|
2022-07-17 18:41:17 +00:00
|
|
|
|
2023-02-07 17:50:31 +00:00
|
|
|
void IMergeTreeSelectAlgorithm::initializeMergeTreePreReadersForPart(
|
|
|
|
MergeTreeData::DataPartPtr & data_part,
|
|
|
|
const MergeTreeReadTaskColumns & task_columns,
|
|
|
|
const StorageMetadataPtr & metadata_snapshot,
|
|
|
|
const MarkRanges & mark_ranges,
|
|
|
|
const IMergeTreeReader::ValueSizeMap & value_size_map,
|
|
|
|
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
|
|
|
|
{
|
2022-07-17 18:41:17 +00:00
|
|
|
pre_reader_for_step.clear();
|
|
|
|
|
|
|
|
/// Add lightweight delete filtering step
|
2022-07-21 19:50:19 +00:00
|
|
|
if (reader_settings.apply_deleted_mask && data_part->hasLightweightDelete())
|
2022-07-17 18:41:17 +00:00
|
|
|
{
|
2023-02-07 17:50:31 +00:00
|
|
|
pre_reader_for_step.push_back(
|
|
|
|
data_part->getReader(
|
|
|
|
{LightweightDeleteDescription::FILTER_COLUMN}, metadata_snapshot,
|
|
|
|
mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(),
|
|
|
|
reader_settings, value_size_map, profile_callback));
|
2022-07-17 18:41:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (prewhere_info)
|
|
|
|
{
|
|
|
|
for (const auto & pre_columns_per_step : task_columns.pre_columns)
|
|
|
|
{
|
2023-02-07 17:50:31 +00:00
|
|
|
pre_reader_for_step.push_back(
|
|
|
|
data_part->getReader(
|
|
|
|
pre_columns_per_step, metadata_snapshot, mark_ranges,
|
|
|
|
owned_uncompressed_cache.get(), owned_mark_cache.get(),
|
|
|
|
reader_settings, value_size_map, profile_callback));
|
2022-07-17 18:41:17 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2017-04-06 17:21:45 +00:00
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
void IMergeTreeSelectAlgorithm::initializeRangeReaders(MergeTreeReadTask & current_task)
|
2019-07-18 14:41:11 +00:00
|
|
|
{
|
2022-09-05 16:55:00 +00:00
|
|
|
return initializeRangeReadersImpl(
|
|
|
|
current_task.range_reader, current_task.pre_range_readers, prewhere_info, prewhere_actions.get(),
|
|
|
|
reader.get(), current_task.data_part->hasLightweightDelete(), reader_settings,
|
|
|
|
pre_reader_for_step, lightweight_delete_filter_step, non_const_virtual_column_names);
|
|
|
|
}
|
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
void IMergeTreeSelectAlgorithm::initializeRangeReadersImpl(
|
2022-09-05 16:55:00 +00:00
|
|
|
MergeTreeRangeReader & range_reader, std::deque<MergeTreeRangeReader> & pre_range_readers,
|
|
|
|
PrewhereInfoPtr prewhere_info, const PrewhereExprInfo * prewhere_actions,
|
|
|
|
IMergeTreeReader * reader, bool has_lightweight_delete, const MergeTreeReaderSettings & reader_settings,
|
|
|
|
const std::vector<std::unique_ptr<IMergeTreeReader>> & pre_reader_for_step,
|
|
|
|
const PrewhereExprStep & lightweight_delete_filter_step, const Names & non_const_virtual_column_names)
|
|
|
|
{
|
|
|
|
MergeTreeRangeReader * prev_reader = nullptr;
|
2022-06-07 07:03:11 +00:00
|
|
|
bool last_reader = false;
|
2022-07-12 11:25:14 +00:00
|
|
|
size_t pre_readers_shift = 0;
|
|
|
|
|
2022-07-17 18:41:17 +00:00
|
|
|
/// Add filtering step with lightweight delete mask
|
2022-09-05 16:55:00 +00:00
|
|
|
if (reader_settings.apply_deleted_mask && has_lightweight_delete)
|
2022-07-12 11:25:14 +00:00
|
|
|
{
|
2022-09-05 16:55:00 +00:00
|
|
|
MergeTreeRangeReader pre_range_reader(pre_reader_for_step[0].get(), prev_reader, &lightweight_delete_filter_step, last_reader, non_const_virtual_column_names);
|
|
|
|
pre_range_readers.push_back(std::move(pre_range_reader));
|
|
|
|
prev_reader = &pre_range_readers.back();
|
2022-07-12 11:25:14 +00:00
|
|
|
pre_readers_shift++;
|
|
|
|
}
|
2022-06-07 07:03:11 +00:00
|
|
|
|
|
|
|
if (prewhere_info)
|
|
|
|
{
|
2022-07-12 11:25:14 +00:00
|
|
|
if (prewhere_actions->steps.size() + pre_readers_shift != pre_reader_for_step.size())
|
2022-09-05 16:55:00 +00:00
|
|
|
{
|
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
|
|
|
"PREWHERE steps count mismatch, actions: {}, readers: {}",
|
|
|
|
prewhere_actions->steps.size(), pre_reader_for_step.size());
|
|
|
|
}
|
2022-06-13 13:00:26 +00:00
|
|
|
|
2022-06-07 07:03:11 +00:00
|
|
|
for (size_t i = 0; i < prewhere_actions->steps.size(); ++i)
|
|
|
|
{
|
|
|
|
last_reader = reader->getColumns().empty() && (i + 1 == prewhere_actions->steps.size());
|
|
|
|
|
2022-09-05 16:55:00 +00:00
|
|
|
MergeTreeRangeReader current_reader(pre_reader_for_step[i + pre_readers_shift].get(), prev_reader, &prewhere_actions->steps[i], last_reader, non_const_virtual_column_names);
|
|
|
|
|
|
|
|
pre_range_readers.push_back(std::move(current_reader));
|
|
|
|
prev_reader = &pre_range_readers.back();
|
2022-06-07 07:03:11 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!last_reader)
|
|
|
|
{
|
2022-09-05 16:55:00 +00:00
|
|
|
range_reader = MergeTreeRangeReader(reader, prev_reader, nullptr, true, non_const_virtual_column_names);
|
2022-06-07 07:03:11 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2022-06-13 13:00:26 +00:00
|
|
|
/// If all columns are read by pre_range_readers than move last pre_range_reader into range_reader
|
2022-09-05 16:55:00 +00:00
|
|
|
range_reader = std::move(pre_range_readers.back());
|
|
|
|
pre_range_readers.pop_back();
|
2022-06-07 07:03:11 +00:00
|
|
|
}
|
2019-07-18 14:41:11 +00:00
|
|
|
}
|
|
|
|
|
2022-05-13 19:00:44 +00:00
|
|
|
static UInt64 estimateNumRows(const MergeTreeReadTask & current_task, UInt64 current_preferred_block_size_bytes,
|
2022-12-05 20:10:31 +00:00
|
|
|
UInt64 current_max_block_size_rows, UInt64 current_preferred_max_column_in_block_size_bytes, double min_filtration_ratio, size_t min_marks_to_read)
|
2022-05-13 19:00:44 +00:00
|
|
|
{
|
|
|
|
const MergeTreeRangeReader & current_reader = current_task.range_reader;
|
|
|
|
|
|
|
|
if (!current_task.size_predictor)
|
|
|
|
return static_cast<size_t>(current_max_block_size_rows);
|
|
|
|
|
|
|
|
/// Calculates number of rows will be read using preferred_block_size_bytes.
|
|
|
|
/// Can't be less than avg_index_granularity.
|
|
|
|
size_t rows_to_read = current_task.size_predictor->estimateNumRows(current_preferred_block_size_bytes);
|
|
|
|
if (!rows_to_read)
|
|
|
|
return rows_to_read;
|
|
|
|
auto total_row_in_current_granule = current_reader.numRowsInCurrentGranule();
|
|
|
|
rows_to_read = std::max(total_row_in_current_granule, rows_to_read);
|
|
|
|
|
|
|
|
if (current_preferred_max_column_in_block_size_bytes)
|
|
|
|
{
|
|
|
|
/// Calculates number of rows will be read using preferred_max_column_in_block_size_bytes.
|
|
|
|
auto rows_to_read_for_max_size_column
|
|
|
|
= current_task.size_predictor->estimateNumRowsForMaxSizeColumn(current_preferred_max_column_in_block_size_bytes);
|
|
|
|
double filtration_ratio = std::max(min_filtration_ratio, 1.0 - current_task.size_predictor->filtered_rows_ratio);
|
|
|
|
auto rows_to_read_for_max_size_column_with_filtration
|
|
|
|
= static_cast<size_t>(rows_to_read_for_max_size_column / filtration_ratio);
|
|
|
|
|
|
|
|
/// If preferred_max_column_in_block_size_bytes is used, number of rows to read can be less than current_index_granularity.
|
|
|
|
rows_to_read = std::min(rows_to_read, rows_to_read_for_max_size_column_with_filtration);
|
|
|
|
}
|
|
|
|
|
|
|
|
auto unread_rows_in_current_granule = current_reader.numPendingRowsInCurrentGranule();
|
|
|
|
if (unread_rows_in_current_granule >= rows_to_read)
|
|
|
|
return rows_to_read;
|
|
|
|
|
|
|
|
const MergeTreeIndexGranularity & index_granularity = current_task.data_part->index_granularity;
|
|
|
|
|
2022-12-05 20:10:31 +00:00
|
|
|
return index_granularity.countMarksForRows(current_reader.currentMark(), rows_to_read, current_reader.numReadRowsInCurrentGranule(), min_marks_to_read);
|
2022-05-13 19:00:44 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
IMergeTreeSelectAlgorithm::BlockAndProgress IMergeTreeSelectAlgorithm::readFromPartImpl()
|
2017-04-06 17:21:45 +00:00
|
|
|
{
|
2017-04-07 11:43:24 +00:00
|
|
|
if (task->size_predictor)
|
|
|
|
task->size_predictor->startBlock();
|
|
|
|
|
2019-06-14 19:11:41 +00:00
|
|
|
const UInt64 current_max_block_size_rows = max_block_size_rows;
|
|
|
|
const UInt64 current_preferred_block_size_bytes = preferred_block_size_bytes;
|
|
|
|
const UInt64 current_preferred_max_column_in_block_size_bytes = preferred_max_column_in_block_size_bytes;
|
2017-06-30 16:28:27 +00:00
|
|
|
const double min_filtration_ratio = 0.00001;
|
2017-06-21 17:19:35 +00:00
|
|
|
|
2022-05-13 19:00:44 +00:00
|
|
|
UInt64 recommended_rows = estimateNumRows(*task, current_preferred_block_size_bytes,
|
2022-12-05 20:10:31 +00:00
|
|
|
current_max_block_size_rows, current_preferred_max_column_in_block_size_bytes, min_filtration_ratio, min_marks_to_read);
|
2022-04-18 08:18:31 +00:00
|
|
|
UInt64 rows_to_read = std::max(static_cast<UInt64>(1), std::min(current_max_block_size_rows, recommended_rows));
|
2017-04-06 17:21:45 +00:00
|
|
|
|
2018-02-20 11:45:58 +00:00
|
|
|
auto read_result = task->range_reader.read(rows_to_read, task->mark_ranges);
|
2017-04-06 17:21:45 +00:00
|
|
|
|
2018-02-20 13:59:19 +00:00
|
|
|
/// All rows were filtered. Repeat.
|
2019-09-23 19:22:02 +00:00
|
|
|
if (read_result.num_rows == 0)
|
|
|
|
read_result.columns.clear();
|
2018-02-20 13:59:19 +00:00
|
|
|
|
2020-04-22 06:22:14 +00:00
|
|
|
const auto & sample_block = task->range_reader.getSampleBlock();
|
2019-09-26 17:29:41 +00:00
|
|
|
if (read_result.num_rows != 0 && sample_block.columns() != read_result.columns.size())
|
2023-01-23 21:13:58 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent number of columns got from MergeTreeRangeReader. "
|
|
|
|
"Have {} in sample block and {} columns in list",
|
|
|
|
toString(sample_block.columns()), toString(read_result.columns.size()));
|
2019-09-26 17:29:41 +00:00
|
|
|
|
|
|
|
/// TODO: check columns have the same types as in header.
|
2018-02-20 13:59:19 +00:00
|
|
|
|
2019-09-23 19:22:02 +00:00
|
|
|
UInt64 num_filtered_rows = read_result.numReadRows() - read_result.num_rows;
|
2018-02-20 14:26:22 +00:00
|
|
|
|
2022-11-15 21:23:18 +00:00
|
|
|
size_t num_read_rows = read_result.numReadRows();
|
|
|
|
size_t num_read_bytes = read_result.numBytesRead();
|
2017-04-06 17:21:45 +00:00
|
|
|
|
2018-02-20 11:45:58 +00:00
|
|
|
if (task->size_predictor)
|
|
|
|
{
|
2018-03-05 14:41:43 +00:00
|
|
|
task->size_predictor->updateFilteredRowsRation(read_result.numReadRows(), num_filtered_rows);
|
2017-08-01 13:04:48 +00:00
|
|
|
|
2019-09-23 19:22:02 +00:00
|
|
|
if (!read_result.columns.empty())
|
2019-09-26 17:29:41 +00:00
|
|
|
task->size_predictor->update(sample_block, read_result.columns, read_result.num_rows);
|
2018-02-20 11:45:58 +00:00
|
|
|
}
|
2017-04-06 17:21:45 +00:00
|
|
|
|
2022-11-16 14:10:56 +00:00
|
|
|
Block block;
|
|
|
|
if (read_result.num_rows != 0)
|
|
|
|
block = sample_block.cloneWithColumns(read_result.columns);
|
2019-10-01 16:50:08 +00:00
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
BlockAndProgress res = {
|
2022-11-16 14:10:56 +00:00
|
|
|
.block = std::move(block),
|
2022-11-15 21:23:18 +00:00
|
|
|
.row_count = read_result.num_rows,
|
|
|
|
.num_read_rows = num_read_rows,
|
|
|
|
.num_read_bytes = num_read_bytes };
|
2017-04-10 17:10:33 +00:00
|
|
|
|
2022-07-17 18:41:17 +00:00
|
|
|
return res;
|
2017-04-06 17:21:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
IMergeTreeSelectAlgorithm::BlockAndProgress IMergeTreeSelectAlgorithm::readFromPart()
|
2019-07-18 14:41:11 +00:00
|
|
|
{
|
|
|
|
if (!task->range_reader.isInitialized())
|
|
|
|
initializeRangeReaders(*task);
|
|
|
|
|
|
|
|
return readFromPartImpl();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-10-31 11:32:24 +00:00
|
|
|
namespace
|
|
|
|
{
|
|
|
|
struct VirtualColumnsInserter
|
|
|
|
{
|
2022-07-17 18:41:17 +00:00
|
|
|
explicit VirtualColumnsInserter(Block & block_) : block(block_) {}
|
|
|
|
|
|
|
|
bool columnExists(const String & name) const { return block.has(name); }
|
|
|
|
|
|
|
|
void insertUInt8Column(const ColumnPtr & column, const String & name)
|
|
|
|
{
|
|
|
|
block.insert({column, std::make_shared<DataTypeUInt8>(), name});
|
|
|
|
}
|
|
|
|
|
|
|
|
void insertUInt64Column(const ColumnPtr & column, const String & name)
|
|
|
|
{
|
|
|
|
block.insert({column, std::make_shared<DataTypeUInt64>(), name});
|
|
|
|
}
|
|
|
|
|
|
|
|
void insertUUIDColumn(const ColumnPtr & column, const String & name)
|
|
|
|
{
|
|
|
|
block.insert({column, std::make_shared<DataTypeUUID>(), name});
|
|
|
|
}
|
|
|
|
|
2023-02-03 07:06:58 +00:00
|
|
|
void insertLowCardinalityColumn(const ColumnPtr & column, const String & name)
|
|
|
|
{
|
|
|
|
block.insert({column, std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), name});
|
|
|
|
}
|
|
|
|
|
2022-07-17 18:41:17 +00:00
|
|
|
void insertPartitionValueColumn(
|
|
|
|
size_t rows, const Row & partition_value, const DataTypePtr & partition_value_type, const String & name)
|
|
|
|
{
|
|
|
|
ColumnPtr column;
|
|
|
|
if (rows)
|
|
|
|
column = partition_value_type->createColumnConst(rows, Tuple(partition_value.begin(), partition_value.end()))
|
|
|
|
->convertToFullColumnIfConst();
|
|
|
|
else
|
|
|
|
column = partition_value_type->createColumn();
|
|
|
|
|
|
|
|
block.insert({column, partition_value_type, name});
|
|
|
|
}
|
|
|
|
|
|
|
|
Block & block;
|
2019-10-31 11:32:24 +00:00
|
|
|
};
|
|
|
|
}
|
|
|
|
|
2022-04-11 13:43:09 +00:00
|
|
|
/// Adds virtual columns that are not const for all rows
|
|
|
|
static void injectNonConstVirtualColumns(
|
|
|
|
size_t rows,
|
2022-11-18 20:09:20 +00:00
|
|
|
Block & block,
|
2022-04-11 13:43:09 +00:00
|
|
|
const Names & virtual_columns)
|
|
|
|
{
|
2022-11-18 20:09:20 +00:00
|
|
|
VirtualColumnsInserter inserter(block);
|
2022-04-11 13:43:09 +00:00
|
|
|
for (const auto & virtual_column_name : virtual_columns)
|
|
|
|
{
|
|
|
|
if (virtual_column_name == "_part_offset")
|
2022-07-17 18:41:17 +00:00
|
|
|
{
|
|
|
|
if (!rows)
|
|
|
|
{
|
|
|
|
inserter.insertUInt64Column(DataTypeUInt64().createColumn(), virtual_column_name);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
if (!inserter.columnExists(virtual_column_name))
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
|
|
|
"Column {} must have been filled part reader",
|
|
|
|
virtual_column_name);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-07-25 14:15:15 +00:00
|
|
|
if (virtual_column_name == LightweightDeleteDescription::FILTER_COLUMN.name)
|
2022-07-17 18:41:17 +00:00
|
|
|
{
|
2022-07-18 07:36:28 +00:00
|
|
|
/// If _row_exists column isn't present in the part then fill it here with 1s
|
2022-07-17 18:41:17 +00:00
|
|
|
ColumnPtr column;
|
|
|
|
if (rows)
|
2022-07-25 14:15:15 +00:00
|
|
|
column = LightweightDeleteDescription::FILTER_COLUMN.type->createColumnConst(rows, 1)->convertToFullColumnIfConst();
|
2022-07-17 18:41:17 +00:00
|
|
|
else
|
2022-07-25 14:15:15 +00:00
|
|
|
column = LightweightDeleteDescription::FILTER_COLUMN.type->createColumn();
|
2022-07-17 18:41:17 +00:00
|
|
|
|
|
|
|
inserter.insertUInt8Column(column, virtual_column_name);
|
|
|
|
}
|
2022-04-11 13:43:09 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Adds virtual columns that are const for the whole part
|
|
|
|
static void injectPartConstVirtualColumns(
|
2021-04-27 08:15:59 +00:00
|
|
|
size_t rows,
|
2022-11-18 20:09:20 +00:00
|
|
|
Block & block,
|
2021-04-27 08:15:59 +00:00
|
|
|
MergeTreeReadTask * task,
|
|
|
|
const DataTypePtr & partition_value_type,
|
|
|
|
const Names & virtual_columns)
|
2017-04-06 17:21:45 +00:00
|
|
|
{
|
2022-11-18 20:09:20 +00:00
|
|
|
VirtualColumnsInserter inserter(block);
|
2017-04-06 17:21:45 +00:00
|
|
|
/// add virtual columns
|
|
|
|
/// Except _sample_factor, which is added from the outside.
|
2019-09-13 15:41:09 +00:00
|
|
|
if (!virtual_columns.empty())
|
2017-04-06 17:21:45 +00:00
|
|
|
{
|
2019-09-13 15:41:09 +00:00
|
|
|
if (unlikely(rows && !task))
|
2023-01-23 21:13:58 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot insert virtual columns to non-empty chunk without specified task.");
|
2018-11-28 15:05:28 +00:00
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
const IMergeTreeDataPart * part = nullptr;
|
|
|
|
if (rows)
|
|
|
|
{
|
|
|
|
part = task->data_part.get();
|
|
|
|
if (part->isProjectionPart())
|
|
|
|
part = part->getParentPart();
|
|
|
|
}
|
2019-09-23 19:22:02 +00:00
|
|
|
for (const auto & virtual_column_name : virtual_columns)
|
2017-04-06 17:21:45 +00:00
|
|
|
{
|
2019-09-23 19:22:02 +00:00
|
|
|
if (virtual_column_name == "_part")
|
2017-04-06 17:21:45 +00:00
|
|
|
{
|
2018-02-19 03:56:08 +00:00
|
|
|
ColumnPtr column;
|
|
|
|
if (rows)
|
2023-02-02 16:13:24 +00:00
|
|
|
column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}
|
|
|
|
.createColumnConst(rows, part->name)
|
|
|
|
->convertToFullColumnIfConst();
|
2018-02-19 03:56:08 +00:00
|
|
|
else
|
2023-02-02 16:13:24 +00:00
|
|
|
column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn();
|
2018-02-19 03:56:08 +00:00
|
|
|
|
2023-02-03 07:06:58 +00:00
|
|
|
inserter.insertLowCardinalityColumn(column, virtual_column_name);
|
2017-04-06 17:21:45 +00:00
|
|
|
}
|
2019-09-23 19:22:02 +00:00
|
|
|
else if (virtual_column_name == "_part_index")
|
2017-04-06 17:21:45 +00:00
|
|
|
{
|
2018-02-19 03:56:08 +00:00
|
|
|
ColumnPtr column;
|
|
|
|
if (rows)
|
2018-10-22 08:54:54 +00:00
|
|
|
column = DataTypeUInt64().createColumnConst(rows, task->part_index_in_query)->convertToFullColumnIfConst();
|
2018-02-19 03:56:08 +00:00
|
|
|
else
|
|
|
|
column = DataTypeUInt64().createColumn();
|
|
|
|
|
2019-10-31 11:32:24 +00:00
|
|
|
inserter.insertUInt64Column(column, virtual_column_name);
|
2017-04-06 17:21:45 +00:00
|
|
|
}
|
2020-11-20 17:23:53 +00:00
|
|
|
else if (virtual_column_name == "_part_uuid")
|
|
|
|
{
|
|
|
|
ColumnPtr column;
|
|
|
|
if (rows)
|
2021-08-26 11:01:15 +00:00
|
|
|
column = DataTypeUUID().createColumnConst(rows, part->uuid)->convertToFullColumnIfConst();
|
2020-11-20 17:23:53 +00:00
|
|
|
else
|
|
|
|
column = DataTypeUUID().createColumn();
|
|
|
|
|
|
|
|
inserter.insertUUIDColumn(column, virtual_column_name);
|
|
|
|
}
|
2019-09-23 19:22:02 +00:00
|
|
|
else if (virtual_column_name == "_partition_id")
|
2018-09-10 09:53:13 +00:00
|
|
|
{
|
|
|
|
ColumnPtr column;
|
|
|
|
if (rows)
|
2023-02-02 16:13:24 +00:00
|
|
|
column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}
|
|
|
|
.createColumnConst(rows, part->info.partition_id)
|
|
|
|
->convertToFullColumnIfConst();
|
2018-09-10 09:53:13 +00:00
|
|
|
else
|
2023-02-02 16:13:24 +00:00
|
|
|
column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn();
|
2018-09-10 09:53:13 +00:00
|
|
|
|
2023-02-03 07:06:58 +00:00
|
|
|
inserter.insertLowCardinalityColumn(column, virtual_column_name);
|
2018-09-10 09:53:13 +00:00
|
|
|
}
|
2021-04-27 08:15:59 +00:00
|
|
|
else if (virtual_column_name == "_partition_value")
|
|
|
|
{
|
|
|
|
if (rows)
|
2021-08-26 11:01:15 +00:00
|
|
|
inserter.insertPartitionValueColumn(rows, part->partition.value, partition_value_type, virtual_column_name);
|
2021-04-27 08:15:59 +00:00
|
|
|
else
|
|
|
|
inserter.insertPartitionValueColumn(rows, {}, partition_value_type, virtual_column_name);
|
|
|
|
}
|
2017-04-06 17:21:45 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
void IMergeTreeSelectAlgorithm::injectVirtualColumns(
|
2022-07-17 18:41:17 +00:00
|
|
|
Block & block, size_t row_count, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns)
|
2019-09-13 15:41:09 +00:00
|
|
|
{
|
2022-04-11 13:43:09 +00:00
|
|
|
/// First add non-const columns that are filled by the range reader and then const columns that we will fill ourselves.
|
|
|
|
/// Note that the order is important: virtual columns filled by the range reader must go first
|
2022-11-18 20:09:20 +00:00
|
|
|
injectNonConstVirtualColumns(row_count, block, virtual_columns);
|
|
|
|
injectPartConstVirtualColumns(row_count, block, task, partition_value_type, virtual_columns);
|
2019-09-13 15:41:09 +00:00
|
|
|
}
|
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
Block IMergeTreeSelectAlgorithm::applyPrewhereActions(Block block, const PrewhereInfoPtr & prewhere_info)
|
2018-04-06 13:58:06 +00:00
|
|
|
{
|
2021-02-13 22:07:13 +00:00
|
|
|
if (prewhere_info)
|
2018-04-06 13:58:06 +00:00
|
|
|
{
|
2021-02-15 19:48:06 +00:00
|
|
|
if (prewhere_info->row_level_filter)
|
|
|
|
{
|
2021-06-25 14:49:28 +00:00
|
|
|
block = prewhere_info->row_level_filter->updateHeader(std::move(block));
|
2021-02-15 19:48:06 +00:00
|
|
|
auto & row_level_column = block.getByName(prewhere_info->row_level_column_name);
|
|
|
|
if (!row_level_column.type->canBeUsedInBooleanContext())
|
2021-02-13 22:07:13 +00:00
|
|
|
{
|
2023-01-23 21:13:58 +00:00
|
|
|
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER, "Invalid type for filter in PREWHERE: {}",
|
|
|
|
row_level_column.type->getName());
|
2021-02-13 22:07:13 +00:00
|
|
|
}
|
2021-02-20 11:00:16 +00:00
|
|
|
|
|
|
|
block.erase(prewhere_info->row_level_column_name);
|
2021-02-13 22:07:13 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (prewhere_info->prewhere_actions)
|
2021-06-25 14:49:28 +00:00
|
|
|
block = prewhere_info->prewhere_actions->updateHeader(std::move(block));
|
2021-02-13 22:07:13 +00:00
|
|
|
|
|
|
|
auto & prewhere_column = block.getByName(prewhere_info->prewhere_column_name);
|
2020-07-02 07:44:47 +00:00
|
|
|
if (!prewhere_column.type->canBeUsedInBooleanContext())
|
2021-02-13 22:07:13 +00:00
|
|
|
{
|
2023-01-23 21:13:58 +00:00
|
|
|
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER, "Invalid type for filter in PREWHERE: {}",
|
|
|
|
prewhere_column.type->getName());
|
2021-02-13 22:07:13 +00:00
|
|
|
}
|
2020-06-30 14:20:27 +00:00
|
|
|
|
2021-02-13 22:07:13 +00:00
|
|
|
if (prewhere_info->remove_prewhere_column)
|
|
|
|
block.erase(prewhere_info->prewhere_column_name);
|
2019-11-15 03:38:35 +00:00
|
|
|
else
|
|
|
|
{
|
2021-06-21 13:01:02 +00:00
|
|
|
WhichDataType which(removeNullable(recursiveRemoveLowCardinality(prewhere_column.type)));
|
2022-10-24 08:37:52 +00:00
|
|
|
if (which.isNativeInt() || which.isNativeUInt())
|
2021-06-18 14:28:52 +00:00
|
|
|
prewhere_column.column = prewhere_column.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst();
|
|
|
|
else if (which.isFloat())
|
|
|
|
prewhere_column.column = prewhere_column.type->createColumnConst(block.rows(), 1.0f)->convertToFullColumnIfConst();
|
|
|
|
else
|
2022-10-25 11:13:41 +00:00
|
|
|
throw Exception(
|
2023-01-23 21:13:58 +00:00
|
|
|
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
|
|
|
|
"Illegal type {} of column for filter", prewhere_column.type->getName());
|
2019-11-15 03:38:35 +00:00
|
|
|
}
|
2018-04-06 13:58:06 +00:00
|
|
|
}
|
|
|
|
|
2019-09-13 15:41:09 +00:00
|
|
|
return block;
|
|
|
|
}
|
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
Block IMergeTreeSelectAlgorithm::transformHeader(
|
|
|
|
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns)
|
|
|
|
{
|
|
|
|
auto transformed = applyPrewhereActions(std::move(block), prewhere_info);
|
|
|
|
injectVirtualColumns(transformed, 0, nullptr, partition_value_type, virtual_columns);
|
|
|
|
return transformed;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::unique_ptr<MergeTreeBlockSizePredictor> IMergeTreeSelectAlgorithm::getSizePredictor(
|
2021-07-16 14:11:34 +00:00
|
|
|
const MergeTreeData::DataPartPtr & data_part,
|
|
|
|
const MergeTreeReadTaskColumns & task_columns,
|
2021-08-02 12:03:55 +00:00
|
|
|
const Block & sample_block)
|
2021-07-16 14:11:34 +00:00
|
|
|
{
|
|
|
|
const auto & required_column_names = task_columns.columns.getNames();
|
|
|
|
NameSet complete_column_names(required_column_names.begin(), required_column_names.end());
|
2022-06-07 07:03:11 +00:00
|
|
|
for (const auto & pre_columns_per_step : task_columns.pre_columns)
|
|
|
|
{
|
|
|
|
const auto & required_pre_column_names = pre_columns_per_step.getNames();
|
|
|
|
complete_column_names.insert(required_pre_column_names.begin(), required_pre_column_names.end());
|
|
|
|
}
|
2021-07-16 14:11:34 +00:00
|
|
|
|
|
|
|
return std::make_unique<MergeTreeBlockSizePredictor>(
|
2021-08-02 12:03:55 +00:00
|
|
|
data_part, Names(complete_column_names.begin(), complete_column_names.end()), sample_block);
|
2021-07-16 14:11:34 +00:00
|
|
|
}
|
2018-04-06 13:58:06 +00:00
|
|
|
|
2021-12-09 10:39:28 +00:00
|
|
|
|
2022-11-18 20:09:20 +00:00
|
|
|
IMergeTreeSelectAlgorithm::~IMergeTreeSelectAlgorithm() = default;
|
2017-04-06 17:21:45 +00:00
|
|
|
|
|
|
|
}
|