ClickHouse/src/Storages/StorageURL.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

1161 lines
42 KiB
C++
Raw Normal View History

2018-06-11 12:13:00 +00:00
#include <Storages/StorageURL.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Storages/PartitionedSink.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/NamedCollectionsHelpers.h>
2018-06-11 12:13:00 +00:00
#include <Interpreters/evaluateConstantExpression.h>
2022-03-30 10:49:37 +00:00
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Parsers/ASTCreateQuery.h>
2021-10-26 09:31:01 +00:00
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
2022-06-17 12:53:16 +00:00
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
2018-06-11 12:13:00 +00:00
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
2022-03-09 13:57:33 +00:00
#include <IO/IOThreadPool.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
2018-06-11 12:13:00 +00:00
#include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h>
2021-10-13 18:22:02 +00:00
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Common/ThreadStatus.h>
2021-10-26 09:31:01 +00:00
#include <Common/parseRemoteDescription.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadWriteBufferFromHTTP.h>
2018-06-11 12:13:00 +00:00
#include <algorithm>
2021-10-16 14:03:50 +00:00
#include <QueryPipeline/QueryPipelineBuilder.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
#include <Poco/Net/HTTPRequest.h>
#include <regex>
2018-06-11 12:13:00 +00:00
namespace DB
{
/// Error codes referenced in this file. They are only declared here (extern).
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int NETWORK_ERROR;
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
}
2018-06-11 12:13:00 +00:00
2022-06-17 12:53:16 +00:00
/// Human-readable description of the argument list accepted by the URL storage.
/// NOTE(review): presumably used when argument validation fails; the use site is not visible in this part of the file.
static constexpr auto bad_arguments_error_message
    = "Storage URL requires 1-4 arguments: url, name of used format (taken from file extension by default), "
      "optional compression method, "
      "optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
2021-09-10 11:11:52 +00:00
/// Configuration keys that are listed as mandatory: only the target URL.
static const std::unordered_set<std::string_view> required_configuration_keys{"url"};
/// Configuration keys that may be given in addition to the required ones.
/// Several settings have two accepted spellings (`compression` / `compression_method`,
/// `method` / `http_method`).
static const std::unordered_set<std::string_view> optional_configuration_keys = {
    "format",
    "compression",
    "compression_method",
    "structure",
    "filename",
    "method",
    "http_method",
    "description",
    "headers.header.name",
    "headers.header.value",
};
/// Headers in config file will have structure "headers.header.name" and "headers.header.value".
/// But Poco::AbstractConfiguration converts them into "header", "header[1]", "header[2]".
///
/// The dots are escaped so that only a literal '.' separates the path components;
/// an unescaped '.' would match any character and accept malformed keys.
static const std::vector<std::regex> optional_regex_keys = {
    std::regex(R"(headers\.header\[\d*\]\.name)"),
    std::regex(R"(headers\.header\[\d*\]\.value)"),
};
2022-02-07 19:40:47 +00:00
/// Tells whether the URL should be treated as a pattern rather than a single address:
/// it contains both '{' and '}' (in any order), or it contains a '|' separator.
static bool urlWithGlobs(const String & uri)
{
    const bool has_braces = uri.find('{') != std::string::npos && uri.find('}') != std::string::npos;
    const bool has_alternatives = uri.find('|') != std::string::npos;
    return has_braces || has_alternatives;
}
2019-08-24 21:20:20 +00:00
/// Builds the storage: remembers connection parameters, validates the format name
/// and fills the in-memory metadata.
///
/// The compression method is chosen from the path component of the URL together with
/// the explicitly requested `compression_method_`.
/// When `columns_` is empty, the table structure is obtained via getTableStructureFromData().
IStorageURLBase::IStorageURLBase(
    const String & uri_,
    ContextPtr context_,
    const StorageID & table_id_,
    const String & format_name_,
    const std::optional<FormatSettings> & format_settings_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    const String & compression_method_,
    const HTTPHeaderEntries & headers_,
    const String & http_method_,
    ASTPtr partition_by_)
    : IStorage(table_id_)
    , uri(uri_)
    , compression_method(chooseCompressionMethod(Poco::URI(uri_).getPath(), compression_method_))
    , format_name(format_name_)
    , format_settings(format_settings_)
    , headers(headers_)
    , http_method(http_method_)
    , partition_by(partition_by_)
{
    FormatFactory::instance().checkFormatName(format_name);
    StorageInMemoryMetadata storage_metadata;

    if (columns_.empty())
    {
        /// No explicit structure was given: obtain it from the data behind the URL.
        auto columns = getTableStructureFromData(format_name, uri, compression_method, headers, format_settings, context_);
        storage_metadata.setColumns(columns);
    }
    else
        storage_metadata.setColumns(columns_);

    storage_metadata.setConstraints(constraints_);
    storage_metadata.setComment(comment);
    setInMemoryMetadata(storage_metadata);
}
2018-06-11 12:13:00 +00:00
namespace
{
/// Returns a copy of `headers_`, extended with the OpenTelemetry headers
/// ("traceparent" and, if non-empty, "tracestate") when tracing is enabled
/// for the current context.
HTTPHeaderEntries getHeaders(const HTTPHeaderEntries & headers_)
{
    HTTPHeaderEntries headers(headers_.begin(), headers_.end());

    // Propagate OpenTelemetry trace context, if any, downstream.
    const auto & current_trace_context = OpenTelemetry::CurrentContext();
    if (current_trace_context.isTraceEnabled())
    {
        headers.emplace_back("traceparent", current_trace_context.composeTraceparentHeader());

        if (!current_trace_context.tracestate.empty())
        {
            headers.emplace_back("tracestate", current_trace_context.tracestate);
        }
    }

    return headers;
}
2021-10-26 09:31:01 +00:00
2022-05-20 19:49:31 +00:00
class StorageURLSource : public ISource
2018-06-11 12:13:00 +00:00
{
using URIParams = std::vector<std::pair<String, String>>;
2021-10-03 13:53:24 +00:00
2018-06-11 12:13:00 +00:00
public:
2021-12-17 11:03:37 +00:00
struct URIInfo
{
using FailoverOptions = std::vector<String>;
std::vector<FailoverOptions> uri_list_to_read;
std::atomic<size_t> next_uri_to_read = 0;
};
using URIInfoPtr = std::shared_ptr<URIInfo>;
2021-12-27 19:42:56 +00:00
void onCancel() override
{
std::lock_guard lock(reader_mutex);
2021-12-27 19:42:56 +00:00
if (reader)
reader->cancel();
}
2022-02-08 11:57:35 +00:00
static void setCredentials(Poco::Net::HTTPBasicCredentials & credentials, const Poco::URI & request_uri)
{
const auto & user_info = request_uri.getUserInfo();
if (!user_info.empty())
{
std::size_t n = user_info.find(':');
if (n != std::string::npos)
{
credentials.setUsername(user_info.substr(0, n));
credentials.setPassword(user_info.substr(n + 1));
}
}
}
2021-10-03 13:53:24 +00:00
StorageURLSource(
2021-12-17 11:03:37 +00:00
URIInfoPtr uri_info_,
2021-10-28 12:44:12 +00:00
const std::string & http_method,
std::function<void(std::ostream &)> callback,
2018-06-11 12:13:00 +00:00
const String & format,
const std::optional<FormatSettings> & format_settings,
String name_,
2018-06-11 12:13:00 +00:00
const Block & sample_block,
ContextPtr context,
2020-10-02 12:38:50 +00:00
const ColumnsDescription & columns,
2019-02-10 16:55:12 +00:00
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method,
size_t download_threads,
const HTTPHeaderEntries & headers_ = {},
2022-02-07 19:40:47 +00:00
const URIParams & params = {},
bool glob_url = false)
2022-05-20 19:49:31 +00:00
: ISource(sample_block), name(std::move(name_)), uri_info(uri_info_)
2018-06-11 12:13:00 +00:00
{
2021-10-03 13:53:24 +00:00
auto headers = getHeaders(headers_);
2021-12-17 11:03:37 +00:00
2021-10-03 13:53:24 +00:00
/// Lazy initialization. We should not perform requests in constructor, because we need to do it in query pipeline.
2021-12-17 11:03:37 +00:00
initialize = [=, this](const URIInfo::FailoverOptions & uri_options)
2020-08-28 01:21:08 +00:00
{
2022-02-08 11:57:35 +00:00
if (uri_options.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty url list");
2022-02-09 16:14:14 +00:00
auto first_option = uri_options.begin();
read_buf = getFirstAvailableURLReadBuffer(
first_option,
uri_options.end(),
context,
params,
http_method,
callback,
timeouts,
compression_method,
credentials,
headers,
glob_url,
uri_options.size() == 1,
download_threads);
auto input_format
= FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings);
2021-10-03 04:28:28 +00:00
QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
builder.addSimpleTransform(
[&](const Block & cur_header)
{ return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *input_format, context); });
2021-07-20 18:18:43 +00:00
2022-05-24 20:06:08 +00:00
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
2021-10-03 04:28:28 +00:00
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
};
2018-06-11 12:13:00 +00:00
}
String getName() const override { return name; }
2018-06-11 12:13:00 +00:00
Chunk generate() override
2018-06-11 12:13:00 +00:00
{
2021-12-17 11:03:37 +00:00
while (true)
2021-10-03 04:28:28 +00:00
{
2021-12-17 11:03:37 +00:00
if (!reader)
{
auto current_uri_pos = uri_info->next_uri_to_read.fetch_add(1);
if (current_uri_pos >= uri_info->uri_list_to_read.size())
return {};
2018-06-11 12:13:00 +00:00
2021-12-17 11:03:37 +00:00
auto current_uri = uri_info->uri_list_to_read[current_uri_pos];
2022-01-31 11:03:06 +00:00
std::lock_guard lock(reader_mutex);
2021-12-17 11:03:37 +00:00
initialize(current_uri);
}
2018-06-11 12:13:00 +00:00
2021-12-17 11:03:37 +00:00
Chunk chunk;
2022-04-06 06:39:56 +00:00
std::lock_guard lock(reader_mutex);
2021-12-17 11:03:37 +00:00
if (reader->pull(chunk))
return chunk;
2022-04-06 06:39:56 +00:00
pipeline->reset();
reader.reset();
2021-12-17 11:03:37 +00:00
}
2018-06-11 12:13:00 +00:00
}
2022-02-07 19:40:47 +00:00
static std::unique_ptr<ReadBuffer> getFirstAvailableURLReadBuffer(
2022-02-09 16:14:14 +00:00
std::vector<String>::const_iterator & option,
const std::vector<String>::const_iterator & end,
2022-02-07 19:40:47 +00:00
ContextPtr context,
const URIParams & params,
const String & http_method,
std::function<void(std::ostream &)> callback,
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method,
2022-02-08 09:59:20 +00:00
Poco::Net::HTTPBasicCredentials & credentials,
const HTTPHeaderEntries & headers,
2022-02-09 16:14:14 +00:00
bool glob_url,
bool delay_initialization,
size_t download_threads)
2022-02-07 19:40:47 +00:00
{
String first_exception_message;
2022-02-08 11:57:35 +00:00
ReadSettings read_settings = context->getReadSettings();
2022-02-07 19:40:47 +00:00
2022-02-09 16:14:14 +00:00
size_t options = std::distance(option, end);
for (; option != end; ++option)
2022-02-07 19:40:47 +00:00
{
2022-02-09 16:14:14 +00:00
bool skip_url_not_found_error = glob_url && read_settings.http_skip_not_found_url_for_globs && option == std::prev(end);
2022-02-08 11:57:35 +00:00
auto request_uri = Poco::URI(*option);
2022-02-07 19:40:47 +00:00
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
2022-02-08 11:57:35 +00:00
setCredentials(credentials, request_uri);
2022-03-11 13:38:19 +00:00
const auto settings = context->getSettings();
int zstd_window_log_max = static_cast<int>(settings.zstd_window_log_max);
2022-02-07 19:40:47 +00:00
try
{
if (download_threads > 1)
2022-03-04 14:21:52 +00:00
{
2022-03-11 13:38:19 +00:00
try
2022-03-04 14:21:52 +00:00
{
2022-03-11 13:38:19 +00:00
ReadWriteBufferFromHTTP buffer(
request_uri,
Poco::Net::HTTPRequest::HTTP_HEAD,
callback,
timeouts,
credentials,
settings.max_http_get_redirects,
2022-11-03 11:10:49 +00:00
settings.max_read_buffer_size,
2022-03-11 13:38:19 +00:00
read_settings,
headers,
2022-03-16 11:44:07 +00:00
ReadWriteBufferFromHTTP::Range{0, std::nullopt},
&context->getRemoteHostFilter(),
2022-03-11 13:38:19 +00:00
true,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
Poco::Net::HTTPResponse res;
2022-03-16 14:59:06 +00:00
for (size_t i = 0; i < settings.http_max_tries; ++i)
{
try
{
buffer.callWithRedirects(res, Poco::Net::HTTPRequest::HTTP_HEAD, true);
break;
}
2022-03-17 08:03:22 +00:00
catch (const Poco::Exception & e)
2022-03-16 14:59:06 +00:00
{
2022-03-17 08:03:22 +00:00
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
"HTTP HEAD request to `{}` failed at try {}/{}. "
"Error: {}.",
request_uri.toString(),
i + 1,
settings.http_max_tries,
e.displayText());
2022-03-16 14:59:06 +00:00
if (!ReadWriteBufferFromHTTP::isRetriableError(res.getStatus()))
{
throw;
}
}
}
2022-03-11 13:38:19 +00:00
// to check if Range header is supported, we need to send a request with it set
2022-03-17 10:08:42 +00:00
const bool supports_ranges = (res.has("Accept-Ranges") && res.get("Accept-Ranges") == "bytes")
|| (res.has("Content-Range") && res.get("Content-Range").starts_with("bytes"));
2022-03-11 13:38:19 +00:00
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
fmt::runtime(supports_ranges ? "HTTP Range is supported" : "HTTP Range is not supported"));
2022-03-16 14:59:06 +00:00
if (supports_ranges && res.getStatus() == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT
&& res.hasContentLength())
2022-03-11 13:38:19 +00:00
{
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
"Using ParallelReadBuffer with {} workers with chunks of {} bytes",
download_threads,
2022-03-11 13:38:19 +00:00
settings.max_download_buffer_size);
auto read_buffer_factory = std::make_unique<RangedReadWriteBufferFromHTTPFactory>(
res.getContentLength(),
settings.max_download_buffer_size,
request_uri,
http_method,
callback,
timeouts,
credentials,
settings.max_http_get_redirects,
2022-11-03 11:10:49 +00:00
settings.max_read_buffer_size,
2022-03-11 13:38:19 +00:00
read_settings,
headers,
&context->getRemoteHostFilter(),
2022-03-11 13:38:19 +00:00
delay_initialization,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
2022-03-17 10:08:42 +00:00
2022-03-11 13:38:19 +00:00
return wrapReadBufferWithCompressionMethod(
std::make_unique<ParallelReadBuffer>(
2022-03-17 10:08:42 +00:00
std::move(read_buffer_factory),
2022-09-07 15:44:29 +00:00
threadPoolCallbackRunner<void>(IOThreadPool::get(), "URLParallelRead"),
2022-03-30 08:15:20 +00:00
download_threads),
compression_method,
zstd_window_log_max);
2022-03-11 13:38:19 +00:00
}
}
2022-03-17 08:03:22 +00:00
catch (const Poco::Exception & e)
2022-03-11 13:38:19 +00:00
{
LOG_TRACE(
2022-03-17 08:03:22 +00:00
&Poco::Logger::get("StorageURLSource"),
2022-03-16 14:59:06 +00:00
"Failed to setup ParallelReadBuffer because of an exception:\n{}.\nFalling back to the single-threaded "
"buffer",
2022-03-17 08:03:22 +00:00
e.displayText());
2022-03-04 14:21:52 +00:00
}
}
LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "Using single-threaded read buffer");
2022-02-07 19:40:47 +00:00
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
http_method,
callback,
timeouts,
credentials,
2022-03-11 13:38:19 +00:00
settings.max_http_get_redirects,
2022-11-03 11:10:49 +00:00
settings.max_read_buffer_size,
2022-02-08 11:57:35 +00:00
read_settings,
2022-02-07 19:40:47 +00:00
headers,
ReadWriteBufferFromHTTP::Range{},
&context->getRemoteHostFilter(),
2022-02-09 16:14:14 +00:00
delay_initialization,
2022-03-11 13:38:19 +00:00
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error),
compression_method,
zstd_window_log_max);
2022-02-07 19:40:47 +00:00
}
catch (...)
{
if (first_exception_message.empty())
first_exception_message = getCurrentExceptionMessage(false);
2022-02-09 16:14:14 +00:00
if (options == 1)
2022-02-07 19:40:47 +00:00
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
2022-02-09 16:14:14 +00:00
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", options, first_exception_message);
2022-02-07 19:40:47 +00:00
}
2018-06-11 12:13:00 +00:00
private:
2021-12-17 11:03:37 +00:00
using InitializeFunc = std::function<void(const URIInfo::FailoverOptions &)>;
InitializeFunc initialize;
2021-10-03 04:28:28 +00:00
2018-06-11 12:13:00 +00:00
String name;
2021-12-17 11:03:37 +00:00
URIInfoPtr uri_info;
std::unique_ptr<ReadBuffer> read_buf;
2021-09-16 17:40:42 +00:00
std::unique_ptr<QueryPipeline> pipeline;
2021-07-20 18:18:43 +00:00
std::unique_ptr<PullingPipelineExecutor> reader;
/// onCancell and generate can be called concurrently and both of them
/// have R/W access to reader pointer.
std::mutex reader_mutex;
2022-02-08 09:59:20 +00:00
Poco::Net::HTTPBasicCredentials credentials;
2018-06-11 12:13:00 +00:00
};
}
2018-06-16 05:54:06 +00:00
2021-07-23 14:25:35 +00:00
/// Creates the HTTP write buffer (wrapped with the requested compression) and the output
/// format writing into it. Content type comes from the format, content encoding from the
/// compression method.
StorageURLSink::StorageURLSink(
    const String & uri,
    const String & format,
    const std::optional<FormatSettings> & format_settings,
    const Block & sample_block,
    ContextPtr context,
    const ConnectionTimeouts & timeouts,
    const CompressionMethod compression_method,
    const String & http_method)
    : SinkToStorage(sample_block)
{
    std::string content_type = FormatFactory::instance().getContentType(format, context, format_settings);
    std::string content_encoding = toContentEncodingName(compression_method);

    /// NOTE(review): the literal 3 is presumably the compression level - confirm against wrapWriteBufferWithCompressionMethod.
    write_buf = wrapWriteBufferWithCompressionMethod(
        std::make_unique<WriteBufferFromHTTP>(Poco::URI(uri), http_method, content_type, content_encoding, timeouts),
        compression_method,
        3);
    writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block, context, {} /* write callback */, format_settings);
}
2018-06-16 05:54:06 +00:00
2020-07-09 01:00:16 +00:00
2021-07-23 14:25:35 +00:00
/// Writes the chunk to the output format. Chunks arriving after cancellation are dropped.
void StorageURLSink::consume(Chunk chunk)
{
    std::lock_guard lock(cancel_mutex);
    if (cancelled)
        return;
    writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
}
/// Marks the sink as cancelled and finalizes the output.
void StorageURLSink::onCancel()
{
    std::lock_guard lock(cancel_mutex);
    /// Set the flag before finalizing: finalize() may throw after resetting `writer`,
    /// and consume() must not touch the writer afterwards. The mutex is held for the
    /// whole function, so the order is not observable when finalize() succeeds.
    cancelled = true;
    finalize();
}
/// Finalizes the output under the cancellation mutex.
void StorageURLSink::onException()
{
    std::lock_guard guard(cancel_mutex);
    finalize();
}
2021-07-23 14:25:35 +00:00
void StorageURLSink::onFinish()
2020-07-09 01:00:16 +00:00
{
std::lock_guard lock(cancel_mutex);
finalize();
}
void StorageURLSink::finalize()
{
if (!writer)
return;
Fix possible "Cannot write to finalized buffer" It is still possible to get this error since onException does not finalize format correctly. Here is an example of such error, that was found by CI [1]: <details> [ 2686 ] {fa01bf02-73f6-4f7f-b14f-e725de6d7f9b} <Fatal> : Logical error: 'Cannot write to finalized buffer'. [ 34577 ] {} <Fatal> BaseDaemon: ######################################## [ 34577 ] {} <Fatal> BaseDaemon: (version 22.6.1.1, build id: AB8040A6769E01A0) (from thread 2686) (query_id: fa01bf02-73f6-4f7f-b14f-e725de6d7f9b) (query: insert into test_02302 select number from numbers(10) settings s3_truncate_on_insert=1;) Received signal Aborted (6) [ 34577 ] {} <Fatal> BaseDaemon: [ 34577 ] {} <Fatal> BaseDaemon: Stack trace: 0x7fcbaa5a703b 0x7fcbaa586859 0xfad9bab 0xfad9e05 0xfaf6a3b 0x24a48c7f 0x258fb9b9 0x258f2004 0x258b88f4 0x258b863b 0x2581773d 0x258177ce 0x24bb5e98 0xfad01d6 0xfad0105 0x2419b11d 0xfad01d6 0xfad0105 0x2215afbb 0x2215aa48 0xfad01d6 0xfad0105 0xfcc265d 0x225cc546 0x249a1c40 0x249bc1b6 0x2685902c 0x26859505 0x269d7767 0x269d504c 0x7fcbaa75e609 0x7fcbaa683163 [ 34577 ] {} <Fatal> BaseDaemon: 3. raise @ 0x7fcbaa5a703b in ? [ 34577 ] {} <Fatal> BaseDaemon: 4. abort @ 0x7fcbaa586859 in ? [ 34577 ] {} <Fatal> BaseDaemon: 5. ./build_docker/../src/Common/Exception.cpp:47: DB::abortOnFailedAssertion(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfad9bab in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 6. ./build_docker/../src/Common/Exception.cpp:70: DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0xfad9e05 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 7. ./build_docker/../src/IO/WriteBuffer.h:0: DB::WriteBuffer::write(char const*, unsigned long) @ 0xfaf6a3b in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 8. 
./build_docker/../src/Processors/Formats/Impl/ArrowBufferedStreams.cpp:47: DB::ArrowBufferedOutputStream::Write(void const*, long) @ 0x24a48c7f in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 9. long parquet::ThriftSerializer::Serialize<parquet::format::FileMetaData>(parquet::format::FileMetaData const*, arrow::io::OutputStream*, std::__1::shared_ptr<parquet::Encryptor> const&) @ 0x258fb9b9 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 10. parquet::FileMetaData::FileMetaDataImpl::WriteTo(arrow::io::OutputStream*, std::__1::shared_ptr<parquet::Encryptor> const&) const @ 0x258f2004 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 11. parquet::WriteFileMetaData(parquet::FileMetaData const&, arrow::io::OutputStream*) @ 0x258b88f4 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 12. parquet::ParquetFileWriter::~ParquetFileWriter() @ 0x258b863b in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 13. parquet::arrow::FileWriterImpl::~FileWriterImpl() @ 0x2581773d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 14. parquet::arrow::FileWriterImpl::~FileWriterImpl() @ 0x258177ce in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 15. ./build_docker/../src/Processors/Formats/Impl/ParquetBlockOutputFormat.h:27: DB::ParquetBlockOutputFormat::~ParquetBlockOutputFormat() @ 0x24bb5e98 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 16. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 17. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 18.1. 
inlined from ./build_docker/../contrib/libcxx/include/__memory/unique_ptr.h:312: std::__1::unique_ptr<DB::WriteBuffer, std::__1::default_delete<DB::WriteBuffer> >::reset(DB::WriteBuffer*) [ 34577 ] {} <Fatal> BaseDaemon: 18.2. inlined from ../contrib/libcxx/include/__memory/unique_ptr.h:269: ~unique_ptr [ 34577 ] {} <Fatal> BaseDaemon: 18. ../src/Storages/StorageS3.cpp:566: DB::StorageS3Sink::~StorageS3Sink() @ 0x2419b11d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 19. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 20. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 21. ./build_docker/../contrib/abseil-cpp/absl/container/internal/raw_hash_set.h:1662: absl::lts_20211102::container_internal::raw_hash_set<absl::lts_20211102::container_internal::FlatHashMapPolicy<StringRef, std::__1::shared_ptr<DB::SinkToStorage> >, absl::lts_20211102::hash_internal::Hash<StringRef>, std::__1::equal_to<StringRef>, std::__1::allocator<std::__1::pair<StringRef const, std::__1::shared_ptr<DB::SinkToStorage> > > >::destroy_slots() @ 0x2215afbb in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 22.1. inlined from ./build_docker/../contrib/libcxx/include/string:1445: std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >::__is_long() const [ 34577 ] {} <Fatal> BaseDaemon: 22.2. inlined from ../contrib/libcxx/include/string:2231: ~basic_string [ 34577 ] {} <Fatal> BaseDaemon: 22. ../src/Storages/PartitionedSink.h:14: DB::PartitionedSink::~PartitionedSink() @ 0x2215aa48 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 23. 
./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 24. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 25. ./build_docker/../contrib/libcxx/include/vector:802: std::__1::vector<std::__1::shared_ptr<DB::IProcessor>, std::__1::allocator<std::__1::shared_ptr<DB::IProcessor> > >::__base_destruct_at_end(std::__1::shared_ptr<DB::IProcessor>*) @ 0xfcc265d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 26.1. inlined from ./build_docker/../contrib/libcxx/include/vector:402: ~vector [ 34577 ] {} <Fatal> BaseDaemon: 26.2. inlined from ../src/QueryPipeline/QueryPipeline.cpp:29: ~QueryPipeline [ 34577 ] {} <Fatal> BaseDaemon: 26. ../src/QueryPipeline/QueryPipeline.cpp:535: DB::QueryPipeline::reset() @ 0x225cc546 in /usr/bin/clickhouse [ 614 ] {} <Fatal> Application: Child process was terminated by signal 6. </details> [1]: https://s3.amazonaws.com/clickhouse-test-reports/37542/8a224239c1d922158b4dc9f5d6609dca836dfd06/stress_test__undefined__actions_.html Follow-up for: #36979 Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-30 10:13:47 +00:00
try
{
writer->finalize();
writer->flush();
write_buf->finalize();
}
catch (...)
{
/// Stop ParallelFormattingOutputFormat correctly.
writer.reset();
throw;
}
2020-07-09 01:00:16 +00:00
}
2021-10-26 09:31:01 +00:00
class PartitionedStorageURLSink : public PartitionedSink
{
public:
PartitionedStorageURLSink(
const ASTPtr & partition_by,
const String & uri_,
const String & format_,
const std::optional<FormatSettings> & format_settings_,
const Block & sample_block_,
ContextPtr context_,
const ConnectionTimeouts & timeouts_,
const CompressionMethod compression_method_,
2021-10-28 12:44:12 +00:00
const String & http_method_)
: PartitionedSink(partition_by, context_, sample_block_)
, uri(uri_)
, format(format_)
, format_settings(format_settings_)
, sample_block(sample_block_)
, context(context_)
, timeouts(timeouts_)
, compression_method(compression_method_)
, http_method(http_method_)
2021-10-26 09:31:01 +00:00
{
}
SinkPtr createSinkForPartition(const String & partition_id) override
{
auto partition_path = PartitionedSink::replaceWildcards(uri, partition_id);
context->getRemoteHostFilter().checkURL(Poco::URI(partition_path));
return std::make_shared<StorageURLSink>(
partition_path, format, format_settings, sample_block, context, timeouts, compression_method, http_method);
2021-10-26 09:31:01 +00:00
}
private:
const String uri;
const String format;
const std::optional<FormatSettings> format_settings;
const Block sample_block;
ContextPtr context;
const ConnectionTimeouts timeouts;
const CompressionMethod compression_method;
2021-10-28 12:44:12 +00:00
const String http_method;
2021-10-26 09:31:01 +00:00
};
2020-07-09 01:00:16 +00:00
/// HTTP method used for read requests; this implementation always returns GET.
std::string IStorageURLBase::getReadMethod() const
{
    const std::string & read_method = Poco::Net::HTTPRequest::HTTP_GET;
    return read_method;
}
std::vector<std::pair<std::string, std::string>> IStorageURLBase::getReadURIParams(
const Names & /*column_names*/,
const StorageSnapshotPtr & /*storage_snapshot*/,
2018-06-11 12:13:00 +00:00
const SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
2018-06-11 12:13:00 +00:00
QueryProcessingStage::Enum & /*processed_stage*/,
size_t /*max_block_size*/) const
{
return {};
}
std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(
const Names & /*column_names*/,
2022-02-28 13:29:05 +00:00
const ColumnsDescription & /* columns_description */,
2018-06-11 12:13:00 +00:00
const SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t /*max_block_size*/) const
{
return nullptr;
}
2022-02-07 19:40:47 +00:00
/// Determines the table structure for the given URL.
///
/// The URL is first checked against the remote host filter. A URL with globs is expanded into
/// the flat list of all its addresses and failover options. If the schema cache is enabled
/// (`schema_inference_use_cache_for_url`) and has an entry, it is used; otherwise the schema is
/// read from the data. With the cache enabled the resulting columns are added to the cache.
ColumnsDescription IStorageURLBase::getTableStructureFromData(
    const String & format,
    const String & uri,
    CompressionMethod compression_method,
    const HTTPHeaderEntries & headers,
    const std::optional<FormatSettings> & format_settings,
    ContextPtr context)
{
    context->getRemoteHostFilter().checkURL(Poco::URI(uri));

    Poco::Net::HTTPBasicCredentials credentials;

    std::vector<String> urls_to_check;
    if (urlWithGlobs(uri))
    {
        size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
        auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
        for (const auto & description : uri_descriptions)
        {
            auto options = parseRemoteDescription(description, 0, description.size(), '|', max_addresses);
            urls_to_check.insert(urls_to_check.end(), options.begin(), options.end());
        }
    }
    else
    {
        urls_to_check = {uri};
    }

    std::optional<ColumnsDescription> columns_from_cache;
    if (context->getSettingsRef().schema_inference_use_cache_for_url)
        columns_from_cache = tryGetColumnsFromCache(urls_to_check, headers, credentials, format, format_settings, context);

    /// Each call returns a buffer for the next reachable URL, or nullptr when the list is exhausted.
    /// `it` is advanced by getFirstAvailableURLReadBuffer to the URL that was used, then moved past it.
    ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
    {
        if (it == urls_to_check.cend())
            return nullptr;

        auto buf = StorageURLSource::getFirstAvailableURLReadBuffer(
            it,
            urls_to_check.cend(),
            context,
            {},
            Poco::Net::HTTPRequest::HTTP_GET,
            {},
            ConnectionTimeouts::getHTTPTimeouts(context),
            compression_method,
            credentials,
            headers,
            /* glob_url */ false,
            /* delay_initialization */ false,
            context->getSettingsRef().max_download_threads);
        ++it;
        return buf;
    };

    ColumnsDescription columns;
    if (columns_from_cache)
        columns = *columns_from_cache;
    else
        columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context);

    if (context->getSettingsRef().schema_inference_use_cache_for_url)
        addColumnsToCache(urls_to_check, columns, format, format_settings, context);

    return columns;
}
2022-05-13 18:39:19 +00:00
bool IStorageURLBase::supportsSubsetOfColumns() const
2022-02-23 19:31:16 +00:00
{
2022-05-13 18:39:19 +00:00
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
2022-02-23 19:31:16 +00:00
}
2020-08-03 13:54:14 +00:00
/// Creates the pipe that reads data for this URL storage.
///
/// The header passed to the input format contains only the requested columns when the format
/// supports reading a subset of columns, and the full table structure otherwise.
///
/// If the URL contains globs, it is expanded with parseRemoteDescription: ',' separates the
/// addresses to read and '|' separates the failover options of a single address. At most one
/// source per expanded address is created (but not more than `num_streams`); all sources share
/// the same URIInfo. Without globs a single source reads the URL as is.
Pipe IStorageURLBase::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    size_t num_streams)
{
    auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size);

    ColumnsDescription columns_description;
    Block block_for_format;
    if (supportsSubsetOfColumns())
    {
        columns_description = storage_snapshot->getDescriptionForColumns(column_names);
        block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
    }
    else
    {
        columns_description = storage_snapshot->metadata->getColumns();
        block_for_format = storage_snapshot->metadata->getSampleBlock();
    }

    size_t max_download_threads = local_context->getSettingsRef().max_download_threads;

    if (urlWithGlobs(uri))
    {
        size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
        auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);

        if (num_streams > uri_descriptions.size())
            num_streams = uri_descriptions.size();

        /// For each uri (which acts like shard) check if it has failover options
        auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
        for (const auto & description : uri_descriptions)
            uri_info->uri_list_to_read.emplace_back(parseRemoteDescription(description, 0, description.size(), '|', max_addresses));

        Pipes pipes;
        pipes.reserve(num_streams);

        /// Split the download threads between the streams. `num_streams` may be zero here
        /// (nothing to read, or the caller passed zero), so the divisor is clamped to avoid
        /// a division by zero; in that case no source is created and the value is unused.
        size_t download_threads
            = num_streams >= max_download_threads ? 1 : (max_download_threads / std::max<size_t>(num_streams, 1));

        for (size_t i = 0; i < num_streams; ++i)
        {
            pipes.emplace_back(std::make_shared<StorageURLSource>(
                uri_info,
                getReadMethod(),
                getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
                format_name,
                format_settings,
                getName(),
                block_for_format,
                local_context,
                columns_description,
                max_block_size,
                ConnectionTimeouts::getHTTPTimeouts(local_context),
                compression_method,
                download_threads,
                headers,
                params,
                /* glob_url */ true));
        }
        return Pipe::unitePipes(std::move(pipes));
    }
    else
    {
        auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
        uri_info->uri_list_to_read.emplace_back(std::vector<String>{uri});
        return Pipe(std::make_shared<StorageURLSource>(
            uri_info,
            getReadMethod(),
            getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
            format_name,
            format_settings,
            getName(),
            block_for_format,
            local_context,
            columns_description,
            max_block_size,
            ConnectionTimeouts::getHTTPTimeouts(local_context),
            compression_method,
            max_download_threads,
            headers,
            params));
    }
}
2021-04-21 14:36:04 +00:00
/// Creates a single source that reads from the configured list of failover URLs.
///
/// The format header contains only the requested columns when the format supports reading
/// a subset of columns, and the full table structure otherwise. `num_streams` is ignored:
/// exactly one source is created.
Pipe StorageURLWithFailover::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    size_t /*num_streams*/)
{
    ColumnsDescription header_columns;
    Block format_header;
    if (!supportsSubsetOfColumns())
    {
        header_columns = storage_snapshot->metadata->getColumns();
        format_header = storage_snapshot->metadata->getSampleBlock();
    }
    else
    {
        header_columns = storage_snapshot->getDescriptionForColumns(column_names);
        format_header = storage_snapshot->getSampleBlockForColumns(header_columns.getNamesOfPhysical());
    }

    auto uri_params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size);

    /// The source receives its own copy of the options in their current order.
    auto failover_uris = std::make_shared<StorageURLSource::URIInfo>();
    failover_uris->uri_list_to_read.emplace_back(uri_options);

    Pipe result(std::make_shared<StorageURLSource>(
        failover_uris,
        getReadMethod(),
        getReadPOSTDataCallback(column_names, header_columns, query_info, local_context, processed_stage, max_block_size),
        format_name,
        format_settings,
        getName(),
        format_header,
        local_context,
        header_columns,
        max_block_size,
        ConnectionTimeouts::getHTTPTimeouts(local_context),
        compression_method,
        local_context->getSettingsRef().max_download_threads,
        headers,
        uri_params));

    /// Reorder the stored options randomly; this affects the order seen by subsequent reads only.
    std::shuffle(uri_options.begin(), uri_options.end(), thread_local_rng);

    return result;
}
2021-10-26 09:31:01 +00:00
/// Creates the sink used by INSERT into the URL storage.
///
/// If no HTTP method was configured, POST is stored as the method. A partitioned sink is used
/// when a PARTITION BY expression is available (taken from the INSERT query or, if it has none,
/// from the storage definition) and the URL contains the partition id wildcard; otherwise a
/// plain sink writing to the URL is returned.
SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
    if (http_method.empty())
        http_method = Poco::Net::HTTPRequest::HTTP_POST;

    /// The partition expression is considered only when the query really is an INSERT.
    ASTPtr partition_expression;
    if (const auto * insert = dynamic_cast<const ASTInsertQuery *>(query.get()))
        partition_expression = insert->partition_by ? insert->partition_by : partition_by;

    const bool uri_has_wildcard = uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;

    if (partition_expression && uri_has_wildcard)
    {
        return std::make_shared<PartitionedStorageURLSink>(
            partition_expression,
            uri,
            format_name,
            format_settings,
            metadata_snapshot->getSampleBlock(),
            context,
            ConnectionTimeouts::getHTTPTimeouts(context),
            compression_method,
            http_method);
    }

    return std::make_shared<StorageURLSink>(
        uri,
        format_name,
        format_settings,
        metadata_snapshot->getSampleBlock(),
        context,
        ConnectionTimeouts::getHTTPTimeouts(context),
        compression_method,
        http_method);
}
2018-06-16 05:54:06 +00:00
/// Returns the schema cache shared by all URL storages.
/// The cache is a function-local static: its capacity is read from the server config key
/// `schema_inference_cache_max_elements_for_url` when the cache is first created.
SchemaCache & IStorageURLBase::getSchemaCache(const ContextPtr & context)
{
    static SchemaCache url_schema_cache(
        context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_url", DEFAULT_SCHEMA_CACHE_ELEMENTS));

    return url_schema_cache;
}
std::optional<ColumnsDescription> IStorageURLBase::tryGetColumnsFromCache(
const Strings & urls,
const HTTPHeaderEntries & headers,
2022-06-28 16:13:42 +00:00
const Poco::Net::HTTPBasicCredentials & credentials,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache(context);
for (const auto & url : urls)
{
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
2022-06-28 16:13:42 +00:00
auto last_mod_time = getLastModificationTime(url, headers, credentials, context);
/// Some URLs could not have Last-Modified header, in this case we cannot be sure that
/// data wasn't changed after adding it's schema to cache. Use schema from cache only if
/// special setting for this case is enabled.
if (!last_mod_time && !context->getSettingsRef().schema_inference_cache_require_modification_time_for_url)
return 0;
return last_mod_time;
};
2022-08-19 16:42:23 +00:00
auto cache_key = getKeyForSchemaCache(url, format_name, format_settings, context);
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
void IStorageURLBase::addColumnsToCache(
const Strings & urls,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache(context);
2022-08-19 16:42:23 +00:00
auto cache_keys = getKeysForSchemaCache(urls, format_name, format_settings, context);
schema_cache.addMany(cache_keys, columns);
}
2022-06-28 16:13:42 +00:00
std::optional<time_t> IStorageURLBase::getLastModificationTime(
const String & url,
const HTTPHeaderEntries & headers,
2022-06-28 16:13:42 +00:00
const Poco::Net::HTTPBasicCredentials & credentials,
const ContextPtr & context)
{
2022-11-03 11:10:49 +00:00
auto settings = context->getSettingsRef();
try
{
ReadWriteBufferFromHTTP buf(
Poco::URI(url),
Poco::Net::HTTPRequest::HTTP_GET,
{},
ConnectionTimeouts::getHTTPTimeouts(context),
2022-06-28 16:13:42 +00:00
credentials,
2022-11-03 11:10:49 +00:00
settings.max_http_get_redirects,
settings.max_read_buffer_size,
context->getReadSettings(),
headers,
ReadWriteBufferFromHTTP::Range{},
&context->getRemoteHostFilter(),
true,
false,
false);
return buf.getLastModificationTime();
}
catch (...)
{
return std::nullopt;
}
}
2021-04-23 12:18:23 +00:00
/// Constructs the URL storage by forwarding all parameters to IStorageURLBase and then
/// validating the URL against the remote host filter of the given context
/// (checkURL is expected to throw for a disallowed URL — see RemoteHostFilter).
StorageURL::StorageURL(
    const String & uri_,
    const StorageID & table_id_,
    const String & format_name_,
    const std::optional<FormatSettings> & format_settings_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    ContextPtr context_,
    const String & compression_method_,
    const HTTPHeaderEntries & headers_,
    const String & http_method_,
    ASTPtr partition_by_)
    : IStorageURLBase(
        uri_,
        context_,
        table_id_,
        format_name_,
        format_settings_,
        columns_,
        constraints_,
        comment,
        compression_method_,
        headers_,
        http_method_,
        partition_by_)
{
    /// The check uses the URL as stored by the base class.
    const Poco::URI parsed_uri(uri);
    context_->getRemoteHostFilter().checkURL(parsed_uri);
}
2021-04-21 12:32:57 +00:00
2021-04-21 14:36:04 +00:00
/// Constructs a URL storage that reads from a list of alternative URLs.
/// The base storage is created with an empty URL and an empty comment; every option is
/// validated against the remote host filter, logged and appended to `uri_options`.
StorageURLWithFailover::StorageURLWithFailover(
    const std::vector<String> & uri_options_,
    const StorageID & table_id_,
    const String & format_name_,
    const std::optional<FormatSettings> & format_settings_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    ContextPtr context_,
    const String & compression_method_)
    : StorageURL("", table_id_, format_name_, format_settings_, columns_, constraints_, String{}, context_, compression_method_)
{
    for (const auto & candidate : uri_options_)
    {
        /// Validate first, so a rejected option is neither logged nor stored.
        context_->getRemoteHostFilter().checkURL(Poco::URI(candidate));

        LOG_DEBUG(&Poco::Logger::get("StorageURLDistributed"), "Adding URL option: {}", candidate);
        uri_options.push_back(candidate);
    }
}
2021-04-21 12:32:57 +00:00
/// Builds the format settings for a table created with the given storage arguments.
///
/// Without a SETTINGS clause the settings are derived from the context alone. With a SETTINGS
/// clause, the changed settings of the context that are known format settings are applied
/// first, and the changes from the clause are applied on top of them.
FormatSettings StorageURL::getFormatSettingsFromArgs(const StorageFactory::Arguments & args)
{
    // Use format settings from global server context + settings from
    // the SETTINGS clause of the create query. Settings from current
    // session and user are ignored.
    if (!args.storage_def->settings)
        return getFormatSettings(args.getContext());

    FormatFactorySettings table_format_settings;

    // Apply changed settings from global context, but ignore the
    // unknown ones, because we only have the format settings here.
    const auto & context_changes = args.getContext()->getSettingsRef().changes();
    for (const auto & changed_setting : context_changes)
    {
        if (!table_format_settings.has(changed_setting.name))
            continue;

        table_format_settings.set(changed_setting.name, changed_setting.value);
    }

    // Apply changes from SETTINGS clause, with validation.
    table_format_settings.applyChanges(args.storage_def->settings->changes);

    return getFormatSettings(args.getContext(), table_format_settings);
}
2022-06-17 12:53:16 +00:00
/// Scans the arguments of the URL function/engine, extracts HTTP headers and evaluates the rest.
///
/// - An argument of the form `headers('name' = 'value', ...)` is parsed into `header_entries`;
///   at most one such argument is allowed.
/// - An argument that is an `equals` function (key = value) is left untouched.
/// - Every other argument is replaced in place by its value evaluated as a literal.
///
/// Returns an iterator to the `headers(...)` argument, or `url_function_args.end()` if there is none.
/// Throws BAD_ARGUMENTS if `headers` is specified more than once, if one of its elements is not
/// a `name = value` pair, or if a header name or value is not a string.
ASTs::iterator StorageURL::collectHeaders(
    ASTs & url_function_args, HTTPHeaderEntries & header_entries, ContextPtr context)
{
    ASTs::iterator headers_it = url_function_args.end();

    for (auto arg_it = url_function_args.begin(); arg_it != url_function_args.end(); ++arg_it)
    {
        const auto * headers_ast_function = (*arg_it)->as<ASTFunction>();
        if (headers_ast_function && headers_ast_function->name == "headers")
        {
            if (headers_it != url_function_args.end())
                throw Exception(
                    ErrorCodes::BAD_ARGUMENTS,
                    "URL table function can have only one key-value argument: headers=(). {}",
                    bad_arguments_error_message);

            const auto * headers_function_args_expr = assert_cast<const ASTExpressionList *>(headers_ast_function->arguments.get());

            /// The children are only read, so refer to them instead of copying the vector of shared pointers.
            const auto & headers_function_args = headers_function_args_expr->children;

            for (const auto & header_arg : headers_function_args)
            {
                const auto * header_ast = header_arg->as<ASTFunction>();
                if (!header_ast || header_ast->name != "equals")
                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Headers argument is incorrect. {}", bad_arguments_error_message);

                const auto * header_args_expr = assert_cast<const ASTExpressionList *>(header_ast->arguments.get());
                const auto & header_args = header_args_expr->children;
                if (header_args.size() != 2)
                    throw Exception(
                        ErrorCodes::BAD_ARGUMENTS,
                        "Headers argument is incorrect: expected 2 arguments, got {}",
                        header_args.size());

                /// Each literal is kept alive by its own holder, so the references to the values below stay valid.
                auto name_literal = evaluateConstantExpressionOrIdentifierAsLiteral(header_args[0], context);
                const auto & arg_name_value = name_literal->as<ASTLiteral>()->value;
                if (arg_name_value.getType() != Field::Types::Which::String)
                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as header name");

                auto value_literal = evaluateConstantExpressionOrIdentifierAsLiteral(header_args[1], context);
                const auto & arg_value = value_literal->as<ASTLiteral>()->value;
                if (arg_value.getType() != Field::Types::Which::String)
                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as header value");

                header_entries.emplace_back(arg_name_value.safeGet<String>(), arg_value.safeGet<String>());
            }

            headers_it = arg_it;
            continue;
        }

        /// key = value arguments are not evaluated here and are kept as they are.
        if (headers_ast_function && headers_ast_function->name == "equals")
            continue;

        (*arg_it) = evaluateConstantExpressionOrIdentifierAsLiteral((*arg_it), context);
    }

    return headers_it;
}
/// Fills `configuration` from a named collection.
///
/// The collection is validated against the known keys first. `url` is mandatory; the HTTP
/// method is read from `http_method` (falling back to `method`), the compression method from
/// `compression_method` (falling back to `compression`). `format`, the compression method and
/// `structure` default to "auto". Throws BAD_ARGUMENTS if a non-empty HTTP method is neither
/// POST nor PUT.
void StorageURL::processNamedCollectionResult(Configuration & configuration, const NamedCollection & collection)
{
    validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys, optional_regex_keys);

    configuration.url = collection.get<String>("url");
    configuration.headers = getHeadersFromNamedCollection(collection);

    const auto method_alias = collection.getOrDefault<String>("method", "");
    configuration.http_method = collection.getOrDefault<String>("http_method", method_alias);

    const auto & method = configuration.http_method;
    const bool method_is_allowed
        = method.empty() || method == Poco::Net::HTTPRequest::HTTP_POST || method == Poco::Net::HTTPRequest::HTTP_PUT;
    if (!method_is_allowed)
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            "Http method can be POST or PUT (current: {}). For insert default is POST, for select GET",
            configuration.http_method);

    configuration.format = collection.getOrDefault<String>("format", "auto");

    const auto compression_alias = collection.getOrDefault<String>("compression", "auto");
    configuration.compression_method = collection.getOrDefault<String>("compression_method", compression_alias);

    configuration.structure = collection.getOrDefault<String>("structure", "auto");
}
2018-06-11 12:13:00 +00:00
/// Parses the arguments of the URL engine into a Configuration.
///
/// Two forms are supported: a named collection (optionally with overrides), or positional
/// arguments `url[, format[, compression_method]]` with an optional `headers(...)` element.
/// `args` is modified: arguments are evaluated in place and, in the positional form, the
/// `headers(...)` element is removed. If the resulting format is "auto", it is derived from
/// the path of the URL.
///
/// Throws NUMBER_OF_ARGUMENTS_DOESNT_MATCH on a wrong number of positional arguments and
/// BAD_ARGUMENTS if a `Range` header is specified.
StorageURL::Configuration StorageURL::getConfiguration(ASTs & args, ContextPtr local_context)
{
    StorageURL::Configuration configuration;

    if (auto named_collection = tryGetNamedCollectionWithOverrides(args))
    {
        StorageURL::processNamedCollectionResult(configuration, *named_collection);
        collectHeaders(args, configuration.headers, local_context);
    }
    else
    {
        if (args.empty() || args.size() > 3)
            throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, bad_arguments_error_message);

        auto header_it = collectHeaders(args, configuration.headers, local_context);
        if (header_it != args.end())
            args.erase(header_it);

        /// `headers(...)` may have been the only argument; without this check `args[0]` below
        /// would be read from an empty vector.
        if (args.empty())
            throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, bad_arguments_error_message);

        configuration.url = checkAndGetLiteralArgument<String>(args[0], "url");
        if (args.size() > 1)
            configuration.format = checkAndGetLiteralArgument<String>(args[1], "format");
        if (args.size() == 3)
            configuration.compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
    }

    if (configuration.format == "auto")
        configuration.format = FormatFactory::instance().getFormatFromFileName(Poco::URI(configuration.url).getPath(), true);

    /// NOTE(review): HTTP header names are case-insensitive, while this comparison is exact,
    /// so e.g. "range" is not rejected here — confirm whether that is intended.
    for (const auto & [header, value] : configuration.headers)
    {
        if (header == "Range")
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
    }

    return configuration;
}
2018-06-11 12:13:00 +00:00
2021-09-07 11:17:25 +00:00
/// Registers the `URL` table engine in the storage factory.
/// The creator parses the engine arguments into a configuration, builds the format settings
/// from the storage definition and passes a clone of the PARTITION BY expression (if any)
/// to the created StorageURL.
void registerStorageURL(StorageFactory & factory)
{
    auto create_url_storage = [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;
        auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext());
        auto format_settings = StorageURL::getFormatSettingsFromArgs(args);

        /// Stays null when the table has no PARTITION BY clause.
        ASTPtr partition_by = args.storage_def->partition_by ? args.storage_def->partition_by->clone() : nullptr;

        return std::make_shared<StorageURL>(
            configuration.url,
            args.table_id,
            configuration.format,
            format_settings,
            args.columns,
            args.constraints,
            args.comment,
            args.getContext(),
            configuration.compression_method,
            configuration.headers,
            configuration.http_method,
            partition_by);
    };

    factory.registerStorage(
        "URL",
        create_url_storage,
        {
            .supports_settings = true,
            .supports_schema_inference = true,
            .source_access_type = AccessType::URL,
        });
}
}