From 18c2ac2e1ed4fa5519f5a37771da1d2cc6134cdf Mon Sep 17 00:00:00 2001
From: Artur <613623@mail.ru>
Date: Wed, 13 Oct 2021 15:43:52 +0000
Subject: [PATCH 01/39] add parallel reading from infile in client
---
src/Client/ClientBase.cpp | 41 ++++++++++++++++---
src/Client/ClientBase.h | 3 ++
...048_parallel_reading_from_infile.reference | 1 +
.../02048_parallel_reading_from_infile.sh | 23 +++++++++++
4 files changed, 62 insertions(+), 6 deletions(-)
create mode 100644 tests/queries/0_stateless/02048_parallel_reading_from_infile.reference
create mode 100755 tests/queries/0_stateless/02048_parallel_reading_from_infile.sh
diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp
index cde5a5f9977..cbe09f62ae9 100644
--- a/src/Client/ClientBase.cpp
+++ b/src/Client/ClientBase.cpp
@@ -9,6 +9,7 @@
#include
#include
#include
+#include "Common/getNumberOfPhysicalCPUCores.h"
#if !defined(ARCADIA_BUILD)
# include
@@ -773,7 +774,8 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
/// Get name of this file (path to file)
const auto & in_file_node = parsed_insert_query->infile->as();
const auto in_file = in_file_node.value.safeGet();
-
+ /// Get name of table
+ const auto table_name = parsed_insert_query->table_id.getTableName();
std::string compression_method;
/// Compression method can be specified in query
if (parsed_insert_query->compression)
@@ -782,13 +784,35 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
compression_method = compression_method_node.value.safeGet();
}
- /// Otherwise, it will be detected from file name automatically (by chooseCompressionMethod)
- /// Buffer for reading from file is created and wrapped with appropriate compression method
- auto in_buffer = wrapReadBufferWithCompressionMethod(std::make_unique(in_file), chooseCompressionMethod(in_file, compression_method));
-
+ /// Create temporary storage file, to support globs and parallel reading
+ StorageFile::CommonArguments args{
+ WithContext(global_context),
+ StorageID("_from_infile", table_name),
+ parsed_insert_query->format,
+ std::nullopt /*format settings*/,
+ compression_method,
+ columns_description,
+ ConstraintsDescription{},
+ String{},
+ };
+ StoragePtr storage = StorageFile::create(in_file, global_context->getUserFilesPath(), args);
+ storage->startup();
+ SelectQueryInfo query_info;
+
try
{
- sendDataFrom(*in_buffer, sample, columns_description, parsed_query);
+ sendDataFromPipe(
+ storage->read(
+ sample.getNames(),
+ storage->getInMemoryMetadataPtr(),
+ query_info,
+ global_context,
+ {},
+ DEFAULT_BLOCK_SIZE,
+ getNumberOfPhysicalCPUCores()
+ ),
+ parsed_query
+ );
}
catch (Exception & e)
{
@@ -862,6 +886,11 @@ void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDes
});
}
+ sendDataFromPipe(std::move(pipe), parsed_query);
+}
+
+void ClientBase::sendDataFromPipe(Pipe&& pipe, ASTPtr parsed_query)
+{
QueryPipeline pipeline(std::move(pipe));
PullingAsyncPipelineExecutor executor(pipeline);
diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h
index bf9e8fdfe47..f78e5320422 100644
--- a/src/Client/ClientBase.h
+++ b/src/Client/ClientBase.h
@@ -9,6 +9,8 @@
#include
#include
#include
+#include
+#include
namespace po = boost::program_options;
@@ -113,6 +115,7 @@ private:
void sendData(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query);
void sendDataFrom(ReadBuffer & buf, Block & sample,
const ColumnsDescription & columns_description, ASTPtr parsed_query);
+ void sendDataFromPipe(Pipe&& pipe, ASTPtr parsed_query);
void sendExternalTables(ASTPtr parsed_query);
void initBlockOutputStream(const Block & block, ASTPtr parsed_query);
diff --git a/tests/queries/0_stateless/02048_parallel_reading_from_infile.reference b/tests/queries/0_stateless/02048_parallel_reading_from_infile.reference
new file mode 100644
index 00000000000..00750edc07d
--- /dev/null
+++ b/tests/queries/0_stateless/02048_parallel_reading_from_infile.reference
@@ -0,0 +1 @@
+3
diff --git a/tests/queries/0_stateless/02048_parallel_reading_from_infile.sh b/tests/queries/0_stateless/02048_parallel_reading_from_infile.sh
new file mode 100755
index 00000000000..ee408fc2abb
--- /dev/null
+++ b/tests/queries/0_stateless/02048_parallel_reading_from_infile.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+set -e
+
+# Drop fixtures left by an earlier run; `rm -f` ignores missing files, so `set -e` cannot abort the test here.
+rm -f "${CLICKHOUSE_TMP}"/test_infile_parallel.gz
+rm -f "${CLICKHOUSE_TMP}"/test_infile_parallel
+rm -f "${CLICKHOUSE_TMP}"/test_infile_parallel_1 "${CLICKHOUSE_TMP}"/test_infile_parallel_2
+
+echo "Hello" > "${CLICKHOUSE_TMP}"/test_infile_parallel
+echo "Hello" > "${CLICKHOUSE_TMP}"/test_infile_parallel_1
+echo "Hello" > "${CLICKHOUSE_TMP}"/test_infile_parallel_2
+
+gzip "${CLICKHOUSE_TMP}"/test_infile_parallel
+
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test_infile_parallel;"
+${CLICKHOUSE_CLIENT} --query "CREATE TABLE test_infile_parallel (word String) ENGINE=Memory();"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO test_infile_parallel FROM INFILE '${CLICKHOUSE_TMP}/test_infile_parallel*' FORMAT CSV;"
+${CLICKHOUSE_CLIENT} --query "SELECT count() FROM test_infile_parallel;"
From 2384108ea5084caf47199648a25874a1d4ab381d Mon Sep 17 00:00:00 2001
From: Filatenkov Artur <58165623+FArthur-cmd@users.noreply.github.com>
Date: Fri, 22 Oct 2021 15:32:24 +0300
Subject: [PATCH 02/39] remove trailing whitespaces
---
src/Client/ClientBase.cpp | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp
index 183236cee6b..4c47142dd93 100644
--- a/src/Client/ClientBase.cpp
+++ b/src/Client/ClientBase.cpp
@@ -871,7 +871,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
StoragePtr storage = StorageFile::create(in_file, global_context->getUserFilesPath(), args);
storage->startup();
SelectQueryInfo query_info;
-
+
try
{
sendDataFromPipe(
@@ -883,7 +883,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
{},
DEFAULT_BLOCK_SIZE,
getNumberOfPhysicalCPUCores()
- ),
+ ),
parsed_query
);
}
From 20c5e9d3217926911a0891ec9e9d277a0196db4a Mon Sep 17 00:00:00 2001
From: Artur <613623@mail.ru>
Date: Tue, 26 Oct 2021 15:16:58 +0000
Subject: [PATCH 03/39] refactoring, adding tests for local
---
src/Client/ClientBase.cpp | 22 ++++++++++++++-----
src/Client/ClientBase.h | 2 +-
...048_parallel_reading_from_infile.reference | 1 +
.../02048_parallel_reading_from_infile.sh | 7 ++++++
4 files changed, 25 insertions(+), 7 deletions(-)
diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp
index 4c47142dd93..af9e1ae9889 100644
--- a/src/Client/ClientBase.cpp
+++ b/src/Client/ClientBase.cpp
@@ -9,11 +9,14 @@
#include
#include
#include
+#include "Common/Exception.h"
#include "Common/getNumberOfPhysicalCPUCores.h"
+#include "Common/tests/gtest_global_context.h"
#include "Columns/ColumnString.h"
#include "Columns/ColumnsNumber.h"
#include "Core/Block.h"
#include "Core/Protocol.h"
+#include "Formats/FormatFactory.h"
#if !defined(ARCADIA_BUILD)
# include
@@ -826,6 +829,13 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query)
{
+ /// Get columns description from variable or (if it was empty) create it from sample.
+ auto columns_description_for_query = columns_description.empty() ? ColumnsDescription(sample.getNamesAndTypesList()) : columns_description;
+ if (columns_description_for_query.empty())
+ {
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "Column description is empty and it can't be built from sample from table. Cannot execute query.");
+ }
+
/// If INSERT data must be sent.
auto * parsed_insert_query = parsed_query->as();
if (!parsed_insert_query)
@@ -860,11 +870,11 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
/// Create temporary storage file, to support globs and parallel reading
StorageFile::CommonArguments args{
WithContext(global_context),
- StorageID("_from_infile", table_name),
+ parsed_insert_query->table_id,
parsed_insert_query->format,
- std::nullopt /*format settings*/,
+ getFormatSettings(global_context),
compression_method,
- columns_description,
+ columns_description_for_query,
ConstraintsDescription{},
String{},
};
@@ -881,7 +891,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
query_info,
global_context,
{},
- DEFAULT_BLOCK_SIZE,
+ global_context->getSettingsRef().max_block_size,
getNumberOfPhysicalCPUCores()
),
parsed_query
@@ -899,7 +909,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
ReadBufferFromMemory data_in(parsed_insert_query->data, parsed_insert_query->end - parsed_insert_query->data);
try
{
- sendDataFrom(data_in, sample, columns_description, parsed_query);
+ sendDataFrom(data_in, sample, columns_description_for_query, parsed_query);
}
catch (Exception & e)
{
@@ -924,7 +934,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
/// Send data read from stdin.
try
{
- sendDataFrom(std_in, sample, columns_description, parsed_query);
+ sendDataFrom(std_in, sample, columns_description_for_query, parsed_query);
}
catch (Exception & e)
{
diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h
index 3fdeac6b943..7f4373dddb6 100644
--- a/src/Client/ClientBase.h
+++ b/src/Client/ClientBase.h
@@ -122,7 +122,7 @@ private:
void sendData(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query);
void sendDataFrom(ReadBuffer & buf, Block & sample,
const ColumnsDescription & columns_description, ASTPtr parsed_query);
- void sendDataFromPipe(Pipe&& pipe, ASTPtr parsed_query);
+ void sendDataFromPipe(Pipe && pipe, ASTPtr parsed_query);
void sendExternalTables(ASTPtr parsed_query);
void initBlockOutputStream(const Block & block, ASTPtr parsed_query);
diff --git a/tests/queries/0_stateless/02048_parallel_reading_from_infile.reference b/tests/queries/0_stateless/02048_parallel_reading_from_infile.reference
index 00750edc07d..a5c8806279f 100644
--- a/tests/queries/0_stateless/02048_parallel_reading_from_infile.reference
+++ b/tests/queries/0_stateless/02048_parallel_reading_from_infile.reference
@@ -1 +1,2 @@
3
+3
diff --git a/tests/queries/0_stateless/02048_parallel_reading_from_infile.sh b/tests/queries/0_stateless/02048_parallel_reading_from_infile.sh
index ee408fc2abb..d300ebe8b99 100755
--- a/tests/queries/0_stateless/02048_parallel_reading_from_infile.sh
+++ b/tests/queries/0_stateless/02048_parallel_reading_from_infile.sh
@@ -21,3 +21,10 @@ ${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test_infile_parallel;"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE test_infile_parallel (word String) ENGINE=Memory();"
${CLICKHOUSE_CLIENT} --query "INSERT INTO test_infile_parallel FROM INFILE '${CLICKHOUSE_TMP}/test_infile_parallel*' FORMAT CSV;"
${CLICKHOUSE_CLIENT} --query "SELECT count() FROM test_infile_parallel;"
+
+${CLICKHOUSE_LOCAL} --multiquery <
Date: Tue, 26 Oct 2021 22:05:34 +0300
Subject: [PATCH 04/39] hsts-max-age
---
.../server-configuration-parameters/settings.md | 2 +-
.../server-configuration-parameters/settings.md | 10 ++++++++++
2 files changed, 11 insertions(+), 1 deletion(-)
diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md
index cdf49678570..963393187b2 100644
--- a/docs/en/operations/server-configuration-parameters/settings.md
+++ b/docs/en/operations/server-configuration-parameters/settings.md
@@ -366,7 +366,7 @@ Opens `https://tabix.io/` when accessing `http://localhost: http_port`.