mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Add logging about pushing to underlying in StorageKafka
This commit is contained in:
parent
6b9534cad8
commit
4b69684cd2
@ -28,6 +28,7 @@
|
|||||||
#include <Poco/Util/AbstractConfiguration.h>
|
#include <Poco/Util/AbstractConfiguration.h>
|
||||||
#include <Common/Exception.h>
|
#include <Common/Exception.h>
|
||||||
#include <Common/Macros.h>
|
#include <Common/Macros.h>
|
||||||
|
#include <Common/formatReadable.h>
|
||||||
#include <Common/config_version.h>
|
#include <Common/config_version.h>
|
||||||
#include <Common/setThreadName.h>
|
#include <Common/setThreadName.h>
|
||||||
#include <Common/typeid_cast.h>
|
#include <Common/typeid_cast.h>
|
||||||
@ -585,6 +586,8 @@ void StorageKafka::threadFunc(size_t idx)
|
|||||||
|
|
||||||
bool StorageKafka::streamToViews()
|
bool StorageKafka::streamToViews()
|
||||||
{
|
{
|
||||||
|
Stopwatch watch;
|
||||||
|
|
||||||
auto table_id = getStorageID();
|
auto table_id = getStorageID();
|
||||||
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
|
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
|
||||||
if (!table)
|
if (!table)
|
||||||
@ -637,7 +640,11 @@ bool StorageKafka::streamToViews()
|
|||||||
// We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff.
|
// We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff.
|
||||||
// It will be cancelled on underlying layer (kafka buffer)
|
// It will be cancelled on underlying layer (kafka buffer)
|
||||||
std::atomic<bool> stub = {false};
|
std::atomic<bool> stub = {false};
|
||||||
copyData(*in, *block_io.out, &stub);
|
size_t rows = 0;
|
||||||
|
copyData(*in, *block_io.out, [&rows](const Block & block)
|
||||||
|
{
|
||||||
|
rows += block.rows();
|
||||||
|
}, &stub);
|
||||||
|
|
||||||
bool some_stream_is_stalled = false;
|
bool some_stream_is_stalled = false;
|
||||||
for (auto & stream : streams)
|
for (auto & stream : streams)
|
||||||
@ -646,6 +653,10 @@ bool StorageKafka::streamToViews()
|
|||||||
stream->as<KafkaBlockInputStream>()->commit();
|
stream->as<KafkaBlockInputStream>()->commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
UInt64 milliseconds = watch.elapsedMilliseconds();
|
||||||
|
LOG_DEBUG(log, "Pushing {} rows to {} took {} ms.",
|
||||||
|
formatReadableQuantity(rows), table_id.getNameForLogs(), milliseconds);
|
||||||
|
|
||||||
return some_stream_is_stalled;
|
return some_stream_is_stalled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user