Replace old named collections code for url

This commit is contained in:
kssenii 2022-12-16 23:57:09 +01:00
parent 980d5ce289
commit 30547d2dcd
45 changed files with 459 additions and 312 deletions

View File

@ -37,7 +37,7 @@
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
#include <Storages/registerStorages.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <Formats/registerFormats.h>

View File

@ -60,7 +60,7 @@
#include <Storages/System/attachInformationSchemaTables.h>
#include <Storages/Cache/ExternalDataSourceCache.h>
#include <Storages/Cache/registerRemoteFileMetadatas.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/registerFunctions.h>

View File

@ -8,6 +8,7 @@
#include <IO/IOThreadPool.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/HTTPHeaderEntries.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
@ -35,7 +36,7 @@ namespace
auto settings = context->getStorageS3Settings().getSettings(s3_uri.uri.toString());
Aws::Auth::AWSCredentials credentials(access_key_id, secret_access_key);
HeaderCollection headers;
HTTPHeaderEntries headers;
if (access_key_id.empty())
{
credentials = Aws::Auth::AWSCredentials(settings.auth_settings.access_key_id, settings.auth_settings.secret_access_key);

View File

@ -263,7 +263,7 @@ QueryPipeline ExternalDictionaryLibraryBridgeHelper::loadBase(const Poco::URI &
0,
DBMS_DEFAULT_BUFFER_SIZE,
getContext()->getReadSettings(),
ReadWriteBufferFromHTTP::HTTPHeaderEntries{});
HTTPHeaderEntries{});
auto source = FormatFactory::instance().getInput(ExternalDictionaryLibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, getContext(), DEFAULT_BLOCK_SIZE);
source->addBuffer(std::move(read_buf_ptr));

View File

@ -106,7 +106,7 @@ if (TARGET ch_contrib::nats_io)
endif()
add_headers_and_sources(dbms Storages/MeiliSearch)
add_headers_and_sources(dbms Storages/NamedCollections)
add_headers_and_sources(dbms Common/NamedCollections)
if (TARGET ch_contrib::amqp_cpp)
add_headers_and_sources(dbms Storages/RabbitMQ)

View File

@ -1,4 +1,4 @@
#include <Storages/NamedCollections/NamedCollectionConfiguration.h>
#include <Common/NamedCollections/NamedCollectionConfiguration.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Common/Exception.h>
#include <Common/SettingsChanges.h>
@ -145,6 +145,52 @@ ConfigurationPtr createConfiguration(const std::string & root_name, const Settin
return config;
}
void listKeys(
const Poco::Util::AbstractConfiguration & config,
std::queue<std::string> enumerate_paths,
std::set<std::string, std::less<>> & result,
ssize_t depth)
{
if (enumerate_paths.empty())
enumerate_paths.push("");
const bool do_finish = depth >= 0 && --depth < 0;
auto initial_paths = std::move(enumerate_paths);
enumerate_paths = {};
while (!initial_paths.empty())
{
auto path = initial_paths.front();
initial_paths.pop();
Poco::Util::AbstractConfiguration::Keys keys;
if (path.empty())
config.keys(keys);
else
config.keys(path, keys);
if (keys.empty())
{
result.insert(path);
}
else if (do_finish)
{
for (const auto & key : keys)
result.emplace(path.empty() ? key : path + '.' + key);
}
else
{
for (const auto & key : keys)
enumerate_paths.emplace(path.empty() ? key : path + '.' + key);
}
}
if (enumerate_paths.empty())
return;
listKeys(config, enumerate_paths, result, depth);
}
template String getConfigValue<String>(const Poco::Util::AbstractConfiguration & config,
const std::string & path);
template UInt64 getConfigValue<UInt64>(const Poco::Util::AbstractConfiguration & config,

View File

@ -1,5 +1,7 @@
#pragma once
#include <Poco/Util/AbstractConfiguration.h>
#include <queue>
#include <set>
namespace DB
{
@ -39,6 +41,28 @@ void removeConfigValue(
ConfigurationPtr createConfiguration(const std::string & root_name, const SettingsChanges & settings);
/// Enumerate keys paths of the config recursively.
/// E.g. if `enumerate_paths` = {"root.key1"} and config like
/// <root>
/// <key0></key0>
/// <key1>
/// <key2></key2>
/// <key3>
/// <key4></key4>
/// </key3>
/// </key1>
/// </root>
/// the `result` will contain: "root.key0", "root.key1.key2" and "root.key1.key3.key4"
///
/// depth == -1 means to return all keys with full path: "root.key0", "root.key1.key2", "root.key1.key3.key4".
/// depth == 0 means: "root.key0" and "root.key1"
/// depth == 1 means: "root.key0", "root.key1.key2" and "root.key1.key3"
/// and so on.
void listKeys(
const Poco::Util::AbstractConfiguration & config,
std::queue<std::string> enumerate_paths,
std::set<std::string, std::less<>> & result,
ssize_t depth);
}
}

View File

@ -1,4 +1,4 @@
#include <Storages/NamedCollections/NamedCollectionUtils.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/FieldVisitorToString.h>
#include <Common/logger_useful.h>
@ -13,8 +13,8 @@
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Interpreters/Context.h>
#include <Storages/NamedCollections/NamedCollections.h>
#include <Storages/NamedCollections/NamedCollectionConfiguration.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Common/NamedCollections/NamedCollectionConfiguration.h>
namespace fs = std::filesystem;
@ -69,10 +69,10 @@ public:
{
const auto collection_prefix = getCollectionPrefix(collection_name);
std::queue<std::string> enumerate_input;
std::set<std::string> enumerate_result;
std::set<std::string, std::less<>> enumerate_result;
enumerate_input.push(collection_prefix);
collectKeys(config, std::move(enumerate_input), enumerate_result);
NamedCollectionConfiguration::listKeys(config, std::move(enumerate_input), enumerate_result, -1);
/// Collection does not have any keys.
/// (`enumerate_result` == <collection_path>).
@ -97,50 +97,6 @@ private:
{
return fmt::format("{}.{}", NAMED_COLLECTIONS_CONFIG_PREFIX, collection_name);
}
/// Enumerate keys paths of the config recursively.
/// E.g. if `enumerate_paths` = {"root.key1"} and config like
/// <root>
/// <key0></key0>
/// <key1>
/// <key2></key2>
/// <key3>
/// <key4></key4>
/// </key3>
/// </key1>
/// </root>
/// the `result` will contain two strings: "root.key1.key2" and "root.key1.key3.key4"
static void collectKeys(
const Poco::Util::AbstractConfiguration & config,
std::queue<std::string> enumerate_paths,
std::set<std::string> & result)
{
if (enumerate_paths.empty())
return;
auto initial_paths = std::move(enumerate_paths);
enumerate_paths = {};
while (!initial_paths.empty())
{
auto path = initial_paths.front();
initial_paths.pop();
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(path, keys);
if (keys.empty())
{
result.insert(path);
}
else
{
for (const auto & key : keys)
enumerate_paths.emplace(path + '.' + key);
}
}
collectKeys(config, enumerate_paths, result);
}
};

View File

@ -3,8 +3,8 @@
#include <Interpreters/Context.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Storages/NamedCollections/NamedCollectionConfiguration.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
#include <Common/NamedCollections/NamedCollectionConfiguration.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <ranges>
@ -229,9 +229,21 @@ public:
assert(removed);
}
Keys getKeys() const
Keys getKeys(ssize_t depth, const std::string & prefix) const
{
return keys;
if (depth == -1)
{
/// Return all keys with full depth.
return keys;
}
std::queue<std::string> enumerate_input;
if (!prefix.empty())
enumerate_input.push(prefix);
Keys result;
Configuration::listKeys(*config, enumerate_input, result, depth);
return result;
}
Keys::const_iterator begin() const
@ -379,10 +391,10 @@ MutableNamedCollectionPtr NamedCollection::duplicate() const
std::move(impl), collection_name, NamedCollectionUtils::SourceId::NONE, true));
}
NamedCollection::Keys NamedCollection::getKeys() const
NamedCollection::Keys NamedCollection::getKeys(ssize_t depth, const std::string & prefix) const
{
std::lock_guard lock(mutex);
return pimpl->getKeys();
return pimpl->getKeys(depth, prefix);
}
template <bool Locked> NamedCollection::const_iterator NamedCollection::begin() const

View File

@ -1,7 +1,7 @@
#pragma once
#include <Interpreters/Context.h>
#include <Storages/NamedCollections/NamedCollections_fwd.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
#include <Common/NamedCollections/NamedCollections_fwd.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
namespace Poco { namespace Util { class AbstractConfiguration; } }
@ -47,7 +47,7 @@ public:
MutableNamedCollectionPtr duplicate() const;
Keys getKeys() const;
Keys getKeys(ssize_t depth = -1, const std::string & prefix = "") const;
using iterator = typename Keys::iterator;
using const_iterator = typename Keys::const_iterator;

View File

@ -1,6 +1,6 @@
#include <Common/tests/gtest_global_context.h>
#include <Storages/NamedCollections/NamedCollections.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/DOM/DOMParser.h>
#include <gtest/gtest.h>
@ -143,3 +143,82 @@ key2:
ASSERT_EQ(collection->get<Int64>("key2.key2_2.key2_3.key2_5"), 5);
}
TEST(NamedCollections, NestedConfigDuplicateKeys)
{
std::string xml(R"CONFIG(<clickhouse>
<named_collections>
<collection>
<headers>
<header>
<name>key1</name>
<value>value1</value>
</header>
<header>
<name>key2</name>
<value>value2</value>
</header>
<header>
<name>key3</name>
<value>value3</value>
</header>
</headers>
</collection>
</named_collections>
</clickhouse>)CONFIG");
Poco::XML::DOMParser dom_parser;
Poco::AutoPtr<Poco::XML::Document> document = dom_parser.parseString(xml);
Poco::AutoPtr<Poco::Util::XMLConfiguration> config = new Poco::Util::XMLConfiguration(document);
NamedCollectionUtils::loadFromConfig(*config);
auto collection = NamedCollectionFactory::instance().get("collection");
auto keys = collection->getKeys();
ASSERT_EQ(keys.size(), 6);
ASSERT_TRUE(keys.contains("headers.header.name"));
ASSERT_TRUE(keys.contains("headers.header[1].name"));
ASSERT_TRUE(keys.contains("headers.header[2].name"));
ASSERT_TRUE(keys.contains("headers.header.value"));
ASSERT_TRUE(keys.contains("headers.header[1].value"));
ASSERT_TRUE(keys.contains("headers.header[2].value"));
ASSERT_EQ(collection->get<String>("headers.header.name"), "key1");
ASSERT_EQ(collection->get<String>("headers.header[1].name"), "key2");
ASSERT_EQ(collection->get<String>("headers.header[2].name"), "key3");
ASSERT_EQ(collection->get<String>("headers.header.value"), "value1");
ASSERT_EQ(collection->get<String>("headers.header[1].value"), "value2");
ASSERT_EQ(collection->get<String>("headers.header[2].value"), "value3");
keys = collection->getKeys(0);
ASSERT_EQ(keys.size(), 1);
ASSERT_TRUE(keys.contains("headers"));
keys = collection->getKeys(0, "headers");
ASSERT_EQ(keys.size(), 3);
ASSERT_TRUE(keys.contains("headers.header"));
ASSERT_TRUE(keys.contains("headers.header[1]"));
ASSERT_TRUE(keys.contains("headers.header[2]"));
keys = collection->getKeys(1);
ASSERT_EQ(keys.size(), 3);
ASSERT_TRUE(keys.contains("headers.header"));
ASSERT_TRUE(keys.contains("headers.header[1]"));
ASSERT_TRUE(keys.contains("headers.header[2]"));
keys = collection->getKeys(2);
ASSERT_EQ(keys.size(), 6);
ASSERT_TRUE(keys.contains("headers.header.name"));
ASSERT_TRUE(keys.contains("headers.header[1].name"));
ASSERT_TRUE(keys.contains("headers.header[2].name"));
ASSERT_TRUE(keys.contains("headers.header.value"));
ASSERT_TRUE(keys.contains("headers.header[1].value"));
ASSERT_TRUE(keys.contains("headers.header[2].value"));
}

View File

@ -78,7 +78,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
LOG_INFO(log, "S3 configuration was updated");
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key);
HeaderCollection headers = auth_settings.headers;
auto headers = auth_settings.headers;
static constexpr size_t s3_max_redirects = 10;
static constexpr bool enable_s3_requests_logging = false;

View File

@ -227,7 +227,7 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
auto settings_config_prefix = config_prefix + ".http";
Poco::Net::HTTPBasicCredentials credentials;
ReadWriteBufferFromHTTP::HTTPHeaderEntries header_entries;
HTTPHeaderEntries header_entries;
String url;
String endpoint;
String format;
@ -246,7 +246,7 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
header_entries.reserve(named_collection->configuration.headers.size());
for (const auto & [key, value] : named_collection->configuration.headers)
header_entries.emplace_back(std::make_tuple(key, value));
header_entries.emplace_back(key, value);
}
else
{
@ -271,7 +271,7 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
{
const auto header_key = config.getString(headers_prefix + "." + key + ".name", "");
const auto header_value = config.getString(headers_prefix + "." + key + ".value", "");
header_entries.emplace_back(std::make_tuple(header_key, header_value));
header_entries.emplace_back(header_key, header_value);
}
}

View File

@ -29,7 +29,7 @@ public:
const std::string format;
const std::string update_field;
const UInt64 update_lag;
const ReadWriteBufferFromHTTP::HTTPHeaderEntries header_entries;
const HTTPHeaderEntries header_entries;
};
HTTPDictionarySource(

View File

@ -74,7 +74,7 @@ std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
0,
buf_size,
read_settings,
ReadWriteBufferFromHTTP::HTTPHeaderEntries{},
HTTPHeaderEntries{},
range,
&context->getRemoteHostFilter(),
/* delay_initialization */true,

View File

@ -0,0 +1,18 @@
#pragma once
#include <string>
namespace DB
{
struct HTTPHeaderEntry
{
std::string name;
std::string value;
HTTPHeaderEntry(const std::string & name_, const std::string & value_) : name(name_), value(value_) {}
inline bool operator==(const HTTPHeaderEntry & other) const { return name == other.name && value == other.value; }
};
using HTTPHeaderEntries = std::vector<HTTPHeaderEntry>;
}

View File

@ -10,6 +10,7 @@
#include <IO/ReadHelpers.h>
#include <IO/ReadSettings.h>
#include <IO/WithFileName.h>
#include <IO/HTTPHeaderEntries.h>
#include <Common/logger_useful.h>
#include <base/sleep.h>
#include <base/types.h>
@ -91,9 +92,6 @@ namespace detail
class ReadWriteBufferFromHTTPBase : public SeekableReadBuffer, public WithFileName, public WithFileSize
{
public:
using HTTPHeaderEntry = std::tuple<std::string, std::string>;
using HTTPHeaderEntries = std::vector<HTTPHeaderEntry>;
/// HTTP range, including right bound [begin, end].
struct Range
{
@ -159,8 +157,8 @@ namespace detail
if (out_stream_callback)
request.setChunkedTransferEncoding(true);
for (auto & http_header_entry : http_header_entries)
request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry));
for (auto & [header, value] : http_header_entries)
request.set(header, value);
if (withPartialContent())
{
@ -319,11 +317,11 @@ namespace detail
auto iter = std::find_if(
http_header_entries.begin(),
http_header_entries.end(),
[&user_agent](const HTTPHeaderEntry & entry) { return std::get<0>(entry) == user_agent; });
[&user_agent](const HTTPHeaderEntry & entry) { return entry.name == user_agent; });
if (iter == http_header_entries.end())
{
http_header_entries.emplace_back(std::make_pair("User-Agent", fmt::format("ClickHouse/{}", VERSION_STRING)));
http_header_entries.emplace_back("User-Agent", fmt::format("ClickHouse/{}", VERSION_STRING));
}
if (!delay_initialization)
@ -779,7 +777,7 @@ public:
UInt64 max_redirects_ = 0,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
ReadSettings settings_ = {},
ReadWriteBufferFromHTTP::HTTPHeaderEntries http_header_entries_ = {},
HTTPHeaderEntries http_header_entries_ = {},
const RemoteHostFilter * remote_host_filter_ = nullptr,
bool delay_initialization_ = true,
bool use_external_buffer_ = false,
@ -851,7 +849,7 @@ private:
UInt64 max_redirects;
size_t buffer_size;
ReadSettings settings;
ReadWriteBufferFromHTTP::HTTPHeaderEntries http_header_entries;
HTTPHeaderEntries http_header_entries;
const RemoteHostFilter * remote_host_filter;
bool delay_initialization;
bool use_external_buffer;

View File

@ -11,8 +11,8 @@
#include <Common/Throttler_fwd.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/HTTPHeaderEntries.h>
#include <IO/S3/SessionAwareIOStream.h>
#include <Storages/HeaderCollection.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/http/HttpClient.h>
@ -51,7 +51,7 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
bool for_disk_s3;
ThrottlerPtr get_request_throttler;
ThrottlerPtr put_request_throttler;
HeaderCollection extra_headers;
HTTPHeaderEntries extra_headers;
void updateSchemeAndRegion();
@ -169,7 +169,7 @@ private:
/// NOTE: DELETE and CANCEL requests are not throttled by either put or get throttler
ThrottlerPtr put_request_throttler;
const HeaderCollection extra_headers;
const HTTPHeaderEntries extra_headers;
};
}

View File

@ -26,6 +26,7 @@
#include <IO/ReadHelpers.h>
#include <IO/ReadSettings.h>
#include <IO/S3Common.h>
#include <IO/HTTPHeaderEntries.h>
#include <Storages/StorageS3Settings.h>
#include "TestPocoHTTPServer.h"
@ -97,7 +98,7 @@ TEST(IOTestAwsS3Client, AppendExtraSSECHeaders)
client_configuration.retryStrategy = std::make_shared<NoRetryStrategy>();
String server_side_encryption_customer_key_base64 = "Kv/gDqdWVGIT4iDqg+btQvV3lc1idlm4WI+MMOyHOAw=";
DB::HeaderCollection headers;
DB::HTTPHeaderEntries headers;
bool use_environment_credentials = false;
bool use_insecure_imds_request = false;

View File

@ -9,6 +9,7 @@
# include <Common/quoteString.h>
# include <IO/WriteBufferFromString.h>
# include <IO/HTTPHeaderEntries.h>
# include <Storages/StorageS3Settings.h>
# include <aws/core/Version.h>
@ -700,7 +701,7 @@ namespace S3
const String & access_key_id,
const String & secret_access_key,
const String & server_side_encryption_customer_key_base64,
HeaderCollection headers,
HTTPHeaderEntries headers,
bool use_environment_credentials,
bool use_insecure_imds_request)
{
@ -916,7 +917,7 @@ AuthSettings AuthSettings::loadFromConfig(const std::string & config_elem, const
if (config.has(config_elem + ".use_insecure_imds_request"))
use_insecure_imds_request = config.getBool(config_elem + ".use_insecure_imds_request");
HeaderCollection headers;
HTTPHeaderEntries headers;
Poco::Util::AbstractConfiguration::Keys subconfig_keys;
config.keys(config_elem, subconfig_keys);
for (const std::string & subkey : subconfig_keys)
@ -927,7 +928,7 @@ AuthSettings AuthSettings::loadFromConfig(const std::string & config_elem, const
auto delimiter = header_str.find(':');
if (delimiter == std::string::npos)
throw Exception("Malformed s3 header value", ErrorCodes::INVALID_CONFIG_PARAMETER);
headers.emplace_back(HttpHeader{header_str.substr(0, delimiter), header_str.substr(delimiter + 1, String::npos)});
headers.emplace_back(header_str.substr(0, delimiter), header_str.substr(delimiter + 1, String::npos));
}
}

View File

@ -1,6 +1,5 @@
#pragma once
#include <Storages/HeaderCollection.h>
#include <IO/S3/PocoHTTPClient.h>
#include <string>
@ -18,6 +17,7 @@
#include <Common/Exception.h>
#include <Common/Throttler_fwd.h>
#include <IO/HTTPHeaderEntries.h>
namespace Aws::S3
{
@ -80,7 +80,7 @@ public:
const String & access_key_id,
const String & secret_access_key,
const String & server_side_encryption_customer_key_base64,
HeaderCollection headers,
DB::HTTPHeaderEntries headers,
bool use_environment_credentials,
bool use_insecure_imds_request);
@ -154,7 +154,7 @@ struct AuthSettings
std::string region;
std::string server_side_encryption_customer_key_base64;
HeaderCollection headers;
DB::HTTPHeaderEntries headers;
std::optional<bool> use_environment_credentials;
std::optional<bool> use_insecure_imds_request;

View File

@ -3,7 +3,7 @@
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
namespace DB

View File

@ -4,7 +4,7 @@
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
namespace DB

View File

@ -3,7 +3,7 @@
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
namespace DB

View File

@ -3,7 +3,7 @@
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/HeaderCollection.h>
#include <IO/HTTPHeaderEntries.h>
namespace DB
@ -109,7 +109,7 @@ struct URLBasedDataSourceConfiguration
String user;
String password;
HeaderCollection headers;
HTTPHeaderEntries headers;
String http_method;
void set(const URLBasedDataSourceConfiguration & conf);

View File

@ -1,18 +0,0 @@
#pragma once
#include <string>
namespace DB
{
struct HttpHeader
{
std::string name;
std::string value;
HttpHeader(const std::string & name_, const std::string & value_) : name(name_), value(value_) {}
inline bool operator==(const HttpHeader & other) const { return name == other.name && value == other.value; }
};
using HeaderCollection = std::vector<HttpHeader>;
}

View File

@ -1,18 +0,0 @@
#pragma once
#include <Parsers/IAST_fwd.h>
#include <Storages/NamedCollections/NamedCollections_fwd.h>
#include <unordered_set>
#include <string_view>
namespace DB
{
NamedCollectionPtr tryGetNamedCollectionWithOverrides(ASTs asts);
void validateNamedCollection(
const NamedCollection & collection,
const std::unordered_set<std::string_view> & required_keys,
const std::unordered_set<std::string_view> & optional_keys);
}

View File

@ -1,5 +1,5 @@
#include "NamedCollectionsHelpers.h"
#include <Storages/NamedCollections/NamedCollections.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Parsers/ASTIdentifier.h>
@ -83,12 +83,30 @@ NamedCollectionPtr tryGetNamedCollectionWithOverrides(ASTs asts)
void validateNamedCollection(
const NamedCollection & collection,
const std::unordered_set<std::string_view> & required_keys,
const std::unordered_set<std::string_view> & optional_keys)
const std::unordered_set<std::string_view> & optional_keys,
const std::vector<std::regex> & optional_regex_keys)
{
const auto & keys = collection.getKeys();
auto required_keys_copy = required_keys;
for (const auto & key : keys)
{
if (!required_keys.contains(key) && !optional_keys.contains(key))
auto it = required_keys_copy.find(key);
if (it != required_keys_copy.end())
{
required_keys_copy.erase(it);
continue;
}
if (optional_keys.contains(key))
continue;
auto match = std::find_if(
optional_regex_keys.begin(), optional_regex_keys.end(),
[&](const std::regex & regex) { return std::regex_search(key, regex); })
!= optional_regex_keys.end();
if (!match)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
@ -97,16 +115,22 @@ void validateNamedCollection(
}
}
for (const auto & key : required_keys)
if (!required_keys_copy.empty())
{
if (!keys.contains(key))
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Key `{}` is required, but not specified. Required keys: {}, optional keys: {}",
key, fmt::join(required_keys, ", "), fmt::join(optional_keys, ", "));
}
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Required keys ({}) are not specified. All required keys: {}, optional keys: {}",
fmt::join(required_keys_copy, ", "), fmt::join(required_keys, ", "), fmt::join(optional_keys, ", "));
}
}
HTTPHeaderEntries getHeadersFromNamedCollection(const NamedCollection & collection)
{
HTTPHeaderEntries headers;
auto keys = collection.getKeys(0, "headers");
for (const auto & key : keys)
headers.emplace_back(collection.get<String>(key + ".name"), collection.get<String>(key + ".value"));
return headers;
}
}

View File

@ -0,0 +1,23 @@
#pragma once
#include <Parsers/IAST_fwd.h>
#include <IO/HTTPHeaderEntries.h>
#include <Common/NamedCollections/NamedCollections_fwd.h>
#include <unordered_set>
#include <string_view>
#include <regex>
namespace DB
{
NamedCollectionPtr tryGetNamedCollectionWithOverrides(ASTs asts);
void validateNamedCollection(
const NamedCollection & collection,
const std::unordered_set<std::string_view> & required_keys,
const std::unordered_set<std::string_view> & optional_keys,
const std::vector<std::regex> & optional_regex_keys = {});
HTTPHeaderEntries getHeadersFromNamedCollection(const NamedCollection & collection);
}

View File

@ -28,8 +28,8 @@
#include <Storages/getVirtualsForStorage.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageURL.h>
#include <Storages/NamedCollections/NamedCollectionsHelpers.h>
#include <Storages/NamedCollections/NamedCollections.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Storages/ReadFromStorageProgress.h>
#include <IO/ReadBufferFromS3.h>
@ -1200,48 +1200,27 @@ void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration
void StorageS3::processNamedCollectionResult(StorageS3Configuration & configuration, const NamedCollection & collection)
{
validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys);
std::string filename;
for (const auto & key : collection)
{
if (key == "url")
configuration.url = collection.get<String>(key);
else if (key == "access_key_id")
configuration.auth_settings.access_key_id = collection.get<String>(key);
else if (key == "secret_access_key")
configuration.auth_settings.secret_access_key = collection.get<String>(key);
else if (key == "filename")
filename = collection.get<String>(key);
else if (key == "format")
configuration.format = collection.get<String>(key);
else if (key == "compression" || key == "compression_method")
configuration.compression_method = collection.get<String>(key);
else if (key == "structure")
configuration.structure = collection.get<String>(key);
else if (key == "use_environment_credentials")
configuration.auth_settings.use_environment_credentials = collection.get<UInt64>(key);
else if (key == "max_single_read_retries")
configuration.request_settings.max_single_read_retries = collection.get<UInt64>(key);
else if (key == "min_upload_part_size")
configuration.request_settings.min_upload_part_size = collection.get<UInt64>(key);
else if (key == "upload_part_size_multiply_factor")
configuration.request_settings.upload_part_size_multiply_factor = collection.get<UInt64>(key);
else if (key == "upload_part_size_multiply_parts_count_threshold")
configuration.request_settings.upload_part_size_multiply_parts_count_threshold = collection.get<UInt64>(key);
else if (key == "max_single_part_upload_size")
configuration.request_settings.max_single_part_upload_size = collection.get<UInt64>(key);
else if (key == "max_connections")
configuration.request_settings.max_connections = collection.get<UInt64>(key);
else
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Unknown configuration key `{}` for StorageS3, "
"expected: url, [access_key_id, secret_access_key], "
"name of used format and [compression_method].",
key);
}
configuration.url = collection.get<String>("url");
auto filename = collection.getOrDefault<String>("filename", "");
if (!filename.empty())
configuration.url = std::filesystem::path(configuration.url) / filename;
configuration.auth_settings.access_key_id = collection.getOrDefault<String>("access_key_id", "");
configuration.auth_settings.secret_access_key = collection.getOrDefault<String>("secret_access_key", "");
configuration.auth_settings.use_environment_credentials = collection.getOrDefault<UInt64>("use_environment_credentials", 0);
configuration.format = collection.getOrDefault<String>("format", "auto");
configuration.compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
configuration.structure = collection.getOrDefault<String>("structure", "auto");
configuration.request_settings.max_single_read_retries = collection.getOrDefault<UInt64>("max_single_read_retries", 0);
configuration.request_settings.min_upload_part_size = collection.getOrDefault<UInt64>("min_upload_part_size", 0);
configuration.request_settings.upload_part_size_multiply_factor = collection.getOrDefault<UInt64>("upload_part_size_multiply_factor", 0);
configuration.request_settings.upload_part_size_multiply_parts_count_threshold = collection.getOrDefault<UInt64>("upload_part_size_multiply_parts_count_threshold", 0);
configuration.request_settings.max_single_part_upload_size = collection.getOrDefault<UInt64>("max_single_part_upload_size", 0);
configuration.request_settings.max_connections = collection.getOrDefault<UInt64>("max_connections", 0);
}
StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPtr local_context)
@ -1267,7 +1246,7 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
"Storage S3 requires 1 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto header_it = StorageURL::collectHeaders(engine_args, configuration, local_context);
auto header_it = StorageURL::collectHeaders(engine_args, configuration.headers, local_context);
if (header_it != engine_args.end())
engine_args.erase(header_it);

View File

@ -233,13 +233,13 @@ public:
bool static_configuration = true;
/// Headers from ast is a part of static configuration.
HeaderCollection headers_from_ast;
HTTPHeaderEntries headers_from_ast;
S3Configuration(
const String & url_,
const S3::AuthSettings & auth_settings_,
const S3Settings::RequestSettings & request_settings_,
const HeaderCollection & headers_from_ast_)
const HTTPHeaderEntries & headers_from_ast_)
: uri(S3::URI(url_))
, auth_settings(auth_settings_)
, request_settings(request_settings_)

View File

@ -8,7 +8,6 @@
#include <base/types.h>
#include <Interpreters/Context_fwd.h>
#include <Common/Throttler_fwd.h>
#include <Storages/HeaderCollection.h>
#include <IO/S3Common.h>

View File

@ -2,6 +2,7 @@
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Storages/PartitionedSink.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/threadPoolCallbackRunner.h>
@ -27,6 +28,7 @@
#include <Common/ThreadStatus.h>
#include <Common/parseRemoteDescription.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadWriteBufferFromHTTP.h>
@ -34,6 +36,7 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/logger_useful.h>
#include <Poco/Net/HTTPRequest.h>
#include <regex>
namespace DB
@ -50,6 +53,27 @@ static constexpr auto bad_arguments_error_message = "Storage URL requires 1-4 ar
"url, name of used format (taken from file extension by default), "
"optional compression method, optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
static const std::unordered_set<std::string_view> required_configuration_keys = {
"url",
};
static const std::unordered_set<std::string_view> optional_configuration_keys = {
"format",
"compression",
"compression_method",
"structure",
"filename",
"headers.header.name",
"headers.header.value",
};
/// Headers in config file will have structure "headers.header.name" and "headers.header.value".
/// But Poco::AbstractConfiguration converts them into "header", "header[1]", "header[2]".
static const std::vector<std::regex> optional_regex_keys = {
std::regex("headers.header\\[[\\d]*\\].name"),
std::regex("headers.header\\[[\\d]*\\].value"),
};
static bool urlWithGlobs(const String & uri)
{
return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) || uri.find('|') != std::string::npos;
@ -66,7 +90,7 @@ IStorageURLBase::IStorageURLBase(
const ConstraintsDescription & constraints_,
const String & comment,
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_,
const HTTPHeaderEntries & headers_,
const String & http_method_,
ASTPtr partition_by_)
: IStorage(table_id_)
@ -97,9 +121,9 @@ IStorageURLBase::IStorageURLBase(
namespace
{
ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders(const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
HTTPHeaderEntries getHeaders(const HTTPHeaderEntries & headers_)
{
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers(headers_.begin(), headers_.end());
HTTPHeaderEntries headers(headers_.begin(), headers_.end());
// Propagate OpenTelemetry trace context, if any, downstream.
const auto &current_trace_context = OpenTelemetry::CurrentContext();
@ -165,7 +189,7 @@ namespace
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method,
size_t download_threads,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
const HTTPHeaderEntries & headers_ = {},
const URIParams & params = {},
bool glob_url = false)
: ISource(sample_block), name(std::move(name_)), uri_info(uri_info_)
@ -246,7 +270,7 @@ namespace
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method,
Poco::Net::HTTPBasicCredentials & credentials,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
const HTTPHeaderEntries & headers,
bool glob_url,
bool delay_initialization,
size_t download_threads)
@ -567,7 +591,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
const String & format,
const String & uri,
CompressionMethod compression_method,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
const HTTPHeaderEntries & headers,
const std::optional<FormatSettings> & format_settings,
ContextPtr context)
{
@ -816,7 +840,7 @@ SchemaCache & IStorageURLBase::getSchemaCache(const ContextPtr & context)
std::optional<ColumnsDescription> IStorageURLBase::tryGetColumnsFromCache(
const Strings & urls,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
const HTTPHeaderEntries & headers,
const Poco::Net::HTTPBasicCredentials & credentials,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
@ -859,7 +883,7 @@ void IStorageURLBase::addColumnsToCache(
std::optional<time_t> IStorageURLBase::getLastModificationTime(
const String & url,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
const HTTPHeaderEntries & headers,
const Poco::Net::HTTPBasicCredentials & credentials,
const ContextPtr & context)
{
@ -901,7 +925,7 @@ StorageURL::StorageURL(
const String & comment,
ContextPtr context_,
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_,
const HTTPHeaderEntries & headers_,
const String & http_method_,
ASTPtr partition_by_)
: IStorageURLBase(
@ -978,15 +1002,18 @@ FormatSettings StorageURL::getFormatSettingsFromArgs(const StorageFactory::Argum
}
ASTs::iterator StorageURL::collectHeaders(
ASTs & url_function_args, URLBasedDataSourceConfiguration & configuration, ContextPtr context)
ASTs & url_function_args, HTTPHeaderEntries & header_entries, ContextPtr context)
{
ASTs::iterator headers_it = url_function_args.end();
for (auto arg_it = url_function_args.begin(); arg_it != url_function_args.end(); ++arg_it)
{
const auto * headers_ast_function = (*arg_it)->as<ASTFunction>();
if (headers_ast_function && headers_ast_function->name == "headers")
if (headers_ast_function)
{
if (headers_ast_function->name != "headers")
continue;
if (headers_it != url_function_args.end())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
@ -1021,7 +1048,7 @@ ASTs::iterator StorageURL::collectHeaders(
if (arg_value.getType() != Field::Types::Which::String)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as header value");
configuration.headers.emplace_back(arg_name, arg_value.safeGet<String>());
header_entries.emplace_back(arg_name, arg_value.safeGet<String>());
}
headers_it = arg_it;
@ -1035,40 +1062,41 @@ ASTs::iterator StorageURL::collectHeaders(
return headers_it;
}
URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, ContextPtr local_context)
void StorageURL::processNamedCollectionResult(Configuration & configuration, const NamedCollection & collection)
{
URLBasedDataSourceConfiguration configuration;
validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys, optional_regex_keys);
if (auto named_collection = getURLBasedDataSourceConfiguration(args, local_context))
configuration.url = collection.get<String>("url");
configuration.headers = getHeadersFromNamedCollection(collection);
configuration.http_method = collection.getOrDefault<String>("http_method", "");
if (!configuration.http_method.empty() && configuration.http_method != Poco::Net::HTTPRequest::HTTP_POST
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_PUT)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Http method can be POST or PUT (current: {}). For insert default is POST, for select GET",
configuration.http_method);
configuration.format = collection.getOrDefault<String>("format", "auto");
configuration.compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
configuration.structure = collection.getOrDefault<String>("structure", "auto");
}
StorageURL::Configuration StorageURL::getConfiguration(ASTs & args, ContextPtr local_context)
{
StorageURL::Configuration configuration;
if (auto named_collection = tryGetNamedCollectionWithOverrides(args))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
if (!configuration.http_method.empty() && configuration.http_method != Poco::Net::HTTPRequest::HTTP_POST
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_PUT)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Http method can be POST or PUT (current: {}). For insert default is POST, for select GET",
configuration.http_method);
if (!storage_specific_args.empty())
{
String illegal_args;
for (const auto & arg : storage_specific_args)
{
if (!illegal_args.empty())
illegal_args += ", ";
illegal_args += arg.first;
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown argument `{}` for storage URL", illegal_args);
}
StorageURL::processNamedCollectionResult(configuration, *named_collection);
collectHeaders(args, configuration.headers, local_context);
}
else
{
if (args.empty() || args.size() > 3)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, bad_arguments_error_message);
auto header_it = collectHeaders(args, configuration, local_context);
auto header_it = collectHeaders(args, configuration.headers, local_context);
if (header_it != args.end())
args.erase(header_it);
@ -1082,6 +1110,12 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
if (configuration.format == "auto")
configuration.format = FormatFactory::instance().getFormatFromFileName(Poco::URI(configuration.url).getPath(), true);
for (const auto & [header, value] : configuration.headers)
{
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
}
return configuration;
}
@ -1096,14 +1130,6 @@ void registerStorageURL(StorageFactory & factory)
auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext());
auto format_settings = StorageURL::getFormatSettingsFromArgs(args);
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
{
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(header, value);
}
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
@ -1118,7 +1144,7 @@ void registerStorageURL(StorageFactory & factory)
args.comment,
args.getContext(),
configuration.compression_method,
headers,
configuration.headers,
configuration.http_method,
partition_by);
},

View File

@ -1,11 +1,12 @@
#pragma once
#include <Storages/IStorage.h>
#include <Poco/URI.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Formats/FormatSettings.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/HTTPHeaderEntries.h>
#include <Storages/IStorage.h>
#include <Storages/StorageFactory.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/Cache/SchemaCache.h>
@ -18,6 +19,7 @@ class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
struct ConnectionTimeouts;
class NamedCollection;
/**
* This class represents table engine for external urls.
@ -45,7 +47,7 @@ public:
const String & format,
const String & uri,
CompressionMethod compression_method,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
const HTTPHeaderEntries & headers,
const std::optional<FormatSettings> & format_settings,
ContextPtr context);
@ -62,7 +64,7 @@ protected:
const ConstraintsDescription & constraints_,
const String & comment,
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
const HTTPHeaderEntries & headers_ = {},
const String & method_ = "",
ASTPtr partition_by = nullptr);
@ -74,7 +76,7 @@ protected:
// For `url` table function, we use settings from current query context.
// In this case, format_settings is not set.
std::optional<FormatSettings> format_settings;
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
HTTPHeaderEntries headers;
String http_method; /// For insert can choose Put instead of default Post.
ASTPtr partition_by;
@ -103,7 +105,7 @@ private:
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const Strings & urls,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
const HTTPHeaderEntries & headers,
const Poco::Net::HTTPBasicCredentials & credentials,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
@ -118,7 +120,7 @@ private:
static std::optional<time_t> getLastModificationTime(
const String & url,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
const HTTPHeaderEntries & headers,
const Poco::Net::HTTPBasicCredentials & credentials,
const ContextPtr & context);
};
@ -163,7 +165,7 @@ public:
const String & comment,
ContextPtr context_,
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
const HTTPHeaderEntries & headers_ = {},
const String & method_ = "",
ASTPtr partition_by_ = nullptr);
@ -179,9 +181,23 @@ public:
static FormatSettings getFormatSettingsFromArgs(const StorageFactory::Arguments & args);
static URLBasedDataSourceConfiguration getConfiguration(ASTs & args, ContextPtr context);
struct Configuration
{
std::string url;
std::string http_method;
static ASTs::iterator collectHeaders(ASTs & url_function_args, URLBasedDataSourceConfiguration & configuration, ContextPtr context);
std::string format = "auto";
std::string compression_method = "auto";
std::string structure = "auto";
HTTPHeaderEntries headers;
};
static Configuration getConfiguration(ASTs & args, ContextPtr context);
static ASTs::iterator collectHeaders(ASTs & url_function_args, HTTPHeaderEntries & header_entries, ContextPtr context);
static void processNamedCollectionResult(Configuration & configuration, const NamedCollection & collection);
};

View File

@ -8,7 +8,7 @@
#include <Access/Common/AccessType.h>
#include <Access/Common/AccessFlags.h>
#include <Columns/ColumnMap.h>
#include <Storages/NamedCollections/NamedCollections.h>
#include <Common/NamedCollections/NamedCollections.h>
namespace DB

View File

@ -33,7 +33,7 @@ void TableFunctionDeltaLake::parseArgumentsImpl(
if (args.empty() || args.size() > 6)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, error_message);
auto header_it = StorageURL::collectHeaders(args, base_configuration, context);
auto header_it = StorageURL::collectHeaders(args, base_configuration.headers, context);
if (header_it != args.end())
args.erase(header_it);

View File

@ -33,7 +33,7 @@ void TableFunctionHudi::parseArgumentsImpl(
if (args.empty() || args.size() > 6)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, error_message);
auto header_it = StorageURL::collectHeaders(args, base_configuration, context);
auto header_it = StorageURL::collectHeaders(args, base_configuration.headers, context);
if (header_it != args.end())
args.erase(header_it);

View File

@ -13,7 +13,7 @@
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Storages/NamedCollections/NamedCollectionsHelpers.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Formats/FormatFactory.h>
#include "registerTableFunctions.h"
#include <filesystem>
@ -40,7 +40,7 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
if (args.empty() || args.size() > 6)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, error_message);
auto header_it = StorageURL::collectHeaders(args, s3_configuration, context);
auto header_it = StorageURL::collectHeaders(args, s3_configuration.headers, context);
if (header_it != args.end())
args.erase(header_it);

View File

@ -6,8 +6,8 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageURL.h>
#include <Storages/StorageExternalDistributed.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Interpreters/Context.h>
@ -16,67 +16,48 @@
namespace DB
{
static const String bad_arguments_error_message = "Table function URL can have the following arguments: "
"url, name of used format (taken from file extension by default), "
"optional table structure, optional compression method, "
"optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
void TableFunctionURL::parseArguments(const ASTPtr & ast_function, ContextPtr context)
void TableFunctionURL::parseArguments(const ASTPtr & ast, ContextPtr context)
{
const auto & func_args = ast_function->as<ASTFunction &>();
if (!func_args.arguments)
throw Exception("Table function 'URL' must have arguments.", ErrorCodes::BAD_ARGUMENTS);
const auto & ast_function = assert_cast<const ASTFunction *>(ast.get());
if (auto with_named_collection = getURLBasedDataSourceConfiguration(func_args.arguments->children, context))
auto & args = ast_function->children;
if (args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, bad_arguments_error_message);
auto & url_function_args = assert_cast<ASTExpressionList *>(args[0].get())->children;
if (auto named_collection = tryGetNamedCollectionWithOverrides(url_function_args))
{
auto [common_configuration, storage_specific_args] = with_named_collection.value();
configuration.set(common_configuration);
if (!configuration.http_method.empty()
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_POST
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_PUT)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Method can be POST or PUT (current: {}). For insert default is POST, for select GET",
configuration.http_method);
if (!storage_specific_args.empty())
{
String illegal_args;
for (const auto & arg : storage_specific_args)
{
if (!illegal_args.empty())
illegal_args += ", ";
illegal_args += arg.first;
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown argument `{}` for table function URL", illegal_args);
}
StorageURL::processNamedCollectionResult(configuration, *named_collection);
filename = configuration.url;
structure = configuration.structure;
compression_method = configuration.compression_method;
format = configuration.format;
if (format == "auto")
format = FormatFactory::instance().getFormatFromFileName(Poco::URI(filename).getPath(), true);
structure = configuration.structure;
compression_method = configuration.compression_method;
StorageURL::collectHeaders(url_function_args, configuration.headers, context);
}
else
{
String bad_arguments_error_message = "Table function URL can have the following arguments: "
"url, name of used format (taken from file extension by default), "
"optional table structure, optional compression method, optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
auto & args = ast_function->children;
if (args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, bad_arguments_error_message);
auto * url_function_args_expr = assert_cast<ASTExpressionList *>(args[0].get());
auto & url_function_args = url_function_args_expr->children;
auto headers_it = StorageURL::collectHeaders(url_function_args, configuration, context);
auto headers_it = StorageURL::collectHeaders(url_function_args, configuration.headers, context);
/// ITableFunctionFileLike cannot parse headers argument, so remove it.
if (headers_it != url_function_args.end())
url_function_args.erase(headers_it);
ITableFunctionFileLike::parseArguments(ast_function, context);
ITableFunctionFileLike::parseArguments(ast, context);
}
}
@ -94,22 +75,10 @@ StoragePtr TableFunctionURL::getStorage(
String{},
global_context,
compression_method_,
getHeaders(),
configuration.headers,
configuration.http_method);
}
ReadWriteBufferFromHTTP::HTTPHeaderEntries TableFunctionURL::getHeaders() const
{
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
{
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(header, value);
}
return headers;
}
ColumnsDescription TableFunctionURL::getActualTableStructure(ContextPtr context) const
{
if (structure == "auto")
@ -118,7 +87,7 @@ ColumnsDescription TableFunctionURL::getActualTableStructure(ContextPtr context)
return StorageURL::getTableStructureFromData(format,
filename,
chooseCompressionMethod(Poco::URI(filename).getPath(), compression_method),
getHeaders(),
configuration.headers,
std::nullopt,
context);
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <TableFunctions/ITableFunctionFileLike.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/StorageURL.h>
#include <IO/ReadWriteBufferFromHTTP.h>
@ -24,19 +24,18 @@ public:
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
protected:
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
void parseArguments(const ASTPtr & ast, ContextPtr context) override;
private:
StoragePtr getStorage(
const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context,
const std::string & table_name, const String & compression_method_) const override;
const char * getStorageTypeName() const override { return "URL"; }
String getFormatFromFirstArgument() override;
ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders() const;
URLBasedDataSourceConfiguration configuration;
StorageURL::Configuration configuration;
};
}

View File

@ -18,7 +18,11 @@
<headers>
<header>
<name>X-ClickHouse-Format</name>
<value>JSONEachRow</value>
<value>Vertical</value>
</header>
<header>
<name>X-ClickHouse-Database</name>
<value>non_existing_database</value>
</header>
</headers>
</url_with_headers>

View File

@ -1,3 +1,6 @@
{"12":12}\n
Row 1:\n──────\n12: 12\n
{"12":12}\n
12\n
12\n
{"12":12}\n
{"12":12}\n

View File

@ -1,5 +1,10 @@
select * from url(url_with_headers, url='http://127.0.0.1:8123?query=select+12', format='RawBLOB');
select * from url('http://127.0.0.1:8123?query=select+12', 'RawBLOB', headers('X-ClickHouse-Format'='JSONEachRow'));
select * from url(url_with_headers, url='http://127.0.0.1:8123?query=select+12', format='RawBLOB'); -- { serverError 86 }
select * from url(url_with_headers, url='http://127.0.0.1:8123?query=select+12', format='RawBLOB', headers('X-ClickHouse-Database'='default'));
select * from url(url_with_headers, url='http://127.0.0.1:8123?query=select+12', format='RawBLOB', headers('X-ClickHouse-Database'='default', 'X-ClickHouse-Format'='JSONEachRow'));
select * from url(url_with_headers, url='http://127.0.0.1:8123?query=select+12', format='RawBLOB', headers('X-ClickHouse-Database'='kek')); -- { serverError 86 }
select * from url('http://127.0.0.1:8123?query=select+12', 'RawBLOB');
select * from url('http://127.0.0.1:8123?query=select+12', 'RawBLOB', headers('X-ClickHouse-Database'='default'));
select * from url('http://127.0.0.1:8123?query=select+12', 'RawBLOB', headers('X-ClickHouse-Database'='default', 'X-ClickHouse-Format'='JSONEachRow'));
select * from url('http://127.0.0.1:8123?query=select+12', 'RawBLOB', headers('X-ClickHouse-Format'='JSONEachRow', 'X-ClickHouse-Database'='kek')); -- { serverError 86 }
select * from url('http://127.0.0.1:8123?query=select+12', 'RawBLOB', headers('X-ClickHouse-Format'='JSONEachRow', 'X-ClickHouse-Database'=1)); -- { serverError 36 }
drop table if exists url;