mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge branch 'master' into zookeeper_client_fault_injection
This commit is contained in:
commit
b264be3c63
@ -41,6 +41,7 @@ function configure()
|
||||
export ZOOKEEPER_FAULT_INJECTION=1
|
||||
# install test configs
|
||||
export USE_DATABASE_ORDINARY=1
|
||||
export EXPORT_S3_STORAGE_POLICIES=1
|
||||
/usr/share/clickhouse-test/config/install.sh
|
||||
|
||||
# we mount tests folder from repo to /usr/share
|
||||
@ -184,11 +185,11 @@ install_packages package_folder
|
||||
configure
|
||||
|
||||
azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --debug /azurite_log &
|
||||
./setup_minio.sh stateful # to have a proper environment
|
||||
./setup_minio.sh stateless # to have a proper environment
|
||||
|
||||
start
|
||||
|
||||
# shellcheck disable=SC2086 # No quotes because I want to split it into words.
|
||||
shellcheck disable=SC2086 # No quotes because I want to split it into words.
|
||||
/s3downloader --url-prefix "$S3_URL" --dataset-names $DATASETS
|
||||
chmod 777 -R /var/lib/clickhouse
|
||||
clickhouse-client --query "ATTACH DATABASE IF NOT EXISTS datasets ENGINE = Ordinary"
|
||||
@ -201,12 +202,36 @@ start
|
||||
|
||||
clickhouse-client --query "SHOW TABLES FROM datasets"
|
||||
clickhouse-client --query "SHOW TABLES FROM test"
|
||||
clickhouse-client --query "RENAME TABLE datasets.hits_v1 TO test.hits"
|
||||
clickhouse-client --query "RENAME TABLE datasets.visits_v1 TO test.visits"
|
||||
clickhouse-client --query "CREATE TABLE test.hits_s3 (WatchID UInt64, JavaEnable UInt8, Title String, GoodEvent Int16, EventTime DateTime, EventDate Date, CounterID UInt32, ClientIP UInt32, ClientIP6 FixedString(16), RegionID UInt32, UserID UInt64, CounterClass Int8, OS UInt8, UserAgent UInt8, URL String, Referer String, URLDomain String, RefererDomain String, Refresh UInt8, IsRobot UInt8, RefererCategories Array(UInt16), URLCategories Array(UInt16), URLRegions Array(UInt32), RefererRegions Array(UInt32), ResolutionWidth UInt16, ResolutionHeight UInt16, ResolutionDepth UInt8, FlashMajor UInt8, FlashMinor UInt8, FlashMinor2 String, NetMajor UInt8, NetMinor UInt8, UserAgentMajor UInt16, UserAgentMinor FixedString(2), CookieEnable UInt8, JavascriptEnable UInt8, IsMobile UInt8, MobilePhone UInt8, MobilePhoneModel String, Params String, IPNetworkID UInt32, TraficSourceID Int8, SearchEngineID UInt16, SearchPhrase String, AdvEngineID UInt8, IsArtifical UInt8, WindowClientWidth UInt16, WindowClientHeight UInt16, ClientTimeZone Int16, ClientEventTime DateTime, SilverlightVersion1 UInt8, SilverlightVersion2 UInt8, SilverlightVersion3 UInt32, SilverlightVersion4 UInt16, PageCharset String, CodeVersion UInt32, IsLink UInt8, IsDownload UInt8, IsNotBounce UInt8, FUniqID UInt64, HID UInt32, IsOldCounter UInt8, IsEvent UInt8, IsParameter UInt8, DontCountHits UInt8, WithHash UInt8, HitColor FixedString(1), UTCEventTime DateTime, Age UInt8, Sex UInt8, Income UInt8, Interests UInt16, Robotness UInt8, GeneralInterests Array(UInt16), RemoteIP UInt32, RemoteIP6 FixedString(16), WindowName Int32, OpenerName Int32, HistoryLength Int16, BrowserLanguage FixedString(2), BrowserCountry FixedString(2), SocialNetwork String, SocialAction String, HTTPError UInt16, SendTiming Int32, DNSTiming Int32, ConnectTiming Int32, ResponseStartTiming Int32, ResponseEndTiming Int32, FetchTiming Int32, RedirectTiming Int32, DOMInteractiveTiming Int32, DOMContentLoadedTiming Int32, DOMCompleteTiming Int32, 
LoadEventStartTiming Int32, LoadEventEndTiming Int32, NSToDOMContentLoadedTiming Int32, FirstPaintTiming Int32, RedirectCount Int8, SocialSourceNetworkID UInt8, SocialSourcePage String, ParamPrice Int64, ParamOrderID String, ParamCurrency FixedString(3), ParamCurrencyID UInt16, GoalsReached Array(UInt32), OpenstatServiceName String, OpenstatCampaignID String, OpenstatAdID String, OpenstatSourceID String, UTMSource String, UTMMedium String, UTMCampaign String, UTMContent String, UTMTerm String, FromTag String, HasGCLID UInt8, RefererHash UInt64, URLHash UInt64, CLID UInt32, YCLID UInt64, ShareService String, ShareURL String, ShareTitle String, ParsedParams Nested(Key1 String, Key2 String, Key3 String, Key4 String, Key5 String, ValueDouble Float64), IslandID FixedString(16), RequestNum UInt32, RequestTry UInt8) ENGINE = MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS index_granularity = 8192, storage_policy='s3_cache'"
|
||||
clickhouse-client --query "INSERT INTO test.hits_s3 SELECT * FROM test.hits"
|
||||
|
||||
clickhouse-client --query "CREATE TABLE test.hits_s3 (WatchID UInt64, JavaEnable UInt8, Title String, GoodEvent Int16, EventTime DateTime, EventDate Date, CounterID UInt32, ClientIP UInt32, ClientIP6 FixedString(16), RegionID UInt32, UserID UInt64, CounterClass Int8, OS UInt8, UserAgent UInt8, URL String, Referer String, URLDomain String, RefererDomain String, Refresh UInt8, IsRobot UInt8, RefererCategories Array(UInt16), URLCategories Array(UInt16), URLRegions Array(UInt32), RefererRegions Array(UInt32), ResolutionWidth UInt16, ResolutionHeight UInt16, ResolutionDepth UInt8, FlashMajor UInt8, FlashMinor UInt8, FlashMinor2 String, NetMajor UInt8, NetMinor UInt8, UserAgentMajor UInt16, UserAgentMinor FixedString(2), CookieEnable UInt8, JavascriptEnable UInt8, IsMobile UInt8, MobilePhone UInt8, MobilePhoneModel String, Params String, IPNetworkID UInt32, TraficSourceID Int8, SearchEngineID UInt16, SearchPhrase String, AdvEngineID UInt8, IsArtifical UInt8, WindowClientWidth UInt16, WindowClientHeight UInt16, ClientTimeZone Int16, ClientEventTime DateTime, SilverlightVersion1 UInt8, SilverlightVersion2 UInt8, SilverlightVersion3 UInt32, SilverlightVersion4 UInt16, PageCharset String, CodeVersion UInt32, IsLink UInt8, IsDownload UInt8, IsNotBounce UInt8, FUniqID UInt64, HID UInt32, IsOldCounter UInt8, IsEvent UInt8, IsParameter UInt8, DontCountHits UInt8, WithHash UInt8, HitColor FixedString(1), UTCEventTime DateTime, Age UInt8, Sex UInt8, Income UInt8, Interests UInt16, Robotness UInt8, GeneralInterests Array(UInt16), RemoteIP UInt32, RemoteIP6 FixedString(16), WindowName Int32, OpenerName Int32, HistoryLength Int16, BrowserLanguage FixedString(2), BrowserCountry FixedString(2), SocialNetwork String, SocialAction String, HTTPError UInt16, SendTiming Int32, DNSTiming Int32, ConnectTiming Int32, ResponseStartTiming Int32, ResponseEndTiming Int32, FetchTiming Int32, RedirectTiming Int32, DOMInteractiveTiming Int32, DOMContentLoadedTiming Int32, DOMCompleteTiming Int32, 
LoadEventStartTiming Int32, LoadEventEndTiming Int32, NSToDOMContentLoadedTiming Int32, FirstPaintTiming Int32, RedirectCount Int8, SocialSourceNetworkID UInt8, SocialSourcePage String, ParamPrice Int64, ParamOrderID String, ParamCurrency FixedString(3), ParamCurrencyID UInt16, GoalsReached Array(UInt32), OpenstatServiceName String, OpenstatCampaignID String, OpenstatAdID String, OpenstatSourceID String, UTMSource String, UTMMedium String, UTMCampaign String, UTMContent String, UTMTerm String, FromTag String, HasGCLID UInt8, RefererHash UInt64, URLHash UInt64, CLID UInt32, YCLID UInt64, ShareService String, ShareURL String, ShareTitle String, ParsedParams Nested(Key1 String, Key2 String, Key3 String, Key4 String, Key5 String, ValueDouble Float64), IslandID FixedString(16), RequestNum UInt32, RequestTry UInt8) ENGINE = MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS index_granularity = 8192, storage_policy='s3_cache'"
|
||||
clickhouse-client --query "CREATE TABLE test.hits (WatchID UInt64, JavaEnable UInt8, Title String, GoodEvent Int16, EventTime DateTime, EventDate Date, CounterID UInt32, ClientIP UInt32, ClientIP6 FixedString(16), RegionID UInt32, UserID UInt64, CounterClass Int8, OS UInt8, UserAgent UInt8, URL String, Referer String, URLDomain String, RefererDomain String, Refresh UInt8, IsRobot UInt8, RefererCategories Array(UInt16), URLCategories Array(UInt16), URLRegions Array(UInt32), RefererRegions Array(UInt32), ResolutionWidth UInt16, ResolutionHeight UInt16, ResolutionDepth UInt8, FlashMajor UInt8, FlashMinor UInt8, FlashMinor2 String, NetMajor UInt8, NetMinor UInt8, UserAgentMajor UInt16, UserAgentMinor FixedString(2), CookieEnable UInt8, JavascriptEnable UInt8, IsMobile UInt8, MobilePhone UInt8, MobilePhoneModel String, Params String, IPNetworkID UInt32, TraficSourceID Int8, SearchEngineID UInt16, SearchPhrase String, AdvEngineID UInt8, IsArtifical UInt8, WindowClientWidth UInt16, WindowClientHeight UInt16, ClientTimeZone Int16, ClientEventTime DateTime, SilverlightVersion1 UInt8, SilverlightVersion2 UInt8, SilverlightVersion3 UInt32, SilverlightVersion4 UInt16, PageCharset String, CodeVersion UInt32, IsLink UInt8, IsDownload UInt8, IsNotBounce UInt8, FUniqID UInt64, HID UInt32, IsOldCounter UInt8, IsEvent UInt8, IsParameter UInt8, DontCountHits UInt8, WithHash UInt8, HitColor FixedString(1), UTCEventTime DateTime, Age UInt8, Sex UInt8, Income UInt8, Interests UInt16, Robotness UInt8, GeneralInterests Array(UInt16), RemoteIP UInt32, RemoteIP6 FixedString(16), WindowName Int32, OpenerName Int32, HistoryLength Int16, BrowserLanguage FixedString(2), BrowserCountry FixedString(2), SocialNetwork String, SocialAction String, HTTPError UInt16, SendTiming Int32, DNSTiming Int32, ConnectTiming Int32, ResponseStartTiming Int32, ResponseEndTiming Int32, FetchTiming Int32, RedirectTiming Int32, DOMInteractiveTiming Int32, DOMContentLoadedTiming Int32, DOMCompleteTiming Int32, 
LoadEventStartTiming Int32, LoadEventEndTiming Int32, NSToDOMContentLoadedTiming Int32, FirstPaintTiming Int32, RedirectCount Int8, SocialSourceNetworkID UInt8, SocialSourcePage String, ParamPrice Int64, ParamOrderID String, ParamCurrency FixedString(3), ParamCurrencyID UInt16, GoalsReached Array(UInt32), OpenstatServiceName String, OpenstatCampaignID String, OpenstatAdID String, OpenstatSourceID String, UTMSource String, UTMMedium String, UTMCampaign String, UTMContent String, UTMTerm String, FromTag String, HasGCLID UInt8, RefererHash UInt64, URLHash UInt64, CLID UInt32, YCLID UInt64, ShareService String, ShareURL String, ShareTitle String, ParsedParams Nested(Key1 String, Key2 String, Key3 String, Key4 String, Key5 String, ValueDouble Float64), IslandID FixedString(16), RequestNum UInt32, RequestTry UInt8) ENGINE = MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS index_granularity = 8192, storage_policy='s3_cache'"
|
||||
clickhouse-client --query "CREATE TABLE test.visits (CounterID UInt32, StartDate Date, Sign Int8, IsNew UInt8, VisitID UInt64, UserID UInt64, StartTime DateTime, Duration UInt32, UTCStartTime DateTime, PageViews Int32, Hits Int32, IsBounce UInt8, Referer String, StartURL String, RefererDomain String, StartURLDomain String, EndURL String, LinkURL String, IsDownload UInt8, TraficSourceID Int8, SearchEngineID UInt16, SearchPhrase String, AdvEngineID UInt8, PlaceID Int32, RefererCategories Array(UInt16), URLCategories Array(UInt16), URLRegions Array(UInt32), RefererRegions Array(UInt32), IsYandex UInt8, GoalReachesDepth Int32, GoalReachesURL Int32, GoalReachesAny Int32, SocialSourceNetworkID UInt8, SocialSourcePage String, MobilePhoneModel String, ClientEventTime DateTime, RegionID UInt32, ClientIP UInt32, ClientIP6 FixedString(16), RemoteIP UInt32, RemoteIP6 FixedString(16), IPNetworkID UInt32, SilverlightVersion3 UInt32, CodeVersion UInt32, ResolutionWidth UInt16, ResolutionHeight UInt16, UserAgentMajor UInt16, UserAgentMinor UInt16, WindowClientWidth UInt16, WindowClientHeight UInt16, SilverlightVersion2 UInt8, SilverlightVersion4 UInt16, FlashVersion3 UInt16, FlashVersion4 UInt16, ClientTimeZone Int16, OS UInt8, UserAgent UInt8, ResolutionDepth UInt8, FlashMajor UInt8, FlashMinor UInt8, NetMajor UInt8, NetMinor UInt8, MobilePhone UInt8, SilverlightVersion1 UInt8, Age UInt8, Sex UInt8, Income UInt8, JavaEnable UInt8, CookieEnable UInt8, JavascriptEnable UInt8, IsMobile UInt8, BrowserLanguage UInt16, BrowserCountry UInt16, Interests UInt16, Robotness UInt8, GeneralInterests Array(UInt16), Params Array(String), Goals Nested(ID UInt32, Serial UInt32, EventTime DateTime, Price Int64, OrderID String, CurrencyID UInt32), WatchIDs Array(UInt64), ParamSumPrice Int64, ParamCurrency FixedString(3), ParamCurrencyID UInt16, ClickLogID UInt64, ClickEventID Int32, ClickGoodEvent Int32, ClickEventTime DateTime, ClickPriorityID Int32, ClickPhraseID Int32, ClickPageID Int32, 
ClickPlaceID Int32, ClickTypeID Int32, ClickResourceID Int32, ClickCost UInt32, ClickClientIP UInt32, ClickDomainID UInt32, ClickURL String, ClickAttempt UInt8, ClickOrderID UInt32, ClickBannerID UInt32, ClickMarketCategoryID UInt32, ClickMarketPP UInt32, ClickMarketCategoryName String, ClickMarketPPName String, ClickAWAPSCampaignName String, ClickPageName String, ClickTargetType UInt16, ClickTargetPhraseID UInt64, ClickContextType UInt8, ClickSelectType Int8, ClickOptions String, ClickGroupBannerID Int32, OpenstatServiceName String, OpenstatCampaignID String, OpenstatAdID String, OpenstatSourceID String, UTMSource String, UTMMedium String, UTMCampaign String, UTMContent String, UTMTerm String, FromTag String, HasGCLID UInt8, FirstVisit DateTime, PredLastVisit Date, LastVisit Date, TotalVisits UInt32, TraficSource Nested(ID Int8, SearchEngineID UInt16, AdvEngineID UInt8, PlaceID UInt16, SocialSourceNetworkID UInt8, Domain String, SearchPhrase String, SocialSourcePage String), Attendance FixedString(16), CLID UInt32, YCLID UInt64, NormalizedRefererHash UInt64, SearchPhraseHash UInt64, RefererDomainHash UInt64, NormalizedStartURLHash UInt64, StartURLDomainHash UInt64, NormalizedEndURLHash UInt64, TopLevelDomain UInt64, URLScheme UInt64, OpenstatServiceNameHash UInt64, OpenstatCampaignIDHash UInt64, OpenstatAdIDHash UInt64, OpenstatSourceIDHash UInt64, UTMSourceHash UInt64, UTMMediumHash UInt64, UTMCampaignHash UInt64, UTMContentHash UInt64, UTMTermHash UInt64, FromHash UInt64, WebVisorEnabled UInt8, WebVisorActivity UInt32, ParsedParams Nested(Key1 String, Key2 String, Key3 String, Key4 String, Key5 String, ValueDouble Float64), Market Nested(Type UInt8, GoalID UInt32, OrderID String, OrderPrice Int64, PP UInt32, DirectPlaceID UInt32, DirectOrderID UInt32, DirectBannerID UInt32, GoodID String, GoodName String, GoodQuantity Int32, GoodPrice Int64), IslandID FixedString(16)) ENGINE = CollapsingMergeTree(Sign) PARTITION BY toYYYYMM(StartDate) ORDER BY (CounterID, 
StartDate, intHash32(UserID), VisitID) SAMPLE BY intHash32(UserID) SETTINGS index_granularity = 8192, storage_policy='s3_cache'"
|
||||
|
||||
clickhouse-client --query "INSERT INTO test.hits_s3 SELECT * FROM datasets.hits_v1 SETTINGS enable_filesystem_cache_on_write_operations=0"
|
||||
clickhouse-client --query "INSERT INTO test.hits SELECT * FROM datasets.hits_v1 SETTINGS enable_filesystem_cache_on_write_operations=0"
|
||||
clickhouse-client --query "INSERT INTO test.visits SELECT * FROM datasets.visits_v1 SETTINGS enable_filesystem_cache_on_write_operations=0"
|
||||
|
||||
clickhouse-client --query "DROP TABLE datasets.visits_v1 SYNC"
|
||||
clickhouse-client --query "DROP TABLE datasets.hits_v1 SYNC"
|
||||
|
||||
clickhouse-client --query "SHOW TABLES FROM test"
|
||||
|
||||
clickhouse-client --query "SYSTEM STOP THREAD FUZZER"
|
||||
|
||||
stop
|
||||
|
||||
# Let's enable S3 storage by default
|
||||
export USE_S3_STORAGE_FOR_MERGE_TREE=1
|
||||
configure
|
||||
|
||||
# But we still need the default disk because some tables are loaded only into it
|
||||
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml | sed "s|<disk>s3</disk>|<disk>s3</disk><disk>default</disk>|" > /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp
|
||||
mv /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
|
||||
sudo chown clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
|
||||
sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
|
||||
|
||||
start
|
||||
|
||||
./stress --hung-check --drop-databases --output-folder test_output --skip-func-tests "$SKIP_TESTS_OPTION" \
|
||||
&& echo -e 'Test script exit code\tOK' >> /test_output/test_results.tsv \
|
||||
|| echo -e 'Test script failed\tFAIL' >> /test_output/test_results.tsv
|
||||
@ -256,6 +281,14 @@ zgrep -Fa "Code: 49, e.displayText() = DB::Exception:" /var/log/clickhouse-serve
|
||||
# Remove file logical_errors.txt if it's empty
|
||||
[ -s /test_output/logical_errors.txt ] || rm /test_output/logical_errors.txt
|
||||
|
||||
# No such key errors
|
||||
zgrep -Ea "Code: 499.*The specified key does not exist" /var/log/clickhouse-server/clickhouse-server*.log > /test_output/no_such_key_errors.txt \
|
||||
&& echo -e 'S3_ERROR No such key thrown (see clickhouse-server.log or no_such_key_errors.txt)\tFAIL' >> /test_output/test_results.tsv \
|
||||
|| echo -e 'No lost s3 keys\tOK' >> /test_output/test_results.tsv
|
||||
|
||||
# Remove file no_such_key_errors.txt if it's empty
|
||||
[ -s /test_output/no_such_key_errors.txt ] || rm /test_output/no_such_key_errors.txt
|
||||
|
||||
# Crash
|
||||
zgrep -Fa "########################################" /var/log/clickhouse-server/clickhouse-server*.log > /dev/null \
|
||||
&& echo -e 'Killed by signal (in clickhouse-server.log)\tFAIL' >> /test_output/test_results.tsv \
|
||||
|
@ -29,7 +29,7 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
|
||||
size_t max_single_download_retries_,
|
||||
bool use_external_buffer_,
|
||||
size_t read_until_position_)
|
||||
: ReadBufferFromFileBase(read_settings_.remote_fs_buffer_size, nullptr, 0)
|
||||
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : read_settings_.remote_fs_buffer_size, nullptr, 0)
|
||||
, blob_container_client(blob_container_client_)
|
||||
, path(path_)
|
||||
, max_single_read_retries(max_single_read_retries_)
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include "ReadIndirectBufferFromRemoteFS.h"
|
||||
|
||||
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
|
||||
#include <IO/ReadSettings.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -13,8 +14,8 @@ namespace ErrorCodes
|
||||
|
||||
|
||||
ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS(
|
||||
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_)
|
||||
: ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
|
||||
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_, const ReadSettings & settings)
|
||||
: ReadBufferFromFileBase(settings.remote_fs_buffer_size, nullptr, 0)
|
||||
, impl(impl_)
|
||||
{
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ namespace DB
|
||||
{
|
||||
|
||||
class ReadBufferFromRemoteFSGather;
|
||||
struct ReadSettings;
|
||||
|
||||
/**
|
||||
* Reads data from S3/HDFS/Web using stored paths in metadata.
|
||||
@ -18,7 +19,7 @@ class ReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase
|
||||
{
|
||||
|
||||
public:
|
||||
explicit ReadIndirectBufferFromRemoteFS(std::shared_ptr<ReadBufferFromRemoteFSGather> impl_);
|
||||
explicit ReadIndirectBufferFromRemoteFS(std::shared_ptr<ReadBufferFromRemoteFSGather> impl_, const ReadSettings & settings);
|
||||
|
||||
off_t seek(off_t offset_, int whence) override;
|
||||
|
||||
|
@ -112,7 +112,7 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
|
||||
}
|
||||
else
|
||||
{
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(reader_impl));
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(reader_impl), disk_read_settings);
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings_ptr->min_bytes_for_seek);
|
||||
}
|
||||
}
|
||||
|
@ -253,6 +253,13 @@ void DiskObjectStorage::removeSharedFile(const String & path, bool delete_metada
|
||||
transaction->commit();
|
||||
}
|
||||
|
||||
void DiskObjectStorage::removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only)
|
||||
{
|
||||
auto transaction = createObjectStorageTransaction();
|
||||
transaction->removeSharedFiles(files, keep_all_batch_data, file_names_remove_metadata_only);
|
||||
transaction->commit();
|
||||
}
|
||||
|
||||
UInt32 DiskObjectStorage::getRefCount(const String & path) const
|
||||
{
|
||||
return metadata_storage->getHardlinkCount(path);
|
||||
|
@ -92,6 +92,8 @@ public:
|
||||
|
||||
void removeSharedRecursive(const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
|
||||
|
||||
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
|
||||
|
||||
MetadataStoragePtr getMetadataStorage() override { return metadata_storage; }
|
||||
|
||||
UInt32 getRefCount(const String & path) const override;
|
||||
|
@ -5,7 +5,6 @@
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -139,6 +138,87 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
}
|
||||
};
|
||||
|
||||
struct RemoveManyObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
{
|
||||
RemoveBatchRequest remove_paths;
|
||||
bool keep_all_batch_data;
|
||||
NameSet file_names_remove_metadata_only;
|
||||
StoredObjects objects_to_remove;
|
||||
bool remove_from_cache = false;
|
||||
|
||||
RemoveManyObjectStorageOperation(
|
||||
IObjectStorage & object_storage_,
|
||||
IMetadataStorage & metadata_storage_,
|
||||
const RemoveBatchRequest & remove_paths_,
|
||||
bool keep_all_batch_data_,
|
||||
const NameSet & file_names_remove_metadata_only_)
|
||||
: IDiskObjectStorageOperation(object_storage_, metadata_storage_)
|
||||
, remove_paths(remove_paths_)
|
||||
, keep_all_batch_data(keep_all_batch_data_)
|
||||
, file_names_remove_metadata_only(file_names_remove_metadata_only_)
|
||||
{}
|
||||
|
||||
std::string getInfoForLog() const override
|
||||
{
|
||||
return fmt::format("RemoveManyObjectStorageOperation (paths size: {}, keep all batch {}, files to keep {})", remove_paths.size(), keep_all_batch_data, fmt::join(file_names_remove_metadata_only, ", "));
|
||||
}
|
||||
|
||||
void execute(MetadataTransactionPtr tx) override
|
||||
{
|
||||
for (const auto & [path, if_exists] : remove_paths)
|
||||
{
|
||||
|
||||
if (!metadata_storage.exists(path))
|
||||
{
|
||||
if (if_exists)
|
||||
continue;
|
||||
|
||||
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Metadata path '{}' doesn't exist", path);
|
||||
}
|
||||
|
||||
if (!metadata_storage.isFile(path))
|
||||
throw Exception(ErrorCodes::BAD_FILE_TYPE, "Path '{}' is not a regular file", path);
|
||||
|
||||
try
|
||||
{
|
||||
uint32_t hardlink_count = metadata_storage.getHardlinkCount(path);
|
||||
auto objects = metadata_storage.getStorageObjects(path);
|
||||
|
||||
tx->unlinkMetadata(path);
|
||||
|
||||
/// File is really redundant
|
||||
if (hardlink_count == 0 && !keep_all_batch_data && !file_names_remove_metadata_only.contains(fs::path(path).filename()))
|
||||
objects_to_remove.insert(objects_to_remove.end(), objects.begin(), objects.end());
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
/// If it's impossible to read meta - just remove it from FS.
|
||||
if (e.code() == ErrorCodes::UNKNOWN_FORMAT
|
||||
|| e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
|
||||
|| e.code() == ErrorCodes::CANNOT_READ_ALL_DATA
|
||||
|| e.code() == ErrorCodes::CANNOT_OPEN_FILE)
|
||||
{
|
||||
tx->unlinkFile(path);
|
||||
}
|
||||
else
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void undo() override
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
void finalize() override
|
||||
{
|
||||
if (!objects_to_remove.empty())
|
||||
object_storage.removeObjects(objects_to_remove);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
{
|
||||
std::string path;
|
||||
@ -480,14 +560,8 @@ void DiskObjectStorageTransaction::removeFileIfExists(const std::string & path)
|
||||
void DiskObjectStorageTransaction::removeSharedFiles(
|
||||
const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only)
|
||||
{
|
||||
for (const auto & file : files)
|
||||
{
|
||||
bool keep_file = keep_all_batch_data || file_names_remove_metadata_only.contains(fs::path(file.path).filename());
|
||||
if (file.if_exists)
|
||||
removeSharedFileIfExists(file.path, keep_file);
|
||||
else
|
||||
removeSharedFile(file.path, keep_file);
|
||||
}
|
||||
auto operation = std::make_unique<RemoveManyObjectStorageOperation>(object_storage, metadata_storage, files, keep_all_batch_data, file_names_remove_metadata_only);
|
||||
operations_to_execute.emplace_back(std::move(operation));
|
||||
}
|
||||
|
||||
namespace
|
||||
|
@ -70,11 +70,12 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLI
|
||||
auto hdfs_path = path.substr(begin_of_path);
|
||||
auto hdfs_uri = path.substr(0, begin_of_path);
|
||||
|
||||
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, hdfs_path, config, disk_read_settings);
|
||||
return std::make_unique<ReadBufferFromHDFS>(
|
||||
hdfs_uri, hdfs_path, config, disk_read_settings, /* read_until_position */0, /* use_external_buffer */true);
|
||||
};
|
||||
|
||||
auto hdfs_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, disk_read_settings);
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl));
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl), read_settings);
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
|
||||
}
|
||||
|
||||
|
@ -157,7 +157,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
||||
}
|
||||
else
|
||||
{
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(s3_impl));
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(s3_impl), disk_read_settings);
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings_ptr->min_bytes_for_seek);
|
||||
}
|
||||
}
|
||||
@ -245,6 +245,8 @@ void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exis
|
||||
auto outcome = client_ptr->DeleteObject(request);
|
||||
|
||||
throwIfUnexpectedError(outcome, if_exists);
|
||||
|
||||
LOG_TRACE(log, "Object with path {} was removed from S3", object.absolute_path);
|
||||
}
|
||||
|
||||
void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_exists)
|
||||
@ -288,6 +290,8 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
|
||||
auto outcome = client_ptr->DeleteObjects(request);
|
||||
|
||||
throwIfUnexpectedError(outcome, if_exists);
|
||||
|
||||
LOG_TRACE(log, "Objects with paths [{}] were removed from S3", keys);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include <aws/s3/model/ListObjectsV2Result.h>
|
||||
#include <Storages/StorageS3Settings.h>
|
||||
#include <Common/MultiVersion.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -180,6 +181,7 @@ private:
|
||||
|
||||
const String version_id;
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("S3ObjectStorage");
|
||||
DataSourceDescription data_source_description;
|
||||
};
|
||||
|
||||
|
@ -188,7 +188,7 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
|
||||
}
|
||||
else
|
||||
{
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(web_impl));
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(web_impl), read_settings);
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), min_bytes_for_seek);
|
||||
}
|
||||
}
|
||||
|
@ -48,7 +48,7 @@ ReadBufferFromS3::ReadBufferFromS3(
|
||||
size_t offset_,
|
||||
size_t read_until_position_,
|
||||
bool restricted_seek_)
|
||||
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
|
||||
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : settings_.remote_fs_buffer_size, nullptr, 0)
|
||||
, client_ptr(std::move(client_ptr_))
|
||||
, bucket(bucket_)
|
||||
, key(key_)
|
||||
|
@ -41,8 +41,9 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
|
||||
const std::string & hdfs_file_path_,
|
||||
const Poco::Util::AbstractConfiguration & config_,
|
||||
const ReadSettings & read_settings_,
|
||||
size_t read_until_position_)
|
||||
: BufferWithOwnMemory<SeekableReadBuffer>(read_settings_.remote_fs_buffer_size)
|
||||
size_t read_until_position_,
|
||||
bool use_external_buffer_)
|
||||
: BufferWithOwnMemory<SeekableReadBuffer>(use_external_buffer_ ? 0 : read_settings_.remote_fs_buffer_size)
|
||||
, hdfs_uri(hdfs_uri_)
|
||||
, hdfs_file_path(hdfs_file_path_)
|
||||
, builder(createHDFSBuilder(hdfs_uri_, config_))
|
||||
@ -132,10 +133,12 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(
|
||||
const String & hdfs_file_path_,
|
||||
const Poco::Util::AbstractConfiguration & config_,
|
||||
const ReadSettings & read_settings_,
|
||||
size_t read_until_position_)
|
||||
size_t read_until_position_,
|
||||
bool use_external_buffer_)
|
||||
: ReadBufferFromFileBase(read_settings_.remote_fs_buffer_size, nullptr, 0)
|
||||
, impl(std::make_unique<ReadBufferFromHDFSImpl>(
|
||||
hdfs_uri_, hdfs_file_path_, config_, read_settings_, read_until_position_))
|
||||
hdfs_uri_, hdfs_file_path_, config_, read_settings_, read_until_position_, use_external_buffer_))
|
||||
, use_external_buffer(use_external_buffer_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -146,7 +149,18 @@ size_t ReadBufferFromHDFS::getFileSize()
|
||||
|
||||
bool ReadBufferFromHDFS::nextImpl()
|
||||
{
|
||||
impl->position() = impl->buffer().begin() + offset();
|
||||
if (use_external_buffer)
|
||||
{
|
||||
impl->set(internal_buffer.begin(), internal_buffer.size());
|
||||
assert(working_buffer.begin() != nullptr);
|
||||
assert(!internal_buffer.empty());
|
||||
}
|
||||
else
|
||||
{
|
||||
impl->position() = impl->buffer().begin() + offset();
|
||||
assert(!impl->hasPendingData());
|
||||
}
|
||||
|
||||
auto result = impl->next();
|
||||
|
||||
if (result)
|
||||
|
@ -29,7 +29,8 @@ public:
|
||||
const String & hdfs_file_path_,
|
||||
const Poco::Util::AbstractConfiguration & config_,
|
||||
const ReadSettings & read_settings_,
|
||||
size_t read_until_position_ = 0);
|
||||
size_t read_until_position_ = 0,
|
||||
bool use_external_buffer = false);
|
||||
|
||||
~ReadBufferFromHDFS() override;
|
||||
|
||||
@ -49,6 +50,7 @@ public:
|
||||
|
||||
private:
|
||||
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
|
||||
bool use_external_buffer;
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -532,13 +532,34 @@ void IMergeTreeDataPart::removeIfNeeded()
|
||||
LOG_TRACE(storage.log, "Removed part from old location {}", path);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
catch (const Exception & ex)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while removing part {} with path {}", name, path));
|
||||
|
||||
/// In this case we want to avoid assertions, because such errors are unavoidable in setup
|
||||
/// with zero-copy replication.
|
||||
if (const auto * keeper_exception = dynamic_cast<const Coordination::Exception *>(&ex))
|
||||
{
|
||||
if (Coordination::isHardwareError(keeper_exception->code))
|
||||
return;
|
||||
}
|
||||
|
||||
/// FIXME If part is temporary, then directory will not be removed for 1 day (temporary_directories_lifetime).
|
||||
/// If it's tmp_merge_<part_name> or tmp_fetch_<part_name>,
|
||||
/// then all future attempts to execute part producing operation will fail with "directory already exists".
|
||||
/// Seems like it's especially important for remote disks, because removal may fail due to network issues.
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__, "while removiong path: " + path);
|
||||
assert(!is_temp);
|
||||
assert(state != MergeTreeDataPartState::DeleteOnDestroy);
|
||||
assert(state != MergeTreeDataPartState::Temporary);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while removing part {} with path {}", name, path));
|
||||
|
||||
/// FIXME If the part is temporary, then its directory will not be removed for 1 day (temporary_directories_lifetime).
|
||||
/// If it's tmp_merge_<part_name> or tmp_fetch_<part_name>,
|
||||
/// then all future attempts to execute part producing operation will fail with "directory already exists".
|
||||
///
|
||||
/// For remote disks this issue is really frequent, so we don't abort the server here
|
||||
assert(!is_temp);
|
||||
assert(state != MergeTreeDataPartState::DeleteOnDestroy);
|
||||
assert(state != MergeTreeDataPartState::Temporary);
|
||||
|
@ -1046,29 +1046,43 @@ void MergeTreeData::loadDataPartsFromDisk(
|
||||
throw;
|
||||
|
||||
broken = true;
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while loading part {} on path {}", part->name, part_path));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
broken = true;
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while loading part {} on path {}", part->name, part_path));
|
||||
}
|
||||
|
||||
/// Ignore broken parts that can appear as a result of hard server restart.
|
||||
if (broken)
|
||||
{
|
||||
/// NOTE: getBytesOnDisk() cannot be used here, since it may be zero if checksums.txt does not exist
|
||||
size_t size_of_part = data_part_storage->calculateTotalSizeOnDisk();
|
||||
std::optional<size_t> size_of_part;
|
||||
try
|
||||
{
|
||||
/// NOTE: getBytesOnDisk() cannot be used here, since it may be zero if checksums.txt does not exist
|
||||
size_of_part = data_part_storage->calculateTotalSizeOnDisk();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while calculating part size {} on path {}", part->name, part_path));
|
||||
}
|
||||
|
||||
std::string part_size_str = "failed to calculate size";
|
||||
if (size_of_part.has_value())
|
||||
part_size_str = formatReadableSizeWithBinarySuffix(*size_of_part);
|
||||
|
||||
|
||||
LOG_ERROR(log,
|
||||
"Detaching broken part {}{} (size: {}). "
|
||||
"If it happened after update, it is likely because of backward incompability. "
|
||||
"You need to resolve this manually",
|
||||
getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part));
|
||||
getFullPathOnDisk(part_disk_ptr), part_name, part_size_str);
|
||||
std::lock_guard loading_lock(mutex);
|
||||
broken_parts_to_detach.push_back(part);
|
||||
++suspicious_broken_parts;
|
||||
suspicious_broken_parts_bytes += size_of_part;
|
||||
if (size_of_part.has_value())
|
||||
suspicious_broken_parts_bytes += *size_of_part;
|
||||
return;
|
||||
}
|
||||
if (!part->index_granularity_info.is_adaptive)
|
||||
@ -1177,14 +1191,10 @@ void MergeTreeData::loadDataPartsFromDisk(
|
||||
void MergeTreeData::loadDataPartsFromWAL(
|
||||
DataPartsVector & /* broken_parts_to_detach */,
|
||||
DataPartsVector & duplicate_parts_to_remove,
|
||||
MutableDataPartsVector & parts_from_wal,
|
||||
DataPartsLock & part_lock)
|
||||
MutableDataPartsVector & parts_from_wal)
|
||||
{
|
||||
for (auto & part : parts_from_wal)
|
||||
{
|
||||
if (getActiveContainingPart(part->info, DataPartState::Active, part_lock))
|
||||
continue;
|
||||
|
||||
part->modification_time = time(nullptr);
|
||||
/// Assume that all parts are Active, covered parts will be detected and marked as Outdated later
|
||||
part->setState(DataPartState::Active);
|
||||
@ -1212,7 +1222,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
const auto settings = getSettings();
|
||||
MutableDataPartsVector parts_from_wal;
|
||||
Strings part_file_names;
|
||||
|
||||
auto disks = getStoragePolicy()->getDisks();
|
||||
@ -1269,16 +1278,14 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
|
||||
/// Collect part names by disk.
|
||||
std::map<String, std::vector<std::pair<String, DiskPtr>>> disk_part_map;
|
||||
std::map<String, MutableDataPartsVector> disk_wal_part_map;
|
||||
ThreadPool pool(disks.size());
|
||||
std::mutex wal_init_lock;
|
||||
|
||||
for (const auto & disk_ptr : disks)
|
||||
{
|
||||
if (disk_ptr->isBroken())
|
||||
continue;
|
||||
|
||||
auto & disk_parts = disk_part_map[disk_ptr->getName()];
|
||||
auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()];
|
||||
|
||||
pool.scheduleOrThrowOnError([&, disk_ptr]()
|
||||
{
|
||||
@ -1291,34 +1298,11 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
|
||||
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
|
||||
disk_parts.emplace_back(std::make_pair(it->name(), disk_ptr));
|
||||
else if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal)
|
||||
{
|
||||
std::lock_guard lock(wal_init_lock);
|
||||
if (write_ahead_log != nullptr)
|
||||
throw Exception(
|
||||
"There are multiple WAL files appeared in current storage policy. You need to resolve this manually",
|
||||
ErrorCodes::CORRUPTED_DATA);
|
||||
|
||||
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
|
||||
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
|
||||
disk_wal_parts.push_back(std::move(part));
|
||||
}
|
||||
else if (settings->in_memory_parts_enable_wal)
|
||||
{
|
||||
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
|
||||
for (auto && part : wal.restore(metadata_snapshot, getContext()))
|
||||
disk_wal_parts.push_back(std::move(part));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pool.wait();
|
||||
|
||||
for (auto & [_, disk_wal_parts] : disk_wal_part_map)
|
||||
parts_from_wal.insert(
|
||||
parts_from_wal.end(), std::make_move_iterator(disk_wal_parts.begin()), std::make_move_iterator(disk_wal_parts.end()));
|
||||
|
||||
size_t num_parts = 0;
|
||||
std::queue<std::vector<std::pair<String, DiskPtr>>> parts_queue;
|
||||
for (auto & [_, disk_parts] : disk_part_map)
|
||||
@ -1332,13 +1316,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
auto part_lock = lockParts();
|
||||
data_parts_indexes.clear();
|
||||
|
||||
if (num_parts == 0 && parts_from_wal.empty())
|
||||
{
|
||||
resetObjectColumnsFromActiveParts(part_lock);
|
||||
LOG_DEBUG(log, "There are no data parts");
|
||||
return;
|
||||
}
|
||||
|
||||
DataPartsVector broken_parts_to_detach;
|
||||
DataPartsVector duplicate_parts_to_remove;
|
||||
|
||||
@ -1346,8 +1323,65 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
loadDataPartsFromDisk(
|
||||
broken_parts_to_detach, duplicate_parts_to_remove, pool, num_parts, parts_queue, skip_sanity_checks, settings);
|
||||
|
||||
if (!parts_from_wal.empty())
|
||||
loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal, part_lock);
|
||||
if (settings->in_memory_parts_enable_wal)
|
||||
{
|
||||
std::map<String, MutableDataPartsVector> disk_wal_part_map;
|
||||
|
||||
std::mutex wal_init_lock;
|
||||
for (const auto & disk_ptr : disks)
|
||||
{
|
||||
if (disk_ptr->isBroken())
|
||||
continue;
|
||||
|
||||
auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()];
|
||||
|
||||
pool.scheduleOrThrowOnError([&, disk_ptr]()
|
||||
{
|
||||
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
|
||||
{
|
||||
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
|
||||
continue;
|
||||
|
||||
if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME)
|
||||
{
|
||||
std::lock_guard lock(wal_init_lock);
|
||||
if (write_ahead_log != nullptr)
|
||||
throw Exception(
|
||||
"There are multiple WAL files appeared in current storage policy. You need to resolve this manually",
|
||||
ErrorCodes::CORRUPTED_DATA);
|
||||
|
||||
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
|
||||
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext(), part_lock))
|
||||
disk_wal_parts.push_back(std::move(part));
|
||||
}
|
||||
else
|
||||
{
|
||||
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
|
||||
for (auto && part : wal.restore(metadata_snapshot, getContext(), part_lock))
|
||||
disk_wal_parts.push_back(std::move(part));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pool.wait();
|
||||
|
||||
MutableDataPartsVector parts_from_wal;
|
||||
for (auto & [_, disk_wal_parts] : disk_wal_part_map)
|
||||
parts_from_wal.insert(
|
||||
parts_from_wal.end(), std::make_move_iterator(disk_wal_parts.begin()), std::make_move_iterator(disk_wal_parts.end()));
|
||||
|
||||
loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal);
|
||||
|
||||
num_parts += parts_from_wal.size();
|
||||
}
|
||||
|
||||
if (num_parts == 0)
|
||||
{
|
||||
resetObjectColumnsFromActiveParts(part_lock);
|
||||
LOG_DEBUG(log, "There are no data parts");
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto & part : broken_parts_to_detach)
|
||||
{
|
||||
|
@ -1339,8 +1339,7 @@ private:
|
||||
void loadDataPartsFromWAL(
|
||||
DataPartsVector & broken_parts_to_detach,
|
||||
DataPartsVector & duplicate_parts_to_remove,
|
||||
MutableDataPartsVector & parts_from_wal,
|
||||
DataPartsLock & part_lock);
|
||||
MutableDataPartsVector & parts_from_wal);
|
||||
|
||||
void resetObjectColumnsFromActiveParts(const DataPartsLock & lock);
|
||||
void updateObjectColumns(const DataPartPtr & part, const DataPartsLock & lock);
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataWriter.h>
|
||||
#include <Storages/MergeTree/MergedBlockOutputStream.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartState.h>
|
||||
#include <IO/MemoryReadWriteBuffer.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/copyData.h>
|
||||
@ -122,7 +123,10 @@ void MergeTreeWriteAheadLog::rotate(const std::unique_lock<std::mutex> &)
|
||||
init();
|
||||
}
|
||||
|
||||
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
|
||||
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
ContextPtr context,
|
||||
std::unique_lock<std::mutex> & parts_lock)
|
||||
{
|
||||
std::unique_lock lock(write_mutex);
|
||||
|
||||
@ -172,6 +176,9 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
|
||||
part->uuid = metadata.part_uuid;
|
||||
|
||||
block = block_in.read();
|
||||
|
||||
if (storage.getActiveContainingPart(part->info, MergeTreeDataPartState::Active, parts_lock))
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -238,6 +245,15 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
|
||||
std::copy_if(parts.begin(), parts.end(), std::back_inserter(result),
|
||||
[&dropped_parts](const auto & part) { return dropped_parts.count(part->name) == 0; });
|
||||
|
||||
/// All parts in the WAL have already been committed to disk -> clear the WAL
|
||||
if (result.empty())
|
||||
{
|
||||
LOG_DEBUG(log, "WAL file '{}' had been completely processed. Removing.", path);
|
||||
disk->removeFile(path);
|
||||
init();
|
||||
return {};
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -62,7 +62,10 @@ public:
|
||||
|
||||
void addPart(DataPartInMemoryPtr & part);
|
||||
void dropPart(const String & part_name);
|
||||
std::vector<MergeTreeMutableDataPartPtr> restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
|
||||
std::vector<MergeTreeMutableDataPartPtr> restore(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
ContextPtr context,
|
||||
std::unique_lock<std::mutex> & parts_lock);
|
||||
|
||||
using MinMaxBlockNumber = std::pair<Int64, Int64>;
|
||||
static std::optional<MinMaxBlockNumber> tryParseMinMaxBlockNumber(const String & filename);
|
||||
|
@ -111,5 +111,9 @@ for i in $(seq $REPLICAS); do
|
||||
$CLICKHOUSE_CLIENT --query "SELECT * FROM system.mutations WHERE is_done = 0 and table = 'concurrent_alter_add_drop_$i'"
|
||||
$CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM system.replication_queue WHERE table = 'concurrent_alter_add_drop_$i'"
|
||||
$CLICKHOUSE_CLIENT --query "SELECT * FROM system.replication_queue WHERE table = 'concurrent_alter_add_drop_$i' and (type = 'ALTER_METADATA' or type = 'MUTATE_PART')"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DETACH TABLE concurrent_alter_add_drop_$i"
|
||||
$CLICKHOUSE_CLIENT --query "ATTACH TABLE concurrent_alter_add_drop_$i"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_add_drop_$i"
|
||||
done
|
||||
|
@ -0,0 +1,35 @@
|
||||
-- { echo }
|
||||
|
||||
DROP TABLE IF EXISTS in_memory;
|
||||
CREATE TABLE in_memory (a UInt32) ENGINE = MergeTree ORDER BY a SETTINGS min_rows_for_compact_part = 1000, min_bytes_for_wide_part = 10485760;
|
||||
INSERT INTO in_memory VALUES (1);
|
||||
INSERT INTO in_memory VALUES (2);
|
||||
SELECT name, active, part_type FROM system.parts WHERE database = currentDatabase() AND table = 'in_memory';
|
||||
all_1_1_0 1 InMemory
|
||||
all_2_2_0 1 InMemory
|
||||
SELECT * FROM in_memory ORDER BY a;
|
||||
1
|
||||
2
|
||||
-- no WAL remove since parts are still in use
|
||||
DETACH TABLE in_memory;
|
||||
ATTACH TABLE in_memory;
|
||||
SELECT name, active, part_type FROM system.parts WHERE database = currentDatabase() AND table = 'in_memory';
|
||||
all_1_1_0 1 InMemory
|
||||
all_2_2_0 1 InMemory
|
||||
SELECT * FROM in_memory ORDER BY a;
|
||||
1
|
||||
2
|
||||
-- WAL should be removed, since on disk part covers all parts in WAL
|
||||
OPTIMIZE TABLE in_memory;
|
||||
DETACH TABLE in_memory;
|
||||
ATTACH TABLE in_memory;
|
||||
SELECT name, active, part_type FROM system.parts WHERE database = currentDatabase() AND table = 'in_memory';
|
||||
all_1_2_1 1 Compact
|
||||
-- check that the WAL will be reinitialized after remove
|
||||
INSERT INTO in_memory VALUES (3);
|
||||
DETACH TABLE in_memory;
|
||||
ATTACH TABLE in_memory;
|
||||
SELECT * FROM in_memory ORDER BY a;
|
||||
1
|
||||
2
|
||||
3
|
27
tests/queries/0_stateless/02410_inmemory_wal_cleanup.sql
Normal file
27
tests/queries/0_stateless/02410_inmemory_wal_cleanup.sql
Normal file
@ -0,0 +1,27 @@
|
||||
-- { echo }
|
||||
|
||||
DROP TABLE IF EXISTS in_memory;
|
||||
|
||||
CREATE TABLE in_memory (a UInt32) ENGINE = MergeTree ORDER BY a SETTINGS min_rows_for_compact_part = 1000, min_bytes_for_wide_part = 10485760;
|
||||
INSERT INTO in_memory VALUES (1);
|
||||
INSERT INTO in_memory VALUES (2);
|
||||
SELECT name, active, part_type FROM system.parts WHERE database = currentDatabase() AND table = 'in_memory';
|
||||
SELECT * FROM in_memory ORDER BY a;
|
||||
|
||||
-- no WAL remove since parts are still in use
|
||||
DETACH TABLE in_memory;
|
||||
ATTACH TABLE in_memory;
|
||||
SELECT name, active, part_type FROM system.parts WHERE database = currentDatabase() AND table = 'in_memory';
|
||||
SELECT * FROM in_memory ORDER BY a;
|
||||
|
||||
-- WAL should be removed, since on disk part covers all parts in WAL
|
||||
OPTIMIZE TABLE in_memory;
|
||||
DETACH TABLE in_memory;
|
||||
ATTACH TABLE in_memory;
|
||||
SELECT name, active, part_type FROM system.parts WHERE database = currentDatabase() AND table = 'in_memory';
|
||||
|
||||
-- check that the WAL will be reinitialized after remove
|
||||
INSERT INTO in_memory VALUES (3);
|
||||
DETACH TABLE in_memory;
|
||||
ATTACH TABLE in_memory;
|
||||
SELECT * FROM in_memory ORDER BY a;
|
Loading…
Reference in New Issue
Block a user