mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
Merge branch 'master' into fix-pr-max-execution-timeout-leaf
This commit is contained in:
commit
23da4381f9
@ -1236,6 +1236,7 @@ IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block,
|
|||||||
|
|
||||||
void HashJoin::reuseJoinedData(const HashJoin & join)
|
void HashJoin::reuseJoinedData(const HashJoin & join)
|
||||||
{
|
{
|
||||||
|
have_compressed = join.have_compressed;
|
||||||
data = join.data;
|
data = join.data;
|
||||||
from_storage_join = true;
|
from_storage_join = true;
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@ namespace ErrorCodes
|
|||||||
{
|
{
|
||||||
extern const int SUPPORT_IS_DISABLED;
|
extern const int SUPPORT_IS_DISABLED;
|
||||||
extern const int REPLICA_STATUS_CHANGED;
|
extern const int REPLICA_STATUS_CHANGED;
|
||||||
|
extern const int LOGICAL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicatedMergeTree & storage_)
|
ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicatedMergeTree & storage_)
|
||||||
@ -117,6 +118,67 @@ void ReplicatedMergeTreeAttachThread::checkHasReplicaMetadataInZooKeeper(const z
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper)
|
||||||
|
{
|
||||||
|
const String & zookeeper_path = storage.zookeeper_path;
|
||||||
|
const String & replica_path = storage.replica_path;
|
||||||
|
const bool replica_readonly = storage.is_readonly;
|
||||||
|
|
||||||
|
for (size_t i = 0; i != 2; ++i)
|
||||||
|
{
|
||||||
|
String replica_metadata_version_str;
|
||||||
|
const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version_str);
|
||||||
|
if (!replica_metadata_version_exists)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
const Int32 metadata_version = parse<Int32>(replica_metadata_version_str);
|
||||||
|
|
||||||
|
if (metadata_version != 0 || replica_readonly)
|
||||||
|
{
|
||||||
|
/// No need to fix anything
|
||||||
|
return metadata_version;
|
||||||
|
}
|
||||||
|
|
||||||
|
Coordination::Stat stat;
|
||||||
|
zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat);
|
||||||
|
if (stat.version == 0)
|
||||||
|
{
|
||||||
|
/// No need to fix anything
|
||||||
|
return metadata_version;
|
||||||
|
}
|
||||||
|
|
||||||
|
ReplicatedMergeTreeQueue & queue = storage.queue;
|
||||||
|
queue.pullLogsToQueue(zookeeper);
|
||||||
|
if (queue.getStatus().metadata_alters_in_queue != 0)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue");
|
||||||
|
return metadata_version;
|
||||||
|
}
|
||||||
|
|
||||||
|
const Coordination::Requests ops = {
|
||||||
|
zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0),
|
||||||
|
zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version),
|
||||||
|
};
|
||||||
|
Coordination::Responses ops_responses;
|
||||||
|
const auto code = zookeeper->tryMulti(ops, ops_responses);
|
||||||
|
if (code == Coordination::Error::ZOK)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "Successfully set metadata_version to {}", stat.version);
|
||||||
|
return stat.version;
|
||||||
|
}
|
||||||
|
if (code != Coordination::Error::ZBADVERSION)
|
||||||
|
{
|
||||||
|
throw zkutil::KeeperException(code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Second attempt is only possible if metadata_version != 0 or metadata.version changed during the first attempt.
|
||||||
|
/// If metadata_version != 0, on second attempt we will return the new metadata_version.
|
||||||
|
/// If metadata.version changed, on second attempt we will either get metadata_version != 0 and return the new metadata_version or we will get metadata_alters_in_queue != 0 and return 0.
|
||||||
|
/// Either way, on second attempt this method should return.
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to fix replica metadata_version in ZooKeeper after two attempts");
|
||||||
|
}
|
||||||
|
|
||||||
void ReplicatedMergeTreeAttachThread::runImpl()
|
void ReplicatedMergeTreeAttachThread::runImpl()
|
||||||
{
|
{
|
||||||
storage.setZooKeeper();
|
storage.setZooKeeper();
|
||||||
@ -160,11 +222,11 @@ void ReplicatedMergeTreeAttachThread::runImpl()
|
|||||||
/// Just in case it was not removed earlier due to connection loss
|
/// Just in case it was not removed earlier due to connection loss
|
||||||
zookeeper->tryRemove(replica_path + "/flags/force_restore_data");
|
zookeeper->tryRemove(replica_path + "/flags/force_restore_data");
|
||||||
|
|
||||||
String replica_metadata_version;
|
const Int32 replica_metadata_version = fixReplicaMetadataVersionIfNeeded(zookeeper);
|
||||||
const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version);
|
const bool replica_metadata_version_exists = replica_metadata_version != -1;
|
||||||
if (replica_metadata_version_exists)
|
if (replica_metadata_version_exists)
|
||||||
{
|
{
|
||||||
storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(parse<int>(replica_metadata_version)));
|
storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(replica_metadata_version));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -48,6 +48,8 @@ private:
|
|||||||
void runImpl();
|
void runImpl();
|
||||||
|
|
||||||
void finalizeInitialization();
|
void finalizeInitialization();
|
||||||
|
|
||||||
|
Int32 fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper);
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -2222,6 +2222,7 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const
|
|||||||
res.inserts_in_queue = 0;
|
res.inserts_in_queue = 0;
|
||||||
res.merges_in_queue = 0;
|
res.merges_in_queue = 0;
|
||||||
res.part_mutations_in_queue = 0;
|
res.part_mutations_in_queue = 0;
|
||||||
|
res.metadata_alters_in_queue = 0;
|
||||||
res.queue_oldest_time = 0;
|
res.queue_oldest_time = 0;
|
||||||
res.inserts_oldest_time = 0;
|
res.inserts_oldest_time = 0;
|
||||||
res.merges_oldest_time = 0;
|
res.merges_oldest_time = 0;
|
||||||
@ -2264,6 +2265,11 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const
|
|||||||
res.oldest_part_to_mutate_to = entry->new_part_name;
|
res.oldest_part_to_mutate_to = entry->new_part_name;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (entry->type == LogEntry::ALTER_METADATA)
|
||||||
|
{
|
||||||
|
++res.metadata_alters_in_queue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
|
@ -473,6 +473,7 @@ public:
|
|||||||
UInt32 inserts_in_queue;
|
UInt32 inserts_in_queue;
|
||||||
UInt32 merges_in_queue;
|
UInt32 merges_in_queue;
|
||||||
UInt32 part_mutations_in_queue;
|
UInt32 part_mutations_in_queue;
|
||||||
|
UInt32 metadata_alters_in_queue;
|
||||||
UInt32 queue_oldest_time;
|
UInt32 queue_oldest_time;
|
||||||
UInt32 inserts_oldest_time;
|
UInt32 inserts_oldest_time;
|
||||||
UInt32 merges_oldest_time;
|
UInt32 merges_oldest_time;
|
||||||
|
@ -277,7 +277,7 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
|||||||
|
|
||||||
if (has_valid_arguments)
|
if (has_valid_arguments)
|
||||||
{
|
{
|
||||||
if (is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 0)
|
if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 0)
|
||||||
{
|
{
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||||
"It's not allowed to specify explicit zookeeper_path and replica_name "
|
"It's not allowed to specify explicit zookeeper_path and replica_name "
|
||||||
@ -285,7 +285,7 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
|||||||
"specify them explicitly, enable setting "
|
"specify them explicitly, enable setting "
|
||||||
"database_replicated_allow_replicated_engine_arguments.");
|
"database_replicated_allow_replicated_engine_arguments.");
|
||||||
}
|
}
|
||||||
else if (is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 1)
|
else if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 1)
|
||||||
{
|
{
|
||||||
LOG_WARNING(&Poco::Logger::get("registerStorageMergeTree"), "It's not recommended to explicitly specify "
|
LOG_WARNING(&Poco::Logger::get("registerStorageMergeTree"), "It's not recommended to explicitly specify "
|
||||||
"zookeeper_path and replica_name in ReplicatedMergeTree arguments");
|
"zookeeper_path and replica_name in ReplicatedMergeTree arguments");
|
||||||
@ -305,7 +305,7 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
|||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name must be a string literal{}", verbose_help_message);
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name must be a string literal{}", verbose_help_message);
|
||||||
|
|
||||||
|
|
||||||
if (is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 2)
|
if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 2)
|
||||||
{
|
{
|
||||||
LOG_WARNING(&Poco::Logger::get("registerStorageMergeTree"), "Replacing user-provided ZooKeeper path and replica name ({}, {}) "
|
LOG_WARNING(&Poco::Logger::get("registerStorageMergeTree"), "Replacing user-provided ZooKeeper path and replica name ({}, {}) "
|
||||||
"with default arguments", zookeeper_path, replica_name);
|
"with default arguments", zookeeper_path, replica_name);
|
||||||
|
@ -1549,3 +1549,19 @@ def test_all_groups_cluster(started_cluster):
|
|||||||
assert "bad_settings_node\ndummy_node\n" == bad_settings_node.query(
|
assert "bad_settings_node\ndummy_node\n" == bad_settings_node.query(
|
||||||
"select host_name from system.clusters where name='all_groups.db_cluster' order by host_name"
|
"select host_name from system.clusters where name='all_groups.db_cluster' order by host_name"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_detach_attach_table(started_cluster):
|
||||||
|
main_node.query("DROP DATABASE IF EXISTS detach_attach_db SYNC")
|
||||||
|
main_node.query(
|
||||||
|
"CREATE DATABASE detach_attach_db ENGINE = Replicated('/clickhouse/databases/detach_attach_db');"
|
||||||
|
)
|
||||||
|
main_node.query(
|
||||||
|
"CREATE TABLE detach_attach_db.detach_attach_table (k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k;"
|
||||||
|
)
|
||||||
|
main_node.query("INSERT INTO detach_attach_db.detach_attach_table VALUES (1);")
|
||||||
|
main_node.query("DETACH TABLE detach_attach_db.detach_attach_table PERMANENTLY;")
|
||||||
|
main_node.query("ATTACH TABLE detach_attach_db.detach_attach_table;")
|
||||||
|
assert (
|
||||||
|
main_node.query("SELECT * FROM detach_attach_db.detach_attach_table;") == "1\n"
|
||||||
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user