ClickHouse/src/Storages/StorageURL.cpp

1021 lines
38 KiB
C++
Raw Normal View History

2018-06-11 12:13:00 +00:00
#include <Storages/StorageURL.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Storages/PartitionedSink.h>
#include <Storages/checkAndGetLiteralArgument.h>
2018-06-11 12:13:00 +00:00
#include <Interpreters/evaluateConstantExpression.h>
2022-03-30 10:49:37 +00:00
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Parsers/ASTCreateQuery.h>
2021-10-26 09:31:01 +00:00
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
2022-06-17 12:53:16 +00:00
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
2018-06-11 12:13:00 +00:00
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
2022-03-09 13:57:33 +00:00
#include <IO/IOThreadPool.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
2018-06-11 12:13:00 +00:00
#include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h>
2021-10-13 18:22:02 +00:00
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Common/ThreadStatus.h>
2021-10-26 09:31:01 +00:00
#include <Common/parseRemoteDescription.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadWriteBufferFromHTTP.h>
2018-06-11 12:13:00 +00:00
#include <algorithm>
2021-10-16 14:03:50 +00:00
#include <QueryPipeline/QueryPipelineBuilder.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
#include <Poco/Net/HTTPRequest.h>
2018-06-11 12:13:00 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
2021-04-21 14:36:04 +00:00
extern const int NETWORK_ERROR;
2021-09-10 11:11:52 +00:00
extern const int BAD_ARGUMENTS;
2022-02-08 20:58:03 +00:00
extern const int LOGICAL_ERROR;
}
2018-06-11 12:13:00 +00:00
2022-06-17 12:53:16 +00:00
static constexpr auto bad_arguments_error_message = "Storage URL requires 1-4 arguments: "
"url, name of used format (taken from file extension by default), "
"optional compression method, optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
2021-09-10 11:11:52 +00:00
2022-02-07 19:40:47 +00:00
static bool urlWithGlobs(const String & uri)
{
return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) || uri.find('|') != std::string::npos;
2022-02-07 19:40:47 +00:00
}
2019-08-24 21:20:20 +00:00
IStorageURLBase::IStorageURLBase(
2021-10-26 09:31:01 +00:00
const String & uri_,
ContextPtr context_,
2019-12-04 16:06:55 +00:00
const StorageID & table_id_,
2018-06-11 12:13:00 +00:00
const String & format_name_,
const std::optional<FormatSettings> & format_settings_,
2019-08-24 21:20:20 +00:00
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
2021-04-23 12:18:23 +00:00
const String & comment,
2021-09-07 11:17:25 +00:00
const String & compression_method_,
2021-10-26 09:31:01 +00:00
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_,
2021-10-28 12:44:12 +00:00
const String & http_method_,
2021-10-26 12:22:13 +00:00
ASTPtr partition_by_)
2021-10-26 09:31:01 +00:00
: IStorage(table_id_)
, uri(uri_)
, compression_method(chooseCompressionMethod(uri_, compression_method_))
2021-10-26 09:31:01 +00:00
, format_name(format_name_)
, format_settings(format_settings_)
, headers(headers_)
2021-10-28 12:44:12 +00:00
, http_method(http_method_)
2021-10-26 12:22:13 +00:00
, partition_by(partition_by_)
2018-06-11 12:13:00 +00:00
{
2022-05-23 12:48:48 +00:00
FormatFactory::instance().checkFormatName(format_name);
2020-06-19 15:39:41 +00:00
StorageInMemoryMetadata storage_metadata;
2022-02-08 09:59:20 +00:00
if (columns_.empty())
{
auto columns = getTableStructureFromData(format_name, uri, compression_method, headers, format_settings, context_);
storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
2022-02-07 19:40:47 +00:00
2020-06-19 15:39:41 +00:00
storage_metadata.setConstraints(constraints_);
2021-04-23 12:18:23 +00:00
storage_metadata.setComment(comment);
2020-06-19 15:39:41 +00:00
setInMemoryMetadata(storage_metadata);
2018-06-11 12:13:00 +00:00
}
2018-06-11 12:13:00 +00:00
namespace
{
ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders(const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
2021-10-03 13:53:24 +00:00
{
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers(headers_.begin(), headers_.end());
// Propagate OpenTelemetry trace context, if any, downstream.
if (CurrentThread::isInitialized())
{
const auto & thread_trace_context = CurrentThread::get().thread_trace_context;
if (thread_trace_context.trace_id != UUID())
{
headers.emplace_back("traceparent", thread_trace_context.composeTraceparentHeader());
2021-10-03 13:53:24 +00:00
if (!thread_trace_context.tracestate.empty())
{
headers.emplace_back("tracestate", thread_trace_context.tracestate);
2021-10-03 13:53:24 +00:00
}
}
}
return headers;
}
2021-10-26 09:31:01 +00:00
2022-05-20 19:49:31 +00:00
class StorageURLSource : public ISource
2018-06-11 12:13:00 +00:00
{
using URIParams = std::vector<std::pair<String, String>>;
2021-10-03 13:53:24 +00:00
2018-06-11 12:13:00 +00:00
public:
2021-12-17 11:03:37 +00:00
struct URIInfo
{
using FailoverOptions = std::vector<String>;
std::vector<FailoverOptions> uri_list_to_read;
std::atomic<size_t> next_uri_to_read = 0;
};
using URIInfoPtr = std::shared_ptr<URIInfo>;
2021-12-27 19:42:56 +00:00
void onCancel() override
{
std::lock_guard lock(reader_mutex);
2021-12-27 19:42:56 +00:00
if (reader)
reader->cancel();
}
2022-02-08 11:57:35 +00:00
static void setCredentials(Poco::Net::HTTPBasicCredentials & credentials, const Poco::URI & request_uri)
{
const auto & user_info = request_uri.getUserInfo();
if (!user_info.empty())
{
std::size_t n = user_info.find(':');
if (n != std::string::npos)
{
credentials.setUsername(user_info.substr(0, n));
credentials.setPassword(user_info.substr(n + 1));
}
}
}
2021-10-03 13:53:24 +00:00
StorageURLSource(
2021-12-17 11:03:37 +00:00
URIInfoPtr uri_info_,
2021-10-28 12:44:12 +00:00
const std::string & http_method,
std::function<void(std::ostream &)> callback,
2018-06-11 12:13:00 +00:00
const String & format,
const std::optional<FormatSettings> & format_settings,
String name_,
2018-06-11 12:13:00 +00:00
const Block & sample_block,
ContextPtr context,
2020-10-02 12:38:50 +00:00
const ColumnsDescription & columns,
2019-02-10 16:55:12 +00:00
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method,
size_t download_threads,
2021-10-03 13:53:24 +00:00
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
2022-02-07 19:40:47 +00:00
const URIParams & params = {},
bool glob_url = false)
2022-05-20 19:49:31 +00:00
: ISource(sample_block), name(std::move(name_)), uri_info(uri_info_)
2018-06-11 12:13:00 +00:00
{
2021-10-03 13:53:24 +00:00
auto headers = getHeaders(headers_);
2021-12-17 11:03:37 +00:00
2021-10-03 13:53:24 +00:00
/// Lazy initialization. We should not perform requests in constructor, because we need to do it in query pipeline.
2021-12-17 11:03:37 +00:00
initialize = [=, this](const URIInfo::FailoverOptions & uri_options)
2020-08-28 01:21:08 +00:00
{
2022-02-08 11:57:35 +00:00
if (uri_options.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty url list");
2022-02-09 16:14:14 +00:00
auto first_option = uri_options.begin();
read_buf = getFirstAvailableURLReadBuffer(
first_option,
uri_options.end(),
context,
params,
http_method,
callback,
timeouts,
compression_method,
credentials,
headers,
glob_url,
uri_options.size() == 1,
download_threads);
auto input_format
= FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings);
2021-10-03 04:28:28 +00:00
QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
builder.addSimpleTransform(
[&](const Block & cur_header)
{ return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *input_format, context); });
2021-07-20 18:18:43 +00:00
2022-05-24 20:06:08 +00:00
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
2021-10-03 04:28:28 +00:00
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
};
2018-06-11 12:13:00 +00:00
}
String getName() const override { return name; }
2018-06-11 12:13:00 +00:00
Chunk generate() override
2018-06-11 12:13:00 +00:00
{
2021-12-17 11:03:37 +00:00
while (true)
2021-10-03 04:28:28 +00:00
{
2021-12-17 11:03:37 +00:00
if (!reader)
{
auto current_uri_pos = uri_info->next_uri_to_read.fetch_add(1);
if (current_uri_pos >= uri_info->uri_list_to_read.size())
return {};
2018-06-11 12:13:00 +00:00
2021-12-17 11:03:37 +00:00
auto current_uri = uri_info->uri_list_to_read[current_uri_pos];
2022-01-31 11:03:06 +00:00
std::lock_guard lock(reader_mutex);
2021-12-17 11:03:37 +00:00
initialize(current_uri);
}
2018-06-11 12:13:00 +00:00
2021-12-17 11:03:37 +00:00
Chunk chunk;
2022-04-06 06:39:56 +00:00
std::lock_guard lock(reader_mutex);
2021-12-17 11:03:37 +00:00
if (reader->pull(chunk))
return chunk;
2022-04-06 06:39:56 +00:00
pipeline->reset();
reader.reset();
2021-12-17 11:03:37 +00:00
}
2018-06-11 12:13:00 +00:00
}
2022-02-07 19:40:47 +00:00
static std::unique_ptr<ReadBuffer> getFirstAvailableURLReadBuffer(
2022-02-09 16:14:14 +00:00
std::vector<String>::const_iterator & option,
const std::vector<String>::const_iterator & end,
2022-02-07 19:40:47 +00:00
ContextPtr context,
const URIParams & params,
const String & http_method,
std::function<void(std::ostream &)> callback,
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method,
2022-02-08 09:59:20 +00:00
Poco::Net::HTTPBasicCredentials & credentials,
2022-02-09 16:14:14 +00:00
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
bool glob_url,
bool delay_initialization,
size_t download_threads)
2022-02-07 19:40:47 +00:00
{
String first_exception_message;
2022-02-08 11:57:35 +00:00
ReadSettings read_settings = context->getReadSettings();
2022-02-07 19:40:47 +00:00
2022-02-09 16:14:14 +00:00
size_t options = std::distance(option, end);
for (; option != end; ++option)
2022-02-07 19:40:47 +00:00
{
2022-02-09 16:14:14 +00:00
bool skip_url_not_found_error = glob_url && read_settings.http_skip_not_found_url_for_globs && option == std::prev(end);
2022-02-08 11:57:35 +00:00
auto request_uri = Poco::URI(*option);
2022-02-07 19:40:47 +00:00
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
2022-02-08 11:57:35 +00:00
setCredentials(credentials, request_uri);
2022-03-11 13:38:19 +00:00
const auto settings = context->getSettings();
2022-02-07 19:40:47 +00:00
try
{
if (download_threads > 1)
2022-03-04 14:21:52 +00:00
{
2022-03-11 13:38:19 +00:00
try
2022-03-04 14:21:52 +00:00
{
2022-03-11 13:38:19 +00:00
ReadWriteBufferFromHTTP buffer(
request_uri,
Poco::Net::HTTPRequest::HTTP_HEAD,
callback,
timeouts,
credentials,
settings.max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
read_settings,
headers,
2022-03-16 11:44:07 +00:00
ReadWriteBufferFromHTTP::Range{0, std::nullopt},
&context->getRemoteHostFilter(),
2022-03-11 13:38:19 +00:00
true,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
Poco::Net::HTTPResponse res;
2022-03-16 14:59:06 +00:00
for (size_t i = 0; i < settings.http_max_tries; ++i)
{
try
{
buffer.callWithRedirects(res, Poco::Net::HTTPRequest::HTTP_HEAD, true);
break;
}
2022-03-17 08:03:22 +00:00
catch (const Poco::Exception & e)
2022-03-16 14:59:06 +00:00
{
2022-03-17 08:03:22 +00:00
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
"HTTP HEAD request to `{}` failed at try {}/{}. "
"Error: {}.",
request_uri.toString(),
i + 1,
settings.http_max_tries,
e.displayText());
2022-03-16 14:59:06 +00:00
if (!ReadWriteBufferFromHTTP::isRetriableError(res.getStatus()))
{
throw;
}
}
}
2022-03-11 13:38:19 +00:00
// to check if Range header is supported, we need to send a request with it set
2022-03-17 10:08:42 +00:00
const bool supports_ranges = (res.has("Accept-Ranges") && res.get("Accept-Ranges") == "bytes")
|| (res.has("Content-Range") && res.get("Content-Range").starts_with("bytes"));
2022-03-11 13:38:19 +00:00
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
fmt::runtime(supports_ranges ? "HTTP Range is supported" : "HTTP Range is not supported"));
2022-03-16 14:59:06 +00:00
if (supports_ranges && res.getStatus() == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT
&& res.hasContentLength())
2022-03-11 13:38:19 +00:00
{
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
"Using ParallelReadBuffer with {} workers with chunks of {} bytes",
download_threads,
2022-03-11 13:38:19 +00:00
settings.max_download_buffer_size);
auto read_buffer_factory = std::make_unique<RangedReadWriteBufferFromHTTPFactory>(
res.getContentLength(),
settings.max_download_buffer_size,
request_uri,
http_method,
callback,
timeouts,
credentials,
settings.max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
read_settings,
headers,
&context->getRemoteHostFilter(),
2022-03-11 13:38:19 +00:00
delay_initialization,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
2022-03-17 10:08:42 +00:00
2022-03-11 13:38:19 +00:00
return wrapReadBufferWithCompressionMethod(
std::make_unique<ParallelReadBuffer>(
2022-03-17 10:08:42 +00:00
std::move(read_buffer_factory),
2022-03-30 10:49:37 +00:00
threadPoolCallbackRunner(IOThreadPool::get()),
2022-03-30 08:15:20 +00:00
download_threads),
compression_method,
settings.zstd_window_log_max);
2022-03-11 13:38:19 +00:00
}
}
2022-03-17 08:03:22 +00:00
catch (const Poco::Exception & e)
2022-03-11 13:38:19 +00:00
{
LOG_TRACE(
2022-03-17 08:03:22 +00:00
&Poco::Logger::get("StorageURLSource"),
2022-03-16 14:59:06 +00:00
"Failed to setup ParallelReadBuffer because of an exception:\n{}.\nFalling back to the single-threaded "
"buffer",
2022-03-17 08:03:22 +00:00
e.displayText());
2022-03-04 14:21:52 +00:00
}
}
LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "Using single-threaded read buffer");
2022-02-07 19:40:47 +00:00
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
http_method,
callback,
timeouts,
credentials,
2022-03-11 13:38:19 +00:00
settings.max_http_get_redirects,
2022-02-07 19:40:47 +00:00
DBMS_DEFAULT_BUFFER_SIZE,
2022-02-08 11:57:35 +00:00
read_settings,
2022-02-07 19:40:47 +00:00
headers,
ReadWriteBufferFromHTTP::Range{},
&context->getRemoteHostFilter(),
2022-02-09 16:14:14 +00:00
delay_initialization,
2022-03-11 13:38:19 +00:00
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error),
compression_method,
settings.zstd_window_log_max);
2022-02-07 19:40:47 +00:00
}
catch (...)
{
if (first_exception_message.empty())
first_exception_message = getCurrentExceptionMessage(false);
2022-02-09 16:14:14 +00:00
if (options == 1)
2022-02-07 19:40:47 +00:00
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
2022-02-09 16:14:14 +00:00
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", options, first_exception_message);
2022-02-07 19:40:47 +00:00
}
2018-06-11 12:13:00 +00:00
private:
2021-12-17 11:03:37 +00:00
using InitializeFunc = std::function<void(const URIInfo::FailoverOptions &)>;
InitializeFunc initialize;
2021-10-03 04:28:28 +00:00
2018-06-11 12:13:00 +00:00
String name;
2021-12-17 11:03:37 +00:00
URIInfoPtr uri_info;
std::unique_ptr<ReadBuffer> read_buf;
2021-09-16 17:40:42 +00:00
std::unique_ptr<QueryPipeline> pipeline;
2021-07-20 18:18:43 +00:00
std::unique_ptr<PullingPipelineExecutor> reader;
/// onCancell and generate can be called concurrently and both of them
/// have R/W access to reader pointer.
std::mutex reader_mutex;
2022-02-08 09:59:20 +00:00
Poco::Net::HTTPBasicCredentials credentials;
2018-06-11 12:13:00 +00:00
};
}
2018-06-16 05:54:06 +00:00
2021-07-23 14:25:35 +00:00
StorageURLSink::StorageURLSink(
2021-10-26 09:31:01 +00:00
const String & uri,
2021-07-23 14:25:35 +00:00
const String & format,
const std::optional<FormatSettings> & format_settings,
const Block & sample_block,
ContextPtr context,
const ConnectionTimeouts & timeouts,
2021-10-26 09:31:01 +00:00
const CompressionMethod compression_method,
2021-10-28 12:44:12 +00:00
const String & http_method)
2021-07-23 14:25:35 +00:00
: SinkToStorage(sample_block)
2020-04-28 00:56:44 +00:00
{
std::string content_type = FormatFactory::instance().getContentType(format, context, format_settings);
2022-02-12 02:43:53 +00:00
std::string content_encoding = toContentEncodingName(compression_method);
2020-04-28 00:56:44 +00:00
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHTTP>(Poco::URI(uri), http_method, content_type, content_encoding, timeouts),
compression_method,
3);
writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block, context, {} /* write callback */, format_settings);
2020-04-28 00:56:44 +00:00
}
2018-06-16 05:54:06 +00:00
2020-07-09 01:00:16 +00:00
2021-07-23 14:25:35 +00:00
void StorageURLSink::consume(Chunk chunk)
2020-07-09 01:00:16 +00:00
{
writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
2020-07-09 01:00:16 +00:00
}
void StorageURLSink::onException()
{
if (!writer)
return;
Fix possible "Cannot write to finalized buffer" It is still possible to get this error since onException does not finalize format correctly. Here is an example of such error, that was found by CI [1]: <details> [ 2686 ] {fa01bf02-73f6-4f7f-b14f-e725de6d7f9b} <Fatal> : Logical error: 'Cannot write to finalized buffer'. [ 34577 ] {} <Fatal> BaseDaemon: ######################################## [ 34577 ] {} <Fatal> BaseDaemon: (version 22.6.1.1, build id: AB8040A6769E01A0) (from thread 2686) (query_id: fa01bf02-73f6-4f7f-b14f-e725de6d7f9b) (query: insert into test_02302 select number from numbers(10) settings s3_truncate_on_insert=1;) Received signal Aborted (6) [ 34577 ] {} <Fatal> BaseDaemon: [ 34577 ] {} <Fatal> BaseDaemon: Stack trace: 0x7fcbaa5a703b 0x7fcbaa586859 0xfad9bab 0xfad9e05 0xfaf6a3b 0x24a48c7f 0x258fb9b9 0x258f2004 0x258b88f4 0x258b863b 0x2581773d 0x258177ce 0x24bb5e98 0xfad01d6 0xfad0105 0x2419b11d 0xfad01d6 0xfad0105 0x2215afbb 0x2215aa48 0xfad01d6 0xfad0105 0xfcc265d 0x225cc546 0x249a1c40 0x249bc1b6 0x2685902c 0x26859505 0x269d7767 0x269d504c 0x7fcbaa75e609 0x7fcbaa683163 [ 34577 ] {} <Fatal> BaseDaemon: 3. raise @ 0x7fcbaa5a703b in ? [ 34577 ] {} <Fatal> BaseDaemon: 4. abort @ 0x7fcbaa586859 in ? [ 34577 ] {} <Fatal> BaseDaemon: 5. ./build_docker/../src/Common/Exception.cpp:47: DB::abortOnFailedAssertion(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfad9bab in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 6. ./build_docker/../src/Common/Exception.cpp:70: DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0xfad9e05 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 7. ./build_docker/../src/IO/WriteBuffer.h:0: DB::WriteBuffer::write(char const*, unsigned long) @ 0xfaf6a3b in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 8. ./build_docker/../src/Processors/Formats/Impl/ArrowBufferedStreams.cpp:47: DB::ArrowBufferedOutputStream::Write(void const*, long) @ 0x24a48c7f in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 9. long parquet::ThriftSerializer::Serialize<parquet::format::FileMetaData>(parquet::format::FileMetaData const*, arrow::io::OutputStream*, std::__1::shared_ptr<parquet::Encryptor> const&) @ 0x258fb9b9 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 10. parquet::FileMetaData::FileMetaDataImpl::WriteTo(arrow::io::OutputStream*, std::__1::shared_ptr<parquet::Encryptor> const&) const @ 0x258f2004 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 11. parquet::WriteFileMetaData(parquet::FileMetaData const&, arrow::io::OutputStream*) @ 0x258b88f4 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 12. parquet::ParquetFileWriter::~ParquetFileWriter() @ 0x258b863b in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 13. parquet::arrow::FileWriterImpl::~FileWriterImpl() @ 0x2581773d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 14. parquet::arrow::FileWriterImpl::~FileWriterImpl() @ 0x258177ce in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 15. ./build_docker/../src/Processors/Formats/Impl/ParquetBlockOutputFormat.h:27: DB::ParquetBlockOutputFormat::~ParquetBlockOutputFormat() @ 0x24bb5e98 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 16. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 17. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 18.1. inlined from ./build_docker/../contrib/libcxx/include/__memory/unique_ptr.h:312: std::__1::unique_ptr<DB::WriteBuffer, std::__1::default_delete<DB::WriteBuffer> >::reset(DB::WriteBuffer*) [ 34577 ] {} <Fatal> BaseDaemon: 18.2. inlined from ../contrib/libcxx/include/__memory/unique_ptr.h:269: ~unique_ptr [ 34577 ] {} <Fatal> BaseDaemon: 18. ../src/Storages/StorageS3.cpp:566: DB::StorageS3Sink::~StorageS3Sink() @ 0x2419b11d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 19. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 20. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 21. ./build_docker/../contrib/abseil-cpp/absl/container/internal/raw_hash_set.h:1662: absl::lts_20211102::container_internal::raw_hash_set<absl::lts_20211102::container_internal::FlatHashMapPolicy<StringRef, std::__1::shared_ptr<DB::SinkToStorage> >, absl::lts_20211102::hash_internal::Hash<StringRef>, std::__1::equal_to<StringRef>, std::__1::allocator<std::__1::pair<StringRef const, std::__1::shared_ptr<DB::SinkToStorage> > > >::destroy_slots() @ 0x2215afbb in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 22.1. inlined from ./build_docker/../contrib/libcxx/include/string:1445: std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >::__is_long() const [ 34577 ] {} <Fatal> BaseDaemon: 22.2. inlined from ../contrib/libcxx/include/string:2231: ~basic_string [ 34577 ] {} <Fatal> BaseDaemon: 22. ../src/Storages/PartitionedSink.h:14: DB::PartitionedSink::~PartitionedSink() @ 0x2215aa48 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 23. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 24. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 25. ./build_docker/../contrib/libcxx/include/vector:802: std::__1::vector<std::__1::shared_ptr<DB::IProcessor>, std::__1::allocator<std::__1::shared_ptr<DB::IProcessor> > >::__base_destruct_at_end(std::__1::shared_ptr<DB::IProcessor>*) @ 0xfcc265d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 26.1. inlined from ./build_docker/../contrib/libcxx/include/vector:402: ~vector [ 34577 ] {} <Fatal> BaseDaemon: 26.2. inlined from ../src/QueryPipeline/QueryPipeline.cpp:29: ~QueryPipeline [ 34577 ] {} <Fatal> BaseDaemon: 26. ../src/QueryPipeline/QueryPipeline.cpp:535: DB::QueryPipeline::reset() @ 0x225cc546 in /usr/bin/clickhouse [ 614 ] {} <Fatal> Application: Child process was terminated by signal 6. </details> [1]: https://s3.amazonaws.com/clickhouse-test-reports/37542/8a224239c1d922158b4dc9f5d6609dca836dfd06/stress_test__undefined__actions_.html Follow-up for: #36979 Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-30 10:13:47 +00:00
onFinish();
}
2021-07-23 14:25:35 +00:00
void StorageURLSink::onFinish()
2020-07-09 01:00:16 +00:00
{
Fix possible "Cannot write to finalized buffer" It is still possible to get this error since onException does not finalize format correctly. Here is an example of such error, that was found by CI [1]: <details> [ 2686 ] {fa01bf02-73f6-4f7f-b14f-e725de6d7f9b} <Fatal> : Logical error: 'Cannot write to finalized buffer'. [ 34577 ] {} <Fatal> BaseDaemon: ######################################## [ 34577 ] {} <Fatal> BaseDaemon: (version 22.6.1.1, build id: AB8040A6769E01A0) (from thread 2686) (query_id: fa01bf02-73f6-4f7f-b14f-e725de6d7f9b) (query: insert into test_02302 select number from numbers(10) settings s3_truncate_on_insert=1;) Received signal Aborted (6) [ 34577 ] {} <Fatal> BaseDaemon: [ 34577 ] {} <Fatal> BaseDaemon: Stack trace: 0x7fcbaa5a703b 0x7fcbaa586859 0xfad9bab 0xfad9e05 0xfaf6a3b 0x24a48c7f 0x258fb9b9 0x258f2004 0x258b88f4 0x258b863b 0x2581773d 0x258177ce 0x24bb5e98 0xfad01d6 0xfad0105 0x2419b11d 0xfad01d6 0xfad0105 0x2215afbb 0x2215aa48 0xfad01d6 0xfad0105 0xfcc265d 0x225cc546 0x249a1c40 0x249bc1b6 0x2685902c 0x26859505 0x269d7767 0x269d504c 0x7fcbaa75e609 0x7fcbaa683163 [ 34577 ] {} <Fatal> BaseDaemon: 3. raise @ 0x7fcbaa5a703b in ? [ 34577 ] {} <Fatal> BaseDaemon: 4. abort @ 0x7fcbaa586859 in ? [ 34577 ] {} <Fatal> BaseDaemon: 5. ./build_docker/../src/Common/Exception.cpp:47: DB::abortOnFailedAssertion(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfad9bab in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 6. ./build_docker/../src/Common/Exception.cpp:70: DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0xfad9e05 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 7. ./build_docker/../src/IO/WriteBuffer.h:0: DB::WriteBuffer::write(char const*, unsigned long) @ 0xfaf6a3b in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 8. ./build_docker/../src/Processors/Formats/Impl/ArrowBufferedStreams.cpp:47: DB::ArrowBufferedOutputStream::Write(void const*, long) @ 0x24a48c7f in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 9. long parquet::ThriftSerializer::Serialize<parquet::format::FileMetaData>(parquet::format::FileMetaData const*, arrow::io::OutputStream*, std::__1::shared_ptr<parquet::Encryptor> const&) @ 0x258fb9b9 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 10. parquet::FileMetaData::FileMetaDataImpl::WriteTo(arrow::io::OutputStream*, std::__1::shared_ptr<parquet::Encryptor> const&) const @ 0x258f2004 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 11. parquet::WriteFileMetaData(parquet::FileMetaData const&, arrow::io::OutputStream*) @ 0x258b88f4 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 12. parquet::ParquetFileWriter::~ParquetFileWriter() @ 0x258b863b in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 13. parquet::arrow::FileWriterImpl::~FileWriterImpl() @ 0x2581773d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 14. parquet::arrow::FileWriterImpl::~FileWriterImpl() @ 0x258177ce in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 15. ./build_docker/../src/Processors/Formats/Impl/ParquetBlockOutputFormat.h:27: DB::ParquetBlockOutputFormat::~ParquetBlockOutputFormat() @ 0x24bb5e98 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 16. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 17. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 18.1. inlined from ./build_docker/../contrib/libcxx/include/__memory/unique_ptr.h:312: std::__1::unique_ptr<DB::WriteBuffer, std::__1::default_delete<DB::WriteBuffer> >::reset(DB::WriteBuffer*) [ 34577 ] {} <Fatal> BaseDaemon: 18.2. inlined from ../contrib/libcxx/include/__memory/unique_ptr.h:269: ~unique_ptr [ 34577 ] {} <Fatal> BaseDaemon: 18. ../src/Storages/StorageS3.cpp:566: DB::StorageS3Sink::~StorageS3Sink() @ 0x2419b11d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 19. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 20. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 21. ./build_docker/../contrib/abseil-cpp/absl/container/internal/raw_hash_set.h:1662: absl::lts_20211102::container_internal::raw_hash_set<absl::lts_20211102::container_internal::FlatHashMapPolicy<StringRef, std::__1::shared_ptr<DB::SinkToStorage> >, absl::lts_20211102::hash_internal::Hash<StringRef>, std::__1::equal_to<StringRef>, std::__1::allocator<std::__1::pair<StringRef const, std::__1::shared_ptr<DB::SinkToStorage> > > >::destroy_slots() @ 0x2215afbb in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 22.1. inlined from ./build_docker/../contrib/libcxx/include/string:1445: std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >::__is_long() const [ 34577 ] {} <Fatal> BaseDaemon: 22.2. inlined from ../contrib/libcxx/include/string:2231: ~basic_string [ 34577 ] {} <Fatal> BaseDaemon: 22. ../src/Storages/PartitionedSink.h:14: DB::PartitionedSink::~PartitionedSink() @ 0x2215aa48 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 23. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 24. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 25. ./build_docker/../contrib/libcxx/include/vector:802: std::__1::vector<std::__1::shared_ptr<DB::IProcessor>, std::__1::allocator<std::__1::shared_ptr<DB::IProcessor> > >::__base_destruct_at_end(std::__1::shared_ptr<DB::IProcessor>*) @ 0xfcc265d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 26.1. inlined from ./build_docker/../contrib/libcxx/include/vector:402: ~vector [ 34577 ] {} <Fatal> BaseDaemon: 26.2. inlined from ../src/QueryPipeline/QueryPipeline.cpp:29: ~QueryPipeline [ 34577 ] {} <Fatal> BaseDaemon: 26. ../src/QueryPipeline/QueryPipeline.cpp:535: DB::QueryPipeline::reset() @ 0x225cc546 in /usr/bin/clickhouse [ 614 ] {} <Fatal> Application: Child process was terminated by signal 6. </details> [1]: https://s3.amazonaws.com/clickhouse-test-reports/37542/8a224239c1d922158b4dc9f5d6609dca836dfd06/stress_test__undefined__actions_.html Follow-up for: #36979 Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-30 10:13:47 +00:00
try
{
writer->finalize();
writer->flush();
write_buf->finalize();
}
catch (...)
{
/// Stop ParallelFormattingOutputFormat correctly.
writer.reset();
throw;
}
2020-07-09 01:00:16 +00:00
}
2021-10-26 09:31:01 +00:00
class PartitionedStorageURLSink : public PartitionedSink
{
public:
PartitionedStorageURLSink(
const ASTPtr & partition_by,
const String & uri_,
const String & format_,
const std::optional<FormatSettings> & format_settings_,
const Block & sample_block_,
ContextPtr context_,
const ConnectionTimeouts & timeouts_,
const CompressionMethod compression_method_,
2021-10-28 12:44:12 +00:00
const String & http_method_)
: PartitionedSink(partition_by, context_, sample_block_)
, uri(uri_)
, format(format_)
, format_settings(format_settings_)
, sample_block(sample_block_)
, context(context_)
, timeouts(timeouts_)
, compression_method(compression_method_)
, http_method(http_method_)
2021-10-26 09:31:01 +00:00
{
}
SinkPtr createSinkForPartition(const String & partition_id) override
{
auto partition_path = PartitionedSink::replaceWildcards(uri, partition_id);
context->getRemoteHostFilter().checkURL(Poco::URI(partition_path));
return std::make_shared<StorageURLSink>(
partition_path, format, format_settings, sample_block, context, timeouts, compression_method, http_method);
2021-10-26 09:31:01 +00:00
}
private:
const String uri;
const String format;
const std::optional<FormatSettings> format_settings;
const Block sample_block;
ContextPtr context;
const ConnectionTimeouts timeouts;
const CompressionMethod compression_method;
2021-10-28 12:44:12 +00:00
const String http_method;
2021-10-26 09:31:01 +00:00
};
2020-07-09 01:00:16 +00:00
std::string IStorageURLBase::getReadMethod() const
{
return Poco::Net::HTTPRequest::HTTP_GET;
}
std::vector<std::pair<std::string, std::string>> IStorageURLBase::getReadURIParams(
const Names & /*column_names*/,
const StorageSnapshotPtr & /*storage_snapshot*/,
2018-06-11 12:13:00 +00:00
const SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
2018-06-11 12:13:00 +00:00
QueryProcessingStage::Enum & /*processed_stage*/,
size_t /*max_block_size*/) const
{
return {};
}
std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(
const Names & /*column_names*/,
2022-02-28 13:29:05 +00:00
const ColumnsDescription & /* columns_description */,
2018-06-11 12:13:00 +00:00
const SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t /*max_block_size*/) const
{
return nullptr;
}
2022-02-07 19:40:47 +00:00
ColumnsDescription IStorageURLBase::getTableStructureFromData(
const String & format,
const String & uri,
CompressionMethod compression_method,
2022-02-07 19:40:47 +00:00
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
const std::optional<FormatSettings> & format_settings,
ContextPtr context)
{
context->getRemoteHostFilter().checkURL(Poco::URI(uri));
2022-02-08 09:59:20 +00:00
Poco::Net::HTTPBasicCredentials credentials;
2022-02-07 19:40:47 +00:00
2022-02-09 16:14:14 +00:00
std::vector<String> urls_to_check;
2022-02-07 19:40:47 +00:00
if (urlWithGlobs(uri))
{
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
for (const auto & description : uri_descriptions)
{
auto options = parseRemoteDescription(description, 0, description.size(), '|', max_addresses);
urls_to_check.insert(urls_to_check.end(), options.begin(), options.end());
}
2022-02-09 16:14:14 +00:00
}
else
{
urls_to_check = {uri};
}
2022-02-07 19:40:47 +00:00
2022-04-19 19:16:47 +00:00
ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin()]() mutable -> std::unique_ptr<ReadBuffer>
2022-02-09 16:14:14 +00:00
{
2022-04-19 19:16:47 +00:00
if (it == urls_to_check.cend())
return nullptr;
2022-04-19 19:25:41 +00:00
auto buf = StorageURLSource::getFirstAvailableURLReadBuffer(
it,
urls_to_check.cend(),
context,
{},
Poco::Net::HTTPRequest::HTTP_GET,
{},
ConnectionTimeouts::getHTTPTimeouts(context),
compression_method,
credentials,
headers,
false,
false,
2022-04-19 19:25:41 +00:00
context->getSettingsRef().max_download_threads);\
++it;
return buf;
};
2022-02-07 19:40:47 +00:00
2022-04-19 19:16:47 +00:00
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context);
2022-02-07 19:40:47 +00:00
}
2022-05-13 18:39:19 +00:00
bool IStorageURLBase::supportsSubsetOfColumns() const
2022-02-23 19:31:16 +00:00
{
2022-05-13 18:39:19 +00:00
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
2022-02-23 19:31:16 +00:00
}
2020-08-03 13:54:14 +00:00
Pipe IStorageURLBase::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
2018-06-25 12:21:54 +00:00
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
2021-12-17 11:03:37 +00:00
unsigned num_streams)
2018-06-11 12:13:00 +00:00
{
auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size);
2021-10-28 13:56:45 +00:00
2022-02-23 19:31:16 +00:00
ColumnsDescription columns_description;
Block block_for_format;
2022-05-13 18:39:19 +00:00
if (supportsSubsetOfColumns())
2022-02-23 19:31:16 +00:00
{
columns_description = storage_snapshot->getDescriptionForColumns(column_names);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
2022-02-23 19:31:16 +00:00
}
else
{
columns_description = storage_snapshot->metadata->getColumns();
block_for_format = storage_snapshot->metadata->getSampleBlock();
2022-02-23 19:31:16 +00:00
}
size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
2022-02-07 19:40:47 +00:00
if (urlWithGlobs(uri))
2021-10-26 09:31:01 +00:00
{
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
2021-12-17 11:03:37 +00:00
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
if (num_streams > uri_descriptions.size())
num_streams = uri_descriptions.size();
/// For each uri (which acts like shard) check if it has failover options
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
for (const auto & description : uri_descriptions)
uri_info->uri_list_to_read.emplace_back(parseRemoteDescription(description, 0, description.size(), '|', max_addresses));
2021-10-26 09:31:01 +00:00
Pipes pipes;
2021-12-17 11:03:37 +00:00
pipes.reserve(num_streams);
2021-10-26 09:31:01 +00:00
2022-03-16 14:59:06 +00:00
size_t download_threads = num_streams >= max_download_threads ? 1 : (max_download_threads / num_streams);
2021-12-17 11:03:37 +00:00
for (size_t i = 0; i < num_streams; ++i)
{
2021-10-26 09:31:01 +00:00
pipes.emplace_back(std::make_shared<StorageURLSource>(
2021-12-17 11:03:37 +00:00
uri_info,
2021-10-26 09:31:01 +00:00
getReadMethod(),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
2021-10-26 09:31:01 +00:00
format_name,
format_settings,
getName(),
2022-02-23 19:31:16 +00:00
block_for_format,
2021-10-26 09:31:01 +00:00
local_context,
2022-02-23 19:31:16 +00:00
columns_description,
2021-10-26 09:31:01 +00:00
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method,
2022-03-16 14:59:06 +00:00
download_threads,
headers,
params,
/* glob_url */ true));
2021-10-26 09:31:01 +00:00
}
return Pipe::unitePipes(std::move(pipes));
}
else
{
2021-12-17 11:03:37 +00:00
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
uri_info->uri_list_to_read.emplace_back(std::vector<String>{uri});
2021-10-26 09:31:01 +00:00
return Pipe(std::make_shared<StorageURLSource>(
2021-12-17 11:03:37 +00:00
uri_info,
2021-10-26 09:31:01 +00:00
getReadMethod(),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
2021-10-26 09:31:01 +00:00
format_name,
format_settings,
getName(),
2022-02-23 19:31:16 +00:00
block_for_format,
2021-10-26 09:31:01 +00:00
local_context,
2022-02-23 19:31:16 +00:00
columns_description,
2021-10-26 09:31:01 +00:00
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method,
max_download_threads,
headers,
params));
2021-10-26 09:31:01 +00:00
}
2018-06-11 12:13:00 +00:00
}
2021-04-21 14:36:04 +00:00
Pipe StorageURLWithFailover::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
2021-04-21 14:36:04 +00:00
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned /*num_streams*/)
{
2022-02-28 13:29:05 +00:00
ColumnsDescription columns_description;
Block block_for_format;
2022-05-13 18:39:19 +00:00
if (supportsSubsetOfColumns())
2022-02-28 13:29:05 +00:00
{
2022-05-20 13:20:52 +00:00
columns_description = storage_snapshot->getDescriptionForColumns(column_names);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
2022-02-28 13:29:05 +00:00
}
else
{
columns_description = storage_snapshot->metadata->getColumns();
block_for_format = storage_snapshot->metadata->getSampleBlock();
2022-02-28 13:29:05 +00:00
}
auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size);
2021-11-01 09:52:27 +00:00
2021-12-17 11:03:37 +00:00
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
uri_info->uri_list_to_read.emplace_back(uri_options);
auto pipe = Pipe(std::make_shared<StorageURLSource>(
2021-12-17 11:03:37 +00:00
uri_info,
2021-10-03 13:53:24 +00:00
getReadMethod(),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
2021-10-03 13:53:24 +00:00
format_name,
format_settings,
getName(),
2022-02-28 13:29:05 +00:00
block_for_format,
2021-10-03 13:53:24 +00:00
local_context,
2022-02-28 13:29:05 +00:00
columns_description,
2021-10-03 13:53:24 +00:00
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method,
local_context->getSettingsRef().max_download_threads,
headers,
params));
2021-10-03 13:53:24 +00:00
std::shuffle(uri_options.begin(), uri_options.end(), thread_local_rng);
return pipe;
2021-04-21 14:36:04 +00:00
}
2021-10-26 09:31:01 +00:00
SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
2018-06-11 12:13:00 +00:00
{
2021-10-28 12:44:12 +00:00
if (http_method.empty())
http_method = Poco::Net::HTTPRequest::HTTP_POST;
2021-10-26 09:31:01 +00:00
bool has_wildcards = uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
2021-10-26 12:22:13 +00:00
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
bool is_partitioned_implementation = partition_by_ast && has_wildcards;
2021-10-26 09:31:01 +00:00
if (is_partitioned_implementation)
{
return std::make_shared<PartitionedStorageURLSink>(
2021-10-26 12:22:13 +00:00
partition_by_ast,
uri,
format_name,
format_settings,
metadata_snapshot->getSampleBlock(),
context,
2021-10-26 09:31:01 +00:00
ConnectionTimeouts::getHTTPTimeouts(context),
compression_method,
http_method);
2021-10-26 09:31:01 +00:00
}
else
{
return std::make_shared<StorageURLSink>(
uri,
format_name,
format_settings,
metadata_snapshot->getSampleBlock(),
context,
2021-10-26 09:31:01 +00:00
ConnectionTimeouts::getHTTPTimeouts(context),
compression_method,
http_method);
2021-10-26 09:31:01 +00:00
}
2018-06-11 12:13:00 +00:00
}
2018-06-16 05:54:06 +00:00
2021-04-23 12:18:23 +00:00
StorageURL::StorageURL(
2021-10-26 09:31:01 +00:00
const String & uri_,
2021-04-23 12:18:23 +00:00
const StorageID & table_id_,
const String & format_name_,
const std::optional<FormatSettings> & format_settings_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
2021-09-07 11:17:25 +00:00
const String & compression_method_,
2021-10-26 09:31:01 +00:00
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_,
2021-10-28 12:44:12 +00:00
const String & http_method_,
2021-10-26 12:22:13 +00:00
ASTPtr partition_by_)
: IStorageURLBase(
uri_,
context_,
table_id_,
format_name_,
format_settings_,
columns_,
constraints_,
comment,
compression_method_,
headers_,
http_method_,
partition_by_)
2021-03-23 11:29:29 +00:00
{
2021-10-26 09:31:01 +00:00
context_->getRemoteHostFilter().checkURL(Poco::URI(uri));
2021-03-23 11:29:29 +00:00
}
2021-04-21 12:32:57 +00:00
2021-04-21 14:36:04 +00:00
StorageURLWithFailover::StorageURLWithFailover(
2021-05-02 16:33:45 +00:00
const std::vector<String> & uri_options_,
const StorageID & table_id_,
const String & format_name_,
const std::optional<FormatSettings> & format_settings_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
const String & compression_method_)
2021-10-26 09:31:01 +00:00
: StorageURL("", table_id_, format_name_, format_settings_, columns_, constraints_, String{}, context_, compression_method_)
2021-04-21 14:36:04 +00:00
{
for (const auto & uri_option : uri_options_)
{
2021-04-22 07:37:20 +00:00
Poco::URI poco_uri(uri_option);
context_->getRemoteHostFilter().checkURL(poco_uri);
2021-04-21 14:36:04 +00:00
LOG_DEBUG(&Poco::Logger::get("StorageURLDistributed"), "Adding URL option: {}", uri_option);
uri_options.emplace_back(uri_option);
2021-04-21 14:36:04 +00:00
}
}
2021-04-21 12:32:57 +00:00
FormatSettings StorageURL::getFormatSettingsFromArgs(const StorageFactory::Arguments & args)
{
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
FormatSettings format_settings;
if (args.storage_def->settings)
{
FormatFactorySettings user_format_settings;
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = args.getContext()->getSettingsRef().changes();
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
{
user_format_settings.set(change.name, change.value);
}
}
// Apply changes from SETTINGS clause, with validation.
user_format_settings.applyChanges(args.storage_def->settings->changes);
format_settings = getFormatSettings(args.getContext(), user_format_settings);
2021-04-21 12:32:57 +00:00
}
else
{
format_settings = getFormatSettings(args.getContext());
}
return format_settings;
}
2022-06-17 12:53:16 +00:00
ASTs::iterator StorageURL::collectHeaders(
ASTs & url_function_args, URLBasedDataSourceConfiguration & configuration, ContextPtr context)
{
2022-06-22 14:51:18 +00:00
ASTs::iterator headers_it = url_function_args.end();
2022-06-17 12:53:16 +00:00
for (auto arg_it = url_function_args.begin(); arg_it != url_function_args.end(); ++arg_it)
{
2022-06-22 14:51:18 +00:00
const auto * headers_ast_function = (*arg_it)->as<ASTFunction>();
if (headers_ast_function && headers_ast_function->name == "headers")
2022-06-17 12:53:16 +00:00
{
2022-06-22 14:51:18 +00:00
if (headers_it != url_function_args.end())
2022-06-17 12:53:16 +00:00
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"URL table function can have only one key-value argument: headers=(). {}",
bad_arguments_error_message);
const auto * headers_function_args_expr = assert_cast<const ASTExpressionList *>(headers_ast_function->arguments.get());
auto headers_function_args = headers_function_args_expr->children;
for (auto & header_arg : headers_function_args)
{
const auto * header_ast = header_arg->as<ASTFunction>();
2022-06-22 14:51:18 +00:00
if (!header_ast || header_ast->name != "equals")
2022-06-17 12:53:16 +00:00
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Headers argument is incorrect. {}", bad_arguments_error_message);
const auto * header_args_expr = assert_cast<const ASTExpressionList *>(header_ast->arguments.get());
auto header_args = header_args_expr->children;
if (header_args.size() != 2)
2022-06-22 14:51:18 +00:00
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Headers argument is incorrect: expected 2 arguments, got {}",
header_args.size());
2022-06-17 12:53:16 +00:00
auto ast_literal = evaluateConstantExpressionOrIdentifierAsLiteral(header_args[0], context);
2022-06-22 14:51:18 +00:00
auto arg_name_value = ast_literal->as<ASTLiteral>()->value;
if (arg_name_value.getType() != Field::Types::Which::String)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as header name");
auto arg_name = arg_name_value.safeGet<String>();
2022-06-17 12:53:16 +00:00
ast_literal = evaluateConstantExpressionOrIdentifierAsLiteral(header_args[1], context);
auto arg_value = ast_literal->as<ASTLiteral>()->value;
2022-06-22 14:51:18 +00:00
if (arg_value.getType() != Field::Types::Which::String)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as header value");
2022-06-17 12:53:16 +00:00
configuration.headers.emplace_back(arg_name, arg_value);
}
headers_it = arg_it;
continue;
}
(*arg_it) = evaluateConstantExpressionOrIdentifierAsLiteral((*arg_it), context);
}
return headers_it;
}
2021-09-07 11:17:25 +00:00
URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, ContextPtr local_context)
2018-06-11 12:13:00 +00:00
{
2021-09-15 22:45:43 +00:00
URLBasedDataSourceConfiguration configuration;
2018-06-11 12:13:00 +00:00
2021-09-15 22:45:43 +00:00
if (auto named_collection = getURLBasedDataSourceConfiguration(args, local_context))
2018-08-24 00:07:25 +00:00
{
2021-09-15 22:45:43 +00:00
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
2018-06-11 12:13:00 +00:00
if (!configuration.http_method.empty() && configuration.http_method != Poco::Net::HTTPRequest::HTTP_POST
2021-10-28 12:44:12 +00:00
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_PUT)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Http method can be POST or PUT (current: {}). For insert default is POST, for select GET",
configuration.http_method);
2021-10-26 09:31:01 +00:00
2021-09-07 11:17:25 +00:00
if (!storage_specific_args.empty())
2021-09-08 19:28:22 +00:00
{
String illegal_args;
for (const auto & arg : storage_specific_args)
{
if (!illegal_args.empty())
illegal_args += ", ";
illegal_args += arg.first;
}
2021-11-01 09:52:27 +00:00
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown argument `{}` for storage URL", illegal_args);
2021-09-08 19:28:22 +00:00
}
2021-09-07 11:17:25 +00:00
}
else
{
2022-01-14 13:27:57 +00:00
if (args.empty() || args.size() > 3)
2022-06-17 12:53:16 +00:00
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, bad_arguments_error_message);
2018-06-11 12:13:00 +00:00
2022-06-17 12:53:16 +00:00
auto header_it = collectHeaders(args, configuration, local_context);
2022-06-22 14:51:18 +00:00
if (header_it != args.end())
2022-06-17 12:53:16 +00:00
args.erase(header_it);
2018-06-11 12:13:00 +00:00
configuration.url = checkAndGetLiteralArgument<String>(args[0], "url");
if (args.size() > 1)
configuration.format = checkAndGetLiteralArgument<String>(args[1], "format");
2021-09-07 11:17:25 +00:00
if (args.size() == 3)
configuration.compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
2021-09-07 11:17:25 +00:00
}
2018-06-11 12:13:00 +00:00
if (configuration.format == "auto")
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.url, true);
2021-09-07 11:17:25 +00:00
return configuration;
}
2018-06-11 12:13:00 +00:00
2021-09-07 11:17:25 +00:00
void registerStorageURL(StorageFactory & factory)
{
factory.registerStorage(
"URL",
[](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext());
auto format_settings = StorageURL::getFormatSettingsFromArgs(args);
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
{
auto value_literal = value.safeGet<String>();
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal));
}
2021-10-26 12:22:13 +00:00
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
return std::make_shared<StorageURL>(
configuration.url,
args.table_id,
configuration.format,
format_settings,
args.columns,
args.constraints,
args.comment,
args.getContext(),
configuration.compression_method,
headers,
configuration.http_method,
partition_by);
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::URL,
});
2018-06-11 12:13:00 +00:00
}
}