Remove useless code in ReplicatedMergeTreeRestartingThread

This commit is contained in:
Alexey Milovidov 2022-04-11 00:44:30 +02:00
parent 3c14659ab2
commit a54c01cf72
4 changed files with 8 additions and 28 deletions

View File

@ -178,40 +178,41 @@ namespace DB
switch (unit.type) switch (unit.type)
{ {
case ProcessingUnitType::START : case ProcessingUnitType::START:
{ {
formatter->writePrefix(); formatter->writePrefix();
break; break;
} }
case ProcessingUnitType::PLAIN : case ProcessingUnitType::PLAIN:
{ {
formatter->consume(std::move(unit.chunk)); formatter->consume(std::move(unit.chunk));
break; break;
} }
case ProcessingUnitType::PLAIN_FINISH : case ProcessingUnitType::PLAIN_FINISH:
{ {
formatter->writeSuffix(); formatter->writeSuffix();
break; break;
} }
case ProcessingUnitType::TOTALS : case ProcessingUnitType::TOTALS:
{ {
formatter->consumeTotals(std::move(unit.chunk)); formatter->consumeTotals(std::move(unit.chunk));
break; break;
} }
case ProcessingUnitType::EXTREMES : case ProcessingUnitType::EXTREMES:
{ {
if (are_totals_written) if (are_totals_written)
formatter->setTotalsAreWritten(); formatter->setTotalsAreWritten();
formatter->consumeExtremes(std::move(unit.chunk)); formatter->consumeExtremes(std::move(unit.chunk));
break; break;
} }
case ProcessingUnitType::FINALIZE : case ProcessingUnitType::FINALIZE:
{ {
formatter->setOutsideStatistics(std::move(unit.statistics)); formatter->setOutsideStatistics(std::move(unit.statistics));
formatter->finalizeImpl(); formatter->finalizeImpl();
break; break;
} }
} }
/// Flush all the data to handmade buffer. /// Flush all the data to handmade buffer.
formatter->flush(); formatter->flush();
unit.actual_memory_size = out_buffer.getActualSize(); unit.actual_memory_size = out_buffer.getActualSize();

View File

@ -181,7 +181,6 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
future_merged_part->updatePath(storage, reserved_space.get()); future_merged_part->updatePath(storage, reserved_space.get());
future_merged_part->merge_type = entry.merge_type; future_merged_part->merge_type = entry.merge_type;
if (storage_settings_ptr->allow_remote_fs_zero_copy_replication) if (storage_settings_ptr->allow_remote_fs_zero_copy_replication)
{ {
if (auto disk = reserved_space->getDisk(); disk->getType() == DB::DiskType::S3) if (auto disk = reserved_space->getDisk(); disk->getType() == DB::DiskType::S3)

View File

@ -304,25 +304,6 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
String is_active_path = fs::path(storage.replica_path) / "is_active"; String is_active_path = fs::path(storage.replica_path) / "is_active";
/** If the node is marked as active, but the mark is made in the same instance, delete it.
* This is possible only when session in ZooKeeper expires.
*/
String data;
Coordination::Stat stat;
bool has_is_active = zookeeper->tryGet(is_active_path, data, &stat);
if (has_is_active && data == active_node_identifier)
{
auto code = zookeeper->tryRemove(is_active_path, stat.version);
if (code == Coordination::Error::ZBADVERSION)
throw Exception("Another instance of replica " + storage.replica_path + " was created just now."
" You shouldn't run multiple instances of same replica. You need to check configuration files.",
ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
throw Coordination::Exception(code, is_active_path);
}
/// Simultaneously declare that this replica is active, and update the host. /// Simultaneously declare that this replica is active, and update the host.
Coordination::Requests ops; Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(is_active_path, active_node_identifier, zkutil::CreateMode::Ephemeral)); ops.emplace_back(zkutil::makeCreateRequest(is_active_path, active_node_identifier, zkutil::CreateMode::Ephemeral));

View File

@ -35,7 +35,6 @@
#include <Storages/MergeTree/LeaderElection.h> #include <Storages/MergeTree/LeaderElection.h>
#include <Storages/MergeTree/ZeroCopyLock.h> #include <Storages/MergeTree/ZeroCopyLock.h>
#include <Databases/DatabaseOnDisk.h> #include <Databases/DatabaseOnDisk.h>
#include <Parsers/formatAST.h> #include <Parsers/formatAST.h>
@ -84,6 +83,7 @@
#include <thread> #include <thread>
#include <future> #include <future>
namespace fs = std::filesystem; namespace fs = std::filesystem;
namespace ProfileEvents namespace ProfileEvents
@ -5735,7 +5735,6 @@ void StorageReplicatedMergeTree::fetchPartition(
String best_replica; String best_replica;
{ {
/// List of replicas of source shard. /// List of replicas of source shard.
replicas = zookeeper->getChildren(fs::path(from) / "replicas"); replicas = zookeeper->getChildren(fs::path(from) / "replicas");