ClickHouse/src/Storages/FileLog/ReadBufferFromFileLog.h

103 lines
2.0 KiB
C++
Raw Normal View History

2021-06-09 02:03:36 +00:00
#pragma once
#include <Core/BackgroundSchedulePool.h>
#include <Core/Names.h>
#include <IO/ReadBuffer.h>
2021-07-03 17:14:56 +00:00
#include <Storages/FileLog/FileLogDirectoryWatcher.h>
2021-06-09 02:03:36 +00:00
#include <common/types.h>
#include <fstream>
#include <mutex>
#include <unordered_map>
namespace Poco
{
class Logger;
}
namespace DB
{
class ReadBufferFromFileLog : public ReadBuffer
{
public:
2021-07-03 17:14:56 +00:00
ReadBufferFromFileLog(const String & path_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_, ContextPtr context_);
2021-06-09 02:03:36 +00:00
~ReadBufferFromFileLog() override = default;
void open();
void close();
auto pollTimeout() const { return poll_timeout; }
inline bool hasMorePolledRecords() const { return current != records.end(); }
bool poll();
2021-07-04 06:16:40 +00:00
bool isStalled() { return buffer_status != BufferStatus::NOT_STALLED; }
2021-06-09 02:03:36 +00:00
private:
2021-07-04 06:16:40 +00:00
enum class BufferStatus
{
NO_RECORD_RETURNED,
NOT_STALLED,
};
2021-06-09 02:03:36 +00:00
enum class FileStatus
{
BEGIN,
NO_CHANGE,
UPDATED,
REMOVED
};
struct FileContext
{
FileStatus status = FileStatus::BEGIN;
std::ifstream reader;
};
2021-07-04 06:16:40 +00:00
BufferStatus buffer_status;
2021-07-03 17:14:56 +00:00
const String path;
2021-07-04 06:16:40 +00:00
bool path_is_directory = false;
2021-06-09 02:03:36 +00:00
Poco::Logger * log;
const size_t batch_size = 1;
const size_t poll_timeout = 0;
bool time_out = false;
using NameToFile = std::unordered_map<String, FileContext>;
NameToFile file_status;
2021-07-03 17:14:56 +00:00
std::mutex status_mutex;
2021-06-09 02:03:36 +00:00
ContextPtr context;
bool allowed = true;
using Record = std::string;
using Records = std::vector<Record>;
Records records;
Records::const_iterator current;
using TaskThread = BackgroundSchedulePool::TaskHolder;
TaskThread wait_task;
TaskThread select_task;
Records pollBatch(size_t batch_size_);
void readNewRecords(Records & new_records, size_t batch_size_);
void cleanUnprocessed();
bool nextImpl() override;
void waitFunc();
2021-07-04 06:16:40 +00:00
[[noreturn]] void watchFunc();
2021-06-09 02:03:36 +00:00
};
}