better lock in StorageJoin

This commit is contained in:
Anton Popov 2021-06-01 02:22:05 +03:00
parent 7f27305787
commit f9cf7c46e1
6 changed files with 85 additions and 34 deletions

View File

@ -68,11 +68,18 @@ StorageJoin::StorageJoin(
restore();
}
/// Builds the output stream used to insert data into this table.
/// `mutate_mutex` is held while the base class creates the stream and is
/// released when this function returns.
BlockOutputStreamPtr StorageJoin::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
    std::lock_guard<std::mutex> guard(mutate_mutex);
    return StorageSetOrJoinBase::write(query, metadata_snapshot, context);
}
void StorageJoin::truncate(
const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder&)
{
std::lock_guard lock(mutex);
std::lock_guard mutate_lock(mutate_mutex);
std::unique_lock<std::shared_mutex> lock(rwlock);
disk->removeRecursive(path);
disk->createDirectories(path);
disk->createDirectories(path + "tmp/");
@ -83,45 +90,47 @@ void StorageJoin::truncate(
/// Checks that every command in `commands` is a DELETE mutation.
/// Any other command type is rejected with an Exception carrying ErrorCodes::NOT_IMPLEMENTED.
/// The settings argument is unused.
/// NOTE(review): the body below contains two loops that both reject every non-DELETE command;
/// this looks like the previous and the new form of the same check shown together — confirm
/// which one the actual file contains.
void StorageJoin::checkMutationIsPossible(const MutationCommands & commands, const Settings & /* settings */) const
{
/// Switch-based form: DELETE passes, UPDATE gets a dedicated message, everything else a generic one.
for (const auto & command: commands)
{
switch (command.type)
{
case MutationCommand::Type::DELETE:
break;
case MutationCommand::Type::UPDATE:
throw Exception("Table engine Join doesn't support update mutation, please use insert instead", ErrorCodes::NOT_IMPLEMENTED);
default:
throw Exception("Table engine Join doesn't support this mutation", ErrorCodes::NOT_IMPLEMENTED);
}
}
/// Compact form: a single message for any command that is not DELETE.
for (const auto & command : commands)
if (command.type != MutationCommand::DELETE)
throw Exception("Table engine Join supports only DELETE mutations", ErrorCodes::NOT_IMPLEMENTED);
}
void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
{
// Only delete is supported
std::lock_guard lock(mutex);
/// First, acquire the mutation lock, which blocks concurrent changes of data.
/// We cannot acquire rwlock here, because a read lock is needed
/// for the execution of the mutation interpreter.
std::lock_guard mutate_lock(mutate_mutex);
constexpr auto tmp_backup_file_name = "tmp/mut.bin";
auto metadata_snapshot = getInMemoryMetadataPtr();
auto storage = getStorageID();
auto storage_ptr = DatabaseCatalog::instance().getTable(storage, context);
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, context, true);
auto in = interpreter->execute();
in->readPrefix();
auto new_data = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
const String backup_file_name = "1.bin"; // id starts from 1
auto backup_buf = disk->writeFile(path + "tmp/" + backup_file_name);
auto backup_buf = disk->writeFile(path + tmp_backup_file_name);
auto compressed_backup_buf = CompressedWriteBuffer(*backup_buf);
auto backup_stream = NativeBlockOutputStream(compressed_backup_buf, 0, metadata_snapshot->getSampleBlock());
while (const Block & block = in->read())
auto new_data = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
// New scope controls lifetime of InputStream.
{
new_data->addJoinedBlock(block, true);
if (persistent)
backup_stream.write(block);
auto storage_ptr = DatabaseCatalog::instance().getTable(getStorageID(), context);
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, context, true);
auto in = interpreter->execute();
in->readPrefix();
while (const Block & block = in->read())
{
new_data->addJoinedBlock(block, true);
if (persistent)
backup_stream.write(block);
}
in->readSuffix();
}
/// Now acquire the exclusive lock and modify the storage.
std::unique_lock<std::shared_mutex> lock(rwlock);
join = std::move(new_data);
increment = 1;
@ -140,7 +149,7 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
disk->removeFileIfExists(path + file_name);
}
disk->replaceFile(path + "tmp/" + backup_file_name, path + backup_file_name);
disk->replaceFile(path + tmp_backup_file_name, path + std::to_string(increment) + ".bin");
}
}

View File

@ -29,6 +29,7 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &) override;
/// Only delete is supported.
void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;
void mutate(const MutationCommands & commands, ContextPtr context) override;
@ -44,6 +45,8 @@ public:
/// (but not during processing whole query, it's safe for joinGet that doesn't involve `used_flags` from HashJoin)
ColumnWithTypeAndName joinGet(const Block & block, const Block & block_with_columns_to_add) const;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
@ -71,8 +74,7 @@ private:
/// Protect state for concurrent use in insertFromBlock and joinBlock.
/// Lock is stored in HashJoin instance during query and blocks concurrent insertions.
mutable std::shared_mutex rwlock;
mutable std::mutex mutex;
mutable std::mutex mutate_mutex;
void insertBlock(const Block & block) override;
void finishInsert() override {}

View File

@ -3,3 +3,5 @@
99
m10
50
48
0

View File

@ -4,13 +4,13 @@ CREATE TABLE join_table_mutation(id Int32, name String) ENGINE = Join(ANY, LEFT,
INSERT INTO join_table_mutation select number, toString(number) from numbers(100);
SELECT count(1) FROM join_table_mutation;
SELECT count() FROM join_table_mutation;
SELECT name FROM join_table_mutation WHERE id = 10;
ALTER TABLE join_table_mutation DELETE WHERE id = 10;
SELECT count(1) FROM join_table_mutation;
SELECT count() FROM join_table_mutation;
SELECT name FROM join_table_mutation WHERE id = 10;
@ -20,4 +20,16 @@ SELECT name FROM join_table_mutation WHERE id = 10;
ALTER TABLE join_table_mutation DELETE WHERE id % 2 = 0;
SELECT count(1) FROM join_table_mutation;
ALTER TABLE join_table_mutation UPDATE name = 'some' WHERE 1; -- {serverError 48}
SELECT count() FROM join_table_mutation;
ALTER TABLE join_table_mutation DELETE WHERE name IN ('1', '2', '3', '4');
SELECT count() FROM join_table_mutation;
ALTER TABLE join_table_mutation DELETE WHERE 1;
SELECT count() FROM join_table_mutation;
DROP TABLE join_table_mutation;

View File

@ -0,0 +1 @@
1 foo

View File

@ -0,0 +1,25 @@
#!/usr/bin/env bash

# Race test for a table with the Join engine: INSERT, SELECT, TRUNCATE and
# ALTER ... DELETE queries are issued concurrently against the same table,
# after which the table is reset and a single known row is inserted and read back.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS join_table_race"
$CLICKHOUSE_CLIENT -q "CREATE TABLE join_table_race(id Int32, name String) ENGINE = Join(ANY, LEFT, id)"

# Each loop below runs in the background. Output and errors of the individual
# queries are discarded, so a failing query inside a loop does not affect the result.

# Concurrent inserts of random rows.
for _ in {0..100}; do
    $CLICKHOUSE_CLIENT -q "INSERT INTO join_table_race VALUES ($RANDOM, '$RANDOM')" > /dev/null 2>&1
done &

# Concurrent reads.
for _ in {0..200}; do
    $CLICKHOUSE_CLIENT -q "SELECT count() FROM join_table_race FORMAT Null" > /dev/null 2>&1
done &

# Concurrent truncates.
for _ in {0..100}; do
    $CLICKHOUSE_CLIENT -q "TRUNCATE TABLE join_table_race" > /dev/null 2>&1
done &

# Concurrent DELETE mutations.
for _ in {0..100}; do
    $CLICKHOUSE_CLIENT -q "ALTER TABLE join_table_race DELETE WHERE id % 2 = 0" > /dev/null 2>&1
done &

# Wait for all background loops to finish before the final check.
wait

# Start from an empty table, insert one known row and print it.
$CLICKHOUSE_CLIENT -q "TRUNCATE TABLE join_table_race"
$CLICKHOUSE_CLIENT -q "INSERT INTO join_table_race VALUES (1, 'foo')"
$CLICKHOUSE_CLIENT -q "SELECT id, name FROM join_table_race"

$CLICKHOUSE_CLIENT -q "DROP TABLE join_table_race"