From 595005eb211b60b548819963232de325312dd893 Mon Sep 17 00:00:00 2001 From: feng lv Date: Sat, 4 Sep 2021 17:04:35 +0000 Subject: [PATCH] refactor some code --- src/Storages/FileLog/FileLogSettings.cpp | 41 +++ src/Storages/FileLog/FileLogSettings.h | 36 +++ ...BlockInputStream.cpp => FileLogSource.cpp} | 56 ++-- ...eLogBlockInputStream.h => FileLogSource.h} | 33 +- .../FileLog/ReadBufferFromFileLog.cpp | 69 +---- src/Storages/FileLog/ReadBufferFromFileLog.h | 23 +- src/Storages/FileLog/StorageFileLog.cpp | 281 +++++++++++++----- src/Storages/FileLog/StorageFileLog.h | 39 ++- 8 files changed, 366 insertions(+), 212 deletions(-) create mode 100644 src/Storages/FileLog/FileLogSettings.cpp create mode 100644 src/Storages/FileLog/FileLogSettings.h rename src/Storages/FileLog/{FileLogBlockInputStream.cpp => FileLogSource.cpp} (73%) rename src/Storages/FileLog/{FileLogBlockInputStream.h => FileLogSource.h} (57%) diff --git a/src/Storages/FileLog/FileLogSettings.cpp b/src/Storages/FileLog/FileLogSettings.cpp new file mode 100644 index 00000000000..2cd42c35870 --- /dev/null +++ b/src/Storages/FileLog/FileLogSettings.cpp @@ -0,0 +1,41 @@ +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_SETTING; +} + +IMPLEMENT_SETTINGS_TRAITS(FileLogSettingsTraits, LIST_OF_FILELOG_SETTINGS) + +void FileLogSettings::loadFromQuery(ASTStorage & storage_def) +{ + if (storage_def.settings) + { + try + { + applyChanges(storage_def.settings->changes); + } + catch (Exception & e) + { + if (e.code() == ErrorCodes::UNKNOWN_SETTING) + e.addMessage("for storage " + storage_def.engine->name); + throw; + } + } + else + { + auto settings_ast = std::make_shared(); + settings_ast->is_standalone = false; + storage_def.set(storage_def.settings, settings_ast); + } +} + +} diff --git a/src/Storages/FileLog/FileLogSettings.h b/src/Storages/FileLog/FileLogSettings.h new file mode 100644 index 00000000000..cfea9d1e195 --- /dev/null +++ 
b/src/Storages/FileLog/FileLogSettings.h @@ -0,0 +1,36 @@ +#pragma once + +#include +#include + + +namespace DB +{ +class ASTStorage; + + +#define FILELOG_RELATED_SETTINGS(M) \ + /* default is stream_poll_timeout_ms */ \ + M(Milliseconds, filelog_poll_timeout_ms, 0, "Timeout for single poll from FileLog.", 0) \ + /* default is min(max_block_size, filelog_max_block_size) */ \ + M(UInt64, filelog_poll_max_batch_size, 0, "Maximum amount of messages to be polled in a single FileLog poll.", 0) \ + /* default is max_insert_block_size */ \ + M(UInt64, filelog_max_block_size, 0, "Number of rows collected by poll(s) for flushing data from FileLog.", 0) \ + M(UInt64, filelog_max_threads, 8, "Number of max threads to parse files, default is 8", 0) + +#define LIST_OF_FILELOG_SETTINGS(M) \ + FILELOG_RELATED_SETTINGS(M) \ + FORMAT_FACTORY_SETTINGS(M) + +DECLARE_SETTINGS_TRAITS(FileLogSettingsTraits, LIST_OF_FILELOG_SETTINGS) + + +/** Settings for the FileLog engine. + * Could be loaded from a CREATE TABLE query (SETTINGS clause). 
+ */ +struct FileLogSettings : public BaseSettings +{ + void loadFromQuery(ASTStorage & storage_def); +}; + +} diff --git a/src/Storages/FileLog/FileLogBlockInputStream.cpp b/src/Storages/FileLog/FileLogSource.cpp similarity index 73% rename from src/Storages/FileLog/FileLogBlockInputStream.cpp rename to src/Storages/FileLog/FileLogSource.cpp index 5c206bfae6d..4a15cd07f25 100644 --- a/src/Storages/FileLog/FileLogBlockInputStream.cpp +++ b/src/Storages/FileLog/FileLogSource.cpp @@ -1,11 +1,11 @@ -#include - #include #include #include #include #include +#include #include +#include #include namespace DB @@ -15,42 +15,34 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -FileLogBlockInputStream::FileLogBlockInputStream( +FileLogSource::FileLogSource( StorageFileLog & storage_, const StorageMetadataPtr & metadata_snapshot_, - const std::shared_ptr & context_, + const ContextPtr & context_, const Names & columns, - size_t max_block_size_) + size_t max_block_size_, + size_t poll_time_out_, + size_t stream_number_, + size_t max_streams_number_) : storage(storage_) , metadata_snapshot(metadata_snapshot_) , context(context_) , column_names(columns) , max_block_size(max_block_size_) + , poll_time_out(poll_time_out_) + , stream_number(stream_number_) + , max_streams_number(max_streams_number_) , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized()) , virtual_header( metadata_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames(), storage.getVirtuals(), storage.getStorageID())) { + createReadBuffer(); } -Block FileLogBlockInputStream::getHeader() const -{ - return metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID()); -} - -void FileLogBlockInputStream::readPrefixImpl() -{ - buffer = storage.getBuffer(); - - if (!buffer) - return; - - buffer->open(); -} - -Block FileLogBlockInputStream::readImpl() +Chunk FileLogSource::generate() { if (!buffer) - return Block(); + return {}; MutableColumns 
result_columns = non_virtual_header.cloneEmptyColumns(); @@ -104,8 +96,8 @@ Block FileLogBlockInputStream::readImpl() + Stopwatch watch; /* started once, before the loop, so poll_time_out bounds the whole generate() call rather than a single iteration */ while (true) { size_t new_rows = 0; - exception_message.reset(); if (buffer->poll()) { try @@ -122,28 +114,16 @@ Block FileLogBlockInputStream::readImpl() total_rows = total_rows + new_rows; } - if (!buffer->hasMorePolledRecords() && (total_rows >= max_block_size || !checkTimeLimit())) + if ((!buffer->hasMorePolledRecords() && (total_rows >= max_block_size)) || watch.elapsedMilliseconds() > poll_time_out) { break; } } if (total_rows == 0) - return Block(); + return {}; - auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns)); - - return ConvertingBlockInputStream( - std::make_shared(result_block), - getHeader(), - ConvertingBlockInputStream::MatchColumnsMode::Name) - .read(); -} - -void FileLogBlockInputStream::readSuffixImpl() -{ - if (buffer) - buffer->close(); + return Chunk(std::move(result_columns), total_rows); } } diff --git a/src/Storages/FileLog/FileLogBlockInputStream.h b/src/Storages/FileLog/FileLogSource.h similarity index 57% rename from src/Storages/FileLog/FileLogBlockInputStream.h rename to src/Storages/FileLog/FileLogSource.h index d72ab5649ee..042a973986c 100644 --- a/src/Storages/FileLog/FileLogBlockInputStream.h +++ b/src/Storages/FileLog/FileLogSource.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include @@ -12,26 +12,24 @@ namespace Poco } namespace DB { -class FileLogBlockInputStream : public IBlockInputStream +class FileLogSource : public SourceWithProgress { public: - FileLogBlockInputStream( + FileLogSource( StorageFileLog & storage_, const StorageMetadataPtr & metadata_snapshot_, - const std::shared_ptr & context_, + const ContextPtr & context_, const Names & columns, - size_t max_block_size_); - ~FileLogBlockInputStream() override = default; - - String 
getName() const override { return storage.getName(); } - Block getHeader() const override; - - void 
readPrefixImpl() override; - Block readImpl() override; - void readSuffixImpl() override; + size_t max_block_size_, + size_t poll_time_out_, + size_t stream_number_, + size_t max_streams_number_); bool isStalled() { return !buffer || buffer->isStalled(); } +protected: + Chunk generate() override; + private: StorageFileLog & storage; StorageMetadataPtr metadata_snapshot; @@ -39,10 +37,19 @@ private: Names column_names; UInt64 max_block_size; + size_t poll_time_out; + + size_t stream_number; + size_t max_streams_number; + ReadBufferFromFileLogPtr buffer; + bool started = false; + const Block non_virtual_header; const Block virtual_header; + + void createReadBuffer(); }; } diff --git a/src/Storages/FileLog/ReadBufferFromFileLog.cpp b/src/Storages/FileLog/ReadBufferFromFileLog.cpp index e5849b50894..1c39296dabb 100644 --- a/src/Storages/FileLog/ReadBufferFromFileLog.cpp +++ b/src/Storages/FileLog/ReadBufferFromFileLog.cpp @@ -1,13 +1,11 @@ #include #include -#include -#include #include -#include -#include #include +#include +#include namespace DB { @@ -29,33 +27,9 @@ ReadBufferFromFileLog::ReadBufferFromFileLog( void ReadBufferFromFileLog::open() { - Poco::File file(path); - - if (file.isFile()) - { - file_status[path].reader = std::ifstream(path); - } - else if (file.isDirectory()) - { - path_is_directory = true; - Poco::DirectoryIterator dir_iter(file); - Poco::DirectoryIterator end; - while (dir_iter != end) - { - if (dir_iter->isFile()) - file_status[dir_iter->path()].reader = std::ifstream(dir_iter->path()); - ++dir_iter; - } - } - wait_task = context->getMessageBrokerSchedulePool().createTask("waitTask", [this] { waitFunc(); }); wait_task->deactivate(); - if (path_is_directory) - { - select_task = context->getMessageBrokerSchedulePool().createTask("watchTask", [this] { watchFunc(); }); - select_task->activateAndSchedule(); - } cleanUnprocessed(); allowed = false; @@ -183,43 +157,4 @@ void ReadBufferFromFileLog::waitFunc() time_out = true; } -void 
ReadBufferFromFileLog::watchFunc() -{ - FileLogDirectoryWatcher dw(path); - while (true) - { - sleepForNanoseconds(poll_timeout); - - auto error = dw.getError(); - if (error) - LOG_INFO(log, "Error happened during watching directory {}.", dw.getPath()); - - auto events = dw.getEvents(); - std::lock_guard lock(status_mutex); - - for (const auto & event : events) - { - switch (event.type) - { - - case Poco::DirectoryWatcher::DW_ITEM_ADDED: - LOG_TRACE(log, "New event {} watched.", event.callback); - file_status[event.path].reader = std::ifstream(event.path); - break; - - case Poco::DirectoryWatcher::DW_ITEM_MODIFIED: - LOG_TRACE(log, "New event {} watched.", event.callback); - file_status[event.path].status = FileStatus::UPDATED; - break; - - case Poco::DirectoryWatcher::DW_ITEM_REMOVED: - case Poco::DirectoryWatcher::DW_ITEM_MOVED_TO: - case Poco::DirectoryWatcher::DW_ITEM_MOVED_FROM: - LOG_TRACE(log, "New event {} watched.", event.callback); - file_status[event.path].status = FileStatus::REMOVED; - break; - } - } - } -} } diff --git a/src/Storages/FileLog/ReadBufferFromFileLog.h b/src/Storages/FileLog/ReadBufferFromFileLog.h index e74c93d189e..e7385282f71 100644 --- a/src/Storages/FileLog/ReadBufferFromFileLog.h +++ b/src/Storages/FileLog/ReadBufferFromFileLog.h @@ -33,26 +33,13 @@ public: bool poll(); - bool isStalled() { return buffer_status != BufferStatus::NOT_STALLED; } + bool noRecords() { return buffer_status == BufferStatus::NO_RECORD_RETURNED; } private: enum class BufferStatus { NO_RECORD_RETURNED, - NOT_STALLED, - }; - enum class FileStatus - { - BEGIN, - NO_CHANGE, - UPDATED, - REMOVED - }; - - struct FileContext - { - FileStatus status = FileStatus::BEGIN; - std::ifstream reader; + POLLED_OK, }; BufferStatus buffer_status; @@ -67,10 +54,6 @@ private: bool time_out = false; - using NameToFile = std::unordered_map; - NameToFile file_status; - - std::mutex status_mutex; ContextPtr context; @@ -85,7 +68,6 @@ private: using TaskThread = 
BackgroundSchedulePool::TaskHolder; TaskThread wait_task; - TaskThread select_task; Records pollBatch(size_t batch_size_); @@ -97,6 +79,5 @@ private: void waitFunc(); - [[noreturn]] void watchFunc(); }; } diff --git a/src/Storages/FileLog/StorageFileLog.cpp b/src/Storages/FileLog/StorageFileLog.cpp index 565b206e2cc..a6f819580d5 100644 --- a/src/Storages/FileLog/StorageFileLog.cpp +++ b/src/Storages/FileLog/StorageFileLog.cpp @@ -1,16 +1,3 @@ -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include #include @@ -18,20 +5,25 @@ #include #include #include +#include #include #include +#include +#include +#include #include #include -#include -#include #include #include #include +#include #include #include #include #include +#include +#include namespace DB { @@ -53,19 +45,41 @@ StorageFileLog::StorageFileLog( ContextPtr context_, const ColumnsDescription & columns_, const String & path_, - const String & format_name_) + const String & format_name_, + std::unique_ptr settings) : IStorage(table_id_) , WithContext(context_->getGlobalContext()) + , filelog_settings(std::move(settings)) , path(path_) , format_name(format_name_) - , log(&Poco::Logger::get("StorageFile (" + table_id_.table_name + ")")) + , log(&Poco::Logger::get("StorageFileLog (" + table_id_.table_name + ")")) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); setInMemoryMetadata(storage_metadata); + if (std::filesystem::is_regular_file(path)) + { + file_status[path].reader = std::ifstream(path); + file_names.push_back(path); + } + else if (std::filesystem::is_directory(path)) + { + path_is_directory = true; + /// Just consider file with depth 1 + for (const auto & dir_entry : std::filesystem::directory_iterator{path}) + { + if (dir_entry.is_regular_file()) + { + file_status[dir_entry.path()].reader = std::ifstream(dir_entry.path()); + file_names.push_back(dir_entry.path()); + } + } + } + + 
watch_task = getContext()->getMessageBrokerSchedulePool().createTask("watchTask", [this] { watchFunc(); }); + auto thread = getContext()->getMessageBrokerSchedulePool().createTask(log->name(), [this] { threadFunc(); }); - thread->deactivate(); task = std::make_shared(std::move(thread)); } @@ -78,23 +92,38 @@ Pipe StorageFileLog::read( size_t /* max_block_size */, unsigned /* num_streams */) { + std::lock_guard lock(status_mutex); auto modified_context = Context::createCopy(local_context); - return Pipe(std::make_shared( - std::make_shared(*this, metadata_snapshot, modified_context, column_names, 1))); + clearInvalidFiles(); + + auto max_streams_number = std::min(filelog_settings->filelog_max_threads, file_names.size()); + /// No files to parse + if (max_streams_number == 0) + { + return Pipe{}; + } + + Pipes pipes; + pipes.reserve(max_streams_number); + for (size_t stream_number = 0; stream_number < max_streams_number; ++stream_number) + { + pipes.emplace_back(std::make_shared( + *this, + metadata_snapshot, + modified_context, + column_names, + getMaxBlockSize(), + getPollTimeoutMillisecond(), + stream_number, + max_streams_number)); + } + + return Pipe::unitePipes(std::move(pipes)); } void StorageFileLog::startup() { - try - { - createReadBuffer(); - } - catch (const Exception &) - { - tryLogCurrentException(log); - } - task->holder->activateAndSchedule(); } @@ -105,26 +134,32 @@ void StorageFileLog::shutdown() LOG_TRACE(log, "Waiting for cleanup"); task->holder->deactivate(); + watch_task->deactivate(); - LOG_TRACE(log, "Closing files"); - destroyReadBuffer(); + for (auto & file : file_status) + { + file.second.reader.close(); + } } size_t StorageFileLog::getMaxBlockSize() const { - return getContext()->getSettingsRef().max_insert_block_size.value; + return filelog_settings->filelog_max_block_size.changed ? 
filelog_settings->filelog_max_block_size.value + : getContext()->getSettingsRef().max_insert_block_size.value; } size_t StorageFileLog::getPollMaxBatchSize() const { - size_t batch_size = getContext()->getSettingsRef().max_block_size.value; + size_t batch_size = filelog_settings->filelog_poll_max_batch_size.changed ? filelog_settings->filelog_poll_max_batch_size.value + : getContext()->getSettingsRef().max_block_size.value; - return std::min(batch_size,getMaxBlockSize()); + return std::min(batch_size, getMaxBlockSize()); } size_t StorageFileLog::getPollTimeoutMillisecond() const { - return getContext()->getSettingsRef().stream_poll_timeout_ms.totalMilliseconds(); + return filelog_settings->filelog_poll_timeout_ms.changed ? filelog_settings->filelog_poll_timeout_ms.totalMilliseconds() + : getContext()->getSettingsRef().stream_poll_timeout_ms.totalMilliseconds(); } @@ -135,9 +170,9 @@ bool StorageFileLog::checkDependencies(const StorageID & table_id) if (dependencies.empty()) return true; - for (const auto & db_tab : dependencies) + for (const auto & storage : dependencies) { - auto table = DatabaseCatalog::instance().tryGetTable(db_tab, getContext()); + auto table = DatabaseCatalog::instance().tryGetTable(storage, getContext()); if (!table) return false; @@ -147,25 +182,13 @@ bool StorageFileLog::checkDependencies(const StorageID & table_id) return false; // Check all its dependencies - if (!checkDependencies(db_tab)) + if (!checkDependencies(storage)) return false; } return true; } -void StorageFileLog::createReadBuffer() -{ - auto new_context = Context::createCopy(getContext()); - buffer = std::make_shared(path, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), new_context); -} - -void StorageFileLog::destroyReadBuffer() -{ - if (buffer) - buffer->close(); -} - void StorageFileLog::threadFunc() { try @@ -177,6 +200,8 @@ void StorageFileLog::threadFunc() { auto start_time = std::chrono::steady_clock::now(); + watch_task->activateAndSchedule(); + // Keep 
streaming as long as there are attached views and streaming is not cancelled while (!task->stream_cancelled) { @@ -207,6 +232,7 @@ void StorageFileLog::threadFunc() tryLogCurrentException(__PRETTY_FUNCTION__); } + watch_task->deactivate(); // Wait for attached views if (!task->stream_cancelled) task->holder->scheduleAfter(RESCHEDULE_MS); @@ -215,6 +241,7 @@ void StorageFileLog::threadFunc() bool StorageFileLog::streamToViews() { + std::lock_guard lock(status_mutex); Stopwatch watch; auto table_id = getStorageID(); @@ -223,41 +250,79 @@ bool StorageFileLog::streamToViews() throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR); auto metadata_snapshot = getInMemoryMetadataPtr(); + clearInvalidFiles(); + + auto max_streams_number = std::min(filelog_settings->filelog_max_threads, file_names.size()); + /// No files to parse + if (max_streams_number == 0) + { + return false; + } + // Create an INSERT query for streaming data auto insert = std::make_shared(); insert->table_id = table_id; - size_t block_size = getMaxBlockSize(); - auto new_context = Context::createCopy(getContext()); InterpreterInsertQuery interpreter(insert, new_context, false, true, true); auto block_io = interpreter.execute(); - auto stream = std::make_shared( - *this, metadata_snapshot, new_context, block_io.out->getHeader().getNames(), block_size); + Pipes pipes; + pipes.reserve(max_streams_number); + for (size_t stream_number = 0; stream_number < max_streams_number; ++stream_number) + { + pipes.emplace_back(std::make_shared( + *this, + metadata_snapshot, + new_context, + block_io.out->getHeader().getNames(), + getPollMaxBatchSize(), + getPollTimeoutMillisecond(), + stream_number, + max_streams_number)); + } - StreamLocalLimits limits; + QueryPipeline pipeline; + pipeline.init(Pipe::unitePipes(std::move(pipes))); - limits.speed_limits.max_execution_time = getContext()->getSettingsRef().stream_flush_interval_ms; - - limits.timeout_overflow_mode = 
OverflowMode::BREAK; - stream->setLimits(limits); - - std::atomic stub = {false}; size_t rows = 0; - copyData( - *stream, *block_io.out, [&rows](const Block & block) { rows += block.rows(); }, &stub); - bool stream_is_stalled = false; - - stream_is_stalled = stream->as()->isStalled(); + PullingPipelineExecutor executor(pipeline); + Block block; + block_io.out->writePrefix(); + while (executor.pull(block)) + { + block_io.out->write(block); + rows += block.rows(); + } + block_io.out->writeSuffix(); UInt64 milliseconds = watch.elapsedMilliseconds(); LOG_DEBUG(log, "Pushing {} rows to {} took {} ms.", formatReadableQuantity(rows), table_id.getNameForLogs(), milliseconds); - return stream_is_stalled; + return true; +} + +void StorageFileLog::clearInvalidFiles() +{ + /// Do not need to hold file_status lock, since it will be holded + /// by caller when call this function + std::vector valid_files; + for (const auto & it : file_names) + { + if (file_status.at(it).status == FileStatus::REMOVED) + { + file_status.erase(it); + } + else + { + valid_files.push_back(it); + } + } + + file_names.swap(valid_files); } void registerStorageFileLog(StorageFactory & factory) @@ -267,6 +332,36 @@ void registerStorageFileLog(StorageFactory & factory) ASTs & engine_args = args.engine_args; size_t args_count = engine_args.size(); + bool has_settings = args.storage_def->settings; + + auto filelog_settings = std::make_unique(); + if (has_settings) + { + filelog_settings->loadFromQuery(*args.storage_def); + } + + auto physical_cpu_cores = getNumberOfPhysicalCPUCores(); + auto num_threads = filelog_settings->filelog_max_threads.value; + + if (num_threads > physical_cpu_cores) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Number of threads to parse files can not be bigger than {}", physical_cpu_cores); + } + else if (num_threads < 1) + { + throw Exception("Number of threads to parse files can not be lower than 1", ErrorCodes::BAD_ARGUMENTS); + } + + if 
(filelog_settings->filelog_max_block_size.changed && filelog_settings->filelog_max_block_size.value < 1) + { + throw Exception("filelog_max_block_size can not be lower than 1", ErrorCodes::BAD_ARGUMENTS); + } + + if (filelog_settings->filelog_poll_max_batch_size.changed && filelog_settings->filelog_poll_max_batch_size.value < 1) + { + throw Exception("filelog_poll_max_batch_size can not be lower than 1", ErrorCodes::BAD_ARGUMENTS); + } + if (args_count != 2) throw Exception( "Arguments size of StorageFileLog should be 2, path and format name", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); @@ -277,7 +372,7 @@ void registerStorageFileLog(StorageFactory & factory) auto path = path_ast->as().value.safeGet(); auto format = format_ast->as().value.safeGet(); - return StorageFileLog::create(args.table_id, args.getContext(), args.columns, path, format); + return StorageFileLog::create(args.table_id, args.getContext(), args.columns, path, format, std::move(filelog_settings)); }; factory.registerStorage( @@ -300,4 +395,54 @@ Names StorageFileLog::getVirtualColumnNames() return result; } +void StorageFileLog::watchFunc() /* polls the directory watcher forever and records per-file status changes under status_mutex */ +{ + FileLogDirectoryWatcher dw(path); + while (true) + { + sleepForMicroseconds(getPollTimeoutMillisecond() * 1000); /* ms -> us; the getter falls back to stream_poll_timeout_ms when filelog_poll_timeout_ms is not set, avoiding a zero-length sleep */ + + auto error = dw.getError(); + if (error) + LOG_INFO(log, "Error happened during watching directory {}.", dw.getPath()); + + auto events = dw.getEvents(); + std::lock_guard lock(status_mutex); + + for (const auto & event : events) + { + switch (event.type) + { + + case Poco::DirectoryWatcher::DW_ITEM_ADDED: + LOG_TRACE(log, "New event {} watched.", event.callback); + if (std::filesystem::is_regular_file(event.path)) + { + file_status[event.path].reader = std::ifstream(event.path); + file_names.push_back(event.path); + } + break; 
+ + case Poco::DirectoryWatcher::DW_ITEM_MODIFIED: + LOG_TRACE(log, "New event {} watched.", event.callback); + if (std::filesystem::is_regular_file(event.path) && 
file_status.contains(event.path)) + { + file_status[event.path].status = FileStatus::UPDATED; + } + break; + + case Poco::DirectoryWatcher::DW_ITEM_REMOVED: + case Poco::DirectoryWatcher::DW_ITEM_MOVED_TO: + case Poco::DirectoryWatcher::DW_ITEM_MOVED_FROM: + LOG_TRACE(log, "New event {} watched.", event.callback); + if (file_status.contains(event.path)) /* no is_regular_file() check: a removed or moved-away path no longer exists, so the check would be false and the file would never be marked REMOVED */ + { + file_status[event.path].status = FileStatus::REMOVED; + } + break; + } + } + } +} + } diff --git a/src/Storages/FileLog/StorageFileLog.h b/src/Storages/FileLog/StorageFileLog.h index 93b67fe5a31..314724d73be 100644 --- a/src/Storages/FileLog/StorageFileLog.h +++ b/src/Storages/FileLog/StorageFileLog.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -11,8 +12,8 @@ #include #include -#include #include +#include namespace DB { @@ -52,16 +53,39 @@ protected: ContextPtr context_, const ColumnsDescription & columns_, const String & path_, - const String & format_name_); + const String & format_name_, + std::unique_ptr settings); private: + std::unique_ptr filelog_settings; const String path; + bool path_is_directory = false; + const String format_name; Poco::Logger * log; ReadBufferFromFileLogPtr buffer; - std::mutex mutex; + enum class FileStatus + { + BEGIN, + NO_CHANGE, + UPDATED, + REMOVED + }; + + struct FileContext + { + FileStatus status = FileStatus::BEGIN; + std::ifstream reader; + }; + + using NameToFile = std::unordered_map; + NameToFile file_status; + + std::vector file_names; + + std::mutex status_mutex; // Stream thread struct TaskContext @@ -74,17 +98,22 @@ private: }; std::shared_ptr task; - void createReadBuffer(); - void destroyReadBuffer(); + using TaskThread = BackgroundSchedulePool::TaskHolder; + + TaskThread watch_task; void threadFunc(); + void clearInvalidFiles(); + size_t getPollMaxBatchSize() const; size_t getMaxBlockSize() const; 
size_t getPollTimeoutMillisecond() const; bool streamToViews(); bool checkDependencies(const 
StorageID & table_id); + + [[noreturn]] void watchFunc(); }; }