Merge pull request #57006 from Avogar/save-errors-better

Fix early stop while parsing file with skipping lots of errors
This commit is contained in:
Kruglov Pavel 2023-12-11 19:03:14 +01:00 committed by GitHub
commit 20510cde34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 113 additions and 56 deletions

View File

@ -347,7 +347,13 @@ InputFormatPtr FormatFactory::getInput(
if (owned_buf)
format->addBuffer(std::move(owned_buf));
if (!settings.input_format_record_errors_file_path.toString().empty())
format->setErrorsLogger(std::make_shared<ParallelInputFormatErrorsLogger>(context));
{
if (parallel_parsing)
format->setErrorsLogger(std::make_shared<ParallelInputFormatErrorsLogger>(context));
else
format->setErrorsLogger(std::make_shared<InputFormatErrorsLogger>(context));
}
/// It's a kludge. Because I cannot remove context from values format.
/// (Not needed in the parallel_parsing case above because VALUES format doesn't support it.)

View File

@ -128,7 +128,7 @@ Chunk IRowInputFormat::generate()
RowReadExtension info;
bool continue_reading = true;
for (size_t rows = 0; rows < params.max_block_size && continue_reading; ++rows)
for (size_t rows = 0; (rows < params.max_block_size || num_rows == 0) && continue_reading; ++rows)
{
try
{

View File

@ -126,10 +126,6 @@ void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupPtr thread_grou
first_parser_finished.set();
}
// We suppose we will get at least some blocks for a non-empty buffer,
// except at the end of file. Also see a matching assert in readImpl().
assert(unit.is_last || !unit.chunk_ext.chunk.empty() || parsing_finished);
std::lock_guard<std::mutex> lock(mutex);
unit.status = READY_TO_READ;
reader_condvar.notify_all();
@ -200,62 +196,69 @@ Chunk ParallelParsingInputFormat::generate()
}
const auto inserter_unit_number = reader_ticket_number % processing_units.size();
auto & unit = processing_units[inserter_unit_number];
auto * unit = &processing_units[inserter_unit_number];
if (!next_block_in_current_unit.has_value())
{
// We have read out all the Blocks from the previous Processing Unit,
// wait for the current one to become ready.
std::unique_lock<std::mutex> lock(mutex);
reader_condvar.wait(lock, [&](){ return unit.status == READY_TO_READ || parsing_finished; });
if (parsing_finished)
while (true)
{
/**
* Check for background exception and rethrow it before we return.
*/
if (background_exception)
// We have read out all the Blocks from the previous Processing Unit,
// wait for the current one to become ready.
std::unique_lock<std::mutex> lock(mutex);
reader_condvar.wait(lock, [&]() { return unit->status == READY_TO_READ || parsing_finished; });
if (parsing_finished)
{
lock.unlock();
cancel();
std::rethrow_exception(background_exception);
/// Check for background exception and rethrow it before we return.
if (background_exception)
{
lock.unlock();
cancel();
std::rethrow_exception(background_exception);
}
return {};
}
return {};
assert(unit->status == READY_TO_READ);
if (!unit->chunk_ext.chunk.empty())
break;
/// If this unit is the last one, parsing is finished.
if (unit->is_last)
{
parsing_finished = true;
return {};
}
/// We can get zero blocks for an entire segment if format parser
/// skipped all rows. For example, it can happen while using settings
/// input_format_allow_errors_num/input_format_allow_errors_ratio
/// and this segment contained only rows with errors.
/// Process the next unit.
++reader_ticket_number;
unit = &processing_units[reader_ticket_number % processing_units.size()];
}
assert(unit.status == READY_TO_READ);
next_block_in_current_unit = 0;
}
if (unit.chunk_ext.chunk.empty())
{
/*
* Can we get zero blocks for an entire segment, when the format parser
* skips it entire content and does not create any blocks? Probably not,
* but if we ever do, we should add a loop around the above if, to skip
* these. Also see a matching assert in the parser thread.
*/
assert(unit.is_last);
parsing_finished = true;
return {};
}
assert(next_block_in_current_unit.value() < unit->chunk_ext.chunk.size());
assert(next_block_in_current_unit.value() < unit.chunk_ext.chunk.size());
Chunk res = std::move(unit.chunk_ext.chunk.at(*next_block_in_current_unit));
last_block_missing_values = std::move(unit.chunk_ext.block_missing_values[*next_block_in_current_unit]);
last_approx_bytes_read_for_chunk = unit.chunk_ext.approx_chunk_sizes.at(*next_block_in_current_unit);
Chunk res = std::move(unit->chunk_ext.chunk.at(*next_block_in_current_unit));
last_block_missing_values = std::move(unit->chunk_ext.block_missing_values[*next_block_in_current_unit]);
last_approx_bytes_read_for_chunk = unit->chunk_ext.approx_chunk_sizes.at(*next_block_in_current_unit);
next_block_in_current_unit.value() += 1;
if (*next_block_in_current_unit == unit.chunk_ext.chunk.size())
if (*next_block_in_current_unit == unit->chunk_ext.chunk.size())
{
// Finished reading this Processing Unit, move to the next one.
next_block_in_current_unit.reset();
++reader_ticket_number;
if (unit.is_last)
if (unit->is_last)
{
// If it was the last unit, we're done parsing.
parsing_finished = true;
@ -264,7 +267,7 @@ Chunk ParallelParsingInputFormat::generate()
{
// Pass the unit back to the segmentator.
std::lock_guard lock(mutex);
unit.status = READY_TO_INSERT;
unit->status = READY_TO_INSERT;
segmentator_condvar.notify_all();
}
}

View File

@ -20,7 +20,7 @@ namespace
const String DEFAULT_OUTPUT_FORMAT = "CSV";
}
InputFormatErrorsLogger::InputFormatErrorsLogger(const ContextPtr & context)
InputFormatErrorsLogger::InputFormatErrorsLogger(const ContextPtr & context) : max_block_size(context->getSettingsRef().max_block_size)
{
String output_format = context->getSettingsRef().errors_output_format;
if (!FormatFactory::instance().isOutputFormat(output_format))
@ -59,30 +59,47 @@ InputFormatErrorsLogger::InputFormatErrorsLogger(const ContextPtr & context)
{std::make_shared<DataTypeUInt32>(), "offset"},
{std::make_shared<DataTypeString>(), "reason"},
{std::make_shared<DataTypeString>(), "raw_data"}};
errors_columns = header.cloneEmptyColumns();
writer = context->getOutputFormat(output_format, *write_buf, header);
}
InputFormatErrorsLogger::~InputFormatErrorsLogger()
{
writer->finalize();
writer->flush();
write_buf->finalize();
try
{
if (!errors_columns[0]->empty())
writeErrors();
writer->finalize();
writer->flush();
write_buf->finalize();
}
catch (...)
{
tryLogCurrentException("InputFormatErrorsLogger");
}
}
void InputFormatErrorsLogger::logErrorImpl(ErrorEntry entry)
{
auto error = header.cloneEmpty();
auto columns = error.mutateColumns();
columns[0]->insert(entry.time);
database.empty() ? columns[1]->insertDefault() : columns[1]->insert(database);
table.empty() ? columns[2]->insertDefault() : columns[2]->insert(table);
columns[3]->insert(entry.offset);
columns[4]->insert(entry.reason);
columns[5]->insert(entry.raw_data);
error.setColumns(std::move(columns));
errors_columns[0]->insert(entry.time);
database.empty() ? errors_columns[1]->insertDefault() : errors_columns[1]->insert(database);
table.empty() ? errors_columns[2]->insertDefault() : errors_columns[2]->insert(table);
errors_columns[3]->insert(entry.offset);
errors_columns[4]->insert(entry.reason);
errors_columns[5]->insert(entry.raw_data);
writer->write(error);
if (errors_columns[0]->size() >= max_block_size)
writeErrors();
}
/// Writes all buffered error rows to `writer` as a single block and resets
/// `errors_columns` to a fresh set of empty columns cloned from `header`.
///
/// The buffer is restored *before* the block is passed to the writer. Both
/// logErrorImpl() and the destructor index `errors_columns[0]`, so the member
/// must never be left in a moved-from state: if `writer->write()` throws, the
/// buffer is still well-formed (empty columns) instead of a moved-from vector.
/// Rows that were being flushed when the writer failed are dropped.
void InputFormatErrorsLogger::writeErrors()
{
    /// Allocate the replacement buffer first: if this throws, nothing has been modified yet.
    auto fresh_columns = header.cloneEmptyColumns();

    auto block = header.cloneEmpty();
    block.setColumns(std::move(errors_columns));

    /// Re-arm the buffer immediately so no throwing call runs while it is moved-from.
    errors_columns = std::move(fresh_columns);

    writer->write(block);
}
void InputFormatErrorsLogger::logError(ErrorEntry entry)

View File

@ -24,6 +24,7 @@ public:
virtual void logError(ErrorEntry entry);
void logErrorImpl(ErrorEntry entry);
void writeErrors();
private:
Block header;
@ -34,6 +35,9 @@ private:
String database;
String table;
MutableColumns errors_columns;
size_t max_block_size;
};
using InputFormatErrorsLoggerPtr = std::shared_ptr<InputFormatErrorsLogger>;

View File

@ -0,0 +1,4 @@
42
100000
42
100000

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
# Tags: no-fasttest

# Checks that reading a file does not stop early when a long run of rows is
# skipped because of parse errors, and that every skipped row is recorded in
# the errors file. The same scenario is run with parallel parsing disabled and
# enabled; each run is expected to print the single valid value followed by
# the number of recorded errors.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

FILE=$CLICKHOUSE_TEST_UNIQUE_NAME
ERRORS_FILE=$CLICKHOUSE_TEST_UNIQUE_NAME.errors

# NOTE: $CLICKHOUSE_LOCAL is intentionally left unquoted - presumably
# shell_config.sh may define it as a command followed by arguments (verify).

# 100000 rows that cannot be parsed as UInt32, followed by one valid row.
$CLICKHOUSE_LOCAL -q "select 'Error' from numbers(100000) format TSVRaw" > "$FILE"
echo "42" >> "$FILE"

# Sequential parsing: max_block_size is much smaller than the number of bad rows.
$CLICKHOUSE_LOCAL -q "select * from file('$FILE', CSV, 'x UInt32') settings input_format_allow_errors_ratio=1, max_block_size=10000, input_format_parallel_parsing=0, input_format_record_errors_file_path='$ERRORS_FILE'"
$CLICKHOUSE_LOCAL -q "select count() from file('$ERRORS_FILE', CSV)"
rm -f "$ERRORS_FILE"

# Parallel parsing: the same input and settings, with parallel parsing enabled.
$CLICKHOUSE_LOCAL -q "select * from file('$FILE', CSV, 'x UInt32') settings input_format_allow_errors_ratio=1, max_block_size=10000, input_format_parallel_parsing=1, input_format_record_errors_file_path='$ERRORS_FILE'"
$CLICKHOUSE_LOCAL -q "select count() from file('$ERRORS_FILE', CSV)"
rm -f "$ERRORS_FILE"

rm -f "$FILE"