Fix issues detected by clang-tidy, fix issue with flush timeout

Mikhail Filimonov 2020-06-03 17:34:14 +02:00
parent bf1e5db0ee
commit 94261b9786
2 changed files with 34 additions and 53 deletions

src/Storages/Kafka/StorageKafka.cpp

@@ -180,13 +180,13 @@ SettingsChanges StorageKafka::createSettingsAdjustments()
 Names StorageKafka::parseTopics(String topic_list)
 {
-    Names topics_;
-    boost::split(topics_,topic_list,[](char c){ return c == ','; });
-    for (String & topic : topics_)
+    Names result;
+    boost::split(result,topic_list,[](char c){ return c == ','; });
+    for (String & topic : result)
     {
         boost::trim(topic);
     }
-    return topics_;
+    return result;
 }

 String StorageKafka::getDefaultClientId(const StorageID & table_id_)
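
The rename only swaps `topics_` (flagged by clang-tidy's naming checks) for `result`; behaviour is unchanged: split a comma-separated topic list and trim whitespace around each name. A minimal standalone sketch of the same boost::split/boost::trim combination, with `Names` as an illustrative stand-in for the real typedef:

    #include <boost/algorithm/string/split.hpp>
    #include <boost/algorithm/string/trim.hpp>
    #include <iostream>
    #include <string>
    #include <vector>

    // Illustrative stand-in for DB::Names (a vector of strings).
    using Names = std::vector<std::string>;

    static Names parseTopics(std::string topic_list)
    {
        Names result;
        // Split on ',' and trim spaces, mirroring StorageKafka::parseTopics.
        boost::split(result, topic_list, [](char c) { return c == ','; });
        for (std::string & topic : result)
            boost::trim(topic);
        return result;
    }

    int main()
    {
        for (const auto & t : parseTopics(" events , clicks,logs "))
            std::cout << '[' << t << "]\n";   // [events] [clicks] [logs]
    }
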
@@ -211,8 +211,8 @@ Pipes StorageKafka::read(
     /// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
     Pipes pipes;
     pipes.reserve(num_created_consumers);
-    auto _tmp_context = std::make_shared<Context>(context);
-    _tmp_context->applySettingsChanges(settings_adjustments);
+    auto modified_context = std::make_shared<Context>(context);
+    modified_context->applySettingsChanges(settings_adjustments);

     // Claim as many consumers as requested, but don't block
     for (size_t i = 0; i < num_created_consumers; ++i)
@@ -221,7 +221,7 @@ Pipes StorageKafka::read(
         /// TODO: probably that leads to awful performance.
         /// FIXME: seems that doesn't help with extra reading and committing unprocessed messages.
         /// TODO: rewrite KafkaBlockInputStream to KafkaSource. Now it is used in other place.
-        pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<KafkaBlockInputStream>(*this, _tmp_context, column_names, 1)));
+        pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<KafkaBlockInputStream>(*this, modified_context, column_names, 1)));
     }

     LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
@@ -231,12 +231,12 @@ Pipes StorageKafka::read(
 BlockOutputStreamPtr StorageKafka::write(const ASTPtr &, const Context & context)
 {
-    auto _tmp_context = std::make_shared<Context>(context);
-    _tmp_context->applySettingsChanges(settings_adjustments);
+    auto modified_context = std::make_shared<Context>(context);
+    modified_context->applySettingsChanges(settings_adjustments);

     if (topics.size() > 1)
         throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);

-    return std::make_shared<KafkaBlockOutputStream>(*this, _tmp_context);
+    return std::make_shared<KafkaBlockOutputStream>(*this, modified_context);
 }
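
Besides dropping the reserved-identifier-style `_tmp_context` name, these hunks show the pattern both read() and write() rely on: copy the query Context, then layer the table-level Kafka settings on top, so per-table overrides win without mutating the caller's context. A generic sketch of that copy-then-override idea; the types here are simplified stand-ins, not ClickHouse's Context API:

    #include <map>
    #include <memory>
    #include <string>

    // Simplified stand-ins for Context / SettingsChanges.
    using SettingsChanges = std::map<std::string, std::string>;

    struct Context
    {
        SettingsChanges settings;
        void applySettingsChanges(const SettingsChanges & changes)
        {
            for (const auto & [name, value] : changes)
                settings[name] = value;   // table-level value overrides the query-level one
        }
    };

    std::shared_ptr<Context> makeModifiedContext(const Context & context, const SettingsChanges & adjustments)
    {
        // Copy first so the caller's context is left untouched.
        auto modified_context = std::make_shared<Context>(context);
        modified_context->applySettingsChanges(adjustments);
        return modified_context;
    }
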
@@ -360,56 +360,32 @@ ConsumerBufferPtr StorageKafka::createReadBuffer(const size_t consumer_number)
     consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);

     /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
-    return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, getPollMaxBatchSize(), getPollTimeout(), intermediate_commit, stream_cancelled, topics);
+    return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
 }

 size_t StorageKafka::getMaxBlockSize() const
 {
-    size_t block_size = kafka_settings->kafka_max_block_size.value;
-    if (!block_size)
-    {
-        block_size = global_context.getSettingsRef().max_insert_block_size.value / num_consumers;
-    }
-    return block_size;
+    return kafka_settings->kafka_max_block_size.changed
+        ? kafka_settings->kafka_max_block_size.value
+        : (global_context.getSettingsRef().max_insert_block_size.value / num_consumers);
 }

 size_t StorageKafka::getPollMaxBatchSize() const
 {
-    size_t batch_size = kafka_settings->kafka_poll_max_batch_size.value;
-    if (!batch_size)
-    {
-        batch_size = global_context.getSettingsRef().max_block_size.value;
-    }
+    size_t batch_size = kafka_settings->kafka_poll_max_batch_size.changed
+        ? kafka_settings->kafka_poll_max_batch_size.value
+        : global_context.getSettingsRef().max_block_size.value;

     return std::min(batch_size,getMaxBlockSize());
 }

-size_t StorageKafka::getPollTimeout() const
+size_t StorageKafka::getPollTimeoutMillisecond() const
 {
-    size_t poll_timeout = kafka_settings->kafka_poll_timeout_ms.totalMilliseconds();
-    if (!poll_timeout)
-    {
-        poll_timeout = global_context.getSettingsRef().stream_poll_timeout_ms.totalMilliseconds();
-    }
-    return poll_timeout;
+    return kafka_settings->kafka_poll_timeout_ms.changed
+        ? kafka_settings->kafka_poll_timeout_ms.totalMilliseconds()
+        : global_context.getSettingsRef().stream_poll_timeout_ms.totalMilliseconds();
 }

-size_t StorageKafka::getFlushTimeout() const
-{
-    size_t flush_timeout = kafka_settings->kafka_flush_interval_ms.totalMilliseconds();
-    if (!flush_timeout)
-    {
-        flush_timeout = global_context.getSettingsRef().stream_flush_interval_ms.totalMilliseconds();
-    }
-    return flush_timeout;
-}
-
 void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
 {
     // Update consumer configuration from the configuration
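
The common thread in these getters: the old code treated a value of 0 as "not set", which conflates an explicit zero with an absent setting and breaks for settings whose default is nonzero. Checking the setting's `changed` flag instead asks "did the user actually set this?". A reduced sketch of the difference; the `Setting` struct is an illustrative stand-in, not ClickHouse's settings machinery:

    #include <cassert>
    #include <cstddef>

    // Illustrative stand-in: real settings track whether they were explicitly set.
    struct Setting
    {
        size_t value = 0;
        bool changed = false;   // becomes true when the user assigns a value
    };

    static constexpr size_t global_default = 65536;

    // Old style: zero doubles as "unset" -- an explicit 0 silently falls back.
    size_t getOldStyle(const Setting & s)
    {
        size_t v = s.value;
        if (!v)
            v = global_default;
        return v;
    }

    // New style: fall back only when the setting was never touched.
    size_t getNewStyle(const Setting & s)
    {
        return s.changed ? s.value : global_default;
    }

    int main()
    {
        Setting explicit_zero{0, true};
        assert(getOldStyle(explicit_zero) == global_default);  // explicit 0 is lost
        assert(getNewStyle(explicit_zero) == 0);               // explicit 0 is honoured
    }
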
@@ -558,7 +534,13 @@ bool StorageKafka::streamToViews()
         // Limit read batch to maximum block size to allow DDL
         IBlockInputStream::LocalLimits limits;
-        limits.speed_limits.max_execution_time = getFlushTimeout();
+        limits.speed_limits.max_execution_time = kafka_settings->kafka_flush_interval_ms.changed
+            ? kafka_settings->kafka_flush_interval_ms
+            : global_context.getSettingsRef().stream_flush_interval_ms;
+
+        LOG_ERROR(log, "limits.speed_limits.max_execution_time {}", limits.speed_limits.max_execution_time.totalMilliseconds() );
+
         limits.timeout_overflow_mode = OverflowMode::BREAK;
         stream->setLimits(limits);
     }
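
This hunk is the "fix issue with flush timeout" half of the commit message. A plausible reading of the bug, assuming Poco's documented API: `max_execution_time` is a Poco::Timespan, the removed getFlushTimeout() returned a bare size_t counting milliseconds, and Poco::Timespan's single-integer constructor interprets its argument as microseconds, so the effective timeout came out 1000x too short. Assigning the milliseconds setting object directly lets its Timespan conversion preserve the unit. A reduced sketch of the unit trap (requires linking Poco Foundation; the variable names are invented for the demo):

    #include <Poco/Timespan.h>
    #include <iostream>

    int main()
    {
        const size_t flush_timeout_ms = 7500;   // what getFlushTimeout() used to return

        // Implicit conversion: the integer is interpreted as *microseconds*.
        Poco::Timespan wrong = flush_timeout_ms;
        // Converting explicitly keeps the intended unit (ms -> us).
        Poco::Timespan right(static_cast<Poco::Timespan::TimeDiff>(flush_timeout_ms) * 1000);

        std::cout << wrong.totalMilliseconds() << "\n";  // 7    -- 1000x too short
        std::cout << right.totalMilliseconds() << "\n";  // 7500 -- as intended
    }
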
@ -627,20 +609,20 @@ void registerStorageKafka(StorageFactory & factory)
{ \
if ((EVAL) == 1) \
{ \
engine_args[ARG_NUM-1] = \
engine_args[(ARG_NUM)-1] = \
evaluateConstantExpressionAsLiteral( \
engine_args[ARG_NUM-1], \
engine_args[(ARG_NUM)-1], \
args.local_context); \
} \
if ((EVAL) == 2) \
{ \
engine_args[ARG_NUM-1] = \
engine_args[(ARG_NUM)-1] = \
evaluateConstantExpressionOrIdentifierAsLiteral( \
engine_args[ARG_NUM-1], \
engine_args[(ARG_NUM)-1], \
args.local_context); \
} \
kafka_settings->PAR_NAME.set( \
engine_args[ARG_NUM-1]->as<ASTLiteral &>().value); \
engine_args[(ARG_NUM)-1]->as<ASTLiteral &>().value);\
} \
}
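
Parenthesizing ARG_NUM is what clang-tidy's bugprone-macro-parentheses check asks for: without the parentheses, a macro argument containing an operator of lower precedence than `-` silently re-associates with the `-1`. A self-contained illustration; the macro names are invented for the demo:

    #include <cassert>

    // Same shape as the CHECK_KAFKA_STORAGE_ARGUMENT fix above.
    #define PREV_INDEX_BAD(ARG_NUM)  (ARG_NUM - 1)
    #define PREV_INDEX_GOOD(ARG_NUM) ((ARG_NUM) - 1)

    int main()
    {
        const int idx = 6;

        // '&' binds looser than '-', so "idx & 3 - 1" parses as "idx & (3 - 1)".
        assert(PREV_INDEX_BAD(idx & 3)  == (idx & 2));        // == 2, not the intended 1
        assert(PREV_INDEX_GOOD(idx & 3) == ((idx & 3) - 1));  // == 1, as the caller meant
    }
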

src/Storages/Kafka/StorageKafka.h

@@ -105,8 +105,7 @@ private:
     size_t getPollMaxBatchSize() const;
     size_t getMaxBlockSize() const;
-    size_t getPollTimeout() const;
-    size_t getFlushTimeout() const;
+    size_t getPollTimeoutMillisecond() const;

     static Names parseTopics(String topic_list);
     static String getDefaultClientId(const StorageID & table_id_);