More atomic insert to MATERIALIZED VIEW. [#CLICKHOUSE-3743]

This commit is contained in:
Vitaliy Lyudvichenko 2018-05-22 02:17:57 +03:00 committed by Vitaliy Lyudvichenko
parent 6cc7068c6b
commit 87ad1df259
11 changed files with 127 additions and 31 deletions

View File

@ -18,6 +18,10 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
*/
addTableLock(storage->lockStructure(true, __PRETTY_FUNCTION__));
/// If the "root" table deduplactes blocks, there are no need to make deduplication for children
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks
bool disable_deduplication_for_children = !no_destination && storage->supportsDeduplication();
if (!table.empty())
{
Dependencies dependencies = context.getDependencies(database, table);
@ -27,7 +31,8 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
{
views_context = std::make_unique<Context>(context);
// Do not deduplicate insertions into MV if the main insertion is Ok
views_context->getSettingsRef().insert_deduplicate = false;
if (disable_deduplication_for_children)
views_context->getSettingsRef().insert_deduplicate = false;
}
for (const auto & database_table : dependencies)
@ -89,4 +94,51 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
}
}
void PushingToViewsBlockOutputStream::writePrefix()
{
if (output)
output->writePrefix();
for (auto & view : views)
{
try
{
view.out->writePrefix();
}
catch (Exception & ex)
{
ex.addMessage("while write prefix to view " + view.database + "." + view.table);
throw;
}
}
}
void PushingToViewsBlockOutputStream::writeSuffix()
{
if (output)
output->writeSuffix();
for (auto & view : views)
{
try
{
view.out->writeSuffix();
}
catch (Exception & ex)
{
ex.addMessage("while write prefix to view " + view.database + "." + view.table);
throw;
}
}
}
void PushingToViewsBlockOutputStream::flush()
{
if (output)
output->flush();
for (auto & view : views)
view.out->flush();
}
}

View File

@ -25,32 +25,9 @@ public:
Block getHeader() const override { return storage->getSampleBlock(); }
void write(const Block & block) override;
void flush() override
{
if (output)
output->flush();
for (auto & view : views)
view.out->flush();
}
void writePrefix() override
{
if (output)
output->writePrefix();
for (auto & view : views)
view.out->writePrefix();
}
void writeSuffix() override
{
if (output)
output->writeSuffix();
for (auto & view : views)
view.out->writeSuffix();
}
void flush() override;
void writePrefix() override;
void writeSuffix() override;
private:
StoragePtr storage;

View File

@ -107,6 +107,9 @@ public:
/** Returns true if the storage replicates SELECT, INSERT and ALTER commands among replicas. */
virtual bool supportsReplication() const { return false; }
/** Returns true if the storage supports deduplication of inserted data blocks . */
virtual bool supportsDeduplication() const { return false; }
/** Does not allow you to change the structure or name of the table.
* If you change the data in the table, you will need to specify will_modify_data = true.
* This will take an extra lock that does not allow starting ALTER MODIFY.

View File

@ -14,7 +14,7 @@ Block MergeTreeBlockOutputStream::getHeader() const
void MergeTreeBlockOutputStream::write(const Block & block)
{
storage.data.delayInsertIfNeeded();
storage.data.delayInsertOrThrowIfNeeded();
auto part_blocks = storage.writer.splitBlockIntoParts(block);
for (auto & current_block : part_blocks)

View File

@ -1787,7 +1787,7 @@ size_t MergeTreeData::getMaxPartsCountForPartition() const
}
void MergeTreeData::delayInsertIfNeeded(Poco::Event * until)
void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event *until) const
{
const size_t parts_count = getMaxPartsCountForPartition();
if (parts_count < settings.parts_to_delay_insert)
@ -1817,6 +1817,17 @@ void MergeTreeData::delayInsertIfNeeded(Poco::Event * until)
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<size_t>(delay_milliseconds)));
}
void MergeTreeData::throwInsertIfNeeded() const
{
const size_t parts_count = getMaxPartsCountForPartition();
if (parts_count >= settings.parts_to_throw_insert)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
}
}
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/)
{

View File

@ -389,7 +389,8 @@ public:
/// If the table contains too many active parts, sleep for a while to give them time to merge.
/// If until is non-null, wake up from the sleep earlier if the event happened.
void delayInsertIfNeeded(Poco::Event * until = nullptr);
void delayInsertOrThrowIfNeeded(Poco::Event *until = nullptr) const;
void throwInsertIfNeeded() const;
/// Renames temporary part to a permanent part and adds it to the parts set.
/// It is assumed that the part does not intersect with existing parts.

View File

@ -108,7 +108,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
last_block_is_duplicate = false;
/// TODO Is it possible to not lock the table structure here?
storage.data.delayInsertIfNeeded(&storage.restarting_thread->getWakeupEvent());
storage.data.delayInsertOrThrowIfNeeded(&storage.restarting_thread->getWakeupEvent());
auto zookeeper = storage.getZooKeeper();
assertSessionIsNotExpired(zookeeper);
@ -402,5 +402,10 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
}
}
void ReplicatedMergeTreeBlockOutputStream::writePrefix()
{
storage.data.throwInsertIfNeeded();
}
}

View File

@ -26,6 +26,7 @@ public:
bool deduplicate_);
Block getHeader() const override;
void writePrefix() override;
void write(const Block & block) override;
/// For ATTACHing existing data on filesystem.

View File

@ -87,6 +87,7 @@ public:
bool supportsFinal() const override { return data.supportsFinal(); }
bool supportsPrewhere() const override { return data.supportsPrewhere(); }
bool supportsReplication() const override { return true; }
bool supportsDeduplication() const override { return true; }
const ColumnsDescription & getColumns() const override { return data.getColumns(); }
void setColumns(ColumnsDescription columns_) override { return data.setColumns(std::move(columns_)); }

View File

@ -0,0 +1,9 @@
a 1
b 1
c 1
a 1
b 1
c 1
1

View File

@ -0,0 +1,36 @@
#!/bin/bash
set -e
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.root"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.a"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.b"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.c"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE test.root (d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/test/root', '1') ORDER BY d"
${CLICKHOUSE_CLIENT} --query "CREATE MATERIALIZED VIEW test.a (d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/test/a', '1') ORDER BY d AS SELECT * FROM test.root"
${CLICKHOUSE_CLIENT} --query "CREATE MATERIALIZED VIEW test.b (d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/test/b', '1') ORDER BY d SETTINGS parts_to_delay_insert=1, parts_to_throw_insert=1 AS SELECT * FROM test.root"
${CLICKHOUSE_CLIENT} --query "CREATE MATERIALIZED VIEW test.c (d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/test/c', '1') ORDER BY d AS SELECT * FROM test.root"
${CLICKHOUSE_CLIENT} --query "INSERT INTO test.root VALUES (1)";
${CLICKHOUSE_CLIENT} --query "SELECT _table, d FROM merge('test', '^[abc]\$') ORDER BY _table"
if ${CLICKHOUSE_CLIENT} --query "INSERT INTO test.root VALUES (2)" 2>/dev/null; then
echo "FAIL\nExpected 'too many parts' on table test.b"
fi
echo
${CLICKHOUSE_CLIENT} --query "SELECT _table, d FROM merge('test', '^[abc]\$') ORDER BY _table"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.root"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.a"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.b"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.c"
# Deduplication check for non-replicated root table
echo
${CLICKHOUSE_CLIENT} --query "CREATE TABLE test.root (d UInt64) ENGINE = Null"
${CLICKHOUSE_CLIENT} --query "CREATE MATERIALIZED VIEW test.a (d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/test/a', '1') ORDER BY d AS SELECT * FROM test.root"
${CLICKHOUSE_CLIENT} --query "INSERT INTO test.root VALUES (1)";
${CLICKHOUSE_CLIENT} --query "INSERT INTO test.root VALUES (1)";
${CLICKHOUSE_CLIENT} --query "SELECT * FROM test.a";
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.root"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.a"