Exploring Apache Flink 1.20: Features, Improvements, and More

The Apache Flink® community released Apache Flink 1.20 this week. In this blog post, we'll highlight some of the most interesting additions and improvements. You’ll find a comprehensive rundown of all of the updates in the official release announcement.

Recent Flink releases have emphasized improving Flink’s usability, not only for developers but also for those operating Flink clusters, and this theme continues in the latest release. Flink 1.20 is also expected to be the last minor release before Flink 2.0, so it concludes the community’s efforts to deprecate APIs that should be removed or replaced in the upcoming major release, and to clean up a number of configuration options.

Improvements to Flink SQL

With Flink 1.20, we are again seeing a number of improvements in Flink SQL. Besides the general set of performance enhancements and stability fixes, the community is proud to present two headline features that should simplify interactions with tables in Flink going forward:

DISTRIBUTED BY

FLIP-376 introduces a unified way to specify a bucketing strategy for a Flink SQL table. Bucketing is another data layout mechanism in addition to the already long-supported partitioning (FLIP-63). In contrast to partitioning, with bucketing users can control the number of created groups (number of buckets), and define the data distribution strategy (e.g., hash). 

Previously, bucketing could only be configured via Flink’s WITH clause, and every connector implemented its own syntax for defining the bucketing.

Let’s take a look at a Flink SQL Kafka Table DDL.

Before FLIP-376:

CREATE TABLE KafkaTable (
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'key.format' = 'json',
  'key.fields' = 'user_id;item_id',
  'value.format' = 'json',
  'properties.num.partitions' = '6'
)

After FLIP-376:

CREATE TABLE KafkaTable (
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) DISTRIBUTED BY (user_id, item_id) INTO 6 BUCKETS
WITH (
  'connector' = 'kafka',
  'key.format' = 'json',
  'value.format' = 'json'
)
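
The same clause can also name the distribution strategy explicitly, or fix only the number of buckets and leave the strategy to the connector. The following is a rough sketch rather than a tested setup: the table names OrdersByHash and OrdersAnyKey are made up for illustration, the connector options are abbreviated in the same way as in the examples above, and whether a given connector accepts these clauses depends on that connector supporting bucketing.

```sql
-- Hypothetical tables for illustration only; connector options are abbreviated.

-- Name the strategy explicitly: hash on user_id into 4 buckets.
CREATE TABLE OrdersByHash (
  `user_id` BIGINT,
  `item_id` BIGINT
) DISTRIBUTED BY HASH(user_id) INTO 4 BUCKETS
WITH (
  'connector' = 'kafka',
  'value.format' = 'json'
);

-- Fix only the bucket count and let the connector decide how rows are distributed.
CREATE TABLE OrdersAnyKey (
  `user_id` BIGINT,
  `item_id` BIGINT
) DISTRIBUTED INTO 4 BUCKETS
WITH (
  'connector' = 'kafka',
  'value.format' = 'json'
);
```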

Materialized Tables

When users start with Flink SQL, they often have difficulty reasoning about the freshness of their data and the cost implications of inserting data into a table. Users either define a streaming ingestion that offers the lowest latency but runs continuously (which implies costly infrastructure running all the time), or they insert new data with a Flink batch job, which may not meet data freshness requirements (e.g., SLOs).

FLIP-435 offers users the possibility to directly specify data freshness requirements in Flink itself without requiring an external system to manage it. This is achieved through the new concept of a MATERIALIZED table that is automatically refreshed in the background.

CREATE MATERIALIZED TABLE dwd_orders
(
   PRIMARY KEY(ds, id) NOT ENFORCED
)
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '3' MINUTE
AS SELECT 
  o.ds,
  o.id,
  o.order_number,
  o.user_id,
...
FROM 
  orders as o
  LEFT JOIN products FOR SYSTEM_TIME AS OF proctime() AS prod
      ON o.product_id = prod.id
  LEFT JOIN order_pay AS pay
      ON o.id = pay.order_id and o.ds = pay.ds

After the DDL is executed, Flink either starts a streaming job to meet the freshness requirement or schedules a batch job for periodic execution. The choice depends on the new configuration option materialized-table.refresh-mode.freshness-threshold: if the specified freshness is below the threshold, a streaming job updates the table continuously; if it is above the threshold, Flink schedules a periodic batch job.
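
To make the threshold rule concrete, here is a minimal sketch. It assumes the threshold is set to 30 minutes, and it uses hypothetical table names (orders_fresh, orders_hourly, orders_pinned) defined over the orders table from the example above. The optional REFRESH_MODE clause in the last statement is, as far as we understand the Flink 1.20 documentation, a way to pin the mode instead of relying on the threshold; treat the details as assumptions to verify against your catalog and SQL Gateway setup.

```sql
-- Assumption: materialized-table.refresh-mode.freshness-threshold is 30 min.

-- 3 minutes is below the threshold: expect a continuously running streaming job.
CREATE MATERIALIZED TABLE orders_fresh
FRESHNESS = INTERVAL '3' MINUTE
AS SELECT o.ds, o.id, o.user_id
FROM orders AS o;

-- 1 hour is above the threshold: expect a periodically scheduled batch job.
CREATE MATERIALIZED TABLE orders_hourly
FRESHNESS = INTERVAL '1' HOUR
AS SELECT o.ds, o.user_id, COUNT(*) AS order_cnt
FROM orders AS o
GROUP BY o.ds, o.user_id;

-- Pin the refresh mode explicitly rather than deriving it from the threshold.
CREATE MATERIALIZED TABLE orders_pinned
FRESHNESS = INTERVAL '1' HOUR
REFRESH_MODE = CONTINUOUS
AS SELECT o.ds, o.id, o.user_id
FROM orders AS o;
```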

Improvements to the DataStream API

FLIP-380 adds support for computing aggregations on non-keyed streams in parallel. This feature can only be used in batch mode, and brings to the DataStream API a feature that was until now only readily available with the DataSet API. This was needed because the DataSet API has been deprecated, and will be removed in Flink 2.0.

Operational Improvements

Unified File Merging for Checkpoints

As part of Flink’s recovery story, Flink periodically writes snapshots of its state (checkpoints) to persistent storage (e.g., blob storage), allowing it to recover in case of failure. A checkpoint consists of many files. Over the years, multiple different state file types have been added, leading to a growing number of separate files that a Flink job has to write to blob storage for a single checkpoint.

For large jobs (those with high parallelism and many operators), the number of files created during a checkpoint can put significant load on the underlying blob storage. When using a cloud solution (e.g., S3 or ADLS), the load directly translates to more requests and hence higher costs. When using self-hosted storage (e.g., HDFS or MinIO), it translates into frequent overloads.

With FLIP-306, Flink now offers multiple configurations to control the upload behavior of the state, in an attempt to minimize the number of files (e.g., state.checkpoints.file-merging to enable the feature, and state.checkpoints.file-merging.max-file-size to control the maximum checkpoint file size). 

In contrast to prior solutions, the new configuration applies to all types of state files, and is applicable to Flink state in general.

Compaction of Small SST Files

The Flink community has also fixed a longstanding issue with the RocksDB state backend (FLINK-26050). 

In rare situations, for example when using Flink processing-time windows that create many small non-overlapping RocksDB SST files, the continuously growing number of local SST files can eventually cause instabilities such as the one reported in FLINK-34430.

To avoid relying solely on RocksDB’s auto-compaction, Flink now scans the local files periodically, and triggers a manual compaction when too many small files are detected.

Recovery from JobMaster Failures for Batch Jobs

Flink has long had support for recovering streaming jobs from JobMaster failures by restarting from checkpoints, but batch recovery has relied on restarting jobs from the beginning. This has meant that all previous progress is lost.

FLIP-383 addresses this by making it possible to recover the state from all tasks that had finished before the JobMaster failed.

Final Preparations for Flink 2.0

The community is making its final preparations for Flink 2.0:

  • The legacy SinkFunction API has been deprecated.

  • Roughly 30 other configuration options that are no longer useful have been deprecated, and some others have been reorganized, most notably the options concerning checkpoints, savepoints, state, and state recovery.

Acknowledgements

We would like to express gratitude to all of the contributors who made this release possible:

Ahmed Hamdy, Alan Sheinberg, Aleksandr Pilipenko, Alexander Fedulov, Andrey Gaskov, Antonio Vespoli, Anupam Aggarwal, Barak Ben-Nathan, Benchao Li, Brad, Cheng Pan, Chesnay Schepler, DamonXue, Danny Cranmer, David Christle, David Moravek, David Schlosnagle, Dawid Wysakowicz, Dian Fu, Dmitriy Linevich, Elphas Toringepi, Emre Kartoglu, Fang Yong, Feng Jin, Ferenc Csaky, Frank Yin, Gabor Somogyi, Gyula Fora, HCTommy, Hangxiang Yu, Hanyu Zheng, Hao Li, Hong Liang Teoh, Hong Teoh, HuangXingBo, Jacky Lau, James Hughes, Jane Chan, Jeyhun Karimov, Jiabao Sun, Jim Hughes, Jing Ge, Jinzhong Li, JunRuiLee, Juntao Hu, JustinLee, Kartikey Pant, Kumar Mallikarjuna, Leonard Xu, Lorenzo Affetti, Luke Chen, Martijn Visser, Mason Chen, Matthias Pohl, Mingliang Liu, Panagiotis Garefalakis, Peter Huang, Peter Vary, Piotr Nowojski, Puneet Duggal, Qinghui Xu, Qingsheng Ren, Ravi Dutt Singh, Robert Metzger, Robert Young, Roc Marshal, Roman, Roman Boyko, Roman Khachatryan, Ron, Rui Fan, Ryan Skraba, Samrat, Sergey Nuyanzin, Shilun Fan, Stefan Richter, SuDewei, Timo Walther, Ufuk Celebi, Vincent Woo, Wang FeiFan, Weijie Guo, Wencong Liu, Wouter Zorgdrager, Xiangyu Feng, Xintong Song, Xuyang, Yanfei Lei, Yangze Guo, Yu Chen, Yubin Li, Yuepeng Pan, Yun Tang, Yuxin Tan, Zakelly, Zhanghao Chen, Zhen Wang, Zhenqiu Huang, Zhu Zhu, Zmm, ammar-master, anupamaggarwal, bvarghese1, caicancai, caodizhou, chenzihao, drymatini, dsaisharath, eason.qin, elon-X, fengli, gongzhongqiang, hejufang, jectpro7, jiangxin, liming.1018, lincoln lee, liuyongvs, lxliyou001, oleksandr.nitavskyi, plugatarev, rmoff, slfan1989, spoon-lz, sunxia, sxnan, sychen, wforget, xiaogang, xingbo, yebukong, yunfengzhou-hub, yunhong, zhouyisha, 马越

  • Fabian Paul is a Senior Software Developer at Confluent and a Committer to the Apache Flink project. He is part of the team developing Confluent Platform for Apache Flink. Prior to joining Confluent, he worked at Databricks securing multiuser workloads on Apache Spark.

    He also worked at Ververica, where he was responsible for redesigning Apache Flink's sink framework to support building sinks for modern data lakes such as Delta Lake and Apache Iceberg.

  • David Anderson is an Apache Flink Committer and a Software Practice Lead in the DevRel group at Confluent. Since discovering Apache Flink in 2016, he has helped countless companies get started with stream processing. Previously, he was a consulting data engineer, designing and building data pipelines for clients with a diverse set of use cases, including search engines, machine learning, and business analytics.