mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-11 17:02:25 +00:00
Convert blocks if structure does not match on INSERT into Distributed()
Follow-up for: #10105
This commit is contained in:
parent
6d80ab1eed
commit
6d85207bfb
@ -12,6 +12,8 @@
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <DataStreams/NativeBlockOutputStream.h>
|
||||
#include <DataStreams/RemoteBlockOutputStream.h>
|
||||
#include <DataStreams/ConvertingBlockInputStream.h>
|
||||
#include <DataStreams/OneBlockInputStream.h>
|
||||
#include <Interpreters/InterpreterInsertQuery.h>
|
||||
#include <Interpreters/createBlockSelector.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
@ -59,6 +61,26 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_LINK;
|
||||
}
|
||||
|
||||
static void writeBlockConvert(const Context & context, const BlockOutputStreamPtr & out, const Block & block, const size_t repeats)
|
||||
{
|
||||
if (!blocksHaveEqualStructure(out->getHeader(), block))
|
||||
{
|
||||
ConvertingBlockInputStream convert(context,
|
||||
std::make_shared<OneBlockInputStream>(block),
|
||||
out->getHeader(),
|
||||
ConvertingBlockInputStream::MatchColumnsMode::Name);
|
||||
auto adopted_block = convert.read();
|
||||
|
||||
for (size_t i = 0; i < repeats; ++i)
|
||||
out->write(adopted_block);
|
||||
}
|
||||
else
|
||||
{
|
||||
for (size_t i = 0; i < repeats; ++i)
|
||||
out->write(block);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
DistributedBlockOutputStream::DistributedBlockOutputStream(
|
||||
const Context & context_, StorageDistributed & storage_, const ASTPtr & query_ast_, const ClusterPtr & cluster_,
|
||||
@ -306,14 +328,12 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
|
||||
|
||||
InterpreterInsertQuery interp(query_ast, *job.local_context);
|
||||
auto block_io = interp.execute();
|
||||
assertBlocksHaveEqualStructure(block_io.out->getHeader(), shard_block, "flushing shard block for " + storage.getStorageID().getNameForLogs());
|
||||
|
||||
job.stream = block_io.out;
|
||||
job.stream->writePrefix();
|
||||
}
|
||||
|
||||
size_t num_repetitions = shard_info.getLocalNodeCount();
|
||||
for (size_t i = 0; i < num_repetitions; ++i)
|
||||
job.stream->write(shard_block);
|
||||
writeBlockConvert(context, job.stream, shard_block, shard_info.getLocalNodeCount());
|
||||
}
|
||||
|
||||
job.blocks_written += 1;
|
||||
@ -547,13 +567,8 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_
|
||||
|
||||
auto block_io = interp.execute();
|
||||
|
||||
assertBlocksHaveEqualStructure(block_io.out->getHeader(), block, "flushing " + storage.getStorageID().getNameForLogs());
|
||||
|
||||
block_io.out->writePrefix();
|
||||
|
||||
for (size_t i = 0; i < repeats; ++i)
|
||||
block_io.out->write(block);
|
||||
|
||||
writeBlockConvert(context, block_io.out, block, repeats);
|
||||
block_io.out->writeSuffix();
|
||||
}
|
||||
|
||||
|
@ -0,0 +1 @@
|
||||
0
|
@ -4,6 +4,6 @@ DROP TABLE IF EXISTS underlying_00967;
|
||||
CREATE TABLE dist_00967 (key UInt64) Engine=Distributed('test_shard_localhost', currentDatabase(), underlying_00967);
|
||||
-- fails for TinyLog()/MergeTree()/... but not for Memory()
|
||||
CREATE TABLE underlying_00967 (key Nullable(UInt64)) Engine=TinyLog();
|
||||
INSERT INTO dist_00967 SELECT toUInt64(number) FROM system.numbers LIMIT 1; -- { serverError 171; }
|
||||
INSERT INTO dist_00967 SELECT toUInt64(number) FROM system.numbers LIMIT 1;
|
||||
|
||||
SELECT * FROM dist_00967;
|
||||
|
Loading…
Reference in New Issue
Block a user