
How to Turn a REST API Into a Data Stream with Kafka and Flink


In the space of APIs for consuming up-to-date data (say, events or state available within an hour of occurring), many API paradigms exist. There are file- or object-based paradigms, e.g., S3 access. There's database access, e.g., direct Snowflake access. Last, we have decoupled client-server APIs, e.g., REST APIs, gRPC, webhooks, and streaming APIs. In this context, "decoupled" means that the client usually communicates with the server over a language-agnostic standard network protocol like HTTP/S, usually receives data in a standard format like JSON, and, in contrast to direct database access, typically doesn't know what data store backs the API.

Of the above styles, more often than not, API developers settle on HTTP-based REST APIs for a number of reasons. They are incredibly popular: more developers know how to use REST APIs and are using them in production compared to other API technologies. For example, RapidAPI's 2022 State of APIs report shows 69.3% of survey respondents using REST APIs in production, well above the percentage using alternatives like gRPC (8.2%), GraphQL (18.6%), or webhooks (34.6%).

Another reason developers choose REST APIs is their scalability; caching and statelessness properties provide a clear path to scale and client performance optimization. REST naturally lends itself to supporting many clients issuing many concurrent requests. REST APIs are also mature, since 20+ years of REST API adoption have resulted in a rich tooling ecosystem, including technologies like OpenAPI and its tool for generating client bindings, API management solutions like Kong Konnect to handle security and rate limiting, Postman for learning and testing APIs, and mature client libraries like Python Requests for building applications that leverage REST APIs. 

There's also the matter of verb flexibility. While we are talking about reading up-to-date data in this blog, many applications need to create, update, and delete data too! HTTP-based REST services can use the same tools to develop and deploy all of those verb actions, whereas some of the API patterns above, like streaming, are really only geared toward reading. While some mature engineering organizations offer APIs that span API styles (e.g., X exposes streaming and REST APIs), doing so comes at higher cost.

Regardless of why data is exposed via REST API, the bottom line is: As application developers, we don't always get to choose how to consume data. We have to use what API providers offer, and, you can “rest” assured that, more often than not, they give us REST APIs.

So, in this post, we’re going to help you bridge the gap between REST APIs and data streaming. Let’s get started with a real-world example.

The demo

The OpenSky Network provides flight data gathered from a multitude of sensors, all run by volunteer aviation enthusiasts! Its live flights API will be our data source. Next up, that data is produced to an Apache Kafka® topic in Confluent Cloud via the HTTP Source Connector. From there, we can run Flink SQL statements to process the data and land it in a cleansed Kafka topic, then consume from that table to see the results.

Note - OpenSky citation: Matthias Schäfer, Martin Strohmeier, Vincent Lenders, Ivan Martinovic and Matthias Wilhelm. "Bringing Up OpenSky: A Large-scale ADS-B Sensor Network for Research." In Proceedings of the 13th IEEE/ACM International Symposium on Information Processing in Sensor Networks (IPSN), pages 83-94, April 2014.

The data

The data is knotty: it needs to be massaged

Make a curl request to our OpenSky endpoint:

curl -s "https://opensky-network.org/api/states/all?lamin=45.8389&lomin=5.9962&lamax=47.8229&lomax=10.5226"

Here’s the result from our terminal:

{
  "time":1710361210,
  "states":[
    ["4b1817","SWR4TH  ","Switzerland",1710361210,1710361210,8.1462,47.3854,3931.92,false,188.81,233.75,9.75,null,4038.6,"1000",false,0],
    ["4b1806","SWR6MZ  ","Switzerland",1710361210,1710361210,8.5502,47.4551,null,true,8.23,149.06,null,null,null,"1000",false,0],
    ["4b1620","SWR829  ","Switzerland",1710361210,1710361210,8.5914,47.4551,487.68,false,75.05,276.3,-4.55,null,594.36,"1000",false,0],
    ...
  ]
}

You can see that the data comes back from the API, but it's tough to read. Let's get that data into Flink SQL to see what we can do about it.

The steps listed here, up until the SELECT * FROM all_flights; snippet, outline how to get started. First, log in with the Confluent CLI, then run the Confluent Flink quickstart to open up a Flink SQL shell. Launching the OpenSky Network source connector feeds data into the Flink SQL table. As in the curl response, the data fed into the table is messy.
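Once the connector is running, you can inspect the raw table from the Flink SQL shell:

SELECT * FROM all_flights;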

key   time        states
NULL  1710358279  [[4b1803, SWR736  , Switzerland, 1710358141, 1710358237, 8.5569, 47.4543, 373.38, true, 0, 185.62, NULL, NULL, NULL, 1000, false, 0], ...]

Let’s pause here for a moment to reflect on why we’re about to process this data at this point in the pipeline.

In general, we want to avoid client-side transformation. Why is that? There are a few reasons. For one, it may not be secure to leave the cleanup to the client. What if it's not a stream of flight data, but a stream of medical data used by a pharmaceutical company? There may be personally identifiable information (PII) in the data stream that should be filtered out on the server rather than left for the client to remove. Security may be the most pressing external reason to keep data cleanup on the backend, but it's far from the only one. Internally, software is more easily maintained when changes can be made on the server or data storage side and don't require large changes to the client. Data is also more understandable and less prone to erroneous usage if it is self-documenting. In our case, this means using human-readable keys like barometric_altitude so that data consumers don't need to refer to documentation to figure out that barometric altitude lives at index 7 of each array (zero indexed!).

The solution: use Flink SQL

Let's make this data readable. First, we'll need a new Flink SQL table. Note that this table does not store the data itself; the corresponding Kafka topic created alongside it will hold the data:

CREATE TABLE all_flights_cleansed (
  poll_timestamp TIMESTAMP_LTZ(0),
  icao24 STRING,
  callsign STRING,
  origin_country STRING,
  event_timestamp TIMESTAMP_LTZ(0),
  longitude DECIMAL(10, 4),
  latitude DECIMAL(10, 4),
  barometric_altitude DECIMAL(10, 2),
  on_ground BOOLEAN,
  velocity_m_per_s DECIMAL(10, 2)
);
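If you'd like to double-check the schema before writing to it, you can describe the new table from the Flink SQL shell:

DESCRIBE all_flights_cleansed;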

Next, we’ll run a Flink SQL query that takes data from all_flights and performs some operations on it to clean it up.

INSERT INTO all_flights_cleansed
    SELECT TO_TIMESTAMP_LTZ(`time`, 0) AS poll_timestamp,
      RTRIM(StatesTable.states[1]) AS icao24,
      RTRIM(StatesTable.states[2]) AS callsign,
      RTRIM(StatesTable.states[3]) AS origin_country,
      TO_TIMESTAMP_LTZ(CAST(StatesTable.states[4] AS NUMERIC), 0) AS event_timestamp,
      CAST(StatesTable.states[6] AS DECIMAL(10, 4)) AS longitude,
      CAST(StatesTable.states[7] AS DECIMAL(10, 4)) AS latitude,
      CAST(StatesTable.states[8] AS DECIMAL(10, 2)) AS barometric_altitude,
      CAST(StatesTable.states[9] AS BOOLEAN) AS on_ground,
      CAST(StatesTable.states[10] AS DECIMAL(10, 2)) AS velocity_m_per_s
    FROM all_flights CROSS JOIN UNNEST(all_flights.states) as StatesTable (states);

Let’s pay some special attention to the syntax.

 FROM all_flights CROSS JOIN UNNEST(all_flights.states) as StatesTable (states);

What is a CROSS JOIN UNNEST? Basically, if we have a table and one of its columns holds an array value, UNNEST expands that array into a set of rows, one per element, and the CROSS JOIN pairs each of those rows with the row the array came from. In the diagram below, the users field on the right-hand table is an array, so we can UNNEST it into a "users" column on the left-hand table, which is CROSS JOIN'ed on emails.
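Here's a minimal, hypothetical sketch of the pattern in Flink SQL (the emails table and its column names are purely illustrative, not part of the demo):

-- A table where each row carries an array of user names
CREATE TABLE emails (
  email_subject STRING,
  users ARRAY<STRING>
);

-- UNNEST expands each array into rows, and the CROSS JOIN pairs every
-- element with the row its array came from
SELECT email_subject, user_name
FROM emails
CROSS JOIN UNNEST(emails.users) AS UsersTable (user_name);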

Notice that the states field value here is an array, like the values for users above:

key   time        states
NULL  1710358279  [[4b1803, SWR736  , Switzerland, 1710358141, 1710358237, 8.5569, 47.4543, 373.38, true, 0, 185.62, NULL, NULL, NULL, 1000, false, 0], ...]

This is what necessitates our CROSS JOIN UNNEST.

Next up, what is RTRIM doing here?

RTRIM(StatesTable.states[1]) AS icao24,

It removes whitespace from the right side of a string. As you can see, some values came with trailing whitespace that hurt legibility:

SWR736  , 
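For example, a quick standalone query shows the effect (illustrative only):

SELECT RTRIM('SWR736  ') AS callsign;
-- returns 'SWR736' with the trailing spaces removed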

How about CAST? This one converts the value of the selected field to a different type.

CAST(StatesTable.states[6] AS DECIMAL(10, 4)) AS longitude,
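And a similar standalone example for CAST (again, illustrative only):

SELECT CAST('8.1462' AS DECIMAL(10, 4)) AS longitude;
-- returns 8.1462 as a DECIMAL(10, 4) instead of a STRING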

Now, after running the query, this is what our data looks like:


poll_timestamp          icao24 callsign origin_country event_timestamp         longitude latitude barometric_altitude on_ground velocity_m_per_s         
2024-03-28 09:36:07.000 4b44a5 HBZZX    Switzerland    2024-03-28 09:33:01.000 7.3174    46.8865  777.24              FALSE     0.51

It’s cleaner and self-documenting!
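The output above is simply the result of selecting from the cleansed table in the Flink SQL shell:

SELECT * FROM all_flights_cleansed;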

The tricky parts

Building this demo came with a couple of challenges.

So far we've discussed the messiness of REST API data shapes, but there's also plain flakiness. Sometimes REST APIs just don't work consistently, which is what we found in our hunt for a REST API for this demo. You might run into a REST API that returns 500 errors frequently, for example. Luckily, the automated retriability in connectors can help resolve that issue. The errors.retry.timeout setting provides the configuration for this: by default it's set to 0, which means no retries will be attempted, while -1 means infinite retries. A 500 error no longer has to bork your application; the connector will keep retrying the request until it receives a status code in the 200 range.

The next challenge was the CROSS JOIN UNNEST syntax. Its complexity made drafting Flink SQL queries difficult, and it was hard to visualize what the end result of a given query would be. Hopefully our diagram helps you there, but if you want to dive deeper into the syntax, our colleague Sandon Jacobs has written a blog post about it.

Conclusion

This project demonstrates the advantages of using Kafka to bridge the gap between REST APIs and streaming. Using the Kafka connector means you can lean on retriability to get around unreliable REST APIs. Once data lands in a Kafka topic, it's compatible with Flink SQL, which lets you process the data, clean it up, and make it self-documenting if needed. Flink SQL can help you avoid client-side data massaging in any scenario, not just when your data source is a REST API. If you're interested in learning more about Flink and Flink SQL, we recommend these resources:
