Merge pull request #28212 from azat/zookeeper_log-improvements

zookeeper_log improvements
This commit is contained in:
tavplubix 2021-08-27 16:22:52 +03:00 committed by GitHub
commit 481cc011c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 14 additions and 9 deletions

View File

@ -387,6 +387,7 @@ void ZooKeeper::connect(
}
socket.connect(node.address, connection_timeout);
socket_address = socket.peerAddress();
socket.setReceiveTimeout(operation_timeout);
socket.setSendTimeout(operation_timeout);
@ -1255,7 +1256,7 @@ void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const
{
elem.type = log_type;
elem.event_time = event_time;
elem.address = socket.peerAddress();
elem.address = socket_address;
elem.session_id = session_id;
maybe_zk_log->add(elem);
}

View File

@ -199,6 +199,8 @@ private:
Poco::Timespan operation_timeout;
Poco::Net::StreamSocket socket;
/// To avoid excessive getpeername(2) calls.
Poco::Net::SocketAddress socket_address;
std::optional<ReadBufferFromPocoSocket> in;
std::optional<WriteBufferFromPocoSocket> out;

View File

@ -158,7 +158,7 @@ void ZooKeeperLogElement::appendToBlock(MutableColumns & columns) const
auto event_time_seconds = event_time / 1000000;
columns[i++]->insert(DateLUT::instance().toDayNum(event_time_seconds).toUnderType());
columns[i++]->insert(event_time);
columns[i++]->insert(IPv6ToBinary(address.host()).data());
columns[i++]->insertData(IPv6ToBinary(address.host()).data(), 16);
columns[i++]->insert(address.port());
columns[i++]->insert(session_id);

View File

@ -1,9 +1,9 @@
log
Response 0 Watch /test/01158/default/rmt/log 0 0 \N 0 0 ZOK CHILD CONNECTED 0 0 0 0
Request 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 \N \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 ZOK \N \N /test/01158/default/rmt/log 0 0 0 0
Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 ZOK \N \N /test/01158/default/rmt/log/log-0000000000 0 0 0 0
::1 Response 0 Watch /test/01158/default/rmt/log 0 0 \N 0 0 ZOK CHILD CONNECTED 0 0 0 0
::1 Request 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 \N \N \N 0 0 0 0
::1 Response 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 ZOK \N \N /test/01158/default/rmt/log 0 0 0 0
::1 Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0
::1 Response 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 ZOK \N \N /test/01158/default/rmt/log/log-0000000000 0 0 0 0
parts
Request 0 Multi 0 0 \N 5 0 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0

View File

@ -1,12 +1,14 @@
drop table if exists rmt;
create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n;
-- cleanup code will perform extra Exists
-- (so the .reference will not match)
create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n settings cleanup_delay_period=86400;
system sync replica rmt;
insert into rmt values (1);
insert into rmt values (1);
system flush logs;
select 'log';
select type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type,
select address, type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type,
watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren
from system.zookeeper_log where path like '/test/01158/' || currentDatabase() || '/rmt/log%' and op_num not in (3, 4, 12)
order by xid, type, request_idx;