Andrey Mironov 2015-09-03 15:07:46 +03:00
parent 11857853a4
commit e8111d7e07
6 changed files with 23 additions and 75 deletions


@@ -19,7 +19,6 @@ class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadB
 private:
     const std::string path;
     UncompressedCache * cache;
-    Memory * memory;
     size_t buf_size;
     size_t estimated_size;
     size_t aio_threshold;
@@ -35,26 +34,7 @@ private:
     {
         if (!file_in)
         {
-            if (memory)
-            {
-                auto & memory = *this->memory;
-
-                const auto resize = [&memory] (const std::size_t size) {
-                    const auto growth_factor = 1.6f;    /// close to golden ratio
-                    if (memory.m_capacity == 0)
-                        memory.resize(size);
-                    else if (memory.m_capacity < size)
-                        memory.resize(growth_factor * size);
-                };
-
-                if (aio_threshold == 0 || estimated_size < aio_threshold)
-                    resize(buf_size);
-                else
-                    resize(2 * Memory::align(buf_size + DEFAULT_AIO_FILE_BLOCK_SIZE, DEFAULT_AIO_FILE_BLOCK_SIZE));
-            }
-
-            file_in = createReadBufferFromFileBase(
-                path, estimated_size, aio_threshold, buf_size, -1, memory ? &(*memory)[0] : nullptr);
+            file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size);
             compressed_in = &*file_in;
         }
     }
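
For context, the branch removed above sized a caller-supplied reusable buffer before handing it to createReadBufferFromFileBase. A minimal standalone sketch of that sizing rule, assuming a vector-backed stand-in for the Memory type (ReusableBuffer, ensure, align_up and the block-size parameter are illustrative names, not ClickHouse API):

    #include <cstddef>
    #include <vector>

    /// Stand-in for the reusable Memory object: grow only when the requested size exceeds
    /// the current capacity, and then overshoot to amortize reallocations across reads.
    struct ReusableBuffer
    {
        std::vector<char> data;

        void ensure(std::size_t size)
        {
            const float growth_factor = 1.6f;   /// close to golden ratio, as in the removed code
            if (data.capacity() == 0)
                data.resize(size);
            else if (data.capacity() < size)
                data.resize(static_cast<std::size_t>(growth_factor * size));
        }
    };

    /// Round `value` up to a multiple of `alignment`; mirrors what Memory::align is assumed to do.
    constexpr std::size_t align_up(std::size_t value, std::size_t alignment)
    {
        return (value + alignment - 1) / alignment * alignment;
    }

    /// Capacity the removed code asked for: plain buffered reads need buf_size bytes, while the
    /// O_DIRECT/AIO path got twice the block-aligned size, matching the two regions that
    /// ReadBufferAIO carved out of existing_memory (see the last file below).
    std::size_t required_capacity(std::size_t buf_size, std::size_t estimated_size,
                                  std::size_t aio_threshold, std::size_t block_size)
    {
        if (aio_threshold == 0 || estimated_size < aio_threshold)
            return buf_size;
        return 2 * align_up(buf_size + block_size, block_size);
    }
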
@@ -104,9 +84,9 @@ private:
 public:
     CachedCompressedReadBuffer(
         const std::string & path_, UncompressedCache * cache_, size_t estimated_size_, size_t aio_threshold_,
-        size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE, Memory * memory = nullptr)
-        : ReadBuffer(nullptr, 0), path(path_), cache(cache_), memory{memory}, buf_size(buf_size_),
-          estimated_size(estimated_size_), aio_threshold(aio_threshold_), file_pos(0)
+        size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE)
+        : ReadBuffer(nullptr, 0), path(path_), cache(cache_), buf_size(buf_size_), estimated_size(estimated_size_),
+          aio_threshold(aio_threshold_), file_pos(0)
     {
     }


@@ -43,10 +43,9 @@ private:
 public:
     CompressedReadBufferFromFile(
-        const std::string & path, size_t estimated_size, size_t aio_threshold,
-        size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr)
+        const std::string & path, size_t estimated_size, size_t aio_threshold, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
         : BufferWithOwnMemory<ReadBuffer>(0),
-          p_file_in(createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size, -1, existing_memory)),
+          p_file_in(createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size)),
           file_in(*p_file_in)
     {
         compressed_in = &file_in;


@@ -121,8 +121,6 @@ struct Settings
     \
     /** Distribute reads from MergeTree evenly among threads, ensuring a stable average execution time for each thread within a single read. */ \
     M(SettingBool, merge_tree_uniform_read_distribution, false) \
-    /** Reuse buffers in MergeTreeReader::reconf() after a read subtask has been executed. */ \
-    M(SettingBool, merge_tree_uniform_read_reuse_buffers, false) \
     \
     /** Minimum length of an expression expr = x1 OR ... expr = xN for the optimization to apply */ \
     M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 3) \


@@ -31,9 +31,9 @@ public:
         const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns_,
         UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_,
         MergeTreeData & storage_, const MarkRanges & all_mark_ranges,
-        size_t aio_threshold_, size_t max_read_buffer_size_, bool reuse_buffers = false)
+        size_t aio_threshold_, size_t max_read_buffer_size_)
         : uncompressed_cache(uncompressed_cache_), mark_cache(mark_cache_), storage(storage_),
-          aio_threshold(aio_threshold_), max_read_buffer_size(max_read_buffer_size_), reuse_buffers{reuse_buffers}
+          aio_threshold(aio_threshold_), max_read_buffer_size(max_read_buffer_size_)
     {
         reconf(path_, data_part, columns_, all_mark_ranges);
     }
@@ -42,9 +42,6 @@ public:
         const String & path, const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns,
         const MarkRanges & all_mark_ranges)
     {
-        if (!reuse_buffers)
-            buffers.clear();
-
         this->path = path;
         this->data_part = data_part;
         this->part_name = data_part->name;
@@ -52,8 +49,6 @@
         this->all_mark_ranges = all_mark_ranges;
         this->streams.clear();
 
-        /// @todo sort buffers using capacity, find best match for Stream.
-
         try
         {
             if (!Poco::File(path).exists())
@@ -167,7 +162,7 @@ private:
         Stream(
             const String & path_prefix_, UncompressedCache * uncompressed_cache, MarkCache * mark_cache,
-            const MarkRanges & all_mark_ranges, size_t aio_threshold, size_t max_read_buffer_size, Memory & memory)
+            const MarkRanges & all_mark_ranges, size_t aio_threshold, size_t max_read_buffer_size)
             : path_prefix(path_prefix_)
         {
             loadMarks(mark_cache);
@@ -220,28 +215,13 @@ private:
             if (uncompressed_cache)
             {
                 cached_buffer = std::make_unique<CachedCompressedReadBuffer>(
-                    path_prefix + ".bin", uncompressed_cache, estimated_size, aio_threshold, buffer_size, &memory);
+                    path_prefix + ".bin", uncompressed_cache, estimated_size, aio_threshold, buffer_size);
                 data_buffer = cached_buffer.get();
             }
             else
             {
-                const auto resize = [&memory] (const std::size_t size) {
-                    const auto growth_factor = 1.6f;    /// close to golden ratio
-                    if (memory.m_capacity == 0)
-                        memory.resize(size);
-                    else if (memory.m_capacity < size)
-                        memory.resize(growth_factor * size);
-                };
-
-                if (aio_threshold == 0 || estimated_size < aio_threshold)
-                    resize(buffer_size);
-                else
-                    resize(2 * Memory::align(buffer_size + DEFAULT_AIO_FILE_BLOCK_SIZE, DEFAULT_AIO_FILE_BLOCK_SIZE));
-
-                /** @todo CompressedReadBufferFromFile creates buffer for decompressed blocks, consider providing another
-                  * instance of Memory type for it */
                 non_cached_buffer = std::make_unique<CompressedReadBufferFromFile>(
-                    path_prefix + ".bin", estimated_size, aio_threshold, buffer_size, &memory[0]);
+                    path_prefix + ".bin", estimated_size, aio_threshold, buffer_size);
                 data_buffer = non_cached_buffer.get();
             }
         }
@@ -303,10 +283,6 @@ private:
     typedef std::map<std::string, std::unique_ptr<Stream> > FileStreams;
 
-    /** buffers shall be deleted after streams because some streams may use existing_memory even inside the destructor
-      * (ReadBufferAIO passes a pointer to the buffer to a syscall and waits for its completion in the destructor, thus there is
-      * a chance that the system will write to the memory after it has been freed). */
-    std::vector<std::unique_ptr<Memory>> buffers;
 
     /// Used as a hint to reduce the number of reallocations when creating a variable-length column.
     std::map<std::string, double> avg_value_size_hints;
     String path;
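
The removed comment above leans on a C++ guarantee worth spelling out: non-static data members are destroyed in reverse declaration order, so a member declared earlier (the buffers) outlives one declared later (the streams) during destruction. A minimal sketch (Tracer and ReaderLike are illustrative names, not part of the code above):

    #include <iostream>
    #include <string>
    #include <utility>

    /// Prints its name on destruction so the destruction order is visible.
    struct Tracer
    {
        std::string name;
        explicit Tracer(std::string name_) : name(std::move(name_)) {}
        ~Tracer() { std::cout << "destroying " << name << '\n'; }
    };

    struct ReaderLike
    {
        Tracer buffers{"buffers"};  /// declared first, destroyed last
        Tracer streams{"streams"};  /// declared second, destroyed first
    };

    int main()
    {
        ReaderLike r;
        /// When r goes out of scope, the output is:
        ///   destroying streams
        ///   destroying buffers
    }
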
@@ -324,7 +300,6 @@ private:
     MarkRanges all_mark_ranges;
     size_t aio_threshold;
     size_t max_read_buffer_size;
-    bool reuse_buffers;
 
     void addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges, size_t level = 0)
     {
@@ -336,10 +311,6 @@ private:
         if (!Poco::File(path + escaped_column_name + ".bin").exists())
             return;
 
-        const auto buffer_idx = streams.size();
-        if (buffer_idx == buffers.size())
-            buffers.push_back(std::make_unique<Memory>(0, DEFAULT_AIO_FILE_BLOCK_SIZE));
-
         /// For arrays, separate streams are used for the sizes.
         if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
         {
@@ -351,14 +322,14 @@ private:
             if (!streams.count(size_name))
                 streams.emplace(size_name, std::make_unique<Stream>(
                     path + escaped_size_name, uncompressed_cache, mark_cache,
-                    all_mark_ranges, aio_threshold, max_read_buffer_size, *buffers[buffer_idx]));
+                    all_mark_ranges, aio_threshold, max_read_buffer_size));
 
             addStream(name, *type_arr->getNestedType(), all_mark_ranges, level + 1);
         }
         else
             streams.emplace(name, std::make_unique<Stream>(
                 path + escaped_column_name, uncompressed_cache, mark_cache,
-                all_mark_ranges, aio_threshold, max_read_buffer_size, *buffers[buffer_idx]));
+                all_mark_ranges, aio_threshold, max_read_buffer_size));
     }


@@ -21,11 +21,15 @@ public:
         MergeTreeData & storage, const bool use_uncompressed_cache, const ExpressionActionsPtr & prewhere_actions,
         const String & prewhere_column, const Settings & settings, const Names & virt_column_names)
         : thread{thread}, pool{pool}, block_size_marks{block_size / storage.index_granularity},
-          min_marks_to_read{(min_marks_to_read + block_size_marks - 1) / block_size_marks * block_size_marks},
+          /// round min_marks_to_read up to nearest multiple of block_size expressed in marks
+          min_marks_to_read{block_size
+              ? (min_marks_to_read * storage.index_granularity + block_size - 1)
+                  / block_size * block_size / storage.index_granularity
+              : min_marks_to_read
+          },
           storage{storage}, use_uncompressed_cache{use_uncompressed_cache}, prewhere_actions{prewhere_actions},
           prewhere_column{prewhere_column}, min_bytes_to_use_direct_io{settings.min_bytes_to_use_direct_io},
-          max_read_buffer_size{settings.max_read_buffer_size},
-          reuse_buffers(settings.merge_tree_uniform_read_reuse_buffers),
-          virt_column_names{virt_column_names},
+          max_read_buffer_size{settings.max_read_buffer_size}, virt_column_names{virt_column_names},
           log{&Logger::get("MergeTreeThreadBlockInputStream")}
     {}
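
The new initializer rounds in rows rather than marks. A small worked sketch of the arithmetic with made-up numbers (round_min_marks and the constants are illustrative; the real values come from storage.index_granularity and the query's block_size):

    #include <cassert>
    #include <cstddef>

    /// Sketch of the new rounding rule: convert marks to rows, round the row count up to a
    /// multiple of block_size, then convert back to marks. All names and values are illustrative.
    std::size_t round_min_marks(std::size_t min_marks_to_read, std::size_t index_granularity, std::size_t block_size)
    {
        if (block_size == 0)
            return min_marks_to_read;   /// no block size requested: keep the estimate as-is

        return (min_marks_to_read * index_granularity + block_size - 1)
            / block_size * block_size / index_granularity;
    }

    int main()
    {
        /// index_granularity = 8192 rows per mark, block_size = 65536 rows (8 marks):
        /// 3 marks = 24576 rows, rounded up to 65536 rows, i.e. 8 marks.
        assert(round_min_marks(3, 8192, 65536) == 8);

        /// The old expression rounded in marks against block_size_marks = block_size / index_granularity,
        /// which divides by zero whenever block_size < index_granularity; the row-based form stays defined.
        assert(round_min_marks(3, 8192, 4096) == 3);
    }
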
@@ -103,13 +107,13 @@ private:
             reader = std::make_unique<MergeTreeReader>(
                 path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(),
-                storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size, reuse_buffers);
+                storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
 
             if (prewhere_actions)
                 pre_reader = std::make_unique<MergeTreeReader>(
                     path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(),
                     owned_mark_cache.get(), storage, task->mark_ranges, min_bytes_to_use_direct_io,
-                    max_read_buffer_size, reuse_buffers);
+                    max_read_buffer_size);
         }
         else
         {
@@ -320,7 +324,6 @@ private:
     const String prewhere_column;
     const std::size_t min_bytes_to_use_direct_io;
     const std::size_t max_read_buffer_size;
-    const bool reuse_buffers;
     const Names virt_column_names;
 
     Logger * log;


@@ -13,10 +13,7 @@
 /// do not fit into the main buffer.
 ReadBufferAIO::ReadBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, char * existing_memory_)
     : ReadBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
-      fill_buffer(BufferWithOwnMemory<ReadBuffer>(internalBuffer().size(),
-          existing_memory_ ? existing_memory_ + Memory::align(internalBuffer().size(), DEFAULT_AIO_FILE_BLOCK_SIZE)
-              : nullptr,
-          DEFAULT_AIO_FILE_BLOCK_SIZE)),
+      fill_buffer(BufferWithOwnMemory<ReadBuffer>(internalBuffer().size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
       filename(filename_)
 {
     ProfileEvents::increment(ProfileEvents::FileOpen);
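
For reference, the removed initializer placed fill_buffer into the tail of the caller's existing_memory, right after a block-aligned main region. A minimal sketch of that layout arithmetic (align_up and the block-size constant are assumptions standing in for Memory::align and the real DEFAULT_AIO_FILE_BLOCK_SIZE):

    #include <cassert>
    #include <cstddef>
    #include <vector>

    constexpr std::size_t DEFAULT_AIO_FILE_BLOCK_SIZE = 4096;   /// illustrative value

    /// Round `value` up to a multiple of `alignment` (what Memory::align is assumed to do).
    constexpr std::size_t align_up(std::size_t value, std::size_t alignment)
    {
        return (value + alignment - 1) / alignment * alignment;
    }

    int main()
    {
        const std::size_t buffer_size = 1 << 20;                               /// hypothetical read buffer size
        const std::size_t region_size = buffer_size + DEFAULT_AIO_FILE_BLOCK_SIZE;

        /// One external allocation big enough for both regions, as the old callers provided.
        std::vector<char> existing_memory(2 * align_up(region_size, DEFAULT_AIO_FILE_BLOCK_SIZE));

        char * main_region = existing_memory.data();                           /// went to ReadBufferFromFileBase
        char * fill_region = main_region + align_up(region_size, DEFAULT_AIO_FILE_BLOCK_SIZE);  /// went to fill_buffer

        /// The two regions never overlap, and fill_region sits at a block-aligned offset.
        assert(fill_region - main_region >= static_cast<std::ptrdiff_t>(region_size));
        assert((fill_region - main_region) % DEFAULT_AIO_FILE_BLOCK_SIZE == 0);
    }
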