Compilable code!

alesapin 2018-11-30 18:36:10 +03:00
parent f83ed167ac
commit 5524b77915
5 changed files with 252 additions and 113 deletions

dbms/src/DataStreams/MergingSortedBlockInputStream.cpp

@@ -114,7 +114,7 @@ Block MergingSortedBlockInputStream::readImpl()
 template <typename TSortCursor>
 void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue)
 {
-    size_t order = current.impl->order;
+    size_t order = current->order;
     size_t size = cursors.size();
 
     if (order >= size || &cursors[order] != current.impl)
@@ -130,6 +130,16 @@ void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current,
     }
 }
 
+namespace
+{
+size_t getAvgGranularity(const std::unordered_map<size_t, size_t> & rows_granularity, size_t total_rows)
+{
+    size_t sum = 0;
+    for (const auto [granularity, rows_num] : rows_granularity)
+        sum += granularity * rows_num;
+    return sum / total_rows;
+}
+}
 
 template
 void MergingSortedBlockInputStream::fetchNextBlock<SortCursor>(const SortCursor & current, std::priority_queue<SortCursor> & queue);
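Note (not part of the diff): getAvgGranularity computes a row-weighted average over a histogram that maps granularity -> number of rows taken from blocks with that granularity. A minimal standalone sketch of the same arithmetic; the sample values are illustrative, not from this commit:

    #include <cstddef>
    #include <iostream>
    #include <unordered_map>

    size_t getAvgGranularity(const std::unordered_map<size_t, size_t> & rows_granularity, size_t total_rows)
    {
        size_t sum = 0;
        for (const auto [granularity, rows_num] : rows_granularity)
            sum += granularity * rows_num;
        return sum / total_rows;
    }

    int main()
    {
        /// 6000 rows merged from blocks with granularity 8192 and 2000 rows from blocks
        /// with granularity 4096: (8192 * 6000 + 4096 * 2000) / 8000 == 7168.
        std::cout << getAvgGranularity({{8192, 6000}, {4096, 2000}}, 8000) << '\n';
    }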
@@ -142,10 +152,11 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
 {
     size_t merged_rows = 0;
+    std::unordered_map<size_t, size_t> rows_granularity;
 
     /** Increase row counters.
       * Return true if it's time to finish generating the current data block.
       */
-    auto count_row_and_check_limit = [&, this]()
+    auto count_row_and_check_limit = [&, this](size_t current_granularity)
     {
         ++total_merged_rows;
         if (limit && total_merged_rows == limit)
@@ -156,8 +167,9 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
             return true;
         }
 
+        rows_granularity[current_granularity]++;
         ++merged_rows;
-        if (merged_rows == max_block_size)
+        if (merged_rows == getAvgGranularity(rows_granularity, merged_rows))
         {
             // std::cerr << "max_block_size reached\n";
             return true;
@@ -170,6 +182,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
     while (!queue.empty())
     {
         TSortCursor current = queue.top();
+        size_t current_block_granularity = current->rows;
         queue.pop();
 
         while (true)
@@ -177,7 +190,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
             /** And what if the block is totally less or equal than the rest for the current cursor?
               * Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
               */
-            if (current.impl->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top())))
+            if (current->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top())))
             {
                 // std::cerr << "current block is totally less or equals\n";
@@ -189,8 +202,8 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
                 return;
             }
 
-            /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
-            size_t source_num = current.impl->order;
+            /// Actually, current->order stores source number (i.e. cursors[current->order] == current)
+            size_t source_num = current->order;
 
             if (source_num >= cursors.size())
                 throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
@@ -202,6 +215,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
             merged_rows = merged_columns.at(0)->size();
 
+            /// Limit output
             if (limit && total_merged_rows + merged_rows > limit)
             {
                 merged_rows = limit - total_merged_rows;
@@ -215,6 +229,8 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
                 finished = true;
             }
 
+            /// Write order of rows for other columns,
+            /// this data will be used in the gather stream.
             if (out_row_sources_buf)
             {
                 RowSourcePart row_source(source_num);
@@ -237,7 +253,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
             if (out_row_sources_buf)
             {
                 /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
-                RowSourcePart row_source(current.impl->order);
+                RowSourcePart row_source(current->order);
                 out_row_sources_buf->write(row_source.data);
             }
 
@@ -248,7 +264,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
             if (queue.empty() || !(current.greater(queue.top())))
             {
-                if (count_row_and_check_limit())
+                if (count_row_and_check_limit(current_block_granularity))
                 {
                     // std::cerr << "pushing back to queue\n";
                     queue.push(current);
@@ -275,7 +291,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
                 break;
             }
 
-            if (count_row_and_check_limit())
+            if (count_row_and_check_limit(current_block_granularity))
                 return;
         }

dbms/src/Storages/MergeTree/MergeTreeData.cpp

@@ -1248,7 +1248,14 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
          */
         IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
         MergedColumnOnlyOutputStream out(
-            *this, in.getHeader(), full_path + part->name + '/', true /* sync */, compression_settings, true /* skip_offsets */, unused_written_offsets);
+            *this,
+            in.getHeader(),
+            full_path + part->name + '/',
+            true /* sync */,
+            compression_settings,
+            true /* skip_offsets */,
+            unused_written_offsets,
+            part->marks_index_granularity);
 
         in.readPrefix();
         out.writePrefix();

dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp

@@ -700,8 +700,17 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
     if (deduplicate)
         merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, SizeLimits(), 0 /*limit_hint*/, Names());
 
+    /// If the merge is vertical we cannot calculate the index granularity here
+    bool calculate_index_granularity = (merge_alg != MergeAlgorithm::Vertical);
+
     MergedBlockOutputStream to{
-        data, new_part_tmp_path, merging_columns, compression_settings, merged_column_to_size, data.settings.min_merge_bytes_to_use_direct_io};
+        data,
+        new_part_tmp_path,
+        merging_columns,
+        compression_settings,
+        merged_column_to_size,
+        data.settings.min_merge_bytes_to_use_direct_io,
+        calculate_index_granularity};
 
     merged_stream->readPrefix();
     to.writePrefix();
@@ -786,7 +795,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
             rows_sources_read_buf.seek(0, 0);
             ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);
             MergedColumnOnlyOutputStream column_to(
-                data, column_gathered_stream.getHeader(), new_part_tmp_path, false, compression_settings, false, written_offset_columns);
+                data, column_gathered_stream.getHeader(), new_part_tmp_path, false, compression_settings, false, written_offset_columns, to.getIndexGranularity());
             size_t column_elems_written = 0;
 
             column_to.writePrefix();
@@ -963,7 +972,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTemporaryPart(
         IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
         MergedColumnOnlyOutputStream out(
-            data, in_header, new_part_tmp_path, /* sync = */ false, compression_settings, /* skip_offsets = */ false, unused_written_offsets);
+            data, in_header, new_part_tmp_path, /* sync = */ false, compression_settings, /* skip_offsets = */ false, unused_written_offsets, source_part->marks_index_granularity);
 
         in->readPrefix();
         out.writePrefix();
@@ -1002,6 +1011,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTemporaryPart(
         new_data_part->rows_count = source_part->rows_count;
         new_data_part->marks_count = source_part->marks_count;
+        new_data_part->marks_index_granularity = source_part->marks_index_granularity;
         new_data_part->index = source_part->index;
         new_data_part->partition.assign(source_part->partition);
         new_data_part->minmax_idx = source_part->minmax_idx;
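Note (not part of the diff): the two stages treat granularity differently because only the horizontal stage writes all columns at once. During a vertical merge the gathered columns must land on exactly the marks already produced for the key columns, hence calculate_index_granularity above, the reuse of to.getIndexGranularity(), and mutatePartToTemporaryPart copying marks_index_granularity from the source part. A toy illustration of the invariant (stand-in values, not commit code):

    #include <cassert>
    #include <cstddef>
    #include <vector>

    int main()
    {
        /// Granularity computed while merging the key columns (hypothetical values).
        std::vector<size_t> horizontal_stage{8192, 8192, 1024};

        /// The vertical stage reuses it verbatim, as to.getIndexGranularity() is reused above,
        /// so every column of the resulting part shares one marks layout.
        std::vector<size_t> vertical_stage = horizontal_stage;

        assert(vertical_stage == horizontal_stage);
    }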

dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp

@@ -32,16 +32,18 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
     size_t max_compress_block_size_,
     CompressionSettings compression_settings_,
     size_t aio_threshold_,
+    bool blocks_are_granules_size_,
     const std::vector<size_t> & index_granularity_)
-    : storage(storage_),
-    min_compress_block_size(min_compress_block_size_),
-    max_compress_block_size(max_compress_block_size_),
-    aio_threshold(aio_threshold_),
-    compression_settings(compression_settings_),
-    marks_file_extension(storage.index_granularity_bytes == 0 ? FIXED_MARKS_FILE_EXTENSION : ADAPTIVE_MARKS_FILE_EXTENSION),
-    mark_size_in_bytes(storage.index_granularity_bytes == 0 ? FIXED_MARK_BYTE_SIZE : ADAPTIVE_MARK_BYTE_SIZE),
-    index_granularity(index_granularity_),
-    compute_granularity_unknown(index_granularity.empty())
+    : storage(storage_)
+    , min_compress_block_size(min_compress_block_size_)
+    , max_compress_block_size(max_compress_block_size_)
+    , aio_threshold(aio_threshold_)
+    , compression_settings(compression_settings_)
+    , index_granularity(index_granularity_)
+    , marks_file_extension(storage.index_granularity_bytes == 0 ? FIXED_MARKS_FILE_EXTENSION : ADAPTIVE_MARKS_FILE_EXTENSION)
+    , mark_size_in_bytes(storage.index_granularity_bytes == 0 ? FIXED_MARK_BYTE_SIZE : ADAPTIVE_MARK_BYTE_SIZE)
+    , blocks_are_granules_size(blocks_are_granules_size_)
+    , compute_granularity(index_granularity.empty())
 {
 }
@@ -98,73 +100,43 @@ IDataType::OutputStreamGetter IMergedBlockOutputStream::createStreamGetter(
     };
 }
 
-size_t IMergedBlockOutputStream::getBlockIndexGranularity(const Block & block) const
+void IMergedBlockOutputStream::fillIndexGranularity(const Block & block)
 {
+    size_t rows = block.rows();
+    size_t index_granularity_for_block;
     if (storage.index_granularity_bytes == 0)
-        return storage.index_granularity;
-
-    size_t block_size_in_memory = block.allocatedBytes();
-    return std::max(block_size_in_memory / storage.index_granularity_bytes, 1);
+        index_granularity_for_block = storage.index_granularity;
+    else
+    {
+        size_t block_size_in_memory = block.allocatedBytes();
+        if (blocks_are_granules_size)
+            index_granularity_for_block = rows;
+        else
+            index_granularity_for_block = std::max(block_size_in_memory / storage.index_granularity_bytes, 1);
+    }
+
+    size_t current_row = 0;
+    while (current_row < rows)
+    {
+        index_granularity.push_back(index_granularity_for_block);
+        ++current_row;
+    }
 }
 
-void IMergedBlockOutputStream::writeData(
+size_t IMergedBlockOutputStream::writeSingleGranule(
     const String & name,
     const IDataType & type,
     const IColumn & column,
     WrittenOffsetColumns & offset_columns,
     bool skip_offsets,
     IDataType::SerializeBinaryBulkStatePtr & serialization_state,
-    size_t index_granularity)
+    IDataType::SerializeBinaryBulkSettings & serialize_settings,
+    size_t from_row,
+    size_t number_of_rows,
+    bool write_marks)
 {
-    auto & settings = storage.context.getSettingsRef();
-    IDataType::SerializeBinaryBulkSettings serialize_settings;
-    serialize_settings.getter = createStreamGetter(name, offset_columns, skip_offsets);
-    serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
-    serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
-
-    size_t size = column.size();
-    size_t prev_mark = 0;
-    while (prev_mark < size)
+    if (write_marks)
     {
-        size_t limit = 0;
-
-        /// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
-        if (prev_mark == 0 && index_offset != 0)
-            limit = index_offset;
-        else
-        {
-            limit = index_granularity;
-
-            /// Write marks.
-            type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
-            {
-                bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
-                if (is_offsets && skip_offsets)
-                    return;
-
-                String stream_name = IDataType::getFileNameForStream(name, substream_path);
-
-                /// Don't write offsets more than one time for Nested type.
-                if (is_offsets && offset_columns.count(stream_name))
-                    return;
-
-                ColumnStream & stream = *column_streams[stream_name];
-
-                /// There could already be enough data to compress into the new block.
-                if (stream.compressed.offset() >= min_compress_block_size)
-                    stream.compressed.next();
-
-                writeIntBinary(stream.plain_hashing.count(), stream.marks);
-                writeIntBinary(stream.compressed.offset(), stream.marks);
-                if (stream.marks_file_extension != FIXED_MARKS_FILE_EXTENSION)
-                    writeIntBinary(index_granularity, stream.marks);
-            }, serialize_settings.path);
-        }
-
-        type.serializeBinaryBulkWithMultipleStreams(column, prev_mark, limit, serialize_settings, serialization_state);
-
-        /// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
+        /// Write marks.
         type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
         {
             bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
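Note (not part of the diff): every mark written here is a pair of offsets — the position in the compressed file and the offset inside the decompressed block — and the adaptive marks format appends a third number for the granularity, which is what the extra writeIntBinary call emits when the extension is not FIXED_MARKS_FILE_EXTENSION. A hedged standalone sketch of that on-disk layout; the struct and function are mine, not ClickHouse's:

    #include <cstdint>
    #include <cstring>
    #include <vector>

    struct Mark
    {
        uint64_t offset_in_compressed_file;    /// stream.plain_hashing.count() above
        uint64_t offset_in_decompressed_block; /// stream.compressed.offset() above
        uint64_t rows_in_granule;              /// third value, adaptive format only
    };

    void writeMark(std::vector<char> & out, const Mark & mark, bool adaptive)
    {
        auto write_u64 = [&](uint64_t value)
        {
            char buf[sizeof(value)];
            std::memcpy(buf, &value, sizeof(value)); /// raw little-endian bytes, as writeIntBinary produces on x86
            out.insert(out.end(), buf, buf + sizeof(value));
        };
        write_u64(mark.offset_in_compressed_file);
        write_u64(mark.offset_in_decompressed_block);
        if (adaptive)
            write_u64(mark.rows_in_granule);
    }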
@@ -177,10 +149,93 @@ void IMergedBlockOutputStream::writeData(
             if (is_offsets && offset_columns.count(stream_name))
                 return;
 
-            column_streams[stream_name]->compressed.nextIfAtEnd();
-        }, serialize_settings.path);
+            ColumnStream & stream = *column_streams[stream_name];
 
-        prev_mark += limit;
+            /// There could already be enough data to compress into the new block.
+            if (stream.compressed.offset() >= min_compress_block_size)
+                stream.compressed.next();
+
+            writeIntBinary(stream.plain_hashing.count(), stream.marks);
+            writeIntBinary(stream.compressed.offset(), stream.marks);
+            if (stream.marks_file_extension != FIXED_MARKS_FILE_EXTENSION)
+                writeIntBinary(index_granularity, stream.marks);
+        }, serialize_settings.path);
+    }
+
+    type.serializeBinaryBulkWithMultipleStreams(column, from_row, number_of_rows, serialize_settings, serialization_state);
+
+    /// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
+    type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
+    {
+        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
+        if (is_offsets && skip_offsets)
+            return;
+
+        String stream_name = IDataType::getFileNameForStream(name, substream_path);
+
+        /// Don't write offsets more than one time for Nested type.
+        if (is_offsets && offset_columns.count(stream_name))
+            return;
+
+        column_streams[stream_name]->compressed.nextIfAtEnd();
+    }, serialize_settings.path);
+
+    return from_row + number_of_rows;
+}
+
+std::pair<size_t, size_t> IMergedBlockOutputStream::writeColumn(
+    const String & name,
+    const IDataType & type,
+    const IColumn & column,
+    WrittenOffsetColumns & offset_columns,
+    bool skip_offsets,
+    IDataType::SerializeBinaryBulkStatePtr & serialization_state,
+    size_t from_mark)
+{
+    auto & settings = storage.context.getSettingsRef();
+    IDataType::SerializeBinaryBulkSettings serialize_settings;
+    serialize_settings.getter = createStreamGetter(name, offset_columns, skip_offsets);
+    serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
+    serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
+
+    size_t total_rows = column.size();
+    size_t current_row = 0;
+    size_t current_column_mark = from_mark;
+    while (current_row < total_rows)
+    {
+        size_t rows_to_write;
+        bool write_marks = true;
+
+        /// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
+        if (current_row == 0 && index_offset != 0)
+        {
+            write_marks = false;
+            rows_to_write = index_offset;
+        }
+        else
+        {
+            if (index_granularity.size() <= current_column_mark)
+                throw Exception(
+                    "Incorrect size of index granularity expect mark " + toString(current_column_mark) + " totally have marks " + toString(index_granularity.size()),
+                    ErrorCodes::LOGICAL_ERROR);
+            rows_to_write = index_granularity[current_column_mark];
+        }
+
+        current_row = writeSingleGranule(
+            name,
+            type,
+            column,
+            offset_columns,
+            skip_offsets,
+            serialization_state,
+            serialize_settings,
+            current_row,
+            rows_to_write,
+            write_marks
+        );
+
+        current_column_mark++;
     }
 
     /// Memoize offsets for Nested types, that are already written. They will not be written again for next columns of Nested structure.
@@ -193,6 +248,8 @@ void IMergedBlockOutputStream::writeData(
             offset_columns.insert(stream_name);
         }
     }, serialize_settings.path);
+
+    return std::make_pair(current_column_mark - from_mark - 1, current_row - total_rows);
 }
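Note (not part of the diff): the rewrite reconstructed above splits the old writeData loop in two. writeColumn decides, mark by mark, how many rows the next granule gets — with the index_offset special case finishing the previous block's unfinished granule without writing a new mark — and writeSingleGranule serializes one such slice. A compact standalone model of the slicing decision; the function name is mine, the logic follows the loop above (including the mark counter advancing on every iteration):

    #include <cstddef>
    #include <stdexcept>
    #include <utility>
    #include <vector>

    /// Returns the (from_row, rows_to_write) slices a column of total_rows rows
    /// is cut into; the first slice writes no mark when index_offset != 0.
    std::vector<std::pair<size_t, size_t>> planGranules(
        size_t total_rows, size_t index_offset, size_t from_mark,
        const std::vector<size_t> & index_granularity)
    {
        std::vector<std::pair<size_t, size_t>> slices;
        size_t current_row = 0;
        size_t current_mark = from_mark;
        while (current_row < total_rows)
        {
            size_t rows_to_write;
            if (current_row == 0 && index_offset != 0)
                rows_to_write = index_offset; /// finish the previous block's last granule
            else
            {
                if (index_granularity.size() <= current_mark)
                    throw std::logic_error("not enough granularity entries");
                rows_to_write = index_granularity[current_mark];
            }
            slices.emplace_back(current_row, rows_to_write);
            current_row += rows_to_write; /// writeSingleGranule returns from_row + number_of_rows
            ++current_mark;
        }
        return slices;
    }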
@@ -251,11 +308,15 @@ MergedBlockOutputStream::MergedBlockOutputStream(
     MergeTreeData & storage_,
     String part_path_,
     const NamesAndTypesList & columns_list_,
-    CompressionSettings compression_settings)
+    CompressionSettings compression_settings,
+    bool blocks_are_granules_size_,
+    const std::vector<size_t> & index_granularity_)
     : IMergedBlockOutputStream(
         storage_, storage_.context.getSettings().min_compress_block_size,
         storage_.context.getSettings().max_compress_block_size, compression_settings,
-        storage_.context.getSettings().min_bytes_to_use_direct_io),
+        storage_.context.getSettings().min_bytes_to_use_direct_io,
+        blocks_are_granules_size_,
+        index_granularity_),
     columns_list(columns_list_), part_path(part_path_)
 {
     init();
@@ -269,11 +330,13 @@ MergedBlockOutputStream::MergedBlockOutputStream(
     const NamesAndTypesList & columns_list_,
     CompressionSettings compression_settings,
     const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
-    size_t aio_threshold_)
+    size_t aio_threshold_,
+    bool blocks_are_granules_size_,
+    const std::vector<size_t> & index_granularity_)
     : IMergedBlockOutputStream(
         storage_, storage_.context.getSettings().min_compress_block_size,
         storage_.context.getSettings().max_compress_block_size, compression_settings,
-        aio_threshold_),
+        aio_threshold_, blocks_are_granules_size_, index_granularity_),
     columns_list(columns_list_), part_path(part_path_)
 {
     init();
@@ -394,14 +457,14 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
     }
 
     new_part->rows_count = rows_count;
-    new_part->marks_count = marks_count;
+    new_part->marks_count = current_mark;
     new_part->modification_time = time(nullptr);
     new_part->columns = *total_column_list;
     new_part->index.assign(std::make_move_iterator(index_columns.begin()), std::make_move_iterator(index_columns.end()));
     new_part->checksums = checksums;
     new_part->bytes_on_disk = MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_part->getFullPath());
     new_part->marks_file_extension = marks_file_extension;
-    new_part->marks_index_granularity.swap(marks_index_granularity);
+    new_part->marks_index_granularity.swap(index_granularity);
     new_part->mark_size_in_bytes = mark_size_in_bytes;
 }
@@ -422,7 +485,12 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
 {
     block.checkNumberOfRows();
     size_t rows = block.rows();
-    size_t current_block_index_granularity = getBlockIndexGranularity(block);
+
+    /// Fill index granularity for this block
+    /// if it's unknown (in case of insert data or horizontal merge,
+    /// but not in case of vertical merge)
+    if (compute_granularity)
+        fillIndexGranularity(block);
 
     /// The set of written offset columns so that you do not write shared offsets of nested structures columns several times
     WrittenOffsetColumns offset_columns;
@@ -468,6 +536,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
         }
     }
 
+    size_t new_index_offset = 0;
     /// Now write the data.
     auto it = columns_list.begin();
     for (size_t i = 0; i < columns_list.size(); ++i, ++it)
@@ -480,18 +549,18 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
             if (primary_columns_name_to_position.end() != primary_column_it)
             {
                 auto & primary_column = *primary_columns[primary_column_it->second].column;
-                writeData(column.name, *column.type, primary_column, offset_columns, false, serialization_states[i], current_block_index_granularity);
+                std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, primary_column, offset_columns, false, serialization_states[i], current_mark);
             }
             else
             {
                 /// We rearrange the columns that are not included in the primary key here; Then the result is released - to save RAM.
                 ColumnPtr permuted_column = column.column->permute(*permutation, 0);
-                writeData(column.name, *column.type, *permuted_column, offset_columns, false, serialization_states[i], current_block_index_granularity);
+                std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, *permuted_column, offset_columns, false, serialization_states[i], current_mark);
             }
         }
         else
         {
-            writeData(column.name, *column.type, *column.column, offset_columns, false, serialization_states[i], current_block_index_granularity);
+            std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, false, serialization_states[i], current_mark);
         }
     }
@@ -507,7 +576,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
         auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
 
         /// Write index. The index contains Primary Key value for each `index_granularity` row.
-        for (size_t i = index_offset; i < rows; i += current_block_index_granularity)
+        for (size_t i = index_offset; i < rows; i += index_granularity[current_mark])
         {
             if (storage.hasPrimaryKey())
             {
@@ -519,13 +588,11 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
                 }
             }
 
-            marks_index_granularity.push_back(current_block_index_granularity);
-            ++marks_count;
+            ++current_mark;
         }
     }
 
-    size_t written_for_last_mark = (current_block_index_granularity - index_offset + rows) % current_block_index_granularity;
-    index_offset = (current_block_index_granularity - written_for_last_mark) % current_block_index_granularity;
+    index_offset = new_index_offset;
 }
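Note (not part of the diff): in the rewritten writeImpl the primary-index loop advances by a per-mark step taken from index_granularity instead of a constant, and the leftover rows of the block's unfinished last granule come back from writeColumn as new_index_offset. A small model of the index loop's accounting, assuming the vector holds enough entries:

    #include <cstddef>
    #include <vector>

    /// How many primary-index rows one block contributes: starting at index_offset,
    /// each mark consumes index_granularity[current_mark] rows. As in the loop above,
    /// the step is read after the mark counter has been advanced.
    size_t countIndexRows(size_t rows, size_t index_offset, size_t first_mark,
                          const std::vector<size_t> & index_granularity)
    {
        size_t index_rows = 0;
        size_t current_mark = first_mark;
        for (size_t i = index_offset; i < rows; i += index_granularity[current_mark])
        {
            ++index_rows; /// one primary-key entry per mark
            ++current_mark;
        }
        return index_rows;
    }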
@@ -534,14 +601,19 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
 MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
     MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
     CompressionSettings compression_settings, bool skip_offsets_,
-    WrittenOffsetColumns & already_written_offset_columns)
+    WrittenOffsetColumns & already_written_offset_columns,
+    const std::vector<size_t> & index_granularity_)
     : IMergedBlockOutputStream(
         storage_, storage_.context.getSettings().min_compress_block_size,
         storage_.context.getSettings().max_compress_block_size, compression_settings,
-        storage_.context.getSettings().min_bytes_to_use_direct_io),
+        storage_.context.getSettings().min_bytes_to_use_direct_io,
+        false,
+        index_granularity_),
     header(header_), part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_),
     already_written_offset_columns(already_written_offset_columns)
 {
+    if (index_granularity.empty())
+        throw Exception("Can't write column without information about part index granularity", ErrorCodes::LOGICAL_ERROR);
 }
@@ -567,17 +639,17 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
         initialized = true;
     }
 
-    size_t rows = block.rows();
+    size_t new_index_offset = 0;
+    size_t new_current_mark = 0;
     WrittenOffsetColumns offset_columns = already_written_offset_columns;
     for (size_t i = 0; i < block.columns(); ++i)
     {
         const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
-        writeData(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], storage.index_granularity);
+        std::tie(new_current_mark, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], current_mark);
     }
 
-    // size_t written_for_last_mark = (current_block_index_granularity - index_offset + rows) % current_block_index_granularity;
-    // index_offset = (current_block_index_granularity - written_for_last_mark) % current_block_index_granularity;
+    index_offset = new_index_offset;
+    current_mark = new_current_mark;
 }
 
 void MergedColumnOnlyOutputStream::writeSuffix()

dbms/src/Storages/MergeTree/MergedBlockOutputStream.h

@@ -22,7 +22,8 @@ public:
         size_t max_compress_block_size_,
         CompressionSettings compression_settings_,
         size_t aio_threshold_,
-        const std::vector<size_t> & index_granularity_ = {});
+        bool blocks_are_granules_size_,
+        const std::vector<size_t> & index_granularity_);
 
     using WrittenOffsetColumns = std::set<std::string>;
@@ -72,10 +73,31 @@ protected:
     IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);
 
     /// Write data of one column.
-    void writeData(const String & name, const IDataType & type, const IColumn & column, WrittenOffsetColumns & offset_columns,
-        bool skip_offsets, IDataType::SerializeBinaryBulkStatePtr & serialization_state, size_t index_granularity);
+    /// Return how many marks were written and
+    /// how many rows were written for last mark
+    std::pair<size_t, size_t> writeColumn(
+        const String & name,
+        const IDataType & type,
+        const IColumn & column,
+        WrittenOffsetColumns & offset_columns,
+        bool skip_offsets,
+        IDataType::SerializeBinaryBulkStatePtr & serialization_state,
+        size_t from_mark);
 
-    size_t getBlockIndexGranularity(const Block & block) const;
+    size_t writeSingleGranule(
+        const String & name,
+        const IDataType & type,
+        const IColumn & column,
+        WrittenOffsetColumns & offset_columns,
+        bool skip_offsets,
+        IDataType::SerializeBinaryBulkStatePtr & serialization_state,
+        IDataType::SerializeBinaryBulkSettings & serialize_settings,
+        size_t from_row,
+        size_t number_of_rows,
+        bool write_marks);
+
+    void fillIndexGranularity(const Block & block);
 
     MergeTreeData & storage;
@@ -89,14 +111,17 @@ protected:
     size_t aio_threshold;
 
+    size_t current_mark = 0;
+
     CompressionSettings compression_settings;
 
+    const std::string marks_file_extension;
+    const size_t mark_size_in_bytes;
+    const bool blocks_are_granules_size;
+
     std::vector<size_t> index_granularity;
-    std::string marks_file_extension;
-    size_t mark_size_in_bytes;
-    bool compute_granularity_unknown;
+    const bool compute_granularity;
 };
@@ -110,7 +135,9 @@ public:
         MergeTreeData & storage_,
         String part_path_,
         const NamesAndTypesList & columns_list_,
-        CompressionSettings compression_settings);
+        CompressionSettings compression_settings,
+        bool blocks_are_granules_size_ = false,
+        const std::vector<size_t> & index_granularity_ = {});
 
     MergedBlockOutputStream(
         MergeTreeData & storage_,
@@ -118,7 +145,9 @@ public:
         const NamesAndTypesList & columns_list_,
         CompressionSettings compression_settings,
         const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
-        size_t aio_threshold_);
+        size_t aio_threshold_,
+        bool blocks_are_granules_size_ = false,
+        const std::vector<size_t> & index_granularity_ = {});
 
     std::string getPartPath() const;
@@ -140,6 +169,11 @@ public:
         const NamesAndTypesList * total_columns_list = nullptr,
         MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
 
+    const std::vector<size_t> & getIndexGranularity() const
+    {
+        return index_granularity;
+    }
+
 private:
     void init();
@@ -154,7 +188,6 @@ private:
     String part_path;
 
     size_t rows_count = 0;
-    size_t marks_count = 0;
 
     std::unique_ptr<WriteBufferFromFile> index_file_stream;
     std::unique_ptr<HashingWriteBuffer> index_stream;
@@ -172,7 +205,8 @@ public:
     MergedColumnOnlyOutputStream(
         MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
         CompressionSettings compression_settings, bool skip_offsets_,
-        WrittenOffsetColumns & already_written_offset_columns);
+        WrittenOffsetColumns & already_written_offset_columns,
+        const std::vector<size_t> & index_granularity_);
 
     Block getHeader() const override { return header; }
     void write(const Block & block) override;