StorageKafka: fixed messages not showing when EOF is reached in current batch

This fixes a case where messages were not read when there were fewer than 16 messages available in Kafka.
Updated the documentation with a more detailed description.
Marek Vavruša 2017-11-13 17:31:45 -08:00 committed by alexey-milovidov
parent 64a892c0e6
commit c07b48075b
2 changed files with 72 additions and 13 deletions


@@ -49,11 +49,16 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
Messages::iterator current;
Messages::iterator end;
Poco::Logger * log;
bool eof = false;
bool nextImpl() override
{
if (current == end)
{
// EOF reached in the previous batch, bail
if (eof)
return false;
// Fetch next batch of messages
bool res = fetchMessages();
if (!res)
@@ -64,7 +69,10 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
// No error, but no messages read
if (current == end)
{
LOG_DEBUG(log, "No messages consumed.");
return false;
}
}
// Process next buffered message
@@ -73,6 +81,14 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
{
if (msg->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)
LOG_ERROR(log, "Consumer error: " << rd_kafka_err2str(msg->err) << " " << rd_kafka_message_errstr(msg));
else
{
// Reached EOF while reading the current batch, skip it
eof = true;
if (current != end)
return nextImpl();
}
return false;
}


@@ -12,33 +12,76 @@ A table engine backed by Apache Kafka, a streaming platform having three key capabilities:
Kafka(broker_list, topic_list, group_name, format[, schema])
Engine parameters:
broker_list - A comma-separated list of brokers (``localhost:9092``).
topic_list - List of Kafka topics to consume (``my_topic``).
group_name - Kafka consumer group name (``group1``). Read offsets are tracked for each consumer group; if you want to consume messages exactly once across the cluster, use the same group name.
format - Name of the format used to deserialize messages. It accepts the same values as the ``FORMAT`` SQL statement, for example ``JSONEachRow``.
schema - Optional schema value for formats that require a schema to interpret consumed messages, for example Cap'n Proto format requires a path to schema file and root object - ``schema.capnp:Message``. Self-describing formats such as JSON don't require any schema.
broker_list
A comma-separated list of brokers (``localhost:9092``).
topic_list
List of Kafka topics to consume (``my_topic``).
group_name
Kafka consumer group name (``group1``). Read offsets are tracked for each consumer group; if you want to consume messages exactly once across the cluster, use the same group name.
format
Name of the format used to deserialize messages. It accepts the same values as the ``FORMAT`` SQL statement, for example ``JSONEachRow``.
schema
Optional schema value for formats that require a schema to interpret consumed messages. For example, the Cap'n Proto format requires
a path to the schema file and the name of the root object - ``schema.capnp:Message``. Self-describing formats such as JSON don't require any schema.
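For formats that need a schema, a minimal sketch of a table definition passing the optional ``schema`` parameter (the ``schema.capnp`` file and its ``Message`` root object here are only illustrative):
.. code-block:: sql
CREATE TABLE proto_queue (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'CapnProto', 'schema.capnp:Message');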
Example:
.. code-block:: sql
CREATE TABLE queue (timestamp UInt64, level String, message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
The consumed messages are tracked automatically in the background, so each message will be read exactly once in a single consumer group. If you want to consume the same set of messages twice, you can create a copy of the table with a different ``group_name``. The consumer group is elastic and synchronised across the cluster; for example, if you have 10 topic/partitions and 5 instances of the table across the cluster, it will automatically assign 2 topic/partitions per instance. If you detach a table, or add new instances, it will rebalance topic/partition allocations automatically. See http://kafka.apache.org/intro for more information about how this works.
SELECT * FROM queue LIMIT 5;
Reading messages directly, however, is not very useful; the table engine is typically used to build real-time ingestion pipelines using MATERIALIZED VIEW. If a MATERIALIZED VIEW is attached to a Kafka table engine, it will start consuming messages in the background and push them into the attached views. This allows you to continuously ingest messages from Kafka and transform them into the appropriate format using the SELECT statement.
The consumed messages are tracked automatically in the background, so each message will be read exactly once in a single consumer group. If you want to consume the same set of messages twice, you can create a copy of the table with a different ``group_name``. The consumer group is elastic and synchronised across the cluster. For example, if you have 10 topic/partitions and 5 instances of the table across the cluster, it will automatically assign 2 topic/partitions per instance. If you detach a Kafka engine table (or create a new one), it will rebalance topic/partition assignments automatically. See `Kafka Introduction <https://kafka.apache.org/intro>`_ for more information about how this works.
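For instance, a second table reading the same topic under a different consumer group (the ``group2`` name below is only illustrative) would receive its own, independent copy of every message:
.. code-block:: sql
CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group2', 'JSONEachRow');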
Reading messages using SELECT is not very useful (except for troubleshooting), because each message can be read only once.
The table engine is typically used to build real-time ingestion pipelines using MATERIALIZED VIEW. It works like this:
1. You create a Kafka consumer with a Kafka engine. This is the data stream.
2. You create an arbitrary table with the desired data schema.
3. You create a MATERIALIZED VIEW that transforms the data from Kafka, and materializes it into your desired table.
When a MATERIALIZED VIEW is attached to a Kafka table engine, it will automatically start consuming messages in the background and push them into the attached views. This allows you to continuously ingest messages from Kafka and transform them into the required format using the view's SELECT statement.
Example:
.. code-block:: sql
CREATE TABLE queue (timestamp UInt64, level String, message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
CREATE MATERIALIZED VIEW daily ENGINE = SummingMergeTree(day, (day, level), 8192) AS
SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
CREATE TABLE daily (
day Date,
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
SELECT level, sum(total) FROM daily GROUP BY level;
The messages are streamed into the attached view immediately, in the same way a continuous stream of INSERT statements would be. To improve performance, consumed messages are squashed into batches of ``max_insert_block_size`` rows. If a message batch cannot be completed within the ``stream_flush_interval_ms`` period (7500 ms by default), it is flushed anyway to keep the insertion latency bounded.
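A quick way to check the effective values of these two settings for the current user is to query the ``system.settings`` table:
.. code-block:: sql
SELECT name, value
FROM system.settings
WHERE name IN ('max_insert_block_size', 'stream_flush_interval_ms');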
To stop consuming the topic, or to alter the transformation logic, simply detach the MATERIALIZED VIEW:
.. code-block:: sql
DETACH TABLE consumer;
ATTACH MATERIALIZED VIEW consumer;
Note: when performing ALTERs on the target table, it's recommended to detach the materialized view first to prevent a mismatch between the target table schema and the data produced by the MATERIALIZED VIEW.
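A sketch of that workflow, adding a hypothetical ``hostname`` column to the ``daily`` table from the example above:
.. code-block:: sql
DETACH TABLE consumer;
ALTER TABLE daily ADD COLUMN hostname String DEFAULT '';
ATTACH MATERIALIZED VIEW consumer;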