mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge pull request #12519 from vzakaznikov/fix_data_duplication_and_tests_for_live_view
Fixing race condition in live view tables which could cause data duplication and live view tests
This commit is contained in:
commit
8966c09ed6
@ -328,8 +328,14 @@ bool StorageLiveView::getNewBlocks()
|
|||||||
BlocksPtr new_blocks = std::make_shared<Blocks>();
|
BlocksPtr new_blocks = std::make_shared<Blocks>();
|
||||||
BlocksMetadataPtr new_blocks_metadata = std::make_shared<BlocksMetadata>();
|
BlocksMetadataPtr new_blocks_metadata = std::make_shared<BlocksMetadata>();
|
||||||
|
|
||||||
mergeable_blocks = collectMergeableBlocks(*live_view_context);
|
/// can't set mergeable_blocks here or anywhere else outside the writeIntoLiveView function
|
||||||
Pipes from = blocksToPipes(mergeable_blocks->blocks, mergeable_blocks->sample_block);
|
/// as there could be a race codition when the new block has been inserted into
|
||||||
|
/// the source table by the PushingToViewsBlockOutputStream and this method
|
||||||
|
/// called before writeIntoLiveView function is called which can lead to
|
||||||
|
/// the same block added twice to the mergeable_blocks leading to
|
||||||
|
/// inserted data to be duplicated
|
||||||
|
auto new_mergeable_blocks = collectMergeableBlocks(*live_view_context);
|
||||||
|
Pipes from = blocksToPipes(new_mergeable_blocks->blocks, new_mergeable_blocks->sample_block);
|
||||||
BlockInputStreamPtr data = completeQuery(std::move(from));
|
BlockInputStreamPtr data = completeQuery(std::move(from));
|
||||||
|
|
||||||
while (Block block = data->read())
|
while (Block block = data->read())
|
||||||
|
@ -30,6 +30,7 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
|
|||||||
client1.send('CREATE LIVE VIEW test.lv WITH TIMEOUT AS SELECT sum(a) FROM test.mt')
|
client1.send('CREATE LIVE VIEW test.lv WITH TIMEOUT AS SELECT sum(a) FROM test.mt')
|
||||||
client1.expect(prompt)
|
client1.expect(prompt)
|
||||||
client1.send('WATCH test.lv')
|
client1.send('WATCH test.lv')
|
||||||
|
client1.expect('_version')
|
||||||
client1.expect(r'0.*1' + end_of_block)
|
client1.expect(r'0.*1' + end_of_block)
|
||||||
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
client1.expect(r'6.*2' + end_of_block)
|
client1.expect(r'6.*2' + end_of_block)
|
||||||
|
@ -30,6 +30,7 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
|
|||||||
client1.send('CREATE LIVE VIEW test.lv WITH TIMEOUT 1 AS SELECT sum(a) FROM test.mt')
|
client1.send('CREATE LIVE VIEW test.lv WITH TIMEOUT 1 AS SELECT sum(a) FROM test.mt')
|
||||||
client1.expect(prompt)
|
client1.expect(prompt)
|
||||||
client1.send('WATCH test.lv')
|
client1.send('WATCH test.lv')
|
||||||
|
client1.expect('_version')
|
||||||
client1.expect(r'0.*1' + end_of_block)
|
client1.expect(r'0.*1' + end_of_block)
|
||||||
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
client2.expect(prompt)
|
client2.expect(prompt)
|
||||||
|
@ -32,6 +32,7 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
|
|||||||
client1.send('CREATE LIVE VIEW test.lv WITH TIMEOUT AS SELECT sum(a) FROM test.mt')
|
client1.send('CREATE LIVE VIEW test.lv WITH TIMEOUT AS SELECT sum(a) FROM test.mt')
|
||||||
client1.expect(prompt)
|
client1.expect(prompt)
|
||||||
client1.send('WATCH test.lv')
|
client1.send('WATCH test.lv')
|
||||||
|
client1.expect('_version')
|
||||||
client1.expect(r'0.*1' + end_of_block)
|
client1.expect(r'0.*1' + end_of_block)
|
||||||
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
client1.expect(r'6.*2' + end_of_block)
|
client1.expect(r'6.*2' + end_of_block)
|
||||||
|
@ -29,8 +29,8 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
|
|||||||
client1.expect(prompt)
|
client1.expect(prompt)
|
||||||
client1.send('CREATE LIVE VIEW test.lv AS SELECT toStartOfDay(time) AS day, location, avg(temperature) FROM test.mt GROUP BY day, location ORDER BY day, location')
|
client1.send('CREATE LIVE VIEW test.lv AS SELECT toStartOfDay(time) AS day, location, avg(temperature) FROM test.mt GROUP BY day, location ORDER BY day, location')
|
||||||
client1.expect(prompt)
|
client1.expect(prompt)
|
||||||
client1.send('WATCH test.lv FORMAT CSV')
|
client1.send('WATCH test.lv FORMAT CSVWithNames')
|
||||||
client1.expect(r'0.*1' + end_of_block)
|
client1.expect(r'_version')
|
||||||
client2.send("INSERT INTO test.mt VALUES ('2019-01-01 00:00:00','New York',60),('2019-01-01 00:10:00','New York',70)")
|
client2.send("INSERT INTO test.mt VALUES ('2019-01-01 00:00:00','New York',60),('2019-01-01 00:10:00','New York',70)")
|
||||||
client2.expect(prompt)
|
client2.expect(prompt)
|
||||||
client1.expect(r'"2019-01-01 00:00:00","New York",65')
|
client1.expect(r'"2019-01-01 00:00:00","New York",65')
|
||||||
|
@ -30,6 +30,7 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
|
|||||||
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
|
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
|
||||||
client1.expect(prompt)
|
client1.expect(prompt)
|
||||||
client1.send('WATCH test.lv')
|
client1.send('WATCH test.lv')
|
||||||
|
client1.expect('_version')
|
||||||
client1.expect(r'0.*1' + end_of_block)
|
client1.expect(r'0.*1' + end_of_block)
|
||||||
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
client1.expect(r'6.*2' + end_of_block)
|
client1.expect(r'6.*2' + end_of_block)
|
||||||
|
@ -30,6 +30,7 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
|
|||||||
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a)/2 FROM (SELECT a, id FROM ( SELECT a, id FROM test.mt ORDER BY id DESC LIMIT 2 ) ORDER BY id DESC LIMIT 2)')
|
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a)/2 FROM (SELECT a, id FROM ( SELECT a, id FROM test.mt ORDER BY id DESC LIMIT 2 ) ORDER BY id DESC LIMIT 2)')
|
||||||
client1.expect(prompt)
|
client1.expect(prompt)
|
||||||
client1.send('WATCH test.lv')
|
client1.send('WATCH test.lv')
|
||||||
|
client1.expect('_version')
|
||||||
client1.expect(r'0.*1' + end_of_block)
|
client1.expect(r'0.*1' + end_of_block)
|
||||||
client2.send('INSERT INTO test.mt VALUES (1, 1),(2, 2),(3, 3)')
|
client2.send('INSERT INTO test.mt VALUES (1, 1),(2, 2),(3, 3)')
|
||||||
client1.expect(r'2\.5.*2' + end_of_block)
|
client1.expect(r'2\.5.*2' + end_of_block)
|
||||||
|
@ -30,6 +30,7 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
|
|||||||
client1.send('CREATE LIVE VIEW test.lv AS SELECT * FROM ( SELECT sum(A.a) FROM (SELECT * FROM test.mt) AS A )')
|
client1.send('CREATE LIVE VIEW test.lv AS SELECT * FROM ( SELECT sum(A.a) FROM (SELECT * FROM test.mt) AS A )')
|
||||||
client1.expect(prompt)
|
client1.expect(prompt)
|
||||||
client1.send('WATCH test.lv')
|
client1.send('WATCH test.lv')
|
||||||
|
client1.expect('_version')
|
||||||
client1.expect(r'0.*1' + end_of_block)
|
client1.expect(r'0.*1' + end_of_block)
|
||||||
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
|
||||||
client1.expect(r'6.*2' + end_of_block)
|
client1.expect(r'6.*2' + end_of_block)
|
||||||
|
@ -38,8 +38,8 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
|
|||||||
client3.send('CREATE LIVE VIEW test.lv_sums AS SELECT * FROM test.sums ORDER BY version')
|
client3.send('CREATE LIVE VIEW test.lv_sums AS SELECT * FROM test.sums ORDER BY version')
|
||||||
client3.expect(prompt)
|
client3.expect(prompt)
|
||||||
|
|
||||||
client3.send("WATCH test.lv_sums FORMAT CSV")
|
client3.send("WATCH test.lv_sums FORMAT CSVWithNames")
|
||||||
client3.expect(r'0.*1' + end_of_block)
|
client3.expect('_version')
|
||||||
|
|
||||||
client1.send('INSERT INTO test.sums WATCH test.lv')
|
client1.send('INSERT INTO test.sums WATCH test.lv')
|
||||||
client1.expect(r'INSERT INTO')
|
client1.expect(r'INSERT INTO')
|
||||||
|
Loading…
Reference in New Issue
Block a user