ClickHouse/src/Storages/StorageURL.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

1217 lines
42 KiB
C++
Raw Normal View History

2018-06-11 12:13:00 +00:00
#include <Storages/StorageURL.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Storages/PartitionedSink.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/NamedCollectionsHelpers.h>
2018-06-11 12:13:00 +00:00
#include <Interpreters/evaluateConstantExpression.h>
2022-03-30 10:49:37 +00:00
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Parsers/ASTCreateQuery.h>
2021-10-26 09:31:01 +00:00
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
2022-06-17 12:53:16 +00:00
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
2018-06-11 12:13:00 +00:00
#include <IO/ConnectionTimeouts.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
2018-06-11 12:13:00 +00:00
#include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h>
2021-10-13 18:22:02 +00:00
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Common/ThreadStatus.h>
2021-10-26 09:31:01 +00:00
#include <Common/parseRemoteDescription.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <IO/ReadWriteBufferFromHTTP.h>
2023-04-21 12:11:18 +00:00
#include <IO/HTTPHeaderEntries.h>
2018-06-11 12:13:00 +00:00
#include <algorithm>
2021-10-16 14:03:50 +00:00
#include <QueryPipeline/QueryPipelineBuilder.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
#include <Poco/Net/HTTPRequest.h>
#include <regex>
#include <DataTypes/DataTypeString.h>
2023-06-16 19:38:50 +00:00
#include <DataTypes/DataTypeLowCardinality.h>
2018-06-11 12:13:00 +00:00
namespace DB
{
namespace ErrorCodes
{
    /// Error codes raised by the URL storage (defined centrally elsewhere in the project).
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
    extern const int NETWORK_ERROR;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
2018-06-11 12:13:00 +00:00
2022-06-17 12:53:16 +00:00
/// Message text for an invalid argument list of the URL storage.
static constexpr auto bad_arguments_error_message =
    "Storage URL requires 1-4 arguments: url, name of used format (taken from file extension by default), "
    "optional compression method, optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
2021-09-10 11:11:52 +00:00
/// Keys that must be present in a URL named collection / config section.
static const std::unordered_set<std::string_view> required_configuration_keys = {
    "url",
};

/// Keys that may be present in a URL named collection / config section.
static const std::unordered_set<std::string_view> optional_configuration_keys = {
    "format",
    "compression",
    "compression_method",
    "structure",
    "filename",
    "method",
    "http_method",
    "description",
    "headers.header.name",
    "headers.header.value",
};

/// Headers in config file will have structure "headers.header.name" and "headers.header.value".
/// But Poco::AbstractConfiguration converts them into "header", "header[1]", "header[2]".
/// The dots are escaped so that they match only a literal '.', not an arbitrary character.
static const std::vector<std::regex> optional_regex_keys = {
    std::regex(R"(headers\.header\[\d*\]\.name)"),
    std::regex(R"(headers\.header\[\d*\]\.value)"),
};
2022-02-07 19:40:47 +00:00
/// True when `uri` looks like a pattern rather than a single URL: it either contains a
/// '|' separator, or contains both '{' and '}'.
static bool urlWithGlobs(const String & uri)
{
    if (uri.find('|') != std::string::npos)
        return true;

    const bool has_open_brace = uri.find('{') != std::string::npos;
    const bool has_close_brace = uri.find('}') != std::string::npos;
    return has_open_brace && has_close_brace;
}
2023-04-21 12:11:18 +00:00
/// Builds HTTP connection timeouts from the query settings plus the server's
/// `keep_alive_timeout` config value (DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT when it is absent).
static ConnectionTimeouts getHTTPTimeouts(ContextPtr context)
{
    const auto keep_alive_timeout = context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT);
    return ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {keep_alive_timeout, 0});
}
2022-02-07 19:40:47 +00:00
2019-08-24 21:20:20 +00:00
/// Common constructor for URL-backed storages.
/// Validates the format name, chooses the compression method from the URL path and the explicit
/// `compression_method_`, and publishes the in-memory metadata. When `columns_` is empty, the
/// columns are obtained from getTableStructureFromData() for `uri`.
IStorageURLBase::IStorageURLBase(
    const String & uri_,
    ContextPtr context_,
    const StorageID & table_id_,
    const String & format_name_,
    const std::optional<FormatSettings> & format_settings_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    const String & compression_method_,
    const HTTPHeaderEntries & headers_,
    const String & http_method_,
    ASTPtr partition_by_,
    bool distributed_processing_)
    : IStorage(table_id_)
    , uri(uri_)
    , compression_method(chooseCompressionMethod(Poco::URI(uri_).getPath(), compression_method_))
    , format_name(format_name_)
    , format_settings(format_settings_)
    , headers(headers_)
    , http_method(http_method_)
    , partition_by(partition_by_)
    , distributed_processing(distributed_processing_)
{
    FormatFactory::instance().checkFormatName(format_name);

    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(
        columns_.empty()
            ? getTableStructureFromData(format_name, uri, compression_method, headers, format_settings, context_)
            : columns_);
    storage_metadata.setConstraints(constraints_);
    storage_metadata.setComment(comment);
    setInMemoryMetadata(storage_metadata);
}
2023-01-19 02:19:04 +00:00
2023-04-21 12:11:18 +00:00
namespace
{
HTTPHeaderEntries getHeaders(const HTTPHeaderEntries & headers_)
2021-10-03 13:53:24 +00:00
{
2023-04-21 12:11:18 +00:00
HTTPHeaderEntries headers(headers_.begin(), headers_.end());
2022-03-16 08:45:17 +00:00
2023-04-21 12:11:18 +00:00
// Propagate OpenTelemetry trace context, if any, downstream.
const auto & current_trace_context = OpenTelemetry::CurrentContext();
2023-04-21 12:11:18 +00:00
if (current_trace_context.isTraceEnabled())
2021-10-03 13:53:24 +00:00
{
2023-04-21 12:11:18 +00:00
headers.emplace_back("traceparent", current_trace_context.composeTraceparentHeader());
2021-10-03 13:53:24 +00:00
2023-04-21 12:11:18 +00:00
if (!current_trace_context.tracestate.empty())
{
headers.emplace_back("tracestate", current_trace_context.tracestate);
}
}
2021-10-26 09:31:01 +00:00
2023-04-21 12:11:18 +00:00
return headers;
}
2021-10-03 13:53:24 +00:00
StorageURLSource::FailoverOptions getFailoverOptions(const String & uri, size_t max_addresses)
{
2023-04-21 17:28:14 +00:00
return parseRemoteDescription(uri, 0, uri.size(), '|', max_addresses);
}
}
2021-12-17 11:03:37 +00:00
/// Holds the list of URLs produced from a ','-separated description and hands them out one by one.
class StorageURLSource::DisclosedGlobIterator::Impl
{
public:
    Impl(const String & uri, size_t max_addresses)
        : uris(parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses))
    {
    }

    /// Returns the next URL, or an empty string once every URL has been handed out.
    /// The position counter is atomic, so concurrent callers each receive a distinct index.
    String next()
    {
        const size_t position = index.fetch_add(1, std::memory_order_relaxed);
        return position < uris.size() ? uris[position] : String{};
    }

    /// Total number of URLs.
    size_t size()
    {
        return uris.size();
    }

private:
    Strings uris;
    std::atomic_size_t index = 0;
};
2023-04-21 17:35:17 +00:00
/// Thin pimpl wrapper: all state and logic live in DisclosedGlobIterator::Impl.
StorageURLSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, size_t max_addresses)
    : pimpl(std::make_shared<Impl>(uri, max_addresses))
{
}

/// Next URL from the shared Impl, or an empty string when exhausted.
String StorageURLSource::DisclosedGlobIterator::next() { return pimpl->next(); }

/// Number of URLs held by the shared Impl.
size_t StorageURLSource::DisclosedGlobIterator::size() { return pimpl->size(); }
/// Copies the "user:password" user-info part of `request_uri` into `credentials`.
/// If the user info is empty or contains no ':', `credentials` is left unchanged.
/// NOTE(review): since nothing is cleared here, callers that reuse one `credentials` object for
/// several URIs keep the values from an earlier URI when a later one has no user info — confirm intended.
void StorageURLSource::setCredentials(Poco::Net::HTTPBasicCredentials & credentials, const Poco::URI & request_uri)
{
    const auto & user_info = request_uri.getUserInfo();
    const auto colon_pos = user_info.find(':');
    if (colon_pos == std::string::npos)
        return;

    credentials.setUsername(user_info.substr(0, colon_pos));
    credentials.setPassword(user_info.substr(colon_pos + 1));
}
2021-12-17 11:03:37 +00:00
/// Returns `sample_block` extended with one empty column for every requested virtual column.
Block StorageURLSource::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
{
    for (const NameAndTypePair & virtual_column : requested_virtual_columns)
    {
        const auto & column_type = virtual_column.type;
        sample_block.insert({column_type->createColumn(), column_type, virtual_column.name});
    }
    return sample_block;
}
2021-12-17 11:03:37 +00:00
/// Source that reads URLs produced by `uri_iterator_` one after another.
/// No request is made here. The `initialize` callback is invoked later from generate(); it opens
/// the next file and builds its reading pipeline, and returns false when the iterator is exhausted.
StorageURLSource::StorageURLSource(
    const std::vector<NameAndTypePair> & requested_virtual_columns_,
    std::shared_ptr<IteratorWrapper> uri_iterator_,
    const std::string & http_method,
    std::function<void(std::ostream &)> callback,
    const String & format,
    const std::optional<FormatSettings> & format_settings,
    String name_,
    const Block & sample_block,
    ContextPtr context,
    const ColumnsDescription & columns,
    UInt64 max_block_size,
    const ConnectionTimeouts & timeouts,
    CompressionMethod compression_method,
    size_t download_threads,
    const HTTPHeaderEntries & headers_,
    const URIParams & params,
    bool glob_url)
    : ISource(getHeader(sample_block, requested_virtual_columns_), false)
    , name(std::move(name_))
    , requested_virtual_columns(requested_virtual_columns_)
    , uri_iterator(uri_iterator_)
{
    auto headers = getHeaders(headers_);

    /// Lazy initialization. We should not perform requests in constructor, because we need to do it in query pipeline.
    /// The lambda captures the constructor arguments by copy, plus `this` for member state.
    initialize = [=, this]()
    {
        std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> opened;
        while (true)
        {
            /// Each iterator step yields the failover alternatives of one file.
            std::vector<String> uri_options = (*uri_iterator)();
            if (uri_options.empty())
                return false;

            auto option_it = uri_options.cbegin();
            opened = getFirstAvailableURIAndReadBuffer(
                option_it,
                uri_options.end(),
                context,
                params,
                http_method,
                callback,
                timeouts,
                credentials,
                headers,
                glob_url,
                /* delay_initialization */ uri_options.size() == 1);

            /// If file is empty and engine_url_skip_empty_files=1, skip it and go to the next file.
            const bool skip_file = context->getSettingsRef().engine_url_skip_empty_files && opened.second->eof();
            if (!skip_file)
                break;
        }

        curr_uri = opened.first;
        read_buf = std::move(opened.second);

        if (auto file_progress_callback = context->getFileProgressCallback())
        {
            size_t file_size = tryGetFileSizeFromReadBuffer(*read_buf).value_or(0);
            LOG_DEBUG(&Poco::Logger::get("URL"), "Send file size {}", file_size);
            file_progress_callback(FileProgress(0, file_size));
        }

        // TODO: Pass max_parsing_threads and max_download_threads adjusted for num_streams.
        input_format = FormatFactory::instance().getInput(
            format,
            *read_buf,
            sample_block,
            context,
            max_block_size,
            format_settings,
            download_threads,
            /*max_download_threads*/ std::nullopt,
            /* is_remote_fs */ true,
            compression_method);

        QueryPipelineBuilder builder;
        builder.init(Pipe(input_format));
        builder.addSimpleTransform([&](const Block & cur_header)
        {
            return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *input_format, context);
        });

        pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
        reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
        return true;
    };
}
2023-01-19 05:18:07 +00:00
/// Pulls the next chunk, moving on to the next URL whenever the current one is exhausted.
/// Appends the requested `_path` / `_file` virtual columns to every returned chunk.
/// Returns an empty chunk on cancellation or when there are no more URLs.
Chunk StorageURLSource::generate()
{
    for (;;)
    {
        if (isCancelled())
        {
            if (reader)
                reader->cancel();
            return {};
        }

        if (!reader && !initialize())
            return {};

        Chunk chunk;
        if (!reader->pull(chunk))
        {
            /// Current file is finished: drop its pipeline and buffers, then try the next one.
            pipeline->reset();
            reader.reset();
            input_format.reset();
            read_buf.reset();
            continue;
        }

        const UInt64 num_rows = chunk.getNumRows();
        const size_t approx_bytes = input_format->getApproxBytesReadForChunk();
        progress(num_rows, approx_bytes != 0 ? approx_bytes : chunk.bytes());

        const String & path = curr_uri.getPath();
        for (const auto & column : requested_virtual_columns)
        {
            if (column.name == "_path")
            {
                chunk.addColumn(column.type->createColumnConst(num_rows, path)->convertToFullColumnIfConst());
            }
            else if (column.name == "_file")
            {
                /// find_last_of yields npos when there is no '/', and npos + 1 wraps to 0, i.e. the whole path.
                const String file_name = path.substr(path.find_last_of('/') + 1);
                chunk.addColumn(column.type->createColumnConst(num_rows, file_name)->convertToFullColumnIfConst());
            }
        }
        return chunk;
    }
}
2023-06-13 14:43:50 +00:00
std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> StorageURLSource::getFirstAvailableURIAndReadBuffer(
std::vector<String>::const_iterator & option,
const std::vector<String>::const_iterator & end,
ContextPtr context,
const URIParams & params,
const String & http_method,
std::function<void(std::ostream &)> callback,
const ConnectionTimeouts & timeouts,
Poco::Net::HTTPBasicCredentials & credentials,
const HTTPHeaderEntries & headers,
bool glob_url,
2023-06-23 13:43:40 +00:00
bool delay_initialization)
{
String first_exception_message;
ReadSettings read_settings = context->getReadSettings();
size_t options = std::distance(option, end);
2023-06-15 12:59:46 +00:00
std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> last_skipped_empty_res;
for (; option != end; ++option)
{
bool skip_url_not_found_error = glob_url && read_settings.http_skip_not_found_url_for_globs && option == std::prev(end);
2023-07-20 12:38:41 +00:00
auto request_uri = Poco::URI(*option, context->getSettingsRef().disable_url_encoding);
2021-10-03 04:28:28 +00:00
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
2023-01-19 02:19:04 +00:00
setCredentials(credentials, request_uri);
2022-02-08 09:59:20 +00:00
const auto settings = context->getSettings();
2023-01-19 02:19:04 +00:00
try
{
auto res = std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
http_method,
callback,
timeouts,
credentials,
settings.max_http_get_redirects,
settings.max_read_buffer_size,
read_settings,
headers,
&context->getRemoteHostFilter(),
delay_initialization,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
2023-06-15 12:59:46 +00:00
if (context->getSettingsRef().engine_url_skip_empty_files && res->eof() && option != std::prev(end))
{
last_skipped_empty_res = {request_uri, std::move(res)};
continue;
}
return std::make_tuple(request_uri, std::move(res));
}
catch (...)
{
if (options == 1)
throw;
if (first_exception_message.empty())
first_exception_message = getCurrentExceptionMessage(false);
tryLogCurrentException(__PRETTY_FUNCTION__);
continue;
}
}
2023-06-15 12:59:46 +00:00
/// If all options are unreachable except empty ones that we skipped,
/// return last empty result. It will be skipped later.
if (last_skipped_empty_res.second)
return last_skipped_empty_res;
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", options, first_exception_message);
2018-06-11 12:13:00 +00:00
}
2018-06-16 05:54:06 +00:00
2021-07-23 14:25:35 +00:00
/// Sink that sends the formatted data to `uri` with `http_method`.
/// The format's content type and the compression's content encoding are passed to the HTTP write
/// buffer, which is wrapped into a compressing buffer for `compression_method` (level 3).
StorageURLSink::StorageURLSink(
    const String & uri,
    const String & format,
    const std::optional<FormatSettings> & format_settings,
    const Block & sample_block,
    ContextPtr context,
    const ConnectionTimeouts & timeouts,
    const CompressionMethod compression_method,
    const HTTPHeaderEntries & headers,
    const String & http_method)
    : SinkToStorage(sample_block)
{
    auto & factory = FormatFactory::instance();
    const std::string content_type = factory.getContentType(format, context, format_settings);
    const std::string content_encoding = toContentEncodingName(compression_method);

    auto http_buf = std::make_unique<WriteBufferFromHTTP>(
        Poco::URI(uri), http_method, content_type, content_encoding, headers, timeouts);
    write_buf = wrapWriteBufferWithCompressionMethod(std::move(http_buf), compression_method, /* level */ 3);

    writer = factory.getOutputFormat(format, *write_buf, sample_block, context, format_settings);
}
2018-06-16 05:54:06 +00:00
2023-04-21 12:11:18 +00:00
2021-07-23 14:25:35 +00:00
/// Writes `chunk` through the output format unless the sink has been cancelled.
/// Runs under `cancel_mutex` so it cannot interleave with onCancel / onFinish / onException.
void StorageURLSink::consume(Chunk chunk)
{
    std::lock_guard guard(cancel_mutex);
    if (!cancelled)
        writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
}
/// Cancels the sink: further consume() calls become no-ops and the output is finalized.
void StorageURLSink::onCancel()
{
    std::lock_guard lock(cancel_mutex);
    /// Mark the sink as cancelled *before* finalizing. finalize() may throw, and on that path it calls
    /// release(), which resets `writer`. A later consume() must then return early instead of
    /// dereferencing the released writer.
    cancelled = true;
    finalize();
}
/// Tears down the writer and buffer after a pipeline error.
/// The exception is rethrown and caught locally so that release() runs inside an active exception
/// context, which is needed to delete the write buffers properly without finalization.
void StorageURLSink::onException(std::exception_ptr exception)
{
    std::lock_guard guard(cancel_mutex);
    try
    {
        std::rethrow_exception(exception);
    }
    catch (...)
    {
        release();
    }
}
2021-07-23 14:25:35 +00:00
void StorageURLSink::onFinish()
2020-07-09 01:00:16 +00:00
{
std::lock_guard lock(cancel_mutex);
finalize();
}
void StorageURLSink::finalize()
{
if (!writer)
return;
Fix possible "Cannot write to finalized buffer" It is still possible to get this error since onException does not finalize format correctly. Here is an example of such error, that was found by CI [1]: <details> [ 2686 ] {fa01bf02-73f6-4f7f-b14f-e725de6d7f9b} <Fatal> : Logical error: 'Cannot write to finalized buffer'. [ 34577 ] {} <Fatal> BaseDaemon: ######################################## [ 34577 ] {} <Fatal> BaseDaemon: (version 22.6.1.1, build id: AB8040A6769E01A0) (from thread 2686) (query_id: fa01bf02-73f6-4f7f-b14f-e725de6d7f9b) (query: insert into test_02302 select number from numbers(10) settings s3_truncate_on_insert=1;) Received signal Aborted (6) [ 34577 ] {} <Fatal> BaseDaemon: [ 34577 ] {} <Fatal> BaseDaemon: Stack trace: 0x7fcbaa5a703b 0x7fcbaa586859 0xfad9bab 0xfad9e05 0xfaf6a3b 0x24a48c7f 0x258fb9b9 0x258f2004 0x258b88f4 0x258b863b 0x2581773d 0x258177ce 0x24bb5e98 0xfad01d6 0xfad0105 0x2419b11d 0xfad01d6 0xfad0105 0x2215afbb 0x2215aa48 0xfad01d6 0xfad0105 0xfcc265d 0x225cc546 0x249a1c40 0x249bc1b6 0x2685902c 0x26859505 0x269d7767 0x269d504c 0x7fcbaa75e609 0x7fcbaa683163 [ 34577 ] {} <Fatal> BaseDaemon: 3. raise @ 0x7fcbaa5a703b in ? [ 34577 ] {} <Fatal> BaseDaemon: 4. abort @ 0x7fcbaa586859 in ? [ 34577 ] {} <Fatal> BaseDaemon: 5. ./build_docker/../src/Common/Exception.cpp:47: DB::abortOnFailedAssertion(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfad9bab in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 6. ./build_docker/../src/Common/Exception.cpp:70: DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0xfad9e05 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 7. ./build_docker/../src/IO/WriteBuffer.h:0: DB::WriteBuffer::write(char const*, unsigned long) @ 0xfaf6a3b in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 8. 
./build_docker/../src/Processors/Formats/Impl/ArrowBufferedStreams.cpp:47: DB::ArrowBufferedOutputStream::Write(void const*, long) @ 0x24a48c7f in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 9. long parquet::ThriftSerializer::Serialize<parquet::format::FileMetaData>(parquet::format::FileMetaData const*, arrow::io::OutputStream*, std::__1::shared_ptr<parquet::Encryptor> const&) @ 0x258fb9b9 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 10. parquet::FileMetaData::FileMetaDataImpl::WriteTo(arrow::io::OutputStream*, std::__1::shared_ptr<parquet::Encryptor> const&) const @ 0x258f2004 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 11. parquet::WriteFileMetaData(parquet::FileMetaData const&, arrow::io::OutputStream*) @ 0x258b88f4 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 12. parquet::ParquetFileWriter::~ParquetFileWriter() @ 0x258b863b in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 13. parquet::arrow::FileWriterImpl::~FileWriterImpl() @ 0x2581773d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 14. parquet::arrow::FileWriterImpl::~FileWriterImpl() @ 0x258177ce in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 15. ./build_docker/../src/Processors/Formats/Impl/ParquetBlockOutputFormat.h:27: DB::ParquetBlockOutputFormat::~ParquetBlockOutputFormat() @ 0x24bb5e98 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 16. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 17. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 18.1. 
inlined from ./build_docker/../contrib/libcxx/include/__memory/unique_ptr.h:312: std::__1::unique_ptr<DB::WriteBuffer, std::__1::default_delete<DB::WriteBuffer> >::reset(DB::WriteBuffer*) [ 34577 ] {} <Fatal> BaseDaemon: 18.2. inlined from ../contrib/libcxx/include/__memory/unique_ptr.h:269: ~unique_ptr [ 34577 ] {} <Fatal> BaseDaemon: 18. ../src/Storages/StorageS3.cpp:566: DB::StorageS3Sink::~StorageS3Sink() @ 0x2419b11d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 19. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 20. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 21. ./build_docker/../contrib/abseil-cpp/absl/container/internal/raw_hash_set.h:1662: absl::lts_20211102::container_internal::raw_hash_set<absl::lts_20211102::container_internal::FlatHashMapPolicy<StringRef, std::__1::shared_ptr<DB::SinkToStorage> >, absl::lts_20211102::hash_internal::Hash<StringRef>, std::__1::equal_to<StringRef>, std::__1::allocator<std::__1::pair<StringRef const, std::__1::shared_ptr<DB::SinkToStorage> > > >::destroy_slots() @ 0x2215afbb in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 22.1. inlined from ./build_docker/../contrib/libcxx/include/string:1445: std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >::__is_long() const [ 34577 ] {} <Fatal> BaseDaemon: 22.2. inlined from ../contrib/libcxx/include/string:2231: ~basic_string [ 34577 ] {} <Fatal> BaseDaemon: 22. ../src/Storages/PartitionedSink.h:14: DB::PartitionedSink::~PartitionedSink() @ 0x2215aa48 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 23. 
./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 24. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 25. ./build_docker/../contrib/libcxx/include/vector:802: std::__1::vector<std::__1::shared_ptr<DB::IProcessor>, std::__1::allocator<std::__1::shared_ptr<DB::IProcessor> > >::__base_destruct_at_end(std::__1::shared_ptr<DB::IProcessor>*) @ 0xfcc265d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 26.1. inlined from ./build_docker/../contrib/libcxx/include/vector:402: ~vector [ 34577 ] {} <Fatal> BaseDaemon: 26.2. inlined from ../src/QueryPipeline/QueryPipeline.cpp:29: ~QueryPipeline [ 34577 ] {} <Fatal> BaseDaemon: 26. ../src/QueryPipeline/QueryPipeline.cpp:535: DB::QueryPipeline::reset() @ 0x225cc546 in /usr/bin/clickhouse [ 614 ] {} <Fatal> Application: Child process was terminated by signal 6. </details> [1]: https://s3.amazonaws.com/clickhouse-test-reports/37542/8a224239c1d922158b4dc9f5d6609dca836dfd06/stress_test__undefined__actions_.html Follow-up for: #36979 Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-30 10:13:47 +00:00
try
{
writer->finalize();
writer->flush();
write_buf->finalize();
}
catch (...)
{
/// Stop ParallelFormattingOutputFormat correctly.
release();
Fix possible "Cannot write to finalized buffer" It is still possible to get this error since onException does not finalize format correctly. Here is an example of such error, that was found by CI [1]: <details> [ 2686 ] {fa01bf02-73f6-4f7f-b14f-e725de6d7f9b} <Fatal> : Logical error: 'Cannot write to finalized buffer'. [ 34577 ] {} <Fatal> BaseDaemon: ######################################## [ 34577 ] {} <Fatal> BaseDaemon: (version 22.6.1.1, build id: AB8040A6769E01A0) (from thread 2686) (query_id: fa01bf02-73f6-4f7f-b14f-e725de6d7f9b) (query: insert into test_02302 select number from numbers(10) settings s3_truncate_on_insert=1;) Received signal Aborted (6) [ 34577 ] {} <Fatal> BaseDaemon: [ 34577 ] {} <Fatal> BaseDaemon: Stack trace: 0x7fcbaa5a703b 0x7fcbaa586859 0xfad9bab 0xfad9e05 0xfaf6a3b 0x24a48c7f 0x258fb9b9 0x258f2004 0x258b88f4 0x258b863b 0x2581773d 0x258177ce 0x24bb5e98 0xfad01d6 0xfad0105 0x2419b11d 0xfad01d6 0xfad0105 0x2215afbb 0x2215aa48 0xfad01d6 0xfad0105 0xfcc265d 0x225cc546 0x249a1c40 0x249bc1b6 0x2685902c 0x26859505 0x269d7767 0x269d504c 0x7fcbaa75e609 0x7fcbaa683163 [ 34577 ] {} <Fatal> BaseDaemon: 3. raise @ 0x7fcbaa5a703b in ? [ 34577 ] {} <Fatal> BaseDaemon: 4. abort @ 0x7fcbaa586859 in ? [ 34577 ] {} <Fatal> BaseDaemon: 5. ./build_docker/../src/Common/Exception.cpp:47: DB::abortOnFailedAssertion(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfad9bab in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 6. ./build_docker/../src/Common/Exception.cpp:70: DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0xfad9e05 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 7. ./build_docker/../src/IO/WriteBuffer.h:0: DB::WriteBuffer::write(char const*, unsigned long) @ 0xfaf6a3b in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 8. 
./build_docker/../src/Processors/Formats/Impl/ArrowBufferedStreams.cpp:47: DB::ArrowBufferedOutputStream::Write(void const*, long) @ 0x24a48c7f in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 9. long parquet::ThriftSerializer::Serialize<parquet::format::FileMetaData>(parquet::format::FileMetaData const*, arrow::io::OutputStream*, std::__1::shared_ptr<parquet::Encryptor> const&) @ 0x258fb9b9 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 10. parquet::FileMetaData::FileMetaDataImpl::WriteTo(arrow::io::OutputStream*, std::__1::shared_ptr<parquet::Encryptor> const&) const @ 0x258f2004 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 11. parquet::WriteFileMetaData(parquet::FileMetaData const&, arrow::io::OutputStream*) @ 0x258b88f4 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 12. parquet::ParquetFileWriter::~ParquetFileWriter() @ 0x258b863b in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 13. parquet::arrow::FileWriterImpl::~FileWriterImpl() @ 0x2581773d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 14. parquet::arrow::FileWriterImpl::~FileWriterImpl() @ 0x258177ce in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 15. ./build_docker/../src/Processors/Formats/Impl/ParquetBlockOutputFormat.h:27: DB::ParquetBlockOutputFormat::~ParquetBlockOutputFormat() @ 0x24bb5e98 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 16. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 17. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 18.1. 
inlined from ./build_docker/../contrib/libcxx/include/__memory/unique_ptr.h:312: std::__1::unique_ptr<DB::WriteBuffer, std::__1::default_delete<DB::WriteBuffer> >::reset(DB::WriteBuffer*) [ 34577 ] {} <Fatal> BaseDaemon: 18.2. inlined from ../contrib/libcxx/include/__memory/unique_ptr.h:269: ~unique_ptr [ 34577 ] {} <Fatal> BaseDaemon: 18. ../src/Storages/StorageS3.cpp:566: DB::StorageS3Sink::~StorageS3Sink() @ 0x2419b11d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 19. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 20. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 21. ./build_docker/../contrib/abseil-cpp/absl/container/internal/raw_hash_set.h:1662: absl::lts_20211102::container_internal::raw_hash_set<absl::lts_20211102::container_internal::FlatHashMapPolicy<StringRef, std::__1::shared_ptr<DB::SinkToStorage> >, absl::lts_20211102::hash_internal::Hash<StringRef>, std::__1::equal_to<StringRef>, std::__1::allocator<std::__1::pair<StringRef const, std::__1::shared_ptr<DB::SinkToStorage> > > >::destroy_slots() @ 0x2215afbb in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 22.1. inlined from ./build_docker/../contrib/libcxx/include/string:1445: std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >::__is_long() const [ 34577 ] {} <Fatal> BaseDaemon: 22.2. inlined from ../contrib/libcxx/include/string:2231: ~basic_string [ 34577 ] {} <Fatal> BaseDaemon: 22. ../src/Storages/PartitionedSink.h:14: DB::PartitionedSink::~PartitionedSink() @ 0x2215aa48 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 23. 
./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 24. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 25. ./build_docker/../contrib/libcxx/include/vector:802: std::__1::vector<std::__1::shared_ptr<DB::IProcessor>, std::__1::allocator<std::__1::shared_ptr<DB::IProcessor> > >::__base_destruct_at_end(std::__1::shared_ptr<DB::IProcessor>*) @ 0xfcc265d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 26.1. inlined from ./build_docker/../contrib/libcxx/include/vector:402: ~vector [ 34577 ] {} <Fatal> BaseDaemon: 26.2. inlined from ../src/QueryPipeline/QueryPipeline.cpp:29: ~QueryPipeline [ 34577 ] {} <Fatal> BaseDaemon: 26. ../src/QueryPipeline/QueryPipeline.cpp:535: DB::QueryPipeline::reset() @ 0x225cc546 in /usr/bin/clickhouse [ 614 ] {} <Fatal> Application: Child process was terminated by signal 6. </details> [1]: https://s3.amazonaws.com/clickhouse-test-reports/37542/8a224239c1d922158b4dc9f5d6609dca836dfd06/stress_test__undefined__actions_.html Follow-up for: #36979 Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-30 10:13:47 +00:00
throw;
}
2020-07-09 01:00:16 +00:00
}
/// Destroys the output format first (it was constructed on top of *write_buf),
/// then finalizes the underlying write buffer.
void StorageURLSink::release()
{
    writer.reset();
    write_buf->finalize();
}
2021-10-26 09:31:01 +00:00
class PartitionedStorageURLSink : public PartitionedSink
{
public:
PartitionedStorageURLSink(
const ASTPtr & partition_by,
const String & uri_,
const String & format_,
const std::optional<FormatSettings> & format_settings_,
const Block & sample_block_,
ContextPtr context_,
const ConnectionTimeouts & timeouts_,
const CompressionMethod compression_method_,
2023-04-21 12:11:18 +00:00
const HTTPHeaderEntries & headers_,
2021-10-28 12:44:12 +00:00
const String & http_method_)
: PartitionedSink(partition_by, context_, sample_block_)
, uri(uri_)
, format(format_)
, format_settings(format_settings_)
, sample_block(sample_block_)
, context(context_)
, timeouts(timeouts_)
, compression_method(compression_method_)
2023-04-21 12:11:18 +00:00
, headers(headers_)
, http_method(http_method_)
2021-10-26 09:31:01 +00:00
{
}
SinkPtr createSinkForPartition(const String & partition_id) override
{
auto partition_path = PartitionedSink::replaceWildcards(uri, partition_id);
context->getRemoteHostFilter().checkURL(Poco::URI(partition_path));
return std::make_shared<StorageURLSink>(
2023-04-21 12:11:18 +00:00
partition_path, format, format_settings, sample_block, context, timeouts, compression_method, headers, http_method);
2021-10-26 09:31:01 +00:00
}
private:
const String uri;
const String format;
const std::optional<FormatSettings> format_settings;
const Block sample_block;
ContextPtr context;
const ConnectionTimeouts timeouts;
const CompressionMethod compression_method;
2023-04-21 12:11:18 +00:00
const HTTPHeaderEntries headers;
2021-10-28 12:44:12 +00:00
const String http_method;
2021-10-26 09:31:01 +00:00
};
2020-07-09 01:00:16 +00:00
/// HTTP method used for read requests by the base URL storage: GET.
std::string IStorageURLBase::getReadMethod() const
{
    const std::string & method = Poco::Net::HTTPRequest::HTTP_GET;
    return method;
}
std::vector<std::pair<std::string, std::string>> IStorageURLBase::getReadURIParams(
const Names & /*column_names*/,
const StorageSnapshotPtr & /*storage_snapshot*/,
2018-06-11 12:13:00 +00:00
const SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
2018-06-11 12:13:00 +00:00
QueryProcessingStage::Enum & /*processed_stage*/,
size_t /*max_block_size*/) const
{
return {};
}
std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(
const Names & /*column_names*/,
2022-02-28 13:29:05 +00:00
const ColumnsDescription & /* columns_description */,
2018-06-11 12:13:00 +00:00
const SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t /*max_block_size*/) const
{
return nullptr;
}
2022-02-07 19:40:47 +00:00
/// Infers the table structure of the data at `uri` (which may contain globs) in the given format.
/// When schema_inference_use_cache_for_url is enabled, the URL schema cache is consulted first
/// and the resulting structure is stored back into it.
ColumnsDescription IStorageURLBase::getTableStructureFromData(
    const String & format,
    const String & uri,
    CompressionMethod compression_method,
    const HTTPHeaderEntries & headers,
    const std::optional<FormatSettings> & format_settings,
    ContextPtr context)
{
    context->getRemoteHostFilter().checkURL(Poco::URI(uri));

    /// Default-constructed (empty) credentials are used for every request made here.
    Poco::Net::HTTPBasicCredentials credentials;

    /// Flatten the URL into candidate URLs: the ','-separated descriptions produced by glob expansion,
    /// each of which may list several '|'-separated options.
    std::vector<String> candidate_urls;
    if (!urlWithGlobs(uri))
    {
        candidate_urls.push_back(uri);
    }
    else
    {
        const size_t glob_limit = context->getSettingsRef().glob_expansion_max_elements;
        const auto descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', glob_limit, "url");
        for (const auto & description : descriptions)
        {
            const auto options = parseRemoteDescription(description, 0, description.size(), '|', glob_limit, "url");
            candidate_urls.insert(candidate_urls.end(), options.begin(), options.end());
        }
    }

    const bool use_schema_cache = context->getSettingsRef().schema_inference_use_cache_for_url;

    std::optional<ColumnsDescription> cached_columns;
    if (use_schema_cache)
        cached_columns = tryGetColumnsFromCache(candidate_urls, headers, credentials, format, format_settings, context);

    /// Produces one read buffer per call, walking through candidate_urls. With engine_url_skip_empty_files,
    /// empty responses are skipped. If the very first call runs out of candidates it throws;
    /// later calls return nullptr once the candidates are exhausted.
    ReadBufferIterator read_buffer_iterator
        = [&, current = candidate_urls.cbegin(), is_first_call = true](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
    {
        std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> uri_and_buf;
        while (true)
        {
            if (current == candidate_urls.cend())
            {
                if (is_first_call)
                    throw Exception(
                        ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
                        "Cannot extract table structure from {} format file, because all files are empty. "
                        "You must specify table structure manually",
                        format);
                return nullptr;
            }

            uri_and_buf = StorageURLSource::getFirstAvailableURIAndReadBuffer(
                current,
                candidate_urls.cend(),
                context,
                {},
                Poco::Net::HTTPRequest::HTTP_GET,
                {},
                getHTTPTimeouts(context),
                credentials,
                headers,
                false,
                false);
            ++current;

            const bool skip_empty_files = context->getSettingsRef().engine_url_skip_empty_files;
            if (!skip_empty_files || !uri_and_buf.second->eof())
                break;
        }
        is_first_call = false;

        return wrapReadBufferWithCompressionMethod(
            std::move(uri_and_buf.second),
            compression_method,
            static_cast<int>(context->getSettingsRef().zstd_window_log_max));
    };

    ColumnsDescription columns = cached_columns
        ? *cached_columns
        : readSchemaFromFormat(format, format_settings, read_buffer_iterator, candidate_urls.size() > 1, context);

    if (use_schema_cache)
        addColumnsToCache(candidate_urls, columns, format, format_settings, context);

    return columns;
}
2022-05-13 18:39:19 +00:00
bool IStorageURLBase::supportsSubsetOfColumns() const
2022-02-23 19:31:16 +00:00
{
2022-05-13 18:39:19 +00:00
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
2022-02-23 19:31:16 +00:00
}
/// Whether the configured output format prefers to receive large blocks.
bool IStorageURLBase::prefersLargeBlocks() const
{
    auto & factory = FormatFactory::instance();
    return factory.checkIfOutputFormatPrefersLargeBlocks(format_name);
}
/// Asks the format factory whether output should be parallelized after reading this format.
bool IStorageURLBase::parallelizeOutputAfterReading(ContextPtr context) const
{
    auto & factory = FormatFactory::instance();
    return factory.checkParallelizeOutputAfterReading(format_name, context);
}
2020-08-03 13:54:14 +00:00
/// Builds the read pipeline: one StorageURLSource per stream, all pulling URLs from a shared iterator.
/// The iterator is fed by the initiator (distributed processing), by glob expansion,
/// or yields a single URL once.
Pipe IStorageURLBase::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    size_t num_streams)
{
    auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size);

    /// Formats that can read a subset of columns are asked only for the requested ones;
    /// other formats must produce the full table structure.
    ColumnsDescription columns_description;
    Block block_for_format;
    if (supportsSubsetOfColumns())
    {
        columns_description = storage_snapshot->getDescriptionForColumns(column_names);
        block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
    }
    else
    {
        columns_description = storage_snapshot->metadata->getColumns();
        block_for_format = storage_snapshot->metadata->getSampleBlock();
    }

    /// Virtual columns (see getVirtuals) that the query actually requested.
    std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
    std::vector<NameAndTypePair> requested_virtual_columns;
    for (const auto & virtual_column : getVirtuals())
    {
        if (column_names_set.contains(virtual_column.name))
            requested_virtual_columns.push_back(virtual_column);
    }

    size_t max_download_threads = local_context->getSettingsRef().max_download_threads;

    std::shared_ptr<StorageURLSource::IteratorWrapper> iterator_wrapper{nullptr};
    bool is_url_with_globs = urlWithGlobs(uri);
    size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
    if (distributed_processing)
    {
        /// URLs are handed out one at a time through the read task callback; an empty string ends the stream.
        iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>(
            [callback = local_context->getReadTaskCallback(), max_addresses]()
            {
                String next_uri = callback();
                if (next_uri.empty())
                    return StorageURLSource::FailoverOptions{};
                return getFailoverOptions(next_uri, max_addresses);
            });
    }
    else if (is_url_with_globs)
    {
        /// Iterate through disclosed globs and make a source for each file
        auto glob_iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(uri, max_addresses);
        iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>([glob_iterator, max_addresses]()
        {
            String next_uri = glob_iterator->next();
            if (next_uri.empty())
                return StorageURLSource::FailoverOptions{};
            return getFailoverOptions(next_uri, max_addresses);
        });

        /// No point in having more streams than URLs.
        if (num_streams > glob_iterator->size())
            num_streams = glob_iterator->size();
    }
    else
    {
        /// A single URL: yield its failover options once, then report exhaustion.
        /// The URL is captured by value so the stored iterator does not keep referring to this storage.
        iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>(
            [single_uri = uri, max_addresses, done = false]() mutable
            {
                if (done)
                    return StorageURLSource::FailoverOptions{};
                done = true;
                return getFailoverOptions(single_uri, max_addresses);
            });
        num_streams = 1;
    }

    Pipes pipes;
    pipes.reserve(num_streams);

    /// Split the download thread budget between streams, at least one thread each.
    /// num_streams can be 0 when glob expansion yields no URLs; guard it so we never divide by zero.
    size_t download_threads
        = (num_streams == 0 || num_streams >= max_download_threads) ? 1 : (max_download_threads / num_streams);
    for (size_t i = 0; i < num_streams; ++i)
    {
        pipes.emplace_back(std::make_shared<StorageURLSource>(
            requested_virtual_columns,
            iterator_wrapper,
            getReadMethod(),
            getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
            format_name,
            format_settings,
            getName(),
            block_for_format,
            local_context,
            columns_description,
            max_block_size,
            getHTTPTimeouts(local_context),
            compression_method,
            download_threads,
            headers,
            params,
            is_url_with_globs));
    }

    return Pipe::unitePipes(std::move(pipes));
}
2021-04-21 14:36:04 +00:00
/// Builds a single source that receives all failover options at once, in a random order per query.
Pipe StorageURLWithFailover::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    size_t /*num_streams*/)
{
    ColumnsDescription columns_description;
    Block block_for_format;
    if (supportsSubsetOfColumns())
    {
        columns_description = storage_snapshot->getDescriptionForColumns(column_names);
        block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
    }
    else
    {
        columns_description = storage_snapshot->metadata->getColumns();
        block_for_format = storage_snapshot->metadata->getSampleBlock();
    }

    auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size);

    /// Randomize the order of the options for this query. Shuffle a per-query copy rather than the
    /// shared uri_options member: read() may run concurrently for several queries, and mutating shared
    /// state here would be a data race. Shuffling before the source is created also guarantees the
    /// shuffled order is the one this query uses.
    auto shuffled_options = uri_options;
    std::shuffle(shuffled_options.begin(), shuffled_options.end(), thread_local_rng);

    /// Yields all options once, then reports exhaustion.
    auto iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>(
        [options = std::move(shuffled_options), done = false]() mutable
        {
            if (done)
                return StorageURLSource::FailoverOptions{};
            done = true;
            return options;
        });

    return Pipe(std::make_shared<StorageURLSource>(
        std::vector<NameAndTypePair>{},
        iterator_wrapper,
        getReadMethod(),
        getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
        format_name,
        format_settings,
        getName(),
        block_for_format,
        local_context,
        columns_description,
        max_block_size,
        getHTTPTimeouts(local_context),
        compression_method,
        local_context->getSettingsRef().max_download_threads,
        headers,
        params));
}
/// Creates the INSERT sink: a partitioned sink when a PARTITION BY is in effect and the URL contains
/// the partition id wildcard, otherwise a single StorageURLSink.
SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, bool /*async_insert*/)
{
    /// INSERTs use POST unless another method was configured. Resolve the default into a local instead of
    /// assigning the http_method member: write() can be called concurrently by several INSERT queries,
    /// and writing the shared member from each of them is a data race.
    const String & method = http_method.empty() ? Poco::Net::HTTPRequest::HTTP_POST : http_method;

    bool has_wildcards = uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
    const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
    /// PARTITION BY of the INSERT query takes precedence over the one from the table definition.
    auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
    bool is_partitioned_implementation = partition_by_ast && has_wildcards;

    if (is_partitioned_implementation)
    {
        return std::make_shared<PartitionedStorageURLSink>(
            partition_by_ast,
            uri,
            format_name,
            format_settings,
            metadata_snapshot->getSampleBlock(),
            context,
            getHTTPTimeouts(context),
            compression_method,
            headers,
            method);
    }

    return std::make_shared<StorageURLSink>(
        uri,
        format_name,
        format_settings,
        metadata_snapshot->getSampleBlock(),
        context,
        getHTTPTimeouts(context),
        compression_method,
        headers,
        method);
}
2018-06-16 05:54:06 +00:00
/// Virtual columns exposed by URL storages: _path and _file, both LowCardinality(String).
NamesAndTypesList IStorageURLBase::getVirtuals() const
{
    auto make_low_cardinality_string = []
    {
        return std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
    };

    return NamesAndTypesList{
        {"_path", make_low_cardinality_string()},
        {"_file", make_low_cardinality_string()}};
}
/// Process-wide schema cache shared by URL storages. It is a function-local static, so its capacity
/// is read from the server config only on the first call; the context is ignored afterwards.
SchemaCache & IStorageURLBase::getSchemaCache(const ContextPtr & context)
{
    static SchemaCache schema_cache = [&context]
    {
        const auto max_elements
            = context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_url", DEFAULT_SCHEMA_CACHE_ELEMENTS);
        return SchemaCache(max_elements);
    }();
    return schema_cache;
}
std::optional<ColumnsDescription> IStorageURLBase::tryGetColumnsFromCache(
const Strings & urls,
const HTTPHeaderEntries & headers,
2022-06-28 16:13:42 +00:00
const Poco::Net::HTTPBasicCredentials & credentials,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache(context);
for (const auto & url : urls)
{
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
2022-06-28 16:13:42 +00:00
auto last_mod_time = getLastModificationTime(url, headers, credentials, context);
/// Some URLs could not have Last-Modified header, in this case we cannot be sure that
/// data wasn't changed after adding it's schema to cache. Use schema from cache only if
/// special setting for this case is enabled.
if (!last_mod_time && !context->getSettingsRef().schema_inference_cache_require_modification_time_for_url)
return 0;
return last_mod_time;
};
2022-08-19 16:42:23 +00:00
auto cache_key = getKeyForSchemaCache(url, format_name, format_settings, context);
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
void IStorageURLBase::addColumnsToCache(
const Strings & urls,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache(context);
2022-08-19 16:42:23 +00:00
auto cache_keys = getKeysForSchemaCache(urls, format_name, format_settings, context);
schema_cache.addMany(cache_keys, columns);
}
2022-06-28 16:13:42 +00:00
std::optional<time_t> IStorageURLBase::getLastModificationTime(
const String & url,
const HTTPHeaderEntries & headers,
2022-06-28 16:13:42 +00:00
const Poco::Net::HTTPBasicCredentials & credentials,
const ContextPtr & context)
{
2022-11-03 11:10:49 +00:00
auto settings = context->getSettingsRef();
try
{
ReadWriteBufferFromHTTP buf(
Poco::URI(url),
Poco::Net::HTTPRequest::HTTP_GET,
{},
2023-04-21 12:11:18 +00:00
getHTTPTimeouts(context),
2022-06-28 16:13:42 +00:00
credentials,
2022-11-03 11:10:49 +00:00
settings.max_http_get_redirects,
settings.max_read_buffer_size,
context->getReadSettings(),
headers,
&context->getRemoteHostFilter(),
true,
false,
false);
return buf.getLastModificationTime();
}
catch (...)
{
return std::nullopt;
}
}
2021-04-23 12:18:23 +00:00
/// URL table storage. After the base is initialised, the configured URL is checked against the remote
/// host filter and the configured headers against the HTTP header filter.
StorageURL::StorageURL(
    const String & uri_,
    const StorageID & table_id_,
    const String & format_name_,
    const std::optional<FormatSettings> & format_settings_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    ContextPtr context_,
    const String & compression_method_,
    const HTTPHeaderEntries & headers_,
    const String & http_method_,
    ASTPtr partition_by_,
    bool distributed_processing_)
    : IStorageURLBase(
        uri_, context_, table_id_, format_name_, format_settings_, columns_, constraints_, comment,
        compression_method_, headers_, http_method_, partition_by_, distributed_processing_)
{
    const Poco::URI configured_uri(uri);
    context_->getRemoteHostFilter().checkURL(configured_uri);
    context_->getHTTPHeaderFilter().checkHeaders(headers);
}
2021-04-21 12:32:57 +00:00
2021-04-21 14:36:04 +00:00
/// URL storage that reads from one of several equivalent URLs. The base storage is created with an
/// empty URL; each option is checked against the remote host filter, logged and then recorded.
StorageURLWithFailover::StorageURLWithFailover(
    const std::vector<String> & uri_options_,
    const StorageID & table_id_,
    const String & format_name_,
    const std::optional<FormatSettings> & format_settings_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    ContextPtr context_,
    const String & compression_method_)
    : StorageURL("", table_id_, format_name_, format_settings_, columns_, constraints_, String{}, context_, compression_method_)
{
    for (const auto & option : uri_options_)
    {
        context_->getRemoteHostFilter().checkURL(Poco::URI(option));
        LOG_DEBUG(&Poco::Logger::get("StorageURLDistributed"), "Adding URL option: {}", option);
        uri_options.emplace_back(option);
    }
}
2021-04-21 12:32:57 +00:00
/// Format settings for a URL table: taken from the global server context, plus the SETTINGS clause of
/// the CREATE query when present. Settings from the current session and user are ignored.
FormatSettings StorageURL::getFormatSettingsFromArgs(const StorageFactory::Arguments & args)
{
    if (!args.storage_def->settings)
        return getFormatSettings(args.getContext());

    /// Start from format-related settings changed in the global context (unknown ones are skipped,
    /// since only format settings are relevant here), then apply the SETTINGS clause with validation.
    FormatFactorySettings user_format_settings;
    for (const auto & change : args.getContext()->getSettingsRef().changes())
    {
        if (!user_format_settings.has(change.name))
            continue;
        user_format_settings.set(change.name, change.value);
    }

    user_format_settings.applyChanges(args.storage_def->settings->changes);
    return getFormatSettings(args.getContext(), user_format_settings);
}
2022-06-17 12:53:16 +00:00
/// Scans the arguments of url()/URL(): parses a single `headers('name' = 'value', ...)` argument into
/// header_entries, leaves `key = value` arguments untouched, and evaluates every other argument into a
/// literal in place. Returns the position of the headers argument, or end() if there is none.
/// Throws BAD_ARGUMENTS on a repeated headers argument or a malformed/non-string header.
ASTs::iterator StorageURL::collectHeaders(
    ASTs & url_function_args, HTTPHeaderEntries & header_entries, ContextPtr context)
{
    ASTs::iterator headers_it = url_function_args.end();

    for (auto * arg_it = url_function_args.begin(); arg_it != url_function_args.end(); ++arg_it)
    {
        const auto * headers_ast_function = (*arg_it)->as<ASTFunction>();
        if (headers_ast_function && headers_ast_function->name == "headers")
        {
            if (headers_it != url_function_args.end())
                throw Exception(
                    ErrorCodes::BAD_ARGUMENTS,
                    "URL table function can have only one key-value argument: headers=(). {}",
                    bad_arguments_error_message);

            const auto * headers_function_args_expr = assert_cast<const ASTExpressionList *>(headers_ast_function->arguments.get());
            /// Iterate the children by reference instead of copying the vector of AST pointers.
            const auto & headers_function_args = headers_function_args_expr->children;
            for (const auto & header_arg : headers_function_args)
            {
                /// Each header must be `name = value`, and both sides must evaluate to strings.
                const auto * header_ast = header_arg->as<ASTFunction>();
                if (!header_ast || header_ast->name != "equals")
                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Headers argument is incorrect. {}", bad_arguments_error_message);

                const auto * header_args_expr = assert_cast<const ASTExpressionList *>(header_ast->arguments.get());
                const auto & header_args = header_args_expr->children;
                if (header_args.size() != 2)
                    throw Exception(
                        ErrorCodes::BAD_ARGUMENTS,
                        "Headers argument is incorrect: expected 2 arguments, got {}",
                        header_args.size());

                auto ast_literal = evaluateConstantExpressionOrIdentifierAsLiteral(header_args[0], context);
                auto arg_name_value = ast_literal->as<ASTLiteral>()->value;
                if (arg_name_value.getType() != Field::Types::Which::String)
                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as header name");
                auto arg_name = arg_name_value.safeGet<String>();

                ast_literal = evaluateConstantExpressionOrIdentifierAsLiteral(header_args[1], context);
                auto arg_value = ast_literal->as<ASTLiteral>()->value;
                if (arg_value.getType() != Field::Types::Which::String)
                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as header value");

                header_entries.emplace_back(arg_name, arg_value.safeGet<String>());
            }

            headers_it = arg_it;
            continue;
        }

        /// `key = value` arguments (e.g. named collection overrides) are kept as they are.
        if (headers_ast_function && headers_ast_function->name == "equals")
            continue;

        (*arg_it) = evaluateConstantExpressionOrIdentifierAsLiteral((*arg_it), context);
    }

    return headers_it;
}
/// Fills `configuration` from a validated named collection. `http_method` may also be given as `method`,
/// and `compression_method` as `compression`. Only POST, PUT or an empty method (meaning "use the default")
/// are accepted.
void StorageURL::processNamedCollectionResult(Configuration & configuration, const NamedCollection & collection)
{
    validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys, optional_regex_keys);

    configuration.url = collection.get<String>("url");
    configuration.headers = getHeadersFromNamedCollection(collection);

    const auto method_alias = collection.getOrDefault<String>("method", "");
    configuration.http_method = collection.getOrDefault<String>("http_method", method_alias);

    const bool is_allowed_method = configuration.http_method.empty()
        || configuration.http_method == Poco::Net::HTTPRequest::HTTP_POST
        || configuration.http_method == Poco::Net::HTTPRequest::HTTP_PUT;
    if (!is_allowed_method)
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            "Http method can be POST or PUT (current: {}). For insert default is POST, for select GET",
            configuration.http_method);

    configuration.format = collection.getOrDefault<String>("format", "auto");

    const auto compression_alias = collection.getOrDefault<String>("compression", "auto");
    configuration.compression_method = collection.getOrDefault<String>("compression_method", compression_alias);

    configuration.structure = collection.getOrDefault<String>("structure", "auto");
}
2018-06-11 12:13:00 +00:00
/// Builds the URL storage configuration from engine/table-function arguments, either from a named
/// collection (with overrides) or from positional arguments `url [, format [, compression_method]]`
/// plus an optional `headers(...)`. The format is guessed from the URL path when it is "auto".
/// Throws NUMBER_OF_ARGUMENTS_DOESNT_MATCH for a wrong positional argument count and BAD_ARGUMENTS
/// when a Range header is supplied.
StorageURL::Configuration StorageURL::getConfiguration(ASTs & args, ContextPtr local_context)
{
    StorageURL::Configuration configuration;

    if (auto named_collection = tryGetNamedCollectionWithOverrides(args, local_context))
    {
        StorageURL::processNamedCollectionResult(configuration, *named_collection);
        collectHeaders(args, configuration.headers, local_context);
    }
    else
    {
        /// Remove the optional headers(...) argument before validating the count: the 1..3 limit applies
        /// to url, format and compression method only. Checking first rejected the documented form
        /// `URL(url, format, compression, headers(...))`.
        auto * header_it = collectHeaders(args, configuration.headers, local_context);
        if (header_it != args.end())
            args.erase(header_it);

        if (args.empty() || args.size() > 3)
            throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, bad_arguments_error_message);

        configuration.url = checkAndGetLiteralArgument<String>(args[0], "url");
        if (args.size() > 1)
            configuration.format = checkAndGetLiteralArgument<String>(args[1], "format");
        if (args.size() == 3)
            configuration.compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
    }

    if (configuration.format == "auto")
        configuration.format = FormatFactory::instance().getFormatFromFileName(Poco::URI(configuration.url).getPath(), true);

    /// HTTP header field names are case-insensitive, so reject every spelling of Range
    /// (Range, range, RANGE, ...), not just the canonical one.
    const auto is_range_header = [](const String & name)
    {
        static constexpr std::string_view range_header = "range";
        return std::equal(
            name.begin(), name.end(), range_header.begin(), range_header.end(),
            [](char lhs, char rhs)
            {
                const char lowered = (lhs >= 'A' && lhs <= 'Z') ? static_cast<char>(lhs - 'A' + 'a') : lhs;
                return lowered == rhs;
            });
    };

    for (const auto & [header, value] : configuration.headers)
    {
        if (is_range_header(header))
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
    }

    return configuration;
}
2018-06-11 12:13:00 +00:00
2021-09-07 11:17:25 +00:00
/// Registers the URL table engine. The creator parses the engine arguments, resolves format settings
/// and an optional PARTITION BY, then builds a StorageURL.
void registerStorageURL(StorageFactory & factory)
{
    auto create_url_storage = [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;
        auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext());
        auto format_settings = StorageURL::getFormatSettingsFromArgs(args);

        ASTPtr partition_by = args.storage_def->partition_by ? args.storage_def->partition_by->clone() : nullptr;

        return std::make_shared<StorageURL>(
            configuration.url,
            args.table_id,
            configuration.format,
            format_settings,
            args.columns,
            args.constraints,
            args.comment,
            args.getContext(),
            configuration.compression_method,
            configuration.headers,
            configuration.http_method,
            partition_by);
    };

    factory.registerStorage(
        "URL",
        create_url_storage,
        {
            .supports_settings = true,
            .supports_schema_inference = true,
            .source_access_type = AccessType::URL,
        });
}
}