Merge remote-tracking branch 'origin/fix-thread-status' into fix-thread-status

This commit is contained in:
kssenii 2022-09-26 12:51:22 +02:00
commit e4d6afa884
16 changed files with 105 additions and 30 deletions

View File

@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="22.8.5.29"
ARG VERSION="22.9.2.7"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -21,7 +21,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="22.8.5.29"
ARG VERSION="22.9.2.7"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image

View File

@ -243,7 +243,7 @@ export USE_S3_STORAGE_FOR_MERGE_TREE=1
configure
# But we still need default disk because some tables loaded only into it
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml | sed "s|<disk>s3</disk>|<disk>s3</disk><disk>default</disk>|" > /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml | sed "s|<main><disk>s3</disk></main>|<main><disk>s3</disk></main><default><disk>default</disk></default>|" > /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp
mv /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
sudo chown clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml

View File

@ -0,0 +1,20 @@
---
sidebar_position: 1
sidebar_label: 2022
---
# 2022 Changelog
### ClickHouse release v22.9.2.7-stable (362e2cefcef) FIXME as compared to v22.9.1.2603-stable (3030d4c7ff0)
#### Improvement
* Backported in [#41709](https://github.com/ClickHouse/ClickHouse/issues/41709): Check file path for path traversal attacks in errors logger for input formats. [#41694](https://github.com/ClickHouse/ClickHouse/pull/41694) ([Kruglov Pavel](https://github.com/Avogar)).
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
* Backported in [#41696](https://github.com/ClickHouse/ClickHouse/issues/41696): Fixes issue when docker run will fail if "https_port" is not present in config. [#41693](https://github.com/ClickHouse/ClickHouse/pull/41693) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Fix typos in JSON formats after [#40910](https://github.com/ClickHouse/ClickHouse/issues/40910) [#41614](https://github.com/ClickHouse/ClickHouse/pull/41614) ([Kruglov Pavel](https://github.com/Avogar)).

View File

@ -240,7 +240,6 @@ CachedOnDiskReadBufferFromFile::ImplementationBufferPtr
CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & file_segment)
{
auto download_state = file_segment->state();
LOG_TEST(log, "getReadBufferForFileSegment: {}", file_segment->getInfoForLog());
if (settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
{
@ -251,7 +250,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
}
else
{
LOG_DEBUG(log, "Bypassing cache because `read_from_filesystem_cache_if_exists_otherwise_bypass_cache` option is used");
LOG_TEST(log, "Bypassing cache because `read_from_filesystem_cache_if_exists_otherwise_bypass_cache` option is used");
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
return getRemoteFSReadBuffer(*file_segment, read_type);
}
@ -263,7 +262,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
{
case FileSegment::State::SKIP_CACHE:
{
LOG_DEBUG(log, "Bypassing cache because file segment state is `SKIP_CACHE`");
LOG_TRACE(log, "Bypassing cache because file segment state is `SKIP_CACHE`");
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
return getRemoteFSReadBuffer(*file_segment, read_type);
}
@ -358,7 +357,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
}
else
{
LOG_DEBUG(
LOG_TRACE(
log,
"Bypassing cache because file segment state is `PARTIALLY_DOWNLOADED_NO_CONTINUATION` and downloaded part already used");
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
@ -658,7 +657,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegmentPtr & file_segment)
implementation_buffer->setReadUntilPosition(file_segment->range().right + 1); /// [..., range.right]
implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET);
LOG_TEST(
LOG_TRACE(
log,
"Predownload failed because of space limit. "
"Will read from remote filesystem starting from offset: {}",
@ -786,10 +785,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
assertCorrectness();
if (file_offset_of_buffer_end == read_until_position)
{
LOG_TEST(log, "Read finished on offset {}", file_offset_of_buffer_end);
return false;
}
if (!initialized)
initialize(file_offset_of_buffer_end, getTotalSizeToRead());
@ -813,11 +809,8 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
{
bool need_complete_file_segment = file_segment->isDownloader();
if (need_complete_file_segment)
{
LOG_TEST(log, "Resetting downloader {} from scope exit", file_segment->getDownloader());
file_segment->completePartAndResetDownloader();
}
}
chassert(!file_segment->isDownloader());
}
@ -956,12 +949,12 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
else
{
chassert(file_segment->state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
LOG_TEST(log, "Bypassing cache because writeCache method failed");
LOG_TRACE(log, "Bypassing cache because writeCache method failed");
}
}
else
{
LOG_DEBUG(log, "No space left in cache, will continue without cache download");
LOG_TRACE(log, "No space left in cache, will continue without cache download");
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
}

View File

@ -230,9 +230,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3 doesn't support append to files");
auto settings_ptr = s3_settings.get();
ThreadPoolCallbackRunner<void> scheduler;
if (write_settings.s3_allow_parallel_part_upload)
scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "VFSWrite");
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "VFSWrite");
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
client.get(),

View File

@ -39,6 +39,7 @@ StoragePolicy::StoragePolicy(
const String & config_prefix,
DiskSelectorPtr disks)
: name(std::move(name_))
, log(&Poco::Logger::get("StoragePolicy (" + name + ")"))
{
Poco::Util::AbstractConfiguration::Keys keys;
String volumes_prefix = config_prefix + ".volumes";
@ -81,11 +82,15 @@ StoragePolicy::StoragePolicy(
throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor) + " in storage policy " + backQuote(name), ErrorCodes::LOGICAL_ERROR);
buildVolumeIndices();
LOG_TRACE(log, "Storage policy {} created, total volumes {}", name, volumes.size());
}
StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_)
: volumes(std::move(volumes_)), name(std::move(name_)), move_factor(move_factor_)
: volumes(std::move(volumes_))
, name(std::move(name_))
, move_factor(move_factor_)
, log(&Poco::Logger::get("StoragePolicy (" + name + ")"))
{
if (volumes.empty())
throw Exception("Storage policy " + backQuote(name) + " must contain at least one Volume.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
@ -94,6 +99,7 @@ StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_
throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor) + " in storage policy " + backQuote(name), ErrorCodes::LOGICAL_ERROR);
buildVolumeIndices();
LOG_TRACE(log, "Storage policy {} created, total volumes {}", name, volumes.size());
}
@ -206,13 +212,17 @@ UInt64 StoragePolicy::getMaxUnreservedFreeSpace() const
ReservationPtr StoragePolicy::reserve(UInt64 bytes, size_t min_volume_index) const
{
LOG_TRACE(log, "Reserving bytes {} from volume index {}, total volumes {}", bytes, min_volume_index, volumes.size());
for (size_t i = min_volume_index; i < volumes.size(); ++i)
{
const auto & volume = volumes[i];
auto reservation = volume->reserve(bytes);
if (reservation)
{
LOG_TRACE(log, "Successfully reserved {} bytes on volume index {}", bytes, i);
return reservation;
}
}
return {};
}

View File

@ -104,6 +104,8 @@ private:
double move_factor = 0.1; /// by default move factor is 10%
void buildVolumeIndices();
Poco::Logger * log;
};

View File

@ -34,7 +34,7 @@ IFileCachePriority::WriteIterator LRUFileCachePriority::add(const Key & key, siz
CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size);
CurrentMetrics::add(CurrentMetrics::FilesystemCacheElements);
LOG_DEBUG(log, "Added entry into LRU queue, key: {}, offset: {}", key.toString(), offset);
LOG_TRACE(log, "Added entry into LRU queue, key: {}, offset: {}", key.toString(), offset);
return std::make_shared<LRUFileCacheIterator>(this, iter);
}
@ -54,7 +54,7 @@ void LRUFileCachePriority::removeAll(std::lock_guard<std::mutex> &)
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, cache_size);
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements, queue.size());
LOG_DEBUG(log, "Removed all entries from LRU queue");
LOG_TRACE(log, "Removed all entries from LRU queue");
queue.clear();
cache_size = 0;
@ -88,7 +88,7 @@ void LRUFileCachePriority::LRUFileCacheIterator::removeAndGetNext(std::lock_guar
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, queue_iter->size);
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements);
LOG_DEBUG(cache_priority->log, "Removed entry from LRU queue, key: {}, offset: {}", queue_iter->key.toString(), queue_iter->offset);
LOG_TRACE(cache_priority->log, "Removed entry from LRU queue, key: {}, offset: {}", queue_iter->key.toString(), queue_iter->offset);
queue_iter = cache_priority->queue.erase(queue_iter);
}

View File

@ -100,8 +100,10 @@ struct ReplicatedFetchReadCallback
}
Service::Service(StorageReplicatedMergeTree & data_) :
data(data_), log(&Poco::Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
Service::Service(StorageReplicatedMergeTree & data_)
: data(data_)
, log(&Poco::Logger::get(data.getStorageID().getNameForLogs() + " (Replicated PartsService)"))
{}
std::string Service::getId(const std::string & node_id) const
{
@ -444,6 +446,11 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name)
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", name);
}
Fetcher::Fetcher(StorageReplicatedMergeTree & data_)
: data(data_)
, log(&Poco::Logger::get(data.getStorageID().getNameForLogs() + " (Fetcher)"))
{}
MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
@ -494,6 +501,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
if (disk)
{
LOG_TRACE(log, "Will fetch to disk {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
UInt64 revision = disk->getRevision();
if (revision)
uri.addQueryParameter("disk_revision", toString(revision));
@ -504,13 +512,21 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
{
if (!disk)
{
LOG_TRACE(log, "Trying to fetch with zero-copy replication, but disk is not provided, will try to select");
Disks disks = data.getDisks();
for (const auto & data_disk : disks)
{
LOG_TRACE(log, "Checking disk {} with type {}", data_disk->getName(), toString(data_disk->getDataSourceDescription().type));
if (data_disk->supportZeroCopyReplication())
{
LOG_TRACE(log, "Disk {} (with type {}) supports zero-copy replication", data_disk->getName(), toString(data_disk->getDataSourceDescription().type));
capability.push_back(toString(data_disk->getDataSourceDescription().type));
}
}
}
else if (disk->supportZeroCopyReplication())
{
LOG_TRACE(log, "Trying to fetch with zero copy replication, provided disk {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
capability.push_back(toString(disk->getDataSourceDescription().type));
}
}
@ -562,29 +578,47 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
assertString("ttl format version: 1\n", ttl_infos_buffer);
ttl_infos.read(ttl_infos_buffer);
if (!disk)
{
LOG_TRACE(log, "Disk for fetch is not provided, reserving space using storage balanced reservation");
reservation
= data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true);
if (!reservation)
{
LOG_TRACE(log, "Disk for fetch is not provided, reserving space using TTL rules");
reservation
= data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
}
}
}
else if (!disk)
{
LOG_TRACE(log, "Making balanced reservation");
reservation = data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, nullptr);
if (!reservation)
{
LOG_TRACE(log, "Making simple reservation");
reservation = data.reserveSpace(sum_files_size);
}
}
}
else if (!disk)
{
LOG_TRACE(log, "Making reservation on the largest disk");
/// We don't know real size of part because sender server version is too old
reservation = data.makeEmptyReservationOnLargestDisk();
}
if (!disk)
{
disk = reservation->getDisk();
LOG_INFO(log, "Disk for fetch is not provided, getting disk from reservation {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
}
else
{
LOG_INFO(log, "Disk for fetch is disk {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
}
UInt64 revision = parse<UInt64>(in->getResponseCookie("disk_revision", "0"));
if (revision)
@ -989,7 +1023,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
if (!disk->supportZeroCopyReplication() || !disk->checkUniqueId(part_id))
{
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {}.", part_name, part_id, disk->getName());
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {} (with type {}).", part_name, part_id, disk->getName(), toString(disk->getDataSourceDescription().type));
}
LOG_DEBUG(log, "Downloading Part {} unique id {} metadata onto disk {}.",

View File

@ -67,7 +67,7 @@ private:
class Fetcher final : private boost::noncopyable
{
public:
explicit Fetcher(StorageReplicatedMergeTree & data_) : data(data_), log(&Poco::Logger::get("Fetcher")) {}
explicit Fetcher(StorageReplicatedMergeTree & data_);
/// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
MergeTreeData::MutableDataPartPtr fetchSelectedPart(

View File

@ -4909,12 +4909,14 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
LOG_TRACE(log, "Trying reserve {} bytes preffering TTL rules", expected_size);
ReservationPtr reservation;
auto move_ttl_entry = selectTTLDescriptionForTTLInfos(metadata_snapshot->getMoveTTLs(), ttl_infos.moves_ttl, time_of_move, true);
if (move_ttl_entry)
{
LOG_TRACE(log, "Got move TTL entry, will try to reserver destination for move");
SpacePtr destination_ptr = getDestinationForMoveTTL(*move_ttl_entry, is_insert);
if (!destination_ptr)
{
@ -4935,10 +4937,15 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
}
else
{
LOG_TRACE(log, "Reserving bytes on selected destination");
reservation = destination_ptr->reserve(expected_size);
if (reservation)
{
LOG_TRACE(log, "Reservation successful");
return reservation;
}
else
{
if (move_ttl_entry->destination_type == DataDestinationType::VOLUME)
LOG_WARNING(
log,
@ -4953,13 +4960,20 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
*std::atomic_load(&log_name));
}
}
}
// Prefer selected_disk
if (selected_disk)
{
LOG_DEBUG(log, "Disk for reservation provided: {} (with type {})", selected_disk->getName(), toString(selected_disk->getDataSourceDescription().type));
reservation = selected_disk->reserve(expected_size);
}
if (!reservation)
{
LOG_DEBUG(log, "No reservation, reserving using storage policy from min volume index {}", min_volume_index);
reservation = getStoragePolicy()->reserve(expected_size, min_volume_index);
}
return reservation;
}

View File

@ -13,9 +13,7 @@
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
<main><disk>s3</disk></main>
</volumes>
</s3>
</policies>

View File

@ -15,6 +15,8 @@ INSERT INTO partslost_0 SELECT toString(number) AS x from system.numbers LIMIT 1
ALTER TABLE partslost_0 ADD INDEX idx x TYPE tokenbf_v1(285000, 3, 12345) GRANULARITY 3;
SET mutations_sync = 2;
ALTER TABLE partslost_0 MATERIALIZE INDEX idx;
-- In worst case doesn't check anything, but it's not flaky

View File

@ -1,3 +1,5 @@
-- Tags: no-backward-compatibility-check
drop table if exists test_02381;
create table test_02381(a UInt64, b UInt64) ENGINE = MergeTree order by (a, b);
insert into test_02381 select number, number * 10 from system.numbers limit 1000000;

View File

@ -1,3 +1,5 @@
v22.9.2.7-stable 2022-09-23
v22.9.1.2603-stable 2022-09-22
v22.8.5.29-lts 2022-09-13
v22.8.4.7-lts 2022-08-31
v22.8.3.13-lts 2022-08-29

1 v22.8.5.29-lts v22.9.2.7-stable 2022-09-13 2022-09-23
1 v22.9.2.7-stable 2022-09-23
2 v22.9.1.2603-stable 2022-09-22
3 v22.8.5.29-lts v22.8.5.29-lts 2022-09-13 2022-09-13
4 v22.8.4.7-lts v22.8.4.7-lts 2022-08-31 2022-08-31
5 v22.8.3.13-lts v22.8.3.13-lts 2022-08-29 2022-08-29