diff --git a/src/Storages/Kafka/StorageKafka2.cpp b/src/Storages/Kafka/StorageKafka2.cpp
index de27adbbe3f..86330cbf122 100644
--- a/src/Storages/Kafka/StorageKafka2.cpp
+++ b/src/Storages/Kafka/StorageKafka2.cpp
@@ -1,5 +1,6 @@
 #include <Storages/Kafka/StorageKafka2.h>
+#include <Core/ServerUUID.h>
 #include
 #include
 #include
@@ -92,6 +93,8 @@
 extern const int LOGICAL_ERROR;
 extern const int REPLICA_ALREADY_EXISTS;
 extern const int TABLE_IS_DROPPED;
 extern const int TABLE_WAS_NOT_DROPPED;
+extern const int NO_ZOOKEEPER;
+extern const int REPLICA_IS_ALREADY_ACTIVE;
 }
 
 namespace
@@ -109,6 +112,8 @@ StorageKafka2::StorageKafka2(
     : IStorage(table_id_)
     , WithContext(context_->getGlobalContext())
     , keeper(getContext()->getZooKeeper())
+    , keeper_path(kafka_settings_->kafka_keeper_path.value)
+    , replica_path(keeper_path + "/replicas/" + kafka_settings_->kafka_replica_name.value)
     , kafka_settings(std::move(kafka_settings_))
     , macros_info{.table_id = table_id_}
     , topics(parseTopics(getContext()->getMacros()->expand(kafka_settings->kafka_topic_list.value, macros_info)))
@@ -122,11 +127,12 @@ StorageKafka2::StorageKafka2(
     , max_rows_per_message(kafka_settings->kafka_max_rows_per_message.value)
     , schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value, macros_info))
     , num_consumers(kafka_settings->kafka_num_consumers.value)
-    , log(getLogger("StorageKafka2 (" + table_id_.table_name + ")"))
+    , log(getLogger(String("StorageKafka2 ") + table_id_.getNameForLogs()))
     , semaphore(0, static_cast<int>(num_consumers))
     , settings_adjustments(createSettingsAdjustments())
     , thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value)
     , collection_name(collection_name_)
+    , active_node_identifier(toString(ServerUUID::get()))
 {
     if (kafka_settings->kafka_num_consumers > 1 && !thread_per_consumer)
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "With multiple consumer you have to use thread per consumer!");
@@ -149,23 +155,13 @@ StorageKafka2::StorageKafka2(
         tasks.emplace_back(std::make_shared<TaskContext>(std::move(task)));
     }
 
-    for (size_t i = 0; i < num_consumers; ++i)
-    {
-        try
-        {
-            consumers.push_back(ConsumerAndAssignmentInfo{.consumer = createConsumer(i), .keeper = keeper});
-            ++num_created_consumers;
-        }
-        catch (const cppkafka::Exception &)
-        {
-            tryLogCurrentException(log);
-        }
-    }
-
     const auto first_replica = createTableIfNotExists();
 
     if (!first_replica)
         createReplica();
+
+    activating_task = getContext()->getSchedulePool().createTask(log->name() + "(activating task)", [this]() { activate(); });
+    activating_task->deactivate();
 }
 
 VirtualColumnsDescription StorageKafka2::createVirtuals(StreamingHandleErrorMode handle_error_mode)
@@ -189,6 +185,150 @@ VirtualColumnsDescription StorageKafka2::createVirtuals(StreamingHandleErrorMode
     return desc;
 }
 
+void StorageKafka2::partialShutdown()
+{
+    for (auto & task : tasks)
+    {
+        LOG_TRACE(log, "Cancelling streams");
+        task->stream_cancelled = true;
+    }
+
+    for (auto & task : tasks)
+    {
+        LOG_TRACE(log, "Waiting for cleanup");
+        task->holder->deactivate();
+    }
+    is_active = false;
+}
+
+bool StorageKafka2::activate()
+{
+    LOG_TEST(log, "activate task");
+    if (is_active && !getZooKeeper()->expired())
+    {
+        LOG_TEST(log, "No need to activate");
+        return true;
+    }
+
+    if (first_time)
+    {
+        LOG_DEBUG(log, "Activating replica");
+        assert(!is_active);
+    }
+    else if (!is_active)
+    {
+        LOG_WARNING(log, "Table was not active. Will try to activate it");
+    }
+    else if (getZooKeeper()->expired())
+    {
+        LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session");
+        partialShutdown();
+    }
+    else
+    {
+        UNREACHABLE();
+    }
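+
+    /// Every remaining branch needs a usable Keeper session, so (re)establish one before touching any znodes.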
+    try
+    {
+        setZooKeeper();
+    }
+    catch (const Coordination::Exception &)
+    {
+        /// An exception during zookeeper_init usually means that DNS is not working or the connection with ZooKeeper failed
+        tryLogCurrentException(log, "Failed to establish a new ZK connection. Will try again");
+        assert(!is_active);
+        return false;
+    }
+
+    if (shutdown_called)
+        return false;
+
+    auto activate_in_keeper = [this]()
+    {
+        try
+        {
+            auto zookeeper = getZooKeeper();
+
+            String is_active_path = fs::path(replica_path) / "is_active";
+            zookeeper->deleteEphemeralNodeIfContentMatches(is_active_path, active_node_identifier);
+
+            /// Simultaneously declare that this replica is active, and update the host.
+            Coordination::Requests ops;
+            ops.emplace_back(zkutil::makeCreateRequest(is_active_path, active_node_identifier, zkutil::CreateMode::Ephemeral));
+
+            try
+            {
+                zookeeper->create(is_active_path, active_node_identifier, zkutil::CreateMode::Ephemeral);
+            }
+            catch (const Coordination::Exception & e)
+            {
+                if (e.code == Coordination::Error::ZNODEEXISTS)
+                    throw Exception(
+                        ErrorCodes::REPLICA_IS_ALREADY_ACTIVE,
+                        "Replica {} appears to be already active. If you're sure it's not, "
+                        "try again in a minute or remove znode {}/is_active manually",
+                        replica_path,
+                        replica_path);
+
+                throw;
+            }
+            replica_is_active_node = zkutil::EphemeralNodeHolder::existing(is_active_path, *zookeeper);
+
+            return true;
+        }
+        catch (...)
+        {
+            replica_is_active_node = nullptr;
+
+            try
+            {
+                throw;
+            }
+            catch (const Coordination::Exception & e)
+            {
+                LOG_ERROR(log, "Couldn't start replica: {}. {}", e.what(), DB::getCurrentExceptionMessage(true));
+                return false;
+            }
+            catch (const Exception & e)
+            {
+                if (e.code() != ErrorCodes::REPLICA_IS_ALREADY_ACTIVE)
+                    throw;
+
+                LOG_ERROR(log, "Couldn't start replica: {}. {}", e.what(), DB::getCurrentExceptionMessage(true));
+                return false;
+            }
+        }
+    };
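+
+    /// If activation fails the replica simply stays inactive; a later run of this task retries the whole sequence.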
{}", e.what(), DB::getCurrentExceptionMessage(true)); + return false; + } + } + }; + + if (!activate_in_keeper()) + { + assert(storage.is_readonly); + return false; + } + + is_active = true; + + // Start the reader threads + for (auto & task : tasks) + { + task->stream_cancelled = false; + task->holder->activateAndSchedule(); + } + + if (first_time) + first_time = false; + + LOG_DEBUG(log, "Table activated successfully"); + return true; +} + +void StorageKafka2::assertActive() const +{ + // TODO(antaljanosbenjamin): change LOGICAL_ERROR to something sensible + if (!is_active) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Table is not active (replica path: {})", replica_path); +} SettingsChanges StorageKafka2::createSettingsAdjustments() { @@ -282,7 +422,8 @@ void StorageKafka2::startup() { try { - consumers.emplace_back(ConsumerAndAssignmentInfo{.consumer = createConsumer(i), .keeper = keeper}); + consumers.push_back(ConsumerAndAssignmentInfo{.consumer = createConsumer(i), .keeper = getZooKeeper()}); + LOG_DEBUG(log, "Created #{} consumer", num_created_consumers); ++num_created_consumers; } catch (const cppkafka::Exception &) @@ -290,28 +431,15 @@ void StorageKafka2::startup() tryLogCurrentException(log); } } - // Start the reader thread - for (auto & task : tasks) - task->holder->activateAndSchedule(); + activating_task->activateAndSchedule(); } void StorageKafka2::shutdown(bool) { shutdown_called = true; - for (auto & task : tasks) - { - LOG_TRACE(log, "Cancelling streams"); - // Interrupt streaming thread - task->stream_cancelled = true; - } - - for (auto & task : tasks) - { - LOG_TRACE(log, "Waiting for cleanup"); - task->holder->deactivate(); - } - + activating_task->deactivate(); + partialShutdown(); LOG_TRACE(log, "Closing consumers"); consumers.clear(); LOG_TRACE(log, "Consumers closed"); @@ -372,7 +500,7 @@ cppkafka::Configuration StorageKafka2::getConsumerConfiguration(size_t consumer_ conf.set("enable.partition.eof", "false"); // Ignore EOF messages for (auto & property : conf.get_all()) - LOG_TRACE(log, "Consumer set property {}:{}", property.first, property.second); + LOG_TEST(log, "Consumer set property {}:{}", property.first, property.second); return conf; } @@ -389,7 +517,7 @@ cppkafka::Configuration StorageKafka2::getProducerConfiguration() updateProducerConfiguration(conf); for (auto & property : conf.get_all()) - LOG_TRACE(log, "Producer set property {}:{}", property.first, property.second); + LOG_TEST(log, "Producer set property {}:{}", property.first, property.second); return conf; } @@ -558,32 +686,32 @@ std::optional getNumber(zkutil::ZooKeeper & keeper, const fs::path & pa bool StorageKafka2::createTableIfNotExists() { - const auto & keeper_path = fs::path(kafka_settings->kafka_keeper_path.value); - - const auto & replicas_path = keeper_path / "replicas"; + // Heavily based on StorageReplicatedMergeTree::createTableIfNotExists + const auto my_keeper_path = fs::path(keeper_path); + const auto replicas_path = my_keeper_path / "replicas"; for (auto i = 0; i < 1000; ++i) { if (keeper->exists(replicas_path)) { - LOG_DEBUG(log, "This table {} is already created, will add new replica", String(keeper_path)); + LOG_DEBUG(log, "This table {} is already created, will add new replica", keeper_path); return false; } /// There are leftovers from incompletely dropped table. 
-    for (auto & task : tasks)
-    {
-        LOG_TRACE(log, "Cancelling streams");
-        // Interrupt streaming thread
-        task->stream_cancelled = true;
-    }
-
-    for (auto & task : tasks)
-    {
-        LOG_TRACE(log, "Waiting for cleanup");
-        task->holder->deactivate();
-    }
-
+    activating_task->deactivate();
+    partialShutdown();
     LOG_TRACE(log, "Closing consumers");
     consumers.clear();
     LOG_TRACE(log, "Consumers closed");
@@ -372,7 +500,7 @@ cppkafka::Configuration StorageKafka2::getConsumerConfiguration(size_t consumer_
     conf.set("enable.partition.eof", "false"); // Ignore EOF messages
 
     for (auto & property : conf.get_all())
-        LOG_TRACE(log, "Consumer set property {}:{}", property.first, property.second);
+        LOG_TEST(log, "Consumer set property {}:{}", property.first, property.second);
 
     return conf;
 }
@@ -389,7 +517,7 @@ cppkafka::Configuration StorageKafka2::getProducerConfiguration()
     updateProducerConfiguration(conf);
 
     for (auto & property : conf.get_all())
-        LOG_TRACE(log, "Producer set property {}:{}", property.first, property.second);
+        LOG_TEST(log, "Producer set property {}:{}", property.first, property.second);
 
     return conf;
 }
@@ -558,32 +686,32 @@ std::optional<int64_t> getNumber(zkutil::ZooKeeper & keeper, const fs::path & pa
 bool StorageKafka2::createTableIfNotExists()
 {
-    const auto & keeper_path = fs::path(kafka_settings->kafka_keeper_path.value);
-
-    const auto & replicas_path = keeper_path / "replicas";
+    // Heavily based on StorageReplicatedMergeTree::createTableIfNotExists
+    const auto my_keeper_path = fs::path(keeper_path);
+    const auto replicas_path = my_keeper_path / "replicas";
 
     for (auto i = 0; i < 1000; ++i)
     {
         if (keeper->exists(replicas_path))
         {
-            LOG_DEBUG(log, "This table {} is already created, will add new replica", String(keeper_path));
+            LOG_DEBUG(log, "This table {} is already created, will add new replica", keeper_path);
             return false;
         }
 
         /// There are leftovers from incompletely dropped table.
-        if (keeper->exists(keeper_path / "dropped"))
+        if (keeper->exists(my_keeper_path / "dropped"))
        {
             /// This condition may happen when the previous drop attempt was not completed
             ///  or when table is dropped by another replica right now.
             /// This is Ok because another replica is definitely going to drop the table.
-            LOG_WARNING(log, "Removing leftovers from table {} (this might take several minutes)", String(keeper_path));
-            String drop_lock_path = keeper_path / "dropped" / "lock";
+            LOG_WARNING(log, "Removing leftovers from table {}", keeper_path);
+            String drop_lock_path = my_keeper_path / "dropped" / "lock";
             Coordination::Error code = keeper->tryCreate(drop_lock_path, "", zkutil::CreateMode::Ephemeral);
 
             if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
             {
-                LOG_WARNING(log, "The leftovers from table {} were removed by another replica", String(keeper_path));
+                LOG_WARNING(log, "The leftovers from table {} were removed by another replica", keeper_path);
             }
             else if (code != Coordination::Error::ZOK)
             {
@@ -592,7 +720,7 @@ bool StorageKafka2::createTableIfNotExists()
             else
             {
                 auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *keeper);
-                if (!removeTableNodesFromZooKeeper(metadata_drop_lock))
+                if (!removeTableNodesFromZooKeeper(keeper, metadata_drop_lock))
                 {
                     /// Someone is recursively removing table right now, we cannot create new table until old one is removed
                     continue;
@@ -605,7 +733,7 @@ bool StorageKafka2::createTableIfNotExists()
 
         ops.emplace_back(zkutil::makeCreateRequest(keeper_path, "", zkutil::CreateMode::Persistent));
 
-        const auto topics_path = keeper_path / "topics";
+        const auto topics_path = my_keeper_path / "topics";
         ops.emplace_back(zkutil::makeCreateRequest(topics_path, "", zkutil::CreateMode::Persistent));
 
         for (const auto & topic : topics)
@@ -621,15 +749,14 @@ bool StorageKafka2::createTableIfNotExists()
 
         // Create the first replica
         ops.emplace_back(zkutil::makeCreateRequest(replicas_path, "", zkutil::CreateMode::Persistent));
-        ops.emplace_back(
-            zkutil::makeCreateRequest(replicas_path / kafka_settings->kafka_replica_name.value, "", zkutil::CreateMode::Persistent));
+        ops.emplace_back(zkutil::makeCreateRequest(replica_path, "", zkutil::CreateMode::Persistent));
 
         Coordination::Responses responses;
         const auto code = keeper->tryMulti(ops, responses);
         if (code == Coordination::Error::ZNODEEXISTS)
         {
-            LOG_INFO(log, "It looks like the table {} was created by another replica at the same moment, will retry", String(keeper_path));
+            LOG_INFO(log, "It looks like the table {} was created by another replica at the same moment, will retry", keeper_path);
             continue;
         }
         else if (code != Coordination::Error::ZOK)
         {
@@ -637,7 +764,7 @@ bool StorageKafka2::createTableIfNotExists()
             zkutil::KeeperMultiException::check(code, ops, responses);
         }
 
-        LOG_INFO(log, "Table {} created successfully ", String(keeper_path));
+        LOG_INFO(log, "Table {} created successfully", keeper_path);
 
         return true;
     }
@@ -649,25 +776,25 @@
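+/// The Keeper session is passed in explicitly: during drop this is the session holding the drop lock, which is not necessarily the table's own session.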
-bool StorageKafka2::removeTableNodesFromZooKeeper(const zkutil::EphemeralNodeHolder::Ptr & drop_lock)
+bool StorageKafka2::removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr keeper_to_use, const zkutil::EphemeralNodeHolder::Ptr & drop_lock)
 {
     bool completely_removed = false;
 
     Strings children;
-    if (const auto code = keeper->tryGetChildren(kafka_settings->kafka_keeper_path.value, children); code == Coordination::Error::ZNONODE)
+    if (const auto code = keeper_to_use->tryGetChildren(keeper_path, children); code == Coordination::Error::ZNONODE)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "There is a race condition between creation and removal. It's a bug");
 
-    const auto keeper_path = fs::path(kafka_settings->kafka_keeper_path.value);
+    const auto my_keeper_path = fs::path(keeper_path);
     for (const auto & child : children)
         if (child != "dropped")
-            keeper->tryRemoveRecursive(keeper_path / child);
+            keeper_to_use->tryRemoveRecursive(my_keeper_path / child);
 
     Coordination::Requests ops;
     Coordination::Responses responses;
     ops.emplace_back(zkutil::makeRemoveRequest(drop_lock->getPath(), -1));
-    ops.emplace_back(zkutil::makeRemoveRequest(keeper_path / "dropped", -1));
-    ops.emplace_back(zkutil::makeRemoveRequest(keeper_path, -1));
-    const auto code = keeper->tryMulti(ops, responses, /* check_session_valid */ true);
+    ops.emplace_back(zkutil::makeRemoveRequest(my_keeper_path / "dropped", -1));
+    ops.emplace_back(zkutil::makeRemoveRequest(my_keeper_path, -1));
+    const auto code = keeper_to_use->tryMulti(ops, responses, /* check_session_valid */ true);
 
     if (code == Coordination::Error::ZNONODE)
     {
@@ -680,7 +807,7 @@ bool StorageKafka2::removeTableNodesFromZooKeeper(const zkutil::EphemeralNodeHol
             log,
             "Table was not completely removed from Keeper, {} still exists and may contain some garbage,"
             "but someone is removing it right now.",
-            kafka_settings->kafka_keeper_path.value);
+            keeper_path);
     }
     else if (code != Coordination::Error::ZOK)
     {
@@ -691,7 +818,7 @@ bool StorageKafka2::removeTableNodesFromZooKeeper(const zkutil::EphemeralNodeHol
     {
         drop_lock->setAlreadyRemoved();
         completely_removed = true;
-        LOG_INFO(log, "Table {} was successfully removed from ZooKeeper", kafka_settings->kafka_keeper_path.value);
+        LOG_INFO(log, "Table {} was successfully removed from ZooKeeper", keeper_path);
     }
 
     return completely_removed;
@@ -699,45 +826,50 @@ bool StorageKafka2::removeTableNodesFromZooKeeper(const zkutil::EphemeralNodeHol
 }
 
 void StorageKafka2::createReplica()
 {
-    const auto replica_path = kafka_settings->kafka_keeper_path.value + "/replicas/" + kafka_settings->kafka_replica_name.value;
+    LOG_INFO(log, "Creating replica {}", replica_path);
+    // TODO: This can cause issues if a new table is created with the same path. To make this work, we should store some metadata
+    // about the table to be able to identify that the same table is created, not a new one.
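+    /// The replica node itself is persistent; liveness is signalled separately by the ephemeral "is_active" child that activate() creates under it.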
     const auto code = keeper->tryCreate(replica_path, "", zkutil::CreateMode::Persistent);
-    if (code == Coordination::Error::ZNODEEXISTS)
-        throw Exception(ErrorCodes::REPLICA_ALREADY_EXISTS, "Replica {} already exists", replica_path);
-    else if (code == Coordination::Error::ZNONODE)
-        throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {} was suddenly removed", kafka_settings->kafka_keeper_path.value);
-    else if (code != Coordination::Error::ZOK)
-        throw Coordination::Exception::fromPath(code, replica_path);
-    LOG_INFO(log, "Replica {} created", replica_path);
+    switch (code)
+    {
+        case Coordination::Error::ZNODEEXISTS:
+            LOG_INFO(log, "Replica {} already exists, will try to use it", replica_path);
+            break;
+        case Coordination::Error::ZOK:
+            LOG_INFO(log, "Replica {} created", replica_path);
+            break;
+        case Coordination::Error::ZNONODE:
+            throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {} was suddenly removed", keeper_path);
+        default:
+            throw Coordination::Exception::fromPath(code, replica_path);
+    }
 }
 
 void StorageKafka2::dropReplica()
 {
-    if (keeper->expired())
-        throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Table was not dropped because ZooKeeper session has expired.");
-
-    auto replica_path = kafka_settings->kafka_keeper_path.value + "/replicas/" + kafka_settings->kafka_replica_name.value;
+    LOG_INFO(log, "Trying to drop replica {}", replica_path);
+    auto my_keeper = getZooKeeperIfTableShutDown();
 
     LOG_INFO(log, "Removing replica {}", replica_path);
 
-    if (!keeper->exists(replica_path))
+    if (!my_keeper->exists(replica_path))
     {
         LOG_INFO(log, "Removing replica {} does not exist", replica_path);
         return;
     }
 
     {
-        keeper->tryRemoveChildrenRecursive(replica_path);
+        my_keeper->tryRemoveChildrenRecursive(replica_path);
 
-        if (keeper->tryRemove(replica_path) != Coordination::Error::ZOK)
+        if (my_keeper->tryRemove(replica_path) != Coordination::Error::ZOK)
             LOG_ERROR(log, "Replica was not completely removed from Keeper, {} still exists and may contain some garbage.", replica_path);
     }
 
     /// Check that `zookeeper_path` exists: it could have been deleted by another replica after execution of previous line.
     Strings replicas;
-    if (Coordination::Error::ZOK != keeper->tryGetChildren(kafka_settings->kafka_keeper_path.value + "/replicas", replicas)
-        || !replicas.empty())
+    if (Coordination::Error::ZOK != my_keeper->tryGetChildren(keeper_path + "/replicas", replicas) || !replicas.empty())
         return;
 
     LOG_INFO(log, "{} is the last replica, will remove table", replica_path);
@@ -758,11 +890,12 @@ void StorageKafka2::dropReplica()
     /// (The existence of child node does not allow to remove parent node).
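+    /// Creating the "dropped" marker together with an ephemeral lock in one multi-op tells concurrent replicas that removal is already in progress.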
     Coordination::Requests ops;
     Coordination::Responses responses;
-    String drop_lock_path = kafka_settings->kafka_keeper_path.value + "/dropped/lock";
-    ops.emplace_back(zkutil::makeRemoveRequest(kafka_settings->kafka_keeper_path.value + "/replicas", -1));
-    ops.emplace_back(zkutil::makeCreateRequest(kafka_settings->kafka_keeper_path.value + "/dropped", "", zkutil::CreateMode::Persistent));
+    fs::path my_keeper_path = keeper_path;
+    String drop_lock_path = my_keeper_path / "dropped" / "lock";
+    ops.emplace_back(zkutil::makeRemoveRequest(my_keeper_path / "replicas", -1));
+    ops.emplace_back(zkutil::makeCreateRequest(my_keeper_path / "dropped", "", zkutil::CreateMode::Persistent));
     ops.emplace_back(zkutil::makeCreateRequest(drop_lock_path, "", zkutil::CreateMode::Ephemeral));
-    Coordination::Error code = keeper->tryMulti(ops, responses);
+    Coordination::Error code = my_keeper->tryMulti(ops, responses);
 
     if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
     {
@@ -778,9 +911,9 @@ void StorageKafka2::dropReplica()
     }
     else
     {
-        auto drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *keeper);
-        LOG_INFO(log, "Removing table {} (this might take several minutes)", kafka_settings->kafka_keeper_path.value);
-        removeTableNodesFromZooKeeper(drop_lock);
+        auto drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *my_keeper);
+        LOG_INFO(log, "Removing table {} (this might take several minutes)", keeper_path);
+        removeTableNodesFromZooKeeper(my_keeper, drop_lock);
     }
 }
 
@@ -1073,6 +1206,7 @@ void StorageKafka2::threadFunc(size_t idx)
 {
     assert(idx < tasks.size());
     auto task = tasks[idx];
+    std::optional<StallReason> maybe_stall_reason;
     try
     {
         auto table_id = getStorageID();
@@ -1082,21 +1216,19 @@
         {
             auto start_time = std::chrono::steady_clock::now();
 
-            mv_attached.store(true);
-
             // Keep streaming as long as there are attached views and streaming is not cancelled
             while (!task->stream_cancelled && num_created_consumers > 0)
             {
+                maybe_stall_reason.reset();
                 if (!checkDependencies(table_id))
                     break;
 
                 LOG_DEBUG(log, "Started streaming to {} attached views", num_views);
 
                 // Exit the loop & reschedule if some stream stalled
-                auto some_stream_is_stalled = streamToViews(idx);
-                if (some_stream_is_stalled)
+                if (maybe_stall_reason = streamToViews(idx); maybe_stall_reason.has_value())
                 {
-                    LOG_TRACE(log, "Stream(s) stalled. Reschedule.");
+                    LOG_TRACE(log, "Stream stalled.");
                     break;
                 }
@@ -1115,14 +1247,18 @@
         tryLogCurrentException(__PRETTY_FUNCTION__);
     }
 
-    mv_attached.store(false);
-
-    // Wait for attached views
     if (!task->stream_cancelled)
-        task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS);
+    {
+        // Keeper-related problems should be resolved relatively fast, so it makes sense to wait less before retrying
+        if (maybe_stall_reason.has_value()
+            && (*maybe_stall_reason == StallReason::KeeperSessionEnded || *maybe_stall_reason == StallReason::CouldNotAcquireLocks))
+            task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS / 10);
+        else
+            task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS);
+    }
 }
 
-bool StorageKafka2::streamToViews(size_t idx)
+std::optional<StorageKafka2::StallReason> StorageKafka2::streamToViews(size_t idx)
 {
+    // This function is written assuming that each consumer has their own thread. This means once this is changed, this function should be revisited.
+    // The return values should be revisited, as stalling all consumers because of a single one stalled is not a good idea.
@@ -1140,21 +1276,24 @@
     // To keep the consumer alive
     const auto wait_for_assignment = consumer_info.locks.empty();
-    LOG_TRACE(log, "Polling consumer for events");
+    LOG_TRACE(log, "Polling consumer {} for events", idx);
     consumer->pollEvents();
 
     if (wait_for_assignment)
     {
         while (nullptr == consumer->getKafkaAssignment() && consumer_info.watch.elapsedMilliseconds() < MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS)
             consumer->pollEvents();
+        LOG_INFO(log, "Consumer has assignment: {}", nullptr != consumer->getKafkaAssignment());
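+        /// The wait above is bounded by MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS, so a consumer that never receives an assignment cannot block this thread forever.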
     }
 
     try
     {
         if (consumer->needsOffsetUpdate() || consumer_info.locks.empty())
         {
+            LOG_TRACE(log, "Consumer needs update offset");
             // First release the locks so let other consumers acquire them ASAP
             consumer_info.locks.clear();
+            consumer_info.topic_partitions.clear();
 
             const auto * current_assignment = consumer->getKafkaAssignment();
             if (current_assignment == nullptr)
@@ -1162,13 +1301,15 @@
             {
                 // The consumer lost its assignment and haven't received a new one.
                 // By returning true this function reports the current consumer as a "stalled" stream, which
                 LOG_TRACE(log, "No assignment");
-                return true;
+                return StallReason::NoAssignment;
             }
 
-            LOG_TRACE(log, "Consumer needs update offset");
             consumer_info.consume_from_topic_partition_index = 0;
 
-            consumer_info.locks.clear();
-            consumer_info.topic_partitions.clear();
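+            /// Each consumer caches its own Keeper session handle; refresh it from the storage-level session if it has expired.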
LOG_TRACE(log, "Couldn't acquire locks"); - return true; + return StallReason::CouldNotAcquireLocks; } consumer_info.locks = std::move(*maybe_locks); @@ -1200,7 +1341,7 @@ bool StorageKafka2::streamToViews(size_t idx) if (consumer_info.topic_partitions.empty()) { LOG_TRACE(log, "Consumer {} has assignment, but has no partitions, probably because there are more consumers in the consumer group than partitions.", idx); - return true; + return StallReason::NoPartitions; } LOG_TRACE(log, "Trying to consume from consumer {}", idx); const auto maybe_rows = streamFromConsumer(consumer_info); @@ -1213,22 +1354,22 @@ bool StorageKafka2::streamToViews(size_t idx) else { LOG_DEBUG(log, "Couldn't stream any messages"); - return true; + return StallReason::NoMessages; } } catch (const zkutil::KeeperException & e) { if (Coordination::isHardwareError(e.code)) { - // Clear ephemeral nodes here as we got a new keeper here + LOG_INFO(log, "Cleaning up topic-partitions locks because of exception: {}", e.displayText()); consumer_info.locks.clear(); - consumer_info.keeper = getZooKeeper(); - return true; + activating_task->schedule(); + return StallReason::KeeperSessionEnded; } throw; } - return false; + return {}; } @@ -1300,30 +1441,51 @@ std::optional StorageKafka2::streamFromConsumer(ConsumerAndAssignmentInf } lock_info.committed_offset = last_read_offset + 1; topic_partition.offset = last_read_offset + 1; - consumer_info.consumer->commit(topic_partition); saveCommittedOffset(keeper_to_use, topic_partition); + consumer_info.consumer->commit(topic_partition); lock_info.intent_size.reset(); needs_offset_reset = false; return rows; } - -zkutil::ZooKeeperPtr StorageKafka2::getZooKeeper() +void StorageKafka2::setZooKeeper() +{ + std::unique_lock lock{keeper_mutex}; + keeper = getContext()->getZooKeeper(); +} + +zkutil::ZooKeeperPtr StorageKafka2::tryGetZooKeeper() const { std::unique_lock lock{keeper_mutex}; - if (keeper->expired()) - { - keeper = keeper->startNewSession(); - } return keeper; } +zkutil::ZooKeeperPtr StorageKafka2::getZooKeeper() const +{ + auto res = tryGetZooKeeper(); + if (!res) + throw Exception(ErrorCodes::NO_ZOOKEEPER, "Cannot get ZooKeeper"); + return res; +} + +zkutil::ZooKeeperPtr StorageKafka2::getZooKeeperAndAssertActive() const +{ + auto res = getZooKeeper(); + assertActive(); + return res; +} + +zkutil::ZooKeeperPtr StorageKafka2::getZooKeeperIfTableShutDown() const +{ + zkutil::ZooKeeperPtr new_zookeeper = getContext()->getZooKeeper(); + new_zookeeper->sync(keeper_path); + return new_zookeeper; +} fs::path StorageKafka2::getTopicPartitionPath(const TopicPartition & topic_partition) { - return fs::path(kafka_settings->kafka_keeper_path.value) / "topics" / topic_partition.topic / "partitions" - / std::to_string(topic_partition.partition_id); + return fs::path(keeper_path) / "topics" / topic_partition.topic / "partitions" / std::to_string(topic_partition.partition_id); } } diff --git a/src/Storages/Kafka/StorageKafka2.h b/src/Storages/Kafka/StorageKafka2.h index a2cbdce51a0..99c97caf9da 100644 --- a/src/Storages/Kafka/StorageKafka2.h +++ b/src/Storages/Kafka/StorageKafka2.h @@ -127,8 +127,10 @@ private: }; // Configuration and state - std::mutex keeper_mutex; + mutable std::mutex keeper_mutex; zkutil::ZooKeeperPtr keeper; + String keeper_path; + String replica_path; std::unique_ptr kafka_settings; Macros::MacroExpansionInfo macros_info; const Names topics; @@ -142,7 +144,6 @@ private: LoggerPtr log; Poco::Semaphore semaphore; const SettingsChanges settings_adjustments; - std::atomic 
-    consumer_info.consumer->commit(topic_partition);
     saveCommittedOffset(keeper_to_use, topic_partition);
+    consumer_info.consumer->commit(topic_partition);
     lock_info.intent_size.reset();
     needs_offset_reset = false;
 
     return rows;
 }
 
-
-zkutil::ZooKeeperPtr StorageKafka2::getZooKeeper()
+void StorageKafka2::setZooKeeper()
+{
+    std::unique_lock lock{keeper_mutex};
+    keeper = getContext()->getZooKeeper();
+}
+
+zkutil::ZooKeeperPtr StorageKafka2::tryGetZooKeeper() const
 {
     std::unique_lock lock{keeper_mutex};
-    if (keeper->expired())
-    {
-        keeper = keeper->startNewSession();
-    }
     return keeper;
 }
 
+zkutil::ZooKeeperPtr StorageKafka2::getZooKeeper() const
+{
+    auto res = tryGetZooKeeper();
+    if (!res)
+        throw Exception(ErrorCodes::NO_ZOOKEEPER, "Cannot get ZooKeeper");
+    return res;
+}
+
+zkutil::ZooKeeperPtr StorageKafka2::getZooKeeperAndAssertActive() const
+{
+    auto res = getZooKeeper();
+    assertActive();
+    return res;
+}
+
+zkutil::ZooKeeperPtr StorageKafka2::getZooKeeperIfTableShutDown() const
+{
+    zkutil::ZooKeeperPtr new_zookeeper = getContext()->getZooKeeper();
+    new_zookeeper->sync(keeper_path);
+    return new_zookeeper;
+}
 
 fs::path StorageKafka2::getTopicPartitionPath(const TopicPartition & topic_partition)
 {
-    return fs::path(kafka_settings->kafka_keeper_path.value) / "topics" / topic_partition.topic / "partitions"
-        / std::to_string(topic_partition.partition_id);
+    return fs::path(keeper_path) / "topics" / topic_partition.topic / "partitions" / std::to_string(topic_partition.partition_id);
 }
 
 }
diff --git a/src/Storages/Kafka/StorageKafka2.h b/src/Storages/Kafka/StorageKafka2.h
index a2cbdce51a0..99c97caf9da 100644
--- a/src/Storages/Kafka/StorageKafka2.h
+++ b/src/Storages/Kafka/StorageKafka2.h
@@ -127,8 +127,10 @@ private:
     };
 
     // Configuration and state
-    std::mutex keeper_mutex;
+    mutable std::mutex keeper_mutex;
     zkutil::ZooKeeperPtr keeper;
+    String keeper_path;
+    String replica_path;
     std::unique_ptr<KafkaSettings> kafka_settings;
     Macros::MacroExpansionInfo macros_info;
     const Names topics;
@@ -142,7 +144,6 @@ private:
     LoggerPtr log;
     Poco::Semaphore semaphore;
     const SettingsChanges settings_adjustments;
-    std::atomic<bool> mv_attached = false;
     /// Can differ from num_consumers in case of exception in startup() (or if startup() hasn't been called).
     /// In this case we still need to be able to shutdown() properly.
     size_t num_created_consumers = 0; /// number of actually created consumers.
@@ -156,6 +157,16 @@ private:
     String collection_name;
     std::atomic<bool> shutdown_called = false;
 
+    // Handling replica activation.
+    std::atomic<bool> is_active = false;
+    zkutil::EphemeralNodeHolderPtr replica_is_active_node;
+    BackgroundSchedulePool::TaskHolder activating_task;
+    String active_node_identifier;
+    bool first_time = true;
+    bool activate();
+    void partialShutdown();
+
+    void assertActive() const;
     SettingsChanges createSettingsAdjustments();
     KafkaConsumer2Ptr createConsumer(size_t consumer_number);
     // Returns full consumer related configuration, also the configuration
@@ -186,7 +197,16 @@ private:
     static Names parseTopics(String topic_list);
     static String getDefaultClientId(const StorageID & table_id_);
 
-    bool streamToViews(size_t idx);
+    enum class StallReason
+    {
+        NoAssignment,
+        CouldNotAcquireLocks,
+        NoPartitions,
+        NoMessages,
+        KeeperSessionEnded,
+    };
+
+    std::optional<StallReason> streamToViews(size_t idx);
 
     std::optional<size_t> streamFromConsumer(ConsumerAndAssignmentInfo & consumer_info);
 
@@ -195,7 +215,7 @@ private:
     // Returns true if this is the first replica
     bool createTableIfNotExists();
     // Returns true if all of the nodes were cleaned up
-    bool removeTableNodesFromZooKeeper(const zkutil::EphemeralNodeHolder::Ptr & drop_lock);
+    bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr keeper_to_use, const zkutil::EphemeralNodeHolder::Ptr & drop_lock);
     // Creates only the replica in ZooKeeper. Shouldn't be called on the first replica as it is created in createTableIfNotExists
     void createReplica();
     void dropReplica();
@@ -212,7 +232,12 @@ private:
         Stopwatch & watch,
         const ContextPtr & context);
 
-    zkutil::ZooKeeperPtr getZooKeeper();
+    void setZooKeeper();
+    zkutil::ZooKeeperPtr tryGetZooKeeper() const;
+    zkutil::ZooKeeperPtr getZooKeeper() const;
+    zkutil::ZooKeeperPtr getZooKeeperAndAssertActive() const;
+    zkutil::ZooKeeperPtr getZooKeeperIfTableShutDown() const;
+
     std::filesystem::path getTopicPartitionPath(const TopicPartition & topic_partition);