Merge branch 'master' into reuse_s3_mocks

Alexander Tokmakov 2023-06-01 13:20:23 +03:00 committed by GitHub
commit 05c90f8987
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 175 additions and 106 deletions

contrib/libgsasl vendored (2 changes)

@@ -1 +1 @@
-Subproject commit f4e7bf0bb068030d57266f87ccac4c8c012fb5c4
+Subproject commit 0fb79e7609ae5a5e015a41d24bcbadd48f8f5469

contrib/libxml2 vendored (2 changes)

@@ -1 +1 @@
-Subproject commit f507d167f1755b7eaea09fb1a44d29aab828b6d1
+Subproject commit 223cb03a5d27b1b2393b266a8657443d046139d6

View File

@@ -51,7 +51,7 @@ Calculates the MD5 from a string and returns the resulting set of bytes as Fixed
If you do not need MD5 in particular, but you need a decent cryptographic 128-bit hash, use the sipHash128 function instead.
If you want to get the same result as output by the md5sum utility, use lower(hex(MD5(s))).
-## sipHash64 (#hash_functions-siphash64)
+## sipHash64 {#hash_functions-siphash64}
Produces a 64-bit [SipHash](https://en.wikipedia.org/wiki/SipHash) hash value.
@@ -63,9 +63,9 @@ This is a cryptographic hash function. It works at least three times faster than
The function [interprets](/docs/en/sql-reference/functions/type-conversion-functions.md/#type_conversion_functions-reinterpretAsString) all the input parameters as strings and calculates the hash value for each of them. It then combines the hashes by the following algorithm:
-1. The first and the second hash value are concatenated to an array which is hashed.
-2. The previously calculated hash value and the hash of the third input parameter are hashed in a similar way.
-3. This calculation is repeated for all remaining hash values of the original input.
+1. The first and the second hash value are concatenated to an array which is hashed.
+2. The previously calculated hash value and the hash of the third input parameter are hashed in a similar way.
+3. This calculation is repeated for all remaining hash values of the original input.
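
For illustration, the chained combination in the list above can be sketched in a few lines of C++. This is a model of the documented scheme, not ClickHouse's implementation; the stand-in hash is deliberately not SipHash.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Stand-in for the real primitive: any 64-bit hash of a byte buffer works for
// illustrating the combination scheme (FNV-1a here; this is NOT SipHash).
uint64_t sipHash64(const void * data, size_t size)
{
    const auto * p = static_cast<const unsigned char *>(data);
    uint64_t h = 0xcbf29ce484222325ULL;
    for (size_t i = 0; i < size; ++i)
        h = (h ^ p[i]) * 0x100000001b3ULL;
    return h;
}

// Steps 1-3 above: hash(h1 ++ h2), then hash(acc ++ h3), and so on.
uint64_t combineHashes(const std::vector<uint64_t> & hashes)
{
    uint64_t acc = hashes.at(0);
    for (size_t i = 1; i < hashes.size(); ++i)
    {
        uint64_t pair[2] = {acc, hashes[i]};   // "concatenated to an array"
        acc = sipHash64(pair, sizeof(pair));   // "... which is hashed"
    }
    return acc;
}
```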
**Arguments**

View File

@@ -909,6 +909,11 @@
<host>127.0.0.10</host>
<port>9000</port>
</replica>
+<!-- Unavailable replica -->
+<replica>
+<host>127.0.0.11</host>
+<port>1234</port>
+</replica>
</shard>
</parallel_replicas>
<test_cluster_two_shards_localhost>

View File

@@ -9,12 +9,13 @@
#include "Common/formatReadable.h"
#include <Common/TerminalSize.h>
#include <Common/UnicodeBar.h>
#include "IO/WriteBufferFromString.h"
#include <Databases/DatabaseMemory.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
/// http://en.wikipedia.org/wiki/ANSI_escape_code
#define CLEAR_TO_END_OF_LINE "\033[K"
namespace DB
{

View File

@@ -14,7 +14,6 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/queryToString.h>
#include <Storages/NamedCollectionsHelpers.h>
-#include <Common/NamedCollections/NamedCollections.h>
#include <Common/logger_useful.h>
#include <Common/Macros.h>
#include <Common/filesystemHelpers.h>

View File

@@ -298,7 +298,7 @@ off_t AsynchronousBoundedReadBuffer::seek(off_t offset, int whence)
* Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
* Note: we read in range [file_offset_of_buffer_end, read_until_position).
*/
-if (read_until_position && new_pos < *read_until_position
+if (file_offset_of_buffer_end && read_until_position && new_pos < *read_until_position
&& new_pos > file_offset_of_buffer_end
&& new_pos < file_offset_of_buffer_end + read_settings.remote_read_min_bytes_for_seek)
{
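
The guarded condition can be modeled as a small predicate (member names taken from the diff; a sketch, not the real buffer code): a short forward seek is short-circuited only when the buffer already has a known position, which is what the added file_offset_of_buffer_end check enforces.

```cpp
#include <cstddef>
#include <optional>

// Sketch: decide whether a seek can be served lazily by skipping bytes in the
// current or prefetched buffer instead of dropping it and re-reading remotely.
bool canIgnoreLazily(size_t file_offset_of_buffer_end,
                     std::optional<size_t> read_until_position,
                     size_t new_pos,
                     size_t remote_read_min_bytes_for_seek)
{
    return file_offset_of_buffer_end != 0          // something was read already
        && read_until_position                     // the read range is bounded
        && new_pos < *read_until_position          // target stays inside range
        && new_pos > file_offset_of_buffer_end     // strictly forward seek
        && new_pos < file_offset_of_buffer_end + remote_read_min_bytes_for_seek;
}
```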

View File

@@ -91,12 +91,18 @@ namespace impl
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "wrong tuple size: key must be a tuple of 2 UInt64");
if (const auto * key0col = checkAndGetColumn<ColumnUInt64>(&(tuple->getColumn(0))))
-ret.key0 = key0col->get64(0);
+{
+    const auto & key0col_data = key0col->getData();
+    ret.key0 = key0col_data[0];
+}
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "first element of the key tuple is not UInt64");
if (const auto * key1col = checkAndGetColumn<ColumnUInt64>(&(tuple->getColumn(1))))
-ret.key1 = key1col->get64(0);
+{
+    const auto & key1col_data = key1col->getData();
+    ret.key1 = key1col_data[0];
+}
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "second element of the key tuple is not UInt64");
@@ -1420,6 +1426,9 @@ public:
{
auto col_to = ColumnVector<ToType>::create(input_rows_count);
+if (input_rows_count == 0)
+    return col_to;
typename ColumnVector<ToType>::Container & vec_to = col_to->getData();
/// If using a "keyed" algorithm, the first argument is the key and
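
Both hunks harden the keyed-hash path against empty input; a standalone model of the pattern (vectors stand in for ClickHouse columns, names are illustrative):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

struct KeyPair { uint64_t key0 = 0, key1 = 0; };

// Read the constant key through the raw data array, but only after an explicit
// guard for empty input: element 0 of an empty column does not exist.
bool tryExtractKey(const std::vector<uint64_t> & key0_data,
                   const std::vector<uint64_t> & key1_data,
                   size_t input_rows_count, KeyPair & out)
{
    if (input_rows_count == 0)
        return false;            // nothing to hash; indexing [0] would be UB
    out.key0 = key0_data[0];     // the constant key is stored in row 0
    out.key1 = key1_data[0];
    return true;
}

int main()
{
    KeyPair k;
    assert(!tryExtractKey({}, {}, 0, k));   // empty block: guarded
    assert(tryExtractKey({7}, {9}, 1, k) && k.key0 == 7 && k.key1 == 9);
}
```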

View File

@@ -121,17 +121,17 @@ void WriteBufferFromS3::TaskTracker::add(Callback && func)
/// preallocation for the second issue
FinishedList pre_allocated_finished {future_placeholder};
-Callback func_with_notification = [&, func=std::move(func), pre_allocated_finished=std::move(pre_allocated_finished)] () mutable
+Callback func_with_notification = [&, my_func = std::move(func), my_pre_allocated_finished = std::move(pre_allocated_finished)]() mutable
{
SCOPE_EXIT({
DENY_ALLOCATIONS_IN_SCOPE;
std::lock_guard lock(mutex);
-finished_futures.splice(finished_futures.end(), pre_allocated_finished);
+finished_futures.splice(finished_futures.end(), my_pre_allocated_finished);
has_finished.notify_one();
});
-func();
+my_func();
};
/// this move is nothrow
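
The change is a pure rename of the lambda captures; a self-contained example of the pattern, where the my_ prefix keeps the captured copies distinct from the enclosing names they would otherwise shadow:

```cpp
#include <functional>
#include <utility>

// Build a task that owns its callback: capturing under a new name (my_func)
// makes the ownership transfer explicit and avoids shadowing the parameter.
std::function<void()> makeTask(std::function<void()> func)
{
    return [my_func = std::move(func)]() mutable
    {
        my_func();   // invoke the moved-in callable
    };
}

int main()
{
    int calls = 0;
    auto task = makeTask([&calls] { ++calls; });
    task();
    return calls == 1 ? 0 : 1;
}
```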

View File

@@ -468,6 +468,12 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
+/// Set skip_unavailable_shards to true only if it wasn't disabled explicitly
+if (settings.allow_experimental_parallel_reading_from_replicas > 0 && !settings.skip_unavailable_shards && !settings.isChanged("skip_unavailable_shards"))
+{
+    context->setSetting("skip_unavailable_shards", true);
+}
/// Check support for JOIN for parallel replicas with custom key
if (joined_tables.tablesCount() > 1 && !settings.parallel_replicas_custom_key.value.empty())
{

View File

@@ -47,8 +47,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_), scalars(scalars_)
, external_tables(external_tables_), stage(stage_)
-, task_iterator(extension_ ? extension_->task_iterator : nullptr)
-, parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
+, extension(extension_)
{}
RemoteQueryExecutor::RemoteQueryExecutor(
@@ -90,8 +89,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
-, task_iterator(extension_ ? extension_->task_iterator : nullptr)
-, parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
+, extension(extension_)
{
create_connections = [this, connections_, throttler, extension_](AsyncCallback) mutable {
auto res = std::make_unique<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
@@ -108,8 +106,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
-, task_iterator(extension_ ? extension_->task_iterator : nullptr)
-, parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
+, extension(extension_)
{
create_connections = [this, pool, throttler, extension_](AsyncCallback async_callback)->std::unique_ptr<IConnections>
{
@@ -247,6 +244,13 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
finished = true;
sent_query = true;
+/// We need to tell the coordinator not to wait for this replica.
+if (extension && extension->parallel_reading_coordinator)
+{
+    chassert(extension->replica_info);
+    extension->parallel_reading_coordinator->markReplicaAsUnavailable(extension->replica_info->number_of_current_replica);
+}
return;
}
@@ -360,7 +364,18 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
read_context->resume();
if (needToSkipUnavailableShard())
{
+/// We need to tell the coordinator not to wait for this replica.
+/// But at this point it may lead to an incomplete result set, because
+/// this replica committed to read some part of the data and then died.
+if (extension && extension->parallel_reading_coordinator)
+{
+    chassert(extension->replica_info);
+    extension->parallel_reading_coordinator->markReplicaAsUnavailable(extension->replica_info->number_of_current_replica);
+}
return ReadResult(Block());
}
/// Check if packet is not ready yet.
if (read_context->isInProgress())
@@ -524,30 +539,30 @@ bool RemoteQueryExecutor::setPartUUIDs(const std::vector<UUID> & uuids)
void RemoteQueryExecutor::processReadTaskRequest()
{
-if (!task_iterator)
+if (!extension || !extension->task_iterator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized");
ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);
-auto response = (*task_iterator)();
+auto response = (*extension->task_iterator)();
connections->sendReadTaskResponse(response);
}
void RemoteQueryExecutor::processMergeTreeReadTaskRequest(ParallelReadRequest request)
{
-if (!parallel_reading_coordinator)
+if (!extension || !extension->parallel_reading_coordinator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsReceived);
-auto response = parallel_reading_coordinator->handleRequest(std::move(request));
+auto response = extension->parallel_reading_coordinator->handleRequest(std::move(request));
connections->sendMergeTreeReadTaskResponse(response);
}
void RemoteQueryExecutor::processMergeTreeInitialReadAnnounecement(InitialAllRangesAnnouncement announcement)
{
-if (!parallel_reading_coordinator)
+if (!extension || !extension->parallel_reading_coordinator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
-parallel_reading_coordinator->handleInitialAllRangesAnnouncement(announcement);
+extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(announcement);
}
void RemoteQueryExecutor::finish()
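
Taken together, these executor changes collapse two per-replica members into one optional Extension and add a failure path for dead replicas. A standalone model of that path (types reduced to a few fields, names mirroring the diff):

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <optional>

struct Coordinator
{
    size_t unavailable = 0;
    void markReplicaAsUnavailable(size_t /*replica_number*/) { ++unavailable; }
};

struct ReplicaInfo { size_t number_of_current_replica = 0; };

struct Extension
{
    std::shared_ptr<Coordinator> parallel_reading_coordinator;
    std::optional<ReplicaInfo> replica_info;
};

// When a replica cannot be reached and the query may skip it, the coordinator
// must be told, or it would keep waiting for that replica's announcement.
void onUnreachableReplica(const std::optional<Extension> & extension)
{
    if (extension && extension->parallel_reading_coordinator)
    {
        assert(extension->replica_info);
        extension->parallel_reading_coordinator->markReplicaAsUnavailable(
            extension->replica_info->number_of_current_replica);
    }
}

int main()
{
    Extension ext{std::make_shared<Coordinator>(), ReplicaInfo{10}};
    onUnreachableReplica(ext);
    assert(ext.parallel_reading_coordinator->unavailable == 1);
    onUnreachableReplica(std::nullopt);   // no extension: nothing to notify
}
```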

View File

@@ -212,11 +212,11 @@ private:
/// Temporary tables needed to be sent to remote servers
Tables external_tables;
QueryProcessingStage::Enum stage;
+std::optional<Extension> extension;
-/// Initiator identifier for distributed task processing
-std::shared_ptr<TaskIterator> task_iterator;
-std::shared_ptr<ParallelReplicasReadingCoordinator> parallel_reading_coordinator;
/// This is needed only for parallel reading from replicas, because
/// we create a RemoteQueryExecutor per replica and have to store additional info
/// about the number of the current replica or the count of replicas at all.

View File

@@ -19,6 +19,7 @@
#include "Storages/MergeTree/RequestResponse.h"
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/IntersectionsIndexes.h>
+#include <fmt/core.h>
#include <fmt/format.h>
namespace DB
@@ -61,18 +62,22 @@ public:
{
size_t number_of_requests{0};
size_t sum_marks{0};
+bool is_unavailable{false};
};
using Stats = std::vector<Stat>;
static String toString(Stats stats)
{
String result = "Statistics: ";
+std::vector<String> stats_by_replica;
for (size_t i = 0; i < stats.size(); ++i)
result += fmt::format("-- replica {}, requests: {} marks: {} ", i, stats[i].number_of_requests, stats[i].sum_marks);
stats_by_replica.push_back(fmt::format("replica {}{} - {{requests: {} marks: {}}}", i, stats[i].is_unavailable ? " is unavailable" : "", stats[i].number_of_requests, stats[i].sum_marks));
result += fmt::format("{}", fmt::join(stats_by_replica, "; "));
return result;
}
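
The reworked toString collects per-replica fragments and joins them with "; ". A standalone example of the fmt::join usage (sample values are made up; recent fmt versions need <fmt/ranges.h> for join):

```cpp
#include <fmt/format.h>
#include <fmt/ranges.h>
#include <string>
#include <vector>

int main()
{
    std::vector<std::string> stats_by_replica;
    stats_by_replica.push_back(
        fmt::format("replica {}{} - {{requests: {} marks: {}}}", 0, "", 3, 120));
    stats_by_replica.push_back(
        fmt::format("replica {}{} - {{requests: {} marks: {}}}", 1, " is unavailable", 0, 0));
    // Statistics: replica 0 - {requests: 3 marks: 120}; replica 1 is unavailable - {requests: 0 marks: 0}
    fmt::print("Statistics: {}\n", fmt::join(stats_by_replica, "; "));
}
```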
Stats stats;
-size_t replicas_count;
+size_t replicas_count{0};
+size_t unavailable_replicas_count{0};
explicit ImplInterface(size_t replicas_count_)
: stats{replicas_count_}
@@ -82,6 +87,7 @@ public:
virtual ~ImplInterface() = default;
virtual ParallelReadResponse handleRequest(ParallelReadRequest request) = 0;
virtual void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) = 0;
+virtual void markReplicaAsUnavailable(size_t replica_number) = 0;
};
using Parts = std::set<Part>;
@@ -128,6 +134,7 @@ public:
ParallelReadResponse handleRequest(ParallelReadRequest request) override;
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) override;
+void markReplicaAsUnavailable(size_t replica_number) override;
void updateReadingState(const InitialAllRangesAnnouncement & announcement);
void finalizeReadingState();
@@ -199,6 +206,17 @@ void DefaultCoordinator::updateReadingState(const InitialAllRangesAnnouncement &
}
}
+void DefaultCoordinator::markReplicaAsUnavailable(size_t replica_number)
+{
+    LOG_DEBUG(log, "Replica number {} is unavailable", replica_number);
+    ++unavailable_replicas_count;
+    stats[replica_number].is_unavailable = true;
+    if (sent_initial_requests == replicas_count - unavailable_replicas_count)
+        finalizeReadingState();
+}
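
The finalize trigger above is a simple readiness check: announcements are expected only from replicas that are still alive. A tiny model with concrete numbers (helper name is illustrative):

```cpp
#include <cassert>
#include <cstddef>

// The coordinator finalizes once every surviving replica has announced.
bool readyToFinalize(size_t sent_initial_requests,
                     size_t replicas_count,
                     size_t unavailable_replicas_count)
{
    return sent_initial_requests == replicas_count - unavailable_replicas_count;
}

int main()
{
    // 11 replicas configured (as in the test below), replica 10 unreachable:
    // the reading state finalizes after the 10 surviving announcements.
    assert(readyToFinalize(10, 11, 1));
    assert(!readyToFinalize(10, 11, 0));   // all alive: still waiting for one
}
```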
void DefaultCoordinator::finalizeReadingState()
{
/// Clear all the delayed queue
@@ -345,12 +363,23 @@ public:
ParallelReadResponse handleRequest([[ maybe_unused ]] ParallelReadRequest request) override;
void handleInitialAllRangesAnnouncement([[ maybe_unused ]] InitialAllRangesAnnouncement announcement) override;
+void markReplicaAsUnavailable(size_t replica_number) override;
Parts all_parts_to_read;
Poco::Logger * log = &Poco::Logger::get(fmt::format("{}{}", magic_enum::enum_name(mode), "Coordinator"));
};
+template <CoordinationMode mode>
+void InOrderCoordinator<mode>::markReplicaAsUnavailable(size_t replica_number)
+{
+    LOG_DEBUG(log, "Replica number {} is unavailable", replica_number);
+    stats[replica_number].is_unavailable = true;
+    ++unavailable_replicas_count;
+    /// There is nothing else to do.
+}
template <CoordinationMode mode>
void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
@@ -388,7 +417,6 @@ void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRang
}
}
template <CoordinationMode mode>
ParallelReadResponse InOrderCoordinator<mode>::handleRequest(ParallelReadRequest request)
{
@@ -486,7 +514,7 @@ void ParallelReplicasReadingCoordinator::handleInitialAllRangesAnnouncement(Init
if (!pimpl)
{
-setMode(announcement.mode);
+mode = announcement.mode;
initialize();
}
@@ -500,16 +528,23 @@ ParallelReadResponse ParallelReplicasReadingCoordinator::handleRequest(ParallelR
if (!pimpl)
{
-setMode(request.mode);
+mode = request.mode;
initialize();
}
return pimpl->handleRequest(std::move(request));
}
-void ParallelReplicasReadingCoordinator::setMode(CoordinationMode mode_)
+void ParallelReplicasReadingCoordinator::markReplicaAsUnavailable(size_t replica_number)
{
-    mode = mode_;
+    std::lock_guard lock(mutex);
+    if (!pimpl)
+    {
+        initialize();
+    }
+    return pimpl->markReplicaAsUnavailable(replica_number);
}
void ParallelReplicasReadingCoordinator::initialize()

View File

@@ -18,10 +18,15 @@ public:
explicit ParallelReplicasReadingCoordinator(size_t replicas_count_);
~ParallelReplicasReadingCoordinator();
-void setMode(CoordinationMode mode);
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement);
ParallelReadResponse handleRequest(ParallelReadRequest request);
+/// Called when some replica is unavailable and we skipped it.
+/// This is needed to "finalize" the reading state, e.g. to spread all the marks using
+/// consistent hashing, because otherwise the coordinator will continue working in
+/// a "pending" state, waiting for the unavailable replica to send its announcement.
+void markReplicaAsUnavailable(size_t replica_number);
private:
void initialize();

View File

@@ -1137,13 +1137,6 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata
auto cluster = getCluster();
const auto & settings = local_context->getSettingsRef();
-/// Ban an attempt to make async insert into the table belonging to DatabaseMemory
-if (!storage_policy && !owned_cluster && !settings.insert_distributed_sync && !settings.insert_shard_id)
-{
-    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Storage {} must have own data directory to enable asynchronous inserts",
-        getName());
-}
auto shard_num = cluster->getLocalShardCount() + cluster->getRemoteShardCount();
/// If sharding key is not specified, then you can only write to a cluster containing only one shard

View File

@@ -31,7 +31,6 @@ namespace DB
{
class PullingPipelineExecutor;
-class StorageS3SequentialSource;
class NamedCollection;
class StorageS3Source : public ISource, WithContext
@@ -248,11 +247,6 @@ public:
-String getPath() const { return url.key; }
-void appendToPath(const String & suffix)
-{
-    url = S3::URI{std::filesystem::path(url.uri.toString()) / suffix};
-}
bool update(ContextPtr context);
void connect(ContextPtr context);

View File

@@ -31,61 +31,8 @@ def cluster():
cluster.shutdown()
-init_list = {
-    "ReadBufferFromS3Bytes": 0,
-    "ReadBufferFromS3Microseconds": 0,
-    "ReadBufferFromS3InitMicroseconds": 0,
-    "ReadBufferFromS3RequestsErrors": 0,
-    "WriteBufferFromS3Bytes": 0,
-    "WriteBufferFromS3Microseconds": 0,
-    "WriteBufferFromS3RequestsErrors": 0,
-    "S3ReadMicroseconds": 0,
-    "S3ReadRequestsCount": 0,
-    "S3ReadRequestsErrorsTotal": 0,
-    "S3ReadRequestsErrors503": 0,
-    "S3ReadRequestsRedirects": 0,
-    "S3WriteMicroseconds": 0,
-    "S3WriteRequestsCount": 0,
-    "S3WriteRequestsErrorsTotal": 0,
-    "S3WriteRequestsErrors503": 0,
-    "S3WriteRequestsRedirects": 0,
-    "DiskS3ReadMicroseconds": 0,
-    "DiskS3ReadRequestsCount": 0,
-    "DiskS3ReadRequestsErrorsTotal": 0,
-    "DiskS3ReadRequestsErrors503": 0,
-    "DiskS3ReadRequestsRedirects": 0,
-    "DiskS3WriteMicroseconds": 0,
-    "DiskS3WriteRequestsCount": 0,
-    "DiskS3WriteRequestsErrorsTotal": 0,
-    "DiskS3WriteRequestsErrors503": 0,
-    "DiskS3WriteRequestsRedirects": 0,
-    "S3DeleteObjects": 0,
-    "S3CopyObject": 0,
-    "S3ListObjects": 0,
-    "S3HeadObject": 0,
-    "S3CreateMultipartUpload": 0,
-    "S3UploadPartCopy": 0,
-    "S3UploadPart": 0,
-    "S3AbortMultipartUpload": 0,
-    "S3CompleteMultipartUpload": 0,
-    "S3PutObject": 0,
-    "S3GetObject": 0,
-    "DiskS3DeleteObjects": 0,
-    "DiskS3CopyObject": 0,
-    "DiskS3ListObjects": 0,
-    "DiskS3HeadObject": 0,
-    "DiskS3CreateMultipartUpload": 0,
-    "DiskS3UploadPartCopy": 0,
-    "DiskS3UploadPart": 0,
-    "DiskS3AbortMultipartUpload": 0,
-    "DiskS3CompleteMultipartUpload": 0,
-    "DiskS3PutObject": 0,
-    "DiskS3GetObject": 0,
-}
def get_s3_events(instance):
-result = init_list.copy()
+result = dict()
events = instance.query(
"SELECT event, value FROM system.events WHERE event LIKE '%S3%'"
).split("\n")
@@ -130,7 +77,7 @@ def get_minio_stat(cluster):
def get_query_stat(instance, hint):
-result = init_list.copy()
+result = dict()
instance.query("SYSTEM FLUSH LOGS")
events = instance.query(
"""
@@ -146,7 +93,10 @@ def get_query_stat(instance, hint):
ev = event.split("\t")
if len(ev) == 2:
if "S3" in ev[0]:
-result[ev[0]] += int(ev[1])
+if ev[0] in result:
+    result[ev[0]] += int(ev[1])
+else:
+    result[ev[0]] = int(ev[1])
return result
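
The Python fix guards the first occurrence of each counter before accumulating. For comparison, a standalone C++ sketch (not part of the patch) where std::map's operator[] value-initializes missing keys, making the membership check unnecessary:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

int main()
{
    std::map<std::string, int64_t> result;
    result["S3GetObject"] += 42;   // key absent: value-initialized to 0 first
    result["S3GetObject"] += 8;    // key present: plain accumulation
    assert(result["S3GetObject"] == 50);
}
```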

View File

@@ -194,3 +194,7 @@ E28DBDE7FE22E41C
1
E28DBDE7FE22E41C
1
+1CE422FEE7BD8DE20000000000000000
+7766709361750702608
+20AF99D3A87829E0
+12489502208762728797

View File

@@ -272,3 +272,9 @@ select hex(sipHash64());
SELECT hex(sipHash128()) = hex(reverse(unhex('1CE422FEE7BD8DE20000000000000000'))) or hex(sipHash128()) = '1CE422FEE7BD8DE20000000000000000';
select hex(sipHash64Keyed());
SELECT hex(sipHash128Keyed()) = hex(reverse(unhex('1CE422FEE7BD8DE20000000000000000'))) or hex(sipHash128Keyed()) = '1CE422FEE7BD8DE20000000000000000';
+-- Crashed with memory sanitizer
+SELECT hex(sipHash128ReferenceKeyed((toUInt64(2), toUInt64(-9223372036854775807)))) GROUP BY (toUInt64(506097522914230528), toUInt64(now64(2, NULL + NULL), 1084818905618843912)), toUInt64(2), NULL + NULL, char(-2147483649, 1);
+SELECT sipHash64Keyed((2::UInt64, toUInt64(2)), 4) GROUP BY toUInt64(2);
+SELECT hex(sipHash64Keyed((toUInt64(9223372036854775806), toUInt64(-9223372036854775808)), char(2147483646, -2147483648, 1, 3, 4, 7, 2147483647))) GROUP BY toUInt64(257), (toUInt64(9223372036854775806), toUInt64(2147483646));
+SELECT sipHash64Keyed((toUInt64(9223372036854775806), 9223372036854775808::UInt64), char(2)) GROUP BY toUInt64(9223372036854775806);

View File

@@ -0,0 +1,15 @@
DROP TABLE IF EXISTS test_parallel_replicas_unavailable_shards;
CREATE TABLE test_parallel_replicas_unavailable_shards (n UInt64) ENGINE=MergeTree() ORDER BY tuple();
INSERT INTO test_parallel_replicas_unavailable_shards SELECT * FROM numbers(10);
SYSTEM FLUSH LOGS;
SET skip_unavailable_shards=1, allow_experimental_parallel_reading_from_replicas=1, max_parallel_replicas=11, use_hedged_requests=0, cluster_for_parallel_replicas='parallel_replicas', parallel_replicas_for_non_replicated_merge_tree=1;
SET send_logs_level='error';
SELECT count() FROM test_parallel_replicas_unavailable_shards WHERE NOT ignore(*);
SYSTEM FLUSH LOGS;
SELECT count() > 0 FROM system.text_log WHERE yesterday() <= event_date AND message LIKE '%Replica number 10 is unavailable%';
DROP TABLE test_parallel_replicas_unavailable_shards;

View File

@@ -0,0 +1 @@
4 66446 66446

View File

@@ -0,0 +1,29 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-random-settings
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -nm --query "
DROP TABLE IF EXISTS test_s3;
CREATE TABLE test_s3 (a UInt64, b UInt64)
ENGINE = MergeTree ORDER BY a
SETTINGS disk = 's3_disk', min_bytes_for_wide_part = 0;
INSERT INTO test_s3 SELECT number, number FROM numbers(1000000);
"
query="SELECT sum(b) FROM test_s3 WHERE a >= 100000 AND a <= 102000"
query_id=$(${CLICKHOUSE_CLIENT} --query "select queryID() from ($query) limit 1" 2>&1)
${CLICKHOUSE_CLIENT} --query "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -nm --query "
SELECT
ProfileEvents['S3ReadRequestsCount'],
ProfileEvents['ReadBufferFromS3Bytes'],
ProfileEvents['ReadCompressedBytes']
FROM system.query_log
WHERE type = 'QueryFinish'
AND current_database = currentDatabase()
AND query_id='$query_id';
"