Merge branch 'master' into check_time_limit_in_index_analysis

This commit is contained in:
Alexander Gololobov 2024-09-16 13:36:51 +02:00 committed by GitHub
commit 8507d209c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 523 additions and 119 deletions

View File

@ -4,15 +4,31 @@ description: Prints workflow debug info
runs:
using: "composite"
steps:
- name: Print envs
- name: Envs, event.json and contexts
shell: bash
run: |
echo "::group::Envs"
env
echo "::endgroup::"
- name: Print Event.json
shell: bash
run: |
echo "::group::Event.json"
echo '::group::Environment variables'
env | sort
echo '::endgroup::'
echo '::group::event.json'
python3 -m json.tool "$GITHUB_EVENT_PATH"
echo "::endgroup::"
echo '::endgroup::'
cat << 'EOF'
::group::github context
${{ toJSON(github) }}
::endgroup::
::group::env context
${{ toJSON(env) }}
::endgroup::
::group::runner context
${{ toJSON(runner) }}
::endgroup::
::group::job context
${{ toJSON(job) }}
::endgroup::
EOF

View File

@ -27,6 +27,8 @@ jobs:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get version
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: Labels check
run: |
cd "$GITHUB_WORKSPACE/tests/ci"

View File

@ -33,6 +33,8 @@ jobs:
clear-repository: true
token: ${{secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN}}
fetch-depth: 0
- name: Debug Info
uses: ./.github/actions/debug
- name: Cherry pick
run: |
cd "$GITHUB_WORKSPACE/tests/ci"

View File

@ -56,13 +56,13 @@ jobs:
GH_TOKEN: ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }}
runs-on: [self-hosted, release-maker]
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
token: ${{secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN}}
fetch-depth: 0
- name: Debug Info
uses: ./.github/actions/debug
- name: Prepare Release Info
shell: bash
run: |

View File

@ -11,6 +11,7 @@ name: Build docker images
required: false
type: boolean
default: false
jobs:
DockerBuildAarch64:
runs-on: [self-hosted, style-checker-aarch64]

View File

@ -8,20 +8,21 @@ on: # yamllint disable-line rule:truthy
schedule:
- cron: '0 */6 * * *'
workflow_dispatch:
jobs:
RunConfig:
runs-on: [self-hosted, style-checker-aarch64]
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get version
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: PrepareRunConfig
id: runconfig
run: |

View File

@ -15,14 +15,14 @@ jobs:
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get version
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: Merge sync PR
run: |
cd "$GITHUB_WORKSPACE/tests/ci"

View File

@ -14,14 +14,14 @@ jobs:
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get a version
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: Cancel PR workflow
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --cancel-previous-run

View File

@ -15,14 +15,14 @@ jobs:
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get version
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: PrepareRunConfig
id: runconfig
run: |

View File

@ -25,14 +25,14 @@ jobs:
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get a version
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: Cancel previous Sync PR workflow
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --cancel-previous-run

View File

@ -24,6 +24,8 @@ jobs:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get version
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: Labels check
run: |
cd "$GITHUB_WORKSPACE/tests/ci"

View File

@ -62,8 +62,6 @@ jobs:
env:
GITHUB_JOB_OVERRIDDEN: ${{inputs.test_name}}
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
@ -72,6 +70,8 @@ jobs:
submodules: ${{inputs.submodules}}
fetch-depth: ${{inputs.checkout_depth}}
filter: tree:0
- name: Debug Info
uses: ./.github/actions/debug
- name: Set build envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'

View File

@ -107,6 +107,10 @@ The vector similarity index currently does not work with per-table, non-default
[here](https://github.com/ClickHouse/ClickHouse/pull/51325#issuecomment-1605920475)). If necessary, the value must be changed in config.xml.
:::
Vector index creation is known to be slow. To speed the process up, index creation can be parallelized. The maximum number of threads can be
configured using server configuration
setting [max_build_vector_similarity_index_thread_pool_size](../../../operations/server-configuration-parameters/settings.md#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size).
ANN indexes are built during column insertion and merge. As a result, `INSERT` and `OPTIMIZE` statements will be slower than for ordinary
tables. ANNIndexes are ideally used only with immutable or rarely changed data, respectively when are far more read requests than write
requests.

View File

@ -491,6 +491,14 @@ Type: Double
Default: 0.9
## max_build_vector_similarity_index_thread_pool_size {#server_configuration_parameters_max_build_vector_similarity_index_thread_pool_size}
The maximum number of threads to use for building vector indexes. 0 means all cores.
Type: UInt64
Default: 16
## cgroups_memory_usage_observer_wait_time
Interval in seconds during which the server's maximum allowed memory consumption is adjusted by the corresponding threshold in cgroups. (see

View File

@ -178,6 +178,9 @@
M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \
M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \
M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \
M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \
M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \
M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \
\
M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \
M(DiskPlainRewritableLocalDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for LocalObjectStorage.") \

View File

@ -609,6 +609,7 @@
M(728, UNEXPECTED_DATA_TYPE) \
M(729, ILLEGAL_TIME_SERIES_TAGS) \
M(730, REFRESH_FAILED) \
M(731, QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@ -63,6 +63,7 @@ static struct InitFiu
REGULAR(keepermap_fail_drop_data) \
REGULAR(lazy_pipe_fds_fail_close) \
PAUSEABLE(infinite_sleep) \
PAUSEABLE(stop_moving_part_before_swap_with_active) \
REGULAR(slowdown_index_analysis) \

View File

@ -50,7 +50,7 @@ namespace DB
M(UInt32, asynchronous_heavy_metrics_update_period_s, 120, "Period in seconds for updating heavy asynchronous metrics.", 0) \
M(String, default_database, "default", "Default database name.", 0) \
M(String, tmp_policy, "", "Policy for storage with temporary data.", 0) \
M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting., ", 0) \
M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting.", 0) \
M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
M(UInt64, aggregate_function_group_array_max_element_size, 0xFFFFFF, "Max array element size in bytes for groupArray function. This limit is checked at serialization and help to avoid large state size.", 0) \
M(GroupArrayActionWhenLimitReached, aggregate_function_group_array_action_when_limit_is_reached, GroupArrayActionWhenLimitReached::THROW, "Action to execute when max array element size is exceeded in groupArray: `throw` exception, or `discard` extra values", 0) \
@ -65,6 +65,7 @@ namespace DB
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \
M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \
M(UInt64, max_build_vector_similarity_index_thread_pool_size, 16, "The maximum number of threads to use to build vector similarity indexes. 0 means all cores.", 0) \
\
/* Database Catalog */ \
M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \

View File

@ -10,6 +10,7 @@
#include <Common/SensitiveDataMasker.h>
#include <Common/Macros.h>
#include <Common/EventNotifier.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/Throttler.h>
@ -121,7 +122,6 @@
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <base/defines.h>
namespace fs = std::filesystem;
namespace ProfileEvents
@ -164,6 +164,9 @@ namespace CurrentMetrics
extern const Metric TablesLoaderForegroundThreadsActive;
extern const Metric TablesLoaderForegroundThreadsScheduled;
extern const Metric IOWriterThreadsScheduled;
extern const Metric BuildVectorSimilarityIndexThreads;
extern const Metric BuildVectorSimilarityIndexThreadsActive;
extern const Metric BuildVectorSimilarityIndexThreadsScheduled;
extern const Metric AttachedTable;
extern const Metric AttachedView;
extern const Metric AttachedDictionary;
@ -297,6 +300,8 @@ struct ContextSharedPart : boost::noncopyable
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable OnceFlag prefetch_threadpool_initialized;
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
mutable OnceFlag build_vector_similarity_index_threadpool_initialized;
mutable std::unique_ptr<ThreadPool> build_vector_similarity_index_threadpool; /// Threadpool for vector-similarity index creation.
mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices.
mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results.
mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices.
@ -3297,6 +3302,21 @@ size_t Context::getPrefetchThreadpoolSize() const
return config.getUInt(".prefetch_threadpool_pool_size", 100);
}
ThreadPool & Context::getBuildVectorSimilarityIndexThreadPool() const
{
callOnce(shared->build_vector_similarity_index_threadpool_initialized, [&] {
size_t pool_size = shared->server_settings.max_build_vector_similarity_index_thread_pool_size > 0
? shared->server_settings.max_build_vector_similarity_index_thread_pool_size
: getNumberOfPhysicalCPUCores();
shared->build_vector_similarity_index_threadpool = std::make_unique<ThreadPool>(
CurrentMetrics::BuildVectorSimilarityIndexThreads,
CurrentMetrics::BuildVectorSimilarityIndexThreadsActive,
CurrentMetrics::BuildVectorSimilarityIndexThreadsScheduled,
pool_size);
});
return *shared->build_vector_similarity_index_threadpool;
}
BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
{
callOnce(shared->buffer_flush_schedule_pool_initialized, [&] {

View File

@ -1097,6 +1097,8 @@ public:
/// and make a prefetch by putting a read task to threadpoolReader.
size_t getPrefetchThreadpoolSize() const;
ThreadPool & getBuildVectorSimilarityIndexThreadPool() const;
/// Settings for MergeTree background tasks stored in config.xml
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;
BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const;

View File

@ -1236,6 +1236,7 @@ IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block,
void HashJoin::reuseJoinedData(const HashJoin & join)
{
have_compressed = join.have_compressed;
data = join.data;
from_storage_join = true;

View File

@ -99,6 +99,7 @@ namespace DB
namespace ErrorCodes
{
extern const int QUERY_CACHE_USED_WITH_NONDETERMINISTIC_FUNCTIONS;
extern const int QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE;
extern const int QUERY_CACHE_USED_WITH_SYSTEM_TABLE;
extern const int INTO_OUTFILE_NOT_ALLOWED;
extern const int INVALID_TRANSACTION;
@ -1118,22 +1119,24 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
&& settings.use_query_cache
&& !internal
&& client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
/// Bug 67476: Avoid that the query cache stores truncated results if the query ran with a non-THROW overflow mode and hit a limit.
/// This is more workaround than a fix ... unfortunately it is hard to detect from the perspective of the query cache that the
/// query result is truncated.
&& (settings.read_overflow_mode == OverflowMode::THROW
&& settings.read_overflow_mode_leaf == OverflowMode::THROW
&& settings.group_by_overflow_mode == OverflowMode::THROW
&& settings.sort_overflow_mode == OverflowMode::THROW
&& settings.result_overflow_mode == OverflowMode::THROW
&& settings.timeout_overflow_mode == OverflowMode::THROW
&& settings.set_overflow_mode == OverflowMode::THROW
&& settings.join_overflow_mode == OverflowMode::THROW
&& settings.transfer_overflow_mode == OverflowMode::THROW
&& settings.distinct_overflow_mode == OverflowMode::THROW)
&& (ast->as<ASTSelectQuery>() || ast->as<ASTSelectWithUnionQuery>());
QueryCache::Usage query_cache_usage = QueryCache::Usage::None;
/// Bug 67476: If the query runs with a non-THROW overflow mode and hits a limit, the query cache will store a truncated result (if
/// enabled). This is incorrect. Unfortunately it is hard to detect from the perspective of the query cache that the query result
/// is truncated. Therefore throw an exception, to notify the user to disable either the query cache or use another overflow mode.
if (settings.use_query_cache && (settings.read_overflow_mode != OverflowMode::THROW
|| settings.read_overflow_mode_leaf != OverflowMode::THROW
|| settings.group_by_overflow_mode != OverflowMode::THROW
|| settings.sort_overflow_mode != OverflowMode::THROW
|| settings.result_overflow_mode != OverflowMode::THROW
|| settings.timeout_overflow_mode != OverflowMode::THROW
|| settings.set_overflow_mode != OverflowMode::THROW
|| settings.join_overflow_mode != OverflowMode::THROW
|| settings.transfer_overflow_mode != OverflowMode::THROW
|| settings.distinct_overflow_mode != OverflowMode::THROW))
throw Exception(ErrorCodes::QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE, "use_query_cache and overflow_mode != 'throw' cannot be used together");
/// If the query runs with "use_query_cache = 1", we first probe if the query cache already contains the query result (if yes:
/// return result from cache). If doesn't, we execute the query normally and write the result into the query cache. Both steps use a
/// hash of the AST, the current database and the settings as cache key. Unfortunately, the settings are in some places internally

View File

@ -74,7 +74,8 @@ private:
findMySQLFunctionSecretArguments();
}
else if ((function.name == "s3") || (function.name == "cosn") || (function.name == "oss") ||
(function.name == "deltaLake") || (function.name == "hudi") || (function.name == "iceberg"))
(function.name == "deltaLake") || (function.name == "hudi") || (function.name == "iceberg") ||
(function.name == "gcs"))
{
/// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...)
findS3FunctionSecretArguments(/* is_cluster_function= */ false);

View File

@ -2009,33 +2009,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
{
auto result = getAnalysisResult();
if (is_parallel_reading_from_replicas && context->canUseParallelReplicasOnInitiator()
&& context->getSettingsRef().parallel_replicas_local_plan)
{
CoordinationMode mode = CoordinationMode::Default;
switch (result.read_type)
{
case ReadFromMergeTree::ReadType::Default:
mode = CoordinationMode::Default;
break;
case ReadFromMergeTree::ReadType::InOrder:
mode = CoordinationMode::WithOrder;
break;
case ReadFromMergeTree::ReadType::InReverseOrder:
mode = CoordinationMode::ReverseOrder;
break;
case ReadFromMergeTree::ReadType::ParallelReplicas:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read type can't be ParallelReplicas on initiator");
}
chassert(number_of_current_replica.has_value());
chassert(all_ranges_callback.has_value());
/// initialize working set from local replica
all_ranges_callback.value()(
InitialAllRangesAnnouncement(mode, result.parts_with_ranges.getDescriptions(), number_of_current_replica.value()));
}
if (enable_remove_parts_from_snapshot_optimization)
{
/// Do not keep data parts in snapshot.

View File

@ -5,9 +5,11 @@
#include <Columns/ColumnArray.h>
#include <Common/BitHelpers.h>
#include <Common/formatReadable.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Core/Field.h>
#include <Core/ServerSettings.h>
#include <DataTypes/DataTypeArray.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -29,7 +31,6 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_ALLOCATE_MEMORY;
extern const int FORMAT_VERSION_TOO_OLD;
extern const int ILLEGAL_COLUMN;
extern const int INCORRECT_DATA;
@ -131,8 +132,7 @@ void USearchIndexWithSerialization::deserialize(ReadBuffer & istr)
/// See the comment in MergeTreeIndexGranuleVectorSimilarity::deserializeBinary why we throw here
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not load vector similarity index. Please drop the index and create it again. Error: {}", String(result.error.release()));
if (!try_reserve(limits()))
throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for usearch index");
try_reserve(limits());
}
USearchIndexWithSerialization::Statistics USearchIndexWithSerialization::getStatistics() const
@ -270,20 +270,49 @@ void updateImpl(const ColumnArray * column_array, const ColumnArray::Offsets & c
throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column with vector similarity index must have equal length");
/// Reserving space is mandatory
if (!index->try_reserve(roundUpToPowerOfTwoOrZero(index->size() + rows)))
throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for vector similarity index");
size_t max_thread_pool_size = Context::getGlobalContextInstance()->getServerSettings().max_build_vector_similarity_index_thread_pool_size;
if (max_thread_pool_size == 0)
max_thread_pool_size = getNumberOfPhysicalCPUCores();
unum::usearch::index_limits_t limits(roundUpToPowerOfTwoOrZero(index->size() + rows), max_thread_pool_size);
index->reserve(limits);
for (size_t row = 0; row < rows; ++row)
/// Vector index creation is slooooow. Add the new rows in parallel. The threadpool is global to avoid oversubscription when multiple
/// indexes are build simultaneously (e.g. multiple merges run at the same time).
auto & thread_pool = Context::getGlobalContextInstance()->getBuildVectorSimilarityIndexThreadPool();
auto add_vector_to_index = [&](USearchIndex::vector_key_t key, size_t row, ThreadGroupPtr thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
/// add is thread-safe
if (auto result = index->add(key, &column_array_data_float_data[column_array_offsets[row - 1]]); !result)
{
if (auto result = index->add(static_cast<USearchIndex::vector_key_t>(index->size()), &column_array_data_float_data[column_array_offsets[row - 1]]); !result)
throw Exception(ErrorCodes::INCORRECT_DATA, "Could not add data to vector similarity index. Error: {}", String(result.error.release()));
}
else
{
ProfileEvents::increment(ProfileEvents::USearchAddCount);
ProfileEvents::increment(ProfileEvents::USearchAddVisitedMembers, result.visited_members);
ProfileEvents::increment(ProfileEvents::USearchAddComputedDistances, result.computed_distances);
}
};
size_t index_size = index->size();
for (size_t row = 0; row < rows; ++row)
{
auto key = static_cast<USearchIndex::vector_key_t>(index_size + row);
auto task = [group = CurrentThread::getGroup(), &add_vector_to_index, key, row] { add_vector_to_index(key, row, group); };
thread_pool.scheduleOrThrowOnError(task);
}
thread_pool.wait();
}
}

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Common/FailPoint.h>
#include <Common/logger_useful.h>
#include <set>
@ -15,6 +16,11 @@ namespace ErrorCodes
extern const int DIRECTORY_ALREADY_EXISTS;
}
namespace FailPoints
{
extern const char stop_moving_part_before_swap_with_active[];
}
namespace
{
@ -226,6 +232,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
cloned_part.temporary_directory_lock = data->getTemporaryPartDirectoryHolder(part->name);
MutableDataPartStoragePtr cloned_part_storage;
bool preserve_blobs = false;
if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
{
/// Try zero-copy replication and fallback to default copy if it's not possible
@ -253,6 +260,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
if (zero_copy_part)
{
/// FIXME for some reason we cannot just use this part, we have to re-create it through MergeTreeDataPartBuilder
preserve_blobs = true;
zero_copy_part->is_temp = false; /// Do not remove it in dtor
cloned_part_storage = zero_copy_part->getDataPartStoragePtr();
}
@ -272,7 +280,17 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
cloned_part.part = std::move(builder).withPartFormatFromDisk().build();
LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part.part->getDataPartStorage().getFullPath());
cloned_part.part->is_temp = data->allowRemoveStaleMovingParts();
cloned_part.part->is_temp = false;
if (data->allowRemoveStaleMovingParts())
{
cloned_part.part->is_temp = true;
/// Setting it in case connection to zookeeper is lost while moving
/// Otherwise part might be stuck in the moving directory due to the KEEPER_EXCEPTION in part's destructor
if (preserve_blobs)
cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::PRESERVE_BLOBS;
else
cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS;
}
cloned_part.part->loadColumnsChecksumsIndexes(true, true);
cloned_part.part->loadVersionMetadata();
cloned_part.part->modification_time = cloned_part.part->getDataPartStorage().getLastModified().epochTime();
@ -282,6 +300,8 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const
{
/// Used to get some stuck parts in the moving directory by stopping moves while pause is active
FailPointInjection::pauseFailPoint(FailPoints::stop_moving_part_before_swap_with_active);
if (moves_blocker.isCancelled())
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");

View File

@ -15,6 +15,7 @@ namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
extern const int REPLICA_STATUS_CHANGED;
extern const int LOGICAL_ERROR;
}
ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicatedMergeTree & storage_)
@ -117,6 +118,67 @@ void ReplicatedMergeTreeAttachThread::checkHasReplicaMetadataInZooKeeper(const z
}
}
Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper)
{
const String & zookeeper_path = storage.zookeeper_path;
const String & replica_path = storage.replica_path;
const bool replica_readonly = storage.is_readonly;
for (size_t i = 0; i != 2; ++i)
{
String replica_metadata_version_str;
const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version_str);
if (!replica_metadata_version_exists)
return -1;
const Int32 metadata_version = parse<Int32>(replica_metadata_version_str);
if (metadata_version != 0 || replica_readonly)
{
/// No need to fix anything
return metadata_version;
}
Coordination::Stat stat;
zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat);
if (stat.version == 0)
{
/// No need to fix anything
return metadata_version;
}
ReplicatedMergeTreeQueue & queue = storage.queue;
queue.pullLogsToQueue(zookeeper);
if (queue.getStatus().metadata_alters_in_queue != 0)
{
LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue");
return metadata_version;
}
const Coordination::Requests ops = {
zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0),
zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version),
};
Coordination::Responses ops_responses;
const auto code = zookeeper->tryMulti(ops, ops_responses);
if (code == Coordination::Error::ZOK)
{
LOG_DEBUG(log, "Successfully set metadata_version to {}", stat.version);
return stat.version;
}
if (code != Coordination::Error::ZBADVERSION)
{
throw zkutil::KeeperException(code);
}
}
/// Second attempt is only possible if metadata_version != 0 or metadata.version changed during the first attempt.
/// If metadata_version != 0, on second attempt we will return the new metadata_version.
/// If metadata.version changed, on second attempt we will either get metadata_version != 0 and return the new metadata_version or we will get metadata_alters_in_queue != 0 and return 0.
/// Either way, on second attempt this method should return.
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to fix replica metadata_version in ZooKeeper after two attempts");
}
void ReplicatedMergeTreeAttachThread::runImpl()
{
storage.setZooKeeper();
@ -160,11 +222,11 @@ void ReplicatedMergeTreeAttachThread::runImpl()
/// Just in case it was not removed earlier due to connection loss
zookeeper->tryRemove(replica_path + "/flags/force_restore_data");
String replica_metadata_version;
const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version);
const Int32 replica_metadata_version = fixReplicaMetadataVersionIfNeeded(zookeeper);
const bool replica_metadata_version_exists = replica_metadata_version != -1;
if (replica_metadata_version_exists)
{
storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(parse<int>(replica_metadata_version)));
storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(replica_metadata_version));
}
else
{

View File

@ -48,6 +48,8 @@ private:
void runImpl();
void finalizeInitialization();
Int32 fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper);
};
}

View File

@ -2222,6 +2222,7 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const
res.inserts_in_queue = 0;
res.merges_in_queue = 0;
res.part_mutations_in_queue = 0;
res.metadata_alters_in_queue = 0;
res.queue_oldest_time = 0;
res.inserts_oldest_time = 0;
res.merges_oldest_time = 0;
@ -2264,6 +2265,11 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const
res.oldest_part_to_mutate_to = entry->new_part_name;
}
}
if (entry->type == LogEntry::ALTER_METADATA)
{
++res.metadata_alters_in_queue;
}
}
return res;

View File

@ -473,6 +473,7 @@ public:
UInt32 inserts_in_queue;
UInt32 merges_in_queue;
UInt32 part_mutations_in_queue;
UInt32 metadata_alters_in_queue;
UInt32 queue_oldest_time;
UInt32 inserts_oldest_time;
UInt32 merges_oldest_time;

View File

@ -42,6 +42,7 @@
<multi_read>1</multi_read>
<check_not_exists>1</check_not_exists>
<create_if_not_exists>1</create_if_not_exists>
<remove_recursive>1</remove_recursive>
</feature_flags>
</keeper_server>
</clickhouse>

View File

@ -64,6 +64,7 @@ function configure()
randomize_config_boolean_value multi_read keeper_port
randomize_config_boolean_value check_not_exists keeper_port
randomize_config_boolean_value create_if_not_exists keeper_port
randomize_config_boolean_value remove_recursive keeper_port
fi
sudo chown clickhouse /etc/clickhouse-server/config.d/keeper_port.xml

View File

@ -393,6 +393,7 @@ def test_table_functions():
f"azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
f"azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')",
f"iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')",
f"gcs('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')",
]
def make_test_case(i):

View File

@ -0,0 +1,33 @@
<clickhouse>
<remote_servers>
<parallel_replicas>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>node0</host>
<port>9000</port>
</replica>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
<replica>
<host>node4</host>
<port>9000</port>
</replica>
<replica>
<host>node5</host>
<port>9000</port>
</replica>
</shard>
</parallel_replicas>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,73 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
nodes = [
cluster.add_instance(
f"node{num}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
for num in range(6)
]
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def _create_tables(table_name):
for idx, node in enumerate(nodes):
node.query(
f"DROP TABLE IF EXISTS {table_name}",
settings={"database_atomic_wait_for_drop_and_detach_synchronously": True},
)
node.query(
f"""
CREATE TABLE {table_name} (value Int64)
Engine=ReplicatedMergeTree('/test_parallel_replicas/shard/{table_name}', '{idx}')
ORDER BY ()
"""
)
nodes[0].query(
f"INSERT INTO {table_name} SELECT * FROM numbers(1000)",
settings={"insert_deduplicate": 0},
)
nodes[0].query(f"SYSTEM SYNC REPLICA ON CLUSTER 'parallel_replicas' {table_name}")
for idx, node in enumerate(nodes):
node.query("SYSTEM STOP REPLICATED SENDS")
# the same data on all nodes except for a single value
node.query(
f"INSERT INTO {table_name} VALUES ({idx})",
settings={"insert_deduplicate": 0},
)
# check that we use the state of data parts from the initiator node (for some sort of determinism of what is been read).
# currently it is implemented only when we build local plan for the initiator node (we aim to make this behavior default)
def test_initiator_snapshot_is_used_for_reading(start_cluster):
table_name = "t"
_create_tables(table_name)
for idx, node in enumerate(nodes):
expected = 499500 + idx # sum of all integers 0..999 + idx
assert (
node.query(
f"SELECT sum(value) FROM {table_name}",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"max_parallel_replicas": 100,
"cluster_for_parallel_replicas": "parallel_replicas",
"parallel_replicas_local_plan": True,
},
)
== f"{expected}\n"
)

View File

@ -0,0 +1,46 @@
<clickhouse>
<remote_servers>
<cluster>
<shard>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
</shard>
</cluster>
</remote_servers>
<macros>
<shard>01</shard>
</macros>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
<policies>
<s3>
<volumes>
<default>
<disk>default</disk>
<perform_ttl_move_on_insert>False</perform_ttl_move_on_insert>
</default>
<s3>
<disk>s3</disk>
<perform_ttl_move_on_insert>False</perform_ttl_move_on_insert>
</s3>
</volumes>
<move_factor>0.0</move_factor>
</s3>
</policies>
</storage_configuration>
<merge_tree>
<allow_remote_fs_zero_copy_replication>true</allow_remote_fs_zero_copy_replication>
<storage_policy>s3</storage_policy>
</merge_tree>
<allow_remove_stale_moving_parts>true</allow_remove_stale_moving_parts>
</clickhouse>

View File

@ -0,0 +1,117 @@
from pathlib import Path
import time
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance(
"ch1",
main_configs=[
"config.xml",
],
macros={"replica": "node1"},
with_zookeeper=True,
with_minio=True,
)
DATABASE_NAME = "stale_moving_parts"
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def q(node, query):
return node.query(database=DATABASE_NAME, sql=query)
# .../disks/s3/store/
def get_table_path(node, table):
return (
node.query(
sql=f"SELECT data_paths FROM system.tables WHERE table = '{table}' and database = '{DATABASE_NAME}' LIMIT 1"
)
.strip('"\n[]')
.split(",")[1]
.strip("'")
)
def exec(node, cmd, path):
return node.exec_in_container(
[
"bash",
"-c",
f"{cmd} {path}",
]
)
def wait_part_is_stuck(node, table_moving_path, moving_part):
num_tries = 5
while q(node, "SELECT part_name FROM system.moves").strip() != moving_part:
if num_tries == 0:
raise Exception("Part has not started to move")
num_tries -= 1
time.sleep(1)
num_tries = 5
while exec(node, "ls", table_moving_path).strip() != moving_part:
if num_tries == 0:
raise Exception("Part is not stuck in the moving directory")
num_tries -= 1
time.sleep(1)
def wait_zookeeper_node_to_start(zk_nodes, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
for instance in zk_nodes:
conn = cluster.get_kazoo_client(instance)
conn.get_children("/")
print("All instances of ZooKeeper started")
return
except Exception as ex:
print(("Can't connect to ZooKeeper " + str(ex)))
time.sleep(0.5)
def test_remove_stale_moving_parts_without_zookeeper(started_cluster):
ch1.query(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}")
q(
ch1,
"CREATE TABLE test_remove ON CLUSTER cluster ( id UInt32 ) ENGINE ReplicatedMergeTree() ORDER BY id;",
)
table_moving_path = Path(get_table_path(ch1, "test_remove")) / "moving"
q(ch1, "SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active")
q(ch1, "INSERT INTO test_remove SELECT number FROM numbers(100);")
moving_part = "all_0_0_0"
move_response = ch1.get_query_request(
sql=f"ALTER TABLE test_remove MOVE PART '{moving_part}' TO DISK 's3'",
database=DATABASE_NAME,
)
wait_part_is_stuck(ch1, table_moving_path, moving_part)
cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
# Stop moves in case table is not read-only yet
q(ch1, "SYSTEM STOP MOVES")
q(ch1, "SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active")
assert "Cancelled moving parts" in move_response.get_error()
assert exec(ch1, "ls", table_moving_path).strip() == ""
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
wait_zookeeper_node_to_start(["zoo1", "zoo2", "zoo3"])
q(ch1, "SYSTEM START MOVES")
q(ch1, f"DROP TABLE test_remove")

View File

@ -23,23 +23,3 @@ Row 1:
x: 1
2
-- Bug 67476: Queries with overflow mode != throw must not be cached by the query cache
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0

View File

@ -43,25 +43,15 @@ DROP TABLE IF EXISTS tab;
CREATE TABLE tab(c UInt64) ENGINE = Memory;
SYSTEM DROP QUERY CACHE;
SELECT sum(c) FROM tab SETTINGS read_overflow_mode = 'break', use_query_cache = 1;
SELECT count(*) from system.query_cache;
SELECT sum(c) FROM tab SETTINGS read_overflow_mode_leaf = 'break', use_query_cache = 1;
SELECT count(*) from system.query_cache;
SELECT sum(c) FROM tab SETTINGS group_by_overflow_mode = 'break', use_query_cache = 1;
SELECT count(*) from system.query_cache;
SELECT sum(c) FROM tab SETTINGS sort_overflow_mode = 'break', use_query_cache = 1;
SELECT count(*) from system.query_cache;
SELECT sum(c) FROM tab SETTINGS result_overflow_mode = 'break', use_query_cache = 1;
SELECT count(*) from system.query_cache;
SELECT sum(c) FROM tab SETTINGS timeout_overflow_mode = 'break', use_query_cache = 1;
SELECT count(*) from system.query_cache;
SELECT sum(c) FROM tab SETTINGS set_overflow_mode = 'break', use_query_cache = 1;
SELECT count(*) from system.query_cache;
SELECT sum(c) FROM tab SETTINGS join_overflow_mode = 'break', use_query_cache = 1;
SELECT count(*) from system.query_cache;
SELECT sum(c) FROM tab SETTINGS transfer_overflow_mode = 'break', use_query_cache = 1;
SELECT count(*) from system.query_cache;
SELECT sum(c) FROM tab SETTINGS distinct_overflow_mode = 'break', use_query_cache = 1;
SELECT count(*) from system.query_cache;
SELECT sum(c) FROM tab SETTINGS read_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE }
SELECT sum(c) FROM tab SETTINGS read_overflow_mode_leaf = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE }
SELECT sum(c) FROM tab SETTINGS group_by_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE }
SELECT sum(c) FROM tab SETTINGS sort_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE }
SELECT sum(c) FROM tab SETTINGS result_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE }
SELECT sum(c) FROM tab SETTINGS timeout_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE }
SELECT sum(c) FROM tab SETTINGS set_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE }
SELECT sum(c) FROM tab SETTINGS join_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE }
SELECT sum(c) FROM tab SETTINGS transfer_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE }
SELECT sum(c) FROM tab SETTINGS distinct_overflow_mode = 'break', use_query_cache = 1; -- { serverError QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE }
SYSTEM DROP QUERY CACHE;

View File

@ -1,2 +1,2 @@
default 127.0.0.1 9181 0 0 0 1 1 ['FILTERED_LIST','MULTI_READ','CHECK_NOT_EXISTS','CREATE_IF_NOT_EXISTS']
default 127.0.0.1 9181 0 0 0 1 1 ['FILTERED_LIST','MULTI_READ','CHECK_NOT_EXISTS','CREATE_IF_NOT_EXISTS','REMOVE_RECURSIVE']
zookeeper2 localhost 9181 0 0 0 1