From 628c32c8401583d519177e40dd832f39f45d745e Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 12 Apr 2015 07:47:03 +0300 Subject: [PATCH] dbms: using setting 'max_read_buffer_size' in Log and TinyLog tables [#METR-15090]. --- dbms/src/Storages/StorageLog.cpp | 24 +++++++++++++++--------- dbms/src/Storages/StorageTinyLog.cpp | 15 ++++++++------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index a10bf74c1ce..a46419741de 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -40,9 +40,11 @@ using Poco::SharedPtr; class LogBlockInputStream : public IProfilingBlockInputStream { public: - LogBlockInputStream(size_t block_size_, const Names & column_names_, StorageLog & storage_, size_t mark_number_, size_t rows_limit_) + LogBlockInputStream( + size_t block_size_, const Names & column_names_, StorageLog & storage_, + size_t mark_number_, size_t rows_limit_, size_t max_read_buffer_size_) : block_size(block_size_), column_names(column_names_), storage(storage_), - mark_number(mark_number_), rows_limit(rows_limit_), current_mark(mark_number_) {} + mark_number(mark_number_), rows_limit(rows_limit_), current_mark(mark_number_), max_read_buffer_size(max_read_buffer_size_) {} String getName() const { return "LogBlockInputStream"; } @@ -66,14 +68,14 @@ private: StorageLog & storage; size_t mark_number; /// С какой засечки читать данные size_t rows_limit; /// Максимальное количество строк, которых можно прочитать - size_t rows_read = 0; size_t current_mark; + size_t max_read_buffer_size; struct Stream { - Stream(const std::string & data_path, size_t offset) - : plain(data_path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(data_path).getSize())), + Stream(const std::string & data_path, size_t offset, size_t max_read_buffer_size) + : plain(data_path, std::min(max_read_buffer_size, Poco::File(data_path).getSize())), compressed(plain) { if 
(offset) @@ -278,7 +280,8 @@ void LogBlockInputStream::addStream(const String & name, const IDataType & type, storage.files[size_name].data_file.path(), mark_number ? storage.files[size_name].marks[mark_number].offset - : 0))); + : 0, + max_read_buffer_size))); addStream(name, *type_arr->getNestedType(), level + 1); } @@ -287,7 +290,8 @@ void LogBlockInputStream::addStream(const String & name, const IDataType & type, storage.files[name].data_file.path(), mark_number ? storage.files[name].marks[mark_number].offset - : 0)); + : 0, + max_read_buffer_size)); } @@ -669,7 +673,8 @@ BlockInputStreams StorageLog::read( max_block_size, column_names, *this, - 0, std::numeric_limits<size_t>::max())); + 0, std::numeric_limits<size_t>::max(), + settings.max_read_buffer_size)); } else { @@ -695,7 +700,8 @@ BlockInputStreams StorageLog::read( marks[from_mark + (thread + 1) * (to_mark - from_mark) / threads - 1].rows - ((thread == 0 && from_mark == 0) ? 0 - : marks[from_mark + thread * (to_mark - from_mark) / threads - 1].rows))); + : marks[from_mark + thread * (to_mark - from_mark) / threads - 1].rows), + settings.max_read_buffer_size)); } } diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp index 27797787edb..b854a958883 100644 --- a/dbms/src/Storages/StorageTinyLog.cpp +++ b/dbms/src/Storages/StorageTinyLog.cpp @@ -39,8 +39,8 @@ using Poco::SharedPtr; class TinyLogBlockInputStream : public IProfilingBlockInputStream { public: - TinyLogBlockInputStream(size_t block_size_, const Names & column_names_, StorageTinyLog & storage_) - : block_size(block_size_), column_names(column_names_), storage(storage_) {} + TinyLogBlockInputStream(size_t block_size_, const Names & column_names_, StorageTinyLog & storage_, size_t max_read_buffer_size_) + : block_size(block_size_), column_names(column_names_), storage(storage_), max_read_buffer_size(max_read_buffer_size_) {} String getName() const { return "TinyLogBlockInputStream"; } @@ -53,11 +53,12 @@ private: Names 
column_names; StorageTinyLog & storage; bool finished = false; + size_t max_read_buffer_size; struct Stream { - Stream(const std::string & data_path) - : plain(data_path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(data_path).getSize())), + Stream(const std::string & data_path, size_t max_read_buffer_size) + : plain(data_path, std::min(max_read_buffer_size, Poco::File(data_path).getSize())), compressed(plain) { } @@ -220,12 +221,12 @@ void TinyLogBlockInputStream::addStream(const String & name, const IDataType & t { String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); if (!streams.count(size_name)) - streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(storage.files[size_name].data_file.path()))); + streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(storage.files[size_name].data_file.path(), max_read_buffer_size))); addStream(name, *type_arr->getNestedType(), level + 1); } else - streams[name].reset(new Stream(storage.files[name].data_file.path())); + streams[name].reset(new Stream(storage.files[name].data_file.path(), max_read_buffer_size)); } @@ -435,7 +436,7 @@ BlockInputStreams StorageTinyLog::read( { check(column_names); processed_stage = QueryProcessingStage::FetchColumns; - return BlockInputStreams(1, new TinyLogBlockInputStream(max_block_size, column_names, *this)); + return BlockInputStreams(1, new TinyLogBlockInputStream(max_block_size, column_names, *this, settings.max_read_buffer_size)); }