diff --git a/dbms/include/DB/IO/HashingReadBuffer.h b/dbms/include/DB/IO/HashingReadBuffer.h index 4b9750e332e..f04c36b7253 100644 --- a/dbms/include/DB/IO/HashingReadBuffer.h +++ b/dbms/include/DB/IO/HashingReadBuffer.h @@ -14,13 +14,16 @@ class HashingReadBuffer : public BufferWithOwnMemory { public: HashingReadBuffer(ReadBuffer & in_, size_t block_size = DBMS_DEFAULT_HASHING_BLOCK_SIZE) : - block_pos(0), block_size(DBMS_DEFAULT_HASHING_BLOCK_SIZE), state(0, 0), in(in_), ignore_before_this(nullptr) + block_pos(0), block_size(DBMS_DEFAULT_HASHING_BLOCK_SIZE), state(0, 0), in(in_) { working_buffer = in.buffer(); + pos = in.position(); - /// если какая то часть данных уже была прочитана до нас, то не дадим этим данным повлиять на хэш - if (in.position() != in.buffer().begin()) - ignore_before_this = in.position(); + /// считаем хэш от уже прочитанных данных + if (working_buffer.size()) + { + calculateHash(pos, working_buffer.end() - pos); + } } uint128 getHash() @@ -39,19 +42,8 @@ private: state = CityHash128WithSeed(data, block_size, state); } - bool nextImpl() override + void calculateHash(Position data, size_t len) { - size_t len = working_buffer.size(); - Position data = working_buffer.begin(); - - /// корректировка на данные прочитанные до нас - if (ignore_before_this) - { - len -= ignore_before_this - working_buffer.begin(); - data = ignore_before_this; - ignore_before_this = nullptr; - } - if (len) { /// если данных меньше, чем block_size то сложим их в свой буффер и посчитаем от них hash позже @@ -66,18 +58,18 @@ private: if (block_pos) { size_t n = block_size - block_pos; - memcpy(&memory[block_pos], data, len); + memcpy(&memory[block_pos], data, n); append(&memory[0]); len -= n; data += n; block_pos = 0; } - while (len >= block_pos) + while (len >= block_size) { append(data); - len -= block_pos; - data += block_pos; + len -= block_size; + data += block_size; } if (len) @@ -87,9 +79,17 @@ private: } } } + } + bool nextImpl() override + { + in.position() = pos; bool res = in.next(); working_buffer = in.buffer(); + pos = in.position(); + + calculateHash(working_buffer.begin(), working_buffer.size()); + return res; } @@ -98,8 +98,5 @@ private: size_t block_size; uint128 state; ReadBuffer & in; - - /// игнорируем уже прочитанные данные - Position ignore_before_this; }; } diff --git a/dbms/src/IO/tests/hashing_buffer.h b/dbms/src/IO/tests/hashing_buffer.h new file mode 100644 index 00000000000..22c96add418 --- /dev/null +++ b/dbms/src/IO/tests/hashing_buffer.h @@ -0,0 +1,22 @@ +#include +#include + +#define FAIL(msg) { std::cout << msg; exit(1); } + + +uint128 referenceHash(const char * data, size_t len) +{ + const size_t block_size = DBMS_DEFAULT_HASHING_BLOCK_SIZE; + uint128 state(0, 0); + size_t pos; + + for (pos = 0; pos + block_size <= len; pos += block_size) + { + state = CityHash128WithSeed(data + pos, block_size, state); + } + + if (pos < len) + state = CityHash128WithSeed(data + pos, len - pos, state); + + return state; +} diff --git a/dbms/src/IO/tests/hashing_read_buffer.cpp b/dbms/src/IO/tests/hashing_read_buffer.cpp new file mode 100644 index 00000000000..ad65704b207 --- /dev/null +++ b/dbms/src/IO/tests/hashing_read_buffer.cpp @@ -0,0 +1,73 @@ +#include +#include +#include +#include "hashing_buffer.h" +#include + +void test(size_t data_size) +{ + std::vector vec(data_size); + char * data = &vec[0]; + + for (size_t i = 0; i < data_size; ++i) + data[i] = rand() & 255; + + uint128 reference = referenceHash(data, data_size); + + std::vector block_sizes = {56, 128, 513, 2048, 3055, 4097, 4096}; + for (size_t read_buffer_block_size : block_sizes) + { + std::cout << "block size " << read_buffer_block_size << std::endl; + std::stringstream io; + DB::WriteBufferFromOStream out_(io); + DB::HashingWriteBuffer out(out_); + out.write(data, data_size); + out.next(); + + //std::cout.write(data, data_size); + //std::cout << std::endl; + //std::cout << io.str() << std::endl; + + DB::ReadBufferFromIStream source(io, read_buffer_block_size); + DB::HashingReadBuffer buf(source); + + std::vector read_buf(data_size); + buf.read(read_buf.data(), data_size); + + bool failed_to_read = false; + for (size_t i = 0; i < data_size; ++i) + if (read_buf[i] != vec[i]) + { + failed_to_read = true; + } + + if (failed_to_read) + { + std::cout.write(data, data_size); + std::cout << std::endl; + std::cout.write(read_buf.data(), data_size); + std::cout << std::endl; + FAIL("Fail to read data"); + } + + if (buf.getHash() != reference) + { + std::cout << uint128ToString(buf.getHash()) << " " << uint128ToString(reference) << std::endl; + FAIL("failed on data size " << data_size << " reading by blocks of size " << read_buffer_block_size); + } + if (buf.getHash() != out.getHash()) + FAIL("Hash of HashingReadBuffer doesn't match with hash of HashingWriteBuffer on data size " << data_size << " reading by blocks of size " << read_buffer_block_size); + } +} + +int main() +{ + test(5); + test(100); + test(2048); + test(2049); + test(100000); + test(1 << 17); + + return 0; +} diff --git a/dbms/src/IO/tests/hashing_write_buffer.cpp b/dbms/src/IO/tests/hashing_write_buffer.cpp index 1143ae759af..7aa8b423f6b 100644 --- a/dbms/src/IO/tests/hashing_write_buffer.cpp +++ b/dbms/src/IO/tests/hashing_write_buffer.cpp @@ -1,26 +1,7 @@ #include #include -#define FAIL(msg) { std::cout << msg; exit(1); } - - -uint128 referenceHash(char * data, size_t len) -{ - const size_t block_size = DBMS_DEFAULT_HASHING_BLOCK_SIZE; - uint128 state(0, 0); - size_t pos; - - for (pos = 0; pos + block_size <= len; pos += block_size) - { - state = CityHash128WithSeed(data + pos, block_size, state); - } - - if (pos < len) - state = CityHash128WithSeed(data + pos, len - pos, state); - - return state; -} - +#include "hashing_buffer.h" void test(size_t data_size) {