From 5aac7088d176fa12e41e467d8bdc03daa8807c5c Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 19 Jul 2012 20:32:10 +0000 Subject: [PATCH] dbms: development [#CONV-2944]. --- dbms/include/DB/Storages/StorageMergeTree.h | 80 ++++++++- dbms/src/Storages/StorageMergeTree.cpp | 188 ++++++++++++++++++-- 2 files changed, 249 insertions(+), 19 deletions(-) diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h index 06c7761ca03..bd9315e6165 100644 --- a/dbms/include/DB/Storages/StorageMergeTree.h +++ b/dbms/include/DB/Storages/StorageMergeTree.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -35,6 +36,10 @@ namespace DB * Внутри директории с куском: * Column.bin - данные столбца * Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк. + * + * TODO: Может быть, все .mrk писать в один файл, так как их мало. + * А если надо, можно и все .bin писать в один файл + * (но, наверное, это усложнит добавление новых столбцов). */ class StorageMergeTree : public IStorage { @@ -99,7 +104,80 @@ private: Increment increment; - static String getPartName(Yandex::DayNum_t left_month, Yandex::DayNum_t right_month, UInt64 left_id, UInt64 right_id, UInt64 level); + Logger * log; + + /// Описание куска с данными. + struct DataPart + { + Yandex::DayNum_t left_date; + Yandex::DayNum_t right_date; + UInt64 left; + UInt64 right; + UInt32 level; + + std::string name; + size_t size; /// в количестве засечек. + time_t modification_time; + + Yandex::DayNum_t left_month; + Yandex::DayNum_t right_month; + + /// TODO рефкаунт для того, чтобы можно было определить, когда можно удалить кусок. + + void remove() const + { + /// TODO + } + + bool operator< (const DataPart & rhs) const + { + if (left_month < rhs.left_month) + return true; + if (left_month > rhs.left_month) + return false; + if (right_month < rhs.right_month) + return true; + if (right_month > rhs.right_month) + return false; + + if (left < rhs.left) + return true; + if (left > rhs.left) + return false; + if (right < rhs.right) + return true; + if (right > rhs.right) + return false; + + if (level < rhs.level) + return true; + + return false; + } + + /// Содержит другой кусок (получен после объединения другого куска с каким-то ещё) + bool contains(const DataPart & rhs) const + { + return left_month == rhs.left_month /// Куски за разные месяцы не объединяются + && right_month == rhs.right_month + && level > rhs.level + && left_date <= rhs.left_date + && right_date >= rhs.right_date + && left <= rhs.left + && right >= rhs.right + && (left == rhs.left /// У кусков общее начало или конец. + || right == rhs.right); /// (только такие образуются после объединения) + } + }; + + /// Множество кусков с данными. Оно обычно небольшое (десятки элементов). + typedef std::set DataParts; + Yandex::MultiVersion data_parts; + + static String getPartName(Yandex::DayNum_t left_date, Yandex::DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level); + + /// Загрузить множество кусков с данными с диска. + void loadDataParts(); }; } diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 4c48bb2a298..a8924822fbd 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -1,18 +1,30 @@ #include +#include +#include + +#include + #include #include #include #include +#include +#include #include +#include + #include #include +#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t)) + + namespace DB { @@ -181,6 +193,112 @@ private: }; +/// Для чтения из одного куска. Для чтения сразу из многих, Storage использует сразу много таких объектов. +class MergeTreeBlockInputStream : public IProfilingBlockInputStream +{ +public: + MergeTreeBlockInputStream(const String & path_, /// Путь к куску + size_t block_size_, const Names & column_names_, StorageMergeTree & storage_, + size_t mark_number_, size_t rows_limit_) + : path(path_), block_size(block_size_), column_names(column_names_), + storage(storage_), mark_number(mark_number_), rows_limit(rows_limit_), rows_read(0) + { + } + + Block readImpl() + { + Block res; + + if (rows_read == rows_limit) + return res; + + /// Если файлы не открыты, то открываем их. + if (streams.empty()) + for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it) + streams.insert(std::make_pair(*it, new Stream( + path + escapeForFileName(*it), + mark_number))); + + /// Сколько строк читать для следующего блока. + size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read); + + for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it) + { + String column_path = escapeForFileName(*it); + ReadBufferFromFile plain(path, std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize())); + CompressedReadBuffer compressed(plain); + + ColumnWithNameAndType column; + column.name = *it; + column.type = storage.getDataTypeByName(*it); + column.column = column.type->createColumn(); + column.type->deserializeBinary(*column.column, streams[column.name]->compressed, max_rows_to_read); + + if (column.column->size()) + res.insert(column); + } + + if (res) + rows_read += res.rows(); + + if (!res || rows_read == rows_limit) + { + /** Закрываем файлы (ещё до уничтожения объекта). + * Чтобы при создании многих источников, но одновременном чтении только из нескольких, + * буферы не висели в памяти. + */ + streams.clear(); + } + + return res; + } + + String getName() const { return "MergeTreeBlockInputStream"; } + BlockInputStreamPtr clone() { return new MergeTreeBlockInputStream(path, block_size, column_names, storage, mark_number, rows_limit); } + +private: + const String path; + size_t block_size; + Names column_names; + StorageMergeTree & storage; + size_t mark_number; /// С какой засечки читать данные + size_t rows_limit; /// Максимальное количество строк, которых можно прочитать + + size_t rows_read; + + struct Stream + { + Stream(const String & path_prefix, size_t mark_number) + : plain(path_prefix + ".bin", std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path_prefix + ".bin").getSize())), + compressed(plain) + { + if (mark_number) + { + /// Прочитаем из файла с засечками смещение в файле с данными. + ReadBufferFromFile marks(path_prefix + ".mrk", MERGE_TREE_MARK_SIZE); + marks.seek(mark_number * MERGE_TREE_MARK_SIZE); + + size_t offset_in_compressed_file = 0; + size_t offset_in_decompressed_block = 0; + + readIntBinary(offset_in_compressed_file, marks); + readIntBinary(offset_in_decompressed_block, marks); + + plain.seek(offset_in_compressed_file); + compressed.next(); + compressed.position() += offset_in_decompressed_block; + } + } + + ReadBufferFromFile plain; + CompressedReadBuffer compressed; + }; + + typedef std::map > FileStreams; + FileStreams streams; +}; + + StorageMergeTree::StorageMergeTree( const String & path_, const String & name_, NamesAndTypesListPtr columns_, Context & context_, @@ -189,7 +307,7 @@ StorageMergeTree::StorageMergeTree( : path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), columns(columns_), context(context_), primary_expr_ast(primary_expr_ast_->clone()), date_column_name(date_column_name_), index_granularity(index_granularity_), - increment(full_path + "increment.txt") + increment(full_path + "increment.txt"), log(&Logger::get("StorageMergeTree: " + name)) { /// создаём директорию, если её нет Poco::File(full_path).createDirectories(); @@ -206,6 +324,8 @@ StorageMergeTree::StorageMergeTree( context.columns = *columns; primary_expr = new Expression(primary_expr_ast, context); + + loadDataParts(); } @@ -215,32 +335,21 @@ BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query) } -String StorageMergeTree::getPartName(Yandex::DayNum_t left_month, Yandex::DayNum_t right_month, UInt64 left_id, UInt64 right_id, UInt64 level) +String StorageMergeTree::getPartName(Yandex::DayNum_t left_date, Yandex::DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level) { Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance(); - /// Имя директории для куска иммет вид: YYYYMM_YYYYMM_N_N_0. + /// Имя директории для куска иммет вид: YYYYMMDD_YYYYMMDD_N_N_L. String res; { - unsigned min_y = date_lut.toYear(left_month); - unsigned max_y = date_lut.toYear(right_month); - unsigned min_m = date_lut.toMonth(left_month); - unsigned max_m = date_lut.toMonth(right_month); + unsigned left_date_id = Yandex::Date2OrderedIdentifier(date_lut.fromDayNum(left_date)); + unsigned right_date_id = Yandex::Date2OrderedIdentifier(date_lut.fromDayNum(right_date)); WriteBufferFromString wb(res); - writeIntText(min_y, wb); - if (min_m < 10) - writeChar('0', wb); - writeIntText(min_m, wb); - + writeIntText(left_date_id, wb); writeChar('_', wb); - - writeIntText(max_y, wb); - if (max_m < 10) - writeChar('0', wb); - writeIntText(max_m, wb); - + writeIntText(right_date_id, wb); writeChar('_', wb); writeIntText(left_id, wb); writeChar('_', wb); @@ -252,4 +361,47 @@ String StorageMergeTree::getPartName(Yandex::DayNum_t left_month, Yandex::DayNum return res; } + +void StorageMergeTree::loadDataParts() +{ + LOG_DEBUG(log, "Loading data parts"); + + Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance(); + SharedPtr new_data_parts = new DataParts; + + static Poco::RegularExpression file_name_regexp("^(\\d{8})_(\\d{8})_(\\d+)_(\\d+)_(\\d+)"); + Poco::DirectoryIterator end; + Poco::RegularExpression::MatchVec matches; + for (Poco::DirectoryIterator it(full_path); it != end; ++it) + { + std::string file_name = it.name(); + + if (!(file_name_regexp.match(file_name, 0, matches) && 6 == matches.size())) + continue; + + DataPart part; + part.left_date = date_lut.toDayNum(Yandex::OrderedIdentifier2Date(file_name.substr(matches[1].offset, matches[1].length))); + part.right_date = date_lut.toDayNum(Yandex::OrderedIdentifier2Date(file_name.substr(matches[2].offset, matches[2].length))); + part.left = Poco::NumberParser::parseUnsigned64(file_name.substr(matches[3].offset, matches[3].length)); + part.right = Poco::NumberParser::parseUnsigned64(file_name.substr(matches[4].offset, matches[4].length)); + part.level = Poco::NumberParser::parseUnsigned(file_name.substr(matches[5].offset, matches[5].length)); + part.name = file_name; + + /// Размер - в количестве засечек. + part.size = Poco::File(full_path + file_name + "/" + escapeForFileName(columns->front().first) + ".mrk").getSize() + / MERGE_TREE_MARK_SIZE; + + part.modification_time = it->getLastModified().epochTime(); + + part.left_month = date_lut.toFirstDayOfMonth(part.left_date); + part.right_month = date_lut.toFirstDayOfMonth(part.right_date); + + new_data_parts->insert(part); + } + + data_parts.set(new_data_parts); + + LOG_DEBUG(log, "Loaded data parts (" << new_data_parts->size() << " items)"); +} + }