Merge pull request #49356 from Ziy1-Tan/vcol

Support for `_path` and `_file` virtual columns for table function `url`.
This commit is contained in:
Nikolay Degterinsky 2023-05-22 18:10:32 +02:00 committed by GitHub
commit d4b89cb643
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 169 additions and 30 deletions

View File

@ -46,3 +46,12 @@ SELECT * FROM test_table;
Patterns in curly brackets `{ }` are used to generate a set of shards or to specify failover addresses. For supported pattern types and examples, see the description of the [remote](remote.md#globs-in-addresses) function.
Character `|` inside patterns is used to specify failover addresses. They are iterated in the same order as listed in the pattern. The number of generated addresses is limited by [glob_expansion_max_elements](../../operations/settings/settings.md#glob_expansion_max_elements) setting.
## Virtual Columns
- `_path` — Path component of the `URL`.
- `_file` — Resource name of the `URL`: the part of the path after the last `/`.
**See Also**
- [Virtual columns](/docs/en/engines/table-engines/index.md#table_engines-virtual_columns)

View File

@ -46,3 +46,12 @@ SELECT * FROM test_table;
Шаблоны в фигурных скобках `{ }` используются, чтобы сгенерировать список шардов или указать альтернативные адреса на случай отказа. Поддерживаемые типы шаблонов и примеры смотрите в описании функции [remote](remote.md#globs-in-addresses).
Символ `|` внутри шаблонов используется, чтобы задать адреса, если предыдущие оказались недоступны. Эти адреса перебираются в том же порядке, в котором они указаны в шаблоне. Количество адресов, которые могут быть сгенерированы, ограничено настройкой [glob_expansion_max_elements](../../operations/settings/settings.md#glob_expansion_max_elements).
## Виртуальные столбцы
- `_path` — Путь до `URL`.
- `_file` — Имя ресурса `URL`.
**Смотрите также**
- [Виртуальные столбцы](index.md#table_engines-virtual_columns)

View File

@ -41,3 +41,11 @@ CREATE TABLE test_table (column1 String, column2 UInt32) ENGINE=Memory;
INSERT INTO FUNCTION url('http://127.0.0.1:8123/?query=INSERT+INTO+test_table+FORMAT+CSV', 'CSV', 'column1 String, column2 UInt32') VALUES ('http interface', 42);
SELECT * FROM test_table;
```
## 虚拟列 {#virtual-columns}
- `_path` — `URL`路径。
- `_file` — 资源名称。
**另请参阅**
- [虚拟列](https://clickhouse.com/docs/en/operations/table_engines/#table_engines-virtual_columns)

View File

@ -64,7 +64,8 @@ static bool parseNumber(const String & description, size_t l, size_t r, size_t &
* abc{1..9}de{f,g,h} - is a direct product, 27 shards.
* abc{1..9}de{0|1} - is a direct product, 9 shards, in each 2 replicas.
*/
std::vector<String> parseRemoteDescription(const String & description, size_t l, size_t r, char separator, size_t max_addresses)
std::vector<String>
parseRemoteDescription(const String & description, size_t l, size_t r, char separator, size_t max_addresses, const String & func_name)
{
std::vector<String> res;
std::vector<String> cur;
@ -97,28 +98,41 @@ std::vector<String> parseRemoteDescription(const String & description, size_t l,
if (cnt == 0) break;
}
if (cnt != 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'remote': incorrect brace sequence in first argument");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}': incorrect brace sequence in first argument", func_name);
/// The presence of a dot - numeric interval
if (last_dot != -1)
{
size_t left, right;
if (description[last_dot - 1] != '.')
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'remote': incorrect argument in braces (only one dot): {}",
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Table function '{}': incorrect argument in braces (only one dot): {}",
func_name,
description.substr(i, m - i + 1));
if (!parseNumber(description, i + 1, last_dot - 1, left))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'remote': "
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Table function '{}': "
"incorrect argument in braces (Incorrect left number): {}",
func_name,
description.substr(i, m - i + 1));
if (!parseNumber(description, last_dot + 1, m, right))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'remote': "
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Table function '{}': "
"incorrect argument in braces (Incorrect right number): {}",
func_name,
description.substr(i, m - i + 1));
if (left > right)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'remote': "
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Table function '{}': "
"incorrect argument in braces (left number is greater then right): {}",
func_name,
description.substr(i, m - i + 1));
if (right - left + 1 > max_addresses)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'remote': first argument generates too many result addresses");
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Table function '{}': first argument generates too many result addresses", func_name);
bool add_leading_zeroes = false;
size_t len = last_dot - 1 - (i + 1);
/// If the left and right borders have equal numbers, then you must add leading zeros.
@ -161,7 +175,7 @@ std::vector<String> parseRemoteDescription(const String & description, size_t l,
res.insert(res.end(), cur.begin(), cur.end());
if (res.size() > max_addresses)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function 'remote': first argument generates too many result addresses");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}': first argument generates too many result addresses", func_name);
return res;
}

View File

@ -15,7 +15,8 @@ namespace DB
* abc{1..9}de{f,g,h} - is a direct product, 27 shards.
* abc{1..9}de{0|1} - is a direct product, 9 shards, in each 2 replicas.
*/
std::vector<String> parseRemoteDescription(const String & description, size_t l, size_t r, char separator, size_t max_addresses);
std::vector<String> parseRemoteDescription(
const String & description, size_t l, size_t r, char separator, size_t max_addresses, const String & func_name = "remote");
/// Parse remote description for external database (MySQL or PostgreSQL).
std::vector<std::pair<String, uint16_t>> parseRemoteDescriptionForExternalDatabase(const String & description, size_t max_addresses, UInt16 default_port);

View File

@ -38,6 +38,7 @@
#include <Common/logger_useful.h>
#include <Poco/Net/HTTPRequest.h>
#include <regex>
#include <DataTypes/DataTypeString.h>
namespace DB
@ -159,6 +160,9 @@ namespace
using FailoverOptions = std::vector<String>;
std::vector<FailoverOptions> uri_list_to_read;
std::atomic<size_t> next_uri_to_read = 0;
bool need_path_column = false;
bool need_file_column = false;
};
using URIInfoPtr = std::shared_ptr<URIInfo>;
@ -176,6 +180,27 @@ namespace
}
}
/// Builds the header block of the source: a copy of `block_for_format` followed by the
/// virtual columns requested through `uri_info`. `_path` is appended first (if requested),
/// then `_file`; both are newly created LowCardinality(String) columns.
static Block getBlockForSource(const Block & block_for_format, const URIInfoPtr & uri_info)
{
    Block header = block_for_format;

    /// Appends a newly created LowCardinality(String) column with the given name to the header.
    const auto append_virtual_column = [&header](const char * column_name)
    {
        const auto column_type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
        header.insert({column_type->createColumn(), column_type, column_name});
    };

    if (uri_info->need_path_column)
        append_virtual_column("_path");

    if (uri_info->need_file_column)
        append_virtual_column("_file");

    return header;
}
StorageURLSource(
URIInfoPtr uri_info_,
const std::string & http_method,
@ -193,7 +218,7 @@ namespace
const HTTPHeaderEntries & headers_ = {},
const URIParams & params = {},
bool glob_url = false)
: ISource(sample_block), name(std::move(name_)), uri_info(uri_info_)
: ISource(getBlockForSource(sample_block, uri_info_)), name(std::move(name_)), uri_info(uri_info_)
{
auto headers = getHeaders(headers_);
@ -204,7 +229,7 @@ namespace
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty url list");
auto first_option = uri_options.begin();
auto buf_factory = getFirstAvailableURLReadBuffer(
auto [actual_uri, buf_factory] = getFirstAvailableURIAndReadBuffer(
first_option,
uri_options.end(),
context,
@ -217,6 +242,8 @@ namespace
glob_url,
uri_options.size() == 1);
curr_uri = actual_uri;
try
{
total_size += buf_factory->getFileSize();
@ -269,15 +296,32 @@ namespace
if (current_uri_pos >= uri_info->uri_list_to_read.size())
return {};
auto current_uri = uri_info->uri_list_to_read[current_uri_pos];
auto current_uri_options = uri_info->uri_list_to_read[current_uri_pos];
initialize(current_uri);
initialize(current_uri_options);
}
Chunk chunk;
if (reader->pull(chunk))
{
UInt64 num_rows = chunk.getNumRows();
const String & path{curr_uri.getPath()};
if (uri_info->need_path_column)
{
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, path);
chunk.addColumn(column->convertToFullColumnIfConst());
}
if (uri_info->need_file_column)
{
const size_t last_slash_pos = path.find_last_of('/');
auto file_name = path.substr(last_slash_pos + 1);
auto column
= DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
chunk.addColumn(column->convertToFullColumnIfConst());
}
if (num_rows && total_size)
updateRowsProgressApprox(
*this, chunk, total_size, total_rows_approx_accumulated, total_rows_count_times, total_rows_approx_max);
@ -291,7 +335,7 @@ namespace
return {};
}
static SeekableReadBufferFactoryPtr getFirstAvailableURLReadBuffer(
static std::tuple<Poco::URI, SeekableReadBufferFactoryPtr> getFirstAvailableURIAndReadBuffer(
std::vector<String>::const_iterator & option,
const std::vector<String>::const_iterator & end,
ContextPtr context,
@ -352,7 +396,7 @@ namespace
}
}
return res;
return std::make_tuple(request_uri, std::move(res));
}
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", options, first_exception_message);
@ -364,6 +408,7 @@ namespace
String name;
URIInfoPtr uri_info;
Poco::URI curr_uri;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
@ -538,10 +583,10 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
if (urlWithGlobs(uri))
{
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses, "url");
for (const auto & description : uri_descriptions)
{
auto options = parseRemoteDescription(description, 0, description.size(), '|', max_addresses);
auto options = parseRemoteDescription(description, 0, description.size(), '|', max_addresses, "url");
urls_to_check.insert(urls_to_check.end(), options.begin(), options.end());
}
}
@ -559,7 +604,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
if (it == urls_to_check.cend())
return nullptr;
auto buf_factory = StorageURLSource::getFirstAvailableURLReadBuffer(
auto [_, buf_factory] = StorageURLSource::getFirstAvailableURIAndReadBuffer(
it,
urls_to_check.cend(),
context,
@ -631,18 +676,27 @@ Pipe IStorageURLBase::read(
size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
for (const auto & column : column_names)
{
if (column == "_path")
uri_info->need_path_column = true;
if (column == "_file")
uri_info->need_file_column = true;
}
if (urlWithGlobs(uri))
{
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses, "url");
if (num_streams > uri_descriptions.size())
num_streams = uri_descriptions.size();
/// For each uri (which acts like shard) check if it has failover options
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
uri_info->uri_list_to_read.reserve(uri_descriptions.size());
for (const auto & description : uri_descriptions)
uri_info->uri_list_to_read.emplace_back(parseRemoteDescription(description, 0, description.size(), '|', max_addresses));
uri_info->uri_list_to_read.emplace_back(parseRemoteDescription(description, 0, description.size(), '|', max_addresses, "url"));
Pipes pipes;
pipes.reserve(num_streams);
@ -672,7 +726,6 @@ Pipe IStorageURLBase::read(
}
else
{
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
uri_info->uri_list_to_read.emplace_back(std::vector<String>{uri});
return Pipe(std::make_shared<StorageURLSource>(
uri_info,
@ -781,6 +834,13 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
}
}
/// Lists the virtual columns of URL-based storages: `_path` and `_file`,
/// each typed as LowCardinality(String).
NamesAndTypesList IStorageURLBase::getVirtuals() const
{
    /// Every virtual column gets its own type instance, as before.
    const auto make_low_cardinality_string = []
    {
        return std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
    };

    return NamesAndTypesList{
        {"_path", make_low_cardinality_string()},
        {"_file", make_low_cardinality_string()}};
}
SchemaCache & IStorageURLBase::getSchemaCache(const ContextPtr & context)
{
static SchemaCache schema_cache(context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_url", DEFAULT_SCHEMA_CACHE_ELEMENTS));

View File

@ -43,6 +43,8 @@ public:
bool supportsPartitionBy() const override { return true; }
NamesAndTypesList getVirtuals() const override;
static ColumnsDescription getTableStructureFromData(
const String & format,
const String & uri,

View File

@ -7,6 +7,7 @@ import tempfile
import threading
import os
import traceback
from urllib.parse import urljoin
import urllib.request
import subprocess
from io import StringIO
@ -163,6 +164,7 @@ def test_select(
requests=[],
answers=[],
test_data="",
res_path="",
):
with open(CSV_DATA, "w") as f: # clear file
f.write("")
@ -183,7 +185,7 @@ def test_select(
tbl = table_name
if not tbl:
tbl = "url('{addr}', 'CSV', '{schema}')".format(
addr=HTTP_SERVER_URL_STR, schema=schema
addr=urljoin(HTTP_SERVER_URL_STR, res_path), schema=schema
)
check_answers(requests[i].format(tbl=tbl), answers[i])
@ -252,6 +254,20 @@ def main():
"select double, count(*) from {tbl} group by double order by double": "7.7\t2\n9.9\t10",
}
pathname = CSV_DATA
filename = os.path.basename(CSV_DATA)
select_virtual_requests = {
"select _path from {tbl}": "\n".join(pathname for _ in range(2)),
"select _file from {tbl}": "\n".join(filename for _ in range(2)),
"select _file, from {tbl} order by _path": "\n".join(
filename for _ in range(2)
),
"select _path, _file from {tbl}": "\n".join(
f"{pathname}\t{filename}" for _ in range(2)
),
"select _path, count(*) from {tbl} group by _path": f"{pathname}\t2",
}
t, httpd = start_server()
t.start()
# test table with url engine
@ -267,6 +283,14 @@ def main():
answers=list(select_only_requests.values()),
test_data=test_data,
)
# test table function url for virtual column
test_select(
requests=list(select_virtual_requests.keys()),
answers=list(select_virtual_requests.values()),
test_data=test_data,
res_path=CSV_DATA,
)
# test insert into table with url engine
test_insert(
table_name="test_table_insert",

View File

@ -0,0 +1,4 @@
/
1
/ 1

View File

@ -0,0 +1,8 @@
-- Tags: no-parallel
-- Select the `_path` virtual column on its own.
select _path from url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String');
-- Select the `_file` virtual column on its own.
select _file from url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String');
-- Use a virtual column as a GROUP BY key together with an aggregate.
select _file, count() from url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String') group by _file;
-- Select both virtual columns alongside the regular column `s`.
select _path, _file, s from url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String');
-- Same query with a wrong password: it must fail with the server error named in the hint.
select _path, _file, s from url('http://127.0.0.1:8123/?query=select+1&user=default&password=wrong', LineAsString, 's String'); -- { serverError RECEIVED_ERROR_FROM_REMOTE_IO_SERVER }