From 7f4348ecdc678b487027a9a6f7907f098ded9ad2 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 30 Nov 2012 04:28:13 +0000 Subject: [PATCH] dbms: added locks to storages Memory and Log [#CONV-2944]. --- dbms/include/DB/Core/Block.h | 1 + dbms/include/DB/Storages/StorageLog.h | 4 ++++ dbms/include/DB/Storages/StorageMemory.h | 16 +++++++++------ dbms/src/Storages/StorageLog.cpp | 21 +++++++++++++------- dbms/src/Storages/StorageMemory.cpp | 25 +++++++++++++++++------- 5 files changed, 47 insertions(+), 20 deletions(-) diff --git a/dbms/include/DB/Core/Block.h b/dbms/include/DB/Core/Block.h index 0c48675dc59..a50c85108a0 100644 --- a/dbms/include/DB/Core/Block.h +++ b/dbms/include/DB/Core/Block.h @@ -81,5 +81,6 @@ public: }; typedef std::vector Blocks; +typedef std::list BlocksList; } diff --git a/dbms/include/DB/Storages/StorageLog.h b/dbms/include/DB/Storages/StorageLog.h index 434a62d9d20..8cfebcdf586 100644 --- a/dbms/include/DB/Storages/StorageLog.h +++ b/dbms/include/DB/Storages/StorageLog.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -76,6 +77,7 @@ public: BlockOutputStreamPtr clone() { return new LogBlockOutputStream(storage); } private: StorageLog & storage; + Poco::ScopedWriteRWLock lock; struct Stream { @@ -152,6 +154,8 @@ private: typedef std::map Files_t; Files_t files; + Poco::RWLock rwlock; + void addFile(const String & column_name, const IDataType & type, size_t level = 0); /** Прочитать файлы с засечками, если они ещё не прочитаны. diff --git a/dbms/include/DB/Storages/StorageMemory.h b/dbms/include/DB/Storages/StorageMemory.h index 188bcb52dc7..158cd66766e 100644 --- a/dbms/include/DB/Storages/StorageMemory.h +++ b/dbms/include/DB/Storages/StorageMemory.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -13,16 +15,16 @@ class StorageMemory; class MemoryBlockInputStream : public IProfilingBlockInputStream { public: - MemoryBlockInputStream(const Names & column_names_, Blocks::iterator begin_, Blocks::iterator end_); + MemoryBlockInputStream(const Names & column_names_, BlocksList::iterator begin_, BlocksList::iterator end_); String getName() const { return "MemoryBlockInputStream"; } BlockInputStreamPtr clone() { return new MemoryBlockInputStream(column_names, begin, end); } protected: Block readImpl(); private: Names column_names; - Blocks::iterator begin; - Blocks::iterator end; - Blocks::iterator it; + BlocksList::iterator begin; + BlocksList::iterator end; + BlocksList::iterator it; }; @@ -72,8 +74,10 @@ private: String name; NamesAndTypesListPtr columns; - /// Сами данные - Blocks data; + /// Сами данные. list - чтобы при вставке в конец, существующие итераторы не инвалидировались. + BlocksList data; + + Poco::FastMutex mutex; }; } diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index e807b9b71dc..94efe469721 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -40,8 +40,12 @@ Block LogBlockInputStream::readImpl() /// Если файлы не открыты, то открываем их. if (streams.empty()) + { + Poco::ScopedReadRWLock lock(storage.rwlock); + for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it) addStream(*it, *storage.getDataTypeByName(*it)); + } /// Сколько строк читать для следующего блока. size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read); @@ -117,7 +121,7 @@ void LogBlockInputStream::readData(const String & name, const IDataType & type, LogBlockOutputStream::LogBlockOutputStream(StorageLog & storage_) - : storage(storage_) + : storage(storage_), lock(storage.rwlock) { for (NamesAndTypesList::const_iterator it = storage.columns->begin(); it != storage.columns->end(); ++it) addStream(it->first, *it->second); @@ -240,6 +244,8 @@ void StorageLog::addFile(const String & column_name, const IDataType & type, siz void StorageLog::loadMarks() { + Poco::ScopedWriteRWLock lock(rwlock); + if (loaded_marks) return; @@ -277,6 +283,8 @@ void StorageLog::loadMarks() void StorageLog::rename(const String & new_path_to_db, const String & new_name) { + Poco::ScopedWriteRWLock lock(rwlock); + /// Переименовываем директорию с данными. Poco::File(path + escapeForFileName(name)).renameTo(new_path_to_db + escapeForFileName(new_name)); @@ -302,7 +310,9 @@ BlockInputStreams StorageLog::read( check(column_names); processed_stage = QueryProcessingStage::FetchColumns; - Marks marks = files.begin()->second.marks; + Poco::ScopedReadRWLock lock(rwlock); + + const Marks & marks = files.begin()->second.marks; size_t marks_size = marks.size(); if (threads > marks_size) @@ -312,11 +322,6 @@ BlockInputStreams StorageLog::read( for (size_t thread = 0; thread < threads; ++thread) { -/* std::cerr << "Thread " << thread << ", mark " << thread * marks_size / max_threads - << ", rows " << (thread == 0 - ? marks[marks_size / max_threads - 1].rows - : (marks[(thread + 1) * marks_size / max_threads - 1].rows - marks[thread * marks_size / max_threads - 1].rows)) << std::endl;*/ - res.push_back(new LogBlockInputStream( max_block_size, column_names, @@ -341,6 +346,8 @@ BlockOutputStreamPtr StorageLog::write( void StorageLog::drop() { + Poco::ScopedWriteRWLock lock(rwlock); + for (Files_t::iterator it = files.begin(); it != files.end(); ++it) { if (it->second.data_file.exists()) diff --git a/dbms/src/Storages/StorageMemory.cpp b/dbms/src/Storages/StorageMemory.cpp index d252d508f41..a183c9e09a4 100644 --- a/dbms/src/Storages/StorageMemory.cpp +++ b/dbms/src/Storages/StorageMemory.cpp @@ -12,7 +12,7 @@ namespace DB using Poco::SharedPtr; -MemoryBlockInputStream::MemoryBlockInputStream(const Names & column_names_, Blocks::iterator begin_, Blocks::iterator end_) +MemoryBlockInputStream::MemoryBlockInputStream(const Names & column_names_, BlocksList::iterator begin_, BlocksList::iterator end_) : column_names(column_names_), begin(begin_), end(end_), it(begin) { } @@ -36,6 +36,7 @@ MemoryBlockOutputStream::MemoryBlockOutputStream(StorageMemory & storage_) void MemoryBlockOutputStream::write(const Block & block) { storage.check(block); + Poco::ScopedLock lock(storage.mutex); storage.data.push_back(block); } @@ -56,16 +57,25 @@ BlockInputStreams StorageMemory::read( check(column_names); processed_stage = QueryProcessingStage::FetchColumns; - if (threads > data.size()) - threads = data.size(); + Poco::ScopedLock lock(mutex); + + size_t size = data.size(); + + if (threads > size) + threads = size; BlockInputStreams res; for (size_t thread = 0; thread < threads; ++thread) - res.push_back(new MemoryBlockInputStream( - column_names, - data.begin() + thread * data.size() / threads, - data.begin() + (thread + 1) * data.size() / threads)); + { + BlocksList::iterator begin = data.begin(); + BlocksList::iterator end = data.begin(); + + std::advance(begin, thread * size / threads); + std::advance(end, (thread + 1) * size / threads); + + res.push_back(new MemoryBlockInputStream(column_names, begin, end)); + } return res; } @@ -80,6 +90,7 @@ BlockOutputStreamPtr StorageMemory::write( void StorageMemory::drop() { + Poco::ScopedLock lock(mutex); data.clear(); }