2017-04-01 09:19:00 +00:00
|
|
|
#include <IO/ReadHelpers.h>
|
|
|
|
#include <IO/Operators.h>
|
2016-02-07 08:42:21 +00:00
|
|
|
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <DataStreams/verbosePrintString.h>
|
|
|
|
#include <DataStreams/CSVRowInputStream.h>
|
|
|
|
#include <DataTypes/DataTypesNumber.h>
|
2016-02-07 08:42:21 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes referenced in this file. They are only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int INCORRECT_DATA;
    extern const int LOGICAL_ERROR;
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Stores the input buffer, the sample block and the format options,
/// and caches the data type of every column of the sample block (in column order).
CSVRowInputStream::CSVRowInputStream(ReadBuffer & istr_, const Block & sample_, const char delimiter_, bool with_names_, bool with_types_)
    : istr(istr_), sample(sample_), delimiter(delimiter_), with_names(with_names_), with_types(with_types_)
{
    data_types.resize(sample.columns());

    for (size_t pos = 0, count = data_types.size(); pos != count; ++pos)
        data_types[pos] = sample.safeGetByPosition(pos).type;
}
|
|
|
|
|
|
|
|
|
2016-02-07 11:49:49 +00:00
|
|
|
/** Consumes one line terminator at the current position of the buffer.
  * Accepted terminators: \n, \r\n and \n\r.
  * At end of stream there is nothing to consume and the function returns without reading from the buffer.
  * Throws INCORRECT_DATA if \r is not followed by \n, or if the current character is not a line terminator.
  */
static void skipEndOfLine(ReadBuffer & istr)
{
    /// Check for end of stream first: the current position must not be dereferenced when no data is left.
    if (istr.eof())
        return;

    const char first = *istr.position();

    if (first == '\n')
    {
        ++istr.position();

        /// Optional \r right after \n.
        if (!istr.eof() && *istr.position() == '\r')
            ++istr.position();
    }
    else if (first == '\r')
    {
        ++istr.position();

        /// \r is accepted only as the first half of \r\n.
        if (!istr.eof() && *istr.position() == '\n')
            ++istr.position();
        else
            throw Exception("Cannot parse CSV format: found \\r (CR) not followed by \\n (LF)."
                " Line must end by \\n (LF) or \\r\\n (CR LF) or \\n\\r.", ErrorCodes::INCORRECT_DATA);
    }
    else
        throw Exception("Expected end of line", ErrorCodes::INCORRECT_DATA);
}
|
2016-02-07 08:42:21 +00:00
|
|
|
|
2016-02-07 10:43:02 +00:00
|
|
|
|
|
|
|
/** Consumes the separator that follows a field.
  * After a column that is not the last one, exactly the delimiter character is required.
  * After the last column: an optional extra delimiter, then the end of line;
  *  end of stream is accepted instead of the end of line.
  */
static void skipDelimiter(ReadBuffer & istr, const char delimiter, bool is_last_column)
{
    if (!is_last_column)
    {
        assertChar(delimiter, istr);
        return;
    }

    if (istr.eof())
        return;

    /// A redundant delimiter right before the end of the line is tolerated.
    const bool has_trailing_delimiter = (*istr.position() == delimiter);
    if (has_trailing_delimiter)
    {
        ++istr.position();
        if (istr.eof())
            return;
    }

    skipEndOfLine(istr);
}
|
|
|
|
|
|
|
|
|
2017-03-25 20:12:56 +00:00
|
|
|
/// Skip `whitespace` symbols allowed in CSV.
|
2016-02-07 08:42:21 +00:00
|
|
|
static inline void skipWhitespacesAndTabs(ReadBuffer & buf)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
while (!buf.eof()
|
|
|
|
&& (*buf.position() == ' '
|
|
|
|
|| *buf.position() == '\t'))
|
|
|
|
++buf.position();
|
2016-02-07 08:42:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2016-02-07 10:43:02 +00:00
|
|
|
/// Reads and discards one row consisting of `columns` fields.
/// Every field is read as a CSV string; whitespace around fields and the separators between them are consumed as well.
static void skipRow(ReadBuffer & istr, const char delimiter, size_t columns)
{
    String discarded;

    size_t remaining = columns;
    while (remaining != 0)
    {
        skipWhitespacesAndTabs(istr);
        readCSVString(discarded, istr);
        skipWhitespacesAndTabs(istr);

        --remaining;

        /// No fields left means that the field just read was the last one in the row.
        skipDelimiter(istr, delimiter, remaining == 0);
    }
}
|
|
|
|
|
|
|
|
|
|
|
|
void CSVRowInputStream::readPrefix()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
/// In this format, we assume, that if first string field contain BOM as value, it will be written in quotes,
|
|
|
|
/// so BOM at beginning of stream cannot be confused with BOM in first string value, and it is safe to skip it.
|
|
|
|
skipBOMIfExists(istr);
|
2016-06-23 19:39:20 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
size_t columns = sample.columns();
|
|
|
|
String tmp;
|
2016-02-07 10:43:02 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (with_names)
|
|
|
|
skipRow(istr, delimiter, columns);
|
2016-02-07 10:43:02 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (with_types)
|
|
|
|
skipRow(istr, delimiter, columns);
|
2016-02-07 10:43:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2016-02-16 16:39:39 +00:00
|
|
|
/// Reads one row from the input and appends one value to every column of `block`.
/// Returns false if the input is already at end of stream, true after a row has been read.
bool CSVRowInputStream::read(Block & block)
{
    /// The bookkeeping for diagnostics is updated on every call, before the end-of-stream check.
    updateDiagnosticInfo();

    if (istr.eof())
        return false;

    const size_t num_columns = data_types.size();

    for (size_t col = 0; col != num_columns; ++col)
    {
        const bool is_last = (col + 1 == num_columns);

        skipWhitespacesAndTabs(istr);
        data_types[col]->deserializeTextCSV(*block.getByPosition(col).column, istr, delimiter);
        skipWhitespacesAndTabs(istr);

        skipDelimiter(istr, delimiter, is_last);
    }

    return true;
}
|
|
|
|
|
|
|
|
|
2017-01-27 04:29:47 +00:00
|
|
|
/** Builds a textual report about the previous and the current row by parsing them once more
  *  with parseRowAndPrintDiagnosticInfo.
  * Returns an empty string if the input is at end of stream.
  * If the rows cannot be re-parsed, the returned text explains why instead.
  */
String CSVRowInputStream::getDiagnosticInfo()
{
    /// Buffer has gone, cannot extract information about what has been parsed.
    if (istr.eof())
        return {};

    String res;
    WriteBufferFromString out(res);
    Block block = sample.cloneEmpty();

    /// It is possible to display detailed diagnostics only if the last and next to last rows are still in the read buffer.
    const size_t bytes_before_buffer = istr.count() - istr.offset();
    if (bytes_before_buffer != bytes_read_at_start_of_buffer_on_prev_row)
    {
        out << "Could not print diagnostic info because two last rows aren't in buffer (rare case)\n";
        return res;
    }

    /// The longest column name and the longest type name are passed on to the row printer.
    size_t max_length_of_column_name = 0;
    size_t max_length_of_data_type_name = 0;

    const size_t num_columns = sample.columns();
    for (size_t pos = 0; pos < num_columns; ++pos)
    {
        const auto & column = sample.safeGetByPosition(pos);

        const size_t name_length = column.name.size();
        if (name_length > max_length_of_column_name)
            max_length_of_column_name = name_length;

        const size_t type_name_length = column.type->getName().size();
        if (type_name_length > max_length_of_data_type_name)
            max_length_of_data_type_name = type_name_length;
    }

    /// Roll back the cursor to the beginning of the previous or current row and parse all over again. But now we derive detailed information.
    if (pos_of_prev_row)
    {
        istr.position() = pos_of_prev_row;

        out << "\nRow " << (row_num - 1) << ":\n";
        if (!parseRowAndPrintDiagnosticInfo(block, out, max_length_of_column_name, max_length_of_data_type_name))
            return res;
    }
    else if (!pos_of_current_row)
    {
        out << "Could not print diagnostic info because parsing of data hasn't started.\n";
        return res;
    }
    else
    {
        istr.position() = pos_of_current_row;
    }

    out << "\nRow " << row_num << ":\n";
    parseRowAndPrintDiagnosticInfo(block, out, max_length_of_column_name, max_length_of_data_type_name);
    out << "\n";

    return res;
}
|
|
|
|
|
|
|
|
|
2016-02-16 16:39:39 +00:00
|
|
|
bool CSVRowInputStream::parseRowAndPrintDiagnosticInfo(Block & block,
|
2017-04-01 07:20:54 +00:00
|
|
|
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name)
|
2016-02-07 08:42:21 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
size_t size = data_types.size();
|
|
|
|
for (size_t i = 0; i < size; ++i)
|
|
|
|
{
|
|
|
|
if (i == 0 && istr.eof())
|
|
|
|
{
|
|
|
|
out << "<End of stream>\n";
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
out << "Column " << i << ", " << std::string((i < 10 ? 2 : i < 100 ? 1 : 0), ' ')
|
|
|
|
<< "name: " << sample.safeGetByPosition(i).name << ", " << std::string(max_length_of_column_name - sample.safeGetByPosition(i).name.size(), ' ')
|
|
|
|
<< "type: " << data_types[i]->getName() << ", " << std::string(max_length_of_data_type_name - data_types[i]->getName().size(), ' ');
|
|
|
|
|
|
|
|
auto prev_position = istr.position();
|
|
|
|
auto curr_position = istr.position();
|
|
|
|
std::exception_ptr exception;
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
|
|
|
skipWhitespacesAndTabs(istr);
|
|
|
|
prev_position = istr.position();
|
|
|
|
data_types[i]->deserializeTextCSV(*block.safeGetByPosition(i).column, istr, delimiter);
|
|
|
|
curr_position = istr.position();
|
|
|
|
skipWhitespacesAndTabs(istr);
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
exception = std::current_exception();
|
|
|
|
}
|
|
|
|
|
|
|
|
if (curr_position < prev_position)
|
|
|
|
throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
if (data_types[i]->isNumeric())
|
|
|
|
{
|
|
|
|
/// An empty string instead of a number.
|
|
|
|
if (curr_position == prev_position)
|
|
|
|
{
|
|
|
|
out << "ERROR: text ";
|
|
|
|
verbosePrintString(prev_position, std::min(prev_position + 10, istr.buffer().end()), out);
|
|
|
|
out << " is not like " << data_types[i]->getName() << "\n";
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
out << "parsed text: ";
|
|
|
|
verbosePrintString(prev_position, curr_position, out);
|
|
|
|
|
|
|
|
if (exception)
|
|
|
|
{
|
|
|
|
if (data_types[i]->getName() == "DateTime")
|
|
|
|
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
|
|
|
|
else if (data_types[i]->getName() == "Date")
|
|
|
|
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
|
|
|
|
else
|
|
|
|
out << "ERROR\n";
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
out << "\n";
|
|
|
|
|
|
|
|
if (data_types[i]->isNumeric())
|
|
|
|
{
|
|
|
|
if (*curr_position != '\n' && *curr_position != '\r' && *curr_position != delimiter)
|
|
|
|
{
|
|
|
|
out << "ERROR: garbage after " << data_types[i]->getName() << ": ";
|
|
|
|
verbosePrintString(curr_position, std::min(curr_position + 10, istr.buffer().end()), out);
|
|
|
|
out << "\n";
|
|
|
|
|
|
|
|
if (data_types[i]->getName() == "DateTime")
|
|
|
|
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
|
|
|
|
else if (data_types[i]->getName() == "Date")
|
|
|
|
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
|
|
|
|
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Delimiters
|
|
|
|
if (i + 1 == size)
|
|
|
|
{
|
|
|
|
if (istr.eof())
|
|
|
|
return false;
|
|
|
|
|
|
|
|
/// we support the extra delimiter at the end of the line
|
|
|
|
if (*istr.position() == delimiter)
|
|
|
|
{
|
|
|
|
++istr.position();
|
|
|
|
if (istr.eof())
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!istr.eof() && *istr.position() != '\n' && *istr.position() != '\r')
|
|
|
|
{
|
|
|
|
out << "ERROR: There is no line feed. ";
|
|
|
|
verbosePrintString(istr.position(), istr.position() + 1, out);
|
|
|
|
out << " found instead.\n"
|
|
|
|
" It's like your file has more columns than expected.\n"
|
|
|
|
"And if your file have right number of columns, maybe it have unquoted string value with comma.\n";
|
|
|
|
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
skipEndOfLine(istr);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
assertChar(delimiter, istr);
|
|
|
|
}
|
|
|
|
catch (const DB::Exception &)
|
|
|
|
{
|
|
|
|
if (*istr.position() == '\n' || *istr.position() == '\r')
|
|
|
|
{
|
|
|
|
out << "ERROR: Line feed found where delimiter (" << delimiter << ") is expected."
|
|
|
|
" It's like your file has less columns than expected.\n"
|
|
|
|
"And if your file have right number of columns, maybe it have unescaped quotes in values.\n";
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
out << "ERROR: There is no delimiter (" << delimiter << "). ";
|
|
|
|
verbosePrintString(istr.position(), istr.position() + 1, out);
|
|
|
|
out << " found instead.\n";
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
2016-02-07 08:42:21 +00:00
|
|
|
}
|
|
|
|
|
2017-01-27 04:29:47 +00:00
|
|
|
|
|
|
|
void CSVRowInputStream::syncAfterError()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
skipToNextLineOrEOF(istr);
|
2017-01-27 04:29:47 +00:00
|
|
|
}
|
|
|
|
|
2017-03-11 00:27:59 +00:00
|
|
|
void CSVRowInputStream::updateDiagnosticInfo()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
++row_num;
|
2017-03-11 00:27:59 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
bytes_read_at_start_of_buffer_on_prev_row = bytes_read_at_start_of_buffer_on_current_row;
|
|
|
|
bytes_read_at_start_of_buffer_on_current_row = istr.count() - istr.offset();
|
2017-03-11 00:27:59 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
pos_of_prev_row = pos_of_current_row;
|
|
|
|
pos_of_current_row = istr.position();
|
2017-03-11 00:27:59 +00:00
|
|
|
}
|
|
|
|
|
2016-02-07 08:42:21 +00:00
|
|
|
}
|