ClickHouse/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp

234 lines
7.8 KiB
C++
Raw Normal View History

2019-10-16 18:27:53 +00:00
#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.h>
2020-02-06 15:32:00 +00:00
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
2019-10-16 18:27:53 +00:00
namespace DB
{
2019-10-22 10:50:17 +00:00
/// Constructs the writer and opens its single output stream.
/// All constructor arguments are forwarded unchanged to IMergeTreeDataPartWriter.
MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
    const VolumePtr & volume_,
    const String & part_path_,
    const MergeTreeData & storage_,
    const NamesAndTypesList & columns_list_,
    const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
    const String & marks_file_extension_,
    const CompressionCodecPtr & default_codec_,
    const MergeTreeWriterSettings & settings_,
    const MergeTreeIndexGranularity & index_granularity_)
    : IMergeTreeDataPartWriter(
        volume_, part_path_, storage_, columns_list_, indices_to_recalc_,
        marks_file_extension_, default_codec_, settings_, index_granularity_)
{
    using CompactPart = MergeTreeDataPartCompact;

    String stream_name = CompactPart::DATA_FILE_NAME;

    /// The data file and the marks file share the same base path
    /// and are distinguished only by their extensions.
    String base_path = part_path + stream_name;

    stream = std::make_unique<Stream>(
        stream_name,
        volume->getDisk(),
        base_path, CompactPart::DATA_FILE_EXTENSION,
        base_path, marks_file_extension,
        default_codec,
        settings.max_compress_block_size,
        settings.estimated_size,
        settings.aio_threshold);
}
2019-11-07 11:11:38 +00:00
void MergeTreeDataPartWriterCompact::write(
2019-10-21 17:23:06 +00:00
const Block & block, const IColumn::Permutation * permutation,
2019-11-07 11:11:38 +00:00
const Block & primary_key_block, const Block & skip_indexes_block)
2019-11-27 11:35:27 +00:00
{
2019-11-27 19:57:07 +00:00
/// Fill index granularity for this block
/// if it's unknown (in case of insert data or horizontal merge,
/// but not in case of vertical merge)
if (compute_granularity)
2020-04-26 21:19:25 +00:00
{
size_t index_granularity_for_block = computeIndexGranularity(block);
fillIndexGranularity(index_granularity_for_block, block.rows());
}
2019-11-27 19:57:07 +00:00
Block result_block;
2019-11-27 11:35:27 +00:00
if (permutation)
{
2019-11-27 19:57:07 +00:00
for (const auto & it : columns_list)
2019-11-27 11:35:27 +00:00
{
2019-11-27 19:57:07 +00:00
if (primary_key_block.has(it.name))
result_block.insert(primary_key_block.getByName(it.name));
else if (skip_indexes_block.has(it.name))
result_block.insert(skip_indexes_block.getByName(it.name));
else
{
auto column = block.getByName(it.name);
column.column = column.column->permute(*permutation, 0);
result_block.insert(column);
}
2019-11-27 11:35:27 +00:00
}
}
2019-11-27 19:57:07 +00:00
else
{
result_block = block;
}
2019-11-27 11:35:27 +00:00
2019-11-28 20:14:41 +00:00
if (!header)
header = result_block.cloneEmpty();
2019-12-27 21:17:53 +00:00
columns_buffer.add(result_block.mutateColumns());
size_t last_mark_rows = index_granularity.getLastMarkRows();
size_t rows_in_buffer = columns_buffer.size();
2019-12-27 21:32:55 +00:00
2019-12-27 21:17:53 +00:00
if (rows_in_buffer < last_mark_rows)
{
2020-01-21 11:56:01 +00:00
/// If it's not enough rows for granule, accumulate blocks
/// and save how much rows we already have.
2019-12-27 21:17:53 +00:00
next_index_offset = last_mark_rows - rows_in_buffer;
2019-11-27 11:35:27 +00:00
return;
2019-12-27 21:17:53 +00:00
}
2019-12-27 21:17:53 +00:00
writeBlock(header.cloneWithColumns(columns_buffer.releaseColumns()));
2019-11-27 11:35:27 +00:00
}
void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
2019-10-16 18:27:53 +00:00
{
size_t total_rows = block.rows();
size_t from_mark = getCurrentMark();
2019-10-16 18:27:53 +00:00
size_t current_row = 0;
while (current_row < total_rows)
{
2019-11-27 11:35:27 +00:00
size_t rows_to_write = index_granularity.getMarkRows(from_mark);
2019-11-07 11:11:38 +00:00
if (rows_to_write)
data_written = true;
2019-12-27 21:17:53 +00:00
for (const auto & column : columns_list)
{
2020-01-15 16:39:29 +00:00
/// There could already be enough data to compress into the new block.
if (stream->compressed.offset() >= settings.min_compress_block_size)
stream->compressed.next();
2019-12-27 21:17:53 +00:00
writeIntBinary(stream->plain_hashing.count(), stream->marks);
writeIntBinary(stream->compressed.offset(), stream->marks);
writeColumnSingleGranule(block.getByName(column.name), current_row, rows_to_write);
}
2019-11-05 11:53:22 +00:00
2019-11-27 11:35:27 +00:00
++from_mark;
2019-12-27 21:17:53 +00:00
size_t rows_written = total_rows - current_row;
current_row += rows_to_write;
2020-01-21 11:56:01 +00:00
/// Correct last mark as it should contain exact amount of rows.
2019-12-27 21:17:53 +00:00
if (current_row >= total_rows && rows_written != rows_to_write)
{
rows_to_write = rows_written;
index_granularity.popMark();
index_granularity.appendMark(rows_written);
}
writeIntBinary(rows_to_write, stream->marks);
2019-10-16 18:27:53 +00:00
}
2019-12-27 21:17:53 +00:00
next_index_offset = 0;
2019-11-07 11:11:38 +00:00
next_mark = from_mark;
2019-10-16 18:27:53 +00:00
}
2019-12-27 21:17:53 +00:00
void MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWithTypeAndName & column, size_t from_row, size_t number_of_rows) const
2019-10-16 18:27:53 +00:00
{
2019-10-21 00:28:29 +00:00
IDataType::SerializeBinaryBulkStatePtr state;
IDataType::SerializeBinaryBulkSettings serialize_settings;
2019-10-16 18:27:53 +00:00
2019-10-21 15:33:59 +00:00
serialize_settings.getter = [this](IDataType::SubstreamPath) -> WriteBuffer * { return &stream->compressed; };
2019-12-02 15:21:07 +00:00
serialize_settings.position_independent_encoding = true;
2019-10-21 00:28:29 +00:00
serialize_settings.low_cardinality_max_dictionary_size = 0;
2019-10-16 18:27:53 +00:00
2019-10-21 00:28:29 +00:00
column.type->serializeBinaryBulkStatePrefix(serialize_settings, state);
column.type->serializeBinaryBulkWithMultipleStreams(*column.column, from_row, number_of_rows, serialize_settings, state);
column.type->serializeBinaryBulkStateSuffix(serialize_settings, state);
2019-10-16 18:27:53 +00:00
}
2020-04-17 11:59:10 +00:00
/// Completes writing of the data file: flushes rows still held in the buffer,
/// optionally writes the final mark, finalizes the stream, adds it to
/// `checksums` and then releases it.
void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums)
{
    const bool has_buffered_rows = columns_buffer.size() != 0;
    if (has_buffered_rows)
        writeBlock(header.cloneWithColumns(columns_buffer.releaseColumns()));

    /// The final mark has one pair of offsets per column and a row count of zero.
    /// It is written only if some data was written before it.
    if (with_final_mark && data_written)
    {
        for (size_t columns_left = columns_list.size(); columns_left != 0; --columns_left)
        {
            writeIntBinary(stream->plain_hashing.count(), stream->marks);
            writeIntBinary(stream->compressed.offset(), stream->marks);
        }

        writeIntBinary(0ULL, stream->marks);
    }

    stream->finalize();
    stream->addToChecksums(checksums);
    stream.reset();
}
static void fillIndexGranularityImpl(
2020-04-26 21:19:25 +00:00
MergeTreeIndexGranularity & index_granularity,
size_t index_offset,
2020-04-26 21:19:25 +00:00
size_t index_granularity_for_block,
size_t rows_in_block)
{
for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
{
size_t rows_left_in_block = rows_in_block - current_row;
/// Try to extend last granule if block is large enough
/// or it isn't first in granule (index_offset != 0).
if (rows_left_in_block < index_granularity_for_block &&
(rows_in_block >= index_granularity_for_block || index_offset != 0))
{
// If enough rows are left, create a new granule. Otherwise, extend previous granule.
// So, real size of granule differs from index_granularity_for_block not more than 50%.
if (rows_left_in_block * 2 >= index_granularity_for_block)
index_granularity.appendMark(rows_left_in_block);
else
index_granularity.addRowsToLastMark(rows_left_in_block);
}
else
{
index_granularity.appendMark(index_granularity_for_block);
}
}
}
void MergeTreeDataPartWriterCompact::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block)
{
fillIndexGranularityImpl(
index_granularity,
getIndexOffset(),
2020-04-26 21:19:25 +00:00
index_granularity_for_block,
rows_in_block);
}
2019-12-27 21:17:53 +00:00
/// Adds `columns` to the buffer.
/// An empty buffer takes ownership of the columns as they are.
/// Otherwise all rows of each incoming column are appended to the accumulated
/// column at the same position.
void MergeTreeDataPartWriterCompact::ColumnsBuffer::add(MutableColumns && columns)
{
    if (accumulated_columns.empty())
    {
        accumulated_columns = std::move(columns);
        return;
    }

    size_t position = 0;
    for (auto & incoming : columns)
    {
        accumulated_columns[position]->insertRangeFrom(*incoming, 0, incoming->size());
        ++position;
    }
}
/// Hands over all accumulated columns to the caller and leaves the buffer empty.
Columns MergeTreeDataPartWriterCompact::ColumnsBuffer::releaseColumns()
{
    Columns released;
    released.reserve(accumulated_columns.size());

    for (auto & column : accumulated_columns)
        released.emplace_back(std::move(column));

    accumulated_columns.clear();
    return released;
}
/// Number of rows currently buffered, taken from the first accumulated column;
/// zero when nothing is buffered.
size_t MergeTreeDataPartWriterCompact::ColumnsBuffer::size() const
{
    return accumulated_columns.empty() ? 0 : accumulated_columns.front()->size();
}
2019-10-21 15:33:59 +00:00
}