Merge pull request #68867 from ucasfl/url-engine

Add virtual column _headers for url table engine
This commit is contained in:
vdimir 2024-08-27 13:27:19 +00:00 committed by GitHub
commit bb22736bc3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 87 additions and 7 deletions

View File

@ -18,7 +18,9 @@
#define Net_HTTPResponse_INCLUDED
#include <map>
#include <vector>
#include "Poco/Net/HTTPCookie.h"
#include "Poco/Net/HTTPMessage.h"
#include "Poco/Net/Net.h"
@ -180,6 +182,8 @@ namespace Net
/// May throw an exception in case of a malformed
/// Set-Cookie header.
void getHeaders(std::map<std::string, std::string> & headers) const;
void write(std::ostream & ostr) const;
/// Writes the HTTP response to the given
/// output stream.

View File

@ -209,6 +209,15 @@ void HTTPResponse::getCookies(std::vector<HTTPCookie>& cookies) const
}
}
void HTTPResponse::getHeaders(std::map<std::string, std::string> & headers) const
{
headers.clear();
for (const auto & it : *this)
{
headers.emplace(it.first, it.second);
}
}
void HTTPResponse::write(std::ostream& ostr) const
{

View File

@ -109,6 +109,7 @@ For partitioning by month, use the `toYYYYMM(date_column)` expression, where `da
- `_file` — Resource name of the `URL`. Type: `LowCardinalty(String)`.
- `_size` — Size of the resource in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
- `_headers` - HTTP response headers. Type: `Map(LowCardinality(String), LowCardinality(String))`.
## Storage Settings {#storage-settings}

View File

@ -54,6 +54,7 @@ Character `|` inside patterns is used to specify failover addresses. They are it
- `_file` — Resource name of the `URL`. Type: `LowCardinalty(String)`.
- `_size` — Size of the resource in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
- `_headers` - HTTP response headers. Type: `Map(LowCardinality(String), LowCardinality(String))`.
## Hive-style partitioning {#hive-style-partitioning}

View File

@ -443,6 +443,7 @@ std::unique_ptr<ReadBuffer> ReadWriteBufferFromHTTP::initialize()
}
response.getCookies(cookies);
response.getHeaders(response_headers);
content_encoding = response.get("Content-Encoding", "");
// Remember file size. It'll be used to report eof in next nextImpl() call.
@ -680,6 +681,19 @@ std::string ReadWriteBufferFromHTTP::getResponseCookie(const std::string & name,
return def;
}
Map ReadWriteBufferFromHTTP::getResponseHeaders() const
{
Map map;
for (const auto & header : response_headers)
{
Tuple elem;
elem.emplace_back(header.first);
elem.emplace_back(header.second);
map.emplace_back(elem);
}
return map;
}
void ReadWriteBufferFromHTTP::setNextCallback(NextCallback next_callback_)
{
next_callback = next_callback_;

View File

@ -90,6 +90,9 @@ private:
std::unique_ptr<ReadBuffer> impl;
std::vector<Poco::Net::HTTPCookie> cookies;
std::map<String, String> response_headers;
HTTPHeaderEntries http_header_entries;
std::function<void(size_t)> next_callback;
@ -187,6 +190,8 @@ public:
HTTPFileInfo getFileInfo();
static HTTPFileInfo parseFileInfo(const Poco::Net::HTTPResponse & response, size_t requested_range_begin);
Map getResponseHeaders() const;
};
using ReadWriteBufferFromHTTPPtr = std::unique_ptr<ReadWriteBufferFromHTTP>;

View File

@ -44,10 +44,11 @@
#include <IO/HTTPHeaderEntries.h>
#include <algorithm>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeString.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Poco/Net/HTTPRequest.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace ProfileEvents
{
@ -166,7 +167,19 @@ IStorageURLBase::IStorageURLBase(
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.columns, context_, getSampleURI(uri, context_), format_settings));
auto virtual_columns_desc = VirtualColumnUtils::getVirtualsForFileLikeStorage(
storage_metadata.columns, context_, getSampleURI(uri, context_), format_settings);
if (!storage_metadata.getColumns().has("_headers"))
{
virtual_columns_desc.addEphemeral(
"_headers",
std::make_shared<DataTypeMap>(
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())),
"");
}
setVirtuals(virtual_columns_desc);
setInMemoryMetadata(storage_metadata);
}
@ -292,11 +305,13 @@ StorageURLSource::StorageURLSource(
const URIParams & params,
bool glob_url,
bool need_only_count_)
: SourceWithKeyCondition(info.source_header, false), WithContext(context_)
: SourceWithKeyCondition(info.source_header, false)
, WithContext(context_)
, name(std::move(name_))
, columns_description(info.columns_description)
, requested_columns(info.requested_columns)
, requested_virtual_columns(info.requested_virtual_columns)
, need_headers_virtual_column(info.requested_virtual_columns.contains("_headers"))
, requested_virtual_columns(info.requested_virtual_columns.eraseNames({"_headers"}))
, block_for_format(info.format_header)
, uri_iterator(uri_iterator_)
, format(format_)
@ -431,11 +446,28 @@ Chunk StorageURLSource::generate()
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, requested_virtual_columns,
chunk,
requested_virtual_columns,
{
.path = curr_uri.getPath(),
.size = current_file_size,
}, getContext());
},
getContext());
chassert(dynamic_cast<ReadWriteBufferFromHTTP *>(read_buf.get()));
if (need_headers_virtual_column)
{
if (!http_response_headers_initialized)
{
http_response_headers = dynamic_cast<ReadWriteBufferFromHTTP *>(read_buf.get())->getResponseHeaders();
http_response_headers_initialized = true;
}
auto type = std::make_shared<DataTypeMap>(
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
chunk.addColumn(type->createColumnConst(chunk.getNumRows(), http_response_headers)->convertToFullColumnIfConst());
}
return chunk;
}
@ -446,6 +478,7 @@ Chunk StorageURLSource::generate()
reader.reset();
input_format.reset();
read_buf.reset();
http_response_headers_initialized = false;
total_rows_in_file = 0;
}
return {};

View File

@ -220,6 +220,7 @@ private:
String name;
ColumnsDescription columns_description;
NamesAndTypesList requested_columns;
bool need_headers_virtual_column;
NamesAndTypesList requested_virtual_columns;
Block block_for_format;
std::shared_ptr<IteratorWrapper> uri_iterator;
@ -233,6 +234,9 @@ private:
Poco::Net::HTTPBasicCredentials credentials;
Map http_response_headers;
bool http_response_headers_initialized = false;
std::unique_ptr<ReadBuffer> read_buf;
std::shared_ptr<IInputFormat> input_format;
std::unique_ptr<QueryPipeline> pipeline;

View File

@ -0,0 +1,2 @@
Map(LowCardinality(String), LowCardinality(String))
1 1

View File

@ -0,0 +1,7 @@
SELECT toTypeName(_headers)
FROM url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String');
SELECT
*,
mapFromString(_headers['X-ClickHouse-Summary'])['read_rows']
FROM url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String');