Merge pull request #47256 from ClickHouse/keeper-4lw-clean-resources

Add Keeper 4LW for cleaning resources
This commit is contained in:
Alexey Milovidov 2023-03-07 04:23:15 +03:00 committed by GitHub
commit 7d48eae0f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 92 additions and 3 deletions

View File

@ -128,6 +128,7 @@ if (BUILD_STANDALONE_KEEPER)
ch_contrib::lz4
ch_contrib::zstd
ch_contrib::cityhash
ch_contrib::jemalloc
common ch_contrib::double_conversion
ch_contrib::dragonbox_to_chars
pcg_random

View File

@ -36,7 +36,7 @@ void CoordinationSettings::loadFromConfig(const String & config_elem, const Poco
}
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc";
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs";
KeeperConfigurationAndSettings::KeeperConfigurationAndSettings()
: server_id(NOT_EXIST)

View File

@ -148,6 +148,9 @@ void FourLetterCommandFactory::registerCommands(KeeperDispatcher & keeper_dispat
FourLetterCommandPtr recalculate_command = std::make_shared<RecalculateCommand>(keeper_dispatcher);
factory.registerCommand(recalculate_command);
FourLetterCommandPtr clean_resources_command = std::make_shared<CleanResourcesCommand>(keeper_dispatcher);
factory.registerCommand(clean_resources_command);
factory.initializeAllowList(keeper_dispatcher);
factory.setInitialize(true);
}
@ -524,4 +527,10 @@ String RecalculateCommand::run()
return "ok";
}
String CleanResourcesCommand::run()
{
keeper_dispatcher.cleanResources();
return "ok";
}
}

View File

@ -377,7 +377,6 @@ struct RequestLeaderCommand : public IFourLetterCommand
~RequestLeaderCommand() override = default;
};
/// Request to be leader.
struct RecalculateCommand : public IFourLetterCommand
{
explicit RecalculateCommand(KeeperDispatcher & keeper_dispatcher_)
@ -390,4 +389,16 @@ struct RecalculateCommand : public IFourLetterCommand
~RecalculateCommand() override = default;
};
struct CleanResourcesCommand : public IFourLetterCommand
{
explicit CleanResourcesCommand(KeeperDispatcher & keeper_dispatcher_)
: IFourLetterCommand(keeper_dispatcher_)
{
}
String name() override { return "clrs"; }
String run() override;
~CleanResourcesCommand() override = default;
};
}

View File

@ -9,7 +9,7 @@
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/checkStackSize.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <future>
#include <chrono>
@ -17,12 +17,26 @@
#include <iterator>
#include <limits>
#if USE_JEMALLOC
# include <jemalloc/jemalloc.h>
#define STRINGIFY_HELPER(x) #x
#define STRINGIFY(x) STRINGIFY_HELPER(x)
#endif
namespace CurrentMetrics
{
extern const Metric KeeperAliveConnections;
extern const Metric KeeperOutstandingRequets;
}
namespace ProfileEvents
{
extern const Event MemoryAllocatorPurge;
extern const Event MemoryAllocatorPurgeTimeMicroseconds;
}
namespace fs = std::filesystem;
namespace DB
@ -753,4 +767,15 @@ Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const
return result;
}
void KeeperDispatcher::cleanResources()
{
#if USE_JEMALLOC
LOG_TRACE(&Poco::Logger::get("KeeperDispatcher"), "Purging unused memory");
Stopwatch watch;
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurge);
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, watch.elapsedMicroseconds());
#endif
}
}

View File

@ -230,6 +230,8 @@ public:
{
return server->recalculateStorageStats();
}
static void cleanResources();
};
}

View File

@ -679,3 +679,44 @@ def test_cmd_rqld(started_cluster):
+ " does not become leader after 30s, maybe there is something wrong."
)
assert keeper_utils.is_leader(cluster, node)
def test_cmd_clrs(started_cluster):
if node1.is_built_with_sanitizer():
return
def get_memory_purges():
return node1.query(
"SELECT value FROM system.events WHERE event = 'MemoryAllocatorPurge' SETTINGS system_events_show_zero_values = 1"
)
zk = None
try:
wait_nodes()
zk = get_fake_zk(node1.name, timeout=30.0)
paths = [f"/clrs_{i}" for i in range(10000)]
# we only count the events because we cannot reliably test memory usage of Keeper
# but let's create and delete nodes so the first purge needs to release some memory
create_transaction = zk.transaction()
for path in paths:
create_transaction.create(path)
create_transaction.commit()
delete_transaction = zk.transaction()
for path in paths:
delete_transaction.delete(path)
delete_transaction.commit()
# repeat multiple times to make sure MemoryAllocatorPurge isn't increased because of other reasons
for _ in range(5):
prev_purges = int(get_memory_purges())
keeper_utils.send_4lw_cmd(cluster, node1, cmd="clrs")
current_purges = int(get_memory_purges())
assert current_purges > prev_purges
prev_purges = current_purges
finally:
destroy_zk_client(zk)