ClickHouse/src/Storages/FileLog/FileLogDirectoryWatcher.cpp

134 lines
3.7 KiB
C++
Raw Normal View History

2021-07-03 17:14:56 +00:00
#include <Storages/FileLog/FileLogDirectoryWatcher.h>
2021-10-10 16:31:32 +00:00
namespace DB
{
2021-07-03 17:14:56 +00:00
FileLogDirectoryWatcher::FileLogDirectoryWatcher(const std::string & path_)
2021-10-10 16:31:32 +00:00
: path(path_), dw(std::make_unique<DirectoryWatcherBase>(*this, path)), log(&Poco::Logger::get("DirectoryIterator (" + path + ")"))
2021-07-03 17:14:56 +00:00
{
}
2021-09-21 16:11:35 +00:00
FileLogDirectoryWatcher::Events FileLogDirectoryWatcher::getEventsAndReset()
2021-07-03 17:14:56 +00:00
{
std::lock_guard<std::mutex> lock(mutex);
Events res;
res.swap(events);
return res;
}
2021-09-21 16:11:35 +00:00
FileLogDirectoryWatcher::Error FileLogDirectoryWatcher::getErrorAndReset()
2021-07-03 17:14:56 +00:00
{
2021-09-21 16:11:35 +00:00
std::lock_guard<std::mutex> lock(mutex);
Error old_error = error;
error = {};
return old_error;
2021-07-03 17:14:56 +00:00
}
const std::string & FileLogDirectoryWatcher::getPath() const
{
return path;
}
2021-10-10 16:31:32 +00:00
void FileLogDirectoryWatcher::onItemAdded(const DirectoryWatcherBase::DirectoryEvent & ev)
2021-07-03 17:14:56 +00:00
{
std::lock_guard<std::mutex> lock(mutex);
2021-10-03 17:41:08 +00:00
2021-10-01 05:06:19 +00:00
EventInfo info{ev.event, "onItemAdded"};
2021-10-10 16:31:32 +00:00
std::string event_path = ev.path;
2021-10-03 17:41:08 +00:00
if (auto it = events.find(event_path); it != events.end())
{
it->second.emplace_back(info);
}
else
{
events.emplace(event_path, std::vector<EventInfo>{info});
}
2021-07-03 17:14:56 +00:00
}
2021-10-10 16:31:32 +00:00
void FileLogDirectoryWatcher::onItemRemoved(const DirectoryWatcherBase::DirectoryEvent & ev)
2021-07-03 17:14:56 +00:00
{
std::lock_guard<std::mutex> lock(mutex);
2021-10-03 17:41:08 +00:00
2021-10-01 05:06:19 +00:00
EventInfo info{ev.event, "onItemRemoved"};
2021-10-10 16:31:32 +00:00
std::string event_path = ev.path;
2021-10-03 17:41:08 +00:00
if (auto it = events.find(event_path); it != events.end())
{
it->second.emplace_back(info);
}
else
{
events.emplace(event_path, std::vector<EventInfo>{info});
}
2021-07-03 17:14:56 +00:00
}
2021-10-01 05:06:19 +00:00
/// Optimize for MODIFY event, during a streamToViews period, since the log files
/// are append only, there are may a lots of MODIFY events produced for one file.
/// For example, appending 10000 logs into one file will result in 10000 MODIFY event.
/// So, if we record all of these events, it will use a lot of memory, and then we
/// need to handle it one by one in StorageFileLog::updateFileInfos, this is unnecessary
/// because it is equal to just record and handle one MODIY event
2021-10-10 16:31:32 +00:00
void FileLogDirectoryWatcher::onItemModified(const DirectoryWatcherBase::DirectoryEvent & ev)
2021-07-03 17:14:56 +00:00
{
std::lock_guard<std::mutex> lock(mutex);
2021-10-03 17:41:08 +00:00
2021-10-10 16:31:32 +00:00
auto event_path = ev.path;
2021-10-01 05:06:19 +00:00
EventInfo info{ev.event, "onItemModified"};
2021-10-03 17:41:08 +00:00
/// Already have MODIFY event for this file
if (auto it = events.find(event_path); it != events.end())
{
if (it->second.back().type == ev.event)
return;
else
it->second.emplace_back(info);
}
else
{
events.emplace(event_path, std::vector<EventInfo>{info});
}
2021-07-03 17:14:56 +00:00
}
2021-10-10 16:31:32 +00:00
void FileLogDirectoryWatcher::onItemMovedFrom(const DirectoryWatcherBase::DirectoryEvent & ev)
2021-07-03 17:14:56 +00:00
{
std::lock_guard<std::mutex> lock(mutex);
2021-10-03 17:41:08 +00:00
2021-10-01 05:06:19 +00:00
EventInfo info{ev.event, "onItemMovedFrom"};
2021-10-10 16:31:32 +00:00
std::string event_path = ev.path;
2021-10-03 17:41:08 +00:00
if (auto it = events.find(event_path); it != events.end())
{
it->second.emplace_back(info);
}
else
{
events.emplace(event_path, std::vector<EventInfo>{info});
}
2021-07-03 17:14:56 +00:00
}
2021-10-10 16:31:32 +00:00
void FileLogDirectoryWatcher::onItemMovedTo(const DirectoryWatcherBase::DirectoryEvent & ev)
2021-07-03 17:14:56 +00:00
{
std::lock_guard<std::mutex> lock(mutex);
2021-10-03 17:41:08 +00:00
2021-10-01 05:06:19 +00:00
EventInfo info{ev.event, "onItemMovedTo"};
2021-10-10 16:31:32 +00:00
std::string event_path = ev.path;
2021-10-03 17:41:08 +00:00
if (auto it = events.find(event_path); it != events.end())
{
it->second.emplace_back(info);
}
else
{
events.emplace(event_path, std::vector<EventInfo>{info});
}
2021-07-03 17:14:56 +00:00
}
2021-10-10 16:31:32 +00:00
void FileLogDirectoryWatcher::onError(const Exception & e)
2021-07-03 17:14:56 +00:00
{
2021-09-21 16:11:35 +00:00
std::lock_guard<std::mutex> lock(mutex);
LOG_ERROR(log, "Error happened during watching directory {}: {}", path, error.error_msg);
error.has_error = true;
error.error_msg = e.message();
2021-07-03 17:14:56 +00:00
}
2021-10-10 16:31:32 +00:00
}