diff --git a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h index d18b31edc72..3e3496c88da 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h +++ b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h @@ -107,7 +107,7 @@ public: void initSkipIndices(); void initPrimaryIndex(); - virtual void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync = false) = 0; + virtual void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) = 0; void finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums); void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums); diff --git a/src/Storages/MergeTree/IMergedBlockOutputStream.h b/src/Storages/MergeTree/IMergedBlockOutputStream.h index ba04d7fa71b..7b808ef6784 100644 --- a/src/Storages/MergeTree/IMergedBlockOutputStream.h +++ b/src/Storages/MergeTree/IMergedBlockOutputStream.h @@ -25,7 +25,7 @@ public: protected: using SerializationState = IDataType::SerializeBinaryBulkStatePtr; - IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets); + IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns); /// Remove all columns marked expired in data_part. Also, clears checksums /// and columns array. Return set of removed files names. diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index c10a6c6dd59..8bc871476ed 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -876,9 +876,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor MergedColumnOnlyOutputStream column_to( new_data_part, column_gathered_stream.getHeader(), - false, compression_codec, - false, /// we don't need to recalc indices here /// because all of them were already recalculated and written /// as key part of vertical merge @@ -1588,9 +1586,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns( MergedColumnOnlyOutputStream out( new_data_part, mutation_header, - /* sync = */ false, compression_codec, - /* skip_offsets = */ false, std::vector(indices_to_recalc.begin(), indices_to_recalc.end()), nullptr, source_part->index_granularity, diff --git a/src/Storages/MergeTree/MergeTreeDataPartCompact.h b/src/Storages/MergeTree/MergeTreeDataPartCompact.h index 12b2e106284..fa98a2c863f 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartCompact.h +++ b/src/Storages/MergeTree/MergeTreeDataPartCompact.h @@ -20,7 +20,6 @@ class MergeTreeDataPartCompact : public IMergeTreeDataPart public: static constexpr auto DATA_FILE_NAME = "data"; static constexpr auto DATA_FILE_EXTENSION = ".bin"; - static constexpr auto TEMP_FILE_SUFFIX = "_temp"; static constexpr auto DATA_FILE_NAME_WITH_EXTENSION = "data.bin"; MergeTreeDataPartCompact( diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index bd507866e9b..e33d4a97cac 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -22,8 +22,6 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact( { using DataPart = MergeTreeDataPartCompact; String data_file_name = DataPart::DATA_FILE_NAME; - if (settings.is_writing_temp_files) - data_file_name += DataPart::TEMP_FILE_SUFFIX; stream = std::make_unique( data_file_name, @@ -145,7 +143,7 @@ void MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWithTy column.type->serializeBinaryBulkStateSuffix(serialize_settings, state); } -void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync) +void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) { if (columns_buffer.size() != 0) writeBlock(header.cloneWithColumns(columns_buffer.releaseColumns())); @@ -161,8 +159,6 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart: } stream->finalize(); - if (sync) - stream->sync(); stream->addToChecksums(checksums); stream.reset(); } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h index 598a4dd47fb..0aff55588aa 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h @@ -21,7 +21,7 @@ public: void write(const Block & block, const IColumn::Permutation * permutation, const Block & primary_key_block, const Block & skip_indexes_block) override; - void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync) override; + void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) override; private: /// Write single granule of one column (rows between 2 marks) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 5c1c6e35a20..1e5640b4e23 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -39,9 +39,6 @@ void MergeTreeDataPartWriterWide::addStreams( { IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path) { - if (settings.skip_offsets && !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes) - return; - String stream_name = IDataType::getFileNameForStream(name, substream_path); /// Shared offsets for Nested type. if (column_streams.count(stream_name)) @@ -69,8 +66,6 @@ IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter( return [&, this] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer * { bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes; - if (is_offsets && settings.skip_offsets) - return nullptr; String stream_name = IDataType::getFileNameForStream(name, substream_path); @@ -135,8 +130,6 @@ void MergeTreeDataPartWriterWide::writeSingleMark( type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path) { bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes; - if (is_offsets && settings.skip_offsets) - return; String stream_name = IDataType::getFileNameForStream(name, substream_path); @@ -177,8 +170,6 @@ size_t MergeTreeDataPartWriterWide::writeSingleGranule( type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path) { bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes; - if (is_offsets && settings.skip_offsets) - return; String stream_name = IDataType::getFileNameForStream(name, substream_path); @@ -270,7 +261,7 @@ void MergeTreeDataPartWriterWide::writeColumn( next_index_offset = current_row - total_rows; } -void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync) +void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) { const auto & global_settings = storage.global_context.getSettingsRef(); IDataType::SerializeBinaryBulkSettings serialize_settings; @@ -300,8 +291,6 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch for (auto & stream : column_streams) { stream.second->finalize(); - if (sync) - stream.second->sync(); stream.second->addToChecksums(checksums); } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index 95e43cd31af..4e4f4806d53 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -24,7 +24,7 @@ public: void write(const Block & block, const IColumn::Permutation * permutation, const Block & primary_key_block, const Block & skip_indexes_block) override; - void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync) override; + void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) override; IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns); diff --git a/src/Storages/MergeTree/MergeTreeIOSettings.h b/src/Storages/MergeTree/MergeTreeIOSettings.h index 5d3b2945d47..f5c57659052 100644 --- a/src/Storages/MergeTree/MergeTreeIOSettings.h +++ b/src/Storages/MergeTree/MergeTreeIOSettings.h @@ -30,10 +30,7 @@ struct MergeTreeWriterSettings size_t aio_threshold; bool can_use_adaptive_granularity; bool blocks_are_granules_size; - /// true if we write temporary files during alter. - bool is_writing_temp_files = false; + size_t estimated_size = 0; - /// used when ALTERing columns if we know that array offsets are not altered. - bool skip_offsets = false; }; } diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp index 77bff8e4f02..892b4eccfbc 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp @@ -8,15 +8,14 @@ namespace ErrorCodes } MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( - const MergeTreeDataPartPtr & data_part, const Block & header_, bool sync_, - CompressionCodecPtr default_codec, bool skip_offsets_, + const MergeTreeDataPartPtr & data_part, + const Block & header_, + CompressionCodecPtr default_codec, const std::vector & indices_to_recalc, WrittenOffsetColumns * offset_columns_, const MergeTreeIndexGranularity & index_granularity, - const MergeTreeIndexGranularityInfo * index_granularity_info, - bool is_writing_temp_files) - : IMergedBlockOutputStream(data_part), - header(header_), sync(sync_) + const MergeTreeIndexGranularityInfo * index_granularity_info) + : IMergedBlockOutputStream(data_part), header(header_) { const auto & global_settings = data_part->storage.global_context.getSettings(); MergeTreeWriterSettings writer_settings( @@ -24,11 +23,13 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( index_granularity_info ? index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity(), global_settings.min_bytes_to_use_direct_io); - writer_settings.is_writing_temp_files = is_writing_temp_files; - writer_settings.skip_offsets = skip_offsets_; + writer = data_part->getWriter( + header.getNamesAndTypesList(), + indices_to_recalc, + default_codec, + std::move(writer_settings), + index_granularity); - writer = data_part->getWriter(header.getNamesAndTypesList(), indices_to_recalc, - default_codec,std::move(writer_settings), index_granularity); writer->setWrittenOffsetColumns(offset_columns_); writer->initSkipIndices(); } @@ -62,7 +63,7 @@ MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums(MergeTreeData::MutableD { /// Finish columns serialization. MergeTreeData::DataPart::Checksums checksums; - writer->finishDataSerialization(checksums, sync); + writer->finishDataSerialization(checksums); writer->finishSkipIndicesSerialization(checksums); auto columns = new_part->getColumns(); diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h index c785bbaf6d0..2c5024bbcfe 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h @@ -11,17 +11,16 @@ class MergeTreeDataPartWriterWide; class MergedColumnOnlyOutputStream final : public IMergedBlockOutputStream { public: - /// skip_offsets: used when ALTERing columns if we know that array offsets are not altered. /// Pass empty 'already_written_offset_columns' first time then and pass the same object to subsequent instances of MergedColumnOnlyOutputStream /// if you want to serialize elements of Nested data structure in different instances of MergedColumnOnlyOutputStream. MergedColumnOnlyOutputStream( - const MergeTreeDataPartPtr & data_part, const Block & header_, bool sync_, - CompressionCodecPtr default_codec_, bool skip_offsets_, + const MergeTreeDataPartPtr & data_part, + const Block & header_, + CompressionCodecPtr default_codec_, const std::vector & indices_to_recalc_, WrittenOffsetColumns * offset_columns_ = nullptr, const MergeTreeIndexGranularity & index_granularity = {}, - const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr, - bool is_writing_temp_files = false); + const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr); Block getHeader() const override { return header; } void write(const Block & block) override; @@ -31,7 +30,6 @@ public: private: Block header; - bool sync; };