mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-23 10:10:50 +00:00
Returned back Increment and CounterInFile, because they are still needed [#CLICKHOUSE-2].
This commit is contained in:
parent
bb41d47096
commit
a7b9a12759
184
dbms/src/Common/CounterInFile.h
Normal file
184
dbms/src/Common/CounterInFile.h
Normal file
@ -0,0 +1,184 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <sys/file.h>
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
#include <Poco/File.h>
|
||||||
|
#include <Poco/Exception.h>
|
||||||
|
#include <mutex>
|
||||||
|
#include <Poco/ScopedLock.h>
|
||||||
|
|
||||||
|
#include <Common/Exception.h>
|
||||||
|
#include <IO/ReadBufferFromFileDescriptor.h>
|
||||||
|
#include <IO/WriteBufferFromFileDescriptor.h>
|
||||||
|
#include <IO/ReadHelpers.h>
|
||||||
|
#include <IO/WriteHelpers.h>
|
||||||
|
|
||||||
|
#include <common/Types.h>
|
||||||
|
|
||||||
|
#define SMALL_READ_WRITE_BUFFER_SIZE 16
|
||||||
|
|
||||||
|
|
||||||
|
/** Stores a number in the file.
|
||||||
|
* Designed for rare calls (not designed for performance).
|
||||||
|
*/
|
||||||
|
class CounterInFile
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
/// path - the name of the file, including the path
|
||||||
|
CounterInFile(const std::string & path_) : path(path_) {}
|
||||||
|
|
||||||
|
/** Add `delta` to the number in the file and return the new value.
|
||||||
|
* If the `create_if_need` parameter is not set to true, then
|
||||||
|
* the file should already have a number written (if not - create the file manually with zero).
|
||||||
|
*
|
||||||
|
* To protect against race conditions between different processes, file locks are used.
|
||||||
|
* (But when the first file is created, the race condition is possible, so it's better to create the file in advance.)
|
||||||
|
*
|
||||||
|
* `locked_callback` is called when the counter file is locked. A new value is passed to it.
|
||||||
|
* `locked_callback` can be used to do something atomically with incrementing the counter (for example, renaming files).
|
||||||
|
*/
|
||||||
|
template <typename Callback>
|
||||||
|
Int64 add(Int64 delta, Callback && locked_callback, bool create_if_need = false)
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(mutex);
|
||||||
|
|
||||||
|
Int64 res = -1;
|
||||||
|
|
||||||
|
bool file_doesnt_exists = !Poco::File(path).exists();
|
||||||
|
if (file_doesnt_exists && !create_if_need)
|
||||||
|
{
|
||||||
|
throw Poco::Exception("File " + path + " does not exist. "
|
||||||
|
"You must create it manulally with appropriate value or 0 for first start.");
|
||||||
|
}
|
||||||
|
|
||||||
|
int fd = open(path.c_str(), O_RDWR | O_CREAT, 0666);
|
||||||
|
if (-1 == fd)
|
||||||
|
DB::throwFromErrno("Cannot open file " + path);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
int flock_ret = flock(fd, LOCK_EX);
|
||||||
|
if (-1 == flock_ret)
|
||||||
|
DB::throwFromErrno("Cannot lock file " + path);
|
||||||
|
|
||||||
|
if (!file_doesnt_exists)
|
||||||
|
{
|
||||||
|
DB::ReadBufferFromFileDescriptor rb(fd, SMALL_READ_WRITE_BUFFER_SIZE);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
DB::readIntText(res, rb);
|
||||||
|
}
|
||||||
|
catch (const DB::Exception & e)
|
||||||
|
{
|
||||||
|
/// A more understandable error message.
|
||||||
|
if (e.code() == DB::ErrorCodes::CANNOT_READ_ALL_DATA || e.code() == DB::ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
|
||||||
|
throw DB::Exception("File " + path + " is empty. You must fill it manually with appropriate value.", e.code());
|
||||||
|
else
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
res = 0;
|
||||||
|
|
||||||
|
if (delta || file_doesnt_exists)
|
||||||
|
{
|
||||||
|
res += delta;
|
||||||
|
|
||||||
|
DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE);
|
||||||
|
wb.seek(0);
|
||||||
|
wb.truncate();
|
||||||
|
DB::writeIntText(res, wb);
|
||||||
|
DB::writeChar('\n', wb);
|
||||||
|
wb.sync();
|
||||||
|
}
|
||||||
|
|
||||||
|
locked_callback(res);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
close(fd);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
close(fd);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
Int64 add(Int64 delta, bool create_if_need = false)
|
||||||
|
{
|
||||||
|
return add(delta, [](UInt64){}, create_if_need);
|
||||||
|
}
|
||||||
|
|
||||||
|
const std::string & getPath() const
|
||||||
|
{
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Change the path to the file.
|
||||||
|
void setPath(std::string path_)
|
||||||
|
{
|
||||||
|
path = path_;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Not thread-safe and not synchronized between processes.
|
||||||
|
void fixIfBroken(UInt64 value)
|
||||||
|
{
|
||||||
|
bool file_exists = Poco::File(path).exists();
|
||||||
|
|
||||||
|
int fd = open(path.c_str(), O_RDWR | O_CREAT, 0666);
|
||||||
|
if (-1 == fd)
|
||||||
|
DB::throwFromErrno("Cannot open file " + path);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
bool broken = true;
|
||||||
|
|
||||||
|
if (file_exists)
|
||||||
|
{
|
||||||
|
DB::ReadBufferFromFileDescriptor rb(fd, SMALL_READ_WRITE_BUFFER_SIZE);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
UInt64 current_value;
|
||||||
|
DB::readIntText(current_value, rb);
|
||||||
|
char c;
|
||||||
|
DB::readChar(c, rb);
|
||||||
|
if (rb.count() > 0 && c == '\n' && rb.eof())
|
||||||
|
broken = false;
|
||||||
|
}
|
||||||
|
catch (const DB::Exception & e)
|
||||||
|
{
|
||||||
|
if (e.code() != DB::ErrorCodes::CANNOT_READ_ALL_DATA && e.code() != DB::ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (broken)
|
||||||
|
{
|
||||||
|
DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE);
|
||||||
|
wb.seek(0);
|
||||||
|
wb.truncate();
|
||||||
|
DB::writeIntText(value, wb);
|
||||||
|
DB::writeChar('\n', wb);
|
||||||
|
wb.sync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
close(fd);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
close(fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::string path;
|
||||||
|
std::mutex mutex;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
#undef SMALL_READ_WRITE_BUFFER_SIZE
|
87
dbms/src/Common/Increment.h
Normal file
87
dbms/src/Common/Increment.h
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <Common/CounterInFile.h>
|
||||||
|
|
||||||
|
|
||||||
|
/** Allows to get an auto-increment number, storing it in a file.
|
||||||
|
* Intended for rare calls (not designed for performance).
|
||||||
|
*/
|
||||||
|
class Increment
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
/// path - the name of the file, including the path
|
||||||
|
Increment(const std::string & path_) : counter(path_) {}
|
||||||
|
|
||||||
|
/** Get the next number.
|
||||||
|
* If the `create_if_need` parameter is not set to true, then
|
||||||
|
* the file must already have a number written (if not - create the file manually with zero).
|
||||||
|
*
|
||||||
|
* To protect against race conditions between different processes, file locks are used.
|
||||||
|
* (But when the first file is created, the race condition is possible, so it's better to create the file in advance.)
|
||||||
|
*
|
||||||
|
* `locked_callback` is called when the counter file is locked. A new value is passed to it.
|
||||||
|
* `locked_callback` can be used to do something atomically with the increment of the counter (for example, rename files).
|
||||||
|
*/
|
||||||
|
template <typename Callback>
|
||||||
|
UInt64 get(Callback && locked_callback, bool create_if_need = false)
|
||||||
|
{
|
||||||
|
return static_cast<UInt64>(counter.add(1, std::forward<Callback>(locked_callback), create_if_need));
|
||||||
|
}
|
||||||
|
|
||||||
|
UInt64 get(bool create_if_need = false)
|
||||||
|
{
|
||||||
|
return getBunch(1, create_if_need);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Peek the next value.
|
||||||
|
UInt64 peek(bool create_if_need = false)
|
||||||
|
{
|
||||||
|
return getBunch(0, create_if_need);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Get the next number and increase the counter by `count`.
|
||||||
|
* If the `create_if_need` parameter is not set to true, then
|
||||||
|
* the file should already have a number written (if not - create the file manually with zero).
|
||||||
|
*
|
||||||
|
* To protect against race conditions between different processes, file locks are used.
|
||||||
|
* (But when the first file is created, the race condition is possible, so it's better to create the file in advance.)
|
||||||
|
*/
|
||||||
|
UInt64 getBunch(UInt64 count, bool create_if_need = false)
|
||||||
|
{
|
||||||
|
return static_cast<UInt64>(counter.add(static_cast<Int64>(count), create_if_need) - count + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Change the path to the file.
|
||||||
|
void setPath(std::string path_)
|
||||||
|
{
|
||||||
|
counter.setPath(path_);
|
||||||
|
}
|
||||||
|
|
||||||
|
void fixIfBroken(UInt64 value)
|
||||||
|
{
|
||||||
|
counter.fixIfBroken(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
CounterInFile counter;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/** The same, but without storing it in a file.
|
||||||
|
*/
|
||||||
|
struct SimpleIncrement : private boost::noncopyable
|
||||||
|
{
|
||||||
|
std::atomic<UInt64> value;
|
||||||
|
|
||||||
|
SimpleIncrement(UInt64 start = 0) : value(start) {}
|
||||||
|
|
||||||
|
void set(UInt64 new_value)
|
||||||
|
{
|
||||||
|
value = new_value;
|
||||||
|
}
|
||||||
|
|
||||||
|
UInt64 get()
|
||||||
|
{
|
||||||
|
return ++value;
|
||||||
|
}
|
||||||
|
};
|
@ -22,6 +22,7 @@
|
|||||||
#include <Functions/FunctionFactory.h>
|
#include <Functions/FunctionFactory.h>
|
||||||
#include <Functions/IFunction.h>
|
#include <Functions/IFunction.h>
|
||||||
#include <Poco/DirectoryIterator.h>
|
#include <Poco/DirectoryIterator.h>
|
||||||
|
#include <Common/Increment.h>
|
||||||
#include <Common/SimpleIncrement.h>
|
#include <Common/SimpleIncrement.h>
|
||||||
#include <Common/escapeForFileName.h>
|
#include <Common/escapeForFileName.h>
|
||||||
#include <Common/StringUtils.h>
|
#include <Common/StringUtils.h>
|
||||||
|
Loading…
Reference in New Issue
Block a user