clickhouse: StorageLog now stores all marks in one file [#CONV-6705].

This commit is contained in:
Michael Kolupaev 2013-02-26 13:06:01 +00:00
parent 58d5f65e60
commit 2a05c2829c
3 changed files with 162 additions and 73 deletions

View File

@ -99,7 +99,8 @@ public:
drop_on_destroy = true;
}
/** Вызывается перед удалением директории с данными.
/** Вызывается перед удалением директории с данными и вызовом деструктора.
* Если не требуется никаких действий, кроме удаления директории с данными, этот метод можно не перегружать.
*/
virtual void dropImpl() {}

View File

@ -81,26 +81,29 @@ private:
struct Stream
{
Stream(const std::string & data_path, const std::string & marks_path) :
Stream(const std::string & data_path) :
plain(data_path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY),
compressed(plain),
marks(marks_path, 4096, O_APPEND | O_CREAT | O_WRONLY)
compressed(plain)
{
plain_offset = Poco::File(data_path).getSize();
}
WriteBufferFromFile plain;
CompressedWriteBuffer compressed;
WriteBufferFromFile marks;
size_t plain_offset; /// Сколько байт было в файле на момент создания LogBlockOutputStream.
};
typedef std::vector<std::pair<size_t, Mark> > MarksForColumns;
typedef std::map<std::string, SharedPtr<Stream> > FileStreams;
FileStreams streams;
WriteBufferFromFile marks_stream; /// Объявлен ниже lock, чтобы файл открывался при захваченном rwlock.
void addStream(const String & name, const IDataType & type, size_t level = 0);
void writeData(const String & name, const IDataType & type, const IColumn & column, size_t level = 0);
void writeData(const String & name, const IDataType & type, const IColumn & column, MarksForColumns & out_marks, size_t level = 0);
void writeMarks(MarksForColumns marks);
};
@ -136,8 +139,6 @@ public:
BlockOutputStreamPtr write(
ASTPtr query);
void dropImpl();
void rename(const String & new_path_to_db, const String & new_name);
protected:
@ -149,9 +150,14 @@ protected:
StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_);
/// Прочитать файлы с засечками, если они ещё не прочитаны.
/// Делается лениво, чтобы при большом количестве таблиц, сервер быстро стартовал.
/// Нельзя вызывать с залоченным на запись rwlock.
void loadMarks();
/// Записать засечки. Используется только для конвертации файлов засечек из старого формата в новый (для обратной совместимости).
void writeAllMarks();
/// Можно вызывать при любом состоянии rwlock.
size_t marksCount();
@ -169,18 +175,22 @@ private:
/// Данные столбца
struct ColumnData
{
/// Задает номер столбца в файле с засечками.
/// Не обязательно совпадает с номером столбца среди столбцов таблицы: здесь нумеруются также столбцы с длинами массивов.
size_t column_index;
Poco::File data_file;
Poco::File marks_file;
Marks marks;
};
typedef std::map<String, ColumnData> Files_t;
Files_t files;
Files_t files; /// name -> data
Names column_names; /// column_index -> name
Poco::File marks_file;
/// Порядок добавления файлов не должен меняться: он соответствует порядку столбцов в файле с засечками.
void addFile(const String & column_name, const IDataType & type, size_t level = 0);
/** Прочитать файлы с засечками, если они ещё не прочитаны.
* Делается лениво, чтобы при большом количестве таблиц, сервер быстро стартовал.
*/
bool loaded_marks;
};

View File

@ -17,7 +17,8 @@
#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
#define DBMS_STORAGE_LOG_MARKS_FILE_EXTENSION ".mrk"
#define DBMS_STORAGE_LOG_MARKS_FILE_EXTENSION ".mrk"
#define DBMS_STORAGE_LOG_MARKS_FILE_NAME "__marks.mrk"
namespace DB
@ -122,7 +123,8 @@ void LogBlockInputStream::readData(const String & name, const IDataType & type,
LogBlockOutputStream::LogBlockOutputStream(StoragePtr owned_storage)
: IBlockOutputStream(owned_storage), storage(dynamic_cast<StorageLog &>(*owned_storage)), lock(storage.rwlock)
: IBlockOutputStream(owned_storage), storage(dynamic_cast<StorageLog &>(*owned_storage)),
lock(storage.rwlock), marks_stream(storage.marks_file.path(), 4096, O_APPEND | O_CREAT | O_WRONLY)
{
for (NamesAndTypesList::const_iterator it = storage.columns->begin(); it != storage.columns->end(); ++it)
addStream(it->first, *it->second);
@ -133,11 +135,14 @@ void LogBlockOutputStream::write(const Block & block)
{
storage.check(block, true);
MarksForColumns marks;
marks.reserve(storage.files.size());
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithNameAndType & column = block.getByPosition(i);
writeData(column.name, *column.type, *column.column);
writeData(column.name, *column.type, *column.column, marks);
}
writeMarks(marks);
}
@ -148,19 +153,17 @@ void LogBlockOutputStream::addStream(const String & name, const IDataType & type
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);
streams.insert(std::make_pair(size_name, new Stream(
storage.files[size_name].data_file.path(),
storage.files[size_name].marks_file.path())));
storage.files[size_name].data_file.path())));
addStream(name, *type_arr->getNestedType(), level + 1);
}
else
streams.insert(std::make_pair(name, new Stream(
storage.files[name].data_file.path(),
storage.files[name].marks_file.path())));
storage.files[name].data_file.path())));
}
void LogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, size_t level)
void LogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, MarksForColumns & out_marks, size_t level)
{
/// Для массивов требуется сначала сериализовать размеры, а потом значения.
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
@ -171,15 +174,12 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
mark.rows = (storage.files[size_name].marks.empty() ? 0 : storage.files[size_name].marks.back().rows) + column.size();
mark.offset = streams[size_name]->plain_offset + streams[size_name]->plain.count();
writeIntBinary(mark.rows, streams[size_name]->marks);
writeIntBinary(mark.offset, streams[size_name]->marks);
storage.files[size_name].marks.push_back(mark);
out_marks.push_back(std::make_pair(storage.files[size_name].column_index, mark));
type_arr->serializeOffsets(column, streams[size_name]->compressed);
streams[size_name]->compressed.next();
writeData(name, *type_arr->getNestedType(), dynamic_cast<const ColumnArray &>(column).getData(), level + 1);
writeData(name, *type_arr->getNestedType(), dynamic_cast<const ColumnArray &>(column).getData(), out_marks, level + 1);
}
else
{
@ -187,16 +187,39 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
mark.rows = (storage.files[name].marks.empty() ? 0 : storage.files[name].marks.back().rows) + column.size();
mark.offset = streams[name]->plain_offset + streams[name]->plain.count();
writeIntBinary(mark.rows, streams[name]->marks);
writeIntBinary(mark.offset, streams[name]->marks);
storage.files[name].marks.push_back(mark);
out_marks.push_back(std::make_pair(storage.files[name].column_index, mark));
type.serializeBinary(column, streams[name]->compressed);
streams[name]->compressed.next();
}
}
static bool ColumnIndexLess(const std::pair<size_t, Mark> & a, const std::pair<size_t, Mark> & b)
{
return a.first < b.first;
}
void LogBlockOutputStream::writeMarks(MarksForColumns marks)
{
if (marks.size() != storage.files.size())
throw Exception("Wrong number of marks generated from block. Makes no sense.", ErrorCodes::LOGICAL_ERROR);
sort(marks.begin(), marks.end(), ColumnIndexLess);
for (size_t i = 0; i < marks.size(); ++i)
{
if (marks[i].first != i)
throw Exception("Invalid marks generated from block. Makes no sense.", ErrorCodes::LOGICAL_ERROR);
Mark mark = marks[i].second;
writeIntBinary(mark.rows, marks_stream);
writeIntBinary(mark.offset, marks_stream);
storage.files[storage.column_names[i]].marks.push_back(mark);
}
}
StorageLog::StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_)
: path(path_), name(name_), columns(columns_), loaded_marks(false)
@ -209,6 +232,8 @@ StorageLog::StorageLog(const std::string & path_, const std::string & name_, Nam
for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it)
addFile(it->first, *it->second);
marks_file = Poco::File(path + escapeForFileName(name) + '/' + DBMS_STORAGE_LOG_MARKS_FILE_NAME);
}
StoragePtr StorageLog::create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_)
@ -227,23 +252,23 @@ void StorageLog::addFile(const String & column_name, const IDataType & type, siz
{
String size_column_suffix = ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);
ColumnData column_data;
files.insert(std::make_pair(column_name + size_column_suffix, column_data));
files[column_name + size_column_suffix].data_file = Poco::File(
ColumnData & column_data = files.insert(std::make_pair(column_name + size_column_suffix, ColumnData())).first->second;
column_data.column_index = column_names.size();
column_data.data_file = Poco::File(
path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + size_column_suffix + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
files[column_name + size_column_suffix].marks_file = Poco::File(
path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + size_column_suffix + DBMS_STORAGE_LOG_MARKS_FILE_EXTENSION);
column_names.push_back(column_name + size_column_suffix);
addFile(column_name, *type_arr->getNestedType(), level + 1);
}
else
{
ColumnData column_data;
files.insert(std::make_pair(column_name, column_data));
files[column_name].data_file = Poco::File(
ColumnData & column_data = files.insert(std::make_pair(column_name, ColumnData())).first->second;
column_data.column_index = column_names.size();
column_data.data_file = Poco::File(
path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
files[column_name].marks_file = Poco::File(
path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + DBMS_STORAGE_LOG_MARKS_FILE_EXTENSION);
column_names.push_back(column_name);
}
}
@ -255,30 +280,90 @@ void StorageLog::loadMarks()
if (loaded_marks)
return;
ssize_t size_of_marks_file = -1;
typedef std::vector<Files_t::iterator> FilesByIndex;
FilesByIndex files_by_index(files.size());
for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
{
/// Считаем засечки
if (it->second.marks_file.exists())
files_by_index[it->second.column_index] = it;
}
if (marks_file.exists())
{
size_t file_size = marks_file.getSize();
if (file_size % (files.size() * sizeof(Mark)) != 0)
throw Exception("Size of marks file is inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT);
int marks_count = file_size / (files.size() * sizeof(Mark));
for (size_t i = 0; i < files_by_index.size(); ++i)
{
ssize_t size_of_current_marks_file = it->second.marks_file.getSize();
files_by_index[i]->second.marks.reserve(marks_count);
}
if (size_of_current_marks_file % sizeof(Mark) != 0)
throw Exception("Sizes of marks files are inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT);
if (-1 == size_of_marks_file)
size_of_marks_file = size_of_current_marks_file;
else if (size_of_marks_file != size_of_current_marks_file)
throw Exception("Sizes of marks files are inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT);
it->second.marks.reserve(it->second.marks_file.getSize() / sizeof(Mark));
ReadBufferFromFile marks_rb(it->second.marks_file.path(), 32768);
while (!marks_rb.eof())
ReadBufferFromFile marks_rb(marks_file.path(), 32768);
while (!marks_rb.eof())
{
for (size_t i = 0; i < files_by_index.size(); ++i)
{
Mark mark;
readIntBinary(mark.rows, marks_rb);
readIntBinary(mark.offset, marks_rb);
it->second.marks.push_back(mark);
files_by_index[i]->second.marks.push_back(mark);
}
}
}
else
{
/// Сконвертируем засечки из старого формата (по файлу для каждого столбца) в новый (один файл для всех столбцов).
ssize_t size_of_marks_file = -1;
for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
{
Poco::File marks_file(path + escapeForFileName(name) + '/' + Poco::Path(it->second.data_file.path()).getBaseName() + DBMS_STORAGE_LOG_MARKS_FILE_EXTENSION);
if (marks_file.exists())
{
ssize_t size_of_current_marks_file = marks_file.getSize();
if (size_of_current_marks_file % sizeof(Mark) != 0)
throw Exception("Sizes of marks files are inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT);
if (-1 == size_of_marks_file)
size_of_marks_file = size_of_current_marks_file;
else if (size_of_marks_file != size_of_current_marks_file)
throw Exception("Sizes of marks files are inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT);
it->second.marks.reserve(marks_file.getSize() / sizeof(Mark));
ReadBufferFromFile marks_rb(marks_file.path(), 32768);
while (!marks_rb.eof())
{
Mark mark;
readIntBinary(mark.rows, marks_rb);
readIntBinary(mark.offset, marks_rb);
it->second.marks.push_back(mark);
}
}
}
if (size_of_marks_file != -1)
{
/// Запишем засечки в новом формате.
WriteBufferFromFile marks_wb(marks_file.path(), 4096, O_APPEND | O_CREAT | O_WRONLY);
size_t marks_count = marksCount();
for (size_t mark_index = 0; mark_index < marks_count; ++mark_index)
{
for (size_t i = 0; i < files_by_index.size(); ++i)
{
Mark mark = files_by_index[i]->second.marks[mark_index];
writeIntBinary(mark.rows, marks_wb);
writeIntBinary(mark.offset, marks_wb);
}
}
/// Удалим файлы с засечками.
for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
{
Poco::File marks_file(path + escapeForFileName(name) + '/' + Poco::Path(it->second.data_file.path()).getBaseName() + DBMS_STORAGE_LOG_MARKS_FILE_EXTENSION);
marks_file.remove();
}
}
}
@ -287,6 +372,12 @@ void StorageLog::loadMarks()
}
void StorageLog::writeAllMarks()
{
}
size_t StorageLog::marksCount()
{
return files.begin()->second.marks.size();
@ -306,8 +397,9 @@ void StorageLog::rename(const String & new_path_to_db, const String & new_name)
for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
{
it->second.data_file = Poco::File(path + escapeForFileName(name) + '/' + Poco::Path(it->second.data_file.path()).getFileName());
it->second.marks_file = Poco::File(path + escapeForFileName(name) + '/' + Poco::Path(it->second.marks_file.path()).getFileName());
}
marks_file = Poco::File(path + escapeForFileName(name) + '/' + DBMS_STORAGE_LOG_MARKS_FILE_NAME);
}
@ -376,18 +468,4 @@ BlockOutputStreamPtr StorageLog::write(
}
void StorageLog::dropImpl()
{
Poco::ScopedWriteRWLock lock(rwlock);
for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
{
if (it->second.data_file.exists())
it->second.data_file.remove();
if (it->second.marks_file.exists())
it->second.marks_file.remove();
}
}
}