This commit is contained in:
Andrey Mironov 2015-09-02 18:48:10 +03:00
parent 70ec50316e
commit 288f46936a
3 changed files with 23 additions and 20 deletions

View File

@@ -121,8 +121,8 @@ struct Settings
\
/** Distribute reads from MergeTree evenly across threads, keeping the average execution time of each thread stable within a single read. */ \
M(SettingBool, merge_tree_uniform_read_distribution, false) \
/** Reuse MergeTreeReaders after a read subtask has been fully read. */ \
M(SettingBool, merge_tree_uniform_read_reuse_readers, false) \
/** Reuse buffers in MergeTreeReader::reconf() after a read subtask has been completed. */ \
M(SettingBool, merge_tree_uniform_read_reuse_buffers, false) \
\
/** Minimum length of an expression chain expr = x1 OR ... OR expr = xN for the optimization to apply. */ \
M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 3) \
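
A note on how a setting declared this way is consumed: each M(SettingBool, name, default) entry declares a member of the Settings struct with that name and default, read like a plain bool at call sites (the constructor hunk further down reads settings.merge_tree_uniform_read_reuse_buffers directly). A minimal sketch, with SettingsSketch standing in for the real Settings struct:

#include <iostream>

// SettingsSketch is a hypothetical stand-in: each M(SettingBool, name, default) line
// above corresponds to a member with that name and default value.
struct SettingsSketch
{
    bool merge_tree_uniform_read_distribution = false;
    bool merge_tree_uniform_read_reuse_buffers = false;
};

int main()
{
    SettingsSketch settings;
    settings.merge_tree_uniform_read_reuse_buffers = true; // e.g. enabled for one query
    std::cout << settings.merge_tree_uniform_read_reuse_buffers << '\n';
}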

View File

@@ -31,9 +31,9 @@ public:
const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns_,
UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_,
MergeTreeData & storage_, const MarkRanges & all_mark_ranges,
size_t aio_threshold_, size_t max_read_buffer_size_)
size_t aio_threshold_, size_t max_read_buffer_size_, bool reuse_buffers = false)
: uncompressed_cache(uncompressed_cache_), mark_cache(mark_cache_), storage(storage_),
aio_threshold(aio_threshold_), max_read_buffer_size(max_read_buffer_size_)
aio_threshold(aio_threshold_), max_read_buffer_size(max_read_buffer_size_), reuse_buffers{reuse_buffers}
{
reconf(path_, data_part, columns_, all_mark_ranges);
}
@@ -42,6 +42,13 @@ public:
const String & path, const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns,
const MarkRanges & all_mark_ranges)
{
/// @todo should avg_value_size_hints be cleared when this->data_part != data_part? presumably yes
if (this->data_part != data_part)
avg_value_size_hints.clear();
if (!reuse_buffers)
buffers.clear();
this->path = path;
this->data_part = data_part;
this->part_name = data_part->name;
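
The reuse semantics introduced in reconf() above, as a standalone sketch (ReaderSketch and its members are illustrative stand-ins, not the real MergeTreeReader API): per-column size hints are dropped only when the data part changes, while the read buffers survive reconfiguration only if reuse_buffers was set.

#include <map>
#include <memory>
#include <string>
#include <vector>

struct ReaderSketch
{
    bool reuse_buffers = false;
    std::string data_part;                                      // stands in for DataPartPtr
    std::vector<std::unique_ptr<std::vector<char>>> buffers;    // stands in for the Memory buffers
    std::map<std::string, double> avg_value_size_hints;

    void reconf(const std::string & new_part)
    {
        if (data_part != new_part)      // per the @todo above, hints are reset when switching parts
            avg_value_size_hints.clear();
        if (!reuse_buffers)             // otherwise keep the allocations for the next subtask
            buffers.clear();
        data_part = new_part;
    }
};
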
@@ -307,6 +314,7 @@ private:
* (ReadBufferAIO passes a pointer to the buffer to a syscall and waits for its completion in the destructor, thus there is
* a chance that the system will write to the memory after it has been freed). */
std::vector<std::unique_ptr<Memory>> buffers;
std::map<std::string, double> avg_value_size_hints;
String path;
MergeTreeData::DataPartPtr data_part;
String part_name;
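
To make the comment above concrete: memory that has been handed to an asynchronous read must stay allocated until the request completes, so instead of freeing a stream's buffer outright it is parked in the buffers vector until the reader itself goes away. A simplified illustration of that ownership pattern (Memory and BufferKeepAlive here are stand-ins, not the real interfaces):

#include <memory>
#include <vector>

using Memory = std::vector<char>;   // stand-in for the real Memory type

struct BufferKeepAlive
{
    // Buffers released by destroyed streams are parked here so an in-flight AIO
    // request can still complete into them safely.
    std::vector<std::unique_ptr<Memory>> buffers;

    void release(std::unique_ptr<Memory> buf)
    {
        // Not freed immediately: the kernel may still write into this memory.
        buffers.push_back(std::move(buf));
    }
};
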
@@ -322,6 +330,7 @@ private:
MarkRanges all_mark_ranges;
size_t aio_threshold;
size_t max_read_buffer_size;
bool reuse_buffers;
void addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges, size_t level = 0)
{
@@ -415,8 +424,9 @@ private:
else
{
Stream & stream = *streams[name];
double & avg_value_size_hint = avg_value_size_hints[name];
stream.seekToMark(from_mark);
type.deserializeBinary(column, *stream.data_buffer, max_rows_to_read, stream.avg_value_size_hint);
type.deserializeBinary(column, *stream.data_buffer, max_rows_to_read, avg_value_size_hint);
/// Compute the hint for the average value size.
size_t column_size = column.size();
@@ -425,10 +435,10 @@
double current_avg_value_size = static_cast<double>(column.byteSize()) / column_size;
/// Heuristic so that, as values change, avg_value_size_hint grows quickly but shrinks slowly.
if (current_avg_value_size > stream.avg_value_size_hint)
stream.avg_value_size_hint = current_avg_value_size;
else if (current_avg_value_size * 2 < stream.avg_value_size_hint)
stream.avg_value_size_hint = (current_avg_value_size + stream.avg_value_size_hint * 3) / 4;
if (current_avg_value_size > avg_value_size_hint)
avg_value_size_hint = current_avg_value_size;
else if (current_avg_value_size * 2 < avg_value_size_hint)
avg_value_size_hint = (current_avg_value_size + avg_value_size_hint * 3) / 4;
}
}
}
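
The update rule itself is unchanged by this commit; it only moves from per-stream state into the per-column avg_value_size_hints map. The hint jumps up immediately when the observed average grows, and decays toward it (weighted 3:1 in favour of the old hint) only once the observed average drops below half of the hint. A self-contained sketch of that rule:

#include <iostream>
#include <map>
#include <string>

// Same update rule as in the diff: grow immediately, shrink slowly.
void updateHint(double & avg_value_size_hint, double current_avg_value_size)
{
    if (current_avg_value_size > avg_value_size_hint)
        avg_value_size_hint = current_avg_value_size;
    else if (current_avg_value_size * 2 < avg_value_size_hint)
        avg_value_size_hint = (current_avg_value_size + avg_value_size_hint * 3) / 4;
}

int main()
{
    std::map<std::string, double> avg_value_size_hints;   // keyed by column name, as in the diff
    double & hint = avg_value_size_hints["some_column"];   // starts at 0.0

    updateHint(hint, 100.0);  // jumps straight to 100
    updateHint(hint, 90.0);   // 90 * 2 >= 100, hint stays at 100
    updateHint(hint, 40.0);   // 40 * 2 < 100, decays to (40 + 100 * 3) / 4 = 85
    std::cout << hint << '\n';
}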

View File

@@ -24,7 +24,7 @@ public:
use_uncompressed_cache{use_uncompressed_cache}, prewhere_actions{prewhere_actions},
prewhere_column{prewhere_column}, min_bytes_to_use_direct_io{settings.min_bytes_to_use_direct_io},
max_read_buffer_size{settings.max_read_buffer_size},
reuse_readers(settings.merge_tree_uniform_read_reuse_readers), virt_column_names{virt_column_names},
reuse_buffers(settings.merge_tree_uniform_read_reuse_buffers), virt_column_names{virt_column_names},
log{&Logger::get("MergeTreeThreadBlockInputStream")}
{}
@@ -69,14 +69,7 @@ protected:
injectVirtualColumns(res);
if (task->mark_ranges.empty())
{
if (!reuse_readers)
{
reader = {};
pre_reader = {};
}
task = {};
}
}
return res;
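
With the merge_tree_uniform_read_reuse_readers setting removed above, reader and pre_reader are no longer reset once a task's mark ranges are exhausted; they are kept around and, presumably, reconfigured for the next task in the else branch of the hunk below. A rough sketch of that keep-and-reconfigure pattern, with hypothetical names:

#include <memory>
#include <string>

struct ReaderStub
{
    explicit ReaderStub(const std::string & part) { /* open streams for this part */ }
    void reconf(const std::string & part) { /* re-point existing streams, optionally keeping buffers */ }
};

struct ThreadStreamStub
{
    std::unique_ptr<ReaderStub> reader;

    void onNewTask(const std::string & part)
    {
        if (!reader)
            reader = std::make_unique<ReaderStub>(part);   // first task: construct (the real constructor calls reconf)
        else
            reader->reconf(part);                          // later tasks: reuse the same reader
    }
};
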
@@ -108,13 +101,13 @@ private:
reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(),
storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size, reuse_buffers);
if (prewhere_actions)
pre_reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), storage, task->mark_ranges, min_bytes_to_use_direct_io,
max_read_buffer_size);
max_read_buffer_size, reuse_buffers);
}
else
{
@@ -325,7 +318,7 @@ private:
const String prewhere_column;
const std::size_t min_bytes_to_use_direct_io;
const std::size_t max_read_buffer_size;
const bool reuse_readers;
const bool reuse_buffers;
const Names virt_column_names;
Logger * log;