This commit is contained in:
Nikita Mikhailov 2021-02-24 20:04:37 +03:00 committed by Nikita Mikhaylov
parent a2bedd592e
commit 138dedf2df
6 changed files with 50 additions and 3 deletions

View File

@ -53,6 +53,7 @@ public:
void resetParser() override;
protected:
friend class ParallelParsingInputFormat;
/** Read next row and append it to the columns.
* If no more rows - return false.
*/

View File

@ -8,6 +8,8 @@
#include <DataTypes/DataTypeNothing.h>
#include <iostream>
namespace DB
{
@ -155,7 +157,7 @@ void CSVRowInputFormat::readPrefix()
size_t num_columns = data_types.size();
const auto & header = getPort().getHeader();
if (with_names)
if (with_names && getCurrentUnitNumber() == 0)
{
/// This CSV file has a header row with column names. Depending on the
/// settings, use it or skip it.
@ -492,6 +494,7 @@ static std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB
/// Registers the CSV file segmentation routine in the format factory.
/// The same routine is registered under two names: "CSV" and "CSVWithNames".
void registerFileSegmentationEngineCSV(FormatFactory & factory)
{
    /// Both names are served by one and the same segmentation function.
    const auto segmentation_engine = &fileSegmentationEngineCSVImpl;

    factory.registerFileSegmentationEngine("CSV", segmentation_engine);
    factory.registerFileSegmentationEngine("CSVWithNames", segmentation_engine);
}
}

View File

@ -10,6 +10,7 @@
#include <IO/ReadBuffer.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Interpreters/Context.h>
#include <common/logger_useful.h>
namespace DB
{
@ -97,6 +98,8 @@ public:
segmentator_thread = ThreadFromGlobalPool(
&ParallelParsingInputFormat::segmentatorThreadFunction, this, CurrentThread::getGroup());
LOG_DEBUG(&Poco::Logger::get("ParallelParsingInputFormat"), "Parallel parsing is used");
}
~ParallelParsingInputFormat() override

View File

@ -136,7 +136,7 @@ void TabSeparatedRowInputFormat::readPrefix()
skipBOMIfExists(in);
}
if (with_names)
if (with_names && getCurrentUnitNumber() == 0)
{
if (format_settings.with_names_use_header)
{
@ -463,9 +463,10 @@ static std::pair<bool, size_t> fileSegmentationEngineTabSeparatedImpl(ReadBuffer
/// Registers the tab-separated file segmentation routine in the format factory.
/// Every base name is registered twice: as is, and with the "WithNames" suffix appended.
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)
{
    // We can use the same segmentation engine for TSKV.
    /// NOTE(review): the suffix is appended to every base name, so "TSKVWithNames" is registered too —
    /// confirm that this name is intended.
    for (const auto * name : {"TabSeparated", "TSV", "TSKV"})
    {
        factory.registerFileSegmentationEngine(name, &fileSegmentationEngineTabSeparatedImpl);
        /// Build the suffixed name explicitly: the list elements are string literals, not std::string.
        factory.registerFileSegmentationEngine(std::string(name) + "WithNames", &fileSegmentationEngineTabSeparatedImpl);
    }
}

View File

@ -0,0 +1,8 @@
TSVWithNames, false
50000
TSVWithNames, true
50000
CSVWithNames, false
50000
CSVWithNames, true
50000

View File

@ -0,0 +1,31 @@
#!/usr/bin/env bash

# For each "WithNames" format, dumps 50000 rows from test.hits in that format,
# pipes them into an INSERT that uses the same format, and prints the resulting
# row count. Every format is exercised with parallel parsing turned off and on.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

FORMATS=('TSVWithNames' 'CSVWithNames')

# Make sure no table is left over from a previous run.
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parsing_with_names"

for format in "${FORMATS[@]}"
do
    for parallel_parsing in false true
    do
        # Each combination works on a freshly created table and drops it afterwards.
        $CLICKHOUSE_CLIENT -q "CREATE TABLE parsing_with_names(a DateTime, b String, c FixedString(16)) ENGINE=Memory()"

        echo "$format, $parallel_parsing"

        # Parallel formatting is disabled on the producing side; only the
        # parsing mode of the consuming client varies between iterations.
        $CLICKHOUSE_CLIENT --output_format_parallel_formatting=false -q \
        "SELECT ClientEventTime as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits LIMIT 50000 Format $format" | \
        $CLICKHOUSE_CLIENT --input_format_parallel_parsing=$parallel_parsing -q "INSERT INTO parsing_with_names FORMAT $format"

        $CLICKHOUSE_CLIENT -q "SELECT count() FROM parsing_with_names;"
        $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parsing_with_names"
    done
done