Try fix kafka test.

This commit is contained in:
Nikolai Kochetov 2021-10-05 20:42:29 +03:00
parent fc12e2677b
commit 2107e54969
3 changed files with 22 additions and 3 deletions

View File

@ -52,7 +52,7 @@ KafkaSource::~KafkaSource()
storage.pushReadBuffer(buffer);
}
Chunk KafkaSource::generateImpl()
void KafkaSource::createBuffer()
{
if (!buffer)
{
@ -60,12 +60,18 @@ Chunk KafkaSource::generateImpl()
buffer = storage.popReadBuffer(timeout);
if (!buffer)
return {};
return;
buffer->subscribe();
broken = true;
}
}
Chunk KafkaSource::generateImpl()
{
if (!buffer)
createBuffer();
if (!buffer || is_finished)
return {};
@ -260,4 +266,15 @@ void KafkaSource::commit()
broken = false;
}
bool KafkaSource::isStalled() const
{
// if (!buffer)
// LOG_WARNING(log, "isStalled no buffer");
// if (buffer->isStalled())
// LOG_WARNING(log, "isStalled buffer is stalled");
// else
// LOG_WARNING(log, "isStalled ok");
return !buffer || buffer->isStalled();
}
}

View File

@ -30,8 +30,9 @@ public:
Chunk generate() override;
void createBuffer();
void commit();
bool isStalled() const { return !buffer || buffer->isStalled(); }
bool isStalled() const;
private:
StorageKafka & storage;

View File

@ -630,6 +630,7 @@ bool StorageKafka::streamToViews()
limits.timeout_overflow_mode = OverflowMode::BREAK;
source->setLimits(limits);
source->createBuffer();
}
auto pipe = Pipe::unitePipes(std::move(pipes));