code style change

This commit is contained in:
JackyWoo 2021-11-12 20:48:42 +08:00
parent 1cbe9f6024
commit a2f3337ca1
13 changed files with 158 additions and 161 deletions

View File

@ -2,9 +2,9 @@
#include <Common/ShellCommand.h>
#include <IO/WriteBufferFromString.h>
#include <IO/copyData.h>
#include <string>
#include <cstdio>
#include <unistd.h>
#include <fmt/format.h>
#include <IO/ReadHelpers.h>
int getCurrentProcessFDCount()
@ -12,17 +12,39 @@ int getCurrentProcessFDCount()
#if defined(__linux__) || defined(__APPLE__)
using namespace DB;
char buf[64];
snprintf(buf, 64, "lsof -p %i | wc -l", getpid());
Int32 pid = getpid();
std::unique_ptr<ShellCommand> command;
auto command = ShellCommand::execute(buf);
/// First try procfs
String by_procfs = fmt::format("ls /proc/{}/fd | wc -l", pid);
command = ShellCommand::execute(by_procfs);
try
{
command->wait();
}
catch (...)
{
/// Then try lsof command
String by_lsof = fmt::format("lsof -p {} | wc -l", pid);
command = ShellCommand::execute(by_procfs);
try
{
command->wait();
}
catch (...)
{
return -1;
}
}
WriteBufferFromOwnString out;
copyData(command->out, out);
if (!out.str().empty())
{
return std::stoi(out.str());
return parse<Int32>(out.str());
}
return -1;

View File

@ -1,5 +1,5 @@
#pragma once
/// Get current process file descriptor count
/// @return -1 if error occurs
/// @return -1 os doesn't support "lsof" command or some error occurs.
int getCurrentProcessFDCount();

View File

@ -1,27 +1,34 @@
#include <Common/getMaxFileDescriptorCount.h>
#include <Common/ShellCommand.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/copyData.h>
#include <string>
#include <Common/ShellCommand.h>
#include <Common/getMaxFileDescriptorCount.h>
int getMaxFileDescriptorCount()
{
#if defined(__linux__) || defined(__APPLE__)
#if defined(__linux__) || defined(__APPLE__)
using namespace DB;
auto command = ShellCommand::execute("ulimit -n");
try
{
command->wait();
}
catch (...)
{
return -1;
}
WriteBufferFromOwnString out;
copyData(command->out, out);
if (!out.str().empty())
{
return std::stoi(out.str());
return parse<Int32>(out.str());
}
return -1;
#else
return -1;
#endif
}

View File

@ -1,6 +1,6 @@
#pragma once
/// Get process max file descriptor count
/// @return -1 if error occurs
/// @return -1 if os does not support ulimit command or some error occurs
int getMaxFileDescriptorCount();

View File

@ -3,6 +3,8 @@
#include <base/logger_useful.h>
#include <filesystem>
#include <Coordination/Defines.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteIntText.h>
namespace DB
{
@ -49,103 +51,100 @@ KeeperSettings::KeeperSettings()
void KeeperSettings::dump(WriteBufferFromOwnString & buf) const
{
auto write = [&buf](const String & content) { buf.write(content.data(), content.size()); };
auto write_int = [&buf](Int64 value)
{
String str_val = std::to_string(value);
buf.write(str_val.data(), str_val.size());
writeIntText(value, buf);
buf.write('\n');
};
auto write_bool = [&buf](bool value)
{
String str_val = value ? "true" : "false";
buf.write(str_val.data(), str_val.size());
writeText(str_val, buf);
buf.write('\n');
};
write("server_id=");
writeText("server_id=", buf);
write_int(server_id);
if (tcp_port != NOT_EXIST)
{
write("tcp_port=");
writeText("tcp_port=", buf);
write_int(tcp_port);
}
if (tcp_port_secure != NOT_EXIST)
{
write("tcp_port_secure=");
writeText("tcp_port_secure=", buf);
write_int(tcp_port_secure);
}
write("four_letter_word_white_list=");
write(four_letter_word_white_list);
writeText("four_letter_word_white_list=", buf);
writeText(four_letter_word_white_list, buf);
buf.write('\n');
write("log_storage_path=");
write(log_storage_path);
writeText("log_storage_path=", buf);
writeText(log_storage_path, buf);
buf.write('\n');
write("snapshot_storage_path=");
write(snapshot_storage_path);
writeText("snapshot_storage_path=", buf);
writeText(snapshot_storage_path, buf);
buf.write('\n');
/// coordination_settings
write("max_requests_batch_size=");
writeText("max_requests_batch_size=", buf);
write_int(coordination_settings->max_requests_batch_size);
write("session_timeout_ms=");
writeText("session_timeout_ms=", buf);
write_int(UInt64(coordination_settings->session_timeout_ms));
write("operation_timeout_ms=");
writeText("operation_timeout_ms=", buf);
write_int(UInt64(coordination_settings->operation_timeout_ms));
write("dead_session_check_period_ms=");
writeText("dead_session_check_period_ms=", buf);
write_int(UInt64(coordination_settings->dead_session_check_period_ms));
write("heart_beat_interval_ms=");
writeText("heart_beat_interval_ms=", buf);
write_int(UInt64(coordination_settings->heart_beat_interval_ms));
write("election_timeout_lower_bound_ms=");
writeText("election_timeout_lower_bound_ms=", buf);
write_int(UInt64(coordination_settings->election_timeout_lower_bound_ms));
write("election_timeout_upper_bound_ms=");
writeText("election_timeout_upper_bound_ms=", buf);
write_int(UInt64(coordination_settings->election_timeout_upper_bound_ms));
write("reserved_log_items=");
writeText("reserved_log_items=", buf);
write_int(coordination_settings->reserved_log_items);
write("snapshot_distance=");
writeText("snapshot_distance=", buf);
write_int(coordination_settings->snapshot_distance);
write("auto_forwarding=");
writeText("auto_forwarding=", buf);
write_bool(coordination_settings->auto_forwarding);
write("shutdown_timeout=");
writeText("shutdown_timeout=", buf);
write_int(UInt64(coordination_settings->shutdown_timeout));
write("startup_timeout=");
writeText("startup_timeout=", buf);
write_int(UInt64(coordination_settings->startup_timeout));
write("raft_logs_level=");
write(coordination_settings->raft_logs_level.toString());
writeText("raft_logs_level=", buf);
writeText(coordination_settings->raft_logs_level.toString(), buf);
buf.write('\n');
write("snapshots_to_keep=");
writeText("snapshots_to_keep=", buf);
write_int(coordination_settings->snapshots_to_keep);
write("rotate_log_storage_interval=");
writeText("rotate_log_storage_interval=", buf);
write_int(coordination_settings->rotate_log_storage_interval);
write("stale_log_gap=");
writeText("stale_log_gap=", buf);
write_int(coordination_settings->stale_log_gap);
write("fresh_log_gap=");
writeText("fresh_log_gap=", buf);
write_int(coordination_settings->fresh_log_gap);
write("max_requests_batch_size=");
writeText("max_requests_batch_size=", buf);
write_int(coordination_settings->max_requests_batch_size);
write("quorum_reads=");
writeText("quorum_reads=", buf);
write_bool(coordination_settings->quorum_reads);
write("force_sync=");
writeText("force_sync=", buf);
write_bool(coordination_settings->force_sync);
write("compress_logs=");
writeText("compress_logs=", buf);
write_bool(coordination_settings->compress_logs);
write("compress_snapshots_with_zstd_format=");
writeText("compress_snapshots_with_zstd_format=", buf);
write_bool(coordination_settings->compress_snapshots_with_zstd_format);
write("configuration_change_tries_count=");
writeText("configuration_change_tries_count=", buf);
write_int(coordination_settings->configuration_change_tries_count);
}

View File

@ -52,6 +52,7 @@ struct CoordinationSettings : public BaseSettings<CoordinationSettingsTraits>
using CoordinationSettingsPtr = std::shared_ptr<CoordinationSettings>;
/// encapsulation of keeper settings from keeper_server
struct KeeperSettings
{
static constexpr int NOT_EXIST = -1;

View File

@ -38,16 +38,6 @@ Int32 IFourLetterCommand::toCode(const String & name)
return __builtin_bswap32(res);
}
void IFourLetterCommand::printSet(IFourLetterCommand::StringBuffer & buffer, std::unordered_set<String> & set, String && prefix)
{
for (const auto & str : set)
{
buffer.write(prefix.data(), prefix.size());
buffer.write(str.data(), str.size());
buffer.write('\n');
}
}
IFourLetterCommand::~IFourLetterCommand() = default;
FourLetterCommandFactory & FourLetterCommandFactory::instance()
@ -82,8 +72,6 @@ void FourLetterCommandFactory::registerCommand(FourLetterCommandPtr & command)
{
throw Exception("Four letter command " + command->name() + " already registered", ErrorCodes::LOGICAL_ERROR);
}
auto * log = &Poco::Logger::get("FourLetterCommandFactory");
LOG_INFO(log, "Register four letter command {}, code {}", command->name(), std::to_string(command->code()));
commands.emplace(command->code(), std::move(command));
}
@ -230,32 +218,23 @@ String MonitorCommand::run()
{
print(ret, "followers", raft_info.getFollowerCount());
print(ret, "synced_followers", raft_info.getSyncedFollowerCount());
/// print(ret, "pending_syncs", 0);
}
/// print(ret, "last_proposal_size", -1);
/// print(ret, "max_proposal_size", -1);
/// print(ret, "min_proposal_size", -1);
return ret.str();
}
void MonitorCommand::print(IFourLetterCommand::StringBuffer & buf, const String & key, const String & value)
{
const static String prefix = "zk_";
const int prefix_len = prefix.size();
buf.write(prefix.data(), prefix_len);
buf.write(key.data(), key.size());
buf.write('\t');
buf.write(value.data(), value.size());
buf.write('\n');
writeText("zk_", buf);
writeText(key, buf);
writeText('\t', buf);
writeText(value, buf);
writeText('\n', buf);
}
void MonitorCommand::print(IFourLetterCommand::StringBuffer & buf, const String & key, UInt64 value)
{
print(buf, key, std::to_string(value));
print(buf, key, toString(value));
}
String StatResetCommand::run()
@ -292,15 +271,14 @@ String RestConnStatsCommand::run()
String ServerStatCommand::run()
{
using std::to_string;
StringBuffer buf;
auto write = [&buf](const String & key, const String & value)
{
buf.write(key.data(), key.size());
buf.write(": ", 2);
buf.write(value.data(), value.size());
buf.write('\n');
writeText(key, buf);
writeText(": ", buf);
writeText(value, buf);
writeText('\n', buf);
};
KeeperStatsPtr stats = keeper_dispatcher.getKeeperStats();
@ -313,20 +291,19 @@ String ServerStatCommand::run()
latency << stats->getMinLatency() << "/" << stats->getAvgLatency() << "/" << stats->getMaxLatency() << "\n";
write("Latency min/avg/max", latency.str());
write("Received", to_string(stats->getPacketsReceived()));
write("Sent ", to_string(stats->getPacketsSent()));
write("Connections", to_string(keeper_info.getNumAliveConnections()));
write("Outstanding", to_string(keeper_info.getOutstandingRequests()));
write("Zxid", to_string(state_machine.getLastProcessedZxid()));
write("Received", toString(stats->getPacketsReceived()));
write("Sent ", toString(stats->getPacketsSent()));
write("Connections", toString(keeper_info.getNumAliveConnections()));
write("Outstanding", toString(keeper_info.getOutstandingRequests()));
write("Zxid", toString(state_machine.getLastProcessedZxid()));
write("Mode", keeper_info.getRole());
write("Node count", to_string(state_machine.getNodeCount()));
write("Node count", toString(state_machine.getNodeCount()));
return buf.str();
}
String StatCommand::run()
{
using std::to_string;
StringBuffer buf;
auto write = [&buf](const String & key, const String & value) { buf << key << ": " << value << '\n'; };
@ -345,13 +322,13 @@ String StatCommand::run()
latency << stats->getMinLatency() << "/" << stats->getAvgLatency() << "/" << stats->getMaxLatency() << "\n";
write("Latency min/avg/max", latency.str());
write("Received", to_string(stats->getPacketsReceived()));
write("Sent ", to_string(stats->getPacketsSent()));
write("Connections", to_string(keeper_info.getNumAliveConnections()));
write("Outstanding", to_string(keeper_info.getOutstandingRequests()));
write("Zxid", to_string(state_machine.getLastProcessedZxid()));
write("Received", toString(stats->getPacketsReceived()));
write("Sent ", toString(stats->getPacketsSent()));
write("Connections", toString(keeper_info.getNumAliveConnections()));
write("Outstanding", toString(keeper_info.getOutstandingRequests()));
write("Zxid", toString(state_machine.getLastProcessedZxid()));
write("Mode", keeper_info.getRole());
write("Node count", to_string(state_machine.getNodeCount()));
write("Node count", toString(state_machine.getNodeCount()));
return buf.str();
}

View File

@ -35,8 +35,6 @@ public:
static inline String toName(Int32 code);
static inline Int32 toCode(const String & name);
static void printSet(StringBuffer & buffer, std::unordered_set<String> & set, String && prefix);
protected:
const KeeperDispatcher & keeper_dispatcher;
};

View File

@ -4,9 +4,10 @@
#include <future>
#include <chrono>
#include <Poco/Path.h>
#include <Poco/File.h>
#include <Poco/DirectoryIterator.h>
#include <Common/hex.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
@ -18,13 +19,12 @@ namespace ErrorCodes
extern const int SYSTEM_ERROR;
}
using Poco::Path;
using Poco::File;
using Poco::DirectoryIterator;
using Path = fs::path;
using DirectoryIterator = fs::directory_iterator;
UInt64 getDirSize(Path dir)
{
if (!File(dir).exists())
if (!fs::exists(dir))
{
return 0;
}
@ -35,8 +35,8 @@ UInt64 getDirSize(Path dir)
UInt64 size{0};
while (it != end)
{
if (it->isFile())
size += it->getSize();
if (!it->is_regular_file())
size += fs::file_size(*it);
else
size += getDirSize(it->path());
++it;

View File

@ -1223,18 +1223,13 @@ void KeeperStorage::clearDeadWatches(int64_t session_id)
void KeeperStorage::dumpWatches(WriteBufferFromOwnString & buf) const
{
auto write_str_set = [&buf](const std::unordered_set<String> & objs)
{
for (const String & obj : objs)
{
buf << "\t" << obj << "\n";
}
};
for (const auto & e : sessions_and_watchers)
{
buf << "0x" << getHexUIntLowercase(e.first) << "\n";
write_str_set(e.second);
for (const String & path : e.second)
{
buf << "\t" << path << "\n";
}
}
}

View File

@ -41,7 +41,7 @@ public:
ChildrenSet children{};
/// object memory size
UInt64 size() const
UInt64 sizeInBytes() const
{
UInt64 child_size{0};
for (const auto & child : children)

View File

@ -16,11 +16,11 @@ struct ListNode
};
template <class V>
struct HasSizeMethod
struct CanCalculateSize
{
private:
template <class T>
static auto check(int) -> decltype(std::declval<T>().size(), std::true_type());
static auto check(int) -> decltype(std::declval<T>().sizeInBytes(), std::true_type());
template <class T>
static std::false_type check(...);
@ -82,7 +82,7 @@ private:
approximate_data_size -= old_value_size;
}
}
/// inseert
/// insert
else
{
approximate_data_size += key_size;
@ -118,15 +118,15 @@ private:
}
/// Calculate object memory size.
/// @return size(), if T has method size(), otherwise return sizeof(T)
/// @return sizeInBytes(), if T has method sizeInBytes, otherwise return sizeof(T)
template <typename T>
inline UInt64 sizeOf(const typename std::enable_if<HasSizeMethod<T>::value, T>::type * obj)
inline UInt64 sizeOf(const typename std::enable_if<CanCalculateSize<T>::value, T>::type * obj)
{
return obj->size();
return obj->sizeInBytes();
}
template <typename T>
inline UInt64 sizeOf(const typename std::enable_if<!HasSizeMethod<T>::value, T>::type *)
inline UInt64 sizeOf(const typename std::enable_if<!CanCalculateSize<T>::value, T>::type *)
{
return sizeof(T);
}
@ -147,7 +147,7 @@ public:
ListElem elem{key, value, true};
auto itr = list.insert(list.end(), elem);
map.emplace(itr->key, itr);
updateDataSize(INSERT, sizeOf<std::string>(&key), sizeOf<V>(&value), 0);
updateDataSize(INSERT, key.size(), sizeOf<V>(&value), 0);
return true;
}
@ -182,7 +182,7 @@ public:
list_itr->value = value;
}
}
updateDataSize(INSERT_OR_REPLACE, sizeOf<std::string>(&key), sizeOf<V>(&value), old_value_size);
updateDataSize(INSERT_OR_REPLACE, key.size(), sizeOf<V>(&value), old_value_size);
}
bool erase(const std::string & key)
@ -204,7 +204,7 @@ public:
list.erase(list_itr);
}
updateDataSize(ERASE, sizeOf<std::string>(&key), 0, old_data_size);
updateDataSize(ERASE, key.size(), 0, old_data_size);
return true;
}
@ -238,7 +238,7 @@ public:
updater(list_itr->value);
ret = list_itr;
}
updateDataSize(UPDATE_VALUE, sizeOf<std::string>(&key), sizeOf<V>(&ret->value), old_value_size);
updateDataSize(UPDATE_VALUE, key.size(), sizeOf<V>(&ret->value), old_value_size);
return ret;
}
@ -265,7 +265,7 @@ public:
{
if (!itr->active_in_map)
{
updateDataSize(CLEAR_OUTDATED_NODES, sizeOf<String>(&itr->key), sizeOf<V>(&itr->value), 0);
updateDataSize(CLEAR_OUTDATED_NODES, itr->key.size(), sizeOf<V>(&itr->value), 0);
itr = list.erase(itr);
}
else

View File

@ -529,56 +529,54 @@ void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response
void KeeperTCPHandler::dumpStats(WriteBufferFromOwnString & buf, bool brief)
{
auto write_str = [&buf](const String & str) { buf.write(str.data(), str.size()); };
using std::to_string;
buf.write(' ');
write_str(socket().peerAddress().toString());
write_str("(recved=");
write_str(to_string(conn_stats->getPacketsReceived()));
write_str(",sent=");
write_str(to_string(conn_stats->getPacketsSent()));
writeText(' ', buf);
writeText(socket().peerAddress().toString(), buf);
writeText("(recved=", buf);
writeIntText(conn_stats->getPacketsReceived(), buf);
writeText(",sent=", buf);
writeIntText(conn_stats->getPacketsSent(), buf);
if (!brief)
{
if (session_id != 0)
{
write_str(",sid=0x");
write_str(getHexUIntLowercase(getSessionId()));
writeText(",sid=0x", buf);
writeText(getHexUIntLowercase(getSessionId()), buf);
write_str(",lop=");
writeText(",lop=", buf);
LastOp op;
{
std::lock_guard lock(last_op_mutex);
op = last_op.clone();
}
write_str(op.getLastOp());
write_str(",est=");
write_str(to_string(getEstablished().epochMicroseconds() / 1000));
write_str(",to=");
write_str(to_string(getSessionTimeout()));
writeText(op.getLastOp(), buf);
writeText(",est=", buf);
writeIntText(getEstablished().epochMicroseconds() / 1000, buf);
writeText(",to=", buf);
writeIntText(getSessionTimeout(), buf);
Int64 last_cxid = op.getLastCxid();
if (last_cxid >= 0)
{
write_str(",lcxid=0x");
write_str(getHexUIntLowercase(last_cxid));
writeText(",lcxid=0x", buf);
writeText(getHexUIntLowercase(last_cxid), buf);
}
write_str(",lzxid=0x");
write_str(getHexUIntLowercase(op.getLastZxid()));
write_str(",lresp=");
write_str(to_string(op.getLastResponseTime()));
writeText(",lzxid=0x", buf);
writeText(getHexUIntLowercase(op.getLastZxid()), buf);
writeText(",lresp=", buf);
writeIntText(op.getLastResponseTime(), buf);
write_str(",llat=");
write_str(to_string(conn_stats->getLastLatency()));
write_str(",minlat=");
write_str(to_string(conn_stats->getMinLatency()));
write_str(",avglat=");
write_str(to_string(conn_stats->getAvgLatency()));
write_str(",maxlat=");
write_str(to_string(conn_stats->getMaxLatency()));
writeText(",llat=", buf);
writeIntText(conn_stats->getLastLatency(), buf);
writeText(",minlat=", buf);
writeIntText(conn_stats->getMinLatency(), buf);
writeText(",avglat=", buf);
writeIntText(conn_stats->getAvgLatency(), buf);
writeText(",maxlat=", buf);
writeIntText(conn_stats->getMaxLatency(), buf);
}
}
buf.write(')');
buf.write('\n');
writeText(')', buf);
writeText('\n', buf);
}
void KeeperTCPHandler::resetStats()