Compare commits

...

71 Commits

Author SHA1 Message Date
MikhailBurdukov
4f5c290bb8
Merge 7c0810e2b4 into b94a7167a8 2024-09-18 13:33:43 -07:00
Robert Schulze
b94a7167a8
Merge pull request #69580 from rschu1ze/bump-libpqxx
Bump libpqxx to v7.7.5
2024-09-18 18:56:12 +00:00
Konstantin Bogdanov
64e58baba1
Merge pull request #69682 from ClickHouse/more-asserts-for-hashjoin
Try fix asserts failure in `HashJoin`
2024-09-18 18:20:27 +00:00
max-vostrikov
a3fe155579
Merge pull request #69737 from ClickHouse/test_printf
added some edge cases for printf tests
2024-09-18 17:49:57 +00:00
Antonio Andelic
a997cfad2b
Merge pull request #68108 from ClickHouse/keeper-some-improvement2
Keeper improvements package
2024-09-18 16:35:57 +00:00
maxvostrikov
f4b4b3cc35 added some edge cases for printf tests
added some edge cases for printf tests
2024-09-18 17:22:36 +02:00
Konstantin Bogdanov
cb24849396
Move assert 2024-09-18 15:24:48 +02:00
Antonio Andelic
4f73c677ac Merge branch 'master' into keeper-some-improvement2 2024-09-18 13:19:24 +02:00
Antonio Andelic
3106653852 Fix watches 2024-09-18 10:47:40 +02:00
Konstantin Bogdanov
b08e727aef
Count allocated bytes from scratch after rerange 2024-09-17 19:02:10 +02:00
Konstantin Bogdanov
a210f98819
Lint 2024-09-17 18:28:27 +02:00
Konstantin Bogdanov
7c5d55c6b2
Lint 2024-09-17 18:10:51 +02:00
Konstantin Bogdanov
80259659ff
More asserts 2024-09-17 18:03:19 +02:00
Antonio Andelic
8db3dddb3d Fix watches count and list request 2024-09-17 16:15:55 +02:00
Antonio Andelic
f3654b8fc8 Merge branch 'master' into keeper-some-improvement2 2024-09-17 10:35:38 +02:00
Antonio Andelic
676b6238d0 Update comments 2024-09-17 10:30:39 +02:00
Antonio Andelic
e876997ebb Merge branch 'master' into keeper-some-improvement2 2024-09-17 10:28:02 +02:00
Robert Schulze
aab0d3dd9e
Bump to 7.7.5 2024-09-12 19:42:32 +00:00
Robert Schulze
5a34b9f24e
Bump to 7.6.1 2024-09-12 19:14:41 +00:00
Robert Schulze
a0a4858e00
Scratch build of libpqxx at 7.5.3 + patches 2024-09-12 18:55:35 +00:00
Antonio Andelic
65019c4b9b Merge branch 'master' into keeper-some-improvement2 2024-09-07 20:59:04 +02:00
Antonio Andelic
190339c4e6 Fix snapshot sync 2024-09-07 17:34:59 +02:00
Antonio Andelic
5a86371b02 Merge branch 'master' into keeper-some-improvement2 2024-09-07 11:32:44 +02:00
Antonio Andelic
03c7f3817b Correct lock order 2024-09-06 15:41:04 +02:00
MikhailBurdukov
7c0810e2b4 Trying to fix the test 2024-09-06 13:17:56 +00:00
Antonio Andelic
f44eaa808d Merge branch 'master' into keeper-some-improvement2 2024-09-06 09:35:56 +02:00
Antonio Andelic
e388f6f99b Remove useless log 2024-09-06 09:35:02 +02:00
MikhailBurdukov
3d68507b61 Fix inf loop after in the replicated merge tree with zero copy. 2024-09-05 13:31:48 +00:00
Antonio Andelic
a3e233a537 Fix watch 2024-09-04 15:19:56 +02:00
Antonio Andelic
955412888c Merge branch 'master' into keeper-some-improvement2 2024-09-04 11:30:29 +02:00
Antonio Andelic
9633563fbd Fix 2024-09-04 11:30:05 +02:00
Antonio Andelic
79fc8d67ad More fixes 2024-09-02 15:46:04 +02:00
Antonio Andelic
596ba574e3 Merge branch 'master' into keeper-some-improvement2 2024-09-02 09:31:02 +02:00
Antonio Andelic
e968984d17 More changes 2024-09-02 08:25:17 +02:00
Antonio Andelic
c61fc591c4 Use functions instead of classes 2024-08-13 11:33:17 +02:00
Antonio Andelic
dcbc590302 Merge branch 'master' into keeper-some-improvement2 2024-08-13 09:01:10 +02:00
Antonio Andelic
b6c3619543 Whitespace 2024-08-09 15:41:11 +02:00
Antonio Andelic
b2172af817 Merge branch 'master' into keeper-some-improvement2 2024-08-09 14:50:52 +02:00
Antonio Andelic
5ea4844d69 Merge branch 'master' into keeper-some-improvement2 2024-08-07 11:26:33 +02:00
Antonio Andelic
48e7057200 Merge branch 'master' into keeper-some-improvement2 2024-07-22 16:51:20 +02:00
Antonio Andelic
5a96290cce Merge branch 'master' into keeper-some-improvement2 2024-07-10 12:45:43 +02:00
Antonio Andelic
7e22af06f1 Merge branch 'master' into keeper-some-improvement2 2024-07-02 09:01:48 +02:00
Antonio Andelic
ac78184fe7 Merge branch 'tracing-try-2' into keeper-some-improvement2 2024-06-18 11:04:00 +02:00
Antonio Andelic
1777ff37c0 Merge branch 'master' into keeper-some-improvement2 2024-06-18 11:03:38 +02:00
Antonio Andelic
7dca59da56 Revert "Merge branch 'use-thread-from-global-pool-in-poco-threadpool' into keeper-some-improvement"
This reverts commit 737d7484c5, reversing
changes made to b3a742304e.
2024-06-17 09:03:49 +02:00
Antonio Andelic
0fa45c3954 More parallel storage 2024-06-11 16:39:35 +02:00
Antonio Andelic
c802d7d58a Writing improvements 2024-06-11 14:35:26 +02:00
Antonio Andelic
5ab06caffc Merge branch 'keeper-parallel-storage' into keeper-some-improvement2 2024-06-11 10:18:27 +02:00
Antonio Andelic
737d7484c5 Merge branch 'use-thread-from-global-pool-in-poco-threadpool' into keeper-some-improvement 2024-06-11 09:46:58 +02:00
Antonio Andelic
b3a742304e Merge branch 'master' into keeper-some-improvement 2024-06-11 09:46:41 +02:00
kssenii
6514d72fea Move servers pool back 2024-06-10 18:53:51 +02:00
kssenii
c3d4b429d9 Fix merge 2024-06-10 15:39:54 +02:00
kssenii
7ff848c2c8 Merge remote-tracking branch 'origin/master' into use-thread-from-global-pool-in-poco-threadpool 2024-06-10 15:20:03 +02:00
kssenii
a11ba3f437 Fix shutdown 2024-06-10 15:19:03 +02:00
kssenii
6604d94271 Ping CI: skip fast test to see all stateless runs 2024-06-07 17:11:49 +02:00
kssenii
e30fa1da4d Fix ThreadStatus 2024-06-07 15:03:13 +02:00
kssenii
7ea3345e0d Use ThreadFromGlobalPool in Poco::ThreadPool 2024-06-06 17:25:15 +02:00
kssenii
1e97d73bd0 Squashed commit of the following:
commit 27fe0439fa
Merge: bfb1c4c793 bb469e0d45
Author: Antonio Andelic <antonio@clickhouse.com>
Date:   Thu Jun 6 14:36:02 2024 +0200

    Merge branch 'master' into fix-global-trace-collector

commit bfb1c4c793
Author: Antonio Andelic <antonio@clickhouse.com>
Date:   Thu Jun 6 11:29:42 2024 +0200

    better

commit fcee260b25
Author: Antonio Andelic <antonio2368@users.noreply.github.com>
Date:   Thu Jun 6 11:22:48 2024 +0200

    Update src/Interpreters/TraceCollector.h

    Co-authored-by: alesapin <alesapin@clickhouse.com>

commit 1d3cf17053
Author: Antonio Andelic <antonio@clickhouse.com>
Date:   Thu Jun 6 11:11:08 2024 +0200

    Fix global trace collector
2024-06-06 17:13:37 +02:00
Antonio Andelic
f0e9703384 Some small improvements 2024-06-06 09:45:07 +02:00
Antonio Andelic
514941627b Merge branch 'master' into keeper-parallel-storage 2024-06-05 15:31:57 +02:00
Antonio Andelic
acc08c65d9 Add stopwatch 2024-05-22 11:56:45 +02:00
Antonio Andelic
f1e4403f98 Merge branch 'master' into keeper-parallel-storage 2024-05-22 11:39:57 +02:00
Antonio Andelic
b1d53f0472 Merge branch 'master' into keeper-parallel-storage 2024-04-29 15:13:19 +02:00
Antonio Andelic
bc3cfb008e Merge branch 'master' into keeper-parallel-storage 2024-03-25 13:14:57 +01:00
Antonio Andelic
9791a2ea40 Merge branch 'keeper-batch-flushes' into keeper-parallel-storage 2023-09-08 16:26:12 +00:00
Antonio Andelic
9fb9d16737 Merge branch 'keeper-batch-flushes' into keeper-parallel-storage 2023-09-06 13:30:05 +00:00
Antonio Andelic
6be1d0724a More mutex 2023-09-06 13:04:08 +00:00
Antonio Andelic
9238520490 Merge branch 'master' into keeper-parallel-storage 2023-09-06 10:57:33 +00:00
Antonio Andelic
dd1bb579df Better 2023-09-05 12:05:37 +00:00
Antonio Andelic
57943798b7 Merge branch 'master' into keeper-parallel-storage 2023-09-05 08:46:38 +00:00
Antonio Andelic
b43c3d75a2 Initial implementation 2023-09-04 14:49:49 +00:00
32 changed files with 3333 additions and 2683 deletions

2
contrib/libpqxx vendored

@ -1 +1 @@
Subproject commit c995193a3a14d71f4711f1f421f65a1a1db64640
Subproject commit 41e4c331564167cca97ad6eccbd5b8879c2ca044

View File

@ -1,9 +1,9 @@
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/libpqxx")
set (SRCS
"${LIBRARY_DIR}/src/strconv.cxx"
"${LIBRARY_DIR}/src/array.cxx"
"${LIBRARY_DIR}/src/binarystring.cxx"
"${LIBRARY_DIR}/src/blob.cxx"
"${LIBRARY_DIR}/src/connection.cxx"
"${LIBRARY_DIR}/src/cursor.cxx"
"${LIBRARY_DIR}/src/encodings.cxx"
@ -12,59 +12,25 @@ set (SRCS
"${LIBRARY_DIR}/src/field.cxx"
"${LIBRARY_DIR}/src/largeobject.cxx"
"${LIBRARY_DIR}/src/notification.cxx"
"${LIBRARY_DIR}/src/params.cxx"
"${LIBRARY_DIR}/src/pipeline.cxx"
"${LIBRARY_DIR}/src/result.cxx"
"${LIBRARY_DIR}/src/robusttransaction.cxx"
"${LIBRARY_DIR}/src/row.cxx"
"${LIBRARY_DIR}/src/sql_cursor.cxx"
"${LIBRARY_DIR}/src/strconv.cxx"
"${LIBRARY_DIR}/src/stream_from.cxx"
"${LIBRARY_DIR}/src/stream_to.cxx"
"${LIBRARY_DIR}/src/subtransaction.cxx"
"${LIBRARY_DIR}/src/time.cxx"
"${LIBRARY_DIR}/src/transaction.cxx"
"${LIBRARY_DIR}/src/transaction_base.cxx"
"${LIBRARY_DIR}/src/row.cxx"
"${LIBRARY_DIR}/src/params.cxx"
"${LIBRARY_DIR}/src/util.cxx"
"${LIBRARY_DIR}/src/version.cxx"
"${LIBRARY_DIR}/src/wait.cxx"
)
# Need to explicitly include each header file, because in the directory include/pqxx there are also files
# like just 'array'. So if including the whole directory with `target_include_directories`, it will make
# conflicts with all includes of <array>.
set (HDRS
"${LIBRARY_DIR}/include/pqxx/array.hxx"
"${LIBRARY_DIR}/include/pqxx/params.hxx"
"${LIBRARY_DIR}/include/pqxx/binarystring.hxx"
"${LIBRARY_DIR}/include/pqxx/composite.hxx"
"${LIBRARY_DIR}/include/pqxx/connection.hxx"
"${LIBRARY_DIR}/include/pqxx/cursor.hxx"
"${LIBRARY_DIR}/include/pqxx/dbtransaction.hxx"
"${LIBRARY_DIR}/include/pqxx/errorhandler.hxx"
"${LIBRARY_DIR}/include/pqxx/except.hxx"
"${LIBRARY_DIR}/include/pqxx/field.hxx"
"${LIBRARY_DIR}/include/pqxx/isolation.hxx"
"${LIBRARY_DIR}/include/pqxx/largeobject.hxx"
"${LIBRARY_DIR}/include/pqxx/nontransaction.hxx"
"${LIBRARY_DIR}/include/pqxx/notification.hxx"
"${LIBRARY_DIR}/include/pqxx/pipeline.hxx"
"${LIBRARY_DIR}/include/pqxx/prepared_statement.hxx"
"${LIBRARY_DIR}/include/pqxx/result.hxx"
"${LIBRARY_DIR}/include/pqxx/robusttransaction.hxx"
"${LIBRARY_DIR}/include/pqxx/row.hxx"
"${LIBRARY_DIR}/include/pqxx/separated_list.hxx"
"${LIBRARY_DIR}/include/pqxx/strconv.hxx"
"${LIBRARY_DIR}/include/pqxx/stream_from.hxx"
"${LIBRARY_DIR}/include/pqxx/stream_to.hxx"
"${LIBRARY_DIR}/include/pqxx/subtransaction.hxx"
"${LIBRARY_DIR}/include/pqxx/transaction.hxx"
"${LIBRARY_DIR}/include/pqxx/transaction_base.hxx"
"${LIBRARY_DIR}/include/pqxx/types.hxx"
"${LIBRARY_DIR}/include/pqxx/util.hxx"
"${LIBRARY_DIR}/include/pqxx/version.hxx"
"${LIBRARY_DIR}/include/pqxx/zview.hxx"
)
add_library(_libpqxx ${SRCS} ${HDRS})
add_library(_libpqxx ${SRCS})
target_link_libraries(_libpqxx PUBLIC ch_contrib::libpq)
target_include_directories (_libpqxx SYSTEM BEFORE PUBLIC "${LIBRARY_DIR}/include")

View File

@ -181,12 +181,6 @@ void SetACLRequest::addRootPath(const String & root_path) { Coordination::addRoo
void GetACLRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void SyncRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void MultiRequest::addRootPath(const String & root_path)
{
for (auto & request : requests)
request->addRootPath(root_path);
}
void CreateResponse::removeRootPath(const String & root_path) { Coordination::removeRootPath(path_created, root_path); }
void WatchResponse::removeRootPath(const String & root_path) { Coordination::removeRootPath(path, root_path); }

View File

@ -408,11 +408,17 @@ struct ReconfigResponse : virtual Response
size_t bytesSize() const override { return value.size() + sizeof(stat); }
};
template <typename T>
struct MultiRequest : virtual Request
{
Requests requests;
std::vector<T> requests;
void addRootPath(const String & root_path) override
{
for (auto & request : requests)
request->addRootPath(root_path);
}
void addRootPath(const String & root_path) override;
String getPath() const override { return {}; }
size_t bytesSize() const override

View File

@ -184,7 +184,7 @@ struct TestKeeperReconfigRequest final : ReconfigRequest, TestKeeperRequest
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
struct TestKeeperMultiRequest final : MultiRequest<RequestPtr>, TestKeeperRequest
{
explicit TestKeeperMultiRequest(const Requests & generic_requests)
: TestKeeperMultiRequest(std::span(generic_requests))

View File

@ -18,14 +18,16 @@ using namespace DB;
void ZooKeeperResponse::write(WriteBuffer & out) const
{
/// Excessive copy to calculate length.
WriteBufferFromOwnString buf;
Coordination::write(xid, buf);
Coordination::write(zxid, buf);
Coordination::write(error, buf);
auto response_size = Coordination::size(xid) + Coordination::size(zxid) + Coordination::size(error);
if (error == Error::ZOK)
writeImpl(buf);
Coordination::write(buf.str(), out);
response_size += sizeImpl();
Coordination::write(static_cast<int32_t>(response_size), out);
Coordination::write(xid, out);
Coordination::write(zxid, out);
Coordination::write(error, out);
if (error == Error::ZOK)
writeImpl(out);
}
std::string ZooKeeperRequest::toString(bool short_format) const
@ -41,12 +43,12 @@ std::string ZooKeeperRequest::toString(bool short_format) const
void ZooKeeperRequest::write(WriteBuffer & out) const
{
/// Excessive copy to calculate length.
WriteBufferFromOwnString buf;
Coordination::write(xid, buf);
Coordination::write(getOpNum(), buf);
writeImpl(buf);
Coordination::write(buf.str(), out);
auto request_size = Coordination::size(xid) + Coordination::size(getOpNum()) + sizeImpl();
Coordination::write(static_cast<int32_t>(request_size), out);
Coordination::write(xid, out);
Coordination::write(getOpNum(), out);
writeImpl(out);
}
void ZooKeeperSyncRequest::writeImpl(WriteBuffer & out) const
@ -54,6 +56,11 @@ void ZooKeeperSyncRequest::writeImpl(WriteBuffer & out) const
Coordination::write(path, out);
}
/// Size of the Sync request body: Coordination::size of `path`.
size_t ZooKeeperSyncRequest::sizeImpl() const
{
    const size_t path_bytes = Coordination::size(path);
    return path_bytes;
}
void ZooKeeperSyncRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
@ -74,6 +81,11 @@ void ZooKeeperSyncResponse::writeImpl(WriteBuffer & out) const
Coordination::write(path, out);
}
/// Size of the Sync response body: Coordination::size of `path`.
size_t ZooKeeperSyncResponse::sizeImpl() const
{
    const size_t path_bytes = Coordination::size(path);
    return path_bytes;
}
void ZooKeeperReconfigRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(joining, out);
@ -82,6 +94,11 @@ void ZooKeeperReconfigRequest::writeImpl(WriteBuffer & out) const
Coordination::write(version, out);
}
/// Size of the Reconfig request body: the summed Coordination::size of
/// `joining`, `leaving`, `new_members` and `version`.
size_t ZooKeeperReconfigRequest::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(joining);
    total += Coordination::size(leaving);
    total += Coordination::size(new_members);
    total += Coordination::size(version);
    return total;
}
void ZooKeeperReconfigRequest::readImpl(ReadBuffer & in)
{
Coordination::read(joining, in);
@ -109,6 +126,11 @@ void ZooKeeperReconfigResponse::writeImpl(WriteBuffer & out) const
Coordination::write(stat, out);
}
/// Size of the Reconfig response body: Coordination::size of `value` plus that of `stat`.
size_t ZooKeeperReconfigResponse::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(value);
    total += Coordination::size(stat);
    return total;
}
void ZooKeeperWatchResponse::readImpl(ReadBuffer & in)
{
Coordination::read(type, in);
@ -123,6 +145,11 @@ void ZooKeeperWatchResponse::writeImpl(WriteBuffer & out) const
Coordination::write(path, out);
}
/// Size of the Watch response body: the summed Coordination::size of
/// `type`, `state` and `path`.
size_t ZooKeeperWatchResponse::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(type);
    total += Coordination::size(state);
    total += Coordination::size(path);
    return total;
}
void ZooKeeperWatchResponse::write(WriteBuffer & out) const
{
if (error == Error::ZOK)
@ -137,6 +164,11 @@ void ZooKeeperAuthRequest::writeImpl(WriteBuffer & out) const
Coordination::write(data, out);
}
/// Size of the Auth request body: the summed Coordination::size of
/// `type`, `scheme` and `data`.
size_t ZooKeeperAuthRequest::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(type);
    total += Coordination::size(scheme);
    total += Coordination::size(data);
    return total;
}
void ZooKeeperAuthRequest::readImpl(ReadBuffer & in)
{
Coordination::read(type, in);
@ -175,6 +207,12 @@ void ZooKeeperCreateRequest::writeImpl(WriteBuffer & out) const
Coordination::write(flags, out);
}
/// Size of the Create request body: the summed Coordination::size of
/// `path`, `data`, `acls` and an int32 flags word.
size_t ZooKeeperCreateRequest::sizeImpl() const
{
    /// A zero-valued stand-in for the flags word, passed only so its size is counted.
    const int32_t flags_placeholder = 0;

    size_t total = 0;
    total += Coordination::size(path);
    total += Coordination::size(data);
    total += Coordination::size(acls);
    total += Coordination::size(flags_placeholder);
    return total;
}
void ZooKeeperCreateRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
@ -211,12 +249,22 @@ void ZooKeeperCreateResponse::writeImpl(WriteBuffer & out) const
Coordination::write(path_created, out);
}
/// Size of the Create response body: Coordination::size of `path_created`.
size_t ZooKeeperCreateResponse::sizeImpl() const
{
    const size_t created_path_bytes = Coordination::size(path_created);
    return created_path_bytes;
}
void ZooKeeperRemoveRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(path, out);
Coordination::write(version, out);
}
/// Size of the Remove request body: Coordination::size of `path` plus that of `version`.
size_t ZooKeeperRemoveRequest::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(path);
    total += Coordination::size(version);
    return total;
}
std::string ZooKeeperRemoveRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format(
@ -244,6 +292,11 @@ void ZooKeeperRemoveRecursiveRequest::readImpl(ReadBuffer & in)
Coordination::read(remove_nodes_limit, in);
}
/// Size of the RemoveRecursive request body: Coordination::size of `path`
/// plus that of `remove_nodes_limit`.
size_t ZooKeeperRemoveRecursiveRequest::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(path);
    total += Coordination::size(remove_nodes_limit);
    return total;
}
std::string ZooKeeperRemoveRecursiveRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format(
@ -259,6 +312,11 @@ void ZooKeeperExistsRequest::writeImpl(WriteBuffer & out) const
Coordination::write(has_watch, out);
}
/// Size of the Exists request body: Coordination::size of `path` plus that of `has_watch`.
size_t ZooKeeperExistsRequest::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(path);
    total += Coordination::size(has_watch);
    return total;
}
void ZooKeeperExistsRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
@ -280,12 +338,22 @@ void ZooKeeperExistsResponse::writeImpl(WriteBuffer & out) const
Coordination::write(stat, out);
}
/// Size of the Exists response body: Coordination::size of `stat`.
size_t ZooKeeperExistsResponse::sizeImpl() const
{
    const size_t stat_bytes = Coordination::size(stat);
    return stat_bytes;
}
void ZooKeeperGetRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(path, out);
Coordination::write(has_watch, out);
}
/// Size of the Get request body: Coordination::size of `path` plus that of `has_watch`.
size_t ZooKeeperGetRequest::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(path);
    total += Coordination::size(has_watch);
    return total;
}
void ZooKeeperGetRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
@ -309,6 +377,11 @@ void ZooKeeperGetResponse::writeImpl(WriteBuffer & out) const
Coordination::write(stat, out);
}
/// Size of the Get response body: Coordination::size of `data` plus that of `stat`.
size_t ZooKeeperGetResponse::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(data);
    total += Coordination::size(stat);
    return total;
}
void ZooKeeperSetRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(path, out);
@ -316,6 +389,11 @@ void ZooKeeperSetRequest::writeImpl(WriteBuffer & out) const
Coordination::write(version, out);
}
/// Size of the Set request body: the summed Coordination::size of
/// `path`, `data` and `version`.
size_t ZooKeeperSetRequest::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(path);
    total += Coordination::size(data);
    total += Coordination::size(version);
    return total;
}
void ZooKeeperSetRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
@ -342,12 +420,22 @@ void ZooKeeperSetResponse::writeImpl(WriteBuffer & out) const
Coordination::write(stat, out);
}
/// Size of the Set response body: Coordination::size of `stat`.
size_t ZooKeeperSetResponse::sizeImpl() const
{
    const size_t stat_bytes = Coordination::size(stat);
    return stat_bytes;
}
void ZooKeeperListRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(path, out);
Coordination::write(has_watch, out);
}
/// Size of the List request body: Coordination::size of `path` plus that of `has_watch`.
size_t ZooKeeperListRequest::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(path);
    total += Coordination::size(has_watch);
    return total;
}
void ZooKeeperListRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
@ -366,6 +454,11 @@ void ZooKeeperFilteredListRequest::writeImpl(WriteBuffer & out) const
Coordination::write(static_cast<uint8_t>(list_request_type), out);
}
/// Size of the FilteredList request body: the summed Coordination::size of
/// `path`, `has_watch` and `list_request_type` narrowed to uint8_t.
size_t ZooKeeperFilteredListRequest::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(path);
    total += Coordination::size(has_watch);
    total += Coordination::size(static_cast<uint8_t>(list_request_type));
    return total;
}
void ZooKeeperFilteredListRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
@ -397,6 +490,11 @@ void ZooKeeperListResponse::writeImpl(WriteBuffer & out) const
Coordination::write(stat, out);
}
/// Size of the List response body: Coordination::size of `names` plus that of `stat`.
size_t ZooKeeperListResponse::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(names);
    total += Coordination::size(stat);
    return total;
}
void ZooKeeperSimpleListResponse::readImpl(ReadBuffer & in)
{
Coordination::read(names, in);
@ -407,6 +505,11 @@ void ZooKeeperSimpleListResponse::writeImpl(WriteBuffer & out) const
Coordination::write(names, out);
}
/// Size of the SimpleList response body: Coordination::size of `names`.
size_t ZooKeeperSimpleListResponse::sizeImpl() const
{
    const size_t names_bytes = Coordination::size(names);
    return names_bytes;
}
void ZooKeeperSetACLRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(path, out);
@ -414,6 +517,11 @@ void ZooKeeperSetACLRequest::writeImpl(WriteBuffer & out) const
Coordination::write(version, out);
}
/// Size of the SetACL request body: the summed Coordination::size of
/// `path`, `acls` and `version`.
size_t ZooKeeperSetACLRequest::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(path);
    total += Coordination::size(acls);
    total += Coordination::size(version);
    return total;
}
void ZooKeeperSetACLRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
@ -431,6 +539,11 @@ void ZooKeeperSetACLResponse::writeImpl(WriteBuffer & out) const
Coordination::write(stat, out);
}
/// Size of the SetACL response body: Coordination::size of `stat`.
size_t ZooKeeperSetACLResponse::sizeImpl() const
{
    const size_t stat_bytes = Coordination::size(stat);
    return stat_bytes;
}
void ZooKeeperSetACLResponse::readImpl(ReadBuffer & in)
{
Coordination::read(stat, in);
@ -446,6 +559,11 @@ void ZooKeeperGetACLRequest::writeImpl(WriteBuffer & out) const
Coordination::write(path, out);
}
/// Size of the GetACL request body: Coordination::size of `path`.
size_t ZooKeeperGetACLRequest::sizeImpl() const
{
    const size_t path_bytes = Coordination::size(path);
    return path_bytes;
}
std::string ZooKeeperGetACLRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format("path = {}", path);
@ -457,6 +575,11 @@ void ZooKeeperGetACLResponse::writeImpl(WriteBuffer & out) const
Coordination::write(stat, out);
}
/// Size of the GetACL response body: Coordination::size of `acl` plus that of `stat`.
size_t ZooKeeperGetACLResponse::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(acl);
    total += Coordination::size(stat);
    return total;
}
void ZooKeeperGetACLResponse::readImpl(ReadBuffer & in)
{
Coordination::read(acl, in);
@ -469,6 +592,11 @@ void ZooKeeperCheckRequest::writeImpl(WriteBuffer & out) const
Coordination::write(version, out);
}
/// Size of the Check request body: Coordination::size of `path` plus that of `version`.
size_t ZooKeeperCheckRequest::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(path);
    total += Coordination::size(version);
    return total;
}
void ZooKeeperCheckRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
@ -494,6 +622,11 @@ void ZooKeeperErrorResponse::writeImpl(WriteBuffer & out) const
Coordination::write(error, out);
}
size_t ZooKeeperErrorResponse::sizeImpl() const
{
return Coordination::size(error);
}
void ZooKeeperMultiRequest::checkOperationType(OperationType type)
{
chassert(!operation_type.has_value() || *operation_type == type);
@ -596,6 +729,27 @@ void ZooKeeperMultiRequest::writeImpl(WriteBuffer & out) const
Coordination::write(error, out);
}
size_t ZooKeeperMultiRequest::sizeImpl() const
{
size_t total_size = 0;
for (const auto & request : requests)
{
const auto & zk_request = dynamic_cast<const ZooKeeperRequest &>(*request);
bool done = false;
int32_t error = -1;
total_size
+= Coordination::size(zk_request.getOpNum()) + Coordination::size(done) + Coordination::size(error) + zk_request.sizeImpl();
}
OpNum op_num = OpNum::Error;
bool done = true;
int32_t error = -1;
return total_size + Coordination::size(op_num) + Coordination::size(done) + Coordination::size(error);
}
void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
{
while (true)
@ -729,31 +883,54 @@ void ZooKeeperMultiResponse::writeImpl(WriteBuffer & out) const
}
}
ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperHeartbeatResponse>()); }
ZooKeeperResponsePtr ZooKeeperSyncRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSyncResponse>()); }
ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperAuthResponse>()); }
ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperRemoveResponse>()); }
ZooKeeperResponsePtr ZooKeeperRemoveRecursiveRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperRemoveRecursiveResponse>()); }
ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperExistsResponse>()); }
ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperGetResponse>()); }
ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetResponse>()); }
ZooKeeperResponsePtr ZooKeeperReconfigRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperReconfigResponse>()); }
ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperListResponse>()); }
ZooKeeperResponsePtr ZooKeeperSimpleListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSimpleListResponse>()); }
size_t ZooKeeperMultiResponse::sizeImpl() const
{
size_t total_size = 0;
for (const auto & response : responses)
{
const ZooKeeperResponse & zk_response = dynamic_cast<const ZooKeeperResponse &>(*response);
OpNum op_num = zk_response.getOpNum();
bool done = false;
Error op_error = zk_response.error;
total_size += Coordination::size(op_num) + Coordination::size(done) + Coordination::size(op_error);
if (op_error == Error::ZOK || op_num == OpNum::Error)
total_size += zk_response.sizeImpl();
}
/// Footer.
OpNum op_num = OpNum::Error;
bool done = true;
int32_t error_read = - 1;
return total_size + Coordination::size(op_num) + Coordination::size(done) + Coordination::size(error_read);
}
ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return std::make_shared<ZooKeeperHeartbeatResponse>(); }
ZooKeeperResponsePtr ZooKeeperSyncRequest::makeResponse() const { return std::make_shared<ZooKeeperSyncResponse>(); }
ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return std::make_shared<ZooKeeperAuthResponse>(); }
ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return std::make_shared<ZooKeeperRemoveResponse>(); }
ZooKeeperResponsePtr ZooKeeperRemoveRecursiveRequest::makeResponse() const { return std::make_shared<ZooKeeperRemoveRecursiveResponse>(); }
ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return std::make_shared<ZooKeeperExistsResponse>(); }
ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return std::make_shared<ZooKeeperGetResponse>(); }
ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return std::make_shared<ZooKeeperSetResponse>(); }
ZooKeeperResponsePtr ZooKeeperReconfigRequest::makeResponse() const { return std::make_shared<ZooKeeperReconfigResponse>(); }
ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return std::make_shared<ZooKeeperListResponse>(); }
ZooKeeperResponsePtr ZooKeeperSimpleListRequest::makeResponse() const { return std::make_shared<ZooKeeperSimpleListResponse>(); }
ZooKeeperResponsePtr ZooKeeperCreateRequest::makeResponse() const
{
if (not_exists)
return setTime(std::make_shared<ZooKeeperCreateIfNotExistsResponse>());
return setTime(std::make_shared<ZooKeeperCreateResponse>());
return std::make_shared<ZooKeeperCreateIfNotExistsResponse>();
return std::make_shared<ZooKeeperCreateResponse>();
}
ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const
{
if (not_exists)
return setTime(std::make_shared<ZooKeeperCheckNotExistsResponse>());
return std::make_shared<ZooKeeperCheckNotExistsResponse>();
return setTime(std::make_shared<ZooKeeperCheckResponse>());
return std::make_shared<ZooKeeperCheckResponse>();
}
ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const
@ -764,11 +941,12 @@ ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const
else
response = std::make_shared<ZooKeeperMultiReadResponse>(requests);
return setTime(std::move(response));
return std::move(response);
}
ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperCloseResponse>()); }
ZooKeeperResponsePtr ZooKeeperSetACLRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetACLResponse>()); }
ZooKeeperResponsePtr ZooKeeperGetACLRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperGetACLResponse>()); }
ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return std::make_shared<ZooKeeperCloseResponse>(); }
ZooKeeperResponsePtr ZooKeeperSetACLRequest::makeResponse() const { return std::make_shared<ZooKeeperSetACLResponse>(); }
ZooKeeperResponsePtr ZooKeeperGetACLRequest::makeResponse() const { return std::make_shared<ZooKeeperGetACLResponse>(); }
void ZooKeeperSessionIDRequest::writeImpl(WriteBuffer & out) const
{
@ -777,6 +955,11 @@ void ZooKeeperSessionIDRequest::writeImpl(WriteBuffer & out) const
Coordination::write(server_id, out);
}
/// Size of the SessionID request body: the summed Coordination::size of
/// `internal_id`, `session_timeout_ms` and `server_id`.
size_t ZooKeeperSessionIDRequest::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(internal_id);
    total += Coordination::size(session_timeout_ms);
    total += Coordination::size(server_id);
    return total;
}
void ZooKeeperSessionIDRequest::readImpl(ReadBuffer & in)
{
Coordination::read(internal_id, in);
@ -803,6 +986,11 @@ void ZooKeeperSessionIDResponse::writeImpl(WriteBuffer & out) const
Coordination::write(server_id, out);
}
/// Size of the SessionID response body: the summed Coordination::size of
/// `internal_id`, `session_id` and `server_id`.
size_t ZooKeeperSessionIDResponse::sizeImpl() const
{
    size_t total = 0;
    total += Coordination::size(internal_id);
    total += Coordination::size(session_id);
    total += Coordination::size(server_id);
    return total;
}
void ZooKeeperRequest::createLogElements(LogElements & elems) const
{
@ -960,40 +1148,6 @@ std::shared_ptr<ZooKeeperRequest> ZooKeeperRequest::read(ReadBuffer & in)
return request;
}
ZooKeeperRequest::~ZooKeeperRequest()
{
if (!request_created_time_ns)
return;
UInt64 elapsed_ns = clock_gettime_ns() - request_created_time_ns;
constexpr UInt64 max_request_time_ns = 1000000000ULL; /// 1 sec
if (max_request_time_ns < elapsed_ns)
{
LOG_TEST(getLogger(__PRETTY_FUNCTION__), "Processing of request xid={} took {} ms", xid, elapsed_ns / 1000000UL);
}
}
ZooKeeperResponsePtr ZooKeeperRequest::setTime(ZooKeeperResponsePtr response) const
{
if (request_created_time_ns)
{
response->response_created_time_ns = clock_gettime_ns();
}
return response;
}
ZooKeeperResponse::~ZooKeeperResponse()
{
if (!response_created_time_ns)
return;
UInt64 elapsed_ns = clock_gettime_ns() - response_created_time_ns;
constexpr UInt64 max_request_time_ns = 1000000000ULL; /// 1 sec
if (max_request_time_ns < elapsed_ns)
{
LOG_TEST(getLogger(__PRETTY_FUNCTION__), "Processing of response xid={} took {} ms", xid, elapsed_ns / 1000000UL);
}
}
ZooKeeperRequestPtr ZooKeeperRequestFactory::get(OpNum op_num) const
{
auto it = op_num_to_request.find(op_num);
@ -1015,7 +1169,6 @@ void registerZooKeeperRequest(ZooKeeperRequestFactory & factory)
factory.registerRequest(num, []
{
auto res = std::make_shared<RequestT>();
res->request_created_time_ns = clock_gettime_ns();
if constexpr (num == OpNum::MultiRead)
res->operation_type = ZooKeeperMultiRequest::OperationType::Read;

View File

@ -7,13 +7,11 @@
#include <boost/noncopyable.hpp>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <unordered_map>
#include <vector>
#include <memory>
#include <cstdint>
#include <optional>
#include <functional>
#include <span>
namespace Coordination
@ -25,13 +23,11 @@ struct ZooKeeperResponse : virtual Response
{
XID xid = 0;
UInt64 response_created_time_ns = 0;
ZooKeeperResponse() = default;
ZooKeeperResponse(const ZooKeeperResponse &) = default;
~ZooKeeperResponse() override;
virtual void readImpl(ReadBuffer &) = 0;
virtual void writeImpl(WriteBuffer &) const = 0;
virtual size_t sizeImpl() const = 0;
virtual void write(WriteBuffer & out) const;
virtual OpNum getOpNum() const = 0;
virtual void fillLogElements(LogElements & elems, size_t idx) const;
@ -51,13 +47,11 @@ struct ZooKeeperRequest : virtual Request
bool restored_from_zookeeper_log = false;
UInt64 request_created_time_ns = 0;
UInt64 thread_id = 0;
String query_id;
ZooKeeperRequest() = default;
ZooKeeperRequest(const ZooKeeperRequest &) = default;
~ZooKeeperRequest() override;
virtual OpNum getOpNum() const = 0;
@ -66,6 +60,7 @@ struct ZooKeeperRequest : virtual Request
std::string toString(bool short_format = false) const;
virtual void writeImpl(WriteBuffer &) const = 0;
virtual size_t sizeImpl() const = 0;
virtual void readImpl(ReadBuffer &) = 0;
virtual std::string toStringImpl(bool /*short_format*/) const { return ""; }
@ -73,7 +68,6 @@ struct ZooKeeperRequest : virtual Request
static std::shared_ptr<ZooKeeperRequest> read(ReadBuffer & in);
virtual ZooKeeperResponsePtr makeResponse() const = 0;
ZooKeeperResponsePtr setTime(ZooKeeperResponsePtr response) const;
virtual bool isReadRequest() const = 0;
virtual void createLogElements(LogElements & elems) const;
@ -86,6 +80,7 @@ struct ZooKeeperHeartbeatRequest final : ZooKeeperRequest
String getPath() const override { return {}; }
OpNum getOpNum() const override { return OpNum::Heartbeat; }
void writeImpl(WriteBuffer &) const override {}
size_t sizeImpl() const override { return 0; }
void readImpl(ReadBuffer &) override {}
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
@ -97,6 +92,7 @@ struct ZooKeeperSyncRequest final : ZooKeeperRequest
String getPath() const override { return path; }
OpNum getOpNum() const override { return OpNum::Sync; }
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
@ -109,6 +105,7 @@ struct ZooKeeperSyncResponse final : SyncResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
OpNum getOpNum() const override { return OpNum::Sync; }
};
@ -122,6 +119,7 @@ struct ZooKeeperReconfigRequest final : ZooKeeperRequest
String getPath() const override { return keeper_config_path; }
OpNum getOpNum() const override { return OpNum::Reconfig; }
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
@ -138,6 +136,7 @@ struct ZooKeeperReconfigResponse final : ReconfigResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
OpNum getOpNum() const override { return OpNum::Reconfig; }
};
@ -145,6 +144,7 @@ struct ZooKeeperHeartbeatResponse final : ZooKeeperResponse
{
void readImpl(ReadBuffer &) override {}
void writeImpl(WriteBuffer &) const override {}
size_t sizeImpl() const override { return 0; }
OpNum getOpNum() const override { return OpNum::Heartbeat; }
};
@ -153,6 +153,7 @@ struct ZooKeeperWatchResponse final : WatchResponse, ZooKeeperResponse
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void write(WriteBuffer & out) const override;
@ -175,6 +176,7 @@ struct ZooKeeperAuthRequest final : ZooKeeperRequest
String getPath() const override { return {}; }
OpNum getOpNum() const override { return OpNum::Auth; }
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
@ -189,6 +191,7 @@ struct ZooKeeperAuthResponse final : ZooKeeperResponse
{
void readImpl(ReadBuffer &) override {}
void writeImpl(WriteBuffer &) const override {}
size_t sizeImpl() const override { return 0; }
OpNum getOpNum() const override { return OpNum::Auth; }
@ -200,6 +203,7 @@ struct ZooKeeperCloseRequest final : ZooKeeperRequest
String getPath() const override { return {}; }
OpNum getOpNum() const override { return OpNum::Close; }
void writeImpl(WriteBuffer &) const override {}
size_t sizeImpl() const override { return 0; }
void readImpl(ReadBuffer &) override {}
ZooKeeperResponsePtr makeResponse() const override;
@ -214,6 +218,7 @@ struct ZooKeeperCloseResponse final : ZooKeeperResponse
}
void writeImpl(WriteBuffer &) const override {}
size_t sizeImpl() const override { return 0; }
OpNum getOpNum() const override { return OpNum::Close; }
};
@ -228,6 +233,7 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest
OpNum getOpNum() const override { return not_exists ? OpNum::CreateIfNotExists : OpNum::Create; }
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
@ -244,6 +250,7 @@ struct ZooKeeperCreateResponse : CreateResponse, ZooKeeperResponse
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
OpNum getOpNum() const override { return OpNum::Create; }
@ -265,6 +272,7 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Remove; }
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
@ -280,6 +288,7 @@ struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer &) override {}
void writeImpl(WriteBuffer &) const override {}
size_t sizeImpl() const override { return 0; }
OpNum getOpNum() const override { return OpNum::Remove; }
size_t bytesSize() const override { return RemoveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
@ -293,6 +302,7 @@ struct ZooKeeperRemoveRecursiveRequest final : RemoveRecursiveRequest, ZooKeeper
OpNum getOpNum() const override { return OpNum::RemoveRecursive; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
size_t sizeImpl() const override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
@ -305,6 +315,7 @@ struct ZooKeeperRemoveRecursiveResponse : RemoveRecursiveResponse, ZooKeeperResp
{
void readImpl(ReadBuffer &) override {}
void writeImpl(WriteBuffer &) const override {}
size_t sizeImpl() const override { return 0; }
OpNum getOpNum() const override { return OpNum::RemoveRecursive; }
size_t bytesSize() const override { return RemoveRecursiveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
@ -317,6 +328,7 @@ struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Exists; }
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
@ -330,6 +342,7 @@ struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
OpNum getOpNum() const override { return OpNum::Exists; }
size_t bytesSize() const override { return ExistsResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
@ -344,6 +357,7 @@ struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Get; }
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
@ -357,6 +371,7 @@ struct ZooKeeperGetResponse final : GetResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
OpNum getOpNum() const override { return OpNum::Get; }
size_t bytesSize() const override { return GetResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
@ -371,6 +386,7 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Set; }
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
@ -385,6 +401,7 @@ struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
OpNum getOpNum() const override { return OpNum::Set; }
size_t bytesSize() const override { return SetResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
@ -399,6 +416,7 @@ struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::List; }
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
@ -419,6 +437,7 @@ struct ZooKeeperFilteredListRequest final : ZooKeeperListRequest
OpNum getOpNum() const override { return OpNum::FilteredList; }
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
@ -429,6 +448,7 @@ struct ZooKeeperListResponse : ListResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
OpNum getOpNum() const override { return OpNum::List; }
size_t bytesSize() const override { return ListResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
@ -440,6 +460,7 @@ struct ZooKeeperSimpleListResponse final : ZooKeeperListResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
OpNum getOpNum() const override { return OpNum::SimpleList; }
size_t bytesSize() const override { return ZooKeeperListResponse::bytesSize() - sizeof(stat); }
@ -452,6 +473,7 @@ struct ZooKeeperCheckRequest : CheckRequest, ZooKeeperRequest
OpNum getOpNum() const override { return not_exists ? OpNum::CheckNotExists : OpNum::Check; }
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
@ -467,6 +489,7 @@ struct ZooKeeperCheckResponse : CheckResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer &) override {}
void writeImpl(WriteBuffer &) const override {}
size_t sizeImpl() const override { return 0; }
OpNum getOpNum() const override { return OpNum::Check; }
size_t bytesSize() const override { return CheckResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
@ -483,6 +506,7 @@ struct ZooKeeperErrorResponse final : ErrorResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
OpNum getOpNum() const override { return OpNum::Error; }
@ -493,6 +517,7 @@ struct ZooKeeperSetACLRequest final : SetACLRequest, ZooKeeperRequest
{
OpNum getOpNum() const override { return OpNum::SetACL; }
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
@ -505,6 +530,7 @@ struct ZooKeeperSetACLResponse final : SetACLResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
OpNum getOpNum() const override { return OpNum::SetACL; }
size_t bytesSize() const override { return SetACLResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
@ -514,6 +540,7 @@ struct ZooKeeperGetACLRequest final : GetACLRequest, ZooKeeperRequest
{
OpNum getOpNum() const override { return OpNum::GetACL; }
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
@ -526,12 +553,13 @@ struct ZooKeeperGetACLResponse final : GetACLResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
OpNum getOpNum() const override { return OpNum::GetACL; }
size_t bytesSize() const override { return GetACLResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
struct ZooKeeperMultiRequest final : MultiRequest<ZooKeeperRequestPtr>, ZooKeeperRequest
{
OpNum getOpNum() const override;
ZooKeeperMultiRequest() = default;
@ -540,6 +568,7 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
ZooKeeperMultiRequest(std::span<const Coordination::RequestPtr> generic_requests, const ACLs & default_acls);
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
@ -563,12 +592,14 @@ private:
struct ZooKeeperMultiResponse : MultiResponse, ZooKeeperResponse
{
explicit ZooKeeperMultiResponse(const Requests & requests)
ZooKeeperMultiResponse() = default;
explicit ZooKeeperMultiResponse(const std::vector<ZooKeeperRequestPtr> & requests)
{
responses.reserve(requests.size());
for (const auto & request : requests)
responses.emplace_back(dynamic_cast<const ZooKeeperRequest &>(*request).makeResponse());
responses.emplace_back(request->makeResponse());
}
explicit ZooKeeperMultiResponse(const Responses & responses_)
@ -579,6 +610,7 @@ struct ZooKeeperMultiResponse : MultiResponse, ZooKeeperResponse
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
size_t bytesSize() const override { return MultiResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
@ -609,6 +641,7 @@ struct ZooKeeperSessionIDRequest final : ZooKeeperRequest
Coordination::OpNum getOpNum() const override { return OpNum::SessionID; }
String getPath() const override { return {}; }
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
Coordination::ZooKeeperResponsePtr makeResponse() const override;
@ -627,6 +660,7 @@ struct ZooKeeperSessionIDResponse final : ZooKeeperResponse
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
Coordination::OpNum getOpNum() const override { return OpNum::SessionID; }
};

View File

@ -42,6 +42,32 @@ void write(const Error & x, WriteBuffer & out)
write(static_cast<int32_t>(x), out);
}
/// Size of an OpNum: the operation number is measured through its int32_t representation.
size_t size(OpNum x)
{
    const auto raw_op_num = static_cast<int32_t>(x);
    return size(raw_op_num);
}
/// Size of a string: the size of an int32_t length prefix plus the number of bytes in `s`.
size_t size(const std::string & s)
{
    const size_t payload_bytes = s.size();
    const size_t prefix_bytes = size(static_cast<int32_t>(payload_bytes));
    return prefix_bytes + payload_bytes;
}
/// Size of an ACL entry: the sum of the sizes of its permissions, scheme and id fields.
size_t size(const ACL & acl)
{
    size_t total = size(acl.permissions);
    total += size(acl.scheme);
    total += size(acl.id);
    return total;
}
/// Size of a Stat structure: the sum of the sizes of all eleven fields, taken in declaration order below.
size_t size(const Stat & stat)
{
    size_t total = 0;
    total += size(stat.czxid);
    total += size(stat.mzxid);
    total += size(stat.ctime);
    total += size(stat.mtime);
    total += size(stat.version);
    total += size(stat.cversion);
    total += size(stat.aversion);
    total += size(stat.ephemeralOwner);
    total += size(stat.dataLength);
    total += size(stat.numChildren);
    total += size(stat.pzxid);
    return total;
}
/// Size of an Error code: the code is measured through its int32_t representation.
size_t size(const Error & x)
{
    const auto raw_error = static_cast<int32_t>(x);
    return size(raw_error);
}
void read(OpNum & x, ReadBuffer & in)
{
int32_t raw_op_num;

View File

@ -43,6 +43,36 @@ void write(const std::vector<T> & arr, WriteBuffer & out)
write(elem, out);
}
/// Size of an arithmetic value: simply the size of its type in bytes.
/// The value itself is never inspected, so the parameter is left unnamed.
template <typename T>
requires is_arithmetic_v<T>
size_t size(T /*x*/)
{
    return sizeof(T);
}
/// Overloads of size() for non-arithmetic types; only declared here, the bodies are not part of this header.
/// OpNum and Error are measured as int32_t; a string as an int32_t length prefix plus its bytes;
/// ACL and Stat as the sum of the sizes of their fields.
size_t size(OpNum x);
size_t size(const std::string & s);
size_t size(const ACL & acl);
size_t size(const Stat & stat);
size_t size(const Error & x);
/// Size of a fixed-length char array: the size of an int32_t length prefix plus the N bytes of the array.
/// Only the compile-time length N is used and the contents are never read, so the array is taken
/// by const reference instead of by value to avoid copying N bytes on every call.
template <size_t N>
size_t size(const std::array<char, N> &)
{
    return size(static_cast<int32_t>(N)) + N;
}
template <typename T>
size_t size(const std::vector<T> & arr)
{
size_t total_size = size(static_cast<int32_t>(arr.size()));
for (const auto & elem : arr)
total_size += size(elem);
return total_size;
}
template <typename T>
requires is_arithmetic_v<T>
void read(T & x, ReadBuffer & in)

View File

@ -45,6 +45,7 @@ uint64_t ACLMap::convertACLs(const Coordination::ACLs & acls)
if (acls.empty())
return 0;
std::lock_guard lock(map_mutex);
if (acl_to_num.contains(acls))
return acl_to_num[acls];
@ -62,6 +63,7 @@ Coordination::ACLs ACLMap::convertNumber(uint64_t acls_id) const
if (acls_id == 0)
return Coordination::ACLs{};
std::lock_guard lock(map_mutex);
if (!num_to_acl.contains(acls_id))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown ACL id {}. It's a bug", acls_id);
@ -70,6 +72,7 @@ Coordination::ACLs ACLMap::convertNumber(uint64_t acls_id) const
void ACLMap::addMapping(uint64_t acls_id, const Coordination::ACLs & acls)
{
std::lock_guard lock(map_mutex);
num_to_acl[acls_id] = acls;
acl_to_num[acls] = acls_id;
max_acl_id = std::max(acls_id + 1, max_acl_id); /// max_acl_id pointer next slot
@ -77,11 +80,13 @@ void ACLMap::addMapping(uint64_t acls_id, const Coordination::ACLs & acls)
/// Increments the usage counter stored for `acl_id`; map_mutex is held for the duration of the update.
void ACLMap::addUsage(uint64_t acl_id)
{
    std::lock_guard guard(map_mutex);
    ++usage_counter[acl_id];
}
void ACLMap::removeUsage(uint64_t acl_id)
{
std::lock_guard lock(map_mutex);
if (!usage_counter.contains(acl_id))
return;

View File

@ -32,6 +32,8 @@ private:
NumToACLMap num_to_acl;
UsageCounter usage_counter;
uint64_t max_acl_id{1};
mutable std::mutex map_mutex;
public:
/// Convert ACL to number. If it's a new ACL, then adds it to the map

View File

@ -301,11 +301,13 @@ String MonitorCommand::run()
print(ret, "server_state", keeper_info.getRole());
print(ret, "znode_count", state_machine.getNodesCount());
print(ret, "watch_count", state_machine.getTotalWatchesCount());
print(ret, "ephemerals_count", state_machine.getTotalEphemeralNodesCount());
print(ret, "approximate_data_size", state_machine.getApproximateDataSize());
print(ret, "key_arena_size", state_machine.getKeyArenaSize());
const auto & storage_stats = state_machine.getStorageStats();
print(ret, "znode_count", storage_stats.nodes_count.load(std::memory_order_relaxed));
print(ret, "watch_count", storage_stats.total_watches_count.load(std::memory_order_relaxed));
print(ret, "ephemerals_count", storage_stats.total_emphemeral_nodes_count.load(std::memory_order_relaxed));
print(ret, "approximate_data_size", storage_stats.approximate_data_size.load(std::memory_order_relaxed));
print(ret, "key_arena_size", 0);
print(ret, "latest_snapshot_size", state_machine.getLatestSnapshotSize());
#if defined(OS_LINUX) || defined(OS_DARWIN)
@ -387,6 +389,7 @@ String ServerStatCommand::run()
auto & stats = keeper_dispatcher.getKeeperConnectionStats();
Keeper4LWInfo keeper_info = keeper_dispatcher.getKeeper4LWInfo();
const auto & storage_stats = keeper_dispatcher.getStateMachine().getStorageStats();
write("ClickHouse Keeper version", String(VERSION_DESCRIBE) + "-" + VERSION_GITHASH);
@ -398,9 +401,9 @@ String ServerStatCommand::run()
write("Sent", toString(stats.getPacketsSent()));
write("Connections", toString(keeper_info.alive_connections_count));
write("Outstanding", toString(keeper_info.outstanding_requests_count));
write("Zxid", formatZxid(keeper_info.last_zxid));
write("Zxid", formatZxid(storage_stats.last_zxid.load(std::memory_order_relaxed)));
write("Mode", keeper_info.getRole());
write("Node count", toString(keeper_info.total_nodes_count));
write("Node count", toString(storage_stats.nodes_count.load(std::memory_order_relaxed)));
return buf.str();
}
@ -416,6 +419,7 @@ String StatCommand::run()
auto & stats = keeper_dispatcher.getKeeperConnectionStats();
Keeper4LWInfo keeper_info = keeper_dispatcher.getKeeper4LWInfo();
const auto & storage_stats = keeper_dispatcher.getStateMachine().getStorageStats();
write("ClickHouse Keeper version", String(VERSION_DESCRIBE) + "-" + VERSION_GITHASH);
@ -431,9 +435,9 @@ String StatCommand::run()
write("Sent", toString(stats.getPacketsSent()));
write("Connections", toString(keeper_info.alive_connections_count));
write("Outstanding", toString(keeper_info.outstanding_requests_count));
write("Zxid", formatZxid(keeper_info.last_zxid));
write("Zxid", formatZxid(storage_stats.last_zxid.load(std::memory_order_relaxed)));
write("Mode", keeper_info.getRole());
write("Node count", toString(keeper_info.total_nodes_count));
write("Node count", toString(storage_stats.nodes_count.load(std::memory_order_relaxed)));
return buf.str();
}

View File

@ -1,7 +1,5 @@
#pragma once
#include <string>
#include <base/types.h>
#include <Common/Exception.h>
@ -30,9 +28,6 @@ struct Keeper4LWInfo
uint64_t follower_count;
uint64_t synced_follower_count;
uint64_t total_nodes_count;
int64_t last_zxid;
String getRole() const
{
if (is_standalone)

View File

@ -38,15 +38,16 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM
is_follower = static_cast<size_t>(keeper_info.is_follower);
is_exceeding_mem_soft_limit = static_cast<size_t>(keeper_info.is_exceeding_mem_soft_limit);
zxid = keeper_info.last_zxid;
const auto & state_machine = keeper_dispatcher.getStateMachine();
znode_count = state_machine.getNodesCount();
watch_count = state_machine.getTotalWatchesCount();
ephemerals_count = state_machine.getTotalEphemeralNodesCount();
approximate_data_size = state_machine.getApproximateDataSize();
key_arena_size = state_machine.getKeyArenaSize();
session_with_watches = state_machine.getSessionsWithWatchesCount();
paths_watched = state_machine.getWatchedPathsCount();
const auto & storage_stats = state_machine.getStorageStats();
zxid = storage_stats.last_zxid.load(std::memory_order_relaxed);
znode_count = storage_stats.nodes_count.load(std::memory_order_relaxed);
watch_count = storage_stats.total_watches_count.load(std::memory_order_relaxed);
ephemerals_count = storage_stats.total_emphemeral_nodes_count.load(std::memory_order_relaxed);
approximate_data_size = storage_stats.approximate_data_size.load(std::memory_order_relaxed);
key_arena_size = 0;
session_with_watches = storage_stats.sessions_with_watches_count.load(std::memory_order_relaxed);
paths_watched = storage_stats.watched_paths_count.load(std::memory_order_relaxed);
# if defined(__linux__) || defined(__APPLE__)
open_file_descriptor_count = getCurrentProcessFDCount();

View File

@ -305,7 +305,7 @@ void KeeperDispatcher::requestThread()
if (has_read_request)
{
if (server->isLeaderAlive())
server->putLocalReadRequest(request);
server->putLocalReadRequest({request});
else
addErrorResponses({request}, Coordination::Error::ZCONNECTIONLOSS);
}

View File

@ -1207,8 +1207,6 @@ Keeper4LWInfo KeeperServer::getPartiallyFilled4LWInfo() const
result.synced_follower_count = getSyncedFollowerCount();
}
result.is_exceeding_mem_soft_limit = isExceedingMemorySoftLimit();
result.total_nodes_count = getKeeperStateMachine()->getNodesCount();
result.last_zxid = getKeeperStateMachine()->getLastProcessedZxid();
return result;
}

View File

@ -78,20 +78,20 @@ namespace
writeBinary(false, out);
/// Serialize stat
writeBinary(node.czxid, out);
writeBinary(node.mzxid, out);
writeBinary(node.ctime(), out);
writeBinary(node.mtime, out);
writeBinary(node.version, out);
writeBinary(node.cversion, out);
writeBinary(node.aversion, out);
writeBinary(node.ephemeralOwner(), out);
writeBinary(node.stats.czxid, out);
writeBinary(node.stats.mzxid, out);
writeBinary(node.stats.ctime(), out);
writeBinary(node.stats.mtime, out);
writeBinary(node.stats.version, out);
writeBinary(node.stats.cversion, out);
writeBinary(node.stats.aversion, out);
writeBinary(node.stats.ephemeralOwner(), out);
if (version < SnapshotVersion::V6)
writeBinary(static_cast<int32_t>(node.getData().size()), out);
writeBinary(node.numChildren(), out);
writeBinary(node.pzxid, out);
writeBinary(static_cast<int32_t>(node.stats.data_size), out);
writeBinary(node.stats.numChildren(), out);
writeBinary(node.stats.pzxid, out);
writeBinary(node.seqNum(), out);
writeBinary(node.stats.seqNum(), out);
if (version >= SnapshotVersion::V4 && version <= SnapshotVersion::V5)
writeBinary(node.sizeInBytes(), out);
@ -100,11 +100,11 @@ namespace
template<typename Node>
void readNode(Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map)
{
readVarUInt(node.data_size, in);
if (node.data_size != 0)
readVarUInt(node.stats.data_size, in);
if (node.stats.data_size != 0)
{
node.data = std::unique_ptr<char[]>(new char[node.data_size]);
in.readStrict(node.data.get(), node.data_size);
node.data = std::unique_ptr<char[]>(new char[node.stats.data_size]);
in.readStrict(node.data.get(), node.stats.data_size);
}
if (version >= SnapshotVersion::V1)
@ -141,19 +141,19 @@ namespace
}
/// Deserialize stat
readBinary(node.czxid, in);
readBinary(node.mzxid, in);
readBinary(node.stats.czxid, in);
readBinary(node.stats.mzxid, in);
int64_t ctime;
readBinary(ctime, in);
node.setCtime(ctime);
readBinary(node.mtime, in);
readBinary(node.version, in);
readBinary(node.cversion, in);
readBinary(node.aversion, in);
node.stats.setCtime(ctime);
readBinary(node.stats.mtime, in);
readBinary(node.stats.version, in);
readBinary(node.stats.cversion, in);
readBinary(node.stats.aversion, in);
int64_t ephemeral_owner = 0;
readBinary(ephemeral_owner, in);
if (ephemeral_owner != 0)
node.setEphemeralOwner(ephemeral_owner);
node.stats.setEphemeralOwner(ephemeral_owner);
if (version < SnapshotVersion::V6)
{
@ -163,14 +163,14 @@ namespace
int32_t num_children = 0;
readBinary(num_children, in);
if (ephemeral_owner == 0)
node.setNumChildren(num_children);
node.stats.setNumChildren(num_children);
readBinary(node.pzxid, in);
readBinary(node.stats.pzxid, in);
int32_t seq_num = 0;
readBinary(seq_num, in);
if (ephemeral_owner == 0)
node.setSeqNum(seq_num);
node.stats.setSeqNum(seq_num);
if (version >= SnapshotVersion::V4 && version <= SnapshotVersion::V5)
{
@ -256,7 +256,7 @@ void KeeperStorageSnapshot<Storage>::serialize(const KeeperStorageSnapshot<Stora
/// Benign race condition possible while taking snapshot: NuRaft decides to create a snapshot at some log id
/// and only after some time we lock storage and enable snapshot mode. So snapshot_container_size can be
/// slightly bigger than required.
if (node.mzxid > snapshot.zxid)
if (node.stats.mzxid > snapshot.zxid)
break;
writeBinary(path, out);
writeNode(node, snapshot.version, out);
@ -306,7 +306,7 @@ void KeeperStorageSnapshot<Storage>::serialize(const KeeperStorageSnapshot<Stora
}
template<typename Storage>
void KeeperStorageSnapshot<Storage>::deserialize(SnapshotDeserializationResult<Storage> & deserialization_result, ReadBuffer & in, KeeperContextPtr keeper_context)
void KeeperStorageSnapshot<Storage>::deserialize(SnapshotDeserializationResult<Storage> & deserialization_result, ReadBuffer & in, KeeperContextPtr keeper_context) TSA_NO_THREAD_SAFETY_ANALYSIS
{
uint8_t version;
readBinary(version, in);
@ -435,13 +435,13 @@ void KeeperStorageSnapshot<Storage>::deserialize(SnapshotDeserializationResult<S
}
}
auto ephemeral_owner = node.ephemeralOwner();
auto ephemeral_owner = node.stats.ephemeralOwner();
if constexpr (!use_rocksdb)
if (!node.isEphemeral() && node.numChildren() > 0)
node.getChildren().reserve(node.numChildren());
if (!node.stats.isEphemeral() && node.stats.numChildren() > 0)
node.getChildren().reserve(node.stats.numChildren());
if (ephemeral_owner != 0)
storage.ephemerals[node.ephemeralOwner()].insert(std::string{path});
storage.committed_ephemerals[node.stats.ephemeralOwner()].insert(std::string{path});
if (recalculate_digest)
storage.nodes_digest += node.getDigest(path);
@ -467,16 +467,25 @@ void KeeperStorageSnapshot<Storage>::deserialize(SnapshotDeserializationResult<S
{
if (itr.key != "/")
{
if (itr.value.numChildren() != static_cast<int32_t>(itr.value.getChildren().size()))
if (itr.value.stats.numChildren() != static_cast<int32_t>(itr.value.getChildren().size()))
{
#ifdef NDEBUG
/// TODO (alesapin) remove this, it should be always CORRUPTED_DATA.
LOG_ERROR(getLogger("KeeperSnapshotManager"), "Children counter in stat.numChildren {}"
" is different from actual children size {} for node {}", itr.value.numChildren(), itr.value.getChildren().size(), itr.key);
LOG_ERROR(
getLogger("KeeperSnapshotManager"),
"Children counter in stat.numChildren {}"
" is different from actual children size {} for node {}",
itr.value.stats.numChildren(),
itr.value.getChildren().size(),
itr.key);
#else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Children counter in stat.numChildren {}"
" is different from actual children size {} for node {}",
itr.value.numChildren(), itr.value.getChildren().size(), itr.key);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Children counter in stat.numChildren {}"
" is different from actual children size {} for node {}",
itr.value.stats.numChildren(),
itr.value.getChildren().size(),
itr.key);
#endif
}
}
@ -511,7 +520,7 @@ void KeeperStorageSnapshot<Storage>::deserialize(SnapshotDeserializationResult<S
session_auth_counter++;
}
if (!ids.empty())
storage.session_and_auth[active_session_id] = ids;
storage.committed_session_and_auth[active_session_id] = ids;
}
current_session_size++;
}
@ -527,6 +536,8 @@ void KeeperStorageSnapshot<Storage>::deserialize(SnapshotDeserializationResult<S
buffer->pos(0);
deserialization_result.cluster_config = ClusterConfig::deserialize(*buffer);
}
storage.updateStats();
}
template<typename Storage>
@ -544,7 +555,7 @@ KeeperStorageSnapshot<Storage>::KeeperStorageSnapshot(Storage * storage_, uint64
begin = storage->getSnapshotIteratorBegin();
session_and_timeout = storage->getActiveSessions();
acl_map = storage->acl_map.getMapping();
session_and_auth = storage->session_and_auth;
session_and_auth = storage->committed_session_and_auth;
}
template<typename Storage>
@ -563,7 +574,7 @@ KeeperStorageSnapshot<Storage>::KeeperStorageSnapshot(
begin = storage->getSnapshotIteratorBegin();
session_and_timeout = storage->getActiveSessions();
acl_map = storage->acl_map.getMapping();
session_and_auth = storage->session_and_auth;
session_and_auth = storage->committed_session_and_auth;
}
template<typename Storage>

View File

@ -36,6 +36,11 @@ namespace ProfileEvents
extern const Event KeeperStorageLockWaitMicroseconds;
}
namespace CurrentMetrics
{
extern const Metric KeeperAliveConnections;
}
namespace DB
{
@ -56,6 +61,7 @@ IKeeperStateMachine::IKeeperStateMachine(
, snapshots_queue(snapshots_queue_)
, min_request_size_to_cache(keeper_context_->getCoordinationSettings()->min_request_size_for_cache)
, log(getLogger("KeeperStateMachine"))
, read_pool(CurrentMetrics::KeeperAliveConnections, CurrentMetrics::KeeperAliveConnections, CurrentMetrics::KeeperAliveConnections, 100, 10000, 10000)
, superdigest(superdigest_)
, keeper_context(keeper_context_)
, snapshot_manager_s3(snapshot_manager_s3_)
@ -175,18 +181,20 @@ void assertDigest(
}
}
struct TSA_SCOPED_LOCKABLE LockGuardWithStats final
template <bool shared = false>
struct LockGuardWithStats final
{
std::unique_lock<std::mutex> lock;
explicit LockGuardWithStats(std::mutex & mutex) TSA_ACQUIRE(mutex)
using LockType = std::conditional_t<shared, std::shared_lock<SharedMutex>, std::unique_lock<SharedMutex>>;
LockType lock;
explicit LockGuardWithStats(SharedMutex & mutex)
{
Stopwatch watch;
std::unique_lock l(mutex);
LockType l(mutex);
ProfileEvents::increment(ProfileEvents::KeeperStorageLockWaitMicroseconds, watch.elapsedMicroseconds());
lock = std::move(l);
}
~LockGuardWithStats() TSA_RELEASE() = default;
~LockGuardWithStats() = default;
};
}
@ -312,13 +320,12 @@ bool KeeperStateMachine<Storage>::preprocess(const KeeperStorageBase::RequestFor
if (op_num == Coordination::OpNum::SessionID || op_num == Coordination::OpNum::Reconfig)
return true;
LockGuardWithStats lock(storage_and_responses_lock);
if (storage->isFinalized())
return false;
try
{
LockGuardWithStats<true> lock(storage_mutex);
storage->preprocessRequest(
request_for_session.request,
request_for_session.session_id,
@ -335,7 +342,12 @@ bool KeeperStateMachine<Storage>::preprocess(const KeeperStorageBase::RequestFor
}
if (keeper_context->digestEnabled() && request_for_session.digest)
assertDigest(*request_for_session.digest, storage->getNodesDigest(false), *request_for_session.request, request_for_session.log_idx, false);
assertDigest(
*request_for_session.digest,
storage->getNodesDigest(false, /*lock_transaction_mutex=*/true),
*request_for_session.request,
request_for_session.log_idx,
false);
return true;
}
@ -343,7 +355,7 @@ bool KeeperStateMachine<Storage>::preprocess(const KeeperStorageBase::RequestFor
template<typename Storage>
void KeeperStateMachine<Storage>::reconfigure(const KeeperStorageBase::RequestForSession& request_for_session)
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
KeeperStorageBase::ResponseForSession response = processReconfiguration(request_for_session);
if (!responses_queue.push(response))
{
@ -461,7 +473,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine<Storage>::commit(const uint64_t l
response_for_session.response = response;
response_for_session.request = request_for_session->request;
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
response->session_id = session_id;
@ -472,24 +484,31 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine<Storage>::commit(const uint64_t l
if (op_num == Coordination::OpNum::Close)
{
std::lock_guard lock(request_cache_mutex);
std::lock_guard cache_lock(request_cache_mutex);
parsed_request_cache.erase(request_for_session->session_id);
}
LockGuardWithStats lock(storage_and_responses_lock);
KeeperStorageBase::ResponsesForSessions responses_for_sessions
= storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid);
for (auto & response_for_session : responses_for_sessions)
{
if (response_for_session.response->xid != Coordination::WATCH_XID)
response_for_session.request = request_for_session->request;
LockGuardWithStats<true> lock(storage_mutex);
std::lock_guard response_lock(process_and_responses_lock);
KeeperStorageBase::ResponsesForSessions responses_for_sessions
= storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid);
for (auto & response_for_session : responses_for_sessions)
{
if (response_for_session.response->xid != Coordination::WATCH_XID)
response_for_session.request = request_for_session->request;
try_push(response_for_session);
try_push(response_for_session);
}
}
if (keeper_context->digestEnabled() && request_for_session->digest)
assertDigest(*request_for_session->digest, storage->getNodesDigest(true), *request_for_session->request, request_for_session->log_idx, true);
assertDigest(
*request_for_session->digest,
storage->getNodesDigest(true, /*lock_transaction_mutex=*/true),
*request_for_session->request,
request_for_session->log_idx,
true);
}
ProfileEvents::increment(ProfileEvents::KeeperCommits);
@ -534,8 +553,6 @@ bool KeeperStateMachine<Storage>::apply_snapshot(nuraft::snapshot & s)
}
{ /// deserialize and apply snapshot to storage
LockGuardWithStats lock(storage_and_responses_lock);
SnapshotDeserializationResult<Storage> snapshot_deserialization_result;
if (latest_snapshot_ptr)
snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr);
@ -543,6 +560,7 @@ bool KeeperStateMachine<Storage>::apply_snapshot(nuraft::snapshot & s)
snapshot_deserialization_result
= snapshot_manager.deserializeSnapshotFromBuffer(snapshot_manager.deserializeSnapshotBufferFromDisk(s.get_last_log_idx()));
LockGuardWithStats storage_lock(storage_mutex);
/// maybe some logs were preprocessed with log idx larger than the snapshot idx
/// we have to apply them to the new storage
storage->applyUncommittedState(*snapshot_deserialization_result.storage, snapshot_deserialization_result.snapshot_meta->get_last_log_idx());
@ -587,16 +605,7 @@ void KeeperStateMachine<Storage>::rollbackRequest(const KeeperStorageBase::Reque
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return;
LockGuardWithStats lock(storage_and_responses_lock);
storage->rollbackRequest(request_for_session.zxid, allow_missing);
}
template<typename Storage>
void KeeperStateMachine<Storage>::rollbackRequestNoLock(const KeeperStorageBase::RequestForSession & request_for_session, bool allow_missing)
{
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return;
LockGuardWithStats lock(storage_mutex);
storage->rollbackRequest(request_for_session.zxid, allow_missing);
}
@ -616,7 +625,7 @@ void KeeperStateMachine<Storage>::create_snapshot(nuraft::snapshot & s, nuraft::
auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf);
CreateSnapshotTask snapshot_task;
{ /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking.
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot<Storage>>(storage.get(), snapshot_meta_copy, getClusterConfig());
}
@ -681,7 +690,7 @@ void KeeperStateMachine<Storage>::create_snapshot(nuraft::snapshot & s, nuraft::
}
{
/// Destroy snapshot with lock
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
LOG_TRACE(log, "Clearing garbage after snapshot");
/// Turn off "snapshot mode" and clear the outdated part of the storage state
storage->clearGarbageAfterSnapshot();
@ -824,10 +833,10 @@ template<typename Storage>
void KeeperStateMachine<Storage>::processReadRequest(const KeeperStorageBase::RequestForSession & request_for_session)
{
/// Pure local request, just process it with storage
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats<true> storage_lock(storage_mutex);
std::lock_guard response_lock(process_and_responses_lock);
auto responses = storage->processRequest(
request_for_session.request, request_for_session.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/);
for (auto & response_for_session : responses)
{
if (response_for_session.response->xid != Coordination::WATCH_XID)
@ -840,112 +849,116 @@ void KeeperStateMachine<Storage>::processReadRequest(const KeeperStorageBase::Re
/// Finalizes the storage (storage->finalize()) while holding the storage lock.
template<typename Storage>
void KeeperStateMachine<Storage>::shutdownStorage()
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
storage->finalize();
}
/// Returns the dead session IDs reported by the storage; the call is made under the storage lock.
template<typename Storage>
std::vector<int64_t> KeeperStateMachine<Storage>::getDeadSessions()
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
return storage->getDeadSessions();
}
/// Returns the next ZXID as reported by the storage (storage->getNextZXID()).
template<typename Storage>
int64_t KeeperStateMachine<Storage>::getNextZxid() const
{
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getNextZXID();
}
/// Returns the nodes digest computed by the storage; the call is made under the storage lock.
/// The first argument passed to the storage is always `false` here.
template<typename Storage>
KeeperStorageBase::Digest KeeperStateMachine<Storage>::getNodesDigest() const
{
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getNodesDigest(false);
LockGuardWithStats lock(storage_mutex);
return storage->getNodesDigest(false, /*lock_transaction_mutex=*/true);
}
/// Returns the ZXID reported by the storage (storage->getZXID()).
template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getLastProcessedZxid() const
{
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getZXID();
}
/// Gives read-only access to the statistics object kept by the storage.
/// No lock is taken here; thread-safety analysis is explicitly disabled for this accessor.
template<typename Storage>
const KeeperStorageBase::Stats & KeeperStateMachine<Storage>::getStorageStats() const TSA_NO_THREAD_SAFETY_ANALYSIS
{
    const KeeperStorageBase::Stats & storage_stats = storage->getStorageStats();
    return storage_stats;
}
/// Returns the node count reported by the storage; the call is made under the storage lock.
template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getNodesCount() const
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
return storage->getNodesCount();
}
/// Returns the total watches count reported by the storage; the call is made under the storage lock.
template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getTotalWatchesCount() const
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
return storage->getTotalWatchesCount();
}
/// Returns the watched paths count reported by the storage; the call is made under the storage lock.
template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getWatchedPathsCount() const
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
return storage->getWatchedPathsCount();
}
/// Returns the count of sessions with watches reported by the storage; the call is made under the storage lock.
template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getSessionsWithWatchesCount() const
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
return storage->getSessionsWithWatchesCount();
}
/// Returns the total ephemeral nodes count reported by the storage; the call is made under the storage lock.
template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getTotalEphemeralNodesCount() const
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
return storage->getTotalEphemeralNodesCount();
}
/// Returns the count of sessions with ephemeral nodes reported by the storage; the call is made under the storage lock.
template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getSessionWithEphemeralNodesCount() const
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
return storage->getSessionWithEphemeralNodesCount();
}
/// Lets the storage write its watches dump into buf while the storage lock is held.
template<typename Storage>
void KeeperStateMachine<Storage>::dumpWatches(WriteBufferFromOwnString & buf) const
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
storage->dumpWatches(buf);
}
/// Lets the storage write its per-path watches dump into buf while the storage lock is held.
template<typename Storage>
void KeeperStateMachine<Storage>::dumpWatchesByPath(WriteBufferFromOwnString & buf) const
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
storage->dumpWatchesByPath(buf);
}
/// Lets the storage write its sessions and ephemerals dump into buf while the storage lock is held.
template<typename Storage>
void KeeperStateMachine<Storage>::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
storage->dumpSessionsAndEphemerals(buf);
}
/// Returns the approximate data size reported by the storage; the call is made under the storage lock.
template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getApproximateDataSize() const
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
return storage->getApproximateDataSize();
}
/// Returns the arena data size reported by the storage (storage->getArenaDataSize());
/// the call is made under the storage lock.
template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getKeyArenaSize() const
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
return storage->getArenaDataSize();
}
@ -988,7 +1001,7 @@ ClusterConfigPtr IKeeperStateMachine::getClusterConfig() const
template<typename Storage>
void KeeperStateMachine<Storage>::recalculateStorageStats()
{
LockGuardWithStats lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_mutex);
LOG_INFO(log, "Recalculating storage stats");
storage->recalculateStats();
LOG_INFO(log, "Done recalculating storage stats");

View File

@ -85,6 +85,8 @@ public:
/// Introspection functions for 4lw commands
virtual uint64_t getLastProcessedZxid() const = 0;
virtual const KeeperStorageBase::Stats & getStorageStats() const = 0;
virtual uint64_t getNodesCount() const = 0;
virtual uint64_t getTotalWatchesCount() const = 0;
virtual uint64_t getWatchedPathsCount() const = 0;
@ -124,12 +126,16 @@ protected:
/// Mutex for snapshots
mutable std::mutex snapshots_lock;
/// Lock for storage and responses_queue. It's important to process requests
/// Lock for the storage
/// Storage works in thread-safe way ONLY for preprocessing/processing
/// In any other case, unique storage lock needs to be taken
mutable SharedMutex storage_mutex;
/// Lock for processing and responses_queue. It's important to process requests
/// and push them to the responses queue while holding this lock. Otherwise
/// we can get strange cases when, for example, a client sends a read request with
/// a watch and then receives the watch response before it receives the response
/// for the request itself.
mutable std::mutex storage_and_responses_lock;
mutable std::mutex process_and_responses_lock;
std::unordered_map<int64_t, std::unordered_map<Coordination::XID, std::shared_ptr<KeeperStorageBase::RequestForSession>>> parsed_request_cache;
uint64_t min_request_size_to_cache{0};
@ -146,6 +152,7 @@ protected:
mutable std::mutex cluster_config_lock;
ClusterConfigPtr cluster_config;
ThreadPool read_pool;
/// Special part of ACL system -- superdigest specified in server config.
const std::string superdigest;
@ -153,10 +160,8 @@ protected:
KeeperSnapshotManagerS3 * snapshot_manager_s3;
virtual KeeperStorageBase::ResponseForSession processReconfiguration(
const KeeperStorageBase::RequestForSession& request_for_session)
TSA_REQUIRES(storage_and_responses_lock) = 0;
virtual KeeperStorageBase::ResponseForSession processReconfiguration(const KeeperStorageBase::RequestForSession & request_for_session)
= 0;
};
/// ClickHouse Keeper state machine. Wrapper for KeeperStorage.
@ -189,10 +194,6 @@ public:
// (can happen in case of exception during preprocessing)
void rollbackRequest(const KeeperStorageBase::RequestForSession & request_for_session, bool allow_missing) override;
void rollbackRequestNoLock(
const KeeperStorageBase::RequestForSession & request_for_session,
bool allow_missing) TSA_NO_THREAD_SAFETY_ANALYSIS;
/// Apply preliminarily saved (save_logical_snp_obj) snapshot to our state.
bool apply_snapshot(nuraft::snapshot & s) override;
@ -205,7 +206,7 @@ public:
// Returns a reference to the underlying storage without taking any lock.
// This should be used only for tests or keeper-data-dumper because it violates
// TSA -- we can't acquire the lock outside of this class or return a storage under lock
// in a reasonable way.
Storage & getStorageUnsafe() TSA_NO_THREAD_SAFETY_ANALYSIS
Storage & getStorageUnsafe()
{
return *storage;
}
@ -224,6 +225,8 @@ public:
/// Introspection functions for 4lw commands
uint64_t getLastProcessedZxid() const override;
const KeeperStorageBase::Stats & getStorageStats() const override;
uint64_t getNodesCount() const override;
uint64_t getTotalWatchesCount() const override;
uint64_t getWatchedPathsCount() const override;
@ -245,12 +248,12 @@ public:
private:
/// Main state machine logic
std::unique_ptr<Storage> storage; //TSA_PT_GUARDED_BY(storage_and_responses_lock);
std::unique_ptr<Storage> storage;
/// Save/Load and Serialize/Deserialize logic for snapshots.
KeeperSnapshotManager<Storage> snapshot_manager;
KeeperStorageBase::ResponseForSession processReconfiguration(const KeeperStorageBase::RequestForSession & request_for_session)
TSA_REQUIRES(storage_and_responses_lock) override;
KeeperStorageBase::ResponseForSession processReconfiguration(const KeeperStorageBase::RequestForSession & request_for_session) override;
};
}

File diff suppressed because it is too large Load Diff

View File

@ -1,10 +1,16 @@
#pragma once
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <Coordination/ACLMap.h>
#include <Coordination/SessionExpiryQueue.h>
#include <Coordination/SnapshotableHashTable.h>
#include "Common/StringHashForHeterogeneousLookup.h"
#include <Common/SharedMutex.h>
#include <Common/Concepts.h>
#include <base/defines.h>
#include <absl/container/flat_hash_set.h>
@ -23,14 +29,11 @@ using ResponseCallback = std::function<void(const Coordination::ZooKeeperRespons
using ChildrenSet = absl::flat_hash_set<StringRef, StringRefHash>;
using SessionAndTimeout = std::unordered_map<int64_t, int64_t>;
/// KeeperRocksNodeInfo is used in RocksDB keeper.
/// It is serialized directly as POD to RocksDB.
struct KeeperRocksNodeInfo
struct NodeStats
{
int64_t czxid{0};
int64_t mzxid{0};
int64_t pzxid{0};
uint64_t acl_id = 0; /// 0 -- no ACL by default
int64_t mtime{0};
@ -38,225 +41,9 @@ struct KeeperRocksNodeInfo
int32_t cversion{0};
int32_t aversion{0};
int32_t seq_num = 0;
mutable UInt64 digest = 0; /// we cached digest for this node.
/// as ctime can't be negative because it stores the timestamp when the
/// node was created, we can use the MSB for a bool
struct
{
bool is_ephemeral : 1;
int64_t ctime : 63;
} is_ephemeral_and_ctime{false, 0};
/// ephemeral nodes cannot have children so a node can set either
/// ephemeral_owner OR seq_num + num_children
union
{
int64_t ephemeral_owner;
struct
{
int32_t seq_num;
int32_t num_children;
} children_info;
} ephemeral_or_children_data{0};
/// Whether the ephemeral flag stored next to ctime is set.
bool isEphemeral() const
{
    const bool ephemeral_flag = is_ephemeral_and_ctime.is_ephemeral;
    return ephemeral_flag;
}
/// Owner session of an ephemeral node; 0 when the node is not ephemeral.
int64_t ephemeralOwner() const
{
    return isEphemeral() ? ephemeral_or_children_data.ephemeral_owner : 0;
}
/// Records the owner and derives the ephemeral flag from it: a zero owner clears the flag.
void setEphemeralOwner(int64_t ephemeral_owner)
{
    const bool has_owner = (ephemeral_owner != 0);
    is_ephemeral_and_ctime.is_ephemeral = has_owner;
    ephemeral_or_children_data.ephemeral_owner = ephemeral_owner;
}
/// Children count; always 0 for an ephemeral node.
int32_t numChildren() const
{
    return isEphemeral() ? 0 : ephemeral_or_children_data.children_info.num_children;
}
/// Stores the children count in the children_info view of the union.
/// The ephemeral flag is not touched here.
void setNumChildren(int32_t num_children)
{
ephemeral_or_children_data.children_info.num_children = num_children;
}
/// Dummy interface for tests: the argument is ignored and nothing is stored.
void addChild(StringRef /*child_path*/)
{
}
/// Returns a vector with one value-initialized int per child (numChildren() elements).
auto getChildren() const
{
    std::vector<int> placeholder_children(numChildren());
    return placeholder_children;
}
/// Adds one to the children count. Asserts (chassert) that the node is not ephemeral.
void increaseNumChildren()
{
    chassert(!isEphemeral());
    ephemeral_or_children_data.children_info.num_children += 1;
}
/// Subtracts one from the children count. Asserts (chassert) that the node is not ephemeral.
void decreaseNumChildren()
{
    chassert(!isEphemeral());
    ephemeral_or_children_data.children_info.num_children -= 1;
}
/// Sequence number; always 0 for an ephemeral node.
int32_t seqNum() const
{
    return isEphemeral() ? 0 : ephemeral_or_children_data.children_info.seq_num;
}
/// Stores the sequence number in the children_info view of the union.
/// The ephemeral flag is not touched here.
void setSeqNum(int32_t seq_num_)
{
ephemeral_or_children_data.children_info.seq_num = seq_num_;
}
/// Adds one to the sequence number. Asserts (chassert) that the node is not ephemeral.
void increaseSeqNum()
{
    chassert(!isEphemeral());
    ephemeral_or_children_data.children_info.seq_num += 1;
}
/// Creation time as stored in the field that shares its storage with the ephemeral flag.
int64_t ctime() const
{
    const int64_t creation_time = is_ephemeral_and_ctime.ctime;
    return creation_time;
}
/// Stores the creation time in the field that shares its storage with the ephemeral flag.
/// NOTE(review): the parameter is unsigned while the destination is a signed 63-bit bit-field,
/// so very large values would not round-trip -- confirm callers only pass real timestamps.
void setCtime(uint64_t ctime)
{
is_ephemeral_and_ctime.ctime = ctime;
}
uint32_t data_size{0};
void copyStats(const Coordination::Stat & stat);
};
/// KeeperRocksNode is the memory structure used by RocksDB
/// It adds the node payload (data / data_size) on top of the KeeperRocksNodeInfo metadata.
struct KeeperRocksNode : public KeeperRocksNodeInfo
{
#if USE_ROCKSDB
    friend struct RocksDBContainer<KeeperRocksNode>;
#endif
    using Meta = KeeperRocksNodeInfo;

    uint64_t size_bytes = 0; // only for compatible, should be deprecated

    /// Payload size plus the fixed size of the metadata struct.
    uint64_t sizeInBytes() const { return data_size + sizeof(KeeperRocksNodeInfo); }

    /// Replaces the payload with a copy of new_data.
    /// An empty new_data releases the previously held buffer instead of keeping it alive.
    void setData(String new_data)
    {
        data_size = static_cast<uint32_t>(new_data.size());
        if (data_size != 0)
        {
            data = std::unique_ptr<char[]>(new char[new_data.size()]);
            memcpy(data.get(), new_data.data(), data_size);
        }
        else
        {
            data.reset();
        }
    }

    /// Copies metadata and payload from other. The cached digest is intentionally not copied.
    void shallowCopy(const KeeperRocksNode & other)
    {
        /// Copying a node onto itself must not reallocate the buffer it is reading from.
        if (this == &other)
            return;

        czxid = other.czxid;
        mzxid = other.mzxid;
        pzxid = other.pzxid;
        acl_id = other.acl_id; /// 0 -- no ACL by default

        mtime = other.mtime;

        is_ephemeral_and_ctime = other.is_ephemeral_and_ctime;

        ephemeral_or_children_data = other.ephemeral_or_children_data;

        data_size = other.data_size;
        if (data_size != 0)
        {
            data = std::unique_ptr<char[]>(new char[data_size]);
            memcpy(data.get(), other.data.get(), data_size);
        }
        else
        {
            /// Do not keep a stale payload buffer when the source node has no data.
            data.reset();
        }

        version = other.version;
        cversion = other.cversion;
        aversion = other.aversion;

        /// cached_digest = other.cached_digest;
    }

    void invalidateDigestCache() const;
    UInt64 getDigest(std::string_view path) const;
    String getEncodedString();
    void decodeFromString(const String & buffer_str);
    void recalculateSize() {}

    /// Non-owning view of the payload; valid only while this node keeps its buffer.
    std::string_view getData() const noexcept { return {data.get(), data_size}; }

    /// Fills a ZooKeeper Stat response from this node's metadata.
    void setResponseStat(Coordination::Stat & response_stat) const
    {
        response_stat.czxid = czxid;
        response_stat.mzxid = mzxid;
        response_stat.ctime = ctime();
        response_stat.mtime = mtime;
        response_stat.version = version;
        response_stat.cversion = cversion;
        response_stat.aversion = aversion;
        response_stat.ephemeralOwner = ephemeralOwner();
        response_stat.dataLength = static_cast<int32_t>(data_size);
        response_stat.numChildren = numChildren();
        response_stat.pzxid = pzxid;
    }

    /// Clears the `serialized` flag.
    void reset()
    {
        serialized = false;
    }

    /// A node counts as empty when it has no payload and mzxid is 0.
    bool empty() const
    {
        return data_size == 0 && mzxid == 0;
    }

    std::unique_ptr<char[]> data{nullptr};
    uint32_t data_size{0};

private:
    bool serialized = false;
};
/// KeeperMemNode should have as minimal size as possible to reduce memory footprint
/// of stored nodes
/// New fields should be added to the struct only if it's really necessary
struct KeeperMemNode
{
int64_t czxid{0};
int64_t mzxid{0};
int64_t pzxid{0};
uint64_t acl_id = 0; /// 0 -- no ACL by default
int64_t mtime{0};
std::unique_ptr<char[]> data{nullptr};
uint32_t data_size{0};
int32_t version{0};
int32_t cversion{0};
int32_t aversion{0};
mutable uint64_t cached_digest = 0;
KeeperMemNode() = default;
KeeperMemNode & operator=(const KeeperMemNode & other);
KeeperMemNode(const KeeperMemNode & other);
KeeperMemNode & operator=(KeeperMemNode && other) noexcept;
KeeperMemNode(KeeperMemNode && other) noexcept;
bool empty() const;
bool isEphemeral() const
{
@ -287,6 +74,7 @@ struct KeeperMemNode
void setNumChildren(int32_t num_children)
{
is_ephemeral_and_ctime.is_ephemeral = false;
ephemeral_or_children_data.children_info.num_children = num_children;
}
@ -331,34 +119,6 @@ struct KeeperMemNode
is_ephemeral_and_ctime.ctime = ctime;
}
void copyStats(const Coordination::Stat & stat);
void setResponseStat(Coordination::Stat & response_stat) const;
/// Object memory size
uint64_t sizeInBytes() const;
void setData(const String & new_data);
std::string_view getData() const noexcept { return {data.get(), data_size}; }
void addChild(StringRef child_path);
void removeChild(StringRef child_path);
const auto & getChildren() const noexcept { return children; }
auto & getChildren() { return children; }
// Invalidate the calculated digest so it's recalculated again on the next
// getDigest call
void invalidateDigestCache() const;
// get the calculated digest of the node
UInt64 getDigest(std::string_view path) const;
// copy only necessary information for preprocessing and digest calculation
// (e.g. we don't need to copy list of children)
void shallowCopy(const KeeperMemNode & other);
private:
/// as ctime can't be negative because it stores the timestamp when the
/// node was created, we can use the MSB for a bool
@ -379,7 +139,132 @@ private:
int32_t num_children;
} children_info;
} ephemeral_or_children_data{0};
};
/// Metadata part of a node as used by the RocksDB keeper.
/// It is serialized directly as POD to RocksDB, so the members are kept in their existing order.
struct KeeperRocksNodeInfo
{
    NodeStats stats;
    uint64_t acl_id = 0; /// 0 -- no ACL by default

    /// Dummy interface for tests: the argument is ignored and nothing is stored.
    void addChild(StringRef) { }

    /// Dummy interface for tests: one value-initialized int per child (stats.numChildren() elements).
    auto getChildren() const
    {
        std::vector<int> placeholder_children(stats.numChildren());
        return placeholder_children;
    }

    void copyStats(const Coordination::Stat & stat);
};
/// KeeperRocksNode is the memory structure used by RocksDB
/// It adds the node payload and a cached digest on top of the KeeperRocksNodeInfo metadata.
struct KeeperRocksNode : public KeeperRocksNodeInfo
{
#if USE_ROCKSDB
    friend struct RocksDBContainer<KeeperRocksNode>;
#endif
    using Meta = KeeperRocksNodeInfo;

    uint64_t size_bytes = 0; // only for compatible, should be deprecated

    /// Payload size plus the fixed size of the metadata struct.
    uint64_t sizeInBytes() const { return stats.data_size + sizeof(KeeperRocksNodeInfo); }

    /// Replaces the payload with a copy of new_data.
    /// An empty new_data releases the previously held buffer instead of keeping it alive.
    void setData(String new_data)
    {
        stats.data_size = static_cast<uint32_t>(new_data.size());
        if (stats.data_size != 0)
        {
            data = std::unique_ptr<char[]>(new char[new_data.size()]);
            memcpy(data.get(), new_data.data(), stats.data_size);
        }
        else
        {
            data.reset();
        }
    }

    /// Copies stats, ACL id and payload from other. The cached digest is intentionally not copied.
    void shallowCopy(const KeeperRocksNode & other)
    {
        /// Copying a node onto itself must not reallocate the buffer it is reading from.
        if (this == &other)
            return;

        stats = other.stats;
        acl_id = other.acl_id;
        if (stats.data_size != 0)
        {
            data = std::unique_ptr<char[]>(new char[stats.data_size]);
            memcpy(data.get(), other.data.get(), stats.data_size);
        }
        else
        {
            /// Do not keep a stale payload buffer when the source node has no data.
            data.reset();
        }

        /// cached_digest = other.cached_digest;
    }

    void invalidateDigestCache() const;
    UInt64 getDigest(std::string_view path) const;
    String getEncodedString();
    void decodeFromString(const String & buffer_str);
    void recalculateSize() {}

    /// Non-owning view of the payload; valid only while this node keeps its buffer.
    std::string_view getData() const noexcept { return {data.get(), stats.data_size}; }

    void setResponseStat(Coordination::Stat & response_stat) const;

    /// Clears the `serialized` flag.
    void reset()
    {
        serialized = false;
    }

    /// A node counts as empty when it has no payload and mzxid is 0.
    bool empty() const
    {
        return stats.data_size == 0 && stats.mzxid == 0;
    }

    std::unique_ptr<char[]> data{nullptr};
    mutable UInt64 cached_digest = 0; /// we cached digest for this node.

private:
    bool serialized = false;
};
/// KeeperMemNode should have as minimal size as possible to reduce memory footprint
/// of stored nodes
/// New fields should be added to the struct only if it's really necessary
struct KeeperMemNode
{
/// Node statistics; also holds data_size, the length of the buffer in `data`.
NodeStats stats;
/// Owned payload buffer; its length is stats.data_size (see getData()).
std::unique_ptr<char[]> data{nullptr};
/// Digest cache; mutable so that const methods can update or invalidate it.
mutable uint64_t cached_digest = 0;
uint64_t acl_id = 0; /// 0 -- no ACL by default
KeeperMemNode() = default;
KeeperMemNode & operator=(const KeeperMemNode & other);
KeeperMemNode(const KeeperMemNode & other);
KeeperMemNode & operator=(KeeperMemNode && other) noexcept;
KeeperMemNode(KeeperMemNode && other) noexcept;
bool empty() const;
void copyStats(const Coordination::Stat & stat);
void setResponseStat(Coordination::Stat & response_stat) const;
/// Object memory size
uint64_t sizeInBytes() const;
void setData(const String & new_data);
/// Non-owning view of the payload; valid only while this node keeps its buffer.
std::string_view getData() const noexcept { return {data.get(), stats.data_size}; }
void addChild(StringRef child_path);
void removeChild(StringRef child_path);
/// Read-only and mutable access to the set of children.
const auto & getChildren() const noexcept { return children; }
auto & getChildren() { return children; }
// Invalidate the calculated digest so it's recalculated again on the next
// getDigest call
void invalidateDigestCache() const;
// get the calculated digest of the node
UInt64 getDigest(std::string_view path) const;
// copy only necessary information for preprocessing and digest calculation
// (e.g. we don't need to copy list of children)
void shallowCopy(const KeeperMemNode & other);
private:
/// Set of children; the StringRef elements do not own the bytes they point to.
ChildrenSet children{};
};
@ -430,18 +315,187 @@ public:
};
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<std::string>>;
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<std::string>>;
/// Identifies a single watch: the watched path plus whether it is a list watch.
/// Two WatchInfo objects are equal when both members are equal (defaulted operator==).
struct WatchInfo
{
/// Non-owning view of the path: the referenced string has to outlive this object.
std::string_view path;
bool is_list_watch;
bool operator==(const WatchInfo &) const = default;
};
struct WatchInfoHash
{
auto operator()(WatchInfo info) const
{
SipHash hash;
hash.update(info.path);
hash.update(info.is_list_watch);
return hash.get64();
}
};
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<WatchInfo, WatchInfoHash>>;
using SessionIDs = std::unordered_set<int64_t>;
/// Just vector of SHA1 from user:password
using AuthIDs = std::vector<AuthID>;
using SessionAndAuth = std::unordered_map<int64_t, AuthIDs>;
using Watches = std::unordered_map<String /* path, relative of root_path */, SessionIDs>;
using Watches = std::unordered_map<
String /* path, relative of root_path */,
SessionIDs,
StringHashForHeterogeneousLookup,
StringHashForHeterogeneousLookup::transparent_key_equal>;
// Applying ZooKeeper request to storage consists of two steps:
// - preprocessing which, instead of applying the changes directly to storage,
// generates deltas with those changes, denoted with the request ZXID
// - processing which applies deltas with the correct ZXID to the storage
//
// Delta objects allow us two things:
// - fetch the latest, uncommitted state of an object by getting the committed
// state of that same object from the storage and applying the deltas
// in the same order as they are defined
// - quickly commit the changes to the storage
/// Delta for a node creation: carries the stat, ACLs and data of the node to create.
struct CreateNodeDelta
{
Coordination::Stat stat;
Coordination::ACLs acls;
String data;
};
/// Delta for a node removal.
/// NOTE(review): stat/acls/data presumably hold the removed node's state so that the removal
/// can be rolled back -- confirm against the rollback code.
struct RemoveNodeDelta
{
/// NOTE(review): -1 presumably means "no version check" -- confirm against the commit code.
int32_t version{-1};
NodeStats stat;
Coordination::ACLs acls;
String data;
};
/// Delta for a change of node statistics, storing both the old and the new stats.
struct UpdateNodeStatDelta
{
/// Both old_stats and new_stats start as a copy of the node's current stats;
/// the caller is expected to modify new_stats afterwards.
template <is_any_of<KeeperMemNode, KeeperRocksNode> Node>
explicit UpdateNodeStatDelta(const Node & node)
: old_stats(node.stats)
, new_stats(node.stats)
{}
NodeStats old_stats;
NodeStats new_stats;
/// NOTE(review): -1 presumably means "no version check" -- confirm against the commit code.
int32_t version{-1};
};
/// Delta for a change of node data, storing both the old and the new payload.
struct UpdateNodeDataDelta
{
std::string old_data;
std::string new_data;
/// NOTE(review): -1 presumably means "no version check" -- confirm against the commit code.
int32_t version{-1};
};
/// Delta for a change of node ACLs, storing both the old and the new ACLs.
struct SetACLDelta
{
Coordination::ACLs old_acls;
Coordination::ACLs new_acls;
/// NOTE(review): -1 presumably means "no version check" -- confirm against the commit code.
int32_t version{-1};
};
/// Delta that carries an error code instead of a state change.
struct ErrorDelta
{
Coordination::Error error;
};
/// Delta describing a failed multi request.
struct FailedMultiDelta
{
/// NOTE(review): presumably one error code per subrequest of the multi request -- confirm.
std::vector<Coordination::Error> error_codes;
/// Error of the multi request as a whole; ZOK by default.
Coordination::Error global_error{Coordination::Error::ZOK};
};
// Denotes end of a subrequest in multi request
// Empty marker type: it carries no data of its own.
struct SubDeltaEnd
{
};
/// Delta that adds an auth ID to a session.
struct AddAuthDelta
{
int64_t session_id;
/// Shared ownership of the auth ID.
std::shared_ptr<AuthID> auth_id;
};
/// Delta that marks the session with the given ID as closed.
struct CloseSessionDelta
{
int64_t session_id;
};
using Operation = std::variant<
CreateNodeDelta,
RemoveNodeDelta,
UpdateNodeStatDelta,
UpdateNodeDataDelta,
SetACLDelta,
AddAuthDelta,
ErrorDelta,
SubDeltaEnd,
FailedMultiDelta,
CloseSessionDelta>;
struct Delta
{
Delta(String path_, int64_t zxid_, Operation operation_) : path(std::move(path_)), zxid(zxid_), operation(std::move(operation_)) { }
Delta(int64_t zxid_, Coordination::Error error) : Delta("", zxid_, ErrorDelta{error}) { }
Delta(int64_t zxid_, Operation subdelta) : Delta("", zxid_, subdelta) { }
String path;
int64_t zxid;
Operation operation;
};
using DeltaIterator = std::list<KeeperStorageBase::Delta>::const_iterator;
struct DeltaRange
{
DeltaIterator begin_it;
DeltaIterator end_it;
auto begin() const
{
return begin_it;
}
auto end() const
{
return end_it;
}
bool empty() const
{
return begin_it == end_it;
}
const auto & front() const
{
return *begin_it;
}
};
/// Storage counters. Every counter is a std::atomic, so a single read or write of a counter
/// is free of data races even without an external lock.
struct Stats
{
std::atomic<uint64_t> nodes_count = 0;
std::atomic<uint64_t> approximate_data_size = 0;
std::atomic<uint64_t> total_watches_count = 0;
std::atomic<uint64_t> watched_paths_count = 0;
std::atomic<uint64_t> sessions_with_watches_count = 0;
std::atomic<uint64_t> session_with_ephemeral_nodes_count = 0;
/// NOTE(review): "emphemeral" is a typo for "ephemeral"; renaming requires touching every user.
std::atomic<uint64_t> total_emphemeral_nodes_count = 0;
std::atomic<int64_t> last_zxid = 0;
};
Stats stats;
static bool checkDigest(const Digest & first, const Digest & second);
};
/// Keeper state machine almost equal to the ZooKeeper's state machine.
/// Implements all logic of operations, data changes, sessions allocation.
/// In-memory and not thread safe.
@ -472,143 +526,49 @@ public:
int64_t session_id_counter{1};
SessionAndAuth session_and_auth;
mutable SharedMutex auth_mutex;
SessionAndAuth committed_session_and_auth;
mutable SharedMutex storage_mutex;
/// Main hashtable with nodes. Contains all information about data.
/// All other structures except session_and_timeout can be restored from
/// container.
Container container;
// Applying ZooKeeper request to storage consists of two steps:
// - preprocessing which, instead of applying the changes directly to storage,
// generates deltas with those changes, denoted with the request ZXID
// - processing which applies deltas with the correct ZXID to the storage
//
// Delta objects allow us two things:
// - fetch the latest, uncommitted state of an object by getting the committed
// state of that same object from the storage and applying the deltas
// in the same order as they are defined
// - quickly commit the changes to the storage
/// Delta for a node creation: carries the stat, ACLs and data of the node to create.
struct CreateNodeDelta
{
Coordination::Stat stat;
Coordination::ACLs acls;
String data;
};
/// Delta for a node removal.
struct RemoveNodeDelta
{
/// NOTE(review): -1 presumably means "no version check" -- confirm against the commit code.
int32_t version{-1};
/// Session owning the node if it was ephemeral; 0 by default.
int64_t ephemeral_owner{0};
};
/// Delta for a node update expressed as a callback that mutates the node in place.
struct UpdateNodeDelta
{
std::function<void(Node &)> update_fn;
/// NOTE(review): -1 presumably means "no version check" -- confirm against the commit code.
int32_t version{-1};
};
/// Delta for a change of node ACLs, carrying the ACLs to set.
struct SetACLDelta
{
Coordination::ACLs acls;
/// NOTE(review): -1 presumably means "no version check" -- confirm against the commit code.
int32_t version{-1};
};
/// Delta that carries an error code instead of a state change.
struct ErrorDelta
{
Coordination::Error error;
};
/// Delta describing a failed multi request.
struct FailedMultiDelta
{
/// NOTE(review): presumably one error code per subrequest of the multi request -- confirm.
std::vector<Coordination::Error> error_codes;
/// Error of the multi request as a whole; ZOK by default.
Coordination::Error global_error{Coordination::Error::ZOK};
};
// Denotes end of a subrequest in multi request
// Empty marker type: it carries no data of its own.
struct SubDeltaEnd
{
};
/// Delta that adds an auth ID to a session; the auth ID is stored by value.
struct AddAuthDelta
{
int64_t session_id;
AuthID auth_id;
};
/// Delta that marks the session with the given ID as closed.
struct CloseSessionDelta
{
int64_t session_id;
};
using Operation = std::
variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, AddAuthDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta, CloseSessionDelta>;
struct Delta
{
Delta(String path_, int64_t zxid_, Operation operation_) : path(std::move(path_)), zxid(zxid_), operation(std::move(operation_)) { }
Delta(int64_t zxid_, Coordination::Error error) : Delta("", zxid_, ErrorDelta{error}) { }
Delta(int64_t zxid_, Operation subdelta) : Delta("", zxid_, subdelta) { }
String path;
int64_t zxid;
Operation operation;
};
struct UncommittedState
{
explicit UncommittedState(KeeperStorage & storage_) : storage(storage_) { }
void addDelta(Delta new_delta);
void addDeltas(std::vector<Delta> new_deltas);
void commit(int64_t commit_zxid);
void addDeltas(std::list<Delta> new_deltas);
void cleanup(int64_t commit_zxid);
void rollback(int64_t rollback_zxid);
void rollback(std::list<Delta> rollback_deltas);
std::shared_ptr<Node> getNode(StringRef path) const;
std::shared_ptr<Node> getNode(StringRef path, bool should_lock_storage = true) const;
const Node * getActualNodeView(StringRef path, const Node & storage_node) const;
Coordination::ACLs getACLs(StringRef path) const;
void applyDeltas(const std::list<Delta> & new_deltas);
void applyDelta(const Delta & delta);
void rollbackDelta(const Delta & delta);
bool hasACL(int64_t session_id, bool is_local, std::function<bool(const AuthID &)> predicate) const;
void forEachAuthInSession(int64_t session_id, std::function<void(const AuthID &)> func) const;
std::shared_ptr<Node> tryGetNodeFromStorage(StringRef path) const;
std::shared_ptr<Node> tryGetNodeFromStorage(StringRef path, bool should_lock_storage = true) const;
std::unordered_map<int64_t, std::list<const AuthID *>> session_and_auth;
std::unordered_set<int64_t> closed_sessions;
using ZxidToNodes = std::map<int64_t, std::unordered_set<std::string_view>>;
struct UncommittedNode
{
std::shared_ptr<Node> node{nullptr};
Coordination::ACLs acls{};
int64_t zxid{0};
};
std::optional<Coordination::ACLs> acls{};
std::unordered_set<uint64_t> applied_zxids{};
struct Hash
{
auto operator()(const std::string_view view) const
{
SipHash hash;
hash.update(view);
return hash.get64();
}
using is_transparent = void; // required to make find() work with different type than key_type
};
struct Equal
{
auto operator()(const std::string_view a,
const std::string_view b) const
{
return a == b;
}
using is_transparent = void; // required to make find() work with different type than key_type
void materializeACL(const ACLMap & current_acl_map);
};
struct PathCmp
@ -624,10 +584,15 @@ public:
using is_transparent = void; // required to make find() work with different type than key_type
};
mutable std::map<std::string, UncommittedNode, PathCmp> nodes;
std::unordered_map<std::string, std::list<const Delta *>, Hash, Equal> deltas_for_path;
Ephemerals ephemerals;
std::list<Delta> deltas;
std::unordered_map<int64_t, std::list<std::pair<int64_t, std::shared_ptr<AuthID>>>> session_and_auth;
mutable std::map<std::string, UncommittedNode, PathCmp> nodes;
mutable ZxidToNodes zxid_to_nodes;
mutable std::mutex deltas_mutex;
std::list<Delta> deltas TSA_GUARDED_BY(deltas_mutex);
KeeperStorage<Container> & storage;
};
@ -637,7 +602,7 @@ public:
// with zxid > last_zxid
void applyUncommittedState(KeeperStorage & other, int64_t last_log_idx);
Coordination::Error commit(int64_t zxid);
Coordination::Error commit(DeltaRange deltas);
// Create node in the storage
// Returns false if it failed to create the node, true otherwise
@ -655,12 +620,11 @@ public:
bool checkACL(StringRef path, int32_t permissions, int64_t session_id, bool is_local);
void unregisterEphemeralPath(int64_t session_id, const std::string & path);
std::mutex ephemeral_mutex;
/// Mapping session_id -> set of ephemeral nodes paths
Ephemerals ephemerals;
/// Mapping session_id -> set of watched nodes paths
SessionAndWatcher sessions_and_watchers;
Ephemerals committed_ephemerals;
size_t committed_ephemeral_nodes{0};
/// Expiration queue for session, allows to get dead sessions at some point of time
SessionExpiryQueue session_expiry_queue;
/// All active sessions with timeout
@ -669,8 +633,10 @@ public:
/// ACLMap for more compact ACLs storage inside nodes.
ACLMap acl_map;
mutable std::mutex transaction_mutex;
/// Global id of all requests applied to storage
int64_t zxid{0};
int64_t zxid TSA_GUARDED_BY(transaction_mutex) = 0;
// older Keeper node (pre V5 snapshots) can create snapshots and receive logs from newer Keeper nodes
// this can lead to some inconsistencies, e.g. from snapshot it will use log_idx as zxid
@ -687,11 +653,16 @@ public:
int64_t log_idx = 0;
};
std::deque<TransactionInfo> uncommitted_transactions;
std::list<TransactionInfo> uncommitted_transactions TSA_GUARDED_BY(transaction_mutex);
uint64_t nodes_digest{0};
uint64_t nodes_digest = 0;
bool finalized{false};
std::atomic<bool> finalized{false};
/// Mapping session_id -> set of watched nodes paths
SessionAndWatcher sessions_and_watchers;
size_t total_watches_count = 0;
/// Currently active watches (node_path -> subscribed sessions)
Watches watches;
@ -700,45 +671,30 @@ public:
void clearDeadWatches(int64_t session_id);
/// Get current committed zxid
int64_t getZXID() const { return zxid; }
int64_t getZXID() const;
int64_t getNextZXID() const
{
if (uncommitted_transactions.empty())
return zxid + 1;
int64_t getNextZXID() const;
int64_t getNextZXIDLocked() const TSA_REQUIRES(transaction_mutex);
return uncommitted_transactions.back().zxid + 1;
}
Digest getNodesDigest(bool committed) const;
Digest getNodesDigest(bool committed, bool lock_transaction_mutex) const;
KeeperContextPtr keeper_context;
const String superdigest;
bool initialized{false};
std::atomic<bool> initialized{false};
KeeperStorage(int64_t tick_time_ms, const String & superdigest_, const KeeperContextPtr & keeper_context_, bool initialize_system_nodes = true);
void initializeSystemNodes();
void initializeSystemNodes() TSA_NO_THREAD_SAFETY_ANALYSIS;
/// Allocate new session id with the specified timeouts
int64_t getSessionID(int64_t session_timeout_ms)
{
auto result = session_id_counter++;
session_and_timeout.emplace(result, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(result, session_timeout_ms);
return result;
}
int64_t getSessionID(int64_t session_timeout_ms);
/// Add session id. Used when restoring KeeperStorage from snapshot.
void addSessionID(int64_t session_id, int64_t session_timeout_ms)
{
session_and_timeout.emplace(session_id, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(session_id, session_timeout_ms);
}
void addSessionID(int64_t session_id, int64_t session_timeout_ms) TSA_NO_THREAD_SAFETY_ANALYSIS;
UInt64 calculateNodesDigest(UInt64 current_digest, const std::vector<Delta> & new_deltas) const;
UInt64 calculateNodesDigest(UInt64 current_digest, const std::list<Delta> & new_deltas) const;
/// Process user request and return response.
/// check_acl = false only when converting data from ZooKeeper.
@ -765,42 +721,39 @@ public:
/// Set of methods for creating snapshots
/// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version.
void enableSnapshotMode(size_t up_to_version)
{
container.enableSnapshotMode(up_to_version);
}
void enableSnapshotMode(size_t up_to_version);
/// Turn off snapshot mode.
void disableSnapshotMode()
{
container.disableSnapshotMode();
}
void disableSnapshotMode();
Container::const_iterator getSnapshotIteratorBegin() const { return container.begin(); }
Container::const_iterator getSnapshotIteratorBegin() const;
/// Clear outdated data from internal container.
void clearGarbageAfterSnapshot() { container.clearOutdatedNodes(); }
void clearGarbageAfterSnapshot();
/// Get all active sessions
const SessionAndTimeout & getActiveSessions() const { return session_and_timeout; }
SessionAndTimeout getActiveSessions() const;
/// Get all dead sessions
std::vector<int64_t> getDeadSessions() const { return session_expiry_queue.getExpiredSessions(); }
std::vector<int64_t> getDeadSessions() const;
void updateStats();
const Stats & getStorageStats() const;
/// Introspection functions mostly used in 4-letter commands
uint64_t getNodesCount() const { return container.size(); }
uint64_t getNodesCount() const;
uint64_t getApproximateDataSize() const { return container.getApproximateDataSize(); }
uint64_t getApproximateDataSize() const;
uint64_t getArenaDataSize() const { return container.keyArenaSize(); }
uint64_t getArenaDataSize() const;
uint64_t getTotalWatchesCount() const;
uint64_t getWatchedPathsCount() const { return watches.size() + list_watches.size(); }
uint64_t getWatchedPathsCount() const;
uint64_t getSessionsWithWatchesCount() const;
uint64_t getSessionWithEphemeralNodesCount() const { return ephemerals.size(); }
uint64_t getSessionWithEphemeralNodesCount() const;
uint64_t getTotalEphemeralNodesCount() const;
void dumpWatches(WriteBufferFromOwnString & buf) const;

View File

@ -155,11 +155,11 @@ public:
ReadBufferFromOwnString buffer(iter->value().ToStringView());
typename Node::Meta & meta = new_pair->value;
readPODBinary(meta, buffer);
readVarUInt(new_pair->value.data_size, buffer);
if (new_pair->value.data_size)
readVarUInt(new_pair->value.stats.data_size, buffer);
if (new_pair->value.stats.data_size)
{
new_pair->value.data = std::unique_ptr<char[]>(new char[new_pair->value.data_size]);
buffer.readStrict(new_pair->value.data.get(), new_pair->value.data_size);
new_pair->value.data = std::unique_ptr<char[]>(new char[new_pair->value.stats.data_size]);
buffer.readStrict(new_pair->value.data.get(), new_pair->value.stats.data_size);
}
pair = new_pair;
}
@ -211,7 +211,7 @@ public:
}
}
std::vector<std::pair<std::string, Node>> getChildren(const std::string & key_)
std::vector<std::pair<std::string, Node>> getChildren(const std::string & key_, bool read_data = false)
{
rocksdb::ReadOptions read_options;
read_options.total_order_seek = true;
@ -232,6 +232,15 @@ public:
typename Node::Meta & meta = node;
/// We do not read data here
readPODBinary(meta, buffer);
if (read_data)
{
readVarUInt(meta.stats.data_size, buffer);
if (meta.stats.data_size)
{
node.data = std::unique_ptr<char[]>(new char[meta.stats.data_size]);
buffer.readStrict(node.data.get(), meta.stats.data_size);
}
}
std::string real_key(iter->key().data() + len, iter->key().size() - len);
// std::cout << "real key: " << real_key << std::endl;
result.emplace_back(std::move(real_key), std::move(node));
@ -268,11 +277,11 @@ public:
typename Node::Meta & meta = kv->value;
readPODBinary(meta, buffer);
/// TODO: Sometimes we don't need to load data.
readVarUInt(kv->value.data_size, buffer);
if (kv->value.data_size)
readVarUInt(kv->value.stats.data_size, buffer);
if (kv->value.stats.data_size)
{
kv->value.data = std::unique_ptr<char[]>(new char[kv->value.data_size]);
buffer.readStrict(kv->value.data.get(), kv->value.data_size);
kv->value.data = std::unique_ptr<char[]>(new char[kv->value.stats.data_size]);
buffer.readStrict(kv->value.data.get(), kv->value.stats.data_size);
}
return const_iterator(kv);
}
@ -281,7 +290,7 @@ public:
{
auto it = find(key);
chassert(it != end());
return MockNode(it->value.numChildren(), it->value.getData());
return MockNode(it->value.stats.numChildren(), it->value.getData());
}
const_iterator updateValue(StringRef key_, ValueUpdater updater)

View File

@ -93,7 +93,7 @@ void deserializeACLMap(Storage & storage, ReadBuffer & in)
}
template<typename Storage>
int64_t deserializeStorageData(Storage & storage, ReadBuffer & in, LoggerPtr log)
int64_t deserializeStorageData(Storage & storage, ReadBuffer & in, LoggerPtr log) TSA_NO_THREAD_SAFETY_ANALYSIS
{
int64_t max_zxid = 0;
std::string path;
@ -108,33 +108,33 @@ int64_t deserializeStorageData(Storage & storage, ReadBuffer & in, LoggerPtr log
Coordination::read(node.acl_id, in);
/// Deserialize stat
Coordination::read(node.czxid, in);
Coordination::read(node.mzxid, in);
Coordination::read(node.stats.czxid, in);
Coordination::read(node.stats.mzxid, in);
/// For some reason ZXID specified in filename can be smaller
/// than actual zxid from nodes. In this case we will use zxid from nodes.
max_zxid = std::max(max_zxid, node.mzxid);
max_zxid = std::max(max_zxid, node.stats.mzxid);
int64_t ctime;
Coordination::read(ctime, in);
node.setCtime(ctime);
Coordination::read(node.mtime, in);
Coordination::read(node.version, in);
Coordination::read(node.cversion, in);
Coordination::read(node.aversion, in);
node.stats.setCtime(ctime);
Coordination::read(node.stats.mtime, in);
Coordination::read(node.stats.version, in);
Coordination::read(node.stats.cversion, in);
Coordination::read(node.stats.aversion, in);
int64_t ephemeral_owner;
Coordination::read(ephemeral_owner, in);
if (ephemeral_owner != 0)
node.setEphemeralOwner(ephemeral_owner);
Coordination::read(node.pzxid, in);
node.stats.setEphemeralOwner(ephemeral_owner);
Coordination::read(node.stats.pzxid, in);
if (!path.empty())
{
if (ephemeral_owner == 0)
node.setSeqNum(node.cversion);
node.stats.setSeqNum(node.stats.cversion);
storage.container.insertOrReplace(path, node);
if (ephemeral_owner != 0)
storage.ephemerals[ephemeral_owner].insert(path);
storage.committed_ephemerals[ephemeral_owner].insert(path);
storage.acl_map.addUsage(node.acl_id);
}
@ -149,7 +149,13 @@ int64_t deserializeStorageData(Storage & storage, ReadBuffer & in, LoggerPtr log
if (itr.key != "/")
{
auto parent_path = parentNodePath(itr.key);
storage.container.updateValue(parent_path, [my_path = itr.key] (typename Storage::Node & value) { value.addChild(getBaseNodeName(my_path)); value.increaseNumChildren(); });
storage.container.updateValue(
parent_path,
[my_path = itr.key](typename Storage::Node & value)
{
value.addChild(getBaseNodeName(my_path));
value.stats.increaseNumChildren();
});
}
}
@ -157,7 +163,7 @@ int64_t deserializeStorageData(Storage & storage, ReadBuffer & in, LoggerPtr log
}
template<typename Storage>
void deserializeKeeperStorageFromSnapshot(Storage & storage, const std::string & snapshot_path, LoggerPtr log)
void deserializeKeeperStorageFromSnapshot(Storage & storage, const std::string & snapshot_path, LoggerPtr log) TSA_NO_THREAD_SAFETY_ANALYSIS
{
LOG_INFO(log, "Deserializing storage snapshot {}", snapshot_path);
int64_t zxid = getZxidFromName(snapshot_path);
@ -487,7 +493,7 @@ bool hasErrorsInMultiRequest(Coordination::ZooKeeperRequestPtr request)
}
template<typename Storage>
bool deserializeTxn(Storage & storage, ReadBuffer & in, LoggerPtr /*log*/)
bool deserializeTxn(Storage & storage, ReadBuffer & in, LoggerPtr /*log*/) TSA_NO_THREAD_SAFETY_ANALYSIS
{
int64_t checksum;
Coordination::read(checksum, in);
@ -568,7 +574,7 @@ void deserializeLogAndApplyToStorage(Storage & storage, const std::string & log_
}
template<typename Storage>
void deserializeLogsAndApplyToStorage(Storage & storage, const std::string & path, LoggerPtr log)
void deserializeLogsAndApplyToStorage(Storage & storage, const std::string & path, LoggerPtr log) TSA_NO_THREAD_SAFETY_ANALYSIS
{
std::map<int64_t, std::string> existing_logs;
for (const auto & p : fs::directory_iterator(path))

View File

@ -1,6 +1,7 @@
#include <chrono>
#include <gtest/gtest.h>
#include "base/defines.h"
#include "config.h"
#if USE_NURAFT
@ -1540,7 +1541,7 @@ void addNode(Storage & storage, const std::string & path, const std::string & da
using Node = typename Storage::Node;
Node node{};
node.setData(data);
node.setEphemeralOwner(ephemeral_owner);
node.stats.setEphemeralOwner(ephemeral_owner);
storage.container.insertOrReplace(path, node);
auto child_it = storage.container.find(path);
auto child_path = DB::getBaseNodeName(child_it->key);
@ -1549,7 +1550,7 @@ void addNode(Storage & storage, const std::string & path, const std::string & da
[&](auto & parent)
{
parent.addChild(child_path);
parent.increaseNumChildren();
parent.stats.increaseNumChildren();
});
}
@ -1570,9 +1571,9 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotSimple)
addNode(storage, "/hello1", "world", 1);
addNode(storage, "/hello2", "somedata", 3);
storage.session_id_counter = 5;
storage.zxid = 2;
storage.ephemerals[3] = {"/hello2"};
storage.ephemerals[1] = {"/hello1"};
TSA_SUPPRESS_WARNING_FOR_WRITE(storage.zxid) = 2;
storage.committed_ephemerals[3] = {"/hello2"};
storage.committed_ephemerals[1] = {"/hello1"};
storage.getSessionID(130);
storage.getSessionID(130);
@ -1601,10 +1602,10 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotSimple)
EXPECT_EQ(restored_storage->container.getValue("/hello1").getData(), "world");
EXPECT_EQ(restored_storage->container.getValue("/hello2").getData(), "somedata");
EXPECT_EQ(restored_storage->session_id_counter, 7);
EXPECT_EQ(restored_storage->zxid, 2);
EXPECT_EQ(restored_storage->ephemerals.size(), 2);
EXPECT_EQ(restored_storage->ephemerals[3].size(), 1);
EXPECT_EQ(restored_storage->ephemerals[1].size(), 1);
EXPECT_EQ(restored_storage->getZXID(), 2);
EXPECT_EQ(restored_storage->committed_ephemerals.size(), 2);
EXPECT_EQ(restored_storage->committed_ephemerals[3].size(), 1);
EXPECT_EQ(restored_storage->committed_ephemerals[1].size(), 1);
EXPECT_EQ(restored_storage->session_and_timeout.size(), 2);
}
@ -2027,7 +2028,7 @@ TYPED_TEST(CoordinationTest, TestEphemeralNodeRemove)
state_machine->commit(1, entry_c->get_buf());
const auto & storage = state_machine->getStorageUnsafe();
EXPECT_EQ(storage.ephemerals.size(), 1);
EXPECT_EQ(storage.committed_ephemerals.size(), 1);
std::shared_ptr<ZooKeeperRemoveRequest> request_d = std::make_shared<ZooKeeperRemoveRequest>();
request_d->path = "/hello";
/// Delete from other session
@ -2035,7 +2036,7 @@ TYPED_TEST(CoordinationTest, TestEphemeralNodeRemove)
state_machine->pre_commit(2, entry_d->get_buf());
state_machine->commit(2, entry_d->get_buf());
EXPECT_EQ(storage.ephemerals.size(), 0);
EXPECT_EQ(storage.committed_ephemerals.size(), 0);
}
@ -2590,9 +2591,9 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotDifferentCompressions)
addNode(storage, "/hello1", "world", 1);
addNode(storage, "/hello2", "somedata", 3);
storage.session_id_counter = 5;
storage.zxid = 2;
storage.ephemerals[3] = {"/hello2"};
storage.ephemerals[1] = {"/hello1"};
TSA_SUPPRESS_WARNING_FOR_WRITE(storage.zxid) = 2;
storage.committed_ephemerals[3] = {"/hello2"};
storage.committed_ephemerals[1] = {"/hello1"};
storage.getSessionID(130);
storage.getSessionID(130);
@ -2617,10 +2618,10 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotDifferentCompressions)
EXPECT_EQ(restored_storage->container.getValue("/hello1").getData(), "world");
EXPECT_EQ(restored_storage->container.getValue("/hello2").getData(), "somedata");
EXPECT_EQ(restored_storage->session_id_counter, 7);
EXPECT_EQ(restored_storage->zxid, 2);
EXPECT_EQ(restored_storage->ephemerals.size(), 2);
EXPECT_EQ(restored_storage->ephemerals[3].size(), 1);
EXPECT_EQ(restored_storage->ephemerals[1].size(), 1);
EXPECT_EQ(restored_storage->getZXID(), 2);
EXPECT_EQ(restored_storage->committed_ephemerals.size(), 2);
EXPECT_EQ(restored_storage->committed_ephemerals[3].size(), 1);
EXPECT_EQ(restored_storage->committed_ephemerals[1].size(), 1);
EXPECT_EQ(restored_storage->session_and_timeout.size(), 2);
}
@ -2805,13 +2806,13 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotEqual)
storage.session_id_counter = 5;
storage.ephemerals[3] = {"/hello"};
storage.ephemerals[1] = {"/hello/somepath"};
storage.committed_ephemerals[3] = {"/hello"};
storage.committed_ephemerals[1] = {"/hello/somepath"};
for (size_t j = 0; j < 3333; ++j)
storage.getSessionID(130 * j);
DB::KeeperStorageSnapshot<Storage> snapshot(&storage, storage.zxid);
DB::KeeperStorageSnapshot<Storage> snapshot(&storage, storage.getZXID());
auto buf = manager.serializeSnapshotToBuffer(snapshot);
@ -3315,7 +3316,7 @@ TYPED_TEST(CoordinationTest, TestCheckNotExistsRequest)
create_path("/test_node");
auto node_it = storage.container.find("/test_node");
ASSERT_NE(node_it, storage.container.end());
auto node_version = node_it->value.version;
auto node_version = node_it->value.stats.version;
{
SCOPED_TRACE("CheckNotExists returns ZNODEEXISTS");
@ -3566,12 +3567,12 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest)
{
SCOPED_TRACE("Recursive Remove Ephemeral");
create("/T7", zkutil::CreateMode::Ephemeral);
ASSERT_EQ(storage.ephemerals.size(), 1);
ASSERT_EQ(storage.committed_ephemerals.size(), 1);
auto responses = remove_recursive("/T7", 100);
ASSERT_EQ(responses.size(), 1);
ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK);
ASSERT_EQ(storage.ephemerals.size(), 0);
ASSERT_EQ(storage.committed_ephemerals.size(), 0);
ASSERT_FALSE(exists("/T7"));
}
@ -3581,12 +3582,12 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest)
create("/T8/A", zkutil::CreateMode::Persistent);
create("/T8/B", zkutil::CreateMode::Ephemeral);
create("/T8/A/C", zkutil::CreateMode::Ephemeral);
ASSERT_EQ(storage.ephemerals.size(), 1);
ASSERT_EQ(storage.committed_ephemerals.size(), 1);
auto responses = remove_recursive("/T8", 4);
ASSERT_EQ(responses.size(), 1);
ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK);
ASSERT_EQ(storage.ephemerals.size(), 0);
ASSERT_EQ(storage.committed_ephemerals.size(), 0);
ASSERT_FALSE(exists("/T8"));
ASSERT_FALSE(exists("/T8/A"));
ASSERT_FALSE(exists("/T8/B"));
@ -3889,14 +3890,26 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveWatches)
auto responses = storage.processRequest(remove_request, 1, new_zxid);
ASSERT_EQ(responses.size(), 7);
/// request response is last
ASSERT_EQ(dynamic_cast<Coordination::ZooKeeperWatchResponse *>(responses.back().response.get()), nullptr);
for (size_t i = 0; i < 7; ++i)
std::unordered_map<std::string, std::vector<Coordination::Event>> expected_watch_responses
{
{"/A/B/D", {Coordination::Event::DELETED}},
{"/A/B", {Coordination::Event::CHILD, Coordination::Event::DELETED}},
{"/A/C", {Coordination::Event::DELETED}},
{"/A", {Coordination::Event::CHILD, Coordination::Event::DELETED}},
};
std::unordered_map<std::string, std::vector<Coordination::Event>> actual_watch_responses;
for (size_t i = 0; i < 6; ++i)
{
ASSERT_EQ(responses[i].response->error, Coordination::Error::ZOK);
if (const auto * watch_response = dynamic_cast<Coordination::ZooKeeperWatchResponse *>(responses[i].response.get()))
ASSERT_EQ(watch_response->type, Coordination::Event::DELETED);
const auto & watch_response = dynamic_cast<Coordination::ZooKeeperWatchResponse &>(*responses[i].response);
actual_watch_responses[watch_response.path].push_back(static_cast<Coordination::Event>(watch_response.type));
}
ASSERT_EQ(expected_watch_responses, actual_watch_responses);
ASSERT_EQ(storage.watches.size(), 0);
ASSERT_EQ(storage.list_watches.size(), 0);

View File

@ -338,11 +338,8 @@ size_t HashJoin::getTotalRowCount() const
return res;
}
size_t HashJoin::getTotalByteCount() const
void HashJoin::doDebugAsserts() const
{
if (!data)
return 0;
#ifndef NDEBUG
size_t debug_blocks_allocated_size = 0;
for (const auto & block : data->blocks)
@ -360,6 +357,14 @@ size_t HashJoin::getTotalByteCount() const
throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size ({} != {})",
data->blocks_nullmaps_allocated_size, debug_blocks_nullmaps_allocated_size);
#endif
}
size_t HashJoin::getTotalByteCount() const
{
if (!data)
return 0;
doDebugAsserts();
size_t res = 0;
@ -544,9 +549,11 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
have_compressed = true;
}
doDebugAsserts();
data->blocks_allocated_size += block_to_save.allocatedBytes();
data->blocks.emplace_back(std::move(block_to_save));
Block * stored_block = &data->blocks.back();
doDebugAsserts();
if (rows)
data->empty = false;
@ -634,9 +641,11 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
if (!flag_per_row && !is_inserted)
{
doDebugAsserts();
LOG_TRACE(log, "Skipping inserting block with {} rows", rows);
data->blocks_allocated_size -= stored_block->allocatedBytes();
data->blocks.pop_back();
doDebugAsserts();
}
if (!check_limits)
@ -683,6 +692,8 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
for (auto & stored_block : data->blocks)
{
doDebugAsserts();
size_t old_size = stored_block.allocatedBytes();
stored_block = stored_block.shrinkToFit();
size_t new_size = stored_block.allocatedBytes();
@ -700,6 +711,8 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
else
/// Sometimes after clone resized block can be bigger than original
data->blocks_allocated_size += new_size - old_size;
doDebugAsserts();
}
auto new_total_bytes_in_join = getTotalByteCount();
@ -1416,7 +1429,13 @@ void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
};
BlocksList sorted_blocks;
visit_rows_map(sorted_blocks, map);
doDebugAsserts();
data->blocks.swap(sorted_blocks);
size_t new_blocks_allocated_size = 0;
for (const auto & block : data->blocks)
new_blocks_allocated_size += block.allocatedBytes();
data->blocks_allocated_size = new_blocks_allocated_size;
doDebugAsserts();
}
}

View File

@ -470,6 +470,7 @@ private:
void tryRerangeRightTableData() override;
template <JoinKind KIND, typename Map, JoinStrictness STRICTNESS>
void tryRerangeRightTableDataImpl(Map & map);
void doDebugAsserts() const;
};
}

View File

@ -2102,7 +2102,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
renameTempPartAndReplace(part, transaction);
checkPartChecksumsAndCommit(transaction, part);
checkPartChecksumsAndCommit(transaction, part, /*hardlinked_files*/ {}, /*replace_zero_copy_lock*/ true);
writePartLog(PartLogElement::Type::NEW_PART, {}, 0 /** log entry is fake so we don't measure the time */,
part->name, part, {} /** log entry is fake so there are no initial parts */, nullptr,

View File

@ -6,6 +6,7 @@ import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
@ -145,9 +146,30 @@ def test_insert_select_replicated(cluster, min_rows_for_wide_part, files_per_par
) == (3 * FILES_OVERHEAD) + (files_per_part * 3)
def remove_leftovers_from_zk(node_data, node_for_query, replica_name):
    """Drop a stale replica of ``test_drop_table`` from ZooKeeper.

    ``node_data`` is asked which replicas are registered under
    ``/test/drop_table/replicas`` and, if needed, which tables it has.
    When ``replica_name`` is still registered but ``node_data`` has no
    ``test_drop_table`` table, ``node_for_query`` runs
    ``system drop replica`` for that replica.

    Names are matched against whole output lines rather than by substring,
    so replica ``"1"`` is not confused with ``"11"`` and the table is not
    confused with e.g. ``test_drop_table_2``.
    """
    registered_replicas = node_data.query_with_retry(
        "select name from system.zookeeper where path='/test/drop_table/replicas'"
    ).splitlines()
    if replica_name not in registered_replicas:
        # Nothing is left in ZooKeeper for this replica.
        return

    existing_tables = node_data.query("show tables").splitlines()
    if "test_drop_table" in existing_tables:
        # The table is alive on this node, so the replica entry is legitimate.
        return

    node_for_query.query(
        f"system drop replica '{replica_name}' from table test_drop_table"
    )
def test_drop_table(cluster):
node = list(cluster.instances.values())[0]
node2 = list(cluster.instances.values())[1]
# We are checking log entries in this test, so it should be empty before the execution.
node.rotate_logs()
node2.rotate_logs()
# `drop table ... sync` doesn't remove the replica from zk immediately. Prevent a race condition by removing old nodes from zk.
remove_leftovers_from_zk(node, node2, "1")
remove_leftovers_from_zk(node2, node, "2")
node.query(
"create table test_drop_table (n int) engine=ReplicatedMergeTree('/test/drop_table', '1') order by n partition by n % 99 settings storage_policy='s3'"
)
@ -195,11 +217,7 @@ def test_drop_table(cluster):
)
# It could leave some leftovers, remove them
replicas = node.query_with_retry(
"select name from system.zookeeper where path='/test/drop_table/replicas'"
)
if "1" in replicas and "test_drop_table" not in node.query("show tables"):
node2.query("system drop replica '1' from table test_drop_table")
remove_leftovers_from_zk(node, node2, "1")
# Just in case table was not created due to connection errors
node.query(
@ -222,3 +240,26 @@ def test_drop_table(cluster):
"select count(n), sum(n) from test_drop_table"
)
node.query("drop table test_drop_table sync")
def test_s3_check_restore(cluster):
    """Restore a replica that was dropped while its table was detached.

    Steps: insert on node1, detach the table there, drop replica '1' from
    node2, insert on node2, then attach and restore on node1. Finally wait
    until node1's replication queue holds no ATTACH_PART entries for s3_test.
    """
    create_table(cluster)
    first_replica = cluster.instances["node1"]
    second_replica = cluster.instances["node2"]

    def insert_batch(target):
        # Values are generated anew for every insert, exactly one call per insert.
        target.query(
            "INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-02", 2)),
        )

    insert_batch(first_replica)
    first_replica.query("DETACH TABLE s3_test;")

    second_replica.query("SYSTEM DROP REPLICA '1' FROM TABLE s3_test;")
    insert_batch(second_replica)

    first_replica.query("ATTACH TABLE s3_test;")
    first_replica.query("SYSTEM RESTORE REPLICA s3_test;")

    pending_attach_query = "SELECT count() FROM system.replication_queue WHERE table='s3_test' and type='ATTACH_PART'"
    assert_eq_with_retry(first_replica, pending_attach_query, "0\n")

View File

@ -1,21 +1,31 @@
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
%d: 123
%d: -123
%d: 0
%d: 9223372036854775807
%i: 123
%u: 123
%o: 173
%x: 7b
%X: 7B
%f: 0.000000
%f: 123.456000
%f: -123.456000
%F: 123.456000
%e: 1.234560e+02
%E: 1.234560E+02
%g: 123.456
%G: 123.456
%a: 0x1.edd2f1a9fbe77p+6
%A: 0X1.EDD2F1A9FBE77P+6
%s: abc
┌─printf('%%s: %s', '\n\t')─┐
1. │ %s:
└───────────────────────────┘
%s:
%%: %
%.5d: 00123
%.2f: 123.46
%.2e: 1.23e+02
%.2g: 1.2e+02
%.2s: ab

View File

@ -1,39 +1,47 @@
-- Testing integer formats
select printf('%%d: %d', 123) = '%d: 123';
select printf('%%i: %i', 123) = '%i: 123';
select printf('%%u: %u', 123) = '%u: 123';
select printf('%%o: %o', 123) = '%o: 173';
select printf('%%x: %x', 123) = '%x: 7b';
select printf('%%X: %X', 123) = '%X: 7B';
select printf('%%d: %d', 123);
select printf('%%d: %d', -123);
select printf('%%d: %d', 0);
select printf('%%d: %d', 9223372036854775807);
select printf('%%i: %i', 123);
select printf('%%u: %u', 123);
select printf('%%o: %o', 123);
select printf('%%x: %x', 123);
select printf('%%X: %X', 123);
-- Testing floating point formats
select printf('%%f: %f', 123.456) = '%f: 123.456000';
select printf('%%F: %F', 123.456) = '%F: 123.456000';
select printf('%%e: %e', 123.456) = '%e: 1.234560e+02';
select printf('%%E: %E', 123.456) = '%E: 1.234560E+02';
select printf('%%g: %g', 123.456) = '%g: 123.456';
select printf('%%G: %G', 123.456) = '%G: 123.456';
select printf('%%a: %a', 123.456) = '%a: 0x1.edd2f1a9fbe77p+6';
select printf('%%A: %A', 123.456) = '%A: 0X1.EDD2F1A9FBE77P+6';
select printf('%%f: %f', 0.0);
select printf('%%f: %f', 123.456);
select printf('%%f: %f', -123.456);
select printf('%%F: %F', 123.456);
select printf('%%e: %e', 123.456);
select printf('%%E: %E', 123.456);
select printf('%%g: %g', 123.456);
select printf('%%G: %G', 123.456);
select printf('%%a: %a', 123.456);
select printf('%%A: %A', 123.456);
-- Testing character formats
select printf('%%s: %s', 'abc') = '%s: abc';
select printf('%%s: %s', 'abc');
SELECT printf('%%s: %s', '\n\t') FORMAT PrettyCompact;
select printf('%%s: %s', '');
-- Testing the %% specifier
select printf('%%%%: %%') = '%%: %';
select printf('%%%%: %%');
-- Testing integer formats with precision
select printf('%%.5d: %.5d', 123) = '%.5d: 00123';
select printf('%%.5d: %.5d', 123);
-- Testing floating point formats with precision
select printf('%%.2f: %.2f', 123.456) = '%.2f: 123.46';
select printf('%%.2e: %.2e', 123.456) = '%.2e: 1.23e+02';
select printf('%%.2g: %.2g', 123.456) = '%.2g: 1.2e+02';
select printf('%%.2f: %.2f', 123.456);
select printf('%%.2e: %.2e', 123.456);
select printf('%%.2g: %.2g', 123.456);
-- Testing character formats with precision
select printf('%%.2s: %.2s', 'abc') = '%.2s: ab';
select printf('%%.2s: %.2s', 'abc');
select printf('%%X: %X', 123.123); -- { serverError BAD_ARGUMENTS }
select printf('%%A: %A', 'abc'); -- { serverError BAD_ARGUMENTS }
select printf('%%s: %s', 100); -- { serverError BAD_ARGUMENTS }
select printf('%%n: %n', 100); -- { serverError BAD_ARGUMENTS }
select printf('%%f: %f', 0); -- { serverError BAD_ARGUMENTS }

View File

@ -1114,6 +1114,7 @@ void Runner::runBenchmarkFromLog()
else
{
request_from_log->connection = get_zookeeper_connection(request_from_log->session_id);
request_from_log->executor_id %= concurrency;
push_request(std::move(*request_from_log));
}

View File

@ -28,13 +28,13 @@ void dumpMachine(std::shared_ptr<KeeperStateMachine<DB::KeeperMemoryStorage>> ma
keys.pop();
std::cout << key << "\n";
auto value = storage.container.getValue(key);
std::cout << "\tStat: {version: " << value.version <<
", mtime: " << value.mtime <<
", emphemeralOwner: " << value.ephemeralOwner() <<
", czxid: " << value.czxid <<
", mzxid: " << value.mzxid <<
", numChildren: " << value.numChildren() <<
", dataLength: " << value.data_size <<
std::cout << "\tStat: {version: " << value.stats.version <<
", mtime: " << value.stats.mtime <<
", emphemeralOwner: " << value.stats.ephemeralOwner() <<
", czxid: " << value.stats.czxid <<
", mzxid: " << value.stats.mzxid <<
", numChildren: " << value.stats.numChildren() <<
", dataLength: " << value.stats.data_size <<
"}" << std::endl;
std::cout << "\tData: " << storage.container.getValue(key).getData() << std::endl;