dbms: development.

This commit is contained in:
Alexey Milovidov 2010-06-01 13:35:09 +00:00
parent 3de5e45cb2
commit f6f252b8f2
8 changed files with 600 additions and 212 deletions

View File

@ -35,6 +35,9 @@ namespace ErrorCodes
CANNOT_WRITE_TO_OSTREAM, CANNOT_WRITE_TO_OSTREAM,
CANNOT_PARSE_ESCAPE_SEQUENCE, CANNOT_PARSE_ESCAPE_SEQUENCE,
CANNOT_PARSE_QUOTED_STRING, CANNOT_PARSE_QUOTED_STRING,
CANNOT_PARSE_INPUT_ASSERTION_FAILED,
CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER,
CANNOT_PRINT_INTEGER,
}; };
} }

View File

@ -1,8 +1,7 @@
#ifndef DBMS_COMMON_READBUFFER_H #ifndef DBMS_COMMON_READBUFFER_H
#define DBMS_COMMON_READBUFFER_H #define DBMS_COMMON_READBUFFER_H
#include <string.h> // memcpy #include <cstring>
#include <algorithm> #include <algorithm>
#include <DB/Core/Types.h> #include <DB/Core/Types.h>
@ -57,45 +56,87 @@ public:
virtual ~ReadBuffer() {} virtual ~ReadBuffer() {}
/// Функции для чтения конкретных данных
inline bool eof() inline bool eof()
{ {
return pos == working_buffer.end() && !next(); return pos == working_buffer.end() && !next();
} }
/// Reads one character into x and steps past it.
/// x is set to 0 when eof() reports that no more data is available.
void readChar(char & x)
{
    x = 0;
    if (eof())
        return;

    x = *pos;
    ++pos;
}
void ignore() void ignore()
{ {
if (!eof()) if (!eof())
++pos; ++pos;
} }
size_t read(char * to, size_t n)
{
size_t bytes_copied = 0;
/// грубо while (!eof() && bytes_copied < n)
template <typename T>
void readIntText(T & x)
{ {
size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
std::memcpy(to, pos, bytes_to_copy);
pos += bytes_to_copy;
}
return bytes_copied;
}
protected:
char internal_buffer[DEFAULT_READ_BUFFER_SIZE];
Buffer working_buffer;
Position pos;
};
/// Helper functions for formatted reading
/// Maps the character that follows a backslash to the character it denotes.
/// Recognizes b, f, n, r, t and 0; every other character stands for itself.
static inline char parseEscapeSequence(char c)
{
    if (c == 'b')
        return '\b';
    if (c == 'f')
        return '\f';
    if (c == 'n')
        return '\n';
    if (c == 'r')
        return '\r';
    if (c == 't')
        return '\t';
    if (c == '0')
        return '\0';

    return c;
}
/** Consumes the literal s from buf, comparing one character at a time.
  * Throws Exception with code CANNOT_PARSE_INPUT_ASSERTION_FAILED if the input ends
  * or differs from s. Characters matched before the mismatch stay consumed.
  */
static void assertString(const char * s, ReadBuffer & buf)
{
    /// Remember the start of the literal: s is advanced below, and the error
    /// should name the whole expected token rather than only its unmatched tail.
    const char * const expected = s;

    for (; *s; ++s)
    {
        if (buf.eof() || *buf.position() != *s)
            throw Exception(String("Cannot parse input: expected ") + expected, ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
        ++buf.position();
    }
}
/// грубо
template <typename T>
void readIntText(T & x, ReadBuffer & buf)
{
bool negative = false;
x = 0; x = 0;
while (!eof()) while (!buf.eof())
{ {
switch (*pos) switch (*buf.position())
{ {
case '+': case '+':
break; break;
case '-': case '-':
x = -x; negative = true;
break; break;
case '0': case '0':
case '1': case '1':
@ -108,31 +149,36 @@ public:
case '8': case '8':
case '9': case '9':
x *= 10; x *= 10;
x += *pos - '0'; x += *buf.position() - '0';
break; break;
default: default:
if (negative)
x = -x;
return; return;
} }
++pos; ++buf.position();
}
} }
if (negative)
x = -x;
}
/// грубо; поддерживается только простой формат /// грубо
template <typename T> template <typename T>
void readFloatText(T & x) void readFloatText(T & x, ReadBuffer & buf)
{ {
bool negative = false;
x = 0; x = 0;
bool after_point = false; bool after_point = false;
double power_of_ten = 1; double power_of_ten = 1;
while (!eof()) while (!buf.eof())
{ {
switch (*pos) switch (*buf.position())
{ {
case '+': case '+':
break; break;
case '-': case '-':
x = -x; negative = true;
break; break;
case '.': case '.':
after_point = true; after_point = true;
@ -150,149 +196,162 @@ public:
if (after_point) if (after_point)
{ {
power_of_ten /= 10; power_of_ten /= 10;
x += (*pos - '0') * power_of_ten; x += (*buf.position() - '0') * power_of_ten;
} }
else else
{ {
x *= 10; x *= 10;
x += *pos - '0'; x += *buf.position() - '0';
} }
break; break;
case 'e':
case 'E':
{
++buf.position();
Int32 exponent = 0;
readIntText(exponent, buf);
if (exponent == 0)
{
if (negative)
x = -x;
return;
}
else if (exponent > 0)
{
for (Int32 i = 0; i < exponent; ++i)
x *= 10;
if (negative)
x = -x;
return;
}
else
{
for (Int32 i = 0; i < exponent; ++i)
x /= 10;
if (negative)
x = -x;
return;
}
}
case 'i':
++buf.position();
assertString("nf", buf);
x = std::numeric_limits<T>::infinity();
if (negative)
x = -x;
return;
case 'I':
++buf.position();
assertString("NF", buf);
x = std::numeric_limits<T>::infinity();
if (negative)
x = -x;
return;
case 'n':
++buf.position();
assertString("an", buf);
x = std::numeric_limits<T>::quiet_NaN();
return;
case 'N':
++buf.position();
assertString("AN", buf);
x = std::numeric_limits<T>::quiet_NaN();
return;
default: default:
if (negative)
x = -x;
return; return;
} }
++pos; ++buf.position();
}
} }
if (negative)
x = -x;
}
/// грубо; всё до '\n' или '\t' /// грубо; всё до '\n' или '\t'
void readString(String & s) void readString(String & s, ReadBuffer & buf)
{ {
s = ""; s = "";
while (!eof()) while (!buf.eof())
{ {
size_t bytes = 0; size_t bytes = 0;
for (; pos + bytes != working_buffer.end(); ++bytes) for (; buf.position() + bytes != buf.buffer().end(); ++bytes)
if (pos[bytes] == '\t' || pos[bytes] == '\n') if (buf.position()[bytes] == '\t' || buf.position()[bytes] == '\n')
break; break;
s.append(pos, bytes); s.append(buf.position(), bytes);
pos += bytes; buf.position() += bytes;
if (pos != working_buffer.end()) if (buf.position() != buf.buffer().end())
return; return;
} }
} }
void readEscapedString(String & s) void readEscapedString(String & s, ReadBuffer & buf)
{ {
s = ""; s = "";
while (!eof()) while (!buf.eof())
{ {
size_t bytes = 0; size_t bytes = 0;
for (; pos + bytes != working_buffer.end(); ++bytes) for (; buf.position() + bytes != buf.buffer().end(); ++bytes)
if (pos[bytes] == '\\' || pos[bytes] == '\t' || pos[bytes] == '\n') if (buf.position()[bytes] == '\\' || buf.position()[bytes] == '\t' || buf.position()[bytes] == '\n')
break; break;
s.append(pos, bytes); s.append(buf.position(), bytes);
pos += bytes; buf.position() += bytes;
if (*pos == '\t' || *pos == '\n') if (*buf.position() == '\t' || *buf.position() == '\n')
return; return;
if (*pos == '\\') if (*buf.position() == '\\')
{ {
++pos; ++buf.position();
if (eof()) if (buf.eof())
throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE); throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
s += parseEscapeSequence(*pos); s += parseEscapeSequence(*buf.position());
++pos; ++buf.position();
}
} }
} }
}
void readQuotedString(String & s) void readQuotedString(String & s, ReadBuffer & buf)
{ {
s = ""; s = "";
if (eof() || *pos != '\'') if (buf.eof() || *buf.position() != '\'')
throw Exception("Cannot parse quoted string: expected opening single quote", throw Exception("Cannot parse quoted string: expected opening single quote",
ErrorCodes::CANNOT_PARSE_QUOTED_STRING); ErrorCodes::CANNOT_PARSE_QUOTED_STRING);
++pos; ++buf.position();
while (!eof()) while (!buf.eof())
{ {
size_t bytes = 0; size_t bytes = 0;
for (; pos + bytes != working_buffer.end(); ++bytes) for (; buf.position() + bytes != buf.buffer().end(); ++bytes)
if (pos[bytes] == '\\' || pos[bytes] == '\'') if (buf.position()[bytes] == '\\' || buf.position()[bytes] == '\'')
break; break;
s.append(pos, bytes); s.append(buf.position(), bytes);
pos += bytes; buf.position() += bytes;
if (*pos == '\'') if (*buf.position() == '\'')
{ {
++pos; ++buf.position();
return; return;
} }
if (*pos == '\\') if (*buf.position() == '\\')
{ {
++pos; ++buf.position();
if (eof()) if (buf.eof())
throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE); throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
s += parseEscapeSequence(*pos); s += parseEscapeSequence(*buf.position());
++pos; ++buf.position();
} }
} }
throw Exception("Cannot parse quoted string: expected closing single quote", throw Exception("Cannot parse quoted string: expected closing single quote",
ErrorCodes::CANNOT_PARSE_QUOTED_STRING); ErrorCodes::CANNOT_PARSE_QUOTED_STRING);
} }
/** Copies up to n bytes from the buffer into to, refilling through eof() as needed.
  * Returns the number of bytes actually copied, which is less than n
  * only when eof() reports that the data ran out first.
  */
size_t read(char * to, size_t n)
{
    size_t bytes_copied = 0;

    /// Test the count first so that no refill is attempted once n bytes have been delivered.
    while (bytes_copied < n && !eof())
    {
        size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
        /// Append after what was already copied instead of overwriting the start of `to`.
        memcpy(to + bytes_copied, pos, bytes_to_copy);
        pos += bytes_to_copy;
        bytes_copied += bytes_to_copy;
    }

    return bytes_copied;
}
protected:
char internal_buffer[DEFAULT_READ_BUFFER_SIZE];
Buffer working_buffer;
Position pos;
private:
/// Maps the character that follows a backslash to the character it denotes.
/// Recognizes b, f, n, r, t and 0; every other character stands for itself.
inline char parseEscapeSequence(char c)
{
    switch (c)
    {
        case 'b':
            return '\b';
        case 'f':
            return '\f';
        case 'n':
            return '\n';
        case 'r':
            return '\r';
        case 't':
            return '\t';
        case '0':
            /// The writer side emits NUL as "\0"; decode it back so the byte round-trips.
            return '\0';
        default:
            return c;
    }
}
};
} }

View File

@ -0,0 +1,198 @@
#ifndef DBMS_COMMON_WRITEBUFFER_H
#define DBMS_COMMON_WRITEBUFFER_H
#include <cstring>
#include <cstdio>
#include <limits>
#include <algorithm>
#include <DB/Core/Types.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#define DEFAULT_WRITE_BUFFER_SIZE 1048576
#define DEFAULT_FLOAT_PRECISION 6
/// 20 digits, a sign, and the terminating \0
#define MAX_INT_WIDTH 22
namespace DB
{
/** A simple base class for buffered writing of data (a sequence of char) to some destination.
  * Unlike std::ostream, it gives access to the internal buffer
  * and lets the caller move the position inside the buffer by hand.
  *
  * Derived classes should implement next().
  *
  * It also comes with a set of functions for formatted and unformatted output
  * (with a simple, crude implementation).
  */
class WriteBuffer
{
public:
    typedef char * Position;

    /// A range of writable memory, from begin() up to but not including end().
    struct Buffer
    {
        Buffer(Position begin_pos_, Position end_pos_) : begin_pos(begin_pos_), end_pos(end_pos_) {}

        inline Position begin() { return begin_pos; }
        inline Position end() { return end_pos; }

    private:
        Position begin_pos;
        Position end_pos;   /// one byte past the end of the buffer
    };

    WriteBuffer() : working_buffer(internal_buffer, internal_buffer + DEFAULT_WRITE_BUFFER_SIZE), pos(internal_buffer) {}

    /// The part of the buffer that data can be written into.
    inline Buffer & buffer() { return working_buffer; }

    /// The current position in the buffer, for reading and for modification.
    inline Position & position() { return pos; }

    /** Write out the data held in the buffer (from the beginning of the buffer to the current position);
      * move the position back to the beginning; throw an exception if something goes wrong.
      *
      * The base implementation has no destination, so it only rewinds the position and the
      * buffered bytes are dropped. Rewinding is still required: nextIfAtEnd(), write() and
      * writeChar() rely on next() making room, and without it write() would loop forever
      * and writeChar() would store past the end of the buffer.
      */
    virtual void next() { pos = working_buffer.begin(); }

    virtual ~WriteBuffer() {}

    /// Calls next() when the position has reached the end of the buffer.
    inline void nextIfAtEnd()
    {
        if (pos == working_buffer.end())
            next();
    }

    /// Copies n bytes starting at `from` into the buffer, calling next() each time the buffer fills up.
    void write(const char * from, size_t n)
    {
        size_t bytes_copied = 0;

        while (bytes_copied < n)
        {
            nextIfAtEnd();
            size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
            std::memcpy(pos, from + bytes_copied, bytes_to_copy);
            pos += bytes_to_copy;
            bytes_copied += bytes_to_copy;
        }
    }

protected:
    char internal_buffer[DEFAULT_WRITE_BUFFER_SIZE];
    Buffer working_buffer;
    Position pos;
};
/// Helper functions for formatted output

/// Stores one character at the current position of buf and advances the position,
/// calling nextIfAtEnd() first so that there is room for it.
/// Declared inline because the definition lives in a header: a non-inline definition
/// would produce duplicate symbols as soon as two translation units include it.
inline void writeChar(char x, WriteBuffer & buf)
{
    buf.nextIfAtEnd();
    *buf.position() = x;
    ++buf.position();
}
/** printf conversion used by writeIntText for the integer type T.
  * The conversion expects the value widened to (unsigned) long long, see IntPrintType.
  * Defined once for the primary template rather than through per-type explicit
  * specializations: a specialized static data member defined in a header is an ordinary
  * definition and would be duplicated in every translation unit that includes it.
  */
template <typename T> struct IntFormat { static const char * format; };
template <typename T> const char * IntFormat<T>::format = std::numeric_limits<T>::is_signed ? "%lld" : "%llu";

/// The type a value is widened to before printing: long long for signed types, unsigned long long otherwise.
template <bool is_signed> struct IntPrintType { typedef unsigned long long Type; };
template <> struct IntPrintType<true> { typedef long long Type; };

/** Writes the decimal text of x to buf. Crude implementation based on snprintf.
  * Throws Exception with code CANNOT_PRINT_INTEGER if snprintf fails
  * or the text does not fit into MAX_INT_WIDTH bytes.
  */
template <typename T>
void writeIntText(T x, WriteBuffer & buf)
{
    typedef typename IntPrintType<std::numeric_limits<T>::is_signed>::Type Wide;

    char tmp[MAX_INT_WIDTH];
    /// Widening makes the argument match the conversion exactly, whatever T is, and keeps
    /// unsigned values unsigned (a signed conversion would print large values as negative).
    int res = std::snprintf(tmp, MAX_INT_WIDTH, IntFormat<T>::format, static_cast<Wide>(x));

    if (res >= MAX_INT_WIDTH || res <= 0)
        throw Exception("Cannot print integer", ErrorCodes::CANNOT_PRINT_INTEGER);

    /// snprintf returns the length without the terminating \0, so all res characters belong to the number.
    buf.write(tmp, res);
}
/** Writes x to buf in "%g" notation with the given number of significant digits.
  * Throws Exception with code CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER if snprintf fails
  * or the text does not fit into the temporary buffer.
  */
template <typename T>
void writeFloatText(T x, WriteBuffer & buf, unsigned precision = DEFAULT_FLOAT_PRECISION)
{
    unsigned size = precision + 10;
    /// Room for the digits plus "+0.0e+123\0".
    /// NOTE(review): a runtime-sized array is a compiler extension in C++, kept from the original.
    char tmp[size];

    /// The '*' precision must be an int, and "%g" takes a double; cast both explicitly.
    int res = std::snprintf(tmp, size, "%.*g", static_cast<int>(precision), static_cast<double>(x));

    if (res >= static_cast<int>(size) || res <= 0)
        throw Exception("Cannot print float or double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);

    /// snprintf returns the length without the terminating \0, so all res characters belong to the number.
    buf.write(tmp, res);
}
/// Writes the bytes of s to buf unchanged, with no escaping or delimiters.
/// Declared inline because the definition lives in a header (avoids duplicate symbols across translation units).
inline void writeString(const String & s, WriteBuffer & buf)
{
    buf.write(s.data(), s.size());
}
/** Writes s to buf with backslash escapes: \b, \f, \n, \r, \t, \0, \' and \\
  * replace the corresponding characters; all other characters are written unchanged.
  * Declared inline because the definition lives in a header (avoids duplicate symbols
  * across translation units).
  */
inline void writeEscapedString(const String & s, WriteBuffer & buf)
{
    for (String::const_iterator it = s.begin(); it != s.end(); ++it)
    {
        /// The character to put after the backslash, or 0 if *it needs no escaping.
        char escape = 0;

        switch (*it)
        {
            case '\b':
                escape = 'b';
                break;
            case '\f':
                escape = 'f';
                break;
            case '\n':
                escape = 'n';
                break;
            case '\r':
                escape = 'r';
                break;
            case '\t':
                escape = 't';
                break;
            case '\0':
                escape = '0';
                break;
            case '\'':
                escape = '\'';
                break;
            case '\\':
                escape = '\\';
                break;
            default:
                break;
        }

        if (escape)
        {
            writeChar('\\', buf);
            writeChar(escape, buf);
        }
        else
            writeChar(*it, buf);
    }
}
/// Writes s to buf enclosed in single quotes, with the contents escaped by writeEscapedString.
/// Declared inline because the definition lives in a header (avoids duplicate symbols across translation units).
inline void writeQuotedString(const String & s, WriteBuffer & buf)
{
    writeChar('\'', buf);
    writeEscapedString(s, buf);
    writeChar('\'', buf);
}
}
#endif

View File

@ -0,0 +1,40 @@
#ifndef DBMS_COMMON_WRITEBUFFER_FROM_OSTREAM_H
#define DBMS_COMMON_WRITEBUFFER_FROM_OSTREAM_H
#include <iostream>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/WriteBuffer.h>
namespace DB
{
/** A WriteBuffer that sends its contents to a std::ostream.
  * The stream is held by reference, so it has to outlive this object.
  */
class WriteBufferFromOStream : public WriteBuffer
{
private:
    std::ostream & ostr;

public:
    WriteBufferFromOStream(std::ostream & ostr_) : ostr(ostr_) {}

    /// Writes the bytes between the start of the internal buffer and the current position
    /// to the stream and rewinds the position. Throws Exception with code
    /// CANNOT_WRITE_TO_OSTREAM if the stream is not in a good state afterwards.
    void next()
    {
        ostr.write(internal_buffer, pos - internal_buffer);
        pos = internal_buffer;

        if (!ostr.good())
            throw Exception("Cannot write to ostream", ErrorCodes::CANNOT_WRITE_TO_OSTREAM);
    }

    /// Writes out whatever is still buffered.
    /// An exception must not escape a destructor (it would terminate the program when thrown
    /// during stack unwinding), so a failure here is swallowed. Call next() explicitly
    /// before destruction if a write error has to be detected.
    ~WriteBufferFromOStream()
    {
        try
        {
            next();
        }
        catch (...)
        {
        }
    }
};
}
#endif

View File

@ -18,16 +18,16 @@ int main(int argc, char ** argv)
DB::Float64 b; DB::Float64 b;
DB::String c, d; DB::String c, d;
in.readIntText(a); DB::readIntText(a, in);
in.ignore(); in.ignore();
in.readFloatText(b); DB::readFloatText(b, in);
in.ignore(); in.ignore();
in.readEscapedString(c); DB::readEscapedString(c, in);
in.ignore(); in.ignore();
in.readQuotedString(d); DB::readQuotedString(d, in);
std::cout << a << ' ' << b << ' ' << c << '\t' << '\'' << d << '\'' << std::endl; std::cout << a << ' ' << b << ' ' << c << '\t' << '\'' << d << '\'' << std::endl;
} }

View File

@ -20,16 +20,16 @@ int main(int argc, char ** argv)
size_t i = 0; size_t i = 0;
while (!in.eof()) while (!in.eof())
{ {
in.readIntText(a); DB::readIntText(a, in);
in.ignore(); in.ignore();
in.readFloatText(b); DB::readFloatText(b, in);
in.ignore(); in.ignore();
in.readEscapedString(c); DB::readEscapedString(c, in);
in.ignore(); in.ignore();
in.readQuotedString(d); DB::readQuotedString(d, in);
in.ignore(); in.ignore();
++i; ++i;

View File

@ -0,0 +1,45 @@
#include <string>
#include <iostream>
#include <sstream>
#include <DB/Core/WriteBufferFromOStream.h>
int main(int argc, char ** argv)
{
try
{
DB::Int64 a = -123456;
DB::Float64 b = 123.456;
DB::String c = "вася пе\tтя";
DB::String d = "'xyz\\";
std::stringstream s;
{
DB::WriteBufferFromOStream out(s);
DB::writeIntText(a, out);
DB::writeChar(' ', out);
DB::writeFloatText(b, out);
DB::writeChar(' ', out);
DB::writeEscapedString(c, out);
DB::writeChar('\t', out);
DB::writeQuotedString(d, out);
DB::writeChar('\n', out);
}
std::cout << s.str();
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.message() << std::endl;
return 1;
}
return 0;
}

View File

@ -0,0 +1,43 @@
#include <string>
#include <iostream>
#include <fstream>
#include <DB/Core/WriteBufferFromOStream.h>
int main(int argc, char ** argv)
{
try
{
DB::Int64 a = -123456;
DB::Float64 b = 123.456;
DB::String c = "вася пе\tтя";
DB::String d = "'xyz\\";
std::ofstream s("test");
DB::WriteBufferFromOStream out(s);
for (int i = 0; i < 1000000; ++i)
{
DB::writeIntText(a, out);
DB::writeChar(' ', out);
DB::writeFloatText(b, out);
DB::writeChar(' ', out);
DB::writeEscapedString(c, out);
DB::writeChar('\t', out);
DB::writeQuotedString(d, out);
DB::writeChar('\n', out);
}
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.message() << std::endl;
return 1;
}
return 0;
}