diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h index b8ee4d438df..a297f487318 100644 --- a/src/Processors/Formats/IInputFormat.h +++ b/src/Processors/Formats/IInputFormat.h @@ -10,6 +10,8 @@ namespace DB /// Used to pass info from header between different InputFormats in ParallelParsing struct ColumnMapping { + /// Non-atomic because only read access in possible + bool is_set; /// Maps indexes of columns in the input file to indexes of table columns using OptionalIndexes = std::vector>; OptionalIndexes column_indexes_for_input_fields; @@ -59,7 +61,7 @@ public: /// Must be called from ParallelParsingInputFormat after readSuffix ColumnMappingPtr getColumnMapping() const { return column_mapping; } /// Must be called from ParallelParsingInputFormat before readPrefix - void setColumnMapping(ColumnMappingPtr column_mapping_ ) { column_mapping = column_mapping_; } + void setColumnMapping(ColumnMappingPtr column_mapping_) { column_mapping = column_mapping_; } size_t getCurrentUnitNumber() const { return current_unit_number; } void setCurrentUnitNumber(size_t current_unit_number_) { current_unit_number = current_unit_number_; } diff --git a/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp index ce041f6636b..4c2b9df304b 100644 --- a/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp @@ -92,7 +92,11 @@ void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr threa /// Propagate column_mapping to other parsers. /// Note: column_mapping is used only for *WithNames types if (current_ticket_number != 0) + { + column_mapping->is_set = true; input_format->setColumnMapping(column_mapping); + } + // We don't know how many blocks will be. So we have to read them all // until an empty block occurred. @@ -105,7 +109,7 @@ void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr threa unit.chunk_ext.block_missing_values.emplace_back(parser.getMissingValues()); } - /// Extract column_mapping from first parser to propage it to others + /// Extract column_mapping from first parser to propagate it to others if (current_ticket_number == 0) column_mapping = input_format->getColumnMapping(); diff --git a/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp b/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp index 04c6b4c3ee0..ffb1b96f70e 100644 --- a/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp @@ -170,7 +170,7 @@ void TabSeparatedRowInputFormat::readPrefix() skipTSVRow(in, column_mapping->column_indexes_for_input_fields.size()); } } - else + else if (!column_mapping->is_set) setupAllColumnsByTableSchema(); if (with_types)