Merge pull request #10940 from azat/dist-send-partially-written-read-fix

Avoid sending partially written files by the DistributedBlockOutputStream
This commit is contained in:
alexey-milovidov 2020-05-17 06:53:51 +03:00 committed by GitHub
commit 0d76091fd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -574,31 +574,34 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
first_file_tmp_path = tmp_path + file_name;
WriteBufferFromFile out{first_file_tmp_path};
CompressedWriteBuffer compress{out};
NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()};
/// Write batch to temporary location
{
WriteBufferFromFile out{first_file_tmp_path};
CompressedWriteBuffer compress{out};
NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()};
/// Prepare the header.
/// We wrap the header into a string for compatibility with older versions:
/// a shard will able to read the header partly and ignore other parts based on its version.
WriteBufferFromOwnString header_buf;
writeVarUInt(ClickHouseRevision::get(), header_buf);
writeStringBinary(query_string, header_buf);
context.getSettingsRef().serialize(header_buf);
context.getClientInfo().write(header_buf, ClickHouseRevision::get());
/// Prepare the header.
/// We wrap the header into a string for compatibility with older versions:
/// a shard will able to read the header partly and ignore other parts based on its version.
WriteBufferFromOwnString header_buf;
writeVarUInt(ClickHouseRevision::get(), header_buf);
writeStringBinary(query_string, header_buf);
context.getSettingsRef().serialize(header_buf);
context.getClientInfo().write(header_buf, ClickHouseRevision::get());
/// Add new fields here, for example:
/// writeVarUInt(my_new_data, header_buf);
/// Add new fields here, for example:
/// writeVarUInt(my_new_data, header_buf);
/// Write the header.
const StringRef header = header_buf.stringRef();
writeVarUInt(DBMS_DISTRIBUTED_SIGNATURE_HEADER, out);
writeStringBinary(header, out);
writePODBinary(CityHash_v1_0_2::CityHash128(header.data, header.size), out);
/// Write the header.
const StringRef header = header_buf.stringRef();
writeVarUInt(DBMS_DISTRIBUTED_SIGNATURE_HEADER, out);
writeStringBinary(header, out);
writePODBinary(CityHash_v1_0_2::CityHash128(header.data, header.size), out);
stream.writePrefix();
stream.write(block);
stream.writeSuffix();
stream.writePrefix();
stream.write(block);
stream.writeSuffix();
}
// Create hardlink here to reuse increment number
const std::string block_file_path(path + '/' + file_name);