Merge branch 'ClickHouse:master' into 59557_form_input_format

Shaun Struwig authored on 2024-04-09 18:11:06 +02:00; committed by GitHub
commit d5fe715b3b
264 changed files with 3255 additions and 549 deletions

View File

@ -119,7 +119,6 @@ Checks: [
'-readability-named-parameter',
'-readability-redundant-declaration',
'-readability-simplify-boolean-expr',
'-readability-static-accessed-through-instance',
'-readability-suspicious-call-argument',
'-readability-uppercase-literal-suffix',
'-readability-use-anyofallof',

View File

@ -157,7 +157,7 @@ jobs:
################################# Stage Final #################################
#
FinishCheck:
if: ${{ !failure() && !cancelled() }}
if: ${{ !failure() && !cancelled() && github.event_name != 'merge_group' }}
needs: [Tests_1, Tests_2]
runs-on: [self-hosted, style-checker]
steps:

View File

@ -213,6 +213,19 @@ namespace Net
Poco::Timespan getKeepAliveTimeout() const;
/// Returns the keep-alive timeout for HTTP connections.
void setKeepAliveMaxRequests(int max_requests);
int getKeepAliveMaxRequests() const;
int getKeepAliveRequest() const;
bool isKeepAliveExpired(double reliability = 1.0) const;
/// Returns whether the connection has expired, with a safety margin given as a fraction (reliability) of the keep-alive timeout
double getKeepAliveReliability() const;
/// Returns the fraction of the keep-alive timeout within which a connection is considered safe to use
/// It helps to avoid the situation where a client uses a nearly expired connection and receives NoMessageException
virtual std::ostream & sendRequest(HTTPRequest & request);
/// Sends the header for the given HTTP request to
/// the server.
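
The reliability parameter above shrinks the usable keep-alive window. Below is a minimal sketch of the intended check, under the assumptions spelled out in the comments; the helper name is hypothetical, and the actual implementation lands in HTTPClientSession.cpp further down:

```cpp
// Sketch only: a connection counts as expired once `reliability * timeout`
// has elapsed since the last request, or once the request budget is spent.
// With reliability = 0.9, a 10s keep-alive timeout is trusted for only 9s.
bool isKeepAliveExpiredSketch(
    double reliability, long timeout_us, long idle_us,
    int current_request, int max_requests)
{
    return idle_us >= static_cast<long>(reliability * timeout_us)
        || current_request > max_requests;
}
```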
@ -345,6 +358,8 @@ namespace Net
void assign(HTTPClientSession & session);
void setKeepAliveRequest(int request);
HTTPSessionFactory _proxySessionFactory;
/// Factory to create HTTPClientSession to proxy.
private:
@ -353,6 +368,8 @@ namespace Net
Poco::UInt16 _port;
ProxyConfig _proxyConfig;
Poco::Timespan _keepAliveTimeout;
int _keepAliveCurrentRequest = 0;
int _keepAliveMaxRequests = 1000;
Poco::Timestamp _lastRequest;
bool _reconnect;
bool _mustReconnect;
@ -361,6 +378,7 @@ namespace Net
Poco::SharedPtr<std::ostream> _pRequestStream;
Poco::SharedPtr<std::istream> _pResponseStream;
static const double _defaultKeepAliveReliabilityLevel;
static ProxyConfig _globalProxyConfig;
HTTPClientSession(const HTTPClientSession &);
@ -450,9 +468,19 @@ namespace Net
return _lastRequest;
}
inline void HTTPClientSession::setLastRequest(Poco::Timestamp time)
inline double HTTPClientSession::getKeepAliveReliability() const
{
_lastRequest = time;
return _defaultKeepAliveReliabilityLevel;
}
inline int HTTPClientSession::getKeepAliveMaxRequests() const
{
return _keepAliveMaxRequests;
}
inline int HTTPClientSession::getKeepAliveRequest() const
{
return _keepAliveCurrentRequest;
}
}

View File

@ -120,6 +120,10 @@ namespace Net
/// The value is set to "Keep-Alive" if keepAlive is
/// true, or to "Close" otherwise.
void setKeepAliveTimeout(int timeout, int max_requests);
int getKeepAliveTimeout() const;
int getKeepAliveMaxRequests() const;
bool getKeepAlive() const;
/// Returns true if
/// * the message has a Connection header field and its value is "Keep-Alive"

View File

@ -44,7 +44,7 @@ namespace Net
/// - timeout: 60 seconds
/// - keepAlive: true
/// - maxKeepAliveRequests: 0
/// - keepAliveTimeout: 10 seconds
/// - keepAliveTimeout: 15 seconds
void setServerName(const std::string & serverName);
/// Sets the name and port (name:port) that the server uses to identify itself.

View File

@ -56,6 +56,8 @@ namespace Net
SocketAddress serverAddress();
/// Returns the server's address.
void setKeepAliveTimeout(Poco::Timespan keepAliveTimeout);
private:
bool _firstRequest;
Poco::Timespan _keepAliveTimeout;

View File

@ -37,6 +37,7 @@ namespace Net {
HTTPClientSession::ProxyConfig HTTPClientSession::_globalProxyConfig;
const double HTTPClientSession::_defaultKeepAliveReliabilityLevel = 0.9;
HTTPClientSession::HTTPClientSession():
@ -220,7 +221,41 @@ void HTTPClientSession::setGlobalProxyConfig(const ProxyConfig& config)
void HTTPClientSession::setKeepAliveTimeout(const Poco::Timespan& timeout)
{
_keepAliveTimeout = timeout;
if (connected())
{
throw Poco::IllegalStateException("cannot change keep alive timeout on an initiated connection; "
"the value is managed privately after the connection is established.");
}
_keepAliveTimeout = timeout;
}
void HTTPClientSession::setKeepAliveMaxRequests(int max_requests)
{
if (connected())
{
throw Poco::IllegalStateException("cannot change keep alive max requests on an initiated connection; "
"the value is managed privately after the connection is established.");
}
_keepAliveMaxRequests = max_requests;
}
void HTTPClientSession::setKeepAliveRequest(int request)
{
_keepAliveCurrentRequest = request;
}
void HTTPClientSession::setLastRequest(Poco::Timestamp time)
{
if (connected())
{
throw Poco::IllegalStateException("cannot change last request on an initiated connection; "
"the value is managed privately after the connection is established.");
}
_lastRequest = time;
}
@ -231,6 +266,8 @@ std::ostream& HTTPClientSession::sendRequest(HTTPRequest& request)
clearException();
_responseReceived = false;
_keepAliveCurrentRequest += 1;
bool keepAlive = getKeepAlive();
if (((connected() && !keepAlive) || mustReconnect()) && !_host.empty())
{
@ -241,8 +278,10 @@ std::ostream& HTTPClientSession::sendRequest(HTTPRequest& request)
{
if (!connected())
reconnect();
if (!keepAlive)
request.setKeepAlive(false);
if (!request.has(HTTPMessage::CONNECTION))
request.setKeepAlive(keepAlive);
if (keepAlive && !request.has(HTTPMessage::CONNECTION_KEEP_ALIVE) && _keepAliveTimeout.totalSeconds() > 0)
request.setKeepAliveTimeout(_keepAliveTimeout.totalSeconds(), _keepAliveMaxRequests);
if (!request.has(HTTPRequest::HOST) && !_host.empty())
request.setHost(_host, _port);
if (!_proxyConfig.host.empty() && !bypassProxy())
@ -324,6 +363,17 @@ std::istream& HTTPClientSession::receiveResponse(HTTPResponse& response)
_mustReconnect = getKeepAlive() && !response.getKeepAlive();
if (!_mustReconnect)
{
/// When the server sends its keep-alive timeout, the client has to follow that value
auto timeout = response.getKeepAliveTimeout();
if (timeout > 0)
_keepAliveTimeout = std::min(_keepAliveTimeout, Poco::Timespan(timeout, 0));
auto max_requests = response.getKeepAliveMaxRequests();
if (max_requests > 0)
_keepAliveMaxRequests = std::min(_keepAliveMaxRequests, max_requests);
}
if (!_expectResponseBody || response.getStatus() < 200 || response.getStatus() == HTTPResponse::HTTP_NO_CONTENT || response.getStatus() == HTTPResponse::HTTP_NOT_MODIFIED)
_pResponseStream = new HTTPFixedLengthInputStream(*this, 0);
else if (response.getChunkedTransferEncoding())
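
The std::min clamping above means the effective keep-alive parameters can only shrink relative to what the client configured. A worked sketch with assumed values:

```cpp
#include <algorithm>
#include <Poco/Timespan.h>

void clampExample()
{
    // Assumed: client configured 30s / 1000 requests, and the server
    // replied with "Keep-Alive: timeout=5, max=100".
    Poco::Timespan keep_alive_timeout(30, 0);
    int keep_alive_max_requests = 1000;

    int server_timeout = 5, server_max = 100;  // parsed from the response
    if (server_timeout > 0)
        keep_alive_timeout = std::min(keep_alive_timeout, Poco::Timespan(server_timeout, 0));
    if (server_max > 0)
        keep_alive_max_requests = std::min(keep_alive_max_requests, server_max);
    // effective limits are now 5 seconds and 100 requests per connection
}
```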
@ -430,15 +480,18 @@ std::string HTTPClientSession::proxyRequestPrefix() const
return result;
}
bool HTTPClientSession::isKeepAliveExpired(double reliability) const
{
Poco::Timestamp now;
return Timespan(Timestamp::TimeDiff(reliability * _keepAliveTimeout.totalMicroseconds())) <= now - _lastRequest
|| _keepAliveCurrentRequest > _keepAliveMaxRequests;
}
bool HTTPClientSession::mustReconnect() const
{
if (!_mustReconnect)
{
Poco::Timestamp now;
return _keepAliveTimeout <= now - _lastRequest;
}
else return true;
return isKeepAliveExpired(_defaultKeepAliveReliabilityLevel);
return true;
}
@ -511,14 +564,21 @@ void HTTPClientSession::assign(Poco::Net::HTTPClientSession & session)
if (buffered())
throw Poco::LogicException("assign to a session with not empty buffered data");
attachSocket(session.detachSocket());
setLastRequest(session.getLastRequest());
poco_assert(!connected());
setResolvedHost(session.getResolvedHost());
setKeepAlive(session.getKeepAlive());
setProxyConfig(session.getProxyConfig());
setTimeout(session.getConnectionTimeout(), session.getSendTimeout(), session.getReceiveTimeout());
setKeepAlive(session.getKeepAlive());
setLastRequest(session.getLastRequest());
setKeepAliveTimeout(session.getKeepAliveTimeout());
setProxyConfig(session.getProxyConfig());
_keepAliveMaxRequests = session._keepAliveMaxRequests;
_keepAliveCurrentRequest = session._keepAliveCurrentRequest;
attachSocket(session.detachSocket());
session.reset();
}

View File

@ -17,6 +17,7 @@
#include "Poco/NumberFormatter.h"
#include "Poco/NumberParser.h"
#include "Poco/String.h"
#include <charconv>
#include <format>
using Poco::NumberFormatter;
@ -179,4 +180,51 @@ bool HTTPMessage::getKeepAlive() const
}
void HTTPMessage::setKeepAliveTimeout(int timeout, int max_requests)
{
add(HTTPMessage::CONNECTION_KEEP_ALIVE, std::format("timeout={}, max={}", timeout, max_requests));
}
int parseFromHeaderValues(const std::string_view header_value, const std::string_view param_name)
{
auto param_value_pos = header_value.find(param_name);
if (param_value_pos == std::string::npos)
param_value_pos = header_value.size();
if (param_value_pos != header_value.size())
param_value_pos += param_name.size();
auto param_value_end = header_value.find(',', param_value_pos);
if (param_value_end == std::string::npos)
param_value_end = header_value.size();
auto timeout_value_substr = header_value.substr(param_value_pos, param_value_end - param_value_pos);
if (timeout_value_substr.empty())
return -1;
int value = 0;
auto [ptr, ec] = std::from_chars(timeout_value_substr.begin(), timeout_value_substr.end(), value);
if (ec == std::errc())
return value;
return -1;
}
int HTTPMessage::getKeepAliveTimeout() const
{
const std::string& ka_header = get(HTTPMessage::CONNECTION_KEEP_ALIVE, HTTPMessage::EMPTY);
static const std::string_view timeout_param = "timeout=";
return parseFromHeaderValues(ka_header, timeout_param);
}
int HTTPMessage::getKeepAliveMaxRequests() const
{
const std::string& ka_header = get(HTTPMessage::CONNECTION_KEEP_ALIVE, HTTPMessage::EMPTY);
static const std::string_view timeout_param = "max=";
return parseFromHeaderValues(ka_header, timeout_param);
}
} } // namespace Poco::Net
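
Taken together, the setter and the parser above form a simple round-trip. A hedged usage sketch, assuming HTTPMessage::CONNECTION_KEEP_ALIVE names the standard `Keep-Alive` header as used in setKeepAliveTimeout():

```cpp
#include <Poco/Net/HTTPResponse.h>

void keepAliveHeaderRoundTrip()
{
    Poco::Net::HTTPResponse response;
    response.setKeepAliveTimeout(10, 1000);  // emits "Keep-Alive: timeout=10, max=1000"

    int timeout = response.getKeepAliveTimeout();           // parses "timeout=" -> 10
    int max_requests = response.getKeepAliveMaxRequests();  // parses "max="     -> 1000
    // both getters return -1 when the header or the parameter is missing
}
```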

View File

@ -88,7 +88,18 @@ void HTTPServerConnection::run()
pHandler->handleRequest(request, response);
session.setKeepAlive(_pParams->getKeepAlive() && response.getKeepAlive() && session.canKeepAlive());
}
/// All this fuss is about making the session close with a smaller timeout than the 15s set in the HTTPServerParams c-tor
if (_pParams->getKeepAlive() && response.getKeepAlive() && session.canKeepAlive())
{
int value = response.getKeepAliveTimeout();
if (value < 0)
value = request.getKeepAliveTimeout();
if (value > 0)
session.setKeepAliveTimeout(Poco::Timespan(value, 0));
}
}
else sendErrorResponse(session, HTTPResponse::HTTP_NOT_IMPLEMENTED);
}
catch (Poco::Exception&)

View File

@ -33,6 +33,12 @@ HTTPServerSession::~HTTPServerSession()
{
}
void HTTPServerSession::setKeepAliveTimeout(Poco::Timespan keepAliveTimeout)
{
_keepAliveTimeout = keepAliveTimeout;
}
bool HTTPServerSession::hasMoreRequests()
{

View File

@ -43,6 +43,8 @@ source /utils.lib
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
echo "Azure is disabled"
elif [[ -n "$USE_SHARED_CATALOG" ]] && [[ "$USE_SHARED_CATALOG" -eq 1 ]]; then
echo "Azure is disabled"
else
azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --debug /azurite_log &
fi
@ -139,6 +141,32 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
MAX_RUN_TIME=$((MAX_RUN_TIME != 0 ? MAX_RUN_TIME : 9000)) # set to 2.5 hours if 0 (unlimited)
fi
if [[ -n "$USE_SHARED_CATALOG" ]] && [[ "$USE_SHARED_CATALOG" -eq 1 ]]; then
sudo cat /etc/clickhouse-server1/config.d/filesystem_caches_path.xml \
| sed "s|<filesystem_caches_path>/var/lib/clickhouse/filesystem_caches/</filesystem_caches_path>|<filesystem_caches_path>/var/lib/clickhouse/filesystem_caches_1/</filesystem_caches_path>|" \
> /etc/clickhouse-server1/config.d/filesystem_caches_path.xml.tmp
mv /etc/clickhouse-server1/config.d/filesystem_caches_path.xml.tmp /etc/clickhouse-server1/config.d/filesystem_caches_path.xml
sudo cat /etc/clickhouse-server1/config.d/filesystem_caches_path.xml \
| sed "s|<custom_cached_disks_base_directory replace=\"replace\">/var/lib/clickhouse/filesystem_caches/</custom_cached_disks_base_directory>|<custom_cached_disks_base_directory replace=\"replace\">/var/lib/clickhouse/filesystem_caches_1/</custom_cached_disks_base_directory>|" \
> /etc/clickhouse-server1/config.d/filesystem_caches_path.xml.tmp
mv /etc/clickhouse-server1/config.d/filesystem_caches_path.xml.tmp /etc/clickhouse-server1/config.d/filesystem_caches_path.xml
mkdir -p /var/run/clickhouse-server1
sudo chown clickhouse:clickhouse /var/run/clickhouse-server1
sudo -E -u clickhouse /usr/bin/clickhouse server --config /etc/clickhouse-server1/config.xml --daemon \
--pid-file /var/run/clickhouse-server1/clickhouse-server.pid \
-- --path /var/lib/clickhouse1/ --logger.stderr /var/log/clickhouse-server/stderr1.log \
--logger.log /var/log/clickhouse-server/clickhouse-server1.log --logger.errorlog /var/log/clickhouse-server/clickhouse-server1.err.log \
--tcp_port 19000 --tcp_port_secure 19440 --http_port 18123 --https_port 18443 --interserver_http_port 19009 --tcp_with_proxy_port 19010 \
--mysql_port 19004 --postgresql_port 19005 \
--keeper_server.tcp_port 19181 --keeper_server.server_id 2 \
--prometheus.port 19988 \
--macros.replica r2 # It doesn't work :(
MAX_RUN_TIME=$((MAX_RUN_TIME < 9000 ? MAX_RUN_TIME : 9000)) # min(MAX_RUN_TIME, 2.5 hours)
MAX_RUN_TIME=$((MAX_RUN_TIME != 0 ? MAX_RUN_TIME : 9000)) # set to 2.5 hours if 0 (unlimited)
fi
# Wait for the server to start, but not for too long.
for _ in {1..100}
@ -185,6 +213,10 @@ function run_tests()
ADDITIONAL_OPTIONS+=('--s3-storage')
fi
if [[ -n "$USE_SHARED_CATALOG" ]] && [[ "$USE_SHARED_CATALOG" -eq 1 ]]; then
ADDITIONAL_OPTIONS+=('--shared-catalog')
fi
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
ADDITIONAL_OPTIONS+=('--replicated-database')
# Too many tests fail for DatabaseReplicated in parallel.
@ -266,6 +298,12 @@ do
echo "$err"
[[ "0" != "${#err}" ]] && failed_to_save_logs=1
fi
if [[ -n "$USE_SHARED_CATALOG" ]] && [[ "$USE_SHARED_CATALOG" -eq 1 ]]; then
err=$( { clickhouse-client --port 19000 -q "select * from system.$table format TSVWithNamesAndTypes" | zstd --threads=0 > /test_output/$table.1.tsv.zst; } 2>&1 )
echo "$err"
[[ "0" != "${#err}" ]] && failed_to_save_logs=1
fi
done
# Stop server so we can safely read data with clickhouse-local.
@ -277,6 +315,10 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
sudo clickhouse stop --pid-path /var/run/clickhouse-server2 ||:
fi
if [[ -n "$USE_SHARED_CATALOG" ]] && [[ "$USE_SHARED_CATALOG" -eq 1 ]]; then
sudo clickhouse stop --pid-path /var/run/clickhouse-server1 ||:
fi
rg -Fa "<Fatal>" /var/log/clickhouse-server/clickhouse-server.log ||:
rg -A50 -Fa "============" /var/log/clickhouse-server/stderr.log ||:
zstd --threads=0 < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.zst &
@ -304,6 +346,10 @@ if [ $failed_to_save_logs -ne 0 ]; then
clickhouse-local --path /var/lib/clickhouse1/ --only-system-tables --stacktrace -q "select * from system.$table format TSVWithNamesAndTypes" | zstd --threads=0 > /test_output/$table.1.tsv.zst ||:
clickhouse-local --path /var/lib/clickhouse2/ --only-system-tables --stacktrace -q "select * from system.$table format TSVWithNamesAndTypes" | zstd --threads=0 > /test_output/$table.2.tsv.zst ||:
fi
if [[ -n "$USE_SHARED_CATALOG" ]] && [[ "$USE_SHARED_CATALOG" -eq 1 ]]; then
clickhouse-local --path /var/lib/clickhouse1/ --only-system-tables --stacktrace -q "select * from system.$table format TSVWithNamesAndTypes" | zstd --threads=0 > /test_output/$table.1.tsv.zst ||:
fi
done
fi
@ -343,3 +389,10 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
tar -chf /test_output/coordination1.tar /var/lib/clickhouse1/coordination ||:
tar -chf /test_output/coordination2.tar /var/lib/clickhouse2/coordination ||:
fi
if [[ -n "$USE_SHARED_CATALOG" ]] && [[ "$USE_SHARED_CATALOG" -eq 1 ]]; then
rg -Fa "<Fatal>" /var/log/clickhouse-server/clickhouse-server1.log ||:
zstd --threads=0 < /var/log/clickhouse-server/clickhouse-server1.log > /test_output/clickhouse-server1.log.zst ||:
mv /var/log/clickhouse-server/stderr1.log /test_output/ ||:
tar -chf /test_output/coordination1.tar /var/lib/clickhouse1/coordination ||:
fi

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/aggregatefunction
sidebar_position: 53
sidebar_position: 46
sidebar_label: AggregateFunction
---

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/array
sidebar_position: 52
sidebar_position: 32
sidebar_label: Array(T)
---

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/boolean
sidebar_position: 43
sidebar_position: 22
sidebar_label: Boolean
---

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/date
sidebar_position: 47
sidebar_position: 12
sidebar_label: Date
---

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/date32
sidebar_position: 48
sidebar_position: 14
sidebar_label: Date32
---

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/datetime
sidebar_position: 48
sidebar_position: 16
sidebar_label: DateTime
---

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/datetime64
sidebar_position: 49
sidebar_position: 18
sidebar_label: DateTime64
---

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/decimal
sidebar_position: 42
sidebar_position: 6
sidebar_label: Decimal
---

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/enum
sidebar_position: 50
sidebar_position: 20
sidebar_label: Enum
---

View File

@ -1,10 +1,10 @@
---
slug: /en/sql-reference/data-types/fixedstring
sidebar_position: 45
sidebar_position: 10
sidebar_label: FixedString(N)
---
# FixedString
# FixedString(N)
A fixed-length string of `N` bytes (neither characters nor code points).

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/float
sidebar_position: 41
sidebar_position: 4
sidebar_label: Float32, Float64
---

View File

@ -1,8 +1,8 @@
---
slug: /en/sql-reference/data-types/geo
sidebar_position: 62
sidebar_position: 54
sidebar_label: Geo
title: "Geo Data Types"
title: "Geometric"
---
ClickHouse supports data types for representing geographical objects — locations, lands, etc.

View File

@ -1,10 +1,10 @@
---
slug: /en/sql-reference/data-types/
sidebar_label: List of data types
sidebar_position: 37
sidebar_position: 1
---
# ClickHouse Data Types
# Data Types in ClickHouse
ClickHouse can store various kinds of data in table cells. This section describes the supported data types and any special considerations for using and/or implementing them.

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/int-uint
sidebar_position: 40
sidebar_position: 2
sidebar_label: UInt8, UInt16, UInt32, UInt64, UInt128, UInt256, Int8, Int16, Int32, Int64, Int128, Int256
---

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/ipv4
sidebar_position: 59
sidebar_position: 28
sidebar_label: IPv4
---

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/ipv6
sidebar_position: 60
sidebar_position: 30
sidebar_label: IPv6
---

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/json
sidebar_position: 54
sidebar_position: 26
sidebar_label: JSON
---

View File

@ -1,10 +1,10 @@
---
slug: /en/sql-reference/data-types/lowcardinality
sidebar_position: 51
sidebar_label: LowCardinality
sidebar_position: 42
sidebar_label: LowCardinality(T)
---
# LowCardinality
# LowCardinality(T)
Changes the internal representation of other data types to be dictionary-encoded.

View File

@ -1,12 +1,12 @@
---
slug: /en/sql-reference/data-types/map
sidebar_position: 65
sidebar_label: Map(key, value)
sidebar_position: 36
sidebar_label: Map(K, V)
---
# Map(key, value)
# Map(K, V)
`Map(key, value)` data type stores `key:value` pairs.
`Map(K, V)` data type stores `key:value` pairs.
**Parameters**

View File

@ -1,27 +0,0 @@
---
slug: /en/sql-reference/data-types/multiword-types
sidebar_position: 61
sidebar_label: Multiword Type Names
title: "Multiword Types"
---
When creating tables, you can use data types with a name consisting of several words. This is implemented for better SQL compatibility.
## Multiword Types Support
| Multiword types | Simple types |
|----------------------------------|--------------------------------------------------------------|
| DOUBLE PRECISION | [Float64](../../sql-reference/data-types/float.md) |
| CHAR LARGE OBJECT | [String](../../sql-reference/data-types/string.md) |
| CHAR VARYING | [String](../../sql-reference/data-types/string.md) |
| CHARACTER LARGE OBJECT | [String](../../sql-reference/data-types/string.md) |
| CHARACTER VARYING | [String](../../sql-reference/data-types/string.md) |
| NCHAR LARGE OBJECT | [String](../../sql-reference/data-types/string.md) |
| NCHAR VARYING | [String](../../sql-reference/data-types/string.md) |
| NATIONAL CHARACTER LARGE OBJECT | [String](../../sql-reference/data-types/string.md) |
| NATIONAL CHARACTER VARYING | [String](../../sql-reference/data-types/string.md) |
| NATIONAL CHAR VARYING | [String](../../sql-reference/data-types/string.md) |
| NATIONAL CHARACTER | [String](../../sql-reference/data-types/string.md) |
| NATIONAL CHAR | [String](../../sql-reference/data-types/string.md) |
| BINARY LARGE OBJECT | [String](../../sql-reference/data-types/string.md) |
| BINARY VARYING | [String](../../sql-reference/data-types/string.md) |

View File

@ -1,7 +1,7 @@
---
slug: /en/sql-reference/data-types/nullable
sidebar_position: 55
sidebar_label: Nullable
sidebar_position: 44
sidebar_label: Nullable(T)
---
# Nullable(T)

View File

@ -1,5 +1,7 @@
---
slug: /en/sql-reference/data-types/simpleaggregatefunction
sidebar_position: 48
sidebar_label: SimpleAggregateFunction
---
# SimpleAggregateFunction

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/string
sidebar_position: 44
sidebar_position: 8
sidebar_label: String
---
@ -13,7 +13,7 @@ When creating tables, numeric parameters for string fields can be set (e.g. `VAR
Aliases:
- `String` - `LONGTEXT`, `MEDIUMTEXT`, `TINYTEXT`, `TEXT`, `LONGBLOB`, `MEDIUMBLOB`, `TINYBLOB`, `BLOB`, `VARCHAR`, `CHAR`.
- `String` - `LONGTEXT`, `MEDIUMTEXT`, `TINYTEXT`, `TEXT`, `LONGBLOB`, `MEDIUMBLOB`, `TINYBLOB`, `BLOB`, `VARCHAR`, `CHAR`, `CHAR LARGE OBJECT`, `CHAR VARYING`, `CHARACTER LARGE OBJECT`, `CHARACTER VARYING`, `NCHAR LARGE OBJECT`, `NCHAR VARYING`, `NATIONAL CHARACTER LARGE OBJECT`, `NATIONAL CHARACTER VARYING`, `NATIONAL CHAR VARYING`, `NATIONAL CHARACTER`, `NATIONAL CHAR`, `BINARY LARGE OBJECT`, `BINARY VARYING`.
## Encodings

View File

@ -1,10 +1,10 @@
---
slug: /en/sql-reference/data-types/tuple
sidebar_position: 54
sidebar_position: 34
sidebar_label: Tuple(T1, T2, ...)
---
# Tuple(T1, T2, )
# Tuple(T1, T2, ...)
A tuple of elements, each having an individual [type](../../sql-reference/data-types/index.md#data_types). Tuple must contain at least one element.

View File

@ -1,6 +1,6 @@
---
slug: /en/sql-reference/data-types/uuid
sidebar_position: 46
sidebar_position: 24
sidebar_label: UUID
---

View File

@ -1,10 +1,10 @@
---
slug: /en/sql-reference/data-types/variant
sidebar_position: 55
sidebar_label: Variant
sidebar_position: 40
sidebar_label: Variant(T1, T2, ...)
---
# Variant(T1, T2, T3, ...)
# Variant(T1, T2, ...)
This type represents a union of other data types. Type `Variant(T1, T2, ..., TN)` means that each row of this type
has a value of either type `T1` or `T2` or ... or `TN` or none of them (`NULL` value).

View File

@ -20,11 +20,10 @@ DROP DATABASE [IF EXISTS] db [ON CLUSTER cluster] [SYNC]
## DROP TABLE
Deletes the table.
In case when `IF EMPTY` clause is specified server will check if table is empty only on replica that received initial query.
Deletes one or more tables.
:::tip
Also see [UNDROP TABLE](/docs/en/sql-reference/statements/undrop.md)
To undo the deletion of a table, please see [UNDROP TABLE](/docs/en/sql-reference/statements/undrop.md)
:::
Syntax:
@ -33,7 +32,9 @@ Syntax:
DROP [TEMPORARY] TABLE [IF EXISTS] [IF EMPTY] [db1.]name_1[, [db2.]name_2, ...] [ON CLUSTER cluster] [SYNC]
```
Note that deleting multiple tables at the same time is a non-atomic deletion. If a table fails to be deleted, subsequent tables will not be deleted.
Limitations:
- If the clause `IF EMPTY` is specified, the server checks the emptiness of the table only on the replica which received the query.
- Deleting multiple tables at once is not an atomic operation, i.e. if the deletion of a table fails, subsequent tables will not be deleted.
## DROP DICTIONARY

View File

@ -1,28 +0,0 @@
---
slug: /ru/sql-reference/data-types/multiword-types
sidebar_position: 61
sidebar_label: Multiword Types
---
# Multiword Types {#multiword-types}
When creating tables, you can use data types whose names consist of several words. Such names are supported for better SQL compatibility.
## Multiword Types Support {#multiword-types-support}
| Multiword types | Simple types |
|-------------------------------------|-----------------------------------------------------------|
| DOUBLE PRECISION | [Float64](../../sql-reference/data-types/float.md) |
| CHAR LARGE OBJECT | [String](../../sql-reference/data-types/string.md) |
| CHAR VARYING | [String](../../sql-reference/data-types/string.md) |
| CHARACTER LARGE OBJECT | [String](../../sql-reference/data-types/string.md) |
| CHARACTER VARYING | [String](../../sql-reference/data-types/string.md) |
| NCHAR LARGE OBJECT | [String](../../sql-reference/data-types/string.md) |
| NCHAR VARYING | [String](../../sql-reference/data-types/string.md) |
| NATIONAL CHARACTER LARGE OBJECT | [String](../../sql-reference/data-types/string.md) |
| NATIONAL CHARACTER VARYING | [String](../../sql-reference/data-types/string.md) |
| NATIONAL CHAR VARYING | [String](../../sql-reference/data-types/string.md) |
| NATIONAL CHARACTER | [String](../../sql-reference/data-types/string.md) |
| NATIONAL CHAR | [String](../../sql-reference/data-types/string.md) |
| BINARY LARGE OBJECT | [String](../../sql-reference/data-types/string.md) |
| BINARY VARYING | [String](../../sql-reference/data-types/string.md) |

View File

@ -1,10 +0,0 @@
---
slug: /zh/sql-reference/data-types/multiword-types
sidebar_position: 61
sidebar_label: Multiword Type Names
title: "Multiword Types"
---
import Content from '@site/docs/en/sql-reference/data-types/multiword-types.md';
<Content />

View File

@ -3,7 +3,7 @@
function _clickhouse_get_utils()
{
local cmd=$1 && shift
"$cmd" --help |& awk '/^clickhouse.*args/ { print $2 }'
"$cmd" help |& awk '/^clickhouse.*args/ { print $2 }'
}
function _complete_for_clickhouse_entrypoint_bin()

View File

@ -166,7 +166,7 @@ int DisksApp::main(const std::vector<String> & /*args*/)
{
String config_path = config().getString("config-file", getDefaultConfigFileName());
ConfigProcessor config_processor(config_path, false, false);
config_processor.setConfigPath(fs::path(config_path).parent_path());
ConfigProcessor::setConfigPath(fs::path(config_path).parent_path());
auto loaded_config = config_processor.loadConfig();
config().add(loaded_config.configuration.duplicate(), false, false);
}

View File

@ -368,7 +368,7 @@ int KeeperClient::main(const std::vector<String> & /* args */)
DB::ConfigProcessor config_processor(config().getString("config-file", "config.xml"));
/// This will handle a situation when clickhouse is running on the embedded config, but config.d folder is also present.
config_processor.registerEmbeddedConfig("config.xml", "<clickhouse/>");
ConfigProcessor::registerEmbeddedConfig("config.xml", "<clickhouse/>");
auto clickhouse_config = config_processor.loadConfig();
Poco::Util::AbstractConfiguration::Keys keys;

View File

@ -122,7 +122,7 @@ void LocalServer::initialize(Poco::Util::Application & self)
{
const auto config_path = config().getString("config-file", "config.xml");
ConfigProcessor config_processor(config_path, false, true);
config_processor.setConfigPath(fs::path(config_path).parent_path());
ConfigProcessor::setConfigPath(fs::path(config_path).parent_path());
auto loaded_config = config_processor.loadConfig();
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
}

View File

@ -487,7 +487,7 @@ int main(int argc_, char ** argv_)
/// Interpret binary without argument or with arguments starts with dash
/// ('-') as clickhouse-local for better usability:
///
/// clickhouse # dumps help
/// clickhouse help # dumps help
/// clickhouse -q 'select 1' # use local
/// clickhouse # spawn local
/// clickhouse local # spawn local

View File

@ -54,30 +54,30 @@ public:
{
const auto & value = columns[0]->getFloat64(row_num);
const auto & time = columns[1]->getFloat64(row_num);
this->data(place).add(value, time, half_decay);
data(place).add(value, time, half_decay);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs), half_decay);
data(place).merge(data(rhs), half_decay);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
writeBinary(this->data(place).value, buf);
writeBinary(this->data(place).time, buf);
writeBinary(data(place).value, buf);
writeBinary(data(place).time, buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
{
readBinary(this->data(place).value, buf);
readBinary(this->data(place).time, buf);
readBinary(data(place).value, buf);
readBinary(data(place).time, buf);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
auto & column = assert_cast<ColumnVector<Float64> &>(to);
column.getData().push_back(this->data(place).get(half_decay));
column.getData().push_back(data(place).get(half_decay));
}
};

View File

@ -293,32 +293,32 @@ public:
Float64 value = columns[0]->getFloat64(row_num);
UInt8 is_second = columns[1]->getUInt(row_num);
if (is_second)
this->data(place).addY(value, arena);
data(place).addY(value, arena);
else
this->data(place).addX(value, arena);
data(place).addX(value, arena);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).merge(this->data(rhs), arena);
data(place).merge(data(rhs), arena);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
this->data(place).write(buf);
data(place).write(buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
{
this->data(place).read(buf, arena);
data(place).read(buf, arena);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
if (!this->data(place).size_x || !this->data(place).size_y)
if (!data(place).size_x || !data(place).size_y)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Aggregate function {} requires both samples to be non-empty", getName());
auto [d_statistic, p_value] = this->data(place).getResult(alternative, method);
auto [d_statistic, p_value] = data(place).getResult(alternative, method);
/// Because p-value is a probability.
p_value = std::min(1.0, std::max(0.0, p_value));

View File

@ -147,6 +147,8 @@ public:
negative_store->merge(other.negative_store.get());
}
/// NOLINTBEGIN(readability-static-accessed-through-instance)
void serialize(WriteBuffer& buf) const
{
// Write the mapping
@ -201,6 +203,8 @@ public:
count = static_cast<Float64>(negative_store->count + zero_count + store->count);
}
/// NOLINTEND(readability-static-accessed-through-instance)
private:
std::unique_ptr<DDSketchLogarithmicMapping> mapping;
std::unique_ptr<DDSketchDenseStore> store;

View File

@ -87,6 +87,8 @@ public:
count += other->count;
}
/// NOLINTBEGIN(readability-static-accessed-through-instance)
void serialize(WriteBuffer& buf) const
{
@ -179,6 +181,8 @@ public:
}
}
/// NOLINTEND(readability-static-accessed-through-instance)
private:
UInt32 chunk_size;
DDSketchEncoding enc;

View File

@ -5799,7 +5799,7 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
return result_projection_names;
}
FunctionOverloadResolverPtr function = UserDefinedExecutableFunctionFactory::instance().tryGet(function_name, scope.context, parameters);
FunctionOverloadResolverPtr function = UserDefinedExecutableFunctionFactory::instance().tryGet(function_name, scope.context, parameters); /// NOLINT(readability-static-accessed-through-instance)
bool is_executable_udf = true;
IdentifierResolveScope::ResolvedFunctionsCache * function_cache = nullptr;
@ -5829,7 +5829,7 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
{
std::vector<std::string> possible_function_names;
auto function_names = UserDefinedExecutableFunctionFactory::instance().getRegisteredNames(scope.context);
auto function_names = UserDefinedExecutableFunctionFactory::instance().getRegisteredNames(scope.context); /// NOLINT(readability-static-accessed-through-instance)
possible_function_names.insert(possible_function_names.end(), function_names.begin(), function_names.end());
function_names = UserDefinedSQLFunctionFactory::instance().getAllRegisteredNames();
@ -5847,8 +5847,7 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
possible_function_names.push_back(name);
}
NamePrompter<2> name_prompter;
auto hints = name_prompter.getHints(function_name, possible_function_names);
auto hints = NamePrompter<2>::getHints(function_name, possible_function_names);
throw Exception(ErrorCodes::UNKNOWN_FUNCTION,
"Function with name '{}' does not exists. In scope {}{}",

View File

@ -141,7 +141,7 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
async_callback(socket->impl()->sockfd(), connection_timeout, AsyncEventTimeoutType::CONNECT, description, AsyncTaskExecutor::READ | AsyncTaskExecutor::WRITE | AsyncTaskExecutor::ERROR);
if (auto err = socket->impl()->socketError())
socket->impl()->error(err); // Throws an exception
socket->impl()->error(err); // Throws an exception /// NOLINT(readability-static-accessed-through-instance)
socket->setBlocking(true);
}

View File

@ -940,7 +940,7 @@ void ColumnObject::addNestedSubcolumn(const PathInData & key, const FieldInfo &
if (nested_node)
{
/// Find any leaf of Nested subcolumn.
const auto * leaf = subcolumns.findLeaf(nested_node, [&](const auto &) { return true; });
const auto * leaf = Subcolumns::findLeaf(nested_node, [&](const auto &) { return true; });
assert(leaf);
/// Recreate subcolumn with default values and the same sizes of arrays.
@ -983,7 +983,7 @@ const ColumnObject::Subcolumns::Node * ColumnObject::getLeafOfTheSameNested(cons
while (current_node)
{
/// Try to find the first Nested up to the current node.
const auto * node_nested = subcolumns.findParent(current_node,
const auto * node_nested = Subcolumns::findParent(current_node,
[](const auto & candidate) { return candidate.isNested(); });
if (!node_nested)
@ -993,7 +993,7 @@ const ColumnObject::Subcolumns::Node * ColumnObject::getLeafOfTheSameNested(cons
/// for the last rows.
/// If there are no leaves, skip current node and find
/// the next node up to the current.
leaf = subcolumns.findLeaf(node_nested,
leaf = Subcolumns::findLeaf(node_nested,
[&](const auto & candidate)
{
return candidate.data.size() > old_size;

View File

@ -16,6 +16,7 @@
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/HTTPStream.h>
#include <Poco/Net/NetException.h>
#include <Poco/Timespan.h>
#include <queue>
@ -83,17 +84,15 @@ namespace
}
size_t roundUp(size_t x, size_t rounding)
constexpr size_t roundUp(size_t x, size_t rounding)
{
chassert(rounding > 0);
return (x + (rounding - 1)) / rounding * rounding;
}
Poco::Timespan divide(const Poco::Timespan span, int divisor)
{
return Poco::Timespan(Poco::Timestamp::TimeDiff(span.totalMicroseconds() / divisor));
return (x + rounding) / rounding * rounding;
}
static_assert(roundUp(10000, 100) == 10100);
static_assert(roundUp(10001, 100) == 10100);
static_assert(roundUp(10099, 100) == 10100);
static_assert(roundUp(10100, 100) == 10200);
}
namespace DB
@ -202,8 +201,9 @@ public:
if (total_connections_in_group >= limits.warning_limit && total_connections_in_group >= mute_warning_until)
{
LOG_WARNING(log, "Too many active sessions in group {}, count {}, warning limit {}", type, total_connections_in_group, limits.warning_limit);
mute_warning_until = roundUp(total_connections_in_group, HTTPConnectionPools::Limits::warning_step);
LOG_WARNING(log, "Too many active sessions in group {}, count {}, warning limit {}, next warning at {}",
type, total_connections_in_group, limits.warning_limit, mute_warning_until);
}
}
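
For illustration, assuming HTTPConnectionPools::Limits::warning_step is 100 (the actual value is defined elsewhere), the muting arithmetic works out as follows:

```cpp
// Same definition as in the anonymous namespace above, repeated here so the
// asserts are self-contained.
constexpr size_t roundUp(size_t x, size_t rounding)
{
    return (x + rounding) / rounding * rounding;
}

// With 100 active sessions, the next warning is muted until the group grows
// past 200: roundUp always advances to the next multiple of the step.
static_assert(roundUp(100, 100) == 200);
static_assert(roundUp(250, 100) == 300);
```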
@ -213,7 +213,8 @@ public:
--total_connections_in_group;
const size_t reduced_warning_limit = limits.warning_limit > 10 ? limits.warning_limit - 10 : 1;
const size_t gap = 20;
const size_t reduced_warning_limit = limits.warning_limit > gap ? limits.warning_limit - gap : 1;
if (mute_warning_until > 0 && total_connections_in_group < reduced_warning_limit)
{
LOG_WARNING(log, "Sessions count is OK in the group {}, count {}", type, total_connections_in_group);
@ -273,9 +274,15 @@ private:
public:
using Ptr = std::shared_ptr<PooledConnection>;
using Session::mustReconnect;
void markAsExpired()
{
isExpired = true;
}
void reconnect() override
{
ProfileEvents::increment(metrics.reset);
Session::close();
if (auto lock = pool.lock())
@ -283,6 +290,7 @@ private:
auto timeouts = getTimeouts(*this);
auto new_connection = lock->getConnection(timeouts);
Session::assign(*new_connection);
Session::setKeepAliveRequest(Session::getKeepAliveRequest() + 1);
}
else
{
@ -304,6 +312,12 @@ private:
Session::getPort());
}
Poco::Timespan idleTime()
{
Poco::Timestamp now;
return now - Session::getLastRequest();
}
void flushRequest() override
{
if (bool(request_stream))
@ -335,6 +349,7 @@ private:
std::ostream & sendRequest(Poco::Net::HTTPRequest & request) override
{
auto idle = idleTime();
std::ostream & result = Session::sendRequest(request);
result.exceptions(std::ios::badbit);
@ -392,10 +407,11 @@ private:
}
response_stream = nullptr;
if (auto lock = pool.lock())
lock->atConnectionDestroy(*this);
else
ProfileEvents::increment(metrics.reset);
group->atConnectionDestroy();
if (!isExpired)
if (auto lock = pool.lock())
lock->atConnectionDestroy(*this);
CurrentMetrics::sub(metrics.active_count);
}
@ -404,10 +420,18 @@ private:
friend class EndpointConnectionPool;
template <class... Args>
explicit PooledConnection(EndpointConnectionPool::WeakPtr pool_, IHTTPConnectionPoolForEndpoint::Metrics metrics_, Args &&... args)
: Session(args...), pool(std::move(pool_)), metrics(std::move(metrics_))
explicit PooledConnection(
EndpointConnectionPool::WeakPtr pool_,
ConnectionGroup::Ptr group_,
IHTTPConnectionPoolForEndpoint::Metrics metrics_,
Args &&... args)
: Session(std::forward<Args>(args)...)
, pool(std::move(pool_))
, group(group_)
, metrics(std::move(metrics_))
{
CurrentMetrics::add(metrics.active_count);
group->atConnectionCreate();
}
template <class... Args>
@ -433,10 +457,12 @@ private:
return request_stream_completed && response_stream_completed;
}
WeakPtr pool;
EndpointConnectionPool::WeakPtr pool;
ConnectionGroup::Ptr group;
IHTTPConnectionPoolForEndpoint::Metrics metrics;
bool isExpired = false;
Poco::Logger * log = &Poco::Logger::get("PooledConnection");
LoggerPtr log = getLogger("PooledConnection");
std::ostream * request_stream = nullptr;
std::istream * response_stream = nullptr;
@ -484,7 +510,6 @@ public:
IHTTPConnectionPoolForEndpoint::ConnectionPtr getConnection(const ConnectionTimeouts & timeouts) override
{
Poco::Timestamp now;
std::vector<ConnectionPtr> expired_connections;
SCOPE_EXIT({
@ -494,8 +519,9 @@ public:
{
std::lock_guard lock(mutex);
expired_connections.reserve(stored_connections.size());
wipeExpiredImpl(expired_connections, now);
wipeExpiredImpl(expired_connections);
if (!stored_connections.empty())
{
@ -526,7 +552,6 @@ public:
size_t wipeExpired() override
{
Poco::Timestamp now;
std::vector<ConnectionPtr> expired_connections;
SCOPE_EXIT({
@ -535,25 +560,29 @@ public:
});
std::lock_guard lock(mutex);
return wipeExpiredImpl(expired_connections, now);
return wipeExpiredImpl(expired_connections);
}
size_t wipeExpiredImpl(std::vector<ConnectionPtr> & expired_connections, Poco::Timestamp now) TSA_REQUIRES(mutex)
size_t wipeExpiredImpl(std::vector<ConnectionPtr> & expired_connections) TSA_REQUIRES(mutex)
{
SCOPE_EXIT({
CurrentMetrics::sub(getMetrics().stored_count, expired_connections.size());
ProfileEvents::increment(getMetrics().expired, expired_connections.size());
});
auto isSoftLimitReached = group->isSoftLimitReached();
while (!stored_connections.empty())
{
auto connection = stored_connections.top();
if (!isExpired(now, connection))
if (!isExpired(connection, isSoftLimitReached))
return stored_connections.size();
stored_connections.pop();
connection->markAsExpired();
expired_connections.push_back(connection);
}
CurrentMetrics::sub(getMetrics().stored_count, expired_connections.size());
ProfileEvents::increment(getMetrics().expired, expired_connections.size());
return stored_connections.size();
}
@ -569,57 +598,53 @@ private:
WeakPtr getWeakFromThis() { return EndpointConnectionPool::weak_from_this(); }
bool isExpired(Poco::Timestamp & now, ConnectionPtr connection)
bool isExpired(ConnectionPtr connection, bool isSoftLimitReached) TSA_REQUIRES(mutex)
{
if (group->isSoftLimitReached())
return now > (connection->getLastRequest() + divide(connection->getKeepAliveTimeout(), 10));
return now > connection->getLastRequest() + connection->getKeepAliveTimeout();
if (isSoftLimitReached)
return connection->isKeepAliveExpired(0.1);
return connection->isKeepAliveExpired(0.8);
}
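
In other words, the pool only trusts a fraction of the negotiated keep-alive window: 80% in the normal case, and just 10% once the group's soft limit is reached, so idle connections drain quickly under pressure. A worked example with an assumed 10-second timeout:

```cpp
// Assumed: negotiated keep-alive timeout of 10 seconds.
constexpr double timeout_seconds = 10.0;
constexpr double normal_idle_budget = 0.8 * timeout_seconds;    // reuse within 8s of idle time
constexpr double pressured_idle_budget = 0.1 * timeout_seconds; // only 1s once the soft limit is hit
```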
ConnectionPtr allocateNewConnection()
ConnectionPtr prepareNewConnection(const ConnectionTimeouts & timeouts)
{
ConnectionPtr connection = PooledConnection::create(this->getWeakFromThis(), getMetrics(), host, port);
auto connection = PooledConnection::create(this->getWeakFromThis(), group, getMetrics(), host, port);
connection->setKeepAlive(true);
setTimeouts(*connection, timeouts);
if (!proxy_configuration.isEmpty())
{
connection->setProxyConfig(proxyConfigurationToPocoProxyConfig(proxy_configuration));
}
group->atConnectionCreate();
return connection;
}
ConnectionPtr prepareNewConnection(const ConnectionTimeouts & timeouts)
{
auto address = HostResolversPool::instance().getResolver(host)->resolve();
auto session = allocateNewConnection();
setTimeouts(*session, timeouts);
session->setResolvedHost(*address);
connection->setResolvedHost(*address);
try
{
auto timer = CurrentThread::getProfileEvents().timer(getMetrics().elapsed_microseconds);
session->doConnect();
connection->doConnect();
}
catch (...)
{
address.setFail();
ProfileEvents::increment(getMetrics().errors);
session->reset();
connection->reset();
throw;
}
ProfileEvents::increment(getMetrics().created);
return session;
return connection;
}
void atConnectionDestroy(PooledConnection & connection)
{
group->atConnectionDestroy();
if (connection.getKeepAliveRequest() >= connection.getKeepAliveMaxRequests())
{
ProfileEvents::increment(getMetrics().expired, 1);
return;
}
if (!connection.connected() || connection.mustReconnect() || !connection.isCompleted() || connection.buffered()
|| group->isStoreLimitReached())
@ -628,17 +653,17 @@ private:
return;
}
auto connection_to_store = allocateNewConnection();
auto connection_to_store = PooledConnection::create(this->getWeakFromThis(), group, getMetrics(), host, port);
connection_to_store->assign(connection);
CurrentMetrics::add(getMetrics().stored_count, 1);
ProfileEvents::increment(getMetrics().preserved, 1);
{
MemoryTrackerSwitcher switcher{&total_memory_tracker};
std::lock_guard lock(mutex);
stored_connections.push(connection_to_store);
}
CurrentMetrics::add(getMetrics().stored_count, 1);
ProfileEvents::increment(getMetrics().preserved, 1);
}
@ -726,14 +751,13 @@ createConnectionPool(ConnectionGroup::Ptr group, std::string host, UInt16 port,
class HTTPConnectionPools::Impl
{
private:
const size_t DEFAULT_WIPE_TIMEOUT_SECONDS = 5 * 60;
const size_t DEFAULT_WIPE_TIMEOUT_SECONDS = 10 * 60;
const Poco::Timespan wipe_timeout = Poco::Timespan(DEFAULT_WIPE_TIMEOUT_SECONDS, 0);
ConnectionGroup::Ptr disk_group = std::make_shared<ConnectionGroup>(HTTPConnectionGroupType::DISK);
ConnectionGroup::Ptr storage_group = std::make_shared<ConnectionGroup>(HTTPConnectionGroupType::STORAGE);
ConnectionGroup::Ptr http_group = std::make_shared<ConnectionGroup>(HTTPConnectionGroupType::HTTP);
/// If multiple mutexes are held simultaneously,
/// they should be locked in this order:
/// HTTPConnectionPools::mutex, then EndpointConnectionPool::mutex, then ConnectionGroup::mutex.

View File

@ -18,13 +18,10 @@
#include <filesystem>
#include <map>
#include <mutex>
#include <sstream>
#include <unordered_map>
#include <fmt/format.h>
#include <libunwind.h>
#include "config.h"
#include <boost/algorithm/string/split.hpp>
#if defined(OS_DARWIN)
@ -481,7 +478,17 @@ void StackTrace::toStringEveryLine(void ** frame_pointers_raw, size_t offset, si
toStringEveryLineImpl(true, {frame_pointers, offset, size}, std::move(callback));
}
using StackTraceCache = std::map<StackTraceTriple, String, std::less<>>;
struct CacheEntry
{
std::optional<String> stacktrace_string;
bool to_string_in_progress = false;
std::condition_variable cv;
};
using CacheEntryPtr = std::shared_ptr<CacheEntry>;
using StackTraceCache = std::map<StackTraceTriple, CacheEntryPtr, std::less<>>;
static StackTraceCache & cacheInstance()
{
@ -493,23 +500,63 @@ static std::mutex stacktrace_cache_mutex;
String toStringCached(const StackTrace::FramePointers & pointers, size_t offset, size_t size)
{
const StackTraceRefTriple key{pointers, offset, size};
/// Calculation of stack trace text is extremely slow.
/// We use a simple cache because otherwise the server could be overloaded by trash queries.
/// Note that this cache can grow unconditionally, but in practice it should stay small.
std::lock_guard lock{stacktrace_cache_mutex};
std::unique_lock lock{stacktrace_cache_mutex};
CacheEntryPtr cache_entry;
StackTraceCache & cache = cacheInstance();
const StackTraceRefTriple key{pointers, offset, size};
if (auto it = cache.find(key); it != cache.end())
return it->second;
{
cache_entry = it->second;
}
else
{
auto [new_it, inserted] = cache.emplace(StackTraceTriple{pointers, offset, size}, std::make_shared<CacheEntry>());
chassert(inserted);
cache_entry = new_it->second;
}
if (!cache_entry->to_string_in_progress && cache_entry->stacktrace_string.has_value())
return *cache_entry->stacktrace_string;
if (cache_entry->to_string_in_progress)
{
cache_entry->cv.wait(lock, [&]{ return !cache_entry->to_string_in_progress; });
if (cache_entry->stacktrace_string.has_value())
return *cache_entry->stacktrace_string;
}
cache_entry->to_string_in_progress = true;
lock.unlock();
String stacktrace_string;
try
{
DB::WriteBufferFromOwnString out;
toStringEveryLineImpl(false, key, [&](std::string_view str) { out << str << '\n'; });
return cache.emplace(StackTraceTriple{pointers, offset, size}, out.str()).first->second;
stacktrace_string = out.str();
}
catch (...)
{
lock.lock();
cache_entry->to_string_in_progress = false;
lock.unlock();
cache_entry->cv.notify_one();
throw;
}
lock.lock();
cache_entry->to_string_in_progress = false;
cache_entry->stacktrace_string = stacktrace_string;
lock.unlock();
cache_entry->cv.notify_all();
return stacktrace_string;
}
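
A minimal, self-contained sketch of the pattern introduced above: a compute-once cache where the first caller marks the entry in progress, releases the lock while doing the slow work, and wakes waiters through the entry's condition variable. The key type and the "slow" computation are stand-ins, and the exception path that re-wakes waiters (present above) is omitted for brevity:

```cpp
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <string>

struct Entry
{
    std::optional<std::string> value;
    bool in_progress = false;
    std::condition_variable cv;
};

std::mutex cache_mutex;
std::map<int, std::shared_ptr<Entry>> cache;

std::string getCached(int key)
{
    std::unique_lock lock(cache_mutex);
    auto & slot = cache[key];
    if (!slot)
        slot = std::make_shared<Entry>();
    std::shared_ptr<Entry> entry = slot;  // hold the entry, not the map slot

    // Wait out any in-flight computation; reuse its result if it succeeded.
    entry->cv.wait(lock, [&] { return !entry->in_progress; });
    if (entry->value)
        return *entry->value;

    entry->in_progress = true;
    lock.unlock();  // do the slow work without blocking other keys
    std::string result = "symbolized:" + std::to_string(key);

    lock.lock();
    entry->in_progress = false;
    entry->value = result;
    lock.unlock();
    entry->cv.notify_all();
    return result;
}
```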
std::string StackTrace::toString() const

View File

@ -2,7 +2,6 @@
#include <Common/HTTPConnectionPool.h>
#include <Poco/URI.h>
#include <Poco/Net/ServerSocket.h>
#include <Poco/Net/MessageHeader.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
@ -17,6 +16,40 @@
namespace
{
template<class T>
class SafeHandler
{
public:
using Ptr = std::shared_ptr<SafeHandler<T>>;
SafeHandler() = default;
SafeHandler(SafeHandler<T>&) = delete;
SafeHandler& operator=(SafeHandler<T>&) = delete;
T get()
{
std::lock_guard lock(mutex);
return obj;
}
void set(T && options_)
{
std::lock_guard lock(mutex);
obj = std::move(options_);
}
protected:
std::mutex mutex;
T obj = {};
};
struct RequestOptions
{
size_t slowdown_receive = 0;
int overwrite_keep_alive_timeout = 0;
int overwrite_keep_alive_max_requests = 10;
};
size_t stream_copy_n(std::istream & in, std::ostream & out, std::size_t count = std::numeric_limits<size_t>::max())
{
const size_t buffer_size = 4096;
@ -47,13 +80,21 @@ size_t stream_copy_n(std::istream & in, std::ostream & out, std::size_t count =
class MockRequestHandler : public Poco::Net::HTTPRequestHandler
{
public:
explicit MockRequestHandler(std::shared_ptr<std::atomic<size_t>> slowdown_)
: slowdown(std::move(slowdown_))
explicit MockRequestHandler(SafeHandler<RequestOptions>::Ptr options_)
: options(options_)
{
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override
{
int value = request.getKeepAliveTimeout();
ASSERT_GT(value, 0);
auto params = options->get();
if (params.overwrite_keep_alive_timeout > 0)
response.setKeepAliveTimeout(params.overwrite_keep_alive_timeout, params.overwrite_keep_alive_max_requests);
response.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
auto size = request.getContentLength();
if (size > 0)
@ -61,28 +102,29 @@ public:
else
response.setChunkedTransferEncoding(true); // or chunk encoding
sleepForSeconds(*slowdown);
if (params.slowdown_receive > 0)
sleepForSeconds(params.slowdown_receive);
stream_copy_n(request.stream(), response.send(), size);
}
std::shared_ptr<std::atomic<size_t>> slowdown;
SafeHandler<RequestOptions>::Ptr options;
};
class HTTPRequestHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
{
public:
explicit HTTPRequestHandlerFactory(std::shared_ptr<std::atomic<size_t>> slowdown_)
: slowdown(std::move(slowdown_))
explicit HTTPRequestHandlerFactory(SafeHandler<RequestOptions>::Ptr options_)
: options(options_)
{
}
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest &) override
{
return new MockRequestHandler(slowdown);
return new MockRequestHandler(options);
}
std::shared_ptr<std::atomic<size_t>> slowdown;
SafeHandler<RequestOptions>::Ptr options;
};
}
@ -94,6 +136,8 @@ class ConnectionPoolTest : public testing::Test {
protected:
ConnectionPoolTest()
{
options = std::make_shared<SafeHandler<RequestOptions>>();
startServer();
}
@ -102,7 +146,7 @@ protected:
DB::HTTPConnectionPools::Limits def_limits{};
DB::HTTPConnectionPools::instance().setLimits(def_limits, def_limits, def_limits);
setSlowDown(0);
options->set(RequestOptions());
DB::HTTPConnectionPools::instance().dropCache();
DB::CurrentThread::getProfileEvents().reset();
@ -129,7 +173,7 @@ protected:
void startServer()
{
server_data.reset();
server_data.handler_factory = new HTTPRequestHandlerFactory(slowdown_receive);
server_data.handler_factory = new HTTPRequestHandlerFactory(options);
server_data.server = std::make_unique<Poco::Net::HTTPServer>(
server_data.handler_factory, server_data.port);
@ -143,11 +187,21 @@ protected:
void setSlowDown(size_t seconds)
{
*slowdown_receive = seconds;
auto opt = options->get();
opt.slowdown_receive = seconds;
options->set(std::move(opt));
}
void setOverWriteKeepAlive(size_t seconds, int max_requests)
{
auto opt = options->get();
opt.overwrite_keep_alive_timeout = int(seconds);
opt.overwrite_keep_alive_max_requests = max_requests;
options->set(std::move(opt));
}
DB::ConnectionTimeouts timeouts;
std::shared_ptr<std::atomic<size_t>> slowdown_receive = std::make_shared<std::atomic<size_t>>(0);
SafeHandler<RequestOptions>::Ptr options;
struct ServerData
{
@ -182,7 +236,7 @@ protected:
void wait_until(std::function<bool()> pred)
{
while (!pred())
sleepForMilliseconds(250);
sleepForMilliseconds(10);
}
void echoRequest(String data, HTTPSession & session)
@ -245,45 +299,52 @@ TEST_F(ConnectionPoolTest, CanRequest)
ASSERT_EQ(0, getServer().currentConnections());
ASSERT_EQ(1, getServer().totalConnections());
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
auto metrics = pool->getMetrics();
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, CanPreserve)
{
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
}
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
wait_until([&] () { return getServer().currentConnections() == 1; });
ASSERT_EQ(1, getServer().currentConnections());
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
}
TEST_F(ConnectionPoolTest, CanReuse)
{
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
// DB::setReuseTag(*connection);
}
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().stored_count));
{
auto connection = pool->getConnection(timeouts);
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
wait_until([&] () { return getServer().currentConnections() == 1; });
ASSERT_EQ(1, getServer().currentConnections());
@ -293,6 +354,11 @@ TEST_F(ConnectionPoolTest, CanReuse)
ASSERT_EQ(1, getServer().totalConnections());
ASSERT_EQ(1, getServer().currentConnections());
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
connection->reset();
}
@ -303,15 +369,16 @@ TEST_F(ConnectionPoolTest, CanReuse)
ASSERT_EQ(0, getServer().currentConnections());
ASSERT_EQ(1, getServer().totalConnections());
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
}
TEST_F(ConnectionPoolTest, CanReuse10)
{
auto pool = getPool();
auto metrics = pool->getMetrics();
for (int i = 0; i < 10; ++i)
{
@ -328,16 +395,23 @@ TEST_F(ConnectionPoolTest, CanReuse10)
ASSERT_EQ(0, getServer().currentConnections());
ASSERT_EQ(1, getServer().totalConnections());
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, CanReuse5)
{
timeouts.withHTTPKeepAliveTimeout(1);
auto ka = Poco::Timespan(1, 0); // 1 second
timeouts.withHTTPKeepAliveTimeout(ka);
auto pool = getPool();
auto metrics = pool->getMetrics();
std::vector<DB::HTTPSessionPtr> connections;
connections.reserve(5);
@ -347,11 +421,14 @@ TEST_F(ConnectionPoolTest, CanReuse5)
}
connections.clear();
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(5, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(5, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(5, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(5, CurrentMetrics::get(metrics.stored_count));
wait_until([&] () { return getServer().currentConnections() == 5; });
ASSERT_EQ(5, getServer().currentConnections());
@ -363,35 +440,56 @@ TEST_F(ConnectionPoolTest, CanReuse5)
echoRequest("Hello", *connection);
}
ASSERT_EQ(5, getServer().totalConnections());
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(5, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(5, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(5, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(5, CurrentMetrics::get(metrics.stored_count));
/// wait until all connections are timed out
wait_until([&] () { return getServer().currentConnections() == 0; });
{
// just to trigger pool->wipeExpired();
auto connection = pool->getConnection(timeouts);
connection->reset();
}
ASSERT_EQ(6, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, CanReconnectAndCreate)
{
auto pool = getPool();
auto metrics = pool->getMetrics();
std::vector<HTTPSessionPtr> in_use;
const size_t count = 2;
const size_t count = 3;
for (int i = 0; i < count; ++i)
{
auto connection = pool->getConnection(timeouts);
// DB::setReuseTag(*connection);
in_use.push_back(connection);
}
ASSERT_EQ(count, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(count, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(count, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(count, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
auto connection = std::move(in_use.back());
in_use.pop_back();
@ -402,28 +500,39 @@ TEST_F(ConnectionPoolTest, CanReconnectAndCreate)
echoRequest("Hello", *connection);
connection->reset();
ASSERT_EQ(count+1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
wait_until([&] () { return getServer().currentConnections() == 1; });
ASSERT_EQ(1, getServer().currentConnections());
ASSERT_EQ(count+1, getServer().totalConnections());
ASSERT_EQ(count+1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(count, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, CanReconnectAndReuse)
{
auto ka = Poco::Timespan(1, 0); // 1 second
timeouts.withHTTPKeepAliveTimeout(ka);
auto pool = getPool();
auto metrics = pool->getMetrics();
std::vector<HTTPSessionPtr> in_use;
const size_t count = 2;
const size_t count = 3;
for (int i = 0; i < count; ++i)
{
auto connection = pool->getConnection(timeouts);
/// make a request in order to present the keep-alive headers to the server
echoRequest("Hello", *connection);
in_use.push_back(std::move(connection));
}
in_use.clear();
for (int i = 0; i < count; ++i)
{
auto connection = pool->getConnection(timeouts);
// DB::setReuseTag(*connection);
in_use.push_back(std::move(connection));
}
@ -441,11 +550,16 @@ TEST_F(ConnectionPoolTest, CanReconnectAndReuse)
wait_until([&] () { return getServer().currentConnections() == 0; });
ASSERT_EQ(0, getServer().currentConnections());
ASSERT_EQ(2, getServer().totalConnections());
ASSERT_EQ(count, getServer().totalConnections());
ASSERT_EQ(count, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(count, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(count + count - 1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(count + 1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(count-1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(count-2, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, ReceiveTimeout)
@ -454,6 +568,7 @@ TEST_F(ConnectionPoolTest, ReceiveTimeout)
timeouts.withReceiveTimeout(1);
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
@ -462,10 +577,14 @@ TEST_F(ConnectionPoolTest, ReceiveTimeout)
);
}
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
{
timeouts.withReceiveTimeout(3);
@ -475,10 +594,14 @@ TEST_F(ConnectionPoolTest, ReceiveTimeout)
);
}
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
{
/// timeouts take effect for a reused session
@ -489,10 +612,14 @@ TEST_F(ConnectionPoolTest, ReceiveTimeout)
);
}
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, ReadWriteBufferFromHTTP)
@ -500,6 +627,7 @@ TEST_F(ConnectionPoolTest, ReadWriteBufferFromHTTP)
std::string_view message = "Hello ReadWriteBufferFromHTTP";
auto uri = Poco::URI(getServerUrl());
auto metrics = DB::HTTPConnectionPools::instance().getPool(DB::HTTPConnectionGroupType::HTTP, uri, DB::ProxyConfiguration{})->getMetrics();
Poco::Net::HTTPBasicCredentials empty_creds;
auto buf_from_http = DB::BuilderRWBufferFromHTTP(uri)
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
@ -527,6 +655,7 @@ TEST_F(ConnectionPoolTest, ReadWriteBufferFromHTTP)
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
@ -538,23 +667,26 @@ TEST_F(ConnectionPoolTest, HardLimit)
DB::HTTPConnectionPools::instance().setLimits(zero_limits, zero_limits, zero_limits);
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
}
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, NoReceiveCall)
{
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
@ -570,11 +702,209 @@ TEST_F(ConnectionPoolTest, NoReceiveCall)
connection->flushRequest();
}
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, ReconnectedWhenConnectionIsHoldTooLong)
{
auto ka = Poco::Timespan(1, 0); // 1 second
timeouts.withHTTPKeepAliveTimeout(ka);
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
auto fake_ka = Poco::Timespan(30 * 1000 * 1000); // 30 seconds
timeouts.withHTTPKeepAliveTimeout(fake_ka);
DB::setTimeouts(*connection, timeouts); // the new keep-alive timeout has no effect on an established connection
wait_until([&] () { return getServer().currentConnections() == 0; });
ASSERT_EQ(1, connection->connected());
ASSERT_EQ(1, connection->getKeepAlive());
ASSERT_EQ(1000, connection->getKeepAliveTimeout().totalMilliseconds());
ASSERT_EQ(1, connection->isKeepAliveExpired(connection->getKeepAliveReliability()));
echoRequest("Hello", *connection);
}
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
}
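A minimal sketch of the expiration rule this test relies on: a session counts as expired once its idle time exceeds reliability × keep-alive timeout, so a nearly expired connection is retired before its reuse can race the server-side close. Names below are illustrative, not the patched internals:
#include <Poco/Timespan.h>
#include <Poco/Timestamp.h>
static bool isExpiredSketch(Poco::Timestamp last_request, Poco::Timespan ka_timeout, double reliability)
{
    Poco::Timestamp now; // defaults to the current time
    return (now - last_request) >= static_cast<Poco::Timestamp::TimeDiff>(reliability * ka_timeout.totalMicroseconds());
}
// With ka_timeout = 1s and reliability = 0.8, the session is treated as expired after 0.8s idle.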
TEST_F(ConnectionPoolTest, ReconnectedWhenConnectionIsNearlyExpired)
{
auto ka = Poco::Timespan(1, 0); // 1 second
timeouts.withHTTPKeepAliveTimeout(ka);
auto pool = getPool();
auto metrics = pool->getMetrics();
{
{
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
}
sleepForMilliseconds(900);
{
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
}
}
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, ServerOverwriteKeepAlive)
{
auto ka = Poco::Timespan(30, 0); // 30 seconds
timeouts.withHTTPKeepAliveTimeout(ka);
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds());
ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds());
}
{
setOverWriteKeepAlive(1, 10);
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds());
ASSERT_EQ(1, connection->getKeepAliveTimeout().totalSeconds());
}
{
// the server does not overwrite it in the following requests, but the client has to remember the last agreed value
setOverWriteKeepAlive(0, 0);
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds());
ASSERT_EQ(1, connection->getKeepAliveTimeout().totalSeconds());
}
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(3, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
}
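For context, the values this test overrides travel in the standard HTTP/1.1 keep-alive headers; setOverWriteKeepAlive(1, 10) presumably makes the test server emit something like the following (a sketch, the helper's internals are not shown in this diff):
response.set("Connection", "Keep-Alive");
response.set("Keep-Alive", "timeout=1, max=10"); // the client must remember these agreed values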
TEST_F(ConnectionPoolTest, MaxRequests)
{
auto ka = Poco::Timespan(30, 0); // 30 seconds
timeouts.withHTTPKeepAliveTimeout(ka);
auto max_requests = 5;
timeouts.http_keep_alive_max_requests = max_requests;
auto pool = getPool();
auto metrics = pool->getMetrics();
for (int i = 1; i <= max_requests - 1; ++i)
{
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds());
ASSERT_EQ(max_requests, connection->getKeepAliveMaxRequests());
ASSERT_EQ(i, connection->getKeepAliveRequest());
}
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(max_requests-1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(max_requests-2, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
{
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds());
ASSERT_EQ(max_requests, connection->getKeepAliveMaxRequests());
ASSERT_EQ(max_requests, connection->getKeepAliveRequest());
}
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(max_requests-1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(max_requests-1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
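A hedged sketch of the bookkeeping these assertions imply: the session counts requests and reports itself expired once the counter reaches the agreed maximum (illustrative names, not the patched implementation):
struct SessionSketch
{
    int max_requests = 1000; // default, may be lowered by the server's Keep-Alive header
    int current_request = 0;

    void onRequest() { ++current_request; }
    bool expiredByRequests() const { return current_request >= max_requests; }
};
// After the 5th request with max_requests = 5, the connection is not stored again:
// the pool counts it as expired, and stored_count drops to 0.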
TEST_F(ConnectionPoolTest, ServerOverwriteMaxRequests)
{
auto ka = Poco::Timespan(30, 0); // 30 seconds
timeouts.withHTTPKeepAliveTimeout(ka);
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds());
ASSERT_EQ(1000, connection->getKeepAliveMaxRequests());
ASSERT_EQ(1, connection->getKeepAliveRequest());
}
auto max_requests = 3;
setOverWriteKeepAlive(5, max_requests);
for (int i = 2; i <= 10*max_requests; ++i)
{
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
ASSERT_EQ(5, connection->getKeepAliveTimeout().totalSeconds());
ASSERT_EQ(max_requests, connection->getKeepAliveMaxRequests());
ASSERT_EQ(((i-1) % max_requests) + 1, connection->getKeepAliveRequest());
}
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(10*max_requests-10, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(10*max_requests-10, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
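A worked trace of the modular assertion above, with max_requests = 3 (our annotation):
// i (request #)         :  2  3 |  4  5  6 |  7  8  9 | ... | 28 29 30
// getKeepAliveRequest() :  2  3 |  1  2  3 |  1  2  3 | ... |  1  2  3
// Every connection serves 3 requests and then expires; 30 requests / 3 = 10 connections,
// matching created == 10 and expired == 10, with 20 acquisitions reused and 20 preserved.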

View File

@ -483,7 +483,7 @@ void testTranscoding(Timer & timer, ICompressionCodec & codec, const CodecTestSe
ASSERT_TRUE(EqualByteContainers(test_sequence.data_type->getSizeOfValueInMemory(), source_data, decoded));
const auto header_size = codec.getHeaderSize();
const auto header_size = ICompressionCodec::getHeaderSize();
const auto compression_ratio = (encoded_size - header_size) / (source_data.size() * 1.0);
if (expected_compression_ratio)

View File

@ -54,6 +54,7 @@ static constexpr auto DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT = 15;
static constexpr auto DEFAULT_TCP_KEEP_ALIVE_TIMEOUT = 290;
static constexpr auto DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT = 30;
static constexpr auto DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST = 1000;
static constexpr auto DBMS_DEFAULT_PATH = "/var/lib/clickhouse/";

View File

@ -128,9 +128,9 @@ namespace DB
M(Bool, format_alter_operations_with_parentheses, false, "If enabled, each operation in alter queries will be surrounded with parentheses in formatted queries to make them less ambiguous.", 0) \
M(String, default_replica_path, "/clickhouse/tables/{uuid}/{shard}", "The path to the table in ZooKeeper", 0) \
M(String, default_replica_name, "{replica}", "The replica name in ZooKeeper", 0) \
M(UInt64, disk_connections_soft_limit, 1000, "Connections above this limit have significantly shorter time to live. The limit applies to the disks connections.", 0) \
M(UInt64, disk_connections_soft_limit, 5000, "Connections above this limit have significantly shorter time to live. The limit applies to the disks connections.", 0) \
M(UInt64, disk_connections_warn_limit, 10000, "Warning messages are written to the logs if the number of in-use connections is higher than this limit. The limit applies to the disks connections.", 0) \
M(UInt64, disk_connections_store_limit, 12000, "Connections above this limit reset after use. Set to 0 to turn connection cache off. The limit applies to the disks connections.", 0) \
M(UInt64, disk_connections_store_limit, 30000, "Connections above this limit reset after use. Set to 0 to turn connection cache off. The limit applies to the disks connections.", 0) \
M(UInt64, storage_connections_soft_limit, 100, "Connections above this limit have significantly shorter time to live. The limit applies to the storages connections.", 0) \
M(UInt64, storage_connections_warn_limit, 1000, "Warning messages are written to the logs if the number of in-use connections is higher than this limit. The limit applies to the storages connections.", 0) \
M(UInt64, storage_connections_store_limit, 5000, "Connections above this limit reset after use. Set to 0 to turn connection cache off. The limit applies to the storages connections.", 0) \

View File

@ -665,7 +665,7 @@ void BaseDaemon::reloadConfiguration()
*/
config_path = config().getString("config-file", getDefaultConfigFileName());
ConfigProcessor config_processor(config_path, false, true);
config_processor.setConfigPath(fs::path(config_path).parent_path());
ConfigProcessor::setConfigPath(fs::path(config_path).parent_path());
loaded_config = config_processor.loadConfig(/* allow_zk_includes = */ true);
if (last_configuration != nullptr)

View File

@ -18,7 +18,7 @@ SerializationPtr DataTypeDate32::doGetDefaultSerialization() const
Field DataTypeDate32::getDefault() const
{
return -static_cast<Int64>(DateLUT::instance().getDayNumOffsetEpoch());
return -static_cast<Int64>(DateLUT::instance().getDayNumOffsetEpoch()); /// NOLINT(readability-static-accessed-through-instance)
}
void registerDataTypeDate32(DataTypeFactory & factory)

View File

@ -34,7 +34,7 @@ TEST(JSONDataParser, ReadJSON)
JSONDataParser<SimdJSONParser> parser;
ReadBufferFromString buf(json_bad);
String res;
parser.readJSON(res, buf);
JSONDataParser<SimdJSONParser>::readJSON(res, buf);
ASSERT_EQ(json1, res);
}
@ -44,7 +44,7 @@ TEST(JSONDataParser, ReadJSON)
JSONDataParser<SimdJSONParser> parser;
ReadBufferFromString buf(json_bad);
String res;
parser.readJSON(res, buf);
JSONDataParser<SimdJSONParser>::readJSON(res, buf);
ASSERT_EQ(json2, res);
}
}

View File

@ -346,7 +346,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegment & file_s
}
auto downloader_id = file_segment.getOrSetDownloader();
if (downloader_id == file_segment.getCallerId())
if (downloader_id == FileSegment::getCallerId())
{
if (canStartFromCache(file_offset_of_buffer_end, file_segment))
{

View File

@ -76,6 +76,9 @@ std::unique_ptr<S3::Client> getClient(
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", S3::DEFAULT_CONNECT_TIMEOUT_MS);
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", S3::DEFAULT_REQUEST_TIMEOUT_MS);
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", S3::DEFAULT_MAX_CONNECTIONS);
client_configuration.http_keep_alive_timeout = config.getUInt(config_prefix + ".http_keep_alive_timeout", S3::DEFAULT_KEEP_ALIVE_TIMEOUT);
client_configuration.http_keep_alive_max_requests = config.getUInt(config_prefix + ".http_keep_alive_max_requests", S3::DEFAULT_KEEP_ALIVE_MAX_REQUESTS);
client_configuration.endpointOverride = uri.endpoint;
client_configuration.s3_use_adaptive_timeouts = config.getBool(
config_prefix + ".use_adaptive_timeouts", client_configuration.s3_use_adaptive_timeouts);

View File

@ -279,7 +279,7 @@ struct ToDate32Transform32Or64Signed
static NO_SANITIZE_UNDEFINED Int32 execute(const FromType & from, const DateLUTImpl & time_zone)
{
static const Int32 daynum_min_offset = -static_cast<Int32>(time_zone.getDayNumOffsetEpoch());
static const Int32 daynum_min_offset = -static_cast<Int32>(DateLUTImpl::getDayNumOffsetEpoch());
if constexpr (date_time_overflow_behavior == FormatSettings::DateTimeOverflowBehavior::Throw)
{
@ -1092,7 +1092,7 @@ struct ConvertThroughParsing
{
if constexpr (std::is_same_v<ToDataType, DataTypeDate32>)
{
vec_to[i] = -static_cast<Int32>(DateLUT::instance().getDayNumOffsetEpoch());
vec_to[i] = -static_cast<Int32>(DateLUT::instance().getDayNumOffsetEpoch()); /// NOLINT(readability-static-accessed-through-instance)
}
else
{

View File

@ -106,7 +106,7 @@ void UserDefinedSQLFunctionFactory::checkCanBeRegistered(const ContextPtr & cont
if (AggregateFunctionFactory::instance().hasNameOrAlias(function_name))
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The aggregate function '{}' already exists", function_name);
if (UserDefinedExecutableFunctionFactory::instance().has(function_name, context))
if (UserDefinedExecutableFunctionFactory::instance().has(function_name, context)) /// NOLINT(readability-static-accessed-through-instance)
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User defined executable function '{}' already exists", function_name);
validateFunction(assert_cast<const ASTCreateFunctionQuery &>(create_function_query).function_core, function_name);
@ -118,7 +118,7 @@ void UserDefinedSQLFunctionFactory::checkCanBeUnregistered(const ContextPtr & co
AggregateFunctionFactory::instance().hasNameOrAlias(function_name))
throw Exception(ErrorCodes::CANNOT_DROP_FUNCTION, "Cannot drop system function '{}'", function_name);
if (UserDefinedExecutableFunctionFactory::instance().has(function_name, context))
if (UserDefinedExecutableFunctionFactory::instance().has(function_name, context)) /// NOLINT(readability-static-accessed-through-instance)
throw Exception(ErrorCodes::CANNOT_DROP_FUNCTION, "Cannot drop user defined executable function '{}'", function_name);
}

View File

@ -66,13 +66,13 @@ struct DotProduct
};
template <typename Type>
static void accumulate(State<Type> & state, Type x, Type y)
static NO_SANITIZE_UNDEFINED void accumulate(State<Type> & state, Type x, Type y)
{
state.sum += x * y;
}
template <typename Type>
static void combine(State<Type> & state, const State<Type> & other_state)
static NO_SANITIZE_UNDEFINED void combine(State<Type> & state, const State<Type> & other_state)
{
state.sum += other_state.sum;
}

View File

@ -144,7 +144,12 @@ ConnectionTimeouts ConnectionTimeouts::getAdaptiveTimeouts(const String & method
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
{
session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
/// we cannot change the keep-alive timeout for already initiated connections
if (!session.connected())
{
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
session.setKeepAliveMaxRequests(int(timeouts.http_keep_alive_max_requests));
}
}
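A hedged usage note: because of the guard above, keep-alive parameters only take hold when applied before the session first connects; something like (setup assumed):
Poco::Net::HTTPClientSession session("example.com", 80);
DB::setTimeouts(session, timeouts); // not connected yet: keep-alive timeout and max requests are applied
// ... the first request connects the session ...
DB::setTimeouts(session, timeouts); // connected: plain timeouts change, keep-alive settings stay as agreed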
ConnectionTimeouts getTimeouts(const Poco::Net::HTTPClientSession & session)

View File

@ -35,6 +35,8 @@ struct ConnectionTimeouts
Poco::Timespan tcp_keep_alive_timeout = Poco::Timespan(DEFAULT_TCP_KEEP_ALIVE_TIMEOUT, 0);
Poco::Timespan http_keep_alive_timeout = Poco::Timespan(DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT, 0);
size_t http_keep_alive_max_requests = DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST;
/// Timeouts for HedgedConnections
Poco::Timespan hedged_connection_timeout = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0);
@ -69,6 +71,7 @@ APPLY_FOR_ALL_CONNECTION_TIMEOUT_MEMBERS(DECLARE_BUILDER_FOR_MEMBER)
ConnectionTimeouts & withConnectionTimeout(size_t seconds);
ConnectionTimeouts & withConnectionTimeout(Poco::Timespan span);
ConnectionTimeouts & withHTTPKeepAliveMaxRequests(size_t requests);
};
/// NOLINTBEGIN(bugprone-macro-parentheses)
@ -114,6 +117,12 @@ inline ConnectionTimeouts & ConnectionTimeouts::withConnectionTimeout(Poco::Time
return *this;
}
inline ConnectionTimeouts & ConnectionTimeouts::withHTTPKeepAliveMaxRequests(size_t requests)
{
http_keep_alive_max_requests = requests;
return *this;
}
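A small usage sketch of the new builder member, chained in the fluent style used elsewhere in this diff (default construction assumed):
auto timeouts = ConnectionTimeouts()
    .withHTTPKeepAliveTimeout(Poco::Timespan(30, 0))
    .withHTTPKeepAliveMaxRequests(1000);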
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts);
ConnectionTimeouts getTimeouts(const Poco::Net::HTTPClientSession & session);

View File

@ -37,7 +37,7 @@ MMapReadBufferFromFileWithCache::MMapReadBufferFromFileWithCache(
MMapReadBufferFromFileWithCache::MMapReadBufferFromFileWithCache(
MMappedFileCache & cache, const std::string & file_name, size_t offset)
{
mapped = cache.getOrSet(cache.hash(file_name, offset, -1), [&]
mapped = cache.getOrSet(MMappedFileCache::hash(file_name, offset, -1), [&]
{
return std::make_shared<MMappedFile>(file_name, offset);
});

View File

@ -345,7 +345,7 @@ void ReadWriteBufferFromHTTP::doWithRetries(std::function<void()> && callable,
if (last_attempt || !is_retriable)
{
if (!mute_logging)
LOG_ERROR(log,
LOG_DEBUG(log,
"Failed to make request to '{}'{}. "
"Error: '{}'. "
"Failed at try {}/{}.",
@ -361,7 +361,7 @@ void ReadWriteBufferFromHTTP::doWithRetries(std::function<void()> && callable,
on_retry();
if (!mute_logging)
LOG_INFO(log,
LOG_TRACE(log,
"Failed to make request to '{}'{}. "
"Error: {}. "
"Failed at try {}/{}. "

View File

@ -96,9 +96,9 @@ bool isS3ExpressEndpoint(const std::string & endpoint);
struct ClientSettings
{
bool use_virtual_addressing;
bool use_virtual_addressing = false;
/// Disable checksum to avoid extra read of the input stream
bool disable_checksum;
bool disable_checksum = false;
/// Should client send ComposeObject request after upload to GCS.
///
/// Previously ComposeObject request was required to make Copy possible,
@ -108,8 +108,8 @@ struct ClientSettings
///
/// Ability to enable it preserved since likely it is required for old
/// files.
bool gcs_issue_compose_request;
bool is_s3express_bucket;
bool gcs_issue_compose_request = false;
bool is_s3express_bucket = false;
};
/// Client that improves the client from the AWS SDK

View File

@ -22,6 +22,8 @@ inline static constexpr uint64_t DEFAULT_EXPIRATION_WINDOW_SECONDS = 120;
inline static constexpr uint64_t DEFAULT_CONNECT_TIMEOUT_MS = 1000;
inline static constexpr uint64_t DEFAULT_REQUEST_TIMEOUT_MS = 30000;
inline static constexpr uint64_t DEFAULT_MAX_CONNECTIONS = 100;
inline static constexpr uint64_t DEFAULT_KEEP_ALIVE_TIMEOUT = 5;
inline static constexpr uint64_t DEFAULT_KEEP_ALIVE_MAX_REQUESTS = 100;
/// In GCP metadata service can be accessed via DNS regardless of IPv4 or IPv6.
static inline constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.google.internal";

View File

@ -146,7 +146,9 @@ ConnectionTimeouts getTimeoutsFromConfiguration(const PocoHTTPClientConfiguratio
.withSendTimeout(Poco::Timespan(client_configuration.requestTimeoutMs * 1000))
.withReceiveTimeout(Poco::Timespan(client_configuration.requestTimeoutMs * 1000))
.withTCPKeepAliveTimeout(Poco::Timespan(
client_configuration.enableTcpKeepAlive ? client_configuration.tcpKeepAliveIntervalMs * 1000 : 0));
client_configuration.enableTcpKeepAlive ? client_configuration.tcpKeepAliveIntervalMs * 1000 : 0))
.withHTTPKeepAliveTimeout(Poco::Timespan(client_configuration.http_keep_alive_timeout, 0))
.withHTTPKeepAliveMaxRequests(client_configuration.http_keep_alive_max_requests);
}
PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_configuration)

View File

@ -51,6 +51,8 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
/// See PoolBase::BehaviourOnLimit
bool s3_use_adaptive_timeouts = true;
size_t http_keep_alive_timeout = DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT;
size_t http_keep_alive_max_requests = DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST;
std::function<void(const DB::ProxyConfiguration &)> error_report;

View File

@ -33,12 +33,17 @@ namespace S3
URI::URI(const std::string & uri_)
{
/// Case when bucket name represented in domain name of S3 URL.
/// E.g. (https://bucket-name.s3.Region.amazonaws.com/key)
/// E.g. (https://bucket-name.s3.region.amazonaws.com/key)
/// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#virtual-hosted-style-access
static const RE2 virtual_hosted_style_pattern(R"((.+)\.(s3express[\-a-z0-9]+|s3|cos|obs|oss|eos)([.\-][a-z0-9\-.:]+))");
/// Case when AWS Private Link Interface is being used
/// E.g. (bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w.s3.us-east-1.vpce.amazonaws.com/bucket-name/key)
/// https://docs.aws.amazon.com/AmazonS3/latest/userguide/privatelink-interface-endpoints.html
static const RE2 aws_private_link_style_pattern(R"(bucket\.vpce\-([a-z0-9\-.]+)\.vpce.amazonaws.com(:\d{1,5})?)");
/// Case when bucket name and key represented in path of S3 URL.
/// E.g. (https://s3.Region.amazonaws.com/bucket-name/key)
/// E.g. (https://s3.region.amazonaws.com/bucket-name/key)
/// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#path-style-access
static const RE2 path_style_pattern("^/([^/]*)/(.*)");
@ -103,7 +108,10 @@ URI::URI(const std::string & uri_)
String name;
String endpoint_authority_from_uri;
if (re2::RE2::FullMatch(uri.getAuthority(), virtual_hosted_style_pattern, &bucket, &name, &endpoint_authority_from_uri))
bool is_using_aws_private_link_interface = re2::RE2::FullMatch(uri.getAuthority(), aws_private_link_style_pattern);
if (!is_using_aws_private_link_interface
&& re2::RE2::FullMatch(uri.getAuthority(), virtual_hosted_style_pattern, &bucket, &name, &endpoint_authority_from_uri))
{
is_virtual_hosted_style = true;
endpoint = uri.getScheme() + "://" + name + endpoint_authority_from_uri;
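An illustrative standalone check of the new private-link guard (assuming direct use of RE2 with the pattern quoted above):
#include <re2/re2.h>
bool is_private_link = re2::RE2::FullMatch(
    "bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w.s3.us-east-1.vpce.amazonaws.com",
    R"(bucket\.vpce\-([a-z0-9\-.]+)\.vpce.amazonaws.com(:\d{1,5})?)");
// -> true, so the authority is not treated as virtual-hosted style;
// bucket and key are taken from the path instead (/bucket-name/key).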

View File

@ -17,6 +17,7 @@ namespace DB::S3
* The following patterns are allowed:
* s3://bucket/key
* http(s)://endpoint/bucket/key
* http(s)://bucket.<vpce_endpoint_id>.s3.<region>.vpce.amazonaws.com<:port_number>/bucket_name/key
*/
struct URI
{

View File

@ -159,7 +159,7 @@ void testServerSideEncryption(
DB::S3::CredentialsConfiguration
{
.use_environment_credentials = use_environment_credentials,
.use_insecure_imds_request = use_insecure_imds_request
.use_insecure_imds_request = use_insecure_imds_request,
}
);

View File

@ -74,6 +74,40 @@ const TestCase TestCases[] = {
"data",
"",
true},
{S3::URI("https://bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w.s3.us-east-1.vpce.amazonaws.com/root/nested/file.txt"),
"https://bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w.s3.us-east-1.vpce.amazonaws.com",
"root",
"nested/file.txt",
"",
false},
// Test with a file with no extension
{S3::URI("https://bucket.vpce-03b2c987f1bd55c5f-j3b4vg7w.s3.ap-southeast-2.vpce.amazonaws.com/some_bucket/document"),
"https://bucket.vpce-03b2c987f1bd55c5f-j3b4vg7w.s3.ap-southeast-2.vpce.amazonaws.com",
"some_bucket",
"document",
"",
false},
// Test with a deeply nested file path
{S3::URI("https://bucket.vpce-0242cd56f1bd55c5f-l5b7vg8x.s3.sa-east-1.vpce.amazonaws.com/some_bucket/b/c/d/e/f/g/h/i/j/data.json"),
"https://bucket.vpce-0242cd56f1bd55c5f-l5b7vg8x.s3.sa-east-1.vpce.amazonaws.com",
"some_bucket",
"b/c/d/e/f/g/h/i/j/data.json",
"",
false},
// Zonal
{S3::URI("https://bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w-us-east-1a.s3.us-east-1.vpce.amazonaws.com/root/nested/file.txt"),
"https://bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w-us-east-1a.s3.us-east-1.vpce.amazonaws.com",
"root",
"nested/file.txt",
"",
false},
// Non standard port
{S3::URI("https://bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w-us-east-1a.s3.us-east-1.vpce.amazonaws.com:65535/root/nested/file.txt"),
"https://bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w-us-east-1a.s3.us-east-1.vpce.amazonaws.com:65535",
"root",
"nested/file.txt",
"",
false},
};
class S3UriTest : public testing::TestWithParam<std::string>

View File

@ -1071,7 +1071,7 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
auto current_context = data.getContext();
if (UserDefinedExecutableFunctionFactory::instance().has(node.name, current_context))
if (UserDefinedExecutableFunctionFactory::instance().has(node.name, current_context)) /// NOLINT(readability-static-accessed-through-instance)
{
Array parameters;
if (node.parameters)
@ -1087,7 +1087,7 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
}
}
function_builder = UserDefinedExecutableFunctionFactory::instance().tryGet(node.name, current_context, parameters);
function_builder = UserDefinedExecutableFunctionFactory::instance().tryGet(node.name, current_context, parameters); /// NOLINT(readability-static-accessed-through-instance)
}
if (!function_builder)

View File

@ -1056,7 +1056,7 @@ void NO_INLINE Aggregator::executeImplBatch(
/// During processing of row #i we will prefetch HashTable cell for row #(i + prefetch_look_ahead).
PrefetchingHelper prefetching;
size_t prefetch_look_ahead = prefetching.getInitialLookAheadValue();
size_t prefetch_look_ahead = PrefetchingHelper::getInitialLookAheadValue();
/// Optimization for special case when there are no aggregate functions.
if (params.aggregates_size == 0)
@ -1077,7 +1077,7 @@ void NO_INLINE Aggregator::executeImplBatch(
{
if constexpr (prefetch && HasPrefetchMemberFunc<decltype(method.data), KeyHolder>)
{
if (i == row_begin + prefetching.iterationsToMeasure())
if (i == row_begin + PrefetchingHelper::iterationsToMeasure())
prefetch_look_ahead = prefetching.calcPrefetchLookAhead();
if (i + prefetch_look_ahead < row_end)
@ -1163,7 +1163,7 @@ void NO_INLINE Aggregator::executeImplBatch(
if constexpr (prefetch && HasPrefetchMemberFunc<decltype(method.data), KeyHolder>)
{
if (i == key_start + prefetching.iterationsToMeasure())
if (i == key_start + PrefetchingHelper::iterationsToMeasure())
prefetch_look_ahead = prefetching.calcPrefetchLookAhead();
if (i + prefetch_look_ahead < row_end)

View File

@ -17,6 +17,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
EvictionCandidates::EvictionCandidates()
: log(getLogger("EvictionCandidates"))
{
}
EvictionCandidates::~EvictionCandidates()
{
/// Here `queue_entries_to_invalidate` contains queue entries
@ -31,6 +36,10 @@ EvictionCandidates::~EvictionCandidates()
iterator->invalidate();
}
/// We cannot reset evicting flag if we already removed queue entries.
if (removed_queue_entries)
return;
/// Here `candidates` contain only those file segments
/// which failed to be removed during evict()
/// because there was some exception before evict()
@ -58,13 +67,37 @@ void EvictionCandidates::add(
++candidates_size;
}
void EvictionCandidates::removeQueueEntries(const CachePriorityGuard::Lock & lock)
{
/// Remove queue entries of eviction candidates.
/// This will release space we consider to be held for them.
LOG_TEST(log, "Will remove {} eviction candidates", size());
for (const auto & [key, key_candidates] : candidates)
{
for (const auto & candidate : key_candidates.candidates)
{
auto queue_iterator = candidate->getQueueIterator();
queue_iterator->invalidate();
candidate->file_segment->resetQueueIterator();
queue_iterator->remove(lock);
}
}
removed_queue_entries = true;
}
void EvictionCandidates::evict()
{
if (candidates.empty())
return;
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::FilesystemCacheEvictMicroseconds);
queue_entries_to_invalidate.reserve(candidates_size);
/// If queue entries are already removed, then nothing to invalidate.
if (!removed_queue_entries)
queue_entries_to_invalidate.reserve(candidates_size);
for (auto & [key, key_candidates] : candidates)
{
@ -80,10 +113,14 @@ void EvictionCandidates::evict()
{
auto & candidate = key_candidates.candidates.back();
chassert(candidate->releasable());
const auto segment = candidate->file_segment;
auto iterator = segment->getQueueIterator();
chassert(iterator);
IFileCachePriority::IteratorPtr iterator;
if (!removed_queue_entries)
{
iterator = segment->getQueueIterator();
chassert(iterator);
}
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictedFileSegments);
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictedBytes, segment->range().size());
@ -112,7 +149,9 @@ void EvictionCandidates::evict()
/// it was freed in favour of some reserver, so we can make it visibly
/// free only for that particular reserver.
queue_entries_to_invalidate.push_back(iterator);
if (iterator)
queue_entries_to_invalidate.push_back(iterator);
key_candidates.candidates.pop_back();
}
}
@ -154,6 +193,12 @@ void EvictionCandidates::finalize(
on_finalize.clear();
}
bool EvictionCandidates::needFinalize() const
{
/// Do we need to call finalize()?
return !on_finalize.empty() || !queue_entries_to_invalidate.empty();
}
void EvictionCandidates::setSpaceHolder(
size_t size,
size_t elements,

View File

@ -4,11 +4,12 @@
namespace DB
{
class EvictionCandidates
class EvictionCandidates : private boost::noncopyable
{
public:
using FinalizeEvictionFunc = std::function<void(const CachePriorityGuard::Lock & lk)>;
EvictionCandidates();
~EvictionCandidates();
void add(
@ -18,12 +19,16 @@ public:
void evict();
void removeQueueEntries(const CachePriorityGuard::Lock &);
void onFinalize(FinalizeEvictionFunc && func) { on_finalize.emplace_back(std::move(func)); }
void finalize(
FileCacheQueryLimit::QueryContext * query_context,
const CachePriorityGuard::Lock &);
bool needFinalize() const;
size_t size() const { return candidates_size; }
auto begin() const { return candidates.begin(); }
@ -47,8 +52,13 @@ private:
size_t candidates_size = 0;
std::vector<FinalizeEvictionFunc> on_finalize;
std::vector<IFileCachePriority::IteratorPtr> queue_entries_to_invalidate;
bool removed_queue_entries = false;
IFileCachePriority::HoldSpacePtr hold_space;
LoggerPtr log;
};
using EvictionCandidatesPtr = std::unique_ptr<EvictionCandidates>;
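A condensed sketch of the intended call order, as exercised by the cache-resize code later in this diff (the collector call is hypothetical shorthand for collectCandidatesForEviction):
EvictionCandidates candidates;
{
    auto lock = cache.lockCache();
    collectCandidates(candidates, lock);   // 1. gather candidates under the lock
    candidates.removeQueueEntries(lock);   // 2. release their queue space early
}                                          // 3. lock released before slow disk I/O
candidates.evict();                        // actual eviction from the filesystem
if (candidates.needFinalize())
    candidates.finalize(nullptr, cache.lockCache());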

View File

@ -186,6 +186,7 @@ void FileCache::initialize()
}
metadata.startup();
is_initialized = true;
}
@ -1384,37 +1385,86 @@ void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings,
}
}
if (new_settings.max_size != actual_settings.max_size
|| new_settings.max_elements != actual_settings.max_elements)
{
cache_is_being_resized.store(true, std::memory_order_relaxed);
SCOPE_EXIT({
cache_is_being_resized.store(false, std::memory_order_relaxed);
});
EvictionCandidates eviction_candidates;
bool modified_size_limit = false;
auto cache_lock = lockCache();
bool updated = false;
try
/// In order to not block cache for the duration of cache resize,
/// we do:
/// a. Take a cache lock.
/// 1. Collect eviction candidates,
/// 2. Remove queue entries of eviction candidates.
/// This will release space we consider to be held for them,
/// so that we can safely modify size limits.
/// 3. Modify size limits of cache.
/// b. Release a cache lock.
/// 1. Do actual eviction from filesystem.
{
updated = main_priority->modifySizeLimits(
new_settings.max_size, new_settings.max_elements, new_settings.slru_size_ratio, cache_lock);
}
catch (...)
{
actual_settings.max_size = main_priority->getSizeLimit(cache_lock);
actual_settings.max_elements = main_priority->getElementsLimit(cache_lock);
throw;
cache_is_being_resized.store(true, std::memory_order_relaxed);
SCOPE_EXIT({
cache_is_being_resized.store(false, std::memory_order_relaxed);
});
auto cache_lock = lockCache();
FileCacheReserveStat stat;
if (main_priority->collectCandidatesForEviction(
new_settings.max_size, new_settings.max_elements, 0/* max_candidates_to_evict */,
stat, eviction_candidates, cache_lock))
{
/// Remove only queue entries of eviction candidates.
eviction_candidates.removeQueueEntries(cache_lock);
/// Note that (in-memory) metadata about corresponding file segments
/// (e.g. file segment info in CacheMetadata) will be removed
/// only after eviction from filesystem. This is needed to avoid
/// a race between removal of the file from the filesystem and
/// addition of the same file as part of a newly cached file segment.
/// Modify cache size limits.
/// From this point cache eviction will follow them.
main_priority->modifySizeLimits(
new_settings.max_size, new_settings.max_elements,
new_settings.slru_size_ratio, cache_lock);
modified_size_limit = true;
}
}
if (updated)
if (modified_size_limit)
{
try
{
/// Do actual eviction from filesystem.
eviction_candidates.evict();
}
catch (...)
{
if (eviction_candidates.needFinalize())
eviction_candidates.finalize(nullptr, lockCache());
throw;
}
if (eviction_candidates.needFinalize())
eviction_candidates.finalize(nullptr, lockCache());
LOG_INFO(log, "Changed max_size from {} to {}, max_elements from {} to {}",
actual_settings.max_size, new_settings.max_size,
actual_settings.max_elements, new_settings.max_elements);
actual_settings.max_size = main_priority->getSizeLimit(cache_lock);
actual_settings.max_elements = main_priority->getElementsLimit(cache_lock);
actual_settings.max_size = new_settings.max_size;
actual_settings.max_elements = new_settings.max_elements;
}
else
{
LOG_WARNING(
log, "Unable to modify size limit from {} to {}, elements limit from {} to {}. "
"`max_size` and `max_elements` settings will remain inconsistent with config.xml. "
"Next attempt to update them will happen on the next config reload. "
"You can trigger it with SYSTEM RELOAD CONFIG.",
actual_settings.max_size, new_settings.max_size,
actual_settings.max_elements, new_settings.max_elements);
}
}

View File

@ -18,6 +18,7 @@
#include <Interpreters/Cache/FileCache_fwd_internal.h>
#include <Interpreters/Cache/FileCacheSettings.h>
#include <Interpreters/Cache/UserInfo.h>
#include <Core/BackgroundSchedulePool.h>
#include <filesystem>

View File

@ -155,7 +155,17 @@ void FileCacheFactory::updateSettingsFromConfig(const Poco::Util::AbstractConfig
FileCacheSettings old_settings = cache_info->getSettings();
if (old_settings == new_settings)
{
continue;
}
/// FIXME: registerDiskCache modifies `path` setting of FileCacheSettings if path is relative.
/// This can lead to calling applySettingsIfPossible even though nothing changed, which is avoidable.
// LOG_TRACE(log, "Will apply settings changes for cache {}. "
// "Settings changes: {} (new settings: {}, old_settings: {})",
// cache_name, fmt::join(new_settings.getSettingsDiff(old_settings), ", "),
// new_settings.toString(), old_settings.toString());
try
{
@ -166,6 +176,7 @@ void FileCacheFactory::updateSettingsFromConfig(const Poco::Util::AbstractConfig
/// Settings changes could be partially applied in case of exception,
/// make sure cache_info->settings show correct state of applied settings.
cache_info->setSettings(old_settings);
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}

View File

@ -5,6 +5,7 @@
#include <Common/NamedCollections/NamedCollections.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <Common/logger_useful.h>
namespace DB
@ -98,4 +99,61 @@ void FileCacheSettings::loadFromCollection(const NamedCollection & collection)
loadImpl(std::move(collection_has), std::move(collection_get_uint), std::move(collection_get_string), std::move(collection_get_double));
}
std::string FileCacheSettings::toString() const
{
WriteBufferFromOwnString res;
res << "base_path: " << base_path << ", ";
res << "max_size: " << max_size << ", ";
res << "max_elements: " << max_elements << ", ";
res << "max_file_segment_size: " << max_file_segment_size << ", ";
res << "cache_on_write_operations: " << cache_on_write_operations << ", ";
res << "cache_hits_threshold: " << cache_hits_threshold << ", ";
res << "enable_filesystem_query_cache_limit: " << enable_filesystem_query_cache_limit << ", ";
res << "bypass_cache_threshold: " << bypass_cache_threshold << ", ";
res << "boundary_alignment: " << boundary_alignment << ", ";
res << "background_download_threads: " << background_download_threads << ", ";
res << "background_download_queue_size_limit: " << background_download_queue_size_limit << ", ";
res << "load_metadata_threads: " << load_metadata_threads << ", ";
res << "write_cache_per_user_id_directory: " << write_cache_per_user_id_directory << ", ";
res << "cache_policy: " << cache_policy << ", ";
res << "slru_size_ratio: " << slru_size_ratio << ", ";
return res.str();
}
std::vector<std::string> FileCacheSettings::getSettingsDiff(const FileCacheSettings & other) const
{
std::vector<std::string> res;
if (base_path != other.base_path)
res.push_back("base_path");
if (max_size != other.max_size)
res.push_back("max_size");
if (max_elements != other.max_elements)
res.push_back("max_elements");
if (max_file_segment_size != other.max_file_segment_size)
res.push_back("max_file_segment_size");
if (cache_on_write_operations != other.cache_on_write_operations)
res.push_back("cache_on_write_operations");
if (cache_hits_threshold != other.cache_hits_threshold)
res.push_back("cache_hits_threshold");
if (enable_filesystem_query_cache_limit != other.enable_filesystem_query_cache_limit)
res.push_back("enable_filesystem_query_cache_limit");
if (bypass_cache_threshold != other.bypass_cache_threshold)
res.push_back("bypass_cache_threshold");
if (boundary_alignment != other.boundary_alignment)
res.push_back("boundary_alignment");
if (background_download_threads != other.background_download_threads)
res.push_back("background_download_threads");
if (background_download_queue_size_limit != other.background_download_queue_size_limit)
res.push_back("background_download_queue_size_limit");
if (load_metadata_threads != other.load_metadata_threads)
res.push_back("load_metadata_threads");
if (write_cache_per_user_id_directory != other.write_cache_per_user_id_directory)
res.push_back("write_cache_per_user_directory");
if (cache_policy != other.cache_policy)
res.push_back("cache_policy");
if (slru_size_ratio != other.slru_size_ratio)
res.push_back("slru_size_ratio");
return res;
}
}
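A plausible consumer of the new diff helper, mirroring the commented-out trace in FileCacheFactory above (the log object and surrounding variables are assumed):
auto changed = new_settings.getSettingsDiff(old_settings);
if (!changed.empty())
    LOG_TRACE(log, "Will apply settings changes for cache {}: {}", cache_name, fmt::join(changed, ", "));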

View File

@ -41,6 +41,9 @@ struct FileCacheSettings
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
void loadFromCollection(const NamedCollection & collection);
std::string toString() const;
std::vector<std::string> getSettingsDiff(const FileCacheSettings & other) const;
bool operator ==(const FileCacheSettings &) const = default;
private:

View File

@ -114,7 +114,7 @@ FileSegment::Range::Range(size_t left_, size_t right_) : left(left_), right(righ
FileSegment::State FileSegment::state() const
{
auto lock = lockFileSegment();
auto lk = lock();
return download_state;
}
@ -131,7 +131,7 @@ String FileSegment::tryGetPath() const
return metadata->getFileSegmentPath(*this);
}
FileSegmentGuard::Lock FileSegment::lockFileSegment() const
FileSegmentGuard::Lock FileSegment::lock() const
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentLockMicroseconds);
return segment_guard.lock();
@ -153,24 +153,30 @@ void FileSegment::setDownloadState(State state, const FileSegmentGuard::Lock & l
size_t FileSegment::getReservedSize() const
{
auto lock = lockFileSegment();
auto lk = lock();
return reserved_size;
}
FileSegment::Priority::IteratorPtr FileSegment::getQueueIterator() const
{
auto lock = lockFileSegment();
auto lk = lock();
return queue_iterator;
}
void FileSegment::setQueueIterator(Priority::IteratorPtr iterator)
{
auto lock = lockFileSegment();
auto lk = lock();
if (queue_iterator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Queue iterator cannot be set twice");
queue_iterator = iterator;
}
void FileSegment::resetQueueIterator()
{
auto lk = lock();
queue_iterator.reset();
}
size_t FileSegment::getCurrentWriteOffset() const
{
return range().left + downloaded_size;
@ -183,14 +189,14 @@ size_t FileSegment::getDownloadedSize() const
void FileSegment::setDownloadedSize(size_t delta)
{
auto lock = lockFileSegment();
auto lk = lock();
downloaded_size += delta;
assert(downloaded_size == std::filesystem::file_size(getPath()));
}
bool FileSegment::isDownloaded() const
{
auto lock = lockFileSegment();
auto lk = lock();
return download_state == State::DOWNLOADED;
}
@ -204,8 +210,7 @@ String FileSegment::getCallerId()
String FileSegment::getDownloader() const
{
auto lock = lockFileSegment();
return getDownloaderUnlocked(lock);
return getDownloaderUnlocked(lock());
}
String FileSegment::getDownloaderUnlocked(const FileSegmentGuard::Lock &) const
@ -215,11 +220,11 @@ String FileSegment::getDownloaderUnlocked(const FileSegmentGuard::Lock &) const
String FileSegment::getOrSetDownloader()
{
auto lock = lockFileSegment();
auto lk = lock();
assertNotDetachedUnlocked(lock);
assertNotDetachedUnlocked(lk);
auto current_downloader = getDownloaderUnlocked(lock);
auto current_downloader = getDownloaderUnlocked(lk);
if (current_downloader.empty())
{
@ -229,7 +234,7 @@ String FileSegment::getOrSetDownloader()
return "notAllowed:" + stateToString(download_state);
current_downloader = downloader_id = caller_id;
setDownloadState(State::DOWNLOADING, lock);
setDownloadState(State::DOWNLOADING, lk);
chassert(key_metadata.lock());
}
@ -253,15 +258,15 @@ void FileSegment::resetDownloadingStateUnlocked(const FileSegmentGuard::Lock & l
void FileSegment::resetDownloader()
{
auto lock = lockFileSegment();
auto lk = lock();
SCOPE_EXIT({ cv.notify_all(); });
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("resetDownloader", lock);
assertNotDetachedUnlocked(lk);
assertIsDownloaderUnlocked("resetDownloader", lk);
resetDownloadingStateUnlocked(lock);
resetDownloaderUnlocked(lock);
resetDownloadingStateUnlocked(lk);
resetDownloaderUnlocked(lk);
}
void FileSegment::resetDownloaderUnlocked(const FileSegmentGuard::Lock &)
@ -290,8 +295,8 @@ void FileSegment::assertIsDownloaderUnlocked(const std::string & operation, cons
bool FileSegment::isDownloader() const
{
auto lock = lockFileSegment();
return isDownloaderUnlocked(lock);
auto lk = lock();
return isDownloaderUnlocked(lk);
}
bool FileSegment::isDownloaderUnlocked(const FileSegmentGuard::Lock & lock) const
@ -301,21 +306,21 @@ bool FileSegment::isDownloaderUnlocked(const FileSegmentGuard::Lock & lock) cons
FileSegment::RemoteFileReaderPtr FileSegment::getRemoteFileReader()
{
auto lock = lockFileSegment();
assertIsDownloaderUnlocked("getRemoteFileReader", lock);
auto lk = lock();
assertIsDownloaderUnlocked("getRemoteFileReader", lk);
return remote_file_reader;
}
void FileSegment::resetRemoteFileReader()
{
auto lock = lockFileSegment();
assertIsDownloaderUnlocked("resetRemoteFileReader", lock);
auto lk = lock();
assertIsDownloaderUnlocked("resetRemoteFileReader", lk);
remote_file_reader.reset();
}
FileSegment::RemoteFileReaderPtr FileSegment::extractRemoteFileReader()
{
auto lock = lockFileSegment();
auto lk = lock();
if (remote_file_reader && (download_state == State::DOWNLOADED
|| download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION))
{
@ -326,8 +331,8 @@ FileSegment::RemoteFileReaderPtr FileSegment::extractRemoteFileReader()
void FileSegment::setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_)
{
auto lock = lockFileSegment();
assertIsDownloaderUnlocked("setRemoteFileReader", lock);
auto lk = lock();
assertIsDownloaderUnlocked("setRemoteFileReader", lk);
if (remote_file_reader)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Remote file reader already exists");
@ -343,9 +348,9 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing zero size is not allowed");
{
auto lock = lockFileSegment();
assertIsDownloaderUnlocked("write", lock);
assertNotDetachedUnlocked(lock);
auto lk = lock();
assertIsDownloaderUnlocked("write", lk);
assertNotDetachedUnlocked(lk);
}
const auto file_segment_path = getPath();
@ -404,10 +409,10 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
const int code = e.getErrno();
const bool is_no_space_left_error = code == /* No space left on device */28 || code == /* Quota exceeded */122;
auto lock = lockFileSegment();
auto lk = lock();
e.addMessage(fmt::format("{}, current cache state: {}", e.what(), getInfoForLogUnlocked(lock)));
setDownloadFailedUnlocked(lock);
e.addMessage(fmt::format("{}, current cache state: {}", e.what(), getInfoForLogUnlocked(lk)));
setDownloadFailedUnlocked(lk);
if (downloaded_size == 0 && fs::exists(file_segment_path))
{
@ -430,9 +435,9 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
}
catch (Exception & e)
{
auto lock = lockFileSegment();
e.addMessage(fmt::format("{}, current cache state: {}", e.what(), getInfoForLogUnlocked(lock)));
setDownloadFailedUnlocked(lock);
auto lk = lock();
e.addMessage(fmt::format("{}, current cache state: {}", e.what(), getInfoForLogUnlocked(lk)));
setDownloadFailedUnlocked(lk);
throw;
}
@ -445,7 +450,7 @@ FileSegment::State FileSegment::wait(size_t offset)
span.addAttribute("clickhouse.key", key().toString());
span.addAttribute("clickhouse.offset", offset);
auto lock = lockFileSegment();
auto lk = lock();
if (downloader_id.empty() || offset < getCurrentWriteOffset())
return download_state;
@ -458,10 +463,10 @@ FileSegment::State FileSegment::wait(size_t offset)
LOG_TEST(log, "{} waiting on: {}, current downloader: {}", getCallerId(), range().toString(), downloader_id);
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentWaitMicroseconds);
chassert(!getDownloaderUnlocked(lock).empty());
chassert(!isDownloaderUnlocked(lock));
chassert(!getDownloaderUnlocked(lk).empty());
chassert(!isDownloaderUnlocked(lk));
[[maybe_unused]] const auto ok = cv.wait_for(lock, std::chrono::seconds(60), [&, this]()
[[maybe_unused]] const auto ok = cv.wait_for(lk, std::chrono::seconds(60), [&, this]()
{
return download_state != State::DOWNLOADING || offset < getCurrentWriteOffset();
});
@ -507,10 +512,10 @@ bool FileSegment::reserve(size_t size_to_reserve, size_t lock_wait_timeout_milli
bool is_file_segment_size_exceeded;
{
auto lock = lockFileSegment();
auto lk = lock();
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("reserve", lock);
assertNotDetachedUnlocked(lk);
assertIsDownloaderUnlocked("reserve", lk);
expected_downloaded_size = getDownloadedSize();
@ -553,7 +558,7 @@ bool FileSegment::reserve(size_t size_to_reserve, size_t lock_wait_timeout_milli
bool reserved = cache->tryReserve(*this, size_to_reserve, *reserve_stat, getKeyMetadata()->user, lock_wait_timeout_milliseconds);
if (!reserved)
setDownloadFailedUnlocked(lockFileSegment());
setDownloadFailedUnlocked(lock());
return reserved;
}
@ -578,8 +583,8 @@ void FileSegment::setDownloadedUnlocked(const FileSegmentGuard::Lock &)
void FileSegment::setDownloadFailed()
{
auto lock = lockFileSegment();
setDownloadFailedUnlocked(lock);
auto lk = lock();
setDownloadFailedUnlocked(lk);
}
void FileSegment::setDownloadFailedUnlocked(const FileSegmentGuard::Lock & lock)
@ -601,22 +606,22 @@ void FileSegment::setDownloadFailedUnlocked(const FileSegmentGuard::Lock & lock)
void FileSegment::completePartAndResetDownloader()
{
auto lock = lockFileSegment();
auto lk = lock();
SCOPE_EXIT({ cv.notify_all(); });
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("completePartAndResetDownloader", lock);
assertNotDetachedUnlocked(lk);
assertIsDownloaderUnlocked("completePartAndResetDownloader", lk);
chassert(download_state == State::DOWNLOADING
|| download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
if (download_state == State::DOWNLOADING)
resetDownloadingStateUnlocked(lock);
resetDownloadingStateUnlocked(lk);
resetDownloaderUnlocked(lock);
resetDownloaderUnlocked(lk);
LOG_TEST(log, "Complete batch. ({})", getInfoForLogUnlocked(lock));
LOG_TEST(log, "Complete batch. ({})", getInfoForLogUnlocked(lk));
}
void FileSegment::complete()
@ -636,7 +641,7 @@ void FileSegment::complete()
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot complete file segment: {}", getInfoForLog());
}
auto segment_lock = lockFileSegment();
auto segment_lock = lock();
if (isCompleted(false))
return;
@ -752,8 +757,8 @@ void FileSegment::complete()
String FileSegment::getInfoForLog() const
{
auto lock = lockFileSegment();
return getInfoForLogUnlocked(lock);
auto lk = lock();
return getInfoForLogUnlocked(lk);
}
String FileSegment::getInfoForLogUnlocked(const FileSegmentGuard::Lock &) const
@ -795,7 +800,7 @@ String FileSegment::stateToString(FileSegment::State state)
bool FileSegment::assertCorrectness() const
{
return assertCorrectnessUnlocked(lockFileSegment());
return assertCorrectnessUnlocked(lock());
}
bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock & lock) const
@ -841,7 +846,6 @@ bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock & lock)
chassert(downloaded_size == range().size());
chassert(downloaded_size > 0);
chassert(std::filesystem::file_size(getPath()) > 0);
chassert(queue_iterator);
check_iterator(queue_iterator);
}
else
@ -865,8 +869,8 @@ bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock & lock)
void FileSegment::assertNotDetached() const
{
auto lock = lockFileSegment();
assertNotDetachedUnlocked(lock);
auto lk = lock();
assertNotDetachedUnlocked(lk);
}
void FileSegment::assertNotDetachedUnlocked(const FileSegmentGuard::Lock & lock) const
@ -883,7 +887,7 @@ void FileSegment::assertNotDetachedUnlocked(const FileSegmentGuard::Lock & lock)
FileSegment::Info FileSegment::getInfo(const FileSegmentPtr & file_segment)
{
auto lock = file_segment->lockFileSegment();
auto lock = file_segment->lock();
auto key_metadata = file_segment->tryGetKeyMetadata();
return Info{
.key = file_segment->key(),
@ -906,7 +910,7 @@ FileSegment::Info FileSegment::getInfo(const FileSegmentPtr & file_segment)
bool FileSegment::isDetached() const
{
auto lock = lockFileSegment();
auto lk = lock();
return download_state == State::DETACHED;
}
@ -922,7 +926,7 @@ bool FileSegment::isCompleted(bool sync) const
if (is_completed_state())
return true;
auto lock = lockFileSegment();
auto lk = lock();
return is_completed_state();
}
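The renaming above (lockFileSegment() → lock(), with the guard held in lk) follows the standard lock-witness pattern: a public method acquires the guard exactly once and hands it to the *Unlocked helpers as proof that the mutex is held. A minimal self-contained sketch of that pattern, using a hypothetical Segment type rather than the actual ClickHouse classes:

#include <mutex>
#include <string>

class Segment
{
public:
    struct Lock { std::unique_lock<std::mutex> inner; };

    // Acquire the guard exactly once at the public entry point.
    Lock lock() const { return Lock{std::unique_lock<std::mutex>(mutex)}; }

    std::string getInfoForLog() const
    {
        auto lk = lock();
        return getInfoForLogUnlocked(lk);
    }

private:
    // The Lock parameter is never used; it is a compile-time witness
    // that the caller already holds the mutex.
    std::string getInfoForLogUnlocked(const Lock &) const { return "downloaded_size=..."; }

    mutable std::mutex mutex;
};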

View File

@ -171,12 +171,14 @@ public:
* ========== Methods used by `cache` ========================
*/
FileSegmentGuard::Lock lock() const { return segment_guard.lock(); }
FileSegmentGuard::Lock lock() const;
Priority::IteratorPtr getQueueIterator() const;
void setQueueIterator(Priority::IteratorPtr iterator);
void resetQueueIterator();
KeyMetadataPtr tryGetKeyMetadata() const;
KeyMetadataPtr getKeyMetadata() const;
@ -241,7 +243,6 @@ private:
bool assertCorrectnessUnlocked(const FileSegmentGuard::Lock &) const;
LockedKeyPtr lockKeyMetadata(bool assert_exists = true) const;
FileSegmentGuard::Lock lockFileSegment() const;
String tryGetPath() const;

View File

@ -146,7 +146,20 @@ public:
const UserID & user_id,
const CachePriorityGuard::Lock &) = 0;
virtual bool modifySizeLimits(size_t max_size_, size_t max_elements_, double size_ratio_, const CachePriorityGuard::Lock &) = 0;
/// Collect candidates for eviction until `desired_size` and `desired_elements_count`
/// are satisfied, but no more than `max_candidates_to_evict` candidates per call
/// (0 means no batch limit).
virtual bool collectCandidatesForEviction(
size_t desired_size,
size_t desired_elements_count,
size_t max_candidates_to_evict,
FileCacheReserveStat & stat,
EvictionCandidates & candidates,
const CachePriorityGuard::Lock &) = 0;
virtual bool modifySizeLimits(
size_t max_size_,
size_t max_elements_,
double size_ratio_,
const CachePriorityGuard::Lock &) = 0;
/// A space holder implementation, which allows taking hold of
/// some space in the cache, given that this space was freed.
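To illustrate the contract of the collectCandidatesForEviction overload added above — keep collecting releasable entries until the desired limits fit, or the per-batch cap is reached — here is a hedged, self-contained model in plain C++ (the simplified Entry type and free function are illustrative; the real interface walks file segment metadata under a CachePriorityGuard::Lock):

#include <cstddef>
#include <vector>

struct Entry { size_t size = 0; bool releasable = false; };

bool collectCandidates(
    const std::vector<Entry> & queue,
    size_t current_size,
    size_t desired_size,
    size_t max_candidates_to_evict,
    std::vector<Entry> & res)
{
    size_t releasable_size = 0;
    auto stop_condition = [&]
    {
        // Written as an addition to avoid unsigned underflow.
        const bool fits = current_size <= desired_size + releasable_size;
        const bool batch_full = max_candidates_to_evict && res.size() >= max_candidates_to_evict;
        return fits || batch_full;
    };
    for (const auto & entry : queue)
    {
        if (stop_condition())
            break;
        if (entry.releasable)
        {
            res.push_back(entry);           // analogous to EvictionCandidates::add
            releasable_size += entry.size;  // analogous to FileCacheReserveStat::update
        }
    }
    // Like the LRU implementation, report whether the stop condition holds; this may
    // be true because the batch is full rather than because the limits are satisfied.
    return stop_condition();
}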

View File

@ -17,9 +17,6 @@ namespace ProfileEvents
{
extern const Event FilesystemCacheEvictionSkippedFileSegments;
extern const Event FilesystemCacheEvictionTries;
extern const Event FilesystemCacheEvictMicroseconds;
extern const Event FilesystemCacheEvictedBytes;
extern const Event FilesystemCacheEvictedFileSegments;
extern const Event FilesystemCacheEvictionSkippedEvictingFileSegments;
}
@ -257,10 +254,14 @@ bool LRUFileCachePriority::canFit(
size_t elements,
size_t released_size_assumption,
size_t released_elements_assumption,
const CachePriorityGuard::Lock &) const
const CachePriorityGuard::Lock &,
const size_t * max_size_,
const size_t * max_elements_) const
{
return (max_size == 0 || state->current_size + size - released_size_assumption <= max_size)
&& (max_elements == 0 || state->current_elements_num + elements - released_elements_assumption <= max_elements);
return (max_size == 0
|| (state->current_size + size - released_size_assumption <= (max_size_ ? *max_size_ : max_size.load())))
&& (max_elements == 0
|| state->current_elements_num + elements - released_elements_assumption <= (max_elements_ ? *max_elements_ : max_elements.load()));
}
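As a worked example of the new per-call bounds: with state->current_size = 70, size = 20 and released_size_assumption = 15, the effective usage is 70 + 20 - 15 = 75. Against the configured max_size = 100 the request fits, but a caller passing a tighter *max_size_ = 70 (a desired size rather than the hard limit) makes the same request fail, since 75 > 70.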
bool LRUFileCachePriority::collectCandidatesForEviction(
@ -277,37 +278,12 @@ bool LRUFileCachePriority::collectCandidatesForEviction(
return true;
}
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionTries);
IterateFunc iterate_func = [&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{
const auto & file_segment = segment_metadata->file_segment;
chassert(file_segment->assertCorrectness());
if (segment_metadata->releasable())
{
res.add(segment_metadata, locked_key, lock);
stat.update(segment_metadata->size(), file_segment->getKind(), true);
}
else
{
stat.update(segment_metadata->size(), file_segment->getKind(), false);
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionSkippedFileSegments);
}
return IterationResult::CONTINUE;
};
auto can_fit = [&]
{
return canFit(size, elements, stat.total_stat.releasable_size, stat.total_stat.releasable_count, lock);
};
iterate([&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{
return can_fit() ? IterationResult::BREAK : iterate_func(locked_key, segment_metadata);
}, lock);
iterateForEviction(res, stat, can_fit, lock);
if (can_fit())
{
/// `res` contains eviction candidates. Do we have any?
@ -346,8 +322,61 @@ bool LRUFileCachePriority::collectCandidatesForEviction(
}
}
LRUFileCachePriority::LRUIterator
LRUFileCachePriority::move(LRUIterator & it, LRUFileCachePriority & other, const CachePriorityGuard::Lock &)
bool LRUFileCachePriority::collectCandidatesForEviction(
size_t desired_size,
size_t desired_elements_count,
size_t max_candidates_to_evict,
FileCacheReserveStat & stat,
EvictionCandidates & res,
const CachePriorityGuard::Lock & lock)
{
auto stop_condition = [&, this]()
{
return canFit(0, 0, stat.total_stat.releasable_size, stat.total_stat.releasable_count,
lock, &desired_size, &desired_elements_count)
|| (max_candidates_to_evict && res.size() >= max_candidates_to_evict);
};
iterateForEviction(res, stat, stop_condition, lock);
return stop_condition();
}
void LRUFileCachePriority::iterateForEviction(
EvictionCandidates & res,
FileCacheReserveStat & stat,
StopConditionFunc stop_condition,
const CachePriorityGuard::Lock & lock)
{
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionTries);
IterateFunc iterate_func = [&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{
const auto & file_segment = segment_metadata->file_segment;
chassert(file_segment->assertCorrectness());
if (segment_metadata->releasable())
{
res.add(segment_metadata, locked_key, lock);
stat.update(segment_metadata->size(), file_segment->getKind(), true);
}
else
{
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionSkippedFileSegments);
stat.update(segment_metadata->size(), file_segment->getKind(), false);
}
return IterationResult::CONTINUE;
};
iterate([&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{
return stop_condition() ? IterationResult::BREAK : iterate_func(locked_key, segment_metadata);
}, lock);
}
LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(
LRUIterator & it,
LRUFileCachePriority & other,
const CachePriorityGuard::Lock &)
{
const auto & entry = *it.getEntry();
if (entry.size == 0)
@ -391,50 +420,31 @@ IFileCachePriority::PriorityDumpPtr LRUFileCachePriority::dump(const CachePriori
}
bool LRUFileCachePriority::modifySizeLimits(
size_t max_size_, size_t max_elements_, double /* size_ratio_ */, const CachePriorityGuard::Lock & lock)
size_t max_size_, size_t max_elements_, double /* size_ratio_ */, const CachePriorityGuard::Lock &)
{
if (max_size == max_size_ && max_elements == max_elements_)
return false; /// Nothing to change.
auto check_limits_satisfied = [&]()
if (state->current_size > max_size_ || state->current_elements_num > max_elements_)
{
return (max_size_ == 0 || state->current_size <= max_size_)
&& (max_elements_ == 0 || state->current_elements_num <= max_elements_);
};
if (check_limits_satisfied())
{
max_size = max_size_;
max_elements = max_elements_;
return true;
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot modify size limits to {} in size and {} in elements: "
"not enough space freed. Current size: {}/{}, elements: {}/{}",
max_size_, max_elements_,
state->current_size, max_size, state->current_elements_num, max_elements);
}
auto iterate_func = [&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{
chassert(segment_metadata->file_segment->assertCorrectness());
if (!segment_metadata->releasable())
return IterationResult::CONTINUE;
auto segment = segment_metadata->file_segment;
locked_key.removeFileSegment(segment->offset(), segment->lock());
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictedFileSegments);
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictedBytes, segment->getDownloadedSize());
return IterationResult::REMOVE_AND_CONTINUE;
};
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::FilesystemCacheEvictMicroseconds);
iterate(
[&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{ return check_limits_satisfied() ? IterationResult::BREAK : iterate_func(locked_key, segment_metadata); },
lock);
max_size = max_size_;
max_elements = max_elements_;
return true;
}
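In other words, shrinking the limits no longer evicts inline: the caller is now expected to bring usage under the new bounds first (presumably via the batch collectCandidatesForEviction overload above). For example, with current_size = 120 and max_size_ = 100, the old code would iterate and remove releasable segments until usage dropped to 100, while the new code throws LOGICAL_ERROR.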
IFileCachePriority::EntryPtr LRUFileCachePriority::LRUIterator::getEntry() const
{
assertValid();
return *iterator;
}
void LRUFileCachePriority::LRUIterator::remove(const CachePriorityGuard::Lock & lock)
{
assertValid();

View File

@ -62,6 +62,14 @@ public:
const UserID & user_id,
const CachePriorityGuard::Lock &) override;
bool collectCandidatesForEviction(
size_t desired_size,
size_t desired_elements_count,
size_t max_candidates_to_evict,
FileCacheReserveStat & stat,
EvictionCandidates & res,
const CachePriorityGuard::Lock &) override;
void shuffle(const CachePriorityGuard::Lock &) override;
struct LRUPriorityDump : public IPriorityDump
@ -94,7 +102,9 @@ private:
size_t elements,
size_t released_size_assumption,
size_t released_elements_assumption,
const CachePriorityGuard::Lock &) const;
const CachePriorityGuard::Lock &,
const size_t * max_size_ = nullptr,
const size_t * max_elements_ = nullptr) const;
LRUQueue::iterator remove(LRUQueue::iterator it, const CachePriorityGuard::Lock &);
@ -110,6 +120,13 @@ private:
LRUIterator move(LRUIterator & it, LRUFileCachePriority & other, const CachePriorityGuard::Lock &);
LRUIterator add(EntryPtr entry, const CachePriorityGuard::Lock &);
using StopConditionFunc = std::function<bool()>;
void iterateForEviction(
EvictionCandidates & res,
FileCacheReserveStat & stat,
StopConditionFunc stop_condition,
const CachePriorityGuard::Lock &);
void holdImpl(
size_t size,
size_t elements,
@ -131,7 +148,7 @@ public:
LRUIterator & operator =(const LRUIterator & other);
bool operator ==(const LRUIterator & other) const;
EntryPtr getEntry() const override { return *iterator; }
EntryPtr getEntry() const override;
size_t increasePriority(const CachePriorityGuard::Lock &) override;

View File

@ -251,6 +251,49 @@ bool SLRUFileCachePriority::collectCandidatesForEvictionInProtected(
return true;
}
bool SLRUFileCachePriority::collectCandidatesForEviction(
size_t desired_size,
size_t desired_elements_count,
size_t max_candidates_to_evict,
FileCacheReserveStat & stat,
EvictionCandidates & res,
const CachePriorityGuard::Lock & lock)
{
const auto desired_probationary_size = getRatio(desired_size, 1 - size_ratio);
const auto desired_probationary_elements_num = getRatio(desired_elements_count, 1 - size_ratio);
FileCacheReserveStat probationary_stat;
const bool probationary_limit_satisfied = probationary_queue.collectCandidatesForEviction(
desired_probationary_size, desired_probationary_elements_num,
max_candidates_to_evict, probationary_stat, res, lock);
stat += probationary_stat;
LOG_TEST(log, "Collected {} to evict from probationary queue. Total size: {}",
res.size(), probationary_stat.total_stat.releasable_size);
chassert(!max_candidates_to_evict || res.size() <= max_candidates_to_evict);
chassert(res.size() == stat.total_stat.releasable_count);
if (max_candidates_to_evict && res.size() >= max_candidates_to_evict)
return probationary_limit_satisfied;
const auto desired_protected_size = getRatio(max_size, size_ratio);
const auto desired_protected_elements_num = getRatio(max_elements, size_ratio);
FileCacheReserveStat protected_stat;
const bool protected_limit_satisfied = protected_queue.collectCandidatesForEviction(
desired_protected_size, desired_protected_elements_num,
max_candidates_to_evict - res.size(), protected_stat, res, lock);
stat += protected_stat;
LOG_TEST(log, "Collected {} to evict from protected queue. Total size: {}",
res.size(), protected_stat.total_stat.releasable_size);
return probationary_limit_satisfied && protected_limit_satisfied;
}
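A worked example of the split, assuming getRatio(x, r) computes roughly x * r: with size_ratio = 0.3 and desired_size = 1000, the probationary queue is asked to shrink to getRatio(1000, 0.7) = 700, while the protected queue targets getRatio(max_size, 0.3). Candidates already collected from the probationary queue count against the batch cap, hence the max_candidates_to_evict - res.size() passed to the protected queue.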
void SLRUFileCachePriority::downgrade(IteratorPtr iterator, const CachePriorityGuard::Lock & lock)
{
auto * candidate_it = assert_cast<SLRUIterator *>(iterator.get());

View File

@ -58,6 +58,14 @@ public:
const UserID & user_id,
const CachePriorityGuard::Lock &) override;
bool collectCandidatesForEviction(
size_t desired_size,
size_t desired_elements_count,
size_t max_candidates_to_evict,
FileCacheReserveStat & stat,
EvictionCandidates & res,
const CachePriorityGuard::Lock &) override;
void shuffle(const CachePriorityGuard::Lock &) override;
PriorityDumpPtr dump(const CachePriorityGuard::Lock &) override;

View File

@ -56,7 +56,7 @@ InterpreterAlterQuery::InterpreterAlterQuery(const ASTPtr & query_ptr_, ContextP
BlockIO InterpreterAlterQuery::execute()
{
FunctionNameNormalizer().visit(query_ptr.get());
FunctionNameNormalizer::visit(query_ptr.get());
const auto & alter = query_ptr->as<ASTAlterQuery &>();
if (alter.alter_object == ASTAlterQuery::AlterObjectType::DATABASE)
{
@ -131,7 +131,7 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
if (modify_query)
{
// Expand CTE before filling default database
ApplyWithSubqueryVisitor().visit(*modify_query);
ApplyWithSubqueryVisitor::visit(*modify_query);
}
/// Add default database to table identifiers that we can encounter in e.g. default expressions, mutation expression, etc.
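The two hunks above, like the similar FunctionNameNormalizer, SetVariants and ApplyWithGlobalVisitor changes in the files below, replace a temporary object with a direct static call — the pattern flagged by clang-tidy's readability-static-accessed-through-instance check. A minimal sketch of the difference, with a hypothetical Normalizer type:

struct Normalizer
{
    static void visit(int *) { /* normalize the AST in place */ }
};

void before(int * ast) { Normalizer().visit(ast); }  // constructs a throwaway temporary just to reach a static member
void after(int * ast) { Normalizer::visit(ast); }    // same behavior, no temporary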

View File

@ -25,7 +25,7 @@ namespace ErrorCodes
BlockIO InterpreterCreateIndexQuery::execute()
{
FunctionNameNormalizer().visit(query_ptr.get());
FunctionNameNormalizer::visit(query_ptr.get());
auto current_context = getContext();
const auto & create_index = query_ptr->as<ASTCreateIndexQuery &>();

View File

@ -1114,7 +1114,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
// Table SQL definition is available even if the table is detached (even permanently)
auto query = database->getCreateTableQuery(create.getTable(), getContext());
FunctionNameNormalizer().visit(query.get());
FunctionNameNormalizer::visit(query.get());
auto create_query = query->as<ASTCreateQuery &>();
if (!create.is_dictionary && create_query.is_dictionary)
@ -1184,7 +1184,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
if (create.select && create.isView())
{
// Expand CTE before filling default database
ApplyWithSubqueryVisitor().visit(*create.select);
ApplyWithSubqueryVisitor::visit(*create.select);
AddDefaultDatabaseVisitor visitor(getContext(), current_database);
visitor.visit(*create.select);
}
@ -1763,7 +1763,7 @@ BlockIO InterpreterCreateQuery::executeQueryOnCluster(ASTCreateQuery & create)
BlockIO InterpreterCreateQuery::execute()
{
FunctionNameNormalizer().visit(query_ptr.get());
FunctionNameNormalizer::visit(query_ptr.get());
auto & create = query_ptr->as<ASTCreateQuery &>();
bool is_create_database = create.database && !create.table;

View File

@ -168,7 +168,7 @@ void Set::setHeader(const ColumnsWithTypeAndName & header)
}
/// Choose data structure to use for the set.
data.init(data.chooseMethod(key_columns, key_sizes));
data.init(SetVariants::chooseMethod(key_columns, key_sizes));
}
void Set::fillSetElements()

View File

@ -144,7 +144,7 @@ void optimizeGroupBy(ASTSelectQuery * select_query, ContextPtr context)
}
else
{
FunctionOverloadResolverPtr function_builder = UserDefinedExecutableFunctionFactory::instance().tryGet(function->name, context);
FunctionOverloadResolverPtr function_builder = UserDefinedExecutableFunctionFactory::instance().tryGet(function->name, context); /// NOLINT(readability-static-accessed-through-instance)
if (!function_builder)
function_builder = function_factory.get(function->name, context);

View File

@ -912,7 +912,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Propagate WITH statement to children ASTSelect.
if (settings.enable_global_with_statement)
{
ApplyWithGlobalVisitor().visit(ast);
ApplyWithGlobalVisitor::visit(ast);
}
{

Some files were not shown because too many files have changed in this diff.