This commit is contained in:
Alexander Tokmakov 2021-07-02 19:39:55 +03:00
parent 1b2416007e
commit aa3f0b2032
17 changed files with 2496 additions and 2333 deletions

View File

@ -2677,6 +2677,13 @@ ZooKeeperMetadataTransactionPtr Context::getZooKeeperMetadataTransaction() const
return metadata_transaction;
}
void Context::resetZooKeeperMetadataTransaction()
{
assert(metadata_transaction);
assert(hasQueryContext());
metadata_transaction = nullptr;
}
PartUUIDsPtr Context::getPartUUIDs() const
{
auto lock = getLock();

View File

@ -782,6 +782,8 @@ public:
void initZooKeeperMetadataTransaction(ZooKeeperMetadataTransactionPtr txn, bool attach_existing = false);
/// Returns context of current distributed DDL query or nullptr.
ZooKeeperMetadataTransactionPtr getZooKeeperMetadataTransaction() const;
/// Removes context of current distributed DDL.
void resetZooKeeperMetadataTransaction();
struct MySQLWireContext
{

View File

@ -22,6 +22,7 @@ namespace ErrorCodes
extern const int UNKNOWN_FORMAT_VERSION;
extern const int UNKNOWN_TYPE_OF_QUERY;
extern const int INCONSISTENT_CLUSTER_DEFINITION;
extern const int LOGICAL_ERROR;
}
HostID HostID::fromString(const String & host_port_str)
@ -401,7 +402,8 @@ UInt32 DDLTaskBase::getLogEntryNumber(const String & log_entry_name)
void ZooKeeperMetadataTransaction::commit()
{
assert(state == CREATED);
if (state != CREATED)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect state ({}), it's a bug", state);
state = FAILED;
current_zookeeper->multi(ops);
state = COMMITTED;

View File

@ -20,6 +20,11 @@ namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class ASTQueryWithOnCluster;
using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
using ClusterPtr = std::shared_ptr<Cluster>;
@ -180,15 +185,19 @@ public:
String getDatabaseZooKeeperPath() const { return zookeeper_path; }
ZooKeeperPtr getZooKeeper() const { return current_zookeeper; }
void addOp(Coordination::RequestPtr && op)
{
assert(!isExecuted());
if (isExecuted())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add ZooKeeper operation because query is executed. It's a bug.");
ops.emplace_back(op);
}
void moveOpsTo(Coordination::Requests & other_ops)
{
assert(!isExecuted());
if (isExecuted())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add ZooKeeper operation because query is executed. It's a bug.");
std::move(ops.begin(), ops.end(), std::back_inserter(other_ops));
ops.clear();
state = COMMITTED;

View File

@ -34,6 +34,7 @@
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
@ -86,7 +87,6 @@ namespace ErrorCodes
extern const int UNKNOWN_DATABASE;
extern const int PATH_ACCESS_DENIED;
extern const int NOT_IMPLEMENTED;
extern const int UNKNOWN_TABLE;
}
namespace fs = std::filesystem;
@ -1075,6 +1075,29 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
const InterpreterCreateQuery::TableProperties & properties)
{
/// Replicated database requires separate contexts for each DDL query
ContextPtr current_context = getContext();
ContextMutablePtr create_context = Context::createCopy(current_context);
create_context->setQueryContext(std::const_pointer_cast<Context>(current_context));
auto make_drop_context = [&](bool on_error) -> ContextMutablePtr
{
ContextMutablePtr drop_context = Context::createCopy(current_context);
drop_context->makeQueryContext();
if (on_error)
return drop_context;
if (auto txn = current_context->getZooKeeperMetadataTransaction())
{
/// Execute drop as separate query, because [CREATE OR] REPLACE query can be considered as
/// successfully executed after RENAME/EXCHANGE query.
drop_context->resetZooKeeperMetadataTransaction();
auto drop_txn = std::make_shared<ZooKeeperMetadataTransaction>(txn->getZooKeeper(), txn->getDatabaseZooKeeperPath(), txn->isInitialQuery());
drop_context->initZooKeeperMetadataTransaction(drop_txn);
}
return drop_context;
};
auto ast_drop = std::make_shared<ASTDropQuery>();
String table_to_replace_name = create.table;
@ -1091,6 +1114,11 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
create.table = fmt::format("_tmp_replace_{}_{}",
getHexUIntLowercase(name_hash),
getHexUIntLowercase(random_suffix));
ast_drop->table = create.table;
ast_drop->is_dictionary = create.is_dictionary;
ast_drop->database = create.database;
ast_drop->kind = ASTDropQuery::Drop;
}
bool created = false;
@ -1098,12 +1126,8 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
try
{
/// Create temporary table (random name will be generated)
[[maybe_unused]] bool done = doCreateTable(create, properties);
[[maybe_unused]] bool done = InterpreterCreateQuery(query_ptr, create_context).doCreateTable(create, properties);
assert(done);
ast_drop->table = create.table;
ast_drop->is_dictionary = create.is_dictionary;
ast_drop->database = create.database;
ast_drop->kind = ASTDropQuery::Drop;
created = true;
/// Try fill temporary table
@ -1124,14 +1148,15 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
/// Will execute ordinary RENAME instead of EXCHANGE if the target table does not exist
ast_rename->rename_if_cannot_exchange = create.create_or_replace;
InterpreterRenameQuery interpreter_rename{ast_rename, getContext()};
InterpreterRenameQuery interpreter_rename{ast_rename, current_context};
interpreter_rename.execute();
renamed = true;
if (!interpreter_rename.renamedInsteadOfExchange())
{
/// Target table was replaced with new one, drop old table
InterpreterDropQuery(ast_drop, getContext()).execute();
auto drop_context = make_drop_context(false);
InterpreterDropQuery(ast_drop, drop_context).execute();
}
create.table = table_to_replace_name;
@ -1142,7 +1167,10 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
{
/// Drop temporary table if it was successfully created, but was not renamed to target name
if (created && !renamed)
InterpreterDropQuery(ast_drop, getContext()).execute();
{
auto drop_context = make_drop_context(true);
InterpreterDropQuery(ast_drop, drop_context).execute();
}
throw;
}
}

View File

@ -226,11 +226,17 @@ CreateDictionaryQuery::CreateDictionaryQuery(
PtrTo<ClusterClause> cluster,
bool attach_,
bool if_not_exists_,
bool create_or_replace_,
bool replace_,
PtrTo<TableIdentifier> identifier,
PtrTo<UUIDClause> uuid,
PtrTo<DictionarySchemaClause> schema,
PtrTo<DictionaryEngineClause> engine)
: DDLQuery(cluster, {identifier, uuid, schema, engine}), attach(attach_), if_not_exists(if_not_exists_)
: DDLQuery(cluster, {identifier, uuid, schema, engine})
, attach(attach_)
, if_not_exists(if_not_exists_)
, create_or_replace(create_or_replace_)
, replace(replace_)
{
}
@ -251,6 +257,8 @@ ASTPtr CreateDictionaryQuery::convertToOld() const
query->is_dictionary = true;
query->attach = attach;
query->if_not_exists = if_not_exists;
query->create_or_replace = create_or_replace;
query->replace_table = replace;
query->set(query->dictionary_attributes_list, get(SCHEMA)->convertToOld());
query->set(query->dictionary, get(ENGINE)->convertToOld());
@ -272,7 +280,7 @@ antlrcpp::Any ParseTreeVisitor::visitCreateDictionaryStmt(ClickHouseParser::Crea
auto schema = ctx->dictionarySchemaClause() ? visit(ctx->dictionarySchemaClause()).as<PtrTo<DictionarySchemaClause>>() : nullptr;
auto engine = ctx->dictionaryEngineClause() ? visit(ctx->dictionaryEngineClause()).as<PtrTo<DictionaryEngineClause>>() : nullptr;
return std::make_shared<CreateDictionaryQuery>(
cluster, !!ctx->ATTACH(), !!ctx->IF(), visit(ctx->tableIdentifier()), uuid, schema, engine);
cluster, !!ctx->ATTACH(), !!ctx->IF(), !!ctx->OR(), !!ctx->REPLACE(), visit(ctx->tableIdentifier()), uuid, schema, engine);
}
antlrcpp::Any ParseTreeVisitor::visitDictionaryArgExpr(ClickHouseParser::DictionaryArgExprContext *ctx)

View File

@ -161,6 +161,8 @@ class CreateDictionaryQuery : public DDLQuery
PtrTo<ClusterClause> cluster,
bool attach,
bool if_not_exists,
bool create_or_replace,
bool replace,
PtrTo<TableIdentifier> identifier,
PtrTo<UUIDClause> uuid,
PtrTo<DictionarySchemaClause> schema,
@ -177,7 +179,10 @@ class CreateDictionaryQuery : public DDLQuery
ENGINE, // DictionaryEngineClause
};
const bool attach, if_not_exists;
const bool attach;
const bool if_not_exists;
const bool create_or_replace;
const bool replace;
};
}

View File

@ -99,12 +99,19 @@ CreateTableQuery::CreateTableQuery(
bool attach_,
bool temporary_,
bool if_not_exists_,
bool create_or_replace_,
bool replace_,
PtrTo<TableIdentifier> identifier,
PtrTo<UUIDClause> uuid,
PtrTo<TableSchemaClause> schema,
PtrTo<EngineClause> engine,
PtrTo<SelectUnionQuery> query)
: DDLQuery(cluster, {identifier, uuid, schema, engine, query}), attach(attach_), temporary(temporary_), if_not_exists(if_not_exists_)
: DDLQuery(cluster, {identifier, uuid, schema, engine, query})
, attach(attach_)
, temporary(temporary_)
, if_not_exists(if_not_exists_)
, create_or_replace(create_or_replace_)
, replace(replace_)
{
}
@ -125,6 +132,8 @@ ASTPtr CreateTableQuery::convertToOld() const
query->attach = attach;
query->if_not_exists = if_not_exists;
query->temporary = temporary;
query->create_or_replace = create_or_replace;
query->replace_table = replace;
if (has(SCHEMA))
{
@ -164,6 +173,10 @@ String CreateTableQuery::dumpInfo() const
else info += "temporary=false, ";
if (if_not_exists) info += "if_not_exists=true";
else info += "if_not_exists=false";
if (create_or_replace) info += "create_or_replace=true";
else info += "create_or_replace=false";
if (replace) info += "replace=true";
else info += "replace=false";
return info;
}
@ -191,7 +204,7 @@ antlrcpp::Any ParseTreeVisitor::visitCreateTableStmt(ClickHouseParser::CreateTab
auto engine = ctx->engineClause() ? visit(ctx->engineClause()).as<PtrTo<EngineClause>>() : nullptr;
auto query = ctx->subqueryClause() ? visit(ctx->subqueryClause()).as<PtrTo<SelectUnionQuery>>() : nullptr;
return std::make_shared<CreateTableQuery>(
cluster, !!ctx->ATTACH(), !!ctx->TEMPORARY(), !!ctx->IF(), visit(ctx->tableIdentifier()), uuid, schema, engine, query);
cluster, !!ctx->ATTACH(), !!ctx->TEMPORARY(), !!ctx->IF(), !!ctx->OR(), !!ctx->REPLACE(), visit(ctx->tableIdentifier()), uuid, schema, engine, query);
}
antlrcpp::Any ParseTreeVisitor::visitSchemaDescriptionClause(ClickHouseParser::SchemaDescriptionClauseContext *ctx)

View File

@ -50,6 +50,8 @@ class CreateTableQuery : public DDLQuery
bool attach,
bool temporary,
bool if_not_exists,
bool create_or_replace,
bool replace,
PtrTo<TableIdentifier> identifier,
PtrTo<UUIDClause> uuid,
PtrTo<TableSchemaClause> schema,
@ -68,7 +70,11 @@ class CreateTableQuery : public DDLQuery
SUBQUERY, // SelectUnionQuery
};
const bool attach, temporary, if_not_exists;
const bool attach;
const bool temporary;
const bool if_not_exists;
const bool create_or_replace;
const bool replace;
String dumpInfo() const override;
};

File diff suppressed because it is too large Load Diff

View File

@ -91,10 +91,10 @@ checkStmt: CHECK TABLE tableIdentifier partitionClause?;
createStmt
: (ATTACH | CREATE) DATABASE (IF NOT EXISTS)? databaseIdentifier clusterClause? engineExpr? # CreateDatabaseStmt
| (ATTACH | CREATE) DICTIONARY (IF NOT EXISTS)? tableIdentifier uuidClause? clusterClause? dictionarySchemaClause dictionaryEngineClause # CreateDictionaryStmt
| (ATTACH | CREATE (OR REPLACE)? | REPLACE) DICTIONARY (IF NOT EXISTS)? tableIdentifier uuidClause? clusterClause? dictionarySchemaClause dictionaryEngineClause # CreateDictionaryStmt
| (ATTACH | CREATE) LIVE VIEW (IF NOT EXISTS)? tableIdentifier uuidClause? clusterClause? (WITH TIMEOUT DECIMAL_LITERAL?)? destinationClause? tableSchemaClause? subqueryClause # CreateLiveViewStmt
| (ATTACH | CREATE) MATERIALIZED VIEW (IF NOT EXISTS)? tableIdentifier uuidClause? clusterClause? tableSchemaClause? (destinationClause | engineClause POPULATE?) subqueryClause # CreateMaterializedViewStmt
| (ATTACH | CREATE) TEMPORARY? TABLE (IF NOT EXISTS)? tableIdentifier uuidClause? clusterClause? tableSchemaClause? engineClause? subqueryClause? # CreateTableStmt
| (ATTACH | CREATE (OR REPLACE)? | REPLACE) TEMPORARY? TABLE (IF NOT EXISTS)? tableIdentifier uuidClause? clusterClause? tableSchemaClause? engineClause? subqueryClause? # CreateTableStmt
| (ATTACH | CREATE) (OR REPLACE)? VIEW (IF NOT EXISTS)? tableIdentifier uuidClause? clusterClause? tableSchemaClause? subqueryClause # CreateViewStmt
;

View File

@ -793,11 +793,13 @@ public:
DictionaryEngineClauseContext *dictionaryEngineClause();
antlr4::tree::TerminalNode *ATTACH();
antlr4::tree::TerminalNode *CREATE();
antlr4::tree::TerminalNode *REPLACE();
antlr4::tree::TerminalNode *IF();
antlr4::tree::TerminalNode *NOT();
antlr4::tree::TerminalNode *EXISTS();
UuidClauseContext *uuidClause();
ClusterClauseContext *clusterClause();
antlr4::tree::TerminalNode *OR();
virtual antlrcpp::Any accept(antlr4::tree::ParseTreeVisitor *visitor) override;
};
@ -870,6 +872,7 @@ public:
TableIdentifierContext *tableIdentifier();
antlr4::tree::TerminalNode *ATTACH();
antlr4::tree::TerminalNode *CREATE();
antlr4::tree::TerminalNode *REPLACE();
antlr4::tree::TerminalNode *TEMPORARY();
antlr4::tree::TerminalNode *IF();
antlr4::tree::TerminalNode *NOT();
@ -879,6 +882,7 @@ public:
TableSchemaClauseContext *tableSchemaClause();
EngineClauseContext *engineClause();
SubqueryClauseContext *subqueryClause();
antlr4::tree::TerminalNode *OR();
virtual antlrcpp::Any accept(antlr4::tree::ParseTreeVisitor *visitor) override;
};

View File

@ -8,14 +8,17 @@ create table t (n UInt64, s String default 's' || toString(n)) engine=Memory;
create table dist (n int) engine=Distributed(test_shard_localhost, currentDatabase(), t);
create table buf (n int) engine=Buffer(currentDatabase(), dist, 1, 10, 100, 10, 100, 1000, 1000);
system stop distributed sends dist;
insert into buf values (1);
replace table buf (n int) engine=Distributed(test_shard_localhost, currentDatabase(), dist);
replace table dist (n int) engine=Buffer(currentDatabase(), t, 1, 10, 100, 10, 100, 1000, 1000);
system stop distributed sends buf;
insert into buf values (2);
replace table buf (n int) engine=Buffer(currentDatabase(), dist, 1, 10, 100, 10, 100, 1000, 1000);
replace table dist (n int) engine=Distributed(test_shard_localhost, currentDatabase(), t);
system stop distributed sends dist;
insert into buf values (3);
replace table buf (n int) engine=Null;
replace table dist (n int) engine=Null;

View File

@ -1,8 +1,8 @@
t1
CREATE TABLE test_01185.t1\n(\n `n` UInt64,\n `s` String\n)\nENGINE = MergeTree\nORDER BY n\nSETTINGS index_granularity = 8192
CREATE TABLE default.t1\n(\n `n` UInt64,\n `s` String\n)\nENGINE = MergeTree\nORDER BY n\nSETTINGS index_granularity = 8192
t1
CREATE TABLE test_01185.t1\n(\n `n` UInt64,\n `s` Nullable(String)\n)\nENGINE = MergeTree\nORDER BY n\nSETTINGS index_granularity = 8192
CREATE TABLE default.t1\n(\n `n` UInt64,\n `s` Nullable(String)\n)\nENGINE = MergeTree\nORDER BY n\nSETTINGS index_granularity = 8192
2 \N
t1
CREATE TABLE test_01185.t1\n(\n `n` UInt64\n)\nENGINE = MergeTree\nORDER BY n\nSETTINGS index_granularity = 8192
CREATE TABLE default.t1\n(\n `n` UInt64\n)\nENGINE = MergeTree\nORDER BY n\nSETTINGS index_granularity = 8192
3

View File

@ -1,23 +1,22 @@
drop database if exists test_01185;
create database test_01185 engine=Atomic;
drop table if exists t1;
replace table test_01185.t1 (n UInt64, s String) engine=MergeTree order by n; -- { serverError 60 }
show tables from test_01185;
create or replace table test_01185.t1 (n UInt64, s String) engine=MergeTree order by n;
show tables from test_01185;
show create table test_01185.t1;
replace table t1 (n UInt64, s String) engine=MergeTree order by n; -- { serverError 60 }
show tables;
create or replace table t1 (n UInt64, s String) engine=MergeTree order by n;
show tables;
show create table t1;
insert into test_01185.t1 values (1, 'test');
create or replace table test_01185.t1 (n UInt64, s Nullable(String)) engine=MergeTree order by n;
insert into test_01185.t1 values (2, null);
show tables from test_01185;
show create table test_01185.t1;
select * from test_01185.t1;
insert into t1 values (1, 'test');
create or replace table t1 (n UInt64, s Nullable(String)) engine=MergeTree order by n;
insert into t1 values (2, null);
show tables;
show create table t1;
select * from t1;
replace table test_01185.t1 (n UInt64) engine=MergeTree order by n;
insert into test_01185.t1 values (3);
show tables from test_01185;
show create table test_01185.t1;
select * from test_01185.t1;
replace table t1 (n UInt64) engine=MergeTree order by n;
insert into t1 values (3);
show tables;
show create table t1;
select * from t1;
drop database test_01185;
drop table t1;

View File

@ -1,51 +1,51 @@
DROP DATABASE IF EXISTS 01915_db;
CREATE DATABASE 01915_db ENGINE=Atomic;
DROP DATABASE IF EXISTS test_01915_db;
CREATE DATABASE test_01915_db ENGINE=Atomic;
DROP TABLE IF EXISTS 01915_db.test_source_table_1;
CREATE TABLE 01915_db.test_source_table_1
DROP TABLE IF EXISTS test_01915_db.test_source_table_1;
CREATE TABLE test_01915_db.test_source_table_1
(
id UInt64,
value String
) ENGINE=TinyLog;
INSERT INTO 01915_db.test_source_table_1 VALUES (0, 'Value0');
INSERT INTO test_01915_db.test_source_table_1 VALUES (0, 'Value0');
DROP DICTIONARY IF EXISTS 01915_db.test_dictionary;
CREATE OR REPLACE DICTIONARY 01915_db.test_dictionary
DROP DICTIONARY IF EXISTS test_01915_db.test_dictionary;
CREATE OR REPLACE DICTIONARY test_01915_db.test_dictionary
(
id UInt64,
value String
)
PRIMARY KEY id
LAYOUT(DIRECT())
SOURCE(CLICKHOUSE(DB '01915_db' TABLE 'test_source_table_1'));
SOURCE(CLICKHOUSE(DB 'test_01915_db' TABLE 'test_source_table_1'));
SELECT * FROM 01915_db.test_dictionary;
SELECT * FROM test_01915_db.test_dictionary;
DROP TABLE IF EXISTS 01915_db.test_source_table_2;
CREATE TABLE 01915_db.test_source_table_2
DROP TABLE IF EXISTS test_01915_db.test_source_table_2;
CREATE TABLE test_01915_db.test_source_table_2
(
id UInt64,
value_1 String
) ENGINE=TinyLog;
INSERT INTO 01915_db.test_source_table_2 VALUES (0, 'Value1');
INSERT INTO test_01915_db.test_source_table_2 VALUES (0, 'Value1');
CREATE OR REPLACE DICTIONARY 01915_db.test_dictionary
CREATE OR REPLACE DICTIONARY test_01915_db.test_dictionary
(
id UInt64,
value_1 String
)
PRIMARY KEY id
LAYOUT(HASHED())
SOURCE(CLICKHOUSE(DB '01915_db' TABLE 'test_source_table_2'))
SOURCE(CLICKHOUSE(DB 'test_01915_db' TABLE 'test_source_table_2'))
LIFETIME(0);
SELECT * FROM 01915_db.test_dictionary;
SELECT * FROM test_01915_db.test_dictionary;
DROP DICTIONARY 01915_db.test_dictionary;
DROP DICTIONARY test_01915_db.test_dictionary;
DROP TABLE 01915_db.test_source_table_1;
DROP TABLE 01915_db.test_source_table_2;
DROP TABLE test_01915_db.test_source_table_1;
DROP TABLE test_01915_db.test_source_table_2;
DROP DATABASE 01915_db;
DROP DATABASE test_01915_db;

View File

@ -110,6 +110,7 @@
"00738_lock_for_inner_table",
"01153_attach_mv_uuid",
"01157_replace_table",
"01185_create_or_replace_table",
/// Sometimes cannot lock file most likely due to concurrent or adjacent tests, but we don't care how it works in Ordinary database.
"rocksdb",
"01914_exchange_dictionaries" /// Requires Atomic database
@ -519,7 +520,6 @@
"01924_argmax_bitmap_state",
"01913_replace_dictionary",
"01914_exchange_dictionaries",
"01915_create_or_replace_dictionary",
"01913_names_of_tuple_literal",
"01925_merge_prewhere_table"
],