diff --git a/dbms/include/DB/DataTypes/DataTypesNumberFixed.h b/dbms/include/DB/DataTypes/DataTypesNumberFixed.h index cabd5ce7fd3..3b8f7a40c85 100644 --- a/dbms/include/DB/DataTypes/DataTypesNumberFixed.h +++ b/dbms/include/DB/DataTypes/DataTypesNumberFixed.h @@ -28,8 +28,8 @@ DEFINE_DATA_TYPE_NUMBER_FIXED(Int16); DEFINE_DATA_TYPE_NUMBER_FIXED(Int32); DEFINE_DATA_TYPE_NUMBER_FIXED(Int64); -DEFINE_DATA_TYPE_NUMBER_FIXED(Float32); -DEFINE_DATA_TYPE_NUMBER_FIXED(Float64); +/* DEFINE_DATA_TYPE_NUMBER_FIXED(Float32); +DEFINE_DATA_TYPE_NUMBER_FIXED(Float64); */ } diff --git a/dbms/include/DB/IO/CompressedInputStream.h b/dbms/include/DB/IO/CompressedInputStream.h index 94add4a8502..34f0659d38c 100644 --- a/dbms/include/DB/IO/CompressedInputStream.h +++ b/dbms/include/DB/IO/CompressedInputStream.h @@ -7,8 +7,6 @@ #include -#include - #include diff --git a/dbms/include/DB/IO/CompressedReadBuffer.h b/dbms/include/DB/IO/CompressedReadBuffer.h index 81e1cf59cd6..54c5e41b47d 100644 --- a/dbms/include/DB/IO/CompressedReadBuffer.h +++ b/dbms/include/DB/IO/CompressedReadBuffer.h @@ -45,48 +45,31 @@ public: compressed_buffer.resize(size_compressed); decompressed_buffer.resize(size_decompressed); - std::cerr << size_compressed << ", " << size_decompressed << std::endl; - in.readStrict(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE); - - std::cerr << "#" << std::endl; - - qlz_decompress(&compressed_buffer[0], &decompressed_buffer[0], &scratch[0]); - - std::cerr << "##" << std::endl; + + qlz_decompress(&compressed_buffer[0], &decompressed_buffer[0], scratch); pos_in_buffer = 0; } bool next() { - std::cerr << "?" << std::endl; - if (pos_in_buffer == decompressed_buffer.size()) { - std::cerr << "??" << std::endl; - if (in.eof()) return false; readCompressedChunk(); - - std::cerr << "!" << std::endl; } size_t bytes_to_copy = std::min(decompressed_buffer.size() - pos_in_buffer, static_cast(DEFAULT_READ_BUFFER_SIZE)); std::memcpy(internal_buffer, &decompressed_buffer[pos_in_buffer], bytes_to_copy); - std::cerr << "!!" << std::endl; - pos_in_buffer += bytes_to_copy; pos = internal_buffer; working_buffer = Buffer(internal_buffer, internal_buffer + bytes_to_copy); - /* std::cerr.write(internal_buffer, bytes_to_copy); - std::cerr << std::endl;*/ - return true; } }; diff --git a/dbms/include/DB/IO/CompressedStream.h b/dbms/include/DB/IO/CompressedStream.h index 1c1e0e55eff..07c30eba832 100644 --- a/dbms/include/DB/IO/CompressedStream.h +++ b/dbms/include/DB/IO/CompressedStream.h @@ -5,6 +5,7 @@ #define DBMS_STREAM_BUFFER_SIZE 4096 #define DBMS_COMPRESSING_STREAM_BUFFER_SIZE 1048576 + //1048576 #define QUICKLZ_ADDITIONAL_SPACE 400 #define QUICKLZ_HEADER_SIZE 9 diff --git a/dbms/include/DB/IO/ReadBuffer.h b/dbms/include/DB/IO/ReadBuffer.h index e87a9c3eba5..fa61815ff04 100644 --- a/dbms/include/DB/IO/ReadBuffer.h +++ b/dbms/include/DB/IO/ReadBuffer.h @@ -80,7 +80,7 @@ public: while (!eof() && bytes_copied < n) { size_t bytes_to_copy = std::min(static_cast(working_buffer.end() - pos), n - bytes_copied); - std::memcpy(to, pos, bytes_to_copy); + std::memcpy(to + bytes_copied, pos, bytes_to_copy); pos += bytes_to_copy; bytes_copied += bytes_to_copy; } diff --git a/dbms/include/DB/IO/WriteHelpers.h b/dbms/include/DB/IO/WriteHelpers.h index 225b0d5610d..c3e56cb797e 100644 --- a/dbms/include/DB/IO/WriteHelpers.h +++ b/dbms/include/DB/IO/WriteHelpers.h @@ -12,9 +12,9 @@ #include -#define WRITE_HELPERS_DEFAULT_FLOAT_PRECISION 6 -/// 20 цифр, знак, и \0 для конца строки -#define WRITE_HELPERS_MAX_INT_WIDTH 22 +#define WRITE_HELPERS_DEFAULT_FLOAT_PRECISION 6U +/// 20 цифр и знак +#define WRITE_HELPERS_MAX_INT_WIDTH 21U namespace DB @@ -36,12 +36,33 @@ template void writeIntText(T x, WriteBuffer & buf) { char tmp[WRITE_HELPERS_MAX_INT_WIDTH]; - int res = std::snprintf(tmp, WRITE_HELPERS_MAX_INT_WIDTH, IntFormat::format, x); + bool negative = false; - if (res >= WRITE_HELPERS_MAX_INT_WIDTH || res <= 0) - throw Exception("Cannot print integer", ErrorCodes::CANNOT_PRINT_INTEGER); + if (x == 0) + { + writeChar('0', buf); + return; + } - buf.write(tmp, res); + if (x < 0) + { + x = -x; + negative = true; + } + + char * pos; + for (pos = tmp + WRITE_HELPERS_MAX_INT_WIDTH - 1; x != 0; --pos) + { + *pos = '0' + x % 10; + x /= 10; + } + + if (negative) + *pos = '-'; + else + ++pos; + + buf.write(pos, tmp + WRITE_HELPERS_MAX_INT_WIDTH - pos); } template diff --git a/dbms/src/IO/CompressedInputStream.cpp b/dbms/src/IO/CompressedInputStream.cpp index 62cb668e142..5a8edfe8e2c 100644 --- a/dbms/src/IO/CompressedInputStream.cpp +++ b/dbms/src/IO/CompressedInputStream.cpp @@ -1,5 +1,7 @@ #include +#include + #include @@ -48,7 +50,6 @@ void DecompressingStreamBuf::readCompressedChunk() return; /// разжимаем блок - qlz_decompress(&compressed_buffer[0], &uncompressed_buffer[0], &scratch[0]); } diff --git a/dbms/src/IO/tests/compressed_buffer.cpp b/dbms/src/IO/tests/compressed_buffer.cpp index 87214347a1e..6a249ffa438 100644 --- a/dbms/src/IO/tests/compressed_buffer.cpp +++ b/dbms/src/IO/tests/compressed_buffer.cpp @@ -1,8 +1,11 @@ #include #include +#include #include +#include + #include #include #include @@ -19,14 +22,21 @@ int main(int argc, char ** argv) try { size_t n = 10000000; + Poco::Stopwatch stopwatch; { std::ofstream ostr("test1"); DB::WriteBufferFromOStream buf(ostr); DB::CompressedWriteBuffer compressed_buf(buf); + stopwatch.restart(); for (size_t i = 0; i < n; ++i) + { DB::writeIntText(i, compressed_buf); + DB::writeChar('\t', compressed_buf); + } + stopwatch.stop(); + std::cout << "Writing done (1). Elapsed: " << static_cast(stopwatch.elapsed()) / 1000000 << std::endl; } { @@ -34,26 +44,39 @@ int main(int argc, char ** argv) DB::CompressedOutputStream compressed_ostr(ostr); DB::WriteBufferFromOStream compressed_buf(compressed_ostr); + stopwatch.restart(); for (size_t i = 0; i < n; ++i) + { DB::writeIntText(i, compressed_buf); + DB::writeChar('\t', compressed_buf); + } + stopwatch.stop(); + std::cout << "Writing done (2). Elapsed: " << static_cast(stopwatch.elapsed()) / 1000000 << std::endl; } - std::cerr << "Writing done." << std::endl; - - /*{ + { std::ifstream istr("test1"); DB::ReadBufferFromIStream buf(istr); DB::CompressedReadBuffer compressed_buf(buf); std::string s; + stopwatch.restart(); for (size_t i = 0; i < n; ++i) { size_t x; DB::readIntText(x, compressed_buf); + compressed_buf.ignore(); + if (x != i) - throw DB::Exception("Failed!"); + { + std::stringstream s; + s << "Failed!, read: " << x << ", expected: " << i; + throw DB::Exception(s.str()); + } } - }*/ + stopwatch.stop(); + std::cout << "Reading done (1). Elapsed: " << static_cast(stopwatch.elapsed()) / 1000000 << std::endl; + } { std::ifstream istr("test2"); @@ -61,16 +84,23 @@ int main(int argc, char ** argv) DB::ReadBufferFromIStream compressed_buf(compressed_istr); std::string s; + stopwatch.restart(); for (size_t i = 0; i < n; ++i) { size_t x; DB::readIntText(x, compressed_buf); + compressed_buf.ignore(); + if (x != i) - throw DB::Exception("Failed!"); + { + std::stringstream s; + s << "Failed!, read: " << x << ", expected: " << i; + throw DB::Exception(s.str()); + } } + stopwatch.stop(); + std::cout << "Reading done (2). Elapsed: " << static_cast(stopwatch.elapsed()) / 1000000 << std::endl; } - - std::cerr << "Reading done." << std::endl; } catch (const DB::Exception & e) {