dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-07-19 20:32:10 +00:00
parent c390fcb123
commit 5aac7088d1
2 changed files with 249 additions and 19 deletions

View File

@ -1,6 +1,7 @@
#pragma once
#include <statdaemons/Increment.h>
#include <Yandex/MultiVersion.h>
#include <DB/Core/SortDescription.h>
#include <DB/Interpreters/Context.h>
@ -35,6 +36,10 @@ namespace DB
* Внутри директории с куском:
* Column.bin - данные столбца
* Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк.
*
* TODO: Может быть, все .mrk писать в один файл, так как их мало.
* А если надо, можно и все .bin писать в один файл
* (но, наверное, это усложнит добавление новых столбцов).
*/
class StorageMergeTree : public IStorage
{
@ -99,7 +104,80 @@ private:
Increment increment;
static String getPartName(Yandex::DayNum_t left_month, Yandex::DayNum_t right_month, UInt64 left_id, UInt64 right_id, UInt64 level);
Logger * log;
/// Описание куска с данными.
struct DataPart
{
Yandex::DayNum_t left_date;
Yandex::DayNum_t right_date;
UInt64 left;
UInt64 right;
UInt32 level;
std::string name;
size_t size; /// в количестве засечек.
time_t modification_time;
Yandex::DayNum_t left_month;
Yandex::DayNum_t right_month;
/// TODO рефкаунт для того, чтобы можно было определить, когда можно удалить кусок.
void remove() const
{
/// TODO
}
bool operator< (const DataPart & rhs) const
{
if (left_month < rhs.left_month)
return true;
if (left_month > rhs.left_month)
return false;
if (right_month < rhs.right_month)
return true;
if (right_month > rhs.right_month)
return false;
if (left < rhs.left)
return true;
if (left > rhs.left)
return false;
if (right < rhs.right)
return true;
if (right > rhs.right)
return false;
if (level < rhs.level)
return true;
return false;
}
/// Содержит другой кусок (получен после объединения другого куска с каким-то ещё)
bool contains(const DataPart & rhs) const
{
return left_month == rhs.left_month /// Куски за разные месяцы не объединяются
&& right_month == rhs.right_month
&& level > rhs.level
&& left_date <= rhs.left_date
&& right_date >= rhs.right_date
&& left <= rhs.left
&& right >= rhs.right
&& (left == rhs.left /// У кусков общее начало или конец.
|| right == rhs.right); /// (только такие образуются после объединения)
}
};
/// Множество кусков с данными. Оно обычно небольшое (десятки элементов).
typedef std::set<DataPart> DataParts;
Yandex::MultiVersion<DataParts> data_parts;
static String getPartName(Yandex::DayNum_t left_date, Yandex::DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level);
/// Загрузить множество кусков с данными с диска.
void loadDataParts();
};
}

View File

@ -1,18 +1,30 @@
#include <boost/bind.hpp>
#include <Poco/DirectoryIterator.h>
#include <Poco/NumberParser.h>
#include <Yandex/time2str.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Interpreters/sortBlock.h>
#include <DB/Storages/StorageMergeTree.h>
#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t))
namespace DB
{
@ -181,6 +193,112 @@ private:
};
/// Для чтения из одного куска. Для чтения сразу из многих, Storage использует сразу много таких объектов.
class MergeTreeBlockInputStream : public IProfilingBlockInputStream
{
public:
MergeTreeBlockInputStream(const String & path_, /// Путь к куску
size_t block_size_, const Names & column_names_, StorageMergeTree & storage_,
size_t mark_number_, size_t rows_limit_)
: path(path_), block_size(block_size_), column_names(column_names_),
storage(storage_), mark_number(mark_number_), rows_limit(rows_limit_), rows_read(0)
{
}
Block readImpl()
{
Block res;
if (rows_read == rows_limit)
return res;
/// Если файлы не открыты, то открываем их.
if (streams.empty())
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
streams.insert(std::make_pair(*it, new Stream(
path + escapeForFileName(*it),
mark_number)));
/// Сколько строк читать для следующего блока.
size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read);
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
{
String column_path = escapeForFileName(*it);
ReadBufferFromFile plain(path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
CompressedReadBuffer compressed(plain);
ColumnWithNameAndType column;
column.name = *it;
column.type = storage.getDataTypeByName(*it);
column.column = column.type->createColumn();
column.type->deserializeBinary(*column.column, streams[column.name]->compressed, max_rows_to_read);
if (column.column->size())
res.insert(column);
}
if (res)
rows_read += res.rows();
if (!res || rows_read == rows_limit)
{
/** Закрываем файлы (ещё до уничтожения объекта).
* Чтобы при создании многих источников, но одновременном чтении только из нескольких,
* буферы не висели в памяти.
*/
streams.clear();
}
return res;
}
String getName() const { return "MergeTreeBlockInputStream"; }
BlockInputStreamPtr clone() { return new MergeTreeBlockInputStream(path, block_size, column_names, storage, mark_number, rows_limit); }
private:
const String path;
size_t block_size;
Names column_names;
StorageMergeTree & storage;
size_t mark_number; /// С какой засечки читать данные
size_t rows_limit; /// Максимальное количество строк, которых можно прочитать
size_t rows_read;
struct Stream
{
Stream(const String & path_prefix, size_t mark_number)
: plain(path_prefix + ".bin", std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path_prefix + ".bin").getSize())),
compressed(plain)
{
if (mark_number)
{
/// Прочитаем из файла с засечками смещение в файле с данными.
ReadBufferFromFile marks(path_prefix + ".mrk", MERGE_TREE_MARK_SIZE);
marks.seek(mark_number * MERGE_TREE_MARK_SIZE);
size_t offset_in_compressed_file = 0;
size_t offset_in_decompressed_block = 0;
readIntBinary(offset_in_compressed_file, marks);
readIntBinary(offset_in_decompressed_block, marks);
plain.seek(offset_in_compressed_file);
compressed.next();
compressed.position() += offset_in_decompressed_block;
}
}
ReadBufferFromFile plain;
CompressedReadBuffer compressed;
};
typedef std::map<std::string, SharedPtr<Stream> > FileStreams;
FileStreams streams;
};
StorageMergeTree::StorageMergeTree(
const String & path_, const String & name_, NamesAndTypesListPtr columns_,
Context & context_,
@ -189,7 +307,7 @@ StorageMergeTree::StorageMergeTree(
: path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), columns(columns_),
context(context_), primary_expr_ast(primary_expr_ast_->clone()),
date_column_name(date_column_name_), index_granularity(index_granularity_),
increment(full_path + "increment.txt")
increment(full_path + "increment.txt"), log(&Logger::get("StorageMergeTree: " + name))
{
/// создаём директорию, если её нет
Poco::File(full_path).createDirectories();
@ -206,6 +324,8 @@ StorageMergeTree::StorageMergeTree(
context.columns = *columns;
primary_expr = new Expression(primary_expr_ast, context);
loadDataParts();
}
@ -215,32 +335,21 @@ BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query)
}
String StorageMergeTree::getPartName(Yandex::DayNum_t left_month, Yandex::DayNum_t right_month, UInt64 left_id, UInt64 right_id, UInt64 level)
String StorageMergeTree::getPartName(Yandex::DayNum_t left_date, Yandex::DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level)
{
Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();
/// Имя директории для куска иммет вид: YYYYMM_YYYYMM_N_N_0.
/// Имя директории для куска иммет вид: YYYYMMDD_YYYYMMDD_N_N_L.
String res;
{
unsigned min_y = date_lut.toYear(left_month);
unsigned max_y = date_lut.toYear(right_month);
unsigned min_m = date_lut.toMonth(left_month);
unsigned max_m = date_lut.toMonth(right_month);
unsigned left_date_id = Yandex::Date2OrderedIdentifier(date_lut.fromDayNum(left_date));
unsigned right_date_id = Yandex::Date2OrderedIdentifier(date_lut.fromDayNum(right_date));
WriteBufferFromString wb(res);
writeIntText(min_y, wb);
if (min_m < 10)
writeChar('0', wb);
writeIntText(min_m, wb);
writeIntText(left_date_id, wb);
writeChar('_', wb);
writeIntText(max_y, wb);
if (max_m < 10)
writeChar('0', wb);
writeIntText(max_m, wb);
writeIntText(right_date_id, wb);
writeChar('_', wb);
writeIntText(left_id, wb);
writeChar('_', wb);
@ -252,4 +361,47 @@ String StorageMergeTree::getPartName(Yandex::DayNum_t left_month, Yandex::DayNum
return res;
}
void StorageMergeTree::loadDataParts()
{
LOG_DEBUG(log, "Loading data parts");
Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();
SharedPtr<DataParts> new_data_parts = new DataParts;
static Poco::RegularExpression file_name_regexp("^(\\d{8})_(\\d{8})_(\\d+)_(\\d+)_(\\d+)");
Poco::DirectoryIterator end;
Poco::RegularExpression::MatchVec matches;
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
{
std::string file_name = it.name();
if (!(file_name_regexp.match(file_name, 0, matches) && 6 == matches.size()))
continue;
DataPart part;
part.left_date = date_lut.toDayNum(Yandex::OrderedIdentifier2Date(file_name.substr(matches[1].offset, matches[1].length)));
part.right_date = date_lut.toDayNum(Yandex::OrderedIdentifier2Date(file_name.substr(matches[2].offset, matches[2].length)));
part.left = Poco::NumberParser::parseUnsigned64(file_name.substr(matches[3].offset, matches[3].length));
part.right = Poco::NumberParser::parseUnsigned64(file_name.substr(matches[4].offset, matches[4].length));
part.level = Poco::NumberParser::parseUnsigned(file_name.substr(matches[5].offset, matches[5].length));
part.name = file_name;
/// Размер - в количестве засечек.
part.size = Poco::File(full_path + file_name + "/" + escapeForFileName(columns->front().first) + ".mrk").getSize()
/ MERGE_TREE_MARK_SIZE;
part.modification_time = it->getLastModified().epochTime();
part.left_month = date_lut.toFirstDayOfMonth(part.left_date);
part.right_month = date_lut.toFirstDayOfMonth(part.right_date);
new_data_parts->insert(part);
}
data_parts.set(new_data_parts);
LOG_DEBUG(log, "Loaded data parts (" << new_data_parts->size() << " items)");
}
}