mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
commit
c0ef021c6a
@ -1,22 +1,25 @@
|
|||||||
#include <Storages/StorageJoin.h>
|
#include <Storages/StorageJoin.h>
|
||||||
#include <Storages/StorageFactory.h>
|
#include <Storages/StorageFactory.h>
|
||||||
|
#include <Storages/StorageSet.h>
|
||||||
#include <Interpreters/HashJoin.h>
|
#include <Interpreters/HashJoin.h>
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
#include <Parsers/ASTCreateQuery.h>
|
#include <Parsers/ASTCreateQuery.h>
|
||||||
#include <Parsers/ASTSetQuery.h>
|
|
||||||
#include <Parsers/ASTIdentifier.h>
|
#include <Parsers/ASTIdentifier.h>
|
||||||
#include <Core/ColumnNumbers.h>
|
#include <Core/ColumnNumbers.h>
|
||||||
#include <DataStreams/IBlockInputStream.h>
|
#include <DataStreams/IBlockInputStream.h>
|
||||||
#include <DataTypes/NestedUtils.h>
|
#include <DataTypes/NestedUtils.h>
|
||||||
#include <Disks/IDisk.h>
|
#include <Disks/IDisk.h>
|
||||||
#include <Interpreters/joinDispatch.h>
|
#include <Interpreters/joinDispatch.h>
|
||||||
|
#include <Interpreters/MutationsInterpreter.h>
|
||||||
#include <Interpreters/TableJoin.h>
|
#include <Interpreters/TableJoin.h>
|
||||||
#include <Interpreters/castColumn.h>
|
#include <Interpreters/castColumn.h>
|
||||||
#include <Common/assert_cast.h>
|
|
||||||
#include <Common/quoteString.h>
|
#include <Common/quoteString.h>
|
||||||
#include <Poco/String.h> /// toLower
|
#include <Common/Exception.h>
|
||||||
|
|
||||||
|
#include <Compression/CompressedWriteBuffer.h>
|
||||||
#include <Processors/Sources/SourceWithProgress.h>
|
#include <Processors/Sources/SourceWithProgress.h>
|
||||||
#include <Processors/Pipe.h>
|
#include <Processors/Pipe.h>
|
||||||
|
#include <Poco/String.h> /// toLower
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -65,10 +68,18 @@ StorageJoin::StorageJoin(
|
|||||||
restore();
|
restore();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Creates the output stream used to insert data into the Join table.
/// `mutate_mutex` is held for the duration of this call, so creating the stream
/// cannot overlap with mutate() or truncate(), which lock the same mutex.
/// The guard is released as soon as the stream has been returned.
BlockOutputStreamPtr StorageJoin::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
    std::lock_guard<std::mutex> mutation_guard(mutate_mutex);

    /// The base class builds the actual stream; nothing is added here besides the locking.
    auto output_stream = StorageSetOrJoinBase::write(query, metadata_snapshot, context);
    return output_stream;
}
|
||||||
|
|
||||||
void StorageJoin::truncate(
|
void StorageJoin::truncate(
|
||||||
const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder&)
|
const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder&)
|
||||||
{
|
{
|
||||||
|
std::lock_guard mutate_lock(mutate_mutex);
|
||||||
|
std::unique_lock<std::shared_mutex> lock(rwlock);
|
||||||
|
|
||||||
disk->removeRecursive(path);
|
disk->removeRecursive(path);
|
||||||
disk->createDirectories(path);
|
disk->createDirectories(path);
|
||||||
disk->createDirectories(path + "tmp/");
|
disk->createDirectories(path + "tmp/");
|
||||||
@ -77,6 +88,70 @@ void StorageJoin::truncate(
|
|||||||
join = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
|
join = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void StorageJoin::checkMutationIsPossible(const MutationCommands & commands, const Settings & /* settings */) const
|
||||||
|
{
|
||||||
|
for (const auto & command : commands)
|
||||||
|
if (command.type != MutationCommand::DELETE)
|
||||||
|
throw Exception("Table engine Join supports only DELETE mutations", ErrorCodes::NOT_IMPLEMENTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
|
||||||
|
{
|
||||||
|
/// Firstly accuire lock for mutation, that locks changes of data.
|
||||||
|
/// We cannot accuire rwlock here, because read lock is needed
|
||||||
|
/// for execution of mutation interpreter.
|
||||||
|
std::lock_guard mutate_lock(mutate_mutex);
|
||||||
|
|
||||||
|
constexpr auto tmp_backup_file_name = "tmp/mut.bin";
|
||||||
|
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||||
|
|
||||||
|
auto backup_buf = disk->writeFile(path + tmp_backup_file_name);
|
||||||
|
auto compressed_backup_buf = CompressedWriteBuffer(*backup_buf);
|
||||||
|
auto backup_stream = NativeBlockOutputStream(compressed_backup_buf, 0, metadata_snapshot->getSampleBlock());
|
||||||
|
|
||||||
|
auto new_data = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
|
||||||
|
|
||||||
|
// New scope controls lifetime of InputStream.
|
||||||
|
{
|
||||||
|
auto storage_ptr = DatabaseCatalog::instance().getTable(getStorageID(), context);
|
||||||
|
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, context, true);
|
||||||
|
auto in = interpreter->execute();
|
||||||
|
in->readPrefix();
|
||||||
|
|
||||||
|
while (const Block & block = in->read())
|
||||||
|
{
|
||||||
|
new_data->addJoinedBlock(block, true);
|
||||||
|
if (persistent)
|
||||||
|
backup_stream.write(block);
|
||||||
|
}
|
||||||
|
|
||||||
|
in->readSuffix();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Now accuire exclusive lock and modify storage.
|
||||||
|
std::unique_lock<std::shared_mutex> lock(rwlock);
|
||||||
|
|
||||||
|
join = std::move(new_data);
|
||||||
|
increment = 1;
|
||||||
|
|
||||||
|
if (persistent)
|
||||||
|
{
|
||||||
|
backup_stream.flush();
|
||||||
|
compressed_backup_buf.next();
|
||||||
|
backup_buf->next();
|
||||||
|
backup_buf->finalize();
|
||||||
|
|
||||||
|
std::vector<std::string> files;
|
||||||
|
disk->listFiles(path, files);
|
||||||
|
for (const auto & file_name: files)
|
||||||
|
{
|
||||||
|
if (file_name.ends_with(".bin"))
|
||||||
|
disk->removeFileIfExists(path + file_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
disk->replaceFile(path + tmp_backup_file_name, path + std::to_string(increment) + ".bin");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join) const
|
HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join) const
|
||||||
{
|
{
|
||||||
|
@ -29,6 +29,10 @@ public:
|
|||||||
|
|
||||||
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &) override;
|
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &) override;
|
||||||
|
|
||||||
|
/// Only delete is supported.
|
||||||
|
void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;
|
||||||
|
void mutate(const MutationCommands & commands, ContextPtr context) override;
|
||||||
|
|
||||||
/// Return instance of HashJoin holding lock that protects from insertions to StorageJoin.
|
/// Return instance of HashJoin holding lock that protects from insertions to StorageJoin.
|
||||||
/// HashJoin relies on structure of hash table that's why we need to return it with locked mutex.
|
/// HashJoin relies on structure of hash table that's why we need to return it with locked mutex.
|
||||||
HashJoinPtr getJoinLocked(std::shared_ptr<TableJoin> analyzed_join) const;
|
HashJoinPtr getJoinLocked(std::shared_ptr<TableJoin> analyzed_join) const;
|
||||||
@ -41,6 +45,8 @@ public:
|
|||||||
/// (but not during processing whole query, it's safe for joinGet that doesn't involve `used_flags` from HashJoin)
|
/// (but not during processing whole query, it's safe for joinGet that doesn't involve `used_flags` from HashJoin)
|
||||||
ColumnWithTypeAndName joinGet(const Block & block, const Block & block_with_columns_to_add) const;
|
ColumnWithTypeAndName joinGet(const Block & block, const Block & block_with_columns_to_add) const;
|
||||||
|
|
||||||
|
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
|
||||||
|
|
||||||
Pipe read(
|
Pipe read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
const StorageMetadataPtr & /*metadata_snapshot*/,
|
||||||
@ -68,6 +74,7 @@ private:
|
|||||||
/// Protect state for concurrent use in insertFromBlock and joinBlock.
|
/// Protect state for concurrent use in insertFromBlock and joinBlock.
|
||||||
/// Lock is stored in HashJoin instance during query and blocks concurrent insertions.
|
/// Lock is stored in HashJoin instance during query and blocks concurrent insertions.
|
||||||
mutable std::shared_mutex rwlock;
|
mutable std::shared_mutex rwlock;
|
||||||
|
mutable std::mutex mutate_mutex;
|
||||||
|
|
||||||
void insertBlock(const Block & block) override;
|
void insertBlock(const Block & block) override;
|
||||||
void finishInsert() override {}
|
void finishInsert() override {}
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
100
|
||||||
|
10
|
||||||
|
99
|
||||||
|
m10
|
||||||
|
50
|
||||||
|
48
|
||||||
|
0
|
35
tests/queries/0_stateless/01821_join_table_mutation.sql
Normal file
35
tests/queries/0_stateless/01821_join_table_mutation.sql
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
-- Checks ALTER ... DELETE mutations on a table with the Join engine.

DROP TABLE IF EXISTS join_table_mutation;

CREATE TABLE join_table_mutation(id Int32, name String) ENGINE = Join(ANY, LEFT, id);

-- Initial data: one row per value produced by numbers(100).
INSERT INTO join_table_mutation SELECT number, toString(number) FROM numbers(100);

SELECT count() FROM join_table_mutation;

SELECT name FROM join_table_mutation WHERE id = 10;

-- Delete a single key and make sure it is gone.
ALTER TABLE join_table_mutation DELETE WHERE id = 10;

SELECT count() FROM join_table_mutation;

SELECT name FROM join_table_mutation WHERE id = 10;

-- The deleted key can be inserted again with a new value.
INSERT INTO join_table_mutation VALUES (10, 'm10');

SELECT name FROM join_table_mutation WHERE id = 10;

-- Delete by an expression over the key.
ALTER TABLE join_table_mutation DELETE WHERE id % 2 = 0;

-- Mutations other than DELETE are rejected.
ALTER TABLE join_table_mutation UPDATE name = 'some' WHERE 1; -- {serverError 48}

SELECT count() FROM join_table_mutation;

-- Delete by a condition on a non-key column.
ALTER TABLE join_table_mutation DELETE WHERE name IN ('1', '2', '3', '4');

SELECT count() FROM join_table_mutation;

-- Delete everything that is left.
ALTER TABLE join_table_mutation DELETE WHERE 1;

SELECT count() FROM join_table_mutation;

DROP TABLE join_table_mutation;
|
@ -0,0 +1 @@
|
|||||||
|
1 foo
|
25
tests/queries/0_stateless/01821_join_table_race_long.sh
Executable file
25
tests/queries/0_stateless/01821_join_table_race_long.sh
Executable file
@ -0,0 +1,25 @@
|
|||||||
|
#!/usr/bin/env bash
# Runs INSERT, SELECT, TRUNCATE and ALTER ... DELETE against the same Join table
# concurrently, then checks that the table can still be truncated, filled and read.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh


$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS join_table_race"
$CLICKHOUSE_CLIENT -q "CREATE TABLE join_table_race(id Int32, name String) ENGINE = Join(ANY, LEFT, id)"

# Four background loops run at the same time. Their output and errors are
# discarded: only the final state checked below matters.
for _ in {0..100}; do
    $CLICKHOUSE_CLIENT -q "INSERT INTO join_table_race VALUES ($RANDOM, '$RANDOM')" > /dev/null 2>&1
done &

for _ in {0..200}; do
    $CLICKHOUSE_CLIENT -q "SELECT count() FROM join_table_race FORMAT Null" > /dev/null 2>&1
done &

for _ in {0..100}; do
    $CLICKHOUSE_CLIENT -q "TRUNCATE TABLE join_table_race" > /dev/null 2>&1
done &

for _ in {0..100}; do
    $CLICKHOUSE_CLIENT -q "ALTER TABLE join_table_race DELETE WHERE id % 2 = 0" > /dev/null 2>&1
done &

# Wait for all background loops to finish.
wait

# Start from a known state so that the printed result is deterministic.
$CLICKHOUSE_CLIENT -q "TRUNCATE TABLE join_table_race"
$CLICKHOUSE_CLIENT -q "INSERT INTO join_table_race VALUES (1, 'foo')"
$CLICKHOUSE_CLIENT -q "SELECT id, name FROM join_table_race"

$CLICKHOUSE_CLIENT -q "DROP TABLE join_table_race"
|
Loading…
Reference in New Issue
Block a user