diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp
index fb5e5080314..7f4a8e06b75 100644
--- a/src/Storages/Distributed/DirectoryMonitor.cpp
+++ b/src/Storages/Distributed/DirectoryMonitor.cpp
@@ -104,12 +104,14 @@ namespace
         size_t rows = 0;
         size_t bytes = 0;
 
-        std::string header;
+        /// dumpStructure() of the header -- obsolete
+        std::string block_header_string;
+        Block block_header;
     };
 
-    DistributedHeader readDistributedHeader(ReadBuffer & in, Poco::Logger * log)
+    DistributedHeader readDistributedHeader(ReadBufferFromFile & in, Poco::Logger * log)
     {
-        DistributedHeader header;
+        DistributedHeader distributed_header;
 
         UInt64 query_size;
         readVarUInt(query_size, in);
@@ -135,17 +137,25 @@ namespace
                 LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. It may lack support for new features.");
             }
 
-            readStringBinary(header.insert_query, header_buf);
-            header.insert_settings.read(header_buf);
+            readStringBinary(distributed_header.insert_query, header_buf);
+            distributed_header.insert_settings.read(header_buf);
 
             if (header_buf.hasPendingData())
-                header.client_info.read(header_buf, initiator_revision);
+                distributed_header.client_info.read(header_buf, initiator_revision);
 
             if (header_buf.hasPendingData())
             {
-                readVarUInt(header.rows, header_buf);
-                readVarUInt(header.bytes, header_buf);
-                readStringBinary(header.header, header_buf);
+                readVarUInt(distributed_header.rows, header_buf);
+                readVarUInt(distributed_header.bytes, header_buf);
+                readStringBinary(distributed_header.block_header_string, header_buf);
+            }
+
+            if (header_buf.hasPendingData())
+            {
+                NativeBlockInputStream header_block_in(header_buf, DBMS_TCP_PROTOCOL_VERSION);
+                distributed_header.block_header = header_block_in.read();
+                if (!distributed_header.block_header)
+                    throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read header from the {} batch", in.getFileName());
             }
 
             /// Add handling new data here, for example:
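The hunk above is the reader side of a forward/backward-compatible framing scheme: the sub-header is length-prefixed and checksummed, every new field is appended after the existing ones, and the reader probes `header_buf.hasPendingData()` before decoding each optional field, so an old server simply stops early and a new server tolerates short files. A minimal standalone sketch of that idea (hypothetical `BufReader` type, fixed-width integers instead of ClickHouse's VarUInt encoding):

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>

/// Hypothetical reader with the same "probe before reading" interface
/// as ClickHouse's ReadBuffer::hasPendingData().
struct BufReader
{
    const std::string & data;
    size_t pos = 0;

    bool hasPendingData() const { return pos < data.size(); }

    uint64_t readU64()
    {
        uint64_t v = 0;
        std::memcpy(&v, data.data() + pos, sizeof(v));
        pos += sizeof(v);
        return v;
    }
};

int main()
{
    std::string buf;
    auto writeU64 = [&buf](uint64_t v)
    {
        buf.append(reinterpret_cast<const char *>(&v), sizeof(v));
    };

    writeU64(100);  /// rows  -- field known to old readers
    writeU64(4096); /// bytes -- field known to old readers
    writeU64(42);   /// field appended by a newer writer

    BufReader in{buf};
    uint64_t rows = in.readU64();
    uint64_t bytes = in.readU64();
    /// An old reader stops here; a new one probes for the optional field.
    uint64_t extra = in.hasPendingData() ? in.readU64() : 0;
    std::cout << rows << ' ' << bytes << ' ' << extra << '\n'; /// 100 4096 42
}
```

The surrounding checksum and size make this trailing-append scheme safe, as the comment in the hunk notes.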
@@ -155,20 +165,20 @@ namespace
             ///
             /// And note that it is safe, because we have checksum and size for header.
 
-            return header;
+            return distributed_header;
         }
 
         if (query_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT)
         {
-            header.insert_settings.read(in, SettingsWriteFormat::BINARY);
-            readStringBinary(header.insert_query, in);
-            return header;
+            distributed_header.insert_settings.read(in, SettingsWriteFormat::BINARY);
+            readStringBinary(distributed_header.insert_query, in);
+            return distributed_header;
         }
 
-        header.insert_query.resize(query_size);
-        in.readStrict(header.insert_query.data(), query_size);
+        distributed_header.insert_query.resize(query_size);
+        in.readStrict(distributed_header.insert_query.data(), query_size);
 
-        return header;
+        return distributed_header;
     }
 
     /// remote_error argument is used to decide whether some errors should be
@@ -200,35 +210,58 @@ namespace
         return nullptr;
     }
 
-    void writeRemoteConvert(const DistributedHeader & header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log)
+    void writeAndConvert(RemoteBlockOutputStream & remote, ReadBufferFromFile & in)
     {
-        if (remote.getHeader() && header.header != remote.getHeader().dumpStructure())
+        CompressedReadBuffer decompressing_in(in);
+        NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
+        block_in.readPrefix();
+
+        while (Block block = block_in.read())
         {
-            LOG_WARNING(log,
-                "Structure does not match (remote: {}, local: {}), implicit conversion will be done",
-                remote.getHeader().dumpStructure(), header.header);
-
-            CompressedReadBuffer decompressing_in(in);
-            /// Lack of header, requires to read blocks
-            NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
-
-            block_in.readPrefix();
-            while (Block block = block_in.read())
-            {
-                ConvertingBlockInputStream convert(
-                    std::make_shared<OneBlockInputStream>(block),
-                    remote.getHeader(),
-                    ConvertingBlockInputStream::MatchColumnsMode::Name);
-                auto adopted_block = convert.read();
-                remote.write(adopted_block);
-            }
-            block_in.readSuffix();
+            ConvertingBlockInputStream convert(
+                std::make_shared<OneBlockInputStream>(block),
+                remote.getHeader(),
+                ConvertingBlockInputStream::MatchColumnsMode::Name);
+            auto adopted_block = convert.read();
+            remote.write(adopted_block);
         }
-        else
+
+        block_in.readSuffix();
+    }
+
+    void writeRemoteConvert(const DistributedHeader & distributed_header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log)
+    {
+        if (!remote.getHeader())
         {
             CheckingCompressedReadBuffer checking_in(in);
             remote.writePrepared(checking_in);
+            return;
         }
+
+        /// This is the old format, which does not have a header for the block in the file header,
+        /// so applying ConvertingBlockInputStream in this case is not a big overhead.
+        ///
+        /// Anyway, we can get the header only from the first block, which contains all rows.
+        if (!distributed_header.block_header)
+        {
+            LOG_TRACE(log, "Processing batch {} with old format (no header)", in.getFileName());
+
+            writeAndConvert(remote, in);
+            return;
+        }
+
+        if (!blocksHaveEqualStructure(distributed_header.block_header, remote.getHeader()))
+        {
+            LOG_WARNING(log,
+                "Structure does not match (remote: {}, local: {}), implicit conversion will be done",
+                remote.getHeader().dumpStructure(), distributed_header.block_header.dumpStructure());
+
+            writeAndConvert(remote, in);
+            return;
+        }
+
+        CheckingCompressedReadBuffer checking_in(in);
+        remote.writePrepared(checking_in);
     }
 }
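`writeAndConvert()` now factors out the decompress/convert/resend loop that runs whenever the stored blocks cannot be forwarded as-is, and the conversion matches columns by name against the remote header. A toy analogue of that by-name matching (simplified `Column`/`Block` types of my own, not the ClickHouse API; the real `ConvertingBlockInputStream` also casts column types):

```cpp
#include <algorithm>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

struct Column { std::string name; std::vector<int> values; };
using Block = std::vector<Column>;

/// Reorder source columns to match the destination header by name,
/// as MatchColumnsMode::Name does (a real conversion would also cast types).
Block convertByName(const Block & src, const std::vector<std::string> & header)
{
    Block result;
    for (const auto & wanted : header)
    {
        auto it = std::find_if(src.begin(), src.end(),
            [&](const Column & c) { return c.name == wanted; });
        if (it == src.end())
            throw std::runtime_error("no column " + wanted + " in source block");
        result.push_back(*it);
    }
    return result;
}

int main()
{
    Block src{{"b", {4, 5}}, {"a", {1, 2}}};
    for (const auto & column : convertByName(src, {"a", "b"}))
        std::cout << column.name << ' ';
    std::cout << '\n'; /// prints: a b
}
```

Splitting the function also makes the three outcomes explicit: pass-through for an empty remote header, conversion for old-format files, and a structure check (below) that decides between pass-through and conversion for new-format files.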
@@ -498,13 +531,15 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
         CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
 
         ReadBufferFromFile in(file_path);
-        const auto & header = readDistributedHeader(in, log);
+        const auto & distributed_header = readDistributedHeader(in, log);
 
-        auto connection = pool->get(timeouts, &header.insert_settings);
+        auto connection = pool->get(timeouts, &distributed_header.insert_settings);
         RemoteBlockOutputStream remote{*connection, timeouts,
-            header.insert_query, header.insert_settings, header.client_info};
+            distributed_header.insert_query,
+            distributed_header.insert_settings,
+            distributed_header.client_info};
         remote.writePrefix();
-        writeRemoteConvert(header, remote, in, log);
+        writeRemoteConvert(distributed_header, remote, in, log);
         remote.writeSuffix();
     }
     catch (const Exception & e)
@@ -523,20 +558,21 @@ struct StorageDistributedDirectoryMonitor::BatchHeader
     Settings settings;
     String query;
     ClientInfo client_info;
-    String sample_block_structure;
+    Block header;
 
-    BatchHeader(Settings settings_, String query_, ClientInfo client_info_, String sample_block_structure_)
+    BatchHeader(Settings settings_, String query_, ClientInfo client_info_, Block header_)
         : settings(std::move(settings_))
         , query(std::move(query_))
         , client_info(std::move(client_info_))
-        , sample_block_structure(std::move(sample_block_structure_))
+        , header(std::move(header_))
     {
     }
 
     bool operator==(const BatchHeader & other) const
     {
-        return std::tie(settings, query, client_info.query_kind, sample_block_structure) ==
-               std::tie(other.settings, other.query, other.client_info.query_kind, other.sample_block_structure);
+        return std::tie(settings, query, client_info.query_kind) ==
+               std::tie(other.settings, other.query, other.client_info.query_kind) &&
+               blocksHaveEqualStructure(header, other.header);
     }
 
     struct Hash
@@ -545,7 +581,7 @@ struct StorageDistributedDirectoryMonitor::BatchHeader
         {
             SipHash hash_state;
             hash_state.update(batch_header.query.data(), batch_header.query.size());
-            hash_state.update(batch_header.sample_block_structure.data(), batch_header.sample_block_structure.size());
+            batch_header.header.updateHash(hash_state);
             return hash_state.get64();
         }
     };
@@ -632,16 +668,17 @@ struct StorageDistributedDirectoryMonitor::Batch
             }
 
             ReadBufferFromFile in(file_path->second);
-            const auto & header = readDistributedHeader(in, parent.log);
+            const auto & distributed_header = readDistributedHeader(in, parent.log);
 
             if (!remote)
             {
                 remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts,
-                    header.insert_query, header.insert_settings, header.client_info);
+                    distributed_header.insert_query,
+                    distributed_header.insert_settings,
+                    distributed_header.client_info);
                 remote->writePrefix();
             }
-
-            writeRemoteConvert(header, *remote, in, parent.log);
+            writeRemoteConvert(distributed_header, *remote, in, parent.log);
         }
 
         if (remote)
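Switching `BatchHeader` from a `dumpStructure()` string to a real `Block` keeps `operator==` and `Hash` consistent: both now consider only the structure, whereas the dumped string also contains transient details such as the current LowCardinality dictionary size, so structurally identical files could previously land in different batches. A toy illustration of that invariant, with assumed types of my own:

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <vector>

struct ColumnDesc
{
    std::string name;
    std::string type;
    size_t size; /// transient detail, e.g. current LowCardinality dictionary size
};

/// Hash only what equality compares: (name, type) pairs.
size_t structuralHash(const std::vector<ColumnDesc> & columns)
{
    size_t h = 0;
    for (const auto & c : columns)
        h ^= std::hash<std::string>{}(c.name + '/' + c.type) + 0x9e3779b9 + (h << 6) + (h >> 2);
    return h;
}

int main()
{
    std::vector<ColumnDesc> a{{"n", "LowCardinality(String)", 2}};
    std::vector<ColumnDesc> b{{"n", "LowCardinality(String)", 1000}};
    /// Same structure, different transient sizes: hashes must agree.
    std::cout << (structuralHash(a) == structuralHash(b)) << '\n'; /// prints: 1
}
```

This is exactly the situation the 01790 test below exercises: two inserts into the same LowCardinality column with different dictionary sizes must fall into one batch.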
@@ -808,22 +845,27 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
         size_t total_rows = 0;
         size_t total_bytes = 0;
-        std::string sample_block_structure;
-        DistributedHeader header;
+        Block header;
+        DistributedHeader distributed_header;
 
         try
         {
             /// Determine metadata of the current file and check if it is not broken.
             ReadBufferFromFile in{file_path};
-            header = readDistributedHeader(in, log);
+            distributed_header = readDistributedHeader(in, log);
 
-            if (header.rows)
+            if (distributed_header.rows)
             {
-                total_rows += header.rows;
-                total_bytes += header.bytes;
-                sample_block_structure = header.header;
+                total_rows += distributed_header.rows;
+                total_bytes += distributed_header.bytes;
             }
-            else
+
+            if (distributed_header.block_header)
+                header = distributed_header.block_header;
+
+            if (!total_rows || !header)
             {
+                LOG_TRACE(log, "Processing batch {} with old format (no header/rows)", in.getFileName());
+
                 CompressedReadBuffer decompressing_in(in);
                 NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
                 block_in.readPrefix();
@@ -833,8 +875,8 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
                     total_rows += block.rows();
                     total_bytes += block.bytes();
 
-                    if (sample_block_structure.empty())
-                        sample_block_structure = block.cloneEmpty().dumpStructure();
+                    if (!header)
+                        header = block.cloneEmpty();
                 }
                 block_in.readSuffix();
             }
@@ -850,7 +892,12 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
             throw;
         }
 
-        BatchHeader batch_header(std::move(header.insert_settings), std::move(header.insert_query), std::move(header.client_info), std::move(sample_block_structure));
+        BatchHeader batch_header(
+            std::move(distributed_header.insert_settings),
+            std::move(distributed_header.insert_query),
+            std::move(distributed_header.client_info),
+            std::move(header)
+        );
         Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second;
 
         batch.file_indices.push_back(file_idx);
diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp
index f8ba4221842..5fd778b8063 100644
--- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp
+++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp
@@ -679,7 +679,13 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
             context.getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
             writeVarUInt(block.rows(), header_buf);
             writeVarUInt(block.bytes(), header_buf);
-            writeStringBinary(block.cloneEmpty().dumpStructure(), header_buf);
+            writeStringBinary(block.cloneEmpty().dumpStructure(), header_buf); /// obsolete
+            /// Write the block header separately in the batch header.
+            /// It is required to check whether conversion is required or not.
+            {
+                NativeBlockOutputStream header_stream{header_buf, DBMS_TCP_PROTOCOL_VERSION, block.cloneEmpty()};
+                header_stream.write(block.cloneEmpty());
+            }
 
             /// Add new fields here, for example:
             /// writeVarUInt(my_new_data, header_buf);
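On the sender side, a schema-only native block (zero rows, produced by `block.cloneEmpty()`) is appended after the obsolete `dumpStructure()` string, so receivers can compare structures without parsing strings. A simplified standalone sketch of writing such a structure-only header (hypothetical fixed-width format; the real code uses ClickHouse's native block serialization via `NativeBlockOutputStream`):

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct ColumnDesc { std::string name; std::string type; };

void writeString(std::string & buf, const std::string & s)
{
    const uint32_t len = static_cast<uint32_t>(s.size());
    buf.append(reinterpret_cast<const char *>(&len), sizeof(len));
    buf.append(s);
}

/// A "block header" is a block with zero rows: a column count plus
/// (name, type) pairs -- structure only, no data.
void writeHeaderBlock(std::string & buf, const std::vector<ColumnDesc> & columns)
{
    const uint32_t count = static_cast<uint32_t>(columns.size());
    buf.append(reinterpret_cast<const char *>(&count), sizeof(count));
    for (const auto & c : columns)
    {
        writeString(buf, c.name);
        writeString(buf, c.type);
    }
}

int main()
{
    std::string header_buf;
    writeHeaderBlock(header_buf, {{"n", "UInt64"}});
    /// 4 (count) + 4+1 ("n") + 4+6 ("UInt64") = 19
    std::cout << header_buf.size() << " bytes of schema-only header\n";
}
```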
diff --git a/tests/queries/0_stateless/01790_dist_INSERT_block_structure_mismatch_types_and_names.reference b/tests/queries/0_stateless/01790_dist_INSERT_block_structure_mismatch_types_and_names.reference
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/queries/0_stateless/01790_dist_INSERT_block_structure_mismatch_types_and_names.sql b/tests/queries/0_stateless/01790_dist_INSERT_block_structure_mismatch_types_and_names.sql
new file mode 100644
index 00000000000..e921460ccfc
--- /dev/null
+++ b/tests/queries/0_stateless/01790_dist_INSERT_block_structure_mismatch_types_and_names.sql
@@ -0,0 +1,22 @@
+DROP TABLE IF EXISTS tmp_01781;
+DROP TABLE IF EXISTS dist_01781;
+
+SET prefer_localhost_replica=0;
+
+CREATE TABLE tmp_01781 (n LowCardinality(String)) ENGINE=Memory;
+CREATE TABLE dist_01781 (n LowCardinality(String)) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01781, cityHash64(n));
+
+SET insert_distributed_sync=1;
+INSERT INTO dist_01781 VALUES ('1'),('2');
+-- different LowCardinality size
+INSERT INTO dist_01781 SELECT * FROM numbers(1000);
+
+SET insert_distributed_sync=0;
+SYSTEM STOP DISTRIBUTED SENDS dist_01781;
+INSERT INTO dist_01781 VALUES ('1'),('2');
+-- different LowCardinality size
+INSERT INTO dist_01781 SELECT * FROM numbers(1000);
+SYSTEM FLUSH DISTRIBUTED dist_01781;
+
+DROP TABLE tmp_01781;
+DROP TABLE dist_01781;
diff --git a/tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.reference b/tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.reference
new file mode 100644
index 00000000000..3bba1ac23c0
--- /dev/null
+++ b/tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.reference
@@ -0,0 +1,6 @@
+ DistributedBlockOutputStream: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done.
+ DistributedBlockOutputStream: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done.
+1
+1
+2
+2
"$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT --prefer_localhost_replica=0 -nm -q " + DROP TABLE IF EXISTS tmp_01683; + DROP TABLE IF EXISTS dist_01683; + + CREATE TABLE tmp_01683 (n Int8) ENGINE=Memory; + CREATE TABLE dist_01683 (n UInt64) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01683, n); + + SET insert_distributed_sync=1; + INSERT INTO dist_01683 VALUES (1),(2); + + SET insert_distributed_sync=0; + INSERT INTO dist_01683 VALUES (1),(2); + SYSTEM FLUSH DISTRIBUTED dist_01683; + + -- TODO: cover distributed_directory_monitor_batch_inserts=1 + + SELECT * FROM tmp_01683 ORDER BY n; + + DROP TABLE tmp_01683; + DROP TABLE dist_01683; +" |& sed 's/^.*