This commit is contained in:
Nikita Mikhailov 2020-12-28 19:52:54 +03:00
parent 8667842390
commit 2dde73f700
7 changed files with 23 additions and 15 deletions

View File

@ -1364,9 +1364,6 @@ private:
if (with_output && with_output->settings_ast) if (with_output && with_output->settings_ast)
apply_query_settings(*with_output->settings_ast); apply_query_settings(*with_output->settings_ast);
// if (context.getSettingsRef().output_format_parallel_formatting)
// need_render_progress = false;
connection->forceConnected(connection_parameters.timeouts); connection->forceConnected(connection_parameters.timeouts);
ASTPtr input_function; ASTPtr input_function;
@ -1939,7 +1936,7 @@ private:
if (!is_interactive && !need_render_progress) if (!is_interactive && !need_render_progress)
block_out_stream = context.getOutputFormatParallelIfPossible(current_format, *out_buf, block); block_out_stream = context.getOutputFormatParallelIfPossible(current_format, *out_buf, block);
if (!block_out_stream) if (!block_out_stream)
block_out_stream = context.getOutputFormat(current_format, *out_buf, block); block_out_stream = context.getOutputFormat(current_format, *out_buf, block);

View File

@ -224,7 +224,7 @@ BlockOutputStreamPtr FormatFactory::getOutputParallelIfPossible(const String & n
/** TODO: Materialization is needed, because formats can use the functions `IDataType`, /** TODO: Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns. * which only work with full columns.
*/ */
auto formatter_creator = [output_getter, sample, callback, format_settings] auto formatter_creator = [output_getter, sample, callback, format_settings]
(WriteBuffer & output) -> OutputFormatPtr (WriteBuffer & output) -> OutputFormatPtr
{ return output_getter(output, sample, {std::move(callback)}, format_settings);}; { return output_getter(output, sample, {std::move(callback)}, format_settings);};
@ -244,7 +244,6 @@ BlockOutputStreamPtr FormatFactory::getOutputParallelIfPossible(const String & n
} }
BlockOutputStreamPtr FormatFactory::getOutput(const String & name, BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
WriteBuffer & buf, const Block & sample, const Context & context, WriteBuffer & buf, const Block & sample, const Context & context,
WriteCallback callback, const std::optional<FormatSettings> & _format_settings) const WriteCallback callback, const std::optional<FormatSettings> & _format_settings) const
@ -256,7 +255,7 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
{ {
throw Exception("Format " + name + " is not suitable for output (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT); throw Exception("Format " + name + " is not suitable for output (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
} }
auto format = getOutputFormat(name, buf, sample, context, std::move(callback), format_settings); auto format = getOutputFormat(name, buf, sample, context, std::move(callback), format_settings);
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample); return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
} }

View File

@ -246,7 +246,7 @@ bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi
/// Semicolon is added for convenience as it could be used at end of INSERT query. /// Semicolon is added for convenience as it could be used at end of INSERT query.
bool is_first_row = getTotalRows() == 1; bool is_first_row = getTotalRows() == 1;
if (!in.eof()) if (!in.eof())
{ {
/// There may be optional ',' (but not before the first row) /// There may be optional ',' (but not before the first row)
if (!is_first_row && *in.position() == ',') if (!is_first_row && *in.position() == ',')

View File

@ -1,3 +1,4 @@
#pragma once
#include <DataStreams/NativeBlockInputStream.h> #include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h> #include <DataStreams/NativeBlockOutputStream.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>

View File

@ -52,7 +52,7 @@ public:
~ParallelFormattingOutputFormat() override ~ParallelFormattingOutputFormat() override
{ {
need_flush = true; need_flush = true;
if (!IOutputFormat::finalized) if (!IOutputFormat::finalized)
finalize(); finalize();
finishAndWait(); finishAndWait();
} }
@ -64,12 +64,12 @@ public:
need_flush = true; need_flush = true;
} }
void doWritePrefix() override void doWritePrefix() override
{ {
addChunk(Chunk{}, ProcessingUnitType::START); addChunk(Chunk{}, ProcessingUnitType::START);
} }
void onCancel() override void onCancel() override
{ {
finishAndWait(); finishAndWait();
} }
@ -260,7 +260,7 @@ private:
IOutputFormat::flush(); IOutputFormat::flush();
++collector_unit_number; ++collector_unit_number;
{ {
/// Notify other threads. /// Notify other threads.
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
@ -292,13 +292,18 @@ private:
auto & unit = processing_units[current_unit_number]; auto & unit = processing_units[current_unit_number];
assert(unit.status = READY_TO_FORMAT); assert(unit.status = READY_TO_FORMAT);
/// We want to preallocate memory buffer (increase capacity)
/// and put the pointer at the beginning of the buffer
/// FIXME: Implement reserve() method in Memory.
unit.segment.resize(DBMS_DEFAULT_BUFFER_SIZE); unit.segment.resize(DBMS_DEFAULT_BUFFER_SIZE);
unit.segment.resize(0);
unit.actual_memory_size = 0; unit.actual_memory_size = 0;
BufferWithOutsideMemory<WriteBuffer> out_buffer(unit.segment); BufferWithOutsideMemory<WriteBuffer> out_buffer(unit.segment);
auto formatter = internal_formatter_creator(out_buffer); auto formatter = internal_formatter_creator(out_buffer);
switch (unit.type) switch (unit.type)
{ {
case ProcessingUnitType::START : case ProcessingUnitType::START :
{ {

View File

@ -95,7 +95,7 @@ void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr threa
{ {
/// Variable chunk is moved, but it is not really used in the next iteration. /// Variable chunk is moved, but it is not really used in the next iteration.
/// NOLINTNEXTLINE(bugprone-use-after-move) /// NOLINTNEXTLINE(bugprone-use-after-move)
unit.chunk_ext.chunk.emplace_back(std::move(chunk)); unit.chunk_ext.chunk.emplace_back(std::move(chunk));
unit.chunk_ext.block_missing_values.emplace_back(parser.getMissingValues()); unit.chunk_ext.block_missing_values.emplace_back(parser.getMissingValues());
} }

View File

@ -13,6 +13,12 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class Context; class Context;
/** /**
@ -52,7 +58,7 @@ class Context;
class ParallelParsingInputFormat : public IInputFormat class ParallelParsingInputFormat : public IInputFormat
{ {
public: public:
/* Used to recreate parser on every new data piece. */ /* Used to recreate parser on every new data piece.*/
using InternalParserCreator = std::function<InputFormatPtr(ReadBuffer & buf)>; using InternalParserCreator = std::function<InputFormatPtr(ReadBuffer & buf)>;
struct Params struct Params