Fix WithNamesAndTypes parallel parsing, add new tests, small refactoring

This commit is contained in:
avogar 2021-10-20 14:48:54 +03:00
parent 74fd33b62e
commit 7007286088
23 changed files with 306 additions and 118 deletions

View File

@ -591,6 +591,7 @@
M(621, CANNOT_NORMALIZE_STRING) \
M(622, CANNOT_PARSE_CAPN_PROTO_SCHEMA) \
M(623, CAPN_PROTO_BAD_CAST) \
M(624, CANNOT_SKIP_UNKNOWN_FIELD) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -67,8 +67,6 @@ public:
const Columns & columns,
size_t row)>;
private:
using InputCreatorFunc = InputFormatPtr(
ReadBuffer & buf,
const Block & header,
@ -83,6 +81,7 @@ private:
const RowOutputFormatParams & params,
const FormatSettings & settings)>;
private:
/// Some input formats can have non trivial readPrefix() and readSuffix(),
/// so in some cases there is no possibility to use parallel parsing.
/// The checker should return true if parallel parsing should be disabled.

View File

@ -14,7 +14,7 @@ namespace ErrorCodes
}
template <const char opening_bracket, const char closing_bracket>
static std::pair<bool, size_t> fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
static std::pair<bool, size_t> fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, size_t min_rows)
{
skipWhitespaceIfAny(in);
@ -23,7 +23,7 @@ static std::pair<bool, size_t> fileSegmentationEngineJSONEachRowImpl(ReadBuffer
bool quotes = false;
size_t number_of_rows = 0;
while (loadAtPosition(in, memory, pos) && (balance || memory.size() + static_cast<size_t>(pos - in.position()) < min_chunk_size))
while (loadAtPosition(in, memory, pos) && (balance || memory.size() + static_cast<size_t>(pos - in.position()) < min_chunk_size || number_of_rows < min_rows))
{
const auto current_object_size = memory.size() + static_cast<size_t>(pos - in.position());
if (current_object_size > 10 * min_chunk_size)
@ -94,12 +94,12 @@ static std::pair<bool, size_t> fileSegmentationEngineJSONEachRowImpl(ReadBuffer
std::pair<bool, size_t> fileSegmentationEngineJSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
return fileSegmentationEngineJSONEachRowImpl<'{', '}'>(in, memory, min_chunk_size);
return fileSegmentationEngineJSONEachRowImpl<'{', '}'>(in, memory, min_chunk_size, 1);
}
std::pair<bool, size_t> fileSegmentationEngineJSONCompactEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
std::pair<bool, size_t> fileSegmentationEngineJSONCompactEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, size_t min_rows)
{
return fileSegmentationEngineJSONEachRowImpl<'[', ']'>(in, memory, min_chunk_size);
return fileSegmentationEngineJSONEachRowImpl<'[', ']'>(in, memory, min_chunk_size, min_rows);
}
bool nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl(ReadBuffer & buf)

View File

@ -9,7 +9,7 @@ namespace DB
{
std::pair<bool, size_t> fileSegmentationEngineJSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size);
std::pair<bool, size_t> fileSegmentationEngineJSONCompactEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size);
std::pair<bool, size_t> fileSegmentationEngineJSONCompactEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, size_t min_rows);
bool nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl(ReadBuffer & buf);

View File

@ -0,0 +1,39 @@
#include <Formats/registerWithNamesAndTypes.h>
namespace DB
{
void registerInputFormatWithNamesAndTypes(FormatFactory & factory, const String & base_format_name, GetInputCreatorWithNamesAndTypesFunc get_input_creator)
{
factory.registerInputFormat(base_format_name, get_input_creator(false, false));
factory.registerInputFormat(base_format_name + "WithNames", get_input_creator(true, false));
factory.registerInputFormat(base_format_name + "WithNamesAndTypes", get_input_creator(true, true));
}
void registerOutputFormatWithNamesAndTypes(
FormatFactory & factory,
const String & base_format_name,
GetOutputCreatorWithNamesAndTypesFunc get_output_creator,
bool supports_parallel_formatting)
{
factory.registerOutputFormat(base_format_name, get_output_creator(false, false));
factory.registerOutputFormat(base_format_name + "WithNames", get_output_creator(true, false));
factory.registerOutputFormat(base_format_name + "WithNamesAndTypes", get_output_creator(true, true));
if (supports_parallel_formatting)
{
factory.markOutputFormatSupportsParallelFormatting(base_format_name);
factory.markOutputFormatSupportsParallelFormatting(base_format_name + "WithNames");
factory.markOutputFormatSupportsParallelFormatting(base_format_name + "WithNamesAndTypes");
}
}
void registerFileSegmentationEngineForFormatWithNamesAndTypes(
FormatFactory & factory, const String & base_format_name, GetFileSegmentationEngineWithNamesAndTypesFunc get_file_segmentation_engine)
{
factory.registerFileSegmentationEngine(base_format_name, get_file_segmentation_engine(1));
factory.registerFileSegmentationEngine(base_format_name + "WithNames", get_file_segmentation_engine(2));
factory.registerFileSegmentationEngine(base_format_name + "WithNamesAndTypes", get_file_segmentation_engine(3));
}
}

View File

@ -0,0 +1,23 @@
#pragma once
#include <Formats/FormatFactory.h>
namespace DB
{
using GetInputCreatorWithNamesAndTypesFunc = std::function<FormatFactory::InputCreator(bool with_names, bool with_types)>;
void registerInputFormatWithNamesAndTypes(
FormatFactory & factory, const String & base_format_name, GetInputCreatorWithNamesAndTypesFunc get_input_creator);
using GetOutputCreatorWithNamesAndTypesFunc = std::function<FormatFactory::OutputCreator(bool with_names, bool with_types)>;
void registerOutputFormatWithNamesAndTypes(
FormatFactory & factory,
const String & base_format_name,
GetOutputCreatorWithNamesAndTypesFunc get_output_creator,
bool supports_parallel_formatting = false);
using GetFileSegmentationEngineWithNamesAndTypesFunc = std::function<FormatFactory::FileSegmentationEngine(size_t min_rows)>;
void registerFileSegmentationEngineForFormatWithNamesAndTypes(
FormatFactory & factory, const String & base_format_name, GetFileSegmentationEngineWithNamesAndTypesFunc get_file_segmentation_engine);
}

View File

@ -113,11 +113,4 @@ void IRowOutputFormat::writeTotals(const DB::Columns & columns, size_t row_num)
write(columns, row_num);
}
void registerOutputFormatWithNamesAndTypes(const String & base_format_name, RegisterOutputFormatWithNamesAndTypes register_func)
{
register_func(base_format_name, false, false);
register_func(base_format_name + "WithNames", true, false);
register_func(base_format_name + "WithNamesAndTypes", true, true);
}
}

View File

@ -87,7 +87,4 @@ private:
};
using RegisterOutputFormatWithNamesAndTypes = std::function<void(const String & format_name, bool with_names, bool with_types)>;
void registerOutputFormatWithNamesAndTypes(const String & base_format_name, RegisterOutputFormatWithNamesAndTypes register_func);
}

View File

@ -2,6 +2,7 @@
#include <IO/ReadHelpers.h>
#include <Processors/Formats/Impl/BinaryRowInputFormat.h>
#include <Formats/FormatFactory.h>
#include <Formats/registerWithNamesAndTypes.h>
#include <DataTypes/DataTypeFactory.h>
@ -10,11 +11,11 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int CANNOT_SKIP_UNKNOWN_FIELD;
}
BinaryRowInputFormat::BinaryRowInputFormat(ReadBuffer & in_, Block header, Params params_, bool with_names_and_types, const FormatSettings & format_settings_)
: RowInputFormatWithNamesAndTypes(std::move(header), in_, std::move(params_), with_names_and_types, with_names_and_types, format_settings_)
BinaryRowInputFormat::BinaryRowInputFormat(ReadBuffer & in_, Block header, Params params_, bool with_names_, bool with_types_, const FormatSettings & format_settings_)
: RowInputFormatWithNamesAndTypes(std::move(header), in_, std::move(params_), with_names_, with_types_, format_settings_)
{
}
@ -71,30 +72,26 @@ void BinaryRowInputFormat::skipTypes()
void BinaryRowInputFormat::skipField(size_t file_column)
{
if (file_column >= read_data_types.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot skip field in Binary format, because it's type is unknown");
throw Exception(ErrorCodes::CANNOT_SKIP_UNKNOWN_FIELD, "Cannot skip unknown field in RowBinaryWithNames format, because it's type is unknown");
Field field;
read_data_types[file_column]->getDefaultSerialization()->deserializeBinary(field, *in);
}
void registerInputFormatRowBinary(FormatFactory & factory)
{
factory.registerInputFormat("RowBinary", [](
auto get_input_creator = [](bool with_names, bool with_types)
{
return [with_names, with_types](
ReadBuffer & buf,
const Block & sample,
const IRowInputFormat::Params & params,
const FormatSettings & settings)
{
return std::make_shared<BinaryRowInputFormat>(buf, sample, params, false, settings);
});
return std::make_shared<BinaryRowInputFormat>(buf, sample, params, with_names, with_types, settings);
};
};
factory.registerInputFormat("RowBinaryWithNamesAndTypes", [](
ReadBuffer & buf,
const Block & sample,
const IRowInputFormat::Params & params,
const FormatSettings & settings)
{
return std::make_shared<BinaryRowInputFormat>(buf, sample, params, true, settings);
});
registerInputFormatWithNamesAndTypes(factory, "RowBinary", get_input_creator);
}
}

View File

@ -16,7 +16,7 @@ class ReadBuffer;
class BinaryRowInputFormat : public RowInputFormatWithNamesAndTypes
{
public:
BinaryRowInputFormat(ReadBuffer & in_, Block header, Params params_, bool with_names_and_types, const FormatSettings & format_settings_);
BinaryRowInputFormat(ReadBuffer & in_, Block header, Params params_, bool with_names_, bool with_types_, const FormatSettings & format_settings_);
String getName() const override { return "BinaryRowInputFormat"; }

View File

@ -4,6 +4,7 @@
#include <DataTypes/IDataType.h>
#include <Processors/Formats/Impl/BinaryRowOutputFormat.h>
#include <Formats/FormatFactory.h>
#include <Formats/registerWithNamesAndTypes.h>
namespace DB
@ -49,23 +50,19 @@ void BinaryRowOutputFormat::writeField(const IColumn & column, const ISerializat
void registerOutputFormatRowBinary(FormatFactory & factory)
{
factory.registerOutputFormat("RowBinary", [](
auto get_output_creator = [&](bool with_names, bool with_types)
{
return [with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, false, false, params);
});
return std::make_shared<BinaryRowOutputFormat>(buf, sample, with_names, with_types, params);
};
};
factory.registerOutputFormat("RowBinaryWithNamesAndTypes", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, true, true, params);
});
registerOutputFormatWithNamesAndTypes(factory, "RowBinary", get_output_creator);
}
}

View File

@ -3,8 +3,9 @@
#include <IO/Operators.h>
#include <Formats/verbosePrintString.h>
#include <Processors/Formats/Impl/CSVRowInputFormat.h>
#include <Formats/registerWithNamesAndTypes.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/Impl/CSVRowInputFormat.h>
#include <DataTypes/Serializations/SerializationNullable.h>
#include <DataTypes/DataTypeNothing.h>
@ -232,22 +233,22 @@ bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, co
void registerInputFormatCSV(FormatFactory & factory)
{
auto register_func = [&](const String & format_name, bool with_names, bool with_types)
auto get_input_creator = [](bool with_names, bool with_types)
{
factory.registerInputFormat(format_name, [with_names, with_types](
return [with_names, with_types](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<CSVRowInputFormat>(sample, buf, std::move(params), with_names, with_types, settings);
});
};
};
registerInputFormatWithNamesAndTypes("CSV", register_func);
registerInputFormatWithNamesAndTypes(factory, "CSV", get_input_creator);
}
static std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
static std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, size_t min_rows)
{
char * pos = in.position();
bool quotes = false;
@ -287,7 +288,7 @@ static std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB
else if (*pos == '\n')
{
++number_of_rows;
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size && number_of_rows >= min_rows)
need_more_data = false;
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\r')
@ -295,7 +296,7 @@ static std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB
}
else if (*pos == '\r')
{
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size && number_of_rows >= min_rows)
need_more_data = false;
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\n')
@ -313,7 +314,14 @@ static std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB
void registerFileSegmentationEngineCSV(FormatFactory & factory)
{
registerFileSegmentationEngineForFormatWithNamesAndTypes(factory, "CSV", &fileSegmentationEngineCSVImpl);
auto get_file_segmentation_engine = [](size_t min_rows)
{
return [min_rows](ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
return fileSegmentationEngineCSVImpl(in, memory, min_chunk_size, min_rows);
};
};
registerFileSegmentationEngineForFormatWithNamesAndTypes(factory, "CSV", get_file_segmentation_engine);
}
}

View File

@ -1,5 +1,6 @@
#include <Processors/Formats/Impl/CSVRowOutputFormat.h>
#include <Formats/FormatFactory.h>
#include <Formats/registerWithNamesAndTypes.h>
#include <IO/WriteHelpers.h>
@ -74,20 +75,19 @@ void CSVRowOutputFormat::writeBeforeExtremes()
void registerOutputFormatCSV(FormatFactory & factory)
{
auto register_func = [&](const String & format_name, bool with_names, bool with_types)
auto get_output_creator = [](bool with_names, bool with_types)
{
factory.registerOutputFormat(format_name, [=](
return [with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, with_types, params, format_settings);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
};
};
registerOutputFormatWithNamesAndTypes("CSV", register_func);
registerOutputFormatWithNamesAndTypes(factory, "CSV", get_output_creator, true);
}
}

View File

@ -5,6 +5,7 @@
#include <Formats/FormatFactory.h>
#include <Formats/verbosePrintString.h>
#include <Formats/JSONEachRowUtils.h>
#include <Formats/registerWithNamesAndTypes.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/Serializations/SerializationNullable.h>
@ -183,26 +184,34 @@ void registerInputFormatJSONCompactEachRow(FormatFactory & factory)
{
for (bool yield_strings : {true, false})
{
auto register_func = [&](const String & format_name, bool with_names, bool with_types)
auto get_input_creator = [yield_strings](bool with_names, bool with_types)
{
factory.registerInputFormat(format_name, [with_names, with_types, yield_strings](
return [with_names, with_types, yield_strings](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONCompactEachRowRowInputFormat>(sample, buf, std::move(params), with_names, with_types, yield_strings, settings);
});
};
};
registerInputFormatWithNamesAndTypes(yield_strings ? "JSONCompactStringsEachRow" : "JSONCompactEachRow", register_func);
registerInputFormatWithNamesAndTypes(factory, yield_strings ? "JSONCompactStringsEachRow" : "JSONCompactEachRow", get_input_creator);
}
}
void registerFileSegmentationEngineJSONCompactEachRow(FormatFactory & factory)
{
registerFileSegmentationEngineForFormatWithNamesAndTypes(factory, "JSONCompactEachRow", &fileSegmentationEngineJSONCompactEachRow);
registerFileSegmentationEngineForFormatWithNamesAndTypes(factory, "JSONCompactStringsEachRow", &fileSegmentationEngineJSONCompactEachRow);
auto get_file_segmentation_engine = [](size_t min_rows)
{
return [min_rows](ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
return fileSegmentationEngineJSONCompactEachRow(in, memory, min_chunk_size, min_rows);
};
};
registerFileSegmentationEngineForFormatWithNamesAndTypes(factory, "JSONCompactEachRow", get_file_segmentation_engine);
registerFileSegmentationEngineForFormatWithNamesAndTypes(factory, "JSONCompactStringsEachRow", get_file_segmentation_engine);
}
}

View File

@ -2,6 +2,7 @@
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h>
#include <Formats/FormatFactory.h>
#include <Formats/registerWithNamesAndTypes.h>
namespace DB
@ -101,20 +102,19 @@ void registerOutputFormatJSONCompactEachRow(FormatFactory & factory)
{
for (bool yield_strings : {false, true})
{
auto register_func = [&](const String & format_name, bool with_names, bool with_types)
auto get_output_creator = [yield_strings](bool with_names, bool with_types)
{
factory.registerOutputFormat(format_name, [=](
return [yield_strings, with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, with_names, with_types, yield_strings);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
};
};
registerOutputFormatWithNamesAndTypes(yield_strings ? "JSONCompactStringsEachRow" : "JSONCompactEachRow", register_func);
registerOutputFormatWithNamesAndTypes(factory, yield_strings ? "JSONCompactStringsEachRow" : "JSONCompactEachRow", get_output_creator, true);
}
}

View File

@ -5,6 +5,7 @@
#include <Processors/Formats/Impl/TabSeparatedRowInputFormat.h>
#include <Formats/verbosePrintString.h>
#include <Formats/FormatFactory.h>
#include <Formats/registerWithNamesAndTypes.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/Serializations/SerializationNullable.h>
@ -221,25 +222,24 @@ void registerInputFormatTabSeparated(FormatFactory & factory)
{
for (bool is_raw : {false, true})
{
auto register_func = [&](const String & format_name, bool with_names, bool with_types)
auto get_input_creator = [is_raw](bool with_names, bool with_types)
{
factory.registerInputFormat(format_name, [with_names, with_types, is_raw](
return [with_names, with_types, is_raw](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRowInputFormat>(sample, buf, std::move(params), with_names, with_types, is_raw, settings);
});
};
};
registerInputFormatWithNamesAndTypes(is_raw ? "TabSeparatedRaw" : "TabSeparated", register_func);
registerInputFormatWithNamesAndTypes(is_raw ? "TSVRaw" : "TSV", register_func);
registerInputFormatWithNamesAndTypes(factory, is_raw ? "TabSeparatedRaw" : "TabSeparated", get_input_creator);
registerInputFormatWithNamesAndTypes(factory, is_raw ? "TSVRaw" : "TSV", get_input_creator);
}
}
template <bool is_raw>
static std::pair<bool, size_t> fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
static std::pair<bool, size_t> fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, bool is_raw, size_t min_rows)
{
bool need_more_data = true;
char * pos = in.position();
@ -247,7 +247,7 @@ static std::pair<bool, size_t> fileSegmentationEngineTabSeparatedImpl(ReadBuffer
while (loadAtPosition(in, memory, pos) && need_more_data)
{
if constexpr (is_raw)
if (is_raw)
pos = find_first_symbols<'\r', '\n'>(pos, in.buffer().end());
else
pos = find_first_symbols<'\\', '\r', '\n'>(pos, in.buffer().end());
@ -269,7 +269,7 @@ static std::pair<bool, size_t> fileSegmentationEngineTabSeparatedImpl(ReadBuffer
if (*pos == '\n')
++number_of_rows;
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
if ((memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size) && number_of_rows >= min_rows)
need_more_data = false;
++pos;
}
@ -282,13 +282,28 @@ static std::pair<bool, size_t> fileSegmentationEngineTabSeparatedImpl(ReadBuffer
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)
{
registerFileSegmentationEngineForFormatWithNamesAndTypes(factory, "TSV", &fileSegmentationEngineTabSeparatedImpl<false>);
registerFileSegmentationEngineForFormatWithNamesAndTypes(factory, "TabSeparated", &fileSegmentationEngineTabSeparatedImpl<false>);
registerFileSegmentationEngineForFormatWithNamesAndTypes(factory, "TSVRaw", &fileSegmentationEngineTabSeparatedImpl<true>);
registerFileSegmentationEngineForFormatWithNamesAndTypes(factory, "TabSeparatedRaw", &fileSegmentationEngineTabSeparatedImpl<true>);
for (bool is_raw : {false, true})
{
auto get_file_segmentation_engine = [is_raw](size_t min_rows)
{
return [is_raw, min_rows](ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
return fileSegmentationEngineTabSeparatedImpl(in, memory, min_chunk_size, is_raw, min_rows);
};
};
registerFileSegmentationEngineForFormatWithNamesAndTypes(factory, is_raw ? "TSVRaw" : "TSV", get_file_segmentation_engine);
registerFileSegmentationEngineForFormatWithNamesAndTypes(factory, is_raw ? "TabSeparatedRaw" : "TabSeparated", get_file_segmentation_engine);
}
// We can use the same segmentation engine for TSKV.
factory.registerFileSegmentationEngine("TSKV", &fileSegmentationEngineTabSeparatedImpl<false>);
factory.registerFileSegmentationEngine("TSKV", [](
ReadBuffer & in,
DB::Memory<> & memory,
size_t min_chunk_size)
{
return fileSegmentationEngineTabSeparatedImpl(in, memory, min_chunk_size, false, 0);
});
}
}

View File

@ -1,5 +1,6 @@
#include <Processors/Formats/Impl/TabSeparatedRowOutputFormat.h>
#include <Formats/FormatFactory.h>
#include <Formats/registerWithNamesAndTypes.h>
#include <IO/WriteHelpers.h>
@ -78,21 +79,20 @@ void registerOutputFormatTabSeparated(FormatFactory & factory)
{
for (bool is_raw : {false, true})
{
auto register_func = [&](const String & format_name, bool with_names, bool with_types)
auto get_output_creator = [is_raw](bool with_names, bool with_types)
{
factory.registerOutputFormat(format_name, [=](
return [is_raw, with_names, with_types](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, with_names, with_types, is_raw, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
};
};
registerOutputFormatWithNamesAndTypes(is_raw ? "TSVRaw" : "TSV", register_func);
registerOutputFormatWithNamesAndTypes(is_raw ? "TabSeparatedRaw" : "TabSeparated", register_func);
registerOutputFormatWithNamesAndTypes(factory, is_raw ? "TSVRaw" : "TSV", get_output_creator, true);
registerOutputFormatWithNamesAndTypes(factory, is_raw ? "TabSeparatedRaw" : "TabSeparated", get_output_creator, true);
}
}

View File

@ -247,12 +247,6 @@ bool RowInputFormatWithNamesAndTypes::parseRowAndPrintDiagnosticInfo(MutableColu
return parseRowEndWithDiagnosticInfo(out);
}
void registerInputFormatWithNamesAndTypes(const String & base_format_name, RegisterFormatWithNamesAndTypesFunc register_func)
{
register_func(base_format_name, false, false);
register_func(base_format_name + "WithNames", true, false);
register_func(base_format_name + "WithNamesAndTypes", true, true);
}
void registerFileSegmentationEngineForFormatWithNamesAndTypes(
FormatFactory & factory, const String & base_format_name, FormatFactory::FileSegmentationEngine segmentation_engine)

View File

@ -63,13 +63,6 @@ private:
std::unordered_map<String, size_t> column_indexes_by_names;
};
using RegisterFormatWithNamesAndTypesFunc = std::function<void(
const String & format_name,
bool with_names,
bool with_types)>;
void registerInputFormatWithNamesAndTypes(const String & base_format_name, RegisterFormatWithNamesAndTypesFunc register_func);
void registerFileSegmentationEngineForFormatWithNamesAndTypes(
FormatFactory & factory, const String & base_format_name, FormatFactory::FileSegmentationEngine segmentation_engine);

View File

@ -1,8 +1,14 @@
1 text 2020-01-01
1 text 2020-01-01
1 text 2020-01-01
1 text 2020-01-01
1 text 2020-01-01
1 text 2020-01-01
1 default 1970-01-01
1 default 1970-01-01
1 1970-01-01
1 1970-01-01
OK
1 default 1970-01-01
OK
OK

View File

@ -9,43 +9,60 @@ $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test_02102"
$CLICKHOUSE_CLIENT -q "CREATE TABLE test_02102 (x UInt32, y String DEFAULT 'default', z Date) engine=Memory()"
$CLICKHOUSE_CLIENT -q "SELECT toUInt32(1) AS x, 'text' AS y, toDate('2020-01-01') AS z FORMAT RowBinaryWithNamesAndTypes" | $CLICKHOUSE_CLIENT --input_format_with_names_use_header=1 --input_format_with_types_use_header=1 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNamesAndTypes"
$CLICKHOUSE_CLIENT -q "SELECT toUInt32(1) AS x, 'text' AS y, toDate('2020-01-01') AS z FORMAT RowBinaryWithNames" | $CLICKHOUSE_CLIENT --input_format_with_names_use_header=1 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNames"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02102"
$CLICKHOUSE_CLIENT -q "TRUNCATE TABLE test_02102"
$CLICKHOUSE_CLIENT -q "SELECT toUInt32(1) AS x, 'text' AS y, toDate('2020-01-01') AS z FORMAT RowBinaryWithNamesAndTypes" | $CLICKHOUSE_CLIENT --input_format_with_names_use_header=1 --input_format_with_types_use_header=1 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNamesAndTypes"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02102"
$CLICKHOUSE_CLIENT -q "TRUNCATE TABLE test_02102"
$CLICKHOUSE_CLIENT -q "SELECT toUInt32(1) AS x, 'text' AS y, toDate('2020-01-01') AS z FORMAT RowBinaryWithNames" | $CLICKHOUSE_CLIENT --input_format_with_names_use_header=0 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNames"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02102"
$CLICKHOUSE_CLIENT -q "TRUNCATE TABLE test_02102"
$CLICKHOUSE_CLIENT -q "SELECT toUInt32(1) AS x, 'text' AS y, toDate('2020-01-01') AS z FORMAT RowBinaryWithNamesAndTypes" | $CLICKHOUSE_CLIENT --input_format_with_names_use_header=0 --input_format_with_types_use_header=0 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNamesAndTypes"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02102"
$CLICKHOUSE_CLIENT -q "TRUNCATE TABLE test_02102"
$CLICKHOUSE_CLIENT -q "SELECT 'text' AS y, toDate('2020-01-01') AS z, toUInt32(1) AS x FORMAT RowBinaryWithNames" | $CLICKHOUSE_CLIENT --input_format_with_names_use_header=1 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNames"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02102"
$CLICKHOUSE_CLIENT -q "TRUNCATE TABLE test_02102"
$CLICKHOUSE_CLIENT -q "SELECT 'text' AS y, toDate('2020-01-01') AS z, toUInt32(1) AS x FORMAT RowBinaryWithNamesAndTypes" | $CLICKHOUSE_CLIENT --input_format_with_names_use_header=1 --input_format_with_types_use_header=1 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNamesAndTypes"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02102"
$CLICKHOUSE_CLIENT -q "TRUNCATE TABLE test_02102"
$CLICKHOUSE_CLIENT -q "SELECT toUInt32(1) AS x FORMAT RowBinaryWithNamesAndTypes" | $CLICKHOUSE_CLIENT --input_format_with_names_use_header=1 --input_format_with_types_use_header=1 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNamesAndTypes"
$CLICKHOUSE_CLIENT -q "SELECT toUInt32(1) AS x FORMAT RowBinaryWithNames" | $CLICKHOUSE_CLIENT --input_format_with_names_use_header=1 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNames"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02102"
$CLICKHOUSE_CLIENT -q "TRUNCATE TABLE test_02102"
$CLICKHOUSE_CLIENT -q "SELECT toUInt32(1) AS x FORMAT RowBinaryWithNamesAndTypes" | $CLICKHOUSE_CLIENT --input_format_with_names_use_header=1 --input_format_with_types_use_header=1 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNamesAndTypes"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02102"
$CLICKHOUSE_CLIENT -q "TRUNCATE TABLE test_02102"
$CLICKHOUSE_CLIENT -q "SELECT toUInt32(1) AS x FORMAT RowBinaryWithNames" | $CLICKHOUSE_CLIENT --input_format_defaults_for_omitted_fields=0 --input_format_with_names_use_header=1 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNames"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02102"
$CLICKHOUSE_CLIENT -q "TRUNCATE TABLE test_02102"
$CLICKHOUSE_CLIENT -q "SELECT toUInt32(1) AS x FORMAT RowBinaryWithNamesAndTypes" | $CLICKHOUSE_CLIENT --input_format_defaults_for_omitted_fields=0 --input_format_with_names_use_header=1 --input_format_with_types_use_header=1 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNamesAndTypes"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02102"
$CLICKHOUSE_CLIENT -q "TRUNCATE TABLE test_02102"
$CLICKHOUSE_CLIENT -q "SELECT toUInt32(1) AS x, [[1, 2, 3], [4, 5], []] as a FORMAT RowBinaryWithNames" | $CLICKHOUSE_CLIENT --input_format_skip_unknown_fields=1 --input_format_with_names_use_header=1 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNames" 2>&1 | grep -F -q "CANNOT_SKIP_UNKNOWN_FIELD" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "SELECT toUInt32(1) AS x, [[1, 2, 3], [4, 5], []] as a FORMAT RowBinaryWithNamesAndTypes" | $CLICKHOUSE_CLIENT --input_format_skip_unknown_fields=1 --input_format_with_names_use_header=1 --input_format_with_types_use_header=1 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNamesAndTypes"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02102"
$CLICKHOUSE_CLIENT -q "TRUNCATE TABLE test_02102"
$CLICKHOUSE_CLIENT -q "SELECT 'text' AS x, toDate('2020-01-01') AS y, toUInt32(1) AS z FORMAT RowBinaryWithNamesAndTypes" | $CLICKHOUSE_CLIENT --input_format_with_names_use_header=1 --input_format_with_types_use_header=1 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNamesAndTypes" 2>&1 | grep -F -q "INCORRECT_DATA" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "SELECT toUInt32(1) AS x, 'text' as z, toDate('2020-01-01') AS y FORMAT RowBinaryWithNamesAndTypes" | $CLICKHOUSE_CLIENT --input_format_with_names_use_header=1 --input_format_with_types_use_header=1 -q "INSERT INTO test_02102 FORMAT RowBinaryWithNamesAndTypes" 2>&1 | grep -F -q "INCORRECT_DATA" && echo 'OK' || echo 'FAIL'

View File

@ -0,0 +1,80 @@
0 [0,1,2,3,4,5,6,7,8,9] 0
1 [0,1,2,3,4,5,6,7,8,9,10] 1
2 [0,1,2,3,4,5,6,7,8,9,10,11] 2
3 [0,1,2,3,4,5,6,7,8,9,10,11,12] 3
4 [0,1,2,3,4,5,6,7,8,9,10,11,12,13] 4
5 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14] 5
6 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15] 6
7 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] 7
8 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17] 8
9 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18] 9
0 [0,1,2,3,4,5,6,7,8,9] 0
1 [0,1,2,3,4,5,6,7,8,9,10] 1
2 [0,1,2,3,4,5,6,7,8,9,10,11] 2
3 [0,1,2,3,4,5,6,7,8,9,10,11,12] 3
4 [0,1,2,3,4,5,6,7,8,9,10,11,12,13] 4
5 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14] 5
6 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15] 6
7 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] 7
8 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17] 8
9 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18] 9
0 [0,1,2,3,4,5,6,7,8,9] 0
1 [0,1,2,3,4,5,6,7,8,9,10] 1
2 [0,1,2,3,4,5,6,7,8,9,10,11] 2
3 [0,1,2,3,4,5,6,7,8,9,10,11,12] 3
4 [0,1,2,3,4,5,6,7,8,9,10,11,12,13] 4
5 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14] 5
6 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15] 6
7 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] 7
8 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17] 8
9 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18] 9
0 [0,1,2,3,4,5,6,7,8,9] 0
1 [0,1,2,3,4,5,6,7,8,9,10] 1
2 [0,1,2,3,4,5,6,7,8,9,10,11] 2
3 [0,1,2,3,4,5,6,7,8,9,10,11,12] 3
4 [0,1,2,3,4,5,6,7,8,9,10,11,12,13] 4
5 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14] 5
6 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15] 6
7 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] 7
8 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17] 8
9 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18] 9
0 [0,1,2,3,4,5,6,7,8,9] 0
1 [0,1,2,3,4,5,6,7,8,9,10] 1
2 [0,1,2,3,4,5,6,7,8,9,10,11] 2
3 [0,1,2,3,4,5,6,7,8,9,10,11,12] 3
4 [0,1,2,3,4,5,6,7,8,9,10,11,12,13] 4
5 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14] 5
6 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15] 6
7 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] 7
8 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17] 8
9 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18] 9
0 [0,1,2,3,4,5,6,7,8,9] 0
1 [0,1,2,3,4,5,6,7,8,9,10] 1
2 [0,1,2,3,4,5,6,7,8,9,10,11] 2
3 [0,1,2,3,4,5,6,7,8,9,10,11,12] 3
4 [0,1,2,3,4,5,6,7,8,9,10,11,12,13] 4
5 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14] 5
6 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15] 6
7 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] 7
8 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17] 8
9 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18] 9
0 [0,1,2,3,4,5,6,7,8,9] 0
1 [0,1,2,3,4,5,6,7,8,9,10] 1
2 [0,1,2,3,4,5,6,7,8,9,10,11] 2
3 [0,1,2,3,4,5,6,7,8,9,10,11,12] 3
4 [0,1,2,3,4,5,6,7,8,9,10,11,12,13] 4
5 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14] 5
6 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15] 6
7 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] 7
8 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17] 8
9 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18] 9
0 [0,1,2,3,4,5,6,7,8,9] 0
1 [0,1,2,3,4,5,6,7,8,9,10] 1
2 [0,1,2,3,4,5,6,7,8,9,10,11] 2
3 [0,1,2,3,4,5,6,7,8,9,10,11,12] 3
4 [0,1,2,3,4,5,6,7,8,9,10,11,12,13] 4
5 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14] 5
6 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15] 6
7 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] 7
8 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17] 8
9 [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18] 9

View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
USER_FILES_PATH=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
DATA_FILE=$USER_FILES_PATH/test_02103.data
FORMATS=('TSVWithNames' 'TSVWithNamesAndTypes' 'TSVRawWithNames' 'TSVRawWithNamesAndTypes' 'CSVWithNames' 'CSVWithNamesAndTypes' 'JSONCompactEachRowWithNames' 'JSONCompactEachRowWithNamesAndTypes')
for format in "${FORMATS[@]}"
do
$CLICKHOUSE_CLIENT -q "SELECT number, range(number + 10) AS array, toString(number) AS string FROM numbers(10) FORMAT $format" > $DATA_FILE
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('test_02103.data', '$format', 'number UInt64, array Array(UInt64), string String') SETTINGS input_format_parallel_parsing=1, min_chunk_bytes_for_parallel_parsing=40"
done
rm $DATA_FILE