mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-14 10:22:10 +00:00
Merge branch 'master' into Kusto-phase1
This commit is contained in:
commit
d5337fd813
228
.github/workflows/pull_request.yml
vendored
228
.github/workflows/pull_request.yml
vendored
@ -1254,6 +1254,228 @@ jobs:
|
||||
# shellcheck disable=SC2046
|
||||
docker rm -f $(docker ps -a -q) ||:
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
FunctionalStatelessTestS3Debug0:
|
||||
needs: [BuilderDebDebug]
|
||||
runs-on: [self-hosted, func-tester]
|
||||
steps:
|
||||
- name: Set envs
|
||||
run: |
|
||||
cat >> "$GITHUB_ENV" << 'EOF'
|
||||
TEMP_PATH=${{runner.temp}}/stateless_s3_storage_debug
|
||||
REPORTS_PATH=${{runner.temp}}/reports_dir
|
||||
CHECK_NAME=Stateless tests (debug, s3 storage)
|
||||
REPO_COPY=${{runner.temp}}/stateless_s3_storage_debug/ClickHouse
|
||||
KILL_TIMEOUT=10800
|
||||
RUN_BY_HASH_NUM=0
|
||||
RUN_BY_HASH_TOTAL=3
|
||||
EOF
|
||||
- name: Download json reports
|
||||
uses: actions/download-artifact@v2
|
||||
with:
|
||||
path: ${{ env.REPORTS_PATH }}
|
||||
- name: Clear repository
|
||||
run: |
|
||||
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
|
||||
- name: Check out repository code
|
||||
uses: actions/checkout@v2
|
||||
- name: Functional test
|
||||
run: |
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
mkdir -p "$TEMP_PATH"
|
||||
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
|
||||
cd "$REPO_COPY/tests/ci"
|
||||
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
|
||||
- name: Cleanup
|
||||
if: always()
|
||||
run: |
|
||||
docker kill "$(docker ps -q)" ||:
|
||||
docker rm -f "$(docker ps -a -q)" ||:
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
FunctionalStatelessTestS3Debug1:
|
||||
needs: [BuilderDebDebug]
|
||||
runs-on: [self-hosted, func-tester]
|
||||
steps:
|
||||
- name: Set envs
|
||||
run: |
|
||||
cat >> "$GITHUB_ENV" << 'EOF'
|
||||
TEMP_PATH=${{runner.temp}}/stateless_s3_storage_debug
|
||||
REPORTS_PATH=${{runner.temp}}/reports_dir
|
||||
CHECK_NAME=Stateless tests (debug, s3 storage)
|
||||
REPO_COPY=${{runner.temp}}/stateless_s3_storage_debug/ClickHouse
|
||||
KILL_TIMEOUT=10800
|
||||
RUN_BY_HASH_NUM=1
|
||||
RUN_BY_HASH_TOTAL=3
|
||||
EOF
|
||||
- name: Download json reports
|
||||
uses: actions/download-artifact@v2
|
||||
with:
|
||||
path: ${{ env.REPORTS_PATH }}
|
||||
- name: Clear repository
|
||||
run: |
|
||||
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
|
||||
- name: Check out repository code
|
||||
uses: actions/checkout@v2
|
||||
- name: Functional test
|
||||
run: |
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
mkdir -p "$TEMP_PATH"
|
||||
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
|
||||
cd "$REPO_COPY/tests/ci"
|
||||
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
|
||||
- name: Cleanup
|
||||
if: always()
|
||||
run: |
|
||||
docker kill "$(docker ps -q)" ||:
|
||||
docker rm -f "$(docker ps -a -q)" ||:
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
FunctionalStatelessTestS3Debug2:
|
||||
needs: [BuilderDebDebug]
|
||||
runs-on: [self-hosted, func-tester]
|
||||
steps:
|
||||
- name: Set envs
|
||||
run: |
|
||||
cat >> "$GITHUB_ENV" << 'EOF'
|
||||
TEMP_PATH=${{runner.temp}}/stateless_s3_storage_debug
|
||||
REPORTS_PATH=${{runner.temp}}/reports_dir
|
||||
CHECK_NAME=Stateless tests (debug, s3 storage)
|
||||
REPO_COPY=${{runner.temp}}/stateless_s3_storage_debug/ClickHouse
|
||||
KILL_TIMEOUT=10800
|
||||
RUN_BY_HASH_NUM=2
|
||||
RUN_BY_HASH_TOTAL=3
|
||||
EOF
|
||||
- name: Download json reports
|
||||
uses: actions/download-artifact@v2
|
||||
with:
|
||||
path: ${{ env.REPORTS_PATH }}
|
||||
- name: Clear repository
|
||||
run: |
|
||||
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
|
||||
- name: Check out repository code
|
||||
uses: actions/checkout@v2
|
||||
- name: Functional test
|
||||
run: |
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
mkdir -p "$TEMP_PATH"
|
||||
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
|
||||
cd "$REPO_COPY/tests/ci"
|
||||
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
|
||||
- name: Cleanup
|
||||
if: always()
|
||||
run: |
|
||||
docker kill "$(docker ps -q)" ||:
|
||||
docker rm -f "$(docker ps -a -q)" ||:
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
FunctionalStatelessTestS3Tsan0:
|
||||
needs: [BuilderDebTsan]
|
||||
runs-on: [self-hosted, func-tester]
|
||||
steps:
|
||||
- name: Set envs
|
||||
run: |
|
||||
cat >> "$GITHUB_ENV" << 'EOF'
|
||||
TEMP_PATH=${{runner.temp}}/stateless_s3_storage_tsan
|
||||
REPORTS_PATH=${{runner.temp}}/reports_dir
|
||||
CHECK_NAME=Stateless tests (tsan, s3 storage)
|
||||
REPO_COPY=${{runner.temp}}/stateless_s3_storage_tsan/ClickHouse
|
||||
KILL_TIMEOUT=10800
|
||||
RUN_BY_HASH_NUM=0
|
||||
RUN_BY_HASH_TOTAL=3
|
||||
EOF
|
||||
- name: Download json reports
|
||||
uses: actions/download-artifact@v2
|
||||
with:
|
||||
path: ${{ env.REPORTS_PATH }}
|
||||
- name: Clear repository
|
||||
run: |
|
||||
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
|
||||
- name: Check out repository code
|
||||
uses: actions/checkout@v2
|
||||
- name: Functional test
|
||||
run: |
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
mkdir -p "$TEMP_PATH"
|
||||
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
|
||||
cd "$REPO_COPY/tests/ci"
|
||||
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
|
||||
- name: Cleanup
|
||||
if: always()
|
||||
run: |
|
||||
docker kill "$(docker ps -q)" ||:
|
||||
docker rm -f "$(docker ps -a -q)" ||:
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
FunctionalStatelessTestS3Tsan1:
|
||||
needs: [BuilderDebTsan]
|
||||
runs-on: [self-hosted, func-tester]
|
||||
steps:
|
||||
- name: Set envs
|
||||
run: |
|
||||
cat >> "$GITHUB_ENV" << 'EOF'
|
||||
TEMP_PATH=${{runner.temp}}/stateless_s3_storage_tsan
|
||||
REPORTS_PATH=${{runner.temp}}/reports_dir
|
||||
CHECK_NAME=Stateless tests (tsan, s3 storage)
|
||||
REPO_COPY=${{runner.temp}}/stateless_s3_storage_tsan/ClickHouse
|
||||
KILL_TIMEOUT=10800
|
||||
RUN_BY_HASH_NUM=1
|
||||
RUN_BY_HASH_TOTAL=3
|
||||
EOF
|
||||
- name: Download json reports
|
||||
uses: actions/download-artifact@v2
|
||||
with:
|
||||
path: ${{ env.REPORTS_PATH }}
|
||||
- name: Clear repository
|
||||
run: |
|
||||
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
|
||||
- name: Check out repository code
|
||||
uses: actions/checkout@v2
|
||||
- name: Functional test
|
||||
run: |
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
mkdir -p "$TEMP_PATH"
|
||||
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
|
||||
cd "$REPO_COPY/tests/ci"
|
||||
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
|
||||
- name: Cleanup
|
||||
if: always()
|
||||
run: |
|
||||
docker kill "$(docker ps -q)" ||:
|
||||
docker rm -f "$(docker ps -a -q)" ||:
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
FunctionalStatelessTestS3Tsan2:
|
||||
needs: [BuilderDebTsan]
|
||||
runs-on: [self-hosted, func-tester]
|
||||
steps:
|
||||
- name: Set envs
|
||||
run: |
|
||||
cat >> "$GITHUB_ENV" << 'EOF'
|
||||
TEMP_PATH=${{runner.temp}}/stateless_s3_storage_tsan
|
||||
REPORTS_PATH=${{runner.temp}}/reports_dir
|
||||
CHECK_NAME=Stateless tests (tsan, s3 storage)
|
||||
REPO_COPY=${{runner.temp}}/stateless_s3_storage_tsan/ClickHouse
|
||||
KILL_TIMEOUT=10800
|
||||
RUN_BY_HASH_NUM=2
|
||||
RUN_BY_HASH_TOTAL=3
|
||||
EOF
|
||||
- name: Download json reports
|
||||
uses: actions/download-artifact@v2
|
||||
with:
|
||||
path: ${{ env.REPORTS_PATH }}
|
||||
- name: Clear repository
|
||||
run: |
|
||||
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
|
||||
- name: Check out repository code
|
||||
uses: actions/checkout@v2
|
||||
- name: Functional test
|
||||
run: |
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
mkdir -p "$TEMP_PATH"
|
||||
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
|
||||
cd "$REPO_COPY/tests/ci"
|
||||
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
|
||||
- name: Cleanup
|
||||
if: always()
|
||||
run: |
|
||||
docker kill "$(docker ps -q)" ||:
|
||||
docker rm -f "$(docker ps -a -q)" ||:
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
FunctionalStatelessTestAarch64:
|
||||
needs: [BuilderDebAarch64]
|
||||
runs-on: [self-hosted, func-tester-aarch64]
|
||||
@ -3388,6 +3610,12 @@ jobs:
|
||||
- FunctionalStatefulTestMsan
|
||||
- FunctionalStatefulTestUBsan
|
||||
- FunctionalStatelessTestReleaseS3
|
||||
- FunctionalStatelessTestS3Debug0
|
||||
- FunctionalStatelessTestS3Debug1
|
||||
- FunctionalStatelessTestS3Debug2
|
||||
- FunctionalStatelessTestS3Tsan0
|
||||
- FunctionalStatelessTestS3Tsan1
|
||||
- FunctionalStatelessTestS3Tsan2
|
||||
- StressTestDebug
|
||||
- StressTestAsan
|
||||
- StressTestTsan
|
||||
|
@ -3,6 +3,9 @@
|
||||
# shellcheck disable=SC2086
|
||||
# shellcheck disable=SC2024
|
||||
|
||||
# Avoid overlaps with previous runs
|
||||
dmesg --clear
|
||||
|
||||
set -x
|
||||
|
||||
# Thread Fuzzer allows to check more permutations of possible thread scheduling
|
||||
|
@ -1036,7 +1036,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
try
|
||||
{
|
||||
LOG_DEBUG(
|
||||
log, "Initiailizing merge tree metadata cache lru_cache_size:{} continue_if_corrupted:{}", size, continue_if_corrupted);
|
||||
log, "Initializing merge tree metadata cache lru_cache_size:{} continue_if_corrupted:{}", size, continue_if_corrupted);
|
||||
global_context->initializeMergeTreeMetadataCache(path_str + "/" + "rocksdb", size);
|
||||
}
|
||||
catch (...)
|
||||
@ -1089,7 +1089,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
}
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Initiailizing interserver credentials.");
|
||||
LOG_DEBUG(log, "Initializing interserver credentials.");
|
||||
global_context->updateInterserverCredentials(config());
|
||||
|
||||
if (config().has("macros"))
|
||||
|
@ -250,7 +250,7 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
SlotCount available(std::unique_lock<std::mutex> &)
|
||||
SlotCount available(std::unique_lock<std::mutex> &) const
|
||||
{
|
||||
if (cur_concurrency < max_concurrency)
|
||||
return max_concurrency - cur_concurrency;
|
||||
|
@ -61,7 +61,7 @@ enum class QueryCancellationState
|
||||
|
||||
// Usually it's hard to set some reasonable hard memory limit
|
||||
// (especially, the default value). This class introduces new
|
||||
// mechanisim for the limiting of memory usage.
|
||||
// mechanism for the limiting of memory usage.
|
||||
// Soft limit represents guaranteed amount of memory query/user
|
||||
// may use. It's allowed to exceed this limit. But if hard limit
|
||||
// is reached, query with the biggest overcommit ratio
|
||||
@ -82,7 +82,7 @@ protected:
|
||||
virtual void pickQueryToExcludeImpl() = 0;
|
||||
|
||||
// This mutex is used to disallow concurrent access
|
||||
// to picked_tracker and cancelation_state variables.
|
||||
// to picked_tracker and cancellation_state variables.
|
||||
std::mutex overcommit_m;
|
||||
std::condition_variable cv;
|
||||
|
||||
|
@ -80,7 +80,7 @@ enum class Error : int32_t
|
||||
ZUNIMPLEMENTED = -6, /// Operation is unimplemented
|
||||
ZOPERATIONTIMEOUT = -7, /// Operation timeout
|
||||
ZBADARGUMENTS = -8, /// Invalid arguments
|
||||
ZINVALIDSTATE = -9, /// Invliad zhandle state
|
||||
ZINVALIDSTATE = -9, /// Invalid zhandle state
|
||||
|
||||
/** API errors.
|
||||
* This is never thrown by the server, it shouldn't be used other than
|
||||
@ -445,7 +445,7 @@ public:
|
||||
/** Usage scenario:
|
||||
* - create an object and issue commands;
|
||||
* - you provide callbacks for your commands; callbacks are invoked in internal thread and must be cheap:
|
||||
* for example, just signal a condvar / fulfull a promise.
|
||||
* for example, just signal a condvar / fulfill a promise.
|
||||
* - you also may provide callbacks for watches; they are also invoked in internal thread and must be cheap.
|
||||
* - whenever you receive exception with ZSESSIONEXPIRED code or method isExpired returns true,
|
||||
* the ZooKeeper instance is no longer usable - you may only destroy it and probably create another.
|
||||
|
@ -171,7 +171,7 @@ public:
|
||||
*/
|
||||
virtual bool isSuitableForConstantFolding() const { return true; }
|
||||
|
||||
/** If function isSuitableForConstantFolding then, this method will be called during query analyzis
|
||||
/** If function isSuitableForConstantFolding then, this method will be called during query analysis
|
||||
* if some arguments are constants. For example logical functions (AndFunction, OrFunction) can
|
||||
* return they result based on some constant arguments.
|
||||
* Arguments are passed without modifications, useDefaultImplementationForNulls, useDefaultImplementationForNothing,
|
||||
@ -394,7 +394,7 @@ private:
|
||||
using FunctionOverloadResolverPtr = std::shared_ptr<IFunctionOverloadResolver>;
|
||||
|
||||
/// Old function interface. Check documentation in IFunction.h.
|
||||
/// If client do not need statefull properties it can implement this interface.
|
||||
/// If client do not need stateful properties it can implement this interface.
|
||||
class IFunction
|
||||
{
|
||||
public:
|
||||
|
@ -753,7 +753,7 @@ bool InterpreterSystemQuery::dropReplicaImpl(ASTSystemQuery & query, const Stora
|
||||
"if you want to clean the data and drop this replica", ErrorCodes::TABLE_WAS_NOT_DROPPED);
|
||||
|
||||
/// NOTE it's not atomic: replica may become active after this check, but before dropReplica(...)
|
||||
/// However, the main usecase is to drop dead replica, which cannot become active.
|
||||
/// However, the main use case is to drop dead replica, which cannot become active.
|
||||
/// This check prevents only from accidental drop of some other replica.
|
||||
if (zookeeper->exists(status.zookeeper_path + "/replicas/" + query.replica + "/is_active"))
|
||||
throw Exception("Can't drop replica: " + query.replica + ", because it's active",
|
||||
|
@ -349,7 +349,7 @@ CNFQuery & CNFQuery::pullNotOutFunctions()
|
||||
return *this;
|
||||
}
|
||||
|
||||
CNFQuery & CNFQuery::pushNotInFuntions()
|
||||
CNFQuery & CNFQuery::pushNotInFunctions()
|
||||
{
|
||||
transformAtoms([](const AtomicFormula & atom) -> AtomicFormula
|
||||
{
|
||||
|
@ -133,7 +133,7 @@ public:
|
||||
/// Converts != -> NOT =; <,>= -> (NOT) <; >,<= -> (NOT) <= for simpler matching
|
||||
CNFQuery & pullNotOutFunctions();
|
||||
/// Revert pullNotOutFunctions actions
|
||||
CNFQuery & pushNotInFuntions();
|
||||
CNFQuery & pushNotInFunctions();
|
||||
|
||||
/// (a OR b OR ...) AND (NOT a OR b OR ...) -> (b OR ...)
|
||||
CNFQuery & reduce();
|
||||
|
@ -154,7 +154,7 @@ void optimizeGroupBy(ASTSelectQuery * select_query, ContextPtr context)
|
||||
continue;
|
||||
}
|
||||
}
|
||||
/// don't optimise functions that shadow any of it's arguments, e.g.:
|
||||
/// don't optimize functions that shadow any of it's arguments, e.g.:
|
||||
/// SELECT toString(dummy) as dummy FROM system.one GROUP BY dummy;
|
||||
if (!function->alias.empty())
|
||||
{
|
||||
@ -632,7 +632,7 @@ bool convertQueryToCNF(ASTSelectQuery * select_query)
|
||||
if (!cnf_form)
|
||||
return false;
|
||||
|
||||
cnf_form->pushNotInFuntions();
|
||||
cnf_form->pushNotInFunctions();
|
||||
select_query->refWhere() = TreeCNFConverter::fromCNF(*cnf_form);
|
||||
return true;
|
||||
}
|
||||
|
@ -99,7 +99,7 @@ using TreeRewriterResultPtr = std::shared_ptr<const TreeRewriterResult>;
|
||||
|
||||
/// Tree Rewriter in terms of CMU slides @sa https://15721.courses.cs.cmu.edu/spring2020/slides/19-optimizer1.pdf
|
||||
///
|
||||
/// Optimises AST tree and collect information for further expression analysis in ExpressionAnalyzer.
|
||||
/// Optimizes AST tree and collect information for further expression analysis in ExpressionAnalyzer.
|
||||
/// Result AST has the following invariants:
|
||||
/// * all aliases are substituted
|
||||
/// * qualified names are translated
|
||||
|
@ -170,7 +170,7 @@ void WhereConstraintsOptimizer::perform()
|
||||
return replaceTermsToConstants(atom, compare_graph);
|
||||
})
|
||||
.reduce()
|
||||
.pushNotInFuntions();
|
||||
.pushNotInFunctions();
|
||||
|
||||
if (optimize_append_index)
|
||||
AddIndexConstraintsOptimizer(metadata_snapshot).perform(cnf);
|
||||
|
@ -46,6 +46,8 @@ void OwnSplitChannel::log(const Poco::Message & msg)
|
||||
|
||||
void OwnSplitChannel::tryLogSplit(const Poco::Message & msg)
|
||||
{
|
||||
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
|
||||
|
||||
try
|
||||
{
|
||||
logSplit(msg);
|
||||
@ -62,8 +64,6 @@ void OwnSplitChannel::tryLogSplit(const Poco::Message & msg)
|
||||
/// but let's log it into the stderr at least.
|
||||
catch (...)
|
||||
{
|
||||
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
|
||||
|
||||
const std::string & exception_message = getCurrentExceptionMessage(true);
|
||||
const std::string & message = msg.getText();
|
||||
|
||||
|
@ -1075,7 +1075,7 @@ void MergeTreeData::loadDataPartsFromDisk(
|
||||
|
||||
LOG_ERROR(log,
|
||||
"Detaching broken part {}{} (size: {}). "
|
||||
"If it happened after update, it is likely because of backward incompability. "
|
||||
"If it happened after update, it is likely because of backward incompatibility. "
|
||||
"You need to resolve this manually",
|
||||
getFullPathOnDisk(part_disk_ptr), part_name, part_size_str);
|
||||
std::lock_guard loading_lock(mutex);
|
||||
@ -1444,7 +1444,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
continue;
|
||||
}
|
||||
|
||||
/// Check if CSNs were witten after committing transaction, update and write if needed.
|
||||
/// Check if CSNs were written after committing transaction, update and write if needed.
|
||||
bool version_updated = false;
|
||||
chassert(!version.creation_tid.isEmpty());
|
||||
if (!part->version.creation_csn)
|
||||
@ -1867,18 +1867,18 @@ size_t MergeTreeData::clearOldPartsFromFilesystem(bool force)
|
||||
|
||||
void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts, bool throw_on_error, NameSet * parts_failed_to_delete)
|
||||
{
|
||||
NameSet part_names_successeded;
|
||||
NameSet part_names_succeed;
|
||||
|
||||
auto get_failed_parts = [&part_names_successeded, &parts_failed_to_delete, &parts] ()
|
||||
auto get_failed_parts = [&part_names_succeed, &parts_failed_to_delete, &parts] ()
|
||||
{
|
||||
if (part_names_successeded.size() == parts.size())
|
||||
if (part_names_succeed.size() == parts.size())
|
||||
return;
|
||||
|
||||
if (parts_failed_to_delete)
|
||||
{
|
||||
for (const auto & part : parts)
|
||||
{
|
||||
if (!part_names_successeded.contains(part->name))
|
||||
if (!part_names_succeed.contains(part->name))
|
||||
parts_failed_to_delete->insert(part->name);
|
||||
}
|
||||
}
|
||||
@ -1886,7 +1886,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts, bool
|
||||
|
||||
try
|
||||
{
|
||||
clearPartsFromFilesystemImpl(parts, &part_names_successeded);
|
||||
clearPartsFromFilesystemImpl(parts, &part_names_succeed);
|
||||
get_failed_parts();
|
||||
}
|
||||
catch (...)
|
||||
@ -1898,7 +1898,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts, bool
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_to_remove, NameSet * part_names_successed)
|
||||
void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_to_remove, NameSet * part_names_succeed)
|
||||
{
|
||||
const auto settings = getSettings();
|
||||
if (parts_to_remove.size() > 1 && settings->max_part_removal_threads > 1 && parts_to_remove.size() > settings->concurrent_part_removal_threshold)
|
||||
@ -1918,10 +1918,10 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
|
||||
|
||||
LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
|
||||
part->remove();
|
||||
if (part_names_successed)
|
||||
if (part_names_succeed)
|
||||
{
|
||||
std::lock_guard lock(part_names_mutex);
|
||||
part_names_successed->insert(part->name);
|
||||
part_names_succeed->insert(part->name);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -1934,13 +1934,13 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
|
||||
{
|
||||
LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
|
||||
part->remove();
|
||||
if (part_names_successed)
|
||||
part_names_successed->insert(part->name);
|
||||
if (part_names_succeed)
|
||||
part_names_succeed->insert(part->name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirecory()
|
||||
size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirectory()
|
||||
{
|
||||
/**
|
||||
* Remove old (configured by setting) broken detached parts.
|
||||
@ -2093,7 +2093,7 @@ void MergeTreeData::rename(const String & new_table_path, const StorageID & new_
|
||||
|
||||
{
|
||||
/// Relies on storage path, so we drop it during rename
|
||||
/// it will be recreated automatiaclly.
|
||||
/// it will be recreated automatically.
|
||||
std::lock_guard wal_lock(write_ahead_log_mutex);
|
||||
if (write_ahead_log)
|
||||
{
|
||||
@ -3928,7 +3928,7 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
|
||||
throw Exception("Volume " + name + " does not exists on policy " + getStoragePolicy()->getName(), ErrorCodes::UNKNOWN_DISK);
|
||||
|
||||
if (parts.empty())
|
||||
throw Exception("Nothing to move (сheck that the partition exists).", ErrorCodes::NO_SUCH_DATA_PART);
|
||||
throw Exception("Nothing to move (check that the partition exists).", ErrorCodes::NO_SUCH_DATA_PART);
|
||||
|
||||
std::erase_if(parts, [&](auto part_ptr)
|
||||
{
|
||||
@ -6286,7 +6286,7 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
|
||||
{
|
||||
|
||||
// Store metadata for replicated table.
|
||||
// Do nothing for non-replocated.
|
||||
// Do nothing for non-replicated.
|
||||
createAndStoreFreezeMetadata(disk, part, fs::path(backup_part_path) / part->data_part_storage->getPartDirectory());
|
||||
};
|
||||
|
||||
@ -6599,7 +6599,7 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge
|
||||
auto disk = moving_part.reserved_space->getDisk();
|
||||
if (supportsReplication() && disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
|
||||
{
|
||||
/// If we acuqired lock than let's try to move. After one
|
||||
/// If we acquired lock than let's try to move. After one
|
||||
/// replica will actually move the part from disk to some
|
||||
/// zero-copy storage other replicas will just fetch
|
||||
/// metainformation.
|
||||
|
@ -177,7 +177,7 @@ public:
|
||||
/// Rename map new_name -> old_name
|
||||
std::unordered_map<String, String> rename_map;
|
||||
|
||||
bool isColumnRenamed(const String & new_name) const { return rename_map.count(new_name) > 0; }
|
||||
bool isColumnRenamed(const String & new_name) const { return rename_map.contains(new_name); }
|
||||
String getColumnOldName(const String & new_name) const { return rename_map.at(new_name); }
|
||||
};
|
||||
|
||||
@ -634,7 +634,7 @@ public:
|
||||
/// Delete WAL files containing parts, that all already stored on disk.
|
||||
size_t clearOldWriteAheadLogs();
|
||||
|
||||
size_t clearOldBrokenPartsFromDetachedDirecory();
|
||||
size_t clearOldBrokenPartsFromDetachedDirectory();
|
||||
|
||||
/// Delete all directories which names begin with "tmp"
|
||||
/// Must be called with locked lockForShare() because it's using relative_data_path.
|
||||
@ -761,7 +761,7 @@ public:
|
||||
|
||||
const ColumnsDescription & getObjectColumns() const { return object_columns; }
|
||||
|
||||
/// Creates desciprion of columns of data type Object from the range of data parts.
|
||||
/// Creates description of columns of data type Object from the range of data parts.
|
||||
static ColumnsDescription getObjectColumns(
|
||||
const DataPartsVector & parts, const ColumnsDescription & storage_columns);
|
||||
|
||||
@ -1083,7 +1083,7 @@ protected:
|
||||
DataPartsIndexes::index<TagByInfo>::type & data_parts_by_info;
|
||||
DataPartsIndexes::index<TagByStateAndInfo>::type & data_parts_by_state_and_info;
|
||||
|
||||
/// Current descriprion of columns of data type Object.
|
||||
/// Current description of columns of data type Object.
|
||||
/// It changes only when set of parts is changed and is
|
||||
/// protected by @data_parts_mutex.
|
||||
ColumnsDescription object_columns;
|
||||
@ -1125,7 +1125,7 @@ protected:
|
||||
return {begin, end};
|
||||
}
|
||||
|
||||
/// Creates desciprion of columns of data type Object from the range of data parts.
|
||||
/// Creates description of columns of data type Object from the range of data parts.
|
||||
static ColumnsDescription getObjectColumns(
|
||||
boost::iterator_range<DataPartIteratorByStateAndInfo> range, const ColumnsDescription & storage_columns);
|
||||
|
||||
@ -1263,7 +1263,7 @@ private:
|
||||
void checkPartCanBeAddedToTable(MutableDataPartPtr & part, DataPartsLock & lock) const;
|
||||
|
||||
/// Preparing itself to be committed in memory: fill some fields inside part, add it to data_parts_indexes
|
||||
/// in precommitted state and to transasction
|
||||
/// in precommitted state and to transaction
|
||||
void preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, DataPartStorageBuilderPtr builder);
|
||||
|
||||
/// Low-level method for preparing parts for commit (in-memory).
|
||||
@ -1351,7 +1351,7 @@ private:
|
||||
/// Remove parts from disk calling part->remove(). Can do it in parallel in case of big set of parts and enabled settings.
|
||||
/// If we fail to remove some part and throw_on_error equal to `true` will throw an exception on the first failed part.
|
||||
/// Otherwise, in non-parallel case will break and return.
|
||||
void clearPartsFromFilesystemImpl(const DataPartsVector & parts, NameSet * part_names_successed);
|
||||
void clearPartsFromFilesystemImpl(const DataPartsVector & parts, NameSet * part_names_succeed);
|
||||
|
||||
TemporaryParts temporary_parts;
|
||||
};
|
||||
|
@ -149,7 +149,7 @@ void ReplicatedMergeTreeAttachThread::runImpl()
|
||||
storage.clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"});
|
||||
storage.clearOldWriteAheadLogs();
|
||||
if (storage.getSettings()->merge_tree_enable_clear_old_broken_detached)
|
||||
storage.clearOldBrokenPartsFromDetachedDirecory();
|
||||
storage.clearOldBrokenPartsFromDetachedDirectory();
|
||||
|
||||
storage.createNewZooKeeperNodes();
|
||||
storage.syncPinnedPartUUIDs();
|
||||
|
@ -66,7 +66,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
|
||||
storage.clearOldWriteAheadLogs();
|
||||
storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
|
||||
if (storage.getSettings()->merge_tree_enable_clear_old_broken_detached)
|
||||
storage.clearOldBrokenPartsFromDetachedDirecory();
|
||||
storage.clearOldBrokenPartsFromDetachedDirectory();
|
||||
}
|
||||
|
||||
/// This is loose condition: no problem if we actually had lost leadership at this moment
|
||||
|
@ -1202,7 +1202,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
|
||||
cleared_count += clearOldMutations();
|
||||
cleared_count += clearEmptyParts();
|
||||
if (getSettings()->merge_tree_enable_clear_old_broken_detached)
|
||||
cleared_count += clearOldBrokenPartsFromDetachedDirecory();
|
||||
cleared_count += clearOldBrokenPartsFromDetachedDirectory();
|
||||
return cleared_count;
|
||||
/// TODO maybe take into account number of cleared objects when calculating backoff
|
||||
}, common_assignee_trigger, getStorageID()), /* need_trigger */ false);
|
||||
|
@ -4582,7 +4582,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
|
||||
if (entry.alter_version < metadata_version)
|
||||
{
|
||||
/// TODO Can we replace it with LOGICAL_ERROR?
|
||||
/// As for now, it may rerely happen due to reordering of ALTER_METADATA entries in the queue of
|
||||
/// As for now, it may rarely happen due to reordering of ALTER_METADATA entries in the queue of
|
||||
/// non-initial replica and also may happen after stale replica recovery.
|
||||
LOG_WARNING(log, "Attempt to update metadata of version {} "
|
||||
"to older version {} when processing log entry {}: {}",
|
||||
@ -4664,7 +4664,7 @@ PartitionBlockNumbersHolder StorageReplicatedMergeTree::allocateBlockNumbersInAf
|
||||
}
|
||||
else
|
||||
{
|
||||
/// TODO: Implement optimal block number aqcuisition algorithm in multiple (but not all) partitions
|
||||
/// TODO: Implement optimal block number acquisition algorithm in multiple (but not all) partitions
|
||||
EphemeralLocksInAllPartitions lock_holder(
|
||||
fs::path(zookeeper_path) / "block_numbers", "block-", fs::path(zookeeper_path) / "temp", *zookeeper);
|
||||
|
||||
@ -4841,7 +4841,7 @@ void StorageReplicatedMergeTree::alter(
|
||||
Coordination::Responses results;
|
||||
Coordination::Error rc = zookeeper->tryMulti(ops, results);
|
||||
|
||||
/// For the sake of constitency with mechanics of concurrent background process of assigning parts merge tasks
|
||||
/// For the sake of consistency with mechanics of concurrent background process of assigning parts merge tasks
|
||||
/// this placeholder must be held up until the moment of committing into ZK of the mutation entry
|
||||
/// See ReplicatedMergeTreeMergePredicate::canMergeTwoParts() method
|
||||
partition_block_numbers_holder.reset();
|
||||
@ -5897,7 +5897,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, Conte
|
||||
/// partitions, saves them in the mutation entry and writes the mutation entry to a new ZK node in
|
||||
/// the /mutations folder. This block numbers are needed to determine which parts should be mutated and
|
||||
/// which shouldn't (parts inserted after the mutation will have the block number higher than the
|
||||
/// block number acquired by the mutation in that partition and so will not be mutatied).
|
||||
/// block number acquired by the mutation in that partition and so will not be mutated).
|
||||
/// This block number is called "mutation version" in that partition.
|
||||
///
|
||||
/// Mutation versions are acquired atomically in all partitions, so the case when an insert in some
|
||||
@ -7217,7 +7217,7 @@ bool StorageReplicatedMergeTree::addOpsToDropAllPartsInPartition(
|
||||
}
|
||||
|
||||
void StorageReplicatedMergeTree::dropAllPartsInPartitions(
|
||||
zkutil::ZooKeeper & zookeeper, const Strings partition_ids, std::vector<LogEntryPtr> & entries, ContextPtr query_context, bool detach)
|
||||
zkutil::ZooKeeper & zookeeper, const Strings & partition_ids, std::vector<LogEntryPtr> & entries, ContextPtr query_context, bool detach)
|
||||
{
|
||||
entries.reserve(partition_ids.size());
|
||||
|
||||
@ -7600,7 +7600,7 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
|
||||
|
||||
if (!children.empty())
|
||||
{
|
||||
LOG_TRACE(logger, "Found {} ({}) zookeper locks for {}", zookeeper_part_uniq_node, children.size(), fmt::join(children, ", "));
|
||||
LOG_TRACE(logger, "Found {} ({}) zookeeper locks for {}", zookeeper_part_uniq_node, children.size(), fmt::join(children, ", "));
|
||||
part_has_no_more_locks = false;
|
||||
continue;
|
||||
}
|
||||
@ -7706,12 +7706,12 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
|
||||
String zookeeper_part_uniq_node = fs::path(zc_zookeeper_path) / id;
|
||||
Strings id_replicas;
|
||||
zookeeper->tryGetChildren(zookeeper_part_uniq_node, id_replicas);
|
||||
LOG_TRACE(log, "Found zookeper replicas for {}: {}", zookeeper_part_uniq_node, id_replicas.size());
|
||||
LOG_TRACE(log, "Found zookeeper replicas for {}: {}", zookeeper_part_uniq_node, id_replicas.size());
|
||||
replicas.insert(id_replicas.begin(), id_replicas.end());
|
||||
}
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "Found zookeper replicas for part {}: {}", part.name, replicas.size());
|
||||
LOG_TRACE(log, "Found zookeeper replicas for part {}: {}", part.name, replicas.size());
|
||||
|
||||
Strings active_replicas;
|
||||
|
||||
@ -7724,7 +7724,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
|
||||
if ((replica != replica_name) && (zookeeper->exists(fs::path(zookeeper_path) / "replicas" / replica / "is_active")))
|
||||
active_replicas.push_back(replica);
|
||||
|
||||
LOG_TRACE(log, "Found zookeper active replicas for part {}: {}", part.name, active_replicas.size());
|
||||
LOG_TRACE(log, "Found zookeeper active replicas for part {}: {}", part.name, active_replicas.size());
|
||||
|
||||
if (active_replicas.empty())
|
||||
return "";
|
||||
@ -8159,7 +8159,7 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(
|
||||
|
||||
if (!created)
|
||||
{
|
||||
String mode_str = mode == zkutil::CreateMode::Persistent ? "persistent" : "ephemral";
|
||||
String mode_str = mode == zkutil::CreateMode::Persistent ? "persistent" : "ephemeral";
|
||||
throw Exception(ErrorCodes::NOT_FOUND_NODE, "Cannot create {} zero copy lock {} because part was unlocked from zookeeper", mode_str, zookeeper_node);
|
||||
}
|
||||
}
|
||||
|
@ -754,7 +754,7 @@ private:
|
||||
std::vector<EphemeralLockInZooKeeper> & delimiting_block_locks,
|
||||
std::vector<size_t> & log_entry_ops_idx);
|
||||
void dropAllPartsInPartitions(
|
||||
zkutil::ZooKeeper & zookeeper, const Strings partition_ids, std::vector<LogEntryPtr> & entries, ContextPtr query_context, bool detach);
|
||||
zkutil::ZooKeeper & zookeeper, const Strings & partition_ids, std::vector<LogEntryPtr> & entries, ContextPtr query_context, bool detach);
|
||||
|
||||
LogEntryPtr dropAllPartsInPartition(
|
||||
zkutil::ZooKeeper & zookeeper, const String & partition_id, ContextPtr query_context, bool detach);
|
||||
|
@ -69,13 +69,14 @@ public:
|
||||
|
||||
virtual ~ITableFunction() = default;
|
||||
|
||||
protected:
|
||||
virtual AccessType getSourceAccessType() const;
|
||||
|
||||
private:
|
||||
virtual StoragePtr executeImpl(
|
||||
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const = 0;
|
||||
|
||||
virtual const char * getStorageTypeName() const = 0;
|
||||
|
||||
virtual AccessType getSourceAccessType() const;
|
||||
};
|
||||
|
||||
using TableFunctionPtr = std::shared_ptr<ITableFunction>;
|
||||
|
@ -7,6 +7,8 @@
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
#include <TableFunctions/TableFunctionHDFS.h>
|
||||
#include <Interpreters/parseColumnsListForTableFunction.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Access/Common/AccessFlags.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -29,7 +31,10 @@ StoragePtr TableFunctionHDFS::getStorage(
|
||||
ColumnsDescription TableFunctionHDFS::getActualTableStructure(ContextPtr context) const
|
||||
{
|
||||
if (structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageHDFS::getTableStructureFromData(format, filename, compression_method, context);
|
||||
}
|
||||
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
@ -14,6 +14,7 @@
|
||||
#include <TableFunctions/TableFunctionHDFS.h>
|
||||
#include <TableFunctions/TableFunctionHDFSCluster.h>
|
||||
#include <Interpreters/parseColumnsListForTableFunction.h>
|
||||
#include <Access/Common/AccessFlags.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
|
||||
@ -74,7 +75,10 @@ void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, Conte
|
||||
ColumnsDescription TableFunctionHDFSCluster::getActualTableStructure(ContextPtr context) const
|
||||
{
|
||||
if (structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageHDFS::getTableStructureFromData(format, filename, compression_method, context);
|
||||
}
|
||||
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
#include <TableFunctions/TableFunctionS3.h>
|
||||
#include <Interpreters/parseColumnsListForTableFunction.h>
|
||||
#include <Access/Common/AccessFlags.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Storages/checkAndGetLiteralArgument.h>
|
||||
#include <Storages/StorageS3.h>
|
||||
@ -133,6 +134,7 @@ ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context)
|
||||
{
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageS3::getTableStructureFromData(
|
||||
configuration.format,
|
||||
S3::URI(Poco::URI(configuration.url)),
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <TableFunctions/TableFunctionS3.h>
|
||||
#include <TableFunctions/TableFunctionS3Cluster.h>
|
||||
#include <Interpreters/parseColumnsListForTableFunction.h>
|
||||
#include <Access/Common/AccessFlags.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTExpressionList.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
@ -83,6 +84,7 @@ ColumnsDescription TableFunctionS3Cluster::getActualTableStructure(ContextPtr co
|
||||
{
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageS3::getTableStructureFromData(
|
||||
configuration.format,
|
||||
S3::URI(Poco::URI(configuration.url)),
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <Storages/StorageExternalDistributed.h>
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
#include <Interpreters/parseColumnsListForTableFunction.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
|
||||
|
||||
@ -113,12 +114,15 @@ ReadWriteBufferFromHTTP::HTTPHeaderEntries TableFunctionURL::getHeaders() const
|
||||
ColumnsDescription TableFunctionURL::getActualTableStructure(ContextPtr context) const
|
||||
{
|
||||
if (structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageURL::getTableStructureFromData(format,
|
||||
filename,
|
||||
chooseCompressionMethod(Poco::URI(filename).getPath(), compression_method),
|
||||
getHeaders(),
|
||||
std::nullopt,
|
||||
context);
|
||||
}
|
||||
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
@ -247,6 +247,12 @@ CI_CONFIG = {
|
||||
"Stateless tests (release, s3 storage)": {
|
||||
"required_build": "package_release",
|
||||
},
|
||||
"Stateless tests (debug, s3 storage)": {
|
||||
"required_build": "package_debug",
|
||||
},
|
||||
"Stateless tests (tsan, s3 storage)": {
|
||||
"required_build": "package_tsan",
|
||||
},
|
||||
"Stress test (asan)": {
|
||||
"required_build": "package_asan",
|
||||
},
|
||||
|
@ -1,31 +1,26 @@
|
||||
import pytest
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.test_tools import TSV
|
||||
|
||||
uuids = []
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node1 = cluster.add_instance(
|
||||
"node1", main_configs=["configs/conf.xml"], with_nginx=True
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def cluster():
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def setup_node():
|
||||
try:
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
cluster.add_instance(
|
||||
"node1", main_configs=["configs/conf.xml"], with_nginx=True
|
||||
)
|
||||
cluster.start()
|
||||
|
||||
yield cluster
|
||||
|
||||
node1.query(
|
||||
"insert into table function url(url1) partition by column3 values (1, 2, 3), (3, 2, 1), (1, 3, 2)"
|
||||
)
|
||||
yield
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_partition_by(cluster):
|
||||
node1 = cluster.instances["node1"]
|
||||
|
||||
node1.query(
|
||||
f"insert into table function url(url1) partition by column3 values (1, 2, 3), (3, 2, 1), (1, 3, 2)"
|
||||
)
|
||||
def test_partition_by():
|
||||
result = node1.query(
|
||||
f"select * from url('http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
|
||||
)
|
||||
@ -38,3 +33,45 @@ def test_partition_by(cluster):
|
||||
f"select * from url('http://nginx:80/test_3', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
|
||||
)
|
||||
assert result.strip() == "1\t2\t3"
|
||||
|
||||
|
||||
def test_table_function_url_access_rights():
|
||||
node1.query("CREATE USER OR REPLACE u1")
|
||||
|
||||
expected_error = "necessary to have grant CREATE TEMPORARY TABLE, URL ON *.*"
|
||||
assert expected_error in node1.query_and_get_error(
|
||||
f"SELECT * FROM url('http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')",
|
||||
user="u1",
|
||||
)
|
||||
|
||||
expected_error = "necessary to have grant CREATE TEMPORARY TABLE, URL ON *.*"
|
||||
assert expected_error in node1.query_and_get_error(
|
||||
f"SELECT * FROM url('http://nginx:80/test_1', 'TSV')", user="u1"
|
||||
)
|
||||
|
||||
assert node1.query(
|
||||
f"DESCRIBE TABLE url('http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')",
|
||||
user="u1",
|
||||
) == TSV([["column1", "UInt32"], ["column2", "UInt32"], ["column3", "UInt32"]])
|
||||
|
||||
assert node1.query(
|
||||
f"DESCRIBE TABLE url('http://nginx:80/not-exist', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')",
|
||||
user="u1",
|
||||
) == TSV([["column1", "UInt32"], ["column2", "UInt32"], ["column3", "UInt32"]])
|
||||
|
||||
expected_error = "necessary to have grant URL ON *.*"
|
||||
assert expected_error in node1.query_and_get_error(
|
||||
f"DESCRIBE TABLE url('http://nginx:80/test_1', 'TSV')", user="u1"
|
||||
)
|
||||
|
||||
node1.query("GRANT URL ON *.* TO u1")
|
||||
assert node1.query(
|
||||
f"DESCRIBE TABLE url('http://nginx:80/test_1', 'TSV')",
|
||||
user="u1",
|
||||
) == TSV(
|
||||
[
|
||||
["c1", "Nullable(Int64)"],
|
||||
["c2", "Nullable(Int64)"],
|
||||
["c3", "Nullable(Int64)"],
|
||||
]
|
||||
)
|
||||
|
@ -1,35 +0,0 @@
|
||||
<?xml version="1.0"?>
|
||||
<clickhouse>
|
||||
<logger>
|
||||
<level>trace</level>
|
||||
<console>true</console>
|
||||
</logger>
|
||||
|
||||
<tcp_port>9000</tcp_port>
|
||||
|
||||
<path>./</path>
|
||||
|
||||
<mark_cache_size>0</mark_cache_size>
|
||||
|
||||
<users>
|
||||
<default>
|
||||
<password></password>
|
||||
|
||||
<networks>
|
||||
<ip>::/0</ip>
|
||||
</networks>
|
||||
|
||||
<profile>default</profile>
|
||||
<quota>default</quota>
|
||||
<access_management>1</access_management>
|
||||
</default>
|
||||
</users>
|
||||
|
||||
<profiles>
|
||||
<default/>
|
||||
</profiles>
|
||||
|
||||
<quotas>
|
||||
<default />
|
||||
</quotas>
|
||||
</clickhouse>
|
@ -1,108 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-tsan, no-asan, no-ubsan, no-msan, no-parallel, no-fasttest
|
||||
# Tag no-tsan: requires jemalloc to track small allocations
|
||||
# Tag no-asan: requires jemalloc to track small allocations
|
||||
# Tag no-ubsan: requires jemalloc to track small allocations
|
||||
# Tag no-msan: requires jemalloc to track small allocations
|
||||
|
||||
#
|
||||
# Regression for INSERT SELECT, that abnormally terminates the server
|
||||
# in case of too small memory limits.
|
||||
#
|
||||
# NOTE: After #24483 had been merged the only place where the allocation may
|
||||
# fail is the insert into PODArray in DB::OwnSplitChannel::log, but after
|
||||
# #24069 those errors will be ignored, so to check new behaviour separate
|
||||
# server is required.
|
||||
#
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
server_opts=(
|
||||
"--config-file=$CURDIR/$(basename "${BASH_SOURCE[0]}" .sh).config.xml"
|
||||
"--"
|
||||
# to avoid multiple listen sockets (complexity for port discovering)
|
||||
"--listen_host=127.1"
|
||||
# we will discover the real port later.
|
||||
"--tcp_port=0"
|
||||
"--shutdown_wait_unfinished=0"
|
||||
)
|
||||
CLICKHOUSE_WATCHDOG_ENABLE=0 $CLICKHOUSE_SERVER_BINARY "${server_opts[@]}" >clickhouse-server.log 2>clickhouse-server.stderr &
|
||||
server_pid=$!
|
||||
|
||||
trap cleanup EXIT
|
||||
function cleanup()
|
||||
{
|
||||
kill -9 $server_pid
|
||||
|
||||
echo "Test failed. Server log:"
|
||||
cat clickhouse-server.log
|
||||
cat clickhouse-server.stderr
|
||||
rm -f clickhouse-server.log
|
||||
rm -f clickhouse-server.stderr
|
||||
|
||||
exit 1
|
||||
}
|
||||
|
||||
server_port=
|
||||
i=0 retries=300
|
||||
# wait until server will start to listen (max 30 seconds)
|
||||
while [[ -z $server_port ]] && [[ $i -lt $retries ]]; do
|
||||
server_port=$(lsof -n -a -P -i tcp -s tcp:LISTEN -p $server_pid 2>/dev/null | awk -F'[ :]' '/LISTEN/ { print $(NF-1) }')
|
||||
((++i))
|
||||
sleep 0.1
|
||||
if ! kill -0 $server_pid >& /dev/null; then
|
||||
echo "No server (pid $server_pid)"
|
||||
break
|
||||
fi
|
||||
done
|
||||
if [[ -z $server_port ]]; then
|
||||
echo "Cannot wait for LISTEN socket" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# wait for the server to start accepting tcp connections (max 30 seconds)
|
||||
i=0 retries=300
|
||||
while ! $CLICKHOUSE_CLIENT_BINARY --host 127.1 --port "$server_port" --format Null -q 'select 1' 2>/dev/null && [[ $i -lt $retries ]]; do
|
||||
sleep 0.1
|
||||
if ! kill -0 $server_pid >& /dev/null; then
|
||||
echo "No server (pid $server_pid)"
|
||||
break
|
||||
fi
|
||||
done
|
||||
if ! $CLICKHOUSE_CLIENT_BINARY --host 127.1 --port "$server_port" --format Null -q 'select 1'; then
|
||||
echo "Cannot wait until server will start accepting connections on <tcp_port>" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# it is not mandatory to use existing table since it fails earlier, hence just a placeholder.
|
||||
# this is format of INSERT SELECT, that pass these settings exactly for INSERT query not the SELECT
|
||||
if $CLICKHOUSE_CLIENT_BINARY --host 127.1 --port "$server_port" --format Null --send_logs_level=warning --max_memory_usage=1 --max_untracked_memory=1 -q 'insert into placeholder_table_name select * from numbers_mt(65535)' >& /dev/null; then
|
||||
echo "INSERT SELECT should fail" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# no sleep, since flushing to stderr should not be buffered.
|
||||
if ! grep -E -q 'Cannot add message to the log: Code: 60.*placeholder_table_name' clickhouse-server.stderr; then
|
||||
echo "Adding message to the log should fail" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# check that server is still alive
|
||||
$CLICKHOUSE_CLIENT_BINARY --host 127.1 --port "$server_port" --format Null -q 'SELECT 1'
|
||||
|
||||
# send TERM and save the error code to ensure that it is 0 (EXIT_SUCCESS)
|
||||
kill $server_pid
|
||||
wait $server_pid
|
||||
return_code=$?
|
||||
|
||||
trap '' EXIT
|
||||
if [ $return_code != 0 ]; then
|
||||
cat clickhouse-server.log
|
||||
cat clickhouse-server.stderr
|
||||
fi
|
||||
rm -f clickhouse-server.log
|
||||
rm -f clickhouse-server.stderr
|
||||
|
||||
exit $return_code
|
@ -361,6 +361,8 @@ int decompressFiles(int input_fd, char * path, char * name, bool & have_compress
|
||||
|
||||
#endif
|
||||
|
||||
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
|
||||
|
||||
uint32_t getInode(const char * self)
|
||||
{
|
||||
std::ifstream maps("/proc/self/maps");
|
||||
@ -386,6 +388,8 @@ uint32_t getInode(const char * self)
|
||||
return 0;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
int main(int/* argc*/, char* argv[])
|
||||
{
|
||||
char self[4096] = {0};
|
||||
@ -409,6 +413,7 @@ int main(int/* argc*/, char* argv[])
|
||||
else
|
||||
name = file_path;
|
||||
|
||||
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
|
||||
/// get inode of this executable
|
||||
uint32_t inode = getInode(self);
|
||||
if (inode == 0)
|
||||
@ -460,6 +465,7 @@ int main(int/* argc*/, char* argv[])
|
||||
printf("No target executable - decompression only was performed.\n");
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
int input_fd = open(self, O_RDONLY);
|
||||
if (input_fd == -1)
|
||||
@ -522,19 +528,21 @@ int main(int/* argc*/, char* argv[])
|
||||
|
||||
if (has_exec)
|
||||
{
|
||||
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
|
||||
/// write one byte to the lock in case other copies of compressed are running to indicate that
|
||||
/// execution should be performed
|
||||
write(lock, "1", 1);
|
||||
|
||||
#endif
|
||||
execv(self, argv);
|
||||
|
||||
/// This part of code will be reached only if error happened
|
||||
perror("execv");
|
||||
return 1;
|
||||
}
|
||||
|
||||
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
|
||||
/// since inodes can be reused - it's a precaution if lock file already exists and have size of 1
|
||||
ftruncate(lock, 0);
|
||||
#endif
|
||||
|
||||
printf("No target executable - decompression only was performed.\n");
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user