mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-29 21:20:49 +00:00
adjust code style for keeper 4lw cmd
This commit is contained in:
parent
a60663e33d
commit
e5b0eedd31
@ -20,7 +20,7 @@ int getCurrentProcessFDCount()
|
|||||||
WriteBufferFromOwnString out;
|
WriteBufferFromOwnString out;
|
||||||
copyData(command->out, out);
|
copyData(command->out, out);
|
||||||
|
|
||||||
if(!out.str().empty())
|
if (!out.str().empty())
|
||||||
{
|
{
|
||||||
return std::stoi(out.str());
|
return std::stoi(out.str());
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,7 @@ int getMaxFileDescriptorCount()
|
|||||||
WriteBufferFromOwnString out;
|
WriteBufferFromOwnString out;
|
||||||
copyData(command->out, out);
|
copyData(command->out, out);
|
||||||
|
|
||||||
if(!out.str().empty())
|
if (!out.str().empty())
|
||||||
{
|
{
|
||||||
return std::stoi(out.str());
|
return std::stoi(out.str());
|
||||||
}
|
}
|
||||||
|
@ -37,13 +37,15 @@ void KeeperSettings::dump(WriteBufferFromOwnString & buf) const
|
|||||||
{
|
{
|
||||||
auto write = [&buf](const String & content) { buf.write(content.data(), content.size()); };
|
auto write = [&buf](const String & content) { buf.write(content.data(), content.size()); };
|
||||||
|
|
||||||
auto write_int = [&buf](Int64 value) {
|
auto write_int = [&buf](Int64 value)
|
||||||
|
{
|
||||||
String str_val = std::to_string(value);
|
String str_val = std::to_string(value);
|
||||||
buf.write(str_val.data(), str_val.size());
|
buf.write(str_val.data(), str_val.size());
|
||||||
buf.write('\n');
|
buf.write('\n');
|
||||||
};
|
};
|
||||||
|
|
||||||
auto write_bool = [&buf](bool value) {
|
auto write_bool = [&buf](bool value)
|
||||||
|
{
|
||||||
String str_val = value ? "true" : "false";
|
String str_val = value ? "true" : "false";
|
||||||
buf.write(str_val.data(), str_val.size());
|
buf.write(str_val.data(), str_val.size());
|
||||||
buf.write('\n');
|
buf.write('\n');
|
||||||
@ -52,17 +54,17 @@ void KeeperSettings::dump(WriteBufferFromOwnString & buf) const
|
|||||||
write("server_id=");
|
write("server_id=");
|
||||||
write_int(server_id);
|
write_int(server_id);
|
||||||
|
|
||||||
if(tcp_port != NO_PORT)
|
if (tcp_port != NO_PORT)
|
||||||
{
|
{
|
||||||
write("tcp_port=");
|
write("tcp_port=");
|
||||||
write_int(tcp_port);
|
write_int(tcp_port);
|
||||||
}
|
}
|
||||||
if(tcp_port_secure != NO_PORT)
|
if (tcp_port_secure != NO_PORT)
|
||||||
{
|
{
|
||||||
write("tcp_port_secure=");
|
write("tcp_port_secure=");
|
||||||
write_int(tcp_port_secure);
|
write_int(tcp_port_secure);
|
||||||
}
|
}
|
||||||
if(!super_digest.empty())
|
if (!super_digest.empty())
|
||||||
{
|
{
|
||||||
write("superdigest=");
|
write("superdigest=");
|
||||||
write(super_digest);
|
write(super_digest);
|
||||||
@ -136,15 +138,15 @@ KeeperSettings::loadFromConfig(const Poco::Util::AbstractConfiguration & config,
|
|||||||
ret->server_id = config.getInt("keeper_server.server_id");
|
ret->server_id = config.getInt("keeper_server.server_id");
|
||||||
ret->standalone_keeper = standalone_keeper_;
|
ret->standalone_keeper = standalone_keeper_;
|
||||||
|
|
||||||
if(config.has("keeper_server.tcp_port"))
|
if (config.has("keeper_server.tcp_port"))
|
||||||
{
|
{
|
||||||
ret->tcp_port = config.getInt("keeper_server.tcp_port");
|
ret->tcp_port = config.getInt("keeper_server.tcp_port");
|
||||||
}
|
}
|
||||||
if(config.has("keeper_server.tcp_port_secure"))
|
if (config.has("keeper_server.tcp_port_secure"))
|
||||||
{
|
{
|
||||||
ret->tcp_port_secure = config.getInt("keeper_server.tcp_port_secure");
|
ret->tcp_port_secure = config.getInt("keeper_server.tcp_port_secure");
|
||||||
}
|
}
|
||||||
if(config.has("keeper_server.superdigest"))
|
if (config.has("keeper_server.superdigest"))
|
||||||
{
|
{
|
||||||
ret->super_digest = config.getString("keeper_server.tcp_port_secure");
|
ret->super_digest = config.getString("keeper_server.tcp_port_secure");
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,6 @@
|
|||||||
#include <Coordination/KeeperDispatcher.h>
|
#include <Coordination/KeeperDispatcher.h>
|
||||||
#include <Coordination/KeeperInfos.h>
|
#include <Coordination/KeeperInfos.h>
|
||||||
#include <IO/WriteBufferFromString.h>
|
#include <IO/WriteBufferFromString.h>
|
||||||
#include <common/types.h>
|
|
||||||
|
|
||||||
#if !defined(ARCADIA_BUILD)
|
#if !defined(ARCADIA_BUILD)
|
||||||
# include <Common/config_version.h>
|
# include <Common/config_version.h>
|
||||||
@ -98,9 +97,6 @@ struct RuokCommand : public IFourLetterCommand
|
|||||||
* zk_followers 2 - only exposed by the Leader
|
* zk_followers 2 - only exposed by the Leader
|
||||||
* zk_synced_followers 2 - only exposed by the Leader
|
* zk_synced_followers 2 - only exposed by the Leader
|
||||||
* zk_pending_syncs 0 - only exposed by the Leader
|
* zk_pending_syncs 0 - only exposed by the Leader
|
||||||
* zk_last_proposal_size -1
|
|
||||||
* zk_max_proposal_size -1
|
|
||||||
* zk_min_proposal_size -1
|
|
||||||
*/
|
*/
|
||||||
struct MonitorCommand : public IFourLetterCommand
|
struct MonitorCommand : public IFourLetterCommand
|
||||||
{
|
{
|
||||||
@ -144,6 +140,4 @@ struct ConfCommand : public IFourLetterCommand
|
|||||||
~ConfCommand() override;
|
~ConfCommand() override;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -12,7 +12,6 @@ namespace ErrorCodes
|
|||||||
extern const int LOGICAL_ERROR;
|
extern const int LOGICAL_ERROR;
|
||||||
extern const int TIMEOUT_EXCEEDED;
|
extern const int TIMEOUT_EXCEEDED;
|
||||||
extern const int SYSTEM_ERROR;
|
extern const int SYSTEM_ERROR;
|
||||||
extern const int UNKNOWN_SETTING;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
UInt64 KeeperDispatcher::KeeperStats::getMinLatency() const
|
UInt64 KeeperDispatcher::KeeperStats::getMinLatency() const
|
||||||
@ -330,7 +329,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
|
|||||||
if (!requests_queue->push(std::move(request_info)))
|
if (!requests_queue->push(std::move(request_info)))
|
||||||
throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR);
|
throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR);
|
||||||
}
|
}
|
||||||
else if (!requests_queue->tryPush(std::move(request_info), coordination_settings->operation_timeout_ms.totalMilliseconds()))
|
else if (!requests_queue->tryPush(std::move(request_info), settings->coordination_settings->operation_timeout_ms.totalMilliseconds()))
|
||||||
{
|
{
|
||||||
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
|
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
}
|
}
|
||||||
@ -342,14 +341,14 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
|
|||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Initializing storage dispatcher");
|
LOG_DEBUG(log, "Initializing storage dispatcher");
|
||||||
|
|
||||||
settings = KeeperSettings::loadFromConfig(config_, standalone_keeper);
|
settings = KeeperSettings::loadFromConfig(config, standalone_keeper);
|
||||||
requests_queue = std::make_unique<RequestsQueue>(settings->coordination_settings->max_requests_batch_size);
|
requests_queue = std::make_unique<RequestsQueue>(settings->coordination_settings->max_requests_batch_size);
|
||||||
|
|
||||||
request_thread = ThreadFromGlobalPool([this] { requestThread(); });
|
request_thread = ThreadFromGlobalPool([this] { requestThread(); });
|
||||||
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
|
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
|
||||||
snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
|
snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
|
||||||
|
|
||||||
server = std::make_unique<KeeperServer>(settings, config_, responses_queue, snapshots_queue);
|
server = std::make_unique<KeeperServer>(settings, config, responses_queue, snapshots_queue);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -44,7 +44,7 @@ public:
|
|||||||
UInt64 size() const
|
UInt64 size() const
|
||||||
{
|
{
|
||||||
UInt64 child_size{0};
|
UInt64 child_size{0};
|
||||||
for(auto & child : children)
|
for (auto & child : children)
|
||||||
{
|
{
|
||||||
child_size += child.size();
|
child_size += child.size();
|
||||||
}
|
}
|
||||||
@ -201,7 +201,7 @@ public:
|
|||||||
UInt64 getEphemeralCount() const
|
UInt64 getEphemeralCount() const
|
||||||
{
|
{
|
||||||
UInt64 ret{0};
|
UInt64 ret{0};
|
||||||
for(const auto & ephs : ephemerals)
|
for (const auto & ephs : ephemerals)
|
||||||
{
|
{
|
||||||
ret += ephs.second.size();
|
ret += ephs.second.size();
|
||||||
}
|
}
|
||||||
|
@ -382,7 +382,7 @@ void KeeperTCPHandler::runImpl()
|
|||||||
|
|
||||||
/// Do request statistics,
|
/// Do request statistics,
|
||||||
/// not accurate when there is watch response in the channel
|
/// not accurate when there is watch response in the channel
|
||||||
if(result.responses_count != 0)
|
if (result.responses_count != 0)
|
||||||
{
|
{
|
||||||
process_time_stopwatch.stop();
|
process_time_stopwatch.stop();
|
||||||
keeper_dispatcher->updateKeeperStat(process_time_stopwatch.elapsedMilliseconds());
|
keeper_dispatcher->updateKeeperStat(process_time_stopwatch.elapsedMilliseconds());
|
||||||
|
Loading…
Reference in New Issue
Block a user