#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}


/// A model transforms columns of source data into columns of anonymized data
/// that are similar to the source in structure and in probability distributions.
class IModel
{
public:
    /// Call train iteratively for each block to train a model.
    virtual void train(const IColumn & column) = 0;

    /// Call finalize once, after training and before generating.
    virtual void finalize() = 0;

    /// Call generate: pass a source data column to obtain a column with anonymized data as a result.
    virtual ColumnPtr generate(const IColumn & column) = 0;

    virtual ~IModel() = default;
};

using ModelPtr = std::unique_ptr<IModel>;


template <typename... Ts>
UInt64 hash(Ts... xs)
{
    SipHash hash;
    (hash.update(xs), ...);
    return hash.get64();
}


/// Keep only the num_bits least significant bits of x.
/// The literal must be 64-bit because num_bits may be up to 63.
UInt64 maskBits(UInt64 x, size_t num_bits)
{
    return x & ((1ULL << num_bits) - 1);
}


/// Apply a Feistel network round to the least significant num_bits part of x.
UInt64 feistelRound(UInt64 x, size_t num_bits, UInt64 seed, size_t round)
{
    size_t num_bits_left_half = num_bits / 2;
    size_t num_bits_right_half = num_bits - num_bits_left_half;

    UInt64 left_half = maskBits(x >> num_bits_right_half, num_bits_left_half);
    UInt64 right_half = maskBits(x, num_bits_right_half);

    UInt64 new_left_half = right_half;
    UInt64 new_right_half = left_half ^ maskBits(hash(right_half, seed, round), num_bits_left_half);

    return (new_left_half << num_bits_left_half) ^ new_right_half;
}


/// Apply a Feistel network with num_rounds rounds to the least significant num_bits part of x.
UInt64 feistelNetwork(UInt64 x, size_t num_bits, UInt64 seed, size_t num_rounds = 4)
{
    UInt64 bits = maskBits(x, num_bits);
    for (size_t i = 0; i < num_rounds; ++i)
        bits = feistelRound(bits, num_bits, seed, i);
    return (x & ~((1ULL << num_bits) - 1)) ^ bits;
}


/// Pseudorandom permutation within the set of numbers with the same log2(x),
/// i.e. with the same position of the most significant bit.
UInt64 transform(UInt64 x, UInt64 seed)
{
    /// Keep 0 and 1 as is.
    if (x == 0 || x == 1)
        return x;

    /// Pseudorandom permutation of two elements.
    if (x == 2 || x == 3)
        return x ^ (seed & 1);

    size_t num_leading_zeros = __builtin_clzll(x);

    /// Keep the most significant bit and permute all bits below it.
    return feistelNetwork(x, 64 - num_leading_zeros - 1, seed);
}


class UnsignedIntegerModel : public IModel
{
private:
    const UInt64 seed;

public:
    explicit UnsignedIntegerModel(UInt64 seed) : seed(seed) {}

    void train(const IColumn &) override {}
    void finalize() override {}

    ColumnPtr generate(const IColumn & column) override
    {
        MutableColumnPtr res = column.cloneEmpty();

        size_t size = column.size();
        res->reserve(size);

        for (size_t i = 0; i < size; ++i)
            res->insert(transform(column.getUInt(i), seed));

        return res;
    }
};


/// Keep the sign and apply the pseudorandom permutation to the absolute value, as above.
Int64 transformSigned(Int64 x, UInt64 seed)
{
    if (x >= 0)
        return transform(x, seed);
    else
        return -transform(-x, seed);    /// It works Ok even for minimum signed number.
}


class SignedIntegerModel : public IModel
{
private:
    const UInt64 seed;

public:
    explicit SignedIntegerModel(UInt64 seed) : seed(seed) {}

    void train(const IColumn &) override {}
    void finalize() override {}

    ColumnPtr generate(const IColumn & column) override
    {
        MutableColumnPtr res = column.cloneEmpty();

        size_t size = column.size();
        res->reserve(size);

        for (size_t i = 0; i < size; ++i)
            res->insert(transformSigned(column.getInt(i), seed));

        return res;
    }
};


/// Pseudorandom permutation of the mantissa.
template <typename Float>
Float transformFloatMantissa(Float x, UInt64 seed)
{
    using UInt = std::conditional_t<std::is_same_v<Float, Float32>, UInt32, UInt64>;
    constexpr size_t mantissa_num_bits = std::is_same_v<Float, Float32> ? 23 : 52;

    UInt x_uint = ext::bit_cast<UInt>(x);
    x_uint = feistelNetwork(x_uint, mantissa_num_bits, seed);
    return ext::bit_cast<Float>(x_uint);
}


/// Transform the difference from the previous value by applying a pseudorandom permutation to its mantissa.
/// This retains some of the continuity of the source data.
template <typename Float>
class FloatModel : public IModel
{
private:
    const UInt64 seed;
    Float src_prev_value = 0;
    Float res_prev_value = 0;

public:
    explicit FloatModel(UInt64 seed) : seed(seed) {}

    void train(const IColumn &) override {}
    void finalize() override {}

    ColumnPtr generate(const IColumn & column) override
    {
        const auto & src_data = static_cast<const ColumnVector<Float> &>(column).getData();
        size_t size = src_data.size();

        auto res_column = ColumnVector<Float>::create(size);
        auto & res_data = static_cast<ColumnVector<Float> &>(*res_column).getData();

        for (size_t i = 0; i < size; ++i)
        {
            res_data[i] = res_prev_value + transformFloatMantissa(src_data[i] - src_prev_value, seed);
            src_prev_value = src_data[i];
            res_prev_value = res_data[i];
        }

        return res_column;
    }
};


/// Leave all data as is. For example, it is used for columns of type Date.
class IdentityModel : public IModel
{
public:
    void train(const IColumn &) override {}
    void finalize() override {}

    ColumnPtr generate(const IColumn & column) override
    {
        return column.cloneResized(column.size());
    }
};


/// Just a pseudorandom function of the source bytes and the seed.
void transformFixedString(const UInt8 * src, UInt8 * dst, size_t size, UInt64 seed)
{
    {
        SipHash hash;
        hash.update(seed);
        hash.update(reinterpret_cast<const char *>(src), size);
        seed = hash.get64();
    }

    UInt8 * pos = dst;
    UInt8 * end = dst + size;

    size_t i = 0;
    while (pos < end)
    {
        SipHash hash;
        hash.update(seed);
        hash.update(i);

        if (size >= 16)
        {
            /// Fill the destination in 16-byte chunks; the last chunk is shifted back so that it ends exactly at end.
            char * hash_dst = reinterpret_cast<char *>(std::min(pos, end - 16));
            hash.get128(hash_dst);
        }
        else
        {
            /// The string is shorter than one 16-byte hash: copy only its first size bytes.
            char value[16];
            hash.get128(value);
            memcpy(dst, value, size);
        }

        pos += 16;
        ++i;
    }
}


class FixedStringModel : public IModel
{
private:
    const UInt64 seed;

public:
    explicit FixedStringModel(UInt64 seed) : seed(seed) {}

    void train(const IColumn &) override {}
    void finalize() override {}

    ColumnPtr generate(const IColumn & column) override
    {
        const ColumnFixedString & column_fixed_string = static_cast<const ColumnFixedString &>(column);
        const size_t string_size = column_fixed_string.getN();

        const auto & src_data = column_fixed_string.getChars();
        size_t size = column_fixed_string.size();

        auto res_column = ColumnFixedString::create(string_size);
        auto & res_data = res_column->getChars();

        res_data.resize(src_data.size());

        for (size_t i = 0; i < size; ++i)
            transformFixedString(&src_data[i * string_size], &res_data[i * string_size], string_size, seed);

        return res_column;
    }
};


/// Leave the date part as is and apply a pseudorandom permutation to the time difference
/// from the previous value, within the same log2 class.
class DateTimeModel : public IModel
{
private:
    const UInt64 seed;
    UInt32 src_prev_value = 0;
    UInt32 res_prev_value = 0;

    const DateLUTImpl & date_lut;

public:
    explicit DateTimeModel(UInt64 seed) : seed(seed), date_lut(DateLUT::instance()) {}

    void train(const IColumn &) override {}
    void finalize() override {}

    ColumnPtr generate(const IColumn & column) override
    {
        const auto & src_data = static_cast<const ColumnVector<UInt32> &>(column).getData();
        size_t size = src_data.size();

        auto res_column = ColumnVector<UInt32>::create(size);
        auto & res_data = static_cast<ColumnVector<UInt32> &>(*res_column).getData();

        for (size_t i = 0; i < size; ++i)
        {
            UInt32 src_time = src_data[i];
            UInt32 src_date = date_lut.toDate(src_time);

            Int32 src_diff = src_time - src_prev_value;

            /// The difference may be negative, so use the sign-preserving transform.
            Int32 res_diff = transformSigned(src_diff, seed);

            UInt32 new_time = res_prev_value + res_diff;
            res_data[i] = src_date + new_time % 86400;    /// Don't care about tz changes and daylight saving time.

            src_prev_value = src_time;
            res_prev_value = res_data[i];
        }

        return res_column;
    }
};


class MarkovModel
{
private:
    using CodePoint = UInt32;
    using NGramHash = UInt32;

    struct Bucket
    {
        CodePoint code;
        UInt64 count;

        Bucket(CodePoint code) : code(code), count(1) {}
    };

    struct Histogram
    {
        UInt64 total = 0;   /// Not including count_end.
        UInt64 count_end = 0;
        std::vector<Bucket> data;

        void add(CodePoint code)
        {
            ++total;

            for (auto & elem : data)
            {
                if (elem.code == code)
                {
                    ++elem.count;
                    return;
                }
            }

            data.emplace_back(code);
        }

        void addEnd()
        {
            ++count_end;
        }

        CodePoint sample(UInt64 random, double end_multiplier) const
        {
            UInt64 range = total + UInt64(count_end * end_multiplier);
            if (range == 0)
                return END;

            random %= range;

            UInt64 sum = 0;
            for (const auto & elem : data)
            {
                sum += elem.count;
                if (sum > random)
                    return elem.code;
            }

            return END;
        }
    };

    using Table = HashMap<NGramHash, Histogram>;
    Table table;

    size_t order;
    size_t frequency_cutoff;

    std::vector<CodePoint> code_points;

    static constexpr CodePoint BEGIN = -1;
    static constexpr CodePoint END = -2;


    NGramHash hashContext(const CodePoint * begin, const CodePoint * end) const
    {
        return CRC32Hash()(StringRef(reinterpret_cast<const char *>(begin), (end - begin) * sizeof(CodePoint)));
    }

    /// We don't have to use actual Unicode code point numbers: any bijective mapping will do,
    /// so the bytes of the UTF-8 sequence are simply copied into a UInt32.
    CodePoint readCodePoint(const char *& pos, const char * end)
    {
        size_t length = UTF8::seqLength(*pos);
        if (pos + length > end)
            length = end - pos;

        CodePoint res = 0;
        memcpy(&res, pos, length);
        pos += length;
        return res;
    }

    bool writeCodePoint(CodePoint code, char *& pos, char * end)
    {
        size_t length
            = (code & 0xFF000000) ? 4
            : (code & 0xFFFF0000) ? 3
            : (code & 0xFFFFFF00) ? 2
            : 1;

        if (pos + length > end)
            return false;

        memcpy(pos, &code, length);
        pos += length;
        return true;
    }

public:
    explicit MarkovModel(size_t order, size_t frequency_cutoff)
        : order(order), frequency_cutoff(frequency_cutoff), code_points(order, BEGIN) {}

    void consume(const char * data, size_t size)
    {
        /// Reset the context to the initial `order` BEGIN markers.
        code_points.resize(order);

        const char * pos = data;
        const char * end = data + size;

        while (true)
        {
            bool inside = pos < end;

            CodePoint next_code_point = 0;

            if (inside)
                next_code_point = readCodePoint(pos, end);

            for (size_t context_size = 0; context_size < order; ++context_size)
            {
                NGramHash context_hash = hashContext(code_points.data() + code_points.size() - context_size, code_points.data() + code_points.size());

                if (inside)
                    table[context_hash].add(next_code_point);
                else /// if (context_size != 0 || order == 0)     /// Don't allow to break string without context (except order-0 model).
                    table[context_hash].addEnd();
            }

            if (inside)
                code_points.push_back(next_code_point);
            else
                break;
        }
    }


    void finalize()
    {
        if (frequency_cutoff == 0)
            return;

        // size_t total_buckets = 0;
        // size_t erased_buckets = 0;

        for (auto & elem : table)
        {
            Histogram & histogram = elem.second;
            // total_buckets += histogram.data.size();

            if (histogram.total + histogram.count_end < frequency_cutoff)
            {
                // erased_buckets += histogram.data.size();

                histogram.data.clear();
                histogram.total = 0;
            }
            else
            {
                /// Count before removing: std::remove_if leaves unspecified values past the returned iterator.
                UInt64 erased_count = 0;
                for (const auto & bucket : histogram.data)
                    if (bucket.count < frequency_cutoff)
                        erased_count += bucket.count;

                auto erased = std::remove_if(histogram.data.begin(), histogram.data.end(),
                    [frequency_cutoff = frequency_cutoff](const Bucket & bucket) { return bucket.count < frequency_cutoff; });

                // erased_buckets += histogram.data.end() - erased;

                histogram.data.erase(erased, histogram.data.end());
                histogram.total -= erased_count;
            }
        }

        // std::cerr << "Erased " << erased_buckets << " out of " << total_buckets << " buckets\n";
    }


    size_t generate(char * data, size_t desired_size, size_t buffer_size,
        UInt64 seed, const char * determinator_data, size_t determinator_size)
    {
        code_points.resize(order);

        char * pos = data;
        char * end = data + buffer_size;

        while (pos < end)
        {
            /// Find the longest context that has statistics.
            Table::iterator it = table.end();

            size_t context_size = order;
            while (true)
            {
                it = table.find(hashContext(code_points.data() + code_points.size() - context_size, code_points.data() + code_points.size()));
                if (table.end() != it && it->second.total + it->second.count_end != 0)
                    break;

                if (context_size == 0)
                    break;
                --context_size;
            }

            if (table.end() == it)
                throw Exception("Logical error in markov model", ErrorCodes::LOGICAL_ERROR);

            size_t offset_from_begin_of_string = pos - data;

            /// The sliding window must not be longer than the source string, otherwise it would start before determinator_data.
            size_t determinator_sliding_window_size = 8;
            if (determinator_sliding_window_size > determinator_size)
                determinator_sliding_window_size = determinator_size;

            size_t determinator_sliding_window_overflow = offset_from_begin_of_string + determinator_sliding_window_size > determinator_size
                ? offset_from_begin_of_string + determinator_sliding_window_size - determinator_size : 0;

            const char * determinator_sliding_window_begin = determinator_data + offset_from_begin_of_string - determinator_sliding_window_overflow;

            SipHash hash;
            hash.update(seed);
            hash.update(determinator_sliding_window_begin, determinator_sliding_window_size);
            hash.update(determinator_sliding_window_overflow);
            UInt64 determinator = hash.get64();

            /// If the string is longer than desired_size, increase the probability of end.
            double end_probability_multiplier = 0;
            Int64 num_bytes_after_desired_size = Int64(pos - data) - Int64(desired_size);

            if (num_bytes_after_desired_size > 0)
                end_probability_multiplier = std::pow(1.25, num_bytes_after_desired_size);

            CodePoint code = it->second.sample(determinator, end_probability_multiplier);

            if (code == END)
                break;

            if (!writeCodePoint(code, pos, end))
                break;

            code_points.push_back(code);
        }

        return pos - data;
    }
};


/// Transform the length of a string in the same way as an unsigned integer.
/// To generate the content of strings, use
///  an order-N Markov model on Unicode code points,
///  and to generate the next code point use a deterministic RNG
///  determined by the hash of an 8-byte sliding window of the source string.
/// This is intended to generate locally-similar strings from locally-similar sources.
class StringModel : public IModel
{
private:
    UInt64 seed;
    MarkovModel markov_model;

public:
    StringModel(UInt64 seed, UInt8 order, UInt64 frequency_cutoff) : seed(seed), markov_model(order, frequency_cutoff) {}

    void train(const IColumn & column) override
    {
        const ColumnString & column_string = static_cast<const ColumnString &>(column);
        size_t size = column_string.size();

        for (size_t i = 0; i < size; ++i)
        {
            StringRef string = column_string.getDataAt(i);
            markov_model.consume(string.data, string.size);
        }
    }

    void finalize() override
    {
        markov_model.finalize();
    }

    ColumnPtr generate(const IColumn & column) override
    {
        const ColumnString & column_string = static_cast<const ColumnString &>(column);
        size_t size = column_string.size();

        auto res_column = ColumnString::create();
        res_column->reserve(size);

        std::string new_string;
        for (size_t i = 0; i < size; ++i)
        {
            StringRef src_string = column_string.getDataAt(i);
            size_t desired_string_size = transform(src_string.size, seed);
            new_string.resize(desired_string_size * 2);

            size_t actual_size = 0;
            if (desired_string_size != 0)
                actual_size = markov_model.generate(new_string.data(), desired_string_size, new_string.size(), seed, src_string.data, src_string.size);

            res_column->insertData(new_string.data(), actual_size);
        }

        return res_column;
    }
};


class ArrayModel : public IModel
{
private:
    ModelPtr nested_model;

public:
    explicit ArrayModel(ModelPtr nested_model) : nested_model(std::move(nested_model)) {}

    void train(const IColumn & column) override
    {
        const ColumnArray & column_array = static_cast<const ColumnArray &>(column);
        const IColumn & nested_column = column_array.getData();

        nested_model->train(nested_column);
    }

    void finalize() override
    {
        nested_model->finalize();
    }

    ColumnPtr generate(const IColumn & column) override
    {
        const ColumnArray & column_array = static_cast<const ColumnArray &>(column);
        const IColumn & nested_column = column_array.getData();

        ColumnPtr new_nested_column = nested_model->generate(nested_column);

        /// Array sizes (offsets) are kept as is; only the elements are anonymized.
        return ColumnArray::create((*std::move(new_nested_column)).mutate(), (*std::move(column_array.getOffsetsPtr())).mutate());
    }
};


class NullableModel : public IModel
{
private:
    ModelPtr nested_model;

public:
    explicit NullableModel(ModelPtr nested_model) : nested_model(std::move(nested_model)) {}

    void train(const IColumn & column) override
    {
        const ColumnNullable & column_nullable = static_cast<const ColumnNullable &>(column);
        const IColumn & nested_column = column_nullable.getNestedColumn();

        nested_model->train(nested_column);
    }

    void finalize() override
    {
        nested_model->finalize();
    }

    ColumnPtr generate(const IColumn & column) override
    {
        const ColumnNullable & column_nullable = static_cast<const ColumnNullable &>(column);
        const IColumn & nested_column = column_nullable.getNestedColumn();

        ColumnPtr new_nested_column = nested_model->generate(nested_column);

        /// The null map is kept as is; only the nested values are anonymized.
        return ColumnNullable::create((*std::move(new_nested_column)).mutate(), (*std::move(column_nullable.getNullMapColumnPtr())).mutate());
    }
};


class ModelFactory
{
public:
    ModelPtr get(const IDataType & data_type, UInt64 seed, UInt8 markov_model_order, UInt64 frequency_cutoff) const
    {
        if (data_type.isInteger())
        {
            if (data_type.isUnsignedInteger())
                return std::make_unique<UnsignedIntegerModel>(seed);
            else
                return std::make_unique<SignedIntegerModel>(seed);
        }

        if (typeid_cast<const DataTypeFloat32 *>(&data_type))
            return std::make_unique<FloatModel<Float32>>(seed);

        if (typeid_cast<const DataTypeFloat64 *>(&data_type))
            return std::make_unique<FloatModel<Float64>>(seed);

        if (typeid_cast<const DataTypeDate *>(&data_type))
            return std::make_unique<IdentityModel>();

        if (typeid_cast<const DataTypeDateTime *>(&data_type))
            return std::make_unique<DateTimeModel>(seed);

        if (typeid_cast<const DataTypeString *>(&data_type))
            return std::make_unique<StringModel>(seed, markov_model_order, frequency_cutoff);

        if (typeid_cast<const DataTypeFixedString *>(&data_type))
            return std::make_unique<FixedStringModel>(seed);

        if (auto type = typeid_cast<const DataTypeArray *>(&data_type))
            return std::make_unique<ArrayModel>(get(*type->getNestedType(), seed, markov_model_order, frequency_cutoff));

        if (auto type = typeid_cast<const DataTypeNullable *>(&data_type))
            return std::make_unique<NullableModel>(get(*type->getNestedType(), seed, markov_model_order, frequency_cutoff));

        throw Exception("Unsupported data type: " + data_type.getName());
    }
};


class Anonymizer
{
private:
    std::vector<ModelPtr> models;

public:
    Anonymizer(const Block & header, UInt64 seed, UInt8 markov_model_order, UInt64 frequency_cutoff)
    {
        ModelFactory factory;

        size_t columns = header.columns();
        models.reserve(columns);

        /// Each column gets its own seed, derived from the global seed and the column position.
        for (size_t i = 0; i < columns; ++i)
            models.emplace_back(factory.get(*header.getByPosition(i).type, hash(seed, i), markov_model_order, frequency_cutoff));
    }

    void train(const Columns & columns)
    {
        size_t size = columns.size();
        for (size_t i = 0; i < size; ++i)
            models[i]->train(*columns[i]);
    }

    void finalize()
    {
        for (auto & model : models)
            model->finalize();
    }

    Columns generate(const Columns & columns)
    {
        size_t size = columns.size();
        Columns res(size);
        for (size_t i = 0; i < size; ++i)
            res[i] = models[i]->generate(*columns[i]);
        return res;
    }
};

}


int main(int argc, char ** argv)
try
{
    using namespace DB;
    namespace po = boost::program_options;

    po::options_description description("Main options");
    description.add_options()
        ("help", "produce help message")
        ("structure,S", po::value<std::string>(), "structure of the initial table (list of column and type names)")
        ("input-format", po::value<std::string>(), "input format of the initial table data")
        ("output-format", po::value<std::string>(), "output format of the generated data")
        ("seed", po::value<std::string>(), "seed (arbitrary string); must be a random string at least 10 bytes long")
        ("order", po::value<UInt64>()->default_value(5), "order of the Markov model used to generate strings")
        ("cutoff", po::value<UInt64>()->default_value(5), "frequency cutoff for the Markov model")
    ;

    po::parsed_options parsed = po::command_line_parser(argc, argv).options(description).run();
    po::variables_map options;
    po::store(parsed, options);

    if (options.count("help"))
    {
        std::cout << description << "\n";
        return 0;
    }

    UInt64 seed = sipHash64(options["seed"].as<std::string>());

    std::string structure = options["structure"].as<std::string>();
    std::string input_format = options["input-format"].as<std::string>();
    std::string output_format = options["output-format"].as<std::string>();

    UInt64 markov_model_order = options["order"].as<UInt64>();
    UInt64 frequency_cutoff = options["cutoff"].as<UInt64>();

    // Create header block
    std::vector<std::string> structure_vals;
    boost::split(structure_vals, structure, boost::algorithm::is_any_of(" ,"), boost::algorithm::token_compress_on);

    if (structure_vals.size() % 2 != 0)
        throw Exception("Odd number of elements in structure: it must be a list of name and type pairs", ErrorCodes::LOGICAL_ERROR);

    Block header;
    const DataTypeFactory & data_type_factory = DataTypeFactory::instance();

    for (size_t i = 0, size = structure_vals.size(); i < size; i += 2)
    {
        ColumnWithTypeAndName column;
        column.name = structure_vals[i];
        column.type = data_type_factory.get(structure_vals[i + 1]);
        column.column = column.type->createColumn();
        header.insert(std::move(column));
    }

    Context context = Context::createGlobal();

    /// stdin must be seekable: the input is read twice, once for training and once for generation.
    ReadBufferFromFileDescriptor file_in(STDIN_FILENO);
    WriteBufferFromFileDescriptor file_out(STDOUT_FILENO);

    Anonymizer anonymizer(header, seed, markov_model_order, frequency_cutoff);

    size_t max_block_size = 8192;

    /// Train step
    std::cerr << "Training models\n";

    {
        BlockInputStreamPtr input = context.getInputFormat(input_format, file_in, header, max_block_size);

        input->readPrefix();
        while (Block block = input->read())
            anonymizer.train(block.getColumns());
        input->readSuffix();
    }

    anonymizer.finalize();

    /// Generation step
    std::cerr << "Generating data\n";

    {
        file_in.seek(0);

        BlockInputStreamPtr input = context.getInputFormat(input_format, file_in, header, max_block_size);
        BlockOutputStreamPtr output = context.getOutputFormat(output_format, file_out, header);

        input->readPrefix();
        output->writePrefix();
        while (Block block = input->read())
        {
            Columns columns = anonymizer.generate(block.getColumns());
            output->write(header.cloneWithColumns(columns));
        }
        output->writeSuffix();
        input->readSuffix();
    }

    return 0;
}
catch (...)
{
    std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
    auto code = DB::getCurrentExceptionCode();
    return code ? code : 1;
}