diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h
index 6d530339a2d..e3160b57eaa 100644
--- a/dbms/src/Core/Settings.h
+++ b/dbms/src/Core/Settings.h
@@ -68,6 +68,7 @@ struct Settings : public SettingsCollection<Settings>
     M(SettingUInt64, idle_connection_timeout, 3600, "Close idle TCP connections after specified number of seconds.") \
     M(SettingUInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.") \
     M(SettingUInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.") \
+    M(SettingUInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.") \
     M(SettingBool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.") \
     M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.") \
     M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.") \
diff --git a/dbms/src/Storages/StorageS3.cpp b/dbms/src/Storages/StorageS3.cpp
index c135a16775a..ae774030c41 100644
--- a/dbms/src/Storages/StorageS3.cpp
+++ b/dbms/src/Storages/StorageS3.cpp
@@ -79,13 +79,13 @@ namespace
     public:
         StorageS3BlockOutputStream(const Poco::URI & uri,
             const String & format,
+            UInt64 min_upload_part_size,
             const Block & sample_block_,
             const Context & context,
             const ConnectionTimeouts & timeouts)
             : sample_block(sample_block_)
         {
-            auto minimum_upload_part_size = context.getConfigRef().getUInt64("s3.minimum_upload_part_size", 512 * 1024 * 1024);
-            write_buf = std::make_unique<WriteBufferFromS3>(uri, minimum_upload_part_size, timeouts);
+            write_buf = std::make_unique<WriteBufferFromS3>(uri, min_upload_part_size, timeouts);
             writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
         }

@@ -124,6 +124,7 @@ StorageS3::StorageS3(
     const std::string & database_name_,
     const std::string & table_name_,
     const String & format_name_,
+    UInt64 min_upload_part_size_,
     const ColumnsDescription & columns_,
     const ConstraintsDescription & constraints_,
     Context & context_)
@@ -133,6 +134,7 @@ StorageS3::StorageS3(
     , format_name(format_name_)
     , database_name(database_name_)
     , table_name(table_name_)
+    , min_upload_part_size(min_upload_part_size_)
 {
     setColumns(columns_);
     setConstraints(constraints_);
@@ -171,7 +173,7 @@ void StorageS3::rename(const String & /*new_path_to_db*/, const String & new_dat
 BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const Context & /*context*/)
 {
     return std::make_shared<StorageS3BlockOutputStream>(
-        uri, format_name, getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global));
+        uri, format_name, min_upload_part_size, getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global));
 }

 void registerStorageS3(StorageFactory & factory)
@@ -193,7 +195,9 @@ void registerStorageS3(StorageFactory & factory)

         String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();

-        return StorageS3::create(uri, args.database_name, args.table_name, format_name, args.columns, args.constraints, args.context);
+        UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size;
+
+        return StorageS3::create(uri, args.database_name, args.table_name, format_name, min_upload_part_size, args.columns, args.constraints, args.context);
     });
 }
 }
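Taken together, the Settings.h and StorageS3.cpp hunks turn the upload part size from the server config key s3.minimum_upload_part_size into a regular query-level setting. A minimal sketch of the user-facing effect follows, assuming the usual clickhouse-client convention that any query setting can be passed as a command-line flag; the endpoint, bucket, and column schema are invented for illustration and are not part of this patch:

```python
# Hedged sketch, not from the patch: host, bucket and columns are made up.
# With s3_min_upload_part_size now a query setting, a one-off override can
# travel with the INSERT itself instead of requiring a server config edit.
import subprocess

query = ("INSERT INTO TABLE FUNCTION "
         "s3('http://example-minio:9001/bucket/data.csv', 'CSV', "
         "'a UInt32, b UInt32, c UInt32') FORMAT CSV")

subprocess.run(
    ["clickhouse-client",
     "--s3_min_upload_part_size=1000000",  # per-query override, no config edit
     "--query", query],
    input=b"1,2,3\n",
    check=True)
```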
diff --git a/dbms/src/Storages/StorageS3.h b/dbms/src/Storages/StorageS3.h
index 05a69f439d0..65cd65458c6 100644
--- a/dbms/src/Storages/StorageS3.h
+++ b/dbms/src/Storages/StorageS3.h
@@ -21,6 +21,7 @@ public:
         const std::string & database_name_,
         const std::string & table_name_,
         const String & format_name_,
+        UInt64 min_upload_part_size_,
         const ColumnsDescription & columns_,
         const ConstraintsDescription & constraints_,
         Context & context_);
@@ -59,6 +60,7 @@ private:
     String format_name;
     String database_name;
     String table_name;
+    UInt64 min_upload_part_size;
 };

 }
diff --git a/dbms/src/TableFunctions/TableFunctionS3.cpp b/dbms/src/TableFunctions/TableFunctionS3.cpp
index 31a66a91af2..849836b0498 100644
--- a/dbms/src/TableFunctions/TableFunctionS3.cpp
+++ b/dbms/src/TableFunctions/TableFunctionS3.cpp
@@ -10,7 +10,8 @@ StoragePtr TableFunctionS3::getStorage(
     const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name) const
 {
     Poco::URI uri(source);
-    return StorageS3::create(uri, getDatabaseName(), table_name, format, columns, ConstraintsDescription{}, global_context);
+    UInt64 min_upload_part_size = global_context.getSettingsRef().s3_min_upload_part_size;
+    return StorageS3::create(uri, getDatabaseName(), table_name, format, min_upload_part_size, columns, ConstraintsDescription{}, global_context);
 }

 void registerTableFunctionS3(TableFunctionFactory & factory)
diff --git a/dbms/tests/integration/test_storage_s3/configs/min_chunk_size.xml b/dbms/tests/integration/test_storage_s3/configs/min_chunk_size.xml
deleted file mode 100644
index 2a9c465a7b8..00000000000
--- a/dbms/tests/integration/test_storage_s3/configs/min_chunk_size.xml
+++ /dev/null
@@ -1,5 +0,0 @@
-<yandex>
-    <s3>
-        <minimum_upload_part_size>1000000</minimum_upload_part_size>
-    </s3>
-</yandex>
diff --git a/dbms/tests/integration/test_storage_s3/test.py b/dbms/tests/integration/test_storage_s3/test.py
index 84f6bf72f60..c5e7d2a7cf1 100644
--- a/dbms/tests/integration/test_storage_s3/test.py
+++ b/dbms/tests/integration/test_storage_s3/test.py
@@ -34,7 +34,7 @@ def put_communication_data(started_cluster, body):
 def started_cluster():
     try:
         cluster = ClickHouseCluster(__file__)
-        instance = cluster.add_instance("dummy", config_dir="configs", main_configs=["configs/min_chunk_size.xml"])
+        instance = cluster.add_instance("dummy")
         cluster.start()

         cluster.communication_port = 10000
@@ -64,9 +64,9 @@ def started_cluster():
         cluster.shutdown()


-def run_query(instance, query, stdin=None):
+def run_query(instance, query, stdin=None, settings=None):
     logging.info("Running query '{}'...".format(query))
-    result = instance.query(query, stdin=stdin)
+    result = instance.query(query, stdin=stdin, settings=settings)
     logging.info("Query finished")
     return result

@@ -151,7 +151,7 @@ def test_multipart_put(started_cluster):
     long_data = [[i, i+1, i+2] for i in range(100000)]
     long_values = "".join([ "{},{},{}\n".format(x,y,z) for x, y, z in long_data ])
     put_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') format CSV".format(started_cluster.mock_host, started_cluster.multipart_preserving_data_port, started_cluster.bucket, format)
-    run_query(instance, put_query, stdin=long_values)
+    run_query(instance, put_query, stdin=long_values, settings={'s3_min_upload_part_size': 1000000})
     data = get_communication_data(started_cluster)
     assert "multipart_received_data" in data
     received_data = data["multipart_received_data"]
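With configs/min_chunk_size.xml gone, the test no longer needs a specially configured server instance: the part size travels with the query. A short sketch of the two call shapes the updated run_query helper now supports, reusing the names from test.py above:

```python
# Both forms are valid with the updated helper from test.py above.
run_query(instance, "SELECT 1")  # settings defaults to None, as before

run_query(instance, put_query, stdin=long_values,
          settings={'s3_min_upload_part_size': 1000000})  # per-query override
```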
diff --git a/dbms/tests/integration/test_storage_s3/test_server.py b/dbms/tests/integration/test_storage_s3/test_server.py
index 8896af9c23e..08a1904d1f2 100644
--- a/dbms/tests/integration/test_storage_s3/test_server.py
+++ b/dbms/tests/integration/test_storage_s3/test_server.py
@@ -33,6 +33,7 @@ logging.getLogger().addHandler(logging.StreamHandler())
 communication_port = int(sys.argv[1])
 bucket = sys.argv[2]

+
 def GetFreeTCPPortsAndIP(n):
     result = []
     sockets = []
@@ -53,6 +54,7 @@ def GetFreeTCPPortsAndIP(n):
     redirecting_preserving_data_port
 ), localhost = GetFreeTCPPortsAndIP(5)

+
 data = {
     "redirecting_to_http_port": redirecting_to_http_port,
     "preserving_data_port": preserving_data_port,
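For intuition about what the 1000000-byte value in test_multipart_put controls, here is an illustrative chunker. This is not ClickHouse's WriteBufferFromS3, just a minimal sketch of how a minimum part size carves an upload into multipart parts:

```python
# Illustrative only -- the real buffering lives in WriteBufferFromS3.
def split_into_parts(payload, min_part_size):
    """Cut payload into parts of min_part_size bytes; only the last may be smaller."""
    return [payload[i:i + min_part_size]
            for i in range(0, len(payload), min_part_size)]

parts = split_into_parts(b"x" * 2500000, 1000000)
assert [len(p) for p in parts] == [1000000, 1000000, 500000]
```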