mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge pull request #62652 from ianton-ru/fix-host-resolver-fail
Fix HostResolver behavior on fail
This commit is contained in:
commit
38787c429f
@ -288,6 +288,7 @@
|
||||
M(HTTPConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for http hosts") \
|
||||
\
|
||||
M(AddressesActive, "Total count of addresses which are used for creation connections with connection pools") \
|
||||
M(AddressesBanned, "Total count of addresses which are banned as faulty for creation connections with connection pools") \
|
||||
|
||||
|
||||
#ifdef APPLY_FOR_EXTERNAL_METRICS
|
||||
|
@ -8,6 +8,8 @@
|
||||
#include <Common/MemoryTrackerSwitcher.h>
|
||||
|
||||
#include <mutex>
|
||||
#include <algorithm>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
@ -19,6 +21,7 @@ namespace ProfileEvents
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric AddressesActive;
|
||||
extern const Metric AddressesBanned;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
@ -36,6 +39,7 @@ HostResolverMetrics HostResolver::getMetrics()
|
||||
.expired = ProfileEvents::AddressesExpired,
|
||||
.failed = ProfileEvents::AddressesMarkedAsFailed,
|
||||
.active_count = CurrentMetrics::AddressesActive,
|
||||
.banned_count = CurrentMetrics::AddressesBanned,
|
||||
};
|
||||
}
|
||||
|
||||
@ -47,7 +51,7 @@ HostResolver::WeakPtr HostResolver::getWeakFromThis()
|
||||
HostResolver::HostResolver(String host_, Poco::Timespan history_)
|
||||
: host(std::move(host_))
|
||||
, history(history_)
|
||||
, resolve_function([](const String & host_to_resolve) { return DNSResolver::instance().resolveHostAll(host_to_resolve); })
|
||||
, resolve_function([](const String & host_to_resolve) { return DNSResolver::instance().resolveHostAllInOriginOrder(host_to_resolve); })
|
||||
{
|
||||
update();
|
||||
}
|
||||
@ -62,6 +66,12 @@ HostResolver::HostResolver(
|
||||
HostResolver::~HostResolver()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
auto banned_count = 0;
|
||||
for (const auto & rec: records)
|
||||
banned_count += rec.failed;
|
||||
CurrentMetrics::sub(metrics.banned_count, banned_count);
|
||||
|
||||
CurrentMetrics::sub(metrics.active_count, records.size());
|
||||
records.clear();
|
||||
}
|
||||
@ -113,6 +123,7 @@ void HostResolver::updateWeights()
|
||||
|
||||
if (getTotalWeight() == 0 && !records.empty())
|
||||
{
|
||||
CurrentMetrics::sub(metrics.banned_count, records.size());
|
||||
for (auto & rec : records)
|
||||
rec.failed = false;
|
||||
|
||||
@ -140,7 +151,7 @@ void HostResolver::setSuccess(const Poco::Net::IPAddress & address)
|
||||
return;
|
||||
|
||||
auto old_weight = it->getWeight();
|
||||
++it->usage;
|
||||
it->setSuccess();
|
||||
auto new_weight = it->getWeight();
|
||||
|
||||
if (old_weight != new_weight)
|
||||
@ -158,8 +169,8 @@ void HostResolver::setFail(const Poco::Net::IPAddress & address)
|
||||
if (it == records.end())
|
||||
return;
|
||||
|
||||
it->failed = true;
|
||||
it->fail_time = now;
|
||||
if (it->setFail(now))
|
||||
CurrentMetrics::add(metrics.banned_count);
|
||||
}
|
||||
|
||||
ProfileEvents::increment(metrics.failed);
|
||||
@ -216,14 +227,20 @@ void HostResolver::updateImpl(Poco::Timestamp now, std::vector<Poco::Net::IPAddr
|
||||
{
|
||||
CurrentMetrics::sub(metrics.active_count, 1);
|
||||
ProfileEvents::increment(metrics.expired, 1);
|
||||
if (it_before->failed)
|
||||
CurrentMetrics::sub(metrics.banned_count);
|
||||
}
|
||||
++it_before;
|
||||
}
|
||||
else if (it_before == records.end() || (it_next != next_gen.end() && *it_next < it_before->address))
|
||||
{
|
||||
CurrentMetrics::add(metrics.active_count, 1);
|
||||
ProfileEvents::increment(metrics.discovered, 1);
|
||||
merged.push_back(Record(*it_next, now));
|
||||
/// there are could be duplicates in next_gen vector
|
||||
if (merged.empty() || merged.back().address != *it_next)
|
||||
{
|
||||
CurrentMetrics::add(metrics.active_count, 1);
|
||||
ProfileEvents::increment(metrics.discovered, 1);
|
||||
merged.push_back(Record(*it_next, now));
|
||||
}
|
||||
++it_next;
|
||||
}
|
||||
else
|
||||
@ -237,10 +254,22 @@ void HostResolver::updateImpl(Poco::Timestamp now, std::vector<Poco::Net::IPAddr
|
||||
}
|
||||
|
||||
for (auto & rec : merged)
|
||||
if (rec.failed && rec.fail_time < last_effective_resolve)
|
||||
rec.failed = false;
|
||||
{
|
||||
if (!rec.failed)
|
||||
continue;
|
||||
|
||||
/// Exponential increased time for each consecutive fail
|
||||
auto banned_until = now - Poco::Timespan(history.totalMicroseconds() * (1ull << (rec.consecutive_fail_count - 1)));
|
||||
if (rec.fail_time < banned_until)
|
||||
{
|
||||
rec.failed = false;
|
||||
CurrentMetrics::sub(metrics.banned_count);
|
||||
}
|
||||
}
|
||||
|
||||
chassert(std::is_sorted(merged.begin(), merged.end()));
|
||||
// check that merged contains unuque elements
|
||||
chassert(std::adjacent_find(merged.begin(), merged.end()) == merged.end());
|
||||
|
||||
last_resolve_time = now;
|
||||
records.swap(merged);
|
||||
@ -251,6 +280,7 @@ void HostResolver::updateImpl(Poco::Timestamp now, std::vector<Poco::Net::IPAddr
|
||||
updateWeights();
|
||||
}
|
||||
|
||||
|
||||
size_t HostResolver::getTotalWeight() const
|
||||
{
|
||||
if (records.empty())
|
||||
|
@ -39,9 +39,11 @@ struct HostResolverMetrics
|
||||
const ProfileEvents::Event failed = ProfileEvents::end();
|
||||
|
||||
const CurrentMetrics::Metric active_count = CurrentMetrics::end();
|
||||
const CurrentMetrics::Metric banned_count = CurrentMetrics::end();
|
||||
};
|
||||
|
||||
constexpr size_t DEFAULT_RESOLVE_TIME_HISTORY_SECONDS = 2*60;
|
||||
constexpr size_t RECORD_CONSECTIVE_FAIL_COUNT_LIMIT = 6;
|
||||
|
||||
|
||||
class HostResolver : public std::enable_shared_from_this<HostResolver>
|
||||
@ -141,6 +143,7 @@ protected:
|
||||
size_t usage = 0;
|
||||
bool failed = false;
|
||||
Poco::Timestamp fail_time = 0;
|
||||
size_t consecutive_fail_count = 0;
|
||||
|
||||
size_t weight_prefix_sum;
|
||||
|
||||
@ -149,6 +152,11 @@ protected:
|
||||
return address < r.address;
|
||||
}
|
||||
|
||||
bool operator ==(const Record & r) const
|
||||
{
|
||||
return address == r.address;
|
||||
}
|
||||
|
||||
size_t getWeight() const
|
||||
{
|
||||
if (failed)
|
||||
@ -166,6 +174,28 @@ protected:
|
||||
return 8;
|
||||
return 10;
|
||||
}
|
||||
|
||||
bool setFail(const Poco::Timestamp & now)
|
||||
{
|
||||
bool was_ok = !failed;
|
||||
|
||||
failed = true;
|
||||
fail_time = now;
|
||||
|
||||
if (was_ok)
|
||||
{
|
||||
if (consecutive_fail_count < RECORD_CONSECTIVE_FAIL_COUNT_LIMIT)
|
||||
++consecutive_fail_count;
|
||||
}
|
||||
|
||||
return was_ok;
|
||||
}
|
||||
|
||||
void setSuccess()
|
||||
{
|
||||
consecutive_fail_count = 0;
|
||||
++usage;
|
||||
}
|
||||
};
|
||||
|
||||
using Records = std::vector<Record>;
|
||||
@ -178,6 +208,7 @@ protected:
|
||||
void updateWeights() TSA_REQUIRES(mutex);
|
||||
void updateWeightsImpl() TSA_REQUIRES(mutex);
|
||||
size_t getTotalWeight() const TSA_REQUIRES(mutex);
|
||||
Poco::Timespan getRecordHistoryTime(const Record&) const;
|
||||
|
||||
const String host;
|
||||
const Poco::Timespan history;
|
||||
@ -188,7 +219,7 @@ protected:
|
||||
|
||||
std::mutex mutex;
|
||||
|
||||
Poco::Timestamp last_resolve_time TSA_GUARDED_BY(mutex);
|
||||
Poco::Timestamp last_resolve_time TSA_GUARDED_BY(mutex) = Poco::Timestamp::TIMEVAL_MIN;
|
||||
Records records TSA_GUARDED_BY(mutex);
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("ConnectionPool");
|
||||
|
@ -2,7 +2,9 @@
|
||||
#include <base/sleep.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/HostResolvePool.h>
|
||||
#include "base/defines.h"
|
||||
|
||||
#include <optional>
|
||||
#include <thread>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
@ -29,8 +31,9 @@ protected:
|
||||
DB::CurrentThread::getProfileEvents().reset();
|
||||
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count));
|
||||
|
||||
addresses = std::set<String>{"127.0.0.1", "127.0.0.2", "127.0.0.3"};
|
||||
addresses = std::multiset<String>{"127.0.0.1", "127.0.0.2", "127.0.0.3"};
|
||||
// Code here will be called immediately after the constructor (right
|
||||
// before each test).
|
||||
}
|
||||
@ -58,7 +61,7 @@ protected:
|
||||
}
|
||||
|
||||
DB::HostResolverMetrics metrics = DB::HostResolver::getMetrics();
|
||||
std::set<String> addresses;
|
||||
std::multiset<String> addresses;
|
||||
};
|
||||
|
||||
TEST_F(ResolvePoolTest, CanResolve)
|
||||
@ -160,7 +163,7 @@ TEST_F(ResolvePoolTest, CanMerge)
|
||||
ASSERT_EQ(addresses.size(), DB::CurrentThread::getProfileEvents()[metrics.discovered]);
|
||||
|
||||
auto old_addresses = addresses;
|
||||
addresses = std::set<String>{"127.0.0.4", "127.0.0.5"};
|
||||
addresses = std::multiset<String>{"127.0.0.4", "127.0.0.5"};
|
||||
|
||||
|
||||
resolver->update();
|
||||
@ -229,6 +232,7 @@ TEST_F(ResolvePoolTest, CanFail)
|
||||
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.failed]);
|
||||
ASSERT_EQ(addresses.size(), CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
|
||||
ASSERT_EQ(addresses.size(), DB::CurrentThread::getProfileEvents()[metrics.discovered]);
|
||||
|
||||
for (size_t i = 0; i < 1000; ++i)
|
||||
@ -243,15 +247,20 @@ TEST_F(ResolvePoolTest, CanFail)
|
||||
TEST_F(ResolvePoolTest, CanFailAndHeal)
|
||||
{
|
||||
auto resolver = make_resolver();
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count));
|
||||
|
||||
auto failed_addr = resolver->resolve();
|
||||
failed_addr.setFail();
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
|
||||
|
||||
while (true)
|
||||
{
|
||||
auto next_addr = resolver->resolve();
|
||||
if (*failed_addr == *next_addr)
|
||||
{
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -277,3 +286,123 @@ TEST_F(ResolvePoolTest, CanExpire)
|
||||
ASSERT_EQ(addresses.size() + 1, DB::CurrentThread::getProfileEvents()[metrics.discovered]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
}
|
||||
|
||||
|
||||
TEST_F(ResolvePoolTest, DuplicatesInAddresses)
|
||||
{
|
||||
auto resolver = make_resolver();
|
||||
|
||||
size_t unuque_addresses = addresses.size();
|
||||
|
||||
ASSERT_EQ(3, unuque_addresses);
|
||||
ASSERT_EQ(3, DB::CurrentThread::getProfileEvents()[metrics.discovered]);
|
||||
|
||||
ASSERT_TRUE(!addresses.empty());
|
||||
addresses.insert(*addresses.begin());
|
||||
addresses.insert(*addresses.begin());
|
||||
|
||||
size_t total_addresses = addresses.size();
|
||||
|
||||
ASSERT_EQ(addresses.count(*addresses.begin()), 3);
|
||||
ASSERT_EQ(unuque_addresses + 2, total_addresses);
|
||||
|
||||
resolver->update();
|
||||
ASSERT_EQ(3, DB::CurrentThread::getProfileEvents()[metrics.discovered]);
|
||||
}
|
||||
|
||||
void check_no_failed_address(size_t iteration, auto & resolver, auto & addresses, auto & failed_addr, auto & metrics)
|
||||
{
|
||||
ASSERT_EQ(iteration, DB::CurrentThread::getProfileEvents()[metrics.failed]);
|
||||
for (size_t i = 0; i < 100; ++i)
|
||||
{
|
||||
auto next_addr = resolver->resolve();
|
||||
ASSERT_TRUE(addresses.contains(*next_addr));
|
||||
ASSERT_NE(*next_addr, *failed_addr);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ResolvePoolTest, BannedForConsiquenceFail)
|
||||
{
|
||||
size_t history_ms = 5;
|
||||
auto resolver = make_resolver(history_ms);
|
||||
|
||||
|
||||
auto failed_addr = resolver->resolve();
|
||||
ASSERT_TRUE(addresses.contains(*failed_addr));
|
||||
|
||||
failed_addr.setFail();
|
||||
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
|
||||
check_no_failed_address(1, resolver, addresses, failed_addr, metrics);
|
||||
|
||||
sleepForMilliseconds(history_ms + 1);
|
||||
resolver->update();
|
||||
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count));
|
||||
|
||||
failed_addr.setFail();
|
||||
check_no_failed_address(2, resolver, addresses, failed_addr, metrics);
|
||||
|
||||
sleepForMilliseconds(history_ms + 1);
|
||||
resolver->update();
|
||||
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
|
||||
|
||||
// ip still banned adter history_ms + update, because it was his second consiquent fail
|
||||
check_no_failed_address(2, resolver, addresses, failed_addr, metrics);
|
||||
}
|
||||
|
||||
TEST_F(ResolvePoolTest, NoAditionalBannForConcurrentFail)
|
||||
{
|
||||
size_t history_ms = 5;
|
||||
auto resolver = make_resolver(history_ms);
|
||||
|
||||
auto failed_addr = resolver->resolve();
|
||||
ASSERT_TRUE(addresses.contains(*failed_addr));
|
||||
|
||||
failed_addr.setFail();
|
||||
failed_addr.setFail();
|
||||
failed_addr.setFail();
|
||||
|
||||
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
|
||||
check_no_failed_address(3, resolver, addresses, failed_addr, metrics);
|
||||
|
||||
sleepForMilliseconds(history_ms + 1);
|
||||
resolver->update();
|
||||
// ip is cleared after just 1 history_ms interval.
|
||||
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count));
|
||||
}
|
||||
|
||||
TEST_F(ResolvePoolTest, StillBannedAfterSuccess)
|
||||
{
|
||||
size_t history_ms = 5;
|
||||
auto resolver = make_resolver(history_ms);
|
||||
|
||||
auto failed_addr = resolver->resolve();
|
||||
ASSERT_TRUE(addresses.contains(*failed_addr));
|
||||
|
||||
std::optional<decltype(resolver->resolve())> again_addr;
|
||||
while (true)
|
||||
{
|
||||
auto addr = resolver->resolve();
|
||||
if (*addr == *failed_addr)
|
||||
{
|
||||
again_addr.emplace(std::move(addr));
|
||||
break;
|
||||
}
|
||||
}
|
||||
chassert(again_addr);
|
||||
|
||||
failed_addr.setFail();
|
||||
|
||||
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
|
||||
check_no_failed_address(1, resolver, addresses, failed_addr, metrics);
|
||||
|
||||
again_addr = std::nullopt; // success;
|
||||
|
||||
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
|
||||
}
|
||||
|
@ -0,0 +1,12 @@
|
||||
<clickhouse>
|
||||
<profiles>
|
||||
<default>
|
||||
<connect_timeout>5</connect_timeout>
|
||||
<receive_timeout>5</receive_timeout>
|
||||
<send_timeout>5</send_timeout>
|
||||
<http_connection_timeout>5</http_connection_timeout>
|
||||
<http_send_timeout>5</http_send_timeout>
|
||||
<http_receive_timeout>5</http_receive_timeout>
|
||||
</default>
|
||||
</profiles>
|
||||
</clickhouse>
|
@ -0,0 +1,21 @@
|
||||
<clickhouse>
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<s3>
|
||||
<type>s3</type>
|
||||
<endpoint>http://minio1:9001/root/data/</endpoint>
|
||||
<access_key_id>minio</access_key_id>
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
</s3>
|
||||
</disks>
|
||||
<policies>
|
||||
<s3>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>s3</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</s3>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
</clickhouse>
|
126
tests/integration/test_host_resolver_fail_count/test_case.py
Normal file
126
tests/integration/test_host_resolver_fail_count/test_case.py
Normal file
@ -0,0 +1,126 @@
|
||||
"""Test Interserver responses on configured IP."""
|
||||
|
||||
import pytest
|
||||
import time
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node = cluster.add_instance(
|
||||
"node",
|
||||
main_configs=["configs/config.d/cluster.xml", "configs/config.d/s3.xml"],
|
||||
with_minio=True,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def start_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
# The same value as in ClickHouse, this can't be confugured via config now
|
||||
DEFAULT_RESOLVE_TIME_HISTORY_SECONDS = 2 * 60
|
||||
|
||||
|
||||
def test_host_resolver(start_cluster):
|
||||
minio_ip = cluster.get_instance_ip("minio1")
|
||||
|
||||
# drop DNS cache
|
||||
node.set_hosts(
|
||||
[
|
||||
(minio_ip, "minio1"),
|
||||
(node.ip_address, "minio1"), # no answer on 9001 port on this IP
|
||||
]
|
||||
)
|
||||
|
||||
node.query("SYSTEM DROP DNS CACHE")
|
||||
node.query("SYSTEM DROP CONNECTIONS CACHE")
|
||||
|
||||
node.query(
|
||||
"""
|
||||
CREATE TABLE test (key UInt32, value UInt32)
|
||||
Engine=MergeTree()
|
||||
ORDER BY key PARTITION BY key
|
||||
SETTINGS storage_policy='s3'
|
||||
"""
|
||||
)
|
||||
|
||||
initial_fails = "0\n"
|
||||
k = 0
|
||||
limit = 100
|
||||
while initial_fails == "0\n":
|
||||
node.query(
|
||||
f"""
|
||||
INSERT INTO test VALUES (0,{k})
|
||||
"""
|
||||
)
|
||||
# HostResolver chooses IP randomly, so on single call can choose worked ID
|
||||
initial_fails = node.query(
|
||||
"SELECT value FROM system.events WHERE event LIKE 'AddressesMarkedAsFailed'"
|
||||
)
|
||||
k += 1
|
||||
if k >= limit:
|
||||
# Dead IP was not choosen for 100 iteration.
|
||||
# This is not expected, but not an error actually.
|
||||
# And test should be stopped.
|
||||
return
|
||||
|
||||
# initial_fails can be more than 1 if clickhouse does something in several parallel threads
|
||||
|
||||
for j in range(10):
|
||||
for i in range(10):
|
||||
node.query(
|
||||
f"""
|
||||
INSERT INTO test VALUES ({i+1},{j+1})
|
||||
"""
|
||||
)
|
||||
fails = node.query(
|
||||
"SELECT value FROM system.events WHERE event LIKE 'AddressesMarkedAsFailed'"
|
||||
)
|
||||
assert fails == initial_fails
|
||||
|
||||
# Check that clickhouse tries to recheck IP after 2 minutes
|
||||
time.sleep(DEFAULT_RESOLVE_TIME_HISTORY_SECONDS)
|
||||
|
||||
intermediate_fails = initial_fails
|
||||
limit = k + 100
|
||||
while intermediate_fails == initial_fails:
|
||||
node.query(
|
||||
f"""
|
||||
INSERT INTO test VALUES (101,{k})
|
||||
"""
|
||||
)
|
||||
intermediate_fails = node.query(
|
||||
"SELECT value FROM system.events WHERE event LIKE 'AddressesMarkedAsFailed'"
|
||||
)
|
||||
k += 1
|
||||
if k >= limit:
|
||||
# Dead IP was not choosen for 100 iteration.
|
||||
# This is not expected, but not an error actually.
|
||||
# And test should be stopped.
|
||||
return
|
||||
|
||||
# After another 2 minutes shoudl not be new fails, next retry after 4 minutes
|
||||
time.sleep(DEFAULT_RESOLVE_TIME_HISTORY_SECONDS)
|
||||
|
||||
initial_fails = intermediate_fails
|
||||
limit = k + 100
|
||||
while intermediate_fails == initial_fails:
|
||||
node.query(
|
||||
f"""
|
||||
INSERT INTO test VALUES (102,{k})
|
||||
"""
|
||||
)
|
||||
intermediate_fails = node.query(
|
||||
"SELECT value FROM system.events WHERE event LIKE 'AddressesMarkedAsFailed'"
|
||||
)
|
||||
k += 1
|
||||
if k >= limit:
|
||||
break
|
||||
|
||||
assert k == limit
|
Loading…
Reference in New Issue
Block a user