IO scheduling on HTTP session level

Commit aad55ab55f (parent fcd6a19059), ClickHouse (https://github.com/ClickHouse/ClickHouse.git)
@@ -19,6 +19,8 @@


#include <ios>
+#include <memory>
+#include <functional>
#include "Poco/Any.h"
#include "Poco/Buffer.h"
#include "Poco/Exception.h"
@@ -33,6 +35,27 @@ namespace Net
{


+    class IHTTPSessionDataHooks
+        /// Interface to control the stream of data bytes being sent or received through the socket by HTTPSession.
+        /// It allows monitoring, throttling and scheduling of data streams with syscall granularity.
+    {
+    public:
+        virtual ~IHTTPSessionDataHooks() = default;
+
+        virtual void start(int bytes) = 0;
+            /// Called before sending/receiving `bytes` bytes to/from the socket.
+
+        virtual void finish(int bytes) = 0;
+            /// Called when sending/receiving of `bytes` bytes has finished successfully.
+
+        virtual void fail() = 0;
+            /// If an error occurred during send/receive, `fail()` is called instead of `finish()`.
+    };
+
+
+    using HTTPSessionDataHooksPtr = std::shared_ptr<IHTTPSessionDataHooks>;
+
+
    class Net_API HTTPSession
        /// HTTPSession implements basic HTTP session management
        /// for both HTTP clients and HTTP servers.
@@ -73,6 +96,12 @@ namespace Net

        Poco::Timespan getReceiveTimeout() const;
            /// Returns the receive timeout for the HTTP session.

+        void setSendDataHooks(const HTTPSessionDataHooksPtr & sendDataHooks = {});
+            /// Sets the data hooks that will be called on every send to the socket.
+
+        void setReceiveDataHooks(const HTTPSessionDataHooksPtr & receiveDataHooks = {});
+            /// Sets the data hooks that will be called on every receive from the socket.
+
        bool connected() const;
            /// Returns true if the underlying socket is connected.

@@ -211,6 +240,10 @@ namespace Net

        Poco::Exception * _pException;
        Poco::Any _data;

+        // Data hooks
+        HTTPSessionDataHooksPtr _sendDataHooks;
+        HTTPSessionDataHooksPtr _receiveDataHooks;
+
        friend class HTTPStreamBuf;
        friend class HTTPHeaderStreamBuf;
        friend class HTTPFixedLengthStreamBuf;
@@ -246,6 +279,16 @@ namespace Net

        return _receiveTimeout;
    }

+    inline void HTTPSession::setSendDataHooks(const HTTPSessionDataHooksPtr & sendDataHooks)
+    {
+        _sendDataHooks = sendDataHooks;
+    }
+
+    inline void HTTPSession::setReceiveDataHooks(const HTTPSessionDataHooksPtr & receiveDataHooks)
+    {
+        _receiveDataHooks = receiveDataHooks;
+    }
+
    inline StreamSocket & HTTPSession::socket()
    {
        return _socket;
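For orientation, here is a minimal sketch (not part of the commit) of what an IHTTPSessionDataHooks implementation can look like; the ByteCountingHooks name and its use are hypothetical and assume the patched Poco header above:

#include <atomic>
#include <memory>
#include "Poco/Net/HTTPSession.h"

// Hypothetical hook that counts bytes actually transferred through a session.
class ByteCountingHooks : public Poco::Net::IHTTPSessionDataHooks
{
public:
    void start(int) override {}                          // nothing to schedule in this sketch
    void finish(int bytes) override { total += bytes; }  // bytes actually sent/received by the syscall
    void fail() override {}                              // a failed syscall transferred nothing
    std::atomic<size_t> total{0};
};

// Usage: session.setSendDataHooks(std::make_shared<ByteCountingHooks>());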
@@ -128,14 +128,14 @@ int HTTPSession::get()
{
    if (_pCurrent == _pEnd)
        refill();

    if (_pCurrent < _pEnd)
        return *_pCurrent++;
    else
        return std::char_traits<char>::eof();
}


int HTTPSession::peek()
{
    if (_pCurrent == _pEnd)
@@ -147,7 +147,7 @@ int HTTPSession::peek()
        return std::char_traits<char>::eof();
}


int HTTPSession::read(char* buffer, std::streamsize length)
{
    if (_pCurrent < _pEnd)
@@ -166,10 +166,17 @@ int HTTPSession::write(const char* buffer, std::streamsize length)
{
    try
    {
-        return _socket.sendBytes(buffer, (int) length);
+        if (_sendDataHooks)
+            _sendDataHooks->start((int) length);
+        int result = _socket.sendBytes(buffer, (int) length);
+        if (_sendDataHooks)
+            _sendDataHooks->finish(result);
+        return result;
    }
    catch (Poco::Exception& exc)
    {
+        if (_sendDataHooks)
+            _sendDataHooks->fail();
        setException(exc);
        throw;
    }
@@ -180,10 +187,17 @@ int HTTPSession::receive(char* buffer, int length)
{
    try
    {
-        return _socket.receiveBytes(buffer, length);
+        if (_receiveDataHooks)
+            _receiveDataHooks->start(length);
+        int result = _socket.receiveBytes(buffer, length);
+        if (_receiveDataHooks)
+            _receiveDataHooks->finish(result);
+        return result;
    }
    catch (Poco::Exception& exc)
    {
+        if (_receiveDataHooks)
+            _receiveDataHooks->fail();
        setException(exc);
        throw;
    }
@@ -62,7 +62,7 @@ bool checkIsBrokenTimeout()

SocketImpl::SocketImpl():
    _sockfd(POCO_INVALID_SOCKET),
    _blocking(true),
    _isBrokenTimeout(checkIsBrokenTimeout())
{
}
@@ -81,7 +81,7 @@ SocketImpl::~SocketImpl()
    close();
}


SocketImpl* SocketImpl::acceptConnection(SocketAddress& clientAddr)
{
    if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
@@ -117,7 +117,7 @@ void SocketImpl::connect(const SocketAddress& address)
        rc = ::connect(_sockfd, address.addr(), address.length());
    }
    while (rc != 0 && lastError() == POCO_EINTR);
    if (rc != 0)
    {
        int err = lastError();
        error(err, address.toString());
@@ -204,7 +204,7 @@ void SocketImpl::bind6(const SocketAddress& address, bool reuseAddress, bool reu
#if defined(POCO_HAVE_IPv6)
    if (address.family() != SocketAddress::IPv6)
        throw Poco::InvalidArgumentException("SocketAddress must be an IPv6 address");

    if (_sockfd == POCO_INVALID_SOCKET)
    {
        init(address.af());
@@ -225,11 +225,11 @@ void SocketImpl::bind6(const SocketAddress& address, bool reuseAddress, bool reu
#endif
}


void SocketImpl::listen(int backlog)
{
    if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();

    int rc = ::listen(_sockfd, backlog);
    if (rc != 0) error();
}
@@ -253,7 +253,7 @@ void SocketImpl::shutdownReceive()
    if (rc != 0) error();
}


void SocketImpl::shutdownSend()
{
    if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
@@ -262,7 +262,7 @@ void SocketImpl::shutdownSend()
    if (rc != 0) error();
}


void SocketImpl::shutdown()
{
    if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
@@ -317,7 +317,7 @@ int SocketImpl::receiveBytes(void* buffer, int length, int flags)
            throw TimeoutException();
        }
    }

    int rc;
    do
    {
@@ -325,7 +325,7 @@ int SocketImpl::receiveBytes(void* buffer, int length, int flags)
        rc = ::recv(_sockfd, reinterpret_cast<char*>(buffer), length, flags);
    }
    while (blocking && rc < 0 && lastError() == POCO_EINTR);
    if (rc < 0)
    {
        int err = lastError();
        if ((err == POCO_EAGAIN || err == POCO_EWOULDBLOCK) && !blocking)
@@ -363,7 +363,7 @@ int SocketImpl::receiveFrom(void* buffer, int length, SocketAddress& address, in
            throw TimeoutException();
        }
    }

    sockaddr_storage abuffer;
    struct sockaddr* pSA = reinterpret_cast<struct sockaddr*>(&abuffer);
    poco_socklen_t saLen = sizeof(abuffer);
@@ -450,7 +450,7 @@ bool SocketImpl::pollImpl(Poco::Timespan& remainingTime, int mode)
    }
    while (rc < 0 && lastError() == POCO_EINTR);
    if (rc < 0) error();
    return rc > 0;

#else

@@ -493,7 +493,7 @@ bool SocketImpl::pollImpl(Poco::Timespan& remainingTime, int mode)
    }
    while (rc < 0 && errorCode == POCO_EINTR);
    if (rc < 0) error(errorCode);
    return rc > 0;

#endif // POCO_HAVE_FD_POLL
}
@@ -503,13 +503,13 @@ bool SocketImpl::poll(const Poco::Timespan& timeout, int mode)
    Poco::Timespan remainingTime(timeout);
    return pollImpl(remainingTime, mode);
}


void SocketImpl::setSendBufferSize(int size)
{
    setOption(SOL_SOCKET, SO_SNDBUF, size);
}


int SocketImpl::getSendBufferSize()
{
    int result;
@@ -523,7 +523,7 @@ void SocketImpl::setReceiveBufferSize(int size)
    setOption(SOL_SOCKET, SO_RCVBUF, size);
}


int SocketImpl::getReceiveBufferSize()
{
    int result;
@@ -569,7 +569,7 @@ Poco::Timespan SocketImpl::getReceiveTimeout()
    return result;
}


SocketAddress SocketImpl::address()
{
    if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
@@ -580,7 +580,7 @@ SocketAddress SocketImpl::address()
    int rc = ::getsockname(_sockfd, pSA, &saLen);
    if (rc == 0)
        return SocketAddress(pSA, saLen);
    else
        error();
    return SocketAddress();
}
@@ -291,6 +291,9 @@
    M(DistrCacheWriteRequests, "Number of executed Write requests to Distributed Cache") \
    M(DistrCacheServerConnections, "Number of open connections to ClickHouse server from Distributed Cache") \
    \
+    M(SchedulerIOReadScheduled, "Number of IO reads currently being scheduled") \
+    M(SchedulerIOWriteScheduled, "Number of IO writes currently being scheduled") \
+    \
    M(StorageConnectionsStored, "Total count of sessions stored in the session pool for storages") \
    M(StorageConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for storages") \
    \
@@ -113,6 +113,56 @@ std::string_view CurrentThread::getQueryId()
    return current_thread->getQueryId();
}

+void CurrentThread::attachReadResource(ResourceLink link)
+{
+    if (unlikely(!current_thread))
+        return;
+    if (current_thread->read_resource_link)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has already been attached to read resource", std::to_string(getThreadId()));
+    current_thread->read_resource_link = link;
+}
+
+void CurrentThread::detachReadResource()
+{
+    if (unlikely(!current_thread))
+        return;
+    if (!current_thread->read_resource_link)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has not been attached to read resource", std::to_string(getThreadId()));
+    current_thread->read_resource_link.reset();
+}
+
+ResourceLink CurrentThread::getReadResourceLink()
+{
+    if (unlikely(!current_thread))
+        return {};
+    return current_thread->read_resource_link;
+}
+
+void CurrentThread::attachWriteResource(ResourceLink link)
+{
+    if (unlikely(!current_thread))
+        return;
+    if (current_thread->write_resource_link)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has already been attached to write resource", std::to_string(getThreadId()));
+    current_thread->write_resource_link = link;
+}
+
+void CurrentThread::detachWriteResource()
+{
+    if (unlikely(!current_thread))
+        return;
+    if (!current_thread->write_resource_link)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has not been attached to write resource", std::to_string(getThreadId()));
+    current_thread->write_resource_link.reset();
+}
+
+ResourceLink CurrentThread::getWriteResourceLink()
+{
+    if (unlikely(!current_thread))
+        return {};
+    return current_thread->write_resource_link;
+}
+
MemoryTracker * CurrentThread::getUserMemoryTracker()
{
    if (unlikely(!current_thread))
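A sketch of the attach/detach contract these functions enforce (illustrative, assuming a valid `link` obtained from a workload classifier):

// Each attach must be paired with a detach on the same thread; a second attach
// throws LOGICAL_ERROR. This is the invariant IOScope (next file) relies on.
CurrentThread::attachReadResource(link);
chassert(CurrentThread::getReadResourceLink() == link);
CurrentThread::detachReadResource(); // the link is now cleared for this thread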
@@ -2,6 +2,7 @@

#include <Interpreters/Context_fwd.h>
#include <Common/ThreadStatus.h>
+#include <Common/Scheduler/ResourceLink.h>

#include <memory>
#include <string>
@@ -23,7 +24,6 @@ class QueryStatus;
struct Progress;
class InternalTextLogsQueue;
-

/** Collection of static methods to work with thread-local objects.
 * Allows to attach and detach query/process (thread group) to a thread
 * (to calculate query-related metrics and to allow to obtain query-related data from a thread).
@@ -92,6 +92,14 @@ public:

    static std::string_view getQueryId();

+    // For IO scheduling
+    static void attachReadResource(ResourceLink link);
+    static void detachReadResource();
+    static ResourceLink getReadResourceLink();
+    static void attachWriteResource(ResourceLink link);
+    static void detachWriteResource();
+    static ResourceLink getWriteResourceLink();
+
    /// Initializes query with current thread as master thread in constructor, and detaches it in destructor
    struct QueryScope : private boost::noncopyable
    {
@@ -102,6 +110,39 @@ public:
        void logPeakMemoryUsage();
        bool log_peak_memory_usage_in_destructor = true;
    };

+    /// Scoped attach/detach of IO resource links
+    struct IOScope : private boost::noncopyable
+    {
+        explicit IOScope(ResourceLink read_resource_link, ResourceLink write_resource_link)
+        {
+            if (read_resource_link)
+            {
+                attachReadResource(read_resource_link);
+                read_attached = true;
+            }
+            if (write_resource_link)
+            {
+                attachWriteResource(write_resource_link);
+                write_attached = true;
+            }
+        }
+
+        explicit IOScope(const IOSchedulingSettings & settings)
+            : IOScope(settings.read_resource_link, settings.write_resource_link)
+        {}
+
+        ~IOScope()
+        {
+            if (read_attached)
+                detachReadResource();
+            if (write_attached)
+                detachWriteResource();
+        }
+
+        bool read_attached = false;
+        bool write_attached = false;
+    };
};

}
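Illustrative use of IOScope (the function name is assumed, not from the diff; `settings.io_scheduling` stands for the IOSchedulingSettings carried by Read/WriteSettings further below):

// Hedged sketch: publish the query's IO links for the duration of one remote call.
void uploadPartSketch(const WriteSettings & settings)
{
    CurrentThread::IOScope io_scope(settings.io_scheduling);
    // Any HTTP session used on this thread now picks the links up via
    // CurrentThread::getReadResourceLink() / getWriteResourceLink().
    // ... perform the remote call here ...
}   // destructor detaches whatever was attached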
@@ -9,6 +9,7 @@
#include <Common/ProxyConfiguration.h>
#include <Common/MemoryTrackerSwitcher.h>
#include <Common/SipHash.h>
+#include <Common/Scheduler/ResourceGuard.h>

#include <Poco/Net/HTTPChunkedStream.h>
#include <Poco/Net/HTTPClientSession.h>
@@ -249,6 +250,54 @@ public:
};


+// Session data hooks implementation for integration with the resource scheduler.
+// Hooks are created per request-response pair and are registered/unregistered in the HTTP session.
+// * `start()` sends a resource request to the scheduler every time the HTTP session is going to send or receive
+//   data to/from the socket. `start()` waits for the scheduler's confirmation. This way the scheduler can
+//   throttle and/or schedule socket data streams.
+// * `finish()` is called on a successful socket read/write operation.
+//   It informs the scheduler that the operation is complete, which allows the scheduler to control the total
+//   amount of in-flight bytes and/or operations.
+// * `fail()` is called on a failed socket operation. Its purpose is to correct the amount of bytes
+//   passed through the scheduler queue, to ensure fair bandwidth allocation even in the presence of errors.
+struct ResourceGuardSessionDataHooks : public Poco::Net::IHTTPSessionDataHooks
+{
+    explicit ResourceGuardSessionDataHooks(const ResourceGuard::Metrics * metrics, ResourceLink link_)
+        : link(link_)
+    {
+        request.metrics = metrics;
+        chassert(link);
+    }
+
+    ~ResourceGuardSessionDataHooks() override
+    {
+        request.assertFinished(); // Never destruct with an active request
+    }
+
+    void start(int bytes) override
+    {
+        // TODO(serxa): add metrics here or better in scheduler code (e.g. during enqueue, or better in ResourceGuard::Request)?
+        request.enqueue(bytes, link);
+        request.wait();
+    }
+
+    void finish(int bytes) override
+    {
+        request.finish();
+        link.adjust(request.cost, bytes); // success
+    }
+
+    void fail() override
+    {
+        request.finish();
+        link.accumulate(request.cost); // We assume no resource was used in case of failure
+    }
+
+    ResourceLink link;
+    ResourceGuard::Request request;
+};
+
+
// EndpointConnectionPool manages connections to an endpoint
// Features:
// - it uses HostResolver for address selection. See Common/HostResolver.h for more info.
@@ -259,8 +308,6 @@ public:
// - `Session::reconnect()` uses the pool as well
// - comprehensive sensors
// - a session is reused according to its inner state, automatically


template <class Session>
class EndpointConnectionPool : public std::enable_shared_from_this<EndpointConnectionPool<Session>>, public IExtendedPool
{
@@ -350,6 +397,19 @@ private:
    std::ostream & sendRequest(Poco::Net::HTTPRequest & request) override
    {
        auto idle = idleTime();

+        // Reset data hooks for IO scheduling
+        if (ResourceLink link = CurrentThread::getReadResourceLink()) {
+            Session::setReceiveDataHooks(std::make_shared<ResourceGuardSessionDataHooks>(ResourceGuard::Metrics::getIORead(), link));
+        } else {
+            Session::setReceiveDataHooks();
+        }
+
+        if (ResourceLink link = CurrentThread::getWriteResourceLink()) {
+            Session::setSendDataHooks(std::make_shared<ResourceGuardSessionDataHooks>(ResourceGuard::Metrics::getIOWrite(), link));
+        } else {
+            Session::setSendDataHooks();
+        }
+
        std::ostream & result = Session::sendRequest(request);
        result.exceptions(std::ios::badbit);
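To make the accounting concrete, an illustrative lifecycle of one scheduled receive (the numbers are assumed):

// start(): enqueue a request with estimated cost 1000 bytes and block
// until the scheduler dequeues it.
hooks->start(1000);
int n = socket.receiveBytes(buf, 1000); // suppose the syscall returns 700
// finish(): link.adjust(1000, 700) corrects the queue budget by the unused
// 300 bytes, so long-run accounting matches actual traffic. If the syscall
// had thrown, fail() would call link.accumulate(1000) instead.
hooks->finish(n);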
@@ -104,6 +104,13 @@
    M(PartsWithAppliedMutationsOnFly, "Total number of parts for which there was any mutation applied on fly") \
    M(MutationsAppliedOnFlyInAllParts, "The sum of number of applied mutations on-fly for part among all read parts") \
    \
+    M(SchedulerIOReadRequests, "Resource requests passed through scheduler for IO reads.") \
+    M(SchedulerIOReadBytes, "Bytes passed through scheduler for IO reads.") \
+    M(SchedulerIOReadWaitMicroseconds, "Total time a query was waiting on resource requests for IO reads.") \
+    M(SchedulerIOWriteRequests, "Resource requests passed through scheduler for IO writes.") \
+    M(SchedulerIOWriteBytes, "Bytes passed through scheduler for IO writes.") \
+    M(SchedulerIOWriteWaitMicroseconds, "Total time a query was waiting on resource requests for IO writes.") \
+    \
    M(QueryMaskingRulesMatch, "Number of times query masking rules was successfully matched.") \
    \
    M(ReplicatedPartFetches, "Number of times a data part was downloaded from replica of a ReplicatedMergeTree table.") \
@@ -232,7 +232,7 @@ struct ResourceTestManager : public ResourceTestBase
    ResourceTestManager & t;

    Guard(ResourceTestManager & t_, ResourceLink link_, ResourceCost cost)
-        : ResourceGuard(link_, cost, PostponeLocking)
+        : ResourceGuard(ResourceGuard::Metrics::getIOWrite(), link_, cost, PostponeLocking)
        , t(t_)
    {
        t.onEnqueue(link);
@@ -310,7 +310,7 @@ struct ResourceTestManager : public ResourceTestBase
    // NOTE: actually leader's request(s) make their own small busy period.
    void blockResource(ResourceLink link)
    {
-        ResourceGuard g(link, 1, ResourceGuard::PostponeLocking);
+        ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, 1, ResourceGuard::PostponeLocking);
        g.lock();
        // NOTE: at this point we assume resource to be blocked by single request (<max_requests>1</max_requests>)
        busy_period.arrive_and_wait(); // (1) notify all followers that resource is blocked
@@ -320,7 +320,7 @@ struct ResourceTestManager : public ResourceTestBase
    {
        getLinkData(link).left += total_requests + 1;
        busy_period.arrive_and_wait(); // (1) wait for leader to block resource
-        ResourceGuard g(link, cost, ResourceGuard::PostponeLocking);
+        ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, cost, ResourceGuard::PostponeLocking);
        onEnqueue(link);
        busy_period.arrive_and_wait(); // (2) notify leader to unblock
        g.lock();
@@ -36,11 +36,11 @@ TEST(SchedulerDynamicResourceManager, Smoke)

    for (int i = 0; i < 10; i++)
    {
-        ResourceGuard gA(cA->get("res1"), ResourceGuard::PostponeLocking);
+        ResourceGuard gA(ResourceGuard::Metrics::getIOWrite(), cA->get("res1"), ResourceGuard::PostponeLocking);
        gA.lock();
        gA.unlock();

-        ResourceGuard gB(cB->get("res1"));
+        ResourceGuard gB(ResourceGuard::Metrics::getIOWrite(), cB->get("res1"));
    }
}
@@ -109,22 +109,22 @@ TEST(SchedulerRoot, Smoke)
    r2.registerResource();

    {
-        ResourceGuard rg(a);
+        ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), a);
        EXPECT_TRUE(fc1->requests.contains(&rg.request));
    }

    {
-        ResourceGuard rg(b);
+        ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), b);
        EXPECT_TRUE(fc1->requests.contains(&rg.request));
    }

    {
-        ResourceGuard rg(c);
+        ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), c);
        EXPECT_TRUE(fc2->requests.contains(&rg.request));
    }

    {
-        ResourceGuard rg(d);
+        ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), d);
        EXPECT_TRUE(fc2->requests.contains(&rg.request));
    }
}
@@ -7,10 +7,30 @@
#include <Common/Scheduler/ResourceRequest.h>
#include <Common/Scheduler/ResourceLink.h>

+#include <Common/CurrentThread.h>
+#include <Common/ProfileEvents.h>
+#include <Common/CurrentMetrics.h>
+
#include <condition_variable>
#include <mutex>


+namespace ProfileEvents
+{
+    extern const Event SchedulerIOReadRequests;
+    extern const Event SchedulerIOReadBytes;
+    extern const Event SchedulerIOReadWaitMicroseconds;
+    extern const Event SchedulerIOWriteRequests;
+    extern const Event SchedulerIOWriteBytes;
+    extern const Event SchedulerIOWriteWaitMicroseconds;
+}
+
+namespace CurrentMetrics
+{
+    extern const Metric SchedulerIOReadScheduled;
+    extern const Metric SchedulerIOWriteScheduled;
+}
+
namespace DB
{

@@ -30,6 +50,36 @@ public:
        PostponeLocking /// Don't lock in constructor, but send request
    };

+    struct Metrics
+    {
+        const ProfileEvents::Event requests = ProfileEvents::end();
+        const ProfileEvents::Event cost = ProfileEvents::end();
+        const ProfileEvents::Event wait_microseconds = ProfileEvents::end();
+        const CurrentMetrics::Metric scheduled_count = CurrentMetrics::end();
+
+        static const Metrics * getIORead()
+        {
+            static Metrics metrics{
+                .requests = ProfileEvents::SchedulerIOReadRequests,
+                .cost = ProfileEvents::SchedulerIOReadBytes,
+                .wait_microseconds = ProfileEvents::SchedulerIOReadWaitMicroseconds,
+                .scheduled_count = CurrentMetrics::SchedulerIOReadScheduled
+            };
+            return &metrics;
+        }
+
+        static const Metrics * getIOWrite()
+        {
+            static Metrics metrics{
+                .requests = ProfileEvents::SchedulerIOWriteRequests,
+                .cost = ProfileEvents::SchedulerIOWriteBytes,
+                .wait_microseconds = ProfileEvents::SchedulerIOWriteWaitMicroseconds,
+                .scheduled_count = CurrentMetrics::SchedulerIOWriteScheduled
+            };
+            return &metrics;
+        }
+    };
+
    enum RequestState
    {
        Finished, // Last request has already finished; no concurrent access is possible
@@ -46,6 +96,8 @@ public:
        chassert(state == Finished);
        state = Enqueued;
        ResourceRequest::reset(cost_);
+        ProfileEvents::increment(metrics->requests);
+        ProfileEvents::increment(metrics->cost, cost_);
        link_.queue->enqueueRequestUsingBudget(this);
    }

@@ -63,6 +115,8 @@ public:

    void wait()
    {
+        CurrentMetrics::Increment scheduled(metrics->scheduled_count);
+        auto timer = CurrentThread::getProfileEvents().timer(metrics->wait_microseconds);
        std::unique_lock lock(mutex);
        dequeued_cv.wait(lock, [this] { return state == Dequeued; });
    }
@@ -75,14 +129,23 @@ public:
        ResourceRequest::finish();
    }

-    static Request & local()
+    void assertFinished()
+    {
+        // lock(mutex) is not required because a `Finished` request cannot be used by the scheduler thread
+        chassert(state == Finished);
+    }
+
+    static Request & local(const Metrics * metrics)
    {
        // Since a single thread cannot use more than one resource request simultaneously,
        // we can reuse the thread-local request to avoid allocations
        static thread_local Request instance;
+        instance.metrics = metrics;
        return instance;
    }

+    const Metrics * metrics = nullptr; // Must be initialized before use
+
private:
    std::mutex mutex;
    std::condition_variable dequeued_cv;
@@ -90,13 +153,13 @@ public:
    };

    /// Creates pending request for resource; blocks while resource is not available (unless `PostponeLocking`)
-    explicit ResourceGuard(ResourceLink link_, ResourceCost cost = 1, ResourceGuardCtor ctor = LockStraightAway)
+    explicit ResourceGuard(const Metrics * metrics, ResourceLink link_, ResourceCost cost = 1, ResourceGuardCtor ctor = LockStraightAway)
        : link(link_)
-        , request(Request::local())
+        , request(Request::local(metrics))
    {
        if (cost == 0)
-            link.queue = nullptr; // Ignore zero-cost requests
-        else if (link.queue)
+            link.reset(); // Ignore zero-cost requests
+        else if (link)
        {
            request.enqueue(cost, link);
            if (ctor == LockStraightAway)
@@ -112,17 +175,17 @@ public:
    /// Blocks until resource is available
    void lock()
    {
-        if (link.queue)
+        if (link)
            request.wait();
    }

    /// Report resource consumption has finished
    void unlock()
    {
-        if (link.queue)
+        if (link)
        {
            request.finish();
-            link.queue = nullptr;
+            link.reset();
        }
    }
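For callers that know the cost up front, the guard is used directly instead of session hooks; a hedged sketch (the function name and sendToRemote are hypothetical):

void writeWithGuardSketch(ResourceLink link, const char * data, size_t size)
{
    // Enqueues a request of cost `size` and blocks until it is dequeued (LockStraightAway).
    ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), link, size);
    sendToRemote(data, size); // hypothetical IO call
    rg.unlock();              // report consumption finished as early as possible
}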
@@ -13,13 +13,32 @@ using ResourceCost = Int64;
struct ResourceLink
{
    ISchedulerQueue * queue = nullptr;

+    bool operator==(const ResourceLink &) const = default;
+    explicit operator bool() const { return queue != nullptr; }
+
    void adjust(ResourceCost estimated_cost, ResourceCost real_cost) const;

    void consumed(ResourceCost cost) const;

    void accumulate(ResourceCost cost) const;

+    void reset()
+    {
+        queue = nullptr;
+    }
};

+/*
+ * Everything required for IO scheduling.
+ * Note that raw pointers are stored inside, so make sure that the `ClassifierPtr` that produced
+ * the resource links will outlive them. Usually the classifier is stored in the query `Context`.
+ */
+struct IOSchedulingSettings
+{
+    ResourceLink read_resource_link;
+    ResourceLink write_resource_link;
+
+    bool operator==(const IOSchedulingSettings &) const = default;
+    explicit operator bool() const { return read_resource_link && write_resource_link; }
+};
+
}
@@ -7,11 +7,11 @@
#include <Common/MemoryTracker.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
+#include <Common/Scheduler/ResourceLink.h>

#include <boost/noncopyable.hpp>

#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <unordered_set>
@@ -188,6 +188,10 @@ public:
    Progress progress_in;
    Progress progress_out;

+    /// IO scheduling
+    ResourceLink read_resource_link;
+    ResourceLink write_resource_link;
+
private:
    /// Group of threads, to which this thread attached
    ThreadGroupPtr thread_group;
@@ -8,6 +8,7 @@
#include <IO/ReadBufferFromString.h>
#include <Common/logger_useful.h>
#include <Common/Throttler.h>
+#include <Common/Scheduler/ResourceGuard.h>
#include <base/sleep.h>
#include <Common/ProfileEvents.h>
#include <IO/SeekableReadBuffer.h>
@@ -113,13 +114,17 @@ bool ReadBufferFromAzureBlobStorage::nextImpl()
{
    try
    {
+        ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, to_read_bytes);
        bytes_read = data_stream->ReadToCount(reinterpret_cast<uint8_t *>(data_ptr), to_read_bytes);
+        read_settings.io_scheduling.read_resource_link.adjust(to_read_bytes, bytes_read);
+        rlock.unlock(); // Do not hold resource under bandwidth throttler
        if (read_settings.remote_throttler)
            read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
        break;
    }
    catch (const Azure::Core::RequestFailedException & e)
    {
+        read_settings.io_scheduling.read_resource_link.accumulate(to_read_bytes); // We assume no resource was used in case of failure
        ProfileEvents::increment(ProfileEvents::ReadBufferFromAzureRequestsErrors);
        LOG_DEBUG(log, "Exception caught during Azure Read for file {} at attempt {}/{}: {}", path, i + 1, max_single_read_retries, e.Message);
@@ -88,14 +88,14 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
{
    try
    {
-        ResourceGuard rlock(write_settings.resource_link, cost); // Note that zero-cost requests are ignored
+        ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, cost); // Note that zero-cost requests are ignored
        func();
        break;
    }
    catch (const Azure::Core::RequestFailedException & e)
    {
        if (cost)
-            write_settings.resource_link.accumulate(cost); // Accumulate resource for later use, because we have failed to consume it
+            write_settings.io_scheduling.write_resource_link.accumulate(cost); // Accumulate resource for later use, because we have failed to consume it

        if (i == num_tries - 1 || !isRetryableAzureException(e))
            throw;
@@ -105,7 +105,7 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
    catch (...)
    {
        if (cost)
-            write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
+            write_settings.io_scheduling.write_resource_link.accumulate(cost); // We assume no resource was used in case of failure
        throw;
    }
}
@@ -461,14 +461,17 @@ DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
}

template <class Settings>
-static inline Settings updateResourceLink(const Settings & settings, const String & resource_name)
+static inline Settings updateIOSchedulingSettings(const Settings & settings, const String & read_resource_name, const String & write_resource_name)
{
-    if (resource_name.empty())
+    if (read_resource_name.empty() && write_resource_name.empty())
        return settings;
    if (auto query_context = CurrentThread::getQueryContext())
    {
        Settings result(settings);
-        result.resource_link = query_context->getWorkloadClassifier()->get(resource_name);
+        if (!read_resource_name.empty())
+            result.io_scheduling.read_resource_link = query_context->getWorkloadClassifier()->get(read_resource_name);
+        if (!write_resource_name.empty())
+            result.io_scheduling.write_resource_link = query_context->getWorkloadClassifier()->get(write_resource_name);
        return result;
    }
    return settings;
@@ -500,7 +503,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(

    return object_storage->readObjects(
        storage_objects,
-        updateResourceLink(settings, getReadResourceName()),
+        updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName()),
        read_hint,
        file_size);
}
@@ -513,7 +516,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
{
    LOG_TEST(log, "Write file: {}", path);

-    WriteSettings write_settings = updateResourceLink(settings, getWriteResourceName());
+    WriteSettings write_settings = updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName());
    auto transaction = createObjectStorageTransaction();
    return transaction->writeFile(path, buf_size, mode, write_settings);
}
@@ -6,7 +6,6 @@

#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromS3.h>
-#include <Common/Scheduler/ResourceGuard.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3/Requests.h>

@@ -425,22 +424,13 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
    ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3InitMicroseconds);

-    // We do not know in advance how many bytes we are going to consume, to avoid blocking estimated it from below
-    constexpr ResourceCost estimated_cost = 1;
-    ResourceGuard rlock(read_settings.resource_link, estimated_cost);
-
+    CurrentThread::IOScope io_scope(read_settings.io_scheduling);
    Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);

-    rlock.unlock();
-
    if (outcome.IsSuccess())
    {
-        ResourceCost bytes_read = outcome.GetResult().GetContentLength();
-        read_settings.resource_link.adjust(estimated_cost, bytes_read);
        return outcome.GetResultWithOwnership();
    }
    else
    {
-        read_settings.resource_link.accumulate(estimated_cost);
        const auto & error = outcome.GetError();
        throw S3Exception(error.GetMessage(), error.GetErrorType());
    }
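The shape of this change, restated as a sketch: the removed per-call guard needed a cost estimate, while the new scope defers accounting to the session-level hooks, which see real syscall sizes:

// Old (removed): ResourceGuard rlock(read_settings.resource_link, /*estimated_cost=*/1);
// New: publish the links for the duration of the whole GetObject call; each
// underlying socket receive is then scheduled with its actual byte count.
CurrentThread::IOScope io_scope(read_settings.io_scheduling);
auto outcome = client_ptr->GetObject(req);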
@@ -118,8 +118,7 @@ struct ReadSettings
    ThrottlerPtr remote_throttler;
    ThrottlerPtr local_throttler;

-    // Resource to be used during reading
-    ResourceLink resource_link;
+    IOSchedulingSettings io_scheduling;

    size_t http_max_tries = 10;
    size_t http_retry_initial_backoff_ms = 100;
@@ -11,7 +11,6 @@
#include <Common/Throttler.h>
#include <Interpreters/Cache/FileCache.h>

-#include <Common/Scheduler/ResourceGuard.h>
#include <IO/WriteHelpers.h>
#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
@@ -538,12 +537,11 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data)

    auto & request = std::get<0>(*worker_data);

    ResourceCost cost = request.GetContentLength();
-    ResourceGuard rlock(write_settings.resource_link, cost);
+    CurrentThread::IOScope io_scope(write_settings.io_scheduling);

    Stopwatch watch;
    auto outcome = client_ptr->UploadPart(request);
    watch.stop();
-    rlock.unlock(); // Avoid acquiring other locks under resource lock

    ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());

@@ -557,7 +555,6 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data)
    if (!outcome.IsSuccess())
    {
        ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
-        write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
        throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
    }

@@ -695,12 +692,11 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data
    if (client_ptr->isClientForDisk())
        ProfileEvents::increment(ProfileEvents::DiskS3PutObject);

    ResourceCost cost = request.GetContentLength();
-    ResourceGuard rlock(write_settings.resource_link, cost);
+    CurrentThread::IOScope io_scope(write_settings.io_scheduling);

    Stopwatch watch;
    auto outcome = client_ptr->PutObject(request);
    watch.stop();
-    rlock.unlock();

    ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
    if (blob_log)
@@ -714,7 +710,6 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data
    }

    ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
-    write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure

    if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
    {
@@ -13,8 +13,7 @@ struct WriteSettings
    ThrottlerPtr remote_throttler;
    ThrottlerPtr local_throttler;

-    // Resource to be used during reading
-    ResourceLink resource_link;
+    IOSchedulingSettings io_scheduling;

    /// Filesystem cache settings
    bool enable_filesystem_cache_on_write_operations = false;
@@ -119,27 +119,25 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
        return false;
    }

-    ResourceGuard rlock(read_settings.resource_link, num_bytes_to_read);
    int bytes_read;
    try
    {
+        ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, num_bytes_to_read);
        bytes_read = hdfsRead(fs.get(), fin, internal_buffer.begin(), safe_cast<int>(num_bytes_to_read));
+        read_settings.io_scheduling.read_resource_link.adjust(num_bytes_to_read, std::max(0, bytes_read));
    }
    catch (...)
    {
-        read_settings.resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure
+        read_settings.io_scheduling.read_resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure
        throw;
    }
-    rlock.unlock();

    if (bytes_read < 0)
    {
-        read_settings.resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure
        throw Exception(ErrorCodes::NETWORK_ERROR,
            "Fail to read from HDFS: {}, file path: {}. Error: {}",
            hdfs_uri, hdfs_file_path, std::string(hdfsGetLastError()));
    }
-    read_settings.resource_link.adjust(num_bytes_to_read, bytes_read);

    if (bytes_read)
    {
@@ -66,25 +66,21 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl

    int write(const char * start, size_t size)
    {
-        ResourceGuard rlock(write_settings.resource_link, size);
        int bytes_written;
        try
        {
+            ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, size);
            bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast<int>(size));
+            write_settings.io_scheduling.write_resource_link.adjust(size, std::max(0, bytes_written));
        }
        catch (...)
        {
-            write_settings.resource_link.accumulate(size); // We assume no resource was used in case of failure
+            write_settings.io_scheduling.write_resource_link.accumulate(size); // We assume no resource was used in case of failure
            throw;
        }
-        rlock.unlock();

        if (bytes_written < 0)
        {
-            write_settings.resource_link.accumulate(size); // We assume no resource was used in case of failure
            throw Exception(ErrorCodes::NETWORK_ERROR, "Fail to write HDFS file: {} {}", hdfs_uri, std::string(hdfsGetLastError()));
        }
-        write_settings.resource_link.adjust(size, bytes_written);

        if (write_settings.remote_throttler)
            write_settings.remote_throttler->add(bytes_written, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds);