Merge pull request #41107 from Avogar/improve-combinators

Support all combinators combination in WindowTransform/arratReduce*/initializeAggregation/aggregate functions versioning
This commit is contained in:
Kruglov Pavel 2022-10-18 15:24:49 +02:00 committed by GitHub
commit 25e13bdd2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 267 additions and 105 deletions

View File

@ -156,6 +156,11 @@ public:
nested_func->insertResultInto(place, to, arena);
}
void insertMergeResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
nested_func->insertMergeResultInto(place, to, arena);
}
bool allocatesMemoryInArena() const override
{
return nested_func->allocatesMemoryInArena();

View File

@ -196,7 +196,8 @@ public:
this->data(place).deserialize(buf, arena);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
template <bool MergeResult>
void insertResultIntoImpl(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const
{
auto arguments = this->data(place).getArguments(this->argument_types);
ColumnRawPtrs arguments_raw(arguments.size());
@ -205,7 +206,20 @@ public:
assert(!arguments.empty());
nested_func->addBatchSinglePlace(0, arguments[0]->size(), getNestedPlace(place), arguments_raw.data(), arena);
nested_func->insertResultInto(getNestedPlace(place), to, arena);
if constexpr (MergeResult)
nested_func->insertMergeResultInto(getNestedPlace(place), to, arena);
else
nested_func->insertResultInto(getNestedPlace(place), to, arena);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<false>(place, to, arena);
}
void insertMergeResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<true>(place, to, arena);
}
size_t sizeOfData() const override

View File

@ -257,7 +257,8 @@ public:
}
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
template <bool merge>
void insertResultIntoImpl(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const
{
AggregateFunctionForEachData & state = data(place);
@ -268,13 +269,26 @@ public:
char * nested_state = state.array_of_aggregate_datas;
for (size_t i = 0; i < state.dynamic_array_size; ++i)
{
nested_func->insertResultInto(nested_state, elems_to, arena);
if constexpr (merge)
nested_func->insertMergeResultInto(nested_state, elems_to, arena);
else
nested_func->insertResultInto(nested_state, elems_to, arena);
nested_state += nested_size_of_data;
}
offsets_to.push_back(offsets_to.back() + state.dynamic_array_size);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<false>(place, to, arena);
}
void insertMergeResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<true>(place, to, arena);
}
bool allocatesMemoryInArena() const override
{
return true;

View File

@ -183,6 +183,11 @@ public:
nested_func->insertResultInto(place, to, arena);
}
void insertMergeResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
nested_func->insertMergeResultInto(place, to, arena);
}
bool allocatesMemoryInArena() const override
{
return nested_func->allocatesMemoryInArena();

View File

@ -264,7 +264,8 @@ public:
}
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
template <bool merge>
void insertResultIntoImpl(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const
{
auto & map_column = assert_cast<ColumnMap &>(to);
auto & nested_column = map_column.getNestedColumn();
@ -288,13 +289,26 @@ public:
for (auto & key : keys)
{
key_column.insert(key);
nested_func->insertResultInto(merged_maps[key], val_column, arena);
if constexpr (merge)
nested_func->insertMergeResultInto(merged_maps[key], val_column, arena);
else
nested_func->insertResultInto(merged_maps[key], val_column, arena);
}
IColumn::Offsets & res_offsets = nested_column.getOffsets();
res_offsets.push_back(val_column.size());
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<false>(place, to, arena);
}
void insertMergeResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<true>(place, to, arena);
}
bool allocatesMemoryInArena() const override { return true; }
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }

View File

@ -163,14 +163,18 @@ public:
}
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
template <bool merge>
void insertResultIntoImpl(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const
{
if constexpr (result_is_nullable)
{
ColumnNullable & to_concrete = assert_cast<ColumnNullable &>(to);
if (getFlag(place))
{
nested_function->insertResultInto(nestedPlace(place), to_concrete.getNestedColumn(), arena);
if constexpr (merge)
nested_function->insertMergeResultInto(nestedPlace(place), to_concrete.getNestedColumn(), arena);
else
nested_function->insertResultInto(nestedPlace(place), to_concrete.getNestedColumn(), arena);
to_concrete.getNullMapData().push_back(0);
}
else
@ -180,10 +184,23 @@ public:
}
else
{
nested_function->insertResultInto(nestedPlace(place), to, arena);
if constexpr (merge)
nested_function->insertMergeResultInto(nestedPlace(place), to, arena);
else
nested_function->insertResultInto(nestedPlace(place), to, arena);
}
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<false>(place, to, arena);
}
void insertMergeResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<true>(place, to, arena);
}
bool allocatesMemoryInArena() const override
{
return nested_function->allocatesMemoryInArena();

View File

@ -265,10 +265,11 @@ public:
}
}
void insertResultInto(
template <bool merge>
void insertResultIntoImpl(
AggregateDataPtr __restrict place,
IColumn & to,
Arena * arena) const override
Arena * arena) const
{
if (place[size_of_data])
{
@ -277,7 +278,12 @@ public:
// -OrNull
if (inner_nullable)
nested_function->insertResultInto(place, to, arena);
{
if constexpr (merge)
nested_function->insertMergeResultInto(place, to, arena);
else
nested_function->insertResultInto(place, to, arena);
}
else
{
ColumnNullable & col = typeid_cast<ColumnNullable &>(to);
@ -289,14 +295,26 @@ public:
else
{
// -OrDefault
nested_function->insertResultInto(place, to, arena);
if constexpr (merge)
nested_function->insertMergeResultInto(place, to, arena);
else
nested_function->insertResultInto(place, to, arena);
}
}
else
to.insertDefault();
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<false>(place, to, arena);
}
void insertMergeResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<true>(place, to, arena);
}
AggregateFunctionPtr getNestedFunction() const override { return nested_function; }
};

View File

@ -195,17 +195,33 @@ public:
return std::make_shared<DataTypeArray>(nested_function->getReturnType());
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
template <bool merge>
void insertResultIntoImpl(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const
{
auto & col = assert_cast<ColumnArray &>(to);
auto & col_offsets = assert_cast<ColumnArray::ColumnOffsets &>(col.getOffsetsColumn());
for (size_t i = 0; i < total; ++i)
nested_function->insertResultInto(place + i * size_of_data, col.getData(), arena);
{
if constexpr (merge)
nested_function->insertMergeResultInto(place + i * size_of_data, col.getData(), arena);
else
nested_function->insertResultInto(place + i * size_of_data, col.getData(), arena);
}
col_offsets.getData().push_back(col.getData().size());
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<false>(place, to, arena);
}
void insertMergeResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<true>(place, to, arena);
}
AggregateFunctionPtr getNestedFunction() const override { return nested_function; }
};

View File

@ -111,6 +111,11 @@ public:
assert_cast<ColumnAggregateFunction &>(to).getData().push_back(place);
}
void insertMergeResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
assert_cast<ColumnAggregateFunction &>(to).insertFrom(place);
}
/// Aggregate function or aggregate function state.
bool isState() const override { return true; }

View File

@ -164,6 +164,18 @@ public:
/// window function.
virtual void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const = 0;
/// Special method for aggregate functions with -State combinator, it behaves the same way as insertResultInto,
/// but if we need to insert AggregateData into ColumnAggregateFunction we use special method
/// insertInto that inserts default value and then performs merge with provided AggregateData
/// instead of just copying pointer to this AggregateData. Used in WindowTransform.
virtual void insertMergeResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const
{
if (isState())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Function {} is marked as State but method insertMergeResultInto is not implemented");
insertResultInto(place, to, arena);
}
/// Used for machine learning methods. Predict result from trained model.
/// Will insert result into `to` column for rows in range [offset, offset + limit).
virtual void predictValues(

View File

@ -10,6 +10,7 @@
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/Serializations/SerializationAggregateFunction.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/transformTypesRecursively.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
@ -241,6 +242,23 @@ static DataTypePtr create(const ASTPtr & arguments)
return std::make_shared<DataTypeAggregateFunction>(function, argument_types, params_row, version);
}
void setVersionToAggregateFunctions(DataTypePtr & type, bool if_empty, std::optional<size_t> revision)
{
auto callback = [revision, if_empty](DataTypePtr & column_type)
{
const auto * aggregate_function_type = typeid_cast<const DataTypeAggregateFunction *>(column_type.get());
if (aggregate_function_type && aggregate_function_type->isVersioned())
{
if (revision)
aggregate_function_type->updateVersionFromRevision(*revision, if_empty);
else
aggregate_function_type->setVersion(0, if_empty);
}
};
callOnNestedSimpleTypes(type, callback);
}
void registerDataTypeAggregateFunction(DataTypeFactory & factory)
{

View File

@ -70,8 +70,6 @@ public:
bool isVersioned() const { return function->isVersioned(); }
size_t getVersionFromRevision(size_t revision) const { return function->getVersionFromRevision(revision); }
/// Version is not empty only if it was parsed from AST or implicitly cast to 0 or version according
/// to server revision.
/// It is ok to have an empty version value here - then for serialization a default (latest)
@ -84,6 +82,13 @@ public:
version = version_;
}
void updateVersionFromRevision(size_t revision, bool if_empty) const
{
setVersion(function->getVersionFromRevision(revision), if_empty);
}
};
void setVersionToAggregateFunctions(DataTypePtr & type, bool if_empty, std::optional<size_t> revision = std::nullopt);
}

View File

@ -175,4 +175,10 @@ void transformTypesRecursively(DataTypes & types, std::function<void(DataTypes &
transform_simple_types(types);
}
void callOnNestedSimpleTypes(DataTypePtr & type, std::function<void(DataTypePtr &)> callback)
{
DataTypes types = {type};
transformTypesRecursively(types, [callback](auto & data_types){ callback(data_types[0]); }, {});
}
}

View File

@ -14,4 +14,6 @@ namespace DB
/// Function transform_complex_types will be applied to complex types (Array/Map/Tuple) after recursive call to their nested types.
void transformTypesRecursively(DataTypes & types, std::function<void(DataTypes &)> transform_simple_types, std::function<void(DataTypes &)> transform_complex_types);
void callOnNestedSimpleTypes(DataTypePtr & type, std::function<void(DataTypePtr &)> callback);
}

View File

@ -145,12 +145,7 @@ Block NativeReader::read()
readBinary(type_name, istr);
column.type = data_type_factory.get(type_name);
const auto * aggregate_function_data_type = typeid_cast<const DataTypeAggregateFunction *>(column.type.get());
if (aggregate_function_data_type && aggregate_function_data_type->isVersioned())
{
auto version = aggregate_function_data_type->getVersionFromRevision(server_revision);
aggregate_function_data_type->setVersion(version, /*if_empty=*/ true);
}
setVersionToAggregateFunctions(column.type, true, server_revision);
SerializationPtr serialization;
if (server_revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION)

View File

@ -11,9 +11,8 @@
#include <Formats/NativeWriter.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/NestedUtils.h>
#include <Columns/ColumnSparse.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeAggregateFunction.h>
namespace DB
@ -116,19 +115,7 @@ void NativeWriter::write(const Block & block)
writeStringBinary(column.name, ostr);
bool include_version = client_revision >= DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING;
const auto * aggregate_function_data_type = typeid_cast<const DataTypeAggregateFunction *>(column.type.get());
if (aggregate_function_data_type && aggregate_function_data_type->isVersioned())
{
if (include_version)
{
auto version = aggregate_function_data_type->getVersionFromRevision(client_revision);
aggregate_function_data_type->setVersion(version, /* if_empty */true);
}
else
{
aggregate_function_data_type->setVersion(0, /* if_empty */false);
}
}
setVersionToAggregateFunctions(column.type, include_version, include_version ? std::optional<size_t>(client_revision) : std::nullopt);
/// Type
String type_name = column.type->getName();

View File

@ -152,13 +152,6 @@ ColumnPtr FunctionArrayReduce::executeImpl(const ColumnsWithTypeAndName & argume
MutableColumnPtr result_holder = result_type->createColumn();
IColumn & res_col = *result_holder;
/// AggregateFunction's states should be inserted into column using specific way
auto * res_col_aggregate_function = typeid_cast<ColumnAggregateFunction *>(&res_col);
if (!res_col_aggregate_function && agg_func.isState())
throw Exception("State function " + agg_func.getName() + " inserts results into non-state column "
+ result_type->getName(), ErrorCodes::ILLEGAL_COLUMN);
PODArray<AggregateDataPtr> places(input_rows_count);
for (size_t i = 0; i < input_rows_count; ++i)
{
@ -190,10 +183,9 @@ ColumnPtr FunctionArrayReduce::executeImpl(const ColumnsWithTypeAndName & argume
}
for (size_t i = 0; i < input_rows_count; ++i)
if (!res_col_aggregate_function)
agg_func.insertResultInto(places[i], res_col, arena.get());
else
res_col_aggregate_function->insertFrom(places[i]);
/// We should use insertMergeResultInto to insert result into ColumnAggregateFunction
/// correctly if result contains AggregateFunction's states
agg_func.insertMergeResultInto(places[i], res_col, arena.get());
return result_holder;
}

View File

@ -202,13 +202,6 @@ ColumnPtr FunctionArrayReduceInRanges::executeImpl(
result_arr->getOffsets().insert(ranges_offsets->begin(), ranges_offsets->end());
/// AggregateFunction's states should be inserted into column using specific way
auto * res_col_aggregate_function = typeid_cast<ColumnAggregateFunction *>(&result_data);
if (!res_col_aggregate_function && agg_func.isState())
throw Exception("State function " + agg_func.getName() + " inserts results into non-state column "
+ result_type->getName(), ErrorCodes::ILLEGAL_COLUMN);
/// Perform the aggregation
size_t begin = 0;
@ -379,11 +372,9 @@ ColumnPtr FunctionArrayReduceInRanges::executeImpl(
for (size_t k = local_begin; k < local_end; ++k)
true_func->add(place, aggregate_arguments, begin + k, arena.get());
}
if (!res_col_aggregate_function)
agg_func.insertResultInto(place, result_data, arena.get());
else
res_col_aggregate_function->insertFrom(place);
/// We should use insertMergeResultInto to insert result into ColumnAggregateFunction
/// correctly if result contains AggregateFunction's states
agg_func.insertMergeResultInto(place, result_data, arena.get());
}
}

View File

@ -17,7 +17,6 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
}
@ -114,13 +113,6 @@ ColumnPtr FunctionInitializeAggregation::executeImpl(const ColumnsWithTypeAndNam
MutableColumnPtr result_holder = result_type->createColumn();
IColumn & res_col = *result_holder;
/// AggregateFunction's states should be inserted into column using specific way
auto * res_col_aggregate_function = typeid_cast<ColumnAggregateFunction *>(&res_col);
if (!res_col_aggregate_function && agg_func.isState())
throw Exception("State function " + agg_func.getName() + " inserts results into non-state column "
+ result_type->getName(), ErrorCodes::ILLEGAL_COLUMN);
PODArray<AggregateDataPtr> places(input_rows_count);
for (size_t i = 0; i < input_rows_count; ++i)
{
@ -151,10 +143,9 @@ ColumnPtr FunctionInitializeAggregation::executeImpl(const ColumnsWithTypeAndNam
}
for (size_t i = 0; i < input_rows_count; ++i)
if (!res_col_aggregate_function)
agg_func.insertResultInto(places[i], res_col, arena.get());
else
res_col_aggregate_function->insertFrom(places[i]);
/// We should use insertMergeResultInto to insert result into ColumnAggregateFunction
/// correctly if result contains AggregateFunction's states
agg_func.insertMergeResultInto(places[i], res_col, arena.get());
return result_holder;
}

View File

@ -12,17 +12,14 @@
#include <Common/hex.h>
#include <Core/Defines.h>
#include <Core/Settings.h>
#include <Core/SettingsEnums.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ParserCreateQuery.h>
@ -37,7 +34,6 @@
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterCreateQuery.h>
@ -59,7 +55,6 @@
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseReplicated.h>
#include <Databases/IDatabase.h>
#include <Databases/DatabaseOnDisk.h>
#include <Databases/TablesLoader.h>
#include <Databases/DDLDependencyVisitor.h>
@ -484,9 +479,8 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
{
column_type = DataTypeFactory::instance().get(col_decl.type);
const auto * aggregate_function_type = typeid_cast<const DataTypeAggregateFunction *>(column_type.get());
if (attach && aggregate_function_type && aggregate_function_type->isVersioned())
aggregate_function_type->setVersion(0, /* if_empty */true);
if (attach)
setVersionToAggregateFunctions(column_type, true);
if (col_decl.null_modifier)
{

View File

@ -28,7 +28,6 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
@ -984,22 +983,9 @@ void WindowTransform::writeOutCurrentRow()
// FIXME does it also allocate the result on the arena?
// We'll have to pass it out with blocks then...
if (a->isState())
{
/// AggregateFunction's states should be inserted into column using specific way
auto * res_col_aggregate_function = typeid_cast<ColumnAggregateFunction *>(result_column);
if (!res_col_aggregate_function)
{
throw Exception("State function " + a->getName() + " inserts results into non-state column ",
ErrorCodes::ILLEGAL_COLUMN);
}
res_col_aggregate_function->insertFrom(buf);
}
else
{
a->insertResultInto(buf, *result_column, arena.get());
}
/// We should use insertMergeResultInto to insert result into ColumnAggregateFunction
/// correctly if result contains AggregateFunction's states
a->insertMergeResultInto(buf, *result_column, arena.get());
}
}

View File

@ -1190,12 +1190,8 @@ void IMergeTreeDataPart::loadColumns(bool require)
auto in = metadata_manager->read("columns.txt");
loaded_columns.readText(*in);
for (const auto & column : loaded_columns)
{
const auto * aggregate_function_data_type = typeid_cast<const DataTypeAggregateFunction *>(column.type.get());
if (aggregate_function_data_type && aggregate_function_data_type->isVersioned())
aggregate_function_data_type->setVersion(0, /* if_empty */true);
}
for (auto & column : loaded_columns)
setVersionToAggregateFunctions(column.type, true);
}
SerializationInfo::Settings settings =

View File

@ -0,0 +1,50 @@
{1:'\0ёwRп'}
{1:'\0Dѕ@='}
{1:'\0<37><D08C>'}
{1:'\0СтоВ'}
{1:'\0"Q<06>'}
{1:'\0V\'<27>у'}
{1:'\0вт\0Ј'}
{1:'\0Ѓ_Ч'}
{1:'\0qЕ4h'}
{1:'\07'}
['\0ёwRп']
['\0Dѕ@=']
['\0<37><D08C>']
['\0СтоВ']
['\0"Q<06>']
['\0V\'<27>у']
['\0вт\0Ј']
['\0Ѓ_Ч']
['\0qЕ4h']
['\07']
['\0щZТ','\0\0']
['\0т4nџ','\0\0']
['\0<01>о<EFBFBD>','\0\0']
['\0Й<>№1','\0\0']
['\0<01>_<V','\0\0']
['\0ЪJА','\0\0']
['\0DЖ\03','\0\0']
['\0й6%','\0\0']
['\0@<40>','\0\0']
['\0gЙљ','\0\0']
[{1:['\0ёwRп']}]
[{1:['\0Dѕ@=']}]
[{1:['\0<37><D08C>']}]
[{1:['\0СтоВ']}]
[{1:['\0"Q<06>']}]
[{1:['\0V\'<27>у']}]
[{1:['\0вт\0Ј']}]
[{1:['\0Ѓ_Ч']}]
[{1:['\0qЕ4h']}]
[{1:['\07']}]
{1:'\0ёwRп'}
{1:'\0Dѕ@='}
{1:'\0<37><D08C>'}
{1:'\0СтоВ'}
{1:'\0"Q<06>'}
{1:'\0V\'<27>у'}
{1:'\0вт\0Ј'}
{1:'\0Ѓ_Ч'}
{1:'\0qЕ4h'}
{1:'\07'}

View File

@ -0,0 +1,10 @@
drop table if exists test;
create table test (x AggregateFunction(uniq, UInt64), y Int64) engine=Memory;
insert into test select uniqState(number) as x, number as y from numbers(10) group by number;
select uniqStateMap(map(1, x)) OVER (PARTITION BY y) from test;
select uniqStateForEach([x]) OVER (PARTITION BY y) from test;
select uniqStateResample(30, 75, 30)([x], 30) OVER (PARTITION BY y) from test;
select uniqStateForEachMapForEach([map(1, [x])]) OVER (PARTITION BY y) from test;
select uniqStateDistinctMap(map(1, x)) OVER (PARTITION BY y) from test;
drop table test;

View File

@ -0,0 +1,6 @@
{1:'\0çƒe'}
['\0,ËÂ4çƒe']
[{1:['\0çƒe']}]
[{1:'\0\f¤”µýŸ¿¼'},{1:'\0\f¤”µº#¾q'},{1:'\0\f*<•º#¾q'}]
[['\0\f¤”µýŸ¿¼'],['\0\f¤”µº#¾q'],['\0\f*<•º#¾q']]
[[{1:['\0\f¤”µýŸ¿¼']}],[{1:['\0\f¤”µº#¾q']}],[{1:['\0\f*<•º#¾q']}]]

View File

@ -0,0 +1,6 @@
select arrayReduce('uniqStateMap', [map(1, 2)]);
select arrayReduce('uniqStateForEach', [[1], [2]]);
select arrayReduce('uniqStateForEachMapForEach', [[map(1, [2])]]);
select arrayReduceInRanges('uniqStateMap', [(1, 3), (2, 3), (3, 3)], [map(1, 'a'), map(1, 'b'), map(1, 'c'), map(1, 'd'), map(1, 'e')]);
select arrayReduceInRanges('uniqStateForEach', [(1, 3), (2, 3), (3, 3)], [['a'], ['b'], ['c'],['d'], ['e']]);
select arrayReduceInRanges('uniqStateForEachMapForEach', [(1, 3), (2, 3), (3, 3)], [[map(1, ['a'])], [map(1, ['b'])], [map(1, ['c'])], [map(1, ['d'])], [map(1, ['e'])]]);

View File

@ -0,0 +1,3 @@
{1:'\0çƒe'}
['\0,ËÂ4','\0çƒe']
[{1:['\0çƒe']}]

View File

@ -0,0 +1,4 @@
select initializeAggregation('uniqStateMap', map(1, 2));
select initializeAggregation('uniqStateForEach', [1, 2]);
select initializeAggregation('uniqStateForEachMapForEach', [map(1, [2])]);