kafka_dead_letter_queue: initial, legacy tests work

This commit is contained in:
Ilya Golshtein 2024-08-21 07:35:14 +00:00
parent 1513eda3ef
commit b0887af6c6
18 changed files with 151 additions and 95 deletions

View File

@ -18,6 +18,7 @@
#include <Interpreters/TransactionsInfoLog.h>
#include <Interpreters/AsynchronousInsertLog.h>
#include <Interpreters/BackupLog.h>
#include <Interpreters/KafkaDeadLetterQueue.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Common/MemoryTrackerBlockerInThread.h>

View File

@ -32,6 +32,7 @@
M(AsynchronousInsertLogElement) \
M(BackupLogElement) \
M(BlobStorageLogElement) \
M(DeadLetterQueueElement) \
M(ErrorLogElement)
namespace Poco

View File

@ -141,6 +141,11 @@ IMPLEMENT_SETTING_ENUM(StreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS,
{{"default", StreamingHandleErrorMode::DEFAULT},
{"stream", StreamingHandleErrorMode::STREAM}})
IMPLEMENT_SETTING_ENUM(ExtStreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS,
{{"default", ExtStreamingHandleErrorMode::DEFAULT},
{"stream", ExtStreamingHandleErrorMode::STREAM},
{"dead_letter_queue", ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE}})
IMPLEMENT_SETTING_ENUM(ShortCircuitFunctionEvaluation, ErrorCodes::BAD_ARGUMENTS,
{{"enable", ShortCircuitFunctionEvaluation::ENABLE},
{"force_enable", ShortCircuitFunctionEvaluation::FORCE_ENABLE},

View File

@ -261,11 +261,20 @@ enum class StreamingHandleErrorMode : uint8_t
{
DEFAULT = 0, // Ignore errors with threshold.
STREAM, // Put errors to stream in the virtual column named ``_error.
};
DECLARE_SETTING_ENUM(StreamingHandleErrorMode)
enum class ExtStreamingHandleErrorMode : uint8_t
{
DEFAULT = 0, // Ignore errors with threshold.
STREAM, // Put errors to stream in the virtual column named ``_error.
DEAD_LETTER_QUEUE
/*FIXED_SYSTEM_TABLE, Put errors to in a fixed system table likely system.kafka_errors. This is not implemented now. */
/*CUSTOM_SYSTEM_TABLE, Put errors to in a custom system table. This is not implemented now. */
};
DECLARE_SETTING_ENUM(StreamingHandleErrorMode)
DECLARE_SETTING_ENUM(ExtStreamingHandleErrorMode)
DECLARE_SETTING_ENUM(ShortCircuitFunctionEvaluation)

View File

@ -4143,7 +4143,6 @@ std::shared_ptr<PartLog> Context::getPartLog(const String & part_database) const
return shared->system_logs->part_log;
}
std::shared_ptr<TraceLog> Context::getTraceLog() const
{
SharedLockGuard lock(shared->mutex);
@ -4313,6 +4312,15 @@ std::shared_ptr<BlobStorageLog> Context::getBlobStorageLog() const
return shared->system_logs->blob_storage_log;
}
std::shared_ptr<DeadLetterQueue> Context::getDeadLetterQueue() const
{
SharedLockGuard lock(shared->mutex);
if (!shared->system_logs->dead_letter_queue)
return {};
return shared->system_logs->dead_letter_queue;
}
SystemLogs Context::getSystemLogs() const
{
SharedLockGuard lock(shared->mutex);

View File

@ -114,6 +114,7 @@ class ObjectStorageQueueLog;
class AsynchronousInsertLog;
class BackupLog;
class BlobStorageLog;
class DeadLetterQueue;
class IAsynchronousReader;
class IOUringReader;
struct MergeTreeSettings;
@ -1151,6 +1152,7 @@ public:
std::shared_ptr<AsynchronousInsertLog> getAsynchronousInsertLog() const;
std::shared_ptr<BackupLog> getBackupLog() const;
std::shared_ptr<BlobStorageLog> getBlobStorageLog() const;
std::shared_ptr<DeadLetterQueue> getDeadLetterQueue() const;
SystemLogs getSystemLogs() const;

View File

@ -28,6 +28,7 @@
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/QueryViewsLog.h>
#include <Interpreters/ObjectStorageQueueLog.h>
#include <Interpreters/KafkaDeadLetterQueue.h>
#include <Interpreters/SessionLog.h>
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>

View File

@ -30,6 +30,7 @@
M(AsynchronousInsertLog, asynchronous_insert_log, "Contains a history for all asynchronous inserts executed on current server.") \
M(BackupLog, backup_log, "Contains logging entries with the information about BACKUP and RESTORE operations.") \
M(BlobStorageLog, blob_storage_log, "Contains logging entries with information about various blob storage operations such as uploads and deletes.") \
M(DeadLetterQueue, dead_letter_queue, "Contains messages that came from a streaming engine (e.g. Kafka) and were parsed unsuccessfully.") \
namespace DB

View File

@ -35,7 +35,7 @@ const auto KAFKA_CONSUMERS_POOL_TTL_MS_MAX = 600'000;
/* default is stream_flush_interval_ms */ \
M(Milliseconds, kafka_flush_interval_ms, 0, "Timeout for flushing data from Kafka.", 0) \
M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \
M(StreamingHandleErrorMode, kafka_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default (throw an exception after rabbitmq_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \
M(ExtStreamingHandleErrorMode, kafka_handle_error_mode, ExtStreamingHandleErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default (throw an exception after kafka_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error), dead_letter_queue", 0) \
M(Bool, kafka_commit_on_select, false, "Commit messages when select query is made", 0) \
M(UInt64, kafka_max_rows_per_message, 1, "The maximum number of rows produced in one kafka message for row-based formats.", 0) \
M(String, kafka_keeper_path, "", "The path to the table in ClickHouse Keeper", 0) \

View File

@ -6,6 +6,7 @@
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Common/logger_useful.h>
#include <Interpreters/Context.h>
#include <Interpreters/KafkaDeadLetterQueue.h>
#include <Common/ProfileEvents.h>
@ -46,7 +47,7 @@ KafkaSource::KafkaSource(
, commit_in_suffix(commit_in_suffix_)
, non_virtual_header(storage_snapshot->metadata->getSampleBlockNonMaterialized())
, virtual_header(storage.getVirtualsHeader())
, handle_error_mode(storage.getStreamingHandleErrorMode())
, handle_error_mode(storage.getHandleErrorMode())
{
}
@ -98,8 +99,6 @@ Chunk KafkaSource::generateImpl()
// otherwise external iteration will reuse that and logic will became even more fuzzy
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto put_error_to_stream = handle_error_mode == StreamingHandleErrorMode::STREAM;
EmptyReadBuffer empty_buf;
auto input_format = FormatFactory::instance().getInput(
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
@ -108,34 +107,39 @@ Chunk KafkaSource::generateImpl()
size_t total_rows = 0;
size_t failed_poll_attempts = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&, this](const MutableColumns & result_columns, Exception & e)
{
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
if (put_error_to_stream)
switch (handle_error_mode)
{
exception_message = e.message();
for (const auto & column : result_columns)
case ExtStreamingHandleErrorMode::STREAM:
case ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE:
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
exception_message = e.message();
for (const auto & column : result_columns)
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// all data columns will get default value in case of error
column->insertDefault();
// all data columns will get default value in case of error
column->insertDefault();
}
break;
}
case ExtStreamingHandleErrorMode::DEFAULT:
{
e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'",
consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset());
consumer->setExceptionInfo(e.message());
throw std::move(e);
}
}
return 1;
return 1;
}
else
{
e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'",
consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset());
consumer->setExceptionInfo(e.message());
throw std::move(e);
}
};
StreamingFormatExecutor executor(non_virtual_header, input_format, std::move(on_error));
@ -203,7 +207,7 @@ Chunk KafkaSource::generateImpl()
}
virtual_columns[6]->insert(headers_names);
virtual_columns[7]->insert(headers_values);
if (put_error_to_stream)
if (handle_error_mode == ExtStreamingHandleErrorMode::STREAM)
{
if (exception_message)
{
@ -217,6 +221,22 @@ Chunk KafkaSource::generateImpl()
virtual_columns[9]->insertDefault();
}
}
else if (handle_error_mode == ExtStreamingHandleErrorMode::STREAM && exception_message)
{
const auto time_now = std::chrono::system_clock::now();
auto storage_id = storage.getStorageID();
auto dead_letter_queue = context->getDeadLetterQueue();
dead_letter_queue->add(
DeadLetterQueueElement{
.event_time = timeInSeconds(time_now),
.event_time_microseconds = timeInMicroseconds(time_now),
.database_name = storage_id.database_name,
.table_name = storage_id.table_name,
.raw_message = consumer->currentPayload(),
.error = exception_message.value(),
});
}
}
total_rows = total_rows + new_rows;
@ -232,7 +252,7 @@ Chunk KafkaSource::generateImpl()
else
{
// We came here in case of tombstone (or sometimes zero-length) messages, and it is not something abnormal
// TODO: it seems like in case of put_error_to_stream=true we may need to process those differently
// TODO: it seems like in case of ExtStreamingHandleErrorMode::STREAM we may need to process those differently
// currently we just skip them with note in logs.
consumer->storeLastReadMessageOffset();
LOG_DEBUG(log, "Parsing of message (topic: {}, partition: {}, offset: {}) return no rows.", consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset());

View File

@ -51,7 +51,7 @@ private:
const Block non_virtual_header;
const Block virtual_header;
const StreamingHandleErrorMode handle_error_mode;
const ExtStreamingHandleErrorMode handle_error_mode;
Poco::Timespan max_execution_time = 0;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};

View File

@ -165,7 +165,8 @@ StorageKafka::StorageKafka(
{
kafka_settings->sanityCheck();
if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM)
if (kafka_settings->kafka_handle_error_mode == ExtStreamingHandleErrorMode::STREAM ||
kafka_settings->kafka_handle_error_mode == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE)
{
kafka_settings->input_format_allow_errors_num = 0;
kafka_settings->input_format_allow_errors_ratio = 0;

View File

@ -80,7 +80,7 @@ public:
const auto & getFormatName() const { return format_name; }
StreamingHandleErrorMode getStreamingHandleErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
ExtStreamingHandleErrorMode getHandleErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
struct SafeConsumers
{

View File

@ -125,7 +125,8 @@ StorageKafka2::StorageKafka2(
if (kafka_settings->kafka_num_consumers > 1 && !thread_per_consumer)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "With multiple consumers, it is required to use `kafka_thread_per_consumer` setting");
if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM)
if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::STREAM ||
getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE)
{
kafka_settings->input_format_allow_errors_num = 0;
kafka_settings->input_format_allow_errors_ratio = 0;
@ -134,7 +135,7 @@ StorageKafka2::StorageKafka2(
storage_metadata.setColumns(columns_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
setVirtuals(StorageKafkaUtils::createVirtuals(kafka_settings->kafka_handle_error_mode));
setVirtuals(StorageKafkaUtils::createVirtuals(getHandleKafkaErrorMode()));
auto task_count = thread_per_consumer ? num_consumers : 1;
for (size_t i = 0; i < task_count; ++i)
@ -807,8 +808,6 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
// otherwise external iteration will reuse that and logic will became even more fuzzy
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto put_error_to_stream = kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM;
EmptyReadBuffer empty_buf;
auto input_format = FormatFactory::instance().getInput(
getFormatName(), empty_buf, non_virtual_header, modified_context, getMaxBlockSize(), std::nullopt, 1);
@ -817,36 +816,44 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
size_t total_rows = 0;
size_t failed_poll_attempts = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&, this](const MutableColumns & result_columns, Exception & e)
{
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
if (put_error_to_stream)
switch (getHandleKafkaErrorMode())
{
exception_message = e.message();
for (const auto & column : result_columns)
case ExtStreamingHandleErrorMode::STREAM:
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
exception_message = e.message();
for (const auto & column : result_columns)
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// all data columns will get default value in case of error
column->insertDefault();
// all data columns will get default value in case of error
column->insertDefault();
}
break;
}
case ExtStreamingHandleErrorMode::DEFAULT:
{
e.addMessage(
"while parsing Kafka message (topic: {}, partition: {}, offset: {})'",
consumer.currentTopic(),
consumer.currentPartition(),
consumer.currentOffset());
throw std::move(e);
}
case ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE:
{
LOG_DEBUG(log, "Not implemented.");
break;
}
return 1;
}
else
{
e.addMessage(
"while parsing Kafka message (topic: {}, partition: {}, offset: {})'",
consumer.currentTopic(),
consumer.currentPartition(),
consumer.currentOffset());
throw std::move(e);
}
return 1;
};
StreamingFormatExecutor executor(non_virtual_header, input_format, std::move(on_error));
@ -922,7 +929,7 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
}
virtual_columns[6]->insert(headers_names);
virtual_columns[7]->insert(headers_values);
if (put_error_to_stream)
if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::STREAM)
{
if (exception_message)
{
@ -947,7 +954,7 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
else
{
// We came here in case of tombstone (or sometimes zero-length) messages, and it is not something abnormal
// TODO: it seems like in case of put_error_to_stream=true we may need to process those differently
// TODO: it seems like in case of ExtStreamingHandleErrorMode::STREAM we may need to process those differently
// currently we just skip them with note in logs.
LOG_DEBUG(
log,

View File

@ -89,7 +89,7 @@ public:
const auto & getFormatName() const { return format_name; }
StreamingHandleErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
ExtStreamingHandleErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
private:
using TopicPartition = KafkaConsumer2::TopicPartition;

View File

@ -59,6 +59,8 @@ namespace ErrorCodes
void registerStorageKafka(StorageFactory & factory)
{
LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "Top of registerStorageKafka");
auto creator_fn = [](const StorageFactory::Arguments & args) -> std::shared_ptr<IStorage>
{
ASTs & engine_args = args.engine_args;
@ -72,6 +74,9 @@ void registerStorageKafka(StorageFactory & factory)
for (const auto & setting : kafka_settings->all())
{
const auto & setting_name = setting.getName();
LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka (named collection): processing {}", setting_name);
if (named_collection->has(setting_name))
kafka_settings->set(setting_name, named_collection->get<String>(setting_name));
}
@ -80,7 +85,9 @@ void registerStorageKafka(StorageFactory & factory)
if (has_settings)
{
LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: before loadFromQuery");
kafka_settings->loadFromQuery(*args.storage_def);
LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: after loadFromQuery");
}
// Check arguments and settings
@ -154,7 +161,9 @@ void registerStorageKafka(StorageFactory & factory)
CHECK_KAFKA_STORAGE_ARGUMENT(12, kafka_poll_timeout_ms, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(13, kafka_flush_interval_ms, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(14, kafka_thread_per_consumer, 0)
LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: before kafka_handle_error_mode CHECK_KAFKA_STORAGE_ARGUMENT");
CHECK_KAFKA_STORAGE_ARGUMENT(15, kafka_handle_error_mode, 0)
LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: after kafka_handle_error_mode CHECK_KAFKA_STORAGE_ARGUMENT");
CHECK_KAFKA_STORAGE_ARGUMENT(16, kafka_commit_on_select, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(17, kafka_max_rows_per_message, 0)
}
@ -282,6 +291,8 @@ void registerStorageKafka(StorageFactory & factory)
args.table_id, args.getContext(), args.columns, args.comment, std::move(kafka_settings), collection_name);
};
factory.registerStorage(
"Kafka",
creator_fn,
@ -427,7 +438,7 @@ bool checkDependencies(const StorageID & table_id, const ContextPtr& context)
}
VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_mode)
VirtualColumnsDescription createVirtuals(ExtStreamingHandleErrorMode handle_error_mode)
{
VirtualColumnsDescription desc;
@ -440,7 +451,7 @@ VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_m
desc.addEphemeral("_headers.name", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "");
desc.addEphemeral("_headers.value", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "");
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
if (handle_error_mode == ExtStreamingHandleErrorMode::STREAM)
{
desc.addEphemeral("_raw_message", std::make_shared<DataTypeString>(), "");
desc.addEphemeral("_error", std::make_shared<DataTypeString>(), "");

View File

@ -47,7 +47,7 @@ SettingsChanges createSettingsAdjustments(KafkaSettings & kafka_settings, const
bool checkDependencies(const StorageID & table_id, const ContextPtr& context);
VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_mode);
VirtualColumnsDescription createVirtuals(ExtStreamingHandleErrorMode handle_error_mode);
}
}

View File

@ -13,7 +13,7 @@ if is_arm():
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"instance",
main_configs=["configs/kafka.xml"],
main_configs=["configs/kafka.xml", "configs/dead_letter_queue.xml"],
with_kafka=True,
)
@ -122,8 +122,19 @@ def kafka_cluster():
finally:
cluster.shutdown()
def view_test(expected_num_messages):
attempt = 0
rows = 0
while attempt < 500:
rows = int(instance.query("SELECT count() FROM view"))
if rows == expected_num_messages:
break
attempt += 1
def test_bad_messages_parsing_stream(kafka_cluster):
assert rows == expected_num_messages
def bad_messages_parsing_mode(kafka_cluster, handle_error_mode, check_method):
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
)
@ -165,7 +176,7 @@ def test_bad_messages_parsing_stream(kafka_cluster):
kafka_topic_list = '{format_name}_err',
kafka_group_name = '{format_name}',
kafka_format = '{format_name}',
kafka_handle_error_mode='stream';
kafka_handle_error_mode= '{handle_error_mode}';
CREATE MATERIALIZED VIEW view Engine=Log AS
SELECT _error FROM kafka WHERE length(_error) != 0 ;
@ -175,15 +186,7 @@ def test_bad_messages_parsing_stream(kafka_cluster):
messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"]
kafka_produce(kafka_cluster, f"{format_name}_err", messages)
attempt = 0
rows = 0
while attempt < 500:
rows = int(instance.query("SELECT count() FROM view"))
if rows == len(messages):
break
attempt += 1
assert rows == len(messages)
check_method(len(messages))
kafka_delete_topic(admin_client, f"{format_name}_err")
@ -210,7 +213,7 @@ message Message {
kafka_topic_list = '{format_name}_err',
kafka_group_name = '{format_name}',
kafka_format = '{format_name}',
kafka_handle_error_mode='stream',
kafka_handle_error_mode= '{handle_error_mode}',
kafka_schema='schema_test_errors:Message';
CREATE MATERIALIZED VIEW view Engine=Log AS
@ -225,15 +228,7 @@ message Message {
messages = ["qwertyuiop", "poiuytrewq", "zxcvbnm"]
kafka_produce(kafka_cluster, f"{format_name}_err", messages)
attempt = 0
rows = 0
while attempt < 500:
rows = int(instance.query("SELECT count() FROM view"))
if rows == len(messages):
break
attempt += 1
assert rows == len(messages)
check_method(len(messages))
kafka_delete_topic(admin_client, f"{format_name}_err")
@ -259,7 +254,7 @@ struct Message
kafka_topic_list = 'CapnProto_err',
kafka_group_name = 'CapnProto',
kafka_format = 'CapnProto',
kafka_handle_error_mode='stream',
kafka_handle_error_mode= '{handle_error_mode}',
kafka_schema='schema_test_errors:Message';
CREATE MATERIALIZED VIEW view Engine=Log AS
@ -274,18 +269,12 @@ struct Message
messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"]
kafka_produce(kafka_cluster, "CapnProto_err", messages)
attempt = 0
rows = 0
while attempt < 500:
rows = int(instance.query("SELECT count() FROM view"))
if rows == len(messages):
break
attempt += 1
assert rows == len(messages)
check_method(len(messages))
kafka_delete_topic(admin_client, "CapnProto_err")
def test_bad_messages_parsing_stream(kafka_cluster):
bad_messages_parsing_mode(kafka_cluster, 'stream', view_test)
def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20):
admin_client = KafkaAdminClient(