dbms: added locks to storages Memory and Log [#CONV-2944].

Alexey Milovidov 2012-11-30 04:28:13 +00:00
parent 5aa8c9efc4
commit 7f4348ecdc
5 changed files with 47 additions and 20 deletions

View File

@@ -81,5 +81,6 @@ public:
};
typedef std::vector<Block> Blocks;
typedef std::list<Block> BlocksList;
}

View File

@@ -1,6 +1,7 @@
#pragma once
#include <Poco/File.h>
#include <Poco/RWLock.h>
#include <DB/Core/NamesAndTypes.h>
#include <DB/IO/ReadBufferFromFile.h>
@@ -76,6 +77,7 @@ public:
BlockOutputStreamPtr clone() { return new LogBlockOutputStream(storage); }
private:
StorageLog & storage;
Poco::ScopedWriteRWLock lock;
struct Stream
{
@@ -152,6 +154,8 @@ private:
typedef std::map<String, ColumnData> Files_t;
Files_t files;
Poco::RWLock rwlock;
void addFile(const String & column_name, const IDataType & type, size_t level = 0);
/** Read the mark files if they have not already been read.
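The two members added above (the shared `Poco::RWLock rwlock` in StorageLog and the `Poco::ScopedWriteRWLock lock` in LogBlockOutputStream) follow Poco's scoped-lock idiom: the output stream pins the table-wide write lock for its entire lifetime by constructing the scoped lock from the storage's rwlock in its initializer list, while readers take the read lock only within a scope. A minimal sketch of that pattern, using illustrative `Example*` names that are not part of the commit:

```cpp
#include <Poco/RWLock.h>

/// Illustrative sketch (not from the commit): a writer object holds the
/// table-wide write lock for its entire lifetime by initializing a
/// ScopedWriteRWLock member from the shared RWLock; the lock is released
/// automatically when the writer is destroyed.
struct ExampleStorage
{
    Poco::RWLock rwlock;    /// shared by all readers and writers
};

struct ExampleWriter
{
    ExampleStorage & storage;
    Poco::ScopedWriteRWLock lock;   /// acquired here, released in the destructor

    ExampleWriter(ExampleStorage & storage_)
        : storage(storage_), lock(storage.rwlock)
    {
    }
};
```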

View File

@@ -1,5 +1,7 @@
#pragma once
#include <Poco/Mutex.h>
#include <DB/Core/NamesAndTypes.h>
#include <DB/Storages/IStorage.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
@@ -13,16 +15,16 @@ class StorageMemory;
class MemoryBlockInputStream : public IProfilingBlockInputStream
{
public:
MemoryBlockInputStream(const Names & column_names_, Blocks::iterator begin_, Blocks::iterator end_);
MemoryBlockInputStream(const Names & column_names_, BlocksList::iterator begin_, BlocksList::iterator end_);
String getName() const { return "MemoryBlockInputStream"; }
BlockInputStreamPtr clone() { return new MemoryBlockInputStream(column_names, begin, end); }
protected:
Block readImpl();
private:
Names column_names;
Blocks::iterator begin;
Blocks::iterator end;
Blocks::iterator it;
BlocksList::iterator begin;
BlocksList::iterator end;
BlocksList::iterator it;
};
@@ -72,8 +74,10 @@ private:
String name;
NamesAndTypesListPtr columns;
/// The data itself
Blocks data;
/// The data itself. A list is used so that inserting at the end does not invalidate existing iterators.
BlocksList data;
Poco::FastMutex mutex;
};
}
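The switch from `Blocks` (a `std::vector<Block>`) to `BlocksList` (a `std::list<Block>`) is what makes that comment hold: `std::list::push_back` never invalidates existing iterators, whereas a `std::vector` reallocation invalidates all of them. A small self-contained sketch of the difference (hypothetical example, not part of the commit):

```cpp
#include <cassert>
#include <list>
#include <vector>

int main()
{
    /// std::list: iterators obtained before push_back remain valid afterwards.
    std::list<int> lst;
    lst.push_back(1);
    std::list<int>::iterator lit = lst.begin();
    for (int i = 0; i < 1000; ++i)
        lst.push_back(i);
    assert(*lit == 1);      /// still points at the first element

    /// std::vector: push_back may reallocate and thereby invalidate all iterators.
    std::vector<int> vec;
    vec.push_back(1);
    std::vector<int>::iterator vit = vec.begin();
    for (int i = 0; i < 1000; ++i)
        vec.push_back(i);   /// after a reallocation, dereferencing vit would be undefined behaviour
    (void) vit;

    return 0;
}
```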

View File

@@ -40,8 +40,12 @@ Block LogBlockInputStream::readImpl()
/// If the files are not open, open them.
if (streams.empty())
{
Poco::ScopedReadRWLock lock(storage.rwlock);
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
addStream(*it, *storage.getDataTypeByName(*it));
}
/// How many rows to read for the next block.
size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read);
@@ -117,7 +121,7 @@ void LogBlockInputStream::readData(const String & name, const IDataType & type,
LogBlockOutputStream::LogBlockOutputStream(StorageLog & storage_)
: storage(storage_)
: storage(storage_), lock(storage.rwlock)
{
for (NamesAndTypesList::const_iterator it = storage.columns->begin(); it != storage.columns->end(); ++it)
addStream(it->first, *it->second);
@@ -240,6 +244,8 @@ void StorageLog::addFile(const String & column_name, const IDataType & type, siz
void StorageLog::loadMarks()
{
Poco::ScopedWriteRWLock lock(rwlock);
if (loaded_marks)
return;
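loadMarks() acquires the write lock first and only then checks `loaded_marks`, so concurrent callers serialize on the lock and the initialization runs exactly once. A standalone sketch of this lock-then-check lazy-initialization pattern, assuming only Poco's RWLock API (the names are hypothetical):

```cpp
#include <Poco/RWLock.h>

/// Illustrative sketch: concurrent callers serialize on the write lock;
/// whoever arrives second sees the flag already set and returns without
/// repeating the work.
struct ExampleLazy
{
    Poco::RWLock rwlock;
    bool loaded;

    ExampleLazy() : loaded(false) {}

    void load()
    {
        Poco::ScopedWriteRWLock lock(rwlock);

        if (loaded)
            return;

        /// ... expensive one-time work goes here ...

        loaded = true;
    }
};
```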
@@ -277,6 +283,8 @@ void StorageLog::loadMarks()
void StorageLog::rename(const String & new_path_to_db, const String & new_name)
{
Poco::ScopedWriteRWLock lock(rwlock);
/// Rename the data directory.
Poco::File(path + escapeForFileName(name)).renameTo(new_path_to_db + escapeForFileName(new_name));
@@ -302,7 +310,9 @@ BlockInputStreams StorageLog::read(
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
Marks marks = files.begin()->second.marks;
Poco::ScopedReadRWLock lock(rwlock);
const Marks & marks = files.begin()->second.marks;
size_t marks_size = marks.size();
if (threads > marks_size)
@@ -312,11 +322,6 @@ BlockInputStreams StorageLog::read(
for (size_t thread = 0; thread < threads; ++thread)
{
/* std::cerr << "Thread " << thread << ", mark " << thread * marks_size / max_threads
<< ", rows " << (thread == 0
? marks[marks_size / max_threads - 1].rows
: (marks[(thread + 1) * marks_size / max_threads - 1].rows - marks[thread * marks_size / max_threads - 1].rows)) << std::endl;*/
res.push_back(new LogBlockInputStream(
max_block_size,
column_names,
@@ -341,6 +346,8 @@ BlockOutputStreamPtr StorageLog::write(
void StorageLog::drop()
{
Poco::ScopedWriteRWLock lock(rwlock);
for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
{
if (it->second.data_file.exists())

View File

@@ -12,7 +12,7 @@ namespace DB
using Poco::SharedPtr;
MemoryBlockInputStream::MemoryBlockInputStream(const Names & column_names_, Blocks::iterator begin_, Blocks::iterator end_)
MemoryBlockInputStream::MemoryBlockInputStream(const Names & column_names_, BlocksList::iterator begin_, BlocksList::iterator end_)
: column_names(column_names_), begin(begin_), end(end_), it(begin)
{
}
@@ -36,6 +36,7 @@ MemoryBlockOutputStream::MemoryBlockOutputStream(StorageMemory & storage_)
void MemoryBlockOutputStream::write(const Block & block)
{
storage.check(block);
Poco::ScopedLock<Poco::FastMutex> lock(storage.mutex);
storage.data.push_back(block);
}
@@ -56,16 +57,25 @@ BlockInputStreams StorageMemory::read(
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
if (threads > data.size())
threads = data.size();
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
size_t size = data.size();
if (threads > size)
threads = size;
BlockInputStreams res;
for (size_t thread = 0; thread < threads; ++thread)
res.push_back(new MemoryBlockInputStream(
column_names,
data.begin() + thread * data.size() / threads,
data.begin() + (thread + 1) * data.size() / threads));
{
BlocksList::iterator begin = data.begin();
BlocksList::iterator end = data.begin();
std::advance(begin, thread * size / threads);
std::advance(end, (thread + 1) * size / threads);
res.push_back(new MemoryBlockInputStream(column_names, begin, end));
}
return res;
}
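The new loop carves the block list into `threads` contiguous ranges; since `std::list` has no random-access iterators, the boundaries are found with `std::advance`. A compact standalone illustration of the same splitting arithmetic (hypothetical example, not part of the commit):

```cpp
#include <cstddef>
#include <iostream>
#include <iterator>
#include <list>

int main()
{
    std::list<int> data;
    for (int i = 0; i < 10; ++i)
        data.push_back(i);

    size_t threads = 3;
    size_t size = data.size();

    /// The range for thread t is [t * size / threads, (t + 1) * size / threads):
    /// the ranges are contiguous, cover each element exactly once, and their
    /// lengths differ by at most one.
    for (size_t thread = 0; thread < threads; ++thread)
    {
        std::list<int>::iterator begin = data.begin();
        std::list<int>::iterator end = data.begin();
        std::advance(begin, thread * size / threads);
        std::advance(end, (thread + 1) * size / threads);

        std::cout << "thread " << thread << ":";
        for (std::list<int>::iterator it = begin; it != end; ++it)
            std::cout << ' ' << *it;
        std::cout << '\n';
    }

    return 0;
}
```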
@@ -80,6 +90,7 @@ BlockOutputStreamPtr StorageMemory::write(
void StorageMemory::drop()
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
data.clear();
}
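Both write() and drop() guard every mutation of `data` with the same `Poco::FastMutex`, taken through a scoped lock so it is released on any exit path. A minimal sketch of that guard, assuming only Poco's Mutex/ScopedLock API (illustrative names, not from the commit):

```cpp
#include <list>
#include <Poco/Mutex.h>

/// Illustrative sketch: every mutation of the shared list happens under the
/// same FastMutex, acquired RAII-style so it is released even if an exception
/// is thrown.
struct ExampleGuarded
{
    std::list<int> data;
    Poco::FastMutex mutex;

    void append(int x)
    {
        Poco::ScopedLock<Poco::FastMutex> lock(mutex);
        data.push_back(x);
    }

    void clear()
    {
        Poco::ScopedLock<Poco::FastMutex> lock(mutex);
        data.clear();
    }
};
```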