delete BlockInputStreamFromRowInputStream

This commit is contained in:
Alexander Tokmakov 2019-08-27 21:29:56 +03:00
parent cf3a8b993b
commit 93c672aa0b
7 changed files with 8 additions and 307 deletions

1
.gitignore vendored
View File

@ -90,7 +90,6 @@ dbms/src/Core/tests/field
dbms/src/Core/tests/rvo_test
dbms/src/Core/tests/string_pool
dbms/src/DataStreams/tests/aggregating_stream
dbms/src/DataStreams/tests/block_row_transforms
dbms/src/DataStreams/tests/block_tab_separated_streams
dbms/src/DataStreams/tests/collapsing_sorted_stream
dbms/src/DataStreams/tests/expression_stream

View File

@ -1,178 +0,0 @@
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <common/logger_useful.h>
namespace DB
{
/// Error codes referenced in this file. They are only declared here (extern);
/// their definitions are not part of this translation unit.
/// All of them except INCORRECT_NUMBER_OF_COLUMNS are listed in isParseError();
/// INCORRECT_NUMBER_OF_COLUMNS is thrown by readImpl() when a column that was
/// reported as not read turns out to be empty.
namespace ErrorCodes
{
extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
extern const int CANNOT_PARSE_QUOTED_STRING;
extern const int CANNOT_PARSE_DATE;
extern const int CANNOT_PARSE_DATETIME;
extern const int CANNOT_READ_ARRAY_FROM_TEXT;
extern const int CANNOT_PARSE_NUMBER;
extern const int CANNOT_PARSE_UUID;
extern const int TOO_LARGE_STRING_SIZE;
extern const int CANNOT_READ_ALL_DATA;
extern const int INCORRECT_DATA;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
}
/** Builds a block-oriented stream on top of a row-oriented one.
  *
  * row_input_         - row stream that readImpl() pulls rows from.
  * sample_            - block whose structure is cloned to create the columns being filled.
  * max_block_size_    - upper bound on the number of read attempts per readImpl() call.
  * rows_portion_size_ - number of rows between time limit / cancellation checks; 0 disables the checks.
  * callback           - if set, invoked after every successfully read row.
  * settings           - supplies input_allow_errors_num and input_allow_errors_ratio.
  */
BlockInputStreamFromRowInputStream::BlockInputStreamFromRowInputStream(
    const RowInputStreamPtr & row_input_,
    const Block & sample_,
    UInt64 max_block_size_,
    UInt64 rows_portion_size_,
    FormatFactory::ReadCallback callback,
    const FormatSettings & settings)
    : row_input(row_input_)
    , sample(sample_)
    , max_block_size(max_block_size_)
    , rows_portion_size(rows_portion_size_)
    /// `callback` is a by-value sink parameter: move it into the member instead of copying it again.
    , read_virtual_columns_callback(std::move(callback))
    , allow_errors_num(settings.input_allow_errors_num)
    , allow_errors_ratio(settings.input_allow_errors_ratio)
{
}
/// Returns true if `code` is one of the error codes that readImpl() treats as
/// a failure to parse the input data (as opposed to any other kind of error).
static bool isParseError(int code)
{
    const int parse_error_codes[] =
    {
        ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED,
        ErrorCodes::CANNOT_PARSE_QUOTED_STRING,
        ErrorCodes::CANNOT_PARSE_DATE,
        ErrorCodes::CANNOT_PARSE_DATETIME,
        ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT,
        ErrorCodes::CANNOT_PARSE_NUMBER,
        ErrorCodes::CANNOT_PARSE_UUID,
        ErrorCodes::TOO_LARGE_STRING_SIZE,
        ErrorCodes::CANNOT_READ_ALL_DATA,
        ErrorCodes::INCORRECT_DATA,
    };

    for (const int candidate : parse_error_codes)
        if (code == candidate)
            return true;

    return false;
}
/** Reads the next block from the row-oriented input.
  *
  * Rows are appended to fresh columns cloned from `sample` until max_block_size
  * read attempts were made, the row input reports that there is nothing more to read,
  * or the periodic time limit / cancellation check asks to stop.
  *
  * Parse errors (see isParseError) may be skipped when allow_errors_num or
  * allow_errors_ratio is non-zero; otherwise they are rethrown. A parse error
  * that leaves this function is annotated with the row number and, when available,
  * with the diagnostic info of the row input.
  *
  * Returns an empty Block if there are no columns or the first column got no rows.
  */
Block BlockInputStreamFromRowInputStream::readImpl()
{
    MutableColumns columns = sample.cloneEmptyColumns();
    const size_t num_columns = sample.columns();
    block_missing_values.clear();

    try
    {
        size_t attempts = 0;
        size_t rows_since_check = 0;

        while (attempts < max_block_size)
        {
            /// Every rows_portion_size rows, check whether we are still allowed to continue.
            if (rows_portion_size && rows_since_check == rows_portion_size)
            {
                rows_since_check = 0;

                const bool keep_going = checkTimeLimit() && !isCancelled();
                if (!keep_going)
                    break;
            }

            try
            {
                /// Counted before the read, so it is already up to date if the read throws.
                ++total_rows;

                RowReadExtension extension;
                if (!row_input->read(columns, extension))
                    break;

                if (read_virtual_columns_callback)
                    read_virtual_columns_callback();

                /// Remember which values of the row just appended were not actually read.
                for (size_t idx = 0; idx < extension.read_columns.size(); ++idx)
                {
                    if (extension.read_columns[idx])
                        continue;

                    size_t filled = columns[idx]->size();
                    if (filled == 0)
                        throw Exception("Unexpected empty column", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);

                    block_missing_values.setBit(idx, filled - 1);
                }
            }
            catch (Exception & e)
            {
                /// Decide whether this error may be skipped.
                const bool skipping_disabled = allow_errors_num == 0 && allow_errors_ratio == 0;
                if (!isParseError(e.code()) || skipping_disabled)
                    throw;

                ++num_errors;
                Float32 current_error_ratio = static_cast<Float32>(num_errors) / total_rows;

                /// Give up only when both the absolute and the relative limits are exceeded.
                const bool limits_exceeded = num_errors > allow_errors_num
                    && current_error_ratio > allow_errors_ratio;

                if (limits_exceeded)
                {
                    e.addMessage("(Already have " + toString(num_errors) + " errors"
                        " out of " + toString(total_rows) + " rows"
                        ", which is " + toString(current_error_ratio) + " of all rows)");
                    throw;
                }

                if (!row_input->allowSyncAfterError())
                {
                    e.addMessage("(Input format doesn't allow to skip errors)");
                    throw;
                }

                row_input->syncAfterError();

                /// The failed row may have been appended to only some of the columns.
                /// Cut every column down to the length of the shortest one.
                size_t shortest = std::numeric_limits<size_t>::max();
                for (size_t idx = 0; idx < num_columns; ++idx)
                {
                    const size_t current = columns[idx]->size();
                    if (current < shortest)
                        shortest = current;
                }

                for (size_t idx = 0; idx < num_columns; ++idx)
                {
                    const size_t excess = columns[idx]->size() - shortest;
                    if (excess > 0)
                        columns[idx]->popBack(excess);
                }
            }

            ++attempts;
            ++rows_since_check;
        }
    }
    catch (Exception & e)
    {
        if (!isParseError(e.code()))
            throw;

        String diagnostic;
        try
        {
            diagnostic = row_input->getDiagnosticInfo();
        }
        catch (...)
        {
            /// Failing to obtain the verbose diagnostic is not fatal; the original error is still reported.
        }

        e.addMessage("(at row " + toString(total_rows) + ")\n" + diagnostic);
        throw;
    }

    const bool nothing_read = columns.empty() || columns[0]->empty();
    if (nothing_read)
        return {};

    return sample.cloneWithColumns(std::move(columns));
}
void BlockInputStreamFromRowInputStream::readSuffix()
{
if (allow_errors_num > 0 || allow_errors_ratio > 0)
{
Logger * log = &Logger::get("BlockInputStreamFromRowInputStream");
LOG_TRACE(log, "Skipped " << num_errors << " rows with errors while reading the input stream");
}
row_input->readSuffix();
}
}

View File

@ -1,62 +0,0 @@
#pragma once
#include <Core/Defines.h>
#include <DataStreams/IBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <Formats/IRowInputStream.h>
namespace DB
{
/** Makes a block-oriented stream on top of a row-oriented stream.
  * It is used to read data from text formats.
  *
  * Also keeps control over parsing errors (they can be skipped up to configured limits)
  * and adds diagnostic information about them to the exception.
  */
class BlockInputStreamFromRowInputStream : public IBlockInputStream
{
public:
/// |sample| is a block with zero rows whose structure describes how to interpret values.
/// |rows_portion_size| is the number of rows to read before pausing to check limits
/// (time limit and cancellation); zero disables these checks.
/// |callback|, if set, is called after each successfully read row.
/// |settings| provides input_allow_errors_num and input_allow_errors_ratio.
BlockInputStreamFromRowInputStream(
const RowInputStreamPtr & row_input_,
const Block & sample_,
UInt64 max_block_size_,
UInt64 rows_portion_size_,
FormatFactory::ReadCallback callback,
const FormatSettings & settings);
/// Forwards to the underlying row stream.
void readPrefix() override { row_input->readPrefix(); }
/// Logs the number of skipped rows (if skipping of errors is enabled) and forwards to the row stream.
void readSuffix() override;
String getName() const override { return "BlockInputStreamFromRowInputStream"; }
/// Gives access to the underlying row stream (returned by non-const reference).
RowInputStreamPtr & getRowInput() { return row_input; }
/// The header is the sample block passed to the constructor.
Block getHeader() const override { return sample; }
/// Positions of values that were not read from the input; cleared at the start of every readImpl() call.
const BlockMissingValues & getMissingValues() const override { return block_missing_values; }
protected:
/// Reads up to max_block_size rows and returns them as one block.
Block readImpl() override;
private:
/// Source of rows.
RowInputStreamPtr row_input;
/// Describes the structure of the produced blocks.
Block sample;
/// Maximum number of read attempts per readImpl() call.
UInt64 max_block_size;
/// Number of rows between limit checks; zero means the checks are not performed.
UInt64 rows_portion_size;
/// Callback used to setup virtual columns after reading each row.
FormatFactory::ReadCallback read_virtual_columns_callback;
BlockMissingValues block_missing_values;
/// Limits for skipping rows with parse errors, taken from
/// FormatSettings::input_allow_errors_num and FormatSettings::input_allow_errors_ratio.
/// If both are zero, parse errors are never skipped.
UInt64 allow_errors_num;
Float32 allow_errors_ratio;
/// Number of read attempts made so far (incremented before each attempt to read a row).
size_t total_rows = 0;
/// Number of parse errors counted so far while skipping is enabled.
size_t num_errors = 0;
};
}

View File

@ -2,6 +2,3 @@ set(SRCS )
add_executable (tab_separated_streams tab_separated_streams.cpp ${SRCS})
target_link_libraries (tab_separated_streams PRIVATE dbms)
add_executable (block_row_transforms block_row_transforms.cpp ${SRCS})
target_link_libraries (block_row_transforms PRIVATE dbms)

View File

@ -1,57 +0,0 @@
#include <string>
#include <iostream>
#include <fstream>
#include <Core/Block.h>
#include <Core/ColumnWithTypeAndName.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Formats/TabSeparatedRowInputStream.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <DataStreams/copyData.h>
#include <Processors/Formats/Impl/TabSeparatedRowOutputFormat.h>
#include <Processors/Formats/OutputStreamToOutputFormat.h>
int main(int, char **)
try
{
using namespace DB;
Block sample;
ColumnWithTypeAndName col1;
col1.name = "col1";
col1.type = std::make_shared<DataTypeUInt64>();
col1.column = col1.type->createColumn();
sample.insert(col1);
ColumnWithTypeAndName col2;
col2.name = "col2";
col2.type = std::make_shared<DataTypeString>();
col2.column = col2.type->createColumn();
sample.insert(col2);
ReadBufferFromFile in_buf("test_in");
WriteBufferFromFile out_buf("test_out");
FormatSettings format_settings;
RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, []{}, format_settings);
BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, []{}, format_settings));
copyData(block_input, *block_output);
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
}

View File

@ -9,12 +9,12 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Formats/TabSeparatedRowInputStream.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <Processors/Formats/Impl/TabSeparatedRowInputFormat.h>
#include <DataStreams/copyData.h>
#include <Processors/Formats/OutputStreamToOutputFormat.h>
#include <Processors/Formats/Impl/TabSeparatedRowOutputFormat.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
using namespace DB;
@ -39,13 +39,15 @@ try
FormatSettings format_settings;
RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, []{}, format_settings);
RowInputFormatParams params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0, 0, []{}};
InputFormatPtr input_format = std::make_shared<TabSeparatedRowInputFormat>(in_buf, sample, false, false, params, format_settings);
BlockInputStreamPtr block_input = std::make_shared<InputStreamFromInputFormat>(std::move(input_format));
BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(
std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, [] {}, format_settings));
copyData(block_input, *block_output);
copyData(*block_input, *block_output);
return 0;
}
catch (...)

View File

@ -174,7 +174,7 @@ Chunk IRowInputFormat::generate()
{
if (params.allow_errors_num > 0 || params.allow_errors_ratio > 0)
{
Logger * log = &Logger::get("BlockInputStreamFromRowInputStream");
Logger * log = &Logger::get("IRowInputFormat");
LOG_TRACE(log, "Skipped " << num_errors << " rows with errors while reading the input stream");
}