diff --git a/src/Storages/FileLog/DirectoryWatcherBase.cpp b/src/Storages/FileLog/DirectoryWatcherBase.cpp index cad2072a8e6..b5d914f700d 100644 --- a/src/Storages/FileLog/DirectoryWatcherBase.cpp +++ b/src/Storages/FileLog/DirectoryWatcherBase.cpp @@ -1,6 +1,8 @@ #include #include #include +#include +#include #include #include @@ -20,7 +22,11 @@ static constexpr int buffer_size = 4096; DirectoryWatcherBase::DirectoryWatcherBase( FileLogDirectoryWatcher & owner_, const std::string & path_, ContextPtr context_, int event_mask_) - : WithContext(context_), owner(owner_), path(path_), event_mask(event_mask_) + : WithContext(context_) + , owner(owner_) + , path(path_) + , event_mask(event_mask_) + , milliseconds_to_wait(owner.storage.getFileLogSettings()->poll_directory_watch_events_backoff_init.totalMilliseconds()) { if (!std::filesystem::exists(path)) throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Path {} does not exist", path); @@ -64,8 +70,9 @@ void DirectoryWatcherBase::watchFunc() while (!stopped) { const auto & settings = owner.storage.getFileLogSettings(); - if (poll(&pfd, 1, 500) > 0 && pfd.revents & POLLIN) + if (poll(&pfd, 1, milliseconds_to_wait) > 0 && pfd.revents & POLLIN) { + milliseconds_to_wait = settings->poll_directory_watch_events_backoff_init.totalMilliseconds(); int n = read(fd, buffer.data(), buffer.size()); int i = 0; if (n > 0) @@ -109,33 +116,20 @@ void DirectoryWatcherBase::watchFunc() } /// Wake up reader thread - auto & mutex = owner.storage.getMutex(); - auto & cv = owner.storage.getConditionVariable(); - std::unique_lock lock(mutex); - owner.storage.setNewEvents(); - lock.unlock(); - cv.notify_one(); + owner.storage.wakeUp(); } - else - { - if (milliseconds_to_wait < static_cast(settings->poll_directory_watch_events_backoff_max.totalMilliseconds())) - milliseconds_to_wait *= settings->poll_directory_watch_events_backoff_factor.value; - break; + else + { + if (milliseconds_to_wait < static_cast(settings->poll_directory_watch_events_backoff_max.totalMilliseconds())) + milliseconds_to_wait *= settings->poll_directory_watch_events_backoff_factor.value; } } - - if (!stopped) - watch_task->scheduleAfter(milliseconds_to_wait); } - DirectoryWatcherBase::~DirectoryWatcherBase() { stop(); close(fd); - - if (watch_task) - watch_task->deactivate(); } void DirectoryWatcherBase::start() diff --git a/src/Storages/FileLog/DirectoryWatcherBase.h b/src/Storages/FileLog/DirectoryWatcherBase.h index 88f864ac17d..a640f686c8a 100644 --- a/src/Storages/FileLog/DirectoryWatcherBase.h +++ b/src/Storages/FileLog/DirectoryWatcherBase.h @@ -97,10 +97,11 @@ private: std::atomic stopped{false}; - uint64_t milliseconds_to_wait; const std::string path; int event_mask; + uint64_t milliseconds_to_wait; + int fd; }; diff --git a/src/Storages/FileLog/FileLogDirectoryWatcher.cpp b/src/Storages/FileLog/FileLogDirectoryWatcher.cpp index c4a153dbfe7..192721f9f3c 100644 --- a/src/Storages/FileLog/FileLogDirectoryWatcher.cpp +++ b/src/Storages/FileLog/FileLogDirectoryWatcher.cpp @@ -78,9 +78,9 @@ void FileLogDirectoryWatcher::onItemModified(DirectoryWatcherBase::DirectoryEven auto event_path = ev.path; EventInfo info{ev.event, "onItemModified"}; - /// Already have MODIFY event for this file if (auto it = events.find(event_path); it != events.end()) { + /// Already have MODIFY event for this file if (it->second.received_modification_event) return; else diff --git a/src/Storages/FileLog/FileLogDirectoryWatcher.h b/src/Storages/FileLog/FileLogDirectoryWatcher.h index 8e0ecf5358c..0b0c86397aa 100644 --- a/src/Storages/FileLog/FileLogDirectoryWatcher.h +++ b/src/Storages/FileLog/FileLogDirectoryWatcher.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include @@ -10,6 +9,7 @@ namespace DB { +class StorageFileLog; class FileLogDirectoryWatcher { diff --git a/src/Storages/FileLog/ReadBufferFromFileLog.cpp b/src/Storages/FileLog/ReadBufferFromFileLog.cpp index 384de64cd3f..a55df9fe09e 100644 --- a/src/Storages/FileLog/ReadBufferFromFileLog.cpp +++ b/src/Storages/FileLog/ReadBufferFromFileLog.cpp @@ -117,7 +117,7 @@ void ReadBufferFromFileLog::readNewRecords(ReadBufferFromFileLog::Records & new_ { /// Need to get offset before reading record from stream auto offset = reader.tellg(); - if (static_cast(offset) < file_meta.last_open_end) + if (static_cast(offset) >= file_meta.last_open_end) break; record.offset = offset; StorageFileLog::assertStreamGood(reader); diff --git a/src/Storages/FileLog/StorageFileLog.cpp b/src/Storages/FileLog/StorageFileLog.cpp index 66add8128ef..81303f623aa 100644 --- a/src/Storages/FileLog/StorageFileLog.cpp +++ b/src/Storages/FileLog/StorageFileLog.cpp @@ -47,8 +47,6 @@ namespace ErrorCodes namespace { - const auto RESCHEDULE_MS = 500; - const auto BACKOFF_TRESHOLD = 32000; const auto MAX_THREAD_WORK_DURATION_MS = 60000; } @@ -57,7 +55,6 @@ StorageFileLog::StorageFileLog( ContextPtr context_, const ColumnsDescription & columns_, const String & path_, - const String & relative_data_path_, const String & format_name_, std::unique_ptr settings, const String & comment, @@ -66,10 +63,9 @@ StorageFileLog::StorageFileLog( , WithContext(context_->getGlobalContext()) , filelog_settings(std::move(settings)) , path(path_) - , relative_data_path(relative_data_path_) , format_name(format_name_) , log(&Poco::Logger::get("StorageFileLog (" + table_id_.table_name + ")")) - , milliseconds_to_wait(RESCHEDULE_MS) + , milliseconds_to_wait(filelog_settings->poll_directory_watch_events_backoff_init.totalMilliseconds()) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); @@ -100,9 +96,9 @@ StorageFileLog::StorageFileLog( void StorageFileLog::loadMetaFiles(bool attach) { - const auto & storage_id = getStorageID(); - root_meta_path = std::filesystem::path(getContext()->getPath()) / "metadata" / "filelog_storage_metadata" / storage_id.getDatabaseName() - / storage_id.getTableName(); + const auto & storage = getStorageID(); + root_meta_path + = std::filesystem::path(getContext()->getPath()) / ".filelog_storage_metadata" / storage.getDatabaseName() / storage.getTableName(); /// Attach table if (attach) @@ -110,8 +106,8 @@ void StorageFileLog::loadMetaFiles(bool attach) /// Meta file may lost, log and create directory if (!std::filesystem::exists(root_meta_path)) { + /// Create root_meta_path directory when store meta data LOG_ERROR(log, "Metadata files of table {} are lost.", getStorageID().getTableName()); - std::filesystem::create_directories(root_meta_path); } /// Load all meta info to file_infos; deserialize(); @@ -180,8 +176,8 @@ void StorageFileLog::loadFiles() /// data file have been renamed, need update meta file's name if (it->second.file_name != file) { - it->second.file_name = file; std::filesystem::rename(getFullMetaPath(it->second.file_name), getFullMetaPath(file)); + it->second.file_name = file; } } /// New file @@ -261,6 +257,8 @@ void StorageFileLog::serialize(UInt64 inode, const FileMeta & file_meta) const void StorageFileLog::deserialize() { + if (!std::filesystem::exists(root_meta_path)) + return; /// In case of single file (not a watched directory), /// iterated directoy always has one file inside. for (const auto & dir_entry : std::filesystem::directory_iterator{root_meta_path}) @@ -324,7 +322,7 @@ Pipe StorageFileLog::read( getStorageID().getTableName()); } - if (running_streams.load(std::memory_order_relaxed)) + if (running_streams) { throw Exception("Another select query is running on this table, need to wait it finish.", ErrorCodes::CANNOT_SELECT); } @@ -409,6 +407,9 @@ void StorageFileLog::shutdown() { task->stream_cancelled = true; + /// Reader thread may wait for wake up + wakeUp(); + LOG_TRACE(log, "Waiting for cleanup"); task->holder->deactivate(); } @@ -623,10 +624,13 @@ void StorageFileLog::threadFunc() { if (path_is_directory) { - std::unique_lock lock(mutex); - /// Waiting for watch directory thread to wake up + std::unique_lock lock(mutex); + /// Waiting for watch directory thread to wake up cv.wait(lock, [this] { return has_new_events; }); has_new_events = false; + + if (task->stream_cancelled) + return; task->holder->schedule(); } else @@ -636,7 +640,7 @@ void StorageFileLog::threadFunc() bool StorageFileLog::streamToViews() { - if (running_streams.load(std::memory_order_relaxed)) + if (running_streams) { throw Exception("Another select query is running on this table, need to wait it finish.", ErrorCodes::CANNOT_SELECT); } @@ -702,6 +706,14 @@ bool StorageFileLog::streamToViews() return updateFileInfos(); } +void StorageFileLog::wakeUp() +{ + std::unique_lock lock(mutex); + has_new_events = true; + lock.unlock(); + cv.notify_one(); +} + void registerStorageFileLog(StorageFactory & factory) { auto creator_fn = [](const StorageFactory::Arguments & args) @@ -767,7 +779,6 @@ void registerStorageFileLog(StorageFactory & factory) args.getContext(), args.columns, path, - args.relative_data_path, format, std::move(filelog_settings), args.comment, @@ -813,10 +824,10 @@ bool StorageFileLog::updateFileInfos() auto events = directory_watch->getEventsAndReset(); - for (const auto & [file_name, event_info] : events) + for (const auto & [file_name, event_infos] : events) { String file_path = getFullDataPath(file_name); - for(const auto & event_info : event_info.file_events) + for (const auto & event_info : event_infos.file_events) { switch (event_info.type) { @@ -836,7 +847,7 @@ bool StorageFileLog::updateFileInfos() file_infos.meta_by_inode.emplace(inode, FileMeta{.file_name = file_name}); if (auto it = file_infos.context_by_name.find(file_name); it != file_infos.context_by_name.end()) - it->second = FileContext{.inode = inode}; + it->second = FileContext{.status = FileStatus::OPEN, .inode = inode}; else file_infos.context_by_name.emplace(file_name, FileContext{.inode = inode}); } diff --git a/src/Storages/FileLog/StorageFileLog.h b/src/Storages/FileLog/StorageFileLog.h index 04c83d8837d..da87cd1be5a 100644 --- a/src/Storages/FileLog/StorageFileLog.h +++ b/src/Storages/FileLog/StorageFileLog.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -126,9 +127,7 @@ public: void increaseStreams(); void reduceStreams(); - auto & getConditionVariable() { return cv; } - auto & getMutex() { return mutex; } - void setNewEvents() { has_new_events = true; } + void wakeUp(); const auto & getFileLogSettings() const { return filelog_settings; } @@ -138,7 +137,6 @@ protected: ContextPtr context_, const ColumnsDescription & columns_, const String & path_, - const String & relative_data_path_, const String & format_name_, std::unique_ptr settings, const String & comment, @@ -148,14 +146,11 @@ private: std::unique_ptr filelog_settings; const String path; - /// For meta file - const String relative_data_path; bool path_is_directory = true; /// If path argument of the table is a regular file, it equals to user_files_path /// otherwise, it equals to user_files_path/ + path_argument/, e.g. path String root_data_path; - /// relative_data_path/ + table_name/ String root_meta_path; FileInfos file_infos; @@ -163,20 +158,8 @@ private: const String format_name; Poco::Logger * log; - std::unique_ptr directory_watch = nullptr; - uint64_t milliseconds_to_wait; - struct TaskContext - { - BackgroundSchedulePool::TaskHolder holder; - std::atomic stream_cancelled {false}; - explicit TaskContext(BackgroundSchedulePool::TaskHolder&& task_) : holder(std::move(task_)) - { - } - }; - std::shared_ptr task; - /// In order to avoid data race, using a naive trick to forbid execute two select /// simultaneously, although read is not useful in this engine. Using an atomic /// variable to records current unfinishing streams, then if have unfinishing streams, @@ -189,6 +172,18 @@ private: bool has_new_events = false; std::condition_variable cv; + struct TaskContext + { + BackgroundSchedulePool::TaskHolder holder; + std::atomic stream_cancelled {false}; + explicit TaskContext(BackgroundSchedulePool::TaskHolder&& task_) : holder(std::move(task_)) + { + } + }; + std::shared_ptr task; + + std::unique_ptr directory_watch = nullptr; + void loadFiles(); void loadMetaFiles(bool attach); diff --git a/tests/queries/0_stateless/02023_storage_filelog.reference b/tests/queries/0_stateless/02023_storage_filelog.reference index 0ab71c65c6b..c787d2047db 100644 --- a/tests/queries/0_stateless/02023_storage_filelog.reference +++ b/tests/queries/0_stateless/02023_storage_filelog.reference @@ -141,4 +141,147 @@ 120 120 120 120 120 120 +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +7 7 +8 8 +9 9 +10 10 +11 11 +12 12 +13 13 +14 14 +15 15 +16 16 +17 17 +18 18 +19 19 +20 20 +100 100 +101 101 +102 102 +103 103 +104 104 +105 105 +106 106 +107 107 +108 108 +109 109 +110 110 +111 111 +112 112 +113 113 +114 114 +115 115 +116 116 +117 117 +118 118 +119 119 +120 120 +150 150 +151 151 +152 152 +153 153 +154 154 +155 155 +156 156 +157 157 +158 158 +159 159 +160 160 +161 161 +162 162 +163 163 +164 164 +165 165 +166 166 +167 167 +168 168 +169 169 +170 170 +171 171 +172 172 +173 173 +174 174 +175 175 +176 176 +177 177 +178 178 +179 179 +180 180 +181 181 +182 182 +183 183 +184 184 +185 185 +186 186 +187 187 +188 188 +189 189 +190 190 +191 191 +192 192 +193 193 +194 194 +195 195 +196 196 +197 197 +198 198 +199 199 +200 200 +200 200 +201 201 +202 202 +203 203 +204 204 +205 205 +206 206 +207 207 +208 208 +209 209 +210 210 +211 211 +212 212 +213 213 +214 214 +215 215 +216 216 +217 217 +218 218 +219 219 +220 220 +221 221 +222 222 +223 223 +224 224 +225 225 +226 226 +227 227 +228 228 +229 229 +230 230 +231 231 +232 232 +233 233 +234 234 +235 235 +236 236 +237 237 +238 238 +239 239 +240 240 +241 241 +242 242 +243 243 +244 244 +245 245 +246 246 +247 247 +248 248 +249 249 +250 250 OK diff --git a/tests/queries/0_stateless/02023_storage_filelog.sh b/tests/queries/0_stateless/02023_storage_filelog.sh index b695c270835..b13bb4da065 100755 --- a/tests/queries/0_stateless/02023_storage_filelog.sh +++ b/tests/queries/0_stateless/02023_storage_filelog.sh @@ -10,7 +10,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # Data preparation. # Now we can get the user_files_path by use the table file function for trick. also we can get it by query as: # "insert into function file('exist.txt', 'CSV', 'val1 char') values ('aaaa'); select _path from file('exist.txt', 'CSV', 'val1 char')" -user_files_path=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}') +user_files_path=$(${CLICKHOUSE_CLIENT} --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}') mkdir -p ${user_files_path}/logs/ @@ -41,11 +41,28 @@ touch ${user_files_path}/logs/a.txt cp ${user_files_path}/logs/a.txt ${user_files_path}/logs/c.txt cp ${user_files_path}/logs/a.txt ${user_files_path}/logs/d.txt cp ${user_files_path}/logs/a.txt ${user_files_path}/logs/e.txt +mv ${user_files_path}/logs/b.txt ${user_files_path}/logs/j.txt rm ${user_files_path}/logs/d.txt ${CLICKHOUSE_CLIENT} --query "select * from file_log order by k;" +${CLICKHOUSE_CLIENT} --query "detach table file_log;" +cp ${user_files_path}/logs/e.txt ${user_files_path}/logs/f.txt +mv ${user_files_path}/logs/e.txt ${user_files_path}/logs/g.txt +mv ${user_files_path}/logs/c.txt ${user_files_path}/logs/h.txt +for i in {150..200} +do + echo $i, $i >> ${user_files_path}/logs/h.txt +done +for i in {200..250} +do + echo $i, $i >> ${user_files_path}/logs/i.txt +done +${CLICKHOUSE_CLIENT} --query "attach table file_log;" + +${CLICKHOUSE_CLIENT} --query "select * from file_log order by k;" + ${CLICKHOUSE_CLIENT} --query "detach table file_log;" ${CLICKHOUSE_CLIENT} --query "attach table file_log;" diff --git a/tests/queries/0_stateless/02024_storage_filelog_mv.sh b/tests/queries/0_stateless/02024_storage_filelog_mv.sh index 0503552b8dd..95dfb74ecf6 100755 --- a/tests/queries/0_stateless/02024_storage_filelog_mv.sh +++ b/tests/queries/0_stateless/02024_storage_filelog_mv.sh @@ -14,10 +14,6 @@ user_files_path=$(clickhouse-client --query "select _path,_file from file('nonex mkdir -p ${user_files_path}/logs/ rm -rf ${user_files_path}/logs/* -for i in {1..20} -do - echo $i, $i >> ${user_files_path}/logs/a.txt -done ${CLICKHOUSE_CLIENT} --query "drop table if exists file_log;" ${CLICKHOUSE_CLIENT} --query "create table file_log(k UInt8, v UInt8) engine=FileLog('${user_files_path}/logs/', 'CSV');" @@ -25,6 +21,11 @@ ${CLICKHOUSE_CLIENT} --query "create table file_log(k UInt8, v UInt8) engine=Fil ${CLICKHOUSE_CLIENT} --query "drop table if exists mv;" ${CLICKHOUSE_CLIENT} --query "create Materialized View mv engine=MergeTree order by k as select * from file_log;" +for i in {1..20} +do + echo $i, $i >> ${user_files_path}/logs/a.txt +done + for i in {1..200} do sleep 0.1