Merge pull request #14522 from ClickHouse/formats-dont-skip-bom-in-constructor

More consistent invocation of skipBOMIfExists
This commit is contained in:
alexey-milovidov 2020-09-07 21:38:03 +03:00 committed by GitHub
commit 4bd5524da9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 24 additions and 11 deletions

View File

@ -101,8 +101,8 @@ BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll()
namespace namespace
{ {
/** A stream, that also runs and waits for background thread /** A stream, that also runs and waits for background thread
* (that will feed data into pipe to be read from the other side of the pipe). * (that will feed data into pipe to be read from the other side of the pipe).
*/ */
class BlockInputStreamWithBackgroundThread final : public IBlockInputStream class BlockInputStreamWithBackgroundThread final : public IBlockInputStream
{ {
public: public:

View File

@ -32,9 +32,6 @@ JSONEachRowRowInputFormat::JSONEachRowRowInputFormat(
ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_) ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), name_map(header_.columns()) : IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), name_map(header_.columns())
{ {
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(in);
size_t num_columns = getPort().getHeader().columns(); size_t num_columns = getPort().getHeader().columns();
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
{ {
@ -285,6 +282,9 @@ void JSONEachRowRowInputFormat::resetParser()
void JSONEachRowRowInputFormat::readPrefix() void JSONEachRowRowInputFormat::readPrefix()
{ {
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(in);
skipWhitespaceIfAny(in); skipWhitespaceIfAny(in);
if (!in.eof() && *in.position() == '[') if (!in.eof() && *in.position() == '[')
{ {

View File

@ -19,10 +19,6 @@ namespace ErrorCodes
TSKVRowInputFormat::TSKVRowInputFormat(ReadBuffer & in_, Block header_, Params params_, const FormatSettings & format_settings_) TSKVRowInputFormat::TSKVRowInputFormat(ReadBuffer & in_, Block header_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(std::move(header_), in_, std::move(params_)), format_settings(format_settings_), name_map(header_.columns()) : IRowInputFormat(std::move(header_), in_, std::move(params_)), format_settings(format_settings_), name_map(header_.columns())
{ {
/// In this format, we assume that column name cannot contain BOM,
/// so BOM at beginning of stream cannot be confused with name of field, and it is safe to skip it.
skipBOMIfExists(in);
const auto & sample_block = getPort().getHeader(); const auto & sample_block = getPort().getHeader();
size_t num_columns = sample_block.columns(); size_t num_columns = sample_block.columns();
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
@ -30,6 +26,14 @@ TSKVRowInputFormat::TSKVRowInputFormat(ReadBuffer & in_, Block header_, Params p
} }
/// Prefix step of the TSKV input format: the only work done here is the skipBOMIfExists() call on `in`.
/// The same call used to sit in the constructor; this commit moves it here.
/// NOTE(review): presumably the base class invokes readPrefix() once, before the first readRow(),
/// so the constructor itself no longer reads from the input buffer - confirm against IRowInputFormat.
void TSKVRowInputFormat::readPrefix()
{
/// In this format, we assume that column name cannot contain BOM,
/// so BOM at beginning of stream cannot be confused with name of field, and it is safe to skip it.
skipBOMIfExists(in);
}
/** Read the field name in the `tskv` format. /** Read the field name in the `tskv` format.
* Return true if the field is followed by an equal sign, * Return true if the field is followed by an equal sign,
* otherwise (field with no value) return false. * otherwise (field with no value) return false.

View File

@ -27,6 +27,7 @@ public:
String getName() const override { return "TSKVRowInputFormat"; } String getName() const override { return "TSKVRowInputFormat"; }
void readPrefix() override;
bool readRow(MutableColumns & columns, RowReadExtension &) override; bool readRow(MutableColumns & columns, RowReadExtension &) override;
bool allowSyncAfterError() const override { return true; } bool allowSyncAfterError() const override { return true; }
void syncAfterError() override; void syncAfterError() override;

View File

@ -35,12 +35,13 @@ ValuesBlockInputFormat::ValuesBlockInputFormat(ReadBuffer & in_, const Block & h
attempts_to_deduce_template(num_columns), attempts_to_deduce_template_cached(num_columns), attempts_to_deduce_template(num_columns), attempts_to_deduce_template_cached(num_columns),
rows_parsed_using_template(num_columns), templates(num_columns), types(header_.getDataTypes()) rows_parsed_using_template(num_columns), templates(num_columns), types(header_.getDataTypes())
{ {
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(buf);
} }
Chunk ValuesBlockInputFormat::generate() Chunk ValuesBlockInputFormat::generate()
{ {
if (total_rows == 0)
readPrefix();
const Block & header = getPort().getHeader(); const Block & header = getPort().getHeader();
MutableColumns columns = header.cloneEmptyColumns(); MutableColumns columns = header.cloneEmptyColumns();
block_missing_values.clear(); block_missing_values.clear();
@ -405,6 +406,12 @@ bool ValuesBlockInputFormat::shouldDeduceNewTemplate(size_t column_idx)
return false; return false;
} }
/// Prefix step of the Values input format: the only work done here is the skipBOMIfExists() call on `buf`.
/// It is declared without `override`, and generate() calls it explicitly while total_rows == 0.
/// NOTE(review): this relies on total_rows staying 0 until the first rows are produced - confirm,
/// otherwise the BOM check could run more than once or not at all.
void ValuesBlockInputFormat::readPrefix()
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(buf);
}
void ValuesBlockInputFormat::readSuffix() void ValuesBlockInputFormat::readSuffix()
{ {
if (buf.hasUnreadData()) if (buf.hasUnreadData())

View File

@ -63,6 +63,7 @@ private:
bool shouldDeduceNewTemplate(size_t column_idx); bool shouldDeduceNewTemplate(size_t column_idx);
void readPrefix();
void readSuffix(); void readSuffix();
bool skipToNextRow(size_t min_chunk_bytes = 0, int balance = 0); bool skipToNextRow(size_t min_chunk_bytes = 0, int balance = 0);