
Optimize SaaS Integration with Fully Managed HTTP Connectors V2 for Confluent Cloud


Integrations that fail to deliver consistent performance and suffer from long processing times inevitably fall short of the high standards set by today's dynamic business environment. Providing a seamless integration of Apache Kafka® with various data sources and destinations is crucial in the fast-paced world of data streaming and real-time analytics.

Confluent offers an extensive suite of self-managed and fully managed connectors (explore our 130+ connectors in Confluent Hub) that facilitate seamless integration with Kafka, enabling real-time data flow and connectivity across diverse systems and applications.

If your use case doesn't align with the pre-built connectors and you need to orchestrate data streaming between Confluent Cloud and a data system that exposes an HTTP API, you can use Confluent Cloud’s fully managed HTTP Source V2 and HTTP Sink V2 connectors. They are designed to work with any system that supports HTTP-based APIs.

The HTTP Source V2 connector allows you to ingest data from HTTP endpoints into Kafka topics while the HTTP Sink V2 connector enables you to deliver data from Kafka topics to HTTP endpoints. This flexibility makes them invaluable for integrating Kafka with a wide range of data sources and destinations—from homegrown microservices to third-party SaaS apps at any scale.
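
Before wiring anything up, you can confirm that these connectors are available to your cluster by listing the fully managed connector plugins with the Confluent CLI. Here's a minimal sketch, assuming you're already logged in; the environment and cluster IDs are placeholders:

# Select the environment and Kafka cluster you plan to use (placeholder IDs).
confluent environment use <ENVIRONMENT_ID>
confluent kafka cluster use <CLUSTER_ID>

# List the fully managed connector plugins available to this cluster;
# HttpSourceV2 and HttpSinkV2 should appear in the output.
confluent connect plugin list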

How can you leverage them in your data pipeline?

The no-code, fully managed HTTP Source V2 and HTTP Sink V2 connectors can be tailored to diverse data integration requirements with Confluent Cloud, serving as a solution for virtually any HTTP API-based use case.

Though Confluent users leverage the fully managed HTTP Source V2 and HTTP Sink V2 connectors for a variety of use cases, the majority leverage them for the following:

  • Real-time analytics: Users can leverage the HTTP Source V2 connector to pull any data (typically financial data or application log data) into Kafka topics, enrich it using Apache Flink®, and then send it to downstream fraud detection systems or analytical systems using either the HTTP Sink V2 connector or any data warehouse sink connector like the BigQuery Sink V2 connector—enabling real-time analysis and response to potential threats.

  • Real-time SaaS app data synchronization: Users can leverage the HTTP Source V2 connector to pull data from various SaaS application data sources into Kafka topic(s). This data can then be synced with other SaaS applications from Kafka topic(s) using the HTTP Sink V2 connector. For example, customer email data from HubSpot can be pulled into Kafka topics using the HTTP Source V2 connector. This data can then be synced to a mail-based marketing application like Mailchimp for targeted marketing use cases.

  • Async microservices communication and workflow automation: The HTTP Source V2 and HTTP Sink V2 connectors can facilitate asynchronous communication between various microservices. Users can leverage the HTTP Source V2 connector to push event data from a microservice into Kafka topic(s). This data can then be communicated/posted to other microservice(s) from Kafka topic(s) using the HTTP Sink V2 connector or can be used to trigger a workflow using a functions connector like AWS Lambda Sink.

What’s new in the HTTP Source V2 and HTTP Sink V2 connectors?

In addition to the features already available in the HTTP Source V1 and HTTP Sink V1 connectors, the following are some of the key capabilities introduced in the HTTP Source V2 and HTTP Sink V2 connectors.

The HTTP Source V2 and HTTP Sink V2 connectors offer:

  • The ability to configure up to 15 APIs with the same hostname and authorization mechanism, thereby reducing the burden of managing multiple connectors with single endpoints.

  • A seamless experience to configure the connector using OpenAPI Specification 3.0.

  • The ability to test the API with query parameters while configuring the connector.

  • The ability to manage offsets.

  • Client-side field-level encryption.

Additionally, HTTP Sink V2 will support specifying ${topic}, ${key}, and fields from the Kafka record in the URL, parameters, and/or body of the HTTP request. Users can define the structure of the body to avoid the need for additional simple transformations.

How can you get started?

Let’s explore how the HTTP Source V2 and HTTP Sink V2 connectors can be configured for a popular real-world use case.

Imagine a scenario where an e-commerce company “YoloBuy” wants to enhance its fraud detection capabilities. They decide to leverage Stripe for payment processing and Pinecone for advanced vector-based similarity searches to detect fraudulent transactions in real time.

As a prerequisite, we’ve built a console application that creates customer objects in Stripe for the popular “Suits” television series characters.
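
The console application itself isn't covered here; as a rough equivalent, the sketch below creates one such customer directly against Stripe's Customers API with curl (the secret key, name, and email are placeholders):

# Create a Stripe customer for a "Suits" character (placeholder credentials and details).
curl https://api.stripe.com/v1/customers \
  -u "REPLACE_YOUR_STRIPE_SECRET_KEY:" \
  -d name="Harvey Specter" \
  -d email="harvey.specter@example.com" \
  -d description="YoloBuy demo customer"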

There are three major steps involved:

  1. Source data from Stripe to Confluent Cloud using a managed HTTP Source V2 connector.

  2. Generate vector embeddings using Confluent Cloud for Apache Flink® and Azure OpenAI. (Additional processing like data enrichment can be done using Flink as required before generating the vector embeddings.)

  3. Sink the data from Confluent Cloud to Pinecone using a managed HTTP Sink V2 connector.

Source data from Stripe using managed HTTP Source V2 connector

In this section, we’ll configure the HTTP Source V2 connector with CHAINING offset mode to capture data in real time using the Stripe Events API.

Here’s the HTTP Source V2 connector configuration for this scenario:

{
  "auth.type": "basic",
  "connection.user": "REPLACE_YOUR_USERNAME",
  "connection.password": "REPLACE_YOUR_PASSWORD",
  "connector.class": "HttpSourceV2",
  "http.api.base.url": "https://api.stripe.com",
  "kafka.api.key": "REPLACE_YOUR_API_KEY",
  "kafka.api.secret": "REPLACE_YOUR_API_SECRET",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "name": "stripe-events-demo",
  "output.data.format": "JSON_SR",
  "report.errors.as": "ERROR_STRING",
  "tasks.max": "1",
  "api1.http.api.path": "/v1/events",
  "api1.http.initial.offset": "0",
  "api1.http.offset.mode": "CHAINING",
  "api1.http.offset.json.pointer": "/created",
  "api1.http.request.parameters": "created[gt]=${offset}&limit=1",
  "api1.http.response.data.json.pointer": "/data/0/data/object",
  "api1.topics": "suits-events"
}
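
With this configuration saved to a local file (the file name below is just an assumption), one way to provision the connector is via the Confluent CLI; the same JSON can also be pasted into the Confluent Cloud UI:

# Create the fully managed HTTP Source V2 connector from the configuration above.
confluent connect cluster create --config-file stripe-http-source-v2.json

# Verify that the connector has been provisioned and is running.
confluent connect cluster list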

Apply vector embedding using Flink and Azure OpenAI

In this section, we demonstrate how to add vector embeddings to the events data we sourced from Stripe and store it in a Kafka topic.

To get started with using Flink SQL for AI models, refer to the documentation provided. Additionally, you need to create a Flink compute pool, which can be easily set up via the Confluent Cloud UI. 
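
If you prefer the command line, here's a minimal sketch of creating a compute pool and opening a Flink SQL shell against it (the pool name, CFU limit, and compute pool ID are placeholders):

# Create a Flink compute pool in the same cloud/region used for the Azure OpenAI connection.
confluent flink compute-pool create my-compute-pool --cloud azure --region eastus --max-cfu 10

# Open an interactive Flink SQL shell to run the statements shown below.
confluent flink shell --compute-pool <COMPUTE_POOL_ID>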

First, we need to create a connection in Flink to Azure OpenAI to fetch the embedding model:

confluent flink connection create azureopenai-cli-connection-matrix \
  --cloud AZURE \
  --region eastus \
  --type azureopenai \
  --endpoint <YOUR_EMBEDDINGS_URL> \
  --api-key <YOUR_AZURE_OPEN_AI_KEY>

After creating a connection to Azure OpenAI, we need to create a model in Flink to use that connection:

CREATE MODEL azure_embed_matrix
INPUT (text STRING)
OUTPUT (response ARRAY<FLOAT>)
WITH (
  'azureopenai.connection'='azureopenai-cli-connection-matrix',
  'provider'='azureopenai',
  'task'='embedding'  -- embedding task, since this model returns vector embeddings
);

Now, we create a Flink table with a schema designed for seamless integration with Pinecone.

CREATE TABLE `suits-character-names-vectorised` (
    vectors ARRAY<ROW<id STRING, `values` ARRAY<DOUBLE>>>
) WITH (
    'value.format' = 'json-registry'  -- Assuming this table writes out JSON to your sink (Kafka, etc.)
);

Now that all the setup to add vector embeddings is done, we just need to invoke the model created above.

INSERT INTO `suits-character-names-vectorised`
SELECT 
    ARRAY[ROW(name, CAST(prediction.vector AS ARRAY<DOUBLE>))] AS vectors
FROM `suits-events`,
LATERAL TABLE(ml_predict('azure_embed_matrix', `name`)) AS prediction(vector);

You can check whether the embeddings were added correctly to the new table by running a simple SELECT * query.

SELECT * FROM `suits-character-names-vectorised`;

Here’s a quick demo on how to set up the HTTP Source V2 connector and Flink (along with Azure OpenAI) for generating vector embeddings:

Sink the vectorized data to Pinecone

Since the vectorized data is already available in a Kafka topic, we can directly send it to Pinecone by creating an HTTP Sink V2 connector. At the time of writing, Pinecone supports TLS 1.2, so it's essential to configure the HTTP connector to use this protocol version. Additionally, Pinecone requires an Api-Key to be included in the request header. To ensure security, we’ll use the configuration for sensitive headers, which prevents the Api-Key from being logged anywhere in our system.

Note: By default, the Azure OpenAI embedding model used here produces vectors with 1536 dimensions, so the index created in Pinecone must match this dimension to ensure compatibility. For more details, refer to the documentation on Pinecone's upsert API.
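
If the index doesn't exist yet, it can be created through Pinecone's REST API. The sketch below assumes a serverless index and the control-plane endpoint and request body current at the time of writing, so double-check Pinecone's documentation for the exact shape:

# Create a serverless Pinecone index with dimension 1536 to match the embeddings
# (endpoint, body fields, and cloud/region placement are assumptions).
curl -X POST "https://api.pinecone.io/indexes" \
  -H "Api-Key: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
        "name": "suits-character-names",
        "dimension": 1536,
        "metric": "cosine",
        "spec": { "serverless": { "cloud": "aws", "region": "us-east-1" } }
      }'

With the index in place, here's the HTTP Sink V2 connector configuration for this scenario: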

{
 "connector.class": "HttpSinkV2",
 "kafka.auth.mode": "KAFKA_API_KEY",
 "kafka.api.key": "YOUR_API_KEY",
 "kafka.api.secret": "YOUR_API_SECRET",
 "name": "PineconeSinkForSuits",
 "input.data.format": "JSON_SR",
 "topics": "suits-character-names-vectorised",
 "http.api.base.url": "<YOUR_PINECONE_INDEX_URL>",
 "https.ssl.enabled": "true",
 "https.ssl.protocol": "TLSv1.2",
 "apis.num": "1",
 "api1.http.api.path": "/vectors/upsert",
 "api1.http.request.method": "POST",
 "api1.topics": "suits-character-names-vectorised",
 "api1.http.request.headers": "Content-type:application/json",
 "api1.http.request.sensitive.headers": "Api-Key: YOUR_API_KEY",
 "api1.request.body.format": "json",
 "api1.max.batch.size": "1",
 "api1.batch.json.as.array": "false",
 "report.errors.as": "ERROR_STRING",
 "tasks.max": "1"
}
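
Once the connector is running, a quick way to confirm that vectors are landing in the index is Pinecone's describe_index_stats endpoint (a sketch reusing the same index host and API key placeholders as the connector configuration):

# Check the total vector count in the index after the sink connector has delivered data.
curl -X POST "<YOUR_PINECONE_INDEX_URL>/describe_index_stats" \
  -H "Api-Key: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{}'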

You can view the demo below:

What’s next?

Several features are on the roadmap for the HTTP Source V2 and HTTP Sink V2 connectors. Here are the major features being tracked:

Features for both fully managed HTTP Source V2 and HTTP Sink V2:

  1. Postman Collection configuration: Ability to configure the connector using Postman Collections, simplifying the setup process.

  2. Freeform configuration: Ability to configure the connector without OpenAPI specification or Postman Collection.

  3. Support for HTTP/2 and HTTP/3 APIs: Compatibility with the latest HTTP protocols to enhance performance and security.

  4. SOAP protocol support: Enable the connectors to work with SOAP APIs, expanding their versatility.

Features specific to HTTP Source V2 connector:

  1. Timestamp-range-based querying: Allowing users to query data based on a range of timestamps, which is useful for incremental data fetching.

  2. API chaining: Enabling the chaining of a primary API with a secondary API, where the response from the primary API can be used as input for the secondary API.

  3. Bulk loading: Supporting bulk data loading operations, which can be scheduled at configurable intervals.

These enhancements are designed to make the HTTP Source V2 and HTTP Sink V2 connectors more robust and user-friendly, addressing the specific needs of our customers and improving their overall experience with Confluent Cloud.

Action required:

In due course, Confluent will deprecate the HTTP Source V1 and HTTP Sink V1 connectors. It is strongly recommended that users plan their migration to HTTP Source V2 and HTTP Sink V2. 

You can now transition from HTTP Sink V1 to HTTP Sink V2 using the open source Confluent HTTP Sink V2 Migration Tool.


Apache®, Apache Kafka®, Kafka®, Apache Flink®, and Flink® are trademarks of the Apache Software Foundation.

  • Jai Vignesh began his career as a developer before transitioning to a product manager (PM). As a PM, he has led the roadmap for multiple products, delighting customers and driving product success. Currently, Jai is part of the Confluent Connect PM team. In this role, he focuses on ensuring seamless connectivity between Confluent and various databases, data warehouses, and SaaS applications.

  • Nishant has been working as a Senior Software Engineer at Confluent for the past three years. In addition to contributing to the generic HTTP connector framework, he has worked on multiple cloud infrastructure components for Kafka Connect. Prior to joining Confluent, Nishant worked on backup and restore systems at Cohesity. He also worked at Microsoft and Mentor Graphics in the past.

  • Devesh is a Solutions Engineer in Presales at Confluent, specializing in helping organizations design scalable, real-time data streaming solutions with Apache Kafka. With a passion for simplifying complex architectures, he works closely with customers to drive innovation and maximize the value of event-driven systems.
