Add aggregate functions distinctDynamicTypes/distinctJSONPaths/distinctJSONPathsAndTypes

This commit is contained in:
avogar 2024-08-16 17:49:38 +00:00
parent 09d4964de5
commit 32f4b1f891
10 changed files with 951 additions and 1 deletions

View File

@ -0,0 +1,44 @@
---
slug: /en/sql-reference/aggregate-functions/reference/distinctdynamictypes
sidebar_position: 215
---
# distinctDynamicTypes
Calculates the list of distinct data types stored in [Dynamic](../../data-types/dynamic.md) column.
**Syntax**
```sql
distinctDynamicTypes(dynamic)
```
**Arguments**
- `dynamic` — [Dynamic](../../data-types/dynamic.md) column.
**Returned Value**
- The sorted list of data type names [Array(String)](../../data-types/array.md).
**Example**
Query:
```sql
DROP TABLE IF EXISTS test_dynamic;
CREATE TABLE test_dynamic(d Dynamic) ENGINE = Memory;
INSERT INTO test_dynamic VALUES (42), (NULL), ('Hello'), ([1, 2, 3]), ('2020-01-01'), (map(1, 2)), (43), ([4, 5]), (NULL), ('World'), (map(3, 4))
```
```sql
SELECT distinctDynamicTypes(d) FROM test_dynamic;
```
Result:
```reference
┌─distinctDynamicTypes(d)──────────────────────────────────────┐
│ ['Array(Int64)','Date','Int64','Map(UInt8, UInt8)','String'] │
└──────────────────────────────────────────────────────────────┘
```

View File

@ -0,0 +1,84 @@
---
slug: /en/sql-reference/aggregate-functions/reference/distinctjsonpaths
sidebar_position: 216
---
# distinctJSONPaths
Calculates the list of distinct paths stored in [JSON](../../data-types/newjson.md) column.
**Syntax**
```sql
distinctJSONPaths(json)
```
**Arguments**
- `json` — [JSON](../../data-types/newjson.md) column.
**Returned Value**
- The sorted list of paths [Array(String)](../../data-types/array.md).
**Example**
Query:
```sql
DROP TABLE IF EXISTS test_json;
CREATE TABLE test_json(json JSON) ENGINE = Memory;
INSERT INTO test_json VALUES ('{"a" : 42, "b" : "Hello"}'), ('{"b" : [1, 2, 3], "c" : {"d" : {"e" : "2020-01-01"}}}'), ('{"a" : 43, "c" : {"d" : {"f" : [{"g" : 42}]}}}')
```
```sql
SELECT distinctJSONPaths(json) FROM test_json;
```
Result:
```reference
┌─distinctJSONPaths(json)───┐
│ ['a','b','c.d.e','c.d.f'] │
└───────────────────────────┘
```
# distinctJSONPathsAndTypes
Calculates the list of distinct paths and their types stored in [JSON](../../data-types/newjson.md) column.
**Syntax**
```sql
distinctJSONPathsAndTypes(json)
```
**Arguments**
- `json` — [JSON](../../data-types/newjson.md) column.
**Returned Value**
- The sorted map of paths and types [Map(String, Array(String))](../../data-types/map.md).
**Example**
Query:
```sql
DROP TABLE IF EXISTS test_json;
CREATE TABLE test_json(json JSON) ENGINE = Memory;
INSERT INTO test_json VALUES ('{"a" : 42, "b" : "Hello"}'), ('{"b" : [1, 2, 3], "c" : {"d" : {"e" : "2020-01-01"}}}'), ('{"a" : 43, "c" : {"d" : {"f" : [{"g" : 42}]}}}')
```
```sql
SELECT distinctJSONPathsAndTypes(json) FROM test_json;
```
Result:
```reference
┌─distinctJSONPathsAndTypes(json)───────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ {'a':['Int64'],'b':['Array(Nullable(Int64))','String'],'c.d.e':['Date'],'c.d.f':['Array(JSON(max_dynamic_types=16, max_dynamic_paths=256))']} │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```

View File

@ -505,7 +505,130 @@ As we can see, ClickHouse kept the most frequent paths `a`, `b` and `c` and move
## Introspection functions
There are several functions that can help to inspect the content of the JSON column: [JSONAllPaths](../functions/json-functions.md#jsonallpaths), [JSONAllPathsWithTypes](../functions/json-functions.md#jsonallpathswithtypes), [JSONDynamicPaths](../functions/json-functions.md#jsondynamicpaths), [JSONDynamicPathsWithTypes](../functions/json-functions.md#jsondynamicpathswithtypes), [JSONSharedDataPaths](../functions/json-functions.md#jsonshareddatapaths), [JSONSharedDataPathsWithTypes](../functions/json-functions.md#jsonshareddatapathswithtypes).
There are several functions that can help to inspect the content of the JSON column: [JSONAllPaths](../functions/json-functions.md#jsonallpaths), [JSONAllPathsWithTypes](../functions/json-functions.md#jsonallpathswithtypes), [JSONDynamicPaths](../functions/json-functions.md#jsondynamicpaths), [JSONDynamicPathsWithTypes](../functions/json-functions.md#jsondynamicpathswithtypes), [JSONSharedDataPaths](../functions/json-functions.md#jsonshareddatapaths), [JSONSharedDataPathsWithTypes](../functions/json-functions.md#jsonshareddatapathswithtypes), [distinctDynamicTypes](../aggregate-functions/reference/distinctdynamictypes.md), [distinctJSONPaths and distinctJSONPathsAndTypes](../aggregate-functions/reference/distinctjsonpaths.md)
**Examples**
Let's investigate the content of [GH Archive](https://www.gharchive.org/) dataset for `2020-01-01` date:
```sql
SELECT arrayJoin(distinctJSONPaths(json)) FROM s3('s3://clickhouse-public-datasets/gharchive/original/2020-01-01-*.json.gz', JSONAsObject)
```
```text
┌─arrayJoin(distinctJSONPaths(json))─────────────────────────┐
│ actor.avatar_url │
│ actor.display_login │
│ actor.gravatar_id │
│ actor.id │
│ actor.login │
│ actor.url │
│ created_at │
│ id │
│ org.avatar_url │
│ org.gravatar_id │
│ org.id │
│ org.login │
│ org.url │
│ payload.action │
│ payload.before │
│ payload.comment._links.html.href │
│ payload.comment._links.pull_request.href │
│ payload.comment._links.self.href │
│ payload.comment.author_association │
│ payload.comment.body │
│ payload.comment.commit_id │
│ payload.comment.created_at │
│ payload.comment.diff_hunk │
│ payload.comment.html_url │
│ payload.comment.id │
│ payload.comment.in_reply_to_id │
│ payload.comment.issue_url │
│ payload.comment.line │
│ payload.comment.node_id │
│ payload.comment.original_commit_id │
│ payload.comment.original_position │
│ payload.comment.path │
│ payload.comment.position │
│ payload.comment.pull_request_review_id │
...
│ payload.release.node_id │
│ payload.release.prerelease │
│ payload.release.published_at │
│ payload.release.tag_name │
│ payload.release.tarball_url │
│ payload.release.target_commitish │
│ payload.release.upload_url │
│ payload.release.url │
│ payload.release.zipball_url │
│ payload.size │
│ public │
│ repo.id │
│ repo.name │
│ repo.url │
│ type │
└─arrayJoin(distinctJSONPaths(json))─────────────────────────┘
```
```sql
SELECT arrayJoin(distinctJSONPathsAndTypes(json)) FROM s3('s3://clickhouse-public-datasets/gharchive/original/2020-01-01-*.json.gz', JSONAsObject)
```
```text
┌─arrayJoin(distinctJSONPathsAndTypes(json))──────────────────┐
│ ('actor.avatar_url',['String']) │
│ ('actor.display_login',['String']) │
│ ('actor.gravatar_id',['String']) │
│ ('actor.id',['Int64']) │
│ ('actor.login',['String']) │
│ ('actor.url',['String']) │
│ ('created_at',['String']) │
│ ('id',['String']) │
│ ('org.avatar_url',['String']) │
│ ('org.gravatar_id',['String']) │
│ ('org.id',['Int64']) │
│ ('org.login',['String']) │
│ ('org.url',['String']) │
│ ('payload.action',['String']) │
│ ('payload.before',['String']) │
│ ('payload.comment._links.html.href',['String']) │
│ ('payload.comment._links.pull_request.href',['String']) │
│ ('payload.comment._links.self.href',['String']) │
│ ('payload.comment.author_association',['String']) │
│ ('payload.comment.body',['String']) │
│ ('payload.comment.commit_id',['String']) │
│ ('payload.comment.created_at',['String']) │
│ ('payload.comment.diff_hunk',['String']) │
│ ('payload.comment.html_url',['String']) │
│ ('payload.comment.id',['Int64']) │
│ ('payload.comment.in_reply_to_id',['Int64']) │
│ ('payload.comment.issue_url',['String']) │
│ ('payload.comment.line',['Int64']) │
│ ('payload.comment.node_id',['String']) │
│ ('payload.comment.original_commit_id',['String']) │
│ ('payload.comment.original_position',['Int64']) │
│ ('payload.comment.path',['String']) │
│ ('payload.comment.position',['Int64']) │
│ ('payload.comment.pull_request_review_id',['Int64']) │
...
│ ('payload.release.node_id',['String']) │
│ ('payload.release.prerelease',['Bool']) │
│ ('payload.release.published_at',['String']) │
│ ('payload.release.tag_name',['String']) │
│ ('payload.release.tarball_url',['String']) │
│ ('payload.release.target_commitish',['String']) │
│ ('payload.release.upload_url',['String']) │
│ ('payload.release.url',['String']) │
│ ('payload.release.zipball_url',['String']) │
│ ('payload.size',['Int64']) │
│ ('public',['Bool']) │
│ ('repo.id',['Int64']) │
│ ('repo.name',['String']) │
│ ('repo.url',['String']) │
│ ('type',['String']) │
└─arrayJoin(distinctJSONPathsAndTypes(json))──────────────────┘
```
## Tips for better usage of the JSON type

View File

@ -0,0 +1,155 @@
#include <unordered_set>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesBinaryEncoding.h>
#include <Columns/ColumnDynamic.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/FactoryHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
struct AggregateFunctionDistinctDynamicTypesData
{
std::unordered_set<String> data;
void add(const String & type)
{
data.insert(type);
}
void merge(const AggregateFunctionDistinctDynamicTypesData & other)
{
data.insert(other.data.begin(), other.data.end());
}
void serialize(WriteBuffer & buf) const
{
writeVarUInt(data.size(), buf);
for (const auto & type : data)
writeStringBinary(type, buf);
}
void deserialize(ReadBuffer & buf)
{
size_t size;
readVarUInt(size, buf);
data.reserve(size);
String type;
for (size_t i = 0; i != size; ++i)
{
readStringBinary(type, buf);
data.insert(type);
}
}
void insertResultInto(IColumn & column)
{
/// Insert types in sorted order for better output.
auto & array_column = assert_cast<ColumnArray &>(column);
auto & string_column = assert_cast<ColumnString &>(array_column.getData());
std::vector<String> sorted_data(data.begin(), data.end());
std::sort(sorted_data.begin(), sorted_data.end());
for (const auto & type : sorted_data)
string_column.insertData(type.data(), type.size());
array_column.getOffsets().push_back(string_column.size());
}
};
/// Calculates the list of distinct data types in Dynamic column.
class AggregateFunctionDistinctDynamicTypes final : public IAggregateFunctionDataHelper<AggregateFunctionDistinctDynamicTypesData, AggregateFunctionDistinctDynamicTypes>
{
public:
explicit AggregateFunctionDistinctDynamicTypes(const DataTypes & argument_types_)
: IAggregateFunctionDataHelper<AggregateFunctionDistinctDynamicTypesData, AggregateFunctionDistinctDynamicTypes>(argument_types_, {}, std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()))
{
}
String getName() const override { return "distinctDynamicTypes"; }
bool allocatesMemoryInArena() const override { return false; }
void ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
const auto & dynamic_column = assert_cast<const ColumnDynamic & >(*columns[0]);
if (dynamic_column.isNullAt(row_num))
return;
this->data(place).add(dynamic_column.getTypeNameAt(row_num));
}
void ALWAYS_INLINE addBatchSinglePlace(
size_t row_begin, size_t row_end, AggregateDataPtr __restrict place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos)
const override
{
if (if_argument_pos >= 0 || row_begin != 0 || row_end != columns[0]->size())
IAggregateFunctionDataHelper::addBatchSinglePlace(row_begin, row_end, place, columns, arena, if_argument_pos);
/// Optimization for case when we add all rows from the column into single place.
/// In this case we can avoid iterating over all rows because we can get all types
/// in Dynamic column in a more efficient way.
else
assert_cast<const ColumnDynamic & >(*columns[0]).getAllTypeNames(this->data(place).data);
}
void addManyDefaults(
AggregateDataPtr __restrict /*place*/,
const IColumn ** /*columns*/,
size_t /*length*/,
Arena * /*arena*/) const override
{
/// Default value for Dynamic is NULL, so nothing to add.
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
this->data(place).serialize(buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
{
this->data(place).deserialize(buf);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
this->data(place).insertResultInto(to);
}
};
AggregateFunctionPtr createAggregateFunctionDistinctDynamicTypes(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
if (argument_types.size() != 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Incorrect number of arguments for aggregate function {}. Expected single argument with type Dynamic, got {} arguments", name, argument_types.size());
if (!isDynamic(argument_types[0]))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}. Expected type Dynamic", argument_types[0]->getName(), name);
return std::make_shared<AggregateFunctionDistinctDynamicTypes>(argument_types);
}
void registerAggregateFunctionDistinctDynamicTypes(AggregateFunctionFactory & factory)
{
factory.registerFunction("distinctDynamicTypes", createAggregateFunctionDistinctDynamicTypes);
}
}

View File

@ -0,0 +1,331 @@
#include <unordered_set>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeObject.h>
#include <DataTypes/DataTypesBinaryEncoding.h>
#include <Columns/ColumnDynamic.h>
#include <Columns/ColumnObject.h>
#include <Columns/ColumnMap.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/FactoryHelpers.h>
namespace DB
{
struct AggregateFunctionDistinctJSONPathsData
{
static constexpr auto name = "distinctJSONPaths";
std::unordered_set<String> data;
void add(const ColumnObject & column, size_t row_num, const std::unordered_map<String, String> &)
{
for (const auto & [path, _] : column.getTypedPaths())
data.insert(path);
for (const auto & [path, dynamic_column] : column.getDynamicPathsPtrs())
{
/// Add path from dynamic paths only if it's not NULL in this row.
if (!dynamic_column->isNullAt(row_num))
data.insert(path);
}
/// Iterate over paths in shared data in this row.
const auto [shared_data_paths, _] = column.getSharedDataPathsAndValues();
const auto & shared_data_offsets = column.getSharedDataOffsets();
size_t start = shared_data_offsets[static_cast<ssize_t>(row_num) - 1];
size_t end = shared_data_offsets[static_cast<ssize_t>(row_num)];
for (size_t i = start; i != end; ++i)
data.insert(shared_data_paths->getDataAt(i).toString());
}
void addWholeColumn(const ColumnObject & column, const std::unordered_map<String, String> &)
{
for (const auto & [path, _] : column.getTypedPaths())
data.insert(path);
for (const auto & [path, dynamic_column] : column.getDynamicPathsPtrs())
{
/// Add dynamic path only if it has at least one non-null value.
/// getNumberOfDefaultRows for Dynamic column is O(1).
if (dynamic_column->getNumberOfDefaultRows() != dynamic_column->size())
data.insert(path);
}
/// Iterate over all paths in shared data.
const auto [shared_data_paths, _] = column.getSharedDataPathsAndValues();
for (size_t i = 0; i != shared_data_paths->size(); ++i)
data.insert(shared_data_paths->getDataAt(i).toString());
}
void merge(const AggregateFunctionDistinctJSONPathsData & other)
{
data.insert(other.data.begin(), other.data.end());
}
void serialize(WriteBuffer & buf) const
{
writeVarUInt(data.size(), buf);
for (const auto & path : data)
writeStringBinary(path, buf);
}
void deserialize(ReadBuffer & buf)
{
size_t size;
readVarUInt(size, buf);
String path;
for (size_t i = 0; i != size; ++i)
{
readStringBinary(path, buf);
data.insert(path);
}
}
void insertResultInto(IColumn & column)
{
/// Insert paths in sorted order for better output.
auto & array_column = assert_cast<ColumnArray &>(column);
auto & string_column = assert_cast<ColumnString &>(array_column.getData());
std::vector<String> sorted_data(data.begin(), data.end());
std::sort(sorted_data.begin(), sorted_data.end());
for (const auto & path : sorted_data)
string_column.insertData(path.data(), path.size());
array_column.getOffsets().push_back(string_column.size());
}
static DataTypePtr getResultType()
{
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>());
}
};
struct AggregateFunctionDistinctJSONPathsAndTypesData
{
static constexpr auto name = "distinctJSONPathsAndTypes";
std::unordered_map<String, std::unordered_set<String>> data;
void add(const ColumnObject & column, size_t row_num, const std::unordered_map<String, String> & typed_paths_type_names)
{
for (const auto & [path, _] : column.getTypedPaths())
data[path].insert(typed_paths_type_names.at(path));
for (const auto & [path, dynamic_column] : column.getDynamicPathsPtrs())
{
if (!dynamic_column->isNullAt(row_num))
data[path].insert(dynamic_column->getTypeNameAt(row_num));
}
/// Iterate over paths om shared data in this row and decode the data types.
const auto [shared_data_paths, shared_data_values] = column.getSharedDataPathsAndValues();
const auto & shared_data_offsets = column.getSharedDataOffsets();
size_t start = shared_data_offsets[static_cast<ssize_t>(row_num) - 1];
size_t end = shared_data_offsets[static_cast<ssize_t>(row_num)];
for (size_t i = start; i != end; ++i)
{
auto path = shared_data_paths->getDataAt(i).toString();
auto value = shared_data_values->getDataAt(i);
ReadBufferFromMemory buf(value.data, value.size);
auto type = decodeDataType(buf);
/// We should not have Nulls here but let's check just in case.
if (!isNothing(type))
data[path].insert(type->getName());
}
}
void addWholeColumn(const ColumnObject & column, const std::unordered_map<String, String> & typed_paths_type_names)
{
for (const auto & [path, _] : column.getTypedPaths())
data[path].insert(typed_paths_type_names.at(path));
for (const auto & [path, dynamic_column] : column.getDynamicPathsPtrs())
{
/// Add dynamic path only if it has at least one non-null value.
/// getNumberOfDefaultRows for Dynamic column is O(1).
if (dynamic_column->getNumberOfDefaultRows() != dynamic_column->size())
dynamic_column->getAllTypeNames(data[path]);
}
/// Iterate over all paths in shared data and decode the data types.
const auto [shared_data_paths, shared_data_values] = column.getSharedDataPathsAndValues();
for (size_t i = 0; i != shared_data_paths->size(); ++i)
{
auto path = shared_data_paths->getDataAt(i).toString();
auto value = shared_data_values->getDataAt(i);
ReadBufferFromMemory buf(value.data, value.size);
auto type = decodeDataType(buf);
/// We should not have Nulls here but let's check just in case.
if (!isNothing(type))
data[path].insert(type->getName());
}
}
void merge(const AggregateFunctionDistinctJSONPathsAndTypesData & other)
{
for (const auto & [path, types] : other.data)
data[path].insert(types.begin(), types.end());
}
void serialize(WriteBuffer & buf) const
{
writeVarUInt(data.size(), buf);
for (const auto & [path, types] : data)
{
writeStringBinary(path, buf);
writeVarUInt(types.size(), buf);
for (const auto & type : types)
writeStringBinary(type, buf);
}
}
void deserialize(ReadBuffer & buf)
{
size_t paths_size, types_size;
readVarUInt(paths_size, buf);
data.reserve(paths_size);
String path, type;
for (size_t i = 0; i != paths_size; ++i)
{
readStringBinary(path, buf);
readVarUInt(types_size, buf);
data[path].reserve(types_size);
for (size_t j = 0; j != types_size; ++j)
{
readStringBinary(type, buf);
data[path].insert(type);
}
}
}
void insertResultInto(IColumn & column)
{
/// Insert sorted paths and types for better output.
auto & array_column = assert_cast<ColumnMap &>(column).getNestedColumn();
auto & tuple_column = assert_cast<ColumnTuple &>(array_column.getData());
auto & key_column = assert_cast<ColumnString &>(tuple_column.getColumn(0));
auto & value_column = assert_cast<ColumnArray &>(tuple_column.getColumn(1));
auto & value_column_data = assert_cast<ColumnString &>(value_column.getData());
std::vector<std::pair<String, std::vector<String>>> sorted_data;
sorted_data.reserve(data.size());
for (const auto & [path, types] : data)
{
std::vector<String> sorted_types(types.begin(), types.end());
std::sort(sorted_types.begin(), sorted_types.end());
sorted_data.emplace_back(path, std::move(sorted_types));
}
std::sort(sorted_data.begin(), sorted_data.end());
for (const auto & [path, types] : sorted_data)
{
key_column.insertData(path.data(), path.size());
for (const auto & type : types)
value_column_data.insertData(type.data(), type.size());
value_column.getOffsets().push_back(value_column_data.size());
}
array_column.getOffsets().push_back(key_column.size());
}
static DataTypePtr getResultType()
{
return std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()));
}
};
/// Calculates the list of distinct data types in Dynamic column.
template <typename Data>
class AggregateFunctionDistinctJSONPathsAndTypes final : public IAggregateFunctionDataHelper<Data, AggregateFunctionDistinctJSONPathsAndTypes<Data>>
{
public:
explicit AggregateFunctionDistinctJSONPathsAndTypes(const DataTypes & argument_types_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionDistinctJSONPathsAndTypes<Data>>(
argument_types_, {}, Data::getResultType())
{
const auto & typed_paths_types = assert_cast<const DataTypeObject &>(*argument_types_[0]).getTypedPaths();
typed_paths_type_names.reserve(typed_paths_types.size());
for (const auto & [path, type] : typed_paths_types)
typed_paths_type_names[path] = type->getName();
}
String getName() const override { return Data::name; }
bool allocatesMemoryInArena() const override { return false; }
void ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
const auto & object_column = assert_cast<const ColumnObject & >(*columns[0]);
this->data(place).add(object_column, row_num, typed_paths_type_names);
}
void ALWAYS_INLINE addBatchSinglePlace(
size_t row_begin, size_t row_end, AggregateDataPtr __restrict place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos)
const override
{
if (if_argument_pos >= 0 || row_begin != 0 || row_end != columns[0]->size())
IAggregateFunctionDataHelper<Data, AggregateFunctionDistinctJSONPathsAndTypes<Data>>::addBatchSinglePlace(row_begin, row_end, place, columns, arena, if_argument_pos);
/// Optimization for case when we add all rows from the column into single place.
/// In this case we can avoid iterating over all rows because we can get all paths
/// and types in JSON column in a more efficient way.
else
this->data(place).addWholeColumn(assert_cast<const ColumnObject & >(*columns[0]), typed_paths_type_names);
}
void addManyDefaults(
AggregateDataPtr __restrict /*place*/,
const IColumn ** /*columns*/,
size_t /*length*/,
Arena * /*arena*/) const override
{
/// Default value for JSON is empty object, so nothing to add.
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
this->data(place).serialize(buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
{
this->data(place).deserialize(buf);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
this->data(place).insertResultInto(to);
}
private:
std::unordered_map<String, String> typed_paths_type_names;
};
template <typename Data>
AggregateFunctionPtr createAggregateFunctionDistinctJSONPathsAndTypes(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
if (argument_types.size() != 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Incorrect number of arguments for aggregate function {}. Expected single argument with type JSON, got {} arguments", name, argument_types.size());
if (!isObject(argument_types[0]))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}. Expected type JSON", argument_types[0]->getName(), name);
return std::make_shared<AggregateFunctionDistinctJSONPathsAndTypes<Data>>(argument_types);
}
void registerAggregateFunctionDistinctJSONPathsAndTypes(AggregateFunctionFactory & factory)
{
factory.registerFunction("distinctJSONPaths", createAggregateFunctionDistinctJSONPathsAndTypes<AggregateFunctionDistinctJSONPathsData>);
factory.registerFunction("distinctJSONPathsAndTypes", createAggregateFunctionDistinctJSONPathsAndTypes<AggregateFunctionDistinctJSONPathsAndTypesData>);
}
}

View File

@ -89,6 +89,8 @@ void registerAggregateFunctionAnalysisOfVariance(AggregateFunctionFactory &);
void registerAggregateFunctionFlameGraph(AggregateFunctionFactory &);
void registerAggregateFunctionKolmogorovSmirnovTest(AggregateFunctionFactory & factory);
void registerAggregateFunctionLargestTriangleThreeBuckets(AggregateFunctionFactory & factory);
void registerAggregateFunctionDistinctDynamicTypes(AggregateFunctionFactory & factory);
void registerAggregateFunctionDistinctJSONPathsAndTypes(AggregateFunctionFactory & factory);
class AggregateFunctionCombinatorFactory;
void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &);
@ -191,6 +193,8 @@ void registerAggregateFunctions()
registerAggregateFunctionFlameGraph(factory);
registerAggregateFunctionKolmogorovSmirnovTest(factory);
registerAggregateFunctionLargestTriangleThreeBuckets(factory);
registerAggregateFunctionDistinctDynamicTypes(factory);
registerAggregateFunctionDistinctJSONPathsAndTypes(factory);
registerWindowFunctions(factory);
}

View File

@ -980,6 +980,41 @@ ColumnPtr ColumnDynamic::compress() const
});
}
String ColumnDynamic::getTypeNameAt(size_t row_num) const
{
const auto & variant_col = getVariantColumn();
size_t discr = variant_col.globalDiscriminatorAt(row_num);
if (discr == ColumnVariant::NULL_DISCRIMINATOR)
return "";
if (discr == getSharedVariantDiscriminator())
{
const auto value = getSharedVariant().getDataAt(variant_col.offsetAt(row_num));
ReadBufferFromMemory buf(value.data, value.size);
return decodeDataType(buf)->getName();
}
return variant_info.variant_names[discr];
}
void ColumnDynamic::getAllTypeNames(std::unordered_set<String> & names) const
{
auto shared_variant_discr = getSharedVariantDiscriminator();
for (size_t i = 0; i != variant_info.variant_names.size(); ++i)
{
if (i != shared_variant_discr && !variant_column_ptr->getVariantByGlobalDiscriminator(i).empty())
names.insert(variant_info.variant_names[i]);
}
const auto & shared_variant = getSharedVariant();
for (size_t i = 0; i != shared_variant.size(); ++i)
{
const auto value = shared_variant.getDataAt(i);
ReadBufferFromMemory buf(value.data, value.size);
names.insert(decodeDataType(buf)->getName());
}
}
void ColumnDynamic::prepareForSquashing(const Columns & source_columns)
{
if (source_columns.empty())

View File

@ -430,6 +430,9 @@ public:
const SerializationPtr & getVariantSerialization(const DataTypePtr & variant_type) const { return getVariantSerialization(variant_type, variant_type->getName()); }
String getTypeNameAt(size_t row_num) const;
void getAllTypeNames(std::unordered_set<String> & names) const;
private:
void createVariantInfo(const DataTypePtr & variant_type);

View File

@ -0,0 +1,121 @@
a0
a1
a10
a11
a12
a2
a3
a4
a5
a6
a7
a8
a9
('a0',['Array(Nullable(Int64))','Date','Int64','String'])
('a1',['String'])
('a10',['Array(Nullable(Int64))','Date','Int64','String'])
('a11',['Array(Nullable(Int64))','Date','Int64','String'])
('a12',['Array(Nullable(Int64))','Date','Int64','String'])
('a2',['Array(Nullable(Int64))','Date','Int64','String'])
('a3',['Array(Nullable(Int64))','Date','Int64','String'])
('a4',['Array(Nullable(Int64))','Date','Int64','String'])
('a5',['Array(Nullable(Int64))','Date','Int64','String'])
('a6',['Array(Nullable(Int64))','Date','Int64','String'])
('a7',['Array(Nullable(Int64))','Date','Int64','String'])
('a8',['Array(Nullable(Int64))','Date','Int64','String'])
('a9',['Array(Nullable(Int64))','Date','Int64','String'])
Array(Nullable(Int64))
Date
Int64
String
Array(Nullable(Int64))
Date
Int64
String
Filter
a1
a2
('a1',['String'])
('a2',['String'])
String
If
a1
a2
('a1',['String'])
('a2',['String'])
String
Group by
Array(Nullable(Int64)) ['a1','a2']
Date ['a1','a2']
Int64 ['a1','a2']
None ['a0','a1','a10','a11','a12','a3','a4','a5','a6','a7','a8','a9']
String ['a1','a2']
Array(Nullable(Int64)) {'a1':['String'],'a2':['Array(Nullable(Int64))']}
Date {'a1':['String'],'a2':['Date']}
Int64 {'a1':['String'],'a2':['Int64']}
None {'a0':['Array(Nullable(Int64))','Date','Int64','String'],'a1':['String'],'a10':['Array(Nullable(Int64))','Date','Int64','String'],'a11':['Array(Nullable(Int64))','Date','Int64','String'],'a12':['Array(Nullable(Int64))','Date','Int64','String'],'a3':['Array(Nullable(Int64))','Date','Int64','String'],'a4':['Array(Nullable(Int64))','Date','Int64','String'],'a5':['Array(Nullable(Int64))','Date','Int64','String'],'a6':['Array(Nullable(Int64))','Date','Int64','String'],'a7':['Array(Nullable(Int64))','Date','Int64','String'],'a8':['Array(Nullable(Int64))','Date','Int64','String'],'a9':['Array(Nullable(Int64))','Date','Int64','String']}
String {'a1':['String'],'a2':['String']}
Array(Nullable(Int64)) ['Array(Nullable(Int64))']
Date ['Date']
Int64 ['Int64']
None []
String ['String']
Remote
a0
a1
a10
a11
a12
a2
a3
a4
a5
a6
a7
a8
a9
('a0',['Array(Nullable(Int64))','Date','Int64','String'])
('a1',['String'])
('a10',['Array(Nullable(Int64))','Date','Int64','String'])
('a11',['Array(Nullable(Int64))','Date','Int64','String'])
('a12',['Array(Nullable(Int64))','Date','Int64','String'])
('a2',['Array(Nullable(Int64))','Date','Int64','String'])
('a3',['Array(Nullable(Int64))','Date','Int64','String'])
('a4',['Array(Nullable(Int64))','Date','Int64','String'])
('a5',['Array(Nullable(Int64))','Date','Int64','String'])
('a6',['Array(Nullable(Int64))','Date','Int64','String'])
('a7',['Array(Nullable(Int64))','Date','Int64','String'])
('a8',['Array(Nullable(Int64))','Date','Int64','String'])
('a9',['Array(Nullable(Int64))','Date','Int64','String'])
Array(Nullable(Int64))
Date
Int64
String
Remote filter
a1
a2
('a1',['String'])
('a2',['String'])
String
Remote if
a1
a2
('a1',['String'])
('a2',['String'])
String
Remote group by
Array(Nullable(Int64)) ['a1','a2']
Date ['a1','a2']
Int64 ['a1','a2']
None ['a0','a1','a10','a11','a12','a3','a4','a5','a6','a7','a8','a9']
String ['a1','a2']
Array(Nullable(Int64)) {'a1':['String'],'a2':['Array(Nullable(Int64))']}
Date {'a1':['String'],'a2':['Date']}
Int64 {'a1':['String'],'a2':['Int64']}
None {'a0':['Array(Nullable(Int64))','Date','Int64','String'],'a1':['String'],'a10':['Array(Nullable(Int64))','Date','Int64','String'],'a11':['Array(Nullable(Int64))','Date','Int64','String'],'a12':['Array(Nullable(Int64))','Date','Int64','String'],'a3':['Array(Nullable(Int64))','Date','Int64','String'],'a4':['Array(Nullable(Int64))','Date','Int64','String'],'a5':['Array(Nullable(Int64))','Date','Int64','String'],'a6':['Array(Nullable(Int64))','Date','Int64','String'],'a7':['Array(Nullable(Int64))','Date','Int64','String'],'a8':['Array(Nullable(Int64))','Date','Int64','String'],'a9':['Array(Nullable(Int64))','Date','Int64','String']}
String {'a1':['String'],'a2':['String']}
Array(Nullable(Int64)) ['Array(Nullable(Int64))']
Date ['Date']
Int64 ['Int64']
None []
String ['String']

View File

@ -0,0 +1,50 @@
set allow_experimental_dynamic_type = 1;
set allow_experimental_json_type = 1;
set allow_experimental_variant_type = 1;
set use_variant_as_common_type = 1;
drop table if exists test_json_dynamic_aggregate_functions;
create table test_json_dynamic_aggregate_functions (json JSON(a1 String, max_dynamic_paths=2, max_dynamic_types=2)) engine=Memory;
insert into test_json_dynamic_aggregate_functions select toJSONString(map('a' || number % 13, multiIf(number % 5 == 0, NULL, number % 5 == 1, number::UInt32, number % 5 == 2, 'str_' || number, number % 5 == 3, range(number % 5), toDate(number)))) from numbers(200000);
select arrayJoin(distinctJSONPaths(json)) from test_json_dynamic_aggregate_functions;
select arrayJoin(distinctJSONPathsAndTypes(json)) from test_json_dynamic_aggregate_functions;
select arrayJoin(distinctDynamicTypes(json.a2)) from test_json_dynamic_aggregate_functions;
select arrayJoin(distinctDynamicTypes(json.a3)) from test_json_dynamic_aggregate_functions;
select arrayJoin(distinctDynamicTypes(json.a42)) from test_json_dynamic_aggregate_functions;
select 'Filter';
select arrayJoin(distinctJSONPaths(json)) from test_json_dynamic_aggregate_functions where dynamicType(json.a2) == 'String';
select arrayJoin(distinctJSONPathsAndTypes(json)) from test_json_dynamic_aggregate_functions where dynamicType(json.a2) == 'String';
select arrayJoin(distinctDynamicTypes(json.a2)) from test_json_dynamic_aggregate_functions where dynamicType(json.a2) == 'String';
select 'If';
select arrayJoin(distinctJSONPathsIf(json, dynamicType(json.a2) == 'String')) from test_json_dynamic_aggregate_functions;
select arrayJoin(distinctJSONPathsAndTypesIf(json, dynamicType(json.a2) == 'String')) from test_json_dynamic_aggregate_functions;
select arrayJoin(distinctDynamicTypesIf(json.a2, dynamicType(json.a2) == 'String')) from test_json_dynamic_aggregate_functions;
select 'Group by';
select dynamicType(json.a2), distinctJSONPaths(json) from test_json_dynamic_aggregate_functions group by dynamicType(json.a2) order by dynamicType(json.a2);
select dynamicType(json.a2), distinctJSONPathsAndTypes(json) from test_json_dynamic_aggregate_functions group by dynamicType(json.a2) order by dynamicType(json.a2);
select dynamicType(json.a2), distinctDynamicTypes(json.a2) from test_json_dynamic_aggregate_functions group by dynamicType(json.a2) order by dynamicType(json.a2);
select 'Remote';
select arrayJoin(distinctJSONPaths(json)) from remote('127.0.0.{1,2,3}', currentDatabase(), test_json_dynamic_aggregate_functions);
select arrayJoin(distinctJSONPathsAndTypes(json)) from remote('127.0.0.{1,2,3}', currentDatabase(), test_json_dynamic_aggregate_functions);
select arrayJoin(distinctDynamicTypes(json.a2)) from remote('127.0.0.{1,2,3}', currentDatabase(), test_json_dynamic_aggregate_functions);
select 'Remote filter';
select arrayJoin(distinctJSONPaths(json)) from remote('127.0.0.{1,2,3}', currentDatabase(), test_json_dynamic_aggregate_functions) where dynamicType(json.a2) == 'String';
select arrayJoin(distinctJSONPathsAndTypes(json)) from remote('127.0.0.{1,2,3}', currentDatabase(), test_json_dynamic_aggregate_functions) where dynamicType(json.a2) == 'String';
select arrayJoin(distinctDynamicTypes(json.a2)) from remote('127.0.0.{1,2,3}', currentDatabase(), test_json_dynamic_aggregate_functions) where dynamicType(json.a2) == 'String';
select 'Remote if';
select arrayJoin(distinctJSONPathsIf(json, dynamicType(json.a2) == 'String')) from remote('127.0.0.{1,2,3}', currentDatabase(), test_json_dynamic_aggregate_functions);
select arrayJoin(distinctJSONPathsAndTypesIf(json, dynamicType(json.a2) == 'String')) from remote('127.0.0.{1,2,3}', currentDatabase(), test_json_dynamic_aggregate_functions);
select arrayJoin(distinctDynamicTypesIf(json.a2, dynamicType(json.a2) == 'String')) from remote('127.0.0.{1,2,3}', currentDatabase(), test_json_dynamic_aggregate_functions);
select 'Remote group by';
select dynamicType(json.a2), distinctJSONPaths(json) from remote('127.0.0.{1,2,3}', currentDatabase(), test_json_dynamic_aggregate_functions) group by dynamicType(json.a2) order by dynamicType(json.a2);
select dynamicType(json.a2), distinctJSONPathsAndTypes(json) from remote('127.0.0.{1,2,3}', currentDatabase(), test_json_dynamic_aggregate_functions) group by dynamicType(json.a2) order by dynamicType(json.a2);
select dynamicType(json.a2), distinctDynamicTypes(json.a2) from remote('127.0.0.{1,2,3}', currentDatabase(), test_json_dynamic_aggregate_functions) group by dynamicType(json.a2) order by dynamicType(json.a2);
drop table test_json_dynamic_aggregate_functions;