Add small script for stupid keeper check

alesapin 2022-04-28 18:13:54 +02:00
parent 1cbdc1ef3a
commit 3107b2bcd6
5 changed files with 229 additions and 3 deletions

@@ -242,6 +242,11 @@ void KeeperServer::launchRaftServer(bool enable_ipv6)
        coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds(), "election_timeout_upper_bound_ms", log);
    params.reserved_log_items_ = getValueOrMaxInt32AndLogWarning(coordination_settings->reserved_log_items, "reserved_log_items", log);
    params.snapshot_distance_ = getValueOrMaxInt32AndLogWarning(coordination_settings->snapshot_distance, "snapshot_distance", log);

    if (params.snapshot_distance_ < 10000)
        LOG_WARNING(log, "Very small snapshot_distance {} specified in coordination settings. "
                    "It doesn't make sense to specify such a small value, because it can lead to degraded performance and other issues.", params.snapshot_distance_);

    params.stale_log_gap_ = getValueOrMaxInt32AndLogWarning(coordination_settings->stale_log_gap, "stale_log_gap", log);
    params.fresh_log_gap_ = getValueOrMaxInt32AndLogWarning(coordination_settings->fresh_log_gap, "fresh_log_gap", log);
    params.client_req_timeout_

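The new check only warns at startup when `snapshot_distance` is below 10000. As a quick illustration (not part of this commit), the same condition can be checked offline against a keeper config file; the XML layout assumed here is the `keeper_server` section used by the template in the new script further below:

#!/usr/bin/env python3
# Illustrative sketch only, not part of the commit: flag a keeper config whose
# snapshot_distance is below the threshold the new LOG_WARNING complains about.
import sys
import xml.etree.ElementTree as ET

def has_small_snapshot_distance(config_path, threshold=10000):
    root = ET.parse(config_path).getroot()  # root element is <clickhouse>
    node = root.find("./keeper_server/coordination_settings/snapshot_distance")
    return node is not None and int(node.text) < threshold

if __name__ == "__main__":
    if has_small_snapshot_distance(sys.argv[1]):
        print("snapshot_distance is suspiciously small")
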
@@ -2,13 +2,16 @@
namespace DB
{
namespace ErrorCodes
{
    extern const int CANNOT_BLOCK_SIGNAL;
}
}

void Runner::thread(std::vector<std::shared_ptr<Coordination::ZooKeeper>> & zookeepers)
void Runner::thread(std::vector<std::shared_ptr<Coordination::ZooKeeper>> zookeepers)
{
    Coordination::ZooKeeperRequestPtr request;
    /// Randomly choosing connection index
@@ -76,6 +79,31 @@ void Runner::thread(std::vector<std::shared_ptr<Coordination::ZooKeeper>> & zookeepers)
            throw;
        }
        std::cerr << DB::getCurrentExceptionMessage(true, true /*check embedded stack trace*/) << std::endl;

        bool got_expired = false;
        for (const auto & connection : zookeepers)
        {
            if (connection->isExpired())
            {
                got_expired = true;
                break;
            }
        }
        if (got_expired)
        {
            while (true)
            {
                try
                {
                    zookeepers = getConnections();
                    break;
                }
                catch (...)
                {
                    std::cerr << DB::getCurrentExceptionMessage(true, true /*check embedded stack trace*/) << std::endl;
                }
            }
        }
    }

    ++requests_executed;
@@ -183,7 +211,9 @@ std::vector<std::shared_ptr<Coordination::ZooKeeper>> Runner::getConnections()
            Poco::Timespan(0, 1000 * 1000),
            Poco::Timespan(0, 10000 * 1000),
            nullptr));
    }
    return zookeepers;
}
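`Runner::thread` now takes the connection vector by value and reassigns it via `getConnections()` once a session expires, so each worker replaces its own copy of the connections instead of the shared one. A rough Python analogue of that retry-until-reconnected pattern (illustration only, not part of the commit; it uses the third-party kazoo client purely as a stand-in):

# Illustration only: the "reconnect after session expiry" pattern added to
# Runner::thread, sketched with the third-party kazoo ZooKeeper client.
from kazoo.client import KazooClient
from kazoo.exceptions import SessionExpiredError

def run_one_request(clients, hosts):
    try:
        clients[0].get("/")  # stand-in for a single benchmark request
    except SessionExpiredError:
        # Keep retrying until a fresh set of sessions is established.
        while True:
            try:
                fresh = [KazooClient(hosts=h) for h in hosts]
                for c in fresh:
                    c.start(timeout=10)
                clients[:] = fresh
                break
            except Exception as err:
                print(err)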

@@ -39,7 +39,7 @@ public:
    {
    }

    void thread(std::vector<std::shared_ptr<Coordination::ZooKeeper>> & zookeepers);
    void thread(std::vector<std::shared_ptr<Coordination::ZooKeeper>> zookeepers);

    void printNumberOfRequestsExecuted(size_t num)
    {

@@ -25,7 +25,7 @@ int main(int argc, char *argv[])
("delay,d", value<double>()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)")
("iterations,i", value<size_t>()->default_value(0), "amount of queries to be executed")
("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
("hosts,h", value<Strings>()->multitoken(), "")
("hosts,h", value<Strings>()->multitoken(), "")
("continue_on_errors", "continue testing even if a query fails")
("reconnect", "establish new connection for every query")
;

@@ -0,0 +1,191 @@
#!/usr/bin/env python3
import os
import subprocess
import signal
import sys
import time
import random
from argparse import ArgumentParser
XML_TEMPLATE = """
<clickhouse>
<logger>
<level>trace</level>
<log>{data_dir}/clickhouse-keeper.log</log>
<errorlog>{data_dir}/clickhouse-keeper.err.log</errorlog>
<stderr>{data_dir}/stderr.log</stderr>
<stdout>{data_dir}/stdout.log</stdout>
<size>1000M</size>
<count>10</count>
<console>0</console>
</logger>
<keeper_server>
<tcp_port>{client_port}</tcp_port>
<server_id>{server_id}</server_id>
<log_storage_path>{data_dir}/log</log_storage_path>
<snapshot_storage_path>{data_dir}/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
<snapshot_distance>1</snapshot_distance>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>localhost</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>localhost</hostname>
<port>9235</port>
</server>
<server>
<id>3</id>
<hostname>localhost</hostname>
<port>9236</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>
"""
class Keeper:
    def __init__(self, keeper_binary_path, server_id, client_port, workdir, with_thread_fuzzer):
        self.keeper_binary_path = keeper_binary_path
        if not os.path.exists(self.keeper_binary_path):
            raise Exception(f"Path for keeper binary doesn't exist: {self.keeper_binary_path}")
        self.server_id = server_id
        self.client_port = client_port
        self.workdir = workdir
        self.data_dir = os.path.join(self.workdir, f"data_server_{self.server_id}")
        self.config_dir = os.path.join(self.workdir, f"config_server_{self.server_id}")
        self.config_path = os.path.join(self.config_dir, "config.xml")
        self.with_thread_fuzzer = with_thread_fuzzer
        self.process = None

    def _create_dirs(self):
        if not os.path.exists(self.workdir):
            os.makedirs(self.workdir)
        for p in [self.data_dir, self.config_dir]:
            if not os.path.exists(p):
                os.mkdir(p)

    def _prepare_and_write_config(self):
        if not os.path.exists(self.config_path):
            config = XML_TEMPLATE.format(
                client_port=self.client_port,
                server_id=self.server_id,
                data_dir=self.data_dir,
            )
            with open(self.config_path, 'w', encoding='utf-8') as f:
                f.write(config)

    def prepare(self):
        self._create_dirs()
        self._prepare_and_write_config()

    def start(self):
        env = os.environ.copy()
        if self.with_thread_fuzzer:
            env['THREAD_FUZZER_CPU_TIME_PERIOD_US'] = '1000'
            env['THREAD_FUZZER_SLEEP_PROBABILITY'] = '0.1'
            env['THREAD_FUZZER_SLEEP_TIME_US'] = '100000'
            env['THREAD_FUZZER_pthread_mutex_lock_BEFORE_MIGRATE_PROBABILITY'] = '1'
            env['THREAD_FUZZER_pthread_mutex_lock_AFTER_MIGRATE_PROBABILITY'] = '1'
            env['THREAD_FUZZER_pthread_mutex_unlock_BEFORE_MIGRATE_PROBABILITY'] = '1'
            env['THREAD_FUZZER_pthread_mutex_unlock_AFTER_MIGRATE_PROBABILITY'] = '1'
            env['THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_PROBABILITY'] = '0.001'
            env['THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_PROBABILITY'] = '0.001'
            env['THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_PROBABILITY'] = '0.001'
            env['THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_PROBABILITY'] = '0.001'
            env['THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_TIME_US'] = '10000'
            env['THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_TIME_US'] = '10000'
            env['THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_TIME_US'] = '10000'
            env['THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_TIME_US'] = '10000'

        self.process = subprocess.Popen([
            self.keeper_binary_path, '--config', self.config_path,
            '--log-file', f'{self.data_dir}/clickhouse-keeper.log',
            '--errorlog-file', f'{self.data_dir}/clickhouse-keeper.err.log'], env=env)

    def restart(self):
        print("Restarting keeper", self.server_id)
        self.stop()
        self.start()

    def stop(self):
        if self.process:
            self.process.kill()
            self.process = None
class KeeperBench:
    def __init__(self, bench_binary_path, client_ports):
        self.bench_binary_path = bench_binary_path
        self.client_ports = client_ports
        self.process = None

    def start(self):
        hosts = ' '.join([f'--hosts localhost:{port}' for port in self.client_ports])
        cmd = f"{self.bench_binary_path} {hosts} --continue_on_errors -c 32 --generator create_small_data"
        self.process = subprocess.Popen(cmd, shell=True)

    def stop(self):
        if self.process:
            self.process.kill()
            self.process = None
def main(args):
    PORTS = [9181, 9182, 9183]
    SERVER_IDS = [1, 2, 3]

    workdir = args.workdir
    keeper_binary_path = args.keeper_binary_path
    keeper_bench_path = args.keeper_bench_path

    keepers = []
    for (port, server_id) in zip(PORTS, SERVER_IDS):
        keepers.append(Keeper(keeper_binary_path, server_id, port, workdir, args.with_thread_fuzzer))

    bench = KeeperBench(keeper_bench_path, PORTS)

    for keeper in keepers:
        keeper.prepare()
        keeper.start()

    time.sleep(10)

    bench.start()

    def signal_handler(sig, frame):
        print('You pressed Ctrl+C!')
        for keeper in keepers:
            keeper.stop()
        bench.stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    for _ in range(100):
        time.sleep(30)
        sacrifice = random.choice(keepers)
        sacrifice.restart()
        time.sleep(60)
if __name__ == "__main__":
    parser = ArgumentParser(description='Simple tool to load a three-node Keeper cluster')
    parser.add_argument('--workdir', default='./keeper-overload-workdir', help='Path to workdir')
    parser.add_argument('--keeper-binary-path', required=True, help='Path to keeper binary')
    parser.add_argument('--keeper-bench-path', required=True, help='Path to keeper bench utility')
    parser.add_argument('--with-thread-fuzzer', action='store_true', help='Run keepers with the thread fuzzer enabled')
    args = parser.parse_args()

    main(args)
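The script only keeps restarting random nodes while keeper-bench hammers the cluster; it never verifies that the ensemble actually recovers between restarts. If such a probe is wanted, ClickHouse Keeper answers the ZooKeeper four-letter-word commands on the client port (provided `ruok` is allowed by `four_letter_word_white_list`, which it is in the default whitelist), so a minimal health check could look like the sketch below. This is an illustration only and not part of the commit.

# Illustration only: poll each keeper's client port with the 'ruok' four-letter command.
# A healthy ClickHouse Keeper replies with 'imok'.
import socket

def is_alive(host, port, timeout=5):
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"ruok")
            return sock.recv(4) == b"imok"
    except OSError:
        return False

if __name__ == "__main__":
    for port in (9181, 9182, 9183):
        print(port, "alive" if is_alive("localhost", port) else "dead")

For reference, the new script (its file name is not shown in this view) is driven entirely by the arguments above, e.g. `--keeper-binary-path ./clickhouse-keeper --keeper-bench-path ./keeper-bench --with-thread-fuzzer`, where both binary paths are placeholders for wherever the locally built binaries live.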