Allow saving unparsed records and errors in RabbitMQ, NATS and FileLog engines

avogar 2023-10-10 16:21:33 +00:00
parent 9ebecb5499
commit 1afd655bbc
32 changed files with 680 additions and 82 deletions
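In short: the FileLog, NATS and RabbitMQ engines gain a handle-error-mode setting (handle_error_mode for FileLog, nats_handle_error_mode and rabbitmq_handle_error_mode for the others; Kafka's existing mode enum is renamed to the shared StreamingHandleErrorMode). With the 'stream' value, rows that fail to parse are kept instead of aborting consumption, and the raw payload plus the parse error are exposed through Nullable virtual columns (_raw_message and _error; FileLog uses _raw_record). A minimal usage sketch, following the RabbitMQ integration test added in this commit — the host, exchange and table names are illustrative only:

CREATE TABLE test.rabbit (key UInt64, value UInt64)
    ENGINE = RabbitMQ
    SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
             rabbitmq_exchange_name = 'select',
             rabbitmq_format = 'JSONEachRow',
             rabbitmq_handle_error_mode = 'stream';

CREATE TABLE test.errors (error Nullable(String), broken_message Nullable(String))
    ENGINE = MergeTree() ORDER BY tuple();

CREATE MATERIALIZED VIEW test.errors_view TO test.errors AS
    SELECT _error AS error, _raw_message AS broken_message
    FROM test.rabbit WHERE NOT isNull(_error);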

View File

@@ -828,7 +828,7 @@ class IColumn;
     MAKE_OBSOLETE(M, Bool, allow_experimental_geo_types, true) \
     \
     MAKE_OBSOLETE(M, Milliseconds, async_insert_stale_timeout_ms, 0) \
-    MAKE_OBSOLETE(M, HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT) \
+    MAKE_OBSOLETE(M, StreamingHandleErrorMode, handle_kafka_error_mode, StreamingHandleErrorMode::DEFAULT) \
     MAKE_OBSOLETE(M, Bool, database_replicated_ddl_output, true) \
     MAKE_OBSOLETE(M, UInt64, replication_alter_columns_timeout, 60) \
     MAKE_OBSOLETE(M, UInt64, odbc_max_field_size, 0) \

View File

@@ -110,9 +110,9 @@ IMPLEMENT_SETTING_ENUM(DistributedDDLOutputMode, ErrorCodes::BAD_ARGUMENTS,
     {"null_status_on_timeout", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT},
     {"never_throw", DistributedDDLOutputMode::NEVER_THROW}})
-IMPLEMENT_SETTING_ENUM(HandleKafkaErrorMode, ErrorCodes::BAD_ARGUMENTS,
-    {{"default", HandleKafkaErrorMode::DEFAULT},
-    {"stream", HandleKafkaErrorMode::STREAM}})
+IMPLEMENT_SETTING_ENUM(StreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS,
+    {{"default", StreamingHandleErrorMode::DEFAULT},
+    {"stream", StreamingHandleErrorMode::STREAM}})
 IMPLEMENT_SETTING_ENUM(ShortCircuitFunctionEvaluation, ErrorCodes::BAD_ARGUMENTS,
     {{"enable", ShortCircuitFunctionEvaluation::ENABLE},

View File

@@ -163,7 +163,7 @@ enum class DistributedDDLOutputMode
 DECLARE_SETTING_ENUM(DistributedDDLOutputMode)
-enum class HandleKafkaErrorMode
+enum class StreamingHandleErrorMode
 {
     DEFAULT = 0, // Ignore errors with threshold.
     STREAM, // Put errors to stream in the virtual column named ``_error.
@@ -171,7 +171,7 @@ enum class HandleKafkaErrorMode
     /*CUSTOM_SYSTEM_TABLE, Put errors to in a custom system table. This is not implemented now. */
 };
-DECLARE_SETTING_ENUM(HandleKafkaErrorMode)
+DECLARE_SETTING_ENUM(StreamingHandleErrorMode)
 enum class ShortCircuitFunctionEvaluation
 {

View File

@@ -214,7 +214,6 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
     {
         auto buffer = MetadataReadHelper::createReadBuffer(manifest_file, context, configuration);
         auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
         avro::NodePtr root_node = file_reader->dataSchema().root();
         size_t leaves_num = root_node->leaves();
         size_t expected_min_num = metadata.format_version == 1 ? 3 : 2;

View File

@@ -4,7 +4,7 @@
 namespace DB
 {
-class ReadBufferFromFileLog;
-using ReadBufferFromFileLogPtr = std::shared_ptr<ReadBufferFromFileLog>;
+class FileLogConsumer;
+using ReadBufferFromFileLogPtr = std::shared_ptr<FileLogConsumer>;
 }

View File

@@ -0,0 +1,165 @@
#include <Interpreters/Context.h>
#include <Storages/FileLog/FileLogConsumer.h>
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include <IO/ReadBufferFromString.h>
#include <algorithm>
#include <filesystem>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_READ_ALL_DATA;
}
FileLogConsumer::FileLogConsumer(
StorageFileLog & storage_,
size_t max_batch_size,
size_t poll_timeout_,
ContextPtr context_,
size_t stream_number_,
size_t max_streams_number_)
: log(&Poco::Logger::get("FileLogConsumer " + toString(stream_number_)))
, storage(storage_)
, batch_size(max_batch_size)
, poll_timeout(poll_timeout_)
, context(context_)
, stream_number(stream_number_)
, max_streams_number(max_streams_number_)
{
current = records.begin();
allowed = false;
}
ReadBufferPtr FileLogConsumer::consume()
{
if (hasMorePolledRecords())
return getNextRecord();
auto new_records = pollBatch(batch_size);
if (new_records.empty())
{
buffer_status = BufferStatus::NO_RECORD_RETURNED;
LOG_TRACE(log, "No new records to read");
return nullptr;
}
else
{
records = std::move(new_records);
current = records.begin();
LOG_TRACE(log, "Polled batch of {} records. ", records.size());
buffer_status = BufferStatus::POLLED_OK;
allowed = true;
return getNextRecord();
}
}
FileLogConsumer::Records FileLogConsumer::pollBatch(size_t batch_size_)
{
Records new_records;
new_records.reserve(batch_size_);
readNewRecords(new_records, batch_size);
if (new_records.size() == batch_size_ || stream_out)
return new_records;
Stopwatch watch;
while (watch.elapsedMilliseconds() < poll_timeout && new_records.size() != batch_size_)
{
readNewRecords(new_records, batch_size);
/// All ifstrem reach end, no need to wait for timeout,
/// since file status can not be updated during a streamToViews
if (stream_out)
break;
}
return new_records;
}
void FileLogConsumer::readNewRecords(FileLogConsumer::Records & new_records, size_t batch_size_)
{
size_t need_records_size = batch_size_ - new_records.size();
size_t read_records_size = 0;
auto & file_infos = storage.getFileInfos();
size_t files_per_stream = file_infos.file_names.size() / max_streams_number;
size_t start = stream_number * files_per_stream;
size_t end = stream_number == max_streams_number - 1 ? file_infos.file_names.size() : (stream_number + 1) * files_per_stream;
for (size_t i = start; i < end; ++i)
{
const auto & file_name = file_infos.file_names[i];
auto & file_ctx = StorageFileLog::findInMap(file_infos.context_by_name, file_name);
if (file_ctx.status == StorageFileLog::FileStatus::NO_CHANGE)
continue;
auto & file_meta = StorageFileLog::findInMap(file_infos.meta_by_inode, file_ctx.inode);
if (!file_ctx.reader)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Ifstream for file {} is not initialized", file_meta.file_name);
auto & reader = file_ctx.reader.value();
StorageFileLog::assertStreamGood(reader);
Record record;
while (read_records_size < need_records_size)
{
/// Need to get offset before reading record from stream
auto offset = reader.tellg();
if (static_cast<UInt64>(offset) >= file_meta.last_open_end)
break;
record.offset = offset;
StorageFileLog::assertStreamGood(reader);
record.file_name = file_name;
std::getline(reader, record.data);
StorageFileLog::assertStreamGood(reader);
new_records.emplace_back(record);
++read_records_size;
}
UInt64 current_position = reader.tellg();
StorageFileLog::assertStreamGood(reader);
file_meta.last_writen_position = current_position;
/// stream reach to end
if (current_position == file_meta.last_open_end)
{
file_ctx.status = StorageFileLog::FileStatus::NO_CHANGE;
}
/// All ifstream reach end
if (i == end - 1 && (file_ctx.status == StorageFileLog::FileStatus::NO_CHANGE))
{
stream_out = true;
}
if (read_records_size == need_records_size)
{
break;
}
}
}
ReadBufferPtr FileLogConsumer::getNextRecord()
{
if (!hasMorePolledRecords())
return nullptr;
auto buf = std::make_unique<ReadBufferFromString>(current->data);
++current;
return buf;
}
}

View File

@@ -0,0 +1,82 @@
#pragma once
#include <Core/BackgroundSchedulePool.h>
#include <IO/ReadBuffer.h>
#include <Storages/FileLog/StorageFileLog.h>
#include <fstream>
#include <mutex>
namespace DB
{
class FileLogConsumer
{
public:
FileLogConsumer(
StorageFileLog & storage_,
size_t max_batch_size,
size_t poll_timeout_,
ContextPtr context_,
size_t stream_number_,
size_t max_streams_number_);
auto pollTimeout() const { return poll_timeout; }
bool hasMorePolledRecords() const { return current != records.end(); }
ReadBufferPtr consume();
bool noRecords() { return buffer_status == BufferStatus::NO_RECORD_RETURNED; }
auto getFileName() const { return current[-1].file_name; }
auto getOffset() const { return current[-1].offset; }
const String & getCurrentRecord() const { return current[-1].data; }
private:
enum class BufferStatus
{
INIT,
NO_RECORD_RETURNED,
POLLED_OK,
};
BufferStatus buffer_status = BufferStatus::INIT;
Poco::Logger * log;
StorageFileLog & storage;
bool stream_out = false;
size_t batch_size;
size_t poll_timeout;
ContextPtr context;
size_t stream_number;
size_t max_streams_number;
bool allowed = true;
using RecordData = std::string;
struct Record
{
RecordData data;
std::string file_name;
/// Offset is the start of a row, which is needed for virtual columns.
UInt64 offset;
};
using Records = std::vector<Record>;
Records records;
Records::const_iterator current;
using TaskThread = BackgroundSchedulePool::TaskHolder;
Records pollBatch(size_t batch_size_);
void readNewRecords(Records & new_records, size_t batch_size_);
ReadBufferPtr getNextRecord();
};
}

View File

@@ -17,7 +17,8 @@ class ASTStorage;
     M(UInt64, max_threads, 0, "Number of max threads to parse files, default is 0, which means the number will be max(1, physical_cpu_cores / 4)", 0) \
     M(Milliseconds, poll_directory_watch_events_backoff_init, 500, "The initial sleep value for watch directory thread.", 0) \
     M(Milliseconds, poll_directory_watch_events_backoff_max, 32000, "The max sleep value for watch directory thread.", 0) \
-    M(UInt64, poll_directory_watch_events_backoff_factor, 2, "The speed of backoff, exponential by default", 0)
+    M(UInt64, poll_directory_watch_events_backoff_factor, 2, "The speed of backoff, exponential by default", 0) \
+    M(StreamingHandleErrorMode, handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for FileLog engine. Possible values: default (throw an exception after nats_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \
 #define LIST_OF_FILELOG_SETTINGS(M, ALIAS) \
     FILELOG_RELATED_SETTINGS(M, ALIAS) \
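For FileLog, the matching virtual columns are _raw_record and _error (see StorageFileLog::getVirtuals() further down). A minimal sketch along the lines of the shell test added at the end of this commit — the directory and table names are illustrative only:

CREATE TABLE file_log (key UInt8, value UInt8)
    ENGINE = FileLog('/var/lib/clickhouse/user_files/logs/', 'JSONEachRow')
    SETTINGS handle_error_mode = 'stream';

CREATE MATERIALIZED VIEW log_errors
    ENGINE = MergeTree ORDER BY tuple() AS
    SELECT _error AS error, _raw_record AS record, _filename AS file
    FROM file_log WHERE NOT isNull(_error);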

View File

@@ -1,8 +1,8 @@
 #include <Formats/FormatFactory.h>
 #include <Interpreters/Context.h>
 #include <Processors/Executors/StreamingFormatExecutor.h>
+#include <Storages/FileLog/FileLogConsumer.h>
 #include <Storages/FileLog/FileLogSource.h>
-#include <Storages/FileLog/ReadBufferFromFileLog.h>
 #include <Common/Stopwatch.h>
 #include <Common/logger_useful.h>
@@ -18,7 +18,8 @@ FileLogSource::FileLogSource(
     size_t max_block_size_,
     size_t poll_time_out_,
     size_t stream_number_,
-    size_t max_streams_number_)
+    size_t max_streams_number_,
+    StreamingHandleErrorMode handle_error_mode_)
     : ISource(storage_snapshot_->getSampleBlockForColumns(columns))
     , storage(storage_)
     , storage_snapshot(storage_snapshot_)
@@ -28,10 +29,11 @@ FileLogSource::FileLogSource(
     , poll_time_out(poll_time_out_)
     , stream_number(stream_number_)
     , max_streams_number(max_streams_number_)
+    , handle_error_mode(handle_error_mode_)
     , non_virtual_header(storage_snapshot->metadata->getSampleBlockNonMaterialized())
-    , virtual_header(storage_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames()))
+    , virtual_header(storage_snapshot->getSampleBlockForColumns(storage.getVirtuals().getNames()))
 {
-    buffer = std::make_unique<ReadBufferFromFileLog>(storage, max_block_size, poll_time_out, context, stream_number_, max_streams_number_);
+    consumer = std::make_unique<FileLogConsumer>(storage, max_block_size, poll_time_out, context, stream_number_, max_streams_number_);
     const auto & file_infos = storage.getFileInfos();
@@ -67,7 +69,7 @@ Chunk FileLogSource::generate()
     /// Store metas of last written chunk into disk
     storage.storeMetas(start, end);
-    if (!buffer || buffer->noRecords())
+    if (!consumer || consumer->noRecords())
     {
         /// There is no onFinish for ISource, we call it
         /// when no records return to close files
@@ -77,29 +79,72 @@ Chunk FileLogSource::generate()
     MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
+    EmptyReadBuffer empty_buf;
     auto input_format = FormatFactory::instance().getInput(
-        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, std::nullopt, 1);
-    StreamingFormatExecutor executor(non_virtual_header, input_format);
+        storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
+    std::optional<String> exception_message;
     size_t total_rows = 0;
+    auto on_error = [&](const MutableColumns & result_columns, Exception & e)
+    {
+        if (handle_error_mode == StreamingHandleErrorMode::STREAM)
+        {
+            exception_message = e.message();
+            for (const auto & column : result_columns)
+            {
+                // We could already push some rows to result_columns
+                // before exception, we need to fix it.
+                auto cur_rows = column->size();
+                if (cur_rows > total_rows)
+                    column->popBack(cur_rows - total_rows);
+                // All data columns will get default value in case of error.
+                column->insertDefault();
+            }
+            return 1;
+        }
+        else
+        {
+            throw std::move(e);
+        }
+    };
+    StreamingFormatExecutor executor(non_virtual_header, input_format, on_error);
     size_t failed_poll_attempts = 0;
     Stopwatch watch;
     while (true)
     {
+        exception_message.reset();
         size_t new_rows = 0;
-        if (buffer->poll())
-            new_rows = executor.execute();
+        if (auto buf = consumer->consume())
+            new_rows = executor.execute(*buf);
         if (new_rows)
         {
-            auto file_name = buffer->getFileName();
-            auto offset = buffer->getOffset();
+            auto file_name = consumer->getFileName();
+            auto offset = consumer->getOffset();
             for (size_t i = 0; i < new_rows; ++i)
             {
                 virtual_columns[0]->insert(file_name);
                 virtual_columns[1]->insert(offset);
+                if (handle_error_mode == StreamingHandleErrorMode::STREAM)
+                {
+                    if (exception_message)
+                    {
+                        const auto & current_record = consumer->getCurrentRecord();
+                        virtual_columns[2]->insertData(current_record.data(), current_record.size());
+                        virtual_columns[3]->insertData(exception_message->data(), exception_message->size());
+                    }
+                    else
+                    {
+                        virtual_columns[2]->insertDefault();
+                        virtual_columns[3]->insertDefault();
+                    }
+                }
             }
             total_rows = total_rows + new_rows;
         }
@@ -108,7 +153,7 @@ Chunk FileLogSource::generate()
             ++failed_poll_attempts;
         }
-        if (!buffer->hasMorePolledRecords()
+        if (!consumer->hasMorePolledRecords()
             && ((total_rows >= max_block_size) || watch.elapsedMilliseconds() > poll_time_out
             || failed_poll_attempts >= MAX_FAILED_POLL_ATTEMPTS))
         {

View File

@@ -1,7 +1,7 @@
 #pragma once
 #include <Processors/ISource.h>
-#include <Storages/FileLog/ReadBufferFromFileLog.h>
+#include <Storages/FileLog/FileLogConsumer.h>
 #include <Storages/FileLog/StorageFileLog.h>
 namespace Poco
@@ -21,11 +21,12 @@ public:
         size_t max_block_size_,
         size_t poll_time_out_,
         size_t stream_number_,
-        size_t max_streams_number_);
+        size_t max_streams_number_,
+        StreamingHandleErrorMode handle_error_mode_);
     String getName() const override { return "FileLog"; }
-    bool noRecords() { return !buffer || buffer->noRecords(); }
+    bool noRecords() { return !consumer || consumer->noRecords(); }
     void onFinish();
@@ -45,8 +46,9 @@ private:
     size_t stream_number;
     size_t max_streams_number;
+    StreamingHandleErrorMode handle_error_mode;
-    std::unique_ptr<ReadBufferFromFileLog> buffer;
+    std::unique_ptr<FileLogConsumer> consumer;
     Block non_virtual_header;
     Block virtual_header;

View File

@@ -166,6 +166,7 @@ bool ReadBufferFromFileLog::nextImpl()
     current_file = current->file_name;
     current_offset = current->offset;
+    current_record = current->data;
     ++current;

View File

@@ -32,6 +32,7 @@ public:
     auto getFileName() const { return current_file; }
     auto getOffset() const { return current_offset; }
+    const String & getCurrentRecord() const { return current_record; }
 private:
     enum class BufferStatus
@@ -74,6 +75,7 @@ private:
     String current_file;
     UInt64 current_offset = 0;
+    String current_record;
     using TaskThread = BackgroundSchedulePool::TaskHolder;

View File

@@ -1,6 +1,7 @@
 #include <DataTypes/DataTypeLowCardinality.h>
 #include <DataTypes/DataTypeString.h>
 #include <DataTypes/DataTypesNumber.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <Disks/StoragePolicy.h>
 #include <IO/ReadBufferFromFile.h>
 #include <IO/ReadHelpers.h>
@@ -346,7 +347,8 @@ Pipe StorageFileLog::read(
             getMaxBlockSize(),
             getPollTimeoutMillisecond(),
             stream_number,
-            max_streams_number));
+            max_streams_number,
+            filelog_settings->handle_error_mode));
     }
     return Pipe::unitePipes(std::move(pipes));
@@ -708,7 +710,8 @@ bool StorageFileLog::streamToViews()
             getPollMaxBatchSize(),
             getPollTimeoutMillisecond(),
             stream_number,
-            max_streams_number));
+            max_streams_number,
+            filelog_settings->handle_error_mode));
     }
     auto input= Pipe::unitePipes(std::move(pipes));
@@ -978,13 +981,17 @@ bool StorageFileLog::updateFileInfos()
 NamesAndTypesList StorageFileLog::getVirtuals() const
 {
-    return NamesAndTypesList{
+    auto virtuals = NamesAndTypesList{
         {"_filename", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
         {"_offset", std::make_shared<DataTypeUInt64>()}};
-}
-Names StorageFileLog::getVirtualColumnNames()
-{
-    return {"_filename", "_offset"};
+    if (filelog_settings->handle_error_mode == StreamingHandleErrorMode::STREAM)
+    {
+        virtuals.push_back({"_raw_record", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())});
+        virtuals.push_back({"_error", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())});
+    }
+    return virtuals;
 }
 }

View File

@@ -103,8 +103,6 @@ public:
     NamesAndTypesList getVirtuals() const override;
-    static Names getVirtualColumnNames();
     static UInt64 getInode(const String & file_name);
     void openFilesAndSetPos();

View File

@@ -104,7 +104,7 @@ public:
     auto currentPartition() const { return current[-1].get_partition(); }
     auto currentTimestamp() const { return current[-1].get_timestamp(); }
     const auto & currentHeaderList() const { return current[-1].get_header_list(); }
-    String currentPayload() const { return current[-1].get_payload(); }
+    const cppkafka::Buffer & currentPayload() const { return current[-1].get_payload(); }
     void setExceptionInfo(const cppkafka::Error & err, bool with_stacktrace = true);
     void setExceptionInfo(const std::string & text, bool with_stacktrace = true);
     void setRDKafkaStat(const std::string & stat_json_string)

View File

@@ -15,7 +15,6 @@ class ASTStorage;
     M(String, kafka_group_name, "", "Client group id string. All Kafka consumers sharing the same group.id belong to the same group.", 0) \
     /* those are mapped to format factory settings */ \
     M(String, kafka_format, "", "The message format for Kafka engine.", 0) \
-    M(Char, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.", 0) \
     M(String, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \
     M(UInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.", 0) \
     /* default is = max_insert_block_size / kafka_num_consumers */ \
@@ -29,17 +28,21 @@ class ASTStorage;
     /* default is stream_flush_interval_ms */ \
     M(Milliseconds, kafka_flush_interval_ms, 0, "Timeout for flushing data from Kafka.", 0) \
     M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \
-    M(HandleKafkaErrorMode, kafka_handle_error_mode, HandleKafkaErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default, stream.", 0) \
+    M(StreamingHandleErrorMode, kafka_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default (throw an exception after rabbitmq_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \
     M(Bool, kafka_commit_on_select, false, "Commit messages when select query is made", 0) \
     M(UInt64, kafka_max_rows_per_message, 1, "The maximum number of rows produced in one kafka message for row-based formats.", 0) \
+#define OBSOLETE_KAFKA_SETTINGS(M, ALIAS) \
+    MAKE_OBSOLETE(M, Char, kafka_row_delimiter, '\0') \
 /** TODO: */
 /* https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md */
 /* https://github.com/edenhill/librdkafka/blob/v1.4.2/src/rdkafka_conf.c */
 #define LIST_OF_KAFKA_SETTINGS(M, ALIAS) \
     KAFKA_RELATED_SETTINGS(M, ALIAS) \
-    LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS)
+    OBSOLETE_KAFKA_SETTINGS(M, ALIAS) \
+    LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) \
 DECLARE_SETTINGS_TRAITS(KafkaSettingsTraits, LIST_OF_KAFKA_SETTINGS)

View File

@@ -46,7 +46,7 @@ KafkaSource::KafkaSource(
     , commit_in_suffix(commit_in_suffix_)
     , non_virtual_header(storage_snapshot->metadata->getSampleBlockNonMaterialized())
     , virtual_header(storage_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames()))
-    , handle_error_mode(storage.getHandleKafkaErrorMode())
+    , handle_error_mode(storage.getStreamingHandleErrorMode())
 {
 }
@@ -98,7 +98,7 @@ Chunk KafkaSource::generateImpl()
     // otherwise external iteration will reuse that and logic will became even more fuzzy
     MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
-    auto put_error_to_stream = handle_error_mode == HandleKafkaErrorMode::STREAM;
+    auto put_error_to_stream = handle_error_mode == StreamingHandleErrorMode::STREAM;
     EmptyReadBuffer empty_buf;
     auto input_format = FormatFactory::instance().getInput(
@@ -207,9 +207,9 @@ Chunk KafkaSource::generateImpl()
                 {
                     if (exception_message)
                     {
-                        auto payload = consumer->currentPayload();
-                        virtual_columns[8]->insert(payload);
-                        virtual_columns[9]->insert(*exception_message);
+                        const auto & payload = consumer->currentPayload();
+                        virtual_columns[8]->insertData(reinterpret_cast<const char *>(payload.get_data()), payload.get_size());
+                        virtual_columns[9]->insertData(exception_message->data(), exception_message->size());
                     }
                     else
                     {

View File

@@ -51,7 +51,7 @@ private:
     const Block non_virtual_header;
     const Block virtual_header;
-    const HandleKafkaErrorMode handle_error_mode;
+    const StreamingHandleErrorMode handle_error_mode;
     Poco::Timespan max_execution_time = 0;
     Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};

View File

@@ -268,7 +268,7 @@ StorageKafka::StorageKafka(
     , thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value)
     , collection_name(collection_name_)
 {
-    if (kafka_settings->kafka_handle_error_mode == HandleKafkaErrorMode::STREAM)
+    if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM)
     {
         kafka_settings->input_format_allow_errors_num = 0;
         kafka_settings->input_format_allow_errors_ratio = 0;
@@ -1056,7 +1056,7 @@ NamesAndTypesList StorageKafka::getVirtuals() const
         {"_timestamp_ms", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime64>(3))},
         {"_headers.name", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
         {"_headers.value", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}};
-    if (kafka_settings->kafka_handle_error_mode == HandleKafkaErrorMode::STREAM)
+    if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM)
     {
         result.push_back({"_raw_message", std::make_shared<DataTypeString>()});
         result.push_back({"_error", std::make_shared<DataTypeString>()});
@@ -1076,7 +1076,7 @@ Names StorageKafka::getVirtualColumnNames() const
         "_headers.name",
         "_headers.value",
     };
-    if (kafka_settings->kafka_handle_error_mode == HandleKafkaErrorMode::STREAM)
+    if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM)
     {
         result.push_back({"_raw_message"});
         result.push_back({"_error"});

View File

@@ -78,7 +78,7 @@ public:
     NamesAndTypesList getVirtuals() const override;
     Names getVirtualColumnNames() const;
-    HandleKafkaErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
+    StreamingHandleErrorMode getStreamingHandleErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
     struct SafeConsumers
     {

View File

@@ -45,6 +45,7 @@ public:
     size_t queueSize() { return received.size(); }
     auto getSubject() const { return current.subject; }
+    const String & getCurrentMessage() const { return current.message; }
     /// Return read buffer containing next available message
     /// or nullptr if there are no messages to process.

View File

@@ -2,6 +2,7 @@
 #include <Core/BaseSettings.h>
 #include <Core/Settings.h>
+#include <Core/SettingsEnums.h>
 namespace DB
 {
@@ -11,7 +12,6 @@ class ASTStorage;
     M(String, nats_url, "", "A host-port to connect to NATS server.", 0) \
     M(String, nats_subjects, "", "List of subject for NATS table to subscribe/publish to.", 0) \
     M(String, nats_format, "", "The message format.", 0) \
-    M(Char, nats_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \
     M(String, nats_schema, "", "Schema identifier (used by schema-based formats) for NATS engine", 0) \
     M(UInt64, nats_num_consumers, 1, "The number of consumer channels per table.", 0) \
     M(String, nats_queue_group, "", "Name for queue group of NATS subscribers.", 0) \
@@ -27,10 +27,15 @@ class ASTStorage;
     M(String, nats_token, "", "NATS token", 0) \
     M(UInt64, nats_startup_connect_tries, 5, "Number of connect tries at startup", 0) \
     M(UInt64, nats_max_rows_per_message, 1, "The maximum number of rows produced in one message for row-based formats.", 0) \
+    M(StreamingHandleErrorMode, nats_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for NATS engine. Possible values: default (throw an exception after nats_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \
+#define OBSOLETE_NATS_SETTINGS(M, ALIAS) \
+    MAKE_OBSOLETE(M, Char, nats_row_delimiter, '\0') \
 #define LIST_OF_NATS_SETTINGS(M, ALIAS) \
     NATS_RELATED_SETTINGS(M, ALIAS) \
-    LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS)
+    OBSOLETE_NATS_SETTINGS(M, ALIAS) \
+    LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) \
 DECLARE_SETTINGS_TRAITS(NATSSettingsTraits, LIST_OF_NATS_SETTINGS)
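NATS gets the analogous nats_handle_error_mode setting, with _raw_message and _error added by StorageNATS::getVirtuals() further down. A hedged sketch — the server address, subject and table names are illustrative and not taken from this commit:

CREATE TABLE nats_queue (key UInt64, value UInt64)
    ENGINE = NATS
    SETTINGS nats_url = 'nats1:4444',
             nats_subjects = 'data',
             nats_format = 'JSONEachRow',
             nats_handle_error_mode = 'stream';

CREATE MATERIALIZED VIEW nats_errors
    ENGINE = MergeTree ORDER BY tuple() AS
    SELECT _raw_message AS raw, _error AS error
    FROM nats_queue WHERE NOT isNull(_error);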

View File

@@ -9,10 +9,10 @@
 namespace DB
 {
-static std::pair<Block, Block> getHeaders(const StorageSnapshotPtr & storage_snapshot)
+static std::pair<Block, Block> getHeaders(StorageNATS & storage, const StorageSnapshotPtr & storage_snapshot)
 {
     auto non_virtual_header = storage_snapshot->metadata->getSampleBlockNonMaterialized();
-    auto virtual_header = storage_snapshot->getSampleBlockForColumns({"_subject"});
+    auto virtual_header = storage_snapshot->getSampleBlockForColumns(storage.getVirtuals().getNames());
     return {non_virtual_header, virtual_header};
 }
@@ -31,8 +31,9 @@ NATSSource::NATSSource(
     const StorageSnapshotPtr & storage_snapshot_,
     ContextPtr context_,
     const Names & columns,
-    size_t max_block_size_)
-    : NATSSource(storage_, storage_snapshot_, getHeaders(storage_snapshot_), context_, columns, max_block_size_)
+    size_t max_block_size_,
+    StreamingHandleErrorMode handle_error_mode_)
+    : NATSSource(storage_, storage_snapshot_, getHeaders(storage_, storage_snapshot_), context_, columns, max_block_size_, handle_error_mode_)
 {
 }
@@ -42,13 +43,15 @@ NATSSource::NATSSource(
     std::pair<Block, Block> headers,
     ContextPtr context_,
     const Names & columns,
-    size_t max_block_size_)
+    size_t max_block_size_,
+    StreamingHandleErrorMode handle_error_mode_)
     : ISource(getSampleBlock(headers.first, headers.second))
     , storage(storage_)
     , storage_snapshot(storage_snapshot_)
     , context(context_)
     , column_names(columns)
     , max_block_size(max_block_size_)
+    , handle_error_mode(handle_error_mode_)
     , non_virtual_header(std::move(headers.first))
     , virtual_header(std::move(headers.second))
 {
@@ -97,16 +100,41 @@ Chunk NATSSource::generate()
     EmptyReadBuffer empty_buf;
     auto input_format = FormatFactory::instance().getInput(
         storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
+    std::optional<String> exception_message;
-    StreamingFormatExecutor executor(non_virtual_header, input_format);
     size_t total_rows = 0;
+    auto on_error = [&](const MutableColumns & result_columns, Exception & e)
+    {
+        if (handle_error_mode == StreamingHandleErrorMode::STREAM)
+        {
+            exception_message = e.message();
+            for (const auto & column : result_columns)
+            {
+                // We could already push some rows to result_columns
+                // before exception, we need to fix it.
+                auto cur_rows = column->size();
+                if (cur_rows > total_rows)
+                    column->popBack(cur_rows - total_rows);
+                // All data columns will get default value in case of error.
+                column->insertDefault();
+            }
+            return 1;
+        }
+        else
+        {
+            throw std::move(e);
+        }
+    };
+    StreamingFormatExecutor executor(non_virtual_header, input_format, on_error);
     while (true)
     {
         if (consumer->queueEmpty())
             break;
+        exception_message.reset();
         size_t new_rows = 0;
         if (auto buf = consumer->consume())
             new_rows = executor.execute(*buf);
@@ -115,6 +143,21 @@ Chunk NATSSource::generate()
         {
             auto subject = consumer->getSubject();
             virtual_columns[0]->insertMany(subject, new_rows);
+            if (handle_error_mode == StreamingHandleErrorMode::STREAM)
+            {
+                if (exception_message)
+                {
+                    const auto & current_message = consumer->getCurrentMessage();
+                    virtual_columns[1]->insertData(current_message.data(), current_message.size());
+                    virtual_columns[2]->insertData(exception_message->data(), exception_message->size());
+                }
+                else
+                {
+                    virtual_columns[1]->insertDefault();
+                    virtual_columns[2]->insertDefault();
+                }
+            }
             total_rows = total_rows + new_rows;
         }

View File

@@ -16,7 +16,8 @@ public:
         const StorageSnapshotPtr & storage_snapshot_,
         ContextPtr context_,
         const Names & columns,
-        size_t max_block_size_);
+        size_t max_block_size_,
+        StreamingHandleErrorMode handle_error_mode_);
     ~NATSSource() override;
@@ -37,6 +38,7 @@ private:
     ContextPtr context;
     Names column_names;
     const size_t max_block_size;
+    StreamingHandleErrorMode handle_error_mode;
     bool is_finished = false;
     const Block non_virtual_header;
@@ -53,7 +55,8 @@ private:
         std::pair<Block, Block> headers,
         ContextPtr context_,
         const Names & columns,
-        size_t max_block_size_);
+        size_t max_block_size_,
+        StreamingHandleErrorMode handle_error_mode_);
 };
 }

View File

@@ -1,4 +1,5 @@
 #include <DataTypes/DataTypeString.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <Interpreters/Context.h>
 #include <Interpreters/InterpreterInsertQuery.h>
 #include <Interpreters/InterpreterSelectQuery.h>
@@ -156,7 +157,11 @@ ContextMutablePtr StorageNATS::addSettings(ContextPtr local_context) const
     auto modified_context = Context::createCopy(local_context);
     modified_context->setSetting("input_format_skip_unknown_fields", true);
     modified_context->setSetting("input_format_allow_errors_ratio", 0.);
+    if (nats_settings->nats_handle_error_mode == StreamingHandleErrorMode::DEFAULT)
         modified_context->setSetting("input_format_allow_errors_num", nats_settings->nats_skip_broken_messages.value);
+    else
+        modified_context->setSetting("input_format_allow_errors_num", Field{0});
     /// Since we are reusing the same context for all queries executed simultaneously, we don't want to used shared `analyze_count`
     modified_context->setSetting("max_analyze_depth", Field{0});
@@ -319,7 +324,7 @@ void StorageNATS::read(
     for (size_t i = 0; i < num_created_consumers; ++i)
     {
-        auto nats_source = std::make_shared<NATSSource>(*this, storage_snapshot, modified_context, column_names, 1);
+        auto nats_source = std::make_shared<NATSSource>(*this, storage_snapshot, modified_context, column_names, 1, nats_settings->nats_handle_error_mode);
         auto converting_dag = ActionsDAG::makeConvertingActions(
             nats_source->getPort().getHeader().getColumnsWithTypeAndName(),
@@ -642,7 +647,7 @@ bool StorageNATS::streamToViews()
     for (size_t i = 0; i < num_created_consumers; ++i)
     {
         LOG_DEBUG(log, "Current queue size: {}", consumers[0]->queueSize());
-        auto source = std::make_shared<NATSSource>(*this, storage_snapshot, nats_context, column_names, block_size);
+        auto source = std::make_shared<NATSSource>(*this, storage_snapshot, nats_context, column_names, block_size, nats_settings->nats_handle_error_mode);
         sources.emplace_back(source);
         pipes.emplace_back(source);
@@ -743,9 +748,17 @@ void registerStorageNATS(StorageFactory & factory)
 NamesAndTypesList StorageNATS::getVirtuals() const
 {
-    return NamesAndTypesList{
+    auto virtuals = NamesAndTypesList{
         {"_subject", std::make_shared<DataTypeString>()}
     };
+    if (nats_settings->nats_handle_error_mode == StreamingHandleErrorMode::STREAM)
+    {
+        virtuals.push_back({"_raw_message", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())});
+        virtuals.push_back({"_error", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())});
+    }
+    return virtuals;
 }
 }

View File

@@ -14,7 +14,6 @@ namespace DB
     M(String, rabbitmq_format, "", "The message format.", 0) \
     M(String, rabbitmq_exchange_type, "default", "The exchange type.", 0) \
     M(String, rabbitmq_routing_key_list, "5672", "A string of routing keys, separated by dots.", 0) \
-    M(Char, rabbitmq_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \
     M(String, rabbitmq_schema, "", "Schema identifier (used by schema-based formats) for RabbitMQ engine", 0) \
     M(UInt64, rabbitmq_num_consumers, 1, "The number of consumer channels per table.", 0) \
     M(UInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \
@@ -35,10 +34,16 @@ namespace DB
     M(String, rabbitmq_password, "", "RabbitMQ password", 0) \
     M(Bool, rabbitmq_commit_on_select, false, "Commit messages when select query is made", 0) \
     M(UInt64, rabbitmq_max_rows_per_message, 1, "The maximum number of rows produced in one message for row-based formats.", 0) \
+    M(StreamingHandleErrorMode, rabbitmq_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for RabbitMQ engine. Possible values: default (throw an exception after rabbitmq_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \
+#define OBSOLETE_RABBITMQ_SETTINGS(M, ALIAS) \
+    MAKE_OBSOLETE(M, Char, rabbitmq_row_delimiter, '\0') \
 #define LIST_OF_RABBITMQ_SETTINGS(M, ALIAS) \
     RABBITMQ_RELATED_SETTINGS(M, ALIAS) \
-    LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS)
+    OBSOLETE_RABBITMQ_SETTINGS(M, ALIAS) \
+    LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) \
 DECLARE_SETTINGS_TRAITS(RabbitMQSettingsTraits, LIST_OF_RABBITMQ_SETTINGS)

View File

@@ -11,11 +11,10 @@
 namespace DB
 {
-static std::pair<Block, Block> getHeaders(const StorageSnapshotPtr & storage_snapshot)
+static std::pair<Block, Block> getHeaders(StorageRabbitMQ & storage_, const StorageSnapshotPtr & storage_snapshot)
 {
     auto non_virtual_header = storage_snapshot->metadata->getSampleBlockNonMaterialized();
-    auto virtual_header = storage_snapshot->getSampleBlockForColumns(
-        {"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id", "_timestamp"});
+    auto virtual_header = storage_snapshot->getSampleBlockForColumns(storage_.getVirtuals().getNames());
     return {non_virtual_header, virtual_header};
 }
@@ -36,15 +35,17 @@ RabbitMQSource::RabbitMQSource(
     const Names & columns,
     size_t max_block_size_,
     UInt64 max_execution_time_,
+    StreamingHandleErrorMode handle_error_mode_,
     bool ack_in_suffix_)
     : RabbitMQSource(
         storage_,
         storage_snapshot_,
-        getHeaders(storage_snapshot_),
+        getHeaders(storage_, storage_snapshot_),
         context_,
         columns,
         max_block_size_,
         max_execution_time_,
+        handle_error_mode_,
         ack_in_suffix_)
 {
 }
@@ -57,6 +58,7 @@ RabbitMQSource::RabbitMQSource(
     const Names & columns,
     size_t max_block_size_,
     UInt64 max_execution_time_,
+    StreamingHandleErrorMode handle_error_mode_,
     bool ack_in_suffix_)
     : ISource(getSampleBlock(headers.first, headers.second))
     , storage(storage_)
@@ -64,6 +66,7 @@ RabbitMQSource::RabbitMQSource(
     , context(context_)
     , column_names(columns)
     , max_block_size(max_block_size_)
+    , handle_error_mode(handle_error_mode_)
     , ack_in_suffix(ack_in_suffix_)
     , non_virtual_header(std::move(headers.first))
     , virtual_header(std::move(headers.second))
@@ -131,12 +134,40 @@ Chunk RabbitMQSource::generateImpl()
     auto input_format = FormatFactory::instance().getInput(
         storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
-    StreamingFormatExecutor executor(non_virtual_header, input_format);
+    std::optional<String> exception_message;
     size_t total_rows = 0;
+    auto on_error = [&](const MutableColumns & result_columns, Exception & e)
+    {
+        if (handle_error_mode == StreamingHandleErrorMode::STREAM)
+        {
+            exception_message = e.message();
+            for (const auto & column : result_columns)
+            {
+                // We could already push some rows to result_columns
+                // before exception, we need to fix it.
+                auto cur_rows = column->size();
+                if (cur_rows > total_rows)
+                    column->popBack(cur_rows - total_rows);
+                // All data columns will get default value in case of error.
+                column->insertDefault();
+            }
+            return 1;
+        }
+        else
+        {
+            throw std::move(e);
+        }
+    };
+    StreamingFormatExecutor executor(non_virtual_header, input_format, on_error);
     RabbitMQConsumer::CommitInfo current_commit_info;
     while (true)
     {
+        exception_message.reset();
         size_t new_rows = 0;
         if (consumer->hasPendingMessages())
@@ -158,6 +189,19 @@ Chunk RabbitMQSource::generateImpl()
                 virtual_columns[3]->insert(message.redelivered);
                 virtual_columns[4]->insert(message.message_id);
                 virtual_columns[5]->insert(message.timestamp);
+                if (handle_error_mode == StreamingHandleErrorMode::STREAM)
+                {
+                    if (exception_message)
+                    {
+                        virtual_columns[6]->insertData(message.message.data(), message.message.size());
+                        virtual_columns[7]->insertData(exception_message->data(), exception_message->size());
+                    }
+                    else
+                    {
+                        virtual_columns[6]->insertDefault();
+                        virtual_columns[7]->insertDefault();
+                    }
+                }
             }
             total_rows += new_rows;

View File

@@ -19,6 +19,7 @@ public:
         const Names & columns,
         size_t max_block_size_,
         UInt64 max_execution_time_,
+        StreamingHandleErrorMode handle_error_mode_,
         bool ack_in_suffix = false);
     ~RabbitMQSource() override;
@@ -39,6 +40,7 @@ private:
     ContextPtr context;
     Names column_names;
     const size_t max_block_size;
+    StreamingHandleErrorMode handle_error_mode;
     bool ack_in_suffix;
     bool is_finished = false;
@@ -61,6 +63,7 @@ private:
         const Names & columns,
         size_t max_block_size_,
         UInt64 max_execution_time_,
+        StreamingHandleErrorMode handle_error_mode_,
         bool ack_in_suffix);
     Chunk generateImpl();

View File

@@ -1,6 +1,7 @@
 #include <amqpcpp.h>
 #include <DataTypes/DataTypeString.h>
 #include <DataTypes/DataTypesNumber.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <Interpreters/Context.h>
 #include <Interpreters/InterpreterInsertQuery.h>
 #include <Interpreters/InterpreterSelectQuery.h>
@@ -240,7 +241,11 @@ ContextMutablePtr StorageRabbitMQ::addSettings(ContextPtr local_context) const
     auto modified_context = Context::createCopy(local_context);
     modified_context->setSetting("input_format_skip_unknown_fields", true);
     modified_context->setSetting("input_format_allow_errors_ratio", 0.);
+    if (rabbitmq_settings->rabbitmq_handle_error_mode == StreamingHandleErrorMode::DEFAULT)
         modified_context->setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);
+    else
+        modified_context->setSetting("input_format_allow_errors_num", Field(0));
     /// Since we are reusing the same context for all queries executed simultaneously, we don't want to used shared `analyze_count`
     modified_context->setSetting("max_analyze_depth", Field{0});
@@ -730,7 +735,7 @@ void StorageRabbitMQ::read(
     {
         auto rabbit_source = std::make_shared<RabbitMQSource>(
             *this, storage_snapshot, modified_context, column_names, 1,
-            max_execution_time_ms, rabbitmq_settings->rabbitmq_commit_on_select);
+            max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode, rabbitmq_settings->rabbitmq_commit_on_select);
         auto converting_dag = ActionsDAG::makeConvertingActions(
             rabbit_source->getPort().getHeader().getColumnsWithTypeAndName(),
@@ -1076,7 +1081,7 @@ bool StorageRabbitMQ::tryStreamToViews()
     for (size_t i = 0; i < num_created_consumers; ++i)
     {
         auto source = std::make_shared<RabbitMQSource>(
-            *this, storage_snapshot, rabbitmq_context, column_names, block_size, max_execution_time_ms, false);
+            *this, storage_snapshot, rabbitmq_context, column_names, block_size, max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode, false);
         sources.emplace_back(source);
         pipes.emplace_back(source);
@@ -1212,7 +1217,7 @@ void registerStorageRabbitMQ(StorageFactory & factory)
 NamesAndTypesList StorageRabbitMQ::getVirtuals() const
 {
-    return NamesAndTypesList{
+    auto virtuals = NamesAndTypesList{
         {"_exchange_name", std::make_shared<DataTypeString>()},
         {"_channel_id", std::make_shared<DataTypeString>()},
         {"_delivery_tag", std::make_shared<DataTypeUInt64>()},
@@ -1220,6 +1225,14 @@ NamesAndTypesList StorageRabbitMQ::getVirtuals() const
         {"_message_id", std::make_shared<DataTypeString>()},
         {"_timestamp", std::make_shared<DataTypeUInt64>()}
     };
+    if (rabbitmq_settings->rabbitmq_handle_error_mode == StreamingHandleErrorMode::STREAM)
+    {
+        virtuals.push_back({"_raw_message", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())});
+        virtuals.push_back({"_error", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())});
+    }
+    return virtuals;
 }
 }

View File

@@ -3416,3 +3416,100 @@ def test_rabbitmq_flush_by_time(rabbitmq_cluster):
    )
    assert int(result) == 3

def test_rabbitmq_handle_error_mode_stream(rabbitmq_cluster):
    instance.query(
        """
        DROP TABLE IF EXISTS test.rabbitmq;
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.data;
        DROP TABLE IF EXISTS test.errors;
        DROP TABLE IF EXISTS test.errors_view;

        CREATE TABLE test.rabbit (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = '{}:5672',
                     rabbitmq_exchange_name = 'select',
                     rabbitmq_commit_on_select = 1,
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_row_delimiter = '\\n',
                     rabbitmq_handle_error_mode = 'stream';

        CREATE TABLE test.errors (error Nullable(String), broken_message Nullable(String))
            ENGINE = MergeTree()
            ORDER BY tuple();

        CREATE MATERIALIZED VIEW test.errors_view TO test.errors AS
            SELECT _error as error, _raw_message as broken_message FROM test.rabbit where not isNull(_error);

        CREATE TABLE test.data (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;

        CREATE MATERIALIZED VIEW test.view TO test.data AS
            SELECT key, value FROM test.rabbit;
        """.format(
            rabbitmq_cluster.rabbitmq_host
        )
    )

    credentials = pika.PlainCredentials("root", "clickhouse")
    parameters = pika.ConnectionParameters(
        rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
    )
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    messages = []
    num_rows = 50
    for i in range(num_rows):
        if i % 2 == 0:
            messages.append(json.dumps({"key": i, "value": i}))
        else:
            messages.append("Broken message " + str(i))

    for message in messages:
        channel.basic_publish(exchange="select", routing_key="", body=message)
    connection.close()

    # The order of messages in select * from test.rabbitmq is not guaranteed, so sleep to collect everything in one select
    time.sleep(1)

    attempt = 0
    rows = 0
    while attempt < 100:
        rows = int(instance.query("SELECT count() FROM test.data"))
        if rows == num_rows:
            break
        attempt += 1
    assert rows == num_rows

    result = instance.query("SELECT * FROM test.data ORDER by key")
    expected = "0\t0\n" * (num_rows // 2)
    for i in range(num_rows):
        if i % 2 == 0:
            expected += str(i) + "\t" + str(i) + "\n"
    assert result == expected

    attempt = 0
    errors_count = 0
    while attempt < 100:
        errors_count = int(instance.query("SELECT count() FROM test.errors"))
        if errors_count == num_rows:
            break
        attempt += 1
    assert errors_count == num_rows / 2

    broken_messages = instance.query(
        "SELECT broken_message FROM test.errors order by broken_message"
    )
    expected = []
    for i in range(num_rows):
        if i % 2 != 0:
            expected.append("Broken message " + str(i) + "\n")
    expected = "".join(sorted(expected))
    assert broken_messages == expected

View File

@@ -0,0 +1,20 @@
Cannot parse input: expected \'{\' before: \'Error 0\' Error 0 a.jsonl
Cannot parse input: expected \'{\' before: \'Error 1\' Error 1 a.jsonl
Cannot parse input: expected \'{\' before: \'Error 2\' Error 2 a.jsonl
Cannot parse input: expected \'{\' before: \'Error 3\' Error 3 a.jsonl
Cannot parse input: expected \'{\' before: \'Error 4\' Error 4 a.jsonl
Cannot parse input: expected \'{\' before: \'Error 5\' Error 5 a.jsonl
Cannot parse input: expected \'{\' before: \'Error 6\' Error 6 a.jsonl
Cannot parse input: expected \'{\' before: \'Error 7\' Error 7 a.jsonl
Cannot parse input: expected \'{\' before: \'Error 8\' Error 8 a.jsonl
Cannot parse input: expected \'{\' before: \'Error 9\' Error 9 a.jsonl
Cannot parse input: expected \'{\' before: \'Error 10\' Error 10 b.jsonl
Cannot parse input: expected \'{\' before: \'Error 11\' Error 11 b.jsonl
Cannot parse input: expected \'{\' before: \'Error 12\' Error 12 b.jsonl
Cannot parse input: expected \'{\' before: \'Error 13\' Error 13 b.jsonl
Cannot parse input: expected \'{\' before: \'Error 14\' Error 14 b.jsonl
Cannot parse input: expected \'{\' before: \'Error 15\' Error 15 b.jsonl
Cannot parse input: expected \'{\' before: \'Error 16\' Error 16 b.jsonl
Cannot parse input: expected \'{\' before: \'Error 17\' Error 17 b.jsonl
Cannot parse input: expected \'{\' before: \'Error 18\' Error 18 b.jsonl
Cannot parse input: expected \'{\' before: \'Error 19\' Error 19 b.jsonl

View File

@@ -0,0 +1,46 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
user_files_path=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
${CLICKHOUSE_CLIENT} --query "drop table if exists file_log;"
${CLICKHOUSE_CLIENT} --query "drop table if exists log_errors;"
mkdir -p ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/
rm -rf ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME:?}/*
for i in {0..9}
do
echo "{\"key\" : $i, \"value\" : $i}" >> ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/a.jsonl
echo "Error $i" >> ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/a.jsonl
done
for i in {10..19}
do
echo "{\"key\" : $i, \"value\" : $i}" >> ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/b.jsonl
echo "Error $i" >> ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/b.jsonl
done
${CLICKHOUSE_CLIENT} --query "create table file_log(key UInt8, value UInt8) engine=FileLog('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/', 'JSONEachRow') settings handle_error_mode='stream';"
${CLICKHOUSE_CLIENT} --query "create Materialized View log_errors engine=MergeTree order by tuple() as select _error as error, _raw_record as record, _filename as file from file_log where not isNull(_error);"
function count()
{
COUNT=$(${CLICKHOUSE_CLIENT} --query "select count() from log_errors;")
echo $COUNT
}
while true; do
[[ $(count) == 20 ]] && break
sleep 1
done
${CLICKHOUSE_CLIENT} --query "select * from log_errors order by file, record;"
${CLICKHOUSE_CLIENT} --query "drop table file_log;"
${CLICKHOUSE_CLIENT} --query "drop table log_errors;"
rm -rf ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME:?}