remove the flag in the parser

This commit is contained in:
Peng Jian 2021-03-31 22:25:51 +08:00
parent e30c07db20
commit 26b5482b4d
7 changed files with 9 additions and 16 deletions

View File

@ -269,8 +269,7 @@ InputFormatPtr FormatFactory::getInputFormat(
const Block & sample,
const Context & context,
UInt64 max_block_size,
const std::optional<FormatSettings> & _format_settings,
const bool sync_after_error) const
const std::optional<FormatSettings> & _format_settings) const
{
const auto & input_getter = getCreators(name).input_processor_creator;
if (!input_getter)
@ -290,7 +289,6 @@ InputFormatPtr FormatFactory::getInputFormat(
params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
params.max_execution_time = settings.max_execution_time;
params.timeout_overflow_mode = settings.timeout_overflow_mode;
params.sync_after_error = sync_after_error;
auto format = input_getter(buf, sample, params, format_settings);
/// It's a kludge. Because I cannot remove context from values format.

View File

@ -133,8 +133,7 @@ public:
const Block & sample,
const Context & context,
UInt64 max_block_size,
const std::optional<FormatSettings> & format_settings = std::nullopt,
const bool sync_after_error = false) const;
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
/// Checks all preconditions. Returns ordinary format if parallel formatting cannot be done.
OutputFormatPtr getOutputFormatParallelIfPossible(

View File

@ -18,9 +18,7 @@ IInputFormat::IInputFormat(Block header, ReadBuffer & in_)
void IInputFormat::resetParser()
{
if (in.hasPendingData())
throw Exception("Unread data in IInputFormat::resetParser. Most likely it's a bug.", ErrorCodes::LOGICAL_ERROR);
in.ignoreAll();
// those are protected attributes from ISource (I didn't want to propagate resetParser up there)
finished = false;
got_exception = false;

View File

@ -108,12 +108,6 @@ Chunk IRowInputFormat::generate()
if (!isParseError(e.code()))
throw;
if (params.sync_after_error)
{
syncAfterError();
throw;
}
if (params.allow_errors_num == 0 && params.allow_errors_ratio == 0)
throw;

View File

@ -29,7 +29,6 @@ struct RowInputFormatParams
Poco::Timespan max_execution_time = 0;
OverflowMode timeout_overflow_mode = OverflowMode::THROW;
bool sync_after_error = false;
};
bool isParseError(int code);

View File

@ -84,7 +84,7 @@ Block KafkaBlockInputStream::readImpl()
auto put_error_to_stream = handle_error_mode == HandleKafkaErrorMode::STREAM;
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, *context, max_block_size, {}, put_error_to_stream);
storage.getFormatName(), *buffer, non_virtual_header, *context, max_block_size);
InputPort port(input_format->getPort().getHeader(), input_format.get());
connect(input_format->getPort(), port);

View File

@ -190,6 +190,11 @@ StorageKafka::StorageKafka(
, settings_adjustments(createSettingsAdjustments())
, thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value)
{
if (kafka_settings->kafka_handle_error_mode == HandleKafkaErrorMode::STREAM)
{
kafka_settings->input_format_allow_errors_num = 0;
kafka_settings->input_format_allow_errors_ratio = 0;
}
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);