This commit is contained in:
feng lv 2021-09-05 11:41:13 +00:00
parent 09bc3d723a
commit dfea640c67
8 changed files with 29 additions and 26 deletions

View File

@ -20,7 +20,7 @@ FileLogDirectoryWatcher::Events FileLogDirectoryWatcher::getEvents()
return res;
}
bool FileLogDirectoryWatcher::getError() const
bool FileLogDirectoryWatcher::hasError() const
{
return error;
}

View File

@ -24,7 +24,7 @@ public:
Events getEvents();
bool getError() const;
bool hasError() const;
const std::string & getPath() const;

View File

@ -24,16 +24,15 @@ FileLogSource::FileLogSource(
size_t poll_time_out_,
size_t stream_number_,
size_t max_streams_number_)
: SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(columns, storage_.getVirtuals(), storage_.getStorageID()))
: SourceWithProgress(metadata_snapshot_->getSampleBlockForColumns(columns, storage_.getVirtuals(), storage_.getStorageID()))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
, column_names(columns)
, max_block_size(max_block_size_)
, poll_time_out(poll_time_out_)
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, virtual_header(
metadata_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames(), storage.getVirtuals(), storage.getStorageID()))
, non_virtual_header(metadata_snapshot_->getSampleBlockNonMaterialized())
, column_names_and_types(metadata_snapshot_->getColumns().getByNames(ColumnsDescription::All, columns, true))
{
buffer = std::make_unique<ReadBufferFromFileLog>(storage, max_block_size, poll_time_out, context, stream_number_, max_streams_number_);
}
@ -43,7 +42,7 @@ Chunk FileLogSource::generate()
if (!buffer)
return {};
MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
MutableColumns read_columns = non_virtual_header.cloneEmptyColumns();
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
@ -79,7 +78,7 @@ Chunk FileLogSource::generate()
auto columns = chunk.detachColumns();
for (size_t i = 0, s = columns.size(); i < s; ++i)
{
result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
read_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
}
break;
}
@ -103,9 +102,9 @@ Chunk FileLogSource::generate()
{
new_rows = read_file_log();
}
catch (Exception &)
catch (...)
{
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
if (new_rows)
@ -122,6 +121,15 @@ Chunk FileLogSource::generate()
if (total_rows == 0)
return {};
Columns result_columns;
result_columns.reserve(column_names_and_types.size());
for (const auto & elem : column_names_and_types)
{
auto index = non_virtual_header.getPositionByName(elem.getNameInStorage());
result_columns.emplace_back(std::move(read_columns[index]));
}
return Chunk(std::move(result_columns), total_rows);
}

View File

@ -43,10 +43,8 @@ private:
std::unique_ptr<ReadBufferFromFileLog> buffer;
bool started = false;
const Block non_virtual_header;
const Block virtual_header;
Block non_virtual_header;
const NamesAndTypesList column_names_and_types;
};
}

View File

@ -92,7 +92,6 @@ ReadBufferFromFileLog::Records ReadBufferFromFileLog::pollBatch(size_t batch_siz
return new_records;
}
// TODO
void ReadBufferFromFileLog::readNewRecords(ReadBufferFromFileLog::Records & new_records, size_t batch_size_)
{
size_t need_records_size = batch_size_ - new_records.size();

View File

@ -50,8 +50,8 @@ private:
StorageFileLog & storage;
const size_t batch_size = 1;
const size_t poll_timeout = 0;
size_t batch_size;
size_t poll_timeout;
ContextPtr context;

View File

@ -77,6 +77,10 @@ StorageFileLog::StorageFileLog(
}
}
}
else
{
throw Exception("The path neigher a regular file, nor a directory", ErrorCodes::BAD_ARGUMENTS);
}
watch_task = getContext()->getMessageBrokerSchedulePool().createTask("watchTask", [this] { watchFunc(); });
@ -384,23 +388,19 @@ void registerStorageFileLog(StorageFactory & factory)
});
}
Names StorageFileLog::getVirtualColumnNames()
{
return {};
}
void StorageFileLog::watchFunc()
{
FileLogDirectoryWatcher dw(path);
while (true)
{
sleepForMicroseconds(filelog_settings->filelog_poll_timeout_ms.totalMilliseconds());
sleepForMilliseconds(filelog_settings->filelog_poll_timeout_ms.totalMilliseconds());
auto error = dw.getError();
auto error = dw.hasError();
if (error)
LOG_INFO(log, "Error happened during watching directory {}.", dw.getPath());
auto events = dw.getEvents();
std::lock_guard<std::mutex> lock(status_mutex);
for (const auto & event : events)

View File

@ -50,8 +50,6 @@ public:
const auto & getFormatName() const { return format_name; }
static Names getVirtualColumnNames();
auto & getFileNames() { return file_names; }
auto & getFileStatus() { return file_status; }