diff --git a/src/Storages/FileLog/FileLogSource.cpp b/src/Storages/FileLog/FileLogSource.cpp
index eb8cb2b28f9..0c266178142 100644
--- a/src/Storages/FileLog/FileLogSource.cpp
+++ b/src/Storages/FileLog/FileLogSource.cpp
@@ -42,6 +42,8 @@ FileLogSource::FileLogSource(
     size_t files_per_stream = file_infos.file_names.size() / max_streams_number;
     start = stream_number * files_per_stream;
     end = stream_number == max_streams_number - 1 ? file_infos.file_names.size() : (stream_number + 1) * files_per_stream;
+
+    storage.increaseStreams();
 }
 
 void FileLogSource::onFinish()
diff --git a/src/Storages/FileLog/FileLogSource.h b/src/Storages/FileLog/FileLogSource.h
index 0ff71c1af87..cdf60cd4fc1 100644
--- a/src/Storages/FileLog/FileLogSource.h
+++ b/src/Storages/FileLog/FileLogSource.h
@@ -34,6 +34,7 @@ public:
     {
         if (!finished)
             onFinish();
+        storage.reduceStreams();
     }
 
 protected:
diff --git a/src/Storages/FileLog/StorageFileLog.cpp b/src/Storages/FileLog/StorageFileLog.cpp
index 6489c9270ca..fb13137ce31 100644
--- a/src/Storages/FileLog/StorageFileLog.cpp
+++ b/src/Storages/FileLog/StorageFileLog.cpp
@@ -42,6 +42,7 @@ namespace ErrorCodes
     extern const int CANNOT_READ_ALL_DATA;
     extern const int LOGICAL_ERROR;
     extern const int TABLE_METADATA_ALREADY_EXISTS;
+    extern const int CANNOT_SELECT;
 }
 
 namespace
@@ -326,6 +327,11 @@ Pipe StorageFileLog::read(
             table_id.getTableName());
     }
 
+    if (running_streams.load(std::memory_order_relaxed))
+    {
+        throw Exception("Another select query is running on this table; wait for it to finish.", ErrorCodes::CANNOT_SELECT);
+    }
+
     /// We need this lock, in case read and streamToViews execute at the same time.
     /// In case of MV attached during reading
     std::lock_guard lock(status_mutex);
@@ -364,6 +370,16 @@ Pipe StorageFileLog::read(
     return Pipe::unitePipes(std::move(pipes));
 }
 
+void StorageFileLog::increaseStreams()
+{
+    running_streams.fetch_add(1, std::memory_order_relaxed);
+}
+
+void StorageFileLog::reduceStreams()
+{
+    running_streams.fetch_sub(1, std::memory_order_relaxed);
+}
+
 void StorageFileLog::drop()
 {
     try
diff --git a/src/Storages/FileLog/StorageFileLog.h b/src/Storages/FileLog/StorageFileLog.h
index 82c7a2dc9f9..bdab85866c0 100644
--- a/src/Storages/FileLog/StorageFileLog.h
+++ b/src/Storages/FileLog/StorageFileLog.h
@@ -122,6 +122,9 @@ public:
         throw Exception(ErrorCodes::LOGICAL_ERROR, "The key {} doesn't exist.", key);
     }
 
+    void increaseStreams();
+    void reduceStreams();
+
 protected:
     StorageFileLog(
         const StorageID & table_id_,
@@ -169,6 +172,12 @@ private:
     };
 
     std::shared_ptr<TaskContext> task;
+    /// In order to avoid data races, use a simple trick to forbid running two selects
+    /// simultaneously, even though direct reads are of limited use for this engine:
+    /// an atomic counter records the number of unfinished streams, and a later select
+    /// is rejected while any streams are still running.
+    std::atomic<size_t> running_streams = 0;
+
     using TaskThread = BackgroundSchedulePool::TaskHolder;
 
     void loadFiles();
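
A minimal standalone sketch of the guard pattern this diff introduces, outside the ClickHouse code: an atomic counter is incremented when a source is created and decremented when it is destroyed, and a new read is rejected while the counter is non-zero. The names (GuardedStorage, StreamSource, checkNoRunningStreams) and the use of std::runtime_error are illustrative assumptions, not the ClickHouse API.

#include <atomic>
#include <cstddef>
#include <stdexcept>

class GuardedStorage
{
public:
    /// Called at the start of a read: refuse to run while another select still has live streams.
    void checkNoRunningStreams() const
    {
        if (running_streams.load(std::memory_order_relaxed))
            throw std::runtime_error("Another select query is running on this table; wait for it to finish.");
    }

    void increaseStreams() { running_streams.fetch_add(1, std::memory_order_relaxed); }
    void reduceStreams() { running_streams.fetch_sub(1, std::memory_order_relaxed); }

private:
    std::atomic<size_t> running_streams{0};
};

/// Each source registers itself for its own lifetime, mirroring the
/// constructor/destructor pairing added to FileLogSource above.
class StreamSource
{
public:
    explicit StreamSource(GuardedStorage & storage_) : storage(storage_) { storage.increaseStreams(); }
    ~StreamSource() { storage.reduceStreams(); }

private:
    GuardedStorage & storage;
};

Because the destructor always runs, the counter cannot leak even if the query pipeline is cancelled early; relaxed ordering matches what the diff uses, since the counter only acts as an admission gate rather than a synchronization point.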