diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h
index 6e248ccb4e0..56a4f7f7133 100644
--- a/dbms/include/DB/Core/ErrorCodes.h
+++ b/dbms/include/DB/Core/ErrorCodes.h
@@ -31,6 +31,10 @@ namespace ErrorCodes
 		NUMBER_OF_COLUMNS_DOESNT_MATCH,
 		CANNOT_READ_ALL_DATA_FROM_TAB_SEPARATED_INPUT,
 		CANNOT_PARSE_ALL_VALUE_FROM_TAB_SEPARATED_INPUT,
+		CANNOT_READ_FROM_ISTREAM,
+		CANNOT_WRITE_TO_OSTREAM,
+		CANNOT_PARSE_ESCAPE_SEQUENCE,
+		CANNOT_PARSE_QUOTED_STRING,
 	};
 }
 
diff --git a/dbms/include/DB/Core/ReadBuffer.h b/dbms/include/DB/Core/ReadBuffer.h
new file mode 100644
index 00000000000..f5e4aaba8af
--- /dev/null
+++ b/dbms/include/DB/Core/ReadBuffer.h
@@ -0,0 +1,316 @@
+#ifndef DBMS_COMMON_READBUFFER_H
+#define DBMS_COMMON_READBUFFER_H
+
+#include <algorithm>
+#include <cstring>
+
+// NOTE(review): the project include paths below were not legible in the reviewed text; they are
+// reconstructed from the names used in this header (String, Exception, ErrorCodes) - confirm.
+#include <DB/Core/Types.h>
+#include <DB/Core/Exception.h>
+#include <DB/Core/ErrorCodes.h>
+
+#define DEFAULT_READ_BUFFER_SIZE 1048576
+
+
+namespace DB
+{
+
+/** A simple abstract class for buffered reading of data (a sequence of char) from somewhere.
+  * Unlike std::istream, it gives access to the internal buffer
+  * and allows the position inside the buffer to be managed manually.
+  *
+  * Derived classes must implement the next() method.
+  *
+  * It also provides a set of functions for formatted and unformatted reading
+  * (with a simple and rough implementation).
+  *
+  * working_buffer and pos point into the object's own internal_buffer,
+  * so a copied ReadBuffer would still refer to the buffer of the object it was copied from.
+  */
+class ReadBuffer
+{
+public:
+	typedef const char * Position;
+
+	struct Buffer
+	{
+		Buffer(Position begin_pos_, Position end_pos_) : begin_pos(begin_pos_), end_pos(end_pos_) {}
+
+		inline Position begin() { return begin_pos; }
+		inline Position end() { return end_pos; }
+
+	private:
+		Position begin_pos;
+		Position end_pos;		/// one byte past the end of the buffer
+	};
+
+	ReadBuffer() : working_buffer(internal_buffer, internal_buffer), pos(internal_buffer) {}
+
+	/// get the part of the buffer that data can be read from
+	inline Buffer & buffer() { return working_buffer; }
+
+	/// get the position in the buffer (for reading and for modification)
+	inline Position & position() { return pos; }
+
+	/** read the next data and fill the buffer with it; move the position to the beginning;
+	  * return false at the end of data, true otherwise; throw an exception if something is wrong
+	  */
+	virtual bool next() { return false; }
+
+	virtual ~ReadBuffer() {}
+
+
+	/// Functions for reading specific data
+
+	/// true if the buffer is exhausted and next() did not provide more data
+	inline bool eof()
+	{
+		return pos == working_buffer.end() && !next();
+	}
+
+
+	/// read one char; x is set to 0 if there is no more data
+	void readChar(char & x)
+	{
+		x = 0;
+		if (!eof())
+		{
+			x = *pos;
+			++pos;
+		}
+	}
+
+
+	/// skip one char, if there is one
+	void ignore()
+	{
+		if (!eof())
+			++pos;
+	}
+
+
+	/** rough: consumes a run of '+', '-' and decimal digits and stops at the first other char.
+	  * Each '-' flips the sign, which is applied only after all the digits are accumulated:
+	  * negating the value at the moment a leading '-' is seen would negate zero and lose the sign.
+	  */
+	template <typename T>
+	void readIntText(T & x)
+	{
+		bool negative = false;
+		x = 0;
+
+		while (!eof())
+		{
+			const char c = *pos;
+
+			if (c == '-')
+				negative = !negative;
+			else if (c >= '0' && c <= '9')
+			{
+				x *= 10;
+				x += c - '0';
+			}
+			else if (c != '+')
+				break;
+
+			++pos;
+		}
+
+		if (negative)
+			x = -x;
+	}
+
+	/** rough; only the simple format is supported: signs, digits and '.', no exponent.
+	  * The sign is handled the same way as in readIntText.
+	  */
+	template <typename T>
+	void readFloatText(T & x)
+	{
+		bool negative = false;
+		bool after_point = false;
+		double power_of_ten = 1;
+		x = 0;
+
+		while (!eof())
+		{
+			const char c = *pos;
+
+			if (c == '-')
+				negative = !negative;
+			else if (c == '.')
+				after_point = true;
+			else if (c >= '0' && c <= '9')
+			{
+				if (after_point)
+				{
+					power_of_ten /= 10;
+					x += (c - '0') * power_of_ten;
+				}
+				else
+				{
+					x *= 10;
+					x += c - '0';
+				}
+			}
+			else if (c != '+')
+				break;
+
+			++pos;
+		}
+
+		if (negative)
+			x = -x;
+	}
+
+	/// rough; everything up to '\n' or '\t' (the delimiter itself is not consumed)
+	void readString(String & s)
+	{
+		s = "";
+		while (!eof())
+		{
+			size_t bytes = 0;
+			for (; pos + bytes != working_buffer.end(); ++bytes)
+				if (pos[bytes] == '\t' || pos[bytes] == '\n')
+					break;
+
+			s.append(pos, bytes);
+			pos += bytes;
+
+			if (pos != working_buffer.end())
+				return;
+		}
+	}
+
+	/** everything up to '\t' or '\n' (the delimiter itself is not consumed),
+	  * with backslash escape sequences replaced by the chars they stand for
+	  */
+	void readEscapedString(String & s)
+	{
+		s = "";
+		while (!eof())
+		{
+			size_t bytes = 0;
+			for (; pos + bytes != working_buffer.end(); ++bytes)
+				if (pos[bytes] == '\\' || pos[bytes] == '\t' || pos[bytes] == '\n')
+					break;
+
+			s.append(pos, bytes);
+			pos += bytes;
+
+			/// the buffer ended before any special char: pos must not be dereferenced, let eof() refill
+			if (pos == working_buffer.end())
+				continue;
+
+			if (*pos == '\t' || *pos == '\n')
+				return;
+
+			if (*pos == '\\')
+			{
+				++pos;
+				if (eof())
+					throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
+				s += parseEscapeSequence(*pos);
+				++pos;
+			}
+		}
+	}
+
+	/** a string in single quotes, with backslash escape sequences; both quotes are consumed
+	  * and are not included in the result; throws if a quote is missing
+	  */
+	void readQuotedString(String & s)
+	{
+		s = "";
+
+		if (eof() || *pos != '\'')
+			throw Exception("Cannot parse quoted string: expected opening single quote",
+				ErrorCodes::CANNOT_PARSE_QUOTED_STRING);
+		++pos;
+
+		while (!eof())
+		{
+			size_t bytes = 0;
+			for (; pos + bytes != working_buffer.end(); ++bytes)
+				if (pos[bytes] == '\\' || pos[bytes] == '\'')
+					break;
+
+			s.append(pos, bytes);
+			pos += bytes;
+
+			/// the buffer ended before any special char: pos must not be dereferenced, let eof() refill
+			if (pos == working_buffer.end())
+				continue;
+
+			if (*pos == '\'')
+			{
+				++pos;
+				return;
+			}
+
+			if (*pos == '\\')
+			{
+				++pos;
+				if (eof())
+					throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
+				s += parseEscapeSequence(*pos);
+				++pos;
+			}
+		}
+
+		throw Exception("Cannot parse quoted string: expected closing single quote",
+			ErrorCodes::CANNOT_PARSE_QUOTED_STRING);
+	}
+
+
+	/** copy up to n bytes into 'to', refilling the buffer as needed;
+	  * returns the number of bytes copied, which is less than n only if the data ended
+	  */
+	size_t read(char * to, size_t n)
+	{
+		size_t bytes_copied = 0;
+
+		/// bytes_copied is checked first so that next() is not called once the request is satisfied
+		while (bytes_copied < n && !eof())
+		{
+			size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
+			std::memcpy(to + bytes_copied, pos, bytes_to_copy);
+			pos += bytes_to_copy;
+			bytes_copied += bytes_to_copy;
+		}
+
+		return bytes_copied;
+	}
+
+
+protected:
+	char internal_buffer[DEFAULT_READ_BUFFER_SIZE];
+	Buffer working_buffer;
+	Position pos;
+
+private:
+	/// the char denoted by the escape sequence "\c"; chars without a special meaning denote themselves
+	inline char parseEscapeSequence(char c)
+	{
+		switch(c)
+		{
+			case 'b':
+				return '\b';
+			case 'f':
+				return '\f';
+			case 'n':
+				return '\n';
+			case 'r':
+				return '\r';
+			case 't':
+				return '\t';
+			default:
+				return c;
+		}
+	}
+};
+
+
+}
+
+#endif
diff --git a/dbms/include/DB/Core/ReadBufferFromIStream.h b/dbms/include/DB/Core/ReadBufferFromIStream.h
new file mode 100644
index 00000000000..dfa37eb150a
--- /dev/null
+++ b/dbms/include/DB/Core/ReadBufferFromIStream.h
@@ -0,0 +1,54 @@
+#ifndef DBMS_COMMON_READBUFFER_FROM_ISTREAM_H
+#define DBMS_COMMON_READBUFFER_FROM_ISTREAM_H
+
+#include <iostream>
+
+// NOTE(review): the project include paths below were not legible in the reviewed text; they are
+// reconstructed from the names used in this header (Exception, ErrorCodes, ReadBuffer) - confirm.
+#include <DB/Core/Exception.h>
+#include <DB/Core/ErrorCodes.h>
+
+#include <DB/Core/ReadBuffer.h>
+
+
+namespace DB
+{
+
+/// A ReadBuffer that takes its data from a std::istream (the stream is held by reference).
+class ReadBufferFromIStream : public ReadBuffer
+{
+private:
+	std::istream & istr;
+
+public:
+	ReadBufferFromIStream(std::istream & istr_) : istr(istr_) {}
+
+	/** read up to DEFAULT_READ_BUFFER_SIZE bytes from the stream into the buffer
+	  * and move the position to the beginning of the buffer;
+	  * return false if nothing was read because the stream ended,
+	  * throw if nothing was read for any other reason, return true otherwise
+	  */
+	bool next()
+	{
+		istr.read(internal_buffer, DEFAULT_READ_BUFFER_SIZE);
+
+		pos = internal_buffer;
+		working_buffer = Buffer(internal_buffer, internal_buffer + istr.gcount());
+
+		if (working_buffer.end() == working_buffer.begin())
+		{
+			if (istr.eof())
+				return false;
+
+			/// never report success with an empty buffer: the reading functions dereference
+			/// the position right after eof() returns false
+			throw Exception("Cannot read from istream", ErrorCodes::CANNOT_READ_FROM_ISTREAM);
+		}
+
+		return true;
+	}
+};
+
+}
+
+#endif
diff --git a/dbms/src/Core/tests/read_buffer.cpp b/dbms/src/Core/tests/read_buffer.cpp
new file mode 100644
index 00000000000..4e7ff2a58fd
--- /dev/null
+++ b/dbms/src/Core/tests/read_buffer.cpp
@@ -0,0 +1,41 @@
+#include <string>
+
+#include <iostream>
+#include <sstream>
+
+#include <DB/Core/ReadBufferFromIStream.h>
+
+
+int main(int argc, char ** argv)
+{
+	try
+	{
+		std::stringstream s;
+		s << "-123456 123.456 вася пе\\tтя\t'\\'xyz\\\\'";
+		DB::ReadBufferFromIStream in(s);
+
+		DB::Int64 a;
+		DB::Float64 b;
+		DB::String c, d;
+
+		in.readIntText(a);
+		in.ignore();
+
+		in.readFloatText(b);
+		in.ignore();
+
+		in.readEscapedString(c);
+		in.ignore();
+
+		in.readQuotedString(d);
+
+		std::cout << a << ' ' << b << ' ' << c << '\t' << '\'' << d << '\'' << std::endl;
+	}
+	catch (const DB::Exception & e)
+	{
+		std::cerr << e.what() << ", " << e.message() << std::endl;
+		return 1;
+	}
+
+	return 0;
+}
diff --git a/dbms/src/Core/tests/read_buffer_perf.cpp b/dbms/src/Core/tests/read_buffer_perf.cpp
new file mode 100644
index 00000000000..1fd775c8452
--- /dev/null
+++ b/dbms/src/Core/tests/read_buffer_perf.cpp
@@ -0,0 +1,48 @@
+#include <string>
+
+#include <iostream>
+#include <fstream>
+
+#include <DB/Core/ReadBufferFromIStream.h>
+
+
+int main(int argc, char ** argv)
+{
+	try
+	{
+		std::ifstream istr("test");
+		DB::ReadBufferFromIStream in(istr);
+
+		DB::Int64 a = 0;
+		DB::Float64 b = 0;
+		DB::String c, d;
+
+		size_t i = 0;
+		while (!in.eof())
+		{
+			in.readIntText(a);
+			in.ignore();
+
+			in.readFloatText(b);
+			in.ignore();
+
+			in.readEscapedString(c);
+			in.ignore();
+
+			in.readQuotedString(d);
+			in.ignore();
+
+			++i;
+		}
+
+		std::cout << a << ' ' << b << ' ' << c << '\t' << '\'' << d << '\'' << std::endl;
+		std::cout << i << std::endl;
+	}
+	catch (const DB::Exception & e)
+	{
+		std::cerr << e.what() << ", " << e.message() << std::endl;
+		return 1;
+	}
+
+	return 0;
+}
diff --git a/dbms/src/Storages/tests/hit_log.cpp b/dbms/src/Storages/tests/hit_log.cpp
index 3e4cb3263af..3505679dc04 100644
--- a/dbms/src/Storages/tests/hit_log.cpp
+++ b/dbms/src/Storages/tests/hit_log.cpp
@@ -119,6 +119,14 @@ int main(int argc, char ** argv)
 	/// читаем из неё
 	if (argc == 2 && 0 == strcmp(argv[1], "read"))
 	{
+/*		DB::ColumnNames column_names;
+		boost::assign::push_back(column_names)
+			("SearchPhrase");
+
+		SharedPtr<DB::DataTypes> data_types = new DB::DataTypes;
+		boost::assign::push_back(*data_types)
+			(new DB::DataTypeString);
+*/
 		SharedPtr in = table.read(column_names, 0);
 		DB::TabSeparatedRowOutputStream out(std::cout, data_types);
 		DB::copyData(*in, out);