dbms: development.

This commit is contained in:
Alexey Milovidov 2010-06-07 17:14:13 +00:00
parent a10879ca79
commit f409cf1540
8 changed files with 74 additions and 40 deletions

View File

@ -28,8 +28,8 @@ DEFINE_DATA_TYPE_NUMBER_FIXED(Int16);
DEFINE_DATA_TYPE_NUMBER_FIXED(Int32); DEFINE_DATA_TYPE_NUMBER_FIXED(Int32);
DEFINE_DATA_TYPE_NUMBER_FIXED(Int64); DEFINE_DATA_TYPE_NUMBER_FIXED(Int64);
DEFINE_DATA_TYPE_NUMBER_FIXED(Float32); /* DEFINE_DATA_TYPE_NUMBER_FIXED(Float32);
DEFINE_DATA_TYPE_NUMBER_FIXED(Float64); DEFINE_DATA_TYPE_NUMBER_FIXED(Float64); */
} }

View File

@ -7,8 +7,6 @@
#include <Poco/BufferedStreamBuf.h> #include <Poco/BufferedStreamBuf.h>
#include <quicklz/quicklz_level1.h>
#include <DB/IO/CompressedStream.h> #include <DB/IO/CompressedStream.h>

View File

@ -45,48 +45,31 @@ public:
compressed_buffer.resize(size_compressed); compressed_buffer.resize(size_compressed);
decompressed_buffer.resize(size_decompressed); decompressed_buffer.resize(size_decompressed);
std::cerr << size_compressed << ", " << size_decompressed << std::endl;
in.readStrict(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE); in.readStrict(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE);
std::cerr << "#" << std::endl; qlz_decompress(&compressed_buffer[0], &decompressed_buffer[0], scratch);
qlz_decompress(&compressed_buffer[0], &decompressed_buffer[0], &scratch[0]);
std::cerr << "##" << std::endl;
pos_in_buffer = 0; pos_in_buffer = 0;
} }
bool next() bool next()
{ {
std::cerr << "?" << std::endl;
if (pos_in_buffer == decompressed_buffer.size()) if (pos_in_buffer == decompressed_buffer.size())
{ {
std::cerr << "??" << std::endl;
if (in.eof()) if (in.eof())
return false; return false;
readCompressedChunk(); readCompressedChunk();
std::cerr << "!" << std::endl;
} }
size_t bytes_to_copy = std::min(decompressed_buffer.size() - pos_in_buffer, size_t bytes_to_copy = std::min(decompressed_buffer.size() - pos_in_buffer,
static_cast<size_t>(DEFAULT_READ_BUFFER_SIZE)); static_cast<size_t>(DEFAULT_READ_BUFFER_SIZE));
std::memcpy(internal_buffer, &decompressed_buffer[pos_in_buffer], bytes_to_copy); std::memcpy(internal_buffer, &decompressed_buffer[pos_in_buffer], bytes_to_copy);
std::cerr << "!!" << std::endl;
pos_in_buffer += bytes_to_copy; pos_in_buffer += bytes_to_copy;
pos = internal_buffer; pos = internal_buffer;
working_buffer = Buffer(internal_buffer, internal_buffer + bytes_to_copy); working_buffer = Buffer(internal_buffer, internal_buffer + bytes_to_copy);
/* std::cerr.write(internal_buffer, bytes_to_copy);
std::cerr << std::endl;*/
return true; return true;
} }
}; };

View File

@ -5,6 +5,7 @@
#define DBMS_STREAM_BUFFER_SIZE 4096 #define DBMS_STREAM_BUFFER_SIZE 4096
#define DBMS_COMPRESSING_STREAM_BUFFER_SIZE 1048576 #define DBMS_COMPRESSING_STREAM_BUFFER_SIZE 1048576
//1048576
#define QUICKLZ_ADDITIONAL_SPACE 400 #define QUICKLZ_ADDITIONAL_SPACE 400
#define QUICKLZ_HEADER_SIZE 9 #define QUICKLZ_HEADER_SIZE 9

View File

@ -80,7 +80,7 @@ public:
while (!eof() && bytes_copied < n) while (!eof() && bytes_copied < n)
{ {
size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied); size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
std::memcpy(to, pos, bytes_to_copy); std::memcpy(to + bytes_copied, pos, bytes_to_copy);
pos += bytes_to_copy; pos += bytes_to_copy;
bytes_copied += bytes_to_copy; bytes_copied += bytes_to_copy;
} }

View File

@ -12,9 +12,9 @@
#include <DB/IO/WriteBuffer.h> #include <DB/IO/WriteBuffer.h>
#define WRITE_HELPERS_DEFAULT_FLOAT_PRECISION 6 #define WRITE_HELPERS_DEFAULT_FLOAT_PRECISION 6U
/// 20 цифр, знак, и \0 для конца строки /// 20 цифр и знак
#define WRITE_HELPERS_MAX_INT_WIDTH 22 #define WRITE_HELPERS_MAX_INT_WIDTH 21U
namespace DB namespace DB
@ -36,12 +36,33 @@ template <typename T>
void writeIntText(T x, WriteBuffer & buf) void writeIntText(T x, WriteBuffer & buf)
{ {
char tmp[WRITE_HELPERS_MAX_INT_WIDTH]; char tmp[WRITE_HELPERS_MAX_INT_WIDTH];
int res = std::snprintf(tmp, WRITE_HELPERS_MAX_INT_WIDTH, IntFormat<T>::format, x); bool negative = false;
if (res >= WRITE_HELPERS_MAX_INT_WIDTH || res <= 0) if (x == 0)
throw Exception("Cannot print integer", ErrorCodes::CANNOT_PRINT_INTEGER); {
writeChar('0', buf);
return;
}
buf.write(tmp, res); if (x < 0)
{
x = -x;
negative = true;
}
char * pos;
for (pos = tmp + WRITE_HELPERS_MAX_INT_WIDTH - 1; x != 0; --pos)
{
*pos = '0' + x % 10;
x /= 10;
}
if (negative)
*pos = '-';
else
++pos;
buf.write(pos, tmp + WRITE_HELPERS_MAX_INT_WIDTH - pos);
} }
template <typename T> template <typename T>

View File

@ -1,5 +1,7 @@
#include <algorithm> #include <algorithm>
#include <quicklz/quicklz_level1.h>
#include <DB/IO/CompressedInputStream.h> #include <DB/IO/CompressedInputStream.h>
@ -48,7 +50,6 @@ void DecompressingStreamBuf::readCompressedChunk()
return; return;
/// разжимаем блок /// разжимаем блок
qlz_decompress(&compressed_buffer[0], &uncompressed_buffer[0], &scratch[0]); qlz_decompress(&compressed_buffer[0], &uncompressed_buffer[0], &scratch[0]);
} }

View File

@ -1,8 +1,11 @@
#include <string> #include <string>
#include <iostream> #include <iostream>
#include <sstream>
#include <fstream> #include <fstream>
#include <Poco/Stopwatch.h>
#include <DB/Core/Types.h> #include <DB/Core/Types.h>
#include <DB/IO/WriteBufferFromOStream.h> #include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromIStream.h> #include <DB/IO/ReadBufferFromIStream.h>
@ -19,14 +22,21 @@ int main(int argc, char ** argv)
try try
{ {
size_t n = 10000000; size_t n = 10000000;
Poco::Stopwatch stopwatch;
{ {
std::ofstream ostr("test1"); std::ofstream ostr("test1");
DB::WriteBufferFromOStream buf(ostr); DB::WriteBufferFromOStream buf(ostr);
DB::CompressedWriteBuffer compressed_buf(buf); DB::CompressedWriteBuffer compressed_buf(buf);
stopwatch.restart();
for (size_t i = 0; i < n; ++i) for (size_t i = 0; i < n; ++i)
{
DB::writeIntText(i, compressed_buf); DB::writeIntText(i, compressed_buf);
DB::writeChar('\t', compressed_buf);
}
stopwatch.stop();
std::cout << "Writing done (1). Elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;
} }
{ {
@ -34,26 +44,39 @@ int main(int argc, char ** argv)
DB::CompressedOutputStream compressed_ostr(ostr); DB::CompressedOutputStream compressed_ostr(ostr);
DB::WriteBufferFromOStream compressed_buf(compressed_ostr); DB::WriteBufferFromOStream compressed_buf(compressed_ostr);
stopwatch.restart();
for (size_t i = 0; i < n; ++i) for (size_t i = 0; i < n; ++i)
{
DB::writeIntText(i, compressed_buf); DB::writeIntText(i, compressed_buf);
DB::writeChar('\t', compressed_buf);
}
stopwatch.stop();
std::cout << "Writing done (2). Elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;
} }
std::cerr << "Writing done." << std::endl; {
/*{
std::ifstream istr("test1"); std::ifstream istr("test1");
DB::ReadBufferFromIStream buf(istr); DB::ReadBufferFromIStream buf(istr);
DB::CompressedReadBuffer compressed_buf(buf); DB::CompressedReadBuffer compressed_buf(buf);
std::string s; std::string s;
stopwatch.restart();
for (size_t i = 0; i < n; ++i) for (size_t i = 0; i < n; ++i)
{ {
size_t x; size_t x;
DB::readIntText(x, compressed_buf); DB::readIntText(x, compressed_buf);
compressed_buf.ignore();
if (x != i) if (x != i)
throw DB::Exception("Failed!"); {
std::stringstream s;
s << "Failed!, read: " << x << ", expected: " << i;
throw DB::Exception(s.str());
}
} }
}*/ stopwatch.stop();
std::cout << "Reading done (1). Elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;
}
{ {
std::ifstream istr("test2"); std::ifstream istr("test2");
@ -61,16 +84,23 @@ int main(int argc, char ** argv)
DB::ReadBufferFromIStream compressed_buf(compressed_istr); DB::ReadBufferFromIStream compressed_buf(compressed_istr);
std::string s; std::string s;
stopwatch.restart();
for (size_t i = 0; i < n; ++i) for (size_t i = 0; i < n; ++i)
{ {
size_t x; size_t x;
DB::readIntText(x, compressed_buf); DB::readIntText(x, compressed_buf);
compressed_buf.ignore();
if (x != i) if (x != i)
throw DB::Exception("Failed!"); {
std::stringstream s;
s << "Failed!, read: " << x << ", expected: " << i;
throw DB::Exception(s.str());
}
} }
stopwatch.stop();
std::cout << "Reading done (2). Elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;
} }
std::cerr << "Reading done." << std::endl;
} }
catch (const DB::Exception & e) catch (const DB::Exception & e)
{ {