Merge pull request #35150 from ClickHouse/parallel-downloading-url-engine

Parallel URL engine with Ranges
Antonio Andelic 2022-03-23 10:39:47 +01:00 committed by GitHub
commit 89e0630898
17 changed files with 1406 additions and 265 deletions


@ -45,6 +45,7 @@
#include <Core/ServerUUID.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadHelpers.h>
#include <IO/IOThreadPool.h>
#include <IO/UseSSL.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <Interpreters/DDLWorker.h>
@ -554,6 +555,10 @@ if (ThreadFuzzer::instance().isEffective())
config().getUInt("thread_pool_queue_size", 10000)
);
IOThreadPool::initialize(
config().getUInt("max_io_thread_pool_size", 100),
config().getUInt("max_io_thread_pool_free_size", 0),
config().getUInt("io_thread_pool_queue_size", 10000));
/// Initialize global local cache for remote filesystem.
if (config().has("local_cache_for_remote_fs"))


@ -113,5 +113,35 @@ public:
}
};
class SynchronizedArenaWithFreeLists : private ArenaWithFreeLists
{
public:
explicit SynchronizedArenaWithFreeLists(
const size_t initial_size = 4096, const size_t growth_factor = 2,
const size_t linear_growth_threshold = 128 * 1024 * 1024)
: ArenaWithFreeLists{initial_size, growth_factor, linear_growth_threshold}
{}
char * alloc(const size_t size)
{
std::lock_guard lock{mutex};
return ArenaWithFreeLists::alloc(size);
}
void free(char * ptr, const size_t size)
{
std::lock_guard lock{mutex};
return ArenaWithFreeLists::free(ptr, size);
}
/// Size of the allocated pool in bytes
size_t size() const
{
std::lock_guard lock{mutex};
return ArenaWithFreeLists::size();
}
private:
mutable std::mutex mutex;
};
}
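A minimal usage sketch (not part of the commit) of the new synchronized arena: two download workers allocating and freeing segments through the same instance, with every call serialized by the internal mutex.

#include <Common/ArenaWithFreeLists.h>
#include <thread>

// Illustrative only: concurrent alloc/free through SynchronizedArenaWithFreeLists.
void arenaExample()
{
    DB::SynchronizedArenaWithFreeLists arena;
    auto worker = [&arena]
    {
        char * p = arena.alloc(1024); // locks `mutex`, then delegates to ArenaWithFreeLists
        /// ... fill p with downloaded bytes ...
        arena.free(p, 1024);          // the size must match the size passed to alloc
    };
    std::thread t1(worker), t2(worker);
    t1.join();
    t2.join();
}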


@ -47,6 +47,8 @@ class IColumn;
M(UInt64, max_insert_delayed_streams_for_parallel_write, 0, "The maximum number of streams (columns) to delay final part flush. Default - auto (1000 in case of underlying storage supports parallel write, for example S3 and disabled otherwise)", 0) \
M(UInt64, max_final_threads, 16, "The maximum number of threads to read from table with FINAL.", 0) \
M(MaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.", 0) \
M(MaxThreads, max_download_threads, 4, "The maximum number of threads to download data (e.g. for URL engine).", 0) \
M(UInt64, max_download_buffer_size, 10*1024*1024, "The maximum size of the buffer for parallel downloading (e.g. for the URL engine) per thread.", 0) \
M(UInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \
M(UInt64, max_distributed_connections, 1024, "The maximum number of connections for distributed processing of one query (should be greater than max_threads).", 0) \
M(UInt64, max_query_size, DBMS_DEFAULT_MAX_QUERY_SIZE, "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)", 0) \
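
The two new settings cooperate: max_download_threads is a per-query thread budget that the URL storage later divides among its reading streams (see the StorageURL.cpp hunk below), and max_download_buffer_size is the size of the range each worker downloads at a time. A sketch of that division, mirroring the expression used in StorageURL.cpp:

// Mirrors `num_streams >= max_download_threads ? 1 : (max_download_threads / num_streams)`
// from IStorageURLBase::read below: each stream gets an equal share of the budget, and
// parallel download degrades to one thread per stream once streams outnumber the budget.
size_t downloadThreadsPerStream(size_t max_download_threads, size_t num_streams)
{
    return num_streams >= max_download_threads ? 1 : max_download_threads / num_streams;
}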

src/IO/IOThreadPool.cpp (new file)

@ -0,0 +1,34 @@
#include <IO/IOThreadPool.h>
#include "Core/Field.h"
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
std::unique_ptr<ThreadPool> IOThreadPool::instance;
void IOThreadPool::initialize(size_t max_threads, size_t max_free_threads, size_t queue_size)
{
if (instance)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "The IO thread pool is initialized twice");
}
instance = std::make_unique<ThreadPool>(max_threads, max_free_threads, queue_size, false /*shutdown_on_exception*/);
}
ThreadPool & IOThreadPool::get()
{
if (!instance)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "The IO thread pool is not initialized");
}
return *instance;
}
}

src/IO/IOThreadPool.h (new file)

@ -0,0 +1,20 @@
#pragma once
#include <Common/ThreadPool.h>
namespace DB
{
/*
* Global thread pool used for IO operations.
*/
class IOThreadPool
{
static std::unique_ptr<ThreadPool> instance;
public:
static void initialize(size_t max_threads, size_t max_free_threads, size_t queue_size);
static ThreadPool & get();
};
}
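
A hedged sketch of the intended lifecycle, based only on the API above: the server initializes the pool exactly once at startup (see the Server.cpp hunk above), after which any reader can schedule work on it; a second initialize() or a get() before initialize() throws LOGICAL_ERROR.

// Illustrative only; the argument values echo the Server.cpp defaults.
DB::IOThreadPool::initialize(/*max_threads*/ 100, /*max_free_threads*/ 0, /*queue_size*/ 10000);
DB::IOThreadPool::get().scheduleOrThrow([] { /* e.g. download one HTTP range */ });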


@ -0,0 +1,290 @@
#include <IO/ParallelReadBuffer.h>
#include <base/logger_useful.h>
#include <Poco/Logger.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
ParallelReadBuffer::ParallelReadBuffer(
std::unique_ptr<ReadBufferFactory> reader_factory_,
ThreadPool * pool_,
size_t max_working_readers_,
WorkerSetup worker_setup_,
WorkerCleanup worker_cleanup_)
: SeekableReadBufferWithSize(nullptr, 0)
, pool(pool_)
, max_working_readers(max_working_readers_)
, reader_factory(std::move(reader_factory_))
, worker_setup(std::move(worker_setup_))
, worker_cleanup(std::move(worker_cleanup_))
{
std::unique_lock<std::mutex> lock{mutex};
addReaders(lock);
}
bool ParallelReadBuffer::addReaderToPool(std::unique_lock<std::mutex> & /*buffer_lock*/)
{
auto reader = reader_factory->getReader();
if (!reader)
{
return false;
}
auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader)));
pool->scheduleOrThrow(
[&, this, worker = std::move(worker)]() mutable
{
ThreadStatus thread_status;
{
std::lock_guard lock{mutex};
++active_working_reader;
}
SCOPE_EXIT({
worker_cleanup(thread_status);
std::lock_guard lock{mutex};
--active_working_reader;
if (active_working_reader == 0)
{
readers_done.notify_all();
}
});
worker_setup(thread_status);
readerThreadFunction(std::move(worker));
});
return true;
}
void ParallelReadBuffer::addReaders(std::unique_lock<std::mutex> & buffer_lock)
{
while (read_workers.size() < max_working_readers && addReaderToPool(buffer_lock))
;
}
off_t ParallelReadBuffer::seek(off_t offset, int whence)
{
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
if (!working_buffer.empty() && static_cast<size_t>(offset) >= current_position - working_buffer.size() && offset < current_position)
{
pos = working_buffer.end() - (current_position - offset);
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
return offset;
}
std::unique_lock lock{mutex};
const auto offset_is_in_range
= [&](const auto & range) { return static_cast<size_t>(offset) >= range.left && static_cast<size_t>(offset) <= *range.right; };
while (!read_workers.empty() && (offset < current_position || !offset_is_in_range(read_workers.front()->range)))
{
read_workers.front()->cancel = true;
read_workers.pop_front();
}
if (!read_workers.empty())
{
auto & front_worker = read_workers.front();
auto & segments = front_worker->segments;
current_position = front_worker->range.left;
while (true)
{
next_condvar.wait(lock, [&] { return emergency_stop || !segments.empty(); });
if (emergency_stop)
handleEmergencyStop();
auto next_segment = front_worker->nextSegment();
if (static_cast<size_t>(offset) < current_position + next_segment.size())
{
current_segment = std::move(next_segment);
working_buffer = internal_buffer = Buffer(current_segment.data(), current_segment.data() + current_segment.size());
current_position += current_segment.size();
pos = working_buffer.end() - (current_position - offset);
addReaders(lock);
return offset;
}
current_position += next_segment.size();
}
}
lock.unlock();
finishAndWait();
reader_factory->seek(offset, whence);
all_completed = false;
read_workers.clear();
current_position = offset;
resetWorkingBuffer();
emergency_stop = false;
lock.lock();
addReaders(lock);
return offset;
}
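// Worked example (illustrative, not part of this commit) of the in-buffer seek
// fast path above. Suppose working_buffer holds absolute bytes [100, 200), so
// current_position == 200 (one past the buffered range). Seeking to offset 150:
//
//     pos = working_buffer.end() - (current_position - offset)
//         = working_buffer.end() - 50
//
// i.e. 50 bytes before the end of the buffer, which is byte 150 of the stream,
// so no worker has to be cancelled and no data is re-downloaded.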
std::optional<size_t> ParallelReadBuffer::getTotalSize()
{
std::lock_guard lock{mutex};
return reader_factory->getTotalSize();
}
off_t ParallelReadBuffer::getPosition()
{
return current_position - available();
}
bool ParallelReadBuffer::currentWorkerReady() const
{
assert(!read_workers.empty());
return read_workers.front()->finished || !read_workers.front()->segments.empty();
}
bool ParallelReadBuffer::currentWorkerCompleted() const
{
assert(!read_workers.empty());
return read_workers.front()->finished && read_workers.front()->segments.empty();
}
void ParallelReadBuffer::handleEmergencyStop()
{
// this can only be called from the main thread when there is an exception
assert(background_exception);
if (background_exception)
std::rethrow_exception(background_exception);
}
bool ParallelReadBuffer::nextImpl()
{
if (all_completed)
return false;
while (true)
{
std::unique_lock lock(mutex);
next_condvar.wait(
lock,
[this]()
{
/// Check if there are no readers left or the current reader can be processed
return emergency_stop || currentWorkerReady();
});
bool worker_removed = false;
/// Remove completed units
while (!read_workers.empty() && currentWorkerCompleted() && !emergency_stop)
{
read_workers.pop_front();
worker_removed = true;
}
if (emergency_stop)
handleEmergencyStop();
if (worker_removed)
addReaders(lock);
/// All readers processed, stop
if (read_workers.empty())
{
all_completed = true;
return false;
}
auto & front_worker = read_workers.front();
/// Read data from the first segment of the first reader
if (!front_worker->segments.empty())
{
current_segment = front_worker->nextSegment();
if (currentWorkerCompleted())
{
read_workers.pop_front();
all_completed = !addReaderToPool(lock) && read_workers.empty();
}
break;
}
}
working_buffer = internal_buffer = Buffer(current_segment.data(), current_segment.data() + current_segment.size());
current_position += working_buffer.size();
return true;
}
void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
{
try
{
while (!emergency_stop && !read_worker->cancel)
{
if (!read_worker->reader->next())
throw Exception("Failed to read all the data from the reader", ErrorCodes::LOGICAL_ERROR);
if (emergency_stop || read_worker->cancel)
break;
Buffer buffer = read_worker->reader->buffer();
size_t bytes_to_copy = std::min(buffer.size(), read_worker->bytes_left);
Segment new_segment(bytes_to_copy, &arena);
memcpy(new_segment.data(), buffer.begin(), bytes_to_copy);
read_worker->reader->ignore(bytes_to_copy);
read_worker->bytes_left -= bytes_to_copy;
{
/// New data ready to be read
std::lock_guard lock(mutex);
read_worker->segments.emplace_back(std::move(new_segment));
read_worker->finished = read_worker->bytes_left == 0;
next_condvar.notify_all();
}
if (read_worker->finished)
{
break;
}
}
}
catch (...)
{
onBackgroundException();
}
}
void ParallelReadBuffer::onBackgroundException()
{
std::lock_guard lock(mutex);
if (!background_exception)
{
background_exception = std::current_exception();
}
emergency_stop = true;
next_condvar.notify_all();
}
void ParallelReadBuffer::finishAndWait()
{
emergency_stop = true;
std::unique_lock lock{mutex};
readers_done.wait(lock, [&] { return active_working_reader == 0; });
}
}

src/IO/ParallelReadBuffer.h (new file)

@ -0,0 +1,174 @@
#pragma once
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBuffer.h>
#include <IO/SeekableReadBuffer.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/ThreadPool.h>
namespace DB
{
/**
* Reads from multiple ReadBuffers in parallel.
* Preserves the order of the readers obtained from ReadBufferFactory.
*
* It consumes multiple readers and yields their data in the order the readers were obtained.
* Each working reader saves segments of data to an internal queue.
*
* In its nextImpl method, ParallelReadBuffer takes the first available segment from the first reader in the deque and feeds it to the user.
* When the first reader finishes reading, it is removed from the worker deque and data from the next reader is consumed.
*
* The number of working readers is limited by max_working_readers.
*/
class ParallelReadBuffer : public SeekableReadBufferWithSize
{
private:
/// Blocks until data appears in the first reader or that reader indicates it has finished
/// Finished readers are removed from the queue and data from the next readers is processed
bool nextImpl() override;
class Segment : private boost::noncopyable
{
public:
Segment(size_t size_, SynchronizedArenaWithFreeLists * arena_) : arena(arena_), m_data(arena->alloc(size_)), m_size(size_) { }
Segment() = default;
Segment(Segment && other) noexcept : arena(other.arena)
{
std::swap(m_data, other.m_data);
std::swap(m_size, other.m_size);
}
Segment & operator=(Segment && other) noexcept
{
arena = other.arena;
std::swap(m_data, other.m_data);
std::swap(m_size, other.m_size);
return *this;
}
~Segment()
{
if (m_data)
{
arena->free(m_data, m_size);
}
}
auto data() const noexcept { return m_data; }
auto size() const noexcept { return m_size; }
private:
SynchronizedArenaWithFreeLists * arena{nullptr};
char * m_data{nullptr};
size_t m_size{0};
};
public:
class ReadBufferFactory
{
public:
virtual SeekableReadBufferPtr getReader() = 0;
virtual ~ReadBufferFactory() = default;
virtual off_t seek(off_t off, int whence) = 0;
virtual std::optional<size_t> getTotalSize() = 0;
};
using WorkerSetup = std::function<void(ThreadStatus &)>;
using WorkerCleanup = std::function<void(ThreadStatus &)>;
explicit ParallelReadBuffer(
std::unique_ptr<ReadBufferFactory> reader_factory_,
ThreadPool * pool,
size_t max_working_readers,
WorkerSetup worker_setup = {},
WorkerCleanup worker_cleanup = {});
~ParallelReadBuffer() override { finishAndWait(); }
off_t seek(off_t off, int whence) override;
std::optional<size_t> getTotalSize() override;
off_t getPosition() override;
private:
/// Reader in progress with a list of read segments
struct ReadWorker
{
explicit ReadWorker(SeekableReadBufferPtr reader_) : reader(std::move(reader_)), range(reader->getRemainingReadRange())
{
assert(range.right);
bytes_left = *range.right - range.left + 1;
}
Segment nextSegment()
{
assert(!segments.empty());
auto next_segment = std::move(segments.front());
segments.pop_front();
range.left += next_segment.size();
return next_segment;
}
SeekableReadBufferPtr reader;
std::deque<Segment> segments;
bool finished{false};
SeekableReadBuffer::Range range;
size_t bytes_left{0};
std::atomic_bool cancel{false};
};
using ReadWorkerPtr = std::shared_ptr<ReadWorker>;
/// The first worker in the deque has new data or has processed all available data
bool currentWorkerReady() const;
/// The first worker in the deque has processed and flushed all of its data
bool currentWorkerCompleted() const;
void handleEmergencyStop();
void addReaders(std::unique_lock<std::mutex> & buffer_lock);
bool addReaderToPool(std::unique_lock<std::mutex> & buffer_lock);
/// Processes read_worker: reads data and saves it into the internal segments queue
void readerThreadFunction(ReadWorkerPtr read_worker);
void onBackgroundException();
void finishAndWait();
SynchronizedArenaWithFreeLists arena;
Segment current_segment;
ThreadPool * pool;
size_t max_working_readers;
size_t active_working_reader{0};
// Triggered when all reader workers are done
std::condition_variable readers_done;
std::unique_ptr<ReadBufferFactory> reader_factory;
WorkerSetup worker_setup;
WorkerCleanup worker_cleanup;
/**
* FIFO queue of readers.
* Each worker contains the reader itself and its downloaded segments.
* When a reader has read all available data it is removed from the
* deque and data from the next reader is consumed by the user.
*/
std::deque<ReadWorkerPtr> read_workers;
std::mutex mutex;
/// Triggered when new data is available
std::condition_variable next_condvar;
std::exception_ptr background_exception = nullptr;
std::atomic_bool emergency_stop{false};
off_t current_position{0};
bool all_completed{false};
};
}
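
To make the moving parts concrete, a hedged sketch of how a caller wires this class together (illustrative, not committed code; createRangeFactory is a hypothetical stand-in for the RangedReadWriteBufferFromHTTPFactory added later in this diff):

#include <IO/IOThreadPool.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/copyData.h>
#include <unistd.h>

// Hypothetical helper producing a factory of range-bound readers.
std::unique_ptr<DB::ParallelReadBuffer::ReadBufferFactory> createRangeFactory();

void parallelReadExample()
{
    DB::ParallelReadBuffer parallel_buf(
        createRangeFactory(),
        &DB::IOThreadPool::get(), // workers are scheduled on the shared IO pool
        /*max_working_readers*/ 4);

    // Segments are yielded strictly in the order the factory produced the ranges.
    DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO);
    DB::copyData(parallel_buf, out);
}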


@ -1,32 +1,33 @@
#pragma once
#include <functional>
#include <base/types.h>
#include <base/sleep.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadSettings.h>
#include <base/logger_useful.h>
#include <base/sleep.h>
#include <base/types.h>
#include <Poco/Any.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/URI.h>
#include <Poco/URIStreamFactory.h>
#include <Poco/Version.h>
#include <Common/DNSResolver.h>
#include <Common/RemoteHostFilter.h>
#include <Common/config.h>
#include <Common/config_version.h>
#include <base/logger_useful.h>
#include <Poco/URIStreamFactory.h>
namespace ProfileEvents
{
extern const Event ReadBufferSeekCancelConnection;
}
namespace DB
@ -48,7 +49,7 @@ class UpdatableSessionBase
{
protected:
SessionPtr session;
UInt64 redirects { 0 };
UInt64 redirects{0};
Poco::URI initial_uri;
ConnectionTimeouts timeouts;
UInt64 max_redirects;
@ -56,19 +57,12 @@ protected:
public:
virtual void buildNewSession(const Poco::URI & uri) = 0;
explicit UpdatableSessionBase(const Poco::URI uri,
const ConnectionTimeouts & timeouts_,
UInt64 max_redirects_)
: initial_uri { uri }
, timeouts { timeouts_ }
, max_redirects { max_redirects_ }
explicit UpdatableSessionBase(const Poco::URI uri, const ConnectionTimeouts & timeouts_, UInt64 max_redirects_)
: initial_uri{uri}, timeouts{timeouts_}, max_redirects{max_redirects_}
{
}
SessionPtr getSession()
{
return session;
}
SessionPtr getSession() { return session; }
void updateSession(const Poco::URI & uri)
{
@ -99,7 +93,7 @@ namespace detail
/// HTTP range, including right bound [begin, end].
struct Range
{
size_t begin = 0;
std::optional<size_t> begin;
std::optional<size_t> end;
};
@ -144,10 +138,9 @@ namespace detail
return read_range.begin || read_range.end || retry_with_range_header;
}
size_t getOffset() const
{
return read_range.begin + offset_from_begin_pos;
}
size_t getRangeBegin() const { return read_range.begin.value_or(0); }
size_t getOffset() const { return getRangeBegin() + offset_from_begin_pos; }
std::istream * callImpl(Poco::URI uri_, Poco::Net::HTTPResponse & response, const std::string & method_)
{
@ -161,7 +154,7 @@ namespace detail
if (out_stream_callback)
request.setChunkedTransferEncoding(true);
for (auto & http_header_entry: http_header_entries)
for (auto & http_header_entry : http_header_entries)
request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry));
if (withPartialContent())
@ -207,26 +200,14 @@ namespace detail
std::optional<size_t> getTotalSize() override
{
if (read_range.end)
return *read_range.end - read_range.begin;
return *read_range.end - getRangeBegin();
Poco::Net::HTTPResponse response;
for (size_t i = 0; i < 10; ++i)
{
try
{
call(response, Poco::Net::HTTPRequest::HTTP_HEAD);
while (isRedirect(response.getStatus()))
{
Poco::URI uri_redirect(response.get("Location"));
if (remote_host_filter)
remote_host_filter->checkURL(uri_redirect);
session->updateSession(uri_redirect);
istr = callImpl(uri_redirect, response, method);
}
callWithRedirects(response, Poco::Net::HTTPRequest::HTTP_HEAD);
break;
}
catch (const Poco::Exception & e)
@ -236,7 +217,7 @@ namespace detail
}
if (response.hasContentLength())
read_range.end = read_range.begin + response.getContentLength();
read_range.end = getRangeBegin() + response.getContentLength();
return read_range.end;
}
@ -252,6 +233,21 @@ namespace detail
InitializeError initialization_error = InitializeError::NONE;
private:
void setupExternalBuffer()
{
/**
* use_external_buffer means we read into a buffer that
* was passed to us from somewhere else. We do not check whether the
* previously returned buffer was read or not (no hasPendingData() check is needed),
* because this branch means we are prefetching data;
* on each nextImpl() call we can fill a different buffer.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
public:
using NextCallback = std::function<void(size_t)>;
using OutStreamCallback = std::function<void(std::ostream &)>;
@ -276,7 +272,7 @@ namespace detail
, session {session_}
, out_stream_callback {out_stream_callback_}
, credentials {credentials_}
, http_header_entries {http_header_entries_}
, http_header_entries {std::move(http_header_entries_)}
, remote_host_filter {remote_host_filter_}
, buffer_size {buffer_size_}
, use_external_buffer {use_external_buffer_}
@ -287,18 +283,21 @@ namespace detail
{
if (settings.http_max_tries <= 0 || settings.http_retry_initial_backoff_ms <= 0
|| settings.http_retry_initial_backoff_ms >= settings.http_retry_max_backoff_ms)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Invalid setting for http backoff, "
"must be http_max_tries >= 1 (current is {}) and "
"0 < http_retry_initial_backoff_ms < settings.http_retry_max_backoff_ms (now 0 < {} < {})",
settings.http_max_tries, settings.http_retry_initial_backoff_ms, settings.http_retry_max_backoff_ms);
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Invalid setting for http backoff, "
"must be http_max_tries >= 1 (current is {}) and "
"0 < http_retry_initial_backoff_ms < settings.http_retry_max_backoff_ms (now 0 < {} < {})",
settings.http_max_tries,
settings.http_retry_initial_backoff_ms,
settings.http_retry_max_backoff_ms);
// Configure User-Agent if it is not already set.
const std::string user_agent = "User-Agent";
auto iter = std::find_if(http_header_entries.begin(), http_header_entries.end(), [&user_agent](const HTTPHeaderEntry & entry)
{
return std::get<0>(entry) == user_agent;
});
auto iter = std::find_if(
http_header_entries.begin(),
http_header_entries.end(),
[&user_agent](const HTTPHeaderEntry & entry) { return std::get<0>(entry) == user_agent; });
if (iter == http_header_entries.end())
{
@ -313,7 +312,36 @@ namespace detail
}
}
void call(Poco::Net::HTTPResponse & response, const String & method_)
static bool isRetriableError(const Poco::Net::HTTPResponse::HTTPStatus http_status) noexcept
{
constexpr std::array non_retriable_errors{
Poco::Net::HTTPResponse::HTTPStatus::HTTP_BAD_REQUEST,
Poco::Net::HTTPResponse::HTTPStatus::HTTP_UNAUTHORIZED,
Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND,
Poco::Net::HTTPResponse::HTTPStatus::HTTP_FORBIDDEN,
Poco::Net::HTTPResponse::HTTPStatus::HTTP_METHOD_NOT_ALLOWED};
return std::all_of(
non_retriable_errors.begin(), non_retriable_errors.end(), [&](const auto status) { return http_status != status; });
}
void callWithRedirects(Poco::Net::HTTPResponse & response, const String & method_, bool throw_on_all_errors = false)
{
call(response, method_, throw_on_all_errors);
while (isRedirect(response.getStatus()))
{
Poco::URI uri_redirect(response.get("Location"));
if (remote_host_filter)
remote_host_filter->checkURL(uri_redirect);
session->updateSession(uri_redirect);
istr = callImpl(uri_redirect, response, method);
}
}
void call(Poco::Net::HTTPResponse & response, const String & method_, bool throw_on_all_errors = false)
{
try
{
@ -321,18 +349,18 @@ namespace detail
}
catch (...)
{
if (throw_on_all_errors)
{
throw;
}
auto http_status = response.getStatus();
if (http_status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND
&& http_skip_not_found_url)
if (http_status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND && http_skip_not_found_url)
{
initialization_error = InitializeError::SKIP_NOT_FOUND_URL;
}
else if (http_status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_BAD_REQUEST
|| http_status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_UNAUTHORIZED
|| http_status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND
|| http_status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_FORBIDDEN
|| http_status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_METHOD_NOT_ALLOWED)
else if (!isRetriableError(http_status))
{
initialization_error = InitializeError::NON_RETRIABLE_ERROR;
exception = std::current_exception();
@ -372,12 +400,14 @@ namespace detail
if (withPartialContent() && response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT)
{
/// Having `200 OK` instead of `206 Partial Content` is acceptable in case we retried with range.begin == 0.
if (read_range.begin)
if (read_range.begin && *read_range.begin != 0)
{
if (!exception)
exception = std::make_exception_ptr(
Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE,
"Cannot read with range: [{}, {}]", read_range.begin, read_range.end ? *read_range.end : '-'));
exception = std::make_exception_ptr(Exception(
ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE,
"Cannot read with range: [{}, {}]",
*read_range.begin,
read_range.end ? *read_range.end : '-'));
initialization_error = InitializeError::NON_RETRIABLE_ERROR;
return;
@ -386,12 +416,12 @@ namespace detail
{
/// We could have range.begin == 0 and range.end != 0 in the case of DiskWeb; failing to read with partial content
/// will affect only performance, so a warning is enough.
LOG_WARNING(log, "Unable to read with range header: [{}, {}]", read_range.begin, *read_range.end);
LOG_WARNING(log, "Unable to read with range header: [{}, {}]", getRangeBegin(), *read_range.end);
}
}
if (!offset_from_begin_pos && !read_range.end && response.hasContentLength())
read_range.end = read_range.begin + response.getContentLength();
read_range.end = getRangeBegin() + response.getContentLength();
try
{
@ -399,12 +429,7 @@ namespace detail
if (use_external_buffer)
{
/**
* See comment 30 lines below.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
setupExternalBuffer();
}
}
catch (const Poco::Exception & e)
@ -426,23 +451,17 @@ namespace detail
if (next_callback)
next_callback(count());
if (read_range.end && getOffset() == read_range.end.value())
if (read_range.end && getOffset() > read_range.end.value())
{
assert(getOffset() == read_range.end.value() + 1);
return false;
}
if (impl)
{
if (use_external_buffer)
{
/**
* use_external_buffer -- means we read into the buffer which
* was passed to us from somewhere else. We do not check whether
* previously returned buffer was read or not (no hasPendingData() check is needed),
* because this branch means we are prefetching data,
* each nextImpl() call we can fill a different buffer.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
setupExternalBuffer();
}
else
{
@ -477,10 +496,7 @@ namespace detail
if (use_external_buffer)
{
/// See comment 40 lines above.
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
setupExternalBuffer();
}
}
@ -498,13 +514,18 @@ namespace detail
if (!can_retry_request)
throw;
LOG_ERROR(log,
"HTTP request to `{}` failed at try {}/{} with bytes read: {}/{}. "
"Error: {}. (Current backoff wait is {}/{} ms)",
uri.toString(), i + 1, settings.http_max_tries,
getOffset(), read_range.end ? toString(*read_range.end) : "unknown",
e.displayText(),
milliseconds_to_wait, settings.http_retry_max_backoff_ms);
LOG_ERROR(
log,
"HTTP request to `{}` failed at try {}/{} with bytes read: {}/{}. "
"Error: {}. (Current backoff wait is {}/{} ms)",
uri.toString(),
i + 1,
settings.http_max_tries,
getOffset(),
read_range.end ? toString(*read_range.end) : "unknown",
e.displayText(),
milliseconds_to_wait,
settings.http_retry_max_backoff_ms);
retry_with_range_header = true;
exception = std::current_exception();
@ -529,10 +550,7 @@ namespace detail
return true;
}
off_t getPosition() override
{
return getOffset() - available();
}
off_t getPosition() override { return getOffset() - available(); }
off_t seek(off_t offset_, int whence) override
{
@ -540,12 +558,11 @@ namespace detail
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset_ < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
throw Exception(
"Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
off_t current_offset = getOffset();
if (!working_buffer.empty()
&& size_t(offset_) >= current_offset - working_buffer.size()
&& offset_ < current_offset)
if (!working_buffer.empty() && size_t(offset_) >= current_offset - working_buffer.size() && offset_ < current_offset)
{
pos = working_buffer.end() - (current_offset - offset_);
assert(pos >= working_buffer.begin());
@ -567,7 +584,6 @@ namespace detail
if (impl)
{
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
@ -580,6 +596,8 @@ namespace detail
return offset_;
}
SeekableReadBuffer::Range getRemainingReadRange() const override { return {getOffset(), read_range.end}; }
std::string getResponseCookie(const std::string & name, const std::string & def) const
{
for (const auto & cookie : cookies)
@ -599,10 +617,7 @@ namespace detail
next_callback(count());
}
const std::string & getCompressionMethod() const
{
return content_encoding;
}
const std::string & getCompressionMethod() const { return content_encoding; }
};
}
@ -611,19 +626,50 @@ class UpdatableSession : public UpdatableSessionBase<HTTPSessionPtr>
using Parent = UpdatableSessionBase<HTTPSessionPtr>;
public:
UpdatableSession(
const Poco::URI uri,
const ConnectionTimeouts & timeouts_,
const UInt64 max_redirects_)
UpdatableSession(const Poco::URI uri, const ConnectionTimeouts & timeouts_, const UInt64 max_redirects_)
: Parent(uri, timeouts_, max_redirects_)
{
session = makeHTTPSession(initial_uri, timeouts);
}
void buildNewSession(const Poco::URI & uri) override
{
session = makeHTTPSession(uri, timeouts);
}
void buildNewSession(const Poco::URI & uri) override { session = makeHTTPSession(uri, timeouts); }
};
class RangeGenerator
{
public:
explicit RangeGenerator(size_t total_size_, size_t range_step_, size_t range_start = 0)
: from(range_start), range_step(range_step_), total_size(total_size_)
{
}
size_t totalRanges() const { return (total_size - from + range_step - 1) / range_step; } /// ceiling division so a trailing partial range is counted
using Range = std::pair<size_t, size_t>;
// Returns a half-open range [from, to), i.e. the upper bound is exclusive
std::optional<Range> nextRange()
{
if (from >= total_size)
{
return std::nullopt;
}
auto to = from + range_step;
if (to >= total_size)
{
to = total_size;
}
Range range{from, to};
from = to;
return std::move(range);
}
private:
size_t from;
size_t range_step;
size_t total_size;
};
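// Illustrative use of RangeGenerator (not part of this commit): with
// total_size = 25 and range_step = 10, nextRange() yields the half-open
// pairs {0, 10}, {10, 20}, {20, 25}, and then std::nullopt.
inline void rangeGeneratorExample()
{
    RangeGenerator gen(/*total_size_*/ 25, /*range_step_*/ 10);
    while (auto range = gen.nextRange())
    {
        /// range->first is inclusive, range->second is exclusive.
    }
}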
class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>>
@ -631,7 +677,7 @@ class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::
using Parent = detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>>;
public:
ReadWriteBufferFromHTTP(
Poco::URI uri_,
const std::string & method_,
OutStreamCallback out_stream_callback_,
@ -646,14 +692,117 @@ public:
bool delay_initialization_ = true,
bool use_external_buffer_ = false,
bool skip_not_found_url_ = false)
: Parent(std::make_shared<UpdatableSession>(uri_, timeouts, max_redirects),
uri_, credentials_, method_, out_stream_callback_, buffer_size_,
settings_, http_header_entries_, read_range_, remote_host_filter_,
delay_initialization_, use_external_buffer_, skip_not_found_url_)
: Parent(
std::make_shared<UpdatableSession>(uri_, timeouts, max_redirects),
uri_,
credentials_,
method_,
out_stream_callback_,
buffer_size_,
settings_,
http_header_entries_,
read_range_,
remote_host_filter_,
delay_initialization_,
use_external_buffer_,
skip_not_found_url_)
{
}
};
class RangedReadWriteBufferFromHTTPFactory : public ParallelReadBuffer::ReadBufferFactory
{
using OutStreamCallback = ReadWriteBufferFromHTTP::OutStreamCallback;
public:
RangedReadWriteBufferFromHTTPFactory(
size_t total_object_size_,
size_t range_step_,
Poco::URI uri_,
std::string method_,
OutStreamCallback out_stream_callback_,
ConnectionTimeouts timeouts_,
const Poco::Net::HTTPBasicCredentials & credentials_,
UInt64 max_redirects_ = 0,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
ReadSettings settings_ = {},
ReadWriteBufferFromHTTP::HTTPHeaderEntries http_header_entries_ = {},
const RemoteHostFilter * remote_host_filter_ = nullptr,
bool delay_initialization_ = true,
bool use_external_buffer_ = false,
bool skip_not_found_url_ = false)
: range_generator(total_object_size_, range_step_)
, total_object_size(total_object_size_)
, range_step(range_step_)
, uri(uri_)
, method(std::move(method_))
, out_stream_callback(out_stream_callback_)
, timeouts(std::move(timeouts_))
, credentials(credentials_)
, max_redirects(max_redirects_)
, buffer_size(buffer_size_)
, settings(std::move(settings_))
, http_header_entries(std::move(http_header_entries_))
, remote_host_filter(remote_host_filter_)
, delay_initialization(delay_initialization_)
, use_external_buffer(use_external_buffer_)
, skip_not_found_url(skip_not_found_url_)
{
}
SeekableReadBufferPtr getReader() override
{
const auto next_range = range_generator.nextRange();
if (!next_range)
{
return nullptr;
}
return std::make_shared<ReadWriteBufferFromHTTP>(
uri,
method,
out_stream_callback,
timeouts,
credentials,
max_redirects,
buffer_size,
settings,
http_header_entries,
// HTTP Range has inclusive bounds, i.e. [from, to]
ReadWriteBufferFromHTTP::Range{next_range->first, next_range->second - 1},
remote_host_filter,
delay_initialization,
use_external_buffer,
skip_not_found_url);
}
off_t seek(off_t off, [[maybe_unused]] int whence) override
{
range_generator = RangeGenerator{total_object_size, range_step, static_cast<size_t>(off)};
return off;
}
std::optional<size_t> getTotalSize() override { return total_object_size; }
private:
RangeGenerator range_generator;
size_t total_object_size;
size_t range_step;
Poco::URI uri;
std::string method;
OutStreamCallback out_stream_callback;
ConnectionTimeouts timeouts;
const Poco::Net::HTTPBasicCredentials & credentials;
UInt64 max_redirects;
size_t buffer_size;
ReadSettings settings;
ReadWriteBufferFromHTTP::HTTPHeaderEntries http_header_entries;
const RemoteHostFilter * remote_host_filter;
bool delay_initialization;
bool use_external_buffer;
bool skip_not_found_url;
};
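// Illustrative (not part of this commit): getReader() above converts the
// generator's half-open [from, to) pair into the inclusive bounds the HTTP
// Range header expects, e.g. {0, 10485760} becomes Range{0, 10485759},
// which is sent as "Range: bytes=0-10485759".
inline ReadWriteBufferFromHTTP::Range toInclusiveHTTPRange(const RangeGenerator::Range & range)
{
    return {range.first, range.second - 1};
}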
class UpdatablePooledSession : public UpdatableSessionBase<PooledHTTPSessionPtr>
{
using Parent = UpdatableSessionBase<PooledHTTPSessionPtr>;
@ -662,20 +811,14 @@ private:
size_t per_endpoint_pool_size;
public:
explicit UpdatablePooledSession(const Poco::URI uri,
const ConnectionTimeouts & timeouts_,
const UInt64 max_redirects_,
size_t per_endpoint_pool_size_)
: Parent(uri, timeouts_, max_redirects_)
, per_endpoint_pool_size { per_endpoint_pool_size_ }
explicit UpdatablePooledSession(
const Poco::URI uri, const ConnectionTimeouts & timeouts_, const UInt64 max_redirects_, size_t per_endpoint_pool_size_)
: Parent(uri, timeouts_, max_redirects_), per_endpoint_pool_size{per_endpoint_pool_size_}
{
session = makePooledHTTPSession(initial_uri, timeouts, per_endpoint_pool_size);
}
void buildNewSession(const Poco::URI & uri) override
{
session = makePooledHTTPSession(uri, timeouts, per_endpoint_pool_size);
}
void buildNewSession(const Poco::URI & uri) override { session = makePooledHTTPSession(uri, timeouts, per_endpoint_pool_size); }
};
class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatablePooledSession>>
@ -683,7 +826,8 @@ class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase
using Parent = detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatablePooledSession>>;
public:
explicit PooledReadWriteBufferFromHTTP(Poco::URI uri_,
explicit PooledReadWriteBufferFromHTTP(
Poco::URI uri_,
const std::string & method_ = {},
OutStreamCallback out_stream_callback_ = {},
const ConnectionTimeouts & timeouts_ = {},
@ -691,12 +835,13 @@ public:
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const UInt64 max_redirects = 0,
size_t max_connections_per_endpoint = DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT)
: Parent(std::make_shared<UpdatablePooledSession>(uri_, timeouts_, max_redirects, max_connections_per_endpoint),
uri_,
credentials_,
method_,
out_stream_callback_,
buffer_size_)
: Parent(
std::make_shared<UpdatablePooledSession>(uri_, timeouts_, max_redirects, max_connections_per_endpoint),
uri_,
credentials_,
method_,
out_stream_callback_,
buffer_size_)
{
}
};


@ -3,30 +3,35 @@
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/IOThreadPool.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Common/parseRemoteDescription.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Storages/PartitionedSink.h>
#include "Common/ThreadStatus.h"
#include <Common/parseRemoteDescription.h>
#include "IO/HTTPCommon.h"
#include "IO/ReadWriteBufferFromHTTP.h"
#include <Poco/Net/HTTPRequest.h>
#include <algorithm>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <base/logger_useful.h>
#include <algorithm>
#include <Poco/Net/HTTPRequest.h>
namespace DB
@ -43,8 +48,7 @@ namespace ErrorCodes
static bool urlWithGlobs(const String & uri)
{
return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos)
|| uri.find('|') != std::string::npos;
return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) || uri.find('|') != std::string::npos;
}
@ -88,8 +92,7 @@ IStorageURLBase::IStorageURLBase(
namespace
{
ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders(
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders(const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
{
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers(headers_.begin(), headers_.end());
// Propagate OpenTelemetry trace context, if any, downstream.
@ -98,13 +101,11 @@ namespace
const auto & thread_trace_context = CurrentThread::get().thread_trace_context;
if (thread_trace_context.trace_id != UUID())
{
headers.emplace_back("traceparent",
thread_trace_context.composeTraceparentHeader());
headers.emplace_back("traceparent", thread_trace_context.composeTraceparentHeader());
if (!thread_trace_context.tracestate.empty())
{
headers.emplace_back("tracestate",
thread_trace_context.tracestate);
headers.emplace_back("tracestate", thread_trace_context.tracestate);
}
}
}
@ -114,8 +115,7 @@ namespace
class StorageURLSource : public SourceWithProgress
{
using URIParams = std::vector<std::pair<String, String>>;
using URIParams = std::vector<std::pair<String, String>>;
public:
struct URIInfo
@ -160,11 +160,11 @@ namespace
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
const String & compression_method,
size_t download_threads,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
const URIParams & params = {},
bool glob_url = false)
: SourceWithProgress(sample_block), name(std::move(name_))
, uri_info(uri_info_)
: SourceWithProgress(sample_block), name(std::move(name_)), uri_info(uri_info_)
{
auto headers = getHeaders(headers_);
@ -176,33 +176,40 @@ namespace
auto first_option = uri_options.begin();
read_buf = getFirstAvailableURLReadBuffer(
first_option, uri_options.end(), context, params, http_method,
callback, timeouts, compression_method, credentials, headers, glob_url, uri_options.size() == 1);
first_option,
uri_options.end(),
context,
params,
http_method,
callback,
timeouts,
compression_method,
credentials,
headers,
glob_url,
uri_options.size() == 1,
download_threads);
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings);
auto input_format
= FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings);
QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
builder.addSimpleTransform([&](const Block & cur_header)
{
return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *input_format, context);
});
builder.addSimpleTransform(
[&](const Block & cur_header)
{ return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *input_format, context); });
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
};
}
String getName() const override
{
return name;
}
String getName() const override { return name; }
Chunk generate() override
{
while (true)
{
if (!reader)
{
auto current_uri_pos = uri_info->next_uri_to_read.fetch_add(1);
@ -239,7 +246,8 @@ namespace
Poco::Net::HTTPBasicCredentials & credentials,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
bool glob_url,
bool delay_initialization)
bool delay_initialization,
size_t download_threads)
{
String first_exception_message;
ReadSettings read_settings = context->getReadSettings();
@ -255,8 +263,137 @@ namespace
setCredentials(credentials, request_uri);
const auto settings = context->getSettings();
try
{
if (download_threads > 1)
{
try
{
ReadWriteBufferFromHTTP buffer(
request_uri,
Poco::Net::HTTPRequest::HTTP_HEAD,
callback,
timeouts,
credentials,
settings.max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
read_settings,
headers,
ReadWriteBufferFromHTTP::Range{0, std::nullopt},
&context->getRemoteHostFilter(),
true,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
Poco::Net::HTTPResponse res;
for (size_t i = 0; i < settings.http_max_tries; ++i)
{
try
{
buffer.callWithRedirects(res, Poco::Net::HTTPRequest::HTTP_HEAD, true);
break;
}
catch (const Poco::Exception & e)
{
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
"HTTP HEAD request to `{}` failed at try {}/{}. "
"Error: {}.",
request_uri.toString(),
i + 1,
settings.http_max_tries,
e.displayText());
if (!ReadWriteBufferFromHTTP::isRetriableError(res.getStatus()))
{
throw;
}
}
}
// To check if the Range header is supported, we need to send a request with it set
const bool supports_ranges = (res.has("Accept-Ranges") && res.get("Accept-Ranges") == "bytes")
|| (res.has("Content-Range") && res.get("Content-Range").starts_with("bytes"));
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
fmt::runtime(supports_ranges ? "HTTP Range is supported" : "HTTP Range is not supported"));
if (supports_ranges && res.getStatus() == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT
&& res.hasContentLength())
{
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
"Using ParallelReadBuffer with {} workers with chunks of {} bytes",
download_threads,
settings.max_download_buffer_size);
auto read_buffer_factory = std::make_unique<RangedReadWriteBufferFromHTTPFactory>(
res.getContentLength(),
settings.max_download_buffer_size,
request_uri,
http_method,
callback,
timeouts,
credentials,
settings.max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
read_settings,
headers,
&context->getRemoteHostFilter(),
delay_initialization,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()
? CurrentThread::get().getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
ContextPtr query_context
= CurrentThread::isInitialized() ? CurrentThread::get().getQueryContext() : nullptr;
auto worker_cleanup = [has_running_group = running_group == nullptr](ThreadStatus & thread_status)
{
if (has_running_group)
thread_status.detachQuery(false);
};
auto worker_setup = [query_context = std::move(query_context),
running_group = std::move(running_group)](ThreadStatus & thread_status)
{
/// Save query context if any, because cache implementation needs it.
if (query_context)
thread_status.attachQueryContext(query_context);
/// To be able to pass ProfileEvents.
if (running_group)
thread_status.attachQuery(running_group);
};
return wrapReadBufferWithCompressionMethod(
std::make_unique<ParallelReadBuffer>(
std::move(read_buffer_factory),
&IOThreadPool::get(),
download_threads,
std::move(worker_setup),
std::move(worker_cleanup)),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}
}
catch (const Poco::Exception & e)
{
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
"Failed to setup ParallelReadBuffer because of an exception:\n{}.\nFalling back to the single-threaded "
"buffer",
e.displayText());
}
}
LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "Using single-threaded read buffer");
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
@ -264,15 +401,15 @@ namespace
callback,
timeouts,
credentials,
context->getSettingsRef().max_http_get_redirects,
settings.max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
read_settings,
headers,
ReadWriteBufferFromHTTP::Range{},
&context->getRemoteHostFilter(),
delay_initialization,
/* use_external_buffer */false,
/* skip_url_not_found_error */skip_url_not_found_error),
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}
catch (...)
@ -323,10 +460,10 @@ StorageURLSink::StorageURLSink(
std::string content_encoding = toContentEncodingName(compression_method);
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHTTP>(Poco::URI(uri), http_method, content_type, content_encoding, timeouts),
compression_method, 3);
writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block,
context, {} /* write callback */, format_settings);
std::make_unique<WriteBufferFromHTTP>(Poco::URI(uri), http_method, content_type, content_encoding, timeouts),
compression_method,
3);
writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block, context, {} /* write callback */, format_settings);
}
@ -355,15 +492,15 @@ public:
const ConnectionTimeouts & timeouts_,
const CompressionMethod compression_method_,
const String & http_method_)
: PartitionedSink(partition_by, context_, sample_block_)
, uri(uri_)
, format(format_)
, format_settings(format_settings_)
, sample_block(sample_block_)
, context(context_)
, timeouts(timeouts_)
, compression_method(compression_method_)
, http_method(http_method_)
{
}
@ -371,8 +508,8 @@ public:
{
auto partition_path = PartitionedSink::replaceWildcards(uri, partition_id);
context->getRemoteHostFilter().checkURL(Poco::URI(partition_path));
return std::make_shared<StorageURLSink>(partition_path, format,
format_settings, sample_block, context, timeouts, compression_method, http_method);
return std::make_shared<StorageURLSink>(
partition_path, format, format_settings, sample_block, context, timeouts, compression_method, http_method);
}
private:
@ -462,7 +599,8 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
credentials,
headers,
false,
false);
false,
context->getSettingsRef().max_download_threads);
};
try
@ -479,7 +617,10 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
} while (++option < urls_to_check.end());
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from urls failed. Errors:\n{}", exception_messages);
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"All attempts to extract table structure from urls failed. Errors:\n{}",
exception_messages);
}
bool IStorageURLBase::isColumnOriented() const
@ -512,6 +653,8 @@ Pipe IStorageURLBase::read(
block_for_format = storage_snapshot->metadata->getSampleBlock();
}
size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
if (urlWithGlobs(uri))
{
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
@ -528,14 +671,13 @@ Pipe IStorageURLBase::read(
Pipes pipes;
pipes.reserve(num_streams);
size_t download_threads = num_streams >= max_download_threads ? 1 : (max_download_threads / num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageURLSource>(
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
@ -544,7 +686,11 @@ Pipe IStorageURLBase::read(
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params, /* glob_url */true));
compression_method,
download_threads,
headers,
params,
/* glob_url */ true));
}
return Pipe::unitePipes(std::move(pipes));
}
@ -555,9 +701,7 @@ Pipe IStorageURLBase::read(
return Pipe(std::make_shared<StorageURLSource>(
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
@ -566,7 +710,10 @@ Pipe IStorageURLBase::read(
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
compression_method,
max_download_threads,
headers,
params));
}
}
@ -598,12 +745,10 @@ Pipe StorageURLWithFailover::read(
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
uri_info->uri_list_to_read.emplace_back(uri_options);
auto pipe = Pipe(std::make_shared<StorageURLSource>(
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
@ -612,7 +757,10 @@ Pipe StorageURLWithFailover::read(
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
compression_method,
local_context->getSettingsRef().max_download_threads,
headers,
params));
std::shuffle(uri_options.begin(), uri_options.end(), thread_local_rng);
return pipe;
}
@ -632,17 +780,26 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
{
return std::make_shared<PartitionedStorageURLSink>(
partition_by_ast,
uri, format_name,
format_settings, metadata_snapshot->getSampleBlock(), context,
uri,
format_name,
format_settings,
metadata_snapshot->getSampleBlock(),
context,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri, compression_method), http_method);
chooseCompressionMethod(uri, compression_method),
http_method);
}
else
{
return std::make_shared<StorageURLSink>(uri, format_name,
format_settings, metadata_snapshot->getSampleBlock(), context,
return std::make_shared<StorageURLSink>(
uri,
format_name,
format_settings,
metadata_snapshot->getSampleBlock(),
context,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri, compression_method), http_method);
chooseCompressionMethod(uri, compression_method),
http_method);
}
}
@ -659,8 +816,19 @@ StorageURL::StorageURL(
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_,
const String & http_method_,
ASTPtr partition_by_)
: IStorageURLBase(uri_, context_, table_id_, format_name_, format_settings_,
columns_, constraints_, comment, compression_method_, headers_, http_method_, partition_by_)
: IStorageURLBase(
uri_,
context_,
table_id_,
format_name_,
format_settings_,
columns_,
constraints_,
comment,
compression_method_,
headers_,
http_method_,
partition_by_)
{
context_->getRemoteHostFilter().checkURL(Poco::URI(uri));
}
@ -711,8 +879,7 @@ FormatSettings StorageURL::getFormatSettingsFromArgs(const StorageFactory::Argum
// Apply changes from SETTINGS clause, with validation.
user_format_settings.applyChanges(args.storage_def->settings->changes);
format_settings = getFormatSettings(args.getContext(),
user_format_settings);
format_settings = getFormatSettings(args.getContext(), user_format_settings);
}
else
{
@ -731,12 +898,12 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
if (!configuration.http_method.empty()
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_POST
if (!configuration.http_method.empty() && configuration.http_method != Poco::Net::HTTPRequest::HTTP_POST
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_PUT)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Http method can be POST or PUT (current: {}). For insert default is POST, for select GET",
configuration.http_method);
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Http method can be POST or PUT (current: {}). For insert default is POST, for select GET",
configuration.http_method);
if (!storage_specific_args.empty())
{
@ -754,7 +921,8 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
{
if (args.empty() || args.size() > 3)
throw Exception(
"Storage URL requires 1, 2 or 3 arguments: url, name of used format (taken from file extension by default) and optional compression method.",
"Storage URL requires 1, 2 or 3 arguments: url, name of used format (taken from file extension by default) and optional "
"compression method.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & arg : args)
@ -776,43 +944,45 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
void registerStorageURL(StorageFactory & factory)
{
factory.registerStorage("URL", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext());
auto format_settings = StorageURL::getFormatSettingsFromArgs(args);
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
factory.registerStorage(
"URL",
[](const StorageFactory::Arguments & args)
{
auto value_literal = value.safeGet<String>();
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal));
}
ASTs & engine_args = args.engine_args;
auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext());
auto format_settings = StorageURL::getFormatSettingsFromArgs(args);
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
{
auto value_literal = value.safeGet<String>();
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal));
}
return StorageURL::create(
configuration.url,
args.table_id,
configuration.format,
format_settings,
args.columns,
args.constraints,
args.comment,
args.getContext(),
configuration.compression_method,
headers,
configuration.http_method,
partition_by);
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::URL,
});
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
return StorageURL::create(
configuration.url,
args.table_id,
configuration.format,
format_settings,
args.columns,
args.constraints,
args.comment,
args.getContext(),
configuration.compression_method,
headers,
configuration.http_method,
partition_by);
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::URL,
});
}
}


@ -162,4 +162,4 @@ def test_url_reconnect(started_cluster):
thread.join()
assert int(result) == 6581218782194912115
assert node1.contains_in_log("Error: Timeout: connect timed out")
assert node1.contains_in_log("Timeout: connect timed out")


@ -120,18 +120,14 @@ class CSVHTTPServer(BaseHTTPRequestHandler):
class HTTPServerV6(HTTPServer):
address_family = socket.AF_INET6
def start_server(requests_amount):
def start_server():
if IS_IPV6:
httpd = HTTPServerV6(HTTP_SERVER_ADDRESS, CSVHTTPServer)
else:
httpd = HTTPServer(HTTP_SERVER_ADDRESS, CSVHTTPServer)
def real_func():
for i in range(requests_amount):
httpd.handle_request()
t = threading.Thread(target=real_func)
return t
t = threading.Thread(target=httpd.serve_forever)
return t, httpd
# test section
@ -201,7 +197,7 @@ def main():
'select double, count(*) from {tbl} group by double': "7.7\t2\n9.9\t10"
}
t = start_server(len(select_only_requests) * 2 + (len(insert_requests) + len(select_requests)) * 2)
t, httpd = start_server()
t.start()
# test table with url engine
test_select(table_name="test_table_select", requests=list(select_only_requests.keys()), answers=list(select_only_requests.values()), test_data=test_data)
@ -211,6 +207,8 @@ def main():
test_insert(table_name="test_table_insert", requests_insert=insert_requests, requests_select=list(select_requests.keys()), answers=list(select_requests.values()))
#test insert into table function url
test_insert(requests_insert=insert_requests, requests_select=list(select_requests.keys()), answers=list(select_requests.values()))
httpd.shutdown()
t.join()
print("PASSED")

View File

@ -11,9 +11,11 @@
===native===
{"query":"select * from url('http:\/\/127.0.0.2:8123\/?query=select%201%20format%20Null', CSV, 'a int')","status":"QueryFinish","tracestate":"another custom state","sorted_by_start_time":1}
{"query":"select 1 format Null\n","status":"QueryFinish","tracestate":"another custom state","sorted_by_start_time":1}
{"query":"select 1 format Null\n","status":"QueryFinish","tracestate":"another custom state","sorted_by_start_time":1}
{"query":"select 1 format Null\n","query_status":"QueryFinish","tracestate":"another custom state","sorted_by_finish_time":1}
{"query":"select 1 format Null\n","query_status":"QueryFinish","tracestate":"another custom state","sorted_by_finish_time":1}
{"query":"select * from url('http:\/\/127.0.0.2:8123\/?query=select%201%20format%20Null', CSV, 'a int')","query_status":"QueryFinish","tracestate":"another custom state","sorted_by_finish_time":1}
{"total spans":"2","unique spans":"2","unique non-zero parent spans":"2"}
{"total spans":"3","unique spans":"3","unique non-zero parent spans":"2"}
{"initial query spans with proper parent":"1"}
{"unique non-empty tracestate values":"1"}
===sampled===

View File

@ -121,18 +121,14 @@ class CSVHTTPServer(BaseHTTPRequestHandler):
class HTTPServerV6(HTTPServer):
address_family = socket.AF_INET6
def start_server(requests_amount):
def start_server():
if IS_IPV6:
httpd = HTTPServerV6(HTTP_SERVER_ADDRESS, CSVHTTPServer)
else:
httpd = HTTPServer(HTTP_SERVER_ADDRESS, CSVHTTPServer)
def real_func():
for i in range(requests_amount):
httpd.handle_request()
t = threading.Thread(target=real_func)
return t
t = threading.Thread(target=httpd.serve_forever)
return t, httpd
# test section
@ -217,9 +213,10 @@ def main():
query : 'hello, world',
}
t = start_server(len(list(select_requests_url_auth.keys())))
t, httpd = start_server()
t.start()
test_select(requests=list(select_requests_url_auth.keys()), answers=list(select_requests_url_auth.values()), test_data=test_data)
httpd.shutdown()
t.join()
print("PASSED")

View File

@ -124,7 +124,8 @@ def test_select():
check_answers(query, EXPECTED_ANSWER)
def main():
t = start_server(1)
    # The engine now sends a HEAD probe before the GET, so the server must handle two requests.
t = start_server(2)
t.start()
test_select()
t.join()

View File

@ -0,0 +1,262 @@
#!/usr/bin/env python3
from http.server import BaseHTTPRequestHandler, HTTPServer
import socket
import sys
import re
import threading
import os
import traceback
import urllib.request
import subprocess
def is_ipv6(host):
try:
socket.inet_aton(host)
return False
    except OSError:
        # Not a valid IPv4 literal; treat it as IPv6.
        return True
def get_local_port(host, ipv6):
if ipv6:
family = socket.AF_INET6
else:
family = socket.AF_INET
with socket.socket(family) as fd:
fd.bind((host, 0))
return fd.getsockname()[1]
CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "localhost")
CLICKHOUSE_PORT_HTTP = os.environ.get("CLICKHOUSE_PORT_HTTP", "8123")
# Server returns this JSON response.
SERVER_JSON_RESPONSE = """{
"login": "ClickHouse",
"id": 54801242,
"name": "ClickHouse",
"company": null
}"""
PAYLOAD_LEN = len(SERVER_JSON_RESPONSE)
EXPECTED_ANSWER = """{\\n\\t"login": "ClickHouse",\\n\\t"id": 54801242,\\n\\t"name": "ClickHouse",\\n\\t"company": null\\n}"""
#####################################################################################
# This test starts an HTTP server and serves data to a ClickHouse url-engine table.
# The objective of this test is to check that ClickHouse issues HTTP Range requests
# (and the expected number of GET calls) when the server supports ranges, and
# falls back to a single GET when it does not.
# In order for it to work, the ip+port of the http server (given below) should be
# accessible from the clickhouse server.
#####################################################################################
# IP-address of this host accessible from the outside world. Get the first one
HTTP_SERVER_HOST = (
subprocess.check_output(["hostname", "-i"]).decode("utf-8").strip().split()[0]
)
IS_IPV6 = is_ipv6(HTTP_SERVER_HOST)
HTTP_SERVER_PORT = get_local_port(HTTP_SERVER_HOST, IS_IPV6)
# IP address and port of the HTTP server started from this script.
HTTP_SERVER_ADDRESS = (HTTP_SERVER_HOST, HTTP_SERVER_PORT)
if IS_IPV6:
HTTP_SERVER_URL_STR = (
"http://"
+ f"[{str(HTTP_SERVER_ADDRESS[0])}]:{str(HTTP_SERVER_ADDRESS[1])}"
+ "/"
)
else:
HTTP_SERVER_URL_STR = (
"http://" + f"{str(HTTP_SERVER_ADDRESS[0])}:{str(HTTP_SERVER_ADDRESS[1])}" + "/"
)
def get_ch_answer(query):
host = CLICKHOUSE_HOST
if IS_IPV6:
host = f"[{host}]"
    url = os.environ.get(
        "CLICKHOUSE_URL",
        # Use the (possibly bracketed IPv6) host computed above.
        "http://{host}:{port}".format(host=host, port=CLICKHOUSE_PORT_HTTP),
    )
return urllib.request.urlopen(url, data=query.encode()).read().decode()
def check_answers(query, answer):
ch_answer = get_ch_answer(query)
if ch_answer.strip() != answer.strip():
print("FAIL on query:", query, file=sys.stderr)
print("Expected answer:", answer, file=sys.stderr)
print("Fetched answer :", ch_answer, file=sys.stderr)
raise Exception("Fail on query")
BYTE_RANGE_RE = re.compile(r"bytes=(\d+)-(\d+)?$")
def parse_byte_range(byte_range):
"""Returns the two numbers in 'bytes=123-456' or throws ValueError.
The last number or both numbers may be None.
"""
if byte_range.strip() == "":
return None, None
m = BYTE_RANGE_RE.match(byte_range)
if not m:
raise ValueError(f"Invalid byte range {byte_range}")
    first, last = [int(x) if x is not None else None for x in m.groups()]
    if last is not None and last < first:
        raise ValueError(f"Invalid byte range {byte_range}")
return first, last
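# Editorial illustration (not part of the original test) of the contract above:
#   parse_byte_range("bytes=0-9")   -> (0, 9)
#   parse_byte_range("bytes=100-")  -> (100, None)
#   parse_byte_range("")            -> (None, None)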
# Server with check for User-Agent headers.
class HttpProcessor(BaseHTTPRequestHandler):
allow_range = False
range_used = False
get_call_num = 0
def send_head(self):
if self.headers["Range"] and HttpProcessor.allow_range:
try:
self.range = parse_byte_range(self.headers["Range"])
except ValueError as e:
self.send_error(400, "Invalid byte range")
return None
else:
self.range = None
if self.range:
first, last = self.range
else:
first, last = None, None
        if first is None:
            first = 0
payload = SERVER_JSON_RESPONSE.encode()
payload_len = len(payload)
if first and first >= payload_len:
self.send_error(416, "Requested Range Not Satisfiable")
return None
self.send_response(206 if HttpProcessor.allow_range else 200)
self.send_header("Content-type", "application/json")
if HttpProcessor.allow_range:
self.send_header("Accept-Ranges", "bytes")
if last is None or last >= payload_len:
last = payload_len - 1
response_length = last - first + 1
if first or last:
self.send_header("Content-Range", f"bytes {first}-{last}/{payload_len}")
self.send_header(
"Content-Length",
str(response_length) if HttpProcessor.allow_range else str(payload_len),
)
self.end_headers()
return payload
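    # Editorial note on the logic above: a request with "Range: bytes=10-19"
    # gets a 206, "Content-Range: bytes 10-19/<len>" and a 10-byte body; with
    # allow_range disabled the same request gets a plain 200 with the full payload.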
def do_HEAD(self):
self.send_head()
def do_GET(self):
result = self.send_head()
        if result is None:
            return
HttpProcessor.get_call_num += 1
if not self.range:
self.wfile.write(SERVER_JSON_RESPONSE.encode())
return
HttpProcessor.range_used = True
payload = SERVER_JSON_RESPONSE.encode()
start, stop = self.range
        if stop is None:
            stop = len(payload) - 1
        if start is None:
            start = 0
        self.wfile.write(payload[start : stop + 1])
def log_message(self, format, *args):
return
class HTTPServerV6(HTTPServer):
address_family = socket.AF_INET6
def start_server():
if IS_IPV6:
httpd = HTTPServerV6(HTTP_SERVER_ADDRESS, HttpProcessor)
else:
httpd = HTTPServer(HTTP_SERVER_ADDRESS, HttpProcessor)
t = threading.Thread(target=httpd.serve_forever)
return t, httpd
#####################################################################
# Testing area.
#####################################################################
def test_select(download_buffer_size):
global HTTP_SERVER_URL_STR
query = f"SELECT * FROM url('{HTTP_SERVER_URL_STR}','JSONAsString') SETTINGS max_download_buffer_size={download_buffer_size};"
check_answers(query, EXPECTED_ANSWER)
def run_test(allow_range, download_buffer_size=20):
HttpProcessor.range_used = False
HttpProcessor.get_call_num = 0
HttpProcessor.allow_range = allow_range
t, httpd = start_server()
t.start()
test_select(download_buffer_size)
expected_get_call_num = (PAYLOAD_LEN - 1) // download_buffer_size + 1
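    # Ceil division: each ranged GET fetches at most download_buffer_size bytes,
    # so (illustratively) a 95-byte payload with a 10-byte buffer needs
    # (95 - 1) // 10 + 1 = 10 GET calls.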
if allow_range:
if not HttpProcessor.range_used:
raise Exception("HTTP Range was not used when supported")
if expected_get_call_num != HttpProcessor.get_call_num:
raise Exception(
f"Invalid amount of GET calls with Range. Expected {expected_get_call_num}, actual {HttpProcessor.get_call_num}"
)
else:
if HttpProcessor.range_used:
raise Exception("HTTP Range used while not supported")
httpd.shutdown()
t.join()
print("PASSED")
def main():
run_test(allow_range=False)
run_test(allow_range=True, download_buffer_size=20)
run_test(allow_range=True, download_buffer_size=10)
if __name__ == "__main__":
try:
main()
except Exception as ex:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback, file=sys.stderr)
print(ex, file=sys.stderr)
sys.stderr.flush()
os._exit(1)
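
Taken together, the server above models the behavior the new parallel URL download path depends on: when a HEAD probe reports a content length and Accept-Ranges: bytes, the body can be split into max_download_buffer_size-sized ranges fetched across several download threads. A rough Python model of that client-side strategy — an editorial sketch with assumed sizes and URL, not the actual C++ implementation:

#!/usr/bin/env python3
# Rough model of ranged parallel downloading; URL and sizes are illustrative.
import concurrent.futures
import urllib.request

URL = "https://example.com/data.bin"  # placeholder
BUFFER_SIZE = 10 * 1024 * 1024        # illustrative per-request range size
MAX_THREADS = 4                       # illustrative degree of parallelism

def fetch_range(start: int, end: int) -> bytes:
    req = urllib.request.Request(URL, headers={"Range": f"bytes={start}-{end}"})
    with urllib.request.urlopen(req) as resp:
        return resp.read()

def download() -> bytes:
    # HEAD probe: learn the total size and whether ranges are accepted.
    head = urllib.request.Request(URL, method="HEAD")
    with urllib.request.urlopen(head) as resp:
        size = int(resp.headers["Content-Length"])
        ranged = resp.headers.get("Accept-Ranges") == "bytes"
    if not ranged:
        with urllib.request.urlopen(URL) as resp:  # single-GET fallback
            return resp.read()
    ranges = [(off, min(off + BUFFER_SIZE, size) - 1)
              for off in range(0, size, BUFFER_SIZE)]
    with concurrent.futures.ThreadPoolExecutor(MAX_THREADS) as pool:
        parts = pool.map(lambda r: fetch_range(*r), ranges)
    return b"".join(parts)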

View File

@ -0,0 +1,3 @@
PASSED
PASSED
PASSED

View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
python3 "$CURDIR"/02233_HTTP_ranged.python