ClickHouse/src/Storages/Kafka/KafkaSource.h

57 lines
1.2 KiB
C++
Raw Normal View History

#pragma once
#include <Processors/Sources/SourceWithProgress.h>
#include <Storages/Kafka/StorageKafka.h>
2019-08-28 13:21:19 +00:00
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
namespace Poco
{
class Logger;
}
namespace DB
{
class KafkaSource : public SourceWithProgress
{
public:
KafkaSource(
StorageKafka & storage_,
const StorageMetadataPtr & metadata_snapshot_,
2021-04-20 12:57:23 +00:00
const ContextPtr & context_,
const Names & columns,
Poco::Logger * log_,
size_t max_block_size_,
bool commit_in_suffix = true);
~KafkaSource() override;
String getName() const override { return storage.getName(); }
Chunk generate() override;
void commit();
bool isStalled() const { return !buffer || buffer->isStalled(); }
private:
StorageKafka & storage;
StorageMetadataPtr metadata_snapshot;
ContextPtr context;
2019-05-22 19:38:43 +00:00
Names column_names;
Poco::Logger * log;
2019-02-11 11:54:30 +00:00
UInt64 max_block_size;
ConsumerBufferPtr buffer;
bool broken = true;
bool is_finished = false;
bool commit_in_suffix;
const Block non_virtual_header;
const Block virtual_header;
2021-03-18 05:26:32 +00:00
const HandleKafkaErrorMode handle_error_mode;
Chunk generateImpl();
};
}