dbms: lowered memory usage for merge operations [#CONV-8705].

This commit is contained in:
Alexey Milovidov 2013-09-13 23:28:40 +00:00
parent ca2da37d2e
commit 2be066285a
3 changed files with 10 additions and 5 deletions

View File

@ -13,7 +13,11 @@
#define DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC 300
#define DBMS_DEFAULT_POLL_INTERVAL 10
/// Какими блоками по-умолчанию читаются и пишутся данные (в числе строк).
#define DEFAULT_BLOCK_SIZE 1048576
/// То же самое, но для операций слияния. Меньше DEFAULT_BLOCK_SIZE для экономии оперативки (так как читаются все столбцы).
#define DEFAULT_MERGE_BLOCK_SIZE 10000
#define DEFAULT_MAX_QUERY_SIZE 1048576
#define SHOW_CHARS_ON_SYNTAX_ERROR 160L
#define DEFAULT_MAX_THREADS 8

View File

@ -381,7 +381,7 @@ bool StorageChunkMerger::mergeChunks(const Storages & chunks)
select_list->children.push_back(newIdentifier(it->first, ASTIdentifier::Column));
}
QueryProcessingStage::Enum processed_stage;
QueryProcessingStage::Enum processed_stage = QueryProcessingStage::Complete;
Settings settings = context.getSettings();
@ -389,7 +389,8 @@ bool StorageChunkMerger::mergeChunks(const Storages & chunks)
src_column_names,
select_query_ptr,
settings,
processed_stage);
processed_stage,
DEFAULT_MERGE_BLOCK_SIZE);
BlockInputStreamPtr input = new AddingDefaultBlockInputStream(new ConcatBlockInputStream(input_streams), required_columns);

View File

@ -860,14 +860,14 @@ void StorageMergeTree::mergeParts(std::vector<DataPartPtr> parts)
{
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
full_path + parts[i]->name + '/', DEFAULT_BLOCK_SIZE, all_column_names, *this, parts[i], ranges, StoragePtr(), false), primary_expr));
full_path + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, all_column_names, *this, parts[i], ranges, StoragePtr(), false), primary_expr));
}
/// Порядок потоков важен: при совпадении ключа элементы идут в порядке номера потока-источника.
/// В слитом куске строки с одинаковым ключом должны идти в порядке возрастания идентификатора исходного куска, то есть (примерного) возрастания времени вставки.
BlockInputStreamPtr merged_stream = sign_column.empty()
? new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_BLOCK_SIZE)
: new CollapsingSortedBlockInputStream(src_streams, sort_descr, sign_column, DEFAULT_BLOCK_SIZE);
? new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_MERGE_BLOCK_SIZE)
: new CollapsingSortedBlockInputStream(src_streams, sort_descr, sign_column, DEFAULT_MERGE_BLOCK_SIZE);
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(*this,
new_data_part->left_date, new_data_part->right_date, new_data_part->left, new_data_part->right, new_data_part->level);