mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Introduce class Statistics
This commit is contained in:
parent
aa332b778a
commit
ec871cb3b6
@ -677,13 +677,13 @@ String IMergeTreeDataPart::getColumnNameWithMinimumCompressedSize(const NamesAnd
|
||||
return *minimum_size_column;
|
||||
}
|
||||
|
||||
ColumnsStatistics IMergeTreeDataPart::loadStatistics() const
|
||||
Statistics IMergeTreeDataPart::loadStatistics() const
|
||||
{
|
||||
const auto & metadata_snaphost = storage.getInMemoryMetadata();
|
||||
const auto & metadata_snapshot = storage.getInMemoryMetadata();
|
||||
|
||||
auto total_statistics = MergeTreeStatisticsFactory::instance().getMany(metadata_snaphost.getColumns());
|
||||
auto total_statistics = MergeTreeStatisticsFactory::instance().getMany(metadata_snapshot.getColumns());
|
||||
|
||||
ColumnsStatistics result;
|
||||
Statistics result(rows_count);
|
||||
for (auto & stat : total_statistics)
|
||||
{
|
||||
String file_name = stat->getFileName() + STATS_FILE_SUFFIX;
|
||||
@ -697,7 +697,7 @@ ColumnsStatistics IMergeTreeDataPart::loadStatistics() const
|
||||
auto stat_file = metadata_manager->read(file_name);
|
||||
CompressedReadBuffer compressed_buffer(*stat_file);
|
||||
stat->deserialize(compressed_buffer);
|
||||
result.push_back(stat);
|
||||
result.addColumnStat(stat);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
@ -171,7 +171,7 @@ public:
|
||||
|
||||
void remove();
|
||||
|
||||
ColumnsStatistics loadStatistics() const;
|
||||
Statistics loadStatistics() const;
|
||||
|
||||
/// Initialize columns (from columns.txt if exists, or create from column files if not).
|
||||
/// Load various metadata into memory: checksums from checksums.txt, index if required, etc.
|
||||
|
@ -488,9 +488,10 @@ ConditionSelectivityEstimator MergeTreeData::getConditionSelectivityEstimatorByP
|
||||
|
||||
ASTPtr expression_ast;
|
||||
|
||||
ConditionSelectivityEstimator result;
|
||||
PartitionPruner partition_pruner(storage_snapshot->metadata, filter_dag, local_context);
|
||||
|
||||
Statistics merged_stats;
|
||||
|
||||
if (partition_pruner.isUseless())
|
||||
{
|
||||
/// Read all partitions.
|
||||
@ -498,10 +499,7 @@ ConditionSelectivityEstimator MergeTreeData::getConditionSelectivityEstimatorByP
|
||||
try
|
||||
{
|
||||
auto stats = part->loadStatistics();
|
||||
/// TODO: We only have one stats file for every part.
|
||||
result.addRows(part->rows_count);
|
||||
for (const auto & stat : stats)
|
||||
result.merge(part->info.getPartNameV1(), stat);
|
||||
merged_stats.merge(stats);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -516,9 +514,7 @@ ConditionSelectivityEstimator MergeTreeData::getConditionSelectivityEstimatorByP
|
||||
if (!partition_pruner.canBePruned(*part))
|
||||
{
|
||||
auto stats = part->loadStatistics();
|
||||
result.addRows(part->rows_count);
|
||||
for (const auto & stat : stats)
|
||||
result.merge(part->info.getPartNameV1(), stat);
|
||||
merged_stats.merge(stats);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
@ -527,7 +523,7 @@ ConditionSelectivityEstimator MergeTreeData::getConditionSelectivityEstimatorByP
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
return ConditionSelectivityEstimator(merged_stats);
|
||||
}
|
||||
|
||||
bool MergeTreeData::supportsFinal() const
|
||||
|
@ -9,47 +9,6 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
void ConditionSelectivityEstimator::ColumnSelectivityEstimator::merge(String part_name, ColumnStatisticsPtr stats)
|
||||
{
|
||||
if (part_statistics.contains(part_name))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "part {} has been added in column {}", part_name, stats->columnName());
|
||||
part_statistics[part_name] = stats;
|
||||
}
|
||||
|
||||
Float64 ConditionSelectivityEstimator::ColumnSelectivityEstimator::estimateLess(const Field & val, Float64 rows) const
|
||||
{
|
||||
if (part_statistics.empty())
|
||||
return default_cond_range_factor * rows;
|
||||
Float64 result = 0;
|
||||
Float64 part_rows = 0;
|
||||
for (const auto & [key, estimator] : part_statistics)
|
||||
{
|
||||
result += estimator->estimateLess(val);
|
||||
part_rows += estimator->rowCount();
|
||||
}
|
||||
return result * rows / part_rows;
|
||||
}
|
||||
|
||||
Float64 ConditionSelectivityEstimator::ColumnSelectivityEstimator::estimateGreater(const Field & val, Float64 rows) const
|
||||
{
|
||||
return rows - estimateLess(val, rows);
|
||||
}
|
||||
|
||||
Float64 ConditionSelectivityEstimator::ColumnSelectivityEstimator::estimateEqual(const Field & val, Float64 rows) const
|
||||
{
|
||||
if (part_statistics.empty())
|
||||
{
|
||||
return default_cond_equal_factor * rows;
|
||||
}
|
||||
Float64 result = 0;
|
||||
Float64 partial_cnt = 0;
|
||||
for (const auto & [key, estimator] : part_statistics)
|
||||
{
|
||||
result += estimator->estimateEqual(val);
|
||||
partial_cnt += estimator->rowCount();
|
||||
}
|
||||
return result * rows / partial_cnt;
|
||||
}
|
||||
|
||||
/// second return value represents how many columns in the node.
|
||||
static std::pair<String, Int32> tryToExtractSingleColumn(const RPNBuilderTreeNode & node)
|
||||
@ -84,7 +43,7 @@ static std::pair<String, Int32> tryToExtractSingleColumn(const RPNBuilderTreeNod
|
||||
return result;
|
||||
}
|
||||
|
||||
std::pair<String, Field> ConditionSelectivityEstimator::extractBinaryOp(const RPNBuilderTreeNode & node, const String & column_name) const
|
||||
static std::pair<String, Field> extractBinaryOp(const RPNBuilderTreeNode & node, const String & column_name)
|
||||
{
|
||||
if (!node.isFunction())
|
||||
return {};
|
||||
@ -126,24 +85,16 @@ std::pair<String, Field> ConditionSelectivityEstimator::extractBinaryOp(const RP
|
||||
Float64 ConditionSelectivityEstimator::estimateRowCount(const RPNBuilderTreeNode & node) const
|
||||
{
|
||||
auto result = tryToExtractSingleColumn(node);
|
||||
auto total_rows = stats.getTotalRowCount();
|
||||
|
||||
if (result.second != 1)
|
||||
return default_unknown_cond_factor * total_rows;
|
||||
|
||||
String col = result.first;
|
||||
auto it = column_estimators.find(col);
|
||||
|
||||
/// If there the estimator of the column is not found or there are no data at all,
|
||||
/// we use dummy estimation.
|
||||
bool dummy = false;
|
||||
ColumnSelectivityEstimator estimator;
|
||||
if (it != column_estimators.end())
|
||||
estimator = it->second;
|
||||
else
|
||||
dummy = true;
|
||||
|
||||
const String & col = result.first;
|
||||
auto [op, val] = extractBinaryOp(node, col);
|
||||
|
||||
if (dummy)
|
||||
/// No statistics for column col
|
||||
if (const auto column_stat = stats.getColumnStat(col); column_stat == nullptr)
|
||||
{
|
||||
if (op == "equals")
|
||||
return default_cond_equal_factor * total_rows;
|
||||
@ -152,21 +103,18 @@ Float64 ConditionSelectivityEstimator::estimateRowCount(const RPNBuilderTreeNode
|
||||
else
|
||||
return default_unknown_cond_factor * total_rows;
|
||||
}
|
||||
|
||||
if (op == "equals")
|
||||
return estimator.estimateEqual(val, total_rows);
|
||||
else if (op == "less" || op == "lessOrEquals")
|
||||
return estimator.estimateLess(val, total_rows);
|
||||
else if (op == "greater" || op == "greaterOrEquals")
|
||||
return estimator.estimateGreater(val, total_rows);
|
||||
else
|
||||
return default_unknown_cond_factor * total_rows;
|
||||
}
|
||||
{
|
||||
if (op == "equals")
|
||||
return column_stat->estimateEqual(val);
|
||||
else if (op == "less" || op == "lessOrEquals")
|
||||
return column_stat->estimateLess(val);
|
||||
else if (op == "greater" || op == "greaterOrEquals")
|
||||
return column_stat->estimateGreater(val);
|
||||
else
|
||||
return default_unknown_cond_factor * total_rows;
|
||||
}
|
||||
|
||||
void ConditionSelectivityEstimator::merge(String part_name, ColumnStatisticsPtr column_stat)
|
||||
{
|
||||
if (column_stat != nullptr)
|
||||
column_estimators[column_stat->columnName()].merge(part_name, column_stat);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,7 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/Statistics/Statistics.h>
|
||||
#include <Core/Field.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -12,39 +11,22 @@ class RPNBuilderTreeNode;
|
||||
class ConditionSelectivityEstimator
|
||||
{
|
||||
public:
|
||||
ConditionSelectivityEstimator() : stats(Statistics()) {}
|
||||
explicit ConditionSelectivityEstimator(const Statistics & stats_) : stats(stats_) {}
|
||||
|
||||
/// TODO: Support the condition consists of CNF/DNF like (cond1 and cond2) or (cond3) ...
|
||||
/// Right now we only support simple condition like col = val / col < val
|
||||
Float64 estimateRowCount(const RPNBuilderTreeNode & node) const;
|
||||
|
||||
void merge(String part_name, ColumnStatisticsPtr column_stat);
|
||||
void addRows(UInt64 part_rows) { total_rows += part_rows; }
|
||||
|
||||
private:
|
||||
friend class ColumnStatistics;
|
||||
struct ColumnSelectivityEstimator
|
||||
{
|
||||
/// We store the part_name and part_statistics.
|
||||
/// then simply get selectivity for every part_statistics and combine them.
|
||||
std::map<String, ColumnStatisticsPtr> part_statistics;
|
||||
|
||||
void merge(String part_name, ColumnStatisticsPtr stats);
|
||||
|
||||
Float64 estimateLess(const Field & val, Float64 rows) const;
|
||||
|
||||
Float64 estimateGreater(const Field & val, Float64 rows) const;
|
||||
|
||||
Float64 estimateEqual(const Field & val, Float64 rows) const;
|
||||
};
|
||||
|
||||
std::pair<String, Field> extractBinaryOp(const RPNBuilderTreeNode & node, const String & column_name) const;
|
||||
|
||||
/// Used to estimate the selectivity of a condition when there is no statistics.
|
||||
static constexpr auto default_cond_range_factor = 0.5;
|
||||
static constexpr auto default_cond_equal_factor = 0.01;
|
||||
static constexpr auto default_unknown_cond_factor = 1;
|
||||
|
||||
UInt64 total_rows = 0;
|
||||
std::map<String, ColumnSelectivityEstimator> column_estimators;
|
||||
Statistics stats;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -71,6 +71,18 @@ void ColumnStatistics::update(const ColumnPtr & column)
|
||||
stat.second->update(column);
|
||||
}
|
||||
|
||||
void ColumnStatistics::merge(const ColumnStatistics & other)
|
||||
{
|
||||
rows += other.rows;
|
||||
for (const auto & [type, stat] : other.stats)
|
||||
{
|
||||
if (!stats.contains(type))
|
||||
stats[type] = stat;
|
||||
else
|
||||
stats[type]->merge(stat);
|
||||
}
|
||||
}
|
||||
|
||||
UInt64 ISingleStatistics::estimateCardinality() const
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cardinality estimation is not implemented for this type of statistics");
|
||||
@ -195,6 +207,30 @@ UInt64 ColumnStatistics::rowCount() const
|
||||
return rows;
|
||||
}
|
||||
|
||||
void Statistics::merge(const Statistics & other)
|
||||
{
|
||||
total_row_count += other.total_row_count;
|
||||
for (const auto & [column_name, column_stat] : other.column_stats)
|
||||
{
|
||||
if (!column_stats.contains(column_name))
|
||||
column_stats[column_name]->merge(*column_stat);
|
||||
else
|
||||
column_stats[column_name]= column_stat;
|
||||
}
|
||||
}
|
||||
|
||||
void Statistics::addColumnStat(const ColumnStatisticsPtr & column_stat_)
|
||||
{
|
||||
column_stats[column_stat_->columnName()] = column_stat_;
|
||||
}
|
||||
|
||||
/// Returns the statistic stored for `column_name`, or nullptr when there is none.
ColumnStatisticsPtr Statistics::getColumnStat(const String & column_name) const
{
    const auto found = column_stats.find(column_name);
    if (found == column_stats.end())
        return nullptr;

    return found->second;
}
|
||||
|
||||
void MergeTreeStatisticsFactory::registerCreator(SingleStatisticsType stats_type, Creator creator)
|
||||
{
|
||||
if (!creators.emplace(stats_type, std::move(creator)).second)
|
||||
|
@ -19,6 +19,9 @@ struct StatisticsUtils
|
||||
static std::optional<Float64> tryConvertToFloat64(const Field & value, const DataTypePtr & data_type);
|
||||
};
|
||||
|
||||
class ISingleStatistics;
|
||||
using SingleStatisticsPtr = std::shared_ptr<ISingleStatistics>;
|
||||
|
||||
/// SingleTypeStatistics describe properties of the values in the column,
|
||||
/// e.g. how many unique values exist,
|
||||
/// what are the N most frequent values,
|
||||
@ -30,6 +33,7 @@ public:
|
||||
virtual ~ISingleStatistics() = default;
|
||||
|
||||
virtual void update(const ColumnPtr & column) = 0;
|
||||
virtual void merge(const SingleStatisticsPtr & other) = 0;
|
||||
|
||||
virtual void serialize(WriteBuffer & buf) = 0;
|
||||
virtual void deserialize(ReadBuffer & buf) = 0;
|
||||
@ -43,11 +47,12 @@ public:
|
||||
virtual Float64 estimateEqual(const Field & val) const; /// cardinality of val in the column
|
||||
virtual Float64 estimateLess(const Field & val) const; /// summarized cardinality of values < val in the column
|
||||
|
||||
String getTypeName() const { return stat.getTypeName(); }
|
||||
|
||||
protected:
|
||||
SingleStatisticsDescription stat;
|
||||
};
|
||||
|
||||
using SingleStatisticsPtr = std::shared_ptr<ISingleStatistics>;
|
||||
|
||||
class ColumnStatistics
|
||||
{
|
||||
@ -63,6 +68,7 @@ public:
|
||||
UInt64 rowCount() const;
|
||||
|
||||
void update(const ColumnPtr & column);
|
||||
void merge(const ColumnStatistics & other);
|
||||
|
||||
Float64 estimateLess(const Field & val) const;
|
||||
Float64 estimateGreater(const Field & val) const;
|
||||
@ -81,6 +87,36 @@ class ColumnsDescription;
|
||||
using ColumnStatisticsPtr = std::shared_ptr<ColumnStatistics>;
|
||||
using ColumnsStatistics = std::vector<ColumnStatisticsPtr>;
|
||||
|
||||
/**
|
||||
* Statistics represent the statistics for a part, table or node in query plan.
|
||||
* It is a computable data structure which means:
|
||||
* 1. it can accumulate statistics for a dedicated column of multiple parts.
|
||||
* 2. it can support more calculations which is needed in estimating complex predicates like `a > 1 and a < 100 or a in (1, 3, 500).
|
||||
* For example column 'a' has a minmax statistics with range [1, 100], after estimate 'a>50' the range is [51, 100].
|
||||
*/
|
||||
class Statistics
|
||||
{
|
||||
public:
|
||||
explicit Statistics() : total_row_count(0) { }
|
||||
explicit Statistics(Float64 total_row_count_) : total_row_count(total_row_count_) { }
|
||||
explicit Statistics(Float64 total_row_count_, const std::unordered_map<String, ColumnStatisticsPtr> & column_stats_)
|
||||
: total_row_count(total_row_count_), column_stats(column_stats_)
|
||||
{
|
||||
}
|
||||
|
||||
void merge(const Statistics & other);
|
||||
void addColumnStat(const ColumnStatisticsPtr & column_stat_);
|
||||
|
||||
Float64 getTotalRowCount() const { return total_row_count; }
|
||||
ColumnStatisticsPtr getColumnStat(const String & column_name) const;
|
||||
|
||||
private:
|
||||
Float64 total_row_count;
|
||||
std::unordered_map<String, ColumnStatisticsPtr> column_stats;
|
||||
};
|
||||
|
||||
using StatisticsPtr = std::shared_ptr<Statistics>;
|
||||
|
||||
class MergeTreeStatisticsFactory : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
|
@ -63,6 +63,14 @@ void StatisticsCountMinSketch::update(const ColumnPtr & column)
|
||||
}
|
||||
}
|
||||
|
||||
void StatisticsCountMinSketch::merge(const SingleStatisticsPtr & other)
|
||||
{
|
||||
if (const auto * other_stat = dynamic_cast<const StatisticsCountMinSketch *>(other.get()))
|
||||
sketch.merge(other_stat->sketch);
|
||||
else
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to merge statistics of type {} to CountMin statistics", toString(other->getTypeName()));
|
||||
}
|
||||
|
||||
void StatisticsCountMinSketch::serialize(WriteBuffer & buf)
|
||||
{
|
||||
Sketch::vector_bytes bytes = sketch.serialize();
|
||||
|
@ -19,6 +19,7 @@ public:
|
||||
Float64 estimateEqual(const Field & val) const override;
|
||||
|
||||
void update(const ColumnPtr & column) override;
|
||||
void merge(const SingleStatisticsPtr & other) override;
|
||||
|
||||
void serialize(WriteBuffer & buf) override;
|
||||
void deserialize(ReadBuffer & buf) override;
|
||||
|
@ -35,6 +35,18 @@ void StatisticsMinMax::update(const ColumnPtr & column)
|
||||
row_count += column->size();
|
||||
}
|
||||
|
||||
void StatisticsMinMax::merge(const SingleStatisticsPtr & other)
|
||||
{
|
||||
if (const auto * other_stat = dynamic_cast<const StatisticsMinMax *>(other.get()))
|
||||
{
|
||||
min = std::min(other_stat->min, min);
|
||||
max = std::min(other_stat->max, max);
|
||||
row_count += other_stat->row_count;
|
||||
}
|
||||
else
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to merge statistics of type {} to MinMax statistics", toString(other->getTypeName()));
|
||||
}
|
||||
|
||||
void StatisticsMinMax::serialize(WriteBuffer & buf)
|
||||
{
|
||||
writeIntBinary(row_count, buf);
|
||||
|
@ -13,6 +13,7 @@ public:
|
||||
StatisticsMinMax(const SingleStatisticsDescription & statistics_description, const DataTypePtr & data_type_);
|
||||
|
||||
void update(const ColumnPtr & column) override;
|
||||
void merge(const SingleStatisticsPtr & other) override;
|
||||
|
||||
void serialize(WriteBuffer & buf) override;
|
||||
void deserialize(ReadBuffer & buf) override;
|
||||
|
@ -27,6 +27,14 @@ void StatisticsTDigest::update(const ColumnPtr & column)
|
||||
}
|
||||
}
|
||||
|
||||
void StatisticsTDigest::merge(const SingleStatisticsPtr & other)
|
||||
{
|
||||
if (const auto * other_stat = dynamic_cast<const StatisticsTDigest *>(other.get()))
|
||||
t_digest.merge(other_stat->t_digest);
|
||||
else
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to merge statistics of type {} to TDigest statistics", toString(other->getTypeName()));
|
||||
}
|
||||
|
||||
void StatisticsTDigest::serialize(WriteBuffer & buf)
|
||||
{
|
||||
t_digest.serialize(buf);
|
||||
|
@ -12,6 +12,7 @@ public:
|
||||
explicit StatisticsTDigest(const SingleStatisticsDescription & description, const DataTypePtr & data_type_);
|
||||
|
||||
void update(const ColumnPtr & column) override;
|
||||
void merge(const SingleStatisticsPtr & other) override;
|
||||
|
||||
void serialize(WriteBuffer & buf) override;
|
||||
void deserialize(ReadBuffer & buf) override;
|
||||
|
@ -35,6 +35,14 @@ void StatisticsUniq::update(const ColumnPtr & column)
|
||||
collector->addBatchSinglePlace(0, column->size(), data, &(raw_ptr), nullptr);
|
||||
}
|
||||
|
||||
void StatisticsUniq::merge(const SingleStatisticsPtr & other)
|
||||
{
|
||||
if (const auto * other_stat = dynamic_cast<const StatisticsUniq *>(other.get()))
|
||||
collector->merge(other_stat->data, data, arena.get());
|
||||
else
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to merge statistics of type {} to Uniq statistics", toString(other->getTypeName()));
|
||||
}
|
||||
|
||||
void StatisticsUniq::serialize(WriteBuffer & buf)
|
||||
{
|
||||
collector->serialize(data, buf);
|
||||
|
@ -14,6 +14,7 @@ public:
|
||||
~StatisticsUniq() override;
|
||||
|
||||
void update(const ColumnPtr & column) override;
|
||||
void merge(const SingleStatisticsPtr & other) override;
|
||||
|
||||
void serialize(WriteBuffer & buf) override;
|
||||
void deserialize(ReadBuffer & buf) override;
|
||||
|
Loading…
Reference in New Issue
Block a user