mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 18:12:02 +00:00
better formatfactory
This commit is contained in:
parent
ffd73082ba
commit
c5f92e5096
@ -1934,11 +1934,11 @@ private:
|
||||
if (has_vertical_output_suffix)
|
||||
current_format = "Vertical";
|
||||
|
||||
if (!is_interactive && !need_render_progress)
|
||||
block_out_stream = context.getOutputFormatParallelIfPossible(current_format, *out_buf, block);
|
||||
|
||||
if (!block_out_stream)
|
||||
block_out_stream = context.getOutputFormat(current_format, *out_buf, block);
|
||||
/// It is not clear how to write progress with parallel formatting. It may increase code complexity significantly.
|
||||
if (!need_render_progress)
|
||||
block_out_stream = context.getOutputStreamParallelIfPossible(current_format, *out_buf, block);
|
||||
else
|
||||
block_out_stream = context.getOutputStream(current_format, *out_buf, block);
|
||||
|
||||
block_out_stream->writePrefix();
|
||||
}
|
||||
|
@ -1180,7 +1180,7 @@ try
|
||||
file_in.seek(0, SEEK_SET);
|
||||
|
||||
BlockInputStreamPtr input = context.getInputFormat(input_format, file_in, header, max_block_size);
|
||||
BlockOutputStreamPtr output = context.getOutputFormat(output_format, file_out, header);
|
||||
BlockOutputStreamPtr output = context.getOutputStream(output_format, file_out, header);
|
||||
|
||||
if (processed_rows + source_rows > limit)
|
||||
input = std::make_shared<LimitBlockInputStream>(input, limit - processed_rows, 0);
|
||||
|
@ -176,7 +176,7 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
|
||||
std::string query = params.get("query");
|
||||
LOG_TRACE(log, "Query: {}", query);
|
||||
|
||||
BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, context);
|
||||
BlockOutputStreamPtr writer = FormatFactory::instance().getOutputStream(format, out, *sample_block, context);
|
||||
auto pool = getPool(connection_string);
|
||||
ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size);
|
||||
copyData(inp, *writer);
|
||||
|
@ -184,7 +184,7 @@ BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector<UInt64
|
||||
context, format, sample_block, command, log,
|
||||
[&ids, this](WriteBufferFromFile & out) mutable
|
||||
{
|
||||
auto output_stream = context.getOutputFormat(format, out, sample_block);
|
||||
auto output_stream = context.getOutputStream(format, out, sample_block);
|
||||
formatIDs(output_stream, ids);
|
||||
out.close();
|
||||
});
|
||||
@ -198,7 +198,7 @@ BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_col
|
||||
context, format, sample_block, command, log,
|
||||
[key_columns, &requested_rows, this](WriteBufferFromFile & out) mutable
|
||||
{
|
||||
auto output_stream = context.getOutputFormat(format, out, sample_block);
|
||||
auto output_stream = context.getOutputStream(format, out, sample_block);
|
||||
formatKeys(dict_struct, output_stream, key_columns, requested_rows);
|
||||
out.close();
|
||||
});
|
||||
|
@ -134,7 +134,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & id
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr)
|
||||
{
|
||||
WriteBufferFromOStream out_buffer(ostr);
|
||||
auto output_stream = context.getOutputFormat(format, out_buffer, sample_block);
|
||||
auto output_stream = context.getOutputStream(format, out_buffer, sample_block);
|
||||
formatIDs(output_stream, ids);
|
||||
};
|
||||
|
||||
@ -153,7 +153,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns,
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr)
|
||||
{
|
||||
WriteBufferFromOStream out_buffer(ostr);
|
||||
auto output_stream = context.getOutputFormat(format, out_buffer, sample_block);
|
||||
auto output_stream = context.getOutputStream(format, out_buffer, sample_block);
|
||||
formatKeys(dict_struct, output_stream, key_columns, requested_rows);
|
||||
};
|
||||
|
||||
|
@ -202,28 +202,20 @@ InputFormatPtr FormatFactory::getInput(
|
||||
return format;
|
||||
}
|
||||
|
||||
BlockOutputStreamPtr FormatFactory::getOutputParallelIfPossible(const String & name,
|
||||
BlockOutputStreamPtr FormatFactory::getOutputStreamParallelIfPossible(const String & name,
|
||||
WriteBuffer & buf, const Block & sample, const Context & context,
|
||||
WriteCallback callback, const std::optional<FormatSettings> & _format_settings) const
|
||||
{
|
||||
auto format_settings = _format_settings
|
||||
? *_format_settings : getFormatSettings(context);
|
||||
|
||||
if (!getCreators(name).output_processor_creator)
|
||||
{
|
||||
throw Exception("Format " + name + " is not suitable for output (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
|
||||
}
|
||||
const auto & output_getter = getCreators(name).output_processor_creator;
|
||||
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
bool parallel_formatting = settings.output_format_parallel_formatting;
|
||||
|
||||
if (parallel_formatting && getCreators(name).supports_parallel_formatting && !settings.allow_experimental_live_view && !settings.output_format_json_array_of_rows)
|
||||
if (output_getter && parallel_formatting && getCreators(name).supports_parallel_formatting
|
||||
&& !settings.output_format_json_array_of_rows && !settings.allow_experimental_live_view)
|
||||
{
|
||||
const auto & output_getter = getCreators(name).output_processor_creator;
|
||||
|
||||
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
|
||||
* which only work with full columns.
|
||||
*/
|
||||
auto format_settings = _format_settings
|
||||
? *_format_settings : getFormatSettings(context);
|
||||
|
||||
auto formatter_creator = [output_getter, sample, callback, format_settings]
|
||||
(WriteBuffer & output) -> OutputFormatPtr
|
||||
@ -239,12 +231,11 @@ BlockOutputStreamPtr FormatFactory::getOutputParallelIfPossible(const String & n
|
||||
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
|
||||
}
|
||||
|
||||
|
||||
return nullptr;
|
||||
return getOutputStream(name, buf, sample, context, callback, _format_settings);
|
||||
}
|
||||
|
||||
|
||||
BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
|
||||
BlockOutputStreamPtr FormatFactory::getOutputStream(const String & name,
|
||||
WriteBuffer & buf, const Block & sample, const Context & context,
|
||||
WriteCallback callback, const std::optional<FormatSettings> & _format_settings) const
|
||||
{
|
||||
@ -253,10 +244,19 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
|
||||
|
||||
if (!getCreators(name).output_processor_creator)
|
||||
{
|
||||
throw Exception("Format " + name + " is not suitable for output (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
|
||||
const auto & output_getter = getCreators(name).output_creator;
|
||||
if (!output_getter)
|
||||
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
|
||||
|
||||
/** Materialization is needed, because formats can use the functions `IDataType`,
|
||||
* which only work with full columns.
|
||||
*/
|
||||
return std::make_shared<MaterializingBlockOutputStream>(
|
||||
output_getter(buf, sample, std::move(callback), format_settings),
|
||||
sample);
|
||||
}
|
||||
|
||||
auto format = getOutputFormat(name, buf, sample, context, std::move(callback), format_settings);
|
||||
auto format = getOutputFormat(name, buf, sample, context, std::move(callback), _format_settings);
|
||||
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
|
||||
}
|
||||
|
||||
@ -294,6 +294,39 @@ InputFormatPtr FormatFactory::getInputFormat(
|
||||
return format;
|
||||
}
|
||||
|
||||
OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
|
||||
const String & name, WriteBuffer & buf, const Block & sample,
|
||||
const Context & context, WriteCallback callback,
|
||||
const std::optional<FormatSettings> & _format_settings) const
|
||||
{
|
||||
const auto & output_getter = getCreators(name).output_processor_creator;
|
||||
if (!output_getter)
|
||||
throw Exception("Format " + name + " is not suitable for output (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
|
||||
|
||||
RowOutputFormatParams params;
|
||||
params.callback = std::move(callback);
|
||||
|
||||
auto format_settings = _format_settings
|
||||
? *_format_settings : getFormatSettings(context);
|
||||
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
|
||||
if (settings.output_format_parallel_formatting && getCreators(name).supports_parallel_formatting
|
||||
&& !settings.output_format_json_array_of_rows && !settings.allow_experimental_live_view)
|
||||
{
|
||||
auto formatter_creator = [output_getter, sample, callback, format_settings]
|
||||
(WriteBuffer & output) -> OutputFormatPtr
|
||||
{ return output_getter(output, sample, {std::move(callback)}, format_settings);};
|
||||
|
||||
ParallelFormattingOutputFormat::Params builder{buf, sample, formatter_creator, settings.max_threads};
|
||||
|
||||
return std::make_shared<ParallelFormattingOutputFormat>(builder);
|
||||
}
|
||||
|
||||
return getOutputFormat(name, buf, sample, context, callback, _format_settings);
|
||||
}
|
||||
|
||||
|
||||
|
||||
OutputFormatPtr FormatFactory::getOutputFormat(
|
||||
const String & name, WriteBuffer & buf, const Block & sample,
|
||||
@ -302,7 +335,7 @@ OutputFormatPtr FormatFactory::getOutputFormat(
|
||||
{
|
||||
const auto & output_getter = getCreators(name).output_processor_creator;
|
||||
if (!output_getter)
|
||||
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
|
||||
throw Exception("Format " + name + " is not suitable for output (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
|
||||
|
||||
RowOutputFormatParams params;
|
||||
params.callback = std::move(callback);
|
||||
|
@ -116,12 +116,14 @@ public:
|
||||
UInt64 max_block_size,
|
||||
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
|
||||
|
||||
/// Checks all preconditions. Returns nullptr of parallel formatting cannot be done.
|
||||
BlockOutputStreamPtr getOutputParallelIfPossible(const String & name, WriteBuffer & buf,
|
||||
/// Checks all preconditions. Returns ordinary stream if parallel formatting cannot be done.
|
||||
/// Currenly used only in Client. Don't use it something else! Better look at getOutputFormatParallelIfPossible.
|
||||
BlockOutputStreamPtr getOutputStreamParallelIfPossible(const String & name, WriteBuffer & buf,
|
||||
const Block & sample, const Context & context, WriteCallback callback = {},
|
||||
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
|
||||
|
||||
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
|
||||
/// Currenly used only in Client. Don't use it something else! Better look at getOutputFormat.
|
||||
BlockOutputStreamPtr getOutputStream(const String & name, WriteBuffer & buf,
|
||||
const Block & sample, const Context & context, WriteCallback callback = {},
|
||||
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
|
||||
|
||||
@ -133,6 +135,12 @@ public:
|
||||
UInt64 max_block_size,
|
||||
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
|
||||
|
||||
/// Checks all preconditions. Returns ordinary format if parallel formatting cannot be done.
|
||||
OutputFormatPtr getOutputFormatParallelIfPossible(
|
||||
const String & name, WriteBuffer & buf, const Block & sample,
|
||||
const Context & context, WriteCallback callback = {},
|
||||
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
|
||||
|
||||
OutputFormatPtr getOutputFormat(
|
||||
const String & name, WriteBuffer & buf, const Block & sample,
|
||||
const Context & context, WriteCallback callback = {},
|
||||
|
@ -2089,17 +2089,22 @@ BlockInputStreamPtr Context::getInputFormat(const String & name, ReadBuffer & bu
|
||||
return std::make_shared<InputStreamFromInputFormat>(FormatFactory::instance().getInput(name, buf, sample, *this, max_block_size));
|
||||
}
|
||||
|
||||
BlockOutputStreamPtr Context::getOutputFormatParallelIfPossible(const String & name, WriteBuffer & buf, const Block & sample) const
|
||||
BlockOutputStreamPtr Context::getOutputStreamParallelIfPossible(const String & name, WriteBuffer & buf, const Block & sample) const
|
||||
{
|
||||
return FormatFactory::instance().getOutputParallelIfPossible(name, buf, sample, *this);
|
||||
return FormatFactory::instance().getOutputStreamParallelIfPossible(name, buf, sample, *this);
|
||||
}
|
||||
|
||||
BlockOutputStreamPtr Context::getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample) const
|
||||
BlockOutputStreamPtr Context::getOutputStream(const String & name, WriteBuffer & buf, const Block & sample) const
|
||||
{
|
||||
return FormatFactory::instance().getOutput(name, buf, sample, *this);
|
||||
return FormatFactory::instance().getOutputStream(name, buf, sample, *this);
|
||||
}
|
||||
|
||||
OutputFormatPtr Context::getOutputFormatProcessor(const String & name, WriteBuffer & buf, const Block & sample) const
|
||||
OutputFormatPtr Context::getOutputFormatParallelIfPossible(const String & name, WriteBuffer & buf, const Block & sample) const
|
||||
{
|
||||
return FormatFactory::instance().getOutputFormatParallelIfPossible(name, buf, sample, *this);
|
||||
}
|
||||
|
||||
OutputFormatPtr Context::getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample) const
|
||||
{
|
||||
return FormatFactory::instance().getOutputFormat(name, buf, sample, *this);
|
||||
}
|
||||
|
@ -426,10 +426,12 @@ public:
|
||||
/// I/O formats.
|
||||
BlockInputStreamPtr getInputFormat(const String & name, ReadBuffer & buf, const Block & sample, UInt64 max_block_size) const;
|
||||
|
||||
BlockOutputStreamPtr getOutputFormatParallelIfPossible(const String & name, WriteBuffer & buf, const Block & sample) const;
|
||||
BlockOutputStreamPtr getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample) const;
|
||||
/// Don't use streams. Better look at getOutputFormat...
|
||||
BlockOutputStreamPtr getOutputStreamParallelIfPossible(const String & name, WriteBuffer & buf, const Block & sample) const;
|
||||
BlockOutputStreamPtr getOutputStream(const String & name, WriteBuffer & buf, const Block & sample) const;
|
||||
|
||||
OutputFormatPtr getOutputFormatProcessor(const String & name, WriteBuffer & buf, const Block & sample) const;
|
||||
OutputFormatPtr getOutputFormatParallelIfPossible(const String & name, WriteBuffer & buf, const Block & sample) const;
|
||||
OutputFormatPtr getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample) const;
|
||||
|
||||
InterserverIOHandler & getInterserverIOHandler();
|
||||
|
||||
|
@ -974,10 +974,7 @@ void executeQuery(
|
||||
? getIdentifierName(ast_query_with_output->format)
|
||||
: context.getDefaultFormat();
|
||||
|
||||
BlockOutputStreamPtr out;
|
||||
out = context.getOutputFormatParallelIfPossible(format_name, *out_buf, streams.in->getHeader());
|
||||
if (!out)
|
||||
out = context.getOutputFormat(format_name, *out_buf, streams.in->getHeader());
|
||||
auto out = context.getOutputStream(format_name, *out_buf, streams.in->getHeader());
|
||||
|
||||
/// Save previous progress callback if any. TODO Do it more conveniently.
|
||||
auto previous_progress_callback = context.getProgressCallback();
|
||||
@ -1022,7 +1019,7 @@ void executeQuery(
|
||||
return std::make_shared<MaterializingTransform>(header);
|
||||
});
|
||||
|
||||
auto out = context.getOutputFormatProcessor(format_name, *out_buf, pipeline.getHeader());
|
||||
auto out = context.getOutputFormat(format_name, *out_buf, pipeline.getHeader());
|
||||
out->setAutoFlush();
|
||||
|
||||
/// Save previous progress callback if any. TODO Do it more conveniently.
|
||||
|
201
src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp
Normal file
201
src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp
Normal file
@ -0,0 +1,201 @@
|
||||
#include <Processors/Formats/Impl/ParallelFormattingOutputFormat.h>
|
||||
|
||||
#include <Common/setThreadName.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
void ParallelFormattingOutputFormat::finalize()
|
||||
{
|
||||
need_flush = true;
|
||||
IOutputFormat::finalized = true;
|
||||
/// Don't throw any background_exception here, because we want to finalize the execution.
|
||||
/// Exception will be checked after main thread is finished.
|
||||
addChunk(Chunk{}, ProcessingUnitType::FINALIZE, /*can_throw_exception*/ false);
|
||||
collector_finished.wait();
|
||||
|
||||
if (collector_thread.joinable())
|
||||
collector_thread.join();
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
if (background_exception)
|
||||
std::rethrow_exception(background_exception);
|
||||
}
|
||||
}
|
||||
|
||||
void ParallelFormattingOutputFormat::addChunk(Chunk chunk, ProcessingUnitType type, bool can_throw_exception)
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
if (background_exception && can_throw_exception)
|
||||
std::rethrow_exception(background_exception);
|
||||
}
|
||||
|
||||
const auto current_unit_number = writer_unit_number % processing_units.size();
|
||||
auto & unit = processing_units[current_unit_number];
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
writer_condvar.wait(lock,
|
||||
[&]{ return unit.status == READY_TO_INSERT || emergency_stop; });
|
||||
}
|
||||
|
||||
if (emergency_stop)
|
||||
return;
|
||||
|
||||
assert(unit.status == READY_TO_INSERT);
|
||||
unit.chunk = std::move(chunk);
|
||||
/// Resize memory without deallocation.
|
||||
unit.segment.resize(0);
|
||||
unit.status = READY_TO_FORMAT;
|
||||
unit.type = type;
|
||||
|
||||
scheduleFormatterThreadForUnitWithNumber(current_unit_number);
|
||||
|
||||
++writer_unit_number;
|
||||
}
|
||||
|
||||
|
||||
void ParallelFormattingOutputFormat::finishAndWait()
|
||||
{
|
||||
emergency_stop = true;
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
collector_condvar.notify_all();
|
||||
writer_condvar.notify_all();
|
||||
}
|
||||
|
||||
if (collector_thread.joinable())
|
||||
collector_thread.join();
|
||||
|
||||
try
|
||||
{
|
||||
pool.wait();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ParallelFormattingOutputFormat::collectorThreadFunction()
|
||||
{
|
||||
setThreadName("Collector");
|
||||
|
||||
try
|
||||
{
|
||||
while (!emergency_stop)
|
||||
{
|
||||
const auto current_unit_number = collector_unit_number % processing_units.size();
|
||||
auto & unit = processing_units[current_unit_number];
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
collector_condvar.wait(lock,
|
||||
[&]{ return unit.status == READY_TO_READ || emergency_stop; });
|
||||
}
|
||||
|
||||
if (emergency_stop)
|
||||
break;
|
||||
|
||||
assert(unit.status == READY_TO_READ);
|
||||
|
||||
/// Use this copy to after notification to stop the execution.
|
||||
auto copy_if_unit_type = unit.type;
|
||||
|
||||
/// Do main work here.
|
||||
out.write(unit.segment.data(), unit.actual_memory_size);
|
||||
|
||||
if (need_flush.exchange(false) || auto_flush)
|
||||
IOutputFormat::flush();
|
||||
|
||||
++collector_unit_number;
|
||||
|
||||
{
|
||||
/// Notify other threads.
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
unit.status = READY_TO_INSERT;
|
||||
writer_condvar.notify_all();
|
||||
}
|
||||
/// We can exit only after writing last piece of to out buffer.
|
||||
if (copy_if_unit_type == ProcessingUnitType::FINALIZE)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
collector_finished.set();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
collector_finished.set();
|
||||
onBackgroundException();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ParallelFormattingOutputFormat::formatterThreadFunction(size_t current_unit_number)
|
||||
{
|
||||
setThreadName("Formatter");
|
||||
|
||||
try
|
||||
{
|
||||
auto & unit = processing_units[current_unit_number];
|
||||
assert(unit.status = READY_TO_FORMAT);
|
||||
|
||||
/// We want to preallocate memory buffer (increase capacity)
|
||||
/// and put the pointer at the beginning of the buffer
|
||||
unit.segment.resize(DBMS_DEFAULT_BUFFER_SIZE);
|
||||
/// The second invocation won't release memory, only set size equals to 0.
|
||||
unit.segment.resize(0);
|
||||
|
||||
unit.actual_memory_size = 0;
|
||||
BufferWithOutsideMemory<WriteBuffer> out_buffer(unit.segment);
|
||||
|
||||
auto formatter = internal_formatter_creator(out_buffer);
|
||||
|
||||
switch (unit.type)
|
||||
{
|
||||
case ProcessingUnitType::START :
|
||||
{
|
||||
formatter->doWritePrefix();
|
||||
break;
|
||||
}
|
||||
case ProcessingUnitType::PLAIN :
|
||||
{
|
||||
formatter->consume(std::move(unit.chunk));
|
||||
break;
|
||||
}
|
||||
case ProcessingUnitType::TOTALS :
|
||||
{
|
||||
formatter->consumeTotals(std::move(unit.chunk));
|
||||
break;
|
||||
}
|
||||
case ProcessingUnitType::EXTREMES :
|
||||
{
|
||||
formatter->consumeExtremes(std::move(unit.chunk));
|
||||
break;
|
||||
}
|
||||
case ProcessingUnitType::FINALIZE :
|
||||
{
|
||||
formatter->doWriteSuffix();
|
||||
break;
|
||||
}
|
||||
}
|
||||
/// Flush all the data to handmade buffer.
|
||||
formatter->flush();
|
||||
unit.actual_memory_size = out_buffer.getActualSize();
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
unit.status = READY_TO_READ;
|
||||
collector_condvar.notify_all();
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
onBackgroundException();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,22 +1,19 @@
|
||||
#pragma once
|
||||
|
||||
#include <Processors/Formats/IOutputFormat.h>
|
||||
|
||||
#include <Common/Arena.h>
|
||||
#include <IO/WriteBufferFromArena.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
|
||||
#include <deque>
|
||||
#include <future>
|
||||
#include <Common/setThreadName.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Poco/Event.h>
|
||||
#include <IO/WriteBuffer.h>
|
||||
#include <IO/BufferWithOwnMemory.h>
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Poco/Event.h>
|
||||
#include <IO/BufferWithOwnMemory.h>
|
||||
#include <IO/WriteBuffer.h>
|
||||
#include <IO/WriteBufferFromArena.h>
|
||||
|
||||
#include <deque>
|
||||
#include <atomic>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -79,7 +76,7 @@ public:
|
||||
/// and n threads for formatting.
|
||||
processing_units.resize(params.max_threads_for_parallel_formatting + 2);
|
||||
collector_thread = ThreadFromGlobalPool([&] { collectorThreadFunction(); });
|
||||
LOG_TRACE(&Poco::Logger::get("ParallelFormattingOutputFormat"), "Parallel formatting is being used.");
|
||||
LOG_TRACE(&Poco::Logger::get("ParallelFormattingOutputFormat"), "Parallel formatting is being used");
|
||||
}
|
||||
|
||||
~ParallelFormattingOutputFormat() override
|
||||
@ -120,24 +117,7 @@ protected:
|
||||
addChunk(std::move(extremes), ProcessingUnitType::EXTREMES, /*can_throw_exception*/ true);
|
||||
}
|
||||
|
||||
void finalize() override
|
||||
{
|
||||
need_flush = true;
|
||||
IOutputFormat::finalized = true;
|
||||
/// Don't throw any background_exception here, because we want to finalize the execution.
|
||||
/// Exception will be checked after main thread is finished.
|
||||
addChunk(Chunk{}, ProcessingUnitType::FINALIZE, /*can_throw_exception*/ false);
|
||||
collector_finished.wait();
|
||||
|
||||
if (collector_thread.joinable())
|
||||
collector_thread.join();
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
if (background_exception)
|
||||
std::rethrow_exception(background_exception);
|
||||
}
|
||||
}
|
||||
void finalize() override;
|
||||
|
||||
private:
|
||||
InternalFormatterCreator internal_formatter_creator;
|
||||
@ -160,37 +140,7 @@ private:
|
||||
FINALIZE
|
||||
};
|
||||
|
||||
void addChunk(Chunk chunk, ProcessingUnitType type, bool can_throw_exception)
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
if (background_exception && can_throw_exception)
|
||||
std::rethrow_exception(background_exception);
|
||||
}
|
||||
|
||||
const auto current_unit_number = writer_unit_number % processing_units.size();
|
||||
auto & unit = processing_units[current_unit_number];
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
writer_condvar.wait(lock,
|
||||
[&]{ return unit.status == READY_TO_INSERT || emergency_stop; });
|
||||
}
|
||||
|
||||
if (emergency_stop)
|
||||
return;
|
||||
|
||||
assert(unit.status == READY_TO_INSERT);
|
||||
unit.chunk = std::move(chunk);
|
||||
/// Resize memory without deallocation.
|
||||
unit.segment.resize(0);
|
||||
unit.status = READY_TO_FORMAT;
|
||||
unit.type = type;
|
||||
|
||||
scheduleFormatterThreadForUnitWithNumber(current_unit_number);
|
||||
|
||||
++writer_unit_number;
|
||||
}
|
||||
void addChunk(Chunk chunk, ProcessingUnitType type, bool can_throw_exception);
|
||||
|
||||
struct ProcessingUnit
|
||||
{
|
||||
@ -224,29 +174,7 @@ private:
|
||||
std::condition_variable collector_condvar;
|
||||
std::condition_variable writer_condvar;
|
||||
|
||||
void finishAndWait()
|
||||
{
|
||||
emergency_stop = true;
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
collector_condvar.notify_all();
|
||||
writer_condvar.notify_all();
|
||||
}
|
||||
|
||||
if (collector_thread.joinable())
|
||||
collector_thread.join();
|
||||
|
||||
try
|
||||
{
|
||||
pool.wait();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
|
||||
void finishAndWait();
|
||||
|
||||
void onBackgroundException()
|
||||
{
|
||||
@ -265,124 +193,11 @@ private:
|
||||
pool.scheduleOrThrowOnError([this, ticket_number] { formatterThreadFunction(ticket_number); });
|
||||
}
|
||||
|
||||
void collectorThreadFunction()
|
||||
{
|
||||
setThreadName("Collector");
|
||||
|
||||
try
|
||||
{
|
||||
while (!emergency_stop)
|
||||
{
|
||||
const auto current_unit_number = collector_unit_number % processing_units.size();
|
||||
auto & unit = processing_units[current_unit_number];
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
collector_condvar.wait(lock,
|
||||
[&]{ return unit.status == READY_TO_READ || emergency_stop; });
|
||||
}
|
||||
|
||||
if (emergency_stop)
|
||||
break;
|
||||
|
||||
assert(unit.status == READY_TO_READ);
|
||||
|
||||
/// Use this copy to after notification to stop the execution.
|
||||
auto copy_if_unit_type = unit.type;
|
||||
|
||||
/// Do main work here.
|
||||
out.write(unit.segment.data(), unit.actual_memory_size);
|
||||
|
||||
if (need_flush.exchange(false) || auto_flush)
|
||||
IOutputFormat::flush();
|
||||
|
||||
++collector_unit_number;
|
||||
|
||||
{
|
||||
/// Notify other threads.
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
unit.status = READY_TO_INSERT;
|
||||
writer_condvar.notify_all();
|
||||
}
|
||||
/// We can exit only after writing last piece of to out buffer.
|
||||
if (copy_if_unit_type == ProcessingUnitType::FINALIZE)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
collector_finished.set();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
collector_finished.set();
|
||||
onBackgroundException();
|
||||
}
|
||||
}
|
||||
/// Collects all temporary buffers into main WriteBuffer.
|
||||
void collectorThreadFunction();
|
||||
|
||||
/// This function is executed in ThreadPool and the only purpose of it is to format one Chunk into a continuous buffer in memory.
|
||||
void formatterThreadFunction(size_t current_unit_number)
|
||||
{
|
||||
setThreadName("Formatter");
|
||||
|
||||
try
|
||||
{
|
||||
auto & unit = processing_units[current_unit_number];
|
||||
assert(unit.status = READY_TO_FORMAT);
|
||||
|
||||
/// We want to preallocate memory buffer (increase capacity)
|
||||
/// and put the pointer at the beginning of the buffer
|
||||
/// FIXME: Implement reserve() method in Memory.
|
||||
unit.segment.resize(DBMS_DEFAULT_BUFFER_SIZE);
|
||||
unit.segment.resize(0);
|
||||
|
||||
unit.actual_memory_size = 0;
|
||||
BufferWithOutsideMemory<WriteBuffer> out_buffer(unit.segment);
|
||||
|
||||
auto formatter = internal_formatter_creator(out_buffer);
|
||||
|
||||
switch (unit.type)
|
||||
{
|
||||
case ProcessingUnitType::START :
|
||||
{
|
||||
formatter->doWritePrefix();
|
||||
break;
|
||||
}
|
||||
case ProcessingUnitType::PLAIN :
|
||||
{
|
||||
formatter->consume(std::move(unit.chunk));
|
||||
break;
|
||||
}
|
||||
case ProcessingUnitType::TOTALS :
|
||||
{
|
||||
formatter->consumeTotals(std::move(unit.chunk));
|
||||
break;
|
||||
}
|
||||
case ProcessingUnitType::EXTREMES :
|
||||
{
|
||||
formatter->consumeExtremes(std::move(unit.chunk));
|
||||
break;
|
||||
}
|
||||
case ProcessingUnitType::FINALIZE :
|
||||
{
|
||||
formatter->doWriteSuffix();
|
||||
break;
|
||||
}
|
||||
}
|
||||
/// Flush all the data to handmade buffer.
|
||||
formatter->flush();
|
||||
unit.actual_memory_size = out_buffer.getActualSize();
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
unit.status = READY_TO_READ;
|
||||
collector_condvar.notify_all();
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
onBackgroundException();
|
||||
}
|
||||
}
|
||||
void formatterThreadFunction(size_t current_unit_number);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -994,7 +994,7 @@ namespace
|
||||
|
||||
AsynchronousBlockInputStream async_in(io.in);
|
||||
write_buffer.emplace(*result.mutable_output());
|
||||
block_output_stream = query_context->getOutputFormat(output_format, *write_buffer, async_in.getHeader());
|
||||
block_output_stream = query_context->getOutputStream(output_format, *write_buffer, async_in.getHeader());
|
||||
Stopwatch after_send_progress;
|
||||
|
||||
/// Unless the input() function is used we are not going to receive input data anymore.
|
||||
@ -1066,7 +1066,7 @@ namespace
|
||||
|
||||
auto executor = std::make_shared<PullingAsyncPipelineExecutor>(io.pipeline);
|
||||
write_buffer.emplace(*result.mutable_output());
|
||||
block_output_stream = query_context->getOutputFormat(output_format, *write_buffer, executor->getHeader());
|
||||
block_output_stream = query_context->getOutputStream(output_format, *write_buffer, executor->getHeader());
|
||||
block_output_stream->writePrefix();
|
||||
Stopwatch after_send_progress;
|
||||
|
||||
@ -1321,7 +1321,7 @@ namespace
|
||||
return;
|
||||
|
||||
WriteBufferFromString buf{*result.mutable_totals()};
|
||||
auto stream = query_context->getOutputFormat(output_format, buf, totals);
|
||||
auto stream = query_context->getOutputStream(output_format, buf, totals);
|
||||
stream->writePrefix();
|
||||
stream->write(totals);
|
||||
stream->writeSuffix();
|
||||
@ -1333,7 +1333,7 @@ namespace
|
||||
return;
|
||||
|
||||
WriteBufferFromString buf{*result.mutable_extremes()};
|
||||
auto stream = query_context->getOutputFormat(output_format, buf, extremes);
|
||||
auto stream = query_context->getOutputStream(output_format, buf, extremes);
|
||||
stream->writePrefix();
|
||||
stream->write(extremes);
|
||||
stream->writeSuffix();
|
||||
|
@ -184,7 +184,7 @@ public:
|
||||
: sample_block(sample_block_)
|
||||
{
|
||||
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri, context.getGlobalContext().getConfigRef()), compression_method, 3);
|
||||
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
|
||||
writer = FormatFactory::instance().getOutputStream(format, *write_buf, sample_block, context);
|
||||
}
|
||||
|
||||
Block getHeader() const override
|
||||
|
@ -35,7 +35,7 @@ void KafkaBlockOutputStream::writePrefix()
|
||||
auto format_settings = getFormatSettings(*context);
|
||||
format_settings.protobuf.allow_many_rows_no_delimiters = true;
|
||||
|
||||
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer,
|
||||
child = FormatFactory::instance().getOutputStream(storage.getFormatName(), *buffer,
|
||||
getHeader(), *context,
|
||||
[this](const Columns & columns, size_t row)
|
||||
{
|
||||
|
@ -45,7 +45,7 @@ void RabbitMQBlockOutputStream::writePrefix()
|
||||
auto format_settings = getFormatSettings(context);
|
||||
format_settings.protobuf.allow_many_rows_no_delimiters = true;
|
||||
|
||||
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer,
|
||||
child = FormatFactory::instance().getOutputStream(storage.getFormatName(), *buffer,
|
||||
getHeader(), context,
|
||||
[this](const Columns & /* columns */, size_t /* rows */)
|
||||
{
|
||||
|
@ -481,12 +481,12 @@ public:
|
||||
|
||||
write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3);
|
||||
|
||||
writer = FormatFactory::instance().getOutputParallelIfPossible(storage.format_name,
|
||||
writer = FormatFactory::instance().getOutputStreamParallelIfPossible(storage.format_name,
|
||||
*write_buf, metadata_snapshot->getSampleBlock(), context,
|
||||
{}, format_settings);
|
||||
|
||||
if (!writer)
|
||||
writer = FormatFactory::instance().getOutput(storage.format_name,
|
||||
writer = FormatFactory::instance().getOutputStream(storage.format_name,
|
||||
*write_buf, metadata_snapshot->getSampleBlock(), context,
|
||||
{}, format_settings);
|
||||
}
|
||||
|
@ -147,7 +147,7 @@ public:
|
||||
sqlbuf << backQuoteMySQL(remote_database_name) << "." << backQuoteMySQL(remote_table_name);
|
||||
sqlbuf << " (" << dumpNamesWithBackQuote(block) << ") VALUES ";
|
||||
|
||||
auto writer = FormatFactory::instance().getOutput("Values", sqlbuf, metadata_snapshot->getSampleBlock(), storage.global_context);
|
||||
auto writer = FormatFactory::instance().getOutputStream("Values", sqlbuf, metadata_snapshot->getSampleBlock(), storage.global_context);
|
||||
writer->write(block);
|
||||
|
||||
if (!storage.on_duplicate_clause.empty())
|
||||
|
@ -155,7 +155,7 @@ namespace
|
||||
{
|
||||
write_buf = wrapWriteBufferWithCompressionMethod(
|
||||
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3);
|
||||
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
|
||||
writer = FormatFactory::instance().getOutputStream(format, *write_buf, sample_block, context);
|
||||
}
|
||||
|
||||
Block getHeader() const override
|
||||
|
@ -156,7 +156,7 @@ StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri,
|
||||
write_buf = wrapWriteBufferWithCompressionMethod(
|
||||
std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts),
|
||||
compression_method, 3);
|
||||
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block,
|
||||
writer = FormatFactory::instance().getOutputStream(format, *write_buf, sample_block,
|
||||
context, {} /* write callback */, format_settings);
|
||||
}
|
||||
|
||||
|
@ -134,7 +134,7 @@ std::string readData(DB::StoragePtr & table, const DB::Context & context)
|
||||
tryRegisterFormats();
|
||||
|
||||
WriteBufferFromOwnString out_buf;
|
||||
BlockOutputStreamPtr output = FormatFactory::instance().getOutput("Values", out_buf, sample, context);
|
||||
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream("Values", out_buf, sample, context);
|
||||
|
||||
copyData(*in, *output);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user