From 2c159061edce46adf9dfd83a82129cb68ed1a9f3 Mon Sep 17 00:00:00 2001 From: Ziy1-Tan Date: Mon, 1 May 2023 21:40:14 +0800 Subject: [PATCH] Support `_path` and `_file` virtual columns for table function `url`. --- docs/en/sql-reference/table-functions/url.md | 9 ++ docs/ru/sql-reference/table-functions/url.md | 9 ++ docs/zh/sql-reference/table-functions/url.md | 8 ++ src/Storages/StorageURL.cpp | 86 ++++++++++++++++--- src/Storages/StorageURL.h | 2 + ...02725_url_support_virtual_column.reference | 5 ++ .../02725_url_support_virtual_column.sql | 9 ++ 7 files changed, 115 insertions(+), 13 deletions(-) create mode 100644 tests/queries/0_stateless/02725_url_support_virtual_column.reference create mode 100644 tests/queries/0_stateless/02725_url_support_virtual_column.sql diff --git a/docs/en/sql-reference/table-functions/url.md b/docs/en/sql-reference/table-functions/url.md index 014dc3ae853..9b4a02e2393 100644 --- a/docs/en/sql-reference/table-functions/url.md +++ b/docs/en/sql-reference/table-functions/url.md @@ -46,3 +46,12 @@ SELECT * FROM test_table; Patterns in curly brackets `{ }` are used to generate a set of shards or to specify failover addresses. Supported pattern types and examples see in the description of the [remote](remote.md#globs-in-addresses) function. Character `|` inside patterns is used to specify failover addresses. They are iterated in the same order as listed in the pattern. The number of generated addresses is limited by [glob_expansion_max_elements](../../operations/settings/settings.md#glob_expansion_max_elements) setting. + +## Virtual Columns + +- `_path` — Path to the `URL`. +- `_file` — Resource name of the `URL`. + +**See Also** + +- [Virtual columns](/docs/en/engines/table-engines/index.md#table_engines-virtual_columns) diff --git a/docs/ru/sql-reference/table-functions/url.md b/docs/ru/sql-reference/table-functions/url.md index e5d9faeec00..ec9548229c8 100644 --- a/docs/ru/sql-reference/table-functions/url.md +++ b/docs/ru/sql-reference/table-functions/url.md @@ -46,3 +46,12 @@ SELECT * FROM test_table; Шаблоны в фигурных скобках `{ }` используются, чтобы сгенерировать список шардов или указать альтернативные адреса на случай отказа. Поддерживаемые типы шаблонов и примеры смотрите в описании функции [remote](remote.md#globs-in-addresses). Символ `|` внутри шаблонов используется, чтобы задать адреса, если предыдущие оказались недоступны. Эти адреса перебираются в том же порядке, в котором они указаны в шаблоне. Количество адресов, которые могут быть сгенерированы, ограничено настройкой [glob_expansion_max_elements](../../operations/settings/settings.md#glob_expansion_max_elements). + +## Виртуальные столбцы + +- `_path` — Путь до `URL`. +- `_file` — Имя ресурса `URL`. + +**Смотрите также** + +- [Виртуальные столбцы](index.md#table_engines-virtual_columns) diff --git a/docs/zh/sql-reference/table-functions/url.md b/docs/zh/sql-reference/table-functions/url.md index d3b7665d21b..c8ca9b775b2 100644 --- a/docs/zh/sql-reference/table-functions/url.md +++ b/docs/zh/sql-reference/table-functions/url.md @@ -41,3 +41,11 @@ CREATE TABLE test_table (column1 String, column2 UInt32) ENGINE=Memory; INSERT INTO FUNCTION url('http://127.0.0.1:8123/?query=INSERT+INTO+test_table+FORMAT+CSV', 'CSV', 'column1 String, column2 UInt32') VALUES ('http interface', 42); SELECT * FROM test_table; ``` +## 虚拟列 {#virtual-columns} + +- `_path` — `URL`路径。 +- `_file` — 资源名称。 + +**另请参阅** + +- [虚拟列](https://clickhouse.com/docs/en/operations/table_engines/#table_engines-virtual_columns) diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index d2df3881c71..1847eccce12 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -38,6 +38,7 @@ #include #include #include +#include "DataTypes/DataTypeString.h" namespace DB @@ -159,6 +160,9 @@ namespace using FailoverOptions = std::vector; std::vector uri_list_to_read; std::atomic next_uri_to_read = 0; + + bool need_path_column = false; + bool need_file_column = false; }; using URIInfoPtr = std::shared_ptr; @@ -176,6 +180,27 @@ namespace } } + static Block getBlockForSource(const Block & block_for_format, const URIInfoPtr & uri_info) + { + auto res = block_for_format; + if (uri_info->need_path_column) + { + res.insert( + {DataTypeLowCardinality{std::make_shared()}.createColumn(), + std::make_shared(std::make_shared()), + "_path"}); + } + + if (uri_info->need_file_column) + { + res.insert( + {DataTypeLowCardinality{std::make_shared()}.createColumn(), + std::make_shared(std::make_shared()), + "_file"}); + } + return res; + } + StorageURLSource( URIInfoPtr uri_info_, const std::string & http_method, @@ -193,7 +218,7 @@ namespace const HTTPHeaderEntries & headers_ = {}, const URIParams & params = {}, bool glob_url = false) - : ISource(sample_block), name(std::move(name_)), uri_info(uri_info_) + : ISource(getBlockForSource(sample_block, uri_info_)), name(std::move(name_)), uri_info(uri_info_) { auto headers = getHeaders(headers_); @@ -204,7 +229,7 @@ namespace throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty url list"); auto first_option = uri_options.begin(); - auto buf_factory = getFirstAvailableURLReadBuffer( + auto [actual_uri, buf_factory] = getFirstAvailableURIAndReadBuffer( first_option, uri_options.end(), context, @@ -217,6 +242,8 @@ namespace glob_url, uri_options.size() == 1); + curr_uri = actual_uri; + try { total_size += buf_factory->getFileSize(); @@ -269,15 +296,32 @@ namespace if (current_uri_pos >= uri_info->uri_list_to_read.size()) return {}; - auto current_uri = uri_info->uri_list_to_read[current_uri_pos]; + auto current_uri_options = uri_info->uri_list_to_read[current_uri_pos]; - initialize(current_uri); + initialize(current_uri_options); } Chunk chunk; if (reader->pull(chunk)) { UInt64 num_rows = chunk.getNumRows(); + + const String & path{curr_uri.getPath()}; + if (uri_info->need_path_column) + { + auto column = DataTypeLowCardinality{std::make_shared()}.createColumnConst(num_rows, path); + chunk.addColumn(column->convertToFullColumnIfConst()); + } + + if (uri_info->need_file_column) + { + const size_t last_slash_pos = path.find_last_of('/'); + auto file_name = path.substr(last_slash_pos + 1); + auto column + = DataTypeLowCardinality{std::make_shared()}.createColumnConst(num_rows, std::move(file_name)); + chunk.addColumn(column->convertToFullColumnIfConst()); + } + if (num_rows && total_size) updateRowsProgressApprox( *this, chunk, total_size, total_rows_approx_accumulated, total_rows_count_times, total_rows_approx_max); @@ -291,7 +335,7 @@ namespace return {}; } - static SeekableReadBufferFactoryPtr getFirstAvailableURLReadBuffer( + static std::tuple getFirstAvailableURIAndReadBuffer( std::vector::const_iterator & option, const std::vector::const_iterator & end, ContextPtr context, @@ -352,7 +396,7 @@ namespace } } - return res; + return std::make_tuple(request_uri, std::move(res)); } throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", options, first_exception_message); @@ -364,6 +408,7 @@ namespace String name; URIInfoPtr uri_info; + Poco::URI curr_uri; std::unique_ptr pipeline; std::unique_ptr reader; @@ -538,10 +583,10 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData( if (urlWithGlobs(uri)) { size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements; - auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses); + auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses, "url"); for (const auto & description : uri_descriptions) { - auto options = parseRemoteDescription(description, 0, description.size(), '|', max_addresses); + auto options = parseRemoteDescription(description, 0, description.size(), '|', max_addresses,"url"); urls_to_check.insert(urls_to_check.end(), options.begin(), options.end()); } } @@ -559,7 +604,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData( if (it == urls_to_check.cend()) return nullptr; - auto buf_factory = StorageURLSource::getFirstAvailableURLReadBuffer( + auto [_, buf_factory] = StorageURLSource::getFirstAvailableURIAndReadBuffer( it, urls_to_check.cend(), context, @@ -621,18 +666,27 @@ Pipe IStorageURLBase::read( size_t max_download_threads = local_context->getSettingsRef().max_download_threads; + auto uri_info = std::make_shared(); + for (const auto & column : column_names) + { + if (column == "_path") + uri_info->need_path_column = true; + if (column == "_file") + uri_info->need_file_column = true; + } + if (urlWithGlobs(uri)) { size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements; - auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses); + auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses, "url"); if (num_streams > uri_descriptions.size()) num_streams = uri_descriptions.size(); /// For each uri (which acts like shard) check if it has failover options - auto uri_info = std::make_shared(); + uri_info->uri_list_to_read.reserve(uri_descriptions.size()); for (const auto & description : uri_descriptions) - uri_info->uri_list_to_read.emplace_back(parseRemoteDescription(description, 0, description.size(), '|', max_addresses)); + uri_info->uri_list_to_read.emplace_back(parseRemoteDescription(description, 0, description.size(), '|', max_addresses, "url")); Pipes pipes; pipes.reserve(num_streams); @@ -662,7 +716,6 @@ Pipe IStorageURLBase::read( } else { - auto uri_info = std::make_shared(); uri_info->uri_list_to_read.emplace_back(std::vector{uri}); return Pipe(std::make_shared( uri_info, @@ -771,6 +824,13 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad } } +NamesAndTypesList IStorageURLBase::getVirtuals() const +{ + return NamesAndTypesList{ + {"_path", std::make_shared(std::make_shared())}, + {"_file", std::make_shared(std::make_shared())}}; +} + SchemaCache & IStorageURLBase::getSchemaCache(const ContextPtr & context) { static SchemaCache schema_cache(context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_url", DEFAULT_SCHEMA_CACHE_ELEMENTS)); diff --git a/src/Storages/StorageURL.h b/src/Storages/StorageURL.h index 65ee78e1e73..48498836e8d 100644 --- a/src/Storages/StorageURL.h +++ b/src/Storages/StorageURL.h @@ -43,6 +43,8 @@ public: bool supportsPartitionBy() const override { return true; } + NamesAndTypesList getVirtuals() const override; + static ColumnsDescription getTableStructureFromData( const String & format, const String & uri, diff --git a/tests/queries/0_stateless/02725_url_support_virtual_column.reference b/tests/queries/0_stateless/02725_url_support_virtual_column.reference new file mode 100644 index 00000000000..9d4051cc242 --- /dev/null +++ b/tests/queries/0_stateless/02725_url_support_virtual_column.reference @@ -0,0 +1,5 @@ +/ + + 1 +/ 1 +/other/pagecounts-raw/2007/2007-12/pagecounts-20071209-180000.gz pagecounts-20071209-180000.gz 856769 diff --git a/tests/queries/0_stateless/02725_url_support_virtual_column.sql b/tests/queries/0_stateless/02725_url_support_virtual_column.sql new file mode 100644 index 00000000000..417835a1e53 --- /dev/null +++ b/tests/queries/0_stateless/02725_url_support_virtual_column.sql @@ -0,0 +1,9 @@ +-- Tags: no-parallel + +select _path from url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String'); +select _file from url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String'); +select _file, count() from url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String') group by _file; +select _path, _file, s from url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String'); +select _path, _file, s from url('http://127.0.0.1:8123/?query=select+1&user=default&password=wrong', LineAsString, 's String'); -- { serverError RECEIVED_ERROR_FROM_REMOTE_IO_SERVER } + +SELECT _path, _file, count() FROM url('https://dumps.wikimedia.org/other/pagecounts-raw/2007/2007-12/pagecounts-20071209-180000.gz', LineAsString) group by _path, _file;