mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 16:50:48 +00:00
Merge remote-tracking branch 'main/master' into h3-integration
This commit is contained in:
commit
df4bd3eec6
@ -77,6 +77,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
|
||||
throw Exception("Logical error: cannot find aggregate function combinator to apply a function to Nullable arguments.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
DataTypes nested_types = combinator->transformArguments(type_without_low_cardinality);
|
||||
Array nested_parameters = combinator->transformParameters(parameters);
|
||||
|
||||
AggregateFunctionPtr nested_function;
|
||||
|
||||
@ -84,7 +85,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
|
||||
/// Combinator will check if nested_function was created.
|
||||
if (name == "count" || std::none_of(argument_types.begin(), argument_types.end(),
|
||||
[](const auto & type) { return type->onlyNull(); }))
|
||||
nested_function = getImpl(name, nested_types, parameters, recursion_level);
|
||||
nested_function = getImpl(name, nested_types, nested_parameters, recursion_level);
|
||||
|
||||
return combinator->transformAggregateFunction(nested_function, argument_types, parameters);
|
||||
}
|
||||
@ -126,7 +127,10 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
|
||||
|
||||
String nested_name = name.substr(0, name.size() - combinator->getName().size());
|
||||
DataTypes nested_types = combinator->transformArguments(argument_types);
|
||||
AggregateFunctionPtr nested_function = get(nested_name, nested_types, parameters, recursion_level + 1);
|
||||
Array nested_parameters = combinator->transformParameters(parameters);
|
||||
|
||||
AggregateFunctionPtr nested_function = get(nested_name, nested_types, nested_parameters, recursion_level + 1);
|
||||
|
||||
return combinator->transformAggregateFunction(nested_function, argument_types, parameters);
|
||||
}
|
||||
|
||||
|
99
dbms/src/AggregateFunctions/AggregateFunctionResample.cpp
Normal file
99
dbms/src/AggregateFunctions/AggregateFunctionResample.cpp
Normal file
@ -0,0 +1,99 @@
|
||||
#include <AggregateFunctions/AggregateFunctionResample.h>
|
||||
|
||||
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
class AggregateFunctionCombinatorResample final : public IAggregateFunctionCombinator
|
||||
{
|
||||
public:
|
||||
String getName() const override
|
||||
{
|
||||
return "Resample";
|
||||
}
|
||||
|
||||
DataTypes transformArguments(const DataTypes & arguments) const override
|
||||
{
|
||||
if (arguments.empty())
|
||||
throw Exception("Incorrect number of arguments for aggregate function with "
|
||||
+ getName() + " suffix",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
return DataTypes(arguments.begin(), arguments.end() - 1);
|
||||
}
|
||||
|
||||
Array transformParameters(const Array & params) const override
|
||||
{
|
||||
if (params.size() < 3)
|
||||
throw Exception("Incorrect number of parameters for aggregate function with "
|
||||
+ getName() + " suffix",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
return Array(params.begin(), params.end() - 3);
|
||||
}
|
||||
|
||||
AggregateFunctionPtr transformAggregateFunction(
|
||||
const AggregateFunctionPtr & nested_function,
|
||||
const DataTypes & arguments,
|
||||
const Array & params) const override
|
||||
{
|
||||
WhichDataType which{arguments.back()};
|
||||
|
||||
if (which.isNativeUInt() || which.isDateOrDateTime())
|
||||
{
|
||||
UInt64 begin = params[params.size() - 3].safeGet<UInt64>();
|
||||
UInt64 end = params[params.size() - 2].safeGet<UInt64>();
|
||||
|
||||
UInt64 step = params[params.size() - 1].safeGet<UInt64>();
|
||||
|
||||
return std::make_shared<AggregateFunctionResample<UInt64>>(
|
||||
nested_function,
|
||||
begin,
|
||||
end,
|
||||
step,
|
||||
arguments,
|
||||
params);
|
||||
}
|
||||
|
||||
if (which.isNativeInt() || which.isEnum() || which.isInterval())
|
||||
{
|
||||
Int64 begin, end;
|
||||
|
||||
// notice: UInt64 -> Int64 may lead to overflow
|
||||
if (!params[params.size() - 3].tryGet<Int64>(begin))
|
||||
begin = params[params.size() - 3].safeGet<UInt64>();
|
||||
if (!params[params.size() - 2].tryGet<Int64>(end))
|
||||
end = params[params.size() - 2].safeGet<UInt64>();
|
||||
|
||||
UInt64 step = params[params.size() - 1].safeGet<UInt64>();
|
||||
|
||||
return std::make_shared<AggregateFunctionResample<Int64>>(
|
||||
nested_function,
|
||||
begin,
|
||||
end,
|
||||
step,
|
||||
arguments,
|
||||
params);
|
||||
}
|
||||
|
||||
throw Exception(
|
||||
"Illegal types of argument for aggregate function " + getName()
|
||||
+ ", the type of the last argument should be native integer or integer-like",
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
}
|
||||
};
|
||||
|
||||
/// Registers the "Resample" combinator in the given combinator factory.
void registerAggregateFunctionCombinatorResample(AggregateFunctionCombinatorFactory & factory)
{
    auto resample_combinator = std::make_shared<AggregateFunctionCombinatorResample>();
    factory.registerCombinator(resample_combinator);
}
|
||||
|
||||
}
|
182
dbms/src/AggregateFunctions/AggregateFunctionResample.h
Normal file
182
dbms/src/AggregateFunctions/AggregateFunctionResample.h
Normal file
@ -0,0 +1,182 @@
|
||||
#pragma once
|
||||
|
||||
#include <AggregateFunctions/IAggregateFunction.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||
}
|
||||
|
||||
template <typename Key>
|
||||
class AggregateFunctionResample final : public IAggregateFunctionHelper<AggregateFunctionResample<Key>>
|
||||
{
|
||||
private:
|
||||
const size_t MAX_ELEMENTS = 4096;
|
||||
|
||||
AggregateFunctionPtr nested_function;
|
||||
|
||||
size_t last_col;
|
||||
|
||||
Key begin;
|
||||
Key end;
|
||||
size_t step;
|
||||
|
||||
size_t total;
|
||||
size_t aod;
|
||||
size_t sod;
|
||||
|
||||
public:
|
||||
AggregateFunctionResample(
|
||||
AggregateFunctionPtr nested_function,
|
||||
Key begin,
|
||||
Key end,
|
||||
size_t step,
|
||||
const DataTypes & arguments,
|
||||
const Array & params)
|
||||
: IAggregateFunctionHelper<AggregateFunctionResample<Key>>{arguments, params}
|
||||
, nested_function{nested_function}
|
||||
, last_col{arguments.size() - 1}
|
||||
, begin{begin}
|
||||
, end{end}
|
||||
, step{step}
|
||||
, total{0}
|
||||
, aod{nested_function->alignOfData()}
|
||||
, sod{(nested_function->sizeOfData() + aod - 1) / aod * aod}
|
||||
{
|
||||
// notice: argument types has been checked before
|
||||
if (step == 0)
|
||||
throw Exception("The step given in function "
|
||||
+ getName() + " should not be zero",
|
||||
ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||
|
||||
if (end < begin)
|
||||
total = 0;
|
||||
else
|
||||
total = (end - begin + step - 1) / step;
|
||||
|
||||
if (total > MAX_ELEMENTS)
|
||||
throw Exception("The range given in function "
|
||||
+ getName() + " contains too many elements",
|
||||
ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||
}
|
||||
|
||||
String getName() const override
|
||||
{
|
||||
return nested_function->getName() + "Resample";
|
||||
}
|
||||
|
||||
const char * getHeaderFilePath() const override
|
||||
{
|
||||
return __FILE__;
|
||||
}
|
||||
|
||||
bool isState() const override
|
||||
{
|
||||
return nested_function->isState();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override
|
||||
{
|
||||
return nested_function->allocatesMemoryInArena();
|
||||
}
|
||||
|
||||
bool hasTrivialDestructor() const override
|
||||
{
|
||||
return nested_function->hasTrivialDestructor();
|
||||
}
|
||||
|
||||
size_t sizeOfData() const override
|
||||
{
|
||||
return total * sod;
|
||||
}
|
||||
|
||||
size_t alignOfData() const override
|
||||
{
|
||||
return aod;
|
||||
}
|
||||
|
||||
void create(AggregateDataPtr place) const override
|
||||
{
|
||||
for (size_t i = 0; i < total; ++i)
|
||||
nested_function->create(place + i * sod);
|
||||
}
|
||||
|
||||
void destroy(AggregateDataPtr place) const noexcept override
|
||||
{
|
||||
for (size_t i = 0; i < total; ++i)
|
||||
nested_function->destroy(place + i * sod);
|
||||
}
|
||||
|
||||
void add(
|
||||
AggregateDataPtr place,
|
||||
const IColumn ** columns,
|
||||
size_t row_num,
|
||||
Arena * arena) const override
|
||||
{
|
||||
Key key;
|
||||
|
||||
if constexpr (static_cast<Key>(-1) < 0)
|
||||
key = columns[last_col]->getInt(row_num);
|
||||
else
|
||||
key = columns[last_col]->getUInt(row_num);
|
||||
|
||||
if (key < begin || key >= end)
|
||||
return;
|
||||
|
||||
size_t pos = (key - begin) / step;
|
||||
|
||||
nested_function->add(place + pos * sod, columns, row_num, arena);
|
||||
}
|
||||
|
||||
void merge(
|
||||
AggregateDataPtr place,
|
||||
ConstAggregateDataPtr rhs,
|
||||
Arena * arena) const override
|
||||
{
|
||||
for (size_t i = 0; i < total; ++i)
|
||||
nested_function->merge(place + i * sod, rhs + i * sod, arena);
|
||||
}
|
||||
|
||||
void serialize(
|
||||
ConstAggregateDataPtr place,
|
||||
WriteBuffer & buf) const override
|
||||
{
|
||||
for (size_t i = 0; i < total; ++i)
|
||||
nested_function->serialize(place + i * sod, buf);
|
||||
}
|
||||
|
||||
void deserialize(
|
||||
AggregateDataPtr place,
|
||||
ReadBuffer & buf,
|
||||
Arena * arena) const override
|
||||
{
|
||||
for (size_t i = 0; i < total; ++i)
|
||||
nested_function->deserialize(place + i * sod, buf, arena);
|
||||
}
|
||||
|
||||
DataTypePtr getReturnType() const override
|
||||
{
|
||||
return std::make_shared<DataTypeArray>(nested_function->getReturnType());
|
||||
}
|
||||
|
||||
void insertResultInto(
|
||||
ConstAggregateDataPtr place,
|
||||
IColumn & to) const override
|
||||
{
|
||||
auto & col = static_cast<ColumnArray &>(to);
|
||||
auto & col_offsets = static_cast<ColumnArray::ColumnOffsets &>(col.getOffsetsColumn());
|
||||
|
||||
for (size_t i = 0; i < total; ++i)
|
||||
nested_function->insertResultInto(place + i * sod, col.getData());
|
||||
|
||||
col_offsets.getData().push_back(col.getData().size());
|
||||
}
|
||||
};
|
||||
|
||||
}
|
@ -38,7 +38,19 @@ public:
|
||||
* get the arguments for nested function (ex: UInt64 for sum).
|
||||
* If arguments are not suitable for combined function, throw an exception.
|
||||
*/
|
||||
virtual DataTypes transformArguments(const DataTypes & arguments) const = 0;
|
||||
virtual DataTypes transformArguments(const DataTypes & arguments) const
|
||||
{
|
||||
return arguments;
|
||||
}
|
||||
|
||||
/** From the parameters for combined function,
|
||||
* get the parameters for nested function.
|
||||
* If parameters are not suitable for combined function, throw an exception.
|
||||
*/
|
||||
virtual Array transformParameters(const Array & parameters) const
|
||||
{
|
||||
return parameters;
|
||||
}
|
||||
|
||||
/** Create combined aggregate function (ex: sumIf)
|
||||
* from nested function (ex: sum)
|
||||
|
@ -39,6 +39,7 @@ void registerAggregateFunctionCombinatorForEach(AggregateFunctionCombinatorFacto
|
||||
void registerAggregateFunctionCombinatorState(AggregateFunctionCombinatorFactory &);
|
||||
void registerAggregateFunctionCombinatorMerge(AggregateFunctionCombinatorFactory &);
|
||||
void registerAggregateFunctionCombinatorNull(AggregateFunctionCombinatorFactory &);
|
||||
void registerAggregateFunctionCombinatorResample(AggregateFunctionCombinatorFactory &);
|
||||
|
||||
void registerAggregateFunctionHistogram(AggregateFunctionFactory & factory);
|
||||
void registerAggregateFunctionRetention(AggregateFunctionFactory & factory);
|
||||
@ -87,6 +88,7 @@ void registerAggregateFunctions()
|
||||
registerAggregateFunctionCombinatorState(factory);
|
||||
registerAggregateFunctionCombinatorMerge(factory);
|
||||
registerAggregateFunctionCombinatorNull(factory);
|
||||
registerAggregateFunctionCombinatorResample(factory);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include "IPv6ToBinary.h"
|
||||
#include <Poco/Net/IPAddress.h>
|
||||
#include <cstring>
|
||||
|
||||
|
||||
namespace DB
|
||||
|
@ -211,6 +211,8 @@ size_t IMergedBlockOutputStream::writeSingleGranule(
|
||||
return from_row + number_of_rows;
|
||||
}
|
||||
|
||||
/// column must not be empty. (column.size() != 0)
|
||||
|
||||
std::pair<size_t, size_t> IMergedBlockOutputStream::writeColumn(
|
||||
const String & name,
|
||||
const IDataType & type,
|
||||
|
@ -43,6 +43,10 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
size_t rows = block.rows();
|
||||
if (!rows)
|
||||
return;
|
||||
|
||||
size_t new_index_offset = 0;
|
||||
size_t new_current_mark = 0;
|
||||
WrittenOffsetColumns offset_columns = already_written_offset_columns;
|
||||
|
@ -0,0 +1,16 @@
|
||||
[11,12,13,14,15,16]
|
||||
[27,31,17]
|
||||
[39,48,18]
|
||||
[39,48,18]
|
||||
[0,0,0,0,0,0]
|
||||
[0.5,0.5,0]
|
||||
[0.816496580927726,0.816496580927726,0]
|
||||
[0.816496580927726,0.816496580927726,0]
|
||||
[[11],[12],[13],[14],[15],[16]]
|
||||
[[13,14],[15,16],[17]]
|
||||
[[12,13,14],[15,16,17],[18]]
|
||||
[[12,13,14],[15,16,17],[18]]
|
||||
[1,1,1,1,1,1]
|
||||
[2,2,1]
|
||||
[3,3,1]
|
||||
[3,3,1]
|
16
dbms/tests/queries/0_stateless/00954_resample_combinator.sql
Normal file
16
dbms/tests/queries/0_stateless/00954_resample_combinator.sql
Normal file
@ -0,0 +1,16 @@
|
||||
select arrayReduce('sumResample(1, 7, 1)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('sumResample(3, 8, 2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('sumResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('sumResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [-0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('stddevPopResample(1, 7, 1)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('stddevPopResample(3, 8, 2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('stddevPopResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('stddevPopResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [-0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('groupArrayResample(1, 7, 1)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('groupArrayResample(3, 8, 2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('groupArrayResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('groupArrayResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [-0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('uniqResample(1, 7, 1)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('uniqResample(3, 8, 2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('uniqResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
select arrayReduce('uniqResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [-0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
@ -9,5 +9,6 @@
|
||||
"docker/test/stateful": "yandex/clickhouse-stateful-test",
|
||||
"docker/test/stateless": "yandex/clickhouse-stateless-test",
|
||||
"docker/test/unit": "yandex/clickhouse-unit-test",
|
||||
"docker/test/stress": "yandex/clickhouse-stress-test",
|
||||
"dbms/tests/integration/image": "yandex/clickhouse-integration-tests-runner"
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
FROM yandex/clickhouse-deb-builder
|
||||
# docker build -t yandex/clickhouse-stress-test .
|
||||
FROM yandex/clickhouse-stateful-test
|
||||
|
||||
RUN apt-get update -y \
|
||||
&& env DEBIAN_FRONTEND=noninteractive \
|
||||
@ -21,9 +22,10 @@ RUN apt-get update -y \
|
||||
llvm-8 \
|
||||
brotli
|
||||
|
||||
|
||||
COPY ./stress /stress
|
||||
|
||||
ENV DATASETS="hits visits"
|
||||
|
||||
CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
|
||||
dpkg -i package_folder/clickhouse-common-static-dbg_*.deb; \
|
||||
dpkg -i package_folder/clickhouse-server_*.deb; \
|
||||
@ -39,4 +41,15 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
|
||||
echo "UBSAN_SYMBOLIZER_PATH=/usr/lib/llvm-6.0/bin/llvm-symbolizer" >> /etc/environment; \
|
||||
echo "TSAN_SYMBOLIZER_PATH=/usr/lib/llvm-6.0/bin/llvm-symbolizer" >> /etc/environment; \
|
||||
echo "LLVM_SYMBOLIZER_PATH=/usr/lib/llvm-6.0/bin/llvm-symbolizer" >> /etc/environment; \
|
||||
service clickhouse-server start && sleep 1 && ./stress --output-folder test_output
|
||||
service clickhouse-server start && sleep 5 \
|
||||
&& /s3downloader --dataset-names $DATASETS \
|
||||
&& chmod 777 -R /var/lib/clickhouse \
|
||||
&& clickhouse-client --query "CREATE DATABASE IF NOT EXISTS datasets" \
|
||||
&& clickhouse-client --query "CREATE DATABASE IF NOT EXISTS test" \
|
||||
&& service clickhouse-server restart && sleep 5 \
|
||||
&& clickhouse-client --query "SHOW TABLES FROM datasets" \
|
||||
&& clickhouse-client --query "SHOW TABLES FROM test" \
|
||||
&& clickhouse-client --query "RENAME TABLE datasets.hits_v1 TO test.hits" \
|
||||
&& clickhouse-client --query "RENAME TABLE datasets.visits_v1 TO test.visits" \
|
||||
&& clickhouse-client --query "SHOW TABLES FROM test" \
|
||||
&& ./stress --output-folder test_output
|
||||
|
@ -36,7 +36,7 @@ ClickHouse在其他Yandex服务中至少有12个安装:search verticals, Marke
|
||||
- 用户不会查看我们为他生成的所有报告,大部分计算将是无用的
|
||||
- 各种聚合可能违背了数据的逻辑完整性
|
||||
|
||||
如果我们直接使用非聚合数据而不尽兴任何聚合时,我们的计算量可能是减少的。
|
||||
如果我们直接使用非聚合数据而不进行任何聚合时,我们的计算量可能是减少的。
|
||||
|
||||
然而,相对于聚合中很大一部分工作被离线完成,在线计算需要尽快的完成计算,因为用户在等待结果。
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user