#include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int DIRECTORY_ALREADY_EXISTS; } MergeTreeDataPartInMemory::MergeTreeDataPartInMemory( const MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_, const MutableDataPartStoragePtr & data_part_storage_, const IMergeTreeDataPart * parent_part_) : IMergeTreeDataPart(storage_, name_, info_, data_part_storage_, Type::InMemory, parent_part_) { default_codec = CompressionCodecFactory::instance().get("NONE", {}); } IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader( const NamesAndTypesList & columns_to_read, const StorageMetadataPtr & metadata_snapshot, const MarkRanges & mark_ranges, UncompressedCache * /* uncompressed_cache */, MarkCache * /* mark_cache */, const MergeTreeReaderSettings & reader_settings, const ValueSizeMap & /* avg_value_size_hints */, const ReadBufferFromFileBase::ProfileCallback & /* profile_callback */) const { auto read_info = std::make_shared(shared_from_this()); auto ptr = std::static_pointer_cast(shared_from_this()); return std::make_unique( read_info, ptr, columns_to_read, metadata_snapshot, mark_ranges, reader_settings); } IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter( const NamesAndTypesList & columns_list, const StorageMetadataPtr & metadata_snapshot, const std::vector & /* indices_to_recalc */, const CompressionCodecPtr & /* default_codec */, const MergeTreeWriterSettings & writer_settings, const MergeTreeIndexGranularity & /* computed_index_granularity */) { auto ptr = std::static_pointer_cast(shared_from_this()); return std::make_unique( ptr, columns_list, metadata_snapshot, writer_settings); } MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const { auto reservation = storage.reserveSpace(block.bytes(), getDataPartStorage()); VolumePtr volume = storage.getStoragePolicy()->getVolume(0); VolumePtr data_part_volume = createVolumeFromReservation(reservation, volume); auto new_data_part = storage.getDataPartBuilder(name, data_part_volume, new_relative_path) .withPartFormat(storage.choosePartFormatOnDisk(block.bytes(), rows_count)) .build(); auto new_data_part_storage = new_data_part->getDataPartStoragePtr(); new_data_part_storage->beginTransaction(); new_data_part->uuid = uuid; new_data_part->setColumns(columns, {}); new_data_part->partition.value = partition.value; new_data_part->minmax_idx = minmax_idx; if (new_data_part_storage->exists()) { throw Exception( ErrorCodes::DIRECTORY_ALREADY_EXISTS, "Could not flush part {}. Part in {} already exists", quoteString(getDataPartStorage().getFullPath()), new_data_part_storage->getFullPath()); } new_data_part_storage->createDirectories(); auto compression_codec = storage.getContext()->chooseCompressionCodec(0, 0); auto indices = MergeTreeIndexFactory::instance().getMany(metadata_snapshot->getSecondaryIndices()); MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, indices, compression_codec, NO_TRANSACTION_PTR); out.write(block); const auto & projections = metadata_snapshot->getProjections(); for (const auto & [projection_name, projection] : projection_parts) { if (projections.has(projection_name)) { auto old_projection_part = asInMemoryPart(projection); auto new_projection_part = new_data_part->getProjectionPartBuilder(projection_name) .withPartFormat(storage.choosePartFormatOnDisk(old_projection_part->block.bytes(), rows_count)) .build(); new_projection_part->is_temp = false; // clean up will be done on parent part new_projection_part->setColumns(projection->getColumns(), {}); auto new_projection_part_storage = new_projection_part->getDataPartStoragePtr(); if (new_projection_part_storage->exists()) { throw Exception( ErrorCodes::DIRECTORY_ALREADY_EXISTS, "Could not flush projection part {}. Projection part in {} already exists", projection_name, new_projection_part_storage->getFullPath()); } new_projection_part_storage->createDirectories(); const auto & desc = projections.get(name); auto projection_compression_codec = storage.getContext()->chooseCompressionCodec(0, 0); auto projection_indices = MergeTreeIndexFactory::instance().getMany(desc.metadata->getSecondaryIndices()); MergedBlockOutputStream projection_out( new_projection_part, desc.metadata, new_projection_part->getColumns(), projection_indices, projection_compression_codec, NO_TRANSACTION_PTR); projection_out.write(old_projection_part->block); projection_out.finalizePart(new_projection_part, false); new_data_part->addProjectionPart(projection_name, std::move(new_projection_part)); } } out.finalizePart(new_data_part, false); new_data_part_storage->commitTransaction(); return new_data_part_storage; } DataPartStoragePtr MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const { String detached_path = *getRelativePathForDetachedPart(prefix, /* broken */ false); return flushToDisk(detached_path, metadata_snapshot); } void MergeTreeDataPartInMemory::renameTo(const String & new_relative_path, bool /* remove_new_dir_if_exists */) { getDataPartStorage().setRelativePath(new_relative_path); } void MergeTreeDataPartInMemory::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const { auto it = checksums.files.find("data.bin"); if (it != checksums.files.end()) total_size.data_uncompressed += it->second.uncompressed_size; for (const auto & column : columns) each_columns_size[column.name].data_uncompressed += block.getByName(column.name).column->byteSize(); } IMergeTreeDataPart::Checksum MergeTreeDataPartInMemory::calculateBlockChecksum() const { SipHash hash; IMergeTreeDataPart::Checksum checksum; for (const auto & column : block) column.column->updateHashFast(hash); checksum.uncompressed_size = block.bytes(); hash.get128(checksum.uncompressed_hash); return checksum; } DataPartInMemoryPtr asInMemoryPart(const MergeTreeDataPartPtr & part) { return std::dynamic_pointer_cast(part); } }