diff --git a/base/poco/Net/include/Poco/Net/HTTPSession.h b/base/poco/Net/include/Poco/Net/HTTPSession.h index cac14f479db..2038fd2aff5 100644 --- a/base/poco/Net/include/Poco/Net/HTTPSession.h +++ b/base/poco/Net/include/Poco/Net/HTTPSession.h @@ -19,6 +19,8 @@ #include +#include +#include #include "Poco/Any.h" #include "Poco/Buffer.h" #include "Poco/Exception.h" @@ -33,6 +35,27 @@ namespace Net { + class IHTTPSessionDataHooks + /// Interface to control stream of data bytes being sent or received though socket by HTTPSession + /// It allows to monitor, throttle and schedule data streams with syscall granulatrity + { + public: + virtual ~IHTTPSessionDataHooks() = default; + + virtual void atStart(int bytes) = 0; + /// Called before sending/receiving data `bytes` to/from socket. + + virtual void atFinish(int bytes) = 0; + /// Called when sending/receiving of data `bytes` is successfully finished. + + virtual void atFail() = 0; + /// If an error occurred during send/receive `fail()` is called instead of `finish()`. + }; + + + using HTTPSessionDataHooksPtr = std::shared_ptr; + + class Net_API HTTPSession /// HTTPSession implements basic HTTP session management /// for both HTTP clients and HTTP servers. @@ -73,6 +96,12 @@ namespace Net Poco::Timespan getReceiveTimeout() const; /// Returns receive timeout for the HTTP session. + void setSendDataHooks(const HTTPSessionDataHooksPtr & sendDataHooks = {}); + /// Sets data hooks that will be called on every sent to the socket. + + void setReceiveDataHooks(const HTTPSessionDataHooksPtr & receiveDataHooks = {}); + /// Sets data hooks that will be called on every receive from the socket. + bool connected() const; /// Returns true if the underlying socket is connected. @@ -211,6 +240,10 @@ namespace Net Poco::Exception * _pException; Poco::Any _data; + // Data hooks + HTTPSessionDataHooksPtr _sendDataHooks; + HTTPSessionDataHooksPtr _receiveDataHooks; + friend class HTTPStreamBuf; friend class HTTPHeaderStreamBuf; friend class HTTPFixedLengthStreamBuf; @@ -246,6 +279,16 @@ namespace Net return _receiveTimeout; } + inline void HTTPSession::setSendDataHooks(const HTTPSessionDataHooksPtr & sendDataHooks) + { + _sendDataHooks = sendDataHooks; + } + + inline void HTTPSession::setReceiveDataHooks(const HTTPSessionDataHooksPtr & receiveDataHooks) + { + _receiveDataHooks = receiveDataHooks; + } + inline StreamSocket & HTTPSession::socket() { return _socket; diff --git a/base/poco/Net/src/HTTPSession.cpp b/base/poco/Net/src/HTTPSession.cpp index 8f951b3102c..f30ccb21129 100644 --- a/base/poco/Net/src/HTTPSession.cpp +++ b/base/poco/Net/src/HTTPSession.cpp @@ -128,14 +128,14 @@ int HTTPSession::get() { if (_pCurrent == _pEnd) refill(); - + if (_pCurrent < _pEnd) return *_pCurrent++; else return std::char_traits::eof(); } - + int HTTPSession::peek() { if (_pCurrent == _pEnd) @@ -147,7 +147,7 @@ int HTTPSession::peek() return std::char_traits::eof(); } - + int HTTPSession::read(char* buffer, std::streamsize length) { if (_pCurrent < _pEnd) @@ -166,10 +166,17 @@ int HTTPSession::write(const char* buffer, std::streamsize length) { try { - return _socket.sendBytes(buffer, (int) length); + if (_sendDataHooks) + _sendDataHooks->atStart((int) length); + int result = _socket.sendBytes(buffer, (int) length); + if (_sendDataHooks) + _sendDataHooks->atFinish(result); + return result; } catch (Poco::Exception& exc) { + if (_sendDataHooks) + _sendDataHooks->atFail(); setException(exc); throw; } @@ -180,10 +187,17 @@ int HTTPSession::receive(char* buffer, int length) { try { - 
return _socket.receiveBytes(buffer, length); + if (_receiveDataHooks) + _receiveDataHooks->atStart(length); + int result = _socket.receiveBytes(buffer, length); + if (_receiveDataHooks) + _receiveDataHooks->atFinish(result); + return result; } catch (Poco::Exception& exc) { + if (_receiveDataHooks) + _receiveDataHooks->atFail(); setException(exc); throw; } diff --git a/base/poco/Net/src/SocketImpl.cpp b/base/poco/Net/src/SocketImpl.cpp index 13a655d153d..dbde3f73330 100644 --- a/base/poco/Net/src/SocketImpl.cpp +++ b/base/poco/Net/src/SocketImpl.cpp @@ -63,7 +63,7 @@ bool checkIsBrokenTimeout() SocketImpl::SocketImpl(): _sockfd(POCO_INVALID_SOCKET), - _blocking(true), + _blocking(true), _isBrokenTimeout(checkIsBrokenTimeout()) { } @@ -82,7 +82,7 @@ SocketImpl::~SocketImpl() close(); } - + SocketImpl* SocketImpl::acceptConnection(SocketAddress& clientAddr) { if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException(); @@ -118,7 +118,7 @@ void SocketImpl::connect(const SocketAddress& address) rc = ::connect(_sockfd, address.addr(), address.length()); } while (rc != 0 && lastError() == POCO_EINTR); - if (rc != 0) + if (rc != 0) { int err = lastError(); error(err, address.toString()); @@ -205,7 +205,7 @@ void SocketImpl::bind6(const SocketAddress& address, bool reuseAddress, bool reu #if defined(POCO_HAVE_IPv6) if (address.family() != SocketAddress::IPv6) throw Poco::InvalidArgumentException("SocketAddress must be an IPv6 address"); - + if (_sockfd == POCO_INVALID_SOCKET) { init(address.af()); @@ -226,11 +226,11 @@ void SocketImpl::bind6(const SocketAddress& address, bool reuseAddress, bool reu #endif } - + void SocketImpl::listen(int backlog) { if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException(); - + int rc = ::listen(_sockfd, backlog); if (rc != 0) error(); } @@ -254,7 +254,7 @@ void SocketImpl::shutdownReceive() if (rc != 0) error(); } - + void SocketImpl::shutdownSend() { if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException(); @@ -263,7 +263,7 @@ void SocketImpl::shutdownSend() if (rc != 0) error(); } - + void SocketImpl::shutdown() { if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException(); @@ -318,7 +318,7 @@ int SocketImpl::receiveBytes(void* buffer, int length, int flags) throw TimeoutException(); } } - + int rc; do { @@ -326,7 +326,7 @@ int SocketImpl::receiveBytes(void* buffer, int length, int flags) rc = ::recv(_sockfd, reinterpret_cast(buffer), length, flags); } while (blocking && rc < 0 && lastError() == POCO_EINTR); - if (rc < 0) + if (rc < 0) { int err = lastError(); if ((err == POCO_EAGAIN || err == POCO_EWOULDBLOCK) && !blocking) @@ -364,7 +364,7 @@ int SocketImpl::receiveFrom(void* buffer, int length, SocketAddress& address, in throw TimeoutException(); } } - + sockaddr_storage abuffer; struct sockaddr* pSA = reinterpret_cast(&abuffer); poco_socklen_t saLen = sizeof(abuffer); @@ -451,7 +451,7 @@ bool SocketImpl::pollImpl(Poco::Timespan& remainingTime, int mode) } while (rc < 0 && lastError() == POCO_EINTR); if (rc < 0) error(); - return rc > 0; + return rc > 0; #else @@ -494,7 +494,7 @@ bool SocketImpl::pollImpl(Poco::Timespan& remainingTime, int mode) } while (rc < 0 && errorCode == POCO_EINTR); if (rc < 0) error(errorCode); - return rc > 0; + return rc > 0; #endif // POCO_HAVE_FD_POLL } @@ -504,13 +504,13 @@ bool SocketImpl::poll(const Poco::Timespan& timeout, int mode) Poco::Timespan remainingTime(timeout); return pollImpl(remainingTime, mode); } - + void SocketImpl::setSendBufferSize(int size) { setOption(SOL_SOCKET, SO_SNDBUF, 
size); } - + int SocketImpl::getSendBufferSize() { int result; @@ -524,7 +524,7 @@ void SocketImpl::setReceiveBufferSize(int size) setOption(SOL_SOCKET, SO_RCVBUF, size); } - + int SocketImpl::getReceiveBufferSize() { int result; @@ -570,7 +570,7 @@ Poco::Timespan SocketImpl::getReceiveTimeout() return result; } - + SocketAddress SocketImpl::address() { if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException(); @@ -581,7 +581,7 @@ SocketAddress SocketImpl::address() int rc = ::getsockname(_sockfd, pSA, &saLen); if (rc == 0) return SocketAddress(pSA, saLen); - else + else error(); return SocketAddress(); } diff --git a/docs/changelogs/v24.3.10.33-lts.md b/docs/changelogs/v24.3.10.33-lts.md new file mode 100644 index 00000000000..fbce8f76dad --- /dev/null +++ b/docs/changelogs/v24.3.10.33-lts.md @@ -0,0 +1,32 @@ +--- +sidebar_position: 1 +sidebar_label: 2024 +--- + +# 2024 Changelog + +### ClickHouse release v24.3.10.33-lts (37b6502ebf0) FIXME as compared to v24.3.9.5-lts (a939270465e) + +#### Improvement +* Backported in [#68870](https://github.com/ClickHouse/ClickHouse/issues/68870): Make allow_experimental_analyzer be controlled by the initiator for distributed queries. This ensures compatibility and correctness during operations in mixed version clusters. [#65777](https://github.com/ClickHouse/ClickHouse/pull/65777) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov)). +* Backported in [#69095](https://github.com/ClickHouse/ClickHouse/issues/69095): Support for the Spanish language in the embedded dictionaries. [#69035](https://github.com/ClickHouse/ClickHouse/pull/69035) ([Vasily Okunev](https://github.com/VOkunev)). + +#### Bug Fix (user-visible misbehavior in an official stable release) +* Backported in [#68995](https://github.com/ClickHouse/ClickHouse/issues/68995): Fix the upper bound of the function `fromModifiedJulianDay`. It was supposed to be `9999-12-31` but was mistakenly set to `9999-01-01`. [#67583](https://github.com/ClickHouse/ClickHouse/pull/67583) ([PHO](https://github.com/depressed-pho)). +* Backported in [#68844](https://github.com/ClickHouse/ClickHouse/issues/68844): Fixed crash in Parquet filtering when data types in the file substantially differ from requested types (e.g. `... FROM file('a.parquet', Parquet, 'x String')`, but the file has `x Int64`). Without this fix, use `input_format_parquet_filter_push_down = 0` as a workaround. [#68131](https://github.com/ClickHouse/ClickHouse/pull/68131) ([Michael Kolupaev](https://github.com/al13n321)). +* Backported in [#68881](https://github.com/ClickHouse/ClickHouse/issues/68881): Fixes [#50868](https://github.com/ClickHouse/ClickHouse/issues/50868). Small DateTime64 constant values returned by a nested subquery inside a distributed query were wrongly transformed to Nulls, thus causing errors and possible incorrect query results. [#68323](https://github.com/ClickHouse/ClickHouse/pull/68323) ([Shankar](https://github.com/shiyer7474)). +* Backported in [#69054](https://github.com/ClickHouse/ClickHouse/issues/69054): Added back virtual columns ` _table` and `_database` to distributed tables. They were available until version 24.3. [#68672](https://github.com/ClickHouse/ClickHouse/pull/68672) ([Anton Popov](https://github.com/CurtizJ)). +* Backported in [#68856](https://github.com/ClickHouse/ClickHouse/issues/68856): Fix possible error `Size of permutation (0) is less than required (...)` during Variant column permutation. 
[#68681](https://github.com/ClickHouse/ClickHouse/pull/68681) ([Kruglov Pavel](https://github.com/Avogar)). +* Backported in [#69152](https://github.com/ClickHouse/ClickHouse/issues/69152): Fix possible wrong result during anyHeavy state merge. [#68950](https://github.com/ClickHouse/ClickHouse/pull/68950) ([Raúl Marín](https://github.com/Algunenano)). +* Backported in [#69112](https://github.com/ClickHouse/ClickHouse/issues/69112): Fix logical error when we have empty async insert. [#69080](https://github.com/ClickHouse/ClickHouse/pull/69080) ([Han Fei](https://github.com/hanfei1991)). + +#### NO CL CATEGORY + +* Backported in [#68938](https://github.com/ClickHouse/ClickHouse/issues/68938):. [#68897](https://github.com/ClickHouse/ClickHouse/pull/68897) ([Alexander Gololobov](https://github.com/davenger)). + +#### NOT FOR CHANGELOG / INSIGNIFICANT + +* Backported in [#68826](https://github.com/ClickHouse/ClickHouse/issues/68826): Turn off fault injection for insert in `01396_inactive_replica_cleanup_nodes_zookeeper`. [#68715](https://github.com/ClickHouse/ClickHouse/pull/68715) ([alesapin](https://github.com/alesapin)). +* Backported in [#68754](https://github.com/ClickHouse/ClickHouse/issues/68754): To make patch release possible from every commit on release branch, package_debug build is required and must not be skipped. [#68750](https://github.com/ClickHouse/ClickHouse/pull/68750) ([Max K.](https://github.com/maxknv)). +* Backported in [#69044](https://github.com/ClickHouse/ClickHouse/issues/69044): Fix 01114_database_atomic flakiness. [#68930](https://github.com/ClickHouse/ClickHouse/pull/68930) ([Raúl Marín](https://github.com/Algunenano)). + diff --git a/docs/changelogs/v24.5.7.31-stable.md b/docs/changelogs/v24.5.7.31-stable.md new file mode 100644 index 00000000000..44f43fb7c4b --- /dev/null +++ b/docs/changelogs/v24.5.7.31-stable.md @@ -0,0 +1,29 @@ +--- +sidebar_position: 1 +sidebar_label: 2024 +--- + +# 2024 Changelog + +### ClickHouse release v24.5.7.31-stable (6c185e9aec1) FIXME as compared to v24.5.6.45-stable (bdca8604c29) + +#### Bug Fix (user-visible misbehavior in an official stable release) +* Backported in [#68564](https://github.com/ClickHouse/ClickHouse/issues/68564): Fix indexHint function case found by fuzzer. [#66286](https://github.com/ClickHouse/ClickHouse/pull/66286) ([Anton Popov](https://github.com/CurtizJ)). +* Backported in [#68996](https://github.com/ClickHouse/ClickHouse/issues/68996): Fix the upper bound of the function `fromModifiedJulianDay`. It was supposed to be `9999-12-31` but was mistakenly set to `9999-01-01`. [#67583](https://github.com/ClickHouse/ClickHouse/pull/67583) ([PHO](https://github.com/depressed-pho)). +* Backported in [#68865](https://github.com/ClickHouse/ClickHouse/issues/68865): Fixed crash in Parquet filtering when data types in the file substantially differ from requested types (e.g. `... FROM file('a.parquet', Parquet, 'x String')`, but the file has `x Int64`). Without this fix, use `input_format_parquet_filter_push_down = 0` as a workaround. [#68131](https://github.com/ClickHouse/ClickHouse/pull/68131) ([Michael Kolupaev](https://github.com/al13n321)). +* Backported in [#69004](https://github.com/ClickHouse/ClickHouse/issues/69004): After https://github.com/ClickHouse/ClickHouse/pull/61984 `schema_inference_make_columns_nullable=0` still can make columns `Nullable` in Parquet/Arrow formats. The change was backward incompatible and users noticed the changes in the behaviour. 
This PR makes `schema_inference_make_columns_nullable=0` to work as before (no Nullable columns will be inferred) and introduces new value `auto` for this setting that will make columns `Nullable` only if data has information about nullability. [#68298](https://github.com/ClickHouse/ClickHouse/pull/68298) ([Kruglov Pavel](https://github.com/Avogar)). +* Backported in [#68882](https://github.com/ClickHouse/ClickHouse/issues/68882): Fixes [#50868](https://github.com/ClickHouse/ClickHouse/issues/50868). Small DateTime64 constant values returned by a nested subquery inside a distributed query were wrongly transformed to Nulls, thus causing errors and possible incorrect query results. [#68323](https://github.com/ClickHouse/ClickHouse/pull/68323) ([Shankar](https://github.com/shiyer7474)). +* Backported in [#69023](https://github.com/ClickHouse/ClickHouse/issues/69023): Added back virtual columns ` _table` and `_database` to distributed tables. They were available until version 24.3. [#68672](https://github.com/ClickHouse/ClickHouse/pull/68672) ([Anton Popov](https://github.com/CurtizJ)). +* Backported in [#68858](https://github.com/ClickHouse/ClickHouse/issues/68858): Fix possible error `Size of permutation (0) is less than required (...)` during Variant column permutation. [#68681](https://github.com/ClickHouse/ClickHouse/pull/68681) ([Kruglov Pavel](https://github.com/Avogar)). +* Backported in [#68784](https://github.com/ClickHouse/ClickHouse/issues/68784): Fix issue with materialized constant keys when hashing maps with arrays as keys in functions `sipHash(64/128)Keyed`. [#68731](https://github.com/ClickHouse/ClickHouse/pull/68731) ([Salvatore Mesoraca](https://github.com/aiven-sal)). +* Backported in [#69154](https://github.com/ClickHouse/ClickHouse/issues/69154): Fix possible wrong result during anyHeavy state merge. [#68950](https://github.com/ClickHouse/ClickHouse/pull/68950) ([Raúl Marín](https://github.com/Algunenano)). + +#### NO CL CATEGORY + +* Backported in [#68940](https://github.com/ClickHouse/ClickHouse/issues/68940):. [#68897](https://github.com/ClickHouse/ClickHouse/pull/68897) ([Alexander Gololobov](https://github.com/davenger)). + +#### NOT FOR CHANGELOG / INSIGNIFICANT + +* Backported in [#68828](https://github.com/ClickHouse/ClickHouse/issues/68828): Turn off fault injection for insert in `01396_inactive_replica_cleanup_nodes_zookeeper`. [#68715](https://github.com/ClickHouse/ClickHouse/pull/68715) ([alesapin](https://github.com/alesapin)). +* Backported in [#69046](https://github.com/ClickHouse/ClickHouse/issues/69046): Fix 01114_database_atomic flakiness. [#68930](https://github.com/ClickHouse/ClickHouse/pull/68930) ([Raúl Marín](https://github.com/Algunenano)). + diff --git a/docs/changelogs/v24.6.5.30-stable.md b/docs/changelogs/v24.6.5.30-stable.md new file mode 100644 index 00000000000..4eac5691799 --- /dev/null +++ b/docs/changelogs/v24.6.5.30-stable.md @@ -0,0 +1,29 @@ +--- +sidebar_position: 1 +sidebar_label: 2024 +--- + +# 2024 Changelog + +### ClickHouse release v24.6.5.30-stable (e6e196c92d6) FIXME as compared to v24.6.4.42-stable (c534bb4b4dd) + +#### Bug Fix (user-visible misbehavior in an official stable release) +* Backported in [#68969](https://github.com/ClickHouse/ClickHouse/issues/68969): Fix the upper bound of the function `fromModifiedJulianDay`. It was supposed to be `9999-12-31` but was mistakenly set to `9999-01-01`. [#67583](https://github.com/ClickHouse/ClickHouse/pull/67583) ([PHO](https://github.com/depressed-pho)). 
+* Backported in [#68814](https://github.com/ClickHouse/ClickHouse/issues/68814): Fixed crash in Parquet filtering when data types in the file substantially differ from requested types (e.g. `... FROM file('a.parquet', Parquet, 'x String')`, but the file has `x Int64`). Without this fix, use `input_format_parquet_filter_push_down = 0` as a workaround. [#68131](https://github.com/ClickHouse/ClickHouse/pull/68131) ([Michael Kolupaev](https://github.com/al13n321)). +* Backported in [#69005](https://github.com/ClickHouse/ClickHouse/issues/69005): After https://github.com/ClickHouse/ClickHouse/pull/61984 `schema_inference_make_columns_nullable=0` still can make columns `Nullable` in Parquet/Arrow formats. The change was backward incompatible and users noticed the changes in the behaviour. This PR makes `schema_inference_make_columns_nullable=0` to work as before (no Nullable columns will be inferred) and introduces new value `auto` for this setting that will make columns `Nullable` only if data has information about nullability. [#68298](https://github.com/ClickHouse/ClickHouse/pull/68298) ([Kruglov Pavel](https://github.com/Avogar)). +* Backported in [#68883](https://github.com/ClickHouse/ClickHouse/issues/68883): Fixes [#50868](https://github.com/ClickHouse/ClickHouse/issues/50868). Small DateTime64 constant values returned by a nested subquery inside a distributed query were wrongly transformed to Nulls, thus causing errors and possible incorrect query results. [#68323](https://github.com/ClickHouse/ClickHouse/pull/68323) ([Shankar](https://github.com/shiyer7474)). +* Backported in [#69025](https://github.com/ClickHouse/ClickHouse/issues/69025): Added back virtual columns ` _table` and `_database` to distributed tables. They were available until version 24.3. [#68672](https://github.com/ClickHouse/ClickHouse/pull/68672) ([Anton Popov](https://github.com/CurtizJ)). +* Backported in [#68860](https://github.com/ClickHouse/ClickHouse/issues/68860): Fix possible error `Size of permutation (0) is less than required (...)` during Variant column permutation. [#68681](https://github.com/ClickHouse/ClickHouse/pull/68681) ([Kruglov Pavel](https://github.com/Avogar)). +* Backported in [#68786](https://github.com/ClickHouse/ClickHouse/issues/68786): Fix issue with materialized constant keys when hashing maps with arrays as keys in functions `sipHash(64/128)Keyed`. [#68731](https://github.com/ClickHouse/ClickHouse/pull/68731) ([Salvatore Mesoraca](https://github.com/aiven-sal)). +* Backported in [#69156](https://github.com/ClickHouse/ClickHouse/issues/69156): Fix possible wrong result during anyHeavy state merge. [#68950](https://github.com/ClickHouse/ClickHouse/pull/68950) ([Raúl Marín](https://github.com/Algunenano)). +* Backported in [#69116](https://github.com/ClickHouse/ClickHouse/issues/69116): Fix logical error when we have empty async insert. [#69080](https://github.com/ClickHouse/ClickHouse/pull/69080) ([Han Fei](https://github.com/hanfei1991)). + +#### NO CL CATEGORY + +* Backported in [#68942](https://github.com/ClickHouse/ClickHouse/issues/68942):. [#68897](https://github.com/ClickHouse/ClickHouse/pull/68897) ([Alexander Gololobov](https://github.com/davenger)). + +#### NOT FOR CHANGELOG / INSIGNIFICANT + +* Backported in [#68830](https://github.com/ClickHouse/ClickHouse/issues/68830): Turn off fault injection for insert in `01396_inactive_replica_cleanup_nodes_zookeeper`. [#68715](https://github.com/ClickHouse/ClickHouse/pull/68715) ([alesapin](https://github.com/alesapin)). 
+* Backported in [#69048](https://github.com/ClickHouse/ClickHouse/issues/69048): Fix 01114_database_atomic flakiness. [#68930](https://github.com/ClickHouse/ClickHouse/pull/68930) ([Raúl Marín](https://github.com/Algunenano)). + diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 204e09e92b2..4bf2b0704f1 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -292,6 +292,9 @@ M(DistrCacheWriteRequests, "Number of executed Write requests to Distributed Cache") \ M(DistrCacheServerConnections, "Number of open connections to ClickHouse server from Distributed Cache") \ \ + M(SchedulerIOReadScheduled, "Number of IO reads are being scheduled currently") \ + M(SchedulerIOWriteScheduled, "Number of IO writes are being scheduled currently") \ + \ M(StorageConnectionsStored, "Total count of sessions stored in the session pool for storages") \ M(StorageConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for storages") \ \ diff --git a/src/Common/CurrentThread.cpp b/src/Common/CurrentThread.cpp index 70b69d6bcc7..ba7087ca7f1 100644 --- a/src/Common/CurrentThread.cpp +++ b/src/Common/CurrentThread.cpp @@ -113,6 +113,56 @@ std::string_view CurrentThread::getQueryId() return current_thread->getQueryId(); } +void CurrentThread::attachReadResource(ResourceLink link) +{ + if (unlikely(!current_thread)) + return; + if (current_thread->read_resource_link) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has been already attached to read resource", std::to_string(getThreadId())); + current_thread->read_resource_link = link; +} + +void CurrentThread::detachReadResource() +{ + if (unlikely(!current_thread)) + return; + if (!current_thread->read_resource_link) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has not been attached to read resource", std::to_string(getThreadId())); + current_thread->read_resource_link.reset(); +} + +ResourceLink CurrentThread::getReadResourceLink() +{ + if (unlikely(!current_thread)) + return {}; + return current_thread->read_resource_link; +} + +void CurrentThread::attachWriteResource(ResourceLink link) +{ + if (unlikely(!current_thread)) + return; + if (current_thread->write_resource_link) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has been already attached to write resource", std::to_string(getThreadId())); + current_thread->write_resource_link = link; +} + +void CurrentThread::detachWriteResource() +{ + if (unlikely(!current_thread)) + return; + if (!current_thread->write_resource_link) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has not been attached to write resource", std::to_string(getThreadId())); + current_thread->write_resource_link.reset(); +} + +ResourceLink CurrentThread::getWriteResourceLink() +{ + if (unlikely(!current_thread)) + return {}; + return current_thread->write_resource_link; +} + MemoryTracker * CurrentThread::getUserMemoryTracker() { if (unlikely(!current_thread)) diff --git a/src/Common/CurrentThread.h b/src/Common/CurrentThread.h index 53b61ba315f..787e8369a83 100644 --- a/src/Common/CurrentThread.h +++ b/src/Common/CurrentThread.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -23,7 +24,6 @@ class QueryStatus; struct Progress; class InternalTextLogsQueue; - /** Collection of static methods to work with thread-local objects. * Allows to attach and detach query/process (thread group) to a thread * (to calculate query-related metrics and to allow to obtain query-related data from a thread). 
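// Illustrative sketch (hypothetical resource names and call site): how the thread-local IO resource
// links added in CurrentThread.{h,cpp} are expected to be used, mirroring the DiskObjectStorage and
// WriteBufferFromS3 changes later in this diff.
ReadSettings read_settings;
if (auto query_context = CurrentThread::getQueryContext())
{
    auto classifier = query_context->getWorkloadClassifier();
    read_settings.io_scheduling.read_resource_link = classifier->get("network_read");   // name is hypothetical
    read_settings.io_scheduling.write_resource_link = classifier->get("network_write"); // name is hypothetical
}
{
    // Attach the links to the current thread for the duration of the scope. HTTP sessions used on
    // this thread (see HTTPConnectionPool::sendRequest below) pick them up via
    // CurrentThread::getReadResourceLink() / getWriteResourceLink().
    CurrentThread::IOScope io_scope(read_settings.io_scheduling);
    // ... perform the S3/Azure/HDFS request on this thread ...
}   // Links are detached automatically in ~IOScope().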
@@ -92,6 +92,14 @@ public: static std::string_view getQueryId(); + // For IO Scheduling + static void attachReadResource(ResourceLink link); + static void detachReadResource(); + static ResourceLink getReadResourceLink(); + static void attachWriteResource(ResourceLink link); + static void detachWriteResource(); + static ResourceLink getWriteResourceLink(); + /// Initializes query with current thread as master thread in constructor, and detaches it in destructor struct QueryScope : private boost::noncopyable { @@ -102,6 +110,39 @@ public: void logPeakMemoryUsage(); bool log_peak_memory_usage_in_destructor = true; }; + + /// Scoped attach/detach of IO resource links + struct IOScope : private boost::noncopyable + { + explicit IOScope(ResourceLink read_resource_link, ResourceLink write_resource_link) + { + if (read_resource_link) + { + attachReadResource(read_resource_link); + read_attached = true; + } + if (write_resource_link) + { + attachWriteResource(write_resource_link); + write_attached = true; + } + } + + explicit IOScope(const IOSchedulingSettings & settings) + : IOScope(settings.read_resource_link, settings.write_resource_link) + {} + + ~IOScope() + { + if (read_attached) + detachReadResource(); + if (write_attached) + detachWriteResource(); + } + + bool read_attached = false; + bool write_attached = false; + }; }; } diff --git a/src/Common/HTTPConnectionPool.cpp b/src/Common/HTTPConnectionPool.cpp index f3ff09bc90a..7a65863180e 100644 --- a/src/Common/HTTPConnectionPool.cpp +++ b/src/Common/HTTPConnectionPool.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -9,6 +10,7 @@ #include #include #include +#include #include #include @@ -236,6 +238,59 @@ public: }; +// Session data hooks implementation for integration with resource scheduler. +// Hooks are created per every request-response pair and are registered/unregistered in HTTP session. +// * `atStart()` send resource request to the scheduler every time HTTP session is going to send or receive +// data to/from socket. `start()` waits for the scheduler confirmation. This way scheduler might +// throttle and/or schedule socket data streams. +// * `atFinish()` hook is called on successful socket read/write operation. +// It informs the scheduler that operation is complete, which allows the scheduler to control the total +// amount of in-flight bytes and/or operations. +// * `atFail()` hook is called on failure of socket operation. The purpose is to correct the amount of bytes +// passed through the scheduler queue to ensure fair bandwidth allocation even in presence of errors. 
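// Illustrative sketch of the IHTTPSessionDataHooks contract added to Poco above (class name and
// fields are hypothetical): the session calls atStart() before each socket send/receive,
// atFinish() after it completes successfully, and atFail() if the operation throws.
// The scheduler-integrated implementation follows below.
class ByteCountingHooks : public Poco::Net::IHTTPSessionDataHooks
{
public:
    void atStart(int bytes) override { pending = bytes; }                    // bytes about to be sent/received
    void atFinish(int bytes) override { transferred += bytes; pending = 0; } // bytes actually transferred
    void atFail() override { ++failures; pending = 0; }

    int pending = 0;
    size_t transferred = 0;
    size_t failures = 0;
};
// Installed per direction, e.g. session.setReceiveDataHooks(std::make_shared<ByteCountingHooks>());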
+struct ResourceGuardSessionDataHooks : public Poco::Net::IHTTPSessionDataHooks +{ + ResourceGuardSessionDataHooks(ResourceLink link_, const ResourceGuard::Metrics * metrics, LoggerPtr log_, const String & method, const String & uri) + : link(link_) + , log(log_) + , http_request(method + " " + uri) + { + request.metrics = metrics; + chassert(link); + } + + ~ResourceGuardSessionDataHooks() override + { + request.assertFinished(); // Never destruct with an active request + } + + void atStart(int bytes) override + { + Stopwatch timer; + request.enqueue(bytes, link); + request.wait(); + timer.stop(); + if (timer.elapsedMilliseconds() >= 5000) + LOG_INFO(log, "Resource request took too long to finish: {} ms for {}", timer.elapsedMilliseconds(), http_request); + } + + void atFinish(int bytes) override + { + request.finish(bytes, link); + } + + void atFail() override + { + request.finish(0, link); + } + + ResourceLink link; + ResourceGuard::Request request; + LoggerPtr log; + String http_request; +}; + + // EndpointConnectionPool manage connections to the endpoint // Features: // - it uses HostResolver for address selecting. See Common/HostResolver.h for more info. @@ -246,8 +301,6 @@ public: // - `Session::reconnect()` uses the pool as well // - comprehensive sensors // - session is reused according its inner state, automatically - - template class EndpointConnectionPool : public std::enable_shared_from_this>, public IExtendedPool { @@ -337,6 +390,13 @@ private: std::ostream & sendRequest(Poco::Net::HTTPRequest & request) override { auto idle = idleTime(); + + // Set data hooks for IO scheduling + if (ResourceLink link = CurrentThread::getReadResourceLink()) + Session::setReceiveDataHooks(std::make_shared(link, ResourceGuard::Metrics::getIORead(), log, request.getMethod(), request.getURI())); + if (ResourceLink link = CurrentThread::getWriteResourceLink()) + Session::setSendDataHooks(std::make_shared(link, ResourceGuard::Metrics::getIOWrite(), log, request.getMethod(), request.getURI())); + std::ostream & result = Session::sendRequest(request); result.exceptions(std::ios::badbit); @@ -393,6 +453,8 @@ private: } } response_stream = nullptr; + Session::setSendDataHooks(); + Session::setReceiveDataHooks(); group->atConnectionDestroy(); diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 044f787aee9..af1b7fbeb4a 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -120,6 +120,13 @@ M(PartsWithAppliedMutationsOnFly, "Total number of parts for which there was any mutation applied on fly") \ M(MutationsAppliedOnFlyInAllParts, "The sum of number of applied mutations on-fly for part among all read parts") \ \ + M(SchedulerIOReadRequests, "Resource requests passed through scheduler for IO reads.") \ + M(SchedulerIOReadBytes, "Bytes passed through scheduler for IO reads.") \ + M(SchedulerIOReadWaitMicroseconds, "Total time a query was waiting on resource requests for IO reads.") \ + M(SchedulerIOWriteRequests, "Resource requests passed through scheduler for IO writes.") \ + M(SchedulerIOWriteBytes, "Bytes passed through scheduler for IO writes.") \ + M(SchedulerIOWriteWaitMicroseconds, "Total time a query was waiting on resource requests for IO writes.") \ + \ M(QueryMaskingRulesMatch, "Number of times query masking rules was successfully matched.") \ \ M(ReplicatedPartFetches, "Number of times a data part was downloaded from replica of a ReplicatedMergeTree table.") \ diff --git a/src/Common/Scheduler/ISchedulerQueue.h 
b/src/Common/Scheduler/ISchedulerQueue.h index 532f4bf6c63..b7a51870a24 100644 --- a/src/Common/Scheduler/ISchedulerQueue.h +++ b/src/Common/Scheduler/ISchedulerQueue.h @@ -22,10 +22,13 @@ public: {} // Wrapper for `enqueueRequest()` that should be used to account for available resource budget - void enqueueRequestUsingBudget(ResourceRequest * request) + // Returns `estimated_cost` that should be passed later to `adjustBudget()` + [[ nodiscard ]] ResourceCost enqueueRequestUsingBudget(ResourceRequest * request) { - request->cost = budget.ask(request->cost); + ResourceCost estimated_cost = request->cost; + request->cost = budget.ask(estimated_cost); enqueueRequest(request); + return estimated_cost; } // Should be called to account for difference between real and estimated costs @@ -34,18 +37,6 @@ public: budget.adjust(estimated_cost, real_cost); } - // Adjust budget to account for extra consumption of `cost` resource units - void consumeBudget(ResourceCost cost) - { - adjustBudget(0, cost); - } - - // Adjust budget to account for requested, but not consumed `cost` resource units - void accumulateBudget(ResourceCost cost) - { - adjustBudget(cost, 0); - } - /// Enqueue new request to be executed using underlying resource. /// Should be called outside of scheduling subsystem, implementation must be thread-safe. virtual void enqueueRequest(ResourceRequest * request) = 0; diff --git a/src/Common/Scheduler/Nodes/tests/ResourceTest.h b/src/Common/Scheduler/Nodes/tests/ResourceTest.h index ea3f9edf765..c787a686a09 100644 --- a/src/Common/Scheduler/Nodes/tests/ResourceTest.h +++ b/src/Common/Scheduler/Nodes/tests/ResourceTest.h @@ -232,12 +232,13 @@ struct ResourceTestManager : public ResourceTestBase ResourceTestManager & t; Guard(ResourceTestManager & t_, ResourceLink link_, ResourceCost cost) - : ResourceGuard(link_, cost, PostponeLocking) + : ResourceGuard(ResourceGuard::Metrics::getIOWrite(), link_, cost, Lock::Defer) , t(t_) { t.onEnqueue(link); lock(); t.onExecute(link); + consume(cost); } }; @@ -310,8 +311,9 @@ struct ResourceTestManager : public ResourceTestBase // NOTE: actually leader's request(s) make their own small busy period. 
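    // Hedged summary of the ResourceGuard flow these helpers now exercise (illustrative):
    //   ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, estimated_cost, ResourceGuard::Lock::Defer);
    //     - the constructor enqueues the request via enqueueRequestUsingBudget();
    //   g.lock();              // waits until the scheduler dequeues the request
    //   g.consume(real_cost);  // records how much was actually transferred
    //   g.unlock();            // finish(): adjusts the queue budget by the difference between estimated and real cost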
void blockResource(ResourceLink link) { - ResourceGuard g(link, 1, ResourceGuard::PostponeLocking); + ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, 1, ResourceGuard::Lock::Defer); g.lock(); + g.consume(1); // NOTE: at this point we assume resource to be blocked by single request (1) busy_period.arrive_and_wait(); // (1) notify all followers that resource is blocked busy_period.arrive_and_wait(); // (2) wait all followers to enqueue their requests @@ -320,10 +322,11 @@ struct ResourceTestManager : public ResourceTestBase { getLinkData(link).left += total_requests + 1; busy_period.arrive_and_wait(); // (1) wait leader to block resource - ResourceGuard g(link, cost, ResourceGuard::PostponeLocking); + ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, cost, ResourceGuard::Lock::Defer); onEnqueue(link); busy_period.arrive_and_wait(); // (2) notify leader to unblock g.lock(); + g.consume(cost); onExecute(link); } }; diff --git a/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp b/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp index 1901a4fd120..3328196cced 100644 --- a/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp +++ b/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp @@ -36,11 +36,16 @@ TEST(SchedulerDynamicResourceManager, Smoke) for (int i = 0; i < 10; i++) { - ResourceGuard gA(cA->get("res1"), ResourceGuard::PostponeLocking); + ResourceGuard gA(ResourceGuard::Metrics::getIOWrite(), cA->get("res1"), 1, ResourceGuard::Lock::Defer); gA.lock(); + gA.consume(1); gA.unlock(); - ResourceGuard gB(cB->get("res1")); + ResourceGuard gB(ResourceGuard::Metrics::getIOWrite(), cB->get("res1")); + gB.unlock(); + + ResourceGuard gC(ResourceGuard::Metrics::getIORead(), cB->get("res1")); + gB.consume(2); } } diff --git a/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp b/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp index f8196d15819..ddfe0cfbc6f 100644 --- a/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp +++ b/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp @@ -1,11 +1,13 @@ #include -#include - #include +#include +#include + #include #include +#include using namespace DB; @@ -22,6 +24,17 @@ struct ResourceTest : public ResourceTestBase { scheduler.stop(true); } + + std::mutex rng_mutex; + pcg64 rng{randomSeed()}; + + template + T randomInt(T from, T to) + { + std::uniform_int_distribution distribution(from, to); + std::lock_guard lock(rng_mutex); + return distribution(rng); + } }; struct ResourceHolder @@ -109,26 +122,55 @@ TEST(SchedulerRoot, Smoke) r2.registerResource(); { - ResourceGuard rg(a); + ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), a); EXPECT_TRUE(fc1->requests.contains(&rg.request)); + rg.consume(1); } { - ResourceGuard rg(b); + ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), b); EXPECT_TRUE(fc1->requests.contains(&rg.request)); + rg.consume(1); } { - ResourceGuard rg(c); + ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), c); EXPECT_TRUE(fc2->requests.contains(&rg.request)); + rg.consume(1); } { - ResourceGuard rg(d); + ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), d); EXPECT_TRUE(fc2->requests.contains(&rg.request)); + rg.consume(1); } } +TEST(SchedulerRoot, Budget) +{ + ResourceTest t; + + ResourceHolder r1(t); + r1.add("/", "1"); + r1.add("/prio"); + auto a = r1.addQueue("/prio/A", ""); + r1.registerResource(); + + ResourceCost total_real_cost = 0; + int total_requests = 10; + for (int i = 0 ; i < 
total_requests; i++) + { + ResourceCost est_cost = t.randomInt(1, 10); + ResourceCost real_cost = t.randomInt(0, 10); + ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), a, est_cost); + rg.consume(real_cost); + total_real_cost += real_cost; + } + + EXPECT_EQ(total_requests, a.queue->dequeued_requests); + EXPECT_EQ(total_real_cost, a.queue->dequeued_cost - a.queue->getBudget()); +} + TEST(SchedulerRoot, Cancel) { ResourceTest t; diff --git a/src/Common/Scheduler/ResouceLink.cpp b/src/Common/Scheduler/ResouceLink.cpp deleted file mode 100644 index 2da5dba62dc..00000000000 --- a/src/Common/Scheduler/ResouceLink.cpp +++ /dev/null @@ -1,25 +0,0 @@ -#include -#include -#include - -namespace DB -{ -void ResourceLink::adjust(ResourceCost estimated_cost, ResourceCost real_cost) const -{ - if (queue) - queue->adjustBudget(estimated_cost, real_cost); -} - -void ResourceLink::consumed(ResourceCost cost) const -{ - if (queue) - queue->consumeBudget(cost); -} - -void ResourceLink::accumulate(DB::ResourceCost cost) const -{ - if (queue) - queue->accumulateBudget(cost); -} -} - diff --git a/src/Common/Scheduler/ResourceGuard.h b/src/Common/Scheduler/ResourceGuard.h index 3c29f588fba..cf97f7acf93 100644 --- a/src/Common/Scheduler/ResourceGuard.h +++ b/src/Common/Scheduler/ResourceGuard.h @@ -7,10 +7,30 @@ #include #include +#include +#include +#include + #include #include +namespace ProfileEvents +{ + extern const Event SchedulerIOReadRequests; + extern const Event SchedulerIOReadBytes; + extern const Event SchedulerIOReadWaitMicroseconds; + extern const Event SchedulerIOWriteRequests; + extern const Event SchedulerIOWriteBytes; + extern const Event SchedulerIOWriteWaitMicroseconds; +} + +namespace CurrentMetrics +{ + extern const Metric SchedulerIOReadScheduled; + extern const Metric SchedulerIOWriteScheduled; +} + namespace DB { @@ -22,12 +42,42 @@ namespace DB class ResourceGuard { public: - enum ResourceGuardCtor + enum class Lock { - LockStraightAway, /// Locks inside constructor (default) + Default, /// Locks inside constructor // WARNING: Only for tests. It is not exception-safe because `lock()` must be called after construction. 
- PostponeLocking /// Don't lock in constructor, but send request + Defer /// Don't lock in constructor, but send request + }; + + struct Metrics + { + const ProfileEvents::Event requests = ProfileEvents::end(); + const ProfileEvents::Event cost = ProfileEvents::end(); + const ProfileEvents::Event wait_microseconds = ProfileEvents::end(); + const CurrentMetrics::Metric scheduled_count = CurrentMetrics::end(); + + static const Metrics * getIORead() + { + static Metrics metrics{ + .requests = ProfileEvents::SchedulerIOReadRequests, + .cost = ProfileEvents::SchedulerIOReadBytes, + .wait_microseconds = ProfileEvents::SchedulerIOReadWaitMicroseconds, + .scheduled_count = CurrentMetrics::SchedulerIOReadScheduled + }; + return &metrics; + } + + static const Metrics * getIOWrite() + { + static Metrics metrics{ + .requests = ProfileEvents::SchedulerIOWriteRequests, + .cost = ProfileEvents::SchedulerIOWriteBytes, + .wait_microseconds = ProfileEvents::SchedulerIOWriteWaitMicroseconds, + .scheduled_count = CurrentMetrics::SchedulerIOWriteScheduled + }; + return &metrics; + } }; enum RequestState @@ -46,60 +96,74 @@ public: chassert(state == Finished); state = Enqueued; ResourceRequest::reset(cost_); - link_.queue->enqueueRequestUsingBudget(this); + estimated_cost = link_.queue->enqueueRequestUsingBudget(this); // NOTE: it modifies `cost` and enqueues request } // This function is executed inside scheduler thread and wakes thread issued this `request`. // That thread will continue execution and do real consumption of requested resource synchronously. void execute() override { - { - std::unique_lock lock(mutex); - chassert(state == Enqueued); - state = Dequeued; - } + std::unique_lock lock(mutex); + chassert(state == Enqueued); + state = Dequeued; dequeued_cv.notify_one(); } void wait() { + CurrentMetrics::Increment scheduled(metrics->scheduled_count); + auto timer = CurrentThread::getProfileEvents().timer(metrics->wait_microseconds); std::unique_lock lock(mutex); dequeued_cv.wait(lock, [this] { return state == Dequeued; }); } - void finish() + void finish(ResourceCost real_cost_, ResourceLink link_) { // lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread chassert(state == Dequeued); state = Finished; + if (estimated_cost != real_cost_) + link_.queue->adjustBudget(estimated_cost, real_cost_); ResourceRequest::finish(); + ProfileEvents::increment(metrics->requests); + ProfileEvents::increment(metrics->cost, real_cost_); } - static Request & local() + void assertFinished() + { + // lock(mutex) is not required because `Finished` request cannot be used by the scheduler thread + chassert(state == Finished); + } + + static Request & local(const Metrics * metrics) { // Since single thread cannot use more than one resource request simultaneously, // we can reuse thread-local request to avoid allocations static thread_local Request instance; + instance.metrics = metrics; return instance; } + const Metrics * metrics = nullptr; // Must be initialized before use + private: + ResourceCost estimated_cost = 0; // Stores initial `cost` value in case budget was used to modify it std::mutex mutex; std::condition_variable dequeued_cv; RequestState state = Finished; }; - /// Creates pending request for resource; blocks while resource is not available (unless `PostponeLocking`) - explicit ResourceGuard(ResourceLink link_, ResourceCost cost = 1, ResourceGuardCtor ctor = LockStraightAway) + /// Creates pending request for resource; blocks while resource is not available (unless 
`Lock::Defer`) + explicit ResourceGuard(const Metrics * metrics, ResourceLink link_, ResourceCost cost = 1, ResourceGuard::Lock type = ResourceGuard::Lock::Default) : link(link_) - , request(Request::local()) + , request(Request::local(metrics)) { if (cost == 0) - link.queue = nullptr; // Ignore zero-cost requests - else if (link.queue) + link.reset(); // Ignore zero-cost requests + else if (link) { request.enqueue(cost, link); - if (ctor == LockStraightAway) + if (type == Lock::Default) request.wait(); } } @@ -112,22 +176,29 @@ public: /// Blocks until resource is available void lock() { - if (link.queue) + if (link) request.wait(); } - /// Report resource consumption has finished - void unlock() + void consume(ResourceCost cost) { - if (link.queue) + real_cost += cost; + } + + /// Report resource consumption has finished + void unlock(ResourceCost consumed = 0) + { + consume(consumed); + if (link) { - request.finish(); - link.queue = nullptr; + request.finish(real_cost, link); + link.reset(); } } ResourceLink link; Request & request; + ResourceCost real_cost = 0; }; } diff --git a/src/Common/Scheduler/ResourceLink.h b/src/Common/Scheduler/ResourceLink.h index 450d9bc1efa..a4e2adbd963 100644 --- a/src/Common/Scheduler/ResourceLink.h +++ b/src/Common/Scheduler/ResourceLink.h @@ -13,13 +13,28 @@ using ResourceCost = Int64; struct ResourceLink { ISchedulerQueue * queue = nullptr; + bool operator==(const ResourceLink &) const = default; + explicit operator bool() const { return queue != nullptr; } - void adjust(ResourceCost estimated_cost, ResourceCost real_cost) const; + void reset() + { + queue = nullptr; + } +}; - void consumed(ResourceCost cost) const; +/* + * Everything required for IO scheduling. + * Note that raw pointer are stored inside, so make sure that `ClassifierPtr` that produced + * resource links will outlive them. Usually classifier is stored in query `Context`. + */ +struct IOSchedulingSettings +{ + ResourceLink read_resource_link; + ResourceLink write_resource_link; - void accumulate(ResourceCost cost) const; + bool operator==(const IOSchedulingSettings &) const = default; + explicit operator bool() const { return read_resource_link && write_resource_link; } }; } diff --git a/src/Common/Scheduler/ResourceRequest.h b/src/Common/Scheduler/ResourceRequest.h index d64f624cec5..7b6a5af0fe6 100644 --- a/src/Common/Scheduler/ResourceRequest.h +++ b/src/Common/Scheduler/ResourceRequest.h @@ -45,7 +45,7 @@ constexpr ResourceCost ResourceCostMax = std::numeric_limits::max(); class ResourceRequest : public boost::intrusive::list_base_hook<> { public: - /// Cost of request execution; should be filled before request enqueueing. + /// Cost of request execution; should be filled before request enqueueing and remain constant until `finish()`. 
/// NOTE: If cost is not known in advance, ResourceBudget should be used (note that every ISchedulerQueue has it) ResourceCost cost; diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index 0c02ab8fdb0..6ea0a9a848c 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -7,11 +7,11 @@ #include #include #include +#include #include #include -#include #include #include #include @@ -188,6 +188,10 @@ public: Progress progress_in; Progress progress_out; + /// IO scheduling + ResourceLink read_resource_link; + ResourceLink write_resource_link; + private: /// Group of threads, to which this thread attached ThreadGroupPtr thread_group; diff --git a/src/Core/SettingsFields.cpp b/src/Core/SettingsFields.cpp index 278b1101c71..930eedb8d70 100644 --- a/src/Core/SettingsFields.cpp +++ b/src/Core/SettingsFields.cpp @@ -210,7 +210,7 @@ namespace { UInt64 stringToMaxThreads(const String & str) { - if (startsWith(str, "auto")) + if (startsWith(str, "auto") || startsWith(str, "'auto")) return 0; return parseFromString(str); } @@ -237,7 +237,8 @@ SettingFieldMaxThreads & SettingFieldMaxThreads::operator=(const Field & f) String SettingFieldMaxThreads::toString() const { if (is_auto) - return "auto(" + ::DB::toString(value) + ")"; + /// Removing quotes here will introduce an incompatibility between replicas with different versions. + return "'auto(" + ::DB::toString(value) + ")'"; else return ::DB::toString(value); } diff --git a/src/DataTypes/DataTypeObject.cpp b/src/DataTypes/DataTypeObject.cpp index bb0bb928b4f..d8cbfda4afe 100644 --- a/src/DataTypes/DataTypeObject.cpp +++ b/src/DataTypes/DataTypeObject.cpp @@ -519,10 +519,10 @@ static DataTypePtr createJSON(const ASTPtr & arguments) if (!context) context = Context::getGlobalContextInstance(); - if (context->getSettingsRef().use_json_alias_for_old_object_type) + if (context->getSettingsRef().allow_experimental_object_type && context->getSettingsRef().use_json_alias_for_old_object_type) { if (arguments && !arguments->children.empty()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Experimental Object type doesn't support any arguments. If you want to use new JSON type, set setting allow_experimental_json_type = 1"); + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Experimental Object type doesn't support any arguments. 
If you want to use new JSON type, set settings allow_experimental_json_type = 1 and use_json_alias_for_old_object_type = 0"); return std::make_shared("JSON", false); } diff --git a/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp b/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp index ba864035777..dc547c5a8e8 100644 --- a/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp +++ b/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -113,7 +114,9 @@ bool ReadBufferFromAzureBlobStorage::nextImpl() { try { + ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, to_read_bytes); bytes_read = data_stream->ReadToCount(reinterpret_cast(data_ptr), to_read_bytes); + rlock.unlock(bytes_read); // Do not hold resource under bandwidth throttler if (read_settings.remote_throttler) read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds); break; diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp index 60fa2997c50..29d3cc8ebd2 100644 --- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp +++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp @@ -101,15 +101,13 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function func, { try { - ResourceGuard rlock(write_settings.resource_link, cost); // Note that zero-cost requests are ignored + ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, cost); // Note that zero-cost requests are ignored func(); + rlock.unlock(cost); break; } catch (const Azure::Core::RequestFailedException & e) { - if (cost) - write_settings.resource_link.accumulate(cost); // Accumulate resource for later use, because we have failed to consume it - if (i == num_tries - 1 || !isRetryableAzureException(e)) throw; @@ -117,8 +115,6 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function func, } catch (...) 
{ - if (cost) - write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure throw; } } diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.cpp b/src/Disks/ObjectStorages/DiskObjectStorage.cpp index 4de6d78e952..07e2edac129 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorage.cpp @@ -461,14 +461,17 @@ DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage() } template -static inline Settings updateResourceLink(const Settings & settings, const String & resource_name) +static inline Settings updateIOSchedulingSettings(const Settings & settings, const String & read_resource_name, const String & write_resource_name) { - if (resource_name.empty()) + if (read_resource_name.empty() && write_resource_name.empty()) return settings; if (auto query_context = CurrentThread::getQueryContext()) { Settings result(settings); - result.resource_link = query_context->getWorkloadClassifier()->get(resource_name); + if (!read_resource_name.empty()) + result.io_scheduling.read_resource_link = query_context->getWorkloadClassifier()->get(read_resource_name); + if (!write_resource_name.empty()) + result.io_scheduling.write_resource_link = query_context->getWorkloadClassifier()->get(write_resource_name); return result; } return settings; @@ -500,7 +503,7 @@ std::unique_ptr DiskObjectStorage::readFile( return object_storage->readObjects( storage_objects, - updateResourceLink(settings, getReadResourceName()), + updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName()), read_hint, file_size); } @@ -513,7 +516,7 @@ std::unique_ptr DiskObjectStorage::writeFile( { LOG_TEST(log, "Write file: {}", path); - WriteSettings write_settings = updateResourceLink(settings, getWriteResourceName()); + WriteSettings write_settings = updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName()); auto transaction = createObjectStorageTransaction(); return transaction->writeFile(path, buf_size, mode, write_settings); } diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index 6972bae64b4..a6176723497 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -6,7 +6,6 @@ #include #include -#include #include #include @@ -423,22 +422,13 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si ProfileEventTimeIncrement watch(ProfileEvents::ReadBufferFromS3InitMicroseconds); // We do not know in advance how many bytes we are going to consume, to avoid blocking estimated it from below - constexpr ResourceCost estimated_cost = 1; - ResourceGuard rlock(read_settings.resource_link, estimated_cost); - + CurrentThread::IOScope io_scope(read_settings.io_scheduling); Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req); - rlock.unlock(); - if (outcome.IsSuccess()) - { - ResourceCost bytes_read = outcome.GetResult().GetContentLength(); - read_settings.resource_link.adjust(estimated_cost, bytes_read); return outcome.GetResultWithOwnership(); - } else { - read_settings.resource_link.accumulate(estimated_cost); const auto & error = outcome.GetError(); throw S3Exception(error.GetMessage(), error.GetErrorType()); } diff --git a/src/IO/ReadSettings.h b/src/IO/ReadSettings.h index e73a9054928..7c22682dc76 100644 --- a/src/IO/ReadSettings.h +++ b/src/IO/ReadSettings.h @@ -118,8 +118,7 @@ struct ReadSettings ThrottlerPtr remote_throttler; ThrottlerPtr local_throttler; - // Resource to be used during reading - ResourceLink 
resource_link; + IOSchedulingSettings io_scheduling; size_t http_max_tries = 10; size_t http_retry_initial_backoff_ms = 100; diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index e702b4d35ad..7a978e951a7 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -11,7 +11,6 @@ #include #include -#include #include #include #include @@ -558,12 +557,11 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data) auto & request = std::get<0>(*worker_data); - ResourceCost cost = request.GetContentLength(); - ResourceGuard rlock(write_settings.resource_link, cost); + CurrentThread::IOScope io_scope(write_settings.io_scheduling); + Stopwatch watch; auto outcome = client_ptr->UploadPart(request); watch.stop(); - rlock.unlock(); // Avoid acquiring other locks under resource lock ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds()); @@ -577,7 +575,6 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data) if (!outcome.IsSuccess()) { ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1); - write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType()); } @@ -715,12 +712,11 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data if (client_ptr->isClientForDisk()) ProfileEvents::increment(ProfileEvents::DiskS3PutObject); - ResourceCost cost = request.GetContentLength(); - ResourceGuard rlock(write_settings.resource_link, cost); + CurrentThread::IOScope io_scope(write_settings.io_scheduling); + Stopwatch watch; auto outcome = client_ptr->PutObject(request); watch.stop(); - rlock.unlock(); ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds()); if (blob_log) @@ -734,7 +730,6 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data } ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1); - write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) { diff --git a/src/IO/WriteSettings.h b/src/IO/WriteSettings.h index 84bb25439b5..cdc75e8c0e9 100644 --- a/src/IO/WriteSettings.h +++ b/src/IO/WriteSettings.h @@ -13,8 +13,7 @@ struct WriteSettings ThrottlerPtr remote_throttler; ThrottlerPtr local_throttler; - // Resource to be used during reading - ResourceLink resource_link; + IOSchedulingSettings io_scheduling; /// Filesystem cache settings bool enable_filesystem_cache_on_write_operations = false; diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 80cb0510b35..e9f40bdbaf5 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -821,6 +821,19 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti { properties.indices = as_storage_metadata->getSecondaryIndices(); properties.projections = as_storage_metadata->getProjections().clone(); + + /// CREATE TABLE AS should copy PRIMARY KEY, ORDER BY, and similar clauses. 
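        /// Illustrative example (hypothetical tables): given
        ///     CREATE TABLE t1 (k UInt64, d Date, v String) ENGINE = MergeTree ORDER BY k PARTITION BY toYYYYMM(d)
        /// a subsequent `CREATE TABLE t2 ENGINE = MergeTree AS t1` that does not restate the keys now inherits
        /// the ORDER BY and PARTITION BY (and likewise PRIMARY KEY / SAMPLE BY when defined on the source)
        /// via the clauses copied below.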
+ if (!create.storage->primary_key && as_storage_metadata->isPrimaryKeyDefined() && as_storage_metadata->hasPrimaryKey()) + create.storage->set(create.storage->primary_key, as_storage_metadata->getPrimaryKeyAST()->clone()); + + if (!create.storage->partition_by && as_storage_metadata->isPartitionKeyDefined() && as_storage_metadata->hasPartitionKey()) + create.storage->set(create.storage->partition_by, as_storage_metadata->getPartitionKeyAST()->clone()); + + if (!create.storage->order_by && as_storage_metadata->isSortingKeyDefined() && as_storage_metadata->hasSortingKey()) + create.storage->set(create.storage->order_by, as_storage_metadata->getSortingKeyAST()->clone()); + + if (!create.storage->sample_by && as_storage_metadata->isSamplingKeyDefined() && as_storage_metadata->hasSamplingKey()) + create.storage->set(create.storage->sample_by, as_storage_metadata->getSamplingKeyAST()->clone()); } else { diff --git a/src/Planner/PlannerJoins.cpp b/src/Planner/PlannerJoins.cpp index 5acff9dac82..42aeb2b9b1d 100644 --- a/src/Planner/PlannerJoins.cpp +++ b/src/Planner/PlannerJoins.cpp @@ -494,6 +494,12 @@ JoinClausesAndActions buildJoinClausesAndActions( necessary_names.push_back(name); }; + bool is_join_with_special_storage = false; + if (const auto * right_table_node = join_node.getRightTableExpression()->as()) + { + is_join_with_special_storage = dynamic_cast(right_table_node->getStorage().get()); + } + for (auto & join_clause : result.join_clauses) { const auto & left_filter_condition_nodes = join_clause.getLeftFilterConditionNodes(); @@ -561,7 +567,7 @@ JoinClausesAndActions buildJoinClausesAndActions( if (!left_key_node->result_type->equals(*common_type)) left_key_node = &left_join_actions.addCast(*left_key_node, common_type, {}); - if (!right_key_node->result_type->equals(*common_type)) + if (!is_join_with_special_storage && !right_key_node->result_type->equals(*common_type)) right_key_node = &right_join_actions.addCast(*right_key_node, common_type, {}); } diff --git a/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp b/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp index bf6f9db722c..7498e949073 100644 --- a/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp +++ b/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp @@ -119,27 +119,16 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory(num_bytes_to_read)); - } - catch (...) - { - read_settings.resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure - throw; - } - rlock.unlock(); + ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, num_bytes_to_read); + int bytes_read = hdfsRead(fs.get(), fin, internal_buffer.begin(), safe_cast(num_bytes_to_read)); + rlock.unlock(std::max(0, bytes_read)); if (bytes_read < 0) { - read_settings.resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure throw Exception(ErrorCodes::NETWORK_ERROR, "Fail to read from HDFS: {}, file path: {}. 
Error: {}", hdfs_uri, hdfs_file_path, std::string(hdfsGetLastError())); } - read_settings.resource_link.adjust(num_bytes_to_read, bytes_read); if (bytes_read) { diff --git a/src/Storages/ObjectStorage/HDFS/WriteBufferFromHDFS.cpp b/src/Storages/ObjectStorage/HDFS/WriteBufferFromHDFS.cpp index e2e7f238a5e..4f6f8c782f2 100644 --- a/src/Storages/ObjectStorage/HDFS/WriteBufferFromHDFS.cpp +++ b/src/Storages/ObjectStorage/HDFS/WriteBufferFromHDFS.cpp @@ -66,25 +66,12 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl int write(const char * start, size_t size) { - ResourceGuard rlock(write_settings.resource_link, size); - int bytes_written; - try - { - bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast<int>(size)); - } - catch (...) - { - write_settings.resource_link.accumulate(size); // We assume no resource was used in case of failure - throw; - } - rlock.unlock(); + ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, size); + int bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast<int>(size)); + rlock.unlock(std::max(0, bytes_written)); if (bytes_written < 0) - { - write_settings.resource_link.accumulate(size); // We assume no resource was used in case of failure throw Exception(ErrorCodes::NETWORK_ERROR, "Fail to write HDFS file: {} {}", hdfs_uri, std::string(hdfsGetLastError())); - } - write_settings.resource_link.adjust(size, bytes_written); if (write_settings.remote_throttler) write_settings.remote_throttler->add(bytes_written, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ff8e362aa36..82a59d9b5b6 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -221,14 +221,17 @@ void StorageReplicatedMergeTree::setZooKeeper() /// strange effects. So we always use only one session for all tables. /// (excluding auxiliary zookeepers) - std::lock_guard lock(current_zookeeper_mutex); if (zookeeper_name == default_zookeeper_name) { - current_zookeeper = getContext()->getZooKeeper(); + auto new_keeper = getContext()->getZooKeeper(); + std::lock_guard lock(current_zookeeper_mutex); + current_zookeeper = new_keeper; } else { - current_zookeeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name); + auto new_keeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name); + std::lock_guard lock(current_zookeeper_mutex); + current_zookeeper = new_keeper; } } @@ -365,7 +368,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_name); if (has_zookeeper) { - /// It's possible for getZooKeeper() to timeout if zookeeper host(s) can't + /// It's possible for getZooKeeper() to timeout if zookeeper host(s) can't /// be reached. In such cases Poco::Exception is thrown after a connection /// timeout - refer to src/Common/ZooKeeper/ZooKeeperImpl.cpp:866 for more info.
/// diff --git a/tests/ci/changelog.py b/tests/ci/changelog.py index 554ba339892..8e7900de353 100755 --- a/tests/ci/changelog.py +++ b/tests/ci/changelog.py @@ -288,7 +288,7 @@ def generate_description(item: PullRequest, repo: Repository) -> Optional[Descri # Normalize bug fixes if ( re.match( - r".*(?i)bug\Wfix", + r"(?i).*bug\Wfix", category, ) # Map "Critical Bug Fix" to "Bug fix" category for changelog diff --git a/tests/integration/test_cgroup_limit/test.py b/tests/integration/test_cgroup_limit/test.py index 5d56135d9ff..e77b0f70960 100644 --- a/tests/integration/test_cgroup_limit/test.py +++ b/tests/integration/test_cgroup_limit/test.py @@ -46,7 +46,7 @@ def test_cgroup_cpu_limit(): "clickhouse local -q \"select value from system.settings where name='max_threads'\"", num_cpus, ) - expect_output = (r"auto({})".format(math.ceil(num_cpus))).encode() + expect_output = (r"\'auto({})\'".format(math.ceil(num_cpus))).encode() assert ( result.strip() == expect_output ), f"fail for cpu limit={num_cpus}, result={result.strip()}, expect={expect_output}" diff --git a/tests/integration/test_inserts_with_keeper_retries/configs/storage_conf.xml b/tests/integration/test_inserts_with_keeper_retries/configs/storage_conf.xml new file mode 100644 index 00000000000..19ba29de163 --- /dev/null +++ b/tests/integration/test_inserts_with_keeper_retries/configs/storage_conf.xml @@ -0,0 +1,8 @@ + + + + + test + + + diff --git a/tests/integration/test_inserts_with_keeper_retries/test.py b/tests/integration/test_inserts_with_keeper_retries/test.py index 3937823a37b..3294d4ffa02 100644 --- a/tests/integration/test_inserts_with_keeper_retries/test.py +++ b/tests/integration/test_inserts_with_keeper_retries/test.py @@ -3,6 +3,7 @@ import pytest import time import threading +import uuid from helpers.cluster import ClickHouseCluster from multiprocessing.dummy import Pool from helpers.network import PartitionManager @@ -10,8 +11,12 @@ from helpers.client import QueryRuntimeException from helpers.test_tools import assert_eq_with_retry cluster = ClickHouseCluster(__file__) - -node1 = cluster.add_instance("node1", with_zookeeper=True) +node1 = cluster.add_instance( + "node1", + main_configs=["configs/storage_conf.xml"], + with_zookeeper=True, + with_minio=True, +) @pytest.fixture(scope="module") @@ -25,10 +30,16 @@ def started_cluster(): cluster.shutdown() -def test_replica_inserts_with_keeper_restart(started_cluster): +@pytest.mark.parametrize( + "engine,storage_policy", + [ + ("ReplicatedMergeTree", "default"), + ], +) +def test_replica_inserts_with_keeper_restart(started_cluster, engine, storage_policy): try: node1.query( - "CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '0') ORDER BY tuple()" + f"CREATE TABLE r (a UInt64, b String) ENGINE={engine}('/test/r', '0') ORDER BY tuple() SETTINGS storage_policy='{storage_policy}'" ) p = Pool(1) @@ -60,10 +71,18 @@ def test_replica_inserts_with_keeper_restart(started_cluster): node1.query("DROP TABLE IF EXISTS r SYNC") -def test_replica_inserts_with_keeper_disconnect(started_cluster): +@pytest.mark.parametrize( + "engine,storage_policy", + [ + ("ReplicatedMergeTree", "default"), + ], +) +def test_replica_inserts_with_keeper_disconnect( + started_cluster, engine, storage_policy +): try: node1.query( - "CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '0') ORDER BY tuple()" + f"CREATE TABLE r2 (a UInt64, b String) ENGINE={engine}('/test/r2', '0') ORDER BY tuple() SETTINGS storage_policy='{storage_policy}'" ) p = Pool(1) @@ 
-84,26 +103,32 @@ def test_replica_inserts_with_keeper_disconnect(started_cluster): disconnect_event.wait(90) node1.query( - "INSERT INTO r SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=20" + "INSERT INTO r2 SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=20" ) node1.query( - "INSERT INTO r SELECT number, toString(number) FROM numbers(10, 10) SETTINGS insert_keeper_max_retries=20" + "INSERT INTO r2 SELECT number, toString(number) FROM numbers(10, 10) SETTINGS insert_keeper_max_retries=20" ) job.wait() p.close() p.join() - assert node1.query("SELECT COUNT() FROM r") == "20\n" + assert node1.query("SELECT COUNT() FROM r2") == "20\n" finally: - node1.query("DROP TABLE IF EXISTS r SYNC") + node1.query("DROP TABLE IF EXISTS r2 SYNC") -def test_query_timeout_with_zk_down(started_cluster): +@pytest.mark.parametrize( + "engine,storage_policy", + [ + ("ReplicatedMergeTree", "default"), + ], +) +def test_query_timeout_with_zk_down(started_cluster, engine, storage_policy): try: node1.query( - "CREATE TABLE zk_down (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/zk_down', '0') ORDER BY tuple()" + f"CREATE TABLE zk_down (a UInt64, b String) ENGINE={engine}('/test/zk_down', '0') ORDER BY tuple() SETTINGS storage_policy='{storage_policy}'" ) cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"]) @@ -118,3 +143,45 @@ def test_query_timeout_with_zk_down(started_cluster): finally: cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"]) node1.query("DROP TABLE IF EXISTS zk_down SYNC") + + +@pytest.mark.parametrize( + "engine,storage_policy", + [ + ("ReplicatedMergeTree", "default"), + ], +) +def test_retries_should_not_wait_for_global_connection( + started_cluster, engine, storage_policy +): + pm = PartitionManager() + try: + node1.query( + f"CREATE TABLE zk_down_retries (a UInt64, b String) ENGINE={engine}('/test/zk_down', '0') ORDER BY tuple() SETTINGS storage_policy='{storage_policy}'" + ) + + cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"]) + # Apart from stopping keepers, we introduce a network delay to make connection retries slower + # We want to check that retries are not blocked during that time + pm.add_network_delay(node1, 1000) + + query_id = uuid.uuid4() + + with pytest.raises(QueryRuntimeException): + node1.query( + "INSERT INTO zk_down_retries SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=10, insert_keeper_retry_max_backoff_ms=100", + query_id=str(query_id), + ) + pm.heal_all() + # Use query_log for execution time since we want to ignore the network delay introduced (also in client) + node1.query("SYSTEM FLUSH LOGS") + res = node1.query( + f"SELECT query_duration_ms FROM system.query_log WHERE type != 'QueryStart' AND query_id = '{query_id}'" + ) + query_duration = int(res) + # It should be around 1 second. 5 seconds is being generous (debug and so on). 
Used to take 35 seconds without the fix + assert query_duration < 5000 + finally: + pm.heal_all() + cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"]) + node1.query("DROP TABLE IF EXISTS zk_down_retries SYNC") diff --git a/tests/integration/test_scheduler/test.py b/tests/integration/test_scheduler/test.py index cde75c244e8..31cc106a95d 100644 --- a/tests/integration/test_scheduler/test.py +++ b/tests/integration/test_scheduler/test.py @@ -69,6 +69,124 @@ def update_workloads_config(**settings): node.query("system reload config") + +def check_profile_event_for_query(workload, profile_event, amount=1): + node.query("system flush logs") + query_pattern = f"workload='{workload}'".replace("'", "\\'") + assert ( + int( + node.query( + f"select ProfileEvents['{profile_event}'] from system.query_log where query ilike '%{query_pattern}%' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1" + ) + ) + == amount + ) + + +def test_s3_resource_request_granularity(): + node.query( + f""" + drop table if exists data; + create table data (key UInt64 CODEC(NONE), value String CODEC(NONE)) engine=MergeTree() order by key settings min_bytes_for_wide_part=1e9, storage_policy='s3'; + """ + ) + + total_bytes = 50000000 # Approximate data size + max_bytes_per_request = 2000000 # Should be ~1MB or less in general + min_bytes_per_request = 6000 # Small requests are ok, but we don't want to hurt performance with overly frequent resource requests + + writes_before = int( + node.query( + f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/admin'" + ).strip() + ) + write_bytes_before = int( + node.query( + f"select dequeued_cost from system.scheduler where resource='network_write' and path='/prio/admin'" + ).strip() + ) + write_budget_before = int( + node.query( + f"select budget from system.scheduler where resource='network_write' and path='/prio/admin'" + ).strip() + ) + node.query( + f"insert into data select number, randomString(10000000) from numbers(5) SETTINGS workload='admin'" + ) + writes_after = int( + node.query( + f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/admin'" + ).strip() + ) + write_bytes_after = int( + node.query( + f"select dequeued_cost from system.scheduler where resource='network_write' and path='/prio/admin'" + ).strip() + ) + write_budget_after = int( + node.query( + f"select budget from system.scheduler where resource='network_write' and path='/prio/admin'" + ).strip() + ) + + write_requests = writes_after - writes_before + write_bytes = (write_bytes_after - write_bytes_before) - ( + write_budget_after - write_budget_before + ) + assert write_bytes > 1.0 * total_bytes + assert write_bytes < 1.05 * total_bytes + assert write_bytes / write_requests < max_bytes_per_request + assert write_bytes / write_requests > min_bytes_per_request + check_profile_event_for_query("admin", "SchedulerIOWriteRequests", write_requests) + check_profile_event_for_query("admin", "SchedulerIOWriteBytes", write_bytes) + + node.query(f"optimize table data final") + + reads_before = int( + node.query( + f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/admin'" + ).strip() + ) + read_bytes_before = int( + node.query( + f"select dequeued_cost from system.scheduler where resource='network_read' and path='/prio/admin'" + ).strip() + ) + read_budget_before = int( + node.query( + f"select budget from system.scheduler where resource='network_read' and path='/prio/admin'"
+ ).strip() + ) + node.query( + f"select count() from data where not ignore(*) SETTINGS workload='admin'" + ) + reads_after = int( + node.query( + f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/admin'" + ).strip() + ) + read_bytes_after = int( + node.query( + f"select dequeued_cost from system.scheduler where resource='network_read' and path='/prio/admin'" + ).strip() + ) + read_budget_after = int( + node.query( + f"select budget from system.scheduler where resource='network_read' and path='/prio/admin'" + ).strip() + ) + + read_bytes = (read_bytes_after - read_bytes_before) - ( + read_budget_after - read_budget_before + ) + read_requests = reads_after - reads_before + assert read_bytes > 1.0 * total_bytes + assert read_bytes < 1.05 * total_bytes + assert read_bytes / read_requests < max_bytes_per_request + assert read_bytes / read_requests > min_bytes_per_request + check_profile_event_for_query("admin", "SchedulerIOReadRequests", read_requests) + check_profile_event_for_query("admin", "SchedulerIOReadBytes", read_bytes) + + def test_s3_disk(): node.query( f""" diff --git a/tests/queries/0_stateless/01056_create_table_as_with_sorting_clauses.reference b/tests/queries/0_stateless/01056_create_table_as_with_sorting_clauses.reference new file mode 100644 index 00000000000..cebb99f005e --- /dev/null +++ b/tests/queries/0_stateless/01056_create_table_as_with_sorting_clauses.reference @@ -0,0 +1,70 @@ +-------------- Test copy sorting clauses from source table -------------- +CREATE TABLE default.x +( + `CounterID` UInt32, + `EventDate` Date, + `UserID` UInt64 +) +ENGINE = MergeTree +PARTITION BY toYYYYMM(EventDate) +ORDER BY (CounterID, EventDate, intHash32(UserID)) +SAMPLE BY intHash32(UserID) +SETTINGS index_granularity = 8192 +------------------------------------------------------------------------- +CREATE TABLE default.x_as +( + `CounterID` UInt32, + `EventDate` Date, + `UserID` UInt64 +) +ENGINE = MergeTree +PARTITION BY toYYYYMM(EventDate) +ORDER BY (CounterID, EventDate, intHash32(UserID)) +SAMPLE BY intHash32(UserID) +SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1, index_granularity = 8192 +-------------- Test copy sorting clauses from destination table (source table without the same type clauses) -------------- +CREATE TABLE default.x +( + `CounterID` UInt32, + `EventDate` Date, + `UserID` UInt64 +) +ENGINE = MergeTree +PRIMARY KEY (CounterID, EventDate, intHash32(UserID)) +ORDER BY (CounterID, EventDate, intHash32(UserID)) +SETTINGS index_granularity = 8192 +------------------------------------------------------------------------- +CREATE TABLE default.x_as +( + `CounterID` UInt32, + `EventDate` Date, + `UserID` UInt64 +) +ENGINE = MergeTree +PARTITION BY toYYYYMM(EventDate) +PRIMARY KEY (CounterID, EventDate, intHash32(UserID)) +ORDER BY (CounterID, EventDate, intHash32(UserID)) +SAMPLE BY intHash32(UserID) +SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1, index_granularity = 8192 +-------------- Test copy sorting clauses from destination table (source table with the same type clauses) -------------- +CREATE TABLE default.x +( + `CounterID` UInt32, + `EventDate` Date, + `UserID` UInt64 +) +ENGINE = MergeTree +ORDER BY CounterID +SETTINGS index_granularity = 8192 +------------------------------------------------------------------------- +CREATE TABLE default.x_as +( + `CounterID` UInt32, + `EventDate` Date, + `UserID` UInt64 +) +ENGINE = MergeTree +PARTITION BY toYYYYMM(EventDate) +ORDER 
BY (CounterID, EventDate, intHash32(UserID)) +SAMPLE BY intHash32(UserID) +SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1, index_granularity = 8192 diff --git a/tests/queries/0_stateless/01056_create_table_as_with_sorting_clauses.sql b/tests/queries/0_stateless/01056_create_table_as_with_sorting_clauses.sql new file mode 100644 index 00000000000..96c2df54491 --- /dev/null +++ b/tests/queries/0_stateless/01056_create_table_as_with_sorting_clauses.sql @@ -0,0 +1,37 @@ +DROP TABLE IF EXISTS x; +DROP TABLE IF EXISTS x_as; + +SELECT '-------------- Test copy sorting clauses from source table --------------'; +CREATE TABLE x (`CounterID` UInt32, `EventDate` Date, `UserID` UInt64) ENGINE = MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID); +CREATE TABLE x_as AS x ENGINE = MergeTree SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1; + +SHOW CREATE TABLE x FORMAT TSVRaw; +SELECT '-------------------------------------------------------------------------'; +SHOW CREATE TABLE x_as FORMAT TSVRaw; + +DROP TABLE x; +DROP TABLE x_as; + +SELECT '-------------- Test copy sorting clauses from destination table (source table without the same type clauses) --------------'; +CREATE TABLE x (`CounterID` UInt32, `EventDate` Date, `UserID` UInt64) ENGINE = MergeTree PRIMARY KEY (CounterID, EventDate, intHash32(UserID)); +CREATE TABLE x_as AS x ENGINE = MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1; + +SHOW CREATE TABLE x FORMAT TSVRaw; +SELECT '-------------------------------------------------------------------------'; +SHOW CREATE TABLE x_as FORMAT TSVRaw; + +DROP TABLE x; +DROP TABLE x_as; + +SELECT '-------------- Test copy sorting clauses from destination table (source table with the same type clauses) --------------'; +CREATE TABLE x (`CounterID` UInt32, `EventDate` Date, `UserID` UInt64) ENGINE = MergeTree ORDER BY (CounterID); +CREATE TABLE x_as AS x ENGINE = MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1; + +SHOW CREATE TABLE x FORMAT TSVRaw; +SELECT '-------------------------------------------------------------------------'; +SHOW CREATE TABLE x_as FORMAT TSVRaw; + +DROP TABLE x; +DROP TABLE x_as; + + diff --git a/tests/queries/0_stateless/03208_multiple_joins_with_storage_join.reference b/tests/queries/0_stateless/03208_multiple_joins_with_storage_join.reference new file mode 100644 index 00000000000..5ebdaa1c81f --- /dev/null +++ b/tests/queries/0_stateless/03208_multiple_joins_with_storage_join.reference @@ -0,0 +1,23 @@ +----- +1 1 1 a 1 A 1 A +2 2 2 b 2 B 2 B +----- +\N \N \N 0 3 B +1 1 1 a 1 A 1 A +2 2 2 b 2 B 2 B +----- +1 1 1 a 1 A 1 A +2 2 2 b 2 B 2 B +\N \N \N \N 3 B \N \N +\N \N \N \N \N \N 3 B +----- +\N \N \N 3 3 B 0 0 +\N \N \N 0 0 3 3 B +1 1 1 a 1 1 A 1 1 A +2 2 2 b 2 2 B 2 2 B +----- +3 3 \N B B +1 1 1 a A A +2 2 2 b B B +----- +7 diff --git a/tests/queries/0_stateless/03208_multiple_joins_with_storage_join.sql b/tests/queries/0_stateless/03208_multiple_joins_with_storage_join.sql new file mode 100644 index 00000000000..83be4c3f1d2 --- /dev/null +++ b/tests/queries/0_stateless/03208_multiple_joins_with_storage_join.sql @@ -0,0 +1,84 @@ +#!/usr/bin/env -S ${HOME}/clickhouse-client 
--queries-file + +DROP TABLE IF EXISTS tab; +CREATE TABLE tab ( `k` Nullable(UInt32), `k1` Nullable(UInt32), `k2` Nullable(UInt32), `v` String ) ENGINE = Memory; +INSERT INTO tab VALUES (1, 1, 1, 'a'), (2, 2, 2, 'b'); + +DROP TABLE IF EXISTS mem; +CREATE TABLE mem ( `k` UInt64, `v` String ) ENGINE = Join(ANY, LEFT, k); +INSERT INTO mem VALUES (1, 'A'), (2, 'B'), (3, 'B'); + +DROP TABLE IF EXISTS mem2; +CREATE TABLE mem2 ( `k` UInt64, `v` String ) ENGINE = Join(ANY, RIGHT, k); +INSERT INTO mem2 VALUES (1, 'A'), (2, 'B'), (3, 'B'); + +DROP TABLE IF EXISTS mem3; +CREATE TABLE mem3 ( `k` UInt64, `v` String ) ENGINE = Join(ALL, FULL, k) SETTINGS join_use_nulls = 1; +INSERT INTO mem3 VALUES (1, 'A'), (2, 'B'), (3, 'B'); + +DROP TABLE IF EXISTS mem4; +CREATE TABLE mem4 ( `k1` UInt64, `k2` UInt64, `v` String ) ENGINE = Join(ALL, FULL, k1, k2); +INSERT INTO mem4 VALUES (1, 1, 'A'), (2, 2, 'B'), (3, 3, 'B'); + +SET allow_experimental_analyzer = 1; + +SELECT '-----'; + +SELECT * +FROM tab +ANY LEFT JOIN mem ON k1 = mem.k +ANY LEFT JOIN mem AS t ON k2 = t.k +ORDER BY tab.v +; + +SELECT '-----'; + +SELECT * +FROM tab +ANY LEFT JOIN mem ON k1 = mem.k +ANY RIGHT JOIN mem2 ON k2 = mem2.k +ORDER BY tab.v +; + +SELECT '-----'; + +SELECT * +FROM tab +FULL JOIN mem3 AS t1 ON k1 = t1.k +FULL JOIN mem3 AS t2 ON k2 = t2.k +ORDER BY tab.v +SETTINGS join_use_nulls = 1 +; +SELECT '-----'; + +SELECT * +FROM tab +FULL JOIN mem4 AS t1 ON tab.k1 = t1.k1 AND tab.k2 = t1.k2 +FULL JOIN mem4 AS t2 ON tab.k1 = t2.k1 AND tab.k2 = t2.k2 +ORDER BY tab.v +; + +SELECT '-----'; + +SELECT * +FROM tab +FULL JOIN mem4 AS t1 USING (k1, k2) +FULL JOIN mem4 AS t2 USING (k1, k2) +ORDER BY tab.v +; + +SELECT '-----'; + +SELECT count() FROM ( + EXPLAIN PLAN + SELECT * FROM tab + ANY LEFT JOIN mem AS t1 ON tab.k = t1.k + ANY LEFT JOIN mem AS t2 ON tab.k = t2.k + ANY LEFT JOIN mem AS t3 ON tab.k = t3.k + ANY LEFT JOIN mem AS t4 ON tab.k = t4.k + ANY RIGHT JOIN mem2 AS t5 ON tab.k = t5.k + ANY LEFT JOIN mem AS t6 ON tab.k = t6.k + ANY LEFT JOIN mem AS t7 ON tab.k = t7.k +) +WHERE explain like '%FilledJoin%' +; diff --git a/tests/queries/0_stateless/03230_json_alias_new_old_types.reference b/tests/queries/0_stateless/03230_json_alias_new_old_types.reference index f03e0117618..ad74944e726 100644 --- a/tests/queries/0_stateless/03230_json_alias_new_old_types.reference +++ b/tests/queries/0_stateless/03230_json_alias_new_old_types.reference @@ -1,2 +1,3 @@ {"a":"42"} JSON {"a":42} Object(\'json\') +{"a":"42"} JSON diff --git a/tests/queries/0_stateless/03230_json_alias_new_old_types.sql b/tests/queries/0_stateless/03230_json_alias_new_old_types.sql index 06d4790e0f9..96d4bda171d 100644 --- a/tests/queries/0_stateless/03230_json_alias_new_old_types.sql +++ b/tests/queries/0_stateless/03230_json_alias_new_old_types.sql @@ -6,3 +6,6 @@ set use_json_alias_for_old_object_type=1; select '{"a" : 42}'::JSON as json, toTypeName(json); select '{"a" : 42}'::JSON(max_dynamic_paths=100) as json, toTypeName(json); -- {serverError BAD_ARGUMENTS} +set allow_experimental_object_type = 0; +select materialize('{"a" : 42}')::JSON as json, toTypeName(json); + diff --git a/utils/list-versions/version_date.tsv b/utils/list-versions/version_date.tsv index d9674ed2366..bd3d2931656 100644 --- a/utils/list-versions/version_date.tsv +++ b/utils/list-versions/version_date.tsv @@ -4,10 +4,12 @@ v24.7.4.51-stable 2024-08-23 v24.7.3.42-stable 2024-08-08 v24.7.2.13-stable 2024-08-01 v24.7.1.2915-stable 2024-07-30 +v24.6.5.30-stable 2024-09-03 v24.6.4.42-stable 2024-08-23 
v24.6.3.95-stable 2024-08-06 v24.6.2.17-stable 2024-07-05 v24.6.1.4423-stable 2024-07-01 +v24.5.7.31-stable 2024-09-03 v24.5.6.45-stable 2024-08-23 v24.5.5.78-stable 2024-08-05 v24.5.4.49-stable 2024-07-01 @@ -18,6 +20,7 @@ v24.4.4.113-stable 2024-08-02 v24.4.3.25-stable 2024-06-14 v24.4.2.141-stable 2024-06-07 v24.4.1.2088-stable 2024-05-01 +v24.3.10.33-lts 2024-09-03 v24.3.9.5-lts 2024-08-22 v24.3.8.13-lts 2024-08-20 v24.3.7.30-lts 2024-08-14
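Note on the scheduler test added in tests/integration/test_scheduler/test.py above: request granularity is checked by sampling system.scheduler before and after a query and then subtracting the change in budget from the change in dequeued_cost, because budget that was granted speculatively but not consumed is not real traffic. Below is a minimal sketch of that accounting, assuming the same node fixture and data table as the test; the helper name scheduler_io_stats and the standalone usage are illustrative only, not part of the patch.

def scheduler_io_stats(node, resource, path="/prio/admin"):
    # Return (dequeued_requests, dequeued_cost, budget) for one scheduler queue node.
    row = node.query(
        f"select dequeued_requests, dequeued_cost, budget from system.scheduler "
        f"where resource='{resource}' and path='{path}'"
    ).split("\t")
    return int(row[0]), int(row[1]), int(row[2])

# Hypothetical usage: measure real bytes and requests issued by a single INSERT.
before = scheduler_io_stats(node, "network_write")
node.query(
    "insert into data select number, randomString(10000000) from numbers(5) SETTINGS workload='admin'"
)
after = scheduler_io_stats(node, "network_write")
write_requests = after[0] - before[0]
# Subtract the budget delta: granted-but-unconsumed budget must not count as transferred bytes.
write_bytes = (after[1] - before[1]) - (after[2] - before[2])
assert write_bytes / write_requests < 2000000  # each resource request should stay around 1MB or less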