This commit is contained in:
feng lv 2021-10-07 05:56:09 +00:00
parent 28f8e1c297
commit d9959a4f22
4 changed files with 28 additions and 0 deletions

View File

@ -42,6 +42,8 @@ FileLogSource::FileLogSource(
size_t files_per_stream = file_infos.file_names.size() / max_streams_number; size_t files_per_stream = file_infos.file_names.size() / max_streams_number;
start = stream_number * files_per_stream; start = stream_number * files_per_stream;
end = stream_number == max_streams_number - 1 ? file_infos.file_names.size() : (stream_number + 1) * files_per_stream; end = stream_number == max_streams_number - 1 ? file_infos.file_names.size() : (stream_number + 1) * files_per_stream;
storage.increaseStreams();
} }
void FileLogSource::onFinish() void FileLogSource::onFinish()

View File

@ -34,6 +34,7 @@ public:
{ {
if (!finished) if (!finished)
onFinish(); onFinish();
storage.reduceStreams();
} }
protected: protected:

View File

@ -42,6 +42,7 @@ namespace ErrorCodes
extern const int CANNOT_READ_ALL_DATA; extern const int CANNOT_READ_ALL_DATA;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int TABLE_METADATA_ALREADY_EXISTS; extern const int TABLE_METADATA_ALREADY_EXISTS;
extern const int CANNOT_SELECT;
} }
namespace namespace
@ -326,6 +327,11 @@ Pipe StorageFileLog::read(
table_id.getTableName()); table_id.getTableName());
} }
if (running_streams.load(std::memory_order_relaxed))
{
throw Exception("Another select query is running on this table, need to wait it finish.", ErrorCodes::CANNOT_SELECT);
}
/// We need this lock, in case read and streamToViews execute at the same time. /// We need this lock, in case read and streamToViews execute at the same time.
/// In case of MV attached during reading /// In case of MV attached during reading
std::lock_guard<std::mutex> lock(status_mutex); std::lock_guard<std::mutex> lock(status_mutex);
@ -364,6 +370,16 @@ Pipe StorageFileLog::read(
return Pipe::unitePipes(std::move(pipes)); return Pipe::unitePipes(std::move(pipes));
} }
void StorageFileLog::increaseStreams()
{
running_streams.fetch_add(1, std::memory_order_relaxed);
}
void StorageFileLog::reduceStreams()
{
running_streams.fetch_sub(1, std::memory_order_relaxed);
}
void StorageFileLog::drop() void StorageFileLog::drop()
{ {
try try

View File

@ -122,6 +122,9 @@ public:
throw Exception(ErrorCodes::LOGICAL_ERROR, "The key {} doesn't exist.", key); throw Exception(ErrorCodes::LOGICAL_ERROR, "The key {} doesn't exist.", key);
} }
void increaseStreams();
void reduceStreams();
protected: protected:
StorageFileLog( StorageFileLog(
const StorageID & table_id_, const StorageID & table_id_,
@ -169,6 +172,12 @@ private:
}; };
std::shared_ptr<TaskContext> task; std::shared_ptr<TaskContext> task;
/// In order to avoid data race, using a naive trick to forbid execute two select
/// simultaneously, although read is not useful in this engine. Using an atomic
/// variable to records current unfinishing streams, then if have unfinishing streams,
/// later select should forbid to execute.
std::atomic<int> running_streams = 0;
using TaskThread = BackgroundSchedulePool::TaskHolder; using TaskThread = BackgroundSchedulePool::TaskHolder;
void loadFiles(); void loadFiles();