Integrations with Camel K

1. Build and deploy integrations with Camel K

Apache Camel is an open source integration framework that allows you to quickly and easily integrate various systems consuming or producing data. It is based on the well known Enterprise Integration patterns and allows you to define routing and mediation rules in a variety of domain-specific languages (such as Java, XML, Groovy, Kotlin, and YAML). It does so by providing over 300 components and connectors.

Apache Camel K is a lightweight integration framework built from Apache Camel that runs natively on Kubernetes or OpenShift and is specifically designed for microservice and serverless architectures. When using Camel K you can instantly run integration code written in Camel DSL on Kubernetes or OpenShift, without having to package the code into an application and building a container image.

In this workshop we leverage Camel and Camel K to bridge between the Kafka topics which contain the customer data change events and the aggregated orders, and the Cashback service.

The first integration we need is pretty simple: we need to consume the aggregated order records from the Kafka globex.order-aggregated topic, and call a REST endpoint on the Cashback service. No data transformation is required. A relatively simple integration like this one is ideally suited to be expressed in YAML.

  • In a browser window, navigate to the browser tab pointing to the Dev Spaces workspace you opened earlier to inspect and deploy the Debezium connector. If you don’t have a browser tab open on the Dev Spaces workspace, refer to the instructions in the Deploy the Debezium Connector section.

  • The Camel K connector for the aggregated orders is defined in the workshop/module-cdc/order-connector/cashback-order-connector.yaml file.

    devspaces workspace order connector
  • Take note of the following elements:

    • from: Camel integrations are defined as routes, a set of processing steps that are applied to a message as it travels from a source to a destination. An integration contains 1 or more routes. A route typically starts with a from statement, which defines the source of the route.

    • from.uri: the source of the route, typically expressed as a URI. The scheme (kafka) defines which connector to use. The {{ }} placeholders refer to properties defined in a properties file.

    • steps: the different steps in the integration. In this simple integration, the body contents of the incoming message is logged, and a couple of headers are set on the message.

    • to: the destination of the integration. In this case a HTTP endpoint on the Cashback service is called. The headers set previously determine how to handle the HTTP call (POST with JSON payload)

    • traits: the comment lines at the top of the file provide additional configuration settings for the integration. Here we define a property file (cashback-order-connector.properties) which contain the properties for the integration, as well as a secret which contains the connection details for the Kafka broker.

  • The connector YAML file can be deployed as such to the OpenShift cluster using the kamel CLI. Under the hood the CLI will transform the YAML file into an Integration Custom Resource. When deployed to OpenShift, the Camel K operator processes the Integration Custom Resource and transforms the Integration into a running application.

    • Go into the terminal of the Dev Spaces workspace. If you don’t have an open terminal, you can open a new one by selecting the devspaces menu icon on the top of the left menu, and selecting Terminal/New Terminal from the drop-down menu.

    • In the terminal, issue the following command:

      kamel run -n globex-cdc-{user_name} workshop/module-cdc/order-connector/cashback-order-connector.yaml --trait container.limit-memory=250Mi
      Output
      Modeline options have been loaded from source files
      Full command: kamel run -n globex--{user_name} workshop/module-cdc/order-connector/cashback-order-connector.yaml --trait container.limit-memory=250Mi --property=file:workshop/module-cdc/order-connector/cashback-order-connector.properties --dependency=camel:http --config=secret:kafka-client-secret
      No IntegrationPlatform resource in globex-cdc--{user_name} namespace
      Integration "cashback-order-connector" created
    • The Camel K operator starts building the integration and packages it in a container image. The first time this can take quite a while during which nothing seems to happen.
      One way to check that the integration is actually being built is by checking its status with the oc command line tool.
      In the terminal in Dev Spaces, you can issue the following command:

      oc get integration -n globex-cdc-{user_name}
      Output
      NAME                       PHASE          KIT                        REPLICAS
      cashback-order-connector   Building Kit   kit-cglu6cgm540hobmmt1r0
    • After a while (this can take a couple of minutes), the build is finished, and the integration moves to running state:

      oc get integration -n globex-cdc-{user_name}
      Output
      NAME                       PHASE     KIT                        REPLICAS
      cashback-order-connector   Running   kit-cglu6cgm540hobmmt1r0   1
    • At this point, the integration is deployed. In the Topology view of the OpenShift console, select the the globex-cdc-{user_name} namespace. You should see the integration that was just deployed:

      openshift console topology integration
    • You can inspect the logs of the pod to check that the connector is working as expected. To do so, click on the center of the deployment in the Topology view, and in the pop-up pane on the right, click View logs.

      openshift console topology integration logs
    • This opens a window with the logs of the pod. You should see a log statement for every Kafka message that was processed by the connector.

      2023-04-04 08:48:22,325 INFO [cam.yaml:4] (Camel (camel-1) thread #1 - KafkaConsumer[globex.order-aggregated]) Order event received: {"orderId":93,"customer":"mhurst","date":"2023-04-04T08:37:11.430+0000","total":64.45}
      2023-04-04 08:48:22,417 INFO [cam.yaml:4] (Camel (camel-1) thread #1 - KafkaConsumer[globex.order-aggregated]) Order event received: {"orderId":94,"customer":"amurphy","date":"2023-04-04T08:37:11.436+0000","total":89.3}
      2023-04-04 08:48:22,422 INFO [cam.yaml:4] (Camel (camel-1) thread #1 - KafkaConsumer[globex.order-aggregated]) Order event received: {"orderId":95,"customer":"eburke","date":"2023-04-04T08:37:11.520+0000","total":61.75}
      2023-04-04 08:48:22,426 INFO [cam.yaml:4] (Camel (camel-1) thread #1 - KafkaConsumer[globex.order-aggregated]) Order event received: {"orderId":96,"customer":"fflores","date":"2023-04-04T08:37:11.615+0000","total":37.5}
      2023-04-04 08:48:22,429 INFO [cam.yaml:4] (Camel (camel-1) thread #1 - KafkaConsumer[globex.order-aggregated]) Order event received: {"orderId":97,"customer":"aoconnell2","date":"2023-04-04T08:37:11.621+0000","total":86.6}
      2023-04-04 08:48:22,518 INFO [cam.yaml:4] (Camel (camel-1) thread #1 - KafkaConsumer[globex.order-aggregated]) Order event received: {"orderId":98,"customer":"rkennedy","date":"2023-04-04T08:37:11.627+0000","total":149.0}
      2023-04-04 08:48:22,522 INFO [cam.yaml:4] (Camel (camel-1) thread #1 - KafkaConsumer[globex.order-aggregated]) Order event received: {"orderId":99,"customer":"onorris","date":"2023-04-04T08:37:11.633+0000","total":100.7}
      2023-04-04 08:48:22,526 INFO [cam.yaml:4] (Camel (camel-1) thread #1 - KafkaConsumer[globex.order-aggregated]) Order event received: {"orderId":100,"customer":"ejackson","date":"2023-04-04T08:37:11.717+0000","total":11.0}
      2023-04-04 08:48:22,530 INFO [cam.yaml:4] (Camel (camel-1) thread #1 - KafkaConsumer[globex.order-aggregated]) Order event received: {"orderId":101,"customer":"mmitchell","date":"2023-04-04T08:37:11.722+0000","total":140.8}

The second integration we need is slightly more complex: we need to consume the change events from the customer table from the Kafka topic, determine whether the change event corresponds to a create/read or update change, transform the data and finally call a REST endpoint (POST for create, PUT for update) on the Cashback service.
This time the integration logic is expressed in Groovy, a dynamic language for the Java virtual machine.

  • In a browser window, navigate to the browser tab pointing to the Dev Spaces workspace you opened earlier. If you don’t have a browser tab open on the Dev Spaces workspace, refer to the instructions in the Deploy the Debezium Connector section.

  • The Camel K integration for the customer change events is defined in the workshop/module-cdc/customer-connector/cashback-customer-connector.groovy file.

    devspaces workspace customer connector

    Reading through the code should give you an idea what the code actually does.
    In a nutshell, messages are consumed from the Kafka topic and marshalled into a JSON object. The payload is introspected using JSONPath to determine the nature of the change event, and set headers on the message accordingly. Finally the payload for the REST call is built and the REST endpoint called.

  • The deployment of the integration is very similar to what you did for the order integration.

    • Go into the terminal of the Dev Spaces workspace. If you don’t have an open terminal, you can open a new one by selecting the devspaces menu icon on the top of the left menu, and selecting Terminal/New Terminal from the drop-down menu.

    • In the terminal, issue the following command:

      kamel run -n globex-cdc-{user_name} workshop/module-cdc/customer-connector/cashback-customer-connector.groovy --trait container.limit-memory=512Mi
      Output
      Modeline options have been loaded from source files
      Full command: kamel run -n globex-cdc-{user_name} workshop/module-cdc/customer-connector/cashback-customer-connector.groovy --trait container.limit-memory=512Mi --dependency=camel:http --property=file:workshop/module-cdc/customer-connector/cashback-customer-connector.properties --config=secret:kafka-client-secret
      No IntegrationPlatform resource in globex-cdc-{user_name} namespace
      Integration "cashback-customer-connector" created
    • Follow the build process with the following command:

      oc get integration -n globex-cdc-{user_name}
      Output
      NAME                          PHASE          KIT                        REPLICAS
      cashback-customer-connector   Building Kit   kit-cgluf9om540hobmmt1rg
      cashback-order-connector      Running        kit-cglu6cgm540hobmmt1r0   1
    • The build process should be quite a lot faster than the the first one. After a while the integration proceeds to the running phase, and becomes visible in the Topology view of the OpenShift console:

      openshift console topology integration 2
    • Open the logs of the pod, by clicking on the deployment in the Topology view and selecting View logs from the popup pane on the right.
      You should see some log statements for every customer data change event processed by the connector.

      2023-04-04 09:03:30,628 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[globex.updates.public.customer]) Create customer arussell
      2023-04-04 09:03:30,629 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[globex.updates.public.customer]) Customer event received: {before=null, after={id=196, user_id=lsexton, first_name=Landon, last_name=Sexton, email=lsexton@firstsimple.com, phone=(302) 741-6817}, source={version=2.1.1.Final, connector=postgresql, name=globex.updates, ts_ms=1680596868520, snapshot=true, db=globex, sequence=[null,"24054160"], schema=public, table=customer, txId=1182, lsn=24054160, xmin=null}, op=r, ts_ms=1680596868929, transaction=null}
      2023-04-04 09:03:30,629 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[globex.updates.public.customer]) Create customer lsexton
      2023-04-04 09:03:30,631 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[globex.updates.public.customer]) Customer event received: {before=null, after={id=197, user_id=lortiz, first_name=Leila, last_name=Ortiz, email=lortiz@forfree.com, phone=(214) 450-3883}, source={version=2.1.1.Final, connector=postgresql, name=globex.updates, ts_ms=1680596868520, snapshot=true, db=globex, sequence=[null,"24054160"], schema=public, table=customer, txId=1182, lsn=24054160, xmin=null}, op=r, ts_ms=1680596868929, transaction=null}
      2023-04-04 09:03:30,631 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[globex.updates.public.customer]) Create customer lortiz
      2023-04-04 09:03:30,633 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[globex.updates.public.customer]) Customer event received: {before=null, after={id=198, user_id=arobinson, first_name=Arianna, last_name=Robinson, email=arobinson@firstsimple.com, phone=(458) 478-1118}, source={version=2.1.1.Final, connector=postgresql, name=globex.updates, ts_ms=1680596868520, snapshot=true, db=globex, sequence=[null,"24054160"], schema=public, table=customer, txId=1182, lsn=24054160, xmin=null}, op=r, ts_ms=1680596868929, transaction=null}
      2023-04-04 09:03:30,633 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[globex.updates.public.customer]) Create customer arobinson
      2023-04-04 09:03:30,634 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[globex.updates.public.customer]) Customer event received: {before=null, after={id=199, user_id=mperry, first_name=Maren, last_name=Perry, email=mperry@yihaa.com, phone=(916) 601-7486}, source={version=2.1.1.Final, connector=postgresql, name=globex.updates, ts_ms=1680596868520, snapshot=true, db=globex, sequence=[null,"24054160"], schema=public, table=customer, txId=1182, lsn=24054160, xmin=null}, op=r, ts_ms=1680596868929, transaction=null}
      2023-04-04 09:03:30,634 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[globex.updates.public.customer]) Create customer mperry
      2023-04-04 09:03:30,636 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[globex.updates.public.customer]) Customer event received: {before=null, after={id=200, user_id=mballard, first_name=Miguela, last_name=Ballard, email=mballard@random.com, phone=(484) 646-1017}, source={version=2.1.1.Final, connector=postgresql, name=globex.updates, ts_ms=1680596868520, snapshot=last_in_data_collection, db=globex, sequence=[null,"24054160"], schema=public, table=customer, txId=1182, lsn=24054160, xmin=null}, op=r, ts_ms=1680596868929, transaction=null}
      2023-04-04 09:03:30,636 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[globex.updates.public.customer]) Create customer mballard