From f53c9a6b25e7b55920660bc711d1a1bdd1d1f787 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Mon, 1 Feb 2021 21:02:36 +0300 Subject: [PATCH 1/5] Fix "Block structure mismatch" for INSERT into Distributed Add missing conversion (via ConvertingBlockInputStream) for INSERT into remote nodes (for sync insert, async insert and async batch insert), like for local nodes (in DistributedBlockOutputStream::writeBlockConverted). This is required when the structure of the Distributed table differs from the structure of the local table. And also add a warning message, to highlight this in logs (since this works slower). Fixes: #19888 --- src/Storages/Distributed/DirectoryMonitor.cpp | 45 +++++++++++++++---- .../DistributedBlockOutputStream.cpp | 42 +++++++++-------- ..._INSERT_block_structure_mismatch.reference | 4 ++ ...3_dist_INSERT_block_structure_mismatch.sql | 23 ++++++++++ 4 files changed, 86 insertions(+), 28 deletions(-) create mode 100644 tests/queries/0_stateless/01683_dist_INSERT_block_structure_mismatch.reference create mode 100644 tests/queries/0_stateless/01683_dist_INSERT_block_structure_mismatch.sql diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index 8d1b9103357..bf15ca22ca9 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -1,5 +1,7 @@ #include #include +#include +#include #include #include #include @@ -184,6 +186,37 @@ namespace return disk->getDirectorySyncGuard(path); return nullptr; } + + void writeRemoteConvert(const DistributedHeader & header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log) + { + if (remote.getHeader() && header.header != remote.getHeader().dumpStructure()) + { + LOG_WARNING(log, + "Structure does not match (remote: {}, local: {}), implicit conversion will be done", + remote.getHeader().dumpStructure(), header.header); + + CompressedReadBuffer decompressing_in(in); + /// Lack of 
header, requires to read blocks + NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION); + + block_in.readPrefix(); + while (Block block = block_in.read()) + { + ConvertingBlockInputStream convert( + std::make_shared(block), + remote.getHeader(), + ConvertingBlockInputStream::MatchColumnsMode::Name); + auto adopted_block = convert.read(); + remote.write(adopted_block); + } + block_in.readSuffix(); + } + else + { + CheckingCompressedReadBuffer checking_in(in); + remote.writePrepared(checking_in); + } + } } @@ -438,11 +471,8 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa auto connection = pool->get(timeouts, &header.insert_settings); RemoteBlockOutputStream remote{*connection, timeouts, header.insert_query, header.insert_settings, header.client_info}; - - CheckingCompressedReadBuffer checking_in(in); - remote.writePrefix(); - remote.writePrepared(checking_in); + writeRemoteConvert(header, remote, in, log); remote.writeSuffix(); } catch (const Exception & e) @@ -560,7 +590,6 @@ struct StorageDistributedDirectoryMonitor::Batch try { std::unique_ptr remote; - bool first = true; for (UInt64 file_idx : file_indices) { @@ -575,16 +604,14 @@ struct StorageDistributedDirectoryMonitor::Batch ReadBufferFromFile in(file_path->second); const auto & header = readDistributedHeader(in, parent.log); - if (first) + if (!remote) { - first = false; remote = std::make_unique(*connection, timeouts, header.insert_query, header.insert_settings, header.client_info); remote->writePrefix(); } - CheckingCompressedReadBuffer checking_in(in); - remote->writePrepared(checking_in); + writeRemoteConvert(header, *remote, in, parent.log); } if (remote) diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index d21764bbb7d..c698c0b18d5 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ 
b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -60,24 +60,26 @@ namespace ErrorCodes extern const int TIMEOUT_EXCEEDED; } -static void writeBlockConvert(const BlockOutputStreamPtr & out, const Block & block, const size_t repeats) +static Block adoptBlock(const Block & header, const Block & block, Poco::Logger * log) { - if (!blocksHaveEqualStructure(out->getHeader(), block)) - { - ConvertingBlockInputStream convert( - std::make_shared(block), - out->getHeader(), - ConvertingBlockInputStream::MatchColumnsMode::Name); - auto adopted_block = convert.read(); + if (blocksHaveEqualStructure(header, block)) + return block; - for (size_t i = 0; i < repeats; ++i) - out->write(adopted_block); - } - else - { - for (size_t i = 0; i < repeats; ++i) - out->write(block); - } + LOG_WARNING(log, + "Structure does not match (remote: {}, local: {}), implicit conversion will be done.", + header.dumpStructure(), block.dumpStructure()); + + ConvertingBlockInputStream convert( + std::make_shared(block), + header, + ConvertingBlockInputStream::MatchColumnsMode::Name); + return convert.read(); +} +static void writeBlockConvert(const BlockOutputStreamPtr & out, const Block & block, const size_t repeats, Poco::Logger * log) +{ + Block adopted_block = adoptBlock(out->getHeader(), block, log); + for (size_t i = 0; i < repeats; ++i) + out->write(adopted_block); } @@ -343,7 +345,9 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep } CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend}; - job.stream->write(shard_block); + + Block adopted_shard_block = adoptBlock(job.stream->getHeader(), shard_block, log); + job.stream->write(adopted_shard_block); } else // local { @@ -367,7 +371,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep job.stream->writePrefix(); } - writeBlockConvert(job.stream, shard_block, shard_info.getLocalNodeCount()); + writeBlockConvert(job.stream, shard_block, 
shard_info.getLocalNodeCount(), log); } job.blocks_written += 1; @@ -589,7 +593,7 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_ auto block_io = interp.execute(); block_io.out->writePrefix(); - writeBlockConvert(block_io.out, block, repeats); + writeBlockConvert(block_io.out, block, repeats, log); block_io.out->writeSuffix(); } diff --git a/tests/queries/0_stateless/01683_dist_INSERT_block_structure_mismatch.reference b/tests/queries/0_stateless/01683_dist_INSERT_block_structure_mismatch.reference new file mode 100644 index 00000000000..be589c9ceb0 --- /dev/null +++ b/tests/queries/0_stateless/01683_dist_INSERT_block_structure_mismatch.reference @@ -0,0 +1,4 @@ +1 +1 +2 +2 diff --git a/tests/queries/0_stateless/01683_dist_INSERT_block_structure_mismatch.sql b/tests/queries/0_stateless/01683_dist_INSERT_block_structure_mismatch.sql new file mode 100644 index 00000000000..eaf15ed9fd8 --- /dev/null +++ b/tests/queries/0_stateless/01683_dist_INSERT_block_structure_mismatch.sql @@ -0,0 +1,23 @@ +DROP TABLE IF EXISTS tmp_01683; +DROP TABLE IF EXISTS dist_01683; + +SET prefer_localhost_replica=0; +-- To suppress "Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done." 
+SET send_logs_level='error'; + +CREATE TABLE tmp_01683 (n Int8) ENGINE=Memory; +CREATE TABLE dist_01683 (n UInt64) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01683, n); + +SET insert_distributed_sync=1; +INSERT INTO dist_01683 VALUES (1),(2); + +SET insert_distributed_sync=0; +INSERT INTO dist_01683 VALUES (1),(2); +SYSTEM FLUSH DISTRIBUTED dist_01683; + +-- TODO: cover distributed_directory_monitor_batch_inserts=1 + +SELECT * FROM tmp_01683 ORDER BY n; + +DROP TABLE tmp_01683; +DROP TABLE dist_01683; From 594c6b0dd4471117629a848a686a2dcb6fb4095e Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Mon, 1 Feb 2021 21:02:36 +0300 Subject: [PATCH 2/5] Suppress warnings in 00967_insert_into_distributed_different_types --- .../00967_insert_into_distributed_different_types.sql | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/queries/0_stateless/00967_insert_into_distributed_different_types.sql b/tests/queries/0_stateless/00967_insert_into_distributed_different_types.sql index 455fab694cd..6324c6a6c10 100644 --- a/tests/queries/0_stateless/00967_insert_into_distributed_different_types.sql +++ b/tests/queries/0_stateless/00967_insert_into_distributed_different_types.sql @@ -1,6 +1,9 @@ DROP TABLE IF EXISTS dist_00967; DROP TABLE IF EXISTS underlying_00967; +-- To suppress "Structure does not match (...), implicit conversion will be done." message +SET send_logs_level='error'; + CREATE TABLE dist_00967 (key UInt64) Engine=Distributed('test_shard_localhost', currentDatabase(), underlying_00967); -- fails for TinyLog()/MergeTree()/... 
but not for Memory() CREATE TABLE underlying_00967 (key Nullable(UInt64)) Engine=TinyLog(); From edd79e3fafe79da1c124474c4b4d368c2e77d9cd Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 2 Feb 2021 01:10:08 +0300 Subject: [PATCH 3/5] Suppress warnings in 01457_create_as_table_function_structure --- .../0_stateless/01457_create_as_table_function_structure.sql | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/queries/0_stateless/01457_create_as_table_function_structure.sql b/tests/queries/0_stateless/01457_create_as_table_function_structure.sql index 1c9c1e1ef44..9399f06220b 100644 --- a/tests/queries/0_stateless/01457_create_as_table_function_structure.sql +++ b/tests/queries/0_stateless/01457_create_as_table_function_structure.sql @@ -18,6 +18,9 @@ DROP TABLE tmp; DETACH DATABASE test_01457; ATTACH DATABASE test_01457; +-- To suppress "Structure does not match (...), implicit conversion will be done." message +SET send_logs_level='error'; + CREATE TABLE tmp (n Int8) ENGINE=Memory; INSERT INTO test_01457.tf_remote_explicit_structure VALUES ('42'); SELECT * FROM tmp; From 5070b8a76a68d690ea2bb0f378d9c2837ba356ae Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 2 Feb 2021 21:38:29 +0300 Subject: [PATCH 4/5] Update test_insert_distributed_async_send for recent block conversion changes Now that implicit conversion has been added, String and Nullable(String) are converted successfully, so use UInt64 instead of Nullable(String). 
--- .../test.py | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/tests/integration/test_insert_distributed_async_send/test.py b/tests/integration/test_insert_distributed_async_send/test.py index 7f6a2887c3b..b469da4e2e1 100644 --- a/tests/integration/test_insert_distributed_async_send/test.py +++ b/tests/integration/test_insert_distributed_async_send/test.py @@ -175,38 +175,43 @@ def test_insert_distributed_async_send_different_header(batch): create_tables('insert_distributed_async_send_cluster_two_shards') node = get_node(batch) - node.query("INSERT INTO dist VALUES (0, '')", settings={ + node.query("INSERT INTO dist VALUES (0, 'f')", settings={ 'prefer_localhost_replica': 0, }) - node.query('ALTER TABLE dist MODIFY COLUMN value Nullable(String)') - node.query("INSERT INTO dist VALUES (2, '')", settings={ + node.query('ALTER TABLE dist MODIFY COLUMN value UInt64') + node.query("INSERT INTO dist VALUES (2, 1)", settings={ 'prefer_localhost_replica': 0, }) + n1.query('ALTER TABLE data MODIFY COLUMN value UInt64', settings={ + 'mutations_sync': 1, + }) + if batch: - # first batch with Nullable(String) - n1.query('ALTER TABLE data MODIFY COLUMN value Nullable(String)', settings={ - 'mutations_sync': 1, - }) - # but only one batch will be sent - with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot convert: String to Nullable\(String\)\. Stack trace:"): + # but only one batch will be sent, and first is with UInt64 column, so + # one rows inserted, and for string ('f') exception will be throw. 
+ with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot parse string 'f' as UInt64: syntax error at begin of string"): node.query('SYSTEM FLUSH DISTRIBUTED dist') assert int(n1.query('SELECT count() FROM data')) == 1 - # second batch with String - n1.query('ALTER TABLE data MODIFY COLUMN value String', settings={ - 'mutations_sync': 1, - }) + # but once underlying column String, implicit conversion will do the + # thing, and insert left batch. + n1.query(""" + DROP TABLE data SYNC; + CREATE TABLE data (key Int, value String) Engine=MergeTree() ORDER BY key; + """) node.query('SYSTEM FLUSH DISTRIBUTED dist') - assert int(n1.query('SELECT count() FROM data')) == 2 - else: - # first send with String - with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot convert: Nullable\(String\) to String\. Stack trace:"): - node.query('SYSTEM FLUSH DISTRIBUTED dist') assert int(n1.query('SELECT count() FROM data')) == 1 - # second send with Nullable(String) - n1.query('ALTER TABLE data MODIFY COLUMN value Nullable(String)', settings={ - 'mutations_sync': 1, - }) + else: + # first send with String ('f'), so zero rows will be inserted + with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot parse string 'f' as UInt64: syntax error at begin of string"): + node.query('SYSTEM FLUSH DISTRIBUTED dist') + assert int(n1.query('SELECT count() FROM data')) == 0 + # but once underlying column String, implicit conversion will do the + # thing, and insert 2 rows (mixed UInt64 and String). 
+ n1.query(""" + DROP TABLE data SYNC; + CREATE TABLE data (key Int, value String) Engine=MergeTree() ORDER BY key; + """) node.query('SYSTEM FLUSH DISTRIBUTED dist') assert int(n1.query('SELECT count() FROM data')) == 2 From 22bedec33ebc84b768fddcae63c17ca2928ba547 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Wed, 3 Feb 2021 08:00:37 +0300 Subject: [PATCH 5/5] Add 01683_dist_INSERT_block_structure_mismatch into arcadia_skip_list --- tests/queries/0_stateless/arcadia_skip_list.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/queries/0_stateless/arcadia_skip_list.txt b/tests/queries/0_stateless/arcadia_skip_list.txt index 5b8256bb5af..76be95863cb 100644 --- a/tests/queries/0_stateless/arcadia_skip_list.txt +++ b/tests/queries/0_stateless/arcadia_skip_list.txt @@ -199,3 +199,4 @@ 01675_data_type_coroutine 01671_aggregate_function_group_bitmap_data 01674_executable_dictionary_implicit_key +01683_dist_INSERT_block_structure_mismatch