/// ClickHouse — src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp
/// Helpers for MergeTree reads: injecting columns required by DEFAULT/MATERIALIZED
/// expressions, building per-task column sets, and predicting block sizes.

#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeData.h>
2022-09-05 16:55:00 +00:00
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
2021-04-24 04:09:01 +00:00
#include <DataTypes/NestedUtils.h>
2020-10-23 17:57:17 +00:00
#include <Core/NamesAndTypes.h>
2020-09-16 13:24:07 +00:00
#include <Common/checkStackSize.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnConst.h>
2022-06-13 13:00:26 +00:00
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <unordered_set>
2018-01-10 00:04:08 +00:00
namespace DB
{
2020-02-25 18:10:48 +00:00
/// Error codes referenced by this translation unit.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
}
2020-09-15 11:17:58 +00:00
namespace
{
/// Columns absent in part may depend on other absent columns so we are
/// searching all required physical columns recursively. Return true if found at
/// least one existing (physical) column in part.
2020-09-15 11:17:58 +00:00
bool injectRequiredColumnsRecursively(
const String & column_name,
2022-03-28 17:21:47 +00:00
const StorageSnapshotPtr & storage_snapshot,
2022-09-05 16:55:00 +00:00
const AlterConversions & alter_conversions,
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
2022-04-27 13:38:09 +00:00
const GetColumnsOptions & options,
2020-09-15 11:17:58 +00:00
Names & columns,
NameSet & required_columns,
NameSet & injected_columns)
{
2020-09-16 13:24:07 +00:00
/// This is needed to prevent stack overflow in case of cyclic defaults or
/// huge AST which for some reason was not validated on parsing/interpreter
/// stages.
checkStackSize();
2020-09-15 11:17:58 +00:00
2022-03-28 17:21:47 +00:00
auto column_in_storage = storage_snapshot->tryGetColumn(options, column_name);
if (column_in_storage)
2020-09-15 11:17:58 +00:00
{
auto column_name_in_part = column_in_storage->getNameInStorage();
2020-10-23 17:57:17 +00:00
if (alter_conversions.isColumnRenamed(column_name_in_part))
column_name_in_part = alter_conversions.getColumnOldName(column_name_in_part);
2022-09-05 16:55:00 +00:00
auto column_in_part = data_part_info_for_reader.getColumns().tryGetByName(column_name_in_part);
2020-10-23 17:57:17 +00:00
if (column_in_part
&& (!column_in_storage->isSubcolumn()
|| column_in_part->type->tryGetSubcolumnType(column_in_storage->getSubcolumnName())))
2020-09-15 11:17:58 +00:00
{
2020-10-23 17:57:17 +00:00
/// ensure each column is added only once
if (!required_columns.contains(column_name))
2020-10-23 17:57:17 +00:00
{
columns.emplace_back(column_name);
required_columns.emplace(column_name);
injected_columns.emplace(column_name);
}
return true;
2020-09-15 11:17:58 +00:00
}
}
/// Column doesn't have default value and don't exist in part
/// don't need to add to required set.
2022-03-28 17:21:47 +00:00
auto metadata_snapshot = storage_snapshot->getMetadataForQuery();
const auto column_default = metadata_snapshot->getColumns().getDefault(column_name);
2020-09-15 11:17:58 +00:00
if (!column_default)
return false;
/// collect identifiers required for evaluation
IdentifierNameSet identifiers;
column_default->expression->collectIdentifierNames(identifiers);
bool result = false;
for (const auto & identifier : identifiers)
2022-04-27 13:38:09 +00:00
result |= injectRequiredColumnsRecursively(
2022-09-05 16:55:00 +00:00
identifier, storage_snapshot, alter_conversions, data_part_info_for_reader,
2022-04-27 13:38:09 +00:00
options, columns, required_columns, injected_columns);
2020-09-15 11:17:58 +00:00
return result;
}
}
2019-11-21 16:10:22 +00:00
2022-03-28 17:21:47 +00:00
/// Expands `columns` with everything needed to evaluate DEFAULT/MATERIALIZED
/// expressions of columns that are missing from the part. If no requested
/// column physically exists in the part, adds the column with the minimum
/// compressed size so at least the row count can be determined.
/// Returns the set of column names that were injected.
NameSet injectRequiredColumns(
    const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
    const StorageSnapshotPtr & storage_snapshot,
    bool with_subcolumns,
    Names & columns)
{
    NameSet required_columns{std::begin(columns), std::end(columns)};
    NameSet injected_columns;

    bool have_at_least_one_physical_column = false;

    /// Projection parts carry no alter conversions of their own.
    AlterConversions alter_conversions;
    if (!data_part_info_for_reader.isProjectionPart())
        alter_conversions = data_part_info_for_reader.getAlterConversions();

    auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical)
        .withExtendedObjects()
        .withSystemColumns();
    if (with_subcolumns)
        options.withSubcolumns();

    /// Index loop on purpose: the recursion appends to `columns`, and the
    /// newly appended names must be processed too (range-for would be UB here).
    for (size_t idx = 0; idx < columns.size(); ++idx)
    {
        /// Only physical columns and system columns are going to be fetched.
        if (!storage_snapshot->tryGetColumn(options, columns[idx]))
            throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "There is no physical column or subcolumn {} in table", columns[idx]);

        have_at_least_one_physical_column |= injectRequiredColumnsRecursively(
            columns[idx], storage_snapshot, alter_conversions,
            data_part_info_for_reader, options, columns, required_columns, injected_columns);
    }

    /** Add a column of the minimum size.
      * Used when no column is needed or all files are missing, but the number
      * of rows must still be known. Appended to `columns`.
      */
    if (!have_at_least_one_physical_column)
    {
        const auto minimum_size_column_name = data_part_info_for_reader.getColumnNameWithMinimumCompressedSize(with_subcolumns);
        columns.push_back(minimum_size_column_name);
        /// Correctly report the added column.
        injected_columns.insert(columns.back());
    }

    return injected_columns;
}
/// A read task bundles one part, the mark ranges to read from it, and the
/// already-resolved column sets; the size predictor (may be null) is moved in.
MergeTreeReadTask::MergeTreeReadTask(
    const MergeTreeData::DataPartPtr & data_part_,
    const MarkRanges & mark_ranges_,
    size_t part_index_in_query_,
    const Names & ordered_names_,
    const NameSet & column_name_set_,
    const MergeTreeReadTaskColumns & task_columns_,
    bool remove_prewhere_column_,
    MergeTreeBlockSizePredictorPtr && size_predictor_)
    : data_part(data_part_)
    , mark_ranges(mark_ranges_)
    , part_index_in_query(part_index_in_query_)
    , ordered_names(ordered_names_)
    , column_name_set(column_name_set_)
    , task_columns(task_columns_)
    , remove_prewhere_column(remove_prewhere_column_)
    , size_predictor(std::move(size_predictor_))
{
}
/// Seeds the predictor from the sample block; real per-column statistics are
/// filled in later by update(), once actual data has been read.
MergeTreeBlockSizePredictor::MergeTreeBlockSizePredictor(
    const MergeTreeData::DataPartPtr & data_part_, const Names & columns, const Block & sample_block)
    : data_part(data_part_)
{
    number_of_rows_in_part = data_part->rows_count;
    /// Initialize with the sample block until update() is called.
    initialize(sample_block, {}, columns);
}
2019-09-26 17:29:41 +00:00
void MergeTreeBlockSizePredictor::initialize(const Block & sample_block, const Columns & columns, const Names & names, bool from_update)
{
fixed_columns_bytes_per_row = 0;
dynamic_columns_infos.clear();
std::unordered_set<String> names_set;
if (!from_update)
2019-09-26 17:29:41 +00:00
names_set.insert(names.begin(), names.end());
2019-09-26 17:29:41 +00:00
size_t num_columns = sample_block.columns();
for (size_t pos = 0; pos < num_columns; ++pos)
{
2019-09-26 17:29:41 +00:00
const auto & column_with_type_and_name = sample_block.getByPosition(pos);
const String & column_name = column_with_type_and_name.name;
2019-09-26 17:29:41 +00:00
const ColumnPtr & column_data = from_update ? columns[pos]
: column_with_type_and_name.column;
if (!from_update && !names_set.contains(column_name))
continue;
/// At least PREWHERE filter column might be const.
if (typeid_cast<const ColumnConst *>(column_data.get()))
continue;
if (column_data->valuesHaveFixedSize())
{
size_t size_of_value = column_data->sizeOfValueIfFixed();
fixed_columns_bytes_per_row += column_data->sizeOfValueIfFixed();
max_size_per_row_fixed = std::max<size_t>(max_size_per_row_fixed, size_of_value);
}
else
{
ColumnInfo info;
info.name = column_name;
/// If column isn't fixed and doesn't have checksum, than take first
ColumnSize column_size = data_part->getColumnSize(column_name);
info.bytes_per_row_global = column_size.data_uncompressed
? column_size.data_uncompressed / number_of_rows_in_part
: column_data->byteSize() / std::max<size_t>(1, column_data->size());
dynamic_columns_infos.emplace_back(info);
}
}
bytes_per_row_global = fixed_columns_bytes_per_row;
for (auto & info : dynamic_columns_infos)
{
info.bytes_per_row = info.bytes_per_row_global;
bytes_per_row_global += info.bytes_per_row_global;
max_size_per_row_dynamic = std::max<double>(max_size_per_row_dynamic, info.bytes_per_row);
}
bytes_per_row_current = bytes_per_row_global;
}
/// Resets per-block accumulators before a new block starts to be read.
void MergeTreeBlockSizePredictor::startBlock()
{
    block_size_rows = 0;
    block_size_bytes = 0;
    for (auto & dynamic_info : dynamic_columns_infos)
        dynamic_info.size_bytes = 0;
}
/// TODO: add last_read_row_in_part parameter to take into account gaps between adjacent ranges
2019-09-26 17:29:41 +00:00
void MergeTreeBlockSizePredictor::update(const Block & sample_block, const Columns & columns, size_t num_rows, double decay)
{
2019-09-26 17:29:41 +00:00
if (columns.size() != sample_block.columns())
throw Exception("Inconsistent number of columns passed to MergeTreeBlockSizePredictor. "
"Have " + toString(sample_block.columns()) + " in sample block "
"and " + toString(columns.size()) + " columns in list", ErrorCodes::LOGICAL_ERROR);
if (!is_initialized_in_update)
{
/// Reinitialize with read block to update estimation for DEFAULT and MATERIALIZED columns without data.
2019-09-26 17:29:41 +00:00
initialize(sample_block, columns, {}, true);
is_initialized_in_update = true;
}
2019-09-26 17:29:41 +00:00
if (num_rows < block_size_rows)
{
2019-09-26 17:29:41 +00:00
throw Exception("Updated block has less rows (" + toString(num_rows) + ") than previous one (" + toString(block_size_rows) + ")",
ErrorCodes::LOGICAL_ERROR);
}
2019-09-26 17:29:41 +00:00
size_t diff_rows = num_rows - block_size_rows;
block_size_bytes = num_rows * fixed_columns_bytes_per_row;
bytes_per_row_current = fixed_columns_bytes_per_row;
2019-09-26 17:29:41 +00:00
block_size_rows = num_rows;
/// Make recursive updates for each read row: v_{i+1} = (1 - decay) v_{i} + decay v_{target}
2019-01-22 19:56:53 +00:00
/// Use sum of geometric sequence formula to update multiple rows: v{n} = (1 - decay)^n v_{0} + (1 - (1 - decay)^n) v_{target}
/// NOTE: DEFAULT and MATERIALIZED columns without data has inaccurate estimation of v_{target}
2017-04-15 03:32:33 +00:00
double alpha = std::pow(1. - decay, diff_rows);
max_size_per_row_dynamic = 0;
for (auto & info : dynamic_columns_infos)
{
2019-09-26 17:29:41 +00:00
size_t new_size = columns[sample_block.getPositionByName(info.name)]->byteSize();
2017-04-15 03:32:33 +00:00
size_t diff_size = new_size - info.size_bytes;
2017-04-15 03:32:33 +00:00
double local_bytes_per_row = static_cast<double>(diff_size) / diff_rows;
info.bytes_per_row = alpha * info.bytes_per_row + (1. - alpha) * local_bytes_per_row;
info.size_bytes = new_size;
block_size_bytes += new_size;
bytes_per_row_current += info.bytes_per_row;
max_size_per_row_dynamic = std::max<double>(max_size_per_row_dynamic, info.bytes_per_row);
}
}
2019-07-19 14:56:00 +00:00
/// Resolves the column sets for one read task: the per-step PREWHERE columns
/// (row-level filter first, then the prewhere expression inputs) and the
/// remaining columns read afterwards. Requested system columns are included
/// only if they are persisted in the part.
MergeTreeReadTaskColumns getReadTaskColumns(
    const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
    const StorageSnapshotPtr & storage_snapshot,
    const Names & required_columns,
    const Names & system_columns,
    const PrewhereInfoPtr & prewhere_info,
    bool with_subcolumns)
{
    Names column_names = required_columns;
    Names pre_column_names;

    /// Read system columns such as the lightweight-delete mask "_row_exists"
    /// only when they are persisted in the part.
    for (const auto & name : system_columns)
        if (data_part_info_for_reader.getColumns().contains(name))
            column_names.push_back(name);

    /// Inject columns required for defaults evaluation.
    injectRequiredColumns(
        data_part_info_for_reader, storage_snapshot, with_subcolumns, column_names);

    MergeTreeReadTaskColumns result;
    auto options = GetColumnsOptions(GetColumnsOptions::All)
        .withExtendedObjects()
        .withSystemColumns();
    if (with_subcolumns)
        options.withSubcolumns();

    if (prewhere_info)
    {
        NameSet pre_name_set;

        /// Add column reading steps:
        /// 1. Columns for the row-level filter.
        if (prewhere_info->row_level_filter)
        {
            Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames();
            result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, row_filter_column_names));
            pre_name_set.insert(row_filter_column_names.begin(), row_filter_column_names.end());
        }

        /// 2. Columns for PREWHERE (plus anything their defaults require).
        Names all_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
        injectRequiredColumns(
            data_part_info_for_reader, storage_snapshot, with_subcolumns, all_pre_column_names);

        /// Deduplicate against the row-level-filter step.
        for (const auto & name : all_pre_column_names)
            if (pre_name_set.emplace(name).second)
                pre_column_names.push_back(name);

        /// Whatever is consumed by PREWHERE steps is removed from the main read.
        Names post_column_names;
        for (const auto & name : column_names)
            if (!pre_name_set.contains(name))
                post_column_names.push_back(name);

        column_names = std::move(post_column_names);
    }

    result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, pre_column_names));

    /// 3. The rest of the requested columns.
    result.columns = storage_snapshot->getColumnsByNames(options, column_names);
    return result;
}
2022-06-13 13:00:26 +00:00
/// Human-readable dump of the reading steps and the main column set (for debugging/logging).
std::string MergeTreeReadTaskColumns::dump() const
{
    WriteBufferFromOwnString out;
    size_t step = 0;
    for (const auto & step_columns : pre_columns)
    {
        out << "STEP " << step << ": " << step_columns.toString() << "\n";
        ++step;
    }
    out << "COLUMNS: " << columns.toString() << "\n";
    return out.str();
}
}