mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 09:40:49 +00:00
Merge
This commit is contained in:
parent
db3c061396
commit
b48bc12739
@ -34,12 +34,12 @@ public:
|
||||
MergeTreeDataWriter(MergeTreeData & data_) : data(data_), log(&Logger::get("MergeTreeDataWriter")), flags(O_TRUNC | O_CREAT | O_WRONLY) {}
|
||||
|
||||
/** Разбивает блок на блоки, каждый из которых нужно записать в отдельный кусок.
|
||||
* (читай: разбивает строки по месяцам)
|
||||
* (читай: разбивает строки по месяцам)
|
||||
* Работает детерминированно: если отдать на вход такой же блок, на выходе получатся такие же блоки в таком же порядке.
|
||||
*/
|
||||
BlocksWithDateIntervals splitBlockIntoParts(const Block & block);
|
||||
|
||||
/** Все строки должны относиться к одному месяцу. Возвращает название временного куска.
|
||||
/** Все строки должны относиться к одному месяцу.
|
||||
* temp_index - значение left и right для нового куска. Можно будет изменить при переименовании.
|
||||
* Возвращает кусок с именем, начинающимся с tmp_, еще не добавленный в MergeTreeData.
|
||||
*/
|
||||
@ -56,7 +56,7 @@ private:
|
||||
|
||||
/// Записать данные одного столбца.
|
||||
void writeData(const String & path, const String & name, const IDataType & type, const IColumn & column,
|
||||
OffsetColumns & offset_columns, size_t level = 0);
|
||||
OffsetColumns & offset_columns, MergeTreeData::DataPart::Checksums & checksums, size_t level = 0);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include <DB/Common/escapeForFileName.h>
|
||||
#include <DB/DataTypes/DataTypeNested.h>
|
||||
#include <DB/DataTypes/DataTypeArray.h>
|
||||
#include <DB/IO/HashingWriteBuffer.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -104,12 +105,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
|
||||
/// Наконец-то можно писать данные на диск.
|
||||
LOG_TRACE(log, "Writing index.");
|
||||
|
||||
/// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки.
|
||||
MergeTreeData::DataPart::Checksums checksums;
|
||||
MergeTreeData::DataPart::Index index_vec;
|
||||
|
||||
/// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки.
|
||||
index_vec.reserve(part_size * sort_descr.size());
|
||||
|
||||
{
|
||||
WriteBufferFromFile index(part_tmp_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, flags);
|
||||
WriteBufferFromFile index_file(part_tmp_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, flags);
|
||||
HashingWriteBuffer index(index_file);
|
||||
|
||||
typedef std::vector<const ColumnWithNameAndType *> PrimaryColumns;
|
||||
PrimaryColumns primary_columns;
|
||||
@ -130,6 +134,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
|
||||
}
|
||||
|
||||
index.next();
|
||||
checksums.file_checksums["primary.idx"].size = index.count();
|
||||
checksums.file_checksums["primary.idx"].hash = index.getHash();
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "Writing data.");
|
||||
@ -140,7 +146,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
|
||||
for (size_t i = 0; i < columns; ++i)
|
||||
{
|
||||
const ColumnWithNameAndType & column = block.getByPosition(i);
|
||||
writeData(part_tmp_path, column.name, *column.type, *column.column, offset_columns);
|
||||
writeData(part_tmp_path, column.name, *column.type, *column.column, offset_columns, checksums);
|
||||
}
|
||||
|
||||
/// Запишем файл с чексуммами.
|
||||
{
|
||||
WriteBufferFromFile checksums_file(part_tmp_path + "checksums.txt", 1024, flags);
|
||||
checksums.writeText(checksums_file);
|
||||
checksums_file.next();
|
||||
}
|
||||
|
||||
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
|
||||
@ -155,12 +168,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
|
||||
new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date);
|
||||
new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date);
|
||||
new_data_part->index.swap(index_vec);
|
||||
new_data_part->checksums = checksums;
|
||||
|
||||
return new_data_part;
|
||||
}
|
||||
|
||||
void MergeTreeDataWriter::writeData(const String & path, const String & name, const IDataType & type, const IColumn & column,
|
||||
OffsetColumns & offset_columns, size_t level)
|
||||
OffsetColumns & offset_columns, MergeTreeData::DataPart::Checksums & checksums, size_t level)
|
||||
{
|
||||
String escaped_column_name = escapeForFileName(name);
|
||||
size_t size = column.size();
|
||||
@ -174,15 +188,17 @@ void MergeTreeDataWriter::writeData(const String & path, const String & name, co
|
||||
{
|
||||
offset_columns.insert(size_name);
|
||||
|
||||
WriteBufferFromFile plain(path + size_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags);
|
||||
WriteBufferFromFile marks(path + size_name + ".mrk", 4096, flags);
|
||||
CompressedWriteBuffer compressed(plain);
|
||||
WriteBufferFromFile plain_file(path + size_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags);
|
||||
WriteBufferFromFile marks_file(path + size_name + ".mrk", 4096, flags);
|
||||
CompressedWriteBuffer compressed_file(plain_file);
|
||||
HashingWriteBuffer marks(marks_file);
|
||||
HashingWriteBuffer compressed(compressed_file);
|
||||
|
||||
size_t prev_mark = 0;
|
||||
while (prev_mark < size)
|
||||
{
|
||||
/// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока)
|
||||
writeIntBinary(plain.count(), marks);
|
||||
writeIntBinary(plain_file.count(), marks);
|
||||
writeIntBinary(compressed.offset(), marks);
|
||||
|
||||
type_arr->serializeOffsets(column, compressed, prev_mark, data.index_granularity);
|
||||
@ -192,45 +208,26 @@ void MergeTreeDataWriter::writeData(const String & path, const String & name, co
|
||||
}
|
||||
|
||||
compressed.next();
|
||||
plain.next();
|
||||
plain_file.next();
|
||||
marks.next();
|
||||
checksums.file_checksums[size_name + ".bin"].size = compressed.count();
|
||||
checksums.file_checksums[size_name + ".bin"].hash = compressed.getHash();
|
||||
checksums.file_checksums[size_name + ".mrk"].size = marks.count();
|
||||
checksums.file_checksums[size_name + ".mrk"].hash = marks.getHash();
|
||||
}
|
||||
}
|
||||
if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
|
||||
{
|
||||
String size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
|
||||
|
||||
WriteBufferFromFile plain(path + size_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags);
|
||||
WriteBufferFromFile marks(path + size_name + ".mrk", 4096, flags);
|
||||
CompressedWriteBuffer compressed(plain);
|
||||
{
|
||||
WriteBufferFromFile plain_file(path + escaped_column_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags);
|
||||
WriteBufferFromFile marks_file(path + escaped_column_name + ".mrk", 4096, flags);
|
||||
CompressedWriteBuffer compressed_file(plain_file);
|
||||
HashingWriteBuffer marks(marks_file);
|
||||
HashingWriteBuffer compressed(compressed_file);
|
||||
|
||||
size_t prev_mark = 0;
|
||||
while (prev_mark < size)
|
||||
{
|
||||
/// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока)
|
||||
writeIntBinary(plain.count(), marks);
|
||||
writeIntBinary(compressed.offset(), marks);
|
||||
|
||||
type_nested->serializeOffsets(column, compressed, prev_mark, data.index_granularity);
|
||||
prev_mark += data.index_granularity;
|
||||
|
||||
compressed.nextIfAtEnd(); /// Чтобы вместо засечек, указывающих на конец сжатого блока, были засечки, указывающие на начало следующего.
|
||||
}
|
||||
|
||||
compressed.next();
|
||||
plain.next();
|
||||
marks.next();
|
||||
}
|
||||
|
||||
{
|
||||
WriteBufferFromFile plain(path + escaped_column_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags);
|
||||
WriteBufferFromFile marks(path + escaped_column_name + ".mrk", 4096, flags);
|
||||
CompressedWriteBuffer compressed(plain);
|
||||
|
||||
size_t prev_mark = 0;
|
||||
while (prev_mark < size)
|
||||
{
|
||||
writeIntBinary(plain.count(), marks);
|
||||
writeIntBinary(plain_file.count(), marks);
|
||||
writeIntBinary(compressed.offset(), marks);
|
||||
|
||||
type.serializeBinary(column, compressed, prev_mark, data.index_granularity);
|
||||
@ -240,8 +237,12 @@ void MergeTreeDataWriter::writeData(const String & path, const String & name, co
|
||||
}
|
||||
|
||||
compressed.next();
|
||||
plain.next();
|
||||
plain_file.next();
|
||||
marks.next();
|
||||
checksums.file_checksums[escaped_column_name + ".bin"].size = compressed.count();
|
||||
checksums.file_checksums[escaped_column_name + ".bin"].hash = compressed.getHash();
|
||||
checksums.file_checksums[escaped_column_name + ".mrk"].size = marks.count();
|
||||
checksums.file_checksums[escaped_column_name + ".mrk"].hash = marks.getHash();
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user