Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-21 23:21:59 +00:00

Commit 08cb71a067: Merge branch 'master' into case-insensitive-column-matching
@@ -45,6 +45,7 @@
 #include <Core/ServerUUID.h>
 #include <IO/HTTPCommon.h>
 #include <IO/ReadHelpers.h>
+#include <IO/IOThreadPool.h>
 #include <IO/UseSSL.h>
 #include <Interpreters/AsynchronousMetrics.h>
 #include <Interpreters/DDLWorker.h>
@@ -554,6 +555,10 @@ if (ThreadFuzzer::instance().isEffective())
         config().getUInt("thread_pool_queue_size", 10000)
     );
 
+    IOThreadPool::initialize(
+        config().getUInt("max_io_thread_pool_size", 100),
+        config().getUInt("max_io_thread_pool_free_size", 0),
+        config().getUInt("io_thread_pool_queue_size", 10000));
 
     /// Initialize global local cache for remote filesystem.
     if (config().has("local_cache_for_remote_fs"))
@@ -113,5 +113,35 @@ public:
     }
 };
 
+class SynchronizedArenaWithFreeLists : private ArenaWithFreeLists
+{
+public:
+    explicit SynchronizedArenaWithFreeLists(
+        const size_t initial_size = 4096, const size_t growth_factor = 2,
+        const size_t linear_growth_threshold = 128 * 1024 * 1024)
+        : ArenaWithFreeLists{initial_size, growth_factor, linear_growth_threshold}
+    {}
+
+    char * alloc(const size_t size)
+    {
+        std::lock_guard lock{mutex};
+        return ArenaWithFreeLists::alloc(size);
+    }
+
+    void free(char * ptr, const size_t size)
+    {
+        std::lock_guard lock{mutex};
+        return ArenaWithFreeLists::free(ptr, size);
+    }
+
+    /// Size of the allocated pool in bytes
+    size_t size() const
+    {
+        std::lock_guard lock{mutex};
+        return ArenaWithFreeLists::size();
+    }
+
+private:
+    mutable std::mutex mutex;
+};
 
 }
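SynchronizedArenaWithFreeLists above is simply ArenaWithFreeLists with every operation serialized behind one mutex, so several reader threads can share a single pool. A minimal usage sketch (standalone illustration, not part of the commit; the thread count and 4096-byte size are arbitrary):

#include <thread>
#include <vector>
#include <Common/ArenaWithFreeLists.h>

void example()
{
    DB::SynchronizedArenaWithFreeLists arena;

    auto worker = [&arena]
    {
        char * buf = arena.alloc(4096);  // takes the internal lock
        // ... fill buf with downloaded bytes ...
        arena.free(buf, 4096);           // the size must match the alloc size
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i)
        threads.emplace_back(worker);
    for (auto & t : threads)
        t.join();
}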
@@ -47,6 +47,8 @@ class IColumn;
     M(UInt64, max_insert_delayed_streams_for_parallel_write, 0, "The maximum number of streams (columns) to delay final part flush. Default - auto (1000 in case of underlying storage supports parallel write, for example S3 and disabled otherwise)", 0) \
     M(UInt64, max_final_threads, 16, "The maximum number of threads to read from table with FINAL.", 0) \
     M(MaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.", 0) \
+    M(MaxThreads, max_download_threads, 4, "The maximum number of threads to download data (e.g. for URL engine).", 0) \
+    M(UInt64, max_download_buffer_size, 10*1024*1024, "The maximal size of buffer for parallel downloading (e.g. for URL engine) per each thread.", 0) \
     M(UInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \
     M(UInt64, max_distributed_connections, 1024, "The maximum number of connections for distributed processing of one query (should be greater than max_threads).", 0) \
     M(UInt64, max_query_size, DBMS_DEFAULT_MAX_QUERY_SIZE, "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)", 0) \
34 src/IO/IOThreadPool.cpp Normal file
@@ -0,0 +1,34 @@
#include <IO/IOThreadPool.h>
#include "Core/Field.h"

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

std::unique_ptr<ThreadPool> IOThreadPool::instance;

void IOThreadPool::initialize(size_t max_threads, size_t max_free_threads, size_t queue_size)
{
    if (instance)
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "The IO thread pool is initialized twice");
    }

    instance = std::make_unique<ThreadPool>(max_threads, max_free_threads, queue_size, false /*shutdown_on_exception*/);
}

ThreadPool & IOThreadPool::get()
{
    if (!instance)
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "The IO thread pool is not initialized");
    }

    return *instance;
}

}
20 src/IO/IOThreadPool.h Normal file
@@ -0,0 +1,20 @@
#pragma once

#include <Common/ThreadPool.h>

namespace DB
{

/*
 * ThreadPool used for IO operations.
 */
class IOThreadPool
{
    static std::unique_ptr<ThreadPool> instance;

public:
    static void initialize(size_t max_threads, size_t max_free_threads, size_t queue_size);
    static ThreadPool & get();
};

}
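The pool is a process-wide singleton: Server.cpp calls initialize() exactly once at startup (first hunk above) and everything else borrows threads through get(). A short sketch of that contract (illustrative values; scheduleOrThrow is the ThreadPool method this same diff uses in ParallelReadBuffer):

// During server startup, once:
DB::IOThreadPool::initialize(/* max_threads */ 100, /* max_free_threads */ 0, /* queue_size */ 10000);

// Later, from any component that needs background IO:
DB::IOThreadPool::get().scheduleOrThrow([] { /* download a range, prefetch, ... */ });

Calling get() before initialize(), or initialize() twice, throws LOGICAL_ERROR, so misuse fails immediately instead of silently creating a second pool.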
290 src/IO/ParallelReadBuffer.cpp Normal file
@@ -0,0 +1,290 @@
#include <IO/ParallelReadBuffer.h>
#include <base/logger_useful.h>
#include <Poco/Logger.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int CANNOT_SEEK_THROUGH_FILE;
    extern const int SEEK_POSITION_OUT_OF_BOUND;

}

ParallelReadBuffer::ParallelReadBuffer(
    std::unique_ptr<ReadBufferFactory> reader_factory_,
    ThreadPool * pool_,
    size_t max_working_readers_,
    WorkerSetup worker_setup_,
    WorkerCleanup worker_cleanup_)
    : SeekableReadBufferWithSize(nullptr, 0)
    , pool(pool_)
    , max_working_readers(max_working_readers_)
    , reader_factory(std::move(reader_factory_))
    , worker_setup(std::move(worker_setup_))
    , worker_cleanup(std::move(worker_cleanup_))
{
    std::unique_lock<std::mutex> lock{mutex};
    addReaders(lock);
}

bool ParallelReadBuffer::addReaderToPool(std::unique_lock<std::mutex> & /*buffer_lock*/)
{
    auto reader = reader_factory->getReader();
    if (!reader)
    {
        return false;
    }

    auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader)));

    pool->scheduleOrThrow(
        [&, this, worker = std::move(worker)]() mutable
        {
            ThreadStatus thread_status;

            {
                std::lock_guard lock{mutex};
                ++active_working_reader;
            }

            SCOPE_EXIT({
                worker_cleanup(thread_status);

                std::lock_guard lock{mutex};
                --active_working_reader;
                if (active_working_reader == 0)
                {
                    readers_done.notify_all();
                }
            });
            worker_setup(thread_status);

            readerThreadFunction(std::move(worker));
        });
    return true;
}

void ParallelReadBuffer::addReaders(std::unique_lock<std::mutex> & buffer_lock)
{
    while (read_workers.size() < max_working_readers && addReaderToPool(buffer_lock))
        ;
}

off_t ParallelReadBuffer::seek(off_t offset, int whence)
{
    if (whence != SEEK_SET)
        throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);

    if (offset < 0)
        throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);

    if (!working_buffer.empty() && static_cast<size_t>(offset) >= current_position - working_buffer.size() && offset < current_position)
    {
        pos = working_buffer.end() - (current_position - offset);
        assert(pos >= working_buffer.begin());
        assert(pos <= working_buffer.end());

        return offset;
    }

    std::unique_lock lock{mutex};
    const auto offset_is_in_range
        = [&](const auto & range) { return static_cast<size_t>(offset) >= range.left && static_cast<size_t>(offset) <= *range.right; };

    while (!read_workers.empty() && (offset < current_position || !offset_is_in_range(read_workers.front()->range)))
    {
        read_workers.front()->cancel = true;
        read_workers.pop_front();
    }

    if (!read_workers.empty())
    {
        auto & front_worker = read_workers.front();
        auto & segments = front_worker->segments;
        current_position = front_worker->range.left;
        while (true)
        {
            next_condvar.wait(lock, [&] { return emergency_stop || !segments.empty(); });

            if (emergency_stop)
                handleEmergencyStop();

            auto next_segment = front_worker->nextSegment();
            if (static_cast<size_t>(offset) < current_position + next_segment.size())
            {
                current_segment = std::move(next_segment);
                working_buffer = internal_buffer = Buffer(current_segment.data(), current_segment.data() + current_segment.size());
                current_position += current_segment.size();
                pos = working_buffer.end() - (current_position - offset);
                addReaders(lock);
                return offset;
            }

            current_position += next_segment.size();
        }
    }

    lock.unlock();
    finishAndWait();

    reader_factory->seek(offset, whence);
    all_completed = false;
    read_workers.clear();

    current_position = offset;
    resetWorkingBuffer();

    emergency_stop = false;

    lock.lock();
    addReaders(lock);
    return offset;
}

std::optional<size_t> ParallelReadBuffer::getTotalSize()
{
    std::lock_guard lock{mutex};
    return reader_factory->getTotalSize();
}

off_t ParallelReadBuffer::getPosition()
{
    return current_position - available();
}

bool ParallelReadBuffer::currentWorkerReady() const
{
    assert(!read_workers.empty());
    return read_workers.front()->finished || !read_workers.front()->segments.empty();
}

bool ParallelReadBuffer::currentWorkerCompleted() const
{
    assert(!read_workers.empty());
    return read_workers.front()->finished && read_workers.front()->segments.empty();
}

void ParallelReadBuffer::handleEmergencyStop()
{
    // this can only be called from the main thread when there is an exception
    assert(background_exception);
    if (background_exception)
        std::rethrow_exception(background_exception);
}

bool ParallelReadBuffer::nextImpl()
{
    if (all_completed)
        return false;

    while (true)
    {
        std::unique_lock lock(mutex);
        next_condvar.wait(
            lock,
            [this]()
            {
                /// Check if no more readers are left or the current reader can be processed
                return emergency_stop || currentWorkerReady();
            });

        bool worker_removed = false;
        /// Remove completed units
        while (!read_workers.empty() && currentWorkerCompleted() && !emergency_stop)
        {
            read_workers.pop_front();
            worker_removed = true;
        }

        if (emergency_stop)
            handleEmergencyStop();

        if (worker_removed)
            addReaders(lock);

        /// All readers processed, stop
        if (read_workers.empty())
        {
            all_completed = true;
            return false;
        }

        auto & front_worker = read_workers.front();
        /// Read data from the first segment of the first reader
        if (!front_worker->segments.empty())
        {
            current_segment = front_worker->nextSegment();
            if (currentWorkerCompleted())
            {
                read_workers.pop_front();
                all_completed = !addReaderToPool(lock) && read_workers.empty();
            }
            break;
        }
    }
    working_buffer = internal_buffer = Buffer(current_segment.data(), current_segment.data() + current_segment.size());
    current_position += working_buffer.size();
    return true;
}

void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
{
    try
    {
        while (!emergency_stop && !read_worker->cancel)
        {
            if (!read_worker->reader->next())
                throw Exception("Failed to read all the data from the reader", ErrorCodes::LOGICAL_ERROR);

            if (emergency_stop || read_worker->cancel)
                break;

            Buffer buffer = read_worker->reader->buffer();
            size_t bytes_to_copy = std::min(buffer.size(), read_worker->bytes_left);
            Segment new_segment(bytes_to_copy, &arena);
            memcpy(new_segment.data(), buffer.begin(), bytes_to_copy);
            read_worker->reader->ignore(bytes_to_copy);
            read_worker->bytes_left -= bytes_to_copy;
            {
                /// New data ready to be read
                std::lock_guard lock(mutex);
                read_worker->segments.emplace_back(std::move(new_segment));
                read_worker->finished = read_worker->bytes_left == 0;
                next_condvar.notify_all();
            }

            if (read_worker->finished)
            {
                break;
            }
        }
    }
    catch (...)
    {
        onBackgroundException();
    }
}

void ParallelReadBuffer::onBackgroundException()
{
    std::lock_guard lock(mutex);
    if (!background_exception)
    {
        background_exception = std::current_exception();
    }
    emergency_stop = true;
    next_condvar.notify_all();
}

void ParallelReadBuffer::finishAndWait()
{
    emergency_stop = true;

    std::unique_lock lock{mutex};
    readers_done.wait(lock, [&] { return active_working_reader == 0; });
}

}
174 src/IO/ParallelReadBuffer.h Normal file
@@ -0,0 +1,174 @@
#pragma once

#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBuffer.h>
#include <IO/SeekableReadBuffer.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/ThreadPool.h>

namespace DB
{

/**
 * Reads from multiple ReadBuffers in parallel.
 * Preserves the order of the readers obtained from ReadBufferFactory.
 *
 * It consumes multiple readers and yields their data in the order the readers were produced.
 * Each working reader saves segments of data into an internal queue.
 *
 * In nextImpl, ParallelReadBuffer takes the first available segment from the first reader in the deque and feeds it to the user.
 * When the first reader finishes reading, it is removed from the worker deque and data from the next reader is consumed.
 *
 * The number of working readers is limited by max_working_readers.
 */
class ParallelReadBuffer : public SeekableReadBufferWithSize
{
private:
    /// Blocks until data appears in the first reader or that reader reports that it is finished.
    /// Finished readers are removed from the queue, and data from the next readers is processed.
    bool nextImpl() override;

    class Segment : private boost::noncopyable
    {
    public:
        Segment(size_t size_, SynchronizedArenaWithFreeLists * arena_) : arena(arena_), m_data(arena->alloc(size_)), m_size(size_) { }

        Segment() = default;

        Segment(Segment && other) noexcept : arena(other.arena)
        {
            std::swap(m_data, other.m_data);
            std::swap(m_size, other.m_size);
        }

        Segment & operator=(Segment && other) noexcept
        {
            arena = other.arena;
            std::swap(m_data, other.m_data);
            std::swap(m_size, other.m_size);
            return *this;
        }

        ~Segment()
        {
            if (m_data)
            {
                arena->free(m_data, m_size);
            }
        }

        auto data() const noexcept { return m_data; }
        auto size() const noexcept { return m_size; }

    private:
        SynchronizedArenaWithFreeLists * arena{nullptr};
        char * m_data{nullptr};
        size_t m_size{0};
    };

public:
    class ReadBufferFactory
    {
    public:
        virtual SeekableReadBufferPtr getReader() = 0;
        virtual ~ReadBufferFactory() = default;
        virtual off_t seek(off_t off, int whence) = 0;
        virtual std::optional<size_t> getTotalSize() = 0;
    };

    using WorkerSetup = std::function<void(ThreadStatus &)>;
    using WorkerCleanup = std::function<void(ThreadStatus &)>;
    explicit ParallelReadBuffer(
        std::unique_ptr<ReadBufferFactory> reader_factory_,
        ThreadPool * pool,
        size_t max_working_readers,
        WorkerSetup worker_setup = {},
        WorkerCleanup worker_cleanup = {});

    ~ParallelReadBuffer() override { finishAndWait(); }

    off_t seek(off_t off, int whence) override;
    std::optional<size_t> getTotalSize() override;
    off_t getPosition() override;

private:
    /// A reader in progress, with the list of segments read so far
    struct ReadWorker
    {
        explicit ReadWorker(SeekableReadBufferPtr reader_) : reader(std::move(reader_)), range(reader->getRemainingReadRange())
        {
            assert(range.right);
            bytes_left = *range.right - range.left + 1;
        }

        Segment nextSegment()
        {
            assert(!segments.empty());
            auto next_segment = std::move(segments.front());
            segments.pop_front();
            range.left += next_segment.size();
            return next_segment;
        }

        SeekableReadBufferPtr reader;
        std::deque<Segment> segments;
        bool finished{false};
        SeekableReadBuffer::Range range;
        size_t bytes_left{0};
        std::atomic_bool cancel{false};
    };

    using ReadWorkerPtr = std::shared_ptr<ReadWorker>;

    /// The first worker in the deque has new data, or has processed everything it was given
    bool currentWorkerReady() const;
    /// The first worker in the deque has processed and flushed all of its data
    bool currentWorkerCompleted() const;

    void handleEmergencyStop();

    void addReaders(std::unique_lock<std::mutex> & buffer_lock);
    bool addReaderToPool(std::unique_lock<std::mutex> & buffer_lock);

    /// Processes read_worker: reads data and saves it into the internal segments queue
    void readerThreadFunction(ReadWorkerPtr read_worker);

    void onBackgroundException();
    void finishAndWait();

    SynchronizedArenaWithFreeLists arena;

    Segment current_segment;

    ThreadPool * pool;
    size_t max_working_readers;
    size_t active_working_reader{0};
    // Triggered when all reader workers are done
    std::condition_variable readers_done;

    std::unique_ptr<ReadBufferFactory> reader_factory;

    WorkerSetup worker_setup;
    WorkerCleanup worker_cleanup;

    /**
     * FIFO queue of readers.
     * Each worker holds the reader itself and its downloaded segments.
     * When a reader has read all of its data, it is removed from the
     * deque and data from the next reader is consumed by the user.
     */
    std::deque<ReadWorkerPtr> read_workers;

    std::mutex mutex;
    /// Triggered when new data is available
    std::condition_variable next_condvar;

    std::exception_ptr background_exception = nullptr;
    std::atomic_bool emergency_stop{false};

    off_t current_position{0};

    bool all_completed{false};
};

}
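Taken together: the factory hands out one SeekableReadBuffer per chunk, up to max_working_readers of them download into arena-backed segments concurrently, and nextImpl() hands the segments to the caller strictly in factory order. A minimal wiring sketch (hypothetical caller code; makeFactory() stands in for any ReadBufferFactory implementation, such as the RangedReadWriteBufferFromHTTPFactory added later in this diff):

// Sketch only: wire a factory into ParallelReadBuffer and drain it.
std::unique_ptr<DB::ParallelReadBuffer::ReadBufferFactory> factory = makeFactory();

DB::ParallelReadBuffer buf(
    std::move(factory),
    &DB::IOThreadPool::get(),   // the pool initialized once at server startup
    /* max_working_readers */ 4);

String data;
DB::readStringUntilEOF(data, buf); // bytes arrive in factory order, regardless of which worker finished first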
@@ -1,32 +1,33 @@
 #pragma once
 
 #include <functional>
-#include <base/types.h>
-#include <base/sleep.h>
 #include <IO/ConnectionTimeouts.h>
 #include <IO/HTTPCommon.h>
+#include <IO/ParallelReadBuffer.h>
 #include <IO/ReadBuffer.h>
 #include <IO/ReadBufferFromIStream.h>
 #include <IO/ReadHelpers.h>
 #include <IO/ReadSettings.h>
+#include <base/logger_useful.h>
+#include <base/sleep.h>
+#include <base/types.h>
 #include <Poco/Any.h>
 #include <Poco/Net/HTTPBasicCredentials.h>
 #include <Poco/Net/HTTPClientSession.h>
 #include <Poco/Net/HTTPRequest.h>
 #include <Poco/Net/HTTPResponse.h>
 #include <Poco/URI.h>
+#include <Poco/URIStreamFactory.h>
 #include <Poco/Version.h>
 #include <Common/DNSResolver.h>
 #include <Common/RemoteHostFilter.h>
 #include <Common/config.h>
 #include <Common/config_version.h>
-#include <base/logger_useful.h>
-#include <Poco/URIStreamFactory.h>
 
 
 namespace ProfileEvents
 {
-extern const Event ReadBufferSeekCancelConnection;
+    extern const Event ReadBufferSeekCancelConnection;
 }
 
 namespace DB
@@ -48,7 +49,7 @@ class UpdatableSessionBase
 {
 protected:
     SessionPtr session;
-    UInt64 redirects { 0 };
+    UInt64 redirects{0};
     Poco::URI initial_uri;
     ConnectionTimeouts timeouts;
     UInt64 max_redirects;
@@ -56,19 +57,12 @@ protected:
 public:
     virtual void buildNewSession(const Poco::URI & uri) = 0;
 
-    explicit UpdatableSessionBase(const Poco::URI uri,
-        const ConnectionTimeouts & timeouts_,
-        UInt64 max_redirects_)
-        : initial_uri { uri }
-        , timeouts { timeouts_ }
-        , max_redirects { max_redirects_ }
+    explicit UpdatableSessionBase(const Poco::URI uri, const ConnectionTimeouts & timeouts_, UInt64 max_redirects_)
+        : initial_uri{uri}, timeouts{timeouts_}, max_redirects{max_redirects_}
     {
     }
 
-    SessionPtr getSession()
-    {
-        return session;
-    }
+    SessionPtr getSession() { return session; }
 
     void updateSession(const Poco::URI & uri)
     {
@@ -99,7 +93,7 @@ namespace detail
         /// HTTP range, including right bound [begin, end].
         struct Range
         {
-            size_t begin = 0;
+            std::optional<size_t> begin;
             std::optional<size_t> end;
         };
 
@@ -144,10 +138,9 @@ namespace detail
             return read_range.begin || read_range.end || retry_with_range_header;
         }
 
-        size_t getOffset() const
-        {
-            return read_range.begin + offset_from_begin_pos;
-        }
+        size_t getRangeBegin() const { return read_range.begin.value_or(0); }
+
+        size_t getOffset() const { return getRangeBegin() + offset_from_begin_pos; }
 
         std::istream * callImpl(Poco::URI uri_, Poco::Net::HTTPResponse & response, const std::string & method_)
         {
@@ -161,7 +154,7 @@ namespace detail
             if (out_stream_callback)
                 request.setChunkedTransferEncoding(true);
 
-            for (auto & http_header_entry: http_header_entries)
+            for (auto & http_header_entry : http_header_entries)
                 request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry));
 
             if (withPartialContent())
@@ -207,26 +200,14 @@ namespace detail
         std::optional<size_t> getTotalSize() override
         {
             if (read_range.end)
-                return *read_range.end - read_range.begin;
+                return *read_range.end - getRangeBegin();
 
             Poco::Net::HTTPResponse response;
             for (size_t i = 0; i < 10; ++i)
             {
                 try
                 {
-                    call(response, Poco::Net::HTTPRequest::HTTP_HEAD);
-
-                    while (isRedirect(response.getStatus()))
-                    {
-                        Poco::URI uri_redirect(response.get("Location"));
-                        if (remote_host_filter)
-                            remote_host_filter->checkURL(uri_redirect);
-
-                        session->updateSession(uri_redirect);
-
-                        istr = callImpl(uri_redirect, response, method);
-                    }
-
+                    callWithRedirects(response, Poco::Net::HTTPRequest::HTTP_HEAD);
                     break;
                 }
                 catch (const Poco::Exception & e)
@@ -236,7 +217,7 @@ namespace detail
             }
 
             if (response.hasContentLength())
-                read_range.end = read_range.begin + response.getContentLength();
+                read_range.end = getRangeBegin() + response.getContentLength();
 
             return read_range.end;
         }
@@ -252,6 +233,21 @@ namespace detail
 
         InitializeError initialization_error = InitializeError::NONE;
 
+    private:
+        void setupExternalBuffer()
+        {
+            /**
+            * use_external_buffer -- means we read into the buffer which
+            * was passed to us from somewhere else. We do not check whether
+            * previously returned buffer was read or not (no hasPendingData() check is needed),
+            * because this branch means we are prefetching data,
+            * each nextImpl() call we can fill a different buffer.
+            */
+            impl->set(internal_buffer.begin(), internal_buffer.size());
+            assert(working_buffer.begin() != nullptr);
+            assert(!internal_buffer.empty());
+        }
+
     public:
         using NextCallback = std::function<void(size_t)>;
         using OutStreamCallback = std::function<void(std::ostream &)>;
@@ -276,7 +272,7 @@ namespace detail
             , session {session_}
             , out_stream_callback {out_stream_callback_}
            , credentials {credentials_}
-            , http_header_entries {http_header_entries_}
+            , http_header_entries {std::move(http_header_entries_)}
             , remote_host_filter {remote_host_filter_}
             , buffer_size {buffer_size_}
             , use_external_buffer {use_external_buffer_}
@@ -287,18 +283,21 @@ namespace detail
         {
             if (settings.http_max_tries <= 0 || settings.http_retry_initial_backoff_ms <= 0
                 || settings.http_retry_initial_backoff_ms >= settings.http_retry_max_backoff_ms)
-                throw Exception(ErrorCodes::BAD_ARGUMENTS,
-                    "Invalid setting for http backoff, "
-                    "must be http_max_tries >= 1 (current is {}) and "
-                    "0 < http_retry_initial_backoff_ms < settings.http_retry_max_backoff_ms (now 0 < {} < {})",
-                    settings.http_max_tries, settings.http_retry_initial_backoff_ms, settings.http_retry_max_backoff_ms);
+                throw Exception(
+                    ErrorCodes::BAD_ARGUMENTS,
+                    "Invalid setting for http backoff, "
+                    "must be http_max_tries >= 1 (current is {}) and "
+                    "0 < http_retry_initial_backoff_ms < settings.http_retry_max_backoff_ms (now 0 < {} < {})",
+                    settings.http_max_tries,
+                    settings.http_retry_initial_backoff_ms,
+                    settings.http_retry_max_backoff_ms);
 
             // Configure User-Agent if it not already set.
             const std::string user_agent = "User-Agent";
-            auto iter = std::find_if(http_header_entries.begin(), http_header_entries.end(), [&user_agent](const HTTPHeaderEntry & entry)
-            {
-                return std::get<0>(entry) == user_agent;
-            });
+            auto iter = std::find_if(
+                http_header_entries.begin(),
+                http_header_entries.end(),
+                [&user_agent](const HTTPHeaderEntry & entry) { return std::get<0>(entry) == user_agent; });
 
             if (iter == http_header_entries.end())
             {
@@ -313,7 +312,36 @@ namespace detail
             }
         }
 
-        void call(Poco::Net::HTTPResponse & response, const String & method_)
+        static bool isRetriableError(const Poco::Net::HTTPResponse::HTTPStatus http_status) noexcept
+        {
+            constexpr std::array non_retriable_errors{
+                Poco::Net::HTTPResponse::HTTPStatus::HTTP_BAD_REQUEST,
+                Poco::Net::HTTPResponse::HTTPStatus::HTTP_UNAUTHORIZED,
+                Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND,
+                Poco::Net::HTTPResponse::HTTPStatus::HTTP_FORBIDDEN,
+                Poco::Net::HTTPResponse::HTTPStatus::HTTP_METHOD_NOT_ALLOWED};
+
+            return std::all_of(
+                non_retriable_errors.begin(), non_retriable_errors.end(), [&](const auto status) { return http_status != status; });
+        }
+
+        void callWithRedirects(Poco::Net::HTTPResponse & response, const String & method_, bool throw_on_all_errors = false)
+        {
+            call(response, method_, throw_on_all_errors);
+
+            while (isRedirect(response.getStatus()))
+            {
+                Poco::URI uri_redirect(response.get("Location"));
+                if (remote_host_filter)
+                    remote_host_filter->checkURL(uri_redirect);
+
+                session->updateSession(uri_redirect);
+
+                istr = callImpl(uri_redirect, response, method);
+            }
+        }
+
+        void call(Poco::Net::HTTPResponse & response, const String & method_, bool throw_on_all_errors = false)
         {
             try
             {
@@ -321,18 +349,18 @@ namespace detail
             }
             catch (...)
             {
+                if (throw_on_all_errors)
+                {
+                    throw;
+                }
+
                 auto http_status = response.getStatus();
 
-                if (http_status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND
-                    && http_skip_not_found_url)
+                if (http_status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND && http_skip_not_found_url)
                 {
                     initialization_error = InitializeError::SKIP_NOT_FOUND_URL;
                 }
-                else if (http_status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_BAD_REQUEST
-                    || http_status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_UNAUTHORIZED
-                    || http_status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND
-                    || http_status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_FORBIDDEN
-                    || http_status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_METHOD_NOT_ALLOWED)
+                else if (!isRetriableError(http_status))
                 {
                     initialization_error = InitializeError::NON_RETRIABLE_ERROR;
                     exception = std::current_exception();
@@ -372,12 +400,14 @@ namespace detail
             if (withPartialContent() && response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT)
             {
                 /// Having `200 OK` instead of `206 Partial Content` is acceptable in case we retried with range.begin == 0.
-                if (read_range.begin)
+                if (read_range.begin && *read_range.begin != 0)
                 {
                     if (!exception)
-                        exception = std::make_exception_ptr(
-                            Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE,
-                                "Cannot read with range: [{}, {}]", read_range.begin, read_range.end ? *read_range.end : '-'));
+                        exception = std::make_exception_ptr(Exception(
+                            ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE,
+                            "Cannot read with range: [{}, {}]",
+                            *read_range.begin,
+                            read_range.end ? *read_range.end : '-'));
 
                     initialization_error = InitializeError::NON_RETRIABLE_ERROR;
                     return;
@@ -386,12 +416,12 @@ namespace detail
             {
                 /// We could have range.begin == 0 and range.end != 0 in case of DiskWeb and failing to read with partial content
                 /// will affect only performance, so a warning is enough.
-                LOG_WARNING(log, "Unable to read with range header: [{}, {}]", read_range.begin, *read_range.end);
+                LOG_WARNING(log, "Unable to read with range header: [{}, {}]", getRangeBegin(), *read_range.end);
             }
         }
 
         if (!offset_from_begin_pos && !read_range.end && response.hasContentLength())
-            read_range.end = read_range.begin + response.getContentLength();
+            read_range.end = getRangeBegin() + response.getContentLength();
 
         try
         {
@@ -399,12 +429,7 @@ namespace detail
 
                 if (use_external_buffer)
                 {
-                    /**
-                    * See comment 30 lines below.
-                    */
-                    impl->set(internal_buffer.begin(), internal_buffer.size());
-                    assert(working_buffer.begin() != nullptr);
-                    assert(!internal_buffer.empty());
+                    setupExternalBuffer();
                 }
             }
             catch (const Poco::Exception & e)
@@ -426,23 +451,17 @@ namespace detail
             if (next_callback)
                 next_callback(count());
 
-            if (read_range.end && getOffset() == read_range.end.value())
+            if (read_range.end && getOffset() > read_range.end.value())
             {
+                assert(getOffset() == read_range.end.value() + 1);
                 return false;
             }
 
             if (impl)
             {
                 if (use_external_buffer)
                 {
-                    /**
-                    * use_external_buffer -- means we read into the buffer which
-                    * was passed to us from somewhere else. We do not check whether
-                    * previously returned buffer was read or not (no hasPendingData() check is needed),
-                    * because this branch means we are prefetching data,
-                    * each nextImpl() call we can fill a different buffer.
-                    */
-                    impl->set(internal_buffer.begin(), internal_buffer.size());
-                    assert(working_buffer.begin() != nullptr);
-                    assert(!internal_buffer.empty());
+                    setupExternalBuffer();
                 }
                 else
                 {
@@ -477,10 +496,7 @@ namespace detail
 
                 if (use_external_buffer)
                 {
-                    /// See comment 40 lines above.
-                    impl->set(internal_buffer.begin(), internal_buffer.size());
-                    assert(working_buffer.begin() != nullptr);
-                    assert(!internal_buffer.empty());
+                    setupExternalBuffer();
                 }
             }
 
@@ -498,13 +514,18 @@ namespace detail
                     if (!can_retry_request)
                         throw;
 
-                    LOG_ERROR(log,
-                        "HTTP request to `{}` failed at try {}/{} with bytes read: {}/{}. "
-                        "Error: {}. (Current backoff wait is {}/{} ms)",
-                        uri.toString(), i + 1, settings.http_max_tries,
-                        getOffset(), read_range.end ? toString(*read_range.end) : "unknown",
-                        e.displayText(),
-                        milliseconds_to_wait, settings.http_retry_max_backoff_ms);
+                    LOG_ERROR(
+                        log,
+                        "HTTP request to `{}` failed at try {}/{} with bytes read: {}/{}. "
+                        "Error: {}. (Current backoff wait is {}/{} ms)",
+                        uri.toString(),
+                        i + 1,
+                        settings.http_max_tries,
+                        getOffset(),
+                        read_range.end ? toString(*read_range.end) : "unknown",
+                        e.displayText(),
+                        milliseconds_to_wait,
+                        settings.http_retry_max_backoff_ms);
 
                     retry_with_range_header = true;
                     exception = std::current_exception();
@@ -529,10 +550,7 @@ namespace detail
             return true;
         }
 
-        off_t getPosition() override
-        {
-            return getOffset() - available();
-        }
+        off_t getPosition() override { return getOffset() - available(); }
 
         off_t seek(off_t offset_, int whence) override
         {
@@ -540,12 +558,11 @@ namespace detail
                 throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
 
             if (offset_ < 0)
-                throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
+                throw Exception(
+                    "Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
 
             off_t current_offset = getOffset();
-            if (!working_buffer.empty()
-                && size_t(offset_) >= current_offset - working_buffer.size()
-                && offset_ < current_offset)
+            if (!working_buffer.empty() && size_t(offset_) >= current_offset - working_buffer.size() && offset_ < current_offset)
             {
                 pos = working_buffer.end() - (current_offset - offset_);
                 assert(pos >= working_buffer.begin());
@@ -567,7 +584,6 @@ namespace detail
 
             if (impl)
             {
-
                 ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
                 impl.reset();
             }
@@ -580,6 +596,8 @@ namespace detail
             return offset_;
         }
 
+        SeekableReadBuffer::Range getRemainingReadRange() const override { return {getOffset(), read_range.end}; }
+
         std::string getResponseCookie(const std::string & name, const std::string & def) const
         {
             for (const auto & cookie : cookies)
@@ -599,10 +617,7 @@ namespace detail
                 next_callback(count());
         }
 
-        const std::string & getCompressionMethod() const
-        {
-            return content_encoding;
-        }
+        const std::string & getCompressionMethod() const { return content_encoding; }
     };
 }
 
@@ -611,19 +626,50 @@ class UpdatableSession : public UpdatableSessionBase<HTTPSessionPtr>
     using Parent = UpdatableSessionBase<HTTPSessionPtr>;
 
 public:
-    UpdatableSession(
-        const Poco::URI uri,
-        const ConnectionTimeouts & timeouts_,
-        const UInt64 max_redirects_)
+    UpdatableSession(const Poco::URI uri, const ConnectionTimeouts & timeouts_, const UInt64 max_redirects_)
         : Parent(uri, timeouts_, max_redirects_)
     {
        session = makeHTTPSession(initial_uri, timeouts);
    }

-    void buildNewSession(const Poco::URI & uri) override
-    {
-        session = makeHTTPSession(uri, timeouts);
-    }
+    void buildNewSession(const Poco::URI & uri) override { session = makeHTTPSession(uri, timeouts); }
 };
 
+class RangeGenerator
+{
+public:
+    explicit RangeGenerator(size_t total_size_, size_t range_step_, size_t range_start = 0)
+        : from(range_start), range_step(range_step_), total_size(total_size_)
+    {
+    }
+
+    size_t totalRanges() const { return static_cast<size_t>(round(static_cast<float>(total_size - from) / range_step)); }
+
+    using Range = std::pair<size_t, size_t>;
+
+    // return upper exclusive range of values, i.e. [from_range, to_range>
+    std::optional<Range> nextRange()
+    {
+        if (from >= total_size)
+        {
+            return std::nullopt;
+        }
+
+        auto to = from + range_step;
+        if (to >= total_size)
+        {
+            to = total_size;
+        }
+
+        Range range{from, to};
+        from = to;
+        return std::move(range);
+    }
+
+private:
+    size_t from;
+    size_t range_step;
+    size_t total_size;
+};
 
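nextRange() above yields half-open ranges and clamps the last one to total_size: with total_size = 9 and range_step = 4 it produces [0, 4), [4, 8), [8, 9). A tiny standalone illustration (assumes <iostream>):

DB::RangeGenerator gen(/* total_size */ 9, /* range_step */ 4);
while (auto range = gen.nextRange())
    std::cout << '[' << range->first << ", " << range->second << ") ";
// prints: [0, 4) [4, 8) [8, 9)

Note that totalRanges() rounds (9 - 0) / 4 = 2.25 to 2, so it can undercount by one whenever the final range is a partial step; callers that need an exact count should iterate instead.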
 class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>>
@@ -631,7 +677,7 @@ class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::
     using Parent = detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>>;
 
 public:
-    ReadWriteBufferFromHTTP(
+    ReadWriteBufferFromHTTP(
         Poco::URI uri_,
         const std::string & method_,
         OutStreamCallback out_stream_callback_,
@@ -646,14 +692,117 @@ public:
         bool delay_initialization_ = true,
         bool use_external_buffer_ = false,
         bool skip_not_found_url_ = false)
-        : Parent(std::make_shared<UpdatableSession>(uri_, timeouts, max_redirects),
-            uri_, credentials_, method_, out_stream_callback_, buffer_size_,
-            settings_, http_header_entries_, read_range_, remote_host_filter_,
-            delay_initialization_, use_external_buffer_, skip_not_found_url_)
+        : Parent(
+            std::make_shared<UpdatableSession>(uri_, timeouts, max_redirects),
+            uri_,
+            credentials_,
+            method_,
+            out_stream_callback_,
+            buffer_size_,
+            settings_,
+            http_header_entries_,
+            read_range_,
+            remote_host_filter_,
+            delay_initialization_,
+            use_external_buffer_,
+            skip_not_found_url_)
     {
     }
 };
 
+class RangedReadWriteBufferFromHTTPFactory : public ParallelReadBuffer::ReadBufferFactory
+{
+    using OutStreamCallback = ReadWriteBufferFromHTTP::OutStreamCallback;
+
+public:
+    RangedReadWriteBufferFromHTTPFactory(
+        size_t total_object_size_,
+        size_t range_step_,
+        Poco::URI uri_,
+        std::string method_,
+        OutStreamCallback out_stream_callback_,
+        ConnectionTimeouts timeouts_,
+        const Poco::Net::HTTPBasicCredentials & credentials_,
+        UInt64 max_redirects_ = 0,
+        size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
+        ReadSettings settings_ = {},
+        ReadWriteBufferFromHTTP::HTTPHeaderEntries http_header_entries_ = {},
+        const RemoteHostFilter * remote_host_filter_ = nullptr,
+        bool delay_initialization_ = true,
+        bool use_external_buffer_ = false,
+        bool skip_not_found_url_ = false)
+        : range_generator(total_object_size_, range_step_)
+        , total_object_size(total_object_size_)
+        , range_step(range_step_)
+        , uri(uri_)
+        , method(std::move(method_))
+        , out_stream_callback(out_stream_callback_)
+        , timeouts(std::move(timeouts_))
+        , credentials(credentials_)
+        , max_redirects(max_redirects_)
+        , buffer_size(buffer_size_)
+        , settings(std::move(settings_))
+        , http_header_entries(std::move(http_header_entries_))
+        , remote_host_filter(remote_host_filter_)
+        , delay_initialization(delay_initialization_)
+        , use_external_buffer(use_external_buffer_)
+        , skip_not_found_url(skip_not_found_url_)
+    {
+    }
+
+    SeekableReadBufferPtr getReader() override
+    {
+        const auto next_range = range_generator.nextRange();
+        if (!next_range)
+        {
+            return nullptr;
+        }
+
+        return std::make_shared<ReadWriteBufferFromHTTP>(
+            uri,
+            method,
+            out_stream_callback,
+            timeouts,
+            credentials,
+            max_redirects,
+            buffer_size,
+            settings,
+            http_header_entries,
+            // HTTP Range has inclusive bounds, i.e. [from, to]
+            ReadWriteBufferFromHTTP::Range{next_range->first, next_range->second - 1},
+            remote_host_filter,
+            delay_initialization,
+            use_external_buffer,
+            skip_not_found_url);
+    }
+
+    off_t seek(off_t off, [[maybe_unused]] int whence) override
+    {
+        range_generator = RangeGenerator{total_object_size, range_step, static_cast<size_t>(off)};
+        return off;
+    }
+
+    std::optional<size_t> getTotalSize() override { return total_object_size; }
+
+private:
+    RangeGenerator range_generator;
+    size_t total_object_size;
+    size_t range_step;
+    Poco::URI uri;
+    std::string method;
+    OutStreamCallback out_stream_callback;
+    ConnectionTimeouts timeouts;
+    const Poco::Net::HTTPBasicCredentials & credentials;
+    UInt64 max_redirects;
+    size_t buffer_size;
+    ReadSettings settings;
+    ReadWriteBufferFromHTTP::HTTPHeaderEntries http_header_entries;
+    const RemoteHostFilter * remote_host_filter;
+    bool delay_initialization;
+    bool use_external_buffer;
+    bool skip_not_found_url;
+};
+
 class UpdatablePooledSession : public UpdatableSessionBase<PooledHTTPSessionPtr>
 {
     using Parent = UpdatableSessionBase<PooledHTTPSessionPtr>;
@@ -662,20 +811,14 @@ private:
     size_t per_endpoint_pool_size;
 
 public:
-    explicit UpdatablePooledSession(const Poco::URI uri,
-        const ConnectionTimeouts & timeouts_,
-        const UInt64 max_redirects_,
-        size_t per_endpoint_pool_size_)
-        : Parent(uri, timeouts_, max_redirects_)
-        , per_endpoint_pool_size { per_endpoint_pool_size_ }
+    explicit UpdatablePooledSession(
+        const Poco::URI uri, const ConnectionTimeouts & timeouts_, const UInt64 max_redirects_, size_t per_endpoint_pool_size_)
+        : Parent(uri, timeouts_, max_redirects_), per_endpoint_pool_size{per_endpoint_pool_size_}
     {
         session = makePooledHTTPSession(initial_uri, timeouts, per_endpoint_pool_size);
     }
 
-    void buildNewSession(const Poco::URI & uri) override
-    {
-        session = makePooledHTTPSession(uri, timeouts, per_endpoint_pool_size);
-    }
+    void buildNewSession(const Poco::URI & uri) override { session = makePooledHTTPSession(uri, timeouts, per_endpoint_pool_size); }
 };
 
 class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatablePooledSession>>
@@ -683,7 +826,8 @@ class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase
     using Parent = detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatablePooledSession>>;
 
 public:
-    explicit PooledReadWriteBufferFromHTTP(Poco::URI uri_,
+    explicit PooledReadWriteBufferFromHTTP(
+        Poco::URI uri_,
         const std::string & method_ = {},
         OutStreamCallback out_stream_callback_ = {},
         const ConnectionTimeouts & timeouts_ = {},
@@ -691,12 +835,13 @@ public:
         size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
         const UInt64 max_redirects = 0,
         size_t max_connections_per_endpoint = DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT)
-        : Parent(std::make_shared<UpdatablePooledSession>(uri_, timeouts_, max_redirects, max_connections_per_endpoint),
-            uri_,
-            credentials_,
-            method_,
-            out_stream_callback_,
-            buffer_size_)
+        : Parent(
+            std::make_shared<UpdatablePooledSession>(uri_, timeouts_, max_redirects, max_connections_per_endpoint),
+            uri_,
+            credentials_,
+            method_,
+            out_stream_callback_,
+            buffer_size_)
     {
     }
 };
 
@@ -359,6 +359,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
             table_lock.reset();
             table_id = StorageID::createEmpty();
             metadata_snapshot = nullptr;
+            storage_snapshot = nullptr;
         }
     }
 
@ -3,30 +3,35 @@
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTInsertQuery.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
#include <IO/ConnectionTimeoutsContext.h>
|
||||
#include <IO/IOThreadPool.h>
|
||||
#include <IO/ParallelReadBuffer.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteBufferFromHTTP.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
#include <IO/ConnectionTimeoutsContext.h>
|
||||
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Formats/ReadSchemaUtils.h>
|
||||
#include <Processors/Formats/IInputFormat.h>
|
||||
#include <Processors/Formats/IOutputFormat.h>
|
||||
|
||||
#include <Common/parseRemoteDescription.h>
|
||||
#include <Processors/Transforms/AddingDefaultsTransform.h>
|
||||
#include <Storages/PartitionedSink.h>
|
||||
#include "Common/ThreadStatus.h"
|
||||
#include <Common/parseRemoteDescription.h>
|
||||
#include "IO/HTTPCommon.h"
|
||||
#include "IO/ReadWriteBufferFromHTTP.h"
|
||||
|
||||
#include <Poco/Net/HTTPRequest.h>
|
||||
#include <algorithm>
|
||||
#include <Processors/Executors/PullingPipelineExecutor.h>
|
||||
#include <Processors/Sources/SourceWithProgress.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
#include <Processors/Executors/PullingPipelineExecutor.h>
|
||||
#include <base/logger_useful.h>
|
||||
#include <algorithm>
|
||||
#include <Poco/Net/HTTPRequest.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -43,8 +48,7 @@ namespace ErrorCodes
|
||||
|
||||
static bool urlWithGlobs(const String & uri)
|
||||
{
|
||||
return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos)
|
||||
|| uri.find('|') != std::string::npos;
|
||||
return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) || uri.find('|') != std::string::npos;
|
||||
}
|
||||
|
||||
|
||||
@ -88,8 +92,7 @@ IStorageURLBase::IStorageURLBase(
|
||||
|
||||
namespace
|
||||
{
|
||||
ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders(
|
||||
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
|
||||
ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders(const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
|
||||
{
|
||||
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers(headers_.begin(), headers_.end());
|
||||
// Propagate OpenTelemetry trace context, if any, downstream.
|
||||
@ -98,13 +101,11 @@ namespace
|
||||
const auto & thread_trace_context = CurrentThread::get().thread_trace_context;
|
||||
if (thread_trace_context.trace_id != UUID())
|
||||
{
|
||||
headers.emplace_back("traceparent",
|
||||
thread_trace_context.composeTraceparentHeader());
|
||||
headers.emplace_back("traceparent", thread_trace_context.composeTraceparentHeader());
|
||||
|
||||
if (!thread_trace_context.tracestate.empty())
|
||||
{
|
||||
headers.emplace_back("tracestate",
|
||||
thread_trace_context.tracestate);
|
||||
headers.emplace_back("tracestate", thread_trace_context.tracestate);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -114,8 +115,7 @@ namespace
|
||||
|
||||
class StorageURLSource : public SourceWithProgress
|
||||
{
|
||||
|
||||
using URIParams = std::vector<std::pair<String, String>>;
|
||||
using URIParams = std::vector<std::pair<String, String>>;
|
||||
|
||||
public:
|
||||
struct URIInfo
|
||||
@ -160,11 +160,11 @@ namespace
|
||||
UInt64 max_block_size,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
const String & compression_method,
|
||||
size_t download_threads,
|
||||
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
|
||||
const URIParams & params = {},
|
||||
bool glob_url = false)
|
||||
: SourceWithProgress(sample_block), name(std::move(name_))
|
||||
, uri_info(uri_info_)
|
||||
: SourceWithProgress(sample_block), name(std::move(name_)), uri_info(uri_info_)
|
||||
{
|
||||
auto headers = getHeaders(headers_);
|
||||
|
||||
@ -176,33 +176,40 @@ namespace
|
||||
|
||||
auto first_option = uri_options.begin();
|
||||
read_buf = getFirstAvailableURLReadBuffer(
|
||||
first_option, uri_options.end(), context, params, http_method,
|
||||
callback, timeouts, compression_method, credentials, headers, glob_url, uri_options.size() == 1);
|
||||
first_option,
|
||||
uri_options.end(),
|
||||
context,
|
||||
params,
|
||||
http_method,
|
||||
callback,
|
||||
timeouts,
|
||||
compression_method,
|
||||
credentials,
|
||||
headers,
|
||||
glob_url,
|
||||
uri_options.size() == 1,
|
||||
download_threads);
|
||||
|
||||
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings);
|
||||
auto input_format
|
||||
= FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings);
|
||||
QueryPipelineBuilder builder;
|
||||
builder.init(Pipe(input_format));
|
||||
|
||||
builder.addSimpleTransform([&](const Block & cur_header)
|
||||
{
|
||||
return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *input_format, context);
|
||||
});
|
||||
builder.addSimpleTransform(
|
||||
[&](const Block & cur_header)
|
||||
{ return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *input_format, context); });
|
||||
|
||||
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
|
||||
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
|
||||
};
|
||||
}
|
||||
|
||||
String getName() const override
|
||||
{
|
||||
return name;
|
||||
}
|
||||
String getName() const override { return name; }
|
||||
|
||||
Chunk generate() override
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
|
||||
if (!reader)
|
||||
{
|
||||
auto current_uri_pos = uri_info->next_uri_to_read.fetch_add(1);
|
||||
@ -239,7 +246,8 @@ namespace
|
||||
Poco::Net::HTTPBasicCredentials & credentials,
|
||||
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
|
||||
bool glob_url,
|
||||
bool delay_initialization)
|
||||
bool delay_initialization,
|
||||
size_t download_threads)
|
||||
{
|
||||
String first_exception_message;
|
||||
ReadSettings read_settings = context->getReadSettings();
|
||||
@ -255,8 +263,137 @@ namespace
|
||||
|
||||
setCredentials(credentials, request_uri);
|
||||
|
||||
const auto settings = context->getSettings();
|
||||
try
|
||||
{
|
||||
if (download_threads > 1)
|
||||
{
|
||||
try
|
||||
{
|
||||
ReadWriteBufferFromHTTP buffer(
|
||||
request_uri,
|
||||
Poco::Net::HTTPRequest::HTTP_HEAD,
|
||||
callback,
|
||||
timeouts,
|
||||
credentials,
|
||||
settings.max_http_get_redirects,
|
||||
DBMS_DEFAULT_BUFFER_SIZE,
|
||||
read_settings,
|
||||
headers,
|
||||
ReadWriteBufferFromHTTP::Range{0, std::nullopt},
|
||||
&context->getRemoteHostFilter(),
|
||||
true,
|
||||
/* use_external_buffer */ false,
|
||||
/* skip_url_not_found_error */ skip_url_not_found_error);
|
||||
|
||||
Poco::Net::HTTPResponse res;
|
||||
|
||||
for (size_t i = 0; i < settings.http_max_tries; ++i)
|
||||
{
|
||||
try
|
||||
{
|
||||
buffer.callWithRedirects(res, Poco::Net::HTTPRequest::HTTP_HEAD, true);
|
||||
break;
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
LOG_TRACE(
|
||||
&Poco::Logger::get("StorageURLSource"),
|
||||
"HTTP HEAD request to `{}` failed at try {}/{}. "
|
||||
"Error: {}.",
|
||||
request_uri.toString(),
|
||||
i + 1,
|
||||
settings.http_max_tries,
|
||||
e.displayText());
|
||||
if (!ReadWriteBufferFromHTTP::isRetriableError(res.getStatus()))
|
||||
{
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// to check if Range header is supported, we need to send a request with it set
|
||||
const bool supports_ranges = (res.has("Accept-Ranges") && res.get("Accept-Ranges") == "bytes")
|
||||
|| (res.has("Content-Range") && res.get("Content-Range").starts_with("bytes"));
|
||||
LOG_TRACE(
|
||||
&Poco::Logger::get("StorageURLSource"),
|
||||
fmt::runtime(supports_ranges ? "HTTP Range is supported" : "HTTP Range is not supported"));
|
||||
|
||||
|
||||
if (supports_ranges && res.getStatus() == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT
|
||||
&& res.hasContentLength())
|
||||
{
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
"Using ParallelReadBuffer with {} workers with chunks of {} bytes",
download_threads,
settings.max_download_buffer_size);

auto read_buffer_factory = std::make_unique<RangedReadWriteBufferFromHTTPFactory>(
res.getContentLength(),
settings.max_download_buffer_size,
request_uri,
http_method,
callback,
timeouts,
credentials,
settings.max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
read_settings,
headers,
&context->getRemoteHostFilter(),
delay_initialization,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);

ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()
? CurrentThread::get().getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();

ContextPtr query_context
= CurrentThread::isInitialized() ? CurrentThread::get().getQueryContext() : nullptr;

auto worker_cleanup = [has_running_group = running_group == nullptr](ThreadStatus & thread_status)
{
if (has_running_group)
thread_status.detachQuery(false);
};

auto worker_setup = [query_context = std::move(query_context),
running_group = std::move(running_group)](ThreadStatus & thread_status)
{
/// Save query context if any, because cache implementation needs it.
if (query_context)
thread_status.attachQueryContext(query_context);

/// To be able to pass ProfileEvents.
if (running_group)
thread_status.attachQuery(running_group);
};


return wrapReadBufferWithCompressionMethod(
std::make_unique<ParallelReadBuffer>(
std::move(read_buffer_factory),
&IOThreadPool::get(),
download_threads,
std::move(worker_setup),
std::move(worker_cleanup)),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}
}
catch (const Poco::Exception & e)
{
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
"Failed to setup ParallelReadBuffer because of an exception:\n{}.\nFalling back to the single-threaded "
"buffer",
e.displayText());
}
}

LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "Using single-threaded read buffer");

return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
@ -264,15 +401,15 @@ namespace
callback,
timeouts,
credentials,
context->getSettingsRef().max_http_get_redirects,
settings.max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
read_settings,
headers,
ReadWriteBufferFromHTTP::Range{},
&context->getRemoteHostFilter(),
delay_initialization,
/* use_external_buffer */false,
/* skip_url_not_found_error */skip_url_not_found_error),
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}
catch (...)
@ -323,10 +460,10 @@ StorageURLSink::StorageURLSink(
std::string content_encoding = toContentEncodingName(compression_method);

write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHTTP>(Poco::URI(uri), http_method, content_type, content_encoding, timeouts),
compression_method, 3);
writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block,
context, {} /* write callback */, format_settings);
std::make_unique<WriteBufferFromHTTP>(Poco::URI(uri), http_method, content_type, content_encoding, timeouts),
compression_method,
3);
writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block, context, {} /* write callback */, format_settings);
}


@ -355,15 +492,15 @@ public:
const ConnectionTimeouts & timeouts_,
const CompressionMethod compression_method_,
const String & http_method_)
: PartitionedSink(partition_by, context_, sample_block_)
, uri(uri_)
, format(format_)
, format_settings(format_settings_)
, sample_block(sample_block_)
, context(context_)
, timeouts(timeouts_)
, compression_method(compression_method_)
, http_method(http_method_)
: PartitionedSink(partition_by, context_, sample_block_)
, uri(uri_)
, format(format_)
, format_settings(format_settings_)
, sample_block(sample_block_)
, context(context_)
, timeouts(timeouts_)
, compression_method(compression_method_)
, http_method(http_method_)
{
}

@ -371,8 +508,8 @@ public:
{
auto partition_path = PartitionedSink::replaceWildcards(uri, partition_id);
context->getRemoteHostFilter().checkURL(Poco::URI(partition_path));
return std::make_shared<StorageURLSink>(partition_path, format,
format_settings, sample_block, context, timeouts, compression_method, http_method);
return std::make_shared<StorageURLSink>(
partition_path, format, format_settings, sample_block, context, timeouts, compression_method, http_method);
}

private:
@ -462,7 +599,8 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
credentials,
headers,
false,
false);
false,
context->getSettingsRef().max_download_threads);
};

try
@ -479,7 +617,10 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(

} while (++option < urls_to_check.end());

throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from urls failed. Errors:\n{}", exception_messages);
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"All attempts to extract table structure from urls failed. Errors:\n{}",
exception_messages);
}

bool IStorageURLBase::isColumnOriented() const
@ -512,6 +653,8 @@ Pipe IStorageURLBase::read(
block_for_format = storage_snapshot->metadata->getSampleBlock();
}

size_t max_download_threads = local_context->getSettingsRef().max_download_threads;

if (urlWithGlobs(uri))
{
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
@ -528,14 +671,13 @@ Pipe IStorageURLBase::read(
Pipes pipes;
pipes.reserve(num_streams);

size_t download_threads = num_streams >= max_download_threads ? 1 : (max_download_threads / num_streams);
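/// Note (editor's comment), for illustration: with max_download_threads = 4 (the default) and
/// num_streams = 2, each source gets 4 / 2 = 2 download threads; once num_streams >= max_download_threads,
/// each source falls back to a single download thread.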
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageURLSource>(
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
@ -544,7 +686,11 @@ Pipe IStorageURLBase::read(
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params, /* glob_url */true));
compression_method,
download_threads,
headers,
params,
/* glob_url */ true));
}
return Pipe::unitePipes(std::move(pipes));
}
@ -555,9 +701,7 @@ Pipe IStorageURLBase::read(
return Pipe(std::make_shared<StorageURLSource>(
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
@ -566,7 +710,10 @@ Pipe IStorageURLBase::read(
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
compression_method,
max_download_threads,
headers,
params));
}
}

@ -598,12 +745,10 @@ Pipe StorageURLWithFailover::read(

auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
uri_info->uri_list_to_read.emplace_back(uri_options);
auto pipe = Pipe(std::make_shared<StorageURLSource>(
auto pipe = Pipe(std::make_shared<StorageURLSource>(
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
@ -612,7 +757,10 @@ Pipe StorageURLWithFailover::read(
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
compression_method,
local_context->getSettingsRef().max_download_threads,
headers,
params));
std::shuffle(uri_options.begin(), uri_options.end(), thread_local_rng);
return pipe;
}
@ -632,17 +780,26 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
{
return std::make_shared<PartitionedStorageURLSink>(
partition_by_ast,
uri, format_name,
format_settings, metadata_snapshot->getSampleBlock(), context,
uri,
format_name,
format_settings,
metadata_snapshot->getSampleBlock(),
context,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri, compression_method), http_method);
chooseCompressionMethod(uri, compression_method),
http_method);
}
else
{
return std::make_shared<StorageURLSink>(uri, format_name,
format_settings, metadata_snapshot->getSampleBlock(), context,
return std::make_shared<StorageURLSink>(
uri,
format_name,
format_settings,
metadata_snapshot->getSampleBlock(),
context,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri, compression_method), http_method);
chooseCompressionMethod(uri, compression_method),
http_method);
}
}

@ -659,8 +816,19 @@ StorageURL::StorageURL(
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_,
const String & http_method_,
ASTPtr partition_by_)
: IStorageURLBase(uri_, context_, table_id_, format_name_, format_settings_,
columns_, constraints_, comment, compression_method_, headers_, http_method_, partition_by_)
: IStorageURLBase(
uri_,
context_,
table_id_,
format_name_,
format_settings_,
columns_,
constraints_,
comment,
compression_method_,
headers_,
http_method_,
partition_by_)
{
context_->getRemoteHostFilter().checkURL(Poco::URI(uri));
}
@ -711,8 +879,7 @@ FormatSettings StorageURL::getFormatSettingsFromArgs(const StorageFactory::Argum
// Apply changes from SETTINGS clause, with validation.
user_format_settings.applyChanges(args.storage_def->settings->changes);

format_settings = getFormatSettings(args.getContext(),
user_format_settings);
format_settings = getFormatSettings(args.getContext(), user_format_settings);
}
else
{
@ -731,12 +898,12 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);

if (!configuration.http_method.empty()
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_POST
if (!configuration.http_method.empty() && configuration.http_method != Poco::Net::HTTPRequest::HTTP_POST
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_PUT)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Http method can be POST or PUT (current: {}). For insert default is POST, for select GET",
configuration.http_method);
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Http method can be POST or PUT (current: {}). For insert default is POST, for select GET",
configuration.http_method);

if (!storage_specific_args.empty())
{
@ -754,7 +921,8 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
{
if (args.empty() || args.size() > 3)
throw Exception(
"Storage URL requires 1, 2 or 3 arguments: url, name of used format (taken from file extension by default) and optional compression method.",
"Storage URL requires 1, 2 or 3 arguments: url, name of used format (taken from file extension by default) and optional "
"compression method.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

for (auto & arg : args)
@ -776,43 +944,45 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex

void registerStorageURL(StorageFactory & factory)
{
factory.registerStorage("URL", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext());
auto format_settings = StorageURL::getFormatSettingsFromArgs(args);

ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
factory.registerStorage(
"URL",
[](const StorageFactory::Arguments & args)
{
auto value_literal = value.safeGet<String>();
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal));
}
ASTs & engine_args = args.engine_args;
auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext());
auto format_settings = StorageURL::getFormatSettingsFromArgs(args);

ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
{
auto value_literal = value.safeGet<String>();
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal));
}

return StorageURL::create(
configuration.url,
args.table_id,
configuration.format,
format_settings,
args.columns,
args.constraints,
args.comment,
args.getContext(),
configuration.compression_method,
headers,
configuration.http_method,
partition_by);
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::URL,
});
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();

return StorageURL::create(
configuration.url,
args.table_id,
configuration.format,
format_settings,
args.columns,
args.constraints,
args.comment,
args.getContext(),
configuration.compression_method,
headers,
configuration.http_method,
partition_by);
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::URL,
});
}
}
@ -200,10 +200,8 @@ if __name__ == "__main__":
head = requests.head(build_url)
counter += 1
if counter >= 180:
post_commit_status(
gh, pr_info.sha, CHECK_NAME, "Cannot fetch build to run", "error", ""
)
raise Exception("Cannot fetch build")
logging.warning("Cannot fetch build in 30 minutes, exiting")
sys.exit(0)

with SSHKey(key_value=get_parameter_from_ssm("jepsen_ssh_key") + "\n"):
ssh_auth_sock = os.environ["SSH_AUTH_SOCK"]
@ -162,4 +162,4 @@ def test_url_reconnect(started_cluster):
thread.join()

assert int(result) == 6581218782194912115
assert node1.contains_in_log("Error: Timeout: connect timed out")
assert node1.contains_in_log("Timeout: connect timed out")
@ -120,18 +120,14 @@ class CSVHTTPServer(BaseHTTPRequestHandler):
class HTTPServerV6(HTTPServer):
address_family = socket.AF_INET6

def start_server(requests_amount):
def start_server():
if IS_IPV6:
httpd = HTTPServerV6(HTTP_SERVER_ADDRESS, CSVHTTPServer)
else:
httpd = HTTPServer(HTTP_SERVER_ADDRESS, CSVHTTPServer)

def real_func():
for i in range(requests_amount):
httpd.handle_request()

t = threading.Thread(target=real_func)
return t
t = threading.Thread(target=httpd.serve_forever)
return t, httpd

# test section

@ -201,7 +197,7 @@ def main():
'select double, count(*) from {tbl} group by double': "7.7\t2\n9.9\t10"
}

t = start_server(len(select_only_requests) * 2 + (len(insert_requests) + len(select_requests)) * 2)
t, httpd = start_server()
t.start()
# test table with url engine
test_select(table_name="test_table_select", requests=list(select_only_requests.keys()), answers=list(select_only_requests.values()), test_data=test_data)
@ -211,6 +207,8 @@ def main():
test_insert(table_name="test_table_insert", requests_insert=insert_requests, requests_select=list(select_requests.keys()), answers=list(select_requests.values()))
#test insert into table function url
test_insert(requests_insert=insert_requests, requests_select=list(select_requests.keys()), answers=list(select_requests.values()))

httpd.shutdown()
t.join()
print("PASSED")
@ -11,9 +11,11 @@
===native===
{"query":"select * from url('http:\/\/127.0.0.2:8123\/?query=select%201%20format%20Null', CSV, 'a int')","status":"QueryFinish","tracestate":"another custom state","sorted_by_start_time":1}
{"query":"select 1 format Null\n","status":"QueryFinish","tracestate":"another custom state","sorted_by_start_time":1}
{"query":"select 1 format Null\n","status":"QueryFinish","tracestate":"another custom state","sorted_by_start_time":1}
{"query":"select 1 format Null\n","query_status":"QueryFinish","tracestate":"another custom state","sorted_by_finish_time":1}
{"query":"select 1 format Null\n","query_status":"QueryFinish","tracestate":"another custom state","sorted_by_finish_time":1}
{"query":"select * from url('http:\/\/127.0.0.2:8123\/?query=select%201%20format%20Null', CSV, 'a int')","query_status":"QueryFinish","tracestate":"another custom state","sorted_by_finish_time":1}
{"total spans":"2","unique spans":"2","unique non-zero parent spans":"2"}
{"total spans":"3","unique spans":"3","unique non-zero parent spans":"2"}
{"initial query spans with proper parent":"1"}
{"unique non-empty tracestate values":"1"}
===sampled===
@ -121,18 +121,14 @@ class CSVHTTPServer(BaseHTTPRequestHandler):
class HTTPServerV6(HTTPServer):
address_family = socket.AF_INET6

def start_server(requests_amount):
def start_server():
if IS_IPV6:
httpd = HTTPServerV6(HTTP_SERVER_ADDRESS, CSVHTTPServer)
else:
httpd = HTTPServer(HTTP_SERVER_ADDRESS, CSVHTTPServer)

def real_func():
for i in range(requests_amount):
httpd.handle_request()

t = threading.Thread(target=real_func)
return t
t = threading.Thread(target=httpd.serve_forever)
return t, httpd

# test section

@ -217,9 +213,10 @@ def main():
query : 'hello, world',
}

t = start_server(len(list(select_requests_url_auth.keys())))
t, httpd = start_server()
t.start()
test_select(requests=list(select_requests_url_auth.keys()), answers=list(select_requests_url_auth.values()), test_data=test_data)
httpd.shutdown()
t.join()
print("PASSED")
@ -124,7 +124,8 @@ def test_select():
check_answers(query, EXPECTED_ANSWER)

def main():
t = start_server(1)
# HEAD + GET
t = start_server(2)
t.start()
test_select()
t.join()
262
tests/queries/0_stateless/02233_HTTP_ranged.python
Normal file
@ -0,0 +1,262 @@
#!/usr/bin/env python3

from http.server import BaseHTTPRequestHandler, HTTPServer
import socket
import sys
import re
import threading
import os
import traceback
import urllib.request
import subprocess


def is_ipv6(host):
    try:
        socket.inet_aton(host)
        return False
    except:
        return True


def get_local_port(host, ipv6):
    if ipv6:
        family = socket.AF_INET6
    else:
        family = socket.AF_INET

    with socket.socket(family) as fd:
        fd.bind((host, 0))
        return fd.getsockname()[1]


CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "localhost")
CLICKHOUSE_PORT_HTTP = os.environ.get("CLICKHOUSE_PORT_HTTP", "8123")

# Server returns this JSON response.
SERVER_JSON_RESPONSE = """{
	"login": "ClickHouse",
	"id": 54801242,
	"name": "ClickHouse",
	"company": null
}"""

PAYLOAD_LEN = len(SERVER_JSON_RESPONSE)

EXPECTED_ANSWER = """{\\n\\t"login": "ClickHouse",\\n\\t"id": 54801242,\\n\\t"name": "ClickHouse",\\n\\t"company": null\\n}"""
#####################################################################################
# This test starts an HTTP server and serves data to clickhouse url-engine based table.
# The objective of this test is to check that ClickHouse downloads the data through
# HTTP byte-range requests when the server supports them.
# In order for it to work ip+port of http server (given below) should be
# accessible from clickhouse server.
#####################################################################################

# IP-address of this host accessible from the outside world. Get the first one
HTTP_SERVER_HOST = (
    subprocess.check_output(["hostname", "-i"]).decode("utf-8").strip().split()[0]
)
IS_IPV6 = is_ipv6(HTTP_SERVER_HOST)
HTTP_SERVER_PORT = get_local_port(HTTP_SERVER_HOST, IS_IPV6)

# IP address and port of the HTTP server started from this script.
HTTP_SERVER_ADDRESS = (HTTP_SERVER_HOST, HTTP_SERVER_PORT)
if IS_IPV6:
    HTTP_SERVER_URL_STR = (
        "http://"
        + f"[{str(HTTP_SERVER_ADDRESS[0])}]:{str(HTTP_SERVER_ADDRESS[1])}"
        + "/"
    )
else:
    HTTP_SERVER_URL_STR = (
        "http://" + f"{str(HTTP_SERVER_ADDRESS[0])}:{str(HTTP_SERVER_ADDRESS[1])}" + "/"
    )


def get_ch_answer(query):
    host = CLICKHOUSE_HOST
    if IS_IPV6:
        host = f"[{host}]"

    url = os.environ.get(
        "CLICKHOUSE_URL",
        "http://{host}:{port}".format(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT_HTTP),
    )
    return urllib.request.urlopen(url, data=query.encode()).read().decode()


def check_answers(query, answer):
    ch_answer = get_ch_answer(query)
    if ch_answer.strip() != answer.strip():
        print("FAIL on query:", query, file=sys.stderr)
        print("Expected answer:", answer, file=sys.stderr)
        print("Fetched answer :", ch_answer, file=sys.stderr)
        raise Exception("Fail on query")


BYTE_RANGE_RE = re.compile(r"bytes=(\d+)-(\d+)?$")


def parse_byte_range(byte_range):
    """Returns the two numbers in 'bytes=123-456' or throws ValueError.
    The last number or both numbers may be None.
    """
    if byte_range.strip() == "":
        return None, None

    m = BYTE_RANGE_RE.match(byte_range)
    if not m:
        raise ValueError(f"Invalid byte range {byte_range}")

    first, last = [x and int(x) for x in m.groups()]
    if last and last < first:
        raise ValueError(f"Invalid byte range {byte_range}")
    return first, last
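

# For illustration (editor's comment): parse_byte_range("bytes=0-9") returns (0, 9),
# parse_byte_range("bytes=10-") returns (10, None), and "" yields (None, None).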

# HTTP server that can serve the JSON payload either whole or as byte ranges.
class HttpProcessor(BaseHTTPRequestHandler):
    allow_range = False
    range_used = False
    get_call_num = 0
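    # These class attributes let run_test() toggle range support per scenario and then
    # check whether ranges were actually used and how many GET requests arrived.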

    def send_head(self):
        if self.headers["Range"] and HttpProcessor.allow_range:
            try:
                self.range = parse_byte_range(self.headers["Range"])
            except ValueError as e:
                self.send_error(400, "Invalid byte range")
                return None
        else:
            self.range = None

        if self.range:
            first, last = self.range
        else:
            first, last = None, None

        if first == None:
            first = 0

        payload = SERVER_JSON_RESPONSE.encode()
        payload_len = len(payload)
        if first and first >= payload_len:
            self.send_error(416, "Requested Range Not Satisfiable")
            return None

        self.send_response(206 if HttpProcessor.allow_range else 200)
        self.send_header("Content-type", "application/json")

        if HttpProcessor.allow_range:
            self.send_header("Accept-Ranges", "bytes")

        if last is None or last >= payload_len:
            last = payload_len - 1

        response_length = last - first + 1

        if first or last:
            self.send_header("Content-Range", f"bytes {first}-{last}/{payload_len}")
        self.send_header(
            "Content-Length",
            str(response_length) if HttpProcessor.allow_range else str(payload_len),
        )
        self.end_headers()
        return payload

    def do_HEAD(self):
        self.send_head()

    def do_GET(self):
        result = self.send_head()
        if result == None:
            return

        HttpProcessor.get_call_num += 1

        if not self.range:
            self.wfile.write(SERVER_JSON_RESPONSE.encode())
            return

        HttpProcessor.range_used = True
        payload = SERVER_JSON_RESPONSE.encode()
        start, stop = self.range
        if stop == None:
            stop = len(payload) - 1
        if start == None:
            start = 0
        self.wfile.write(SERVER_JSON_RESPONSE.encode()[start : stop + 1])

    def log_message(self, format, *args):
        return


class HTTPServerV6(HTTPServer):
    address_family = socket.AF_INET6


def start_server():
    if IS_IPV6:
        httpd = HTTPServerV6(HTTP_SERVER_ADDRESS, HttpProcessor)
    else:
        httpd = HTTPServer(HTTP_SERVER_ADDRESS, HttpProcessor)

    t = threading.Thread(target=httpd.serve_forever)
    return t, httpd


#####################################################################
# Testing area.
#####################################################################


def test_select(download_buffer_size):
    global HTTP_SERVER_URL_STR
    query = f"SELECT * FROM url('{HTTP_SERVER_URL_STR}','JSONAsString') SETTINGS max_download_buffer_size={download_buffer_size};"
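    # The SETTINGS clause above caps the bytes each download thread requests at a time,
    # so it controls how many ranged GETs the server should see for the payload.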
    check_answers(query, EXPECTED_ANSWER)


def run_test(allow_range, download_buffer_size=20):
    HttpProcessor.range_used = False
    HttpProcessor.get_call_num = 0
    HttpProcessor.allow_range = allow_range

    t, httpd = start_server()
    t.start()
    test_select(download_buffer_size)

    expected_get_call_num = (PAYLOAD_LEN - 1) // download_buffer_size + 1
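    # Ceiling division: one ranged GET is expected per download_buffer_size-sized chunk of the payload.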
    if allow_range:
        if not HttpProcessor.range_used:
            raise Exception("HTTP Range was not used when supported")

        if expected_get_call_num != HttpProcessor.get_call_num:
            raise Exception(
                f"Invalid amount of GET calls with Range. Expected {expected_get_call_num}, actual {HttpProcessor.get_call_num}"
            )
    else:
        if HttpProcessor.range_used:
            raise Exception("HTTP Range used while not supported")

    httpd.shutdown()
    t.join()
    print("PASSED")


def main():
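    # First exercise the fallback path without range support, then ranged downloads
    # with two different chunk sizes against the same payload.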
    run_test(allow_range=False)
    run_test(allow_range=True, download_buffer_size=20)
    run_test(allow_range=True, download_buffer_size=10)


if __name__ == "__main__":
    try:
        main()
    except Exception as ex:
        exc_type, exc_value, exc_traceback = sys.exc_info()
        traceback.print_tb(exc_traceback, file=sys.stderr)
        print(ex, file=sys.stderr)
        sys.stderr.flush()

        os._exit(1)
3
tests/queries/0_stateless/02233_HTTP_ranged.reference
Normal file
@ -0,0 +1,3 @@
PASSED
PASSED
PASSED
8
tests/queries/0_stateless/02233_HTTP_ranged.sh
Executable file
@ -0,0 +1,8 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

python3 "$CURDIR"/02233_HTTP_ranged.python
@ -0,0 +1 @@
2
@ -0,0 +1,3 @@
SET optimize_functions_to_subcolumns = 1;
SELECT count(*) FROM numbers(2) AS n1, numbers(3) AS n2, numbers(4) AS n3
WHERE (n1.number = n2.number) AND (n2.number = n3.number);