Merge branch 'master' into pvs-studio-integration

This commit is contained in:
Alexey Milovidov 2018-06-11 05:45:36 +03:00
commit 2d05757e62
159 changed files with 1800 additions and 1523 deletions

3
.gitignore vendored
View File

@ -13,6 +13,9 @@
/docs/en_single_page/
/docs/ru_single_page/
/docs/venv/
/docs/build/
/docs/en/development/build/
/docs/ru/development/build/
# callgrind files
callgrind.out.*

View File

@ -55,6 +55,7 @@ add_headers_and_sources(dbms src/Storages)
add_headers_and_sources(dbms src/Storages/Distributed)
add_headers_and_sources(dbms src/Storages/MergeTree)
add_headers_and_sources(dbms src/Client)
add_headers_and_sources(dbms src/Formats)
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})

View File

@ -52,7 +52,7 @@
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/AsynchronousBlockInputStream.h>

View File

@ -13,3 +13,4 @@ add_subdirectory (AggregateFunctions)
add_subdirectory (Client)
add_subdirectory (TableFunctions)
add_subdirectory (Analyzers)
add_subdirectory (Formats)

View File

@ -373,6 +373,7 @@ namespace ErrorCodes
extern const int TOO_MANY_ROWS_OR_BYTES = 396;
extern const int QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW = 397;
extern const int UNKNOWN_MUTATION_COMMAND = 398;
extern const int FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT = 399;
extern const int KEEPER_EXCEPTION = 999;

View File

@ -54,6 +54,11 @@ using SetResponse = ZooKeeperImpl::ZooKeeper::SetResponse;
using ListResponse = ZooKeeperImpl::ZooKeeper::ListResponse;
using CheckResponse = ZooKeeperImpl::ZooKeeper::CheckResponse;
/// Gets multiple asynchronous results
/// Each pair, the first is path, the second is response eg. CreateResponse, RemoveResponse
template <typename R>
using AsyncResponses = std::vector<std::pair<std::string, std::future<R>>>;
RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode);
RequestPtr makeRemoveRequest(const std::string & path, int version);
RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version);

View File

@ -6,7 +6,6 @@
#include <Common/FieldVisitors.h>
#include <Common/Stopwatch.h>
#include <DataStreams/TabSeparatedRowOutputStream.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <DataTypes/DataTypeString.h>

View File

@ -1,27 +0,0 @@
#include <Core/Block.h>
#include <IO/ReadBuffer.h>
#include <DataStreams/BinaryRowInputStream.h>
namespace DB
{
BinaryRowInputStream::BinaryRowInputStream(ReadBuffer & istr_, const Block & header_)
: istr(istr_), header(header_)
{
}
bool BinaryRowInputStream::read(MutableColumns & columns)
{
if (istr.eof())
return false;
size_t num_columns = columns.size();
for (size_t i = 0; i < num_columns; ++i)
header.getByPosition(i).type->deserializeBinary(*columns[i], istr);
return true;
}
}

View File

@ -1,25 +0,0 @@
#include <IO/WriteBuffer.h>
#include <Columns/IColumn.h>
#include <DataTypes/IDataType.h>
#include <DataStreams/BinaryRowOutputStream.h>
namespace DB
{
BinaryRowOutputStream::BinaryRowOutputStream(WriteBuffer & ostr_)
: ostr(ostr_)
{
}
void BinaryRowOutputStream::flush()
{
ostr.next();
}
void BinaryRowOutputStream::writeField(const IColumn & column, const IDataType & type, size_t row_num)
{
type.serializeBinary(column, row_num, ostr);
}
}

View File

@ -1,232 +0,0 @@
#include <Common/config.h>
#include <Interpreters/Context.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/TabSeparatedRowInputStream.h>
#include <DataStreams/TabSeparatedRowOutputStream.h>
#include <DataStreams/TabSeparatedRawRowOutputStream.h>
#include <DataStreams/BinaryRowInputStream.h>
#include <DataStreams/BinaryRowOutputStream.h>
#include <DataStreams/ValuesRowInputStream.h>
#include <DataStreams/ValuesRowOutputStream.h>
#include <DataStreams/PrettyBlockOutputStream.h>
#include <DataStreams/PrettyCompactBlockOutputStream.h>
#include <DataStreams/PrettySpaceBlockOutputStream.h>
#include <DataStreams/VerticalRowOutputStream.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <DataStreams/BlockInputStreamFromRowInputStream.h>
#include <DataStreams/BlockOutputStreamFromRowOutputStream.h>
#include <DataStreams/JSONRowOutputStream.h>
#include <DataStreams/JSONCompactRowOutputStream.h>
#include <DataStreams/JSONEachRowRowOutputStream.h>
#include <DataStreams/JSONEachRowRowInputStream.h>
#include <DataStreams/XMLRowOutputStream.h>
#include <DataStreams/TSKVRowOutputStream.h>
#include <DataStreams/TSKVRowInputStream.h>
#include <DataStreams/ODBCDriverBlockOutputStream.h>
#include <DataStreams/CSVRowInputStream.h>
#include <DataStreams/CSVRowOutputStream.h>
#include <DataStreams/MaterializingBlockOutputStream.h>
#include <DataStreams/FormatFactory.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataTypes/FormatSettings.h>
#if USE_CAPNP
#include <DataStreams/CapnProtoRowInputStream.h>
#endif
#include <boost/algorithm/string.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int FORMAT_IS_NOT_SUITABLE_FOR_INPUT;
extern const int UNKNOWN_FORMAT;
}
BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & buf,
const Block & sample, const Context & context, size_t max_block_size) const
{
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings;
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
format_settings.date_time_input_format = settings.date_time_input_format;
auto wrap_row_stream = [&](auto && row_stream)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(std::move(row_stream), sample, max_block_size,
settings.input_format_allow_errors_num, settings.input_format_allow_errors_ratio);
};
if (name == "Native")
{
return std::make_shared<NativeBlockInputStream>(buf, sample, 0);
}
else if (name == "RowBinary")
{
return wrap_row_stream(std::make_shared<BinaryRowInputStream>(buf, sample));
}
else if (name == "TabSeparated" || name == "TSV") /// TSV is a synonym/alias for the original TabSeparated format
{
return wrap_row_stream(std::make_shared<TabSeparatedRowInputStream>(buf, sample, false, false, format_settings));
}
else if (name == "TabSeparatedWithNames" || name == "TSVWithNames")
{
return wrap_row_stream(std::make_shared<TabSeparatedRowInputStream>(buf, sample, true, false, format_settings));
}
else if (name == "TabSeparatedWithNamesAndTypes" || name == "TSVWithNamesAndTypes")
{
return wrap_row_stream(std::make_shared<TabSeparatedRowInputStream>(buf, sample, true, true, format_settings));
}
else if (name == "Values")
{
return wrap_row_stream(std::make_shared<ValuesRowInputStream>(buf, sample, context, format_settings));
}
else if (name == "CSV" || name == "CSVWithNames")
{
bool with_names = name == "CSVWithNames";
return wrap_row_stream(std::make_shared<CSVRowInputStream>(buf, sample, with_names, format_settings));
}
else if (name == "TSKV")
{
return wrap_row_stream(std::make_shared<TSKVRowInputStream>(buf, sample, format_settings));
}
else if (name == "JSONEachRow")
{
return wrap_row_stream(std::make_shared<JSONEachRowRowInputStream>(buf, sample, format_settings));
}
#if USE_CAPNP
else if (name == "CapnProto")
{
std::vector<String> tokens;
auto schema_and_root = settings.format_schema.toString();
boost::split(tokens, schema_and_root, boost::is_any_of(":"));
if (tokens.size() != 2)
throw Exception("Format CapnProto requires 'format_schema' setting to have a schema_file:root_object format, e.g. 'schema.capnp:Message'");
const String & schema_dir = context.getFormatSchemaPath();
return wrap_row_stream(std::make_shared<CapnProtoRowInputStream>(buf, sample, schema_dir, tokens[0], tokens[1]));
}
#endif
else if (name == "TabSeparatedRaw"
|| name == "TSVRaw"
|| name == "Pretty"
|| name == "PrettyCompact"
|| name == "PrettyCompactMonoBlock"
|| name == "PrettySpace"
|| name == "PrettyNoEscapes"
|| name == "PrettyCompactNoEscapes"
|| name == "PrettySpaceNoEscapes"
|| name == "Vertical"
|| name == "Null"
|| name == "JSON"
|| name == "JSONCompact"
|| name == "XML"
|| name == "ODBCDriver")
{
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
}
else
throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);
}
static BlockOutputStreamPtr getOutputImpl(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context)
{
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings;
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
format_settings.json.quote_denormals = settings.output_format_json_quote_denormals;
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.pretty.max_rows = settings.output_format_pretty_max_rows;
format_settings.pretty.color = settings.output_format_pretty_color;
format_settings.write_statistics = settings.output_format_write_statistics;
if (name == "Native")
return std::make_shared<NativeBlockOutputStream>(buf, 0, sample);
else if (name == "RowBinary")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<BinaryRowOutputStream>(buf), sample);
else if (name == "TabSeparated" || name == "TSV")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<TabSeparatedRowOutputStream>(buf, sample, false, false, format_settings), sample);
else if (name == "TabSeparatedWithNames" || name == "TSVWithNames")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<TabSeparatedRowOutputStream>(buf, sample, true, false, format_settings), sample);
else if (name == "TabSeparatedWithNamesAndTypes" || name == "TSVWithNamesAndTypes")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<TabSeparatedRowOutputStream>(buf, sample, true, true, format_settings), sample);
else if (name == "TabSeparatedRaw" || name == "TSVRaw")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<TabSeparatedRawRowOutputStream>(buf, sample, false, false, format_settings), sample);
else if (name == "CSV" || name == "CSVWithNames")
{
bool with_names = name == "CSVWithNames";
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<CSVRowOutputStream>(buf, sample, with_names, format_settings), sample);
}
else if (name == "Pretty")
return std::make_shared<PrettyBlockOutputStream>(buf, sample, format_settings);
else if (name == "PrettyCompact")
return std::make_shared<PrettyCompactBlockOutputStream>(buf, sample, format_settings);
else if (name == "PrettyCompactMonoBlock")
{
BlockOutputStreamPtr dst = std::make_shared<PrettyCompactBlockOutputStream>(buf, sample, format_settings);
auto res = std::make_shared<SquashingBlockOutputStream>(dst, format_settings.pretty.max_rows, 0);
res->disableFlush();
return res;
}
else if (name == "PrettySpace")
return std::make_shared<PrettySpaceBlockOutputStream>(buf, sample, format_settings);
else if (name == "PrettyNoEscapes")
{
format_settings.pretty.color = false;
return std::make_shared<PrettyBlockOutputStream>(buf, sample, format_settings);
}
else if (name == "PrettyCompactNoEscapes")
{
format_settings.pretty.color = false;
return std::make_shared<PrettyCompactBlockOutputStream>(buf, sample, format_settings);
}
else if (name == "PrettySpaceNoEscapes")
{
format_settings.pretty.color = false;
return std::make_shared<PrettySpaceBlockOutputStream>(buf, sample, format_settings);
}
else if (name == "Vertical")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<VerticalRowOutputStream>(buf, sample, format_settings), sample);
else if (name == "Values")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<ValuesRowOutputStream>(buf, format_settings), sample);
else if (name == "JSON")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONRowOutputStream>(buf, sample, format_settings), sample);
else if (name == "JSONCompact")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONCompactRowOutputStream>(buf, sample, format_settings), sample);
else if (name == "JSONEachRow")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONEachRowRowOutputStream>(buf, sample, format_settings), sample);
else if (name == "XML")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<XMLRowOutputStream>(buf, sample, format_settings), sample);
else if (name == "TSKV")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TSKVRowOutputStream>(buf, sample, format_settings), sample);
else if (name == "ODBCDriver")
return std::make_shared<ODBCDriverBlockOutputStream>(buf, sample, format_settings);
else if (name == "Null")
return std::make_shared<NullBlockOutputStream>(sample);
else
throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);
}
BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context) const
{
/** Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return std::make_shared<MaterializingBlockOutputStream>(getOutputImpl(name, buf, materializeBlock(sample), context), sample);
}
}

View File

@ -1,25 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
namespace DB
{
class Context;
/** Allows to create an IBlockInputStream or IBlockOutputStream by the name of the format.
* Note: format and compression are independent things.
*/
class FormatFactory
{
public:
BlockInputStreamPtr getInput(const String & name, ReadBuffer & buf,
const Block & sample, const Context & context, size_t max_block_size) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context) const;
};
}

View File

@ -1,26 +1,11 @@
set(SRCS )
add_executable (tab_separated_streams tab_separated_streams.cpp ${SRCS})
target_link_libraries (tab_separated_streams dbms)
add_executable (block_row_transforms block_row_transforms.cpp ${SRCS})
target_link_libraries (block_row_transforms dbms)
add_executable (expression_stream expression_stream.cpp ${SRCS})
target_link_libraries (expression_stream dbms clickhouse_storages_system)
add_executable (native_streams native_streams.cpp ${SRCS})
target_link_libraries (native_streams dbms)
add_executable (filter_stream filter_stream.cpp ${SRCS})
target_link_libraries (filter_stream dbms clickhouse_storages_system)
add_executable (filter_stream_hitlog filter_stream_hitlog.cpp ${SRCS})
target_link_libraries (filter_stream_hitlog dbms)
add_executable (sorting_stream sorting_stream.cpp ${SRCS})
target_link_libraries (sorting_stream dbms)
add_executable (union_stream2 union_stream2.cpp ${SRCS})
target_link_libraries (union_stream2 dbms)

View File

@ -8,8 +8,7 @@
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/TabSeparatedRowOutputStream.h>
#include <DataStreams/BlockOutputStreamFromRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
@ -55,17 +54,14 @@ try
in = std::make_shared<ExpressionBlockInputStream>(in, expression);
in = std::make_shared<LimitBlockInputStream>(in, 10, std::max(static_cast<Int64>(0), static_cast<Int64>(n) - 10));
FormatSettings format_settings;
WriteBufferFromOStream out1(std::cout);
RowOutputStreamPtr out2 = std::make_shared<TabSeparatedRowOutputStream>(out1, expression->getSampleBlock(), false, false, format_settings);
BlockOutputStreamFromRowOutputStream out(out2, expression->getSampleBlock());
BlockOutputStreamPtr out = FormatFactory::instance().getOutput("TabSeparated", out1, expression->getSampleBlock(), context);
{
Stopwatch stopwatch;
stopwatch.start();
copyData(*in, out);
copyData(*in, *out);
stopwatch.stop();
std::cout << std::fixed << std::setprecision(2)

View File

@ -9,8 +9,7 @@
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/TabSeparatedRowOutputStream.h>
#include <DataStreams/BlockOutputStreamFromRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
@ -59,18 +58,14 @@ try
in = std::make_shared<FilterBlockInputStream>(in, expression, "equals(modulo(number, 3), 1)");
in = std::make_shared<LimitBlockInputStream>(in, 10, std::max(static_cast<Int64>(0), static_cast<Int64>(n) - 10));
FormatSettings format_settings;
WriteBufferFromOStream ob(std::cout);
RowOutputStreamPtr out_ = std::make_shared<TabSeparatedRowOutputStream>(ob, expression->getSampleBlock(), false, false, format_settings);
BlockOutputStreamFromRowOutputStream out(out_, expression->getSampleBlock());
BlockOutputStreamPtr out = FormatFactory::instance().getOutput("TabSeparated", ob, expression->getSampleBlock(), context);
{
Stopwatch stopwatch;
stopwatch.start();
copyData(*in, out);
copyData(*in, *out);
stopwatch.stop();
std::cout << std::fixed << std::setprecision(2)

View File

@ -1,149 +0,0 @@
#include <iostream>
#include <iomanip>
#include <IO/WriteBufferFromOStream.h>
#include <Storages/StorageLog.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/TabSeparatedRowOutputStream.h>
#include <DataStreams/BlockOutputStreamFromRowOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/Context.h>
int main(int, char **)
{
using namespace DB;
try
{
NamesAndTypesList names_and_types_list
{
{"WatchID", std::make_shared<DataTypeUInt64>()},
{"JavaEnable", std::make_shared<DataTypeUInt8>()},
{"Title", std::make_shared<DataTypeString>()},
{"EventTime", std::make_shared<DataTypeDateTime>()},
{"CounterID", std::make_shared<DataTypeUInt32>()},
{"ClientIP", std::make_shared<DataTypeUInt32>()},
{"RegionID", std::make_shared<DataTypeUInt32>()},
{"UniqID", std::make_shared<DataTypeUInt64>()},
{"CounterClass", std::make_shared<DataTypeUInt8>()},
{"OS", std::make_shared<DataTypeUInt8>()},
{"UserAgent", std::make_shared<DataTypeUInt8>()},
{"URL", std::make_shared<DataTypeString>()},
{"Referer", std::make_shared<DataTypeString>()},
{"ResolutionWidth", std::make_shared<DataTypeUInt16>()},
{"ResolutionHeight", std::make_shared<DataTypeUInt16>()},
{"ResolutionDepth", std::make_shared<DataTypeUInt8>()},
{"FlashMajor", std::make_shared<DataTypeUInt8>()},
{"FlashMinor", std::make_shared<DataTypeUInt8>()},
{"FlashMinor2", std::make_shared<DataTypeString>()},
{"NetMajor", std::make_shared<DataTypeUInt8>()},
{"NetMinor", std::make_shared<DataTypeUInt8>()},
{"UserAgentMajor", std::make_shared<DataTypeUInt16>()},
{"UserAgentMinor", std::make_shared<DataTypeFixedString>(2)},
{"CookieEnable", std::make_shared<DataTypeUInt8>()},
{"JavascriptEnable", std::make_shared<DataTypeUInt8>()},
{"IsMobile", std::make_shared<DataTypeUInt8>()},
{"MobilePhone", std::make_shared<DataTypeUInt8>()},
{"MobilePhoneModel", std::make_shared<DataTypeString>()},
{"Params", std::make_shared<DataTypeString>()},
{"IPNetworkID", std::make_shared<DataTypeUInt32>()},
{"TraficSourceID", std::make_shared<DataTypeInt8>()},
{"SearchEngineID", std::make_shared<DataTypeUInt16>()},
{"SearchPhrase", std::make_shared<DataTypeString>()},
{"AdvEngineID", std::make_shared<DataTypeUInt8>()},
{"IsArtifical", std::make_shared<DataTypeUInt8>()},
{"WindowClientWidth", std::make_shared<DataTypeUInt16>()},
{"WindowClientHeight", std::make_shared<DataTypeUInt16>()},
{"ClientTimeZone", std::make_shared<DataTypeInt16>()},
{"ClientEventTime", std::make_shared<DataTypeDateTime>()},
{"SilverlightVersion1", std::make_shared<DataTypeUInt8>()},
{"SilverlightVersion2", std::make_shared<DataTypeUInt8>()},
{"SilverlightVersion3", std::make_shared<DataTypeUInt32>()},
{"SilverlightVersion4", std::make_shared<DataTypeUInt16>()},
{"PageCharset", std::make_shared<DataTypeString>()},
{"CodeVersion", std::make_shared<DataTypeUInt32>()},
{"IsLink", std::make_shared<DataTypeUInt8>()},
{"IsDownload", std::make_shared<DataTypeUInt8>()},
{"IsNotBounce", std::make_shared<DataTypeUInt8>()},
{"FUniqID", std::make_shared<DataTypeUInt64>()},
{"OriginalURL", std::make_shared<DataTypeString>()},
{"HID", std::make_shared<DataTypeUInt32>()},
{"IsOldCounter", std::make_shared<DataTypeUInt8>()},
{"IsEvent", std::make_shared<DataTypeUInt8>()},
{"IsParameter", std::make_shared<DataTypeUInt8>()},
{"DontCountHits", std::make_shared<DataTypeUInt8>()},
{"WithHash", std::make_shared<DataTypeUInt8>()},
};
Context context = Context::createGlobal();
std::string input = "SELECT UniqID, URL, CounterID, IsLink WHERE URL = 'http://mail.yandex.ru/neo2/#inbox'";
ParserSelectQuery parser;
ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0);
formatAST(*ast, std::cerr);
std::cerr << std::endl;
/// create an object of an existing hit log table
StoragePtr table = StorageLog::create(
"./", "HitLog", ColumnsDescription{names_and_types_list}, DEFAULT_MAX_COMPRESS_BLOCK_SIZE);
table->startup();
/// read from it, apply the expression, filter, and write in tsv form to the console
ExpressionAnalyzer analyzer(ast, context, nullptr, names_and_types_list);
ExpressionActionsChain chain;
analyzer.appendSelect(chain, false);
analyzer.appendWhere(chain, false);
chain.finalize();
ExpressionActionsPtr expression = chain.getLastActions();
Names column_names
{
"UniqID",
"URL",
"CounterID",
"IsLink",
};
QueryProcessingStage::Enum stage;
BlockInputStreamPtr in = table->read(column_names, {}, context, stage, 8192, 1)[0];
in = std::make_shared<FilterBlockInputStream>(in, expression, "equals(URL, 'http://mail.yandex.ru/neo2/#inbox')");
//in = std::make_shared<LimitBlockInputStream>(in, 10, 0);
FormatSettings format_settings;
WriteBufferFromOStream ob(std::cout);
RowOutputStreamPtr out_ = std::make_shared<TabSeparatedRowOutputStream>(ob, expression->getSampleBlock(), false, false, format_settings);
BlockOutputStreamFromRowOutputStream out(out_, in->getHeader());
copyData(*in, out);
}
catch (const Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
}
return 0;
}

View File

@ -1,129 +0,0 @@
#include <string>
#include <iostream>
#include <fstream>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/CompressedReadBuffer.h>
#include <IO/CompressedWriteBuffer.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <Storages/StorageLog.h>
#include <Interpreters/Context.h>
#include <Common/ClickHouseRevision.h>
int main(int argc, char ** argv)
try
{
using namespace DB;
NamesAndTypesList names_and_types_list
{
{"WatchID", std::make_shared<DataTypeUInt64>()},
{"JavaEnable", std::make_shared<DataTypeUInt8>()},
{"Title", std::make_shared<DataTypeString>()},
{"EventTime", std::make_shared<DataTypeDateTime>()},
{"CounterID", std::make_shared<DataTypeUInt32>()},
{"ClientIP", std::make_shared<DataTypeUInt32>()},
{"RegionID", std::make_shared<DataTypeUInt32>()},
{"UniqID", std::make_shared<DataTypeUInt64>()},
{"CounterClass", std::make_shared<DataTypeUInt8>()},
{"OS", std::make_shared<DataTypeUInt8>()},
{"UserAgent", std::make_shared<DataTypeUInt8>()},
{"URL", std::make_shared<DataTypeString>()},
{"Referer", std::make_shared<DataTypeString>()},
{"ResolutionWidth", std::make_shared<DataTypeUInt16>()},
{"ResolutionHeight", std::make_shared<DataTypeUInt16>()},
{"ResolutionDepth", std::make_shared<DataTypeUInt8>()},
{"FlashMajor", std::make_shared<DataTypeUInt8>()},
{"FlashMinor", std::make_shared<DataTypeUInt8>()},
{"FlashMinor2", std::make_shared<DataTypeString>()},
{"NetMajor", std::make_shared<DataTypeUInt8>()},
{"NetMinor", std::make_shared<DataTypeUInt8>()},
{"UserAgentMajor", std::make_shared<DataTypeUInt16>()},
{"UserAgentMinor", std::make_shared<DataTypeFixedString>(2)},
{"CookieEnable", std::make_shared<DataTypeUInt8>()},
{"JavascriptEnable", std::make_shared<DataTypeUInt8>()},
{"IsMobile", std::make_shared<DataTypeUInt8>()},
{"MobilePhone", std::make_shared<DataTypeUInt8>()},
{"MobilePhoneModel", std::make_shared<DataTypeString>()},
{"Params", std::make_shared<DataTypeString>()},
{"IPNetworkID", std::make_shared<DataTypeUInt32>()},
{"TraficSourceID", std::make_shared<DataTypeInt8>()},
{"SearchEngineID", std::make_shared<DataTypeUInt16>()},
{"SearchPhrase", std::make_shared<DataTypeString>()},
{"AdvEngineID", std::make_shared<DataTypeUInt8>()},
{"IsArtifical", std::make_shared<DataTypeUInt8>()},
{"WindowClientWidth", std::make_shared<DataTypeUInt16>()},
{"WindowClientHeight", std::make_shared<DataTypeUInt16>()},
{"ClientTimeZone", std::make_shared<DataTypeInt16>()},
{"ClientEventTime", std::make_shared<DataTypeDateTime>()},
{"SilverlightVersion1", std::make_shared<DataTypeUInt8>()},
{"SilverlightVersion2", std::make_shared<DataTypeUInt8>()},
{"SilverlightVersion3", std::make_shared<DataTypeUInt32>()},
{"SilverlightVersion4", std::make_shared<DataTypeUInt16>()},
{"PageCharset", std::make_shared<DataTypeString>()},
{"CodeVersion", std::make_shared<DataTypeUInt32>()},
{"IsLink", std::make_shared<DataTypeUInt8>()},
{"IsDownload", std::make_shared<DataTypeUInt8>()},
{"IsNotBounce", std::make_shared<DataTypeUInt8>()},
{"FUniqID", std::make_shared<DataTypeUInt64>()},
{"OriginalURL", std::make_shared<DataTypeString>()},
{"HID", std::make_shared<DataTypeUInt32>()},
{"IsOldCounter", std::make_shared<DataTypeUInt8>()},
{"IsEvent", std::make_shared<DataTypeUInt8>()},
{"IsParameter", std::make_shared<DataTypeUInt8>()},
{"DontCountHits", std::make_shared<DataTypeUInt8>()},
{"WithHash", std::make_shared<DataTypeUInt8>()},
};
Names column_names;
for (const auto & name_type : names_and_types_list)
column_names.push_back(name_type.name);
/// create an object of an existing hit log table
StoragePtr table = StorageLog::create(
"./", "HitLog", ColumnsDescription{names_and_types_list}, DEFAULT_MAX_COMPRESS_BLOCK_SIZE);
table->startup();
/// read from it
if (argc == 2 && 0 == strcmp(argv[1], "read"))
{
QueryProcessingStage::Enum stage;
BlockInputStreamPtr in = table->read(column_names, {}, Context::createGlobal(), stage, 8192, 1)[0];
WriteBufferFromFileDescriptor out1(STDOUT_FILENO);
CompressedWriteBuffer out2(out1);
NativeBlockOutputStream out3(out2, ClickHouseRevision::get(), in->getHeader());
copyData(*in, out3);
}
/// read the data from the native file and simultaneously write to the table
if (argc == 2 && 0 == strcmp(argv[1], "write"))
{
ReadBufferFromFileDescriptor in1(STDIN_FILENO);
CompressedReadBuffer in2(in1);
NativeBlockInputStream in3(in2, ClickHouseRevision::get());
BlockOutputStreamPtr out = table->write({}, {});
copyData(in3, *out);
}
return 0;
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
throw;
}

View File

@ -1,167 +0,0 @@
#include <iostream>
#include <iomanip>
#include <IO/WriteBufferFromOStream.h>
#include <Storages/StorageLog.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/TabSeparatedRowOutputStream.h>
#include <DataStreams/BlockOutputStreamFromRowOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/formatAST.h>
#include <Interpreters/Context.h>
using namespace DB;
int main(int argc, char ** argv)
try
{
NamesAndTypesList names_and_types_list
{
{"WatchID", std::make_shared<DataTypeUInt64>()},
{"JavaEnable", std::make_shared<DataTypeUInt8>()},
{"Title", std::make_shared<DataTypeString>()},
{"EventTime", std::make_shared<DataTypeDateTime>()},
{"CounterID", std::make_shared<DataTypeUInt32>()},
{"ClientIP", std::make_shared<DataTypeUInt32>()},
{"RegionID", std::make_shared<DataTypeUInt32>()},
{"UniqID", std::make_shared<DataTypeUInt64>()},
{"CounterClass", std::make_shared<DataTypeUInt8>()},
{"OS", std::make_shared<DataTypeUInt8>()},
{"UserAgent", std::make_shared<DataTypeUInt8>()},
{"URL", std::make_shared<DataTypeString>()},
{"Referer", std::make_shared<DataTypeString>()},
{"ResolutionWidth", std::make_shared<DataTypeUInt16>()},
{"ResolutionHeight", std::make_shared<DataTypeUInt16>()},
{"ResolutionDepth", std::make_shared<DataTypeUInt8>()},
{"FlashMajor", std::make_shared<DataTypeUInt8>()},
{"FlashMinor", std::make_shared<DataTypeUInt8>()},
{"FlashMinor2", std::make_shared<DataTypeString>()},
{"NetMajor", std::make_shared<DataTypeUInt8>()},
{"NetMinor", std::make_shared<DataTypeUInt8>()},
{"UserAgentMajor", std::make_shared<DataTypeUInt16>()},
{"UserAgentMinor", std::make_shared<DataTypeFixedString>(2)},
{"CookieEnable", std::make_shared<DataTypeUInt8>()},
{"JavascriptEnable", std::make_shared<DataTypeUInt8>()},
{"IsMobile", std::make_shared<DataTypeUInt8>()},
{"MobilePhone", std::make_shared<DataTypeUInt8>()},
{"MobilePhoneModel", std::make_shared<DataTypeString>()},
{"Params", std::make_shared<DataTypeString>()},
{"IPNetworkID", std::make_shared<DataTypeUInt32>()},
{"TraficSourceID", std::make_shared<DataTypeInt8>()},
{"SearchEngineID", std::make_shared<DataTypeUInt16>()},
{"SearchPhrase", std::make_shared<DataTypeString>()},
{"AdvEngineID", std::make_shared<DataTypeUInt8>()},
{"IsArtifical", std::make_shared<DataTypeUInt8>()},
{"WindowClientWidth", std::make_shared<DataTypeUInt16>()},
{"WindowClientHeight", std::make_shared<DataTypeUInt16>()},
{"ClientTimeZone", std::make_shared<DataTypeInt16>()},
{"ClientEventTime", std::make_shared<DataTypeDateTime>()},
{"SilverlightVersion1", std::make_shared<DataTypeUInt8>()},
{"SilverlightVersion2", std::make_shared<DataTypeUInt8>()},
{"SilverlightVersion3", std::make_shared<DataTypeUInt32>()},
{"SilverlightVersion4", std::make_shared<DataTypeUInt16>()},
{"PageCharset", std::make_shared<DataTypeString>()},
{"CodeVersion", std::make_shared<DataTypeUInt32>()},
{"IsLink", std::make_shared<DataTypeUInt8>()},
{"IsDownload", std::make_shared<DataTypeUInt8>()},
{"IsNotBounce", std::make_shared<DataTypeUInt8>()},
{"FUniqID", std::make_shared<DataTypeUInt64>()},
{"OriginalURL", std::make_shared<DataTypeString>()},
{"HID", std::make_shared<DataTypeUInt32>()},
{"IsOldCounter", std::make_shared<DataTypeUInt8>()},
{"IsEvent", std::make_shared<DataTypeUInt8>()},
{"IsParameter", std::make_shared<DataTypeUInt8>()},
{"DontCountHits", std::make_shared<DataTypeUInt8>()},
{"WithHash", std::make_shared<DataTypeUInt8>()},
};
using NamesAndTypesMap = std::map<String, DataTypePtr>;
NamesAndTypesMap names_and_types_map;
for (const auto & name_type : names_and_types_list)
names_and_types_map.emplace(name_type.name, name_type.type);
std::string input = "SELECT UniqID, URL, CounterID, IsLink";
ParserSelectQuery parser;
ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0);
formatAST(*ast, std::cerr);
std::cerr << std::endl;
/// create an object of an existing hit log table
StoragePtr table = StorageLog::create(
"./", "HitLog", ColumnsDescription{names_and_types_list}, DEFAULT_MAX_COMPRESS_BLOCK_SIZE);
table->startup();
/// read from it, sort it, and write it in tsv form to the console
Names column_names
{
"UniqID",
"URL",
"CounterID",
"IsLink",
};
DataTypes result_types = DataTypes
{
names_and_types_map["UniqID"],
names_and_types_map["URL"],
names_and_types_map["CounterID"],
names_and_types_map["IsLink"],
};
Block sample;
for (const auto & type : result_types)
{
ColumnWithTypeAndName col;
col.type = type;
sample.insert(std::move(col));
}
SortDescription sort_columns;
sort_columns.push_back(SortColumnDescription(1, -1, -1));
sort_columns.push_back(SortColumnDescription(2, 1, 1));
sort_columns.push_back(SortColumnDescription(0, 1, 1));
sort_columns.push_back(SortColumnDescription(3, 1, 1));
QueryProcessingStage::Enum stage;
BlockInputStreamPtr in = table->read(column_names, {}, Context::createGlobal(), stage, argc == 2 ? atoi(argv[1]) : 65536, 1)[0];
in = std::make_shared<PartialSortingBlockInputStream>(in, sort_columns);
in = std::make_shared<MergeSortingBlockInputStream>(in, sort_columns, DEFAULT_BLOCK_SIZE, 0, 0, "");
//in = std::make_shared<LimitBlockInputStream>(in, 10, 0);
FormatSettings format_settings;
WriteBufferFromOStream ob(std::cout);
RowOutputStreamPtr out_ = std::make_shared<TabSeparatedRowOutputStream>(ob, sample, false, false, format_settings);
BlockOutputStreamFromRowOutputStream out(out_, sample);
copyData(*in, out);
return 0;
}
catch (const Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
throw;
}

View File

@ -7,7 +7,7 @@
#include <Common/typeid_cast.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeFactory.h>

View File

@ -5,7 +5,7 @@
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFactory.h>

View File

@ -5,7 +5,7 @@
#include <common/DateLUT.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeFactory.h>

View File

@ -1,5 +1,5 @@
#include <IO/WriteBufferFromString.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeFactory.h>
#include <Parsers/IAST.h>

View File

@ -5,7 +5,7 @@
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnConst.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeFactory.h>

View File

@ -6,7 +6,7 @@
#include <IO/WriteHelpers.h>
#include <Common/NaNUtils.h>
#include <Common/typeid_cast.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
namespace DB

View File

@ -6,7 +6,7 @@
#include <Common/typeid_cast.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFactory.h>

View File

@ -1,6 +1,6 @@
#include <Common/StringUtils/StringUtils.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFactory.h>

View File

@ -213,9 +213,9 @@ void DatabaseDictionary::shutdown()
{
}
void DatabaseDictionary::drop()
String DatabaseDictionary::getDatabaseName() const
{
/// Additional actions to delete database are not required.
return name;
}
}

View File

@ -25,6 +25,8 @@ class DatabaseDictionary : public IDatabase
public:
DatabaseDictionary(const String & name_, const Context & context);
String getDatabaseName() const override;
String getEngineName() const override
{
return "Dictionary";
@ -87,7 +89,6 @@ public:
ASTPtr getCreateDatabaseQuery(const Context & context) const override;
void shutdown() override;
void drop() override;
private:
const String name;

View File

@ -78,9 +78,9 @@ ASTPtr DatabaseMemory::getCreateDatabaseQuery(
throw Exception("There is no CREATE DATABASE query for DatabaseMemory", ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY);
}
void DatabaseMemory::drop()
String DatabaseMemory::getDatabaseName() const
{
/// Additional actions to delete database are not required.
return name;
}
}

View File

@ -19,6 +19,8 @@ class DatabaseMemory : public DatabaseWithOwnTablesBase
public:
DatabaseMemory(String name_);
String getDatabaseName() const override;
String getEngineName() const override { return "Memory"; }
void loadTables(
@ -57,8 +59,6 @@ public:
ASTPtr getCreateDatabaseQuery(const Context & context) const override;
void drop() override;
private:
Poco::Logger * log;
};

View File

@ -509,12 +509,6 @@ void DatabaseOrdinary::shutdown()
tables.clear();
}
void DatabaseOrdinary::drop()
{
/// No additional removal actions are required.
}
void DatabaseOrdinary::alterTable(
const Context & context,
const String & name,
@ -568,6 +562,14 @@ void DatabaseOrdinary::alterTable(
}
}
void DatabaseOrdinary::drop()
{
Poco::File(data_path).remove(false);
Poco::File(metadata_path).remove(false);
}
String DatabaseOrdinary::getDataPath() const
{
return data_path;
@ -578,6 +580,11 @@ String DatabaseOrdinary::getMetadataPath() const
return metadata_path;
}
String DatabaseOrdinary::getDatabaseName() const
{
return name;
}
String DatabaseOrdinary::getTableMetadataPath(const String & table_name) const
{
return detail::getTableMetadataPath(metadata_path, table_name);

View File

@ -59,12 +59,14 @@ public:
ASTPtr getCreateDatabaseQuery(const Context & context) const override;
String getDataPath() const override;
String getDatabaseName() const override;
String getMetadataPath() const override;
String getTableMetadataPath(const String & table_name) const override;
void shutdown() override;
void drop() override;
void shutdown() override;
private:
const String metadata_path;
const String data_path;

View File

@ -148,7 +148,7 @@ void DatabaseWithOwnTablesBase::shutdown()
tables_snapshot = tables;
}
for (const auto & kv: tables_snapshot)
for (const auto & kv : tables_snapshot)
{
kv.second->shutdown();
}

View File

@ -6,6 +6,9 @@
#include <ctime>
#include <memory>
#include <functional>
#include <Poco/File.h>
#include <Common/escapeForFileName.h>
#include <Interpreters/Context.h>
class ThreadPool;
@ -132,6 +135,8 @@ public:
/// Get the CREATE DATABASE query for current database.
virtual ASTPtr getCreateDatabaseQuery(const Context & context) const = 0;
/// Get name of database.
virtual String getDatabaseName() const = 0;
/// Returns path for persistent data storage if the database supports it, empty string otherwise
virtual String getDataPath() const { return {}; }
/// Returns metadata path if the database supports it, empty string otherwise
@ -142,8 +147,8 @@ public:
/// Ask all tables to complete the background threads they are using and delete all table objects.
virtual void shutdown() = 0;
/// Delete metadata, the deletion of which differs from the recursive deletion of the directory, if any.
virtual void drop() = 0;
/// Delete data and metadata stored inside the database, if exists.
virtual void drop() {}
virtual ~IDatabase() {}
};

View File

@ -1,5 +1,5 @@
#include <Dictionaries/DictionaryStructure.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/DataTypeFactory.h>
#include <Columns/IColumn.h>
#include <Common/StringUtils/StringUtils.h>

View File

@ -1,7 +1,7 @@
#pragma once
#include <string>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
#include <Columns/IColumn.h>

View File

@ -0,0 +1,45 @@
#include <Core/Block.h>
#include <IO/ReadBuffer.h>
#include <Formats/BinaryRowInputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
namespace DB
{
BinaryRowInputStream::BinaryRowInputStream(ReadBuffer & istr_, const Block & header_)
: istr(istr_), header(header_)
{
}
bool BinaryRowInputStream::read(MutableColumns & columns)
{
if (istr.eof())
return false;
size_t num_columns = columns.size();
for (size_t i = 0; i < num_columns; ++i)
header.getByPosition(i).type->deserializeBinary(*columns[i], istr);
return true;
}
void registerInputFormatRowBinary(FormatFactory & factory)
{
factory.registerInputFormat("RowBinary", [](
ReadBuffer & buf,
const Block & sample,
const Context &,
size_t max_block_size,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<BinaryRowInputStream>(buf, sample),
sample, max_block_size, settings);
});
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IRowInputStream.h>
#include <Formats/IRowInputStream.h>
#include <Core/Block.h>

View File

@ -0,0 +1,40 @@
#include <IO/WriteBuffer.h>
#include <Columns/IColumn.h>
#include <DataTypes/IDataType.h>
#include <Formats/BinaryRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
namespace DB
{
BinaryRowOutputStream::BinaryRowOutputStream(WriteBuffer & ostr_)
: ostr(ostr_)
{
}
void BinaryRowOutputStream::flush()
{
ostr.next();
}
void BinaryRowOutputStream::writeField(const IColumn & column, const IDataType & type, size_t row_num)
{
type.serializeBinary(column, row_num, ostr);
}
void registerOutputFormatRowBinary(FormatFactory & factory)
{
factory.registerOutputFormat("RowBinary", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings &)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<BinaryRowOutputStream>(buf), sample);
});
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IRowOutputStream.h>
#include <Formats/IRowOutputStream.h>
namespace DB

View File

@ -1,6 +1,6 @@
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
#include <DataStreams/BlockInputStreamFromRowInputStream.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
namespace DB
@ -23,10 +23,9 @@ BlockInputStreamFromRowInputStream::BlockInputStreamFromRowInputStream(
const RowInputStreamPtr & row_input_,
const Block & sample_,
size_t max_block_size_,
UInt64 allow_errors_num_,
Float64 allow_errors_ratio_)
const FormatSettings & settings)
: row_input(row_input_), sample(sample_), max_block_size(max_block_size_),
allow_errors_num(allow_errors_num_), allow_errors_ratio(allow_errors_ratio_)
allow_errors_num(settings.input_allow_errors_num), allow_errors_ratio(settings.input_allow_errors_ratio)
{
}

View File

@ -2,7 +2,8 @@
#include <Core/Defines.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IRowInputStream.h>
#include <Formats/FormatSettings.h>
#include <Formats/IRowInputStream.h>
namespace DB
@ -21,8 +22,7 @@ public:
const RowInputStreamPtr & row_input_,
const Block & sample_,
size_t max_block_size_,
UInt64 allow_errors_num_,
Float64 allow_errors_ratio_);
const FormatSettings & settings);
void readPrefix() override { row_input->readPrefix(); }
void readSuffix() override { row_input->readSuffix(); }

View File

@ -1,5 +1,5 @@
#include <Core/Block.h>
#include <DataStreams/BlockOutputStreamFromRowOutputStream.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
namespace DB

View File

@ -1,7 +1,7 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/IRowOutputStream.h>
#include <Formats/IRowOutputStream.h>
namespace DB

View File

@ -0,0 +1,3 @@
if (ENABLE_TESTS)
add_subdirectory (tests)
endif ()

View File

@ -1,8 +1,10 @@
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <DataStreams/verbosePrintString.h>
#include <DataStreams/CSVRowInputStream.h>
#include <Formats/verbosePrintString.h>
#include <Formats/CSVRowInputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
namespace DB
@ -342,4 +344,23 @@ void CSVRowInputStream::updateDiagnosticInfo()
pos_of_current_row = istr.position();
}
void registerInputFormatCSV(FormatFactory & factory)
{
for (bool with_names : {false, true})
{
factory.registerInputFormat(with_names ? "CSVWithNames" : "CSV", [=](
ReadBuffer & buf,
const Block & sample,
const Context &,
size_t max_block_size,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<CSVRowInputStream>(buf, sample, with_names, settings),
sample, max_block_size, settings);
});
}
}
}

View File

@ -1,8 +1,8 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/IRowInputStream.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/IRowInputStream.h>
#include <Formats/FormatSettings.h>
namespace DB

View File

@ -1,4 +1,6 @@
#include <DataStreams/CSVRowOutputStream.h>
#include <Formats/CSVRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
#include <IO/WriteHelpers.h>
@ -113,4 +115,20 @@ void CSVRowOutputStream::writeExtremes()
}
void registerOutputFormatCSV(FormatFactory & factory)
{
for (bool with_names : {false, true})
{
factory.registerOutputFormat(with_names ? "CSVWithNames" : "CSV", [=](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<CSVRowOutputStream>(buf, sample, with_names, format_settings), sample);
});
}
}
}

View File

@ -1,8 +1,8 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/IRowOutputStream.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/IRowOutputStream.h>
#include <Formats/FormatSettings.h>
namespace DB

View File

@ -3,7 +3,10 @@
#include <Common/escapeForFileName.h>
#include <IO/ReadBuffer.h>
#include <DataStreams/CapnProtoRowInputStream.h>
#include <Interpreters/Context.h>
#include <Formats/CapnProtoRowInputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <capnp/serialize.h>
#include <capnp/dynamic.h>
@ -207,6 +210,37 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns)
return true;
}
void registerInputFormatCapnProto(FormatFactory & factory)
{
factory.registerInputFormat("CapnProto", [](
ReadBuffer & buf,
const Block & sample,
const Context & context,
size_t max_block_size,
const FormatSettings & settings)
{
std::vector<String> tokens;
auto schema_and_root = context.getSettingsRef().format_schema.toString();
boost::split(tokens, schema_and_root, boost::is_any_of(":"));
if (tokens.size() != 2)
throw Exception("Format CapnProto requires 'format_schema' setting to have a schema_file:root_object format, e.g. 'schema.capnp:Message'");
const String & schema_dir = context.getFormatSchemaPath();
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<CapnProtoRowInputStream>(buf, sample, schema_dir, tokens[0], tokens[1]),
sample, max_block_size, settings);
});
}
}
#else
namespace DB
{
class FormatFactory;
void registerInputFormatCapnProto(FormatFactory &) {}
}
#endif // USE_CAPNP

View File

@ -3,7 +3,7 @@
#if USE_CAPNP
#include <Core/Block.h>
#include <DataStreams/IRowInputStream.h>
#include <Formats/IRowInputStream.h>
#include <capnp/schema-parser.h>

View File

@ -0,0 +1,154 @@
#include <Common/Exception.h>
#include <Interpreters/Context.h>
#include <Interpreters/Settings.h>
#include <DataStreams/MaterializingBlockOutputStream.h>
#include <Formats/FormatSettings.h>
#include <Formats/FormatFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT;
extern const int LOGICAL_ERROR;
extern const int FORMAT_IS_NOT_SUITABLE_FOR_INPUT;
extern const int FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT;
}
const FormatFactory::Creators & FormatFactory::getCreators(const String & name) const
{
auto it = dict.find(name);
if (dict.end() != it)
return it->second;
throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);
}
BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & buf, const Block & sample, const Context & context, size_t max_block_size) const
{
const auto & input_getter = getCreators(name).first;
if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings;
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
format_settings.date_time_input_format = settings.date_time_input_format;
format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
return input_getter(buf, sample, context, max_block_size, format_settings);
}
BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer & buf, const Block & sample, const Context & context) const
{
const auto & output_getter = getCreators(name).second;
if (!output_getter)
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings;
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
format_settings.json.quote_denormals = settings.output_format_json_quote_denormals;
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.pretty.max_rows = settings.output_format_pretty_max_rows;
format_settings.pretty.color = settings.output_format_pretty_color;
format_settings.write_statistics = settings.output_format_write_statistics;
/** Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return std::make_shared<MaterializingBlockOutputStream>(
output_getter(buf, sample, context, format_settings), sample);
}
void FormatFactory::registerInputFormat(const String & name, InputCreator input_creator)
{
auto & target = dict[name].first;
if (target)
throw Exception("FormatFactory: Input format " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
target = input_creator;
}
void FormatFactory::registerOutputFormat(const String & name, OutputCreator output_creator)
{
auto & target = dict[name].second;
if (target)
throw Exception("FormatFactory: Output format " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
target = output_creator;
}
/// Formats for both input/output.
void registerInputFormatNative(FormatFactory & factory);
void registerOutputFormatNative(FormatFactory & factory);
void registerInputFormatRowBinary(FormatFactory & factory);
void registerOutputFormatRowBinary(FormatFactory & factory);
void registerInputFormatTabSeparated(FormatFactory & factory);
void registerOutputFormatTabSeparated(FormatFactory & factory);
void registerInputFormatValues(FormatFactory & factory);
void registerOutputFormatValues(FormatFactory & factory);
void registerInputFormatCSV(FormatFactory & factory);
void registerOutputFormatCSV(FormatFactory & factory);
void registerInputFormatTSKV(FormatFactory & factory);
void registerOutputFormatTSKV(FormatFactory & factory);
void registerInputFormatJSONEachRow(FormatFactory & factory);
void registerOutputFormatJSONEachRow(FormatFactory & factory);
/// Output only (presentational) formats.
void registerOutputFormatPretty(FormatFactory & factory);
void registerOutputFormatPrettyCompact(FormatFactory & factory);
void registerOutputFormatPrettySpace(FormatFactory & factory);
void registerOutputFormatVertical(FormatFactory & factory);
void registerOutputFormatJSON(FormatFactory & factory);
void registerOutputFormatJSONCompact(FormatFactory & factory);
void registerOutputFormatXML(FormatFactory & factory);
void registerOutputFormatODBCDriver(FormatFactory & factory);
void registerOutputFormatNull(FormatFactory & factory);
/// Input only formats.
void registerInputFormatCapnProto(FormatFactory & factory);
FormatFactory::FormatFactory()
{
registerInputFormatNative(*this);
registerOutputFormatNative(*this);
registerInputFormatRowBinary(*this);
registerOutputFormatRowBinary(*this);
registerInputFormatTabSeparated(*this);
registerOutputFormatTabSeparated(*this);
registerInputFormatValues(*this);
registerOutputFormatValues(*this);
registerInputFormatCSV(*this);
registerOutputFormatCSV(*this);
registerInputFormatTSKV(*this);
registerOutputFormatTSKV(*this);
registerInputFormatJSONEachRow(*this);
registerOutputFormatJSONEachRow(*this);
registerInputFormatCapnProto(*this);
registerOutputFormatPretty(*this);
registerOutputFormatPrettyCompact(*this);
registerOutputFormatPrettySpace(*this);
registerOutputFormatVertical(*this);
registerOutputFormatJSON(*this);
registerOutputFormatJSONCompact(*this);
registerOutputFormatXML(*this);
registerOutputFormatODBCDriver(*this);
registerOutputFormatNull(*this);
}
}

View File

@ -0,0 +1,70 @@
#pragma once
#include <memory>
#include <functional>
#include <unordered_map>
#include <ext/singleton.h>
#include <Core/Types.h>
namespace DB
{
class Block;
class Context;
struct FormatSettings;
class ReadBuffer;
class WriteBuffer;
class IBlockInputStream;
class IBlockOutputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
/** Allows to create an IBlockInputStream or IBlockOutputStream by the name of the format.
* Note: format and compression are independent things.
*/
class FormatFactory final : public ext::singleton<FormatFactory>
{
private:
using InputCreator = std::function<BlockInputStreamPtr(
ReadBuffer & buf,
const Block & sample,
const Context & context,
size_t max_block_size,
const FormatSettings & settings)>;
using OutputCreator = std::function<BlockOutputStreamPtr(
WriteBuffer & buf,
const Block & sample,
const Context & context,
const FormatSettings & settings)>;
using Creators = std::pair<InputCreator, OutputCreator>;
using FormatsDictionary = std::unordered_map<String, Creators>;
public:
BlockInputStreamPtr getInput(const String & name, ReadBuffer & buf,
const Block & sample, const Context & context, size_t max_block_size) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context) const;
/// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator);
void registerOutputFormat(const String & name, OutputCreator output_creator);
private:
FormatsDictionary dict;
FormatFactory();
friend class ext::singleton<FormatFactory>;
const Creators & getCreators(const String & name) const;
};
}

View File

@ -53,6 +53,9 @@ struct FormatSettings
};
DateTimeInputFormat date_time_input_format = DateTimeInputFormat::Basic;
UInt64 input_allow_errors_num = 0;
Float64 input_allow_errors_ratio = 0;
};
}

View File

@ -1,4 +1,4 @@
#include <DataStreams/IRowInputStream.h>
#include <Formats/IRowInputStream.h>
#include <Common/Exception.h>

View File

@ -1,5 +1,5 @@
#include <Core/Block.h>
#include <DataStreams/IRowOutputStream.h>
#include <Formats/IRowOutputStream.h>
namespace DB

View File

@ -1,4 +1,6 @@
#include <DataStreams/JSONCompactRowOutputStream.h>
#include <Formats/JSONCompactRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
#include <IO/WriteHelpers.h>
@ -102,4 +104,17 @@ void JSONCompactRowOutputStream::writeExtremes()
}
void registerOutputFormatJSONCompact(FormatFactory & factory)
{
factory.registerOutputFormat("JSONCompact", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<JSONCompactRowOutputStream>(buf, sample, format_settings), sample);
});
}
}

View File

@ -3,7 +3,7 @@
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferValidUTF8.h>
#include <DataStreams/JSONRowOutputStream.h>
#include <Formats/JSONRowOutputStream.h>
namespace DB

View File

@ -1,5 +1,8 @@
#include <IO/ReadHelpers.h>
#include <DataStreams/JSONEachRowRowInputStream.h>
#include <Formats/JSONEachRowRowInputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
namespace DB
@ -155,4 +158,20 @@ void JSONEachRowRowInputStream::syncAfterError()
skipToUnescapedNextLineOrEOF(istr);
}
void registerInputFormatJSONEachRow(FormatFactory & factory)
{
factory.registerInputFormat("JSONEachRow", [](
ReadBuffer & buf,
const Block & sample,
const Context &,
size_t max_block_size,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<JSONEachRowRowInputStream>(buf, sample, settings),
sample, max_block_size, settings);
});
}
}

View File

@ -1,8 +1,8 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/IRowInputStream.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/IRowInputStream.h>
#include <Formats/FormatSettings.h>
#include <Common/HashTable/HashMap.h>

View File

@ -1,6 +1,8 @@
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <DataStreams/JSONEachRowRowOutputStream.h>
#include <Formats/JSONEachRowRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
namespace DB
@ -48,4 +50,18 @@ void JSONEachRowRowOutputStream::writeRowEndDelimiter()
field_number = 0;
}
void registerOutputFormatJSONEachRow(FormatFactory & factory)
{
factory.registerOutputFormat("JSONEachRow", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<JSONEachRowRowOutputStream>(buf, sample, format_settings), sample);
});
}
}

View File

@ -2,8 +2,8 @@
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <DataStreams/IRowOutputStream.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/IRowOutputStream.h>
#include <Formats/FormatSettings.h>
namespace DB

View File

@ -1,6 +1,8 @@
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <DataStreams/JSONRowOutputStream.h>
#include <Formats/JSONRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
namespace DB
@ -227,4 +229,18 @@ void JSONRowOutputStream::writeStatistics()
writeCString("\t}", *ostr);
}
void registerOutputFormatJSON(FormatFactory & factory)
{
factory.registerOutputFormat("JSON", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<JSONRowOutputStream>(buf, sample, format_settings), sample);
});
}
}

View File

@ -4,8 +4,8 @@
#include <IO/Progress.h>
#include <IO/WriteBuffer.h>
#include <Common/Stopwatch.h>
#include <DataStreams/IRowOutputStream.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/IRowOutputStream.h>
#include <Formats/FormatSettings.h>
namespace DB

View File

@ -0,0 +1,34 @@
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <Formats/FormatFactory.h>
namespace DB
{
void registerInputFormatNative(FormatFactory & factory)
{
factory.registerInputFormat("Native", [](
ReadBuffer & buf,
const Block & sample,
const Context &,
size_t,
const FormatSettings &)
{
return std::make_shared<NativeBlockInputStream>(buf, sample, 0);
});
}
void registerOutputFormatNative(FormatFactory & factory)
{
factory.registerOutputFormat("Native", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings &)
{
return std::make_shared<NativeBlockOutputStream>(buf, 0, sample);
});
}
}

View File

@ -0,0 +1,20 @@
#include <DataStreams/NullBlockOutputStream.h>
#include <Formats/FormatFactory.h>
namespace DB
{
void registerOutputFormatNull(FormatFactory & factory)
{
factory.registerOutputFormat("Null", [](
WriteBuffer &,
const Block & sample,
const Context &,
const FormatSettings &)
{
return std::make_shared<NullBlockOutputStream>(sample);
});
}
}

View File

@ -1,7 +1,8 @@
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <Core/Block.h>
#include <DataStreams/ODBCDriverBlockOutputStream.h>
#include <Formats/ODBCDriverBlockOutputStream.h>
#include <Formats/FormatFactory.h>
namespace DB
@ -57,4 +58,17 @@ void ODBCDriverBlockOutputStream::writePrefix()
}
}
void registerOutputFormatODBCDriver(FormatFactory & factory)
{
factory.registerOutputFormat("ODBCDriver", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
return std::make_shared<ODBCDriverBlockOutputStream>(buf, sample, format_settings);
});
}
}

View File

@ -2,7 +2,7 @@
#include <string>
#include <DataStreams/IBlockOutputStream.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
#include <Core/Block.h>

View File

@ -1,6 +1,7 @@
#include <sys/ioctl.h>
#include <port/unistd.h>
#include <DataStreams/PrettyBlockOutputStream.h>
#include <Formats/PrettyBlockOutputStream.h>
#include <Formats/FormatFactory.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
@ -245,4 +246,27 @@ void PrettyBlockOutputStream::writeExtremes()
}
void registerOutputFormatPretty(FormatFactory & factory)
{
factory.registerOutputFormat("Pretty", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
return std::make_shared<PrettyBlockOutputStream>(buf, sample, format_settings);
});
factory.registerOutputFormat("PrettyNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
FormatSettings changed_settings = format_settings;
changed_settings.pretty.color = false;
return std::make_shared<PrettyBlockOutputStream>(buf, sample, changed_settings);
});
}
}

View File

@ -2,7 +2,7 @@
#include <Core/Block.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
namespace DB

View File

@ -1,6 +1,8 @@
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <DataStreams/PrettyCompactBlockOutputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/PrettyCompactBlockOutputStream.h>
namespace DB
@ -120,4 +122,40 @@ void PrettyCompactBlockOutputStream::write(const Block & block)
total_rows += rows;
}
void registerOutputFormatPrettyCompact(FormatFactory & factory)
{
factory.registerOutputFormat("PrettyCompact", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
return std::make_shared<PrettyCompactBlockOutputStream>(buf, sample, format_settings);
});
factory.registerOutputFormat("PrettyCompactNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
FormatSettings changed_settings = format_settings;
changed_settings.pretty.color = false;
return std::make_shared<PrettyCompactBlockOutputStream>(buf, sample, changed_settings);
});
factory.registerOutputFormat("PrettyCompactMonoBlock", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
BlockOutputStreamPtr impl = std::make_shared<PrettyCompactBlockOutputStream>(buf, sample, format_settings);
auto res = std::make_shared<SquashingBlockOutputStream>(impl, format_settings.pretty.max_rows, 0);
res->disableFlush();
return res;
});
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/PrettyBlockOutputStream.h>
#include <Formats/PrettyBlockOutputStream.h>
namespace DB

View File

@ -1,6 +1,7 @@
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <DataStreams/PrettySpaceBlockOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/PrettySpaceBlockOutputStream.h>
namespace DB
@ -95,4 +96,28 @@ void PrettySpaceBlockOutputStream::writeSuffix()
writeExtremes();
}
void registerOutputFormatPrettySpace(FormatFactory & factory)
{
factory.registerOutputFormat("PrettySpace", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
return std::make_shared<PrettySpaceBlockOutputStream>(buf, sample, format_settings);
});
factory.registerOutputFormat("PrettySpaceNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
FormatSettings changed_settings = format_settings;
changed_settings.pretty.color = false;
return std::make_shared<PrettySpaceBlockOutputStream>(buf, sample, changed_settings);
});
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/PrettyBlockOutputStream.h>
#include <Formats/PrettyBlockOutputStream.h>
namespace DB

View File

@ -1,5 +1,7 @@
#include <IO/ReadHelpers.h>
#include <DataStreams/TSKVRowInputStream.h>
#include <Formats/TSKVRowInputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
namespace DB
@ -187,4 +189,20 @@ void TSKVRowInputStream::syncAfterError()
skipToUnescapedNextLineOrEOF(istr);
}
void registerInputFormatTSKV(FormatFactory & factory)
{
factory.registerInputFormat("TSKV", [](
ReadBuffer & buf,
const Block & sample,
const Context &,
size_t max_block_size,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TSKVRowInputStream>(buf, sample, settings),
sample, max_block_size, settings);
});
}
}

View File

@ -1,8 +1,8 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/IRowInputStream.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/IRowInputStream.h>
#include <Formats/FormatSettings.h>
#include <Common/HashTable/HashMap.h>

View File

@ -1,6 +1,9 @@
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <DataStreams/TSKVRowOutputStream.h>
#include <Formats/TSKVRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
namespace DB
@ -37,4 +40,17 @@ void TSKVRowOutputStream::writeRowEndDelimiter()
}
void registerOutputFormatTSKV(FormatFactory & factory)
{
factory.registerOutputFormat("TSKV", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & settings)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<TSKVRowOutputStream>(buf, sample, settings), sample);
});
}
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <DataTypes/FormatSettings.h>
#include <DataStreams/TabSeparatedRowOutputStream.h>
#include <Formats/FormatSettings.h>
#include <Formats/TabSeparatedRowOutputStream.h>
namespace DB

View File

@ -1,7 +1,7 @@
#pragma once
#include <DataTypes/FormatSettings.h>
#include <DataStreams/TabSeparatedRowOutputStream.h>
#include <Formats/FormatSettings.h>
#include <Formats/TabSeparatedRowOutputStream.h>
namespace DB

View File

@ -2,8 +2,10 @@
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <DataStreams/TabSeparatedRowInputStream.h>
#include <DataStreams/verbosePrintString.h>
#include <Formats/TabSeparatedRowInputStream.h>
#include <Formats/verbosePrintString.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
namespace DB
@ -320,4 +322,53 @@ void TabSeparatedRowInputStream::updateDiagnosticInfo()
pos_of_current_row = istr.position();
}
void registerInputFormatTabSeparated(FormatFactory & factory)
{
for (auto name : {"TabSeparated", "TSV"})
{
factory.registerInputFormat(name, [](
ReadBuffer & buf,
const Block & sample,
const Context &,
size_t max_block_size,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TabSeparatedRowInputStream>(buf, sample, false, false, settings),
sample, max_block_size, settings);
});
}
for (auto name : {"TabSeparatedWithNames", "TSVWithNames"})
{
factory.registerInputFormat(name, [](
ReadBuffer & buf,
const Block & sample,
const Context &,
size_t max_block_size,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TabSeparatedRowInputStream>(buf, sample, true, false, settings),
sample, max_block_size, settings);
});
}
for (auto name : {"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes"})
{
factory.registerInputFormat(name, [](
ReadBuffer & buf,
const Block & sample,
const Context &,
size_t max_block_size,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<TabSeparatedRowInputStream>(buf, sample, true, true, settings),
sample, max_block_size, settings);
});
}
}
}

View File

@ -1,8 +1,8 @@
#pragma once
#include <Core/Block.h>
#include <DataTypes/FormatSettings.h>
#include <DataStreams/IRowInputStream.h>
#include <Formats/FormatSettings.h>
#include <Formats/IRowInputStream.h>
namespace DB

View File

@ -1,4 +1,7 @@
#include <DataStreams/TabSeparatedRowOutputStream.h>
#include <Formats/TabSeparatedRowOutputStream.h>
#include <Formats/TabSeparatedRawRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
#include <IO/WriteHelpers.h>
@ -118,4 +121,59 @@ void TabSeparatedRowOutputStream::writeExtremes()
}
void registerOutputFormatTabSeparated(FormatFactory & factory)
{
for (auto name : {"TabSeparated", "TSV"})
{
factory.registerOutputFormat(name, [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & settings)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<TabSeparatedRowOutputStream>(buf, sample, false, false, settings), sample);
});
}
for (auto name : {"TabSeparatedRaw", "TSVRaw"})
{
factory.registerOutputFormat(name, [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & settings)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<TabSeparatedRawRowOutputStream>(buf, sample, false, false, settings), sample);
});
}
for (auto name : {"TabSeparatedWithNames", "TSVWithNames"})
{
factory.registerOutputFormat(name, [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & settings)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<TabSeparatedRowOutputStream>(buf, sample, true, false, settings), sample);
});
}
for (auto name : {"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes"})
{
factory.registerOutputFormat(name, [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & settings)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<TabSeparatedRowOutputStream>(buf, sample, true, true, settings), sample);
});
}
}
}

View File

@ -1,8 +1,8 @@
#pragma once
#include <Core/Block.h>
#include <DataTypes/FormatSettings.h>
#include <DataStreams/IRowOutputStream.h>
#include <Formats/FormatSettings.h>
#include <Formats/IRowOutputStream.h>
namespace DB

View File

@ -3,7 +3,9 @@
#include <Interpreters/convertFieldToType.h>
#include <Parsers/TokenIterator.h>
#include <Parsers/ExpressionListParsers.h>
#include <DataStreams/ValuesRowInputStream.h>
#include <Formats/ValuesRowInputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <Common/FieldVisitors.h>
#include <Core/Block.h>
#include <Common/typeid_cast.h>
@ -144,4 +146,20 @@ bool ValuesRowInputStream::read(MutableColumns & columns)
return true;
}
void registerInputFormatValues(FormatFactory & factory)
{
factory.registerInputFormat("Values", [](
ReadBuffer & buf,
const Block & sample,
const Context & context,
size_t max_block_size,
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<ValuesRowInputStream>(buf, sample, context, settings),
sample, max_block_size, settings);
});
}
}

View File

@ -1,8 +1,8 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/IRowInputStream.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/IRowInputStream.h>
#include <Formats/FormatSettings.h>
namespace DB

View File

@ -1,4 +1,6 @@
#include <DataStreams/ValuesRowOutputStream.h>
#include <Formats/ValuesRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
#include <IO/WriteHelpers.h>
#include <Columns/IColumn.h>
@ -45,4 +47,17 @@ void ValuesRowOutputStream::writeRowBetweenDelimiter()
}
void registerOutputFormatValues(FormatFactory & factory)
{
factory.registerOutputFormat("Values", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & settings)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<ValuesRowOutputStream>(buf, settings), sample);
});
}
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <DataTypes/FormatSettings.h>
#include <DataStreams/IRowOutputStream.h>
#include <Formats/FormatSettings.h>
#include <Formats/IRowOutputStream.h>
namespace DB

View File

@ -2,7 +2,9 @@
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <DataStreams/VerticalRowOutputStream.h>
#include <Formats/VerticalRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
#include <Common/UTF8Helpers.h>
@ -163,4 +165,17 @@ void VerticalRowOutputStream::writeExtremes()
}
void registerOutputFormatVertical(FormatFactory & factory)
{
factory.registerOutputFormat("Vertical", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & settings)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<VerticalRowOutputStream>(buf, sample, settings), sample);
});
}
}

View File

@ -1,8 +1,8 @@
#pragma once
#include <Core/Block.h>
#include <DataTypes/FormatSettings.h>
#include <DataStreams/IRowOutputStream.h>
#include <Formats/FormatSettings.h>
#include <Formats/IRowOutputStream.h>
namespace DB

View File

@ -1,6 +1,8 @@
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <DataStreams/XMLRowOutputStream.h>
#include <Formats/XMLRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
namespace DB
@ -221,4 +223,18 @@ void XMLRowOutputStream::writeStatistics()
writeCString("\t</statistics>\n", *ostr);
}
void registerOutputFormatXML(FormatFactory & factory)
{
factory.registerOutputFormat("XML", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & settings)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<XMLRowOutputStream>(buf, sample, settings), sample);
});
}
}

View File

@ -4,8 +4,8 @@
#include <IO/Progress.h>
#include <IO/WriteBuffer.h>
#include <Common/Stopwatch.h>
#include <DataTypes/FormatSettings.h>
#include <DataStreams/IRowOutputStream.h>
#include <Formats/FormatSettings.h>
#include <Formats/IRowOutputStream.h>
namespace DB

View File

@ -0,0 +1,7 @@
set(SRCS )
add_executable (tab_separated_streams tab_separated_streams.cpp ${SRCS})
target_link_libraries (tab_separated_streams dbms)
add_executable (block_row_transforms block_row_transforms.cpp ${SRCS})
target_link_libraries (block_row_transforms dbms)

View File

@ -12,10 +12,11 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/TabSeparatedRowInputStream.h>
#include <DataStreams/TabSeparatedRowOutputStream.h>
#include <DataStreams/BlockInputStreamFromRowInputStream.h>
#include <DataStreams/BlockOutputStreamFromRowOutputStream.h>
#include <Formats/TabSeparatedRowInputStream.h>
#include <Formats/TabSeparatedRowOutputStream.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
#include <DataStreams/copyData.h>
@ -44,7 +45,7 @@ try
FormatSettings format_settings;
RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, 0);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, format_settings);
RowOutputStreamPtr row_output = std::make_shared<TabSeparatedRowOutputStream>(out_buf, sample, false, false, format_settings);
BlockOutputStreamFromRowOutputStream block_output(row_output, sample);

View File

@ -5,13 +5,16 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/TabSeparatedRowInputStream.h>
#include <DataStreams/TabSeparatedRowOutputStream.h>
#include <Formats/TabSeparatedRowInputStream.h>
#include <Formats/TabSeparatedRowOutputStream.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/BlockInputStreamFromRowInputStream.h>
#include <DataStreams/BlockOutputStreamFromRowOutputStream.h>
using namespace DB;
@ -39,7 +42,7 @@ try
RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings);
RowOutputStreamPtr row_output = std::make_shared<TabSeparatedRowOutputStream>(out_buf, sample, false, false, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, 0);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, format_settings);
BlockOutputStreamFromRowOutputStream block_output(row_output, sample);
copyData(block_input, block_output);

View File

@ -1,4 +1,4 @@
#include <DataStreams/verbosePrintString.h>
#include <Formats/verbosePrintString.h>
#include <Common/hex.h>
#include <IO/Operators.h>

View File

@ -22,7 +22,7 @@
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeInterval.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnConst.h>

View File

@ -24,7 +24,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/NumberTraits.h>
#include <DataTypes/FormatSettings.h>
#include <Formats/FormatSettings.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>

View File

@ -25,8 +25,8 @@ public:
virtual void createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast, const Context & context,
const ThrottlerPtr & throttler,
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res) = 0;
};

Some files were not shown because too many files have changed in this diff Show More