Make StorageKafka2 handle keeper session better

János Benjamin Antal 2024-06-18 11:41:46 +00:00
parent 20bac3ed5f
commit 7fa6111865
2 changed files with 303 additions and 116 deletions

src/Storages/Kafka/StorageKafka2.cpp

@@ -1,5 +1,6 @@
#include <Storages/Kafka/StorageKafka2.h>
#include <Core/ServerUUID.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
@@ -92,6 +93,8 @@ extern const int LOGICAL_ERROR;
extern const int REPLICA_ALREADY_EXISTS;
extern const int TABLE_IS_DROPPED;
extern const int TABLE_WAS_NOT_DROPPED;
extern const int NO_ZOOKEEPER;
extern const int REPLICA_IS_ALREADY_ACTIVE;
}
namespace
@@ -109,6 +112,8 @@ StorageKafka2::StorageKafka2(
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, keeper(getContext()->getZooKeeper())
, keeper_path(kafka_settings_->kafka_keeper_path.value)
, replica_path(keeper_path + "/replicas/" + kafka_settings_->kafka_replica_name.value)
, kafka_settings(std::move(kafka_settings_))
, macros_info{.table_id = table_id_}
, topics(parseTopics(getContext()->getMacros()->expand(kafka_settings->kafka_topic_list.value, macros_info)))
@@ -122,11 +127,12 @@ StorageKafka2::StorageKafka2(
, max_rows_per_message(kafka_settings->kafka_max_rows_per_message.value)
, schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value, macros_info))
, num_consumers(kafka_settings->kafka_num_consumers.value)
, log(getLogger("StorageKafka2 (" + table_id_.table_name + ")"))
, log(getLogger(String("StorageKafka2 ") + table_id_.getNameForLogs()))
, semaphore(0, static_cast<int>(num_consumers))
, settings_adjustments(createSettingsAdjustments())
, thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value)
, collection_name(collection_name_)
, active_node_identifier(toString(ServerUUID::get()))
{
if (kafka_settings->kafka_num_consumers > 1 && !thread_per_consumer)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "With multiple consumer you have to use thread per consumer!");
@@ -149,23 +155,13 @@ StorageKafka2::StorageKafka2(
tasks.emplace_back(std::make_shared<TaskContext>(std::move(task)));
}
for (size_t i = 0; i < num_consumers; ++i)
{
try
{
consumers.push_back(ConsumerAndAssignmentInfo{.consumer = createConsumer(i), .keeper = keeper});
++num_created_consumers;
}
catch (const cppkafka::Exception &)
{
tryLogCurrentException(log);
}
}
const auto first_replica = createTableIfNotExists();
if (!first_replica)
createReplica();
activating_task = getContext()->getSchedulePool().createTask(log->name() + "(activating task)", [this]() { activate(); });
activating_task->deactivate();
}
VirtualColumnsDescription StorageKafka2::createVirtuals(StreamingHandleErrorMode handle_error_mode)
@@ -189,6 +185,150 @@ VirtualColumnsDescription StorageKafka2::createVirtuals(StreamingHandleErrorMode
return desc;
}
void StorageKafka2::partialShutdown()
{
for (auto & task : tasks)
{
LOG_TRACE(log, "Cancelling streams");
task->stream_cancelled = true;
}
for (auto & task : tasks)
{
LOG_TRACE(log, "Waiting for cleanup");
task->holder->deactivate();
}
is_active = false;
}
bool StorageKafka2::activate()
{
LOG_TEST(log, "activate task");
if (is_active && !getZooKeeper()->expired())
{
LOG_TEST(log, "No need to activate");
return true;
}
if (first_time)
{
LOG_DEBUG(log, "Activating replica");
assert(!is_active);
}
else if (!is_active)
{
LOG_WARNING(log, "Table was not active. Will try to activate it");
}
else if (getZooKeeper()->expired())
{
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session");
partialShutdown();
}
else
{
UNREACHABLE();
}
try
{
setZooKeeper();
}
catch (const Coordination::Exception &)
{
/// The exception when you try to zookeeper_init usually happens if DNS does not work or the connection with ZK fails
tryLogCurrentException(log, "Failed to establish a new ZK connection. Will try again");
assert(!is_active);
return false;
}
if (shutdown_called)
return false;
auto activate_in_keeper = [this]()
{
try
{
auto zookeeper = getZooKeeper();
String is_active_path = fs::path(replica_path) / "is_active";
zookeeper->deleteEphemeralNodeIfContentMatches(is_active_path, active_node_identifier);
/// Simultaneously declare that this replica is active, and update the host.
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(is_active_path, active_node_identifier, zkutil::CreateMode::Ephemeral));
try
{
zookeeper->create(is_active_path, active_node_identifier, zkutil::CreateMode::Ephemeral);
}
catch (const Coordination::Exception & e)
{
if (e.code == Coordination::Error::ZNODEEXISTS)
throw Exception(
ErrorCodes::REPLICA_IS_ALREADY_ACTIVE,
"Replica {} appears to be already active. If you're sure it's not, "
"try again in a minute or remove znode {}/is_active manually",
replica_path,
replica_path);
throw;
}
replica_is_active_node = zkutil::EphemeralNodeHolder::existing(is_active_path, *zookeeper);
return true;
}
catch (...)
{
replica_is_active_node = nullptr;
try
{
throw;
}
catch (const Coordination::Exception & e)
{
LOG_ERROR(log, "Couldn't start replica: {}. {}", e.what(), DB::getCurrentExceptionMessage(true));
return false;
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::REPLICA_IS_ALREADY_ACTIVE)
throw;
LOG_ERROR(log, "Couldn't start replica: {}. {}", e.what(), DB::getCurrentExceptionMessage(true));
return false;
}
}
};
if (!activate_in_keeper())
{
assert(storage.is_readonly);
return false;
}
is_active = true;
// Start the reader threads
for (auto & task : tasks)
{
task->stream_cancelled = false;
task->holder->activateAndSchedule();
}
if (first_time)
first_time = false;
LOG_DEBUG(log, "Table activated successfully");
return true;
}
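The activate() above recovers from a lost Keeper session in three steps: stop the streaming tasks, open a fresh session, and re-register the replica through the ephemeral is_active node before the reader tasks are restarted. Below is a minimal, self-contained sketch of that control flow; KeeperSession, registerEphemeral and ReplicaActivator are hypothetical stand-ins used for illustration, not ClickHouse types.

#include <functional>
#include <memory>
#include <string>

// Hypothetical stand-in for a Keeper session; not a ClickHouse API.
struct KeeperSession
{
    bool expired() const { return expired_flag; }
    // Pretends to create the ephemeral is_active node; false means it already exists.
    bool registerEphemeral(const std::string & path, const std::string & id) { return !path.empty() && !id.empty(); }
    bool expired_flag = false;
};

// Illustrative sketch of the activation flow, not the actual StorageKafka2 code.
struct ReplicaActivator
{
    std::shared_ptr<KeeperSession> session;
    std::string replica_path;
    std::string node_id;
    bool is_active = false;

    void partialShutdown() { is_active = false; /* reader tasks would be stopped here */ }

    bool activate(const std::function<std::shared_ptr<KeeperSession>()> & open_new_session)
    {
        if (is_active && !session->expired())
            return true; // session is healthy, nothing to do

        if (is_active && session->expired())
            partialShutdown(); // the session is gone: stop streaming before re-registering

        session = open_new_session(); // a failure here means "retry on the next scheduled run"

        if (!session->registerEphemeral(replica_path + "/is_active", node_id))
            return false; // another replica with the same name still holds the ephemeral node

        is_active = true; // reader tasks would be restarted here
        return true;
    }
};

In the diff, activate() additionally returns early when shutdown_called is set, and it is the activating_task created in the constructor that runs it in the background (for example, streamToViews schedules it again after a Keeper hardware error).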
void StorageKafka2::assertActive() const
{
// TODO(antaljanosbenjamin): change LOGICAL_ERROR to something sensible
if (!is_active)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table is not active (replica path: {})", replica_path);
}
SettingsChanges StorageKafka2::createSettingsAdjustments()
{
@@ -282,7 +422,8 @@ void StorageKafka2::startup()
{
try
{
consumers.emplace_back(ConsumerAndAssignmentInfo{.consumer = createConsumer(i), .keeper = keeper});
consumers.push_back(ConsumerAndAssignmentInfo{.consumer = createConsumer(i), .keeper = getZooKeeper()});
LOG_DEBUG(log, "Created #{} consumer", num_created_consumers);
++num_created_consumers;
}
catch (const cppkafka::Exception &)
@@ -290,28 +431,15 @@ void StorageKafka2::startup()
tryLogCurrentException(log);
}
}
// Start the reader thread
for (auto & task : tasks)
task->holder->activateAndSchedule();
activating_task->activateAndSchedule();
}
void StorageKafka2::shutdown(bool)
{
shutdown_called = true;
for (auto & task : tasks)
{
LOG_TRACE(log, "Cancelling streams");
// Interrupt streaming thread
task->stream_cancelled = true;
}
for (auto & task : tasks)
{
LOG_TRACE(log, "Waiting for cleanup");
task->holder->deactivate();
}
activating_task->deactivate();
partialShutdown();
LOG_TRACE(log, "Closing consumers");
consumers.clear();
LOG_TRACE(log, "Consumers closed");
@@ -372,7 +500,7 @@ cppkafka::Configuration StorageKafka2::getConsumerConfiguration(size_t consumer_
conf.set("enable.partition.eof", "false"); // Ignore EOF messages
for (auto & property : conf.get_all())
LOG_TRACE(log, "Consumer set property {}:{}", property.first, property.second);
LOG_TEST(log, "Consumer set property {}:{}", property.first, property.second);
return conf;
}
@@ -389,7 +517,7 @@ cppkafka::Configuration StorageKafka2::getProducerConfiguration()
updateProducerConfiguration(conf);
for (auto & property : conf.get_all())
LOG_TRACE(log, "Producer set property {}:{}", property.first, property.second);
LOG_TEST(log, "Producer set property {}:{}", property.first, property.second);
return conf;
}
@@ -558,32 +686,32 @@ std::optional<int64_t> getNumber(zkutil::ZooKeeper & keeper, const fs::path & pa
bool StorageKafka2::createTableIfNotExists()
{
const auto & keeper_path = fs::path(kafka_settings->kafka_keeper_path.value);
const auto & replicas_path = keeper_path / "replicas";
// Heavily based on StorageReplicatedMergeTree::createTableIfNotExists
const auto my_keeper_path = fs::path(keeper_path);
const auto replicas_path = my_keeper_path / "replicas";
for (auto i = 0; i < 1000; ++i)
{
if (keeper->exists(replicas_path))
{
LOG_DEBUG(log, "This table {} is already created, will add new replica", String(keeper_path));
LOG_DEBUG(log, "This table {} is already created, will add new replica", keeper_path);
return false;
}
/// There are leftovers from incompletely dropped table.
if (keeper->exists(keeper_path / "dropped"))
if (keeper->exists(my_keeper_path / "dropped"))
{
/// This condition may happen when the previous drop attempt was not completed
/// or when table is dropped by another replica right now.
/// This is Ok because another replica is definitely going to drop the table.
LOG_WARNING(log, "Removing leftovers from table {} (this might take several minutes)", String(keeper_path));
String drop_lock_path = keeper_path / "dropped" / "lock";
LOG_WARNING(log, "Removing leftovers from table {}", keeper_path);
String drop_lock_path = my_keeper_path / "dropped" / "lock";
Coordination::Error code = keeper->tryCreate(drop_lock_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
{
LOG_WARNING(log, "The leftovers from table {} were removed by another replica", String(keeper_path));
LOG_WARNING(log, "The leftovers from table {} were removed by another replica", keeper_path);
}
else if (code != Coordination::Error::ZOK)
{
@@ -592,7 +720,7 @@ bool StorageKafka2::createTableIfNotExists()
else
{
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *keeper);
if (!removeTableNodesFromZooKeeper(metadata_drop_lock))
if (!removeTableNodesFromZooKeeper(keeper, metadata_drop_lock))
{
/// Someone is recursively removing table right now, we cannot create new table until old one is removed
continue;
@@ -605,7 +733,7 @@ bool StorageKafka2::createTableIfNotExists()
ops.emplace_back(zkutil::makeCreateRequest(keeper_path, "", zkutil::CreateMode::Persistent));
const auto topics_path = keeper_path / "topics";
const auto topics_path = my_keeper_path / "topics";
ops.emplace_back(zkutil::makeCreateRequest(topics_path, "", zkutil::CreateMode::Persistent));
for (const auto & topic : topics)
@@ -621,15 +749,14 @@ bool StorageKafka2::createTableIfNotExists()
// Create the first replica
ops.emplace_back(zkutil::makeCreateRequest(replicas_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(
zkutil::makeCreateRequest(replicas_path / kafka_settings->kafka_replica_name.value, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path, "", zkutil::CreateMode::Persistent));
Coordination::Responses responses;
const auto code = keeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "It looks like the table {} was created by another replica at the same moment, will retry", String(keeper_path));
LOG_INFO(log, "It looks like the table {} was created by another replica at the same moment, will retry", keeper_path);
continue;
}
else if (code != Coordination::Error::ZOK)
@@ -637,7 +764,7 @@ bool StorageKafka2::createTableIfNotExists()
zkutil::KeeperMultiException::check(code, ops, responses);
}
LOG_INFO(log, "Table {} created successfully ", String(keeper_path));
LOG_INFO(log, "Table {} created successfully ", keeper_path);
return true;
}
@@ -649,25 +776,25 @@ bool StorageKafka2::createTableIfNotExists()
}
bool StorageKafka2::removeTableNodesFromZooKeeper(const zkutil::EphemeralNodeHolder::Ptr & drop_lock)
bool StorageKafka2::removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr keeper_to_use, const zkutil::EphemeralNodeHolder::Ptr & drop_lock)
{
bool completely_removed = false;
Strings children;
if (const auto code = keeper->tryGetChildren(kafka_settings->kafka_keeper_path.value, children); code == Coordination::Error::ZNONODE)
if (const auto code = keeper_to_use->tryGetChildren(keeper_path, children); code == Coordination::Error::ZNONODE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is a race condition between creation and removal. It's a bug");
const auto keeper_path = fs::path(kafka_settings->kafka_keeper_path.value);
const auto my_keeper_path = fs::path(keeper_path);
for (const auto & child : children)
if (child != "dropped")
keeper->tryRemoveRecursive(keeper_path / child);
keeper_to_use->tryRemoveRecursive(my_keeper_path / child);
Coordination::Requests ops;
Coordination::Responses responses;
ops.emplace_back(zkutil::makeRemoveRequest(drop_lock->getPath(), -1));
ops.emplace_back(zkutil::makeRemoveRequest(keeper_path / "dropped", -1));
ops.emplace_back(zkutil::makeRemoveRequest(keeper_path, -1));
const auto code = keeper->tryMulti(ops, responses, /* check_session_valid */ true);
ops.emplace_back(zkutil::makeRemoveRequest(my_keeper_path / "dropped", -1));
ops.emplace_back(zkutil::makeRemoveRequest(my_keeper_path, -1));
const auto code = keeper_to_use->tryMulti(ops, responses, /* check_session_valid */ true);
if (code == Coordination::Error::ZNONODE)
{
@@ -680,7 +807,7 @@ bool StorageKafka2::removeTableNodesFromZooKeeper(const zkutil::EphemeralNodeHol
log,
"Table was not completely removed from Keeper, {} still exists and may contain some garbage,"
"but someone is removing it right now.",
kafka_settings->kafka_keeper_path.value);
keeper_path);
}
else if (code != Coordination::Error::ZOK)
{
@@ -691,7 +818,7 @@ bool StorageKafka2::removeTableNodesFromZooKeeper(const zkutil::EphemeralNodeHol
{
drop_lock->setAlreadyRemoved();
completely_removed = true;
LOG_INFO(log, "Table {} was successfully removed from ZooKeeper", kafka_settings->kafka_keeper_path.value);
LOG_INFO(log, "Table {} was successfully removed from ZooKeeper", keeper_path);
}
return completely_removed;
@@ -699,45 +826,50 @@ bool StorageKafka2::removeTableNodesFromZooKeeper(const zkutil::EphemeralNodeHol
void StorageKafka2::createReplica()
{
const auto replica_path = kafka_settings->kafka_keeper_path.value + "/replicas/" + kafka_settings->kafka_replica_name.value;
LOG_INFO(log, "Creating replica {}", replica_path);
// TODO: This can cause issues if a new table is created with the same path. To make this work, we should store some metadata
// about the table to be able to identify that the same table is created, not a new one.
const auto code = keeper->tryCreate(replica_path, "", zkutil::CreateMode::Persistent);
if (code == Coordination::Error::ZNODEEXISTS)
throw Exception(ErrorCodes::REPLICA_ALREADY_EXISTS, "Replica {} already exists", replica_path);
else if (code == Coordination::Error::ZNONODE)
throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {} was suddenly removed", kafka_settings->kafka_keeper_path.value);
else if (code != Coordination::Error::ZOK)
throw Coordination::Exception::fromPath(code, replica_path);
LOG_INFO(log, "Replica {} created", replica_path);
switch (code)
{
case Coordination::Error::ZNODEEXISTS:
LOG_INFO(log, "Replica {} already exists, will try to use it", replica_path);
break;
case Coordination::Error::ZOK:
LOG_INFO(log, "Replica {} created", replica_path);
break;
case Coordination::Error::ZNONODE:
throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {} was suddenly removed", keeper_path);
default:
throw Coordination::Exception::fromPath(code, replica_path);
}
}
void StorageKafka2::dropReplica()
{
if (keeper->expired())
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Table was not dropped because ZooKeeper session has expired.");
auto replica_path = kafka_settings->kafka_keeper_path.value + "/replicas/" + kafka_settings->kafka_replica_name.value;
LOG_INFO(log, "Trying to drop replica {}", replica_path);
auto my_keeper = getZooKeeperIfTableShutDown();
LOG_INFO(log, "Removing replica {}", replica_path);
if (!keeper->exists(replica_path))
if (!my_keeper->exists(replica_path))
{
LOG_INFO(log, "Removing replica {} does not exist", replica_path);
return;
}
{
keeper->tryRemoveChildrenRecursive(replica_path);
my_keeper->tryRemoveChildrenRecursive(replica_path);
if (keeper->tryRemove(replica_path) != Coordination::Error::ZOK)
if (my_keeper->tryRemove(replica_path) != Coordination::Error::ZOK)
LOG_ERROR(log, "Replica was not completely removed from Keeper, {} still exists and may contain some garbage.", replica_path);
}
/// Check that `zookeeper_path` exists: it could have been deleted by another replica after execution of previous line.
Strings replicas;
if (Coordination::Error::ZOK != keeper->tryGetChildren(kafka_settings->kafka_keeper_path.value + "/replicas", replicas)
|| !replicas.empty())
if (Coordination::Error::ZOK != my_keeper->tryGetChildren(keeper_path + "/replicas", replicas) || !replicas.empty())
return;
LOG_INFO(log, "{} is the last replica, will remove table", replica_path);
@@ -758,11 +890,12 @@ void StorageKafka2::dropReplica()
/// (The existence of child node does not allow to remove parent node).
Coordination::Requests ops;
Coordination::Responses responses;
String drop_lock_path = kafka_settings->kafka_keeper_path.value + "/dropped/lock";
ops.emplace_back(zkutil::makeRemoveRequest(kafka_settings->kafka_keeper_path.value + "/replicas", -1));
ops.emplace_back(zkutil::makeCreateRequest(kafka_settings->kafka_keeper_path.value + "/dropped", "", zkutil::CreateMode::Persistent));
fs::path my_keeper_path = keeper_path;
String drop_lock_path = my_keeper_path / "dropped" / "lock";
ops.emplace_back(zkutil::makeRemoveRequest(my_keeper_path / "replicas", -1));
ops.emplace_back(zkutil::makeCreateRequest(my_keeper_path / "dropped", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(drop_lock_path, "", zkutil::CreateMode::Ephemeral));
Coordination::Error code = keeper->tryMulti(ops, responses);
Coordination::Error code = my_keeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
{
@@ -778,9 +911,9 @@ void StorageKafka2::dropReplica()
}
else
{
auto drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *keeper);
LOG_INFO(log, "Removing table {} (this might take several minutes)", kafka_settings->kafka_keeper_path.value);
removeTableNodesFromZooKeeper(drop_lock);
auto drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *my_keeper);
LOG_INFO(log, "Removing table {} (this might take several minutes)", keeper_path);
removeTableNodesFromZooKeeper(my_keeper, drop_lock);
}
}
@@ -1073,6 +1206,7 @@ void StorageKafka2::threadFunc(size_t idx)
{
assert(idx < tasks.size());
auto task = tasks[idx];
std::optional<StallReason> maybe_stall_reason;
try
{
auto table_id = getStorageID();
@@ -1082,21 +1216,19 @@ void StorageKafka2::threadFunc(size_t idx)
{
auto start_time = std::chrono::steady_clock::now();
mv_attached.store(true);
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!task->stream_cancelled && num_created_consumers > 0)
{
maybe_stall_reason.reset();
if (!checkDependencies(table_id))
break;
LOG_DEBUG(log, "Started streaming to {} attached views", num_views);
// Exit the loop & reschedule if some stream stalled
auto some_stream_is_stalled = streamToViews(idx);
if (some_stream_is_stalled)
if (maybe_stall_reason = streamToViews(idx); maybe_stall_reason.has_value())
{
LOG_TRACE(log, "Stream(s) stalled. Reschedule.");
LOG_TRACE(log, "Stream stalled.");
break;
}
@@ -1115,14 +1247,18 @@ void StorageKafka2::threadFunc(size_t idx)
tryLogCurrentException(__PRETTY_FUNCTION__);
}
mv_attached.store(false);
// Wait for attached views
if (!task->stream_cancelled)
task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS);
{
// Keeper-related problems should be resolved relatively quickly, so it makes sense to wait less before retrying
if (maybe_stall_reason.has_value()
&& (*maybe_stall_reason == StallReason::KeeperSessionEnded || *maybe_stall_reason == StallReason::CouldNotAcquireLocks))
task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS / 10);
else
task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS);
}
}
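The StallReason returned by streamToViews lets threadFunc retry Keeper-related stalls (expired session, locks held by another replica) roughly ten times sooner than data-related ones. A compilable sketch of that decision is below; the rescheduleDelay helper and the 500 ms constant are assumptions made for illustration, only the enum values mirror the ones introduced here.

#include <chrono>
#include <optional>

// Mirrors the StallReason enum added in this commit.
enum class StallReason
{
    NoAssignment,
    CouldNotAcquireLocks,
    NoPartitions,
    NoMessages,
    KeeperSessionEnded,
};

// Illustrative value; the real KAFKA_RESCHEDULE_MS constant lives in StorageKafka2.cpp.
constexpr auto KAFKA_RESCHEDULE_MS = std::chrono::milliseconds{500};

// Keeper-related problems are expected to resolve quickly, so those stalls are retried sooner.
std::chrono::milliseconds rescheduleDelay(std::optional<StallReason> reason)
{
    if (reason == StallReason::KeeperSessionEnded || reason == StallReason::CouldNotAcquireLocks)
        return KAFKA_RESCHEDULE_MS / 10;
    return KAFKA_RESCHEDULE_MS;
}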
bool StorageKafka2::streamToViews(size_t idx)
std::optional<StorageKafka2::StallReason> StorageKafka2::streamToViews(size_t idx)
{
// This function is written assuming that each consumer has their own thread. This means once this is changed, this function should be revisited.
// The return values should be revisited, as stalling all consumers because of a single one stalled is not a good idea.
@@ -1140,21 +1276,24 @@ bool StorageKafka2::streamToViews(size_t idx)
// To keep the consumer alive
const auto wait_for_assignment = consumer_info.locks.empty();
LOG_TRACE(log, "Polling consumer for events");
LOG_TRACE(log, "Polling consumer {} for events", idx);
consumer->pollEvents();
if (wait_for_assignment)
{
while (nullptr == consumer->getKafkaAssignment() && consumer_info.watch.elapsedMilliseconds() < MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS)
consumer->pollEvents();
LOG_INFO(log, "Consumer has assignment: {}", nullptr == consumer->getKafkaAssignment());
}
try
{
if (consumer->needsOffsetUpdate() || consumer_info.locks.empty())
{
LOG_TRACE(log, "Consumer needs update offset");
// First release the locks so let other consumers acquire them ASAP
consumer_info.locks.clear();
consumer_info.topic_partitions.clear();
const auto * current_assignment = consumer->getKafkaAssignment();
if (current_assignment == nullptr)
@@ -1162,13 +1301,15 @@ bool StorageKafka2::streamToViews(size_t idx)
// The consumer lost its assignment and hasn't received a new one.
// By returning a stall reason this function reports the current consumer as a "stalled" stream.
LOG_TRACE(log, "No assignment");
return true;
return StallReason::NoAssignment;
}
LOG_TRACE(log, "Consumer needs update offset");
consumer_info.consume_from_topic_partition_index = 0;
consumer_info.locks.clear();
consumer_info.topic_partitions.clear();
if (consumer_info.keeper->expired())
{
consumer_info.keeper = getZooKeeperAndAssertActive();
LOG_TEST(log, "Got new zookeeper");
}
auto maybe_locks = lockTopicPartitions(*consumer_info.keeper, *current_assignment);
@@ -1176,7 +1317,7 @@ bool StorageKafka2::streamToViews(size_t idx)
{
// We couldn't acquire locks, probably some other consumers are still holding them.
LOG_TRACE(log, "Couldn't acquire locks");
return true;
return StallReason::CouldNotAcquireLocks;
}
consumer_info.locks = std::move(*maybe_locks);
@@ -1200,7 +1341,7 @@ bool StorageKafka2::streamToViews(size_t idx)
if (consumer_info.topic_partitions.empty())
{
LOG_TRACE(log, "Consumer {} has assignment, but has no partitions, probably because there are more consumers in the consumer group than partitions.", idx);
return true;
return StallReason::NoPartitions;
}
LOG_TRACE(log, "Trying to consume from consumer {}", idx);
const auto maybe_rows = streamFromConsumer(consumer_info);
@@ -1213,22 +1354,22 @@ bool StorageKafka2::streamToViews(size_t idx)
else
{
LOG_DEBUG(log, "Couldn't stream any messages");
return true;
return StallReason::NoMessages;
}
}
catch (const zkutil::KeeperException & e)
{
if (Coordination::isHardwareError(e.code))
{
// Clear ephemeral nodes here as we got a new keeper here
LOG_INFO(log, "Cleaning up topic-partitions locks because of exception: {}", e.displayText());
consumer_info.locks.clear();
consumer_info.keeper = getZooKeeper();
return true;
activating_task->schedule();
return StallReason::KeeperSessionEnded;
}
throw;
}
return false;
return {};
}
@@ -1300,30 +1441,51 @@ std::optional<size_t> StorageKafka2::streamFromConsumer(ConsumerAndAssignmentInf
}
lock_info.committed_offset = last_read_offset + 1;
topic_partition.offset = last_read_offset + 1;
consumer_info.consumer->commit(topic_partition);
saveCommittedOffset(keeper_to_use, topic_partition);
consumer_info.consumer->commit(topic_partition);
lock_info.intent_size.reset();
needs_offset_reset = false;
return rows;
}
zkutil::ZooKeeperPtr StorageKafka2::getZooKeeper()
void StorageKafka2::setZooKeeper()
{
std::unique_lock lock{keeper_mutex};
keeper = getContext()->getZooKeeper();
}
zkutil::ZooKeeperPtr StorageKafka2::tryGetZooKeeper() const
{
std::unique_lock lock{keeper_mutex};
if (keeper->expired())
{
keeper = keeper->startNewSession();
}
return keeper;
}
zkutil::ZooKeeperPtr StorageKafka2::getZooKeeper() const
{
auto res = tryGetZooKeeper();
if (!res)
throw Exception(ErrorCodes::NO_ZOOKEEPER, "Cannot get ZooKeeper");
return res;
}
zkutil::ZooKeeperPtr StorageKafka2::getZooKeeperAndAssertActive() const
{
auto res = getZooKeeper();
assertActive();
return res;
}
zkutil::ZooKeeperPtr StorageKafka2::getZooKeeperIfTableShutDown() const
{
zkutil::ZooKeeperPtr new_zookeeper = getContext()->getZooKeeper();
new_zookeeper->sync(keeper_path);
return new_zookeeper;
}
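The accessors above keep one cached Keeper session behind keeper_mutex and transparently start a new one once expired() reports the old session is gone. The following self-contained sketch shows the same accessor pattern; Session and KeeperHolder are hypothetical stand-ins rather than zkutil types.

#include <memory>
#include <mutex>

// Hypothetical stand-in for zkutil::ZooKeeper, reduced to the two calls the accessor needs.
struct Session
{
    bool expired() const { return expired_flag; }
    std::shared_ptr<Session> startNewSession() const { return std::make_shared<Session>(); }
    bool expired_flag = false;
};

class KeeperHolder
{
public:
    // All readers share one cached session; once it expires it is replaced under the mutex,
    // so no caller is handed a session that is already known to be dead.
    std::shared_ptr<Session> tryGet() const
    {
        std::unique_lock lock{mutex};
        if (session->expired())
            session = session->startNewSession();
        return session;
    }

private:
    mutable std::mutex mutex;
    mutable std::shared_ptr<Session> session = std::make_shared<Session>();
};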
fs::path StorageKafka2::getTopicPartitionPath(const TopicPartition & topic_partition)
{
return fs::path(kafka_settings->kafka_keeper_path.value) / "topics" / topic_partition.topic / "partitions"
/ std::to_string(topic_partition.partition_id);
return fs::path(keeper_path) / "topics" / topic_partition.topic / "partitions" / std::to_string(topic_partition.partition_id);
}
}

src/Storages/Kafka/StorageKafka2.h

@@ -127,8 +127,10 @@ private:
};
// Configuration and state
std::mutex keeper_mutex;
mutable std::mutex keeper_mutex;
zkutil::ZooKeeperPtr keeper;
String keeper_path;
String replica_path;
std::unique_ptr<KafkaSettings> kafka_settings;
Macros::MacroExpansionInfo macros_info;
const Names topics;
@@ -142,7 +144,6 @@ private:
LoggerPtr log;
Poco::Semaphore semaphore;
const SettingsChanges settings_adjustments;
std::atomic<bool> mv_attached = false;
/// Can differ from num_consumers in case of exception in startup() (or if startup() hasn't been called).
/// In this case we still need to be able to shutdown() properly.
size_t num_created_consumers = 0; /// number of actually created consumers.
@@ -156,6 +157,16 @@ private:
String collection_name;
std::atomic<bool> shutdown_called = false;
// Handling replica activation.
std::atomic<bool> is_active = false;
zkutil::EphemeralNodeHolderPtr replica_is_active_node;
BackgroundSchedulePool::TaskHolder activating_task;
String active_node_identifier;
bool first_time = true;
bool activate();
void partialShutdown();
void assertActive() const;
SettingsChanges createSettingsAdjustments();
KafkaConsumer2Ptr createConsumer(size_t consumer_number);
// Returns full consumer related configuration, also the configuration
@@ -186,7 +197,16 @@ private:
static Names parseTopics(String topic_list);
static String getDefaultClientId(const StorageID & table_id_);
bool streamToViews(size_t idx);
enum class StallReason
{
NoAssignment,
CouldNotAcquireLocks,
NoPartitions,
NoMessages,
KeeperSessionEnded,
};
std::optional<StallReason> streamToViews(size_t idx);
std::optional<size_t> streamFromConsumer(ConsumerAndAssignmentInfo & consumer_info);
@@ -195,7 +215,7 @@ private:
// Returns true if this is the first replica
bool createTableIfNotExists();
// Returns true if all of the nodes were cleaned up
bool removeTableNodesFromZooKeeper(const zkutil::EphemeralNodeHolder::Ptr & drop_lock);
bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr keeper_to_use, const zkutil::EphemeralNodeHolder::Ptr & drop_lock);
// Creates only the replica in ZooKeeper. Shouldn't be called on the first replica as it is created in createTableIfNotExists
void createReplica();
void dropReplica();
@@ -212,7 +232,12 @@ private:
Stopwatch & watch,
const ContextPtr & context);
zkutil::ZooKeeperPtr getZooKeeper();
void setZooKeeper();
zkutil::ZooKeeperPtr tryGetZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeperAndAssertActive() const;
zkutil::ZooKeeperPtr getZooKeeperIfTableShutDown() const;
std::filesystem::path getTopicPartitionPath(const TopicPartition & topic_partition);