2021-06-09 02:03:36 +00:00
|
|
|
#pragma once
|
|
|
|
|
|
|
|
#include <Core/BackgroundSchedulePool.h>
|
|
|
|
#include <Core/Names.h>
|
|
|
|
#include <IO/ReadBuffer.h>
|
2021-09-05 06:32:32 +00:00
|
|
|
#include <Storages/FileLog/StorageFileLog.h>
|
2021-06-09 02:03:36 +00:00
|
|
|
#include <common/types.h>
|
|
|
|
|
|
|
|
#include <fstream>
|
|
|
|
#include <mutex>
|
|
|
|
|
|
|
|
namespace Poco
|
|
|
|
{
|
|
|
|
class Logger;
|
|
|
|
}
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
class ReadBufferFromFileLog : public ReadBuffer
|
|
|
|
{
|
|
|
|
public:
|
2021-09-05 06:32:32 +00:00
|
|
|
ReadBufferFromFileLog(
|
|
|
|
StorageFileLog & storage_,
|
|
|
|
size_t max_batch_size,
|
|
|
|
size_t poll_timeout_,
|
|
|
|
ContextPtr context_,
|
|
|
|
size_t stream_number_,
|
|
|
|
size_t max_streams_number_);
|
2021-06-09 02:03:36 +00:00
|
|
|
|
|
|
|
~ReadBufferFromFileLog() override = default;
|
|
|
|
|
|
|
|
auto pollTimeout() const { return poll_timeout; }
|
|
|
|
|
2021-09-05 06:32:32 +00:00
|
|
|
bool hasMorePolledRecords() const { return current != records.end(); }
|
2021-06-09 02:03:36 +00:00
|
|
|
|
|
|
|
bool poll();
|
|
|
|
|
2021-09-04 17:04:35 +00:00
|
|
|
bool noRecords() { return buffer_status == BufferStatus::NO_RECORD_RETURNED; }
|
2021-07-04 06:16:40 +00:00
|
|
|
|
2021-09-26 07:22:45 +00:00
|
|
|
auto getFileName() const { return current_file; }
|
|
|
|
auto getOffset() const { return current_offset; }
|
2021-09-24 16:44:22 +00:00
|
|
|
|
2021-06-09 02:03:36 +00:00
|
|
|
private:
|
2021-07-04 06:16:40 +00:00
|
|
|
enum class BufferStatus
|
|
|
|
{
|
2021-09-14 03:15:05 +00:00
|
|
|
INIT,
|
2021-07-04 06:16:40 +00:00
|
|
|
NO_RECORD_RETURNED,
|
2021-09-04 17:04:35 +00:00
|
|
|
POLLED_OK,
|
2021-06-09 02:03:36 +00:00
|
|
|
};
|
|
|
|
|
2021-09-14 03:15:05 +00:00
|
|
|
BufferStatus buffer_status = BufferStatus::INIT;
|
2021-07-04 06:16:40 +00:00
|
|
|
|
2021-09-05 06:32:32 +00:00
|
|
|
Poco::Logger * log;
|
2021-07-03 17:14:56 +00:00
|
|
|
|
2021-09-05 06:32:32 +00:00
|
|
|
StorageFileLog & storage;
|
2021-07-04 06:16:40 +00:00
|
|
|
|
2021-09-14 03:15:05 +00:00
|
|
|
bool stream_out = false;
|
|
|
|
|
2021-09-05 11:41:13 +00:00
|
|
|
size_t batch_size;
|
|
|
|
size_t poll_timeout;
|
2021-06-09 02:03:36 +00:00
|
|
|
|
|
|
|
ContextPtr context;
|
|
|
|
|
2021-09-05 06:32:32 +00:00
|
|
|
size_t stream_number;
|
|
|
|
size_t max_streams_number;
|
|
|
|
|
2021-06-09 02:03:36 +00:00
|
|
|
bool allowed = true;
|
|
|
|
|
2021-09-24 16:44:22 +00:00
|
|
|
using RecordData = std::string;
|
|
|
|
struct Record
|
|
|
|
{
|
|
|
|
RecordData data;
|
|
|
|
std::string file_name;
|
|
|
|
UInt64 offset;
|
|
|
|
};
|
2021-06-09 02:03:36 +00:00
|
|
|
using Records = std::vector<Record>;
|
|
|
|
|
|
|
|
Records records;
|
|
|
|
Records::const_iterator current;
|
|
|
|
|
2021-09-26 07:22:45 +00:00
|
|
|
String current_file;
|
|
|
|
UInt64 current_offset;
|
|
|
|
|
2021-06-09 02:03:36 +00:00
|
|
|
using TaskThread = BackgroundSchedulePool::TaskHolder;
|
|
|
|
|
|
|
|
TaskThread wait_task;
|
|
|
|
|
|
|
|
Records pollBatch(size_t batch_size_);
|
|
|
|
|
|
|
|
void readNewRecords(Records & new_records, size_t batch_size_);
|
|
|
|
|
|
|
|
void cleanUnprocessed();
|
|
|
|
|
|
|
|
bool nextImpl() override;
|
|
|
|
};
|
|
|
|
}
|