Merge remote-tracking branch 'origin/master' into revert-54189-revert-53904-fix-parallel-replicas-skip-shards

Igor Nikonov 2023-09-03 20:48:48 +00:00
commit 40efa989f3
55 changed files with 203 additions and 845 deletions

View File

@ -815,16 +815,16 @@ Aliases: `dateDiff`, `DATE_DIFF`, `timestampDiff`, `timestamp_diff`, `TIMESTAMP_
- `unit` — The type of interval for result. [String](../../sql-reference/data-types/string.md).
Possible values:
- `microsecond` (possible abbreviations: `us`, `u`)
- `millisecond` (possible abbreviations: `ms`)
- `second` (possible abbreviations: `ss`, `s`)
- `minute` (possible abbreviations: `mi`, `n`)
- `hour` (possible abbreviations: `hh`, `h`)
- `day` (possible abbreviations: `dd`, `d`)
- `week` (possible abbreviations: `wk`, `ww`)
- `month` (possible abbreviations: `mm`, `m`)
- `quarter` (possible abbreviations: `qq`, `q`)
- `year` (possible abbreviations: `yyyy`, `yy`)
- `microsecond` (possible abbreviations: `microseconds`, `us`, `u`)
- `millisecond` (possible abbreviations: `milliseconds`, `ms`)
- `second` (possible abbreviations: `seconds`, `ss`, `s`)
- `minute` (possible abbreviations: `minutes`, `mi`, `n`)
- `hour` (possible abbreviations: `hours`, `hh`, `h`)
- `day` (possible abbreviations: `days`, `dd`, `d`)
- `week` (possible abbreviations: `weeks`, `wk`, `ww`)
- `month` (possible abbreviations: `months`, `mm`, `m`)
- `quarter` (possible abbreviations: `quarters`, `qq`, `q`)
- `year` (possible abbreviations: `years`, `yyyy`, `yy`)
- `startdate` — The first time value to subtract (the subtrahend). [Date](../../sql-reference/data-types/date.md), [Date32](../../sql-reference/data-types/date32.md), [DateTime](../../sql-reference/data-types/datetime.md) or [DateTime64](../../sql-reference/data-types/datetime64.md).
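
For illustration, a short SQL sketch of the plural unit aliases documented above; the queries and expected results mirror the new dateDiff test added later in this commit:

SELECT dateDiff('years', toDate('2017-12-31'), toDate('2016-01-01'));                           -- -1
SELECT dateDiff('weeks', toDateTime('2017-12-31'), toDateTime('2016-01-01'));                   -- -104
SELECT dateDiff('seconds', toDateTime('2017-12-31', 'UTC'), toDateTime('2016-01-01', 'UTC'));   -- -63072000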

View File

@ -1038,41 +1038,6 @@ try
fs::create_directories(path / "metadata_dropped/");
}
#if USE_ROCKSDB
/// Initialize merge tree metadata cache
if (config().has("merge_tree_metadata_cache"))
{
global_context->addWarningMessage("The setting 'merge_tree_metadata_cache' is enabled."
" But the feature of 'metadata cache in RocksDB' is experimental and is not ready for production."
" The usage of this feature can lead to data corruption and loss. The setting should be disabled in production."
" See the corresponding report at https://github.com/ClickHouse/ClickHouse/issues/51182");
fs::create_directories(path / "rocksdb/");
size_t size = config().getUInt64("merge_tree_metadata_cache.lru_cache_size", 256 << 20);
bool continue_if_corrupted = config().getBool("merge_tree_metadata_cache.continue_if_corrupted", false);
try
{
LOG_DEBUG(log, "Initializing MergeTree metadata cache, lru_cache_size: {} continue_if_corrupted: {}",
ReadableSize(size), continue_if_corrupted);
global_context->initializeMergeTreeMetadataCache(path_str + "/" + "rocksdb", size);
}
catch (...)
{
if (continue_if_corrupted)
{
/// Rename rocksdb directory and reinitialize merge tree metadata cache
time_t now = time(nullptr);
fs::rename(path / "rocksdb", path / ("rocksdb.old." + std::to_string(now)));
global_context->initializeMergeTreeMetadataCache(path_str + "/" + "rocksdb", size);
}
else
{
throw;
}
}
}
#endif
if (config().has("interserver_http_port") && config().has("interserver_https_port"))
throw Exception(ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG, "Both http and https interserver ports are specified");

View File

@ -463,13 +463,6 @@ The server successfully detected this situation and will download merged part fr
M(AggregationPreallocatedElementsInHashTables, "How many elements were preallocated in hash tables for aggregation.") \
M(AggregationHashTablesInitializedAsTwoLevel, "How many hash tables were inited as two-level for aggregation.") \
\
M(MergeTreeMetadataCacheGet, "Number of rocksdb reads (used for merge tree metadata cache)") \
M(MergeTreeMetadataCachePut, "Number of rocksdb puts (used for merge tree metadata cache)") \
M(MergeTreeMetadataCacheDelete, "Number of rocksdb deletes (used for merge tree metadata cache)") \
M(MergeTreeMetadataCacheSeek, "Number of rocksdb seeks (used for merge tree metadata cache)") \
M(MergeTreeMetadataCacheHit, "Number of times the read of meta file was done from MergeTree metadata cache") \
M(MergeTreeMetadataCacheMiss, "Number of times the read of meta file was not done from MergeTree metadata cache") \
\
M(KafkaRebalanceRevocations, "Number of partition revocations (the first stage of consumer group rebalance)") \
M(KafkaRebalanceAssignments, "Number of partition assignments (the final stage of consumer group rebalance)") \
M(KafkaRebalanceErrors, "Number of failed consumer group rebalances") \

View File

@ -7,8 +7,6 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/logger_useful.h>
#include <Interpreters/Context.h>
namespace DB
{
@ -143,13 +141,9 @@ namespace
}
}
std::shared_ptr<ProxyConfigurationResolver> ProxyConfigurationResolverProvider::get(Protocol protocol)
std::shared_ptr<ProxyConfigurationResolver> ProxyConfigurationResolverProvider::get(Protocol protocol, const Poco::Util::AbstractConfiguration & configuration)
{
auto context = Context::getGlobalContextInstance();
chassert(context);
if (auto resolver = getFromSettings(protocol, "", context->getConfigRef()))
if (auto resolver = getFromSettings(protocol, "", configuration))
{
return resolver;
}
@ -202,7 +196,7 @@ std::shared_ptr<ProxyConfigurationResolver> ProxyConfigurationResolverProvider::
* In case the combination of config_prefix and configuration does not provide a resolver, try to get it from general / new settings.
* Falls back to Environment resolver if no configuration is found.
* */
return ProxyConfigurationResolverProvider::get(Protocol::ANY);
return ProxyConfigurationResolverProvider::get(Protocol::ANY, configuration);
}
}

View File

@ -18,7 +18,9 @@ public:
* Returns appropriate ProxyConfigurationResolver based on current CH settings (Remote resolver or List resolver).
* If no configuration is found, returns Environment Resolver.
* */
static std::shared_ptr<ProxyConfigurationResolver> get(Protocol protocol);
static std::shared_ptr<ProxyConfigurationResolver> get(
Protocol protocol,
const Poco::Util::AbstractConfiguration & configuration);
/*
* This API exists exclusively for backward compatibility with old S3 storage specific proxy configuration.

View File

@ -32,9 +32,10 @@ Poco::URI https_list_proxy_server = Poco::URI("http://https_list_proxy:3128");
TEST_F(ProxyConfigurationResolverProviderTests, EnvironmentResolverShouldBeUsedIfNoSettings)
{
EnvironmentProxySetter setter(http_env_proxy_server, https_env_proxy_server);
const auto & config = getContext().context->getConfigRef();
auto http_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve();
auto https_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve();
auto http_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP, config)->resolve();
auto https_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS, config)->resolve();
ASSERT_EQ(http_configuration.host, http_env_proxy_server.getHost());
ASSERT_EQ(http_configuration.port, http_env_proxy_server.getPort());
@ -54,13 +55,13 @@ TEST_F(ProxyConfigurationResolverProviderTests, ListHTTPOnly)
config->setString("proxy.http.uri", http_list_proxy_server.toString());
context->setConfig(config);
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve();
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP, *config)->resolve();
ASSERT_EQ(http_proxy_configuration.host, http_list_proxy_server.getHost());
ASSERT_EQ(http_proxy_configuration.port, http_list_proxy_server.getPort());
ASSERT_EQ(http_proxy_configuration.protocol, DB::ProxyConfiguration::protocolFromString(http_list_proxy_server.getScheme()));
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve();
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS, *config)->resolve();
// No https configuration since it's not set
ASSERT_EQ(https_proxy_configuration.host, "");
@ -76,12 +77,12 @@ TEST_F(ProxyConfigurationResolverProviderTests, ListHTTPSOnly)
config->setString("proxy.https.uri", https_list_proxy_server.toString());
context->setConfig(config);
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve();
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP, *config)->resolve();
ASSERT_EQ(http_proxy_configuration.host, "");
ASSERT_EQ(http_proxy_configuration.port, 0);
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve();
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS, *config)->resolve();
ASSERT_EQ(https_proxy_configuration.host, https_list_proxy_server.getHost());
@ -104,13 +105,13 @@ TEST_F(ProxyConfigurationResolverProviderTests, ListBoth)
context->setConfig(config);
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve();
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP, *config)->resolve();
ASSERT_EQ(http_proxy_configuration.host, http_list_proxy_server.getHost());
ASSERT_EQ(http_proxy_configuration.protocol, DB::ProxyConfiguration::protocolFromString(http_list_proxy_server.getScheme()));
ASSERT_EQ(http_proxy_configuration.port, http_list_proxy_server.getPort());
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve();
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS, *config)->resolve();
ASSERT_EQ(https_proxy_configuration.host, https_list_proxy_server.getHost());

View File

@ -469,7 +469,7 @@ void BaseSettings<TTraits>::write(WriteBuffer & out, SettingsWriteFormat format)
{
const auto & accessor = Traits::Accessor::instance();
for (auto field : *this)
for (const auto & field : *this)
{
bool is_custom = field.isCustom();
bool is_important = !is_custom && accessor.isImportant(field.index);

View File

@ -25,7 +25,7 @@ void ServerSettings::loadSettingsFromConfig(const Poco::Util::AbstractConfigurat
"max_remote_write_network_bandwidth_for_server",
};
for (auto setting : all())
for (const auto & setting : all())
{
const auto & name = setting.getName();
if (config.has(name))

View File

@ -87,7 +87,7 @@ void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfigura
return;
Settings settings;
for (auto setting : settings.all())
for (const auto & setting : settings.all())
{
const auto & name = setting.getName();
if (config.has(name) && !setting.isObsolete())

View File

@ -783,6 +783,7 @@ class IColumn;
M(Bool, allow_experimental_object_type, false, "Allow Object and JSON data types", 0) \
M(Bool, allow_experimental_annoy_index, false, "Allows to use Annoy index. Disabled by default because this feature is experimental", 0) \
M(Bool, allow_experimental_usearch_index, false, "Allows to use USearch index. Disabled by default because this feature is experimental", 0) \
M(Bool, allow_experimental_s3queue, false, "Allows to use S3Queue engine. Disabled by default, because this feature is experimental", 0) \
M(UInt64, max_limit_for_ann_queries, 1'000'000, "SELECT queries with LIMIT bigger than this setting cannot use ANN indexes. Helps to prevent memory overflows in ANN search indexes.", 0) \
M(Int64, annoy_index_search_k_nodes, -1, "SELECT queries search up to this many nodes in Annoy indexes.", 0) \
M(Bool, throw_on_unsupported_query_inside_transaction, true, "Throw exception if unsupported query is used inside transaction", 0) \

View File

@ -237,7 +237,6 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
/// In case of multiple files for the same file in clickhouse (i.e. log family)
/// file_offset_of_buffer_end will not match getImplementationBufferOffset()
/// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()]
chassert(file_offset_of_buffer_end >= impl->getFileOffsetOfBufferEnd());
chassert(file_offset_of_buffer_end <= impl->getFileSize());
return bytes_read;
@ -312,7 +311,8 @@ off_t AsynchronousBoundedReadBuffer::seek(off_t offset, int whence)
if (read_until_position && new_pos > *read_until_position)
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
if (!impl->seekIsCheap())
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
file_offset_of_buffer_end = new_pos = *read_until_position; /// read_until_position is a non-included boundary.
impl->seek(file_offset_of_buffer_end, SEEK_SET);
return new_pos;
@ -330,7 +330,8 @@ off_t AsynchronousBoundedReadBuffer::seek(off_t offset, int whence)
}
else
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
if (!impl->seekIsCheap())
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
file_offset_of_buffer_end = new_pos;
impl->seek(file_offset_of_buffer_end, SEEK_SET);
}

View File

@ -1205,13 +1205,6 @@ off_t CachedOnDiskReadBufferFromFile::getPosition()
return file_offset_of_buffer_end - available();
}
void CachedOnDiskReadBufferFromFile::assertCorrectness() const
{
if (!CachedObjectStorage::canUseReadThroughCache(settings)
&& !settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache usage is not allowed (query_id: {})", query_id);
}
String CachedOnDiskReadBufferFromFile::getInfoForLog()
{
String current_file_segment_info;

View File

@ -64,7 +64,6 @@ private:
using ImplementationBufferPtr = std::shared_ptr<ReadBufferFromFileBase>;
void initialize(size_t offset, size_t size);
void assertCorrectness() const;
/**
* Return a list of file segments ordered in ascending order. This list represents

View File

@ -108,6 +108,10 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset
data += size_to_write;
}
size_t available_size = file_segment->range().size() - file_segment->getDownloadedSize();
if (available_size == 0)
completeFileSegment();
return true;
}
@ -196,15 +200,16 @@ CachedOnDiskWriteBufferFromFile::CachedOnDiskWriteBufferFromFile(
const String & source_path_,
const FileCache::Key & key_,
const String & query_id_,
const WriteSettings & settings_)
const WriteSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_)
: WriteBufferFromFileDecorator(std::move(impl_))
, log(&Poco::Logger::get("CachedOnDiskWriteBufferFromFile"))
, cache(cache_)
, source_path(source_path_)
, key(key_)
, query_id(query_id_)
, enable_cache_log(!query_id_.empty() && settings_.enable_filesystem_cache_log)
, throw_on_error_from_cache(settings_.throw_on_error_from_cache)
, cache_log(!query_id_.empty() && settings_.enable_filesystem_cache_log ? cache_log_ : nullptr)
{
}
@ -241,10 +246,6 @@ void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size, bool t
if (!cache_writer)
{
std::shared_ptr<FilesystemCacheLog> cache_log;
if (enable_cache_log)
cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog();
cache_writer = std::make_unique<FileSegmentRangeWriter>(cache.get(), key, cache_log, query_id, source_path);
}

View File

@ -73,7 +73,8 @@ public:
const String & source_path_,
const FileCache::Key & key_,
const String & query_id_,
const WriteSettings & settings_);
const WriteSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_);
void nextImpl() override;
@ -91,12 +92,11 @@ private:
size_t current_download_offset = 0;
const String query_id;
bool enable_cache_log;
bool throw_on_error_from_cache;
bool cache_in_error_state_or_disabled = false;
std::unique_ptr<FileSegmentRangeWriter> cache_writer;
std::shared_ptr<FilesystemCacheLog> cache_log;
};
}

View File

@ -113,7 +113,8 @@ std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// N
implementation_buffer->getFileName(),
key,
CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() ? std::string(CurrentThread::getQueryId()) : "",
modified_write_settings);
modified_write_settings,
Context::getGlobalContextInstance()->getFilesystemCacheLog());
}
return implementation_buffer;

View File

@ -509,13 +509,11 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
LOG_TEST(log, "Write file: {}", path);
auto transaction = createObjectStorageTransaction();
auto result = transaction->writeFile(
return transaction->writeFile(
path,
buf_size,
mode,
object_storage->getAdjustedSettingsFromMetadataFile(settings, path));
return result;
}
Strings DiskObjectStorage::getBlobPath(const String & path) const

View File

@ -381,25 +381,25 @@ public:
const auto & timezone_x = extractTimeZoneFromFunctionArguments(arguments, 3, 1);
const auto & timezone_y = extractTimeZoneFromFunctionArguments(arguments, 3, 2);
if (unit == "year" || unit == "yy" || unit == "yyyy")
if (unit == "year" || unit == "years" || unit == "yy" || unit == "yyyy")
impl.template dispatchForColumns<ToRelativeYearNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "quarter" || unit == "qq" || unit == "q")
else if (unit == "quarter" || unit == "quarters" || unit == "qq" || unit == "q")
impl.template dispatchForColumns<ToRelativeQuarterNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "month" || unit == "mm" || unit == "m")
else if (unit == "month" || unit == "months" || unit == "mm" || unit == "m")
impl.template dispatchForColumns<ToRelativeMonthNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "week" || unit == "wk" || unit == "ww")
else if (unit == "week" || unit == "weeks" || unit == "wk" || unit == "ww")
impl.template dispatchForColumns<ToRelativeWeekNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "day" || unit == "dd" || unit == "d")
else if (unit == "day" || unit == "days" || unit == "dd" || unit == "d")
impl.template dispatchForColumns<ToRelativeDayNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "hour" || unit == "hh" || unit == "h")
else if (unit == "hour" || unit == "hours" || unit == "hh" || unit == "h")
impl.template dispatchForColumns<ToRelativeHourNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "minute" || unit == "mi" || unit == "n")
else if (unit == "minute" || unit == "minutes" || unit == "mi" || unit == "n")
impl.template dispatchForColumns<ToRelativeMinuteNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "second" || unit == "ss" || unit == "s")
else if (unit == "second" || unit == "seconds" || unit == "ss" || unit == "s")
impl.template dispatchForColumns<ToRelativeSecondNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "millisecond" || unit == "ms")
else if (unit == "millisecond" || unit == "milliseconds" || unit == "ms")
impl.template dispatchForColumns<ToRelativeSubsecondNumImpl<millisecond_multiplier>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "microsecond" || unit == "us" || unit == "u")
else if (unit == "microsecond" || unit == "microseconds" || unit == "us" || unit == "u")
impl.template dispatchForColumns<ToRelativeSubsecondNumImpl<microsecond_multiplier>>(x, y, timezone_x, timezone_y, res->getData());
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,

View File

@ -20,6 +20,7 @@
#include <IO/S3/PocoHTTPClientFactory.h>
#include <IO/S3/AWSLogger.h>
#include <IO/S3/Credentials.h>
#include <Interpreters/Context.h>
#include <Common/assert_cast.h>
@ -866,7 +867,9 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
const ThrottlerPtr & put_request_throttler,
const String & protocol)
{
auto proxy_configuration_resolver = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::protocolFromString(protocol));
auto context = Context::getGlobalContextInstance();
chassert(context);
auto proxy_configuration_resolver = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::protocolFromString(protocol), context->getConfigRef());
auto per_request_configuration = [=] () { return proxy_configuration_resolver->resolve(); };
auto error_report = [=] (const DB::ProxyConfiguration & req) { proxy_configuration_resolver->errorReport(req); };

View File

@ -55,7 +55,7 @@ namespace ErrorCodes
FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & settings)
: max_file_segment_size(settings.max_file_segment_size)
, bypass_cache_threshold(settings.enable_bypass_cache_with_threashold ? settings.bypass_cache_threashold : 0)
, bypass_cache_threshold(settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0)
, boundary_alignment(settings.boundary_alignment)
, background_download_threads(settings.background_download_threads)
, metadata_download_threads(settings.load_metadata_threads)

View File

@ -39,10 +39,10 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
enable_filesystem_query_cache_limit = config.getUInt64(config_prefix + ".enable_filesystem_query_cache_limit", false);
cache_hits_threshold = config.getUInt64(config_prefix + ".cache_hits_threshold", FILECACHE_DEFAULT_HITS_THRESHOLD);
enable_bypass_cache_with_threashold = config.getUInt64(config_prefix + ".enable_bypass_cache_with_threashold", false);
enable_bypass_cache_with_threshold = config.getUInt64(config_prefix + ".enable_bypass_cache_with_threshold", false);
if (config.has(config_prefix + ".bypass_cache_threashold"))
bypass_cache_threashold = parseWithSizeSuffix<uint64_t>(config.getString(config_prefix + ".bypass_cache_threashold"));
if (config.has(config_prefix + ".bypass_cache_threshold"))
bypass_cache_threshold = parseWithSizeSuffix<uint64_t>(config.getString(config_prefix + ".bypass_cache_threshold"));
if (config.has(config_prefix + ".boundary_alignment"))
boundary_alignment = parseWithSizeSuffix<uint64_t>(config.getString(config_prefix + ".boundary_alignment"));

View File

@ -22,8 +22,8 @@ struct FileCacheSettings
size_t cache_hits_threshold = FILECACHE_DEFAULT_HITS_THRESHOLD;
bool enable_filesystem_query_cache_limit = false;
bool enable_bypass_cache_with_threashold = false;
size_t bypass_cache_threashold = FILECACHE_BYPASS_THRESHOLD;
bool enable_bypass_cache_with_threshold = false;
size_t bypass_cache_threshold = FILECACHE_BYPASS_THRESHOLD;
size_t boundary_alignment = FILECACHE_DEFAULT_FILE_SEGMENT_ALIGNMENT;
size_t background_download_threads = FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_THREADS;

View File

@ -92,7 +92,6 @@
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Storages/MergeTree/MergeTreeMetadataCache.h>
#include <Interpreters/SynonymsExtensions.h>
#include <Interpreters/Lemmatizers.h>
#include <Interpreters/ClusterDiscovery.h>
@ -351,11 +350,6 @@ struct ContextSharedPart : boost::noncopyable
bool is_server_completely_started = false;
#if USE_ROCKSDB
/// Global merge tree metadata cache, stored in rocksdb.
MergeTreeMetadataCachePtr merge_tree_metadata_cache;
#endif
ContextSharedPart()
: access_control(std::make_unique<AccessControl>())
, global_overcommit_tracker(&process_list)
@ -585,15 +579,6 @@ struct ContextSharedPart : boost::noncopyable
trace_collector.reset();
/// Stop zookeeper connection
zookeeper.reset();
#if USE_ROCKSDB
/// Shutdown merge tree metadata cache
if (merge_tree_metadata_cache)
{
merge_tree_metadata_cache->shutdown();
merge_tree_metadata_cache.reset();
}
#endif
}
/// Can be removed without context lock
@ -2960,13 +2945,6 @@ std::map<String, zkutil::ZooKeeperPtr> Context::getAuxiliaryZooKeepers() const
return shared->auxiliary_zookeepers;
}
#if USE_ROCKSDB
MergeTreeMetadataCachePtr Context::tryGetMergeTreeMetadataCache() const
{
return shared->merge_tree_metadata_cache;
}
#endif
void Context::resetZooKeeper() const
{
std::lock_guard lock(shared->zookeeper_mutex);
@ -3256,13 +3234,6 @@ void Context::initializeTraceCollector()
shared->initializeTraceCollector(getTraceLog());
}
#if USE_ROCKSDB
void Context::initializeMergeTreeMetadataCache(const String & dir, size_t size)
{
shared->merge_tree_metadata_cache = MergeTreeMetadataCache::create(dir, size);
}
#endif
/// Call after unexpected crash happen.
void Context::handleCrash() const
{

View File

@ -196,11 +196,6 @@ using TemporaryDataOnDiskScopePtr = std::shared_ptr<TemporaryDataOnDiskScope>;
class ParallelReplicasReadingCoordinator;
using ParallelReplicasReadingCoordinatorPtr = std::shared_ptr<ParallelReplicasReadingCoordinator>;
#if USE_ROCKSDB
class MergeTreeMetadataCache;
using MergeTreeMetadataCachePtr = std::shared_ptr<MergeTreeMetadataCache>;
#endif
class PreparedSetsCache;
using PreparedSetsCachePtr = std::shared_ptr<PreparedSetsCache>;
@ -895,10 +890,6 @@ public:
UInt64 getClientProtocolVersion() const;
void setClientProtocolVersion(UInt64 version);
#if USE_ROCKSDB
MergeTreeMetadataCachePtr tryGetMergeTreeMetadataCache() const;
#endif
#if USE_NURAFT
std::shared_ptr<KeeperDispatcher> & getKeeperDispatcher() const;
std::shared_ptr<KeeperDispatcher> & tryGetKeeperDispatcher() const;
@ -1003,10 +994,6 @@ public:
/// Call after initialization before using trace collector.
void initializeTraceCollector();
#if USE_ROCKSDB
void initializeMergeTreeMetadataCache(const String & dir, size_t size);
#endif
/// Call after unexpected crash happen.
void handleCrash() const;

View File

@ -54,7 +54,7 @@ BlockIO InterpreterDescribeCacheQuery::execute()
res_columns[i++]->insert(cache->getFileSegmentsNum());
res_columns[i++]->insert(cache->getBasePath());
res_columns[i++]->insert(settings.background_download_threads);
res_columns[i++]->insert(settings.enable_bypass_cache_with_threashold);
res_columns[i++]->insert(settings.enable_bypass_cache_with_threshold);
BlockIO res;
size_t num_rows = res_columns[0]->size();

View File

@ -15,7 +15,6 @@
#include <IO/MMappedFileCache.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMetadataCache.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MarkCache.h>
@ -125,14 +124,6 @@ void ServerAsynchronousMetrics::updateImpl(AsynchronousMetricValues & new_values
"Total number of cached file segments in the `cache` virtual filesystem. This cache is hold on disk." };
}
#if USE_ROCKSDB
if (auto metadata_cache = getContext()->tryGetMergeTreeMetadataCache())
{
new_values["MergeTreeMetadataCacheSize"] = { metadata_cache->getEstimateNumKeys(),
"The size of the metadata cache for tables. This cache is experimental and not used in production." };
}
#endif
#if USE_EMBEDDED_COMPILER
if (auto * compiled_expression_cache = CompiledExpressionCacheFactory::instance().tryGetCache())
{

View File

@ -16,7 +16,6 @@
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/PartMetadataManagerOrdinary.h>
#include <Storages/MergeTree/PartMetadataManagerWithCache.h>
#include <Core/NamesAndTypes.h>
#include <Storages/ColumnsDescription.h>
#include <Common/StringUtils/StringUtils.h>
@ -320,7 +319,6 @@ IMergeTreeDataPart::IMergeTreeDataPart(
, part_type(part_type_)
, parent_part(parent_part_)
, parent_part_name(parent_part ? parent_part->name : "")
, use_metadata_cache(storage.use_metadata_cache)
{
if (parent_part)
{
@ -1673,14 +1671,7 @@ std::pair<bool, NameSet> IMergeTreeDataPart::canRemovePart() const
void IMergeTreeDataPart::initializePartMetadataManager()
{
#if USE_ROCKSDB
if (auto metadata_cache = storage.getContext()->tryGetMergeTreeMetadataCache(); metadata_cache && use_metadata_cache)
metadata_manager = std::make_shared<PartMetadataManagerWithCache>(this, metadata_cache);
else
metadata_manager = std::make_shared<PartMetadataManagerOrdinary>(this);
#else
metadata_manager = std::make_shared<PartMetadataManagerOrdinary>(this);
#endif
metadata_manager = std::make_shared<PartMetadataManagerOrdinary>(this);
}
void IMergeTreeDataPart::initializeIndexGranularityInfo()
@ -2065,34 +2056,6 @@ String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const
return info.partition_id + "_" + toString(hash_value.items[0]) + "_" + toString(hash_value.items[1]);
}
IMergeTreeDataPart::uint128 IMergeTreeDataPart::getActualChecksumByFile(const String & file_name) const
{
assert(use_metadata_cache);
const auto filenames_without_checksums = getFileNamesWithoutChecksums();
auto it = checksums.files.find(file_name);
if (!filenames_without_checksums.contains(file_name) && it != checksums.files.end())
{
return it->second.file_hash;
}
if (!getDataPartStorage().exists(file_name))
{
return {};
}
std::unique_ptr<ReadBufferFromFileBase> in_file = getDataPartStorage().readFile(file_name, {}, std::nullopt, std::nullopt);
HashingReadBuffer in_hash(*in_file);
String value;
readStringUntilEOF(value, in_hash);
return in_hash.getHash();
}
std::unordered_map<String, IMergeTreeDataPart::uint128> IMergeTreeDataPart::checkMetadata() const
{
return metadata_manager->check();
}
bool isCompactPart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::Compact);

View File

@ -481,12 +481,6 @@ public:
/// Required for keep data on remote FS when part has shadow copies.
UInt32 getNumberOfRefereneces() const;
/// Get checksums of metadata file in part directory
IMergeTreeDataPart::uint128 getActualChecksumByFile(const String & file_name) const;
/// Check metadata in cache is consistent with actual metadata on disk(if use_metadata_cache is true)
std::unordered_map<String, uint128> checkMetadata() const;
/// True if the part supports lightweight delete mutate.
bool supportLightweightDeleteMutate() const;
@ -536,9 +530,6 @@ protected:
std::map<String, std::shared_ptr<IMergeTreeDataPart>> projection_parts;
/// Disabled when USE_ROCKSDB is OFF or use_metadata_cache is set to false in merge tree settings
bool use_metadata_cache = false;
mutable PartMetadataManagerPtr metadata_manager;
void removeIfNeeded();

View File

@ -20,7 +20,6 @@ using DiskPtr = std::shared_ptr<IDisk>;
/// - PartMetadataManagerOrdinary: manage metadata from disk directly. deleteAll/assertAllDeleted/updateAll/check
/// are all empty implementations because they are not needed for PartMetadataManagerOrdinary(those operations
/// are done implicitly when removing or renaming part directory).
/// - PartMetadataManagerWithCache: manage metadata from RocksDB cache and disk.
class IPartMetadataManager
{
public:

View File

@ -354,7 +354,6 @@ MergeTreeData::MergeTreeData(
, parts_mover(this)
, background_operations_assignee(*this, BackgroundJobsAssignee::Type::DataProcessing, getContext())
, background_moves_assignee(*this, BackgroundJobsAssignee::Type::Moving, getContext())
, use_metadata_cache(getSettings()->use_metadata_cache)
{
context_->getGlobalContext()->initializeBackgroundExecutorsIfNeeded();
@ -405,11 +404,6 @@ MergeTreeData::MergeTreeData(
if (!canUsePolymorphicParts(*settings, reason) && !reason.empty())
LOG_WARNING(log, "{} Settings 'min_rows_for_wide_part'and 'min_bytes_for_wide_part' will be ignored.", reason);
#if !USE_ROCKSDB
if (use_metadata_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't use merge tree metadata cache if clickhouse was compiled without rocksdb");
#endif
common_assignee_trigger = [this] (bool delay) noexcept
{
if (delay)

View File

@ -1183,7 +1183,6 @@ protected:
/// And for ReplicatedMergeTree we don't have LogEntry type for this operation.
BackgroundJobsAssignee background_operations_assignee;
BackgroundJobsAssignee background_moves_assignee;
bool use_metadata_cache;
/// Strongly connected with two fields above.
/// Every task that is finished will ask to assign a new one into an executor.

View File

@ -737,7 +737,7 @@ void bloomFilterIndexValidator(const IndexDescription & index, bool /*attach*/)
data_type = WhichDataType(low_cardinality.getDictionaryType());
}
if (!data_type.isString() && !data_type.isFixedString())
if (!data_type.isString() && !data_type.isFixedString() && !data_type.isIPv6())
throw Exception(ErrorCodes::INCORRECT_QUERY,
"Ngram and token bloom filter indexes can only be used with column types `String`, `FixedString`, `LowCardinality(String)`, `LowCardinality(FixedString)`, `Array(String)` or `Array(FixedString)`");
}

View File

@ -1,107 +0,0 @@
#include "MergeTreeMetadataCache.h"
#if USE_ROCKSDB
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
namespace ProfileEvents
{
extern const Event MergeTreeMetadataCachePut;
extern const Event MergeTreeMetadataCacheGet;
extern const Event MergeTreeMetadataCacheDelete;
extern const Event MergeTreeMetadataCacheSeek;
}
namespace DB
{
namespace ErrorCodes
{
extern const int SYSTEM_ERROR;
}
std::unique_ptr<MergeTreeMetadataCache> MergeTreeMetadataCache::create(const String & dir, size_t size)
{
assert(size != 0);
rocksdb::Options options;
rocksdb::BlockBasedTableOptions table_options;
rocksdb::DB * db;
options.create_if_missing = true;
auto cache = rocksdb::NewLRUCache(size);
table_options.block_cache = cache;
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
if (status != rocksdb::Status::OK())
throw Exception(
ErrorCodes::SYSTEM_ERROR,
"Fail to open rocksdb path at: {} status:{}. You can try to remove the cache (this will not affect any table data).",
dir,
status.ToString());
return std::make_unique<MergeTreeMetadataCache>(db);
}
MergeTreeMetadataCache::Status MergeTreeMetadataCache::put(const String & key, const String & value)
{
auto options = rocksdb::WriteOptions();
options.sync = true;
options.disableWAL = false;
auto status = rocksdb->Put(options, key, value);
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCachePut);
return status;
}
MergeTreeMetadataCache::Status MergeTreeMetadataCache::del(const String & key)
{
auto options = rocksdb::WriteOptions();
options.sync = true;
options.disableWAL = false;
auto status = rocksdb->Delete(options, key);
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheDelete);
LOG_TRACE(log, "Delete key:{} from MergeTreeMetadataCache status:{}", key, status.ToString());
return status;
}
MergeTreeMetadataCache::Status MergeTreeMetadataCache::get(const String & key, String & value)
{
auto status = rocksdb->Get(rocksdb::ReadOptions(), key, &value);
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheGet);
LOG_TRACE(log, "Get key:{} from MergeTreeMetadataCache status:{}", key, status.ToString());
return status;
}
void MergeTreeMetadataCache::getByPrefix(const String & prefix, Strings & keys, Strings & values)
{
auto * it = rocksdb->NewIterator(rocksdb::ReadOptions());
rocksdb::Slice target(prefix);
for (it->Seek(target); it->Valid(); it->Next())
{
const auto key = it->key();
if (!key.starts_with(target))
break;
const auto value = it->value();
keys.emplace_back(key.data(), key.size());
values.emplace_back(value.data(), value.size());
}
LOG_TRACE(log, "Seek with prefix:{} from MergeTreeMetadataCache items:{}", prefix, keys.size());
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheSeek);
delete it;
}
uint64_t MergeTreeMetadataCache::getEstimateNumKeys() const
{
uint64_t keys = 0;
rocksdb->GetAggregatedIntProperty("rocksdb.estimate-num-keys", &keys);
return keys;
}
void MergeTreeMetadataCache::shutdown()
{
rocksdb->Close();
rocksdb.reset();
}
}
#endif

View File

@ -1,45 +0,0 @@
#pragma once
#include "config.h"
#if USE_ROCKSDB
#include <base/types.h>
#include <Core/Types.h>
#include <Poco/Logger.h>
#include <rocksdb/table.h>
#include <rocksdb/db.h>
namespace DB
{
class MergeTreeMetadataCache
{
public:
using Status = rocksdb::Status;
static std::unique_ptr<MergeTreeMetadataCache> create(const String & dir, size_t size);
explicit MergeTreeMetadataCache(rocksdb::DB * rocksdb_) : rocksdb{rocksdb_}
{
assert(rocksdb);
}
MergeTreeMetadataCache(const MergeTreeMetadataCache &) = delete;
MergeTreeMetadataCache & operator=(const MergeTreeMetadataCache &) = delete;
Status put(const String & key, const String & value);
Status del(const String & key);
Status get(const String & key, String & value);
void getByPrefix(const String & prefix, Strings & keys, Strings & values);
uint64_t getEstimateNumKeys() const;
void shutdown();
private:
std::unique_ptr<rocksdb::DB> rocksdb;
Poco::Logger * log = &Poco::Logger::get("MergeTreeMetadataCache");
};
using MergeTreeMetadataCachePtr = std::shared_ptr<MergeTreeMetadataCache>;
}
#endif

View File

@ -169,7 +169,6 @@ struct Settings;
/** Experimental/work in progress feature. Unsafe for production. */ \
M(UInt64, part_moves_between_shards_enable, 0, "Experimental/Incomplete feature to move parts between shards. Does not take into account sharding expressions.", 0) \
M(UInt64, part_moves_between_shards_delay_seconds, 30, "Time to wait before/after moving parts between shards.", 0) \
M(Bool, use_metadata_cache, false, "Experimental feature to speed up parts loading process by using MergeTree metadata cache", 0) \
M(Bool, allow_remote_fs_zero_copy_replication, false, "Don't use this setting in production, because it is not ready.", 0) \
M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for zero-copy table-independent info.", 0) \
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
@ -200,6 +199,7 @@ struct Settings;
M(Bool, in_memory_parts_insert_sync, false, "Obsolete setting, does nothing.", 0) \
M(MaxThreads, max_part_loading_threads, 0, "Obsolete setting, does nothing.", 0) \
M(MaxThreads, max_part_removal_threads, 0, "Obsolete setting, does nothing.", 0) \
M(Bool, use_metadata_cache, false, "Obsolete setting, does nothing.", 0) \
/// Settings that should not change after the creation of a table.
/// NOLINTNEXTLINE

View File

@ -1,298 +0,0 @@
#include "PartMetadataManagerWithCache.h"
#if USE_ROCKSDB
#include <base/hex.h>
#include <Common/ErrorCodes.h>
#include <IO/HashingReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace ProfileEvents
{
extern const Event MergeTreeMetadataCacheHit;
extern const Event MergeTreeMetadataCacheMiss;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int CORRUPTED_DATA;
extern const int NO_SUCH_PROJECTION_IN_TABLE;
}
PartMetadataManagerWithCache::PartMetadataManagerWithCache(const IMergeTreeDataPart * part_, const MergeTreeMetadataCachePtr & cache_)
: IPartMetadataManager(part_), cache(cache_)
{
}
String PartMetadataManagerWithCache::getKeyFromFilePath(const String & file_path) const
{
return part->getDataPartStorage().getDiskName() + ":" + file_path;
}
String PartMetadataManagerWithCache::getFilePathFromKey(const String & key) const
{
return key.substr(part->getDataPartStorage().getDiskName().size() + 1);
}
std::unique_ptr<ReadBuffer> PartMetadataManagerWithCache::read(const String & file_name) const
{
String file_path = fs::path(part->getDataPartStorage().getRelativePath()) / file_name;
String key = getKeyFromFilePath(file_path);
String value;
auto status = cache->get(key, value);
if (!status.ok())
{
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheMiss);
auto in = part->getDataPartStorage().readFile(file_name, {}, std::nullopt, std::nullopt);
std::unique_ptr<ReadBuffer> reader;
if (!isCompressedFromFileName(file_name))
reader = std::move(in);
else
reader = std::make_unique<CompressedReadBufferFromFile>(std::move(in));
readStringUntilEOF(value, *reader);
cache->put(key, value);
}
else
{
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheHit);
}
return std::make_unique<ReadBufferFromOwnString>(value);
}
bool PartMetadataManagerWithCache::exists(const String & file_name) const
{
String file_path = fs::path(part->getDataPartStorage().getRelativePath()) / file_name;
String key = getKeyFromFilePath(file_path);
String value;
auto status = cache->get(key, value);
if (status.ok())
{
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheHit);
return true;
}
else
{
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheMiss);
return part->getDataPartStorage().exists(file_name);
}
}
void PartMetadataManagerWithCache::deleteAll(bool include_projection)
{
Strings file_names;
part->appendFilesOfColumnsChecksumsIndexes(file_names, include_projection);
String value;
for (const auto & file_name : file_names)
{
String file_path = fs::path(part->getDataPartStorage().getRelativePath()) / file_name;
String key = getKeyFromFilePath(file_path);
auto status = cache->del(key);
if (!status.ok())
{
status = cache->get(key, value);
if (status.IsNotFound())
continue;
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"deleteAll failed include_projection:{} status:{}, file_path:{}",
include_projection,
status.ToString(),
file_path);
}
}
}
void PartMetadataManagerWithCache::updateAll(bool include_projection)
{
Strings file_names;
part->appendFilesOfColumnsChecksumsIndexes(file_names, include_projection);
String value;
String read_value;
/// This is used to remove the keys in case of any exception while caching other keys
Strings keys_added_to_cache;
keys_added_to_cache.reserve(file_names.size());
try
{
for (const auto & file_name : file_names)
{
String file_path = fs::path(part->getDataPartStorage().getRelativePath()) / file_name;
if (!part->getDataPartStorage().exists(file_name))
continue;
auto in = part->getDataPartStorage().readFile(file_name, {}, std::nullopt, std::nullopt);
readStringUntilEOF(value, *in);
String key = getKeyFromFilePath(file_path);
auto status = cache->put(key, value);
if (!status.ok())
{
status = cache->get(key, read_value);
if (status.IsNotFound() || read_value == value)
continue;
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"updateAll failed include_projection:{} status:{}, file_path:{}",
include_projection,
status.ToString(),
file_path);
}
keys_added_to_cache.emplace_back(key);
}
}
catch (...)
{
for (const auto & key : keys_added_to_cache)
{
cache->del(key);
}
throw;
}
}
void PartMetadataManagerWithCache::assertAllDeleted(bool include_projection) const
{
Strings keys;
std::vector<uint128> _;
getKeysAndCheckSums(keys, _);
if (keys.empty())
return;
String file_path;
String file_name;
for (const auto & key : keys)
{
file_path = getFilePathFromKey(key);
file_name = fs::path(file_path).filename();
/// Metadata file belongs to current part
if (fs::path(part->getDataPartStorage().getRelativePath()) / file_name == file_path)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Data part {} with type {} with meta file {} still in cache",
part->name,
part->getType().toString(),
file_path);
/// File belongs to projection part of current part
if (!part->isProjectionPart() && include_projection)
{
const auto & projection_parts = part->getProjectionParts();
for (const auto & [projection_name, projection_part] : projection_parts)
{
if (fs::path(part->getDataPartStorage().getRelativePath()) / (projection_name + ".proj") / file_name == file_path)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Data part {} with type {} with meta file {} with projection name {} still in cache",
part->name,
part->getType().toString(),
file_path,
projection_name);
}
}
}
}
}
void PartMetadataManagerWithCache::getKeysAndCheckSums(Strings & keys, std::vector<uint128> & checksums) const
{
String prefix = getKeyFromFilePath(fs::path(part->getDataPartStorage().getRelativePath()) / "");
Strings values;
cache->getByPrefix(prefix, keys, values);
size_t size = keys.size();
for (size_t i = 0; i < size; ++i)
{
ReadBufferFromString rbuf(values[i]);
HashingReadBuffer hbuf(rbuf);
hbuf.ignoreAll();
checksums.push_back(hbuf.getHash());
}
}
std::unordered_map<String, IPartMetadataManager::uint128> PartMetadataManagerWithCache::check() const
{
/// Only applies for normal part stored on disk
if (part->isProjectionPart() || !part->isStoredOnDisk())
return {};
/// The directory of projection part is under the directory of its parent part
const auto filenames_without_checksums = part->getFileNamesWithoutChecksums();
std::unordered_map<String, uint128> results;
Strings keys;
std::vector<uint128> cache_checksums;
std::vector<uint128> disk_checksums;
getKeysAndCheckSums(keys, cache_checksums);
for (size_t i = 0; i < keys.size(); ++i)
{
const auto & key = keys[i];
String file_path = getFilePathFromKey(key);
String file_name = fs::path(file_path).filename();
results.emplace(file_name, cache_checksums[i]);
/// File belongs to normal part
if (fs::path(part->getDataPartStorage().getRelativePath()) / file_name == file_path)
{
auto disk_checksum = part->getActualChecksumByFile(file_name);
if (disk_checksum != cache_checksums[i])
throw Exception(
ErrorCodes::CORRUPTED_DATA,
"Checksums doesn't match in part {} for {}. Expected: {}. Found {}.",
part->name, file_path,
getHexUIntUppercase(disk_checksum),
getHexUIntUppercase(cache_checksums[i]));
disk_checksums.push_back(disk_checksum);
continue;
}
/// File belongs to projection part
String proj_dir_name = fs::path(file_path).parent_path().filename();
auto pos = proj_dir_name.find_last_of('.');
if (pos == String::npos)
{
throw Exception(
ErrorCodes::NO_SUCH_PROJECTION_IN_TABLE,
"There is no projection in part: {} contains file: {} with directory name: {}",
part->name,
file_path,
proj_dir_name);
}
String proj_name = proj_dir_name.substr(0, pos);
const auto & projection_parts = part->getProjectionParts();
auto it = projection_parts.find(proj_name);
if (it == projection_parts.end())
{
throw Exception(
ErrorCodes::NO_SUCH_PROJECTION_IN_TABLE,
"There is no projection {} in part: {} contains file: {}",
proj_name, part->name, file_path);
}
auto disk_checksum = it->second->getActualChecksumByFile(file_name);
if (disk_checksum != cache_checksums[i])
throw Exception(
ErrorCodes::CORRUPTED_DATA,
"Checksums doesn't match in projection part {} {}. Expected: {}. Found {}.",
part->name, proj_name,
getHexUIntUppercase(disk_checksum),
getHexUIntUppercase(cache_checksums[i]));
disk_checksums.push_back(disk_checksum);
}
return results;
}
}
#endif

View File

@ -1,55 +0,0 @@
#pragma once
#include "config.h"
#if USE_ROCKSDB
#include <Storages/MergeTree/IPartMetadataManager.h>
#include <Storages/MergeTree/MergeTreeMetadataCache.h>
namespace DB
{
/// PartMetadataManagerWithCache stores metadatas of part in RocksDB as cache layer to speed up
/// loading process of merge tree table.
class PartMetadataManagerWithCache : public IPartMetadataManager
{
public:
PartMetadataManagerWithCache(const IMergeTreeDataPart * part_, const MergeTreeMetadataCachePtr & cache_);
~PartMetadataManagerWithCache() override = default;
/// First read the metadata from RocksDB cache, then from disk.
std::unique_ptr<ReadBuffer> read(const String & file_name) const override;
/// First judge existence of the metadata in RocksDB cache, then in disk.
bool exists(const String & file_name) const override;
/// Delete all metadatas in part from RocksDB cache.
void deleteAll(bool include_projection) override;
/// Assert all metadatas in part from RocksDB cache are deleted.
void assertAllDeleted(bool include_projection) const override;
/// Update all metadatas in part from RocksDB cache.
/// Need to be called after part directory is renamed.
void updateAll(bool include_projection) override;
/// Check if all metadatas in part from RocksDB cache are up to date.
std::unordered_map<String, uint128> check() const override;
private:
/// Get cache key from path of metadata file.
/// Format: <disk_name>:relative/full/path/of/metadata/file
String getKeyFromFilePath(const String & file_path) const;
/// Get metadata file path from cache key.
String getFilePathFromKey(const String & key) const;
/// Get cache keys and checksums of corresponding metadata in a part(including projection parts)
void getKeysAndCheckSums(Strings & keys, std::vector<uint128> & checksums) const;
MergeTreeMetadataCachePtr cache;
};
}
#endif

View File

@ -372,8 +372,6 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
return result;
}
part->checkMetadata();
LOG_INFO(log, "Part {} looks good.", part_name);
result.status = {part_name, true, ""};
result.action = ReplicatedCheckResult::DoNothing;

View File

@ -1,83 +0,0 @@
#include "config.h"
#if USE_ROCKSDB
#include <gtest/gtest.h>
#include <rocksdb/table.h>
#include <rocksdb/db.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/MergeTreeMetadataCache.h>
using namespace DB;
class MergeTreeMetadataCacheTest : public ::testing::Test
{
public:
void SetUp() override
{
cache = MergeTreeMetadataCache::create("./db/", 268435456);
}
void TearDown() override
{
cache->shutdown();
cache.reset();
}
MergeTreeMetadataCachePtr cache;
};
TEST_F(MergeTreeMetadataCacheTest, testCommon)
{
std::vector<String> files
= {"columns.txt", "checksums.txt", "primary.idx", "count.txt", "partition.dat", "minmax_p.idx", "default_compression_codec.txt"};
String prefix = "data/test_metadata_cache/check_part_metadata_cache/201806_1_1_0_4/";
for (const auto & file : files)
{
auto status = cache->put(prefix + file, prefix + file);
ASSERT_EQ(status.code(), rocksdb::Status::Code::kOk);
}
for (const auto & file : files)
{
String value;
auto status = cache->get(prefix + file, value);
ASSERT_EQ(status.code(), rocksdb::Status::Code::kOk);
ASSERT_EQ(value, prefix + file);
}
{
Strings keys;
Strings values;
cache->getByPrefix(prefix, keys, values);
ASSERT_EQ(keys.size(), files.size());
ASSERT_EQ(values.size(), files.size());
for (size_t i = 0; i < files.size(); ++i)
{
ASSERT_EQ(values[i], keys[i]);
}
}
for (const auto & file : files)
{
auto status = cache->del(prefix + file);
ASSERT_EQ(status.code(), rocksdb::Status::Code::kOk);
}
for (const auto & file : files)
{
String value;
auto status = cache->get(prefix + file, value);
ASSERT_EQ(status.code(), rocksdb::Status::Code::kNotFound);
}
{
Strings keys;
Strings values;
cache->getByPrefix(prefix, keys, values);
ASSERT_EQ(keys.size(), 0);
ASSERT_EQ(values.size(), 0);
}
}
#endif

View File

@ -539,6 +539,9 @@ void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)
name,
[](const StorageFactory::Arguments & args)
{
if (!args.attach && !args.getLocalContext()->getSettingsRef().allow_experimental_s3queue)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3Queue is experimental. You can enable it with the `allow_experimental_s3queue` setting.");
auto & engine_args = args.engine_args;
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");

View File

@ -2221,7 +2221,6 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
auto & part_mutable = const_cast<IMergeTreeDataPart &>(*part);
part_mutable.writeChecksums(part->checksums, local_context->getWriteSettings());
part->checkMetadata();
results.emplace_back(part->name, true, "Checksums recounted and written to disk.");
}
catch (const Exception & ex)
@ -2235,7 +2234,6 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
try
{
checkDataPart(part, true);
part->checkMetadata();
results.emplace_back(part->name, true, "");
}
catch (const Exception & ex)

View File

@ -1278,10 +1278,6 @@ void StorageReplicatedMergeTree::paranoidCheckForCoveredPartsInZooKeeperOnStart(
if (!paranoid_check_for_covered_parts)
return;
/// FIXME https://github.com/ClickHouse/ClickHouse/issues/51182
if (getSettings()->use_metadata_cache)
return;
ActiveDataPartSet active_set(format_version);
for (const auto & part_name : parts_in_zk)
active_set.add(part_name);
@ -2023,7 +2019,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::executeFetchShared
}
}
static void paranoidCheckForCoveredPartsInZooKeeper(const StorageReplicatedMergeTree * storage, const ZooKeeperPtr & zookeeper, const String & replica_path,
static void paranoidCheckForCoveredPartsInZooKeeper(const ZooKeeperPtr & zookeeper, const String & replica_path,
MergeTreeDataFormatVersion format_version, const String & covering_part_name)
{
#ifdef ABORT_ON_LOGICAL_ERROR
@ -2037,10 +2033,6 @@ static void paranoidCheckForCoveredPartsInZooKeeper(const StorageReplicatedMerge
if (!paranoid_check_for_covered_parts)
return;
/// FIXME https://github.com/ClickHouse/ClickHouse/issues/51182
if (storage->getSettings()->use_metadata_cache)
return;
auto drop_range_info = MergeTreePartInfo::fromPartName(covering_part_name, format_version);
Strings parts_remain = zookeeper->getChildren(replica_path + "/parts");
for (const auto & part_name : parts_remain)
@ -2109,7 +2101,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
/// Forcibly remove parts from ZooKeeper
removePartsFromZooKeeperWithRetries(parts_to_remove);
paranoidCheckForCoveredPartsInZooKeeper(this, getZooKeeper(), replica_path, format_version, entry.new_part_name);
paranoidCheckForCoveredPartsInZooKeeper(getZooKeeper(), replica_path, format_version, entry.new_part_name);
if (entry.detach)
LOG_DEBUG(log, "Detached {} parts inside {}.", parts_to_remove.size(), entry.new_part_name);
@ -2246,7 +2238,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
LOG_INFO(log, "All parts from REPLACE PARTITION command have been already attached");
removePartsFromZooKeeperWithRetries(parts_to_remove);
if (replace)
paranoidCheckForCoveredPartsInZooKeeper(this, getZooKeeper(), replica_path, format_version, entry_replace.drop_range_part_name);
paranoidCheckForCoveredPartsInZooKeeper(getZooKeeper(), replica_path, format_version, entry_replace.drop_range_part_name);
return true;
}
@ -2559,7 +2551,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
removePartsFromZooKeeperWithRetries(parts_to_remove);
if (replace)
paranoidCheckForCoveredPartsInZooKeeper(this, getZooKeeper(), replica_path, format_version, entry_replace.drop_range_part_name);
paranoidCheckForCoveredPartsInZooKeeper(getZooKeeper(), replica_path, format_version, entry_replace.drop_range_part_name);
res_parts.clear();
parts_to_remove.clear();
cleanup_thread.wakeup();

View File

@ -180,7 +180,7 @@ namespace
{
auto protocol = protocol_string == "https" ? ProxyConfigurationResolver::Protocol::HTTPS
: ProxyConfigurationResolver::Protocol::HTTP;
auto proxy_config = ProxyConfigurationResolverProvider::get(protocol)->resolve();
auto proxy_config = ProxyConfigurationResolverProvider::get(protocol, Context::getGlobalContextInstance()->getConfigRef())->resolve();
return proxyConfigurationToPocoProxyConfiguration(proxy_config);
}

View File

@ -279,7 +279,7 @@ class CiLogsCredentials:
return ""
extra_columns = (
f"{pr_info.number} AS pull_request_number, '{pr_info.sha}' AS commit_sha, "
f"'{check_start_time}' AS check_start_time, '{check_name}' AS check_name, "
f"toDateTime('{check_start_time}', 'UTC') AS check_start_time, '{check_name}' AS check_name, "
f"'{get_instance_type()}' AS instance_type"
)
return (

View File

@ -1,6 +0,0 @@
<clickhouse>
<merge_tree_metadata_cache>
<lru_cache_size>268435456</lru_cache_size>
<continue_if_corrupted>true</continue_if_corrupted>
</merge_tree_metadata_cache>
</clickhouse>

View File

@ -34,7 +34,6 @@ ln -sf $SRC_PATH/config.d/keeper_port.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/merge_tree.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/lost_forever_check.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/metadata_cache.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/tcp_with_proxy.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/prometheus.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/top_level_domains_lists.xml $DEST_SERVER_PATH/config.d/

View File

@ -12,5 +12,51 @@
</replica>
</shard>
</one_shard_two_nodes>
<one_shard_three_nodes>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</one_shard_three_nodes>
<two_shards_three_nodes>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node4</host>
<port>9000</port>
</replica>
<replica>
<host>node5</host>
<port>9000</port>
</replica>
<replica>
<host>node6</host>
<port>9000</port>
</replica>
</shard>
</two_shards_three_nodes>
</remote_servers>
</clickhouse>

View File

@ -1,6 +1,7 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
@ -17,16 +18,50 @@ def start_cluster():
cluster.shutdown()
def test_remote(start_cluster):
def test_cluster(start_cluster):
assert (
node1.query(
"""SELECT hostName() FROM clusterAllReplicas("one_shard_two_nodes", system.one)"""
"SELECT hostName() FROM clusterAllReplicas('one_shard_two_nodes', system.one)"
)
== "node1\nnode2\n"
)
assert (
node1.query(
"""SELECT hostName() FROM cluster("one_shard_two_nodes", system.one)"""
)
node1.query("SELECT hostName() FROM cluster('one_shard_two_nodes', system.one)")
== "node1\n"
)
assert (
node2.query("SELECT hostName() FROM cluster('one_shard_two_nodes', system.one)")
== "node2\n"
)
@pytest.mark.parametrize(
"cluster",
[
pytest.param("one_shard_three_nodes"),
pytest.param("two_shards_three_nodes"),
],
)
def test_skip_unavailable_replica(start_cluster, cluster):
assert (
node1.query(
f"SELECT hostName() FROM clusterAllReplicas('{cluster}', system.one) settings skip_unavailable_shards=1"
)
== "node1\nnode2\n"
)
@pytest.mark.parametrize(
"cluster",
[
pytest.param("one_shard_three_nodes"),
pytest.param("two_shards_three_nodes"),
],
)
def test_error_on_unavailable_replica(start_cluster, cluster):
# clusterAllReplicas() considers each replica as a shard,
# so with skip_unavailable_shards=0 any unavailable replica should lead to an error
with pytest.raises(QueryRuntimeException):
node1.query(
f"SELECT hostName() FROM clusterAllReplicas('{cluster}', system.one) settings skip_unavailable_shards=0"
)

View File

@ -2,6 +2,7 @@
<profiles>
<default>
<stream_like_engine_allow_direct_select>1</stream_like_engine_allow_direct_select>
<allow_experimental_s3queue>1</allow_experimental_s3queue>
</default>
</profiles>
</clickhouse>

View File

@ -0,0 +1,10 @@
-1
-7
-23
-104
-730
-17520
-1051200
-63072000
-63072000000
-63072000000000

View File

@ -0,0 +1,10 @@
SELECT dateDiff('years', toDate('2017-12-31'), toDate('2016-01-01'));
SELECT dateDiff('quarters', toDate('2017-12-31'), toDate('2016-01-01'));
SELECT dateDiff('months', toDateTime('2017-12-31'), toDateTime('2016-01-01'));
SELECT dateDiff('weeks', toDateTime('2017-12-31'), toDateTime('2016-01-01'));
SELECT dateDiff('days', toDateTime('2017-12-31'), toDateTime('2016-01-01'));
SELECT dateDiff('hours', toDateTime('2017-12-31', 'UTC'), toDateTime('2016-01-01', 'UTC'));
SELECT dateDiff('minutes', toDateTime('2017-12-31', 'UTC'), toDateTime('2016-01-01', 'UTC'));
SELECT dateDiff('seconds', toDateTime('2017-12-31', 'UTC'), toDateTime('2016-01-01', 'UTC'));
SELECT dateDiff('milliseconds', toDateTime('2017-12-31', 'UTC'), toDateTime('2016-01-01', 'UTC'));
SELECT dateDiff('microseconds', toDateTime('2017-12-31', 'UTC'), toDateTime('2016-01-01', 'UTC'));

View File

@ -40,3 +40,4 @@
13 abc
"rows_read": 3,
2
::1

View File

@ -150,3 +150,15 @@ CREATE TABLE bloom_filter_idx_na
INDEX bf na TYPE bloom_filter(0.1) GRANULARITY 1
) ENGINE = MergeTree()
ORDER BY na" 2>&1 | grep -c 'DB::Exception: Unexpected type Array(Array(String)) of bloom filter index'
# NGRAM BF with IPv6
$CLICKHOUSE_CLIENT -n --query="
CREATE TABLE bloom_filter_ipv6_idx
(
foo IPv6,
INDEX fooIndex foo TYPE ngrambf_v1(8,512,3,0) GRANULARITY 1
) ENGINE = MergeTree() ORDER BY foo;"
$CLICKHOUSE_CLIENT --query="INSERT INTO bloom_filter_ipv6_idx VALUES ('::1.2.3.4'),('::0'),('::1')"
$CLICKHOUSE_CLIENT --query="SELECT * FROM bloom_filter_ipv6_idx WHERE foo IN ('::1')"
$CLICKHOUSE_CLIENT --query="DROP TABLE bloom_filter_ipv6_idx"

View File

@ -13,8 +13,8 @@ SETTINGS min_bytes_for_wide_part = 10485760,
type = cache,
max_size = '128Mi',
path = '${CLICKHOUSE_TEST_UNIQUE_NAME}_cache',
enable_bypass_cache_with_threashold = 1,
bypass_cache_threashold = 100,
enable_bypass_cache_with_threshold = 1,
bypass_cache_threshold = 100,
delayed_cleanup_interval_ms = 100,
disk = 's3_disk');
INSERT INTO test SELECT number, toString(number) FROM numbers(100);

View File

@ -16,8 +16,8 @@ SETTINGS min_bytes_for_wide_part = 10485760,
type = cache,
max_size = '128Mi',
path = '${CLICKHOUSE_TEST_UNIQUE_NAME}_cache',
enable_bypass_cache_with_threashold = 1,
bypass_cache_threashold = 100,
enable_bypass_cache_with_threshold = 1,
bypass_cache_threshold = 100,
delayed_cleanup_interval_ms = 100,
disk = 's3_disk');