diff --git a/src/Storages/FileLog/FileLogDirectoryWatcher.h b/src/Storages/FileLog/FileLogDirectoryWatcher.h index 310b5bde388..00ee7ddfc40 100644 --- a/src/Storages/FileLog/FileLogDirectoryWatcher.h +++ b/src/Storages/FileLog/FileLogDirectoryWatcher.h @@ -47,8 +47,8 @@ private: const std::string path; /// Note, in order to avoid data race found by fuzzer, put events before dw, - /// such that when this class desctruction, dw will be destructed before events. - /// The data race is because dw create a seperate thread to monitor file events + /// such that on destruction of this class, dw will be destructed before events. + /// The data race is because dw creates a separate thread to monitor file events /// and put into events, then if we destruct events first, the monitor thread still /// running, it may access events during events destruction, leads to data race. Events events; diff --git a/src/Storages/FileLog/FileLogSource.cpp b/src/Storages/FileLog/FileLogSource.cpp index de46c880301..eb8cb2b28f9 100644 --- a/src/Storages/FileLog/FileLogSource.cpp +++ b/src/Storages/FileLog/FileLogSource.cpp @@ -36,21 +36,17 @@ FileLogSource::FileLogSource( metadata_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames(), storage.getVirtuals(), storage.getStorageID())) { buffer = std::make_unique(storage, max_block_size, poll_time_out, context, stream_number_, max_streams_number_); + + const auto & file_infos = storage.getFileInfos(); + + size_t files_per_stream = file_infos.file_names.size() / max_streams_number; + start = stream_number * files_per_stream; + end = stream_number == max_streams_number - 1 ? file_infos.file_names.size() : (stream_number + 1) * files_per_stream; } void FileLogSource::onFinish() { - auto & file_infos = storage.getFileInfos(); - - size_t files_per_stream = file_infos.file_names.size() / max_streams_number; - size_t start = stream_number * files_per_stream; - size_t end = stream_number == max_streams_number - 1 ? 
file_infos.file_names.size() : (stream_number + 1) * files_per_stream; - - /// Each stream responsible for close it's files and store meta - for (size_t i = start; i < end; ++i) - { - storage.closeFileAndStoreMeta(file_infos.file_names[i]); - } + storage.closeFilesAndStoreMeta(start, end); } Chunk FileLogSource::generate() @@ -60,6 +56,7 @@ Chunk FileLogSource::generate() /// There is no onFinish for ISource, we call it /// when no records return to close files onFinish(); + finished = true; return {}; } @@ -105,7 +102,11 @@ Chunk FileLogSource::generate() } if (total_rows == 0) + { + onFinish(); + finished = true; return {}; + } auto result_block = non_virtual_header.cloneWithColumns(executor.getResultColumns()); auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns)); @@ -121,6 +122,9 @@ Chunk FileLogSource::generate() auto converting_actions = std::make_shared(std::move(converting_dag)); converting_actions->execute(result_block); + /// After generating each block, store metas to disk + storage.storeMetas(start, end); + return Chunk(result_block.getColumns(), result_block.rows()); } diff --git a/src/Storages/FileLog/FileLogSource.h b/src/Storages/FileLog/FileLogSource.h index f57b1096749..6f94ef2470a 100644 --- a/src/Storages/FileLog/FileLogSource.h +++ b/src/Storages/FileLog/FileLogSource.h @@ -30,6 +30,12 @@ public: void onFinish(); + virtual ~FileLogSource() override + { + if (!finished) + onFinish(); + } + protected: Chunk generate() override; @@ -49,6 +55,11 @@ private: Block non_virtual_header; Block virtual_header; + + /// The start and end positions of the files this stream is responsible for, + /// not including end + int start; + int end; }; } diff --git a/src/Storages/FileLog/StorageFileLog.cpp b/src/Storages/FileLog/StorageFileLog.cpp index bf81457995f..431d88b9f7b 100644 --- a/src/Storages/FileLog/StorageFileLog.cpp +++ b/src/Storages/FileLog/StorageFileLog.cpp @@ -223,17 +223,26 @@ void StorageFileLog::serialize() const { 
std::filesystem::create_directories(root_meta_path); } - for (const auto & it : file_infos.meta_by_inode) + for (const auto & [inode, meta] : file_infos.meta_by_inode) { - auto full_name = getFullMetaPath(it.second.file_name); + auto full_name = getFullMetaPath(meta.file_name); if (!std::filesystem::exists(full_name)) { FS::createFile(full_name); } + else + { + auto last_pos = getLastWrittenPos(meta.file_name); + if (last_pos > meta.last_writen_position) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Last stored last written pos is bigger than current last written pos need to store for meta file {}.", + full_name); + } WriteBufferFromFile out(full_name); - writeIntText(it.first, out); + writeIntText(inode, out); writeChar('\n', out); - writeIntText(it.second.last_writen_position, out); + writeIntText(meta.last_writen_position, out); } } @@ -248,6 +257,15 @@ void StorageFileLog::serialize(UInt64 inode, const FileMeta & file_meta) const { FS::createFile(full_name); } + else + { + auto last_pos = getLastWrittenPos(file_meta.file_name); + if (last_pos > file_meta.last_writen_position) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Last stored last written pos is bigger than current last written pos need to store for meta file {}.", + full_name); + } WriteBufferFromFile out(full_name); writeIntText(inode, out); writeChar('\n', out); @@ -288,6 +306,23 @@ void StorageFileLog::deserialize() } } +UInt64 StorageFileLog::getLastWrittenPos(const String & file_name) const +{ + ReadBufferFromFile in(getFullMetaPath(file_name)); + UInt64 _, last_written_pos; + + if (!tryReadIntText(_, in)) + { + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed.", getFullMetaPath(file_name)); + } + assertChar('\n', in); + if (!tryReadIntText(last_written_pos, in)) + { + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed.", getFullMetaPath(file_name)); + } + return last_written_pos; +} + UInt64 StorageFileLog::getInode(const String & 
file_name) { struct stat file_stat; @@ -307,7 +342,19 @@ Pipe StorageFileLog::read( size_t /* max_block_size */, unsigned /* num_streams */) { - /// We need this lock, in case read and streamToViews execute at the same time + auto table_id = getStorageID(); + size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size(); + /// If there are MVs depended on this table, we just forbid reading + if (dependencies_count) + { + throw Exception( + ErrorCodes::CANNOT_READ_ALL_DATA, + "Can not read from table {}, because it has been depended by other tables.", + table_id.getTableName()); + } + + /// We need this lock, in case read and streamToViews execute at the same time. + /// In case of MV attached during reading std::lock_guard lock(status_mutex); updateFileInfos(); @@ -440,14 +487,41 @@ void StorageFileLog::closeFilesAndStoreMeta() serialize(); } -void StorageFileLog::closeFileAndStoreMeta(const String & file_name) +void StorageFileLog::closeFilesAndStoreMeta(int start, int end) { - auto & file_ctx = findInMap(file_infos.context_by_name, file_name); - if (file_ctx.reader.is_open()) - file_ctx.reader.close(); +#ifndef NDEBUG + assert(start >= 0); + assert(start < end); + assert(end <= file_infos.file_names.size()); +#endif - auto & meta = findInMap(file_infos.meta_by_inode, file_ctx.inode); - serialize(file_ctx.inode, meta); + for (int i = start; i < end; ++i) + { + auto & file_ctx = findInMap(file_infos.context_by_name, file_infos.file_names[i]); + + if (file_ctx.reader.is_open()) + file_ctx.reader.close(); + + auto & meta = findInMap(file_infos.meta_by_inode, file_ctx.inode); + serialize(file_ctx.inode, meta); + } +} + +void StorageFileLog::storeMetas(int start, int end) +{ +#ifndef NDEBUG + assert(start >= 0); + assert(start < end); + assert(end <= file_infos.file_names.size()); +#endif + + for (int i = start; i < end; ++i) + { + auto & file_ctx = findInMap(file_infos.context_by_name, file_infos.file_names[i]); + + auto & meta = 
findInMap(file_infos.meta_by_inode, file_ctx.inode); + serialize(file_ctx.inode, meta); + } } size_t StorageFileLog::getMaxBlockSize() const diff --git a/src/Storages/FileLog/StorageFileLog.h b/src/Storages/FileLog/StorageFileLog.h index 929be2690e1..0251544f33f 100644 --- a/src/Storages/FileLog/StorageFileLog.h +++ b/src/Storages/FileLog/StorageFileLog.h @@ -97,10 +97,14 @@ public: static UInt64 getInode(const String & file_name); void openFilesAndSetPos(); + /// Used in shutdown() void closeFilesAndStoreMeta(); - /// Used in FileSource - void closeFileAndStoreMeta(const String & file_name); + /// Used in FileLogSource when it finishes generating all blocks + void closeFilesAndStoreMeta(int start, int end); + + /// Used in FileLogSource after generating every block + void storeMetas(int start, int end); static void assertStreamGood(const std::ifstream & reader); @@ -183,6 +187,7 @@ private: void serialize(UInt64 inode, const FileMeta & file_meta) const; void deserialize(); + UInt64 getLastWrittenPos(const String & file_name) const; }; }