Alexey Milovidov 2013-06-15 08:38:30 +00:00
parent 44aad8f802
commit 91a062a575


@@ -88,14 +88,18 @@ void LogBlockInputStream::addStream(const String & name, const IDataType & type,
         String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);
         streams.insert(std::make_pair(size_name, new Stream(
             storage.files[size_name].data_file.path(),
-            storage.files[size_name].marks[mark_number].offset)));
+            mark_number
+                ? storage.files[size_name].marks[mark_number].offset
+                : 0)));
 
         addStream(name, *type_arr->getNestedType(), level + 1);
     }
     else
         streams.insert(std::make_pair(name, new Stream(
             storage.files[name].data_file.path(),
-            storage.files[name].marks[mark_number].offset)));
+            mark_number
+                ? storage.files[name].marks[mark_number].offset
+                : 0)));
 }
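What the hunk above achieves, in isolation: when a stream is created without marks having been loaded, mark_number is 0 and the marks vector may be empty, so the stream has to fall back to byte offset 0 instead of indexing into it. Below is a minimal, self-contained sketch of that guard; the Mark struct and startingOffset function are simplified stand-ins for illustration, not the actual storage classes.

#include <cstddef>
#include <iostream>
#include <vector>

// Simplified stand-in for a mark: where a block of rows starts in the data file.
struct Mark
{
    size_t rows;    // cumulative number of rows up to and including this block
    size_t offset;  // byte offset of the block in the data file
};

// Starting byte offset for a stream: only consult the marks when a non-zero
// mark number is requested; a read "from the beginning" must work even if
// the marks were never loaded (empty vector).
size_t startingOffset(const std::vector<Mark> & marks, size_t mark_number)
{
    return mark_number
        ? marks[mark_number].offset
        : 0;
}

int main()
{
    std::vector<Mark> no_marks;                          // marks were never loaded
    std::vector<Mark> marks{{8192, 0}, {16384, 65536}};  // two marks

    std::cout << startingOffset(no_marks, 0) << '\n';    // 0: safe with an empty vector
    std::cout << startingOffset(marks, 1) << '\n';       // 65536: ordinary marked read
}

With mark_number == 0 the branch never touches the marks vector, which is what allows read() in the next hunk to skip loadMarks() entirely.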
@@ -407,36 +411,58 @@ BlockInputStreams StorageLog::read(
     size_t max_block_size,
     unsigned threads)
 {
-    loadMarks();
+    /** If all the data is read in a single thread, marks are not required.
+      * Not having to load marks reduces memory consumption when a ChunkMerger-type table is used.
+      */
+    bool read_all_data_in_one_thread = (threads == 1 && from_mark == 0 && to_mark == std::numeric_limits<size_t>::max());
+    if (!read_all_data_in_one_thread)
+        loadMarks();
     check(column_names);
     processed_stage = QueryProcessingStage::FetchColumns;
 
     Poco::ScopedReadRWLock lock(rwlock);
 
-    const Marks & marks = files.begin()->second.marks;
-    size_t marks_size = marks.size();
-
-    if (to_mark > marks_size || to_mark < from_mark)
-        throw Exception("Marks out of range in StorageLog::read", ErrorCodes::LOGICAL_ERROR);
-
-    if (threads > to_mark - from_mark)
-        threads = to_mark - from_mark;
-
     BlockInputStreams res;
 
-    for (size_t thread = 0; thread < threads; ++thread)
+    if (read_all_data_in_one_thread)
     {
         res.push_back(new LogBlockInputStream(
             max_block_size,
             column_names,
             thisPtr(),
-            from_mark + thread * (to_mark - from_mark) / threads,
-            marks[from_mark + (thread + 1) * (to_mark - from_mark) / threads - 1].rows -
-                ((thread == 0 && from_mark == 0)
-                    ? 0
-                    : marks[from_mark + thread * (to_mark - from_mark) / threads - 1].rows)));
+            0, std::numeric_limits<size_t>::max()));
     }
+    else
+    {
+        const Marks & marks = files.begin()->second.marks;
+        size_t marks_size = marks.size();
+
+        if (to_mark == std::numeric_limits<size_t>::max())
+            to_mark = marks_size;
+
+        if (to_mark > marks_size || to_mark < from_mark)
+            throw Exception("Marks out of range in StorageLog::read", ErrorCodes::LOGICAL_ERROR);
+
+        if (threads > to_mark - from_mark)
+            threads = to_mark - from_mark;
+
+        for (size_t thread = 0; thread < threads; ++thread)
+        {
+            res.push_back(new LogBlockInputStream(
+                max_block_size,
+                column_names,
+                thisPtr(),
+                from_mark + thread * (to_mark - from_mark) / threads,
+                marks[from_mark + (thread + 1) * (to_mark - from_mark) / threads - 1].rows -
+                    ((thread == 0 && from_mark == 0)
+                        ? 0
+                        : marks[from_mark + thread * (to_mark - from_mark) / threads - 1].rows)));
+        }
+    }
 
     return res;
 }
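The core of the hunk above is that loadMarks() becomes conditional: a single-threaded read of the full range (to_mark == std::numeric_limits<size_t>::max()) never consults marks, so it can skip loading them and save memory, for example under a ChunkMerger. A rough sketch of the same control flow follows; Reader, loadMarks and the counters are placeholder names for illustration, not StorageLog's real interface.

#include <cstddef>
#include <iostream>
#include <limits>

// Illustrative reader: marks are loaded lazily, and only when the read
// actually needs them (a bounded mark range or several threads).
class Reader
{
public:
    void read(size_t from_mark, size_t to_mark, unsigned threads)
    {
        // A full scan in one thread never consults marks, so loading them
        // (and holding them in memory) can be skipped entirely.
        const bool read_all_data_in_one_thread =
            threads == 1 && from_mark == 0 && to_mark == std::numeric_limits<size_t>::max();

        if (!read_all_data_in_one_thread)
            loadMarks();

        if (read_all_data_in_one_thread)
            std::cout << "sequential scan of the whole data file, no marks in memory\n";
        else
            std::cout << "ranged read over " << loaded_marks << " loaded marks\n";
    }

private:
    size_t loaded_marks = 0;

    void loadMarks()
    {
        loaded_marks = 1000;  // pretend the marks file was parsed here
    }
};

int main()
{
    Reader reader;
    reader.read(0, std::numeric_limits<size_t>::max(), 1);  // skips loadMarks()
    reader.read(0, 10, 4);                                   // loads marks
}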
@@ -449,8 +475,7 @@ BlockInputStreams StorageLog::read(
     size_t max_block_size,
     unsigned threads)
 {
-    loadMarks();
-    return read(0, marksCount(), column_names, query, settings, processed_stage, max_block_size, threads);
+    return read(0, std::numeric_limits<size_t>::max(), column_names, query, settings, processed_stage, max_block_size, threads);
 }
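The convenience overload above no longer calls loadMarks() and marksCount() just to compute the upper bound; it forwards std::numeric_limits<size_t>::max() as a "to the end" sentinel, which the main read() resolves to the real mark count only if marks are actually loaded. A tiny illustration of that sentinel convention, with hypothetical names:

#include <cstddef>
#include <iostream>
#include <limits>

// "Read to the end" sentinel: the convenience overload passes this instead of
// asking the storage for its mark count up front.
constexpr size_t ALL_MARKS = std::numeric_limits<size_t>::max();

// Stand-in for the ranged read: the sentinel is only resolved to a concrete
// mark count if marks end up being loaded at all.
void readRange(size_t from_mark, size_t to_mark)
{
    if (to_mark == ALL_MARKS)
        std::cout << "read from mark " << from_mark << " to the end of the table\n";
    else
        std::cout << "read marks [" << from_mark << ", " << to_mark << ")\n";
}

int main()
{
    readRange(0, ALL_MARKS);  // what the simplified overload now forwards
    readRange(2, 5);          // an explicit sub-range
}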