diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h index e14410f3100..d23dff164f9 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h @@ -73,12 +73,15 @@ public: } /// Получает набор диапазонов засечек, вне которых не могут находиться ключи из заданного диапазона. - static MarkRanges markRangesFromPkRange(const String & path, - size_t marks_count, - StorageMergeTree & storage, - PKCondition & key_condition) + static MarkRanges markRangesFromPkRange( + const StorageMergeTree::DataPart::Index & index, + StorageMergeTree & storage, + PKCondition & key_condition) { MarkRanges res; + + size_t key_size = storage.sort_descr.size(); + size_t marks_count = index.size() / key_size; /// Если индекс не используется. if (key_condition.alwaysTrue()) @@ -87,31 +90,11 @@ public: } else { - /// Читаем индекс. - typedef AutoArray Index; - size_t key_size = storage.sort_descr.size(); - Index index(marks_count); - for (size_t i = 0; i < marks_count; ++i) - index[i].resize(key_size); - - { - String index_path = path + "primary.idx"; - ReadBufferFromFile index_file(index_path, std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize())); - - for (size_t i = 0; i < marks_count; ++i) - { - for (size_t j = 0; j < key_size; ++j) - storage.primary_key_sample.getByPosition(j).type->deserializeBinary(index[i][j], index_file); - } - - if (!index_file.eof()) - throw Exception("index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE); - } - - /// В стеке всегда будут находиться непересекающиеся подозрительные отрезки, самый левый наверху (back). - /// На каждом шаге берем левый отрезок и проверяем, подходит ли он. - /// Если подходит, разбиваем его на более мелкие и кладем их в стек. Если нет - выбрасываем его. - /// Если отрезок уже длиной в одну засечку, добавляем его в ответ и выбрасываем. + /** В стеке всегда будут находиться непересекающиеся подозрительные отрезки, самый левый наверху (back). + * На каждом шаге берем левый отрезок и проверяем, подходит ли он. + * Если подходит, разбиваем его на более мелкие и кладем их в стек. Если нет - выбрасываем его. + * Если отрезок уже длиной в одну засечку, добавляем его в ответ и выбрасываем. + */ std::vector ranges_stack; ranges_stack.push_back(MarkRange(0, marks_count)); while (!ranges_stack.empty()) @@ -121,9 +104,9 @@ public: bool may_be_true; if (range.end == marks_count) - may_be_true = key_condition.mayBeTrueAfter(index[range.begin]); + may_be_true = key_condition.mayBeTrueAfter(&index[range.begin * key_size]); else - may_be_true = key_condition.mayBeTrueInRange(index[range.begin], index[range.end]); + may_be_true = key_condition.mayBeTrueInRange(&index[range.begin * key_size], &index[range.end * key_size]); if (!may_be_true) continue; @@ -132,23 +115,19 @@ public: { /// Увидели полезный промежуток между соседними засечками. Либо добавим его к последнему диапазону, либо начнем новый диапазон. if (res.empty() || range.begin - res.back().end > storage.min_marks_for_seek) - { res.push_back(range); - } else - { res.back().end = range.end; - } } else { /// Разбиваем отрезок и кладем результат в стек справа налево. size_t step = (range.end - range.begin - 1) / storage.settings.coarse_index_granularity + 1; size_t end; + for (end = range.end; end > range.begin + step; end -= step) - { ranges_stack.push_back(MarkRange(end - step, end)); - } + ranges_stack.push_back(MarkRange(range.begin, end)); } } diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h index c1b5c5c60c0..e421808334b 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockOutputStream.h @@ -104,10 +104,11 @@ private: size_t rows = block.rows(); size_t columns = block.columns(); UInt64 part_id = storage.increment.get(false); + size_t part_size = (rows + storage.index_granularity - 1) / storage.index_granularity; String part_name = storage.getPartName( DayNum_t(min_date), DayNum_t(max_date), - part_id, part_id, 0); + part_id, part_id, 0); String part_tmp_path = storage.full_path + "tmp_" + part_name + "/"; String part_res_path = storage.full_path + part_name + "/"; @@ -128,6 +129,9 @@ private: LOG_TRACE(storage.log, "Writing index."); /// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки. + StorageMergeTree::DataPart::Index index_vec; + index_vec.reserve(part_size * storage.sort_descr.size()); + { WriteBufferFromFile index(part_tmp_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, flags); @@ -141,8 +145,13 @@ private: : &block.getByPosition(storage.sort_descr[i].column_number)); for (size_t i = 0; i < rows; i += storage.index_granularity) + { for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it) - (*it)->type->serializeBinary((*(*it)->column)[i], index); + { + index_vec.push_back((*(*it)->column)[i]); + (*it)->type->serializeBinary(index_vec.back(), index); + } + } index.next(); } @@ -175,10 +184,11 @@ private: new_data_part->right = part_id; new_data_part->level = 0; new_data_part->name = part_name; - new_data_part->size = (rows + storage.index_granularity - 1) / storage.index_granularity; + new_data_part->size = part_size; new_data_part->modification_time = time(0); new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date); new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date); + new_data_part->index.swap(index_vec); storage.data_parts.insert(new_data_part); storage.all_data_parts.insert(new_data_part); diff --git a/dbms/include/DB/Storages/MergeTree/PKCondition.h b/dbms/include/DB/Storages/MergeTree/PKCondition.h index fc625ad01ff..7a2a6c9ff3e 100644 --- a/dbms/include/DB/Storages/MergeTree/PKCondition.h +++ b/dbms/include/DB/Storages/MergeTree/PKCondition.h @@ -158,11 +158,11 @@ public: /// Выполнимо ли условие в диапазоне ключей. /// left_pk и right_pk должны содержать все поля из sort_descr в соответствующем порядке. - bool mayBeTrueInRange(const Row & left_pk, const Row & right_pk); + bool mayBeTrueInRange(const Field * left_pk, const Field * right_pk); /// Выполнимо ли условие в полубесконечном (не ограниченном справа) диапазоне ключей. /// left_pk должен содержать все поля из sort_descr в соответствующем порядке. - bool mayBeTrueAfter(const Row & left_pk); + bool mayBeTrueAfter(const Field * left_pk); bool alwaysTrue() { @@ -230,7 +230,7 @@ private: typedef std::vector RPN; typedef std::map ColumnIndices; - bool mayBeTrueInRange(const Row & left_pk, const Row & right_pk, bool right_bounded); + bool mayBeTrueInRange(const Field * left_pk, const Field * right_pk, bool right_bounded); void traverseAST(ASTPtr & node, Block & block_with_constants); bool atomFromAST(ASTPtr & node, Block & block_with_constants, RPNElement & out); diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h index bf9adda0c26..17e8c300414 100644 --- a/dbms/include/DB/Storages/StorageMergeTree.h +++ b/dbms/include/DB/Storages/StorageMergeTree.h @@ -260,7 +260,11 @@ private: /// Смотреть и изменять это поле следует под залоченным data_parts_mutex. bool currently_merging; - /// NOTE можно загружать индекс и засечки в оперативку + /// Первичный ключ. Всегда загружается в оперативку. + typedef std::vector Index; + Index index; + + /// NOTE можно загружать засечки тоже в оперативку void remove() const { @@ -318,6 +322,23 @@ private: && left <= rhs.left && right >= rhs.right; } + + /// Загрузить индекс. + void loadIndex() + { + size_t key_size = storage.sort_descr.size(); + index.resize(key_size * size); + + String index_path = storage.full_path + name + "/primary.idx"; + ReadBufferFromFile index_file(index_path, std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize())); + + for (size_t i = 0; i < size; ++i) + for (size_t j = 0; j < key_size; ++j) + storage.primary_key_sample.getByPosition(j).type->deserializeBinary(index[i * key_size + j], index_file); + + if (!index_file.eof()) + throw Exception("index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE); + } }; typedef SharedPtr DataPartPtr; @@ -370,6 +391,7 @@ private: bool use_uncompressed_cache, ExpressionActionsPtr prewhere_actions, const String & prewhere_column); + BlockInputStreams spreadMarkRangesAmongThreadsFinal( RangesInDataParts parts, size_t threads, diff --git a/dbms/src/Storages/MergeTree/PKCondition.cpp b/dbms/src/Storages/MergeTree/PKCondition.cpp index 96f2b13ee8f..812bea11eb2 100644 --- a/dbms/src/Storages/MergeTree/PKCondition.cpp +++ b/dbms/src/Storages/MergeTree/PKCondition.cpp @@ -241,7 +241,7 @@ struct BoolMask } }; -bool PKCondition::mayBeTrueInRange(const Row & left_pk, const Row & right_pk, bool right_bounded) +bool PKCondition::mayBeTrueInRange(const Field * left_pk, const Field * right_pk, bool right_bounded) { /// Найдем диапазоны элементов ключа. std::vector key_ranges(sort_descr.size(), Range()); @@ -311,14 +311,14 @@ bool PKCondition::mayBeTrueInRange(const Row & left_pk, const Row & right_pk, bo return rpn_stack[0].can_be_true; } -bool PKCondition::mayBeTrueInRange(const Row & left_pk, const Row & right_pk) +bool PKCondition::mayBeTrueInRange(const Field * left_pk, const Field * right_pk) { return mayBeTrueInRange(left_pk, right_pk, true); } -bool PKCondition::mayBeTrueAfter(const Row & left_pk) +bool PKCondition::mayBeTrueAfter(const Field * left_pk) { - return mayBeTrueInRange(left_pk, Row(), false); + return mayBeTrueInRange(left_pk, NULL, false); } } diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index d8baa95ad3c..2f16d481f9f 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -166,10 +166,15 @@ BlockInputStreams StorageMergeTree::read( /// Выберем куски, в которых могут быть данные, удовлетворяющие date_condition. { Poco::ScopedLock lock(data_parts_mutex); - + for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it) - if (date_condition.mayBeTrueInRange(Row(1, static_cast((*it)->left_date)),Row(1, static_cast((*it)->right_date)))) + { + Field left = static_cast((*it)->left_date); + Field right = static_cast((*it)->right_date); + + if (date_condition.mayBeTrueInRange(&left, &right)) parts.push_back(*it); + } } /// Семплирование. @@ -196,10 +201,8 @@ BlockInputStreams StorageMergeTree::read( for (size_t i = 0; i < parts.size(); ++i) { DataPartPtr & part = parts[i]; - MarkRanges ranges = MergeTreeBlockInputStream::markRangesFromPkRange(full_path + part->name + '/', - part->size, - *this, - key_condition); + MarkRanges ranges = MergeTreeBlockInputStream::markRangesFromPkRange(part->index, *this, key_condition); + for (size_t j = 0; j < ranges.size(); ++j) total_count += ranges[j].end - ranges[j].begin; } @@ -272,10 +275,8 @@ BlockInputStreams StorageMergeTree::read( { DataPartPtr & part = parts[i]; RangesInDataPart ranges(part); - ranges.ranges = MergeTreeBlockInputStream::markRangesFromPkRange(full_path + part->name + '/', - part->size, - *this, - key_condition); + ranges.ranges = MergeTreeBlockInputStream::markRangesFromPkRange(part->index, *this, key_condition); + if (!ranges.ranges.empty()) { parts_with_ranges.push_back(ranges); @@ -638,6 +639,17 @@ void StorageMergeTree::loadDataParts() part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime(); + try + { + part->loadIndex(); + } + catch (...) + { + /// Не будем вставлять в набор кусок с битым индексом. Пропустим кусок и позволим серверу запуститься. + tryLogCurrentException(__PRETTY_FUNCTION__); + continue; + } + data_parts.insert(part); } @@ -1070,6 +1082,7 @@ void StorageMergeTree::mergeParts(std::vector parts) new_data_part->size = to->marksCount(); new_data_part->modification_time = time(0); + new_data_part->loadIndex(); /// NOTE Только что записанный индекс заново считывается с диска. Можно было бы формировать его сразу при записи. { Poco::ScopedLock lock(data_parts_mutex);