See <https://github.com/toddwschneider/nyc-taxi-data> and <http://tech.marksblogg.com/billion-nyc-taxi-rides-redshift.html> for a description of the dataset and loading instructions.
Downloading will result in about 227 GB of uncompressed data in CSV files. The download takes about an hour over a 1 Gbit connection (parallel downloading from s3.amazonaws.com utilizes at least half of a 1 Gbit channel).
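As a hedged sketch, a parallel download of one year's worth of yellow cab files might look like this (the S3 URL pattern is an assumption based on the repository's download scripts; adjust it if the bucket layout has changed):

```bash
# Fetch the 12 monthly files for 2015 with 8 parallel wget processes.
# The URL pattern is assumed; check the repository's download scripts.
seq -w 1 12 | xargs -P 8 -I{} wget -c -P data/ \
    "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2015-{}.csv"
```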
Some of the files might not download fully. Check the file sizes and re-download any that seem doubtful.
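A minimal sketch of such a check, assuming the files came from the same S3 bucket: compare each local file's size against the `Content-Length` reported by the server and flag mismatches.

```bash
BASE_URL="https://s3.amazonaws.com/nyc-tlc/trip+data"   # assumed source bucket
for f in data/*.csv; do
    name=$(basename "$f")
    local_size=$(stat -c %s "$f")
    remote_size=$(curl -sI "$BASE_URL/$name" | tr -d '\r' \
        | awk 'tolower($1) == "content-length:" {print $2}')
    if [ "$local_size" != "$remote_size" ]; then
        echo "re-download: $name (local $local_size bytes, remote $remote_size)"
    fi
done
```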
Some of the files might contain invalid rows. You can fix them as follows:
```bash
# Delete rows with 18 or more commas (more fields than a valid row has),
# then replace the originals with the cleaned files.
sed -E '/(.*,){18,}/d' data/yellow_tripdata_2010-02.csv > data/yellow_tripdata_2010-02.csv_
sed -E '/(.*,){18,}/d' data/yellow_tripdata_2010-03.csv > data/yellow_tripdata_2010-03.csv_
mv data/yellow_tripdata_2010-02.csv_ data/yellow_tripdata_2010-02.csv
mv data/yellow_tripdata_2010-03.csv_ data/yellow_tripdata_2010-03.csv
```
Then the data must be pre-processed in PostgreSQL. This creates point-in-polygon selections (to match points on the map with the boroughs of New York City) and combines all the data into a single denormalized flat table using a JOIN. To do this, you will need to install PostgreSQL with PostGIS support.
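Conceptually, the point-in-polygon matching works like this in PostGIS (a hedged sketch, not the repository's exact script; the `geom` column name and SRID 4326 are assumptions):

```bash
# Find the census tract polygon that contains a given pickup point.
psql nyc-taxi-data -c "
    SELECT gid, boroname
    FROM nyct2010
    WHERE ST_Within(ST_SetSRID(ST_MakePoint(-73.99, 40.75), 4326), geom);"
```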
Be careful when running `initialize_database.sh` and manually re-check that all the tables were created correctly.
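For example, a quick sanity check might list the tables and count the rows in the most important ones (a sketch; table names follow the repository's schema):

```bash
psql nyc-taxi-data -c '\dt'     # list all tables
psql nyc-taxi-data -c 'SELECT count(*) FROM trips;'
psql nyc-taxi-data -c 'SELECT count(*) FROM central_park_weather_observations_raw;'
psql nyc-taxi-data -c 'SELECT count(*) FROM nyct2010;'
```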
It takes about 20-30 minutes to process each month's worth of data in PostgreSQL, for a total of about 48 hours.
You can check the number of downloaded rows as follows:
```text
time psql nyc-taxi-data -c "SELECT count(*) FROM trips;"
   count
------------
 1298979494
(1 row)

real    7m9.164s
```
(This is slightly more than the 1.1 billion rows reported by Mark Litwintschik in a series of blog posts.)
Next, export a snapshot of the data from PostgreSQL. The `COPY` query joins the trips with the weather observations and the census tract polygons; most of its SELECT column list is omitted here:

```sql
COPY
(
    SELECT ...
    FROM trips
    LEFT JOIN central_park_weather_observations_raw weather
        ON weather.date = trips.pickup_datetime::date
    LEFT JOIN nyct2010 pick_up
        ON pick_up.gid = trips.pickup_nyct2010_gid
    LEFT JOIN nyct2010 drop_off
        ON drop_off.gid = trips.dropoff_nyct2010_gid
) TO '/opt/milovidov/nyc-taxi-data/trips.tsv';
```
The data snapshot is created at a speed of about 50 MB per second. While creating the snapshot, PostgreSQL reads from the disk at a speed of about 28 MB per second.
This takes about 5 hours. The resulting TSV file is 590612904969 bytes (about 550 GiB).
Create a temporary table in ClickHouse:
```sql
CREATE TABLE trips
(
    trip_id UInt32,
    vendor_id String,
    pickup_datetime DateTime,
    dropoff_datetime Nullable(DateTime),
    store_and_fwd_flag Nullable(FixedString(1)),
    rate_code_id Nullable(UInt8),
    pickup_longitude Nullable(Float64),
    pickup_latitude Nullable(Float64),
    dropoff_longitude Nullable(Float64),
    dropoff_latitude Nullable(Float64),
    passenger_count Nullable(UInt8),
    trip_distance Nullable(Float64),
    fare_amount Nullable(Float32),
    extra Nullable(Float32),
    mta_tax Nullable(Float32),
    tip_amount Nullable(Float32),
    tolls_amount Nullable(Float32),
    ehail_fee Nullable(Float32),
    improvement_surcharge Nullable(Float32),
    total_amount Nullable(Float32),
    payment_type Nullable(String),
    trip_type Nullable(UInt8),
    pickup Nullable(String),
    dropoff Nullable(String),
    cab_type Nullable(String),
    precipitation Nullable(UInt8),
    snow_depth Nullable(UInt8),
    snowfall Nullable(UInt8),
    max_temperature Nullable(UInt8),
    min_temperature Nullable(UInt8),
    average_wind_speed Nullable(UInt8),
    pickup_nyct2010_gid Nullable(UInt8),
    pickup_ctlabel Nullable(String),
    pickup_borocode Nullable(UInt8),
    pickup_boroname Nullable(String),
    pickup_ct2010 Nullable(String),
    pickup_boroct2010 Nullable(String),
    pickup_cdeligibil Nullable(FixedString(1)),
    pickup_ntacode Nullable(String),
    pickup_ntaname Nullable(String),
    pickup_puma Nullable(String),
    dropoff_nyct2010_gid Nullable(UInt8),
    dropoff_ctlabel Nullable(String),
    dropoff_borocode Nullable(UInt8),
    dropoff_boroname Nullable(String),
    dropoff_ct2010 Nullable(String),
    dropoff_boroct2010 Nullable(String),
    dropoff_cdeligibil Nullable(String),
    dropoff_ntacode Nullable(String),
    dropoff_ntaname Nullable(String),
    dropoff_puma Nullable(String)
) ENGINE = Log;
```
This table is needed to convert the fields to more correct data types and, where possible, to eliminate NULLs.
```text
time clickhouse-client --query="INSERT INTO trips FORMAT TabSeparated" < trips.tsv

real    75m56.214s
```
Data is read at a speed of 112-140 MB/second.
Loading data into a Log type table in one stream took 76 minutes.
The data in this table uses 142 GB.
(Importing data directly from Postgres is also possible, using `COPY ... TO PROGRAM`.)
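A minimal sketch of that approach, assuming the same export query as above (column list elided) and that the PostgreSQL server's OS user is allowed to run `COPY ... TO PROGRAM` and can reach ClickHouse:

```bash
# Stream straight from PostgreSQL into ClickHouse, skipping the TSV file.
psql nyc-taxi-data <<'SQL'
COPY (
    SELECT ... FROM trips ...  -- the same joined SELECT as in the export above
) TO PROGRAM 'clickhouse-client --query="INSERT INTO trips FORMAT TabSeparated"';
SQL
```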
Unfortunately, all the fields associated with the weather (precipitation through average_wind_speed) were filled with NULL. Because of this, we will remove them from the final data set.
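You can verify this before dropping the columns; in ClickHouse, `count(column)` counts only non-NULL values, so zeros in the last three columns confirm they are empty:

```bash
clickhouse-client --query="
    SELECT count(), count(precipitation), count(snow_depth), count(average_wind_speed)
    FROM trips"
```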
To start, we'll create a table on a single server. Later we will make the table distributed. The table is created from `trips` with a query that casts each field to a proper type; most of the column expressions are omitted here:

```sql
CREATE TABLE trips_mergetree
ENGINE = MergeTree(pickup_date, pickup_datetime, 8192)
AS SELECT
    trip_id,
    toDate(pickup_datetime) AS pickup_date,
    ...
    toUInt16(ifNull(dropoff_puma, '0')) AS dropoff_puma
FROM trips
```
This takes 3030 seconds at a speed of about 428,000 rows per second.
To load it faster, you can create the table with the `Log` engine instead of `MergeTree`. In this case, the load completes in less than 200 seconds.
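A sketch of that variant, reusing the same conversion SELECT (`trips_log` is a hypothetical name; the column expressions are elided as above):

```bash
clickhouse-client --query="
    CREATE TABLE trips_log ENGINE = Log
    AS SELECT ... FROM trips"
```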
The MergeTree table uses 126 GB of disk space.
```text
:) SELECT formatReadableSize(sum(bytes)) FROM system.parts WHERE table = 'trips_mergetree' AND active

SELECT formatReadableSize(sum(bytes))
FROM system.parts
WHERE (table = 'trips_mergetree') AND active

┌─formatReadableSize(sum(bytes))─┐
│ 126.18 GiB                     │
└────────────────────────────────┘
```
Among other things, you can run the OPTIMIZE query on the MergeTree table. But it's not required, since everything will be fine without it.
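If you do run it, `FINAL` forces all the data parts to merge:

```bash
clickhouse-client --query="OPTIMIZE TABLE trips_mergetree FINAL"
```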
## Results on a single server
Q1:
```sql
SELECT cab_type, count(*) FROM trips_mergetree GROUP BY cab_type
```
0.490 seconds.
Q2:
```sql
SELECT passenger_count, avg(total_amount) FROM trips_mergetree GROUP BY passenger_count
```
1.224 seconds.
Q3:
```sql
SELECT passenger_count, toYear(pickup_date) AS year, count(*) FROM trips_mergetree GROUP BY passenger_count, year
```
2.104 seconds.
Q4:
```sql
SELECT passenger_count, toYear(pickup_date) AS year, round(trip_distance) AS distance, count(*)
FROM trips_mergetree
GROUP BY passenger_count, year, distance
ORDER BY year, count(*) DESC
```
3.593 seconds.
The following server was used:
Two Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz, 16 physical cores total,
128 GiB RAM,
8x6 TB HD on hardware RAID-5
Execution time is the best of three runs. But starting from the second run, queries read data from the file system cache. No further caching occurs: the data is read out and processed in each run.
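A sketch of that methodology as a shell loop; `clickhouse-client --time` prints the elapsed query time to stderr:

```bash
# Run a query three times and keep the best time (Q1 shown as an example).
QUERY="SELECT cab_type, count(*) FROM trips_mergetree GROUP BY cab_type"
for i in 1 2 3; do
    clickhouse-client --time --query="$QUERY" > /dev/null
done
```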