Partitioned write part 2

kssenii 2021-10-26 12:31:01 +03:00
parent 1d743b9259
commit 2ba3ee830e
14 changed files with 283 additions and 191 deletions
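For context, a minimal usage sketch of what this partitioned write path enables, reconstructed from the integration test and named collection added at the bottom of this diff (the url1 collection and the nginx endpoint are test fixtures from this change, not general defaults):

-- url1 is the named collection defined in configs/conf.xml below; its URL contains the
-- {_partition_id} wildcard, which the partitioned sink replaces with the value of the
-- PARTITION BY expression, so each partition is written to its own path.
INSERT INTO TABLE FUNCTION url(url1) PARTITION BY column3 VALUES (1, 2, 3), (3, 2, 1), (1, 3, 2);

-- Rows with column3 = 1 (here the row (3, 2, 1)) are written to http://nginx:80/test_1:
SELECT * FROM url('http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32');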

View File

@ -259,6 +259,7 @@ void URLBasedDataSourceConfiguration::set(const URLBasedDataSourceConfiguration
format = conf.format;
compression_method = conf.compression_method;
structure = conf.structure;
method = conf.method;
}
@ -286,6 +287,18 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
{
configuration.url = config.getString(config_prefix + ".url", "");
}
else if (key == "method")
{
configuration.method = config.getString(config_prefix + ".method", "");
}
else if (key == "format")
{
configuration.format = config.getString(config_prefix + ".format", "");
}
else if (key == "structure")
{
configuration.structure = config.getString(config_prefix + ".structure", "");
}
else if (key == "headers")
{
Poco::Util::AbstractConfiguration::Keys header_keys;
@ -319,6 +332,8 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
if (arg_name == "url")
configuration.url = arg_value.safeGet<String>();
if (arg_name == "method")
configuration.method = arg_value.safeGet<String>();
else if (arg_name == "format")
configuration.format = arg_value.safeGet<String>();
else if (arg_name == "compression_method")

View File

@ -93,6 +93,7 @@ struct URLBasedDataSourceConfiguration
String structure;
std::vector<std::pair<String, Field>> headers;
String method;
void set(const URLBasedDataSourceConfiguration & conf);
};

View File

@ -374,7 +374,6 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP
if (is_partitioned_implementation)
{
std::cerr << "partitioned implementation\n";
return std::make_shared<PartitionedHDFSSink>(
insert_query->partition_by,
uri,
@ -385,7 +384,6 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP
}
else
{
std::cerr << "non partitioned implementation\n";
return std::make_shared<HDFSSink>(uri,
format_name,
metadata_snapshot->getSampleBlock(),

View File

@ -159,9 +159,8 @@ StorageExternalDistributed::StorageExternalDistributed(
}
else
{
Poco::URI uri(url_description);
shard = std::make_shared<StorageURL>(
uri, table_id, format_name, format_settings, columns, constraints, String{}, context, compression_method);
url_description, table_id, format_name, format_settings, columns, constraints, String{}, context, compression_method);
LOG_DEBUG(&Poco::Logger::get("StorageURLDistributed"), "Adding URL: {}", url_description);
}

View File

@ -22,6 +22,7 @@
#include <Storages/StorageFactory.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/PartitionedSink.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
@ -353,7 +354,7 @@ private:
};
class PartitionedStorageS3Sink : public SinkToStorage
class PartitionedStorageS3Sink : public PartitionedSink
{
public:
PartitionedStorageS3Sink(
@ -368,7 +369,7 @@ public:
const String & key_,
size_t min_upload_part_size_,
size_t max_single_part_upload_size_)
: SinkToStorage(sample_block_)
: PartitionedSink(partition_by, context_, sample_block_)
, format(format_)
, sample_block(sample_block_)
, context(context_)
@ -380,74 +381,36 @@ public:
, max_single_part_upload_size(max_single_part_upload_size_)
, format_settings(format_settings_)
{
std::vector<ASTPtr> arguments(1, partition_by);
ASTPtr partition_by_string = makeASTFunction(FunctionToString::name, std::move(arguments));
auto syntax_result = TreeRewriter(context).analyze(partition_by_string, sample_block.getNamesAndTypesList());
partition_by_expr = ExpressionAnalyzer(partition_by_string, syntax_result, context).getActions(false);
partition_by_column_name = partition_by_string->getColumnName();
}
String getName() const override { return "PartitionedStorageS3Sink"; }
void consume(Chunk chunk) override
SinkPtr createSinkForPartition(const String & partition_id) override
{
const auto & columns = chunk.getColumns();
auto partition_bucket = replaceWildcards(bucket, partition_id);
validateBucket(partition_bucket);
Block block_with_partition_by_expr = sample_block.cloneWithoutColumns();
block_with_partition_by_expr.setColumns(columns);
partition_by_expr->execute(block_with_partition_by_expr);
auto partition_key = replaceWildcards(key, partition_id);
validateKey(partition_key);
const auto * column = block_with_partition_by_expr.getByName(partition_by_column_name).column.get();
std::unordered_map<String, size_t> sub_chunks_indices;
IColumn::Selector selector;
for (size_t row = 0; row < chunk.getNumRows(); ++row)
{
auto value = column->getDataAt(row);
auto [it, inserted] = sub_chunks_indices.emplace(value, sub_chunks_indices.size());
selector.push_back(it->second);
}
Chunks sub_chunks;
sub_chunks.reserve(sub_chunks_indices.size());
for (size_t column_index = 0; column_index < columns.size(); ++column_index)
{
MutableColumns column_sub_chunks = columns[column_index]->scatter(sub_chunks_indices.size(), selector);
if (column_index == 0) /// Set sizes for sub-chunks.
{
for (const auto & column_sub_chunk : column_sub_chunks)
{
sub_chunks.emplace_back(Columns(), column_sub_chunk->size());
}
}
for (size_t sub_chunk_index = 0; sub_chunk_index < column_sub_chunks.size(); ++sub_chunk_index)
{
sub_chunks[sub_chunk_index].addColumn(std::move(column_sub_chunks[sub_chunk_index]));
}
}
for (const auto & [partition_id, sub_chunk_index] : sub_chunks_indices)
{
getSinkForPartition(partition_id)->consume(std::move(sub_chunks[sub_chunk_index]));
}
}
void onFinish() override
{
for (auto & [partition_id, sink] : sinks)
{
sink->onFinish();
}
return std::make_shared<StorageS3Sink>(
format,
sample_block,
context,
format_settings,
compression_method,
client,
partition_bucket,
partition_key,
min_upload_part_size,
max_single_part_upload_size
);
}
private:
using SinkPtr = std::shared_ptr<StorageS3Sink>;
const String format;
const Block sample_block;
ContextPtr context;
const CompressionMethod compression_method;
std::shared_ptr<Aws::S3::S3Client> client;
const String bucket;
const String key;
@ -458,41 +421,6 @@ private:
ExpressionActionsPtr partition_by_expr;
String partition_by_column_name;
std::unordered_map<String, SinkPtr> sinks;
static String replaceWildcards(const String & haystack, const String & partition_id)
{
return boost::replace_all_copy(haystack, PARTITION_ID_WILDCARD, partition_id);
}
SinkPtr getSinkForPartition(const String & partition_id)
{
auto it = sinks.find(partition_id);
if (it == sinks.end())
{
auto partition_bucket = replaceWildcards(bucket, partition_id);
validateBucket(partition_bucket);
auto partition_key = replaceWildcards(key, partition_id);
validateKey(partition_key);
std::tie(it, std::ignore) = sinks.emplace(partition_id, std::make_shared<StorageS3Sink>(
format,
sample_block,
context,
format_settings,
compression_method,
client,
partition_bucket,
partition_key,
min_upload_part_size,
max_single_part_upload_size
));
}
return it->second;
}
static void validateBucket(const String & str)
{
S3::URI::validateBucket(str, {});
@ -517,21 +445,6 @@ private:
validatePartitionKey(str, true);
}
static void validatePartitionKey(const StringRef & str, bool allow_slash)
{
for (const char * i = str.data; i != str.data + str.size; ++i)
{
if (static_cast<UInt8>(*i) < 0x20 || *i == '{' || *i == '}' || *i == '*' || *i == '?' || (!allow_slash && *i == '/'))
{
/// Need to convert to UInt32 because UInt8 can't be passed to format due to "mixing character types is disallowed".
UInt32 invalid_char_byte = static_cast<UInt32>(static_cast<UInt8>(*i));
throw DB::Exception(
ErrorCodes::CANNOT_PARSE_TEXT, "Illegal character '\\x{:02x}' in partition id starting with '{}'",
invalid_char_byte, StringRef(str.data, i - str.data));
}
}
}
};

View File

@ -4,6 +4,7 @@
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTInsertQuery.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromHTTP.h>
@ -15,7 +16,9 @@
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <Common/parseRemoteDescription.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Storages/PartitionedSink.h>
#include <Poco/Net/HTTPRequest.h>
#include <Processors/Sources/SourceWithProgress.h>
@ -36,7 +39,7 @@ namespace ErrorCodes
IStorageURLBase::IStorageURLBase(
const Poco::URI & uri_,
const String & uri_,
ContextPtr /*context_*/,
const StorageID & table_id_,
const String & format_name_,
@ -45,8 +48,15 @@ IStorageURLBase::IStorageURLBase(
const ConstraintsDescription & constraints_,
const String & comment,
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
: IStorage(table_id_), uri(uri_), compression_method(compression_method_), format_name(format_name_), format_settings(format_settings_), headers(headers_)
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_,
const String & method_)
: IStorage(table_id_)
, uri(uri_)
, compression_method(compression_method_)
, format_name(format_name_)
, format_settings(format_settings_)
, headers(headers_)
, method(method_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@ -80,13 +90,14 @@ namespace
return headers;
}
class StorageURLSource : public SourceWithProgress
{
using URIParams = std::vector<std::pair<String, String>>;
public:
StorageURLSource(
const std::vector<Poco::URI> & uri_options,
const std::vector<String> & uri_options,
const std::string & method,
std::function<void(std::ostream &)> callback,
const String & format,
@ -109,7 +120,7 @@ namespace
WriteBufferFromOwnString error_message;
for (auto option = uri_options.begin(); option < uri_options.end(); ++option)
{
auto request_uri = *option;
auto request_uri = Poco::URI(*option);
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
@ -136,7 +147,7 @@ namespace
if (option == uri_options.end() - 1)
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri options are unreachable. {}", error_message.str());
error_message << option->toString() << " error: " << getCurrentExceptionMessage(false) << "\n";
error_message << *option << " error: " << getCurrentExceptionMessage(false) << "\n";
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
@ -192,17 +203,18 @@ namespace
}
StorageURLSink::StorageURLSink(
const Poco::URI & uri,
const String & uri,
const String & format,
const std::optional<FormatSettings> & format_settings,
const Block & sample_block,
ContextPtr context,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
const CompressionMethod compression_method,
const String & method)
: SinkToStorage(sample_block)
{
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts),
std::make_unique<WriteBufferFromHTTP>(Poco::URI(uri), method, timeouts),
compression_method, 3);
writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block,
context, {} /* write callback */, format_settings);
@ -227,6 +239,50 @@ void StorageURLSink::onFinish()
write_buf->finalize();
}
class PartitionedStorageURLSink : public PartitionedSink
{
public:
PartitionedStorageURLSink(
const ASTPtr & partition_by,
const String & uri_,
const String & format_,
const std::optional<FormatSettings> & format_settings_,
const Block & sample_block_,
ContextPtr context_,
const ConnectionTimeouts & timeouts_,
const CompressionMethod compression_method_,
const String & method_)
: PartitionedSink(partition_by, context_, sample_block_)
, uri(uri_)
, format(format_)
, format_settings(format_settings_)
, sample_block(sample_block_)
, context(context_)
, timeouts(timeouts_)
, compression_method(compression_method_)
, method(method_)
{
}
SinkPtr createSinkForPartition(const String & partition_id) override
{
auto partition_path = PartitionedSink::replaceWildcards(uri, partition_id);
context->getRemoteHostFilter().checkURL(Poco::URI(partition_path));
return std::make_shared<StorageURLSink>(partition_path, format,
format_settings, sample_block, context, timeouts, compression_method, method);
}
private:
const String uri;
const String format;
const std::optional<FormatSettings> format_settings;
const Block sample_block;
ContextPtr context;
const ConnectionTimeouts timeouts;
const CompressionMethod compression_method;
const String method;
};
std::string IStorageURLBase::getReadMethod() const
{
@ -266,22 +322,57 @@ Pipe IStorageURLBase::read(
unsigned /*num_streams*/)
{
auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);
std::vector<Poco::URI> uri_options{uri};
return Pipe(std::make_shared<StorageURLSource>(
uri_options,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
getHeaderBlock(column_names, metadata_snapshot),
local_context,
metadata_snapshot->getColumns(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
auto with_globs = (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) || uri.find('|') != std::string::npos;
if (with_globs)
{
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
std::vector<String> url_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
std::vector<String> uri_options;
Pipes pipes;
for (const auto & url_description : url_descriptions)
{
/// For each uri (which acts like shard) check if it has failover options
uri_options = parseRemoteDescription(url_description, 0, url_description.size(), '|', max_addresses);
StoragePtr shard;
pipes.emplace_back(std::make_shared<StorageURLSource>(
uri_options,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
getHeaderBlock(column_names, metadata_snapshot),
local_context,
metadata_snapshot->getColumns(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
}
return Pipe::unitePipes(std::move(pipes));
}
else
{
std::vector<String> uri_options{uri};
return Pipe(std::make_shared<StorageURLSource>(
uri_options,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
getHeaderBlock(column_names, metadata_snapshot),
local_context,
metadata_snapshot->getColumns(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
}
}
@ -315,16 +406,35 @@ Pipe StorageURLWithFailover::read(
}
SinkToStoragePtr IStorageURLBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
return std::make_shared<StorageURLSink>(uri, format_name,
format_settings, metadata_snapshot->getSampleBlock(), context,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri.toString(), compression_method));
if (method.empty())
method = Poco::Net::HTTPRequest::HTTP_POST;
bool has_wildcards = uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
bool is_partitioned_implementation = insert_query && insert_query->partition_by && has_wildcards;
if (is_partitioned_implementation)
{
return std::make_shared<PartitionedStorageURLSink>(
insert_query->partition_by,
uri, format_name,
format_settings, metadata_snapshot->getSampleBlock(), context,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri, compression_method), method);
}
else
{
return std::make_shared<StorageURLSink>(uri, format_name,
format_settings, metadata_snapshot->getSampleBlock(), context,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri, compression_method), method);
}
}
StorageURL::StorageURL(
const Poco::URI & uri_,
const String & uri_,
const StorageID & table_id_,
const String & format_name_,
const std::optional<FormatSettings> & format_settings_,
@ -333,10 +443,11 @@ StorageURL::StorageURL(
const String & comment,
ContextPtr context_,
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
: IStorageURLBase(uri_, context_, table_id_, format_name_, format_settings_, columns_, constraints_, comment, compression_method_, headers_)
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_,
const String & method_)
: IStorageURLBase(uri_, context_, table_id_, format_name_, format_settings_, columns_, constraints_, comment, compression_method_, headers_, method_)
{
context_->getRemoteHostFilter().checkURL(uri);
context_->getRemoteHostFilter().checkURL(Poco::URI(uri));
}
@ -349,13 +460,13 @@ StorageURLWithFailover::StorageURLWithFailover(
const ConstraintsDescription & constraints_,
ContextPtr context_,
const String & compression_method_)
: StorageURL(Poco::URI(), table_id_, format_name_, format_settings_, columns_, constraints_, String{}, context_, compression_method_)
: StorageURL("", table_id_, format_name_, format_settings_, columns_, constraints_, String{}, context_, compression_method_)
{
for (const auto & uri_option : uri_options_)
{
Poco::URI poco_uri(uri_option);
context_->getRemoteHostFilter().checkURL(poco_uri);
uri_options.emplace_back(std::move(poco_uri));
uri_options.emplace_back(std::move(uri_option));
LOG_DEBUG(&Poco::Logger::get("StorageURLDistributed"), "Adding URL option: {}", uri_option);
}
}
@ -405,6 +516,13 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
if (!configuration.method.empty()
&& configuration.method != Poco::Net::HTTPRequest::HTTP_POST
&& configuration.method != Poco::Net::HTTPRequest::HTTP_PUT)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Method can be POST or PUT (current: {}). For insert default is POST, for select GET",
configuration.method);
if (!storage_specific_args.empty())
{
String illegal_args;
@ -421,7 +539,8 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
{
if (args.size() != 2 && args.size() != 3)
throw Exception(
"Storage URL requires 2 or 3 arguments: url, name of used format and optional compression method.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
"Storage URL requires 2 or 3 arguments: url, name of used format and optional compression method.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, local_context);
@ -443,7 +562,6 @@ void registerStorageURL(StorageFactory & factory)
ASTs & engine_args = args.engine_args;
auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext());
auto format_settings = StorageURL::getFormatSettingsFromArgs(args);
Poco::URI uri(configuration.url);
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
@ -453,7 +571,7 @@ void registerStorageURL(StorageFactory & factory)
}
return StorageURL::create(
uri,
configuration.url,
args.table_id,
configuration.format,
format_settings,
@ -462,7 +580,7 @@ void registerStorageURL(StorageFactory & factory)
args.comment,
args.getContext(),
configuration.compression_method,
headers);
headers, configuration.method);
},
{
.supports_settings = true,

View File

@ -39,9 +39,11 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
bool supportsPartitionBy() const override { return true; }
protected:
IStorageURLBase(
const Poco::URI & uri_,
const String & uri_,
ContextPtr context_,
const StorageID & id_,
const String & format_name_,
@ -50,9 +52,10 @@ protected:
const ConstraintsDescription & constraints_,
const String & comment,
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {});
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
const String & method_ = "");
Poco::URI uri;
String uri;
String compression_method;
String format_name;
// For URL engine, we use format settings from server context + `SETTINGS`
@ -61,6 +64,7 @@ protected:
// In this case, format_settings is not set.
std::optional<FormatSettings> format_settings;
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
String method; /// For INSERT, PUT can be chosen instead of the default POST.
virtual std::string getReadMethod() const;
@ -88,13 +92,14 @@ class StorageURLSink : public SinkToStorage
{
public:
StorageURLSink(
const Poco::URI & uri,
const String & uri,
const String & format,
const std::optional<FormatSettings> & format_settings,
const Block & sample_block,
ContextPtr context,
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method);
CompressionMethod compression_method,
const String & method = Poco::Net::HTTPRequest::HTTP_POST);
std::string getName() const override { return "StorageURLSink"; }
void consume(Chunk chunk) override;
@ -112,7 +117,7 @@ class StorageURL : public shared_ptr_helper<StorageURL>, public IStorageURLBase
friend struct shared_ptr_helper<StorageURL>;
public:
StorageURL(
const Poco::URI & uri_,
const String & uri_,
const StorageID & table_id_,
const String & format_name_,
const std::optional<FormatSettings> & format_settings_,
@ -121,7 +126,8 @@ public:
const String & comment,
ContextPtr context_,
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {});
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
const String & method_ = "");
String getName() const override
{
@ -170,6 +176,6 @@ public:
};
private:
std::vector<Poco::URI> uri_options;
std::vector<String> uri_options;
};
}

View File

@ -33,7 +33,7 @@ StorageXDBC::StorageXDBC(
const BridgeHelperPtr bridge_helper_)
/// Please add support for constraints as soon as StorageODBC or JDBC will support insertion.
: IStorageURLBase(
Poco::URI(),
"",
context_,
table_id_,
IXDBCBridgeHelper::DEFAULT_FORMAT,
@ -47,7 +47,7 @@ StorageXDBC::StorageXDBC(
, remote_table_name(remote_table_name_)
, log(&Poco::Logger::get("Storage" + bridge_helper->getName()))
{
uri = bridge_helper->getMainURI();
poco_uri = bridge_helper->getMainURI();
}
std::string StorageXDBC::getReadMethod() const
@ -118,7 +118,7 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageMetad
{
bridge_helper->startBridgeSync();
Poco::URI request_uri = uri;
Poco::URI request_uri = poco_uri;
request_uri.setPath("/write");
auto url_params = bridge_helper->getURLParams(65536);
@ -131,13 +131,13 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageMetad
request_uri.addQueryParameter("sample_block", metadata_snapshot->getSampleBlock().getNamesAndTypesList().toString());
return std::make_shared<StorageURLSink>(
request_uri,
request_uri.toString(),
format_name,
getFormatSettings(local_context),
metadata_snapshot->getSampleBlock(),
local_context,
ConnectionTimeouts::getHTTPTimeouts(local_context),
chooseCompressionMethod(uri.toString(), compression_method));
chooseCompressionMethod(poco_uri.toString(), compression_method));
}
Block StorageXDBC::getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const

View File

@ -41,6 +41,7 @@ public:
std::string getName() const override;
private:
Poco::URI poco_uri;
BridgeHelperPtr bridge_helper;
std::string remote_database_name;

View File

@ -24,12 +24,18 @@ void TableFunctionURL::parseArguments(const ASTPtr & ast_function, ContextPtr co
if (!func_args.arguments)
throw Exception("Table function 'URL' must have arguments.", ErrorCodes::BAD_ARGUMENTS);
URLBasedDataSourceConfiguration configuration;
if (auto with_named_collection = getURLBasedDataSourceConfiguration(func_args.arguments->children, context))
{
auto [common_configuration, storage_specific_args] = with_named_collection.value();
configuration.set(common_configuration);
if (!configuration.method.empty()
&& configuration.method != Poco::Net::HTTPRequest::HTTP_POST
&& configuration.method != Poco::Net::HTTPRequest::HTTP_PUT)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Method can be POST or PUT (current: {}). For insert default is POST, for select GET",
configuration.method);
if (!storage_specific_args.empty())
{
String illegal_args;
@ -58,33 +64,25 @@ StoragePtr TableFunctionURL::getStorage(
const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context,
const std::string & table_name, const String & compression_method_) const
{
/// If url contains {1..k} or failover options with separator `|`, use a separate storage
if ((source.find('{') == std::string::npos || source.find('}') == std::string::npos) && source.find('|') == std::string::npos)
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
{
Poco::URI uri(source);
return StorageURL::create(
uri,
StorageID(getDatabaseName(), table_name),
format_,
std::nullopt /*format settings*/,
columns,
ConstraintsDescription{},
String{},
global_context,
compression_method_);
}
else
{
return StorageExternalDistributed::create(
source,
StorageID(getDatabaseName(), table_name),
format_,
std::nullopt,
compression_method_,
columns,
ConstraintsDescription{},
global_context);
auto value_literal = value.safeGet<String>();
headers.emplace_back(std::make_pair(header, value_literal));
}
return StorageURL::create(
source,
StorageID(getDatabaseName(), table_name),
format_,
std::nullopt /*format settings*/,
columns,
ConstraintsDescription{},
String{},
global_context,
compression_method_,
headers,
configuration.method);
}
void registerTableFunctionURL(TableFunctionFactory & factory)

View File

@ -1,6 +1,7 @@
#pragma once
#include <TableFunctions/ITableFunctionFileLike.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace DB
@ -27,6 +28,8 @@ private:
const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context,
const std::string & table_name, const String & compression_method_) const override;
const char * getStorageTypeName() const override { return "URL"; }
URLBasedDataSourceConfiguration configuration;
};
}

View File

@ -0,0 +1,11 @@
<?xml version="1.0"?>
<clickhouse>
<named_collections>
<url1>
<url>http://nginx:80/test_{_partition_id}</url>
<method>PUT</method>
<format>TSV</format>
<structure>column1 UInt32, column2 UInt32, column3 UInt32</structure>
</url1>
</named_collections>
</clickhouse>

View File

@ -0,0 +1,29 @@
import pytest
from helpers.cluster import ClickHouseCluster
uuids = []
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node1", main_configs=["configs/conf.xml"], with_nginx=True)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_partition_by(cluster):
node1 = cluster.instances["node1"]
node1.query(f"insert into table function url(url1) partition by column3 values (1, 2, 3), (3, 2, 1), (1, 3, 2)")
result = node1.query(f"select * from url('http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')")
assert(result.strip() == "3\t2\t1")
result = node1.query(f"select * from url('http://nginx:80/test_2', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')")
assert(result.strip() == "1\t3\t2")
result = node1.query(f"select * from url('http://nginx:80/test_3', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')")
assert(result.strip() == "1\t2\t3")