Fix one more test

János Benjamin Antal 2024-06-08 21:52:28 +00:00
parent 26851f1d34
commit 419660d1b0
5 changed files with 23 additions and 64 deletions

View File

@ -17,7 +17,6 @@
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include "base/scope_guard.h"
namespace CurrentMetrics
{
@ -45,7 +44,6 @@ namespace ErrorCodes
}
using namespace std::chrono_literals;
static constexpr auto MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS = 15000;
static constexpr auto EVENT_POLL_TIMEOUT = 50ms;
static constexpr auto DRAIN_TIMEOUT_MS = 5000ms;
@ -122,7 +120,6 @@ KafkaConsumer2::KafkaConsumer2(
assignment.reset();
queues.clear();
needs_offset_update = true;
waited_for_assignment = 0;
});
consumer->set_rebalance_error_callback(
@ -210,13 +207,8 @@ void KafkaConsumer2::pollEvents()
consumer_has_subscription = !consumer->get_subscription().empty();
}
auto msg = consumer->poll(EVENT_POLL_TIMEOUT);
LOG_TRACE(log, "Consumer has subscription: {}", consumer_has_subscription);
// All the partition queues are detached, so the consumer shouldn't be able to poll any messages
chassert(!msg && "Consumer returned a message when it was not expected");
auto consumer_queue = consumer->get_consumer_queue();
for (auto i = 0; i < max_tries && consumer_queue.get_length() > 0; ++i)
consumer->poll(EVENT_POLL_TIMEOUT);
};
KafkaConsumer2::TopicPartitionCounts KafkaConsumer2::getPartitionCounts() const
@ -321,34 +313,10 @@ ReadBufferPtr KafkaConsumer2::consume(const TopicPartition & topic_partition, co
}
if (new_messages.empty())
{
// While we wait for an assignment after subscription, we'll poll zero messages anyway.
// If we're doing a manual select then it's better to get something after a wait than nothing immediately.
if (!assignment.has_value())
{
waited_for_assignment += poll_timeout; // slightly inaccurate, but a rough calculation is OK.
if (waited_for_assignment < MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS)
{
continue;
}
else
{
LOG_WARNING(log, "Can't get assignment. Will keep trying.");
stalled_status = StalledStatus::NO_ASSIGNMENT;
return nullptr;
}
}
else if (assignment->empty())
{
LOG_TRACE(log, "Empty assignment.");
return nullptr;
}
else
{
LOG_TRACE(log, "Stalled");
return nullptr;
}
}
else
{
messages = std::move(new_messages);
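
For context, the branch deleted above implemented the wait for an assignment inside consume() itself, accumulating the poll timeout on every empty poll. Condensed into a minimal sketch (names taken from the removed lines; the surrounding polling loop is omitted):

// Inside the consume() polling loop, when no messages have arrived yet:
if (!assignment.has_value())
{
    waited_for_assignment += poll_timeout; // slightly inaccurate, but a rough total is enough
    if (waited_for_assignment < MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS)
        continue; // keep polling, an assignment may still arrive
    stalled_status = StalledStatus::NO_ASSIGNMENT;
    return nullptr; // give up for now; the caller will retry later
}

This commit drops the counter from the consumer entirely and moves the bounded wait into StorageKafka2::streamToViews (third file below), keyed on a per-consumer stopwatch instead.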

View File

@ -141,8 +141,6 @@ private:
StalledStatus stalled_status = StalledStatus::NO_MESSAGES_RETURNED;
size_t waited_for_assignment = 0;
const std::atomic<bool> & stopped;
// order is important: these need to be destroyed before the consumer

View File

@ -100,6 +100,7 @@ extern const int TABLE_WAS_NOT_DROPPED;
namespace
{
constexpr auto MAX_FAILED_POLL_ATTEMPTS = 10;
constexpr auto MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS = 15000;
}
StorageKafka2::StorageKafka2(
@ -163,21 +164,6 @@ StorageKafka2::StorageKafka2(
tryLogCurrentException(log);
}
}
// for (auto try_count = 0; try_count < 5; ++try_count)
// {
// bool all_had_assignment = true;
// for (auto & consumer_info : consumers)
// {
// if (nullptr == consumer_info.consumer->getKafkaAssignment())
// {
// all_had_assignment = false;
// consumer_info.consumer->pollEvents();
// }
// }
// if (all_had_assignment)
// break;
// }
const auto first_replica = createTableIfNotExists();
@ -875,8 +861,10 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
KafkaConsumer2 & consumer,
const TopicPartition & topic_partition,
std::optional<int64_t> message_count,
Stopwatch & total_stopwatch,
const ContextPtr & modified_context)
{
LOG_TEST(log, "Polling consumer");
PolledBatchInfo batch_info;
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext());
Block non_virtual_header(storage_snapshot->metadata->getSampleBlockNonMaterialized());
@ -936,9 +924,7 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
? kafka_settings->kafka_flush_interval_ms
: getContext()->getSettingsRef().stream_flush_interval_ms;
- Stopwatch total_stopwatch{CLOCK_MONOTONIC_COARSE};
- const auto check_time_limit = [&max_execution_time, &total_stopwatch]()
+ const auto check_time_limit = [&max_execution_time, &total_stopwatch, this]()
{
if (max_execution_time != 0)
{
@ -1139,8 +1125,6 @@ bool StorageKafka2::streamToViews(size_t idx)
// 7. Execute the pipeline
// 8. Write the offset to Keeper
Stopwatch watch;
auto table_id = getStorageID();
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (!table)
@ -1150,12 +1134,20 @@ bool StorageKafka2::streamToViews(size_t idx)
ProfileEvents::increment(ProfileEvents::KafkaBackgroundReads);
auto & consumer_info = consumers[idx];
consumer_info.watch.restart();
auto & consumer = consumer_info.consumer;
// To keep the consumer alive
const auto wait_for_assignment = consumer_info.locks.empty();
LOG_TRACE(log, "Polling consumer for events");
consumer->pollEvents();
if (wait_for_assignment)
{
while (nullptr == consumer->getKafkaAssignment() && consumer_info.watch.elapsedMilliseconds() < MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS)
consumer->pollEvents();
}
try
{
if (consumer->needsOffsetUpdate() || consumer_info.locks.empty())
@ -1168,6 +1160,7 @@ bool StorageKafka2::streamToViews(size_t idx)
{
// The consumer lost its assignment and hasn't received a new one.
// By returning true this function reports the current consumer as a "stalled" stream, which
LOG_TRACE(log, "No assignment");
return true;
}
LOG_TRACE(log, "Consumer needs offset update");
@ -1181,6 +1174,7 @@ bool StorageKafka2::streamToViews(size_t idx)
if (!maybe_locks.has_value())
{
// We couldn't acquire locks, probably some other consumers are still holding them.
LOG_TRACE(log, "Couldn't acquire locks");
return true;
}
@ -1206,7 +1200,7 @@ bool StorageKafka2::streamToViews(size_t idx)
const auto maybe_rows = streamFromConsumer(consumer_info);
if (maybe_rows.has_value())
{
- const auto milliseconds = watch.elapsedMilliseconds();
+ const auto milliseconds = consumer_info.watch.elapsedMilliseconds();
LOG_DEBUG(log, "Pushing {} rows to {} took {} ms.", formatReadableQuantity(*maybe_rows), table_id.getNameForLogs(), milliseconds);
}
else
@ -1262,8 +1256,8 @@ std::optional<size_t> StorageKafka2::streamFromConsumer(ConsumerAndAssignmentInf
return;
consumer_info.consumer->updateOffsets(consumer_info.topic_partitions);
});
- auto [blocks, last_read_offset]
-     = pollConsumer(*consumer_info.consumer, topic_partition, consumer_info.locks[topic_partition].intent_size, kafka_context);
+ auto [blocks, last_read_offset] = pollConsumer(
+     *consumer_info.consumer, topic_partition, consumer_info.locks[topic_partition].intent_size, consumer_info.watch, kafka_context);
if (blocks.empty())
{
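
The check_time_limit lambda shown earlier in this file follows the usual Stopwatch-based pattern; what changes is that total_stopwatch is now supplied by the caller, so the flush-by-time limit spans the whole streaming attempt rather than just this pollConsumer call. A minimal sketch of the check, assuming max_execution_time is a Poco::Timespan and Stopwatch::elapsed() reports nanoseconds (the added `this` capture is left aside here):

const auto check_time_limit = [&max_execution_time, &total_stopwatch]()
{
    if (max_execution_time != 0)
    {
        // totalMicroseconds() * 1000 converts the limit to nanoseconds for comparison
        if (total_stopwatch.elapsed() > static_cast<UInt64>(max_execution_time.totalMicroseconds()) * 1000)
            return false; // time budget exhausted, stop polling
    }
    return true;
};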

View File

@ -102,6 +102,7 @@ private:
TopicPartitions topic_partitions;
zkutil::ZooKeeperPtr keeper;
TopicPartitionLocks locks;
Stopwatch watch{CLOCK_MONOTONIC_COARSE};
};
struct PolledBatchInfo
@ -208,6 +209,7 @@ private:
KafkaConsumer2 & consumer,
const TopicPartition & topic_partition,
std::optional<int64_t> message_count,
Stopwatch & watch,
const ContextPtr & context);
zkutil::ZooKeeperPtr getZooKeeper();
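
The new Stopwatch member in ConsumerAndAssignmentInfo is what ties these signatures together: streamToViews() restarts it at the start of each attempt, the assignment wait (taken only when the consumer holds no locks yet) uses it as a deadline, and pollConsumer() receives the same instance so its time-limit check measures the whole attempt. A condensed usage sketch, following the call sites in the diff (illustrative, not a literal excerpt):

consumer_info.watch.restart(); // begin timing this streaming attempt
while (nullptr == consumer->getKafkaAssignment()
       && consumer_info.watch.elapsedMilliseconds() < MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS)
    consumer->pollEvents(); // bounded wait for a Kafka assignment
// ... lock acquisition and topic partition selection omitted ...
pollConsumer(*consumer, topic_partition, intent_size, consumer_info.watch, kafka_context);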

View File

@ -2860,13 +2860,10 @@ def test_kafka_produce_consume_avro(kafka_cluster, create_query_generator):
assert int(expected_max_key) == (num_rows - 1) * 10
@pytest.mark.parametrize(
"create_query_generator",
- [
-     generate_old_create_table_query,
-     # TODO(antaljanosbenjamin): Something is off with timing
-     # generate_new_create_table_query
- ],
+ [generate_old_create_table_query, generate_new_create_table_query],
)
def test_kafka_flush_by_time(kafka_cluster, create_query_generator):
admin_client = KafkaAdminClient(