From 04047f76c75a4cc2826eca016c776c95261071d4 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Fri, 8 Oct 2021 11:48:08 +0300 Subject: [PATCH] Fixed tests --- base/loggers/OwnSplitChannel.cpp | 2 +- src/Access/ReplicatedAccessStorage.cpp | 5 +- src/Access/ReplicatedAccessStorage.h | 17 ++-- src/Common/ConcurrentBoundedQueue.h | 7 ++ src/Coordination/KeeperDispatcher.cpp | 8 +- src/Coordination/KeeperStateMachine.cpp | 9 ++- src/Coordination/KeeperStateMachine.h | 5 +- src/Coordination/ThreadSafeQueue.h | 78 ------------------- src/Coordination/tests/gtest_coordination.cpp | 4 +- src/Processors/Formats/LazyOutputFormat.cpp | 2 +- src/Processors/Formats/LazyOutputFormat.h | 7 +- src/Server/KeeperTCPHandler.cpp | 4 +- src/Server/KeeperTCPHandler.h | 4 +- .../ReadBufferFromRabbitMQConsumer.cpp | 5 ++ .../WriteBufferToRabbitMQProducer.cpp | 3 +- utils/keeper-data-dumper/main.cpp | 2 +- 16 files changed, 55 insertions(+), 107 deletions(-) delete mode 100644 src/Coordination/ThreadSafeQueue.h diff --git a/base/loggers/OwnSplitChannel.cpp b/base/loggers/OwnSplitChannel.cpp index 16c9fedf21d..2349c60856f 100644 --- a/base/loggers/OwnSplitChannel.cpp +++ b/base/loggers/OwnSplitChannel.cpp @@ -100,7 +100,7 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg) columns[i++]->insert(msg.getSource()); columns[i++]->insert(msg.getText()); - (void)(logs_queue->emplace(std::move(columns))); + [[maybe_unused]] bool push_result = logs_queue->emplace(std::move(columns)); } /// Also log to system.text_log table, if message is not too noisy diff --git a/src/Access/ReplicatedAccessStorage.cpp b/src/Access/ReplicatedAccessStorage.cpp index 921d7bfe37f..7b29aab3a89 100644 --- a/src/Access/ReplicatedAccessStorage.cpp +++ b/src/Access/ReplicatedAccessStorage.cpp @@ -34,6 +34,7 @@ ReplicatedAccessStorage::ReplicatedAccessStorage( : IAccessStorage(storage_name_) , zookeeper_path(zookeeper_path_) , get_zookeeper(get_zookeeper_) + , refresh_queue(std::numeric_limits::max()) { if (zookeeper_path.empty()) throw Exception("ZooKeeper path must be non-empty", ErrorCodes::BAD_ARGUMENTS); @@ -366,7 +367,7 @@ void ReplicatedAccessStorage::refreshEntities(const zkutil::ZooKeeperPtr & zooke const String zookeeper_uuids_path = zookeeper_path + "/uuid"; auto watch_entities_list = [this](const Coordination::WatchResponse &) { - refresh_queue.push(UUIDHelpers::Nil); + [[maybe_unused]] bool push_result = refresh_queue.push(UUIDHelpers::Nil); }; Coordination::Stat stat; const auto entity_uuid_strs = zookeeper->getChildrenWatch(zookeeper_uuids_path, &stat, watch_entities_list); @@ -418,7 +419,7 @@ void ReplicatedAccessStorage::refreshEntityNoLock(const zkutil::ZooKeeperPtr & z const auto watch_entity = [this, id](const Coordination::WatchResponse & response) { if (response.type == Coordination::Event::CHANGED) - refresh_queue.push(id); + [[maybe_unused]] bool push_result = refresh_queue.push(id); }; Coordination::Stat entity_stat; const String entity_path = zookeeper_path + "/uuid/" + toString(id); diff --git a/src/Access/ReplicatedAccessStorage.h b/src/Access/ReplicatedAccessStorage.h index 0df1d5ef5ff..458bc0d614b 100644 --- a/src/Access/ReplicatedAccessStorage.h +++ b/src/Access/ReplicatedAccessStorage.h @@ -1,17 +1,20 @@ #pragma once -#include -#include -#include -#include -#include -#include #include #include #include #include #include +#include + +#include +#include +#include +#include + +#include + namespace DB { @@ -36,7 +39,7 @@ private: std::atomic initialized = false; std::atomic stop_flag = false; ThreadFromGlobalPool worker_thread; - ThreadSafeQueue refresh_queue; + ConcurrentBoundedQueue refresh_queue; UUID insertImpl(const AccessEntityPtr & entity, bool replace_if_exists) override; void removeImpl(const UUID & id) override; diff --git a/src/Common/ConcurrentBoundedQueue.h b/src/Common/ConcurrentBoundedQueue.h index fdd1c7cc388..e15aa462f1c 100644 --- a/src/Common/ConcurrentBoundedQueue.h +++ b/src/Common/ConcurrentBoundedQueue.h @@ -179,6 +179,13 @@ public: return is_finished; } + /// Returns if queue is finished and empty + bool isFinishedAndEmpty() const + { + std::lock_guard lock(queue_mutex); + return is_finished && queue.empty(); + } + /// Clear queue void clear() { diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 8f465e59ed5..3823a4129e3 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -15,6 +15,7 @@ namespace ErrorCodes KeeperDispatcher::KeeperDispatcher() : coordination_settings(std::make_shared()) + , responses_queue(std::numeric_limits::max()) , log(&Poco::Logger::get("KeeperDispatcher")) { } @@ -414,7 +415,12 @@ void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSession response->xid = request->xid; response->zxid = 0; response->error = error; - responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response}); + if (!responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response})) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Could not push error response xid {} zxid {} error message {} to responses queue", + response->xid, + response->zxid, + errorMessage(error)); } } diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 2617c01e24f..b1c8aab697e 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -120,7 +120,8 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n session_id = storage->getSessionID(session_id_request.session_timeout_ms); LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms); response->session_id = session_id; - responses_queue.push(response_for_session); + if (!responses_queue.push(response_for_session)) + LOG_WARNING(log, "Could not push response {} into responses queue", session_id); } } else @@ -128,7 +129,8 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n std::lock_guard lock(storage_and_responses_lock); KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx); for (auto & response_for_session : responses_for_sessions) - responses_queue.push(response_for_session); + if (!responses_queue.push(response_for_session)) + LOG_WARNING(log, "Could not push response {} into responses queue", response_for_session.session_id); } last_committed_idx = log_idx; @@ -305,7 +307,8 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi std::lock_guard lock(storage_and_responses_lock); auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt); for (const auto & response : responses) - responses_queue.push(response); + if (!responses_queue.push(response)) + LOG_WARNING(log, "Could not push response {} into responses queue", response.session_id); } std::vector KeeperStateMachine::getDeadSessions() diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index fcf9c7d14c4..983692f7b7f 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -1,16 +1,17 @@ #pragma once +#include #include #include // Y_IGNORE #include -#include #include #include + namespace DB { -using ResponsesQueue = ThreadSafeQueue; +using ResponsesQueue = ConcurrentBoundedQueue; using SnapshotsQueue = ConcurrentBoundedQueue; /// ClickHouse Keeper state machine. Wrapper for KeeperStorage. diff --git a/src/Coordination/ThreadSafeQueue.h b/src/Coordination/ThreadSafeQueue.h deleted file mode 100644 index ba79a99f94e..00000000000 --- a/src/Coordination/ThreadSafeQueue.h +++ /dev/null @@ -1,78 +0,0 @@ -#pragma once - -#include -#include - -#include - - -namespace DB -{ - -/// Queue with mutex and condvar. As simple as possible. -template -class ThreadSafeQueue -{ -private: - mutable std::mutex queue_mutex; - std::condition_variable cv; - std::queue queue; - bool is_finished; -public: - - bool push(const T & response) - { - { - std::lock_guard lock(queue_mutex); - - if (is_finished) - return false; - - queue.push(response); - } - - cv.notify_one(); - return true; - } - - [[nodiscard]] bool tryPop(T & response, int64_t timeout_ms = 0) - { - std::unique_lock lock(queue_mutex); - if (!cv.wait_for(lock, - std::chrono::milliseconds(timeout_ms), [this] { return is_finished || !queue.empty(); })) - return false; - - if (is_finished && queue.empty()) - return false; - - ::detail::moveOrCopyIfThrow(std::move(queue.front()), response); - queue.pop(); - - return true; - } - - size_t size() const - { - std::lock_guard lock(queue_mutex); - return queue.size(); - } - - bool isFinished() const - { - std::lock_guard lock(queue_mutex); - return is_finished; - } - - bool finish() - { - std::lock_guard lock(queue_mutex); - bool was_finished_before = is_finished; - is_finished = true; - - cv.notify_all(); - - return was_finished_before; - } -}; - -} diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 3c85f8c410c..0cb5972f718 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -1159,7 +1159,7 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint ChangelogDirTest snapshots("./snapshots"); ChangelogDirTest logs("./logs"); - ResponsesQueue queue; + ResponsesQueue queue(std::numeric_limits::max()); SnapshotsQueue snapshots_queue{1}; auto state_machine = std::make_shared(queue, snapshots_queue, "./snapshots", settings); state_machine->init(); @@ -1310,7 +1310,7 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove) ChangelogDirTest snapshots("./snapshots"); CoordinationSettingsPtr settings = std::make_shared(); - ResponsesQueue queue; + ResponsesQueue queue(std::numeric_limits::max()); SnapshotsQueue snapshots_queue{1}; auto state_machine = std::make_shared(queue, snapshots_queue, "./snapshots", settings); state_machine->init(); diff --git a/src/Processors/Formats/LazyOutputFormat.cpp b/src/Processors/Formats/LazyOutputFormat.cpp index 4d919833df2..792d805eac3 100644 --- a/src/Processors/Formats/LazyOutputFormat.cpp +++ b/src/Processors/Formats/LazyOutputFormat.cpp @@ -9,7 +9,7 @@ WriteBuffer LazyOutputFormat::out(nullptr, 0); Chunk LazyOutputFormat::getChunk(UInt64 milliseconds) { - if (queue.isFinished()) + if (isFinished()) return {}; Chunk chunk; diff --git a/src/Processors/Formats/LazyOutputFormat.h b/src/Processors/Formats/LazyOutputFormat.h index 91309be8570..50dc87f2e70 100644 --- a/src/Processors/Formats/LazyOutputFormat.h +++ b/src/Processors/Formats/LazyOutputFormat.h @@ -23,7 +23,7 @@ public: Chunk getTotals(); Chunk getExtremes(); - bool isFinished() { return queue.isFinished(); } + bool isFinished() { return queue.isFinishedAndEmpty(); } BlockStreamProfileInfo & getProfileInfo() { return info; } @@ -31,7 +31,7 @@ public: void onCancel() override { - queue.finish(); + queue.clearAndFinish(); } void finalize() override @@ -44,8 +44,7 @@ public: protected: void consume(Chunk chunk) override { - if (!queue.isFinished()) - (void)(queue.emplace(std::move(chunk))); + (void)(queue.emplace(std::move(chunk))); } void consumeTotals(Chunk chunk) override { totals = std::move(chunk); } diff --git a/src/Server/KeeperTCPHandler.cpp b/src/Server/KeeperTCPHandler.cpp index 7ebbda9dfe6..48519f0af30 100644 --- a/src/Server/KeeperTCPHandler.cpp +++ b/src/Server/KeeperTCPHandler.cpp @@ -198,7 +198,7 @@ KeeperTCPHandler::KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSoc , operation_timeout(0, global_context->getConfigRef().getUInt("keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000) , session_timeout(0, global_context->getConfigRef().getUInt("keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000) , poll_wrapper(std::make_unique(socket_)) - , responses(std::make_unique()) + , responses(std::make_unique(std::numeric_limits::max())) { } @@ -314,7 +314,7 @@ void KeeperTCPHandler::runImpl() auto response_fd = poll_wrapper->getResponseFD(); auto response_callback = [this, response_fd] (const Coordination::ZooKeeperResponsePtr & response) { - responses->push(response); + [[maybe_unused]] bool push_result = responses->push(response); UInt8 single_byte = 1; [[maybe_unused]] int result = write(response_fd, &single_byte, sizeof(single_byte)); }; diff --git a/src/Server/KeeperTCPHandler.h b/src/Server/KeeperTCPHandler.h index 7abfb72c846..274fb21af63 100644 --- a/src/Server/KeeperTCPHandler.h +++ b/src/Server/KeeperTCPHandler.h @@ -13,10 +13,10 @@ #include #include #include +#include #include #include #include -#include #include namespace DB @@ -25,7 +25,7 @@ namespace DB struct SocketInterruptablePollWrapper; using SocketInterruptablePollWrapperPtr = std::unique_ptr; -using ThreadSafeResponseQueue = ThreadSafeQueue; +using ThreadSafeResponseQueue = ConcurrentBoundedQueue; using ThreadSafeResponseQueuePtr = std::unique_ptr; diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index 6f42aeb9776..ac60d748e36 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -14,6 +14,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( ChannelPtr consumer_channel_, RabbitMQHandler & event_handler_, diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index 7a9ec1d567b..c4d5b0e9a23 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -21,6 +21,7 @@ static const auto RETURNED_LIMIT = 50000; namespace ErrorCodes { extern const int CANNOT_CONNECT_RABBITMQ; + extern const int LOGICAL_ERROR; } WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( @@ -181,7 +182,7 @@ void WriteBufferToRabbitMQProducer::removeRecord(UInt64 received_delivery_tag, b else { if (republish) - if (returned.push(record_iter->second)) + if (!returned.push(record_iter->second)) throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to returned queue"); delivery_record.erase(record_iter); diff --git a/utils/keeper-data-dumper/main.cpp b/utils/keeper-data-dumper/main.cpp index b238c2ef569..ed6a7aea972 100644 --- a/utils/keeper-data-dumper/main.cpp +++ b/utils/keeper-data-dumper/main.cpp @@ -59,7 +59,7 @@ int main(int argc, char *argv[]) Poco::Logger::root().setLevel("trace"); } auto * logger = &Poco::Logger::get("keeper-dumper"); - ResponsesQueue queue; + ResponsesQueue queue(std::numeric_limits::max()); SnapshotsQueue snapshots_queue{1}; CoordinationSettingsPtr settings = std::make_shared(); auto state_machine = std::make_shared(queue, snapshots_queue, argv[1], settings);