Compare Block itself for distributed async INSERT batches

INSERT into Distributed with insert_distributed_sync=0 (asynchronous mode)
stores the distributed batches on disk to be sent in the background.

But the types may differ slightly between the Distributed table and its
underlying table, so the initiator needs to know whether a conversion is
required or not.

Before this patch, those on-disk distributed batches contained a header
that includes dumpStructure() for the block in that batch. However,
comparing that string checks more than just names and types, and besides,
dumpStructure() is a debug method.

So instead of storing a string representation of the block header, we
should store an empty block in the file header (note that we cannot store
the empty block outside the header, since this would require reading all
blocks from the file, due to some trickery in the readers' interface).

Note that this patch also contains a tiny refactoring:
- s/header/distributed_header/

v1: dumpNamesAndTypes()
v2: dump empty block into the batch itself
v3: move empty block into the header
This commit is contained in:
Azat Khuzhin 2021-04-02 07:09:39 +03:00
parent cf2931384b
commit 27d4fbd13b
7 changed files with 177 additions and 64 deletions

View File

@ -104,12 +104,14 @@ namespace
size_t rows = 0;
size_t bytes = 0;
std::string header;
/// dumpStructure() of the header -- obsolete
std::string block_header_string;
Block block_header;
};
DistributedHeader readDistributedHeader(ReadBuffer & in, Poco::Logger * log)
DistributedHeader readDistributedHeader(ReadBufferFromFile & in, Poco::Logger * log)
{
DistributedHeader header;
DistributedHeader distributed_header;
UInt64 query_size;
readVarUInt(query_size, in);
@ -135,17 +137,25 @@ namespace
LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. It may lack support for new features.");
}
readStringBinary(header.insert_query, header_buf);
header.insert_settings.read(header_buf);
readStringBinary(distributed_header.insert_query, header_buf);
distributed_header.insert_settings.read(header_buf);
if (header_buf.hasPendingData())
header.client_info.read(header_buf, initiator_revision);
distributed_header.client_info.read(header_buf, initiator_revision);
if (header_buf.hasPendingData())
{
readVarUInt(header.rows, header_buf);
readVarUInt(header.bytes, header_buf);
readStringBinary(header.header, header_buf);
readVarUInt(distributed_header.rows, header_buf);
readVarUInt(distributed_header.bytes, header_buf);
readStringBinary(distributed_header.block_header_string, header_buf);
}
if (header_buf.hasPendingData())
{
NativeBlockInputStream header_block_in(header_buf, DBMS_TCP_PROTOCOL_VERSION);
distributed_header.block_header = header_block_in.read();
if (!distributed_header.block_header)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read header from the {} batch", in.getFileName());
}
/// Add handling new data here, for example:
@ -155,20 +165,20 @@ namespace
///
/// And note that it is safe, because we have checksum and size for header.
return header;
return distributed_header;
}
if (query_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT)
{
header.insert_settings.read(in, SettingsWriteFormat::BINARY);
readStringBinary(header.insert_query, in);
return header;
distributed_header.insert_settings.read(in, SettingsWriteFormat::BINARY);
readStringBinary(distributed_header.insert_query, in);
return distributed_header;
}
header.insert_query.resize(query_size);
in.readStrict(header.insert_query.data(), query_size);
distributed_header.insert_query.resize(query_size);
in.readStrict(distributed_header.insert_query.data(), query_size);
return header;
return distributed_header;
}
/// remote_error argument is used to decide whether some errors should be
@ -200,35 +210,58 @@ namespace
return nullptr;
}
void writeRemoteConvert(const DistributedHeader & header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log)
void writeAndConvert(RemoteBlockOutputStream & remote, ReadBufferFromFile & in)
{
if (remote.getHeader() && header.header != remote.getHeader().dumpStructure())
CompressedReadBuffer decompressing_in(in);
NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
block_in.readPrefix();
while (Block block = block_in.read())
{
LOG_WARNING(log,
"Structure does not match (remote: {}, local: {}), implicit conversion will be done",
remote.getHeader().dumpStructure(), header.header);
CompressedReadBuffer decompressing_in(in);
/// Lack of header, requires to read blocks
NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
block_in.readPrefix();
while (Block block = block_in.read())
{
ConvertingBlockInputStream convert(
std::make_shared<OneBlockInputStream>(block),
remote.getHeader(),
ConvertingBlockInputStream::MatchColumnsMode::Name);
auto adopted_block = convert.read();
remote.write(adopted_block);
}
block_in.readSuffix();
ConvertingBlockInputStream convert(
std::make_shared<OneBlockInputStream>(block),
remote.getHeader(),
ConvertingBlockInputStream::MatchColumnsMode::Name);
auto adopted_block = convert.read();
remote.write(adopted_block);
}
else
block_in.readSuffix();
}
/// Sends the contents of one on-disk batch file (`in`) to `remote`, converting
/// blocks to the remote header only when that turns out to be necessary.
///
/// Three cases are handled, in this order:
/// - `remote` has no header: the data is handed over as-is via writePrepared().
/// - `distributed_header.block_header` is empty (old on-disk format): blocks are
///   always passed through writeAndConvert().
/// - otherwise the stored block header is compared with the remote header using
///   blocksHaveEqualStructure(); on mismatch a warning is logged and
///   writeAndConvert() is used, on match the data is handed over as-is.
///
/// `log` is used only for the trace/warning messages below.
void writeRemoteConvert(const DistributedHeader & distributed_header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log)
{
/// Nothing to compare against -- pass the data through without reading the blocks.
if (!remote.getHeader())
{
CheckingCompressedReadBuffer checking_in(in);
remote.writePrepared(checking_in);
return;
}
/// This is the old format, which does not have a header for the block in the file header;
/// applying ConvertingBlockInputStream in this case is not a big overhead.
///
/// Anyway, the header can be obtained only from the first block, which contains all rows anyway.
if (!distributed_header.block_header)
{
LOG_TRACE(log, "Processing batch {} with old format (no header)", in.getFileName());
writeAndConvert(remote, in);
return;
}
/// New format: the block header stored in the file header is compared with the remote one.
if (!blocksHaveEqualStructure(distributed_header.block_header, remote.getHeader()))
{
LOG_WARNING(log,
"Structure does not match (remote: {}, local: {}), implicit conversion will be done",
remote.getHeader().dumpStructure(), distributed_header.block_header.dumpStructure());
writeAndConvert(remote, in);
return;
}
/// Structures are equal -- no conversion is needed, pass the data through as-is.
CheckingCompressedReadBuffer checking_in(in);
remote.writePrepared(checking_in);
}
}
@ -498,13 +531,15 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
ReadBufferFromFile in(file_path);
const auto & header = readDistributedHeader(in, log);
const auto & distributed_header = readDistributedHeader(in, log);
auto connection = pool->get(timeouts, &header.insert_settings);
auto connection = pool->get(timeouts, &distributed_header.insert_settings);
RemoteBlockOutputStream remote{*connection, timeouts,
header.insert_query, header.insert_settings, header.client_info};
distributed_header.insert_query,
distributed_header.insert_settings,
distributed_header.client_info};
remote.writePrefix();
writeRemoteConvert(header, remote, in, log);
writeRemoteConvert(distributed_header, remote, in, log);
remote.writeSuffix();
}
catch (const Exception & e)
@ -523,20 +558,21 @@ struct StorageDistributedDirectoryMonitor::BatchHeader
Settings settings;
String query;
ClientInfo client_info;
String sample_block_structure;
Block header;
BatchHeader(Settings settings_, String query_, ClientInfo client_info_, String sample_block_structure_)
BatchHeader(Settings settings_, String query_, ClientInfo client_info_, Block header_)
: settings(std::move(settings_))
, query(std::move(query_))
, client_info(std::move(client_info_))
, sample_block_structure(std::move(sample_block_structure_))
, header(std::move(header_))
{
}
bool operator==(const BatchHeader & other) const
{
return std::tie(settings, query, client_info.query_kind, sample_block_structure) ==
std::tie(other.settings, other.query, other.client_info.query_kind, other.sample_block_structure);
return std::tie(settings, query, client_info.query_kind) ==
std::tie(other.settings, other.query, other.client_info.query_kind) &&
blocksHaveEqualStructure(header, other.header);
}
struct Hash
@ -545,7 +581,7 @@ struct StorageDistributedDirectoryMonitor::BatchHeader
{
SipHash hash_state;
hash_state.update(batch_header.query.data(), batch_header.query.size());
hash_state.update(batch_header.sample_block_structure.data(), batch_header.sample_block_structure.size());
batch_header.header.updateHash(hash_state);
return hash_state.get64();
}
};
@ -632,16 +668,17 @@ struct StorageDistributedDirectoryMonitor::Batch
}
ReadBufferFromFile in(file_path->second);
const auto & header = readDistributedHeader(in, parent.log);
const auto & distributed_header = readDistributedHeader(in, parent.log);
if (!remote)
{
remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts,
header.insert_query, header.insert_settings, header.client_info);
distributed_header.insert_query,
distributed_header.insert_settings,
distributed_header.client_info);
remote->writePrefix();
}
writeRemoteConvert(header, *remote, in, parent.log);
writeRemoteConvert(distributed_header, *remote, in, parent.log);
}
if (remote)
@ -808,22 +845,27 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
size_t total_rows = 0;
size_t total_bytes = 0;
std::string sample_block_structure;
DistributedHeader header;
Block header;
DistributedHeader distributed_header;
try
{
/// Determine metadata of the current file and check if it is not broken.
ReadBufferFromFile in{file_path};
header = readDistributedHeader(in, log);
distributed_header = readDistributedHeader(in, log);
if (header.rows)
if (distributed_header.rows)
{
total_rows += header.rows;
total_bytes += header.bytes;
sample_block_structure = header.header;
total_rows += distributed_header.rows;
total_bytes += distributed_header.bytes;
}
else
if (distributed_header.block_header)
header = distributed_header.block_header;
if (!total_rows || !header)
{
LOG_TRACE(log, "Processing batch {} with old format (no header/rows)", in.getFileName());
CompressedReadBuffer decompressing_in(in);
NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
block_in.readPrefix();
@ -833,8 +875,8 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
total_rows += block.rows();
total_bytes += block.bytes();
if (sample_block_structure.empty())
sample_block_structure = block.cloneEmpty().dumpStructure();
if (!header)
header = block.cloneEmpty();
}
block_in.readSuffix();
}
@ -850,7 +892,12 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
throw;
}
BatchHeader batch_header(std::move(header.insert_settings), std::move(header.insert_query), std::move(header.client_info), std::move(sample_block_structure));
BatchHeader batch_header(
std::move(distributed_header.insert_settings),
std::move(distributed_header.insert_query),
std::move(distributed_header.client_info),
std::move(header)
);
Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second;
batch.file_indices.push_back(file_idx);

View File

@ -679,7 +679,13 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
context.getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
writeVarUInt(block.rows(), header_buf);
writeVarUInt(block.bytes(), header_buf);
writeStringBinary(block.cloneEmpty().dumpStructure(), header_buf);
writeStringBinary(block.cloneEmpty().dumpStructure(), header_buf); /// obsolete
/// Write block header separately in the batch header.
/// It is required for checking does conversion is required or not.
{
NativeBlockOutputStream header_stream{header_buf, DBMS_TCP_PROTOCOL_VERSION, block.cloneEmpty()};
header_stream.write(block.cloneEmpty());
}
/// Add new fields here, for example:
/// writeVarUInt(my_new_data, header_buf);

View File

@ -0,0 +1,22 @@
-- Inserts into a Distributed table with a LowCardinality(String) column,
-- first with insert_distributed_sync=1 and then with insert_distributed_sync=0,
-- using both a VALUES insert and an insert from numbers(1000) in each mode.

-- Start from a clean state.
DROP TABLE IF EXISTS tmp_01781;
DROP TABLE IF EXISTS dist_01781;
-- NOTE(review): presumably forces the insert to go through the remote path
-- even for the local shard -- confirm against the setting's documentation.
SET prefer_localhost_replica=0;
-- Underlying table and the Distributed table on top of it; both declare
-- the same column name and type.
CREATE TABLE tmp_01781 (n LowCardinality(String)) ENGINE=Memory;
CREATE TABLE dist_01781 (n LowCardinality(String)) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01781, cityHash64(n));
-- Part 1: insert_distributed_sync=1.
SET insert_distributed_sync=1;
INSERT INTO dist_01781 VALUES ('1'),('2');
-- different LowCardinality size
INSERT INTO dist_01781 SELECT * FROM numbers(1000);
-- Part 2: insert_distributed_sync=0. Sends are stopped first, so the two
-- inserts below are only delivered by the explicit SYSTEM FLUSH DISTRIBUTED.
SET insert_distributed_sync=0;
SYSTEM STOP DISTRIBUTED SENDS dist_01781;
INSERT INTO dist_01781 VALUES ('1'),('2');
-- different LowCardinality size
INSERT INTO dist_01781 SELECT * FROM numbers(1000);
SYSTEM FLUSH DISTRIBUTED dist_01781;
-- Cleanup.
DROP TABLE tmp_01781;
DROP TABLE dist_01781;

View File

@ -0,0 +1,6 @@
<Warning> DistributedBlockOutputStream: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done.
<Warning> DistributedBlockOutputStream: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done.
1
1
2
2

View File

@ -0,0 +1,30 @@
#!/usr/bin/env bash
# NOTE: this is a partial copy of the 01683_dist_INSERT_block_structure_mismatch,
# but this test also checks the log messages
# Directory containing this script; used to locate the shared shell config.
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Runs the whole scenario in a single client invocation:
# - the underlying table has column n Int8, the Distributed table has n UInt64,
#   so the block structures differ between the two;
# - the same two rows are inserted once with insert_distributed_sync=1 and once
#   with insert_distributed_sync=0 followed by SYSTEM FLUSH DISTRIBUTED;
# - the final SELECT prints the rows that reached the underlying table.
#
# `|&` pipes both stdout and stderr into sed, so messages printed by the client
# are part of the checked output. The sed expression replaces everything up to
# and including the last `<` on a line with a single `<`; lines without `<`
# (e.g. the SELECT result) pass through unchanged.
# NOTE(review): -nm is presumably multiquery + multiline mode -- confirm
# against the client's option list.
$CLICKHOUSE_CLIENT --prefer_localhost_replica=0 -nm -q "
DROP TABLE IF EXISTS tmp_01683;
DROP TABLE IF EXISTS dist_01683;
CREATE TABLE tmp_01683 (n Int8) ENGINE=Memory;
CREATE TABLE dist_01683 (n UInt64) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01683, n);
SET insert_distributed_sync=1;
INSERT INTO dist_01683 VALUES (1),(2);
SET insert_distributed_sync=0;
INSERT INTO dist_01683 VALUES (1),(2);
SYSTEM FLUSH DISTRIBUTED dist_01683;
-- TODO: cover distributed_directory_monitor_batch_inserts=1
SELECT * FROM tmp_01683 ORDER BY n;
DROP TABLE tmp_01683;
DROP TABLE dist_01683;
" |& sed 's/^.*</</g'

View File

@ -225,3 +225,5 @@
01702_system_query_log
01759_optimize_skip_unused_shards_zero_shards
01780_clickhouse_dictionary_source_loop
01790_dist_INSERT_block_structure_mismatch_types_and_names
01791_dist_INSERT_block_structure_mismatch