ClickHouse/src/Storages/FileLog/DirectoryWatcherBase.cpp

155 lines
5.3 KiB
C++
Raw Normal View History

2021-10-10 16:31:32 +00:00
#include <Interpreters/Context.h>
#include <Storages/FileLog/DirectoryWatcherBase.h>
#include <Storages/FileLog/FileLogDirectoryWatcher.h>
#include <filesystem>
#include <unistd.h>
#include <sys/inotify.h>
2021-10-13 09:10:41 +00:00
#include <sys/poll.h>
2021-10-10 16:31:32 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
2021-10-17 14:49:27 +00:00
extern const int BAD_FILE_TYPE;
2021-10-11 04:06:31 +00:00
extern const int IO_SETUP_ERROR;
2021-10-10 16:31:32 +00:00
}
2021-10-14 05:06:21 +00:00
static constexpr int buffer_size = 4096;
2021-10-11 04:06:31 +00:00
DirectoryWatcherBase::DirectoryWatcherBase(
FileLogDirectoryWatcher & owner_, const std::string & path_, ContextPtr context_, int event_mask_)
2021-10-17 14:49:27 +00:00
: WithContext(context_), owner(owner_), path(path_), event_mask(event_mask_)
2021-10-10 16:31:32 +00:00
{
if (!std::filesystem::exists(path))
2021-10-17 14:49:27 +00:00
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Path {} does not exist", path);
2021-10-10 16:31:32 +00:00
if (!std::filesystem::is_directory(path))
2021-10-17 14:49:27 +00:00
throw Exception(ErrorCodes::BAD_FILE_TYPE, "Path {} is not a directory", path);
2021-10-10 16:31:32 +00:00
fd = inotify_init();
if (fd == -1)
2021-10-13 05:23:00 +00:00
throw Exception("Cannot initialize inotify", ErrorCodes::IO_SETUP_ERROR);
2021-10-11 04:06:31 +00:00
2021-10-17 14:49:27 +00:00
watch_task = getContext()->getSchedulePool().createTask("directory_watch", [this] { watchFunc(); });
2021-10-10 16:31:32 +00:00
start();
}
void DirectoryWatcherBase::watchFunc()
{
int mask = 0;
if (eventMask() & DirectoryWatcherBase::DW_ITEM_ADDED)
mask |= IN_CREATE;
if (eventMask() & DirectoryWatcherBase::DW_ITEM_REMOVED)
mask |= IN_DELETE;
if (eventMask() & DirectoryWatcherBase::DW_ITEM_MODIFIED)
mask |= IN_MODIFY;
if (eventMask() & DirectoryWatcherBase::DW_ITEM_MOVED_FROM)
mask |= IN_MOVED_FROM;
if (eventMask() & DirectoryWatcherBase::DW_ITEM_MOVED_TO)
mask |= IN_MOVED_TO;
int wd = inotify_add_watch(fd, path.c_str(), mask);
if (wd == -1)
{
2021-10-13 05:23:00 +00:00
owner.onError(Exception(ErrorCodes::IO_SETUP_ERROR, "Watch directory {} failed", path));
2021-10-10 16:31:32 +00:00
}
std::string buffer;
2021-10-11 04:06:31 +00:00
buffer.resize(buffer_size);
2021-10-13 09:10:41 +00:00
pollfd pfd;
pfd.fd = fd;
pfd.events = POLLIN;
2021-10-11 04:06:31 +00:00
while (!stopped)
2021-10-10 16:31:32 +00:00
{
2021-10-17 14:49:27 +00:00
const auto & settings = owner.storage.getFileLogSettings();
2021-10-14 05:06:21 +00:00
if (poll(&pfd, 1, 500) > 0 && pfd.revents & POLLIN)
2021-10-10 16:31:32 +00:00
{
int n = read(fd, buffer.data(), buffer.size());
int i = 0;
if (n > 0)
{
while (n > 0)
{
struct inotify_event * p_event = reinterpret_cast<struct inotify_event *>(buffer.data() + i);
if (p_event->len > 0)
{
if ((p_event->mask & IN_CREATE) && (eventMask() & DirectoryWatcherBase::DW_ITEM_ADDED))
{
DirectoryWatcherBase::DirectoryEvent ev(p_event->name, DirectoryWatcherBase::DW_ITEM_ADDED);
owner.onItemAdded(ev);
}
if ((p_event->mask & IN_DELETE) && (eventMask() & DirectoryWatcherBase::DW_ITEM_REMOVED))
{
DirectoryWatcherBase::DirectoryEvent ev(p_event->name, DirectoryWatcherBase::DW_ITEM_REMOVED);
owner.onItemRemoved(ev);
}
if ((p_event->mask & IN_MODIFY) && (eventMask() & DirectoryWatcherBase::DW_ITEM_MODIFIED))
{
DirectoryWatcherBase::DirectoryEvent ev(p_event->name, DirectoryWatcherBase::DW_ITEM_MODIFIED);
owner.onItemModified(ev);
}
if ((p_event->mask & IN_MOVED_FROM) && (eventMask() & DirectoryWatcherBase::DW_ITEM_MOVED_FROM))
{
DirectoryWatcherBase::DirectoryEvent ev(p_event->name, DirectoryWatcherBase::DW_ITEM_MOVED_FROM);
owner.onItemMovedFrom(ev);
}
if ((p_event->mask & IN_MOVED_TO) && (eventMask() & DirectoryWatcherBase::DW_ITEM_MOVED_TO))
{
DirectoryWatcherBase::DirectoryEvent ev(p_event->name, DirectoryWatcherBase::DW_ITEM_MOVED_TO);
owner.onItemMovedTo(ev);
}
}
i += sizeof(inotify_event) + p_event->len;
n -= sizeof(inotify_event) + p_event->len;
}
}
2021-10-17 14:49:27 +00:00
/// Wake up reader thread
auto & mutex = owner.storage.getMutex();
auto & cv = owner.storage.getConditionVariable();
std::unique_lock<std::mutex> lock(mutex);
owner.storage.setNewEvents();
lock.unlock();
cv.notify_one();
}
else
{
if (milliseconds_to_wait < static_cast<uint64_t>(settings->poll_directory_watch_events_backoff_max.totalMilliseconds()))
milliseconds_to_wait *= settings->poll_directory_watch_events_backoff_factor.value;
break;
2021-10-10 16:31:32 +00:00
}
}
2021-10-17 14:49:27 +00:00
if (!stopped)
watch_task->scheduleAfter(milliseconds_to_wait);
2021-10-10 16:31:32 +00:00
}
DirectoryWatcherBase::~DirectoryWatcherBase()
{
stop();
close(fd);
2021-10-17 14:49:27 +00:00
if (watch_task)
watch_task->deactivate();
2021-10-10 16:31:32 +00:00
}
void DirectoryWatcherBase::start()
{
if (watch_task)
watch_task->activateAndSchedule();
}
void DirectoryWatcherBase::stop()
{
2021-10-11 04:06:31 +00:00
stopped = true;
2021-10-10 16:31:32 +00:00
if (watch_task)
watch_task->deactivate();
}
}