save changes

Nikita Mikhaylov 2020-05-18 13:00:22 +03:00 committed by nikitamikhaylov
parent 49f65ecf9d
commit e0addac6fc
13 changed files with 536 additions and 38 deletions

View File

@@ -158,7 +158,6 @@ private:
// constructor, which is absent for atomics that are inside ProcessingUnit.
std::deque<ProcessingUnit> processing_units;
void scheduleParserThreadForUnitWithNumber(size_t ticket_number);
void finishAndWait();

View File

@@ -4,6 +4,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <DataTypes/DataTypeString.h>
#include <Formats/FormatFactory.h>
+#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
#include <IO/ConnectionTimeoutsContext.h>
@@ -47,8 +48,8 @@ namespace
: name(name_)
{
read_buf = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, callback, timeouts);
-reader
-    = FormatFactory::instance().getInput(IXDBCBridgeHelper::DEFAULT_FORMAT, *read_buf, sample_block, context, max_block_size);
+auto format = FormatFactory::instance().getInput(IXDBCBridgeHelper::DEFAULT_FORMAT, *read_buf, sample_block, context, max_block_size);
+reader = std::make_shared<InputStreamFromInputFormat>(format);
}
Block getHeader() const override { return reader->getHeader(); }
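
Most of the call sites touched later in this commit repeat the same two-step change shown here: FormatFactory::getInput() now hands back an InputFormatPtr (a processor), and code that still speaks the old IBlockInputStream interface wraps it in InputStreamFromInputFormat. A minimal sketch of that pattern, with format_name, read_buffer, sample_block and consume() as placeholder names rather than identifiers from any particular file:

    // The recurring call-site pattern in this commit (placeholder names).
    auto format = FormatFactory::instance().getInput(
        format_name, read_buffer, sample_block, context, max_block_size);
    BlockInputStreamPtr reader = std::make_shared<InputStreamFromInputFormat>(format);

    reader->readPrefix();
    while (Block block = reader->read())
        consume(block);   // placeholder for whatever the storage does with each block
    reader->readSuffix();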

View File

@@ -6,15 +6,17 @@
#include <Core/Settings.h>
#include <DataStreams/MaterializingBlockOutputStream.h>
#include <DataStreams/ParallelParsingBlockInputStream.h>
+#include <DataStreams/SquashingBlockOutputStream.h>
+#include <DataStreams/NativeBlockInputStream.h>
#include <Formats/FormatSettings.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Formats/OutputStreamToOutputFormat.h>
-#include <DataStreams/NativeBlockInputStream.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
-#include <Processors/Formats/Impl/PostgreSQLOutputFormat.h>
+#include <Processors/Formats/Impl/NativeFormat.cpp>
+#include <Processors/Formats/Impl/ParallelParsingBlockInputFormat.h>
#include <Poco/URI.h>
#if !defined(ARCADIA_BUILD)
@@ -131,7 +133,7 @@ FormatSettings getFormatSettings<Settings>(const Context & context,
const Settings & settings);
-BlockInputStreamPtr FormatFactory::getInput(
+InputFormatPtr FormatFactory::getInput(
const String & name,
ReadBuffer & buf,
const Block & sample,
@@ -140,19 +142,23 @@ BlockInputStreamPtr FormatFactory::getInput(
const std::optional<FormatSettings> & _format_settings) const
{
if (name == "Native")
-return std::make_shared<NativeBlockInputStream>(buf, sample, 0);
+return std::make_shared<NativeInputFormatFromNativeBlockInputStream>(sample, buf);
auto format_settings = _format_settings
? *_format_settings : getFormatSettings(context);
if (!getCreators(name).input_processor_creator)
{
-const auto & input_getter = getCreators(name).input_creator;
-if (!input_getter)
-throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
-return input_getter(buf, sample, max_block_size, {}, format_settings);
+// const auto & input_getter = getCreators(name).input_creator;
+// if (!input_getter)
+//     throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
+//
+// const Settings & settings = context.getSettingsRef();
+// FormatSettings format_settings = getInputFormatSetting(settings, context);
+//
+// return input_getter(buf, sample, max_block_size, callback ? callback : ReadCallback(), format_settings);
+throw;
}
const Settings & settings = context.getSettingsRef();
@@ -188,19 +194,32 @@ BlockInputStreamPtr FormatFactory::getInput(
row_input_format_params.max_execution_time = settings.max_execution_time;
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
-auto input_creator_params =
-    ParallelParsingBlockInputStream::InputCreatorParams{sample,
-        row_input_format_params, format_settings};
-ParallelParsingBlockInputStream::Params params{buf, input_getter,
-    input_creator_params, file_segmentation_engine,
+auto parser_creator = std::bind(
+    input_getter.target<InputProcessorCreatorFunc>(),
+    std::placeholders::_1, sample, row_input_format_params, format_settings);
+// auto anime = parser_creator(buf)
+auto boruto = input_getter(buf, sample, row_input_format_params, format_settings);
+// auto naruto = input_getter.target<InputProcessorCreatorFunc>()(buf, sample, row_input_format_params, format_settings);
+auto naruto =
+    [sample, row_input_format_params, format_settings, input_getter]
+    (ReadBuffer & input)
+    {return input_getter(input, sample, row_input_format_params, format_settings);};
+auto aaa = naruto(buf);
+ParallelParsingBlockInputFormat::Params params{buf, sample,
+    naruto, file_segmentation_engine,
settings.max_threads,
settings.min_chunk_bytes_for_parallel_parsing};
-return std::make_shared<ParallelParsingBlockInputStream>(params);
+return std::make_shared<ParallelParsingBlockInputFormat>(params);
}
-auto format = getInputFormat(name, buf, sample, context, max_block_size,
-    format_settings);
-return std::make_shared<InputStreamFromInputFormat>(std::move(format));
+auto format = getInputFormat(name, buf, sample, context, max_block_size, std::move(callback));
+return format;
}
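
The branch above still carries work-in-progress probes (the commented-out std::bind attempt and the anime/boruto/naruto/aaa locals). Read past those, the surviving logic appears to reduce to the lambda that actually ends up in Params; a sketch of that reading, using only names already present in this hunk:

    // Assumed intent of the parallel-parsing branch once the debug probes are
    // dropped: a creator lambda that builds a fresh input format over each raw
    // segment, handed to ParallelParsingBlockInputFormat via Params.
    auto parser_creator = [sample, row_input_format_params, format_settings, input_getter]
        (ReadBuffer & segment) -> InputFormatPtr
    {
        return input_getter(segment, sample, row_input_format_params, format_settings);
    };

    ParallelParsingBlockInputFormat::Params params{
        buf, sample, parser_creator, file_segmentation_engine,
        settings.max_threads, settings.min_chunk_bytes_for_parallel_parsing};
    return std::make_shared<ParallelParsingBlockInputFormat>(params);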

View File

@@ -79,11 +79,13 @@ private:
WriteCallback callback,
const FormatSettings & settings)>;
-using InputProcessorCreator = std::function<InputFormatPtr(
+using InputProcessorCreatorFunc = InputFormatPtr(
ReadBuffer & buf,
const Block & header,
const RowInputFormatParams & params,
-const FormatSettings & settings)>;
+const FormatSettings & settings);
+using InputProcessorCreator = std::function<InputProcessorCreatorFunc>;
using OutputProcessorCreator = std::function<OutputFormatPtr(
WriteBuffer & buf,

@@ -105,7 +107,7 @@ private:
public:
static FormatFactory & instance();
-BlockInputStreamPtr getInput(
+InputFormatPtr getInput(
const String & name,
ReadBuffer & buf,
const Block & sample,
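
The header change splits the old alias in two: the bare function type (InputProcessorCreatorFunc) and the std::function built from it (InputProcessorCreator), so the same signature can also be named on its own wherever a plain function type is wanted. A small, self-contained illustration of that pattern with placeholder types, assuming nothing beyond standard C++:

    #include <functional>

    struct Widget {};

    // Name the function type once...
    using WidgetCreatorFunc = Widget(int size, bool flag);
    // ...then build the type-erased wrapper from that same name.
    using WidgetCreator = std::function<WidgetCreatorFunc>;

    Widget makeWidget(int /*size*/, bool /*flag*/) { return Widget{}; }

    int main()
    {
        WidgetCreator creator = makeWidget;   // any callable with this signature fits
        Widget w = creator(42, true);
        (void)w;
    }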

View File

@@ -16,6 +16,7 @@
#include <Compression/ICompressionCodec.h>
#include <Core/BackgroundSchedulePool.h>
#include <Formats/FormatFactory.h>
+#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>

@@ -2072,7 +2073,7 @@ void Context::checkPartitionCanBeDropped(const String & database, const String &
BlockInputStreamPtr Context::getInputFormat(const String & name, ReadBuffer & buf, const Block & sample, UInt64 max_block_size) const
{
-return FormatFactory::instance().getInput(name, buf, sample, *this, max_block_size);
+return std::make_shared<InputStreamFromInputFormat>(FormatFactory::instance().getInput(name, buf, sample, *this, max_block_size));
}
BlockOutputStreamPtr Context::getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample) const

View File

@@ -464,6 +464,9 @@ void PipelineExecutor::finalizeExecution()
if (!all_processors_finished)
throw Exception("Pipeline stuck. Current state:\n" + dumpPipeline(), ErrorCodes::LOGICAL_ERROR);
+WriteBufferFromOStream out(std::cout);
+printPipeline(processors, out);
}
void PipelineExecutor::wakeUpExecutor(size_t thread_num)

View File

@@ -0,0 +1,202 @@
#include <Processors/Formats/Impl/ParallelParsingBlockInputFormat.h>
namespace DB
{
void ParallelParsingBlockInputFormat::segmentatorThreadFunction()
{
setThreadName("Segmentator");
try
{
while (!parsing_finished)
{
const auto current_unit_number = segmentator_ticket_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
{
std::unique_lock<std::mutex> lock(mutex);
segmentator_condvar.wait(lock,
[&]{ return unit.status == READY_TO_INSERT || parsing_finished; });
}
if (parsing_finished)
{
break;
}
assert(unit.status == READY_TO_INSERT);
// Segmenting the original input.
unit.segment.resize(0);
const bool have_more_data = file_segmentation_engine(in, unit.segment, min_chunk_bytes);
unit.is_last = !have_more_data;
unit.status = READY_TO_PARSE;
scheduleParserThreadForUnitWithNumber(segmentator_ticket_number);
++segmentator_ticket_number;
if (!have_more_data)
{
break;
}
}
}
catch (...)
{
onBackgroundException();
}
}
void ParallelParsingBlockInputFormat::parserThreadFunction(size_t current_ticket_number)
{
try
{
setThreadName("ChunkParser");
const auto current_unit_number = current_ticket_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
/*
* This is kind of suspicious -- the input_process_creator contract with
* respect to multithreaded use is not clear, but we hope that it is
* just a 'normal' factory class that doesn't have any state, and so we
* can use it from multiple threads simultaneously.
*/
ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);
InputFormatPtr input_format = internal_parser_creator(read_buffer);
InternalParser parser(input_format);
unit.chunk_ext.chunk.clear();
unit.chunk_ext.block_missing_values.clear();
// We don't know how many blocks there will be, so we have to read them all
// until an empty block occurs.
Chunk chunk;
while (!parsing_finished && (chunk = parser.getChunk()) != Chunk())
{
unit.chunk_ext.chunk.emplace_back(std::move(chunk));
unit.chunk_ext.block_missing_values.emplace_back(parser.getMissingValues());
}
// We suppose we will get at least some blocks for a non-empty buffer,
// except at the end of file. Also see a matching assert in readImpl().
assert(unit.is_last || !unit.chunk_ext.chunk.empty());
std::lock_guard<std::mutex> lock(mutex);
unit.status = READY_TO_READ;
reader_condvar.notify_all();
}
catch (...)
{
onBackgroundException();
}
}
void ParallelParsingBlockInputFormat::onBackgroundException()
{
tryLogCurrentException(__PRETTY_FUNCTION__);
std::unique_lock<std::mutex> lock(mutex);
if (!background_exception)
{
background_exception = std::current_exception();
}
parsing_finished = true;
reader_condvar.notify_all();
segmentator_condvar.notify_all();
}
Chunk ParallelParsingBlockInputFormat::generate()
{
if (isCancelled() || parsing_finished)
{
/**
* Check for background exception and rethrow it before we return.
*/
std::unique_lock<std::mutex> lock(mutex);
if (background_exception)
{
lock.unlock();
onCancel();
std::rethrow_exception(background_exception);
}
return {};
}
const auto current_unit_number = reader_ticket_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
if (!next_block_in_current_unit.has_value())
{
// We have read out all the Blocks from the previous Processing Unit,
// wait for the current one to become ready.
std::unique_lock<std::mutex> lock(mutex);
reader_condvar.wait(lock, [&](){ return unit.status == READY_TO_READ || parsing_finished; });
if (parsing_finished)
{
/**
* Check for background exception and rethrow it before we return.
*/
if (background_exception)
{
lock.unlock();
cancel();
std::rethrow_exception(background_exception);
}
return {};
}
assert(unit.status == READY_TO_READ);
next_block_in_current_unit = 0;
}
if (unit.chunk_ext.chunk.empty())
{
/*
* Can we get zero blocks for an entire segment, when the format parser
* skips its entire content and does not create any blocks? Probably not,
* but if we ever do, we should add a loop around the above if, to skip
* these. Also see a matching assert in the parser thread.
*/
assert(unit.is_last);
parsing_finished = true;
return {};
}
assert(next_block_in_current_unit.value() < unit.chunk_ext.chunk.size());
Chunk res = std::move(unit.chunk_ext.chunk.at(*next_block_in_current_unit));
last_block_missing_values = std::move(unit.chunk_ext.block_missing_values[*next_block_in_current_unit]);
next_block_in_current_unit.value() += 1;
if (*next_block_in_current_unit == unit.chunk_ext.chunk.size())
{
// Finished reading this Processing Unit, move to the next one.
next_block_in_current_unit.reset();
++reader_ticket_number;
if (unit.is_last)
{
// If it was the last unit, we're finished.
parsing_finished = true;
}
else
{
// Pass the unit back to the segmentator.
std::unique_lock<std::mutex> lock(mutex);
unit.status = READY_TO_INSERT;
segmentator_condvar.notify_all();
}
}
return res;
}
}

View File

@@ -0,0 +1,259 @@
#pragma once
#include <Processors/Formats/IInputFormat.h>
#include <DataStreams/IBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBuffer.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Interpreters/Context.h>
namespace DB
{
class Context;
/**
* ORDER-PRESERVING parallel parsing of data formats.
* It splits the original data into chunks. Then each chunk is parsed by a different thread.
* The number of chunks equals the number of parser threads.
* The size of a chunk is equal to the min_chunk_bytes_for_parallel_parsing setting.
*
* This stream has three kinds of threads: one segmentator, multiple parsers,
* and one reader thread -- that is, the one from which readImpl() is called.
* They operate one after another on parts of data called "processing units".
* One unit consists of a buffer with raw data from the file, filled by the segmentator
* thread. This raw data is then parsed by a parser thread to form a number of
* Blocks. These Blocks are returned to the parent stream from readImpl().
* After being read out, a processing unit is reused, to save on allocating
* memory for the raw buffer. The processing units are organized into a circular
* array to facilitate reuse and to apply backpressure on the segmentator thread
* -- after it runs out of processing units, it has to wait for the reader to
* read out the previous blocks.
* The outline of what the threads do is as follows:
* segmentator thread:
* 1) wait for the next processing unit to become empty
* 2) fill it with a part of input file
* 3) start a parser thread
* 4) repeat until eof
* parser thread:
* 1) parse the given raw buffer without any synchronization
* 2) signal that the given unit is ready to read
* 3) finish
* readImpl():
* 1) wait for the next processing unit to become ready to read
* 2) take the blocks from the processing unit to return them to the caller
* 3) signal that the processing unit is empty
* 4) repeat until it encounters unit that is marked as "past_the_end"
* All threads must also check for cancel/eof/exception flags.
*/
class ParallelParsingBlockInputFormat : public IInputFormat
{
public:
/* Used to recreate parser on every new data piece. */
using InternalParserCreator = std::function<InputFormatPtr(ReadBuffer & buf)>;
struct Params
{
ReadBuffer & in;
Block header;
InternalParserCreator internal_parser_creator;
FormatFactory::FileSegmentationEngine file_segmentation_engine;
size_t max_threads;
size_t min_chunk_bytes;
};
explicit ParallelParsingBlockInputFormat(Params params)
: IInputFormat(std::move(params.header), params.in)
, internal_parser_creator(params.internal_parser_creator)
, file_segmentation_engine(params.file_segmentation_engine)
, min_chunk_bytes(params.min_chunk_bytes)
// Subtract one thread that we use for segmentation and one for
// reading. After that, must have at least two threads left for
// parsing. See the assertion below.
, pool(params.max_threads)
{
// One unit for each thread, including segmentator and reader, plus a
// couple more units so that the segmentation thread doesn't spuriously
// bump into reader thread on wraparound.
processing_units.resize(params.max_threads + 2);
segmentator_thread = ThreadFromGlobalPool([this] { segmentatorThreadFunction(); });
}
void resetParser() override final
{
throw Exception("resetParser() is not allowed for " + getName(), ErrorCodes::LOGICAL_ERROR);
}
const BlockMissingValues & getMissingValues() const override final
{
return last_block_missing_values;
}
String getName() const override final { return "ParallelParsingBlockInputFormat"; }
protected:
Chunk generate() override final;
void onCancel() override final
{
/*
* The format parsers themselves are not being cancelled here, so we'll
* have to wait until they process the current block. Given that the
* chunk size is on the order of megabytes, this shouldn't be too long.
* We can't call IInputFormat->cancel here, because the parser object is
* local to the parser thread, and we don't want to introduce any
* synchronization between parser threads and the other threads to get
* better performance. An ideal solution would be to add a callback to
* IInputFormat that checks whether it was cancelled.
*/
finishAndWait();
}
private:
class InternalParser
{
public:
explicit InternalParser(const InputFormatPtr & input_format_)
: input_format(input_format_)
, port(input_format->getPort().getHeader(), input_format.get())
{
connect(input_format->getPort(), port);
port.setNeeded();
}
Chunk getChunk()
{
while (true)
{
IProcessor::Status status = input_format->prepare();
switch (status)
{
case IProcessor::Status::Ready:
input_format->work();
break;
case IProcessor::Status::Finished:
return {};
case IProcessor::Status::PortFull:
return port.pull();
case IProcessor::Status::NeedData: break;
case IProcessor::Status::Async: break;
case IProcessor::Status::Wait: break;
case IProcessor::Status::ExpandPipeline:
throw Exception("One of the parsers returned status " + IProcessor::statusToName(status) +
" during parallel parsing", ErrorCodes::LOGICAL_ERROR);
}
}
}
const BlockMissingValues & getMissingValues() const { return input_format->getMissingValues(); }
private:
const InputFormatPtr & input_format;
InputPort port;
};
const InternalParserCreator internal_parser_creator;
// Function to segment the file. The "parsers" will then parse those segments.
FormatFactory::FileSegmentationEngine file_segmentation_engine;
const size_t min_chunk_bytes;
BlockMissingValues last_block_missing_values;
// Non-atomic because it is only used in one thread.
std::optional<size_t> next_block_in_current_unit;
size_t segmentator_ticket_number{0};
size_t reader_ticket_number{0};
std::mutex mutex;
std::condition_variable reader_condvar;
std::condition_variable segmentator_condvar;
std::atomic<bool> parsing_finished{false};
// There are multiple "parsers", that's why we use thread pool.
ThreadPool pool;
// Reads and segments the file
ThreadFromGlobalPool segmentator_thread;
enum ProcessingUnitStatus
{
READY_TO_INSERT,
READY_TO_PARSE,
READY_TO_READ
};
struct ChunkExt
{
std::vector<Chunk> chunk;
std::vector<BlockMissingValues> block_missing_values;
};
struct ProcessingUnit
{
explicit ProcessingUnit()
: status(ProcessingUnitStatus::READY_TO_INSERT)
{
}
ChunkExt chunk_ext;
Memory<> segment;
std::atomic<ProcessingUnitStatus> status;
bool is_last{false};
};
std::exception_ptr background_exception = nullptr;
// We use deque instead of vector, because it does not require a move
// constructor, which is absent for atomics that are inside ProcessingUnit.
std::deque<ProcessingUnit> processing_units;
void scheduleParserThreadForUnitWithNumber(size_t ticket_number)
{
pool.scheduleOrThrowOnError([this, ticket_number] { parserThreadFunction(ticket_number); });
}
void finishAndWait()
{
parsing_finished = true;
{
std::unique_lock<std::mutex> lock(mutex);
segmentator_condvar.notify_all();
reader_condvar.notify_all();
}
if (segmentator_thread.joinable())
segmentator_thread.join();
try
{
pool.wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void segmentatorThreadFunction();
void parserThreadFunction(size_t current_ticket_number);
// Save/log a background exception, set termination flag, wake up all
// threads. This function is used by the segmentator and parser threads.
// readImpl() is called from the main thread, so the exception handling
// is different.
void onBackgroundException();
};
}
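
To connect this header back to the FormatFactory.cpp hunk above: a rough usage sketch, not part of the commit, in which make_parser, segmentation_engine, read_buffer, sample_block and the numeric settings are placeholders; the draining loop through InputStreamFromInputFormat mirrors the other call sites in this diff:

    // Hypothetical caller: one fresh parser per raw segment, blocks returned in
    // the original input order.
    ParallelParsingBlockInputFormat::InternalParserCreator make_parser =
        [&](ReadBuffer & segment) { return createFormatOverSegment(segment, sample_block); };  // placeholder factory

    ParallelParsingBlockInputFormat::Params params{
        read_buffer,            // ReadBuffer over the raw input
        sample_block,           // header Block describing the columns
        make_parser,            // InternalParserCreator
        segmentation_engine,    // FileSegmentationEngine registered for the format (placeholder)
        /* max_threads = */ 4,
        /* min_chunk_bytes = */ 1024 * 1024};

    auto format = std::make_shared<ParallelParsingBlockInputFormat>(std::move(params));
    BlockInputStreamPtr stream = std::make_shared<InputStreamFromInputFormat>(format);

    stream->readPrefix();
    while (Block block = stream->read())
        ;   // consume the block; order matches the input
    stream->readSuffix();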

View File

@@ -7,6 +7,7 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/NestedUtils.h>
#include <Formats/FormatFactory.h>
+#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <IO/ConcatReadBuffer.h>

@@ -2903,7 +2904,8 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
ReadBufferFromMemory right_paren_buf(")", 1);
ConcatReadBuffer buf({&left_paren_buf, &fields_buf, &right_paren_buf});
-auto input_stream = FormatFactory::instance().getInput("Values", buf, metadata_snapshot->getPartitionKey().sample_block, context, context.getSettingsRef().max_block_size);
+auto format = FormatFactory::instance().getInput("Values", buf, partition_key_sample, context, context.getSettingsRef().max_block_size);
+auto input_stream = std::make_shared<InputStreamFromInputFormat>(format);
auto block = input_stream->read();
if (!block || !block.rows())

View File

@@ -25,7 +25,6 @@
#include <fcntl.h>
#include <unistd.h>
-#include <sys/types.h>
#include <Poco/Path.h>
#include <Poco/File.h>

@@ -34,6 +33,7 @@
#include <filesystem>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Processors/Sources/SourceWithProgress.h>
+#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Pipe.h>
namespace fs = std::filesystem;
@@ -291,6 +291,7 @@ public:
Chunk generate() override
{
+std::cout << StackTrace().toString() << std::endl;
while (!finished_generate)
{
/// Open file lazily on first read. This is needed to avoid too many open files from different streams.

@@ -332,8 +333,11 @@ public:
*read_buf, metadata_snapshot->getSampleBlock(), context,
max_block_size, storage->format_settings);
-if (columns_description.hasDefaults())
-reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns_description, context);
+reader = std::make_shared<InputStreamFromInputFormat>(format);
+if (!column_defaults.empty())
+reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
reader->readPrefix();
}

View File

@@ -13,6 +13,7 @@
#include <IO/WriteHelpers.h>
#include <IO/HDFSCommon.h>
#include <Formats/FormatFactory.h>
+#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OwningBlockInputStream.h>

@@ -122,7 +123,8 @@ public:
auto compression = chooseCompressionMethod(path, compression_method);
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(current_path), compression);
-auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
+auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
+auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format);
reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
reader->readPrefix();

View File

@@ -17,6 +17,7 @@
#include <IO/WriteHelpers.h>
#include <Formats/FormatFactory.h>
+#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>

@@ -33,6 +34,7 @@
#include <re2/re2.h>
#include <Processors/Sources/SourceWithProgress.h>
+#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Pipe.h>

@@ -82,7 +84,8 @@ namespace
, file_path(bucket + "/" + key)
{
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromS3>(client, bucket, key), compression_method);
-reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
+auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
+reader = std::make_shared<InputStreamFromInputFormat>(input_format);
if (columns.hasDefaults())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns, context);

View File

@@ -14,6 +14,7 @@
#include <IO/ConnectionTimeoutsContext.h>
#include <Formats/FormatFactory.h>
+#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>

@@ -105,10 +106,10 @@ namespace
context.getRemoteHostFilter()),
compression_method);
-reader = FormatFactory::instance().getInput(format, *read_buf,
-    sample_block, context, max_block_size, format_settings);
-reader = std::make_shared<AddingDefaultsBlockInputStream>(reader,
-    columns, context);
+auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
+reader = std::make_shared<InputStreamFromInputFormat>(input_format);
+reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
}
String getName() const override