/// ClickHouse/src/Storages/Distributed/DistributedBlockOutputStream.cpp

#include <Storages/Distributed/DistributedBlockOutputStream.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/StorageDistributed.h>
#include <Disks/StoragePolicy.h>
2016-01-28 01:00:42 +00:00
#include <Parsers/formatAST.h>
#include <Parsers/queryToString.h>
2016-01-28 01:00:42 +00:00
#include <IO/WriteBufferFromFile.h>
2018-12-28 18:15:26 +00:00
#include <Compression/CompressedWriteBuffer.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/RemoteBlockOutputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/Context.h>
2016-01-28 01:00:42 +00:00
#include <DataTypes/DataTypesNumber.h>
2018-12-03 13:11:26 +00:00
#include <DataTypes/DataTypeLowCardinality.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
2017-07-13 20:58:19 +00:00
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/escapeForFileName.h>
2018-05-29 18:14:31 +00:00
#include <Common/CurrentThread.h>
#include <Common/createHardLink.h>
#include <Common/DirectorySyncGuard.h>
#include <common/logger_useful.h>
#include <ext/range.h>
#include <ext/scope_guard.h>
2016-01-28 01:00:42 +00:00
2017-05-10 04:29:36 +00:00
#include <Poco/DirectoryIterator.h>
#include <future>
#include <condition_variable>
#include <mutex>
/// Metric declared elsewhere; counts connections currently busy sending
/// data for distributed INSERTs (incremented per remote write below).
namespace CurrentMetrics
{
    extern const Metric DistributedSend;
}
2016-01-28 01:00:42 +00:00
/// Event declared elsewhere; incremented when a synchronous distributed
/// INSERT exceeds its insert_timeout (see waitForJobs()).
namespace ProfileEvents
{
    extern const Event DistributedSyncInsertionTimeoutExceeded;
}
2016-01-28 01:00:42 +00:00
namespace DB
2016-01-28 01:00:42 +00:00
{
namespace ErrorCodes
{
2020-02-25 18:02:41 +00:00
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
}
2020-04-14 21:05:45 +00:00
static void writeBlockConvert(const BlockOutputStreamPtr & out, const Block & block, const size_t repeats)
{
if (!blocksHaveEqualStructure(out->getHeader(), block))
{
2020-04-14 21:05:45 +00:00
ConvertingBlockInputStream convert(
std::make_shared<OneBlockInputStream>(block),
out->getHeader(),
ConvertingBlockInputStream::MatchColumnsMode::Name);
auto adopted_block = convert.read();
for (size_t i = 0; i < repeats; ++i)
out->write(adopted_block);
}
else
{
for (size_t i = 0; i < repeats; ++i)
out->write(block);
}
}
/// Captures everything needed to route an INSERT through the Distributed table:
/// the query AST (also pre-rendered to a string for remote sends), the target
/// cluster, and the delivery mode (synchronous or not, optional timeout in seconds).
DistributedBlockOutputStream::DistributedBlockOutputStream(
    const Context & context_,
    StorageDistributed & storage_,
    const StorageMetadataPtr & metadata_snapshot_,
    const ASTPtr & query_ast_,
    const ClusterPtr & cluster_,
    bool insert_sync_,
    UInt64 insert_timeout_)
    : context(context_)
    , storage(storage_)
    , metadata_snapshot(metadata_snapshot_)
    , query_ast(query_ast_)
    , query_string(queryToString(query_ast_))  /// rendered once; reused for every remote stream
    , cluster(cluster_)
    , insert_sync(insert_sync_)
    , insert_timeout(insert_timeout_)
    , log(&Poco::Logger::get("DistributedBlockOutputStream"))
{
}
/// The stream accepts blocks with the table's full sample structure.
Block DistributedBlockOutputStream::getHeader() const
{
    return metadata_snapshot->getSampleBlock();
}
/// Nothing to do up front: jobs, thread pool and throttler are initialized
/// lazily on the first block (see writeSync()).
void DistributedBlockOutputStream::writePrefix()
{
}
2016-01-28 01:00:42 +00:00
void DistributedBlockOutputStream::write(const Block & block)
{
Fix INSERT into Distributed non local node with MATERIALIZED columns Previous patch e527def18a1bbe5fba0920b7747e9c556fd21ff5 ("Fix INSERT into Distributed() table with MATERIALIZED column") fixes it only for cases when the node is local, i.e. direct insert. This patch address the problem when the node is not local (`is_local == false`), by erasing materialized columns on INSERT into Distributed. And this patch fixes two cases, depends on `insert_distributed_sync` setting: - `insert_distributed_sync=0` ``` Not found column value in block. There are only columns: date. Stack trace: 2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27 3. 0x7fffec5d6cf6 DB::Block::getByName(...) dbms/src/Core/Block.cpp:187 4. 0x7fffec2fe067 DB::NativeBlockInputStream::readImpl() dbms/src/DataStreams/NativeBlockInputStream.cpp:159 5. 0x7fffec2d223f DB::IBlockInputStream::read() dbms/src/DataStreams/IBlockInputStream.cpp:61 6. 0x7ffff7c6d40d DB::TCPHandler::receiveData() dbms/programs/server/TCPHandler.cpp:971 7. 0x7ffff7c6cc1d DB::TCPHandler::receivePacket() dbms/programs/server/TCPHandler.cpp:855 8. 0x7ffff7c6a1ef DB::TCPHandler::readDataNext(unsigned long const&, int const&) dbms/programs/server/TCPHandler.cpp:406 9. 0x7ffff7c6a41b DB::TCPHandler::readData(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:437 10. 0x7ffff7c6a5d9 DB::TCPHandler::processInsertQuery(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:464 11. 
0x7ffff7c687b5 DB::TCPHandler::runImpl() dbms/programs/server/TCPHandler.cpp:257 ``` - `insert_distributed_sync=1` ``` 2019.10.18 13:23:22.114578 [ 44 ] {a78f669f-0b08-4337-abf8-d31e958f6d12} <Error> executeQuery: Code: 171, e.displayText() = DB::Exception: Block structure mismatch in RemoteBlockOutputStream stream: different number of columns: date Date UInt16(size = 1), value Date UInt16(size = 1) date Date UInt16(size = 0): Insertion status: Wrote 1 blocks and 0 rows on shard 0 replica 0, 127.0.0.1:59000 (average 0 ms per block) Wrote 0 blocks and 0 rows on shard 1 replica 0, 127.0.0.2:59000 (average 2 ms per block) (version 19.16.1.1) (from [::1]:3624) (in query: INSERT INTO distributed_00952 VALUES ), Stack trace: 2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27 3. 0x7fffec5da4e9 DB::checkBlockStructure<void>(...)::{...}::operator()(...) const dbms/src/Core/Block.cpp:460 4. 0x7fffec5da671 void DB::checkBlockStructure<void>(...) dbms/src/Core/Block.cpp:467 5. 0x7fffec5d8d58 DB::assertBlocksHaveEqualStructure(...) dbms/src/Core/Block.cpp:515 6. 0x7fffec326630 DB::RemoteBlockOutputStream::write(DB::Block const&) dbms/src/DataStreams/RemoteBlockOutputStream.cpp:68 7. 0x7fffe98bd154 DB::DistributedBlockOutputStream::runWritingJob(DB::DistributedBlockOutputStream::JobReplica&, DB::Block const&)::{lambda()#1}::operator()() const dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp:280 <snip> ```` Fixes: #7365 Fixes: #5429 Refs: #6891
2019-10-17 21:33:26 +00:00
Block ordinary_block{ block };
/* They are added by the AddingDefaultBlockOutputStream, and we will get
* different number of columns eventually */
for (const auto & col : metadata_snapshot->getColumns().getMaterialized())
{
Fix INSERT into Distributed non local node with MATERIALIZED columns Previous patch e527def18a1bbe5fba0920b7747e9c556fd21ff5 ("Fix INSERT into Distributed() table with MATERIALIZED column") fixes it only for cases when the node is local, i.e. direct insert. This patch address the problem when the node is not local (`is_local == false`), by erasing materialized columns on INSERT into Distributed. And this patch fixes two cases, depends on `insert_distributed_sync` setting: - `insert_distributed_sync=0` ``` Not found column value in block. There are only columns: date. Stack trace: 2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27 3. 0x7fffec5d6cf6 DB::Block::getByName(...) dbms/src/Core/Block.cpp:187 4. 0x7fffec2fe067 DB::NativeBlockInputStream::readImpl() dbms/src/DataStreams/NativeBlockInputStream.cpp:159 5. 0x7fffec2d223f DB::IBlockInputStream::read() dbms/src/DataStreams/IBlockInputStream.cpp:61 6. 0x7ffff7c6d40d DB::TCPHandler::receiveData() dbms/programs/server/TCPHandler.cpp:971 7. 0x7ffff7c6cc1d DB::TCPHandler::receivePacket() dbms/programs/server/TCPHandler.cpp:855 8. 0x7ffff7c6a1ef DB::TCPHandler::readDataNext(unsigned long const&, int const&) dbms/programs/server/TCPHandler.cpp:406 9. 0x7ffff7c6a41b DB::TCPHandler::readData(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:437 10. 0x7ffff7c6a5d9 DB::TCPHandler::processInsertQuery(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:464 11. 
0x7ffff7c687b5 DB::TCPHandler::runImpl() dbms/programs/server/TCPHandler.cpp:257 ``` - `insert_distributed_sync=1` ``` 2019.10.18 13:23:22.114578 [ 44 ] {a78f669f-0b08-4337-abf8-d31e958f6d12} <Error> executeQuery: Code: 171, e.displayText() = DB::Exception: Block structure mismatch in RemoteBlockOutputStream stream: different number of columns: date Date UInt16(size = 1), value Date UInt16(size = 1) date Date UInt16(size = 0): Insertion status: Wrote 1 blocks and 0 rows on shard 0 replica 0, 127.0.0.1:59000 (average 0 ms per block) Wrote 0 blocks and 0 rows on shard 1 replica 0, 127.0.0.2:59000 (average 2 ms per block) (version 19.16.1.1) (from [::1]:3624) (in query: INSERT INTO distributed_00952 VALUES ), Stack trace: 2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27 3. 0x7fffec5da4e9 DB::checkBlockStructure<void>(...)::{...}::operator()(...) const dbms/src/Core/Block.cpp:460 4. 0x7fffec5da671 void DB::checkBlockStructure<void>(...) dbms/src/Core/Block.cpp:467 5. 0x7fffec5d8d58 DB::assertBlocksHaveEqualStructure(...) dbms/src/Core/Block.cpp:515 6. 0x7fffec326630 DB::RemoteBlockOutputStream::write(DB::Block const&) dbms/src/DataStreams/RemoteBlockOutputStream.cpp:68 7. 0x7fffe98bd154 DB::DistributedBlockOutputStream::runWritingJob(DB::DistributedBlockOutputStream::JobReplica&, DB::Block const&)::{lambda()#1}::operator()() const dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp:280 <snip> ```` Fixes: #7365 Fixes: #5429 Refs: #6891
2019-10-17 21:33:26 +00:00
if (ordinary_block.has(col.name))
{
ordinary_block.erase(col.name);
2020-05-23 22:24:01 +00:00
LOG_DEBUG(log, "{}: column {} will be removed, because it is MATERIALIZED",
2020-05-23 21:26:45 +00:00
storage.getStorageID().getNameForLogs(), col.name);
Fix INSERT into Distributed non local node with MATERIALIZED columns Previous patch e527def18a1bbe5fba0920b7747e9c556fd21ff5 ("Fix INSERT into Distributed() table with MATERIALIZED column") fixes it only for cases when the node is local, i.e. direct insert. This patch address the problem when the node is not local (`is_local == false`), by erasing materialized columns on INSERT into Distributed. And this patch fixes two cases, depends on `insert_distributed_sync` setting: - `insert_distributed_sync=0` ``` Not found column value in block. There are only columns: date. Stack trace: 2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27 3. 0x7fffec5d6cf6 DB::Block::getByName(...) dbms/src/Core/Block.cpp:187 4. 0x7fffec2fe067 DB::NativeBlockInputStream::readImpl() dbms/src/DataStreams/NativeBlockInputStream.cpp:159 5. 0x7fffec2d223f DB::IBlockInputStream::read() dbms/src/DataStreams/IBlockInputStream.cpp:61 6. 0x7ffff7c6d40d DB::TCPHandler::receiveData() dbms/programs/server/TCPHandler.cpp:971 7. 0x7ffff7c6cc1d DB::TCPHandler::receivePacket() dbms/programs/server/TCPHandler.cpp:855 8. 0x7ffff7c6a1ef DB::TCPHandler::readDataNext(unsigned long const&, int const&) dbms/programs/server/TCPHandler.cpp:406 9. 0x7ffff7c6a41b DB::TCPHandler::readData(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:437 10. 0x7ffff7c6a5d9 DB::TCPHandler::processInsertQuery(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:464 11. 
0x7ffff7c687b5 DB::TCPHandler::runImpl() dbms/programs/server/TCPHandler.cpp:257 ``` - `insert_distributed_sync=1` ``` 2019.10.18 13:23:22.114578 [ 44 ] {a78f669f-0b08-4337-abf8-d31e958f6d12} <Error> executeQuery: Code: 171, e.displayText() = DB::Exception: Block structure mismatch in RemoteBlockOutputStream stream: different number of columns: date Date UInt16(size = 1), value Date UInt16(size = 1) date Date UInt16(size = 0): Insertion status: Wrote 1 blocks and 0 rows on shard 0 replica 0, 127.0.0.1:59000 (average 0 ms per block) Wrote 0 blocks and 0 rows on shard 1 replica 0, 127.0.0.2:59000 (average 2 ms per block) (version 19.16.1.1) (from [::1]:3624) (in query: INSERT INTO distributed_00952 VALUES ), Stack trace: 2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27 3. 0x7fffec5da4e9 DB::checkBlockStructure<void>(...)::{...}::operator()(...) const dbms/src/Core/Block.cpp:460 4. 0x7fffec5da671 void DB::checkBlockStructure<void>(...) dbms/src/Core/Block.cpp:467 5. 0x7fffec5d8d58 DB::assertBlocksHaveEqualStructure(...) dbms/src/Core/Block.cpp:515 6. 0x7fffec326630 DB::RemoteBlockOutputStream::write(DB::Block const&) dbms/src/DataStreams/RemoteBlockOutputStream.cpp:68 7. 0x7fffe98bd154 DB::DistributedBlockOutputStream::runWritingJob(DB::DistributedBlockOutputStream::JobReplica&, DB::Block const&)::{lambda()#1}::operator()() const dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp:280 <snip> ```` Fixes: #7365 Fixes: #5429 Refs: #6891
2019-10-17 21:33:26 +00:00
}
}
Fix INSERT into Distributed non local node with MATERIALIZED columns Previous patch e527def18a1bbe5fba0920b7747e9c556fd21ff5 ("Fix INSERT into Distributed() table with MATERIALIZED column") fixes it only for cases when the node is local, i.e. direct insert. This patch address the problem when the node is not local (`is_local == false`), by erasing materialized columns on INSERT into Distributed. And this patch fixes two cases, depends on `insert_distributed_sync` setting: - `insert_distributed_sync=0` ``` Not found column value in block. There are only columns: date. Stack trace: 2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27 3. 0x7fffec5d6cf6 DB::Block::getByName(...) dbms/src/Core/Block.cpp:187 4. 0x7fffec2fe067 DB::NativeBlockInputStream::readImpl() dbms/src/DataStreams/NativeBlockInputStream.cpp:159 5. 0x7fffec2d223f DB::IBlockInputStream::read() dbms/src/DataStreams/IBlockInputStream.cpp:61 6. 0x7ffff7c6d40d DB::TCPHandler::receiveData() dbms/programs/server/TCPHandler.cpp:971 7. 0x7ffff7c6cc1d DB::TCPHandler::receivePacket() dbms/programs/server/TCPHandler.cpp:855 8. 0x7ffff7c6a1ef DB::TCPHandler::readDataNext(unsigned long const&, int const&) dbms/programs/server/TCPHandler.cpp:406 9. 0x7ffff7c6a41b DB::TCPHandler::readData(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:437 10. 0x7ffff7c6a5d9 DB::TCPHandler::processInsertQuery(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:464 11. 
0x7ffff7c687b5 DB::TCPHandler::runImpl() dbms/programs/server/TCPHandler.cpp:257 ``` - `insert_distributed_sync=1` ``` 2019.10.18 13:23:22.114578 [ 44 ] {a78f669f-0b08-4337-abf8-d31e958f6d12} <Error> executeQuery: Code: 171, e.displayText() = DB::Exception: Block structure mismatch in RemoteBlockOutputStream stream: different number of columns: date Date UInt16(size = 1), value Date UInt16(size = 1) date Date UInt16(size = 0): Insertion status: Wrote 1 blocks and 0 rows on shard 0 replica 0, 127.0.0.1:59000 (average 0 ms per block) Wrote 0 blocks and 0 rows on shard 1 replica 0, 127.0.0.2:59000 (average 2 ms per block) (version 19.16.1.1) (from [::1]:3624) (in query: INSERT INTO distributed_00952 VALUES ), Stack trace: 2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27 3. 0x7fffec5da4e9 DB::checkBlockStructure<void>(...)::{...}::operator()(...) const dbms/src/Core/Block.cpp:460 4. 0x7fffec5da671 void DB::checkBlockStructure<void>(...) dbms/src/Core/Block.cpp:467 5. 0x7fffec5d8d58 DB::assertBlocksHaveEqualStructure(...) dbms/src/Core/Block.cpp:515 6. 0x7fffec326630 DB::RemoteBlockOutputStream::write(DB::Block const&) dbms/src/DataStreams/RemoteBlockOutputStream.cpp:68 7. 0x7fffe98bd154 DB::DistributedBlockOutputStream::runWritingJob(DB::DistributedBlockOutputStream::JobReplica&, DB::Block const&)::{lambda()#1}::operator()() const dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp:280 <snip> ```` Fixes: #7365 Fixes: #5429 Refs: #6891
2019-10-17 21:33:26 +00:00
if (insert_sync)
Fix INSERT into Distributed non local node with MATERIALIZED columns Previous patch e527def18a1bbe5fba0920b7747e9c556fd21ff5 ("Fix INSERT into Distributed() table with MATERIALIZED column") fixes it only for cases when the node is local, i.e. direct insert. This patch address the problem when the node is not local (`is_local == false`), by erasing materialized columns on INSERT into Distributed. And this patch fixes two cases, depends on `insert_distributed_sync` setting: - `insert_distributed_sync=0` ``` Not found column value in block. There are only columns: date. Stack trace: 2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27 3. 0x7fffec5d6cf6 DB::Block::getByName(...) dbms/src/Core/Block.cpp:187 4. 0x7fffec2fe067 DB::NativeBlockInputStream::readImpl() dbms/src/DataStreams/NativeBlockInputStream.cpp:159 5. 0x7fffec2d223f DB::IBlockInputStream::read() dbms/src/DataStreams/IBlockInputStream.cpp:61 6. 0x7ffff7c6d40d DB::TCPHandler::receiveData() dbms/programs/server/TCPHandler.cpp:971 7. 0x7ffff7c6cc1d DB::TCPHandler::receivePacket() dbms/programs/server/TCPHandler.cpp:855 8. 0x7ffff7c6a1ef DB::TCPHandler::readDataNext(unsigned long const&, int const&) dbms/programs/server/TCPHandler.cpp:406 9. 0x7ffff7c6a41b DB::TCPHandler::readData(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:437 10. 0x7ffff7c6a5d9 DB::TCPHandler::processInsertQuery(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:464 11. 
0x7ffff7c687b5 DB::TCPHandler::runImpl() dbms/programs/server/TCPHandler.cpp:257 ``` - `insert_distributed_sync=1` ``` 2019.10.18 13:23:22.114578 [ 44 ] {a78f669f-0b08-4337-abf8-d31e958f6d12} <Error> executeQuery: Code: 171, e.displayText() = DB::Exception: Block structure mismatch in RemoteBlockOutputStream stream: different number of columns: date Date UInt16(size = 1), value Date UInt16(size = 1) date Date UInt16(size = 0): Insertion status: Wrote 1 blocks and 0 rows on shard 0 replica 0, 127.0.0.1:59000 (average 0 ms per block) Wrote 0 blocks and 0 rows on shard 1 replica 0, 127.0.0.2:59000 (average 2 ms per block) (version 19.16.1.1) (from [::1]:3624) (in query: INSERT INTO distributed_00952 VALUES ), Stack trace: 2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27 3. 0x7fffec5da4e9 DB::checkBlockStructure<void>(...)::{...}::operator()(...) const dbms/src/Core/Block.cpp:460 4. 0x7fffec5da671 void DB::checkBlockStructure<void>(...) dbms/src/Core/Block.cpp:467 5. 0x7fffec5d8d58 DB::assertBlocksHaveEqualStructure(...) dbms/src/Core/Block.cpp:515 6. 0x7fffec326630 DB::RemoteBlockOutputStream::write(DB::Block const&) dbms/src/DataStreams/RemoteBlockOutputStream.cpp:68 7. 0x7fffe98bd154 DB::DistributedBlockOutputStream::runWritingJob(DB::DistributedBlockOutputStream::JobReplica&, DB::Block const&)::{lambda()#1}::operator()() const dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp:280 <snip> ```` Fixes: #7365 Fixes: #5429 Refs: #6891
2019-10-17 21:33:26 +00:00
writeSync(ordinary_block);
else
Fix INSERT into Distributed non local node with MATERIALIZED columns Previous patch e527def18a1bbe5fba0920b7747e9c556fd21ff5 ("Fix INSERT into Distributed() table with MATERIALIZED column") fixes it only for cases when the node is local, i.e. direct insert. This patch address the problem when the node is not local (`is_local == false`), by erasing materialized columns on INSERT into Distributed. And this patch fixes two cases, depends on `insert_distributed_sync` setting: - `insert_distributed_sync=0` ``` Not found column value in block. There are only columns: date. Stack trace: 2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27 3. 0x7fffec5d6cf6 DB::Block::getByName(...) dbms/src/Core/Block.cpp:187 4. 0x7fffec2fe067 DB::NativeBlockInputStream::readImpl() dbms/src/DataStreams/NativeBlockInputStream.cpp:159 5. 0x7fffec2d223f DB::IBlockInputStream::read() dbms/src/DataStreams/IBlockInputStream.cpp:61 6. 0x7ffff7c6d40d DB::TCPHandler::receiveData() dbms/programs/server/TCPHandler.cpp:971 7. 0x7ffff7c6cc1d DB::TCPHandler::receivePacket() dbms/programs/server/TCPHandler.cpp:855 8. 0x7ffff7c6a1ef DB::TCPHandler::readDataNext(unsigned long const&, int const&) dbms/programs/server/TCPHandler.cpp:406 9. 0x7ffff7c6a41b DB::TCPHandler::readData(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:437 10. 0x7ffff7c6a5d9 DB::TCPHandler::processInsertQuery(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:464 11. 
0x7ffff7c687b5 DB::TCPHandler::runImpl() dbms/programs/server/TCPHandler.cpp:257 ``` - `insert_distributed_sync=1` ``` 2019.10.18 13:23:22.114578 [ 44 ] {a78f669f-0b08-4337-abf8-d31e958f6d12} <Error> executeQuery: Code: 171, e.displayText() = DB::Exception: Block structure mismatch in RemoteBlockOutputStream stream: different number of columns: date Date UInt16(size = 1), value Date UInt16(size = 1) date Date UInt16(size = 0): Insertion status: Wrote 1 blocks and 0 rows on shard 0 replica 0, 127.0.0.1:59000 (average 0 ms per block) Wrote 0 blocks and 0 rows on shard 1 replica 0, 127.0.0.2:59000 (average 2 ms per block) (version 19.16.1.1) (from [::1]:3624) (in query: INSERT INTO distributed_00952 VALUES ), Stack trace: 2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27 3. 0x7fffec5da4e9 DB::checkBlockStructure<void>(...)::{...}::operator()(...) const dbms/src/Core/Block.cpp:460 4. 0x7fffec5da671 void DB::checkBlockStructure<void>(...) dbms/src/Core/Block.cpp:467 5. 0x7fffec5d8d58 DB::assertBlocksHaveEqualStructure(...) dbms/src/Core/Block.cpp:515 6. 0x7fffec326630 DB::RemoteBlockOutputStream::write(DB::Block const&) dbms/src/DataStreams/RemoteBlockOutputStream.cpp:68 7. 0x7fffe98bd154 DB::DistributedBlockOutputStream::runWritingJob(DB::DistributedBlockOutputStream::JobReplica&, DB::Block const&)::{lambda()#1}::operator()() const dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp:280 <snip> ```` Fixes: #7365 Fixes: #5429 Refs: #6891
2019-10-17 21:33:26 +00:00
writeAsync(ordinary_block);
}
void DistributedBlockOutputStream::writeAsync(const Block & block)
2016-01-28 01:00:42 +00:00
{
const Settings & settings = context.getSettingsRef();
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
if (random_shard_insert)
{
writeAsyncImpl(block, storage.getRandomShardIndex(cluster->getShardsInfo()));
++inserted_blocks;
}
else
{
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
return writeSplitAsync(block);
2016-01-28 01:00:42 +00:00
writeAsyncImpl(block);
++inserted_blocks;
}
2016-01-28 01:00:42 +00:00
}
std::string DistributedBlockOutputStream::getCurrentStateDescription()
{
2020-11-10 18:22:26 +00:00
WriteBufferFromOwnString buffer;
const auto & addresses = cluster->getShardsAddresses();
buffer << "Insertion status:\n";
for (auto & shard_jobs : per_shard_jobs)
for (JobReplica & job : shard_jobs.replicas_jobs)
{
buffer << "Wrote " << job.blocks_written << " blocks and " << job.rows_written << " rows"
<< " on shard " << job.shard_index << " replica " << job.replica_index
<< ", " << addresses[job.shard_index][job.replica_index].readableString();
/// Performance statistics
if (job.blocks_started > 0)
{
buffer << " (average " << job.elapsed_time_ms / job.blocks_started << " ms per block";
if (job.blocks_started > 1)
buffer << ", the slowest block " << job.max_elapsed_time_for_block_ms << " ms";
buffer << ")";
}
buffer << "\n";
}
return buffer.str();
}
/// Create one JobReplica per destination replica for shards in [start, end).
/// Counts local vs remote jobs so writeSync() can size the thread pool.
void DistributedBlockOutputStream::initWritingJobs(const Block & first_block, size_t start, size_t end)
{
    const Settings & settings = context.getSettingsRef();
    const auto & addresses_with_failovers = cluster->getShardsAddresses();
    const auto & shards_info = cluster->getShardsInfo();
    size_t num_shards = end - start;

    remote_jobs_count = 0;
    local_jobs_count = 0;
    per_shard_jobs.resize(shards_info.size());

    for (size_t shard_index : ext::range(start, end))
    {
        const auto & shard_info = shards_info[shard_index];
        auto & shard_jobs = per_shard_jobs[shard_index];

        /// If hasInternalReplication, then prefer local replica (if !prefer_localhost_replica)
        if (!shard_info.hasInternalReplication() || !shard_info.isLocal() || !settings.prefer_localhost_replica)
        {
            const auto & replicas = addresses_with_failovers[shard_index];

            for (size_t replica_index : ext::range(0, replicas.size()))
            {
                if (!replicas[replica_index].is_local || !settings.prefer_localhost_replica)
                {
                    shard_jobs.replicas_jobs.emplace_back(shard_index, replica_index, false, first_block);
                    ++remote_jobs_count;

                    /// With internal replication one remote replica is enough.
                    if (shard_info.hasInternalReplication())
                        break;
                }
            }
        }

        if (shard_info.isLocal() && settings.prefer_localhost_replica)
        {
            shard_jobs.replicas_jobs.emplace_back(shard_index, 0, true, first_block);
            ++local_jobs_count;
        }

        if (num_shards > 1)
            shard_jobs.shard_current_block_permutation.reserve(first_block.rows());
    }
}
void DistributedBlockOutputStream::waitForJobs()
{
pool->wait();
if (insert_timeout)
{
if (static_cast<UInt64>(watch.elapsedSeconds()) > insert_timeout)
{
ProfileEvents::increment(ProfileEvents::DistributedSyncInsertionTimeoutExceeded);
throw Exception("Synchronous distributed insert timeout exceeded.", ErrorCodes::TIMEOUT_EXCEEDED);
}
}
size_t jobs_count = remote_jobs_count + local_jobs_count;
size_t num_finished_jobs = finished_jobs_count;
if (num_finished_jobs < jobs_count)
2020-05-23 22:24:01 +00:00
LOG_WARNING(log, "Expected {} writing jobs, but finished only {}", jobs_count, num_finished_jobs);
}
/// Build a thread-pool job that sends `current_block` (or this shard's slice of it)
/// to the replica described by `job` — over a remote connection, or via a local
/// INSERT when the replica is this server and prefer_localhost_replica is set.
/// The returned lambda captures `job` and `current_block` by reference: both must
/// outlive the job (they do — members / writeSync() local).
ThreadPool::Job
DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block, size_t num_shards)
{
    auto thread_group = CurrentThread::getGroup();
    return [this, thread_group, &job, &current_block, num_shards]()
    {
        if (thread_group)
            CurrentThread::attachToIfDetached(thread_group);
        setThreadName("DistrOutStrProc");

        ++job.blocks_started;

        SCOPE_EXIT({
            ++finished_jobs_count;

            UInt64 elapsed_time_for_block_ms = watch_current_block.elapsedMilliseconds();
            job.elapsed_time_ms += elapsed_time_for_block_ms;
            job.max_elapsed_time_for_block_ms = std::max(job.max_elapsed_time_for_block_ms, elapsed_time_for_block_ms);
        });

        const auto & shard_info = cluster->getShardsInfo()[job.shard_index];
        auto & shard_job = per_shard_jobs[job.shard_index];
        const auto & addresses = cluster->getShardsAddresses();

        /// Generate current shard block by permuting the rows selected for this shard.
        if (num_shards > 1)
        {
            auto & shard_permutation = shard_job.shard_current_block_permutation;
            size_t num_shard_rows = shard_permutation.size();

            for (size_t j = 0; j < current_block.columns(); ++j)
            {
                const auto & src_column = current_block.getByPosition(j).column;
                auto & dst_column = job.current_shard_block.getByPosition(j).column;

                /// Zero permutation size has special meaning in IColumn::permute
                if (num_shard_rows)
                    dst_column = src_column->permute(shard_permutation, num_shard_rows);
                else
                    dst_column = src_column->cloneEmpty();
            }
        }

        const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
        const Settings & settings = context.getSettingsRef();

        /// Do not initiate INSERT for empty block.
        if (shard_block.rows() == 0)
            return;

        if (!job.is_local_job || !settings.prefer_localhost_replica)
        {
            /// Remote write: lazily open a connection and a RemoteBlockOutputStream.
            if (!job.stream)
            {
                auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
                if (shard_info.hasInternalReplication())
                {
                    /// Skip replica_index in case of internal replication
                    if (shard_job.replicas_jobs.size() != 1)
                        throw Exception("There are several writing job for an automatically replicated shard", ErrorCodes::LOGICAL_ERROR);

                    /// TODO: it make sense to rewrite skip_unavailable_shards and max_parallel_replicas here
                    auto connections = shard_info.pool->getMany(timeouts, &settings, PoolMode::GET_ONE);
                    if (connections.empty() || connections.front().isNull())
                        throw Exception("Expected exactly one connection for shard " + toString(job.shard_index), ErrorCodes::LOGICAL_ERROR);

                    job.connection_entry = std::move(connections.front());
                }
                else
                {
                    const auto & replica = addresses.at(job.shard_index).at(job.replica_index);

                    const ConnectionPoolPtr & connection_pool = shard_info.per_replica_pools.at(job.replica_index);
                    if (!connection_pool)
                        throw Exception("Connection pool for replica " + replica.readableString() + " does not exist", ErrorCodes::LOGICAL_ERROR);

                    job.connection_entry = connection_pool->get(timeouts, &settings);
                    if (job.connection_entry.isNull())
                        throw Exception("Got empty connection for replica" + replica.readableString(), ErrorCodes::LOGICAL_ERROR);
                }

                if (throttler)
                    job.connection_entry->setThrottler(throttler);

                job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, settings, context.getClientInfo());
                job.stream->writePrefix();
            }

            CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
            job.stream->write(shard_block);
        }
        else // local
        {
            if (!job.stream)
            {
                /// Forward user settings
                job.local_context = std::make_unique<Context>(context);

                /// Copying of the query AST is required to avoid race,
                /// in case of INSERT into multiple local shards.
                ///
                /// Since INSERT into local node uses AST,
                /// and InterpreterInsertQuery::execute() is modifying it,
                /// to resolve tables (in InterpreterInsertQuery::getTable())
                auto copy_query_ast = query_ast->clone();

                InterpreterInsertQuery interp(copy_query_ast, *job.local_context);
                auto block_io = interp.execute();

                job.stream = block_io.out;
                job.stream->writePrefix();
            }

            writeBlockConvert(job.stream, shard_block, shard_info.getLocalNodeCount());
        }

        job.blocks_written += 1;
        job.rows_written += shard_block.rows();
    };
}
void DistributedBlockOutputStream::writeSync(const Block & block)
{
const Settings & settings = context.getSettingsRef();
const auto & shards_info = cluster->getShardsInfo();
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
size_t start = 0;
size_t end = shards_info.size();
if (random_shard_insert)
{
start = storage.getRandomShardIndex(shards_info);
end = start + 1;
}
size_t num_shards = end - start;
if (!pool)
{
/// Deferred initialization. Only for sync insertion.
initWritingJobs(block, start, end);
pool.emplace(remote_jobs_count + local_jobs_count);
if (!throttler && (settings.max_network_bandwidth || settings.max_network_bytes))
{
throttler = std::make_shared<Throttler>(settings.max_network_bandwidth, settings.max_network_bytes,
"Network bandwidth limit for a query exceeded.");
}
watch.restart();
}
watch_current_block.restart();
if (num_shards > 1)
{
auto current_selector = createSelector(block);
/// Prepare row numbers for each shard
for (size_t shard_index : ext::range(0, num_shards))
2020-01-11 09:50:41 +00:00
per_shard_jobs[shard_index].shard_current_block_permutation.resize(0);
for (size_t i = 0; i < block.rows(); ++i)
2020-01-11 09:50:41 +00:00
per_shard_jobs[current_selector[i]].shard_current_block_permutation.push_back(i);
}
try
{
/// Run jobs in parallel for each block and wait them
finished_jobs_count = 0;
for (size_t shard_index : ext::range(0, shards_info.size()))
for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
pool->scheduleOrThrowOnError(runWritingJob(job, block, num_shards));
}
catch (...)
{
pool->wait();
throw;
}
try
{
waitForJobs();
}
catch (Exception & exception)
{
exception.addMessage(getCurrentStateDescription());
throw;
}
inserted_blocks += 1;
inserted_rows += block.rows();
}
2016-01-28 01:00:42 +00:00
void DistributedBlockOutputStream::writeSuffix()
{
2020-05-23 21:26:45 +00:00
auto log_performance = [this]()
{
double elapsed = watch.elapsedSeconds();
2020-05-23 22:24:01 +00:00
LOG_DEBUG(log, "It took {} sec. to insert {} blocks, {} rows per second. {}", elapsed, inserted_blocks, inserted_rows / elapsed, getCurrentStateDescription());
};
if (insert_sync && pool)
{
finished_jobs_count = 0;
try
{
for (auto & shard_jobs : per_shard_jobs)
{
for (JobReplica & job : shard_jobs.replicas_jobs)
{
if (job.stream)
2018-05-18 18:31:18 +00:00
{
pool->scheduleOrThrowOnError([&job]()
{
job.stream->writeSuffix();
});
}
}
}
}
catch (...)
{
pool->wait();
throw;
}
try
{
pool->wait();
log_performance();
}
catch (Exception & exception)
{
log_performance();
exception.addMessage(getCurrentStateDescription());
throw;
}
}
}
/// Computes, for every row of `source_block`, the index of the destination
/// shard by evaluating the table's sharding key expression on a copy of the
/// block and mapping the key column through the cluster's shard weights.
IColumn::Selector DistributedBlockOutputStream::createSelector(const Block & source_block) const
{
    Block current_block_with_sharding_key_expr = source_block;
    storage.getShardingKeyExpr()->execute(current_block_with_sharding_key_expr);

    const auto & key_column = current_block_with_sharding_key_expr.getByName(storage.getShardingKeyColumnName());

    return storage.createSelector(cluster, key_column);
}
/// Splits `block` into one sub-block per shard of the cluster, scattering each
/// column's rows according to the sharding-key selector. Sub-blocks for shards
/// that receive no rows stay empty (but keep the header structure).
Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
{
    auto selector = createSelector(block);

    /// Split block to num_shard smaller block, using 'selector'.

    const size_t num_shards = cluster->getShardsInfo().size();
    Blocks splitted_blocks(num_shards);

    for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
        splitted_blocks[shard_idx] = block.cloneEmpty();

    size_t columns_in_block = block.columns();
    for (size_t col_idx_in_block = 0; col_idx_in_block < columns_in_block; ++col_idx_in_block)
    {
        /// scatter() distributes the column's rows into num_shards new columns.
        MutableColumns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_shards, selector);
        for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
            splitted_blocks[shard_idx].getByPosition(col_idx_in_block).column = std::move(splitted_columns[shard_idx]);
    }

    return splitted_blocks;
}
/// Asynchronous insertion with a sharding key: splits the block per shard and
/// queues each non-empty part for its shard.
void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
{
    Blocks splitted_blocks = splitBlock(block);
    const size_t num_shards = splitted_blocks.size();

    for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
        if (splitted_blocks[shard_idx].rows())
            writeAsyncImpl(splitted_blocks[shard_idx], shard_idx);

    ++inserted_blocks;
}
void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const size_t shard_id)
2016-01-28 01:00:42 +00:00
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context.getSettingsRef();
2016-01-28 01:00:42 +00:00
if (shard_info.hasInternalReplication())
{
if (shard_info.isLocal() && settings.prefer_localhost_replica)
/// Prefer insert into current instance directly
writeToLocal(block, shard_info.getLocalNodeCount());
else
{
const auto & path = shard_info.insertPathForInternalReplication(
settings.prefer_localhost_replica,
settings.use_compact_format_in_distributed_parts_names);
if (path.empty())
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
writeToShard(block, {path});
}
}
else
{
if (shard_info.isLocal() && settings.prefer_localhost_replica)
writeToLocal(block, shard_info.getLocalNodeCount());
std::vector<std::string> dir_names;
for (const auto & address : cluster->getShardsAddresses()[shard_id])
if (!address.is_local || !settings.prefer_localhost_replica)
dir_names.push_back(address.toFullString(settings.use_compact_format_in_distributed_parts_names));
if (!dir_names.empty())
writeToShard(block, dir_names);
}
2016-01-28 01:00:42 +00:00
}
void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
2016-01-28 01:00:42 +00:00
{
/// Async insert does not support settings forwarding yet whereas sync one supports
InterpreterInsertQuery interp(query_ast, context);
auto block_io = interp.execute();
block_io.out->writePrefix();
2020-04-14 21:05:45 +00:00
writeBlockConvert(block_io.out, block, repeats);
block_io.out->writeSuffix();
2016-01-28 01:00:42 +00:00
}
2016-01-28 01:00:42 +00:00
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
const auto & settings = context.getSettingsRef();
const auto & distributed_settings = storage.getDistributedSettingsRef();
bool fsync = distributed_settings.fsync_after_insert;
bool dir_fsync = distributed_settings.fsync_directories;
std::string compression_method = Poco::toUpper(settings.network_compression_method.toString());
std::optional<int> compression_level;
if (compression_method == "ZSTD")
compression_level = settings.network_zstd_compression_level;
CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs);
CompressionCodecPtr compression_codec = CompressionCodecFactory::instance().get(compression_method, compression_level);
/// tmp directory is used to ensure atomicity of transactions
/// and keep monitor thread out from reading incomplete data
std::string first_file_tmp_path{};
2016-01-28 01:00:42 +00:00
2020-12-02 14:49:43 +00:00
auto reservation = storage.getStoragePolicy()->reserveAndCheck(block.bytes());
const auto disk = reservation->getDisk();
auto disk_path = disk->getPath();
2020-07-23 14:10:48 +00:00
auto data_path = storage.getRelativeDataPath();
2016-01-28 01:00:42 +00:00
auto make_directory_sync_guard = [&](const std::string & current_path)
{
std::unique_ptr<DirectorySyncGuard> guard;
if (dir_fsync)
{
const std::string relative_path(data_path + current_path);
guard = std::make_unique<DirectorySyncGuard>(disk, relative_path);
}
return guard;
};
auto it = dir_names.begin();
/// on first iteration write block to a temporary directory for subsequent
/// hardlinking to ensure the inode is not freed until we're done
{
const std::string path(disk_path + data_path + *it);
Poco::File(path).createDirectory();
2016-01-28 01:00:42 +00:00
const std::string tmp_path(path + "/tmp/");
Poco::File(tmp_path).createDirectory();
2016-01-28 01:00:42 +00:00
const std::string file_name(toString(storage.file_names_increment.get()) + ".bin");
first_file_tmp_path = tmp_path + file_name;
/// Write batch to temporary location
{
auto tmp_dir_sync_guard = make_directory_sync_guard(*it + "/tmp/");
WriteBufferFromFile out{first_file_tmp_path};
CompressedWriteBuffer compress{out, compression_codec};
2020-09-17 12:15:05 +00:00
NativeBlockOutputStream stream{compress, DBMS_TCP_PROTOCOL_VERSION, block.cloneEmpty()};
/// Prepare the header.
/// We wrap the header into a string for compatibility with older versions:
/// a shard will able to read the header partly and ignore other parts based on its version.
WriteBufferFromOwnString header_buf;
2020-09-17 12:15:05 +00:00
writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, header_buf);
writeStringBinary(query_string, header_buf);
context.getSettingsRef().write(header_buf);
2020-09-17 12:15:05 +00:00
context.getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
writeVarUInt(block.rows(), header_buf);
writeVarUInt(block.bytes(), header_buf);
writeStringBinary(block.cloneEmpty().dumpStructure(), header_buf);
/// Add new fields here, for example:
/// writeVarUInt(my_new_data, header_buf);
/// And note that it is safe, because we have checksum and size for header.
/// Write the header.
const StringRef header = header_buf.stringRef();
writeVarUInt(DBMS_DISTRIBUTED_SIGNATURE_HEADER, out);
writeStringBinary(header, out);
writePODBinary(CityHash_v1_0_2::CityHash128(header.data, header.size), out);
stream.writePrefix();
stream.write(block);
stream.writeSuffix();
out.finalize();
if (fsync)
out.sync();
}
2016-01-28 01:00:42 +00:00
// Create hardlink here to reuse increment number
const std::string block_file_path(path + '/' + file_name);
createHardLink(first_file_tmp_path, block_file_path);
auto dir_sync_guard = make_directory_sync_guard(*it);
}
++it;
2016-01-28 01:00:42 +00:00
/// Make hardlinks
for (; it != dir_names.end(); ++it)
{
const std::string path(disk_path + data_path + *it);
Poco::File(path).createDirectory();
const std::string block_file_path(path + '/' + toString(storage.file_names_increment.get()) + ".bin");
createHardLink(first_file_tmp_path, block_file_path);
auto dir_sync_guard = make_directory_sync_guard(*it);
}
/// remove the temporary file, enabling the OS to reclaim inode after all threads
/// have removed their corresponding files
Poco::File(first_file_tmp_path).remove();
/// Notify
auto sleep_ms = context.getSettingsRef().distributed_directory_monitor_sleep_time_ms;
for (const auto & dir_name : dir_names)
{
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
directory_monitor.scheduleAfter(sleep_ms.totalMilliseconds());
}
2016-01-28 01:00:42 +00:00
}
2016-01-28 01:00:42 +00:00
}