Add some comments

This commit is contained in:
alesapin 2020-12-15 13:34:28 +03:00
parent 74c2211510
commit 8670836573
6 changed files with 73 additions and 35 deletions

View File

@ -72,6 +72,7 @@ void MergeTreeDataPartWriterCompact::addStreams(const String & name, const IData
namespace namespace
{ {
/// Get granules for block using index_granularity
Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity, size_t block_rows, size_t current_mark, bool last_block) Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity, size_t block_rows, size_t current_mark, bool last_block)
{ {
if (current_mark >= index_granularity.getMarksCount()) if (current_mark >= index_granularity.getMarksCount())
@ -85,6 +86,8 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
size_t rest_rows = block_rows - current_row; size_t rest_rows = block_rows - current_row;
if (rest_rows < expected_rows) if (rest_rows < expected_rows)
{ {
/// Invariant: we always have equal amount of rows for block in compact parts because we accumulate them in buffer.
/// The only exclusion is the last block, when we cannot accumulate more rows.
if (!last_block) if (!last_block)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Required to write {} rows, but only {} rows was written for the non last granule", expected_rows, rest_rows); throw Exception(ErrorCodes::LOGICAL_ERROR, "Required to write {} rows, but only {} rows was written for the non last granule", expected_rows, rest_rows);
@ -98,6 +101,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
} }
else else
{ {
/// Normal granule with amount of rows equal to rows in compute granularity
result.emplace_back(Granule{ result.emplace_back(Granule{
.start_row = current_row, .start_row = current_row,
.granularity_rows = expected_rows, .granularity_rows = expected_rows,

View File

@ -23,13 +23,16 @@ public:
void finish(IMergeTreeDataPart::Checksums & checksums, bool sync) override; void finish(IMergeTreeDataPart::Checksums & checksums, bool sync) override;
private: private:
/// Finish serialization of the data. Flush rows in buffer to disk, compute checksums.
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync); void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync);
void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override; void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override;
/// Write block of rows into .bin file and marks in .mrk files
void writeDataBlock(const Block & block, const Granules & granules); void writeDataBlock(const Block & block, const Granules & granules);
/// Write block of rows into .bin file and marks in .mrk files, primary index in .idx file
/// and skip indices in their corresponding files.
void writeDataBlockPrimaryIndexAndSkipIndices(const Block & block, const Granules & granules); void writeDataBlockPrimaryIndexAndSkipIndices(const Block & block, const Granules & granules);
void addToChecksums(MergeTreeDataPartChecksums & checksums); void addToChecksums(MergeTreeDataPartChecksums & checksums);

View File

@ -167,7 +167,7 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
default_codec, settings.max_compress_block_size, default_codec, settings.max_compress_block_size,
0, settings.aio_threshold)); 0, settings.aio_threshold));
skip_indices_aggregators.push_back(index_helper->createIndexAggregator()); skip_indices_aggregators.push_back(index_helper->createIndexAggregator());
skip_index_filling.push_back(0); skip_index_accumulated_marks.push_back(0);
} }
} }
@ -221,7 +221,6 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
if (skip_indices_aggregators[i]->empty() && granule.mark_on_start) if (skip_indices_aggregators[i]->empty() && granule.mark_on_start)
{ {
skip_indices_aggregators[i] = index_helper->createIndexAggregator(); skip_indices_aggregators[i] = index_helper->createIndexAggregator();
skip_index_filling[i] = 0;
if (stream.compressed.offset() >= settings.min_compress_block_size) if (stream.compressed.offset() >= settings.min_compress_block_size)
stream.compressed.next(); stream.compressed.next();
@ -238,13 +237,13 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
skip_indices_aggregators[i]->update(skip_indexes_block, &pos, granule.granularity_rows); skip_indices_aggregators[i]->update(skip_indexes_block, &pos, granule.granularity_rows);
if (granule.isCompleted()) if (granule.isCompleted())
{ {
++skip_index_filling[i]; ++skip_index_accumulated_marks[i];
/// write index if it is filled /// write index if it is filled
if (skip_index_filling[i] == index_helper->index.granularity) if (skip_index_accumulated_marks[i] == index_helper->index.granularity)
{ {
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed); skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
skip_index_filling[i] = 0; skip_index_accumulated_marks[i] = 0;
} }
} }
} }
@ -302,7 +301,7 @@ void MergeTreeDataPartWriterOnDisk::finishSkipIndicesSerialization(
skip_indices_streams.clear(); skip_indices_streams.clear();
skip_indices_aggregators.clear(); skip_indices_aggregators.clear();
skip_index_filling.clear(); skip_index_accumulated_marks.clear();
} }
Names MergeTreeDataPartWriterOnDisk::getSkipIndicesColumns() const Names MergeTreeDataPartWriterOnDisk::getSkipIndicesColumns() const

View File

@ -14,20 +14,34 @@
namespace DB namespace DB
{ {
/// Single unit for writing data to disk. Contains information about
/// amount of rows to write and marks.
struct Granule struct Granule
{ {
/// Start row in block for granule
size_t start_row; size_t start_row;
/// Amount of rows which granule have to contain according to index
/// granularity.
/// NOTE: Sometimes it's not equal to actually written rows, for example
/// for the last granule if it's smaller than computed granularity.
size_t granularity_rows; size_t granularity_rows;
/// Amount of rows from block which have to be written to disk from start_row
size_t block_rows; size_t block_rows;
/// Global mark number in the list of all marks (index_granularity) for this part
size_t mark_number; size_t mark_number;
/// Should writer write mark for the first of this granule to disk.
/// NOTE: Sometimes we don't write mark for the start row, because
/// this granule can be continuation of the previous one.
bool mark_on_start; bool mark_on_start;
/// Is this granule contain amout of rows equal to the value in index granularity
bool isCompleted() const bool isCompleted() const
{ {
return granularity_rows == block_rows; return granularity_rows == block_rows;
} }
}; };
/// Multiple granules to write for concrete block.
using Granules = std::vector<Granule>; using Granules = std::vector<Granule>;
/// Writes data part to disk in different formats. /// Writes data part to disk in different formats.
@ -90,19 +104,29 @@ public:
{ {
written_offset_columns = written_offset_columns_; written_offset_columns = written_offset_columns_;
} }
protected: protected:
/// Count index_granularity for block and store in `index_granularity` /// Count index_granularity for block and store in `index_granularity`
size_t computeIndexGranularity(const Block & block) const; size_t computeIndexGranularity(const Block & block) const;
/// Write primary index according to granules_to_write
void calculateAndSerializePrimaryIndex(const Block & primary_index_block, const Granules & granules_to_write); void calculateAndSerializePrimaryIndex(const Block & primary_index_block, const Granules & granules_to_write);
/// Write skip indices according to granules_to_write. Skip indices also have their own marks
/// and one skip index granule can contain multiple "normal" marks. So skip indices serialization
/// require additional state: skip_indices_aggregators and skip_index_accumulated_marks
void calculateAndSerializeSkipIndices(const Block & skip_indexes_block, const Granules & granules_to_write); void calculateAndSerializeSkipIndices(const Block & skip_indexes_block, const Granules & granules_to_write);
/// Finishes primary index serialization: write final primary index row (if required) and compute checksums
void finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums, bool sync); void finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums, bool sync);
/// Finishes skip indices serialization: write all accumulated data to disk and compute checksums
void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums, bool sync); void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums, bool sync);
/// Get global number of the current which we are writing (or going to start to write)
size_t getCurrentMark() const { return current_mark; } size_t getCurrentMark() const { return current_mark; }
void setCurrentMark(size_t mark) { current_mark = mark; } void setCurrentMark(size_t mark) { current_mark = mark; }
/// Get unique non ordered skip indices column.
Names getSkipIndicesColumns() const; Names getSkipIndicesColumns() const;
const MergeTreeIndices skip_indices; const MergeTreeIndices skip_indices;
@ -113,13 +137,9 @@ protected:
const bool compute_granularity; const bool compute_granularity;
/// Number of marsk in data from which skip indices have to start
/// aggregation. I.e. it's data mark number, not skip indices mark.
size_t skip_index_data_mark = 0;
std::vector<StreamPtr> skip_indices_streams; std::vector<StreamPtr> skip_indices_streams;
MergeTreeIndexAggregators skip_indices_aggregators; MergeTreeIndexAggregators skip_indices_aggregators;
std::vector<size_t> skip_index_filling; std::vector<size_t> skip_index_accumulated_marks;
std::unique_ptr<WriteBufferFromFileBase> index_file_stream; std::unique_ptr<WriteBufferFromFileBase> index_file_stream;
std::unique_ptr<HashingWriteBuffer> index_stream; std::unique_ptr<HashingWriteBuffer> index_stream;

View File

@ -18,6 +18,7 @@ namespace
namespace namespace
{ {
/// Get granules for block using index_granularity
Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity, size_t block_rows, size_t current_mark, size_t rows_written_in_last_mark) Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity, size_t block_rows, size_t current_mark, size_t rows_written_in_last_mark)
{ {
if (current_mark >= index_granularity.getMarksCount()) if (current_mark >= index_granularity.getMarksCount())
@ -25,6 +26,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
Granules result; Granules result;
size_t current_row = 0; size_t current_row = 0;
/// When our last mark is not finished yet and we have to write in rows into it
if (rows_written_in_last_mark > 0) if (rows_written_in_last_mark > 0)
{ {
size_t rows_left_in_last_mark = index_granularity.getMarkRows(current_mark) - rows_written_in_last_mark; size_t rows_left_in_last_mark = index_granularity.getMarkRows(current_mark) - rows_written_in_last_mark;
@ -35,7 +37,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
.granularity_rows = rows_left_in_last_mark, .granularity_rows = rows_left_in_last_mark,
.block_rows = rest_rows, .block_rows = rest_rows,
.mark_number = current_mark, .mark_number = current_mark,
.mark_on_start = false, .mark_on_start = false, /// Don't mark this granule because we have already marked it
}); });
else else
result.emplace_back(Granule{ result.emplace_back(Granule{
@ -43,16 +45,19 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
.granularity_rows = rows_left_in_last_mark, .granularity_rows = rows_left_in_last_mark,
.block_rows = rows_left_in_last_mark, .block_rows = rows_left_in_last_mark,
.mark_number = current_mark, .mark_number = current_mark,
.mark_on_start = false, .mark_on_start = false, /// Don't mark this granule because we have already marked it
}); });
current_row += rows_left_in_last_mark; current_row += rows_left_in_last_mark;
current_mark++; current_mark++;
} }
/// Calculating normal granules for block
while (current_row < block_rows) while (current_row < block_rows)
{ {
size_t expected_rows = index_granularity.getMarkRows(current_mark); size_t expected_rows = index_granularity.getMarkRows(current_mark);
size_t rest_rows = block_rows - current_row; size_t rest_rows = block_rows - current_row;
/// If we have less rows in block than expected in granularity
/// save incomplete granule
if (rest_rows < expected_rows) if (rest_rows < expected_rows)
result.emplace_back(Granule{ result.emplace_back(Granule{
.start_row = current_row, .start_row = current_row,
@ -153,10 +158,14 @@ IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_written) void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_written)
{ {
auto last_granule = granules_written.back(); auto last_granule = granules_written.back();
/// If we didn't finished last granule than we will continue to write it from new block
if (!last_granule.isCompleted()) if (!last_granule.isCompleted())
{ {
/// Shift forward except last granule
setCurrentMark(getCurrentMark() + granules_written.size() - 1); setCurrentMark(getCurrentMark() + granules_written.size() - 1);
bool still_in_the_same_granule = granules_written.size() == 1; bool still_in_the_same_granule = granules_written.size() == 1;
/// We wrote whole block in the same granule, but didn't finished it.
/// So add written rows to rows written in last_mark
if (still_in_the_same_granule) if (still_in_the_same_granule)
rows_written_in_last_mark += last_granule.block_rows; rows_written_in_last_mark += last_granule.block_rows;
else else
@ -289,14 +298,12 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
WrittenOffsetColumns & offset_columns, WrittenOffsetColumns & offset_columns,
IDataType::SerializeBinaryBulkStatePtr & serialization_state, IDataType::SerializeBinaryBulkStatePtr & serialization_state,
IDataType::SerializeBinaryBulkSettings & serialize_settings, IDataType::SerializeBinaryBulkSettings & serialize_settings,
size_t from_row, const Granule & granule)
size_t number_of_rows,
bool write_marks)
{ {
if (write_marks) if (granule.mark_on_start)
writeSingleMark(name, type, offset_columns, number_of_rows, serialize_settings.path); writeSingleMark(name, type, offset_columns, granule.granularity_rows, serialize_settings.path);
type.serializeBinaryBulkWithMultipleStreams(column, from_row, number_of_rows, serialize_settings, serialization_state); type.serializeBinaryBulkWithMultipleStreams(column, granule.start_row, granule.granularity_rows, serialize_settings, serialization_state);
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one. /// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */) type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
@ -322,6 +329,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
const Granules & granules) const Granules & granules)
{ {
auto [it, inserted] = serialization_states.emplace(name, nullptr); auto [it, inserted] = serialization_states.emplace(name, nullptr);
if (inserted) if (inserted)
{ {
IDataType::SerializeBinaryBulkSettings serialize_settings; IDataType::SerializeBinaryBulkSettings serialize_settings;
@ -347,9 +355,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
offset_columns, offset_columns,
it->second, it->second,
serialize_settings, serialize_settings,
granule.start_row, granule
granule.granularity_rows,
granule.mark_on_start
); );
} }
@ -376,7 +382,6 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,
DB::ReadBufferFromFile mrk_in(mrk_path); DB::ReadBufferFromFile mrk_in(mrk_path);
DB::CompressedReadBufferFromFile bin_in(bin_path, 0, 0, 0); DB::CompressedReadBufferFromFile bin_in(bin_path, 0, 0, 0);
bool must_be_last = false; bool must_be_last = false;
//auto * log = &Poco::Logger::get(storage.getLogName());
UInt64 offset_in_compressed_file = 0; UInt64 offset_in_compressed_file = 0;
UInt64 offset_in_decompressed_block = 0; UInt64 offset_in_decompressed_block = 0;
UInt64 index_granularity_rows = 0; UInt64 index_granularity_rows = 0;

View File

@ -32,7 +32,8 @@ public:
void finish(IMergeTreeDataPart::Checksums & checksums, bool sync) final; void finish(IMergeTreeDataPart::Checksums & checksums, bool sync) final;
private: private:
/// Finish serialization of data: write final mark if required and compute checksums
/// Also validate written data in debug mode
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync); void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync);
/// Write data of one column. /// Write data of one column.
@ -45,7 +46,7 @@ private:
WrittenOffsetColumns & offset_columns, WrittenOffsetColumns & offset_columns,
const Granules & granules); const Granules & granules);
/// Write single granule of one column (rows between 2 marks) /// Write single granule of one column.
void writeSingleGranule( void writeSingleGranule(
const String & name, const String & name,
const IDataType & type, const IDataType & type,
@ -53,22 +54,21 @@ private:
WrittenOffsetColumns & offset_columns, WrittenOffsetColumns & offset_columns,
IDataType::SerializeBinaryBulkStatePtr & serialization_state, IDataType::SerializeBinaryBulkStatePtr & serialization_state,
IDataType::SerializeBinaryBulkSettings & serialize_settings, IDataType::SerializeBinaryBulkSettings & serialize_settings,
size_t from_row, const Granule & granule);
size_t number_of_rows,
bool write_marks);
void flushMarkToFile(
const StreamNameAndMark & stream_with_mark,
size_t rows_in_mark);
/// Take offsets from column and return as MarkInCompressed file with stream name
StreamsWithMarks getCurrentMarksForColumn( StreamsWithMarks getCurrentMarksForColumn(
const String & name, const String & name,
const IDataType & type, const IDataType & type,
WrittenOffsetColumns & offset_columns, WrittenOffsetColumns & offset_columns,
DB::IDataType::SubstreamPath & path); DB::IDataType::SubstreamPath & path);
/// Write mark for column /// Write mark to disk using stream and rows count
void flushMarkToFile(
const StreamNameAndMark & stream_with_mark,
size_t rows_in_mark);
/// Write mark for column taking offsets from column stream
void writeSingleMark( void writeSingleMark(
const String & name, const String & name,
const IDataType & type, const IDataType & type,
@ -88,10 +88,15 @@ private:
const ASTPtr & effective_codec_desc, const ASTPtr & effective_codec_desc,
size_t estimated_size); size_t estimated_size);
/// Method for self check (used in debug-build only). Checks that written
/// data and corresponding marks are consistent. Otherwise throws logical
/// errors.
void validateColumnOfFixedSize(const String & name, const IDataType & type); void validateColumnOfFixedSize(const String & name, const IDataType & type);
void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override; void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override;
/// Use information from just written granules to shift current mark
/// in our index_granularity array.
void shiftCurrentMark(const Granules & granules_written); void shiftCurrentMark(const Granules & granules_written);
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns) const; IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns) const;
@ -104,6 +109,8 @@ private:
using ColumnStreams = std::map<String, StreamPtr>; using ColumnStreams = std::map<String, StreamPtr>;
ColumnStreams column_streams; ColumnStreams column_streams;
/// How many rows we have already written in the current mark.
/// More than zero when incoming blocks are smaller then their granularity.
size_t rows_written_in_last_mark = 0; size_t rows_written_in_last_mark = 0;
}; };