Merge branch 'master' into pr-local-plan

Igor Nikonov 2024-09-03 13:31:36 +02:00 committed by GitHub
commit d0d2509c69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
46 changed files with 1006 additions and 192 deletions

View File

@@ -19,6 +19,8 @@
#include <ios>
+#include <memory>
+#include <functional>
#include "Poco/Any.h"
#include "Poco/Buffer.h"
#include "Poco/Exception.h"
@@ -33,6 +35,27 @@ namespace Net
{
+    class IHTTPSessionDataHooks
+        /// Interface to control the stream of data bytes being sent or received through a socket by HTTPSession.
+        /// It allows monitoring, throttling and scheduling of data streams with syscall granularity.
+    {
+    public:
+        virtual ~IHTTPSessionDataHooks() = default;
+
+        virtual void atStart(int bytes) = 0;
+            /// Called before sending/receiving `bytes` to/from the socket.
+
+        virtual void atFinish(int bytes) = 0;
+            /// Called when sending/receiving of `bytes` has finished successfully.
+
+        virtual void atFail() = 0;
+            /// If an error occurs during send/receive, `atFail()` is called instead of `atFinish()`.
+    };
+
+    using HTTPSessionDataHooksPtr = std::shared_ptr<IHTTPSessionDataHooks>;
+
    class Net_API HTTPSession
        /// HTTPSession implements basic HTTP session management
        /// for both HTTP clients and HTTP servers.
@@ -73,6 +96,12 @@ namespace Net
    Poco::Timespan getReceiveTimeout() const;
        /// Returns receive timeout for the HTTP session.

+    void setSendDataHooks(const HTTPSessionDataHooksPtr & sendDataHooks = {});
+        /// Sets data hooks that will be called on every send to the socket.
+
+    void setReceiveDataHooks(const HTTPSessionDataHooksPtr & receiveDataHooks = {});
+        /// Sets data hooks that will be called on every receive from the socket.
+
    bool connected() const;
        /// Returns true if the underlying socket is connected.
@@ -211,6 +240,10 @@ namespace Net
    Poco::Exception * _pException;
    Poco::Any _data;

+    // Data hooks
+    HTTPSessionDataHooksPtr _sendDataHooks;
+    HTTPSessionDataHooksPtr _receiveDataHooks;
+
    friend class HTTPStreamBuf;
    friend class HTTPHeaderStreamBuf;
    friend class HTTPFixedLengthStreamBuf;
@@ -246,6 +279,16 @@ namespace Net
    return _receiveTimeout;
}

+inline void HTTPSession::setSendDataHooks(const HTTPSessionDataHooksPtr & sendDataHooks)
+{
+    _sendDataHooks = sendDataHooks;
+}
+
+inline void HTTPSession::setReceiveDataHooks(const HTTPSessionDataHooksPtr & receiveDataHooks)
+{
+    _receiveDataHooks = receiveDataHooks;
+}
+
inline StreamSocket & HTTPSession::socket()
{
    return _socket;
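
The hook interface above is deliberately small, so a passive monitoring implementation takes only a few lines. A minimal sketch, assuming nothing beyond the interface in this header (the class name and counters are illustrative, not part of the patch):

    #include <atomic>
    #include <cstdint>
    #include "Poco/Net/HTTPSession.h"

    // Illustrative only: counts bytes moved through one HTTPSession.
    class CountingDataHooks : public Poco::Net::IHTTPSessionDataHooks
    {
    public:
        void atStart(int bytes) override { attempted += bytes; }    // about to be sent/received
        void atFinish(int bytes) override { transferred += bytes; } // actually sent/received
        void atFail() override { ++failures; }                      // the syscall failed

        std::atomic<int64_t> attempted{0};
        std::atomic<int64_t> transferred{0};
        std::atomic<int64_t> failures{0};
    };

A client would install it with `session.setSendDataHooks(std::make_shared<CountingDataHooks>())` and detach it later by calling `session.setSendDataHooks()` with the defaulted empty argument.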

View File

@@ -128,14 +128,14 @@ int HTTPSession::get()
{
    if (_pCurrent == _pEnd)
        refill();

    if (_pCurrent < _pEnd)
        return *_pCurrent++;
    else
        return std::char_traits<char>::eof();
}

int HTTPSession::peek()
{
    if (_pCurrent == _pEnd)
@@ -147,7 +147,7 @@ int HTTPSession::peek()
        return std::char_traits<char>::eof();
}

int HTTPSession::read(char* buffer, std::streamsize length)
{
    if (_pCurrent < _pEnd)
@@ -166,10 +166,17 @@ int HTTPSession::write(const char* buffer, std::streamsize length)
{
    try
    {
-        return _socket.sendBytes(buffer, (int) length);
+        if (_sendDataHooks)
+            _sendDataHooks->atStart((int) length);
+        int result = _socket.sendBytes(buffer, (int) length);
+        if (_sendDataHooks)
+            _sendDataHooks->atFinish(result);
+        return result;
    }
    catch (Poco::Exception& exc)
    {
+        if (_sendDataHooks)
+            _sendDataHooks->atFail();
        setException(exc);
        throw;
    }
@@ -180,10 +187,17 @@ int HTTPSession::receive(char* buffer, int length)
{
    try
    {
-        return _socket.receiveBytes(buffer, length);
+        if (_receiveDataHooks)
+            _receiveDataHooks->atStart(length);
+        int result = _socket.receiveBytes(buffer, length);
+        if (_receiveDataHooks)
+            _receiveDataHooks->atFinish(result);
+        return result;
    }
    catch (Poco::Exception& exc)
    {
+        if (_receiveDataHooks)
+            _receiveDataHooks->atFail();
        setException(exc);
        throw;
    }
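
Both paths keep the same invariant: `atStart()` fires exactly once before the syscall, and exactly one of `atFinish()`/`atFail()` follows. A hook may therefore keep per-operation state; a sketch under that assumption (the class and its members are illustrative):

    #include <chrono>
    #include "Poco/Net/HTTPSession.h"

    // Illustrative only: accumulates time spent in socket operations.
    class TimingDataHooks : public Poco::Net::IHTTPSessionDataHooks
    {
    public:
        using Clock = std::chrono::steady_clock;

        void atStart(int /*bytes*/) override { start = Clock::now(); }
        void atFinish(int /*bytes*/) override { total += Clock::now() - start; }
        void atFail() override { total += Clock::now() - start; } // failed operations count too

        Clock::time_point start;
        Clock::duration total{};
    };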

View File

@@ -63,7 +63,7 @@ bool checkIsBrokenTimeout()
SocketImpl::SocketImpl():
    _sockfd(POCO_INVALID_SOCKET),
    _blocking(true),
    _isBrokenTimeout(checkIsBrokenTimeout())
{
}
@@ -82,7 +82,7 @@ SocketImpl::~SocketImpl()
    close();
}

SocketImpl* SocketImpl::acceptConnection(SocketAddress& clientAddr)
{
    if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
@@ -118,7 +118,7 @@ void SocketImpl::connect(const SocketAddress& address)
        rc = ::connect(_sockfd, address.addr(), address.length());
    }
    while (rc != 0 && lastError() == POCO_EINTR);

    if (rc != 0)
    {
        int err = lastError();
        error(err, address.toString());
@@ -205,7 +205,7 @@ void SocketImpl::bind6(const SocketAddress& address, bool reuseAddress, bool reu
#if defined(POCO_HAVE_IPv6)
    if (address.family() != SocketAddress::IPv6)
        throw Poco::InvalidArgumentException("SocketAddress must be an IPv6 address");

    if (_sockfd == POCO_INVALID_SOCKET)
    {
        init(address.af());
@@ -226,11 +226,11 @@ void SocketImpl::bind6(const SocketAddress& address, bool reuseAddress, bool reu
#endif
}

void SocketImpl::listen(int backlog)
{
    if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();

    int rc = ::listen(_sockfd, backlog);
    if (rc != 0) error();
}
@@ -254,7 +254,7 @@ void SocketImpl::shutdownReceive()
    if (rc != 0) error();
}

void SocketImpl::shutdownSend()
{
    if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
@@ -263,7 +263,7 @@ void SocketImpl::shutdownSend()
    if (rc != 0) error();
}

void SocketImpl::shutdown()
{
    if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
@@ -318,7 +318,7 @@ int SocketImpl::receiveBytes(void* buffer, int length, int flags)
            throw TimeoutException();
        }
    }

    int rc;
    do
    {
@@ -326,7 +326,7 @@ int SocketImpl::receiveBytes(void* buffer, int length, int flags)
        rc = ::recv(_sockfd, reinterpret_cast<char*>(buffer), length, flags);
    }
    while (blocking && rc < 0 && lastError() == POCO_EINTR);

    if (rc < 0)
    {
        int err = lastError();
        if ((err == POCO_EAGAIN || err == POCO_EWOULDBLOCK) && !blocking)
@@ -364,7 +364,7 @@ int SocketImpl::receiveFrom(void* buffer, int length, SocketAddress& address, in
            throw TimeoutException();
        }
    }

    sockaddr_storage abuffer;
    struct sockaddr* pSA = reinterpret_cast<struct sockaddr*>(&abuffer);
    poco_socklen_t saLen = sizeof(abuffer);
@@ -451,7 +451,7 @@ bool SocketImpl::pollImpl(Poco::Timespan& remainingTime, int mode)
    }
    while (rc < 0 && lastError() == POCO_EINTR);
    if (rc < 0) error();

    return rc > 0;
#else
@@ -494,7 +494,7 @@ bool SocketImpl::pollImpl(Poco::Timespan& remainingTime, int mode)
    }
    while (rc < 0 && errorCode == POCO_EINTR);
    if (rc < 0) error(errorCode);

    return rc > 0;
#endif // POCO_HAVE_FD_POLL
}
@@ -504,13 +504,13 @@ bool SocketImpl::poll(const Poco::Timespan& timeout, int mode)
    Poco::Timespan remainingTime(timeout);
    return pollImpl(remainingTime, mode);
}

void SocketImpl::setSendBufferSize(int size)
{
    setOption(SOL_SOCKET, SO_SNDBUF, size);
}

int SocketImpl::getSendBufferSize()
{
    int result;
@@ -524,7 +524,7 @@ void SocketImpl::setReceiveBufferSize(int size)
    setOption(SOL_SOCKET, SO_RCVBUF, size);
}

int SocketImpl::getReceiveBufferSize()
{
    int result;
@@ -570,7 +570,7 @@ Poco::Timespan SocketImpl::getReceiveTimeout()
    return result;
}

SocketAddress SocketImpl::address()
{
    if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
@@ -581,7 +581,7 @@ SocketAddress SocketImpl::address()
    int rc = ::getsockname(_sockfd, pSA, &saLen);
    if (rc == 0)
        return SocketAddress(pSA, saLen);
    else
        error();
    return SocketAddress();
}

View File

@@ -0,0 +1,32 @@
---
sidebar_position: 1
sidebar_label: 2024
---
# 2024 Changelog
### ClickHouse release v24.3.10.33-lts (37b6502ebf0) FIXME as compared to v24.3.9.5-lts (a939270465e)
#### Improvement
* Backported in [#68870](https://github.com/ClickHouse/ClickHouse/issues/68870): Make allow_experimental_analyzer be controlled by the initiator for distributed queries. This ensures compatibility and correctness during operations in mixed version clusters. [#65777](https://github.com/ClickHouse/ClickHouse/pull/65777) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov)).
* Backported in [#69095](https://github.com/ClickHouse/ClickHouse/issues/69095): Support for the Spanish language in the embedded dictionaries. [#69035](https://github.com/ClickHouse/ClickHouse/pull/69035) ([Vasily Okunev](https://github.com/VOkunev)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Backported in [#68995](https://github.com/ClickHouse/ClickHouse/issues/68995): Fix the upper bound of the function `fromModifiedJulianDay`. It was supposed to be `9999-12-31` but was mistakenly set to `9999-01-01`. [#67583](https://github.com/ClickHouse/ClickHouse/pull/67583) ([PHO](https://github.com/depressed-pho)).
* Backported in [#68844](https://github.com/ClickHouse/ClickHouse/issues/68844): Fixed crash in Parquet filtering when data types in the file substantially differ from requested types (e.g. `... FROM file('a.parquet', Parquet, 'x String')`, but the file has `x Int64`). Without this fix, use `input_format_parquet_filter_push_down = 0` as a workaround. [#68131](https://github.com/ClickHouse/ClickHouse/pull/68131) ([Michael Kolupaev](https://github.com/al13n321)).
* Backported in [#68881](https://github.com/ClickHouse/ClickHouse/issues/68881): Fixes [#50868](https://github.com/ClickHouse/ClickHouse/issues/50868). Small DateTime64 constant values returned by a nested subquery inside a distributed query were wrongly transformed to Nulls, thus causing errors and possible incorrect query results. [#68323](https://github.com/ClickHouse/ClickHouse/pull/68323) ([Shankar](https://github.com/shiyer7474)).
* Backported in [#69054](https://github.com/ClickHouse/ClickHouse/issues/69054): Added back virtual columns ` _table` and `_database` to distributed tables. They were available until version 24.3. [#68672](https://github.com/ClickHouse/ClickHouse/pull/68672) ([Anton Popov](https://github.com/CurtizJ)).
* Backported in [#68856](https://github.com/ClickHouse/ClickHouse/issues/68856): Fix possible error `Size of permutation (0) is less than required (...)` during Variant column permutation. [#68681](https://github.com/ClickHouse/ClickHouse/pull/68681) ([Kruglov Pavel](https://github.com/Avogar)).
* Backported in [#69152](https://github.com/ClickHouse/ClickHouse/issues/69152): Fix possible wrong result during anyHeavy state merge. [#68950](https://github.com/ClickHouse/ClickHouse/pull/68950) ([Raúl Marín](https://github.com/Algunenano)).
* Backported in [#69112](https://github.com/ClickHouse/ClickHouse/issues/69112): Fix logical error when we have empty async insert. [#69080](https://github.com/ClickHouse/ClickHouse/pull/69080) ([Han Fei](https://github.com/hanfei1991)).
#### NO CL CATEGORY
* Backported in [#68938](https://github.com/ClickHouse/ClickHouse/issues/68938):. [#68897](https://github.com/ClickHouse/ClickHouse/pull/68897) ([Alexander Gololobov](https://github.com/davenger)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Backported in [#68826](https://github.com/ClickHouse/ClickHouse/issues/68826): Turn off fault injection for insert in `01396_inactive_replica_cleanup_nodes_zookeeper`. [#68715](https://github.com/ClickHouse/ClickHouse/pull/68715) ([alesapin](https://github.com/alesapin)).
* Backported in [#68754](https://github.com/ClickHouse/ClickHouse/issues/68754): To make patch release possible from every commit on release branch, package_debug build is required and must not be skipped. [#68750](https://github.com/ClickHouse/ClickHouse/pull/68750) ([Max K.](https://github.com/maxknv)).
* Backported in [#69044](https://github.com/ClickHouse/ClickHouse/issues/69044): Fix 01114_database_atomic flakiness. [#68930](https://github.com/ClickHouse/ClickHouse/pull/68930) ([Raúl Marín](https://github.com/Algunenano)).

View File

@@ -0,0 +1,29 @@
---
sidebar_position: 1
sidebar_label: 2024
---
# 2024 Changelog
### ClickHouse release v24.5.7.31-stable (6c185e9aec1) FIXME as compared to v24.5.6.45-stable (bdca8604c29)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Backported in [#68564](https://github.com/ClickHouse/ClickHouse/issues/68564): Fix indexHint function case found by fuzzer. [#66286](https://github.com/ClickHouse/ClickHouse/pull/66286) ([Anton Popov](https://github.com/CurtizJ)).
* Backported in [#68996](https://github.com/ClickHouse/ClickHouse/issues/68996): Fix the upper bound of the function `fromModifiedJulianDay`. It was supposed to be `9999-12-31` but was mistakenly set to `9999-01-01`. [#67583](https://github.com/ClickHouse/ClickHouse/pull/67583) ([PHO](https://github.com/depressed-pho)).
* Backported in [#68865](https://github.com/ClickHouse/ClickHouse/issues/68865): Fixed crash in Parquet filtering when data types in the file substantially differ from requested types (e.g. `... FROM file('a.parquet', Parquet, 'x String')`, but the file has `x Int64`). Without this fix, use `input_format_parquet_filter_push_down = 0` as a workaround. [#68131](https://github.com/ClickHouse/ClickHouse/pull/68131) ([Michael Kolupaev](https://github.com/al13n321)).
* Backported in [#69004](https://github.com/ClickHouse/ClickHouse/issues/69004): After https://github.com/ClickHouse/ClickHouse/pull/61984 `schema_inference_make_columns_nullable=0` still can make columns `Nullable` in Parquet/Arrow formats. The change was backward incompatible and users noticed the changes in the behaviour. This PR makes `schema_inference_make_columns_nullable=0` to work as before (no Nullable columns will be inferred) and introduces new value `auto` for this setting that will make columns `Nullable` only if data has information about nullability. [#68298](https://github.com/ClickHouse/ClickHouse/pull/68298) ([Kruglov Pavel](https://github.com/Avogar)).
* Backported in [#68882](https://github.com/ClickHouse/ClickHouse/issues/68882): Fixes [#50868](https://github.com/ClickHouse/ClickHouse/issues/50868). Small DateTime64 constant values returned by a nested subquery inside a distributed query were wrongly transformed to Nulls, thus causing errors and possible incorrect query results. [#68323](https://github.com/ClickHouse/ClickHouse/pull/68323) ([Shankar](https://github.com/shiyer7474)).
* Backported in [#69023](https://github.com/ClickHouse/ClickHouse/issues/69023): Added back virtual columns ` _table` and `_database` to distributed tables. They were available until version 24.3. [#68672](https://github.com/ClickHouse/ClickHouse/pull/68672) ([Anton Popov](https://github.com/CurtizJ)).
* Backported in [#68858](https://github.com/ClickHouse/ClickHouse/issues/68858): Fix possible error `Size of permutation (0) is less than required (...)` during Variant column permutation. [#68681](https://github.com/ClickHouse/ClickHouse/pull/68681) ([Kruglov Pavel](https://github.com/Avogar)).
* Backported in [#68784](https://github.com/ClickHouse/ClickHouse/issues/68784): Fix issue with materialized constant keys when hashing maps with arrays as keys in functions `sipHash(64/128)Keyed`. [#68731](https://github.com/ClickHouse/ClickHouse/pull/68731) ([Salvatore Mesoraca](https://github.com/aiven-sal)).
* Backported in [#69154](https://github.com/ClickHouse/ClickHouse/issues/69154): Fix possible wrong result during anyHeavy state merge. [#68950](https://github.com/ClickHouse/ClickHouse/pull/68950) ([Raúl Marín](https://github.com/Algunenano)).
#### NO CL CATEGORY
* Backported in [#68940](https://github.com/ClickHouse/ClickHouse/issues/68940):. [#68897](https://github.com/ClickHouse/ClickHouse/pull/68897) ([Alexander Gololobov](https://github.com/davenger)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Backported in [#68828](https://github.com/ClickHouse/ClickHouse/issues/68828): Turn off fault injection for insert in `01396_inactive_replica_cleanup_nodes_zookeeper`. [#68715](https://github.com/ClickHouse/ClickHouse/pull/68715) ([alesapin](https://github.com/alesapin)).
* Backported in [#69046](https://github.com/ClickHouse/ClickHouse/issues/69046): Fix 01114_database_atomic flakiness. [#68930](https://github.com/ClickHouse/ClickHouse/pull/68930) ([Raúl Marín](https://github.com/Algunenano)).

View File

@@ -0,0 +1,29 @@
---
sidebar_position: 1
sidebar_label: 2024
---
# 2024 Changelog
### ClickHouse release v24.6.5.30-stable (e6e196c92d6) FIXME as compared to v24.6.4.42-stable (c534bb4b4dd)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Backported in [#68969](https://github.com/ClickHouse/ClickHouse/issues/68969): Fix the upper bound of the function `fromModifiedJulianDay`. It was supposed to be `9999-12-31` but was mistakenly set to `9999-01-01`. [#67583](https://github.com/ClickHouse/ClickHouse/pull/67583) ([PHO](https://github.com/depressed-pho)).
* Backported in [#68814](https://github.com/ClickHouse/ClickHouse/issues/68814): Fixed crash in Parquet filtering when data types in the file substantially differ from requested types (e.g. `... FROM file('a.parquet', Parquet, 'x String')`, but the file has `x Int64`). Without this fix, use `input_format_parquet_filter_push_down = 0` as a workaround. [#68131](https://github.com/ClickHouse/ClickHouse/pull/68131) ([Michael Kolupaev](https://github.com/al13n321)).
* Backported in [#69005](https://github.com/ClickHouse/ClickHouse/issues/69005): After https://github.com/ClickHouse/ClickHouse/pull/61984 `schema_inference_make_columns_nullable=0` still can make columns `Nullable` in Parquet/Arrow formats. The change was backward incompatible and users noticed the changes in the behaviour. This PR makes `schema_inference_make_columns_nullable=0` to work as before (no Nullable columns will be inferred) and introduces new value `auto` for this setting that will make columns `Nullable` only if data has information about nullability. [#68298](https://github.com/ClickHouse/ClickHouse/pull/68298) ([Kruglov Pavel](https://github.com/Avogar)).
* Backported in [#68883](https://github.com/ClickHouse/ClickHouse/issues/68883): Fixes [#50868](https://github.com/ClickHouse/ClickHouse/issues/50868). Small DateTime64 constant values returned by a nested subquery inside a distributed query were wrongly transformed to Nulls, thus causing errors and possible incorrect query results. [#68323](https://github.com/ClickHouse/ClickHouse/pull/68323) ([Shankar](https://github.com/shiyer7474)).
* Backported in [#69025](https://github.com/ClickHouse/ClickHouse/issues/69025): Added back virtual columns ` _table` and `_database` to distributed tables. They were available until version 24.3. [#68672](https://github.com/ClickHouse/ClickHouse/pull/68672) ([Anton Popov](https://github.com/CurtizJ)).
* Backported in [#68860](https://github.com/ClickHouse/ClickHouse/issues/68860): Fix possible error `Size of permutation (0) is less than required (...)` during Variant column permutation. [#68681](https://github.com/ClickHouse/ClickHouse/pull/68681) ([Kruglov Pavel](https://github.com/Avogar)).
* Backported in [#68786](https://github.com/ClickHouse/ClickHouse/issues/68786): Fix issue with materialized constant keys when hashing maps with arrays as keys in functions `sipHash(64/128)Keyed`. [#68731](https://github.com/ClickHouse/ClickHouse/pull/68731) ([Salvatore Mesoraca](https://github.com/aiven-sal)).
* Backported in [#69156](https://github.com/ClickHouse/ClickHouse/issues/69156): Fix possible wrong result during anyHeavy state merge. [#68950](https://github.com/ClickHouse/ClickHouse/pull/68950) ([Raúl Marín](https://github.com/Algunenano)).
* Backported in [#69116](https://github.com/ClickHouse/ClickHouse/issues/69116): Fix logical error when we have empty async insert. [#69080](https://github.com/ClickHouse/ClickHouse/pull/69080) ([Han Fei](https://github.com/hanfei1991)).
#### NO CL CATEGORY
* Backported in [#68942](https://github.com/ClickHouse/ClickHouse/issues/68942):. [#68897](https://github.com/ClickHouse/ClickHouse/pull/68897) ([Alexander Gololobov](https://github.com/davenger)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Backported in [#68830](https://github.com/ClickHouse/ClickHouse/issues/68830): Turn off fault injection for insert in `01396_inactive_replica_cleanup_nodes_zookeeper`. [#68715](https://github.com/ClickHouse/ClickHouse/pull/68715) ([alesapin](https://github.com/alesapin)).
* Backported in [#69048](https://github.com/ClickHouse/ClickHouse/issues/69048): Fix 01114_database_atomic flakiness. [#68930](https://github.com/ClickHouse/ClickHouse/pull/68930) ([Raúl Marín](https://github.com/Algunenano)).

View File

@@ -292,6 +292,9 @@
    M(DistrCacheWriteRequests, "Number of executed Write requests to Distributed Cache") \
    M(DistrCacheServerConnections, "Number of open connections to ClickHouse server from Distributed Cache") \
    \
+    M(SchedulerIOReadScheduled, "Number of IO reads currently being scheduled") \
+    M(SchedulerIOWriteScheduled, "Number of IO writes currently being scheduled") \
+    \
    M(StorageConnectionsStored, "Total count of sessions stored in the session pool for storages") \
    M(StorageConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for storages") \
    \

View File

@@ -113,6 +113,56 @@ std::string_view CurrentThread::getQueryId()
    return current_thread->getQueryId();
}

+void CurrentThread::attachReadResource(ResourceLink link)
+{
+    if (unlikely(!current_thread))
+        return;
+    if (current_thread->read_resource_link)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has been already attached to read resource", std::to_string(getThreadId()));
+    current_thread->read_resource_link = link;
+}
+
+void CurrentThread::detachReadResource()
+{
+    if (unlikely(!current_thread))
+        return;
+    if (!current_thread->read_resource_link)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has not been attached to read resource", std::to_string(getThreadId()));
+    current_thread->read_resource_link.reset();
+}
+
+ResourceLink CurrentThread::getReadResourceLink()
+{
+    if (unlikely(!current_thread))
+        return {};
+    return current_thread->read_resource_link;
+}
+
+void CurrentThread::attachWriteResource(ResourceLink link)
+{
+    if (unlikely(!current_thread))
+        return;
+    if (current_thread->write_resource_link)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has been already attached to write resource", std::to_string(getThreadId()));
+    current_thread->write_resource_link = link;
+}
+
+void CurrentThread::detachWriteResource()
+{
+    if (unlikely(!current_thread))
+        return;
+    if (!current_thread->write_resource_link)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has not been attached to write resource", std::to_string(getThreadId()));
+    current_thread->write_resource_link.reset();
+}
+
+ResourceLink CurrentThread::getWriteResourceLink()
+{
+    if (unlikely(!current_thread))
+        return {};
+    return current_thread->write_resource_link;
+}
+
MemoryTracker * CurrentThread::getUserMemoryTracker()
{
    if (unlikely(!current_thread))
View File

@@ -2,6 +2,7 @@

#include <Interpreters/Context_fwd.h>
#include <Common/ThreadStatus.h>
+#include <Common/Scheduler/ResourceLink.h>

#include <memory>
#include <string>
@@ -23,7 +24,6 @@ class QueryStatus;
struct Progress;
class InternalTextLogsQueue;

/** Collection of static methods to work with thread-local objects.
 * Allows to attach and detach query/process (thread group) to a thread
 * (to calculate query-related metrics and to allow to obtain query-related data from a thread).
@@ -92,6 +92,14 @@ public:
    static std::string_view getQueryId();

+    // For IO Scheduling
+    static void attachReadResource(ResourceLink link);
+    static void detachReadResource();
+    static ResourceLink getReadResourceLink();
+    static void attachWriteResource(ResourceLink link);
+    static void detachWriteResource();
+    static ResourceLink getWriteResourceLink();
+
    /// Initializes query with current thread as master thread in constructor, and detaches it in destructor
    struct QueryScope : private boost::noncopyable
    {
@@ -102,6 +110,39 @@ public:
        void logPeakMemoryUsage();
        bool log_peak_memory_usage_in_destructor = true;
    };

+    /// Scoped attach/detach of IO resource links
+    struct IOScope : private boost::noncopyable
+    {
+        explicit IOScope(ResourceLink read_resource_link, ResourceLink write_resource_link)
+        {
+            if (read_resource_link)
+            {
+                attachReadResource(read_resource_link);
+                read_attached = true;
+            }
+            if (write_resource_link)
+            {
+                attachWriteResource(write_resource_link);
+                write_attached = true;
+            }
+        }
+
+        explicit IOScope(const IOSchedulingSettings & settings)
+            : IOScope(settings.read_resource_link, settings.write_resource_link)
+        {}
+
+        ~IOScope()
+        {
+            if (read_attached)
+                detachReadResource();
+            if (write_attached)
+                detachWriteResource();
+        }
+
+        bool read_attached = false;
+        bool write_attached = false;
+    };
};

}
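
A sketch of the intended use, assuming the query's classifier has already produced the links in an `IOSchedulingSettings` object (the helper function is hypothetical):

    // Illustrative helper: attach links for the duration of one IO operation.
    void doScheduledIO(const IOSchedulingSettings & io)
    {
        CurrentThread::IOScope scope(io); // attaches whichever links are set
        // ... perform HTTP requests; connection pools on this thread discover the
        //     links via CurrentThread::getReadResourceLink()/getWriteResourceLink() ...
    } // destructor detaches exactly what was attached, even if an exception is thrown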

View File

@@ -2,6 +2,7 @@

#include <Common/HostResolvePool.h>
#include <Common/ProfileEvents.h>
+#include <Common/Stopwatch.h>
#include <Common/CurrentMetrics.h>
#include <Common/logger_useful.h>
#include <Common/Exception.h>
@@ -9,6 +10,7 @@
#include <Common/ProxyConfiguration.h>
#include <Common/MemoryTrackerSwitcher.h>
#include <Common/SipHash.h>
+#include <Common/Scheduler/ResourceGuard.h>
#include <Common/proxyConfigurationToPocoProxyConfig.h>

#include <Poco/Net/HTTPChunkedStream.h>
@@ -236,6 +238,59 @@ public:
};

+// Session data hooks implementation for integration with the resource scheduler.
+// Hooks are created per request-response pair and are registered/unregistered in the HTTP session.
+// * `atStart()` sends a resource request to the scheduler every time the HTTP session is going to send or
+//   receive data to/from the socket, and waits for the scheduler's confirmation. This way the scheduler can
+//   throttle and/or schedule socket data streams.
+// * The `atFinish()` hook is called on a successful socket read/write operation.
+//   It informs the scheduler that the operation is complete, which allows the scheduler to control the total
+//   amount of in-flight bytes and/or operations.
+// * The `atFail()` hook is called on failure of a socket operation. Its purpose is to correct the amount of bytes
+//   passed through the scheduler queue, to ensure fair bandwidth allocation even in the presence of errors.
+struct ResourceGuardSessionDataHooks : public Poco::Net::IHTTPSessionDataHooks
+{
+    ResourceGuardSessionDataHooks(ResourceLink link_, const ResourceGuard::Metrics * metrics, LoggerPtr log_, const String & method, const String & uri)
+        : link(link_)
+        , log(log_)
+        , http_request(method + " " + uri)
+    {
+        request.metrics = metrics;
+        chassert(link);
+    }
+
+    ~ResourceGuardSessionDataHooks() override
+    {
+        request.assertFinished(); // Never destruct with an active request
+    }
+
+    void atStart(int bytes) override
+    {
+        Stopwatch timer;
+        request.enqueue(bytes, link);
+        request.wait();
+        timer.stop();
+        if (timer.elapsedMilliseconds() >= 5000)
+            LOG_INFO(log, "Resource request took too long to finish: {} ms for {}", timer.elapsedMilliseconds(), http_request);
+    }
+
+    void atFinish(int bytes) override
+    {
+        request.finish(bytes, link);
+    }
+
+    void atFail() override
+    {
+        request.finish(0, link);
+    }
+
+    ResourceLink link;
+    ResourceGuard::Request request;
+    LoggerPtr log;
+    String http_request;
+};
+
// EndpointConnectionPool manages connections to the endpoint
// Features:
// - it uses HostResolver for address selection. See Common/HostResolver.h for more info.
@@ -246,8 +301,6 @@ public:
// - `Session::reconnect()` uses the pool as well
// - comprehensive sensors
// - a session is reused according to its inner state, automatically

template <class Session>
class EndpointConnectionPool : public std::enable_shared_from_this<EndpointConnectionPool<Session>>, public IExtendedPool
{
@@ -337,6 +390,13 @@ private:
    std::ostream & sendRequest(Poco::Net::HTTPRequest & request) override
    {
        auto idle = idleTime();

+        // Set data hooks for IO scheduling
+        if (ResourceLink link = CurrentThread::getReadResourceLink())
+            Session::setReceiveDataHooks(std::make_shared<ResourceGuardSessionDataHooks>(link, ResourceGuard::Metrics::getIORead(), log, request.getMethod(), request.getURI()));
+        if (ResourceLink link = CurrentThread::getWriteResourceLink())
+            Session::setSendDataHooks(std::make_shared<ResourceGuardSessionDataHooks>(link, ResourceGuard::Metrics::getIOWrite(), log, request.getMethod(), request.getURI()));
+
        std::ostream & result = Session::sendRequest(request);
        result.exceptions(std::ios::badbit);
@@ -393,6 +453,8 @@ private:
            }
        }
        response_stream = nullptr;
+        Session::setSendDataHooks();
+        Session::setReceiveDataHooks();

        group->atConnectionDestroy();
View File

@@ -120,6 +120,13 @@
    M(PartsWithAppliedMutationsOnFly, "Total number of parts for which there was any mutation applied on fly") \
    M(MutationsAppliedOnFlyInAllParts, "The sum of number of applied mutations on-fly for part among all read parts") \
    \
+    M(SchedulerIOReadRequests, "Resource requests passed through scheduler for IO reads.") \
+    M(SchedulerIOReadBytes, "Bytes passed through scheduler for IO reads.") \
+    M(SchedulerIOReadWaitMicroseconds, "Total time a query was waiting on resource requests for IO reads.") \
+    M(SchedulerIOWriteRequests, "Resource requests passed through scheduler for IO writes.") \
+    M(SchedulerIOWriteBytes, "Bytes passed through scheduler for IO writes.") \
+    M(SchedulerIOWriteWaitMicroseconds, "Total time a query was waiting on resource requests for IO writes.") \
+    \
    M(QueryMaskingRulesMatch, "Number of times query masking rules was successfully matched.") \
    \
    M(ReplicatedPartFetches, "Number of times a data part was downloaded from replica of a ReplicatedMergeTree table.") \

View File

@@ -22,10 +22,13 @@ public:
    {}

    // Wrapper for `enqueueRequest()` that should be used to account for available resource budget
-    void enqueueRequestUsingBudget(ResourceRequest * request)
+    // Returns `estimated_cost` that should be passed later to `adjustBudget()`
+    [[ nodiscard ]] ResourceCost enqueueRequestUsingBudget(ResourceRequest * request)
    {
-        request->cost = budget.ask(request->cost);
+        ResourceCost estimated_cost = request->cost;
+        request->cost = budget.ask(estimated_cost);
        enqueueRequest(request);
+        return estimated_cost;
    }

    // Should be called to account for difference between real and estimated costs
@@ -34,18 +37,6 @@ public:
        budget.adjust(estimated_cost, real_cost);
    }

-    // Adjust budget to account for extra consumption of `cost` resource units
-    void consumeBudget(ResourceCost cost)
-    {
-        adjustBudget(0, cost);
-    }
-
-    // Adjust budget to account for requested, but not consumed `cost` resource units
-    void accumulateBudget(ResourceCost cost)
-    {
-        adjustBudget(cost, 0);
-    }
-
    /// Enqueue new request to be executed using underlying resource.
    /// Should be called outside of scheduling subsystem, implementation must be thread-safe.
    virtual void enqueueRequest(ResourceRequest * request) = 0;
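
The returned estimate closes the loop with `adjustBudget()`: the queue charges the estimate up front and is corrected once the real cost is known. A worked sketch with hypothetical numbers, given an `ISchedulerQueue * queue` and a concrete `ResourceRequest` implementation `req` (`ResourceRequest` itself is abstract):

    req.cost = 100;                                   // estimated bytes
    ResourceCost estimated = queue->enqueueRequestUsingBudget(&req);
    // ... the request is dequeued and the real transfer happens ...
    ResourceCost real = 42;                           // bytes actually transferred
    queue->adjustBudget(estimated, real);             // banks the 58-byte surplus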

View File

@@ -232,12 +232,13 @@ struct ResourceTestManager : public ResourceTestBase
        ResourceTestManager & t;

        Guard(ResourceTestManager & t_, ResourceLink link_, ResourceCost cost)
-            : ResourceGuard(link_, cost, PostponeLocking)
+            : ResourceGuard(ResourceGuard::Metrics::getIOWrite(), link_, cost, Lock::Defer)
            , t(t_)
        {
            t.onEnqueue(link);
            lock();
            t.onExecute(link);
+            consume(cost);
        }
    };

@@ -310,8 +311,9 @@ struct ResourceTestManager : public ResourceTestBase
    // NOTE: actually leader's request(s) make their own small busy period.
    void blockResource(ResourceLink link)
    {
-        ResourceGuard g(link, 1, ResourceGuard::PostponeLocking);
+        ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, 1, ResourceGuard::Lock::Defer);
        g.lock();
+        g.consume(1);
        // NOTE: at this point we assume resource to be blocked by single request (<max_requests>1</max_requests>)
        busy_period.arrive_and_wait(); // (1) notify all followers that resource is blocked
        busy_period.arrive_and_wait(); // (2) wait all followers to enqueue their requests
@@ -320,10 +322,11 @@ struct ResourceTestManager : public ResourceTestBase
    {
        getLinkData(link).left += total_requests + 1;
        busy_period.arrive_and_wait(); // (1) wait leader to block resource
-        ResourceGuard g(link, cost, ResourceGuard::PostponeLocking);
+        ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, cost, ResourceGuard::Lock::Defer);
        onEnqueue(link);
        busy_period.arrive_and_wait(); // (2) notify leader to unblock
        g.lock();
+        g.consume(cost);
        onExecute(link);
    }
};

View File

@@ -36,11 +36,16 @@ TEST(SchedulerDynamicResourceManager, Smoke)

    for (int i = 0; i < 10; i++)
    {
-        ResourceGuard gA(cA->get("res1"), ResourceGuard::PostponeLocking);
+        ResourceGuard gA(ResourceGuard::Metrics::getIOWrite(), cA->get("res1"), 1, ResourceGuard::Lock::Defer);
        gA.lock();
+        gA.consume(1);
        gA.unlock();

-        ResourceGuard gB(cB->get("res1"));
+        ResourceGuard gB(ResourceGuard::Metrics::getIOWrite(), cB->get("res1"));
+        gB.unlock();
+
+        ResourceGuard gC(ResourceGuard::Metrics::getIORead(), cB->get("res1"));
+        gB.consume(2);
    }
}

View File

@@ -1,11 +1,13 @@
#include <gtest/gtest.h>

-#include <Common/Scheduler/SchedulerRoot.h>
#include <Common/Scheduler/Nodes/tests/ResourceTest.h>
+#include <Common/Scheduler/SchedulerRoot.h>
+#include <Common/randomSeed.h>

#include <barrier>
#include <future>
+#include <pcg_random.hpp>

using namespace DB;

@@ -22,6 +24,17 @@ struct ResourceTest : public ResourceTestBase
    {
        scheduler.stop(true);
    }

+    std::mutex rng_mutex;
+    pcg64 rng{randomSeed()};
+
+    template <typename T>
+    T randomInt(T from, T to)
+    {
+        std::uniform_int_distribution<T> distribution(from, to);
+        std::lock_guard lock(rng_mutex);
+        return distribution(rng);
+    }
};

struct ResourceHolder
@@ -109,26 +122,55 @@ TEST(SchedulerRoot, Smoke)
    r2.registerResource();

    {
-        ResourceGuard rg(a);
+        ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), a);
        EXPECT_TRUE(fc1->requests.contains(&rg.request));
+        rg.consume(1);
    }

    {
-        ResourceGuard rg(b);
+        ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), b);
        EXPECT_TRUE(fc1->requests.contains(&rg.request));
+        rg.consume(1);
    }

    {
-        ResourceGuard rg(c);
+        ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), c);
        EXPECT_TRUE(fc2->requests.contains(&rg.request));
+        rg.consume(1);
    }

    {
-        ResourceGuard rg(d);
+        ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), d);
        EXPECT_TRUE(fc2->requests.contains(&rg.request));
+        rg.consume(1);
    }
}

+TEST(SchedulerRoot, Budget)
+{
+    ResourceTest t;
+
+    ResourceHolder r1(t);
+    r1.add<ConstraintTest>("/", "<max_requests>1</max_requests>");
+    r1.add<PriorityPolicy>("/prio");
+    auto a = r1.addQueue("/prio/A", "");
+    r1.registerResource();
+
+    ResourceCost total_real_cost = 0;
+    int total_requests = 10;
+    for (int i = 0; i < total_requests; i++)
+    {
+        ResourceCost est_cost = t.randomInt(1, 10);
+        ResourceCost real_cost = t.randomInt(0, 10);
+        ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), a, est_cost);
+        rg.consume(real_cost);
+        total_real_cost += real_cost;
+    }
+
+    EXPECT_EQ(total_requests, a.queue->dequeued_requests);
+    EXPECT_EQ(total_real_cost, a.queue->dequeued_cost - a.queue->getBudget());
+}
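
The final assertion rests on the budget bookkeeping: each iteration is charged `est_cost` up front and reconciled to `real_cost` on finish, so the surplus or deficit accumulates in the queue budget. With hypothetical numbers for a single request:

    ResourceCost est = 7, real = 3;
    ResourceCost dequeued_cost = est;         // the scheduler accounted the estimate
    ResourceCost budget = est - real;         // finish() calls adjustBudget(7, 3)
    chassert(dequeued_cost - budget == real); // the invariant EXPECT_EQ checks above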
TEST(SchedulerRoot, Cancel)
{
    ResourceTest t;

View File

@@ -1,25 +0,0 @@
#include <Common/Scheduler/ISchedulerQueue.h>
#include <Common/Scheduler/ResourceLink.h>
#include <Common/Scheduler/ResourceRequest.h>

namespace DB
{

void ResourceLink::adjust(ResourceCost estimated_cost, ResourceCost real_cost) const
{
    if (queue)
        queue->adjustBudget(estimated_cost, real_cost);
}

void ResourceLink::consumed(ResourceCost cost) const
{
    if (queue)
        queue->consumeBudget(cost);
}

void ResourceLink::accumulate(DB::ResourceCost cost) const
{
    if (queue)
        queue->accumulateBudget(cost);
}

}

View File

@@ -7,10 +7,30 @@

#include <Common/Scheduler/ResourceRequest.h>
#include <Common/Scheduler/ResourceLink.h>
+#include <Common/CurrentThread.h>
+#include <Common/ProfileEvents.h>
+#include <Common/CurrentMetrics.h>

#include <condition_variable>
#include <mutex>

+namespace ProfileEvents
+{
+    extern const Event SchedulerIOReadRequests;
+    extern const Event SchedulerIOReadBytes;
+    extern const Event SchedulerIOReadWaitMicroseconds;
+    extern const Event SchedulerIOWriteRequests;
+    extern const Event SchedulerIOWriteBytes;
+    extern const Event SchedulerIOWriteWaitMicroseconds;
+}
+
+namespace CurrentMetrics
+{
+    extern const Metric SchedulerIOReadScheduled;
+    extern const Metric SchedulerIOWriteScheduled;
+}
+
namespace DB
{

@@ -22,12 +42,42 @@ namespace DB
class ResourceGuard
{
public:
-    enum ResourceGuardCtor
+    enum class Lock
    {
-        LockStraightAway, /// Locks inside constructor (default)
+        Default, /// Locks inside constructor
        // WARNING: Only for tests. It is not exception-safe because `lock()` must be called after construction.
-        PostponeLocking /// Don't lock in constructor, but send request
+        Defer /// Don't lock in constructor, but send request
    };

+    struct Metrics
+    {
+        const ProfileEvents::Event requests = ProfileEvents::end();
+        const ProfileEvents::Event cost = ProfileEvents::end();
+        const ProfileEvents::Event wait_microseconds = ProfileEvents::end();
+        const CurrentMetrics::Metric scheduled_count = CurrentMetrics::end();
+
+        static const Metrics * getIORead()
+        {
+            static Metrics metrics{
+                .requests = ProfileEvents::SchedulerIOReadRequests,
+                .cost = ProfileEvents::SchedulerIOReadBytes,
+                .wait_microseconds = ProfileEvents::SchedulerIOReadWaitMicroseconds,
+                .scheduled_count = CurrentMetrics::SchedulerIOReadScheduled
+            };
+            return &metrics;
+        }
+
+        static const Metrics * getIOWrite()
+        {
+            static Metrics metrics{
+                .requests = ProfileEvents::SchedulerIOWriteRequests,
+                .cost = ProfileEvents::SchedulerIOWriteBytes,
+                .wait_microseconds = ProfileEvents::SchedulerIOWriteWaitMicroseconds,
+                .scheduled_count = CurrentMetrics::SchedulerIOWriteScheduled
+            };
+            return &metrics;
+        }
+    };

    enum RequestState
@@ -46,60 +96,74 @@ public:
            chassert(state == Finished);
            state = Enqueued;
            ResourceRequest::reset(cost_);
-            link_.queue->enqueueRequestUsingBudget(this);
+            estimated_cost = link_.queue->enqueueRequestUsingBudget(this); // NOTE: it modifies `cost` and enqueues request
        }

        // This function is executed inside scheduler thread and wakes thread issued this `request`.
        // That thread will continue execution and do real consumption of requested resource synchronously.
        void execute() override
        {
-            {
-                std::unique_lock lock(mutex);
-                chassert(state == Enqueued);
-                state = Dequeued;
-            }
+            std::unique_lock lock(mutex);
+            chassert(state == Enqueued);
+            state = Dequeued;
            dequeued_cv.notify_one();
        }

        void wait()
        {
+            CurrentMetrics::Increment scheduled(metrics->scheduled_count);
+            auto timer = CurrentThread::getProfileEvents().timer(metrics->wait_microseconds);
            std::unique_lock lock(mutex);
            dequeued_cv.wait(lock, [this] { return state == Dequeued; });
        }

-        void finish()
+        void finish(ResourceCost real_cost_, ResourceLink link_)
        {
            // lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread
            chassert(state == Dequeued);
            state = Finished;
+            if (estimated_cost != real_cost_)
+                link_.queue->adjustBudget(estimated_cost, real_cost_);
            ResourceRequest::finish();
+            ProfileEvents::increment(metrics->requests);
+            ProfileEvents::increment(metrics->cost, real_cost_);
        }

-        static Request & local()
+        void assertFinished()
+        {
+            // lock(mutex) is not required because `Finished` request cannot be used by the scheduler thread
+            chassert(state == Finished);
+        }
+
+        static Request & local(const Metrics * metrics)
        {
            // Since single thread cannot use more than one resource request simultaneously,
            // we can reuse thread-local request to avoid allocations
            static thread_local Request instance;
+            instance.metrics = metrics;
            return instance;
        }

+        const Metrics * metrics = nullptr; // Must be initialized before use
+
    private:
+        ResourceCost estimated_cost = 0; // Stores initial `cost` value in case budget was used to modify it
        std::mutex mutex;
        std::condition_variable dequeued_cv;
        RequestState state = Finished;
    };

-    /// Creates pending request for resource; blocks while resource is not available (unless `PostponeLocking`)
-    explicit ResourceGuard(ResourceLink link_, ResourceCost cost = 1, ResourceGuardCtor ctor = LockStraightAway)
+    /// Creates pending request for resource; blocks while resource is not available (unless `Lock::Defer`)
+    explicit ResourceGuard(const Metrics * metrics, ResourceLink link_, ResourceCost cost = 1, ResourceGuard::Lock type = ResourceGuard::Lock::Default)
        : link(link_)
-        , request(Request::local())
+        , request(Request::local(metrics))
    {
        if (cost == 0)
-            link.queue = nullptr; // Ignore zero-cost requests
-        else if (link.queue)
+            link.reset(); // Ignore zero-cost requests
+        else if (link)
        {
            request.enqueue(cost, link);
-            if (ctor == LockStraightAway)
+            if (type == Lock::Default)
                request.wait();
        }
    }

@@ -112,22 +176,29 @@ public:
    /// Blocks until resource is available
    void lock()
    {
-        if (link.queue)
+        if (link)
            request.wait();
    }

-    /// Report resource consumption has finished
-    void unlock()
+    void consume(ResourceCost cost)
    {
-        if (link.queue)
+        real_cost += cost;
+    }
+
+    /// Report resource consumption has finished
+    void unlock(ResourceCost consumed = 0)
+    {
+        consume(consumed);
+        if (link)
        {
-            request.finish();
-            link.queue = nullptr;
+            request.finish(real_cost, link);
+            link.reset();
        }
    }

    ResourceLink link;
    Request & request;
+    ResourceCost real_cost = 0;
};

}
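
The resulting API makes real consumption explicit. A sketch of a metered read, assuming a valid `ResourceLink link` and a hypothetical `read_from_socket()` call:

    ResourceGuard guard(ResourceGuard::Metrics::getIORead(), link, estimated_bytes);
    size_t n = read_from_socket(buf, estimated_bytes); // real consumption happens here
    guard.consume(n);  // record what was actually transferred
    guard.unlock();    // reports real cost; the budget is adjusted if it differs
    // equivalently: guard.unlock(n);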

View File

@@ -13,13 +13,28 @@ using ResourceCost = Int64;
struct ResourceLink
{
    ISchedulerQueue * queue = nullptr;

    bool operator==(const ResourceLink &) const = default;
+    explicit operator bool() const { return queue != nullptr; }

-    void adjust(ResourceCost estimated_cost, ResourceCost real_cost) const;
-    void consumed(ResourceCost cost) const;
-    void accumulate(ResourceCost cost) const;
+    void reset()
+    {
+        queue = nullptr;
+    }
+};
+
+/*
+ * Everything required for IO scheduling.
+ * Note that raw pointers are stored inside, so make sure that the `ClassifierPtr` that produced
+ * the resource links will outlive them. Usually the classifier is stored in the query `Context`.
+ */
+struct IOSchedulingSettings
+{
+    ResourceLink read_resource_link;
+    ResourceLink write_resource_link;
+
+    bool operator==(const IOSchedulingSettings &) const = default;
+    explicit operator bool() const { return read_resource_link && write_resource_link; }
};

}
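
Note the asymmetry: `IOScope` attaches each link independently, while `IOSchedulingSettings::operator bool` is true only when both links are set. An illustrative wiring, assuming a `ClassifierPtr classifier` from the query `Context` (the resource names are hypothetical):

    IOSchedulingSettings io;
    io.read_resource_link = classifier->get("network_read");
    io.write_resource_link = classifier->get("network_write");
    if (io) // true only when *both* links resolved
    {
        CurrentThread::IOScope scope(io);
        // ... issue IO on this thread ...
    }

Partially filled settings still work without the `if`, since `IOScope` checks each link on its own.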

View File

@@ -45,7 +45,7 @@ constexpr ResourceCost ResourceCostMax = std::numeric_limits<int>::max();
class ResourceRequest : public boost::intrusive::list_base_hook<>
{
public:
-    /// Cost of request execution; should be filled before request enqueueing.
+    /// Cost of request execution; should be filled before request enqueueing and remain constant until `finish()`.
    /// NOTE: If cost is not known in advance, ResourceBudget should be used (note that every ISchedulerQueue has it)
    ResourceCost cost;

View File

@@ -7,11 +7,11 @@

#include <Common/MemoryTracker.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
+#include <Common/Scheduler/ResourceLink.h>

#include <boost/noncopyable.hpp>

#include <functional>
-#include <map>
#include <memory>
#include <mutex>
#include <unordered_set>
@@ -188,6 +188,10 @@ public:
    Progress progress_in;
    Progress progress_out;

+    /// IO scheduling
+    ResourceLink read_resource_link;
+    ResourceLink write_resource_link;
+
private:
    /// Group of threads, to which this thread attached
    ThreadGroupPtr thread_group;

View File

@@ -210,7 +210,7 @@ namespace
{
    UInt64 stringToMaxThreads(const String & str)
    {
-        if (startsWith(str, "auto"))
+        if (startsWith(str, "auto") || startsWith(str, "'auto"))
            return 0;
        return parseFromString<UInt64>(str);
    }
@@ -237,7 +237,8 @@ SettingFieldMaxThreads & SettingFieldMaxThreads::operator=(const Field & f)
String SettingFieldMaxThreads::toString() const
{
    if (is_auto)
-        return "auto(" + ::DB::toString(value) + ")";
+        /// Removing quotes here will introduce an incompatibility between replicas with different versions.
+        return "'auto(" + ::DB::toString(value) + ")'";
    else
        return ::DB::toString(value);
}
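
The quoting matters for cross-version compatibility: the output of `toString()` may be fed back through `stringToMaxThreads()` on another replica, so both spellings must parse. A sketch of the round trip (the value 16 is illustrative):

    stringToMaxThreads("auto");       // 0, i.e. automatic
    stringToMaxThreads("'auto(16)'"); // 0 as well: the quoted form now parses
    stringToMaxThreads("16");         // 16
    // toString() of an auto value now yields "'auto(16)'" instead of "auto(16)"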

View File

@@ -519,10 +519,10 @@ static DataTypePtr createJSON(const ASTPtr & arguments)
    if (!context)
        context = Context::getGlobalContextInstance();

-    if (context->getSettingsRef().use_json_alias_for_old_object_type)
+    if (context->getSettingsRef().allow_experimental_object_type && context->getSettingsRef().use_json_alias_for_old_object_type)
    {
        if (arguments && !arguments->children.empty())
-            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Experimental Object type doesn't support any arguments. If you want to use new JSON type, set setting allow_experimental_json_type = 1");
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Experimental Object type doesn't support any arguments. If you want to use new JSON type, set settings allow_experimental_json_type = 1 and use_json_alias_for_old_object_type = 0");

        return std::make_shared<DataTypeObjectDeprecated>("JSON", false);
    }

View File

@@ -8,6 +8,7 @@

#include <IO/ReadBufferFromString.h>
#include <Common/logger_useful.h>
#include <Common/Throttler.h>
+#include <Common/Scheduler/ResourceGuard.h>
#include <base/sleep.h>
#include <Common/ProfileEvents.h>
#include <IO/SeekableReadBuffer.h>
@@ -113,7 +114,9 @@ bool ReadBufferFromAzureBlobStorage::nextImpl()
    {
        try
        {
+            ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, to_read_bytes);
            bytes_read = data_stream->ReadToCount(reinterpret_cast<uint8_t *>(data_ptr), to_read_bytes);
+            rlock.unlock(bytes_read); // Do not hold resource under bandwidth throttler
            if (read_settings.remote_throttler)
                read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
            break;
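
The ordering here is deliberate: the guard is released with the real byte count before the throttler sleeps, so a bandwidth-limited read does not hold a scheduler slot while waiting. The general shape, with a hypothetical `do_read()` standing in for the Azure call:

    ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), link, to_read_bytes);
    size_t bytes_read = do_read(data_ptr, to_read_bytes); // consume while holding the slot
    rlock.unlock(bytes_read);    // release the scheduler slot first...
    throttler->add(bytes_read);  // ...then sleep under the bandwidth throttler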

View File

@@ -101,15 +101,13 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
    {
        try
        {
-            ResourceGuard rlock(write_settings.resource_link, cost); // Note that zero-cost requests are ignored
+            ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, cost); // Note that zero-cost requests are ignored
            func();
+            rlock.unlock(cost);
            break;
        }
        catch (const Azure::Core::RequestFailedException & e)
        {
-            if (cost)
-                write_settings.resource_link.accumulate(cost); // Accumulate resource for later use, because we have failed to consume it
-
            if (i == num_tries - 1 || !isRetryableAzureException(e))
                throw;

@@ -117,8 +115,6 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
        }
        catch (...)
        {
-            if (cost)
-                write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
-
            throw;
        }
    }

View File

@@ -461,14 +461,17 @@ DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
}

template <class Settings>
-static inline Settings updateResourceLink(const Settings & settings, const String & resource_name)
+static inline Settings updateIOSchedulingSettings(const Settings & settings, const String & read_resource_name, const String & write_resource_name)
{
-    if (resource_name.empty())
+    if (read_resource_name.empty() && write_resource_name.empty())
        return settings;
    if (auto query_context = CurrentThread::getQueryContext())
    {
        Settings result(settings);
-        result.resource_link = query_context->getWorkloadClassifier()->get(resource_name);
+        if (!read_resource_name.empty())
+            result.io_scheduling.read_resource_link = query_context->getWorkloadClassifier()->get(read_resource_name);
+        if (!write_resource_name.empty())
+            result.io_scheduling.write_resource_link = query_context->getWorkloadClassifier()->get(write_resource_name);
        return result;
    }
    return settings;
@@ -500,7 +503,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
    return object_storage->readObjects(
        storage_objects,
-        updateResourceLink(settings, getReadResourceName()),
+        updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName()),
        read_hint,
        file_size);
}
@@ -513,7 +516,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
{
    LOG_TEST(log, "Write file: {}", path);

-    WriteSettings write_settings = updateResourceLink(settings, getWriteResourceName());
+    WriteSettings write_settings = updateIOSchedulingSettings(settings, getReadResourceName(), getWriteResourceName());
    auto transaction = createObjectStorageTransaction();
    return transaction->writeFile(path, buf_size, mode, write_settings);
}

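For reference, a self-contained rendering of the resolution flow above: both disk-level resource names are resolved through the query's workload classifier in one pass, so a single settings object can carry links for both directions (all types here are simplified stand-ins; only the names and control flow follow the diff):

#include <memory>
#include <string>

struct ResourceLink {};
struct IOSchedulingSettings { ResourceLink read_resource_link, write_resource_link; };
struct Settings { IOSchedulingSettings io_scheduling; };

struct Classifier { ResourceLink get(const std::string &) { return {}; } };
struct QueryContext
{
    std::shared_ptr<Classifier> getWorkloadClassifier() { return std::make_shared<Classifier>(); }
};

Settings updateIOSchedulingSettings(
    const Settings & settings, QueryContext * query_context,
    const std::string & read_resource_name, const std::string & write_resource_name)
{
    if (read_resource_name.empty() && write_resource_name.empty())
        return settings;  // nothing to resolve
    if (!query_context)
        return settings;  // no query scope, keep defaults
    Settings result(settings);
    if (!read_resource_name.empty())
        result.io_scheduling.read_resource_link = query_context->getWorkloadClassifier()->get(read_resource_name);
    if (!write_resource_name.empty())
        result.io_scheduling.write_resource_link = query_context->getWorkloadClassifier()->get(write_resource_name);
    return result;
}

This also explains why readFile() now passes getWriteResourceName() as well: the resolved settings object describes both directions, whichever buffer ends up using it.
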
View File

@ -6,7 +6,6 @@
#include <IO/ReadBufferFromIStream.h> #include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromS3.h> #include <IO/ReadBufferFromS3.h>
#include <Common/Scheduler/ResourceGuard.h>
#include <IO/S3/getObjectInfo.h> #include <IO/S3/getObjectInfo.h>
#include <IO/S3/Requests.h> #include <IO/S3/Requests.h>
@ -423,22 +422,13 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3InitMicroseconds); ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3InitMicroseconds);
// We do not know in advance how many bytes we are going to consume; to avoid blocking, estimate it from below // We do not know in advance how many bytes we are going to consume; to avoid blocking, estimate it from below
constexpr ResourceCost estimated_cost = 1; CurrentThread::IOScope io_scope(read_settings.io_scheduling);
ResourceGuard rlock(read_settings.resource_link, estimated_cost);
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req); Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
rlock.unlock();
if (outcome.IsSuccess()) if (outcome.IsSuccess())
{
ResourceCost bytes_read = outcome.GetResult().GetContentLength();
read_settings.resource_link.adjust(estimated_cost, bytes_read);
return outcome.GetResultWithOwnership(); return outcome.GetResultWithOwnership();
}
else else
{ {
read_settings.resource_link.accumulate(estimated_cost);
const auto & error = outcome.GetError(); const auto & error = outcome.GetError();
throw S3Exception(error.GetMessage(), error.GetErrorType()); throw S3Exception(error.GetMessage(), error.GetErrorType());
} }

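CurrentThread::IOScope replaces the per-request ResourceGuard here, which removes the estimated-cost guesswork above. A sketch of the assumed semantics, an RAII scope that publishes the query's links in thread-local state so byte accounting can happen at the socket layer, where the real transfer size is known (stand-in types; the actual class may differ):

#include <cstddef>

struct ResourceLink {};
struct IOSchedulingSettings { ResourceLink read_resource_link, write_resource_link; };

// Thread-local slot read by lower layers while GetObject(req) runs.
thread_local const IOSchedulingSettings * current_io_scheduling = nullptr;

class IOScope
{
public:
    explicit IOScope(const IOSchedulingSettings & s) : prev(current_io_scheduling)
    {
        current_io_scheduling = &s;  // install for the duration of the request
    }
    ~IOScope() { current_io_scheduling = prev; }  // restore on scope exit

private:
    const IOSchedulingSettings * prev;
};
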
View File

@ -118,8 +118,7 @@ struct ReadSettings
ThrottlerPtr remote_throttler; ThrottlerPtr remote_throttler;
ThrottlerPtr local_throttler; ThrottlerPtr local_throttler;
// Resource to be used during reading IOSchedulingSettings io_scheduling;
ResourceLink resource_link;
size_t http_max_tries = 10; size_t http_max_tries = 10;
size_t http_retry_initial_backoff_ms = 100; size_t http_retry_initial_backoff_ms = 100;

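Both ReadSettings and WriteSettings (below) swap their single ResourceLink for this grouped member. Inferred from the io_scheduling.read_resource_link / io_scheduling.write_resource_link accesses elsewhere in this commit, the struct plausibly looks like the following sketch (not the real header):

struct ResourceLink {};  // opaque scheduler handle, as before

struct IOSchedulingSettings
{
    ResourceLink read_resource_link;   // charged for bytes received
    ResourceLink write_resource_link;  // charged for bytes sent
};
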
View File

@ -11,7 +11,6 @@
#include <Common/Throttler.h> #include <Common/Throttler.h>
#include <Interpreters/Cache/FileCache.h> #include <Interpreters/Cache/FileCache.h>
#include <Common/Scheduler/ResourceGuard.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/S3Common.h> #include <IO/S3Common.h>
#include <IO/S3/Requests.h> #include <IO/S3/Requests.h>
@ -558,12 +557,11 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data)
auto & request = std::get<0>(*worker_data); auto & request = std::get<0>(*worker_data);
ResourceCost cost = request.GetContentLength(); CurrentThread::IOScope io_scope(write_settings.io_scheduling);
ResourceGuard rlock(write_settings.resource_link, cost);
Stopwatch watch; Stopwatch watch;
auto outcome = client_ptr->UploadPart(request); auto outcome = client_ptr->UploadPart(request);
watch.stop(); watch.stop();
rlock.unlock(); // Avoid acquiring other locks under resource lock
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
@ -577,7 +575,6 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data)
if (!outcome.IsSuccess()) if (!outcome.IsSuccess())
{ {
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1); ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType()); throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
} }
@ -715,12 +712,11 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data
if (client_ptr->isClientForDisk()) if (client_ptr->isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3PutObject); ProfileEvents::increment(ProfileEvents::DiskS3PutObject);
ResourceCost cost = request.GetContentLength(); CurrentThread::IOScope io_scope(write_settings.io_scheduling);
ResourceGuard rlock(write_settings.resource_link, cost);
Stopwatch watch; Stopwatch watch;
auto outcome = client_ptr->PutObject(request); auto outcome = client_ptr->PutObject(request);
watch.stop(); watch.stop();
rlock.unlock();
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (blob_log) if (blob_log)
@ -734,7 +730,6 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data
} }
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1); ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
{ {

View File

@ -13,8 +13,7 @@ struct WriteSettings
ThrottlerPtr remote_throttler; ThrottlerPtr remote_throttler;
ThrottlerPtr local_throttler; ThrottlerPtr local_throttler;
// Resource to be used during reading IOSchedulingSettings io_scheduling;
ResourceLink resource_link;
/// Filesystem cache settings /// Filesystem cache settings
bool enable_filesystem_cache_on_write_operations = false; bool enable_filesystem_cache_on_write_operations = false;

View File

@ -821,6 +821,19 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
{ {
properties.indices = as_storage_metadata->getSecondaryIndices(); properties.indices = as_storage_metadata->getSecondaryIndices();
properties.projections = as_storage_metadata->getProjections().clone(); properties.projections = as_storage_metadata->getProjections().clone();
/// CREATE TABLE AS should copy PRIMARY KEY, ORDER BY, and similar clauses.
if (!create.storage->primary_key && as_storage_metadata->isPrimaryKeyDefined() && as_storage_metadata->hasPrimaryKey())
create.storage->set(create.storage->primary_key, as_storage_metadata->getPrimaryKeyAST()->clone());
if (!create.storage->partition_by && as_storage_metadata->isPartitionKeyDefined() && as_storage_metadata->hasPartitionKey())
create.storage->set(create.storage->partition_by, as_storage_metadata->getPartitionKeyAST()->clone());
if (!create.storage->order_by && as_storage_metadata->isSortingKeyDefined() && as_storage_metadata->hasSortingKey())
create.storage->set(create.storage->order_by, as_storage_metadata->getSortingKeyAST()->clone());
if (!create.storage->sample_by && as_storage_metadata->isSamplingKeyDefined() && as_storage_metadata->hasSamplingKey())
create.storage->set(create.storage->sample_by, as_storage_metadata->getSamplingKeyAST()->clone());
} }
else else
{ {

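The four copies above share one shape: fill a clause only when the CREATE statement leaves it unset and the source table's metadata defines it, cloning the AST so the two tables' metadata cannot alias each other. A hypothetical helper distilling the pattern (AST and set() are simplified stand-ins for the real IAST/ASTStorage machinery):

#include <memory>

struct AST
{
    std::shared_ptr<AST> clone() const { return std::make_shared<AST>(*this); }
};
using ASTPtr = std::shared_ptr<AST>;

struct StorageAST
{
    AST * primary_key = nullptr;  // one of several clause slots
    ASTPtr owned;                 // the real set() re-parents the child instead
    void set(AST *& field, const ASTPtr & child) { owned = child; field = owned.get(); }
};

// Copy-if-absent: an explicit clause in the CREATE query always wins.
void copyClauseIfAbsent(StorageAST & storage, AST *& clause, bool defined_in_source, const ASTPtr & source_ast)
{
    if (!clause && defined_in_source && source_ast)
        storage.set(clause, source_ast->clone());
}
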
View File

@ -494,6 +494,12 @@ JoinClausesAndActions buildJoinClausesAndActions(
necessary_names.push_back(name); necessary_names.push_back(name);
}; };
bool is_join_with_special_storage = false;
if (const auto * right_table_node = join_node.getRightTableExpression()->as<TableNode>())
{
is_join_with_special_storage = dynamic_cast<const StorageJoin *>(right_table_node->getStorage().get());
}
for (auto & join_clause : result.join_clauses) for (auto & join_clause : result.join_clauses)
{ {
const auto & left_filter_condition_nodes = join_clause.getLeftFilterConditionNodes(); const auto & left_filter_condition_nodes = join_clause.getLeftFilterConditionNodes();
@ -561,7 +567,7 @@ JoinClausesAndActions buildJoinClausesAndActions(
if (!left_key_node->result_type->equals(*common_type)) if (!left_key_node->result_type->equals(*common_type))
left_key_node = &left_join_actions.addCast(*left_key_node, common_type, {}); left_key_node = &left_join_actions.addCast(*left_key_node, common_type, {});
if (!right_key_node->result_type->equals(*common_type)) if (!is_join_with_special_storage && !right_key_node->result_type->equals(*common_type))
right_key_node = &right_join_actions.addCast(*right_key_node, common_type, {}); right_key_node = &right_join_actions.addCast(*right_key_node, common_type, {});
} }

View File

@ -119,27 +119,16 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
return false; return false;
} }
ResourceGuard rlock(read_settings.resource_link, num_bytes_to_read); ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, num_bytes_to_read);
int bytes_read; int bytes_read = hdfsRead(fs.get(), fin, internal_buffer.begin(), safe_cast<int>(num_bytes_to_read));
try rlock.unlock(std::max(0, bytes_read));
{
bytes_read = hdfsRead(fs.get(), fin, internal_buffer.begin(), safe_cast<int>(num_bytes_to_read));
}
catch (...)
{
read_settings.resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure
throw;
}
rlock.unlock();
if (bytes_read < 0) if (bytes_read < 0)
{ {
read_settings.resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure
throw Exception(ErrorCodes::NETWORK_ERROR, throw Exception(ErrorCodes::NETWORK_ERROR,
"Fail to read from HDFS: {}, file path: {}. Error: {}", "Fail to read from HDFS: {}, file path: {}. Error: {}",
hdfs_uri, hdfs_file_path, std::string(hdfsGetLastError())); hdfs_uri, hdfs_file_path, std::string(hdfsGetLastError()));
} }
read_settings.resource_link.adjust(num_bytes_to_read, bytes_read);
if (bytes_read) if (bytes_read)
{ {

View File

@ -66,25 +66,12 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
int write(const char * start, size_t size) int write(const char * start, size_t size)
{ {
ResourceGuard rlock(write_settings.resource_link, size); ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, size);
int bytes_written; int bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast<int>(size));
try rlock.unlock(std::max(0, bytes_written));
{
bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast<int>(size));
}
catch (...)
{
write_settings.resource_link.accumulate(size); // We assume no resource was used in case of failure
throw;
}
rlock.unlock();
if (bytes_written < 0) if (bytes_written < 0)
{
write_settings.resource_link.accumulate(size); // We assume no resource was used in case of failure
throw Exception(ErrorCodes::NETWORK_ERROR, "Fail to write HDFS file: {} {}", hdfs_uri, std::string(hdfsGetLastError())); throw Exception(ErrorCodes::NETWORK_ERROR, "Fail to write HDFS file: {} {}", hdfs_uri, std::string(hdfsGetLastError()));
}
write_settings.resource_link.adjust(size, bytes_written);
if (write_settings.remote_throttler) if (write_settings.remote_throttler)
write_settings.remote_throttler->add(bytes_written, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds); write_settings.remote_throttler->add(bytes_written, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds);

View File

@ -221,14 +221,17 @@ void StorageReplicatedMergeTree::setZooKeeper()
/// strange effects. So we always use only one session for all tables. /// strange effects. So we always use only one session for all tables.
/// (excluding auxiliary zookeepers) /// (excluding auxiliary zookeepers)
std::lock_guard lock(current_zookeeper_mutex);
if (zookeeper_name == default_zookeeper_name) if (zookeeper_name == default_zookeeper_name)
{ {
current_zookeeper = getContext()->getZooKeeper(); auto new_keeper = getContext()->getZooKeeper();
std::lock_guard lock(current_zookeeper_mutex);
current_zookeeper = new_keeper;
} }
else else
{ {
current_zookeeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name); auto new_keeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
std::lock_guard lock(current_zookeeper_mutex);
current_zookeeper = new_keeper;
} }
} }
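
The reordering above keeps session construction, which may block on connection establishment, outside the mutex; only the pointer swap is guarded. The pattern in isolation (hypothetical connect function, simplified types):

#include <memory>
#include <mutex>

struct ZooKeeper {};
using ZooKeeperPtr = std::shared_ptr<ZooKeeper>;

std::mutex current_zookeeper_mutex;
ZooKeeperPtr current_zookeeper;

ZooKeeperPtr connectToKeeper()  // may block for seconds on a bad network
{
    return std::make_shared<ZooKeeper>();
}

void setZooKeeperSketch()
{
    auto new_keeper = connectToKeeper();            // slow part runs lock-free
    std::lock_guard lock(current_zookeeper_mutex);  // short critical section
    current_zookeeper = new_keeper;                 // cheap shared_ptr swap
}
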
@ -365,7 +368,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_name); bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_name);
if (has_zookeeper) if (has_zookeeper)
{ {
/// It's possible for getZooKeeper() to timeout if zookeeper host(s) can't /// It's possible for getZooKeeper() to timeout if zookeeper host(s) can't
/// be reached. In such cases Poco::Exception is thrown after a connection /// be reached. In such cases Poco::Exception is thrown after a connection
/// timeout - refer to src/Common/ZooKeeper/ZooKeeperImpl.cpp:866 for more info. /// timeout - refer to src/Common/ZooKeeper/ZooKeeperImpl.cpp:866 for more info.
/// ///

View File

@ -288,7 +288,7 @@ def generate_description(item: PullRequest, repo: Repository) -> Optional[Descri
# Normalize bug fixes # Normalize bug fixes
if ( if (
re.match( re.match(
r".*(?i)bug\Wfix", r"(?i).*bug\Wfix",
category, category,
) )
# Map "Critical Bug Fix" to "Bug fix" category for changelog # Map "Critical Bug Fix" to "Bug fix" category for changelog

View File

@ -46,7 +46,7 @@ def test_cgroup_cpu_limit():
"clickhouse local -q \"select value from system.settings where name='max_threads'\"", "clickhouse local -q \"select value from system.settings where name='max_threads'\"",
num_cpus, num_cpus,
) )
expect_output = (r"auto({})".format(math.ceil(num_cpus))).encode() expect_output = (r"\'auto({})\'".format(math.ceil(num_cpus))).encode()
assert ( assert (
result.strip() == expect_output result.strip() == expect_output
), f"fail for cpu limit={num_cpus}, result={result.strip()}, expect={expect_output}" ), f"fail for cpu limit={num_cpus}, result={result.strip()}, expect={expect_output}"

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="utf-8"?>
<clickhouse>
<logger>
<level>test</level>
</logger>
</clickhouse>

View File

@ -3,6 +3,7 @@
import pytest import pytest
import time import time
import threading import threading
import uuid
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from multiprocessing.dummy import Pool from multiprocessing.dummy import Pool
from helpers.network import PartitionManager from helpers.network import PartitionManager
@ -10,8 +11,12 @@ from helpers.client import QueryRuntimeException
from helpers.test_tools import assert_eq_with_retry from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
node1 = cluster.add_instance("node1", with_zookeeper=True) "node1",
main_configs=["configs/storage_conf.xml"],
with_zookeeper=True,
with_minio=True,
)
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
@ -25,10 +30,16 @@ def started_cluster():
cluster.shutdown() cluster.shutdown()
def test_replica_inserts_with_keeper_restart(started_cluster): @pytest.mark.parametrize(
"engine,storage_policy",
[
("ReplicatedMergeTree", "default"),
],
)
def test_replica_inserts_with_keeper_restart(started_cluster, engine, storage_policy):
try: try:
node1.query( node1.query(
"CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '0') ORDER BY tuple()" f"CREATE TABLE r (a UInt64, b String) ENGINE={engine}('/test/r', '0') ORDER BY tuple() SETTINGS storage_policy='{storage_policy}'"
) )
p = Pool(1) p = Pool(1)
@ -60,10 +71,18 @@ def test_replica_inserts_with_keeper_restart(started_cluster):
node1.query("DROP TABLE IF EXISTS r SYNC") node1.query("DROP TABLE IF EXISTS r SYNC")
def test_replica_inserts_with_keeper_disconnect(started_cluster): @pytest.mark.parametrize(
"engine,storage_policy",
[
("ReplicatedMergeTree", "default"),
],
)
def test_replica_inserts_with_keeper_disconnect(
started_cluster, engine, storage_policy
):
try: try:
node1.query( node1.query(
"CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '0') ORDER BY tuple()" f"CREATE TABLE r2 (a UInt64, b String) ENGINE={engine}('/test/r2', '0') ORDER BY tuple() SETTINGS storage_policy='{storage_policy}'"
) )
p = Pool(1) p = Pool(1)
@ -84,26 +103,32 @@ def test_replica_inserts_with_keeper_disconnect(started_cluster):
disconnect_event.wait(90) disconnect_event.wait(90)
node1.query( node1.query(
"INSERT INTO r SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=20" "INSERT INTO r2 SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=20"
) )
node1.query( node1.query(
"INSERT INTO r SELECT number, toString(number) FROM numbers(10, 10) SETTINGS insert_keeper_max_retries=20" "INSERT INTO r2 SELECT number, toString(number) FROM numbers(10, 10) SETTINGS insert_keeper_max_retries=20"
) )
job.wait() job.wait()
p.close() p.close()
p.join() p.join()
assert node1.query("SELECT COUNT() FROM r") == "20\n" assert node1.query("SELECT COUNT() FROM r2") == "20\n"
finally: finally:
node1.query("DROP TABLE IF EXISTS r SYNC") node1.query("DROP TABLE IF EXISTS r2 SYNC")
def test_query_timeout_with_zk_down(started_cluster): @pytest.mark.parametrize(
"engine,storage_policy",
[
("ReplicatedMergeTree", "default"),
],
)
def test_query_timeout_with_zk_down(started_cluster, engine, storage_policy):
try: try:
node1.query( node1.query(
"CREATE TABLE zk_down (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/zk_down', '0') ORDER BY tuple()" f"CREATE TABLE zk_down (a UInt64, b String) ENGINE={engine}('/test/zk_down', '0') ORDER BY tuple() SETTINGS storage_policy='{storage_policy}'"
) )
cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"]) cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
@ -118,3 +143,45 @@ def test_query_timeout_with_zk_down(started_cluster):
finally: finally:
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"]) cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
node1.query("DROP TABLE IF EXISTS zk_down SYNC") node1.query("DROP TABLE IF EXISTS zk_down SYNC")
@pytest.mark.parametrize(
"engine,storage_policy",
[
("ReplicatedMergeTree", "default"),
],
)
def test_retries_should_not_wait_for_global_connection(
started_cluster, engine, storage_policy
):
pm = PartitionManager()
try:
node1.query(
f"CREATE TABLE zk_down_retries (a UInt64, b String) ENGINE={engine}('/test/zk_down', '0') ORDER BY tuple() SETTINGS storage_policy='{storage_policy}'"
)
cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
# Apart from stopping keepers, we introduce a network delay to make connection retries slower
# We want to check that retries are not blocked during that time
pm.add_network_delay(node1, 1000)
query_id = uuid.uuid4()
with pytest.raises(QueryRuntimeException):
node1.query(
"INSERT INTO zk_down_retries SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=10, insert_keeper_retry_max_backoff_ms=100",
query_id=str(query_id),
)
pm.heal_all()
# Use query_log for execution time, since we want to ignore the network delay we introduced (it also affects the client)
node1.query("SYSTEM FLUSH LOGS")
res = node1.query(
f"SELECT query_duration_ms FROM system.query_log WHERE type != 'QueryStart' AND query_id = '{query_id}'"
)
query_duration = int(res)
# It should be around 1 second; 5 seconds is generous (debug builds and so on). It used to take 35 seconds without the fix
assert query_duration < 5000
finally:
pm.heal_all()
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
node1.query("DROP TABLE IF EXISTS zk_down_retries SYNC")

View File

@ -69,6 +69,124 @@ def update_workloads_config(**settings):
node.query("system reload config") node.query("system reload config")
def check_profile_event_for_query(workload, profile_event, amount=1):
node.query("system flush logs")
query_pattern = f"workload='{workload}'".replace("'", "\\'")
assert (
int(
node.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where query ilike '%{query_pattern}%' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
)
)
== amount
)
def test_s3_resource_request_granularity():
node.query(
f"""
drop table if exists data;
create table data (key UInt64 CODEC(NONE), value String CODEC(NONE)) engine=MergeTree() order by key settings min_bytes_for_wide_part=1e9, storage_policy='s3';
"""
)
total_bytes = 50000000 # Approximate data size
max_bytes_per_request = 2000000 # Should be ~1MB or less in general
min_bytes_per_request = 6000 # Small requests are fine, but we don't want to hurt performance with overly frequent resource requests
writes_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/admin'"
).strip()
)
write_bytes_before = int(
node.query(
f"select dequeued_cost from system.scheduler where resource='network_write' and path='/prio/admin'"
).strip()
)
write_budget_before = int(
node.query(
f"select budget from system.scheduler where resource='network_write' and path='/prio/admin'"
).strip()
)
node.query(
f"insert into data select number, randomString(10000000) from numbers(5) SETTINGS workload='admin'"
)
writes_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/admin'"
).strip()
)
write_bytes_after = int(
node.query(
f"select dequeued_cost from system.scheduler where resource='network_write' and path='/prio/admin'"
).strip()
)
write_budget_after = int(
node.query(
f"select budget from system.scheduler where resource='network_write' and path='/prio/admin'"
).strip()
)
write_requests = writes_after - writes_before
write_bytes = (write_bytes_after - write_bytes_before) - (
write_budget_after - write_budget_before
)
assert write_bytes > 1.0 * total_bytes
assert write_bytes < 1.05 * total_bytes
assert write_bytes / write_requests < max_bytes_per_request
assert write_bytes / write_requests > min_bytes_per_request
check_profile_event_for_query("admin", "SchedulerIOWriteRequests", write_requests)
check_profile_event_for_query("admin", "SchedulerIOWriteBytes", write_bytes)
node.query(f"optimize table data final")
reads_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/admin'"
).strip()
)
read_bytes_before = int(
node.query(
f"select dequeued_cost from system.scheduler where resource='network_read' and path='/prio/admin'"
).strip()
)
read_budget_before = int(
node.query(
f"select budget from system.scheduler where resource='network_read' and path='/prio/admin'"
).strip()
)
node.query(
f"select count() from data where not ignore(*) SETTINGS workload='admin'"
)
reads_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/admin'"
).strip()
)
read_bytes_after = int(
node.query(
f"select dequeued_cost from system.scheduler where resource='network_read' and path='/prio/admin'"
).strip()
)
read_budget_after = int(
node.query(
f"select budget from system.scheduler where resource='network_read' and path='/prio/admin'"
).strip()
)
read_bytes = (read_bytes_after - read_bytes_before) - (
read_budget_after - read_budget_before
)
read_requests = reads_after - reads_before
assert read_bytes > 1.0 * total_bytes
assert read_bytes < 1.05 * total_bytes
assert read_bytes / read_requests < max_bytes_per_request
assert read_bytes / read_requests > min_bytes_per_request
check_profile_event_for_query("admin", "SchedulerIOReadRequests", read_requests)
check_profile_event_for_query("admin", "SchedulerIOReadBytes", read_bytes)
def test_s3_disk(): def test_s3_disk():
node.query( node.query(
f""" f"""

View File

@ -0,0 +1,70 @@
-------------- Test copy sorting clauses from source table --------------
CREATE TABLE default.x
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS index_granularity = 8192
-------------------------------------------------------------------------
CREATE TABLE default.x_as
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1, index_granularity = 8192
-------------- Test copy sorting clauses from destination table (source table without the same type clauses) --------------
CREATE TABLE default.x
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree
PRIMARY KEY (CounterID, EventDate, intHash32(UserID))
ORDER BY (CounterID, EventDate, intHash32(UserID))
SETTINGS index_granularity = 8192
-------------------------------------------------------------------------
CREATE TABLE default.x_as
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventDate)
PRIMARY KEY (CounterID, EventDate, intHash32(UserID))
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1, index_granularity = 8192
-------------- Test copy sorting clauses from destination table (source table with the same type clauses) --------------
CREATE TABLE default.x
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree
ORDER BY CounterID
SETTINGS index_granularity = 8192
-------------------------------------------------------------------------
CREATE TABLE default.x_as
(
`CounterID` UInt32,
`EventDate` Date,
`UserID` UInt64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1, index_granularity = 8192

View File

@ -0,0 +1,37 @@
DROP TABLE IF EXISTS x;
DROP TABLE IF EXISTS x_as;
SELECT '-------------- Test copy sorting clauses from source table --------------';
CREATE TABLE x (`CounterID` UInt32, `EventDate` Date, `UserID` UInt64) ENGINE = MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID);
CREATE TABLE x_as AS x ENGINE = MergeTree SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1;
SHOW CREATE TABLE x FORMAT TSVRaw;
SELECT '-------------------------------------------------------------------------';
SHOW CREATE TABLE x_as FORMAT TSVRaw;
DROP TABLE x;
DROP TABLE x_as;
SELECT '-------------- Test copy sorting clauses from destination table (source table without the same type clauses) --------------';
CREATE TABLE x (`CounterID` UInt32, `EventDate` Date, `UserID` UInt64) ENGINE = MergeTree PRIMARY KEY (CounterID, EventDate, intHash32(UserID));
CREATE TABLE x_as AS x ENGINE = MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1;
SHOW CREATE TABLE x FORMAT TSVRaw;
SELECT '-------------------------------------------------------------------------';
SHOW CREATE TABLE x_as FORMAT TSVRaw;
DROP TABLE x;
DROP TABLE x_as;
SELECT '-------------- Test copy sorting clauses from destination table (source table with the same type clauses) --------------';
CREATE TABLE x (`CounterID` UInt32, `EventDate` Date, `UserID` UInt64) ENGINE = MergeTree ORDER BY (CounterID);
CREATE TABLE x_as AS x ENGINE = MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1;
SHOW CREATE TABLE x FORMAT TSVRaw;
SELECT '-------------------------------------------------------------------------';
SHOW CREATE TABLE x_as FORMAT TSVRaw;
DROP TABLE x;
DROP TABLE x_as;

View File

@ -0,0 +1,23 @@
-----
1 1 1 a 1 A 1 A
2 2 2 b 2 B 2 B
-----
\N \N \N 0 3 B
1 1 1 a 1 A 1 A
2 2 2 b 2 B 2 B
-----
1 1 1 a 1 A 1 A
2 2 2 b 2 B 2 B
\N \N \N \N 3 B \N \N
\N \N \N \N \N \N 3 B
-----
\N \N \N 3 3 B 0 0
\N \N \N 0 0 3 3 B
1 1 1 a 1 1 A 1 1 A
2 2 2 b 2 2 B 2 2 B
-----
3 3 \N B B
1 1 1 a A A
2 2 2 b B B
-----
7

View File

@ -0,0 +1,84 @@
#!/usr/bin/env -S ${HOME}/clickhouse-client --queries-file
DROP TABLE IF EXISTS tab;
CREATE TABLE tab ( `k` Nullable(UInt32), `k1` Nullable(UInt32), `k2` Nullable(UInt32), `v` String ) ENGINE = Memory;
INSERT INTO tab VALUES (1, 1, 1, 'a'), (2, 2, 2, 'b');
DROP TABLE IF EXISTS mem;
CREATE TABLE mem ( `k` UInt64, `v` String ) ENGINE = Join(ANY, LEFT, k);
INSERT INTO mem VALUES (1, 'A'), (2, 'B'), (3, 'B');
DROP TABLE IF EXISTS mem2;
CREATE TABLE mem2 ( `k` UInt64, `v` String ) ENGINE = Join(ANY, RIGHT, k);
INSERT INTO mem2 VALUES (1, 'A'), (2, 'B'), (3, 'B');
DROP TABLE IF EXISTS mem3;
CREATE TABLE mem3 ( `k` UInt64, `v` String ) ENGINE = Join(ALL, FULL, k) SETTINGS join_use_nulls = 1;
INSERT INTO mem3 VALUES (1, 'A'), (2, 'B'), (3, 'B');
DROP TABLE IF EXISTS mem4;
CREATE TABLE mem4 ( `k1` UInt64, `k2` UInt64, `v` String ) ENGINE = Join(ALL, FULL, k1, k2);
INSERT INTO mem4 VALUES (1, 1, 'A'), (2, 2, 'B'), (3, 3, 'B');
SET allow_experimental_analyzer = 1;
SELECT '-----';
SELECT *
FROM tab
ANY LEFT JOIN mem ON k1 = mem.k
ANY LEFT JOIN mem AS t ON k2 = t.k
ORDER BY tab.v
;
SELECT '-----';
SELECT *
FROM tab
ANY LEFT JOIN mem ON k1 = mem.k
ANY RIGHT JOIN mem2 ON k2 = mem2.k
ORDER BY tab.v
;
SELECT '-----';
SELECT *
FROM tab
FULL JOIN mem3 AS t1 ON k1 = t1.k
FULL JOIN mem3 AS t2 ON k2 = t2.k
ORDER BY tab.v
SETTINGS join_use_nulls = 1
;
SELECT '-----';
SELECT *
FROM tab
FULL JOIN mem4 AS t1 ON tab.k1 = t1.k1 AND tab.k2 = t1.k2
FULL JOIN mem4 AS t2 ON tab.k1 = t2.k1 AND tab.k2 = t2.k2
ORDER BY tab.v
;
SELECT '-----';
SELECT *
FROM tab
FULL JOIN mem4 AS t1 USING (k1, k2)
FULL JOIN mem4 AS t2 USING (k1, k2)
ORDER BY tab.v
;
SELECT '-----';
SELECT count() FROM (
EXPLAIN PLAN
SELECT * FROM tab
ANY LEFT JOIN mem AS t1 ON tab.k = t1.k
ANY LEFT JOIN mem AS t2 ON tab.k = t2.k
ANY LEFT JOIN mem AS t3 ON tab.k = t3.k
ANY LEFT JOIN mem AS t4 ON tab.k = t4.k
ANY RIGHT JOIN mem2 AS t5 ON tab.k = t5.k
ANY LEFT JOIN mem AS t6 ON tab.k = t6.k
ANY LEFT JOIN mem AS t7 ON tab.k = t7.k
)
WHERE explain like '%FilledJoin%'
;

View File

@ -1,2 +1,3 @@
{"a":"42"} JSON {"a":"42"} JSON
{"a":42} Object(\'json\') {"a":42} Object(\'json\')
{"a":"42"} JSON

View File

@ -6,3 +6,6 @@ set use_json_alias_for_old_object_type=1;
select '{"a" : 42}'::JSON as json, toTypeName(json); select '{"a" : 42}'::JSON as json, toTypeName(json);
select '{"a" : 42}'::JSON(max_dynamic_paths=100) as json, toTypeName(json); -- {serverError BAD_ARGUMENTS} select '{"a" : 42}'::JSON(max_dynamic_paths=100) as json, toTypeName(json); -- {serverError BAD_ARGUMENTS}
set allow_experimental_object_type = 0;
select materialize('{"a" : 42}')::JSON as json, toTypeName(json);

View File

@ -4,10 +4,12 @@ v24.7.4.51-stable 2024-08-23
v24.7.3.42-stable 2024-08-08 v24.7.3.42-stable 2024-08-08
v24.7.2.13-stable 2024-08-01 v24.7.2.13-stable 2024-08-01
v24.7.1.2915-stable 2024-07-30 v24.7.1.2915-stable 2024-07-30
v24.6.5.30-stable 2024-09-03
v24.6.4.42-stable 2024-08-23 v24.6.4.42-stable 2024-08-23
v24.6.3.95-stable 2024-08-06 v24.6.3.95-stable 2024-08-06
v24.6.2.17-stable 2024-07-05 v24.6.2.17-stable 2024-07-05
v24.6.1.4423-stable 2024-07-01 v24.6.1.4423-stable 2024-07-01
v24.5.7.31-stable 2024-09-03
v24.5.6.45-stable 2024-08-23 v24.5.6.45-stable 2024-08-23
v24.5.5.78-stable 2024-08-05 v24.5.5.78-stable 2024-08-05
v24.5.4.49-stable 2024-07-01 v24.5.4.49-stable 2024-07-01
@ -18,6 +20,7 @@ v24.4.4.113-stable 2024-08-02
v24.4.3.25-stable 2024-06-14 v24.4.3.25-stable 2024-06-14
v24.4.2.141-stable 2024-06-07 v24.4.2.141-stable 2024-06-07
v24.4.1.2088-stable 2024-05-01 v24.4.1.2088-stable 2024-05-01
v24.3.10.33-lts 2024-09-03
v24.3.9.5-lts 2024-08-22 v24.3.9.5-lts 2024-08-22
v24.3.8.13-lts 2024-08-20 v24.3.8.13-lts 2024-08-20
v24.3.7.30-lts 2024-08-14 v24.3.7.30-lts 2024-08-14
