#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h>
#include <utility>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
constexpr auto INDEX_FILE_EXTENSION = ".idx";
}
void MergeTreeDataPartWriterOnDisk::Stream::finalize()
{
compressed.next();
/// 'compressed_buf' doesn't call next() on underlying buffer ('plain_hashing'). We should do it manually.
plain_hashing.next();
marks.next();
plain_file->finalize();
marks_file->finalize();
}
void MergeTreeDataPartWriterOnDisk::Stream::sync() const
{
plain_file->sync();
marks_file->sync();
}
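/// A Stream owns a pair of files: the column data file, written through the
/// buffer chain compressed (hashes uncompressed data) -> compressed_buf
/// (applies the codec) -> plain_hashing (hashes compressed bytes) -> plain_file,
/// and the marks file, written through marks -> marks_file.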
MergeTreeDataPartWriterOnDisk::Stream::Stream(
const String & escaped_column_name_,
DiskPtr disk_,
const String & data_path_,
const std::string & data_file_extension_,
const std::string & marks_path_,
const std::string & marks_file_extension_,
const CompressionCodecPtr & compression_codec_,
size_t max_compress_block_size_,
size_t estimated_size_,
size_t aio_threshold_) :
escaped_column_name(escaped_column_name_),
data_file_extension{data_file_extension_},
marks_file_extension{marks_file_extension_},
plain_file(disk_->writeFile(data_path_ + data_file_extension, max_compress_block_size_, WriteMode::Rewrite, estimated_size_, aio_threshold_)),
plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_codec_), compressed(compressed_buf),
marks_file(disk_->writeFile(marks_path_ + marks_file_extension, 4096, WriteMode::Rewrite)), marks(*marks_file)
{
}
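/// Record both the uncompressed size/hash (from `compressed`) and the on-disk
/// size/hash (from `plain_hashing`), so that checksums can be verified either
/// before or after decompression.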
void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPart::Checksums & checksums)
{
String name = escaped_column_name;
checksums.files[name + data_file_extension].is_compressed = true;
checksums.files[name + data_file_extension].uncompressed_size = compressed.count();
checksums.files[name + data_file_extension].uncompressed_hash = compressed.getHash();
checksums.files[name + data_file_extension].file_size = plain_hashing.count();
checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();
checksums.files[name + marks_file_extension].file_size = marks.count();
checksums.files[name + marks_file_extension].file_hash = marks.getHash();
}
MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_list_,
const StorageMetadataPtr & metadata_snapshot_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: IMergeTreeDataPartWriter(data_part_,
columns_list_, metadata_snapshot_, indices_to_recalc_,
index_granularity_, settings_)
, part_path(data_part_->getFullRelativePath())
, marks_file_extension(marks_file_extension_)
, default_codec(default_codec_)
, compute_granularity(index_granularity.empty())
{
if (settings.blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks when a non-empty index_granularity array is specified", ErrorCodes::LOGICAL_ERROR);
auto disk = data_part->volume->getDisk();
if (!disk->exists(part_path))
disk->createDirectories(part_path);
}
/// Implementation is split into static functions for the ability
/// to write unit tests without creating an instance of IMergeTreeDataPartWriter,
/// which requires a lot of dependencies and access to the filesystem.
static size_t computeIndexGranularityImpl(
const Block & block,
size_t index_granularity_bytes,
size_t fixed_index_granularity_rows,
bool blocks_are_granules,
bool can_use_adaptive_index_granularity)
{
size_t rows_in_block = block.rows();
size_t index_granularity_for_block;
if (!can_use_adaptive_index_granularity)
index_granularity_for_block = fixed_index_granularity_rows;
else
{
size_t block_size_in_memory = block.bytes();
if (blocks_are_granules)
index_granularity_for_block = rows_in_block;
else if (block_size_in_memory >= index_granularity_bytes)
{
size_t granules_in_block = block_size_in_memory / index_granularity_bytes;
index_granularity_for_block = rows_in_block / granules_in_block;
}
else
{
size_t size_of_row_in_bytes = block_size_in_memory / rows_in_block;
index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes;
}
}
if (index_granularity_for_block == 0) /// Very rare case when index_granularity_bytes is less than the size of a single row.
index_granularity_for_block = 1;
/// The granularity for the block must not exceed the fixed index granularity.
index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);
return index_granularity_for_block;
}
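/// Appends one mark per `index_granularity_for_block` rows. `index_offset`
/// is the number of rows at the start of the block that are still covered by
/// the last granule of the previous block, so no mark is added for them.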
static void fillIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity,
size_t index_offset,
size_t index_granularity_for_block,
size_t rows_in_block)
{
for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
index_granularity.appendMark(index_granularity_for_block);
}
size_t MergeTreeDataPartWriterOnDisk::computeIndexGranularity(const Block & block)
{
const auto storage_settings = storage.getSettings();
return computeIndexGranularityImpl(
block,
storage_settings->index_granularity_bytes,
storage_settings->index_granularity,
settings.blocks_are_granules_size,
settings.can_use_adaptive_granularity);
}
void MergeTreeDataPartWriterOnDisk::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block)
{
fillIndexGranularityImpl(
index_granularity,
getIndexOffset(),
index_granularity_for_block,
rows_in_block);
}
void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
{
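/// primary.idx is created only if the table has a primary key; otherwise
/// index_stream remains uninitialized and nothing is written.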
if (metadata_snapshot->hasPrimaryKey())
{
index_file_stream = data_part->volume->getDisk()->writeFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
}
primary_index_initialized = true;
}
void MergeTreeDataPartWriterOnDisk::initSkipIndices()
{
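/// Each skip index gets its own data/marks stream and an aggregator that
/// accumulates rows until a whole index granule has been collected.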
for (const auto & index_helper : skip_indices)
{
String stream_name = index_helper->getFileName();
skip_indices_streams.emplace_back(
std::make_unique<MergeTreeDataPartWriterOnDisk::Stream>(
stream_name,
data_part->volume->getDisk(),
part_path + stream_name, INDEX_FILE_EXTENSION,
part_path + stream_name, marks_file_extension,
default_codec, settings.max_compress_block_size,
0, settings.aio_threshold));
skip_indices_aggregators.push_back(index_helper->createIndexAggregator());
skip_index_filling.push_back(0);
}
skip_indices_initialized = true;
}
void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Block & primary_index_block)
{
if (!primary_index_initialized)
throw Exception("Primary index is not initialized", ErrorCodes::LOGICAL_ERROR);
size_t rows = primary_index_block.rows();
size_t primary_columns_num = primary_index_block.columns();
if (index_columns.empty())
{
index_types = primary_index_block.getDataTypes();
index_columns.resize(primary_columns_num);
last_index_row.resize(primary_columns_num);
for (size_t i = 0; i < primary_columns_num; ++i)
index_columns[i] = primary_index_block.getByPosition(i).column->cloneEmpty();
}
/** While filling the index (index_columns), disable the memory tracker.
 * Memory is allocated here (maybe in the context of an INSERT query),
 * but then freed in a completely different place (while merging parts), where the query memory_tracker is not available.
 * Otherwise it would look like excessively growing memory consumption in the context of the query
 * (observed in long INSERT SELECTs).
 */
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
/// Write the index. The index contains the primary key value for each group of `index_granularity` rows.
size_t current_row = getIndexOffset();
size_t total_marks = index_granularity.getMarksCount();
while (index_mark < total_marks && current_row < rows)
{
if (metadata_snapshot->hasPrimaryKey())
{
for (size_t j = 0; j < primary_columns_num; ++j)
{
const auto & primary_column = primary_index_block.getByPosition(j);
index_columns[j]->insertFrom(*primary_column.column, current_row);
primary_column.type->serializeBinary(*primary_column.column, current_row, *index_stream);
}
}
current_row += index_granularity.getMarkRows(index_mark++);
}
/// Store the last index row so the final mark can be written at the end of the column.
for (size_t j = 0; j < primary_columns_num; ++j)
{
const IColumn & primary_column = *primary_index_block.getByPosition(j).column.get();
primary_column.get(rows - 1, last_index_row[j]);
}
}
void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block & skip_indexes_block)
{
if (!skip_indices_initialized)
throw Exception("Skip indices are not initialized", ErrorCodes::LOGICAL_ERROR);
size_t rows = skip_indexes_block.rows();
size_t skip_index_current_data_mark = 0;
/// Filling and writing skip indices like in MergeTreeDataPartWriterWide::writeColumn
for (size_t i = 0; i < skip_indices.size(); ++i)
{
const auto index_helper = skip_indices[i];
auto & stream = *skip_indices_streams[i];
size_t prev_pos = 0;
skip_index_current_data_mark = skip_index_data_mark;
while (prev_pos < rows)
{
UInt64 limit = 0;
size_t current_index_offset = getIndexOffset();
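/// The first rows of the block may still belong to the granule started in
/// the previous block: feed them to the aggregator without opening a new mark.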
if (prev_pos == 0 && current_index_offset != 0)
{
limit = current_index_offset;
}
else if (skip_index_current_data_mark == index_granularity.getMarksCount())
{
/// Case when the last granule was exceeded and no new granule was created.
limit = rows - prev_pos;
}
else
{
limit = index_granularity.getMarkRows(skip_index_current_data_mark);
if (skip_indices_aggregators[i]->empty())
{
skip_indices_aggregators[i] = index_helper->createIndexAggregator();
skip_index_filling[i] = 0;
if (stream.compressed.offset() >= settings.min_compress_block_size)
stream.compressed.next();
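/// A mark is the pair (offset in the compressed file, offset within the
/// decompressed block); with adaptive granularity the number of rows in
/// the granule is stored as well (.mrk2 format).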
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
/// Actually this number is redundant, but we have to store it
/// to be compatible with the normal .mrk2 file format.
if (settings.can_use_adaptive_granularity)
writeIntBinary(1UL, stream.marks);
}
/// This mark is aggregated, go to the next one.
skip_index_current_data_mark++;
}
size_t pos = prev_pos;
skip_indices_aggregators[i]->update(skip_indexes_block, &pos, limit);
if (pos == prev_pos + limit)
{
++skip_index_filling[i];
/// Write the index granule once it is completely filled.
if (skip_index_filling[i] == index_helper->index.granularity)
{
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
skip_index_filling[i] = 0;
}
}
prev_pos = pos;
}
}
skip_index_data_mark = skip_index_current_data_mark;
}
void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(
MergeTreeData::DataPart::Checksums & checksums, bool sync)
{
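/// If enabled, write the final mark: an empty granule that holds the last
/// primary key row, so the index also covers the very end of the part.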
bool write_final_mark = (with_final_mark && data_written);
if (write_final_mark && compute_granularity)
index_granularity.appendMark(0);
if (index_stream)
{
if (write_final_mark)
{
for (size_t j = 0; j < index_columns.size(); ++j)
{
index_columns[j]->insert(last_index_row[j]);
index_types[j]->serializeBinary(last_index_row[j], *index_stream);
}
last_index_row.clear();
}
index_stream->next();
checksums.files["primary.idx"].file_size = index_stream->count();
checksums.files["primary.idx"].file_hash = index_stream->getHash();
index_file_stream->finalize();
if (sync)
index_file_stream->sync();
index_stream = nullptr;
}
}
void MergeTreeDataPartWriterOnDisk::finishSkipIndicesSerialization(
MergeTreeData::DataPart::Checksums & checksums, bool sync)
{
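/// Flush the last, possibly incomplete, granule of every skip index before
/// finalizing the streams.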
for (size_t i = 0; i < skip_indices.size(); ++i)
{
auto & stream = *skip_indices_streams[i];
if (!skip_indices_aggregators[i]->empty())
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
}
for (auto & stream : skip_indices_streams)
{
stream->finalize();
stream->addToChecksums(checksums);
if (sync)
stream->sync();
}
skip_indices_streams.clear();
skip_indices_aggregators.clear();
skip_index_filling.clear();
}
}