Add return flag for preFinalize.

This commit is contained in:
Nikolai Kochetov 2023-02-09 19:50:54 +00:00
parent 98765cc20b
commit 3261378ef8
16 changed files with 74 additions and 37 deletions

View File

@ -169,6 +169,7 @@ void SerializationAggregateFunction::deserializeTextQuoted(IColumn & column, Rea
{ {
String s; String s;
readQuotedStringWithSQLStyle(s, istr); readQuotedStringWithSQLStyle(s, istr);
std::cerr << "==========\n" << s << std::endl << "====\n";
deserializeFromString(function, column, s, version); deserializeFromString(function, column, s, version);
} }

View File

@ -110,7 +110,14 @@ public:
/// and start sending data asynchronously. It may improve writing performance in case you have /// and start sending data asynchronously. It may improve writing performance in case you have
/// multiple files to finalize. Mainly, for blob storage, finalization has high latency, /// multiple files to finalize. Mainly, for blob storage, finalization has high latency,
/// and calling preFinalize in a loop may parallelize it. /// and calling preFinalize in a loop may parallelize it.
virtual void preFinalize() { next(); } ///
/// Returns true if preFinalize() did something useful, i.e. some data was actually sent asynchronously.
/// In that case it makes sense to keep the buffer around and call finalize() on it later.
/// Otherwise (false), we can call finalize() right away and remove the buffer to decrease memory usage.
///
/// The default implementation only calls next() and returns false.
/// The result is [[nodiscard]]: a caller that does not need the flag has to discard it
/// explicitly (e.g. `std::ignore = preFinalize();`).
[[nodiscard]] virtual bool preFinalize()
{
next();
return false;
}
/// Write the last data. /// Write the last data.
void finalize() void finalize()

View File

@ -16,7 +16,7 @@ void WriteBufferFromFileDecorator::finalizeImpl()
next(); next();
if (!is_prefinalized) if (!is_prefinalized)
WriteBufferFromFileDecorator::preFinalize(); std::ignore = WriteBufferFromFileDecorator::preFinalize();
impl->finalize(); impl->finalize();
} }

View File

@ -17,11 +17,12 @@ public:
std::string getFileName() const override; std::string getFileName() const override;
void preFinalize() override bool preFinalize() override
{ {
next(); next();
impl->preFinalize(); auto res = impl->preFinalize();
is_prefinalized = true; is_prefinalized = true;
return res;
} }
const WriteBuffer & getImpl() const { return *impl; } const WriteBuffer & getImpl() const { return *impl; }

View File

@ -148,7 +148,7 @@ WriteBufferFromS3::~WriteBufferFromS3()
#endif #endif
} }
void WriteBufferFromS3::preFinalize() bool WriteBufferFromS3::preFinalize()
{ {
next(); next();
@ -163,12 +163,13 @@ void WriteBufferFromS3::preFinalize()
} }
is_prefinalized = true; is_prefinalized = true;
return true;
} }
void WriteBufferFromS3::finalizeImpl() void WriteBufferFromS3::finalizeImpl()
{ {
if (!is_prefinalized) if (!is_prefinalized)
preFinalize(); std::ignore = preFinalize();
waitForAllBackGroundTasks(); waitForAllBackGroundTasks();

View File

@ -55,7 +55,7 @@ public:
void nextImpl() override; void nextImpl() override;
void preFinalize() override; bool preFinalize() override;
private: private:
void allocateBuffer(); void allocateBuffer();

View File

@ -139,8 +139,10 @@ IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::s
out_hashing.next(); out_hashing.next();
out_checksums.files[file_name].file_size = out_hashing.count(); out_checksums.files[file_name].file_size = out_hashing.count();
out_checksums.files[file_name].file_hash = out_hashing.getHash(); out_checksums.files[file_name].file_hash = out_hashing.getHash();
out->preFinalize(); if (out->preFinalize())
written_files.emplace_back(std::move(out)); written_files.emplace_back(std::move(out));
else
out->finalize();
} }
return written_files; return written_files;

View File

@ -39,6 +39,8 @@ public:
Columns releaseIndexColumns(); Columns releaseIndexColumns();
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; } const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
/// Returns the has_asynchronous_writing_buffers flag, i.e. whether any preFinalize() call
/// whose result was accumulated into that flag returned true (some data was sent asynchronously).
bool hasAsynchronousWritingBuffers() const { return has_asynchronous_writing_buffers; }
protected: protected:
const MergeTreeMutableDataPartPtr data_part; const MergeTreeMutableDataPartPtr data_part;
@ -48,6 +50,7 @@ protected:
const MergeTreeWriterSettings settings; const MergeTreeWriterSettings settings;
MergeTreeIndexGranularity index_granularity; MergeTreeIndexGranularity index_granularity;
const bool with_final_mark; const bool with_final_mark;
/// Becomes true once any preFinalize() result OR-ed into it (with |=) was true. Starts as false.
bool has_asynchronous_writing_buffers = false;
MutableColumns index_columns; MutableColumns index_columns;
}; };

View File

@ -558,7 +558,10 @@ void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const
auto changed_checksums = ctx->column_to->fillChecksums(global_ctx->new_data_part, global_ctx->checksums_gathered_columns); auto changed_checksums = ctx->column_to->fillChecksums(global_ctx->new_data_part, global_ctx->checksums_gathered_columns);
global_ctx->checksums_gathered_columns.add(std::move(changed_checksums)); global_ctx->checksums_gathered_columns.add(std::move(changed_checksums));
ctx->delayed_streams.emplace_back(std::move(ctx->column_to)); if (ctx->column_to->hasAsynchronousWritingBuffers())
ctx->delayed_streams.emplace_back(std::move(ctx->column_to));
else
ctx->column_to->finish(ctx->need_sync);
while (ctx->delayed_streams.size() > ctx->max_delayed_streams) while (ctx->delayed_streams.size() > ctx->max_delayed_streams)
{ {

View File

@ -286,8 +286,8 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(IMergeTreeDataPart::Check
marks_file_hashing->next(); marks_file_hashing->next();
addToChecksums(checksums); addToChecksums(checksums);
plain_file->preFinalize(); has_asynchronous_writing_buffers |= plain_file->preFinalize();
marks_file->preFinalize(); has_asynchronous_writing_buffers |= marks_file->preFinalize();
} }
void MergeTreeDataPartWriterCompact::finishDataSerialization(bool sync) void MergeTreeDataPartWriterCompact::finishDataSerialization(bool sync)

View File

@ -11,7 +11,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
void MergeTreeDataPartWriterOnDisk::Stream::preFinalize() bool MergeTreeDataPartWriterOnDisk::Stream::preFinalize()
{ {
compressed_hashing.next(); compressed_hashing.next();
compressor.next(); compressor.next();
@ -25,16 +25,18 @@ void MergeTreeDataPartWriterOnDisk::Stream::preFinalize()
marks_hashing.next(); marks_hashing.next();
plain_file->preFinalize(); bool has_asynchronous_writing_buffers = false;
marks_file->preFinalize(); has_asynchronous_writing_buffers |= plain_file->preFinalize();
has_asynchronous_writing_buffers |= marks_file->preFinalize();
is_prefinalized = true; is_prefinalized = true;
return has_asynchronous_writing_buffers;
} }
void MergeTreeDataPartWriterOnDisk::Stream::finalize() void MergeTreeDataPartWriterOnDisk::Stream::finalize()
{ {
if (!is_prefinalized) if (!is_prefinalized)
preFinalize(); std::ignore = preFinalize();
plain_file->finalize(); plain_file->finalize();
marks_file->finalize(); marks_file->finalize();
@ -360,7 +362,7 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
} }
checksums.files[index_name].file_size = index_file_hashing_stream->count(); checksums.files[index_name].file_size = index_file_hashing_stream->count();
checksums.files[index_name].file_hash = index_file_hashing_stream->getHash(); checksums.files[index_name].file_hash = index_file_hashing_stream->getHash();
index_file_stream->preFinalize(); has_asynchronous_writing_buffers |= index_file_stream->preFinalize();
} }
} }
@ -392,7 +394,7 @@ void MergeTreeDataPartWriterOnDisk::fillSkipIndicesChecksums(MergeTreeData::Data
for (auto & stream : skip_indices_streams) for (auto & stream : skip_indices_streams)
{ {
stream->preFinalize(); has_asynchronous_writing_buffers |= stream->preFinalize();
stream->addToChecksums(checksums); stream->addToChecksums(checksums);
} }
} }

View File

@ -80,7 +80,7 @@ public:
bool is_prefinalized = false; bool is_prefinalized = false;
void preFinalize(); [[nodiscard]] bool preFinalize();
void finalize(); void finalize();

View File

@ -589,7 +589,7 @@ void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksum
for (auto & stream : column_streams) for (auto & stream : column_streams)
{ {
stream.second->preFinalize(); has_asynchronous_writing_buffers |= stream.second->preFinalize();
stream.second->addToChecksums(checksums); stream.second->addToChecksums(checksums);
} }
} }

View File

@ -418,7 +418,7 @@ std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const Block &
out_hashing.next(); out_hashing.next();
checksums.files["partition.dat"].file_size = out_hashing.count(); checksums.files["partition.dat"].file_size = out_hashing.count();
checksums.files["partition.dat"].file_hash = out_hashing.getHash(); checksums.files["partition.dat"].file_hash = out_hashing.getHash();
out->preFinalize(); std::ignore = out->preFinalize();
return out; return out;
} }

View File

@ -213,8 +213,10 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
count_out_hashing.next(); count_out_hashing.next();
checksums.files["count.txt"].file_size = count_out_hashing.count(); checksums.files["count.txt"].file_size = count_out_hashing.count();
checksums.files["count.txt"].file_hash = count_out_hashing.getHash(); checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
count_out->preFinalize(); if (count_out->preFinalize())
written_files.emplace_back(std::move(count_out)); written_files.emplace_back(std::move(count_out));
else
count_out->finalize();
} }
} }
else else
@ -226,8 +228,10 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
writeUUIDText(new_part->uuid, out_hashing); writeUUIDText(new_part->uuid, out_hashing);
checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count(); checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_hash = out_hashing.getHash(); checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_hash = out_hashing.getHash();
out->preFinalize(); if (out->preFinalize())
written_files.emplace_back(std::move(out)); written_files.emplace_back(std::move(out));
else
out->finalize();
} }
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
@ -253,8 +257,10 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
count_out_hashing.next(); count_out_hashing.next();
checksums.files["count.txt"].file_size = count_out_hashing.count(); checksums.files["count.txt"].file_size = count_out_hashing.count();
checksums.files["count.txt"].file_hash = count_out_hashing.getHash(); checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
count_out->preFinalize(); if (count_out->preFinalize())
written_files.emplace_back(std::move(count_out)); written_files.emplace_back(std::move(count_out));
else
count_out->finalize();
} }
} }
@ -266,8 +272,10 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
new_part->ttl_infos.write(out_hashing); new_part->ttl_infos.write(out_hashing);
checksums.files["ttl.txt"].file_size = out_hashing.count(); checksums.files["ttl.txt"].file_size = out_hashing.count();
checksums.files["ttl.txt"].file_hash = out_hashing.getHash(); checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
out->preFinalize(); if (out->preFinalize())
written_files.emplace_back(std::move(out)); written_files.emplace_back(std::move(out));
else
out->finalize();
} }
if (!new_part->getSerializationInfos().empty()) if (!new_part->getSerializationInfos().empty())
@ -277,24 +285,30 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
new_part->getSerializationInfos().writeJSON(out_hashing); new_part->getSerializationInfos().writeJSON(out_hashing);
checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count(); checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_hash = out_hashing.getHash(); checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_hash = out_hashing.getHash();
out->preFinalize(); if (out->preFinalize())
written_files.emplace_back(std::move(out)); written_files.emplace_back(std::move(out));
else
out->finalize();
} }
{ {
/// Write a file with a description of columns. /// Write a file with a description of columns.
auto out = new_part->getDataPartStorage().writeFile("columns.txt", 4096, write_settings); auto out = new_part->getDataPartStorage().writeFile("columns.txt", 4096, write_settings);
new_part->getColumns().writeText(*out); new_part->getColumns().writeText(*out);
out->preFinalize(); if (out->preFinalize())
written_files.emplace_back(std::move(out)); written_files.emplace_back(std::move(out));
else
out->finalize();
} }
if (default_codec != nullptr) if (default_codec != nullptr)
{ {
auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, write_settings); auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, write_settings);
DB::writeText(queryToString(default_codec->getFullCodecDesc()), *out); DB::writeText(queryToString(default_codec->getFullCodecDesc()), *out);
out->preFinalize(); if (out->preFinalize())
written_files.emplace_back(std::move(out)); written_files.emplace_back(std::move(out));
else
out->finalize();
} }
else else
{ {
@ -306,8 +320,10 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
/// Write file with checksums. /// Write file with checksums.
auto out = new_part->getDataPartStorage().writeFile("checksums.txt", 4096, write_settings); auto out = new_part->getDataPartStorage().writeFile("checksums.txt", 4096, write_settings);
checksums.write(*out); checksums.write(*out);
out->preFinalize(); if (out->preFinalize())
written_files.emplace_back(std::move(out)); written_files.emplace_back(std::move(out));
else
out->finalize();
} }
return written_files; return written_files;

View File

@ -29,6 +29,7 @@ public:
MergeTreeData::DataPart::Checksums MergeTreeData::DataPart::Checksums
fillChecksums(MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::DataPart::Checksums & all_checksums); fillChecksums(MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::DataPart::Checksums & all_checksums);
/// Forwards to the underlying writer's hasAsynchronousWritingBuffers().
bool hasAsynchronousWritingBuffers() const { return writer->hasAsynchronousWritingBuffers(); }
void finish(bool sync); void finish(bool sync);
private: private: