mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Merge pull request #49732 from nickitat/impr_prefetch
Improve reading with prefetch
This commit is contained in:
commit
3d4800995f
42
src/Common/MemoryTrackerSwitcher.h
Normal file
42
src/Common/MemoryTrackerSwitcher.h
Normal file
@ -0,0 +1,42 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/MemoryTracker.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
struct MemoryTrackerSwitcher
|
||||
{
|
||||
explicit MemoryTrackerSwitcher(MemoryTracker * new_tracker)
|
||||
{
|
||||
if (!current_thread)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "current_thread is not initialized");
|
||||
|
||||
auto * thread_tracker = CurrentThread::getMemoryTracker();
|
||||
prev_untracked_memory = current_thread->untracked_memory;
|
||||
prev_memory_tracker_parent = thread_tracker->getParent();
|
||||
|
||||
current_thread->untracked_memory = 0;
|
||||
thread_tracker->setParent(new_tracker);
|
||||
}
|
||||
|
||||
~MemoryTrackerSwitcher()
|
||||
{
|
||||
CurrentThread::flushUntrackedMemory();
|
||||
auto * thread_tracker = CurrentThread::getMemoryTracker();
|
||||
|
||||
current_thread->untracked_memory = prev_untracked_memory;
|
||||
thread_tracker->setParent(prev_memory_tracker_parent);
|
||||
}
|
||||
|
||||
MemoryTracker * prev_memory_tracker_parent = nullptr;
|
||||
Int64 prev_untracked_memory = 0;
|
||||
};
|
||||
|
||||
}
|
@ -1,9 +1,11 @@
|
||||
#pragma once
|
||||
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <Poco/Timespan.h>
|
||||
#include <mutex>
|
||||
#include <type_traits>
|
||||
#include <variant>
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <Poco/Timespan.h>
|
||||
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/Exception.h>
|
||||
@ -15,14 +17,6 @@ namespace ProfileEvents
|
||||
extern const Event ConnectionPoolIsFullMicroseconds;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
/** A class from which you can inherit and get a pool of something. Used for database connection pools.
|
||||
* Descendant class must provide a method for creating a new object to place in the pool.
|
||||
*/
|
||||
@ -35,6 +29,22 @@ public:
|
||||
using ObjectPtr = std::shared_ptr<Object>;
|
||||
using Ptr = std::shared_ptr<PoolBase<TObject>>;
|
||||
|
||||
enum class BehaviourOnLimit
|
||||
{
|
||||
/**
|
||||
* Default behaviour - when limit on pool size is reached, callers will wait until object will be returned back in pool.
|
||||
*/
|
||||
Wait,
|
||||
|
||||
/**
|
||||
* If no free objects in pool - allocate a new object, but not store it in pool.
|
||||
* This behaviour is needed when we simply don't want to waste time waiting or if we cannot guarantee that query could be processed using fixed amount of connections.
|
||||
* For example, when we read from table on s3, one GetObject request corresponds to the whole FileSystemCache segment. This segments are shared between different
|
||||
* reading tasks, so in general case connection could be taken from pool by one task and returned back by another one. And these tasks are processed completely independently.
|
||||
*/
|
||||
AllocateNewBypassingPool,
|
||||
};
|
||||
|
||||
private:
|
||||
|
||||
/** The object with the flag, whether it is currently used. */
|
||||
@ -89,37 +99,53 @@ public:
|
||||
Object & operator*() && = delete;
|
||||
const Object & operator*() const && = delete;
|
||||
|
||||
Object * operator->() & { return &*data->data.object; }
|
||||
const Object * operator->() const & { return &*data->data.object; }
|
||||
Object & operator*() & { return *data->data.object; }
|
||||
const Object & operator*() const & { return *data->data.object; }
|
||||
Object * operator->() & { return castToObjectPtr(); }
|
||||
const Object * operator->() const & { return castToObjectPtr(); }
|
||||
Object & operator*() & { return *castToObjectPtr(); }
|
||||
const Object & operator*() const & { return *castToObjectPtr(); }
|
||||
|
||||
/**
|
||||
* Expire an object to make it reallocated later.
|
||||
*/
|
||||
void expire()
|
||||
{
|
||||
data->data.is_expired = true;
|
||||
if (data.index() == 1)
|
||||
std::get<1>(data)->data.is_expired = true;
|
||||
}
|
||||
|
||||
bool isNull() const { return data == nullptr; }
|
||||
|
||||
PoolBase * getPool() const
|
||||
{
|
||||
if (!data)
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Attempt to get pool from uninitialized entry");
|
||||
return &data->data.pool;
|
||||
}
|
||||
bool isNull() const { return data.index() == 0 ? !std::get<0>(data) : !std::get<1>(data); }
|
||||
|
||||
private:
|
||||
std::shared_ptr<PoolEntryHelper> data;
|
||||
/**
|
||||
* Plain object will be stored instead of PoolEntryHelper if fallback was made in get() (see BehaviourOnLimit::AllocateNewBypassingPool).
|
||||
*/
|
||||
std::variant<ObjectPtr, std::shared_ptr<PoolEntryHelper>> data;
|
||||
|
||||
explicit Entry(PooledObject & object) : data(std::make_shared<PoolEntryHelper>(object)) {}
|
||||
explicit Entry(ObjectPtr && object) : data(std::move(object)) { }
|
||||
|
||||
explicit Entry(PooledObject & object) : data(std::make_shared<PoolEntryHelper>(object)) { }
|
||||
|
||||
auto castToObjectPtr() const
|
||||
{
|
||||
return std::visit(
|
||||
[](const auto & ptr)
|
||||
{
|
||||
using T = std::decay_t<decltype(ptr)>;
|
||||
if constexpr (std::is_same_v<ObjectPtr, T>)
|
||||
return ptr.get();
|
||||
else
|
||||
return ptr->data.object.get();
|
||||
},
|
||||
data);
|
||||
}
|
||||
};
|
||||
|
||||
virtual ~PoolBase() = default;
|
||||
|
||||
/** Allocates the object. Wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite. */
|
||||
/** Allocates the object.
|
||||
* If 'behaviour_on_limit' is Wait - wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite.
|
||||
* If 'behaviour_on_limit' is AllocateNewBypassingPool and there is no free object - a new object will be created but not stored in the pool.
|
||||
*/
|
||||
Entry get(Poco::Timespan::TimeDiff timeout)
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
@ -150,6 +176,9 @@ public:
|
||||
return Entry(*items.back());
|
||||
}
|
||||
|
||||
if (behaviour_on_limit == BehaviourOnLimit::AllocateNewBypassingPool)
|
||||
return Entry(allocObject());
|
||||
|
||||
Stopwatch blocked;
|
||||
if (timeout < 0)
|
||||
{
|
||||
@ -184,6 +213,8 @@ private:
|
||||
/** The maximum size of the pool. */
|
||||
unsigned max_items;
|
||||
|
||||
BehaviourOnLimit behaviour_on_limit;
|
||||
|
||||
/** Pool. */
|
||||
Objects items;
|
||||
|
||||
@ -192,11 +223,10 @@ private:
|
||||
std::condition_variable available;
|
||||
|
||||
protected:
|
||||
|
||||
Poco::Logger * log;
|
||||
|
||||
PoolBase(unsigned max_items_, Poco::Logger * log_)
|
||||
: max_items(max_items_), log(log_)
|
||||
PoolBase(unsigned max_items_, Poco::Logger * log_, BehaviourOnLimit behaviour_on_limit_ = BehaviourOnLimit::Wait)
|
||||
: max_items(max_items_), behaviour_on_limit(behaviour_on_limit_), log(log_)
|
||||
{
|
||||
items.reserve(max_items);
|
||||
}
|
||||
|
@ -368,6 +368,10 @@ The server successfully detected this situation and will download merged part fr
|
||||
M(ReadBufferFromS3InitMicroseconds, "Time spent initializing connection to S3.") \
|
||||
M(ReadBufferFromS3Bytes, "Bytes read from S3.") \
|
||||
M(ReadBufferFromS3RequestsErrors, "Number of exceptions while reading from S3.") \
|
||||
M(ReadBufferFromS3ResetSessions, "Number of HTTP sessions that were reset in ReadBufferFromS3.") \
|
||||
M(ReadBufferFromS3PreservedSessions, "Number of HTTP sessions that were preserved in ReadBufferFromS3.") \
|
||||
\
|
||||
M(ReadWriteBufferFromHTTPPreservedSessions, "Number of HTTP sessions that were preserved in ReadWriteBufferFromHTTP.") \
|
||||
\
|
||||
M(WriteBufferFromS3Microseconds, "Time spent on writing to S3.") \
|
||||
M(WriteBufferFromS3Bytes, "Bytes written to S3.") \
|
||||
|
@ -42,23 +42,17 @@ namespace ErrorCodes
|
||||
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||
}
|
||||
|
||||
static size_t chooseBufferSize(const ReadSettings & settings, size_t file_size)
|
||||
{
|
||||
/// Buffers used for prefetch or pre-download better to have enough size, but not bigger than the whole file.
|
||||
return std::min<size_t>(std::max<size_t>(settings.prefetch_buffer_size, DBMS_DEFAULT_BUFFER_SIZE), file_size);
|
||||
}
|
||||
|
||||
AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer(
|
||||
ImplPtr impl_,
|
||||
IAsynchronousReader & reader_,
|
||||
const ReadSettings & settings_,
|
||||
AsyncReadCountersPtr async_read_counters_,
|
||||
FilesystemReadPrefetchesLogPtr prefetches_log_)
|
||||
: ReadBufferFromFileBase(chooseBufferSize(settings_, impl_->getFileSize()), nullptr, 0)
|
||||
: ReadBufferFromFileBase(chooseBufferSizeForRemoteReading(settings_, impl_->getFileSize()), nullptr, 0)
|
||||
, impl(std::move(impl_))
|
||||
, read_settings(settings_)
|
||||
, reader(reader_)
|
||||
, prefetch_buffer(chooseBufferSize(settings_, impl->getFileSize()))
|
||||
, prefetch_buffer(chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()))
|
||||
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
|
||||
, current_reader_id(getRandomASCIIString(8))
|
||||
, log(&Poco::Logger::get("AsynchronousBoundedReadBuffer"))
|
||||
@ -111,7 +105,7 @@ void AsynchronousBoundedReadBuffer::prefetch(Priority priority)
|
||||
last_prefetch_info.submit_time = std::chrono::system_clock::now();
|
||||
last_prefetch_info.priority = priority;
|
||||
|
||||
chassert(prefetch_buffer.size() == chooseBufferSize(read_settings, impl->getFileSize()));
|
||||
chassert(prefetch_buffer.size() == chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()));
|
||||
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
|
||||
}
|
||||
@ -190,7 +184,7 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::SynchronousRemoteReadWaitMicroseconds);
|
||||
|
||||
chassert(memory.size() == chooseBufferSize(read_settings, impl->getFileSize()));
|
||||
chassert(memory.size() == chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()));
|
||||
std::tie(size, offset) = impl->readInto(memory.data(), memory.size(), file_offset_of_buffer_end, bytes_to_ignore);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
|
||||
|
@ -1087,6 +1087,10 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
|
||||
first_offset,
|
||||
file_segments->toString());
|
||||
|
||||
/// Release buffer a little bit earlier.
|
||||
if (read_until_position == file_offset_of_buffer_end)
|
||||
implementation_buffer.reset();
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -2,14 +2,27 @@
|
||||
|
||||
#include <IO/SeekableReadBuffer.h>
|
||||
|
||||
#include <iostream>
|
||||
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
|
||||
#include <Disks/ObjectStorages/Cached/CachedObjectStorage.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <IO/ReadSettings.h>
|
||||
#include <IO/SwapHelper.h>
|
||||
#include <iostream>
|
||||
#include <base/hex.h>
|
||||
#include <Interpreters/FilesystemCacheLog.h>
|
||||
#include <base/hex.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
using namespace DB;
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
bool withCache(const ReadSettings & settings)
|
||||
{
|
||||
return settings.remote_fs_cache && settings.enable_filesystem_cache
|
||||
&& (!CurrentThread::getQueryId().empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache
|
||||
|| !settings.avoid_readthrough_cache_outside_query_context);
|
||||
}
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -18,29 +31,35 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_SEEK_THROUGH_FILE;
|
||||
}
|
||||
|
||||
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size)
|
||||
{
|
||||
/// Only when cache is used we could download bigger portions of FileSegments than what we actually gonna read within particular task.
|
||||
if (!withCache(settings))
|
||||
return settings.remote_fs_buffer_size;
|
||||
|
||||
/// Buffers used for prefetch and pre-download better to have enough size, but not bigger than the whole file.
|
||||
return std::min<size_t>(std::max<size_t>(settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE), file_size);
|
||||
}
|
||||
|
||||
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
|
||||
ReadBufferCreator && read_buffer_creator_,
|
||||
const StoredObjects & blobs_to_read_,
|
||||
const ReadSettings & settings_,
|
||||
std::shared_ptr<FilesystemCacheLog> cache_log_,
|
||||
bool use_external_buffer_)
|
||||
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : settings_.remote_fs_buffer_size, nullptr, 0)
|
||||
: ReadBufferFromFileBase(
|
||||
use_external_buffer_ ? 0 : chooseBufferSizeForRemoteReading(settings_, getTotalSize(blobs_to_read_)), nullptr, 0)
|
||||
, settings(settings_)
|
||||
, blobs_to_read(blobs_to_read_)
|
||||
, read_buffer_creator(std::move(read_buffer_creator_))
|
||||
, cache_log(settings.enable_filesystem_cache_log ? cache_log_ : nullptr)
|
||||
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
|
||||
, query_id(CurrentThread::getQueryId())
|
||||
, use_external_buffer(use_external_buffer_)
|
||||
, with_cache(withCache(settings))
|
||||
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
|
||||
{
|
||||
if (!blobs_to_read.empty())
|
||||
current_object = blobs_to_read.front();
|
||||
|
||||
with_cache = settings.remote_fs_cache
|
||||
&& settings.enable_filesystem_cache
|
||||
&& (!query_id.empty()
|
||||
|| settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache
|
||||
|| !settings.avoid_readthrough_cache_outside_query_context);
|
||||
}
|
||||
|
||||
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
|
||||
|
@ -73,7 +73,7 @@ private:
|
||||
const std::shared_ptr<FilesystemCacheLog> cache_log;
|
||||
const String query_id;
|
||||
const bool use_external_buffer;
|
||||
bool with_cache;
|
||||
const bool with_cache;
|
||||
|
||||
size_t read_until_position = 0;
|
||||
size_t file_offset_of_buffer_end = 0;
|
||||
@ -86,4 +86,5 @@ private:
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
||||
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size);
|
||||
}
|
||||
|
@ -132,6 +132,9 @@ std::unique_ptr<S3::Client> getClient(
|
||||
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 3000);
|
||||
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
|
||||
client_configuration.endpointOverride = uri.endpoint;
|
||||
client_configuration.http_keep_alive_timeout_ms = config.getUInt(config_prefix + ".http_keep_alive_timeout_ms", 10000);
|
||||
client_configuration.http_connection_pool_size = config.getUInt(config_prefix + ".http_connection_pool_size", 1000);
|
||||
client_configuration.wait_on_pool_size_limit = false;
|
||||
|
||||
auto proxy_config = getProxyConfiguration(config_prefix, config);
|
||||
if (proxy_config)
|
||||
|
@ -1,8 +1,10 @@
|
||||
#include <IO/HTTPCommon.h>
|
||||
|
||||
#include <Server/HTTP/HTTPServerResponse.h>
|
||||
#include <Poco/Any.h>
|
||||
#include <Common/DNSResolver.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/MemoryTrackerSwitcher.h>
|
||||
#include <Common/PoolBase.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/SipHash.h>
|
||||
@ -40,6 +42,7 @@ namespace ErrorCodes
|
||||
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
|
||||
extern const int FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME;
|
||||
extern const int UNSUPPORTED_URI_SCHEME;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
|
||||
@ -107,6 +110,9 @@ namespace
|
||||
|
||||
ObjectPtr allocObject() override
|
||||
{
|
||||
/// Pool is global, we shouldn't attribute this memory to query/user.
|
||||
MemoryTrackerSwitcher switcher{&total_memory_tracker};
|
||||
|
||||
auto session = makeHTTPSessionImpl(host, port, https, true, resolve_host);
|
||||
if (!proxy_host.empty())
|
||||
{
|
||||
@ -131,8 +137,12 @@ namespace
|
||||
UInt16 proxy_port_,
|
||||
bool proxy_https_,
|
||||
size_t max_pool_size_,
|
||||
bool resolve_host_ = true)
|
||||
: Base(static_cast<unsigned>(max_pool_size_), &Poco::Logger::get("HTTPSessionPool"))
|
||||
bool resolve_host_,
|
||||
bool wait_on_pool_size_limit)
|
||||
: Base(
|
||||
static_cast<unsigned>(max_pool_size_),
|
||||
&Poco::Logger::get("HTTPSessionPool"),
|
||||
wait_on_pool_size_limit ? BehaviourOnLimit::Wait : BehaviourOnLimit::AllocateNewBypassingPool)
|
||||
, host(host_)
|
||||
, port(port_)
|
||||
, https(https_)
|
||||
@ -155,11 +165,12 @@ namespace
|
||||
String proxy_host;
|
||||
UInt16 proxy_port;
|
||||
bool is_proxy_https;
|
||||
bool wait_on_pool_size_limit;
|
||||
|
||||
bool operator ==(const Key & rhs) const
|
||||
{
|
||||
return std::tie(target_host, target_port, is_target_https, proxy_host, proxy_port, is_proxy_https)
|
||||
== std::tie(rhs.target_host, rhs.target_port, rhs.is_target_https, rhs.proxy_host, rhs.proxy_port, rhs.is_proxy_https);
|
||||
return std::tie(target_host, target_port, is_target_https, proxy_host, proxy_port, is_proxy_https, wait_on_pool_size_limit)
|
||||
== std::tie(rhs.target_host, rhs.target_port, rhs.is_target_https, rhs.proxy_host, rhs.proxy_port, rhs.is_proxy_https, rhs.wait_on_pool_size_limit);
|
||||
}
|
||||
};
|
||||
|
||||
@ -178,6 +189,7 @@ namespace
|
||||
s.update(k.proxy_host);
|
||||
s.update(k.proxy_port);
|
||||
s.update(k.is_proxy_https);
|
||||
s.update(k.wait_on_pool_size_limit);
|
||||
return s.get64();
|
||||
}
|
||||
};
|
||||
@ -218,14 +230,14 @@ namespace
|
||||
const Poco::URI & proxy_uri,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
size_t max_connections_per_endpoint,
|
||||
bool resolve_host = true)
|
||||
bool resolve_host,
|
||||
bool wait_on_pool_size_limit)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
std::unique_lock lock(mutex);
|
||||
const std::string & host = uri.getHost();
|
||||
UInt16 port = uri.getPort();
|
||||
bool https = isHTTPS(uri);
|
||||
|
||||
|
||||
String proxy_host;
|
||||
UInt16 proxy_port = 0;
|
||||
bool proxy_https = false;
|
||||
@ -236,36 +248,42 @@ namespace
|
||||
proxy_https = isHTTPS(proxy_uri);
|
||||
}
|
||||
|
||||
HTTPSessionPool::Key key{host, port, https, proxy_host, proxy_port, proxy_https};
|
||||
HTTPSessionPool::Key key{host, port, https, proxy_host, proxy_port, proxy_https, wait_on_pool_size_limit};
|
||||
auto pool_ptr = endpoints_pool.find(key);
|
||||
if (pool_ptr == endpoints_pool.end())
|
||||
std::tie(pool_ptr, std::ignore) = endpoints_pool.emplace(
|
||||
key, std::make_shared<SingleEndpointHTTPSessionPool>(host, port, https, proxy_host, proxy_port, proxy_https, max_connections_per_endpoint, resolve_host));
|
||||
key,
|
||||
std::make_shared<SingleEndpointHTTPSessionPool>(
|
||||
host,
|
||||
port,
|
||||
https,
|
||||
proxy_host,
|
||||
proxy_port,
|
||||
proxy_https,
|
||||
max_connections_per_endpoint,
|
||||
resolve_host,
|
||||
wait_on_pool_size_limit));
|
||||
|
||||
/// Some routines held session objects until the end of its lifetime. Also this routines may create another sessions in this time frame.
|
||||
/// If some other session holds `lock` because it waits on another lock inside `pool_ptr->second->get` it isn't possible to create any
|
||||
/// new session and thus finish routine, return session to the pool and unlock the thread waiting inside `pool_ptr->second->get`.
|
||||
/// To avoid such a deadlock we unlock `lock` before entering `pool_ptr->second->get`.
|
||||
lock.unlock();
|
||||
|
||||
auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
|
||||
auto session = pool_ptr->second->get(retry_timeout);
|
||||
|
||||
/// We store exception messages in session data.
|
||||
/// Poco HTTPSession also stores exception, but it can be removed at any time.
|
||||
const auto & session_data = session->sessionData();
|
||||
if (!session_data.empty())
|
||||
if (session_data.empty() || !Poco::AnyCast<HTTPSessionReuseTag>(&session_data))
|
||||
{
|
||||
auto msg = Poco::AnyCast<std::string>(session_data);
|
||||
if (!msg.empty())
|
||||
{
|
||||
LOG_TRACE((&Poco::Logger::get("HTTPCommon")), "Failed communicating with {} with error '{}' will try to reconnect session", host, msg);
|
||||
session->reset();
|
||||
|
||||
if (resolve_host)
|
||||
{
|
||||
updateHostIfIpChanged(session, DNSResolver::instance().resolveHost(host).toString());
|
||||
}
|
||||
}
|
||||
/// Reset the message, once it has been printed,
|
||||
/// otherwise you will get report for failed parts on and on,
|
||||
/// even for different tables (since they uses the same session).
|
||||
session->attachSessionData({});
|
||||
if (resolve_host)
|
||||
updateHostIfIpChanged(session, DNSResolver::instance().resolveHost(host).toString());
|
||||
}
|
||||
|
||||
session->attachSessionData({});
|
||||
|
||||
setTimeouts(*session, timeouts);
|
||||
|
||||
return session;
|
||||
@ -295,14 +313,25 @@ HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts &
|
||||
}
|
||||
|
||||
|
||||
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host)
|
||||
PooledHTTPSessionPtr makePooledHTTPSession(
|
||||
const Poco::URI & uri,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
size_t per_endpoint_pool_size,
|
||||
bool resolve_host,
|
||||
bool wait_on_pool_size_limit)
|
||||
{
|
||||
return makePooledHTTPSession(uri, {}, timeouts, per_endpoint_pool_size, resolve_host);
|
||||
return makePooledHTTPSession(uri, {}, timeouts, per_endpoint_pool_size, resolve_host, wait_on_pool_size_limit);
|
||||
}
|
||||
|
||||
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Poco::URI & proxy_uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host)
|
||||
PooledHTTPSessionPtr makePooledHTTPSession(
|
||||
const Poco::URI & uri,
|
||||
const Poco::URI & proxy_uri,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
size_t per_endpoint_pool_size,
|
||||
bool resolve_host,
|
||||
bool wait_on_pool_size_limit)
|
||||
{
|
||||
return HTTPSessionPool::instance().getSession(uri, proxy_uri, timeouts, per_endpoint_pool_size, resolve_host);
|
||||
return HTTPSessionPool::instance().getSession(uri, proxy_uri, timeouts, per_endpoint_pool_size, resolve_host, wait_on_pool_size_limit);
|
||||
}
|
||||
|
||||
bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status) { return status == Poco::Net::HTTPResponse::HTTP_MOVED_PERMANENTLY || status == Poco::Net::HTTPResponse::HTTP_FOUND || status == Poco::Net::HTTPResponse::HTTP_SEE_OTHER || status == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT; }
|
||||
@ -351,4 +380,24 @@ Exception HTTPException::makeExceptionMessage(
|
||||
uri, static_cast<int>(http_status), reason, body);
|
||||
}
|
||||
|
||||
void markSessionForReuse(Poco::Net::HTTPSession & session)
|
||||
{
|
||||
const auto & session_data = session.sessionData();
|
||||
if (!session_data.empty() && !Poco::AnyCast<HTTPSessionReuseTag>(&session_data))
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR, "Data of an unexpected type ({}) is attached to the session", session_data.type().name());
|
||||
|
||||
session.attachSessionData(HTTPSessionReuseTag{});
|
||||
}
|
||||
|
||||
void markSessionForReuse(HTTPSessionPtr session)
|
||||
{
|
||||
markSessionForReuse(*session);
|
||||
}
|
||||
|
||||
void markSessionForReuse(PooledHTTPSessionPtr session)
|
||||
{
|
||||
markSessionForReuse(static_cast<Poco::Net::HTTPSession &>(*session));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -55,14 +55,38 @@ private:
|
||||
using PooledHTTPSessionPtr = PoolBase<Poco::Net::HTTPClientSession>::Entry; // SingleEndpointHTTPSessionPool::Entry
|
||||
using HTTPSessionPtr = std::shared_ptr<Poco::Net::HTTPClientSession>;
|
||||
|
||||
/// If a session have this tag attached, it will be reused without calling `reset()` on it.
|
||||
/// All pooled sessions don't have this tag attached after being taken from a pool.
|
||||
/// If the request and the response were fully written/read, the client code should add this tag
|
||||
/// explicitly by calling `markSessionForReuse()`.
|
||||
struct HTTPSessionReuseTag
|
||||
{
|
||||
};
|
||||
|
||||
void markSessionForReuse(HTTPSessionPtr session);
|
||||
void markSessionForReuse(PooledHTTPSessionPtr session);
|
||||
|
||||
|
||||
void setResponseDefaultHeaders(HTTPServerResponse & response, size_t keep_alive_timeout);
|
||||
|
||||
/// Create session object to perform requests and set required parameters.
|
||||
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host = true);
|
||||
|
||||
/// As previous method creates session, but tooks it from pool, without and with proxy uri.
|
||||
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host = true);
|
||||
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Poco::URI & proxy_uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host = true);
|
||||
PooledHTTPSessionPtr makePooledHTTPSession(
|
||||
const Poco::URI & uri,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
size_t per_endpoint_pool_size,
|
||||
bool resolve_host = true,
|
||||
bool wait_on_pool_size_limit = true);
|
||||
|
||||
PooledHTTPSessionPtr makePooledHTTPSession(
|
||||
const Poco::URI & uri,
|
||||
const Poco::URI & proxy_uri,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
size_t per_endpoint_pool_size,
|
||||
bool resolve_host = true,
|
||||
bool wait_on_pool_size_limit = true);
|
||||
|
||||
bool isRedirect(Poco::Net::HTTPResponse::HTTPStatus status);
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include "config.h"
|
||||
#include <IO/HTTPCommon.h>
|
||||
#include <IO/S3Common.h>
|
||||
#include "config.h"
|
||||
|
||||
#if USE_AWS_S3
|
||||
|
||||
@ -24,6 +25,8 @@ namespace ProfileEvents
|
||||
extern const Event ReadBufferFromS3InitMicroseconds;
|
||||
extern const Event ReadBufferFromS3Bytes;
|
||||
extern const Event ReadBufferFromS3RequestsErrors;
|
||||
extern const Event ReadBufferFromS3ResetSessions;
|
||||
extern const Event ReadBufferFromS3PreservedSessions;
|
||||
extern const Event ReadBufferSeekCancelConnection;
|
||||
extern const Event S3GetObject;
|
||||
extern const Event DiskS3GetObject;
|
||||
@ -31,6 +34,46 @@ namespace ProfileEvents
|
||||
extern const Event RemoteReadThrottlerSleepMicroseconds;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
DB::PooledHTTPSessionPtr getSession(Aws::S3::Model::GetObjectResult & read_result)
|
||||
{
|
||||
if (auto * session_aware_stream = dynamic_cast<DB::S3::SessionAwareIOStream<DB::PooledHTTPSessionPtr> *>(&read_result.GetBody()))
|
||||
return static_cast<DB::PooledHTTPSessionPtr &>(session_aware_stream->getSession());
|
||||
else if (!dynamic_cast<DB::S3::SessionAwareIOStream<DB::HTTPSessionPtr> *>(&read_result.GetBody()))
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session of unexpected type encountered");
|
||||
return {};
|
||||
}
|
||||
|
||||
void resetSession(Aws::S3::Model::GetObjectResult & read_result)
|
||||
{
|
||||
if (auto session = getSession(read_result); !session.isNull())
|
||||
{
|
||||
auto & http_session = static_cast<Poco::Net::HTTPClientSession &>(*session);
|
||||
http_session.reset();
|
||||
}
|
||||
}
|
||||
|
||||
void resetSessionIfNeeded(bool read_all_range_successfully, std::optional<Aws::S3::Model::GetObjectResult> & read_result)
|
||||
{
|
||||
if (!read_result)
|
||||
return;
|
||||
|
||||
if (!read_all_range_successfully)
|
||||
{
|
||||
/// When we abandon a session with an ongoing GetObject request and there is another one trying to delete the same object this delete
|
||||
/// operation will hang until GetObject's session idle timeouts. So we have to call `reset()` on GetObject's session session immediately.
|
||||
resetSession(*read_result);
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3ResetSessions);
|
||||
}
|
||||
else if (auto session = getSession(*read_result); !session.isNull())
|
||||
{
|
||||
DB::markSessionForReuse(session);
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3PreservedSessions);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
@ -154,7 +197,10 @@ bool ReadBufferFromS3::nextImpl()
|
||||
}
|
||||
|
||||
if (!next_result)
|
||||
{
|
||||
read_all_range_successfully = true;
|
||||
return false;
|
||||
}
|
||||
|
||||
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
|
||||
|
||||
@ -240,6 +286,8 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
|
||||
if (offset_ == getPosition() && whence == SEEK_SET)
|
||||
return offset_;
|
||||
|
||||
read_all_range_successfully = false;
|
||||
|
||||
if (impl && restricted_seek)
|
||||
{
|
||||
throw Exception(
|
||||
@ -312,6 +360,8 @@ void ReadBufferFromS3::setReadUntilPosition(size_t position)
|
||||
{
|
||||
if (position != static_cast<size_t>(read_until_position))
|
||||
{
|
||||
read_all_range_successfully = false;
|
||||
|
||||
if (impl)
|
||||
{
|
||||
if (!atEndOfRequestedRangeGuess())
|
||||
@ -328,6 +378,8 @@ void ReadBufferFromS3::setReadUntilEnd()
|
||||
{
|
||||
if (read_until_position)
|
||||
{
|
||||
read_all_range_successfully = false;
|
||||
|
||||
read_until_position = 0;
|
||||
if (impl)
|
||||
{
|
||||
@ -351,8 +403,23 @@ bool ReadBufferFromS3::atEndOfRequestedRangeGuess()
|
||||
return false;
|
||||
}
|
||||
|
||||
ReadBufferFromS3::~ReadBufferFromS3()
|
||||
{
|
||||
try
|
||||
{
|
||||
resetSessionIfNeeded(readAllRangeSuccessfully(), read_result);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(log);
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
|
||||
{
|
||||
resetSessionIfNeeded(readAllRangeSuccessfully(), read_result);
|
||||
read_all_range_successfully = false;
|
||||
|
||||
/**
|
||||
* If remote_filesystem_read_method = 'threadpool', then for MergeTree family tables
|
||||
* exact byte ranges to read are always passed here.
|
||||
@ -363,7 +430,7 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
|
||||
read_result = sendRequest(offset, read_until_position ? std::make_optional(read_until_position - 1) : std::nullopt);
|
||||
|
||||
size_t buffer_size = use_external_buffer ? 0 : read_settings.remote_fs_buffer_size;
|
||||
return std::make_unique<ReadBufferFromIStream>(read_result.GetBody(), buffer_size);
|
||||
return std::make_unique<ReadBufferFromIStream>(read_result->GetBody(), buffer_size);
|
||||
}
|
||||
|
||||
Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t range_begin, std::optional<size_t> range_end_incl) const
|
||||
@ -415,6 +482,10 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t range_begin
|
||||
}
|
||||
}
|
||||
|
||||
bool ReadBufferFromS3::readAllRangeSuccessfully() const
|
||||
{
|
||||
return read_until_position ? offset == read_until_position : read_all_range_successfully;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -41,7 +41,7 @@ private:
|
||||
std::atomic<off_t> offset = 0;
|
||||
std::atomic<off_t> read_until_position = 0;
|
||||
|
||||
Aws::S3::Model::GetObjectResult read_result;
|
||||
std::optional<Aws::S3::Model::GetObjectResult> read_result;
|
||||
std::unique_ptr<ReadBuffer> impl;
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("ReadBufferFromS3");
|
||||
@ -60,6 +60,8 @@ public:
|
||||
bool restricted_seek_ = false,
|
||||
std::optional<size_t> file_size = std::nullopt);
|
||||
|
||||
~ReadBufferFromS3() override;
|
||||
|
||||
bool nextImpl() override;
|
||||
|
||||
off_t seek(off_t off, int whence) override;
|
||||
@ -93,6 +95,8 @@ private:
|
||||
|
||||
Aws::S3::Model::GetObjectResult sendRequest(size_t range_begin, std::optional<size_t> range_end_incl) const;
|
||||
|
||||
bool readAllRangeSuccessfully() const;
|
||||
|
||||
ReadSettings read_settings;
|
||||
|
||||
bool use_external_buffer;
|
||||
@ -100,6 +104,8 @@ private:
|
||||
/// There is different seek policy for disk seek and for non-disk seek
|
||||
/// (non-disk seek is applied for seekable input formats: orc, arrow, parquet).
|
||||
bool restricted_seek;
|
||||
|
||||
bool read_all_range_successfully = false;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,8 +1,11 @@
|
||||
#include "ReadWriteBufferFromHTTP.h"
|
||||
|
||||
#include <IO/HTTPCommon.h>
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event ReadBufferSeekCancelConnection;
|
||||
extern const Event ReadWriteBufferFromHTTPPreservedSessions;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
@ -146,30 +149,20 @@ std::istream * ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::callImpl(
|
||||
LOG_TRACE(log, "Sending request to {}", uri_.toString());
|
||||
|
||||
auto sess = current_session->getSession();
|
||||
try
|
||||
{
|
||||
auto & stream_out = sess->sendRequest(request);
|
||||
auto & stream_out = sess->sendRequest(request);
|
||||
|
||||
if (out_stream_callback)
|
||||
out_stream_callback(stream_out);
|
||||
if (out_stream_callback)
|
||||
out_stream_callback(stream_out);
|
||||
|
||||
auto result_istr = receiveResponse(*sess, request, response, true);
|
||||
response.getCookies(cookies);
|
||||
auto result_istr = receiveResponse(*sess, request, response, true);
|
||||
response.getCookies(cookies);
|
||||
|
||||
/// we can fetch object info while the request is being processed
|
||||
/// and we don't want to override any context used by it
|
||||
if (!for_object_info)
|
||||
content_encoding = response.get("Content-Encoding", "");
|
||||
/// we can fetch object info while the request is being processed
|
||||
/// and we don't want to override any context used by it
|
||||
if (!for_object_info)
|
||||
content_encoding = response.get("Content-Encoding", "");
|
||||
|
||||
return result_istr;
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
/// We use session data storage as storage for exception text
|
||||
/// Depend on it we can deduce to reconnect session or reresolve session host
|
||||
sess->attachSessionData(e.message());
|
||||
throw;
|
||||
}
|
||||
return result_istr;
|
||||
}
|
||||
|
||||
template <typename UpdatableSessionPtr>
|
||||
@ -429,23 +422,10 @@ void ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::initialize()
|
||||
if (!read_range.end && response.hasContentLength())
|
||||
file_info = parseFileInfo(response, withPartialContent(read_range) ? getOffset() : 0);
|
||||
|
||||
try
|
||||
{
|
||||
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size);
|
||||
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size);
|
||||
|
||||
if (use_external_buffer)
|
||||
{
|
||||
setupExternalBuffer();
|
||||
}
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
/// We use session data storage as storage for exception text
|
||||
/// Depend on it we can deduce to reconnect session or reresolve session host
|
||||
auto sess = session->getSession();
|
||||
sess->attachSessionData(e.message());
|
||||
throw;
|
||||
}
|
||||
if (use_external_buffer)
|
||||
setupExternalBuffer();
|
||||
}
|
||||
|
||||
template <typename UpdatableSessionPtr>
|
||||
@ -460,7 +440,12 @@ bool ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::nextImpl()
|
||||
|
||||
if ((read_range.end && getOffset() > read_range.end.value()) ||
|
||||
(file_info && file_info->file_size && getOffset() >= file_info->file_size.value()))
|
||||
{
|
||||
/// Response was fully read.
|
||||
markSessionForReuse(session->getSession());
|
||||
ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPPreservedSessions);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (impl)
|
||||
{
|
||||
@ -582,7 +567,12 @@ bool ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::nextImpl()
|
||||
std::rethrow_exception(exception);
|
||||
|
||||
if (!result)
|
||||
{
|
||||
/// Eof is reached, i.e response was fully read.
|
||||
markSessionForReuse(session->getSession());
|
||||
ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPPreservedSessions);
|
||||
return false;
|
||||
}
|
||||
|
||||
internal_buffer = impl->buffer();
|
||||
working_buffer = internal_buffer;
|
||||
@ -635,12 +625,17 @@ size_t ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::readBigAt(char * to, si
|
||||
bool cancelled;
|
||||
size_t r = copyFromIStreamWithProgressCallback(*result_istr, to, n, progress_callback, &cancelled);
|
||||
|
||||
if (!cancelled)
|
||||
{
|
||||
/// Response was fully read.
|
||||
markSessionForReuse(sess);
|
||||
ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPPreservedSessions);
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
sess->attachSessionData(e.message());
|
||||
|
||||
LOG_ERROR(
|
||||
log,
|
||||
"HTTP request (positioned) to `{}` with range [{}, {}) failed at try {}/{}: {}",
|
||||
|
@ -1,3 +1,4 @@
|
||||
#include <Poco/Timespan.h>
|
||||
#include "Common/DNSResolver.h"
|
||||
#include "config.h"
|
||||
|
||||
@ -138,8 +139,9 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_config
|
||||
, timeouts(ConnectionTimeouts(
|
||||
Poco::Timespan(client_configuration.connectTimeoutMs * 1000), /// connection timeout.
|
||||
Poco::Timespan(client_configuration.requestTimeoutMs * 1000), /// send timeout.
|
||||
Poco::Timespan(client_configuration.requestTimeoutMs * 1000) /// receive timeout.
|
||||
))
|
||||
Poco::Timespan(client_configuration.requestTimeoutMs * 1000), /// receive timeout.
|
||||
Poco::Timespan(client_configuration.enableTcpKeepAlive ? client_configuration.tcpKeepAliveIntervalMs * 1000 : 0),
|
||||
Poco::Timespan(client_configuration.http_keep_alive_timeout_ms * 1000))) /// flag indicating whether keep-alive is enabled is set to each session upon creation
|
||||
, remote_host_filter(client_configuration.remote_host_filter)
|
||||
, s3_max_redirects(client_configuration.s3_max_redirects)
|
||||
, enable_s3_requests_logging(client_configuration.enable_s3_requests_logging)
|
||||
@ -147,6 +149,8 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_config
|
||||
, get_request_throttler(client_configuration.get_request_throttler)
|
||||
, put_request_throttler(client_configuration.put_request_throttler)
|
||||
, extra_headers(client_configuration.extra_headers)
|
||||
, http_connection_pool_size(client_configuration.http_connection_pool_size)
|
||||
, wait_on_pool_size_limit(client_configuration.wait_on_pool_size_limit)
|
||||
{
|
||||
}
|
||||
|
||||
@ -254,9 +258,27 @@ void PocoHTTPClient::addMetric(const Aws::Http::HttpRequest & request, S3MetricT
|
||||
void PocoHTTPClient::makeRequestInternal(
|
||||
Aws::Http::HttpRequest & request,
|
||||
std::shared_ptr<PocoHTTPResponse> & response,
|
||||
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter ,
|
||||
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const
|
||||
{
|
||||
/// Most sessions in pool are already connected and it is not possible to set proxy host/port to a connected session.
|
||||
const auto request_configuration = per_request_configuration(request);
|
||||
if (http_connection_pool_size && request_configuration.proxy_host.empty())
|
||||
makeRequestInternalImpl<true>(request, request_configuration, response, readLimiter, writeLimiter);
|
||||
else
|
||||
makeRequestInternalImpl<false>(request, request_configuration, response, readLimiter, writeLimiter);
|
||||
}
|
||||
|
||||
template <bool pooled>
|
||||
void PocoHTTPClient::makeRequestInternalImpl(
|
||||
Aws::Http::HttpRequest & request,
|
||||
const ClientConfigurationPerRequest & request_configuration,
|
||||
std::shared_ptr<PocoHTTPResponse> & response,
|
||||
Aws::Utils::RateLimits::RateLimiterInterface *,
|
||||
Aws::Utils::RateLimits::RateLimiterInterface *) const
|
||||
{
|
||||
using SessionPtr = std::conditional_t<pooled, PooledHTTPSessionPtr, HTTPSessionPtr>;
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("AWSClient");
|
||||
|
||||
auto uri = request.GetUri().GetURIString();
|
||||
@ -303,8 +325,7 @@ void PocoHTTPClient::makeRequestInternal(
|
||||
for (unsigned int attempt = 0; attempt <= s3_max_redirects; ++attempt)
|
||||
{
|
||||
Poco::URI target_uri(uri);
|
||||
HTTPSessionPtr session;
|
||||
auto request_configuration = per_request_configuration(request);
|
||||
SessionPtr session;
|
||||
|
||||
if (!request_configuration.proxy_host.empty())
|
||||
{
|
||||
@ -313,7 +334,11 @@ void PocoHTTPClient::makeRequestInternal(
|
||||
|
||||
/// Reverse proxy can replace host header with resolved ip address instead of host name.
|
||||
/// This can lead to request signature difference on S3 side.
|
||||
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ false);
|
||||
if constexpr (pooled)
|
||||
session = makePooledHTTPSession(
|
||||
target_uri, timeouts, http_connection_pool_size, /* resolve_host = */ true, wait_on_pool_size_limit);
|
||||
else
|
||||
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ false);
|
||||
bool use_tunnel = request_configuration.proxy_scheme == Aws::Http::Scheme::HTTP && target_uri.getScheme() == "https";
|
||||
|
||||
session->setProxy(
|
||||
@ -325,7 +350,11 @@ void PocoHTTPClient::makeRequestInternal(
|
||||
}
|
||||
else
|
||||
{
|
||||
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ true);
|
||||
if constexpr (pooled)
|
||||
session = makePooledHTTPSession(
|
||||
target_uri, timeouts, http_connection_pool_size, /* resolve_host = */ true, wait_on_pool_size_limit);
|
||||
else
|
||||
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ false);
|
||||
}
|
||||
|
||||
/// In case of error this address will be written to logs
|
||||
|
@ -53,6 +53,13 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
|
||||
ThrottlerPtr put_request_throttler;
|
||||
HTTPHeaderEntries extra_headers;
|
||||
|
||||
/// Not a client parameter in terms of HTTP and we won't send it to the server. Used internally to determine when connection have to be re-established.
|
||||
uint32_t http_keep_alive_timeout_ms = 0;
|
||||
/// Zero means pooling will not be used.
|
||||
size_t http_connection_pool_size = 0;
|
||||
/// See PoolBase::BehaviourOnLimit
|
||||
bool wait_on_pool_size_limit = true;
|
||||
|
||||
void updateSchemeAndRegion();
|
||||
|
||||
std::function<void(const ClientConfigurationPerRequest &)> error_report;
|
||||
@ -90,6 +97,12 @@ public:
|
||||
);
|
||||
}
|
||||
|
||||
void SetResponseBody(Aws::IStream & incoming_stream, PooledHTTPSessionPtr & session_) /// NOLINT
|
||||
{
|
||||
body_stream = Aws::Utils::Stream::ResponseStream(
|
||||
Aws::New<SessionAwareIOStream<PooledHTTPSessionPtr>>("http result streambuf", session_, incoming_stream.rdbuf()));
|
||||
}
|
||||
|
||||
void SetResponseBody(std::string & response_body) /// NOLINT
|
||||
{
|
||||
auto stream = Aws::New<std::stringstream>("http result buf", response_body); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
@ -149,6 +162,15 @@ private:
|
||||
EnumSize,
|
||||
};
|
||||
|
||||
template <bool pooled>
|
||||
void makeRequestInternalImpl(
|
||||
Aws::Http::HttpRequest & request,
|
||||
const ClientConfigurationPerRequest & per_request_configuration,
|
||||
std::shared_ptr<PocoHTTPResponse> & response,
|
||||
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
|
||||
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const;
|
||||
|
||||
protected:
|
||||
static S3MetricKind getMetricKind(const Aws::Http::HttpRequest & request);
|
||||
void addMetric(const Aws::Http::HttpRequest & request, S3MetricType type, ProfileEvents::Count amount = 1) const;
|
||||
|
||||
@ -170,6 +192,9 @@ private:
|
||||
ThrottlerPtr put_request_throttler;
|
||||
|
||||
const HTTPHeaderEntries extra_headers;
|
||||
|
||||
size_t http_connection_pool_size = 0;
|
||||
bool wait_on_pool_size_limit = true;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -18,6 +18,10 @@ public:
|
||||
{
|
||||
}
|
||||
|
||||
Session & getSession() { return session; }
|
||||
|
||||
const Session & getSession() const { return session; }
|
||||
|
||||
private:
|
||||
/// Poco HTTP session is holder of response stream.
|
||||
Session session;
|
||||
|
@ -125,7 +125,7 @@ void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr excep
|
||||
// Entries data must be destroyed in context of user who runs async insert.
|
||||
// Each entry in the list may correspond to a different user,
|
||||
// so we need to switch current thread's MemoryTracker.
|
||||
UserMemoryTrackerSwitcher switcher(user_memory_tracker);
|
||||
MemoryTrackerSwitcher switcher(user_memory_tracker);
|
||||
bytes = "";
|
||||
}
|
||||
|
||||
|
@ -1,10 +1,12 @@
|
||||
#pragma once
|
||||
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Core/Settings.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <Poco/Logger.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/MemoryTrackerSwitcher.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
|
||||
#include <future>
|
||||
|
||||
namespace DB
|
||||
@ -60,31 +62,6 @@ private:
|
||||
UInt128 calculateHash() const;
|
||||
};
|
||||
|
||||
struct UserMemoryTrackerSwitcher
|
||||
{
|
||||
explicit UserMemoryTrackerSwitcher(MemoryTracker * new_tracker)
|
||||
{
|
||||
auto * thread_tracker = CurrentThread::getMemoryTracker();
|
||||
prev_untracked_memory = current_thread->untracked_memory;
|
||||
prev_memory_tracker_parent = thread_tracker->getParent();
|
||||
|
||||
current_thread->untracked_memory = 0;
|
||||
thread_tracker->setParent(new_tracker);
|
||||
}
|
||||
|
||||
~UserMemoryTrackerSwitcher()
|
||||
{
|
||||
CurrentThread::flushUntrackedMemory();
|
||||
auto * thread_tracker = CurrentThread::getMemoryTracker();
|
||||
|
||||
current_thread->untracked_memory = prev_untracked_memory;
|
||||
thread_tracker->setParent(prev_memory_tracker_parent);
|
||||
}
|
||||
|
||||
MemoryTracker * prev_memory_tracker_parent;
|
||||
Int64 prev_untracked_memory;
|
||||
};
|
||||
|
||||
struct InsertData
|
||||
{
|
||||
struct Entry
|
||||
@ -114,7 +91,7 @@ private:
|
||||
// so we need to switch current thread's MemoryTracker parent on each iteration.
|
||||
while (it != entries.end())
|
||||
{
|
||||
UserMemoryTrackerSwitcher switcher((*it)->user_memory_tracker);
|
||||
MemoryTrackerSwitcher switcher((*it)->user_memory_tracker);
|
||||
it = entries.erase(it);
|
||||
}
|
||||
}
|
||||
|
@ -935,23 +935,17 @@ private:
|
||||
request.setHost(url.getHost());
|
||||
|
||||
auto session = makePooledHTTPSession(url, timeouts, 1);
|
||||
std::istream * response_body{};
|
||||
try
|
||||
{
|
||||
session->sendRequest(request);
|
||||
session->sendRequest(request);
|
||||
|
||||
Poco::Net::HTTPResponse response;
|
||||
std::istream * response_body = receiveResponse(*session, request, response, false);
|
||||
|
||||
Poco::Net::HTTPResponse response;
|
||||
response_body = receiveResponse(*session, request, response, false);
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
/// We use session data storage as storage for exception text
|
||||
/// Depend on it we can deduce to reconnect session or reresolve session host
|
||||
session->attachSessionData(e.message());
|
||||
throw;
|
||||
}
|
||||
Poco::JSON::Parser parser;
|
||||
auto json_body = parser.parse(*response_body).extract<Poco::JSON::Object::Ptr>();
|
||||
|
||||
/// Response was fully read.
|
||||
markSessionForReuse(session);
|
||||
|
||||
auto schema = json_body->getValue<std::string>("schema");
|
||||
LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Successfully fetched schema id = {}\n{}", id, schema);
|
||||
return avro::compileJsonSchemaFromString(schema);
|
||||
|
@ -80,6 +80,7 @@ void InterserverIOHTTPHandler::processQuery(HTTPServerRequest & request, HTTPSer
|
||||
void InterserverIOHTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
|
||||
{
|
||||
setThreadName("IntersrvHandler");
|
||||
ThreadStatus thread_status;
|
||||
|
||||
/// In order to work keep-alive.
|
||||
if (request.getVersion() == HTTPServerRequest::HTTP_1_1)
|
||||
|
@ -1,18 +1,18 @@
|
||||
#include <Storages/MergeTree/MergeTreePrefetchedReadPool.h>
|
||||
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
|
||||
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
|
||||
#include <Storages/MergeTree/RangesInDataPart.h>
|
||||
#include <Storages/MergeTree/MarkRange.h>
|
||||
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
|
||||
#include <Storages/MergeTree/IMergeTreeReader.h>
|
||||
#include <Storages/MergeTree/AlterConversions.h>
|
||||
#include <Storages/MergeTree/MergeTreeRangeReader.h>
|
||||
#include <Interpreters/threadPoolCallbackRunner.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/threadPoolCallbackRunner.h>
|
||||
#include <Storages/MergeTree/AlterConversions.h>
|
||||
#include <Storages/MergeTree/IMergeTreeReader.h>
|
||||
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
|
||||
#include <Storages/MergeTree/MarkRange.h>
|
||||
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
|
||||
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
|
||||
#include <Storages/MergeTree/MergeTreePrefetchedReadPool.h>
|
||||
#include <Storages/MergeTree/MergeTreeRangeReader.h>
|
||||
#include <Storages/MergeTree/RangesInDataPart.h>
|
||||
#include <base/getThreadId.h>
|
||||
#include <Common/ElapsedTimeProfileEventIncrement.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <base/getThreadId.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
@ -296,31 +296,12 @@ MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::getTask(size_t thread)
|
||||
return task;
|
||||
}
|
||||
|
||||
size_t MergeTreePrefetchedReadPool::getApproxSizeOfGranule(const IMergeTreeDataPart & part) const
|
||||
size_t getApproximateSizeOfGranule(const IMergeTreeDataPart & part, const Names & columns_to_read)
|
||||
{
|
||||
const auto & columns = part.getColumns();
|
||||
auto all_columns_are_fixed_size = columns.end() == std::find_if(
|
||||
columns.begin(), columns.end(),
|
||||
[](const auto & col){ return col.type->haveMaximumSizeOfValue() == false; });
|
||||
|
||||
if (all_columns_are_fixed_size)
|
||||
{
|
||||
size_t approx_size = 0;
|
||||
for (const auto & col : columns)
|
||||
approx_size += col.type->getMaximumSizeOfValueInMemory() * fixed_index_granularity;
|
||||
|
||||
if (!index_granularity_bytes)
|
||||
return approx_size;
|
||||
|
||||
return std::min(index_granularity_bytes, approx_size);
|
||||
}
|
||||
|
||||
const size_t approx_size = static_cast<size_t>(std::round(static_cast<double>(part.getBytesOnDisk()) / part.getMarksCount()));
|
||||
|
||||
if (!index_granularity_bytes)
|
||||
return approx_size;
|
||||
|
||||
return std::min(index_granularity_bytes, approx_size);
|
||||
ColumnSize columns_size{};
|
||||
for (const auto & col_name : columns_to_read)
|
||||
columns_size.add(part.getColumnSize(col_name));
|
||||
return columns_size.data_compressed / part.getMarksCount();
|
||||
}
|
||||
|
||||
MergeTreePrefetchedReadPool::PartsInfos MergeTreePrefetchedReadPool::getPartsInfos(
|
||||
@ -347,7 +328,7 @@ MergeTreePrefetchedReadPool::PartsInfos MergeTreePrefetchedReadPool::getPartsInf
|
||||
for (const auto & range : part.ranges)
|
||||
part_info->sum_marks += range.end - range.begin;
|
||||
|
||||
part_info->approx_size_of_mark = getApproxSizeOfGranule(*part_info->data_part);
|
||||
part_info->approx_size_of_mark = getApproximateSizeOfGranule(*part_info->data_part, column_names);
|
||||
|
||||
const auto task_columns = getReadTaskColumns(
|
||||
part_reader_info,
|
||||
@ -357,7 +338,7 @@ MergeTreePrefetchedReadPool::PartsInfos MergeTreePrefetchedReadPool::getPartsInf
|
||||
prewhere_info,
|
||||
actions_settings,
|
||||
reader_settings,
|
||||
/*with_subcolumns=*/ true);
|
||||
/* with_subcolumns */ true);
|
||||
|
||||
part_info->size_predictor = !predict_block_size_bytes
|
||||
? nullptr
|
||||
@ -421,10 +402,6 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
|
||||
}
|
||||
|
||||
size_t min_prefetch_step_marks = 0;
|
||||
if (settings.filesystem_prefetches_limit && settings.filesystem_prefetches_limit < sum_marks)
|
||||
{
|
||||
min_prefetch_step_marks = static_cast<size_t>(std::round(static_cast<double>(sum_marks) / settings.filesystem_prefetches_limit));
|
||||
}
|
||||
|
||||
for (const auto & part : parts_infos)
|
||||
{
|
||||
@ -437,12 +414,6 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
|
||||
part->prefetch_step_marks = std::max<size_t>(
|
||||
1, static_cast<size_t>(std::round(static_cast<double>(settings.filesystem_prefetch_step_bytes) / part->approx_size_of_mark)));
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Experimentally derived ratio.
|
||||
part->prefetch_step_marks = static_cast<size_t>(
|
||||
std::round(std::pow(std::max<size_t>(1, static_cast<size_t>(std::round(sum_marks / 1000))), double(1.5))));
|
||||
}
|
||||
|
||||
/// This limit is important to avoid spikes of slow aws getObject requests when parallelizing within one file.
|
||||
/// (The default is taken from here https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/use-byte-range-fetches.html).
|
||||
@ -450,13 +421,13 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
|
||||
&& settings.filesystem_prefetch_min_bytes_for_single_read_task
|
||||
&& part->approx_size_of_mark < settings.filesystem_prefetch_min_bytes_for_single_read_task)
|
||||
{
|
||||
|
||||
const size_t new_min_prefetch_step_marks = static_cast<size_t>(
|
||||
const size_t min_prefetch_step_marks_by_total_cols = static_cast<size_t>(
|
||||
std::ceil(static_cast<double>(settings.filesystem_prefetch_min_bytes_for_single_read_task) / part->approx_size_of_mark));
|
||||
/// At least one task to start working on it right now and another one to prefetch in the meantime.
|
||||
const size_t new_min_prefetch_step_marks = std::min<size_t>(min_prefetch_step_marks_by_total_cols, sum_marks / threads / 2);
|
||||
if (min_prefetch_step_marks < new_min_prefetch_step_marks)
|
||||
{
|
||||
LOG_TEST(
|
||||
log, "Increasing min prefetch step from {} to {}", min_prefetch_step_marks, new_min_prefetch_step_marks);
|
||||
LOG_DEBUG(log, "Increasing min prefetch step from {} to {}", min_prefetch_step_marks, new_min_prefetch_step_marks);
|
||||
|
||||
min_prefetch_step_marks = new_min_prefetch_step_marks;
|
||||
}
|
||||
@ -464,25 +435,33 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
|
||||
|
||||
if (part->prefetch_step_marks < min_prefetch_step_marks)
|
||||
{
|
||||
LOG_TEST(
|
||||
log, "Increasing prefetch step from {} to {} because of the prefetches limit {}",
|
||||
part->prefetch_step_marks, min_prefetch_step_marks, settings.filesystem_prefetches_limit);
|
||||
LOG_DEBUG(log, "Increasing prefetch step from {} to {}", part->prefetch_step_marks, min_prefetch_step_marks);
|
||||
|
||||
part->prefetch_step_marks = min_prefetch_step_marks;
|
||||
}
|
||||
|
||||
LOG_TEST(log,
|
||||
"Part: {}, sum_marks: {}, approx mark size: {}, prefetch_step_bytes: {}, prefetch_step_marks: {}, (ranges: {})",
|
||||
part->data_part->name, part->sum_marks, part->approx_size_of_mark,
|
||||
settings.filesystem_prefetch_step_bytes, part->prefetch_step_marks, toString(part->ranges));
|
||||
LOG_DEBUG(
|
||||
log,
|
||||
"Part: {}, sum_marks: {}, approx mark size: {}, prefetch_step_bytes: {}, prefetch_step_marks: {}, (ranges: {})",
|
||||
part->data_part->name,
|
||||
part->sum_marks,
|
||||
part->approx_size_of_mark,
|
||||
settings.filesystem_prefetch_step_bytes,
|
||||
part->prefetch_step_marks,
|
||||
toString(part->ranges));
|
||||
}
|
||||
|
||||
const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;
|
||||
|
||||
LOG_DEBUG(
|
||||
log,
|
||||
"Sum marks: {}, threads: {}, min_marks_per_thread: {}, result prefetch step marks: {}, prefetches limit: {}, total_size_approx: {}",
|
||||
sum_marks, threads, min_marks_per_thread, settings.filesystem_prefetch_step_bytes, settings.filesystem_prefetches_limit, total_size_approx);
|
||||
"Sum marks: {}, threads: {}, min_marks_per_thread: {}, min prefetch step marks: {}, prefetches limit: {}, total_size_approx: {}",
|
||||
sum_marks,
|
||||
threads,
|
||||
min_marks_per_thread,
|
||||
min_prefetch_step_marks,
|
||||
settings.filesystem_prefetches_limit,
|
||||
total_size_approx);
|
||||
|
||||
size_t allowed_memory_usage = settings.filesystem_prefetch_max_memory_usage;
|
||||
if (!allowed_memory_usage)
|
||||
@ -492,6 +471,7 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
|
||||
: std::nullopt;
|
||||
|
||||
ThreadsTasks result_threads_tasks;
|
||||
size_t total_tasks = 0;
|
||||
for (size_t i = 0, part_idx = 0; i < threads && part_idx < parts_infos.size(); ++i)
|
||||
{
|
||||
int64_t need_marks = min_marks_per_thread;
|
||||
@ -606,12 +586,11 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
|
||||
++priority.value;
|
||||
|
||||
result_threads_tasks[i].push_back(std::move(read_task));
|
||||
++total_tasks;
|
||||
}
|
||||
}
|
||||
|
||||
LOG_TEST(
|
||||
log, "Result tasks {} for {} threads: {}",
|
||||
result_threads_tasks.size(), threads, dumpTasks(result_threads_tasks));
|
||||
LOG_TEST(log, "Result tasks {} for {} threads: {}", total_tasks, threads, dumpTasks(result_threads_tasks));
|
||||
|
||||
return result_threads_tasks;
|
||||
}
|
||||
|
@ -0,0 +1,2 @@
|
||||
1
|
||||
1
|
59
tests/queries/0_stateless/02789_reading_from_s3_with_connection_pool.sh
Executable file
59
tests/queries/0_stateless/02789_reading_from_s3_with_connection_pool.sh
Executable file
@ -0,0 +1,59 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-fasttest, no-random-settings, no-replicated-database
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
${CLICKHOUSE_CLIENT} -nm --query "
|
||||
DROP TABLE IF EXISTS test_s3;
|
||||
|
||||
CREATE TABLE test_s3 (a UInt64, b UInt64)
|
||||
ENGINE = MergeTree ORDER BY a
|
||||
SETTINGS disk = 's3_disk', min_bytes_for_wide_part = 0;
|
||||
|
||||
INSERT INTO test_s3 SELECT number, number FROM numbers_mt(1e7);
|
||||
"
|
||||
query="SELECT a, b FROM test_s3"
|
||||
query_id=$(${CLICKHOUSE_CLIENT} --query "select queryID() from ($query) limit 1" 2>&1)
|
||||
${CLICKHOUSE_CLIENT} --query "SYSTEM FLUSH LOGS"
|
||||
${CLICKHOUSE_CLIENT} -nm --query "
|
||||
WITH
|
||||
ProfileEvents['ReadBufferFromS3ResetSessions'] AS reset,
|
||||
ProfileEvents['ReadBufferFromS3PreservedSessions'] AS preserved
|
||||
SELECT preserved > reset
|
||||
FROM system.query_log
|
||||
WHERE type = 'QueryFinish'
|
||||
AND current_database = currentDatabase()
|
||||
AND query_id='$query_id';
|
||||
"
|
||||
|
||||
|
||||
# Test connection pool in ReadWriteBufferFromHTTP
|
||||
|
||||
query_id=$(${CLICKHOUSE_CLIENT} -nq "
|
||||
create table mut (n int, m int, k int) engine=ReplicatedMergeTree('/test/02441/{database}/mut', '1') order by n;
|
||||
set insert_keeper_fault_injection_probability=0;
|
||||
insert into mut values (1, 2, 3), (10, 20, 30);
|
||||
|
||||
system stop merges mut;
|
||||
alter table mut delete where n = 10;
|
||||
|
||||
select queryID() from(
|
||||
-- a funny way to wait for a MUTATE_PART to be assigned
|
||||
select sleepEachRow(2) from url('http://localhost:8123/?param_tries={1..10}&query=' || encodeURLComponent(
|
||||
'select 1 where ''MUTATE_PART'' not in (select type from system.replication_queue where database=''' || currentDatabase() || ''' and table=''mut'')'
|
||||
), 'LineAsString', 's String')
|
||||
-- queryID() will be returned for each row, since the query above doesn't return anything we need to return a fake row
|
||||
union all
|
||||
select 1
|
||||
) limit 1 settings max_threads=1;
|
||||
" 2>&1)
|
||||
${CLICKHOUSE_CLIENT} --query "SYSTEM FLUSH LOGS"
|
||||
${CLICKHOUSE_CLIENT} -nm --query "
|
||||
SELECT ProfileEvents['ReadWriteBufferFromHTTPPreservedSessions'] > 0
|
||||
FROM system.query_log
|
||||
WHERE type = 'QueryFinish'
|
||||
AND current_database = currentDatabase()
|
||||
AND query_id='$query_id';
|
||||
"
|
Loading…
Reference in New Issue
Block a user