Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-23 08:02:02 +00:00
Merge pull request #36113 from ClickHouse/remove-useless-code-2
Remove useless code in ReplicatedMergeTreeRestartingThread
commit 780f7c87c7
@@ -841,6 +841,21 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition &
     return false;
 }
 
+void ZooKeeper::waitForEphemeralToDisappearIfAny(const std::string & path)
+{
+    zkutil::EventPtr eph_node_disappeared = std::make_shared<Poco::Event>();
+    String content;
+    if (!tryGet(path, content, nullptr, eph_node_disappeared))
+        return;
+
+    int32_t timeout_ms = 2 * session_timeout_ms;
+    if (!eph_node_disappeared->tryWait(timeout_ms))
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
+                            "Ephemeral node {} still exists after {}s, probably it's owned by someone else. "
+                            "Either session_timeout_ms in client's config is different from server's config or it's a bug. "
+                            "Node data: '{}'", path, timeout_ms / 1000, content);
+}
+
 ZooKeeperPtr ZooKeeper::startNewSession() const
 {
     return std::make_shared<ZooKeeper>(hosts, identity, session_timeout_ms, operation_timeout_ms, chroot, implementation, zk_log, get_priority_load_balancing);
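The pattern this helper enables on the caller side: before re-creating an ephemeral node after a session loss, first wait for the node left behind by the previous session to expire. A minimal sketch follows; it mirrors the DDLWorker change later in this diff, and the wrapper function name is illustrative, not part of the change:

/// Hypothetical caller: re-acquire an ephemeral node after the previous session expired.
/// `zookeeper` and `active_node_path` stand in for the caller's client handle and node path.
void reacquireEphemeralNode(zkutil::ZooKeeper & zookeeper, const std::string & active_node_path)
{
    /// If a node from the previous (expired) session is still visible, block until the
    /// server drops it -- at most 2 * session_timeout_ms, otherwise this throws.
    zookeeper.waitForEphemeralToDisappearIfAny(active_node_path);

    /// Now it is safe to re-create our own ephemeral node.
    zookeeper.create(active_node_path, {}, zkutil::CreateMode::Ephemeral);
}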
@@ -240,6 +240,10 @@ public:
     /// The function returns true if waited and false if waiting was interrupted by condition.
     bool waitForDisappear(const std::string & path, const WaitCondition & condition = {});
 
+    /// Wait for the ephemeral node created in previous session to disappear.
+    /// Throws LOGICAL_ERROR if node still exists after 2x session_timeout.
+    void waitForEphemeralToDisappearIfAny(const std::string & path);
+
     /// Async interface (a small subset of operations is implemented).
     ///
     /// Usage:
@@ -7,6 +7,9 @@
 #include <base/getThreadId.h>
 #include <base/types.h>
 
+#if defined(__linux__)
+#include <sys/utsname.h>
+#endif
 
 namespace DB
 {
@@ -29,6 +32,15 @@ DB::UInt64 randomSeed()
     hash.update(times.tv_nsec);
     hash.update(times.tv_sec);
     hash.update(getThreadId());
     hash.update(&times);
+
+    /// It makes sense to add something like hostname to avoid seed collision when multiple servers start simultaneously.
+    /// But randomSeed() must be signal-safe and gethostname and similar functions are not.
+    /// Let's try to get utsname.nodename using uname syscall (it's signal-safe).
+#if defined(__linux__)
+    struct utsname sysinfo;
+    if (uname(&sysinfo) == 0)
+        hash.update(sysinfo);
+#endif
 
     return hash.get64();
 }
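The comment above explains the constraint: the seed must stay async-signal-safe, so gethostname() is out, but the uname syscall is fine. A standalone approximation of the same idea is sketched below; it uses only signal-safe sources (clock_gettime, the thread id via syscall, uname) and a simple mixer. The names mix and signalSafeSeed are illustrative, and this is not the repo's SipHash-based implementation:

#include <cstdint>
#include <time.h>
#include <unistd.h>
#include <sys/syscall.h>
#if defined(__linux__)
#include <sys/utsname.h>
#endif

static uint64_t mix(uint64_t h, uint64_t v)
{
    // splitmix64-style mixing step; any decent mixer would do here.
    h ^= v + 0x9E3779B97F4A7C15ULL + (h << 6) + (h >> 2);
    return h;
}

uint64_t signalSafeSeed()
{
    timespec ts{};
    clock_gettime(CLOCK_MONOTONIC, &ts);

    uint64_t h = 0;
    h = mix(h, static_cast<uint64_t>(ts.tv_nsec));
    h = mix(h, static_cast<uint64_t>(ts.tv_sec));
    h = mix(h, static_cast<uint64_t>(syscall(SYS_gettid)));

#if defined(__linux__)
    utsname info{};
    if (uname(&info) == 0)
        for (const char * p = info.nodename; *p; ++p)   // nodename ~ hostname, but signal-safe to obtain
            h = mix(h, static_cast<uint64_t>(*p));
#endif
    return h;
}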
@@ -547,15 +547,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
     {
         /// Connection has been lost and now we are retrying,
         /// but our previous ephemeral node still exists.
-        zkutil::EventPtr eph_node_disappeared = std::make_shared<Poco::Event>();
-        String dummy;
-        if (zookeeper->tryGet(active_node_path, dummy, nullptr, eph_node_disappeared))
-        {
-            constexpr int timeout_ms = 60 * 1000;
-            if (!eph_node_disappeared->tryWait(timeout_ms))
-                throw Exception(ErrorCodes::LOGICAL_ERROR, "Ephemeral node {} still exists, "
-                                "probably it's owned by someone else", active_node_path);
-        }
+        zookeeper->waitForEphemeralToDisappearIfAny(active_node_path);
     }
 
     zookeeper->create(active_node_path, {}, zkutil::CreateMode::Ephemeral);
@@ -178,40 +178,41 @@ namespace DB
 
         switch (unit.type)
         {
-            case ProcessingUnitType::START :
+            case ProcessingUnitType::START:
             {
                 formatter->writePrefix();
                 break;
             }
-            case ProcessingUnitType::PLAIN :
+            case ProcessingUnitType::PLAIN:
            {
                formatter->consume(std::move(unit.chunk));
                break;
            }
-            case ProcessingUnitType::PLAIN_FINISH :
+            case ProcessingUnitType::PLAIN_FINISH:
            {
                formatter->writeSuffix();
                break;
            }
-            case ProcessingUnitType::TOTALS :
+            case ProcessingUnitType::TOTALS:
            {
                formatter->consumeTotals(std::move(unit.chunk));
                break;
            }
-            case ProcessingUnitType::EXTREMES :
+            case ProcessingUnitType::EXTREMES:
            {
                if (are_totals_written)
                    formatter->setTotalsAreWritten();
                formatter->consumeExtremes(std::move(unit.chunk));
                break;
            }
-            case ProcessingUnitType::FINALIZE :
+            case ProcessingUnitType::FINALIZE:
            {
                formatter->setOutsideStatistics(std::move(unit.statistics));
                formatter->finalizeImpl();
                break;
            }
        }
 
        /// Flush all the data to handmade buffer.
        formatter->flush();
        unit.actual_memory_size = out_buffer.getActualSize();
@@ -181,7 +181,6 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
     future_merged_part->updatePath(storage, reserved_space.get());
     future_merged_part->merge_type = entry.merge_type;
 
-
     if (storage_settings_ptr->allow_remote_fs_zero_copy_replication)
     {
         if (auto disk = reserved_space->getDisk(); disk->supportZeroCopyReplication())
@@ -303,25 +303,7 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
     ReplicatedMergeTreeAddress address = storage.getReplicatedMergeTreeAddress();
 
     String is_active_path = fs::path(storage.replica_path) / "is_active";
-
-    /** If the node is marked as active, but the mark is made in the same instance, delete it.
-      * This is possible only when session in ZooKeeper expires.
-      */
-    String data;
-    Coordination::Stat stat;
-    bool has_is_active = zookeeper->tryGet(is_active_path, data, &stat);
-    if (has_is_active && data == active_node_identifier)
-    {
-        auto code = zookeeper->tryRemove(is_active_path, stat.version);
-
-        if (code == Coordination::Error::ZBADVERSION)
-            throw Exception("Another instance of replica " + storage.replica_path + " was created just now."
-                " You shouldn't run multiple instances of same replica. You need to check configuration files.",
-                ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);
-
-        if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
-            throw Coordination::Exception(code, is_active_path);
-    }
+    zookeeper->waitForEphemeralToDisappearIfAny(is_active_path);
 
     /// Simultaneously declare that this replica is active, and update the host.
     Coordination::Requests ops;
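For context, the step that follows this hunk (only the `ops` declaration and the "Simultaneously declare..." comment are visible in the diff) roughly amounts to creating the is_active ephemeral node and publishing the replica address in one multi-request. The sketch below is an assumption about that continuation, not part of the change; the exact requests and the use of address.toString() are illustrative:

    /// Illustrative continuation (assumed, not shown in this diff): declare the replica
    /// active and update its host entry atomically.
    Coordination::Requests ops;
    ops.emplace_back(zkutil::makeCreateRequest(is_active_path, active_node_identifier, zkutil::CreateMode::Ephemeral));
    ops.emplace_back(zkutil::makeSetRequest(fs::path(storage.replica_path) / "host", address.toString(), -1));
    zookeeper->multi(ops);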
@@ -35,7 +35,6 @@
 #include <Storages/MergeTree/LeaderElection.h>
 #include <Storages/MergeTree/ZeroCopyLock.h>
 
-
 #include <Databases/DatabaseOnDisk.h>
 
 #include <Parsers/formatAST.h>
@@ -93,6 +92,7 @@
 #include <thread>
 #include <future>
 
+
 namespace fs = std::filesystem;
 
 namespace ProfileEvents
@@ -5770,7 +5770,6 @@ void StorageReplicatedMergeTree::fetchPartition(
     String best_replica;
 
     {
-
         /// List of replicas of source shard.
         replicas = zookeeper->getChildren(fs::path(from) / "replicas");
 
@@ -15,7 +15,7 @@
 
         <coordination_settings>
             <operation_timeout_ms>10000</operation_timeout_ms>
-            <session_timeout_ms>30000</session_timeout_ms>
+            <session_timeout_ms>15000</session_timeout_ms>
             <raft_logs_level>trace</raft_logs_level>
             <force_sync>false</force_sync>
         </coordination_settings>
@@ -15,7 +15,7 @@
 
         <coordination_settings>
             <operation_timeout_ms>10000</operation_timeout_ms>
-            <session_timeout_ms>30000</session_timeout_ms>
+            <session_timeout_ms>15000</session_timeout_ms>
             <raft_logs_level>trace</raft_logs_level>
             <force_sync>false</force_sync>
         </coordination_settings>
@@ -15,7 +15,7 @@
 
         <coordination_settings>
             <operation_timeout_ms>10000</operation_timeout_ms>
-            <session_timeout_ms>30000</session_timeout_ms>
+            <session_timeout_ms>15000</session_timeout_ms>
             <raft_logs_level>trace</raft_logs_level>
             <force_sync>false</force_sync>
         </coordination_settings>
@@ -12,6 +12,6 @@
             <host>zoo3</host>
             <port>2181</port>
         </node>
-        <session_timeout_ms>3000</session_timeout_ms>
+        <session_timeout_ms>15000</session_timeout_ms>
     </zookeeper>
 </clickhouse>
@@ -12,6 +12,6 @@
             <host>zoo3</host>
             <port>2281</port>
         </node>
-        <session_timeout_ms>3000</session_timeout_ms>
+        <session_timeout_ms>15000</session_timeout_ms>
     </zookeeper>
 </clickhouse>
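One practical consequence of these test-config values, given the helper added above: waitForEphemeralToDisappearIfAny() blocks for at most twice the session timeout, so the bound works out as in the small computation below (the values come from the updated configs and from the 2x factor in the new code):

    // With the test settings above, the longest a caller can block waiting for a
    // stale ephemeral node is 2 * session_timeout_ms.
    constexpr int32_t session_timeout_ms = 15000;            // from the updated configs
    constexpr int32_t max_wait_ms = 2 * session_timeout_ms;  // 30000 ms = 30 s before LOGICAL_ERROR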
@@ -698,10 +698,8 @@ def test_in_memory_alters(start_cluster):
     expected = "1\tab\t0\n2\tcd\t0\n"
     assert node9.query("SELECT id, s, col1 FROM alters_table ORDER BY id") == expected
     check_parts_type(1)
-    # After hard restart table can be in readonly mode
-    exec_query_with_retry(
-        node9,
-        "INSERT INTO alters_table (date, id, col1) VALUES (toDate('2020-10-10'), 3, 100)",
+    node9.query(
+        "INSERT INTO alters_table (date, id, col1) VALUES (toDate('2020-10-10'), 3, 100)"
     )
     node9.query("ALTER TABLE alters_table MODIFY COLUMN col1 String")
     node9.query("ALTER TABLE alters_table DROP COLUMN s")