Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-21 23:21:59 +00:00)
Add some comments

commit 8670836573
parent 74c2211510
@@ -72,6 +72,7 @@ void MergeTreeDataPartWriterCompact::addStreams(const String & name, const IData
 namespace
 {
 
+/// Get granules for block using index_granularity
 Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity, size_t block_rows, size_t current_mark, bool last_block)
 {
     if (current_mark >= index_granularity.getMarksCount())
@@ -85,6 +86,8 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
         size_t rest_rows = block_rows - current_row;
         if (rest_rows < expected_rows)
         {
+            /// Invariant: we always have equal amount of rows for block in compact parts because we accumulate them in buffer.
+            /// The only exclusion is the last block, when we cannot accumulate more rows.
             if (!last_block)
                 throw Exception(ErrorCodes::LOGICAL_ERROR, "Required to write {} rows, but only {} rows was written for the non last granule", expected_rows, rest_rows);
 
@@ -98,6 +101,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
         }
         else
         {
+            /// Normal granule with an amount of rows equal to the computed granularity
             result.emplace_back(Granule{
                 .start_row = current_row,
                 .granularity_rows = expected_rows,
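(The three hunks above appear to come from MergeTreeDataPartWriterCompact.cpp, judging by the function context.) The invariant the new comments describe — only the last block may produce an undersized granule, because the compact writer accumulates rows in a buffer — can be illustrated with a minimal standalone sketch. Everything below is illustrative, not ClickHouse code: splitIntoGranules and the fixed granularity parameter stand in for getGranulesToWrite and MergeTreeIndexGranularity.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <stdexcept>
#include <vector>

/// Simplified stand-in for the Granule struct from this commit.
struct Granule
{
    size_t start_row;        /// First row of the granule inside the block
    size_t granularity_rows; /// Rows the granule should contain per the index granularity
    size_t block_rows;       /// Rows the block actually provides from start_row

    bool isCompleted() const { return granularity_rows == block_rows; }
};

/// Cut block_rows into granules of a fixed size. An undersized tail granule
/// is only legal for the last block: compact parts buffer rows, so every
/// non-last block arrives with whole granules.
std::vector<Granule> splitIntoGranules(size_t block_rows, size_t granularity, bool last_block)
{
    std::vector<Granule> result;
    for (size_t current_row = 0; current_row < block_rows;)
    {
        size_t rest_rows = block_rows - current_row;
        if (rest_rows < granularity && !last_block)
            throw std::logic_error("incomplete granule in a non-last block");

        size_t rows = std::min(rest_rows, granularity);
        result.push_back(Granule{current_row, granularity, rows});
        current_row += rows;
    }
    return result;
}

int main()
{
    /// 25 rows at granularity 10 -> granules of 10, 10 and an incomplete 5.
    for (const auto & g : splitIntoGranules(25, 10, /*last_block=*/true))
        std::cout << g.start_row << " " << g.block_rows << " " << g.isCompleted() << "\n";
}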
@@ -23,13 +23,16 @@ public:
     void finish(IMergeTreeDataPart::Checksums & checksums, bool sync) override;
 
 private:
+    /// Finish serialization of the data. Flush rows in buffer to disk, compute checksums.
     void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync);
 
     void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override;
 
+    /// Write block of rows into .bin file and marks in .mrk files
     void writeDataBlock(const Block & block, const Granules & granules);
 
+    /// Write block of rows into .bin file and marks in .mrk files, primary index in .idx file
+    /// and skip indices in their corresponding files.
     void writeDataBlockPrimaryIndexAndSkipIndices(const Block & block, const Granules & granules);
 
     void addToChecksums(MergeTreeDataPartChecksums & checksums);
 
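(This hunk looks like the matching header, MergeTreeDataPartWriterCompact.h.) The commented methods imply a two-phase flow: incoming rows are buffered, whole granules are written out through writeDataBlockPrimaryIndexAndSkipIndices, and finishDataSerialization flushes the incomplete tail. A toy model of that behaviour, with every name below invented for illustration:

#include <cstddef>
#include <iostream>

/// Toy model of the buffering behaviour the header comments describe:
/// write() accumulates rows and emits only whole granules; finish()
/// flushes whatever is left (the "last block" exception).
class ToyCompactWriter
{
public:
    explicit ToyCompactWriter(size_t granularity_) : granularity(granularity_) {}

    void write(size_t rows) /// analogous to buffering one incoming block
    {
        buffered += rows;
        while (buffered >= granularity)
        {
            std::cout << "write granule of " << granularity << " rows\n";
            buffered -= granularity;
        }
    }

    void finish() /// analogous to finishDataSerialization()
    {
        if (buffered > 0)
            std::cout << "flush tail granule of " << buffered << " rows\n";
        buffered = 0;
    }

private:
    size_t granularity;
    size_t buffered = 0;
};

int main()
{
    ToyCompactWriter writer(8192);
    writer.write(5000);
    writer.write(5000); /// emits one full granule, keeps 1808 rows buffered
    writer.finish();    /// flushes the incomplete tail
}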
@@ -167,7 +167,7 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
                 default_codec, settings.max_compress_block_size,
                 0, settings.aio_threshold));
         skip_indices_aggregators.push_back(index_helper->createIndexAggregator());
-        skip_index_filling.push_back(0);
+        skip_index_accumulated_marks.push_back(0);
     }
 }
 
@@ -221,7 +221,6 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
         if (skip_indices_aggregators[i]->empty() && granule.mark_on_start)
         {
             skip_indices_aggregators[i] = index_helper->createIndexAggregator();
-            skip_index_filling[i] = 0;
 
             if (stream.compressed.offset() >= settings.min_compress_block_size)
                 stream.compressed.next();
@@ -238,13 +237,13 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
             skip_indices_aggregators[i]->update(skip_indexes_block, &pos, granule.granularity_rows);
             if (granule.isCompleted())
             {
-                ++skip_index_filling[i];
+                ++skip_index_accumulated_marks[i];
 
                 /// write index if it is filled
-                if (skip_index_filling[i] == index_helper->index.granularity)
+                if (skip_index_accumulated_marks[i] == index_helper->index.granularity)
                 {
                     skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
-                    skip_index_filling[i] = 0;
+                    skip_index_accumulated_marks[i] = 0;
                 }
             }
         }
@@ -302,7 +301,7 @@ void MergeTreeDataPartWriterOnDisk::finishSkipIndicesSerialization(
 
     skip_indices_streams.clear();
     skip_indices_aggregators.clear();
-    skip_index_filling.clear();
+    skip_index_accumulated_marks.clear();
 }
 
 Names MergeTreeDataPartWriterOnDisk::getSkipIndicesColumns() const
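(These three hunks are apparently MergeTreeDataPartWriterOnDisk.cpp.) The rename from skip_index_filling to skip_index_accumulated_marks matches what the counter actually holds: the number of completed data marks aggregated into the current skip-index granule, not an amount of rows. A rough standalone model of the flush condition:

#include <cstddef>
#include <iostream>

/// Toy model of the flush condition in calculateAndSerializeSkipIndices:
/// one skip-index granule aggregates `granularity` completed data marks
/// before it is serialized. All names here are illustrative.
int main()
{
    const size_t granularity = 3; /// plays the role of index_helper->index.granularity
    size_t accumulated_marks = 0; /// plays the role of skip_index_accumulated_marks[i]

    for (size_t mark = 0; mark < 10; ++mark) /// completed data marks arriving
    {
        ++accumulated_marks;
        if (accumulated_marks == granularity)
        {
            std::cout << "serialize skip index granule ending at mark " << mark << "\n";
            accumulated_marks = 0; /// aggregator is reset, counting restarts
        }
    }
    /// The leftover mark stays accumulated; in the real writer,
    /// finishSkipIndicesSerialization flushes it at the end.
}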
@@ -14,20 +14,34 @@
 namespace DB
 {
 
+/// Single unit for writing data to disk. Contains information about
+/// the amount of rows to write and marks.
 struct Granule
 {
+    /// Start row in block for granule
     size_t start_row;
+    /// Amount of rows which the granule has to contain according to index
+    /// granularity.
+    /// NOTE: Sometimes it's not equal to the actually written rows, for example
+    /// for the last granule, if it's smaller than the computed granularity.
     size_t granularity_rows;
+    /// Amount of rows from block which have to be written to disk from start_row
     size_t block_rows;
+    /// Global mark number in the list of all marks (index_granularity) for this part
     size_t mark_number;
+    /// Should the writer write a mark for the first row of this granule to disk.
+    /// NOTE: Sometimes we don't write a mark for the start row, because
+    /// this granule can be a continuation of the previous one.
     bool mark_on_start;
 
+    /// Does this granule contain an amount of rows equal to the value in the index granularity
     bool isCompleted() const
     {
         return granularity_rows == block_rows;
     }
 };
 
+/// Multiple granules to write for a concrete block.
 using Granules = std::vector<Granule>;
 
 /// Writes data part to disk in different formats.
@@ -90,19 +104,29 @@ public:
     {
         written_offset_columns = written_offset_columns_;
     }
 
 protected:
+    /// Count index_granularity for block and store in `index_granularity`
     size_t computeIndexGranularity(const Block & block) const;
 
+    /// Write primary index according to granules_to_write
     void calculateAndSerializePrimaryIndex(const Block & primary_index_block, const Granules & granules_to_write);
+    /// Write skip indices according to granules_to_write. Skip indices also have their own marks,
+    /// and one skip index granule can contain multiple "normal" marks. So skip indices serialization
+    /// requires additional state: skip_indices_aggregators and skip_index_accumulated_marks
     void calculateAndSerializeSkipIndices(const Block & skip_indexes_block, const Granules & granules_to_write);
 
+    /// Finishes primary index serialization: write final primary index row (if required) and compute checksums
     void finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums, bool sync);
+    /// Finishes skip indices serialization: write all accumulated data to disk and compute checksums
     void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums, bool sync);
 
+    /// Get the global number of the current mark which we are writing (or going to start to write)
     size_t getCurrentMark() const { return current_mark; }
 
     void setCurrentMark(size_t mark) { current_mark = mark; }
 
+    /// Get unique non-ordered skip indices columns.
     Names getSkipIndicesColumns() const;
 
     const MergeTreeIndices skip_indices;
@@ -113,13 +137,9 @@ protected:
 
     const bool compute_granularity;
 
+    /// Number of marks in data from which skip indices have to start
+    /// aggregation. I.e. it's the data mark number, not the skip indices mark.
     size_t skip_index_data_mark = 0;
 
     std::vector<StreamPtr> skip_indices_streams;
     MergeTreeIndexAggregators skip_indices_aggregators;
-    std::vector<size_t> skip_index_filling;
+    std::vector<size_t> skip_index_accumulated_marks;
 
     std::unique_ptr<WriteBufferFromFileBase> index_file_stream;
     std::unique_ptr<HashingWriteBuffer> index_stream;
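(These hunks look like MergeTreeDataPartWriterOnDisk.h.) The key distinction the new comments draw is between granularity_rows (what the granule must eventually contain) and block_rows (what the current block can contribute): isCompleted() is simply their equality. A small standalone check of that semantics, with made-up row numbers (designated initializers as in the diff itself; requires C++20):

#include <cassert>
#include <cstddef>

/// The Granule struct from the diff, reproduced for a standalone check of
/// the isCompleted() semantics (logic unchanged, comments shortened).
struct Granule
{
    size_t start_row;
    size_t granularity_rows; /// rows the granule must contain per index granularity
    size_t block_rows;       /// rows the current block can actually provide
    size_t mark_number;
    bool mark_on_start;

    bool isCompleted() const { return granularity_rows == block_rows; }
};

int main()
{
    /// A block with 10000 rows against a granularity of 8192:
    Granule full{.start_row = 0, .granularity_rows = 8192, .block_rows = 8192,
                 .mark_number = 0, .mark_on_start = true};
    Granule tail{.start_row = 8192, .granularity_rows = 8192, .block_rows = 1808,
                 .mark_number = 1, .mark_on_start = true};

    assert(full.isCompleted());  /// mark 0 is finished by this block
    assert(!tail.isCompleted()); /// mark 1 must be continued by the next block
}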
@@ -18,6 +18,7 @@ namespace
 namespace
 {
 
+/// Get granules for block using index_granularity
 Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity, size_t block_rows, size_t current_mark, size_t rows_written_in_last_mark)
 {
     if (current_mark >= index_granularity.getMarksCount())
@@ -25,6 +26,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
 
     Granules result;
     size_t current_row = 0;
+    /// When our last mark is not finished yet and we have to write rows into it
     if (rows_written_in_last_mark > 0)
     {
         size_t rows_left_in_last_mark = index_granularity.getMarkRows(current_mark) - rows_written_in_last_mark;
|
||||
.granularity_rows = rows_left_in_last_mark,
|
||||
.block_rows = rest_rows,
|
||||
.mark_number = current_mark,
|
||||
.mark_on_start = false,
|
||||
.mark_on_start = false, /// Don't mark this granule because we have already marked it
|
||||
});
|
||||
else
|
||||
result.emplace_back(Granule{
|
||||
@@ -43,16 +45,19 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
                 .granularity_rows = rows_left_in_last_mark,
                 .block_rows = rows_left_in_last_mark,
                 .mark_number = current_mark,
-                .mark_on_start = false,
+                .mark_on_start = false, /// Don't mark this granule, because we have already marked it
             });
         current_row += rows_left_in_last_mark;
         current_mark++;
     }
 
+    /// Calculating normal granules for the block
     while (current_row < block_rows)
     {
         size_t expected_rows = index_granularity.getMarkRows(current_mark);
         size_t rest_rows = block_rows - current_row;
+        /// If we have fewer rows in the block than expected by the granularity,
+        /// save an incomplete granule
         if (rest_rows < expected_rows)
             result.emplace_back(Granule{
                 .start_row = current_row,
@@ -153,10 +158,14 @@ IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
 void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_written)
 {
     auto last_granule = granules_written.back();
+    /// If we didn't finish the last granule, then we will continue to write it from the new block
     if (!last_granule.isCompleted())
     {
+        /// Shift forward except the last granule
         setCurrentMark(getCurrentMark() + granules_written.size() - 1);
         bool still_in_the_same_granule = granules_written.size() == 1;
+        /// We wrote the whole block in the same granule, but didn't finish it.
+        /// So add the written rows to the rows written in last_mark
         if (still_in_the_same_granule)
             rows_written_in_last_mark += last_granule.block_rows;
         else
@@ -289,14 +298,12 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
     WrittenOffsetColumns & offset_columns,
     IDataType::SerializeBinaryBulkStatePtr & serialization_state,
     IDataType::SerializeBinaryBulkSettings & serialize_settings,
-    size_t from_row,
-    size_t number_of_rows,
-    bool write_marks)
+    const Granule & granule)
 {
-    if (write_marks)
-        writeSingleMark(name, type, offset_columns, number_of_rows, serialize_settings.path);
+    if (granule.mark_on_start)
+        writeSingleMark(name, type, offset_columns, granule.granularity_rows, serialize_settings.path);
 
-    type.serializeBinaryBulkWithMultipleStreams(column, from_row, number_of_rows, serialize_settings, serialization_state);
+    type.serializeBinaryBulkWithMultipleStreams(column, granule.start_row, granule.granularity_rows, serialize_settings, serialization_state);
 
     /// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
     type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
@@ -322,6 +329,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
+    const Granules & granules)
 {
     auto [it, inserted] = serialization_states.emplace(name, nullptr);
 
     if (inserted)
     {
         IDataType::SerializeBinaryBulkSettings serialize_settings;
@@ -347,9 +355,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
             offset_columns,
             it->second,
             serialize_settings,
-            granule.start_row,
-            granule.granularity_rows,
-            granule.mark_on_start
+            granule
         );
     }
 
@@ -376,7 +382,6 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,
     DB::ReadBufferFromFile mrk_in(mrk_path);
     DB::CompressedReadBufferFromFile bin_in(bin_path, 0, 0, 0);
     bool must_be_last = false;
-    //auto * log = &Poco::Logger::get(storage.getLogName());
     UInt64 offset_in_compressed_file = 0;
     UInt64 offset_in_decompressed_block = 0;
     UInt64 index_granularity_rows = 0;
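(These hunks appear to be MergeTreeDataPartWriterWide.cpp.) The bookkeeping the new comments describe: when a block ends mid-granule, the writer stays on the same mark and remembers in rows_written_in_last_mark how many rows that mark already holds, so the next block can continue the granule. Below is a simplified sketch; the diff cuts off at the `else` branch, so the handling of the multi-granule and fully-completed cases is an assumption:

#include <cstddef>
#include <iostream>
#include <vector>

/// Simplified granule record, as in the diff.
struct Granule
{
    size_t block_rows;       /// rows this block contributed to the granule
    size_t granularity_rows; /// rows the granule should contain in total
    bool isCompleted() const { return granularity_rows == block_rows; }
};

/// Toy version of MergeTreeDataPartWriterWide::shiftCurrentMark: advance
/// current_mark past finished granules and track the partially filled tail.
void shiftCurrentMark(const std::vector<Granule> & granules_written,
                      size_t & current_mark, size_t & rows_written_in_last_mark)
{
    const Granule & last = granules_written.back();
    if (!last.isCompleted())
    {
        /// Move past every granule except the unfinished last one.
        current_mark += granules_written.size() - 1;
        if (granules_written.size() == 1)
            rows_written_in_last_mark += last.block_rows; /// still the same mark
        else
            rows_written_in_last_mark = last.block_rows;  /// a new, partial mark (assumed)
    }
    else
    {
        /// Assumed: all granules finished, start the next mark from scratch.
        current_mark += granules_written.size();
        rows_written_in_last_mark = 0;
    }
}

int main()
{
    size_t current_mark = 0, rows_in_last = 0;
    /// A block that fills mark 0 and half of mark 1:
    shiftCurrentMark({{8192, 8192}, {4096, 8192}}, current_mark, rows_in_last);
    std::cout << current_mark << " " << rows_in_last << "\n"; /// prints: 1 4096
}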
@@ -32,7 +32,8 @@ public:
     void finish(IMergeTreeDataPart::Checksums & checksums, bool sync) final;
 
 private:
     /// Finish serialization of data: write final mark if required and compute checksums
+    /// Also validate written data in debug mode
     void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync);
 
     /// Write data of one column.
@@ -45,7 +46,7 @@ private:
         WrittenOffsetColumns & offset_columns,
         const Granules & granules);
 
-    /// Write single granule of one column (rows between 2 marks)
+    /// Write single granule of one column.
     void writeSingleGranule(
         const String & name,
         const IDataType & type,
@@ -53,22 +54,21 @@ private:
         WrittenOffsetColumns & offset_columns,
         IDataType::SerializeBinaryBulkStatePtr & serialization_state,
         IDataType::SerializeBinaryBulkSettings & serialize_settings,
-        size_t from_row,
-        size_t number_of_rows,
-        bool write_marks);
-
-    void flushMarkToFile(
-        const StreamNameAndMark & stream_with_mark,
-        size_t rows_in_mark);
+        const Granule & granule);
 
     /// Take offsets from column and return as MarkInCompressed file with stream name
     StreamsWithMarks getCurrentMarksForColumn(
         const String & name,
         const IDataType & type,
         WrittenOffsetColumns & offset_columns,
         DB::IDataType::SubstreamPath & path);
 
-    /// Write mark for column
+    /// Write mark to disk using stream and rows count
+    void flushMarkToFile(
+        const StreamNameAndMark & stream_with_mark,
+        size_t rows_in_mark);
+
+    /// Write mark for column taking offsets from column stream
     void writeSingleMark(
         const String & name,
         const IDataType & type,
@@ -88,10 +88,15 @@ private:
         const ASTPtr & effective_codec_desc,
         size_t estimated_size);
 
+    /// Method for self-check (used in debug builds only). Checks that the written
+    /// data and the corresponding marks are consistent. Otherwise throws logical
+    /// errors.
     void validateColumnOfFixedSize(const String & name, const IDataType & type);
 
     void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override;
 
+    /// Use information from the just-written granules to shift the current mark
+    /// in our index_granularity array.
     void shiftCurrentMark(const Granules & granules_written);
 
     IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns) const;
@@ -104,6 +109,8 @@ private:
     using ColumnStreams = std::map<String, StreamPtr>;
     ColumnStreams column_streams;
 
+    /// How many rows we have already written in the current mark.
+    /// More than zero when incoming blocks are smaller than their granularity.
    size_t rows_written_in_last_mark = 0;
 };
 
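(The final hunks look like MergeTreeDataPartWriterWide.h.) The self-check idea behind validateColumnOfFixedSize can be shown in miniature: for a fixed-size column, each mark's offset must agree with the cumulative row count times the value size. This is a loose illustration only; the real method also tracks offsets into the compressed file and reads the actual .mrk/.bin streams:

#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

/// Toy analogue of the mark/data consistency check: the uncompressed offset
/// stored in each mark must equal rows-so-far times the fixed value size.
struct Mark
{
    uint64_t offset_in_decompressed_block;
    uint64_t rows_in_granule;
};

void validateFixedSizeColumn(const std::vector<Mark> & marks, size_t value_size)
{
    uint64_t expected_offset = 0;
    for (const auto & mark : marks)
    {
        if (mark.offset_in_decompressed_block != expected_offset)
            throw std::logic_error("mark does not point where the data says it should");
        expected_offset += mark.rows_in_granule * value_size;
    }
}

int main()
{
    /// Two granules of UInt64 values, 8192 rows each: offsets 0 and 65536.
    validateFixedSizeColumn({{0, 8192}, {65536, 8192}}, sizeof(uint64_t));
}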