Fix propagation of ColumnMapping between parsers in ParallelParsingInputFormat

This commit is contained in:
Nikita Mikhaylov 2021-03-02 16:31:19 +03:00
parent 3a7d48f3a4
commit 3372dd7b6a
4 changed files with 18 additions and 15 deletions

View File

@ -10,7 +10,8 @@ namespace DB
/// Used to pass info from header between different InputFormats in ParallelParsing
struct ColumnMapping
{
/// Non-atomic because only read access is possible
/// Non-atomic because there is strict `happens-before` between read and write access
/// See InputFormatParallelParsing
bool is_set;
/// Maps indexes of columns in the input file to indexes of table columns
using OptionalIndexes = std::vector<std::optional<size_t>>;

View File

@ -145,6 +145,16 @@ static void skipRow(ReadBuffer & in, const FormatSettings::CSV & settings, size_
}
}
/// Installs the default column mapping: every column of the header is marked
/// as read, and the input field at position i is mapped to the table column
/// with the same index i.
void CSVRowInputFormat::setupAllColumnsByTableSchema()
{
    const auto num_columns = getPort().getHeader().columns();

    auto & mapping = *column_mapping;
    mapping.read_columns.assign(num_columns, true);

    auto & field_indexes = mapping.column_indexes_for_input_fields;
    field_indexes.resize(num_columns);

    /// Identity mapping: file position == table position.
    size_t position = 0;
    for (auto & table_index : field_indexes)
        table_index = position++;
}
void CSVRowInputFormat::readPrefix()
{
@ -193,16 +203,8 @@ void CSVRowInputFormat::readPrefix()
else
skipRow(in, format_settings.csv, num_columns);
}
/// The default: map each column of the file to the column of the table with
/// the same index.
column_mapping->read_columns.assign(header.columns(), true);
column_mapping->column_indexes_for_input_fields.resize(header.columns());
for (size_t i = 0; i < column_mapping->column_indexes_for_input_fields.size(); ++i)
{
column_mapping->column_indexes_for_input_fields[i] = i;
}
else if (!column_mapping->is_set)
setupAllColumnsByTableSchema();
}

View File

@ -44,6 +44,7 @@ private:
void addInputColumn(const String & column_name);
void setupAllColumnsByTableSchema();
bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out) override;
void tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column) override;
bool isGarbageAfterField(size_t, ReadBuffer::Position pos) override

View File

@ -92,11 +92,7 @@ void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr threa
/// Propagate column_mapping to other parsers.
/// Note: column_mapping is used only for *WithNames types
if (current_ticket_number != 0)
{
column_mapping->is_set = true;
input_format->setColumnMapping(column_mapping);
}
// We don't know how many blocks there will be, so we have to read them all
// until an empty block occurs.
@ -111,7 +107,10 @@ void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr threa
/// Extract column_mapping from first parser to propagate it to others
if (current_ticket_number == 0)
{
column_mapping = input_format->getColumnMapping();
column_mapping->is_set = true;
}
// We suppose we will get at least some blocks for a non-empty buffer,
// except at the end of file. Also see a matching assert in readImpl().