Merge pull request #29680 from azat/defines

Cleanup common defines
This commit is contained in:
Nikita Mikhaylov 2021-10-04 20:13:29 +03:00 committed by GitHub
commit b31d11478b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 182 additions and 203 deletions

View File

@ -15,31 +15,18 @@
# define ch_has_feature __has_feature
#endif
#if defined(_MSC_VER)
# if !defined(likely)
# define likely(x) (x)
# endif
# if !defined(unlikely)
# define unlikely(x) (x)
# endif
#else
# if !defined(likely)
# define likely(x) (__builtin_expect(!!(x), 1))
# endif
# if !defined(unlikely)
# define unlikely(x) (__builtin_expect(!!(x), 0))
# endif
#if !defined(likely)
# define likely(x) (__builtin_expect(!!(x), 1))
#endif
#if !defined(unlikely)
# define unlikely(x) (__builtin_expect(!!(x), 0))
#endif
#if defined(_MSC_VER)
# define ALWAYS_INLINE __forceinline
# define NO_INLINE static __declspec(noinline)
# define MAY_ALIAS
#else
# define ALWAYS_INLINE __attribute__((__always_inline__))
# define NO_INLINE __attribute__((__noinline__))
# define MAY_ALIAS __attribute__((__may_alias__))
#endif
// more aliases: https://mailman.videolan.org/pipermail/x264-devel/2014-May/010660.html
#define ALWAYS_INLINE __attribute__((__always_inline__))
#define NO_INLINE __attribute__((__noinline__))
#define MAY_ALIAS __attribute__((__may_alias__))
#if !defined(__x86_64__) && !defined(__aarch64__) && !defined(__PPC__)
# error "The only supported platforms are x86_64 and AArch64, PowerPC (work in progress)"
@ -117,6 +104,11 @@
# define ALWAYS_INLINE_NO_SANITIZE_UNDEFINED ALWAYS_INLINE
#endif
#if !__has_include(<sanitizer/asan_interface.h>) || !defined(ADDRESS_SANITIZER)
# define ASAN_UNPOISON_MEMORY_REGION(a, b)
# define ASAN_POISON_MEMORY_REGION(a, b)
#endif
/// A template function for suppressing warnings about unused variables or function results.
template <typename... Args>
constexpr void UNUSED(Args &&... args [[maybe_unused]])

View File

@ -1,20 +1,5 @@
#include <base/demangle.h>
#if defined(_MSC_VER)
DemangleResult tryDemangle(const char *)
{
return DemangleResult{};
}
std::string demangle(const char * name, int & status)
{
status = 0;
return name;
}
#else
#include <stdlib.h>
#include <cxxabi.h>
@ -39,6 +24,3 @@ std::string demangle(const char * name, int & status)
return name;
}
#endif

View File

@ -19,22 +19,14 @@ void * mremap_fallback(
return MAP_FAILED;
}
#if defined(_MSC_VER)
void * new_address = ::operator new(new_size);
#else
void * new_address = mmap(nullptr, new_size, mmap_prot, mmap_flags, mmap_fd, mmap_offset);
if (MAP_FAILED == new_address)
return MAP_FAILED;
#endif
memcpy(new_address, old_address, old_size);
#if defined(_MSC_VER)
delete old_address;
#else
if (munmap(old_address, old_size))
abort();
#endif
return new_address;
}

View File

@ -2,9 +2,7 @@
#include <cstddef>
#include <sys/types.h>
#if !defined(_MSC_VER)
#include <sys/mman.h>
#endif
#ifdef MREMAP_MAYMOVE

View File

@ -1,25 +1,24 @@
#include "Keeper.h"
#include <sys/stat.h>
#include <pwd.h>
#include <Common/ClickHouseRevision.h>
#include <Server/ProtocolServerAdapter.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/DNSResolver.h>
#include <Interpreters/DNSCacheUpdater.h>
#include <Poco/Net/NetException.h>
#include <Poco/Net/TCPServerParams.h>
#include <Poco/Net/TCPServer.h>
#include <base/defines.h>
#include <Coordination/Defines.h>
#include <filesystem>
#include <IO/UseSSL.h>
#include <Core/ServerUUID.h>
#include <base/logger_useful.h>
#include <base/ErrorHandlers.h>
#include <base/scope_guard.h>
#include <Poco/Net/NetException.h>
#include <Poco/Net/TCPServerParams.h>
#include <Poco/Net/TCPServer.h>
#include <Poco/Util/HelpFormatter.h>
#include <Poco/Version.h>
#include <Poco/Environment.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Core/ServerUUID.h>
#include <filesystem>
#include <IO/UseSSL.h>
#include <sys/stat.h>
#include <pwd.h>
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
@ -31,6 +30,7 @@
# include <Poco/Net/SecureServerSocket.h>
#endif
#include <Server/ProtocolServerAdapter.h>
#include <Server/KeeperTCPHandlerFactory.h>
#if defined(OS_LINUX)

View File

@ -0,0 +1,3 @@
#pragma once
#define KEEPER_DEFAULT_PATH "/var/lib/clickhouse-keeper/"

View File

@ -1,4 +1,5 @@
#include <Coordination/KeeperServer.h>
#include <Coordination/Defines.h>
#if !defined(ARCADIA_BUILD)
# include "config_core.h"

View File

@ -1,4 +1,5 @@
#include <Coordination/KeeperStateManager.h>
#include <Coordination/Defines.h>
#include <Common/Exception.h>
#include <filesystem>

View File

@ -2,19 +2,11 @@
#include <base/defines.h>
#define DBMS_DEFAULT_HOST "localhost"
#define DBMS_DEFAULT_PORT 9000
#define DBMS_DEFAULT_SECURE_PORT 9440
#define DBMS_DEFAULT_HTTP_PORT 8123
#define DBMS_DEFAULT_CONNECT_TIMEOUT_SEC 10
#define DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_MS 50
#define DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_SECURE_MS 100
#define DBMS_DEFAULT_SEND_TIMEOUT_SEC 300
#define DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC 300
#define DBMS_DEFAULT_DRAIN_TIMEOUT_SEC 3
/// Timeouts for hedged requests.
#define DBMS_DEFAULT_HEDGED_CONNECTION_TIMEOUT_MS 100
#define DBMS_DEFAULT_RECEIVE_DATA_TIMEOUT_MS 2000
/// Timeout for synchronous request-result protocol call (like Ping or TablesStatus).
#define DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC 5
#define DBMS_DEFAULT_POLL_INTERVAL 10
@ -42,8 +34,6 @@
#define DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC 5
#define DEFAULT_PERIODIC_LIVE_VIEW_REFRESH_SEC 60
#define SHOW_CHARS_ON_SYNTAX_ERROR ptrdiff_t(160)
#define DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC 15
#define DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE 1024
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
/// each period reduces the error counter by 2 times
/// too short a period can cause errors to disappear immediately after creation.
@ -51,76 +41,17 @@
/// replica error max cap, this is to prevent replica from accumulating too many errors and taking to long to recover.
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT 1000
#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
#define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060
#define DBMS_MIN_REVISION_WITH_TABLES_STATUS 54226
#define DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE 54337
#define DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME 54372
#define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401
#define DBMS_MIN_REVISION_WITH_SERVER_LOGS 54406
#define DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA 54415
/// Minimum revision with exactly the same set of aggregation methods and rules to select them.
/// Two-level (bucketed) aggregation is incompatible if servers are inconsistent in these rules
/// (keys will be placed in different buckets and result will not be fully aggregated).
#define DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 54431
#define DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA 54410
#define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405
#define DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO 54420
/// Minimum revision supporting SettingsBinaryFormat::STRINGS.
#define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429
#define DBMS_MIN_REVISION_WITH_SCALARS 54429
/// Minimum revision supporting OpenTelemetry
#define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54442
#define DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION 1
/// Minimum revision supporting interserver secret.
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET 54441
#define DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO 54443
#define DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO 54447
#define DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH 54448
/// Version of ClickHouse TCP protocol.
///
/// Should be incremented manually on protocol changes.
///
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
#define DBMS_TCP_PROTOCOL_VERSION 54449
#define DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME 54449
/// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
/// Maximum namber of http-connections between two endpoints
/// Maximum number of http-connections between two endpoints
/// the number is unmotivated
#define DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT 15
#define DBMS_DEFAULT_PATH "/var/lib/clickhouse/"
#define KEEPER_DEFAULT_PATH "/var/lib/clickhouse-keeper/"
// more aliases: https://mailman.videolan.org/pipermail/x264-devel/2014-May/010660.html
/// Marks that extra information is sent to a shard. It could be any magic numbers.
#define DBMS_DISTRIBUTED_SIGNATURE_HEADER 0xCAFEDACEull
#define DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT 0xCAFECABEull
#if !__has_include(<sanitizer/asan_interface.h>) || !defined(ADDRESS_SANITIZER)
# define ASAN_UNPOISON_MEMORY_REGION(a, b)
# define ASAN_POISON_MEMORY_REGION(a, b)
#endif
/// Actually, there may be multiple acquisitions of different locks for a given table within one query.
/// Check with IStorage class for the list of possible locks
#define DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC 120

View File

@ -1,6 +1,7 @@
#pragma once
#include <base/types.h>
#include <Core/ProtocolDefines.h>
namespace DB

View File

@ -0,0 +1,48 @@
#pragma once
#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
#define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060
#define DBMS_MIN_REVISION_WITH_TABLES_STATUS 54226
#define DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE 54337
#define DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME 54372
#define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401
#define DBMS_MIN_REVISION_WITH_SERVER_LOGS 54406
#define DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA 54415
/// Minimum revision with exactly the same set of aggregation methods and rules to select them.
/// Two-level (bucketed) aggregation is incompatible if servers are inconsistent in these rules
/// (keys will be placed in different buckets and result will not be fully aggregated).
#define DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 54431
#define DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA 54410
#define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405
#define DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO 54420
/// Minimum revision supporting SettingsBinaryFormat::STRINGS.
#define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429
#define DBMS_MIN_REVISION_WITH_SCALARS 54429
/// Minimum revision supporting OpenTelemetry
#define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54442
#define DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION 1
/// Minimum revision supporting interserver secret.
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET 54441
#define DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO 54443
#define DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO 54447
#define DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH 54448
/// Version of ClickHouse TCP protocol.
///
/// Should be incremented manually on protocol changes.
///
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
#define DBMS_TCP_PROTOCOL_VERSION 54449
#define DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME 54449

View File

@ -51,14 +51,14 @@ class IColumn;
M(UInt64, max_query_size, DBMS_DEFAULT_MAX_QUERY_SIZE, "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)", 0) \
M(UInt64, interactive_delay, 100000, "The interval in microseconds to check if the request is cancelled, and to send progress info.", 0) \
M(Seconds, connect_timeout, DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, "Connection timeout if there are no replicas.", 0) \
M(Milliseconds, connect_timeout_with_failover_ms, DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_MS, "Connection timeout for selecting first healthy replica.", 0) \
M(Milliseconds, connect_timeout_with_failover_secure_ms, DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_SECURE_MS, "Connection timeout for selecting first healthy replica (for secure connections).", 0) \
M(Milliseconds, connect_timeout_with_failover_ms, 50, "Connection timeout for selecting first healthy replica.", 0) \
M(Milliseconds, connect_timeout_with_failover_secure_ms, 100, "Connection timeout for selecting first healthy replica (for secure connections).", 0) \
M(Seconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "", 0) \
M(Seconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "", 0) \
M(Seconds, drain_timeout, DBMS_DEFAULT_DRAIN_TIMEOUT_SEC, "", 0) \
M(Seconds, drain_timeout, 3, "", 0) \
M(Seconds, tcp_keep_alive_timeout, 290 /* less than DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC */, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \
M(Milliseconds, hedged_connection_timeout_ms, DBMS_DEFAULT_HEDGED_CONNECTION_TIMEOUT_MS, "Connection timeout for establishing connection with replica for Hedged requests", 0) \
M(Milliseconds, receive_data_timeout_ms, DBMS_DEFAULT_RECEIVE_DATA_TIMEOUT_MS, "Connection timeout for receiving first packet of data or packet with positive progress from replica", 0) \
M(Milliseconds, hedged_connection_timeout_ms, 100, "Connection timeout for establishing connection with replica for Hedged requests", 0) \
M(Milliseconds, receive_data_timeout_ms, 2000, "Connection timeout for receiving first packet of data or packet with positive progress from replica", 0) \
M(Bool, use_hedged_requests, true, "Use hedged requests for distributed queries", 0) \
M(Bool, allow_changing_replica_until_first_data_packet, false, "Allow HedgedConnections to change replica until receiving first data packet", 0) \
M(Milliseconds, queue_max_wait_ms, 0, "The wait time in the request queue, if the number of concurrent requests exceeds the maximum.", 0) \
@ -68,7 +68,7 @@ class IColumn;
M(Milliseconds, rabbitmq_max_wait_ms, 5000, "The wait time for reading from RabbitMQ before retry.", 0) \
M(UInt64, poll_interval, DBMS_DEFAULT_POLL_INTERVAL, "Block at the query wait loop on the server for the specified number of seconds.", 0) \
M(UInt64, idle_connection_timeout, 3600, "Close idle TCP connections after specified number of seconds.", 0) \
M(UInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.", 0) \
M(UInt64, distributed_connections_pool_size, 1024, "Maximum number of connections with one remote server in the pool.", 0) \
M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_min_upload_part_size, 32*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
@ -413,7 +413,7 @@ class IColumn;
M(UInt64, distributed_replica_max_ignored_errors, 0, "Number of errors that will be ignored while choosing replicas", 0) \
\
M(Bool, allow_experimental_live_view, false, "Enable LIVE VIEW. Not mature enough.", 0) \
M(Seconds, live_view_heartbeat_interval, DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC, "The heartbeat interval in seconds to indicate live query is alive.", 0) \
M(Seconds, live_view_heartbeat_interval, 15, "The heartbeat interval in seconds to indicate live query is alive.", 0) \
M(UInt64, max_live_view_insert_blocks_before_refresh, 64, "Limit maximum number of inserted blocks after which mergeable blocks are dropped and query is re-executed.", 0) \
M(UInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.", 0) \
\

View File

@ -1,4 +1,4 @@
#include <Core/Defines.h>
#include <Core/ProtocolDefines.h>
#include <Core/Block.h>
#include <IO/WriteHelpers.h>

View File

@ -0,0 +1,72 @@
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <Core/ProtocolDefines.h>
namespace DB
{
/// To read the data that was flushed into the temporary data file.
TemporaryFileStream::TemporaryFileStream(const std::string & path)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
{}
TemporaryFileStream::TemporaryFileStream(const std::string & path, const Block & header_)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header_, 0))
{}
/// Flush data from input stream into file for future reading
void TemporaryFileStream::write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec)
{
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {}));
NativeBlockOutputStream output(compressed_buf, 0, header);
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingPipelineExecutor executor(pipeline);
output.writePrefix();
Block block;
while (executor.pull(block))
output.write(block);
output.writeSuffix();
compressed_buf.finalize();
}
TemporaryFileLazySource::TemporaryFileLazySource(const std::string & path_, const Block & header_)
: ISource(header_)
, path(path_)
, done(false)
{}
Chunk TemporaryFileLazySource::generate()
{
if (done)
return {};
if (!stream)
stream = std::make_unique<TemporaryFileStream>(path, header);
auto block = stream->block_in->read();
if (!block)
{
done = true;
stream.reset();
}
return Chunk(block.getColumns(), block.rows());
}
}

View File

@ -1,16 +1,10 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <Processors/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Processors/QueryPipelineBuilder.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <DataStreams/IBlockStream_fwd.h>
namespace DB
{
@ -22,72 +16,28 @@ struct TemporaryFileStream
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
explicit TemporaryFileStream(const std::string & path)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
{}
TemporaryFileStream(const std::string & path, const Block & header_)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header_, 0))
{}
explicit TemporaryFileStream(const std::string & path);
TemporaryFileStream(const std::string & path, const Block & header_);
/// Flush data from input stream into file for future reading
static void write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec)
{
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {}));
NativeBlockOutputStream output(compressed_buf, 0, header);
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingPipelineExecutor executor(pipeline);
output.writePrefix();
Block block;
while (executor.pull(block))
output.write(block);
output.writeSuffix();
compressed_buf.finalize();
}
static void write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec);
};
class TemporaryFileLazySource : public ISource
{
public:
TemporaryFileLazySource(const std::string & path_, const Block & header_)
: ISource(header_)
, path(path_)
, done(false)
{}
TemporaryFileLazySource(const std::string & path_, const Block & header_);
String getName() const override { return "TemporaryFileLazySource"; }
protected:
Chunk generate() override
{
if (done)
return {};
if (!stream)
stream = std::make_unique<TemporaryFileStream>(path, header);
auto block = stream->block_in->read();
if (!block)
{
done = true;
stream.reset();
}
return Chunk(block.getColumns(), block.rows());
}
Chunk generate() override;
private:
const std::string path;
Block header;
bool done;
std::unique_ptr<TemporaryFileStream> stream;
};

View File

@ -4,6 +4,7 @@
#include <IO/WriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Core/ProtocolDefines.h>
namespace DB

View File

@ -23,9 +23,6 @@
#endif
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
namespace DB
{
/** Perform HTTP POST request and provide response to read.

View File

@ -24,6 +24,7 @@
#include <IO/Operators.h>
#include <Interpreters/JIT/compileFunction.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Core/ProtocolDefines.h>
namespace ProfileEvents

View File

@ -3,7 +3,7 @@
#include <IO/WriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Core/Defines.h>
#include <Core/ProtocolDefines.h>
#include <base/getFQDNOrHostName.h>
#include <unistd.h>

View File

@ -3,6 +3,7 @@
#include <IO/WriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Core/ProtocolDefines.h>
namespace DB
{

View File

@ -5,6 +5,7 @@
#include <Processors/Pipe.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <DataStreams/materializeBlock.h>
#include <Core/ProtocolDefines.h>
namespace ProfileEvents
{

View File

@ -0,0 +1,5 @@
#pragma once
/// Marks that extra information is sent to a shard. It could be any magic numbers.
#define DBMS_DISTRIBUTED_SIGNATURE_HEADER 0xCAFEDACEull
#define DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT 0xCAFECABEull

View File

@ -16,6 +16,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/Distributed/Defines.h>
#include <Storages/StorageDistributed.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>

View File

@ -1,5 +1,6 @@
#include <Storages/Distributed/DistributedSink.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/Distributed/Defines.h>
#include <Storages/StorageDistributed.h>
#include <Disks/StoragePolicy.h>