diff --git a/src/Storages/StorageJoin.cpp b/src/Storages/StorageJoin.cpp index 6f7df6f4252..e271a50e505 100644 --- a/src/Storages/StorageJoin.cpp +++ b/src/Storages/StorageJoin.cpp @@ -68,11 +68,18 @@ StorageJoin::StorageJoin( restore(); } +BlockOutputStreamPtr StorageJoin::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) +{ + std::lock_guard mutate_lock(mutate_mutex); + return StorageSetOrJoinBase::write(query, metadata_snapshot, context); +} void StorageJoin::truncate( const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder&) { - std::lock_guard lock(mutex); + std::lock_guard mutate_lock(mutate_mutex); + std::unique_lock lock(rwlock); + disk->removeRecursive(path); disk->createDirectories(path); disk->createDirectories(path + "tmp/"); @@ -83,45 +90,47 @@ void StorageJoin::truncate( void StorageJoin::checkMutationIsPossible(const MutationCommands & commands, const Settings & /* settings */) const { - for (const auto & command: commands) - { - switch (command.type) - { - case MutationCommand::Type::DELETE: - break; - case MutationCommand::Type::UPDATE: - throw Exception("Table engine Join doesn't support update mutation, please use insert instead", ErrorCodes::NOT_IMPLEMENTED); - default: - throw Exception("Table engine Join doesn't support this mutation", ErrorCodes::NOT_IMPLEMENTED); - } - } + for (const auto & command : commands) + if (command.type != MutationCommand::DELETE) + throw Exception("Table engine Join supports only DELETE mutations", ErrorCodes::NOT_IMPLEMENTED); } void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context) { - // Only delete is supported - std::lock_guard lock(mutex); + /// First acquire lock for mutation, that locks changes of data. + /// We cannot acquire rwlock here, because read lock is needed + /// for execution of mutation interpreter. 
+ std::lock_guard mutate_lock(mutate_mutex); + + constexpr auto tmp_backup_file_name = "tmp/mut.bin"; auto metadata_snapshot = getInMemoryMetadataPtr(); - auto storage = getStorageID(); - auto storage_ptr = DatabaseCatalog::instance().getTable(storage, context); - auto interpreter = std::make_unique(storage_ptr, metadata_snapshot, commands, context, true); - auto in = interpreter->execute(); - in->readPrefix(); - auto new_data = std::make_shared(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite); - - const String backup_file_name = "1.bin"; // id starts from 1 - auto backup_buf = disk->writeFile(path + "tmp/" + backup_file_name); + auto backup_buf = disk->writeFile(path + tmp_backup_file_name); auto compressed_backup_buf = CompressedWriteBuffer(*backup_buf); auto backup_stream = NativeBlockOutputStream(compressed_backup_buf, 0, metadata_snapshot->getSampleBlock()); - while (const Block & block = in->read()) + auto new_data = std::make_shared(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite); + + // New scope controls lifetime of InputStream. { - new_data->addJoinedBlock(block, true); - if (persistent) - backup_stream.write(block); + auto storage_ptr = DatabaseCatalog::instance().getTable(getStorageID(), context); + auto interpreter = std::make_unique(storage_ptr, metadata_snapshot, commands, context, true); + auto in = interpreter->execute(); + in->readPrefix(); + + while (const Block & block = in->read()) + { + new_data->addJoinedBlock(block, true); + if (persistent) + backup_stream.write(block); + } + + in->readSuffix(); } + /// Now acquire exclusive lock and modify storage. 
+ std::unique_lock lock(rwlock); + join = std::move(new_data); increment = 1; @@ -140,7 +149,7 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context) disk->removeFileIfExists(path + file_name); } - disk->replaceFile(path + "tmp/" + backup_file_name, path + backup_file_name); + disk->replaceFile(path + tmp_backup_file_name, path + std::to_string(increment) + ".bin"); } } diff --git a/src/Storages/StorageJoin.h b/src/Storages/StorageJoin.h index 7315115520e..78d8b9768e9 100644 --- a/src/Storages/StorageJoin.h +++ b/src/Storages/StorageJoin.h @@ -29,6 +29,7 @@ public: void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &) override; + /// Only delete is supported. void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override; void mutate(const MutationCommands & commands, ContextPtr context) override; @@ -44,6 +45,8 @@ public: /// (but not during processing whole query, it's safe for joinGet that doesn't involve `used_flags` from HashJoin) ColumnWithTypeAndName joinGet(const Block & block, const Block & block_with_columns_to_add) const; + BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override; + Pipe read( const Names & column_names, const StorageMetadataPtr & /*metadata_snapshot*/, @@ -71,8 +74,7 @@ private: /// Protect state for concurrent use in insertFromBlock and joinBlock. /// Lock is stored in HashJoin instance during query and blocks concurrent insertions. 
mutable std::shared_mutex rwlock; - - mutable std::mutex mutex; + mutable std::mutex mutate_mutex; void insertBlock(const Block & block) override; void finishInsert() override {} diff --git a/tests/queries/0_stateless/01821_join_table_mutation.reference b/tests/queries/0_stateless/01821_join_table_mutation.reference index e79d145b39b..8c446c806b5 100644 --- a/tests/queries/0_stateless/01821_join_table_mutation.reference +++ b/tests/queries/0_stateless/01821_join_table_mutation.reference @@ -3,3 +3,5 @@ 99 m10 50 +48 +0 diff --git a/tests/queries/0_stateless/01821_join_table_mutation.sql b/tests/queries/0_stateless/01821_join_table_mutation.sql index 9662a197b88..78903ebd6ec 100644 --- a/tests/queries/0_stateless/01821_join_table_mutation.sql +++ b/tests/queries/0_stateless/01821_join_table_mutation.sql @@ -4,13 +4,13 @@ CREATE TABLE join_table_mutation(id Int32, name String) ENGINE = Join(ANY, LEFT, INSERT INTO join_table_mutation select number, toString(number) from numbers(100); -SELECT count(1) FROM join_table_mutation; +SELECT count() FROM join_table_mutation; SELECT name FROM join_table_mutation WHERE id = 10; ALTER TABLE join_table_mutation DELETE WHERE id = 10; -SELECT count(1) FROM join_table_mutation; +SELECT count() FROM join_table_mutation; SELECT name FROM join_table_mutation WHERE id = 10; @@ -20,4 +20,16 @@ SELECT name FROM join_table_mutation WHERE id = 10; ALTER TABLE join_table_mutation DELETE WHERE id % 2 = 0; -SELECT count(1) FROM join_table_mutation; \ No newline at end of file +ALTER TABLE join_table_mutation UPDATE name = 'some' WHERE 1; -- {serverError 48} + +SELECT count() FROM join_table_mutation; + +ALTER TABLE join_table_mutation DELETE WHERE name IN ('1', '2', '3', '4'); + +SELECT count() FROM join_table_mutation; + +ALTER TABLE join_table_mutation DELETE WHERE 1; + +SELECT count() FROM join_table_mutation; + +DROP TABLE join_table_mutation; diff --git a/tests/queries/0_stateless/01821_join_table_race_long.reference 
b/tests/queries/0_stateless/01821_join_table_race_long.reference new file mode 100644 index 00000000000..f2018833bc6 --- /dev/null +++ b/tests/queries/0_stateless/01821_join_table_race_long.reference @@ -0,0 +1 @@ +1 foo diff --git a/tests/queries/0_stateless/01821_join_table_race_long.sh b/tests/queries/0_stateless/01821_join_table_race_long.sh new file mode 100755 index 00000000000..9602da1e12a --- /dev/null +++ b/tests/queries/0_stateless/01821_join_table_race_long.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + + +$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS join_table_race" +$CLICKHOUSE_CLIENT -q "CREATE TABLE join_table_race(id Int32, name String) ENGINE = Join(ANY, LEFT, id)" + +for i in {0..100}; do $CLICKHOUSE_CLIENT -q "INSERT INTO join_table_race VALUES ($RANDOM, '$RANDOM')" > /dev/null 2> /dev/null; done & + +for i in {0..200}; do $CLICKHOUSE_CLIENT -q "SELECT count() FROM join_table_race FORMAT Null" > /dev/null 2> /dev/null; done & + +for i in {0..100}; do $CLICKHOUSE_CLIENT -q "TRUNCATE TABLE join_table_race" > /dev/null 2> /dev/null; done & + +for i in {0..100}; do $CLICKHOUSE_CLIENT -q "ALTER TABLE join_table_race DELETE WHERE id % 2 = 0" > /dev/null 2> /dev/null; done & + +wait + +$CLICKHOUSE_CLIENT -q "TRUNCATE TABLE join_table_race" +$CLICKHOUSE_CLIENT -q "INSERT INTO join_table_race VALUES (1, 'foo')" +$CLICKHOUSE_CLIENT -q "SELECT id, name FROM join_table_race" + +$CLICKHOUSE_CLIENT -q "DROP TABLE join_table_race"