Add virtual column _headers for url table engine

This commit is contained in:
flynn 2024-08-25 07:48:42 +00:00
parent 054b38d4eb
commit e4aceed36a
8 changed files with 80 additions and 5 deletions

View File

@ -18,7 +18,9 @@
#define Net_HTTPResponse_INCLUDED
#include <map>
#include <vector>
#include "Poco/Net/HTTPCookie.h"
#include "Poco/Net/HTTPMessage.h"
#include "Poco/Net/Net.h"
@ -180,6 +182,8 @@ namespace Net
/// May throw an exception in case of a malformed
/// Set-Cookie header.
void getHeaders(std::map<std::string, std::string> & headers) const;
void write(std::ostream & ostr) const;
/// Writes the HTTP response to the given
/// output stream.

View File

@ -209,6 +209,15 @@ void HTTPResponse::getCookies(std::vector<HTTPCookie>& cookies) const
}
}
void HTTPResponse::getHeaders(std::map<std::string, std::string> & headers) const
{
headers.clear();
for (const auto & it : *this)
{
headers.emplace(it.first, it.second);
}
}
void HTTPResponse::write(std::ostream& ostr) const
{

View File

@ -443,6 +443,7 @@ std::unique_ptr<ReadBuffer> ReadWriteBufferFromHTTP::initialize()
}
response.getCookies(cookies);
response.getHeaders(response_headers);
content_encoding = response.get("Content-Encoding", "");
// Remember file size. It'll be used to report eof in next nextImpl() call.
@ -680,6 +681,19 @@ std::string ReadWriteBufferFromHTTP::getResponseCookie(const std::string & name,
return def;
}
Map ReadWriteBufferFromHTTP::getResponseHeaders() const
{
Map map;
for (const auto & header : response_headers)
{
Tuple elem;
elem.emplace_back(header.first);
elem.emplace_back(header.second);
map.emplace_back(elem);
}
return map;
}
void ReadWriteBufferFromHTTP::setNextCallback(NextCallback next_callback_)
{
next_callback = next_callback_;

View File

@ -90,6 +90,9 @@ private:
std::unique_ptr<ReadBuffer> impl;
std::vector<Poco::Net::HTTPCookie> cookies;
std::map<String, String> response_headers;
HTTPHeaderEntries http_header_entries;
std::function<void(size_t)> next_callback;
@ -187,6 +190,8 @@ public:
HTTPFileInfo getFileInfo();
static HTTPFileInfo parseFileInfo(const Poco::Net::HTTPResponse & response, size_t requested_range_begin);
Map getResponseHeaders() const;
};
using ReadWriteBufferFromHTTPPtr = std::unique_ptr<ReadWriteBufferFromHTTP>;

View File

@ -44,10 +44,11 @@
#include <IO/HTTPHeaderEntries.h>
#include <algorithm>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeString.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Poco/Net/HTTPRequest.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace ProfileEvents
{
@ -167,7 +168,19 @@ IStorageURLBase::IStorageURLBase(
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), context_, getSampleURI(uri, context_), format_settings));
auto virtual_columns_desc = VirtualColumnUtils::getVirtualsForFileLikeStorage(
storage_metadata.getColumns(), context_, getSampleURI(uri, context_), format_settings);
if (!storage_metadata.getColumns().has("_headers"))
{
virtual_columns_desc.addEphemeral(
"_headers",
std::make_shared<DataTypeMap>(
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())),
"");
}
setVirtuals(virtual_columns_desc);
}
@ -292,11 +305,13 @@ StorageURLSource::StorageURLSource(
const URIParams & params,
bool glob_url,
bool need_only_count_)
: SourceWithKeyCondition(info.source_header, false), WithContext(context_)
: SourceWithKeyCondition(info.source_header, false)
, WithContext(context_)
, name(std::move(name_))
, columns_description(info.columns_description)
, requested_columns(info.requested_columns)
, requested_virtual_columns(info.requested_virtual_columns)
, need_headers_virtual_column(info.requested_virtual_columns.contains("_headers"))
, requested_virtual_columns(info.requested_virtual_columns.eraseNames({"_headers"}))
, block_for_format(info.format_header)
, uri_iterator(uri_iterator_)
, format(format_)
@ -436,6 +451,20 @@ Chunk StorageURLSource::generate()
.path = curr_uri.getPath(),
.size = current_file_size,
}, getContext(), columns_description);
if (need_headers_virtual_column)
{
if (!http_response_headers_initialized)
{
http_response_headers = dynamic_cast<ReadWriteBufferFromHTTP *>(read_buf.get())->getResponseHeaders();
http_response_headers_initialized = true;
}
auto type = std::make_shared<DataTypeMap>(
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
chunk.addColumn(type->createColumnConst(chunk.getNumRows(), http_response_headers));
}
return chunk;
}
@ -446,6 +475,7 @@ Chunk StorageURLSource::generate()
reader.reset();
input_format.reset();
read_buf.reset();
http_response_headers_initialized = false;
total_rows_in_file = 0;
}
return {};

View File

@ -220,6 +220,7 @@ private:
String name;
ColumnsDescription columns_description;
NamesAndTypesList requested_columns;
bool need_headers_virtual_column;
NamesAndTypesList requested_virtual_columns;
Block block_for_format;
std::shared_ptr<IteratorWrapper> uri_iterator;
@ -233,6 +234,9 @@ private:
Poco::Net::HTTPBasicCredentials credentials;
Map http_response_headers;
bool http_response_headers_initialized = false;
std::unique_ptr<ReadBuffer> read_buf;
std::shared_ptr<IInputFormat> input_format;
std::unique_ptr<QueryPipeline> pipeline;

View File

@ -0,0 +1,2 @@
Map(LowCardinality(String), LowCardinality(String))
1 1

View File

@ -0,0 +1,7 @@
SELECT toTypeName(_headers)
FROM url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String');
SELECT
*,
mapFromString(_headers['X-ClickHouse-Summary'])['read_rows']
FROM url('http://127.0.0.1:8123/?query=select+1&user=default', LineAsString, 's String');