Merge branch 'master' into output-format-json-compact-with-progress

This commit is contained in:
Alexey Korepanov 2024-09-02 22:22:57 +02:00
commit ba9c8a1625
34 changed files with 721 additions and 183 deletions

View File

@ -19,6 +19,8 @@
#include <ios>
#include <memory>
#include <functional>
#include "Poco/Any.h"
#include "Poco/Buffer.h"
#include "Poco/Exception.h"
@ -33,6 +35,27 @@ namespace Net
{
class IHTTPSessionDataHooks
/// Interface to control stream of data bytes being sent or received though socket by HTTPSession
/// It allows to monitor, throttle and schedule data streams with syscall granulatrity
{
public:
virtual ~IHTTPSessionDataHooks() = default;
virtual void atStart(int bytes) = 0;
/// Called before sending/receiving data `bytes` to/from socket.
virtual void atFinish(int bytes) = 0;
/// Called when sending/receiving of data `bytes` is successfully finished.
virtual void atFail() = 0;
/// If an error occurred during send/receive `fail()` is called instead of `finish()`.
};
using HTTPSessionDataHooksPtr = std::shared_ptr<IHTTPSessionDataHooks>;
class Net_API HTTPSession
/// HTTPSession implements basic HTTP session management
/// for both HTTP clients and HTTP servers.
@ -73,6 +96,12 @@ namespace Net
Poco::Timespan getReceiveTimeout() const;
/// Returns receive timeout for the HTTP session.
void setSendDataHooks(const HTTPSessionDataHooksPtr & sendDataHooks = {});
/// Sets data hooks that will be called on every sent to the socket.
void setReceiveDataHooks(const HTTPSessionDataHooksPtr & receiveDataHooks = {});
/// Sets data hooks that will be called on every receive from the socket.
bool connected() const;
/// Returns true if the underlying socket is connected.
@ -211,6 +240,10 @@ namespace Net
Poco::Exception * _pException;
Poco::Any _data;
// Data hooks
HTTPSessionDataHooksPtr _sendDataHooks;
HTTPSessionDataHooksPtr _receiveDataHooks;
friend class HTTPStreamBuf;
friend class HTTPHeaderStreamBuf;
friend class HTTPFixedLengthStreamBuf;
@ -246,6 +279,16 @@ namespace Net
return _receiveTimeout;
}
inline void HTTPSession::setSendDataHooks(const HTTPSessionDataHooksPtr & sendDataHooks)
{
_sendDataHooks = sendDataHooks;
}
inline void HTTPSession::setReceiveDataHooks(const HTTPSessionDataHooksPtr & receiveDataHooks)
{
_receiveDataHooks = receiveDataHooks;
}
inline StreamSocket & HTTPSession::socket()
{
return _socket;

View File

@ -128,14 +128,14 @@ int HTTPSession::get()
{
if (_pCurrent == _pEnd)
refill();
if (_pCurrent < _pEnd)
return *_pCurrent++;
else
return std::char_traits<char>::eof();
}
int HTTPSession::peek()
{
if (_pCurrent == _pEnd)
@ -147,7 +147,7 @@ int HTTPSession::peek()
return std::char_traits<char>::eof();
}
int HTTPSession::read(char* buffer, std::streamsize length)
{
if (_pCurrent < _pEnd)
@ -166,10 +166,17 @@ int HTTPSession::write(const char* buffer, std::streamsize length)
{
try
{
return _socket.sendBytes(buffer, (int) length);
if (_sendDataHooks)
_sendDataHooks->atStart((int) length);
int result = _socket.sendBytes(buffer, (int) length);
if (_sendDataHooks)
_sendDataHooks->atFinish(result);
return result;
}
catch (Poco::Exception& exc)
{
if (_sendDataHooks)
_sendDataHooks->atFail();
setException(exc);
throw;
}
@ -180,10 +187,17 @@ int HTTPSession::receive(char* buffer, int length)
{
try
{
return _socket.receiveBytes(buffer, length);
if (_receiveDataHooks)
_receiveDataHooks->atStart(length);
int result = _socket.receiveBytes(buffer, length);
if (_receiveDataHooks)
_receiveDataHooks->atFinish(result);
return result;
}
catch (Poco::Exception& exc)
{
if (_receiveDataHooks)
_receiveDataHooks->atFail();
setException(exc);
throw;
}

View File

@ -63,7 +63,7 @@ bool checkIsBrokenTimeout()
SocketImpl::SocketImpl():
_sockfd(POCO_INVALID_SOCKET),
_blocking(true),
_blocking(true),
_isBrokenTimeout(checkIsBrokenTimeout())
{
}
@ -82,7 +82,7 @@ SocketImpl::~SocketImpl()
close();
}
SocketImpl* SocketImpl::acceptConnection(SocketAddress& clientAddr)
{
if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
@ -118,7 +118,7 @@ void SocketImpl::connect(const SocketAddress& address)
rc = ::connect(_sockfd, address.addr(), address.length());
}
while (rc != 0 && lastError() == POCO_EINTR);
if (rc != 0)
if (rc != 0)
{
int err = lastError();
error(err, address.toString());
@ -205,7 +205,7 @@ void SocketImpl::bind6(const SocketAddress& address, bool reuseAddress, bool reu
#if defined(POCO_HAVE_IPv6)
if (address.family() != SocketAddress::IPv6)
throw Poco::InvalidArgumentException("SocketAddress must be an IPv6 address");
if (_sockfd == POCO_INVALID_SOCKET)
{
init(address.af());
@ -226,11 +226,11 @@ void SocketImpl::bind6(const SocketAddress& address, bool reuseAddress, bool reu
#endif
}
void SocketImpl::listen(int backlog)
{
if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
int rc = ::listen(_sockfd, backlog);
if (rc != 0) error();
}
@ -254,7 +254,7 @@ void SocketImpl::shutdownReceive()
if (rc != 0) error();
}
void SocketImpl::shutdownSend()
{
if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
@ -263,7 +263,7 @@ void SocketImpl::shutdownSend()
if (rc != 0) error();
}
void SocketImpl::shutdown()
{
if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
@ -318,7 +318,7 @@ int SocketImpl::receiveBytes(void* buffer, int length, int flags)
throw TimeoutException();
}
}
int rc;
do
{
@ -326,7 +326,7 @@ int SocketImpl::receiveBytes(void* buffer, int length, int flags)
rc = ::recv(_sockfd, reinterpret_cast<char*>(buffer), length, flags);
}
while (blocking && rc < 0 && lastError() == POCO_EINTR);
if (rc < 0)
if (rc < 0)
{
int err = lastError();
if ((err == POCO_EAGAIN || err == POCO_EWOULDBLOCK) && !blocking)
@ -364,7 +364,7 @@ int SocketImpl::receiveFrom(void* buffer, int length, SocketAddress& address, in
throw TimeoutException();
}
}
sockaddr_storage abuffer;
struct sockaddr* pSA = reinterpret_cast<struct sockaddr*>(&abuffer);
poco_socklen_t saLen = sizeof(abuffer);
@ -451,7 +451,7 @@ bool SocketImpl::pollImpl(Poco::Timespan& remainingTime, int mode)
}
while (rc < 0 && lastError() == POCO_EINTR);
if (rc < 0) error();
return rc > 0;
return rc > 0;
#else
@ -494,7 +494,7 @@ bool SocketImpl::pollImpl(Poco::Timespan& remainingTime, int mode)
}
while (rc < 0 && errorCode == POCO_EINTR);
if (rc < 0) error(errorCode);
return rc > 0;
return rc > 0;
#endif // POCO_HAVE_FD_POLL
}
@ -504,13 +504,13 @@ bool SocketImpl::poll(const Poco::Timespan& timeout, int mode)
Poco::Timespan remainingTime(timeout);
return pollImpl(remainingTime, mode);
}
void SocketImpl::setSendBufferSize(int size)
{
setOption(SOL_SOCKET, SO_SNDBUF, size);
}
int SocketImpl::getSendBufferSize()
{
int result;
@ -524,7 +524,7 @@ void SocketImpl::setReceiveBufferSize(int size)
setOption(SOL_SOCKET, SO_RCVBUF, size);
}
int SocketImpl::getReceiveBufferSize()
{
int result;
@ -570,7 +570,7 @@ Poco::Timespan SocketImpl::getReceiveTimeout()
return result;
}
SocketAddress SocketImpl::address()
{
if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
@ -581,7 +581,7 @@ SocketAddress SocketImpl::address()
int rc = ::getsockname(_sockfd, pSA, &saLen);
if (rc == 0)
return SocketAddress(pSA, saLen);
else
else
error();
return SocketAddress();
}

View File

@ -68,7 +68,10 @@ public:
if (data().isEqualTo(to.data()))
counter += to.counter;
else if (!data().has() || counter < to.counter)
{
data().set(to.data(), arena);
counter = to.counter - counter;
}
else
counter -= to.counter;
}

View File

@ -75,9 +75,9 @@
M(GlobalThread, "Number of threads in global thread pool.") \
M(GlobalThreadActive, "Number of threads in global thread pool running a task.") \
M(GlobalThreadScheduled, "Number of queued or active jobs in global thread pool.") \
M(LocalThread, "Number of threads in local thread pools. The threads in local thread pools are taken from the global thread pool.") \
M(LocalThreadActive, "Number of threads in local thread pools running a task.") \
M(LocalThreadScheduled, "Number of queued or active jobs in local thread pools.") \
M(LocalThread, "Obsolete. Number of threads in local thread pools. The threads in local thread pools are taken from the global thread pool.") \
M(LocalThreadActive, "Obsolete. Number of threads in local thread pools running a task.") \
M(LocalThreadScheduled, "Obsolete. Number of queued or active jobs in local thread pools.") \
M(MergeTreeDataSelectExecutorThreads, "Number of threads in the MergeTreeDataSelectExecutor thread pool.") \
M(MergeTreeDataSelectExecutorThreadsActive, "Number of threads in the MergeTreeDataSelectExecutor thread pool running a task.") \
M(MergeTreeDataSelectExecutorThreadsScheduled, "Number of queued or active jobs in the MergeTreeDataSelectExecutor thread pool.") \
@ -292,6 +292,9 @@
M(DistrCacheWriteRequests, "Number of executed Write requests to Distributed Cache") \
M(DistrCacheServerConnections, "Number of open connections to ClickHouse server from Distributed Cache") \
\
M(SchedulerIOReadScheduled, "Number of IO reads are being scheduled currently") \
M(SchedulerIOWriteScheduled, "Number of IO writes are being scheduled currently") \
\
M(StorageConnectionsStored, "Total count of sessions stored in the session pool for storages") \
M(StorageConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for storages") \
\

View File

@ -113,6 +113,56 @@ std::string_view CurrentThread::getQueryId()
return current_thread->getQueryId();
}
void CurrentThread::attachReadResource(ResourceLink link)
{
if (unlikely(!current_thread))
return;
if (current_thread->read_resource_link)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has been already attached to read resource", std::to_string(getThreadId()));
current_thread->read_resource_link = link;
}
void CurrentThread::detachReadResource()
{
if (unlikely(!current_thread))
return;
if (!current_thread->read_resource_link)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has not been attached to read resource", std::to_string(getThreadId()));
current_thread->read_resource_link.reset();
}
ResourceLink CurrentThread::getReadResourceLink()
{
if (unlikely(!current_thread))
return {};
return current_thread->read_resource_link;
}
void CurrentThread::attachWriteResource(ResourceLink link)
{
if (unlikely(!current_thread))
return;
if (current_thread->write_resource_link)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has been already attached to write resource", std::to_string(getThreadId()));
current_thread->write_resource_link = link;
}
void CurrentThread::detachWriteResource()
{
if (unlikely(!current_thread))
return;
if (!current_thread->write_resource_link)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has not been attached to write resource", std::to_string(getThreadId()));
current_thread->write_resource_link.reset();
}
ResourceLink CurrentThread::getWriteResourceLink()
{
if (unlikely(!current_thread))
return {};
return current_thread->write_resource_link;
}
MemoryTracker * CurrentThread::getUserMemoryTracker()
{
if (unlikely(!current_thread))

View File

@ -2,6 +2,7 @@
#include <Interpreters/Context_fwd.h>
#include <Common/ThreadStatus.h>
#include <Common/Scheduler/ResourceLink.h>
#include <memory>
#include <string>
@ -23,7 +24,6 @@ class QueryStatus;
struct Progress;
class InternalTextLogsQueue;
/** Collection of static methods to work with thread-local objects.
* Allows to attach and detach query/process (thread group) to a thread
* (to calculate query-related metrics and to allow to obtain query-related data from a thread).
@ -92,6 +92,14 @@ public:
static std::string_view getQueryId();
// For IO Scheduling
static void attachReadResource(ResourceLink link);
static void detachReadResource();
static ResourceLink getReadResourceLink();
static void attachWriteResource(ResourceLink link);
static void detachWriteResource();
static ResourceLink getWriteResourceLink();
/// Initializes query with current thread as master thread in constructor, and detaches it in destructor
struct QueryScope : private boost::noncopyable
{
@ -102,6 +110,39 @@ public:
void logPeakMemoryUsage();
bool log_peak_memory_usage_in_destructor = true;
};
/// Scoped attach/detach of IO resource links
struct IOScope : private boost::noncopyable
{
explicit IOScope(ResourceLink read_resource_link, ResourceLink write_resource_link)
{
if (read_resource_link)
{
attachReadResource(read_resource_link);
read_attached = true;
}
if (write_resource_link)
{
attachWriteResource(write_resource_link);
write_attached = true;
}
}
explicit IOScope(const IOSchedulingSettings & settings)
: IOScope(settings.read_resource_link, settings.write_resource_link)
{}
~IOScope()
{
if (read_attached)
detachReadResource();
if (write_attached)
detachWriteResource();
}
bool read_attached = false;
bool write_attached = false;
};
};
}

View File

@ -2,6 +2,7 @@
#include <Common/HostResolvePool.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <Common/CurrentMetrics.h>
#include <Common/logger_useful.h>
#include <Common/Exception.h>
@ -9,6 +10,7 @@
#include <Common/ProxyConfiguration.h>
#include <Common/MemoryTrackerSwitcher.h>
#include <Common/SipHash.h>
#include <Common/Scheduler/ResourceGuard.h>
#include <Common/proxyConfigurationToPocoProxyConfig.h>
#include <Poco/Net/HTTPChunkedStream.h>
@ -236,6 +238,59 @@ public:
};
// Session data hooks implementation for integration with resource scheduler.
// Hooks are created per every request-response pair and are registered/unregistered in HTTP session.
// * `atStart()` send resource request to the scheduler every time HTTP session is going to send or receive
// data to/from socket. `start()` waits for the scheduler confirmation. This way scheduler might
// throttle and/or schedule socket data streams.
// * `atFinish()` hook is called on successful socket read/write operation.
// It informs the scheduler that operation is complete, which allows the scheduler to control the total
// amount of in-flight bytes and/or operations.
// * `atFail()` hook is called on failure of socket operation. The purpose is to correct the amount of bytes
// passed through the scheduler queue to ensure fair bandwidth allocation even in presence of errors.
struct ResourceGuardSessionDataHooks : public Poco::Net::IHTTPSessionDataHooks
{
ResourceGuardSessionDataHooks(ResourceLink link_, const ResourceGuard::Metrics * metrics, LoggerPtr log_, const String & method, const String & uri)
: link(link_)
, log(log_)
, http_request(method + " " + uri)
{
request.metrics = metrics;
chassert(link);
}
~ResourceGuardSessionDataHooks() override
{
request.assertFinished(); // Never destruct with an active request
}
void atStart(int bytes) override
{
Stopwatch timer;
request.enqueue(bytes, link);
request.wait();
timer.stop();
if (timer.elapsedMilliseconds() >= 5000)
LOG_INFO(log, "Resource request took too long to finish: {} ms for {}", timer.elapsedMilliseconds(), http_request);
}
void atFinish(int bytes) override
{
request.finish(bytes, link);
}
void atFail() override
{
request.finish(0, link);
}
ResourceLink link;
ResourceGuard::Request request;
LoggerPtr log;
String http_request;
};
// EndpointConnectionPool manage connections to the endpoint
// Features:
// - it uses HostResolver for address selecting. See Common/HostResolver.h for more info.
@ -246,8 +301,6 @@ public:
// - `Session::reconnect()` uses the pool as well
// - comprehensive sensors
// - session is reused according its inner state, automatically
template <class Session>
class EndpointConnectionPool : public std::enable_shared_from_this<EndpointConnectionPool<Session>>, public IExtendedPool
{
@ -337,6 +390,13 @@ private:
std::ostream & sendRequest(Poco::Net::HTTPRequest & request) override
{
auto idle = idleTime();
// Set data hooks for IO scheduling
if (ResourceLink link = CurrentThread::getReadResourceLink())
Session::setReceiveDataHooks(std::make_shared<ResourceGuardSessionDataHooks>(link, ResourceGuard::Metrics::getIORead(), log, request.getMethod(), request.getURI()));
if (ResourceLink link = CurrentThread::getWriteResourceLink())
Session::setSendDataHooks(std::make_shared<ResourceGuardSessionDataHooks>(link, ResourceGuard::Metrics::getIOWrite(), log, request.getMethod(), request.getURI()));
std::ostream & result = Session::sendRequest(request);
result.exceptions(std::ios::badbit);
@ -393,6 +453,8 @@ private:
}
}
response_stream = nullptr;
Session::setSendDataHooks();
Session::setReceiveDataHooks();
group->atConnectionDestroy();

View File

@ -86,6 +86,20 @@
M(NetworkReceiveBytes, "Total number of bytes received from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkSendBytes, "Total number of bytes send to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
\
M(GlobalThreadPoolExpansions, "Counts the total number of times new threads have been added to the global thread pool. This metric indicates the frequency of expansions in the global thread pool to accommodate increased processing demands.") \
M(GlobalThreadPoolShrinks, "Counts the total number of times the global thread pool has shrunk by removing threads. This occurs when the number of idle threads exceeds max_thread_pool_free_size, indicating adjustments in the global thread pool size in response to decreased thread utilization.") \
M(GlobalThreadPoolThreadCreationMicroseconds, "Total time spent waiting for new threads to start.") \
M(GlobalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the global thread pool.") \
M(GlobalThreadPoolJobs, "Counts the number of jobs that have been pushed to the global thread pool.") \
M(GlobalThreadPoolJobWaitTimeMicroseconds, "Measures the elapsed time from when a job is scheduled in the thread pool to when it is picked up for execution by a worker thread. This metric helps identify delays in job processing, indicating the responsiveness of the thread pool to new tasks.") \
M(LocalThreadPoolExpansions, "Counts the total number of times threads have been borrowed from the global thread pool to expand local thread pools.") \
M(LocalThreadPoolShrinks, "Counts the total number of times threads have been returned to the global thread pool from local thread pools.") \
M(LocalThreadPoolThreadCreationMicroseconds, "Total time local thread pools have spent waiting to borrow a thread from the global pool.") \
M(LocalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the local thread pools.") \
M(LocalThreadPoolJobs, "Counts the number of jobs that have been pushed to the local thread pools.") \
M(LocalThreadPoolBusyMicroseconds, "Total time threads have spent executing the actual work.") \
M(LocalThreadPoolJobWaitTimeMicroseconds, "Measures the elapsed time from when a job is scheduled in the thread pool to when it is picked up for execution by a worker thread. This metric helps identify delays in job processing, indicating the responsiveness of the thread pool to new tasks.") \
\
M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \
M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \
M(DiskS3PutRequestThrottlerCount, "Number of DiskS3 PUT, COPY, POST and LIST requests passed through throttler.") \
@ -106,6 +120,13 @@
M(PartsWithAppliedMutationsOnFly, "Total number of parts for which there was any mutation applied on fly") \
M(MutationsAppliedOnFlyInAllParts, "The sum of number of applied mutations on-fly for part among all read parts") \
\
M(SchedulerIOReadRequests, "Resource requests passed through scheduler for IO reads.") \
M(SchedulerIOReadBytes, "Bytes passed through scheduler for IO reads.") \
M(SchedulerIOReadWaitMicroseconds, "Total time a query was waiting on resource requests for IO reads.") \
M(SchedulerIOWriteRequests, "Resource requests passed through scheduler for IO writes.") \
M(SchedulerIOWriteBytes, "Bytes passed through scheduler for IO writes.") \
M(SchedulerIOWriteWaitMicroseconds, "Total time a query was waiting on resource requests for IO writes.") \
\
M(QueryMaskingRulesMatch, "Number of times query masking rules was successfully matched.") \
\
M(ReplicatedPartFetches, "Number of times a data part was downloaded from replica of a ReplicatedMergeTree table.") \

View File

@ -22,10 +22,13 @@ public:
{}
// Wrapper for `enqueueRequest()` that should be used to account for available resource budget
void enqueueRequestUsingBudget(ResourceRequest * request)
// Returns `estimated_cost` that should be passed later to `adjustBudget()`
[[ nodiscard ]] ResourceCost enqueueRequestUsingBudget(ResourceRequest * request)
{
request->cost = budget.ask(request->cost);
ResourceCost estimated_cost = request->cost;
request->cost = budget.ask(estimated_cost);
enqueueRequest(request);
return estimated_cost;
}
// Should be called to account for difference between real and estimated costs
@ -34,18 +37,6 @@ public:
budget.adjust(estimated_cost, real_cost);
}
// Adjust budget to account for extra consumption of `cost` resource units
void consumeBudget(ResourceCost cost)
{
adjustBudget(0, cost);
}
// Adjust budget to account for requested, but not consumed `cost` resource units
void accumulateBudget(ResourceCost cost)
{
adjustBudget(cost, 0);
}
/// Enqueue new request to be executed using underlying resource.
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual void enqueueRequest(ResourceRequest * request) = 0;

View File

@ -232,12 +232,13 @@ struct ResourceTestManager : public ResourceTestBase
ResourceTestManager & t;
Guard(ResourceTestManager & t_, ResourceLink link_, ResourceCost cost)
: ResourceGuard(link_, cost, PostponeLocking)
: ResourceGuard(ResourceGuard::Metrics::getIOWrite(), link_, cost, Lock::Defer)
, t(t_)
{
t.onEnqueue(link);
lock();
t.onExecute(link);
consume(cost);
}
};
@ -310,8 +311,9 @@ struct ResourceTestManager : public ResourceTestBase
// NOTE: actually leader's request(s) make their own small busy period.
void blockResource(ResourceLink link)
{
ResourceGuard g(link, 1, ResourceGuard::PostponeLocking);
ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, 1, ResourceGuard::Lock::Defer);
g.lock();
g.consume(1);
// NOTE: at this point we assume resource to be blocked by single request (<max_requests>1</max_requests>)
busy_period.arrive_and_wait(); // (1) notify all followers that resource is blocked
busy_period.arrive_and_wait(); // (2) wait all followers to enqueue their requests
@ -320,10 +322,11 @@ struct ResourceTestManager : public ResourceTestBase
{
getLinkData(link).left += total_requests + 1;
busy_period.arrive_and_wait(); // (1) wait leader to block resource
ResourceGuard g(link, cost, ResourceGuard::PostponeLocking);
ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, cost, ResourceGuard::Lock::Defer);
onEnqueue(link);
busy_period.arrive_and_wait(); // (2) notify leader to unblock
g.lock();
g.consume(cost);
onExecute(link);
}
};

View File

@ -36,11 +36,16 @@ TEST(SchedulerDynamicResourceManager, Smoke)
for (int i = 0; i < 10; i++)
{
ResourceGuard gA(cA->get("res1"), ResourceGuard::PostponeLocking);
ResourceGuard gA(ResourceGuard::Metrics::getIOWrite(), cA->get("res1"), 1, ResourceGuard::Lock::Defer);
gA.lock();
gA.consume(1);
gA.unlock();
ResourceGuard gB(cB->get("res1"));
ResourceGuard gB(ResourceGuard::Metrics::getIOWrite(), cB->get("res1"));
gB.unlock();
ResourceGuard gC(ResourceGuard::Metrics::getIORead(), cB->get("res1"));
gB.consume(2);
}
}

View File

@ -1,11 +1,13 @@
#include <gtest/gtest.h>
#include <Common/Scheduler/SchedulerRoot.h>
#include <Common/Scheduler/Nodes/tests/ResourceTest.h>
#include <Common/Scheduler/SchedulerRoot.h>
#include <Common/randomSeed.h>
#include <barrier>
#include <future>
#include <pcg_random.hpp>
using namespace DB;
@ -22,6 +24,17 @@ struct ResourceTest : public ResourceTestBase
{
scheduler.stop(true);
}
std::mutex rng_mutex;
pcg64 rng{randomSeed()};
template <typename T>
T randomInt(T from, T to)
{
std::uniform_int_distribution<T> distribution(from, to);
std::lock_guard lock(rng_mutex);
return distribution(rng);
}
};
struct ResourceHolder
@ -109,26 +122,55 @@ TEST(SchedulerRoot, Smoke)
r2.registerResource();
{
ResourceGuard rg(a);
ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), a);
EXPECT_TRUE(fc1->requests.contains(&rg.request));
rg.consume(1);
}
{
ResourceGuard rg(b);
ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), b);
EXPECT_TRUE(fc1->requests.contains(&rg.request));
rg.consume(1);
}
{
ResourceGuard rg(c);
ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), c);
EXPECT_TRUE(fc2->requests.contains(&rg.request));
rg.consume(1);
}
{
ResourceGuard rg(d);
ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), d);
EXPECT_TRUE(fc2->requests.contains(&rg.request));
rg.consume(1);
}
}
TEST(SchedulerRoot, Budget)
{
ResourceTest t;
ResourceHolder r1(t);
r1.add<ConstraintTest>("/", "<max_requests>1</max_requests>");
r1.add<PriorityPolicy>("/prio");
auto a = r1.addQueue("/prio/A", "");
r1.registerResource();
ResourceCost total_real_cost = 0;
int total_requests = 10;
for (int i = 0 ; i < total_requests; i++)
{
ResourceCost est_cost = t.randomInt(1, 10);
ResourceCost real_cost = t.randomInt(0, 10);
ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), a, est_cost);
rg.consume(real_cost);
total_real_cost += real_cost;
}
EXPECT_EQ(total_requests, a.queue->dequeued_requests);
EXPECT_EQ(total_real_cost, a.queue->dequeued_cost - a.queue->getBudget());
}
TEST(SchedulerRoot, Cancel)
{
ResourceTest t;

View File

@ -1,25 +0,0 @@
#include <Common/Scheduler/ISchedulerQueue.h>
#include <Common/Scheduler/ResourceLink.h>
#include <Common/Scheduler/ResourceRequest.h>
namespace DB
{
void ResourceLink::adjust(ResourceCost estimated_cost, ResourceCost real_cost) const
{
if (queue)
queue->adjustBudget(estimated_cost, real_cost);
}
void ResourceLink::consumed(ResourceCost cost) const
{
if (queue)
queue->consumeBudget(cost);
}
void ResourceLink::accumulate(DB::ResourceCost cost) const
{
if (queue)
queue->accumulateBudget(cost);
}
}

View File

@ -7,10 +7,30 @@
#include <Common/Scheduler/ResourceRequest.h>
#include <Common/Scheduler/ResourceLink.h>
#include <Common/CurrentThread.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <condition_variable>
#include <mutex>
namespace ProfileEvents
{
extern const Event SchedulerIOReadRequests;
extern const Event SchedulerIOReadBytes;
extern const Event SchedulerIOReadWaitMicroseconds;
extern const Event SchedulerIOWriteRequests;
extern const Event SchedulerIOWriteBytes;
extern const Event SchedulerIOWriteWaitMicroseconds;
}
namespace CurrentMetrics
{
extern const Metric SchedulerIOReadScheduled;
extern const Metric SchedulerIOWriteScheduled;
}
namespace DB
{
@ -22,12 +42,42 @@ namespace DB
class ResourceGuard
{
public:
enum ResourceGuardCtor
enum class Lock
{
LockStraightAway, /// Locks inside constructor (default)
Default, /// Locks inside constructor
// WARNING: Only for tests. It is not exception-safe because `lock()` must be called after construction.
PostponeLocking /// Don't lock in constructor, but send request
Defer /// Don't lock in constructor, but send request
};
struct Metrics
{
const ProfileEvents::Event requests = ProfileEvents::end();
const ProfileEvents::Event cost = ProfileEvents::end();
const ProfileEvents::Event wait_microseconds = ProfileEvents::end();
const CurrentMetrics::Metric scheduled_count = CurrentMetrics::end();
static const Metrics * getIORead()
{
static Metrics metrics{
.requests = ProfileEvents::SchedulerIOReadRequests,
.cost = ProfileEvents::SchedulerIOReadBytes,
.wait_microseconds = ProfileEvents::SchedulerIOReadWaitMicroseconds,
.scheduled_count = CurrentMetrics::SchedulerIOReadScheduled
};
return &metrics;
}
static const Metrics * getIOWrite()
{
static Metrics metrics{
.requests = ProfileEvents::SchedulerIOWriteRequests,
.cost = ProfileEvents::SchedulerIOWriteBytes,
.wait_microseconds = ProfileEvents::SchedulerIOWriteWaitMicroseconds,
.scheduled_count = CurrentMetrics::SchedulerIOWriteScheduled
};
return &metrics;
}
};
enum RequestState
@ -46,60 +96,74 @@ public:
chassert(state == Finished);
state = Enqueued;
ResourceRequest::reset(cost_);
link_.queue->enqueueRequestUsingBudget(this);
estimated_cost = link_.queue->enqueueRequestUsingBudget(this); // NOTE: it modifies `cost` and enqueues request
}
// This function is executed inside scheduler thread and wakes thread issued this `request`.
// That thread will continue execution and do real consumption of requested resource synchronously.
void execute() override
{
{
std::unique_lock lock(mutex);
chassert(state == Enqueued);
state = Dequeued;
}
std::unique_lock lock(mutex);
chassert(state == Enqueued);
state = Dequeued;
dequeued_cv.notify_one();
}
void wait()
{
CurrentMetrics::Increment scheduled(metrics->scheduled_count);
auto timer = CurrentThread::getProfileEvents().timer(metrics->wait_microseconds);
std::unique_lock lock(mutex);
dequeued_cv.wait(lock, [this] { return state == Dequeued; });
}
void finish()
void finish(ResourceCost real_cost_, ResourceLink link_)
{
// lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread
chassert(state == Dequeued);
state = Finished;
if (estimated_cost != real_cost_)
link_.queue->adjustBudget(estimated_cost, real_cost_);
ResourceRequest::finish();
ProfileEvents::increment(metrics->requests);
ProfileEvents::increment(metrics->cost, real_cost_);
}
static Request & local()
void assertFinished()
{
// lock(mutex) is not required because `Finished` request cannot be used by the scheduler thread
chassert(state == Finished);
}
static Request & local(const Metrics * metrics)
{
// Since single thread cannot use more than one resource request simultaneously,
// we can reuse thread-local request to avoid allocations
static thread_local Request instance;
instance.metrics = metrics;
return instance;
}
const Metrics * metrics = nullptr; // Must be initialized before use
private:
ResourceCost estimated_cost = 0; // Stores initial `cost` value in case budget was used to modify it
std::mutex mutex;
std::condition_variable dequeued_cv;
RequestState state = Finished;
};
/// Creates pending request for resource; blocks while resource is not available (unless `PostponeLocking`)
explicit ResourceGuard(ResourceLink link_, ResourceCost cost = 1, ResourceGuardCtor ctor = LockStraightAway)
/// Creates pending request for resource; blocks while resource is not available (unless `Lock::Defer`)
explicit ResourceGuard(const Metrics * metrics, ResourceLink link_, ResourceCost cost = 1, ResourceGuard::Lock type = ResourceGuard::Lock::Default)
: link(link_)
, request(Request::local())
, request(Request::local(metrics))
{
if (cost == 0)
link.queue = nullptr; // Ignore zero-cost requests
else if (link.queue)
link.reset(); // Ignore zero-cost requests
else if (link)
{
request.enqueue(cost, link);
if (ctor == LockStraightAway)
if (type == Lock::Default)
request.wait();
}
}
@ -112,22 +176,29 @@ public:
/// Blocks until resource is available
void lock()
{
if (link.queue)
if (link)
request.wait();
}
/// Report resource consumption has finished
void unlock()
void consume(ResourceCost cost)
{
if (link.queue)
real_cost += cost;
}
/// Report resource consumption has finished
void unlock(ResourceCost consumed = 0)
{
consume(consumed);
if (link)
{
request.finish();
link.queue = nullptr;
request.finish(real_cost, link);
link.reset();
}
}
ResourceLink link;
Request & request;
ResourceCost real_cost = 0;
};
}

View File

@ -13,13 +13,28 @@ using ResourceCost = Int64;
struct ResourceLink
{
ISchedulerQueue * queue = nullptr;
bool operator==(const ResourceLink &) const = default;
explicit operator bool() const { return queue != nullptr; }
void adjust(ResourceCost estimated_cost, ResourceCost real_cost) const;
void reset()
{
queue = nullptr;
}
};
void consumed(ResourceCost cost) const;
/*
* Everything required for IO scheduling.
* Note that raw pointer are stored inside, so make sure that `ClassifierPtr` that produced
* resource links will outlive them. Usually classifier is stored in query `Context`.
*/
struct IOSchedulingSettings
{
ResourceLink read_resource_link;
ResourceLink write_resource_link;
void accumulate(ResourceCost cost) const;
bool operator==(const IOSchedulingSettings &) const = default;
explicit operator bool() const { return read_resource_link && write_resource_link; }
};
}

View File

@ -45,7 +45,7 @@ constexpr ResourceCost ResourceCostMax = std::numeric_limits<int>::max();
class ResourceRequest : public boost::intrusive::list_base_hook<>
{
public:
/// Cost of request execution; should be filled before request enqueueing.
/// Cost of request execution; should be filled before request enqueueing and remain constant until `finish()`.
/// NOTE: If cost is not known in advance, ResourceBudget should be used (note that every ISchedulerQueue has it)
ResourceCost cost;

View File

@ -1,4 +1,5 @@
#include <Common/ThreadPool.h>
#include <Common/ProfileEvents.h>
#include <Common/setThreadName.h>
#include <Common/Exception.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
@ -27,6 +28,25 @@ namespace CurrentMetrics
extern const Metric GlobalThreadScheduled;
}
namespace ProfileEvents
{
extern const Event GlobalThreadPoolExpansions;
extern const Event GlobalThreadPoolShrinks;
extern const Event GlobalThreadPoolThreadCreationMicroseconds;
extern const Event GlobalThreadPoolLockWaitMicroseconds;
extern const Event GlobalThreadPoolJobs;
extern const Event GlobalThreadPoolJobWaitTimeMicroseconds;
extern const Event LocalThreadPoolExpansions;
extern const Event LocalThreadPoolShrinks;
extern const Event LocalThreadPoolThreadCreationMicroseconds;
extern const Event LocalThreadPoolLockWaitMicroseconds;
extern const Event LocalThreadPoolJobs;
extern const Event LocalThreadPoolBusyMicroseconds;
extern const Event LocalThreadPoolJobWaitTimeMicroseconds;
}
class JobWithPriority
{
public:
@ -40,6 +60,7 @@ public:
/// Call stacks of all jobs' schedulings leading to this one
std::vector<StackTrace::FramePointers> frame_pointers;
bool enable_job_stack_trace = false;
Stopwatch job_create_time;
JobWithPriority(
Job job_, Priority priority_, CurrentMetrics::Metric metric,
@ -59,6 +80,13 @@ public:
{
return priority > rhs.priority; // Reversed for `priority_queue` max-heap to yield minimum value (i.e. highest priority) first
}
UInt64 elapsedMicroseconds() const
{
return job_create_time.elapsedMicroseconds();
}
};
static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool";
@ -180,14 +208,18 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
};
{
Stopwatch watch;
std::unique_lock lock(mutex);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
watch.elapsedMicroseconds());
if (CannotAllocateThreadFaultInjector::injectFault())
return on_error("fault injected");
auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };
if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero.
if (wait_microseconds) /// Check for optional. Condition is true if the optional is set. Even if the value is zero.
{
if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
return on_error(fmt::format("no free thread (timeout={})", *wait_microseconds));
@ -216,7 +248,13 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
try
{
Stopwatch watch2;
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch2.elapsedMicroseconds());
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
}
catch (...)
{
@ -239,6 +277,8 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
/// Wake up a free thread to run the new job.
new_job_or_shutdown.notify_one();
ProfileEvents::increment(std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobs : ProfileEvents::LocalThreadPoolJobs);
return static_cast<ReturnType>(true);
}
@ -262,7 +302,14 @@ void ThreadPoolImpl<Thread>::startNewThreadsNoLock()
try
{
Stopwatch watch;
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch.elapsedMicroseconds());
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
}
catch (...)
{
@ -293,7 +340,11 @@ void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, Priority priority, uint64_
template <typename Thread>
void ThreadPoolImpl<Thread>::wait()
{
Stopwatch watch;
std::unique_lock lock(mutex);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
watch.elapsedMicroseconds());
/// Signal here just in case.
/// If threads are waiting on condition variables, but there are some jobs in the queue
/// then it will prevent us from deadlock.
@ -334,7 +385,11 @@ void ThreadPoolImpl<Thread>::finalize()
/// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
for (auto & thread : threads)
{
thread.join();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
}
threads.clear();
}
@ -391,7 +446,11 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
std::optional<JobWithPriority> job_data;
{
Stopwatch watch;
std::unique_lock lock(mutex);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
watch.elapsedMicroseconds());
// Finish with previous job if any
if (job_is_done)
@ -424,6 +483,8 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
{
thread_it->detach();
threads.erase(thread_it);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
}
return;
}
@ -433,6 +494,10 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
job_data = std::move(const_cast<JobWithPriority &>(jobs.top()));
jobs.pop();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobWaitTimeMicroseconds : ProfileEvents::LocalThreadPoolJobWaitTimeMicroseconds,
job_data->elapsedMicroseconds());
/// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them.
if (shutdown)
{
@ -459,7 +524,22 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
CurrentMetrics::Increment metric_active_pool_threads(metric_active_threads);
job_data->job();
if constexpr (!std::is_same_v<Thread, std::thread>)
{
Stopwatch watch;
job_data->job();
// This metric is less relevant for the global thread pool, as it would show large values (time while
// a thread was used by local pools) and increment only when local pools are destroyed.
//
// In cases where global pool threads are used directly (without a local thread pool), distinguishing
// them is difficult.
ProfileEvents::increment(ProfileEvents::LocalThreadPoolBusyMicroseconds, watch.elapsedMicroseconds());
}
else
{
job_data->job();
}
if (thread_trace_context.root_span.isTraceEnabled())
{

View File

@ -131,7 +131,7 @@ private:
bool threads_remove_themselves = true;
const bool shutdown_on_exception = true;
boost::heap::priority_queue<JobWithPriority> jobs;
boost::heap::priority_queue<JobWithPriority,boost::heap::stable<true>> jobs;
std::list<Thread> threads;
std::exception_ptr first_exception;
std::stack<OnDestroyCallback> on_destroy_callbacks;

View File

@ -7,11 +7,11 @@
#include <Common/MemoryTracker.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <Common/Scheduler/ResourceLink.h>
#include <boost/noncopyable.hpp>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <unordered_set>
@ -188,6 +188,10 @@ public:
Progress progress_in;
Progress progress_out;
/// IO scheduling
ResourceLink read_resource_link;
ResourceLink write_resource_link;
private:
/// Group of threads, to which this thread attached
ThreadGroupPtr thread_group;

View File

@ -8,6 +8,7 @@
#include <IO/ReadBufferFromString.h>
#include <Common/logger_useful.h>
#include <Common/Throttler.h>
#include <Common/Scheduler/ResourceGuard.h>
#include <base/sleep.h>
#include <Common/ProfileEvents.h>
#include <IO/SeekableReadBuffer.h>
@ -113,7 +114,9 @@ bool ReadBufferFromAzureBlobStorage::nextImpl()
{
try
{
ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, to_read_bytes);
bytes_read = data_stream->ReadToCount(reinterpret_cast<uint8_t *>(data_ptr), to_read_bytes);
rlock.unlock(bytes_read); // Do not hold resource under bandwidth throttler
if (read_settings.remote_throttler)
read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
break;

View File

@ -101,15 +101,13 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
{
try
{
ResourceGuard rlock(write_settings.resource_link, cost); // Note that zero-cost requests are ignored
ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, cost); // Note that zero-cost requests are ignored
func();
rlock.unlock(cost);
break;
}
catch (const Azure::Core::RequestFailedException & e)
{
if (cost)
write_settings.resource_link.accumulate(cost); // Accumulate resource for later use, because we have failed to consume it
if (i == num_tries - 1 || !isRetryableAzureException(e))
throw;
@ -117,8 +115,6 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
}
catch (...)
{
if (cost)
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
throw;
}
}

View File

@ -461,14 +461,17 @@ DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
}
template <class Settings>
static inline Settings updateResourceLink(const Settings & settings, const String & resource_name)
static inline Settings updateIOSchedulingSettings(const Settings & settings, const String & read_resource_name, const String & write_resource_name)
{
if (resource_name.empty())
if (read_resource_name.empty() && write_resource_name.empty())
return settings;
if (auto query_context = CurrentThread::getQueryContext())
{
Settings result(settings);
result.resource_link = query_context->getWorkloadClassifier()->get(resource_name);
if (!read_resource_name.empty())
result.io_scheduling.read_resource_link = query_context->getWorkloadClassifier()->get(read_resource_name);
if (!write_resource_name.empty())
result.io_scheduling.write_resource_link = query_context->getWorkloadClassifier()->get(write_resource_name);
return result;
}
return settings;
@ -500,7 +503,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
return object_storage->readObjects(
storage_objects,
updateResourceLink(settings, getReadResourceName()),
updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName()),
read_hint,
file_size);
}
@ -513,7 +516,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
{
LOG_TEST(log, "Write file: {}", path);
WriteSettings write_settings = updateResourceLink(settings, getWriteResourceName());
WriteSettings write_settings = updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName());
auto transaction = createObjectStorageTransaction();
return transaction->writeFile(path, buf_size, mode, write_settings);
}

View File

@ -6,7 +6,6 @@
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromS3.h>
#include <Common/Scheduler/ResourceGuard.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3/Requests.h>
@ -423,22 +422,13 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3InitMicroseconds);
// We do not know in advance how many bytes we are going to consume, to avoid blocking estimated it from below
constexpr ResourceCost estimated_cost = 1;
ResourceGuard rlock(read_settings.resource_link, estimated_cost);
CurrentThread::IOScope io_scope(read_settings.io_scheduling);
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
rlock.unlock();
if (outcome.IsSuccess())
{
ResourceCost bytes_read = outcome.GetResult().GetContentLength();
read_settings.resource_link.adjust(estimated_cost, bytes_read);
return outcome.GetResultWithOwnership();
}
else
{
read_settings.resource_link.accumulate(estimated_cost);
const auto & error = outcome.GetError();
throw S3Exception(error.GetMessage(), error.GetErrorType());
}

View File

@ -118,8 +118,7 @@ struct ReadSettings
ThrottlerPtr remote_throttler;
ThrottlerPtr local_throttler;
// Resource to be used during reading
ResourceLink resource_link;
IOSchedulingSettings io_scheduling;
size_t http_max_tries = 10;
size_t http_retry_initial_backoff_ms = 100;

View File

@ -11,7 +11,6 @@
#include <Common/Throttler.h>
#include <Interpreters/Cache/FileCache.h>
#include <Common/Scheduler/ResourceGuard.h>
#include <IO/WriteHelpers.h>
#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
@ -558,12 +557,11 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data)
auto & request = std::get<0>(*worker_data);
ResourceCost cost = request.GetContentLength();
ResourceGuard rlock(write_settings.resource_link, cost);
CurrentThread::IOScope io_scope(write_settings.io_scheduling);
Stopwatch watch;
auto outcome = client_ptr->UploadPart(request);
watch.stop();
rlock.unlock(); // Avoid acquiring other locks under resource lock
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
@ -577,7 +575,6 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data)
if (!outcome.IsSuccess())
{
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
}
@ -715,12 +712,11 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data
if (client_ptr->isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3PutObject);
ResourceCost cost = request.GetContentLength();
ResourceGuard rlock(write_settings.resource_link, cost);
CurrentThread::IOScope io_scope(write_settings.io_scheduling);
Stopwatch watch;
auto outcome = client_ptr->PutObject(request);
watch.stop();
rlock.unlock();
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (blob_log)
@ -734,7 +730,6 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data
}
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
{

View File

@ -13,8 +13,7 @@ struct WriteSettings
ThrottlerPtr remote_throttler;
ThrottlerPtr local_throttler;
// Resource to be used during reading
ResourceLink resource_link;
IOSchedulingSettings io_scheduling;
/// Filesystem cache settings
bool enable_filesystem_cache_on_write_operations = false;

View File

@ -119,27 +119,16 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
return false;
}
ResourceGuard rlock(read_settings.resource_link, num_bytes_to_read);
int bytes_read;
try
{
bytes_read = hdfsRead(fs.get(), fin, internal_buffer.begin(), safe_cast<int>(num_bytes_to_read));
}
catch (...)
{
read_settings.resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure
throw;
}
rlock.unlock();
ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, num_bytes_to_read);
int bytes_read = hdfsRead(fs.get(), fin, internal_buffer.begin(), safe_cast<int>(num_bytes_to_read));
rlock.unlock(std::max(0, bytes_read));
if (bytes_read < 0)
{
read_settings.resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure
throw Exception(ErrorCodes::NETWORK_ERROR,
"Fail to read from HDFS: {}, file path: {}. Error: {}",
hdfs_uri, hdfs_file_path, std::string(hdfsGetLastError()));
}
read_settings.resource_link.adjust(num_bytes_to_read, bytes_read);
if (bytes_read)
{

View File

@ -66,25 +66,12 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
int write(const char * start, size_t size)
{
ResourceGuard rlock(write_settings.resource_link, size);
int bytes_written;
try
{
bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast<int>(size));
}
catch (...)
{
write_settings.resource_link.accumulate(size); // We assume no resource was used in case of failure
throw;
}
rlock.unlock();
ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, size);
int bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast<int>(size));
rlock.unlock(std::max(0, bytes_written));
if (bytes_written < 0)
{
write_settings.resource_link.accumulate(size); // We assume no resource was used in case of failure
throw Exception(ErrorCodes::NETWORK_ERROR, "Fail to write HDFS file: {} {}", hdfs_uri, std::string(hdfsGetLastError()));
}
write_settings.resource_link.adjust(size, bytes_written);
if (write_settings.remote_throttler)
write_settings.remote_throttler->add(bytes_written, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds);

View File

@ -288,7 +288,7 @@ def generate_description(item: PullRequest, repo: Repository) -> Optional[Descri
# Normalize bug fixes
if (
re.match(
r".*(?i)bug\Wfix",
r"(?i).*bug\Wfix",
category,
)
# Map "Critical Bug Fix" to "Bug fix" category for changelog

View File

@ -67,6 +67,11 @@ def test_aggregate_states(start_cluster):
f"select hex(initializeAggregation('{function_name}State', 'foo'))"
).strip()
def get_final_value_unhex(node, function_name, value):
return node.query(
f"select finalizeAggregation(unhex('{value}')::AggregateFunction({function_name}, String))"
).strip()
for aggregate_function in aggregate_functions:
logging.info("Checking %s", aggregate_function)
@ -99,13 +104,39 @@ def test_aggregate_states(start_cluster):
upstream_state = get_aggregate_state_hex(upstream, aggregate_function)
if upstream_state != backward_state:
logging.info(
"Failed %s, %s (backward) != %s (upstream)",
aggregate_function,
backward_state,
upstream_state,
)
failed += 1
allowed_changes_if_result_is_the_same = ["anyHeavy"]
if aggregate_function in allowed_changes_if_result_is_the_same:
backward_final_from_upstream = get_final_value_unhex(
backward, aggregate_function, upstream_state
)
upstream_final_from_backward = get_final_value_unhex(
upstream, aggregate_function, backward_state
)
if backward_final_from_upstream == upstream_final_from_backward:
logging.info(
"OK %s (but different intermediate states)", aggregate_function
)
passed += 1
else:
logging.error(
"Failed %s, Intermediate: %s (backward) != %s (upstream). Final from intermediate: %s (backward from upstream state) != %s (upstream from backward state)",
aggregate_function,
backward_state,
upstream_state,
backward_final_from_upstream,
upstream_final_from_backward,
)
failed += 1
else:
logging.error(
"Failed %s, %s (backward) != %s (upstream)",
aggregate_function,
backward_state,
upstream_state,
)
failed += 1
else:
logging.info("OK %s", aggregate_function)
passed += 1

View File

@ -69,6 +69,124 @@ def update_workloads_config(**settings):
node.query("system reload config")
def check_profile_event_for_query(workload, profile_event, amount=1):
node.query("system flush logs")
query_pattern = f"workload='{workload}'".replace("'", "\\'")
assert (
int(
node.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where query ilike '%{query_pattern}%' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
)
)
== amount
)
def test_s3_resource_request_granularity():
node.query(
f"""
drop table if exists data;
create table data (key UInt64 CODEC(NONE), value String CODEC(NONE)) engine=MergeTree() order by key settings min_bytes_for_wide_part=1e9, storage_policy='s3';
"""
)
total_bytes = 50000000 # Approximate data size
max_bytes_per_request = 2000000 # Should be ~1MB or less in general
min_bytes_per_request = 6000 # Small requests are ok, but we don't want hurt performance with too often resource requests
writes_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/admin'"
).strip()
)
write_bytes_before = int(
node.query(
f"select dequeued_cost from system.scheduler where resource='network_write' and path='/prio/admin'"
).strip()
)
write_budget_before = int(
node.query(
f"select budget from system.scheduler where resource='network_write' and path='/prio/admin'"
).strip()
)
node.query(
f"insert into data select number, randomString(10000000) from numbers(5) SETTINGS workload='admin'"
)
writes_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/admin'"
).strip()
)
write_bytes_after = int(
node.query(
f"select dequeued_cost from system.scheduler where resource='network_write' and path='/prio/admin'"
).strip()
)
write_budget_after = int(
node.query(
f"select budget from system.scheduler where resource='network_write' and path='/prio/admin'"
).strip()
)
write_requests = writes_after - writes_before
write_bytes = (write_bytes_after - write_bytes_before) - (
write_budget_after - write_budget_before
)
assert write_bytes > 1.0 * total_bytes
assert write_bytes < 1.05 * total_bytes
assert write_bytes / write_requests < max_bytes_per_request
assert write_bytes / write_requests > min_bytes_per_request
check_profile_event_for_query("admin", "SchedulerIOWriteRequests", write_requests)
check_profile_event_for_query("admin", "SchedulerIOWriteBytes", write_bytes)
node.query(f"optimize table data final")
reads_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/admin'"
).strip()
)
read_bytes_before = int(
node.query(
f"select dequeued_cost from system.scheduler where resource='network_read' and path='/prio/admin'"
).strip()
)
read_budget_before = int(
node.query(
f"select budget from system.scheduler where resource='network_read' and path='/prio/admin'"
).strip()
)
node.query(
f"select count() from data where not ignore(*) SETTINGS workload='admin'"
)
reads_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/admin'"
).strip()
)
read_bytes_after = int(
node.query(
f"select dequeued_cost from system.scheduler where resource='network_read' and path='/prio/admin'"
).strip()
)
read_budget_after = int(
node.query(
f"select budget from system.scheduler where resource='network_read' and path='/prio/admin'"
).strip()
)
read_bytes = (read_bytes_after - read_bytes_before) - (
read_budget_after - read_budget_before
)
read_requests = reads_after - reads_before
assert read_bytes > 1.0 * total_bytes
assert read_bytes < 1.05 * total_bytes
assert read_bytes / read_requests < max_bytes_per_request
assert read_bytes / read_requests > min_bytes_per_request
check_profile_event_for_query("admin", "SchedulerIOReadRequests", read_requests)
check_profile_event_for_query("admin", "SchedulerIOReadBytes", read_bytes)
def test_s3_disk():
node.query(
f"""

View File

@ -0,0 +1 @@
a

View File

@ -0,0 +1,4 @@
DROP TABLE IF EXISTS t;
CREATE TABLE t (letter String) ENGINE=MergeTree order by () partition by letter;
INSERT INTO t VALUES ('a'), ('a'), ('a'), ('a'), ('b'), ('a'), ('a'), ('a'), ('a'), ('a'), ('a'), ('a'), ('a'), ('a'), ('a'), ('a'), ('c');
SELECT anyHeavy(if(letter != 'b', letter, NULL)) FROM t;