mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Add retries with backoff, better exceptions
This commit is contained in:
parent
e52f5fdfce
commit
147e413346
@ -3,8 +3,6 @@
|
||||
#include <common/logger_useful.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
|
||||
#include <Access/AccessControlManager.h>
|
||||
|
||||
#include <Disks/IDiskRemote.h>
|
||||
#include <Disks/ReadIndirectBufferFromRemoteFS.h>
|
||||
#include <Disks/ReadIndirectBufferFromWebServer.h>
|
||||
@ -17,7 +15,6 @@
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
|
||||
#include <Poco/Exception.h>
|
||||
#include <re2/re2.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -115,25 +112,22 @@ public:
|
||||
const String & uri_,
|
||||
RemoteMetadata metadata_,
|
||||
ContextPtr context_,
|
||||
size_t max_read_tries_,
|
||||
size_t buf_size_)
|
||||
: ReadIndirectBufferFromRemoteFS<ReadIndirectBufferFromWebServer>(metadata_)
|
||||
, uri(uri_)
|
||||
, context(context_)
|
||||
, max_read_tries(max_read_tries_)
|
||||
, buf_size(buf_size_)
|
||||
{
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadIndirectBufferFromWebServer> createReadBuffer(const String & path) override
|
||||
{
|
||||
return std::make_unique<ReadIndirectBufferFromWebServer>(fs::path(uri) / path, context, max_read_tries, buf_size);
|
||||
return std::make_unique<ReadIndirectBufferFromWebServer>(fs::path(uri) / path, context, buf_size);
|
||||
}
|
||||
|
||||
private:
|
||||
String uri;
|
||||
ContextPtr context;
|
||||
size_t max_read_tries;
|
||||
size_t buf_size;
|
||||
};
|
||||
|
||||
@ -142,12 +136,12 @@ DiskWebServer::DiskWebServer(
|
||||
const String & disk_name_,
|
||||
const String & url_,
|
||||
ContextPtr context_,
|
||||
SettingsPtr settings_)
|
||||
size_t min_bytes_for_seek_)
|
||||
: WithContext(context_->getGlobalContext())
|
||||
, log(&Poco::Logger::get("DiskWeb"))
|
||||
, url(url_)
|
||||
, name(disk_name_)
|
||||
, settings(std::move(settings_))
|
||||
, min_bytes_for_seek(min_bytes_for_seek_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -196,8 +190,8 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
|
||||
RemoteMetadata meta(path, remote_path);
|
||||
meta.remote_fs_objects.emplace_back(std::make_pair(remote_path, iter->second.size));
|
||||
|
||||
auto reader = std::make_unique<ReadBufferFromWebServer>(url, meta, getContext(), settings->max_read_tries, read_settings.remote_fs_buffer_size);
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
|
||||
auto reader = std::make_unique<ReadBufferFromWebServer>(url, meta, getContext(), read_settings.remote_fs_buffer_size);
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), min_bytes_for_seek);
|
||||
}
|
||||
|
||||
|
||||
@ -275,12 +269,16 @@ void registerDiskWebServer(DiskFactory & factory)
|
||||
String uri{config.getString(config_prefix + ".endpoint")};
|
||||
if (!uri.ends_with('/'))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "URI must end with '/', but '{}' doesn't.", uri);
|
||||
try
|
||||
{
|
||||
Poco::URI poco_uri(uri);
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad URI: `{}`. Error: {}", uri, e.what());
|
||||
}
|
||||
|
||||
auto settings = std::make_unique<DiskWebServerSettings>(
|
||||
context->getGlobalContext()->getSettingsRef().http_max_single_read_retries,
|
||||
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024));
|
||||
|
||||
return std::make_shared<DiskWebServer>(disk_name, uri, context, std::move(settings));
|
||||
return std::make_shared<DiskWebServer>(disk_name, uri, context, config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024));
|
||||
};
|
||||
|
||||
factory.registerDiskType("web", creator);
|
||||
|
@ -13,18 +13,6 @@ namespace ErrorCodes
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
struct DiskWebServerSettings
|
||||
{
|
||||
/// Number of read attempts before throw that network is unreachable.
|
||||
size_t max_read_tries;
|
||||
/// Passed to SeekAvoidingReadBuffer.
|
||||
size_t min_bytes_for_seek;
|
||||
|
||||
DiskWebServerSettings(size_t max_read_tries_, size_t min_bytes_for_seek_)
|
||||
: max_read_tries(max_read_tries_) , min_bytes_for_seek(min_bytes_for_seek_) {}
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
* Quick ready test: ATTACH TABLE test_hits UUID '1ae36516-d62d-4218-9ae3-6516d62da218' ( WatchID UInt64, JavaEnable UInt8, Title String, GoodEvent Int16, EventTime DateTime, EventDate Date, CounterID UInt32, ClientIP UInt32, ClientIP6 FixedString(16), RegionID UInt32, UserID UInt64, CounterClass Int8, OS UInt8, UserAgent UInt8, URL String, Referer String, URLDomain String, RefererDomain String, Refresh UInt8, IsRobot UInt8, RefererCategories Array(UInt16), URLCategories Array(UInt16), URLRegions Array(UInt32), RefererRegions Array(UInt32), ResolutionWidth UInt16, ResolutionHeight UInt16, ResolutionDepth UInt8, FlashMajor UInt8, FlashMinor UInt8, FlashMinor2 String, NetMajor UInt8, NetMinor UInt8, UserAgentMajor UInt16, UserAgentMinor FixedString(2), CookieEnable UInt8, JavascriptEnable UInt8, IsMobile UInt8, MobilePhone UInt8, MobilePhoneModel String, Params String, IPNetworkID UInt32, TraficSourceID Int8, SearchEngineID UInt16, SearchPhrase String, AdvEngineID UInt8, IsArtifical UInt8, WindowClientWidth UInt16, WindowClientHeight UInt16, ClientTimeZone Int16, ClientEventTime DateTime, SilverlightVersion1 UInt8, SilverlightVersion2 UInt8, SilverlightVersion3 UInt32, SilverlightVersion4 UInt16, PageCharset String, CodeVersion UInt32, IsLink UInt8, IsDownload UInt8, IsNotBounce UInt8, FUniqID UInt64, HID UInt32, IsOldCounter UInt8, IsEvent UInt8, IsParameter UInt8, DontCountHits UInt8, WithHash UInt8, HitColor FixedString(1), UTCEventTime DateTime, Age UInt8, Sex UInt8, Income UInt8, Interests UInt16, Robotness UInt8, GeneralInterests Array(UInt16), RemoteIP UInt32, RemoteIP6 FixedString(16), WindowName Int32, OpenerName Int32, HistoryLength Int16, BrowserLanguage FixedString(2), BrowserCountry FixedString(2), SocialNetwork String, SocialAction String, HTTPError UInt16, SendTiming Int32, DNSTiming Int32, ConnectTiming Int32, ResponseStartTiming Int32, ResponseEndTiming Int32, FetchTiming Int32, RedirectTiming Int32, DOMInteractiveTiming Int32, DOMContentLoadedTiming Int32, DOMCompleteTiming Int32, LoadEventStartTiming Int32, LoadEventEndTiming Int32, NSToDOMContentLoadedTiming Int32, FirstPaintTiming Int32, RedirectCount Int8, SocialSourceNetworkID UInt8, SocialSourcePage String, ParamPrice Int64, ParamOrderID String, ParamCurrency FixedString(3), ParamCurrencyID UInt16, GoalsReached Array(UInt32), OpenstatServiceName String, OpenstatCampaignID String, OpenstatAdID String, OpenstatSourceID String, UTMSource String, UTMMedium String, UTMCampaign String, UTMContent String, UTMTerm String, FromTag String, HasGCLID UInt8, RefererHash UInt64, URLHash UInt64, CLID UInt32, YCLID UInt64, ShareService String, ShareURL String, ShareTitle String, ParsedParams Nested(Key1 String, Key2 String, Key3 String, Key4 String, Key5 String, ValueDouble Float64), IslandID FixedString(16), RequestNum UInt32, RequestTry UInt8) ENGINE = MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS storage_policy='web';
|
||||
*
|
||||
@ -46,7 +34,7 @@ struct DiskWebServerSettings
|
||||
* </policies>
|
||||
* </storage_configuration>
|
||||
*
|
||||
* If query timeouts -- need to adjust settings: http_connection_timeout, http_receive_timeout, http_receive_timeout.
|
||||
* If query fails with `DB:Exception Unreachable URL` -- may help to adjust settings: http_connection_timeout, http_receive_timeout, keep_alive_timeout.
|
||||
*
|
||||
* To get files for upload run:
|
||||
* clickhouse static-files-disk-uploader --metadata-path <path> --output-dir <dir>
|
||||
@ -60,13 +48,12 @@ struct DiskWebServerSettings
|
||||
**/
|
||||
class DiskWebServer : public IDisk, WithContext
|
||||
{
|
||||
using SettingsPtr = std::unique_ptr<DiskWebServerSettings>;
|
||||
|
||||
public:
|
||||
DiskWebServer(const String & disk_name_,
|
||||
const String & url_,
|
||||
ContextPtr context,
|
||||
SettingsPtr settings_);
|
||||
size_t min_bytes_for_seel_);
|
||||
|
||||
bool supportZeroCopyReplication() const override { return false; }
|
||||
|
||||
@ -208,7 +195,8 @@ private:
|
||||
Poco::Logger * log;
|
||||
String url;
|
||||
String name;
|
||||
SettingsPtr settings;
|
||||
|
||||
size_t min_bytes_for_seek;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,9 +1,12 @@
|
||||
#include "ReadIndirectBufferFromWebServer.h"
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
#include <common/sleep.h>
|
||||
#include <Core/Types.h>
|
||||
#include <IO/ReadWriteBufferFromHTTP.h>
|
||||
#include <IO/ConnectionTimeoutsContext.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <thread>
|
||||
|
||||
|
||||
@ -17,17 +20,17 @@ namespace ErrorCodes
|
||||
extern const int NETWORK_ERROR;
|
||||
}
|
||||
|
||||
static const auto WAIT_MS = 10;
|
||||
static const auto WAIT_THRESHOLD_MS = 10000;
|
||||
|
||||
ReadIndirectBufferFromWebServer::ReadIndirectBufferFromWebServer(const String & url_,
|
||||
ContextPtr context_,
|
||||
size_t max_read_tries_,
|
||||
size_t buf_size_)
|
||||
: BufferWithOwnMemory<SeekableReadBuffer>(buf_size_)
|
||||
, log(&Poco::Logger::get("ReadIndirectBufferFromWebServer"))
|
||||
, context(context_)
|
||||
, url(url_)
|
||||
, buf_size(buf_size_)
|
||||
, max_read_tries(max_read_tries_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -40,6 +43,8 @@ std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
|
||||
headers.emplace_back(std::make_pair("Range", fmt::format("bytes={}-", offset)));
|
||||
const auto & settings = context->getSettingsRef();
|
||||
LOG_DEBUG(log, "Reading from offset: {}", offset);
|
||||
const auto & config = context->getConfigRef();
|
||||
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 20), 0};
|
||||
|
||||
return std::make_unique<ReadWriteBufferFromHTTP>(
|
||||
uri,
|
||||
@ -49,7 +54,7 @@ std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
|
||||
settings.http_send_timeout,
|
||||
std::max(Poco::Timespan(settings.http_receive_timeout.totalSeconds(), 0), Poco::Timespan(20, 0)),
|
||||
settings.tcp_keep_alive_timeout,
|
||||
std::max(Poco::Timespan(settings.http_receive_timeout.totalSeconds(), 0), Poco::Timespan(20, 0))),
|
||||
http_keep_alive_timeout),
|
||||
0,
|
||||
Poco::Net::HTTPBasicCredentials{},
|
||||
buf_size,
|
||||
@ -60,6 +65,7 @@ std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
|
||||
bool ReadIndirectBufferFromWebServer::nextImpl()
|
||||
{
|
||||
bool next_result = false, successful_read = false;
|
||||
UInt16 milliseconds_to_wait = WAIT_MS;
|
||||
|
||||
if (impl)
|
||||
{
|
||||
@ -67,40 +73,38 @@ bool ReadIndirectBufferFromWebServer::nextImpl()
|
||||
impl->position() = position();
|
||||
assert(!impl->hasPendingData());
|
||||
}
|
||||
else
|
||||
|
||||
WriteBufferFromOwnString error_msg;
|
||||
while (milliseconds_to_wait < WAIT_THRESHOLD_MS)
|
||||
{
|
||||
try
|
||||
{
|
||||
impl = initialize();
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
throw Exception(ErrorCodes::NETWORK_ERROR, "Unreachable url: {}. Error: {}", url, e.what());
|
||||
}
|
||||
if (!impl)
|
||||
{
|
||||
impl = initialize();
|
||||
next_result = impl->hasPendingData();
|
||||
if (next_result)
|
||||
break;
|
||||
}
|
||||
|
||||
next_result = impl->hasPendingData();
|
||||
}
|
||||
|
||||
for (size_t try_num = 0; (try_num < max_read_tries) && !next_result; ++try_num)
|
||||
{
|
||||
try
|
||||
{
|
||||
next_result = impl->next();
|
||||
successful_read = true;
|
||||
break;
|
||||
}
|
||||
catch (const Exception & e)
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
LOG_WARNING(log, "Read attempt {}/{} failed from {}. ({})", try_num, max_read_tries, url, e.message());
|
||||
LOG_WARNING(log, "Read attempt failed for url: {}. Error: {}", url, e.what());
|
||||
error_msg << fmt::format("Error: {}\n", e.what());
|
||||
|
||||
sleepForMilliseconds(milliseconds_to_wait);
|
||||
milliseconds_to_wait *= 2;
|
||||
impl.reset();
|
||||
impl = initialize();
|
||||
next_result = impl->hasPendingData();
|
||||
}
|
||||
}
|
||||
|
||||
if (!successful_read)
|
||||
throw Exception(ErrorCodes::NETWORK_ERROR, "All read attempts ({}) failed for uri: {}", max_read_tries, url);
|
||||
throw Exception(ErrorCodes::NETWORK_ERROR,
|
||||
"All read attempts failed for url: {}. Reason:\n{}", url, error_msg.str());
|
||||
|
||||
if (next_result)
|
||||
{
|
||||
|
@ -18,7 +18,6 @@ class ReadIndirectBufferFromWebServer : public BufferWithOwnMemory<SeekableReadB
|
||||
public:
|
||||
explicit ReadIndirectBufferFromWebServer(const String & url_,
|
||||
ContextPtr context_,
|
||||
size_t max_read_tries_,
|
||||
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
|
||||
|
||||
bool nextImpl() override;
|
||||
@ -34,7 +33,7 @@ private:
|
||||
ContextPtr context;
|
||||
|
||||
const String url;
|
||||
size_t buf_size, max_read_tries;
|
||||
size_t buf_size;
|
||||
|
||||
std::unique_ptr<ReadBuffer> impl;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user