Enable the Matrix to Rocket.Chat interaction

You’ve completed one directional flow to deliver customer messages from Rocket.Chat to agents in Matrix. Now, you need to transfer agent responses in Matrix, back to customers in Rocket.Chat.

As previously pointed out, Camel’s collection of components does not include one for Matrix. Matrix offers a feature rich client-server API. The API is built around the notion of events, which describe something that has happened on the platform, such as the creation of a room, a user joining a room etc…​ The sync method of said API synchronizes the client’s state with the latest state on the server. By calling the sync API in a loop, the client (our Camel integration) can subscribe to events and act accordingly.

For simplicity, this part of the Matrix integration is already implemented. As mentioned, it calls the sync API in a loop, filters for events we are interested in (room leave events and room message events), and forwards the event to a Camel route.


1. Implement the Agent to Client flow

The listener described above is responsible to pick up agent messages posted in Matrix and direct then to the Camel route you need to implement to process the event.

In essence, our route needs to obtain from cache the context for this particular customer/agent conversation, prepare the JSON data containing the agent’s answer, and send it to the AMQ broker. The Rocket.chat integration will consume the event and deliver it to the customer.

What will I learn?

In the content that follows you will learn the following concepts:

  • How to perform simple changes on JSON data.

  • How to push events via AMQP to the Broker.

Ensure you’ve stopped your dev instance from the test in the previous section. If not stopped yet, from your terminal press Ctrl+C to stop it.

Start your implementation:

  1. From your Dev Spaces terminal, execute the kamel command below to create a new source file to process Matrix events:

    kamel init routes-from-matrix-main.yaml
    The new file has a YAML extension. Camel K automatically generates for you a skeleton using the YAML DSL (Domain Specific Language).


  2. Open the routes-from-matrix-main.yaml file in your editor.

  3. Delete the example route (full from definition)

  4. Replace (the deleted route) with the following snippet:

    #
    - route:
        from:
          uri: "direct:process-agent-message"                       # <1>
          steps:
            - setProperty:                                          # <2>
                name: text
                simple: ${body.get(text)}
            - setProperty:                                          # <2>
                name: agent
                simple: ${body.get(user)}
            - setProperty:                                          # <2>
                name: key
                simple: ${body.get(room)}
            - to:
                uri: "direct:cache-get"                             # <3>
            - choice:
                when:
                  - simple: ${body} != null                         # <4>
                    steps:
                      - to:
                          uri: "language:simple:${body.replace(text,${exchangeProperty.text})}"  # <5>
                          parameters:
                            transform: false
                      - to:
                          uri: "language:simple:${body.put(agent,${exchangeProperty.agent})}"    # <5>
                          parameters:
                            transform: false
                      - setProperty:                                             # <6>
                          name: source
                          simple: ${body.get(source).get(uname)}
                      - marshal:                                                 # <7>
                          json: {}
                      - toD:
                          uri: "amqp:topic:support.${exchangeProperty.source}"   # <8>
                          parameters:
                            connectionFactory: "#myFactory"
                      - setBody:
                          simple: '${exchangeProperty.agent}: ${exchangeProperty.text}'  # <9>
                      - removeHeaders:
                          pattern: "*"
                      - toD:
                          uri: kafka:support.${env.NAMESPACE}.matrix${exchangeProperty.key.replace(":","-").replace("!","-")}
                otherwise:                                                               # <10>
                  steps:
                    - log: "no cache entry, ignoring message from user: ${exchangeProperty.agent}"
    
    #
    Click here for details of the above route
    1 Defines the from element with the direct component to allow other Camel routes invoke it.
    2 Keeps necessary values (as properties) from Matrix’s event.

    The Matrix JSON event has already been un-marshalled for you.

    3 Fetches from the cache system the customer/agent context

    We use Matrix's room key` as our key to fetch the cache entry.

    4 Evaluates if the cache entry exists with a choice.
    • if true, it executes [5] to [9]

    • if false, it executes the otherwise block [10]

    5 When true, the cache payload is recycled, it updates the text field to contain the agent’s answer and also injects the agent’s name.

    There are many strategies in Camel to manipulate data. For minor changes on payloads the language component is very handy.

    6 Obtains from the cache entry the uname (customer’s unique name) which is necessary to route the event to the right destination.
    7 Marshals the Java Map in JSON.
    8 Sends the event over AMQP to the AMQ Broker.

    the call uses toD (Dynamic to) to evaluate at runtime the target AMQP address using the source property.

    The amqp component requires no extra parameters because it has been pre-configured for you, it’s secured with TLS and Scram, and points to the shared environment’s AMQ Broker.

    9 Finally, the interaction is recorded and streamed to Kafka
    • a payload in the format agent: text is prepared using Camel’s simple expression

    • pushes the message to Kafka.

      • Note the Kafka topic defined uses your NAMESPACE, again to prevent clashes with other students since you all share the same Kafka cluster.

      • The kafka component requires no extra parameters because it has been pre-configured for you, it’s secured with TLS and Scram, and points to the shared environment’s Kafka cluster.

    10 Lastly, when a cache entry does not exist, we ignore it.

    This is necessary in our lab to prevent other students from interfering with your tests. In a real-world implementation, you would perform the check anyway for robust error handling.


2. Implement the room leave event.

A crucial phase of the customer/agent interaction is when both parts agree on closing the conversation. At that point the expected sequence of actions is the following:

  • The agent manually leaves the room in Matrix

  • The customer receives a notification indicating the conversation has been closed.

When the agent leaves the room, Matrix fires a room leave event, which our listener picks up and directs to a route called process-room-leave-event

Let’s implement the logic required which is very similar to our previously defined route

Include in the same (routes-from-matrix-main.yaml) YAML file (copy and paste) the snippet below:

#
- route:
    from:
      uri: "direct:process-room-leave-event"
      steps:
        - log:
            message: ${body}
        - setProperty:
            name: key
            simple: ${body.get(room)}
        - setProperty:
            name: agent
            simple: ${body.get(user)}
        - to:
            uri: "direct:cache-get"   # <1>
        - choice:
            when:
              - simple: ${body} != null
                steps:
                  - to:
                      uri: "language:simple:${body.replace(text,'your session ended, conversation is now closed.')}"      # <2>
                      parameters:
                        transform: false
                  - to:
                      uri: "language:simple:${body.put(agent,'support')}"    # <2>
                      parameters:
                        transform: false
                  - setProperty:
                      name: source
                      simple: ${body.get(source).get(uname)}
                  - setProperty:
                      name: key-rocketchat
                      simple: ${body.get(source).get(room)}-${body.get(user)}
                  - setProperty:
                      name: kafka-client
                      simple: matrix${body.get(target).get(room).replace(":","-").replace("!","-")}
                  - marshal:
                      json: {}
                  - setProperty:
                      name: context
                      simple: ${bodyAs(String)}
                  - toD:
                      uri: "amqp:topic:support.${exchangeProperty.source}"    # <3>
                      parameters:
                        connectionFactory: "#myFactory"
                  - to:
                      uri: "direct:cache-remove"                              # <4>
                  - setProperty:
                      name: key
                      simple: ${exchangeProperty.key-rocketchat}
                  - to:
                      uri: "direct:cache-remove"                              # <5>
                  - setBody:
                      simple: done                                            # <6>
                  - removeHeaders:
                      pattern: "*"
                  - setHeader:
                      name: context
                      simple: ${exchangeProperty.context}                     # <6>
                  - toD:
                      uri: kafka:support.${env.NAMESPACE}.${exchangeProperty.kafka-client}     # <7>
                  - setBody:
                      simple: ${exchangeProperty.kafka-client}
                  - toD:
                      uri: "kafka:support.${env.NAMESPACE}.closed"                             # <8>
            otherwise:
              steps:
              - log: no cache entry, ignoring message

You will observe the route above is almost identical to the previous one.

Click here to view a summary of the differences
1 It also fetches from the cache system the customer/agent context.
2 It sends the closing event via AMQP, and proceeds [4] & [5] to delete the two cache entries relevant to this conversation:

Reminder: each customer/agent session owns 2 cache entries. One uses the source key, handy on customer-to-agent processing, and the second uses Matrix’s room key, handy for agent-to-customer processing.

the call uses toD (Dynamic to) to evaluate at runtime the target AMQP address using the source property.

3 It deletes the cache entry with source identifier (Rocket.Chat).
4 It deletes the cache entry with target identifier (Natrix).
5 Finally, it prepares body and headers to send two closure Kafka events [7] & [8].
6 The first event to Kafka contains the context information, sent to the conversation topic.
7 The second one is signal event, a notification that allows other applications to react.

You have completed the return processing flow of messages from agents (in Matrix’s Element) to customers (in Rocket.Chat). Next, deploy your integration in OpenShift and send some messages to validate it.


3. Deploy and test your code

With the Camel K client kamel you can deploy your integrations with one command. Camel K will take care of collecting all your sources, containerizing them and deploying an instance.

Let’s deploy your code .

  1. From your terminal, execute the following command:

    ./deploy.sh
    The deploy.sh scripts executes a kamel run command that defines all the necessary support resources and parameters to run your integration.
    Output
    matrix (main) $ ./deploy.sh
    No IntegrationPlatform resource in globex-camel-{user_name} namespace
    Integration "matrix" created
    You can ignore any warning message stating "Unable to verify existence of operator id [camel-k] due to lack of user privileges"


  2. You can inspect the logs by running the following command:

    kamel log matrix

    If you encounter errors or unexpected results, you might have missed a step following the instructions or done some other human error.
    If so, try again using the prebuilt code by running the following command. This code does the exact same logic that you implemented in the above steps.

    ./safe-deploy.sh
  1. From Matrix’s Element application:

    1. Click on the newly created channel rocketchat-{user_name} to display the messages.

    2. Type a message, for example:

      • My name is Bruno, how can I help you today?

      and send it.

  2. From Rocket.Chat…​

    You should see the agent’s message sent from Matrix appear in the Rocket.Chat channel.

  3. Exchange a few more messages to simulate a conversation.

  4. Then, from Matrix’s Element chat window, to close the session, follow these steps, as per the illustration below:

    1. Right click on the room rocketchat-{user_name}

    2. Click Leave

    3. Confirm your action to leave the room.

  5. In Rocket.Chat, as above on the right hand side, you should see a notification informing the session has ended.


Well done, you’ve completed the full integration, both ways, between Rocket.Chat and Matrix.

In contrast with running in DEV mode, the deploy.sh command made the Camel K operator to fully deploy your code in an OpenShift pod named matrix, which you can see running from the Topology view.

You can also use the kamel client from your terminal to obtain information about your deployed Camel K instances:

kamel get
Output
No IntegrationPlatform resource in globex-camel-{user_name} namespace
NAME    PHASE   KIT
matrix Running globex-camel-{user_name}/kit-chcc8ts5v3ov25mqg460