commit 40026a8e7b
Andrey Mironov, 2015-09-03 16:30:50 +03:00
13 changed files with 1050 additions and 203 deletions

View File

@@ -1870,7 +1870,8 @@ private:
if (const auto value_col = typeid_cast<const ColumnVector<T> *>(value_col_untyped))
{
const auto size = value_col->size();
const auto & mask = createMask<T>(size, block, arguments);
bool is_const;
const auto mask = createConstMask<T>(size, block, arguments, is_const);
const auto & val = value_col->getData();
const auto out_col = new ColumnVector<UInt8>(size);
@@ -1879,25 +1880,46 @@ private:
auto & out = out_col->getData();
for (const auto i : ext::range(0, size))
out[i] = Impl::combine(val[i], mask[i]);
if (is_const)
{
for (const auto i : ext::range(0, size))
out[i] = Impl::combine(val[i], mask);
}
else
{
const auto mask = createMask<T>(size, block, arguments);
for (const auto i : ext::range(0, size))
out[i] = Impl::combine(val[i], mask[i]);
}
return true;
}
else if (const auto value_col = typeid_cast<const ColumnConst<T> *>(value_col_untyped))
{
const auto size = value_col->size();
const auto & mask = createMask<T>(size, block, arguments);
const auto & val = value_col->getData();
bool is_const;
const auto mask = createConstMask<T>(size, block, arguments, is_const);
const auto val = value_col->getData();
const auto out_col = new ColumnVector<UInt8>(size);
ColumnPtr out_col_ptr{out_col};
block.getByPosition(result).column = out_col_ptr;
if (is_const)
{
block.getByPosition(result).column = new ColumnConst<UInt8>{
size, Impl::combine(val, mask)
};
}
else
{
const auto mask = createMask<T>(size, block, arguments);
const auto out_col = new ColumnVector<UInt8>(size);
ColumnPtr out_col_ptr{out_col};
block.getByPosition(result).column = out_col_ptr;
auto & out = out_col->getData();
auto & out = out_col->getData();
for (const auto i : ext::range(0, size))
out[i] = Impl::combine(val, mask[i]);
for (const auto i : ext::range(0, size))
out[i] = Impl::combine(val, mask[i]);
}
return true;
}
@@ -1905,26 +1927,49 @@ private:
return false;
}
template <typename T>
PODArray<T> createMask(const std::size_t size, const Block & block, const ColumnNumbers & arguments)
template <typename ValueType>
ValueType createConstMask(const std::size_t size, const Block & block, const ColumnNumbers & arguments, bool & is_const)
{
PODArray<T> mask(size, T{});
is_const = true;
ValueType mask{};
for (const auto i : ext::range(1, arguments.size()))
addToMask(mask, block.getByPosition(arguments[i]).column.get());
{
const auto pos_col = block.getByPosition(arguments[i]).column.get();
if (pos_col->isConst())
{
const auto pos = static_cast<const ColumnConst<ValueType> *>(pos_col)->getData();
mask = mask | 1 << pos;
}
else
{
is_const = false;
return {};
}
}
return mask;
}
template <typename ValueType>
void addToMask(PODArray<ValueType> & mask, const IColumn * const pos_col)
PODArray<ValueType> createMask(const std::size_t size, const Block & block, const ColumnNumbers & arguments)
{
if (!addToMaskImpl<UInt8>(mask, pos_col) && !addToMaskImpl<UInt16>(mask, pos_col) &&
!addToMaskImpl<UInt32>(mask, pos_col) && !addToMaskImpl<UInt64>(mask, pos_col))
throw Exception{
"Illegal column " + pos_col->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN
};
PODArray<ValueType> mask(size, ValueType{});
for (const auto i : ext::range(1, arguments.size()))
{
const auto pos_col = block.getByPosition(arguments[i]).column.get();
if (!addToMaskImpl<UInt8>(mask, pos_col) && !addToMaskImpl<UInt16>(mask, pos_col) &&
!addToMaskImpl<UInt32>(mask, pos_col) && !addToMaskImpl<UInt64>(mask, pos_col))
throw Exception{
"Illegal column " + pos_col->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN
};
}
return mask;
}
template <typename PosType, typename ValueType>

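The change above splits mask construction into two paths: when every bit-position argument is a constant column, createConstMask folds them into a single scalar mask computed once; otherwise the original per-row createMask path is kept. A minimal standalone sketch of the constant path (the names and the combine step are illustrative, not the actual ClickHouse API):

#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

/// Fold constant bit positions into one scalar mask. Returns nullopt as soon
/// as a non-constant position is seen, forcing the caller onto the per-row path.
template <typename T>
std::optional<T> makeConstMask(const std::vector<std::optional<std::size_t>> & positions)
{
    T mask{};
    for (const auto & pos : positions)
    {
        if (!pos)
            return std::nullopt;
        mask |= T{1} << *pos;
    }
    return mask;
}

/// With a constant mask the hot loop combines every value with the same scalar,
/// e.g. a bitTestAll-style combine: out[i] = ((val[i] & mask) == mask).
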
View File

@@ -59,22 +59,27 @@ struct Memory : boost::noncopyable, Allocator
void resize(size_t new_size)
{
if (new_size < m_capacity)
if (0 == m_capacity)
{
m_size = m_capacity = new_size;
alloc();
}
else if (new_size < m_capacity)
{
m_size = new_size;
return;
}
else
{
new_size = align(new_size);
new_size = align(new_size, alignment);
/// @todo pointer to void can be converted to pointer to any type with static_cast by ISO C++, reinterpret_cast has no advantages
m_data = reinterpret_cast<char *>(Allocator::realloc(m_data, m_capacity, new_size, alignment));
m_capacity = new_size;
m_size = m_capacity;
}
}
private:
size_t align(size_t value) const
static size_t align(const size_t value, const size_t alignment)
{
if (!alignment)
return value;
@@ -82,6 +87,7 @@ private:
return (value + alignment - 1) / alignment * alignment;
}
private:
void alloc()
{
if (!m_capacity)
@@ -93,7 +99,8 @@ private:
ProfileEvents::increment(ProfileEvents::IOBufferAllocs);
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, m_capacity);
size_t new_capacity = align(m_capacity);
size_t new_capacity = align(m_capacity, alignment);
/// @todo pointer to void can be converted to pointer to any type with static_cast by ISO C++, reinterpret_cast has no advantages
m_data = reinterpret_cast<char *>(Allocator::alloc(new_capacity, alignment));
m_capacity = new_capacity;
m_size = m_capacity;
@@ -104,6 +111,7 @@ private:
if (!m_data)
return;
/// @todo pointer to any type can be implicitly converted to pointer to void, no cast required
Allocator::free(reinterpret_cast<void *>(m_data), m_capacity);
m_data = nullptr; /// To avoid a double free if a subsequent call to alloc throws an exception.
}

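The reworked align() rounds a requested size up to the next multiple of the allocator's alignment (an alignment of 0 means no rounding), using the usual (value + alignment - 1) / alignment * alignment idiom. A self-contained check of that arithmetic:

#include <cassert>
#include <cstddef>

static std::size_t alignUp(const std::size_t value, const std::size_t alignment)
{
    if (!alignment)
        return value;
    return (value + alignment - 1) / alignment * alignment;
}

int main()
{
    assert(alignUp(100, 0) == 100);     /// no alignment requested
    assert(alignUp(100, 512) == 512);   /// rounded up to one block
    assert(alignUp(512, 512) == 512);   /// exact multiples are unchanged
    assert(alignUp(513, 512) == 1024);  /// spills into the next block
}
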
View File

@@ -82,10 +82,11 @@ private:
}
public:
CachedCompressedReadBuffer(const std::string & path_, UncompressedCache * cache_, size_t estimated_size_,
size_t aio_threshold_, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(nullptr, 0), path(path_), cache(cache_), buf_size(buf_size_),
estimated_size(estimated_size_), aio_threshold(aio_threshold_), file_pos(0)
CachedCompressedReadBuffer(
const std::string & path_, UncompressedCache * cache_, size_t estimated_size_, size_t aio_threshold_,
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(nullptr, 0), path(path_), cache(cache_), buf_size(buf_size_), estimated_size(estimated_size_),
aio_threshold(aio_threshold_), file_pos(0)
{
}

View File

@@ -42,10 +42,11 @@ private:
}
public:
CompressedReadBufferFromFile(const std::string & path, size_t estimated_size, size_t aio_threshold, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
CompressedReadBufferFromFile(
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: BufferWithOwnMemory<ReadBuffer>(0),
p_file_in(createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size)),
file_in(*p_file_in)
p_file_in(createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size)),
file_in(*p_file_in)
{
compressed_in = &file_in;
}

View File

@@ -119,6 +119,9 @@ struct Settings
* (So that large queries do not wash out the cache.) */ \
M(SettingUInt64, merge_tree_max_rows_to_use_cache, (1024 * 1024)) \
\
/** Distribute reads from MergeTree evenly across threads, ensuring a stable average execution time for each thread within a single read. */ \
M(SettingBool, merge_tree_uniform_read_distribution, false) \
\
/** The minimum length of the expression expr = x1 OR ... expr = xN for the optimization */ \
M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 3) \
\

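The M(...) lines are entries of an X-macro list: each records a setting's type, name, and default value, and the surrounding Settings class expands the same list several times (member declarations, assignment by name, and so on). A minimal sketch of the pattern with an illustrative two-entry list, not the real Settings machinery:

#include <cstdint>

struct SettingUInt64 { uint64_t value; };
struct SettingBool   { bool value; };

#define APPLY_FOR_EXAMPLE_SETTINGS(M) \
    M(SettingUInt64, merge_tree_max_rows_to_use_cache, (1024 * 1024)) \
    M(SettingBool, merge_tree_uniform_read_distribution, false)

struct ExampleSettings
{
    /// One expansion declares the members together with their defaults.
#define DECLARE(TYPE, NAME, DEFAULT) TYPE NAME {DEFAULT};
    APPLY_FOR_EXAMPLE_SETTINGS(DECLARE)
#undef DECLARE
};
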
View File

@@ -0,0 +1,24 @@
#pragma once
#include <cstddef>
namespace DB
{
/** A pair of marks defining a range of rows in a data part. Specifically, the range has the form [begin * index_granularity, end * index_granularity).
*/
struct MarkRange
{
std::size_t begin;
std::size_t end;
MarkRange() = default;
MarkRange(const std::size_t begin, const std::size_t end) : begin{begin}, end{end} {}
};
using MarkRanges = std::vector<MarkRange>;
}
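
Marks address rows in units of index_granularity, so a MarkRange denotes the half-open row range [begin * index_granularity, end * index_granularity). A small sketch of that correspondence (rowsOf is an illustrative helper, not part of the header):

#include <cstddef>

struct RowRange { std::size_t first; std::size_t end; };

/// Translate a MarkRange into the half-open row range it denotes.
RowRange rowsOf(const std::size_t begin, const std::size_t end, const std::size_t index_granularity)
{
    return {begin * index_granularity, end * index_granularity};
}

/// E.g. with the common granularity of 8192 rows per mark, the range
/// {2, 5} covers rows [16384, 40960): three granules of 8192 rows each.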

View File

@@ -2,6 +2,7 @@
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/Storages/MergeTree/MergeTreeReader.h>
#include <DB/Storages/MergeTree/RangesInDataPart.h>
namespace DB
@@ -33,22 +34,6 @@ private:
Logger * log;
struct RangesInDataPart
{
MergeTreeData::DataPartPtr data_part;
size_t part_index_in_query;
MarkRanges ranges;
RangesInDataPart() {}
RangesInDataPart(MergeTreeData::DataPartPtr data_part_, size_t part_index_in_query_)
: data_part(data_part_), part_index_in_query(part_index_in_query_)
{
}
};
typedef std::vector<RangesInDataPart> RangesInDataParts;
BlockInputStreams spreadMarkRangesAmongThreads(
RangesInDataParts parts,
size_t threads,

View File

@@ -0,0 +1,435 @@
#pragma once
#include <DB/Core/NamesAndTypes.h>
#include <DB/Storages/MergeTree/RangesInDataPart.h>
#include <statdaemons/ext/range.hpp>
#include <mutex>
namespace DB
{
struct MergeTreeReadTask
{
MergeTreeData::DataPartPtr data_part;
MarkRanges mark_ranges;
std::size_t part_index_in_query;
const Names & ordered_names;
const NameSet & column_name_set;
const NamesAndTypesList & columns;
const NamesAndTypesList & pre_columns;
const bool remove_prewhere_column;
const bool should_reorder;
MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part, const MarkRanges & ranges, const std::size_t part_index_in_query,
const Names & ordered_names, const NameSet & column_name_set, const NamesAndTypesList & columns,
const NamesAndTypesList & pre_columns, const bool remove_prewhere_column, const bool should_reorder)
: data_part{data_part}, mark_ranges{ranges}, part_index_in_query{part_index_in_query},
ordered_names{ordered_names}, column_name_set{column_name_set}, columns{columns}, pre_columns{pre_columns},
remove_prewhere_column{remove_prewhere_column}, should_reorder{should_reorder}
{}
};
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
class MergeTreeReadPool
{
public:
MergeTreeReadPool(
const std::size_t threads, const std::size_t sum_marks, const std::size_t min_marks_for_concurrent_read,
RangesInDataParts parts, MergeTreeData & data, const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column_name, const bool check_columns, const Names & column_names,
const bool do_not_steal_tasks = false)
: data{data}, column_names{column_names}, do_not_steal_tasks{do_not_steal_tasks}
{
const auto per_part_sum_marks = fillPerPartInfo(parts, prewhere_actions, prewhere_column_name, check_columns);
fillPerThreadInfo(threads, sum_marks, per_part_sum_marks, parts, min_marks_for_concurrent_read);
}
MergeTreeReadPool(const MergeTreeReadPool &) = delete;
MergeTreeReadPool & operator=(const MergeTreeReadPool &) = delete;
MergeTreeReadTaskPtr getTask(const std::size_t min_marks_to_read, const std::size_t thread)
{
const std::lock_guard<std::mutex> lock{mutex};
if (remaining_thread_tasks.empty())
return nullptr;
const auto tasks_remaining_for_this_thread = !threads_tasks[thread].sum_marks_in_parts.empty();
if (!tasks_remaining_for_this_thread && do_not_steal_tasks)
return nullptr;
const auto thread_idx = tasks_remaining_for_this_thread ? thread : *std::begin(remaining_thread_tasks);
auto & thread_tasks = threads_tasks[thread_idx];
auto & thread_task = thread_tasks.parts_and_ranges.back();
const auto part_idx = thread_task.part_idx;
auto & part = parts[part_idx];
auto & marks_in_part = thread_tasks.sum_marks_in_parts.back();
/// Take the whole part if it is small enough
auto need_marks = std::min(marks_in_part, min_marks_to_read);
/// Do not leave too few rows in the part.
if (marks_in_part > need_marks &&
marks_in_part - need_marks < min_marks_to_read)
need_marks = marks_in_part;
MarkRanges ranges_to_get_from_part;
/// Take the whole part if it is small enough.
if (marks_in_part <= need_marks)
{
const auto marks_to_get_from_range = marks_in_part;
/// Restore the original order of the ranges.
std::reverse(thread_task.ranges.begin(), thread_task.ranges.end());
ranges_to_get_from_part = thread_task.ranges;
marks_in_part -= marks_to_get_from_range;
thread_tasks.parts_and_ranges.pop_back();
thread_tasks.sum_marks_in_parts.pop_back();
if (thread_tasks.sum_marks_in_parts.empty())
remaining_thread_tasks.erase(thread_idx);
}
else
{
/// Loop over the ranges of the part.
while (need_marks > 0 && !thread_task.ranges.empty())
{
auto & range = thread_task.ranges.back();
const std::size_t marks_in_range = range.end - range.begin;
const std::size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);
ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
range.begin += marks_to_get_from_range;
if (range.begin == range.end)
{
std::swap(range, thread_task.ranges.back());
thread_task.ranges.pop_back();
}
marks_in_part -= marks_to_get_from_range;
need_marks -= marks_to_get_from_range;
}
}
return std::make_unique<MergeTreeReadTask>(
part.data_part, ranges_to_get_from_part, part.part_index_in_query, column_names,
per_part_column_name_set[part_idx], per_part_columns[part_idx], per_part_pre_columns[part_idx],
per_part_remove_prewhere_column[part_idx], per_part_should_reorder[part_idx]);
}
public:
std::vector<std::size_t> fillPerPartInfo(
RangesInDataParts & parts, const ExpressionActionsPtr & prewhere_actions, const String & prewhere_column_name,
const bool check_columns)
{
std::vector<std::size_t> per_part_sum_marks;
for (const auto i : ext::range(0, parts.size()))
{
auto & part = parts[i];
/// Count the marks for each part.
size_t sum_marks = 0;
/// The ranges are already listed right to left; see the reverse in MergeTreeDataSelectExecutor.
for (const auto & range : part.ranges)
sum_marks += range.end - range.begin;
per_part_sum_marks.push_back(sum_marks);
per_part_columns_lock.push_back(std::make_unique<Poco::ScopedReadRWLock>(
part.data_part->columns_lock));
/// inject column names required for DEFAULT evaluation in current part
auto required_column_names = column_names;
const auto injected_columns = injectRequiredColumns(part.data_part, required_column_names);
auto should_reorder = !injected_columns.empty();
Names required_pre_column_names;
if (prewhere_actions)
{
/// collect columns required for PREWHERE evaluation
required_pre_column_names = prewhere_actions->getRequiredColumns();
/// there must be at least one column required for PREWHERE
if (required_pre_column_names.empty())
required_pre_column_names.push_back(required_column_names[0]);
/// PREWHERE columns may require some additional columns for DEFAULT evaluation
const auto injected_pre_columns = injectRequiredColumns(part.data_part, required_pre_column_names);
if (!injected_pre_columns.empty())
should_reorder = true;
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const NameSet pre_name_set{
std::begin(required_pre_column_names), std::end(required_pre_column_names)
};
/** If the PREWHERE expression is not a table column, there is no need to return its column to the caller
* (the storage is expected to return only table columns). */
per_part_remove_prewhere_column.push_back(0 == pre_name_set.count(prewhere_column_name));
Names post_column_names;
for (const auto & name : required_column_names)
if (!pre_name_set.count(name))
post_column_names.push_back(name);
required_column_names = post_column_names;
}
else
per_part_remove_prewhere_column.push_back(false);
per_part_column_name_set.emplace_back(std::begin(required_column_names), std::end(required_column_names));
if (check_columns)
{
/** Under part->columns_lock, check that all requested columns in the part have the same types as in the table.
* This may not hold during ALTER MODIFY. */
if (!required_pre_column_names.empty())
data.check(part.data_part->columns, required_pre_column_names);
if (!required_column_names.empty())
data.check(part.data_part->columns, required_column_names);
per_part_pre_columns.push_back(data.getColumnsList().addTypes(required_pre_column_names));
per_part_columns.push_back(data.getColumnsList().addTypes(required_column_names));
}
else
{
per_part_pre_columns.push_back(part.data_part->columns.addTypes(required_pre_column_names));
per_part_columns.push_back(part.data_part->columns.addTypes(required_column_names));
}
per_part_should_reorder.push_back(should_reorder);
this->parts.push_back({ part.data_part, part.part_index_in_query });
}
return per_part_sum_marks;
}
void fillPerThreadInfo(
const std::size_t threads, const std::size_t sum_marks, std::vector<std::size_t> per_part_sum_marks,
RangesInDataParts & parts, const std::size_t min_marks_for_concurrent_read)
{
threads_tasks.resize(threads);
const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;
for (std::size_t i = 0; i < threads && !parts.empty(); ++i)
{
auto need_marks = min_marks_per_thread;
while (need_marks > 0 && !parts.empty())
{
const auto part_idx = parts.size() - 1;
RangesInDataPart & part = parts.back();
size_t & marks_in_part = per_part_sum_marks.back();
/// Do not take too few rows from the part.
if (marks_in_part >= min_marks_for_concurrent_read &&
need_marks < min_marks_for_concurrent_read)
need_marks = min_marks_for_concurrent_read;
/// Do not leave too few rows in the part.
if (marks_in_part > need_marks &&
marks_in_part - need_marks < min_marks_for_concurrent_read)
need_marks = marks_in_part;
MarkRanges ranges_to_get_from_part;
size_t marks_in_ranges = need_marks;
/// Take the whole part if it is small enough.
if (marks_in_part <= need_marks)
{
/// Keep the ranges listed right to left for convenience.
ranges_to_get_from_part = part.ranges;
marks_in_ranges = marks_in_part;
need_marks -= marks_in_part;
parts.pop_back();
per_part_sum_marks.pop_back();
}
else
{
/// Loop over the ranges of the part.
while (need_marks > 0)
{
if (part.ranges.empty())
throw Exception("Unexpected end of ranges while spreading marks among threads", ErrorCodes::LOGICAL_ERROR);
MarkRange & range = part.ranges.back();
const size_t marks_in_range = range.end - range.begin;
const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);
ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
range.begin += marks_to_get_from_range;
marks_in_part -= marks_to_get_from_range;
need_marks -= marks_to_get_from_range;
if (range.begin == range.end)
part.ranges.pop_back();
}
/// List the ranges right to left again so that .getTask() can take them via .pop_back().
std::reverse(std::begin(ranges_to_get_from_part), std::end(ranges_to_get_from_part));
}
threads_tasks[i].parts_and_ranges.push_back({ part_idx, ranges_to_get_from_part });
threads_tasks[i].sum_marks_in_parts.push_back(marks_in_ranges);
if (marks_in_ranges != 0)
remaining_thread_tasks.insert(i);
}
}
}
/** If some of the requested columns are missing from the part,
* find out which additional columns may need to be read
* so that the DEFAULT expressions for these columns can be evaluated.
* Adds them to columns. */
NameSet injectRequiredColumns(const MergeTreeData::DataPartPtr & part, Names & columns) const
{
NameSet required_columns{std::begin(columns), std::end(columns)};
NameSet injected_columns;
auto all_column_files_missing = true;
for (size_t i = 0; i < columns.size(); ++i)
{
const auto & column_name = columns[i];
/// column has files and hence does not require evaluation
if (part->hasColumnFiles(column_name))
{
all_column_files_missing = false;
continue;
}
const auto default_it = data.column_defaults.find(column_name);
/// column has no explicit default expression
if (default_it == std::end(data.column_defaults))
continue;
/// collect identifiers required for evaluation
IdentifierNameSet identifiers;
default_it->second.expression->collectIdentifierNames(identifiers);
for (const auto & identifier : identifiers)
{
if (data.hasColumn(identifier))
{
/// ensure each column is added only once
if (required_columns.count(identifier) == 0)
{
columns.emplace_back(identifier);
required_columns.emplace(identifier);
injected_columns.emplace(identifier);
}
}
}
}
if (all_column_files_missing)
{
addMinimumSizeColumn(part, columns);
/// correctly report added column
injected_columns.insert(columns.back());
}
return injected_columns;
}
/** Add the column of minimum size.
* Used when no column is needed, but at least the number of rows must be known.
* Adds it to columns.
*/
void addMinimumSizeColumn(const MergeTreeData::DataPartPtr & part, Names & columns) const
{
const auto get_column_size = [this, &part] (const String & name) {
const auto & files = part->checksums.files;
const auto escaped_name = escapeForFileName(name);
const auto bin_file_name = escaped_name + ".bin";
const auto mrk_file_name = escaped_name + ".mrk";
return files.find(bin_file_name)->second.file_size + files.find(mrk_file_name)->second.file_size;
};
const auto & storage_columns = data.getColumnsList();
const NameAndTypePair * minimum_size_column = nullptr;
auto minimum_size = std::numeric_limits<size_t>::max();
for (const auto & column : storage_columns)
{
if (!part->hasColumnFiles(column.name))
continue;
const auto size = get_column_size(column.name);
if (size < minimum_size)
{
minimum_size = size;
minimum_size_column = &column;
}
}
if (!minimum_size_column)
throw Exception{
"Could not find a column of minimum size in MergeTree",
ErrorCodes::LOGICAL_ERROR
};
columns.push_back(minimum_size_column->name);
}
std::vector<std::unique_ptr<Poco::ScopedReadRWLock>> per_part_columns_lock;
MergeTreeData & data;
Names column_names;
const bool do_not_steal_tasks;
std::vector<NameSet> per_part_column_name_set;
std::vector<NamesAndTypesList> per_part_columns;
std::vector<NamesAndTypesList> per_part_pre_columns;
/// @todo actually all of these values are either true or false for the whole query, thus no vector required
std::vector<bool> per_part_remove_prewhere_column;
std::vector<bool> per_part_should_reorder;
struct part_t
{
MergeTreeData::DataPartPtr data_part;
std::size_t part_index_in_query;
};
std::vector<part_t> parts;
struct thread_task_t
{
struct part_index_and_range_t
{
std::size_t part_idx;
MarkRanges ranges;
};
std::vector<part_index_and_range_t> parts_and_ranges;
std::vector<std::size_t> sum_marks_in_parts;
};
std::vector<thread_task_t> threads_tasks;
std::unordered_set<std::size_t> remaining_thread_tasks;
mutable std::mutex mutex;
};
using MergeTreeReadPoolPtr = std::shared_ptr<MergeTreeReadPool>;
}
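
Each reading thread repeatedly asks the pool for a chunk of at least min_marks_to_read marks; once its own pre-assigned share is exhausted, getTask() hands out ranges that were assigned to other threads (unless do_not_steal_tasks), which is what keeps per-thread run times even. A hedged sketch of the consuming loop (processTask is a hypothetical stand-in for the real consumer, MergeTreeThreadBlockInputStream below):

void processTask(const MergeTreeReadTask & task); /// hypothetical consumer

void readerThread(MergeTreeReadPool & pool, const std::size_t thread_idx, const std::size_t min_marks_to_read)
{
    /// getTask() returns nullptr once no work is left anywhere in the pool.
    while (auto task = pool.getTask(min_marks_to_read, thread_idx))
    {
        /// task->data_part and task->mark_ranges describe what to read;
        /// stolen tasks keep this thread busy after its own share runs out.
        processTask(*task);
    }
}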

View File

@@ -1,6 +1,7 @@
#pragma once
#include <DB/Storages/MarkCache.h>
#include <DB/Storages/MergeTree/MarkRange.h>
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/DataTypes/IDataType.h>
#include <DB/DataTypes/DataTypeNested.h>
@@ -17,19 +18,6 @@
namespace DB
{
/** A pair of marks defining a range of rows in a data part. Specifically, the range has the form [begin * index_granularity, end * index_granularity).
*/
struct MarkRange
{
size_t begin;
size_t end;
MarkRange() {}
MarkRange(size_t begin_, size_t end_) : begin(begin_), end(end_) {}
};
typedef std::vector<MarkRange> MarkRanges;
/** Reads data between a pair of marks from a single part. When reading consecutive ranges, avoids unnecessary seeks.
* When reading nearly consecutive ranges, performs the seeks quickly without discarding the buffer contents.
@@ -44,11 +32,23 @@ public:
UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_,
MergeTreeData & storage_, const MarkRanges & all_mark_ranges,
size_t aio_threshold_, size_t max_read_buffer_size_)
: path(path_), data_part(data_part), part_name(data_part->name), columns(columns_),
uncompressed_cache(uncompressed_cache_), mark_cache(mark_cache_),
storage(storage_), all_mark_ranges(all_mark_ranges),
aio_threshold(aio_threshold_), max_read_buffer_size(max_read_buffer_size_)
: uncompressed_cache(uncompressed_cache_), mark_cache(mark_cache_), storage(storage_),
aio_threshold(aio_threshold_), max_read_buffer_size(max_read_buffer_size_)
{
reconf(path_, data_part, columns_, all_mark_ranges);
}
void reconf(
const String & path, const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns,
const MarkRanges & all_mark_ranges)
{
this->path = path;
this->data_part = data_part;
this->part_name = data_part->name;
this->columns = columns;
this->all_mark_ranges = all_mark_ranges;
this->streams.clear();
try
{
if (!Poco::File(path).exists())
@@ -74,20 +74,14 @@ public:
{
size_t max_rows_to_read = (to_mark - from_mark) * storage.index_granularity;
/** Data files may be missing for some columns.
* This happens with old parts, after new columns were added to the table structure.
*/
auto has_missing_columns = false;
/// Pointers to the offset columns, shared between columns of nested data structures
/// If append, all values are nullptr, and offset_columns is used only to check that the offsets column has already been read.
OffsetColumns offset_columns;
const auto read_column = [&] (const NameAndTypePair & it) {
for (const NameAndTypePair & it : columns)
{
if (streams.end() == streams.find(it.name))
{
has_missing_columns = true;
return;
}
continue;
/// All columns are already in the block. We will append values to the end.
bool append = res.has(it.name);
@@ -120,24 +114,12 @@ public:
if (!append && column.column->size())
res.insert(column);
};
for (const NameAndTypePair & it : columns)
read_column(it);
if (has_missing_columns && !res)
{
addMinimumSizeColumn();
/// minimum size column is necessarily at list's front
read_column(columns.front());
}
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
{
storage.reportBrokenPart(part_name);
}
/// Better diagnostics.
throw Exception(e.message() + "\n(while reading from part " + path + " from mark " + toString(from_mark) + " to "
@@ -151,60 +133,13 @@ public:
}
}
/** Add the column of minimum size.
* Used when no column is needed, but at least the number of rows must be known.
* Adds it to columns.
*/
void addMinimumSizeColumn()
{
const auto get_column_size = [this] (const String & name) {
const auto & files = data_part->checksums.files;
const auto escaped_name = escapeForFileName(name);
const auto bin_file_name = escaped_name + ".bin";
const auto mrk_file_name = escaped_name + ".mrk";
return files.find(bin_file_name)->second.file_size + files.find(mrk_file_name)->second.file_size;
};
const auto & storage_columns = storage.getColumnsList();
const NameAndTypePair * minimum_size_column = nullptr;
auto minimum_size = std::numeric_limits<size_t>::max();
for (const auto & column : storage_columns)
{
if (!data_part->hasColumnFiles(column.name))
continue;
const auto size = get_column_size(column.name);
if (size < minimum_size)
{
minimum_size = size;
minimum_size_column = &column;
}
}
if (!minimum_size_column)
throw Exception{
"could not find a column of minimum size in MergeTree",
ErrorCodes::LOGICAL_ERROR
};
addStream(minimum_size_column->name, *minimum_size_column->type, all_mark_ranges);
columns.emplace(std::begin(columns), *minimum_size_column);
added_minimum_size_column = &columns.front();
}
/** Adds missing columns from ordered_names to the block, filled with default values.
* Missing columns are inserted at the same positions as in ordered_names.
* If at least one column was added, all columns in the block are reordered as in ordered_names.
*/
void fillMissingColumns(Block & res, const Names & ordered_names)
void fillMissingColumns(Block & res, const Names & ordered_names, const bool always_reorder = false)
{
fillMissingColumnsImpl(res, ordered_names, false);
fillMissingColumnsImpl(res, ordered_names, always_reorder);
}
/** The same, but always reorders the columns in the block as in ordered_names
@@ -220,16 +155,14 @@ private:
{
MarkCache::MappedPtr marks;
ReadBuffer * data_buffer;
Poco::SharedPtr<CachedCompressedReadBuffer> cached_buffer;
Poco::SharedPtr<CompressedReadBufferFromFile> non_cached_buffer;
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
std::string path_prefix;
size_t max_mark_range;
/// Used as a hint to reduce the number of reallocations when creating a variable-length column.
double avg_value_size_hint = 0;
Stream(const String & path_prefix_, UncompressedCache * uncompressed_cache, MarkCache * mark_cache, const MarkRanges & all_mark_ranges,
size_t aio_threshold, size_t max_read_buffer_size)
Stream(
const String & path_prefix_, UncompressedCache * uncompressed_cache, MarkCache * mark_cache,
const MarkRanges & all_mark_ranges, size_t aio_threshold, size_t max_read_buffer_size)
: path_prefix(path_prefix_)
{
loadMarks(mark_cache);
@@ -281,15 +214,15 @@ private:
if (uncompressed_cache)
{
cached_buffer = new CachedCompressedReadBuffer(path_prefix + ".bin", uncompressed_cache,
estimated_size, aio_threshold, buffer_size);
data_buffer = &*cached_buffer;
cached_buffer = std::make_unique<CachedCompressedReadBuffer>(
path_prefix + ".bin", uncompressed_cache, estimated_size, aio_threshold, buffer_size);
data_buffer = cached_buffer.get();
}
else
{
non_cached_buffer = new CompressedReadBufferFromFile(path_prefix + ".bin", estimated_size,
aio_threshold, buffer_size);
data_buffer = &*non_cached_buffer;
non_cached_buffer = std::make_unique<CompressedReadBufferFromFile>(
path_prefix + ".bin", estimated_size, aio_threshold, buffer_size);
data_buffer = non_cached_buffer.get();
}
}
@@ -350,20 +283,21 @@ private:
typedef std::map<std::string, std::unique_ptr<Stream> > FileStreams;
/// Used as a hint to reduce the number of reallocations when creating a variable-length column.
std::map<std::string, double> avg_value_size_hints;
String path;
const MergeTreeData::DataPartPtr & data_part;
MergeTreeData::DataPartPtr data_part;
String part_name;
FileStreams streams;
/// Requested columns. Possibly with minimum_size_column added.
/// Requested columns.
NamesAndTypesList columns;
const NameAndTypePair * added_minimum_size_column = nullptr;
UncompressedCache * uncompressed_cache;
MarkCache * mark_cache;
MergeTreeData & storage;
const MarkRanges & all_mark_ranges;
MarkRanges all_mark_ranges;
size_t aio_threshold;
size_t max_read_buffer_size;
@@ -386,14 +320,16 @@ private:
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
if (!streams.count(size_name))
streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(
path + escaped_size_name, uncompressed_cache, mark_cache, all_mark_ranges, aio_threshold, max_read_buffer_size)));
streams.emplace(size_name, std::make_unique<Stream>(
path + escaped_size_name, uncompressed_cache, mark_cache,
all_mark_ranges, aio_threshold, max_read_buffer_size));
addStream(name, *type_arr->getNestedType(), all_mark_ranges, level + 1);
}
else
streams[name].reset(new Stream(
path + escaped_column_name, uncompressed_cache, mark_cache, all_mark_ranges, aio_threshold, max_read_buffer_size));
streams.emplace(name, std::make_unique<Stream>(
path + escaped_column_name, uncompressed_cache, mark_cache,
all_mark_ranges, aio_threshold, max_read_buffer_size));
}
@@ -453,8 +389,9 @@ private:
else
{
Stream & stream = *streams[name];
double & avg_value_size_hint = avg_value_size_hints[name];
stream.seekToMark(from_mark);
type.deserializeBinary(column, *stream.data_buffer, max_rows_to_read, stream.avg_value_size_hint);
type.deserializeBinary(column, *stream.data_buffer, max_rows_to_read, avg_value_size_hint);
/// Computing the hint for the average value size.
size_t column_size = column.size();
@@ -463,10 +400,10 @@
double current_avg_value_size = static_cast<double>(column.byteSize()) / column_size;
/// Heuristic so that, as the data changes, avg_value_size_hint grows quickly but decreases slowly.
if (current_avg_value_size > stream.avg_value_size_hint)
stream.avg_value_size_hint = current_avg_value_size;
else if (current_avg_value_size * 2 < stream.avg_value_size_hint)
stream.avg_value_size_hint = (current_avg_value_size + stream.avg_value_size_hint * 3) / 4;
if (current_avg_value_size > avg_value_size_hint)
avg_value_size_hint = current_avg_value_size;
else if (current_avg_value_size * 2 < avg_value_size_hint)
avg_value_size_hint = (current_avg_value_size + avg_value_size_hint * 3) / 4;
}
}
}
@@ -548,15 +485,6 @@ private:
if (should_evaluate_defaults)
evaluateMissingDefaults(res, columns, storage.column_defaults, storage.context);
/// remove added column to ensure same content among all blocks
if (added_minimum_size_column)
{
res.erase(0);
streams.erase(added_minimum_size_column->name);
columns.erase(std::begin(columns));
added_minimum_size_column = nullptr;
}
/// sort columns to ensure consistent order among all blocks
if (should_sort)
{
@@ -566,12 +494,6 @@
if (res.has(name))
ordered_block.insert(res.getByName(name));
if (res.columns() != ordered_block.columns())
throw Exception{
"Ordered block has different number of columns than original one:\n" +
ordered_block.dumpNames() + "\nvs.\n" + res.dumpNames(),
ErrorCodes::LOGICAL_ERROR};
std::swap(res, ordered_block);
}
}

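The hint update in readData() above grows immediately when values get larger, ignores moderate shrinkage, and decays by a 3:1 weighted average only once the current average falls below half the hint. The rule in isolation, with a few worked values:

/// avg_value_size_hint update rule: grow eagerly, shrink conservatively.
double updateHint(double hint, const double current_avg_value_size)
{
    if (current_avg_value_size > hint)
        hint = current_avg_value_size;                    /// jump up at once
    else if (current_avg_value_size * 2 < hint)
        hint = (current_avg_value_size + hint * 3) / 4;   /// decay slowly
    return hint;                                          /// otherwise keep
}

/// updateHint(10.0, 100.0) == 100.0  -- grows immediately
/// updateHint(100.0, 10.0) == 77.5   -- moves only a quarter of the way down
/// updateHint(100.0, 60.0) == 100.0  -- within the 2x band, unchanged
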
View File

@@ -0,0 +1,342 @@
#pragma once
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/Storages/MergeTree/PKCondition.h>
#include <DB/Storages/MergeTree/MergeTreeReader.h>
#include <DB/Storages/MergeTree/MergeTreeReadPool.h>
namespace DB
{
class MergeTreeThreadBlockInputStream : public IProfilingBlockInputStream
{
std::size_t thread;
public:
MergeTreeThreadBlockInputStream(
const std::size_t thread,
const MergeTreeReadPoolPtr & pool, const std::size_t min_marks_to_read, const std::size_t block_size,
MergeTreeData & storage, const bool use_uncompressed_cache, const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column, const Settings & settings, const Names & virt_column_names)
: thread{thread}, pool{pool}, block_size_marks{block_size / storage.index_granularity},
/// round min_marks_to_read up to nearest multiple of block_size expressed in marks
min_marks_to_read{block_size
? (min_marks_to_read * storage.index_granularity + block_size - 1)
/ block_size * block_size / storage.index_granularity
: min_marks_to_read
},
storage{storage}, use_uncompressed_cache{use_uncompressed_cache}, prewhere_actions{prewhere_actions},
prewhere_column{prewhere_column}, min_bytes_to_use_direct_io{settings.min_bytes_to_use_direct_io},
max_read_buffer_size{settings.max_read_buffer_size}, virt_column_names{virt_column_names},
log{&Logger::get("MergeTreeThreadBlockInputStream")}
{}
String getName() const override { return "MergeTreeThread"; }
String getID() const override
{
std::stringstream res;
/// @todo print some meaningful information
// res << "MergeTreeThread(columns";
//
// for (const auto & column : columns)
// res << ", " << column.name;
//
// if (prewhere_actions)
// res << ", prewhere, " << prewhere_actions->getID();
//
// res << ", marks";
//
// for (size_t i = 0; i < all_mark_ranges.size(); ++i)
// res << ", " << all_mark_ranges[i].begin << ", " << all_mark_ranges[i].end;
//
// res << ")";
return res.str();
}
protected:
/// We will call progressImpl ourselves.
void progress(const Progress & value) override {}
Block readImpl() override
{
Block res;
while (!res)
{
if (!task && !getNewTask())
break;
res = readFromPart();
if (res)
injectVirtualColumns(res);
if (task->mark_ranges.empty())
task = {};
}
return res;
}
private:
bool getNewTask()
{
task = pool->getTask(min_marks_to_read, thread);
if (!task)
{
/** Close the files (before the object is destroyed).
* So that when many sources are created, but only a few are being read from at any moment,
* the buffers do not linger in memory. */
reader = {};
pre_reader = {};
return false;
}
const auto path = storage.getFullPath() + task->data_part->name + '/';
if (!reader)
{
if (use_uncompressed_cache)
owned_uncompressed_cache = storage.context.getUncompressedCache();
owned_mark_cache = storage.context.getMarkCache();
reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(),
storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
if (prewhere_actions)
pre_reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), storage, task->mark_ranges, min_bytes_to_use_direct_io,
max_read_buffer_size);
}
else
{
reader->reconf(path, task->data_part, task->columns, task->mark_ranges);
if (prewhere_actions)
pre_reader->reconf(path, task->data_part, task->pre_columns, task->mark_ranges);
}
return true;
}
Block readFromPart()
{
Block res;
if (prewhere_actions)
{
do
{
/// Read a complete block of the columns needed to evaluate the PREWHERE expression.
size_t space_left = std::max(1LU, block_size_marks);
MarkRanges ranges_to_read;
while (!task->mark_ranges.empty() && space_left)
{
auto & range = task->mark_ranges.back();
size_t marks_to_read = std::min(range.end - range.begin, space_left);
pre_reader->readRange(range.begin, range.begin + marks_to_read, res);
ranges_to_read.emplace_back(range.begin, range.begin + marks_to_read);
space_left -= marks_to_read;
range.begin += marks_to_read;
if (range.begin == range.end)
task->mark_ranges.pop_back();
}
progressImpl({ res.rowsInFirstColumn(), res.bytes() });
pre_reader->fillMissingColumns(res, task->ordered_names, task->should_reorder);
/// Evaluate the PREWHERE expression.
prewhere_actions->execute(res);
ColumnPtr column = res.getByName(prewhere_column).column;
if (task->remove_prewhere_column)
res.erase(prewhere_column);
const auto pre_bytes = res.bytes();
/** If the filter is a constant (e.g. the query says PREWHERE 1),
* either return an empty block or return the block unchanged.
*/
if (const auto column_const = typeid_cast<const ColumnConstUInt8 *>(column.get()))
{
if (!column_const->getData())
{
res.clear();
return res;
}
for (const auto & range : ranges_to_read)
reader->readRange(range.begin, range.end, res);
progressImpl({ 0, res.bytes() - pre_bytes });
}
else if (const auto column_vec = typeid_cast<const ColumnUInt8 *>(column.get()))
{
size_t index_granularity = storage.index_granularity;
const auto & pre_filter = column_vec->getData();
IColumn::Filter post_filter(pre_filter.size());
/// Read the remaining columns in the required ranges and build a separate filter for them.
size_t pre_filter_pos = 0;
size_t post_filter_pos = 0;
for (const auto & range : ranges_to_read)
{
auto begin = range.begin;
auto pre_filter_begin_pos = pre_filter_pos;
for (auto mark = range.begin; mark <= range.end; ++mark)
{
UInt8 nonzero = 0;
if (mark != range.end)
{
const size_t limit = std::min(pre_filter.size(), pre_filter_pos + index_granularity);
for (size_t row = pre_filter_pos; row < limit; ++row)
nonzero |= pre_filter[row];
}
if (!nonzero)
{
if (mark > begin)
{
memcpy(
&post_filter[post_filter_pos],
&pre_filter[pre_filter_begin_pos],
pre_filter_pos - pre_filter_begin_pos);
post_filter_pos += pre_filter_pos - pre_filter_begin_pos;
reader->readRange(begin, mark, res);
}
begin = mark + 1;
pre_filter_begin_pos = std::min(pre_filter_pos + index_granularity, pre_filter.size());
}
if (mark < range.end)
pre_filter_pos = std::min(pre_filter_pos + index_granularity, pre_filter.size());
}
}
if (!post_filter_pos)
{
res.clear();
continue;
}
progressImpl({ 0, res.bytes() - pre_bytes });
post_filter.resize(post_filter_pos);
/// Filter the columns related to PREWHERE using pre_filter,
/// and the remaining columns using post_filter.
size_t rows = 0;
for (const auto i : ext::range(0, res.columns()))
{
auto & col = res.getByPosition(i);
if (col.name == prewhere_column && res.columns() > 1)
continue;
col.column =
col.column->filter(task->column_name_set.count(col.name) ? post_filter : pre_filter);
rows = col.column->size();
}
/// Replace the column holding the PREWHERE condition value with a constant.
if (!task->remove_prewhere_column)
res.getByName(prewhere_column).column = new ColumnConstUInt8{rows, 1};
}
else
throw Exception{
"Illegal type " + column->getName() + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8.",
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER
};
reader->fillMissingColumnsAndReorder(res, task->ordered_names);
}
while (!task->mark_ranges.empty() && !res && !isCancelled());
}
else
{
size_t space_left = std::max(1LU, block_size_marks);
while (!task->mark_ranges.empty() && space_left)
{
auto & range = task->mark_ranges.back();
const size_t marks_to_read = std::min(range.end - range.begin, space_left);
reader->readRange(range.begin, range.begin + marks_to_read, res);
space_left -= marks_to_read;
range.begin += marks_to_read;
if (range.begin == range.end)
task->mark_ranges.pop_back();
}
progressImpl({ res.rowsInFirstColumn(), res.bytes() });
reader->fillMissingColumns(res, task->ordered_names, task->should_reorder);
}
return res;
}
void injectVirtualColumns(Block & block)
{
const auto rows = block.rowsInFirstColumn();
/// add virtual columns
if (!virt_column_names.empty())
{
for (const auto & virt_column_name : virt_column_names)
{
if (virt_column_name == "_part")
{
block.insert(ColumnWithTypeAndName{
ColumnConst<String>{rows, task->data_part->name}.convertToFullColumn(),
new DataTypeString,
virt_column_name
});
}
else if (virt_column_name == "_part_index")
{
block.insert(ColumnWithTypeAndName{
ColumnConst<UInt64>{rows, task->part_index_in_query}.convertToFullColumn(),
new DataTypeUInt64,
virt_column_name
});
}
}
}
}
MergeTreeReadPoolPtr pool;
const std::size_t block_size_marks;
const std::size_t min_marks_to_read;
MergeTreeData & storage;
const bool use_uncompressed_cache;
ExpressionActionsPtr prewhere_actions;
const String prewhere_column;
const std::size_t min_bytes_to_use_direct_io;
const std::size_t max_read_buffer_size;
const Names virt_column_names;
Logger * log;
using MergeTreeReaderPtr = std::unique_ptr<MergeTreeReader>;
UncompressedCachePtr owned_uncompressed_cache;
MarkCachePtr owned_mark_cache;
MergeTreeReadTaskPtr task;
MergeTreeReaderPtr reader;
MergeTreeReaderPtr pre_reader;
};
}
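
The constructor initializer above rounds min_marks_to_read up to the nearest multiple of the preferred block size expressed in marks, so a thread's quantum of work produces whole blocks. The same arithmetic unrolled into a checked sketch:

#include <cassert>
#include <cstddef>

std::size_t roundMinMarks(const std::size_t min_marks_to_read,
    const std::size_t block_size,        /// preferred block size, in rows
    const std::size_t index_granularity) /// rows per mark
{
    if (!block_size)
        return min_marks_to_read;
    const auto min_rows = min_marks_to_read * index_granularity;
    return (min_rows + block_size - 1) / block_size * block_size / index_granularity;
}

int main()
{
    /// With 8192-row marks and 65536-row blocks (8 marks per block),
    /// 20 marks round up to 24 marks, i.e. exactly 3 full blocks.
    assert(roundMinMarks(20, 65536, 8192) == 24);
    assert(roundMinMarks(24, 65536, 8192) == 24);
}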

View File

@@ -0,0 +1,29 @@
#pragma once
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/Storages/MergeTree/MarkRange.h>
namespace DB
{
struct RangesInDataPart
{
MergeTreeData::DataPartPtr data_part;
std::size_t part_index_in_query;
MarkRanges ranges;
RangesInDataPart() = default;
RangesInDataPart(const MergeTreeData::DataPartPtr & data_part, const std::size_t part_index_in_query,
const MarkRanges & ranges = MarkRanges{})
: data_part{data_part}, part_index_in_query{part_index_in_query}, ranges{ranges}
{
}
};
using RangesInDataParts = std::vector<RangesInDataPart>;
}

View File

@@ -11,11 +11,10 @@ namespace DB
/// Note: an additional page is allocated that holds the data which
/// does not fit into the main buffer.
ReadBufferAIO::ReadBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_,
char * existing_memory_)
ReadBufferAIO::ReadBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, char * existing_memory_)
: ReadBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
fill_buffer(BufferWithOwnMemory<ReadBuffer>(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
filename(filename_)
fill_buffer(BufferWithOwnMemory<ReadBuffer>(internalBuffer().size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
filename(filename_)
{
ProfileEvents::increment(ProfileEvents::FileOpen);

View File

@@ -1,6 +1,7 @@
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/Storages/MergeTree/MergeTreeReadPool.h>
#include <DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
#include <DB/DataStreams/FilterBlockInputStream.h>
@@ -346,9 +347,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
const Names & virt_columns,
const Settings & settings)
{
const size_t min_marks_for_concurrent_read =
const std::size_t min_marks_for_concurrent_read =
(settings.merge_tree_min_rows_for_concurrent_read + data.index_granularity - 1) / data.index_granularity;
const size_t max_marks_to_use_cache =
const std::size_t max_marks_to_use_cache =
(settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;
/// Count the marks for each part.
@@ -370,7 +371,27 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
BlockInputStreams res;
if (sum_marks > 0)
if (sum_marks > 0 && settings.merge_tree_uniform_read_distribution == 1)
{
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
threads, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_actions, prewhere_column, true,
column_names);
for (std::size_t i = 0; i < threads; ++i)
res.emplace_back(new MergeTreeThreadBlockInputStream{
i, pool, min_marks_for_concurrent_read, max_block_size, data, use_uncompressed_cache, prewhere_actions,
prewhere_column, settings, virt_columns
});
/// Estimate the total number of rows, for the progress bar.
const std::size_t total_rows = data.index_granularity * sum_marks;
/// Set the approximate number of rows only for the first source
static_cast<IProfilingBlockInputStream &>(*res.front()).setTotalRowsApprox(total_rows);
LOG_TRACE(log, "Reading approx. " << total_rows << " rows");
}
else if (sum_marks > 0)
{
const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;
@@ -468,7 +489,10 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
const Settings & settings,
const Context & context)
{
size_t max_marks_to_use_cache = (settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;
const size_t max_marks_to_use_cache =
(settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;
const size_t min_marks_for_read_task =
(settings.merge_tree_min_rows_for_concurrent_read + data.index_granularity - 1) / data.index_granularity;
size_t sum_marks = 0;
for (size_t i = 0; i < parts.size(); ++i)
@@ -480,26 +504,55 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
BlockInputStreams to_merge;
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
if (settings.merge_tree_uniform_read_distribution == 1)
{
RangesInDataPart & part = parts[part_index];
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
parts.size(), sum_marks, min_marks_for_read_task, parts, data, prewhere_actions, prewhere_column, true,
column_names, true);
BlockInputStreamPtr source_stream = new MergeTreeBlockInputStream(
data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data,
part.data_part, part.ranges, use_uncompressed_cache,
prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size);
for (const String & virt_column : virt_columns)
for (const auto i : ext::range(0, parts.size()))
{
if (virt_column == "_part")
source_stream = new AddingConstColumnBlockInputStream<String>(
source_stream, new DataTypeString, part.data_part->name, "_part");
else if (virt_column == "_part_index")
source_stream = new AddingConstColumnBlockInputStream<UInt64>(
source_stream, new DataTypeUInt64, part.part_index_in_query, "_part_index");
BlockInputStreamPtr source_stream{
new MergeTreeThreadBlockInputStream{
i, pool, min_marks_for_read_task, max_block_size, data, use_uncompressed_cache, prewhere_actions,
prewhere_column, settings, virt_columns
}
};
to_merge.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression()));
}
to_merge.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression()));
/// Estimate the total number of rows, for the progress bar.
const std::size_t total_rows = data.index_granularity * sum_marks;
/// Set the approximate number of rows only for the first source
static_cast<IProfilingBlockInputStream &>(*to_merge.front()).setTotalRowsApprox(total_rows);
LOG_TRACE(log, "Reading approx. " << total_rows << " rows");
}
else
{
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
{
RangesInDataPart & part = parts[part_index];
BlockInputStreamPtr source_stream = new MergeTreeBlockInputStream(
data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data,
part.data_part, part.ranges, use_uncompressed_cache,
prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size);
for (const String & virt_column : virt_columns)
{
if (virt_column == "_part")
source_stream = new AddingConstColumnBlockInputStream<String>(
source_stream, new DataTypeString, part.data_part->name, "_part");
else if (virt_column == "_part_index")
source_stream = new AddingConstColumnBlockInputStream<UInt64>(
source_stream, new DataTypeUInt64, part.part_index_in_query, "_part_index");
}
to_merge.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression()));
}
}
BlockInputStreams res;