mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-18 20:32:43 +00:00
Merge pull request #6911 from yandex/fix-insert-select-data-loss
Fix insert select data loss
(cherry picked from commit 6fbf9ca7ab
)
This commit is contained in:
parent
da44d9fd9b
commit
1602daba59
@ -28,8 +28,6 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
|
||||
break;
|
||||
|
||||
to.write(block);
|
||||
if (!block.rows())
|
||||
to.flush();
|
||||
progress(block);
|
||||
}
|
||||
|
||||
|
@ -35,6 +35,7 @@
|
||||
#include <Processors/Transforms/LimitsCheckingTransform.h>
|
||||
#include <Processors/Transforms/MaterializingTransform.h>
|
||||
#include <Processors/Formats/IOutputFormat.h>
|
||||
#include <Parsers/ASTWatchQuery.h>
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
@ -631,7 +632,19 @@ void executeQuery(
|
||||
if (set_query_id)
|
||||
set_query_id(context.getClientInfo().current_query_id);
|
||||
|
||||
copyData(*streams.in, *out);
|
||||
if (ast->as<ASTWatchQuery>())
|
||||
{
|
||||
/// For Watch query, flush data if block is empty (to send data to client).
|
||||
auto flush_callback = [&out](const Block & block)
|
||||
{
|
||||
if (block.rows() == 0)
|
||||
out->flush();
|
||||
};
|
||||
|
||||
copyData(*streams.in, *out, [](){ return false; }, std::move(flush_callback));
|
||||
}
|
||||
else
|
||||
copyData(*streams.in, *out);
|
||||
}
|
||||
|
||||
if (pipeline.initialized())
|
||||
|
@ -0,0 +1,2 @@
|
||||
0
|
||||
10
|
@ -0,0 +1,5 @@
|
||||
drop table if exists tab;
|
||||
create table tab (x UInt64) engine = MergeTree order by tuple();
|
||||
|
||||
insert into tab select number as n from numbers(20) any inner join (select number * 10 as n from numbers(2)) using(n) settings any_join_distinct_right_table_keys = 1, max_block_size = 5;
|
||||
select * from tab order by x;
|
Loading…
Reference in New Issue
Block a user