delete and fix strange code

This commit is contained in:
nikitamikhaylov 2020-09-24 21:49:18 +03:00
parent 161c921dba
commit 57705f5b73
8 changed files with 14 additions and 42 deletions

View File

@ -1,5 +1,5 @@
if (USE_CLANG_TIDY) if (USE_CLANG_TIDY)
set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH} -fix") set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH}")
endif () endif ()
add_subdirectory (common) add_subdirectory (common)

View File

@ -1,5 +1,5 @@
if (USE_CLANG_TIDY) if (USE_CLANG_TIDY)
set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH} -fix") set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH}")
endif () endif ()
# The `clickhouse` binary is a multi purpose tool that contains multiple execution modes (client, server, etc.), # The `clickhouse` binary is a multi purpose tool that contains multiple execution modes (client, server, etc.),

View File

@ -3,7 +3,7 @@ if (USE_INCLUDE_WHAT_YOU_USE)
endif () endif ()
if (USE_CLANG_TIDY) if (USE_CLANG_TIDY)
set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH} -fix") set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH}")
endif () endif ()
if(COMPILER_PIPE) if(COMPILER_PIPE)

View File

@ -121,7 +121,7 @@ class IColumn;
\ \
M(Bool, input_format_parallel_parsing, true, "Enable parallel parsing for some data formats.", 0) \ M(Bool, input_format_parallel_parsing, true, "Enable parallel parsing for some data formats.", 0) \
M(UInt64, min_chunk_bytes_for_parallel_parsing, (10 * 1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.", 0) \ M(UInt64, min_chunk_bytes_for_parallel_parsing, (10 * 1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.", 0) \
M(Bool, output_format_parallel_formatting, false, "Enable parallel formatting for all data formats.", 0) \ M(Bool, output_format_parallel_formatting, false, "Enable parallel formatting for all data formats.", 1) \
\ \
M(UInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.", 0) \ M(UInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.", 0) \
M(UInt64, merge_tree_min_bytes_for_concurrent_read, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized.", 0) \ M(UInt64, merge_tree_min_bytes_for_concurrent_read, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized.", 0) \

View File

@ -149,17 +149,7 @@ InputFormatPtr FormatFactory::getInput(
if (!getCreators(name).input_processor_creator) if (!getCreators(name).input_processor_creator)
{ {
throw Exception("Format " + name + " is not suitable for input (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT;;
// const auto & input_getter = getCreators(name).input_creator;
// if (!input_getter)
// throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
//
// const Settings & settings = context.getSettingsRef();
// FormatSettings format_settings = getInputFormatSetting(settings, context);
//
// return input_getter(buf, sample, max_block_size, callback ? callback : ReadCallback(), format_settings);
throw;
} }
const Settings & settings = context.getSettingsRef(); const Settings & settings = context.getSettingsRef();
@ -185,8 +175,6 @@ InputFormatPtr FormatFactory::getInput(
if (parallel_parsing) if (parallel_parsing)
{ {
const auto & input_getter = getCreators(name).input_processor_creator; const auto & input_getter = getCreators(name).input_processor_creator;
if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
RowInputFormatParams row_input_format_params; RowInputFormatParams row_input_format_params;
row_input_format_params.max_block_size = max_block_size; row_input_format_params.max_block_size = max_block_size;
@ -221,16 +209,7 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
if (!getCreators(name).output_processor_creator) if (!getCreators(name).output_processor_creator)
{ {
const auto & output_getter = getCreators(name).output_creator; throw Exception("Format " + name + " is not suitable for output (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
if (!output_getter)
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
/** Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return std::make_shared<MaterializingBlockOutputStream>(
output_getter(buf, sample, std::move(callback), format_settings),
sample);
} }
@ -240,8 +219,6 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
if (parallel_formatting) if (parallel_formatting)
{ {
const auto & output_getter = getCreators(name).output_processor_creator; const auto & output_getter = getCreators(name).output_processor_creator;
if (!output_getter)
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
FormatSettings format_settings = getOutputFormatSetting(settings, context); FormatSettings format_settings = getOutputFormatSetting(settings, context);
@ -256,7 +233,7 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name,
// if (format_settings.enable_streaming) // if (format_settings.enable_streaming)
// format->setAutoFlush(); // format->setAutoFlush();
ParallelFormattingOutputFormat::Params params{buf, sample, formatter_creator}; ParallelFormattingOutputFormat::Params params{buf, sample, formatter_creator, settings.max_threads};
auto format = std::make_shared<ParallelFormattingOutputFormat>(params); auto format = std::make_shared<ParallelFormattingOutputFormat>(params);
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample); return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
} }

View File

@ -39,15 +39,12 @@ protected:
RowsBeforeLimitCounterPtr rows_before_limit_counter; RowsBeforeLimitCounterPtr rows_before_limit_counter;
friend class ParallelFormattingOutputFormat;
virtual void consume(Chunk) = 0; virtual void consume(Chunk) = 0;
virtual void consumeTotals(Chunk) {} virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {} virtual void consumeExtremes(Chunk) {}
virtual void finalize() {} virtual void finalize() {}
public: public:
IOutputFormat(const Block & header_, WriteBuffer & out_); IOutputFormat(const Block & header_, WriteBuffer & out_);
Status prepare() override; Status prepare() override;

View File

@ -23,12 +23,12 @@ public:
void prepareReadBuffer(ReadBuffer & buffer) override void prepareReadBuffer(ReadBuffer & buffer) override
{ {
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it. /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(in); skipBOMIfExists(buffer);
skipWhitespaceIfAny(in); skipWhitespaceIfAny(buffer);
if (!in.eof() && *in.position() == '[') if (!buffer.eof() && *buffer.position() == '[')
{ {
++in.position(); ++buffer.position();
data_in_square_brackets = true; data_in_square_brackets = true;
} }
} }

View File

@ -16,9 +16,6 @@
namespace DB namespace DB
{ {
const size_t min_chunk_bytes_for_parallel_formatting = 1024;
const size_t max_threads_for_parallel_formatting = 6;
class ParallelFormattingOutputFormat : public IOutputFormat class ParallelFormattingOutputFormat : public IOutputFormat
{ {
public: public:
@ -30,15 +27,16 @@ public:
WriteBuffer & out; WriteBuffer & out;
const Block & header; const Block & header;
InternalFormatterCreator internal_formatter_creator; InternalFormatterCreator internal_formatter_creator;
const size_t max_thread_for_parallel_formatting;
}; };
explicit ParallelFormattingOutputFormat(Params params) explicit ParallelFormattingOutputFormat(Params params)
: IOutputFormat(params.header, params.out) : IOutputFormat(params.header, params.out)
, internal_formatter_creator(params.internal_formatter_creator) , internal_formatter_creator(params.internal_formatter_creator)
, pool(max_threads_for_parallel_formatting) , pool(params.max_threads_for_parallel_formatting)
{ {
processing_units.resize(max_threads_for_parallel_formatting + 2); processing_units.resize(params.max_threads_for_parallel_formatting + 2);
collector_thread = ThreadFromGlobalPool([&] { collectorThreadFunction(); }); collector_thread = ThreadFromGlobalPool([&] { collectorThreadFunction(); });
} }