From 89fde21e34e10aec8fc63b19982d3b3fd3bb268d Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 18 Jul 2012 19:16:16 +0000 Subject: [PATCH] dbms: development [#CONV-2944]. --- dbms/include/DB/Storages/StorageMergeTree.h | 2 +- dbms/src/DataTypes/DataTypeFixedString.cpp | 2 +- dbms/src/Storages/StorageMergeTree.cpp | 94 +++++++++++++-------- 3 files changed, 62 insertions(+), 36 deletions(-) diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h index 1b1a513d16a..11b2c91f954 100644 --- a/dbms/include/DB/Storages/StorageMergeTree.h +++ b/dbms/include/DB/Storages/StorageMergeTree.h @@ -31,7 +31,7 @@ namespace DB * Структура файлов: * / increment.txt - файл, содержащий одно число, увеличивающееся на 1 - для генерации идентификаторов кусков. * / min-date _ max-date _ min-id _ max-id _ level / - директория с куском. - * / min-date _ max-date _ min-id _ max-id _ level . idx - индексный файл. + * / min-date _ max-date _ min-id _ max-id _ level / primary.idx - индексный файл. * Внутри директории с куском: * Column.bin - данные столбца * Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк. diff --git a/dbms/src/DataTypes/DataTypeFixedString.cpp b/dbms/src/DataTypes/DataTypeFixedString.cpp index a644da20842..c7efbcbf21f 100644 --- a/dbms/src/DataTypes/DataTypeFixedString.cpp +++ b/dbms/src/DataTypes/DataTypeFixedString.cpp @@ -71,7 +71,7 @@ void DataTypeFixedString::deserializeBinary(IColumn & column, ReadBuffer & istr, if (read_bytes % n != 0) throw Exception("Cannot read all data of type FixedString", ErrorCodes::CANNOT_READ_ALL_DATA); - + data.resize(read_bytes); } diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 69fd6475214..64051a181ed 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -1,4 +1,4 @@ -#pragma once +#include #include @@ -33,7 +33,8 @@ public: size_t columns = block.columns(); /// Достаём столбец с датой. - const ColumnUInt16::Container_t & dates = dynamic_cast(block.getByName(storage.date_column_name)).getData(); + const ColumnUInt16::Container_t & dates = + dynamic_cast(*block.getByName(storage.date_column_name).column).getData(); /// Минимальная и максимальная дата. UInt16 min_date = std::numeric_limits::max(); @@ -76,7 +77,7 @@ public: } /// Для каждого месяца. - for (BlocksByMonth::const_iterator it = blocks_by_month.begin(); it != blocks_by_month.end(); ++it) + for (BlocksByMonth::iterator it = blocks_by_month.begin(); it != blocks_by_month.end(); ++it) writePart(it->second.block, it->second.min_date, it->second.max_date); } @@ -92,33 +93,26 @@ private: UInt16 max_date; BlockWithDateInterval() : min_date(std::numeric_limits::max()), max_date(0) {} - BlockWithDateInterval(Block & block, UInt16 min_date_, UInt16 max_date_) : min_date(min_date_), max_date(max_date_) {} + BlockWithDateInterval(const Block & block_, UInt16 min_date_, UInt16 max_date_) + : block(block_), min_date(min_date_), max_date(max_date_) {} }; - struct Stream - { - Stream(const std::string & data_path, const std::string & marks_path) : - plain(data_path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY), - compressed(plain), - marks(marks_path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY) {} - - WriteBufferFromFile plain; - CompressedWriteBuffer compressed; - WriteBufferFromFile marks; - }; - - typedef std::map > FileStreams; - FileStreams streams; - void writePart(Block & block, UInt16 min_date, UInt16 max_date) { + size_t rows = block.rows(); size_t columns = block.columns(); UInt64 part_id = storage.increment.get(true); String part_tmp_path = storage.full_path + "tmp_" + storage.getPartName( - min_date, max_date, + Yandex::DayNum_t(min_date), Yandex::DayNum_t(max_date), + part_id, part_id, 0) + + "/"; + + String part_res_path = storage.full_path + + storage.getPartName( + Yandex::DayNum_t(min_date), Yandex::DayNum_t(max_date), part_id, part_id, 0) + "/"; @@ -130,27 +124,59 @@ private: /// Сортируем. sortBlock(block, storage.sort_descr); - /// Теперь удаляем лишние (вычисленные только для сортировки) столбцы. - while (block.columns() != columns) - block.erase(columns); - /// Наконец-то можно писать данные на диск. + int flags = O_EXCL | O_CREAT | O_WRONLY; + + /// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки. + { + WriteBufferFromFile index(part_tmp_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, flags); + + typedef std::vector PrimaryColumns; + PrimaryColumns primary_columns; + + for (size_t i = 0, size = storage.sort_descr.size(); i < size; ++i) + primary_columns.push_back( + !storage.sort_descr[i].column_name.empty() + ? &block.getByName(storage.sort_descr[i].column_name) + : &block.getByPosition(storage.sort_descr[i].column_number)); + + for (size_t i = 0; i < rows; i += storage.index_granularity) + for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it) + (*it)->type->serializeBinary((*(*it)->column)[i], index); + } + for (size_t i = 0; i < columns; ++i) { const ColumnWithNameAndType & column = block.getByPosition(i); + String escaped_column_name = escapeForFileName(column.name); - Mark mark; - mark.rows = (storage.files[column.name].marks.empty() ? 0 : storage.files[column.name].marks.back().rows) + column.column->size(); - mark.offset = streams[column.name]->plain.count(); + WriteBufferFromFile plain(part_tmp_path + escaped_column_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags); + WriteBufferFromFile marks(part_tmp_path + escaped_column_name + ".mrk", DBMS_DEFAULT_BUFFER_SIZE, flags); + CompressedWriteBuffer compressed(plain); - writeIntBinary(mark.rows, streams[column.name]->marks); - writeIntBinary(mark.offset, streams[column.name]->marks); - - storage.files[column.name].marks.push_back(mark); - - column.type->serializeBinary(*column.column, streams[column.name]->compressed); - streams[column.name]->compressed.next(); + size_t prev_mark = 0; + column.type->serializeBinary(*column.column, compressed, + boost::bind(&MergeTreeBlockOutputStream::writeCallback, this, + boost::ref(prev_mark), boost::ref(plain), boost::ref(compressed), boost::ref(marks))); } + + /// Переименовываем кусок. + Poco::File(part_tmp_path).renameTo(part_res_path); + } + + /// Вызывается каждые index_granularity строк и пишет в файл с засечками (.mrk). + size_t writeCallback(size_t & prev_mark, + WriteBufferFromFile & plain, + CompressedWriteBuffer & compressed, + WriteBufferFromFile & marks) + { + /// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока) + + writeIntBinary(plain.count(), marks); + writeIntBinary(compressed.offset(), marks); + + prev_mark += storage.index_granularity; + return prev_mark; } };