Merge pull request #68437 from bigo-sg/devirtualize_schema_reader

Try to devirtualize format reader in RowInputFormatWithNamesAndTypes
This commit is contained in:
Kruglov Pavel 2024-08-26 11:43:54 +00:00 committed by GitHub
commit 72c3b0212d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 67 additions and 36 deletions

View File

@ -15,8 +15,8 @@ namespace ErrorCodes
}
template <bool with_defaults>
BinaryRowInputFormat<with_defaults>::BinaryRowInputFormat(ReadBuffer & in_, const Block & header, Params params_, bool with_names_, bool with_types_, const FormatSettings & format_settings_)
: RowInputFormatWithNamesAndTypes(
BinaryRowInputFormat<with_defaults>::BinaryRowInputFormat(ReadBuffer & in_, const Block & header, IRowInputFormat::Params params_, bool with_names_, bool with_types_, const FormatSettings & format_settings_)
: RowInputFormatWithNamesAndTypes<BinaryFormatReader<with_defaults>>(
header,
in_,
params_,

View File

@ -10,13 +10,16 @@ namespace DB
class ReadBuffer;
template <bool>
class BinaryFormatReader;
/** A stream for inputting data in a binary line-by-line format.
*/
template <bool with_defaults = false>
class BinaryRowInputFormat final : public RowInputFormatWithNamesAndTypes
class BinaryRowInputFormat final : public RowInputFormatWithNamesAndTypes<BinaryFormatReader<with_defaults>>
{
public:
BinaryRowInputFormat(ReadBuffer & in_, const Block & header, Params params_, bool with_names_, bool with_types_, const FormatSettings & format_settings_);
BinaryRowInputFormat(ReadBuffer & in_, const Block & header, IRowInputFormat::Params params_, bool with_names_, bool with_types_, const FormatSettings & format_settings_);
String getName() const override { return "BinaryRowInputFormat"; }

View File

@ -61,7 +61,7 @@ CSVRowInputFormat::CSVRowInputFormat(
bool with_names_,
bool with_types_,
const FormatSettings & format_settings_,
std::unique_ptr<FormatWithNamesAndTypesReader> format_reader_)
std::unique_ptr<CSVFormatReader> format_reader_)
: RowInputFormatWithNamesAndTypes(
header_,
*in_,

View File

@ -1,7 +1,6 @@
#pragma once
#include <optional>
#include <unordered_map>
#include <Core/Block.h>
#include <Processors/Formats/RowInputFormatWithNamesAndTypes.h>
@ -13,10 +12,12 @@
namespace DB
{
class CSVFormatReader;
/** A stream for inputting data in csv format.
* Does not conform with https://tools.ietf.org/html/rfc4180 because it skips spaces and tabs between values.
*/
class CSVRowInputFormat : public RowInputFormatWithNamesAndTypes
class CSVRowInputFormat : public RowInputFormatWithNamesAndTypes<CSVFormatReader>
{
public:
/** with_names - in the first line the header with column names
@ -32,7 +33,7 @@ public:
protected:
CSVRowInputFormat(const Block & header_, std::shared_ptr<PeekableReadBuffer> in_, const Params & params_,
bool with_names_, bool with_types_, const FormatSettings & format_settings_, std::unique_ptr<FormatWithNamesAndTypesReader> format_reader_);
bool with_names_, bool with_types_, const FormatSettings & format_settings_, std::unique_ptr<CSVFormatReader> format_reader_);
CSVRowInputFormat(const Block & header_, std::shared_ptr<PeekableReadBuffer> in_buf_, const Params & params_,
bool with_names_, bool with_types_, const FormatSettings & format_settings_);

View File

@ -9,7 +9,8 @@
namespace DB
{
class CustomSeparatedRowInputFormat final : public RowInputFormatWithNamesAndTypes
class CustomSeparatedFormatReader;
class CustomSeparatedRowInputFormat final : public RowInputFormatWithNamesAndTypes<CustomSeparatedFormatReader>
{
public:
CustomSeparatedRowInputFormat(

View File

@ -11,7 +11,7 @@ namespace DB
{
class ReadBuffer;
class JSONCompactEachRowFormatReader;
/** A stream for reading data in a bunch of formats:
* - JSONCompactEachRow
@ -20,7 +20,7 @@ class ReadBuffer;
* - JSONCompactStringsEachRowWithNamesAndTypes
*
*/
class JSONCompactEachRowRowInputFormat final : public RowInputFormatWithNamesAndTypes
class JSONCompactEachRowRowInputFormat final : public RowInputFormatWithNamesAndTypes<JSONCompactEachRowFormatReader>
{
public:
JSONCompactEachRowRowInputFormat(

View File

@ -14,7 +14,7 @@ namespace ErrorCodes
JSONCompactRowInputFormat::JSONCompactRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: RowInputFormatWithNamesAndTypes(
: RowInputFormatWithNamesAndTypes<JSONCompactFormatReader>(
header_, in_, params_, false, false, false, format_settings_, std::make_unique<JSONCompactFormatReader>(in_, format_settings_))
{
}

View File

@ -5,8 +5,8 @@
namespace DB
{
class JSONCompactRowInputFormat final : public RowInputFormatWithNamesAndTypes
class JSONCompactFormatReader;
class JSONCompactRowInputFormat final : public RowInputFormatWithNamesAndTypes<JSONCompactFormatReader>
{
public:
JSONCompactRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_);

View File

@ -10,9 +10,11 @@
namespace DB
{
class TabSeparatedFormatReader;
/** A stream to input data in tsv format.
*/
class TabSeparatedRowInputFormat final : public RowInputFormatWithNamesAndTypes
class TabSeparatedRowInputFormat final : public RowInputFormatWithNamesAndTypes<TabSeparatedFormatReader>
{
public:
/** with_names - the first line is the header with the names of the columns

View File

@ -1,14 +1,20 @@
#include <Processors/Formats/RowInputFormatWithNamesAndTypes.h>
#include <Processors/Formats/ISchemaReader.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/PeekableReadBuffer.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <Formats/EscapingRuleUtils.h>
#include <IO/Operators.h>
#include <IO/PeekableReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Processors/Formats/Impl/BinaryRowInputFormat.h>
#include <Processors/Formats/Impl/CSVRowInputFormat.h>
#include <Processors/Formats/Impl/CustomSeparatedRowInputFormat.h>
#include <Processors/Formats/Impl/HiveTextRowInputFormat.h>
#include <Processors/Formats/Impl/JSONCompactRowInputFormat.h>
#include <Processors/Formats/Impl/TabSeparatedRowInputFormat.h>
#include <Processors/Formats/RowInputFormatWithNamesAndTypes.h>
namespace DB
@ -44,7 +50,8 @@ namespace
}
}
RowInputFormatWithNamesAndTypes::RowInputFormatWithNamesAndTypes(
template <typename FormatReaderImpl>
RowInputFormatWithNamesAndTypes<FormatReaderImpl>::RowInputFormatWithNamesAndTypes(
const Block & header_,
ReadBuffer & in_,
const Params & params_,
@ -52,7 +59,7 @@ RowInputFormatWithNamesAndTypes::RowInputFormatWithNamesAndTypes(
bool with_names_,
bool with_types_,
const FormatSettings & format_settings_,
std::unique_ptr<FormatWithNamesAndTypesReader> format_reader_,
std::unique_ptr<FormatReaderImpl> format_reader_,
bool try_detect_header_)
: RowInputFormatWithDiagnosticInfo(header_, in_, params_)
, format_settings(format_settings_)
@ -66,7 +73,8 @@ RowInputFormatWithNamesAndTypes::RowInputFormatWithNamesAndTypes(
column_indexes_by_names = getPort().getHeader().getNamesToIndexesMap();
}
void RowInputFormatWithNamesAndTypes::readPrefix()
template <typename FormatReaderImpl>
void RowInputFormatWithNamesAndTypes<FormatReaderImpl>::readPrefix()
{
/// Search and remove BOM only in textual formats (CSV, TSV etc), not in binary ones (RowBinary*).
/// Also, we assume that column name or type cannot contain BOM, so, if format has header,
@ -138,7 +146,8 @@ void RowInputFormatWithNamesAndTypes::readPrefix()
}
}
void RowInputFormatWithNamesAndTypes::tryDetectHeader(std::vector<String> & column_names_out, std::vector<String> & type_names_out)
template <typename FormatReaderImpl>
void RowInputFormatWithNamesAndTypes<FormatReaderImpl>::tryDetectHeader(std::vector<String> & column_names_out, std::vector<String> & type_names_out)
{
auto & read_buf = getReadBuffer();
PeekableReadBuffer * peekable_buf = dynamic_cast<PeekableReadBuffer *>(&read_buf);
@ -201,7 +210,8 @@ void RowInputFormatWithNamesAndTypes::tryDetectHeader(std::vector<String> & colu
peekable_buf->dropCheckpoint();
}
bool RowInputFormatWithNamesAndTypes::readRow(MutableColumns & columns, RowReadExtension & ext)
template <typename FormatReaderImpl>
bool RowInputFormatWithNamesAndTypes<FormatReaderImpl>::readRow(MutableColumns & columns, RowReadExtension & ext)
{
if (unlikely(end_of_stream))
return false;
@ -280,7 +290,8 @@ bool RowInputFormatWithNamesAndTypes::readRow(MutableColumns & columns, RowReadE
return true;
}
size_t RowInputFormatWithNamesAndTypes::countRows(size_t max_block_size)
template <typename FormatReaderImpl>
size_t RowInputFormatWithNamesAndTypes<FormatReaderImpl>::countRows(size_t max_block_size)
{
if (unlikely(end_of_stream))
return 0;
@ -304,7 +315,8 @@ size_t RowInputFormatWithNamesAndTypes::countRows(size_t max_block_size)
return num_rows;
}
void RowInputFormatWithNamesAndTypes::resetParser()
template <typename FormatReaderImpl>
void RowInputFormatWithNamesAndTypes<FormatReaderImpl>::resetParser()
{
RowInputFormatWithDiagnosticInfo::resetParser();
column_mapping->column_indexes_for_input_fields.clear();
@ -313,7 +325,8 @@ void RowInputFormatWithNamesAndTypes::resetParser()
end_of_stream = false;
}
void RowInputFormatWithNamesAndTypes::tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column)
template <typename FormatReaderImpl>
void RowInputFormatWithNamesAndTypes<FormatReaderImpl>::tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column)
{
const auto & index = column_mapping->column_indexes_for_input_fields[file_column];
if (index)
@ -328,7 +341,8 @@ void RowInputFormatWithNamesAndTypes::tryDeserializeField(const DataTypePtr & ty
}
}
bool RowInputFormatWithNamesAndTypes::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
template <typename FormatReaderImpl>
bool RowInputFormatWithNamesAndTypes<FormatReaderImpl>::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
{
if (in->eof())
{
@ -374,12 +388,14 @@ bool RowInputFormatWithNamesAndTypes::parseRowAndPrintDiagnosticInfo(MutableColu
return format_reader->parseRowEndWithDiagnosticInfo(out);
}
bool RowInputFormatWithNamesAndTypes::isGarbageAfterField(size_t index, ReadBuffer::Position pos)
/// Delegates the "is there garbage after field `index` at position `pos`" check
/// to the concrete format reader held in `format_reader` and returns its answer unchanged.
template <typename FormatReaderImpl>
bool RowInputFormatWithNamesAndTypes<FormatReaderImpl>::isGarbageAfterField(size_t index, ReadBuffer::Position pos)
{
return format_reader->isGarbageAfterField(index, pos);
}
void RowInputFormatWithNamesAndTypes::setReadBuffer(ReadBuffer & in_)
template <typename FormatReaderImpl>
void RowInputFormatWithNamesAndTypes<FormatReaderImpl>::setReadBuffer(ReadBuffer & in_)
{
format_reader->setReadBuffer(in_);
IInputFormat::setReadBuffer(in_);
@ -582,5 +598,12 @@ void FormatWithNamesAndTypesSchemaReader::transformTypesIfNeeded(DB::DataTypePtr
transformInferredTypesIfNeeded(type, new_type, format_settings);
}
/// Explicit instantiations of RowInputFormatWithNamesAndTypes for each format reader type listed below.
/// The member functions of the template are defined in this file, so the instantiations are emitted here.
/// NOTE(review): presumably any new reader type used as FormatReaderImpl must be added to this list — confirm.
template class RowInputFormatWithNamesAndTypes<JSONCompactFormatReader>;
template class RowInputFormatWithNamesAndTypes<JSONCompactEachRowFormatReader>;
template class RowInputFormatWithNamesAndTypes<TabSeparatedFormatReader>;
template class RowInputFormatWithNamesAndTypes<CSVFormatReader>;
template class RowInputFormatWithNamesAndTypes<CustomSeparatedFormatReader>;
template class RowInputFormatWithNamesAndTypes<BinaryFormatReader<true>>;
template class RowInputFormatWithNamesAndTypes<BinaryFormatReader<false>>;
}

View File

@ -26,6 +26,7 @@ class FormatWithNamesAndTypesReader;
/// will be compared types from header.
/// It's important that firstly this class reads/skips names and only
/// then reads/skips types. So you can rely on this invariant.
template <typename FormatReaderImpl>
class RowInputFormatWithNamesAndTypes : public RowInputFormatWithDiagnosticInfo
{
protected:
@ -41,7 +42,7 @@ protected:
bool with_names_,
bool with_types_,
const FormatSettings & format_settings_,
std::unique_ptr<FormatWithNamesAndTypesReader> format_reader_,
std::unique_ptr<FormatReaderImpl> format_reader_,
bool try_detect_header_ = false);
void resetParser() override;
@ -70,7 +71,7 @@ private:
bool is_header_detected = false;
protected:
std::unique_ptr<FormatWithNamesAndTypesReader> format_reader;
std::unique_ptr<FormatReaderImpl> format_reader;
Block::NameMap column_indexes_by_names;
};