From 824341c1c2bce853a3d7a3b08a966ae2a12a2e75 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Wed, 24 Aug 2016 13:27:19 +0300 Subject: [PATCH] dbms: made code more readable [#METR-19266] --- dbms/src/Storages/StorageLog.cpp | 111 +++++++++++++++++-------------- 1 file changed, 61 insertions(+), 50 deletions(-) diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index 347254b2f17..2df7e790663 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -885,50 +885,36 @@ BlockInputStreams StorageLog::read( 0, std::numeric_limits::max(), settings.max_read_buffer_size)); } - else if (has_nullable_columns) - { - const Marks & marks = getMarksWithRealRowCount(); - size_t marks_size = marks.size(); - - if (to_mark == std::numeric_limits::max()) - to_mark = marks_size; - - if (to_mark > marks_size || to_mark < from_mark) - throw Exception("Marks out of range in StorageLog::read", ErrorCodes::LOGICAL_ERROR); - - if (threads > to_mark - from_mark) - threads = to_mark - from_mark; - - for (size_t thread = 0; thread < threads; ++thread) - { - size_t mark_number = from_mark + thread * (to_mark - from_mark) / threads; - - size_t cur_total_row_count = ((thread == 0 && from_mark == 0) - ? 0 - : marks[from_mark + thread * (to_mark - from_mark) / threads - 1].rows); - - size_t next_total_row_count = marks[from_mark + (thread + 1) * (to_mark - from_mark) / threads - 1].rows; - - size_t rows_limit = next_total_row_count - cur_total_row_count; - - /// We must have the same number of marks and of null marks. - size_t null_mark_number = from_null_mark + (mark_number - from_mark); - - res.push_back(std::make_shared( - max_block_size, - column_names, - *this, - mark_number, - null_mark_number, - rows_limit, - settings.max_read_buffer_size)); - } - } else { const Marks & marks = getMarksWithRealRowCount(); size_t marks_size = marks.size(); + /// Given a thread, return the start of the area from which + /// it can read data, i.e. a mark number. + auto mark_from_thread = [&](size_t thread) + { + /// The computation below reflects the fact that marks + /// are uniformly distributed among threads. + return from_mark + thread * (to_mark - from_mark) / threads; + }; + + /// Given a thread, get the parameters that specify the area + /// from which it can read data, i.e. a mark number and a + /// maximum number of rows. + auto get_reader_parameters = [&](size_t thread) + { + size_t mark_number = mark_from_thread(thread); + + size_t cur_total_row_count = ((thread == 0 && from_mark == 0) + ? 0 + : marks[mark_number - 1].rows); + size_t next_total_row_count = marks[mark_from_thread(thread + 1) - 1].rows; + size_t rows_limit = next_total_row_count - cur_total_row_count; + + return std::make_pair(mark_number, rows_limit); + }; + if (to_mark == std::numeric_limits::max()) to_mark = marks_size; @@ -938,18 +924,43 @@ BlockInputStreams StorageLog::read( if (threads > to_mark - from_mark) threads = to_mark - from_mark; - for (size_t thread = 0; thread < threads; ++thread) + if (has_nullable_columns) { - res.push_back(std::make_shared( - max_block_size, - column_names, - *this, - from_mark + thread * (to_mark - from_mark) / threads, - marks[from_mark + (thread + 1) * (to_mark - from_mark) / threads - 1].rows - - ((thread == 0 && from_mark == 0) - ? 0 - : marks[from_mark + thread * (to_mark - from_mark) / threads - 1].rows), - settings.max_read_buffer_size)); + for (size_t thread = 0; thread < threads; ++thread) + { + size_t mark_number; + size_t rows_limit; + std::tie(mark_number, rows_limit) = get_reader_parameters(thread); + + /// This works since we have the same number of marks and null marks. + size_t null_mark_number = from_null_mark + (mark_number - from_mark); + + res.push_back(std::make_shared( + max_block_size, + column_names, + *this, + mark_number, + null_mark_number, + rows_limit, + settings.max_read_buffer_size)); + } + } + else + { + for (size_t thread = 0; thread < threads; ++thread) + { + size_t mark_number; + size_t rows_limit; + std::tie(mark_number, rows_limit) = get_reader_parameters(thread); + + res.push_back(std::make_shared( + max_block_size, + column_names, + *this, + mark_number, + rows_limit, + settings.max_read_buffer_size)); + } } }