diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index 903052b9d45..f94e6700a74 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -78,6 +78,70 @@ namespace } } + struct DistributedHeader + { + Settings insert_settings; + std::string insert_query; + ClientInfo client_info; + }; + + static DistributedHeader readDistributedHeader(ReadBuffer & in, Poco::Logger * log) + { + DistributedHeader header; + + UInt64 query_size; + readVarUInt(query_size, in); + + if (query_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER) + { + /// Read the header as a string. + String header_data; + readStringBinary(header_data, in); + + /// Check the checksum of the header. + CityHash_v1_0_2::uint128 checksum; + readPODBinary(checksum, in); + assertChecksum(checksum, CityHash_v1_0_2::CityHash128(header_data.data(), header_data.size())); + + /// Read the parts of the header. + ReadBufferFromString header_buf(header_data); + + UInt64 initiator_revision; + readVarUInt(initiator_revision, header_buf); + if (DBMS_TCP_PROTOCOL_VERSION < initiator_revision) + { + LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. It may lack support for new features."); + } + + readStringBinary(header.insert_query, header_buf); + header.insert_settings.read(header_buf); + + if (header_buf.hasPendingData()) + header.client_info.read(header_buf, initiator_revision); + + /// Add handling of new data here, for example: + /// + /// if (header_buf.hasPendingData()) + /// readVarUInt(my_new_data, header_buf); + /// + /// Note that this is safe, because the header has both a size and a checksum.
+ + return header; + } + + if (query_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT) + { + header.insert_settings.read(in, SettingsWriteFormat::BINARY); + readStringBinary(header.insert_query, in); + return header; + } + + header.insert_query.resize(query_size); + in.readStrict(header.insert_query.data(), query_size); + + return header; + } + } @@ -330,13 +394,10 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa try { Block sample_block; - Settings insert_settings; - String insert_query; - ClientInfo client_info; /// Determine metadata of the current file and check if it is not broken. ReadBufferFromFile in{file_path}; - readHeader(in, insert_settings, insert_query, client_info, log); + readDistributedHeader(in, log); CompressedReadBuffer decompressing_in(in); NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION); @@ -359,16 +420,11 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend}; ReadBufferFromFile in{file_path}; + const auto & header = readDistributedHeader(in, log); - Settings insert_settings; - std::string insert_query; - ClientInfo client_info; - - readHeader(in, insert_settings, insert_query, client_info, log); - - auto connection = pool->get(timeouts, &insert_settings); - - RemoteBlockOutputStream remote{*connection, timeouts, insert_query, insert_settings, client_info}; + auto connection = pool->get(timeouts, &header.insert_settings); + RemoteBlockOutputStream remote{*connection, timeouts, + header.insert_query, header.insert_settings, header.client_info}; remote.writePrefix(); remote.writePrepared(in); @@ -390,57 +446,6 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa LOG_TRACE(log, "Finished processing `{}`", file_path); } -void StorageDistributedDirectoryMonitor::readHeader( - ReadBuffer & in, Settings & insert_settings, std::string & insert_query, 
ClientInfo & client_info, Poco::Logger * log) -{ - UInt64 query_size; - readVarUInt(query_size, in); - - if (query_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER) - { - /// Read the header as a string. - String header; - readStringBinary(header, in); - - /// Check the checksum of the header. - CityHash_v1_0_2::uint128 checksum; - readPODBinary(checksum, in); - assertChecksum(checksum, CityHash_v1_0_2::CityHash128(header.data(), header.size())); - - /// Read the parts of the header. - ReadBufferFromString header_buf(header); - - UInt64 initiator_revision; - readVarUInt(initiator_revision, header_buf); - if (DBMS_TCP_PROTOCOL_VERSION < initiator_revision) - { - LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. It may lack support for new features."); - } - - readStringBinary(insert_query, header_buf); - insert_settings.read(header_buf); - - if (header_buf.hasPendingData()) - client_info.read(header_buf, initiator_revision); - - /// Add handling new data here, for example: - /// if (header_buf.hasPendingData()) - /// readVarUInt(my_new_data, header_buf); - - return; - } - - if (query_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT) - { - insert_settings.read(in, SettingsWriteFormat::BINARY); - readStringBinary(insert_query, in); - return; - } - - insert_query.resize(query_size); - in.readStrict(insert_query.data(), query_size); -} - struct StorageDistributedDirectoryMonitor::BatchHeader { Settings settings; @@ -551,9 +556,6 @@ struct StorageDistributedDirectoryMonitor::Batch bool batch_broken = false; try { - Settings insert_settings; - String insert_query; - ClientInfo client_info; std::unique_ptr<RemoteBlockOutputStream> remote; bool first = true; @@ -568,12 +570,13 @@ struct StorageDistributedDirectoryMonitor::Batch } ReadBufferFromFile in(file_path->second); - parent.readHeader(in, insert_settings, insert_query, client_info, parent.log); + const auto & header = readDistributedHeader(in, parent.log); if (first) { first = false; - remote = 
std::make_unique<RemoteBlockOutputStream>(*connection, timeouts, insert_query, insert_settings, client_info); + remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts, + header.insert_query, header.insert_settings, header.client_info); remote->writePrefix(); } @@ -652,10 +655,7 @@ public: , block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION) , log{&Poco::Logger::get("DirectoryMonitorBlockInputStream")} { - Settings insert_settings; - String insert_query; - ClientInfo client_info; - StorageDistributedDirectoryMonitor::readHeader(in, insert_settings, insert_query, client_info, log); + readDistributedHeader(in, log); block_in.readPrefix(); first_block = block_in.read(); @@ -744,14 +744,12 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map size_t total_rows = 0; size_t total_bytes = 0; Block sample_block; - Settings insert_settings; - String insert_query; - ClientInfo client_info; + DistributedHeader header; try { /// Determine metadata of the current file and check if it is not broken. ReadBufferFromFile in{file_path}; - readHeader(in, insert_settings, insert_query, client_info, log); + header = readDistributedHeader(in, log); CompressedReadBuffer decompressing_in(in); NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION); @@ -778,7 +776,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map throw; } - BatchHeader batch_header(std::move(insert_settings), std::move(insert_query), std::move(client_info), std::move(sample_block)); + BatchHeader batch_header(std::move(header.insert_settings), std::move(header.insert_query), std::move(header.client_info), std::move(sample_block)); Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second; batch.file_indices.push_back(file_idx); diff --git a/src/Storages/Distributed/DirectoryMonitor.h b/src/Storages/Distributed/DirectoryMonitor.h index a6175b44d7b..bc897136786 100644 --- a/src/Storages/Distributed/DirectoryMonitor.h +++ 
b/src/Storages/Distributed/DirectoryMonitor.h @@ -111,9 +111,6 @@ private: CurrentMetrics::Increment metric_pending_files; - /// Read insert query and insert settings for backward compatible. - static void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query, ClientInfo & client_info, Poco::Logger * log); - friend class DirectoryMonitorBlockInputStream; };