This commit is contained in:
feng lv 2021-10-18 06:38:47 +00:00
parent 9378b93e41
commit 074e02eb14
10 changed files with 228 additions and 66 deletions

View File

@ -1,6 +1,8 @@
#include <Interpreters/Context.h>
#include <Storages/FileLog/DirectoryWatcherBase.h>
#include <Storages/FileLog/FileLogDirectoryWatcher.h>
#include <Storages/FileLog/StorageFileLog.h>
#include <base/sleep.h>
#include <filesystem>
#include <unistd.h>
@ -20,7 +22,11 @@ static constexpr int buffer_size = 4096;
DirectoryWatcherBase::DirectoryWatcherBase(
FileLogDirectoryWatcher & owner_, const std::string & path_, ContextPtr context_, int event_mask_)
: WithContext(context_), owner(owner_), path(path_), event_mask(event_mask_)
: WithContext(context_)
, owner(owner_)
, path(path_)
, event_mask(event_mask_)
, milliseconds_to_wait(owner.storage.getFileLogSettings()->poll_directory_watch_events_backoff_init.totalMilliseconds())
{
if (!std::filesystem::exists(path))
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Path {} does not exist", path);
@ -64,8 +70,9 @@ void DirectoryWatcherBase::watchFunc()
while (!stopped)
{
const auto & settings = owner.storage.getFileLogSettings();
if (poll(&pfd, 1, 500) > 0 && pfd.revents & POLLIN)
if (poll(&pfd, 1, milliseconds_to_wait) > 0 && pfd.revents & POLLIN)
{
milliseconds_to_wait = settings->poll_directory_watch_events_backoff_init.totalMilliseconds();
int n = read(fd, buffer.data(), buffer.size());
int i = 0;
if (n > 0)
@ -109,33 +116,20 @@ void DirectoryWatcherBase::watchFunc()
}
/// Wake up reader thread
auto & mutex = owner.storage.getMutex();
auto & cv = owner.storage.getConditionVariable();
std::unique_lock<std::mutex> lock(mutex);
owner.storage.setNewEvents();
lock.unlock();
cv.notify_one();
owner.storage.wakeUp();
}
else
{
if (milliseconds_to_wait < static_cast<uint64_t>(settings->poll_directory_watch_events_backoff_max.totalMilliseconds()))
milliseconds_to_wait *= settings->poll_directory_watch_events_backoff_factor.value;
break;
else
{
if (milliseconds_to_wait < static_cast<uint64_t>(settings->poll_directory_watch_events_backoff_max.totalMilliseconds()))
milliseconds_to_wait *= settings->poll_directory_watch_events_backoff_factor.value;
}
}
if (!stopped)
watch_task->scheduleAfter(milliseconds_to_wait);
}
DirectoryWatcherBase::~DirectoryWatcherBase()
{
stop();
close(fd);
if (watch_task)
watch_task->deactivate();
}
void DirectoryWatcherBase::start()

View File

@ -97,10 +97,11 @@ private:
std::atomic<bool> stopped{false};
uint64_t milliseconds_to_wait;
const std::string path;
int event_mask;
uint64_t milliseconds_to_wait;
int fd;
};

View File

@ -78,9 +78,9 @@ void FileLogDirectoryWatcher::onItemModified(DirectoryWatcherBase::DirectoryEven
auto event_path = ev.path;
EventInfo info{ev.event, "onItemModified"};
/// Already have MODIFY event for this file
if (auto it = events.find(event_path); it != events.end())
{
/// Already have MODIFY event for this file
if (it->second.received_modification_event)
return;
else

View File

@ -1,7 +1,6 @@
#pragma once
#include <Storages/FileLog/DirectoryWatcherBase.h>
#include <Storages/FileLog/StorageFileLog.h>
#include <base/logger_useful.h>
@ -10,6 +9,7 @@
namespace DB
{
class StorageFileLog;
class FileLogDirectoryWatcher
{

View File

@ -117,7 +117,7 @@ void ReadBufferFromFileLog::readNewRecords(ReadBufferFromFileLog::Records & new_
{
/// Need to get offset before reading record from stream
auto offset = reader.tellg();
if (static_cast<UInt64>(offset) < file_meta.last_open_end)
if (static_cast<UInt64>(offset) >= file_meta.last_open_end)
break;
record.offset = offset;
StorageFileLog::assertStreamGood(reader);

View File

@ -47,8 +47,6 @@ namespace ErrorCodes
namespace
{
const auto RESCHEDULE_MS = 500;
const auto BACKOFF_TRESHOLD = 32000;
const auto MAX_THREAD_WORK_DURATION_MS = 60000;
}
@ -57,7 +55,6 @@ StorageFileLog::StorageFileLog(
ContextPtr context_,
const ColumnsDescription & columns_,
const String & path_,
const String & relative_data_path_,
const String & format_name_,
std::unique_ptr<FileLogSettings> settings,
const String & comment,
@ -66,10 +63,9 @@ StorageFileLog::StorageFileLog(
, WithContext(context_->getGlobalContext())
, filelog_settings(std::move(settings))
, path(path_)
, relative_data_path(relative_data_path_)
, format_name(format_name_)
, log(&Poco::Logger::get("StorageFileLog (" + table_id_.table_name + ")"))
, milliseconds_to_wait(RESCHEDULE_MS)
, milliseconds_to_wait(filelog_settings->poll_directory_watch_events_backoff_init.totalMilliseconds())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@ -100,9 +96,9 @@ StorageFileLog::StorageFileLog(
void StorageFileLog::loadMetaFiles(bool attach)
{
const auto & storage_id = getStorageID();
root_meta_path = std::filesystem::path(getContext()->getPath()) / "metadata" / "filelog_storage_metadata" / storage_id.getDatabaseName()
/ storage_id.getTableName();
const auto & storage = getStorageID();
root_meta_path
= std::filesystem::path(getContext()->getPath()) / ".filelog_storage_metadata" / storage.getDatabaseName() / storage.getTableName();
/// Attach table
if (attach)
@ -110,8 +106,8 @@ void StorageFileLog::loadMetaFiles(bool attach)
/// Meta file may lost, log and create directory
if (!std::filesystem::exists(root_meta_path))
{
/// Create root_meta_path directory when store meta data
LOG_ERROR(log, "Metadata files of table {} are lost.", getStorageID().getTableName());
std::filesystem::create_directories(root_meta_path);
}
/// Load all meta info to file_infos;
deserialize();
@ -180,8 +176,8 @@ void StorageFileLog::loadFiles()
/// data file have been renamed, need update meta file's name
if (it->second.file_name != file)
{
it->second.file_name = file;
std::filesystem::rename(getFullMetaPath(it->second.file_name), getFullMetaPath(file));
it->second.file_name = file;
}
}
/// New file
@ -261,6 +257,8 @@ void StorageFileLog::serialize(UInt64 inode, const FileMeta & file_meta) const
void StorageFileLog::deserialize()
{
if (!std::filesystem::exists(root_meta_path))
return;
/// In case of single file (not a watched directory),
/// iterated directoy always has one file inside.
for (const auto & dir_entry : std::filesystem::directory_iterator{root_meta_path})
@ -324,7 +322,7 @@ Pipe StorageFileLog::read(
getStorageID().getTableName());
}
if (running_streams.load(std::memory_order_relaxed))
if (running_streams)
{
throw Exception("Another select query is running on this table, need to wait it finish.", ErrorCodes::CANNOT_SELECT);
}
@ -409,6 +407,9 @@ void StorageFileLog::shutdown()
{
task->stream_cancelled = true;
/// Reader thread may wait for wake up
wakeUp();
LOG_TRACE(log, "Waiting for cleanup");
task->holder->deactivate();
}
@ -623,10 +624,13 @@ void StorageFileLog::threadFunc()
{
if (path_is_directory)
{
std::unique_lock<std::mutex> lock(mutex);
/// Waiting for watch directory thread to wake up
std::unique_lock<std::mutex> lock(mutex);
/// Waiting for watch directory thread to wake up
cv.wait(lock, [this] { return has_new_events; });
has_new_events = false;
if (task->stream_cancelled)
return;
task->holder->schedule();
}
else
@ -636,7 +640,7 @@ void StorageFileLog::threadFunc()
bool StorageFileLog::streamToViews()
{
if (running_streams.load(std::memory_order_relaxed))
if (running_streams)
{
throw Exception("Another select query is running on this table, need to wait it finish.", ErrorCodes::CANNOT_SELECT);
}
@ -702,6 +706,14 @@ bool StorageFileLog::streamToViews()
return updateFileInfos();
}
void StorageFileLog::wakeUp()
{
std::unique_lock<std::mutex> lock(mutex);
has_new_events = true;
lock.unlock();
cv.notify_one();
}
void registerStorageFileLog(StorageFactory & factory)
{
auto creator_fn = [](const StorageFactory::Arguments & args)
@ -767,7 +779,6 @@ void registerStorageFileLog(StorageFactory & factory)
args.getContext(),
args.columns,
path,
args.relative_data_path,
format,
std::move(filelog_settings),
args.comment,
@ -813,10 +824,10 @@ bool StorageFileLog::updateFileInfos()
auto events = directory_watch->getEventsAndReset();
for (const auto & [file_name, event_info] : events)
for (const auto & [file_name, event_infos] : events)
{
String file_path = getFullDataPath(file_name);
for(const auto & event_info : event_info.file_events)
for (const auto & event_info : event_infos.file_events)
{
switch (event_info.type)
{
@ -836,7 +847,7 @@ bool StorageFileLog::updateFileInfos()
file_infos.meta_by_inode.emplace(inode, FileMeta{.file_name = file_name});
if (auto it = file_infos.context_by_name.find(file_name); it != file_infos.context_by_name.end())
it->second = FileContext{.inode = inode};
it->second = FileContext{.status = FileStatus::OPEN, .inode = inode};
else
file_infos.context_by_name.emplace(file_name, FileContext{.inode = inode});
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Storages/FileLog/Buffer_fwd.h>
#include <Storages/FileLog/FileLogDirectoryWatcher.h>
#include <Storages/FileLog/FileLogSettings.h>
#include <Core/BackgroundSchedulePool.h>
@ -126,9 +127,7 @@ public:
void increaseStreams();
void reduceStreams();
auto & getConditionVariable() { return cv; }
auto & getMutex() { return mutex; }
void setNewEvents() { has_new_events = true; }
void wakeUp();
const auto & getFileLogSettings() const { return filelog_settings; }
@ -138,7 +137,6 @@ protected:
ContextPtr context_,
const ColumnsDescription & columns_,
const String & path_,
const String & relative_data_path_,
const String & format_name_,
std::unique_ptr<FileLogSettings> settings,
const String & comment,
@ -148,14 +146,11 @@ private:
std::unique_ptr<FileLogSettings> filelog_settings;
const String path;
/// For meta file
const String relative_data_path;
bool path_is_directory = true;
/// If path argument of the table is a regular file, it equals to user_files_path
/// otherwise, it equals to user_files_path/ + path_argument/, e.g. path
String root_data_path;
/// relative_data_path/ + table_name/
String root_meta_path;
FileInfos file_infos;
@ -163,20 +158,8 @@ private:
const String format_name;
Poco::Logger * log;
std::unique_ptr<FileLogDirectoryWatcher> directory_watch = nullptr;
uint64_t milliseconds_to_wait;
struct TaskContext
{
BackgroundSchedulePool::TaskHolder holder;
std::atomic<bool> stream_cancelled {false};
explicit TaskContext(BackgroundSchedulePool::TaskHolder&& task_) : holder(std::move(task_))
{
}
};
std::shared_ptr<TaskContext> task;
/// In order to avoid data race, using a naive trick to forbid execute two select
/// simultaneously, although read is not useful in this engine. Using an atomic
/// variable to records current unfinishing streams, then if have unfinishing streams,
@ -189,6 +172,18 @@ private:
bool has_new_events = false;
std::condition_variable cv;
struct TaskContext
{
BackgroundSchedulePool::TaskHolder holder;
std::atomic<bool> stream_cancelled {false};
explicit TaskContext(BackgroundSchedulePool::TaskHolder&& task_) : holder(std::move(task_))
{
}
};
std::shared_ptr<TaskContext> task;
std::unique_ptr<FileLogDirectoryWatcher> directory_watch = nullptr;
void loadFiles();
void loadMetaFiles(bool attach);

View File

@ -141,4 +141,147 @@
120 120
120 120
120 120
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
16 16
17 17
18 18
19 19
20 20
100 100
101 101
102 102
103 103
104 104
105 105
106 106
107 107
108 108
109 109
110 110
111 111
112 112
113 113
114 114
115 115
116 116
117 117
118 118
119 119
120 120
150 150
151 151
152 152
153 153
154 154
155 155
156 156
157 157
158 158
159 159
160 160
161 161
162 162
163 163
164 164
165 165
166 166
167 167
168 168
169 169
170 170
171 171
172 172
173 173
174 174
175 175
176 176
177 177
178 178
179 179
180 180
181 181
182 182
183 183
184 184
185 185
186 186
187 187
188 188
189 189
190 190
191 191
192 192
193 193
194 194
195 195
196 196
197 197
198 198
199 199
200 200
200 200
201 201
202 202
203 203
204 204
205 205
206 206
207 207
208 208
209 209
210 210
211 211
212 212
213 213
214 214
215 215
216 216
217 217
218 218
219 219
220 220
221 221
222 222
223 223
224 224
225 225
226 226
227 227
228 228
229 229
230 230
231 231
232 232
233 233
234 234
235 235
236 236
237 237
238 238
239 239
240 240
241 241
242 242
243 243
244 244
245 245
246 246
247 247
248 248
249 249
250 250
OK

View File

@ -10,7 +10,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Data preparation.
# Now we can get the user_files_path by use the table file function for trick. also we can get it by query as:
# "insert into function file('exist.txt', 'CSV', 'val1 char') values ('aaaa'); select _path from file('exist.txt', 'CSV', 'val1 char')"
user_files_path=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
user_files_path=$(${CLICKHOUSE_CLIENT} --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
mkdir -p ${user_files_path}/logs/
@ -41,11 +41,28 @@ touch ${user_files_path}/logs/a.txt
cp ${user_files_path}/logs/a.txt ${user_files_path}/logs/c.txt
cp ${user_files_path}/logs/a.txt ${user_files_path}/logs/d.txt
cp ${user_files_path}/logs/a.txt ${user_files_path}/logs/e.txt
mv ${user_files_path}/logs/b.txt ${user_files_path}/logs/j.txt
rm ${user_files_path}/logs/d.txt
${CLICKHOUSE_CLIENT} --query "select * from file_log order by k;"
${CLICKHOUSE_CLIENT} --query "detach table file_log;"
cp ${user_files_path}/logs/e.txt ${user_files_path}/logs/f.txt
mv ${user_files_path}/logs/e.txt ${user_files_path}/logs/g.txt
mv ${user_files_path}/logs/c.txt ${user_files_path}/logs/h.txt
for i in {150..200}
do
echo $i, $i >> ${user_files_path}/logs/h.txt
done
for i in {200..250}
do
echo $i, $i >> ${user_files_path}/logs/i.txt
done
${CLICKHOUSE_CLIENT} --query "attach table file_log;"
${CLICKHOUSE_CLIENT} --query "select * from file_log order by k;"
${CLICKHOUSE_CLIENT} --query "detach table file_log;"
${CLICKHOUSE_CLIENT} --query "attach table file_log;"

View File

@ -14,10 +14,6 @@ user_files_path=$(clickhouse-client --query "select _path,_file from file('nonex
mkdir -p ${user_files_path}/logs/
rm -rf ${user_files_path}/logs/*
for i in {1..20}
do
echo $i, $i >> ${user_files_path}/logs/a.txt
done
${CLICKHOUSE_CLIENT} --query "drop table if exists file_log;"
${CLICKHOUSE_CLIENT} --query "create table file_log(k UInt8, v UInt8) engine=FileLog('${user_files_path}/logs/', 'CSV');"
@ -25,6 +21,11 @@ ${CLICKHOUSE_CLIENT} --query "create table file_log(k UInt8, v UInt8) engine=Fil
${CLICKHOUSE_CLIENT} --query "drop table if exists mv;"
${CLICKHOUSE_CLIENT} --query "create Materialized View mv engine=MergeTree order by k as select * from file_log;"
for i in {1..20}
do
echo $i, $i >> ${user_files_path}/logs/a.txt
done
for i in {1..200}
do
sleep 0.1