dbms: using setting 'max_read_buffer_size' in Log and TinyLog tables [#METR-15090].

Alexey Milovidov 2015-04-12 07:47:03 +03:00
parent cac503ff82
commit 628c32c840
2 changed files with 23 additions and 16 deletions
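In essence, each Stream's read buffer for a column data file was previously capped at the compile-time constant DBMS_DEFAULT_BUFFER_SIZE; after this commit the cap is the per-query setting max_read_buffer_size, threaded down from StorageLog::read and StorageTinyLog::read. A minimal sketch of the sizing rule below (the helper name chooseReadBufferSize is illustrative, not from the commit):

    #include <algorithm>
    #include <cstddef>

    /// Sketch: size the buffer for reading one data file.
    /// The user-configurable setting caps the allocation, and a file
    /// smaller than the cap never allocates more than its own size.
    static size_t chooseReadBufferSize(size_t max_read_buffer_size, size_t file_size)
    {
        return std::min(max_read_buffer_size, file_size);
    }

This is exactly the expression both Stream constructors switch to below, replacing static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE) with the max_read_buffer_size argument.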

StorageLog.cpp

@@ -40,9 +40,11 @@ using Poco::SharedPtr;
 class LogBlockInputStream : public IProfilingBlockInputStream
 {
 public:
-    LogBlockInputStream(size_t block_size_, const Names & column_names_, StorageLog & storage_, size_t mark_number_, size_t rows_limit_)
+    LogBlockInputStream(
+        size_t block_size_, const Names & column_names_, StorageLog & storage_,
+        size_t mark_number_, size_t rows_limit_, size_t max_read_buffer_size_)
         : block_size(block_size_), column_names(column_names_), storage(storage_),
-        mark_number(mark_number_), rows_limit(rows_limit_), current_mark(mark_number_) {}
+        mark_number(mark_number_), rows_limit(rows_limit_), current_mark(mark_number_), max_read_buffer_size(max_read_buffer_size_) {}

     String getName() const { return "LogBlockInputStream"; }
@@ -66,14 +68,14 @@ private:
     StorageLog & storage;
     size_t mark_number;     /// From which mark to read the data
     size_t rows_limit;      /// The maximum number of rows that can be read
     size_t rows_read = 0;
     size_t current_mark;
+    size_t max_read_buffer_size;

     struct Stream
     {
-        Stream(const std::string & data_path, size_t offset)
-            : plain(data_path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(data_path).getSize())),
+        Stream(const std::string & data_path, size_t offset, size_t max_read_buffer_size)
+            : plain(data_path, std::min(max_read_buffer_size, Poco::File(data_path).getSize())),
             compressed(plain)
         {
             if (offset)
@@ -278,7 +280,8 @@ void LogBlockInputStream::addStream(const String & name, const IDataType & type,
                 storage.files[size_name].data_file.path(),
                 mark_number
                     ? storage.files[size_name].marks[mark_number].offset
-                    : 0)));
+                    : 0,
+                max_read_buffer_size)));

         addStream(name, *type_arr->getNestedType(), level + 1);
     }
@@ -287,7 +290,8 @@ void LogBlockInputStream::addStream(const String & name, const IDataType & type,
             storage.files[name].data_file.path(),
             mark_number
                 ? storage.files[name].marks[mark_number].offset
-                : 0));
+                : 0,
+            max_read_buffer_size));
 }
@@ -669,7 +673,8 @@ BlockInputStreams StorageLog::read(
             max_block_size,
             column_names,
             *this,
-            0, std::numeric_limits<size_t>::max()));
+            0, std::numeric_limits<size_t>::max(),
+            settings.max_read_buffer_size));
     }
     else
     {
@@ -695,7 +700,8 @@ BlockInputStreams StorageLog::read(
                 marks[from_mark + (thread + 1) * (to_mark - from_mark) / threads - 1].rows -
                     ((thread == 0 && from_mark == 0)
                         ? 0
-                        : marks[from_mark + thread * (to_mark - from_mark) / threads - 1].rows)));
+                        : marks[from_mark + thread * (to_mark - from_mark) / threads - 1].rows),
+                settings.max_read_buffer_size));
         }
     }

StorageTinyLog.cpp

@@ -39,8 +39,8 @@ using Poco::SharedPtr;
 class TinyLogBlockInputStream : public IProfilingBlockInputStream
 {
 public:
-    TinyLogBlockInputStream(size_t block_size_, const Names & column_names_, StorageTinyLog & storage_)
-        : block_size(block_size_), column_names(column_names_), storage(storage_) {}
+    TinyLogBlockInputStream(size_t block_size_, const Names & column_names_, StorageTinyLog & storage_, size_t max_read_buffer_size_)
+        : block_size(block_size_), column_names(column_names_), storage(storage_), max_read_buffer_size(max_read_buffer_size_) {}

     String getName() const { return "TinyLogBlockInputStream"; }
@@ -53,11 +53,12 @@ private:
     Names column_names;
     StorageTinyLog & storage;
     bool finished = false;
+    size_t max_read_buffer_size;

     struct Stream
     {
-        Stream(const std::string & data_path)
-            : plain(data_path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(data_path).getSize())),
+        Stream(const std::string & data_path, size_t max_read_buffer_size)
+            : plain(data_path, std::min(max_read_buffer_size, Poco::File(data_path).getSize())),
             compressed(plain)
         {
         }
@@ -220,12 +221,12 @@ void TinyLogBlockInputStream::addStream(const String & name, const IDataType & t
     {
         String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);

         if (!streams.count(size_name))
-            streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(storage.files[size_name].data_file.path())));
+            streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(storage.files[size_name].data_file.path(), max_read_buffer_size)));

         addStream(name, *type_arr->getNestedType(), level + 1);
     }
     else
-        streams[name].reset(new Stream(storage.files[name].data_file.path()));
+        streams[name].reset(new Stream(storage.files[name].data_file.path(), max_read_buffer_size));
 }
@@ -435,7 +436,7 @@ BlockInputStreams StorageTinyLog::read(
 {
     check(column_names);
     processed_stage = QueryProcessingStage::FetchColumns;
-    return BlockInputStreams(1, new TinyLogBlockInputStream(max_block_size, column_names, *this));
+    return BlockInputStreams(1, new TinyLogBlockInputStream(max_block_size, column_names, *this, settings.max_read_buffer_size));
 }