Merge pull request #22325 from azat/dist-async-insert-header-check

Check only column name and type for async distributed blocks
Vladimir · 2021-04-07 11:51:12 +03:00 · committed by GitHub · commit db0550013c
7 changed files with 177 additions and 64 deletions
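
The gist of the change: whether a pending distributed block needs conversion used to be decided by comparing Block::dumpStructure() strings, which encode not only column names and types but also per-column state such as the current LowCardinality dictionary size, so blocks with identical schemas could still compare unequal. The check now uses a serialized block header and blocksHaveEqualStructure(), i.e. only column name and type. Below is a minimal standalone model of the difference, using toy Column/Block types rather than ClickHouse's real classes:

#include <cstddef>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

struct Column
{
    std::string name;
    std::string type;
    size_t size = 0;   /// number of values currently held by the column
};

using Block = std::vector<Column>;

/// Analogous to Block::dumpStructure(): the dump includes per-column sizes,
/// so two blocks with identical schemas but different internal state differ.
std::string dumpStructure(const Block & block)
{
    std::ostringstream out;
    for (const auto & col : block)
        out << col.name << ' ' << col.type << "(size = " << col.size << "), ";
    return out.str();
}

/// Analogous to blocksHaveEqualStructure(): only names and types matter.
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
{
    if (lhs.size() != rhs.size())
        return false;
    for (size_t i = 0; i < lhs.size(); ++i)
        if (lhs[i].name != rhs[i].name || lhs[i].type != rhs[i].type)
            return false;
    return true;
}

int main()
{
    Block stored  = {{"n", "LowCardinality(String)", 0}};  /// header block, no rows
    Block current = {{"n", "LowCardinality(String)", 2}};  /// a real data block

    std::cout << std::boolalpha
              << (dumpStructure(stored) == dumpStructure(current)) << '\n'  /// false: sizes differ
              << blocksHaveEqualStructure(stored, current) << '\n';         /// true: same name + type
}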


@@ -104,12 +104,14 @@ namespace
     size_t rows = 0;
     size_t bytes = 0;
-    std::string header;
+    /// dumpStructure() of the header -- obsolete
+    std::string block_header_string;
+    Block block_header;
 };

-DistributedHeader readDistributedHeader(ReadBuffer & in, Poco::Logger * log)
+DistributedHeader readDistributedHeader(ReadBufferFromFile & in, Poco::Logger * log)
 {
-    DistributedHeader header;
+    DistributedHeader distributed_header;

     UInt64 query_size;
     readVarUInt(query_size, in);
@@ -135,17 +137,25 @@ namespace
         LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. It may lack support for new features.");
     }

-    readStringBinary(header.insert_query, header_buf);
-    header.insert_settings.read(header_buf);
+    readStringBinary(distributed_header.insert_query, header_buf);
+    distributed_header.insert_settings.read(header_buf);

     if (header_buf.hasPendingData())
-        header.client_info.read(header_buf, initiator_revision);
+        distributed_header.client_info.read(header_buf, initiator_revision);

     if (header_buf.hasPendingData())
     {
-        readVarUInt(header.rows, header_buf);
-        readVarUInt(header.bytes, header_buf);
-        readStringBinary(header.header, header_buf);
+        readVarUInt(distributed_header.rows, header_buf);
+        readVarUInt(distributed_header.bytes, header_buf);
+        readStringBinary(distributed_header.block_header_string, header_buf);
     }

+    if (header_buf.hasPendingData())
+    {
+        NativeBlockInputStream header_block_in(header_buf, DBMS_TCP_PROTOCOL_VERSION);
+        distributed_header.block_header = header_block_in.read();
+        if (!distributed_header.block_header)
+            throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read header from the {} batch", in.getFileName());
+    }

     /// Add handling new data here, for example:
@@ -155,20 +165,20 @@ namespace
     ///
     /// And note that it is safe, because we have checksum and size for header.

-    return header;
+    return distributed_header;
 }

 if (query_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT)
 {
-    header.insert_settings.read(in, SettingsWriteFormat::BINARY);
-    readStringBinary(header.insert_query, in);
-    return header;
+    distributed_header.insert_settings.read(in, SettingsWriteFormat::BINARY);
+    readStringBinary(distributed_header.insert_query, in);
+    return distributed_header;
 }

-header.insert_query.resize(query_size);
-in.readStrict(header.insert_query.data(), query_size);
+distributed_header.insert_query.resize(query_size);
+in.readStrict(distributed_header.insert_query.data(), query_size);

-return header;
+return distributed_header;
 }

 /// remote_error argument is used to decide whether some errors should be
@@ -200,35 +210,58 @@ namespace
     return nullptr;
 }

-void writeRemoteConvert(const DistributedHeader & header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log)
+void writeAndConvert(RemoteBlockOutputStream & remote, ReadBufferFromFile & in)
 {
-    if (remote.getHeader() && header.header != remote.getHeader().dumpStructure())
+    CompressedReadBuffer decompressing_in(in);
+    NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
+
+    block_in.readPrefix();
+    while (Block block = block_in.read())
     {
-        LOG_WARNING(log,
-            "Structure does not match (remote: {}, local: {}), implicit conversion will be done",
-            remote.getHeader().dumpStructure(), header.header);
-
-        CompressedReadBuffer decompressing_in(in);
-        /// Lack of header, requires to read blocks
-        NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
-
-        block_in.readPrefix();
-        while (Block block = block_in.read())
-        {
-            ConvertingBlockInputStream convert(
-                std::make_shared<OneBlockInputStream>(block),
-                remote.getHeader(),
-                ConvertingBlockInputStream::MatchColumnsMode::Name);
-            auto adopted_block = convert.read();
-            remote.write(adopted_block);
-        }
-        block_in.readSuffix();
+        ConvertingBlockInputStream convert(
+            std::make_shared<OneBlockInputStream>(block),
+            remote.getHeader(),
+            ConvertingBlockInputStream::MatchColumnsMode::Name);
+        auto adopted_block = convert.read();
+        remote.write(adopted_block);
     }
-    else
+    block_in.readSuffix();
+}
+
+void writeRemoteConvert(const DistributedHeader & distributed_header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log)
+{
+    if (!remote.getHeader())
     {
         CheckingCompressedReadBuffer checking_in(in);
         remote.writePrepared(checking_in);
+        return;
+    }
+
+    /// This is the old format, which does not store the block header in the file header;
+    /// applying ConvertingBlockInputStream in this case is not a big overhead.
+    ///
+    /// Anyway we can get the header only from the first block, which contains all rows anyway.
+    if (!distributed_header.block_header)
+    {
+        LOG_TRACE(log, "Processing batch {} with old format (no header)", in.getFileName());
+        writeAndConvert(remote, in);
+        return;
+    }
+
+    if (!blocksHaveEqualStructure(distributed_header.block_header, remote.getHeader()))
+    {
+        LOG_WARNING(log,
+            "Structure does not match (remote: {}, local: {}), implicit conversion will be done",
+            remote.getHeader().dumpStructure(), distributed_header.block_header.dumpStructure());
+        writeAndConvert(remote, in);
+        return;
+    }
+
+    CheckingCompressedReadBuffer checking_in(in);
+    remote.writePrepared(checking_in);
 }
 }
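
The refactored writeRemoteConvert() above splits into a fast path (forward the pre-serialized compressed data unchanged) and a slow path (decompress, convert block by block by column name, resend). A compact sketch of that dispatch, using stand-in types rather than ClickHouse's real streams, just to spell out the control flow:

#include <iostream>
#include <string>

struct Block
{
    std::string structure;   /// stand-in for the column name/type list
    explicit operator bool() const { return !structure.empty(); }
};

bool blocksHaveEqualStructure(const Block & a, const Block & b) { return a.structure == b.structure; }

void writeAndConvert() { std::cout << "slow path: decompress, convert by column name, resend\n"; }
void writePrepared()   { std::cout << "fast path: forward pre-serialized compressed data as-is\n"; }

void writeRemoteConvert(const Block & file_header, const Block & remote_header)
{
    /// The remote expects no header: nothing to validate, forward as-is.
    if (!remote_header)
        return writePrepared();

    /// Old on-disk format: no block header was stored, must read blocks to convert.
    if (!file_header)
        return writeAndConvert();

    /// Header present but structures differ: convert block by block.
    if (!blocksHaveEqualStructure(file_header, remote_header))
        return writeAndConvert();

    /// Names and types match: skip deserialization entirely.
    writePrepared();
}

int main()
{
    writeRemoteConvert(Block{"n UInt64"}, Block{"n UInt64"});  /// fast path
    writeRemoteConvert(Block{"n Int8"},   Block{"n UInt64"});  /// slow path
}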
@@ -498,13 +531,15 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
     CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};

     ReadBufferFromFile in(file_path);
-    const auto & header = readDistributedHeader(in, log);
+    const auto & distributed_header = readDistributedHeader(in, log);

-    auto connection = pool->get(timeouts, &header.insert_settings);
+    auto connection = pool->get(timeouts, &distributed_header.insert_settings);
     RemoteBlockOutputStream remote{*connection, timeouts,
-        header.insert_query, header.insert_settings, header.client_info};
+        distributed_header.insert_query,
+        distributed_header.insert_settings,
+        distributed_header.client_info};

     remote.writePrefix();
-    writeRemoteConvert(header, remote, in, log);
+    writeRemoteConvert(distributed_header, remote, in, log);
     remote.writeSuffix();
 }
 catch (const Exception & e)
@@ -523,20 +558,21 @@ struct StorageDistributedDirectoryMonitor::BatchHeader
     Settings settings;
     String query;
     ClientInfo client_info;
-    String sample_block_structure;
+    Block header;

-    BatchHeader(Settings settings_, String query_, ClientInfo client_info_, String sample_block_structure_)
+    BatchHeader(Settings settings_, String query_, ClientInfo client_info_, Block header_)
         : settings(std::move(settings_))
         , query(std::move(query_))
         , client_info(std::move(client_info_))
-        , sample_block_structure(std::move(sample_block_structure_))
+        , header(std::move(header_))
     {
     }

     bool operator==(const BatchHeader & other) const
     {
-        return std::tie(settings, query, client_info.query_kind, sample_block_structure) ==
-            std::tie(other.settings, other.query, other.client_info.query_kind, other.sample_block_structure);
+        return std::tie(settings, query, client_info.query_kind) ==
+            std::tie(other.settings, other.query, other.client_info.query_kind) &&
+            blocksHaveEqualStructure(header, other.header);
     }

     struct Hash
@@ -545,7 +581,7 @@ struct StorageDistributedDirectoryMonitor::BatchHeader
     {
         SipHash hash_state;
         hash_state.update(batch_header.query.data(), batch_header.query.size());
-        hash_state.update(batch_header.sample_block_structure.data(), batch_header.sample_block_structure.size());
+        batch_header.header.updateHash(hash_state);
         return hash_state.get64();
     }
 };
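
BatchHeader is the key under which pending files are grouped into batches, so Hash and operator== must stay consistent with each other: both now derive from the block header's names and types instead of the obsolete structure string. A self-contained sketch of the same hash-map pattern with stand-in types (settings and client info omitted for brevity):

#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>

struct BatchHeader
{
    std::string query;
    std::string header_names_and_types;  /// stand-in for Block::updateHash over name + type

    bool operator==(const BatchHeader & other) const
    {
        return query == other.query && header_names_and_types == other.header_names_and_types;
    }

    struct Hash
    {
        size_t operator()(const BatchHeader & h) const
        {
            /// Mix both fields, mirroring the real SipHash over query + block header.
            return std::hash<std::string>()(h.query)
                ^ (std::hash<std::string>()(h.header_names_and_types) << 1);
        }
    };
};

int main()
{
    /// Analogous to header_to_batch in processFilesWithBatching().
    std::unordered_map<BatchHeader, int, BatchHeader::Hash> header_to_batch;
    header_to_batch[{"INSERT INTO t", "n UInt64"}] += 1;
    header_to_batch[{"INSERT INTO t", "n UInt64"}] += 1;  /// same batch
    header_to_batch[{"INSERT INTO t", "n Int8"}] += 1;    /// different batch
    std::cout << header_to_batch.size() << '\n';          /// prints 2
}

If the hash covered data the equality check ignores (or vice versa), structurally equal files could be scattered across buckets or collide incorrectly, which is why both members were changed together.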
@@ -632,16 +668,17 @@ struct StorageDistributedDirectoryMonitor::Batch
     }

     ReadBufferFromFile in(file_path->second);
-    const auto & header = readDistributedHeader(in, parent.log);
+    const auto & distributed_header = readDistributedHeader(in, parent.log);

     if (!remote)
     {
         remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts,
-            header.insert_query, header.insert_settings, header.client_info);
+            distributed_header.insert_query,
+            distributed_header.insert_settings,
+            distributed_header.client_info);
         remote->writePrefix();
     }

-    writeRemoteConvert(header, *remote, in, parent.log);
+    writeRemoteConvert(distributed_header, *remote, in, parent.log);
 }

 if (remote)
@ -808,22 +845,27 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
size_t total_rows = 0;
size_t total_bytes = 0;
std::string sample_block_structure;
DistributedHeader header;
Block header;
DistributedHeader distributed_header;
try
{
/// Determine metadata of the current file and check if it is not broken.
ReadBufferFromFile in{file_path};
header = readDistributedHeader(in, log);
distributed_header = readDistributedHeader(in, log);
if (header.rows)
if (distributed_header.rows)
{
total_rows += header.rows;
total_bytes += header.bytes;
sample_block_structure = header.header;
total_rows += distributed_header.rows;
total_bytes += distributed_header.bytes;
}
else
if (distributed_header.block_header)
header = distributed_header.block_header;
if (!total_rows || !header)
{
LOG_TRACE(log, "Processing batch {} with old format (no header/rows)", in.getFileName());
CompressedReadBuffer decompressing_in(in);
NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
block_in.readPrefix();
@ -833,8 +875,8 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
total_rows += block.rows();
total_bytes += block.bytes();
if (sample_block_structure.empty())
sample_block_structure = block.cloneEmpty().dumpStructure();
if (!header)
header = block.cloneEmpty();
}
block_in.readSuffix();
}
@ -850,7 +892,12 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
throw;
}
BatchHeader batch_header(std::move(header.insert_settings), std::move(header.insert_query), std::move(header.client_info), std::move(sample_block_structure));
BatchHeader batch_header(
std::move(distributed_header.insert_settings),
std::move(distributed_header.insert_query),
std::move(distributed_header.client_info),
std::move(header)
);
Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second;
batch.file_indices.push_back(file_idx);


@@ -679,7 +679,13 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
     context.getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
     writeVarUInt(block.rows(), header_buf);
     writeVarUInt(block.bytes(), header_buf);
-    writeStringBinary(block.cloneEmpty().dumpStructure(), header_buf);
+    writeStringBinary(block.cloneEmpty().dumpStructure(), header_buf); /// obsolete
+
+    /// Write the block header separately in the batch header.
+    /// It is required for checking whether conversion is needed or not.
+    {
+        NativeBlockOutputStream header_stream{header_buf, DBMS_TCP_PROTOCOL_VERSION, block.cloneEmpty()};
+        header_stream.write(block.cloneEmpty());
+    }

     /// Add new fields here, for example:
     /// writeVarUInt(my_new_data, header_buf);
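
Note that the new field is appended after all existing ones, which is what keeps the on-disk .bin format compatible in both directions: readDistributedHeader() probes hasPendingData() before each optional tail field, so new servers can read old files (the tail is simply absent) and old servers reading new files never touch the extra bytes. A toy sketch of that append-only layout, with stand-in stream helpers rather than ClickHouse's real WriteBuffer API:

#include <iostream>
#include <sstream>

int main()
{
    std::ostringstream header_buf;

    /// Fields understood by all servers come first...
    header_buf << "insert_query|settings|client_info|";
    /// ...then rows/bytes and the obsolete dumpStructure() string...
    header_buf << "rows=2|bytes=34|n UInt64 UInt64(size = 0)|";
    /// ...and last, the NEW field: the empty block in Native format. A reader
    /// that predates it stops before this point; a new reader checks whether
    /// any data is still pending before attempting to decode it.
    header_buf << "<native-encoded empty block>";

    std::cout << header_buf.str() << '\n';
}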


@@ -0,0 +1,22 @@
+DROP TABLE IF EXISTS tmp_01781;
+DROP TABLE IF EXISTS dist_01781;
+
+SET prefer_localhost_replica=0;
+
+CREATE TABLE tmp_01781 (n LowCardinality(String)) ENGINE=Memory;
+CREATE TABLE dist_01781 (n LowCardinality(String)) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01781, cityHash64(n));
+
+SET insert_distributed_sync=1;
+INSERT INTO dist_01781 VALUES ('1'),('2');
+-- different LowCardinality size
+INSERT INTO dist_01781 SELECT * FROM numbers(1000);
+
+SET insert_distributed_sync=0;
+SYSTEM STOP DISTRIBUTED SENDS dist_01781;
+INSERT INTO dist_01781 VALUES ('1'),('2');
+-- different LowCardinality size
+INSERT INTO dist_01781 SELECT * FROM numbers(1000);
+SYSTEM FLUSH DISTRIBUTED dist_01781;
+
+DROP TABLE tmp_01781;
+DROP TABLE dist_01781;


@@ -0,0 +1,6 @@
+<Warning> DistributedBlockOutputStream: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done.
+<Warning> DistributedBlockOutputStream: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done.
+1
+1
+2
+2


@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+# NOTE: this is a partial copy of the 01683_dist_INSERT_block_structure_mismatch,
+# but this test also checks the log messages
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+$CLICKHOUSE_CLIENT --prefer_localhost_replica=0 -nm -q "
+    DROP TABLE IF EXISTS tmp_01683;
+    DROP TABLE IF EXISTS dist_01683;
+
+    CREATE TABLE tmp_01683 (n Int8) ENGINE=Memory;
+    CREATE TABLE dist_01683 (n UInt64) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01683, n);
+
+    SET insert_distributed_sync=1;
+    INSERT INTO dist_01683 VALUES (1),(2);
+
+    SET insert_distributed_sync=0;
+    INSERT INTO dist_01683 VALUES (1),(2);
+    SYSTEM FLUSH DISTRIBUTED dist_01683;
+
+    -- TODO: cover distributed_directory_monitor_batch_inserts=1
+
+    SELECT * FROM tmp_01683 ORDER BY n;
+
+    DROP TABLE tmp_01683;
+    DROP TABLE dist_01683;
+" |& sed 's/^.*</</g'


@@ -225,3 +225,5 @@
 01702_system_query_log
 01759_optimize_skip_unused_shards_zero_shards
 01780_clickhouse_dictionary_source_loop
+01790_dist_INSERT_block_structure_mismatch_types_and_names
+01791_dist_INSERT_block_structure_mismatch