Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-21 23:21:59 +00:00)

dbms: add quantile[s]Deterministic, rename stdext:: to ext:: [#METR-13199]
This commit is contained in:
parent 24f3f80b9f
commit d0971956e0
@@ -33,7 +33,7 @@ template <typename ArgumentFieldType, bool returns_float = true>
class AggregateFunctionQuantile final : public IUnaryAggregateFunction<AggregateFunctionQuantileData<ArgumentFieldType>, AggregateFunctionQuantile<ArgumentFieldType, returns_float> >
{
private:
-    typedef ReservoirSampler<ArgumentFieldType, ReservoirSamplerOnEmpty::RETURN_NAN_OR_ZERO> Sample;
+    using Sample = typename AggregateFunctionQuantileData<ArgumentFieldType>::Sample;

    double level;
    DataTypePtr type;

@@ -108,7 +108,7 @@ template <typename ArgumentFieldType, bool returns_float = true>
class AggregateFunctionQuantiles final : public IUnaryAggregateFunction<AggregateFunctionQuantileData<ArgumentFieldType>, AggregateFunctionQuantiles<ArgumentFieldType, returns_float> >
{
private:
-    typedef ReservoirSampler<ArgumentFieldType, ReservoirSamplerOnEmpty::RETURN_NAN_OR_ZERO> Sample;
+    using Sample = typename AggregateFunctionQuantileData<ArgumentFieldType>::Sample;

    typedef std::vector<double> Levels;
    Levels levels;
@@ -0,0 +1,212 @@
#pragma once

#include <DB/AggregateFunctions/ReservoirSamplerDeterministic.h>

#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>

#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeArray.h>

#include <DB/AggregateFunctions/IBinaryAggregateFunction.h>

#include <DB/Columns/ColumnArray.h>


namespace DB
{

template <typename ArgumentFieldType>
struct AggregateFunctionQuantileDeterministicData
{
    using Sample = ReservoirSamplerDeterministic<ArgumentFieldType, ReservoirSamplerDeterministicOnEmpty::RETURN_NAN_OR_ZERO>;
    Sample sample;  /// TODO Add a MemoryTracker
};


/** Approximately computes a quantile.
  * The argument type may only be a numeric type (including dates and dates-with-time).
  * If returns_float = true, the result type is Float64; otherwise the result type matches the argument type.
  * For dates and dates-with-time, returns_float should be set to false.
  */
template <typename ArgumentFieldType, bool returns_float = true>
class AggregateFunctionQuantileDeterministic final
    : public IBinaryAggregateFunction<
        AggregateFunctionQuantileDeterministicData<ArgumentFieldType>,
        AggregateFunctionQuantileDeterministic<ArgumentFieldType, returns_float>>
{
private:
    using Sample = typename AggregateFunctionQuantileDeterministicData<ArgumentFieldType>::Sample;

    double level;
    DataTypePtr type;

public:
    AggregateFunctionQuantileDeterministic(double level_ = 0.5) : level(level_) {}

    String getName() const { return "quantileDeterministic"; }

    DataTypePtr getReturnType() const
    {
        return type;
    }

    void setArgumentsImpl(const DataTypes & arguments)
    {
        type = returns_float ? new DataTypeFloat64 : arguments[0];

        if (!arguments[1]->isNumeric())
            throw Exception{
                "Invalid type of second argument to function " + getName() +
                    ", got " + arguments[1]->getName() + ", expected numeric",
                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
            };
    }

    void setParameters(const Array & params)
    {
        if (params.size() != 1)
            throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        level = apply_visitor(FieldVisitorConvertToNumber<Float64>(), params[0]);
    }


    void addOne(AggregateDataPtr place, const IColumn & column, const IColumn & determinator, size_t row_num) const
    {
        this->data(place).sample.insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num],
            determinator.get64(row_num));
    }

    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
    {
        this->data(place).sample.merge(this->data(rhs).sample);
    }

    void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
    {
        this->data(place).sample.write(buf);
    }

    void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
    {
        Sample tmp_sample;
        tmp_sample.read(buf);
        this->data(place).sample.merge(tmp_sample);
    }

    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
    {
        /// The Sample may get sorted when the quantile is requested, but in this context that can be tolerated as not breaking constness.
        Sample & sample = const_cast<Sample &>(this->data(place).sample);

        if (returns_float)
            static_cast<ColumnFloat64 &>(to).getData().push_back(sample.quantileInterpolated(level));
        else
            static_cast<ColumnVector<ArgumentFieldType> &>(to).getData().push_back(sample.quantileInterpolated(level));
    }
};


/** Same, but allows computing several quantiles at once.
  * To do this, it takes several levels as parameters. Example: quantiles(0.5, 0.8, 0.9, 0.95)(ConnectTiming).
  * Returns an array of results.
  */
template <typename ArgumentFieldType, bool returns_float = true>
class AggregateFunctionQuantilesDeterministic final
    : public IBinaryAggregateFunction<
        AggregateFunctionQuantileDeterministicData<ArgumentFieldType>,
        AggregateFunctionQuantilesDeterministic<ArgumentFieldType, returns_float>>
{
private:
    using Sample = typename AggregateFunctionQuantileDeterministicData<ArgumentFieldType>::Sample;

    typedef std::vector<double> Levels;
    Levels levels;
    DataTypePtr type;

public:
    String getName() const { return "quantilesDeterministic"; }

    DataTypePtr getReturnType() const
    {
        return new DataTypeArray(type);
    }

    void setArgumentsImpl(const DataTypes & arguments)
    {
        type = returns_float ? new DataTypeFloat64 : arguments[0];

        if (!arguments[1]->isNumeric())
            throw Exception{
                "Invalid type of second argument to function " + getName() +
                    ", got " + arguments[1]->getName() + ", expected numeric",
                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
            };
    }

    void setParameters(const Array & params)
    {
        if (params.empty())
            throw Exception("Aggregate function " + getName() + " requires at least one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        size_t size = params.size();
        levels.resize(size);

        for (size_t i = 0; i < size; ++i)
            levels[i] = apply_visitor(FieldVisitorConvertToNumber<Float64>(), params[i]);
    }


    void addOne(AggregateDataPtr place, const IColumn & column, const IColumn & determinator, size_t row_num) const
    {
        this->data(place).sample.insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num],
            determinator.get64(row_num));
    }

    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
    {
        this->data(place).sample.merge(this->data(rhs).sample);
    }

    void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
    {
        this->data(place).sample.write(buf);
    }

    void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
    {
        Sample tmp_sample;
        tmp_sample.read(buf);
        this->data(place).sample.merge(tmp_sample);
    }

    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
    {
        /// The Sample may get sorted when the quantile is requested, but in this context that can be tolerated as not breaking constness.
        Sample & sample = const_cast<Sample &>(this->data(place).sample);

        ColumnArray & arr_to = static_cast<ColumnArray &>(to);
        ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets();

        size_t size = levels.size();
        offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size);

        if (returns_float)
        {
            ColumnFloat64::Container_t & data_to = static_cast<ColumnFloat64 &>(arr_to.getData()).getData();

            for (size_t i = 0; i < size; ++i)
                data_to.push_back(sample.quantileInterpolated(levels[i]));
        }
        else
        {
            typename ColumnVector<ArgumentFieldType>::Container_t & data_to = static_cast<ColumnVector<ArgumentFieldType> &>(arr_to.getData()).getData();

            for (size_t i = 0; i < size; ++i)
                data_to.push_back(sample.quantileInterpolated(levels[i]));
        }
    }
};


}
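A note on the two new classes above: judging from setParameters and addOne, the intended call shape is presumably quantileDeterministic(level)(x, determinator) and quantilesDeterministic(level1, level2, ...)(x, determinator), by analogy with the quantiles(0.5, 0.8, 0.9, 0.95)(ConnectTiming) example in the comment. The array result of quantilesDeterministic is emitted through ColumnArray, which stores a flat data column plus cumulative offsets: insertResultInto pushes one value per requested level and then a single offset. A minimal standalone model of that encoding (plain std::vector stand-ins with illustrative names, not part of the commit):

```cpp
#include <cstddef>
#include <vector>

// Toy stand-in for ColumnArray's (flat data, cumulative offsets) layout.
struct FlatArrayColumn
{
    std::vector<double> data;     // all array elements, concatenated
    std::vector<size_t> offsets;  // offsets[i] = end position of the i-th array in `data`

    // Mirrors what insertResultInto does for one aggregated row:
    // append one value per level, then a single cumulative offset.
    void appendRow(const std::vector<double> & quantiles)
    {
        const size_t prev = offsets.empty() ? 0 : offsets.back();
        for (double q : quantiles)
            data.push_back(q);
        offsets.push_back(prev + quantiles.size());
    }
};
```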
@@ -0,0 +1,32 @@
#pragma once

#include <DB/AggregateFunctions/IAggregateFunction.h>

namespace DB
{

template <typename T, typename Derived>
class IBinaryAggregateFunction : public IAggregateFunctionHelper<T>
{
    Derived & getDerived() { return static_cast<Derived &>(*this); }
    const Derived & getDerived() const { return static_cast<const Derived &>(*this); }

public:
    void setArguments(const DataTypes & arguments)
    {
        if (arguments.size() != 2)
            throw Exception{
                "Passed " + toString(arguments.size()) + " arguments to binary aggregate function " + this->getName(),
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
            };

        getDerived().setArgumentsImpl(arguments);
    }

    void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num) const
    {
        getDerived().addOne(place, *columns[0], *columns[1], row_num);
    }
};

}
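IBinaryAggregateFunction above is a CRTP helper: the base class knows the concrete Derived type at compile time, so setArguments and add can forward to setArgumentsImpl and addOne through a static_cast rather than a virtual call per row. A self-contained sketch of that dispatch shape, using toy types in place of the DB interfaces (all names below are illustrative, not part of this commit):

```cpp
#include <cstddef>
#include <iostream>

// CRTP base: statically forwards the two-column "add" call to Derived::addOne.
template <typename Derived>
struct BinaryAggregateBase
{
    void add(const double * const * columns, std::size_t row_num)
    {
        static_cast<Derived &>(*this).addOne(columns[0][row_num], columns[1][row_num]);
    }
};

// Toy derived "aggregate": sums the first column and ignores the determinator.
struct SumFirstColumn : BinaryAggregateBase<SumFirstColumn>
{
    double sum = 0.0;
    void addOne(double value, double /*determinator*/) { sum += value; }
};

int main()
{
    const double values[] = {1.0, 2.0, 3.0};
    const double determinators[] = {10.0, 20.0, 30.0};
    const double * columns[] = {values, determinators};

    SumFirstColumn agg;
    for (std::size_t i = 0; i < 3; ++i)
        agg.add(columns, i);

    std::cout << agg.sum << "\n";  // prints 6
}
```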
@@ -0,0 +1,210 @@
#pragma once

#include <limits>
#include <vector>
#include <algorithm>
#include <climits>
#include <sstream>
#include <stats/ReservoirSampler.h>
#include <Yandex/Common.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <Poco/Exception.h>
#include <boost/random.hpp>


/// Implementation of the Reservoir Sampling algorithm. Incrementally selects a random subset of size sample_count from the inserted values.
/// Can approximately compute quantiles.
/// A quantile call costs O(sample_count log sample_count) if there has been at least one insert since the previous quantile call, and O(1) otherwise.
/// In other words, it makes sense to insert everything first and only then query quantiles, without inserting in between.

namespace detail
{
const size_t DEFAULT_SAMPLE_COUNT = 8192;
}

/// What to do when there is not a single value: throw an exception, or return 0 (NaN in the case of double)?
enum class ReservoirSamplerDeterministicOnEmpty
{
    THROW,
    RETURN_NAN_OR_ZERO,
};

template<typename T,
    ReservoirSamplerDeterministicOnEmpty OnEmpty = ReservoirSamplerDeterministicOnEmpty::THROW>
class ReservoirSamplerDeterministic
{
    bool good(const UInt32 hash)
    {
        return hash == ((hash >> skip_degree) << skip_degree);
    }

public:
    ReservoirSamplerDeterministic(const size_t sample_count = DEFAULT_SAMPLE_COUNT)
        : sample_count{sample_count}
    {
    }

    void clear()
    {
        samples.clear();
        sorted = false;
        total_values = 0;
    }

    void insert(const T & v, const UInt64 determinator)
    {
        const UInt32 hash = intHash64(determinator);
        if (!good(hash))
            return;

        insertImpl(v, hash);
        sorted = false;
        ++total_values;
    }

    void insertImpl(const T & v, const UInt32 hash)
    {
        if (samples.size() == sample_count)
        {
            ++skip_degree;
            thinOut();
        }

        samples.emplace_back(v, hash);
    }

    void thinOut()
    {
        auto size = samples.size();
        for (size_t i = 0; i < size;)
        {
            if (!good(samples[i].second))
            {
                /// swap current element with the last one
                std::swap(samples[size - 1], samples[i]);
                --size;
            }
            else
                ++i;
        }

        if (size != samples.size())
        {
            samples.resize(size);
            sorted = false;
        }
    }

    size_t size() const
    {
        return total_values;
    }

    T quantileNearest(double level)
    {
        if (samples.empty())
            return onEmpty<T>();

        sortIfNeeded();

        double index = level * (samples.size() - 1);
        size_t int_index = static_cast<size_t>(index + 0.5);
        int_index = std::max(0LU, std::min(samples.size() - 1, int_index));
        return samples[int_index].first;
    }

    /** If T is not a numeric type, using this method triggers a compilation error,
      * but merely using the class does not. SFINAE.
      */
    double quantileInterpolated(double level)
    {
        if (samples.empty())
            return onEmpty<double>();

        sortIfNeeded();

        const double index = std::max(0., std::min(samples.size() - 1., level * (samples.size() - 1)));

        /// To obtain the value at a fractional index, linearly interpolate between the neighbouring values.
        size_t left_index = static_cast<size_t>(index);
        size_t right_index = left_index + 1;
        if (right_index == samples.size())
            return samples[left_index].first;

        const double left_coef = right_index - index;
        const double right_coef = index - left_index;

        return samples[left_index].first * left_coef + samples[right_index].first * right_coef;
    }

    void merge(const ReservoirSamplerDeterministic & b)
    {
        if (sample_count != b.sample_count)
            throw Poco::Exception("Cannot merge ReservoirSamplerDeterministic's with different sample_count");
        sorted = false;

        if (b.skip_degree > skip_degree)
        {
            skip_degree = b.skip_degree;
            thinOut();
        }

        for (size_t i = 0; i < b.samples.size(); ++i)
            if (good(b.samples[i].second))
                insertImpl(b.samples[i].first, b.samples[i].second);

        total_values += b.total_values;
    }

    void read(DB::ReadBuffer & buf)
    {
        DB::readIntBinary<size_t>(sample_count, buf);
        DB::readIntBinary<size_t>(total_values, buf);
        samples.resize(std::min(total_values, sample_count));

        for (size_t i = 0; i < samples.size(); ++i)
            DB::readBinary(samples[i].first, buf);

        sorted = false;
    }

    void write(DB::WriteBuffer & buf) const
    {
        DB::writeIntBinary<size_t>(sample_count, buf);
        DB::writeIntBinary<size_t>(total_values, buf);

        for (size_t i = 0; i < std::min(sample_count, total_values); ++i)
            DB::writeBinary(samples[i].first, buf);
    }

private:
    friend void rs_perf_test();
    friend void qdigest_test(int, UInt64, const std::vector<UInt64> &, int, bool);

    size_t sample_count;
    size_t total_values{};
    bool sorted{};
    std::vector<std::pair<T, UInt32>> samples;
    UInt8 skip_degree{};

    void sortIfNeeded()
    {
        if (sorted)
            return;
        sorted = true;
        std::sort(samples.begin(), samples.end(), [] (const std::pair<T, UInt32> & lhs, const std::pair<T, UInt32> & rhs) {
            return lhs.first < rhs.first;
        });
    }

    template <typename ResultType>
    ResultType onEmpty() const
    {
        if (OnEmpty == ReservoirSamplerDeterministicOnEmpty::THROW)
            throw Poco::Exception("Quantile of empty ReservoirSamplerDeterministic");
        else
            return NanLikeValueConstructor<ResultType, std::is_floating_point<ResultType>::value>::getValue();
    }
};
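To make the "deterministic" part concrete: a row is kept iff the low skip_degree bits of the hash of its determinator are zero (the good() predicate), so the sampling decision is a pure function of the determinator rather than of a random generator, and every host that sees the same rows keeps the same subset, which also keeps merge consistent. The sketch below reproduces that predicate and the interpolation arithmetic of quantileInterpolated over a one-shot batch; the hash mixer is an illustrative stand-in for intHash64, not the project's implementation:

```cpp
#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

// Illustrative 64-bit mixer; the real code uses the project's intHash64
// truncated to 32 bits.
static uint64_t mix64(uint64_t x)
{
    x ^= x >> 33; x *= 0xff51afd7ed558ccdULL;
    x ^= x >> 33; x *= 0xc4ceb9fe1a85ec53ULL;
    x ^= x >> 33; return x;
}

// Same predicate as ReservoirSamplerDeterministic::good(): keep the row iff the
// low `skip_degree` bits of the hash are zero. Raising skip_degree by one halves
// the expected kept fraction, which is how thinOut() shrinks a full reservoir.
static bool keep(uint32_t hash, uint8_t skip_degree)
{
    return hash == ((hash >> skip_degree) << skip_degree);
}

// Deterministic sub-sample of (value, determinator) pairs, followed by an
// interpolated quantile over the sorted kept values -- the same arithmetic as
// quantileInterpolated(), without the incremental reservoir bookkeeping.
static double deterministic_quantile(const std::vector<std::pair<double, uint64_t>> & rows,
                                     uint8_t skip_degree, double level)
{
    std::vector<double> kept;
    for (const auto & r : rows)
        if (keep(static_cast<uint32_t>(mix64(r.second)), skip_degree))
            kept.push_back(r.first);

    if (kept.empty())
        return 0.0;  // stand-in for the RETURN_NAN_OR_ZERO policy

    std::sort(kept.begin(), kept.end());
    const double index = std::max(0., std::min(kept.size() - 1., level * (kept.size() - 1)));
    const size_t left = static_cast<size_t>(index);
    if (left + 1 == kept.size())
        return kept[left];
    const double right_coef = index - left;
    return kept[left] * (1.0 - right_coef) + kept[left + 1] * right_coef;
}
```

Running this twice over the same (value, determinator) pairs with the same skip_degree selects exactly the same subset and therefore the same quantile; a conventional RNG-driven reservoir would not.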
@@ -3,7 +3,7 @@
#include <Poco/Mutex.h>

#include <statdaemons/OptimizedRegularExpression.h>
-#include <statdaemons/stdext.h>
+#include <statdaemons/ext/memory.hpp>

#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>

@@ -313,7 +313,7 @@ namespace Regexps

        auto it = known_regexps.find(pattern);
        if (known_regexps.end() == it)
-            it = known_regexps.emplace(pattern, stdext::make_unique<Holder>()).first;
+            it = known_regexps.emplace(pattern, ext::make_unique<Holder>()).first;

        return it->second->get([&pattern] {
            return new Regexp{createRegexp<like>(pattern)};
@@ -16,7 +16,7 @@ inline void evaluateMissingDefaults(Block & block,
    if (column_defaults.empty())
        return;

-    ASTPtr default_expr_list{stdext::make_unique<ASTExpressionList>().release()};
+    ASTPtr default_expr_list{ext::make_unique<ASTExpressionList>().release()};

    for (const auto & column : required_columns)
    {
@@ -10,7 +10,7 @@
#include <DB/Interpreters/InterpreterInsertQuery.h>

#include <statdaemons/Increment.h>
-#include <statdaemons/stdext.h>
+#include <statdaemons/ext/memory.hpp>

#include <iostream>
#include <type_traits>
@@ -14,7 +14,7 @@
#include <DB/Storages/AlterCommands.h>
#include <Poco/File.h>
#include <Poco/RWLock.h>
-#include <statdaemons/stdext.h>
+#include <statdaemons/ext/memory.hpp>


namespace DB

@@ -125,7 +125,7 @@ public:
      */
    TableDataWriteLockPtr lockDataForAlter()
    {
-        auto res = stdext::make_unique<Poco::ScopedWriteRWLock>(data_lock);
+        auto res = ext::make_unique<Poco::ScopedWriteRWLock>(data_lock);
        if (is_dropped)
            throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
        return res;

@@ -133,7 +133,7 @@ public:

    TableStructureWriteLockPtr lockStructureForAlter()
    {
-        auto res = stdext::make_unique<Poco::ScopedWriteRWLock>(structure_lock);
+        auto res = ext::make_unique<Poco::ScopedWriteRWLock>(structure_lock);
        if (is_dropped)
            throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
        return res;
@@ -1,7 +1,7 @@
#pragma once

#include <statdaemons/Stopwatch.h>
-#include <statdaemons/stdext.h>
+#include <statdaemons/ext/memory.hpp>
#include <list>
#include <mutex>
#include <atomic>

@@ -67,7 +67,7 @@ public:
    EntryPtr insert(Args &&... args)
    {
        std::lock_guard<std::mutex> lock{mutex};
-        return stdext::make_unique<Entry>(*this, merges.emplace(merges.end(), std::forward<Args>(args)...));
+        return ext::make_unique<Entry>(*this, merges.emplace(merges.end(), std::forward<Args>(args)...));
    }

    container_t get() const
@@ -9,7 +9,7 @@
#include <DB/Parsers/ASTSubquery.h>
#include <DB/Parsers/formatAST.h>
#include <DB/Common/escapeForFileName.h>
-#include <statdaemons/stdext.h>
+#include <statdaemons/ext/memory.hpp>
#include <unordered_map>
#include <map>
#include <limits>
@@ -9,6 +9,7 @@
#include <DB/AggregateFunctions/AggregateFunctionGroupUniqArray.h>
#include <DB/AggregateFunctions/AggregateFunctionQuantile.h>
#include <DB/AggregateFunctions/AggregateFunctionQuantileTiming.h>
+#include <DB/AggregateFunctions/AggregateFunctionQuantileDeterministic.h>
#include <DB/AggregateFunctions/AggregateFunctionIf.h>
#include <DB/AggregateFunctions/AggregateFunctionArray.h>
#include <DB/AggregateFunctions/AggregateFunctionState.h>

@@ -325,6 +326,50 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da

        return res;
    }
+    else if (name == "quantileDeterministic")
+    {
+        if (argument_types.size() != 2)
+            throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
+
+        const IDataType & argument_type = *argument_types[0];
+
+        if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionQuantileDeterministic<UInt8>;
+        else if (typeid_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionQuantileDeterministic<UInt16>;
+        else if (typeid_cast<const DataTypeUInt32 *>(&argument_type)) return new AggregateFunctionQuantileDeterministic<UInt32>;
+        else if (typeid_cast<const DataTypeUInt64 *>(&argument_type)) return new AggregateFunctionQuantileDeterministic<UInt64>;
+        else if (typeid_cast<const DataTypeInt8 *>(&argument_type)) return new AggregateFunctionQuantileDeterministic<Int8>;
+        else if (typeid_cast<const DataTypeInt16 *>(&argument_type)) return new AggregateFunctionQuantileDeterministic<Int16>;
+        else if (typeid_cast<const DataTypeInt32 *>(&argument_type)) return new AggregateFunctionQuantileDeterministic<Int32>;
+        else if (typeid_cast<const DataTypeInt64 *>(&argument_type)) return new AggregateFunctionQuantileDeterministic<Int64>;
+        else if (typeid_cast<const DataTypeFloat32 *>(&argument_type)) return new AggregateFunctionQuantileDeterministic<Float32>;
+        else if (typeid_cast<const DataTypeFloat64 *>(&argument_type)) return new AggregateFunctionQuantileDeterministic<Float64>;
+        else if (typeid_cast<const DataTypeDate *>(&argument_type)) return new AggregateFunctionQuantileDeterministic<DataTypeDate::FieldType, false>;
+        else if (typeid_cast<const DataTypeDateTime*>(&argument_type)) return new AggregateFunctionQuantileDeterministic<DataTypeDateTime::FieldType, false>;
+        else
+            throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
+    }
+    else if (name == "quantilesDeterministic")
+    {
+        if (argument_types.size() != 2)
+            throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
+
+        const IDataType & argument_type = *argument_types[0];
+
+        if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionQuantilesDeterministic<UInt8>;
+        else if (typeid_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionQuantilesDeterministic<UInt16>;
+        else if (typeid_cast<const DataTypeUInt32 *>(&argument_type)) return new AggregateFunctionQuantilesDeterministic<UInt32>;
+        else if (typeid_cast<const DataTypeUInt64 *>(&argument_type)) return new AggregateFunctionQuantilesDeterministic<UInt64>;
+        else if (typeid_cast<const DataTypeInt8 *>(&argument_type)) return new AggregateFunctionQuantilesDeterministic<Int8>;
+        else if (typeid_cast<const DataTypeInt16 *>(&argument_type)) return new AggregateFunctionQuantilesDeterministic<Int16>;
+        else if (typeid_cast<const DataTypeInt32 *>(&argument_type)) return new AggregateFunctionQuantilesDeterministic<Int32>;
+        else if (typeid_cast<const DataTypeInt64 *>(&argument_type)) return new AggregateFunctionQuantilesDeterministic<Int64>;
+        else if (typeid_cast<const DataTypeFloat32 *>(&argument_type)) return new AggregateFunctionQuantilesDeterministic<Float32>;
+        else if (typeid_cast<const DataTypeFloat64 *>(&argument_type)) return new AggregateFunctionQuantilesDeterministic<Float64>;
+        else if (typeid_cast<const DataTypeDate *>(&argument_type)) return new AggregateFunctionQuantilesDeterministic<DataTypeDate::FieldType, false>;
+        else if (typeid_cast<const DataTypeDateTime*>(&argument_type)) return new AggregateFunctionQuantilesDeterministic<DataTypeDateTime::FieldType, false>;
+        else
+            throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
+    }
    else if (recursion_level == 0 && name.size() > strlen("State") && !(strcmp(name.data() + name.size() - strlen("State"), "State")))
    {
        /// For aggregate functions of the form aggState, where agg is the name of another aggregate function.

@@ -386,7 +431,7 @@ AggregateFunctionPtr AggregateFunctionFactory::tryGet(const String & name, const

bool AggregateFunctionFactory::isAggregateFunctionName(const String & name, int recursion_level) const
{
-    static const char * names[] =
+    static const char * names[]
    {
        "count",
        "any",

@@ -409,7 +454,9 @@ bool AggregateFunctionFactory::isAggregateFunctionName(const String & name, int
        "medianTiming",
        "quantileTiming",
        "quantilesTiming",
-        NULL
+        "quantileDeterministic",
+        "quantilesDeterministic",
+        nullptr
    };

    for (const char ** it = names; *it; ++it)
@@ -12,7 +12,7 @@

#include <DB/Parsers/ASTExpressionList.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
-#include <statdaemons/stdext.h>
+#include <statdaemons/ext/memory.hpp>

#include <DB/Parsers/formatAST.h>

@@ -5,7 +5,7 @@

#include <Yandex/ApplicationServerExt.h>
#include <statdaemons/ConfigProcessor.h>
-#include <statdaemons/stdext.h>
+#include <statdaemons/ext/memory.hpp>

#include <DB/Interpreters/loadMetadata.h>
#include <DB/Storages/StorageSystemNumbers.h>

@@ -383,7 +383,7 @@ int Server::main(const std::vector<std::string> & args)
    global_context->setMacros(Macros(config(), "macros"));

    std::string users_config_path = config().getString("users_config", config().getString("config-file", "config.xml"));
-    auto users_config_reloader = stdext::make_unique<UsersConfigReloader>(users_config_path, global_context.get());
+    auto users_config_reloader = ext::make_unique<UsersConfigReloader>(users_config_path, global_context.get());

    /// The maximum number of simultaneously running queries.
    global_context->getProcessList().setMaxSize(config().getInt("max_concurrent_queries", 0));

@@ -428,7 +428,7 @@ int Server::main(const std::vector<std::string> & args)

    {
        const auto profile_events_transmitter = config().getBool("use_graphite", true)
-            ? stdext::make_unique<ProfileEventsTransmitter>()
+            ? ext::make_unique<ProfileEventsTransmitter>()
            : nullptr;

        const std::string listen_host = config().getString("listen_host", "::");
@@ -348,7 +348,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
    {
        MarkRanges ranges(1, MarkRange(0, parts[i]->size));

-        auto input = stdext::make_unique<MergeTreeBlockInputStream>(
+        auto input = ext::make_unique<MergeTreeBlockInputStream>(
            data.getFullPath() + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data,
            parts[i], ranges, false, nullptr, "");

@@ -372,19 +372,19 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
    switch (data.mode)
    {
        case MergeTreeData::Ordinary:
-            merged_stream = stdext::make_unique<MergingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
+            merged_stream = ext::make_unique<MergingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
            break;

        case MergeTreeData::Collapsing:
-            merged_stream = stdext::make_unique<CollapsingSortedBlockInputStream>(src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
+            merged_stream = ext::make_unique<CollapsingSortedBlockInputStream>(src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
            break;

        case MergeTreeData::Summing:
-            merged_stream = stdext::make_unique<SummingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
+            merged_stream = ext::make_unique<SummingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
            break;

        case MergeTreeData::Aggregating:
-            merged_stream = stdext::make_unique<AggregatingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
+            merged_stream = ext::make_unique<AggregatingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
            break;

        default:
@@ -12,7 +12,7 @@

#include <DB/Core/Field.h>

-#include <statdaemons/stdext.h>
+#include <statdaemons/ext/memory.hpp>

namespace DB
{

@@ -228,7 +228,7 @@ bool StorageDistributed::hasColumn(const String & column_name) const

void StorageDistributed::createDirectoryMonitor(const std::string & name)
{
-    directory_monitors.emplace(name, stdext::make_unique<DirectoryMonitor>(*this, name));
+    directory_monitors.emplace(name, ext::make_unique<DirectoryMonitor>(*this, name));
}

void StorageDistributed::createDirectoryMonitors()