This commit is contained in:
Michael Kolupaev 2012-12-03 08:52:58 +00:00
parent 914b41cf2f
commit 2d8026c0b0

View File

@ -273,7 +273,7 @@ class MergedBlockOutputStream : public IBlockOutputStream
public:
MergedBlockOutputStream(StorageMergeTree & storage_,
UInt16 min_date, UInt16 max_date, UInt64 min_part_id, UInt64 max_part_id, UInt32 level)
: storage(storage_), index_offset(0)
: storage(storage_), index_offset(0), marks_count(0)
{
part_name = storage.getPartName(
Yandex::DayNum_t(min_date), Yandex::DayNum_t(max_date),
@ -305,8 +305,14 @@ public:
: &block.getByPosition(storage.sort_descr[i].column_number));
for (size_t i = index_offset; i < rows; i += storage.index_granularity)
{
for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it)
{
(*it)->type->serializeBinary((*(*it)->column)[i], *index_stream);
}
++marks_count;
}
/// Теперь пишем данные.
for (NamesAndTypesList::const_iterator it = storage.columns->begin(); it != storage.columns->end(); ++it)
@ -326,6 +332,9 @@ public:
index_stream = NULL;
column_streams.clear();
if (marks_count == 0)
throw Exception("Empty part", ErrorCodes::LOGICAL_ERROR);
/// Переименовываем кусок.
Poco::File(part_tmp_path).renameTo(part_res_path);
@ -334,11 +343,18 @@ public:
BlockOutputStreamPtr clone() { throw Exception("Cannot clone MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED); }
/// Сколько засечек уже записано.
size_t marksCount()
{
return marks_count;
}
private:
StorageMergeTree & storage;
String part_name;
String part_tmp_path;
String part_res_path;
size_t marks_count;
struct ColumnStream
{
@ -434,6 +450,8 @@ private:
}
};
typedef Poco::SharedPtr<MergedBlockOutputStream> MergedBlockOutputStreamPtr;
/** Диапазон с открытыми или закрытыми концами; возможно, неограниченный.
* Определяет, какую часть данных читать, при наличии индекса.
@ -1514,11 +1532,9 @@ void StorageMergeTree::mergeParts(std::vector<DataPartPtr> parts)
new_data_part->left = parts.front()->left;
new_data_part->right = parts.back()->right;
new_data_part->level = 0;
new_data_part->size = 0;
for (size_t i = 0; i < parts.size(); ++i)
{
new_data_part->level = std::max(new_data_part->level, parts[i]->level);
new_data_part->size += parts[i]->size;
}
++new_data_part->level;
new_data_part->name = getPartName(
@ -1541,11 +1557,12 @@ void StorageMergeTree::mergeParts(std::vector<DataPartPtr> parts)
? new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_BLOCK_SIZE)
: new CollapsingSortedBlockInputStream(src_streams, sort_descr, sign_column, DEFAULT_BLOCK_SIZE);
BlockOutputStreamPtr to = new MergedBlockOutputStream(*this,
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(*this,
new_data_part->left_date, new_data_part->right_date, new_data_part->left, new_data_part->right, new_data_part->level);
copyData(*merged_stream, *to);
new_data_part->size = to->marksCount();
new_data_part->modification_time = time(0);
{