diff --git a/src/Storages/FileLog/DirectoryWatcherBase.cpp b/src/Storages/FileLog/DirectoryWatcherBase.cpp index 8209483fac9..f1cf0866de7 100644 --- a/src/Storages/FileLog/DirectoryWatcherBase.cpp +++ b/src/Storages/FileLog/DirectoryWatcherBase.cpp @@ -34,8 +34,8 @@ DirectoryWatcherBase::DirectoryWatcherBase( if (!std::filesystem::is_directory(path)) throw Exception(ErrorCodes::BAD_FILE_TYPE, "Path {} is not a directory", path); - fd = inotify_init(); - if (fd == -1) + inotify_fd = inotify_init(); + if (inotify_fd == -1) throw ErrnoException(ErrorCodes::IO_SETUP_ERROR, "Cannot initialize inotify"); watch_task = getContext()->getSchedulePool().createTask("directory_watch", [this] { watchFunc(); }); @@ -56,7 +56,7 @@ void DirectoryWatcherBase::watchFunc() if (eventMask() & DirectoryWatcherBase::DW_ITEM_MOVED_TO) mask |= IN_MOVED_TO; - int wd = inotify_add_watch(fd, path.c_str(), mask); + int wd = inotify_add_watch(inotify_fd, path.c_str(), mask); if (wd == -1) { owner.onError(Exception(ErrorCodes::IO_SETUP_ERROR, "Watch directory {} failed", path)); @@ -65,16 +65,20 @@ void DirectoryWatcherBase::watchFunc() std::string buffer; buffer.resize(buffer_size); - pollfd pfd; - pfd.fd = fd; - pfd.events = POLLIN; + pollfd pfds[2]; + /// inotify descriptor + pfds[0].fd = inotify_fd; + pfds[0].events = POLLIN; + // notifier + pfds[1].fd = event_pipe.fds_rw[0]; + pfds[1].events = POLLIN; while (!stopped) { const auto & settings = owner.storage.getFileLogSettings(); - if (poll(&pfd, 1, static_cast(milliseconds_to_wait)) > 0 && pfd.revents & POLLIN) + if (poll(pfds, 2, static_cast(milliseconds_to_wait)) > 0 && pfds[0].revents & POLLIN) { milliseconds_to_wait = settings->poll_directory_watch_events_backoff_init.totalMilliseconds(); - ssize_t n = read(fd, buffer.data(), buffer.size()); + ssize_t n = read(inotify_fd, buffer.data(), buffer.size()); int i = 0; if (n > 0) { @@ -130,7 +134,7 @@ void DirectoryWatcherBase::watchFunc() DirectoryWatcherBase::~DirectoryWatcherBase() { stop(); - int err = ::close(fd); + int err = ::close(inotify_fd); chassert(!err || errno == EINTR); } @@ -143,6 +147,7 @@ void DirectoryWatcherBase::start() void DirectoryWatcherBase::stop() { stopped = true; + ::write(event_pipe.fds_rw[1], "\0", 1); if (watch_task) watch_task->deactivate(); } diff --git a/src/Storages/FileLog/DirectoryWatcherBase.h b/src/Storages/FileLog/DirectoryWatcherBase.h index a640f686c8a..0dfb58fbc5c 100644 --- a/src/Storages/FileLog/DirectoryWatcherBase.h +++ b/src/Storages/FileLog/DirectoryWatcherBase.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -85,10 +86,6 @@ public: void watchFunc(); -protected: - void start(); - void stop(); - private: FileLogDirectoryWatcher & owner; @@ -102,7 +99,11 @@ private: int event_mask; uint64_t milliseconds_to_wait; - int fd; + int inotify_fd; + PipeFDs event_pipe; + + void start(); + void stop(); }; }