Merge branch 'master' into master

This commit is contained in:
mergify[bot] 2022-01-29 11:24:05 +00:00 committed by GitHub
commit ce8373c7a1
18 changed files with 206 additions and 83 deletions

View File

@ -3,14 +3,14 @@ toc_priority: 53
toc_title: USE
---
# USE Statement {#use}
``` sql
USE db
```
Lets you set the current database for the session.
The current database is used for searching for tables if the database is not explicitly defined in the query with a dot before the table name.
This query can't be made when using the HTTP protocol, since there is no concept of a session.
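As a minimal illustration (the `db`, `other_db`, and `visits` names here are hypothetical), an unqualified table name resolves against the session database set by `USE`, while a dot-qualified name bypasses it:
``` sql
USE db;
-- Unqualified: resolves to db.visits via the session's current database.
SELECT count() FROM visits;
-- Dot-qualified: reads other_db.visits regardless of the current database.
SELECT count() FROM other_db.visits;
```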

View File

@ -1 +0,0 @@
../../../en/sql-reference/statements/use.md

View File

@ -0,0 +1,16 @@
---
toc_priority: 53
toc_title: USE
---
# USE Statement {#use}
``` sql
USE db
```
Lets you set the current database for the session.
The current database is used for searching for tables if the database is not explicitly defined in the query with a dot before the table name.
This query can't be made when using the HTTP protocol, since there is no concept of a session.

View File

@ -5,7 +5,6 @@
#include <Common/CurrentThread.h>
#include <base/logger_useful.h>
#include <chrono>
#include <base/scope_guard.h>
namespace DB
@ -246,7 +245,6 @@ void BackgroundSchedulePool::threadFunction()
setThreadName(thread_name.c_str());
attachToThreadGroup();
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
while (!shutdown)
{
@ -273,7 +271,6 @@ void BackgroundSchedulePool::delayExecutionThreadFunction()
setThreadName((thread_name + "/D").c_str());
attachToThreadGroup();
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
while (!shutdown)
{

View File

@ -68,7 +68,7 @@ void ExecuteScalarSubqueriesMatcher::visit(ASTPtr & ast, Data & data)
static bool worthConvertingToLiteral(const Block & scalar)
{
const auto * scalar_type_name = scalar.safeGetByPosition(0).type->getFamilyName();
std::set<String> useless_literal_types = {"Array", "Tuple", "AggregateFunction", "Function", "Set", "LowCardinality"};
static const std::set<std::string_view> useless_literal_types = {"Array", "Tuple", "AggregateFunction", "Function", "Set", "LowCardinality"};
return !useless_literal_types.count(scalar_type_name);
}

View File

@ -966,14 +966,14 @@ private:
/// Does the loading, possibly in the separate thread.
void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async, ThreadGroupStatusPtr thread_group = {})
{
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
LOG_TRACE(log, "Start loading object '{}'", name);
try
{

View File

@ -4,7 +4,6 @@
#include <Poco/Event.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <base/scope_guard_safe.h>
#include <iostream>
namespace DB
@ -40,11 +39,6 @@ static void threadFunction(CompletedPipelineExecutor::Data & data, ThreadGroupSt
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
data.executor->execute(num_threads);
}
catch (...)

View File

@ -301,11 +301,6 @@ void PipelineExecutor::executeImpl(size_t num_threads)
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
try
{
executeSingleThread(thread_num);

View File

@ -4,9 +4,7 @@
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Sources/NullSource.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Common/setThreadName.h>
#include <base/scope_guard_safe.h>
namespace DB
{
@ -77,11 +75,6 @@ static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGrou
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
data.executor->execute(num_threads);
}
catch (...)

View File

@ -2,11 +2,8 @@
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/ISource.h>
#include <QueryPipeline/QueryPipeline.h>
#include <iostream>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <base/scope_guard_safe.h>
#include <Poco/Event.h>
namespace DB
@ -107,11 +104,6 @@ static void threadFunction(PushingAsyncPipelineExecutor::Data & data, ThreadGrou
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
data.executor->execute(num_threads);
}
catch (...)

View File

@ -15,21 +15,22 @@ namespace DB
class NativeInputFormat final : public IInputFormat
{
public:
NativeInputFormat(ReadBuffer & buf, const Block & header)
: IInputFormat(header, buf)
, reader(buf, header, 0) {}
NativeInputFormat(ReadBuffer & buf, const Block & header_)
: IInputFormat(header_, buf)
, reader(std::make_unique<NativeReader>(buf, header_, 0))
, header(header_) {}
String getName() const override { return "Native"; }
void resetParser() override
{
IInputFormat::resetParser();
reader.resetParser();
reader->resetParser();
}
Chunk generate() override
{
auto block = reader.read();
auto block = reader->read();
if (!block)
return {};
@ -40,8 +41,15 @@ public:
return Chunk(block.getColumns(), num_rows);
}
void setReadBuffer(ReadBuffer & in_) override
{
reader = std::make_unique<NativeReader>(in_, header, 0);
IInputFormat::setReadBuffer(in_);
}
private:
NativeReader reader;
std::unique_ptr<NativeReader> reader;
Block header;
};
class NativeOutputFormat final : public IOutputFormat

View File

@ -2,17 +2,12 @@
#include <IO/ReadHelpers.h>
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <base/scope_guard_safe.h>
namespace DB
{
void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
@ -59,12 +54,8 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr
void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number)
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
CurrentThread::attachToIfDetached(thread_group);
const auto parser_unit_number = current_ticket_number % processing_units.size();
auto & unit = processing_units[parser_unit_number];

View File

@ -67,7 +67,6 @@
#include <boost/algorithm/string/replace.hpp>
#include <base/insertAtEnd.h>
#include <base/scope_guard_safe.h>
#include <algorithm>
#include <iomanip>
@ -1590,12 +1589,8 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
{
pool.scheduleOrThrowOnError([&, thread_group = CurrentThread::getGroup()]
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
CurrentThread::attachToIfDetached(thread_group);
LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
part->remove();

View File

@ -1,5 +1,4 @@
#include <boost/rational.hpp> /// For calculations related to sampling coefficients.
#include <base/scope_guard_safe.h>
#include <optional>
#include <unordered_set>
@ -988,9 +987,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()]
{
SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachQueryIfNotDetached(););
if (thread_group)
CurrentThread::attachTo(thread_group);
CurrentThread::attachToIfDetached(thread_group);
process_part(part_index);
});

View File

@ -16,22 +16,54 @@ $CLICKHOUSE_CLIENT -nm -q "create database ordinary_$CLICKHOUSE_DATABASE engine=
$CLICKHOUSE_CLIENT -nm -q """
use ordinary_$CLICKHOUSE_DATABASE;
drop table if exists data_01810;
create table data_01810 (key Int) Engine=MergeTree() order by key partition by key settings max_part_removal_threads=10, concurrent_part_removal_threshold=49;
insert into data_01810 select * from numbers(50);
create table data_01810 (key Int)
Engine=MergeTree()
order by key
partition by key%100
settings max_part_removal_threads=10, concurrent_part_removal_threshold=99, min_bytes_for_wide_part=0;
insert into data_01810 select * from numbers(100);
drop table data_01810 settings log_queries=1;
system flush logs;
select throwIf(length(thread_ids)<50) from system.query_log where event_date >= yesterday() and current_database = currentDatabase() and query = 'drop table data_01810 settings log_queries=1;' and type = 'QueryFinish' format Null;
-- sometimes the same thread can be used to remove part, due to ThreadPool,
-- hence we cannot compare strictly.
select throwIf(not(length(thread_ids) between 6 and 11))
from system.query_log
where
event_date >= yesterday() and
current_database = currentDatabase() and
query = 'drop table data_01810 settings log_queries=1;' and
type = 'QueryFinish'
format Null;
"""
# ReplicatedMergeTree
$CLICKHOUSE_CLIENT -nm -q """
use ordinary_$CLICKHOUSE_DATABASE;
drop table if exists rep_data_01810;
create table rep_data_01810 (key Int) Engine=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rep_data_01810', '1') order by key partition by key settings max_part_removal_threads=10, concurrent_part_removal_threshold=49;
insert into rep_data_01810 select * from numbers(50);
create table rep_data_01810 (key Int)
Engine=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rep_data_01810', '1')
order by key
partition by key%100
settings max_part_removal_threads=10, concurrent_part_removal_threshold=99, min_bytes_for_wide_part=0;
insert into rep_data_01810 select * from numbers(100);
drop table rep_data_01810 settings log_queries=1;
system flush logs;
select throwIf(length(thread_ids)<50) from system.query_log where event_date >= yesterday() and current_database = currentDatabase() and query = 'drop table rep_data_01810 settings log_queries=1;' and type = 'QueryFinish' format Null;
-- sometimes the same thread can be used to remove part, due to ThreadPool,
-- hence we cannot compare strictly.
select throwIf(not(length(thread_ids) between 6 and 11))
from system.query_log
where
event_date >= yesterday() and
current_database = currentDatabase() and
query = 'drop table rep_data_01810 settings log_queries=1;' and
type = 'QueryFinish'
format Null;
"""
$CLICKHOUSE_CLIENT -nm -q "drop database ordinary_$CLICKHOUSE_DATABASE"

View File

@ -0,0 +1,50 @@
#!/usr/bin/env python3
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
CLICKHOUSE_TMP = os.environ.get('CLICKHOUSE_TMP')
from pure_http_client import ClickHouseClient
client = ClickHouseClient()
def run_test(data_format, gen_data_template, settings):
print(data_format)
client.query("TRUNCATE TABLE t_async_insert")
expected = client.query(gen_data_template.format("TSV")).strip()
data = client.query(gen_data_template.format(data_format), settings=settings, binary_result=True)
insert_query = "INSERT INTO t_async_insert FORMAT {}".format(data_format)
client.query_with_data(insert_query, data, settings=settings)
result = client.query("SELECT * FROM t_async_insert FORMAT TSV").strip()
if result != expected:
print("Failed for format {}.\nExpected:\n{}\nGot:\n{}\n".format(data_format, expected, result))
exit(1)
formats = client.query("SELECT name FROM system.formats WHERE is_input AND is_output \
AND name NOT IN ('CapnProto', 'RawBLOB', 'Template', 'ProtobufSingle', 'LineAsString', 'Protobuf') ORDER BY name").strip().split('\n')
# Generic formats
client.query("DROP TABLE IF EXISTS t_async_insert")
client.query("CREATE TABLE t_async_insert (id UInt64, s String, arr Array(UInt64)) ENGINE = Memory")
gen_data_query = "SELECT number AS id, toString(number) AS s, range(number) AS arr FROM numbers(10) FORMAT {}"
for data_format in formats:
run_test(data_format, gen_data_query, settings={"async_insert": 1, "wait_for_async_insert": 1})
# LineAsString
client.query("DROP TABLE IF EXISTS t_async_insert")
client.query("CREATE TABLE t_async_insert (s String) ENGINE = Memory")
gen_data_query = "SELECT toString(number) AS s FROM numbers(10) FORMAT {}"
run_test('LineAsString', gen_data_query, settings={"async_insert": 1, "wait_for_async_insert": 1})
# TODO: add CapnProto and Protobuf
print("OK")

View File

@ -0,0 +1,40 @@
Arrow
ArrowStream
Avro
CSV
CSVWithNames
CSVWithNamesAndTypes
CustomSeparated
CustomSeparatedWithNames
CustomSeparatedWithNamesAndTypes
JSONCompactEachRow
JSONCompactEachRowWithNames
JSONCompactEachRowWithNamesAndTypes
JSONCompactStringsEachRow
JSONCompactStringsEachRowWithNames
JSONCompactStringsEachRowWithNamesAndTypes
JSONEachRow
JSONStringsEachRow
MsgPack
Native
ORC
Parquet
RowBinary
RowBinaryWithNames
RowBinaryWithNamesAndTypes
TSKV
TSV
TSVRaw
TSVRawWithNames
TSVRawWithNamesAndTypes
TSVWithNames
TSVWithNamesAndTypes
TabSeparated
TabSeparatedRaw
TabSeparatedRawWithNames
TabSeparatedRawWithNamesAndTypes
TabSeparatedWithNames
TabSeparatedWithNamesAndTypes
Values
LineAsString
OK

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Tags: no-fasttest, long
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# We should have correct env vars from shell_config.sh to run this test
python3 "$CURDIR"/02187_async_inserts_all_formats.python

View File

@ -14,22 +14,23 @@ class ClickHouseClient:
def __init__(self, host = CLICKHOUSE_SERVER_URL_STR):
self.host = host
def query(self, query, connection_timeout = 1500):
def query(self, query, connection_timeout=1500, settings=dict(), binary_result=False):
NUMBER_OF_TRIES = 30
DELAY = 10
params = {
'timeout_before_checking_execution_speed': 120,
'max_execution_time': 6000,
'database': CLICKHOUSE_DATABASE,
}
# Add extra settings to params
params = {**params, **settings}
for i in range(NUMBER_OF_TRIES):
r = requests.post(
self.host,
params = {
'timeout_before_checking_execution_speed': 120,
'max_execution_time': 6000,
'database': CLICKHOUSE_DATABASE
},
timeout = connection_timeout,
data = query)
r = requests.post(self.host, params=params, timeout=connection_timeout, data=query)
if r.status_code == 200:
return r.text
return r.content if binary_result else r.text
else:
print('ATTENTION: try #%d failed' % i)
if i != (NUMBER_OF_TRIES-1):
@ -44,9 +45,22 @@ class ClickHouseClient:
df = pd.read_csv(io.StringIO(data), sep = '\t')
return df
def query_with_data(self, query, content):
content = content.encode('utf-8')
r = requests.post(self.host, data=content)
def query_with_data(self, query, data, connection_timeout=1500, settings=dict()):
params = {
'query': query,
'timeout_before_checking_execution_speed': 120,
'max_execution_time': 6000,
'database': CLICKHOUSE_DATABASE,
}
headers = {
"Content-Type": "application/binary"
}
# Add extra settings to params
params = {**params, **settings}
r = requests.post(self.host, params=params, timeout=connection_timeout, data=data, headers=headers)
result = r.text
if r.status_code == 200:
return result