mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Compare commits
23 commits: dcff06d2c3 ... 12c4c63e70
Commit SHAs:
12c4c63e70, 44b4bd38b9, 40c7d5fd1a, 4ccebd9a24, 99177c0daf, 2b39f6c019, 151c881e43, 7f6dcb854b, 0951991c1d, 19aec5e572, a367de9977, 6894e280b2, 39ebe113d9, 239bbaa133, 07fac5808d, ed95e0781f, 014608fb6b, a29ded4941, d2efae7511, 93464f52f4, 6879aa130a, 43f3c886a2, c383a743f7
@@ -165,3 +165,10 @@ template <typename... Args>
 constexpr void UNUSED(Args &&... args [[maybe_unused]]) // NOLINT(cppcoreguidelines-missing-std-forward)
 {
 }
+
+#define DB_CONCATENATE_IMPL(s1, s2) s1##s2
+#define DB_CONCATENATE(s1, s2) DB_CONCATENATE_IMPL(s1, s2)
+
+#define DB_ANONYMOUS_VARIABLE(str) \
+    DB_CONCATENATE(DB_CONCATENATE(DB_CONCATENATE(str, __COUNTER__), _), __LINE__)
+
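For readers unfamiliar with the token-pasting idiom above: `DB_ANONYMOUS_VARIABLE` pastes `__COUNTER__` and `__LINE__` onto a prefix so that every expansion yields a distinct identifier. A minimal standalone sketch of the same idea (illustration only, not ClickHouse code):

```cpp
#include <cstdio>

#define DB_CONCATENATE_IMPL(s1, s2) s1##s2
#define DB_CONCATENATE(s1, s2) DB_CONCATENATE_IMPL(s1, s2)
#define DB_ANONYMOUS_VARIABLE(str) \
    DB_CONCATENATE(DB_CONCATENATE(DB_CONCATENATE(str, __COUNTER__), _), __LINE__)

int main()
{
    /// Each expansion becomes a distinct identifier such as tmp0_12 and tmp1_13,
    /// so two "anonymous" variables in the same scope never collide.
    [[maybe_unused]] int DB_ANONYMOUS_VARIABLE(tmp) = 1;
    [[maybe_unused]] int DB_ANONYMOUS_VARIABLE(tmp) = 2;
    std::puts("two anonymous variables declared without a name clash");
    return 0;
}
```

This is what lets the `SIMDJSON_ASSIGN_OR_THROW` macro introduced later in this changeset declare a temporary result without risking shadowing.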
@@ -49,4 +49,4 @@ LIMIT 2
 **See Also**
 
 - [DeltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
+- [DeltaLake cluster table function](/docs/en/sql-reference/table-functions/deltalakeCluster.md)
docs/en/sql-reference/table-functions/deltalakeCluster.md (new file, 30 lines)
@@ -0,0 +1,30 @@
+---
+slug: /en/sql-reference/table-functions/deltalakeCluster
+sidebar_position: 46
+sidebar_label: deltaLakeCluster
+title: "deltaLakeCluster Table Function"
+---
+This is an extension to the [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.
+
+Allows processing files from [Delta Lake](https://github.com/delta-io/delta) tables in Amazon S3 in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
+
+**Syntax**
+
+``` sql
+deltaLakeCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
+```
+
+**Arguments**
+
+- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
+
+- Description of all other arguments coincides with description of arguments in equivalent [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.
+
+**Returned value**
+
+A table with the specified structure for reading data from cluster in the specified Delta Lake table in S3.
+
+**See Also**
+
+- [deltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
+- [deltaLake table function](/docs/en/sql-reference/table-functions/deltalake.md)
@@ -29,4 +29,4 @@ A table with the specified structure for reading data in the specified Hudi tabl
 **See Also**
 
 - [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
+- [Hudi cluster table function](/docs/en/sql-reference/table-functions/hudiCluster.md)
docs/en/sql-reference/table-functions/hudiCluster.md (new file, 30 lines)
@@ -0,0 +1,30 @@
+---
+slug: /en/sql-reference/table-functions/hudiCluster
+sidebar_position: 86
+sidebar_label: hudiCluster
+title: "hudiCluster Table Function"
+---
+This is an extension to the [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.
+
+Allows processing files from Apache [Hudi](https://hudi.apache.org/) tables in Amazon S3 in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
+
+**Syntax**
+
+``` sql
+hudiCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
+```
+
+**Arguments**
+
+- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
+
+- Description of all other arguments coincides with description of arguments in equivalent [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.
+
+**Returned value**
+
+A table with the specified structure for reading data from cluster in the specified Hudi table in S3.
+
+**See Also**
+
+- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
+- [Hudi table function](/docs/en/sql-reference/table-functions/hudi.md)
@@ -72,3 +72,4 @@ Table function `iceberg` is an alias to `icebergS3` now.
 **See Also**
 
 - [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
+- [Iceberg cluster table function](/docs/en/sql-reference/table-functions/icebergCluster.md)
docs/en/sql-reference/table-functions/icebergCluster.md (new file, 43 lines)
@@ -0,0 +1,43 @@
+---
+slug: /en/sql-reference/table-functions/icebergCluster
+sidebar_position: 91
+sidebar_label: icebergCluster
+title: "icebergCluster Table Function"
+---
+This is an extension to the [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.
+
+Allows processing files from Apache [Iceberg](https://iceberg.apache.org/) in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
+
+**Syntax**
+
+``` sql
+icebergS3Cluster(cluster_name, url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method])
+icebergS3Cluster(cluster_name, named_collection[, option=value [,..]])
+
+icebergAzureCluster(cluster_name, connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method])
+icebergAzureCluster(cluster_name, named_collection[, option=value [,..]])
+
+icebergHDFSCluster(cluster_name, path_to_table, [,format] [,compression_method])
+icebergHDFSCluster(cluster_name, named_collection[, option=value [,..]])
+```
+
+**Arguments**
+
+- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
+
+- Description of all other arguments coincides with description of arguments in equivalent [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.
+
+**Returned value**
+
+A table with the specified structure for reading data from cluster in the specified Iceberg table.
+
+**Examples**
+
+```sql
+SELECT * FROM icebergS3Cluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
+```
+
+**See Also**
+
+- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
+- [Iceberg table function](/docs/en/sql-reference/table-functions/iceberg.md)
@@ -613,6 +613,7 @@
     M(733, TABLE_IS_BEING_RESTARTED) \
     M(734, CANNOT_WRITE_AFTER_BUFFER_CANCELED) \
     M(735, QUERY_WAS_CANCELLED_BY_CLIENT) \
+    M(736, JSON_PARSE_ERROR) \
     \
     M(900, DISTRIBUTED_CACHE_ERROR) \
     M(901, CANNOT_USE_DISTRIBUTED_CACHE) \
@@ -87,7 +87,7 @@ struct DummyJSONParser
         static Iterator end() { return {}; }
         static size_t size() { return 0; }
         bool find(std::string_view, Element &) const { return false; } /// NOLINT
+        bool reset() { return true; }
 #if 0
         /// Optional: Provides access to an object's element by index.
         KeyValuePair operator[](size_t) const { return {}; }
@@ -18,8 +18,19 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int CANNOT_ALLOCATE_MEMORY;
+    extern const int JSON_PARSE_ERROR;
 }
 
+#define SIMDJSON_ASSIGN_OR_THROW_IMPL(_result, _lhs, _rexpr) \
+    auto && _result = (_rexpr); \
+    if (_result.error() != ::simdjson::SUCCESS) \
+        throw DB::ErrnoException(ErrorCodes::JSON_PARSE_ERROR, "simdjson error: {}", std::string(::simdjson::error_message(_result.error()))); \
+    _lhs = std::move(_result).value_unsafe()
+
+#define SIMDJSON_ASSIGN_OR_THROW(_lhs, _rexpr) \
+    SIMDJSON_ASSIGN_OR_THROW_IMPL( \
+        DB_ANONYMOUS_VARIABLE(_simdjson_sesult), _lhs, _rexpr)
+
 /// Format elements of basic types into string.
 /// The original implementation is mini_formatter in simdjson.h. But it is not public API, so we
 /// add a implementation here.
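The macro pair above implements an assign-or-throw pattern for `simdjson` results: evaluate the expression into an anonymous temporary, throw on error, otherwise move the value into the target. A rough standalone sketch of the same pattern against plain simdjson (simplified macro name and a generic exception; this is not the ClickHouse code):

```cpp
#include <simdjson.h>
#include <stdexcept>
#include <string>

/// Simplified stand-in for SIMDJSON_ASSIGN_OR_THROW: unwrap a simdjson result or throw.
#define ASSIGN_OR_THROW(lhs, rexpr) \
    do \
    { \
        auto && res_ = (rexpr); \
        if (res_.error() != ::simdjson::SUCCESS) \
            throw std::runtime_error(std::string("simdjson error: ") + ::simdjson::error_message(res_.error())); \
        lhs = std::move(res_).value_unsafe(); \
    } while (false)

int main()
{
    simdjson::ondemand::parser parser;
    simdjson::padded_string json(std::string(R"({"k": 1})"));

    simdjson::ondemand::document doc;
    ASSIGN_OR_THROW(doc, parser.iterate(json));   /// throws on malformed input instead of returning an error code

    int64_t k = 0;
    ASSIGN_OR_THROW(k, doc["k"].get_int64());
    return k == 1 ? 0 : 1;
}
```

The production macro names the temporary with `DB_ANONYMOUS_VARIABLE` so that nested or repeated expansions cannot shadow one another.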
@@ -264,13 +275,93 @@ public:
         format.key(kv.key);
         append(kv.value);
     }
+
+    void append(simdjson::ondemand::value value)
+    {
+        switch (value.type())
+        {
+            case simdjson::ondemand::json_type::array:
+                append(value.get_array());
+                break;
+            case simdjson::ondemand::json_type::object:
+                append(value.get_object());
+                break;
+            case simdjson::ondemand::json_type::number:
+            {
+                simdjson::ondemand::number_type nt{};
+                auto res = value.get_number_type().get(nt);
+                chassert(res == simdjson::SUCCESS);
+                switch (nt)
+                {
+                    case simdjson::ondemand::number_type::signed_integer:
+                        format.number(value.get_int64().value_unsafe());
+                        break;
+                    case simdjson::ondemand::number_type::unsigned_integer:
+                        format.number(value.get_uint64().value_unsafe());
+                        break;
+                    case simdjson::ondemand::number_type::floating_point_number:
+                        format.number(value.get_double().value_unsafe());
+                        break;
+                    case simdjson::ondemand::number_type::big_integer:
+                        format.string(value.get_string().value_unsafe());
+                        break;
+                }
+                break;
+            }
+            case simdjson::ondemand::json_type::string:
+                format.string(value.get_string().value_unsafe());
+                break;
+            case simdjson::ondemand::json_type::boolean:
+                if (value.get_bool().value_unsafe())
+                    format.trueAtom();
+                else
+                    format.falseAtom();
+                break;
+            case simdjson::ondemand::json_type::null:
+                format.nullAtom();
+                break;
+        }
+    }
+
+    void append(simdjson::ondemand::array array)
+    {
+        format.startArray();
+        int i = 0;
+        for (simdjson::ondemand::value value : array)
+        {
+            if (i++ != 0)
+                format.comma();
+            append(value);
+        }
+        format.endArray();
+    }
+
+    void append(simdjson::ondemand::object object)
+    {
+        format.startObject();
+        int i = 0;
+        for (simdjson::ondemand::field field : object)
+        {
+            if (i++ != 0)
+                format.comma();
+            append(field);
+        }
+        format.endObject();
+    }
+
+    void append(simdjson::ondemand::field field)
+    {
+        format.key(field.unescaped_key());
+        append(field.value());
+    }
 private:
     SimdJSONBasicFormatter format;
 };
 
 /// This class can be used as an argument for the template class FunctionJSON.
 /// It provides ability to parse JSONs using simdjson library.
-struct SimdJSONParser
+struct DomSimdJSONParser
 {
     class Array;
     class Object;
@@ -419,16 +510,250 @@ private:
     simdjson::dom::parser parser;
 };
 
-inline ALWAYS_INLINE SimdJSONParser::Array SimdJSONParser::Element::getArray() const
+inline ALWAYS_INLINE DomSimdJSONParser::Array DomSimdJSONParser::Element::getArray() const
 {
     return element.get_array().value_unsafe();
 }
 
-inline ALWAYS_INLINE SimdJSONParser::Object SimdJSONParser::Element::getObject() const
+inline ALWAYS_INLINE DomSimdJSONParser::Object DomSimdJSONParser::Element::getObject() const
 {
     return element.get_object().value_unsafe();
 }
 
+struct OnDemandSimdJSONParser
+{
+    class Array;
+    class Object;
+
+    /// References an element in a JSON document, representing a JSON null, boolean, string, number,
+    /// array or object.
+    class Element
+    {
+    public:
+        ALWAYS_INLINE Element() {} /// NOLINT
+        ALWAYS_INLINE Element(simdjson::ondemand::value && value_) { value = std::move(value_); }
+        ALWAYS_INLINE Element & operator=(const simdjson::ondemand::value & value_) { value = value_; return *this; }
+
+        ALWAYS_INLINE ElementType type() const
+        {
+            if (value.type() == simdjson::ondemand::json_type::object)
+                return ElementType::OBJECT;
+            if (value.type() == simdjson::ondemand::json_type::array)
+                return ElementType::ARRAY;
+            if (value.type() == simdjson::ondemand::json_type::boolean)
+                return ElementType::BOOL;
+            if (value.type() == simdjson::ondemand::json_type::string)
+                return ElementType::STRING;
+            if (value.type() == simdjson::ondemand::json_type::number)
+            {
+                auto res = value.get_number_type();
+                if (res.error())
+                    return ElementType::NULL_VALUE;
+                if (res.value() == simdjson::ondemand::number_type::signed_integer)
+                    return ElementType::INT64;
+                if (res.value() == simdjson::ondemand::number_type::unsigned_integer)
+                    return ElementType::UINT64;
+                if (res.value() == simdjson::ondemand::number_type::floating_point_number)
+                    return ElementType::DOUBLE;
+            }
+            return ElementType::NULL_VALUE;
+        }
+
+        ALWAYS_INLINE bool isInt64() const { auto res = value.get_number_type(); return !res.error() && res.value() == simdjson::ondemand::number_type::signed_integer; }
+        ALWAYS_INLINE bool isUInt64() const { auto res = value.get_number_type(); return !res.error() && res.value() == simdjson::ondemand::number_type::unsigned_integer; }
+        ALWAYS_INLINE bool isDouble() const { auto res = value.get_number_type(); return !res.error() && res.value() == simdjson::ondemand::number_type::floating_point_number; }
+        ALWAYS_INLINE bool isString() const { auto r = value.type(); return !r.error() && r.value() == simdjson::ondemand::json_type::string; }
+        ALWAYS_INLINE bool isArray() const
+        {
+            auto r = value.type();
+            return !r.error() && r.value() == simdjson::ondemand::json_type::array;
+        }
+        ALWAYS_INLINE bool isObject() const
+        {
+            auto r = value.type();
+            return !r.error() && r.value() == simdjson::ondemand::json_type::object;
+        }
+        ALWAYS_INLINE bool isBool() const { return value.type() == simdjson::ondemand::json_type::boolean; }
+        ALWAYS_INLINE bool isNull() const { return value.type() == simdjson::ondemand::json_type::null; }
+
+        ALWAYS_INLINE Int64 getInt64() const { return value.get_int64().value(); }
+        ALWAYS_INLINE UInt64 getUInt64() const { return value.get_uint64().value(); }
+        ALWAYS_INLINE double getDouble() const { return value.get_double().value(); }
+        ALWAYS_INLINE bool getBool() const { return value.get_bool().value(); }
+        ALWAYS_INLINE std::string_view getString() const
+        {
+            auto r = value.get_string();
+            if (r.error())
+                return {};
+            return r.value();
+        }
+        ALWAYS_INLINE Array getArray() const
+        {
+            SIMDJSON_ASSIGN_OR_THROW(auto arr, value.get_array());
+            return arr;
+        }
+        ALWAYS_INLINE Object getObject() const
+        {
+            SIMDJSON_ASSIGN_OR_THROW(auto obj, value.get_object());
+            return obj;
+        }
+
+        ALWAYS_INLINE simdjson::ondemand::value getElement() const { return value; }
+
+    private:
+        mutable simdjson::ondemand::value value;
+    };
+
+    /// References an array in a JSON document.
+    class Array
+    {
+    public:
+        class Iterator
+        {
+        public:
+            Iterator() = default;
+            ALWAYS_INLINE Iterator(const simdjson::ondemand::array_iterator & it_) : it(it_) {} /// NOLINT
+            ALWAYS_INLINE Element operator*() const { return (*it).value(); }
+            ALWAYS_INLINE Iterator & operator++() { ++it; return *this; }
+            ALWAYS_INLINE friend bool operator!=(const Iterator & left, const Iterator & right) { return left.it != right.it; }
+            ALWAYS_INLINE friend bool operator==(const Iterator & left, const Iterator & right) { return !(left != right); }
+        private:
+            mutable simdjson::ondemand::array_iterator it;
+        };
+
+        ALWAYS_INLINE Array(const simdjson::ondemand::array & array_) : array(array_) {} /// NOLINT
+        ALWAYS_INLINE Iterator begin() const { return array.begin().value(); }
+        ALWAYS_INLINE Iterator end() const { return array.end().value(); }
+        ALWAYS_INLINE size_t size() const { return array.count_elements().value(); }
+        ALWAYS_INLINE Element operator[](size_t index) const
+        {
+            if (index < last_index)
+                array.reset();
+            if (last_index == 0)
+            {
+                SIMDJSON_ASSIGN_OR_THROW(auto iter, array.begin());
+                it = iter;
+            }
+            size_t diff = index - last_index;
+            while (diff--)
+                ++it;
+            last_index = index;
+            SIMDJSON_ASSIGN_OR_THROW(auto ele, *it);
+            return ele;
+        }
+
+    private:
+        mutable size_t last_index{};
+        mutable simdjson::ondemand::array_iterator it;
+        mutable simdjson::ondemand::array array;
+    };
+
+    using KeyValuePair = std::pair<std::string_view, Element>;
+
+    /// References an object in a JSON document.
+    class Object
+    {
+    public:
+        class Iterator
+        {
+        public:
+            ALWAYS_INLINE Iterator(const simdjson::ondemand::object_iterator & it_) : it(it_) {} /// NOLINT
+
+            ALWAYS_INLINE KeyValuePair operator*() const
+            {
+                auto field_wrapper = *it;
+                if (field_wrapper.error())
+                {
+                    return {};
+                }
+                std::string_view key;
+                auto key_error = field_wrapper.unescaped_key().get(key);
+                if (key_error)
+                {
+                    return {};
+                }
+                ::simdjson::ondemand::value v = field_wrapper.value();
+                return {key, Element(std::move(v))};
+            }
+
+            ALWAYS_INLINE Iterator & operator++() { ++it; return *this; }
+            ALWAYS_INLINE friend bool operator!=(const Iterator & left, const Iterator & right) { return left.it != right.it; }
+            ALWAYS_INLINE friend bool operator==(const Iterator & left, const Iterator & right) { return !(left != right); }
+        private:
+            mutable simdjson::ondemand::object_iterator it;
+        };
+
+        ALWAYS_INLINE Object(const simdjson::ondemand::object & object_) : object(object_) {} /// NOLINT
+        ALWAYS_INLINE Iterator begin() const { return object.begin().value(); }
+        ALWAYS_INLINE Iterator end() const { return object.end().value(); }
+        ///NOTE: call size() before iterate
+        ALWAYS_INLINE size_t size() const
+        {
+            return object.count_fields().value();
+        }
+
+        bool find(std::string_view key, Element & result) const
+        {
+            auto x = object.find_field_unordered(key);
+            if (x.error())
+                return false;
+
+            result = x.value_unsafe();
+            return true;
+        }
+
+        bool reset()
+        {
+            auto v = object.reset();
+            if (v.error())
+                return false;
+            return true;
+        }
+
+        /// Optional: Provides access to an object's element by index.
+        KeyValuePair operator[](size_t index) const
+        {
+            SIMDJSON_ASSIGN_OR_THROW(auto it, object.begin());
+            while (index--)
+            {
+                (void)*(it); /// NEED TO DO THIS TO ITERATE
+                ++it;
+            }
+            SIMDJSON_ASSIGN_OR_THROW(auto field, *it);
+            std::string_view key = field.unescaped_key().value();
+            simdjson::ondemand::value value = field.value();
+            return {key, Element(std::move(value))};
+        }
+
+    private:
+        mutable simdjson::ondemand::object object;
+    };
+
+    /// Parses a JSON document, returns the reference to its root element if succeeded.
+    bool parse(std::string_view json, Element & result)
+    {
+        padstr = json;
+        auto res = parser.iterate(padstr);
+        if (res.error())
+            return false;
+
+        document = std::move(res.value());
+        auto v = document.get_value();
+        if (v.error())
+            return false;
+
+        result = v.value();
+        return true;
+    }
+
+private:
+    simdjson::ondemand::parser parser;
+    simdjson::ondemand::document document{};
+    simdjson::padded_string padstr;
+};
+
+using SimdJSONParser = OnDemandSimdJSONParser;
+
 }
 
 #endif
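For orientation, the on-demand API that `OnDemandSimdJSONParser` wraps parses lazily and is forward-only, which is why the wrapper caches array sizes and rewinds with `reset()` in `Array::operator[]`. A standalone sketch of that underlying behaviour (plain simdjson with made-up sample data, not the ClickHouse classes):

```cpp
#include <simdjson.h>
#include <cstdio>
#include <string>

int main()
{
    simdjson::ondemand::parser parser;
    simdjson::padded_string json(std::string(R"({"name": "ClickHouse", "tags": [1, 2, 3]})"));

    auto res = parser.iterate(json);
    if (res.error())
        return 1;
    simdjson::ondemand::document doc = std::move(res.value());

    /// Fields are materialised lazily and must be visited in document order.
    std::string_view name = doc["name"].get_string().value();

    /// Arrays are forward-only: iterating consumes the input, which is why the
    /// wrapper's Array::operator[] rewinds with reset() when an index smaller
    /// than the last one is requested.
    int64_t sum = 0;
    simdjson::ondemand::array tags = doc["tags"].get_array().value();
    for (simdjson::ondemand::value v : tags)
        sum += v.get_int64().value();

    std::printf("%.*s: tag sum = %lld\n", static_cast<int>(name.size()), name.data(), static_cast<long long>(sum));
    return 0;
}
```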
@@ -1211,9 +1211,9 @@ public:
         auto array = element.getArray();
         auto it = array.begin();
 
-        for (size_t index = 0; (index != nested.size()) && (it != array.end()); ++index)
+        for (size_t index = 0; (index != nested.size()) && (it != array.end()); ++index, ++it)
         {
-            if (nested[index]->insertResultToColumn(tuple.getColumn(index), *it++, insert_settings, format_settings, error))
+            if (nested[index]->insertResultToColumn(tuple.getColumn(index), *it, insert_settings, format_settings, error))
             {
                 were_valid_elements = true;
             }
@@ -1239,9 +1239,9 @@ public:
         if (name_to_index_map.empty())
         {
             auto it = object.begin();
-            for (size_t index = 0; (index != nested.size()) && (it != object.end()); ++index)
+            for (size_t index = 0; (index != nested.size()) && (it != object.end()); ++index, ++it)
             {
-                if (nested[index]->insertResultToColumn(tuple.getColumn(index), (*it++).second, insert_settings, format_settings, error))
+                if (nested[index]->insertResultToColumn(tuple.getColumn(index), (*it).second, insert_settings, format_settings, error))
                 {
                     were_valid_elements = true;
                 }
@@ -1316,18 +1316,18 @@ public:
             error = fmt::format("cannot read Map value from JSON element: {}", jsonElementToString<JSONParser>(element, format_settings));
             return false;
         }
 
         auto & map_col = assert_cast<ColumnMap &>(column);
         auto & offsets = map_col.getNestedColumn().getOffsets();
         auto & tuple_col = map_col.getNestedData();
         auto & key_col = tuple_col.getColumn(0);
         auto & value_col = tuple_col.getColumn(1);
         size_t old_size = tuple_col.size();
 
         auto object = element.getObject();
         auto it = object.begin();
+        size_t object_size = 0;
         for (; it != object.end(); ++it)
         {
+            ++object_size;
             auto pair = *it;
 
             /// Insert key
@@ -1350,7 +1350,7 @@ public:
             }
         }
 
-        offsets.push_back(old_size + object.size());
+        offsets.push_back(old_size + object_size);
         return true;
     }
 
@@ -102,8 +102,12 @@ public:
     }
 
     /// serialize the json element into column's buffer directly
-    void addElement(const Element & element)
+    void addElement(const Element & element, const std::string sep = ", ")
     {
+        if (is_first)
+            is_first = false;
+        else
+            addRawData(sep.data(), sep.size());
         formatter.append(element.getElement());
     }
     void commit()
@@ -121,7 +125,7 @@ private:
     IColumn::Offsets & offsets;
     Formatter formatter;
     size_t prev_offset;
+    bool is_first{true};
 };
 
 
@@ -364,7 +368,6 @@ public:
                 /// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6),
                 /// however this functionality is not implemented yet
             }
-            current_element = root;
         }
 
         if (status == VisitorStatus::Exhausted)
@@ -416,19 +419,14 @@ public:
         bool success = false;
         const char * array_begin = "[";
         const char * array_end = "]";
-        const char * comma = ", ";
         JSONStringSerializer json_serializer(col_str);
         json_serializer.addRawData(array_begin, 1);
-        while ((status = generator_json_path.getNextItem(current_element)) != VisitorStatus::Exhausted)
+        std::function<void(const Element&)> result_func= [&json_serializer](const Element & element) { json_serializer.addElement(element); };
+        while ((status = generator_json_path.getNextItemBatch(current_element, result_func)) != VisitorStatus::Exhausted)
         {
             if (status == VisitorStatus::Ok)
             {
-                if (success)
-                {
-                    json_serializer.addRawData(comma, 2);
-                }
                 success = true;
-                json_serializer.addElement(current_element);
             }
             else if (status == VisitorStatus::Error)
             {
@@ -436,7 +434,6 @@ public:
                 /// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6),
                 /// however this functionality is not implemented yet
             }
-            current_element = root;
         }
         if (!success)
         {
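The `addElement` change above moves separator handling into the serializer, guarded by an `is_first` flag, so the JSONPath result loop no longer needs its own comma bookkeeping. A tiny self-contained sketch of that pattern (the `JoiningWriter` class is hypothetical, not the ClickHouse serializer):

```cpp
#include <iostream>
#include <string>
#include <vector>

/// Minimal stand-in for the serializer change: the element writer owns the
/// separator, so callers no longer have to track "is this the first item?".
class JoiningWriter
{
public:
    void addElement(const std::string & element, const std::string & sep = ", ")
    {
        if (is_first)
            is_first = false;
        else
            out += sep;
        out += element;
    }
    const std::string & str() const { return out; }

private:
    std::string out;
    bool is_first = true;
};

int main()
{
    JoiningWriter w;
    for (const std::string & s : std::vector<std::string>{"1", "2", "3"})
        w.addElement(s);
    std::cout << "[" << w.str() << "]\n";   /// prints [1, 2, 3]
    return 0;
}
```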
@@ -273,12 +273,13 @@ private:
         if (element.isArray())
         {
             auto array = element.getArray();
+            size_t array_size = array.size();
             if (index >= 0)
                 --index;
             else
-                index += array.size();
+                index += array_size;
 
-            if (static_cast<size_t>(index) >= array.size())
+            if (static_cast<size_t>(index) >= array_size)
                 return false;
             element = array[index];
             out_key = {};
@@ -290,12 +291,13 @@ private:
         if (element.isObject())
         {
             auto object = element.getObject();
+            size_t object_size = object.size();
             if (index >= 0)
                 --index;
             else
-                index += object.size();
+                index += object_size;
 
-            if (static_cast<size_t>(index) >= object.size())
+            if (static_cast<size_t>(index) >= object_size)
                 return false;
             std::tie(out_key, element) = object[index];
             return true;
@@ -985,10 +987,13 @@ public:
         auto array = element.getArray();
         ColumnArray & col_res = assert_cast<ColumnArray &>(dest);
 
+        size_t size = 0;
         for (auto value : array)
+        {
+            ++size;
             JSONExtractRawImpl<JSONParser>::insertResultToColumn(col_res.getData(), value, {}, format_settings, error);
+        }
-        col_res.getOffsets().push_back(col_res.getOffsets().back() + array.size());
+        col_res.getOffsets().push_back(col_res.getOffsets().back() + size);
         return true;
     }
 };
@@ -1021,13 +1026,15 @@ public:
         auto & col_key = assert_cast<ColumnString &>(col_tuple.getColumn(0));
         auto & col_value = assert_cast<ColumnString &>(col_tuple.getColumn(1));
 
+        size_t size = 0;
         for (const auto & [key, value] : object)
         {
             col_key.insertData(key.data(), key.size());
             JSONExtractRawImpl<JSONParser>::insertResultToColumn(col_value, value, {}, format_settings, error);
+            ++size;
         }
 
-        col_arr.getOffsets().push_back(col_arr.getOffsets().back() + object.size());
+        col_arr.getOffsets().push_back(col_arr.getOffsets().back() + size);
         return true;
     }
 };
@@ -1055,12 +1062,14 @@ public:
         ColumnArray & col_res = assert_cast<ColumnArray &>(dest);
         auto & col_key = assert_cast<ColumnString &>(col_res.getData());
 
+        size_t count = 0;
         for (const auto & [key, value] : object)
        {
+            ++count;
             col_key.insertData(key.data(), key.size());
         }
 
-        col_res.getOffsets().push_back(col_res.getOffsets().back() + object.size());
+        col_res.getOffsets().push_back(col_res.getOffsets().back() + count);
         return true;
     }
 };
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <utility>
 #include <Functions/JSONPath/Generator/IGenerator.h>
 #include <Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h>
 #include <Functions/JSONPath/Generator/VisitorJSONPathRange.h>
@@ -21,6 +22,7 @@ template <typename JSONParser>
 class GeneratorJSONPath : public IGenerator<JSONParser>
 {
 public:
+    using TElement = typename JSONParser::Element;
     /**
      * Traverses children ASTs of ASTJSONPathQuery and creates a vector of corresponding visitors
      * @param query_ptr_ pointer to ASTJSONPathQuery
@@ -80,11 +82,6 @@ public:
             return VisitorStatus::Exhausted;
         }
 
-        for (int i = 0; i < current_visitor; ++i)
-        {
-            visitors[i]->apply(current);
-        }
-
         VisitorStatus status = VisitorStatus::Error;
         for (size_t i = current_visitor; i < visitors.size(); ++i)
         {
@@ -105,6 +102,31 @@ public:
         }
     }
 
+    VisitorStatus getNextItemBatch(TElement & element, std::function<void(const TElement &)> & res_func) override
+    {
+        while (true)
+        {
+            /// element passed to us actually is root, so here we assign current to root
+            auto current = element;
+            if (current_visitor < 0)
+                return VisitorStatus::Exhausted;
+
+            VisitorStatus status = VisitorStatus::Error;
+            size_t visitor_size = visitors.size();
+            for (size_t i = current_visitor; i < visitor_size; ++i)
+            {
+                status = visitors[i]->visitBatch(current, res_func, i == visitor_size - 1);
+                current_visitor = static_cast<int>(i);
+                if (status == VisitorStatus::Error || status == VisitorStatus::Ignore)
+                    break;
+            }
+            updateVisitorsForNextRun();
+
+            if (status != VisitorStatus::Ignore)
+                return status;
+        }
+    }
+
     void reinitialize()
     {
         while (current_visitor >= 0)
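The new `getNextItemBatch` keeps the visitor chain of `getNextItem`, but allows the last visitor to push every match it can produce into a caller-supplied callback in a single call instead of re-running the chain once per result. A stripped-down sketch of that control flow (hypothetical types; the real code dispatches through `visitBatch` and the full `VisitorStatus` enum):

```cpp
#include <functional>
#include <iostream>
#include <vector>

/// Simplified two-state status, standing in for the real VisitorStatus.
enum class VisitorStatus { Ok, Exhausted };

class BatchGenerator
{
public:
    explicit BatchGenerator(std::vector<int> items_) : items(std::move(items_)) {}

    /// Emit the whole batch of matches through res_func in one call,
    /// then report Exhausted on the next call.
    VisitorStatus getNextItemBatch(std::function<void(const int &)> & res_func)
    {
        if (exhausted)
            return VisitorStatus::Exhausted;
        for (const int & item : items)
            res_func(item);
        exhausted = true;
        return VisitorStatus::Ok;
    }

private:
    std::vector<int> items;
    bool exhausted = false;
};

int main()
{
    BatchGenerator gen({1, 2, 3});
    std::function<void(const int &)> sink = [](const int & x) { std::cout << x << ' '; };
    while (gen.getNextItemBatch(sink) != VisitorStatus::Exhausted) {}
    std::cout << '\n';   /// prints: 1 2 3
    return 0;
}
```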
@@ -10,6 +10,8 @@ template <typename JSONParser>
 class IGenerator
 {
 public:
+    using TElement = typename JSONParser::Element;
+
     IGenerator() = default;
 
     virtual const char * getName() const = 0;
@@ -20,7 +22,9 @@ public:
     * @param element to be extracted into
     * @return true if generator is not exhausted
     */
-    virtual VisitorStatus getNextItem(typename JSONParser::Element & element) = 0;
+    virtual VisitorStatus getNextItem(TElement & element) = 0;
+
+    virtual VisitorStatus getNextItemBatch(TElement & element, std::function<void(const TElement &)> & res_func) = 0;
 
     virtual ~IGenerator() = default;
 };
@@ -8,13 +8,20 @@ template <typename JSONParser>
 class IVisitor
 {
 public:
+    using TElement = typename JSONParser::Element;
     virtual const char * getName() const = 0;
 
     /**
     * Applies this visitor to document and mutates its state
    * @param element simdjson element
     */
-    virtual VisitorStatus visit(typename JSONParser::Element & element) = 0;
+    virtual VisitorStatus visit(TElement & element) = 0;
+
+    /**
+    * Applies this visitor to document and mutates its state, returning a batch of results
+    * @param element simdjson element
+    */
+    virtual VisitorStatus visitBatch(TElement &element, std::function<void(const TElement &)> & res_func, bool can_reduce) = 0;
+
     /**
     * Applies this visitor to document, but does not mutate state
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <optional>
 #include <Functions/JSONPath/ASTs/ASTJSONPathMemberAccess.h>
 #include <Functions/JSONPath/Generator/IVisitor.h>
 #include <Functions/JSONPath/Generator/VisitorStatus.h>
@@ -10,6 +11,7 @@ template <typename JSONParser>
 class VisitorJSONPathMemberAccess : public IVisitor<JSONParser>
 {
 public:
+    using TElement = JSONParser::Element;
     explicit VisitorJSONPathMemberAccess(ASTPtr member_access_ptr_)
         : member_access_ptr(member_access_ptr_->as<ASTJSONPathMemberAccess>()) { }
 
@@ -18,25 +20,42 @@ public:
     VisitorStatus apply(typename JSONParser::Element & element) const override
     {
         typename JSONParser::Element result;
-        element.getObject().find(std::string_view(member_access_ptr->member_name), result);
-        element = result;
+        auto obj = element.getObject();
+        if (!obj.find(std::string_view(member_access_ptr->member_name), result))
+        {
+            return VisitorStatus::Error;
+        }
+        element = std::move(result);
         return VisitorStatus::Ok;
     }
 
     VisitorStatus visit(typename JSONParser::Element & element) override
     {
+        if (this->isExhausted())
+        {
+            return VisitorStatus::Exhausted;
+        }
         this->setExhausted(true);
         if (!element.isObject())
         {
             return VisitorStatus::Error;
         }
-        typename JSONParser::Element result;
-        if (!element.getObject().find(std::string_view(member_access_ptr->member_name), result))
-        {
-            return VisitorStatus::Error;
-        }
-        apply(element);
-        return VisitorStatus::Ok;
+        return apply(element);
+    }
+
+    VisitorStatus visitBatch(TElement & element, std::function<void(const TElement &)> & res_func, bool can_reduce) override
+    {
+        if (this->isExhausted())
+            return VisitorStatus::Exhausted;
+        this->setExhausted(true);
+        if (!element.isObject())
+            return VisitorStatus::Error;
+
+        auto status = apply(element);
+        if (status == VisitorStatus::Ok && can_reduce)
+            res_func(element);
+
+        return status;
     }
 
     void reinitialize() override { this->setExhausted(false); }
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <optional>
 #include <Functions/JSONPath/ASTs/ASTJSONPathRange.h>
 #include <Functions/JSONPath/Generator/IVisitor.h>
 #include <Functions/JSONPath/Generator/VisitorStatus.h>
@@ -20,21 +21,78 @@ public:
 
     VisitorStatus apply(typename JSONParser::Element & element) const override
     {
-        typename JSONParser::Array array = element.getArray();
-        element = array[current_index];
+        element = (*array)[current_index];
         return VisitorStatus::Ok;
     }
+    using TElement = JSONParser::Element;
+    VisitorStatus visitBatch(TElement & element, std::function<void(const TElement &)> & res_func, bool can_reduce) override
+    {
+        if (!array && !element.isArray())
+        {
+            this->setExhausted(true);
+            return VisitorStatus::Error;
+        }
+
+        VisitorStatus status = VisitorStatus::Ok;
+        if (!array)
+        {
+            current_element = element;
+            array = current_element.getArray();
+            if (!can_reduce)
+                array_size = array.value().size();
+        }
+
+        if (can_reduce)
+        {
+            std::set<size_t> index_set{};
+            for (auto range: range_ptr->ranges)
+                for (size_t i = range.first ; i < range.second; ++i)
+                    index_set.insert(i);
+
+            size_t idx = 0;
+            for (auto item: array.value())
+                if (index_set.find(idx++) != index_set.end())
+                    res_func(item);
+
+            this->setExhausted(true);
+            status = VisitorStatus::Ok;
+        }
+        else
+        {
+            if (current_index < array_size.value())
+            {
+                apply(element);
+                status = VisitorStatus::Ok;
+            }
+            else
+                status = VisitorStatus::Ignore;
+
+            if (current_index + 1 == range_ptr->ranges[current_range].second
+                && current_range + 1 == range_ptr->ranges.size())
+            {
+                this->setExhausted(true);
+            }
+        }
+
+        return status;
+    }
 
     VisitorStatus visit(typename JSONParser::Element & element) override
     {
-        if (!element.isArray())
+        if (!array && !element.isArray())
         {
             this->setExhausted(true);
             return VisitorStatus::Error;
         }
 
         VisitorStatus status;
-        if (current_index < element.getArray().size())
+        if (!array)
+        {
+            current_element = element;
+            array = current_element.getArray();
+            array_size = array.value().size();
+        }
+        if (current_index < array_size.value())
         {
             apply(element);
             status = VisitorStatus::Ok;
@@ -58,6 +116,8 @@ public:
         current_range = 0;
         current_index = range_ptr->ranges[current_range].first;
         this->setExhausted(false);
+        array_size.reset();
+        array.reset();
     }
 
     void updateState() override
@@ -74,6 +134,9 @@ private:
     ASTJSONPathRange * range_ptr;
     size_t current_range;
     UInt32 current_index;
+    std::optional<size_t> array_size{};
+    std::optional<typename JSONParser::Array> array{};
+    typename JSONParser::Element current_element{};
 };
 
 }
@@ -10,6 +10,7 @@ template <typename JSONParser>
 class VisitorJSONPathRoot : public IVisitor<JSONParser>
 {
 public:
+    using TElement = JSONParser::Element;
     explicit VisitorJSONPathRoot(ASTPtr) { }
 
     const char * getName() const override { return "VisitorJSONPathRoot"; }
@@ -27,9 +28,19 @@ public:
         return VisitorStatus::Ok;
     }
 
+    VisitorStatus visitBatch(TElement & element, std::function<void(const TElement &)> & res_func, bool can_reduce) override
+    {
+        apply(element);
+        this->setExhausted(true);
+        if (can_reduce)
+            res_func(element);
+        return VisitorStatus::Ok;
+    }
+
     void reinitialize() override { this->setExhausted(false); }
 
     void updateState() override { }
 
 };
 
 }
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <optional>
 #include <Functions/JSONPath/ASTs/ASTJSONPathStar.h>
 #include <Functions/JSONPath/Generator/IVisitor.h>
 #include <Functions/JSONPath/Generator/VisitorStatus.h>
@@ -19,21 +20,25 @@ public:
 
     VisitorStatus apply(typename JSONParser::Element & element) const override
     {
-        typename JSONParser::Array array = element.getArray();
-        element = array[current_index];
+        element = array.value()[current_index];
         return VisitorStatus::Ok;
     }
 
     VisitorStatus visit(typename JSONParser::Element & element) override
     {
-        if (!element.isArray())
+        if (!array && !element.isArray())
        {
             this->setExhausted(true);
             return VisitorStatus::Error;
         }
 
+        if (!array_size)
+        {
+            array = element.getArray();
+            array_size = array.value().size();
+        }
         VisitorStatus status;
-        if (current_index < element.getArray().size())
+        if (current_index < array_size.value())
         {
             apply(element);
             status = VisitorStatus::Ok;
@@ -46,11 +51,53 @@ public:
 
         return status;
     }
+    using TElement = JSONParser::Element;
+    VisitorStatus visitBatch(TElement & element, std::function<void(const TElement &)> & res_func, bool can_reduce) override
+    {
+        if (!array && !element.isArray())
+        {
+            this->setExhausted(true);
+            return VisitorStatus::Error;
+        }
+
+        if (!array)
+        {
+            array = element.getArray();
+            array_size = array.value().size();
+        }
+        VisitorStatus status = VisitorStatus::Ok;
+
+        if (can_reduce)
+        {
+            for (auto item: array.value())
+                res_func(item);
+
+            this->setExhausted(true);
+        }
+        else
+        {
+            if (current_index < array_size.value())
+            {
+                apply(element);
+                status = VisitorStatus::Ok;
+            }
+            else
+            {
+                status = VisitorStatus::Ignore;
+                this->setExhausted(true);
+            }
+        }
+
+        return status;
+    }
+
     void reinitialize() override
     {
         current_index = 0;
         this->setExhausted(false);
+        array_size.reset();
+        array.reset();
     }
 
     void updateState() override
@@ -59,7 +106,9 @@ public:
     }
 
 private:
-    UInt32 current_index;
+    UInt32 current_index{};
+    std::optional<typename JSONParser::Array> array{};
+    std::optional<size_t> array_size{};
 };
 
 }
@@ -62,16 +62,17 @@ public:
         for (size_t i = 0; i < num_rows; ++i)
         {
             auto array_size = col_num->getInt(i);
+            auto element_size = col_value->byteSizeAt(i);
 
             if (unlikely(array_size < 0))
                 throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} cannot be negative: while executing function {}", array_size, getName());
 
             Int64 estimated_size = 0;
-            if (unlikely(common::mulOverflow(array_size, col_value->byteSize(), estimated_size)))
-                throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, col_value->byteSize(), getName());
+            if (unlikely(common::mulOverflow(array_size, element_size, estimated_size)))
+                throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, element_size, getName());
 
             if (unlikely(estimated_size > max_array_size_in_columns_bytes))
-                throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, col_value->byteSize(), getName());
+                throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, element_size, getName());
 
             offset += array_size;
 
@@ -226,6 +226,26 @@ template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConf
 #endif
 template class TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;
 
+#if USE_AVRO && USE_AWS_S3
+template class TableFunctionObjectStorage<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
+#endif
+
+#if USE_AVRO && USE_AZURE_BLOB_STORAGE
+template class TableFunctionObjectStorage<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
+#endif
+
+#if USE_AVRO && USE_HDFS
+template class TableFunctionObjectStorage<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
+#endif
+
+#if USE_PARQUET && USE_AWS_S3
+template class TableFunctionObjectStorage<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
+#endif
+
+#if USE_AWS_S3
+template class TableFunctionObjectStorage<HudiClusterDefinition, StorageS3HudiConfiguration>;
+#endif
+
 #if USE_AVRO
 void registerTableFunctionIceberg(TableFunctionFactory & factory)
 {
@@ -96,7 +96,7 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
     {
         .documentation = {
             .description=R"(The table function can be used to read the data stored on HDFS in parallel for many nodes in a specified cluster.)",
-            .examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster_name, uri, format)", ""}}},
+            .examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster, uri, format)", ""}}},
         .allow_readonly = false
     }
     );
@@ -105,15 +105,77 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
     UNUSED(factory);
 }
 
+#if USE_AVRO
+void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
+{
+    UNUSED(factory);
+
 #if USE_AWS_S3
-template class TableFunctionObjectStorageCluster<S3ClusterDefinition, StorageS3Configuration>;
+    factory.registerFunction<TableFunctionIcebergS3Cluster>(
+        {.documentation
+             = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)",
+                .examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])", ""}},
+                .categories{"DataLake"}},
+         .allow_readonly = false});
 #endif
 
 #if USE_AZURE_BLOB_STORAGE
-template class TableFunctionObjectStorageCluster<AzureClusterDefinition, StorageAzureConfiguration>;
+    factory.registerFunction<TableFunctionIcebergAzureCluster>(
+        {.documentation
+             = {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster.)",
+                .examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])", ""}},
+                .categories{"DataLake"}},
+         .allow_readonly = false});
 #endif
 
 #if USE_HDFS
-template class TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
+    factory.registerFunction<TableFunctionIcebergHDFSCluster>(
+        {.documentation
+             = {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster.)",
+                .examples{{"icebergHDFSCluster", "SELECT * FROM icebergHDFSCluster(cluster, uri, [format], [structure], [compression_method])", ""}},
+                .categories{"DataLake"}},
+         .allow_readonly = false});
 #endif
 }
+#endif
+
+#if USE_AWS_S3
+#if USE_PARQUET
+void registerTableFunctionDeltaLakeCluster(TableFunctionFactory & factory)
+{
+    factory.registerFunction<TableFunctionDeltaLakeCluster>(
+        {.documentation
+             = {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)",
+                .examples{{"deltaLakeCluster", "SELECT * FROM deltaLakeCluster(cluster, url, access_key_id, secret_access_key)", ""}},
+                .categories{"DataLake"}},
+         .allow_readonly = false});
+}
+#endif
+
+void registerTableFunctionHudiCluster(TableFunctionFactory & factory)
+{
+    factory.registerFunction<TableFunctionHudiCluster>(
+        {.documentation
+             = {.description = R"(The table function can be used to read the Hudi table stored on object store in parallel for many nodes in a specified cluster.)",
+                .examples{{"hudiCluster", "SELECT * FROM hudiCluster(cluster, url, access_key_id, secret_access_key)", ""}},
+                .categories{"DataLake"}},
+         .allow_readonly = false});
+}
+#endif
+
+void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory)
+{
+    UNUSED(factory);
+#if USE_AVRO
+    registerTableFunctionIcebergCluster(factory);
+#endif
+#if USE_AWS_S3
+#if USE_PARQUET
+    registerTableFunctionDeltaLakeCluster(factory);
+#endif
+    registerTableFunctionHudiCluster(factory);
+#endif
+}
+
+}

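The registrations above expose the new cluster table functions under the names shown in their example strings. A sketch of how they are invoked; the cluster name, URL, and credentials below are placeholders rather than values taken from this change:

```sql
-- Delta Lake data on S3, read in parallel by every node of the named cluster.
SELECT count()
FROM deltaLakeCluster('cluster_simple', 'https://example-bucket.s3.amazonaws.com/table_path/', 'access_key_id', 'secret_access_key');

-- Hudi and the Iceberg variants follow the same pattern with their own function names.
SELECT *
FROM hudiCluster('cluster_simple', 'https://example-bucket.s3.amazonaws.com/table_path/', 'access_key_id', 'secret_access_key')
LIMIT 10;
```
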
@@ -33,6 +33,36 @@ struct HDFSClusterDefinition
     static constexpr auto storage_type_name = "HDFSCluster";
 };
 
+struct IcebergS3ClusterDefinition
+{
+    static constexpr auto name = "icebergS3Cluster";
+    static constexpr auto storage_type_name = "IcebergS3Cluster";
+};
+
+struct IcebergAzureClusterDefinition
+{
+    static constexpr auto name = "icebergAzureCluster";
+    static constexpr auto storage_type_name = "IcebergAzureCluster";
+};
+
+struct IcebergHDFSClusterDefinition
+{
+    static constexpr auto name = "icebergHDFSCluster";
+    static constexpr auto storage_type_name = "IcebergHDFSCluster";
+};
+
+struct DeltaLakeClusterDefinition
+{
+    static constexpr auto name = "deltaLakeCluster";
+    static constexpr auto storage_type_name = "DeltaLakeS3Cluster";
+};
+
+struct HudiClusterDefinition
+{
+    static constexpr auto name = "hudiCluster";
+    static constexpr auto storage_type_name = "HudiS3Cluster";
+};
+
 /**
  * Class implementing s3/hdfs/azureBlobStorageCluster(...) table functions,
  * which allow to process many files from S3/HDFS/Azure blob storage on a specific cluster.
@@ -79,4 +109,25 @@ using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster<AzureClu
 #if USE_HDFS
 using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
 #endif
+
+#if USE_AVRO && USE_AWS_S3
+using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
+#endif
+
+#if USE_AVRO && USE_AZURE_BLOB_STORAGE
+using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
+#endif
+
+#if USE_AVRO && USE_HDFS
+using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
+#endif
+
+#if USE_AWS_S3 && USE_PARQUET
+using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
+#endif
+
+#if USE_AWS_S3
+using TableFunctionHudiCluster = TableFunctionObjectStorageCluster<HudiClusterDefinition, StorageS3HudiConfiguration>;
+#endif
+
 }

@@ -66,6 +66,7 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]]
     registerTableFunctionObjectStorage(factory);
     registerTableFunctionObjectStorageCluster(factory);
     registerDataLakeTableFunctions(factory);
+    registerDataLakeClusterTableFunctions(factory);
 }
 
 }

@@ -70,6 +70,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
 void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
 void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
 void registerDataLakeTableFunctions(TableFunctionFactory & factory);
+void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);
 
 void registerTableFunctionTimeSeries(TableFunctionFactory & factory);
 

@@ -0,0 +1,20 @@
+<clickhouse>
+    <remote_servers>
+        <cluster_simple>
+            <shard>
+                <replica>
+                    <host>node1</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>node2</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>node3</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </cluster_simple>
+    </remote_servers>
+</clickhouse>

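The config above defines a single-shard, three-replica cluster named `cluster_simple` spanning the test nodes. A quick sanity check that can be run on any node once the cluster is up (a sketch, not part of the test files):

```sql
SELECT cluster, shard_num, replica_num, host_name, port
FROM system.clusters
WHERE cluster = 'cluster_simple';
```
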
@@ -0,0 +1,6 @@
+<clickhouse>
+    <query_log>
+        <database>system</database>
+        <table>query_log</table>
+    </query_log>
+</clickhouse>

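With the query log enabled, the integration test can later assert that each replica executed a secondary (non-initial) query. A sketch of that kind of check; the function name in the filter is only an example:

```sql
SYSTEM FLUSH LOGS;

SELECT query, is_initial_query
FROM system.query_log
WHERE type = 'QueryStart'
  AND positionCaseInsensitive(query, 'icebergS3Cluster') != 0
  AND NOT is_initial_query;
```
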
@@ -73,14 +73,38 @@ def started_cluster():
         cluster.add_instance(
             "node1",
             main_configs=[
+                "configs/config.d/query_log.xml",
+                "configs/config.d/cluster.xml",
                 "configs/config.d/named_collections.xml",
                 "configs/config.d/filesystem_caches.xml",
             ],
             user_configs=["configs/users.d/users.xml"],
             with_minio=True,
             with_azurite=True,
-            stay_alive=True,
             with_hdfs=with_hdfs,
+            stay_alive=True,
+        )
+        cluster.add_instance(
+            "node2",
+            main_configs=[
+                "configs/config.d/query_log.xml",
+                "configs/config.d/cluster.xml",
+                "configs/config.d/named_collections.xml",
+                "configs/config.d/filesystem_caches.xml",
+            ],
+            user_configs=["configs/users.d/users.xml"],
+            stay_alive=True,
+        )
+        cluster.add_instance(
+            "node3",
+            main_configs=[
+                "configs/config.d/query_log.xml",
+                "configs/config.d/cluster.xml",
+                "configs/config.d/named_collections.xml",
+                "configs/config.d/filesystem_caches.xml",
+            ],
+            user_configs=["configs/users.d/users.xml"],
+            stay_alive=True,
         )
 
         logging.info("Starting cluster...")

@@ -182,6 +206,7 @@ def get_creation_expression(
     cluster,
     format="Parquet",
     table_function=False,
+    run_on_cluster=False,
     **kwargs,
 ):
     if storage_type == "s3":

@@ -189,35 +214,56 @@ def get_creation_expression(
             bucket = kwargs["bucket"]
         else:
             bucket = cluster.minio_bucket
-        print(bucket)
-        if table_function:
-            return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
+
+        if run_on_cluster:
+            assert table_function
+            return f"icebergS3Cluster('cluster_simple', s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
         else:
-            return f"""
-            DROP TABLE IF EXISTS {table_name};
-            CREATE TABLE {table_name}
-            ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
+            if table_function:
+                return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
+            else:
+                return f"""
+                DROP TABLE IF EXISTS {table_name};
+                CREATE TABLE {table_name}
+                ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
 
     elif storage_type == "azure":
-        if table_function:
+        if run_on_cluster:
+            assert table_function
             return f"""
-            icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
+            icebergAzureCluster('cluster_simple', azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
             """
         else:
-            return f"""
-            DROP TABLE IF EXISTS {table_name};
-            CREATE TABLE {table_name}
-            ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
+            if table_function:
+                return f"""
+                icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
+                """
+            else:
+                return f"""
+                DROP TABLE IF EXISTS {table_name};
+                CREATE TABLE {table_name}
+                ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
 
     elif storage_type == "hdfs":
-        if table_function:
+        if run_on_cluster:
+            assert table_function
             return f"""
-            icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
+            icebergHDFSCluster('cluster_simple', hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
             """
         else:
-            return f"""
-            DROP TABLE IF EXISTS {table_name};
-            CREATE TABLE {table_name}
-            ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
+            if table_function:
+                return f"""
+                icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
+                """
+            else:
+                return f"""
+                DROP TABLE IF EXISTS {table_name};
+                CREATE TABLE {table_name}
+                ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
 
     elif storage_type == "local":
+        assert not run_on_cluster
+
         if table_function:
             return f"""
             icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})
@@ -227,6 +273,7 @@ def get_creation_expression(
             DROP TABLE IF EXISTS {table_name};
             CREATE TABLE {table_name}
             ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});"""
 
     else:
         raise Exception(f"Unknown iceberg storage type: {storage_type}")
 

@@ -492,6 +539,108 @@ def test_types(started_cluster, format_version, storage_type):
     )
 
 
+@pytest.mark.parametrize("format_version", ["1", "2"])
+@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs"])
+def test_cluster_table_function(started_cluster, format_version, storage_type):
+    if is_arm() and storage_type == "hdfs":
+        pytest.skip("Disabled test IcebergHDFS for aarch64")
+
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+
+    TABLE_NAME = (
+        "test_iceberg_cluster_"
+        + format_version
+        + "_"
+        + storage_type
+        + "_"
+        + get_uuid_str()
+    )
+
+    def add_df(mode):
+        write_iceberg_from_df(
+            spark,
+            generate_data(spark, 0, 100),
+            TABLE_NAME,
+            mode=mode,
+            format_version=format_version,
+        )
+
+        files = default_upload_directory(
+            started_cluster,
+            storage_type,
+            f"/iceberg_data/default/{TABLE_NAME}/",
+            f"/iceberg_data/default/{TABLE_NAME}/",
+        )
+
+        logging.info(f"Adding another dataframe. result files: {files}")
+
+        return files
+
+    files = add_df(mode="overwrite")
+    for i in range(1, len(started_cluster.instances)):
+        files = add_df(mode="append")
+
+    logging.info(f"Setup complete. files: {files}")
+    assert len(files) == 5 + 4 * (len(started_cluster.instances) - 1)
+
+    clusters = instance.query(f"SELECT * FROM system.clusters")
+    logging.info(f"Clusters setup: {clusters}")
+
+    # Regular Query only node1
+    table_function_expr = get_creation_expression(
+        storage_type, TABLE_NAME, started_cluster, table_function=True
+    )
+    select_regular = (
+        instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
+    )
+
+    # Cluster Query with node1 as coordinator
+    table_function_expr_cluster = get_creation_expression(
+        storage_type,
+        TABLE_NAME,
+        started_cluster,
+        table_function=True,
+        run_on_cluster=True,
+    )
+    select_cluster = (
+        instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split()
+    )
+
+    # Simple size check
+    assert len(select_regular) == 600
+    assert len(select_cluster) == 600
+
+    # Actual check
+    assert select_cluster == select_regular
+
+    # Check query_log
+    for replica in started_cluster.instances.values():
+        replica.query("SYSTEM FLUSH LOGS")
+
+    for node_name, replica in started_cluster.instances.items():
+        cluster_secondary_queries = (
+            replica.query(
+                f"""
+                SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
+                WHERE
+                    type = 'QueryStart' AND
+                    positionCaseInsensitive(query, '{storage_type}Cluster') != 0 AND
+                    position(query, '{TABLE_NAME}') != 0 AND
+                    position(query, 'system.query_log') = 0 AND
+                    NOT is_initial_query
+                """
+            )
+            .strip()
+            .split("\n")
+        )
+
+        logging.info(
+            f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}"
+        )
+        assert len(cluster_secondary_queries) == 1
+
+
 @pytest.mark.parametrize("format_version", ["1", "2"])
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
 def test_delete_files(started_cluster, format_version, storage_type):

@@ -1,3 +1,6 @@
 SELECT arrayWithConstant(96142475, ['qMUF']); -- { serverError TOO_LARGE_ARRAY_SIZE }
 SELECT arrayWithConstant(100000000, materialize([[[[[[[[[['Hello, world!']]]]]]]]]])); -- { serverError TOO_LARGE_ARRAY_SIZE }
 SELECT length(arrayWithConstant(10000000, materialize([[[[[[[[[['Hello world']]]]]]]]]])));
+
+CREATE TEMPORARY TABLE args (value Array(Int)) ENGINE=Memory AS SELECT [1, 1, 1, 1] as value FROM numbers(1, 100);
+SELECT length(arrayWithConstant(1000000, value)) FROM args FORMAT NULL;

@@ -244,7 +244,10 @@ Deduplication
 DefaultTableEngine
 DelayedInserts
 DeliveryTag
+Deltalake
 DeltaLake
+deltalakeCluster
+deltaLakeCluster
 Denormalize
 DestroyAggregatesThreads
 DestroyAggregatesThreadsActive
@@ -377,10 +380,15 @@ Homebrew's
 HorizontalDivide
 Hostname
 HouseOps
+hudi
 Hudi
+hudiCluster
+HudiCluster
 HyperLogLog
 Hypot
 IANA
+icebergCluster
+IcebergCluster
 IDE
 IDEs
 IDNA