MTA - Finding the Best Way Around


Finding the Best Way Around: Utilizing Real-Time Transit Data for Travel Optimization. There are a lot of factors involved in determining how to find your way around while avoiding delays, bad weather, dangers and expenses. In this article we will focus on public transport in the largest transit system in the United States, the MTA, which is centered on New York City. Because we utilize public and semi-public data feeds, the same approach can be extended to most cities and metropolitan areas around the world. If you don't need a deep dive, grab the source code, sign up for an MTA developer account and start running.

Source code: github.com/tspannhw/FLaNK-MTA - MTA data sources, as well as other New York, New Jersey and Pennsylvania data, NYC traffic cameras and TRANSCOM feeds.

Photo by Michael Jin on Unsplash


Once we get our first sample data back from the MTA, we will want to see how to transform, convert, enrich and finalize it for use in streaming tables with Flink SQL, while keeping that same contract for Kafka consumers, Iceberg tables and other users of this data. A consistent data contract starts with a data schema defining names, nullability and types.

Schema Registry for our schema:

{
  "type": "record",
  "name": "mta",
  "namespace": "org.apache.nifi",
  "fields": [
    { "name": "StopPointRef", "type": [ "string", "null" ] },
    { "name": "VehicleRef", "type": [ "string", "null" ] },
    { "name": "ProgressRate", "type": [ "string", "null" ] },
    { "name": "ExpectedDepartureTime", "type": [ "string", "null" ] },
    { "name": "StopPoint", "type": [ "string", "null" ] },
    { "name": "VisitNumber", "type": [ "string", "null" ] },
    { "name": "DataFrameRef", "type": [ "string", "null" ] },
    { "name": "StopPointName", "type": [ "string", "null" ] },
    { "name": "SituationSimpleRef5", "type": [ "string", "null" ] },
    { "name": "SituationSimpleRef3", "type": [ "string", "null" ] },
    { "name": "Bearing", "type": [ "string", "null" ] },
    { "name": "SituationSimpleRef4", "type": [ "string", "null" ] },
    { "name": "SituationSimpleRef1", "type": [ "string", "null" ] },
    { "name": "OriginAimedDepartureTime", "type": [ "string", "null" ] },
    { "name": "SituationSimpleRef2", "type": [ "string", "null" ] },
    { "name": "JourneyPatternRef", "type": [ "string", "null" ] },
    { "name": "RecordedAtTime", "type": [ "string", "null" ] },
    { "name": "OperatorRef", "type": [ "string", "null" ] },
    { "name": "DestinationName", "type": [ "string", "null" ] },
    { "name": "ExpectedArrivalTime", "type": [ "string", "null" ] },
    { "name": "BlockRef", "type": [ "string", "null" ] },
    { "name": "LineRef", "type": [ "string", "null" ] },
    { "name": "VehicleLocationLongitude", "type": [ "string", "null" ] },
    { "name": "DirectionRef", "type": [ "string", "null" ] },
    { "name": "ArrivalProximityText", "type": [ "string", "null" ] },
    { "name": "DistanceFromStop", "type": [ "string", "null" ] },
    { "name": "EstimatedPassengerCapacity", "type": [ "string", "null" ] },
    { "name": "AimedArrivalTime", "type": [ "string", "null" ] },
    { "name": "PublishedLineName", "type": [ "string", "null" ] },
    { "name": "DatedVehicleJourneyRef", "type": [ "string", "null" ] },
    { "name": "Date", "type": [ "string", "null" ] },
    { "name": "Monitored", "type": [ "string", "null" ] },
    { "name": "ProgressStatus", "type": [ "string", "null" ] },
    { "name": "DestinationRef", "type": [ "string", "null" ] },
    { "name": "EstimatedPassengerCount", "type": [ "string", "null" ] },
    { "name": "VehicleLocationLatitude", "type": [ "string", "null" ] },
    { "name": "OriginRef", "type": [ "string", "null" ] },
    { "name": "NumberOfStopsAway", "type": [ "string", "null" ] },
    { "name": "ts", "type": [ "string", "null" ] },
    { "name": "uuid", "type": [ "string", "null" ] }
  ]
}

The easiest and most widely used option is an Apache Avro schema in JSON format, so we'll use that and everyone can be happy with our consistency.
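If you want to sanity-check records against this contract outside of NiFi and Schema Registry, one option is to validate a sample message with the fastavro library. This is a side sketch of mine, not part of the original flow, and the file name mta.avsc is just where I assume the schema above has been saved.

# Minimal sketch: validate one MTA record against the Avro schema above.
# fastavro is my assumption here; in the real flow NiFi and Schema Registry enforce the contract.
import json
from fastavro import parse_schema
from fastavro.validation import validate

with open("mta.avsc") as f:          # the schema shown above, saved locally (hypothetical path)
    schema = parse_schema(json.load(f))

sample = {
    "VehicleRef": "MTA NYCT_5464",
    "StopPointName": "5 AV/W 42 ST",
    "ExpectedArrivalTime": "2023-04-01T10:15:00.000-04:00",
    "ts": "1680358500000",
    "uuid": "2f8a9c1e-1111-2222-3333-444455556666",
}
# Every field in the schema is a ["string","null"] union, so omitted fields validate as null.
print(validate(sample, schema))

Since every field is nullable, a record only needs the fields it actually has, which is handy while the upstream SIRI payload evolves.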
PARAMETERS

When you build a NiFi flow, you should parameterize anything you may want to change when you deploy it to another environment or to production. This is a great way to make the flow reusable. We do that for ReadyFlows. Do this and profit!

Attribute List: StopPointRef,ExpectedDepartureTime,DataFrameRef,DestinationName,ExpectedArrivalTime,LineRef,VehicleLocationLongitude,DirectionRef,ArrivalProximityText,EstimatedPassengerCapacity,AimedArrivalTime,PublishedLineName,DatedVehicleJourneyRef,Monitored,ProgressStatus,EstimatedPassengerCount,VehicleLocationLatitude,VehicleRef,ProgressRate,StopPoint,VisitNumber,StopPointName,Bearing,OriginAimedDepartureTime,JourneyPatternRef,RecordedAtTime,OperatorRef,BlockRef,DistanceFromStop,SituationSimpleRef1,SituationSimpleRef2,SituationSimpleRef3,SituationSimpleRef4,SituationSimpleRef5,Date,DestinationRef,OriginRef,NumberOfStopsAway
Destination Topic: mta
KAFKABROKERS: kafka:9092
KAFKACLIENTID: nifi-mta-local
MTA_URL: api.prod.obanyc.com/api/siri/vehicle-monito..
SCHEMANAME: mta
SQLQUERY: SELECT * FROM FLOWFILE

NiFi DataFlow
The Flow starts here…
Send our data to Kafka
Now that you have seen the high-level flow, let's walk through the settings for each processor. (A rough Python approximation of what the whole flow does is sketched after the steps below.)
Step 1: InvokeHTTP - get the REST data
Step 2: QueryRecord
Step 3: SplitRecord
Step 4: EvaluateJSONPath - get the fields we want
Step 5: AttributesToJSON - build our new file format
Step 6: UpdateRecord - add timestamp and primary key
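Here is that minimal Python approximation, under a couple of assumptions of mine: the endpoint URL is a placeholder (the article's MTA_URL parameter is truncated), and the SIRI VehicleMonitoring nesting shown is the usual layout but may differ slightly from what your key returns. Treat this as illustration, not as the flow itself.

# Rough Python approximation of the six NiFi processors above; illustration only.
import json
import time
import uuid
import urllib.request

# Placeholder for the MTA_URL parameter (truncated in the article) plus your API key.
MTA_URL = "https://<siri-vehicle-monitoring-endpoint>?key=<your-key>"

def fetch(url):
    # Step 1 (InvokeHTTP): call the SIRI VehicleMonitoring REST endpoint.
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)

def flatten(activity):
    # Steps 4-5 (EvaluateJSONPath / AttributesToJSON): keep a handful of the attribute-list fields.
    journey = activity.get("MonitoredVehicleJourney", {})
    call = journey.get("MonitoredCall", {})
    loc = journey.get("VehicleLocation", {})
    record = {
        "VehicleRef": journey.get("VehicleRef"),
        "StopPointName": call.get("StopPointName"),
        "DestinationName": journey.get("DestinationName"),
        "ExpectedArrivalTime": call.get("ExpectedArrivalTime"),
        "VehicleLocationLatitude": str(loc.get("Latitude")),
        "VehicleLocationLongitude": str(loc.get("Longitude")),
        "RecordedAtTime": activity.get("RecordedAtTime"),
    }
    # Step 6 (UpdateRecord): add a timestamp and a primary key, matching the ts and uuid schema fields.
    record["ts"] = str(int(time.time() * 1000))
    record["uuid"] = str(uuid.uuid4())
    return record

if __name__ == "__main__":
    siri = fetch(MTA_URL)
    # Steps 2-3 (QueryRecord / SplitRecord): select the VehicleActivity array and split it into records.
    activities = siri["Siri"]["ServiceDelivery"]["VehicleMonitoringDelivery"][0]["VehicleActivity"]
    for activity in activities:
        print(json.dumps(flatten(activity)))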


A better option is to build this flow using Cloudera DataFlow Designer and start from a pre-tested ReadyFlow. (DataFlow SuperPower)


Apache NiFi Flows To Load to Cloudera DataFlow

cdp df import-flow-definition \
  --name "MTA" \
  --description "MTA" \
  --file "<>/MTA.json" \
  --comments "Initial Version"

/MTA.json" \ - comments "Initial Version" " tabindex="0" role="button" style="box-sizing: border-box; position: relative; display: flex !important; padding: 0px !important; font-size: 14px; font-weight: var( - base-text-weight-medium, 500); line-height: 20px; white-space: nowrap; vertical-align: middle; cursor: pointer; user-select: none; border: 0px; border-radius: 6px; appearance: none; color: var( - color-accent-fg); background-color: transparent; box-shadow: none; transition: color 80ms cubic-bezier(0.33, 1, 0.68, 1) 0s, background-color, box-shadow, border-color; justify-content: center !important; align-items: center !important; margin: var( - base-size-8, 8px) !important; width: var( - control-small-size, 28px); height: var( - control-small-size, 28px);"> NiFi Toolkit cd nifi-toolkit-1.20.0 bin/cli.sh session set nifi.props base.props nifi list-param-contexts -u localhost:8443 -ot simple NiFi Production Deployment as a DataFlow on Kubernetes - First Deploy Name it

NiFi Production Deployment as a DataFlow on Kubernetes - Monitoring

Top Level Deployment Monitoring for all of my flows

During Deployment You Set Your Parameters That You Can Change After Deployment As Well

References datainmotion.dev/2020/09/devops-working-wit.. nifi.apache.org/docs/nifi-docs/html/toolkit.. docs.cloudera.com/cdp-public-cloud/cloud/cl.. nifi.apache.org/docs/nifi-docs/html/toolkit.. docs.cloudera.com/cdf-datahub/7.2.14/regist.. nifi.apache.org/docs/nifi-docs/html/walkthr...


Streams Messaging Manager for Kafka Management and Visibility
Thanks to our schema, we can see and inspect our MTA events very easily inside of SMM, so we know our data will be good for Kafka consumers like Flink SQL, Spark, NiFi and Spring.
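Any of those consumers can read the mta topic directly. As a quick hedged example (kafka-python is my choice here, not something from the flow, and I assume JSON-serialized values; if the flow publishes Schema Registry encoded Avro you would swap in the matching Avro deserializer):

# Minimal sketch of a Python consumer for the mta topic, assuming JSON values.
# Broker and topic names come from the flow parameters above (kafka:9092, mta).
import json
from kafka import KafkaConsumer   # pip install kafka-python

consumer = KafkaConsumer(
    "mta",
    bootstrap_servers="kafka:9092",
    group_id="mta-python-reader",          # hypothetical consumer group name
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    event = message.value
    print(event.get("VehicleRef"), event.get("StopPointName"),
          event.get("ExpectedArrivalTime"), event.get("EstimatedPassengerCount"))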


SQL Stream Builder (Flink SQL)

select mta.VehicleRef, mta.StopPointName, mta.Bearing, mta.DestinationName,
       mta.ExpectedArrivalTime, mta.VehicleLocationLatitude, mta.VehicleLocationLongitude,
       mta.ArrivalProximityText, mta.DistanceFromStop, mta.AimedArrivalTime,
       mta.Date, mta.ts, mta.uuid, mta.EstimatedPassengerCapacity, mta.EstimatedPassengerCount
from sr1.default_database.mta

Flink SQL via SQL Stream Builder

Once we have built a query that works, we can instantly add a REST output sink for it. Click Create Materialized View, pick your query, patterns and fields, and hit Create.

Test SSB REST API

The raw data is made available to any REST consumer, including JavaScript web pages, Python code and more.

SSB REST Feed for our Flink SQL query

To start a local Jupyter notebook, just type the following and open localhost:8888/tree:

jupyter notebook

# Read the SSB materialized view REST endpoint as JSON into a Pandas DataFrame.
import json
import pandas

df = pandas.read_json('localhost:18131/api/v1/query/5204/mta?key=2..')
df.info()
df

We can work with our streaming data via REST as JSON in a Jupyter notebook. The example code I used is shown above and is also stored in the GitHub repo.

Jupyter Notebook for reading MTA event streams via REST and Pandas

Reference cloudera.com/downloads/cdf/csp-community-ed.. towardsdatascience.com/json-and-apis-with-p.. dzone.com/articles/tracking-aircraft-in-rea.. github.com/tspannhw/FLiP-Py-ADS-B github.com/tspannhw/pulsar-adsb-function medium.com/@tspann/tracking-aircraft-in-rea.. github.com/tspannhw/FLaNK-TravelAdvisory/bl..


Developer Docs bustime.mta.info/wiki/Developers/SIRIIntro bustime.mta.info/wiki/Developers/SIRIVehicl.. web.mta.info/developers/turnstile.html new.mta.info/developers web.mta.info/developers/resources/nyct/EES/.. web.mta.info/developers/resources/nyct/EES/.. developer.onebusaway.org/modules/onebusaway.. bustime.mta.info/wiki/Developers/CancelledT.. github.com/google/transit/blob/master/gtfs-.. github.com/OneBusAway/onebusaway-gtfs-realt.. web.mta.info/developers/developer-data-term.. github.com/MobilityData/mobility-database-c.. docs.cloudera.com/data-visualization/7/ref-.. github.com/MobilityData/gtfs-realtime-bindi.. swiftly-inc.stoplight.io/docs/realtime-stan.. www3.septa.org/# realtime.septa.org/schedules dev.socrata.com/foundry/data.cityofnewyork... data.cityofnewyork.us/Transportation/Real-T..

GTFS Real-Time Feed api.mta.info/GTFS.pdf

Data web.mta.info/status/serviceStatus.txt web.mta.info/developers/data/nyct/turnstile.. web.mta.info/status/ServiceStatusSubway.xml web.mta.info/status/ServiceStatusBus.xml web.mta.info/developers/data/nyct/plannedwo.. web.mta.info/developers/data/lirr/lirr_gtfs.. web.mta.info/developers/data/lirr/lirr_gtfs.. data.ny.gov/resource/i9wp-a4ja.json data.cityofnewyork.us/resource/i4gi-tjb9.json Bicycle Count data.cityofnewyork.us/resource/uczf-rk3c.json data.ny.gov/resource/vxuj-8kew.json web.mta.info/developers/fare.html data.cityofnewyork.us/resource/xjfq-wh2d.json data.cityofnewyork.us/resource/erdf-2akx.json data.cityofnewyork.us/resource/8vv7-7wx3.json data.cityofnewyork.us/resource/h9gi-nx95.json data.cityofnewyork.us/resource/f55k-p6yu.json web.mta.info/developers/data/nyct/subway/St..
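Many of these are NYC Open Data and data.ny.gov (Socrata) resources that return plain JSON, so they drop straight into Pandas with a couple of lines. A small sketch using one of the resource URLs listed above; $limit is a standard Socrata query parameter, and an app token is optional for light use:

# Minimal sketch: pull a Socrata JSON resource from the list above into a Pandas DataFrame.
import pandas as pd
import requests

url = "https://data.cityofnewyork.us/resource/i4gi-tjb9.json"
rows = requests.get(url, params={"$limit": 100}, timeout=30).json()

df = pd.DataFrame(rows)
print(df.shape)
print(df.columns.tolist())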

GTFS Real-Time Data api-endpoint.mta.info/Dataservice/mtagtfsfe.. api-endpoint.mta.info/Dataservice/mtagtfsfe.. api-endpoint.mta.info/Dataservice/mtagtfsfe.. api-endpoint.mta.info/Dataservice/mtagtfsfe.. api-endpoint.mta.info/Dataservice/mtagtfsfe.. api-endpoint.mta.info/Dataservice/mtagtfsfe.. api-endpoint.mta.info/Dataservice/mtagtfsfe.. api-endpoint.mta.info/Dataservice/mtagtfsfe.. api-endpoint.mta.info/Dataservice/mtagtfsfe.. api-endpoint.mta.info/Dataservice/mtagtfsfe.. api-endpoint.mta.info/Dataservice/mtagtfsfe.. api-endpoint.mta.info/Dataservice/mtagtfsfe.. api-endpoint.mta.info/Dataservice/mtagtfsfe.. api-endpoint.mta.info/Dataservice/mtagtfsfe.. api-endpoint.mta.info/Dataservice/mtagtfsfe.. github.com/MobilityData/mobility-database-c.. gtfsrt.prod.obanyc.com/tripUpdates gtfsrt.prod.obanyc.com/alerts
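Unlike the SIRI bus feed, these are GTFS-realtime protobuf feeds, so they need the gtfs-realtime bindings referenced earlier rather than a JSON parser. A minimal sketch against the trip-updates feed spelled out above (note that some of the api-endpoint.mta.info feeds require an API key from your developer account, which I am not showing here):

# Minimal sketch: read a GTFS-realtime protobuf feed and print a few trip updates.
# pip install gtfs-realtime-bindings requests
import requests
from google.transit import gtfs_realtime_pb2

resp = requests.get("https://gtfsrt.prod.obanyc.com/tripUpdates", timeout=30)

feed = gtfs_realtime_pb2.FeedMessage()
feed.ParseFromString(resp.content)

for entity in feed.entity:
    if entity.HasField("trip_update"):
        trip = entity.trip_update.trip
        for stu in entity.trip_update.stop_time_update[:1]:
            arrival = stu.arrival.time if stu.HasField("arrival") else None
            print(trip.route_id, trip.trip_id, stu.stop_id, arrival)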

More Data is Streaming

More Data

To get the list of and metadata for the agencies covered by MTA Bus Time, use: bustime.mta.info/api/where/agencies-with-co..
To get the list of and metadata for the MTA NYCT and MTABC routes covered by MTA Bus Time, use: bustime.mta.info/api/where/routes-for-agenc..
For information on one specific stop served by MTA Bus Time, use: bustime.mta.info/api/where/stop/MTA_STOP-ID..
For information on the stops that serve a route, use: bustime.mta.info/api/where/stops-for-route/..
For information on stops near a location, use: bustime.mta.info/api/where/stops-for-locati..
data.ny.gov/browse?category=Transportation&..

Data References github.com/KatsuteDev/OneMTA siri.org.uk/schema/schemas.htm

Thanks for following this initial build. The next steps are to add more data streams and identify the best way around by looking at the MTA, NJ Transit, roads, planes and boats. Until then, time to REST.