mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 16:42:05 +00:00
Merge remote-tracking branch 'rschu1ze/master' into clang-17
This commit is contained in:
commit
877e4f3aab
@ -38,7 +38,7 @@ struct Settings;
|
||||
M(UInt64, stale_log_gap, 10000, "When node became stale and should receive snapshots from leader", 0) \
|
||||
M(UInt64, fresh_log_gap, 200, "When node became fresh", 0) \
|
||||
M(UInt64, max_request_queue_size, 100000, "Maximum number of request that can be in queue for processing", 0) \
|
||||
M(UInt64, max_requests_batch_size, 1000, "Max size of batch of requests that can be sent to RAFT", 0) \
|
||||
M(UInt64, max_requests_batch_size, 100, "Max size of batch of requests that can be sent to RAFT", 0) \
|
||||
M(UInt64, max_requests_batch_bytes_size, 100*1024, "Max size in bytes of batch of requests that can be sent to RAFT", 0) \
|
||||
M(UInt64, max_flush_batch_size, 1000, "Max size of batch of requests that can be flushed together", 0) \
|
||||
M(UInt64, max_requests_quick_batch_size, 100, "Max size of batch of requests to try to get before proceeding with RAFT. Keeper will not wait for requests but take only requests that are already in queue" , 0) \
|
||||
|
@ -105,6 +105,7 @@ class IColumn;
|
||||
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
|
||||
M(UInt64, s3_retry_attempts, 10, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
|
||||
M(UInt64, s3_request_timeout_ms, 3000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
|
||||
M(UInt64, s3_http_connection_pool_size, 1000, "How many reusable open connections to keep per S3 endpoint. Only applies to the S3 table engine and table function, not to S3 disks (for disks, use disk config instead). Global setting, can only be set in config, overriding it per session or per query has no effect.", 0) \
|
||||
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
|
||||
M(String, s3queue_default_zookeeper_path, "/clickhouse/s3queue/", "Default zookeeper path prefix for S3Queue engine", 0) \
|
||||
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
|
||||
|
@ -58,6 +58,11 @@ using HTTPSessionPtr = std::shared_ptr<Poco::Net::HTTPClientSession>;
|
||||
/// All pooled sessions don't have this tag attached after being taken from a pool.
|
||||
/// If the request and the response were fully written/read, the client code should add this tag
|
||||
/// explicitly by calling `markSessionForReuse()`.
|
||||
///
|
||||
/// Note that HTTP response may contain extra bytes after the last byte of the payload. Specifically,
|
||||
/// when chunked encoding is used, there's an empty chunk at the end. Those extra bytes must also be
|
||||
/// read before the session can be reused. So we usually put an `istr->ignore(INT64_MAX)` call
|
||||
/// before `markSessionForReuse()`.
|
||||
struct HTTPSessionReuseTag
|
||||
{
|
||||
};
|
||||
@ -76,7 +81,18 @@ HTTPSessionPtr makeHTTPSession(
|
||||
Poco::Net::HTTPClientSession::ProxyConfig proxy_config = {}
|
||||
);
|
||||
|
||||
/// As previous method creates session, but tooks it from pool, without and with proxy uri.
|
||||
/// As previous method creates session, but takes it from pool, without and with proxy uri.
|
||||
///
|
||||
/// The max_connections_per_endpoint parameter makes it look like the pool size can be different for
|
||||
/// different requests (whatever that means), but actually we just assign the endpoint's connection
|
||||
/// pool size when we see the endpoint for the first time, then we never change it.
|
||||
/// We should probably change how this configuration works, and how this pooling works in general:
|
||||
/// * Make the per_endpoint_pool_size be a global server setting instead of per-disk or per-query.
|
||||
/// * Have boolean per-disk/per-query settings for enabling/disabling pooling.
|
||||
/// * Add a limit on the number of endpoints and the total number of sessions across all endpoints.
|
||||
/// * Enable pooling by default everywhere. In particular StorageURL and StorageS3.
|
||||
/// (Enabling it for StorageURL is scary without the previous item - the user may query lots of
|
||||
/// different endpoints. So currently pooling is mainly used for S3.)
|
||||
PooledHTTPSessionPtr makePooledHTTPSession(
|
||||
const Poco::URI & uri,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
|
@ -230,10 +230,35 @@ size_t ReadBufferFromS3::readBigAt(char * to, size_t n, size_t range_begin, cons
|
||||
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3Microseconds);
|
||||
|
||||
std::optional<Aws::S3::Model::GetObjectResult> result;
|
||||
/// Connection is reusable if we've read the full response.
|
||||
bool session_is_reusable = false;
|
||||
SCOPE_EXIT(
|
||||
{
|
||||
if (!result.has_value())
|
||||
return;
|
||||
if (session_is_reusable)
|
||||
{
|
||||
auto session = getSession(*result);
|
||||
if (!session.isNull())
|
||||
{
|
||||
DB::markSessionForReuse(session);
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3PreservedSessions);
|
||||
}
|
||||
else
|
||||
session_is_reusable = false;
|
||||
}
|
||||
if (!session_is_reusable)
|
||||
{
|
||||
resetSession(*result);
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3ResetSessions);
|
||||
}
|
||||
});
|
||||
|
||||
try
|
||||
{
|
||||
auto result = sendRequest(range_begin, range_begin + n - 1);
|
||||
std::istream & istr = result.GetBody();
|
||||
result = sendRequest(range_begin, range_begin + n - 1);
|
||||
std::istream & istr = result->GetBody();
|
||||
|
||||
copyFromIStreamWithProgressCallback(istr, to, n, progress_callback, &bytes_copied);
|
||||
|
||||
@ -241,6 +266,10 @@ size_t ReadBufferFromS3::readBigAt(char * to, size_t n, size_t range_begin, cons
|
||||
|
||||
if (read_settings.remote_throttler)
|
||||
read_settings.remote_throttler->add(bytes_copied, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
|
||||
|
||||
/// Read remaining bytes after the end of the payload, see HTTPSessionReuseTag.
|
||||
istr.ignore(INT64_MAX);
|
||||
session_is_reusable = true;
|
||||
}
|
||||
catch (Poco::Exception & e)
|
||||
{
|
||||
|
@ -1278,6 +1278,7 @@ void StorageS3::Configuration::connect(ContextPtr context)
|
||||
|
||||
client_configuration.endpointOverride = url.endpoint;
|
||||
client_configuration.maxConnections = static_cast<unsigned>(request_settings.max_connections);
|
||||
client_configuration.http_connection_pool_size = context->getGlobalContext()->getSettingsRef().s3_http_connection_pool_size;
|
||||
auto headers = auth_settings.headers;
|
||||
if (!headers_from_ast.empty())
|
||||
headers.insert(headers.end(), headers_from_ast.begin(), headers_from_ast.end());
|
||||
|
@ -389,19 +389,19 @@ void addRequestedPathAndFileVirtualsToChunk(
|
||||
{
|
||||
if (virtual_column.name == "_path")
|
||||
{
|
||||
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), path));
|
||||
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), path)->convertToFullColumnIfConst());
|
||||
}
|
||||
else if (virtual_column.name == "_file")
|
||||
{
|
||||
if (filename)
|
||||
{
|
||||
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), *filename));
|
||||
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), *filename)->convertToFullColumnIfConst());
|
||||
}
|
||||
else
|
||||
{
|
||||
size_t last_slash_pos = path.find_last_of('/');
|
||||
auto filename_from_path = path.substr(last_slash_pos + 1);
|
||||
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), filename_from_path));
|
||||
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), filename_from_path)->convertToFullColumnIfConst());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -279,7 +279,7 @@ def test_cmd_conf(started_cluster):
|
||||
assert result["stale_log_gap"] == "10000"
|
||||
assert result["fresh_log_gap"] == "200"
|
||||
|
||||
assert result["max_requests_batch_size"] == "1000"
|
||||
assert result["max_requests_batch_size"] == "100"
|
||||
assert result["max_requests_batch_bytes_size"] == "102400"
|
||||
assert result["max_flush_batch_size"] == "1000"
|
||||
assert result["max_request_queue_size"] == "100000"
|
||||
|
@ -5,9 +5,9 @@
|
||||
{"data":{"k0":0,"k1":0,"k2":0,"k3":0,"k4":100,"k5":0}}
|
||||
{"data":{"k0":0,"k1":0,"k2":0,"k3":0,"k4":0,"k5":100}}
|
||||
Tuple(k0 Int8, k1 Int8, k2 Int8, k3 Int8, k4 Int8, k5 Int8)
|
||||
{"data":{"k0":0,"k1":0,"k2":100}}
|
||||
{"data":{"k0":0,"k1":100,"k2":0}}
|
||||
{"data":{"k0":100,"k1":0,"k2":0}}
|
||||
{"data":{"k0":0,"k1":100,"k2":0}}
|
||||
{"data":{"k0":0,"k1":0,"k2":100}}
|
||||
Tuple(k0 Int8, k1 Int8, k2 Int8)
|
||||
{"data":{"k1":100,"k3":0}}
|
||||
{"data":{"k1":0,"k3":100}}
|
||||
|
@ -0,0 +1,2 @@
|
||||
02884_1.csv 1
|
||||
02884_2.csv 2
|
@ -0,0 +1,4 @@
|
||||
-- Tags: no-fasttest
|
||||
insert into function file('02884_1.csv') select 1 as x settings engine_file_truncate_on_insert=1;
|
||||
insert into function file('02884_2.csv') select 2 as x settings engine_file_truncate_on_insert=1;
|
||||
select _file, * from file('02884_{1,2}.csv') order by _file settings max_threads=1;
|
Loading…
Reference in New Issue
Block a user