mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 09:10:48 +00:00
support join table mutation
This commit is contained in:
parent
8b3c37e648
commit
31f2c65e4f
@ -1,24 +1,25 @@
|
||||
#include <Storages/StorageJoin.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/StorageSet.h>
|
||||
#include <Interpreters/HashJoin.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTSetQuery.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Core/ColumnNumbers.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Interpreters/joinDispatch.h>
|
||||
#include <Interpreters/MutationsInterpreter.h>
|
||||
#include <Interpreters/TableJoin.h>
|
||||
#include <Interpreters/castColumn.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
#include <Poco/String.h> /// toLower
|
||||
#include <Poco/File.h>
|
||||
#include <Processors/Sources/SourceWithProgress.h>
|
||||
#include <Compression/CompressedWriteBuffer.h>
|
||||
#include <Processors/Pipe.h>
|
||||
#include <Processors/Sources/SourceWithProgress.h>
|
||||
#include <Poco/String.h> /// toLower
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -70,6 +71,7 @@ StorageJoin::StorageJoin(
|
||||
void StorageJoin::truncate(
|
||||
const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder&)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
disk->removeRecursive(path);
|
||||
disk->createDirectories(path);
|
||||
disk->createDirectories(path + "tmp/");
|
||||
@ -78,6 +80,64 @@ void StorageJoin::truncate(
|
||||
join = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
|
||||
}
|
||||
|
||||
void StorageJoin::checkMutationIsPossible(const MutationCommands & commands, const Settings & /* settings */) const
|
||||
{
|
||||
for(const auto& command: commands) {
|
||||
switch (command.type) {
|
||||
case MutationCommand::Type::DELETE:
|
||||
break;
|
||||
case MutationCommand::Type::UPDATE:
|
||||
throw Exception("Table engine Join doesn't support update mutation, please use insert instead", ErrorCodes::NOT_IMPLEMENTED);
|
||||
default:
|
||||
throw Exception("Table engine Join doesn't support this mutation", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
|
||||
{
|
||||
// Only delete is supported
|
||||
std::lock_guard lock(mutex);
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
auto storage = getStorageID();
|
||||
auto storage_ptr = DatabaseCatalog::instance().getTable(storage, context);
|
||||
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, context, true);
|
||||
auto in = interpreter->execute();
|
||||
in->readPrefix();
|
||||
|
||||
auto new_data = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
|
||||
|
||||
const String backup_file_name = "1.bin"; // id starts from 1
|
||||
auto backup_buf = disk->writeFile(path + "tmp/" + backup_file_name);
|
||||
auto compressed_backup_buf = CompressedWriteBuffer(*backup_buf);
|
||||
auto backup_stream = NativeBlockOutputStream(compressed_backup_buf, 0, metadata_snapshot->getSampleBlock());
|
||||
|
||||
while (const Block & block = in->read()) {
|
||||
new_data->addJoinedBlock(block, true);
|
||||
if (persistent)
|
||||
backup_stream.write(block);
|
||||
}
|
||||
|
||||
join = std::move(new_data);
|
||||
increment = 1;
|
||||
|
||||
if (persistent) {
|
||||
backup_stream.flush();
|
||||
compressed_backup_buf.next();
|
||||
backup_buf->next();
|
||||
backup_buf->finalize();
|
||||
|
||||
std::vector<std::string> files;
|
||||
disk->listFiles(path, files);
|
||||
for (const auto& file_name: files) {
|
||||
if (file_name.ends_with(".bin")) {
|
||||
disk->removeFileIfExists(path + file_name);
|
||||
}
|
||||
}
|
||||
|
||||
disk->replaceFile(path + "tmp/" + backup_file_name, path + backup_file_name);
|
||||
}
|
||||
}
|
||||
|
||||
HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join) const
|
||||
{
|
||||
@ -104,8 +164,7 @@ HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join)
|
||||
|
||||
|
||||
void StorageJoin::insertBlock(const Block & block)
|
||||
{
|
||||
std::unique_lock<std::shared_mutex> lock(rwlock);
|
||||
{std::unique_lock<std::shared_mutex> lock(rwlock);
|
||||
join->addJoinedBlock(block, true);
|
||||
}
|
||||
|
||||
|
@ -29,6 +29,9 @@ public:
|
||||
|
||||
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &) override;
|
||||
|
||||
void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;
|
||||
void mutate(const MutationCommands & commands, ContextPtr context) override;
|
||||
|
||||
/// Return instance of HashJoin holding lock that protects from insertions to StorageJoin.
|
||||
/// HashJoin relies on structure of hash table that's why we need to return it with locked mutex.
|
||||
HashJoinPtr getJoinLocked(std::shared_ptr<TableJoin> analyzed_join) const;
|
||||
@ -69,6 +72,8 @@ private:
|
||||
/// Lock is stored in HashJoin instance during query and blocks concurrent insertions.
|
||||
mutable std::shared_mutex rwlock;
|
||||
|
||||
mutable std::mutex mutex;
|
||||
|
||||
void insertBlock(const Block & block) override;
|
||||
void finishInsert() override {}
|
||||
size_t getSize() const override;
|
||||
|
@ -0,0 +1,5 @@
|
||||
100
|
||||
10
|
||||
99
|
||||
m10
|
||||
50
|
23
tests/queries/0_stateless/01821_join_table_mutation.sql
Normal file
23
tests/queries/0_stateless/01821_join_table_mutation.sql
Normal file
@ -0,0 +1,23 @@
|
||||
DROP TABLE IF EXISTS join_table_mutation;
|
||||
|
||||
CREATE TABLE join_table_mutation(id Int32, name String) ENGINE = Join(ANY, LEFT, id);
|
||||
|
||||
INSERT INTO join_table_mutation select number, toString(number) from numbers(100);
|
||||
|
||||
SELECT count(1) FROM join_table_mutation;
|
||||
|
||||
SELECT name FROM join_table_mutation WHERE id = 10;
|
||||
|
||||
ALTER TABLE join_table_mutation DELETE WHERE id = 10;
|
||||
|
||||
SELECT count(1) FROM join_table_mutation;
|
||||
|
||||
SELECT name FROM join_table_mutation WHERE id = 10;
|
||||
|
||||
INSERT INTO join_table_mutation VALUES (10, 'm10');
|
||||
|
||||
SELECT name FROM join_table_mutation WHERE id = 10;
|
||||
|
||||
ALTER TABLE join_table_mutation DELETE WHERE id % 2 = 0;
|
||||
|
||||
SELECT count(1) FROM join_table_mutation;
|
Loading…
Reference in New Issue
Block a user