pcg serialization

This commit is contained in:
Alexander Tokmakov 2020-11-09 16:07:38 +03:00
parent 67099f28ac
commit a06be511df
8 changed files with 71 additions and 25 deletions

View File

@ -113,6 +113,12 @@
#include "pcg_extras.hpp"
namespace DB
{
struct PcgSerializer;
struct PcgDeserializer;
}
namespace pcg_detail {
using namespace pcg_extras;
@ -557,6 +563,9 @@ public:
engine<xtype1, itype1,
output_mixin1, output_previous1,
stream_mixin1, multiplier_mixin1>& rng);
friend ::DB::PcgSerializer;
friend ::DB::PcgDeserializer;
};
template <typename CharT, typename Traits,

View File

@ -2,6 +2,9 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
@ -244,10 +247,9 @@ public:
if constexpr (Trait::sampler == Sampler::RNG)
{
DB::writeIntBinary<size_t>(this->data(place).total_values, buf);
std::ostringstream rng_stream;
rng_stream.exceptions(std::ios::failbit);
rng_stream << this->data(place).rng;
DB::writeStringBinary(rng_stream.str(), buf);
WriteBufferFromOwnString rng_buf;
rng_buf << this->data(place).rng;
DB::writeStringBinary(rng_buf.str(), buf);
}
// TODO
@ -275,9 +277,8 @@ public:
DB::readIntBinary<size_t>(this->data(place).total_values, buf);
std::string rng_string;
DB::readStringBinary(rng_string, buf);
std::istringstream rng_stream(rng_string);
rng_stream.exceptions(std::ios::failbit);
rng_stream >> this->data(place).rng;
ReadBufferFromString rng_buf(rng_string);
rng_buf >> this->data(place).rng;
}
// TODO
@ -565,10 +566,9 @@ public:
if constexpr (Trait::sampler == Sampler::RNG)
{
DB::writeIntBinary<size_t>(data(place).total_values, buf);
std::ostringstream rng_stream;
rng_stream.exceptions(std::ios::failbit);
rng_stream << data(place).rng;
DB::writeStringBinary(rng_stream.str(), buf);
WriteBufferFromOwnString rng_buf;
rng_buf << data(place).rng;
DB::writeStringBinary(rng_buf.str(), buf);
}
// TODO
@ -600,9 +600,8 @@ public:
DB::readIntBinary<size_t>(data(place).total_values, buf);
std::string rng_string;
DB::readStringBinary(rng_string, buf);
std::istringstream rng_stream(rng_string);
rng_stream.exceptions(std::ios::failbit);
rng_stream >> data(place).rng;
ReadBufferFromString rng_buf(rng_string);
rng_buf >> data(place).rng;
}
// TODO

View File

@ -8,6 +8,9 @@
#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/PODArray.h>
#include <Common/NaNUtils.h>
#include <Poco/Exception.h>
@ -190,9 +193,8 @@ public:
std::string rng_string;
DB::readStringBinary(rng_string, buf);
std::istringstream rng_stream(rng_string);
rng_stream.exceptions(std::ios::failbit);
rng_stream >> rng;
DB::ReadBufferFromString rng_buf(rng_string);
rng_buf >> rng;
for (size_t i = 0; i < samples.size(); ++i)
DB::readBinary(samples[i], buf);
@ -205,10 +207,9 @@ public:
DB::writeIntBinary<size_t>(sample_count, buf);
DB::writeIntBinary<size_t>(total_values, buf);
std::ostringstream rng_stream;
rng_stream.exceptions(std::ios::failbit);
rng_stream << rng;
DB::writeStringBinary(rng_stream.str(), buf);
DB::WriteBufferFromOwnString rng_buf;
rng_buf << rng;
DB::writeStringBinary(rng_buf.str(), buf);
for (size_t i = 0; i < std::min(sample_count, total_values); ++i)
DB::writeBinary(samples[i], buf);

View File

@ -1,5 +1,6 @@
#include <Client/MultiplexedConnections.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/Operators.h>
#include <Common/thread_local_rng.h>
@ -222,19 +223,18 @@ std::string MultiplexedConnections::dumpAddresses() const
std::string MultiplexedConnections::dumpAddressesUnlocked() const
{
bool is_first = true;
std::ostringstream os;
os.exceptions(std::ios::failbit);
WriteBufferFromOwnString buf;
for (const ReplicaState & state : replica_states)
{
const Connection * connection = state.connection;
if (connection)
{
os << (is_first ? "" : "; ") << connection->getDescription();
buf << (is_first ? "" : "; ") << connection->getDescription();
is_first = false;
}
}
return os.str();
return buf.str();
}
Packet MultiplexedConnections::receivePacketUnlocked()

View File

@ -46,6 +46,7 @@ template <typename T> WriteBuffer & operator<< (WriteBuffer & buf, const T &
/// If you do not use the manipulators, the string is displayed without an escape, as is.
template <> inline WriteBuffer & operator<< (WriteBuffer & buf, const String & x) { writeString(x, buf); return buf; }
template <> inline WriteBuffer & operator<< (WriteBuffer & buf, const char & x) { writeChar(x, buf); return buf; }
template <> inline WriteBuffer & operator<< (WriteBuffer & buf, const pcg32_fast & x) { PcgSerializer::serializePcg32(x, buf); return buf; }
inline WriteBuffer & operator<< (WriteBuffer & buf, const char * x) { writeCString(x, buf); return buf; }
@ -73,6 +74,7 @@ inline WriteBuffer & operator<< (WriteBuffer & buf, FlushManip) { buf.next(); re
template <typename T> ReadBuffer & operator>> (ReadBuffer & buf, T & x) { readText(x, buf); return buf; }
template <> inline ReadBuffer & operator>> (ReadBuffer & buf, String & x) { readString(x, buf); return buf; }
template <> inline ReadBuffer & operator>> (ReadBuffer & buf, char & x) { readChar(x, buf); return buf; }
template <> inline ReadBuffer & operator>> (ReadBuffer & buf, pcg32_fast & x) { PcgDeserializer::deserializePcg32(x, buf); return buf; }
/// If you specify a string literal for reading, this will mean - make sure there is a sequence of bytes and skip it.
inline ReadBuffer & operator>> (ReadBuffer & buf, const char * x) { assertString(x, buf); return buf; }

View File

@ -53,6 +53,7 @@ namespace ErrorCodes
extern const int CANNOT_READ_ARRAY_FROM_TEXT;
extern const int CANNOT_PARSE_NUMBER;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int INCORRECT_DATA;
}
/// Helper functions for formatted input.
@ -1228,4 +1229,23 @@ void saveUpToPosition(ReadBuffer & in, Memory<> & memory, char * current);
*/
bool loadAtPosition(ReadBuffer & in, Memory<> & memory, char * & current);
struct PcgDeserializer
{
static void deserializePcg32(const pcg32_fast & rng, ReadBuffer & buf)
{
decltype(rng.state_) multiplier, increment, state;
readText(multiplier, buf);
assertChar(' ', buf);
readText(increment, buf);
assertChar(' ', buf);
readText(state, buf);
if (multiplier != rng.multiplier())
throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect multiplier in pcg32: expected {}, got {}", rng.multiplier(), multiplier);
if (increment != rng.increment())
throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect increment in pcg32: expected {}, got {}", rng.increment(), increment);
}
};
}

View File

@ -1093,4 +1093,16 @@ writeBinaryBigEndian(T x, WriteBuffer & buf) /// Assuming little endian archi
writePODBinary(x, buf);
}
struct PcgSerializer
{
static void serializePcg32(const pcg32_fast & rng, WriteBuffer & buf)
{
writeText(rng.multiplier(), buf);
writeChar(' ', buf);
writeText(rng.increment(), buf);
writeChar(' ', buf);
writeText(rng.state_, buf);
}
};
}

View File

@ -105,3 +105,6 @@ find $ROOT_PATH/{src,base,programs,utils} -name '*.h' -or -name '*.cpp' | xargs
# Trailing whitespaces
find $ROOT_PATH/{src,base,programs,utils} -name '*.h' -or -name '*.cpp' | xargs grep -P ' $' | grep -P '.' && echo "^ Trailing whitespaces."
# Forbid stringstream because it's easy to use them incorrectly and hard to debug possible issues
find $ROOT_PATH/{src,base,programs,utils} -name '*.h' -or -name '*.cpp' | xargs grep 'std::ostringstream\|std::istringstream' && echo "Use WriteBufferFromString or ReadBufferFromString instead of std::ostringstream or std::istringstream"