mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Compare commits
44 Commits
40d5478827
...
2ff295f5f6
Author | SHA1 | Date | |
---|---|---|---|
|
2ff295f5f6 | ||
|
e0f8b8d351 | ||
|
da2176d696 | ||
|
53e0036593 | ||
|
25bd73ea5e | ||
|
72d5af29e0 | ||
|
44b4bd38b9 | ||
|
4abbf29a86 | ||
|
fd5023e2a6 | ||
|
9a2a664b04 | ||
|
4ccebd9a24 | ||
|
99177c0daf | ||
|
599d977d42 | ||
|
9152fc9c67 | ||
|
7b4aabbb71 | ||
|
b34dfca0c5 | ||
|
ad67608956 | ||
|
0951991c1d | ||
|
19aec5e572 | ||
|
a367de9977 | ||
|
6894e280b2 | ||
|
4e2549bcf3 | ||
|
39ebe113d9 | ||
|
014608fb6b | ||
|
a29ded4941 | ||
|
d2efae7511 | ||
|
49589da56e | ||
|
32ff7d2722 | ||
|
0e14b49298 | ||
|
326c91c02b | ||
|
6c3003c6ce | ||
|
58edfbe113 | ||
|
6dfd4ad942 | ||
|
1e8ee2034a | ||
|
f8fbd0330c | ||
|
19854ff78a | ||
|
f8255fb4ae | ||
|
e565f5f1c7 | ||
|
ca3dfe5d8e | ||
|
1c3fcfa355 | ||
|
fb28c16453 | ||
|
9c6a75c1dd | ||
|
3827d90bb0 | ||
|
bf3a3ad607 |
@ -16,16 +16,18 @@ ClickHouse works 100-1000x faster than traditional database management systems,
|
||||
|
||||
For more information and documentation see https://clickhouse.com/.
|
||||
|
||||
<!-- This is not related to the docker official library, remove it before commit to https://github.com/docker-library/docs -->
|
||||
## Versions
|
||||
|
||||
- The `latest` tag points to the latest release of the latest stable branch.
|
||||
- Branch tags like `22.2` point to the latest release of the corresponding branch.
|
||||
- Full version tags like `22.2.3.5` point to the corresponding release.
|
||||
- Full version tags like `22.2.3` and `22.2.3.5` point to the corresponding release.
|
||||
<!-- docker-official-library:off -->
|
||||
<!-- This is not related to the docker official library, remove it before commit to https://github.com/docker-library/docs -->
|
||||
- The tag `head` is built from the latest commit to the default branch.
|
||||
- Each tag has optional `-alpine` suffix to reflect that it's built on top of `alpine`.
|
||||
|
||||
<!-- REMOVE UNTIL HERE -->
|
||||
<!-- docker-official-library:on -->
|
||||
### Compatibility
|
||||
|
||||
- The amd64 image requires support for [SSE3 instructions](https://en.wikipedia.org/wiki/SSE3). Virtually all x86 CPUs after 2005 support SSE3.
|
||||
|
@ -10,16 +10,18 @@ ClickHouse works 100-1000x faster than traditional database management systems,
|
||||
|
||||
For more information and documentation see https://clickhouse.com/.
|
||||
|
||||
<!-- This is not related to the docker official library, remove it before commit to https://github.com/docker-library/docs -->
|
||||
## Versions
|
||||
|
||||
- The `latest` tag points to the latest release of the latest stable branch.
|
||||
- Branch tags like `22.2` point to the latest release of the corresponding branch.
|
||||
- Full version tags like `22.2.3.5` point to the corresponding release.
|
||||
- Full version tags like `22.2.3` and `22.2.3.5` point to the corresponding release.
|
||||
<!-- docker-official-library:off -->
|
||||
<!-- This is not related to the docker official library, remove it before commit to https://github.com/docker-library/docs -->
|
||||
- The tag `head` is built from the latest commit to the default branch.
|
||||
- Each tag has optional `-alpine` suffix to reflect that it's built on top of `alpine`.
|
||||
|
||||
<!-- REMOVE UNTIL HERE -->
|
||||
<!-- docker-official-library:on -->
|
||||
### Compatibility
|
||||
|
||||
- The amd64 image requires support for [SSE3 instructions](https://en.wikipedia.org/wiki/SSE3). Virtually all x86 CPUs after 2005 support SSE3.
|
||||
|
@ -36,7 +36,7 @@ geomet==0.2.1.post1
|
||||
grpcio-tools==1.60.0
|
||||
grpcio==1.60.0
|
||||
gssapi==1.8.3
|
||||
httplib2==0.20.2
|
||||
httplib2==0.22.0
|
||||
idna==3.7
|
||||
importlib-metadata==4.6.4
|
||||
iniconfig==2.0.0
|
||||
@ -72,7 +72,7 @@ pyarrow==17.0.0
|
||||
pycparser==2.22
|
||||
pycryptodome==3.20.0
|
||||
pymongo==3.11.0
|
||||
pyparsing==2.4.7
|
||||
pyparsing==3.1.0
|
||||
pyspark==3.3.2
|
||||
pyspnego==0.10.2
|
||||
pytest-order==1.0.0
|
||||
@ -101,3 +101,4 @@ wadllib==1.3.6
|
||||
websocket-client==1.8.0
|
||||
wheel==0.38.1
|
||||
zipp==1.0.0
|
||||
pyiceberg==0.7.1
|
||||
|
@ -522,4 +522,3 @@ sidebar_label: 2024
|
||||
* Backported in [#68518](https://github.com/ClickHouse/ClickHouse/issues/68518): Minor update in Dynamic/JSON serializations. [#68459](https://github.com/ClickHouse/ClickHouse/pull/68459) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
* Backported in [#68558](https://github.com/ClickHouse/ClickHouse/issues/68558): CI: Minor release workflow fix. [#68536](https://github.com/ClickHouse/ClickHouse/pull/68536) ([Max K.](https://github.com/maxknv)).
|
||||
* Backported in [#68576](https://github.com/ClickHouse/ClickHouse/issues/68576): CI: Tidy build timeout from 2h to 3h. [#68567](https://github.com/ClickHouse/ClickHouse/pull/68567) ([Max K.](https://github.com/maxknv)).
|
||||
|
||||
|
@ -497,4 +497,3 @@ sidebar_label: 2024
|
||||
* Backported in [#69899](https://github.com/ClickHouse/ClickHouse/issues/69899): Revert "Merge pull request [#69032](https://github.com/ClickHouse/ClickHouse/issues/69032) from alexon1234/include_real_time_execution_in_http_header". [#69885](https://github.com/ClickHouse/ClickHouse/pull/69885) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
|
||||
* Backported in [#69931](https://github.com/ClickHouse/ClickHouse/issues/69931): RIPE is an acronym and thus should be capital. RIPE stands for **R**ACE **I**ntegrity **P**rimitives **E**valuation and RACE stands for **R**esearch and Development in **A**dvanced **C**ommunications **T**echnologies in **E**urope. [#69901](https://github.com/ClickHouse/ClickHouse/pull/69901) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov)).
|
||||
* Backported in [#70034](https://github.com/ClickHouse/ClickHouse/issues/70034): Revert "Add RIPEMD160 function". [#70005](https://github.com/ClickHouse/ClickHouse/pull/70005) ([Robert Schulze](https://github.com/rschu1ze)).
|
||||
|
||||
|
@ -49,4 +49,4 @@ LIMIT 2
|
||||
**See Also**
|
||||
|
||||
- [DeltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
|
||||
|
||||
- [DeltaLake cluster table function](/docs/en/sql-reference/table-functions/deltalakeCluster.md)
|
||||
|
30
docs/en/sql-reference/table-functions/deltalakeCluster.md
Normal file
30
docs/en/sql-reference/table-functions/deltalakeCluster.md
Normal file
@ -0,0 +1,30 @@
|
||||
---
|
||||
slug: /en/sql-reference/table-functions/deltalakeCluster
|
||||
sidebar_position: 46
|
||||
sidebar_label: deltaLakeCluster
|
||||
title: "deltaLakeCluster Table Function"
|
||||
---
|
||||
This is an extension to the [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.
|
||||
|
||||
Allows processing files from [Delta Lake](https://github.com/delta-io/delta) tables in Amazon S3 in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
deltaLakeCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
|
||||
|
||||
- Description of all other arguments coincides with description of arguments in equivalent [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.
|
||||
|
||||
**Returned value**
|
||||
|
||||
A table with the specified structure for reading data from cluster in the specified Delta Lake table in S3.
|
||||
|
||||
**See Also**
|
||||
|
||||
- [deltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
|
||||
- [deltaLake table function](/docs/en/sql-reference/table-functions/deltalake.md)
|
@ -29,4 +29,4 @@ A table with the specified structure for reading data in the specified Hudi tabl
|
||||
**See Also**
|
||||
|
||||
- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
|
||||
|
||||
- [Hudi cluster table function](/docs/en/sql-reference/table-functions/hudiCluster.md)
|
||||
|
30
docs/en/sql-reference/table-functions/hudiCluster.md
Normal file
30
docs/en/sql-reference/table-functions/hudiCluster.md
Normal file
@ -0,0 +1,30 @@
|
||||
---
|
||||
slug: /en/sql-reference/table-functions/hudiCluster
|
||||
sidebar_position: 86
|
||||
sidebar_label: hudiCluster
|
||||
title: "hudiCluster Table Function"
|
||||
---
|
||||
This is an extension to the [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.
|
||||
|
||||
Allows processing files from Apache [Hudi](https://hudi.apache.org/) tables in Amazon S3 in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
hudiCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
|
||||
|
||||
- Description of all other arguments coincides with description of arguments in equivalent [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.
|
||||
|
||||
**Returned value**
|
||||
|
||||
A table with the specified structure for reading data from cluster in the specified Hudi table in S3.
|
||||
|
||||
**See Also**
|
||||
|
||||
- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
|
||||
- [Hudi table function](/docs/en/sql-reference/table-functions/hudi.md)
|
@ -72,3 +72,4 @@ Table function `iceberg` is an alias to `icebergS3` now.
|
||||
**See Also**
|
||||
|
||||
- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
|
||||
- [Iceberg cluster table function](/docs/en/sql-reference/table-functions/icebergCluster.md)
|
||||
|
43
docs/en/sql-reference/table-functions/icebergCluster.md
Normal file
43
docs/en/sql-reference/table-functions/icebergCluster.md
Normal file
@ -0,0 +1,43 @@
|
||||
---
|
||||
slug: /en/sql-reference/table-functions/icebergCluster
|
||||
sidebar_position: 91
|
||||
sidebar_label: icebergCluster
|
||||
title: "icebergCluster Table Function"
|
||||
---
|
||||
This is an extension to the [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.
|
||||
|
||||
Allows processing files from Apache [Iceberg](https://iceberg.apache.org/) in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
icebergS3Cluster(cluster_name, url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method])
|
||||
icebergS3Cluster(cluster_name, named_collection[, option=value [,..]])
|
||||
|
||||
icebergAzureCluster(cluster_name, connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method])
|
||||
icebergAzureCluster(cluster_name, named_collection[, option=value [,..]])
|
||||
|
||||
icebergHDFSCluster(cluster_name, path_to_table, [,format] [,compression_method])
|
||||
icebergHDFSCluster(cluster_name, named_collection[, option=value [,..]])
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
|
||||
|
||||
- Description of all other arguments coincides with description of arguments in equivalent [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.
|
||||
|
||||
**Returned value**
|
||||
|
||||
A table with the specified structure for reading data from cluster in the specified Iceberg table.
|
||||
|
||||
**Examples**
|
||||
|
||||
```sql
|
||||
SELECT * FROM icebergS3Cluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
|
||||
```
|
||||
|
||||
**See Also**
|
||||
|
||||
- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
|
||||
- [Iceberg table function](/docs/en/sql-reference/table-functions/iceberg.md)
|
@ -476,7 +476,7 @@
|
||||
<input id="edit" type="button" value="✎" style="display: none;">
|
||||
<input id="add" type="button" value="Add chart" style="display: none;">
|
||||
<input id="reload" type="button" value="Reload">
|
||||
<span id="search-span" class="nowrap" style="display: none;"><input id="search" type="button" value="🔎" title="Run query to obtain list of charts from ClickHouse"><input id="search-query" name="search" type="text" spellcheck="false"></span>
|
||||
<span id="search-span" class="nowrap" style="display: none;"><input id="search" type="button" value="🔎" title="Run query to obtain list of charts from ClickHouse. Either select dashboard name or write your own query"><input id="search-query" name="search" list="search-options" type="text" spellcheck="false"><datalist id="search-options"></datalist></span>
|
||||
<div id="chart-params"></div>
|
||||
</div>
|
||||
</form>
|
||||
@ -532,9 +532,15 @@ const errorMessages = [
|
||||
}
|
||||
]
|
||||
|
||||
/// Dashboard selector
|
||||
const dashboardSearchQuery = (dashboard_name) => `SELECT title, query FROM system.dashboards WHERE dashboard = '${dashboard_name}'`;
|
||||
let dashboard_queries = {
|
||||
"Overview": dashboardSearchQuery("Overview"),
|
||||
};
|
||||
const default_dashboard = 'Overview';
|
||||
|
||||
/// Query to fill `queries` list for the dashboard
|
||||
let search_query = `SELECT title, query FROM system.dashboards WHERE dashboard = 'Overview'`;
|
||||
let search_query = dashboardSearchQuery(default_dashboard);
|
||||
let customized = false;
|
||||
let queries = [];
|
||||
|
||||
@ -1439,7 +1445,7 @@ async function reloadAll(do_search) {
|
||||
try {
|
||||
updateParams();
|
||||
if (do_search) {
|
||||
search_query = document.getElementById('search-query').value;
|
||||
search_query = toSearchQuery(document.getElementById('search-query').value);
|
||||
queries = [];
|
||||
refreshCustomized(false);
|
||||
}
|
||||
@ -1504,7 +1510,7 @@ function updateFromState() {
|
||||
document.getElementById('url').value = host;
|
||||
document.getElementById('user').value = user;
|
||||
document.getElementById('password').value = password;
|
||||
document.getElementById('search-query').value = search_query;
|
||||
document.getElementById('search-query').value = fromSearchQuery(search_query);
|
||||
refreshCustomized();
|
||||
}
|
||||
|
||||
@ -1543,6 +1549,44 @@ if (window.location.hash) {
|
||||
} catch {}
|
||||
}
|
||||
|
||||
function fromSearchQuery(query) {
|
||||
for (const dashboard_name in dashboard_queries) {
|
||||
if (query == dashboard_queries[dashboard_name])
|
||||
return dashboard_name;
|
||||
}
|
||||
return query;
|
||||
}
|
||||
|
||||
function toSearchQuery(value) {
|
||||
if (value in dashboard_queries)
|
||||
return dashboard_queries[value];
|
||||
else
|
||||
return value;
|
||||
}
|
||||
|
||||
async function populateSearchOptions() {
|
||||
let {reply, error} = await doFetch("SELECT dashboard FROM system.dashboards GROUP BY dashboard ORDER BY ALL");
|
||||
if (error) {
|
||||
throw new Error(error);
|
||||
}
|
||||
let data = reply.data;
|
||||
if (data.dashboard.length == 0) {
|
||||
console.log("Unable to fetch dashboards list");
|
||||
return;
|
||||
}
|
||||
dashboard_queries = {};
|
||||
for (let i = 0; i < data.dashboard.length; i++) {
|
||||
const dashboard = data.dashboard[i];
|
||||
dashboard_queries[dashboard] = dashboardSearchQuery(dashboard);
|
||||
}
|
||||
const searchOptions = document.getElementById('search-options');
|
||||
for (const dashboard in dashboard_queries) {
|
||||
const opt = document.createElement('option');
|
||||
opt.value = dashboard;
|
||||
searchOptions.appendChild(opt);
|
||||
}
|
||||
}
|
||||
|
||||
async function start() {
|
||||
try {
|
||||
updateFromState();
|
||||
@ -1558,6 +1602,7 @@ async function start() {
|
||||
} else {
|
||||
drawAll();
|
||||
}
|
||||
await populateSearchOptions();
|
||||
} catch (e) {
|
||||
showError(e.message);
|
||||
}
|
||||
|
@ -528,7 +528,7 @@ QueryTreeNodePtr IdentifierResolver::tryResolveIdentifierFromCompoundExpression(
|
||||
*
|
||||
* Resolve strategy:
|
||||
* 1. Try to bind identifier to scope argument name to node map.
|
||||
* 2. If identifier is binded but expression context and node type are incompatible return nullptr.
|
||||
* 2. If identifier is bound but expression context and node type are incompatible return nullptr.
|
||||
*
|
||||
* It is important to support edge cases, where we lookup for table or function node, but argument has same name.
|
||||
* Example: WITH (x -> x + 1) AS func, (func -> func(1) + func) AS lambda SELECT lambda(1);
|
||||
|
@ -166,6 +166,8 @@ if (TARGET ch_contrib::hdfs)
|
||||
add_headers_and_sources(dbms Disks/ObjectStorages/HDFS)
|
||||
endif()
|
||||
|
||||
add_headers_and_sources(dbms Databases/Iceberg)
|
||||
|
||||
add_headers_and_sources(dbms Disks/ObjectStorages/Cached)
|
||||
add_headers_and_sources(dbms Disks/ObjectStorages/Local)
|
||||
add_headers_and_sources(dbms Disks/ObjectStorages/Web)
|
||||
|
@ -362,7 +362,7 @@ ReplxxLineReader::ReplxxLineReader(
|
||||
if (highlighter)
|
||||
rx.set_highlighter_callback(highlighter);
|
||||
|
||||
/// By default C-p/C-n binded to COMPLETE_NEXT/COMPLETE_PREV,
|
||||
/// By default C-p/C-n bound to COMPLETE_NEXT/COMPLETE_PREV,
|
||||
/// bind C-p/C-n to history-previous/history-next like readline.
|
||||
rx.bind_key(Replxx::KEY::control('N'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::HISTORY_NEXT, code); });
|
||||
rx.bind_key(Replxx::KEY::control('P'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::HISTORY_PREVIOUS, code); });
|
||||
@ -384,9 +384,9 @@ ReplxxLineReader::ReplxxLineReader(
|
||||
rx.bind_key(Replxx::KEY::control('J'), commit_action);
|
||||
rx.bind_key(Replxx::KEY::ENTER, commit_action);
|
||||
|
||||
/// By default COMPLETE_NEXT/COMPLETE_PREV was binded to C-p/C-n, re-bind
|
||||
/// By default COMPLETE_NEXT/COMPLETE_PREV was bound to C-p/C-n, re-bind
|
||||
/// to M-P/M-N (that was used for HISTORY_COMMON_PREFIX_SEARCH before, but
|
||||
/// it also binded to M-p/M-n).
|
||||
/// it also bound to M-p/M-n).
|
||||
rx.bind_key(Replxx::KEY::meta('N'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::COMPLETE_NEXT, code); });
|
||||
rx.bind_key(Replxx::KEY::meta('P'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::COMPLETE_PREVIOUS, code); });
|
||||
/// By default M-BACKSPACE is KILL_TO_WHITESPACE_ON_LEFT, while in readline it is backward-kill-word
|
||||
|
@ -189,6 +189,9 @@
|
||||
M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \
|
||||
M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \
|
||||
M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \
|
||||
M(IcebergCatalogThreads, "Number of threads in the IcebergCatalog thread pool.") \
|
||||
M(IcebergCatalogThreadsActive, "Number of threads in the IcebergCatalog thread pool running a task.") \
|
||||
M(IcebergCatalogThreadsScheduled, "Number of queued or active jobs in the IcebergCatalog thread pool.") \
|
||||
\
|
||||
M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \
|
||||
M(DiskPlainRewritableAzureFileCount, "Number of file entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \
|
||||
|
@ -613,6 +613,7 @@
|
||||
M(733, TABLE_IS_BEING_RESTARTED) \
|
||||
M(734, CANNOT_WRITE_AFTER_BUFFER_CANCELED) \
|
||||
M(735, QUERY_WAS_CANCELLED_BY_CLIENT) \
|
||||
M(736, ICEBERG_CATALOG_ERROR) \
|
||||
\
|
||||
M(900, DISTRIBUTED_CACHE_ERROR) \
|
||||
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \
|
||||
|
@ -280,4 +280,14 @@ IMPLEMENT_SETTING_ENUM(
|
||||
{"StochasticSimple", MergeSelectorAlgorithm::STOCHASTIC_SIMPLE},
|
||||
{"Trivial", MergeSelectorAlgorithm::TRIVIAL}})
|
||||
|
||||
IMPLEMENT_SETTING_ENUM(DatabaseIcebergCatalogType, ErrorCodes::BAD_ARGUMENTS,
|
||||
{{"rest", DatabaseIcebergCatalogType::REST}})
|
||||
|
||||
IMPLEMENT_SETTING_ENUM(DatabaseIcebergStorageType, ErrorCodes::BAD_ARGUMENTS,
|
||||
{{"s3", DatabaseIcebergStorageType::S3},
|
||||
{"azure", DatabaseIcebergStorageType::Azure},
|
||||
{"hdfs", DatabaseIcebergStorageType::HDFS},
|
||||
{"local", DatabaseIcebergStorageType::Local},
|
||||
})
|
||||
|
||||
}
|
||||
|
@ -361,4 +361,21 @@ DECLARE_SETTING_ENUM(GroupArrayActionWhenLimitReached)
|
||||
|
||||
DECLARE_SETTING_ENUM(MergeSelectorAlgorithm)
|
||||
|
||||
enum class DatabaseIcebergCatalogType : uint8_t
|
||||
{
|
||||
REST,
|
||||
};
|
||||
|
||||
DECLARE_SETTING_ENUM(DatabaseIcebergCatalogType)
|
||||
|
||||
enum class DatabaseIcebergStorageType : uint8_t
|
||||
{
|
||||
S3,
|
||||
Azure,
|
||||
Local,
|
||||
HDFS,
|
||||
};
|
||||
|
||||
DECLARE_SETTING_ENUM(DatabaseIcebergStorageType)
|
||||
|
||||
}
|
||||
|
365
src/Databases/Iceberg/DatabaseIceberg.cpp
Normal file
365
src/Databases/Iceberg/DatabaseIceberg.cpp
Normal file
@ -0,0 +1,365 @@
|
||||
#include <Databases/Iceberg/DatabaseIceberg.h>
|
||||
|
||||
#if USE_AVRO
|
||||
#include <Access/Common/HTTPAuthenticationScheme.h>
|
||||
|
||||
#include <Databases/DatabaseFactory.h>
|
||||
#include <Databases/Iceberg/RestCatalog.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
|
||||
#include <Storages/ObjectStorage/S3/Configuration.h>
|
||||
#include <Storages/ConstraintsDescription.h>
|
||||
#include <Storages/StorageNull.h>
|
||||
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
|
||||
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/StorageID.h>
|
||||
|
||||
#include <Formats/FormatFactory.h>
|
||||
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTDataType.h>
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric IcebergCatalogThreads;
|
||||
extern const Metric IcebergCatalogThreadsActive;
|
||||
extern const Metric IcebergCatalogThreadsScheduled;
|
||||
}
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace DatabaseIcebergSetting
|
||||
{
|
||||
extern const DatabaseIcebergSettingsDatabaseIcebergCatalogType catalog_type;
|
||||
extern const DatabaseIcebergSettingsDatabaseIcebergStorageType storage_type;
|
||||
extern const DatabaseIcebergSettingsString warehouse;
|
||||
extern const DatabaseIcebergSettingsString catalog_credential;
|
||||
extern const DatabaseIcebergSettingsString auth_header;
|
||||
extern const DatabaseIcebergSettingsString storage_endpoint;
|
||||
}
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
/// Parse a string, containing at least one dot, into a two substrings:
|
||||
/// A.B.C.D.E -> A.B.C.D and E, where
|
||||
/// `A.B.C.D` is a table "namespace".
|
||||
/// `E` is a table name.
|
||||
std::pair<std::string, std::string> parseTableName(const std::string & name)
|
||||
{
|
||||
auto pos = name.rfind('.');
|
||||
if (pos == std::string::npos)
|
||||
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Table cannot have empty namespace: {}", name);
|
||||
|
||||
auto table_name = name.substr(pos + 1);
|
||||
auto namespace_name = name.substr(0, name.size() - table_name.size() - 1);
|
||||
return {namespace_name, table_name};
|
||||
}
|
||||
}
|
||||
|
||||
DatabaseIceberg::DatabaseIceberg(
|
||||
const std::string & database_name_,
|
||||
const std::string & url_,
|
||||
const DatabaseIcebergSettings & settings_,
|
||||
ASTPtr database_engine_definition_,
|
||||
ContextPtr context_)
|
||||
: IDatabase(database_name_)
|
||||
, url(url_)
|
||||
, settings(settings_)
|
||||
, database_engine_definition(database_engine_definition_)
|
||||
, log(getLogger("DatabaseIceberg(" + database_name_ + ")"))
|
||||
{
|
||||
validateSettings(context_);
|
||||
}
|
||||
|
||||
void DatabaseIceberg::validateSettings(const ContextPtr & context_)
|
||||
{
|
||||
if (settings[DatabaseIcebergSetting::warehouse].value.empty())
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS, "`warehouse` setting cannot be empty. "
|
||||
"Please specify 'SETTINGS warehouse=<warehouse_name>' in the CREATE DATABASE query");
|
||||
}
|
||||
|
||||
if (!settings[DatabaseIcebergSetting::storage_type].changed)
|
||||
{
|
||||
auto catalog = getCatalog(context_);
|
||||
const auto storage_type = catalog->getStorageType();
|
||||
if (!storage_type)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS, "Storage type is not found in catalog config. "
|
||||
"Please specify it manually via 'SETTINGS storage_type=<type>' in CREATE DATABASE query");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<Iceberg::ICatalog> DatabaseIceberg::getCatalog(ContextPtr) const
|
||||
{
|
||||
if (catalog_impl)
|
||||
return catalog_impl;
|
||||
|
||||
switch (settings[DatabaseIcebergSetting::catalog_type].value)
|
||||
{
|
||||
case DB::DatabaseIcebergCatalogType::REST:
|
||||
{
|
||||
catalog_impl = std::make_shared<Iceberg::RestCatalog>(
|
||||
settings[DatabaseIcebergSetting::warehouse].value,
|
||||
url,
|
||||
settings[DatabaseIcebergSetting::catalog_credential].value,
|
||||
settings[DatabaseIcebergSetting::auth_header],
|
||||
Context::getGlobalContextInstance());
|
||||
}
|
||||
}
|
||||
return catalog_impl;
|
||||
}
|
||||
|
||||
std::shared_ptr<StorageObjectStorage::Configuration> DatabaseIceberg::getConfiguration() const
|
||||
{
|
||||
switch (settings[DatabaseIcebergSetting::storage_type].value)
|
||||
{
|
||||
#if USE_AWS_S3
|
||||
case DB::DatabaseIcebergStorageType::S3:
|
||||
{
|
||||
return std::make_shared<StorageS3IcebergConfiguration>();
|
||||
}
|
||||
#endif
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
case DB::DatabaseIcebergStorageType::Azure:
|
||||
{
|
||||
return std::make_shared<StorageAzureIcebergConfiguration>();
|
||||
}
|
||||
#endif
|
||||
#if USE_HDFS
|
||||
case DB::DatabaseIcebergStorageType::HDFS:
|
||||
{
|
||||
return std::make_shared<StorageHDFSIcebergConfiguration>();
|
||||
}
|
||||
#endif
|
||||
case DB::DatabaseIcebergStorageType::Local:
|
||||
{
|
||||
return std::make_shared<StorageLocalIcebergConfiguration>();
|
||||
}
|
||||
#if !USE_AWS_S3 || !USE_AZURE_BLOB_STORAGE || !USE_HDFS
|
||||
default:
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Server does not contain support for storage type {}",
|
||||
settings[DatabaseIcebergSetting::storage_type].value);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
std::string DatabaseIceberg::getStorageEndpointForTable(const Iceberg::TableMetadata & table_metadata) const
|
||||
{
|
||||
auto endpoint_from_settings = settings[DatabaseIcebergSetting::storage_endpoint].value;
|
||||
if (!endpoint_from_settings.empty())
|
||||
{
|
||||
return std::filesystem::path(endpoint_from_settings)
|
||||
/ table_metadata.getLocation(/* path_only */true)
|
||||
/ "";
|
||||
}
|
||||
else
|
||||
{
|
||||
return std::filesystem::path(table_metadata.getLocation(/* path_only */false)) / "";
|
||||
}
|
||||
}
|
||||
|
||||
bool DatabaseIceberg::empty() const
|
||||
{
|
||||
return getCatalog(Context::getGlobalContextInstance())->empty();
|
||||
}
|
||||
|
||||
bool DatabaseIceberg::isTableExist(const String & name, ContextPtr context_) const
|
||||
{
|
||||
const auto [namespace_name, table_name] = parseTableName(name);
|
||||
return getCatalog(context_)->existsTable(namespace_name, table_name);
|
||||
}
|
||||
|
||||
StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_) const
|
||||
{
|
||||
auto catalog = getCatalog(context_);
|
||||
auto table_metadata = Iceberg::TableMetadata().withLocation().withSchema();
|
||||
auto [namespace_name, table_name] = parseTableName(name);
|
||||
|
||||
if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata))
|
||||
return nullptr;
|
||||
|
||||
/// Take database engine definition AST as base.
|
||||
ASTStorage * storage = database_engine_definition->as<ASTStorage>();
|
||||
ASTs args = storage->engine->arguments->children;
|
||||
|
||||
/// Replace Iceberg Catalog endpoint with storage path endpoint of requested table.
|
||||
auto table_endpoint = getStorageEndpointForTable(table_metadata);
|
||||
args[0] = std::make_shared<ASTLiteral>(table_endpoint);
|
||||
|
||||
LOG_TEST(log, "Using table endpoint: {}", table_endpoint);
|
||||
|
||||
const auto columns = ColumnsDescription(table_metadata.getSchema());
|
||||
const auto configuration = getConfiguration();
|
||||
|
||||
/// with_table_structure = false: because there will be
|
||||
/// no table structure in table definition AST.
|
||||
StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false);
|
||||
|
||||
return std::make_shared<StorageObjectStorage>(
|
||||
configuration,
|
||||
configuration->createObjectStorage(context_, /* is_readonly */ false),
|
||||
context_,
|
||||
StorageID(getDatabaseName(), name),
|
||||
/* columns */columns,
|
||||
/* constraints */ConstraintsDescription{},
|
||||
/* comment */"",
|
||||
getFormatSettings(context_),
|
||||
LoadingStrictnessLevel::CREATE,
|
||||
/* distributed_processing */false,
|
||||
/* partition_by */nullptr,
|
||||
/* lazy_init */true);
|
||||
}
|
||||
|
||||
DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator(
|
||||
ContextPtr context_,
|
||||
const FilterByNameFunction & filter_by_table_name,
|
||||
bool /* skip_not_loaded */) const
|
||||
{
|
||||
Tables tables;
|
||||
auto catalog = getCatalog(context_);
|
||||
|
||||
auto iceberg_tables = catalog->getTables();
|
||||
size_t num_threads = std::min<size_t>(10, iceberg_tables.size());
|
||||
ThreadPool pool(
|
||||
CurrentMetrics::IcebergCatalogThreads,
|
||||
CurrentMetrics::IcebergCatalogThreadsActive,
|
||||
CurrentMetrics::IcebergCatalogThreadsScheduled,
|
||||
num_threads);
|
||||
|
||||
DB::ThreadPoolCallbackRunnerLocal<void> runner(pool, "RestCatalog");
|
||||
std::mutex mutexx;
|
||||
|
||||
for (const auto & table_name : iceberg_tables)
|
||||
{
|
||||
if (filter_by_table_name && !filter_by_table_name(table_name))
|
||||
continue;
|
||||
|
||||
runner([&]{
|
||||
auto storage = tryGetTable(table_name, context_);
|
||||
{
|
||||
std::lock_guard lock(mutexx);
|
||||
[[maybe_unused]] bool inserted = tables.emplace(table_name, storage).second;
|
||||
chassert(inserted);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
runner.waitForAllToFinishAndRethrowFirstError();
|
||||
return std::make_unique<DatabaseTablesSnapshotIterator>(tables, getDatabaseName());
|
||||
}
|
||||
|
||||
ASTPtr DatabaseIceberg::getCreateDatabaseQuery() const
|
||||
{
|
||||
const auto & create_query = std::make_shared<ASTCreateQuery>();
|
||||
create_query->setDatabase(getDatabaseName());
|
||||
create_query->set(create_query->storage, database_engine_definition);
|
||||
return create_query;
|
||||
}
|
||||
|
||||
ASTPtr DatabaseIceberg::getCreateTableQueryImpl(
|
||||
const String & name,
|
||||
ContextPtr context_,
|
||||
bool /* throw_on_error */) const
|
||||
{
|
||||
auto catalog = getCatalog(context_);
|
||||
auto table_metadata = Iceberg::TableMetadata().withLocation().withSchema();
|
||||
|
||||
const auto [namespace_name, table_name] = parseTableName(name);
|
||||
catalog->getTableMetadata(namespace_name, table_name, table_metadata);
|
||||
|
||||
auto create_table_query = std::make_shared<ASTCreateQuery>();
|
||||
auto table_storage_define = database_engine_definition->clone();
|
||||
|
||||
auto * storage = table_storage_define->as<ASTStorage>();
|
||||
storage->engine->kind = ASTFunction::Kind::TABLE_ENGINE;
|
||||
storage->settings = {};
|
||||
|
||||
create_table_query->set(create_table_query->storage, table_storage_define);
|
||||
|
||||
auto columns_declare_list = std::make_shared<ASTColumns>();
|
||||
auto columns_expression_list = std::make_shared<ASTExpressionList>();
|
||||
|
||||
columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
|
||||
create_table_query->set(create_table_query->columns_list, columns_declare_list);
|
||||
|
||||
create_table_query->setTable(name);
|
||||
create_table_query->setDatabase(getDatabaseName());
|
||||
|
||||
for (const auto & column_type_and_name : table_metadata.getSchema())
|
||||
{
|
||||
const auto column_declaration = std::make_shared<ASTColumnDeclaration>();
|
||||
column_declaration->name = column_type_and_name.name;
|
||||
column_declaration->type = makeASTDataType(column_type_and_name.type->getName());
|
||||
columns_expression_list->children.emplace_back(column_declaration);
|
||||
}
|
||||
|
||||
auto storage_engine_arguments = storage->engine->arguments;
|
||||
if (storage_engine_arguments->children.empty())
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS, "Unexpected number of arguments: {}",
|
||||
storage_engine_arguments->children.size());
|
||||
}
|
||||
|
||||
auto table_endpoint = getStorageEndpointForTable(table_metadata);
|
||||
storage_engine_arguments->children[0] = std::make_shared<ASTLiteral>(table_endpoint);
|
||||
|
||||
return create_table_query;
|
||||
}
|
||||
|
||||
void registerDatabaseIceberg(DatabaseFactory & factory)
|
||||
{
|
||||
auto create_fn = [](const DatabaseFactory::Arguments & args)
|
||||
{
|
||||
const auto * database_engine_define = args.create_query.storage;
|
||||
const auto & database_engine_name = args.engine_name;
|
||||
|
||||
const ASTFunction * function_define = database_engine_define->engine;
|
||||
if (!function_define->arguments)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
|
||||
|
||||
ASTs & engine_args = function_define->arguments->children;
|
||||
if (engine_args.empty())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
|
||||
|
||||
const size_t max_args_num = 3;
|
||||
if (engine_args.size() != max_args_num)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine must have {} arguments", max_args_num);
|
||||
|
||||
for (auto & engine_arg : engine_args)
|
||||
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);
|
||||
|
||||
const auto url = engine_args[0]->as<ASTLiteral>()->value.safeGet<String>();
|
||||
|
||||
DatabaseIcebergSettings database_settings;
|
||||
if (database_engine_define->settings)
|
||||
database_settings.loadFromQuery(*database_engine_define);
|
||||
|
||||
return std::make_shared<DatabaseIceberg>(
|
||||
args.database_name,
|
||||
url,
|
||||
database_settings,
|
||||
database_engine_define->clone(),
|
||||
args.context);
|
||||
};
|
||||
factory.registerDatabase("Iceberg", create_fn, { .supports_arguments = true, .supports_settings = true });
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
71
src/Databases/Iceberg/DatabaseIceberg.h
Normal file
71
src/Databases/Iceberg/DatabaseIceberg.h
Normal file
@ -0,0 +1,71 @@
|
||||
#pragma once
|
||||
#include "config.h"
|
||||
|
||||
#if USE_AVRO
|
||||
#include <Databases/DatabasesCommon.h>
|
||||
#include <Databases/Iceberg/DatabaseIcebergSettings.h>
|
||||
#include <Databases/Iceberg/ICatalog.h>
|
||||
#include <Storages/ObjectStorage/StorageObjectStorage.h>
|
||||
#include <Poco/Net/HTTPBasicCredentials.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// TODO:
|
||||
/// - auth: oauth, bearer token?
|
||||
/// - tests with azure, hdfs, local
|
||||
|
||||
class DatabaseIceberg final : public IDatabase, WithContext
|
||||
{
|
||||
public:
|
||||
explicit DatabaseIceberg(
|
||||
const std::string & database_name_,
|
||||
const std::string & url_,
|
||||
const DatabaseIcebergSettings & settings_,
|
||||
ASTPtr database_engine_definition_,
|
||||
ContextPtr context_);
|
||||
|
||||
String getEngineName() const override { return "Iceberg"; }
|
||||
|
||||
bool canContainMergeTreeTables() const override { return false; }
|
||||
bool canContainDistributedTables() const override { return false; }
|
||||
bool shouldBeEmptyOnDetach() const override { return false; }
|
||||
|
||||
bool empty() const override;
|
||||
|
||||
bool isTableExist(const String & name, ContextPtr context) const override;
|
||||
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;
|
||||
|
||||
DatabaseTablesIteratorPtr getTablesIterator(
|
||||
ContextPtr context,
|
||||
const FilterByNameFunction & filter_by_table_name,
|
||||
bool skip_not_loaded) const override;
|
||||
|
||||
void shutdown() override {}
|
||||
|
||||
ASTPtr getCreateDatabaseQuery() const override;
|
||||
|
||||
protected:
|
||||
ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override;
|
||||
|
||||
private:
|
||||
/// Iceberg Catalog url.
|
||||
const std::string url;
|
||||
/// SETTINGS from CREATE query.
|
||||
const DatabaseIcebergSettings settings;
|
||||
/// Database engine definition taken from initial CREATE DATABASE query.
|
||||
const ASTPtr database_engine_definition;
|
||||
const LoggerPtr log;
|
||||
/// Crendetials to authenticate Iceberg Catalog.
|
||||
Poco::Net::HTTPBasicCredentials credentials;
|
||||
|
||||
mutable std::shared_ptr<Iceberg::ICatalog> catalog_impl;
|
||||
|
||||
void validateSettings(const ContextPtr & context_);
|
||||
std::shared_ptr<Iceberg::ICatalog> getCatalog(ContextPtr context_) const;
|
||||
std::shared_ptr<StorageObjectStorage::Configuration> getConfiguration() const;
|
||||
std::string getStorageEndpointForTable(const Iceberg::TableMetadata & table_metadata) const;
|
||||
};
|
||||
|
||||
}
|
||||
#endif
|
86
src/Databases/Iceberg/DatabaseIcebergSettings.cpp
Normal file
86
src/Databases/Iceberg/DatabaseIcebergSettings.cpp
Normal file
@ -0,0 +1,86 @@
|
||||
#include <Core/BaseSettings.h>
|
||||
#include <Core/BaseSettingsFwdMacrosImpl.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTSetQuery.h>
|
||||
#include <Databases/Iceberg/DatabaseIcebergSettings.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNKNOWN_SETTING;
|
||||
}
|
||||
|
||||
#define DATABASE_ICEBERG_RELATED_SETTINGS(DECLARE, ALIAS) \
|
||||
DECLARE(DatabaseIcebergCatalogType, catalog_type, DatabaseIcebergCatalogType::REST, "Catalog type", 0) \
|
||||
DECLARE(DatabaseIcebergStorageType, storage_type, DatabaseIcebergStorageType::S3, "Storage type: S3, Local, Azure, HDFS", 0) \
|
||||
DECLARE(String, catalog_credential, "", "", 0) \
|
||||
DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \
|
||||
DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: <scheme> <auth_info>'", 0) \
|
||||
DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \
|
||||
|
||||
#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
|
||||
DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS)
|
||||
|
||||
DECLARE_SETTINGS_TRAITS(DatabaseIcebergSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS)
|
||||
IMPLEMENT_SETTINGS_TRAITS(DatabaseIcebergSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS)
|
||||
|
||||
struct DatabaseIcebergSettingsImpl : public BaseSettings<DatabaseIcebergSettingsTraits>
|
||||
{
|
||||
};
|
||||
|
||||
#define INITIALIZE_SETTING_EXTERN(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) \
|
||||
DatabaseIcebergSettings##TYPE NAME = &DatabaseIcebergSettingsImpl ::NAME;
|
||||
|
||||
namespace DatabaseIcebergSetting
|
||||
{
|
||||
LIST_OF_DATABASE_ICEBERG_SETTINGS(INITIALIZE_SETTING_EXTERN, SKIP_ALIAS)
|
||||
}
|
||||
|
||||
#undef INITIALIZE_SETTING_EXTERN
|
||||
|
||||
DatabaseIcebergSettings::DatabaseIcebergSettings() : impl(std::make_unique<DatabaseIcebergSettingsImpl>())
|
||||
{
|
||||
}
|
||||
|
||||
DatabaseIcebergSettings::DatabaseIcebergSettings(const DatabaseIcebergSettings & settings)
|
||||
: impl(std::make_unique<DatabaseIcebergSettingsImpl>(*settings.impl))
|
||||
{
|
||||
}
|
||||
|
||||
DatabaseIcebergSettings::DatabaseIcebergSettings(DatabaseIcebergSettings && settings) noexcept
|
||||
: impl(std::make_unique<DatabaseIcebergSettingsImpl>(std::move(*settings.impl)))
|
||||
{
|
||||
}
|
||||
|
||||
DatabaseIcebergSettings::~DatabaseIcebergSettings() = default;
|
||||
|
||||
DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR)
|
||||
|
||||
|
||||
void DatabaseIcebergSettings::applyChanges(const SettingsChanges & changes)
|
||||
{
|
||||
impl->applyChanges(changes);
|
||||
}
|
||||
|
||||
void DatabaseIcebergSettings::loadFromQuery(const ASTStorage & storage_def)
|
||||
{
|
||||
if (storage_def.settings)
|
||||
{
|
||||
try
|
||||
{
|
||||
impl->applyChanges(storage_def.settings->changes);
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
|
||||
e.addMessage("for database engine " + storage_def.engine->name);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
40
src/Databases/Iceberg/DatabaseIcebergSettings.h
Normal file
40
src/Databases/Iceberg/DatabaseIcebergSettings.h
Normal file
@ -0,0 +1,40 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/BaseSettingsFwdMacros.h>
|
||||
#include <Core/FormatFactorySettings.h>
|
||||
#include <Core/SettingsEnums.h>
|
||||
#include <Core/SettingsFields.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class ASTStorage;
|
||||
struct DatabaseIcebergSettingsImpl;
|
||||
class SettingsChanges;
|
||||
|
||||
/// List of available types supported in DatabaseIcebergSettings object
|
||||
#define DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
|
||||
M(CLASS_NAME, String) \
|
||||
M(CLASS_NAME, UInt64) \
|
||||
M(CLASS_NAME, DatabaseIcebergCatalogType) \
|
||||
M(CLASS_NAME, DatabaseIcebergStorageType)
|
||||
|
||||
DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, DECLARE_SETTING_TRAIT)
|
||||
|
||||
struct DatabaseIcebergSettings
|
||||
{
|
||||
DatabaseIcebergSettings();
|
||||
DatabaseIcebergSettings(const DatabaseIcebergSettings & settings);
|
||||
DatabaseIcebergSettings(DatabaseIcebergSettings && settings) noexcept;
|
||||
~DatabaseIcebergSettings();
|
||||
|
||||
DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, DECLARE_SETTING_SUBSCRIPT_OPERATOR)
|
||||
|
||||
void loadFromQuery(const ASTStorage & storage_def);
|
||||
|
||||
void applyChanges(const SettingsChanges & changes);
|
||||
|
||||
private:
|
||||
std::unique_ptr<DatabaseIcebergSettingsImpl> impl;
|
||||
};
|
||||
}
|
86
src/Databases/Iceberg/ICatalog.cpp
Normal file
86
src/Databases/Iceberg/ICatalog.cpp
Normal file
@ -0,0 +1,86 @@
|
||||
#include <Databases/Iceberg/ICatalog.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Poco/String.h>
|
||||
|
||||
namespace DB::ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
namespace Iceberg
|
||||
{
|
||||
|
||||
void TableMetadata::setLocation(const std::string & location_)
|
||||
{
|
||||
if (!with_location)
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested");
|
||||
|
||||
/// Location has format:
|
||||
/// s3://<bucket>/path/to/table/data.
|
||||
/// We want to split s3://<bucket> and path/to/table/data.
|
||||
|
||||
auto pos = location_.find("://");
|
||||
if (pos == std::string::npos)
|
||||
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
|
||||
|
||||
auto pos_to_bucket = pos + std::strlen("://");
|
||||
auto pos_to_path = location_.substr(pos_to_bucket).find('/');
|
||||
|
||||
if (pos_to_path == std::string::npos)
|
||||
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
|
||||
|
||||
pos_to_path = pos_to_bucket + pos_to_path;
|
||||
|
||||
location_without_path = location_.substr(0, pos_to_path);
|
||||
path = location_.substr(pos_to_path + 1);
|
||||
|
||||
LOG_TEST(getLogger("TableMetadata"),
|
||||
"Parsed location without path: {}, path: {}",
|
||||
location_without_path, path);
|
||||
}
|
||||
|
||||
std::string TableMetadata::getLocation(bool path_only) const
|
||||
{
|
||||
if (!with_location)
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested");
|
||||
|
||||
if (path_only)
|
||||
return path;
|
||||
else
|
||||
return std::filesystem::path(location_without_path) / path;
|
||||
}
|
||||
|
||||
void TableMetadata::setSchema(const DB::NamesAndTypesList & schema_)
|
||||
{
|
||||
if (!with_schema)
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data schema was not requested");
|
||||
|
||||
schema = schema_;
|
||||
}
|
||||
|
||||
const DB::NamesAndTypesList & TableMetadata::getSchema() const
|
||||
{
|
||||
if (!with_schema)
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data schema was not requested");
|
||||
|
||||
return schema;
|
||||
}
|
||||
|
||||
StorageType ICatalog::getStorageType(const std::string & location)
|
||||
{
|
||||
auto pos = location.find("://");
|
||||
if (pos == std::string::npos)
|
||||
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected path format: {}", location);
|
||||
|
||||
auto storage_type_str = location.substr(0, pos);
|
||||
auto storage_type = magic_enum::enum_cast<StorageType>(Poco::toUpper(storage_type_str));
|
||||
|
||||
if (!storage_type)
|
||||
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unsupported storage type: {}", storage_type_str);
|
||||
|
||||
return *storage_type;
|
||||
}
|
||||
|
||||
}
|
78
src/Databases/Iceberg/ICatalog.h
Normal file
78
src/Databases/Iceberg/ICatalog.h
Normal file
@ -0,0 +1,78 @@
|
||||
#pragma once
|
||||
#include <Core/Types.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Core/SettingsEnums.h>
|
||||
|
||||
namespace Iceberg
|
||||
{
|
||||
using StorageType = DB::DatabaseIcebergStorageType;
|
||||
|
||||
class TableMetadata
|
||||
{
|
||||
public:
|
||||
TableMetadata() = default;
|
||||
|
||||
TableMetadata & withLocation() { with_location = true; return *this; }
|
||||
TableMetadata & withSchema() { with_schema = true; return *this; }
|
||||
|
||||
std::string getLocation(bool path_only) const;
|
||||
std::string getLocation() const;
|
||||
std::string getLocationWithoutPath() const;
|
||||
|
||||
const DB::NamesAndTypesList & getSchema() const;
|
||||
|
||||
bool requiresLocation() const { return with_location; }
|
||||
bool requiresSchema() const { return with_schema; }
|
||||
|
||||
void setLocation(const std::string & location_);
|
||||
void setSchema(const DB::NamesAndTypesList & schema_);
|
||||
|
||||
private:
|
||||
/// starts with s3://, file://, etc
|
||||
std::string location_without_path;
|
||||
std::string path;
|
||||
/// column names and types
|
||||
DB::NamesAndTypesList schema;
|
||||
|
||||
bool with_location = false;
|
||||
bool with_schema = false;
|
||||
};
|
||||
|
||||
|
||||
class ICatalog
|
||||
{
|
||||
public:
|
||||
using Namespaces = std::vector<std::string>;
|
||||
using Tables = std::vector<std::string>;
|
||||
|
||||
explicit ICatalog(const std::string & warehouse_) : warehouse(warehouse_) {}
|
||||
|
||||
virtual ~ICatalog() = default;
|
||||
|
||||
virtual bool empty() const = 0;
|
||||
|
||||
virtual Tables getTables() const = 0;
|
||||
|
||||
virtual bool existsTable(
|
||||
const std::string & namespace_naem,
|
||||
const std::string & table_name) const = 0;
|
||||
|
||||
virtual void getTableMetadata(
|
||||
const std::string & namespace_name,
|
||||
const std::string & table_name,
|
||||
TableMetadata & result) const = 0;
|
||||
|
||||
virtual bool tryGetTableMetadata(
|
||||
const std::string & namespace_name,
|
||||
const std::string & table_name,
|
||||
TableMetadata & result) const = 0;
|
||||
|
||||
virtual std::optional<StorageType> getStorageType() const = 0;
|
||||
|
||||
protected:
|
||||
const std::string warehouse;
|
||||
|
||||
static StorageType getStorageType(const std::string & location);
|
||||
};
|
||||
|
||||
}
|
540
src/Databases/Iceberg/RestCatalog.cpp
Normal file
540
src/Databases/Iceberg/RestCatalog.cpp
Normal file
@ -0,0 +1,540 @@
|
||||
#include <Databases/Iceberg/RestCatalog.h>
|
||||
|
||||
#include <base/find_symbols.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/threadPoolCallbackRunner.h>
|
||||
#include <Common/Base64.h>
|
||||
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
#include <IO/HTTPCommon.h>
|
||||
#include <IO/ReadBuffer.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
|
||||
#include <Server/HTTP/HTMLForm.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
|
||||
#include <Poco/URI.h>
|
||||
#include <Poco/JSON/Array.h>
|
||||
#include <Poco/JSON/Parser.h>
|
||||
|
||||
namespace DB::ErrorCodes
|
||||
{
|
||||
extern const int ICEBERG_CATALOG_ERROR;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric IcebergCatalogThreads;
|
||||
extern const Metric IcebergCatalogThreadsActive;
|
||||
extern const Metric IcebergCatalogThreadsScheduled;
|
||||
}
|
||||
|
||||
namespace Iceberg
|
||||
{
|
||||
|
||||
static constexpr auto config_endpoint = "config";
|
||||
static constexpr auto namespaces_endpoint = "namespaces";
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
std::pair<std::string, std::string> parseCatalogCredential(const std::string & catalog_credential)
|
||||
{
|
||||
/// Parse a string of format "<client_id>:<client_secret>"
|
||||
/// into separare strings client_id and client_secret.
|
||||
|
||||
std::string client_id, client_secret;
|
||||
if (!catalog_credential.empty())
|
||||
{
|
||||
auto pos = catalog_credential.find(':');
|
||||
if (pos == std::string::npos)
|
||||
{
|
||||
throw DB::Exception(
|
||||
DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected format of catalog credential: "
|
||||
"expected client_id and client_secret separated by `:`");
|
||||
}
|
||||
client_id = catalog_credential.substr(0, pos);
|
||||
client_secret = catalog_credential.substr(pos + 1);
|
||||
}
|
||||
return std::pair(client_id, client_secret);
|
||||
}
|
||||
|
||||
DB::HTTPHeaderEntry parseAuthHeader(const std::string & auth_header)
|
||||
{
|
||||
/// Parse a string of format "Authorization: <auth_scheme> <auth_token>"
|
||||
/// into a key-value header "Authorization", "<auth_scheme> <auth_token>"
|
||||
|
||||
auto pos = auth_header.find(':');
|
||||
if (pos == std::string::npos)
|
||||
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected format of auth header");
|
||||
|
||||
return DB::HTTPHeaderEntry(auth_header.substr(0, pos), auth_header.substr(pos + 1));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
std::string RestCatalog::Config::toString() const
|
||||
{
|
||||
DB::WriteBufferFromOwnString wb;
|
||||
|
||||
if (!prefix.empty())
|
||||
wb << "prefix: " << prefix.string() << ", ";
|
||||
|
||||
if (!default_base_location.empty())
|
||||
wb << "default_base_location: " << default_base_location << ", ";
|
||||
|
||||
return wb.str();
|
||||
}
|
||||
|
||||
RestCatalog::RestCatalog(
|
||||
const std::string & warehouse_,
|
||||
const std::string & base_url_,
|
||||
const std::string & catalog_credential_,
|
||||
const std::string & auth_header_,
|
||||
DB::ContextPtr context_)
|
||||
: ICatalog(warehouse_)
|
||||
, DB::WithContext(context_)
|
||||
, base_url(base_url_)
|
||||
, log(getLogger("RestCatalog(" + warehouse_ + ")"))
|
||||
{
|
||||
if (!catalog_credential_.empty())
|
||||
std::tie(client_id, client_secret) = parseCatalogCredential(catalog_credential_);
|
||||
else if (!auth_header_.empty())
|
||||
auth_header = parseAuthHeader(auth_header_);
|
||||
|
||||
config = loadConfig();
|
||||
}
|
||||
|
||||
RestCatalog::Config RestCatalog::loadConfig()
|
||||
{
|
||||
Poco::URI::QueryParameters params = {{"warehouse", warehouse}};
|
||||
auto buf = createReadBuffer(config_endpoint, params);
|
||||
|
||||
std::string json_str;
|
||||
readJSONObjectPossiblyInvalid(json_str, *buf);
|
||||
|
||||
LOG_TEST(log, "Received catalog configuration settings: {}", json_str);
|
||||
|
||||
Poco::JSON::Parser parser;
|
||||
Poco::Dynamic::Var json = parser.parse(json_str);
|
||||
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
|
||||
|
||||
Config result;
|
||||
|
||||
auto defaults_object = object->get("defaults").extract<Poco::JSON::Object::Ptr>();
|
||||
parseCatalogConfigurationSettings(defaults_object, result);
|
||||
|
||||
auto overrides_object = object->get("overrides").extract<Poco::JSON::Object::Ptr>();
|
||||
parseCatalogConfigurationSettings(overrides_object, result);
|
||||
|
||||
LOG_TEST(log, "Parsed catalog configuration settings: {}", result.toString());
|
||||
return result;
|
||||
}
|
||||
|
||||
void RestCatalog::parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result)
|
||||
{
|
||||
if (!object)
|
||||
return;
|
||||
|
||||
if (object->has("prefix"))
|
||||
result.prefix = object->get("prefix").extract<String>();
|
||||
|
||||
if (object->has("default-base-location"))
|
||||
result.default_base_location = object->get("default-base-location").extract<String>();
|
||||
}
|
||||
|
||||
DB::HTTPHeaderEntries RestCatalog::getHeaders(bool update_token) const
|
||||
{
|
||||
if (auth_header.has_value())
|
||||
{
|
||||
return DB::HTTPHeaderEntries{auth_header.value()};
|
||||
}
|
||||
|
||||
if (!client_id.empty())
|
||||
{
|
||||
if (!access_token.has_value() || update_token)
|
||||
{
|
||||
access_token = retrieveAccessToken();
|
||||
}
|
||||
|
||||
DB::HTTPHeaderEntries headers;
|
||||
headers.emplace_back("Authorization", "Bearer " + access_token.value());
|
||||
return headers;
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
std::string RestCatalog::retrieveAccessToken() const
|
||||
{
|
||||
static constexpr auto oauth_tokens_endpoint = "oauth/tokens";
|
||||
|
||||
/// TODO:
|
||||
/// 1. support oauth2-server-uri
|
||||
/// https://github.com/apache/iceberg/blob/918f81f3c3f498f46afcea17c1ac9cdc6913cb5c/open-api/rest-catalog-open-api.yaml#L183C82-L183C99
|
||||
|
||||
Poco::JSON::Object json;
|
||||
json.set("grant_type", "client_credentials");
|
||||
json.set("scope", "PRINCIPAL_ROLE:ALL"); /// TODO: add it into setting.
|
||||
json.set("client_id", client_id);
|
||||
json.set("client_secret", client_secret);
|
||||
|
||||
DB::HTTPHeaderEntries headers;
|
||||
headers.emplace_back("Content-Type", "application/x-www-form-urlencoded");
|
||||
headers.emplace_back("Accepts", "application/json; charset=UTF-8");
|
||||
|
||||
Poco::URI url(base_url / oauth_tokens_endpoint);
|
||||
Poco::URI::QueryParameters params = {
|
||||
{"grant_type", "client_credentials"},
|
||||
{"scope", "PRINCIPAL_ROLE:ALL"},
|
||||
{"client_id", client_id},
|
||||
{"client_secret", client_secret},
|
||||
};
|
||||
url.setQueryParameters(params);
|
||||
|
||||
const auto & context = getContext();
|
||||
auto wb = DB::BuilderRWBufferFromHTTP(url)
|
||||
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
|
||||
.withMethod(Poco::Net::HTTPRequest::HTTP_POST)
|
||||
.withSettings(context->getReadSettings())
|
||||
.withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()))
|
||||
.withHostFilter(&context->getRemoteHostFilter())
|
||||
.withSkipNotFound(false)
|
||||
.withHeaders(headers)
|
||||
.create(credentials);
|
||||
|
||||
std::string json_str;
|
||||
readJSONObjectPossiblyInvalid(json_str, *wb);
|
||||
|
||||
Poco::JSON::Parser parser;
|
||||
Poco::Dynamic::Var res_json = parser.parse(json_str);
|
||||
const Poco::JSON::Object::Ptr & object = res_json.extract<Poco::JSON::Object::Ptr>();
|
||||
|
||||
return object->get("access_token").extract<String>();
|
||||
}
|
||||
|
||||
std::optional<StorageType> RestCatalog::getStorageType() const
|
||||
{
|
||||
if (config.default_base_location.empty())
|
||||
return std::nullopt;
|
||||
return ICatalog::getStorageType(config.default_base_location);
|
||||
}
|
||||
|
||||
DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(
|
||||
const std::string & endpoint,
|
||||
const Poco::URI::QueryParameters & params) const
|
||||
{
|
||||
const auto & context = getContext();
|
||||
|
||||
Poco::URI url(base_url / endpoint);
|
||||
if (!params.empty())
|
||||
url.setQueryParameters(params);
|
||||
|
||||
auto headers = getHeaders(false);
|
||||
|
||||
LOG_TEST(log, "Requesting: {}", url.toString());
|
||||
|
||||
try
|
||||
{
|
||||
return DB::BuilderRWBufferFromHTTP(url)
|
||||
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
|
||||
.withSettings(getContext()->getReadSettings())
|
||||
.withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()))
|
||||
.withHeaders(headers)
|
||||
.withHostFilter(&getContext()->getRemoteHostFilter())
|
||||
.withDelayInit(false)
|
||||
.withSkipNotFound(false)
|
||||
.create(credentials);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
auto new_headers = getHeaders(true);
|
||||
return DB::BuilderRWBufferFromHTTP(url)
|
||||
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
|
||||
.withSettings(getContext()->getReadSettings())
|
||||
.withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()))
|
||||
.withHeaders(new_headers)
|
||||
.withHostFilter(&getContext()->getRemoteHostFilter())
|
||||
.withDelayInit(false)
|
||||
.withSkipNotFound(false)
|
||||
.create(credentials);
|
||||
}
|
||||
}
|
||||
|
||||
bool RestCatalog::empty() const
|
||||
{
|
||||
try
|
||||
{
|
||||
bool found_table = false;
|
||||
auto stop_condition = [&](const std::string & namespace_name) -> bool
|
||||
{
|
||||
const auto tables = getTables(namespace_name, /* limit */1);
|
||||
found_table = !tables.empty();
|
||||
return found_table;
|
||||
};
|
||||
|
||||
Namespaces namespaces;
|
||||
getNamespacesRecursive("", namespaces, stop_condition, {});
|
||||
|
||||
return found_table;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
DB::tryLogCurrentException(log);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
RestCatalog::Tables RestCatalog::getTables() const
|
||||
{
|
||||
size_t num_threads = 10;
|
||||
ThreadPool pool(
|
||||
CurrentMetrics::IcebergCatalogThreads,
|
||||
CurrentMetrics::IcebergCatalogThreadsActive,
|
||||
CurrentMetrics::IcebergCatalogThreadsScheduled,
|
||||
num_threads);
|
||||
|
||||
DB::ThreadPoolCallbackRunnerLocal<void> runner(pool, "RestCatalog");
|
||||
|
||||
Tables tables;
|
||||
std::mutex mutex;
|
||||
|
||||
auto func = [&](const std::string & current_namespace)
|
||||
{
|
||||
runner(
|
||||
[&]{
|
||||
auto tables_in_namespace = getTables(current_namespace);
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
std::move(tables_in_namespace.begin(), tables_in_namespace.end(), std::back_inserter(tables));
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
Namespaces namespaces;
|
||||
getNamespacesRecursive("", namespaces, {}, func);
|
||||
|
||||
runner.waitForAllToFinishAndRethrowFirstError();
|
||||
return tables;
|
||||
}
|
||||
|
||||
void RestCatalog::getNamespacesRecursive(
|
||||
const std::string & base_namespace,
|
||||
Namespaces & result,
|
||||
StopCondition stop_condition,
|
||||
ExecuteFunc func) const
|
||||
{
|
||||
auto namespaces = getNamespaces(base_namespace);
|
||||
result.reserve(result.size() + namespaces.size());
|
||||
result.insert(result.end(), namespaces.begin(), namespaces.end());
|
||||
|
||||
for (const auto & current_namespace : namespaces)
|
||||
{
|
||||
chassert(current_namespace.starts_with(base_namespace));
|
||||
|
||||
if (stop_condition && stop_condition(current_namespace))
|
||||
break;
|
||||
|
||||
if (func)
|
||||
func(current_namespace);
|
||||
|
||||
getNamespacesRecursive(current_namespace, result, stop_condition, func);
|
||||
}
|
||||
}
|
||||
|
||||
Poco::URI::QueryParameters RestCatalog::createParentNamespaceParams(const std::string & base_namespace) const
|
||||
{
|
||||
std::vector<std::string> parts;
|
||||
splitInto<'.'>(parts, base_namespace);
|
||||
std::string parent_param;
|
||||
for (const auto & part : parts)
|
||||
{
|
||||
if (!parent_param.empty())
|
||||
parent_param += static_cast<char>(0x1F);
|
||||
parent_param += part;
|
||||
}
|
||||
return {{"parent", parent_param}};
|
||||
}
|
||||
|
||||
RestCatalog::Namespaces RestCatalog::getNamespaces(const std::string & base_namespace) const
|
||||
{
|
||||
Poco::URI::QueryParameters params;
|
||||
if (!base_namespace.empty())
|
||||
params = createParentNamespaceParams(base_namespace);
|
||||
|
||||
try
|
||||
{
|
||||
auto buf = createReadBuffer(config.prefix / namespaces_endpoint, params);
|
||||
auto namespaces = parseNamespaces(*buf, base_namespace);
|
||||
LOG_TEST(log, "Loaded {} namespaces", namespaces.size());
|
||||
return namespaces;
|
||||
}
|
||||
catch (const DB::HTTPException & e)
|
||||
{
|
||||
std::string message = fmt::format(
|
||||
"Received error while fetching list of namespaces from iceberg catalog `{}`. ",
|
||||
warehouse);
|
||||
|
||||
if (e.code() == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND)
|
||||
message += "Namespace provided in the `parent` query parameter is not found. ";
|
||||
|
||||
message += fmt::format(
|
||||
"Code: {}, status: {}, message: {}",
|
||||
e.code(), e.getHTTPStatus(), e.displayText());
|
||||
|
||||
throw DB::Exception(DB::ErrorCodes::ICEBERG_CATALOG_ERROR, "{}", message);
|
||||
}
|
||||
}
|
||||
|
||||
RestCatalog::Namespaces RestCatalog::parseNamespaces(DB::ReadBuffer & buf, const std::string & base_namespace) const
|
||||
{
|
||||
if (buf.eof())
|
||||
return {};
|
||||
|
||||
String json_str;
|
||||
readJSONObjectPossiblyInvalid(json_str, buf);
|
||||
|
||||
LOG_TEST(log, "Received response: {}", json_str);
|
||||
|
||||
Poco::JSON::Parser parser;
|
||||
Poco::Dynamic::Var json = parser.parse(json_str);
|
||||
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
|
||||
|
||||
auto namespaces_object = object->get("namespaces").extract<Poco::JSON::Array::Ptr>();
|
||||
if (!namespaces_object)
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result");
|
||||
|
||||
Namespaces namespaces;
|
||||
for (size_t i = 0; i < namespaces_object->size(); ++i)
|
||||
{
|
||||
auto current_namespace_array = namespaces_object->get(static_cast<int>(i)).extract<Poco::JSON::Array::Ptr>();
|
||||
if (current_namespace_array->size() == 0)
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expected namespace array to be non-empty");
|
||||
|
||||
const int idx = static_cast<int>(current_namespace_array->size()) - 1;
|
||||
const auto current_namespace = current_namespace_array->get(idx).extract<String>();
|
||||
const auto full_namespace = base_namespace.empty()
|
||||
? current_namespace
|
||||
: base_namespace + "." + current_namespace;
|
||||
|
||||
namespaces.push_back(full_namespace);
|
||||
}
|
||||
|
||||
return namespaces;
|
||||
}
|
||||
|
||||
RestCatalog::Tables RestCatalog::getTables(const std::string & base_namespace, size_t limit) const
|
||||
{
|
||||
const auto endpoint = std::string(namespaces_endpoint) + "/" + base_namespace + "/tables";
|
||||
auto buf = createReadBuffer(config.prefix / endpoint);
|
||||
return parseTables(*buf, base_namespace, limit);
|
||||
}
|
||||
|
||||
RestCatalog::Tables RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const
|
||||
{
|
||||
if (buf.eof())
|
||||
return {};
|
||||
|
||||
String json_str;
|
||||
readJSONObjectPossiblyInvalid(json_str, buf);
|
||||
|
||||
Poco::JSON::Parser parser;
|
||||
Poco::Dynamic::Var json = parser.parse(json_str);
|
||||
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
|
||||
|
||||
auto identifiers_object = object->get("identifiers").extract<Poco::JSON::Array::Ptr>();
|
||||
if (!identifiers_object)
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result");
|
||||
|
||||
Tables tables;
|
||||
for (size_t i = 0; i < identifiers_object->size(); ++i)
|
||||
{
|
||||
const auto current_table_json = identifiers_object->get(static_cast<int>(i)).extract<Poco::JSON::Object::Ptr>();
|
||||
const auto table_name = current_table_json->get("name").extract<String>();
|
||||
|
||||
tables.push_back(base_namespace + "." + table_name);
|
||||
if (limit && tables.size() >= limit)
|
||||
break;
|
||||
}
|
||||
return tables;
|
||||
}
|
||||
|
||||
bool RestCatalog::existsTable(const std::string & namespace_name, const std::string & table_name) const
|
||||
{
|
||||
TableMetadata table_metadata;
|
||||
return tryGetTableMetadata(namespace_name, table_name, table_metadata);
|
||||
}
|
||||
|
||||
bool RestCatalog::tryGetTableMetadata(
|
||||
const std::string & namespace_name,
|
||||
const std::string & table_name,
|
||||
TableMetadata & result) const
|
||||
{
|
||||
try
|
||||
{
|
||||
return getTableMetadataImpl(namespace_name, table_name, result);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
DB::tryLogCurrentException(log);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void RestCatalog::getTableMetadata(
|
||||
const std::string & namespace_name,
|
||||
const std::string & table_name,
|
||||
TableMetadata & result) const
|
||||
{
|
||||
if (!getTableMetadataImpl(namespace_name, table_name, result))
|
||||
throw DB::Exception(DB::ErrorCodes::ICEBERG_CATALOG_ERROR, "No response from iceberg catalog");
|
||||
}
|
||||
|
||||
bool RestCatalog::getTableMetadataImpl(
|
||||
const std::string & namespace_name,
|
||||
const std::string & table_name,
|
||||
TableMetadata & result) const
|
||||
{
|
||||
LOG_TEST(log, "Checking table {} in namespace {}", table_name, namespace_name);
|
||||
|
||||
const auto endpoint = std::string(namespaces_endpoint) + "/" + namespace_name + "/tables/" + table_name;
|
||||
auto buf = createReadBuffer(config.prefix / endpoint);
|
||||
|
||||
if (buf->eof())
|
||||
{
|
||||
LOG_TEST(log, "Table doesn't exist (endpoint: {})", endpoint);
|
||||
return false;
|
||||
}
|
||||
|
||||
String json_str;
|
||||
readJSONObjectPossiblyInvalid(json_str, *buf);
|
||||
|
||||
LOG_TEST(log, "Received metadata for table {}: {}", table_name, json_str);
|
||||
|
||||
Poco::JSON::Parser parser;
|
||||
Poco::Dynamic::Var json = parser.parse(json_str);
|
||||
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
|
||||
|
||||
auto metadata_object = object->get("metadata").extract<Poco::JSON::Object::Ptr>();
|
||||
if (!metadata_object)
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result");
|
||||
|
||||
if (result.requiresLocation())
|
||||
{
|
||||
const auto location = metadata_object->get("location").extract<String>();
|
||||
result.setLocation(location);
|
||||
LOG_TEST(log, "Location for table {}: {}", table_name, location);
|
||||
}
|
||||
|
||||
if (result.requiresSchema())
|
||||
{
|
||||
int format_version = metadata_object->getValue<int>("format-version");
|
||||
result.setSchema(DB::IcebergMetadata::parseTableSchema(metadata_object, format_version, true).first);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
111
src/Databases/Iceberg/RestCatalog.h
Normal file
111
src/Databases/Iceberg/RestCatalog.h
Normal file
@ -0,0 +1,111 @@
|
||||
#pragma once
|
||||
#include <Databases/Iceberg/ICatalog.h>
|
||||
#include <Poco/Net/HTTPBasicCredentials.h>
|
||||
#include <IO/ReadWriteBufferFromHTTP.h>
|
||||
#include <IO/HTTPHeaderEntries.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <filesystem>
|
||||
#include <Poco/JSON/Object.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class ReadBuffer;
|
||||
}
|
||||
|
||||
namespace Iceberg
|
||||
{
|
||||
|
||||
class RestCatalog final : public ICatalog, private DB::WithContext
|
||||
{
|
||||
public:
|
||||
explicit RestCatalog(
|
||||
const std::string & warehouse_,
|
||||
const std::string & base_url_,
|
||||
const std::string & catalog_credential_,
|
||||
const std::string & auth_header_,
|
||||
DB::ContextPtr context_);
|
||||
|
||||
~RestCatalog() override = default;
|
||||
|
||||
bool empty() const override;
|
||||
|
||||
Tables getTables() const override;
|
||||
|
||||
bool existsTable(const std::string & namespace_name, const std::string & table_name) const override;
|
||||
|
||||
void getTableMetadata(
|
||||
const std::string & namespace_name,
|
||||
const std::string & table_name,
|
||||
TableMetadata & result) const override;
|
||||
|
||||
bool tryGetTableMetadata(
|
||||
const std::string & namespace_name,
|
||||
const std::string & table_name,
|
||||
TableMetadata & result) const override;
|
||||
|
||||
std::optional<StorageType> getStorageType() const override;
|
||||
|
||||
private:
|
||||
struct Config
|
||||
{
|
||||
/// Prefix is a path of the catalog endpoint,
|
||||
/// e.g. /v1/{prefix}/namespaces/{namespace}/tables/{table}
|
||||
std::filesystem::path prefix;
|
||||
/// Base location is location of data in storage
|
||||
/// (in filesystem or object storage).
|
||||
std::string default_base_location;
|
||||
|
||||
std::string toString() const;
|
||||
};
|
||||
|
||||
const std::filesystem::path base_url;
|
||||
const LoggerPtr log;
|
||||
|
||||
/// Catalog configuration settings from /v1/config endpoint.
|
||||
Config config;
|
||||
|
||||
/// Auth headers of format: "Authorization": "<auth_scheme> <token>"
|
||||
std::optional<DB::HTTPHeaderEntry> auth_header;
|
||||
|
||||
/// Parameters for OAuth.
|
||||
std::string client_id;
|
||||
std::string client_secret;
|
||||
mutable std::optional<std::string> access_token;
|
||||
|
||||
Poco::Net::HTTPBasicCredentials credentials{};
|
||||
|
||||
DB::ReadWriteBufferFromHTTPPtr createReadBuffer(
|
||||
const std::string & endpoint,
|
||||
const Poco::URI::QueryParameters & params = {}) const;
|
||||
|
||||
Poco::URI::QueryParameters createParentNamespaceParams(const std::string & base_namespace) const;
|
||||
|
||||
using StopCondition = std::function<bool(const std::string & namespace_name)>;
|
||||
using ExecuteFunc = std::function<void(const std::string & namespace_name)>;
|
||||
|
||||
void getNamespacesRecursive(
|
||||
const std::string & base_namespace,
|
||||
Namespaces & result,
|
||||
StopCondition stop_condition,
|
||||
ExecuteFunc func) const;
|
||||
|
||||
Namespaces getNamespaces(const std::string & base_namespace) const;
|
||||
|
||||
Namespaces parseNamespaces(DB::ReadBuffer & buf, const std::string & base_namespace) const;
|
||||
|
||||
Tables getTables(const std::string & base_namespace, size_t limit = 0) const;
|
||||
|
||||
Tables parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const;
|
||||
|
||||
bool getTableMetadataImpl(
|
||||
const std::string & namespace_name,
|
||||
const std::string & table_name,
|
||||
TableMetadata & result) const;
|
||||
|
||||
Config loadConfig();
|
||||
std::string retrieveAccessToken() const;
|
||||
DB::HTTPHeaderEntries getHeaders(bool update_token = false) const;
|
||||
static void parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result);
|
||||
};
|
||||
|
||||
}
|
@ -36,6 +36,10 @@ void registerDatabaseS3(DatabaseFactory & factory);
|
||||
void registerDatabaseHDFS(DatabaseFactory & factory);
|
||||
#endif
|
||||
|
||||
#if USE_AVRO
|
||||
void registerDatabaseIceberg(DatabaseFactory & factory);
|
||||
#endif
|
||||
|
||||
void registerDatabases()
|
||||
{
|
||||
auto & factory = DatabaseFactory::instance();
|
||||
@ -68,5 +72,9 @@ void registerDatabases()
|
||||
#if USE_HDFS
|
||||
registerDatabaseHDFS(factory);
|
||||
#endif
|
||||
|
||||
#if USE_AVRO
|
||||
registerDatabaseIceberg(factory);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
@ -237,6 +237,7 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
|
||||
null_modifier.emplace(true);
|
||||
}
|
||||
|
||||
bool is_comment = false;
|
||||
/// Collate is also allowed after NULL/NOT NULL
|
||||
if (!collation_expression && s_collate.ignore(pos, expected)
|
||||
&& !collation_parser.parse(pos, collation_expression, expected))
|
||||
@ -254,7 +255,9 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
|
||||
else if (s_ephemeral.ignore(pos, expected))
|
||||
{
|
||||
default_specifier = s_ephemeral.getName();
|
||||
if (!expr_parser.parse(pos, default_expression, expected) && type)
|
||||
if (s_comment.ignore(pos, expected))
|
||||
is_comment = true;
|
||||
if ((is_comment || !expr_parser.parse(pos, default_expression, expected)) && type)
|
||||
{
|
||||
ephemeral_default = true;
|
||||
|
||||
@ -289,6 +292,8 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
|
||||
if (require_type && !type && !default_expression)
|
||||
return false; /// reject column name without type
|
||||
|
||||
if (!is_comment)
|
||||
{
|
||||
if ((type || default_expression) && allow_null_modifiers && !null_modifier.has_value())
|
||||
{
|
||||
if (s_not.ignore(pos, expected))
|
||||
@ -300,8 +305,9 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
|
||||
else if (s_null.ignore(pos, expected))
|
||||
null_modifier.emplace(true);
|
||||
}
|
||||
}
|
||||
|
||||
if (s_comment.ignore(pos, expected))
|
||||
if (is_comment || s_comment.ignore(pos, expected))
|
||||
{
|
||||
/// should be followed by a string literal
|
||||
if (!string_literal_parser.parse(pos, comment_expression, expected))
|
||||
|
@ -70,9 +70,6 @@ IcebergMetadata::IcebergMetadata(
|
||||
{
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
enum class ManifestEntryStatus : uint8_t
|
||||
{
|
||||
EXISTING = 0,
|
||||
@ -248,7 +245,7 @@ DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & t
|
||||
|
||||
}
|
||||
|
||||
std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution)
|
||||
std::pair<NamesAndTypesList, Int32> IcebergMetadata::parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution)
|
||||
{
|
||||
Poco::JSON::Object::Ptr schema;
|
||||
Int32 current_schema_id;
|
||||
@ -313,9 +310,9 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
|
||||
for (size_t i = 0; i != fields->size(); ++i)
|
||||
{
|
||||
auto field = fields->getObject(static_cast<UInt32>(i));
|
||||
auto name = field->getValue<String>("name");
|
||||
auto column_name = field->getValue<String>("name");
|
||||
bool required = field->getValue<bool>("required");
|
||||
names_and_types.push_back({name, getFieldType(field, "type", required)});
|
||||
names_and_types.push_back({column_name, getFieldType(field, "type", required)});
|
||||
}
|
||||
|
||||
return {std::move(names_and_types), current_schema_id};
|
||||
@ -380,8 +377,6 @@ std::pair<Int32, String> getMetadataFileAndVersion(
|
||||
return *std::max_element(metadata_files_with_versions.begin(), metadata_files_with_versions.end());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
DataLakeMetadataPtr
|
||||
IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context)
|
||||
{
|
||||
|
@ -96,6 +96,11 @@ public:
|
||||
|
||||
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context);
|
||||
|
||||
static std::pair<NamesAndTypesList, Int32> parseTableSchema(
|
||||
const Poco::JSON::Object::Ptr & metadata_object,
|
||||
int format_version,
|
||||
bool ignore_schema_evolution);
|
||||
|
||||
private:
|
||||
size_t getVersion() const { return metadata_version; }
|
||||
|
||||
|
@ -84,7 +84,8 @@ StorageObjectStorage::StorageObjectStorage(
|
||||
std::optional<FormatSettings> format_settings_,
|
||||
LoadingStrictnessLevel mode,
|
||||
bool distributed_processing_,
|
||||
ASTPtr partition_by_)
|
||||
ASTPtr partition_by_,
|
||||
bool lazy_init)
|
||||
: IStorage(table_id_)
|
||||
, configuration(configuration_)
|
||||
, object_storage(object_storage_)
|
||||
@ -95,6 +96,7 @@ StorageObjectStorage::StorageObjectStorage(
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!lazy_init)
|
||||
configuration->update(object_storage, context);
|
||||
}
|
||||
catch (...)
|
||||
|
@ -59,7 +59,8 @@ public:
|
||||
std::optional<FormatSettings> format_settings_,
|
||||
LoadingStrictnessLevel mode,
|
||||
bool distributed_processing_ = false,
|
||||
ASTPtr partition_by_ = nullptr);
|
||||
ASTPtr partition_by_ = nullptr,
|
||||
bool lazy_init = false);
|
||||
|
||||
String getName() const override;
|
||||
|
||||
|
@ -226,6 +226,26 @@ template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConf
|
||||
#endif
|
||||
template class TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;
|
||||
|
||||
#if USE_AVRO && USE_AWS_S3
|
||||
template class TableFunctionObjectStorage<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
|
||||
#endif
|
||||
|
||||
#if USE_AVRO && USE_AZURE_BLOB_STORAGE
|
||||
template class TableFunctionObjectStorage<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
|
||||
#endif
|
||||
|
||||
#if USE_AVRO && USE_HDFS
|
||||
template class TableFunctionObjectStorage<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
|
||||
#endif
|
||||
|
||||
#if USE_PARQUET && USE_AWS_S3
|
||||
template class TableFunctionObjectStorage<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
|
||||
#endif
|
||||
|
||||
#if USE_AWS_S3
|
||||
template class TableFunctionObjectStorage<HudiClusterDefinition, StorageS3HudiConfiguration>;
|
||||
#endif
|
||||
|
||||
#if USE_AVRO
|
||||
void registerTableFunctionIceberg(TableFunctionFactory & factory)
|
||||
{
|
||||
|
@ -96,7 +96,7 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
|
||||
{
|
||||
.documentation = {
|
||||
.description=R"(The table function can be used to read the data stored on HDFS in parallel for many nodes in a specified cluster.)",
|
||||
.examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster_name, uri, format)", ""}}},
|
||||
.examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster, uri, format)", ""}}},
|
||||
.allow_readonly = false
|
||||
}
|
||||
);
|
||||
@ -105,15 +105,77 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
|
||||
UNUSED(factory);
|
||||
}
|
||||
|
||||
|
||||
#if USE_AVRO
|
||||
void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
|
||||
{
|
||||
UNUSED(factory);
|
||||
|
||||
#if USE_AWS_S3
|
||||
template class TableFunctionObjectStorageCluster<S3ClusterDefinition, StorageS3Configuration>;
|
||||
factory.registerFunction<TableFunctionIcebergS3Cluster>(
|
||||
{.documentation
|
||||
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)",
|
||||
.examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])", ""}},
|
||||
.categories{"DataLake"}},
|
||||
.allow_readonly = false});
|
||||
#endif
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
template class TableFunctionObjectStorageCluster<AzureClusterDefinition, StorageAzureConfiguration>;
|
||||
factory.registerFunction<TableFunctionIcebergAzureCluster>(
|
||||
{.documentation
|
||||
= {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster.)",
|
||||
.examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])", ""}},
|
||||
.categories{"DataLake"}},
|
||||
.allow_readonly = false});
|
||||
#endif
|
||||
|
||||
#if USE_HDFS
|
||||
template class TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
|
||||
factory.registerFunction<TableFunctionIcebergHDFSCluster>(
|
||||
{.documentation
|
||||
= {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster.)",
|
||||
.examples{{"icebergHDFSCluster", "SELECT * FROM icebergHDFSCluster(cluster, uri, [format], [structure], [compression_method])", ""}},
|
||||
.categories{"DataLake"}},
|
||||
.allow_readonly = false});
|
||||
#endif
|
||||
}
|
||||
#endif
|
||||
|
||||
#if USE_AWS_S3
|
||||
#if USE_PARQUET
|
||||
void registerTableFunctionDeltaLakeCluster(TableFunctionFactory & factory)
|
||||
{
|
||||
factory.registerFunction<TableFunctionDeltaLakeCluster>(
|
||||
{.documentation
|
||||
= {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)",
|
||||
.examples{{"deltaLakeCluster", "SELECT * FROM deltaLakeCluster(cluster, url, access_key_id, secret_access_key)", ""}},
|
||||
.categories{"DataLake"}},
|
||||
.allow_readonly = false});
|
||||
}
|
||||
#endif
|
||||
|
||||
void registerTableFunctionHudiCluster(TableFunctionFactory & factory)
|
||||
{
|
||||
factory.registerFunction<TableFunctionHudiCluster>(
|
||||
{.documentation
|
||||
= {.description = R"(The table function can be used to read the Hudi table stored on object store in parallel for many nodes in a specified cluster.)",
|
||||
.examples{{"hudiCluster", "SELECT * FROM hudiCluster(cluster, url, access_key_id, secret_access_key)", ""}},
|
||||
.categories{"DataLake"}},
|
||||
.allow_readonly = false});
|
||||
}
|
||||
#endif
|
||||
|
||||
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory)
|
||||
{
|
||||
UNUSED(factory);
|
||||
#if USE_AVRO
|
||||
registerTableFunctionIcebergCluster(factory);
|
||||
#endif
|
||||
#if USE_AWS_S3
|
||||
#if USE_PARQUET
|
||||
registerTableFunctionDeltaLakeCluster(factory);
|
||||
#endif
|
||||
registerTableFunctionHudiCluster(factory);
|
||||
#endif
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -33,6 +33,36 @@ struct HDFSClusterDefinition
|
||||
static constexpr auto storage_type_name = "HDFSCluster";
|
||||
};
|
||||
|
||||
struct IcebergS3ClusterDefinition
|
||||
{
|
||||
static constexpr auto name = "icebergS3Cluster";
|
||||
static constexpr auto storage_type_name = "IcebergS3Cluster";
|
||||
};
|
||||
|
||||
struct IcebergAzureClusterDefinition
|
||||
{
|
||||
static constexpr auto name = "icebergAzureCluster";
|
||||
static constexpr auto storage_type_name = "IcebergAzureCluster";
|
||||
};
|
||||
|
||||
struct IcebergHDFSClusterDefinition
|
||||
{
|
||||
static constexpr auto name = "icebergHDFSCluster";
|
||||
static constexpr auto storage_type_name = "IcebergHDFSCluster";
|
||||
};
|
||||
|
||||
struct DeltaLakeClusterDefinition
|
||||
{
|
||||
static constexpr auto name = "deltaLakeCluster";
|
||||
static constexpr auto storage_type_name = "DeltaLakeS3Cluster";
|
||||
};
|
||||
|
||||
struct HudiClusterDefinition
|
||||
{
|
||||
static constexpr auto name = "hudiCluster";
|
||||
static constexpr auto storage_type_name = "HudiS3Cluster";
|
||||
};
|
||||
|
||||
/**
|
||||
* Class implementing s3/hdfs/azureBlobStorageCluster(...) table functions,
|
||||
* which allow to process many files from S3/HDFS/Azure blob storage on a specific cluster.
|
||||
@ -79,4 +109,25 @@ using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster<AzureClu
|
||||
#if USE_HDFS
|
||||
using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
|
||||
#endif
|
||||
|
||||
#if USE_AVRO && USE_AWS_S3
|
||||
using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
|
||||
#endif
|
||||
|
||||
#if USE_AVRO && USE_AZURE_BLOB_STORAGE
|
||||
using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
|
||||
#endif
|
||||
|
||||
#if USE_AVRO && USE_HDFS
|
||||
using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
|
||||
#endif
|
||||
|
||||
#if USE_AWS_S3 && USE_PARQUET
|
||||
using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
|
||||
#endif
|
||||
|
||||
#if USE_AWS_S3
|
||||
using TableFunctionHudiCluster = TableFunctionObjectStorageCluster<HudiClusterDefinition, StorageS3HudiConfiguration>;
|
||||
#endif
|
||||
|
||||
}
|
||||
|
@ -66,6 +66,7 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]]
|
||||
registerTableFunctionObjectStorage(factory);
|
||||
registerTableFunctionObjectStorageCluster(factory);
|
||||
registerDataLakeTableFunctions(factory);
|
||||
registerDataLakeClusterTableFunctions(factory);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -70,6 +70,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
|
||||
void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
|
||||
void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
|
||||
void registerDataLakeTableFunctions(TableFunctionFactory & factory);
|
||||
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);
|
||||
|
||||
void registerTableFunctionTimeSeries(TableFunctionFactory & factory);
|
||||
|
||||
|
@ -299,8 +299,6 @@ class TagAttrs:
|
||||
|
||||
# Only one latest can exist
|
||||
latest: ClickHouseVersion
|
||||
# Only one can be a major one (the most fresh per a year)
|
||||
majors: Dict[int, ClickHouseVersion]
|
||||
# Only one lts version can exist
|
||||
lts: Optional[ClickHouseVersion]
|
||||
|
||||
@ -345,14 +343,6 @@ def ldf_tags(version: ClickHouseVersion, distro: str, tag_attrs: TagAttrs) -> st
|
||||
tags.append("lts")
|
||||
tags.append(f"lts-{distro}")
|
||||
|
||||
# If the tag `22`, `23`, `24` etc. should be included in the tags
|
||||
with_major = tag_attrs.majors.get(version.major) in (None, version)
|
||||
if with_major:
|
||||
tag_attrs.majors[version.major] = version
|
||||
if without_distro:
|
||||
tags.append(f"{version.major}")
|
||||
tags.append(f"{version.major}-{distro}")
|
||||
|
||||
# Add all normal tags
|
||||
for tag in (
|
||||
f"{version.major}.{version.minor}",
|
||||
@ -384,7 +374,7 @@ def generate_ldf(args: argparse.Namespace) -> None:
|
||||
args.directory / git_runner(f"git -C {args.directory} rev-parse --show-cdup")
|
||||
).absolute()
|
||||
lines = ldf_header(git, directory)
|
||||
tag_attrs = TagAttrs(versions[-1], {}, None)
|
||||
tag_attrs = TagAttrs(versions[-1], None)
|
||||
|
||||
# We iterate from the most recent to the oldest version
|
||||
for version in reversed(versions):
|
||||
|
@ -0,0 +1,59 @@
|
||||
services:
|
||||
spark-iceberg:
|
||||
image: tabulario/spark-iceberg
|
||||
container_name: spark-iceberg
|
||||
build: spark/
|
||||
depends_on:
|
||||
- rest
|
||||
- minio
|
||||
environment:
|
||||
- AWS_ACCESS_KEY_ID=admin
|
||||
- AWS_SECRET_ACCESS_KEY=password
|
||||
- AWS_REGION=us-east-1
|
||||
ports:
|
||||
- 8080:8080
|
||||
- 10000:10000
|
||||
- 10001:10001
|
||||
rest:
|
||||
image: tabulario/iceberg-rest
|
||||
ports:
|
||||
- 8182:8181
|
||||
environment:
|
||||
- AWS_ACCESS_KEY_ID=minio
|
||||
- AWS_SECRET_ACCESS_KEY=minio123
|
||||
- AWS_REGION=us-east-1
|
||||
- CATALOG_WAREHOUSE=s3://iceberg_data/
|
||||
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
|
||||
- CATALOG_S3_ENDPOINT=http://minio:9000
|
||||
minio:
|
||||
image: minio/minio
|
||||
container_name: minio
|
||||
environment:
|
||||
- MINIO_ROOT_USER=minio
|
||||
- MINIO_ROOT_PASSWORD=minio123
|
||||
- MINIO_DOMAIN=minio
|
||||
networks:
|
||||
default:
|
||||
aliases:
|
||||
- warehouse.minio
|
||||
ports:
|
||||
- 9001:9001
|
||||
- 9002:9000
|
||||
command: ["server", "/data", "--console-address", ":9001"]
|
||||
mc:
|
||||
depends_on:
|
||||
- minio
|
||||
image: minio/mc
|
||||
container_name: mc
|
||||
environment:
|
||||
- AWS_ACCESS_KEY_ID=minio
|
||||
- AWS_SECRET_ACCESS_KEY=minio123
|
||||
- AWS_REGION=us-east-1
|
||||
entrypoint: >
|
||||
/bin/sh -c "
|
||||
until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done;
|
||||
/usr/bin/mc rm -r --force minio/warehouse;
|
||||
/usr/bin/mc mb minio/warehouse --ignore-existing;
|
||||
/usr/bin/mc policy set public minio/warehouse;
|
||||
tail -f /dev/null
|
||||
"
|
@ -14,6 +14,10 @@ services:
|
||||
depends_on:
|
||||
- proxy1
|
||||
- proxy2
|
||||
networks:
|
||||
default:
|
||||
aliases:
|
||||
- warehouse.minio
|
||||
|
||||
# HTTP proxies for Minio.
|
||||
proxy1:
|
||||
|
@ -568,6 +568,7 @@ class ClickHouseCluster:
|
||||
self.resolver_logs_dir = os.path.join(self.instances_dir, "resolver")
|
||||
|
||||
self.spark_session = None
|
||||
self.with_iceberg_catalog = False
|
||||
|
||||
self.with_azurite = False
|
||||
self.azurite_container = "azurite-container"
|
||||
@ -1464,6 +1465,26 @@ class ClickHouseCluster:
|
||||
)
|
||||
return self.base_minio_cmd
|
||||
|
||||
def setup_iceberg_catalog_cmd(
|
||||
self, instance, env_variables, docker_compose_yml_dir
|
||||
):
|
||||
self.base_cmd.extend(
|
||||
[
|
||||
"--file",
|
||||
p.join(
|
||||
docker_compose_yml_dir, "docker_compose_iceberg_rest_catalog.yml"
|
||||
),
|
||||
]
|
||||
)
|
||||
self.base_iceberg_catalog_cmd = self.compose_cmd(
|
||||
"--env-file",
|
||||
instance.env_file,
|
||||
"--file",
|
||||
p.join(docker_compose_yml_dir, "docker_compose_iceberg_rest_catalog.yml"),
|
||||
)
|
||||
return self.base_iceberg_catalog_cmd
|
||||
# return self.base_minio_cmd
|
||||
|
||||
def setup_azurite_cmd(self, instance, env_variables, docker_compose_yml_dir):
|
||||
self.with_azurite = True
|
||||
env_variables["AZURITE_PORT"] = str(self.azurite_port)
|
||||
@ -1630,6 +1651,7 @@ class ClickHouseCluster:
|
||||
with_hive=False,
|
||||
with_coredns=False,
|
||||
with_prometheus=False,
|
||||
with_iceberg_catalog=False,
|
||||
handle_prometheus_remote_write=False,
|
||||
handle_prometheus_remote_read=False,
|
||||
use_old_analyzer=None,
|
||||
@ -1733,6 +1755,7 @@ class ClickHouseCluster:
|
||||
with_coredns=with_coredns,
|
||||
with_cassandra=with_cassandra,
|
||||
with_ldap=with_ldap,
|
||||
with_iceberg_catalog=with_iceberg_catalog,
|
||||
use_old_analyzer=use_old_analyzer,
|
||||
server_bin_path=self.server_bin_path,
|
||||
odbc_bridge_bin_path=self.odbc_bridge_bin_path,
|
||||
@ -1922,6 +1945,13 @@ class ClickHouseCluster:
|
||||
self.setup_minio_cmd(instance, env_variables, docker_compose_yml_dir)
|
||||
)
|
||||
|
||||
if with_iceberg_catalog and not self.with_iceberg_catalog:
|
||||
cmds.append(
|
||||
self.setup_iceberg_catalog_cmd(
|
||||
instance, env_variables, docker_compose_yml_dir
|
||||
)
|
||||
)
|
||||
|
||||
if with_azurite and not self.with_azurite:
|
||||
cmds.append(
|
||||
self.setup_azurite_cmd(instance, env_variables, docker_compose_yml_dir)
|
||||
@ -3390,6 +3420,7 @@ class ClickHouseInstance:
|
||||
with_coredns,
|
||||
with_cassandra,
|
||||
with_ldap,
|
||||
with_iceberg_catalog,
|
||||
use_old_analyzer,
|
||||
server_bin_path,
|
||||
odbc_bridge_bin_path,
|
||||
|
0
tests/integration/test_database_iceberg/__init__.py
Normal file
0
tests/integration/test_database_iceberg/__init__.py
Normal file
306
tests/integration/test_database_iceberg/test.py
Normal file
306
tests/integration/test_database_iceberg/test.py
Normal file
@ -0,0 +1,306 @@
|
||||
import glob
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
import uuid
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import pyarrow as pa
|
||||
import pytest
|
||||
import requests
|
||||
import urllib3
|
||||
from minio import Minio
|
||||
from pyiceberg.catalog import load_catalog
|
||||
from pyiceberg.partitioning import PartitionField, PartitionSpec
|
||||
from pyiceberg.schema import Schema
|
||||
from pyiceberg.table.sorting import SortField, SortOrder
|
||||
from pyiceberg.transforms import DayTransform, IdentityTransform
|
||||
from pyiceberg.types import (
|
||||
DoubleType,
|
||||
FloatType,
|
||||
NestedField,
|
||||
StringType,
|
||||
StructType,
|
||||
TimestampType,
|
||||
)
|
||||
|
||||
from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
|
||||
from helpers.s3_tools import get_file_contents, list_s3_objects, prepare_s3_bucket
|
||||
from helpers.test_tools import TSV, csv_compare
|
||||
|
||||
BASE_URL = "http://rest:8181/v1"
|
||||
BASE_URL_LOCAL = "http://localhost:8182/v1"
|
||||
BASE_URL_LOCAL_RAW = "http://localhost:8182"
|
||||
|
||||
CATALOG_NAME = "demo"
|
||||
|
||||
DEFAULT_SCHEMA = Schema(
|
||||
NestedField(
|
||||
field_id=1, name="datetime", field_type=TimestampType(), required=False
|
||||
),
|
||||
NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
|
||||
NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False),
|
||||
NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
|
||||
NestedField(
|
||||
field_id=5,
|
||||
name="details",
|
||||
field_type=StructType(
|
||||
NestedField(
|
||||
field_id=4,
|
||||
name="created_by",
|
||||
field_type=StringType(),
|
||||
required=False,
|
||||
),
|
||||
),
|
||||
required=False,
|
||||
),
|
||||
)
|
||||
|
||||
DEFAULT_CREATE_TABLE = "CREATE TABLE {}.`{}.{}`\\n(\\n `datetime` Nullable(DateTime64(6)),\\n `symbol` Nullable(String),\\n `bid` Nullable(Float64),\\n `ask` Nullable(Float64),\\n `details` Tuple(created_by Nullable(String))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n"
|
||||
|
||||
DEFAULT_PARTITION_SPEC = PartitionSpec(
|
||||
PartitionField(
|
||||
source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
|
||||
)
|
||||
)
|
||||
|
||||
DEFAULT_SORT_ORDER = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
|
||||
|
||||
|
||||
def list_namespaces():
|
||||
response = requests.get(f"{BASE_URL_LOCAL}/namespaces")
|
||||
if response.status_code == 200:
|
||||
return response.json()
|
||||
else:
|
||||
raise Exception(f"Failed to list namespaces: {response.status_code}")
|
||||
|
||||
|
||||
def load_catalog_impl(started_cluster):
|
||||
return load_catalog(
|
||||
CATALOG_NAME,
|
||||
**{
|
||||
"uri": BASE_URL_LOCAL_RAW,
|
||||
"type": "rest",
|
||||
"s3.endpoint": f"http://localhost:9002",
|
||||
"s3.access-key-id": "minio",
|
||||
"s3.secret-access-key": "minio123",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def create_table(
|
||||
catalog,
|
||||
namespace,
|
||||
table,
|
||||
schema=DEFAULT_SCHEMA,
|
||||
partition_spec=DEFAULT_PARTITION_SPEC,
|
||||
sort_order=DEFAULT_SORT_ORDER,
|
||||
):
|
||||
return catalog.create_table(
|
||||
identifier=f"{namespace}.{table}",
|
||||
schema=schema,
|
||||
location=f"s3://warehouse/data",
|
||||
partition_spec=partition_spec,
|
||||
sort_order=sort_order,
|
||||
)
|
||||
|
||||
|
||||
def generate_record():
|
||||
return {
|
||||
"datetime": datetime.now(),
|
||||
"symbol": str("kek"),
|
||||
"bid": round(random.uniform(100, 200), 2),
|
||||
"ask": round(random.uniform(200, 300), 2),
|
||||
"details": {"created_by": "Alice Smith"},
|
||||
}
|
||||
|
||||
|
||||
def create_clickhouse_iceberg_database(started_cluster, node, name):
|
||||
node.query(
|
||||
f"""
|
||||
DROP DATABASE IF EXISTS {name};
|
||||
CREATE DATABASE {name} ENGINE = Iceberg('{BASE_URL}', 'minio', 'minio123')
|
||||
SETTINGS catalog_type = 'rest', storage_endpoint = 'http://minio:9000/'
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def print_objects():
|
||||
minio_client = Minio(
|
||||
f"localhost:9002",
|
||||
access_key="minio",
|
||||
secret_key="minio123",
|
||||
secure=False,
|
||||
http_client=urllib3.PoolManager(cert_reqs="CERT_NONE"),
|
||||
)
|
||||
|
||||
objects = list(minio_client.list_objects("warehouse", "", recursive=True))
|
||||
names = [x.object_name for x in objects]
|
||||
names.sort()
|
||||
for name in names:
|
||||
print(f"Found object: {name}")
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
cluster.add_instance(
|
||||
"node1",
|
||||
main_configs=[],
|
||||
user_configs=[],
|
||||
stay_alive=True,
|
||||
with_iceberg_catalog=True,
|
||||
)
|
||||
|
||||
logging.info("Starting cluster...")
|
||||
cluster.start()
|
||||
|
||||
# TODO: properly wait for container
|
||||
time.sleep(10)
|
||||
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_list_tables(started_cluster):
|
||||
node = started_cluster.instances["node1"]
|
||||
|
||||
root_namespace = f"clickhouse_{uuid.uuid4()}"
|
||||
namespace_1 = f"{root_namespace}.testA.A"
|
||||
namespace_2 = f"{root_namespace}.testB.B"
|
||||
namespace_1_tables = ["tableA", "tableB"]
|
||||
namespace_2_tables = ["tableC", "tableD"]
|
||||
|
||||
catalog = load_catalog_impl(started_cluster)
|
||||
|
||||
for namespace in [namespace_1, namespace_2]:
|
||||
catalog.create_namespace(namespace)
|
||||
|
||||
found = False
|
||||
for namespace_list in list_namespaces()["namespaces"]:
|
||||
if root_namespace == namespace_list[0]:
|
||||
found = True
|
||||
break
|
||||
assert found
|
||||
|
||||
found = False
|
||||
for namespace_list in catalog.list_namespaces():
|
||||
if root_namespace == namespace_list[0]:
|
||||
found = True
|
||||
break
|
||||
assert found
|
||||
|
||||
for namespace in [namespace_1, namespace_2]:
|
||||
assert len(catalog.list_tables(namespace)) == 0
|
||||
|
||||
create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
|
||||
|
||||
tables_list = ""
|
||||
for table in namespace_1_tables:
|
||||
create_table(catalog, namespace_1, table)
|
||||
if len(tables_list) > 0:
|
||||
tables_list += "\n"
|
||||
tables_list += f"{namespace_1}.{table}"
|
||||
|
||||
for table in namespace_2_tables:
|
||||
create_table(catalog, namespace_2, table)
|
||||
if len(tables_list) > 0:
|
||||
tables_list += "\n"
|
||||
tables_list += f"{namespace_2}.{table}"
|
||||
|
||||
assert (
|
||||
tables_list
|
||||
== node.query(
|
||||
f"SELECT name FROM system.tables WHERE database = '{CATALOG_NAME}' and name ILIKE '{root_namespace}%' ORDER BY name"
|
||||
).strip()
|
||||
)
|
||||
node.restart_clickhouse()
|
||||
assert (
|
||||
tables_list
|
||||
== node.query(
|
||||
f"SELECT name FROM system.tables WHERE database = '{CATALOG_NAME}' and name ILIKE '{root_namespace}%' ORDER BY name"
|
||||
).strip()
|
||||
)
|
||||
|
||||
expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace_2, "tableC")
|
||||
assert expected == node.query(
|
||||
f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace_2}.tableC`"
|
||||
)
|
||||
|
||||
|
||||
def test_many_namespaces(started_cluster):
|
||||
node = started_cluster.instances["node1"]
|
||||
root_namespace_1 = f"A_{uuid.uuid4()}"
|
||||
root_namespace_2 = f"B_{uuid.uuid4()}"
|
||||
namespaces = [
|
||||
f"{root_namespace_1}",
|
||||
f"{root_namespace_1}.B.C",
|
||||
f"{root_namespace_1}.B.C.D",
|
||||
f"{root_namespace_1}.B.C.D.E",
|
||||
f"{root_namespace_2}",
|
||||
f"{root_namespace_2}.C",
|
||||
f"{root_namespace_2}.CC",
|
||||
]
|
||||
tables = ["A", "B", "C"]
|
||||
catalog = load_catalog_impl(started_cluster)
|
||||
|
||||
for namespace in namespaces:
|
||||
catalog.create_namespace(namespace)
|
||||
for table in tables:
|
||||
create_table(catalog, namespace, table)
|
||||
|
||||
create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
|
||||
|
||||
for namespace in namespaces:
|
||||
for table in tables:
|
||||
table_name = f"{namespace}.{table}"
|
||||
assert int(
|
||||
node.query(
|
||||
f"SELECT count() FROM system.tables WHERE database = '{CATALOG_NAME}' and name = '{table_name}'"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def test_select(started_cluster):
|
||||
node = started_cluster.instances["node1"]
|
||||
|
||||
test_ref = f"test_list_tables_{uuid.uuid4()}"
|
||||
table_name = f"{test_ref}_table"
|
||||
root_namespace = f"{test_ref}_namespace"
|
||||
|
||||
namespace = f"{root_namespace}.A.B.C"
|
||||
namespaces_to_create = [
|
||||
root_namespace,
|
||||
f"{root_namespace}.A",
|
||||
f"{root_namespace}.A.B",
|
||||
f"{root_namespace}.A.B.C",
|
||||
]
|
||||
|
||||
catalog = load_catalog_impl(started_cluster)
|
||||
|
||||
for namespace in namespaces_to_create:
|
||||
catalog.create_namespace(namespace)
|
||||
assert len(catalog.list_tables(namespace)) == 0
|
||||
|
||||
table = create_table(catalog, namespace, table_name)
|
||||
|
||||
num_rows = 10
|
||||
data = [generate_record() for _ in range(num_rows)]
|
||||
df = pa.Table.from_pylist(data)
|
||||
table.append(df)
|
||||
|
||||
create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
|
||||
|
||||
expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace, table_name)
|
||||
assert expected == node.query(
|
||||
f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace}.{table_name}`"
|
||||
)
|
||||
|
||||
assert num_rows == int(
|
||||
node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace}.{table_name}`")
|
||||
)
|
@ -378,7 +378,7 @@ def test_reload_via_client(cluster, zk):
|
||||
configure_from_zk(zk)
|
||||
break
|
||||
except QueryRuntimeException:
|
||||
logging.exception("The new socket is not binded yet")
|
||||
logging.exception("The new socket is not bound yet")
|
||||
time.sleep(0.1)
|
||||
|
||||
if exception:
|
||||
|
@ -0,0 +1,20 @@
|
||||
<clickhouse>
|
||||
<remote_servers>
|
||||
<cluster_simple>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>node1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<host>node2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<host>node3</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</cluster_simple>
|
||||
</remote_servers>
|
||||
</clickhouse>
|
@ -0,0 +1,6 @@
|
||||
<clickhouse>
|
||||
<query_log>
|
||||
<database>system</database>
|
||||
<table>query_log</table>
|
||||
</query_log>
|
||||
</clickhouse>
|
@ -73,14 +73,38 @@ def started_cluster():
|
||||
cluster.add_instance(
|
||||
"node1",
|
||||
main_configs=[
|
||||
"configs/config.d/query_log.xml",
|
||||
"configs/config.d/cluster.xml",
|
||||
"configs/config.d/named_collections.xml",
|
||||
"configs/config.d/filesystem_caches.xml",
|
||||
],
|
||||
user_configs=["configs/users.d/users.xml"],
|
||||
with_minio=True,
|
||||
with_azurite=True,
|
||||
stay_alive=True,
|
||||
with_hdfs=with_hdfs,
|
||||
stay_alive=True,
|
||||
)
|
||||
cluster.add_instance(
|
||||
"node2",
|
||||
main_configs=[
|
||||
"configs/config.d/query_log.xml",
|
||||
"configs/config.d/cluster.xml",
|
||||
"configs/config.d/named_collections.xml",
|
||||
"configs/config.d/filesystem_caches.xml",
|
||||
],
|
||||
user_configs=["configs/users.d/users.xml"],
|
||||
stay_alive=True,
|
||||
)
|
||||
cluster.add_instance(
|
||||
"node3",
|
||||
main_configs=[
|
||||
"configs/config.d/query_log.xml",
|
||||
"configs/config.d/cluster.xml",
|
||||
"configs/config.d/named_collections.xml",
|
||||
"configs/config.d/filesystem_caches.xml",
|
||||
],
|
||||
user_configs=["configs/users.d/users.xml"],
|
||||
stay_alive=True,
|
||||
)
|
||||
|
||||
logging.info("Starting cluster...")
|
||||
@ -182,6 +206,7 @@ def get_creation_expression(
|
||||
cluster,
|
||||
format="Parquet",
|
||||
table_function=False,
|
||||
run_on_cluster=False,
|
||||
**kwargs,
|
||||
):
|
||||
if storage_type == "s3":
|
||||
@ -189,7 +214,11 @@ def get_creation_expression(
|
||||
bucket = kwargs["bucket"]
|
||||
else:
|
||||
bucket = cluster.minio_bucket
|
||||
print(bucket)
|
||||
|
||||
if run_on_cluster:
|
||||
assert table_function
|
||||
return f"icebergS3Cluster('cluster_simple', s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
|
||||
else:
|
||||
if table_function:
|
||||
return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
|
||||
else:
|
||||
@ -197,7 +226,14 @@ def get_creation_expression(
|
||||
DROP TABLE IF EXISTS {table_name};
|
||||
CREATE TABLE {table_name}
|
||||
ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
|
||||
|
||||
elif storage_type == "azure":
|
||||
if run_on_cluster:
|
||||
assert table_function
|
||||
return f"""
|
||||
icebergAzureCluster('cluster_simple', azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
|
||||
"""
|
||||
else:
|
||||
if table_function:
|
||||
return f"""
|
||||
icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
|
||||
@ -207,7 +243,14 @@ def get_creation_expression(
|
||||
DROP TABLE IF EXISTS {table_name};
|
||||
CREATE TABLE {table_name}
|
||||
ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
|
||||
|
||||
elif storage_type == "hdfs":
|
||||
if run_on_cluster:
|
||||
assert table_function
|
||||
return f"""
|
||||
icebergHDFSCluster('cluster_simple', hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
|
||||
"""
|
||||
else:
|
||||
if table_function:
|
||||
return f"""
|
||||
icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
|
||||
@ -217,7 +260,10 @@ def get_creation_expression(
|
||||
DROP TABLE IF EXISTS {table_name};
|
||||
CREATE TABLE {table_name}
|
||||
ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
|
||||
|
||||
elif storage_type == "local":
|
||||
assert not run_on_cluster
|
||||
|
||||
if table_function:
|
||||
return f"""
|
||||
icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})
|
||||
@ -227,6 +273,7 @@ def get_creation_expression(
|
||||
DROP TABLE IF EXISTS {table_name};
|
||||
CREATE TABLE {table_name}
|
||||
ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});"""
|
||||
|
||||
else:
|
||||
raise Exception(f"Unknown iceberg storage type: {storage_type}")
|
||||
|
||||
@ -492,6 +539,108 @@ def test_types(started_cluster, format_version, storage_type):
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("format_version", ["1", "2"])
|
||||
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs"])
|
||||
def test_cluster_table_function(started_cluster, format_version, storage_type):
|
||||
if is_arm() and storage_type == "hdfs":
|
||||
pytest.skip("Disabled test IcebergHDFS for aarch64")
|
||||
|
||||
instance = started_cluster.instances["node1"]
|
||||
spark = started_cluster.spark_session
|
||||
|
||||
TABLE_NAME = (
|
||||
"test_iceberg_cluster_"
|
||||
+ format_version
|
||||
+ "_"
|
||||
+ storage_type
|
||||
+ "_"
|
||||
+ get_uuid_str()
|
||||
)
|
||||
|
||||
def add_df(mode):
|
||||
write_iceberg_from_df(
|
||||
spark,
|
||||
generate_data(spark, 0, 100),
|
||||
TABLE_NAME,
|
||||
mode=mode,
|
||||
format_version=format_version,
|
||||
)
|
||||
|
||||
files = default_upload_directory(
|
||||
started_cluster,
|
||||
storage_type,
|
||||
f"/iceberg_data/default/{TABLE_NAME}/",
|
||||
f"/iceberg_data/default/{TABLE_NAME}/",
|
||||
)
|
||||
|
||||
logging.info(f"Adding another dataframe. result files: {files}")
|
||||
|
||||
return files
|
||||
|
||||
files = add_df(mode="overwrite")
|
||||
for i in range(1, len(started_cluster.instances)):
|
||||
files = add_df(mode="append")
|
||||
|
||||
logging.info(f"Setup complete. files: {files}")
|
||||
assert len(files) == 5 + 4 * (len(started_cluster.instances) - 1)
|
||||
|
||||
clusters = instance.query(f"SELECT * FROM system.clusters")
|
||||
logging.info(f"Clusters setup: {clusters}")
|
||||
|
||||
# Regular Query only node1
|
||||
table_function_expr = get_creation_expression(
|
||||
storage_type, TABLE_NAME, started_cluster, table_function=True
|
||||
)
|
||||
select_regular = (
|
||||
instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
|
||||
)
|
||||
|
||||
# Cluster Query with node1 as coordinator
|
||||
table_function_expr_cluster = get_creation_expression(
|
||||
storage_type,
|
||||
TABLE_NAME,
|
||||
started_cluster,
|
||||
table_function=True,
|
||||
run_on_cluster=True,
|
||||
)
|
||||
select_cluster = (
|
||||
instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split()
|
||||
)
|
||||
|
||||
# Simple size check
|
||||
assert len(select_regular) == 600
|
||||
assert len(select_cluster) == 600
|
||||
|
||||
# Actual check
|
||||
assert select_cluster == select_regular
|
||||
|
||||
# Check query_log
|
||||
for replica in started_cluster.instances.values():
|
||||
replica.query("SYSTEM FLUSH LOGS")
|
||||
|
||||
for node_name, replica in started_cluster.instances.items():
|
||||
cluster_secondary_queries = (
|
||||
replica.query(
|
||||
f"""
|
||||
SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
|
||||
WHERE
|
||||
type = 'QueryStart' AND
|
||||
positionCaseInsensitive(query, '{storage_type}Cluster') != 0 AND
|
||||
position(query, '{TABLE_NAME}') != 0 AND
|
||||
position(query, 'system.query_log') = 0 AND
|
||||
NOT is_initial_query
|
||||
"""
|
||||
)
|
||||
.strip()
|
||||
.split("\n")
|
||||
)
|
||||
|
||||
logging.info(
|
||||
f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}"
|
||||
)
|
||||
assert len(cluster_secondary_queries) == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize("format_version", ["1", "2"])
|
||||
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
|
||||
def test_delete_files(started_cluster, format_version, storage_type):
|
||||
|
11
tests/queries/0_stateless/03250_ephemeral_comment.sql
Normal file
11
tests/queries/0_stateless/03250_ephemeral_comment.sql
Normal file
@ -0,0 +1,11 @@
|
||||
drop table if exists test;
|
||||
CREATE TABLE test (
|
||||
`start_s` UInt32 EPHEMERAL COMMENT 'start UNIX time' ,
|
||||
`start_us` UInt16 EPHEMERAL COMMENT 'start microseconds',
|
||||
`finish_s` UInt32 EPHEMERAL COMMENT 'finish UNIX time',
|
||||
`finish_us` UInt16 EPHEMERAL COMMENT 'finish microseconds',
|
||||
`captured` DateTime MATERIALIZED fromUnixTimestamp(start_s),
|
||||
`duration` Decimal32(6) MATERIALIZED finish_s - start_s + (finish_us - start_us)/1000000
|
||||
)
|
||||
ENGINE Null;
|
||||
drop table if exists test;
|
@ -244,7 +244,10 @@ Deduplication
|
||||
DefaultTableEngine
|
||||
DelayedInserts
|
||||
DeliveryTag
|
||||
Deltalake
|
||||
DeltaLake
|
||||
deltalakeCluster
|
||||
deltaLakeCluster
|
||||
Denormalize
|
||||
DestroyAggregatesThreads
|
||||
DestroyAggregatesThreadsActive
|
||||
@ -377,10 +380,15 @@ Homebrew's
|
||||
HorizontalDivide
|
||||
Hostname
|
||||
HouseOps
|
||||
hudi
|
||||
Hudi
|
||||
hudiCluster
|
||||
HudiCluster
|
||||
HyperLogLog
|
||||
Hypot
|
||||
IANA
|
||||
icebergCluster
|
||||
IcebergCluster
|
||||
IDE
|
||||
IDEs
|
||||
IDNA
|
||||
|
Loading…
Reference in New Issue
Block a user