This commit is contained in:
Alexey Milovidov 2015-03-14 05:37:53 +03:00
parent 5c69667f2a
commit 291d9daabf
4 changed files with 30 additions and 12 deletions

View File

@ -235,8 +235,11 @@ public:
{
Poco::File(part_path).createDirectories();
index_file_stream = new WriteBufferFromFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
index_stream = new HashingWriteBuffer(*index_file_stream);
if (storage.mode != MergeTreeData::Unsorted)
{
index_file_stream = new WriteBufferFromFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
index_stream = new HashingWriteBuffer(*index_file_stream);
}
for (const auto & it : columns_list)
addStream(part_path, it.name, *it.type);
@ -260,7 +263,9 @@ public:
{
for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it)
{
index_vec.push_back((*(*it)->column)[i]);
if (storage.mode != MergeTreeData::Unsorted)
index_vec.push_back((*(*it)->column)[i]);
(*it)->type->serializeBinary(index_vec.back(), *index_stream);
}
@ -291,9 +296,13 @@ public:
/// Заканчиваем запись и достаем чексуммы.
MergeTreeData::DataPart::Checksums checksums;
index_stream->next();
checksums.files["primary.idx"].file_size = index_stream->count();
checksums.files["primary.idx"].file_hash = index_stream->getHash();
if (storage.mode != MergeTreeData::Unsorted)
{
index_stream->next();
checksums.files["primary.idx"].file_size = index_stream->count();
checksums.files["primary.idx"].file_hash = index_stream->getHash();
index_stream = nullptr;
}
for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
{
@ -301,7 +310,6 @@ public:
it->second->addToChecksums(checksums);
}
index_stream = nullptr;
column_streams.clear();
if (marks_count == 0)

View File

@ -336,8 +336,12 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
__sync_add_and_fetch(&merge_entry->bytes_read_uncompressed, value.bytes);
});
src_streams.push_back(new MaterializingBlockInputStream{
new ExpressionBlockInputStream(input.release(), data.getPrimaryExpression())});
if (data.mode != MergeTreeData::Unsorted)
src_streams.push_back(new MaterializingBlockInputStream{
new ExpressionBlockInputStream(input.release(), data.getPrimaryExpression())});
else
src_streams.push_back(input.release());
sum_rows_approx += parts[i]->size * data.index_granularity;
}

View File

@ -271,7 +271,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
for (auto & part : parts)
{
RangesInDataPart ranges(part, (*part_index)++);
ranges.ranges = markRangesFromPkRange(part->index, key_condition, settings);
if (data.mode != MergeTreeData::Unsorted)
ranges.ranges = markRangesFromPkRange(part->index, key_condition, settings);
else
ranges.ranges = MarkRanges{MarkRange{0, part->size}};
if (!ranges.ranges.empty())
{

View File

@ -94,12 +94,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
new_data_part->is_temp = true;
/// Если для сортировки надо вычислить некоторые столбцы - делаем это.
data.getPrimaryExpression()->execute(block);
if (data.mode != MergeTreeData::Unsorted)
data.getPrimaryExpression()->execute(block);
SortDescription sort_descr = data.getSortDescription();
/// Сортируем.
stableSortBlock(block, sort_descr);
if (data.mode != MergeTreeData::Unsorted)
stableSortBlock(block, sort_descr);
NamesAndTypesList columns = data.getColumnsList().filter(block.getColumnsList().getNames());
MergedBlockOutputStream out(data, part_tmp_path, columns, CompressionMethod::LZ4);