Merge pull request #70972 from ClickHouse/fix-keeper-with-xid-64

Fix Keeper entry serialization compatibility
This commit is contained in:
Antonio Andelic 2024-10-25 13:44:16 +00:00 committed by GitHub
commit 6a5742a6cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 67 additions and 21 deletions

View File

@ -163,6 +163,10 @@ void KeeperClient::defineOptions(Poco::Util::OptionSet & options)
.argument("<seconds>")
.binding("operation-timeout"));
options.addOption(
Poco::Util::Option("use-xid-64", "", "use 64-bit XID. default false.")
.binding("use-xid-64"));
options.addOption(
Poco::Util::Option("config-file", "c", "if set, will try to get a connection string from clickhouse config. default `config.xml`")
.argument("<file>")
@ -411,6 +415,7 @@ int KeeperClient::main(const std::vector<String> & /* args */)
zk_args.connection_timeout_ms = config().getInt("connection-timeout", 10) * 1000;
zk_args.session_timeout_ms = config().getInt("session-timeout", 10) * 1000;
zk_args.operation_timeout_ms = config().getInt("operation-timeout", 10) * 1000;
zk_args.use_xid_64 = config().hasOption("use-xid-64");
zookeeper = zkutil::ZooKeeper::createWithoutKillingPreviousSessions(zk_args);
if (config().has("no-confirmation") || config().has("query"))

View File

@ -99,9 +99,12 @@ void ZooKeeperArgs::initFromKeeperServerSection(const Poco::Util::AbstractConfig
if (auto session_timeout_key = coordination_key + ".session_timeout_ms";
config.has(session_timeout_key))
session_timeout_ms = config.getInt(session_timeout_key);
}
use_xid_64 = config.getBool(std::string{config_name} + ".use_xid_64", false);
if (auto use_xid_64_key = coordination_key + ".use_xid_64";
config.has(use_xid_64_key))
use_xid_64 = config.getBool(use_xid_64_key);
}
Poco::Util::AbstractConfiguration::Keys keys;
std::string raft_configuration_key = std::string{config_name} + ".raft_configuration";

View File

@ -1226,6 +1226,9 @@ void ZooKeeper::pushRequest(RequestInfo && info)
if (!info.request->xid)
{
info.request->xid = next_xid.fetch_add(1);
if (!use_xid_64)
info.request->xid = static_cast<int32_t>(info.request->xid);
if (info.request->xid == close_xid)
throw Exception::fromMessage(Error::ZSESSIONEXPIRED, "xid equal to close_xid");
if (info.request->xid < 0)

View File

@ -62,7 +62,8 @@ namespace ErrorCodes
DECLARE(UInt64, disk_move_retries_during_init, 100, "The amount of retries after a failure which happened while a file was being moved between disks during initialization.", 0) \
DECLARE(UInt64, log_slow_total_threshold_ms, 5000, "Requests for which the total latency is larger than this settings will be logged", 0) \
DECLARE(UInt64, log_slow_cpu_threshold_ms, 100, "Requests for which the CPU (preprocessing and processing) latency is larger than this settings will be logged", 0) \
DECLARE(UInt64, log_slow_connection_operation_threshold_ms, 1000, "Log message if a certain operation took too long inside a single connection", 0)
DECLARE(UInt64, log_slow_connection_operation_threshold_ms, 1000, "Log message if a certain operation took too long inside a single connection", 0) \
DECLARE(Bool, use_xid_64, false, "Enable 64-bit XID. It is disabled by default because of backward compatibility", 0)
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)
IMPLEMENT_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)

View File

@ -417,7 +417,7 @@ void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKe
}
}
bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, bool use_xid_64)
{
{
/// If session was already disconnected then we will ignore requests
@ -427,6 +427,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
}
KeeperStorageBase::RequestForSession request_info;
request_info.use_xid_64 = use_xid_64;
request_info.request = request;
using namespace std::chrono;
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();

View File

@ -140,7 +140,7 @@ public:
void forceRecovery();
/// Put request to ClickHouse Keeper
bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, bool use_xid_64);
/// Get new session ID
int64_t getSessionID(int64_t session_timeout_ms);

View File

@ -877,7 +877,8 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
auto entry_buf = entry->get_buf_ptr();
IKeeperStateMachine::ZooKeeperLogSerializationVersion serialization_version;
auto request_for_session = state_machine->parseRequest(*entry_buf, /*final=*/false, &serialization_version);
size_t request_end_position = 0;
auto request_for_session = state_machine->parseRequest(*entry_buf, /*final=*/false, &serialization_version, &request_end_position);
request_for_session->zxid = next_zxid;
if (!state_machine->preprocess(*request_for_session))
return nuraft::cb_func::ReturnCode::ReturnNull;
@ -892,9 +893,6 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
if (serialization_version < IKeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_ZXID_DIGEST)
bytes_missing += sizeof(request_for_session->zxid) + sizeof(request_for_session->digest->version) + sizeof(request_for_session->digest->value);
if (serialization_version < IKeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_XID_64)
bytes_missing += sizeof(uint32_t);
if (bytes_missing != 0)
{
auto new_buffer = nuraft::buffer::alloc(entry_buf->size() + bytes_missing);
@ -904,12 +902,14 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
}
size_t write_buffer_header_size = sizeof(request_for_session->zxid) + sizeof(request_for_session->digest->version)
+ sizeof(request_for_session->digest->value) + sizeof(uint32_t);
+ sizeof(request_for_session->digest->value);
if (serialization_version < IKeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_TIME)
write_buffer_header_size += sizeof(request_for_session->time);
else
request_end_position += sizeof(request_for_session->time);
auto * buffer_start = reinterpret_cast<BufferBase::Position>(entry_buf->data_begin() + entry_buf->size() - write_buffer_header_size);
auto * buffer_start = reinterpret_cast<BufferBase::Position>(entry_buf->data_begin() + request_end_position);
WriteBufferFromPointer write_buf(buffer_start, write_buffer_header_size);

View File

@ -70,7 +70,6 @@ private:
const bool create_snapshot_on_exit;
const bool enable_reconfiguration;
public:
KeeperServer(
const KeeperConfigurationAndSettingsPtr & settings_,

View File

@ -267,7 +267,11 @@ nuraft::ptr<nuraft::buffer> IKeeperStateMachine::getZooKeeperLogEntry(const Keep
size_t request_size = sizeof(uint32_t) + Coordination::size(request->getOpNum()) + request->sizeImpl();
Coordination::write(static_cast<int32_t>(request_size), write_buf);
XidHelper xid_helper{.xid = request->xid};
Coordination::write(xid_helper.parts.lower, write_buf);
if (request_for_session.use_xid_64)
Coordination::write(xid_helper.parts.lower, write_buf);
else
Coordination::write(static_cast<int32_t>(xid_helper.xid), write_buf);
Coordination::write(request->getOpNum(), write_buf);
request->writeImpl(write_buf);
@ -276,13 +280,15 @@ nuraft::ptr<nuraft::buffer> IKeeperStateMachine::getZooKeeperLogEntry(const Keep
DB::writeIntBinary(static_cast<int64_t>(0), write_buf); /// zxid
DB::writeIntBinary(KeeperStorageBase::DigestVersion::NO_DIGEST, write_buf); /// digest version or NO_DIGEST flag
DB::writeIntBinary(static_cast<uint64_t>(0), write_buf); /// digest value
Coordination::write(xid_helper.parts.upper, write_buf); /// for 64bit XID MSB
if (request_for_session.use_xid_64)
Coordination::write(xid_helper.parts.upper, write_buf); /// for 64bit XID MSB
/// if new fields are added, update KeeperStateMachine::ZooKeeperLogSerializationVersion along with parseRequest function and PreAppendLog callback handler
return write_buf.getBuffer();
}
std::shared_ptr<KeeperStorageBase::RequestForSession>
IKeeperStateMachine::parseRequest(nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version)
std::shared_ptr<KeeperStorageBase::RequestForSession> IKeeperStateMachine::parseRequest(
nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version, size_t * request_end_position)
{
ReadBufferFromNuraftBuffer buffer(data);
auto request_for_session = std::make_shared<KeeperStorageBase::RequestForSession>();
@ -302,6 +308,9 @@ IKeeperStateMachine::parseRequest(nuraft::buffer & data, bool final, ZooKeeperLo
auto buffer_position = buffer.getPosition();
buffer.seek(length - sizeof(uint32_t), SEEK_CUR);
if (request_end_position)
*request_end_position = buffer.getPosition();
using enum ZooKeeperLogSerializationVersion;
ZooKeeperLogSerializationVersion version = INITIAL;
@ -333,6 +342,10 @@ IKeeperStateMachine::parseRequest(nuraft::buffer & data, bool final, ZooKeeperLo
version = WITH_XID_64;
Coordination::read(xid_helper.parts.upper, buffer);
}
else
{
xid_helper.xid = static_cast<int32_t>(xid_helper.parts.lower);
}
if (serialization_version)
*serialization_version = version;

View File

@ -48,8 +48,11 @@ public:
///
/// final - whether it's the final time we will fetch the request so we can safely remove it from cache
/// serialization_version - information about which fields were parsed from the buffer so we can modify the buffer accordingly
std::shared_ptr<KeeperStorageBase::RequestForSession>
parseRequest(nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version = nullptr);
std::shared_ptr<KeeperStorageBase::RequestForSession> parseRequest(
nuraft::buffer & data,
bool final,
ZooKeeperLogSerializationVersion * serialization_version = nullptr,
size_t * request_end_position = nullptr);
static nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(const KeeperStorageBase::RequestForSession & request_for_session);

View File

@ -303,6 +303,7 @@ public:
int64_t zxid{0};
std::optional<Digest> digest;
int64_t log_idx{0};
bool use_xid_64{false};
};
using RequestsForSessions = std::vector<RequestForSession>;

View File

@ -43,6 +43,7 @@ namespace CoordinationSetting
{
extern const CoordinationSettingsUInt64 log_slow_connection_operation_threshold_ms;
extern const CoordinationSettingsUInt64 log_slow_total_threshold_ms;
extern const CoordinationSettingsBool use_xid_64;
}
struct LastOp
@ -312,6 +313,10 @@ Poco::Timespan KeeperTCPHandler::receiveHandshake(int32_t handshake_length, bool
}
else if (protocol_version >= Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_XID_64)
{
if (!keeper_dispatcher->getKeeperContext()->getCoordinationSettings()[CoordinationSetting::use_xid_64])
throw Exception(
ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT,
"keeper_server.coordination_settings.use_xid_64 is set to 'false' while client has it enabled");
close_xid = Coordination::CLOSE_XID_64;
use_xid_64 = true;
Coordination::read(use_compression, *in);
@ -618,7 +623,7 @@ std::pair<Coordination::OpNum, Coordination::XID> KeeperTCPHandler::receiveReque
request->xid = xid;
request->readImpl(read_buffer);
if (!keeper_dispatcher->putRequest(request, session_id))
if (!keeper_dispatcher->putRequest(request, session_id, use_xid_64))
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id);
return std::make_pair(opnum, xid);
}

View File

@ -27,6 +27,8 @@
<latest_logs_cache_size_threshold>1073741824</latest_logs_cache_size_threshold>
<commit_logs_cache_size_threshold>524288000</commit_logs_cache_size_threshold>
<use_xid_64>1</use_xid_64>
</coordination_settings>
<raft_configuration>

View File

@ -70,6 +70,7 @@ if [[ -n "$BUGFIX_VALIDATE_CHECK" ]] && [[ "$BUGFIX_VALIDATE_CHECK" -eq 1 ]]; th
}
remove_keeper_config "remove_recursive" "[[:digit:]]\+"
remove_keeper_config "use_xid_64" "[[:digit:]]\+"
fi
export IS_FLAKY_CHECK=0

View File

@ -4,4 +4,4 @@
<async_replication>1</async_replication>
</coordination_settings>
</keeper_server>
</clickhouse>
</clickhouse>

View File

@ -18,6 +18,7 @@
<tcp_port>2181</tcp_port>
<server_id>1</server_id>
<digest_enabled>1</digest_enabled>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>15000</session_timeout_ms>
@ -27,6 +28,7 @@
<election_timeout_upper_bound_ms>4000</election_timeout_upper_bound_ms>
<async_replication>1</async_replication>
<use_xid_64>1</use_xid_64>
</coordination_settings>
<raft_configuration>

View File

@ -17,6 +17,7 @@
<keeper_server>
<tcp_port>2181</tcp_port>
<server_id>2</server_id>
<digest_enabled>1</digest_enabled>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
@ -27,6 +28,7 @@
<election_timeout_upper_bound_ms>4000</election_timeout_upper_bound_ms>
<async_replication>1</async_replication>
<use_xid_64>1</use_xid_64>
</coordination_settings>
<raft_configuration>

View File

@ -12,6 +12,7 @@
<keeper_server>
<tcp_port>2181</tcp_port>
<server_id>3</server_id>
<digest_enabled>1</digest_enabled>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
@ -22,6 +23,7 @@
<election_timeout_upper_bound_ms>4000</election_timeout_upper_bound_ms>
<async_replication>1</async_replication>
<use_xid_64>1</use_xid_64>
</coordination_settings>
<raft_configuration>

View File

@ -10,6 +10,7 @@
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
<use_xid_64>1</use_xid_64>
</coordination_settings>
<raft_configuration>

View File

@ -7,6 +7,7 @@
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<use_xid_64>1</use_xid_64>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>

View File

@ -6,6 +6,7 @@
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<use_xid_64>1</use_xid_64>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>

View File

@ -34,7 +34,6 @@
<tcp_port>9181</tcp_port>
<server_id>{id}</server_id>
<digest_enabled>1</digest_enabled>
<use_xid_64>1</use_xid_64>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
@ -50,6 +49,7 @@
<stale_log_gap>{stale_log_gap}</stale_log_gap>
<reserved_log_items>{reserved_log_items}</reserved_log_items>
<async_replication>1</async_replication>
<use_xid_64>1</use_xid_64>
</coordination_settings>
<raft_configuration>