Merge pull request #19947 from azat/dist-INSERT-block-structure-mismatch

Fix "Block structure mismatch" for INSERT into Distributed
commit 402c018d1e
alexey-milovidov, 2021-02-03 21:27:45 +03:00 (committed by GitHub)
8 changed files with 120 additions and 50 deletions
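For context, a minimal scenario that triggers the bug this merge fixes, distilled from the 01683 regression test below (table and cluster names are illustrative): an asynchronous INSERT into a Distributed table is spooled to disk with the Distributed table's header, and the flush previously forwarded the stored blocks to the shard verbatim, failing with "Block structure mismatch" once that header and the target table's structure diverged.

    CREATE TABLE underlying (n Int8) ENGINE = Memory;
    CREATE TABLE dist (n UInt64) ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), underlying, n);

    SET prefer_localhost_replica = 0;
    INSERT INTO dist VALUES (1), (2);   -- spooled with dist's UInt64 header
    SYSTEM FLUSH DISTRIBUTED dist;      -- before this commit: "Block structure mismatch"; after: a warning plus implicit conversion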

src/Storages/Distributed/DirectoryMonitor.cpp

@@ -1,5 +1,7 @@
 #include <DataStreams/RemoteBlockOutputStream.h>
+#include <DataStreams/NativeBlockInputStream.h>
+#include <DataStreams/ConvertingBlockInputStream.h>
 #include <DataStreams/OneBlockInputStream.h>
 #include <Common/escapeForFileName.h>
 #include <Common/CurrentMetrics.h>
 #include <Common/StringUtils/StringUtils.h>
@@ -184,6 +186,37 @@ namespace
             return disk->getDirectorySyncGuard(path);
         return nullptr;
     }
 
+    void writeRemoteConvert(const DistributedHeader & header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log)
+    {
+        if (remote.getHeader() && header.header != remote.getHeader().dumpStructure())
+        {
+            LOG_WARNING(log,
+                "Structure does not match (remote: {}, local: {}), implicit conversion will be done",
+                remote.getHeader().dumpStructure(), header.header);
+
+            CompressedReadBuffer decompressing_in(in);
+            /// Lack of header, requires reading the blocks
+            NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
+
+            block_in.readPrefix();
+            while (Block block = block_in.read())
+            {
+                ConvertingBlockInputStream convert(
+                    std::make_shared<OneBlockInputStream>(block),
+                    remote.getHeader(),
+                    ConvertingBlockInputStream::MatchColumnsMode::Name);
+
+                auto adopted_block = convert.read();
+                remote.write(adopted_block);
+            }
+            block_in.readSuffix();
+        }
+        else
+        {
+            CheckingCompressedReadBuffer checking_in(in);
+            remote.writePrepared(checking_in);
+        }
+    }
 }
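The new helper takes the conversion path only when the header recorded in the spooled file differs from the remote stream's header: the file is then decompressed and re-read block by block, each block adopted via ConvertingBlockInputStream by column name; otherwise the file is streamed verbatim through writePrepared() as before. On mismatch it logs a warning like the one quoted in the 01683 test below: "Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done."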
@@ -438,11 +471,8 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
         auto connection = pool->get(timeouts, &header.insert_settings);
         RemoteBlockOutputStream remote{*connection, timeouts,
             header.insert_query, header.insert_settings, header.client_info};
 
-        CheckingCompressedReadBuffer checking_in(in);
         remote.writePrefix();
-        remote.writePrepared(checking_in);
+        writeRemoteConvert(header, remote, in, log);
         remote.writeSuffix();
     }
     catch (const Exception & e)
@@ -560,7 +590,6 @@ struct StorageDistributedDirectoryMonitor::Batch
         try
         {
             std::unique_ptr<RemoteBlockOutputStream> remote;
-            bool first = true;
             for (UInt64 file_idx : file_indices)
             {
@@ -575,16 +604,14 @@ struct StorageDistributedDirectoryMonitor::Batch
                 ReadBufferFromFile in(file_path->second);
                 const auto & header = readDistributedHeader(in, parent.log);
 
-                if (first)
+                if (!remote)
                 {
-                    first = false;
                     remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts,
                         header.insert_query, header.insert_settings, header.client_info);
                     remote->writePrefix();
                 }
 
-                CheckingCompressedReadBuffer checking_in(in);
-                remote->writePrepared(checking_in);
+                writeRemoteConvert(header, *remote, in, parent.log);
             }
 
             if (remote)

src/Storages/Distributed/DistributedBlockOutputStream.cpp

@@ -60,24 +60,26 @@ namespace ErrorCodes
     extern const int TIMEOUT_EXCEEDED;
 }
 
-static void writeBlockConvert(const BlockOutputStreamPtr & out, const Block & block, const size_t repeats)
+static Block adoptBlock(const Block & header, const Block & block, Poco::Logger * log)
 {
-    if (!blocksHaveEqualStructure(out->getHeader(), block))
-    {
-        ConvertingBlockInputStream convert(
-            std::make_shared<OneBlockInputStream>(block),
-            out->getHeader(),
-            ConvertingBlockInputStream::MatchColumnsMode::Name);
-        auto adopted_block = convert.read();
+    if (blocksHaveEqualStructure(header, block))
+        return block;
 
-        for (size_t i = 0; i < repeats; ++i)
-            out->write(adopted_block);
-    }
-    else
-    {
-        for (size_t i = 0; i < repeats; ++i)
-            out->write(block);
-    }
+    LOG_WARNING(log,
+        "Structure does not match (remote: {}, local: {}), implicit conversion will be done.",
+        header.dumpStructure(), block.dumpStructure());
+
+    ConvertingBlockInputStream convert(
+        std::make_shared<OneBlockInputStream>(block),
+        header,
+        ConvertingBlockInputStream::MatchColumnsMode::Name);
+    return convert.read();
+}
+
+static void writeBlockConvert(const BlockOutputStreamPtr & out, const Block & block, const size_t repeats, Poco::Logger * log)
+{
+    Block adopted_block = adoptBlock(out->getHeader(), block, log);
+
+    for (size_t i = 0; i < repeats; ++i)
+        out->write(adopted_block);
 }
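adoptBlock() now funnels every outgoing block through a single header check: the synchronous remote path in runWritingJob() (next hunk) and the local path in writeBlockConvert() both convert by column name before writing, instead of failing on a mismatch. A sketch of the user-visible effect on the synchronous path, mirroring the 01683 test below (names are illustrative):

    SET insert_distributed_sync = 1;
    INSERT INTO dist VALUES (1), (2);   -- each shard's block is adopted to that stream's header; a warning is logged instead of an exception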
@@ -343,7 +345,9 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica
                 }
 
                 CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
-                job.stream->write(shard_block);
+
+                Block adopted_shard_block = adoptBlock(job.stream->getHeader(), shard_block, log);
+                job.stream->write(adopted_shard_block);
             }
             else // local
             {
@@ -367,7 +371,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica
                     job.stream->writePrefix();
                 }
 
-                writeBlockConvert(job.stream, shard_block, shard_info.getLocalNodeCount());
+                writeBlockConvert(job.stream, shard_block, shard_info.getLocalNodeCount(), log);
             }
 
             job.blocks_written += 1;
@@ -589,7 +593,7 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
     auto block_io = interp.execute();
 
     block_io.out->writePrefix();
-    writeBlockConvert(block_io.out, block, repeats);
+    writeBlockConvert(block_io.out, block, repeats, log);
     block_io.out->writeSuffix();
 }

tests/integration/test_insert_distributed_async_send/test.py

@@ -175,38 +175,43 @@ def test_insert_distributed_async_send_different_header(batch):
     create_tables('insert_distributed_async_send_cluster_two_shards')
 
     node = get_node(batch)
-    node.query("INSERT INTO dist VALUES (0, '')", settings={
+    node.query("INSERT INTO dist VALUES (0, 'f')", settings={
         'prefer_localhost_replica': 0,
     })
-    node.query('ALTER TABLE dist MODIFY COLUMN value Nullable(String)')
-    node.query("INSERT INTO dist VALUES (2, '')", settings={
+    node.query('ALTER TABLE dist MODIFY COLUMN value UInt64')
+    node.query("INSERT INTO dist VALUES (2, 1)", settings={
         'prefer_localhost_replica': 0,
     })
 
+    n1.query('ALTER TABLE data MODIFY COLUMN value UInt64', settings={
+        'mutations_sync': 1,
+    })
+
     if batch:
-        # first batch with Nullable(String)
-        n1.query('ALTER TABLE data MODIFY COLUMN value Nullable(String)', settings={
-            'mutations_sync': 1,
-        })
-        # but only one batch will be sent
-        with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot convert: String to Nullable\(String\)\. Stack trace:"):
+        # but only one batch will be sent, and the first one has the UInt64 column,
+        # so one row is inserted, and for the string ('f') an exception is thrown.
+        with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot parse string 'f' as UInt64: syntax error at begin of string"):
             node.query('SYSTEM FLUSH DISTRIBUTED dist')
         assert int(n1.query('SELECT count() FROM data')) == 1
-
-        # second batch with String
-        n1.query('ALTER TABLE data MODIFY COLUMN value String', settings={
-            'mutations_sync': 1,
-        })
+        # but once the underlying column is String, implicit conversion does the
+        # job and inserts the remaining batch.
+        n1.query("""
+        DROP TABLE data SYNC;
+        CREATE TABLE data (key Int, value String) Engine=MergeTree() ORDER BY key;
+        """)
         node.query('SYSTEM FLUSH DISTRIBUTED dist')
         assert int(n1.query('SELECT count() FROM data')) == 2
     else:
-        # first send with String
-        with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot convert: Nullable\(String\) to String\. Stack trace:"):
+        # the first send is with String ('f'), so zero rows will be inserted
+        with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot parse string 'f' as UInt64: syntax error at begin of string"):
             node.query('SYSTEM FLUSH DISTRIBUTED dist')
-        assert int(n1.query('SELECT count() FROM data')) == 1
-
-        # second send with Nullable(String)
-        n1.query('ALTER TABLE data MODIFY COLUMN value Nullable(String)', settings={
-            'mutations_sync': 1,
-        })
+        assert int(n1.query('SELECT count() FROM data')) == 0
+        # but once the underlying column is String, implicit conversion does the
+        # job and inserts 2 rows (mixed UInt64 and String).
+        n1.query("""
+        DROP TABLE data SYNC;
+        CREATE TABLE data (key Int, value String) Engine=MergeTree() ORDER BY key;
+        """)
        node.query('SYSTEM FLUSH DISTRIBUTED dist')
        assert int(n1.query('SELECT count() FROM data')) == 2
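The two branches pin down the asymmetry the comments describe: with batching, the queued files go out in one remote session whose first batch carries the UInt64 column, so one row lands in data before the 'f' value fails to parse; without batching, the 'f' file is sent first and nothing is inserted. In both cases, recreating data with a String column lets the retried flush deliver the rest via implicit conversion, ending at 2 rows.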

tests/queries/0_stateless/00967_insert_into_distributed_different_types.sql

@@ -1,6 +1,9 @@
 DROP TABLE IF EXISTS dist_00967;
 DROP TABLE IF EXISTS underlying_00967;
 
+-- To suppress "Structure does not match (...), implicit conversion will be done." message
+SET send_logs_level='error';
+
 CREATE TABLE dist_00967 (key UInt64) Engine=Distributed('test_shard_localhost', currentDatabase(), underlying_00967);
 -- fails for TinyLog()/MergeTree()/... but not for Memory()
 CREATE TABLE underlying_00967 (key Nullable(UInt64)) Engine=TinyLog();

tests/queries/0_stateless/01457_create_as_table_function_structure.sql

@@ -18,6 +18,9 @@ DROP TABLE tmp;
 DETACH DATABASE test_01457;
 ATTACH DATABASE test_01457;
 
+-- To suppress "Structure does not match (...), implicit conversion will be done." message
+SET send_logs_level='error';
+
 CREATE TABLE tmp (n Int8) ENGINE=Memory;
 INSERT INTO test_01457.tf_remote_explicit_structure VALUES ('42');
 SELECT * FROM tmp;

tests/queries/0_stateless/01683_dist_INSERT_block_structure_mismatch.sql

@@ -0,0 +1,23 @@
+DROP TABLE IF EXISTS tmp_01683;
+DROP TABLE IF EXISTS dist_01683;
+
+SET prefer_localhost_replica=0;
+-- To suppress "Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done."
+SET send_logs_level='error';
+
+CREATE TABLE tmp_01683 (n Int8) ENGINE=Memory;
+CREATE TABLE dist_01683 (n UInt64) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01683, n);
+
+SET insert_distributed_sync=1;
+INSERT INTO dist_01683 VALUES (1),(2);
+
+SET insert_distributed_sync=0;
+INSERT INTO dist_01683 VALUES (1),(2);
+SYSTEM FLUSH DISTRIBUTED dist_01683;
+
+-- TODO: cover distributed_directory_monitor_batch_inserts=1
+
+SELECT * FROM tmp_01683 ORDER BY n;
+
+DROP TABLE tmp_01683;
+DROP TABLE dist_01683;
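Both the synchronous and the asynchronous INSERT above now succeed with only the suppressed warning. In the stateless-test environment, test_cluster_two_shards presumably maps both shards onto the current server, so every value lands in tmp_01683 and the final SELECT should return 1, 1, 2, 2 (one copy of each value per INSERT).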

tests/queries/0_stateless/arcadia_skip_list.txt

@@ -200,3 +200,4 @@
 01676_clickhouse_client_autocomplete
 01671_aggregate_function_group_bitmap_data
 01674_executable_dictionary_implicit_key
+01683_dist_INSERT_block_structure_mismatch