mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
dbms: made code more readable [#METR-19266]
This commit is contained in:
parent
cd0541134a
commit
824341c1c2
@ -885,50 +885,36 @@ BlockInputStreams StorageLog::read(
|
||||
0, std::numeric_limits<size_t>::max(),
|
||||
settings.max_read_buffer_size));
|
||||
}
|
||||
else if (has_nullable_columns)
|
||||
{
|
||||
const Marks & marks = getMarksWithRealRowCount();
|
||||
size_t marks_size = marks.size();
|
||||
|
||||
if (to_mark == std::numeric_limits<size_t>::max())
|
||||
to_mark = marks_size;
|
||||
|
||||
if (to_mark > marks_size || to_mark < from_mark)
|
||||
throw Exception("Marks out of range in StorageLog::read", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (threads > to_mark - from_mark)
|
||||
threads = to_mark - from_mark;
|
||||
|
||||
for (size_t thread = 0; thread < threads; ++thread)
|
||||
{
|
||||
size_t mark_number = from_mark + thread * (to_mark - from_mark) / threads;
|
||||
|
||||
size_t cur_total_row_count = ((thread == 0 && from_mark == 0)
|
||||
? 0
|
||||
: marks[from_mark + thread * (to_mark - from_mark) / threads - 1].rows);
|
||||
|
||||
size_t next_total_row_count = marks[from_mark + (thread + 1) * (to_mark - from_mark) / threads - 1].rows;
|
||||
|
||||
size_t rows_limit = next_total_row_count - cur_total_row_count;
|
||||
|
||||
/// We must have the same number of marks and of null marks.
|
||||
size_t null_mark_number = from_null_mark + (mark_number - from_mark);
|
||||
|
||||
res.push_back(std::make_shared<LogBlockInputStream>(
|
||||
max_block_size,
|
||||
column_names,
|
||||
*this,
|
||||
mark_number,
|
||||
null_mark_number,
|
||||
rows_limit,
|
||||
settings.max_read_buffer_size));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
const Marks & marks = getMarksWithRealRowCount();
|
||||
size_t marks_size = marks.size();
|
||||
|
||||
/// Given a thread, return the start of the area from which
|
||||
/// it can read data, i.e. a mark number.
|
||||
auto mark_from_thread = [&](size_t thread)
|
||||
{
|
||||
/// The computation below reflects the fact that marks
|
||||
/// are uniformly distributed among threads.
|
||||
return from_mark + thread * (to_mark - from_mark) / threads;
|
||||
};
|
||||
|
||||
/// Given a thread, get the parameters that specify the area
|
||||
/// from which it can read data, i.e. a mark number and a
|
||||
/// maximum number of rows.
|
||||
auto get_reader_parameters = [&](size_t thread)
|
||||
{
|
||||
size_t mark_number = mark_from_thread(thread);
|
||||
|
||||
size_t cur_total_row_count = ((thread == 0 && from_mark == 0)
|
||||
? 0
|
||||
: marks[mark_number - 1].rows);
|
||||
size_t next_total_row_count = marks[mark_from_thread(thread + 1) - 1].rows;
|
||||
size_t rows_limit = next_total_row_count - cur_total_row_count;
|
||||
|
||||
return std::make_pair(mark_number, rows_limit);
|
||||
};
|
||||
|
||||
if (to_mark == std::numeric_limits<size_t>::max())
|
||||
to_mark = marks_size;
|
||||
|
||||
@ -938,18 +924,43 @@ BlockInputStreams StorageLog::read(
|
||||
if (threads > to_mark - from_mark)
|
||||
threads = to_mark - from_mark;
|
||||
|
||||
for (size_t thread = 0; thread < threads; ++thread)
|
||||
if (has_nullable_columns)
|
||||
{
|
||||
res.push_back(std::make_shared<LogBlockInputStream>(
|
||||
max_block_size,
|
||||
column_names,
|
||||
*this,
|
||||
from_mark + thread * (to_mark - from_mark) / threads,
|
||||
marks[from_mark + (thread + 1) * (to_mark - from_mark) / threads - 1].rows -
|
||||
((thread == 0 && from_mark == 0)
|
||||
? 0
|
||||
: marks[from_mark + thread * (to_mark - from_mark) / threads - 1].rows),
|
||||
settings.max_read_buffer_size));
|
||||
for (size_t thread = 0; thread < threads; ++thread)
|
||||
{
|
||||
size_t mark_number;
|
||||
size_t rows_limit;
|
||||
std::tie(mark_number, rows_limit) = get_reader_parameters(thread);
|
||||
|
||||
/// This works since we have the same number of marks and null marks.
|
||||
size_t null_mark_number = from_null_mark + (mark_number - from_mark);
|
||||
|
||||
res.push_back(std::make_shared<LogBlockInputStream>(
|
||||
max_block_size,
|
||||
column_names,
|
||||
*this,
|
||||
mark_number,
|
||||
null_mark_number,
|
||||
rows_limit,
|
||||
settings.max_read_buffer_size));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (size_t thread = 0; thread < threads; ++thread)
|
||||
{
|
||||
size_t mark_number;
|
||||
size_t rows_limit;
|
||||
std::tie(mark_number, rows_limit) = get_reader_parameters(thread);
|
||||
|
||||
res.push_back(std::make_shared<LogBlockInputStream>(
|
||||
max_block_size,
|
||||
column_names,
|
||||
*this,
|
||||
mark_number,
|
||||
rows_limit,
|
||||
settings.max_read_buffer_size));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user