More checks [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-12-21 19:24:47 +03:00
parent 661a117b91
commit 2c6e49c3f3

View File

@ -63,10 +63,10 @@ static const String CONFIG_PREFIX = "kafka";
class ReadBufferFromKafkaConsumer : public ReadBuffer
{
rd_kafka_t * consumer;
rd_kafka_message_t * current;
bool current_pending;
rd_kafka_message_t * current = nullptr;
bool current_pending = false; /// We've fetched "current" message and need to process it on the next iteration.
Poco::Logger * log;
size_t read_messages;
size_t read_messages = 0;
char row_delimiter;
bool nextImpl() override
@ -97,6 +97,10 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
rd_kafka_message_destroy(msg);
return nextImpl();
}
if (msg->len && !msg->payload)
throw Exception("Logical error: nullptr message returned with non-zero length", ErrorCodes::LOGICAL_ERROR);
++read_messages;
// Now we've received a new message. Check if we need to produce a delimiter
@ -129,8 +133,7 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
public:
ReadBufferFromKafkaConsumer(rd_kafka_t * consumer_, Poco::Logger * log_, char row_delimiter_)
: ReadBuffer(nullptr, 0), consumer(consumer_), current(nullptr),
current_pending(false), log(log_), read_messages(0), row_delimiter(row_delimiter_)
: ReadBuffer(nullptr, 0), consumer(consumer_), log(log_), row_delimiter(row_delimiter_)
{
if (row_delimiter != '\0')
LOG_TRACE(log, "Row delimiter is: " << row_delimiter);
@ -156,9 +159,8 @@ public:
class KafkaBlockInputStream : public IProfilingBlockInputStream
{
public:
KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_)
: storage(storage_), consumer(nullptr), context(context_), max_block_size(max_block_size_)
: storage(storage_), context(context_), max_block_size(max_block_size_)
{
// Always skip unknown fields regardless of the context (JSON or TSKV)
context.setSetting("input_format_skip_unknown_fields", 1u);