mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-01 03:52:15 +00:00
better
This commit is contained in:
parent
eab35bfc7f
commit
369c9da161
@ -13,6 +13,7 @@ namespace ErrorCodes
|
|||||||
IInputFormat::IInputFormat(Block header, ReadBuffer & in_)
|
IInputFormat::IInputFormat(Block header, ReadBuffer & in_)
|
||||||
: ISource(std::move(header)), in(in_)
|
: ISource(std::move(header)), in(in_)
|
||||||
{
|
{
|
||||||
|
column_mapping = std::make_shared<ColumnMapping>();
|
||||||
}
|
}
|
||||||
|
|
||||||
void IInputFormat::resetParser()
|
void IInputFormat::resetParser()
|
||||||
|
@ -2,9 +2,26 @@
|
|||||||
|
|
||||||
#include <Processors/ISource.h>
|
#include <Processors/ISource.h>
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
/// Used to pass info from header between different InputFormats in ParallelParsing
|
||||||
|
struct ColumnMapping
|
||||||
|
{
|
||||||
|
/// Maps indexes of columns in the input file to indexes of table columns
|
||||||
|
using OptionalIndexes = std::vector<std::optional<size_t>>;
|
||||||
|
OptionalIndexes column_indexes_for_input_fields;
|
||||||
|
|
||||||
|
/// Tracks which columns we have read in a single read() call.
|
||||||
|
/// For columns that are never read, it is initialized to false when we
|
||||||
|
/// read the file header, and never changed afterwards.
|
||||||
|
/// For other columns, it is updated on each read() call.
|
||||||
|
std::vector<UInt8> read_columns;
|
||||||
|
};
|
||||||
|
|
||||||
|
using ColumnMappingPtr = std::shared_ptr<ColumnMapping>;
|
||||||
|
|
||||||
class ReadBuffer;
|
class ReadBuffer;
|
||||||
|
|
||||||
@ -39,9 +56,17 @@ public:
|
|||||||
return none;
|
return none;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Must be called from ParallelParsingInputFormat after readSuffix
|
||||||
|
ColumnMappingPtr getColumnMapping() const { return column_mapping; }
|
||||||
|
/// Must be called from ParallelParsingInputFormat before readPrefix
|
||||||
|
void setColumnMapping(ColumnMappingPtr column_mapping_ ) { column_mapping = column_mapping_; }
|
||||||
|
|
||||||
size_t getCurrentUnitNumber() const { return current_unit_number; }
|
size_t getCurrentUnitNumber() const { return current_unit_number; }
|
||||||
void setCurrentUnitNumber(size_t current_unit_number_) { current_unit_number = current_unit_number_; }
|
void setCurrentUnitNumber(size_t current_unit_number_) { current_unit_number = current_unit_number_; }
|
||||||
|
|
||||||
|
protected:
|
||||||
|
ColumnMappingPtr column_mapping{};
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// Number of currently parsed chunk (if parallel parsing is enabled)
|
/// Number of currently parsed chunk (if parallel parsing is enabled)
|
||||||
size_t current_unit_number = 0;
|
size_t current_unit_number = 0;
|
||||||
|
@ -55,13 +55,13 @@ void CSVRowInputFormat::addInputColumn(const String & column_name)
|
|||||||
{
|
{
|
||||||
if (format_settings.skip_unknown_fields)
|
if (format_settings.skip_unknown_fields)
|
||||||
{
|
{
|
||||||
column_indexes_for_input_fields.push_back(std::nullopt);
|
column_mapping->column_indexes_for_input_fields.push_back(std::nullopt);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
throw Exception(
|
throw Exception(
|
||||||
"Unknown field found in CSV header: '" + column_name + "' " +
|
"Unknown field found in CSV header: '" + column_name + "' " +
|
||||||
"at position " + std::to_string(column_indexes_for_input_fields.size()) +
|
"at position " + std::to_string(column_mapping->column_indexes_for_input_fields.size()) +
|
||||||
"\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
|
"\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
|
||||||
ErrorCodes::INCORRECT_DATA
|
ErrorCodes::INCORRECT_DATA
|
||||||
);
|
);
|
||||||
@ -69,11 +69,11 @@ void CSVRowInputFormat::addInputColumn(const String & column_name)
|
|||||||
|
|
||||||
const auto column_index = column_it->second;
|
const auto column_index = column_it->second;
|
||||||
|
|
||||||
if (read_columns[column_index])
|
if (column_mapping->read_columns[column_index])
|
||||||
throw Exception("Duplicate field found while parsing CSV header: " + column_name, ErrorCodes::INCORRECT_DATA);
|
throw Exception("Duplicate field found while parsing CSV header: " + column_name, ErrorCodes::INCORRECT_DATA);
|
||||||
|
|
||||||
read_columns[column_index] = true;
|
column_mapping->read_columns[column_index] = true;
|
||||||
column_indexes_for_input_fields.emplace_back(column_index);
|
column_mapping->column_indexes_for_input_fields.emplace_back(column_index);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void skipEndOfLine(ReadBuffer & in)
|
static void skipEndOfLine(ReadBuffer & in)
|
||||||
@ -165,7 +165,7 @@ void CSVRowInputFormat::readPrefix()
|
|||||||
{
|
{
|
||||||
/// Look at the file header to see which columns we have there.
|
/// Look at the file header to see which columns we have there.
|
||||||
/// The missing columns are filled with defaults.
|
/// The missing columns are filled with defaults.
|
||||||
read_columns.assign(header.columns(), false);
|
column_mapping->read_columns.assign(header.columns(), false);
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
String column_name;
|
String column_name;
|
||||||
@ -179,7 +179,7 @@ void CSVRowInputFormat::readPrefix()
|
|||||||
|
|
||||||
skipDelimiter(in, format_settings.csv.delimiter, true);
|
skipDelimiter(in, format_settings.csv.delimiter, true);
|
||||||
|
|
||||||
for (auto read_column : read_columns)
|
for (auto read_column : column_mapping->read_columns)
|
||||||
{
|
{
|
||||||
if (!read_column)
|
if (!read_column)
|
||||||
{
|
{
|
||||||
@ -196,12 +196,12 @@ void CSVRowInputFormat::readPrefix()
|
|||||||
|
|
||||||
/// The default: map each column of the file to the column of the table with
|
/// The default: map each column of the file to the column of the table with
|
||||||
/// the same index.
|
/// the same index.
|
||||||
read_columns.assign(header.columns(), true);
|
column_mapping->read_columns.assign(header.columns(), true);
|
||||||
column_indexes_for_input_fields.resize(header.columns());
|
column_mapping->column_indexes_for_input_fields.resize(header.columns());
|
||||||
|
|
||||||
for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i)
|
for (size_t i = 0; i < column_mapping->column_indexes_for_input_fields.size(); ++i)
|
||||||
{
|
{
|
||||||
column_indexes_for_input_fields[i] = i;
|
column_mapping->column_indexes_for_input_fields[i] = i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -218,12 +218,12 @@ bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext
|
|||||||
/// it doesn't have to check it.
|
/// it doesn't have to check it.
|
||||||
bool have_default_columns = have_always_default_columns;
|
bool have_default_columns = have_always_default_columns;
|
||||||
|
|
||||||
ext.read_columns.assign(read_columns.size(), true);
|
ext.read_columns.assign(column_mapping->read_columns.size(), true);
|
||||||
const auto delimiter = format_settings.csv.delimiter;
|
const auto delimiter = format_settings.csv.delimiter;
|
||||||
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
|
for (size_t file_column = 0; file_column < column_mapping->column_indexes_for_input_fields.size(); ++file_column)
|
||||||
{
|
{
|
||||||
const auto & table_column = column_indexes_for_input_fields[file_column];
|
const auto & table_column = column_mapping->column_indexes_for_input_fields[file_column];
|
||||||
const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
|
const bool is_last_file_column = file_column + 1 == column_mapping->column_indexes_for_input_fields.size();
|
||||||
|
|
||||||
if (table_column)
|
if (table_column)
|
||||||
{
|
{
|
||||||
@ -245,9 +245,9 @@ bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext
|
|||||||
|
|
||||||
if (have_default_columns)
|
if (have_default_columns)
|
||||||
{
|
{
|
||||||
for (size_t i = 0; i < read_columns.size(); i++)
|
for (size_t i = 0; i < column_mapping->read_columns.size(); i++)
|
||||||
{
|
{
|
||||||
if (!read_columns[i])
|
if (!column_mapping->read_columns[i])
|
||||||
{
|
{
|
||||||
/// The column value for this row is going to be overwritten
|
/// The column value for this row is going to be overwritten
|
||||||
/// with default by the caller, but the general assumption is
|
/// with default by the caller, but the general assumption is
|
||||||
@ -268,7 +268,7 @@ bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
|
|||||||
{
|
{
|
||||||
const char delimiter = format_settings.csv.delimiter;
|
const char delimiter = format_settings.csv.delimiter;
|
||||||
|
|
||||||
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
|
for (size_t file_column = 0; file_column < column_mapping->column_indexes_for_input_fields.size(); ++file_column)
|
||||||
{
|
{
|
||||||
if (file_column == 0 && in.eof())
|
if (file_column == 0 && in.eof())
|
||||||
{
|
{
|
||||||
@ -277,10 +277,10 @@ bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
|
|||||||
}
|
}
|
||||||
|
|
||||||
skipWhitespacesAndTabs(in);
|
skipWhitespacesAndTabs(in);
|
||||||
if (column_indexes_for_input_fields[file_column].has_value())
|
if (column_mapping->column_indexes_for_input_fields[file_column].has_value())
|
||||||
{
|
{
|
||||||
const auto & header = getPort().getHeader();
|
const auto & header = getPort().getHeader();
|
||||||
size_t col_idx = column_indexes_for_input_fields[file_column].value();
|
size_t col_idx = column_mapping->column_indexes_for_input_fields[file_column].value();
|
||||||
if (!deserializeFieldAndPrintDiagnosticInfo(header.getByPosition(col_idx).name, data_types[col_idx], *columns[col_idx],
|
if (!deserializeFieldAndPrintDiagnosticInfo(header.getByPosition(col_idx).name, data_types[col_idx], *columns[col_idx],
|
||||||
out, file_column))
|
out, file_column))
|
||||||
return false;
|
return false;
|
||||||
@ -296,7 +296,7 @@ bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
|
|||||||
skipWhitespacesAndTabs(in);
|
skipWhitespacesAndTabs(in);
|
||||||
|
|
||||||
/// Delimiters
|
/// Delimiters
|
||||||
if (file_column + 1 == column_indexes_for_input_fields.size())
|
if (file_column + 1 == column_mapping->column_indexes_for_input_fields.size())
|
||||||
{
|
{
|
||||||
if (in.eof())
|
if (in.eof())
|
||||||
return false;
|
return false;
|
||||||
@ -358,9 +358,9 @@ void CSVRowInputFormat::syncAfterError()
|
|||||||
|
|
||||||
void CSVRowInputFormat::tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column)
|
void CSVRowInputFormat::tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column)
|
||||||
{
|
{
|
||||||
if (column_indexes_for_input_fields[file_column])
|
if (column_mapping->column_indexes_for_input_fields[file_column])
|
||||||
{
|
{
|
||||||
const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
|
const bool is_last_file_column = file_column + 1 == column_mapping->column_indexes_for_input_fields.size();
|
||||||
readField(column, type, is_last_file_column);
|
readField(column, type, is_last_file_column);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -406,8 +406,8 @@ bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bo
|
|||||||
void CSVRowInputFormat::resetParser()
|
void CSVRowInputFormat::resetParser()
|
||||||
{
|
{
|
||||||
RowInputFormatWithDiagnosticInfo::resetParser();
|
RowInputFormatWithDiagnosticInfo::resetParser();
|
||||||
column_indexes_for_input_fields.clear();
|
column_mapping->column_indexes_for_input_fields.clear();
|
||||||
read_columns.clear();
|
column_mapping->read_columns.clear();
|
||||||
have_always_default_columns = false;
|
have_always_default_columns = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,16 +38,6 @@ private:
|
|||||||
using IndexesMap = std::unordered_map<String, size_t>;
|
using IndexesMap = std::unordered_map<String, size_t>;
|
||||||
IndexesMap column_indexes_by_names;
|
IndexesMap column_indexes_by_names;
|
||||||
|
|
||||||
/// Maps indexes of columns in the input file to indexes of table columns
|
|
||||||
using OptionalIndexes = std::vector<std::optional<size_t>>;
|
|
||||||
OptionalIndexes column_indexes_for_input_fields;
|
|
||||||
|
|
||||||
/// Tracks which columns we have read in a single read() call.
|
|
||||||
/// For columns that are never read, it is initialized to false when we
|
|
||||||
/// read the file header, and never changed afterwards.
|
|
||||||
/// For other columns, it is updated on each read() call.
|
|
||||||
std::vector<UInt8> read_columns;
|
|
||||||
|
|
||||||
/// Whether we have any columns that are not read from file at all,
|
/// Whether we have any columns that are not read from file at all,
|
||||||
/// and must be always initialized with defaults.
|
/// and must be always initialized with defaults.
|
||||||
bool have_always_default_columns = false;
|
bool have_always_default_columns = false;
|
||||||
|
@ -89,6 +89,11 @@ void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr threa
|
|||||||
unit.chunk_ext.chunk.clear();
|
unit.chunk_ext.chunk.clear();
|
||||||
unit.chunk_ext.block_missing_values.clear();
|
unit.chunk_ext.block_missing_values.clear();
|
||||||
|
|
||||||
|
/// Propagate column_mapping to other parsers.
|
||||||
|
/// Note: column_mapping is used only for *WithNames types
|
||||||
|
if (current_ticket_number != 0)
|
||||||
|
input_format->setColumnMapping(column_mapping);
|
||||||
|
|
||||||
// We don't know how many blocks will be. So we have to read them all
|
// We don't know how many blocks will be. So we have to read them all
|
||||||
// until an empty block occurred.
|
// until an empty block occurred.
|
||||||
Chunk chunk;
|
Chunk chunk;
|
||||||
@ -100,6 +105,10 @@ void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr threa
|
|||||||
unit.chunk_ext.block_missing_values.emplace_back(parser.getMissingValues());
|
unit.chunk_ext.block_missing_values.emplace_back(parser.getMissingValues());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Extract column_mapping from first parser to propage it to others
|
||||||
|
if (current_ticket_number == 0)
|
||||||
|
column_mapping = input_format->getColumnMapping();
|
||||||
|
|
||||||
// We suppose we will get at least some blocks for a non-empty buffer,
|
// We suppose we will get at least some blocks for a non-empty buffer,
|
||||||
// except at the end of file. Also see a matching assert in readImpl().
|
// except at the end of file. Also see a matching assert in readImpl().
|
||||||
assert(unit.is_last || !unit.chunk_ext.chunk.empty() || parsing_finished);
|
assert(unit.is_last || !unit.chunk_ext.chunk.empty() || parsing_finished);
|
||||||
|
@ -253,6 +253,9 @@ private:
|
|||||||
{
|
{
|
||||||
parserThreadFunction(group, ticket_number);
|
parserThreadFunction(group, ticket_number);
|
||||||
});
|
});
|
||||||
|
/// We have to wait here to possibly extract ColumnMappingPtr from the first parser.
|
||||||
|
if (ticket_number == 0)
|
||||||
|
pool.wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
void finishAndWait()
|
void finishAndWait()
|
||||||
|
@ -62,19 +62,19 @@ TabSeparatedRowInputFormat::TabSeparatedRowInputFormat(const Block & header_, Re
|
|||||||
column_indexes_by_names.emplace(column_info.name, i);
|
column_indexes_by_names.emplace(column_info.name, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
column_indexes_for_input_fields.reserve(num_columns);
|
column_mapping->column_indexes_for_input_fields.reserve(num_columns);
|
||||||
read_columns.assign(num_columns, false);
|
column_mapping->read_columns.assign(num_columns, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void TabSeparatedRowInputFormat::setupAllColumnsByTableSchema()
|
void TabSeparatedRowInputFormat::setupAllColumnsByTableSchema()
|
||||||
{
|
{
|
||||||
const auto & header = getPort().getHeader();
|
const auto & header = getPort().getHeader();
|
||||||
read_columns.assign(header.columns(), true);
|
column_mapping->read_columns.assign(header.columns(), true);
|
||||||
column_indexes_for_input_fields.resize(header.columns());
|
column_mapping->column_indexes_for_input_fields.resize(header.columns());
|
||||||
|
|
||||||
for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i)
|
for (size_t i = 0; i < column_mapping->column_indexes_for_input_fields.size(); ++i)
|
||||||
column_indexes_for_input_fields[i] = i;
|
column_mapping->column_indexes_for_input_fields[i] = i;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -85,13 +85,13 @@ void TabSeparatedRowInputFormat::addInputColumn(const String & column_name)
|
|||||||
{
|
{
|
||||||
if (format_settings.skip_unknown_fields)
|
if (format_settings.skip_unknown_fields)
|
||||||
{
|
{
|
||||||
column_indexes_for_input_fields.push_back(std::nullopt);
|
column_mapping->column_indexes_for_input_fields.push_back(std::nullopt);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
throw Exception(
|
throw Exception(
|
||||||
"Unknown field found in TSV header: '" + column_name + "' " +
|
"Unknown field found in TSV header: '" + column_name + "' " +
|
||||||
"at position " + std::to_string(column_indexes_for_input_fields.size()) +
|
"at position " + std::to_string(column_mapping->column_indexes_for_input_fields.size()) +
|
||||||
"\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
|
"\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
|
||||||
ErrorCodes::INCORRECT_DATA
|
ErrorCodes::INCORRECT_DATA
|
||||||
);
|
);
|
||||||
@ -99,11 +99,11 @@ void TabSeparatedRowInputFormat::addInputColumn(const String & column_name)
|
|||||||
|
|
||||||
const auto column_index = column_it->second;
|
const auto column_index = column_it->second;
|
||||||
|
|
||||||
if (read_columns[column_index])
|
if (column_mapping->read_columns[column_index])
|
||||||
throw Exception("Duplicate field found while parsing TSV header: " + column_name, ErrorCodes::INCORRECT_DATA);
|
throw Exception("Duplicate field found while parsing TSV header: " + column_name, ErrorCodes::INCORRECT_DATA);
|
||||||
|
|
||||||
read_columns[column_index] = true;
|
column_mapping->read_columns[column_index] = true;
|
||||||
column_indexes_for_input_fields.emplace_back(column_index);
|
column_mapping->column_indexes_for_input_fields.emplace_back(column_index);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -113,8 +113,8 @@ void TabSeparatedRowInputFormat::fillUnreadColumnsWithDefaults(MutableColumns &
|
|||||||
if (unlikely(row_num == 1))
|
if (unlikely(row_num == 1))
|
||||||
{
|
{
|
||||||
columns_to_fill_with_default_values.clear();
|
columns_to_fill_with_default_values.clear();
|
||||||
for (size_t index = 0; index < read_columns.size(); ++index)
|
for (size_t index = 0; index < column_mapping->read_columns.size(); ++index)
|
||||||
if (read_columns[index] == 0)
|
if (column_mapping->read_columns[index] == 0)
|
||||||
columns_to_fill_with_default_values.push_back(index);
|
columns_to_fill_with_default_values.push_back(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -167,7 +167,7 @@ void TabSeparatedRowInputFormat::readPrefix()
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
setupAllColumnsByTableSchema();
|
setupAllColumnsByTableSchema();
|
||||||
skipTSVRow(in, column_indexes_for_input_fields.size());
|
skipTSVRow(in, column_mapping->column_indexes_for_input_fields.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -175,7 +175,7 @@ void TabSeparatedRowInputFormat::readPrefix()
|
|||||||
|
|
||||||
if (with_types)
|
if (with_types)
|
||||||
{
|
{
|
||||||
skipTSVRow(in, column_indexes_for_input_fields.size());
|
skipTSVRow(in, column_mapping->column_indexes_for_input_fields.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -187,11 +187,11 @@ bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtens
|
|||||||
|
|
||||||
updateDiagnosticInfo();
|
updateDiagnosticInfo();
|
||||||
|
|
||||||
ext.read_columns.assign(read_columns.size(), true);
|
ext.read_columns.assign(column_mapping->read_columns.size(), true);
|
||||||
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
|
for (size_t file_column = 0; file_column < column_mapping->column_indexes_for_input_fields.size(); ++file_column)
|
||||||
{
|
{
|
||||||
const auto & column_index = column_indexes_for_input_fields[file_column];
|
const auto & column_index = column_mapping->column_indexes_for_input_fields[file_column];
|
||||||
const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
|
const bool is_last_file_column = file_column + 1 == column_mapping->column_indexes_for_input_fields.size();
|
||||||
if (column_index)
|
if (column_index)
|
||||||
{
|
{
|
||||||
const auto & type = data_types[*column_index];
|
const auto & type = data_types[*column_index];
|
||||||
@ -204,7 +204,7 @@ bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtens
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// skip separators
|
/// skip separators
|
||||||
if (file_column + 1 < column_indexes_for_input_fields.size())
|
if (file_column + 1 < column_mapping->column_indexes_for_input_fields.size())
|
||||||
{
|
{
|
||||||
assertChar('\t', in);
|
assertChar('\t', in);
|
||||||
}
|
}
|
||||||
@ -240,7 +240,7 @@ bool TabSeparatedRowInputFormat::readField(IColumn & column, const DataTypePtr &
|
|||||||
|
|
||||||
bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
|
bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
|
||||||
{
|
{
|
||||||
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
|
for (size_t file_column = 0; file_column < column_mapping->column_indexes_for_input_fields.size(); ++file_column)
|
||||||
{
|
{
|
||||||
if (file_column == 0 && in.eof())
|
if (file_column == 0 && in.eof())
|
||||||
{
|
{
|
||||||
@ -248,10 +248,10 @@ bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns &
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (column_indexes_for_input_fields[file_column].has_value())
|
if (column_mapping->column_indexes_for_input_fields[file_column].has_value())
|
||||||
{
|
{
|
||||||
const auto & header = getPort().getHeader();
|
const auto & header = getPort().getHeader();
|
||||||
size_t col_idx = column_indexes_for_input_fields[file_column].value();
|
size_t col_idx = column_mapping->column_indexes_for_input_fields[file_column].value();
|
||||||
if (!deserializeFieldAndPrintDiagnosticInfo(header.getByPosition(col_idx).name, data_types[col_idx], *columns[col_idx],
|
if (!deserializeFieldAndPrintDiagnosticInfo(header.getByPosition(col_idx).name, data_types[col_idx], *columns[col_idx],
|
||||||
out, file_column))
|
out, file_column))
|
||||||
return false;
|
return false;
|
||||||
@ -266,7 +266,7 @@ bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns &
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Delimiters
|
/// Delimiters
|
||||||
if (file_column + 1 == column_indexes_for_input_fields.size())
|
if (file_column + 1 == column_mapping->column_indexes_for_input_fields.size())
|
||||||
{
|
{
|
||||||
if (!in.eof())
|
if (!in.eof())
|
||||||
{
|
{
|
||||||
@ -332,7 +332,7 @@ bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns &
|
|||||||
|
|
||||||
void TabSeparatedRowInputFormat::tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column)
|
void TabSeparatedRowInputFormat::tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column)
|
||||||
{
|
{
|
||||||
if (column_indexes_for_input_fields[file_column])
|
if (column_mapping->column_indexes_for_input_fields[file_column])
|
||||||
{
|
{
|
||||||
// check null value for type is not nullable. don't cross buffer bound for simplicity, so maybe missing some case
|
// check null value for type is not nullable. don't cross buffer bound for simplicity, so maybe missing some case
|
||||||
if (!type->isNullable() && !in.eof())
|
if (!type->isNullable() && !in.eof())
|
||||||
@ -351,7 +351,7 @@ void TabSeparatedRowInputFormat::tryDeserializeField(const DataTypePtr & type, I
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
|
const bool is_last_file_column = file_column + 1 == column_mapping->column_indexes_for_input_fields.size();
|
||||||
readField(column, type, is_last_file_column);
|
readField(column, type, is_last_file_column);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -370,8 +370,8 @@ void TabSeparatedRowInputFormat::resetParser()
|
|||||||
{
|
{
|
||||||
RowInputFormatWithDiagnosticInfo::resetParser();
|
RowInputFormatWithDiagnosticInfo::resetParser();
|
||||||
const auto & sample = getPort().getHeader();
|
const auto & sample = getPort().getHeader();
|
||||||
read_columns.assign(sample.columns(), false);
|
column_mapping->read_columns.assign(sample.columns(), false);
|
||||||
column_indexes_for_input_fields.clear();
|
column_mapping->column_indexes_for_input_fields.clear();
|
||||||
columns_to_fill_with_default_values.clear();
|
columns_to_fill_with_default_values.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -465,7 +465,7 @@ static std::pair<bool, size_t> fileSegmentationEngineTabSeparatedImpl(ReadBuffer
|
|||||||
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)
|
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)
|
||||||
{
|
{
|
||||||
// We can use the same segmentation engine for TSKV.
|
// We can use the same segmentation engine for TSKV.
|
||||||
for (const std::string & name : {"TabSeparated", "TSV", "TSKV", "TabSeparatedWithNames", "TSVWithNames"})
|
for (const auto & name : {"TabSeparated", "TSV", "TSKV", "TabSeparatedWithNames", "TSVWithNames"})
|
||||||
{
|
{
|
||||||
factory.registerFileSegmentationEngine(name, &fileSegmentationEngineTabSeparatedImpl);
|
factory.registerFileSegmentationEngine(name, &fileSegmentationEngineTabSeparatedImpl);
|
||||||
}
|
}
|
||||||
|
@ -41,10 +41,6 @@ private:
|
|||||||
using IndexesMap = std::unordered_map<String, size_t>;
|
using IndexesMap = std::unordered_map<String, size_t>;
|
||||||
IndexesMap column_indexes_by_names;
|
IndexesMap column_indexes_by_names;
|
||||||
|
|
||||||
using OptionalIndexes = std::vector<std::optional<size_t>>;
|
|
||||||
OptionalIndexes column_indexes_for_input_fields;
|
|
||||||
|
|
||||||
std::vector<UInt8> read_columns;
|
|
||||||
std::vector<size_t> columns_to_fill_with_default_values;
|
std::vector<size_t> columns_to_fill_with_default_values;
|
||||||
|
|
||||||
void addInputColumn(const String & column_name);
|
void addInputColumn(const String & column_name);
|
||||||
|
@ -19,7 +19,7 @@ protected:
|
|||||||
virtual std::optional<Chunk> tryGenerate();
|
virtual std::optional<Chunk> tryGenerate();
|
||||||
|
|
||||||
public:
|
public:
|
||||||
ISource(Block header);
|
explicit ISource(Block header);
|
||||||
|
|
||||||
Status prepare() override;
|
Status prepare() override;
|
||||||
void work() override;
|
void work() override;
|
||||||
|
@ -9,7 +9,8 @@ $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parsing_with_names"
|
|||||||
|
|
||||||
for format in "${FORMATS[@]}"
|
for format in "${FORMATS[@]}"
|
||||||
do
|
do
|
||||||
$CLICKHOUSE_CLIENT -q "CREATE TABLE parsing_with_names(a DateTime, b String, c FixedString(16)) ENGINE=Memory()"
|
# Columns are permuted
|
||||||
|
$CLICKHOUSE_CLIENT -q "CREATE TABLE parsing_with_names(c FixedString(16), a DateTime, b String) ENGINE=Memory()"
|
||||||
|
|
||||||
echo "$format, false";
|
echo "$format, false";
|
||||||
$CLICKHOUSE_CLIENT --output_format_parallel_formatting=false -q \
|
$CLICKHOUSE_CLIENT --output_format_parallel_formatting=false -q \
|
||||||
|
Loading…
Reference in New Issue
Block a user