mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge branch 'master' into improve-dashboard
This commit is contained in:
commit
54fa1f14bb
@ -26,7 +26,6 @@ sed -i '/onBrokenMarkdownLinks:/ s/ignore/error/g' docusaurus.config.js
|
||||
|
||||
if [[ $# -lt 1 ]] || [[ "$1" == "--"* ]]; then
|
||||
export CI=true
|
||||
yarn install
|
||||
exec yarn build "$@"
|
||||
fi
|
||||
|
||||
|
@ -306,6 +306,8 @@
|
||||
\
|
||||
M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \
|
||||
M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \
|
||||
\
|
||||
M(S3DiskNoKeyErrors, "The number of `NoSuchKey` errors that occur when reading data from S3 cloud storage through ClickHouse disks.") \
|
||||
|
||||
#ifdef APPLY_FOR_EXTERNAL_METRICS
|
||||
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)
|
||||
|
@ -103,7 +103,15 @@ static std::string getSortDescriptionDump(const SortDescription & description, c
|
||||
WriteBufferFromOwnString buffer;
|
||||
|
||||
for (size_t i = 0; i < description.size(); ++i)
|
||||
buffer << header_types[i]->getName() << ' ' << description[i].direction << ' ' << description[i].nulls_direction;
|
||||
{
|
||||
if (i != 0)
|
||||
buffer << ", ";
|
||||
|
||||
buffer << "(type: " << header_types[i]->getName()
|
||||
<< ", direction: " << description[i].direction
|
||||
<< ", nulls_direction: " << description[i].nulls_direction
|
||||
<< ")";
|
||||
}
|
||||
|
||||
return buffer.str();
|
||||
}
|
||||
|
@ -12,7 +12,6 @@
|
||||
#include <Common/ZooKeeper/KeeperException.h>
|
||||
#include <Common/ZooKeeper/Types.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
#include <Common/PoolId.h>
|
||||
#include <Core/ServerSettings.h>
|
||||
#include <Core/Settings.h>
|
||||
@ -339,12 +338,9 @@ ClusterPtr DatabaseReplicated::getClusterImpl(bool all_groups) const
|
||||
return std::make_shared<Cluster>(getContext()->getSettingsRef(), shards, params);
|
||||
}
|
||||
|
||||
ReplicasInfo DatabaseReplicated::tryGetReplicasInfo(const ClusterPtr & cluster_) const
|
||||
std::vector<UInt8> DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr & cluster_) const
|
||||
{
|
||||
Strings paths_get, paths_exists;
|
||||
|
||||
paths_get.emplace_back(fs::path(zookeeper_path) / "max_log_ptr");
|
||||
|
||||
Strings paths;
|
||||
const auto & addresses_with_failover = cluster_->getShardsAddresses();
|
||||
const auto & shards_info = cluster_->getShardsInfo();
|
||||
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
|
||||
@ -352,59 +348,32 @@ ReplicasInfo DatabaseReplicated::tryGetReplicasInfo(const ClusterPtr & cluster_)
|
||||
for (const auto & replica : addresses_with_failover[shard_index])
|
||||
{
|
||||
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name);
|
||||
paths_exists.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
|
||||
paths_get.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "log_ptr");
|
||||
paths.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
auto current_zookeeper = getZooKeeper();
|
||||
auto get_res = current_zookeeper->get(paths_get);
|
||||
auto exist_res = current_zookeeper->exists(paths_exists);
|
||||
chassert(get_res.size() == exist_res.size() + 1);
|
||||
auto res = current_zookeeper->exists(paths);
|
||||
|
||||
auto max_log_ptr_zk = get_res[0];
|
||||
if (max_log_ptr_zk.error != Coordination::Error::ZOK)
|
||||
throw Coordination::Exception(max_log_ptr_zk.error);
|
||||
std::vector<UInt8> statuses;
|
||||
statuses.resize(paths.size());
|
||||
|
||||
UInt32 max_log_ptr = parse<UInt32>(max_log_ptr_zk.data);
|
||||
for (size_t i = 0; i < res.size(); ++i)
|
||||
if (res[i].error == Coordination::Error::ZOK)
|
||||
statuses[i] = 1;
|
||||
|
||||
ReplicasInfo replicas_info;
|
||||
replicas_info.resize(exist_res.size());
|
||||
|
||||
size_t global_replica_index = 0;
|
||||
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
|
||||
{
|
||||
for (const auto & replica : addresses_with_failover[shard_index])
|
||||
{
|
||||
auto replica_active = exist_res[global_replica_index];
|
||||
auto replica_log_ptr = get_res[global_replica_index + 1];
|
||||
|
||||
if (replica_active.error != Coordination::Error::ZOK && replica_active.error != Coordination::Error::ZNONODE)
|
||||
throw Coordination::Exception(replica_active.error);
|
||||
|
||||
if (replica_log_ptr.error != Coordination::Error::ZOK)
|
||||
throw Coordination::Exception(replica_log_ptr.error);
|
||||
|
||||
replicas_info[global_replica_index] = ReplicaInfo{
|
||||
.is_active = replica_active.error == Coordination::Error::ZOK,
|
||||
.replication_lag = max_log_ptr - parse<UInt32>(replica_log_ptr.data),
|
||||
.recovery_time = replica.is_local ? ddl_worker->getCurrentInitializationDurationMs() : 0,
|
||||
};
|
||||
|
||||
++global_replica_index;
|
||||
}
|
||||
}
|
||||
|
||||
return replicas_info;
|
||||
} catch (...)
|
||||
return statuses;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(log);
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void DatabaseReplicated::fillClusterAuthInfo(String collection_name, const Poco::Util::AbstractConfiguration & config_ref)
|
||||
{
|
||||
const auto & config_prefix = fmt::format("named_collections.{}", collection_name);
|
||||
|
@ -17,14 +17,6 @@ using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
|
||||
class Cluster;
|
||||
using ClusterPtr = std::shared_ptr<Cluster>;
|
||||
|
||||
struct ReplicaInfo
|
||||
{
|
||||
bool is_active;
|
||||
UInt32 replication_lag;
|
||||
UInt64 recovery_time;
|
||||
};
|
||||
using ReplicasInfo = std::vector<ReplicaInfo>;
|
||||
|
||||
class DatabaseReplicated : public DatabaseAtomic
|
||||
{
|
||||
public:
|
||||
@ -92,7 +84,7 @@ public:
|
||||
|
||||
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, bool throw_if_noop);
|
||||
|
||||
ReplicasInfo tryGetReplicasInfo(const ClusterPtr & cluster_) const;
|
||||
std::vector<UInt8> tryGetAreReplicasActive(const ClusterPtr & cluster_) const;
|
||||
|
||||
void renameDatabase(ContextPtr query_context, const String & new_name) override;
|
||||
|
||||
|
@ -32,12 +32,6 @@ DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db
|
||||
|
||||
bool DatabaseReplicatedDDLWorker::initializeMainThread()
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(initialization_duration_timer_mutex);
|
||||
initialization_duration_timer.emplace();
|
||||
initialization_duration_timer->start();
|
||||
}
|
||||
|
||||
while (!stop_flag)
|
||||
{
|
||||
try
|
||||
@ -75,10 +69,6 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
|
||||
|
||||
initializeReplication();
|
||||
initialized = true;
|
||||
{
|
||||
std::lock_guard lock(initialization_duration_timer_mutex);
|
||||
initialization_duration_timer.reset();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
catch (...)
|
||||
@ -88,11 +78,6 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard lock(initialization_duration_timer_mutex);
|
||||
initialization_duration_timer.reset();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -474,10 +459,4 @@ UInt32 DatabaseReplicatedDDLWorker::getLogPointer() const
|
||||
return max_id.load();
|
||||
}
|
||||
|
||||
UInt64 DatabaseReplicatedDDLWorker::getCurrentInitializationDurationMs() const
|
||||
{
|
||||
std::lock_guard lock(initialization_duration_timer_mutex);
|
||||
return initialization_duration_timer ? initialization_duration_timer->elapsedMilliseconds() : 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -36,8 +36,6 @@ public:
|
||||
DatabaseReplicated * const database, bool committed = false); /// NOLINT
|
||||
|
||||
UInt32 getLogPointer() const;
|
||||
|
||||
UInt64 getCurrentInitializationDurationMs() const;
|
||||
private:
|
||||
bool initializeMainThread() override;
|
||||
void initializeReplication();
|
||||
@ -58,9 +56,6 @@ private:
|
||||
ZooKeeperPtr active_node_holder_zookeeper;
|
||||
/// It will remove "active" node when database is detached
|
||||
zkutil::EphemeralNodeHolderPtr active_node_holder;
|
||||
|
||||
std::optional<Stopwatch> initialization_duration_timer;
|
||||
mutable std::mutex initialization_duration_timer_mutex;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -24,6 +24,7 @@
|
||||
|
||||
#include <Common/assert_cast.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/ProxyConfigurationResolverProvider.h>
|
||||
|
||||
#include <Core/Settings.h>
|
||||
@ -43,6 +44,11 @@ namespace ProfileEvents
|
||||
extern const Event TinyS3Clients;
|
||||
}
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric S3DiskNoKeyErrors;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -381,7 +387,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
|
||||
|
||||
/// The next call is NOT a recurcive call
|
||||
/// This is a virtuall call Aws::S3::S3Client::HeadObject(const Model::HeadObjectRequest&)
|
||||
return enrichErrorMessage(
|
||||
return processRequestResult(
|
||||
HeadObject(static_cast<const Model::HeadObjectRequest&>(request)));
|
||||
}
|
||||
|
||||
@ -402,7 +408,7 @@ Model::ListObjectsOutcome Client::ListObjects(ListObjectsRequest & request) cons
|
||||
|
||||
Model::GetObjectOutcome Client::GetObject(GetObjectRequest & request) const
|
||||
{
|
||||
return enrichErrorMessage(
|
||||
return processRequestResult(
|
||||
doRequest(request, [this](const Model::GetObjectRequest & req) { return GetObject(req); }));
|
||||
}
|
||||
|
||||
@ -689,11 +695,14 @@ Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request
|
||||
}
|
||||
|
||||
template <typename RequestResult>
|
||||
RequestResult Client::enrichErrorMessage(RequestResult && outcome) const
|
||||
RequestResult Client::processRequestResult(RequestResult && outcome) const
|
||||
{
|
||||
if (outcome.IsSuccess() || !isClientForDisk())
|
||||
return std::forward<RequestResult>(outcome);
|
||||
|
||||
if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
|
||||
CurrentMetrics::add(CurrentMetrics::S3DiskNoKeyErrors);
|
||||
|
||||
String enriched_message = fmt::format(
|
||||
"{} {}",
|
||||
outcome.GetError().GetMessage(),
|
||||
|
@ -271,7 +271,7 @@ private:
|
||||
void insertRegionOverride(const std::string & bucket, const std::string & region) const;
|
||||
|
||||
template <typename RequestResult>
|
||||
RequestResult enrichErrorMessage(RequestResult && outcome) const;
|
||||
RequestResult processRequestResult(RequestResult && outcome) const;
|
||||
|
||||
String initial_endpoint;
|
||||
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
|
||||
|
@ -31,8 +31,6 @@ ColumnsDescription StorageSystemClusters::getColumnsDescription()
|
||||
{"database_shard_name", std::make_shared<DataTypeString>(), "The name of the `Replicated` database shard (for clusters that belong to a `Replicated` database)."},
|
||||
{"database_replica_name", std::make_shared<DataTypeString>(), "The name of the `Replicated` database replica (for clusters that belong to a `Replicated` database)."},
|
||||
{"is_active", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "The status of the Replicated database replica (for clusters that belong to a Replicated database): 1 means 'replica is online', 0 means 'replica is offline', NULL means 'unknown'."},
|
||||
{"replication_lag", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt32>()), "The replication lag of the `Replicated` database replica (for clusters that belong to a Replicated database)."},
|
||||
{"recovery_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()), "The recovery time of the `Replicated` database replica (for clusters that belong to a Replicated database), in milliseconds."},
|
||||
};
|
||||
|
||||
description.setAliases({
|
||||
@ -48,30 +46,31 @@ void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr co
|
||||
writeCluster(res_columns, name_and_cluster, {});
|
||||
|
||||
const auto databases = DatabaseCatalog::instance().getDatabases();
|
||||
for (const auto & [database_name, database] : databases)
|
||||
for (const auto & name_and_database : databases)
|
||||
{
|
||||
if (const auto * replicated = typeid_cast<const DatabaseReplicated *>(database.get()))
|
||||
if (const auto * replicated = typeid_cast<const DatabaseReplicated *>(name_and_database.second.get()))
|
||||
{
|
||||
|
||||
if (auto database_cluster = replicated->tryGetCluster())
|
||||
writeCluster(res_columns, {database_name, database_cluster},
|
||||
replicated->tryGetReplicasInfo(database_cluster));
|
||||
writeCluster(res_columns, {name_and_database.first, database_cluster},
|
||||
replicated->tryGetAreReplicasActive(database_cluster));
|
||||
|
||||
if (auto database_cluster = replicated->tryGetAllGroupsCluster())
|
||||
writeCluster(res_columns, {DatabaseReplicated::ALL_GROUPS_CLUSTER_PREFIX + database_name, database_cluster},
|
||||
replicated->tryGetReplicasInfo(database_cluster));
|
||||
writeCluster(res_columns, {DatabaseReplicated::ALL_GROUPS_CLUSTER_PREFIX + name_and_database.first, database_cluster},
|
||||
replicated->tryGetAreReplicasActive(database_cluster));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster,
|
||||
const ReplicasInfo & replicas_info)
|
||||
const std::vector<UInt8> & is_active)
|
||||
{
|
||||
const String & cluster_name = name_and_cluster.first;
|
||||
const ClusterPtr & cluster = name_and_cluster.second;
|
||||
const auto & shards_info = cluster->getShardsInfo();
|
||||
const auto & addresses_with_failover = cluster->getShardsAddresses();
|
||||
|
||||
size_t global_replica_idx = 0;
|
||||
size_t replica_idx = 0;
|
||||
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
|
||||
{
|
||||
const auto & shard_info = shards_info[shard_index];
|
||||
@ -100,24 +99,10 @@ void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const Nam
|
||||
res_columns[i++]->insert(pool_status[replica_index].estimated_recovery_time.count());
|
||||
res_columns[i++]->insert(address.database_shard_name);
|
||||
res_columns[i++]->insert(address.database_replica_name);
|
||||
if (replicas_info.empty())
|
||||
{
|
||||
if (is_active.empty())
|
||||
res_columns[i++]->insertDefault();
|
||||
res_columns[i++]->insertDefault();
|
||||
res_columns[i++]->insertDefault();
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto & replica_info = replicas_info[global_replica_idx];
|
||||
res_columns[i++]->insert(replica_info.is_active);
|
||||
res_columns[i++]->insert(replica_info.replication_lag);
|
||||
if (replica_info.recovery_time != 0)
|
||||
res_columns[i++]->insert(replica_info.recovery_time);
|
||||
else
|
||||
res_columns[i++]->insertDefault();
|
||||
}
|
||||
|
||||
++global_replica_idx;
|
||||
res_columns[i++]->insert(is_active[replica_idx++]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +1,10 @@
|
||||
#pragma once
|
||||
|
||||
#include <Databases/DatabaseReplicated.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Storages/System/IStorageSystemOneBlock.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -27,7 +27,7 @@ protected:
|
||||
using NameAndCluster = std::pair<String, std::shared_ptr<Cluster>>;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, const ReplicasInfo & replicas_info);
|
||||
static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, const std::vector<UInt8> & is_active);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -4,6 +4,8 @@ import logging
|
||||
import pytest
|
||||
import os
|
||||
import minio
|
||||
import random
|
||||
import string
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.mock_servers import start_s3_mock
|
||||
@ -45,6 +47,11 @@ def cluster():
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def randomize_query_id(query_id, random_suffix_length=10):
|
||||
letters = string.ascii_letters + string.digits
|
||||
return f"{query_id}_{''.join(random.choice(letters) for _ in range(random_suffix_length))}"
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def init_broken_s3(cluster):
|
||||
yield start_s3_mock(cluster, "broken_s3", "8083")
|
||||
@ -61,6 +68,7 @@ def test_upload_after_check_works(cluster, broken_s3):
|
||||
|
||||
node.query(
|
||||
"""
|
||||
DROP TABLE IF EXISTS s3_upload_after_check_works;
|
||||
CREATE TABLE s3_upload_after_check_works (
|
||||
id Int64,
|
||||
data String
|
||||
@ -127,7 +135,9 @@ def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression
|
||||
|
||||
broken_s3.setup_at_create_multi_part_upload()
|
||||
|
||||
insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_FAIL_CREATE_MPU_{compression}"
|
||||
insert_query_id = randomize_query_id(
|
||||
f"INSERT_INTO_TABLE_FUNCTION_FAIL_CREATE_MPU_{compression}"
|
||||
)
|
||||
error = node.query_and_get_error(
|
||||
f"""
|
||||
INSERT INTO
|
||||
@ -169,7 +179,9 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload(
|
||||
broken_s3.setup_fake_multpartuploads()
|
||||
broken_s3.setup_at_part_upload(count=1, after=2)
|
||||
|
||||
insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_FAIL_UPLOAD_PART_{compression}"
|
||||
insert_query_id = randomize_query_id(
|
||||
f"INSERT_INTO_TABLE_FUNCTION_FAIL_UPLOAD_PART_{compression}"
|
||||
)
|
||||
error = node.query_and_get_error(
|
||||
f"""
|
||||
INSERT INTO
|
||||
@ -221,7 +233,7 @@ def test_when_error_is_retried(cluster, broken_s3, action_and_message):
|
||||
broken_s3.setup_fake_multpartuploads()
|
||||
broken_s3.setup_at_part_upload(count=3, after=2, action=action)
|
||||
|
||||
insert_query_id = f"INSERT_INTO_TABLE_{action}_RETRIED"
|
||||
insert_query_id = randomize_query_id(f"INSERT_INTO_TABLE_{action}_RETRIED")
|
||||
node.query(
|
||||
f"""
|
||||
INSERT INTO
|
||||
@ -250,7 +262,7 @@ def test_when_error_is_retried(cluster, broken_s3, action_and_message):
|
||||
assert s3_errors == 3
|
||||
|
||||
broken_s3.setup_at_part_upload(count=1000, after=2, action=action)
|
||||
insert_query_id = f"INSERT_INTO_TABLE_{action}_RETRIED_1"
|
||||
insert_query_id = randomize_query_id(f"INSERT_INTO_TABLE_{action}_RETRIED_1")
|
||||
error = node.query_and_get_error(
|
||||
f"""
|
||||
INSERT INTO
|
||||
@ -285,7 +297,7 @@ def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3):
|
||||
action="broken_pipe",
|
||||
)
|
||||
|
||||
insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD"
|
||||
insert_query_id = randomize_query_id(f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD")
|
||||
node.query(
|
||||
f"""
|
||||
INSERT INTO
|
||||
@ -319,7 +331,7 @@ def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3):
|
||||
after=2,
|
||||
action="broken_pipe",
|
||||
)
|
||||
insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD_1"
|
||||
insert_query_id = randomize_query_id(f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD_1")
|
||||
error = node.query_and_get_error(
|
||||
f"""
|
||||
INSERT INTO
|
||||
@ -361,7 +373,7 @@ def test_when_s3_connection_reset_by_peer_at_upload_is_retried(
|
||||
action_args=["1"] if send_something else ["0"],
|
||||
)
|
||||
|
||||
insert_query_id = (
|
||||
insert_query_id = randomize_query_id(
|
||||
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_UPLOAD_{send_something}"
|
||||
)
|
||||
node.query(
|
||||
@ -398,7 +410,7 @@ def test_when_s3_connection_reset_by_peer_at_upload_is_retried(
|
||||
action="connection_reset_by_peer",
|
||||
action_args=["1"] if send_something else ["0"],
|
||||
)
|
||||
insert_query_id = (
|
||||
insert_query_id = randomize_query_id(
|
||||
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_UPLOAD_{send_something}_1"
|
||||
)
|
||||
error = node.query_and_get_error(
|
||||
@ -443,7 +455,7 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried(
|
||||
action_args=["1"] if send_something else ["0"],
|
||||
)
|
||||
|
||||
insert_query_id = (
|
||||
insert_query_id = randomize_query_id(
|
||||
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_MULTIPARTUPLOAD_{send_something}"
|
||||
)
|
||||
node.query(
|
||||
@ -481,7 +493,7 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried(
|
||||
action_args=["1"] if send_something else ["0"],
|
||||
)
|
||||
|
||||
insert_query_id = (
|
||||
insert_query_id = randomize_query_id(
|
||||
f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_MULTIPARTUPLOAD_{send_something}_1"
|
||||
)
|
||||
error = node.query_and_get_error(
|
||||
@ -521,7 +533,7 @@ def test_query_is_canceled_with_inf_retries(cluster, broken_s3):
|
||||
action="connection_refused",
|
||||
)
|
||||
|
||||
insert_query_id = f"TEST_QUERY_IS_CANCELED_WITH_INF_RETRIES"
|
||||
insert_query_id = randomize_query_id(f"TEST_QUERY_IS_CANCELED_WITH_INF_RETRIES")
|
||||
request = node.get_query_request(
|
||||
f"""
|
||||
INSERT INTO
|
||||
@ -579,7 +591,7 @@ def test_adaptive_timeouts(cluster, broken_s3, node_name):
|
||||
count=1000000,
|
||||
)
|
||||
|
||||
insert_query_id = f"TEST_ADAPTIVE_TIMEOUTS_{node_name}"
|
||||
insert_query_id = randomize_query_id(f"TEST_ADAPTIVE_TIMEOUTS_{node_name}")
|
||||
node.query(
|
||||
f"""
|
||||
INSERT INTO
|
||||
@ -631,6 +643,7 @@ def test_no_key_found_disk(cluster, broken_s3):
|
||||
|
||||
node.query(
|
||||
"""
|
||||
DROP TABLE IF EXISTS no_key_found_disk;
|
||||
CREATE TABLE no_key_found_disk (
|
||||
id Int64
|
||||
) ENGINE=MergeTree()
|
||||
@ -689,3 +702,15 @@ def test_no_key_found_disk(cluster, broken_s3):
|
||||
"DB::Exception: The specified key does not exist. This error happened for S3 disk."
|
||||
in error
|
||||
)
|
||||
|
||||
s3_disk_no_key_errors_metric_value = int(
|
||||
node.query(
|
||||
"""
|
||||
SELECT value
|
||||
FROM system.metrics
|
||||
WHERE metric = 'S3DiskNoKeyErrors'
|
||||
"""
|
||||
).strip()
|
||||
)
|
||||
|
||||
assert s3_disk_no_key_errors_metric_value > 0
|
||||
|
@ -1,41 +0,0 @@
|
||||
<clickhouse>
|
||||
<tcp_port>9000</tcp_port>
|
||||
|
||||
<profiles>
|
||||
<default>
|
||||
</default>
|
||||
</profiles>
|
||||
|
||||
<users>
|
||||
<default>
|
||||
<profile>default</profile>
|
||||
<no_password></no_password>
|
||||
</default>
|
||||
</users>
|
||||
|
||||
<keeper_server>
|
||||
<tcp_port>2181</tcp_port>
|
||||
<server_id>1</server_id>
|
||||
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
|
||||
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
|
||||
<coordination_settings>
|
||||
<session_timeout_ms>20000</session_timeout_ms>
|
||||
</coordination_settings>
|
||||
<raft_configuration>
|
||||
<server>
|
||||
<id>1</id>
|
||||
<hostname>localhost</hostname>
|
||||
<port>9444</port>
|
||||
</server>
|
||||
</raft_configuration>
|
||||
</keeper_server>
|
||||
|
||||
<zookeeper>
|
||||
<node index="1">
|
||||
<host>localhost</host>
|
||||
<port>2181</port>
|
||||
</node>
|
||||
<session_timeout_ms>20000</session_timeout_ms>
|
||||
</zookeeper>
|
||||
|
||||
</clickhouse>
|
@ -1,61 +0,0 @@
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node = cluster.add_instance(
|
||||
"node",
|
||||
main_configs=["configs/config.xml"],
|
||||
stay_alive=True,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def start_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_recovery_time_metric(start_cluster):
|
||||
node.query(
|
||||
"""
|
||||
DROP DATABASE IF EXISTS rdb;
|
||||
CREATE DATABASE rdb
|
||||
ENGINE = Replicated('/test/test_recovery_time_metric', 'shard1', 'replica1')
|
||||
"""
|
||||
)
|
||||
|
||||
node.query(
|
||||
"""
|
||||
DROP TABLE IF EXISTS rdb.t;
|
||||
CREATE TABLE rdb.t
|
||||
(
|
||||
`x` UInt32
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY x
|
||||
"""
|
||||
)
|
||||
|
||||
node.exec_in_container(["bash", "-c", "rm /var/lib/clickhouse/metadata/rdb/t.sql"])
|
||||
|
||||
node.restart_clickhouse()
|
||||
|
||||
ret = int(
|
||||
node.query(
|
||||
"""
|
||||
SELECT recovery_time
|
||||
FROM system.clusters
|
||||
WHERE cluster = 'rdb'
|
||||
"""
|
||||
).strip()
|
||||
)
|
||||
assert ret > 0
|
||||
|
||||
node.query(
|
||||
"""
|
||||
DROP DATABASE rdb
|
||||
"""
|
||||
)
|
@ -8,6 +8,8 @@ import os
|
||||
import json
|
||||
import time
|
||||
import glob
|
||||
import random
|
||||
import string
|
||||
|
||||
import pyspark
|
||||
import delta
|
||||
@ -52,6 +54,11 @@ def get_spark():
|
||||
return builder.master("local").getOrCreate()
|
||||
|
||||
|
||||
def randomize_table_name(table_name, random_suffix_length=10):
|
||||
letters = string.ascii_letters + string.digits
|
||||
return f"{table_name}{''.join(random.choice(letters) for _ in range(random_suffix_length))}"
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
@ -151,7 +158,7 @@ def test_single_log_file(started_cluster):
|
||||
spark = started_cluster.spark_session
|
||||
minio_client = started_cluster.minio_client
|
||||
bucket = started_cluster.minio_bucket
|
||||
TABLE_NAME = "test_single_log_file"
|
||||
TABLE_NAME = randomize_table_name("test_single_log_file")
|
||||
|
||||
inserted_data = "SELECT number as a, toString(number + 1) as b FROM numbers(100)"
|
||||
parquet_data_path = create_initial_data_file(
|
||||
@ -175,7 +182,7 @@ def test_partition_by(started_cluster):
|
||||
spark = started_cluster.spark_session
|
||||
minio_client = started_cluster.minio_client
|
||||
bucket = started_cluster.minio_bucket
|
||||
TABLE_NAME = "test_partition_by"
|
||||
TABLE_NAME = randomize_table_name("test_partition_by")
|
||||
|
||||
write_delta_from_df(
|
||||
spark,
|
||||
@ -197,7 +204,7 @@ def test_checkpoint(started_cluster):
|
||||
spark = started_cluster.spark_session
|
||||
minio_client = started_cluster.minio_client
|
||||
bucket = started_cluster.minio_bucket
|
||||
TABLE_NAME = "test_checkpoint"
|
||||
TABLE_NAME = randomize_table_name("test_checkpoint")
|
||||
|
||||
write_delta_from_df(
|
||||
spark,
|
||||
@ -272,7 +279,7 @@ def test_multiple_log_files(started_cluster):
|
||||
spark = started_cluster.spark_session
|
||||
minio_client = started_cluster.minio_client
|
||||
bucket = started_cluster.minio_bucket
|
||||
TABLE_NAME = "test_multiple_log_files"
|
||||
TABLE_NAME = randomize_table_name("test_multiple_log_files")
|
||||
|
||||
write_delta_from_df(
|
||||
spark, generate_data(spark, 0, 100), f"/{TABLE_NAME}", mode="overwrite"
|
||||
@ -310,7 +317,7 @@ def test_metadata(started_cluster):
|
||||
spark = started_cluster.spark_session
|
||||
minio_client = started_cluster.minio_client
|
||||
bucket = started_cluster.minio_bucket
|
||||
TABLE_NAME = "test_metadata"
|
||||
TABLE_NAME = randomize_table_name("test_metadata")
|
||||
|
||||
parquet_data_path = create_initial_data_file(
|
||||
started_cluster,
|
||||
@ -339,9 +346,9 @@ def test_metadata(started_cluster):
|
||||
|
||||
|
||||
def test_types(started_cluster):
|
||||
TABLE_NAME = "test_types"
|
||||
TABLE_NAME = randomize_table_name("test_types")
|
||||
spark = started_cluster.spark_session
|
||||
result_file = f"{TABLE_NAME}_result_2"
|
||||
result_file = randomize_table_name(f"{TABLE_NAME}_result_2")
|
||||
|
||||
delta_table = (
|
||||
DeltaTable.create(spark)
|
||||
@ -415,7 +422,7 @@ def test_restart_broken(started_cluster):
|
||||
spark = started_cluster.spark_session
|
||||
minio_client = started_cluster.minio_client
|
||||
bucket = "broken"
|
||||
TABLE_NAME = "test_restart_broken"
|
||||
TABLE_NAME = randomize_table_name("test_restart_broken")
|
||||
|
||||
if not minio_client.bucket_exists(bucket):
|
||||
minio_client.make_bucket(bucket)
|
||||
@ -452,6 +459,18 @@ def test_restart_broken(started_cluster):
|
||||
f"SELECT count() FROM {TABLE_NAME}"
|
||||
)
|
||||
|
||||
s3_disk_no_key_errors_metric_value = int(
|
||||
instance.query(
|
||||
"""
|
||||
SELECT value
|
||||
FROM system.metrics
|
||||
WHERE metric = 'S3DiskNoKeyErrors'
|
||||
"""
|
||||
).strip()
|
||||
)
|
||||
|
||||
assert s3_disk_no_key_errors_metric_value == 0
|
||||
|
||||
minio_client.make_bucket(bucket)
|
||||
|
||||
upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
|
||||
@ -464,7 +483,7 @@ def test_restart_broken_table_function(started_cluster):
|
||||
spark = started_cluster.spark_session
|
||||
minio_client = started_cluster.minio_client
|
||||
bucket = "broken2"
|
||||
TABLE_NAME = "test_restart_broken_table_function"
|
||||
TABLE_NAME = randomize_table_name("test_restart_broken_table_function")
|
||||
|
||||
if not minio_client.bucket_exists(bucket):
|
||||
minio_client.make_bucket(bucket)
|
||||
@ -518,7 +537,7 @@ def test_partition_columns(started_cluster):
|
||||
spark = started_cluster.spark_session
|
||||
minio_client = started_cluster.minio_client
|
||||
bucket = started_cluster.minio_bucket
|
||||
TABLE_NAME = "test_partition_columns"
|
||||
TABLE_NAME = randomize_table_name("test_partition_columns")
|
||||
result_file = f"{TABLE_NAME}"
|
||||
partition_columns = ["b", "c", "d", "e"]
|
||||
|
||||
|
@ -52,8 +52,6 @@ CREATE TABLE system.clusters
|
||||
`database_shard_name` String,
|
||||
`database_replica_name` String,
|
||||
`is_active` Nullable(UInt8),
|
||||
`replication_lag` Nullable(UInt32),
|
||||
`recovery_time` Nullable(UInt64),
|
||||
`name` String ALIAS cluster
|
||||
)
|
||||
ENGINE = SystemClusters
|
||||
|
@ -1,35 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
function test()
|
||||
{
|
||||
echo "test"
|
||||
$CH_CLIENT -q "insert into test select number, number from numbers(100000) settings min_insert_block_size_rows=50000"
|
||||
$CH_CLIENT -q "insert into test select number, 'str_' || toString(number) from numbers(100000, 100000) settings min_insert_block_size_rows=50000"
|
||||
$CH_CLIENT -q "insert into test select number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10 + 1)) from numbers(200000, 100000) settings min_insert_block_size_rows=50000"
|
||||
$CH_CLIENT -q "insert into test select number, NULL from numbers(300000, 100000) settings min_insert_block_size_rows=50000"
|
||||
$CH_CLIENT -q "insert into test select number, multiIf(number % 4 == 3, 'str_' || toString(number), number % 4 == 2, NULL, number % 4 == 1, number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10 + 1))) from numbers(400000, 400000) settings min_insert_block_size_rows=50000"
|
||||
$CH_CLIENT -q "insert into test select number, [range((number % 10 + 1)::UInt64)]::Array(Array(Dynamic)) from numbers(100000, 100000) settings min_insert_block_size_rows=50000"
|
||||
|
||||
$CH_CLIENT -q "select distinct dynamicType(d) as type from test order by type"
|
||||
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'UInt64'"
|
||||
$CH_CLIENT -q "select count() from test where d.UInt64 is not NULL"
|
||||
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'String'"
|
||||
$CH_CLIENT -q "select count() from test where d.String is not NULL"
|
||||
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'Date'"
|
||||
$CH_CLIENT -q "select count() from test where d.Date is not NULL"
|
||||
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'Array(Variant(String, UInt64))'"
|
||||
$CH_CLIENT -q "select count() from test where not empty(d.\`Array(Variant(String, UInt64))\`)"
|
||||
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'Array(Array(Dynamic))'"
|
||||
$CH_CLIENT -q "select count() from test where not empty(d.\`Array(Array(Dynamic))\`)"
|
||||
$CH_CLIENT -q "select count() from test where d is NULL"
|
||||
$CH_CLIENT -q "select count() from test where not empty(d.\`Tuple(a Array(Dynamic))\`.a.String)"
|
||||
|
||||
$CH_CLIENT -q "select d, d.UInt64, d.String, d.\`Array(Variant(String, UInt64))\` from test format Null"
|
||||
$CH_CLIENT -q "select d.UInt64, d.String, d.\`Array(Variant(String, UInt64))\` from test format Null"
|
||||
$CH_CLIENT -q "select d.Int8, d.Date, d.\`Array(String)\` from test format Null"
|
||||
$CH_CLIENT -q "select d, d.UInt64, d.Date, d.\`Array(Variant(String, UInt64))\`, d.\`Array(Variant(String, UInt64))\`.size0, d.\`Array(Variant(String, UInt64))\`.UInt64 from test format Null"
|
||||
$CH_CLIENT -q "select d.UInt64, d.Date, d.\`Array(Variant(String, UInt64))\`, d.\`Array(Variant(String, UInt64))\`.size0, d.\`Array(Variant(String, UInt64))\`.UInt64, d.\`Array(Variant(String, UInt64))\`.String from test format Null"
|
||||
$CH_CLIENT -q "select d, d.\`Tuple(a UInt64, b String)\`.a, d.\`Array(Dynamic)\`.\`Variant(String, UInt64)\`.UInt64, d.\`Array(Variant(String, UInt64))\`.UInt64 from test format Null"
|
||||
$CH_CLIENT -q "select d.\`Array(Dynamic)\`.\`Variant(String, UInt64)\`.UInt64, d.\`Array(Dynamic)\`.size0, d.\`Array(Variant(String, UInt64))\`.UInt64 from test format Null"
|
||||
$CH_CLIENT -q "select d.\`Array(Array(Dynamic))\`.size1, d.\`Array(Array(Dynamic))\`.UInt64, d.\`Array(Array(Dynamic))\`.\`Map(String, Tuple(a UInt64))\`.values.a from test format Null"
|
||||
}
|
@ -1,19 +0,0 @@
|
||||
Memory
|
||||
test
|
||||
Array(Array(Dynamic))
|
||||
Array(Variant(String, UInt64))
|
||||
None
|
||||
String
|
||||
UInt64
|
||||
200000
|
||||
200000
|
||||
200000
|
||||
200000
|
||||
0
|
||||
0
|
||||
200000
|
||||
200000
|
||||
100000
|
||||
100000
|
||||
200000
|
||||
0
|
@ -1,19 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: long
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
|
||||
# shellcheck source=./03036_dynamic_read_subcolumns.lib
|
||||
. "$CUR_DIR"/03036_dynamic_read_subcolumns.lib
|
||||
|
||||
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --use_variant_as_common_type=1 --allow_experimental_dynamic_type=1"
|
||||
|
||||
$CH_CLIENT -q "drop table if exists test;"
|
||||
|
||||
echo "Memory"
|
||||
$CH_CLIENT -q "create table test (id UInt64, d Dynamic) engine=Memory"
|
||||
test
|
||||
$CH_CLIENT -q "drop table test;"
|
@ -1,19 +0,0 @@
|
||||
MergeTree compact
|
||||
test
|
||||
Array(Array(Dynamic))
|
||||
Array(Variant(String, UInt64))
|
||||
None
|
||||
String
|
||||
UInt64
|
||||
200000
|
||||
200000
|
||||
200000
|
||||
200000
|
||||
0
|
||||
0
|
||||
200000
|
||||
200000
|
||||
100000
|
||||
100000
|
||||
200000
|
||||
0
|
@ -1,19 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: long
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
|
||||
# shellcheck source=./03036_dynamic_read_subcolumns.lib
|
||||
. "$CUR_DIR"/03036_dynamic_read_subcolumns.lib
|
||||
|
||||
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --use_variant_as_common_type=1 --allow_experimental_dynamic_type=1"
|
||||
|
||||
$CH_CLIENT -q "drop table if exists test;"
|
||||
|
||||
echo "MergeTree compact"
|
||||
$CH_CLIENT -q "create table test (id UInt64, d Dynamic) engine=MergeTree order by id settings min_rows_for_wide_part=1000000000, min_bytes_for_wide_part=10000000000;"
|
||||
test
|
||||
$CH_CLIENT -q "drop table test;"
|
@ -1,19 +0,0 @@
|
||||
MergeTree wide
|
||||
test
|
||||
Array(Array(Dynamic))
|
||||
Array(Variant(String, UInt64))
|
||||
None
|
||||
String
|
||||
UInt64
|
||||
200000
|
||||
200000
|
||||
200000
|
||||
200000
|
||||
0
|
||||
0
|
||||
200000
|
||||
200000
|
||||
100000
|
||||
100000
|
||||
200000
|
||||
0
|
@ -1,19 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: long
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
|
||||
# shellcheck source=./03036_dynamic_read_subcolumns.lib
|
||||
. "$CUR_DIR"/03036_dynamic_read_subcolumns.lib
|
||||
|
||||
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --use_variant_as_common_type=1 --allow_experimental_dynamic_type=1"
|
||||
|
||||
$CH_CLIENT -q "drop table if exists test;"
|
||||
|
||||
echo "MergeTree wide"
|
||||
$CH_CLIENT -q "create table test (id UInt64, d Dynamic) engine=MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1;"
|
||||
test
|
||||
$CH_CLIENT -q "drop table test;"
|
@ -1,4 +0,0 @@
|
||||
0
|
||||
2
|
||||
0
|
||||
2
|
@ -1,11 +0,0 @@
|
||||
-- Tags: no-parallel
|
||||
|
||||
CREATE DATABASE rdb1 ENGINE = Replicated('/test/test_replication_lag_metric', 'shard1', 'replica1');
|
||||
CREATE DATABASE rdb2 ENGINE = Replicated('/test/test_replication_lag_metric', 'shard1', 'replica2');
|
||||
|
||||
SET distributed_ddl_task_timeout = 0;
|
||||
CREATE TABLE rdb1.t (id UInt32) ENGINE = ReplicatedMergeTree ORDER BY id;
|
||||
SELECT replication_lag FROM system.clusters WHERE cluster IN ('rdb1', 'rdb2') ORDER BY cluster ASC, replica_num ASC;
|
||||
|
||||
DROP DATABASE rdb1;
|
||||
DROP DATABASE rdb2;
|
Loading…
Reference in New Issue
Block a user