Merge branch 'fix-system-numbers-rows-approx' into tighten-limits-functional-tests

This commit is contained in:
Alexey Milovidov 2024-07-24 07:50:02 +02:00
commit d7994cb317
34 changed files with 503 additions and 134 deletions

View File

@ -9,6 +9,7 @@ set(DATASKETCHES_LIBRARY theta)
add_library(_datasketches INTERFACE)
target_include_directories(_datasketches SYSTEM BEFORE INTERFACE
"${ClickHouse_SOURCE_DIR}/contrib/datasketches-cpp/common/include"
"${ClickHouse_SOURCE_DIR}/contrib/datasketches-cpp/count/include"
"${ClickHouse_SOURCE_DIR}/contrib/datasketches-cpp/theta/include")
add_library(ch_contrib::datasketches ALIAS _datasketches)

View File

@ -999,6 +999,10 @@ They can be used for prewhere optimization only if we enable `set allow_statisti
[HyperLogLog](https://en.wikipedia.org/wiki/HyperLogLog) sketches which provide an estimation how many distinct values a column contains.
- `count_min`
[Count-min](https://en.wikipedia.org/wiki/Count%E2%80%93min_sketch) sketches which provide an approximate count of the frequency of each value in a column.
## Column-level Settings {#column-level-settings}
Certain MergeTree settings can be override at column level:

View File

@ -543,7 +543,7 @@ if (TARGET ch_contrib::libpqxx)
endif()
if (TARGET ch_contrib::datasketches)
target_link_libraries (clickhouse_aggregate_functions PRIVATE ch_contrib::datasketches)
dbms_target_link_libraries(PUBLIC ch_contrib::datasketches)
endif ()
target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::lz4)

View File

@ -1,5 +1,8 @@
#pragma once
/// CLion freezes for a minute on every keypress in any file including this.
#if !defined(__CLION_IDE__)
#include <Common/NamePrompter.h>
#include <Core/BaseSettings.h>
#include <Core/SettingsEnums.h>
@ -1348,3 +1351,5 @@ struct FormatFactorySettings : public BaseSettings<FormatFactorySettingsTraits>
};
}
#endif

View File

@ -545,7 +545,7 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type, const ID
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNEXPECTED_DATA_AFTER_PARSED_VALUE)
throw Exception(ErrorCodes::TYPE_MISMATCH, "Cannot convert string {} to type {}", src.get<String>(), type.getName());
throw Exception(ErrorCodes::TYPE_MISMATCH, "Cannot convert string '{}' to type {}", src.get<String>(), type.getName());
e.addMessage(fmt::format("while converting '{}' to {}", src.get<String>(), type.getName()));
throw;

View File

@ -147,7 +147,7 @@ INSTANTIATE_TEST_SUITE_P(
DecimalField(DateTime64(123 * Day * 1'000'000), 6)
}
})
);
);
INSTANTIATE_TEST_SUITE_P(
DateTimeToDateTime64,
@ -179,3 +179,84 @@ INSTANTIATE_TEST_SUITE_P(
},
})
);
INSTANTIATE_TEST_SUITE_P(
StringToNumber,
ConvertFieldToTypeTest,
::testing::ValuesIn(std::initializer_list<ConvertFieldToTypeTestParams>{
{
"String",
Field("1"),
"Int8",
Field(1)
},
{
"String",
Field("256"),
"Int8",
Field()
},
{
"String",
Field("not a number"),
"Int8",
{}
},
{
"String",
Field("1.1"),
"Int8",
{} /// we can not convert '1.1' to Int8
},
{
"String",
Field("1.1"),
"Float64",
Field(1.1)
},
})
);
INSTANTIATE_TEST_SUITE_P(
NumberToString,
ConvertFieldToTypeTest,
::testing::ValuesIn(std::initializer_list<ConvertFieldToTypeTestParams>{
{
"Int8",
Field(1),
"String",
Field("1")
},
{
"Int8",
Field(-1),
"String",
Field("-1")
},
{
"Float64",
Field(1.1),
"String",
Field("1.1")
},
})
);
INSTANTIATE_TEST_SUITE_P(
StringToDate,
ConvertFieldToTypeTest,
::testing::ValuesIn(std::initializer_list<ConvertFieldToTypeTestParams>{
{
"String",
Field("2024-07-12"),
"Date",
Field(static_cast<UInt16>(19916))
},
{
"String",
Field("not a date"),
"Date",
{}
},
})
);

View File

@ -9,7 +9,7 @@ namespace DB
{
/** The SELECT subquery is in parenthesis.
/** The SELECT subquery, in parentheses.
*/
class ParserSubquery : public IParserBase
{

View File

@ -11,15 +11,12 @@
namespace DB
{
bool ParserDescribeTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_describe(Keyword::DESCRIBE);
ParserKeyword s_desc(Keyword::DESC);
ParserKeyword s_table(Keyword::TABLE);
ParserKeyword s_settings(Keyword::SETTINGS);
ParserToken s_dot(TokenType::Dot);
ParserIdentifier name_p;
ParserSetQuery parser_settings(true);
ASTPtr database;
@ -53,5 +50,4 @@ bool ParserDescribeTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & ex
return true;
}
}

View File

@ -596,12 +596,12 @@ Pipe ReadFromSystemNumbersStep::makePipe()
numbers_storage.step,
step_between_chunks);
if (numbers_storage.limit && i == 0)
if (end && i == 0)
{
auto rows_appr = itemCountInRange(numbers_storage.offset, *numbers_storage.limit, numbers_storage.step);
if (limit > 0 && limit < rows_appr)
rows_appr = query_info_limit;
source->addTotalRowsApprox(rows_appr);
UInt64 rows_approx = itemCountInRange(numbers_storage.offset, *end, numbers_storage.step);
if (limit > 0 && limit < rows_approx)
rows_approx = query_info_limit;
source->addTotalRowsApprox(rows_approx);
}
pipe.addSource(std::move(source));

View File

@ -304,7 +304,7 @@ void RefreshTask::refreshTask()
{
PreformattedMessage message = getCurrentExceptionMessageAndPattern(true);
auto text = message.text;
message.text = fmt::format("Refresh failed: {}", message.text);
message.text = fmt::format("Refresh view {} failed: {}", view->getStorageID().getFullTableName(), message.text);
LOG_ERROR(log, message);
exception = text;
}

View File

@ -499,8 +499,9 @@ ConditionSelectivityEstimator MergeTreeData::getConditionSelectivityEstimatorByP
{
auto stats = part->loadStatistics();
/// TODO: We only have one stats file for every part.
result.addRows(part->rows_count);
for (const auto & stat : stats)
result.merge(part->info.getPartNameV1(), part->rows_count, stat);
result.merge(part->info.getPartNameV1(), stat);
}
catch (...)
{
@ -515,8 +516,9 @@ ConditionSelectivityEstimator MergeTreeData::getConditionSelectivityEstimatorByP
if (!partition_pruner.canBePruned(*part))
{
auto stats = part->loadStatistics();
result.addRows(part->rows_count);
for (const auto & stat : stats)
result.merge(part->info.getPartNameV1(), part->rows_count, stat);
result.merge(part->info.getPartNameV1(), stat);
}
}
catch (...)

View File

@ -16,7 +16,7 @@ void ConditionSelectivityEstimator::ColumnSelectivityEstimator::merge(String par
part_statistics[part_name] = stats;
}
Float64 ConditionSelectivityEstimator::ColumnSelectivityEstimator::estimateLess(Float64 val, Float64 rows) const
Float64 ConditionSelectivityEstimator::ColumnSelectivityEstimator::estimateLess(const Field & val, Float64 rows) const
{
if (part_statistics.empty())
return default_normal_cond_factor * rows;
@ -30,16 +30,19 @@ Float64 ConditionSelectivityEstimator::ColumnSelectivityEstimator::estimateLess(
return result * rows / part_rows;
}
Float64 ConditionSelectivityEstimator::ColumnSelectivityEstimator::estimateGreater(Float64 val, Float64 rows) const
Float64 ConditionSelectivityEstimator::ColumnSelectivityEstimator::estimateGreater(const Field & val, Float64 rows) const
{
return rows - estimateLess(val, rows);
}
Float64 ConditionSelectivityEstimator::ColumnSelectivityEstimator::estimateEqual(Float64 val, Float64 rows) const
Float64 ConditionSelectivityEstimator::ColumnSelectivityEstimator::estimateEqual(const Field & val, Float64 rows) const
{
if (part_statistics.empty())
{
if (val < - threshold || val > threshold)
auto float_val = StatisticsUtils::tryConvertToFloat64(val);
if (!float_val)
return default_unknown_cond_factor * rows;
else if (float_val.value() < - threshold || float_val.value() > threshold)
return default_normal_cond_factor * rows;
else
return default_good_cond_factor * rows;
@ -87,7 +90,7 @@ static std::pair<String, Int32> tryToExtractSingleColumn(const RPNBuilderTreeNod
return result;
}
std::pair<String, Float64> ConditionSelectivityEstimator::extractBinaryOp(const RPNBuilderTreeNode & node, const String & column_name) const
std::pair<String, Field> ConditionSelectivityEstimator::extractBinaryOp(const RPNBuilderTreeNode & node, const String & column_name) const
{
if (!node.isFunction())
return {};
@ -123,48 +126,35 @@ std::pair<String, Float64> ConditionSelectivityEstimator::extractBinaryOp(const
DataTypePtr output_type;
if (!constant_node->tryGetConstant(output_value, output_type))
return {};
const auto type = output_value.getType();
Float64 value;
if (type == Field::Types::Int64)
value = output_value.get<Int64>();
else if (type == Field::Types::UInt64)
value = output_value.get<UInt64>();
else if (type == Field::Types::Float64)
value = output_value.get<Float64>();
else
return {};
return std::make_pair(function_name, value);
return std::make_pair(function_name, output_value);
}
Float64 ConditionSelectivityEstimator::estimateRowCount(const RPNBuilderTreeNode & node) const
{
auto result = tryToExtractSingleColumn(node);
if (result.second != 1)
{
return default_unknown_cond_factor;
}
return default_unknown_cond_factor * total_rows;
String col = result.first;
auto it = column_estimators.find(col);
/// If there the estimator of the column is not found or there are no data at all,
/// we use dummy estimation.
bool dummy = total_rows == 0;
bool dummy = false;
ColumnSelectivityEstimator estimator;
if (it != column_estimators.end())
{
estimator = it->second;
}
else
{
dummy = true;
}
auto [op, val] = extractBinaryOp(node, col);
if (op == "equals")
{
if (dummy)
{
if (val < - threshold || val > threshold)
auto float_val = StatisticsUtils::tryConvertToFloat64(val);
if (!float_val || (float_val < - threshold || float_val > threshold))
return default_normal_cond_factor * total_rows;
else
return default_good_cond_factor * total_rows;
@ -187,13 +177,8 @@ Float64 ConditionSelectivityEstimator::estimateRowCount(const RPNBuilderTreeNode
return default_unknown_cond_factor * total_rows;
}
void ConditionSelectivityEstimator::merge(String part_name, UInt64 part_rows, ColumnStatisticsPtr column_stat)
void ConditionSelectivityEstimator::merge(String part_name, ColumnStatisticsPtr column_stat)
{
if (!part_names.contains(part_name))
{
total_rows += part_rows;
part_names.insert(part_name);
}
if (column_stat != nullptr)
column_estimators[column_stat->columnName()].merge(part_name, column_stat);
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Storages/Statistics/Statistics.h>
#include <Core/Field.h>
namespace DB
{
@ -10,6 +11,14 @@ class RPNBuilderTreeNode;
/// It estimates the selectivity of a condition.
class ConditionSelectivityEstimator
{
public:
/// TODO: Support the condition consists of CNF/DNF like (cond1 and cond2) or (cond3) ...
/// Right now we only support simple condition like col = val / col < val
Float64 estimateRowCount(const RPNBuilderTreeNode & node) const;
void merge(String part_name, ColumnStatisticsPtr column_stat);
void addRows(UInt64 part_rows) { total_rows += part_rows; }
private:
friend class ColumnStatistics;
struct ColumnSelectivityEstimator
@ -20,13 +29,15 @@ private:
void merge(String part_name, ColumnStatisticsPtr stats);
Float64 estimateLess(Float64 val, Float64 rows) const;
Float64 estimateLess(const Field & val, Float64 rows) const;
Float64 estimateGreater(Float64 val, Float64 rows) const;
Float64 estimateGreater(const Field & val, Float64 rows) const;
Float64 estimateEqual(Float64 val, Float64 rows) const;
Float64 estimateEqual(const Field & val, Float64 rows) const;
};
std::pair<String, Field> extractBinaryOp(const RPNBuilderTreeNode & node, const String & column_name) const;
static constexpr auto default_good_cond_factor = 0.1;
static constexpr auto default_normal_cond_factor = 0.5;
static constexpr auto default_unknown_cond_factor = 1.0;
@ -35,16 +46,7 @@ private:
static constexpr auto threshold = 2;
UInt64 total_rows = 0;
std::set<String> part_names;
std::map<String, ColumnSelectivityEstimator> column_estimators;
std::pair<String, Float64> extractBinaryOp(const RPNBuilderTreeNode & node, const String & column_name) const;
public:
/// TODO: Support the condition consists of CNF/DNF like (cond1 and cond2) or (cond3) ...
/// Right now we only support simple condition like col = val / col < val
Float64 estimateRowCount(const RPNBuilderTreeNode & node) const;
void merge(String part_name, UInt64 part_rows, ColumnStatisticsPtr column_stat);
};
}

View File

@ -1,15 +1,18 @@
#include <Storages/Statistics/Statistics.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/Statistics/ConditionSelectivityEstimator.h>
#include <Storages/Statistics/StatisticsCountMinSketch.h>
#include <Storages/Statistics/StatisticsTDigest.h>
#include <Storages/Statistics/StatisticsUniq.h>
#include <Storages/StatisticsDescription.h>
#include <Storages/ColumnsDescription.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include "config.h" /// USE_DATASKETCHES
namespace DB
{
@ -24,6 +27,36 @@ enum StatisticsFileVersion : UInt16
V0 = 0,
};
std::optional<Float64> StatisticsUtils::tryConvertToFloat64(const Field & field)
{
switch (field.getType())
{
case Field::Types::Int64:
return field.get<Int64>();
case Field::Types::UInt64:
return field.get<UInt64>();
case Field::Types::Float64:
return field.get<Float64>();
case Field::Types::Int128:
return field.get<Int128>();
case Field::Types::UInt128:
return field.get<UInt128>();
case Field::Types::Int256:
return field.get<Int256>();
case Field::Types::UInt256:
return field.get<UInt256>();
default:
return {};
}
}
std::optional<String> StatisticsUtils::tryConvertToString(const DB::Field & field)
{
if (field.getType() == Field::Types::String)
return field.get<String>();
return {};
}
IStatistics::IStatistics(const SingleStatisticsDescription & stat_)
: stat(stat_)
{
@ -46,12 +79,12 @@ UInt64 IStatistics::estimateCardinality() const
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cardinality estimation is not implemented for this type of statistics");
}
Float64 IStatistics::estimateEqual(Float64 /*val*/) const
Float64 IStatistics::estimateEqual(const Field & /*val*/) const
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Equality estimation is not implemented for this type of statistics");
}
Float64 IStatistics::estimateLess(Float64 /*val*/) const
Float64 IStatistics::estimateLess(const Field & /*val*/) const
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Less-than estimation is not implemented for this type of statistics");
}
@ -66,27 +99,32 @@ Float64 IStatistics::estimateLess(Float64 /*val*/) const
/// For that reason, all estimation are performed in a central place (here), and we don't simply pass the predicate to the first statistics
/// object that supports it natively.
Float64 ColumnStatistics::estimateLess(Float64 val) const
Float64 ColumnStatistics::estimateLess(const Field & val) const
{
if (stats.contains(StatisticsType::TDigest))
return stats.at(StatisticsType::TDigest)->estimateLess(val);
return rows * ConditionSelectivityEstimator::default_normal_cond_factor;
}
Float64 ColumnStatistics::estimateGreater(Float64 val) const
Float64 ColumnStatistics::estimateGreater(const Field & val) const
{
return rows - estimateLess(val);
}
Float64 ColumnStatistics::estimateEqual(Float64 val) const
Float64 ColumnStatistics::estimateEqual(const Field & val) const
{
if (stats.contains(StatisticsType::Uniq) && stats.contains(StatisticsType::TDigest))
auto float_val = StatisticsUtils::tryConvertToFloat64(val);
if (float_val.has_value() && stats.contains(StatisticsType::Uniq) && stats.contains(StatisticsType::TDigest))
{
/// 2048 is the default number of buckets in TDigest. In this case, TDigest stores exactly one value (with many rows) for every bucket.
if (stats.at(StatisticsType::Uniq)->estimateCardinality() < 2048)
return stats.at(StatisticsType::TDigest)->estimateEqual(val);
}
if (val < - ConditionSelectivityEstimator::threshold || val > ConditionSelectivityEstimator::threshold)
#if USE_DATASKETCHES
if (stats.contains(StatisticsType::CountMinSketch))
return stats.at(StatisticsType::CountMinSketch)->estimateEqual(val);
#endif
if (!float_val.has_value() && (float_val < - ConditionSelectivityEstimator::threshold || float_val > ConditionSelectivityEstimator::threshold))
return rows * ConditionSelectivityEstimator::default_normal_cond_factor;
else
return rows * ConditionSelectivityEstimator::default_good_cond_factor;
@ -166,11 +204,16 @@ void MergeTreeStatisticsFactory::registerValidator(StatisticsType stats_type, Va
MergeTreeStatisticsFactory::MergeTreeStatisticsFactory()
{
registerValidator(StatisticsType::TDigest, TDigestValidator);
registerCreator(StatisticsType::TDigest, TDigestCreator);
registerValidator(StatisticsType::TDigest, tdigestValidator);
registerCreator(StatisticsType::TDigest, tdigestCreator);
registerValidator(StatisticsType::Uniq, UniqValidator);
registerCreator(StatisticsType::Uniq, UniqCreator);
registerValidator(StatisticsType::Uniq, uniqValidator);
registerCreator(StatisticsType::Uniq, uniqCreator);
#if USE_DATASKETCHES
registerValidator(StatisticsType::CountMinSketch, countMinSketchValidator);
registerCreator(StatisticsType::CountMinSketch, countMinSketchCreator);
#endif
}
MergeTreeStatisticsFactory & MergeTreeStatisticsFactory::instance()
@ -197,7 +240,7 @@ ColumnStatisticsPtr MergeTreeStatisticsFactory::get(const ColumnStatisticsDescri
{
auto it = creators.find(type);
if (it == creators.end())
throw Exception(ErrorCodes::INCORRECT_QUERY, "Unknown statistic type '{}'. Available types: 'tdigest' 'uniq'", type);
throw Exception(ErrorCodes::INCORRECT_QUERY, "Unknown statistic type '{}'. Available types: 'tdigest' 'uniq' and 'count_min'", type);
auto stat_ptr = (it->second)(desc, stats.data_type);
column_stat->stats[type] = stat_ptr;
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Core/Block.h>
#include <Core/Field.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <Storages/StatisticsDescription.h>
@ -13,6 +14,14 @@ namespace DB
constexpr auto STATS_FILE_PREFIX = "statistics_";
constexpr auto STATS_FILE_SUFFIX = ".stats";
struct StatisticsUtils
{
/// Returns std::nullopt if input Field cannot be converted to a concrete value
static std::optional<Float64> tryConvertToFloat64(const Field & field);
static std::optional<String> tryConvertToString(const Field & field);
};
/// Statistics describe properties of the values in the column,
/// e.g. how many unique values exist,
/// what are the N most frequent values,
@ -34,8 +43,8 @@ public:
/// Per-value estimations.
/// Throws if the statistics object is not able to do a meaningful estimation.
virtual Float64 estimateEqual(Float64 val) const; /// cardinality of val in the column
virtual Float64 estimateLess(Float64 val) const; /// summarized cardinality of values < val in the column
virtual Float64 estimateEqual(const Field & val) const; /// cardinality of val in the column
virtual Float64 estimateLess(const Field & val) const; /// summarized cardinality of values < val in the column
protected:
SingleStatisticsDescription stat;
@ -58,9 +67,9 @@ public:
void update(const ColumnPtr & column);
Float64 estimateLess(Float64 val) const;
Float64 estimateGreater(Float64 val) const;
Float64 estimateEqual(Float64 val) const;
Float64 estimateLess(const Field & val) const;
Float64 estimateGreater(const Field & val) const;
Float64 estimateEqual(const Field & val) const;
private:
friend class MergeTreeStatisticsFactory;

View File

@ -0,0 +1,102 @@
#include <Storages/Statistics/StatisticsCountMinSketch.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/convertFieldToType.h>
#if USE_DATASKETCHES
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_STATISTICS;
}
/// Constants chosen based on rolling dices.
/// The values provides:
/// 1. an error tolerance of 0.1% (ε = 0.001)
/// 2. a confidence level of 99.9% (δ = 0.001).
/// And sketch the size is 152kb.
static constexpr auto num_hashes = 7uz;
static constexpr auto num_buckets = 2718uz;
StatisticsCountMinSketch::StatisticsCountMinSketch(const SingleStatisticsDescription & stat_, DataTypePtr data_type_)
: IStatistics(stat_)
, sketch(num_hashes, num_buckets)
, data_type(data_type_)
{
}
Float64 StatisticsCountMinSketch::estimateEqual(const Field & val) const
{
/// Try to convert field to data_type. Converting string to proper data types such as: number, date, datetime, IPv4, Decimal etc.
/// Return null if val larger than the range of data_type
///
/// For example: if data_type is Int32:
/// 1. For 1.0, 1, '1', return Field(1)
/// 2. For 1.1, max_value_int64, return null
Field val_converted = convertFieldToType(val, *data_type);
if (val_converted.isNull())
return 0;
if (data_type->isValueRepresentedByNumber())
return sketch.get_estimate(&val_converted, data_type->getSizeOfValueInMemory());
if (isStringOrFixedString(data_type))
return sketch.get_estimate(val.get<String>());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Statistics 'count_min' does not support estimate data type of {}", data_type->getName());
}
void StatisticsCountMinSketch::update(const ColumnPtr & column)
{
for (size_t row = 0; row < column->size(); ++row)
{
if (column->isNullAt(row))
continue;
auto data = column->getDataAt(row);
sketch.update(data.data, data.size, 1);
}
}
void StatisticsCountMinSketch::serialize(WriteBuffer & buf)
{
Sketch::vector_bytes bytes = sketch.serialize();
writeIntBinary(static_cast<UInt64>(bytes.size()), buf);
buf.write(reinterpret_cast<const char *>(bytes.data()), bytes.size());
}
void StatisticsCountMinSketch::deserialize(ReadBuffer & buf)
{
UInt64 size;
readIntBinary(size, buf);
Sketch::vector_bytes bytes;
bytes.resize(size); /// To avoid 'container-overflow' in AddressSanitizer checking
buf.readStrict(reinterpret_cast<char *>(bytes.data()), size);
sketch = Sketch::deserialize(bytes.data(), size);
}
void countMinSketchValidator(const SingleStatisticsDescription &, DataTypePtr data_type)
{
data_type = removeNullable(data_type);
data_type = removeLowCardinalityAndNullable(data_type);
if (!data_type->isValueRepresentedByNumber() && !isStringOrFixedString(data_type))
throw Exception(ErrorCodes::ILLEGAL_STATISTICS, "Statistics of type 'count_min' does not support type {}", data_type->getName());
}
StatisticsPtr countMinSketchCreator(const SingleStatisticsDescription & stat, DataTypePtr data_type)
{
return std::make_shared<StatisticsCountMinSketch>(stat, data_type);
}
}
#endif

View File

@ -0,0 +1,39 @@
#pragma once
#include <Storages/Statistics/Statistics.h>
#include "config.h"
#if USE_DATASKETCHES
#include <count_min.hpp>
namespace DB
{
class StatisticsCountMinSketch : public IStatistics
{
public:
StatisticsCountMinSketch(const SingleStatisticsDescription & stat_, DataTypePtr data_type_);
Float64 estimateEqual(const Field & val) const override;
void update(const ColumnPtr & column) override;
void serialize(WriteBuffer & buf) override;
void deserialize(ReadBuffer & buf) override;
private:
using Sketch = datasketches::count_min_sketch<UInt64>;
Sketch sketch;
DataTypePtr data_type;
};
void countMinSketchValidator(const SingleStatisticsDescription &, DataTypePtr data_type);
StatisticsPtr countMinSketchCreator(const SingleStatisticsDescription & stat, DataTypePtr);
}
#endif

View File

@ -1,11 +1,13 @@
#include <Storages/Statistics/StatisticsTDigest.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_STATISTICS;
extern const int ILLEGAL_STATISTICS;
extern const int LOGICAL_ERROR;
}
StatisticsTDigest::StatisticsTDigest(const SingleStatisticsDescription & stat_)
@ -16,12 +18,16 @@ StatisticsTDigest::StatisticsTDigest(const SingleStatisticsDescription & stat_)
void StatisticsTDigest::update(const ColumnPtr & column)
{
size_t rows = column->size();
for (size_t row = 0; row < rows; ++row)
{
/// TODO: support more types.
Float64 value = column->getFloat64(row);
t_digest.add(value, 1);
Field field;
column->get(row, field);
if (field.isNull())
continue;
if (auto field_as_float = StatisticsUtils::tryConvertToFloat64(field))
t_digest.add(*field_as_float, 1);
}
}
@ -35,24 +41,31 @@ void StatisticsTDigest::deserialize(ReadBuffer & buf)
t_digest.deserialize(buf);
}
Float64 StatisticsTDigest::estimateLess(Float64 val) const
Float64 StatisticsTDigest::estimateLess(const Field & val) const
{
return t_digest.getCountLessThan(val);
auto val_as_float = StatisticsUtils::tryConvertToFloat64(val);
if (val_as_float)
return t_digest.getCountLessThan(*val_as_float);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Statistics 'tdigest' does not support estimating value of type {}", val.getTypeName());
}
Float64 StatisticsTDigest::estimateEqual(Float64 val) const
Float64 StatisticsTDigest::estimateEqual(const Field & val) const
{
return t_digest.getCountEqual(val);
auto val_as_float = StatisticsUtils::tryConvertToFloat64(val);
if (val_as_float)
return t_digest.getCountEqual(*val_as_float);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Statistics 'tdigest' does not support estimating value of type {}", val.getTypeName());
}
void TDigestValidator(const SingleStatisticsDescription &, DataTypePtr data_type)
void tdigestValidator(const SingleStatisticsDescription &, DataTypePtr data_type)
{
data_type = removeNullable(data_type);
data_type = removeLowCardinalityAndNullable(data_type);
if (!data_type->isValueRepresentedByNumber())
throw Exception(ErrorCodes::ILLEGAL_STATISTICS, "Statistics of type 'tdigest' do not support type {}", data_type->getName());
}
StatisticsPtr TDigestCreator(const SingleStatisticsDescription & stat, DataTypePtr)
StatisticsPtr tdigestCreator(const SingleStatisticsDescription & stat, DataTypePtr)
{
return std::make_shared<StatisticsTDigest>(stat);
}

View File

@ -16,14 +16,14 @@ public:
void serialize(WriteBuffer & buf) override;
void deserialize(ReadBuffer & buf) override;
Float64 estimateLess(Float64 val) const override;
Float64 estimateEqual(Float64 val) const override;
Float64 estimateLess(const Field & val) const override;
Float64 estimateEqual(const Field & val) const override;
private:
QuantileTDigest<Float64> t_digest;
};
void TDigestValidator(const SingleStatisticsDescription &, DataTypePtr data_type);
StatisticsPtr TDigestCreator(const SingleStatisticsDescription & stat, DataTypePtr);
void tdigestValidator(const SingleStatisticsDescription &, DataTypePtr data_type);
StatisticsPtr tdigestCreator(const SingleStatisticsDescription & stat, DataTypePtr);
}

View File

@ -1,6 +1,7 @@
#include <Storages/Statistics/StatisticsUniq.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
{
@ -51,14 +52,15 @@ UInt64 StatisticsUniq::estimateCardinality() const
return column->getUInt(0);
}
void UniqValidator(const SingleStatisticsDescription &, DataTypePtr data_type)
void uniqValidator(const SingleStatisticsDescription &, DataTypePtr data_type)
{
data_type = removeNullable(data_type);
data_type = removeLowCardinalityAndNullable(data_type);
if (!data_type->isValueRepresentedByNumber())
throw Exception(ErrorCodes::ILLEGAL_STATISTICS, "Statistics of type 'uniq' do not support type {}", data_type->getName());
}
StatisticsPtr UniqCreator(const SingleStatisticsDescription & stat, DataTypePtr data_type)
StatisticsPtr uniqCreator(const SingleStatisticsDescription & stat, DataTypePtr data_type)
{
return std::make_shared<StatisticsUniq>(stat, data_type);
}

View File

@ -27,7 +27,7 @@ private:
};
void UniqValidator(const SingleStatisticsDescription &, DataTypePtr data_type);
StatisticsPtr UniqCreator(const SingleStatisticsDescription & stat, DataTypePtr data_type);
void uniqValidator(const SingleStatisticsDescription &, DataTypePtr data_type);
StatisticsPtr uniqCreator(const SingleStatisticsDescription & stat, DataTypePtr data_type);
}

View File

@ -1,6 +1,10 @@
#include <gtest/gtest.h>
#include <Storages/Statistics/StatisticsTDigest.h>
#include <Interpreters/convertFieldToType.h>
#include <DataTypes/DataTypeFactory.h>
using namespace DB;
TEST(Statistics, TDigestLessThan)
{
@ -39,6 +43,4 @@ TEST(Statistics, TDigestLessThan)
std::reverse(data.begin(), data.end());
test_less_than(data, {-1, 1e9, 50000.0, 3000.0, 30.0}, {0, 100000, 50000, 3000, 30}, {0, 0, 0.001, 0.001, 0.001});
}

View File

@ -1,19 +1,14 @@
#include <Storages/StatisticsDescription.h>
#include <base/defines.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTStatisticsDeclaration.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ParserCreateQuery.h>
#include <Poco/Logger.h>
#include <Storages/extractKeyExpressionList.h>
#include <Storages/ColumnsDescription.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -54,7 +49,9 @@ static StatisticsType stringToStatisticsType(String type)
return StatisticsType::TDigest;
if (type == "uniq")
return StatisticsType::Uniq;
throw Exception(ErrorCodes::INCORRECT_QUERY, "Unknown statistics type: {}. Supported statistics types are `tdigest` and `uniq`.", type);
if (type == "count_min")
return StatisticsType::CountMinSketch;
throw Exception(ErrorCodes::INCORRECT_QUERY, "Unknown statistics type: {}. Supported statistics types are 'tdigest', 'uniq' and 'count_min'.", type);
}
String SingleStatisticsDescription::getTypeName() const
@ -65,8 +62,10 @@ String SingleStatisticsDescription::getTypeName() const
return "TDigest";
case StatisticsType::Uniq:
return "Uniq";
case StatisticsType::CountMinSketch:
return "count_min";
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown statistics type: {}. Supported statistics types are `tdigest` and `uniq`.", type);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown statistics type: {}. Supported statistics types are 'tdigest', 'uniq' and 'count_min'.", type);
}
}
@ -99,10 +98,9 @@ void ColumnStatisticsDescription::merge(const ColumnStatisticsDescription & othe
chassert(merging_column_type);
if (column_name.empty())
{
column_name = merging_column_name;
data_type = merging_column_type;
}
data_type = merging_column_type;
for (const auto & [stats_type, stats_desc]: other.types_to_desc)
{
@ -121,6 +119,7 @@ void ColumnStatisticsDescription::assign(const ColumnStatisticsDescription & oth
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot assign statistics from column {} to {}", column_name, other.column_name);
types_to_desc = other.types_to_desc;
data_type = other.data_type;
}
void ColumnStatisticsDescription::clear()
@ -159,6 +158,7 @@ std::vector<ColumnStatisticsDescription> ColumnStatisticsDescription::fromAST(co
const auto & column = columns.getPhysical(physical_column_name);
stats.column_name = column.name;
stats.data_type = column.type;
stats.types_to_desc = statistics_types;
result.push_back(stats);
}

View File

@ -13,6 +13,7 @@ enum class StatisticsType : UInt8
{
TDigest = 0,
Uniq = 1,
CountMinSketch = 2,
Max = 63,
};

View File

@ -3940,7 +3940,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
merge_selecting_task->schedule();
else
{
LOG_TRACE(log, "Scheduling next merge selecting task after {}ms, current attempt status: {}", merge_selecting_sleep_ms, result);
LOG_TRACE(log, "Scheduling next merge selecting task after {}ms", merge_selecting_sleep_ms);
merge_selecting_task->scheduleAfter(merge_selecting_sleep_ms);
}
}

View File

@ -7,7 +7,7 @@
<query>SELECT sum(NOT ignore(*)) FROM (SELECT * FROM generateRandom('d Date, dt DateTime, dtm DateTime(\'Asia/Istanbul\')', 0, 10, 10) LIMIT 1000000000);</query>
<query>SELECT sum(NOT ignore(*)) FROM (SELECT * FROM generateRandom('dt64 DateTime64, dts64 DateTime64(6), dtms64 DateTime64(6 ,\'Asia/Istanbul\')', 0, 10, 10) LIMIT 100000000);</query>
<query>SELECT sum(NOT ignore(*)) FROM (SELECT * FROM generateRandom('f32 Float32, f64 Float64', 0, 10, 10) LIMIT 1000000000);</query>
<query>SELECT sum(NOT ignore(*)) FROM (SELECT * FROM generateRandom('d32 Decimal32(4), d64 Decimal64(8), d128 Decimal128(16)', 0, 10, 10) LIMIT 1000000000);</query>
<query>SELECT sum(NOT ignore(*)) FROM (SELECT * FROM generateRandom('d32 Decimal32(4), d64 Decimal64(8), d128 Decimal128(16)', 0, 10, 10) LIMIT 100000000);</query>
<query>SELECT sum(NOT ignore(*)) FROM (SELECT * FROM generateRandom('i Tuple(Int32, Int64)', 0, 10, 10) LIMIT 1000000000);</query>
<query>SELECT sum(NOT ignore(*)) FROM (SELECT * FROM generateRandom('i Array(Int8)', 0, 10, 10) LIMIT 100000000);</query>
<query>SELECT sum(NOT ignore(*)) FROM (SELECT * FROM generateRandom('i Array(Nullable(Int32))', 0, 10, 10) LIMIT 100000000);</query>

View File

@ -41,7 +41,7 @@ function thread3()
function thread4()
{
while true; do $CLICKHOUSE_CLIENT --receive_timeout=1 -q "OPTIMIZE TABLE alter_table0 FINAL" | grep -Fv "Timeout exceeded while receiving data from server"; done
while true; do $CLICKHOUSE_CLIENT --receive_timeout=3 -q "OPTIMIZE TABLE alter_table0 FINAL" | grep -Fv "Timeout exceeded while receiving data from server"; done
}
function thread5()

View File

@ -0,0 +1,14 @@
CREATE TABLE default.tab\n(\n `a` String,\n `b` UInt64,\n `c` Int64,\n `pk` String\n)\nENGINE = MergeTree\nORDER BY pk\nSETTINGS min_bytes_for_wide_part = 0, index_granularity = 8192
Test statistics count_min:
Prewhere info
Prewhere filter
Prewhere filter column: and(equals(a, \'0\'), equals(b, 0), equals(c, 0)) (removed)
Test statistics multi-types:
Prewhere info
Prewhere filter
Prewhere filter column: and(equals(a, \'0\'), less(c, -90), greater(b, 900)) (removed)
Prewhere info
Prewhere filter
Prewhere filter column: and(equals(a, \'10000\'), equals(b, 0), less(c, 0)) (removed)
Test LowCardinality and Nullable data type:
tab2

View File

@ -0,0 +1,70 @@
-- Tags: no-fasttest
DROP TABLE IF EXISTS tab SYNC;
SET allow_experimental_statistics = 1;
SET allow_statistics_optimize = 1;
SET allow_suspicious_low_cardinality_types=1;
SET mutations_sync = 2;
CREATE TABLE tab
(
a String,
b UInt64,
c Int64,
pk String,
) Engine = MergeTree() ORDER BY pk
SETTINGS min_bytes_for_wide_part = 0;
SHOW CREATE TABLE tab;
INSERT INTO tab select toString(number % 10000), number % 1000, -(number % 100), generateUUIDv4() FROM system.numbers LIMIT 10000;
SELECT 'Test statistics count_min:';
ALTER TABLE tab ADD STATISTICS a TYPE count_min;
ALTER TABLE tab ADD STATISTICS b TYPE count_min;
ALTER TABLE tab ADD STATISTICS c TYPE count_min;
ALTER TABLE tab MATERIALIZE STATISTICS a, b, c;
SELECT replaceRegexpAll(explain, '__table1.|_UInt8|_Int8|_UInt16|_String', '')
FROM (EXPLAIN actions=1 SELECT count(*) FROM tab WHERE c = 0/*100*/ and b = 0/*10*/ and a = '0'/*1*/) xx
WHERE explain LIKE '%Prewhere%' OR explain LIKE '%Filter column%';
ALTER TABLE tab DROP STATISTICS a, b, c;
SELECT 'Test statistics multi-types:';
ALTER TABLE tab ADD STATISTICS a TYPE count_min;
ALTER TABLE tab ADD STATISTICS b TYPE count_min, uniq, tdigest;
ALTER TABLE tab ADD STATISTICS c TYPE count_min, uniq, tdigest;
ALTER TABLE tab MATERIALIZE STATISTICS a, b, c;
SELECT replaceRegexpAll(explain, '__table1.|_UInt8|_Int8|_UInt16|_String', '')
FROM (EXPLAIN actions=1 SELECT count(*) FROM tab WHERE c < -90/*900*/ and b > 900/*990*/ and a = '0'/*1*/)
WHERE explain LIKE '%Prewhere%' OR explain LIKE '%Filter column%';
SELECT replaceRegexpAll(explain, '__table1.|_UInt8|_Int8|_UInt16|_String', '')
FROM (EXPLAIN actions=1 SELECT count(*) FROM tab WHERE c < 0/*9900*/ and b = 0/*10*/ and a = '10000'/*0*/)
WHERE explain LIKE '%Prewhere%' OR explain LIKE '%Filter column%';
ALTER TABLE tab DROP STATISTICS a, b, c;
DROP TABLE IF EXISTS tab SYNC;
SELECT 'Test LowCardinality and Nullable data type:';
DROP TABLE IF EXISTS tab2 SYNC;
SET allow_suspicious_low_cardinality_types=1;
CREATE TABLE tab2
(
a LowCardinality(Int64) STATISTICS(count_min),
b Nullable(Int64) STATISTICS(count_min),
c LowCardinality(Nullable(Int64)) STATISTICS(count_min),
pk String,
) Engine = MergeTree() ORDER BY pk;
select name from system.tables where name = 'tab2' and database = currentDatabase();
DROP TABLE IF EXISTS tab2 SYNC;

View File

@ -70,3 +70,4 @@ SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO t3 select number, -number, number/1000, generateUUIDv4() FROM system.numbers LIMIT 10000;
DROP TABLE IF EXISTS t3;

View File

@ -2,8 +2,6 @@
# Tags: atomic-database
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# reset --log_comment
CLICKHOUSE_LOG_COMMENT=
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
@ -134,7 +132,7 @@ while [ "`$CLICKHOUSE_CLIENT -nq "select status, next_refresh_time from refreshe
do
sleep 0.1
done
sleep 1
$CLICKHOUSE_CLIENT -nq "
select '<14: waiting for next cycle>', view, status, remaining_dependencies, next_refresh_time from refreshes;
truncate src;
@ -172,13 +170,13 @@ $CLICKHOUSE_CLIENT -nq "
drop table b;
create materialized view c refresh every 1 second (x Int64) engine Memory empty as select * from src;
drop table src;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Exception' ]
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes where view = 'c' -- $LINENO" | xargs`" != 'Exception' ]
do
sleep 0.1
done
# Check exception, create src, expect successful refresh.
$CLICKHOUSE_CLIENT -nq "
select '<19: exception>', exception ilike '%UNKNOWN_TABLE%' from refreshes;
select '<19: exception>', exception ilike '%UNKNOWN_TABLE%' ? '1' : exception from refreshes where view = 'c';
create table src (x Int64) engine Memory as select 1;
system refresh view c;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ]
@ -224,22 +222,27 @@ done
$CLICKHOUSE_CLIENT -nq "
rename table e to f;
select '<24: rename during refresh>', * from f;
select '<25: rename during refresh>', view, status from refreshes;
select '<25: rename during refresh>', view, status from refreshes where view = 'f';
alter table f modify refresh after 10 year;"
sleep 2 # make it likely that at least one row was processed
# Cancel.
$CLICKHOUSE_CLIENT -nq "
system cancel view f;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Cancelled' ]
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes where view = 'f' -- $LINENO" | xargs`" != 'Cancelled' ]
do
sleep 0.1
done
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'f' -- $LINENO" | xargs`" = 'Running' ]
do
sleep 0.1
done
# Check that another refresh doesn't immediately start after the cancelled one.
sleep 1
$CLICKHOUSE_CLIENT -nq "
select '<27: cancelled>', view, status from refreshes;
select '<27: cancelled>', view, status from refreshes where view = 'f';
system refresh view f;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes -- $LINENO" | xargs`" != 'Running' ]
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'f' -- $LINENO" | xargs`" != 'Running' ]
do
sleep 0.1
done

View File

@ -0,0 +1 @@
SELECT number FROM numbers(2, 1) WHERE number % 2 = 0 SETTINGS max_rows_to_read = 10;

View File

@ -51,14 +51,6 @@ function check_replication_consistency()
table_name_prefix=$1
check_query_part=$2
# Try to kill some mutations because sometimes tests run too much (it's not guarenteed to kill all mutations, see below)
# Try multiple replicas, because queries are not finished yet, and "global" KILL MUTATION may fail due to another query (like DROP TABLE)
readarray -t tables_arr < <(${CLICKHOUSE_CLIENT} -q "SELECT name FROM system.tables WHERE database=currentDatabase() AND name like '$table_name_prefix%'")
for t in "${tables_arr[@]}"
do
${CLICKHOUSE_CLIENT} -q "KILL MUTATION WHERE database=currentDatabase() AND table='$t'" > /dev/null 2>/dev/null
done
# Wait for all queries to finish (query may still be running if thread is killed by timeout)
num_tries=0
while [[ $($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%'") -ne 1 ]]; do
@ -104,7 +96,7 @@ function check_replication_consistency()
some_table=$($CLICKHOUSE_CLIENT -q "SELECT name FROM system.tables WHERE database=currentDatabase() AND name like '$table_name_prefix%' ORDER BY rand() LIMIT 1")
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA $some_table PULL" 1>/dev/null 2>/dev/null ||:
# Forcefully cancel mutations to avoid waiting for them to finish. Kills the remaining mutations
# Forcefully cancel mutations to avoid waiting for them to finish
${CLICKHOUSE_CLIENT} -q "KILL MUTATION WHERE database=currentDatabase() AND table like '$table_name_prefix%'" > /dev/null
# SYNC REPLICA is not enough if some MUTATE_PARTs are not assigned yet