2018-05-24 01:02:16 +00:00
|
|
|
#include <Processors/Formats/IRowInputFormat.h>
|
|
|
|
#include <IO/WriteHelpers.h> // toString
|
2019-08-01 14:25:41 +00:00
|
|
|
#include <common/logger_useful.h>
|
2018-05-24 01:02:16 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes referenced in this translation unit. Every code thrown or compared below is declared
/// here explicitly instead of relying on a transitive include; repeating an `extern` declaration is legal.
namespace ErrorCodes
{
    extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
    extern const int CANNOT_PARSE_QUOTED_STRING;
    extern const int CANNOT_PARSE_DATE;
    extern const int CANNOT_PARSE_DATETIME;
    extern const int CANNOT_READ_ARRAY_FROM_TEXT;
    extern const int CANNOT_PARSE_NUMBER;
    extern const int CANNOT_PARSE_UUID;
    extern const int TOO_LARGE_STRING_SIZE;
    extern const int INCORRECT_NUMBER_OF_COLUMNS;
    extern const int TIMEOUT_EXCEEDED;
    extern const int LOGICAL_ERROR;      /// thrown by handleOverflowMode for an unknown mode
    extern const int NOT_IMPLEMENTED;    /// thrown by the default IRowInputFormat::syncAfterError
}
|
|
|
|
|
|
|
|
|
|
|
|
static bool isParseError(int code)
|
|
|
|
{
|
|
|
|
return code == ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED
|
|
|
|
|| code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING
|
|
|
|
|| code == ErrorCodes::CANNOT_PARSE_DATE
|
|
|
|
|| code == ErrorCodes::CANNOT_PARSE_DATETIME
|
|
|
|
|| code == ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT
|
|
|
|
|| code == ErrorCodes::CANNOT_PARSE_NUMBER
|
|
|
|
|| code == ErrorCodes::CANNOT_PARSE_UUID
|
|
|
|
|| code == ErrorCodes::TOO_LARGE_STRING_SIZE;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-08-01 14:25:41 +00:00
|
|
|
/// Applies an overflow policy once a limit has been exceeded.
///
/// @param mode     BREAK -> report "stop reading" by returning false; THROW -> raise an exception.
/// @param message  Text of the exception thrown in THROW mode.
/// @param code     Error code of the exception thrown in THROW mode.
/// @return false in BREAK mode (the function never returns normally in any other mode).
/// @throws Exception(message, code) in THROW mode; a LOGICAL_ERROR exception for any other mode value.
static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
{
    if (mode == OverflowMode::BREAK)
        return false;

    if (mode == OverflowMode::THROW)
        throw Exception(message, code);

    throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Checks the elapsed time of `stopwatch` against params.max_execution_time.
///
/// @return true if reading may continue: either no limit is set (max_execution_time == 0) or the
///         limit has not been exceeded yet. Otherwise returns whatever handleOverflowMode() returns for
///         params.timeout_overflow_mode (false for BREAK; THROW raises TIMEOUT_EXCEEDED).
static bool checkTimeLimit(const IRowInputFormat::Params & params, const Stopwatch & stopwatch)
{
    /// A zero timespan disables the limit.
    if (params.max_execution_time == 0)
        return true;

    const auto limit_microseconds = params.max_execution_time.totalMicroseconds();

    /// stopwatch.elapsed() is compared against the limit scaled by 1000 (microseconds -> nanoseconds).
    if (stopwatch.elapsed() <= static_cast<UInt64>(limit_microseconds) * 1000)
        return true;

    const String message = "Timeout exceeded: elapsed " + toString(stopwatch.elapsedSeconds())
        + " seconds, maximum: " + toString(limit_microseconds / 1000000.0);

    return handleOverflowMode(params.timeout_overflow_mode, message, ErrorCodes::TIMEOUT_EXCEEDED);
}
|
|
|
|
|
|
|
|
|
2019-02-18 16:36:07 +00:00
|
|
|
/// Reads up to params.max_block_size rows via readRow() and returns them as one Chunk.
///
/// readPrefix() is called when no rows have been counted yet (total_rows == 0). When no row ends up in
/// the columns, readSuffix() is called and an empty Chunk is returned. Columns that readRow() reports
/// as not read (RowReadExtension::read_columns) are recorded in block_missing_values.
///
/// Every params.rows_portion_size rows, the time limit and cancellation are checked; either can end
/// the block early.
///
/// Parse errors (see isParseError) may be skipped when params.allow_errors_num or
/// params.allow_errors_ratio permit it and allowSyncAfterError() is true. Otherwise, and for any
/// non-parse error, the exception propagates. For parse errors the current row number and
/// getDiagnosticInfo() output are appended to it.
Chunk IRowInputFormat::generate()
{
    if (total_rows == 0)
        readPrefix();

    const Block & header = getPort().getHeader();

    size_t num_columns = header.columns();
    MutableColumns columns = header.cloneEmptyColumns();

    /// NOTE(review): block_missing_values is a member and is not reset here, so row indices recorded by
    /// earlier calls persist. Confirm the consumer clears it between chunks.

    try
    {
        for (size_t rows = 0, batch = 0; rows < params.max_block_size; ++rows, ++batch)
        {
            /// Time limit / cancellation are only checked once per rows_portion_size rows.
            if (params.rows_portion_size && batch == params.rows_portion_size)
            {
                batch = 0;
                if (!checkTimeLimit(params, total_stopwatch) || isCancelled())
                    break;
            }

            try
            {
                /// Counted before reading, so the "(at row N)" message below refers to the row being parsed.
                ++total_rows;

                RowReadExtension info;
                if (!readRow(columns, info))
                    break;

                if (params.callback)
                    params.callback();

                for (size_t column_idx = 0; column_idx < info.read_columns.size(); ++column_idx)
                {
                    if (!info.read_columns[column_idx])
                    {
                        size_t column_size = columns[column_idx]->size();
                        if (column_size == 0)
                            throw Exception("Unexpected empty column", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
                        block_missing_values.setBit(column_idx, column_size - 1);
                    }
                }
            }
            catch (Exception & e)
            {
                /// Logic for possible skipping of errors.

                if (!isParseError(e.code()))
                    throw;

                if (params.allow_errors_num == 0 && params.allow_errors_ratio == 0)
                    throw;

                ++num_errors;
                Float64 current_error_ratio = static_cast<Float64>(num_errors) / total_rows;

                /// Fatal only when both the absolute and the relative limits are exceeded.
                if (num_errors > params.allow_errors_num
                    && current_error_ratio > params.allow_errors_ratio)
                {
                    e.addMessage("(Already have " + toString(num_errors) + " errors"
                        " out of " + toString(total_rows) + " rows"
                        ", which is " + toString(current_error_ratio) + " of all rows)");
                    throw;
                }

                if (!allowSyncAfterError())
                {
                    e.addMessage("(Input format doesn't allow to skip errors)");
                    throw;
                }

                /// Format-specific resynchronisation; presumably advances the input past the broken row.
                syncAfterError();

                /// Truncate all columns to the minimal size, removing values that were appended to only
                /// some of the columns before the error.
                size_t min_size = std::numeric_limits<size_t>::max();
                for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
                    min_size = std::min(min_size, columns[column_idx]->size());

                for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
                {
                    auto & column = columns[column_idx];
                    if (column->size() > min_size)
                        column->popBack(column->size() - min_size);
                }
            }
        }
    }
    catch (Exception & e)
    {
        if (!isParseError(e.code()))
            throw;

        String verbose_diagnostic;
        try
        {
            verbose_diagnostic = getDiagnosticInfo();
        }
        catch (...)
        {
            /// Error while trying to obtain verbose diagnostic. Ok to ignore.
        }

        e.addMessage("(at row " + toString(total_rows) + ")\n" + verbose_diagnostic);
        throw;
    }

    if (columns.empty() || columns[0]->empty())
    {
        if (params.allow_errors_num > 0 || params.allow_errors_ratio > 0)
        {
            Logger * log = &Logger::get("IRowInputFormat");
            LOG_TRACE(log, "Skipped " << num_errors << " rows with errors while reading the input stream");
        }

        readSuffix();
        return {};
    }

    /// Take the row count from the columns themselves. total_rows - (its value at entry) overcounts:
    /// total_rows is also advanced for the final readRow() call that returned false, and for rows
    /// that were popped from the columns after a skipped parse error.
    size_t num_rows = columns.front()->size();
    return Chunk(std::move(columns), num_rows);
}
|
|
|
|
|
2019-02-19 18:41:18 +00:00
|
|
|
void IRowInputFormat::syncAfterError()
|
|
|
|
{
|
|
|
|
throw Exception("Method syncAfterError is not implemented for input format", ErrorCodes::NOT_IMPLEMENTED);
|
|
|
|
}
|
|
|
|
|
2018-05-24 01:02:16 +00:00
|
|
|
}
|