From 27d4fbd13b7c6eeade74a84ab0813c2fd1962eaf Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 2 Apr 2021 07:09:39 +0300 Subject: [PATCH] Compare Block itself for distributed async INSERT batches INSERT into Distributed with insert_distributed_sync=0 stores the distributed batches on the disk for sending in the background. But types may be a little bit different for the Distributed and its underlying table, so the initiator needs to know whether conversion is required or not. Before this patch those on-disk distributed batches contained a header that includes dumpStructure() for the block in that batch; however, comparing it checks more than just names and types, and besides, dumpStructure() is a debug method. So instead of storing a string representation of the block header we should store an empty block in the file header (note that we cannot store the empty block outside of the header, since this would require reading all blocks from the file, due to some trickery of the readers interface). Note that this patch also contains a tiny refactoring: - s/header/distributed_header/ v1: dumpNamesAndTypes() v2: dump empty block into the batch itself v3: move empty block into the header --- src/Storages/Distributed/DirectoryMonitor.cpp | 173 +++++++++++------- .../DistributedBlockOutputStream.cpp | 8 +- ...ructure_mismatch_types_and_names.reference | 0 ...ock_structure_mismatch_types_and_names.sql | 22 +++ ..._INSERT_block_structure_mismatch.reference | 6 + ...91_dist_INSERT_block_structure_mismatch.sh | 30 +++ .../queries/0_stateless/arcadia_skip_list.txt | 2 + 7 files changed, 177 insertions(+), 64 deletions(-) create mode 100644 tests/queries/0_stateless/01790_dist_INSERT_block_structure_mismatch_types_and_names.reference create mode 100644 tests/queries/0_stateless/01790_dist_INSERT_block_structure_mismatch_types_and_names.sql create mode 100644 tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.reference create mode 100755 
tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.sh diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index fb5e5080314..7f4a8e06b75 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -104,12 +104,14 @@ namespace size_t rows = 0; size_t bytes = 0; - std::string header; + /// dumpStructure() of the header -- obsolete + std::string block_header_string; + Block block_header; }; - DistributedHeader readDistributedHeader(ReadBuffer & in, Poco::Logger * log) + DistributedHeader readDistributedHeader(ReadBufferFromFile & in, Poco::Logger * log) { - DistributedHeader header; + DistributedHeader distributed_header; UInt64 query_size; readVarUInt(query_size, in); @@ -135,17 +137,25 @@ namespace LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. It may lack support for new features."); } - readStringBinary(header.insert_query, header_buf); - header.insert_settings.read(header_buf); + readStringBinary(distributed_header.insert_query, header_buf); + distributed_header.insert_settings.read(header_buf); if (header_buf.hasPendingData()) - header.client_info.read(header_buf, initiator_revision); + distributed_header.client_info.read(header_buf, initiator_revision); if (header_buf.hasPendingData()) { - readVarUInt(header.rows, header_buf); - readVarUInt(header.bytes, header_buf); - readStringBinary(header.header, header_buf); + readVarUInt(distributed_header.rows, header_buf); + readVarUInt(distributed_header.bytes, header_buf); + readStringBinary(distributed_header.block_header_string, header_buf); + } + + if (header_buf.hasPendingData()) + { + NativeBlockInputStream header_block_in(header_buf, DBMS_TCP_PROTOCOL_VERSION); + distributed_header.block_header = header_block_in.read(); + if (!distributed_header.block_header) + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read header from the {} 
batch", in.getFileName()); } /// Add handling new data here, for example: @@ -155,20 +165,20 @@ namespace /// /// And note that it is safe, because we have checksum and size for header. - return header; + return distributed_header; } if (query_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT) { - header.insert_settings.read(in, SettingsWriteFormat::BINARY); - readStringBinary(header.insert_query, in); - return header; + distributed_header.insert_settings.read(in, SettingsWriteFormat::BINARY); + readStringBinary(distributed_header.insert_query, in); + return distributed_header; } - header.insert_query.resize(query_size); - in.readStrict(header.insert_query.data(), query_size); + distributed_header.insert_query.resize(query_size); + in.readStrict(distributed_header.insert_query.data(), query_size); - return header; + return distributed_header; } /// remote_error argument is used to decide whether some errors should be @@ -200,35 +210,58 @@ namespace return nullptr; } - void writeRemoteConvert(const DistributedHeader & header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log) + void writeAndConvert(RemoteBlockOutputStream & remote, ReadBufferFromFile & in) { - if (remote.getHeader() && header.header != remote.getHeader().dumpStructure()) + CompressedReadBuffer decompressing_in(in); + NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION); + block_in.readPrefix(); + + while (Block block = block_in.read()) { - LOG_WARNING(log, - "Structure does not match (remote: {}, local: {}), implicit conversion will be done", - remote.getHeader().dumpStructure(), header.header); - - CompressedReadBuffer decompressing_in(in); - /// Lack of header, requires to read blocks - NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION); - - block_in.readPrefix(); - while (Block block = block_in.read()) - { - ConvertingBlockInputStream convert( - std::make_shared(block), - remote.getHeader(), - 
ConvertingBlockInputStream::MatchColumnsMode::Name); - auto adopted_block = convert.read(); - remote.write(adopted_block); - } - block_in.readSuffix(); + ConvertingBlockInputStream convert( + std::make_shared(block), + remote.getHeader(), + ConvertingBlockInputStream::MatchColumnsMode::Name); + auto adopted_block = convert.read(); + remote.write(adopted_block); } - else + + block_in.readSuffix(); + } + + void writeRemoteConvert(const DistributedHeader & distributed_header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log) + { + if (!remote.getHeader()) { CheckingCompressedReadBuffer checking_in(in); remote.writePrepared(checking_in); + return; } + + /// This is old format, that does not have header for the block in the file header, + /// applying ConvertingBlockInputStream in this case is not a big overhead. + /// + /// Anyway we can get header only from the first block, which contain all rows anyway. + if (!distributed_header.block_header) + { + LOG_TRACE(log, "Processing batch {} with old format (no header)", in.getFileName()); + + writeAndConvert(remote, in); + return; + } + + if (!blocksHaveEqualStructure(distributed_header.block_header, remote.getHeader())) + { + LOG_WARNING(log, + "Structure does not match (remote: {}, local: {}), implicit conversion will be done", + remote.getHeader().dumpStructure(), distributed_header.block_header.dumpStructure()); + + writeAndConvert(remote, in); + return; + } + + CheckingCompressedReadBuffer checking_in(in); + remote.writePrepared(checking_in); } } @@ -498,13 +531,15 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend}; ReadBufferFromFile in(file_path); - const auto & header = readDistributedHeader(in, log); + const auto & distributed_header = readDistributedHeader(in, log); - auto connection = pool->get(timeouts, &header.insert_settings); + auto connection = pool->get(timeouts, 
&distributed_header.insert_settings); RemoteBlockOutputStream remote{*connection, timeouts, - header.insert_query, header.insert_settings, header.client_info}; + distributed_header.insert_query, + distributed_header.insert_settings, + distributed_header.client_info}; remote.writePrefix(); - writeRemoteConvert(header, remote, in, log); + writeRemoteConvert(distributed_header, remote, in, log); remote.writeSuffix(); } catch (const Exception & e) @@ -523,20 +558,21 @@ struct StorageDistributedDirectoryMonitor::BatchHeader Settings settings; String query; ClientInfo client_info; - String sample_block_structure; + Block header; - BatchHeader(Settings settings_, String query_, ClientInfo client_info_, String sample_block_structure_) + BatchHeader(Settings settings_, String query_, ClientInfo client_info_, Block header_) : settings(std::move(settings_)) , query(std::move(query_)) , client_info(std::move(client_info_)) - , sample_block_structure(std::move(sample_block_structure_)) + , header(std::move(header_)) { } bool operator==(const BatchHeader & other) const { - return std::tie(settings, query, client_info.query_kind, sample_block_structure) == - std::tie(other.settings, other.query, other.client_info.query_kind, other.sample_block_structure); + return std::tie(settings, query, client_info.query_kind) == + std::tie(other.settings, other.query, other.client_info.query_kind) && + blocksHaveEqualStructure(header, other.header); } struct Hash @@ -545,7 +581,7 @@ struct StorageDistributedDirectoryMonitor::BatchHeader { SipHash hash_state; hash_state.update(batch_header.query.data(), batch_header.query.size()); - hash_state.update(batch_header.sample_block_structure.data(), batch_header.sample_block_structure.size()); + batch_header.header.updateHash(hash_state); return hash_state.get64(); } }; @@ -632,16 +668,17 @@ struct StorageDistributedDirectoryMonitor::Batch } ReadBufferFromFile in(file_path->second); - const auto & header = readDistributedHeader(in, parent.log); + 
const auto & distributed_header = readDistributedHeader(in, parent.log); if (!remote) { remote = std::make_unique(*connection, timeouts, - header.insert_query, header.insert_settings, header.client_info); + distributed_header.insert_query, + distributed_header.insert_settings, + distributed_header.client_info); remote->writePrefix(); } - - writeRemoteConvert(header, *remote, in, parent.log); + writeRemoteConvert(distributed_header, *remote, in, parent.log); } if (remote) @@ -808,22 +845,27 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map size_t total_rows = 0; size_t total_bytes = 0; - std::string sample_block_structure; - DistributedHeader header; + Block header; + DistributedHeader distributed_header; try { /// Determine metadata of the current file and check if it is not broken. ReadBufferFromFile in{file_path}; - header = readDistributedHeader(in, log); + distributed_header = readDistributedHeader(in, log); - if (header.rows) + if (distributed_header.rows) { - total_rows += header.rows; - total_bytes += header.bytes; - sample_block_structure = header.header; + total_rows += distributed_header.rows; + total_bytes += distributed_header.bytes; } - else + + if (distributed_header.block_header) + header = distributed_header.block_header; + + if (!total_rows || !header) { + LOG_TRACE(log, "Processing batch {} with old format (no header/rows)", in.getFileName()); + CompressedReadBuffer decompressing_in(in); NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION); block_in.readPrefix(); @@ -833,8 +875,8 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map total_rows += block.rows(); total_bytes += block.bytes(); - if (sample_block_structure.empty()) - sample_block_structure = block.cloneEmpty().dumpStructure(); + if (!header) + header = block.cloneEmpty(); } block_in.readSuffix(); } @@ -850,7 +892,12 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map 
throw; } - BatchHeader batch_header(std::move(header.insert_settings), std::move(header.insert_query), std::move(header.client_info), std::move(sample_block_structure)); + BatchHeader batch_header( + std::move(distributed_header.insert_settings), + std::move(distributed_header.insert_query), + std::move(distributed_header.client_info), + std::move(header) + ); Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second; batch.file_indices.push_back(file_idx); diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index f8ba4221842..5fd778b8063 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -679,7 +679,13 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: context.getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION); writeVarUInt(block.rows(), header_buf); writeVarUInt(block.bytes(), header_buf); - writeStringBinary(block.cloneEmpty().dumpStructure(), header_buf); + writeStringBinary(block.cloneEmpty().dumpStructure(), header_buf); /// obsolete + /// Write block header separately in the batch header. + /// It is required for checking does conversion is required or not. 
+ { + NativeBlockOutputStream header_stream{header_buf, DBMS_TCP_PROTOCOL_VERSION, block.cloneEmpty()}; + header_stream.write(block.cloneEmpty()); + } /// Add new fields here, for example: /// writeVarUInt(my_new_data, header_buf); diff --git a/tests/queries/0_stateless/01790_dist_INSERT_block_structure_mismatch_types_and_names.reference b/tests/queries/0_stateless/01790_dist_INSERT_block_structure_mismatch_types_and_names.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01790_dist_INSERT_block_structure_mismatch_types_and_names.sql b/tests/queries/0_stateless/01790_dist_INSERT_block_structure_mismatch_types_and_names.sql new file mode 100644 index 00000000000..e921460ccfc --- /dev/null +++ b/tests/queries/0_stateless/01790_dist_INSERT_block_structure_mismatch_types_and_names.sql @@ -0,0 +1,22 @@ +DROP TABLE IF EXISTS tmp_01781; +DROP TABLE IF EXISTS dist_01781; + +SET prefer_localhost_replica=0; + +CREATE TABLE tmp_01781 (n LowCardinality(String)) ENGINE=Memory; +CREATE TABLE dist_01781 (n LowCardinality(String)) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01781, cityHash64(n)); + +SET insert_distributed_sync=1; +INSERT INTO dist_01781 VALUES ('1'),('2'); +-- different LowCardinality size +INSERT INTO dist_01781 SELECT * FROM numbers(1000); + +SET insert_distributed_sync=0; +SYSTEM STOP DISTRIBUTED SENDS dist_01781; +INSERT INTO dist_01781 VALUES ('1'),('2'); +-- different LowCardinality size +INSERT INTO dist_01781 SELECT * FROM numbers(1000); +SYSTEM FLUSH DISTRIBUTED dist_01781; + +DROP TABLE tmp_01781; +DROP TABLE dist_01781; diff --git a/tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.reference b/tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.reference new file mode 100644 index 00000000000..3bba1ac23c0 --- /dev/null +++ b/tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.reference @@ -0,0 +1,6 @@ + 
DistributedBlockOutputStream: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done. + DistributedBlockOutputStream: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done. +1 +1 +2 +2 diff --git a/tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.sh b/tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.sh new file mode 100755 index 00000000000..e989696da03 --- /dev/null +++ b/tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash + +# NOTE: this is a partial copy of the 01683_dist_INSERT_block_structure_mismatch, +# but this test also checks the log messages + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT --prefer_localhost_replica=0 -nm -q " + DROP TABLE IF EXISTS tmp_01683; + DROP TABLE IF EXISTS dist_01683; + + CREATE TABLE tmp_01683 (n Int8) ENGINE=Memory; + CREATE TABLE dist_01683 (n UInt64) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01683, n); + + SET insert_distributed_sync=1; + INSERT INTO dist_01683 VALUES (1),(2); + + SET insert_distributed_sync=0; + INSERT INTO dist_01683 VALUES (1),(2); + SYSTEM FLUSH DISTRIBUTED dist_01683; + + -- TODO: cover distributed_directory_monitor_batch_inserts=1 + + SELECT * FROM tmp_01683 ORDER BY n; + + DROP TABLE tmp_01683; + DROP TABLE dist_01683; +" |& sed 's/^.*