dbms: development

This commit is contained in:
Alexey Milovidov 2010-06-04 18:25:25 +00:00
parent 7a360b0cd6
commit e0f951245a
50 changed files with 814 additions and 580 deletions

View File

@ -1,7 +0,0 @@
#ifndef DBMS_COMMON_ESCAPE_MANIPULATORS_H
#define DBMS_COMMON_ESCAPE_MANIPULATORS_H
#include <strconvert/escape_manip.h>
#include <strconvert/unescape_manip.h>
#endif

View File

@ -1,49 +0,0 @@
#ifndef DBMS_COMMON_ESCAPING_OUTPUT_STREAM_H
#define DBMS_COMMON_ESCAPING_OUTPUT_STREAM_H
#include <ostream>
#include <Poco/UnbufferedStreamBuf.h>
namespace DB
{
/** Поток, который эскейпит всё, что в него пишут.
*/
class EscapingStreamBuf : public Poco::UnbufferedStreamBuf
{
public:
EscapingStreamBuf(std::ostream & ostr);
protected:
int writeToDevice(char c);
private:
std::ostream * p_ostr;
};
class EscapingIOS : public virtual std::ios
{
public:
EscapingIOS(std::ostream & ostr);
EscapingStreamBuf * rdbuf();
protected:
EscapingStreamBuf buf;
};
class EscapingOutputStream : public EscapingIOS, public std::ostream
{
public:
EscapingOutputStream(std::ostream & ostr);
};
}
#endif

View File

@ -1,7 +0,0 @@
#ifndef DBMS_COMMON_QUOTE_MANIPULATORS_H
#define DBMS_COMMON_QUOTE_MANIPULATORS_H
#include <strconvert/escape_manip.h>
#include <strconvert/unescape_manip.h>
#endif

View File

@ -1,49 +0,0 @@
#ifndef DBMS_COMMON_UNESCAPING_INPUT_STREAM_H
#define DBMS_COMMON_UNESCAPING_INPUT_STREAM_H
#include <istream>
#include <Poco/UnbufferedStreamBuf.h>
namespace DB
{
/** Поток, который unescape-ит всё, что из него читают.
*/
class UnescapingStreamBuf : public Poco::UnbufferedStreamBuf
{
public:
UnescapingStreamBuf(std::istream & istr, char delimiter_);
protected:
int readFromDevice();
private:
std::istream * p_istr;
char delimiter;
};
class UnescapingIOS : public virtual std::ios
{
public:
UnescapingIOS(std::istream & istr, char delimiter_);
UnescapingStreamBuf * rdbuf();
protected:
UnescapingStreamBuf buf;
};
class UnescapingInputStream : public UnescapingIOS, public std::istream
{
public:
UnescapingInputStream(std::istream & istr, char delimiter_);
};
}
#endif

View File

@ -12,7 +12,7 @@ namespace ErrorCodes
UNSUPPORTED_METHOD,
UNSUPPORTED_PARAMETER,
UNEXPECTED_END_OF_FILE,
CANNOT_READ_DATA_FROM_ISTREAM,
CANNOT_READ_DATA_FROM_READ_BUFFER,
CANNOT_PARSE_TEXT,
INCORRECT_NUMBER_OF_COLUMNS,
THERE_IS_NO_COLUMN,
@ -38,6 +38,10 @@ namespace ErrorCodes
CANNOT_PARSE_INPUT_ASSERTION_FAILED,
CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER,
CANNOT_PRINT_INTEGER,
CANNOT_READ_SIZE_OF_COMPRESSED_CHUNK,
CANNOT_READ_COMPRESSED_CHUNK,
ATTEMPT_TO_READ_AFTER_EOF,
CANNOT_READ_ALL_DATA,
};
}

View File

@ -1,10 +1,9 @@
#ifndef DBMS_DATA_STREAMS_TABSEPARATEDROWINPUTSTREAM_H
#define DBMS_DATA_STREAMS_TABSEPARATEDROWINPUTSTREAM_H
#include <ostream>
#include <Poco/SharedPtr.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/DataTypes/DataTypes.h>
#include <DB/DataStreams/IRowInputStream.h>
@ -20,12 +19,12 @@ using Poco::SharedPtr;
class TabSeparatedRowInputStream : public IRowInputStream
{
public:
TabSeparatedRowInputStream(std::istream & istr_, SharedPtr<DataTypes> data_types_);
TabSeparatedRowInputStream(ReadBuffer & istr_, SharedPtr<DataTypes> data_types_);
Row read();
private:
std::istream & istr;
ReadBuffer & istr;
SharedPtr<DataTypes> data_types;
};

View File

@ -1,10 +1,9 @@
#ifndef DBMS_DATA_STREAMS_TABSEPARATEDROWOUTPUTSTREAM_H
#define DBMS_DATA_STREAMS_TABSEPARATEDROWOUTPUTSTREAM_H
#include <ostream>
#include <Poco/SharedPtr.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/DataTypes/DataTypes.h>
#include <DB/DataStreams/IRowOutputStream.h>
@ -20,14 +19,14 @@ using Poco::SharedPtr;
class TabSeparatedRowOutputStream : public IRowOutputStream
{
public:
TabSeparatedRowOutputStream(std::ostream & ostr_, SharedPtr<DataTypes> data_types_);
TabSeparatedRowOutputStream(WriteBuffer & ostr_, SharedPtr<DataTypes> data_types_);
void writeField(const Field & field);
void writeFieldDelimiter();
void writeRowEndDelimiter();
private:
std::ostream & ostr;
WriteBuffer & ostr;
SharedPtr<DataTypes> data_types;
size_t field_number;
};

View File

@ -5,6 +5,10 @@
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/DataTypes/IDataTypeNumberFixed.h>
@ -23,50 +27,49 @@ public:
std::string getName() const { return "Date"; }
SharedPtr<IDataType> clone() const { return new DataTypeDate; }
void serializeText(const Field & field, std::ostream & ostr) const
void serializeText(const Field & field, WriteBuffer & ostr) const
{
DateLUT::Values & values = date_lut.getValues(boost::get<UInt16>(field));
ostr << values.year << '-' << values.month << '-' << values.day_of_month;
writeIntText(values.year, ostr);
writeChar('-', ostr);
writeIntText(values.month, ostr);
writeChar('-', ostr);
writeIntText(values.day_of_month, ostr);
}
void deserializeText(Field & field, std::istream & istr) const
void deserializeText(Field & field, ReadBuffer & istr) const
{
std::string s;
istr >> s;
readString(s, istr);
// TODO: тормоза
int time_zone_diff = 0;
field = date_lut.toDayNum(Poco::DateTimeParser::parse(
s, time_zone_diff).timestamp().epochTime());
}
void serializeTextEscaped(const Field & field, std::ostream & ostr) const
void serializeTextEscaped(const Field & field, WriteBuffer & ostr) const
{
serializeText(field, ostr);
}
void deserializeTextEscaped(Field & field, std::istream & istr) const
void deserializeTextEscaped(Field & field, ReadBuffer & istr) const
{
deserializeText(field, istr);
}
void serializeTextQuoted(const Field & field, std::ostream & ostr, bool compatible = false) const
void serializeTextQuoted(const Field & field, WriteBuffer & ostr, bool compatible = false) const
{
ostr << '\'';
writeChar('\'', ostr);
serializeText(field, ostr);
ostr << '\'';
writeChar('\'', ostr);
}
void deserializeTextQuoted(Field & field, std::istream & istr, bool compatible = false) const
void deserializeTextQuoted(Field & field, ReadBuffer & istr, bool compatible = false) const
{
char delim = istr.peek();
if (delim == '\'' || delim == '"')
istr.ignore();
assertString("'", istr);
deserializeText(field, istr);
if (istr.get() != delim)
throw Exception("Delimiter in string literal doesn't match",
ErrorCodes::DELIMITER_IN_STRING_LITERAL_DOESNT_MATCH);
assertString("'", istr);
}
};

View File

@ -27,19 +27,19 @@ public:
return new DataTypeString;
}
void serializeBinary(const Field & field, std::ostream & ostr) const;
void deserializeBinary(Field & field, std::istream & istr) const;
void serializeBinary(const IColumn & column, std::ostream & ostr) const;
void deserializeBinary(IColumn & column, std::istream & istr, size_t limit) const;
void serializeBinary(const Field & field, WriteBuffer & ostr) const;
void deserializeBinary(Field & field, ReadBuffer & istr) const;
void serializeBinary(const IColumn & column, WriteBuffer & ostr) const;
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const;
void serializeText(const Field & field, std::ostream & ostr) const;
void deserializeText(Field & field, std::istream & istr) const;
void serializeText(const Field & field, WriteBuffer & ostr) const;
void deserializeText(Field & field, ReadBuffer & istr) const;
void serializeTextEscaped(const Field & field, std::ostream & ostr) const;
void deserializeTextEscaped(Field & field, std::istream & istr) const;
void serializeTextEscaped(const Field & field, WriteBuffer & ostr) const;
void deserializeTextEscaped(Field & field, ReadBuffer & istr) const;
void serializeTextQuoted(const Field & field, std::ostream & ostr, bool compatible = false) const;
void deserializeTextQuoted(Field & field, std::istream & istr, bool compatible = false) const;
void serializeTextQuoted(const Field & field, WriteBuffer & ostr, bool compatible = false) const;
void deserializeTextQuoted(Field & field, ReadBuffer & istr, bool compatible = false) const;
SharedPtr<IColumn> createColumn() const;
};

View File

@ -1,11 +1,11 @@
#ifndef DBMS_DATA_TYPES_IDATATYPE_H
#define DBMS_DATA_TYPES_IDATATYPE_H
#include <ostream>
#include <Poco/SharedPtr.h>
#include <DB/Core/Field.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/Columns/IColumn.h>
@ -33,29 +33,29 @@ public:
* Обратите внимание, что присутствует по два вида методов
* - для работы с единичными значениями и целыми столбцами.
*/
virtual void serializeBinary(const Field & field, std::ostream & ostr) const = 0;
virtual void deserializeBinary(Field & field, std::istream & istr) const = 0;
virtual void serializeBinary(const IColumn & column, std::ostream & ostr) const = 0;
virtual void serializeBinary(const Field & field, WriteBuffer & ostr) const = 0;
virtual void deserializeBinary(Field & field, ReadBuffer & istr) const = 0;
virtual void serializeBinary(const IColumn & column, WriteBuffer & ostr) const = 0;
/** Считать не более limit значений. */
virtual void deserializeBinary(IColumn & column, std::istream & istr, size_t limit) const = 0;
virtual void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const = 0;
/** Текстовая сериализация - для вывода на экран / сохранения в текстовый файл и т. п.
* Без эскейпинга и квотирования.
*/
virtual void serializeText(const Field & field, std::ostream & ostr) const = 0;
virtual void deserializeText(Field & field, std::istream & istr) const = 0;
virtual void serializeText(const Field & field, WriteBuffer & ostr) const = 0;
virtual void deserializeText(Field & field, ReadBuffer & istr) const = 0;
/** Текстовая сериализация с эскейпингом, но без квотирования.
*/
virtual void serializeTextEscaped(const Field & field, std::ostream & ostr) const = 0;
virtual void deserializeTextEscaped(Field & field, std::istream & istr) const = 0;
virtual void serializeTextEscaped(const Field & field, WriteBuffer & ostr) const = 0;
virtual void deserializeTextEscaped(Field & field, ReadBuffer & istr) const = 0;
/** Текстовая сериализация в виде литерала, который может быть вставлен в запрос.
* Если compatible = true, то значение типа "массив" и "кортеж" ещё дополнительно записывается в кавычки,
* чтобы текстовый дамп можно было загрузить в другую СУБД с этими значениями в виде строки.
*/
virtual void serializeTextQuoted(const Field & field, std::ostream & ostr, bool compatible = false) const = 0;
virtual void deserializeTextQuoted(Field & field, std::istream & istr, bool compatible = false) const = 0;
virtual void serializeTextQuoted(const Field & field, WriteBuffer & ostr, bool compatible = false) const = 0;
virtual void deserializeTextQuoted(Field & field, ReadBuffer & istr, bool compatible = false) const = 0;
/** Создать пустой столбец соответствующего типа.
*/

View File

@ -3,6 +3,9 @@
#include <DB/DataTypes/IDataType.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
namespace DB
{
@ -15,34 +18,34 @@ template <typename FieldType>
class IDataTypeNumber : public IDataType
{
public:
void serializeText(const Field & field, std::ostream & ostr) const
void serializeText(const Field & field, WriteBuffer & ostr) const
{
ostr << boost::get<typename NearestFieldType<FieldType>::Type>(field);
writeIntText(boost::get<typename NearestFieldType<FieldType>::Type>(field), ostr);
}
void deserializeText(Field & field, std::istream & istr) const
void deserializeText(Field & field, ReadBuffer & istr) const
{
typename NearestFieldType<FieldType>::Type x;
istr >> x;
readIntText(x, istr);
field = x;
}
void serializeTextEscaped(const Field & field, std::ostream & ostr) const
void serializeTextEscaped(const Field & field, WriteBuffer & ostr) const
{
serializeText(field, ostr);
}
void deserializeTextEscaped(Field & field, std::istream & istr) const
void deserializeTextEscaped(Field & field, ReadBuffer & istr) const
{
deserializeText(field, istr);
}
void serializeTextQuoted(const Field & field, std::ostream & ostr, bool compatible = false) const
void serializeTextQuoted(const Field & field, WriteBuffer & ostr, bool compatible = false) const
{
serializeText(field, ostr);
}
void deserializeTextQuoted(Field & field, std::istream & istr, bool compatible = false) const
void deserializeTextQuoted(Field & field, ReadBuffer & istr, bool compatible = false) const
{
deserializeText(field, istr);
}

View File

@ -24,32 +24,32 @@ public:
/** Формат платформозависимый (зависит от представления данных в памяти).
*/
void serializeBinary(const Field & field, std::ostream & ostr) const
void serializeBinary(const Field & field, WriteBuffer & ostr) const
{
/// ColumnType::value_type - более узкий тип. Например, UInt8, когда тип Field - UInt64
typename ColumnType::value_type x = boost::get<FieldType>(field);
ostr.write(reinterpret_cast<const char *>(&x), sizeof(x));
}
void deserializeBinary(Field & field, std::istream & istr) const
void deserializeBinary(Field & field, ReadBuffer & istr) const
{
typename ColumnType::value_type x;
istr.read(reinterpret_cast<char *>(&x), sizeof(x));
field = typename NearestFieldType<FieldType>::Type(x);
}
void serializeBinary(const IColumn & column, std::ostream & ostr) const
void serializeBinary(const IColumn & column, WriteBuffer & ostr) const
{
const typename ColumnType::Container_t & x = dynamic_cast<const ColumnType &>(column).getData();
ostr.write(reinterpret_cast<const char *>(&x[0]), sizeof(typename ColumnType::value_type) * x.size());
}
void deserializeBinary(IColumn & column, std::istream & istr, size_t limit) const
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const
{
typename ColumnType::Container_t & x = dynamic_cast<ColumnType &>(column).getData();
x.resize(limit);
istr.read(reinterpret_cast<char*>(&x[0]), sizeof(typename ColumnType::value_type) * limit);
x.resize(istr.gcount() / sizeof(typename ColumnType::value_type));
size_t size = istr.read(reinterpret_cast<char*>(&x[0]), sizeof(typename ColumnType::value_type) * limit);
x.resize(size / sizeof(typename ColumnType::value_type));
}
SharedPtr<IColumn> createColumn() const

View File

@ -1,10 +1,11 @@
#ifndef DBMS_DATA_TYPES_IDATATYPE_NUMBER_VARIABLE_H
#define DBMS_DATA_TYPES_IDATATYPE_NUMBER_VARIABLE_H
#include <DB/Common/VarInt.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/VarInt.h>
#include <DB/DataTypes/IDataTypeNumber.h>
@ -23,17 +24,17 @@ template <typename FieldType, typename ColumnType>
class IDataTypeNumberVariable : public IDataTypeNumber<FieldType>
{
public:
void serializeBinary(const Field & field, std::ostream & ostr) const
void serializeBinary(const Field & field, WriteBuffer & ostr) const
{
writeVarT(static_cast<typename ColumnType::value_type>(boost::get<FieldType>(field)), ostr);
}
void deserializeBinary(Field & field, std::istream & istr) const
void deserializeBinary(Field & field, ReadBuffer & istr) const
{
readVarT(static_cast<typename ColumnType::value_type &>(boost::get<FieldType>(field)), istr);
readVarT(static_cast<typename ColumnType::value_type &>(boost::get<FieldType &>(field)), istr);
}
void serializeBinary(const IColumn & column, std::ostream & ostr) const
void serializeBinary(const IColumn & column, WriteBuffer & ostr) const
{
const typename ColumnType::Container_t & x = dynamic_cast<const ColumnType &>(column).getData();
size_t size = x.size();
@ -41,7 +42,7 @@ public:
writeVarT(x[i], ostr);
}
void deserializeBinary(IColumn & column, std::istream & istr, size_t limit) const
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const
{
typename ColumnType::Container_t & x = dynamic_cast<ColumnType &>(column).getData();
x.resize(limit);
@ -54,8 +55,6 @@ public:
x.resize(i);
break;
}
else if (!istr.good())
throw Exception("Cannot read data from istream", ErrorCodes::CANNOT_READ_DATA_FROM_ISTREAM);
}
}

View File

@ -9,7 +9,7 @@
#include <quicklz/quicklz_level1.h>
#include <DB/Common/CompressedStream.h>
#include <DB/IO/CompressedStream.h>
namespace DB

View File

@ -9,7 +9,7 @@
#include <quicklz/quicklz_level1.h>
#include <DB/Common/CompressedStream.h>
#include <DB/IO/CompressedStream.h>
namespace DB

View File

@ -0,0 +1,78 @@
#ifndef DBMS_COMMON_COMPRESSED_READBUFFER_H
#define DBMS_COMMON_COMPRESSED_READBUFFER_H
#include <algorithm>
#include <quicklz/quicklz_level1.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/CompressedStream.h>
namespace DB
{
class CompressedReadBuffer : public ReadBuffer
{
private:
ReadBuffer & in;
std::vector<char> compressed_buffer;
std::vector<char> decompressed_buffer;
char scratch[QLZ_SCRATCH_COMPRESS];
size_t pos_in_buffer;
public:
CompressedReadBuffer(ReadBuffer & in_)
: in(in_),
compressed_buffer(QUICKLZ_HEADER_SIZE),
pos_in_buffer(0)
{
}
/** Читает и разжимает следующий кусок сжатых данных. */
void readCompressedChunk()
{
size_t size = in.read(&compressed_buffer[0], QUICKLZ_HEADER_SIZE);
if (size != QUICKLZ_HEADER_SIZE)
throw Exception("Cannot read size of compressed chunk", ErrorCodes::CANNOT_READ_SIZE_OF_COMPRESSED_CHUNK);
size_t size_compressed = qlz_size_compressed(internal_buffer);
size_t size_decompressed = qlz_size_decompressed(internal_buffer);
compressed_buffer.resize(size_compressed);
decompressed_buffer.resize(size_decompressed);
size = in.read(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE);
if (size != size_compressed - QUICKLZ_HEADER_SIZE)
throw Exception("Cannot read compressed chunk", ErrorCodes::CANNOT_READ_COMPRESSED_CHUNK);
pos_in_buffer = 0;
}
bool next()
{
if (pos_in_buffer == decompressed_buffer.size())
{
if (in.eof())
return false;
readCompressedChunk();
}
size_t bytes_to_copy = std::min(decompressed_buffer.size() - pos_in_buffer, DEFAULT_READ_BUFFER_SIZE);
std::memcpy(internal_buffer, &decompressed_buffer[pos_in_buffer], bytes_to_copy);
pos = internal_buffer;
working_buffer = Buffer(internal_buffer, internal_buffer + bytes_to_copy);
return true;
}
};
}
#endif

View File

@ -0,0 +1,44 @@
#ifndef DBMS_COMMON_COMPRESSED_WRITEBUFFER_H
#define DBMS_COMMON_COMPRESSED_WRITEBUFFER_H
#include <quicklz/quicklz_level1.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/CompressedStream.h>
namespace DB
{
class CompressedWriteBuffer : public WriteBuffer
{
private:
WriteBuffer & out;
char compressed_buffer[DBMS_COMPRESSING_STREAM_BUFFER_SIZE + QUICKLZ_ADDITIONAL_SPACE];
char scratch[QLZ_SCRATCH_COMPRESS];
public:
CompressedWriteBuffer(WriteBuffer & out_) : out(out_) {}
void next()
{
size_t compressed_size = qlz_compress(
internal_buffer,
compressed_buffer,
pos - internal_buffer,
scratch);
out.write(compressed_buffer, compressed_size);
pos = internal_buffer;
}
~CompressedWriteBuffer()
{
next();
}
};
}
#endif

View File

@ -0,0 +1,106 @@
#ifndef DBMS_COMMON_READBUFFER_H
#define DBMS_COMMON_READBUFFER_H
#include <cstring>
#include <algorithm>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#define DEFAULT_READ_BUFFER_SIZE 1048576U
namespace DB
{
/** Простой абстрактный класс для буферизованного чтения данных (последовательности char) откуда-нибудь.
* В отличие от std::istream, предоставляет доступ к внутреннему буферу,
* а также позволяет вручную управлять позицией внутри буфера.
*
* Наследники должны реализовать метод next().
*/
class ReadBuffer
{
public:
typedef const char * Position;
struct Buffer
{
Buffer(Position begin_pos_, Position end_pos_) : begin_pos(begin_pos_), end_pos(end_pos_) {}
inline Position begin() { return begin_pos; }
inline Position end() { return end_pos; }
private:
Position begin_pos;
Position end_pos; /// на 1 байт после конца буфера
};
ReadBuffer() : working_buffer(internal_buffer, internal_buffer), pos(internal_buffer) {}
/// получить часть буфера, из которого можно читать данные
inline Buffer & buffer() { return working_buffer; }
/// получить (для чтения и изменения) позицию в буфере
inline Position & position() { return pos; };
/** прочитать следующие данные и заполнить ими буфер; переместить позицию в начало;
* вернуть false в случае конца, true иначе; кинуть исключение, если что-то не так
*/
virtual bool next() { return false; }
virtual ~ReadBuffer() {}
/** В отличие от std::istream, возвращает true, если все данные были прочитаны
* (а не в случае, если была попытка чтения после конца).
* Если на данный момент позиция находится на конце буфера, то вызывает метод next().
* То есть, имеет побочный эффект - если буфер закончился, то обновляет его и переносит позицию в начало.
*
* При попытке чтения после конца, следует кидать исключение.
*/
inline bool eof()
{
return pos == working_buffer.end() && !next();
}
void ignore()
{
if (!eof())
++pos;
else
throw Exception("Attempt to read after eof", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
}
/** Читает столько, сколько есть, не больше n байт. */
size_t read(char * to, size_t n)
{
size_t bytes_copied = 0;
while (!eof() && bytes_copied < n)
{
size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
std::memcpy(to, pos, bytes_to_copy);
pos += bytes_to_copy;
}
return bytes_copied;
}
/** Читает n байт, если есть меньше - кидает исключение. */
void readStrict(char * to, size_t n)
{
if (n != read(to, n))
throw Exception("Cannot read all data", ErrorCodes::CANNOT_READ_ALL_DATA);
}
protected:
char internal_buffer[DEFAULT_READ_BUFFER_SIZE];
Buffer working_buffer;
Position pos;
};
}
#endif

View File

@ -6,7 +6,7 @@
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/ReadBuffer.h>
#include <DB/IO/ReadBuffer.h>
namespace DB

View File

@ -1,5 +1,5 @@
#ifndef DBMS_COMMON_READBUFFER_H
#define DBMS_COMMON_READBUFFER_H
#ifndef DBMS_COMMON_READHELPERS_H
#define DBMS_COMMON_READHELPERS_H
#include <cstring>
#include <limits>
@ -9,87 +9,12 @@
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#define DEFAULT_READ_BUFFER_SIZE 1048576
#include <DB/IO/ReadBuffer.h>
namespace DB
{
/** Простой абстрактный класс для буферизованного чтения данных (последовательности char) откуда-нибудь.
* В отличие от std::istream, предоставляет доступ к внутреннему буферу,
* а также позволяет вручную управлять позицией внутри буфера.
*
* Наследники должны реализовать метод next().
*
* Также предоставляет набор функций для форматированного и неформатированного чтения.
* (с простой и грубой реализацией)
*/
class ReadBuffer
{
public:
typedef const char * Position;
struct Buffer
{
Buffer(Position begin_pos_, Position end_pos_) : begin_pos(begin_pos_), end_pos(end_pos_) {}
inline Position begin() { return begin_pos; }
inline Position end() { return end_pos; }
private:
Position begin_pos;
Position end_pos; /// на 1 байт после конца буфера
};
ReadBuffer() : working_buffer(internal_buffer, internal_buffer), pos(internal_buffer) {}
/// получить часть буфера, из которого можно читать данные
inline Buffer & buffer() { return working_buffer; }
/// получить (для чтения и изменения) позицию в буфере
inline Position & position() { return pos; };
/** прочитать следующие данные и заполнить ими буфер; переместить позицию в начало;
* вернуть false в случае конца, true иначе; кинуть исключение, если что-то не так
*/
virtual bool next() { return false; }
virtual ~ReadBuffer() {}
inline bool eof()
{
return pos == working_buffer.end() && !next();
}
void ignore()
{
if (!eof())
++pos;
}
size_t read(char * to, size_t n)
{
size_t bytes_copied = 0;
while (!eof() && bytes_copied < n)
{
size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
std::memcpy(to, pos, bytes_to_copy);
pos += bytes_to_copy;
}
return bytes_copied;
}
protected:
char internal_buffer[DEFAULT_READ_BUFFER_SIZE];
Buffer working_buffer;
Position pos;
};
/// Функции-помошники для форматированного чтения
static inline char parseEscapeSequence(char c)
@ -113,6 +38,23 @@ static inline char parseEscapeSequence(char c)
}
}
static inline void throwReadAfterEOF()
{
throw Exception("Attempt to read after eof", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
}
inline void readChar(char & x, ReadBuffer & buf)
{
if (!buf.eof())
{
x = *buf.position();
++buf.position();
}
else
throwReadAfterEOF();
}
void assertString(const char * s, ReadBuffer & buf);
/// грубо
@ -121,6 +63,9 @@ void readIntText(T & x, ReadBuffer & buf)
{
bool negative = false;
x = 0;
if (buf.eof())
throwReadAfterEOF();
while (!buf.eof())
{
switch (*buf.position())
@ -163,6 +108,9 @@ void readFloatText(T & x, ReadBuffer & buf)
bool after_point = false;
double power_of_ten = 1;
if (buf.eof())
throwReadAfterEOF();
while (!buf.eof())
{
switch (*buf.position())

View File

@ -2,8 +2,8 @@
#define DB_VARINT_H
#include <DB/Core/Types.h>
#include <DB/Core/ReadBuffer.h>
#include <DB/Core/WriteBuffer.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/WriteBuffer.h>
namespace DB

View File

@ -2,18 +2,9 @@
#define DBMS_COMMON_WRITEBUFFER_H
#include <cstring>
#include <cstdio>
#include <limits>
#include <algorithm>
#include <DB/Core/Types.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#define DEFAULT_WRITE_BUFFER_SIZE 1048576
#define DEFAULT_FLOAT_PRECISION 6
/// 20 цифр, знак, и \0 для конца строки
#define MAX_INT_WIDTH 22
#define DEFAULT_WRITE_BUFFER_SIZE 1048576U
namespace DB
@ -24,9 +15,6 @@ namespace DB
* а также позволяет вручную управлять позицией внутри буфера.
*
* Наследники должны реализовать метод next().
*
* Также предоставляет набор функций для форматированной и неформатированной записи.
* (с простой и грубой реализацией)
*/
class WriteBuffer
{
@ -58,6 +46,9 @@ public:
*/
virtual void next() {}
/** желательно в наследниках поместить в деструктор вызов next(),
* чтобы последние данные записались
*/
virtual ~WriteBuffer() {}
@ -89,59 +80,6 @@ protected:
};
/// Функции-помошники для форматированной записи
inline void writeChar(char x, WriteBuffer & buf)
{
buf.nextIfAtEnd();
*buf.position() = x;
++buf.position();
}
template <typename T> struct IntFormat { static const char * format; };
template <typename T>
void writeIntText(T x, WriteBuffer & buf)
{
char tmp[MAX_INT_WIDTH];
int res = std::snprintf(tmp, MAX_INT_WIDTH, IntFormat<T>::format, x);
if (res >= MAX_INT_WIDTH || res <= 0)
throw Exception("Cannot print integer", ErrorCodes::CANNOT_PRINT_INTEGER);
buf.write(tmp, res - 1);
}
template <typename T>
void writeFloatText(T x, WriteBuffer & buf, unsigned precision = DEFAULT_FLOAT_PRECISION)
{
unsigned size = precision + 10;
char tmp[size]; /// знаки, +0.0e+123\0
int res = std::snprintf(tmp, size, "%.*g", precision, x);
if (res >= static_cast<int>(size) || res <= 0)
throw Exception("Cannot print float or double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
buf.write(tmp, res - 1);
}
inline void writeString(const String & s, WriteBuffer & buf)
{
buf.write(s.data(), s.size());
}
/// предполагается, что строка в оперативке хранится непрерывно, и \0-terminated.
void writeEscapedString(const String & s, WriteBuffer & buf);
inline void writeQuotedString(const String & s, WriteBuffer & buf)
{
writeChar('\'', buf);
writeEscapedString(s, buf);
writeChar('\'', buf);
}
}
#endif

View File

@ -6,7 +6,7 @@
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/WriteBuffer.h>
#include <DB/IO/WriteBuffer.h>
namespace DB
@ -23,6 +23,7 @@ public:
void next()
{
ostr.write(internal_buffer, pos - internal_buffer);
ostr.flush();
pos = internal_buffer;
if (!ostr.good())

View File

@ -0,0 +1,78 @@
#ifndef DBMS_COMMON_WRITEHELPERS_H
#define DBMS_COMMON_WRITEHELPERS_H
#include <cstring>
#include <cstdio>
#include <limits>
#include <algorithm>
#include <DB/Core/Types.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/WriteBuffer.h>
#define WRITE_HELPERS_DEFAULT_FLOAT_PRECISION 6
/// 20 цифр, знак, и \0 для конца строки
#define WRITE_HELPERS_MAX_INT_WIDTH 22
namespace DB
{
/// Функции-помошники для форматированной записи
inline void writeChar(char x, WriteBuffer & buf)
{
buf.nextIfAtEnd();
*buf.position() = x;
++buf.position();
}
template <typename T> struct IntFormat { static const char * format; };
template <typename T>
void writeIntText(T x, WriteBuffer & buf)
{
char tmp[WRITE_HELPERS_MAX_INT_WIDTH];
int res = std::snprintf(tmp, WRITE_HELPERS_MAX_INT_WIDTH, IntFormat<T>::format, x);
if (res >= WRITE_HELPERS_MAX_INT_WIDTH || res <= 0)
throw Exception("Cannot print integer", ErrorCodes::CANNOT_PRINT_INTEGER);
buf.write(tmp, res - 1);
}
template <typename T>
void writeFloatText(T x, WriteBuffer & buf, unsigned precision = WRITE_HELPERS_DEFAULT_FLOAT_PRECISION)
{
unsigned size = precision + 10;
char tmp[size]; /// знаки, +0.0e+123\0
int res = std::snprintf(tmp, size, "%.*g", precision, x);
if (res >= static_cast<int>(size) || res <= 0)
throw Exception("Cannot print float or double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
buf.write(tmp, res - 1);
}
inline void writeString(const String & s, WriteBuffer & buf)
{
buf.write(s.data(), s.size());
}
/// предполагается, что строка в оперативке хранится непрерывно, и \0-terminated.
void writeEscapedString(const String & s, WriteBuffer & buf);
inline void writeQuotedString(const String & s, WriteBuffer & buf)
{
writeChar('\'', buf);
writeEscapedString(s, buf);
writeChar('\'', buf);
}
}
#endif

View File

@ -8,8 +8,10 @@
#include <Poco/FileStream.h>
#include <DB/Core/NamesAndTypes.h>
#include <DB/Common/CompressedInputStream.h>
#include <DB/Common/CompressedOutputStream.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/Storages/IStorage.h>
@ -32,10 +34,11 @@ private:
struct Stream
{
Stream(const std::string & path)
: plain(path, std::ios::in | std::ios::binary), compressed(plain) {}
: istr(path, std::ios::in | std::ios::binary), plain(istr), compressed(plain) {}
Poco::FileInputStream plain;
CompressedInputStream compressed;
Poco::FileInputStream istr;
ReadBufferFromIStream plain;
CompressedReadBuffer compressed;
};
typedef std::map<std::string, SharedPtr<Stream> > FileStreams;
@ -54,10 +57,11 @@ private:
struct Stream
{
Stream(const std::string & path)
: plain(path, std::ios::out | std::ios::ate | std::ios::binary), compressed(plain) {}
: ostr(path, std::ios::out | std::ios::ate | std::ios::binary), plain(ostr), compressed(plain) {}
Poco::FileOutputStream plain;
CompressedOutputStream compressed;
Poco::FileOutputStream ostr;
WriteBufferFromOStream plain;
CompressedWriteBuffer compressed;
};
typedef std::map<std::string, SharedPtr<Stream> > FileStreams;

View File

@ -1,70 +0,0 @@
#include <DB/Common/EscapingOutputStream.h>
#define ESCAPING_STREAM_BUFFER_SIZE 4096
namespace DB
{
EscapingStreamBuf::EscapingStreamBuf(std::ostream & ostr)
: p_ostr(&ostr)
{
}
int EscapingStreamBuf::writeToDevice(char c)
{
switch(c)
{
case '\\':
p_ostr->write("\\\\", 2);
break;
case '\b':
p_ostr->write("\\b", 2);
break;
case '\f':
p_ostr->write("\\f", 2);
break;
case '\n':
p_ostr->write("\\n", 2);
break;
case '\r':
p_ostr->write("\\r", 2);
break;
case '\t':
p_ostr->write("\\t", 2);
break;
case '\'':
p_ostr->write("\\'", 2);
break;
case '"':
p_ostr->write("\\\"", 2);
break;
default:
p_ostr->put(c);
}
return 1;
}
EscapingIOS::EscapingIOS(std::ostream & ostr)
: buf(ostr)
{
}
EscapingStreamBuf * EscapingIOS::rdbuf()
{
return &buf;
}
EscapingOutputStream::EscapingOutputStream(std::ostream & ostr)
: EscapingIOS(ostr),
std::ostream(&buf)
{
}
}

View File

@ -1,81 +0,0 @@
#include <DB/Common/UnescapingInputStream.h>
namespace DB
{
UnescapingStreamBuf::UnescapingStreamBuf(std::istream & istr, char delimiter_)
: p_istr(&istr), delimiter(delimiter_)
{
}
int UnescapingStreamBuf::readFromDevice()
{
int res = p_istr->get();
if (!p_istr->good())
return res;
char c = res;
if (c == '\\')
{
res = p_istr->get();
if (!p_istr->good())
return res;
c = res;
switch (c)
{
case '\\':
return '\\';
break;
case 'b':
return '\b';
break;
case 'f':
return '\f';
break;
case 'n':
return '\n';
break;
case 'r':
return '\r';
break;
case 't':
return '\t';
break;
default:
return c;
}
}
else if (c == delimiter)
{
return std::char_traits<char>::eof();
}
else
{
return c;
}
}
UnescapingIOS::UnescapingIOS(std::istream & istr, char delimiter_)
: buf(istr, delimiter_)
{
}
UnescapingStreamBuf * UnescapingIOS::rdbuf()
{
return &buf;
}
UnescapingInputStream::UnescapingInputStream(std::istream & istr, char delimiter_)
: UnescapingIOS(istr, delimiter_),
std::istream(&buf)
{
}
}

View File

@ -1,24 +0,0 @@
#include <iostream>
#include <sstream>
#include <DB/Common/EscapingOutputStream.h>
#include <DB/Common/UnescapingInputStream.h>
int main(int argc, char ** argv)
{
std::string s1("abc'd\"e\\f\\"), s2;
std::stringstream stream;
DB::EscapingOutputStream o(stream);
DB::UnescapingInputStream i(stream, '"');
std::cout << s1 << std::endl;
o << s1;
stream << "\"xxx";
std::cout << stream.str() << std::endl;
std::getline(i, s2);
std::cout << s2 << std::endl;
return 0;
}

View File

@ -1,4 +1,4 @@
#include <DB/Core/ReadBuffer.h>
#include <DB/IO/ReadBuffer.h>
namespace DB
{

View File

@ -1,6 +1,8 @@
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/DataStreams/TabSeparatedRowInputStream.h>
@ -9,7 +11,7 @@ namespace DB
using Poco::SharedPtr;
TabSeparatedRowInputStream::TabSeparatedRowInputStream(std::istream & istr_, SharedPtr<DataTypes> data_types_)
TabSeparatedRowInputStream::TabSeparatedRowInputStream(ReadBuffer & istr_, SharedPtr<DataTypes> data_types_)
: istr(istr_), data_types(data_types_)
{
}
@ -30,27 +32,15 @@ Row TabSeparatedRowInputStream::read()
res.clear();
return res;
}
else if (!istr.good())
throw Exception("Cannot read all data from tab separated input",
ErrorCodes::CANNOT_READ_ALL_DATA_FROM_TAB_SEPARATED_INPUT);
/// пропускаем разделители
if (i + 1 == size)
{
if (istr.peek() == '\n')
istr.ignore();
else if (!istr.eof())
throw Exception("Cannot parse all value in tab separated input",
ErrorCodes::CANNOT_PARSE_ALL_VALUE_FROM_TAB_SEPARATED_INPUT);
if (!istr.eof())
assertString("\n", istr);
}
else
{
if (istr.peek() == '\t')
istr.ignore();
else
throw Exception("Cannot parse all value in tab separated input",
ErrorCodes::CANNOT_PARSE_ALL_VALUE_FROM_TAB_SEPARATED_INPUT);
}
assertString("\t", istr);
}
return res;

View File

@ -1,5 +1,7 @@
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/IO/WriteHelpers.h>
namespace DB
{
@ -7,7 +9,7 @@ namespace DB
using Poco::SharedPtr;
TabSeparatedRowOutputStream::TabSeparatedRowOutputStream(std::ostream & ostr_, SharedPtr<DataTypes> data_types_)
TabSeparatedRowOutputStream::TabSeparatedRowOutputStream(WriteBuffer & ostr_, SharedPtr<DataTypes> data_types_)
: ostr(ostr_), data_types(data_types_), field_number(0)
{
}
@ -22,13 +24,13 @@ void TabSeparatedRowOutputStream::writeField(const Field & field)
void TabSeparatedRowOutputStream::writeFieldDelimiter()
{
ostr.put('\t');
writeChar('\t', ostr);
}
void TabSeparatedRowOutputStream::writeRowEndDelimiter()
{
ostr.put('\n');
writeChar('\n', ostr);
field_number = 0;
}

View File

@ -9,6 +9,9 @@
#include <DB/Core/Block.h>
#include <DB/Core/ColumnWithNameAndType.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
@ -43,9 +46,12 @@ int main(int argc, char ** argv)
std::ifstream istr("test_in");
std::ofstream ostr("test_out");
DB::TabSeparatedRowInputStream row_input(istr, data_types);
DB::ReadBufferFromIStream in_buf(istr);
DB::WriteBufferFromOStream out_buf(ostr);
DB::TabSeparatedRowInputStream row_input(in_buf, data_types);
DB::BlockInputStreamFromRowInputStream block_input(row_input, sample);
DB::TabSeparatedRowOutputStream row_output(ostr, data_types);
DB::TabSeparatedRowOutputStream row_output(out_buf, data_types);
DB::copyData(block_input, row_output);
}

View File

@ -6,6 +6,8 @@
#include <Poco/Stopwatch.h>
#include <Poco/SharedPtr.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataStreams/TabSeparatedRowInputStream.h>
@ -24,8 +26,11 @@ int main(int argc, char ** argv)
std::ifstream istr("test_in");
std::ofstream ostr("test_out");
DB::TabSeparatedRowInputStream row_input(istr, data_types);
DB::TabSeparatedRowOutputStream row_output(ostr, data_types);
DB::ReadBufferFromIStream in_buf(istr);
DB::WriteBufferFromOStream out_buf(ostr);
DB::TabSeparatedRowInputStream row_input(in_buf, data_types);
DB::TabSeparatedRowOutputStream row_output(out_buf, data_types);
DB::copyData(row_input, row_output);
}

View File

@ -1,15 +1,15 @@
#include <Poco/SharedPtr.h>
#include <DB/Common/VarInt.h>
#include <DB/Common/QuoteManipulators.h>
#include <DB/Common/EscapeManipulators.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/VarInt.h>
namespace DB
{
@ -17,19 +17,19 @@ namespace DB
using Poco::SharedPtr;
void DataTypeString::serializeBinary(const Field & field, std::ostream & ostr) const
void DataTypeString::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
const String & s = boost::get<String>(field);
writeVarUInt(s.size(), ostr);
ostr << s;
writeString(s, ostr);
}
void DataTypeString::deserializeBinary(Field & field, std::istream & istr) const
void DataTypeString::deserializeBinary(Field & field, ReadBuffer & istr) const
{
UInt64 size;
readVarUInt(size, istr);
if (!istr.good())
if (istr.eof())
return;
field = String("");
String & s = boost::get<String>(field);
@ -39,7 +39,7 @@ void DataTypeString::deserializeBinary(Field & field, std::istream & istr) const
}
void DataTypeString::serializeBinary(const IColumn & column, std::ostream & ostr) const
void DataTypeString::serializeBinary(const IColumn & column, WriteBuffer & ostr) const
{
const ColumnArray & column_array = dynamic_cast<const ColumnArray &>(column);
const ColumnUInt8::Container_t & data = dynamic_cast<const ColumnUInt8 &>(column_array.getData()).getData();
@ -61,7 +61,7 @@ void DataTypeString::serializeBinary(const IColumn & column, std::ostream & ostr
}
void DataTypeString::deserializeBinary(IColumn & column, std::istream & istr, size_t limit) const
void DataTypeString::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const
{
ColumnArray & column_array = dynamic_cast<ColumnArray &>(column);
ColumnUInt8::Container_t & data = dynamic_cast<ColumnUInt8 &>(column_array.getData()).getData();
@ -76,7 +76,7 @@ void DataTypeString::deserializeBinary(IColumn & column, std::istream & istr, si
UInt64 size;
readVarUInt(size, istr);
if (!istr.good())
if (istr.eof())
break;
offset += size;
@ -87,50 +87,50 @@ void DataTypeString::deserializeBinary(IColumn & column, std::istream & istr, si
istr.read(reinterpret_cast<char*>(&data[offset - size]), sizeof(ColumnUInt8::value_type) * size);
if (!istr.good())
throw Exception("Cannot read all data from stream", ErrorCodes::CANNOT_READ_DATA_FROM_ISTREAM);
if (istr.eof())
throw Exception("Cannot read all data from ReadBuffer", ErrorCodes::CANNOT_READ_DATA_FROM_READ_BUFFER);
}
}
void DataTypeString::serializeText(const Field & field, std::ostream & ostr) const
void DataTypeString::serializeText(const Field & field, WriteBuffer & ostr) const
{
ostr << boost::get<const String &>(field);
writeString(boost::get<const String &>(field), ostr);
}
void DataTypeString::deserializeText(Field & field, std::istream & istr) const
void DataTypeString::deserializeText(Field & field, ReadBuffer & istr) const
{
String s;
istr >> s;
readString(s, istr);
field = s;
}
void DataTypeString::serializeTextEscaped(const Field & field, std::ostream & ostr) const
void DataTypeString::serializeTextEscaped(const Field & field, WriteBuffer & ostr) const
{
ostr << strconvert::escape_file << boost::get<const String &>(field);
writeEscapedString(boost::get<const String &>(field), ostr);
}
void DataTypeString::deserializeTextEscaped(Field & field, std::istream & istr) const
void DataTypeString::deserializeTextEscaped(Field & field, ReadBuffer & istr) const
{
String s;
istr >> strconvert::unescape_file >> s;
readEscapedString(s, istr);
field = s;
}
void DataTypeString::serializeTextQuoted(const Field & field, std::ostream & ostr, bool compatible) const
void DataTypeString::serializeTextQuoted(const Field & field, WriteBuffer & ostr, bool compatible) const
{
ostr << strconvert::quote_fast << boost::get<const String &>(field);
writeQuotedString(boost::get<const String &>(field), ostr);
}
void DataTypeString::deserializeTextQuoted(Field & field, std::istream & istr, bool compatible) const
void DataTypeString::deserializeTextQuoted(Field & field, ReadBuffer & istr, bool compatible) const
{
String s;
istr >> strconvert::unquote_fast >> s;
readQuotedString(s, istr);
field = s;
}

View File

@ -6,6 +6,8 @@
#include <Poco/Stopwatch.h>
#include <Poco/SharedPtr.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/Columns/ColumnString.h>
#include <DB/DataTypes/DataTypeString.h>
@ -32,9 +34,10 @@ int main(int argc, char ** argv)
}
std::ofstream ostr("test");
DB::WriteBufferFromOStream out_buf(ostr);
stopwatch.restart();
data_type.serializeBinary(*column, ostr);
data_type.serializeBinary(*column, out_buf);
stopwatch.stop();
std::cout << "Writing, elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;
@ -44,9 +47,10 @@ int main(int argc, char ** argv)
Poco::SharedPtr<DB::ColumnString> column = new DB::ColumnString();
std::ifstream istr("test");
DB::ReadBufferFromIStream in_buf(istr);
stopwatch.restart();
data_type.deserializeBinary(*column, istr, n);
data_type.deserializeBinary(*column, in_buf, n);
stopwatch.stop();
std::cout << "Reading, elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;

View File

@ -4,6 +4,7 @@
#include <Poco/Stopwatch.h>
#include <Poco/SharedPtr.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
@ -22,9 +23,10 @@ int main(int argc, char ** argv)
vec[i] = i;
std::ofstream ostr("test");
DB::WriteBufferFromOStream out_buf(ostr);
stopwatch.restart();
data_type.serializeBinary(*column, ostr);
data_type.serializeBinary(*column, out_buf);
stopwatch.stop();
std::cout << "Elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;

View File

@ -4,6 +4,7 @@
#include <Poco/Stopwatch.h>
#include <Poco/SharedPtr.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/DataTypes/DataTypesNumberVariable.h>
@ -22,9 +23,10 @@ int main(int argc, char ** argv)
vec[i] = i;
std::ofstream ostr("test");
DB::WriteBufferFromOStream out_buf(ostr);
stopwatch.restart();
data_type.serializeBinary(*column, ostr);
data_type.serializeBinary(*column, out_buf);
stopwatch.stop();
std::cout << "Elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;

View File

@ -1,6 +1,6 @@
#include <algorithm>
#include <DB/Common/CompressedInputStream.h>
#include <DB/IO/CompressedInputStream.h>
namespace DB

View File

@ -1,6 +1,6 @@
#include <algorithm>
#include <DB/Common/CompressedOutputStream.h>
#include <DB/IO/CompressedOutputStream.h>
namespace DB

100
dbms/src/IO/ReadHelpers.cpp Normal file
View File

@ -0,0 +1,100 @@
#include <DB/IO/ReadHelpers.h>
namespace DB
{
void assertString(const char * s, ReadBuffer & buf)
{
for (; *s; ++s)
{
if (buf.eof() || *buf.position() != *s)
throw Exception(String("Cannot parse input: expected ") + s, ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
++buf.position();
}
}
void readString(String & s, ReadBuffer & buf)
{
s = "";
while (!buf.eof())
{
size_t bytes = 0;
for (; buf.position() + bytes != buf.buffer().end(); ++bytes)
if (buf.position()[bytes] == '\t' || buf.position()[bytes] == '\n')
break;
s.append(buf.position(), bytes);
buf.position() += bytes;
if (buf.position() != buf.buffer().end())
return;
}
}
void readEscapedString(String & s, ReadBuffer & buf)
{
s = "";
while (!buf.eof())
{
size_t bytes = 0;
for (; buf.position() + bytes != buf.buffer().end(); ++bytes)
if (buf.position()[bytes] == '\\' || buf.position()[bytes] == '\t' || buf.position()[bytes] == '\n')
break;
s.append(buf.position(), bytes);
buf.position() += bytes;
if (*buf.position() == '\t' || *buf.position() == '\n')
return;
if (*buf.position() == '\\')
{
++buf.position();
if (buf.eof())
throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
s += parseEscapeSequence(*buf.position());
++buf.position();
}
}
}
void readQuotedString(String & s, ReadBuffer & buf)
{
s = "";
if (buf.eof() || *buf.position() != '\'')
throw Exception("Cannot parse quoted string: expected opening single quote",
ErrorCodes::CANNOT_PARSE_QUOTED_STRING);
++buf.position();
while (!buf.eof())
{
size_t bytes = 0;
for (; buf.position() + bytes != buf.buffer().end(); ++bytes)
if (buf.position()[bytes] == '\\' || buf.position()[bytes] == '\'')
break;
s.append(buf.position(), bytes);
buf.position() += bytes;
if (*buf.position() == '\'')
{
++buf.position();
return;
}
if (*buf.position() == '\\')
{
++buf.position();
if (buf.eof())
throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
s += parseEscapeSequence(*buf.position());
++buf.position();
}
}
throw Exception("Cannot parse quoted string: expected closing single quote",
ErrorCodes::CANNOT_PARSE_QUOTED_STRING);
}
}

View File

@ -1,7 +1,10 @@
#include <istream>
#include <ostream>
#include <DB/Common/VarInt.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/VarInt.h>
#include <Poco/Types.h>
@ -140,6 +143,136 @@ void readVarUInt(UInt64 & x, std::istream & istr)
}
void writeVarUInt(UInt64 x, WriteBuffer & ostr)
{
char buf[9];
buf[0] = static_cast<Poco::UInt8>(x | 0x80);
if (x >= (1ULL << 7))
{
buf[1] = static_cast<Poco::UInt8>((x >> 7) | 0x80);
if (x >= (1ULL << 14))
{
buf[2] = static_cast<Poco::UInt8>((x >> 14) | 0x80);
if (x >= (1ULL << 21))
{
buf[3] = static_cast<Poco::UInt8>((x >> 21) | 0x80);
if (x >= (1ULL << 28))
{
buf[4] = static_cast<Poco::UInt8>((x >> 28) | 0x80);
if (x >= (1ULL << 35))
{
buf[5] = static_cast<Poco::UInt8>((x >> 35) | 0x80);
if (x >= (1ULL << 42))
{
buf[6] = static_cast<Poco::UInt8>((x >> 42) | 0x80);
if (x >= (1ULL << 49))
{
buf[7] = static_cast<Poco::UInt8>((x >> 49) | 0x80);
if (x >= (1ULL << 56))
{
buf[8] = static_cast<Poco::UInt8>((x >> 56) | 0x80);
ostr.write(buf, 9);
}
else
{
buf[7] &= 0x7F;
ostr.write(buf, 8);
}
}
else
{
buf[6] &= 0x7F;
ostr.write(buf, 7);
}
}
else
{
buf[5] &= 0x7F;
ostr.write(buf, 6);
}
}
else
{
buf[4] &= 0x7F;
ostr.write(buf, 5);
}
}
else
{
buf[3] &= 0x7F;
ostr.write(buf, 4);
}
}
else
{
buf[2] &= 0x7F;
ostr.write(buf, 3);
}
}
else
{
buf[1] &= 0x7F;
ostr.write(buf, 2);
}
}
else
{
buf[0] &= 0x7F;
ostr.write(buf, 1);
}
}
void readVarUInt(UInt64 & x, ReadBuffer & istr)
{
char byte;
readChar(byte, istr);
x = static_cast<Poco::UInt64>(byte) & 0x7F;
if (byte & 0x80)
{
readChar(byte, istr);
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 7;
if (byte & 0x80)
{
readChar(byte, istr);
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 14;
if (byte & 0x80)
{
readChar(byte, istr);
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 21;
if (byte & 0x80)
{
readChar(byte, istr);
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 28;
if (byte & 0x80)
{
readChar(byte, istr);
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 35;
if (byte & 0x80)
{
readChar(byte, istr);
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 42;
if (byte & 0x80)
{
readChar(byte, istr);
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 49;
if (byte & 0x80)
{
readChar(byte, istr);
x |= (static_cast<Poco::UInt64>(byte) & 0x7F) << 56;
}
}
}
}
}
}
}
}
}
size_t getLengthOfVarUInt(UInt64 x)
{
return x < (1ULL << 7) ? 1

View File

@ -0,0 +1,60 @@
#include <DB/IO/WriteHelpers.h>
namespace DB
{
template <> const char * IntFormat<Int8>::format = "%hhi";
template <> const char * IntFormat<Int16>::format = "%hi";
template <> const char * IntFormat<Int32>::format = "%li";
template <> const char * IntFormat<Int64>::format = "%lli";
template <> const char * IntFormat<UInt8>::format = "%hhi";
template <> const char * IntFormat<UInt16>::format = "%hi";
template <> const char * IntFormat<UInt32>::format = "%li";
template <> const char * IntFormat<UInt64>::format = "%lli";
void writeEscapedString(const String & s, WriteBuffer & buf)
{
for (String::const_iterator it = s.begin(); it != s.end(); ++it)
{
switch (*it)
{
case '\b':
writeChar('\\', buf);
writeChar('b', buf);
break;
case '\f':
writeChar('\\', buf);
writeChar('f', buf);
break;
case '\n':
writeChar('\\', buf);
writeChar('n', buf);
break;
case '\r':
writeChar('\\', buf);
writeChar('r', buf);
break;
case '\t':
writeChar('\\', buf);
writeChar('t', buf);
break;
case '\0':
writeChar('\\', buf);
writeChar('0', buf);
break;
case '\'':
writeChar('\\', buf);
writeChar('\'', buf);
break;
case '\\':
writeChar('\\', buf);
writeChar('\\', buf);
break;
default:
writeChar(*it, buf);
}
}
}
}

View File

@ -3,7 +3,7 @@
#include <iostream>
#include <sstream>
#include <DB/Core/ReadBufferFromIStream.h>
#include <DB/IO/ReadBufferFromIStream.h>
int main(int argc, char ** argv)

View File

@ -3,7 +3,7 @@
#include <iostream>
#include <fstream>
#include <DB/Core/ReadBufferFromIStream.h>
#include <DB/IO/ReadBufferFromIStream.h>
int main(int argc, char ** argv)

View File

@ -3,7 +3,7 @@
#include <iostream>
#include <sstream>
#include <DB/Core/WriteBufferFromOStream.h>
#include <DB/IO/WriteBufferFromOStream.h>
int main(int argc, char ** argv)

View File

@ -3,7 +3,7 @@
#include <iostream>
#include <fstream>
#include <DB/Core/WriteBufferFromOStream.h>
#include <DB/IO/WriteBufferFromOStream.h>
int main(int argc, char ** argv)

View File

@ -6,6 +6,9 @@
#include <Poco/SharedPtr.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
@ -111,7 +114,9 @@ int main(int argc, char ** argv)
/// читаем данные из tsv файла и одновременно пишем в таблицу
if (argc == 2 && 0 == strcmp(argv[1], "write"))
{
DB::TabSeparatedRowInputStream in(std::cin, data_types);
DB::ReadBufferFromIStream in_buf(std::cin);
DB::TabSeparatedRowInputStream in(in_buf, data_types);
SharedPtr<DB::IBlockOutputStream> out = table.write(0);
DB::copyData(in, *out, sample);
}
@ -127,8 +132,10 @@ int main(int argc, char ** argv)
boost::assign::push_back(*data_types)
(new DB::DataTypeString);
*/
DB::WriteBufferFromOStream out_buf(std::cout);
SharedPtr<DB::IBlockInputStream> in = table.read(column_names, 0);
DB::TabSeparatedRowOutputStream out(std::cout, data_types);
DB::TabSeparatedRowOutputStream out(out_buf, data_types);
DB::copyData(*in, out);
}
}

View File

@ -2,6 +2,7 @@
#include <Poco/SharedPtr.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/Storages/StorageLog.h>
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/LimitBlockInputStream.h>
@ -69,9 +70,11 @@ int main(int argc, char ** argv)
Poco::SharedPtr<DB::DataTypes> data_types = new DB::DataTypes;
data_types->push_back(new DB::DataTypeUInt64);
data_types->push_back(new DB::DataTypeUInt8);
DB::WriteBufferFromOStream out_buf(std::cout);
DB::LimitBlockInputStream in_limit(in, 10);
DB::TabSeparatedRowOutputStream output(std::cout, data_types);
DB::TabSeparatedRowOutputStream output(out_buf, data_types);
DB::copyData(in_limit, output);
}

View File

@ -2,6 +2,7 @@
#include <Poco/SharedPtr.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/Storages/StorageSystemNumbers.h>
#include <DB/DataStreams/LimitBlockInputStream.h>
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
@ -22,9 +23,11 @@ int main(int argc, char ** argv)
Poco::SharedPtr<DB::DataTypes> column_types = new DB::DataTypes;
column_types->push_back(new DB::DataTypeUInt64);
DB::WriteBufferFromOStream out_buf(std::cout);
DB::LimitBlockInputStream input(table.read(column_names, 0, 10), 10, 96);
DB::TabSeparatedRowOutputStream output(std::cout, column_types);
DB::TabSeparatedRowOutputStream output(out_buf, column_types);
DB::copyData(input, output);
}