From a7b9a1275983cc6b9d4b57846803d2fb0ba24390 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 10 May 2017 04:08:32 -0400 Subject: [PATCH] Returned back Increment and CounterInFile, because they are still needed [#CLICKHOUSE-2]. --- dbms/src/Common/CounterInFile.h | 184 ++++++++++++++++++ dbms/src/Common/Increment.h | 87 +++++++++ dbms/src/Storages/MergeTree/MergeTreeData.cpp | 1 + 3 files changed, 272 insertions(+) create mode 100644 dbms/src/Common/CounterInFile.h create mode 100644 dbms/src/Common/Increment.h diff --git a/dbms/src/Common/CounterInFile.h b/dbms/src/Common/CounterInFile.h new file mode 100644 index 00000000000..99320a1fbfd --- /dev/null +++ b/dbms/src/Common/CounterInFile.h @@ -0,0 +1,184 @@ +#pragma once + +#include +#include + +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +#define SMALL_READ_WRITE_BUFFER_SIZE 16 + + +/** Stores a number in the file. + * Designed for rare calls (not designed for performance). + */ +class CounterInFile +{ +public: + /// path - the name of the file, including the path + CounterInFile(const std::string & path_) : path(path_) {} + + /** Add `delta` to the number in the file and return the new value. + * If the `create_if_need` parameter is not set to true, then + * the file should already have a number written (if not - create the file manually with zero). + * + * To protect against race conditions between different processes, file locks are used. + * (But when the first file is created, the race condition is possible, so it's better to create the file in advance.) + * + * `locked_callback` is called when the counter file is locked. A new value is passed to it. + * `locked_callback` can be used to do something atomically with incrementing the counter (for example, renaming files). + */ + template + Int64 add(Int64 delta, Callback && locked_callback, bool create_if_need = false) + { + std::lock_guard lock(mutex); + + Int64 res = -1; + + bool file_doesnt_exists = !Poco::File(path).exists(); + if (file_doesnt_exists && !create_if_need) + { + throw Poco::Exception("File " + path + " does not exist. " + "You must create it manulally with appropriate value or 0 for first start."); + } + + int fd = open(path.c_str(), O_RDWR | O_CREAT, 0666); + if (-1 == fd) + DB::throwFromErrno("Cannot open file " + path); + + try + { + int flock_ret = flock(fd, LOCK_EX); + if (-1 == flock_ret) + DB::throwFromErrno("Cannot lock file " + path); + + if (!file_doesnt_exists) + { + DB::ReadBufferFromFileDescriptor rb(fd, SMALL_READ_WRITE_BUFFER_SIZE); + try + { + DB::readIntText(res, rb); + } + catch (const DB::Exception & e) + { + /// A more understandable error message. + if (e.code() == DB::ErrorCodes::CANNOT_READ_ALL_DATA || e.code() == DB::ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF) + throw DB::Exception("File " + path + " is empty. You must fill it manually with appropriate value.", e.code()); + else + throw; + } + } + else + res = 0; + + if (delta || file_doesnt_exists) + { + res += delta; + + DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE); + wb.seek(0); + wb.truncate(); + DB::writeIntText(res, wb); + DB::writeChar('\n', wb); + wb.sync(); + } + + locked_callback(res); + } + catch (...) + { + close(fd); + throw; + } + + close(fd); + return res; + } + + Int64 add(Int64 delta, bool create_if_need = false) + { + return add(delta, [](UInt64){}, create_if_need); + } + + const std::string & getPath() const + { + return path; + } + + /// Change the path to the file. + void setPath(std::string path_) + { + path = path_; + } + + // Not thread-safe and not synchronized between processes. + void fixIfBroken(UInt64 value) + { + bool file_exists = Poco::File(path).exists(); + + int fd = open(path.c_str(), O_RDWR | O_CREAT, 0666); + if (-1 == fd) + DB::throwFromErrno("Cannot open file " + path); + + try + { + bool broken = true; + + if (file_exists) + { + DB::ReadBufferFromFileDescriptor rb(fd, SMALL_READ_WRITE_BUFFER_SIZE); + try + { + UInt64 current_value; + DB::readIntText(current_value, rb); + char c; + DB::readChar(c, rb); + if (rb.count() > 0 && c == '\n' && rb.eof()) + broken = false; + } + catch (const DB::Exception & e) + { + if (e.code() != DB::ErrorCodes::CANNOT_READ_ALL_DATA && e.code() != DB::ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF) + throw; + } + } + + if (broken) + { + DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE); + wb.seek(0); + wb.truncate(); + DB::writeIntText(value, wb); + DB::writeChar('\n', wb); + wb.sync(); + } + } + catch (...) + { + close(fd); + throw; + } + + close(fd); + } + +private: + std::string path; + std::mutex mutex; +}; + + +#undef SMALL_READ_WRITE_BUFFER_SIZE diff --git a/dbms/src/Common/Increment.h b/dbms/src/Common/Increment.h new file mode 100644 index 00000000000..c03c6ef5575 --- /dev/null +++ b/dbms/src/Common/Increment.h @@ -0,0 +1,87 @@ +#pragma once + +#include + + +/** Allows to get an auto-increment number, storing it in a file. + * Intended for rare calls (not designed for performance). + */ +class Increment +{ +public: + /// path - the name of the file, including the path + Increment(const std::string & path_) : counter(path_) {} + + /** Get the next number. + * If the `create_if_need` parameter is not set to true, then + * the file must already have a number written (if not - create the file manually with zero). + * + * To protect against race conditions between different processes, file locks are used. + * (But when the first file is created, the race condition is possible, so it's better to create the file in advance.) + * + * `locked_callback` is called when the counter file is locked. A new value is passed to it. + * `locked_callback` can be used to do something atomically with the increment of the counter (for example, rename files). + */ + template + UInt64 get(Callback && locked_callback, bool create_if_need = false) + { + return static_cast(counter.add(1, std::forward(locked_callback), create_if_need)); + } + + UInt64 get(bool create_if_need = false) + { + return getBunch(1, create_if_need); + } + + /// Peek the next value. + UInt64 peek(bool create_if_need = false) + { + return getBunch(0, create_if_need); + } + + /** Get the next number and increase the counter by `count`. + * If the `create_if_need` parameter is not set to true, then + * the file should already have a number written (if not - create the file manually with zero). + * + * To protect against race conditions between different processes, file locks are used. + * (But when the first file is created, the race condition is possible, so it's better to create the file in advance.) + */ + UInt64 getBunch(UInt64 count, bool create_if_need = false) + { + return static_cast(counter.add(static_cast(count), create_if_need) - count + 1); + } + + /// Change the path to the file. + void setPath(std::string path_) + { + counter.setPath(path_); + } + + void fixIfBroken(UInt64 value) + { + counter.fixIfBroken(value); + } + +private: + CounterInFile counter; +}; + + +/** The same, but without storing it in a file. + */ +struct SimpleIncrement : private boost::noncopyable +{ + std::atomic value; + + SimpleIncrement(UInt64 start = 0) : value(start) {} + + void set(UInt64 new_value) + { + value = new_value; + } + + UInt64 get() + { + return ++value; + } +}; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index bc16caa2ba5..1662a0f208f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include