refactor some code

This commit is contained in:
feng lv 2021-09-05 06:32:32 +00:00
parent 595005eb21
commit 09bc3d723a
6 changed files with 64 additions and 93 deletions

View File

@ -24,19 +24,18 @@ FileLogSource::FileLogSource(
size_t poll_time_out_,
size_t stream_number_,
size_t max_streams_number_)
: storage(storage_)
: SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(columns, storage_.getVirtuals(), storage_.getStorageID()))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
, column_names(columns)
, max_block_size(max_block_size_)
, poll_time_out(poll_time_out_)
, stream_number(stream_number_)
, max_streams_number(max_streams_number_)
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, virtual_header(
metadata_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames(), storage.getVirtuals(), storage.getStorageID()))
{
createReadBuffer();
buffer = std::make_unique<ReadBufferFromFileLog>(storage, max_block_size, poll_time_out, context, stream_number_, max_streams_number_);
}
Chunk FileLogSource::generate()
@ -114,7 +113,7 @@ Chunk FileLogSource::generate()
total_rows = total_rows + new_rows;
}
if ((!buffer->hasMorePolledRecords() && (total_rows >= max_block_size)) || watch.elapsedMilliseconds() > poll_time_out)
if (!buffer->hasMorePolledRecords() && ((total_rows >= max_block_size) || watch.elapsedMilliseconds() > poll_time_out))
{
break;
}

View File

@ -25,7 +25,9 @@ public:
size_t stream_number_,
size_t max_streams_number_);
bool isStalled() { return !buffer || buffer->isStalled(); }
String getName() const override { return "FileLog"; }
bool noRecords() { return !buffer || buffer->noRecords(); }
protected:
Chunk generate() override;
@ -39,17 +41,12 @@ private:
size_t poll_time_out;
size_t stream_number;
size_t max_streams_number;
ReadBufferFromFileLogPtr buffer;
std::unique_ptr<ReadBufferFromFileLog> buffer;
bool started = false;
const Block non_virtual_header;
const Block virtual_header;
void createReadBuffer();
};
}

View File

@ -1,5 +1,6 @@
#include <Interpreters/Context.h>
#include <Storages/FileLog/ReadBufferFromFileLog.h>
#include <Common/Stopwatch.h>
#include <common/logger_useful.h>
@ -15,22 +16,21 @@ namespace ErrorCodes
}
ReadBufferFromFileLog::ReadBufferFromFileLog(
const String & path_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_, ContextPtr context_)
StorageFileLog & storage_,
size_t max_batch_size,
size_t poll_timeout_,
ContextPtr context_,
size_t stream_number_,
size_t max_streams_number_)
: ReadBuffer(nullptr, 0)
, path(path_)
, log(log_)
, log(&Poco::Logger::get("ReadBufferFromFileLog " + toString(stream_number)))
, storage(storage_)
, batch_size(max_batch_size)
, poll_timeout(poll_timeout_)
, context(context_)
, stream_number(stream_number_)
, max_streams_number(max_streams_number_)
{
}
void ReadBufferFromFileLog::open()
{
wait_task = context->getMessageBrokerSchedulePool().createTask("waitTask", [this] { waitFunc(); });
wait_task->deactivate();
cleanUnprocessed();
allowed = false;
}
@ -42,17 +42,6 @@ void ReadBufferFromFileLog::cleanUnprocessed()
BufferBase::set(nullptr, 0, 0);
}
void ReadBufferFromFileLog::close()
{
wait_task->deactivate();
if (path_is_directory)
select_task->deactivate();
for (auto & status : file_status)
status.second.reader.close();
}
bool ReadBufferFromFileLog::poll()
{
@ -77,7 +66,7 @@ bool ReadBufferFromFileLog::poll()
LOG_TRACE(log, "Polled batch of {} records. ", records.size());
buffer_status = BufferStatus::NOT_STALLED;
buffer_status = BufferStatus::POLLED_OK;
allowed = true;
return true;
}
@ -94,43 +83,45 @@ ReadBufferFromFileLog::Records ReadBufferFromFileLog::pollBatch(size_t batch_siz
if (new_records.size() == batch_size_)
return new_records;
wait_task->activateAndSchedule();
while (!time_out && new_records.size() != batch_size_)
Stopwatch watch;
while (watch.elapsedMilliseconds() < poll_timeout && new_records.size() != batch_size_)
{
readNewRecords(new_records, batch_size);
}
wait_task->deactivate();
time_out = false;
return new_records;
}
// TODO
void ReadBufferFromFileLog::readNewRecords(ReadBufferFromFileLog::Records & new_records, size_t batch_size_)
{
std::lock_guard<std::mutex> lock(status_mutex);
size_t need_records_size = batch_size_ - new_records.size();
size_t read_records_size = 0;
for (auto & status : file_status)
const auto & file_names = storage.getFileNames();
auto & file_status = storage.getFileStatus();
size_t files_per_stream = file_names.size() / max_streams_number;
size_t start = stream_number * files_per_stream;
size_t end = stream_number == max_streams_number - 1 ? file_names.size() : (stream_number + 1) * files_per_stream;
for (size_t i = start; i < end; ++i)
{
if (status.second.status == FileStatus::NO_CHANGE)
auto & file = file_status[file_names[i]];
if (file.status == StorageFileLog::FileStatus::NO_CHANGE || file.status == StorageFileLog::FileStatus::REMOVED)
continue;
if (status.second.status == FileStatus::REMOVED)
file_status.erase(status.first);
while (read_records_size < need_records_size && status.second.reader.good() && !status.second.reader.eof())
while (read_records_size < need_records_size && file.reader.good() && !file.reader.eof())
{
Record record;
std::getline(status.second.reader, record);
std::getline(file.reader, record);
new_records.emplace_back(record);
++read_records_size;
}
// Read to the end of the file
if (status.second.reader.eof())
status.second.status = FileStatus::NO_CHANGE;
if (file.reader.eof())
file.status = StorageFileLog::FileStatus::NO_CHANGE;
if (read_records_size == need_records_size)
break;
@ -151,10 +142,4 @@ bool ReadBufferFromFileLog::nextImpl()
return true;
}
void ReadBufferFromFileLog::waitFunc()
{
sleepForMicroseconds(poll_timeout);
time_out = true;
}
}

View File

@ -3,12 +3,11 @@
#include <Core/BackgroundSchedulePool.h>
#include <Core/Names.h>
#include <IO/ReadBuffer.h>
#include <Storages/FileLog/FileLogDirectoryWatcher.h>
#include <Storages/FileLog/StorageFileLog.h>
#include <common/types.h>
#include <fstream>
#include <mutex>
#include <unordered_map>
namespace Poco
{
@ -20,16 +19,19 @@ namespace DB
class ReadBufferFromFileLog : public ReadBuffer
{
public:
ReadBufferFromFileLog(const String & path_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_, ContextPtr context_);
ReadBufferFromFileLog(
StorageFileLog & storage_,
size_t max_batch_size,
size_t poll_timeout_,
ContextPtr context_,
size_t stream_number_,
size_t max_streams_number_);
~ReadBufferFromFileLog() override = default;
void open();
void close();
auto pollTimeout() const { return poll_timeout; }
inline bool hasMorePolledRecords() const { return current != records.end(); }
bool hasMorePolledRecords() const { return current != records.end(); }
bool poll();
@ -44,19 +46,18 @@ private:
BufferStatus buffer_status;
const String path;
bool path_is_directory = false;
Poco::Logger * log;
StorageFileLog & storage;
const size_t batch_size = 1;
const size_t poll_timeout = 0;
bool time_out = false;
ContextPtr context;
size_t stream_number;
size_t max_streams_number;
bool allowed = true;
using Record = std::string;
@ -76,8 +77,5 @@ private:
void cleanUnprocessed();
bool nextImpl() override;
void waitFunc();
};
}

View File

@ -8,6 +8,7 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Pipe.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Storages/FileLog/FileLogDirectoryWatcher.h>
#include <Storages/FileLog/FileLogSource.h>
#include <Storages/FileLog/ReadBufferFromFileLog.h>
#include <Storages/FileLog/StorageFileLog.h>
@ -383,16 +384,9 @@ void registerStorageFileLog(StorageFactory & factory)
});
}
NamesAndTypesList StorageFileLog::getVirtuals() const
{
auto result = NamesAndTypesList{};
return result;
}
Names StorageFileLog::getVirtualColumnNames()
{
auto result = Names{};
return result;
return {};
}
void StorageFileLog::watchFunc()

View File

@ -22,6 +22,14 @@ class StorageFileLog final : public shared_ptr_helper<StorageFileLog>, public IS
friend struct shared_ptr_helper<StorageFileLog>;
public:
enum class FileStatus
{
BEGIN,
NO_CHANGE,
UPDATED,
REMOVED
};
using Files = std::vector<String>;
std::string getName() const override { return "FileLog"; }
@ -42,10 +50,10 @@ public:
const auto & getFormatName() const { return format_name; }
NamesAndTypesList getVirtuals() const override;
static Names getVirtualColumnNames();
auto & getBuffer() { return buffer; }
auto & getFileNames() { return file_names; }
auto & getFileStatus() { return file_status; }
protected:
StorageFileLog(
@ -64,16 +72,6 @@ private:
const String format_name;
Poco::Logger * log;
ReadBufferFromFileLogPtr buffer;
enum class FileStatus
{
BEGIN,
NO_CHANGE,
UPDATED,
REMOVED
};
struct FileContext
{
FileStatus status = FileStatus::BEGIN;