From 54129115924e9fe8d61f48d61737c723c9ebe92f Mon Sep 17 00:00:00 2001 From: avogar Date: Fri, 28 Oct 2022 17:43:07 +0000 Subject: [PATCH 01/24] Add documentation for 'format' table function --- .../sql-reference/table-functions/format.md | 75 +++++++++++++++++++ .../sql-reference/table-functions/format.md | 1 + .../sql-reference/table-functions/format,md | 1 + 3 files changed, 77 insertions(+) create mode 100644 docs/en/sql-reference/table-functions/format.md create mode 120000 docs/ru/sql-reference/table-functions/format.md create mode 120000 docs/zh/sql-reference/table-functions/format,md diff --git a/docs/en/sql-reference/table-functions/format.md b/docs/en/sql-reference/table-functions/format.md new file mode 100644 index 00000000000..bc0ada9795b --- /dev/null +++ b/docs/en/sql-reference/table-functions/format.md @@ -0,0 +1,75 @@ +--- +slug: /en/sql-reference/table-functions/format +sidebar_position: 56 +sidebar_label: format +--- + +# format + +Extracts table structure from data and parse it according to specified input format. + +**Syntax** + +``` sql +format(format_name, data) +``` + +**Parameters** + +- `format_name` — The [format](../../interfaces/formats.md#formats) of the data. +- `data` — String literal containing data in specified format + +**Returned value** + +A table with data parsed from `data` argument according specified format and extracted schema. + +**Examples** + +**Query:** +``` sql +:) select * from format(JSONEachRow, +$$ +{"a": "Hello", "b": 111} +{"a": "World", "b": 123} +{"a": "Hello", "b": 112} +{"a": "World", "b": 124} +$$) +``` + +**Result:** + +```text +┌───b─┬─a─────┐ +│ 111 │ Hello │ +│ 123 │ World │ +│ 112 │ Hello │ +│ 124 │ World │ +└─────┴───────┘ +``` + +**Query:** +```sql + +:) desc format(JSONEachRow, +$$ +{"a": "Hello", "b": 111} +{"a": "World", "b": 123} +{"a": "Hello", "b": 112} +{"a": "World", "b": 124} +$$) +``` + +**Result:** + +```text +┌─name─┬─type──────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐ +│ b │ Nullable(Float64) │ │ │ │ │ │ +│ a │ Nullable(String) │ │ │ │ │ │ +└──────┴───────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘ +``` + +**See Also** + +- [Formats](../../interfaces/formats.md) + +[Original article](https://clickhouse.com/docs/en/sql-reference/table-functions/format) diff --git a/docs/ru/sql-reference/table-functions/format.md b/docs/ru/sql-reference/table-functions/format.md new file mode 120000 index 00000000000..cc5e3a5a142 --- /dev/null +++ b/docs/ru/sql-reference/table-functions/format.md @@ -0,0 +1 @@ +../../../en/sql-reference/table-functions/format.md \ No newline at end of file diff --git a/docs/zh/sql-reference/table-functions/format,md b/docs/zh/sql-reference/table-functions/format,md new file mode 120000 index 00000000000..cc5e3a5a142 --- /dev/null +++ b/docs/zh/sql-reference/table-functions/format,md @@ -0,0 +1 @@ +../../../en/sql-reference/table-functions/format.md \ No newline at end of file From c7031569b66671a1f8eeda5b3a78c0cb7de5d651 Mon Sep 17 00:00:00 2001 From: avogar Date: Fri, 28 Oct 2022 21:58:34 +0000 Subject: [PATCH 02/24] Fix typo --- docs/zh/sql-reference/table-functions/{format,md => format.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename docs/zh/sql-reference/table-functions/{format,md => format.md} (100%) diff --git a/docs/zh/sql-reference/table-functions/format,md b/docs/zh/sql-reference/table-functions/format.md similarity index 100% rename from docs/zh/sql-reference/table-functions/format,md rename to docs/zh/sql-reference/table-functions/format.md From 800e789ac44292cd59823fd0a736fc6110b27d11 Mon Sep 17 00:00:00 2001 From: Alexander Gololobov <440544+davenger@users.noreply.github.com> Date: Sat, 29 Oct 2022 09:26:16 +0200 Subject: [PATCH 03/24] Small fix --- docs/en/sql-reference/table-functions/format.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/sql-reference/table-functions/format.md b/docs/en/sql-reference/table-functions/format.md index bc0ada9795b..da287c77283 100644 --- a/docs/en/sql-reference/table-functions/format.md +++ b/docs/en/sql-reference/table-functions/format.md @@ -6,7 +6,7 @@ sidebar_label: format # format -Extracts table structure from data and parse it according to specified input format. +Extracts table structure from data and parses it according to specified input format. **Syntax** From 95e64310a611e4c01ca6376bc8ab8c9480268a8f Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Tue, 1 Nov 2022 14:06:45 +0100 Subject: [PATCH 04/24] Update format.md --- docs/en/sql-reference/table-functions/format.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/sql-reference/table-functions/format.md b/docs/en/sql-reference/table-functions/format.md index da287c77283..4d1488ea640 100644 --- a/docs/en/sql-reference/table-functions/format.md +++ b/docs/en/sql-reference/table-functions/format.md @@ -17,7 +17,7 @@ format(format_name, data) **Parameters** - `format_name` — The [format](../../interfaces/formats.md#formats) of the data. -- `data` — String literal containing data in specified format +- `data` — String literal or constant expression that returns a string containing data in specified format **Returned value** From 9d9bca3dbba9ce0d413ae9da85808eb31549fa13 Mon Sep 17 00:00:00 2001 From: avogar Date: Tue, 1 Nov 2022 14:08:34 +0000 Subject: [PATCH 05/24] Add embedded documentation --- src/TableFunctions/TableFunctionFormat.cpp | 67 +++++++++++++++++++++- 1 file changed, 65 insertions(+), 2 deletions(-) diff --git a/src/TableFunctions/TableFunctionFormat.cpp b/src/TableFunctions/TableFunctionFormat.cpp index 9f239adb538..15a82044dab 100644 --- a/src/TableFunctions/TableFunctionFormat.cpp +++ b/src/TableFunctions/TableFunctionFormat.cpp @@ -89,9 +89,72 @@ StoragePtr TableFunctionFormat::executeImpl(const ASTPtr & /*ast_function*/, Con return res; } +static const Documentation format_table_function_documentation = +{ + R"( +Extracts table structure from data and parses it according to specified input format. +Syntax: `format(format_name, data)`. +Parameters: + - `format_name` - the format of the data. + - `data ` - String literal or constant expression that returns a string containing data in specified format. +Returned value: A table with data parsed from `data` argument according specified format and extracted schema. +)", + Documentation::Examples + { + { + "First example", + R"( +Query: +``` +:) select * from format(JSONEachRow, +$$ +{"a": "Hello", "b": 111} +{"a": "World", "b": 123} +{"a": "Hello", "b": 112} +{"a": "World", "b": 124} +$$) +``` + +Result: +``` +┌───b─┬─a─────┐ +│ 111 │ Hello │ +│ 123 │ World │ +│ 112 │ Hello │ +│ 124 │ World │ +└─────┴───────┘ +``` +)" + }, + { + "Second example", + R"( +Query: +``` +:) desc format(JSONEachRow, +$$ +{"a": "Hello", "b": 111} +{"a": "World", "b": 123} +{"a": "Hello", "b": 112} +{"a": "World", "b": 124} +$$) +``` + +Result: +``` +┌─name─┬─type──────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐ +│ b │ Nullable(Float64) │ │ │ │ │ │ +│ a │ Nullable(String) │ │ │ │ │ │ +└──────┴───────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘ +``` +)" + }, + }, + Documentation::Categories{"format", "table-functions"} +}; + void registerTableFunctionFormat(TableFunctionFactory & factory) { - factory.registerFunction({}, TableFunctionFactory::CaseInsensitive); + factory.registerFunction(format_table_function_documentation, TableFunctionFactory::CaseInsensitive); } - } From ca708dd4b6d52e0f29f163c017f49b8d4761ce09 Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Wed, 2 Nov 2022 13:25:09 +0100 Subject: [PATCH 06/24] Fix test --- .../02414_all_new_table_functions_must_be_documented.reference | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/queries/0_stateless/02414_all_new_table_functions_must_be_documented.reference b/tests/queries/0_stateless/02414_all_new_table_functions_must_be_documented.reference index bb8c8c2228a..2277e19cf25 100644 --- a/tests/queries/0_stateless/02414_all_new_table_functions_must_be_documented.reference +++ b/tests/queries/0_stateless/02414_all_new_table_functions_must_be_documented.reference @@ -3,7 +3,6 @@ clusterAllReplicas dictionary executable file -format generateRandom input jdbc From 831abbaff94e3efadc5d816b999d304cc961a3f7 Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Mon, 14 Nov 2022 17:29:30 +0100 Subject: [PATCH 07/24] Fix build --- src/TableFunctions/TableFunctionFormat.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/TableFunctions/TableFunctionFormat.cpp b/src/TableFunctions/TableFunctionFormat.cpp index 15a82044dab..b15b350f00b 100644 --- a/src/TableFunctions/TableFunctionFormat.cpp +++ b/src/TableFunctions/TableFunctionFormat.cpp @@ -155,6 +155,6 @@ Result: void registerTableFunctionFormat(TableFunctionFactory & factory) { - factory.registerFunction(format_table_function_documentation, TableFunctionFactory::CaseInsensitive); + factory.registerFunction({format_table_function_documentation, false}, TableFunctionFactory::CaseInsensitive); } } From 19d39d7881fd2cbb6cafaa44c3f9b1837859108a Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Thu, 3 Nov 2022 19:10:22 +0100 Subject: [PATCH 08/24] Add SensitiveDataMasker to exceptions messages --- src/Common/Exception.cpp | 14 ++++++++--- src/Common/Exception.h | 22 +++++++++++++--- .../00956_sensitive_data_masking.reference | 3 +++ .../00956_sensitive_data_masking.sh | 25 ++++++++++++++++++- 4 files changed, 57 insertions(+), 7 deletions(-) diff --git a/src/Common/Exception.cpp b/src/Common/Exception.cpp index 399ccecf000..35231354651 100644 --- a/src/Common/Exception.cpp +++ b/src/Common/Exception.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -63,11 +64,18 @@ void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool ErrorCodes::increment(code, remote, msg, trace); } -Exception::Exception(const std::string & msg, int code, bool remote_) - : Poco::Exception(msg, code) +Exception::MessageMasked::MessageMasked(const std::string & msg_) + : msg(msg_) +{ + if (auto * masker = SensitiveDataMasker::getInstance()) + masker->wipeSensitiveData(msg); +} + +Exception::Exception(const MessageMasked & msg_masked, int code, bool remote_) + : Poco::Exception(msg_masked.msg, code) , remote(remote_) { - handle_error_code(msg, code, remote, getStackFramePointers()); + handle_error_code(msg_masked.msg, code, remote, getStackFramePointers()); } Exception::Exception(CreateFromPocoTag, const Poco::Exception & exc) diff --git a/src/Common/Exception.h b/src/Common/Exception.h index 84687581e52..458fc0aa19a 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -27,7 +27,18 @@ public: using FramePointers = std::vector; Exception() = default; - Exception(const std::string & msg, int code, bool remote_ = false); + + // used to remove the sensitive information from exceptions if query_masking_rules is configured + struct MessageMasked { + std::string msg; + MessageMasked(const std::string & msg_); + }; + + Exception(const MessageMasked & msg_masked, int code, bool remote_); + + // delegating constructor to mask sensitive information from the message + Exception(const std::string & msg, int code, bool remote_ = false): Exception(MessageMasked(msg), code, remote_) + {} Exception(int code, const std::string & message) : Exception(message, code) @@ -54,12 +65,17 @@ public: template void addMessage(fmt::format_string format, Args &&... args) { - extendedMessage(fmt::format(format, std::forward(args)...)); + addMessage(fmt::format(format, std::forward(args)...)); } void addMessage(const std::string& message) { - extendedMessage(message); + addMessage(MessageMasked(message)); + } + + void addMessage(const MessageMasked & msg_masked) + { + extendedMessage(msg_masked.msg); } /// Used to distinguish local exceptions from the one that was received from remote node. diff --git a/tests/queries/0_stateless/00956_sensitive_data_masking.reference b/tests/queries/0_stateless/00956_sensitive_data_masking.reference index 86323ec45e8..457ab9118f1 100644 --- a/tests/queries/0_stateless/00956_sensitive_data_masking.reference +++ b/tests/queries/0_stateless/00956_sensitive_data_masking.reference @@ -1,11 +1,14 @@ 1 2 3 +3.1 4 5 5.1 6 7 +7.1 +7.2 8 9 text_log non empty diff --git a/tests/queries/0_stateless/00956_sensitive_data_masking.sh b/tests/queries/0_stateless/00956_sensitive_data_masking.sh index e36031c54be..ccd9bbcf10e 100755 --- a/tests/queries/0_stateless/00956_sensitive_data_masking.sh +++ b/tests/queries/0_stateless/00956_sensitive_data_masking.sh @@ -37,12 +37,20 @@ rm -f "$tmp_file" >/dev/null 2>&1 echo 3 # failure at before query start $CLICKHOUSE_CLIENT \ - --query="SELECT 'find_me_TOPSECRET=TOPSECRET' FROM non_existing_table FORMAT Null" \ + --query="SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" \ --log_queries=1 --ignore-error --multiquery |& grep -v '^(query: ' > "$tmp_file" grep -F 'find_me_[hidden]' "$tmp_file" >/dev/null || echo 'fail 3a' grep -F 'TOPSECRET' "$tmp_file" && echo 'fail 3b' +echo '3.1' +echo "SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @- >"$tmp_file" 2>&1 + +grep -F 'find_me_[hidden]' "$tmp_file" >/dev/null || echo 'fail 3.1a' +grep -F 'TOPSECRET' "$tmp_file" && echo 'fail 3.1b' + +#echo "SELECT 1 FROM system.numbers WHERE credit_card_number='find_me_TOPSECRET=TOPSECRET' FORMAT Null" | curl -sSg http://172.17.0.3:8123/ -d @- + rm -f "$tmp_file" >/dev/null 2>&1 echo 4 # failure at the end of query @@ -100,6 +108,21 @@ $CLICKHOUSE_CLIENT \ --server_logs_file=/dev/null \ --query="select * from system.query_log where current_database = currentDatabase() AND event_date >= yesterday() and query like '%TOPSECRET%';" +echo '7.1' +# query_log exceptions +$CLICKHOUSE_CLIENT \ + --server_logs_file=/dev/null \ + --query="select * from system.query_log where current_database = currentDatabase() AND event_date >= yesterday() and exception like '%TOPSECRET%'" + +echo '7.2' + +# not perfect: when run in parallel with other tests that check can give false-negative result +# because other tests can overwrite the last_error_message, where we check the absence of sensitive data. +# But it's still good enough for CI - in case of regressions it will start flapping (normally it shouldn't) +$CLICKHOUSE_CLIENT \ + --server_logs_file=/dev/null \ + --query="select * from system.errors where last_error_message like '%TOPSECRET%';" + rm -f "$tmp_file" >/dev/null 2>&1 echo 8 From 8369998286c2c7061274dcd7b0ce6cb3b2d43f9c Mon Sep 17 00:00:00 2001 From: filimonov <1549571+filimonov@users.noreply.github.com> Date: Mon, 14 Nov 2022 22:06:22 +0100 Subject: [PATCH 09/24] Style --- src/Common/Exception.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Common/Exception.h b/src/Common/Exception.h index 458fc0aa19a..169479a7918 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -29,7 +29,8 @@ public: Exception() = default; // used to remove the sensitive information from exceptions if query_masking_rules is configured - struct MessageMasked { + struct MessageMasked + { std::string msg; MessageMasked(const std::string & msg_); }; From ec6698395e127502e3b5c22798f95b1b12fd852d Mon Sep 17 00:00:00 2001 From: xiedeyantu Date: Tue, 15 Nov 2022 13:25:15 +0800 Subject: [PATCH 10/24] fix skip_unavailable_shards does not work using hdfsCluster table function --- src/Storages/HDFS/StorageHDFSCluster.cpp | 38 ++++++++----------- .../test_storage_hdfs/configs/cluster.xml | 18 +++++++++ tests/integration/test_storage_hdfs/test.py | 33 +++++++++++++++- 3 files changed, 65 insertions(+), 24 deletions(-) create mode 100644 tests/integration/test_storage_hdfs/configs/cluster.xml diff --git a/src/Storages/HDFS/StorageHDFSCluster.cpp b/src/Storages/HDFS/StorageHDFSCluster.cpp index 5f9d5ea3d6d..cdea8749fac 100644 --- a/src/Storages/HDFS/StorageHDFSCluster.cpp +++ b/src/Storages/HDFS/StorageHDFSCluster.cpp @@ -99,32 +99,24 @@ Pipe StorageHDFSCluster::read( addColumnsStructureToQueryWithClusterEngine( query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 3, getName()); - for (const auto & replicas : cluster->getShardsAddresses()) + const auto & current_settings = context->getSettingsRef(); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); + for (const auto & shard_info : cluster->getShardsInfo()) { - /// There will be only one replica, because we consider each replica as a shard - for (const auto & node : replicas) + auto try_results = shard_info.pool->getMany(timeouts, ¤t_settings, PoolMode::GET_MANY); + for (auto & try_result : try_results) { - auto connection = std::make_shared( - node.host_name, node.port, context->getGlobalContext()->getCurrentDatabase(), - node.user, node.password, node.quota_key, node.cluster, node.cluster_secret, - "HDFSClusterInititiator", - node.compression, - node.secure - ); - - - /// For unknown reason global context is passed to IStorage::read() method - /// So, task_identifier is passed as constructor argument. It is more obvious. auto remote_query_executor = std::make_shared( - connection, - queryToString(query_to_send), - header, - context, - /*throttler=*/nullptr, - scalars, - Tables(), - processed_stage, - RemoteQueryExecutor::Extension{.task_iterator = callback}); + shard_info.pool, + std::vector{try_result}, + queryToString(query_to_send), + header, + context, + /*throttler=*/nullptr, + scalars, + Tables(), + processed_stage, + RemoteQueryExecutor::Extension{.task_iterator = callback}); pipes.emplace_back(std::make_shared(remote_query_executor, add_agg_info, false)); } diff --git a/tests/integration/test_storage_hdfs/configs/cluster.xml b/tests/integration/test_storage_hdfs/configs/cluster.xml new file mode 100644 index 00000000000..3d72462332e --- /dev/null +++ b/tests/integration/test_storage_hdfs/configs/cluster.xml @@ -0,0 +1,18 @@ + + + + + + node1 + 9000 + + + + + node1 + 19000 + + + + + \ No newline at end of file diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index 34243e4b58d..defc0d1fa1d 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -9,7 +9,7 @@ from pyhdfs import HdfsClient cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance( "node1", - main_configs=["configs/macro.xml", "configs/schema_cache.xml"], + main_configs=["configs/macro.xml", "configs/schema_cache.xml", "configs/cluster.xml"], with_hdfs=True, ) @@ -783,6 +783,37 @@ def test_schema_inference_cache(started_cluster): check_cache_misses(node1, files, 4) +def test_test_hdfsCluster_skip_unavailable_shards(started_cluster): + node = started_cluster.instances["node1"] + result = node.query( + """ + SELECT count(*) FROM hdfsCluster( + 'cluster_non_existent_port', + 'hdfs://hdfs1:9000/test_hdfsCluster/file*', + 'TSV', + 'id UInt32') + SETTINGS skip_unavailable_shards = 1 + """ + ) + + assert result == "3\n" + + +def test_test_hdfsCluster_unskip_unavailable_shards(started_cluster): + node = started_cluster.instances["node1"] + error = node.query_and_get_error( + """ + SELECT count(*) FROM hdfsCluster( + 'cluster_non_existent_port', + 'hdfs://hdfs1:9000/test_hdfsCluster/file*', + 'TSV', + 'id UInt32') + """ + ) + + assert "NETWORK_ERROR" in error + + if __name__ == "__main__": cluster.start() input("Cluster created, press any key to destroy...") From 09c749fbd0842e4dd54500cdaaf9c3f3e9d6d469 Mon Sep 17 00:00:00 2001 From: xiedeyantu Date: Tue, 15 Nov 2022 13:30:29 +0800 Subject: [PATCH 11/24] update --- tests/integration/test_storage_hdfs/configs/cluster.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_storage_hdfs/configs/cluster.xml b/tests/integration/test_storage_hdfs/configs/cluster.xml index 3d72462332e..9efe0ebf273 100644 --- a/tests/integration/test_storage_hdfs/configs/cluster.xml +++ b/tests/integration/test_storage_hdfs/configs/cluster.xml @@ -15,4 +15,4 @@ - \ No newline at end of file + From 3d218795beddd01d9f57561c23b46a18ad6c2f14 Mon Sep 17 00:00:00 2001 From: xiedeyantu Date: Wed, 16 Nov 2022 09:18:45 +0800 Subject: [PATCH 12/24] fix --- tests/integration/test_storage_hdfs/test.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index defc0d1fa1d..a0a2d3f8426 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -9,7 +9,10 @@ from pyhdfs import HdfsClient cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance( "node1", - main_configs=["configs/macro.xml", "configs/schema_cache.xml", "configs/cluster.xml"], + main_configs=[ + "configs/macro.xml", + "configs/schema_cache.xml", + "configs/cluster.xml"], with_hdfs=True, ) From 8144516e53cc2c12b71f02eacaa9fcd746811c66 Mon Sep 17 00:00:00 2001 From: xiedeyantu Date: Wed, 16 Nov 2022 09:33:46 +0800 Subject: [PATCH 13/24] fix --- tests/integration/test_storage_hdfs/test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index a0a2d3f8426..745756476da 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -12,7 +12,8 @@ node1 = cluster.add_instance( main_configs=[ "configs/macro.xml", "configs/schema_cache.xml", - "configs/cluster.xml"], + "configs/cluster.xml" + ], with_hdfs=True, ) From c23cd091a36bda02522ad2f81ff09b624678aed0 Mon Sep 17 00:00:00 2001 From: xiedeyantu Date: Wed, 16 Nov 2022 09:59:44 +0800 Subject: [PATCH 14/24] fix --- tests/integration/test_storage_hdfs/test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index 745756476da..4968deb7810 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -10,9 +10,9 @@ cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance( "node1", main_configs=[ - "configs/macro.xml", - "configs/schema_cache.xml", - "configs/cluster.xml" + "configs/macro.xml", + "configs/schema_cache.xml", + "configs/cluster.xml", ], with_hdfs=True, ) From 7dc941cacd13948180dc448f7e3440199a77a93a Mon Sep 17 00:00:00 2001 From: xiedeyantu Date: Wed, 16 Nov 2022 19:47:57 +0800 Subject: [PATCH 15/24] fix --- tests/integration/test_storage_hdfs/test.py | 31 ++++++++------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index 4968deb7810..4b174a42943 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -787,32 +787,23 @@ def test_schema_inference_cache(started_cluster): check_cache_misses(node1, files, 4) -def test_test_hdfsCluster_skip_unavailable_shards(started_cluster): +def test_hdfsCluster_skip_unavailable_shards(started_cluster): node = started_cluster.instances["node1"] - result = node.query( - """ - SELECT count(*) FROM hdfsCluster( - 'cluster_non_existent_port', - 'hdfs://hdfs1:9000/test_hdfsCluster/file*', - 'TSV', - 'id UInt32') - SETTINGS skip_unavailable_shards = 1 - """ + data = "1\tSerialize\t555.222\n2\tData\t777.333\n" + hdfs_api.write_data("/simple_table_function", data) + + assert ( + node1.query( + "select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64') settings skip_unavailable_shards = 1" + ) + == data ) - assert result == "3\n" - -def test_test_hdfsCluster_unskip_unavailable_shards(started_cluster): +def test_hdfsCluster_unskip_unavailable_shards(started_cluster): node = started_cluster.instances["node1"] error = node.query_and_get_error( - """ - SELECT count(*) FROM hdfsCluster( - 'cluster_non_existent_port', - 'hdfs://hdfs1:9000/test_hdfsCluster/file*', - 'TSV', - 'id UInt32') - """ + "select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')" ) assert "NETWORK_ERROR" in error From fc1d06f096db5f6374bed2a728ef99fd5e4d5d2f Mon Sep 17 00:00:00 2001 From: xiedeyantu Date: Wed, 16 Nov 2022 23:43:23 +0800 Subject: [PATCH 16/24] fix --- tests/integration/test_storage_hdfs/test.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index 4b174a42943..319f8b1f86c 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -788,22 +788,26 @@ def test_schema_inference_cache(started_cluster): def test_hdfsCluster_skip_unavailable_shards(started_cluster): + hdfs_api = started_cluster.hdfs_api node = started_cluster.instances["node1"] data = "1\tSerialize\t555.222\n2\tData\t777.333\n" - hdfs_api.write_data("/simple_table_function", data) + hdfs_api.write_data("/skip_unavailable_shards", data) assert ( node1.query( - "select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64') settings skip_unavailable_shards = 1" + "select * from hdfs('hdfs://hdfs1:9000/skip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64') settings skip_unavailable_shards = 1" ) == data ) def test_hdfsCluster_unskip_unavailable_shards(started_cluster): + hdfs_api = started_cluster.hdfs_api node = started_cluster.instances["node1"] + data = "1\tSerialize\t555.222\n2\tData\t777.333\n" + hdfs_api.write_data("/unskip_unavailable_shards", data) error = node.query_and_get_error( - "select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')" + "select * from hdfs('hdfs://hdfs1:9000/unskip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64')" ) assert "NETWORK_ERROR" in error From cb8333758661fba19d2d67c2c0d8ee1290b4dd72 Mon Sep 17 00:00:00 2001 From: chen Date: Thu, 17 Nov 2022 06:41:43 +0800 Subject: [PATCH 17/24] Update test.py --- tests/integration/test_storage_hdfs/test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index 319f8b1f86c..d4752d6cf2e 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -795,7 +795,7 @@ def test_hdfsCluster_skip_unavailable_shards(started_cluster): assert ( node1.query( - "select * from hdfs('hdfs://hdfs1:9000/skip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64') settings skip_unavailable_shards = 1" + "select * from hdfsCluster('cluster_non_existent_port', 'hdfs://hdfs1:9000/skip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64') settings skip_unavailable_shards = 1" ) == data ) @@ -807,7 +807,7 @@ def test_hdfsCluster_unskip_unavailable_shards(started_cluster): data = "1\tSerialize\t555.222\n2\tData\t777.333\n" hdfs_api.write_data("/unskip_unavailable_shards", data) error = node.query_and_get_error( - "select * from hdfs('hdfs://hdfs1:9000/unskip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64')" + "select * from hdfsCluster('cluster_non_existent_port', 'hdfs://hdfs1:9000/unskip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64')" ) assert "NETWORK_ERROR" in error From d5848d53cc06419539b7e1d622ed7287268d4013 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Thu, 17 Nov 2022 11:46:17 +0000 Subject: [PATCH 18/24] Fix DESCRIBE for deltaLake and hudi table functions --- src/Storages/StorageDelta.cpp | 84 +++++-- src/Storages/StorageDelta.h | 18 +- src/Storages/StorageHudi.cpp | 247 +++++++++++-------- src/Storages/StorageHudi.h | 13 +- src/TableFunctions/TableFunctionDelta.cpp | 2 +- src/TableFunctions/TableFunctionHudi.cpp | 2 +- tests/integration/test_storage_delta/test.py | 23 +- tests/integration/test_storage_hudi/test.py | 29 ++- 8 files changed, 265 insertions(+), 153 deletions(-) diff --git a/src/Storages/StorageDelta.cpp b/src/Storages/StorageDelta.cpp index e8287a2fd61..739c001fb81 100644 --- a/src/Storages/StorageDelta.cpp +++ b/src/Storages/StorageDelta.cpp @@ -47,7 +47,7 @@ void DeltaLakeMetadata::remove(const String & filename, uint64_t /*timestamp */) throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid table metadata, tried to remove {} before adding it", filename); } -std::vector DeltaLakeMetadata::ListCurrentFiles() && +std::vector DeltaLakeMetadata::listCurrentFiles() && { std::vector keys; keys.reserve(file_update_time.size()); @@ -61,10 +61,10 @@ std::vector DeltaLakeMetadata::ListCurrentFiles() && JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context) : base_configuration(configuration_), table_path(table_path_) { - Init(context); + init(context); } -void JsonMetadataGetter::Init(ContextPtr context) +void JsonMetadataGetter::init(ContextPtr context) { auto keys = getJsonLogFiles(); @@ -178,6 +178,52 @@ void JsonMetadataGetter::handleJSON(const JSON & json) } } +namespace +{ + +StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration) +{ + return {configuration.url, configuration.auth_settings, configuration.rw_settings, configuration.headers}; +} + +// DeltaLake stores data in parts in different files +// keys is vector of parts with latest version +// generateQueryFromKeys constructs query from parts filenames for +// underlying StorageS3 engine +String generateQueryFromKeys(const std::vector & keys) +{ + std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ",")); + return new_query; +} + + +StorageS3Configuration getAdjustedS3Configuration( + const ContextPtr & context, + StorageS3::S3Configuration & base_configuration, + const StorageS3Configuration & configuration, + const std::string & table_path, + Poco::Logger * log) +{ + JsonMetadataGetter getter{base_configuration, table_path, context}; + + auto keys = getter.getFiles(); + auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys); + + LOG_DEBUG(log, "New uri: {}", new_uri); + LOG_DEBUG(log, "Table path: {}", table_path); + + // set new url in configuration + StorageS3Configuration new_configuration; + new_configuration.url = new_uri; + new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id; + new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key; + new_configuration.format = configuration.format; + + return new_configuration; +} + +} + StorageDelta::StorageDelta( const StorageS3Configuration & configuration_, const StorageID & table_id_, @@ -187,28 +233,14 @@ StorageDelta::StorageDelta( ContextPtr context_, std::optional format_settings_) : IStorage(table_id_) - , base_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers} + , base_configuration{getBaseConfiguration(configuration_)} , log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")")) , table_path(base_configuration.uri.key) { StorageInMemoryMetadata storage_metadata; StorageS3::updateS3Configuration(context_, base_configuration); - JsonMetadataGetter getter{base_configuration, table_path, context_}; - - auto keys = getter.getFiles(); - auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys)); - - LOG_DEBUG(log, "New uri: {}", new_uri); - LOG_DEBUG(log, "Table path: {}", table_path); - - // set new url in configuration - StorageS3Configuration new_configuration; - new_configuration.url = new_uri; - new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id; - new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key; - new_configuration.format = configuration_.format; - + auto new_configuration = getAdjustedS3Configuration(context_, base_configuration, configuration_, table_path, log); if (columns_.empty()) { @@ -250,13 +282,15 @@ Pipe StorageDelta::read( return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); } -String StorageDelta::generateQueryFromKeys(std::vector && keys) +ColumnsDescription StorageDelta::getTableStructureFromData( + const StorageS3Configuration & configuration, const std::optional & format_settings, ContextPtr ctx) { - // DeltaLake store data parts in different files - // keys are filenames of parts - // for StorageS3 to read all parts we need format {key1,key2,key3,...keyn} - std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ",")); - return new_query; + auto base_configuration = getBaseConfiguration(configuration); + StorageS3::updateS3Configuration(ctx, base_configuration); + auto new_configuration = getAdjustedS3Configuration( + ctx, base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageDeltaLake")); + return StorageS3::getTableStructureFromData( + new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr); } void registerStorageDelta(StorageFactory & factory) diff --git a/src/Storages/StorageDelta.h b/src/Storages/StorageDelta.h index e3bb4c0b416..22c95d87a0f 100644 --- a/src/Storages/StorageDelta.h +++ b/src/Storages/StorageDelta.h @@ -32,7 +32,7 @@ public: void setLastModifiedTime(const String & filename, uint64_t timestamp); void remove(const String & filename, uint64_t timestamp); - std::vector ListCurrentFiles() &&; + std::vector listCurrentFiles() &&; private: std::unordered_map file_update_time; @@ -44,10 +44,10 @@ class JsonMetadataGetter public: JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context); - std::vector getFiles() { return std::move(metadata).ListCurrentFiles(); } + std::vector getFiles() { return std::move(metadata).listCurrentFiles(); } private: - void Init(ContextPtr context); + void init(ContextPtr context); std::vector getJsonLogFiles(); @@ -87,14 +87,12 @@ public: size_t max_block_size, size_t num_streams) override; + static ColumnsDescription getTableStructureFromData( + const StorageS3Configuration & configuration, + const std::optional & format_settings, + ContextPtr ctx); private: - void Init(); - - // DeltaLake stores data in parts in different files - // keys is vector of parts with latest version - // generateQueryFromKeys constructs query from parts filenames for - // underlying StorageS3 engine - static String generateQueryFromKeys(std::vector && keys); + void init(); StorageS3::S3Configuration base_configuration; std::shared_ptr s3engine; diff --git a/src/Storages/StorageHudi.cpp b/src/Storages/StorageHudi.cpp index 121856c4a57..be09f608c3b 100644 --- a/src/Storages/StorageHudi.cpp +++ b/src/Storages/StorageHudi.cpp @@ -28,115 +28,20 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -StorageHudi::StorageHudi( - const StorageS3Configuration & configuration_, - const StorageID & table_id_, - ColumnsDescription columns_, - const ConstraintsDescription & constraints_, - const String & comment, - ContextPtr context_, - std::optional format_settings_) - : IStorage(table_id_) - , base_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers} - , log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")")) - , table_path(base_configuration.uri.key) +namespace { - StorageInMemoryMetadata storage_metadata; - StorageS3::updateS3Configuration(context_, base_configuration); - auto keys = getKeysFromS3(); - auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration_.format); - - LOG_DEBUG(log, "New uri: {}", new_uri); - LOG_DEBUG(log, "Table path: {}", table_path); - - StorageS3Configuration new_configuration; - new_configuration.url = new_uri; - new_configuration.auth_settings.access_key_id = configuration_.auth_settings.access_key_id; - new_configuration.auth_settings.secret_access_key = configuration_.auth_settings.secret_access_key; - new_configuration.format = configuration_.format; - - if (columns_.empty()) - { - columns_ = StorageS3::getTableStructureFromData( - new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr); - storage_metadata.setColumns(columns_); - } - else - storage_metadata.setColumns(columns_); - - storage_metadata.setConstraints(constraints_); - storage_metadata.setComment(comment); - setInMemoryMetadata(storage_metadata); - - s3engine = std::make_shared( - new_configuration, - table_id_, - columns_, - constraints_, - comment, - context_, - format_settings_, - /* distributed_processing_ */ false, - nullptr); +StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration) +{ + return {configuration.url, configuration.auth_settings, configuration.rw_settings, configuration.headers}; } -Pipe StorageHudi::read( - const Names & column_names, - const StorageSnapshotPtr & storage_snapshot, - SelectQueryInfo & query_info, - ContextPtr context, - QueryProcessingStage::Enum processed_stage, - size_t max_block_size, - size_t num_streams) -{ - StorageS3::updateS3Configuration(context, base_configuration); - return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); -} - -std::vector StorageHudi::getKeysFromS3() -{ - std::vector keys; - - const auto & client = base_configuration.client; - - Aws::S3::Model::ListObjectsV2Request request; - Aws::S3::Model::ListObjectsV2Outcome outcome; - - bool is_finished{false}; - const auto bucket{base_configuration.uri.bucket}; - - request.SetBucket(bucket); - request.SetPrefix(table_path); - - while (!is_finished) - { - outcome = client->ListObjectsV2(request); - if (!outcome.IsSuccess()) - throw Exception( - ErrorCodes::S3_ERROR, - "Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}", - quoteString(bucket), - quoteString(table_path), - backQuote(outcome.GetError().GetExceptionName()), - quoteString(outcome.GetError().GetMessage())); - - const auto & result_batch = outcome.GetResult().GetContents(); - for (const auto & obj : result_batch) - { - const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix. - keys.push_back(filename); - LOG_DEBUG(log, "Found file: {}", filename); - } - - request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken()); - is_finished = !outcome.GetResult().GetIsTruncated(); - } - - return keys; -} - -String StorageHudi::generateQueryFromKeys(const std::vector & keys, const String & format) +/// Apache Hudi store parts of data in different files. +/// Every part file has timestamp in it. +/// Every partition(directory) in Apache Hudi has different versions of part. +/// To find needed parts we need to find out latest part file for every partition. +/// Part format is usually parquet, but can differ. +String generateQueryFromKeys(const std::vector & keys, const String & format) { /// For each partition path take only latest file. struct FileInfo @@ -187,6 +92,138 @@ String StorageHudi::generateQueryFromKeys(const std::vector & keys, return "{" + list_of_keys + "}"; } +std::vector getKeysFromS3(const StorageS3::S3Configuration & base_configuration, const std::string & table_path, Poco::Logger * log) +{ + std::vector keys; + + const auto & client = base_configuration.client; + + Aws::S3::Model::ListObjectsV2Request request; + Aws::S3::Model::ListObjectsV2Outcome outcome; + + bool is_finished{false}; + const auto bucket{base_configuration.uri.bucket}; + + request.SetBucket(bucket); + request.SetPrefix(table_path); + + while (!is_finished) + { + outcome = client->ListObjectsV2(request); + if (!outcome.IsSuccess()) + throw Exception( + ErrorCodes::S3_ERROR, + "Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}", + quoteString(bucket), + quoteString(table_path), + backQuote(outcome.GetError().GetExceptionName()), + quoteString(outcome.GetError().GetMessage())); + + const auto & result_batch = outcome.GetResult().GetContents(); + for (const auto & obj : result_batch) + { + const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix. + keys.push_back(filename); + LOG_DEBUG(log, "Found file: {}", filename); + } + + request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken()); + is_finished = !outcome.GetResult().GetIsTruncated(); + } + + return keys; +} + + +StorageS3Configuration getAdjustedS3Configuration( + StorageS3::S3Configuration & base_configuration, + const StorageS3Configuration & configuration, + const std::string & table_path, + Poco::Logger * log) +{ + auto keys = getKeysFromS3(base_configuration, table_path, log); + auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration.format); + + LOG_DEBUG(log, "New uri: {}", new_uri); + LOG_DEBUG(log, "Table path: {}", table_path); + + StorageS3Configuration new_configuration; + new_configuration.url = new_uri; + new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id; + new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key; + new_configuration.format = configuration.format; + + return new_configuration; +} + +} + +StorageHudi::StorageHudi( + const StorageS3Configuration & configuration_, + const StorageID & table_id_, + ColumnsDescription columns_, + const ConstraintsDescription & constraints_, + const String & comment, + ContextPtr context_, + std::optional format_settings_) + : IStorage(table_id_) + , base_configuration{getBaseConfiguration(configuration_)} + , log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")")) + , table_path(base_configuration.uri.key) +{ + StorageInMemoryMetadata storage_metadata; + StorageS3::updateS3Configuration(context_, base_configuration); + + auto new_configuration = getAdjustedS3Configuration(base_configuration, configuration_, table_path, log); + + if (columns_.empty()) + { + columns_ = StorageS3::getTableStructureFromData( + new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr); + storage_metadata.setColumns(columns_); + } + else + storage_metadata.setColumns(columns_); + + storage_metadata.setConstraints(constraints_); + storage_metadata.setComment(comment); + setInMemoryMetadata(storage_metadata); + + s3engine = std::make_shared( + new_configuration, + table_id_, + columns_, + constraints_, + comment, + context_, + format_settings_, + /* distributed_processing_ */ false, + nullptr); +} + +Pipe StorageHudi::read( + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams) +{ + StorageS3::updateS3Configuration(context, base_configuration); + return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); +} + +ColumnsDescription StorageHudi::getTableStructureFromData( + const StorageS3Configuration & configuration, const std::optional & format_settings, ContextPtr ctx) +{ + auto base_configuration = getBaseConfiguration(configuration); + StorageS3::updateS3Configuration(ctx, base_configuration); + auto new_configuration = getAdjustedS3Configuration( + base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageDeltaLake")); + return StorageS3::getTableStructureFromData( + new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr); +} void registerStorageHudi(StorageFactory & factory) { diff --git a/src/Storages/StorageHudi.h b/src/Storages/StorageHudi.h index bebda4cd4f6..00b8c01a46d 100644 --- a/src/Storages/StorageHudi.h +++ b/src/Storages/StorageHudi.h @@ -48,16 +48,11 @@ public: size_t max_block_size, size_t num_streams) override; + static ColumnsDescription getTableStructureFromData( + const StorageS3Configuration & configuration, + const std::optional & format_settings, + ContextPtr ctx); private: - std::vector getKeysFromS3(); - - /// Apache Hudi store parts of data in different files. - /// Every part file has timestamp in it. - /// Every partition(directory) in Apache Hudi has different versions of part. - /// To find needed parts we need to find out latest part file for every partition. - /// Part format is usually parquet, but can differ. - static String generateQueryFromKeys(const std::vector & keys, const String & format); - StorageS3::S3Configuration base_configuration; std::shared_ptr s3engine; Poco::Logger * log; diff --git a/src/TableFunctions/TableFunctionDelta.cpp b/src/TableFunctions/TableFunctionDelta.cpp index 25ea2aaa77f..df4ea75894d 100644 --- a/src/TableFunctions/TableFunctionDelta.cpp +++ b/src/TableFunctions/TableFunctionDelta.cpp @@ -130,7 +130,7 @@ ColumnsDescription TableFunctionDelta::getActualTableStructure(ContextPtr contex if (configuration.structure == "auto") { context->checkAccess(getSourceAccessType()); - return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context); + return StorageDelta::getTableStructureFromData(configuration, std::nullopt, context); } return parseColumnsListFromString(configuration.structure, context); diff --git a/src/TableFunctions/TableFunctionHudi.cpp b/src/TableFunctions/TableFunctionHudi.cpp index b1db90da550..c24c14b1b85 100644 --- a/src/TableFunctions/TableFunctionHudi.cpp +++ b/src/TableFunctions/TableFunctionHudi.cpp @@ -130,7 +130,7 @@ ColumnsDescription TableFunctionHudi::getActualTableStructure(ContextPtr context if (configuration.structure == "auto") { context->checkAccess(getSourceAccessType()); - return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context); + return StorageHudi::getTableStructureFromData(configuration, std::nullopt, context); } return parseColumnsListFromString(configuration.structure, context); diff --git a/tests/integration/test_storage_delta/test.py b/tests/integration/test_storage_delta/test.py index a63244df814..3f9da071281 100644 --- a/tests/integration/test_storage_delta/test.py +++ b/tests/integration/test_storage_delta/test.py @@ -1,7 +1,6 @@ import logging import os import json - import helpers.client import pytest from helpers.cluster import ClickHouseCluster @@ -143,3 +142,25 @@ def test_select_query(started_cluster): ), ).splitlines() assert len(result) > 0 + + +def test_describe_query(started_cluster): + instance = started_cluster.instances["main_server"] + bucket = started_cluster.minio_bucket + result = instance.query( + f"DESCRIBE deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV", + ) + + assert result == TSV( + [ + ["begin_lat", "Nullable(Float64)"], + ["begin_lon", "Nullable(Float64)"], + ["driver", "Nullable(String)"], + ["end_lat", "Nullable(Float64)"], + ["end_lon", "Nullable(Float64)"], + ["fare", "Nullable(Float64)"], + ["rider", "Nullable(String)"], + ["ts", "Nullable(Int64)"], + ["uuid", "Nullable(String)"], + ] + ) diff --git a/tests/integration/test_storage_hudi/test.py b/tests/integration/test_storage_hudi/test.py index dd870aae42e..7748ddf6160 100644 --- a/tests/integration/test_storage_hudi/test.py +++ b/tests/integration/test_storage_hudi/test.py @@ -161,7 +161,7 @@ def test_select_query(started_cluster): result = run_query(instance, distinct_select_query) result_table_function = run_query( instance, - distinct_select_query.format( + distinct_select_table_function_query.format( ip=started_cluster.minio_ip, port=started_cluster.minio_port, bucket=bucket ), ) @@ -173,3 +173,30 @@ def test_select_query(started_cluster): assert TSV(result) == TSV(expected) assert TSV(result_table_function) == TSV(expected) + +def test_describe_query(started_cluster): + instance = started_cluster.instances["main_server"] + bucket = started_cluster.minio_bucket + result = instance.query( + f"DESCRIBE hudi('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV", + ) + + assert result == TSV( + [ + ["_hoodie_commit_time", "Nullable(String)"], + ["_hoodie_commit_seqno", "Nullable(String)"], + ["_hoodie_record_key", "Nullable(String)"], + ["_hoodie_partition_path", "Nullable(String)"], + ["_hoodie_file_name", "Nullable(String)"], + ["begin_lat", "Nullable(Float64)"], + ["begin_lon", "Nullable(Float64)"], + ["driver", "Nullable(String)"], + ["end_lat", "Nullable(Float64)"], + ["end_lon", "Nullable(Float64)"], + ["fare", "Nullable(Float64)"], + ["partitionpath", "Nullable(String)"], + ["rider", "Nullable(String)"], + ["ts", "Nullable(Int64)"], + ["uuid", "Nullable(String)"], + ] + ) From 1ad7362db76cfbe4a0576da7a72dfbb9d6ef75a4 Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Thu, 17 Nov 2022 11:54:13 +0000 Subject: [PATCH 19/24] Automatic style fix --- tests/integration/test_storage_hudi/test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_storage_hudi/test.py b/tests/integration/test_storage_hudi/test.py index 7748ddf6160..3328f859406 100644 --- a/tests/integration/test_storage_hudi/test.py +++ b/tests/integration/test_storage_hudi/test.py @@ -174,6 +174,7 @@ def test_select_query(started_cluster): assert TSV(result) == TSV(expected) assert TSV(result_table_function) == TSV(expected) + def test_describe_query(started_cluster): instance = started_cluster.instances["main_server"] bucket = started_cluster.minio_bucket From d2d6e75d97b75ebc05745d4e7e1bb53cb01d639c Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Thu, 17 Nov 2022 12:40:14 +0000 Subject: [PATCH 20/24] Fix: make test_read_only_table more stable + add retries to INSERT queries after keeper node restart --- tests/integration/test_read_only_table/test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_read_only_table/test.py b/tests/integration/test_read_only_table/test.py index 28abbf6601e..b97b8edb85e 100644 --- a/tests/integration/test_read_only_table/test.py +++ b/tests/integration/test_read_only_table/test.py @@ -84,6 +84,6 @@ def test_restart_zookeeper(start_cluster): time.sleep(5) for table_id in range(NUM_TABLES): - node1.query( - f"INSERT INTO test_table_{table_id} VALUES (6), (7), (8), (9), (10);" + node1.query_with_retry( + sql = f"INSERT INTO test_table_{table_id} VALUES (6), (7), (8), (9), (10);", retry_count=10, sleep_time=1 ) From 13e051a5de5ba50d5c3763d5d4e3c1bf312f37d6 Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Thu, 17 Nov 2022 13:46:21 +0000 Subject: [PATCH 21/24] Automatic style fix --- tests/integration/test_read_only_table/test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_read_only_table/test.py b/tests/integration/test_read_only_table/test.py index b97b8edb85e..914c6a99508 100644 --- a/tests/integration/test_read_only_table/test.py +++ b/tests/integration/test_read_only_table/test.py @@ -85,5 +85,7 @@ def test_restart_zookeeper(start_cluster): for table_id in range(NUM_TABLES): node1.query_with_retry( - sql = f"INSERT INTO test_table_{table_id} VALUES (6), (7), (8), (9), (10);", retry_count=10, sleep_time=1 + sql=f"INSERT INTO test_table_{table_id} VALUES (6), (7), (8), (9), (10);", + retry_count=10, + sleep_time=1, ) From b5872b8a05599fed281275c91471b4997239c0d0 Mon Sep 17 00:00:00 2001 From: Nikolay Degterinsky Date: Fri, 18 Nov 2022 01:16:28 +0000 Subject: [PATCH 22/24] Fix function parameters parsing --- src/Parsers/ExpressionListParsers.cpp | 2 +- src/Parsers/ParserTablesInSelectQuery.cpp | 2 +- .../queries/0_stateless/02481_fix_parameters_parsing.reference | 0 tests/queries/0_stateless/02481_fix_parameters_parsing.sql | 2 ++ 4 files changed, 4 insertions(+), 2 deletions(-) create mode 100644 tests/queries/0_stateless/02481_fix_parameters_parsing.reference create mode 100644 tests/queries/0_stateless/02481_fix_parameters_parsing.sql diff --git a/src/Parsers/ExpressionListParsers.cpp b/src/Parsers/ExpressionListParsers.cpp index d29aa248ec4..da39afa4bb9 100644 --- a/src/Parsers/ExpressionListParsers.cpp +++ b/src/Parsers/ExpressionListParsers.cpp @@ -920,7 +920,7 @@ public: , ErrorCodes::SYNTAX_ERROR); } - if (allow_function_parameters && ParserToken(TokenType::OpeningRoundBracket).ignore(pos, expected)) + if (allow_function_parameters && !parameters && ParserToken(TokenType::OpeningRoundBracket).ignore(pos, expected)) { parameters = std::make_shared(); std::swap(parameters->children, elements); diff --git a/src/Parsers/ParserTablesInSelectQuery.cpp b/src/Parsers/ParserTablesInSelectQuery.cpp index ef39df8ca52..cff4c959267 100644 --- a/src/Parsers/ParserTablesInSelectQuery.cpp +++ b/src/Parsers/ParserTablesInSelectQuery.cpp @@ -22,7 +22,7 @@ bool ParserTableExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expec auto res = std::make_shared(); if (!ParserWithOptionalAlias(std::make_unique(), true).parse(pos, res->subquery, expected) - && !ParserWithOptionalAlias(std::make_unique(true, true), true).parse(pos, res->table_function, expected) + && !ParserWithOptionalAlias(std::make_unique(false, true), true).parse(pos, res->table_function, expected) && !ParserWithOptionalAlias(std::make_unique(true, true), true) .parse(pos, res->database_and_table_name, expected)) return false; diff --git a/tests/queries/0_stateless/02481_fix_parameters_parsing.reference b/tests/queries/0_stateless/02481_fix_parameters_parsing.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/02481_fix_parameters_parsing.sql b/tests/queries/0_stateless/02481_fix_parameters_parsing.sql new file mode 100644 index 00000000000..6164ec77774 --- /dev/null +++ b/tests/queries/0_stateless/02481_fix_parameters_parsing.sql @@ -0,0 +1,2 @@ +SELECT func(1)(2)(3); -- { clientError SYNTAX_ERROR } +SELECT * FROM VALUES(1)(2); -- { clientError SYNTAX_ERROR } From 4af8ef381be5e2ef0dbcb344b229eb623d7ce0fe Mon Sep 17 00:00:00 2001 From: Alexander Gololobov <440544+davenger@users.noreply.github.com> Date: Fri, 18 Nov 2022 17:33:51 +0100 Subject: [PATCH 23/24] Test with default value used in row level policy --- ...t_value_used_in_row_level_filter.reference | 16 ++++++++++++ ...default_value_used_in_row_level_filter.sql | 25 +++++++++++++++++++ 2 files changed, 41 insertions(+) create mode 100644 tests/queries/0_stateless/02481_default_value_used_in_row_level_filter.reference create mode 100644 tests/queries/0_stateless/02481_default_value_used_in_row_level_filter.sql diff --git a/tests/queries/0_stateless/02481_default_value_used_in_row_level_filter.reference b/tests/queries/0_stateless/02481_default_value_used_in_row_level_filter.reference new file mode 100644 index 00000000000..c8e17be819a --- /dev/null +++ b/tests/queries/0_stateless/02481_default_value_used_in_row_level_filter.reference @@ -0,0 +1,16 @@ +-- { echoOn } + +SELECT a, c FROM test_rlp WHERE c%2 == 0 AND b < 5; +0 10 +2 12 +4 14 +DROP POLICY IF EXISTS test_rlp_policy ON test_rlp; +CREATE ROW POLICY test_rlp_policy ON test_rlp FOR SELECT USING c%2 == 0 TO default; +SELECT a, c FROM test_rlp WHERE b < 5 SETTINGS optimize_move_to_prewhere = 0; +0 10 +2 12 +4 14 +SELECT a, c FROM test_rlp PREWHERE b < 5; +0 10 +2 12 +4 14 diff --git a/tests/queries/0_stateless/02481_default_value_used_in_row_level_filter.sql b/tests/queries/0_stateless/02481_default_value_used_in_row_level_filter.sql new file mode 100644 index 00000000000..6835a3a57ea --- /dev/null +++ b/tests/queries/0_stateless/02481_default_value_used_in_row_level_filter.sql @@ -0,0 +1,25 @@ +DROP TABLE IF EXISTS test_rlp; + +CREATE TABLE test_rlp (a Int32, b Int32) ENGINE=MergeTree() ORDER BY a SETTINGS index_granularity=5; + +INSERT INTO test_rlp SELECT number, number FROM numbers(15); + +ALTER TABLE test_rlp ADD COLUMN c Int32 DEFAULT b+10; + +-- { echoOn } + +SELECT a, c FROM test_rlp WHERE c%2 == 0 AND b < 5; + +DROP POLICY IF EXISTS test_rlp_policy ON test_rlp; + +CREATE ROW POLICY test_rlp_policy ON test_rlp FOR SELECT USING c%2 == 0 TO default; + +SELECT a, c FROM test_rlp WHERE b < 5 SETTINGS optimize_move_to_prewhere = 0; + +SELECT a, c FROM test_rlp PREWHERE b < 5; + +-- { echoOff } + +DROP POLICY test_rlp_policy ON test_rlp; + +DROP TABLE test_rlp; From f004eea413804004885dfd223a743d78df8de219 Mon Sep 17 00:00:00 2001 From: Alexander Gololobov <440544+davenger@users.noreply.github.com> Date: Fri, 18 Nov 2022 23:42:45 +0100 Subject: [PATCH 24/24] Add columns required fro defaults calculation --- src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp index c3f069498be..525d76d0f0f 100644 --- a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp @@ -315,7 +315,9 @@ MergeTreeReadTaskColumns getReadTaskColumns( /// 1. Columns for row level filter if (prewhere_info->row_level_filter) { - Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames(); + Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames(); + injectRequiredColumns( + data_part_info_for_reader, storage_snapshot, with_subcolumns, row_filter_column_names); result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, row_filter_column_names)); pre_name_set.insert(row_filter_column_names.begin(), row_filter_column_names.end()); } @@ -323,7 +325,7 @@ MergeTreeReadTaskColumns getReadTaskColumns( /// 2. Columns for prewhere Names all_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames(); - const auto injected_pre_columns = injectRequiredColumns( + injectRequiredColumns( data_part_info_for_reader, storage_snapshot, with_subcolumns, all_pre_column_names); for (const auto & name : all_pre_column_names)