#include #if USE_NURAFT #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef POCO_HAVE_FD_EPOLL #include #else #include #endif namespace ProfileEvents { extern const Event KeeperTotalElapsedMicroseconds; } namespace DB { struct LastOp { public: String name{"NA"}; int64_t last_cxid{-1}; int64_t last_zxid{-1}; int64_t last_response_time{0}; }; static const LastOp EMPTY_LAST_OP {"NA", -1, -1, 0}; namespace ErrorCodes { extern const int SYSTEM_ERROR; extern const int LOGICAL_ERROR; extern const int UNEXPECTED_PACKET_FROM_CLIENT; extern const int TIMEOUT_EXCEEDED; } struct PollResult { size_t responses_count{0}; bool has_requests{false}; bool error{false}; }; struct SocketInterruptablePollWrapper { int sockfd; PipeFDs pipe; ReadBufferFromFileDescriptor response_in; #if defined(POCO_HAVE_FD_EPOLL) int epollfd; epoll_event socket_event{}; epoll_event pipe_event{}; #endif using InterruptCallback = std::function; explicit SocketInterruptablePollWrapper(const Poco::Net::StreamSocket & poco_socket_) : sockfd(poco_socket_.impl()->sockfd()) , response_in(pipe.fds_rw[0]) { pipe.setNonBlockingReadWrite(); #if defined(POCO_HAVE_FD_EPOLL) epollfd = epoll_create(2); if (epollfd < 0) throw ErrnoException(ErrorCodes::SYSTEM_ERROR, "Cannot epoll_create"); socket_event.events = EPOLLIN | EPOLLERR | EPOLLPRI; socket_event.data.fd = sockfd; if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &socket_event) < 0) { int err = ::close(epollfd); chassert(!err || errno == EINTR); throw ErrnoException(ErrorCodes::SYSTEM_ERROR, "Cannot insert socket into epoll queue"); } pipe_event.events = EPOLLIN | EPOLLERR | EPOLLPRI; pipe_event.data.fd = pipe.fds_rw[0]; if (epoll_ctl(epollfd, EPOLL_CTL_ADD, pipe.fds_rw[0], &pipe_event) < 0) { int err = ::close(epollfd); chassert(!err || errno == EINTR); throw ErrnoException(ErrorCodes::SYSTEM_ERROR, "Cannot insert socket into epoll queue"); } #endif } int getResponseFD() const { return pipe.fds_rw[1]; } PollResult poll(Poco::Timespan remaining_time, const ReadBufferFromPocoSocket & in) { bool socket_ready = false; bool fd_ready = false; if (in.available() != 0) socket_ready = true; if (response_in.available() != 0) fd_ready = true; int rc = 0; if (!fd_ready) { #if defined(POCO_HAVE_FD_EPOLL) epoll_event evout[2]; evout[0].data.fd = evout[1].data.fd = -1; do { Poco::Timestamp start; /// TODO: use epoll_pwait() for more precise timers rc = epoll_wait(epollfd, evout, 2, static_cast(remaining_time.totalMilliseconds())); if (rc < 0 && errno == EINTR) { Poco::Timestamp end; Poco::Timespan waited = end - start; if (waited < remaining_time) remaining_time -= waited; else remaining_time = 0; } } while (rc < 0 && errno == EINTR); for (int i = 0; i < rc; ++i) { if (evout[i].data.fd == sockfd) socket_ready = true; if (evout[i].data.fd == pipe.fds_rw[0]) fd_ready = true; } #else pollfd poll_buf[2]; poll_buf[0].fd = sockfd; poll_buf[0].events = POLLIN; poll_buf[1].fd = pipe.fds_rw[0]; poll_buf[1].events = POLLIN; do { Poco::Timestamp start; rc = ::poll(poll_buf, 2, static_cast(remaining_time.totalMilliseconds())); if (rc < 0 && errno == POCO_EINTR) { Poco::Timestamp end; Poco::Timespan waited = end - start; if (waited < remaining_time) remaining_time -= waited; else remaining_time = 0; } } while (rc < 0 && errno == POCO_EINTR); if (rc >= 1) { if (poll_buf[0].revents & POLLIN) socket_ready = true; if (poll_buf[1].revents & POLLIN) fd_ready = true; } #endif } PollResult result{}; result.has_requests = socket_ready; if (fd_ready) { UInt8 dummy; readIntBinary(dummy, response_in); result.responses_count = 1; auto available = response_in.available(); response_in.ignore(available); result.responses_count += available; } if (rc < 0) result.error = true; return result; } #if defined(POCO_HAVE_FD_EPOLL) ~SocketInterruptablePollWrapper() { int err = ::close(epollfd); chassert(!err || errno == EINTR); } #endif }; KeeperTCPHandler::KeeperTCPHandler( const Poco::Util::AbstractConfiguration & config_ref, std::shared_ptr keeper_dispatcher_, Poco::Timespan receive_timeout_, Poco::Timespan send_timeout_, const Poco::Net::StreamSocket & socket_) : Poco::Net::TCPServerConnection(socket_) , log(getLogger("KeeperTCPHandler")) , keeper_dispatcher(keeper_dispatcher_) , operation_timeout( 0, config_ref.getUInt( "keeper_server.coordination_settings.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000) , min_session_timeout( 0, config_ref.getUInt( "keeper_server.coordination_settings.min_session_timeout_ms", Coordination::DEFAULT_MIN_SESSION_TIMEOUT_MS) * 1000) , max_session_timeout( 0, config_ref.getUInt( "keeper_server.coordination_settings.session_timeout_ms", Coordination::DEFAULT_MAX_SESSION_TIMEOUT_MS) * 1000) , poll_wrapper(std::make_unique(socket_)) , send_timeout(send_timeout_) , receive_timeout(receive_timeout_) , responses(std::make_unique(std::numeric_limits::max())) , last_op(std::make_unique(EMPTY_LAST_OP)) { KeeperTCPHandler::registerConnection(this); } void KeeperTCPHandler::sendHandshake(bool has_leader, bool & use_compression) { Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, *out); if (has_leader) { if (use_compression) Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION, *out); else Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, *out); } else { /// Ignore connections if we are not leader, client will throw exception /// and reconnect to another replica faster. ClickHouse client provide /// clear message for such protocol version. Coordination::write(Coordination::KEEPER_PROTOCOL_VERSION_CONNECTION_REJECT, *out); } Coordination::write(static_cast(session_timeout.totalMilliseconds()), *out); Coordination::write(session_id, *out); std::array passwd{}; Coordination::write(passwd, *out); out->next(); } void KeeperTCPHandler::run() { runImpl(); } Poco::Timespan KeeperTCPHandler::receiveHandshake(int32_t handshake_length, bool & use_compression) { int32_t protocol_version; int64_t last_zxid_seen; int32_t timeout_ms; int64_t previous_session_id = 0; /// We don't support session restore. So previous session_id is always zero. std::array passwd {}; if (!isHandShake(handshake_length)) throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected handshake length received: {}", toString(handshake_length)); Coordination::read(protocol_version, *in); if (protocol_version != Coordination::ZOOKEEPER_PROTOCOL_VERSION && protocol_version != Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION) throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected protocol version: {}", toString(protocol_version)); use_compression = (protocol_version == Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION); Coordination::read(last_zxid_seen, *in); Coordination::read(timeout_ms, *in); /// TODO Stop ignoring this value Coordination::read(previous_session_id, *in); Coordination::read(passwd, *in); int8_t readonly; if (handshake_length == Coordination::CLIENT_HANDSHAKE_LENGTH_WITH_READONLY) Coordination::read(readonly, *in); return Poco::Timespan(timeout_ms * 1000); } void KeeperTCPHandler::runImpl() { setThreadName("KeeperHandler"); socket().setReceiveTimeout(receive_timeout); socket().setSendTimeout(send_timeout); socket().setNoDelay(true); in.emplace(socket()); out.emplace(socket()); compressed_in.reset(); compressed_out.reset(); bool use_compression = false; if (in->eof()) { LOG_WARNING(log, "Client has not sent any data."); return; } int32_t header; try { Coordination::read(header, *in); } catch (const Exception & e) { LOG_WARNING(log, "Error while read connection header {}", e.displayText()); return; } /// All four letter word command code is larger than 2^24 or lower than 0. /// Hand shake package length must be lower than 2^24 and larger than 0. /// So collision never happens. int32_t four_letter_cmd = header; if (!isHandShake(four_letter_cmd)) { connected.store(true, std::memory_order_relaxed); tryExecuteFourLetterWordCmd(four_letter_cmd); return; } try { int32_t handshake_length = header; auto client_timeout = receiveHandshake(handshake_length, use_compression); if (client_timeout.totalMilliseconds() == 0) client_timeout = Poco::Timespan(Coordination::DEFAULT_SESSION_TIMEOUT_MS * Poco::Timespan::MILLISECONDS); session_timeout = std::max(client_timeout, min_session_timeout); session_timeout = std::min(session_timeout, max_session_timeout); } catch (const Exception & e) /// Typical for an incorrect username, password, or address. { LOG_WARNING(log, "Cannot receive handshake {}", e.displayText()); return; } if (keeper_dispatcher->isServerActive()) { try { LOG_INFO(log, "Requesting session ID for the new client"); session_id = keeper_dispatcher->getSessionID(session_timeout.totalMilliseconds()); LOG_INFO(log, "Received session ID {}", session_id); } catch (const Exception & e) { LOG_WARNING(log, "Cannot receive session id {}", e.displayText()); sendHandshake(/* has_leader */ false, use_compression); return; } sendHandshake(/* has_leader */ true, use_compression); } else { LOG_WARNING(log, "Ignoring user request, because the server is not active yet"); sendHandshake(/* has_leader */ false, use_compression); return; } if (use_compression) { compressed_in.emplace(*in); compressed_out.emplace(*out, CompressionCodecFactory::instance().get("LZ4",{})); } auto response_fd = poll_wrapper->getResponseFD(); auto response_callback = [responses_ = this->responses, response_fd](const Coordination::ZooKeeperResponsePtr & response) { if (!responses_->push(response)) throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with xid {} and zxid {}", response->xid, response->zxid); UInt8 single_byte = 1; [[maybe_unused]] ssize_t result = write(response_fd, &single_byte, sizeof(single_byte)); }; keeper_dispatcher->registerSession(session_id, response_callback); Stopwatch logging_stopwatch; auto operation_max_ms = keeper_dispatcher->getKeeperContext()->getCoordinationSettings()->log_slow_connection_operation_threshold_ms; auto log_long_operation = [&](const String & operation) { auto elapsed_ms = logging_stopwatch.elapsedMilliseconds(); if (operation_max_ms < elapsed_ms) LOG_INFO(log, "{} for session {} took {} ms", operation, session_id, elapsed_ms); logging_stopwatch.restart(); }; session_stopwatch.start(); connected.store(true, std::memory_order_release); bool close_received = false; try { while (true) { using namespace std::chrono_literals; PollResult result = poll_wrapper->poll(session_timeout, *in); log_long_operation("Polling socket"); if (result.has_requests && !close_received) { if (in->eof()) { LOG_DEBUG(log, "Client closed connection, session id #{}", session_id); keeper_dispatcher->finishSession(session_id); break; } auto [received_op, received_xid] = receiveRequest(); packageReceived(); log_long_operation("Receiving request"); if (received_op == Coordination::OpNum::Close) { LOG_DEBUG(log, "Received close event with xid {} for session id #{}", received_xid, session_id); close_xid = received_xid; close_received = true; } else if (received_op == Coordination::OpNum::Heartbeat) { LOG_TRACE(log, "Received heartbeat for session #{}", session_id); } else operations[received_xid] = Poco::Timestamp(); /// Each request restarts session stopwatch session_stopwatch.restart(); } /// Process exact amount of responses from pipe /// otherwise state of responses queue and signaling pipe /// became inconsistent and race condition is possible. while (result.responses_count != 0) { Coordination::ZooKeeperResponsePtr response; if (!responses->tryPop(response)) throw Exception(ErrorCodes::LOGICAL_ERROR, "We must have ready response, but queue is empty. It's a bug."); log_long_operation("Waiting for response to be ready"); if (response->xid == close_xid) { LOG_DEBUG(log, "Session #{} successfully closed", session_id); return; } updateStats(response); packageSent(); response->write(getWriteBuffer()); flushWriteBuffer(); log_long_operation("Sending response"); if (response->error == Coordination::Error::ZSESSIONEXPIRED) { LOG_DEBUG(log, "Session #{} expired because server shutting down or quorum is not alive", session_id); keeper_dispatcher->finishSession(session_id); return; } result.responses_count--; } if (result.error) throw Exception(ErrorCodes::SYSTEM_ERROR, "Exception happened while reading from socket"); if (session_stopwatch.elapsedMicroseconds() > static_cast(session_timeout.totalMicroseconds())) { LOG_DEBUG(log, "Session #{} expired", session_id); keeper_dispatcher->finishSession(session_id); break; } } } catch (const Exception & ex) { log_long_operation("Unknown operation"); LOG_TRACE(log, "Has {} responses in the queue", responses->size()); LOG_INFO(log, "Got exception processing session #{}: {}", session_id, getExceptionMessage(ex, true)); keeper_dispatcher->finishSession(session_id); } } bool KeeperTCPHandler::isHandShake(int32_t handshake_length) { return handshake_length == Coordination::CLIENT_HANDSHAKE_LENGTH || handshake_length == Coordination::CLIENT_HANDSHAKE_LENGTH_WITH_READONLY; } bool KeeperTCPHandler::tryExecuteFourLetterWordCmd(int32_t command) { if (!FourLetterCommandFactory::instance().isKnown(command)) { LOG_WARNING(log, "invalid four letter command {}", IFourLetterCommand::toName(command)); return false; } else if (!FourLetterCommandFactory::instance().isEnabled(command)) { LOG_WARNING(log, "Not enabled four letter command {}", IFourLetterCommand::toName(command)); return false; } else { auto command_ptr = FourLetterCommandFactory::instance().get(command); LOG_DEBUG(log, "Receive four letter command {}", command_ptr->name()); try { String res = command_ptr->run(); out->write(res.data(),res.size()); out->next(); } catch (...) { tryLogCurrentException(log, "Error when executing four letter command " + command_ptr->name()); } return true; } } WriteBuffer & KeeperTCPHandler::getWriteBuffer() { if (compressed_out) return *compressed_out; return *out; } void KeeperTCPHandler::flushWriteBuffer() { if (compressed_out) compressed_out->next(); out->next(); } ReadBuffer & KeeperTCPHandler::getReadBuffer() { if (compressed_in) return *compressed_in; return *in; } std::pair KeeperTCPHandler::receiveRequest() { auto & read_buffer = getReadBuffer(); int32_t length; Coordination::read(length, read_buffer); int32_t xid; Coordination::read(xid, read_buffer); Coordination::OpNum opnum; Coordination::read(opnum, read_buffer); Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(opnum); request->xid = xid; request->readImpl(read_buffer); if (!keeper_dispatcher->putRequest(request, session_id)) throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id); return std::make_pair(opnum, xid); } void KeeperTCPHandler::packageSent() { conn_stats.incrementPacketsSent(); keeper_dispatcher->incrementPacketsSent(); } void KeeperTCPHandler::packageReceived() { conn_stats.incrementPacketsReceived(); keeper_dispatcher->incrementPacketsReceived(); } void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response) { /// update statistics ignoring watch response and heartbeat. if (response->xid != Coordination::WATCH_XID && response->getOpNum() != Coordination::OpNum::Heartbeat) { Int64 elapsed = (Poco::Timestamp() - operations[response->xid]); ProfileEvents::increment(ProfileEvents::KeeperTotalElapsedMicroseconds, elapsed); Int64 elapsed_ms = elapsed / 1000; conn_stats.updateLatency(elapsed_ms); operations.erase(response->xid); keeper_dispatcher->updateKeeperStatLatency(elapsed_ms); last_op.set(std::make_unique(LastOp{ .name = Coordination::toString(response->getOpNum()), .last_cxid = response->xid, .last_zxid = response->zxid, .last_response_time = Poco::Timestamp().epochMicroseconds() / 1000, })); } } KeeperConnectionStats & KeeperTCPHandler::getConnectionStats() { return conn_stats; } void KeeperTCPHandler::dumpStats(WriteBufferFromOwnString & buf, bool brief) { if (!connected.load(std::memory_order_acquire)) return; auto & stats = getConnectionStats(); writeText(' ', buf); writeText(socket().peerAddress().toString(), buf); writeText("(recved=", buf); writeIntText(stats.getPacketsReceived(), buf); writeText(",sent=", buf); writeIntText(stats.getPacketsSent(), buf); if (!brief) { if (session_id != 0) { writeText(",sid=0x", buf); writeText(getHexUIntLowercase(session_id), buf); writeText(",lop=", buf); LastOpPtr op = last_op.get(); writeText(op->name, buf); writeText(",est=", buf); writeIntText(established.epochMicroseconds() / 1000, buf); writeText(",to=", buf); writeIntText(session_timeout.totalMilliseconds(), buf); int64_t last_cxid = op->last_cxid; if (last_cxid >= 0) { writeText(",lcxid=0x", buf); writeText(getHexUIntLowercase(last_cxid), buf); } writeText(",lzxid=0x", buf); writeText(getHexUIntLowercase(op->last_zxid), buf); writeText(",lresp=", buf); writeIntText(op->last_response_time, buf); writeText(",llat=", buf); writeIntText(stats.getLastLatency(), buf); writeText(",minlat=", buf); writeIntText(stats.getMinLatency(), buf); writeText(",avglat=", buf); writeIntText(stats.getAvgLatency(), buf); writeText(",maxlat=", buf); writeIntText(stats.getMaxLatency(), buf); } } writeText(')', buf); writeText('\n', buf); } void KeeperTCPHandler::resetStats() { conn_stats.reset(); last_op.set(std::make_unique(EMPTY_LAST_OP)); } KeeperTCPHandler::~KeeperTCPHandler() { KeeperTCPHandler::unregisterConnection(this); } std::mutex KeeperTCPHandler::conns_mutex; std::unordered_set KeeperTCPHandler::connections; void KeeperTCPHandler::registerConnection(KeeperTCPHandler * conn) { std::lock_guard lock(conns_mutex); connections.insert(conn); } void KeeperTCPHandler::unregisterConnection(KeeperTCPHandler * conn) { std::lock_guard lock(conns_mutex); connections.erase(conn); } void KeeperTCPHandler::dumpConnections(WriteBufferFromOwnString & buf, bool brief) { std::lock_guard lock(conns_mutex); for (auto * conn : connections) { conn->dumpStats(buf, brief); } } void KeeperTCPHandler::resetConnsStats() { std::lock_guard lock(conns_mutex); for (auto * conn : connections) { conn->resetStats(); } } } #endif