#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <IO/createWriteBufferFromFileBase.h>
#include <Common/MemoryTracker.h>

namespace DB
{

namespace
{
    // constexpr auto DATA_FILE_EXTENSION = ".bin";
    constexpr auto INDEX_FILE_EXTENSION = ".idx";
}

void IMergeTreeDataPartWriter::ColumnStream::finalize()
{
    compressed.next();
    plain_file->next();
    marks.next();
}

void IMergeTreeDataPartWriter::ColumnStream::sync()
{
    plain_file->sync();
    marks_file.sync();
}

IMergeTreeDataPartWriter::ColumnStream::ColumnStream(
    const String & escaped_column_name_,
    const String & data_path_,
    const std::string & data_file_extension_,
    const std::string & marks_path_,
    const std::string & marks_file_extension_,
    const CompressionCodecPtr & compression_codec_,
    size_t max_compress_block_size_,
    size_t estimated_size_,
    size_t aio_threshold_) :
    escaped_column_name(escaped_column_name_),
    data_file_extension{data_file_extension_},
    marks_file_extension{marks_file_extension_},
    plain_file(createWriteBufferFromFileBase(data_path_ + data_file_extension, estimated_size_, aio_threshold_, max_compress_block_size_)),
    plain_hashing(*plain_file),
    compressed_buf(plain_hashing, compression_codec_),
    compressed(compressed_buf),
    marks_file(marks_path_ + marks_file_extension, 4096, O_TRUNC | O_CREAT | O_WRONLY),
    marks(marks_file)
{
}

void IMergeTreeDataPartWriter::ColumnStream::addToChecksums(MergeTreeData::DataPart::Checksums & checksums)
{
    String name = escaped_column_name;

    checksums.files[name + data_file_extension].is_compressed = true;
    checksums.files[name + data_file_extension].uncompressed_size = compressed.count();
    checksums.files[name + data_file_extension].uncompressed_hash = compressed.getHash();
    checksums.files[name + data_file_extension].file_size = plain_hashing.count();
    checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();

    checksums.files[name + marks_file_extension].file_size = marks.count();
    checksums.files[name + marks_file_extension].file_hash = marks.getHash();
}

IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
    const String & part_path_,
    const MergeTreeData & storage_,
    const NamesAndTypesList & columns_list_,
    const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
    const String & marks_file_extension_,
    const CompressionCodecPtr & default_codec_,
    const WriterSettings & settings_,
    const MergeTreeIndexGranularity & index_granularity_,
    bool need_finish_last_granule_)
    : part_path(part_path_)
    , storage(storage_)
    , columns_list(columns_list_)
    , marks_file_extension(marks_file_extension_)
    , index_granularity(index_granularity_)
    , default_codec(default_codec_)
    , skip_indices(indices_to_recalc_)
    , settings(settings_)
    , compute_granularity(index_granularity.empty())
    , with_final_mark(storage.getSettings()->write_final_mark && settings.can_use_adaptive_granularity)
    , need_finish_last_granule(need_finish_last_granule_)
{
    if (settings.blocks_are_granules_size && !index_granularity.empty())
        throw Exception("Can't take information about index granularity from blocks when a non-empty index_granularity array is specified", ErrorCodes::LOGICAL_ERROR);

    Poco::File part_dir(part_path);
    if (!part_dir.exists())
        part_dir.createDirectories();
}

IMergeTreeDataPartWriter::~IMergeTreeDataPartWriter() = default;

void fillIndexGranularityImpl(
    const Block & block,
    size_t index_granularity_bytes,
    size_t fixed_index_granularity_rows,
    bool blocks_are_granules,
    size_t index_offset,
    MergeTreeIndexGranularity & index_granularity,
    bool can_use_adaptive_index_granularity,
    bool need_finish_last_granule)
{
    /// FIXME correct index granularity for compact
    size_t rows_in_block = block.rows();
    size_t index_granularity_for_block;
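    /// How many rows form one granule of this block:
    ///  - fixed granularity: always `fixed_index_granularity_rows`;
    ///  - `blocks_are_granules`: the whole block becomes a single granule;
    ///  - adaptive granularity: enough rows to occupy about `index_granularity_bytes`
    ///    in memory, estimated from the average row size of this block. For example,
    ///    a block of 8192 rows that occupies 16 MiB with index_granularity_bytes = 1 MiB
    ///    is split into 16 granules of 8192 / 16 = 512 rows each.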
    if (!can_use_adaptive_index_granularity)
        index_granularity_for_block = fixed_index_granularity_rows;
    else
    {
        size_t block_size_in_memory = block.bytes();
        if (blocks_are_granules)
            index_granularity_for_block = rows_in_block;
        else if (block_size_in_memory >= index_granularity_bytes)
        {
            size_t granules_in_block = block_size_in_memory / index_granularity_bytes;
            index_granularity_for_block = rows_in_block / granules_in_block;
        }
        else
        {
            size_t size_of_row_in_bytes = block_size_in_memory / rows_in_block;
            index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes;
        }
    }

    /// Very rare case when index_granularity_bytes is less than the size of a single row.
    if (index_granularity_for_block == 0)
        index_granularity_for_block = 1;

    /// The granule must not be larger than the fixed index granularity.
    index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);

    size_t current_row;
    for (current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
    {
        size_t rows_left_in_block = rows_in_block - current_row;

        /// If we have to finish the last granule and there are not enough rows left
        /// for a whole granule, create a smaller granule at the end of the block.
        if (need_finish_last_granule && rows_left_in_block < index_granularity_for_block)
            index_granularity.appendMark(rows_left_in_block);
        else
            index_granularity.appendMark(index_granularity_for_block);
    }
}

void IMergeTreeDataPartWriter::fillIndexGranularity(const Block & block)
{
    const auto storage_settings = storage.getSettings();
    fillIndexGranularityImpl(
        block,
        storage_settings->index_granularity_bytes,
        storage_settings->index_granularity,
        settings.blocks_are_granules_size,
        index_offset,
        index_granularity,
        settings.can_use_adaptive_granularity,
        need_finish_last_granule);
}

void IMergeTreeDataPartWriter::initPrimaryIndex()
{
    if (storage.hasPrimaryKey())
    {
        index_file_stream = std::make_unique<WriteBufferFromFile>(
            part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
        index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
    }

    primary_index_initialized = true;
}

void IMergeTreeDataPartWriter::initSkipIndices()
{
    for (const auto & index : skip_indices)
    {
        String stream_name = index->getFileName();
        skip_indices_streams.emplace_back(
            std::make_unique<ColumnStream>(
                stream_name,
                part_path + stream_name, INDEX_FILE_EXTENSION,
                part_path + stream_name, marks_file_extension,
                default_codec, settings.max_compress_block_size,
                0, settings.aio_threshold));
        skip_indices_aggregators.push_back(index->createIndexAggregator());
        skip_index_filling.push_back(0);
    }

    skip_indices_initialized = true;
}
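/// Intended calling sequence, as the checks in this file suggest: initPrimaryIndex()
/// and initSkipIndices() are called once before any data is written; then, for every
/// written block, calculateAndSerializePrimaryIndex() / calculateAndSerializeSkipIndices()
/// are called, followed by next() to advance current_mark and index_offset; finally,
/// finishPrimaryIndexSerialization() and finishSkipIndicesSerialization() flush the
/// streams and record the written files in the part checksums.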
void IMergeTreeDataPartWriter::calculateAndSerializePrimaryIndex(const Block & primary_keys_block, size_t rows)
{
    if (!primary_index_initialized)
        throw Exception("Primary index is not initialized", ErrorCodes::LOGICAL_ERROR);

    size_t primary_columns_num = primary_keys_block.columns();
    if (index_columns.empty())
    {
        index_types = primary_keys_block.getDataTypes();
        index_columns.resize(primary_columns_num);
        last_index_row.resize(primary_columns_num);
        for (size_t i = 0; i < primary_columns_num; ++i)
            index_columns[i] = primary_keys_block.getByPosition(i).column->cloneEmpty();
    }

    /** While filling index (index_columns), disable memory tracker.
      * Because memory is allocated here (maybe in context of INSERT query),
      * but then freed in completely different place (while merging parts), where query memory_tracker is not available.
      * And otherwise it will look like excessively growing memory consumption in context of query.
      * (observed in long INSERT SELECTs)
      */
    auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

    /// Write index. The index contains the Primary Key value for each `index_granularity` row.
    for (size_t i = index_offset; i < rows;)
    {
        if (storage.hasPrimaryKey())
        {
            for (size_t j = 0; j < primary_columns_num; ++j)
            {
                const auto & primary_column = primary_keys_block.getByPosition(j);
                index_columns[j]->insertFrom(*primary_column.column, i);
                primary_column.type->serializeBinary(*primary_column.column, i, *index_stream);
            }
        }

        i += index_granularity.getMarkRows(current_mark++);
        if (current_mark >= index_granularity.getMarksCount())
            break;
    }

    /// Store the last index row to be able to write the final mark at the end of the column.
    for (size_t j = 0; j < primary_columns_num; ++j)
    {
        const IColumn & primary_column = *primary_keys_block.getByPosition(j).column.get();
        primary_column.get(rows - 1, last_index_row[j]);
    }
}

void IMergeTreeDataPartWriter::calculateAndSerializeSkipIndices(
    const Block & skip_indexes_block, size_t rows)
{
    if (!skip_indices_initialized)
        throw Exception("Skip indices are not initialized", ErrorCodes::LOGICAL_ERROR);

    size_t skip_index_current_data_mark = 0;

    /// Filling and writing skip indices like in IMergeTreeDataPartWriter::writeColumn
    for (size_t i = 0; i < skip_indices.size(); ++i)
    {
        const auto index = skip_indices[i];
        auto & stream = *skip_indices_streams[i];
        size_t prev_pos = 0;
        skip_index_current_data_mark = skip_index_data_mark;
        while (prev_pos < rows)
        {
            UInt64 limit = 0;
            if (prev_pos == 0 && index_offset != 0)
            {
                limit = index_offset;
            }
            else
            {
                limit = index_granularity.getMarkRows(skip_index_current_data_mark);
                if (skip_indices_aggregators[i]->empty())
                {
                    skip_indices_aggregators[i] = index->createIndexAggregator();
                    skip_index_filling[i] = 0;

                    if (stream.compressed.offset() >= settings.min_compress_block_size)
                        stream.compressed.next();

                    writeIntBinary(stream.plain_hashing.count(), stream.marks);
                    writeIntBinary(stream.compressed.offset(), stream.marks);

                    /// Actually this number is redundant, but we have to store it
                    /// to be compatible with the normal .mrk2 file format.
                    if (settings.can_use_adaptive_granularity)
                        writeIntBinary(1UL, stream.marks);
                }
                /// This mark is aggregated, go to the next one.
                skip_index_current_data_mark++;
            }

            size_t pos = prev_pos;
            skip_indices_aggregators[i]->update(skip_indexes_block, &pos, limit);

            if (pos == prev_pos + limit)
            {
                ++skip_index_filling[i];
                /// Write the index granule if it is filled.
                if (skip_index_filling[i] == index->granularity)
                {
                    skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
                    skip_index_filling[i] = 0;
                }
            }
            prev_pos = pos;
        }
    }
    skip_index_data_mark = skip_index_current_data_mark;
}
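/// The "final mark" (written only with adaptive granularity, see with_final_mark)
/// is a zero-row granule appended after the last row of the part: the saved
/// last_index_row becomes the final entry of primary.idx, so the index also
/// covers the last row of the part.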
void IMergeTreeDataPartWriter::finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums)
{
    bool write_final_mark = (with_final_mark && data_written);
    if (write_final_mark && compute_granularity)
        index_granularity.appendMark(0);

    if (index_stream)
    {
        if (write_final_mark)
        {
            for (size_t j = 0; j < index_columns.size(); ++j)
            {
                index_columns[j]->insert(last_index_row[j]);
                index_types[j]->serializeBinary(last_index_row[j], *index_stream);
            }

            last_index_row.clear();
        }

        index_stream->next();
        checksums.files["primary.idx"].file_size = index_stream->count();
        checksums.files["primary.idx"].file_hash = index_stream->getHash();
        index_stream = nullptr;
    }
}

void IMergeTreeDataPartWriter::finishSkipIndicesSerialization(
    MergeTreeData::DataPart::Checksums & checksums)
{
    for (size_t i = 0; i < skip_indices.size(); ++i)
    {
        auto & stream = *skip_indices_streams[i];
        if (!skip_indices_aggregators[i]->empty())
            skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
    }

    for (auto & stream : skip_indices_streams)
    {
        stream->finalize();
        stream->addToChecksums(checksums);
    }

    skip_indices_streams.clear();
    skip_indices_aggregators.clear();
    skip_index_filling.clear();
}

void IMergeTreeDataPartWriter::next()
{
    current_mark = next_mark;
    index_offset = next_index_offset;
}

}