From aad55ab55faa9ed64d802c73080db77503c6363d Mon Sep 17 00:00:00 2001 From: serxa Date: Wed, 12 Jun 2024 14:49:06 +0000 Subject: [PATCH 01/26] IO scheduling on HTTP session level --- base/poco/Net/include/Poco/Net/HTTPSession.h | 43 ++++++++++ base/poco/Net/src/HTTPSession.cpp | 24 ++++-- base/poco/Net/src/SocketImpl.cpp | 36 ++++----- src/Common/CurrentMetrics.cpp | 3 + src/Common/CurrentThread.cpp | 50 ++++++++++++ src/Common/CurrentThread.h | 43 +++++++++- src/Common/HTTPConnectionPool.cpp | 64 ++++++++++++++- src/Common/ProfileEvents.cpp | 7 ++ .../Scheduler/Nodes/tests/ResourceTest.h | 6 +- .../tests/gtest_dynamic_resource_manager.cpp | 4 +- .../Nodes/tests/gtest_resource_scheduler.cpp | 8 +- src/Common/Scheduler/ResourceGuard.h | 79 +++++++++++++++++-- src/Common/Scheduler/ResourceLink.h | 23 +++++- src/Common/ThreadStatus.h | 6 +- .../IO/ReadBufferFromAzureBlobStorage.cpp | 5 ++ .../IO/WriteBufferFromAzureBlobStorage.cpp | 6 +- .../ObjectStorages/DiskObjectStorage.cpp | 13 +-- src/IO/ReadBufferFromS3.cpp | 12 +-- src/IO/ReadSettings.h | 3 +- src/IO/WriteBufferFromS3.cpp | 13 +-- src/IO/WriteSettings.h | 3 +- .../ObjectStorage/HDFS/ReadBufferFromHDFS.cpp | 8 +- .../HDFS/WriteBufferFromHDFS.cpp | 10 +-- 23 files changed, 379 insertions(+), 90 deletions(-) diff --git a/base/poco/Net/include/Poco/Net/HTTPSession.h b/base/poco/Net/include/Poco/Net/HTTPSession.h index cac14f479db..b0e59443f9b 100644 --- a/base/poco/Net/include/Poco/Net/HTTPSession.h +++ b/base/poco/Net/include/Poco/Net/HTTPSession.h @@ -19,6 +19,8 @@ #include +#include +#include #include "Poco/Any.h" #include "Poco/Buffer.h" #include "Poco/Exception.h" @@ -33,6 +35,27 @@ namespace Net { + class IHTTPSessionDataHooks + /// Interface to control stream of data bytes being sent or received though socket by HTTPSession + /// It allows to monitor, throttle and schedule data streams with syscall granulatrity + { + public: + virtual ~IHTTPSessionDataHooks() = default; + + virtual void start(int bytes) = 0; + /// Called before sending/receiving data `bytes` to/from socket. + + virtual void finish(int bytes) = 0; + /// Called when sending/receiving of data `bytes` is successfully finished. + + virtual void fail() = 0; + /// If an error occured during send/receive `fail()` is called instead of `finish()`. + }; + + + using HTTPSessionDataHooksPtr = std::shared_ptr; + + class Net_API HTTPSession /// HTTPSession implements basic HTTP session management /// for both HTTP clients and HTTP servers. @@ -73,6 +96,12 @@ namespace Net Poco::Timespan getReceiveTimeout() const; /// Returns receive timeout for the HTTP session. + void setSendDataHooks(const HTTPSessionDataHooksPtr & sendDataHooks = {}); + /// Sets data hooks that will be called on every sent to the socket. + + void setReceiveDataHooks(const HTTPSessionDataHooksPtr & receiveDataHooks = {}); + /// Sets data hooks that will be called on every receive from the socket. + bool connected() const; /// Returns true if the underlying socket is connected. @@ -211,6 +240,10 @@ namespace Net Poco::Exception * _pException; Poco::Any _data; + // Data hooks + HTTPSessionDataHooksPtr _sendDataHooks; + HTTPSessionDataHooksPtr _receiveDataHooks; + friend class HTTPStreamBuf; friend class HTTPHeaderStreamBuf; friend class HTTPFixedLengthStreamBuf; @@ -246,6 +279,16 @@ namespace Net return _receiveTimeout; } + inline void HTTPSession::setSendDataHooks(const HTTPSessionDataHooksPtr & sendDataHooks) + { + _sendDataHooks = sendDataHooks; + } + + inline void HTTPSession::setReceiveDataHooks(const HTTPSessionDataHooksPtr & receiveDataHooks) + { + _receiveDataHooks = receiveDataHooks; + } + inline StreamSocket & HTTPSession::socket() { return _socket; diff --git a/base/poco/Net/src/HTTPSession.cpp b/base/poco/Net/src/HTTPSession.cpp index 8f951b3102c..596185703fa 100644 --- a/base/poco/Net/src/HTTPSession.cpp +++ b/base/poco/Net/src/HTTPSession.cpp @@ -128,14 +128,14 @@ int HTTPSession::get() { if (_pCurrent == _pEnd) refill(); - + if (_pCurrent < _pEnd) return *_pCurrent++; else return std::char_traits::eof(); } - + int HTTPSession::peek() { if (_pCurrent == _pEnd) @@ -147,7 +147,7 @@ int HTTPSession::peek() return std::char_traits::eof(); } - + int HTTPSession::read(char* buffer, std::streamsize length) { if (_pCurrent < _pEnd) @@ -166,10 +166,17 @@ int HTTPSession::write(const char* buffer, std::streamsize length) { try { - return _socket.sendBytes(buffer, (int) length); + if (_sendDataHooks) + _sendDataHooks->start((int) length); + int result = _socket.sendBytes(buffer, (int) length); + if (_sendDataHooks) + _sendDataHooks->finish(result); + return result; } catch (Poco::Exception& exc) { + if (_sendDataHooks) + _sendDataHooks->fail(); setException(exc); throw; } @@ -180,10 +187,17 @@ int HTTPSession::receive(char* buffer, int length) { try { - return _socket.receiveBytes(buffer, length); + if (_receiveDataHooks) + _receiveDataHooks->start(length); + int result = _socket.receiveBytes(buffer, length); + if (_receiveDataHooks) + _receiveDataHooks->finish(result); + return result; } catch (Poco::Exception& exc) { + if (_receiveDataHooks) + _receiveDataHooks->fail(); setException(exc); throw; } diff --git a/base/poco/Net/src/SocketImpl.cpp b/base/poco/Net/src/SocketImpl.cpp index 484b8cfeec3..65456a287fb 100644 --- a/base/poco/Net/src/SocketImpl.cpp +++ b/base/poco/Net/src/SocketImpl.cpp @@ -62,7 +62,7 @@ bool checkIsBrokenTimeout() SocketImpl::SocketImpl(): _sockfd(POCO_INVALID_SOCKET), - _blocking(true), + _blocking(true), _isBrokenTimeout(checkIsBrokenTimeout()) { } @@ -81,7 +81,7 @@ SocketImpl::~SocketImpl() close(); } - + SocketImpl* SocketImpl::acceptConnection(SocketAddress& clientAddr) { if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException(); @@ -117,7 +117,7 @@ void SocketImpl::connect(const SocketAddress& address) rc = ::connect(_sockfd, address.addr(), address.length()); } while (rc != 0 && lastError() == POCO_EINTR); - if (rc != 0) + if (rc != 0) { int err = lastError(); error(err, address.toString()); @@ -204,7 +204,7 @@ void SocketImpl::bind6(const SocketAddress& address, bool reuseAddress, bool reu #if defined(POCO_HAVE_IPv6) if (address.family() != SocketAddress::IPv6) throw Poco::InvalidArgumentException("SocketAddress must be an IPv6 address"); - + if (_sockfd == POCO_INVALID_SOCKET) { init(address.af()); @@ -225,11 +225,11 @@ void SocketImpl::bind6(const SocketAddress& address, bool reuseAddress, bool reu #endif } - + void SocketImpl::listen(int backlog) { if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException(); - + int rc = ::listen(_sockfd, backlog); if (rc != 0) error(); } @@ -253,7 +253,7 @@ void SocketImpl::shutdownReceive() if (rc != 0) error(); } - + void SocketImpl::shutdownSend() { if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException(); @@ -262,7 +262,7 @@ void SocketImpl::shutdownSend() if (rc != 0) error(); } - + void SocketImpl::shutdown() { if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException(); @@ -317,7 +317,7 @@ int SocketImpl::receiveBytes(void* buffer, int length, int flags) throw TimeoutException(); } } - + int rc; do { @@ -325,7 +325,7 @@ int SocketImpl::receiveBytes(void* buffer, int length, int flags) rc = ::recv(_sockfd, reinterpret_cast(buffer), length, flags); } while (blocking && rc < 0 && lastError() == POCO_EINTR); - if (rc < 0) + if (rc < 0) { int err = lastError(); if ((err == POCO_EAGAIN || err == POCO_EWOULDBLOCK) && !blocking) @@ -363,7 +363,7 @@ int SocketImpl::receiveFrom(void* buffer, int length, SocketAddress& address, in throw TimeoutException(); } } - + sockaddr_storage abuffer; struct sockaddr* pSA = reinterpret_cast(&abuffer); poco_socklen_t saLen = sizeof(abuffer); @@ -450,7 +450,7 @@ bool SocketImpl::pollImpl(Poco::Timespan& remainingTime, int mode) } while (rc < 0 && lastError() == POCO_EINTR); if (rc < 0) error(); - return rc > 0; + return rc > 0; #else @@ -493,7 +493,7 @@ bool SocketImpl::pollImpl(Poco::Timespan& remainingTime, int mode) } while (rc < 0 && errorCode == POCO_EINTR); if (rc < 0) error(errorCode); - return rc > 0; + return rc > 0; #endif // POCO_HAVE_FD_POLL } @@ -503,13 +503,13 @@ bool SocketImpl::poll(const Poco::Timespan& timeout, int mode) Poco::Timespan remainingTime(timeout); return pollImpl(remainingTime, mode); } - + void SocketImpl::setSendBufferSize(int size) { setOption(SOL_SOCKET, SO_SNDBUF, size); } - + int SocketImpl::getSendBufferSize() { int result; @@ -523,7 +523,7 @@ void SocketImpl::setReceiveBufferSize(int size) setOption(SOL_SOCKET, SO_RCVBUF, size); } - + int SocketImpl::getReceiveBufferSize() { int result; @@ -569,7 +569,7 @@ Poco::Timespan SocketImpl::getReceiveTimeout() return result; } - + SocketAddress SocketImpl::address() { if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException(); @@ -580,7 +580,7 @@ SocketAddress SocketImpl::address() int rc = ::getsockname(_sockfd, pSA, &saLen); if (rc == 0) return SocketAddress(pSA, saLen); - else + else error(); return SocketAddress(); } diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 731c72d65f2..63156f5291d 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -291,6 +291,9 @@ M(DistrCacheWriteRequests, "Number of executed Write requests to Distributed Cache") \ M(DistrCacheServerConnections, "Number of open connections to ClickHouse server from Distributed Cache") \ \ + M(SchedulerIOReadScheduled, "Number of IO reads are being scheduled currently") \ + M(SchedulerIOWriteScheduled, "Number of IO writes are being scheduled currently") \ + \ M(StorageConnectionsStored, "Total count of sessions stored in the session pool for storages") \ M(StorageConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for storages") \ \ diff --git a/src/Common/CurrentThread.cpp b/src/Common/CurrentThread.cpp index 70b69d6bcc7..ba7087ca7f1 100644 --- a/src/Common/CurrentThread.cpp +++ b/src/Common/CurrentThread.cpp @@ -113,6 +113,56 @@ std::string_view CurrentThread::getQueryId() return current_thread->getQueryId(); } +void CurrentThread::attachReadResource(ResourceLink link) +{ + if (unlikely(!current_thread)) + return; + if (current_thread->read_resource_link) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has been already attached to read resource", std::to_string(getThreadId())); + current_thread->read_resource_link = link; +} + +void CurrentThread::detachReadResource() +{ + if (unlikely(!current_thread)) + return; + if (!current_thread->read_resource_link) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has not been attached to read resource", std::to_string(getThreadId())); + current_thread->read_resource_link.reset(); +} + +ResourceLink CurrentThread::getReadResourceLink() +{ + if (unlikely(!current_thread)) + return {}; + return current_thread->read_resource_link; +} + +void CurrentThread::attachWriteResource(ResourceLink link) +{ + if (unlikely(!current_thread)) + return; + if (current_thread->write_resource_link) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has been already attached to write resource", std::to_string(getThreadId())); + current_thread->write_resource_link = link; +} + +void CurrentThread::detachWriteResource() +{ + if (unlikely(!current_thread)) + return; + if (!current_thread->write_resource_link) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has not been attached to write resource", std::to_string(getThreadId())); + current_thread->write_resource_link.reset(); +} + +ResourceLink CurrentThread::getWriteResourceLink() +{ + if (unlikely(!current_thread)) + return {}; + return current_thread->write_resource_link; +} + MemoryTracker * CurrentThread::getUserMemoryTracker() { if (unlikely(!current_thread)) diff --git a/src/Common/CurrentThread.h b/src/Common/CurrentThread.h index 53b61ba315f..787e8369a83 100644 --- a/src/Common/CurrentThread.h +++ b/src/Common/CurrentThread.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -23,7 +24,6 @@ class QueryStatus; struct Progress; class InternalTextLogsQueue; - /** Collection of static methods to work with thread-local objects. * Allows to attach and detach query/process (thread group) to a thread * (to calculate query-related metrics and to allow to obtain query-related data from a thread). @@ -92,6 +92,14 @@ public: static std::string_view getQueryId(); + // For IO Scheduling + static void attachReadResource(ResourceLink link); + static void detachReadResource(); + static ResourceLink getReadResourceLink(); + static void attachWriteResource(ResourceLink link); + static void detachWriteResource(); + static ResourceLink getWriteResourceLink(); + /// Initializes query with current thread as master thread in constructor, and detaches it in destructor struct QueryScope : private boost::noncopyable { @@ -102,6 +110,39 @@ public: void logPeakMemoryUsage(); bool log_peak_memory_usage_in_destructor = true; }; + + /// Scoped attach/detach of IO resource links + struct IOScope : private boost::noncopyable + { + explicit IOScope(ResourceLink read_resource_link, ResourceLink write_resource_link) + { + if (read_resource_link) + { + attachReadResource(read_resource_link); + read_attached = true; + } + if (write_resource_link) + { + attachWriteResource(write_resource_link); + write_attached = true; + } + } + + explicit IOScope(const IOSchedulingSettings & settings) + : IOScope(settings.read_resource_link, settings.write_resource_link) + {} + + ~IOScope() + { + if (read_attached) + detachReadResource(); + if (write_attached) + detachWriteResource(); + } + + bool read_attached = false; + bool write_attached = false; + }; }; } diff --git a/src/Common/HTTPConnectionPool.cpp b/src/Common/HTTPConnectionPool.cpp index 167aeee68f3..de7e10d044a 100644 --- a/src/Common/HTTPConnectionPool.cpp +++ b/src/Common/HTTPConnectionPool.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -249,6 +250,54 @@ public: }; +// Session data hooks implementation for integration with resource scheduler. +// Hooks are created per every request-response pair and are registered/unregistered in HTTP session. +// * `start()` send resource request to the scheduler every time HTTP session is going to send or receive +// data to/from socket. `start()` waits for the scheduler confirmation. This way scheduler might +// throttle and/or schedule socket data streams. +// * `finish()` hook is called on successful socket read/write operation. +// It informs the scheduler that operation is complete, which allows the scheduler to control the total +// amount of in-flight bytes and/or operations. +// * `fail()` hook is called on failure of socket operation. The purpose is to correct the amount of bytes +// passed through the scheduler queue to ensure fair bandwidth allocation even in presence of errors. +struct ResourceGuardSessionDataHooks : public Poco::Net::IHTTPSessionDataHooks +{ + explicit ResourceGuardSessionDataHooks(const ResourceGuard::Metrics * metrics, ResourceLink link_) + : link(link_) + { + request.metrics = metrics; + chassert(link); + } + + ~ResourceGuardSessionDataHooks() override + { + request.assertFinished(); // Never destruct with an active request + } + + void start(int bytes) override + { + // TODO(serxa): add metrics here or better in scheduler code (e.g. during enqueue, or better in REsourceGuard::REquest)? + request.enqueue(bytes, link); + request.wait(); + } + + void finish(int bytes) override + { + request.finish(); + link.adjust(request.cost, bytes); // success + } + + void fail() override + { + request.finish(); + link.accumulate(request.cost); // We assume no resource was used in case of failure + } + + ResourceLink link; + ResourceGuard::Request request; +}; + + // EndpointConnectionPool manage connections to the endpoint // Features: // - it uses HostResolver for address selecting. See Common/HostResolver.h for more info. @@ -259,8 +308,6 @@ public: // - `Session::reconnect()` uses the pool as well // - comprehensive sensors // - session is reused according its inner state, automatically - - template class EndpointConnectionPool : public std::enable_shared_from_this>, public IExtendedPool { @@ -350,6 +397,19 @@ private: std::ostream & sendRequest(Poco::Net::HTTPRequest & request) override { auto idle = idleTime(); + + // Reset data hooks for IO scheduling + if (ResourceLink link = CurrentThread::getReadResourceLink()) { + Session::setSendDataHooks(std::make_shared(ResourceGuard::Metrics::getIORead(), link)); + } else { + Session::setSendDataHooks(); + } + if (ResourceLink link = CurrentThread::getWriteResourceLink()) { + Session::setReceiveDataHooks(std::make_shared(ResourceGuard::Metrics::getIOWrite(), link)); + } else { + Session::setReceiveDataHooks(); + } + std::ostream & result = Session::sendRequest(request); result.exceptions(std::ios::badbit); diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index f73e16c517d..b305614e54c 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -104,6 +104,13 @@ M(PartsWithAppliedMutationsOnFly, "Total number of parts for which there was any mutation applied on fly") \ M(MutationsAppliedOnFlyInAllParts, "The sum of number of applied mutations on-fly for part among all read parts") \ \ + M(SchedulerIOReadRequests, "Resource requests passed through scheduler for IO reads.") \ + M(SchedulerIOReadBytes, "Bytes passed through scheduler for IO reads.") \ + M(SchedulerIOReadWaitMicroseconds, "Total time a query was waiting on resource requests for IO reads.") \ + M(SchedulerIOWriteRequests, "Resource requests passed through scheduler for IO writes.") \ + M(SchedulerIOWriteBytes, "Bytes passed through scheduler for IO writes.") \ + M(SchedulerIOWriteWaitMicroseconds, "Total time a query was waiting on resource requests for IO writes.") \ + \ M(QueryMaskingRulesMatch, "Number of times query masking rules was successfully matched.") \ \ M(ReplicatedPartFetches, "Number of times a data part was downloaded from replica of a ReplicatedMergeTree table.") \ diff --git a/src/Common/Scheduler/Nodes/tests/ResourceTest.h b/src/Common/Scheduler/Nodes/tests/ResourceTest.h index ea3f9edf765..a5eb98f2a2f 100644 --- a/src/Common/Scheduler/Nodes/tests/ResourceTest.h +++ b/src/Common/Scheduler/Nodes/tests/ResourceTest.h @@ -232,7 +232,7 @@ struct ResourceTestManager : public ResourceTestBase ResourceTestManager & t; Guard(ResourceTestManager & t_, ResourceLink link_, ResourceCost cost) - : ResourceGuard(link_, cost, PostponeLocking) + : ResourceGuard(ResourceGuard::Metrics::getIOWrite(), link_, cost, PostponeLocking) , t(t_) { t.onEnqueue(link); @@ -310,7 +310,7 @@ struct ResourceTestManager : public ResourceTestBase // NOTE: actually leader's request(s) make their own small busy period. void blockResource(ResourceLink link) { - ResourceGuard g(link, 1, ResourceGuard::PostponeLocking); + ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, 1, ResourceGuard::PostponeLocking); g.lock(); // NOTE: at this point we assume resource to be blocked by single request (1) busy_period.arrive_and_wait(); // (1) notify all followers that resource is blocked @@ -320,7 +320,7 @@ struct ResourceTestManager : public ResourceTestBase { getLinkData(link).left += total_requests + 1; busy_period.arrive_and_wait(); // (1) wait leader to block resource - ResourceGuard g(link, cost, ResourceGuard::PostponeLocking); + ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, cost, ResourceGuard::PostponeLocking); onEnqueue(link); busy_period.arrive_and_wait(); // (2) notify leader to unblock g.lock(); diff --git a/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp b/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp index 1901a4fd120..4ac79977663 100644 --- a/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp +++ b/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp @@ -36,11 +36,11 @@ TEST(SchedulerDynamicResourceManager, Smoke) for (int i = 0; i < 10; i++) { - ResourceGuard gA(cA->get("res1"), ResourceGuard::PostponeLocking); + ResourceGuard gA(ResourceGuard::Metrics::getIOWrite(), cA->get("res1"), ResourceGuard::PostponeLocking); gA.lock(); gA.unlock(); - ResourceGuard gB(cB->get("res1")); + ResourceGuard gB(ResourceGuard::Metrics::getIOWrite(), cB->get("res1")); } } diff --git a/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp b/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp index f8196d15819..ba573bf0c85 100644 --- a/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp +++ b/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp @@ -109,22 +109,22 @@ TEST(SchedulerRoot, Smoke) r2.registerResource(); { - ResourceGuard rg(a); + ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), a); EXPECT_TRUE(fc1->requests.contains(&rg.request)); } { - ResourceGuard rg(b); + ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), b); EXPECT_TRUE(fc1->requests.contains(&rg.request)); } { - ResourceGuard rg(c); + ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), c); EXPECT_TRUE(fc2->requests.contains(&rg.request)); } { - ResourceGuard rg(d); + ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), d); EXPECT_TRUE(fc2->requests.contains(&rg.request)); } } diff --git a/src/Common/Scheduler/ResourceGuard.h b/src/Common/Scheduler/ResourceGuard.h index 3c29f588fba..73aea4afdb6 100644 --- a/src/Common/Scheduler/ResourceGuard.h +++ b/src/Common/Scheduler/ResourceGuard.h @@ -7,10 +7,30 @@ #include #include +#include +#include +#include + #include #include +namespace ProfileEvents +{ + extern const Event SchedulerIOReadRequests; + extern const Event SchedulerIOReadBytes; + extern const Event SchedulerIOReadWaitMicroseconds; + extern const Event SchedulerIOWriteRequests; + extern const Event SchedulerIOWriteBytes; + extern const Event SchedulerIOWriteWaitMicroseconds; +} + +namespace CurrentMetrics +{ + extern const Metric SchedulerIOReadScheduled; + extern const Metric SchedulerIOWriteScheduled; +} + namespace DB { @@ -30,6 +50,36 @@ public: PostponeLocking /// Don't lock in constructor, but send request }; + struct Metrics + { + const ProfileEvents::Event requests = ProfileEvents::end(); + const ProfileEvents::Event cost = ProfileEvents::end(); + const ProfileEvents::Event wait_microseconds = ProfileEvents::end(); + const CurrentMetrics::Metric scheduled_count = CurrentMetrics::end(); + + static const Metrics * getIORead() + { + static Metrics metrics{ + .requests = ProfileEvents::SchedulerIOReadRequests, + .cost = ProfileEvents::SchedulerIOReadBytes, + .wait_microseconds = ProfileEvents::SchedulerIOReadWaitMicroseconds, + .scheduled_count = CurrentMetrics::SchedulerIOReadScheduled + }; + return &metrics; + } + + static const Metrics * getIOWrite() + { + static Metrics metrics{ + .requests = ProfileEvents::SchedulerIOWriteRequests, + .cost = ProfileEvents::SchedulerIOWriteBytes, + .wait_microseconds = ProfileEvents::SchedulerIOWriteWaitMicroseconds, + .scheduled_count = CurrentMetrics::SchedulerIOWriteScheduled + }; + return &metrics; + } + }; + enum RequestState { Finished, // Last request has already finished; no concurrent access is possible @@ -46,6 +96,8 @@ public: chassert(state == Finished); state = Enqueued; ResourceRequest::reset(cost_); + ProfileEvents::increment(metrics->requests); + ProfileEvents::increment(metrics->cost, cost_); link_.queue->enqueueRequestUsingBudget(this); } @@ -63,6 +115,8 @@ public: void wait() { + CurrentMetrics::Increment scheduled(metrics->scheduled_count); + auto timer = CurrentThread::getProfileEvents().timer(metrics->wait_microseconds); std::unique_lock lock(mutex); dequeued_cv.wait(lock, [this] { return state == Dequeued; }); } @@ -75,14 +129,23 @@ public: ResourceRequest::finish(); } - static Request & local() + void assertFinished() + { + // lock(mutex) is not required because `Finished` request cannot be used by the scheduler thread + chassert(state == Finished); + } + + static Request & local(const Metrics * metrics) { // Since single thread cannot use more than one resource request simultaneously, // we can reuse thread-local request to avoid allocations static thread_local Request instance; + instance.metrics = metrics; return instance; } + const Metrics * metrics = nullptr; // Must be initialized before use + private: std::mutex mutex; std::condition_variable dequeued_cv; @@ -90,13 +153,13 @@ public: }; /// Creates pending request for resource; blocks while resource is not available (unless `PostponeLocking`) - explicit ResourceGuard(ResourceLink link_, ResourceCost cost = 1, ResourceGuardCtor ctor = LockStraightAway) + explicit ResourceGuard(const Metrics * metrics, ResourceLink link_, ResourceCost cost = 1, ResourceGuardCtor ctor = LockStraightAway) : link(link_) - , request(Request::local()) + , request(Request::local(metrics)) { if (cost == 0) - link.queue = nullptr; // Ignore zero-cost requests - else if (link.queue) + link.reset(); // Ignore zero-cost requests + else if (link) { request.enqueue(cost, link); if (ctor == LockStraightAway) @@ -112,17 +175,17 @@ public: /// Blocks until resource is available void lock() { - if (link.queue) + if (link) request.wait(); } /// Report resource consumption has finished void unlock() { - if (link.queue) + if (link) { request.finish(); - link.queue = nullptr; + link.reset(); } } diff --git a/src/Common/Scheduler/ResourceLink.h b/src/Common/Scheduler/ResourceLink.h index 450d9bc1efa..6dd3be930ca 100644 --- a/src/Common/Scheduler/ResourceLink.h +++ b/src/Common/Scheduler/ResourceLink.h @@ -13,13 +13,32 @@ using ResourceCost = Int64; struct ResourceLink { ISchedulerQueue * queue = nullptr; + bool operator==(const ResourceLink &) const = default; + explicit operator bool() const { return queue != nullptr; } void adjust(ResourceCost estimated_cost, ResourceCost real_cost) const; - void consumed(ResourceCost cost) const; - void accumulate(ResourceCost cost) const; + + void reset() + { + queue = nullptr; + } +}; + +/* + * Everything required for IO scheduling. + * Note that raw pointer are stored inside, so make sure that `ClassifierPtr` that produced + * resource links will outlive them. Usually classifier is stored in query `Context`. + */ +struct IOSchedulingSettings +{ + ResourceLink read_resource_link; + ResourceLink write_resource_link; + + bool operator==(const IOSchedulingSettings &) const = default; + explicit operator bool() const { return read_resource_link && write_resource_link; } }; } diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index 0c02ab8fdb0..6ea0a9a848c 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -7,11 +7,11 @@ #include #include #include +#include #include #include -#include #include #include #include @@ -188,6 +188,10 @@ public: Progress progress_in; Progress progress_out; + /// IO scheduling + ResourceLink read_resource_link; + ResourceLink write_resource_link; + private: /// Group of threads, to which this thread attached ThreadGroupPtr thread_group; diff --git a/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp b/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp index da1ea65f2ea..ecc4168c729 100644 --- a/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp +++ b/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -113,13 +114,17 @@ bool ReadBufferFromAzureBlobStorage::nextImpl() { try { + ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, to_read_bytes); bytes_read = data_stream->ReadToCount(reinterpret_cast(data_ptr), to_read_bytes); + read_settings.io_scheduling.read_resource_link.adjust(to_read_bytes, bytes_read); + rlock.unlock(); // Do not hold resource under bandwidth throttler if (read_settings.remote_throttler) read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds); break; } catch (const Azure::Core::RequestFailedException & e) { + read_settings.io_scheduling.read_resource_link.accumulate(to_read_bytes); // We assume no resource was used in case of failure ProfileEvents::increment(ProfileEvents::ReadBufferFromAzureRequestsErrors); LOG_DEBUG(log, "Exception caught during Azure Read for file {} at attempt {}/{}: {}", path, i + 1, max_single_read_retries, e.Message); diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp index 2c90e3a9003..355f70b5e33 100644 --- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp +++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp @@ -88,14 +88,14 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function func, { try { - ResourceGuard rlock(write_settings.resource_link, cost); // Note that zero-cost requests are ignored + ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, cost); // Note that zero-cost requests are ignored func(); break; } catch (const Azure::Core::RequestFailedException & e) { if (cost) - write_settings.resource_link.accumulate(cost); // Accumulate resource for later use, because we have failed to consume it + write_settings.io_scheduling.write_resource_link.accumulate(cost); // Accumulate resource for later use, because we have failed to consume it if (i == num_tries - 1 || !isRetryableAzureException(e)) throw; @@ -105,7 +105,7 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function func, catch (...) { if (cost) - write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure + write_settings.io_scheduling.write_resource_link.accumulate(cost); // We assume no resource was used in case of failure throw; } } diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.cpp b/src/Disks/ObjectStorages/DiskObjectStorage.cpp index 5803a985000..ccdb321c904 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorage.cpp @@ -461,14 +461,17 @@ DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage() } template -static inline Settings updateResourceLink(const Settings & settings, const String & resource_name) +static inline Settings updateIOSchedulingSettings(const Settings & settings, const String & read_resource_name, const String & write_resource_name) { - if (resource_name.empty()) + if (read_resource_name.empty() && write_resource_name.empty()) return settings; if (auto query_context = CurrentThread::getQueryContext()) { Settings result(settings); - result.resource_link = query_context->getWorkloadClassifier()->get(resource_name); + if (!read_resource_name.empty()) + result.io_scheduling.read_resource_link = query_context->getWorkloadClassifier()->get(read_resource_name); + if (!write_resource_name.empty()) + result.io_scheduling.write_resource_link = query_context->getWorkloadClassifier()->get(write_resource_name); return result; } return settings; @@ -500,7 +503,7 @@ std::unique_ptr DiskObjectStorage::readFile( return object_storage->readObjects( storage_objects, - updateResourceLink(settings, getReadResourceName()), + updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName()), read_hint, file_size); } @@ -513,7 +516,7 @@ std::unique_ptr DiskObjectStorage::writeFile( { LOG_TEST(log, "Write file: {}", path); - WriteSettings write_settings = updateResourceLink(settings, getWriteResourceName()); + WriteSettings write_settings = updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName()); auto transaction = createObjectStorageTransaction(); return transaction->writeFile(path, buf_size, mode, write_settings); } diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index 8823af55936..0c90ae25626 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -6,7 +6,6 @@ #include #include -#include #include #include @@ -425,22 +424,13 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si ProfileEventTimeIncrement watch(ProfileEvents::ReadBufferFromS3InitMicroseconds); // We do not know in advance how many bytes we are going to consume, to avoid blocking estimated it from below - constexpr ResourceCost estimated_cost = 1; - ResourceGuard rlock(read_settings.resource_link, estimated_cost); - + CurrentThread::IOScope io_scope(read_settings.io_scheduling); Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req); - rlock.unlock(); - if (outcome.IsSuccess()) - { - ResourceCost bytes_read = outcome.GetResult().GetContentLength(); - read_settings.resource_link.adjust(estimated_cost, bytes_read); return outcome.GetResultWithOwnership(); - } else { - read_settings.resource_link.accumulate(estimated_cost); const auto & error = outcome.GetError(); throw S3Exception(error.GetMessage(), error.GetErrorType()); } diff --git a/src/IO/ReadSettings.h b/src/IO/ReadSettings.h index e73a9054928..7c22682dc76 100644 --- a/src/IO/ReadSettings.h +++ b/src/IO/ReadSettings.h @@ -118,8 +118,7 @@ struct ReadSettings ThrottlerPtr remote_throttler; ThrottlerPtr local_throttler; - // Resource to be used during reading - ResourceLink resource_link; + IOSchedulingSettings io_scheduling; size_t http_max_tries = 10; size_t http_retry_initial_backoff_ms = 100; diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index ff18a77f09f..160816ebbaa 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -11,7 +11,6 @@ #include #include -#include #include #include #include @@ -538,12 +537,11 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data) auto & request = std::get<0>(*worker_data); - ResourceCost cost = request.GetContentLength(); - ResourceGuard rlock(write_settings.resource_link, cost); + CurrentThread::IOScope io_scope(write_settings.io_scheduling); + Stopwatch watch; auto outcome = client_ptr->UploadPart(request); watch.stop(); - rlock.unlock(); // Avoid acquiring other locks under resource lock ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds()); @@ -557,7 +555,6 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data) if (!outcome.IsSuccess()) { ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1); - write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType()); } @@ -695,12 +692,11 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data if (client_ptr->isClientForDisk()) ProfileEvents::increment(ProfileEvents::DiskS3PutObject); - ResourceCost cost = request.GetContentLength(); - ResourceGuard rlock(write_settings.resource_link, cost); + CurrentThread::IOScope io_scope(write_settings.io_scheduling); + Stopwatch watch; auto outcome = client_ptr->PutObject(request); watch.stop(); - rlock.unlock(); ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds()); if (blob_log) @@ -714,7 +710,6 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data } ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1); - write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) { diff --git a/src/IO/WriteSettings.h b/src/IO/WriteSettings.h index 84bb25439b5..cdc75e8c0e9 100644 --- a/src/IO/WriteSettings.h +++ b/src/IO/WriteSettings.h @@ -13,8 +13,7 @@ struct WriteSettings ThrottlerPtr remote_throttler; ThrottlerPtr local_throttler; - // Resource to be used during reading - ResourceLink resource_link; + IOSchedulingSettings io_scheduling; /// Filesystem cache settings bool enable_filesystem_cache_on_write_operations = false; diff --git a/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp b/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp index be339d021dc..d1502577bf7 100644 --- a/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp +++ b/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp @@ -119,27 +119,25 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory(num_bytes_to_read)); + read_settings.io_scheduling.read_resource_link.adjust(num_bytes_to_read, std::max(0, bytes_read)); } catch (...) { - read_settings.resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure + read_settings.io_scheduling.read_resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure throw; } - rlock.unlock(); if (bytes_read < 0) { - read_settings.resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure throw Exception(ErrorCodes::NETWORK_ERROR, "Fail to read from HDFS: {}, file path: {}. Error: {}", hdfs_uri, hdfs_file_path, std::string(hdfsGetLastError())); } - read_settings.resource_link.adjust(num_bytes_to_read, bytes_read); if (bytes_read) { diff --git a/src/Storages/ObjectStorage/HDFS/WriteBufferFromHDFS.cpp b/src/Storages/ObjectStorage/HDFS/WriteBufferFromHDFS.cpp index 2c14b38ce01..65a5f45cd96 100644 --- a/src/Storages/ObjectStorage/HDFS/WriteBufferFromHDFS.cpp +++ b/src/Storages/ObjectStorage/HDFS/WriteBufferFromHDFS.cpp @@ -66,25 +66,21 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl int write(const char * start, size_t size) { - ResourceGuard rlock(write_settings.resource_link, size); int bytes_written; try { + ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, size); bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast(size)); + write_settings.io_scheduling.write_resource_link.adjust(size, std::max(0, bytes_written)); } catch (...) { - write_settings.resource_link.accumulate(size); // We assume no resource was used in case of failure + write_settings.io_scheduling.write_resource_link.accumulate(size); // We assume no resource was used in case of failure throw; } - rlock.unlock(); if (bytes_written < 0) - { - write_settings.resource_link.accumulate(size); // We assume no resource was used in case of failure throw Exception(ErrorCodes::NETWORK_ERROR, "Fail to write HDFS file: {} {}", hdfs_uri, std::string(hdfsGetLastError())); - } - write_settings.resource_link.adjust(size, bytes_written); if (write_settings.remote_throttler) write_settings.remote_throttler->add(bytes_written, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds); From eb3c61915d4bfae9f1e1007ca02fcd2c0f50b585 Mon Sep 17 00:00:00 2001 From: serxa Date: Thu, 13 Jun 2024 14:27:39 +0000 Subject: [PATCH 02/26] fix mixed read and write links --- src/Common/HTTPConnectionPool.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Common/HTTPConnectionPool.cpp b/src/Common/HTTPConnectionPool.cpp index de7e10d044a..b6bf98894e4 100644 --- a/src/Common/HTTPConnectionPool.cpp +++ b/src/Common/HTTPConnectionPool.cpp @@ -400,15 +400,15 @@ private: // Reset data hooks for IO scheduling if (ResourceLink link = CurrentThread::getReadResourceLink()) { - Session::setSendDataHooks(std::make_shared(ResourceGuard::Metrics::getIORead(), link)); - } else { - Session::setSendDataHooks(); - } - if (ResourceLink link = CurrentThread::getWriteResourceLink()) { - Session::setReceiveDataHooks(std::make_shared(ResourceGuard::Metrics::getIOWrite(), link)); + Session::setReceiveDataHooks(std::make_shared(ResourceGuard::Metrics::getIORead(), link)); } else { Session::setReceiveDataHooks(); } + if (ResourceLink link = CurrentThread::getWriteResourceLink()) { + Session::setSendDataHooks(std::make_shared(ResourceGuard::Metrics::getIOWrite(), link)); + } else { + Session::setSendDataHooks(); + } std::ostream & result = Session::sendRequest(request); result.exceptions(std::ios::badbit); From f1f354f22b5d1573e970e5d2a5e2ea540e3f99f8 Mon Sep 17 00:00:00 2001 From: serxa Date: Thu, 13 Jun 2024 15:33:59 +0000 Subject: [PATCH 03/26] add test for granularity and total byte size of resource requests --- tests/integration/test_scheduler/test.py | 67 ++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/tests/integration/test_scheduler/test.py b/tests/integration/test_scheduler/test.py index 8e37bd8d403..9940e16ea42 100644 --- a/tests/integration/test_scheduler/test.py +++ b/tests/integration/test_scheduler/test.py @@ -28,6 +28,73 @@ def start_cluster(): cluster.shutdown() +def test_s3_resource_request_granularity(): + node.query( + f""" + drop table if exists data; + create table data (key UInt64 CODEC(NONE), value String CODEC(NONE)) engine=MergeTree() order by key settings min_bytes_for_wide_part=1e9, storage_policy='s3'; + """ + ) + + total_bytes = 50000000 # Approximate data size + max_bytes_per_request = 2000000 # Should be ~1MB or less in general + min_bytes_per_request = 6000 # Small requests are ok, but we don't want hurt performance with too often resource requests + + writes_before = int( + node.query( + f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/admin'" + ).strip() + ) + write_bytes_before = int( + node.query( + f"select dequeued_cost from system.scheduler where resource='network_write' and path='/prio/admin'" + ).strip() + ) + node.query(f"insert into data select number, randomString(10000000) from numbers(5) SETTINGS workload='admin'") + writes_after = int( + node.query( + f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/admin'" + ).strip() + ) + write_bytes_after = int( + node.query( + f"select dequeued_cost from system.scheduler where resource='network_write' and path='/prio/admin'" + ).strip() + ) + + assert write_bytes_after - write_bytes_before > 1.0 * total_bytes + assert write_bytes_after - write_bytes_before < 1.2 * total_bytes + assert (write_bytes_after - write_bytes_before) / (writes_after - writes_before) < max_bytes_per_request + assert (write_bytes_after - write_bytes_before) / (writes_after - writes_before) > min_bytes_per_request + + reads_before = int( + node.query( + f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/admin'" + ).strip() + ) + read_bytes_before = int( + node.query( + f"select dequeued_cost from system.scheduler where resource='network_read' and path='/prio/admin'" + ).strip() + ) + node.query(f"select count() from data where not ignore(*) SETTINGS workload='admin'") + reads_after = int( + node.query( + f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/admin'" + ).strip() + ) + read_bytes_after = int( + node.query( + f"select dequeued_cost from system.scheduler where resource='network_read' and path='/prio/admin'" + ).strip() + ) + + assert read_bytes_after - read_bytes_before > 1.0 * total_bytes + assert read_bytes_after - read_bytes_before < 1.2 * total_bytes + assert (read_bytes_after - read_bytes_before) / (reads_after - reads_before) < max_bytes_per_request + assert (read_bytes_after - read_bytes_before) / (reads_after - reads_before) > min_bytes_per_request + + def test_s3_disk(): node.query( f""" From b74cf1a356fe668721532c1d05fa08aeaa1bf023 Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Thu, 13 Jun 2024 15:43:08 +0000 Subject: [PATCH 04/26] Automatic style fix --- tests/integration/test_scheduler/test.py | 30 +++++++++++++++++------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/tests/integration/test_scheduler/test.py b/tests/integration/test_scheduler/test.py index 9940e16ea42..0ae297f04d1 100644 --- a/tests/integration/test_scheduler/test.py +++ b/tests/integration/test_scheduler/test.py @@ -36,9 +36,9 @@ def test_s3_resource_request_granularity(): """ ) - total_bytes = 50000000 # Approximate data size - max_bytes_per_request = 2000000 # Should be ~1MB or less in general - min_bytes_per_request = 6000 # Small requests are ok, but we don't want hurt performance with too often resource requests + total_bytes = 50000000 # Approximate data size + max_bytes_per_request = 2000000 # Should be ~1MB or less in general + min_bytes_per_request = 6000 # Small requests are ok, but we don't want hurt performance with too often resource requests writes_before = int( node.query( @@ -50,7 +50,9 @@ def test_s3_resource_request_granularity(): f"select dequeued_cost from system.scheduler where resource='network_write' and path='/prio/admin'" ).strip() ) - node.query(f"insert into data select number, randomString(10000000) from numbers(5) SETTINGS workload='admin'") + node.query( + f"insert into data select number, randomString(10000000) from numbers(5) SETTINGS workload='admin'" + ) writes_after = int( node.query( f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/admin'" @@ -64,8 +66,12 @@ def test_s3_resource_request_granularity(): assert write_bytes_after - write_bytes_before > 1.0 * total_bytes assert write_bytes_after - write_bytes_before < 1.2 * total_bytes - assert (write_bytes_after - write_bytes_before) / (writes_after - writes_before) < max_bytes_per_request - assert (write_bytes_after - write_bytes_before) / (writes_after - writes_before) > min_bytes_per_request + assert (write_bytes_after - write_bytes_before) / ( + writes_after - writes_before + ) < max_bytes_per_request + assert (write_bytes_after - write_bytes_before) / ( + writes_after - writes_before + ) > min_bytes_per_request reads_before = int( node.query( @@ -77,7 +83,9 @@ def test_s3_resource_request_granularity(): f"select dequeued_cost from system.scheduler where resource='network_read' and path='/prio/admin'" ).strip() ) - node.query(f"select count() from data where not ignore(*) SETTINGS workload='admin'") + node.query( + f"select count() from data where not ignore(*) SETTINGS workload='admin'" + ) reads_after = int( node.query( f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/admin'" @@ -91,8 +99,12 @@ def test_s3_resource_request_granularity(): assert read_bytes_after - read_bytes_before > 1.0 * total_bytes assert read_bytes_after - read_bytes_before < 1.2 * total_bytes - assert (read_bytes_after - read_bytes_before) / (reads_after - reads_before) < max_bytes_per_request - assert (read_bytes_after - read_bytes_before) / (reads_after - reads_before) > min_bytes_per_request + assert (read_bytes_after - read_bytes_before) / ( + reads_after - reads_before + ) < max_bytes_per_request + assert (read_bytes_after - read_bytes_before) / ( + reads_after - reads_before + ) > min_bytes_per_request def test_s3_disk(): From 937e1708259c518d27959961849da263163cc5a2 Mon Sep 17 00:00:00 2001 From: serxa Date: Thu, 13 Jun 2024 18:58:39 +0000 Subject: [PATCH 05/26] make ResourceGuards more straight-forward --- src/Common/HTTPConnectionPool.cpp | 10 +++--- .../Scheduler/Nodes/tests/ResourceTest.h | 9 ++++-- .../tests/gtest_dynamic_resource_manager.cpp | 7 ++++- .../Nodes/tests/gtest_resource_scheduler.cpp | 4 +++ src/Common/Scheduler/ResourceGuard.h | 31 ++++++++++++------- src/Common/Scheduler/ResourceRequest.h | 2 +- .../IO/ReadBufferFromAzureBlobStorage.cpp | 4 +-- .../IO/WriteBufferFromAzureBlobStorage.cpp | 6 +--- .../ObjectStorage/HDFS/ReadBufferFromHDFS.cpp | 15 ++------- .../HDFS/WriteBufferFromHDFS.cpp | 15 ++------- 10 files changed, 50 insertions(+), 53 deletions(-) diff --git a/src/Common/HTTPConnectionPool.cpp b/src/Common/HTTPConnectionPool.cpp index b6bf98894e4..18a6ab13c4f 100644 --- a/src/Common/HTTPConnectionPool.cpp +++ b/src/Common/HTTPConnectionPool.cpp @@ -276,21 +276,18 @@ struct ResourceGuardSessionDataHooks : public Poco::Net::IHTTPSessionDataHooks void start(int bytes) override { - // TODO(serxa): add metrics here or better in scheduler code (e.g. during enqueue, or better in REsourceGuard::REquest)? request.enqueue(bytes, link); request.wait(); } void finish(int bytes) override { - request.finish(); - link.adjust(request.cost, bytes); // success + request.finish(bytes, link); } void fail() override { - request.finish(); - link.accumulate(request.cost); // We assume no resource was used in case of failure + request.finish(0, link); } ResourceLink link; @@ -466,6 +463,9 @@ private: } } response_stream = nullptr; + // FIXME: We are not sure that response stream is fully read at this moment, so hooks could possible be called after this point, right? + // Session::setSendDataHooks(); + // Session::setReceiveDataHooks(); group->atConnectionDestroy(); diff --git a/src/Common/Scheduler/Nodes/tests/ResourceTest.h b/src/Common/Scheduler/Nodes/tests/ResourceTest.h index a5eb98f2a2f..c440cb176f8 100644 --- a/src/Common/Scheduler/Nodes/tests/ResourceTest.h +++ b/src/Common/Scheduler/Nodes/tests/ResourceTest.h @@ -232,12 +232,13 @@ struct ResourceTestManager : public ResourceTestBase ResourceTestManager & t; Guard(ResourceTestManager & t_, ResourceLink link_, ResourceCost cost) - : ResourceGuard(ResourceGuard::Metrics::getIOWrite(), link_, cost, PostponeLocking) + : ResourceGuard(ResourceGuard::Metrics::getIOWrite(), link_, cost, Lock::Postpone) , t(t_) { t.onEnqueue(link); lock(); t.onExecute(link); + consume(cost); } }; @@ -310,8 +311,9 @@ struct ResourceTestManager : public ResourceTestBase // NOTE: actually leader's request(s) make their own small busy period. void blockResource(ResourceLink link) { - ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, 1, ResourceGuard::PostponeLocking); + ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, 1, ResourceGuard::Lock::Postpone); g.lock(); + g.consume(1); // NOTE: at this point we assume resource to be blocked by single request (1) busy_period.arrive_and_wait(); // (1) notify all followers that resource is blocked busy_period.arrive_and_wait(); // (2) wait all followers to enqueue their requests @@ -320,10 +322,11 @@ struct ResourceTestManager : public ResourceTestBase { getLinkData(link).left += total_requests + 1; busy_period.arrive_and_wait(); // (1) wait leader to block resource - ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, cost, ResourceGuard::PostponeLocking); + ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, cost, ResourceGuard::Lock::Postpone); onEnqueue(link); busy_period.arrive_and_wait(); // (2) notify leader to unblock g.lock(); + g.consume(cost); onExecute(link); } }; diff --git a/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp b/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp index 4ac79977663..0f4aaab70aa 100644 --- a/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp +++ b/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp @@ -36,11 +36,16 @@ TEST(SchedulerDynamicResourceManager, Smoke) for (int i = 0; i < 10; i++) { - ResourceGuard gA(ResourceGuard::Metrics::getIOWrite(), cA->get("res1"), ResourceGuard::PostponeLocking); + ResourceGuard gA(ResourceGuard::Metrics::getIOWrite(), cA->get("res1"), 1, ResourceGuard::Lock::Postpone); gA.lock(); + gA.consume(1); gA.unlock(); ResourceGuard gB(ResourceGuard::Metrics::getIOWrite(), cB->get("res1")); + gB.unlock(); + + ResourceGuard gC(ResourceGuard::Metrics::getIORead(), cB->get("res1")); + gB.consume(2); } } diff --git a/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp b/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp index ba573bf0c85..ff3054d6b7a 100644 --- a/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp +++ b/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp @@ -111,21 +111,25 @@ TEST(SchedulerRoot, Smoke) { ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), a); EXPECT_TRUE(fc1->requests.contains(&rg.request)); + rg.consume(1); } { ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), b); EXPECT_TRUE(fc1->requests.contains(&rg.request)); + rg.consume(1); } { ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), c); EXPECT_TRUE(fc2->requests.contains(&rg.request)); + rg.consume(1); } { ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), d); EXPECT_TRUE(fc2->requests.contains(&rg.request)); + rg.consume(1); } } diff --git a/src/Common/Scheduler/ResourceGuard.h b/src/Common/Scheduler/ResourceGuard.h index 73aea4afdb6..93b6268a62c 100644 --- a/src/Common/Scheduler/ResourceGuard.h +++ b/src/Common/Scheduler/ResourceGuard.h @@ -42,12 +42,12 @@ namespace DB class ResourceGuard { public: - enum ResourceGuardCtor + enum class Lock { - LockStraightAway, /// Locks inside constructor (default) + StraightAway, /// Locks inside constructor (default) // WARNING: Only for tests. It is not exception-safe because `lock()` must be called after construction. - PostponeLocking /// Don't lock in constructor, but send request + Postpone /// Don't lock in constructor, but send request }; struct Metrics @@ -96,8 +96,6 @@ public: chassert(state == Finished); state = Enqueued; ResourceRequest::reset(cost_); - ProfileEvents::increment(metrics->requests); - ProfileEvents::increment(metrics->cost, cost_); link_.queue->enqueueRequestUsingBudget(this); } @@ -121,12 +119,16 @@ public: dequeued_cv.wait(lock, [this] { return state == Dequeued; }); } - void finish() + void finish(ResourceCost real_cost_, ResourceLink link_) { // lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread chassert(state == Dequeued); state = Finished; + if (cost != real_cost_) + link_.adjust(cost, real_cost_); ResourceRequest::finish(); + ProfileEvents::increment(metrics->requests); + ProfileEvents::increment(metrics->cost, real_cost_); } void assertFinished() @@ -153,7 +155,7 @@ public: }; /// Creates pending request for resource; blocks while resource is not available (unless `PostponeLocking`) - explicit ResourceGuard(const Metrics * metrics, ResourceLink link_, ResourceCost cost = 1, ResourceGuardCtor ctor = LockStraightAway) + explicit ResourceGuard(const Metrics * metrics, ResourceLink link_, ResourceCost cost = 1, ResourceGuard::Lock type = ResourceGuard::Lock::StraightAway) : link(link_) , request(Request::local(metrics)) { @@ -162,7 +164,7 @@ public: else if (link) { request.enqueue(cost, link); - if (ctor == LockStraightAway) + if (type == Lock::StraightAway) request.wait(); } } @@ -179,18 +181,25 @@ public: request.wait(); } - /// Report resource consumption has finished - void unlock() + void consume(ResourceCost cost) { + real_cost += cost; + } + + /// Report resource consumption has finished + void unlock(ResourceCost consumed = 0) + { + consume(consumed); if (link) { - request.finish(); + request.finish(real_cost, link); link.reset(); } } ResourceLink link; Request & request; + ResourceCost real_cost = 0; }; } diff --git a/src/Common/Scheduler/ResourceRequest.h b/src/Common/Scheduler/ResourceRequest.h index d64f624cec5..7b6a5af0fe6 100644 --- a/src/Common/Scheduler/ResourceRequest.h +++ b/src/Common/Scheduler/ResourceRequest.h @@ -45,7 +45,7 @@ constexpr ResourceCost ResourceCostMax = std::numeric_limits::max(); class ResourceRequest : public boost::intrusive::list_base_hook<> { public: - /// Cost of request execution; should be filled before request enqueueing. + /// Cost of request execution; should be filled before request enqueueing and remain constant until `finish()`. /// NOTE: If cost is not known in advance, ResourceBudget should be used (note that every ISchedulerQueue has it) ResourceCost cost; diff --git a/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp b/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp index ecc4168c729..626220a843e 100644 --- a/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp +++ b/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp @@ -116,15 +116,13 @@ bool ReadBufferFromAzureBlobStorage::nextImpl() { ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, to_read_bytes); bytes_read = data_stream->ReadToCount(reinterpret_cast(data_ptr), to_read_bytes); - read_settings.io_scheduling.read_resource_link.adjust(to_read_bytes, bytes_read); - rlock.unlock(); // Do not hold resource under bandwidth throttler + rlock.unlock(bytes_read); // Do not hold resource under bandwidth throttler if (read_settings.remote_throttler) read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds); break; } catch (const Azure::Core::RequestFailedException & e) { - read_settings.io_scheduling.read_resource_link.accumulate(to_read_bytes); // We assume no resource was used in case of failure ProfileEvents::increment(ProfileEvents::ReadBufferFromAzureRequestsErrors); LOG_DEBUG(log, "Exception caught during Azure Read for file {} at attempt {}/{}: {}", path, i + 1, max_single_read_retries, e.Message); diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp index 82ce9d32f5a..d040200c31e 100644 --- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp +++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp @@ -94,13 +94,11 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function func, { ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, cost); // Note that zero-cost requests are ignored func(); + rlock.unlock(cost); break; } catch (const Azure::Core::RequestFailedException & e) { - if (cost) - write_settings.io_scheduling.write_resource_link.accumulate(cost); // Accumulate resource for later use, because we have failed to consume it - if (i == num_tries - 1 || !isRetryableAzureException(e)) throw; @@ -108,8 +106,6 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function func, } catch (...) { - if (cost) - write_settings.io_scheduling.write_resource_link.accumulate(cost); // We assume no resource was used in case of failure throw; } } diff --git a/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp b/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp index d1502577bf7..0fbd123508e 100644 --- a/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp +++ b/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp @@ -119,18 +119,9 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory(num_bytes_to_read)); - read_settings.io_scheduling.read_resource_link.adjust(num_bytes_to_read, std::max(0, bytes_read)); - } - catch (...) - { - read_settings.io_scheduling.read_resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure - throw; - } + ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, num_bytes_to_read); + int bytes_read = hdfsRead(fs.get(), fin, internal_buffer.begin(), safe_cast(num_bytes_to_read)); + rlock.unlock(std::max(0, bytes_read)); if (bytes_read < 0) { diff --git a/src/Storages/ObjectStorage/HDFS/WriteBufferFromHDFS.cpp b/src/Storages/ObjectStorage/HDFS/WriteBufferFromHDFS.cpp index 65a5f45cd96..816de6676f1 100644 --- a/src/Storages/ObjectStorage/HDFS/WriteBufferFromHDFS.cpp +++ b/src/Storages/ObjectStorage/HDFS/WriteBufferFromHDFS.cpp @@ -66,18 +66,9 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl int write(const char * start, size_t size) { - int bytes_written; - try - { - ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, size); - bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast(size)); - write_settings.io_scheduling.write_resource_link.adjust(size, std::max(0, bytes_written)); - } - catch (...) - { - write_settings.io_scheduling.write_resource_link.accumulate(size); // We assume no resource was used in case of failure - throw; - } + ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, size); + int bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast(size)); + rlock.unlock(std::max(0, bytes_written)); if (bytes_written < 0) throw Exception(ErrorCodes::NETWORK_ERROR, "Fail to write HDFS file: {} {}", hdfs_uri, std::string(hdfsGetLastError())); From a5eeeb3422e956212c15382af13ca45012efc96b Mon Sep 17 00:00:00 2001 From: serxa Date: Fri, 14 Jun 2024 13:01:14 +0000 Subject: [PATCH 06/26] bugfix: wrong estimated cost passed for budget adjusting --- src/Common/Scheduler/ISchedulerQueue.h | 19 +++++-------------- src/Common/Scheduler/ResouceLink.cpp | 25 ------------------------- src/Common/Scheduler/ResourceGuard.h | 7 ++++--- src/Common/Scheduler/ResourceLink.h | 4 ---- 4 files changed, 9 insertions(+), 46 deletions(-) delete mode 100644 src/Common/Scheduler/ResouceLink.cpp diff --git a/src/Common/Scheduler/ISchedulerQueue.h b/src/Common/Scheduler/ISchedulerQueue.h index 532f4bf6c63..b7a51870a24 100644 --- a/src/Common/Scheduler/ISchedulerQueue.h +++ b/src/Common/Scheduler/ISchedulerQueue.h @@ -22,10 +22,13 @@ public: {} // Wrapper for `enqueueRequest()` that should be used to account for available resource budget - void enqueueRequestUsingBudget(ResourceRequest * request) + // Returns `estimated_cost` that should be passed later to `adjustBudget()` + [[ nodiscard ]] ResourceCost enqueueRequestUsingBudget(ResourceRequest * request) { - request->cost = budget.ask(request->cost); + ResourceCost estimated_cost = request->cost; + request->cost = budget.ask(estimated_cost); enqueueRequest(request); + return estimated_cost; } // Should be called to account for difference between real and estimated costs @@ -34,18 +37,6 @@ public: budget.adjust(estimated_cost, real_cost); } - // Adjust budget to account for extra consumption of `cost` resource units - void consumeBudget(ResourceCost cost) - { - adjustBudget(0, cost); - } - - // Adjust budget to account for requested, but not consumed `cost` resource units - void accumulateBudget(ResourceCost cost) - { - adjustBudget(cost, 0); - } - /// Enqueue new request to be executed using underlying resource. /// Should be called outside of scheduling subsystem, implementation must be thread-safe. virtual void enqueueRequest(ResourceRequest * request) = 0; diff --git a/src/Common/Scheduler/ResouceLink.cpp b/src/Common/Scheduler/ResouceLink.cpp deleted file mode 100644 index 2da5dba62dc..00000000000 --- a/src/Common/Scheduler/ResouceLink.cpp +++ /dev/null @@ -1,25 +0,0 @@ -#include -#include -#include - -namespace DB -{ -void ResourceLink::adjust(ResourceCost estimated_cost, ResourceCost real_cost) const -{ - if (queue) - queue->adjustBudget(estimated_cost, real_cost); -} - -void ResourceLink::consumed(ResourceCost cost) const -{ - if (queue) - queue->consumeBudget(cost); -} - -void ResourceLink::accumulate(DB::ResourceCost cost) const -{ - if (queue) - queue->accumulateBudget(cost); -} -} - diff --git a/src/Common/Scheduler/ResourceGuard.h b/src/Common/Scheduler/ResourceGuard.h index 93b6268a62c..c46a3683455 100644 --- a/src/Common/Scheduler/ResourceGuard.h +++ b/src/Common/Scheduler/ResourceGuard.h @@ -96,7 +96,7 @@ public: chassert(state == Finished); state = Enqueued; ResourceRequest::reset(cost_); - link_.queue->enqueueRequestUsingBudget(this); + estimated_cost = link_.queue->enqueueRequestUsingBudget(this); // NOTE: it modifies `cost` and enqueues request } // This function is executed inside scheduler thread and wakes thread issued this `request`. @@ -124,8 +124,8 @@ public: // lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread chassert(state == Dequeued); state = Finished; - if (cost != real_cost_) - link_.adjust(cost, real_cost_); + if (estimated_cost != real_cost_) + link_.queue->adjustBudget(estimated_cost, real_cost_); ResourceRequest::finish(); ProfileEvents::increment(metrics->requests); ProfileEvents::increment(metrics->cost, real_cost_); @@ -149,6 +149,7 @@ public: const Metrics * metrics = nullptr; // Must be initialized before use private: + ResourceCost estimated_cost = 0; // Stores initial `cost` value in case budget was used to modify it std::mutex mutex; std::condition_variable dequeued_cv; RequestState state = Finished; diff --git a/src/Common/Scheduler/ResourceLink.h b/src/Common/Scheduler/ResourceLink.h index 6dd3be930ca..a4e2adbd963 100644 --- a/src/Common/Scheduler/ResourceLink.h +++ b/src/Common/Scheduler/ResourceLink.h @@ -17,10 +17,6 @@ struct ResourceLink bool operator==(const ResourceLink &) const = default; explicit operator bool() const { return queue != nullptr; } - void adjust(ResourceCost estimated_cost, ResourceCost real_cost) const; - void consumed(ResourceCost cost) const; - void accumulate(ResourceCost cost) const; - void reset() { queue = nullptr; From ec71d35aa64e22d62715d313f642b81f9e74f36c Mon Sep 17 00:00:00 2001 From: serxa Date: Fri, 14 Jun 2024 13:12:23 +0000 Subject: [PATCH 07/26] add test for scheduler queue budget --- .../Nodes/tests/gtest_resource_scheduler.cpp | 42 ++++++++++++++++++- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp b/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp index ff3054d6b7a..ddfe0cfbc6f 100644 --- a/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp +++ b/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp @@ -1,11 +1,13 @@ #include -#include - #include +#include +#include + #include #include +#include using namespace DB; @@ -22,6 +24,17 @@ struct ResourceTest : public ResourceTestBase { scheduler.stop(true); } + + std::mutex rng_mutex; + pcg64 rng{randomSeed()}; + + template + T randomInt(T from, T to) + { + std::uniform_int_distribution distribution(from, to); + std::lock_guard lock(rng_mutex); + return distribution(rng); + } }; struct ResourceHolder @@ -133,6 +146,31 @@ TEST(SchedulerRoot, Smoke) } } +TEST(SchedulerRoot, Budget) +{ + ResourceTest t; + + ResourceHolder r1(t); + r1.add("/", "1"); + r1.add("/prio"); + auto a = r1.addQueue("/prio/A", ""); + r1.registerResource(); + + ResourceCost total_real_cost = 0; + int total_requests = 10; + for (int i = 0 ; i < total_requests; i++) + { + ResourceCost est_cost = t.randomInt(1, 10); + ResourceCost real_cost = t.randomInt(0, 10); + ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), a, est_cost); + rg.consume(real_cost); + total_real_cost += real_cost; + } + + EXPECT_EQ(total_requests, a.queue->dequeued_requests); + EXPECT_EQ(total_real_cost, a.queue->dequeued_cost - a.queue->getBudget()); +} + TEST(SchedulerRoot, Cancel) { ResourceTest t; From 709ef2ba8539bfe0316817374e9fe4c0768e502b Mon Sep 17 00:00:00 2001 From: serxa Date: Fri, 14 Jun 2024 16:09:00 +0000 Subject: [PATCH 08/26] add metrics and budget checks --- tests/integration/test_scheduler/test.py | 59 ++++++++++++++++++++---- 1 file changed, 51 insertions(+), 8 deletions(-) diff --git a/tests/integration/test_scheduler/test.py b/tests/integration/test_scheduler/test.py index 9940e16ea42..c6338ec3eb1 100644 --- a/tests/integration/test_scheduler/test.py +++ b/tests/integration/test_scheduler/test.py @@ -28,6 +28,19 @@ def start_cluster(): cluster.shutdown() +def check_profile_event_for_query(workload, profile_event, amount=1): + node.query("system flush logs") + query_pattern = f"workload='{workload}'".replace("'", "\\'") + assert ( + int( + node.query( + f"select ProfileEvents['{profile_event}'] from system.query_log where query ilike '%{query_pattern}%' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1" + ) + ) + == amount + ) + + def test_s3_resource_request_granularity(): node.query( f""" @@ -50,6 +63,11 @@ def test_s3_resource_request_granularity(): f"select dequeued_cost from system.scheduler where resource='network_write' and path='/prio/admin'" ).strip() ) + write_budget_before = int( + node.query( + f"select budget from system.scheduler where resource='network_write' and path='/prio/admin'" + ).strip() + ) node.query(f"insert into data select number, randomString(10000000) from numbers(5) SETTINGS workload='admin'") writes_after = int( node.query( @@ -61,11 +79,22 @@ def test_s3_resource_request_granularity(): f"select dequeued_cost from system.scheduler where resource='network_write' and path='/prio/admin'" ).strip() ) + write_budget_after = int( + node.query( + f"select budget from system.scheduler where resource='network_write' and path='/prio/admin'" + ).strip() + ) - assert write_bytes_after - write_bytes_before > 1.0 * total_bytes - assert write_bytes_after - write_bytes_before < 1.2 * total_bytes - assert (write_bytes_after - write_bytes_before) / (writes_after - writes_before) < max_bytes_per_request - assert (write_bytes_after - write_bytes_before) / (writes_after - writes_before) > min_bytes_per_request + write_requests = writes_after - writes_before + write_bytes = (write_bytes_after - write_bytes_before) - (write_budget_after - write_budget_before) + assert write_bytes > 1.0 * total_bytes + assert write_bytes < 1.05 * total_bytes + assert write_bytes / write_requests < max_bytes_per_request + assert write_bytes / write_requests > min_bytes_per_request + check_profile_event_for_query("admin", "SchedulerIOWriteRequests", write_requests) + check_profile_event_for_query("admin", "SchedulerIOWriteBytes", write_bytes) + + node.query(f"optimize table data final") reads_before = int( node.query( @@ -77,6 +106,11 @@ def test_s3_resource_request_granularity(): f"select dequeued_cost from system.scheduler where resource='network_read' and path='/prio/admin'" ).strip() ) + read_budget_before = int( + node.query( + f"select budget from system.scheduler where resource='network_read' and path='/prio/admin'" + ).strip() + ) node.query(f"select count() from data where not ignore(*) SETTINGS workload='admin'") reads_after = int( node.query( @@ -88,11 +122,20 @@ def test_s3_resource_request_granularity(): f"select dequeued_cost from system.scheduler where resource='network_read' and path='/prio/admin'" ).strip() ) + read_budget_after = int( + node.query( + f"select budget from system.scheduler where resource='network_read' and path='/prio/admin'" + ).strip() + ) - assert read_bytes_after - read_bytes_before > 1.0 * total_bytes - assert read_bytes_after - read_bytes_before < 1.2 * total_bytes - assert (read_bytes_after - read_bytes_before) / (reads_after - reads_before) < max_bytes_per_request - assert (read_bytes_after - read_bytes_before) / (reads_after - reads_before) > min_bytes_per_request + read_bytes = (read_bytes_after - read_bytes_before) - (read_budget_after - read_budget_before) + read_requests = reads_after - reads_before + assert read_bytes > 1.0 * total_bytes + assert read_bytes < 1.05 * total_bytes + assert read_bytes / read_requests < max_bytes_per_request + assert read_bytes / read_requests > min_bytes_per_request + check_profile_event_for_query("admin", "SchedulerIOReadRequests", read_requests) + check_profile_event_for_query("admin", "SchedulerIOReadBytes", read_bytes) def test_s3_disk(): From f3c3d419bbce4f7edecf807fdbaca8557ee36d1f Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Fri, 14 Jun 2024 16:17:57 +0000 Subject: [PATCH 09/26] Automatic style fix --- tests/integration/test_scheduler/test.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_scheduler/test.py b/tests/integration/test_scheduler/test.py index 11525d3fd62..5c7da0e2516 100644 --- a/tests/integration/test_scheduler/test.py +++ b/tests/integration/test_scheduler/test.py @@ -88,7 +88,9 @@ def test_s3_resource_request_granularity(): ) write_requests = writes_after - writes_before - write_bytes = (write_bytes_after - write_bytes_before) - (write_budget_after - write_budget_before) + write_bytes = (write_bytes_after - write_bytes_before) - ( + write_budget_after - write_budget_before + ) assert write_bytes > 1.0 * total_bytes assert write_bytes < 1.05 * total_bytes assert write_bytes / write_requests < max_bytes_per_request @@ -132,7 +134,9 @@ def test_s3_resource_request_granularity(): ).strip() ) - read_bytes = (read_bytes_after - read_bytes_before) - (read_budget_after - read_budget_before) + read_bytes = (read_bytes_after - read_bytes_before) - ( + read_budget_after - read_budget_before + ) read_requests = reads_after - reads_before assert read_bytes > 1.0 * total_bytes assert read_bytes < 1.05 * total_bytes From a9bfaf6454f38ce9e6ef57fdcaba864b47d49e9f Mon Sep 17 00:00:00 2001 From: serxa Date: Tue, 18 Jun 2024 11:52:40 +0000 Subject: [PATCH 10/26] style --- src/Common/HTTPConnectionPool.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/Common/HTTPConnectionPool.cpp b/src/Common/HTTPConnectionPool.cpp index a713bd14d62..bac12fd438d 100644 --- a/src/Common/HTTPConnectionPool.cpp +++ b/src/Common/HTTPConnectionPool.cpp @@ -383,16 +383,15 @@ private: auto idle = idleTime(); // Reset data hooks for IO scheduling - if (ResourceLink link = CurrentThread::getReadResourceLink()) { + if (ResourceLink link = CurrentThread::getReadResourceLink()) Session::setReceiveDataHooks(std::make_shared(ResourceGuard::Metrics::getIORead(), link)); - } else { + else Session::setReceiveDataHooks(); - } - if (ResourceLink link = CurrentThread::getWriteResourceLink()) { + + if (ResourceLink link = CurrentThread::getWriteResourceLink()) Session::setSendDataHooks(std::make_shared(ResourceGuard::Metrics::getIOWrite(), link)); - } else { + else Session::setSendDataHooks(); - } std::ostream & result = Session::sendRequest(request); result.exceptions(std::ios::badbit); From 7a01b8189cadb9b720182548c3c49f9267f8a606 Mon Sep 17 00:00:00 2001 From: serxa Date: Wed, 19 Jun 2024 00:49:21 +0000 Subject: [PATCH 11/26] fix typo, renames --- base/poco/Net/include/Poco/Net/HTTPSession.h | 2 +- src/Common/Scheduler/Nodes/tests/ResourceTest.h | 6 +++--- .../Nodes/tests/gtest_dynamic_resource_manager.cpp | 2 +- src/Common/Scheduler/ResourceGuard.h | 10 +++++----- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/base/poco/Net/include/Poco/Net/HTTPSession.h b/base/poco/Net/include/Poco/Net/HTTPSession.h index b0e59443f9b..b25ad68cc67 100644 --- a/base/poco/Net/include/Poco/Net/HTTPSession.h +++ b/base/poco/Net/include/Poco/Net/HTTPSession.h @@ -49,7 +49,7 @@ namespace Net /// Called when sending/receiving of data `bytes` is successfully finished. virtual void fail() = 0; - /// If an error occured during send/receive `fail()` is called instead of `finish()`. + /// If an error occurred during send/receive `fail()` is called instead of `finish()`. }; diff --git a/src/Common/Scheduler/Nodes/tests/ResourceTest.h b/src/Common/Scheduler/Nodes/tests/ResourceTest.h index c440cb176f8..c787a686a09 100644 --- a/src/Common/Scheduler/Nodes/tests/ResourceTest.h +++ b/src/Common/Scheduler/Nodes/tests/ResourceTest.h @@ -232,7 +232,7 @@ struct ResourceTestManager : public ResourceTestBase ResourceTestManager & t; Guard(ResourceTestManager & t_, ResourceLink link_, ResourceCost cost) - : ResourceGuard(ResourceGuard::Metrics::getIOWrite(), link_, cost, Lock::Postpone) + : ResourceGuard(ResourceGuard::Metrics::getIOWrite(), link_, cost, Lock::Defer) , t(t_) { t.onEnqueue(link); @@ -311,7 +311,7 @@ struct ResourceTestManager : public ResourceTestBase // NOTE: actually leader's request(s) make their own small busy period. void blockResource(ResourceLink link) { - ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, 1, ResourceGuard::Lock::Postpone); + ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, 1, ResourceGuard::Lock::Defer); g.lock(); g.consume(1); // NOTE: at this point we assume resource to be blocked by single request (1) @@ -322,7 +322,7 @@ struct ResourceTestManager : public ResourceTestBase { getLinkData(link).left += total_requests + 1; busy_period.arrive_and_wait(); // (1) wait leader to block resource - ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, cost, ResourceGuard::Lock::Postpone); + ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, cost, ResourceGuard::Lock::Defer); onEnqueue(link); busy_period.arrive_and_wait(); // (2) notify leader to unblock g.lock(); diff --git a/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp b/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp index 0f4aaab70aa..3328196cced 100644 --- a/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp +++ b/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp @@ -36,7 +36,7 @@ TEST(SchedulerDynamicResourceManager, Smoke) for (int i = 0; i < 10; i++) { - ResourceGuard gA(ResourceGuard::Metrics::getIOWrite(), cA->get("res1"), 1, ResourceGuard::Lock::Postpone); + ResourceGuard gA(ResourceGuard::Metrics::getIOWrite(), cA->get("res1"), 1, ResourceGuard::Lock::Defer); gA.lock(); gA.consume(1); gA.unlock(); diff --git a/src/Common/Scheduler/ResourceGuard.h b/src/Common/Scheduler/ResourceGuard.h index c46a3683455..2e735aae656 100644 --- a/src/Common/Scheduler/ResourceGuard.h +++ b/src/Common/Scheduler/ResourceGuard.h @@ -44,10 +44,10 @@ class ResourceGuard public: enum class Lock { - StraightAway, /// Locks inside constructor (default) + Default, /// Locks inside constructor // WARNING: Only for tests. It is not exception-safe because `lock()` must be called after construction. - Postpone /// Don't lock in constructor, but send request + Defer /// Don't lock in constructor, but send request }; struct Metrics @@ -155,8 +155,8 @@ public: RequestState state = Finished; }; - /// Creates pending request for resource; blocks while resource is not available (unless `PostponeLocking`) - explicit ResourceGuard(const Metrics * metrics, ResourceLink link_, ResourceCost cost = 1, ResourceGuard::Lock type = ResourceGuard::Lock::StraightAway) + /// Creates pending request for resource; blocks while resource is not available (unless `Lock::Defer`) + explicit ResourceGuard(const Metrics * metrics, ResourceLink link_, ResourceCost cost = 1, ResourceGuard::Lock type = ResourceGuard::Lock::Default) : link(link_) , request(Request::local(metrics)) { @@ -165,7 +165,7 @@ public: else if (link) { request.enqueue(cost, link); - if (type == Lock::StraightAway) + if (type == Lock::Default) request.wait(); } } From 54fda05094d1d3d788e0e65591c0d15b0e5838b0 Mon Sep 17 00:00:00 2001 From: serxa Date: Fri, 5 Jul 2024 10:25:04 +0000 Subject: [PATCH 12/26] reset session send/recv hooks at connection destruction --- src/Common/HTTPConnectionPool.cpp | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/Common/HTTPConnectionPool.cpp b/src/Common/HTTPConnectionPool.cpp index bac12fd438d..3798b7624ea 100644 --- a/src/Common/HTTPConnectionPool.cpp +++ b/src/Common/HTTPConnectionPool.cpp @@ -385,13 +385,8 @@ private: // Reset data hooks for IO scheduling if (ResourceLink link = CurrentThread::getReadResourceLink()) Session::setReceiveDataHooks(std::make_shared(ResourceGuard::Metrics::getIORead(), link)); - else - Session::setReceiveDataHooks(); - if (ResourceLink link = CurrentThread::getWriteResourceLink()) Session::setSendDataHooks(std::make_shared(ResourceGuard::Metrics::getIOWrite(), link)); - else - Session::setSendDataHooks(); std::ostream & result = Session::sendRequest(request); result.exceptions(std::ios::badbit); @@ -449,9 +444,8 @@ private: } } response_stream = nullptr; - // FIXME: We are not sure that response stream is fully read at this moment, so hooks could possible be called after this point, right? - // Session::setSendDataHooks(); - // Session::setReceiveDataHooks(); + Session::setSendDataHooks(); + Session::setReceiveDataHooks(); group->atConnectionDestroy(); From 3fa5ad92b4612b59ca9c35aab5853dcb3318a112 Mon Sep 17 00:00:00 2001 From: serxa Date: Fri, 5 Jul 2024 10:33:22 +0000 Subject: [PATCH 13/26] rename `IHTTPSessionDataHooks` methods --- base/poco/Net/include/Poco/Net/HTTPSession.h | 6 +++--- base/poco/Net/src/HTTPSession.cpp | 12 ++++++------ src/Common/HTTPConnectionPool.cpp | 12 ++++++------ 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/base/poco/Net/include/Poco/Net/HTTPSession.h b/base/poco/Net/include/Poco/Net/HTTPSession.h index b25ad68cc67..2038fd2aff5 100644 --- a/base/poco/Net/include/Poco/Net/HTTPSession.h +++ b/base/poco/Net/include/Poco/Net/HTTPSession.h @@ -42,13 +42,13 @@ namespace Net public: virtual ~IHTTPSessionDataHooks() = default; - virtual void start(int bytes) = 0; + virtual void atStart(int bytes) = 0; /// Called before sending/receiving data `bytes` to/from socket. - virtual void finish(int bytes) = 0; + virtual void atFinish(int bytes) = 0; /// Called when sending/receiving of data `bytes` is successfully finished. - virtual void fail() = 0; + virtual void atFail() = 0; /// If an error occurred during send/receive `fail()` is called instead of `finish()`. }; diff --git a/base/poco/Net/src/HTTPSession.cpp b/base/poco/Net/src/HTTPSession.cpp index 596185703fa..f30ccb21129 100644 --- a/base/poco/Net/src/HTTPSession.cpp +++ b/base/poco/Net/src/HTTPSession.cpp @@ -167,16 +167,16 @@ int HTTPSession::write(const char* buffer, std::streamsize length) try { if (_sendDataHooks) - _sendDataHooks->start((int) length); + _sendDataHooks->atStart((int) length); int result = _socket.sendBytes(buffer, (int) length); if (_sendDataHooks) - _sendDataHooks->finish(result); + _sendDataHooks->atFinish(result); return result; } catch (Poco::Exception& exc) { if (_sendDataHooks) - _sendDataHooks->fail(); + _sendDataHooks->atFail(); setException(exc); throw; } @@ -188,16 +188,16 @@ int HTTPSession::receive(char* buffer, int length) try { if (_receiveDataHooks) - _receiveDataHooks->start(length); + _receiveDataHooks->atStart(length); int result = _socket.receiveBytes(buffer, length); if (_receiveDataHooks) - _receiveDataHooks->finish(result); + _receiveDataHooks->atFinish(result); return result; } catch (Poco::Exception& exc) { if (_receiveDataHooks) - _receiveDataHooks->fail(); + _receiveDataHooks->atFail(); setException(exc); throw; } diff --git a/src/Common/HTTPConnectionPool.cpp b/src/Common/HTTPConnectionPool.cpp index 3798b7624ea..62362c61605 100644 --- a/src/Common/HTTPConnectionPool.cpp +++ b/src/Common/HTTPConnectionPool.cpp @@ -239,13 +239,13 @@ public: // Session data hooks implementation for integration with resource scheduler. // Hooks are created per every request-response pair and are registered/unregistered in HTTP session. -// * `start()` send resource request to the scheduler every time HTTP session is going to send or receive +// * `atStart()` send resource request to the scheduler every time HTTP session is going to send or receive // data to/from socket. `start()` waits for the scheduler confirmation. This way scheduler might // throttle and/or schedule socket data streams. -// * `finish()` hook is called on successful socket read/write operation. +// * `atFinish()` hook is called on successful socket read/write operation. // It informs the scheduler that operation is complete, which allows the scheduler to control the total // amount of in-flight bytes and/or operations. -// * `fail()` hook is called on failure of socket operation. The purpose is to correct the amount of bytes +// * `atFail()` hook is called on failure of socket operation. The purpose is to correct the amount of bytes // passed through the scheduler queue to ensure fair bandwidth allocation even in presence of errors. struct ResourceGuardSessionDataHooks : public Poco::Net::IHTTPSessionDataHooks { @@ -261,18 +261,18 @@ struct ResourceGuardSessionDataHooks : public Poco::Net::IHTTPSessionDataHooks request.assertFinished(); // Never destruct with an active request } - void start(int bytes) override + void atStart(int bytes) override { request.enqueue(bytes, link); request.wait(); } - void finish(int bytes) override + void atFinish(int bytes) override { request.finish(bytes, link); } - void fail() override + void atFail() override { request.finish(0, link); } From fefcc52c24b4560cc2434decdbd0981edfcbc5ca Mon Sep 17 00:00:00 2001 From: serxa Date: Fri, 5 Jul 2024 11:09:10 +0000 Subject: [PATCH 14/26] add logging of too long resource requests for http sessions --- src/Common/HTTPConnectionPool.cpp | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/Common/HTTPConnectionPool.cpp b/src/Common/HTTPConnectionPool.cpp index 62362c61605..acddcc8530d 100644 --- a/src/Common/HTTPConnectionPool.cpp +++ b/src/Common/HTTPConnectionPool.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -249,8 +250,10 @@ public: // passed through the scheduler queue to ensure fair bandwidth allocation even in presence of errors. struct ResourceGuardSessionDataHooks : public Poco::Net::IHTTPSessionDataHooks { - explicit ResourceGuardSessionDataHooks(const ResourceGuard::Metrics * metrics, ResourceLink link_) + ResourceGuardSessionDataHooks(ResourceLink link_, const ResourceGuard::Metrics * metrics, LoggerPtr log_, const String & method, const String & uri) : link(link_) + , log(log_) + , http_request(method + " " + uri) { request.metrics = metrics; chassert(link); @@ -263,8 +266,12 @@ struct ResourceGuardSessionDataHooks : public Poco::Net::IHTTPSessionDataHooks void atStart(int bytes) override { + Stopwatch timer; request.enqueue(bytes, link); request.wait(); + timer.stop(); + if (timer.elapsedMilliseconds() >= 5000) + LOG_INFO(log, "Resource request took too long to finish: {} ms for {}", timer.elapsedMilliseconds(), http_request); } void atFinish(int bytes) override @@ -279,6 +286,8 @@ struct ResourceGuardSessionDataHooks : public Poco::Net::IHTTPSessionDataHooks ResourceLink link; ResourceGuard::Request request; + LoggerPtr log; + String http_request; }; @@ -384,9 +393,9 @@ private: // Reset data hooks for IO scheduling if (ResourceLink link = CurrentThread::getReadResourceLink()) - Session::setReceiveDataHooks(std::make_shared(ResourceGuard::Metrics::getIORead(), link)); + Session::setReceiveDataHooks(std::make_shared(link, ResourceGuard::Metrics::getIORead(), log, request.getMethod(), request.getURI())); if (ResourceLink link = CurrentThread::getWriteResourceLink()) - Session::setSendDataHooks(std::make_shared(ResourceGuard::Metrics::getIOWrite(), link)); + Session::setSendDataHooks(std::make_shared(link, ResourceGuard::Metrics::getIOWrite(), log, request.getMethod(), request.getURI())); std::ostream & result = Session::sendRequest(request); result.exceptions(std::ios::badbit); From 3bff7ddcf8891d091bc5be2b827172029fb8b76f Mon Sep 17 00:00:00 2001 From: serxa Date: Fri, 2 Aug 2024 13:19:00 +0000 Subject: [PATCH 15/26] fix data race: delay reset of data hooks until the next sendRequest() --- src/Common/HTTPConnectionPool.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Common/HTTPConnectionPool.cpp b/src/Common/HTTPConnectionPool.cpp index acddcc8530d..68c13838c04 100644 --- a/src/Common/HTTPConnectionPool.cpp +++ b/src/Common/HTTPConnectionPool.cpp @@ -394,8 +394,12 @@ private: // Reset data hooks for IO scheduling if (ResourceLink link = CurrentThread::getReadResourceLink()) Session::setReceiveDataHooks(std::make_shared(link, ResourceGuard::Metrics::getIORead(), log, request.getMethod(), request.getURI())); + else + Session::setReceiveDataHooks(); if (ResourceLink link = CurrentThread::getWriteResourceLink()) Session::setSendDataHooks(std::make_shared(link, ResourceGuard::Metrics::getIOWrite(), log, request.getMethod(), request.getURI())); + else + Session::setSendDataHooks(); std::ostream & result = Session::sendRequest(request); result.exceptions(std::ios::badbit); @@ -453,8 +457,6 @@ private: } } response_stream = nullptr; - Session::setSendDataHooks(); - Session::setReceiveDataHooks(); group->atConnectionDestroy(); From 1f2f3c69b9bcc4d7f19962cbc2562c6b3ce404dd Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Fri, 26 Jan 2024 23:26:04 +0100 Subject: [PATCH 16/26] New metrics for ThreadPool - Introduced performance metrics for better monitoring and troubleshooting of ThreadPool. --- src/Common/ProfileEvents.cpp | 12 +++++++ src/Common/ThreadPool.cpp | 69 ++++++++++++++++++++++++++++++++++-- 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index d43d9fdcea8..6bcdf9d8e91 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -86,6 +86,18 @@ M(NetworkReceiveBytes, "Total number of bytes received from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \ M(NetworkSendBytes, "Total number of bytes send to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \ \ + M(GlobalThreadPoolExpansions, "Counts the total number of times new threads have been added to the global thread pool. This metric indicates the frequency of expansions in the global thread pool to accommodate increased processing demands.") \ + M(GlobalThreadPoolShrinks, "Counts the total number of times the global thread pool has shrunk by removing threads. This occurs when the number of idle threads exceeds max_thread_pool_free_size, indicating adjustments in the global thread pool size in response to decreased thread utilization.") \ + M(GlobalThreadPoolThreadCreationMicroseconds, "Total time spent waiting for new threads to start.") \ + M(GlobalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the global thread pool.") \ + M(GlobalThreadPoolJobs, "Counts the number of jobs that have been pushed to the global thread pool.") \ + M(LocalThreadPoolExpansions, "Counts the total number of times threads have been borrowed from the global thread pool to expand local thread pools.") \ + M(LocalThreadPoolShrinks, "Counts the total number of times threads have been returned to the global thread pool from local thread pools.") \ + M(LocalThreadPoolThreadCreationMicroseconds, "Total time local thread pools have spent waiting to borrow a thread from the global pool.") \ + M(LocalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the local thread pools.") \ + M(LocalThreadPoolJobs, "Counts the number of jobs that have been pushed to the local thread pools.") \ + M(LocalThreadPoolBusyMicroseconds, "Total time threads have spent executing the actual work.") \ + \ M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \ M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \ M(DiskS3PutRequestThrottlerCount, "Number of DiskS3 PUT, COPY, POST and LIST requests passed through throttler.") \ diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index c8f1ae99969..d93859e1abc 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -27,6 +28,22 @@ namespace CurrentMetrics extern const Metric GlobalThreadScheduled; } +namespace ProfileEvents +{ + extern const Event GlobalThreadPoolExpansions; + extern const Event GlobalThreadPoolShrinks; + extern const Event GlobalThreadPoolThreadCreationMicroseconds; + extern const Event GlobalThreadPoolLockWaitMicroseconds; + extern const Event GlobalThreadPoolJobs; + + extern const Event LocalThreadPoolExpansions; + extern const Event LocalThreadPoolShrinks; + extern const Event LocalThreadPoolThreadCreationMicroseconds; + extern const Event LocalThreadPoolLockWaitMicroseconds; + extern const Event LocalThreadPoolJobs; + extern const Event LocalThreadPoolBusyMicroseconds; +} + class JobWithPriority { public: @@ -180,14 +197,18 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: }; { + Stopwatch watch; std::unique_lock lock(mutex); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds, + watch.elapsedMicroseconds()); if (CannotAllocateThreadFaultInjector::injectFault()) return on_error("fault injected"); auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; }; - if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero. + if (wait_microseconds) /// Check for optional. Condition is true if the optional is set. Even if the value is zero. { if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred)) return on_error(fmt::format("no free thread (timeout={})", *wait_microseconds)); @@ -216,7 +237,13 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: try { + Stopwatch watch2; threads.front() = Thread([this, it = threads.begin()] { worker(it); }); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds, + watch2.elapsedMicroseconds()); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions); } catch (...) { @@ -239,6 +266,8 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: /// Wake up a free thread to run the new job. new_job_or_shutdown.notify_one(); + ProfileEvents::increment( std::is_same_v ? ProfileEvents::GlobalThreadPoolJobs : ProfileEvents::LocalThreadPoolJobs); + return static_cast(true); } @@ -262,7 +291,14 @@ void ThreadPoolImpl::startNewThreadsNoLock() try { + Stopwatch watch; threads.front() = Thread([this, it = threads.begin()] { worker(it); }); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds, + watch.elapsedMicroseconds()); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions); + } catch (...) { @@ -293,7 +329,11 @@ void ThreadPoolImpl::scheduleOrThrow(Job job, Priority priority, uint64_ template void ThreadPoolImpl::wait() { + Stopwatch watch; std::unique_lock lock(mutex); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds, + watch.elapsedMicroseconds()); /// Signal here just in case. /// If threads are waiting on condition variables, but there are some jobs in the queue /// then it will prevent us from deadlock. @@ -334,7 +374,11 @@ void ThreadPoolImpl::finalize() /// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does). for (auto & thread : threads) + { thread.join(); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks); + } threads.clear(); } @@ -391,7 +435,11 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ std::optional job_data; { + Stopwatch watch; std::unique_lock lock(mutex); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds, + watch.elapsedMicroseconds()); // Finish with previous job if any if (job_is_done) @@ -424,6 +472,8 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ { thread_it->detach(); threads.erase(thread_it); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks); } return; } @@ -459,7 +509,22 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ CurrentMetrics::Increment metric_active_pool_threads(metric_active_threads); - job_data->job(); + if constexpr (!std::is_same_v) + { + Stopwatch watch; + job_data->job(); + // This metric is less relevant for the global thread pool, as it would show large values (time while + // a thread was used by local pools) and increment only when local pools are destroyed. + // + // In cases where global pool threads are used directly (without a local thread pool), distinguishing + // them is difficult. + ProfileEvents::increment(ProfileEvents::LocalThreadPoolBusyMicroseconds, watch.elapsedMicroseconds()); + } + else + { + job_data->job(); + } + if (thread_trace_context.root_span.isTraceEnabled()) { From 843d8696316fb6b648200585e411d3bce1b46f98 Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Fri, 9 Feb 2024 08:14:05 +0100 Subject: [PATCH 17/26] add one more metric --- src/Common/ProfileEvents.cpp | 2 ++ src/Common/ThreadPool.cpp | 15 +++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 6bcdf9d8e91..c09599e7172 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -91,12 +91,14 @@ M(GlobalThreadPoolThreadCreationMicroseconds, "Total time spent waiting for new threads to start.") \ M(GlobalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the global thread pool.") \ M(GlobalThreadPoolJobs, "Counts the number of jobs that have been pushed to the global thread pool.") \ + M(GlobalThreadPoolJobWaitTimeMicroseconds, "Measures the elapsed time from when a job is scheduled in the thread pool to when it is picked up for execution by a worker thread. This metric helps identify delays in job processing, indicating the responsiveness of the thread pool to new tasks.") \ M(LocalThreadPoolExpansions, "Counts the total number of times threads have been borrowed from the global thread pool to expand local thread pools.") \ M(LocalThreadPoolShrinks, "Counts the total number of times threads have been returned to the global thread pool from local thread pools.") \ M(LocalThreadPoolThreadCreationMicroseconds, "Total time local thread pools have spent waiting to borrow a thread from the global pool.") \ M(LocalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the local thread pools.") \ M(LocalThreadPoolJobs, "Counts the number of jobs that have been pushed to the local thread pools.") \ M(LocalThreadPoolBusyMicroseconds, "Total time threads have spent executing the actual work.") \ + M(LocalThreadPoolJobWaitTimeMicroseconds, "Measures the elapsed time from when a job is scheduled in the thread pool to when it is picked up for execution by a worker thread. This metric helps identify delays in job processing, indicating the responsiveness of the thread pool to new tasks.") \ \ M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \ M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \ diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index d93859e1abc..0b28b7567a7 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -35,6 +35,7 @@ namespace ProfileEvents extern const Event GlobalThreadPoolThreadCreationMicroseconds; extern const Event GlobalThreadPoolLockWaitMicroseconds; extern const Event GlobalThreadPoolJobs; + extern const Event GlobalThreadPoolJobWaitTimeMicroseconds; extern const Event LocalThreadPoolExpansions; extern const Event LocalThreadPoolShrinks; @@ -42,6 +43,8 @@ namespace ProfileEvents extern const Event LocalThreadPoolLockWaitMicroseconds; extern const Event LocalThreadPoolJobs; extern const Event LocalThreadPoolBusyMicroseconds; + extern const Event LocalThreadPoolJobWaitTimeMicroseconds; + } class JobWithPriority @@ -57,6 +60,7 @@ public: /// Call stacks of all jobs' schedulings leading to this one std::vector frame_pointers; bool enable_job_stack_trace = false; + Stopwatch job_create_time; JobWithPriority( Job job_, Priority priority_, CurrentMetrics::Metric metric, @@ -76,6 +80,13 @@ public: { return priority > rhs.priority; // Reversed for `priority_queue` max-heap to yield minimum value (i.e. highest priority) first } + + UInt64 elapsedMicroseconds() const + { + return job_create_time.elapsedMicroseconds(); + } + + }; static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool"; @@ -483,6 +494,10 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ job_data = std::move(const_cast(jobs.top())); jobs.pop(); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolJobWaitTimeMicroseconds : ProfileEvents::LocalThreadPoolJobWaitTimeMicroseconds, + job_data->elapsedMicroseconds()); + /// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them. if (shutdown) { From d5aec83ce79127fc68f08c0e545d5d063d79f7cc Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Tue, 20 Aug 2024 22:35:32 +0200 Subject: [PATCH 18/26] Mark LocalThread metrics obsolete due to https://github.com/ClickHouse/ClickHouse/pull/47880 --- src/Common/CurrentMetrics.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 67890568941..204e09e92b2 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -75,9 +75,9 @@ M(GlobalThread, "Number of threads in global thread pool.") \ M(GlobalThreadActive, "Number of threads in global thread pool running a task.") \ M(GlobalThreadScheduled, "Number of queued or active jobs in global thread pool.") \ - M(LocalThread, "Number of threads in local thread pools. The threads in local thread pools are taken from the global thread pool.") \ - M(LocalThreadActive, "Number of threads in local thread pools running a task.") \ - M(LocalThreadScheduled, "Number of queued or active jobs in local thread pools.") \ + M(LocalThread, "Obsolete. Number of threads in local thread pools. The threads in local thread pools are taken from the global thread pool.") \ + M(LocalThreadActive, "Obsolete. Number of threads in local thread pools running a task.") \ + M(LocalThreadScheduled, "Obsolete. Number of queued or active jobs in local thread pools.") \ M(MergeTreeDataSelectExecutorThreads, "Number of threads in the MergeTreeDataSelectExecutor thread pool.") \ M(MergeTreeDataSelectExecutorThreadsActive, "Number of threads in the MergeTreeDataSelectExecutor thread pool running a task.") \ M(MergeTreeDataSelectExecutorThreadsScheduled, "Number of queued or active jobs in the MergeTreeDataSelectExecutor thread pool.") \ From a66bd99d2fbc21ac00c979f2b9b301b7c2fd9d44 Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Tue, 20 Aug 2024 21:39:36 +0200 Subject: [PATCH 19/26] make jobs queue in the ThreadPool stable (i.e. FIFO for the same priority), otherwise some jobs can stay in queue untaken for a long time --- src/Common/ThreadPool.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index a31d793264e..fd9149bda04 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -131,7 +131,7 @@ private: bool threads_remove_themselves = true; const bool shutdown_on_exception = true; - boost::heap::priority_queue jobs; + boost::heap::priority_queue> jobs; std::list threads; std::exception_ptr first_exception; std::stack on_destroy_callbacks; From 64d4ef002df41697247c8ed59586e02d7a4bfaf3 Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Wed, 21 Aug 2024 18:20:05 +0200 Subject: [PATCH 20/26] fix style --- src/Common/ProfileEvents.cpp | 4 ++-- src/Common/ThreadPool.cpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index c09599e7172..044f787aee9 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -89,11 +89,11 @@ M(GlobalThreadPoolExpansions, "Counts the total number of times new threads have been added to the global thread pool. This metric indicates the frequency of expansions in the global thread pool to accommodate increased processing demands.") \ M(GlobalThreadPoolShrinks, "Counts the total number of times the global thread pool has shrunk by removing threads. This occurs when the number of idle threads exceeds max_thread_pool_free_size, indicating adjustments in the global thread pool size in response to decreased thread utilization.") \ M(GlobalThreadPoolThreadCreationMicroseconds, "Total time spent waiting for new threads to start.") \ - M(GlobalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the global thread pool.") \ + M(GlobalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the global thread pool.") \ M(GlobalThreadPoolJobs, "Counts the number of jobs that have been pushed to the global thread pool.") \ M(GlobalThreadPoolJobWaitTimeMicroseconds, "Measures the elapsed time from when a job is scheduled in the thread pool to when it is picked up for execution by a worker thread. This metric helps identify delays in job processing, indicating the responsiveness of the thread pool to new tasks.") \ M(LocalThreadPoolExpansions, "Counts the total number of times threads have been borrowed from the global thread pool to expand local thread pools.") \ - M(LocalThreadPoolShrinks, "Counts the total number of times threads have been returned to the global thread pool from local thread pools.") \ + M(LocalThreadPoolShrinks, "Counts the total number of times threads have been returned to the global thread pool from local thread pools.") \ M(LocalThreadPoolThreadCreationMicroseconds, "Total time local thread pools have spent waiting to borrow a thread from the global pool.") \ M(LocalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the local thread pools.") \ M(LocalThreadPoolJobs, "Counts the number of jobs that have been pushed to the local thread pools.") \ diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index 0b28b7567a7..8685533e2d1 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -277,7 +277,7 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: /// Wake up a free thread to run the new job. new_job_or_shutdown.notify_one(); - ProfileEvents::increment( std::is_same_v ? ProfileEvents::GlobalThreadPoolJobs : ProfileEvents::LocalThreadPoolJobs); + ProfileEvents::increment(std::is_same_v ? ProfileEvents::GlobalThreadPoolJobs : ProfileEvents::LocalThreadPoolJobs); return static_cast(true); } From c1a83f47341032ae90f43ba84a54c0ec0e60810c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Tue, 27 Aug 2024 13:44:17 +0200 Subject: [PATCH 21/26] Fix possible wrong result during anyHeavy state merge --- src/AggregateFunctions/AggregateFunctionAnyHeavy.cpp | 3 +++ tests/queries/0_stateless/03230_anyHeavy_merge.reference | 1 + tests/queries/0_stateless/03230_anyHeavy_merge.sql | 4 ++++ 3 files changed, 8 insertions(+) create mode 100644 tests/queries/0_stateless/03230_anyHeavy_merge.reference create mode 100644 tests/queries/0_stateless/03230_anyHeavy_merge.sql diff --git a/src/AggregateFunctions/AggregateFunctionAnyHeavy.cpp b/src/AggregateFunctions/AggregateFunctionAnyHeavy.cpp index ffddd46f2e3..dbc5f9be72f 100644 --- a/src/AggregateFunctions/AggregateFunctionAnyHeavy.cpp +++ b/src/AggregateFunctions/AggregateFunctionAnyHeavy.cpp @@ -68,7 +68,10 @@ public: if (data().isEqualTo(to.data())) counter += to.counter; else if (!data().has() || counter < to.counter) + { data().set(to.data(), arena); + counter = to.counter - counter; + } else counter -= to.counter; } diff --git a/tests/queries/0_stateless/03230_anyHeavy_merge.reference b/tests/queries/0_stateless/03230_anyHeavy_merge.reference new file mode 100644 index 00000000000..78981922613 --- /dev/null +++ b/tests/queries/0_stateless/03230_anyHeavy_merge.reference @@ -0,0 +1 @@ +a diff --git a/tests/queries/0_stateless/03230_anyHeavy_merge.sql b/tests/queries/0_stateless/03230_anyHeavy_merge.sql new file mode 100644 index 00000000000..5d4c0e55d0f --- /dev/null +++ b/tests/queries/0_stateless/03230_anyHeavy_merge.sql @@ -0,0 +1,4 @@ +DROP TABLE IF EXISTS t; +CREATE TABLE t (letter String) ENGINE=MergeTree order by () partition by letter; +INSERT INTO t VALUES ('a'), ('a'), ('a'), ('a'), ('b'), ('a'), ('a'), ('a'), ('a'), ('a'), ('a'), ('a'), ('a'), ('a'), ('a'), ('a'), ('c'); +SELECT anyHeavy(if(letter != 'b', letter, NULL)) FROM t; From b4b68196243ff8ec90cbc31c4294663127ad976f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Fri, 30 Aug 2024 14:07:30 +0200 Subject: [PATCH 22/26] Adapt backward compatibility test --- .../test_functions.py | 45 ++++++++++++++++--- 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/tests/integration/test_backward_compatibility/test_functions.py b/tests/integration/test_backward_compatibility/test_functions.py index 3231fb87f33..202a741bfb5 100644 --- a/tests/integration/test_backward_compatibility/test_functions.py +++ b/tests/integration/test_backward_compatibility/test_functions.py @@ -67,6 +67,11 @@ def test_aggregate_states(start_cluster): f"select hex(initializeAggregation('{function_name}State', 'foo'))" ).strip() + def get_final_value_unhex(node, function_name, value): + return node.query( + f"select finalizeAggregation(unhex('{value}')::AggregateFunction({function_name}, String))" + ).strip() + for aggregate_function in aggregate_functions: logging.info("Checking %s", aggregate_function) @@ -99,13 +104,39 @@ def test_aggregate_states(start_cluster): upstream_state = get_aggregate_state_hex(upstream, aggregate_function) if upstream_state != backward_state: - logging.info( - "Failed %s, %s (backward) != %s (upstream)", - aggregate_function, - backward_state, - upstream_state, - ) - failed += 1 + allowed_changes_if_result_is_the_same = ["anyHeavy"] + + if aggregate_function in allowed_changes_if_result_is_the_same: + backward_final_from_upstream = get_final_value_unhex( + backward, aggregate_function, upstream_state + ) + upstream_final_from_backward = get_final_value_unhex( + upstream, aggregate_function, backward_state + ) + + if backward_final_from_upstream == upstream_final_from_backward: + logging.info( + "OK %s (but different intermediate states)", aggregate_function + ) + passed += 1 + else: + logging.error( + "Failed %s, Intermediate: %s (backward) != %s (upstream). Final from intermediate: %s (backward from upstream state) != %s (upstream from backward state)", + aggregate_function, + backward_state, + upstream_state, + backward_final_from_upstream, + upstream_final_from_backward, + ) + failed += 1 + else: + logging.error( + "Failed %s, %s (backward) != %s (upstream)", + aggregate_function, + backward_state, + upstream_state, + ) + failed += 1 else: logging.info("OK %s", aggregate_function) passed += 1 From cbf82712542fb192ac4bbf1102e894d46ecb2ee1 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Fri, 30 Aug 2024 15:45:51 +0200 Subject: [PATCH 23/26] Better --- tests/ci/changelog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/ci/changelog.py b/tests/ci/changelog.py index 554ba339892..8e7900de353 100755 --- a/tests/ci/changelog.py +++ b/tests/ci/changelog.py @@ -288,7 +288,7 @@ def generate_description(item: PullRequest, repo: Repository) -> Optional[Descri # Normalize bug fixes if ( re.match( - r".*(?i)bug\Wfix", + r"(?i).*bug\Wfix", category, ) # Map "Critical Bug Fix" to "Bug fix" category for changelog From 30dd82324aac0053d340d0c08b3f056f8255b6d9 Mon Sep 17 00:00:00 2001 From: serxa Date: Fri, 30 Aug 2024 19:26:59 +0000 Subject: [PATCH 24/26] revert wrong data race fix --- src/Common/HTTPConnectionPool.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/Common/HTTPConnectionPool.cpp b/src/Common/HTTPConnectionPool.cpp index 68c13838c04..7a65863180e 100644 --- a/src/Common/HTTPConnectionPool.cpp +++ b/src/Common/HTTPConnectionPool.cpp @@ -391,15 +391,11 @@ private: { auto idle = idleTime(); - // Reset data hooks for IO scheduling + // Set data hooks for IO scheduling if (ResourceLink link = CurrentThread::getReadResourceLink()) Session::setReceiveDataHooks(std::make_shared(link, ResourceGuard::Metrics::getIORead(), log, request.getMethod(), request.getURI())); - else - Session::setReceiveDataHooks(); if (ResourceLink link = CurrentThread::getWriteResourceLink()) Session::setSendDataHooks(std::make_shared(link, ResourceGuard::Metrics::getIOWrite(), log, request.getMethod(), request.getURI())); - else - Session::setSendDataHooks(); std::ostream & result = Session::sendRequest(request); result.exceptions(std::ios::badbit); @@ -457,6 +453,8 @@ private: } } response_stream = nullptr; + Session::setSendDataHooks(); + Session::setReceiveDataHooks(); group->atConnectionDestroy(); From 3675e83a3f064c6972546537367cc53e36ccb236 Mon Sep 17 00:00:00 2001 From: serxa Date: Fri, 30 Aug 2024 19:27:26 +0000 Subject: [PATCH 25/26] Fix data race in ResourceGuard --- src/Common/Scheduler/ResourceGuard.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/Scheduler/ResourceGuard.h b/src/Common/Scheduler/ResourceGuard.h index 2e735aae656..5947022d9b1 100644 --- a/src/Common/Scheduler/ResourceGuard.h +++ b/src/Common/Scheduler/ResourceGuard.h @@ -107,8 +107,8 @@ public: std::unique_lock lock(mutex); chassert(state == Enqueued); state = Dequeued; + dequeued_cv.notify_one(); } - dequeued_cv.notify_one(); } void wait() From 9f96d180604cba6467e64a51bf98118ebafc598d Mon Sep 17 00:00:00 2001 From: serxa Date: Fri, 30 Aug 2024 19:33:34 +0000 Subject: [PATCH 26/26] cleanup --- src/Common/Scheduler/ResourceGuard.h | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/Common/Scheduler/ResourceGuard.h b/src/Common/Scheduler/ResourceGuard.h index 5947022d9b1..cf97f7acf93 100644 --- a/src/Common/Scheduler/ResourceGuard.h +++ b/src/Common/Scheduler/ResourceGuard.h @@ -103,12 +103,10 @@ public: // That thread will continue execution and do real consumption of requested resource synchronously. void execute() override { - { - std::unique_lock lock(mutex); - chassert(state == Enqueued); - state = Dequeued; - dequeued_cv.notify_one(); - } + std::unique_lock lock(mutex); + chassert(state == Enqueued); + state = Dequeued; + dequeued_cv.notify_one(); } void wait()