mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 01:51:59 +00:00
some unrelated fixes
This commit is contained in:
parent
6e45ed0534
commit
16647fe8ce
@ -4897,7 +4897,7 @@ void MergeTreeData::removeQueryId(const String & query_id) const
|
||||
{
|
||||
std::lock_guard lock(query_id_set_mutex);
|
||||
if (query_id_set.find(query_id) == query_id_set.end())
|
||||
LOG_WARNING(log, "We have query_id removed but it's not recorded. This is a bug");
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "We have query_id removed but it's not recorded. This is a bug");
|
||||
else
|
||||
query_id_set.erase(query_id);
|
||||
}
|
||||
@ -5077,7 +5077,10 @@ CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger()
|
||||
for (const auto & part : submerging_parts)
|
||||
{
|
||||
if (!storage.currently_submerging_big_parts.count(part))
|
||||
LOG_WARNING(log, "currently_submerging_big_parts doesn't contain part {} to erase. This is a bug", part->name);
|
||||
{
|
||||
LOG_ERROR(log, "currently_submerging_big_parts doesn't contain part {} to erase. This is a bug", part->name);
|
||||
assert(false);
|
||||
}
|
||||
else
|
||||
storage.currently_submerging_big_parts.erase(part);
|
||||
}
|
||||
|
@ -71,6 +71,13 @@ struct MergeTreePartInfo
|
||||
|| max_block < rhs.min_block;
|
||||
}
|
||||
|
||||
bool isFakeDropRangePart() const
|
||||
{
|
||||
/// Another max level was previously used for REPLACE/MOVE PARTITION
|
||||
auto another_max_level = std::numeric_limits<decltype(level)>::max();
|
||||
return level == MergeTreePartInfo::MAX_LEVEL || level == another_max_level;
|
||||
}
|
||||
|
||||
String getPartName() const;
|
||||
String getPartNameV0(DayNum left_date, DayNum right_date) const;
|
||||
UInt64 getBlocksCount() const
|
||||
|
@ -18,6 +18,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int TABLE_DIFFERS_TOO_MUCH;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
static const auto PART_CHECK_ERROR_SLEEP_MS = 5 * 1000;
|
||||
@ -190,7 +191,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPartAndFetchIfPossible(
|
||||
if (missing_part_search_result == MissingPartSearchResult::LostForever)
|
||||
{
|
||||
/// Is it in the replication queue? If there is - delete, because the task can not be processed.
|
||||
if (!storage.queue.remove(zookeeper, part_name))
|
||||
if (!storage.queue.markPartAsLostForever(zookeeper, part_name))
|
||||
{
|
||||
/// The part was not in our queue.
|
||||
LOG_WARNING(log, "Missing part {} is not in our queue, this can happen rarely.", part_name);
|
||||
@ -367,8 +368,8 @@ void ReplicatedMergeTreePartCheckThread::run()
|
||||
{
|
||||
if (!parts_set.empty())
|
||||
{
|
||||
LOG_ERROR(log, "Non-empty parts_set with empty parts_queue. This is a bug.");
|
||||
parts_set.clear();
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Non-empty parts_set with empty parts_queue. This is a bug.");
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -401,7 +402,7 @@ void ReplicatedMergeTreePartCheckThread::run()
|
||||
|
||||
if (parts_queue.empty())
|
||||
{
|
||||
LOG_ERROR(log, "Someone erased checking part from parts_queue. This is a bug.");
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Someone erased checking part from parts_queue. This is a bug.");
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -174,6 +174,9 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
|
||||
storage.partial_shutdown_called = false;
|
||||
storage.partial_shutdown_event.reset();
|
||||
|
||||
/// Start queue processing
|
||||
storage.background_executor.start();
|
||||
|
||||
storage.queue_updating_task->activateAndSchedule();
|
||||
storage.mutations_updating_task->activateAndSchedule();
|
||||
storage.mutations_finalizing_task->activateAndSchedule();
|
||||
@ -227,7 +230,7 @@ void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
|
||||
{
|
||||
LOG_DEBUG(log, "Found part {} with failed quorum. Moving to detached. This shouldn't happen often.", part_name);
|
||||
storage.forgetPartAndMoveToDetached(part, "noquorum");
|
||||
storage.queue.removeFromVirtualParts(part->info);
|
||||
storage.queue.removeFailedQuorumPart(part->info);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -352,6 +355,9 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
|
||||
storage.cleanup_thread.stop();
|
||||
storage.part_check_thread.stop();
|
||||
|
||||
/// Stop queue processing
|
||||
storage.background_executor.finish();
|
||||
|
||||
LOG_TRACE(log, "Threads finished");
|
||||
}
|
||||
|
||||
|
@ -1465,6 +1465,12 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
|
||||
return true;
|
||||
}
|
||||
|
||||
if (entry.type == LogEntry::PART_IS_LOST)
|
||||
{
|
||||
queue.executePartIsLost(getZooKeeper(), entry);
|
||||
return true;
|
||||
}
|
||||
|
||||
const bool is_get_or_attach = entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART;
|
||||
|
||||
if (is_get_or_attach || entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::MUTATE_PART)
|
||||
@ -2043,7 +2049,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
|
||||
if (code == Coordination::Error::ZOK)
|
||||
{
|
||||
LOG_DEBUG(log, "Marked quorum for part {} as failed.", entry.new_part_name);
|
||||
queue.removeFromVirtualParts(part_info);
|
||||
queue.removeFailedQuorumPart(part_info);
|
||||
return true;
|
||||
}
|
||||
else if (code == Coordination::Error::ZBADVERSION || code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
|
||||
@ -2055,7 +2061,10 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_WARNING(log, "No active replica has part {}, but that part needs quorum and /quorum/status contains entry about another part {}. It means that part was successfully written to {} replicas, but then all of them goes offline. Or it is a bug.", entry.new_part_name, quorum_entry.part_name, entry.quorum);
|
||||
LOG_WARNING(log, "No active replica has part {}, "
|
||||
"but that part needs quorum and /quorum/status contains entry about another part {}. "
|
||||
"It means that part was successfully written to {} replicas, but then all of them goes offline. "
|
||||
"Or it is a bug.", entry.new_part_name, quorum_entry.part_name, entry.quorum);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2743,7 +2752,6 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
|
||||
{
|
||||
if (active_parts_set.getContainingPart(part).empty())
|
||||
{
|
||||
queue.remove(zookeeper, part);
|
||||
parts_to_remove_from_zk.emplace_back(part);
|
||||
LOG_WARNING(log, "Source replica does not have part {}. Removing it from ZooKeeper.", part);
|
||||
}
|
||||
@ -2988,6 +2996,7 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke
|
||||
|
||||
/// Clear obsolete queue that we no longer need.
|
||||
zookeeper->removeChildren(replica_path + "/queue");
|
||||
queue.clear();
|
||||
|
||||
/// Will do repair from the selected replica.
|
||||
cloneReplica(source_replica, source_is_lost_stat, zookeeper);
|
||||
|
@ -37,12 +37,17 @@ def start_cluster():
|
||||
def test_inconsistent_parts_if_drop_while_replica_not_active(start_cluster):
|
||||
with PartitionManager() as pm:
|
||||
# insert into all replicas
|
||||
for i in range(50):
|
||||
for i in range(10):
|
||||
node1.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(i))
|
||||
assert_eq_with_retry(node2, "SELECT count(*) FROM test_table", node1.query("SELECT count(*) FROM test_table"))
|
||||
|
||||
# disable network on the first replica
|
||||
# partition the first replica from the second one and (later) from zk
|
||||
pm.partition_instances(node1, node2)
|
||||
|
||||
# insert some parts on the second replica only, we will drop these parts
|
||||
for i in range(10):
|
||||
node2.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(10 + i))
|
||||
|
||||
pm.drop_instance_zk_connections(node1)
|
||||
|
||||
# drop all parts on the second replica
|
||||
@ -51,9 +56,14 @@ def test_inconsistent_parts_if_drop_while_replica_not_active(start_cluster):
|
||||
|
||||
# insert into the second replica
|
||||
# DROP_RANGE will be removed from the replication log and the first replica will be lost
|
||||
for i in range(50):
|
||||
node2.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(50 + i))
|
||||
for i in range(20):
|
||||
node2.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(20 + i))
|
||||
|
||||
# the first replica will be cloned from the second
|
||||
pm.heal_all()
|
||||
assert_eq_with_retry(node1, "SELECT count(*) FROM test_table", node2.query("SELECT count(*) FROM test_table"))
|
||||
# ensure replica was cloned
|
||||
assert node1.contains_in_log("Will mimic node2")
|
||||
# queue must be empty (except some merges that are possibly executing right now)
|
||||
assert node1.query("SELECT count() FROM system.replication_queue WHERE type != 'MERGE_PARTS'") == "0\n"
|
||||
assert node2.query("SELECT count() FROM system.replication_queue WHERE type != 'MERGE_PARTS'") == "0\n"
|
||||
|
@ -12,7 +12,7 @@ DATA_SIZE=200
|
||||
SEQ=$(seq 0 $(($NUM_REPLICAS - 1)))
|
||||
|
||||
for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "DROP TABLE IF EXISTS r$REPLICA"; done
|
||||
for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE r$REPLICA (x UInt64) ENGINE = ReplicatedMergeTree('/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table', 'r$REPLICA') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';"; done
|
||||
for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE r$REPLICA (x UInt64) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table', 'r$REPLICA') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';"; done
|
||||
|
||||
function thread()
|
||||
{
|
||||
|
@ -11,7 +11,7 @@ $CLICKHOUSE_CLIENT --query "CREATE DATABASE test_01320 ENGINE=Ordinary" # Diff
|
||||
|
||||
function thread1()
|
||||
{
|
||||
while true; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE test_01320.r (x UInt64) ENGINE = ReplicatedMergeTree('/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table', 'r') ORDER BY x; DROP TABLE test_01320.r;"; done
|
||||
while true; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE test_01320.r (x UInt64) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table', 'r') ORDER BY x; DROP TABLE test_01320.r;"; done
|
||||
}
|
||||
|
||||
function thread2()
|
||||
|
@ -11,7 +11,7 @@ FREEZE_OUT_STRUCTURE='backup_name String, backup_path String , part_backup_path
|
||||
# setup
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS table_for_freeze_replicated;"
|
||||
${CLICKHOUSE_CLIENT} --query "CREATE TABLE table_for_freeze_replicated (key UInt64, value String) ENGINE = ReplicatedMergeTree('/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table_for_freeze_replicated', '1') ORDER BY key PARTITION BY key % 10;"
|
||||
${CLICKHOUSE_CLIENT} --query "CREATE TABLE table_for_freeze_replicated (key UInt64, value String) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table_for_freeze_replicated', '1') ORDER BY key PARTITION BY key % 10;"
|
||||
${CLICKHOUSE_CLIENT} --query "INSERT INTO table_for_freeze_replicated SELECT number, toString(number) from numbers(10);"
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query "ALTER TABLE table_for_freeze_replicated FREEZE WITH NAME 'test_01417' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;" \
|
||||
|
Loading…
Reference in New Issue
Block a user