Check the time limit every (flush_interval / poll_timeout) number of rows from Kafka (#5249)

This commit is contained in:
Ivan 2019-05-14 18:52:03 +03:00 committed by GitHub
parent aba710a70e
commit 4a5832b18a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 93 additions and 38 deletions

View File

@ -206,6 +206,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.") \ M(SettingUInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.") \
M(SettingInt64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. Negative value means infinite.") \ M(SettingInt64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. Negative value means infinite.") \
M(SettingMilliseconds, stream_flush_interval_ms, 7500, "Timeout for flushing data from streaming storages.") \ M(SettingMilliseconds, stream_flush_interval_ms, 7500, "Timeout for flushing data from streaming storages.") \
M(SettingMilliseconds, stream_poll_timeout_ms, 500, "Timeout for polling data from streaming storages.") \
M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)") \ M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)") \
M(SettingBool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.") \ M(SettingBool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.") \
M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \ M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \

View File

@ -269,6 +269,11 @@ protected:
children.push_back(child); children.push_back(child);
} }
/** Check limits.
* But only those that can be checked within each separate stream.
*/
bool checkTimeLimit();
private: private:
bool enabled_extremes = false; bool enabled_extremes = false;
@ -296,10 +301,9 @@ private:
void updateExtremes(Block & block); void updateExtremes(Block & block);
/** Check limits and quotas. /** Check quotas.
* But only those that can be checked within each separate stream. * But only those that can be checked within each separate stream.
*/ */
bool checkTimeLimit();
void checkQuota(Block & block); void checkQuota(Block & block);
size_t checkDepthImpl(size_t max_depth, size_t level) const; size_t checkDepthImpl(size_t max_depth, size_t level) const;

View File

@ -64,23 +64,25 @@ void registerInputFormatRowBinary(FormatFactory & factory)
const Block & sample, const Block & sample,
const Context &, const Context &,
UInt64 max_block_size, UInt64 max_block_size,
UInt64 rows_portion_size,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<BlockInputStreamFromRowInputStream>( return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<BinaryRowInputStream>(buf, sample, false, false), std::make_shared<BinaryRowInputStream>(buf, sample, false, false),
sample, max_block_size, settings); sample, max_block_size, rows_portion_size, settings);
}); });
factory.registerInputFormat("RowBinaryWithNamesAndTypes", []( factory.registerInputFormat("RowBinaryWithNamesAndTypes", [](
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
const Context &, const Context &,
size_t max_block_size, UInt64 max_block_size,
UInt64 rows_portion_size,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<BlockInputStreamFromRowInputStream>( return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<BinaryRowInputStream>(buf, sample, true, true), std::make_shared<BinaryRowInputStream>(buf, sample, true, true),
sample, max_block_size, settings); sample, max_block_size, rows_portion_size, settings);
}); });
} }

View File

@ -27,8 +27,9 @@ BlockInputStreamFromRowInputStream::BlockInputStreamFromRowInputStream(
const RowInputStreamPtr & row_input_, const RowInputStreamPtr & row_input_,
const Block & sample_, const Block & sample_,
UInt64 max_block_size_, UInt64 max_block_size_,
UInt64 rows_portion_size_,
const FormatSettings & settings) const FormatSettings & settings)
: row_input(row_input_), sample(sample_), max_block_size(max_block_size_), : row_input(row_input_), sample(sample_), max_block_size(max_block_size_), rows_portion_size(rows_portion_size_),
allow_errors_num(settings.input_allow_errors_num), allow_errors_ratio(settings.input_allow_errors_ratio) allow_errors_num(settings.input_allow_errors_num), allow_errors_ratio(settings.input_allow_errors_ratio)
{ {
} }
@ -57,8 +58,15 @@ Block BlockInputStreamFromRowInputStream::readImpl()
try try
{ {
for (size_t rows = 0; rows < max_block_size; ++rows) for (size_t rows = 0, batch = 0; rows < max_block_size; ++rows, ++batch)
{ {
if (rows_portion_size && batch == rows_portion_size)
{
batch = 0;
if (!checkTimeLimit())
break;
}
try try
{ {
++total_rows; ++total_rows;

View File

@ -17,11 +17,13 @@ namespace DB
class BlockInputStreamFromRowInputStream : public IBlockInputStream class BlockInputStreamFromRowInputStream : public IBlockInputStream
{ {
public: public:
/** sample_ - block with zero rows, that structure describes how to interpret values */ /// |sample| is a block with zero rows whose structure describes how to interpret values
/// |rows_portion_size| is the number of rows to read between checks of the time limit
BlockInputStreamFromRowInputStream( BlockInputStreamFromRowInputStream(
const RowInputStreamPtr & row_input_, const RowInputStreamPtr & row_input_,
const Block & sample_, const Block & sample_,
UInt64 max_block_size_, UInt64 max_block_size_,
UInt64 rows_portion_size_,
const FormatSettings & settings); const FormatSettings & settings);
void readPrefix() override { row_input->readPrefix(); } void readPrefix() override { row_input->readPrefix(); }
@ -42,6 +44,7 @@ private:
RowInputStreamPtr row_input; RowInputStreamPtr row_input;
Block sample; Block sample;
UInt64 max_block_size; UInt64 max_block_size;
UInt64 rows_portion_size;
BlockMissingValues block_missing_values; BlockMissingValues block_missing_values;
UInt64 allow_errors_num; UInt64 allow_errors_num;
@ -50,5 +53,4 @@ private:
size_t total_rows = 0; size_t total_rows = 0;
size_t num_errors = 0; size_t num_errors = 0;
}; };
} }

View File

@ -478,11 +478,12 @@ void registerInputFormatCSV(FormatFactory & factory)
const Block & sample, const Block & sample,
const Context &, const Context &,
UInt64 max_block_size, UInt64 max_block_size,
UInt64 rows_portion_size,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<BlockInputStreamFromRowInputStream>( return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<CSVRowInputStream>(buf, sample, with_names, settings), std::make_shared<CSVRowInputStream>(buf, sample, with_names, settings),
sample, max_block_size, settings); sample, max_block_size, rows_portion_size, settings);
}); });
} }
} }

View File

@ -302,12 +302,18 @@ void registerInputFormatCapnProto(FormatFactory & factory)
{ {
factory.registerInputFormat( factory.registerInputFormat(
"CapnProto", "CapnProto",
[](ReadBuffer & buf, const Block & sample, const Context & context, UInt64 max_block_size, const FormatSettings & settings) [](ReadBuffer & buf,
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
const FormatSettings & settings)
{ {
return std::make_shared<BlockInputStreamFromRowInputStream>( return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<CapnProtoRowInputStream>(buf, sample, FormatSchemaInfo(context, "CapnProto")), std::make_shared<CapnProtoRowInputStream>(buf, sample, FormatSchemaInfo(context, "CapnProto")),
sample, sample,
max_block_size, max_block_size,
rows_portion_size,
settings); settings);
}); });
} }

View File

@ -27,7 +27,7 @@ const FormatFactory::Creators & FormatFactory::getCreators(const String & name)
} }
BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & buf, const Block & sample, const Context & context, UInt64 max_block_size) const BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & buf, const Block & sample, const Context & context, UInt64 max_block_size, UInt64 rows_portion_size) const
{ {
const auto & input_getter = getCreators(name).first; const auto & input_getter = getCreators(name).first;
if (!input_getter) if (!input_getter)
@ -47,7 +47,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
format_settings.input_allow_errors_num = settings.input_format_allow_errors_num; format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio; format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
return input_getter(buf, sample, context, max_block_size, format_settings); return input_getter(buf, sample, context, max_block_size, rows_portion_size, format_settings);
} }

View File

@ -35,6 +35,7 @@ private:
const Block & sample, const Block & sample,
const Context & context, const Context & context,
UInt64 max_block_size, UInt64 max_block_size,
UInt64 rows_portion_size,
const FormatSettings & settings)>; const FormatSettings & settings)>;
using OutputCreator = std::function<BlockOutputStreamPtr( using OutputCreator = std::function<BlockOutputStreamPtr(
@ -49,7 +50,7 @@ private:
public: public:
BlockInputStreamPtr getInput(const String & name, ReadBuffer & buf, BlockInputStreamPtr getInput(const String & name, ReadBuffer & buf,
const Block & sample, const Context & context, UInt64 max_block_size) const; const Block & sample, const Context & context, UInt64 max_block_size, UInt64 rows_portion_size = 0) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf, BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context) const; const Block & sample, const Context & context) const;

View File

@ -259,11 +259,12 @@ void registerInputFormatJSONEachRow(FormatFactory & factory)
const Block & sample, const Block & sample,
const Context &, const Context &,
UInt64 max_block_size, UInt64 max_block_size,
UInt64 rows_portion_size,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<BlockInputStreamFromRowInputStream>( return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<JSONEachRowRowInputStream>(buf, sample, settings), std::make_shared<JSONEachRowRowInputStream>(buf, sample, settings),
sample, max_block_size, settings); sample, max_block_size, rows_portion_size, settings);
}); });
} }

View File

@ -12,7 +12,8 @@ void registerInputFormatNative(FormatFactory & factory)
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
const Context &, const Context &,
size_t, UInt64 /* max_block_size */,
UInt64 /* rows_portion_size */,
const FormatSettings &) const FormatSettings &)
{ {
return std::make_shared<NativeBlockInputStream>(buf, sample, 0); return std::make_shared<NativeBlockInputStream>(buf, sample, 0);

View File

@ -475,7 +475,8 @@ void registerInputFormatParquet(FormatFactory & factory)
[](ReadBuffer & buf, [](ReadBuffer & buf,
const Block & sample, const Block & sample,
const Context & context, const Context & context,
size_t /*max_block_size */, UInt64 /* max_block_size */,
UInt64 /* rows_portion_size */,
const FormatSettings & /* settings */) { return std::make_shared<ParquetBlockInputStream>(buf, sample, context); }); const FormatSettings & /* settings */) { return std::make_shared<ParquetBlockInputStream>(buf, sample, context); });
} }

View File

@ -72,11 +72,12 @@ void registerInputFormatProtobuf(FormatFactory & factory)
const Block & sample, const Block & sample,
const Context & context, const Context & context,
UInt64 max_block_size, UInt64 max_block_size,
UInt64 rows_portion_size,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<BlockInputStreamFromRowInputStream>( return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<ProtobufRowInputStream>(buf, sample, FormatSchemaInfo(context, "Protobuf")), std::make_shared<ProtobufRowInputStream>(buf, sample, FormatSchemaInfo(context, "Protobuf")),
sample, max_block_size, settings); sample, max_block_size, rows_portion_size, settings);
}); });
} }

View File

@ -198,11 +198,12 @@ void registerInputFormatTSKV(FormatFactory & factory)
const Block & sample, const Block & sample,
const Context &, const Context &,
UInt64 max_block_size, UInt64 max_block_size,
UInt64 rows_portion_size,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<BlockInputStreamFromRowInputStream>( return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TSKVRowInputStream>(buf, sample, settings), std::make_shared<TSKVRowInputStream>(buf, sample, settings),
sample, max_block_size, settings); sample, max_block_size, rows_portion_size, settings);
}); });
} }

View File

@ -456,11 +456,12 @@ void registerInputFormatTabSeparated(FormatFactory & factory)
const Block & sample, const Block & sample,
const Context &, const Context &,
UInt64 max_block_size, UInt64 max_block_size,
UInt64 rows_portion_size,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<BlockInputStreamFromRowInputStream>( return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TabSeparatedRowInputStream>(buf, sample, false, false, settings), std::make_shared<TabSeparatedRowInputStream>(buf, sample, false, false, settings),
sample, max_block_size, settings); sample, max_block_size, rows_portion_size, settings);
}); });
} }
@ -471,11 +472,12 @@ void registerInputFormatTabSeparated(FormatFactory & factory)
const Block & sample, const Block & sample,
const Context &, const Context &,
UInt64 max_block_size, UInt64 max_block_size,
UInt64 rows_portion_size,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<BlockInputStreamFromRowInputStream>( return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TabSeparatedRowInputStream>(buf, sample, true, false, settings), std::make_shared<TabSeparatedRowInputStream>(buf, sample, true, false, settings),
sample, max_block_size, settings); sample, max_block_size, rows_portion_size, settings);
}); });
} }
@ -486,11 +488,12 @@ void registerInputFormatTabSeparated(FormatFactory & factory)
const Block & sample, const Block & sample,
const Context &, const Context &,
UInt64 max_block_size, UInt64 max_block_size,
UInt64 rows_portion_size,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<BlockInputStreamFromRowInputStream>( return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TabSeparatedRowInputStream>(buf, sample, true, true, settings), std::make_shared<TabSeparatedRowInputStream>(buf, sample, true, true, settings),
sample, max_block_size, settings); sample, max_block_size, rows_portion_size, settings);
}); });
} }
} }

View File

@ -155,11 +155,12 @@ void registerInputFormatValues(FormatFactory & factory)
const Block & sample, const Block & sample,
const Context & context, const Context & context,
UInt64 max_block_size, UInt64 max_block_size,
UInt64 rows_portion_size,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<BlockInputStreamFromRowInputStream>( return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<ValuesRowInputStream>(buf, sample, context, settings), std::make_shared<ValuesRowInputStream>(buf, sample, context, settings),
sample, max_block_size, settings); sample, max_block_size, rows_portion_size, settings);
}); });
} }

View File

@ -45,7 +45,7 @@ try
FormatSettings format_settings; FormatSettings format_settings;
RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings); RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, format_settings); BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, format_settings);
RowOutputStreamPtr row_output = std::make_shared<TabSeparatedRowOutputStream>(out_buf, sample, false, false, format_settings); RowOutputStreamPtr row_output = std::make_shared<TabSeparatedRowOutputStream>(out_buf, sample, false, false, format_settings);
BlockOutputStreamFromRowOutputStream block_output(row_output, sample); BlockOutputStreamFromRowOutputStream block_output(row_output, sample);

View File

@ -42,7 +42,7 @@ try
RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings); RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings);
RowOutputStreamPtr row_output = std::make_shared<TabSeparatedRowOutputStream>(out_buf, sample, false, false, format_settings); RowOutputStreamPtr row_output = std::make_shared<TabSeparatedRowOutputStream>(out_buf, sample, false, false, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, format_settings); BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, format_settings);
BlockOutputStreamFromRowOutputStream block_output(row_output, sample); BlockOutputStreamFromRowOutputStream block_output(row_output, sample);
copyData(block_input, block_output); copyData(block_input, block_output);

View File

@ -10,7 +10,7 @@ namespace DB
class DelimitedReadBuffer : public ReadBuffer class DelimitedReadBuffer : public ReadBuffer
{ {
public: public:
DelimitedReadBuffer(ReadBuffer * buffer_, char delimiter_) : ReadBuffer(nullptr, 0), buffer(buffer_), delimiter(delimiter_) DelimitedReadBuffer(std::unique_ptr<ReadBuffer> buffer_, char delimiter_) : ReadBuffer(nullptr, 0), buffer(std::move(buffer_)), delimiter(delimiter_)
{ {
// TODO: check that `buffer_` is not nullptr. // TODO: check that `buffer_` is not nullptr.
} }

View File

@ -39,7 +39,14 @@ void KafkaBlockInputStream::readPrefixImpl()
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.topics); buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.topics);
addChild(FormatFactory::instance().getInput(storage.format_name, *buffer, storage.getSampleBlock(), context, max_block_size)); const auto & limits = getLimits();
const size_t poll_timeout = buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->pollTimeout();
size_t rows_portion_size = poll_timeout ? std::min(max_block_size, limits.max_execution_time.totalMilliseconds() / poll_timeout) : max_block_size;
rows_portion_size = std::max(rows_portion_size, 1ul);
auto child = FormatFactory::instance().getInput(storage.format_name, *buffer, storage.getSampleBlock(), context, max_block_size, rows_portion_size);
child->setLimits(limits);
addChild(child);
broken = true; broken = true;
} }

View File

@ -3,11 +3,6 @@
namespace DB namespace DB
{ {
namespace
{
const auto READ_POLL_MS = 500; /// How long to wait for a batch of messages.
}
void ReadBufferFromKafkaConsumer::commit() void ReadBufferFromKafkaConsumer::commit()
{ {
if (messages.empty() || current == messages.begin()) if (messages.empty() || current == messages.begin())
@ -46,7 +41,7 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
if (current == messages.end()) if (current == messages.end())
{ {
commit(); commit();
messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(READ_POLL_MS)); messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout));
current = messages.begin(); current = messages.begin();
LOG_TRACE(log, "Polled batch of " << messages.size() << " messages"); LOG_TRACE(log, "Polled batch of " << messages.size() << " messages");

View File

@ -12,12 +12,16 @@ namespace DB
using BufferPtr = std::shared_ptr<DelimitedReadBuffer>; using BufferPtr = std::shared_ptr<DelimitedReadBuffer>;
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>; using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
class ReadBufferFromKafkaConsumer : public ReadBuffer class ReadBufferFromKafkaConsumer : public ReadBuffer
{ {
public: public:
ReadBufferFromKafkaConsumer(ConsumerPtr consumer_, Poco::Logger * log_, size_t max_batch_size) ReadBufferFromKafkaConsumer(ConsumerPtr consumer_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_)
: ReadBuffer(nullptr, 0), consumer(consumer_), log(log_), batch_size(max_batch_size), current(messages.begin()) : ReadBuffer(nullptr, 0)
, consumer(consumer_)
, log(log_)
, batch_size(max_batch_size)
, poll_timeout(poll_timeout_)
, current(messages.begin())
{ {
} }
@ -25,12 +29,15 @@ public:
void subscribe(const Names & topics); // Subscribe internal consumer to topics. void subscribe(const Names & topics); // Subscribe internal consumer to topics.
void unsubscribe(); // Unsubscribe internal consumer in case of failure. void unsubscribe(); // Unsubscribe internal consumer in case of failure.
auto pollTimeout() { return poll_timeout; }
private: private:
using Messages = std::vector<cppkafka::Message>; using Messages = std::vector<cppkafka::Message>;
ConsumerPtr consumer; ConsumerPtr consumer;
Poco::Logger * log; Poco::Logger * log;
const size_t batch_size = 1; const size_t batch_size = 1;
const size_t poll_timeout = 0;
Messages messages; Messages messages;
Messages::const_iterator current; Messages::const_iterator current;

View File

@ -210,8 +210,9 @@ BufferPtr StorageKafka::createBuffer()
size_t batch_size = max_block_size; size_t batch_size = max_block_size;
if (!batch_size) if (!batch_size)
batch_size = settings.max_block_size.value; batch_size = settings.max_block_size.value;
size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();
return std::make_shared<DelimitedReadBuffer>(new ReadBufferFromKafkaConsumer(consumer, log, batch_size), row_delimiter); return std::make_shared<DelimitedReadBuffer>(std::make_unique<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout), row_delimiter);
} }
BufferPtr StorageKafka::claimBuffer() BufferPtr StorageKafka::claimBuffer()

View File

@ -9,6 +9,7 @@ from concurrent.futures import ThreadPoolExecutor
import enum import enum
import multiprocessing import multiprocessing
import sys import sys
import time
class Sync(enum.Enum): class Sync(enum.Enum):
@ -45,6 +46,12 @@ def main():
parser.add_argument('--repeat', type=int, default=1, parser.add_argument('--repeat', type=int, default=1,
help='send same (multiplied) message many times') help='send same (multiplied) message many times')
mode_group = parser.add_mutually_exclusive_group()
mode_group.add_argument('--jobs', type=int, default=multiprocessing.cpu_count(),
help='number of concurrent jobs')
mode_group.add_argument('--delay', type=int, metavar='SECONDS', default=0,
help='delay before sending next message')
args = parser.parse_args() args = parser.parse_args()
config = { config = {
'bootstrap_servers': f'{args.server}:{args.port}', 'bootstrap_servers': f'{args.server}:{args.port}',
@ -56,10 +63,14 @@ def main():
message = sys.stdin.buffer.read() * args.multiply message = sys.stdin.buffer.read() * args.multiply
def send(num): def send(num):
if args.delay > 0:
time.sleep(args.delay)
client.send(topic=args.topic, value=message) client.send(topic=args.topic, value=message)
print(f'iteration {num}: sent a message multiplied {args.multiply} times') print(f'iteration {num}: sent a message multiplied {args.multiply} times')
pool = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) if args.delay > 0:
args.jobs = 1
pool = ThreadPoolExecutor(max_workers=args.jobs)
for num in range(args.repeat): for num in range(args.repeat):
pool.submit(send, num) pool.submit(send, num)
pool.shutdown() pool.shutdown()