Handle errors for Kafka engine

This commit is contained in:
Peng Jian 2021-03-18 13:26:32 +08:00 committed by root
parent 57eb5f8772
commit 909d5ad2b5
12 changed files with 91 additions and 10 deletions

View File

@ -547,6 +547,7 @@
M(577, INVALID_SHARD_ID) \
M(578, INVALID_FORMAT_INSERT_QUERY_WITH_DATA) \
M(579, INCORRECT_PART_TYPE) \
M(580, UNKNOWN_HANDLE_KAFKA_ERROR_MODE) \
\
M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \

View File

@ -454,6 +454,7 @@ class IColumn;
M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \
M(UInt64, query_plan_max_optimizations_to_apply, 10000, "Limit the total number of optimizations applied to query plan. If zero, ignored. If limit reached, throw exception", 0) \
M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing. Will be removed after 2021-09-08", 0) \
M(HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT, "How to handle errors for Kafka engine. Passible values: default, stream.", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below.

View File

@ -13,6 +13,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL;
extern const int UNKNOWN_UNION;
extern const int UNKNOWN_HANDLE_KAFKA_ERROR_MODE;
}
@ -108,4 +109,7 @@ IMPLEMENT_SETTING_ENUM(DistributedDDLOutputMode, ErrorCodes::BAD_ARGUMENTS,
{"null_status_on_timeout", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT},
{"never_throw", DistributedDDLOutputMode::NEVER_THROW}})
IMPLEMENT_SETTING_ENUM(HandleKafkaErrorMode, ErrorCodes::UNKNOWN_HANDLE_KAFKA_ERROR_MODE,
{{"default", HandleKafkaErrorMode::DEFAULT},
{"stream", HandleKafkaErrorMode::STREAM}})
}

View File

@ -138,7 +138,6 @@ enum class UnionMode
DECLARE_SETTING_ENUM(UnionMode)
enum class DistributedDDLOutputMode
{
NONE,
@ -149,4 +148,13 @@ enum class DistributedDDLOutputMode
DECLARE_SETTING_ENUM(DistributedDDLOutputMode)
enum class HandleKafkaErrorMode
{
DEFAULT = 0, // Ignore errors whith threshold.
STREAM, // Put errors to stream in the virtual column named ``_error.
/*FIXED_SYSTEM_TABLE, Put errors to in a fixed system table likey system.kafka_errors. This is not implemented now. */
/*CUSTOM_SYSTEM_TABLE, Put errors to in a custom system table. This is not implemented now. */
};
DECLARE_SETTING_ENUM(HandleKafkaErrorMode)
}

View File

@ -269,7 +269,8 @@ InputFormatPtr FormatFactory::getInputFormat(
const Block & sample,
const Context & context,
UInt64 max_block_size,
const std::optional<FormatSettings> & _format_settings) const
const std::optional<FormatSettings> & _format_settings,
const bool sync_after_error) const
{
const auto & input_getter = getCreators(name).input_processor_creator;
if (!input_getter)
@ -289,7 +290,7 @@ InputFormatPtr FormatFactory::getInputFormat(
params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
params.max_execution_time = settings.max_execution_time;
params.timeout_overflow_mode = settings.timeout_overflow_mode;
params.sync_after_error = sync_after_error;
auto format = input_getter(buf, sample, params, format_settings);
/// It's a kludge. Because I cannot remove context from values format.

View File

@ -133,7 +133,8 @@ public:
const Block & sample,
const Context & context,
UInt64 max_block_size,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
const std::optional<FormatSettings> & format_settings = std::nullopt,
const bool sync_after_error = false) const;
/// Checks all preconditions. Returns ordinary format if parallel formatting cannot be done.
OutputFormatPtr getOutputFormatParallelIfPossible(

View File

@ -29,6 +29,7 @@ struct RowInputFormatParams
Poco::Timespan max_execution_time = 0;
OverflowMode timeout_overflow_mode = OverflowMode::THROW;
bool sync_after_error = false;
};
bool isParseError(int code);

View File

@ -35,8 +35,8 @@ KafkaBlockInputStream::KafkaBlockInputStream(
, max_block_size(max_block_size_)
, commit_in_suffix(commit_in_suffix_)
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, virtual_header(metadata_snapshot->getSampleBlockForColumns(
{"_topic", "_key", "_offset", "_partition", "_timestamp", "_timestamp_ms", "_headers.name", "_headers.value"}, storage.getVirtuals(), storage.getStorageID()))
, virtual_header(metadata_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames(), storage.getVirtuals(), storage.getStorageID()))
, handle_error_mode(context_->getSettings().handle_kafka_error_mode)
{
}
@ -78,20 +78,23 @@ Block KafkaBlockInputStream::readImpl()
// now it's one-time usage InputStream
// one block of the needed size (or with desired flush timeout) is formed in one internal iteration
// otherwise external iteration will reuse that and logic will became even more fuzzy
MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto put_error_to_stream = handle_error_mode == HandleKafkaErrorMode::STREAM;
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, *context, max_block_size);
storage.getFormatName(), *buffer, non_virtual_header, *context, max_block_size, {}, put_error_to_stream);
InputPort port(input_format->getPort().getHeader(), input_format.get());
connect(input_format->getPort(), port);
port.setNeeded();
std::string exception_message;
auto read_kafka_message = [&]
{
size_t new_rows = 0;
exception_message.clear();
while (true)
{
@ -100,7 +103,22 @@ Block KafkaBlockInputStream::readImpl()
switch (status)
{
case IProcessor::Status::Ready:
input_format->work();
try
{
input_format->work();
}
catch (const Exception & e)
{
if (put_error_to_stream)
{
exception_message = e.message();
new_rows++;
}
else
{
throw e;
}
}
break;
case IProcessor::Status::Finished:
@ -138,6 +156,11 @@ Block KafkaBlockInputStream::readImpl()
{
auto new_rows = buffer->poll() ? read_kafka_message() : 0;
if (!exception_message.empty())
{
for (size_t i = 0; i < result_columns.size(); ++i)
result_columns[i]->insertDefault();
}
if (new_rows)
{
// In read_kafka_message(), ReadBufferFromKafkaConsumer::nextImpl()
@ -189,6 +212,15 @@ Block KafkaBlockInputStream::readImpl()
}
virtual_columns[6]->insert(headers_names);
virtual_columns[7]->insert(headers_values);
if (put_error_to_stream)
{
auto payload = buffer->currentPayload();
virtual_columns[8]->insert(payload);
if (exception_message.empty())
virtual_columns[9]->insertDefault();
else
virtual_columns[9]->insert(exception_message);
}
}
total_rows = total_rows + new_rows;

View File

@ -51,6 +51,7 @@ private:
const Block non_virtual_header;
const Block virtual_header;
const HandleKafkaErrorMode handle_error_mode;
};
}

View File

@ -63,6 +63,7 @@ public:
auto currentPartition() const { return current[-1].get_partition(); }
auto currentTimestamp() const { return current[-1].get_timestamp(); }
const auto & currentHeaderList() const { return current[-1].get_header_list(); }
String currentPayload() const { return current[-1].get_payload(); }
private:
using Messages = std::vector<cppkafka::Message>;

View File

@ -760,7 +760,7 @@ void registerStorageKafka(StorageFactory & factory)
NamesAndTypesList StorageKafka::getVirtuals() const
{
return NamesAndTypesList{
auto result = NamesAndTypesList{
{"_topic", std::make_shared<DataTypeString>()},
{"_key", std::make_shared<DataTypeString>()},
{"_offset", std::make_shared<DataTypeUInt64>()},
@ -770,6 +770,32 @@ NamesAndTypesList StorageKafka::getVirtuals() const
{"_headers.name", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"_headers.value", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}
};
if (handle_error_mode == HandleKafkaErrorMode::STREAM)
{
result.push_back({"_raw_message", std::make_shared<DataTypeString>()});
result.push_back({"_error", std::make_shared<DataTypeString>()});
}
return result;
}
Names StorageKafka::getVirtualColumnNames() const
{
auto result = Names {
"_topic",
"_key",
"_offset",
"_partition",
"_timestamp",
"_timestamp_ms",
"_headers.name",
"_headers.value",
};
if (handle_error_mode == HandleKafkaErrorMode::STREAM)
{
result.push_back({"_raw_message"});
result.push_back({"_error"});
}
return result;
}
}

View File

@ -64,6 +64,7 @@ public:
const auto & getFormatName() const { return format_name; }
NamesAndTypesList getVirtuals() const override;
Names getVirtualColumnNames() const;
protected:
StorageKafka(
const StorageID & table_id_,
@ -112,6 +113,9 @@ private:
std::mutex thread_statuses_mutex;
std::list<std::shared_ptr<ThreadStatus>> thread_statuses;
/// Handle error mode
HandleKafkaErrorMode handle_error_mode;
SettingsChanges createSettingsAdjustments();
ConsumerBufferPtr createReadBuffer(const size_t consumer_number);