diff --git a/contrib/libpoco/Util/include/Poco/Util/ServerApplication.h b/contrib/libpoco/Util/include/Poco/Util/ServerApplication.h index bcb23fa55b9..aaf4261e2fb 100644 --- a/contrib/libpoco/Util/include/Poco/Util/ServerApplication.h +++ b/contrib/libpoco/Util/include/Poco/Util/ServerApplication.h @@ -163,7 +163,7 @@ public: protected: int run(); - void waitForTerminationRequest(); + virtual void waitForTerminationRequest(); #if !defined(_WIN32_WCE) void defineOptions(OptionSet& options); #endif diff --git a/dbms/include/DB/Client/Connection.h b/dbms/include/DB/Client/Connection.h index 21f9315c26e..e2b2f0da71c 100644 --- a/dbms/include/DB/Client/Connection.h +++ b/dbms/include/DB/Client/Connection.h @@ -129,20 +129,10 @@ public: void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & revision); /// Для сообщений в логе и в эксепшенах. - const String & getDescription() const - { - return description; - } - - const String & getHost() const - { - return host; - } - - UInt16 getPort() const - { - return port; - } + const String & getDescription() const; + const String & getHost() const; + UInt16 getPort() const; + const String & getDefaultDatabase() const; /// Если последний флаг true, то затем необходимо вызвать sendExternalTablesData void sendQuery(const String & query, const String & query_id_ = "", UInt64 stage = QueryProcessingStage::Complete, diff --git a/dbms/include/DB/Common/Exception.h b/dbms/include/DB/Common/Exception.h index b9514ef885f..2c710f22517 100644 --- a/dbms/include/DB/Common/Exception.h +++ b/dbms/include/DB/Common/Exception.h @@ -88,7 +88,7 @@ void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::str std::string getExceptionMessage(std::exception_ptr e, bool with_stacktrace); -void rethrowFirstException(Exceptions & exceptions); +void rethrowFirstException(const Exceptions & exceptions); std::unique_ptr convertCurrentException(); diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index 7a414a934ca..6a3e063e966 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -142,12 +142,30 @@ void Connection::receiveHello() } } - void Connection::setDefaultDatabase(const String & database) { default_database = database; } +const String & Connection::getDefaultDatabase() const +{ + return default_database; +} + +const String & Connection::getDescription() const +{ + return description; +} + +const String & Connection::getHost() const +{ + return host; +} + +UInt16 Connection::getPort() const +{ + return port; +} void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & revision) { diff --git a/dbms/src/Common/Exception.cpp b/dbms/src/Common/Exception.cpp index e0c965a03b4..4af5db6d3e2 100644 --- a/dbms/src/Common/Exception.cpp +++ b/dbms/src/Common/Exception.cpp @@ -139,7 +139,7 @@ std::unique_ptr convertCurrentException() } -void rethrowFirstException(Exceptions & exceptions) +void rethrowFirstException(const Exceptions & exceptions) { for (size_t i = 0, size = exceptions.size(); i < size; ++i) if (exceptions[i]) diff --git a/libs/libcommon/src/create_revision.sh b/libs/libcommon/src/create_revision.sh index 383032a475b..db28d0e5350 100755 --- a/libs/libcommon/src/create_revision.sh +++ b/libs/libcommon/src/create_revision.sh @@ -44,6 +44,7 @@ else # нужно для stash или неполноценной копии репозитория revision="77777" fi + revision=$(echo $revision | sed 's/\([0-9]*\)[^0-9]*/\1/') echo " #ifndef REVISION diff --git a/libs/libdaemon/include/daemon/BaseDaemon.h b/libs/libdaemon/include/daemon/BaseDaemon.h index 93d2c15f2ad..6a5ba88e2ad 100644 --- a/libs/libdaemon/include/daemon/BaseDaemon.h +++ b/libs/libdaemon/include/daemon/BaseDaemon.h @@ -52,6 +52,8 @@ namespace Poco { class TaskManager; } class BaseDaemon : public Poco::Util::ServerApplication { + friend class SignalListener; + public: BaseDaemon(); ~BaseDaemon(); @@ -132,6 +134,14 @@ protected: /// Используется при exitOnTaskError() void handleNotification(Poco::TaskFailedNotification *); + /// thread safe + virtual void handleSignal(int signal_id); + + /// реализация обработки сигналов завершения через pipe не требует блокировки сигнала с помощью sigprocmask во всех потоках + void waitForTerminationRequest() override; + /// thread safe + virtual void onInterruptSignals(int signal_id); + std::unique_ptr task_manager; /// Создание и автоматическое удаление pid файла. @@ -156,8 +166,7 @@ protected: PID pid; - /// Получен ли сигнал на завершение? Этот флаг устанавливается в BaseDaemonApplication. - bool is_cancelled = false; + std::atomic_bool is_cancelled{false}; /// Флаг устанавливается по сообщению из Task (при аварийном завершении). bool task_failed = false; @@ -179,4 +188,8 @@ protected: std::unique_ptr graphite_writer; boost::optional layer; + + std::mutex signal_handler_mutex; + std::condition_variable signal_event; + size_t terminate_signals_counter = 0; }; diff --git a/libs/libdaemon/src/BaseDaemon.cpp b/libs/libdaemon/src/BaseDaemon.cpp index 1de0a5e025d..80e427b856b 100644 --- a/libs/libdaemon/src/BaseDaemon.cpp +++ b/libs/libdaemon/src/BaseDaemon.cpp @@ -131,9 +131,9 @@ static void call_default_signal_handler(int sig) using ThreadNumber = decltype(Poco::ThreadNumber::get()); static const size_t buf_size = sizeof(int) + sizeof(siginfo_t) + sizeof(ucontext_t) + sizeof(ThreadNumber); +using signal_function = void(int, siginfo_t*, void*); -/** Обработчик сигналов HUP / USR1 */ -static void close_logs_signal_handler(int sig, siginfo_t * info, void * context) +static void writeSignalIDtoSignalPipe(int sig) { char buf[buf_size]; DB::WriteBufferFromFileDescriptor out(signal_pipe.write_fd, buf_size, buf); @@ -141,10 +141,21 @@ static void close_logs_signal_handler(int sig, siginfo_t * info, void * context) out.next(); } +/** Обработчик сигналов HUP / USR1 */ +static void closeLogsSignalHandler(int sig, siginfo_t * info, void * context) +{ + writeSignalIDtoSignalPipe(sig); +} + +static void terminateRequestedSignalHandler(int sig, siginfo_t * info, void * context) +{ + writeSignalIDtoSignalPipe(sig); +} + /** Обработчик некоторых сигналов. Выводит информацию в лог (если получится). */ -static void fault_signal_handler(int sig, siginfo_t * info, void * context) +static void faultSignalHandler(int sig, siginfo_t * info, void * context) { char buf[buf_size]; DB::WriteBufferFromFileDescriptor out(signal_pipe.write_fd, buf_size, buf); @@ -174,7 +185,9 @@ static bool already_printed_stack_trace = false; class SignalListener : public Poco::Runnable { public: - SignalListener() : log(&Logger::get("BaseDaemon")) + SignalListener(BaseDaemon & daemon_) + : log(&Logger::get("BaseDaemon")) + , daemon(daemon_) { } @@ -204,6 +217,12 @@ public: onTerminate(message, thread_num); } + else if (sig == SIGINT || + sig == SIGQUIT || + sig == SIGTERM) + { + daemon.handleSignal(sig); + } else { siginfo_t info; @@ -221,8 +240,9 @@ public: private: Logger * log; + BaseDaemon & daemon; - +private: void onTerminate(const std::string & message, ThreadNumber thread_num) const { LOG_ERROR(log, "(from thread " << thread_num << ") " << message); @@ -739,42 +759,31 @@ void BaseDaemon::initialize(Application& self) std::set_terminate(terminate_handler); /// Ставим обработчики сигналов - struct sigaction sa; - memset(&sa, 0, sizeof(sa)); - sa.sa_sigaction = fault_signal_handler; - sa.sa_flags = SA_SIGINFO; + auto add_signal_handler = + [](const std::vector & signals, signal_function handler) + { + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sa.sa_sigaction = handler; + sa.sa_flags = SA_SIGINFO; - { - int signals[] = {SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, 0}; + { + if (sigemptyset(&sa.sa_mask)) + throw Poco::Exception("Cannot set signal handler."); - if (sigemptyset(&sa.sa_mask)) - throw Poco::Exception("Cannot set signal handler."); + for (auto signal : signals) + if (sigaddset(&sa.sa_mask, signal)) + throw Poco::Exception("Cannot set signal handler."); - for (size_t i = 0; signals[i]; ++i) - if (sigaddset(&sa.sa_mask, signals[i])) - throw Poco::Exception("Cannot set signal handler."); + for (auto signal : signals) + if (sigaction(signal, &sa, 0)) + throw Poco::Exception("Cannot set signal handler."); + } + }; - for (size_t i = 0; signals[i]; ++i) - if (sigaction(signals[i], &sa, 0)) - throw Poco::Exception("Cannot set signal handler."); - } - - sa.sa_sigaction = close_logs_signal_handler; - - { - int signals[] = {SIGHUP, SIGUSR1, 0}; - - if (sigemptyset(&sa.sa_mask)) - throw Poco::Exception("Cannot set signal handler."); - - for (size_t i = 0; signals[i]; ++i) - if (sigaddset(&sa.sa_mask, signals[i])) - throw Poco::Exception("Cannot set signal handler."); - - for (size_t i = 0; signals[i]; ++i) - if (sigaction(signals[i], &sa, 0)) - throw Poco::Exception("Cannot set signal handler."); - } + add_signal_handler({SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE}, faultSignalHandler); + add_signal_handler({SIGHUP, SIGUSR1}, closeLogsSignalHandler); + add_signal_handler({SIGINT, SIGQUIT, SIGTERM}, terminateRequestedSignalHandler); /// Ставим ErrorHandler для потоков static KillingErrorHandler killing_error_handler; @@ -783,7 +792,7 @@ void BaseDaemon::initialize(Application& self) /// Выведем ревизию демона logRevision(); - signal_listener.reset(new SignalListener); + signal_listener.reset(new SignalListener(*this)); signal_listener_thread.start(*signal_listener); graphite_writer.reset(new GraphiteWriter("graphite")); @@ -890,3 +899,35 @@ void BaseDaemon::PID::clear() file.clear(); } } + +void BaseDaemon::handleSignal(int signal_id) +{ + if (signal_id == SIGINT || + signal_id == SIGQUIT || + signal_id == SIGTERM) + { + std::unique_lock lock(signal_handler_mutex); + { + ++terminate_signals_counter; + signal_event.notify_all(); + } + + onInterruptSignals(signal_id); + } + else + throw DB::Exception(std::string("Unsupported signal: ") + strsignal(signal_id)); +} + +void BaseDaemon::onInterruptSignals(int signal_id) +{ + is_cancelled = true; + LOG_INFO(&logger(), "Received termination signal(" << strsignal(signal_id) << ")"); +} + + +void BaseDaemon::waitForTerminationRequest() +{ + std::unique_lock lock(signal_handler_mutex); + signal_event.wait(lock, [this](){ return terminate_signals_counter > 0; }); +} + diff --git a/libs/libzkutil/CMakeLists.txt b/libs/libzkutil/CMakeLists.txt index 491a410b9bf..7be8268f3f7 100644 --- a/libs/libzkutil/CMakeLists.txt +++ b/libs/libzkutil/CMakeLists.txt @@ -16,6 +16,6 @@ add_library(zkutil include/zkutil/Types.h include/zkutil/ZooKeeperHolder.h) -target_link_libraries(zkutil libzookeeper_mt.a pthread) +target_link_libraries(zkutil libzookeeper_mt.a pthread PocoFoundation) add_subdirectory (src) diff --git a/utils/zookeeper-dump-tree/main.cpp b/utils/zookeeper-dump-tree/main.cpp index 91c3e8211a0..59be841a374 100644 --- a/utils/zookeeper-dump-tree/main.cpp +++ b/utils/zookeeper-dump-tree/main.cpp @@ -12,6 +12,8 @@ struct CallbackState { std::string path; std::list::const_iterator it; + std::list::const_iterator> children; + int64_t dataLength = 0; }; using CallbackStates = std::list; @@ -19,10 +21,11 @@ CallbackStates states; zkutil::ZooKeeper * zookeeper; +int running_count = 0; Poco::Event completed; -void process(const CallbackState & state); +void process(CallbackState & state); void callback( int rc, @@ -30,13 +33,16 @@ void callback( const Stat * stat, const void * data) { - const CallbackState * state = reinterpret_cast(data); + CallbackState * state = reinterpret_cast(const_cast(data)); if (rc != ZOK && rc != ZNONODE) { std::cerr << zerror(rc) << ", path: " << state->path << "\n"; } + if (stat != nullptr) + state->dataLength = stat->dataLength; + if (rc == ZOK && strings) { for (int32_t i = 0; i < strings->count; ++i) @@ -44,23 +50,38 @@ void callback( states.emplace_back(); states.back().path = state->path + (state->path == "/" ? "" : "/") + strings->data[i]; states.back().it = --states.end(); + state->children.push_back(states.back().it); - std::cout << states.back().path << '\n'; process(states.back()); } } - states.erase(state->it); - - if (states.empty()) + --running_count; + if (running_count == 0) completed.set(); } -void process(const CallbackState & state) +void process(CallbackState & state) { + ++running_count; zoo_awget_children2(zookeeper->getHandle(), state.path.data(), nullptr, nullptr, callback, &state); } +typedef std::pair NodesBytes; + +NodesBytes printTree(const CallbackState & state) +{ + int64_t nodes = 1; + int64_t bytes = state.dataLength; + for (auto child : state.children) + { + NodesBytes nodesBytes = printTree(*child); + nodes += nodesBytes.first; + bytes += nodesBytes.second; + } + std::cout << state.path << '\t' << nodes << '\t' << bytes <<'\n'; + return NodesBytes(nodes, bytes); +} int main(int argc, char ** argv) try @@ -95,6 +116,8 @@ try process(states.back()); completed.wait(); + + printTree(*states.begin()); } catch (...) {