ClickHouse/src/Storages/FileLog/ReadBufferFromFileLog.h

93 lines
1.8 KiB
C++
Raw Normal View History

2021-06-09 02:03:36 +00:00
#pragma once
#include <Core/BackgroundSchedulePool.h>
#include <Core/Names.h>
#include <IO/ReadBuffer.h>
2021-09-05 06:32:32 +00:00
#include <Storages/FileLog/StorageFileLog.h>
2021-10-03 11:03:20 +00:00
#include <base/types.h>
2021-06-09 02:03:36 +00:00
#include <fstream>
#include <mutex>
/// Forward declaration only: this header refers to the logger solely through a `Poco::Logger *`.
namespace Poco { class Logger; }
namespace DB
{
class ReadBufferFromFileLog : public ReadBuffer
{
public:
2021-09-05 06:32:32 +00:00
ReadBufferFromFileLog(
StorageFileLog & storage_,
size_t max_batch_size,
size_t poll_timeout_,
ContextPtr context_,
size_t stream_number_,
size_t max_streams_number_);
2021-06-09 02:03:36 +00:00
~ReadBufferFromFileLog() override = default;
auto pollTimeout() const { return poll_timeout; }
2021-09-05 06:32:32 +00:00
bool hasMorePolledRecords() const { return current != records.end(); }
2021-06-09 02:03:36 +00:00
bool poll();
2021-09-04 17:04:35 +00:00
bool noRecords() { return buffer_status == BufferStatus::NO_RECORD_RETURNED; }
2021-07-04 06:16:40 +00:00
2021-09-26 07:22:45 +00:00
auto getFileName() const { return current_file; }
auto getOffset() const { return current_offset; }
2021-09-24 16:44:22 +00:00
2021-06-09 02:03:36 +00:00
private:
2021-07-04 06:16:40 +00:00
enum class BufferStatus
{
2021-09-14 03:15:05 +00:00
INIT,
2021-07-04 06:16:40 +00:00
NO_RECORD_RETURNED,
2021-09-04 17:04:35 +00:00
POLLED_OK,
2021-06-09 02:03:36 +00:00
};
2021-09-14 03:15:05 +00:00
BufferStatus buffer_status = BufferStatus::INIT;
2021-07-04 06:16:40 +00:00
2021-09-05 06:32:32 +00:00
Poco::Logger * log;
2021-07-03 17:14:56 +00:00
2021-09-05 06:32:32 +00:00
StorageFileLog & storage;
2021-07-04 06:16:40 +00:00
2021-09-14 03:15:05 +00:00
bool stream_out = false;
2021-09-05 11:41:13 +00:00
size_t batch_size;
size_t poll_timeout;
2021-06-09 02:03:36 +00:00
ContextPtr context;
2021-09-05 06:32:32 +00:00
size_t stream_number;
size_t max_streams_number;
2021-06-09 02:03:36 +00:00
bool allowed = true;
2021-09-24 16:44:22 +00:00
using RecordData = std::string;
struct Record
{
RecordData data;
std::string file_name;
UInt64 offset;
};
2021-06-09 02:03:36 +00:00
using Records = std::vector<Record>;
Records records;
Records::const_iterator current;
2021-09-27 08:44:48 +00:00
String current_file = {};
UInt64 current_offset = 0;
2021-09-26 07:22:45 +00:00
2021-06-09 02:03:36 +00:00
using TaskThread = BackgroundSchedulePool::TaskHolder;
Records pollBatch(size_t batch_size_);
void readNewRecords(Records & new_records, size_t batch_size_);
bool nextImpl() override;
};
}