From f6f252b8f2bc605f45b4d147dff564d9a9d9e87c Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 1 Jun 2010 13:35:09 +0000 Subject: [PATCH] dbms: development. --- dbms/include/DB/Core/ErrorCodes.h | 3 + dbms/include/DB/Core/ReadBuffer.h | 467 ++++++++++-------- dbms/include/DB/Core/WriteBuffer.h | 198 ++++++++ dbms/include/DB/Core/WriteBufferFromOStream.h | 40 ++ dbms/src/Core/tests/read_buffer.cpp | 8 +- dbms/src/Core/tests/read_buffer_perf.cpp | 8 +- dbms/src/Core/tests/write_buffer.cpp | 45 ++ dbms/src/Core/tests/write_buffer_perf.cpp | 43 ++ 8 files changed, 600 insertions(+), 212 deletions(-) create mode 100644 dbms/include/DB/Core/WriteBuffer.h create mode 100644 dbms/include/DB/Core/WriteBufferFromOStream.h create mode 100644 dbms/src/Core/tests/write_buffer.cpp create mode 100644 dbms/src/Core/tests/write_buffer_perf.cpp diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 56a4f7f7133..453e6a6ba62 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -35,6 +35,9 @@ namespace ErrorCodes CANNOT_WRITE_TO_OSTREAM, CANNOT_PARSE_ESCAPE_SEQUENCE, CANNOT_PARSE_QUOTED_STRING, + CANNOT_PARSE_INPUT_ASSERTION_FAILED, + CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER, + CANNOT_PRINT_INTEGER, }; } diff --git a/dbms/include/DB/Core/ReadBuffer.h b/dbms/include/DB/Core/ReadBuffer.h index 94cd26cf435..807fda27636 100644 --- a/dbms/include/DB/Core/ReadBuffer.h +++ b/dbms/include/DB/Core/ReadBuffer.h @@ -1,8 +1,7 @@ #ifndef DBMS_COMMON_READBUFFER_H #define DBMS_COMMON_READBUFFER_H -#include // memcpy - +#include #include #include @@ -57,199 +56,16 @@ public: virtual ~ReadBuffer() {} - /// Функции для чтения конкретных данных - inline bool eof() { return pos == working_buffer.end() && !next(); } - - void readChar(char & x) - { - x = 0; - if (!eof()) - { - x = *pos; - ++pos; - } - } - - void ignore() { if (!eof()) ++pos; } - - - /// грубо - template - void readIntText(T & x) - { - x = 0; - while (!eof()) - { - switch (*pos) - { - case '+': - break; - case '-': - x = -x; - break; - case '0': - case '1': - case '2': - case '3': - case '4': - case '5': - case '6': - case '7': - case '8': - case '9': - x *= 10; - x += *pos - '0'; - break; - default: - return; - } - ++pos; - } - } - - /// грубо; поддерживается только простой формат - template - void readFloatText(T & x) - { - x = 0; - bool after_point = false; - double power_of_ten = 1; - - while (!eof()) - { - switch (*pos) - { - case '+': - break; - case '-': - x = -x; - break; - case '.': - after_point = true; - break; - case '0': - case '1': - case '2': - case '3': - case '4': - case '5': - case '6': - case '7': - case '8': - case '9': - if (after_point) - { - power_of_ten /= 10; - x += (*pos - '0') * power_of_ten; - } - else - { - x *= 10; - x += *pos - '0'; - } - break; - default: - return; - } - ++pos; - } - } - - /// грубо; всё до '\n' или '\t' - void readString(String & s) - { - s = ""; - while (!eof()) - { - size_t bytes = 0; - for (; pos + bytes != working_buffer.end(); ++bytes) - if (pos[bytes] == '\t' || pos[bytes] == '\n') - break; - - s.append(pos, bytes); - pos += bytes; - - if (pos != working_buffer.end()) - return; - } - } - - void readEscapedString(String & s) - { - s = ""; - while (!eof()) - { - size_t bytes = 0; - for (; pos + bytes != working_buffer.end(); ++bytes) - if (pos[bytes] == '\\' || pos[bytes] == '\t' || pos[bytes] == '\n') - break; - - s.append(pos, bytes); - pos += bytes; - - if (*pos == '\t' || *pos == '\n') - return; - - if (*pos == '\\') - { - ++pos; - if (eof()) - throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE); - s += parseEscapeSequence(*pos); - ++pos; - } - } - } - - void readQuotedString(String & s) - { - s = ""; - - if (eof() || *pos != '\'') - throw Exception("Cannot parse quoted string: expected opening single quote", - ErrorCodes::CANNOT_PARSE_QUOTED_STRING); - ++pos; - - while (!eof()) - { - size_t bytes = 0; - for (; pos + bytes != working_buffer.end(); ++bytes) - if (pos[bytes] == '\\' || pos[bytes] == '\'') - break; - - s.append(pos, bytes); - pos += bytes; - - if (*pos == '\'') - { - ++pos; - return; - } - - if (*pos == '\\') - { - ++pos; - if (eof()) - throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE); - s += parseEscapeSequence(*pos); - ++pos; - } - } - - throw Exception("Cannot parse quoted string: expected closing single quote", - ErrorCodes::CANNOT_PARSE_QUOTED_STRING); - } - size_t read(char * to, size_t n) { @@ -258,41 +74,284 @@ public: while (!eof() && bytes_copied < n) { size_t bytes_to_copy = std::min(static_cast(working_buffer.end() - pos), n - bytes_copied); - memcpy(to, pos, bytes_to_copy); + std::memcpy(to, pos, bytes_to_copy); pos += bytes_to_copy; } return bytes_copied; } - - - protected: char internal_buffer[DEFAULT_READ_BUFFER_SIZE]; Buffer working_buffer; Position pos; +}; -private: - inline char parseEscapeSequence(char c) + + +/// Функции-помошники для форматированного чтения + +static inline char parseEscapeSequence(char c) +{ + switch(c) { - switch(c) + case 'b': + return '\b'; + case 'f': + return '\f'; + case 'n': + return '\n'; + case 'r': + return '\r'; + case 't': + return '\t'; + case '0': + return '\0'; + default: + return c; + } +} + +static void assertString(const char * s, ReadBuffer & buf) +{ + for (; *s; ++s) + { + if (buf.eof() || *buf.position() != *s) + throw Exception(String("Cannot parse input: expected ") + s, ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED); + ++buf.position(); + } +} + + +/// грубо +template +void readIntText(T & x, ReadBuffer & buf) +{ + bool negative = false; + x = 0; + while (!buf.eof()) + { + switch (*buf.position()) { - case 'b': - return '\b'; - case 'f': - return '\f'; - case 'n': - return '\n'; - case 'r': - return '\r'; - case 't': - return '\t'; + case '+': + break; + case '-': + negative = true; + break; + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + case '8': + case '9': + x *= 10; + x += *buf.position() - '0'; + break; default: - return c; + if (negative) + x = -x; + return; + } + ++buf.position(); + } + if (negative) + x = -x; +} + +/// грубо +template +void readFloatText(T & x, ReadBuffer & buf) +{ + bool negative = false; + x = 0; + bool after_point = false; + double power_of_ten = 1; + + while (!buf.eof()) + { + switch (*buf.position()) + { + case '+': + break; + case '-': + negative = true; + break; + case '.': + after_point = true; + break; + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + case '8': + case '9': + if (after_point) + { + power_of_ten /= 10; + x += (*buf.position() - '0') * power_of_ten; + } + else + { + x *= 10; + x += *buf.position() - '0'; + } + break; + case 'e': + case 'E': + { + ++buf.position(); + Int32 exponent = 0; + readIntText(exponent, buf); + if (exponent == 0) + { + if (negative) + x = -x; + return; + } + else if (exponent > 0) + { + for (Int32 i = 0; i < exponent; ++i) + x *= 10; + if (negative) + x = -x; + return; + } + else + { + for (Int32 i = 0; i < exponent; ++i) + x /= 10; + if (negative) + x = -x; + return; + } + } + case 'i': + ++buf.position(); + assertString("nf", buf); + x = std::numeric_limits::infinity(); + if (negative) + x = -x; + return; + case 'I': + ++buf.position(); + assertString("NF", buf); + x = std::numeric_limits::infinity(); + if (negative) + x = -x; + return; + case 'n': + ++buf.position(); + assertString("an", buf); + x = std::numeric_limits::quiet_NaN(); + return; + case 'N': + ++buf.position(); + assertString("AN", buf); + x = std::numeric_limits::quiet_NaN(); + return; + default: + if (negative) + x = -x; + return; + } + ++buf.position(); + } + if (negative) + x = -x; +} + +/// грубо; всё до '\n' или '\t' +void readString(String & s, ReadBuffer & buf) +{ + s = ""; + while (!buf.eof()) + { + size_t bytes = 0; + for (; buf.position() + bytes != buf.buffer().end(); ++bytes) + if (buf.position()[bytes] == '\t' || buf.position()[bytes] == '\n') + break; + + s.append(buf.position(), bytes); + buf.position() += bytes; + + if (buf.position() != buf.buffer().end()) + return; + } +} + +void readEscapedString(String & s, ReadBuffer & buf) +{ + s = ""; + while (!buf.eof()) + { + size_t bytes = 0; + for (; buf.position() + bytes != buf.buffer().end(); ++bytes) + if (buf.position()[bytes] == '\\' || buf.position()[bytes] == '\t' || buf.position()[bytes] == '\n') + break; + + s.append(buf.position(), bytes); + buf.position() += bytes; + + if (*buf.position() == '\t' || *buf.position() == '\n') + return; + + if (*buf.position() == '\\') + { + ++buf.position(); + if (buf.eof()) + throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE); + s += parseEscapeSequence(*buf.position()); + ++buf.position(); } } -}; +} + +void readQuotedString(String & s, ReadBuffer & buf) +{ + s = ""; + + if (buf.eof() || *buf.position() != '\'') + throw Exception("Cannot parse quoted string: expected opening single quote", + ErrorCodes::CANNOT_PARSE_QUOTED_STRING); + ++buf.position(); + + while (!buf.eof()) + { + size_t bytes = 0; + for (; buf.position() + bytes != buf.buffer().end(); ++bytes) + if (buf.position()[bytes] == '\\' || buf.position()[bytes] == '\'') + break; + + s.append(buf.position(), bytes); + buf.position() += bytes; + + if (*buf.position() == '\'') + { + ++buf.position(); + return; + } + + if (*buf.position() == '\\') + { + ++buf.position(); + if (buf.eof()) + throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE); + s += parseEscapeSequence(*buf.position()); + ++buf.position(); + } + } + + throw Exception("Cannot parse quoted string: expected closing single quote", + ErrorCodes::CANNOT_PARSE_QUOTED_STRING); +} } diff --git a/dbms/include/DB/Core/WriteBuffer.h b/dbms/include/DB/Core/WriteBuffer.h new file mode 100644 index 00000000000..79d56936317 --- /dev/null +++ b/dbms/include/DB/Core/WriteBuffer.h @@ -0,0 +1,198 @@ +#ifndef DBMS_COMMON_WRITEBUFFER_H +#define DBMS_COMMON_WRITEBUFFER_H + +#include +#include +#include +#include + +#include +#include +#include + +#define DEFAULT_WRITE_BUFFER_SIZE 1048576 +#define DEFAULT_FLOAT_PRECISION 6 +/// 20 цифр, знак, и \0 для конца строки +#define MAX_INT_WIDTH 22 + + +namespace DB +{ + +/** Простой абстрактный класс для буферизованной записи данных (последовательности char) куда-нибудь. + * В отличие от std::ostream, предоставляет доступ к внутреннему буферу, + * а также позволяет вручную управлять позицией внутри буфера. + * + * Наследники должны реализовать метод next(). + * + * Также предоставляет набор функций для форматированной и неформатированной записи. + * (с простой и грубой реализацией) + */ +class WriteBuffer +{ +public: + typedef char * Position; + + struct Buffer + { + Buffer(Position begin_pos_, Position end_pos_) : begin_pos(begin_pos_), end_pos(end_pos_) {} + + inline Position begin() { return begin_pos; } + inline Position end() { return end_pos; } + + private: + Position begin_pos; + Position end_pos; /// на 1 байт после конца буфера + }; + + WriteBuffer() : working_buffer(internal_buffer, internal_buffer + DEFAULT_WRITE_BUFFER_SIZE), pos(internal_buffer) {} + + /// получить часть буфера, в который можно писать данные + inline Buffer & buffer() { return working_buffer; } + + /// получить (для чтения и изменения) позицию в буфере + inline Position & position() { return pos; }; + + /** записать данные, находящиеся в буфере (от начала буфера до текущей позиции); + * переместить позицию в начало; кинуть исключение, если что-то не так + */ + virtual void next() {} + + virtual ~WriteBuffer() {} + + + inline void nextIfAtEnd() + { + if (pos == working_buffer.end()) + next(); + } + + + void write(const char * from, size_t n) + { + size_t bytes_copied = 0; + + while (bytes_copied < n) + { + nextIfAtEnd(); + size_t bytes_to_copy = std::min(static_cast(working_buffer.end() - pos), n - bytes_copied); + std::memcpy(pos, from + bytes_copied, bytes_to_copy); + pos += bytes_to_copy; + bytes_copied += bytes_to_copy; + } + } + +protected: + char internal_buffer[DEFAULT_WRITE_BUFFER_SIZE]; + Buffer working_buffer; + Position pos; +}; + + +/// Функции-помошники для форматированной записи + +void writeChar(char x, WriteBuffer & buf) +{ + buf.nextIfAtEnd(); + *buf.position() = x; + ++buf.position(); +} + + +template struct IntFormat { static const char * format; }; +template <> const char * IntFormat::format = "%hhi"; +template <> const char * IntFormat::format = "%hi"; +template <> const char * IntFormat::format = "%li"; +template <> const char * IntFormat::format = "%lli"; +template <> const char * IntFormat::format = "%hhi"; +template <> const char * IntFormat::format = "%hi"; +template <> const char * IntFormat::format = "%li"; +template <> const char * IntFormat::format = "%lli"; + +/// грубо +template +void writeIntText(T x, WriteBuffer & buf) +{ + char tmp[MAX_INT_WIDTH]; + int res = std::snprintf(tmp, MAX_INT_WIDTH, IntFormat::format, x); + + if (res >= MAX_INT_WIDTH || res <= 0) + throw Exception("Cannot print integer", ErrorCodes::CANNOT_PRINT_INTEGER); + + buf.write(tmp, res - 1); +} + +template +void writeFloatText(T x, WriteBuffer & buf, unsigned precision = DEFAULT_FLOAT_PRECISION) +{ + unsigned size = precision + 10; + char tmp[size]; /// знаки, +0.0e+123\0 + int res = std::snprintf(tmp, size, "%.*g", precision, x); + + if (res >= static_cast(size) || res <= 0) + throw Exception("Cannot print float or double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER); + + buf.write(tmp, res - 1); +} + +void writeString(const String & s, WriteBuffer & buf) +{ + buf.write(s.data(), s.size()); +} + +/// предполагается, что строка в оперативке хранится непрерывно, и \0-terminated. +void writeEscapedString(const String & s, WriteBuffer & buf) +{ + for (String::const_iterator it = s.begin(); it != s.end(); ++it) + { + switch (*it) + { + case '\b': + writeChar('\\', buf); + writeChar('b', buf); + break; + case '\f': + writeChar('\\', buf); + writeChar('f', buf); + break; + case '\n': + writeChar('\\', buf); + writeChar('n', buf); + break; + case '\r': + writeChar('\\', buf); + writeChar('r', buf); + break; + case '\t': + writeChar('\\', buf); + writeChar('t', buf); + break; + case '\0': + writeChar('\\', buf); + writeChar('0', buf); + break; + case '\'': + writeChar('\\', buf); + writeChar('\'', buf); + break; + case '\\': + writeChar('\\', buf); + writeChar('\\', buf); + break; + default: + writeChar(*it, buf); + } + } +} + +void writeQuotedString(const String & s, WriteBuffer & buf) +{ + writeChar('\'', buf); + writeEscapedString(s, buf); + writeChar('\'', buf); +} + + +} + +#endif diff --git a/dbms/include/DB/Core/WriteBufferFromOStream.h b/dbms/include/DB/Core/WriteBufferFromOStream.h new file mode 100644 index 00000000000..f177bf21da4 --- /dev/null +++ b/dbms/include/DB/Core/WriteBufferFromOStream.h @@ -0,0 +1,40 @@ +#ifndef DBMS_COMMON_WRITEBUFFER_FROM_OSTREAM_H +#define DBMS_COMMON_WRITEBUFFER_FROM_OSTREAM_H + +#include + +#include +#include + +#include + + +namespace DB +{ + +class WriteBufferFromOStream : public WriteBuffer +{ +private: + std::ostream & ostr; + +public: + WriteBufferFromOStream(std::ostream & ostr_) : ostr(ostr_) {} + + void next() + { + ostr.write(internal_buffer, pos - internal_buffer); + pos = internal_buffer; + + if (!ostr.good()) + throw Exception("Cannot write to ostream", ErrorCodes::CANNOT_WRITE_TO_OSTREAM); + } + + ~WriteBufferFromOStream() + { + next(); + } +}; + +} + +#endif diff --git a/dbms/src/Core/tests/read_buffer.cpp b/dbms/src/Core/tests/read_buffer.cpp index 4e7ff2a58fd..89ed3f494a2 100644 --- a/dbms/src/Core/tests/read_buffer.cpp +++ b/dbms/src/Core/tests/read_buffer.cpp @@ -18,16 +18,16 @@ int main(int argc, char ** argv) DB::Float64 b; DB::String c, d; - in.readIntText(a); + DB::readIntText(a, in); in.ignore(); - in.readFloatText(b); + DB::readFloatText(b, in); in.ignore(); - in.readEscapedString(c); + DB::readEscapedString(c, in); in.ignore(); - in.readQuotedString(d); + DB::readQuotedString(d, in); std::cout << a << ' ' << b << ' ' << c << '\t' << '\'' << d << '\'' << std::endl; } diff --git a/dbms/src/Core/tests/read_buffer_perf.cpp b/dbms/src/Core/tests/read_buffer_perf.cpp index 1fd775c8452..8e33718073f 100644 --- a/dbms/src/Core/tests/read_buffer_perf.cpp +++ b/dbms/src/Core/tests/read_buffer_perf.cpp @@ -20,16 +20,16 @@ int main(int argc, char ** argv) size_t i = 0; while (!in.eof()) { - in.readIntText(a); + DB::readIntText(a, in); in.ignore(); - in.readFloatText(b); + DB::readFloatText(b, in); in.ignore(); - in.readEscapedString(c); + DB::readEscapedString(c, in); in.ignore(); - in.readQuotedString(d); + DB::readQuotedString(d, in); in.ignore(); ++i; diff --git a/dbms/src/Core/tests/write_buffer.cpp b/dbms/src/Core/tests/write_buffer.cpp new file mode 100644 index 00000000000..da71744e19b --- /dev/null +++ b/dbms/src/Core/tests/write_buffer.cpp @@ -0,0 +1,45 @@ +#include + +#include +#include + +#include + + +int main(int argc, char ** argv) +{ + try + { + DB::Int64 a = -123456; + DB::Float64 b = 123.456; + DB::String c = "вася пе\tтя"; + DB::String d = "'xyz\\"; + + std::stringstream s; + + { + DB::WriteBufferFromOStream out(s); + + DB::writeIntText(a, out); + DB::writeChar(' ', out); + + DB::writeFloatText(b, out); + DB::writeChar(' ', out); + + DB::writeEscapedString(c, out); + DB::writeChar('\t', out); + + DB::writeQuotedString(d, out); + DB::writeChar('\n', out); + } + + std::cout << s.str(); + } + catch (const DB::Exception & e) + { + std::cerr << e.what() << ", " << e.message() << std::endl; + return 1; + } + + return 0; +} diff --git a/dbms/src/Core/tests/write_buffer_perf.cpp b/dbms/src/Core/tests/write_buffer_perf.cpp new file mode 100644 index 00000000000..a8cdb11e152 --- /dev/null +++ b/dbms/src/Core/tests/write_buffer_perf.cpp @@ -0,0 +1,43 @@ +#include + +#include +#include + +#include + + +int main(int argc, char ** argv) +{ + try + { + DB::Int64 a = -123456; + DB::Float64 b = 123.456; + DB::String c = "вася пе\tтя"; + DB::String d = "'xyz\\"; + + std::ofstream s("test"); + DB::WriteBufferFromOStream out(s); + + for (int i = 0; i < 1000000; ++i) + { + DB::writeIntText(a, out); + DB::writeChar(' ', out); + + DB::writeFloatText(b, out); + DB::writeChar(' ', out); + + DB::writeEscapedString(c, out); + DB::writeChar('\t', out); + + DB::writeQuotedString(d, out); + DB::writeChar('\n', out); + } + } + catch (const DB::Exception & e) + { + std::cerr << e.what() << ", " << e.message() << std::endl; + return 1; + } + + return 0; +}