keeper: store only unique session IDs for watches

This should speed up keeper, especially in case of incorrect usage (like
the case that had been fixed in #37640), especially in case on non
release build.

And also this should fix SIGKILL in stress tests.

You will find some details for one of such SIGKILL in `<details>` tag [1]:

<details>

    $ pigz -cd clickhouse-server.stress.log.gz | tail
    2022.05.27 16:17:24.882971 [ 637 ] {} <Trace> BackgroundSchedulePool/BgSchPool: Waiting for threads to finish.
    2022.05.27 16:17:24.896749 [ 637 ] {} <Debug> MemoryTracker: Peak memory usage (for query): 4.09 MiB.
    2022.05.27 16:17:24.907163 [ 637 ] {} <Debug> Application: Shut down storages.
    2022.05.27 16:17:24.907233 [ 637 ] {} <Debug> Application: Waiting for current connections to servers for tables to finish.
    2022.05.27 16:17:24.934335 [ 637 ] {} <Information> Application: Closed all listening sockets. Waiting for 1 outstanding connections.
    2022.05.27 16:17:29.843491 [ 637 ] {} <Information> Application: Closed connections to servers for tables. But 1 remain. Probably some tables of other users cannot finish their connections after context shutdown.
    2022.05.27 16:17:29.843632 [ 637 ] {} <Debug> KeeperDispatcher: Shutting down storage dispatcher
    2022.05.27 16:17:34.612616 [ 688 ] {} <Test> virtual Coordination::ZooKeeperRequest::~ZooKeeperRequest(): Processing of request xid=2147483647 took 10000 ms
    2022.05.27 16:17:54.612109 [ 3176 ] {} <Debug> KeeperTCPHandler: Session #12 expired
    2022.05.27 16:19:59.823038 [ 635 ] {} <Fatal> Application: Child process was terminated by signal 9 (KILL). If it is not done by 'forcestop' command or manually, the possible cause is OOM Killer (see 'dmesg' and look at the '/var/log/kern.log' for the details).

    Thread 26 (Thread 0x7f1c7703f700 (LWP 708)):
    0  0x000000000b074b2a in __tsan::MemoryAccessImpl(__tsan::ThreadState*, unsigned long, int, bool, bool, unsigned long long*, __tsan::Shadow) ()
    1  0x000000000b08630c in __tsan::MemoryAccessRange(__tsan::ThreadState*, unsigned long, unsigned long, unsigned long, bool) ()
    2  0x000000000b01ff03 in memmove ()
    3  0x000000001bbc8996 in std::__1::__move<long, long> (__first=0xb8600000d83304, __last=<optimized out>, __result=0x7f1c021cd000) at ../contrib/libcxx/include/__algorithm/move.h:57
    4  std::__1::move<long*, long*> (__first=0xb8600000d83304, __last=<optimized out>, __result=0x7f1c021cd000) at ../contrib/libcxx/include/__algorithm/move.h:70
    5  std::__1::vector<long, std::__1::allocator<long> >::erase (this=0x7b1400584c48, __position=...) at ../contrib/libcxx/include/vector:1608
    6  DB::KeeperStorage::clearDeadWatches (this=0x7b5800001ad8, this@entry=0x7b5800001800, session_id=session_id@entry=12) at ../src/Coordination/KeeperStorage.cpp:1228
    7  0x000000001bbc5c55 in DB::KeeperStorage::processRequest (this=0x7b5800001800, zk_request=..., session_id=12, time=1, new_last_zxid=..., check_acl=true) at ../src/Coordination/KeeperStorage.cpp:1122
    8  0x000000001bba06a3 in DB::KeeperStateMachine::commit (this=<optimized out>, log_idx=3549, data=...) at ../src/Coordination/KeeperStateMachine.cpp:143
    9  0x000000001bba6193 in nuraft::state_machine::commit_ext (this=0x7b4c00001f98, params=...) at ../contrib/NuRaft/include/libnuraft/state_machine.hxx:75
    10 0x00000000202c5a55 in nuraft::raft_server::commit_app_log (this=this@entry=0x7b6c00002a18, idx_to_commit=idx_to_commit@entry=3549, le=..., need_to_handle_commit_elem=true, initial_commit_exec=false) at ../contrib/NuRaft/src/handle_commit.cxx:311
    11 0x00000000202c4f98 in nuraft::raft_server::commit_in_bg_exec (this=<optimized out>, this@entry=0x7b6c00002a18, timeout_ms=timeout_ms@entry=0, initial_commit_exec=false) at ../contrib/NuRaft/src/handle_commit.cxx:241
    12 0x00000000202c4613 in nuraft::raft_server::commit_in_bg (this=this@entry=0x7b6c00002a18) at ../contrib/NuRaft/src/handle_commit.cxx:149
    ...
    Thread 28 (Thread 0x7f1c7603d700 (LWP 710)):
    0  0x00007f1d22a6d110 in __lll_lock_wait () from /lib/x86_64-linux-gnu/libpthread.so.0
    1  0x00007f1d22a650a3 in pthread_mutex_lock () from /lib/x86_64-linux-gnu/libpthread.so.0
    2  0x000000000b0337b0 in pthread_mutex_lock ()
    3  0x00000000221884da in std::__1::__libcpp_mutex_lock (__m=0x7b4c00002088) at ../contrib/libcxx/include/__threading_support:303
    4  std::__1::mutex::lock (this=0x7b4c00002088) at ../contrib/libcxx/src/mutex.cpp:33
    5  0x000000001bba4188 in std::__1::lock_guard<std::__1::mutex>::lock_guard (__m=..., this=<optimized out>) at ../contrib/libcxx/include/__mutex_base:91
    6  DB::KeeperStateMachine::getDeadSessions (this=0x7b4c00001f98) at ../src/Coordination/KeeperStateMachine.cpp:360
    7  0x000000001bb79b4b in DB::KeeperServer::getDeadSessions (this=0x7b4400012700) at ../src/Coordination/KeeperServer.cpp:572
    8  0x000000001bb64d1a in DB::KeeperDispatcher::sessionCleanerTask (this=<optimized out>, this@entry=0x7b640001c218) at ../src/Coordination/KeeperDispatcher.cpp:399
    ...
    Thread 1 (Thread 0x7f1d227148c0 (LWP 637)):
    0  0x00007f1d22a69376 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/x86_64-linux-gnu/libpthread.so.0
    1  0x000000000b0895e0 in __tsan::call_pthread_cancel_with_cleanup(int (*)(void*), void (*)(void*), void*) ()
    2  0x000000000b017091 in pthread_cond_wait ()
    3  0x0000000020569d98 in Poco::EventImpl::waitImpl (this=0x7b2000008798) at ../contrib/poco/Foundation/src/Event_POSIX.cpp:106
    4  0x000000001bb636cf in Poco::Event::wait (this=0x7b2000008798) at ../contrib/poco/Foundation/include/Poco/Event.h:97
    5  ThreadFromGlobalPool::join (this=<optimized out>) at ../src/Common/ThreadPool.h:217
    6  DB::KeeperDispatcher::shutdown (this=0x7b640001c218) at ../src/Coordination/KeeperDispatcher.cpp:322
    7  0x0000000019ca8bfc in DB::Context::shutdownKeeperDispatcher (this=<optimized out>) at ../src/Interpreters/Context.cpp:2111
    8  0x000000000b0a979b in DB::Server::main(std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&)::$_9::operator()() const (this=0x7ffcde44f0a0) at ../programs/server/Server.cpp:1407

</details>

  [1]: https://s3.amazonaws.com/clickhouse-test-reports/37593/2613149f6bf4f242bbbf2c3c8539b5176fd77286/stress_test__thread__actions_.html

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2022-05-30 11:43:49 +03:00
parent 33b2a50740
commit d86181d3cd
2 changed files with 8 additions and 20 deletions

View File

@ -1166,12 +1166,12 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
? list_watches
: watches;
watches_type[zk_request->getPath()].emplace_back(session_id);
watches_type[zk_request->getPath()].emplace(session_id);
sessions_and_watchers[session_id].emplace(zk_request->getPath());
}
else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists)
{
watches[zk_request->getPath()].emplace_back(session_id);
watches[zk_request->getPath()].emplace(session_id);
sessions_and_watchers[session_id].emplace(zk_request->getPath());
}
}
@ -1206,13 +1206,7 @@ void KeeperStorage::clearDeadWatches(int64_t session_id)
if (watch != watches.end())
{
auto & watches_for_path = watch->second;
for (auto w_it = watches_for_path.begin(); w_it != watches_for_path.end();)
{
if (*w_it == session_id)
w_it = watches_for_path.erase(w_it);
else
++w_it;
}
watches_for_path.erase(session_id);
if (watches_for_path.empty())
watches.erase(watch);
}
@ -1222,13 +1216,7 @@ void KeeperStorage::clearDeadWatches(int64_t session_id)
if (list_watch != list_watches.end())
{
auto & list_watches_for_path = list_watch->second;
for (auto w_it = list_watches_for_path.begin(); w_it != list_watches_for_path.end();)
{
if (*w_it == session_id)
w_it = list_watches_for_path.erase(w_it);
else
++w_it;
}
list_watches_for_path.erase(session_id);
if (list_watches_for_path.empty())
list_watches.erase(list_watch);
}
@ -1250,7 +1238,7 @@ void KeeperStorage::dumpWatches(WriteBufferFromOwnString & buf) const
void KeeperStorage::dumpWatchesByPath(WriteBufferFromOwnString & buf) const
{
auto write_int_vec = [&buf](const std::vector<int64_t> & session_ids)
auto write_int_container = [&buf](const auto & session_ids)
{
for (int64_t session_id : session_ids)
{
@ -1261,13 +1249,13 @@ void KeeperStorage::dumpWatchesByPath(WriteBufferFromOwnString & buf) const
for (const auto & [watch_path, sessions] : watches)
{
buf << watch_path << "\n";
write_int_vec(sessions);
write_int_container(sessions);
}
for (const auto & [watch_path, sessions] : list_watches)
{
buf << watch_path << "\n";
write_int_vec(sessions);
write_int_container(sessions);
}
}

View File

@ -96,7 +96,7 @@ public:
using Container = SnapshotableHashTable<Node>;
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<std::string>>;
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<std::string>>;
using SessionIDs = std::vector<int64_t>;
using SessionIDs = std::unordered_set<int64_t>;
/// Just vector of SHA1 from user:password
using AuthIDs = std::vector<AuthID>;