Our new cat, Snowy, is waking early. She is startled by the noise of jets flying over our house. Can I determine which plane is upsetting her by utilizing Apache Kafka®, KSQL (the streaming SQL engine for Apache Kafka) and a Raspberry Pi? Perhaps a nice dashboard will distract her and let me get a bit more sleep.
Planes to graphs using Kafka and KSQL
Aircraft determine their position using GPS receivers. An onboard transponder periodically transmits the position along with aircraft identity string, altitude and speed using short radio transmissions. These automatic dependent surveillance-broadcast (ADS-B) transmissions are data packets that can be freely received by ground stations.
A tiny computer, such as a Raspberry Pi, and a handful of supporting components are all that are required to receive aircraft transponder messages of the planes flying over my house.
These aircraft transmissions are not coordinated, so the messages arrive as a jumble of interwoven transmissions. Unraveling these mixed data streams is complex, like trying to understand a conversation when everyone’s talking at the same time at a party. Therefore, I decided to use a combination of Kafka and KSQL to find the plane that’s upsetting my cat.
Here is Snowy looking wide awake.
To capture the aircraft transmissions, I used a Raspberry Pi and an RTL2832U, a USB dongle originally sold for watching digital TV on a computer. On the Pi, I installed dump1090, a program that accesses ADS-B data via the RTL2832U and a small antenna.
Raspberry Pi and RTL2832U as a software-defined radio (SDR)
Now that I’ve got a stream of raw ADS-B signals, I need to look at the traffic. The Raspberry Pi is a little underpowered for serious computation, so I need to offload processing to my local Kafka cluster.
Received messages are separated into either a location update message or an identification message. A location update has the form "airframe 7c6db8 is flying at 6,250 feet at location -33.8,151.0". An identification message has the form "airframe 7c451c is operating flight route QJE1726".
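As a rough illustration of that split, here is a minimal Python sketch. It assumes dump1090 is emitting its SBS-1 ("BaseStation") comma-separated output, which this post does not confirm, and the function name `classify_sbs1` and the `kind` and `callsign` keys are hypothetical. The `ico`, `height` and `location` keys mirror the fields that appear on location-topic later in the post.

```python
from typing import Optional


def classify_sbs1(line: str) -> Optional[dict]:
    """Turn one SBS-1 line into a location or ident record, or None.

    Assumption: field positions follow the SBS-1 layout (index 1 is the
    transmission type, 4 the ICAO hex code, 10 the callsign, 11 the
    altitude, 14 and 15 the latitude and longitude).
    """
    fields = line.strip().split(",")
    if len(fields) < 16 or fields[0] != "MSG":
        return None
    msg_type = fields[1]
    icao = fields[4].upper()
    callsign = fields[10].strip()
    if msg_type == "1" and callsign:
        # Type 1 carries the callsign: an identification message.
        return {"kind": "ident", "ico": icao, "callsign": callsign}
    if msg_type == "3" and fields[11] and fields[14] and fields[15]:
        # Type 3 carries altitude and position: a location update.
        return {
            "kind": "location",
            "ico": icao,
            "height": fields[11],
            "location": f"{fields[14]},{fields[15]}",
        }
    # Other message types (velocity, squawk and so on) are ignored here.
    return None
```

Anything the function returns with `kind` set to `location` would go to location-topic, and `ident` records to ident-topic.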
A small Python script running on the Raspberry Pi separates the incoming ADS-B messages. The Confluent REST Proxy is used to relay producer data from the Raspberry Pi into the location-topic and ident-topic Kafka topics. The proxy provides a RESTful interface to a Kafka cluster, making it easy to produce messages by simply invoking a REST call on the Pi.
I wanted to understand what the planes were and what routes they were flying. OpenFlights has a database of airframes that allows the mapping of an airframe's International Civil Aviation Organization (ICAO) code, such as 7C6DB8, to an aircraft type (a Boeing 737, in this case). I loaded this mapping into the icao-to-aircraft topic.
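For the REST Proxy relay described above, a produce call from the Pi might be built roughly like this. This is a sketch, not the author's script: it assumes the REST Proxy v2 API on its default port 8082, and the Avro schema, the helper name `build_produce_request` and the base URL are all illustrative assumptions.

```python
import json
import urllib.request

# Assumed Avro schema; field names mirror the records seen on location-topic.
LOCATION_SCHEMA = {
    "type": "record",
    "name": "location",
    "fields": [
        {"name": "ico", "type": "string"},
        {"name": "height", "type": "string"},
        {"name": "location", "type": "string"},
    ],
}


def build_produce_request(base_url, topic, record, schema):
    """Build (but do not send) a REST Proxy v2 Avro produce request."""
    body = {
        # The v2 API expects the schema as a JSON-encoded string.
        "value_schema": json.dumps(schema),
        "records": [{"value": record}],
    }
    return urllib.request.Request(
        url=f"{base_url}/topics/{topic}",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/vnd.kafka.avro.v2+json",
            "Accept": "application/vnd.kafka.v2+json",
        },
        method="POST",
    )


request = build_produce_request(
    "http://localhost:8082",  # assumed REST Proxy address
    "location-topic",
    {"ico": "7C6DB8", "height": "6250", "location": "-33.807724,151.091495"},
    LOCATION_SCHEMA,
)
# urllib.request.urlopen(request) would send it; omitted so the sketch runs offline.
```

Building the request separately from sending it keeps the payload easy to inspect on a machine with no Kafka cluster attached.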
KSQL provides a “SQL engine” that enables real-time data processing against Apache Kafka topics. For example, to look up an airframe code, such as code 7C6DB8, we can query our aircraft topic like this:
CREATE TABLE icao_to_aircraft WITH (KAFKA_TOPIC='ICAO_TO_AIRCRAFT_REKEY', VALUE_FORMAT='AVRO', KEY='ICAO');
ksql> SELECT manufacturer, aircraft, registration
FROM icao_to_aircraft
WHERE icao = '7C6DB8';
Boeing | B738 | VH-VYI
Similarly, I loaded callsign details into the callsign-details topic (e.g., QFA563 is a Qantas flight from Brisbane to Sydney).
CREATE TABLE callsign_details WITH (KAFKA_TOPIC='CALLSIGN_DETAILS_REKEY', VALUE_FORMAT='AVRO', KEY='CALLSIGN');
ksql> SELECT operatorname, fromairport, toairport
FROM callsign_details
WHERE callsign = 'QFA563';
Qantas | Brisbane | Sydney
Let’s have a peek at the location-topic stream. You can see a steady stream of incoming messages reporting location updates from passing aircraft.
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic location-topic
{"ico":"7C6DB8","height":"6250","location":"-33.807724,151.091495"}
The equivalent KSQL syntax is:
ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yyyy-MM-dd HH:mm:ss'), \
ico, height, location \
FROM location_stream \
WHERE ico = '7C6DB8';
2018-09-19 07:13:33 | 7C6DB8 | 6250.0 | -33.807724,151.091495
The real power of KSQL comes from combining the incoming stream of location data against the reference data topics (see 03_ksql.sq)—that is, adding useful details to the raw data stream. This is very similar to a left join in a traditional database. The result is another Kafka topic—produced without a single line of Java code!
CREATE STREAM location_and_details_stream AS \
SELECT l.ico, l.height, l.location, t.manufacturer, t.aircraft, t.registration \
FROM location_stream l \
LEFT JOIN icao_to_aircraft t ON l.ico = t.icao;
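To make the left-join behaviour concrete, here is a small Python sketch of the same idea. It is only an analogy: a plain dictionary stands in for the icao_to_aircraft table, a list stands in for the stream, and the function name `left_join_stream_to_table` is hypothetical. KSQL itself does this continuously against the current state of the table.

```python
def left_join_stream_to_table(events, table, key_field="ico"):
    """Yield each event enriched with the reference row for its key."""
    for event in events:
        row = table.get(event[key_field])
        enriched = dict(event)
        # Left join: the event is kept even when no reference row exists,
        # in which case the looked-up column is None (NULL in KSQL).
        enriched["aircraft"] = row["aircraft"] if row else None
        yield enriched


# Reference data keyed by ICAO code, as in the icao_to_aircraft table.
icao_to_aircraft = {"7C6DB8": {"aircraft": "B738"}}

location_events = [
    {"ico": "7C6DB8", "height": "6250", "location": "-33.807724,151.091495"},
    {"ico": "7C451C", "height": "6250", "location": "-33.8,151.0"},
]

for enriched in left_join_stream_to_table(location_events, icao_to_aircraft):
    print(enriched)
```

The second event has no match in the dictionary, so it passes through with `aircraft` set to `None` rather than being dropped, which is the difference between a left join and an inner join.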
The new stream can itself be queried with KSQL. It looks like this:
ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \
, manufacturer \
, aircraft \
, registration \
, height \
, location \
FROM location_and_details_stream;
18-09-27 09:53:28 | Boeing | B738 | VH-YIA | 7225 | -33.821,151.052
18-09-27 09:53:31 | Boeing | B738 | VH-YIA | 7375 | -33.819,151.049
18-09-27 09:53:32 | Boeing | B738 | VH-YIA | 7425 | -33.818,151.048
Equally, we can combine the incoming callsign identity stream against the static callsign_details topic:
CREATE STREAM ident_callsign_stream AS \
SELECT i.ico \
, c.operatorname \
, c.callsign \
, c.fromairport \
, c.toairport \
FROM ident_stream i \
LEFT JOIN callsign_details c ON i.indentification = c.callsign;
ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss')
, operatorname
, callsign
, fromairport
, toairport
FROM ident_callsign_stream ;
18-09-27 13:33:19 | Qantas | QFA926 | Sydney | Cairns
18-09-27 13:44:11 | China Eastern | CES777 | Kunming | Sydney
18-09-27 14:00:54 | Air New Zealand | ANZ110 | Sydney | Auckland
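We now have two enriched topics, one behind each of the streams created above: location_and_details_stream (location updates with aircraft details) and ident_callsign_stream (identification messages with operator and route details).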
With these constantly updating topics, we can make some pretty dashboards. I used Kafka Connect to pump the Kafka topics that KSQL populates into Elasticsearch (full scripts).
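The author's full scripts are the authority on the actual setup. As a rough sketch, registering an Elasticsearch sink through the Kafka Connect REST API could look like the following. The connector name, URLs and upper-case topic names are assumptions, as are the specific settings chosen (they are real options of the Confluent Elasticsearch sink connector, but may not be the ones used here).

```python
import json
import urllib.request


def build_es_sink_request(connect_url, name, topics, es_url):
    """Build (but do not send) a request that registers an Elasticsearch sink."""
    config = {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": ",".join(topics),
        "connection.url": es_url,
        # Older connector versions require a document type name.
        "type.name": "_doc",
        # Derive document IDs from topic, partition and offset instead of the key.
        "key.ignore": "true",
        # Let Elasticsearch infer the mapping rather than using the record schema.
        "schema.ignore": "true",
    }
    body = {"name": name, "config": config}
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


sink_request = build_es_sink_request(
    "http://localhost:8083",  # assumed Kafka Connect REST address
    "es-sink-aircraft",  # hypothetical connector name
    # Assumed topic names: KSQL names a stream's backing topic in upper case by default.
    ["LOCATION_AND_DETAILS_STREAM", "IDENT_CALLSIGN_STREAM"],
    "http://localhost:9200",  # assumed Elasticsearch address
)
# urllib.request.urlopen(sink_request) would submit it; omitted so the sketch runs offline.
```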
Here’s a sample of dashboards displaying aircraft location on a map. In addition, you can see a bar chart of manufacturers, an altitude line plot and a destination word cloud. A heat map shows areas where aircraft are concentrated, where we might hear increased noise levels.
Kibana display
What I woke up to this morning…
Today, my cat woke me at a little past 6:00 a.m. Can I use KSQL to find a low flying aircraft (below 3,500 feet) around this time?
select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss')
, manufacturer
, aircraft
, registration
, height
from location_and_details_stream
where height < 3500
and rowtime > stringtotimestamp('18-09-27 06:10', 'yy-MM-dd HH:mm')
and rowtime < stringtotimestamp('18-09-27 06:20', 'yy-MM-dd HH:mm');
2018-09-27 06:15:39 | Airbus | A388 | A6-EOD | 2100.0
2018-09-27 06:15:58 | Airbus | A388 | A6-EOD | 3050.0
Terrific, I can locate a plane over my house at 6:15 a.m. It’s an Airbus A380 (which is a huge plane) on a flight to Dubai that’s waking my cat.
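The logic of that query is just a height threshold combined with a time window. Here is a minimal Python equivalent over a few rows taken from the outputs shown earlier in this post. The function name `low_flyers` and the dictionary layout are hypothetical.

```python
from datetime import datetime


def low_flyers(rows, start, end, max_height=3500):
    """Return rows below max_height feet, strictly between start and end."""
    return [
        row for row in rows
        if row["height"] < max_height and start < row["time"] < end
    ]


rows = [
    {"time": datetime(2018, 9, 27, 6, 15, 39), "registration": "A6-EOD", "height": 2100.0},
    {"time": datetime(2018, 9, 27, 6, 15, 58), "registration": "A6-EOD", "height": 3050.0},
    # Too high and outside the window, so it is filtered out.
    {"time": datetime(2018, 9, 27, 9, 53, 28), "registration": "VH-YIA", "height": 7225.0},
]

suspects = low_flyers(
    rows,
    start=datetime(2018, 9, 27, 6, 10),
    end=datetime(2018, 9, 27, 6, 20),
)
for row in suspects:
    print(row["time"], row["registration"], row["height"])
```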
Developing the stream processing with KSQL allowed for some rapid weekend development. Plus, KSQL allows for quick discovery of interesting data events. My cat Snowy, however, might take a bit of convincing.
Got a similar pet problem, or just curious about how Kafka and KSQL work together? You can find a combined Docker setup, helper images, KSQL and Elastic setup on GitHub.
Simon Aubury is a data engineer architect from Sydney, Australia. This guest post is based on his article Using KSQL, Apache Kafka, a Raspberry Pi and a software defined radio to find the plane that wakes my cat.